diff --git a/packages/backend/server/src/__tests__/doc/userspace.spec.ts b/packages/backend/server/src/__tests__/doc/userspace.spec.ts index 2ee691ba21..14f97d74d6 100644 --- a/packages/backend/server/src/__tests__/doc/userspace.spec.ts +++ b/packages/backend/server/src/__tests__/doc/userspace.spec.ts @@ -1,5 +1,145 @@ -import test from 'ava'; +import { randomUUID } from 'node:crypto'; -test('should test through userspace', t => { - t.pass(); +import ava, { TestFn } from 'ava'; +import { applyUpdate, Doc as YDoc } from 'yjs'; + +import { ConfigModule } from '../../base/config'; +import { + DocStorageModule, + PgUserspaceDocStorageAdapter as Adapter, +} from '../../core/doc'; +import { Models, type User } from '../../models'; +import { createTestingModule, type TestingModule } from '../utils'; + +interface Context { + module: TestingModule; + models: Models; + adapter: Adapter; +} + +const test = ava as TestFn; + +test.before(async t => { + const module = await createTestingModule({ + imports: [ + ConfigModule.forRoot({ + doc: { + manager: { + enableUpdateAutoMerging: false, + }, + }, + }), + DocStorageModule, + ], + }); + + t.context.models = module.get(Models); + t.context.adapter = module.get(Adapter); + t.context.module = module; +}); + +let user: User; + +test.beforeEach(async t => { + await t.context.module.initTestingDB(); + user = await t.context.models.user.create({ + email: 'test@affine.pro', + }); +}); + +test.after(async t => { + await t.context.module.close(); +}); + +test('should push user doc updates work', async t => { + const docId = randomUUID(); + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Uint8Array[] = []; + + doc.on('update', update => { + updates.push(update); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + let timestamp = await t.context.adapter.pushDocUpdates( + user.id, + docId, + updates + ); + t.truthy(timestamp); + + let record = await t.context.adapter.getDoc(user.id, docId); + const newDoc = new YDoc(); + applyUpdate(newDoc, record!.bin); + + t.is(newDoc.getText('content').toString(), 'hello world'); + // find all timestamps + const timestamps = await t.context.adapter.getSpaceDocTimestamps(user.id); + t.deepEqual(timestamps, { + [docId]: timestamp, + }); +}); + +test('should delete user doc work', async t => { + const docId = randomUUID(); + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Uint8Array[] = []; + + doc.on('update', update => { + updates.push(update); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + let timestamp = await t.context.adapter.pushDocUpdates( + user.id, + docId, + updates + ); + t.truthy(timestamp); + + let record = await t.context.adapter.getDoc(user.id, docId); + t.truthy(record); + + await t.context.adapter.deleteDoc(user.id, docId); + + record = await t.context.adapter.getDoc(user.id, docId); + t.falsy(record); +}); + +test('should delete all user docs work', async t => { + const docId = randomUUID(); + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Uint8Array[] = []; + + doc.on('update', update => { + updates.push(update); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + let timestamp = await t.context.adapter.pushDocUpdates( + user.id, + docId, + updates + ); + t.truthy(timestamp); + + let record = await t.context.adapter.getDoc(user.id, docId); + t.truthy(record); + + await t.context.adapter.deleteSpace(user.id); + + record = await t.context.adapter.getDoc(user.id, docId); + t.falsy(record); }); diff --git a/packages/backend/server/src/core/doc/adapters/userspace.ts b/packages/backend/server/src/core/doc/adapters/userspace.ts index d909e09b26..6853d05f93 100644 --- a/packages/backend/server/src/core/doc/adapters/userspace.ts +++ b/packages/backend/server/src/core/doc/adapters/userspace.ts @@ -1,15 +1,15 @@ import { Injectable } from '@nestjs/common'; -import { PrismaClient } from '@prisma/client'; import { Mutex } from '../../../base'; +import { Models } from '../../../models'; import { DocStorageOptions } from '../options'; import { DocRecord, DocStorageAdapter } from '../storage'; @Injectable() export class PgUserspaceDocStorageAdapter extends DocStorageAdapter { constructor( - private readonly db: PrismaClient, private readonly mutex: Mutex, + private readonly models: Models, options: DocStorageOptions ) { super(options); @@ -42,7 +42,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter { } override async getDoc(spaceId: string, docId: string) { - return this.getDocSnapshot(spaceId, docId); + return await this.getDocSnapshot(spaceId, docId); } async pushDocUpdates( @@ -79,103 +79,46 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter { } async deleteDoc(userId: string, docId: string) { - await this.db.userSnapshot.deleteMany({ - where: { - userId, - id: docId, - }, - }); + await this.models.userDoc.delete(userId, docId); } async deleteSpace(userId: string) { - await this.db.userSnapshot.deleteMany({ - where: { - userId, - }, - }); + await this.models.userDoc.deleteAllByUserId(userId); } async getSpaceDocTimestamps(userId: string, after?: number) { - const snapshots = await this.db.userSnapshot.findMany({ - select: { - id: true, - updatedAt: true, - }, - where: { - userId, - ...(after - ? { - updatedAt: { - gt: new Date(after), - }, - } - : {}), - }, - }); - - const result: Record = {}; - - snapshots.forEach(s => { - result[s.id] = s.updatedAt.getTime(); - }); - - return result; + return await this.models.userDoc.findTimestampsByUserId(userId, after); } protected async getDocSnapshot(userId: string, docId: string) { - const snapshot = await this.db.userSnapshot.findUnique({ - where: { - userId_id: { - userId, - id: docId, - }, - }, - }); + const snapshot = await this.models.userDoc.get(userId, docId); if (!snapshot) { return null; } return { - spaceId: userId, - docId, + spaceId: snapshot.spaceId, + docId: snapshot.docId, bin: snapshot.blob, - timestamp: snapshot.updatedAt.getTime(), - editor: snapshot.userId, + timestamp: snapshot.timestamp, + editor: snapshot.editorId, }; } protected async setDocSnapshot(snapshot: DocRecord) { // we always get lock before writing to user snapshot table, // so a simple upsert without testing on updatedAt is safe - await this.db.userSnapshot.upsert({ - where: { - userId_id: { - userId: snapshot.spaceId, - id: snapshot.docId, - }, - }, - update: { - blob: Buffer.from(snapshot.bin), - updatedAt: new Date(snapshot.timestamp), - }, - create: { - userId: snapshot.spaceId, - id: snapshot.docId, - blob: Buffer.from(snapshot.bin), - createdAt: new Date(snapshot.timestamp), - updatedAt: new Date(snapshot.timestamp), - }, + await this.models.userDoc.upsert({ + ...snapshot, + blob: Buffer.from(snapshot.bin), }); return true; } - protected override async lockDocForUpdate( - workspaceId: string, - docId: string - ) { - const lock = await this.mutex.acquire(`userspace:${workspaceId}:${docId}`); + protected override async lockDocForUpdate(spaceId: string, docId: string) { + const lock = await this.mutex.acquire(`userspace:${spaceId}:${docId}`); if (!lock) { throw new Error('Too many concurrent writings');