diff --git a/apps/server/migrations/20231009081826_updates_manager/migration.sql b/apps/server/migrations/20231009081826_updates_manager/migration.sql new file mode 100644 index 0000000000..8f24895010 --- /dev/null +++ b/apps/server/migrations/20231009081826_updates_manager/migration.sql @@ -0,0 +1,19 @@ +/* + Warnings: + + - A unique constraint covering the columns `[workspace_id,guid,seq]` on the table `updates` will be added. If there are existing duplicate values, this will fail. + - Added the required column `seq` to the `updates` table without a default value. This is not possible if the table is not empty. + +*/ +-- DropIndex +DROP INDEX "updates_guid_workspace_id_idx"; + +-- AlterTable +ALTER TABLE "snapshots" ADD COLUMN "seq" INTEGER NOT NULL DEFAULT 0, +ADD COLUMN "state" BYTEA; + +-- AlterTable +ALTER TABLE "updates" ADD COLUMN "seq" INTEGER NOT NULL; + +-- CreateIndex +CREATE UNIQUE INDEX "updates_workspace_id_guid_seq_key" ON "updates"("workspace_id", "guid", "seq"); diff --git a/apps/server/schema.prisma b/apps/server/schema.prisma index 93b481b081..be3aab7294 100644 --- a/apps/server/schema.prisma +++ b/apps/server/schema.prisma @@ -134,6 +134,8 @@ model Snapshot { id String @default(uuid()) @map("guid") @db.VarChar workspaceId String @map("workspace_id") @db.VarChar blob Bytes @db.ByteA + seq Int @default(0) @db.Integer + state Bytes? @db.ByteA createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6) updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(6) @@ -144,12 +146,13 @@ model Snapshot { // backup during other update operation queue downtime model Update { objectId String @id @default(uuid()) @map("object_id") @db.VarChar - id String @map("guid") @db.VarChar workspaceId String @map("workspace_id") @db.VarChar + id String @map("guid") @db.VarChar + seq Int @db.Integer blob Bytes @db.ByteA createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6) - @@index([id, workspaceId]) + @@unique([workspaceId, id, seq]) @@map("updates") } diff --git a/apps/server/src/modules/doc/manager.ts b/apps/server/src/modules/doc/manager.ts index 928359d8eb..2d0784f2b5 100644 --- a/apps/server/src/modules/doc/manager.ts +++ b/apps/server/src/modules/doc/manager.ts @@ -5,7 +5,9 @@ import { OnModuleDestroy, OnModuleInit, } from '@nestjs/common'; -import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs'; +import { Snapshot, Update } from '@prisma/client'; +import { defer, retry } from 'rxjs'; +import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'; import { Config } from '../../config'; import { Metrics } from '../../metrics/metrics'; @@ -29,6 +31,8 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean { return compare(yBinary, yBinary2, true); } +const MAX_SEQ_NUM = 0x3fffffff; // u31 + /** * Since we can't directly save all client updates into database, in which way the database will overload, * we need to buffer the updates and merge them to reduce db write. @@ -36,13 +40,12 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean { * And also, if a new client join, it would be nice to see the latest doc asap, * so we need to at least store a snapshot of the doc and return quickly, * along side all the updates that have not been applies to that snapshot(timestamp). - * - * @see [RedisUpdateManager](./redis-manager.ts) - redis backed manager */ @Injectable() export class DocManager implements OnModuleInit, OnModuleDestroy { protected logger = new Logger(DocManager.name); private job: NodeJS.Timeout | null = null; + private seqMap = new Map(); private busy = false; constructor( @@ -84,17 +87,14 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { return doc; } - protected yjsMergeUpdates(...updates: Buffer[]): Buffer { + protected applyUpdates(guid: string, ...updates: Buffer[]): Doc { const doc = this.recoverDoc(...updates); - - return Buffer.from(encodeStateAsUpdate(doc)); - } - - protected mergeUpdates(guid: string, ...updates: Buffer[]): Buffer { - const yjsResult = this.yjsMergeUpdates(...updates); this.metrics.jwstCodecMerge(1, {}); - let log = false; + + // test jwst codec if (this.config.doc.manager.experimentalMergeWithJwstCodec) { + const yjsResult = Buffer.from(encodeStateAsUpdate(doc)); + let log = false; try { const jwstResult = jwstMergeUpdates(updates); if (!compare(yjsResult, jwstResult)) { @@ -121,7 +121,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { } } - return yjsResult; + return doc; } /** @@ -131,7 +131,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { this.job = setInterval(() => { if (!this.busy) { this.busy = true; - this.apply() + this.autoSquash() .catch(() => { /* we handle all errors in work itself */ }) @@ -161,185 +161,146 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { } /** - * add update to manager for later processing like fast merging. + * add update to manager for later processing. */ async push(workspaceId: string, guid: string, update: Buffer) { - await this.db.update.create({ - data: { - workspaceId, - id: guid, - blob: update, - }, + await new Promise((resolve, reject) => { + defer(async () => { + const seq = await this.getUpdateSeq(workspaceId, guid); + await this.db.update.create({ + data: { + workspaceId, + id: guid, + seq, + blob: update, + }, + }); + }) + .pipe(retry(MAX_SEQ_NUM)) // retry until seq num not conflict + .subscribe({ + next: () => { + this.logger.verbose( + `pushed update for workspace: ${workspaceId}, guid: ${guid}` + ); + resolve(); + }, + error: reject, + }); }); + } - this.logger.verbose( - `pushed update for workspace: ${workspaceId}, guid: ${guid}` - ); + /** + * get the latest doc with all update applied. + */ + async get(workspaceId: string, guid: string): Promise { + const result = await this._get(workspaceId, guid); + if (result) { + if ('doc' in result) { + return result.doc; + } else if ('snapshot' in result) { + return this.recoverDoc(result.snapshot); + } + } + + return null; + } + + /** + * get the latest doc binary with all update applied. + */ + async getBinary(workspaceId: string, guid: string): Promise { + const result = await this._get(workspaceId, guid); + if (result) { + if ('doc' in result) { + return Buffer.from(encodeStateAsUpdate(result.doc)); + } else if ('snapshot' in result) { + return result.snapshot; + } + } + + return null; + } + + /** + * get the latest doc state vector with all update applied. + */ + async getState(workspaceId: string, guid: string): Promise { + const snapshot = await this.getSnapshot(workspaceId, guid); + const updates = await this.getUpdates(workspaceId, guid); + + if (updates.length) { + const doc = await this.squash(updates, snapshot); + return Buffer.from(encodeStateVector(doc)); + } + + return snapshot ? snapshot.state : null; } /** * get the snapshot of the doc we've seen. */ - async getSnapshot( - workspaceId: string, - guid: string - ): Promise { - const snapshot = await this.db.snapshot.findFirst({ + protected async getSnapshot(workspaceId: string, guid: string) { + return this.db.snapshot.findUnique({ where: { - workspaceId, - id: guid, + id_workspaceId: { + workspaceId, + id: guid, + }, }, }); - - return snapshot?.blob; } /** * get pending updates */ - async getUpdates(workspaceId: string, guid: string): Promise { - const updates = await this.db.update.findMany({ + protected async getUpdates(workspaceId: string, guid: string) { + return this.db.update.findMany({ where: { workspaceId, id: guid, }, + orderBy: { + seq: 'asc', + }, }); - - return updates.map(update => update.blob); - } - - /** - * get the latest doc with all update applied. - * - * latest = snapshot + updates - */ - async getLatest(workspaceId: string, guid: string): Promise { - const snapshot = await this.getSnapshot(workspaceId, guid); - const updates = await this.getUpdates(workspaceId, guid); - - if (updates.length) { - if (snapshot) { - return this.recoverDoc(snapshot, ...updates); - } else { - return this.recoverDoc(...updates); - } - } - - if (snapshot) { - return this.recoverDoc(snapshot); - } - - return undefined; - } - - /** - * get the latest doc and convert it to update binary - */ - async getLatestUpdate( - workspaceId: string, - guid: string - ): Promise { - const doc = await this.getLatest(workspaceId, guid); - - return doc ? Buffer.from(encodeStateAsUpdate(doc)) : undefined; } /** * apply pending updates to snapshot */ - async apply() { - const updates = await this.db - .$transaction(async db => { - // find the first update and batch process updates with same id - const first = await db.update.findFirst({ - orderBy: { - createdAt: 'asc', - }, - }); + protected async autoSquash() { + // find the first update and batch process updates with same id + const first = await this.db.update.findFirst({ + orderBy: { + createdAt: 'asc', + }, + }); - // no pending updates - if (!first) { - return; - } - - const { id, workspaceId } = first; - const updates = await db.update.findMany({ - where: { - id, - workspaceId, - }, - }); - - // no pending updates - if (!updates.length) { - return; - } - - // remove update that will be merged later - await db.update.deleteMany({ - where: { - id, - workspaceId, - }, - }); - - return updates; - }) - .catch( - // transaction failed, it's safe to ignore - e => { - this.logger.error(`Failed to fetch updates: ${e}`); - } - ); - - // we put update merging logic outside transaction will make the processing more complex, - // but it's better to do so, since the merging may takes a lot of time, - // which may slow down the whole db. - if (!updates?.length) { + // no pending updates + if (!first) { return; } - const { id, workspaceId } = updates[0]; - - this.logger.verbose( - `applying ${updates.length} updates for workspace: ${workspaceId}, guid: ${id}` - ); + const { id, workspaceId } = first; try { - const snapshot = await this.db.snapshot.findFirst({ - where: { - workspaceId, - id, - }, - }); - - // merge updates - const merged = snapshot - ? this.mergeUpdates(id, snapshot.blob, ...updates.map(u => u.blob)) - : this.mergeUpdates(id, ...updates.map(u => u.blob)); - - // save snapshot - await this.upsert(workspaceId, id, merged); + await this._get(workspaceId, id); } catch (e) { - // failed to merge updates, put them back - this.logger.error(`Failed to merge updates: ${e}`); - - await this.db.update - .createMany({ - data: updates.map(u => ({ - id: u.id, - workspaceId: u.workspaceId, - blob: u.blob, - })), - }) - .catch(e => { - // failed to recover, fallback TBD - this.logger.error(`Fetal: failed to put updates back to db: ${e}`); - }); + this.logger.error( + `Failed to apply updates for workspace: ${workspaceId}, guid: ${id}` + ); + this.logger.error(e); } } - protected async upsert(workspaceId: string, guid: string, blob: Buffer) { + protected async upsert( + workspaceId: string, + guid: string, + doc: Doc, + seq?: number + ) { + const blob = Buffer.from(encodeStateAsUpdate(doc)); + const state = Buffer.from(encodeStateVector(doc)); return this.db.snapshot.upsert({ where: { id_workspaceId: { @@ -351,10 +312,103 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { id: guid, workspaceId, blob, + state, + seq, }, update: { blob, + state, }, }); } + + protected async _get( + workspaceId: string, + guid: string + ): Promise<{ doc: Doc } | { snapshot: Buffer } | null> { + const snapshot = await this.getSnapshot(workspaceId, guid); + const updates = await this.getUpdates(workspaceId, guid); + + if (updates.length) { + return { + doc: await this.squash(updates, snapshot), + }; + } + + return snapshot ? { snapshot: snapshot.blob } : null; + } + + /** + * Squash updates into a single update and save it as snapshot, + * and delete the updates records at the same time. + */ + protected async squash(updates: Update[], snapshot: Snapshot | null) { + if (!updates.length) { + throw new Error('No updates to squash'); + } + const first = updates[0]; + const last = updates[updates.length - 1]; + + const doc = this.applyUpdates( + first.id, + snapshot ? snapshot.blob : Buffer.from([0, 0]), + ...updates.map(u => u.blob) + ); + + const { id, workspaceId } = first; + + await this.upsert(workspaceId, id, doc, last.seq); + await this.db.update.deleteMany({ + where: { + id, + workspaceId, + seq: { + in: updates.map(u => u.seq), + }, + }, + }); + return doc; + } + + private async getUpdateSeq(workspaceId: string, guid: string) { + try { + const { seq } = await this.db.snapshot.update({ + select: { + seq: true, + }, + where: { + id_workspaceId: { + workspaceId, + id: guid, + }, + }, + data: { + seq: { + increment: 1, + }, + }, + }); + + // reset + if (seq === MAX_SEQ_NUM) { + await this.db.snapshot.update({ + where: { + id_workspaceId: { + workspaceId, + id: guid, + }, + }, + data: { + seq: 0, + }, + }); + } + + return seq; + } catch { + const last = this.seqMap.get(workspaceId + guid) ?? 0; + this.seqMap.set(workspaceId + guid, last + 1); + return last + 1; + } + } } diff --git a/apps/server/src/modules/doc/redis-manager.ts b/apps/server/src/modules/doc/redis-manager.ts index e8270cfa4f..f448b09a1a 100644 --- a/apps/server/src/modules/doc/redis-manager.ts +++ b/apps/server/src/modules/doc/redis-manager.ts @@ -23,6 +23,9 @@ const pushUpdateLua = ` redis.call('rpush', KEYS[2], ARGV[2]) `; +/** + * @deprecated unstable + */ @Injectable() export class RedisDocManager extends DocManager { private readonly redis: Redis; @@ -44,41 +47,15 @@ export class RedisDocManager extends DocManager { override onModuleInit(): void { if (this.automation) { - this.logger.log('Use Redis'); this.setup(); } } - override async push(workspaceId: string, guid: string, update: Buffer) { - try { - const key = `${workspaceId}:${guid}`; - - // @ts-expect-error custom command - this.redis.pushDocUpdate(pending, updates`${key}`, key, update); - - this.logger.verbose( - `pushed update for workspace: ${workspaceId}, guid: ${guid}` - ); - } catch (e) { - return await super.push(workspaceId, guid, update); - } - } - - override async getUpdates( - workspaceId: string, - guid: string - ): Promise { - try { - return this.redis.lrangeBuffer(updates`${workspaceId}:${guid}`, 0, -1); - } catch (e) { - return super.getUpdates(workspaceId, guid); - } - } - - override async apply(): Promise { + override async autoSquash(): Promise { // incase some update fallback to db - await super.apply(); + await super.autoSquash(); + // consume rest updates in redis queue const pendingDoc = await this.redis.spop(pending).catch(() => null); // safe if (!pendingDoc) { @@ -127,13 +104,12 @@ export class RedisDocManager extends DocManager { const snapshot = await this.getSnapshot(workspaceId, id); // merge - const blob = snapshot - ? this.mergeUpdates(id, snapshot, ...updates) - : this.mergeUpdates(id, ...updates); + const doc = snapshot + ? this.applyUpdates(id, snapshot.blob, ...updates) + : this.applyUpdates(id, ...updates); // update snapshot - - await this.upsert(workspaceId, id, blob); + await this.upsert(workspaceId, id, doc, snapshot?.seq); // delete merged updates await this.redis diff --git a/apps/server/src/modules/sync/events/events.gateway.ts b/apps/server/src/modules/sync/events/events.gateway.ts index 30f76972f4..334d474f61 100644 --- a/apps/server/src/modules/sync/events/events.gateway.ts +++ b/apps/server/src/modules/sync/events/events.gateway.ts @@ -138,7 +138,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { } const guid = trimGuid(message.workspaceId, message.guid); - const doc = await this.docManager.getLatest(message.workspaceId, guid); + const doc = await this.docManager.get(message.workspaceId, guid); if (!doc) { endTimer(); diff --git a/apps/server/src/modules/sync/events/events.module.ts b/apps/server/src/modules/sync/events/events.module.ts index f9a1c1bef0..cf2fbb9ff3 100644 --- a/apps/server/src/modules/sync/events/events.module.ts +++ b/apps/server/src/modules/sync/events/events.module.ts @@ -3,10 +3,9 @@ import { Module } from '@nestjs/common'; import { DocModule } from '../../doc'; import { PermissionService } from '../../workspaces/permission'; import { EventsGateway } from './events.gateway'; -import { WorkspaceService } from './workspace'; @Module({ imports: [DocModule.forFeature()], - providers: [EventsGateway, PermissionService, WorkspaceService], + providers: [EventsGateway, PermissionService], }) export class EventsModule {} diff --git a/apps/server/src/modules/sync/events/workspace.ts b/apps/server/src/modules/sync/events/workspace.ts deleted file mode 100644 index 669cc41b6c..0000000000 --- a/apps/server/src/modules/sync/events/workspace.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { Doc, encodeStateAsUpdate } from 'yjs'; - -import { DocManager } from '../../doc'; -import { assertExists } from '../utils'; - -@Injectable() -export class WorkspaceService { - constructor(private readonly docManager: DocManager) {} - - async getDocsFromWorkspaceId(workspaceId: string): Promise< - Array<{ - guid: string; - update: Buffer; - }> - > { - const docs: Array<{ - guid: string; - update: Buffer; - }> = []; - const queue: Array<[string, Doc]> = []; - // Workspace Doc's guid is the same as workspaceId. This is achieved by when creating a new workspace, the doc guid - // is manually set to workspaceId. - const doc = await this.docManager.getLatest(workspaceId, workspaceId); - if (doc) { - queue.push([workspaceId, doc]); - } - - while (queue.length > 0) { - const head = queue.pop(); - assertExists(head); - const [guid, doc] = head; - docs.push({ - guid: guid, - update: Buffer.from(encodeStateAsUpdate(doc)), - }); - - for (const { guid } of doc.subdocs) { - const subDoc = await this.docManager.getLatest(workspaceId, guid); - if (subDoc) { - queue.push([guid, subDoc]); - } - } - } - - return docs; - } -} diff --git a/apps/server/src/modules/workspaces/controller.ts b/apps/server/src/modules/workspaces/controller.ts index d453a02567..858af6fabd 100644 --- a/apps/server/src/modules/workspaces/controller.ts +++ b/apps/server/src/modules/workspaces/controller.ts @@ -72,7 +72,7 @@ export class WorkspacesController { throw new ForbiddenException('Permission denied'); } - const update = await this.docManager.getLatestUpdate(ws, id); + const update = await this.docManager.getBinary(ws, id); if (!update) { throw new NotFoundException('Doc not found'); diff --git a/apps/server/tests/doc.spec.ts b/apps/server/tests/doc.spec.ts index 71f1e4a580..7e09d54968 100644 --- a/apps/server/tests/doc.spec.ts +++ b/apps/server/tests/doc.spec.ts @@ -75,7 +75,8 @@ test('should poll when intervel due', async t => { const interval = m.get(Config).doc.manager.updatePollInterval; let resolve: any; - const fake = mock.method(manager, 'apply', () => { + // @ts-expect-error private method + const fake = mock.method(manager, 'autoSquash', () => { return new Promise(_resolve => { resolve = _resolve; }); @@ -121,19 +122,22 @@ test('should merge update when intervel due', async t => { id: '1', workspaceId: '1', blob: Buffer.from([0, 0]), + seq: 1, }, { id: '1', workspaceId: '1', blob: Buffer.from(update), + seq: 2, }, ], }); - await manager.apply(); + // @ts-expect-error private method + await manager.autoSquash(); t.deepEqual( - (await manager.getLatestUpdate(ws.id, '1'))?.toString('hex'), + (await manager.getBinary(ws.id, '1'))?.toString('hex'), Buffer.from(update.buffer).toString('hex') ); @@ -148,13 +152,95 @@ test('should merge update when intervel due', async t => { workspaceId: ws.id, id: '1', blob: appendUpdate, + seq: 3, }, }); - await manager.apply(); + // @ts-expect-error private method + await manager.autoSquash(); t.deepEqual( - (await manager.getLatestUpdate(ws.id, '1'))?.toString('hex'), + (await manager.getBinary(ws.id, '1'))?.toString('hex'), Buffer.from(encodeStateAsUpdate(doc)).toString('hex') ); }); + +test('should have sequential update number', async t => { + const db = m.get(PrismaService); + const manager = m.get(DocManager); + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Buffer[] = []; + + doc.on('update', update => { + updates.push(Buffer.from(update)); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + await Promise.all(updates.map(update => manager.push('2', '2', update))); + + // [1,2,3] + // @ts-expect-error private method + let records = await manager.getUpdates('2', '2'); + + t.deepEqual( + records.map(({ seq }) => seq), + [1, 2, 3] + ); + + // @ts-expect-error private method + await manager.autoSquash(); + + await db.snapshot.update({ + where: { + id_workspaceId: { + id: '2', + workspaceId: '2', + }, + }, + data: { + seq: 0x3ffffffe, + }, + }); + + await Promise.all(updates.map(update => manager.push('2', '2', update))); + + // @ts-expect-error private method + records = await manager.getUpdates('2', '2'); + + // push a new update with new seq num + await manager.push('2', '2', updates[0]); + + // let the manager ignore update with the new seq num + // @ts-expect-error private method + const stub = Sinon.stub(manager, 'getUpdates').resolves(records); + + // @ts-expect-error private method + await manager.autoSquash(); + stub.restore(); + + // @ts-expect-error private method + records = await manager.getUpdates('2', '2'); + + // should not merge in one run + t.not(records.length, 0); +}); + +test('should retry if seq num conflict', async t => { + const manager = m.get(DocManager); + + // @ts-expect-error private method + const stub = Sinon.stub(manager, 'getUpdateSeq'); + + stub.onCall(0).resolves(1); + // seq num conflict + stub.onCall(1).resolves(1); + stub.onCall(2).resolves(2); + await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0]))); + await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0]))); + + t.is(stub.callCount, 3); +});