fix(server): doc upsert race condition (#5755)

This commit is contained in:
liuyi
2024-01-31 19:08:52 +08:00
committed by LongYinan
parent 26db1d436d
commit f1ccc504b5

View File

@@ -19,6 +19,7 @@ import {
import { import {
Cache, Cache,
CallTimer,
Config, Config,
EventEmitter, EventEmitter,
type EventPayload, type EventPayload,
@@ -463,6 +464,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
}); });
} }
@CallTimer('doc', 'upsert')
private async upsert( private async upsert(
workspaceId: string, workspaceId: string,
guid: string, guid: string,
@@ -472,73 +474,87 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
updatedAt: Date, updatedAt: Date,
initialSeq?: number initialSeq?: number
) { ) {
return this.lockSnapshotForUpsert(workspaceId, guid, async () => { const blob = Buffer.from(encodeStateAsUpdate(doc));
const blob = Buffer.from(encodeStateAsUpdate(doc));
if (isEmptyBuffer(blob)) { if (isEmptyBuffer(blob)) {
return false; return false;
}
const state = Buffer.from(encodeStateVector(doc));
await this.db.$queryRaw`BEGIN;`;
let committed = false;
const commit = async () => {
if (!committed) {
committed = true;
await this.db.$queryRaw`COMMIT;`;
} }
};
try {
const [snapshot]: {
workspace_id: string;
id: string;
blob: Buffer;
state?: Buffer;
}[] = await this.db.$queryRaw`
-- LOCK TABLE "Snapshot" IN SHARE ROW EXCLUSIVE MODE;
SELECT * FROM snapshots WHERE workspace_id = ${workspaceId} AND guid = ${guid} limit 1
FOR UPDATE;
`;
const state = Buffer.from(encodeStateVector(doc)); // update
if (snapshot) {
return await this.db.$transaction(async db => { // only update if state is newer
const snapshot = await db.snapshot.findUnique({ if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) {
where: { await this.db.snapshot.update({
id_workspaceId: {
id: guid,
workspaceId,
},
},
});
// update
if (snapshot) {
// only update if state is newer
if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) {
await db.snapshot.update({
select: {
seq: true,
},
where: {
id_workspaceId: {
workspaceId,
id: guid,
},
},
data: {
blob,
state,
updatedAt,
},
});
return true;
} else {
return false;
}
} else {
// create
await db.snapshot.create({
select: { select: {
seq: true, seq: true,
}, },
where: {
id_workspaceId: {
workspaceId,
id: guid,
},
},
data: { data: {
id: guid,
workspaceId,
blob, blob,
state, state,
seq: initialSeq,
createdAt: updatedAt,
updatedAt, updatedAt,
}, },
}); });
return true; return true;
} else {
return false;
} }
}); } else {
}); // create
} // no record exists, should commit the previous row lock first
await commit();
await this.db.snapshot.create({
select: {
seq: true,
},
data: {
id: guid,
workspaceId,
blob,
state,
seq: initialSeq,
createdAt: updatedAt,
updatedAt,
},
});
return true;
}
} catch (e) {
await this.db.$queryRaw`ROLLBACK;`;
throw e;
} finally {
await commit();
}
}
private async _get( private async _get(
workspaceId: string, workspaceId: string,
guid: string guid: string
@@ -559,6 +575,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
* Squash updates into a single update and save it as snapshot, * Squash updates into a single update and save it as snapshot,
* and delete the updates records at the same time. * and delete the updates records at the same time.
*/ */
@CallTimer('doc', 'squash')
private async squash(updates: Update[], snapshot: Snapshot | null) { private async squash(updates: Update[], snapshot: Snapshot | null) {
if (!updates.length) { if (!updates.length) {
throw new Error('No updates to squash'); throw new Error('No updates to squash');
@@ -761,18 +778,6 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
); );
} }
async lockSnapshotForUpsert<T>(
workspaceId: string,
guid: string,
job: () => Promise<T>
) {
return this.doWithLock(
'doc:manager:snapshot',
`${workspaceId}::${guid}`,
job
);
}
@Cron(CronExpression.EVERY_MINUTE) @Cron(CronExpression.EVERY_MINUTE)
async reportUpdatesQueueCount() { async reportUpdatesQueueCount() {
metrics.doc metrics.doc