diff --git a/packages/common/nbstore/src/frontend/doc.ts b/packages/common/nbstore/src/frontend/doc.ts index f9ffa6a55a..1f2f1e559a 100644 --- a/packages/common/nbstore/src/frontend/doc.ts +++ b/packages/common/nbstore/src/frontend/doc.ts @@ -1,13 +1,16 @@ import { groupBy } from 'lodash-es'; import { nanoid } from 'nanoid'; -import type { Subscription } from 'rxjs'; import { combineLatest, + filter, + first, + lastValueFrom, map, Observable, ReplaySubject, share, Subject, + throttleTime, } from 'rxjs'; import { applyUpdate, @@ -22,6 +25,7 @@ import type { DocRecord, DocStorage } from '../storage'; import type { DocSync } from '../sync/doc'; import { AsyncPriorityQueue } from '../utils/async-priority-queue'; import { isEmptyUpdate } from '../utils/is-empty-update'; +import { takeUntilAbort } from '../utils/take-until-abort'; import { MANUALLY_STOP, throwIfAborted } from '../utils/throw-if-aborted'; const NBSTORE_ORIGIN = 'nbstore-frontend'; @@ -86,6 +90,10 @@ export type DocFrontendState = { * number of docs that have been loaded to yjs doc instance */ loaded: number; + /** + * some data is being applied to yjs doc instance, or some data is being saved to local doc storage + */ + updating: boolean; /** * number of docs that are syncing with remote peers */ @@ -128,7 +136,7 @@ export class DocFrontend { readonly options: DocFrontendOptions = {} ) {} - docState$(docId: string): Observable { + private _docState$(docId: string): Observable { const frontendState$ = new Observable<{ ready: boolean; loaded: boolean; @@ -160,24 +168,38 @@ export class DocFrontend { ); } - state$ = combineLatest([ - new Observable<{ total: number; loaded: number }>(subscriber => { - const next = () => { - subscriber.next({ - total: this.status.docs.size, - loaded: this.status.connectedDocs.size, - }); - }; - next(); - return this.statusUpdatedSubject$.subscribe(() => { + docState$(docId: string): Observable { + return this._docState$(docId).pipe( + throttleTime(1000, undefined, { + trailing: true, + leading: true, + }) + ); + } + + private readonly _state$ = combineLatest([ + new Observable<{ total: number; loaded: number; updating: boolean }>( + subscriber => { + const next = () => { + subscriber.next({ + total: this.status.docs.size, + loaded: this.status.connectedDocs.size, + updating: + this.status.jobMap.size > 0 || this.status.currentJob !== null, + }); + }; next(); - }); - }), + return this.statusUpdatedSubject$.subscribe(() => { + next(); + }); + } + ), this.sync.state$, ]).pipe( map(([frontend, sync]) => ({ total: sync.total ?? frontend.total, loaded: frontend.loaded, + updating: frontend.updating, syncing: sync.syncing, synced: sync.synced, syncRetrying: sync.retrying, @@ -188,6 +210,13 @@ export class DocFrontend { }) ) satisfies Observable; + state$ = this._state$.pipe( + throttleTime(1000, undefined, { + leading: true, + trailing: true, + }) + ); + start() { if (this.abort.signal.aborted) { throw new Error('doc frontend can only start once'); @@ -463,96 +492,43 @@ ${changedList} return merge(updates.filter(bin => !isEmptyUpdate(bin))); } - async waitForSynced(abort?: AbortSignal) { - let sub: Subscription | undefined = undefined; - return Promise.race([ - new Promise(resolve => { - sub = this.state$?.subscribe(status => { - if (status.synced) { - resolve(); - } - }); - }), - new Promise((_, reject) => { - if (abort?.aborted) { - reject(abort?.reason); - } - abort?.addEventListener('abort', () => { - reject(abort.reason); - }); - }), - ]).finally(() => { - sub?.unsubscribe(); - }); + async waitForUpdated(docId?: string, abort?: AbortSignal) { + const source$: Observable = docId + ? this._docState$(docId) + : this._state$; + await lastValueFrom( + source$.pipe( + filter(status => !status.updating), + takeUntilAbort(abort), + first() + ) + ); + return; } async waitForDocLoaded(docId: string, abort?: AbortSignal) { - let sub: Subscription | undefined = undefined; - return Promise.race([ - new Promise(resolve => { - sub = this.docState$(docId).subscribe(state => { - if (state.loaded) { - resolve(); - } - }); - }), - new Promise((_, reject) => { - if (abort?.aborted) { - reject(abort?.reason); - } - abort?.addEventListener('abort', () => { - reject(abort.reason); - }); - }), - ]).finally(() => { - sub?.unsubscribe(); - }); + await lastValueFrom( + this._docState$(docId).pipe( + filter(state => state.loaded), + takeUntilAbort(abort), + first() + ) + ); } - async waitForDocSynced(docId: string, abort?: AbortSignal) { - let sub: Subscription | undefined = undefined; - return Promise.race([ - new Promise(resolve => { - sub = this.docState$(docId).subscribe(state => { - if (state.synced && !state.updating) { - resolve(); - } - }); - }), - new Promise((_, reject) => { - if (abort?.aborted) { - reject(abort?.reason); - } - abort?.addEventListener('abort', () => { - reject(abort.reason); - }); - }), - ]).finally(() => { - sub?.unsubscribe(); - }); + async waitForSynced(docId?: string, abort?: AbortSignal) { + await this.waitForUpdated(docId, abort); + await this.sync.waitForSynced(docId, abort); } async waitForDocReady(docId: string, abort?: AbortSignal) { - let sub: Subscription | undefined = undefined; - return Promise.race([ - new Promise(resolve => { - sub = this.docState$(docId).subscribe(state => { - if (state.ready) { - resolve(); - } - }); - }), - new Promise((_, reject) => { - if (abort?.aborted) { - reject(abort?.reason); - } - abort?.addEventListener('abort', () => { - reject(abort.reason); - }); - }), - ]).finally(() => { - sub?.unsubscribe(); - }); + await lastValueFrom( + this._docState$(docId).pipe( + filter(state => state.ready), + takeUntilAbort(abort), + first() + ) + ); } async resetSync() { diff --git a/packages/common/nbstore/src/sync/doc/index.ts b/packages/common/nbstore/src/sync/doc/index.ts index 52b254f70d..40fae0f82b 100644 --- a/packages/common/nbstore/src/sync/doc/index.ts +++ b/packages/common/nbstore/src/sync/doc/index.ts @@ -1,9 +1,20 @@ import type { Observable } from 'rxjs'; -import { combineLatest, map, of, ReplaySubject, share } from 'rxjs'; +import { + combineLatest, + filter, + first, + lastValueFrom, + map, + of, + ReplaySubject, + share, + throttleTime, +} from 'rxjs'; import type { DocStorage, DocSyncStorage } from '../../storage'; import { DummyDocStorage } from '../../storage/dummy/doc'; import { DummyDocSyncStorage } from '../../storage/dummy/doc-sync'; +import { takeUntilAbort } from '../../utils/take-until-abort'; import { MANUALLY_STOP } from '../../utils/throw-if-aborted'; import type { PeerStorageOptions } from '../types'; import { DocSyncPeer } from './peer'; @@ -26,6 +37,7 @@ export interface DocSyncDocState { export interface DocSync { readonly state$: Observable; docState$(docId: string): Observable; + waitForSynced(docId?: string, abort?: AbortSignal): Promise; addPriority(id: string, priority: number): () => void; resetSync(): Promise; } @@ -39,7 +51,9 @@ export class DocSyncImpl implements DocSync { ); private abort: AbortController | null = null; - state$ = combineLatest(this.peers.map(peer => peer.peerState$)).pipe( + private readonly _state$ = combineLatest( + this.peers.map(peer => peer.peerState$) + ).pipe( map(allPeers => allPeers.length === 0 ? { @@ -66,6 +80,14 @@ export class DocSyncImpl implements DocSync { }) ) as Observable; + state$ = this._state$.pipe( + // throttle the state to 1 second to avoid spamming the UI + throttleTime(1000, undefined, { + leading: true, + trailing: true, + }) + ); + constructor( readonly storages: PeerStorageOptions, readonly sync: DocSyncStorage @@ -84,7 +106,7 @@ export class DocSyncImpl implements DocSync { ); } - docState$(docId: string): Observable { + private _docState$(docId: string): Observable { if (this.peers.length === 0) { return of({ errorMessage: null, @@ -106,6 +128,29 @@ export class DocSyncImpl implements DocSync { ); } + docState$(docId: string): Observable { + return this._docState$(docId).pipe( + // throttle the state to 1 second to avoid spamming the UI + throttleTime(1000, undefined, { + leading: true, + trailing: true, + }) + ); + } + + async waitForSynced(docId?: string, abort?: AbortSignal): Promise { + const source$: Observable = docId + ? this._docState$(docId) + : this._state$; + await lastValueFrom( + source$.pipe( + filter(state => state.synced), + takeUntilAbort(abort), + first() + ) + ); + } + start() { if (this.abort) { this.abort.abort(MANUALLY_STOP); diff --git a/packages/common/nbstore/src/sync/indexer/index.ts b/packages/common/nbstore/src/sync/indexer/index.ts index b2adfe762f..8d3af17fe0 100644 --- a/packages/common/nbstore/src/sync/indexer/index.ts +++ b/packages/common/nbstore/src/sync/indexer/index.ts @@ -2,6 +2,7 @@ import { readAllDocsFromRootDoc } from '@affine/reader'; import { filter, first, + lastValueFrom, Observable, ReplaySubject, share, @@ -71,60 +72,37 @@ export class IndexerSyncImpl implements IndexerSync { state$ = this.status.state$.pipe( // throttle the state to 1 second to avoid spamming the UI - throttleTime(1000) + throttleTime(1000, undefined, { + leading: true, + trailing: true, + }) ); docState$(docId: string) { return this.status.docState$(docId).pipe( // throttle the state to 1 second to avoid spamming the UI - throttleTime(1000) + throttleTime(1000, undefined, { leading: true, trailing: true }) ); } - waitForCompleted(signal?: AbortSignal) { - return new Promise((resolve, reject) => { - this.status.state$ - .pipe( - filter(state => state.completed), - takeUntilAbort(signal), - first() - ) - .subscribe({ - next: () => { - resolve(); - }, - error: err => { - reject(err); - }, - }); - }); - } - - waitForDocCompleted(docId: string, signal?: AbortSignal) { - return new Promise((resolve, reject) => { - this.status - .docState$(docId) - .pipe( - filter(state => state.completed), - takeUntilAbort(signal), - first() - ) - .subscribe({ - next: () => { - resolve(); - }, - error: err => { - reject(err); - }, - }); - }); - } - - readonly interval = () => - new Promise(resolve => - requestIdleCallback(() => resolve(), { - timeout: 200, - }) + async waitForCompleted(signal?: AbortSignal) { + await lastValueFrom( + this.status.state$.pipe( + filter(state => state.completed), + takeUntilAbort(signal), + first() + ) ); + } + + async waitForDocCompleted(docId: string, signal?: AbortSignal) { + await lastValueFrom( + this.status.docState$(docId).pipe( + filter(state => state.completed), + takeUntilAbort(signal), + first() + ) + ); + } constructor( readonly doc: DocStorage, diff --git a/packages/common/nbstore/src/worker/client.ts b/packages/common/nbstore/src/worker/client.ts index d9b104a2bd..b793fcaff6 100644 --- a/packages/common/nbstore/src/worker/client.ts +++ b/packages/common/nbstore/src/worker/client.ts @@ -242,6 +242,10 @@ class WorkerDocSync implements DocSync { return this.client.ob$('docSync.docState', docId); } + async waitForSynced(docId?: string, abort?: AbortSignal): Promise { + await this.client.call('docSync.waitForSynced', docId ?? null, abort); + } + addPriority(docId: string, priority: number) { const subscription = this.client .ob$('docSync.addPriority', { docId, priority }) diff --git a/packages/common/nbstore/src/worker/consumer.ts b/packages/common/nbstore/src/worker/consumer.ts index f9105ec2d7..a0b197c804 100644 --- a/packages/common/nbstore/src/worker/consumer.ts +++ b/packages/common/nbstore/src/worker/consumer.ts @@ -212,6 +212,8 @@ class StoreConsumer { const undo = this.docSync.addPriority(docId, priority); return () => undo(); }), + 'docSync.waitForSynced': (docId, ctx) => + this.docSync.waitForSynced(docId ?? undefined, ctx.signal), 'docSync.resetSync': () => this.docSync.resetSync(), 'blobSync.state': () => this.blobSync.state$, 'blobSync.blobState': blobId => this.blobSync.blobState$(blobId), diff --git a/packages/common/nbstore/src/worker/ops.ts b/packages/common/nbstore/src/worker/ops.ts index a2e1a9ea14..e71b37dae6 100644 --- a/packages/common/nbstore/src/worker/ops.ts +++ b/packages/common/nbstore/src/worker/ops.ts @@ -102,6 +102,7 @@ interface GroupedWorkerOps { docSync: { state: [void, DocSyncState]; docState: [string, DocSyncDocState]; + waitForSynced: [string | null, void]; addPriority: [{ docId: string; priority: number }, boolean]; resetSync: [void, void]; }; diff --git a/packages/frontend/core/src/components/workspace-selector/workspace-card/index.tsx b/packages/frontend/core/src/components/workspace-selector/workspace-card/index.tsx index 2a8e80941c..ee27aa5c04 100644 --- a/packages/frontend/core/src/components/workspace-selector/workspace-card/index.tsx +++ b/packages/frontend/core/src/components/workspace-selector/workspace-card/index.tsx @@ -88,7 +88,7 @@ const useSyncEngineSyncProgress = (meta: WorkspaceMetadata) => { const engineState = useLiveData( useMemo(() => { return workspace - ? LiveData.from(workspace.engine.doc.state$, null).throttleTime(500) + ? LiveData.from(workspace.engine.doc.state$, null) : null; }, [workspace]) ); diff --git a/packages/frontend/core/src/modules/blob-management/entity/unused-blobs.ts b/packages/frontend/core/src/modules/blob-management/entity/unused-blobs.ts index 6f8067f7cf..f5f493eb84 100644 --- a/packages/frontend/core/src/modules/blob-management/entity/unused-blobs.ts +++ b/packages/frontend/core/src/modules/blob-management/entity/unused-blobs.ts @@ -8,15 +8,7 @@ import { onStart, } from '@toeverything/infra'; import { fileTypeFromBuffer } from 'file-type'; -import { - filter, - firstValueFrom, - fromEvent, - map, - switchMap, - takeUntil, - tap, -} from 'rxjs'; +import { switchMap, tap } from 'rxjs'; import type { DocsSearchService } from '../../docs-search'; import type { WorkspaceService } from '../../workspace'; @@ -85,15 +77,7 @@ export class UnusedBlobs extends Entity { async getUnusedBlobs(abortSignal?: AbortSignal) { // Wait for both sync and indexing to complete - const ready$ = this.workspaceService.workspace.engine.doc.state$ - .pipe(filter(state => state.syncing === 0 && !state.syncRetrying)) - .pipe(map(() => true)); - - await firstValueFrom( - abortSignal - ? ready$.pipe(takeUntil(fromEvent(abortSignal, 'abort'))) - : ready$ - ); + await this.workspaceService.workspace.engine.doc.waitForSynced(); await this.docsSearchService.indexer.waitForCompleted(abortSignal); diff --git a/packages/frontend/core/src/modules/import-clipper/services/import.ts b/packages/frontend/core/src/modules/import-clipper/services/import.ts index e316081c93..1ed6036d8a 100644 --- a/packages/frontend/core/src/modules/import-clipper/services/import.ts +++ b/packages/frontend/core/src/modules/import-clipper/services/import.ts @@ -44,8 +44,8 @@ export class ImportClipperService extends Service { docsService.list.setPrimaryMode(docId, 'page'); workspace.engine.doc.addPriority(workspace.id, 100); workspace.engine.doc.addPriority(docId, 100); - await workspace.engine.doc.waitForDocSynced(workspace.id); - await workspace.engine.doc.waitForDocSynced(docId); + await workspace.engine.doc.waitForSynced(workspace.id); + await workspace.engine.doc.waitForSynced(docId); disposeWorkspace(); return docId; } else { diff --git a/tests/affine-local/e2e/local-first-avatar.spec.ts b/tests/affine-local/e2e/local-first-avatar.spec.ts index 37e4253dcb..c6928e7fda 100644 --- a/tests/affine-local/e2e/local-first-avatar.spec.ts +++ b/tests/affine-local/e2e/local-first-avatar.spec.ts @@ -33,7 +33,7 @@ test('should create a page with a local first avatar and remove it', async ({ .nth(0) .getByTestId('workspace-avatar') .click(); - await page.waitForTimeout(1000); + await page.waitForTimeout(2000); await page.getByTestId('workspace-name').click(); await page .getByTestId('workspace-card')