From 7878ce5c2c5b974afd464b0c9210cb8d1aaaf7af Mon Sep 17 00:00:00 2001 From: EYHN Date: Mon, 4 Dec 2023 11:32:10 +0000 Subject: [PATCH] feat(workspace): priority load opened page (#5156) --- .../core/src/pages/workspace/detail-page.tsx | 14 ++++- .../workspace/src/providers/sync/engine.ts | 15 +++++- .../workspace/src/providers/sync/peer.ts | 52 ++++++++++--------- .../src/providers/utils/async-queue.ts | 41 +++++++++++++-- tests/affine-migration/e2e/basic.spec.ts | 2 + 5 files changed, 93 insertions(+), 31 deletions(-) diff --git a/packages/frontend/core/src/pages/workspace/detail-page.tsx b/packages/frontend/core/src/pages/workspace/detail-page.tsx index bc8cd4996c..10ff6ebdff 100644 --- a/packages/frontend/core/src/pages/workspace/detail-page.tsx +++ b/packages/frontend/core/src/pages/workspace/detail-page.tsx @@ -27,7 +27,10 @@ import { AffineErrorBoundary } from '../../components/affine/affine-error-bounda import { GlobalPageHistoryModal } from '../../components/affine/page-history-modal'; import { WorkspaceHeader } from '../../components/workspace-header'; import { useRegisterBlocksuiteEditorCommands } from '../../hooks/affine/use-register-blocksuite-editor-commands'; -import { useCurrentSyncEngineStatus } from '../../hooks/current/use-current-sync-engine'; +import { + useCurrentSyncEngine, + useCurrentSyncEngineStatus, +} from '../../hooks/current/use-current-sync-engine'; import { useCurrentWorkspace } from '../../hooks/current/use-current-workspace'; import { useNavigateHelper } from '../../hooks/use-navigate-helper'; import { performanceRenderLogger } from '../../shared'; @@ -111,9 +114,18 @@ const DetailPageImpl = (): ReactElement => { export const DetailPage = (): ReactElement => { const [currentWorkspace] = useCurrentWorkspace(); const currentSyncEngineStatus = useCurrentSyncEngineStatus(); + const currentSyncEngine = useCurrentSyncEngine(); const currentPageId = useAtomValue(currentPageIdAtom); const [page, setPage] = useState(null); + // set sync engine priority target + useEffect(() => { + if (!currentPageId) { + return; + } + currentSyncEngine?.setPriorityRule(id => id.endsWith(currentPageId)); + }, [currentPageId, currentSyncEngine, currentWorkspace]); + // load page by current page id useEffect(() => { if (!currentPageId) { diff --git a/packages/frontend/workspace/src/providers/sync/engine.ts b/packages/frontend/workspace/src/providers/sync/engine.ts index fca3e2c74a..6acfee83f0 100644 --- a/packages/frontend/workspace/src/providers/sync/engine.ts +++ b/packages/frontend/workspace/src/providers/sync/engine.ts @@ -3,6 +3,7 @@ import { Slot } from '@blocksuite/global/utils'; import type { Doc } from 'yjs'; import type { Storage } from '../storage'; +import { SharedPriorityTarget } from '../utils/async-queue'; import { MANUALLY_STOP, SyncEngineStep } from './consts'; import { SyncPeer, type SyncPeerStatus, SyncPeerStep } from './peer'; @@ -56,6 +57,8 @@ export class SyncEngine { this.onStatusChange.emit(s); } + priorityTarget = new SharedPriorityTarget(); + get status() { return this._status; } @@ -110,7 +113,11 @@ export class SyncEngine { const cleanUp: (() => void)[] = []; try { // Step 1: start local sync peer - state.localPeer = new SyncPeer(this.rootDoc, this.local); + state.localPeer = new SyncPeer( + this.rootDoc, + this.local, + this.priorityTarget + ); cleanUp.push( state.localPeer.onStatusChange.on(() => { @@ -126,7 +133,7 @@ export class SyncEngine { // Step 3: start remote sync peer state.remotePeers = this.remotes.map(remote => { - const peer = new SyncPeer(this.rootDoc, remote); + const peer = new SyncPeer(this.rootDoc, remote, this.priorityTarget); cleanUp.push( peer.onStatusChange.on(() => { if (!signal.aborted) @@ -237,4 +244,8 @@ export class SyncEngine { ]); } } + + setPriorityRule(target: ((id: string) => boolean) | null) { + this.priorityTarget.priorityRule = target; + } } diff --git a/packages/frontend/workspace/src/providers/sync/peer.ts b/packages/frontend/workspace/src/providers/sync/peer.ts index 5597bb8469..6a1955c4d7 100644 --- a/packages/frontend/workspace/src/providers/sync/peer.ts +++ b/packages/frontend/workspace/src/providers/sync/peer.ts @@ -5,7 +5,7 @@ import type { Doc } from 'yjs'; import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs'; import { mergeUpdates, type Storage } from '../storage'; -import { AsyncQueue } from '../utils/async-queue'; +import { PriorityAsyncQueue, SharedPriorityTarget } from '../utils/async-queue'; import { throwIfAborted } from '../utils/throw-if-aborted'; import { MANUALLY_STOP } from './consts'; @@ -70,7 +70,8 @@ export class SyncPeer { constructor( private readonly rootDoc: Doc, - private readonly storage: Storage + private readonly storage: Storage, + private readonly priorityTarget = new SharedPriorityTarget() ) { this.logger.debug('peer start'); @@ -152,24 +153,24 @@ export class SyncPeer { private readonly state: { connectedDocs: Map; - pushUpdatesQueue: AsyncQueue<{ - docId: string; + pushUpdatesQueue: PriorityAsyncQueue<{ + id: string; data: Uint8Array[]; }>; pushingUpdate: boolean; - pullUpdatesQueue: AsyncQueue<{ - docId: string; + pullUpdatesQueue: PriorityAsyncQueue<{ + id: string; data: Uint8Array; }>; subdocLoading: boolean; - subdocsLoadQueue: AsyncQueue; + subdocsLoadQueue: PriorityAsyncQueue<{ id: string; doc: Doc }>; } = { connectedDocs: new Map(), - pushUpdatesQueue: new AsyncQueue(), + pushUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget), pushingUpdate: false, - pullUpdatesQueue: new AsyncQueue(), + pullUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget), subdocLoading: false, - subdocsLoadQueue: new AsyncQueue(), + subdocsLoadQueue: new PriorityAsyncQueue([], this.priorityTarget), }; initState() { @@ -211,7 +212,10 @@ export class SyncPeer { // Step 2: load subdocs this.state.subdocsLoadQueue.push( - ...Array.from(this.rootDoc.getSubdocs()) + ...Array.from(this.rootDoc.getSubdocs()).map(doc => ({ + id: doc.guid, + doc, + })) ); this.reportSyncStatus(); @@ -227,7 +231,7 @@ export class SyncPeer { ); this.state.subdocLoading = true; this.reportSyncStatus(); - await this.connectDoc(subdoc, abortInner.signal); + await this.connectDoc(subdoc.doc, abortInner.signal); this.state.subdocLoading = false; this.reportSyncStatus(); } @@ -235,7 +239,7 @@ export class SyncPeer { // pull updates (async () => { while (throwIfAborted(abortInner.signal)) { - const { docId, data } = await this.state.pullUpdatesQueue.next( + const { id, data } = await this.state.pullUpdatesQueue.next( abortInner.signal ); // don't apply empty data or Uint8Array([0, 0]) @@ -245,7 +249,7 @@ export class SyncPeer { (data.byteLength === 2 && data[0] === 0 && data[1] === 0) ) ) { - const subdoc = this.state.connectedDocs.get(docId); + const subdoc = this.state.connectedDocs.get(id); if (subdoc) { applyUpdate(subdoc, data, this.name); } @@ -256,7 +260,7 @@ export class SyncPeer { // push updates (async () => { while (throwIfAborted(abortInner.signal)) { - const { docId, data } = await this.state.pushUpdatesQueue.next( + const { id, data } = await this.state.pushUpdatesQueue.next( abortInner.signal ); this.state.pushingUpdate = true; @@ -271,7 +275,7 @@ export class SyncPeer { (merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0) ) ) { - await this.storage.push(docId, merged); + await this.storage.push(id, merged); } this.state.pushingUpdate = false; @@ -299,7 +303,7 @@ export class SyncPeer { // diff root doc and in-storage, save updates to pendingUpdates this.state.pushUpdatesQueue.push({ - docId: doc.guid, + id: doc.guid, data: [encodeStateAsUpdate(doc, inStorageState)], }); @@ -327,14 +331,12 @@ export class SyncPeer { return; } - const exist = this.state.pushUpdatesQueue.find( - ({ docId }) => docId === doc.guid - ); + const exist = this.state.pushUpdatesQueue.find(({ id }) => id === doc.guid); if (exist) { exist.data.push(update); } else { this.state.pushUpdatesQueue.push({ - docId: doc.guid, + id: doc.guid, data: [update], }); } @@ -351,20 +353,20 @@ export class SyncPeer { removed: Set; }) => { for (const subdoc of added) { - this.state.subdocsLoadQueue.push(subdoc); + this.state.subdocsLoadQueue.push({ id: subdoc.guid, doc: subdoc }); } for (const subdoc of removed) { this.disconnectDoc(subdoc); - this.state.subdocsLoadQueue.remove(doc => doc === subdoc); + this.state.subdocsLoadQueue.remove(doc => doc.doc === subdoc); } this.reportSyncStatus(); }; // handle updates from storage - handleStorageUpdates = (docId: string, data: Uint8Array) => { + handleStorageUpdates = (id: string, data: Uint8Array) => { this.state.pullUpdatesQueue.push({ - docId, + id, data, }); this.reportSyncStatus(); diff --git a/packages/frontend/workspace/src/providers/utils/async-queue.ts b/packages/frontend/workspace/src/providers/utils/async-queue.ts index db29b8d43e..e7f994a39b 100644 --- a/packages/frontend/workspace/src/providers/utils/async-queue.ts +++ b/packages/frontend/workspace/src/providers/utils/async-queue.ts @@ -12,8 +12,11 @@ export class AsyncQueue { return this._queue.length; } - async next(abort?: AbortSignal): Promise { - const update = this._queue.shift(); + async next( + abort?: AbortSignal, + dequeue: (arr: T[]) => T | undefined = a => a.shift() + ): Promise { + const update = dequeue(this._queue); if (update) { return update; } else { @@ -35,7 +38,7 @@ export class AsyncQueue { }), ]); - return this.next(abort); + return this.next(abort, dequeue); } } @@ -64,3 +67,35 @@ export class AsyncQueue { this._queue = []; } } + +export class PriorityAsyncQueue< + T extends { id: string }, +> extends AsyncQueue { + constructor( + init: T[] = [], + public readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget() + ) { + super(init); + } + + override next(abort?: AbortSignal | undefined): Promise { + return super.next(abort, arr => { + if (this.priorityTarget.priorityRule !== null) { + const index = arr.findIndex( + update => this.priorityTarget.priorityRule?.(update.id) + ); + if (index !== -1) { + return arr.splice(index, 1)[0]; + } + } + return arr.shift(); + }); + } +} + +/** + * Shared priority target can be shared by multiple queues. + */ +export class SharedPriorityTarget { + public priorityRule: ((id: string) => boolean) | null = null; +} diff --git a/tests/affine-migration/e2e/basic.spec.ts b/tests/affine-migration/e2e/basic.spec.ts index 83d302db27..be1916b354 100644 --- a/tests/affine-migration/e2e/basic.spec.ts +++ b/tests/affine-migration/e2e/basic.spec.ts @@ -89,6 +89,8 @@ test('v3 to v4, surface migration', async ({ page }) => { await page.getByTestId('upgrade-workspace-button').click(); await waitForEditorLoad(page); + await page.waitForTimeout(500); + // check edgeless mode is correct await clickEdgelessModeButton(page); await expect(page.locator('edgeless-toolbar')).toBeVisible();