From b40f007ccf9c6d4eb1c97f79ff604f2b05de0156 Mon Sep 17 00:00:00 2001
From: fengmk2
Date: Thu, 6 Feb 2025 02:50:27 +0000
Subject: [PATCH] feat(server): doc model (#9834)

close CLOUD-104
---
 .../server/src/__tests__/models/doc.spec.ts   | 589 ++++++++++++++++++
 .../backend/server/src/models/common/doc.ts   |  14 +
 .../backend/server/src/models/common/index.ts |   1 +
 packages/backend/server/src/models/doc.ts     | 480 ++++++++++++++
 packages/backend/server/src/models/index.ts   |   3 +
 5 files changed, 1087 insertions(+)
 create mode 100644 packages/backend/server/src/__tests__/models/doc.spec.ts
 create mode 100644 packages/backend/server/src/models/common/doc.ts
 create mode 100644 packages/backend/server/src/models/doc.ts

diff --git a/packages/backend/server/src/__tests__/models/doc.spec.ts b/packages/backend/server/src/__tests__/models/doc.spec.ts
new file mode 100644
index 0000000000..2f19651b84
--- /dev/null
+++ b/packages/backend/server/src/__tests__/models/doc.spec.ts
@@ -0,0 +1,589 @@
+import { randomUUID } from 'node:crypto';
+
+import ava, { TestFn } from 'ava';
+
+import { Config } from '../../base/config';
+import { DocModel } from '../../models/doc';
+import { type User, UserModel } from '../../models/user';
+import { type Workspace, WorkspaceModel } from '../../models/workspace';
+import { createTestingModule, type TestingModule } from '../utils';
+
+interface Context {
+  config: Config;
+  module: TestingModule;
+  user: UserModel;
+  workspace: WorkspaceModel;
+  doc: DocModel;
+}
+
+const test = ava as TestFn<Context>;
+
+test.before(async t => {
+  const module = await createTestingModule();
+
+  t.context.user = module.get(UserModel);
+  t.context.workspace = module.get(WorkspaceModel);
+  t.context.doc = module.get(DocModel);
+  t.context.config = module.get(Config);
+  t.context.module = module;
+});
+
+let user: User;
+let workspace: Workspace;
+
+test.beforeEach(async t => {
+  await t.context.module.initTestingDB();
+  user = await t.context.user.create({
+    email: 'test@affine.pro',
+  });
+  workspace = await t.context.workspace.create(user.id);
+});
+
+test.after(async t => {
+  await t.context.module.close();
+});
+
+test('should create a batch of updates on a doc', async t => {
+  const docId = randomUUID();
+  const updates = await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob1'),
+      timestamp: Date.now(),
+      editorId: user.id,
+      seq: 1,
+    },
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now() + 1000,
+      seq: 2,
+    },
+  ]);
+  t.is(updates.count, 2);
+});
+
+test('should throw error when createdAt timestamp is not unique', async t => {
+  const docId = randomUUID();
+  const timestamp = Date.now();
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob1'),
+      timestamp,
+      editorId: user.id,
+      seq: 1,
+    },
+  ]);
+  await t.throwsAsync(
+    t.context.doc.createUpdates([
+      {
+        spaceId: workspace.id,
+        docId,
+        blob: Buffer.from('blob2'),
+        timestamp,
+        editorId: user.id,
+        seq: 2,
+      },
+    ]),
+    {
+      message:
+        /Unique constraint failed on the fields: \(`workspace_id`,`guid`,`created_at`\)/,
+    }
+  );
+});
+
+test('should find updates by spaceId and docId', async t => {
+  const docId = randomUUID();
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob1'),
+      timestamp: Date.now(),
+      editorId: user.id,
+      seq: 1,
+    },
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now() + 1000,
+      editorId: user.id,
+      seq: 2,
+    },
+  ]);
+  const foundUpdates = await t.context.doc.findUpdates(workspace.id, docId);
+  t.is(foundUpdates.length, 2);
+  t.deepEqual(foundUpdates[0].blob, Buffer.from('blob1'));
+  t.deepEqual(foundUpdates[1].blob, Buffer.from('blob2'));
+
+  let count = await t.context.doc.getUpdateCount(workspace.id, docId);
+  t.is(count, 2);
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob3'),
+      timestamp: Date.now(),
+      editorId: user.id,
+      seq: 3,
+    },
+  ]);
+  count = await t.context.doc.getUpdateCount(workspace.id, docId);
+  t.is(count, 3);
+});
+
+test('should delete updates by spaceId, docId, and createdAts', async t => {
+  const docId = randomUUID();
+  const timestamps = [Date.now(), Date.now() + 1000];
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob1'),
+      timestamp: timestamps[0],
+      editorId: user.id,
+      seq: 1,
+    },
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob2'),
+      timestamp: timestamps[1],
+      seq: 2,
+    },
+  ]);
+  let count = await t.context.doc.deleteUpdates(
+    workspace.id,
+    docId,
+    timestamps
+  );
+  t.is(count, 2);
+  count = await t.context.doc.getUpdateCount(workspace.id, docId);
+  t.is(count, 0);
+
+  // delete non-existing updates
+  count = await t.context.doc.deleteUpdates(workspace.id, docId, timestamps);
+  t.is(count, 0);
+});
+
+test('should get global update count', async t => {
+  const docId = randomUUID();
+  const docId2 = randomUUID();
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob1'),
+      timestamp: Date.now(),
+      editorId: user.id,
+      seq: 1,
+    },
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now() + 1000,
+      editorId: user.id,
+      seq: 2,
+    },
+    {
+      spaceId: workspace.id,
+      docId: docId2,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now() + 1000,
+      editorId: user.id,
+      seq: 2,
+    },
+  ]);
+  const count = await t.context.doc.getGlobalUpdateCount();
+  t.is(count, 3);
+});
+
+test('should upsert a doc', async t => {
+  const snapshot = {
+    spaceId: workspace.id,
+    docId: randomUUID(),
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot);
+  const foundSnapshot = await t.context.doc.get(
+    snapshot.spaceId,
+    snapshot.docId
+  );
+  t.truthy(foundSnapshot);
+  t.deepEqual(foundSnapshot!.blob, snapshot.blob);
+  t.is(foundSnapshot!.editorId, user.id);
+  t.is(foundSnapshot!.timestamp, snapshot.timestamp);
+
+  // update snapshot's editorId
+  const otherUser = await t.context.user.create({
+    email: 'test2@affine.pro',
+  });
+  const newSnapshot = {
+    ...snapshot,
+    editorId: otherUser.id,
+  };
+  await t.context.doc.upsert(newSnapshot);
+  const updatedSnapshot = await t.context.doc.get(
+    snapshot.spaceId,
+    snapshot.docId
+  );
+  t.truthy(updatedSnapshot);
+  t.deepEqual(updatedSnapshot!.blob, snapshot.blob);
+  t.is(updatedSnapshot!.editorId, otherUser.id);
+});
+
+test('should get a doc meta', async t => {
+  const snapshot = {
+    spaceId: workspace.id,
+    docId: randomUUID(),
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot);
+  const meta = await t.context.doc.getMeta(snapshot.spaceId, snapshot.docId);
+  t.truthy(meta);
+  t.deepEqual(meta!.createdByUser, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  t.deepEqual(meta!.updatedByUser, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  t.truthy(meta!.createdAt);
+  t.deepEqual(meta!.updatedAt, new Date(snapshot.timestamp));
+
+  // update snapshot's editorId
+  const otherUser = await t.context.user.create({
+    email: 'test2@affine.pro',
+  });
+  const newSnapshot = {
+    ...snapshot,
+    editorId: otherUser.id,
+    timestamp: Date.now(),
+  };
+  await t.context.doc.upsert(newSnapshot);
+  const updatedSnapshotMeta = await t.context.doc.getMeta(
+    snapshot.spaceId,
+    snapshot.docId
+  );
+  t.truthy(updatedSnapshotMeta);
+  t.deepEqual(updatedSnapshotMeta!.createdByUser, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  t.deepEqual(updatedSnapshotMeta!.updatedByUser, {
+    id: otherUser.id,
+    name: otherUser.name,
+    avatarUrl: otherUser.avatarUrl,
+  });
+  // createdAt should not change
+  t.deepEqual(updatedSnapshotMeta!.createdAt, meta!.createdAt);
+  t.deepEqual(updatedSnapshotMeta!.updatedAt, new Date(newSnapshot.timestamp));
+
+  // get null when doc not found
+  const notFoundMeta = await t.context.doc.getMeta(
+    snapshot.spaceId,
+    randomUUID()
+  );
+  t.is(notFoundMeta, null);
+});
+
+test('should create a history record', async t => {
+  const snapshot = {
+    spaceId: workspace.id,
+    docId: randomUUID(),
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot);
+  const created = await t.context.doc.createHistory(snapshot, 1000);
+  t.truthy(created);
+  t.deepEqual(created.timestamp, snapshot.timestamp);
+  t.deepEqual(created.editor, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  const history = await t.context.doc.getHistory(
+    snapshot.spaceId,
+    snapshot.docId,
+    snapshot.timestamp
+  );
+  t.deepEqual(history, {
+    ...created,
+    blob: snapshot.blob,
+  });
+});
+
+test('should return null when history timestamp does not match', async t => {
+  const snapshot = {
+    spaceId: workspace.id,
+    docId: randomUUID(),
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot);
+  await t.context.doc.createHistory(snapshot, 1000);
+  const history = await t.context.doc.getHistory(
+    snapshot.spaceId,
+    snapshot.docId,
+    snapshot.timestamp + 1
+  );
+  t.is(history, null);
+});
+
+test('should find history records', async t => {
+  const docId = randomUUID();
+  const snapshot1 = {
+    spaceId: workspace.id,
+    docId,
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now() - 1000,
+    editorId: user.id,
+  };
+  const snapshot2 = {
+    spaceId: workspace.id,
+    docId,
+    blob: Buffer.from('blob2'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.createHistory(snapshot1, 1000);
+  await t.context.doc.createHistory(snapshot2, 1000);
+  let histories = await t.context.doc.findHistories(workspace.id, docId);
+  t.is(histories.length, 2);
+  t.deepEqual(histories[0].timestamp, snapshot2.timestamp);
+  t.deepEqual(histories[0].editor, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  t.deepEqual(histories[1].timestamp, snapshot1.timestamp);
+  t.deepEqual(histories[1].editor, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  // only take 1 history, order by timestamp desc
+  histories = await t.context.doc.findHistories(workspace.id, docId, {
+    take: 1,
+  });
+  t.is(histories.length, 1);
+  t.deepEqual(histories[0].timestamp, snapshot2.timestamp);
+  t.deepEqual(histories[0].editor, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  // get empty history
+  histories = await t.context.doc.findHistories(workspace.id, docId, {
+    before: Date.now() - 1000000,
+  });
+  t.is(histories.length, 0);
+});
+
+test('should get latest history', async t => {
+  const docId = randomUUID();
+  const snapshot1 = {
+    spaceId: workspace.id,
+    docId,
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now() - 1000,
+    editorId: user.id,
+  };
+  const snapshot2 = {
+    spaceId: workspace.id,
+    docId,
+    blob: Buffer.from('blob2'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.createHistory(snapshot1, 1000);
+  await t.context.doc.createHistory(snapshot2, 1000);
+  const history = await t.context.doc.getLatestHistory(workspace.id, docId);
+  t.truthy(history);
+  t.deepEqual(history!.timestamp, snapshot2.timestamp);
+  t.deepEqual(history!.editor, {
+    id: user.id,
+    name: user.name,
+    avatarUrl: user.avatarUrl,
+  });
+  // return null when no history
+  const emptyHistory = await t.context.doc.getLatestHistory(
+    workspace.id,
+    randomUUID()
+  );
+  t.is(emptyHistory, null);
+});
+
+test('should delete a doc, including histories, snapshots and updates', async t => {
+  const docId = randomUUID();
+  const snapshot = {
+    spaceId: workspace.id,
+    docId,
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot);
+  await t.context.doc.createHistory(snapshot, 1000);
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now(),
+      editorId: user.id,
+      seq: 1,
+    },
+  ]);
+  await t.context.doc.delete(workspace.id, docId);
+  const foundSnapshot = await t.context.doc.get(workspace.id, docId);
+  t.is(foundSnapshot, null);
+  const foundHistory = await t.context.doc.getLatestHistory(
+    workspace.id,
+    docId
+  );
+  t.is(foundHistory, null);
+  const foundUpdates = await t.context.doc.findUpdates(workspace.id, docId);
+  t.is(foundUpdates.length, 0);
+});
+
+test('should delete all docs in a workspace', async t => {
+  const docId1 = randomUUID();
+  const docId2 = randomUUID();
+  const snapshot1 = {
+    spaceId: workspace.id,
+    docId: docId1,
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  const snapshot2 = {
+    spaceId: workspace.id,
+    docId: docId2,
+    blob: Buffer.from('blob2'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot1);
+  await t.context.doc.createHistory(snapshot1, 1000);
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId: docId1,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now(),
+      editorId: user.id,
+      seq: 1,
+    },
+  ]);
+  await t.context.doc.upsert(snapshot2);
+  await t.context.doc.createHistory(snapshot2, 1000);
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId: docId2,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now(),
+      editorId: user.id,
+      seq: 1,
+    },
+  ]);
+  const deletedCount = await t.context.doc.deleteAllByWorkspaceId(workspace.id);
+  t.is(deletedCount, 2);
+  const foundSnapshot1 = await t.context.doc.get(workspace.id, docId1);
+  t.is(foundSnapshot1, null);
+  const foundHistory1 = await t.context.doc.getLatestHistory(
+    workspace.id,
+    docId1
+  );
+  t.is(foundHistory1, null);
+  const foundUpdates1 = await t.context.doc.findUpdates(workspace.id, docId1);
+  t.is(foundUpdates1.length, 0);
+  const foundSnapshot2 = await t.context.doc.get(workspace.id, docId2);
+  t.is(foundSnapshot2, null);
+  const foundHistory2 = await t.context.doc.getLatestHistory(
+    workspace.id,
+    docId2
+  );
+  t.is(foundHistory2, null);
+  const foundUpdates2 = await t.context.doc.findUpdates(workspace.id, docId2);
+  t.is(foundUpdates2.length, 0);
+});
+
+test('should find all docs timestamps in a workspace', async t => {
+  const docId1 = randomUUID();
+  const docId2 = randomUUID();
+  const timestamp1 = Date.now();
+  const timestamp2 = Date.now() + 1000;
+  const timestamp3 = Date.now() + 2000;
+  const snapshot1 = {
+    spaceId: workspace.id,
+    docId: docId1,
+    blob: Buffer.from('blob1'),
+    timestamp: timestamp1,
+    editorId: user.id,
+  };
+  const snapshot2 = {
+    spaceId: workspace.id,
+    docId: docId2,
+    blob: Buffer.from('blob2'),
+    timestamp: timestamp2,
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot1);
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId: docId1,
+      blob: Buffer.from('blob2'),
+      timestamp: timestamp3,
+      editorId: user.id,
+      seq: 1,
+    },
+  ]);
+  await t.context.doc.upsert(snapshot2);
+  const timestamps = await t.context.doc.findTimestampsByWorkspaceId(
+    workspace.id
+  );
+  t.deepEqual(timestamps, {
+    [docId1]: timestamp3,
+    [docId2]: timestamp2,
+  });
+});
+
+test('should increase doc seq', async t => {
+  const docId = randomUUID();
+  const snapshot = {
+    spaceId: workspace.id,
+    docId,
+    blob: Buffer.from('blob1'),
+    timestamp: Date.now(),
+    editorId: user.id,
+  };
+  await t.context.doc.upsert(snapshot);
+  const seq1 = await t.context.doc.increaseSeq(workspace.id, docId, 88);
+  t.is(seq1, 88);
+  const seq2 = await t.context.doc.increaseSeq(workspace.id, docId, 2);
+  t.is(seq2, 90);
+  // hit max seq, then reset to zero
+  await t.context.doc.increaseSeq(workspace.id, docId, 0x3fffffff);
+  const seq3 = await t.context.doc.increaseSeq(workspace.id, docId, 1);
+  t.is(seq3, 1);
+});
diff --git a/packages/backend/server/src/models/common/doc.ts b/packages/backend/server/src/models/common/doc.ts
new file mode 100644
index 0000000000..9206e7e1ea
--- /dev/null
+++ b/packages/backend/server/src/models/common/doc.ts
@@ -0,0 +1,14 @@
+import type { User } from '@prisma/client';
+
+export interface Doc {
+  /**
+   * Can be workspace or user id.
+   */
+  spaceId: string;
+  docId: string;
+  blob: Buffer;
+  timestamp: number;
+  editorId?: string;
+}
+
+export type DocEditor = Pick<User, 'id' | 'name' | 'avatarUrl'>;
diff --git a/packages/backend/server/src/models/common/index.ts b/packages/backend/server/src/models/common/index.ts
index 445e67262d..8f93c2e5cf 100644
--- a/packages/backend/server/src/models/common/index.ts
+++ b/packages/backend/server/src/models/common/index.ts
@@ -1,2 +1,3 @@
+export * from './doc';
 export * from './feature';
 export * from './page';
diff --git a/packages/backend/server/src/models/doc.ts b/packages/backend/server/src/models/doc.ts
new file mode 100644
index 0000000000..e6172d9ebe
--- /dev/null
+++ b/packages/backend/server/src/models/doc.ts
@@ -0,0 +1,480 @@
+import { Injectable } from '@nestjs/common';
+import { Transactional } from '@nestjs-cls/transactional';
+import type { Update } from '@prisma/client';
+
+import { BaseModel } from './base';
+import type { Doc, DocEditor } from './common';
+
+export interface DocRecord extends Doc {
+  // TODO: deprecated field, remove in the future
+  seq: number | null;
+}
+
+export interface DocHistorySimple {
+  timestamp: number;
+  editor: DocEditor | null;
+}
+
+export interface DocHistory {
+  blob: Buffer;
+  timestamp: number;
+  editor: DocEditor | null;
+}
+
+export interface DocHistoryFilter {
+  /**
+   * timestamp to filter histories before.
+   */
+  before?: number;
+  /**
+   * limit the number of histories to return.
+   *
+   * Defaults to `100`.
+   */
+  take?: number;
+}
+
+/**
+ * Workspace Doc Model
+ *
+ * This model is responsible for managing the workspace docs, including:
+ * - Updates: the changes made to the doc.
+ * - History: the saved history versions of the doc.
+ * - Doc: the doc itself.
+ */
+@Injectable()
+export class DocModel extends BaseModel {
+  // #region Update
+
+  private updateToDocRecord(row: Update): DocRecord {
+    return {
+      spaceId: row.workspaceId,
+      docId: row.id,
+      blob: row.blob,
+      timestamp: row.createdAt.getTime(),
+      editorId: row.createdBy || undefined,
+      seq: row.seq,
+    };
+  }
+
+  private docRecordToUpdate(record: DocRecord): Update {
+    return {
+      workspaceId: record.spaceId,
+      id: record.docId,
+      blob: record.blob,
+      createdAt: new Date(record.timestamp),
+      createdBy: record.editorId || null,
+      seq: record.seq,
+    };
+  }
+
+  private get userSelectFields() {
+    return {
+      select: {
+        id: true,
+        name: true,
+        avatarUrl: true,
+      },
+    };
+  }
+
+  async createUpdates(updates: DocRecord[]) {
+    return await this.db.update.createMany({
+      data: updates.map(r => this.docRecordToUpdate(r)),
+    });
+  }
+
+  /**
+   * Find updates by workspaceId and docId.
+   */
+  async findUpdates(workspaceId: string, docId: string): Promise<DocRecord[]> {
+    const rows = await this.db.update.findMany({
+      where: {
+        workspaceId,
+        id: docId,
+      },
+      orderBy: {
+        createdAt: 'asc',
+      },
+    });
+    return rows.map(r => this.updateToDocRecord(r));
+  }
+
+  /**
+   * Get the pending updates count by workspaceId and docId.
+   */
+  async getUpdateCount(workspaceId: string, docId: string) {
+    return await this.db.update.count({
+      where: {
+        workspaceId,
+        id: docId,
+      },
+    });
+  }
+
+  /**
+   * Get the global pending updates count.
+   */
+  async getGlobalUpdateCount() {
+    return await this.db.update.count();
+  }
+
+  /**
+   * Delete updates by workspaceId, docId, and createdAts.
+   */
+  async deleteUpdates(
+    workspaceId: string,
+    docId: string,
+    timestamps: number[]
+  ) {
+    const { count } = await this.db.update.deleteMany({
+      where: {
+        workspaceId,
+        id: docId,
+        createdAt: {
+          in: timestamps.map(t => new Date(t)),
+        },
+      },
+    });
+    this.logger.log(
+      `Deleted ${count} updates for workspace ${workspaceId} doc ${docId}`
+    );
+    return count;
+  }
+
+  // #endregion
+
+  // #region History
+
+  /**
+   * Create a doc history with a max age.
+   */
+  async createHistory(
+    snapshot: Doc,
+    maxAge: number
+  ): Promise<DocHistorySimple> {
+    const row = await this.db.snapshotHistory.create({
+      select: {
+        timestamp: true,
+        createdByUser: this.userSelectFields,
+      },
+      data: {
+        workspaceId: snapshot.spaceId,
+        id: snapshot.docId,
+        timestamp: new Date(snapshot.timestamp),
+        blob: snapshot.blob,
+        createdBy: snapshot.editorId,
+        expiredAt: new Date(Date.now() + maxAge),
+      },
+    });
+    return {
+      timestamp: row.timestamp.getTime(),
+      editor: row.createdByUser,
+    };
+  }
+
+  /**
+   * Find doc histories by workspaceId and docId.
+   *
+   * Only includes timestamp and createdByUser.
+   */
+  async findHistories(
+    workspaceId: string,
+    docId: string,
+    filter?: DocHistoryFilter
+  ): Promise<DocHistorySimple[]> {
+    const rows = await this.db.snapshotHistory.findMany({
+      select: {
+        timestamp: true,
+        createdByUser: this.userSelectFields,
+      },
+      where: {
+        workspaceId,
+        id: docId,
+        timestamp: {
+          lt: filter?.before ? new Date(filter.before) : new Date(),
+        },
+      },
+      orderBy: {
+        timestamp: 'desc',
+      },
+      take: filter?.take ?? 100,
+    });
+    return rows.map(r => ({
+      timestamp: r.timestamp.getTime(),
+      editor: r.createdByUser,
+    }));
+  }
+
+  /**
+   * Get the history of a doc at a specific timestamp.
+   *
+   * Includes blob and createdByUser.
+   */
+  async getHistory(
+    workspaceId: string,
+    docId: string,
+    timestamp: number
+  ): Promise<DocHistory | null> {
+    const row = await this.db.snapshotHistory.findUnique({
+      where: {
+        workspaceId_id_timestamp: {
+          workspaceId,
+          id: docId,
+          timestamp: new Date(timestamp),
+        },
+      },
+      include: {
+        createdByUser: this.userSelectFields,
+      },
+    });
+    if (!row) {
+      return null;
+    }
+    return {
+      blob: row.blob,
+      timestamp: row.timestamp.getTime(),
+      editor: row.createdByUser,
+    };
+  }
+
+  /**
+   * Get the latest history of a doc.
+   *
+   * Only includes timestamp and createdByUser.
+   */
+  async getLatestHistory(
+    workspaceId: string,
+    docId: string
+  ): Promise<DocHistorySimple | null> {
+    const row = await this.db.snapshotHistory.findFirst({
+      where: {
+        workspaceId,
+        id: docId,
+      },
+      select: {
+        timestamp: true,
+        createdByUser: this.userSelectFields,
+      },
+      orderBy: {
+        timestamp: 'desc',
+      },
+    });
+    if (!row) {
+      return null;
+    }
+    return {
+      timestamp: row.timestamp.getTime(),
+      editor: row.createdByUser,
+    };
+  }
+
+  // #endregion
+
+  // #region Doc
+
+  /**
+   * Insert or update a doc.
+   */
+  async upsert(doc: Doc) {
+    const { spaceId, docId, blob, timestamp, editorId } = doc;
+    const updatedAt = new Date(timestamp);
+    // CONCERNS:
+    //   i. Because we save the real user's last seen action time as `updatedAt`,
+    //      we can simply compare `updatedAt` values to determine whether a snapshot is older than the one we are about to save.
+    //
+    //  ii. Prisma doesn't support `upsert` with an additional `where` condition alongside the unique constraint.
+    //      In our case, we need to check `updatedAt` manually to avoid overriding a newer snapshot:
+    //      where: { workspaceId_id: {}, updatedAt: { lt: updatedAt } }
+    //                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+    const result: { updatedAt: Date }[] = await this.db.$queryRaw`
+      INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at", "created_by", "updated_by")
+      VALUES (${spaceId}, ${docId}, ${blob}, DEFAULT, ${updatedAt}, ${editorId}, ${editorId})
+      ON CONFLICT ("workspace_id", "guid")
+      DO UPDATE SET "blob" = ${blob}, "updated_at" = ${updatedAt}, "updated_by" = ${editorId}
+      WHERE "snapshots"."workspace_id" = ${spaceId} AND "snapshots"."guid" = ${docId} AND "snapshots"."updated_at" <= ${updatedAt}
+      RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt"
+    `;
+
+    // If the condition `snapshot.updatedAt > updatedAt` holds, the snapshot has already been updated by another process,
+    // and that process must also have seen the updates applied to the current `doc`.
+    // In that case no row is returned and the result is `undefined`.
+    return result.at(0);
+  }
+
+  /**
+   * Get a doc by workspaceId and docId.
+   */
+  async get(workspaceId: string, docId: string): Promise<Doc | null> {
+    const row = await this.db.snapshot.findUnique({
+      where: {
+        workspaceId_id: {
+          workspaceId,
+          id: docId,
+        },
+      },
+    });
+    if (!row) {
+      return null;
+    }
+    return {
+      spaceId: row.workspaceId,
+      docId: row.id,
+      blob: row.blob,
+      timestamp: row.updatedAt.getTime(),
+      editorId: row.updatedBy || undefined,
+    };
+  }
+
+  async getMeta(workspaceId: string, docId: string) {
+    return await this.db.snapshot.findUnique({
+      where: {
+        workspaceId_id: {
+          workspaceId,
+          id: docId,
+        },
+      },
+      select: {
+        createdAt: true,
+        updatedAt: true,
+        createdByUser: this.userSelectFields,
+        updatedByUser: this.userSelectFields,
+      },
+    });
+  }
+
+  /**
+   * @deprecated updates do not rely on the seq number anymore
+   */
+  async increaseSeq(workspaceId: string, docId: string, increment: number) {
+    const MAX_SEQ_NUM = 0x3fffffff; // u31
+    const { seq } = await this.db.snapshot.update({
+      select: {
+        seq: true,
+      },
+      where: {
+        workspaceId_id: {
+          workspaceId,
+          id: docId,
+        },
+      },
+      data: {
+        seq: {
+          increment,
+        },
+      },
+    });
+
+    if (!seq) {
+      return increment;
+    }
+
+    // reset
+    if (seq >= MAX_SEQ_NUM) {
+      await this.db.snapshot.update({
+        select: {
+          seq: true,
+        },
+        where: {
+          workspaceId_id: {
+            workspaceId,
+            id: docId,
+          },
+        },
+        data: {
+          seq: 0,
+        },
+      });
+    }
+    return seq;
+  }
+
+  /**
+   * Delete a doc and its updates, snapshots, and histories.
+   */
+  @Transactional()
+  async delete(workspaceId: string, docId: string) {
+    const ident = { where: { workspaceId, id: docId } };
+    const { count: snapshots } = await this.db.snapshot.deleteMany(ident);
+    const { count: updates } = await this.db.update.deleteMany(ident);
+    const { count: histories } =
+      await this.db.snapshotHistory.deleteMany(ident);
+    this.logger.log(
+      `Deleted workspace ${workspaceId} doc ${docId}, including ${snapshots} snapshots, ${updates} updates, and ${histories} histories`
+    );
+  }
+
+  /**
+   * Delete all of a workspace's docs, including their updates, snapshots, and histories.
+   */
+  @Transactional()
+  async deleteAllByWorkspaceId(workspaceId: string) {
+    const ident = { where: { workspaceId } };
+    const { count: snapshots } = await this.db.snapshot.deleteMany(ident);
+    const { count: updates } = await this.db.update.deleteMany(ident);
+    const { count: histories } =
+      await this.db.snapshotHistory.deleteMany(ident);
+    this.logger.log(
+      `Deleted workspace ${workspaceId} all docs, including ${snapshots} snapshots, ${updates} updates, and ${histories} histories`
+    );
+    return snapshots;
+  }
+
+  /**
+   * Find the timestamps of docs by workspaceId.
+   *
+   * @param after Only return timestamps after this timestamp.
+   */
+  async findTimestampsByWorkspaceId(workspaceId: string, after?: number) {
+    const snapshots = await this.db.snapshot.findMany({
+      select: {
+        id: true,
+        updatedAt: true,
+      },
+      where: {
+        workspaceId,
+        ...(after
+          ? {
+              updatedAt: {
+                gt: new Date(after),
+              },
+            }
+          : {}),
+      },
+    });
+
+    const updates = await this.db.update.groupBy({
+      where: {
+        workspaceId,
+        ...(after
+          ? {
+              // [createdAt] in the updates table is indexed, so this is fast
+              createdAt: {
+                gt: new Date(after),
+              },
+            }
+          : {}),
+      },
+      by: ['id'],
+      _max: {
+        createdAt: true,
+      },
+    });
+
+    const result: Record<string, number> = {};
+
+    snapshots.forEach(s => {
+      result[s.id] = s.updatedAt.getTime();
+    });
+
+    updates.forEach(u => {
+      if (u._max.createdAt) {
+        result[u.id] = u._max.createdAt.getTime();
+      }
+    });
+
+    return result;
+  }
+
+  // #endregion
+}
diff --git a/packages/backend/server/src/models/index.ts b/packages/backend/server/src/models/index.ts
index 65522a0f71..44067c03e5 100644
--- a/packages/backend/server/src/models/index.ts
+++ b/packages/backend/server/src/models/index.ts
@@ -7,6 +7,7 @@ import {
 import { ModuleRef } from '@nestjs/core';
 
 import { ApplyType } from '../base';
+import { DocModel } from './doc';
 import { FeatureModel } from './feature';
 import { PageModel } from './page';
 import { MODELS_SYMBOL } from './provider';
@@ -26,6 +27,7 @@ const MODELS = {
   page: PageModel,
   userFeature: UserFeatureModel,
   workspaceFeature: WorkspaceFeatureModel,
+  doc: DocModel,
 };
 
 type ModelsType = {
@@ -78,6 +80,7 @@ const ModelsSymbolProvider: ExistingProvider = {
 export class ModelsModule {}
 
 export * from './common';
+export * from './doc';
 export * from './feature';
 export * from './page';
 export * from './session';
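
Usage sketch for reviewers (illustrative only, not part of the patch): the snippet below calls only methods that this diff adds to `DocModel`; the `DocCompactor` service, its import path, and the naive `Buffer.concat` "merge" are made up for the example.

// Hypothetical consumer of the new DocModel; not part of this change.
import { Injectable } from '@nestjs/common';

import { DocModel } from './models/doc';

@Injectable()
export class DocCompactor {
  constructor(private readonly doc: DocModel) {}

  // Fold all pending updates of a doc into its snapshot row, then drop the
  // consumed updates. A real implementation would merge yjs updates here;
  // Buffer.concat is only a stand-in so the example stays self-contained.
  async compact(workspaceId: string, docId: string, editorId: string) {
    const updates = await this.doc.findUpdates(workspaceId, docId);
    if (updates.length === 0) return;

    const blob = Buffer.concat(updates.map(u => u.blob));
    const timestamp = updates[updates.length - 1].timestamp;

    await this.doc.upsert({ spaceId: workspaceId, docId, blob, timestamp, editorId });
    await this.doc.deleteUpdates(
      workspaceId,
      docId,
      updates.map(u => u.timestamp)
    );
  }
}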