feat(workspace): more status for SyncEngine (#4984)

This commit is contained in:
EYHN
2023-11-20 22:51:20 +08:00
committed by GitHub
parent c9f1fd9649
commit 9370110cdc
6 changed files with 320 additions and 89 deletions

View File

@@ -1,5 +1,8 @@
import { WorkspaceFlavour } from '@affine/env/workspace';
import { SyncEngineStatus } from '@affine/workspace/providers';
import {
type SyncEngineStatus,
SyncEngineStep,
} from '@affine/workspace/providers';
import {
CloudWorkspaceIcon,
LocalWorkspaceIcon,
@@ -86,14 +89,13 @@ const WorkspaceStatus = ({
}) => {
const isOnline = useSystemOnline();
const [syncEngineStatus, setSyncEngineStatus] = useState<SyncEngineStatus>(
SyncEngineStatus.Synced
);
const [syncEngineStatus, setSyncEngineStatus] =
useState<SyncEngineStatus | null>(null);
const syncEngine = useCurrentSyncEngine();
useEffect(() => {
setSyncEngineStatus(syncEngine?.status ?? SyncEngineStatus.Synced);
setSyncEngineStatus(syncEngine?.status ?? null);
const disposable = syncEngine?.onStatusChange.on(
debounce(status => {
setSyncEngineStatus(status);
@@ -112,26 +114,19 @@ const WorkspaceStatus = ({
if (!isOnline) {
return 'Disconnected, please check your network connection';
}
switch (syncEngineStatus) {
case SyncEngineStatus.Syncing:
case SyncEngineStatus.LoadingSubDoc:
case SyncEngineStatus.LoadingRootDoc:
return 'Syncing with AFFiNE Cloud';
case SyncEngineStatus.Retrying:
return 'Sync disconnected due to unexpected issues, reconnecting.';
default:
return 'Synced with AFFiNE Cloud';
if (!syncEngineStatus || syncEngineStatus.step === SyncEngineStep.Syncing) {
return 'Syncing with AFFiNE Cloud';
}
}, [currentWorkspace.flavour, syncEngineStatus, isOnline]);
if (syncEngineStatus.retrying) {
return 'Sync disconnected due to unexpected issues, reconnecting.';
}
return 'Synced with AFFiNE Cloud';
}, [currentWorkspace.flavour, isOnline, syncEngineStatus]);
const CloudWorkspaceSyncStatus = useCallback(() => {
if (
syncEngineStatus === SyncEngineStatus.Syncing ||
syncEngineStatus === SyncEngineStatus.LoadingSubDoc ||
syncEngineStatus === SyncEngineStatus.LoadingRootDoc
) {
if (!syncEngineStatus || syncEngineStatus.step === SyncEngineStep.Syncing) {
return SyncingWorkspaceStatus();
} else if (syncEngineStatus === SyncEngineStatus.Retrying) {
} else if (syncEngineStatus.retrying) {
return UnSyncWorkspaceStatus();
} else {
return CloudWorkspaceStatus();

View File

@@ -5,7 +5,7 @@ import {
} from '@affine/component/page-list';
import { WorkspaceSubPath } from '@affine/env/workspace';
import { globalBlockSuiteSchema } from '@affine/workspace/manager';
import { SyncEngineStatus } from '@affine/workspace/providers';
import { SyncEngineStep } from '@affine/workspace/providers';
import type { EditorContainer } from '@blocksuite/editor';
import { assertExists } from '@blocksuite/global/utils';
import type { Page } from '@blocksuite/store';
@@ -144,7 +144,7 @@ export const DetailPage = (): ReactElement => {
// if sync engine has been synced and the page is null, wait 1s and jump to 404 page.
useEffect(() => {
if (currentSyncEngineStatus === SyncEngineStatus.Synced && !page) {
if (currentSyncEngineStatus?.step === SyncEngineStep.Synced && !page) {
const timeout = setTimeout(() => {
navigate.jumpTo404();
}, 1000);

View File

@@ -0,0 +1,172 @@
import 'fake-indexeddb/auto';
import { setTimeout } from 'node:timers/promises';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema, Workspace } from '@blocksuite/store';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { Doc } from 'yjs';
import { createIndexedDBStorage } from '../../storage';
import { SyncEngine, SyncEngineStep, SyncPeerStep } from '../';
import { createTestStorage } from './test-storage';
const schema = new Schema();
schema.register(AffineSchemas).register(__unstableSchemas);
beforeEach(() => {
vi.useFakeTimers({ toFake: ['requestIdleCallback'] });
});
describe('SyncEngine', () => {
test('basic - indexeddb', async () => {
let prev: any;
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const syncEngine = new SyncEngine(
workspace.doc,
createIndexedDBStorage(workspace.doc.guid),
[
createIndexedDBStorage(workspace.doc.guid + '1'),
createIndexedDBStorage(workspace.doc.guid + '2'),
]
);
syncEngine.start();
const page = workspace.createPage({
id: 'page0',
});
await page.load();
const pageBlockId = page.addBlock('affine:page', {
title: new page.Text(''),
});
page.addBlock('affine:surface', {}, pageBlockId);
const frameId = page.addBlock('affine:note', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
await syncEngine.waitForSynced();
syncEngine.stop();
prev = workspace.doc.toJSON();
}
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const syncEngine = new SyncEngine(
workspace.doc,
createIndexedDBStorage(workspace.doc.guid),
[]
);
syncEngine.start();
await syncEngine.waitForSynced();
expect(workspace.doc.toJSON()).toEqual({
...prev,
});
syncEngine.stop();
}
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const syncEngine = new SyncEngine(
workspace.doc,
createIndexedDBStorage(workspace.doc.guid + '1'),
[]
);
syncEngine.start();
await syncEngine.waitForSynced();
expect(workspace.doc.toJSON()).toEqual({
...prev,
});
syncEngine.stop();
}
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const syncEngine = new SyncEngine(
workspace.doc,
createIndexedDBStorage(workspace.doc.guid + '2'),
[]
);
syncEngine.start();
await syncEngine.waitForSynced();
expect(workspace.doc.toJSON()).toEqual({
...prev,
});
syncEngine.stop();
}
});
test('status', async () => {
const ydoc = new Doc({ guid: 'test - status' });
const localStorage = createTestStorage(createIndexedDBStorage(ydoc.guid));
const remoteStorage = createTestStorage(createIndexedDBStorage(ydoc.guid));
localStorage.pausePull();
localStorage.pausePush();
remoteStorage.pausePull();
remoteStorage.pausePush();
const syncEngine = new SyncEngine(ydoc, localStorage, [remoteStorage]);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Stopped);
syncEngine.start();
await setTimeout(100);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.LoadingRootDoc);
localStorage.resumePull();
await setTimeout(100);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(
SyncPeerStep.LoadingRootDoc
);
remoteStorage.resumePull();
await setTimeout(100);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced);
ydoc.getArray('test').insert(0, [1, 2, 3]);
await setTimeout(100);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Syncing);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing);
localStorage.resumePush();
await setTimeout(100);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing);
remoteStorage.resumePush();
await setTimeout(100);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced);
});
});

View File

@@ -15,7 +15,7 @@ beforeEach(() => {
vi.useFakeTimers({ toFake: ['requestIdleCallback'] });
});
describe('sync', () => {
describe('SyncPeer', () => {
test('basic - indexeddb', async () => {
let prev: any;
{

View File

@@ -0,0 +1,42 @@
import type { Storage } from '../../storage';
export function createTestStorage(origin: Storage) {
const controler = {
pausedPull: Promise.resolve(),
resumePull: () => {},
pausedPush: Promise.resolve(),
resumePush: () => {},
};
return {
name: `${origin.name}(testing)`,
pull(docId: string, state: Uint8Array) {
return controler.pausedPull.then(() => origin.pull(docId, state));
},
push(docId: string, data: Uint8Array) {
return controler.pausedPush.then(() => origin.push(docId, data));
},
subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
) {
return origin.subscribe(cb, disconnect);
},
pausePull() {
controler.pausedPull = new Promise(resolve => {
controler.resumePull = resolve;
});
},
resumePull() {
controler.resumePull?.();
},
pausePush() {
controler.pausedPush = new Promise(resolve => {
controler.resumePush = resolve;
});
},
resumePush() {
controler.resumePush?.();
},
};
}

View File

@@ -3,13 +3,27 @@ import { Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import type { Storage } from '../storage';
import { SyncPeer, SyncPeerStep } from './peer';
import { SyncPeer, type SyncPeerStatus, SyncPeerStep } from './peer';
export const MANUALLY_STOP = 'manually-stop';
export enum SyncEngineStep {
Stopped = 0,
Syncing = 1,
Synced = 2,
}
export interface SyncEngineStatus {
step: SyncEngineStep;
local: SyncPeerStatus | null;
remotes: (SyncPeerStatus | null)[];
retrying: boolean;
}
/**
* # SyncEngine
*
* ```
* ┌────────────┐
* │ SyncEngine │
* └─────┬──────┘
@@ -25,6 +39,7 @@ export const MANUALLY_STOP = 'manually-stop';
* │ SyncPeer │ │ SyncPeer │ │ SyncPeer │
* │ Remote │ │ Remote │ │ Remote │
* └────────────┘ └────────────┘ └────────────┘
* ```
*
* Sync engine manage sync peers
*
@@ -34,29 +49,18 @@ export const MANUALLY_STOP = 'manually-stop';
* 3. start remote sync
* 4. continuously sync local and remote
*/
export enum SyncEngineStatus {
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Syncing = 4,
Synced = 5,
}
export class SyncEngine {
get rootDocId() {
return this.rootDoc.guid;
}
logger = new DebugLogger('affine:sync-engine:' + this.rootDocId);
private _status = SyncEngineStatus.Stopped;
private _status: SyncEngineStatus;
onStatusChange = new Slot<SyncEngineStatus>();
private set status(s: SyncEngineStatus) {
if (s !== this._status) {
this.logger.info('status change', SyncEngineStatus[s]);
this._status = s;
this.onStatusChange.emit(s);
}
this.logger.info('status change', SyncEngineStep[s.step]);
this._status = s;
this.onStatusChange.emit(s);
}
get status() {
@@ -69,15 +73,21 @@ export class SyncEngine {
private rootDoc: Doc,
private local: Storage,
private remotes: Storage[]
) {}
) {
this._status = {
step: SyncEngineStep.Stopped,
local: null,
remotes: remotes.map(() => null),
retrying: false,
};
}
start() {
if (this.status !== SyncEngineStatus.Stopped) {
if (this.status.step !== SyncEngineStep.Stopped) {
this.stop();
}
this.abort = new AbortController();
this.status = SyncEngineStatus.LoadingRootDoc;
this.sync(this.abort.signal).catch(err => {
// should never reach here
this.logger.error(err);
@@ -86,37 +96,54 @@ export class SyncEngine {
stop() {
this.abort.abort(MANUALLY_STOP);
this.status = SyncEngineStatus.Stopped;
this._status = {
step: SyncEngineStep.Stopped,
local: null,
remotes: this.remotes.map(() => null),
retrying: false,
};
}
// main sync process, should never return until abort
async sync(signal: AbortSignal) {
let localPeer: SyncPeer | null = null;
const remotePeers: SyncPeer[] = [];
const state: {
localPeer: SyncPeer | null;
remotePeers: (SyncPeer | null)[];
} = {
localPeer: null,
remotePeers: this.remotes.map(() => null),
};
const cleanUp: (() => void)[] = [];
try {
// Step 1: start local sync peer
localPeer = new SyncPeer(this.rootDoc, this.local);
state.localPeer = new SyncPeer(this.rootDoc, this.local);
// Step 2: wait for local sync complete
await localPeer.waitForLoaded(signal);
// Step 3: start remote sync peer
remotePeers.push(
...this.remotes.map(remote => new SyncPeer(this.rootDoc, remote))
cleanUp.push(
state.localPeer.onStatusChange.on(() => {
if (!signal.aborted)
this.updateSyncingState(state.localPeer, state.remotePeers);
}).dispose
);
const peers = [localPeer, ...remotePeers];
this.updateSyncingState(state.localPeer, state.remotePeers);
this.updateSyncingState(peers);
// Step 2: wait for local sync complete
await state.localPeer.waitForLoaded(signal);
for (const peer of peers) {
// Step 3: start remote sync peer
state.remotePeers = this.remotes.map(remote => {
const peer = new SyncPeer(this.rootDoc, remote);
cleanUp.push(
peer.onStatusChange.on(() => {
if (!signal.aborted) this.updateSyncingState(peers);
if (!signal.aborted)
this.updateSyncingState(state.localPeer, state.remotePeers);
}).dispose
);
}
return peer;
});
this.updateSyncingState(state.localPeer, state.remotePeers);
// Step 4: continuously sync local and remote
@@ -136,9 +163,9 @@ export class SyncEngine {
throw error;
} finally {
// stop peers
localPeer?.stop();
for (const remotePeer of remotePeers) {
remotePeer.stop();
state.localPeer?.stop();
for (const remotePeer of state.remotePeers) {
remotePeer?.stop();
}
for (const clean of cleanUp) {
clean();
@@ -146,43 +173,33 @@ export class SyncEngine {
}
}
updateSyncingState(peers: SyncPeer[]) {
let status = SyncEngineStatus.Synced;
for (const peer of peers) {
if (peer.status.step !== SyncPeerStep.Synced) {
status = SyncEngineStatus.Syncing;
updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) {
let step = SyncEngineStep.Synced;
const allPeer = [local, ...remotes];
for (const peer of allPeer) {
if (!peer || peer.status.step !== SyncPeerStep.Synced) {
step = SyncEngineStep.Syncing;
break;
}
}
for (const peer of peers) {
if (peer.status.step === SyncPeerStep.LoadingSubDoc) {
status = SyncEngineStatus.LoadingSubDoc;
break;
}
}
for (const peer of peers) {
if (peer.status.step === SyncPeerStep.LoadingRootDoc) {
status = SyncEngineStatus.LoadingRootDoc;
break;
}
}
for (const peer of peers) {
if (peer.status.step === SyncPeerStep.Retrying) {
status = SyncEngineStatus.Retrying;
break;
}
}
this.status = status;
this.status = {
step,
local: local?.status ?? null,
remotes: remotes.map(peer => peer?.status ?? null),
retrying: allPeer.some(
peer => peer?.status.step === SyncPeerStep.Retrying
),
};
}
async waitForSynced(abort?: AbortSignal) {
if (this.status == SyncEngineStatus.Synced) {
if (this.status.step == SyncEngineStep.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status == SyncEngineStatus.Synced) {
if (status.step == SyncEngineStep.Synced) {
resolve();
}
});
@@ -200,13 +217,18 @@ export class SyncEngine {
}
async waitForLoadedRootDoc(abort?: AbortSignal) {
if (this.status > SyncEngineStatus.LoadingRootDoc) {
function isLoadedRootDoc(status: SyncEngineStatus) {
return ![status.local, ...status.remotes].some(
peer => !peer || peer.step <= SyncPeerStep.LoadingRootDoc
);
}
if (isLoadedRootDoc(this.status)) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status > SyncEngineStatus.LoadingRootDoc) {
if (isLoadedRootDoc(status)) {
resolve();
}
});