fix(server): avoid snapshot write conflict (#5174)

This commit is contained in:
liuyi
2023-12-04 11:12:15 +00:00
parent d911d21d1c
commit b4b4a3b625
2 changed files with 230 additions and 70 deletions

View File

@@ -11,6 +11,7 @@ import { chunk } from 'lodash-es';
import { defer, retry } from 'rxjs';
import {
applyUpdate,
decodeStateVector,
Doc,
encodeStateAsUpdate,
encodeStateVector,
@@ -40,6 +41,36 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
return compare(yBinary, yBinary2, true);
}
/**
* Detect whether rhs state is newer than lhs state.
*
* How could we tell a state is newer:
*
* i. if the state vector size is larger, it's newer
* ii. if the state vector size is same, compare each client's state
*/
function isStateNewer(lhs: Buffer, rhs: Buffer): boolean {
const lhsVector = decodeStateVector(lhs);
const rhsVector = decodeStateVector(rhs);
if (lhsVector.size < rhsVector.size) {
return true;
}
for (const [client, state] of lhsVector) {
const rstate = rhsVector.get(client);
if (!rstate) {
return false;
}
if (state < rstate) {
return true;
}
}
return false;
}
function isEmptyBuffer(buf: Buffer): boolean {
return (
buf.length === 0 ||
@@ -374,23 +405,17 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
}
const { id, workspaceId } = candidate;
// acquire lock
const ok = await this.lockUpdatesForAutoSquash(workspaceId, id);
if (!ok) {
return;
}
try {
await this._get(workspaceId, id);
} catch (e) {
this.logger.error(
`Failed to apply updates for workspace: ${workspaceId}, guid: ${id}`
);
this.logger.error(e);
} finally {
await this.unlockUpdatesForAutoSquash(workspaceId, id);
}
await this.lockUpdatesForAutoSquash(workspaceId, id, async () => {
try {
await this._get(workspaceId, id);
} catch (e) {
this.logger.error(
`Failed to apply updates for workspace: ${workspaceId}, guid: ${id}`
);
this.logger.error(e);
}
});
}
private async getAutoSquashCandidate() {
@@ -414,34 +439,67 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
doc: Doc,
initialSeq?: number
) {
const blob = Buffer.from(encodeStateAsUpdate(doc));
const state = Buffer.from(encodeStateVector(doc));
return this.lockSnapshotForUpsert(workspaceId, guid, async () => {
const blob = Buffer.from(encodeStateAsUpdate(doc));
if (isEmptyBuffer(blob)) {
return;
}
if (isEmptyBuffer(blob)) {
return false;
}
await this.db.snapshot.upsert({
select: {
seq: true,
},
where: {
id_workspaceId: {
id: guid,
workspaceId,
},
},
create: {
id: guid,
workspaceId,
blob,
state,
seq: initialSeq,
},
update: {
blob,
state,
},
const state = Buffer.from(encodeStateVector(doc));
return await this.db.$transaction(async db => {
const snapshot = await db.snapshot.findUnique({
where: {
id_workspaceId: {
id: guid,
workspaceId,
},
},
});
// update
if (snapshot) {
// only update if state is newer
if (isStateNewer(snapshot.state ?? Buffer.from([0]), state)) {
await db.snapshot.update({
select: {
seq: true,
},
where: {
id_workspaceId: {
workspaceId,
id: guid,
},
},
data: {
blob,
state,
},
});
return true;
} else {
return false;
}
} else {
// create
await db.snapshot.create({
select: {
seq: true,
},
data: {
id: guid,
workspaceId,
blob,
state,
seq: initialSeq,
},
});
return true;
}
});
});
}
@@ -484,21 +542,26 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
this.event.emit('doc:manager:snapshot:beforeUpdate', snapshot);
}
await this.upsert(workspaceId, id, doc, last.seq);
this.logger.debug(
`Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}`
);
await this.db.update.deleteMany({
where: {
id,
workspaceId,
seq: {
in: updates.map(u => u.seq),
},
},
});
const done = await this.upsert(workspaceId, id, doc, last.seq);
if (done) {
this.logger.debug(
`Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}`
);
await this.db.update.deleteMany({
where: {
id,
workspaceId,
seq: {
in: updates.map(u => u.seq),
},
},
});
await this.updateCachedUpdatesCount(workspaceId, id, -updates.length);
}
await this.updateCachedUpdatesCount(workspaceId, id, -updates.length);
return doc;
}
@@ -581,22 +644,44 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
return null;
}
private async lockUpdatesForAutoSquash(workspaceId: string, guid: string) {
return this.cache.setnx(
private async doWithLock<T>(lock: string, job: () => Promise<T>) {
const acquired = await this.cache.setnx(lock, 1, {
ttl: 60 * 1000,
});
if (!acquired) {
return;
}
try {
return await job();
} finally {
await this.cache.delete(lock).catch(e => {
// safe, the lock will be expired when ttl ends
this.logger.error(`Failed to release lock ${lock}`, e);
});
}
}
private async lockUpdatesForAutoSquash<T>(
workspaceId: string,
guid: string,
job: () => Promise<T>
) {
return this.doWithLock(
`doc:manager:updates-lock:${workspaceId}::${guid}`,
1,
{
ttl: 60 * 1000,
}
job
);
}
private async unlockUpdatesForAutoSquash(workspaceId: string, guid: string) {
return this.cache
.delete(`doc:manager:updates-lock:${workspaceId}::${guid}`)
.catch(e => {
// safe, the lock will be expired when ttl ends
this.logger.error('Failed to release updates lock', e);
});
async lockSnapshotForUpsert<T>(
workspaceId: string,
guid: string,
job: () => Promise<T>
) {
return this.doWithLock(
`doc:manager:snapshot-lock:${workspaceId}::${guid}`,
job
);
}
}