From 97c4ae48b5e7736dffa452e22cc52ba119a77db0 Mon Sep 17 00:00:00 2001 From: EYHN Date: Tue, 9 Apr 2024 04:40:15 +0000 Subject: [PATCH] fix(infra): fix sync issues on old ids (#6474) --- .../infra/src/workspace/engine/doc/remote.ts | 72 ++++++++++++++++--- 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/packages/common/infra/src/workspace/engine/doc/remote.ts b/packages/common/infra/src/workspace/engine/doc/remote.ts index c336ff4f08..bc9d4a5c27 100644 --- a/packages/common/infra/src/workspace/engine/doc/remote.ts +++ b/packages/common/infra/src/workspace/engine/doc/remote.ts @@ -146,7 +146,10 @@ export class DocEngineRemotePart { await this.jobs.pullAndPush(docId, signal); } else { const pulled = await this.storage.loadDocServerClockPulled(docId); - if (pulled === null || pulled !== this.status.serverClocks.get(docId)) { + if ( + pulled === null || + pulled !== this.status.serverClocks.get(normalizeServerDocId(docId)) + ) { await this.jobs.pull(docId, signal); } } @@ -204,10 +207,13 @@ export class DocEngineRemotePart { serverClock, } = serverData; await this.storage.saveServerClock( - new Map([[docId, serverClock]]), + new Map([[normalizeServerDocId(docId), serverClock]]), signal ); - this.actions.updateServerClock(docId, serverClock); + this.actions.updateServerClock( + normalizeServerDocId(docId), + serverClock + ); await this.storage.commitDocAsServerUpdate( docId, newData, @@ -242,10 +248,13 @@ export class DocEngineRemotePart { signal ); await this.storage.saveServerClock( - new Map([[docId, serverClock]]), + new Map([[normalizeServerDocId(docId), serverClock]]), signal ); - this.actions.updateServerClock(docId, serverClock); + this.actions.updateServerClock( + normalizeServerDocId(docId), + serverClock + ); } await this.storage.saveDocPushedSeqNum(docId, seqNum, signal); } @@ -275,10 +284,10 @@ export class DocEngineRemotePart { update: newData, }); await this.storage.saveServerClock( - new Map([[docId, serverClock]]), + new Map([[normalizeServerDocId(docId), serverClock]]), signal ); - this.actions.updateServerClock(docId, serverClock); + this.actions.updateServerClock(normalizeServerDocId(docId), serverClock); }, save: async ( docId: string, @@ -287,10 +296,10 @@ export class DocEngineRemotePart { ) => { const serverClock = jobs.reduce((a, b) => Math.max(a, b.serverClock), 0); await this.storage.saveServerClock( - new Map([[docId, serverClock]]), + new Map([[normalizeServerDocId(docId), serverClock]]), signal ); - this.actions.updateServerClock(docId, serverClock); + this.actions.updateServerClock(normalizeServerDocId(docId), serverClock); if (this.status.connectedDocs.has(docId)) { const data = jobs .map(j => j.update) @@ -543,3 +552,48 @@ export class DocEngineRemotePart { this.status.jobDocQueue.updatePriority(docId, priority); } } + +// use normalized id in server clock +function normalizeServerDocId(raw: string) { + enum DocVariant { + Workspace = 'workspace', + Page = 'page', + Space = 'space', + Settings = 'settings', + Unknown = 'unknown', + } + + try { + if (!raw.length) { + throw new Error('Invalid Empty Doc ID'); + } + + let parts = raw.split(':'); + + if (parts.length > 3) { + // special adapt case `wsId:space:page:pageId` + if (parts[1] === DocVariant.Space && parts[2] === DocVariant.Page) { + parts = [parts[0], DocVariant.Space, parts[3]]; + } else { + throw new Error(`Invalid format of Doc ID: ${raw}`); + } + } else if (parts.length === 2) { + // `${variant}:${guid}` + throw new Error('not supported'); + } else if (parts.length === 1) { + // ${ws} or ${pageId} + parts = ['', DocVariant.Unknown, parts[0]]; + } + + const docId = parts.at(2); + + if (!docId) { + throw new Error('ID is required'); + } + + return docId; + } catch (err) { + logger.error('Error on normalize docId ' + raw, err); + return raw; + } +}