From 799fa9cfa64c5907142b3f2e56c59dbf7d9988f9 Mon Sep 17 00:00:00 2001 From: EYHN Date: Thu, 1 Feb 2024 06:58:09 +0000 Subject: [PATCH] fix(workspace): fix sync stuck (#5762) * remove MultipleBatchSyncSender * add timeout (30 seconds) on socket.emit --- .../workspace-impl/src/cloud/awareness.ts | 9 +- .../frontend/workspace-impl/src/cloud/sync.ts | 180 +++++++++++++++ .../src/cloud/sync/batch-sync-sender.ts | 107 --------- .../workspace-impl/src/cloud/sync/index.ts | 210 ------------------ 4 files changed, 185 insertions(+), 321 deletions(-) create mode 100644 packages/frontend/workspace-impl/src/cloud/sync.ts delete mode 100644 packages/frontend/workspace-impl/src/cloud/sync/batch-sync-sender.ts delete mode 100644 packages/frontend/workspace-impl/src/cloud/sync/index.ts diff --git a/packages/frontend/workspace-impl/src/cloud/awareness.ts b/packages/frontend/workspace-impl/src/cloud/awareness.ts index 348e31db3b..e1ec52d47c 100644 --- a/packages/frontend/workspace-impl/src/cloud/awareness.ts +++ b/packages/frontend/workspace-impl/src/cloud/awareness.ts @@ -32,12 +32,13 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider { window.addEventListener('beforeunload', this.windowBeforeUnloadHandler); - this.socket.connect(); - this.socket.on('connect', () => this.handleConnect()); - this.socket.emit('client-handshake-awareness', this.workspaceId); - this.socket.emit('awareness-init', this.workspaceId); + if (this.socket.connected) { + this.handleConnect(); + } else { + this.socket.connect(); + } } disconnect(): void { removeAwarenessStates( diff --git a/packages/frontend/workspace-impl/src/cloud/sync.ts b/packages/frontend/workspace-impl/src/cloud/sync.ts new file mode 100644 index 0000000000..3f57b71fca --- /dev/null +++ b/packages/frontend/workspace-impl/src/cloud/sync.ts @@ -0,0 +1,180 @@ +import { DebugLogger } from '@affine/debug'; +import { fetchWithTraceReport } from '@affine/graphql'; +import { type SyncStorage } from '@toeverything/infra'; +import type { CleanupService } from '@toeverything/infra/lifecycle'; + +import { getIoManager } from '../utils/affine-io'; +import { base64ToUint8Array, uint8ArrayToBase64 } from '../utils/base64'; + +const logger = new DebugLogger('affine:storage:socketio'); + +export class AffineSyncStorage implements SyncStorage { + name = 'affine-cloud'; + + SEND_TIMEOUT = 30000; + + socket = getIoManager().socket('/'); + + constructor( + private readonly workspaceId: string, + cleanupService: CleanupService + ) { + this.socket.on('connect', this.handleConnect); + + if (this.socket.connected) { + this.socket.emit('client-handshake-sync', this.workspaceId); + } else { + this.socket.connect(); + } + + cleanupService.add(() => { + this.cleanup(); + }); + } + + handleConnect = () => { + this.socket.emit('client-handshake-sync', this.workspaceId); + }; + + async pull( + docId: string, + state: Uint8Array + ): Promise<{ data: Uint8Array; state?: Uint8Array } | null> { + const stateVector = state ? await uint8ArrayToBase64(state) : undefined; + + logger.debug('doc-load-v2', { + workspaceId: this.workspaceId, + guid: docId, + stateVector, + }); + + const response: + | { error: any } + | { data: { missing: string; state: string } } = await this.socket + .timeout(this.SEND_TIMEOUT) + .emitWithAck('doc-load-v2', { + workspaceId: this.workspaceId, + guid: docId, + stateVector, + }); + + logger.debug('doc-load callback', { + workspaceId: this.workspaceId, + guid: docId, + stateVector, + response, + }); + + if ('error' in response) { + // TODO: result `EventError` with server + if (response.error.code === 'DOC_NOT_FOUND') { + return null; + } else { + throw new Error(response.error.message); + } + } else { + return { + data: base64ToUint8Array(response.data.missing), + state: response.data.state + ? base64ToUint8Array(response.data.state) + : undefined, + }; + } + } + + async push(docId: string, update: Uint8Array) { + logger.debug('client-update-v2', { + workspaceId: this.workspaceId, + guid: docId, + update, + }); + + const payload = await uint8ArrayToBase64(update); + + const response: { + // TODO: reuse `EventError` with server + error?: any; + data: any; + } = await this.socket + .timeout(this.SEND_TIMEOUT) + .emitWithAck('client-update-v2', { + workspaceId: this.workspaceId, + guid: docId, + updates: [payload], + }); + + // TODO: raise error with different code to users + if (response.error) { + logger.error('client-update-v2 error', { + workspaceId: this.workspaceId, + guid: docId, + response, + }); + + throw new Error(response.error); + } + } + + async subscribe( + cb: (docId: string, data: Uint8Array) => void, + disconnect: (reason: string) => void + ) { + const handleUpdate = async (message: { + workspaceId: string; + guid: string; + updates: string[]; + }) => { + if (message.workspaceId === this.workspaceId) { + message.updates.forEach(update => { + cb(message.guid, base64ToUint8Array(update)); + }); + } + }; + const handleDisconnect = (reason: string) => { + this.socket.off('server-updates', handleUpdate); + disconnect(reason); + }; + this.socket.on('server-updates', handleUpdate); + + this.socket.on('disconnect', handleDisconnect); + + return () => { + this.socket.off('server-updates', handleUpdate); + this.socket.off('disconnect', handleDisconnect); + }; + } + + cleanup() { + this.socket.emit('client-leave-sync', this.workspaceId); + this.socket.off('connect', this.handleConnect); + } +} + +export class AffineStaticSyncStorage implements SyncStorage { + name = 'affine-cloud-static'; + constructor(private readonly workspaceId: string) {} + + async pull( + docId: string + ): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> { + const response = await fetchWithTraceReport( + `/api/workspaces/${this.workspaceId}/docs/${docId}`, + { + priority: 'high', + } + ); + if (response.ok) { + const arrayBuffer = await response.arrayBuffer(); + + return { data: new Uint8Array(arrayBuffer) }; + } + + return null; + } + push(): Promise { + throw new Error('Method not implemented.'); + } + subscribe(): Promise<() => void> { + throw new Error('Method not implemented.'); + } +} diff --git a/packages/frontend/workspace-impl/src/cloud/sync/batch-sync-sender.ts b/packages/frontend/workspace-impl/src/cloud/sync/batch-sync-sender.ts deleted file mode 100644 index 764eea09f7..0000000000 --- a/packages/frontend/workspace-impl/src/cloud/sync/batch-sync-sender.ts +++ /dev/null @@ -1,107 +0,0 @@ -interface SyncUpdateSender { - ( - guid: string, - updates: Uint8Array[] - ): Promise<{ - accepted: boolean; - retry: boolean; - }>; -} - -/** - * BatchSyncSender is simple wrapper with vanilla update sync with several advanced features: - * - ACK mechanism, send updates sequentially with previous sync request correctly responds with ACK - * - batching updates, when waiting for previous ACK, new updates will be buffered and sent in single sync request - * - retryable, allow retry when previous sync request failed but with retry flag been set to true - */ -export class BatchSyncSender { - private readonly buffered: Uint8Array[] = []; - private job: Promise | null = null; - private started = true; - - constructor( - private readonly guid: string, - private readonly rawSender: SyncUpdateSender - ) {} - - send(update: Uint8Array) { - this.buffered.push(update); - this.next(); - return Promise.resolve(); - } - - stop() { - this.started = false; - } - - start() { - this.started = true; - this.next(); - } - - private next() { - if (!this.started || this.job || !this.buffered.length) { - return; - } - - const lastIndex = Math.min( - this.buffered.length - 1, - 99 /* max batch updates size */ - ); - const updates = this.buffered.slice(0, lastIndex + 1); - - if (updates.length) { - this.job = this.rawSender(this.guid, updates) - .then(({ accepted, retry }) => { - // remove pending updates if updates are accepted - if (accepted) { - this.buffered.splice(0, lastIndex + 1); - } - - // stop when previous sending failed and non-recoverable - if (accepted || retry) { - // avoid call stack overflow - setTimeout(() => { - this.next(); - }, 0); - } else { - this.stop(); - } - }) - .catch(() => { - this.stop(); - }) - .finally(() => { - this.job = null; - }); - } - } -} - -export class MultipleBatchSyncSender { - private senders: Record = {}; - - constructor(private readonly rawSender: SyncUpdateSender) {} - - async send(guid: string, update: Uint8Array) { - return this.getSender(guid).send(update); - } - - private getSender(guid: string) { - let sender = this.senders[guid]; - if (!sender) { - sender = new BatchSyncSender(guid, this.rawSender); - this.senders[guid] = sender; - } - - return sender; - } - - start() { - Object.values(this.senders).forEach(sender => sender.start()); - } - - stop() { - Object.values(this.senders).forEach(sender => sender.stop()); - } -} diff --git a/packages/frontend/workspace-impl/src/cloud/sync/index.ts b/packages/frontend/workspace-impl/src/cloud/sync/index.ts deleted file mode 100644 index f3b348863f..0000000000 --- a/packages/frontend/workspace-impl/src/cloud/sync/index.ts +++ /dev/null @@ -1,210 +0,0 @@ -import { DebugLogger } from '@affine/debug'; -import { fetchWithTraceReport } from '@affine/graphql'; -import { type SyncStorage } from '@toeverything/infra'; -import type { CleanupService } from '@toeverything/infra/lifecycle'; - -import { getIoManager } from '../../utils/affine-io'; -import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64'; -import { MultipleBatchSyncSender } from './batch-sync-sender'; - -const logger = new DebugLogger('affine:storage:socketio'); - -export class AffineSyncStorage implements SyncStorage { - name = 'affine-cloud'; - - socket = getIoManager().socket('/'); - - syncSender = new MultipleBatchSyncSender(async (guid, updates) => { - const payload = await Promise.all( - updates.map(update => uint8ArrayToBase64(update)) - ); - - return new Promise(resolve => { - this.socket.emit( - 'client-update-v2', - { - workspaceId: this.workspaceId, - guid, - updates: payload, - }, - (response: { - // TODO: reuse `EventError` with server - error?: any; - data: any; - }) => { - // TODO: raise error with different code to users - if (response.error) { - logger.error('client-update-v2 error', { - workspaceId: this.workspaceId, - guid, - response, - }); - } - - resolve({ - accepted: !response.error, - // TODO: reuse `EventError` with server - retry: response.error?.code === 'INTERNAL', - }); - } - ); - }); - }); - - constructor( - private readonly workspaceId: string, - cleanupService: CleanupService - ) { - this.socket.on('connect', this.handleConnect); - - this.socket.connect(); - - this.socket.emit( - 'client-handshake-sync', - this.workspaceId, - (response: { error?: any }) => { - if (!response.error) { - this.syncSender.start(); - } - } - ); - - cleanupService.add(() => { - this.cleanup(); - }); - } - - handleConnect = () => { - this.socket.emit( - 'client-handshake-sync', - this.workspaceId, - (response: { error?: any }) => { - if (!response.error) { - this.syncSender.start(); - } - } - ); - }; - - async pull( - docId: string, - state: Uint8Array - ): Promise<{ data: Uint8Array; state?: Uint8Array } | null> { - const stateVector = state ? await uint8ArrayToBase64(state) : undefined; - - return new Promise((resolve, reject) => { - logger.debug('doc-load-v2', { - workspaceId: this.workspaceId, - guid: docId, - stateVector, - }); - this.socket.emit( - 'doc-load-v2', - { - workspaceId: this.workspaceId, - guid: docId, - stateVector, - }, - ( - response: // TODO: reuse `EventError` with server - { error: any } | { data: { missing: string; state: string } } - ) => { - logger.debug('doc-load callback', { - workspaceId: this.workspaceId, - guid: docId, - stateVector, - response, - }); - - if ('error' in response) { - // TODO: result `EventError` with server - if (response.error.code === 'DOC_NOT_FOUND') { - resolve(null); - } else { - reject(new Error(response.error.message)); - } - } else { - resolve({ - data: base64ToUint8Array(response.data.missing), - state: response.data.state - ? base64ToUint8Array(response.data.state) - : undefined, - }); - } - } - ); - }); - } - - async push(docId: string, update: Uint8Array) { - logger.debug('client-update-v2', { - workspaceId: this.workspaceId, - guid: docId, - update, - }); - - await this.syncSender.send(docId, update); - } - - async subscribe( - cb: (docId: string, data: Uint8Array) => void, - disconnect: (reason: string) => void - ) { - const handleUpdate = async (message: { - workspaceId: string; - guid: string; - updates: string[]; - }) => { - if (message.workspaceId === this.workspaceId) { - message.updates.forEach(update => { - cb(message.guid, base64ToUint8Array(update)); - }); - } - }; - this.socket.on('server-updates', handleUpdate); - - this.socket.on('disconnect', reason => { - this.socket.off('server-updates', handleUpdate); - disconnect(reason); - }); - - return () => { - this.socket.off('server-updates', handleUpdate); - }; - } - - cleanup() { - this.syncSender.stop(); - this.socket.emit('client-leave-sync', this.workspaceId); - this.socket.off('connect', this.handleConnect); - } -} - -export class AffineStaticSyncStorage implements SyncStorage { - name = 'affine-cloud-static'; - constructor(private readonly workspaceId: string) {} - - async pull( - docId: string - ): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> { - const response = await fetchWithTraceReport( - `/api/workspaces/${this.workspaceId}/docs/${docId}`, - { - priority: 'high', - } - ); - if (response.ok) { - const arrayBuffer = await response.arrayBuffer(); - - return { data: new Uint8Array(arrayBuffer) }; - } - - return null; - } - push(): Promise { - throw new Error('Method not implemented.'); - } - subscribe(): Promise<() => void> { - throw new Error('Method not implemented.'); - } -}