From b6f46e01c54b41d21a850a2381a663bf0f80b53f Mon Sep 17 00:00:00 2001 From: forehalo Date: Wed, 21 Aug 2024 05:30:30 +0000 Subject: [PATCH] test(server): space adapters (#7903) --- .../backend/server/tests/doc/cron.spec.ts | 103 ++++++ .../server/tests/doc/userspace.spec.ts | 5 + .../server/tests/doc/workspace.spec.ts | 304 ++++++++++++++++++ .../backend/server/tests/sync/gateway.spec.ts | 5 + 4 files changed, 417 insertions(+) create mode 100644 packages/backend/server/tests/doc/cron.spec.ts create mode 100644 packages/backend/server/tests/doc/userspace.spec.ts create mode 100644 packages/backend/server/tests/doc/workspace.spec.ts create mode 100644 packages/backend/server/tests/sync/gateway.spec.ts diff --git a/packages/backend/server/tests/doc/cron.spec.ts b/packages/backend/server/tests/doc/cron.spec.ts new file mode 100644 index 0000000000..548ed2fbb1 --- /dev/null +++ b/packages/backend/server/tests/doc/cron.spec.ts @@ -0,0 +1,103 @@ +import { mock } from 'node:test'; + +import { TestingModule } from '@nestjs/testing'; +import { PrismaClient } from '@prisma/client'; +import test from 'ava'; +import * as Sinon from 'sinon'; + +import { DocStorageModule } from '../../src/core/doc'; +import { DocStorageCronJob } from '../../src/core/doc/job'; +import { Config } from '../../src/fundamentals/config'; +import { createTestingModule } from '../utils'; + +let m: TestingModule; +let timer: Sinon.SinonFakeTimers; +let db: PrismaClient; + +// set up fake timers and the testing module once before all tests +test.before(async () => { + timer = Sinon.useFakeTimers({ + toFake: ['setInterval'], + }); + m = await createTestingModule({ + imports: [DocStorageModule], + }); + + db = m.get(PrismaClient); +}); + +test.after.always(async () => { + await m.close(); + timer.restore(); +}); + +test('should poll when interval due', async t => { + const manager = m.get(DocStorageCronJob); + const interval = m.get(Config).doc.manager.updatePollInterval; + + let resolve: any; + const fake = mock.method(manager, 
'autoMergePendingDocUpdates', () => { + return new Promise(_resolve => { + resolve = _resolve; + }); + }); + + timer.tick(interval); + t.is(fake.mock.callCount(), 1); + + // busy + timer.tick(interval); + // @ts-expect-error private member + t.is(manager.busy, true); + t.is(fake.mock.callCount(), 1); + + resolve(); + await timer.tickAsync(1); + + // @ts-expect-error private member + t.is(manager.busy, false); + timer.tick(interval); + t.is(fake.mock.callCount(), 2); +}); + +test('should be able to cleanup expired history', async t => { + const timestamp = Date.now(); + + // insert expired data + await db.snapshotHistory.createMany({ + data: Array.from({ length: 10 }) + .fill(0) + .map((_, i) => ({ + workspaceId: '1', + id: '1', + blob: Buffer.from([1, 1]), + timestamp: new Date(timestamp - 10 - i), + expiredAt: new Date(timestamp - 1), + })), + }); + + // insert available data + await db.snapshotHistory.createMany({ + data: Array.from({ length: 10 }) + .fill(0) + .map((_, i) => ({ + workspaceId: '1', + id: '1', + blob: Buffer.from([1, 1]), + timestamp: new Date(timestamp + i), + expiredAt: new Date(timestamp + 1000), + })), + }); + + let count = await db.snapshotHistory.count(); + t.is(count, 20); + + await m.get(DocStorageCronJob).cleanupExpiredHistory(); + + count = await db.snapshotHistory.count(); + t.is(count, 10); + + const example = await db.snapshotHistory.findFirst(); + t.truthy(example); + t.true(example!.expiredAt > new Date()); +}); diff --git a/packages/backend/server/tests/doc/userspace.spec.ts b/packages/backend/server/tests/doc/userspace.spec.ts new file mode 100644 index 0000000000..2ee691ba21 --- /dev/null +++ b/packages/backend/server/tests/doc/userspace.spec.ts @@ -0,0 +1,5 @@ +import test from 'ava'; + +test('should test through userspace', t => { + t.pass(); +}); diff --git a/packages/backend/server/tests/doc/workspace.spec.ts b/packages/backend/server/tests/doc/workspace.spec.ts new file mode 100644 index 0000000000..34389f1ff5 --- /dev/null 
+++ b/packages/backend/server/tests/doc/workspace.spec.ts @@ -0,0 +1,304 @@ +import { TestingModule } from '@nestjs/testing'; +import { PrismaClient } from '@prisma/client'; +import test from 'ava'; +import * as Sinon from 'sinon'; +import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs'; + +import { + DocStorageModule, + PgWorkspaceDocStorageAdapter as Adapter, +} from '../../src/core/doc'; +import { ConfigModule } from '../../src/fundamentals/config'; +import { createTestingModule, initTestingDB } from '../utils'; + +let m: TestingModule; +let db: PrismaClient; +let adapter: Adapter; + +test.before('init testing module', async () => { + m = await createTestingModule({ + imports: [ + ConfigModule.forRoot({ + doc: { + manager: { + enableUpdateAutoMerging: false, + }, + }, + }), + DocStorageModule, + ], + }); + db = m.get(PrismaClient); + adapter = m.get(Adapter); + // @ts-expect-error private method + Sinon.stub(adapter, 'createDocHistory'); +}); + +test.beforeEach(async () => { + await initTestingDB(db); +}); + +test.after.always(async () => { + await m?.close(); +}); + +/** + * @deprecated `seq` would be removed + */ +test('should have sequential update number', async t => { + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Buffer[] = []; + + doc.on('update', update => { + updates.push(Buffer.from(update)); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + await adapter.pushDocUpdates('2', '2', updates); + + // [1,2,3] + let records = await db.update.findMany({ + where: { + workspaceId: '2', + id: '2', + }, + }); + + t.deepEqual( + records.map(({ seq }) => seq), + [1, 2, 3] + ); + + // merge + await adapter.getDoc('2', '2'); + + // fake the seq num is about to overflow + await db.snapshot.update({ + where: { + id_workspaceId: { + id: '2', + workspaceId: '2', + }, + }, + data: { + seq: 0x3ffffffe, + }, + }); + + await adapter.pushDocUpdates('2', '2', updates); + + records = await 
db.update.findMany({ + where: { + workspaceId: '2', + id: '2', + }, + }); + + t.deepEqual( + records.map(({ seq }) => seq), + [0x3ffffffe + 1, 0x3ffffffe + 2, 0x3ffffffe + 3] + ); + + // push a new update with new seq num + await adapter.pushDocUpdates('2', '2', updates.slice(0, 1)); + + // let the manager ignore update with the new seq num + // @ts-expect-error private method + const stub = Sinon.stub(adapter, 'getDocUpdates').resolves( + records.map(record => ({ + bin: record.blob, + timestamp: record.createdAt.getTime(), + })) + ); + + await adapter.getDoc('2', '2'); + stub.restore(); + + // should not merge in one run + t.not(await db.update.count(), 0); +}); + +test('should retry if failed to insert updates', async t => { + const stub = Sinon.stub(); + const createMany = db.update.createMany; + db.update.createMany = stub; + + stub.onCall(0).rejects(new Error()); + stub.onCall(1).resolves(); + + await t.notThrowsAsync(() => + adapter.pushDocUpdates('1', '1', [Buffer.from([0, 0])]) + ); + t.is(stub.callCount, 2); + + stub.reset(); + db.update.createMany = createMany; +}); + +test('should throw if meet max retry times', async t => { + const stub = Sinon.stub(); + const createMany = db.update.createMany; + db.update.createMany = stub; + + stub.rejects(new Error()); + + await t.throwsAsync( + () => adapter.pushDocUpdates('1', '1', [Buffer.from([0, 0])]), + { message: 'Failed to store doc updates.' 
} + ); + t.is(stub.callCount, 4); + + stub.reset(); + db.update.createMany = createMany; +}); + +test('should be able to merge updates as snapshot', async t => { + const doc = new YDoc(); + const text = doc.getText('content'); + text.insert(0, 'hello'); + const update = encodeStateAsUpdate(doc); + + await db.workspace.create({ + data: { + id: '1', + public: false, + }, + }); + + await db.update.createMany({ + data: [ + { + id: '1', + workspaceId: '1', + blob: Buffer.from(update), + seq: 1, + createdAt: new Date(Date.now() + 1), + }, + ], + }); + + t.deepEqual( + Buffer.from((await adapter.getDoc('1', '1'))!.bin), + Buffer.from(update) + ); + + let appendUpdate = Buffer.from([]); + doc.on('update', update => { + appendUpdate = Buffer.from(update); + }); + text.insert(5, 'world'); + + await db.update.create({ + data: { + workspaceId: '1', + id: '1', + blob: appendUpdate, + seq: 2, + createdAt: new Date(), + }, + }); + + { + const { bin } = (await adapter.getDoc('1', '1'))!; + const dbDoc = new YDoc(); + applyUpdate(dbDoc, bin); + + t.is(dbDoc.getText('content').toString(), 'helloworld'); + t.deepEqual(encodeStateAsUpdate(dbDoc), encodeStateAsUpdate(doc)); + } +}); + +test('should be able to merge updates into snapshot', async t => { + const updates: Buffer[] = []; + { + const doc = new YDoc(); + doc.on('update', data => { + updates.push(Buffer.from(data)); + }); + + const text = doc.getText('content'); + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + text.insert(11, '!'); + } + + { + await adapter.pushDocUpdates('1', '1', updates.slice(0, 2)); + // merge + const { bin } = (await adapter.getDoc('1', '1'))!; + const doc = new YDoc(); + applyUpdate(doc, bin); + + t.is(doc.getText('content').toString(), 'helloworld'); + } + + { + await adapter.pushDocUpdates('1', '1', updates.slice(2)); + // merge + const { bin } = (await adapter.getDoc('1', '1'))!; + const doc = new YDoc(); + applyUpdate(doc, bin); + + 
t.is(doc.getText('content').toString(), 'hello world!'); + } + + t.is(await db.update.count(), 0); +}); + +test('should not update snapshot if doc is outdated', async t => { + const updates: Buffer[] = []; + { + const doc = new YDoc(); + doc.on('update', data => { + updates.push(Buffer.from(data)); + }); + + const text = doc.getText('content'); + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + text.insert(11, '!'); + } + + await adapter.pushDocUpdates('2', '1', updates.slice(0, 2)); // 'helloworld' + // merge + await adapter.getDoc('2', '1'); + // fake the snapshot is a lot newer + await db.snapshot.update({ + where: { + id_workspaceId: { + workspaceId: '2', + id: '1', + }, + }, + data: { + updatedAt: new Date(Date.now() + 10000), + }, + }); + + { + await adapter.pushDocUpdates('2', '1', updates.slice(2)); // 'hello world!' + const { bin } = (await adapter.getDoc('2', '1'))!; + + // all updates will be merged into the doc no matter whether their timestamps are outdated, + // but the snapshot record will not be updated + const doc = new YDoc(); + applyUpdate(doc, bin); + t.is(doc.getText('content').toString(), 'hello world!'); + } + + { + const doc = new YDoc(); + applyUpdate(doc, (await adapter.getDoc('2', '1'))!.bin); + // the snapshot will not get touched if the new doc's timestamp is outdated + t.is(doc.getText('content').toString(), 'helloworld'); + + // the updates are known to be outdated, so they will be deleted + t.is(await db.update.count(), 0); + } +}); diff --git a/packages/backend/server/tests/sync/gateway.spec.ts b/packages/backend/server/tests/sync/gateway.spec.ts new file mode 100644 index 0000000000..51608689d7 --- /dev/null +++ b/packages/backend/server/tests/sync/gateway.spec.ts @@ -0,0 +1,5 @@ +import test from 'ava'; + +test('should test through sync gateway', t => { + t.pass(); +});