fix(server): doc upsert without row lock (#5765)

This commit is contained in:
liuyi
2024-02-01 09:49:02 +00:00
parent 7d951a975f
commit 2f3c6f104e
4 changed files with 189 additions and 172 deletions

View File

@@ -265,7 +265,9 @@ model Snapshot {
seq Int @default(0) @db.Integer
state Bytes? @db.ByteA
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6)
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(6)
// the `updated_at` field will not record the time of record changed,
// but the created time of last seen update that has been merged into snapshot.
updatedAt DateTime @map("updated_at") @db.Timestamptz(6)
@@id([id, workspaceId])
@@map("snapshots")

View File

@@ -10,7 +10,6 @@ import { chunk } from 'lodash-es';
import { defer, retry } from 'rxjs';
import {
applyUpdate,
decodeStateVector,
Doc,
encodeStateAsUpdate,
encodeStateVector,
@@ -46,36 +45,6 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
return compare(yBinary, yBinary2, true);
}
/**
* Detect whether rhs state is newer than lhs state.
*
* How could we tell a state is newer:
*
* i. if the state vector size is larger, it's newer
* ii. if the state vector size is same, compare each client's state
*/
function isStateNewer(lhs: Buffer, rhs: Buffer): boolean {
const lhsVector = decodeStateVector(lhs);
const rhsVector = decodeStateVector(rhs);
if (lhsVector.size < rhsVector.size) {
return true;
}
for (const [client, state] of lhsVector) {
const rstate = rhsVector.get(client);
if (!rstate) {
return false;
}
if (state < rstate) {
return true;
}
}
return false;
}
export function isEmptyBuffer(buf: Buffer): boolean {
return (
buf.length === 0 ||
@@ -120,6 +89,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
this.destroy();
}
@CallTimer('doc', 'yjs_recover_updates_to_doc')
private recoverDoc(...updates: Buffer[]): Promise<Doc> {
const doc = new Doc();
const chunks = chunk(updates, 10);
@@ -383,7 +353,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
const updates = await this.getUpdates(workspaceId, guid);
if (updates.length) {
const doc = await this.squash(updates, snapshot);
const doc = await this.squash(snapshot, updates);
return Buffer.from(encodeStateVector(doc));
}
@@ -464,97 +434,94 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
});
}
/**
* @returns whether the snapshot is updated to the latest, `undefined` means the doc to be upserted is outdated.
*/
@CallTimer('doc', 'upsert')
private async upsert(
workspaceId: string,
guid: string,
doc: Doc,
// we always delay the snapshot update to avoid db overload,
// so the value of `updatedAt` will not be accurate to user's real action time
// so the value of auto updated `updatedAt` by db will never be accurate to user's real action time
updatedAt: Date,
initialSeq?: number
seq: number
) {
const blob = Buffer.from(encodeStateAsUpdate(doc));
if (isEmptyBuffer(blob)) {
return false;
return undefined;
}
const state = Buffer.from(encodeStateVector(doc));
await this.db.$queryRaw`BEGIN;`;
let committed = false;
const commit = async () => {
if (!committed) {
committed = true;
await this.db.$queryRaw`COMMIT;`;
}
};
// CONCERNS:
// i. Because we save the real user's last seen action time as `updatedAt`,
// it's possible to simply compare the `updatedAt` to determine if the snapshot is older than the one we are going to save.
//
// ii. Prisma doesn't support `upsert` with additional `where` condition along side unique constraint.
// In our case, we need to manually check the `updatedAt` to avoid overriding the newer snapshot.
// where: { id_workspaceId: {}, updatedAt: { lt: updatedAt } }
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
//
// iii. Only set the seq number when creating the snapshot.
// For updating scenario, the seq number will be updated when updates pushed to db.
try {
const [snapshot]: {
workspace_id: string;
id: string;
blob: Buffer;
state?: Buffer;
}[] = await this.db.$queryRaw`
-- LOCK TABLE "Snapshot" IN SHARE ROW EXCLUSIVE MODE;
SELECT * FROM snapshots WHERE workspace_id = ${workspaceId} AND guid = ${guid} limit 1
FOR UPDATE;
const result: { updatedAt: Date }[] = await this.db.$queryRaw`
INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "state", "seq", "created_at", "updated_at")
VALUES (${workspaceId}, ${guid}, ${blob}, ${state}, ${seq}, DEFAULT, ${updatedAt})
ON CONFLICT ("workspace_id", "guid")
DO UPDATE SET "blob" = ${blob}, "state" = ${state}, "updated_at" = ${updatedAt}, "seq" = ${seq}
WHERE "snapshots"."workspace_id" = ${workspaceId} AND "snapshots"."guid" = ${guid} AND "snapshots"."updated_at" <= ${updatedAt}
RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt"
`;
// update
if (snapshot) {
// only update if state is newer
if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) {
await this.db.snapshot.update({
select: {
seq: true,
},
where: {
id_workspaceId: {
workspaceId,
id: guid,
},
},
data: {
blob,
state,
updatedAt,
},
});
// const result = await this.db.snapshot.upsert({
// select: {
// updatedAt: true,
// seq: true,
// },
// where: {
// id_workspaceId: {
// workspaceId,
// id: guid,
// },
// ⬇️ NOT SUPPORTED BY PRISMA YET
// updatedAt: {
// lt: updatedAt,
// },
// },
// update: {
// blob,
// state,
// updatedAt,
// },
// create: {
// workspaceId,
// id: guid,
// blob,
// state,
// updatedAt,
// seq,
// },
// });
return true;
} else {
return false;
}
} else {
// create
// no record exists, should commit the previous row lock first
await commit();
await this.db.snapshot.create({
select: {
seq: true,
},
data: {
id: guid,
workspaceId,
blob,
state,
seq: initialSeq,
createdAt: updatedAt,
updatedAt,
},
});
// if the condition `snapshot.updatedAt > updatedAt` is true, by which means the snapshot has already been updated by other process,
// the updates has been applied to current `doc` must have been seen by the other process as well.
// The `updatedSnapshot` will be `undefined` in this case.
const updatedSnapshot = result.at(0);
return true;
if (!updatedSnapshot) {
return undefined;
}
return true;
} catch (e) {
await this.db.$queryRaw`ROLLBACK;`;
throw e;
} finally {
await commit();
this.logger.error('Failed to upsert snapshot', e);
return false;
}
}
private async _get(
workspaceId: string,
guid: string
@@ -564,7 +531,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
if (updates.length) {
return {
doc: await this.squash(updates, snapshot),
doc: await this.squash(snapshot, updates),
};
}
@@ -576,17 +543,16 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
* and delete the updates records at the same time.
*/
@CallTimer('doc', 'squash')
private async squash(updates: Update[], snapshot: Snapshot | null) {
private async squash(snapshot: Snapshot | null, updates: Update[]) {
if (!updates.length) {
throw new Error('No updates to squash');
}
const first = updates[0];
const last = updates[updates.length - 1];
const { id, workspaceId } = first;
const last = updates[updates.length - 1];
const { id, workspaceId } = last;
const doc = await this.applyUpdates(
first.id,
id,
snapshot ? snapshot.blob : Buffer.from([0, 0]),
...updates.map(u => u.blob)
);
@@ -617,19 +583,24 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
);
}
// always delete updates
// the upsert will return false if the state is not newer, so we don't need to worry about it
const { count } = await this.db.update.deleteMany({
where: {
id,
workspaceId,
seq: {
in: updates.map(u => u.seq),
// we will keep the updates only if the upsert failed on unknown reason
// `done === undefined` means the updates is outdated(have already been merged by other process), safe to be deleted
// `done === true` means the upsert is successful, safe to be deleted
if (done !== false) {
// always delete updates
// the upsert will return false if the state is not newer, so we don't need to worry about it
const { count } = await this.db.update.deleteMany({
where: {
id,
workspaceId,
seq: {
in: updates.map(u => u.seq),
},
},
},
});
});
await this.updateCachedUpdatesCount(workspaceId, id, -count);
await this.updateCachedUpdatesCount(workspaceId, id, -count);
}
return doc;
}

View File

@@ -277,6 +277,7 @@ export class WorkspaceResolver {
id: workspace.id,
workspaceId: workspace.id,
blob: buffer,
updatedAt: new Date(),
},
});
}

View File

@@ -4,12 +4,7 @@ import { TestingModule } from '@nestjs/testing';
import { PrismaClient } from '@prisma/client';
import test from 'ava';
import * as Sinon from 'sinon';
import {
applyUpdate,
decodeStateVector,
Doc as YDoc,
encodeStateAsUpdate,
} from 'yjs';
import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { DocManager, DocModule } from '../src/core/doc';
import { QuotaModule } from '../src/core/quota';
@@ -277,72 +272,120 @@ test('should throw if meet max retry times', async t => {
t.is(stub.callCount, 5);
});
test('should not update snapshot if state is outdated', async t => {
const db = m.get(PrismaClient);
test('should be able to insert the snapshot if it is new created', async t => {
const manager = m.get(DocManager);
await db.snapshot.create({
data: {
id: '2',
workspaceId: '2',
blob: Buffer.from([0, 0]),
seq: 1,
},
});
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
const update = encodeStateAsUpdate(doc);
await Promise.all(updates.map(update => manager.push('2', '2', update)));
await manager.push('1', '1', Buffer.from(update));
const updateWith3Records = await manager.getUpdates('2', '2');
text.insert(11, '!');
await manager.push('2', '2', updates[3]);
const updateWith4Records = await manager.getUpdates('2', '2');
// Simulation:
// Node A get 3 updates and squash them at time 1, will finish at time 10
// Node B get 4 updates and squash them at time 3, will finish at time 8
// Node B finish the squash first, and update the snapshot
// Node A finish the squash later, and update the snapshot to an outdated state
// Time: ---------------------->
// A: ^get ^upsert
// B: ^get ^upsert
//
// We should avoid such situation
const updates = await manager.getUpdates('1', '1');
t.is(updates.length, 1);
// @ts-expect-error private
await manager.squash(updateWith4Records, null);
// @ts-expect-error private
await manager.squash(updateWith3Records, null);
const snapshot = await manager.squash(null, updates);
const result = await db.snapshot.findUnique({
t.truthy(snapshot);
t.is(snapshot.getText('content').toString(), 'hello');
const restUpdates = await manager.getUpdates('1', '1');
t.is(restUpdates.length, 0);
});
test('should be able to merge updates into snapshot', async t => {
const manager = m.get(DocManager);
const updates: Buffer[] = [];
{
const doc = new YDoc();
doc.on('update', data => {
updates.push(Buffer.from(data));
});
const text = doc.getText('content');
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
text.insert(11, '!');
}
{
await manager.batchPush('1', '1', updates.slice(0, 2));
// do the merge
const doc = (await manager.get('1', '1'))!;
t.is(doc.getText('content').toString(), 'helloworld');
}
{
await manager.batchPush('1', '1', updates.slice(2));
const doc = (await manager.get('1', '1'))!;
t.is(doc.getText('content').toString(), 'hello world!');
}
const restUpdates = await manager.getUpdates('1', '1');
t.is(restUpdates.length, 0);
});
test('should not update snapshot if doc is outdated', async t => {
const manager = m.get(DocManager);
const db = m.get(PrismaClient);
const updates: Buffer[] = [];
{
const doc = new YDoc();
doc.on('update', data => {
updates.push(Buffer.from(data));
});
const text = doc.getText('content');
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
text.insert(11, '!');
}
await manager.batchPush('2', '1', updates.slice(0, 2)); // 'helloworld'
// merge updates into snapshot
await manager.get('2', '1');
// fake the snapshot is a lot newer
await db.snapshot.update({
where: {
id_workspaceId: {
id: '2',
workspaceId: '2',
id: '1',
},
},
data: {
updatedAt: new Date(Date.now() + 10000),
},
});
if (!result) {
t.fail('snapshot not found');
return;
{
const snapshot = await manager.getSnapshot('2', '1');
await manager.batchPush('2', '1', updates.slice(2)); // 'hello world!'
const updateRecords = await manager.getUpdates('2', '1');
// @ts-expect-error private
const doc = await manager.squash(snapshot, updateRecords);
// all updated will merged into doc not matter it's timestamp is outdated or not,
// but the snapshot record will not be updated
t.is(doc.getText('content').toString(), 'hello world!');
}
const state = decodeStateVector(result.state!);
t.is(state.get(doc.clientID), 12);
{
const doc = new YDoc();
applyUpdate(doc, (await manager.getSnapshot('2', '1'))!.blob);
// the snapshot will not get touched if the new doc's timestamp is outdated
t.is(doc.getText('content').toString(), 'helloworld');
const d = new YDoc();
applyUpdate(d, result.blob!);
const dtext = d.getText('content');
t.is(dtext.toString(), 'hello world!');
// the updates are known as outdated, so they will be deleted
t.is((await manager.getUpdates('2', '1')).length, 0);
}
});