refactor(server): use userDoc model on PgUserspaceDocStorageAdapter (#9845)

close CLOUD-104
This commit is contained in:
fengmk2
2025-02-06 02:50:28 +00:00
parent 8e7cfb6115
commit a2acacea3b
2 changed files with 159 additions and 76 deletions

View File

@@ -1,5 +1,145 @@
import test from 'ava';
import { randomUUID } from 'node:crypto';
test('should test through userspace', t => {
t.pass();
import ava, { TestFn } from 'ava';
import { applyUpdate, Doc as YDoc } from 'yjs';
import { ConfigModule } from '../../base/config';
import {
DocStorageModule,
PgUserspaceDocStorageAdapter as Adapter,
} from '../../core/doc';
import { Models, type User } from '../../models';
import { createTestingModule, type TestingModule } from '../utils';
interface Context {
module: TestingModule;
models: Models;
adapter: Adapter;
}
const test = ava as TestFn<Context>;
test.before(async t => {
const module = await createTestingModule({
imports: [
ConfigModule.forRoot({
doc: {
manager: {
enableUpdateAutoMerging: false,
},
},
}),
DocStorageModule,
],
});
t.context.models = module.get(Models);
t.context.adapter = module.get(Adapter);
t.context.module = module;
});
let user: User;
test.beforeEach(async t => {
await t.context.module.initTestingDB();
user = await t.context.models.user.create({
email: 'test@affine.pro',
});
});
test.after(async t => {
await t.context.module.close();
});
test('should push user doc updates work', async t => {
const docId = randomUUID();
const doc = new YDoc();
const text = doc.getText('content');
const updates: Uint8Array[] = [];
doc.on('update', update => {
updates.push(update);
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
let timestamp = await t.context.adapter.pushDocUpdates(
user.id,
docId,
updates
);
t.truthy(timestamp);
let record = await t.context.adapter.getDoc(user.id, docId);
const newDoc = new YDoc();
applyUpdate(newDoc, record!.bin);
t.is(newDoc.getText('content').toString(), 'hello world');
// find all timestamps
const timestamps = await t.context.adapter.getSpaceDocTimestamps(user.id);
t.deepEqual(timestamps, {
[docId]: timestamp,
});
});
test('should delete user doc work', async t => {
const docId = randomUUID();
const doc = new YDoc();
const text = doc.getText('content');
const updates: Uint8Array[] = [];
doc.on('update', update => {
updates.push(update);
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
let timestamp = await t.context.adapter.pushDocUpdates(
user.id,
docId,
updates
);
t.truthy(timestamp);
let record = await t.context.adapter.getDoc(user.id, docId);
t.truthy(record);
await t.context.adapter.deleteDoc(user.id, docId);
record = await t.context.adapter.getDoc(user.id, docId);
t.falsy(record);
});
test('should delete all user docs work', async t => {
const docId = randomUUID();
const doc = new YDoc();
const text = doc.getText('content');
const updates: Uint8Array[] = [];
doc.on('update', update => {
updates.push(update);
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
let timestamp = await t.context.adapter.pushDocUpdates(
user.id,
docId,
updates
);
t.truthy(timestamp);
let record = await t.context.adapter.getDoc(user.id, docId);
t.truthy(record);
await t.context.adapter.deleteSpace(user.id);
record = await t.context.adapter.getDoc(user.id, docId);
t.falsy(record);
});

View File

@@ -1,15 +1,15 @@
import { Injectable } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
import { Mutex } from '../../../base';
import { Models } from '../../../models';
import { DocStorageOptions } from '../options';
import { DocRecord, DocStorageAdapter } from '../storage';
@Injectable()
export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
constructor(
private readonly db: PrismaClient,
private readonly mutex: Mutex,
private readonly models: Models,
options: DocStorageOptions
) {
super(options);
@@ -42,7 +42,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
}
override async getDoc(spaceId: string, docId: string) {
return this.getDocSnapshot(spaceId, docId);
return await this.getDocSnapshot(spaceId, docId);
}
async pushDocUpdates(
@@ -79,103 +79,46 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
}
async deleteDoc(userId: string, docId: string) {
await this.db.userSnapshot.deleteMany({
where: {
userId,
id: docId,
},
});
await this.models.userDoc.delete(userId, docId);
}
async deleteSpace(userId: string) {
await this.db.userSnapshot.deleteMany({
where: {
userId,
},
});
await this.models.userDoc.deleteAllByUserId(userId);
}
async getSpaceDocTimestamps(userId: string, after?: number) {
const snapshots = await this.db.userSnapshot.findMany({
select: {
id: true,
updatedAt: true,
},
where: {
userId,
...(after
? {
updatedAt: {
gt: new Date(after),
},
}
: {}),
},
});
const result: Record<string, number> = {};
snapshots.forEach(s => {
result[s.id] = s.updatedAt.getTime();
});
return result;
return await this.models.userDoc.findTimestampsByUserId(userId, after);
}
protected async getDocSnapshot(userId: string, docId: string) {
const snapshot = await this.db.userSnapshot.findUnique({
where: {
userId_id: {
userId,
id: docId,
},
},
});
const snapshot = await this.models.userDoc.get(userId, docId);
if (!snapshot) {
return null;
}
return {
spaceId: userId,
docId,
spaceId: snapshot.spaceId,
docId: snapshot.docId,
bin: snapshot.blob,
timestamp: snapshot.updatedAt.getTime(),
editor: snapshot.userId,
timestamp: snapshot.timestamp,
editor: snapshot.editorId,
};
}
protected async setDocSnapshot(snapshot: DocRecord) {
// we always get lock before writing to user snapshot table,
// so a simple upsert without testing on updatedAt is safe
await this.db.userSnapshot.upsert({
where: {
userId_id: {
userId: snapshot.spaceId,
id: snapshot.docId,
},
},
update: {
blob: Buffer.from(snapshot.bin),
updatedAt: new Date(snapshot.timestamp),
},
create: {
userId: snapshot.spaceId,
id: snapshot.docId,
blob: Buffer.from(snapshot.bin),
createdAt: new Date(snapshot.timestamp),
updatedAt: new Date(snapshot.timestamp),
},
await this.models.userDoc.upsert({
...snapshot,
blob: Buffer.from(snapshot.bin),
});
return true;
}
protected override async lockDocForUpdate(
workspaceId: string,
docId: string
) {
const lock = await this.mutex.acquire(`userspace:${workspaceId}:${docId}`);
protected override async lockDocForUpdate(spaceId: string, docId: string) {
const lock = await this.mutex.acquire(`userspace:${spaceId}:${docId}`);
if (!lock) {
throw new Error('Too many concurrent writings');