feat(workspace): more status for SyncPeer (#4983)

This commit is contained in:
EYHN
2023-11-20 20:37:12 +08:00
committed by GitHub
parent 70e71bd43e
commit c9f1fd9649
4 changed files with 141 additions and 65 deletions

View File

@@ -2,15 +2,19 @@ import 'fake-indexeddb/auto';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema, Workspace } from '@blocksuite/store';
import { describe, expect, test } from 'vitest';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { createIndexedDBStorage } from '../../storage';
import { SyncPeer } from '../';
import { SyncPeer, SyncPeerStep } from '../';
const schema = new Schema();
schema.register(AffineSchemas).register(__unstableSchemas);
beforeEach(() => {
vi.useFakeTimers({ toFake: ['requestIdleCallback'] });
});
describe('sync', () => {
test('basic - indexeddb', async () => {
let prev: any;
@@ -30,7 +34,7 @@ describe('sync', () => {
const page = workspace.createPage({
id: 'page0',
});
await page.waitForLoaded();
await page.load();
const pageBlockId = page.addBlock('affine:page', {
title: new page.Text(''),
});
@@ -59,4 +63,32 @@ describe('sync', () => {
syncPeer.stop();
}
});
test('status', async () => {
const workspace = new Workspace({
id: 'test - status',
isSSR: true,
schema,
});
const syncPeer = new SyncPeer(
workspace.doc,
createIndexedDBStorage(workspace.doc.guid)
);
expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingRootDoc);
await syncPeer.waitForSynced();
expect(syncPeer.status.step).toBe(SyncPeerStep.Synced);
const page = workspace.createPage({
id: 'page0',
});
expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingSubDoc);
await page.load();
await syncPeer.waitForSynced();
page.addBlock('affine:page', {
title: new page.Text(''),
});
expect(syncPeer.status.step).toBe(SyncPeerStep.Syncing);
syncPeer.stop();
});
});

View File

@@ -3,7 +3,7 @@ import { Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import type { Storage } from '../storage';
import { SyncPeer, SyncPeerStatus } from './peer';
import { SyncPeer, SyncPeerStep } from './peer';
export const MANUALLY_STOP = 'manually-stop';
@@ -149,25 +149,25 @@ export class SyncEngine {
updateSyncingState(peers: SyncPeer[]) {
let status = SyncEngineStatus.Synced;
for (const peer of peers) {
if (peer.status !== SyncPeerStatus.Synced) {
if (peer.status.step !== SyncPeerStep.Synced) {
status = SyncEngineStatus.Syncing;
break;
}
}
for (const peer of peers) {
if (peer.status === SyncPeerStatus.LoadingSubDoc) {
if (peer.status.step === SyncPeerStep.LoadingSubDoc) {
status = SyncEngineStatus.LoadingSubDoc;
break;
}
}
for (const peer of peers) {
if (peer.status === SyncPeerStatus.LoadingRootDoc) {
if (peer.status.step === SyncPeerStep.LoadingRootDoc) {
status = SyncEngineStatus.LoadingRootDoc;
break;
}
}
for (const peer of peers) {
if (peer.status === SyncPeerStatus.Retrying) {
if (peer.status.step === SyncPeerStep.Retrying) {
status = SyncEngineStatus.Retrying;
break;
}

View File

@@ -1,5 +1,6 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import { isEqual } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs';
@@ -8,10 +9,29 @@ import { AsyncQueue } from '../utils/async-queue';
import { throwIfAborted } from '../utils/throw-if-aborted';
import { MANUALLY_STOP } from './engine';
export enum SyncPeerStep {
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Loaded = 4.5,
Syncing = 5,
Synced = 6,
}
export interface SyncPeerStatus {
step: SyncPeerStep;
totalDocs: number;
loadedDocs: number;
pendingPullUpdates: number;
pendingPushUpdates: number;
}
/**
* # SyncPeer
* A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs.
*
* ```
* ┌─────┐
* │Start│
* └──┬──┘
@@ -27,23 +47,20 @@ import { MANUALLY_STOP } from './engine';
* ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐
* │queue├──────►apply updates◄───────┤queue│
* └─────┘ └─────────────┘ └─────┘
* ```
*
* listen: listen for updates from ydoc, typically from user modifications.
* subscribe: listen for updates from storage, typically from other users.
*
*/
export enum SyncPeerStatus {
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Loaded = 4.5,
Syncing = 5,
Synced = 6,
}
export class SyncPeer {
private _status = SyncPeerStatus.Stopped;
private _status: SyncPeerStatus = {
step: SyncPeerStep.LoadingRootDoc,
totalDocs: 1,
loadedDocs: 0,
pendingPullUpdates: 0,
pendingPushUpdates: 0,
};
onStatusChange = new Slot<SyncPeerStatus>();
abort = new AbortController();
get name() {
@@ -56,7 +73,6 @@ export class SyncPeer {
private storage: Storage
) {
this.logger.debug('peer start');
this.status = SyncPeerStatus.LoadingRootDoc;
this.syncRetryLoop(this.abort.signal).catch(err => {
// should not reach here
@@ -65,8 +81,8 @@ export class SyncPeer {
}
private set status(s: SyncPeerStatus) {
if (s !== this._status) {
this.logger.debug('status change', SyncPeerStatus[s]);
if (!isEqual(s, this._status)) {
this.logger.debug('status change', s);
this._status = s;
this.onStatusChange.emit(s);
}
@@ -102,7 +118,13 @@ export class SyncPeer {
}
try {
this.logger.error('retry after 5 seconds');
this.status = SyncPeerStatus.Retrying;
this.status = {
step: SyncPeerStep.Retrying,
totalDocs: 1,
loadedDocs: 0,
pendingPullUpdates: 0,
pendingPushUpdates: 0,
};
await Promise.race([
new Promise<void>(resolve => {
setTimeout(resolve, 5 * 1000);
@@ -134,22 +156,36 @@ export class SyncPeer {
docId: string;
data: Uint8Array;
}>;
pushingUpdate: boolean;
pullUpdatesQueue: AsyncQueue<{
docId: string;
data: Uint8Array;
}>;
subdocLoading: boolean;
subdocsLoadQueue: AsyncQueue<Doc>;
} = {
connectedDocs: new Map(),
pushUpdatesQueue: new AsyncQueue(),
pushingUpdate: false,
pullUpdatesQueue: new AsyncQueue(),
subdocLoading: false,
subdocsLoadQueue: new AsyncQueue(),
};
initState() {
this.state.connectedDocs.clear();
this.state.pushUpdatesQueue.clear();
this.state.pullUpdatesQueue.clear();
this.state.subdocsLoadQueue.clear();
this.state.pushingUpdate = false;
this.state.subdocLoading = false;
}
/**
* main synchronization logic
*/
async sync(abortOuter: AbortSignal) {
this.initState();
const abortInner = new AbortController();
abortOuter.addEventListener('abort', reason => {
@@ -158,6 +194,8 @@ export class SyncPeer {
let dispose: (() => void) | null = null;
try {
this.reportSyncStatus();
// start listen storage updates
dispose = await this.storage.subscribe(
this.handleStorageUpdates,
@@ -169,41 +207,29 @@ export class SyncPeer {
throwIfAborted(abortInner.signal);
// Step 1: load root doc
this.status = SyncPeerStatus.LoadingRootDoc;
await this.connectDoc(this.rootDoc, abortInner.signal);
this.status = SyncPeerStatus.LoadingSubDoc;
// Step 2: load subdocs
this.state.subdocsLoadQueue.push(
...Array.from(this.rootDoc.getSubdocs())
);
this.reportSyncStatus();
this.rootDoc.on('subdocs', this.handleSubdocsUpdate);
while (this.state.subdocsLoadQueue.length > 0) {
const subdoc = await this.state.subdocsLoadQueue.next(
abortInner.signal
);
await this.connectDoc(subdoc, abortInner.signal);
}
this.status = SyncPeerStatus.Syncing;
this.updateSyncStatus();
// Finally: start sync
await Promise.all([
// listen subdocs
// load subdocs
(async () => {
while (throwIfAborted(abortInner.signal)) {
const subdoc = await this.state.subdocsLoadQueue.next(
abortInner.signal
);
this.status = SyncPeerStatus.LoadingSubDoc;
this.state.subdocLoading = true;
this.reportSyncStatus();
await this.connectDoc(subdoc, abortInner.signal);
this.status = SyncPeerStatus.Syncing;
this.updateSyncStatus();
this.state.subdocLoading = false;
this.reportSyncStatus();
}
})(),
// pull updates
@@ -212,7 +238,6 @@ export class SyncPeer {
const { docId, data } = await this.state.pullUpdatesQueue.next(
abortInner.signal
);
this.updateSyncStatus();
// don't apply empty data or Uint8Array([0, 0])
if (
!(
@@ -225,6 +250,7 @@ export class SyncPeer {
applyUpdate(subdoc, data, this.name);
}
}
this.reportSyncStatus();
}
})(),
// push updates
@@ -233,6 +259,8 @@ export class SyncPeer {
const { docId, data } = await this.state.pushUpdatesQueue.next(
abortInner.signal
);
this.state.pushingUpdate = true;
this.reportSyncStatus();
// don't push empty data or Uint8Array([0, 0])
if (
@@ -244,7 +272,8 @@ export class SyncPeer {
await this.storage.push(docId, data);
}
this.updateSyncStatus();
this.state.pushingUpdate = false;
this.reportSyncStatus();
}
})(),
]);
@@ -279,11 +308,14 @@ export class SyncPeer {
// mark rootDoc as loaded
doc.emit('sync', [true]);
this.reportSyncStatus();
}
disconnectDoc(doc: Doc) {
doc.off('update', this.handleYDocUpdates);
this.state.connectedDocs.delete(doc.guid);
this.reportSyncStatus();
}
// handle updates from ydoc
@@ -296,7 +328,7 @@ export class SyncPeer {
docId: doc.guid,
data: update,
});
this.updateSyncStatus();
this.reportSyncStatus();
};
// handle subdocs changes, append new subdocs to queue, remove subdocs from queue
@@ -315,7 +347,7 @@ export class SyncPeer {
this.disconnectDoc(subdoc);
this.state.subdocsLoadQueue.remove(doc => doc === subdoc);
}
this.updateSyncStatus();
this.reportSyncStatus();
};
// handle updates from storage
@@ -324,37 +356,45 @@ export class SyncPeer {
docId,
data,
});
this.updateSyncStatus();
this.reportSyncStatus();
};
updateSyncStatus() {
// if status is not syncing, do nothing
if (this.status < SyncPeerStatus.Syncing) {
return;
}
if (
this.state.pushUpdatesQueue.length === 0 &&
this.state.pullUpdatesQueue.length === 0 &&
this.state.subdocsLoadQueue.length === 0
reportSyncStatus() {
let step;
if (this.state.connectedDocs.size === 0) {
step = SyncPeerStep.LoadingRootDoc;
} else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) {
step = SyncPeerStep.LoadingSubDoc;
} else if (
this.state.pullUpdatesQueue.length ||
this.state.pushUpdatesQueue.length ||
this.state.pushingUpdate
) {
if (this.status === SyncPeerStatus.Syncing) {
this.status = SyncPeerStatus.Synced;
}
step = SyncPeerStep.Syncing;
} else {
if (this.status === SyncPeerStatus.Synced) {
this.status = SyncPeerStatus.Syncing;
}
step = SyncPeerStep.Synced;
}
this.status = {
step: step,
totalDocs:
this.state.connectedDocs.size + this.state.subdocsLoadQueue.length,
loadedDocs: this.state.connectedDocs.size,
pendingPullUpdates:
this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0),
pendingPushUpdates:
this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0),
};
}
async waitForSynced(abort?: AbortSignal) {
if (this.status >= SyncPeerStatus.Synced) {
if (this.status.step >= SyncPeerStep.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status >= SyncPeerStatus.Synced) {
if (status.step >= SyncPeerStep.Synced) {
resolve();
}
});
@@ -372,13 +412,13 @@ export class SyncPeer {
}
async waitForLoaded(abort?: AbortSignal) {
if (this.status > SyncPeerStatus.Loaded) {
if (this.status.step > SyncPeerStep.Loaded) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status > SyncPeerStatus.Loaded) {
if (status.step > SyncPeerStep.Loaded) {
resolve();
}
});

View File

@@ -55,4 +55,8 @@ export class AsyncQueue<T> {
this._queue.splice(index, 1);
}
}
clear() {
this._queue = [];
}
}