diff --git a/packages/backend/server/src/core/doc/manager.ts b/packages/backend/server/src/core/doc/manager.ts index f8c23407db..6a3836cb37 100644 --- a/packages/backend/server/src/core/doc/manager.ts +++ b/packages/backend/server/src/core/doc/manager.ts @@ -19,6 +19,7 @@ import { import { Cache, + CallTimer, Config, EventEmitter, type EventPayload, @@ -463,6 +464,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { }); } + @CallTimer('doc', 'upsert') private async upsert( workspaceId: string, guid: string, @@ -472,73 +474,87 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { updatedAt: Date, initialSeq?: number ) { - return this.lockSnapshotForUpsert(workspaceId, guid, async () => { - const blob = Buffer.from(encodeStateAsUpdate(doc)); + const blob = Buffer.from(encodeStateAsUpdate(doc)); - if (isEmptyBuffer(blob)) { - return false; + if (isEmptyBuffer(blob)) { + return false; + } + + const state = Buffer.from(encodeStateVector(doc)); + + await this.db.$queryRaw`BEGIN;`; + let committed = false; + const commit = async () => { + if (!committed) { + committed = true; + await this.db.$queryRaw`COMMIT;`; } + }; + try { + const [snapshot]: { + workspace_id: string; + id: string; + blob: Buffer; + state?: Buffer; + }[] = await this.db.$queryRaw` + -- LOCK TABLE "Snapshot" IN SHARE ROW EXCLUSIVE MODE; + SELECT * FROM snapshots WHERE workspace_id = ${workspaceId} AND guid = ${guid} limit 1 + FOR UPDATE; + `; - const state = Buffer.from(encodeStateVector(doc)); - - return await this.db.$transaction(async db => { - const snapshot = await db.snapshot.findUnique({ - where: { - id_workspaceId: { - id: guid, - workspaceId, - }, - }, - }); - - // update - if (snapshot) { - // only update if state is newer - if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) { - await db.snapshot.update({ - select: { - seq: true, - }, - where: { - id_workspaceId: { - workspaceId, - id: guid, - }, - }, - data: { - blob, - state, - updatedAt, - }, - }); - - return true; - } else { - return false; - } - } else { - // create - await db.snapshot.create({ + // update + if (snapshot) { + // only update if state is newer + if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) { + await this.db.snapshot.update({ select: { seq: true, }, + where: { + id_workspaceId: { + workspaceId, + id: guid, + }, + }, data: { - id: guid, - workspaceId, blob, state, - seq: initialSeq, - createdAt: updatedAt, updatedAt, }, }); return true; + } else { + return false; } - }); - }); - } + } else { + // create + // no record exists, should commit the previous row lock first + await commit(); + await this.db.snapshot.create({ + select: { + seq: true, + }, + data: { + id: guid, + workspaceId, + blob, + state, + seq: initialSeq, + createdAt: updatedAt, + updatedAt, + }, + }); + return true; + } + } catch (e) { + await this.db.$queryRaw`ROLLBACK;`; + throw e; + } finally { + await commit(); + } + } private async _get( workspaceId: string, guid: string @@ -559,6 +575,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { * Squash updates into a single update and save it as snapshot, * and delete the updates records at the same time. */ + @CallTimer('doc', 'squash') private async squash(updates: Update[], snapshot: Snapshot | null) { if (!updates.length) { throw new Error('No updates to squash'); @@ -761,18 +778,6 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { ); } - async lockSnapshotForUpsert( - workspaceId: string, - guid: string, - job: () => Promise - ) { - return this.doWithLock( - 'doc:manager:snapshot', - `${workspaceId}::${guid}`, - job - ); - } - @Cron(CronExpression.EVERY_MINUTE) async reportUpdatesQueueCount() { metrics.doc