diff --git a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx index dd73f412d7..59981a169a 100644 --- a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx +++ b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx @@ -1,5 +1,8 @@ import { WorkspaceFlavour } from '@affine/env/workspace'; -import { SyncEngineStatus } from '@affine/workspace/providers'; +import { + type SyncEngineStatus, + SyncEngineStep, +} from '@affine/workspace/providers'; import { CloudWorkspaceIcon, LocalWorkspaceIcon, @@ -86,14 +89,13 @@ const WorkspaceStatus = ({ }) => { const isOnline = useSystemOnline(); - const [syncEngineStatus, setSyncEngineStatus] = useState( - SyncEngineStatus.Synced - ); + const [syncEngineStatus, setSyncEngineStatus] = + useState(null); const syncEngine = useCurrentSyncEngine(); useEffect(() => { - setSyncEngineStatus(syncEngine?.status ?? SyncEngineStatus.Synced); + setSyncEngineStatus(syncEngine?.status ?? null); const disposable = syncEngine?.onStatusChange.on( debounce(status => { setSyncEngineStatus(status); @@ -112,26 +114,19 @@ const WorkspaceStatus = ({ if (!isOnline) { return 'Disconnected, please check your network connection'; } - switch (syncEngineStatus) { - case SyncEngineStatus.Syncing: - case SyncEngineStatus.LoadingSubDoc: - case SyncEngineStatus.LoadingRootDoc: - return 'Syncing with AFFiNE Cloud'; - case SyncEngineStatus.Retrying: - return 'Sync disconnected due to unexpected issues, reconnecting.'; - default: - return 'Synced with AFFiNE Cloud'; + if (!syncEngineStatus || syncEngineStatus.step === SyncEngineStep.Syncing) { + return 'Syncing with AFFiNE Cloud'; } - }, [currentWorkspace.flavour, syncEngineStatus, isOnline]); + if (syncEngineStatus.retrying) { + return 'Sync disconnected due to unexpected issues, reconnecting.'; + } + return 'Synced with AFFiNE Cloud'; + }, [currentWorkspace.flavour, isOnline, syncEngineStatus]); const CloudWorkspaceSyncStatus = useCallback(() => { - if ( - syncEngineStatus === SyncEngineStatus.Syncing || - syncEngineStatus === SyncEngineStatus.LoadingSubDoc || - syncEngineStatus === SyncEngineStatus.LoadingRootDoc - ) { + if (!syncEngineStatus || syncEngineStatus.step === SyncEngineStep.Syncing) { return SyncingWorkspaceStatus(); - } else if (syncEngineStatus === SyncEngineStatus.Retrying) { + } else if (syncEngineStatus.retrying) { return UnSyncWorkspaceStatus(); } else { return CloudWorkspaceStatus(); diff --git a/packages/frontend/core/src/pages/workspace/detail-page.tsx b/packages/frontend/core/src/pages/workspace/detail-page.tsx index 41badc2849..eab4a53335 100644 --- a/packages/frontend/core/src/pages/workspace/detail-page.tsx +++ b/packages/frontend/core/src/pages/workspace/detail-page.tsx @@ -5,7 +5,7 @@ import { } from '@affine/component/page-list'; import { WorkspaceSubPath } from '@affine/env/workspace'; import { globalBlockSuiteSchema } from '@affine/workspace/manager'; -import { SyncEngineStatus } from '@affine/workspace/providers'; +import { SyncEngineStep } from '@affine/workspace/providers'; import type { EditorContainer } from '@blocksuite/editor'; import { assertExists } from '@blocksuite/global/utils'; import type { Page } from '@blocksuite/store'; @@ -144,7 +144,7 @@ export const DetailPage = (): ReactElement => { // if sync engine has been synced and the page is null, wait 1s and jump to 404 page. useEffect(() => { - if (currentSyncEngineStatus === SyncEngineStatus.Synced && !page) { + if (currentSyncEngineStatus?.step === SyncEngineStep.Synced && !page) { const timeout = setTimeout(() => { navigate.jumpTo404(); }, 1000); diff --git a/packages/frontend/workspace/src/providers/sync/__tests__/engine.spec.ts b/packages/frontend/workspace/src/providers/sync/__tests__/engine.spec.ts new file mode 100644 index 0000000000..6e9028caa5 --- /dev/null +++ b/packages/frontend/workspace/src/providers/sync/__tests__/engine.spec.ts @@ -0,0 +1,172 @@ +import 'fake-indexeddb/auto'; + +import { setTimeout } from 'node:timers/promises'; + +import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; +import { Schema, Workspace } from '@blocksuite/store'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; +import { Doc } from 'yjs'; + +import { createIndexedDBStorage } from '../../storage'; +import { SyncEngine, SyncEngineStep, SyncPeerStep } from '../'; +import { createTestStorage } from './test-storage'; + +const schema = new Schema(); + +schema.register(AffineSchemas).register(__unstableSchemas); + +beforeEach(() => { + vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); +}); + +describe('SyncEngine', () => { + test('basic - indexeddb', async () => { + let prev: any; + { + const workspace = new Workspace({ + id: 'test', + isSSR: true, + schema, + }); + + const syncEngine = new SyncEngine( + workspace.doc, + createIndexedDBStorage(workspace.doc.guid), + [ + createIndexedDBStorage(workspace.doc.guid + '1'), + createIndexedDBStorage(workspace.doc.guid + '2'), + ] + ); + syncEngine.start(); + + const page = workspace.createPage({ + id: 'page0', + }); + await page.load(); + const pageBlockId = page.addBlock('affine:page', { + title: new page.Text(''), + }); + page.addBlock('affine:surface', {}, pageBlockId); + const frameId = page.addBlock('affine:note', {}, pageBlockId); + page.addBlock('affine:paragraph', {}, frameId); + await syncEngine.waitForSynced(); + syncEngine.stop(); + prev = workspace.doc.toJSON(); + } + + { + const workspace = new Workspace({ + id: 'test', + isSSR: true, + schema, + }); + const syncEngine = new SyncEngine( + workspace.doc, + createIndexedDBStorage(workspace.doc.guid), + [] + ); + syncEngine.start(); + await syncEngine.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncEngine.stop(); + } + + { + const workspace = new Workspace({ + id: 'test', + isSSR: true, + schema, + }); + const syncEngine = new SyncEngine( + workspace.doc, + createIndexedDBStorage(workspace.doc.guid + '1'), + [] + ); + syncEngine.start(); + await syncEngine.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncEngine.stop(); + } + + { + const workspace = new Workspace({ + id: 'test', + isSSR: true, + schema, + }); + const syncEngine = new SyncEngine( + workspace.doc, + createIndexedDBStorage(workspace.doc.guid + '2'), + [] + ); + syncEngine.start(); + await syncEngine.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncEngine.stop(); + } + }); + + test('status', async () => { + const ydoc = new Doc({ guid: 'test - status' }); + + const localStorage = createTestStorage(createIndexedDBStorage(ydoc.guid)); + const remoteStorage = createTestStorage(createIndexedDBStorage(ydoc.guid)); + + localStorage.pausePull(); + localStorage.pausePush(); + remoteStorage.pausePull(); + remoteStorage.pausePush(); + + const syncEngine = new SyncEngine(ydoc, localStorage, [remoteStorage]); + expect(syncEngine.status.step).toEqual(SyncEngineStep.Stopped); + + syncEngine.start(); + await setTimeout(100); + + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.LoadingRootDoc); + + localStorage.resumePull(); + await setTimeout(100); + + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual( + SyncPeerStep.LoadingRootDoc + ); + + remoteStorage.resumePull(); + await setTimeout(100); + + expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced); + + ydoc.getArray('test').insert(0, [1, 2, 3]); + await setTimeout(100); + + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Syncing); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing); + + localStorage.resumePush(); + await setTimeout(100); + + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing); + + remoteStorage.resumePush(); + await setTimeout(100); + + expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced); + }); +}); diff --git a/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts b/packages/frontend/workspace/src/providers/sync/__tests__/peer.spec.ts similarity index 98% rename from packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts rename to packages/frontend/workspace/src/providers/sync/__tests__/peer.spec.ts index ad6be44720..db605a89a4 100644 --- a/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts +++ b/packages/frontend/workspace/src/providers/sync/__tests__/peer.spec.ts @@ -15,7 +15,7 @@ beforeEach(() => { vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); }); -describe('sync', () => { +describe('SyncPeer', () => { test('basic - indexeddb', async () => { let prev: any; { diff --git a/packages/frontend/workspace/src/providers/sync/__tests__/test-storage.ts b/packages/frontend/workspace/src/providers/sync/__tests__/test-storage.ts new file mode 100644 index 0000000000..ab5390e0e8 --- /dev/null +++ b/packages/frontend/workspace/src/providers/sync/__tests__/test-storage.ts @@ -0,0 +1,42 @@ +import type { Storage } from '../../storage'; + +export function createTestStorage(origin: Storage) { + const controler = { + pausedPull: Promise.resolve(), + resumePull: () => {}, + pausedPush: Promise.resolve(), + resumePush: () => {}, + }; + + return { + name: `${origin.name}(testing)`, + pull(docId: string, state: Uint8Array) { + return controler.pausedPull.then(() => origin.pull(docId, state)); + }, + push(docId: string, data: Uint8Array) { + return controler.pausedPush.then(() => origin.push(docId, data)); + }, + subscribe( + cb: (docId: string, data: Uint8Array) => void, + disconnect: (reason: string) => void + ) { + return origin.subscribe(cb, disconnect); + }, + pausePull() { + controler.pausedPull = new Promise(resolve => { + controler.resumePull = resolve; + }); + }, + resumePull() { + controler.resumePull?.(); + }, + pausePush() { + controler.pausedPush = new Promise(resolve => { + controler.resumePush = resolve; + }); + }, + resumePush() { + controler.resumePush?.(); + }, + }; +} diff --git a/packages/frontend/workspace/src/providers/sync/engine.ts b/packages/frontend/workspace/src/providers/sync/engine.ts index 015d4c9ec9..ae723708c6 100644 --- a/packages/frontend/workspace/src/providers/sync/engine.ts +++ b/packages/frontend/workspace/src/providers/sync/engine.ts @@ -3,13 +3,27 @@ import { Slot } from '@blocksuite/global/utils'; import type { Doc } from 'yjs'; import type { Storage } from '../storage'; -import { SyncPeer, SyncPeerStep } from './peer'; +import { SyncPeer, type SyncPeerStatus, SyncPeerStep } from './peer'; export const MANUALLY_STOP = 'manually-stop'; +export enum SyncEngineStep { + Stopped = 0, + Syncing = 1, + Synced = 2, +} + +export interface SyncEngineStatus { + step: SyncEngineStep; + local: SyncPeerStatus | null; + remotes: (SyncPeerStatus | null)[]; + retrying: boolean; +} + /** * # SyncEngine * + * ``` * ┌────────────┐ * │ SyncEngine │ * └─────┬──────┘ @@ -25,6 +39,7 @@ export const MANUALLY_STOP = 'manually-stop'; * │ SyncPeer │ │ SyncPeer │ │ SyncPeer │ * │ Remote │ │ Remote │ │ Remote │ * └────────────┘ └────────────┘ └────────────┘ + * ``` * * Sync engine manage sync peers * @@ -34,29 +49,18 @@ export const MANUALLY_STOP = 'manually-stop'; * 3. start remote sync * 4. continuously sync local and remote */ -export enum SyncEngineStatus { - Stopped = 0, - Retrying = 1, - LoadingRootDoc = 2, - LoadingSubDoc = 3, - Syncing = 4, - Synced = 5, -} - export class SyncEngine { get rootDocId() { return this.rootDoc.guid; } logger = new DebugLogger('affine:sync-engine:' + this.rootDocId); - private _status = SyncEngineStatus.Stopped; + private _status: SyncEngineStatus; onStatusChange = new Slot(); private set status(s: SyncEngineStatus) { - if (s !== this._status) { - this.logger.info('status change', SyncEngineStatus[s]); - this._status = s; - this.onStatusChange.emit(s); - } + this.logger.info('status change', SyncEngineStep[s.step]); + this._status = s; + this.onStatusChange.emit(s); } get status() { @@ -69,15 +73,21 @@ export class SyncEngine { private rootDoc: Doc, private local: Storage, private remotes: Storage[] - ) {} + ) { + this._status = { + step: SyncEngineStep.Stopped, + local: null, + remotes: remotes.map(() => null), + retrying: false, + }; + } start() { - if (this.status !== SyncEngineStatus.Stopped) { + if (this.status.step !== SyncEngineStep.Stopped) { this.stop(); } this.abort = new AbortController(); - this.status = SyncEngineStatus.LoadingRootDoc; this.sync(this.abort.signal).catch(err => { // should never reach here this.logger.error(err); @@ -86,37 +96,54 @@ export class SyncEngine { stop() { this.abort.abort(MANUALLY_STOP); - this.status = SyncEngineStatus.Stopped; + this._status = { + step: SyncEngineStep.Stopped, + local: null, + remotes: this.remotes.map(() => null), + retrying: false, + }; } // main sync process, should never return until abort async sync(signal: AbortSignal) { - let localPeer: SyncPeer | null = null; - const remotePeers: SyncPeer[] = []; + const state: { + localPeer: SyncPeer | null; + remotePeers: (SyncPeer | null)[]; + } = { + localPeer: null, + remotePeers: this.remotes.map(() => null), + }; + const cleanUp: (() => void)[] = []; try { // Step 1: start local sync peer - localPeer = new SyncPeer(this.rootDoc, this.local); + state.localPeer = new SyncPeer(this.rootDoc, this.local); - // Step 2: wait for local sync complete - await localPeer.waitForLoaded(signal); - - // Step 3: start remote sync peer - remotePeers.push( - ...this.remotes.map(remote => new SyncPeer(this.rootDoc, remote)) + cleanUp.push( + state.localPeer.onStatusChange.on(() => { + if (!signal.aborted) + this.updateSyncingState(state.localPeer, state.remotePeers); + }).dispose ); - const peers = [localPeer, ...remotePeers]; + this.updateSyncingState(state.localPeer, state.remotePeers); - this.updateSyncingState(peers); + // Step 2: wait for local sync complete + await state.localPeer.waitForLoaded(signal); - for (const peer of peers) { + // Step 3: start remote sync peer + state.remotePeers = this.remotes.map(remote => { + const peer = new SyncPeer(this.rootDoc, remote); cleanUp.push( peer.onStatusChange.on(() => { - if (!signal.aborted) this.updateSyncingState(peers); + if (!signal.aborted) + this.updateSyncingState(state.localPeer, state.remotePeers); }).dispose ); - } + return peer; + }); + + this.updateSyncingState(state.localPeer, state.remotePeers); // Step 4: continuously sync local and remote @@ -136,9 +163,9 @@ export class SyncEngine { throw error; } finally { // stop peers - localPeer?.stop(); - for (const remotePeer of remotePeers) { - remotePeer.stop(); + state.localPeer?.stop(); + for (const remotePeer of state.remotePeers) { + remotePeer?.stop(); } for (const clean of cleanUp) { clean(); @@ -146,43 +173,33 @@ export class SyncEngine { } } - updateSyncingState(peers: SyncPeer[]) { - let status = SyncEngineStatus.Synced; - for (const peer of peers) { - if (peer.status.step !== SyncPeerStep.Synced) { - status = SyncEngineStatus.Syncing; + updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) { + let step = SyncEngineStep.Synced; + const allPeer = [local, ...remotes]; + for (const peer of allPeer) { + if (!peer || peer.status.step !== SyncPeerStep.Synced) { + step = SyncEngineStep.Syncing; break; } } - for (const peer of peers) { - if (peer.status.step === SyncPeerStep.LoadingSubDoc) { - status = SyncEngineStatus.LoadingSubDoc; - break; - } - } - for (const peer of peers) { - if (peer.status.step === SyncPeerStep.LoadingRootDoc) { - status = SyncEngineStatus.LoadingRootDoc; - break; - } - } - for (const peer of peers) { - if (peer.status.step === SyncPeerStep.Retrying) { - status = SyncEngineStatus.Retrying; - break; - } - } - this.status = status; + this.status = { + step, + local: local?.status ?? null, + remotes: remotes.map(peer => peer?.status ?? null), + retrying: allPeer.some( + peer => peer?.status.step === SyncPeerStep.Retrying + ), + }; } async waitForSynced(abort?: AbortSignal) { - if (this.status == SyncEngineStatus.Synced) { + if (this.status.step == SyncEngineStep.Synced) { return; } else { return Promise.race([ new Promise(resolve => { this.onStatusChange.on(status => { - if (status == SyncEngineStatus.Synced) { + if (status.step == SyncEngineStep.Synced) { resolve(); } }); @@ -200,13 +217,18 @@ export class SyncEngine { } async waitForLoadedRootDoc(abort?: AbortSignal) { - if (this.status > SyncEngineStatus.LoadingRootDoc) { + function isLoadedRootDoc(status: SyncEngineStatus) { + return ![status.local, ...status.remotes].some( + peer => !peer || peer.step <= SyncPeerStep.LoadingRootDoc + ); + } + if (isLoadedRootDoc(this.status)) { return; } else { return Promise.race([ new Promise(resolve => { this.onStatusChange.on(status => { - if (status > SyncEngineStatus.LoadingRootDoc) { + if (isLoadedRootDoc(status)) { resolve(); } });