diff --git a/packages/backend/server/schema.prisma b/packages/backend/server/schema.prisma index f0b17cd978..ffc5be0b57 100644 --- a/packages/backend/server/schema.prisma +++ b/packages/backend/server/schema.prisma @@ -265,7 +265,9 @@ model Snapshot { seq Int @default(0) @db.Integer state Bytes? @db.ByteA createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6) - updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(6) + // the `updated_at` field will not record the time of record changed, + // but the created time of last seen update that has been merged into snapshot. + updatedAt DateTime @map("updated_at") @db.Timestamptz(6) @@id([id, workspaceId]) @@map("snapshots") diff --git a/packages/backend/server/src/core/doc/manager.ts b/packages/backend/server/src/core/doc/manager.ts index 6a3836cb37..74daacd3df 100644 --- a/packages/backend/server/src/core/doc/manager.ts +++ b/packages/backend/server/src/core/doc/manager.ts @@ -10,7 +10,6 @@ import { chunk } from 'lodash-es'; import { defer, retry } from 'rxjs'; import { applyUpdate, - decodeStateVector, Doc, encodeStateAsUpdate, encodeStateVector, @@ -46,36 +45,6 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean { return compare(yBinary, yBinary2, true); } -/** - * Detect whether rhs state is newer than lhs state. - * - * How could we tell a state is newer: - * - * i. if the state vector size is larger, it's newer - * ii. if the state vector size is same, compare each client's state - */ -function isStateNewer(lhs: Buffer, rhs: Buffer): boolean { - const lhsVector = decodeStateVector(lhs); - const rhsVector = decodeStateVector(rhs); - - if (lhsVector.size < rhsVector.size) { - return true; - } - - for (const [client, state] of lhsVector) { - const rstate = rhsVector.get(client); - if (!rstate) { - return false; - } - - if (state < rstate) { - return true; - } - } - - return false; -} - export function isEmptyBuffer(buf: Buffer): boolean { return ( buf.length === 0 || @@ -120,6 +89,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { this.destroy(); } + @CallTimer('doc', 'yjs_recover_updates_to_doc') private recoverDoc(...updates: Buffer[]): Promise { const doc = new Doc(); const chunks = chunk(updates, 10); @@ -383,7 +353,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { const updates = await this.getUpdates(workspaceId, guid); if (updates.length) { - const doc = await this.squash(updates, snapshot); + const doc = await this.squash(snapshot, updates); return Buffer.from(encodeStateVector(doc)); } @@ -464,97 +434,94 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { }); } + /** + * @returns whether the snapshot is updated to the latest, `undefined` means the doc to be upserted is outdated. + */ @CallTimer('doc', 'upsert') private async upsert( workspaceId: string, guid: string, doc: Doc, // we always delay the snapshot update to avoid db overload, - // so the value of `updatedAt` will not be accurate to user's real action time + // so the value of auto updated `updatedAt` by db will never be accurate to user's real action time updatedAt: Date, - initialSeq?: number + seq: number ) { const blob = Buffer.from(encodeStateAsUpdate(doc)); if (isEmptyBuffer(blob)) { - return false; + return undefined; } const state = Buffer.from(encodeStateVector(doc)); - await this.db.$queryRaw`BEGIN;`; - let committed = false; - const commit = async () => { - if (!committed) { - committed = true; - await this.db.$queryRaw`COMMIT;`; - } - }; + // CONCERNS: + // i. Because we save the real user's last seen action time as `updatedAt`, + // it's possible to simply compare the `updatedAt` to determine if the snapshot is older than the one we are going to save. + // + // ii. Prisma doesn't support `upsert` with additional `where` condition along side unique constraint. + // In our case, we need to manually check the `updatedAt` to avoid overriding the newer snapshot. + // where: { id_workspaceId: {}, updatedAt: { lt: updatedAt } } + // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + // + // iii. Only set the seq number when creating the snapshot. + // For updating scenario, the seq number will be updated when updates pushed to db. try { - const [snapshot]: { - workspace_id: string; - id: string; - blob: Buffer; - state?: Buffer; - }[] = await this.db.$queryRaw` - -- LOCK TABLE "Snapshot" IN SHARE ROW EXCLUSIVE MODE; - SELECT * FROM snapshots WHERE workspace_id = ${workspaceId} AND guid = ${guid} limit 1 - FOR UPDATE; + const result: { updatedAt: Date }[] = await this.db.$queryRaw` + INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "state", "seq", "created_at", "updated_at") + VALUES (${workspaceId}, ${guid}, ${blob}, ${state}, ${seq}, DEFAULT, ${updatedAt}) + ON CONFLICT ("workspace_id", "guid") + DO UPDATE SET "blob" = ${blob}, "state" = ${state}, "updated_at" = ${updatedAt}, "seq" = ${seq} + WHERE "snapshots"."workspace_id" = ${workspaceId} AND "snapshots"."guid" = ${guid} AND "snapshots"."updated_at" <= ${updatedAt} + RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt" `; - // update - if (snapshot) { - // only update if state is newer - if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) { - await this.db.snapshot.update({ - select: { - seq: true, - }, - where: { - id_workspaceId: { - workspaceId, - id: guid, - }, - }, - data: { - blob, - state, - updatedAt, - }, - }); + // const result = await this.db.snapshot.upsert({ + // select: { + // updatedAt: true, + // seq: true, + // }, + // where: { + // id_workspaceId: { + // workspaceId, + // id: guid, + // }, + // ⬇️ NOT SUPPORTED BY PRISMA YET + // updatedAt: { + // lt: updatedAt, + // }, + // }, + // update: { + // blob, + // state, + // updatedAt, + // }, + // create: { + // workspaceId, + // id: guid, + // blob, + // state, + // updatedAt, + // seq, + // }, + // }); - return true; - } else { - return false; - } - } else { - // create - // no record exists, should commit the previous row lock first - await commit(); - await this.db.snapshot.create({ - select: { - seq: true, - }, - data: { - id: guid, - workspaceId, - blob, - state, - seq: initialSeq, - createdAt: updatedAt, - updatedAt, - }, - }); + // if the condition `snapshot.updatedAt > updatedAt` is true, by which means the snapshot has already been updated by other process, + // the updates has been applied to current `doc` must have been seen by the other process as well. + // The `updatedSnapshot` will be `undefined` in this case. + const updatedSnapshot = result.at(0); - return true; + if (!updatedSnapshot) { + return undefined; } + + return true; } catch (e) { - await this.db.$queryRaw`ROLLBACK;`; - throw e; - } finally { - await commit(); + this.logger.error('Failed to upsert snapshot', e); + return false; } } + private async _get( workspaceId: string, guid: string @@ -564,7 +531,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { if (updates.length) { return { - doc: await this.squash(updates, snapshot), + doc: await this.squash(snapshot, updates), }; } @@ -576,17 +543,16 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { * and delete the updates records at the same time. */ @CallTimer('doc', 'squash') - private async squash(updates: Update[], snapshot: Snapshot | null) { + private async squash(snapshot: Snapshot | null, updates: Update[]) { if (!updates.length) { throw new Error('No updates to squash'); } - const first = updates[0]; - const last = updates[updates.length - 1]; - const { id, workspaceId } = first; + const last = updates[updates.length - 1]; + const { id, workspaceId } = last; const doc = await this.applyUpdates( - first.id, + id, snapshot ? snapshot.blob : Buffer.from([0, 0]), ...updates.map(u => u.blob) ); @@ -617,19 +583,24 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { ); } - // always delete updates - // the upsert will return false if the state is not newer, so we don't need to worry about it - const { count } = await this.db.update.deleteMany({ - where: { - id, - workspaceId, - seq: { - in: updates.map(u => u.seq), + // we will keep the updates only if the upsert failed on unknown reason + // `done === undefined` means the updates is outdated(have already been merged by other process), safe to be deleted + // `done === true` means the upsert is successful, safe to be deleted + if (done !== false) { + // always delete updates + // the upsert will return false if the state is not newer, so we don't need to worry about it + const { count } = await this.db.update.deleteMany({ + where: { + id, + workspaceId, + seq: { + in: updates.map(u => u.seq), + }, }, - }, - }); + }); - await this.updateCachedUpdatesCount(workspaceId, id, -count); + await this.updateCachedUpdatesCount(workspaceId, id, -count); + } return doc; } diff --git a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts index a3351b9776..57e01ef2c4 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts @@ -277,6 +277,7 @@ export class WorkspaceResolver { id: workspace.id, workspaceId: workspace.id, blob: buffer, + updatedAt: new Date(), }, }); } diff --git a/packages/backend/server/tests/doc.spec.ts b/packages/backend/server/tests/doc.spec.ts index b8b20a0857..e97cab118a 100644 --- a/packages/backend/server/tests/doc.spec.ts +++ b/packages/backend/server/tests/doc.spec.ts @@ -4,12 +4,7 @@ import { TestingModule } from '@nestjs/testing'; import { PrismaClient } from '@prisma/client'; import test from 'ava'; import * as Sinon from 'sinon'; -import { - applyUpdate, - decodeStateVector, - Doc as YDoc, - encodeStateAsUpdate, -} from 'yjs'; +import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs'; import { DocManager, DocModule } from '../src/core/doc'; import { QuotaModule } from '../src/core/quota'; @@ -277,72 +272,120 @@ test('should throw if meet max retry times', async t => { t.is(stub.callCount, 5); }); -test('should not update snapshot if state is outdated', async t => { - const db = m.get(PrismaClient); +test('should be able to insert the snapshot if it is new created', async t => { const manager = m.get(DocManager); - await db.snapshot.create({ - data: { - id: '2', - workspaceId: '2', - blob: Buffer.from([0, 0]), - seq: 1, - }, - }); const doc = new YDoc(); const text = doc.getText('content'); - const updates: Buffer[] = []; - - doc.on('update', update => { - updates.push(Buffer.from(update)); - }); - text.insert(0, 'hello'); - text.insert(5, 'world'); - text.insert(5, ' '); + const update = encodeStateAsUpdate(doc); - await Promise.all(updates.map(update => manager.push('2', '2', update))); + await manager.push('1', '1', Buffer.from(update)); - const updateWith3Records = await manager.getUpdates('2', '2'); - text.insert(11, '!'); - await manager.push('2', '2', updates[3]); - const updateWith4Records = await manager.getUpdates('2', '2'); - - // Simulation: - // Node A get 3 updates and squash them at time 1, will finish at time 10 - // Node B get 4 updates and squash them at time 3, will finish at time 8 - // Node B finish the squash first, and update the snapshot - // Node A finish the squash later, and update the snapshot to an outdated state - // Time: ----------------------> - // A: ^get ^upsert - // B: ^get ^upsert - // - // We should avoid such situation + const updates = await manager.getUpdates('1', '1'); + t.is(updates.length, 1); // @ts-expect-error private - await manager.squash(updateWith4Records, null); - // @ts-expect-error private - await manager.squash(updateWith3Records, null); + const snapshot = await manager.squash(null, updates); - const result = await db.snapshot.findUnique({ + t.truthy(snapshot); + t.is(snapshot.getText('content').toString(), 'hello'); + + const restUpdates = await manager.getUpdates('1', '1'); + + t.is(restUpdates.length, 0); +}); + +test('should be able to merge updates into snapshot', async t => { + const manager = m.get(DocManager); + + const updates: Buffer[] = []; + { + const doc = new YDoc(); + doc.on('update', data => { + updates.push(Buffer.from(data)); + }); + + const text = doc.getText('content'); + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + text.insert(11, '!'); + } + + { + await manager.batchPush('1', '1', updates.slice(0, 2)); + // do the merge + const doc = (await manager.get('1', '1'))!; + + t.is(doc.getText('content').toString(), 'helloworld'); + } + + { + await manager.batchPush('1', '1', updates.slice(2)); + const doc = (await manager.get('1', '1'))!; + + t.is(doc.getText('content').toString(), 'hello world!'); + } + + const restUpdates = await manager.getUpdates('1', '1'); + + t.is(restUpdates.length, 0); +}); + +test('should not update snapshot if doc is outdated', async t => { + const manager = m.get(DocManager); + const db = m.get(PrismaClient); + + const updates: Buffer[] = []; + { + const doc = new YDoc(); + doc.on('update', data => { + updates.push(Buffer.from(data)); + }); + + const text = doc.getText('content'); + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + text.insert(11, '!'); + } + + await manager.batchPush('2', '1', updates.slice(0, 2)); // 'helloworld' + // merge updates into snapshot + await manager.get('2', '1'); + // fake the snapshot is a lot newer + await db.snapshot.update({ where: { id_workspaceId: { - id: '2', workspaceId: '2', + id: '1', }, }, + data: { + updatedAt: new Date(Date.now() + 10000), + }, }); - if (!result) { - t.fail('snapshot not found'); - return; + { + const snapshot = await manager.getSnapshot('2', '1'); + await manager.batchPush('2', '1', updates.slice(2)); // 'hello world!' + const updateRecords = await manager.getUpdates('2', '1'); + + // @ts-expect-error private + const doc = await manager.squash(snapshot, updateRecords); + + // all updated will merged into doc not matter it's timestamp is outdated or not, + // but the snapshot record will not be updated + t.is(doc.getText('content').toString(), 'hello world!'); } - const state = decodeStateVector(result.state!); - t.is(state.get(doc.clientID), 12); + { + const doc = new YDoc(); + applyUpdate(doc, (await manager.getSnapshot('2', '1'))!.blob); + // the snapshot will not get touched if the new doc's timestamp is outdated + t.is(doc.getText('content').toString(), 'helloworld'); - const d = new YDoc(); - applyUpdate(d, result.blob!); - - const dtext = d.getText('content'); - t.is(dtext.toString(), 'hello world!'); + // the updates are known as outdated, so they will be deleted + t.is((await manager.getUpdates('2', '1')).length, 0); + } });