test(server): space adapters (#7903)

This commit is contained in:
forehalo
2024-08-21 05:30:30 +00:00
parent e20bdbf925
commit b6f46e01c5
4 changed files with 417 additions and 0 deletions

View File

@@ -0,0 +1,103 @@
import { mock } from 'node:test';
import { TestingModule } from '@nestjs/testing';
import { PrismaClient } from '@prisma/client';
import test from 'ava';
import * as Sinon from 'sinon';
import { DocStorageModule } from '../../src/core/doc';
import { DocStorageCronJob } from '../../src/core/doc/job';
import { Config } from '../../src/fundamentals/config';
import { createTestingModule } from '../utils';
let m: TestingModule;
let timer: Sinon.SinonFakeTimers;
let db: PrismaClient;
// cleanup database before each test
test.before(async () => {
timer = Sinon.useFakeTimers({
toFake: ['setInterval'],
});
m = await createTestingModule({
imports: [DocStorageModule],
});
db = m.get(PrismaClient);
});
test.after.always(async () => {
await m.close();
timer.restore();
});
test('should poll when intervel due', async t => {
const manager = m.get(DocStorageCronJob);
const interval = m.get(Config).doc.manager.updatePollInterval;
let resolve: any;
const fake = mock.method(manager, 'autoMergePendingDocUpdates', () => {
return new Promise(_resolve => {
resolve = _resolve;
});
});
timer.tick(interval);
t.is(fake.mock.callCount(), 1);
// busy
timer.tick(interval);
// @ts-expect-error private member
t.is(manager.busy, true);
t.is(fake.mock.callCount(), 1);
resolve();
await timer.tickAsync(1);
// @ts-expect-error private member
t.is(manager.busy, false);
timer.tick(interval);
t.is(fake.mock.callCount(), 2);
});
test('should be able to cleanup expired history', async t => {
const timestamp = Date.now();
// insert expired data
await db.snapshotHistory.createMany({
data: Array.from({ length: 10 })
.fill(0)
.map((_, i) => ({
workspaceId: '1',
id: '1',
blob: Buffer.from([1, 1]),
timestamp: new Date(timestamp - 10 - i),
expiredAt: new Date(timestamp - 1),
})),
});
// insert available data
await db.snapshotHistory.createMany({
data: Array.from({ length: 10 })
.fill(0)
.map((_, i) => ({
workspaceId: '1',
id: '1',
blob: Buffer.from([1, 1]),
timestamp: new Date(timestamp + i),
expiredAt: new Date(timestamp + 1000),
})),
});
let count = await db.snapshotHistory.count();
t.is(count, 20);
await m.get(DocStorageCronJob).cleanupExpiredHistory();
count = await db.snapshotHistory.count();
t.is(count, 10);
const example = await db.snapshotHistory.findFirst();
t.truthy(example);
t.true(example!.expiredAt > new Date());
});

View File

@@ -0,0 +1,5 @@
import test from 'ava';
test('should test through userspace', t => {
t.pass();
});

View File

@@ -0,0 +1,304 @@
import { TestingModule } from '@nestjs/testing';
import { PrismaClient } from '@prisma/client';
import test from 'ava';
import * as Sinon from 'sinon';
import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import {
DocStorageModule,
PgWorkspaceDocStorageAdapter as Adapter,
} from '../../src/core/doc';
import { ConfigModule } from '../../src/fundamentals/config';
import { createTestingModule, initTestingDB } from '../utils';
let m: TestingModule;
let db: PrismaClient;
let adapter: Adapter;
test.before('init testing module', async () => {
m = await createTestingModule({
imports: [
ConfigModule.forRoot({
doc: {
manager: {
enableUpdateAutoMerging: false,
},
},
}),
DocStorageModule,
],
});
db = m.get(PrismaClient);
adapter = m.get(Adapter);
// @ts-expect-error private method
Sinon.stub(adapter, 'createDocHistory');
});
test.beforeEach(async () => {
await initTestingDB(db);
});
test.after.always(async () => {
await m?.close();
});
/**
* @deprecated `seq` would be removed
*/
test('should have sequential update number', async t => {
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await adapter.pushDocUpdates('2', '2', updates);
// [1,2,3]
let records = await db.update.findMany({
where: {
workspaceId: '2',
id: '2',
},
});
t.deepEqual(
records.map(({ seq }) => seq),
[1, 2, 3]
);
// merge
await adapter.getDoc('2', '2');
// fake the seq num is about to overflow
await db.snapshot.update({
where: {
id_workspaceId: {
id: '2',
workspaceId: '2',
},
},
data: {
seq: 0x3ffffffe,
},
});
await adapter.pushDocUpdates('2', '2', updates);
records = await db.update.findMany({
where: {
workspaceId: '2',
id: '2',
},
});
t.deepEqual(
records.map(({ seq }) => seq),
[0x3ffffffe + 1, 0x3ffffffe + 2, 0x3ffffffe + 3]
);
// push a new update with new seq num
await adapter.pushDocUpdates('2', '2', updates.slice(0, 1));
// let the manager ignore update with the new seq num
// @ts-expect-error private method
const stub = Sinon.stub(adapter, 'getDocUpdates').resolves(
records.map(record => ({
bin: record.blob,
timestamp: record.createdAt.getTime(),
}))
);
await adapter.getDoc('2', '2');
stub.restore();
// should not merge in one run
t.not(await db.update.count(), 0);
});
test('should retry if failed to insert updates', async t => {
const stub = Sinon.stub();
const createMany = db.update.createMany;
db.update.createMany = stub;
stub.onCall(0).rejects(new Error());
stub.onCall(1).resolves();
await t.notThrowsAsync(() =>
adapter.pushDocUpdates('1', '1', [Buffer.from([0, 0])])
);
t.is(stub.callCount, 2);
stub.reset();
db.update.createMany = createMany;
});
test('should throw if meet max retry times', async t => {
const stub = Sinon.stub();
const createMany = db.update.createMany;
db.update.createMany = stub;
stub.rejects(new Error());
await t.throwsAsync(
() => adapter.pushDocUpdates('1', '1', [Buffer.from([0, 0])]),
{ message: 'Failed to store doc updates.' }
);
t.is(stub.callCount, 4);
stub.reset();
db.update.createMany = createMany;
});
test('should be able to merge updates as snapshot', async t => {
const doc = new YDoc();
const text = doc.getText('content');
text.insert(0, 'hello');
const update = encodeStateAsUpdate(doc);
await db.workspace.create({
data: {
id: '1',
public: false,
},
});
await db.update.createMany({
data: [
{
id: '1',
workspaceId: '1',
blob: Buffer.from(update),
seq: 1,
createdAt: new Date(Date.now() + 1),
},
],
});
t.deepEqual(
Buffer.from((await adapter.getDoc('1', '1'))!.bin),
Buffer.from(update)
);
let appendUpdate = Buffer.from([]);
doc.on('update', update => {
appendUpdate = Buffer.from(update);
});
text.insert(5, 'world');
await db.update.create({
data: {
workspaceId: '1',
id: '1',
blob: appendUpdate,
seq: 2,
createdAt: new Date(),
},
});
{
const { bin } = (await adapter.getDoc('1', '1'))!;
const dbDoc = new YDoc();
applyUpdate(dbDoc, bin);
t.is(dbDoc.getText('content').toString(), 'helloworld');
t.deepEqual(encodeStateAsUpdate(dbDoc), encodeStateAsUpdate(doc));
}
});
test('should be able to merge updates into snapshot', async t => {
const updates: Buffer[] = [];
{
const doc = new YDoc();
doc.on('update', data => {
updates.push(Buffer.from(data));
});
const text = doc.getText('content');
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
text.insert(11, '!');
}
{
await adapter.pushDocUpdates('1', '1', updates.slice(0, 2));
// merge
const { bin } = (await adapter.getDoc('1', '1'))!;
const doc = new YDoc();
applyUpdate(doc, bin);
t.is(doc.getText('content').toString(), 'helloworld');
}
{
await adapter.pushDocUpdates('1', '1', updates.slice(2));
// merge
const { bin } = (await adapter.getDoc('1', '1'))!;
const doc = new YDoc();
applyUpdate(doc, bin);
t.is(doc.getText('content').toString(), 'hello world!');
}
t.is(await db.update.count(), 0);
});
test('should not update snapshot if doc is outdated', async t => {
const updates: Buffer[] = [];
{
const doc = new YDoc();
doc.on('update', data => {
updates.push(Buffer.from(data));
});
const text = doc.getText('content');
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
text.insert(11, '!');
}
await adapter.pushDocUpdates('2', '1', updates.slice(0, 2)); // 'helloworld'
// merge
await adapter.getDoc('2', '1');
// fake the snapshot is a lot newer
await db.snapshot.update({
where: {
id_workspaceId: {
workspaceId: '2',
id: '1',
},
},
data: {
updatedAt: new Date(Date.now() + 10000),
},
});
{
await adapter.pushDocUpdates('2', '1', updates.slice(2)); // 'hello world!'
const { bin } = (await adapter.getDoc('2', '1'))!;
// all updated will merged into doc not matter it's timestamp is outdated or not,
// but the snapshot record will not be updated
const doc = new YDoc();
applyUpdate(doc, bin);
t.is(doc.getText('content').toString(), 'hello world!');
}
{
const doc = new YDoc();
applyUpdate(doc, (await adapter.getDoc('2', '1'))!.bin);
// the snapshot will not get touched if the new doc's timestamp is outdated
t.is(doc.getText('content').toString(), 'helloworld');
// the updates are known as outdated, so they will be deleted
t.is(await db.update.count(), 0);
}
});

View File

@@ -0,0 +1,5 @@
import test from 'ava';
test('should test through sync gateway', t => {
t.pass();
});