Mirror of https://github.com/toeverything/AFFiNE.git
fix(server): avoid server overloading by too many updates (#4846)
@@ -68,31 +68,50 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
     this.destroy();
   }
 
-  protected recoverDoc(...updates: Buffer[]): Doc {
+  protected recoverDoc(...updates: Buffer[]): Promise<Doc> {
     const doc = new Doc();
-
-    updates.forEach((update, i) => {
-      try {
-        if (update.length) {
-          applyUpdate(doc, update);
-        }
-      } catch (e) {
-        this.logger.error(
-          `Failed to apply updates, index: ${i}\nUpdate: ${updates
-            .map(u => u.toString('hex'))
-            .join('\n')}`
-        );
-      }
-    });
-
-    return doc;
+    const chunks = chunk(updates, 100);
+
+    return new Promise(resolve => {
+      const next = () => {
+        const updates = chunks.shift();
+        if (updates?.length) {
+          updates.forEach(u => {
+            try {
+              applyUpdate(doc, u);
+            } catch (e) {
+              this.logger.error(
+                `Failed to apply update: ${updates
+                  .map(u => u.toString('hex'))
+                  .join('\n')}`
+              );
+            }
+          });
+
+          // avoid applying too many updates in single round which will take the whole cpu time like dead lock
+          setImmediate(() => {
+            next();
+          });
+        } else {
+          resolve(doc);
+        }
+      };
+
+      next();
+    });
   }
 
-  protected applyUpdates(guid: string, ...updates: Buffer[]): Doc {
-    const doc = this.recoverDoc(...updates);
+  protected async applyUpdates(
+    guid: string,
+    ...updates: Buffer[]
+  ): Promise<Doc> {
+    const doc = await this.recoverDoc(...updates);
 
     // test jwst codec
-    if (this.config.doc.manager.experimentalMergeWithJwstCodec) {
+    if (
+      this.config.doc.manager.experimentalMergeWithJwstCodec &&
+      updates.length < 100 /* avoid overloading */
+    ) {
       this.metrics.jwstCodecMerge(1, {});
       const yjsResult = Buffer.from(encodeStateAsUpdate(doc));
       let log = false;
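The heart of the change: recoverDoc no longer applies every pending update synchronously in one pass. It splits them into batches of 100 (via chunk, presumably lodash's) and schedules each batch with setImmediate, so the merge yields back to the event loop between batches instead of pinning the CPU. A minimal standalone sketch of that pattern, with illustrative helper names (applyInChunks, chunkArray) that are not from the repo:

// Sketch only: applies Yjs updates in bounded batches, yielding to the event
// loop between batches so a large backlog cannot starve other work.
import { Doc, applyUpdate } from 'yjs';

function chunkArray<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

function applyInChunks(updates: Uint8Array[], batchSize = 100): Promise<Doc> {
  const doc = new Doc();
  const chunks = chunkArray(updates, batchSize);

  return new Promise(resolve => {
    const next = () => {
      const batch = chunks.shift();
      if (batch?.length) {
        for (const update of batch) {
          try {
            applyUpdate(doc, update);
          } catch (e) {
            // mirror the real code: log and keep going on a bad update
            console.error('failed to apply update', e);
          }
        }
        // defer the next batch; other queued callbacks run in between
        setImmediate(next);
      } else {
        resolve(doc);
      }
    };

    next();
  });
}

Each batch runs as its own macrotask, so queued I/O callbacks and other requests interleave with a long merge instead of waiting for the whole backlog to finish.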
@@ -312,6 +331,10 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
         workspaceId,
         id: guid,
       },
+      // take it ease, we don't want to overload db and or cpu
+      // if we limit the taken number here,
+      // user will never see the latest doc if there are too many updates pending to be merged.
+      take: 100,
     });
 
     // perf(memory): avoid sorting in db
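The take: 100 addition caps how many pending update rows are loaded from the database per merge pass. A hedged sketch of such a bounded fetch with Prisma's findMany (the update model and its fields below are assumptions for illustration; only the take option itself comes from the diff):

// Hypothetical sketch; model and field names are assumed, not shown in the diff.
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

async function fetchPendingUpdates(workspaceId: string, guid: string) {
  return prisma.update.findMany({
    where: { workspaceId, id: guid },
    take: 100, // never pull more than 100 rows per merge pass
  });
}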
@@ -402,7 +425,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
     const first = updates[0];
     const last = updates[updates.length - 1];
 
-    const doc = this.applyUpdates(
+    const doc = await this.applyUpdates(
       first.id,
       snapshot ? snapshot.blob : Buffer.from([0, 0]),
       ...updates.map(u => u.blob)
@@ -103,9 +103,9 @@ export class RedisDocManager extends DocManager {
     const snapshot = await this.getSnapshot(docId.workspace, docId.guid);
 
     // merge
-    const doc = snapshot
+    const doc = await (snapshot
       ? this.applyUpdates(docId.full, snapshot.blob, ...updates)
-      : this.applyUpdates(docId.full, ...updates);
+      : this.applyUpdates(docId.full, ...updates));
 
     // update snapshot
     await this.upsert(docId.workspace, docId.guid, doc, snapshot?.seq);
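Since applyUpdates is now async, its callers pick up an await; in the RedisDocManager hunk the parentheses around the conditional are the subtle part, because await binds more tightly than ?:. A small self-contained illustration (not AFFiNE code):

// Minimal illustration: `await` binds tighter than the conditional operator.
async function demo(flag: boolean) {
  const a = async () => 'merged with snapshot';
  const b = async () => 'merged from scratch';

  // Parenthesised: awaits whichever promise the condition selects.
  const right = await (flag ? a() : b()); // string

  // Unparenthesised: `await flag` just yields the boolean, so the expression
  // evaluates to an un-awaited Promise<string>.
  const wrong = await flag ? a() : b(); // Promise<string>, still pending

  return { right, wrong };
}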