From c9f1fd96492ecdbedcb12582d69a711db412c862 Mon Sep 17 00:00:00 2001 From: EYHN Date: Mon, 20 Nov 2023 20:37:12 +0800 Subject: [PATCH] feat(workspace): more status for SyncPeer (#4983) --- .../src/providers/sync/__tests__/sync.spec.ts | 38 ++++- .../workspace/src/providers/sync/engine.ts | 10 +- .../workspace/src/providers/sync/peer.ts | 154 +++++++++++------- .../src/providers/utils/async-queue.ts | 4 + 4 files changed, 141 insertions(+), 65 deletions(-) diff --git a/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts b/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts index 5f06e61949..ad6be44720 100644 --- a/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts +++ b/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts @@ -2,15 +2,19 @@ import 'fake-indexeddb/auto'; import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; import { Schema, Workspace } from '@blocksuite/store'; -import { describe, expect, test } from 'vitest'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; import { createIndexedDBStorage } from '../../storage'; -import { SyncPeer } from '../'; +import { SyncPeer, SyncPeerStep } from '../'; const schema = new Schema(); schema.register(AffineSchemas).register(__unstableSchemas); +beforeEach(() => { + vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); +}); + describe('sync', () => { test('basic - indexeddb', async () => { let prev: any; @@ -30,7 +34,7 @@ describe('sync', () => { const page = workspace.createPage({ id: 'page0', }); - await page.waitForLoaded(); + await page.load(); const pageBlockId = page.addBlock('affine:page', { title: new page.Text(''), }); @@ -59,4 +63,32 @@ describe('sync', () => { syncPeer.stop(); } }); + + test('status', async () => { + const workspace = new Workspace({ + id: 'test - status', + isSSR: true, + schema, + }); + + const syncPeer = new SyncPeer( + workspace.doc, + createIndexedDBStorage(workspace.doc.guid) + ); + expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingRootDoc); + await syncPeer.waitForSynced(); + expect(syncPeer.status.step).toBe(SyncPeerStep.Synced); + + const page = workspace.createPage({ + id: 'page0', + }); + expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingSubDoc); + await page.load(); + await syncPeer.waitForSynced(); + page.addBlock('affine:page', { + title: new page.Text(''), + }); + expect(syncPeer.status.step).toBe(SyncPeerStep.Syncing); + syncPeer.stop(); + }); }); diff --git a/packages/frontend/workspace/src/providers/sync/engine.ts b/packages/frontend/workspace/src/providers/sync/engine.ts index 432e4199a3..015d4c9ec9 100644 --- a/packages/frontend/workspace/src/providers/sync/engine.ts +++ b/packages/frontend/workspace/src/providers/sync/engine.ts @@ -3,7 +3,7 @@ import { Slot } from '@blocksuite/global/utils'; import type { Doc } from 'yjs'; import type { Storage } from '../storage'; -import { SyncPeer, SyncPeerStatus } from './peer'; +import { SyncPeer, SyncPeerStep } from './peer'; export const MANUALLY_STOP = 'manually-stop'; @@ -149,25 +149,25 @@ export class SyncEngine { updateSyncingState(peers: SyncPeer[]) { let status = SyncEngineStatus.Synced; for (const peer of peers) { - if (peer.status !== SyncPeerStatus.Synced) { + if (peer.status.step !== SyncPeerStep.Synced) { status = SyncEngineStatus.Syncing; break; } } for (const peer of peers) { - if (peer.status === SyncPeerStatus.LoadingSubDoc) { + if (peer.status.step === SyncPeerStep.LoadingSubDoc) { status = SyncEngineStatus.LoadingSubDoc; break; } } for (const peer of peers) { - if (peer.status === SyncPeerStatus.LoadingRootDoc) { + if (peer.status.step === SyncPeerStep.LoadingRootDoc) { status = SyncEngineStatus.LoadingRootDoc; break; } } for (const peer of peers) { - if (peer.status === SyncPeerStatus.Retrying) { + if (peer.status.step === SyncPeerStep.Retrying) { status = SyncEngineStatus.Retrying; break; } diff --git a/packages/frontend/workspace/src/providers/sync/peer.ts b/packages/frontend/workspace/src/providers/sync/peer.ts index 1a208b67d3..18f8493b4f 100644 --- a/packages/frontend/workspace/src/providers/sync/peer.ts +++ b/packages/frontend/workspace/src/providers/sync/peer.ts @@ -1,5 +1,6 @@ import { DebugLogger } from '@affine/debug'; import { Slot } from '@blocksuite/global/utils'; +import { isEqual } from '@blocksuite/global/utils'; import type { Doc } from 'yjs'; import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs'; @@ -8,10 +9,29 @@ import { AsyncQueue } from '../utils/async-queue'; import { throwIfAborted } from '../utils/throw-if-aborted'; import { MANUALLY_STOP } from './engine'; +export enum SyncPeerStep { + Stopped = 0, + Retrying = 1, + LoadingRootDoc = 2, + LoadingSubDoc = 3, + Loaded = 4.5, + Syncing = 5, + Synced = 6, +} + +export interface SyncPeerStatus { + step: SyncPeerStep; + totalDocs: number; + loadedDocs: number; + pendingPullUpdates: number; + pendingPushUpdates: number; +} + /** * # SyncPeer * A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs. * + * ``` * ┌─────┐ * │Start│ * └──┬──┘ @@ -27,23 +47,20 @@ import { MANUALLY_STOP } from './engine'; * ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐ * │queue├──────►apply updates◄───────┤queue│ * └─────┘ └─────────────┘ └─────┘ + * ``` * * listen: listen for updates from ydoc, typically from user modifications. * subscribe: listen for updates from storage, typically from other users. * */ -export enum SyncPeerStatus { - Stopped = 0, - Retrying = 1, - LoadingRootDoc = 2, - LoadingSubDoc = 3, - Loaded = 4.5, - Syncing = 5, - Synced = 6, -} - export class SyncPeer { - private _status = SyncPeerStatus.Stopped; + private _status: SyncPeerStatus = { + step: SyncPeerStep.LoadingRootDoc, + totalDocs: 1, + loadedDocs: 0, + pendingPullUpdates: 0, + pendingPushUpdates: 0, + }; onStatusChange = new Slot(); abort = new AbortController(); get name() { @@ -56,7 +73,6 @@ export class SyncPeer { private storage: Storage ) { this.logger.debug('peer start'); - this.status = SyncPeerStatus.LoadingRootDoc; this.syncRetryLoop(this.abort.signal).catch(err => { // should not reach here @@ -65,8 +81,8 @@ export class SyncPeer { } private set status(s: SyncPeerStatus) { - if (s !== this._status) { - this.logger.debug('status change', SyncPeerStatus[s]); + if (!isEqual(s, this._status)) { + this.logger.debug('status change', s); this._status = s; this.onStatusChange.emit(s); } @@ -102,7 +118,13 @@ export class SyncPeer { } try { this.logger.error('retry after 5 seconds'); - this.status = SyncPeerStatus.Retrying; + this.status = { + step: SyncPeerStep.Retrying, + totalDocs: 1, + loadedDocs: 0, + pendingPullUpdates: 0, + pendingPushUpdates: 0, + }; await Promise.race([ new Promise(resolve => { setTimeout(resolve, 5 * 1000); @@ -134,22 +156,36 @@ export class SyncPeer { docId: string; data: Uint8Array; }>; + pushingUpdate: boolean; pullUpdatesQueue: AsyncQueue<{ docId: string; data: Uint8Array; }>; + subdocLoading: boolean; subdocsLoadQueue: AsyncQueue; } = { connectedDocs: new Map(), pushUpdatesQueue: new AsyncQueue(), + pushingUpdate: false, pullUpdatesQueue: new AsyncQueue(), + subdocLoading: false, subdocsLoadQueue: new AsyncQueue(), }; + initState() { + this.state.connectedDocs.clear(); + this.state.pushUpdatesQueue.clear(); + this.state.pullUpdatesQueue.clear(); + this.state.subdocsLoadQueue.clear(); + this.state.pushingUpdate = false; + this.state.subdocLoading = false; + } + /** * main synchronization logic */ async sync(abortOuter: AbortSignal) { + this.initState(); const abortInner = new AbortController(); abortOuter.addEventListener('abort', reason => { @@ -158,6 +194,8 @@ export class SyncPeer { let dispose: (() => void) | null = null; try { + this.reportSyncStatus(); + // start listen storage updates dispose = await this.storage.subscribe( this.handleStorageUpdates, @@ -169,41 +207,29 @@ export class SyncPeer { throwIfAborted(abortInner.signal); // Step 1: load root doc - this.status = SyncPeerStatus.LoadingRootDoc; - await this.connectDoc(this.rootDoc, abortInner.signal); - this.status = SyncPeerStatus.LoadingSubDoc; - // Step 2: load subdocs this.state.subdocsLoadQueue.push( ...Array.from(this.rootDoc.getSubdocs()) ); + this.reportSyncStatus(); this.rootDoc.on('subdocs', this.handleSubdocsUpdate); - while (this.state.subdocsLoadQueue.length > 0) { - const subdoc = await this.state.subdocsLoadQueue.next( - abortInner.signal - ); - await this.connectDoc(subdoc, abortInner.signal); - } - - this.status = SyncPeerStatus.Syncing; - this.updateSyncStatus(); - // Finally: start sync await Promise.all([ - // listen subdocs + // load subdocs (async () => { while (throwIfAborted(abortInner.signal)) { const subdoc = await this.state.subdocsLoadQueue.next( abortInner.signal ); - this.status = SyncPeerStatus.LoadingSubDoc; + this.state.subdocLoading = true; + this.reportSyncStatus(); await this.connectDoc(subdoc, abortInner.signal); - this.status = SyncPeerStatus.Syncing; - this.updateSyncStatus(); + this.state.subdocLoading = false; + this.reportSyncStatus(); } })(), // pull updates @@ -212,7 +238,6 @@ export class SyncPeer { const { docId, data } = await this.state.pullUpdatesQueue.next( abortInner.signal ); - this.updateSyncStatus(); // don't apply empty data or Uint8Array([0, 0]) if ( !( @@ -225,6 +250,7 @@ export class SyncPeer { applyUpdate(subdoc, data, this.name); } } + this.reportSyncStatus(); } })(), // push updates @@ -233,6 +259,8 @@ export class SyncPeer { const { docId, data } = await this.state.pushUpdatesQueue.next( abortInner.signal ); + this.state.pushingUpdate = true; + this.reportSyncStatus(); // don't push empty data or Uint8Array([0, 0]) if ( @@ -244,7 +272,8 @@ export class SyncPeer { await this.storage.push(docId, data); } - this.updateSyncStatus(); + this.state.pushingUpdate = false; + this.reportSyncStatus(); } })(), ]); @@ -279,11 +308,14 @@ export class SyncPeer { // mark rootDoc as loaded doc.emit('sync', [true]); + + this.reportSyncStatus(); } disconnectDoc(doc: Doc) { doc.off('update', this.handleYDocUpdates); this.state.connectedDocs.delete(doc.guid); + this.reportSyncStatus(); } // handle updates from ydoc @@ -296,7 +328,7 @@ export class SyncPeer { docId: doc.guid, data: update, }); - this.updateSyncStatus(); + this.reportSyncStatus(); }; // handle subdocs changes, append new subdocs to queue, remove subdocs from queue @@ -315,7 +347,7 @@ export class SyncPeer { this.disconnectDoc(subdoc); this.state.subdocsLoadQueue.remove(doc => doc === subdoc); } - this.updateSyncStatus(); + this.reportSyncStatus(); }; // handle updates from storage @@ -324,37 +356,45 @@ export class SyncPeer { docId, data, }); - this.updateSyncStatus(); + this.reportSyncStatus(); }; - updateSyncStatus() { - // if status is not syncing, do nothing - if (this.status < SyncPeerStatus.Syncing) { - return; - } - if ( - this.state.pushUpdatesQueue.length === 0 && - this.state.pullUpdatesQueue.length === 0 && - this.state.subdocsLoadQueue.length === 0 + reportSyncStatus() { + let step; + if (this.state.connectedDocs.size === 0) { + step = SyncPeerStep.LoadingRootDoc; + } else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) { + step = SyncPeerStep.LoadingSubDoc; + } else if ( + this.state.pullUpdatesQueue.length || + this.state.pushUpdatesQueue.length || + this.state.pushingUpdate ) { - if (this.status === SyncPeerStatus.Syncing) { - this.status = SyncPeerStatus.Synced; - } + step = SyncPeerStep.Syncing; } else { - if (this.status === SyncPeerStatus.Synced) { - this.status = SyncPeerStatus.Syncing; - } + step = SyncPeerStep.Synced; } + + this.status = { + step: step, + totalDocs: + this.state.connectedDocs.size + this.state.subdocsLoadQueue.length, + loadedDocs: this.state.connectedDocs.size, + pendingPullUpdates: + this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0), + pendingPushUpdates: + this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0), + }; } async waitForSynced(abort?: AbortSignal) { - if (this.status >= SyncPeerStatus.Synced) { + if (this.status.step >= SyncPeerStep.Synced) { return; } else { return Promise.race([ new Promise(resolve => { this.onStatusChange.on(status => { - if (status >= SyncPeerStatus.Synced) { + if (status.step >= SyncPeerStep.Synced) { resolve(); } }); @@ -372,13 +412,13 @@ export class SyncPeer { } async waitForLoaded(abort?: AbortSignal) { - if (this.status > SyncPeerStatus.Loaded) { + if (this.status.step > SyncPeerStep.Loaded) { return; } else { return Promise.race([ new Promise(resolve => { this.onStatusChange.on(status => { - if (status > SyncPeerStatus.Loaded) { + if (status.step > SyncPeerStep.Loaded) { resolve(); } }); diff --git a/packages/frontend/workspace/src/providers/utils/async-queue.ts b/packages/frontend/workspace/src/providers/utils/async-queue.ts index d0002332b9..8b146a4f07 100644 --- a/packages/frontend/workspace/src/providers/utils/async-queue.ts +++ b/packages/frontend/workspace/src/providers/utils/async-queue.ts @@ -55,4 +55,8 @@ export class AsyncQueue { this._queue.splice(index, 1); } } + + clear() { + this._queue = []; + } }