diff --git a/packages/backend/server/src/modules/doc/manager.ts b/packages/backend/server/src/modules/doc/manager.ts index 326902b35f..a1fd0faf4b 100644 --- a/packages/backend/server/src/modules/doc/manager.ts +++ b/packages/backend/server/src/modules/doc/manager.ts @@ -11,6 +11,7 @@ import { chunk } from 'lodash-es'; import { defer, retry } from 'rxjs'; import { applyUpdate, + decodeStateVector, Doc, encodeStateAsUpdate, encodeStateVector, @@ -40,6 +41,36 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean { return compare(yBinary, yBinary2, true); } +/** + * Detect whether rhs state is newer than lhs state. + * + * How could we tell a state is newer: + * + * i. if the state vector size is larger, it's newer + * ii. if the state vector size is same, compare each client's state + */ +function isStateNewer(lhs: Buffer, rhs: Buffer): boolean { + const lhsVector = decodeStateVector(lhs); + const rhsVector = decodeStateVector(rhs); + + if (lhsVector.size < rhsVector.size) { + return true; + } + + for (const [client, state] of lhsVector) { + const rstate = rhsVector.get(client); + if (!rstate) { + return false; + } + + if (state < rstate) { + return true; + } + } + + return false; +} + function isEmptyBuffer(buf: Buffer): boolean { return ( buf.length === 0 || @@ -374,23 +405,17 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { } const { id, workspaceId } = candidate; - // acquire lock - const ok = await this.lockUpdatesForAutoSquash(workspaceId, id); - if (!ok) { - return; - } - - try { - await this._get(workspaceId, id); - } catch (e) { - this.logger.error( - `Failed to apply updates for workspace: ${workspaceId}, guid: ${id}` - ); - this.logger.error(e); - } finally { - await this.unlockUpdatesForAutoSquash(workspaceId, id); - } + await this.lockUpdatesForAutoSquash(workspaceId, id, async () => { + try { + await this._get(workspaceId, id); + } catch (e) { + this.logger.error( + `Failed to apply updates for workspace: ${workspaceId}, guid: ${id}` + ); + this.logger.error(e); + } + }); } private async getAutoSquashCandidate() { @@ -414,34 +439,67 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { doc: Doc, initialSeq?: number ) { - const blob = Buffer.from(encodeStateAsUpdate(doc)); - const state = Buffer.from(encodeStateVector(doc)); + return this.lockSnapshotForUpsert(workspaceId, guid, async () => { + const blob = Buffer.from(encodeStateAsUpdate(doc)); - if (isEmptyBuffer(blob)) { - return; - } + if (isEmptyBuffer(blob)) { + return false; + } - await this.db.snapshot.upsert({ - select: { - seq: true, - }, - where: { - id_workspaceId: { - id: guid, - workspaceId, - }, - }, - create: { - id: guid, - workspaceId, - blob, - state, - seq: initialSeq, - }, - update: { - blob, - state, - }, + const state = Buffer.from(encodeStateVector(doc)); + + return await this.db.$transaction(async db => { + const snapshot = await db.snapshot.findUnique({ + where: { + id_workspaceId: { + id: guid, + workspaceId, + }, + }, + }); + + // update + if (snapshot) { + // only update if state is newer + if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) { + await db.snapshot.update({ + select: { + seq: true, + }, + where: { + id_workspaceId: { + workspaceId, + id: guid, + }, + }, + data: { + blob, + state, + }, + }); + + return true; + } else { + return false; + } + } else { + // create + await db.snapshot.create({ + select: { + seq: true, + }, + data: { + id: guid, + workspaceId, + blob, + state, + seq: initialSeq, + }, + }); + + return true; + } + }); }); } @@ -484,21 +542,26 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { this.event.emit('doc:manager:snapshot:beforeUpdate', snapshot); } - await this.upsert(workspaceId, id, doc, last.seq); - this.logger.debug( - `Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}` - ); - await this.db.update.deleteMany({ - where: { - id, - workspaceId, - seq: { - in: updates.map(u => u.seq), - }, - }, - }); + const done = await this.upsert(workspaceId, id, doc, last.seq); + + if (done) { + this.logger.debug( + `Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}` + ); + + await this.db.update.deleteMany({ + where: { + id, + workspaceId, + seq: { + in: updates.map(u => u.seq), + }, + }, + }); + + await this.updateCachedUpdatesCount(workspaceId, id, -updates.length); + } - await this.updateCachedUpdatesCount(workspaceId, id, -updates.length); return doc; } @@ -581,22 +644,44 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { return null; } - private async lockUpdatesForAutoSquash(workspaceId: string, guid: string) { - return this.cache.setnx( + private async doWithLock(lock: string, job: () => Promise) { + const acquired = await this.cache.setnx(lock, 1, { + ttl: 60 * 1000, + }); + + if (!acquired) { + return; + } + + try { + return await job(); + } finally { + await this.cache.delete(lock).catch(e => { + // safe, the lock will be expired when ttl ends + this.logger.error(`Failed to release lock ${lock}`, e); + }); + } + } + + private async lockUpdatesForAutoSquash( + workspaceId: string, + guid: string, + job: () => Promise + ) { + return this.doWithLock( `doc:manager:updates-lock:${workspaceId}::${guid}`, - 1, - { - ttl: 60 * 1000, - } + job ); } - private async unlockUpdatesForAutoSquash(workspaceId: string, guid: string) { - return this.cache - .delete(`doc:manager:updates-lock:${workspaceId}::${guid}`) - .catch(e => { - // safe, the lock will be expired when ttl ends - this.logger.error('Failed to release updates lock', e); - }); + async lockSnapshotForUpsert( + workspaceId: string, + guid: string, + job: () => Promise + ) { + return this.doWithLock( + `doc:manager:snapshot-lock:${workspaceId}::${guid}`, + job + ); } } diff --git a/packages/backend/server/tests/doc.spec.ts b/packages/backend/server/tests/doc.spec.ts index 83147b96fe..2fbec0be8a 100644 --- a/packages/backend/server/tests/doc.spec.ts +++ b/packages/backend/server/tests/doc.spec.ts @@ -6,7 +6,12 @@ import { Test, TestingModule } from '@nestjs/testing'; import test from 'ava'; import { register } from 'prom-client'; import * as Sinon from 'sinon'; -import { Doc as YDoc, encodeStateAsUpdate } from 'yjs'; +import { + applyUpdate, + decodeStateVector, + Doc as YDoc, + encodeStateAsUpdate, +} from 'yjs'; import { CacheModule } from '../src/cache'; import { Config, ConfigModule } from '../src/config'; @@ -283,3 +288,73 @@ test('should throw if meet max retry times', async t => { ); t.is(stub.callCount, 5); }); + +test('should not update snapshot if state is outdated', async t => { + const db = m.get(PrismaService); + const manager = m.get(DocManager); + + await db.snapshot.create({ + data: { + id: '2', + workspaceId: '2', + blob: Buffer.from([0, 0]), + seq: 1, + }, + }); + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Buffer[] = []; + + doc.on('update', update => { + updates.push(Buffer.from(update)); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + await Promise.all(updates.map(update => manager.push('2', '2', update))); + + const updateWith3Records = await manager.getUpdates('2', '2'); + text.insert(11, '!'); + await manager.push('2', '2', updates[3]); + const updateWith4Records = await manager.getUpdates('2', '2'); + + // Simulation: + // Node A get 3 updates and squash them at time 1, will finish at time 10 + // Node B get 4 updates and squash them at time 3, will finish at time 8 + // Node B finish the squash first, and update the snapshot + // Node A finish the squash later, and update the snapshot to an outdated state + // Time: ----------------------> + // A: ^get ^upsert + // B: ^get ^upsert + // + // We should avoid such situation + // @ts-expect-error private + await manager.squash(updateWith4Records, null); + // @ts-expect-error private + await manager.squash(updateWith3Records, null); + + const result = await db.snapshot.findUnique({ + where: { + id_workspaceId: { + id: '2', + workspaceId: '2', + }, + }, + }); + + if (!result) { + t.fail('snapshot not found'); + return; + } + + const state = decodeStateVector(result.state!); + t.is(state.get(doc.clientID), 12); + + const d = new YDoc(); + applyUpdate(d, result.blob!); + + const dtext = d.getText('content'); + t.is(dtext.toString(), 'hello world!'); +});