From a759a1988e567e5bc7459d56fa2be3022cd7a663 Mon Sep 17 00:00:00 2001 From: EYHN Date: Thu, 10 Apr 2025 16:05:46 +0800 Subject: [PATCH] fix(nbstore): connect before do operation (#11569) --- packages/common/nbstore/src/frontend/blob.ts | 7 ++ .../common/nbstore/src/frontend/indexer.ts | 56 ++++++++++++++-- packages/common/nbstore/src/impls/idb/doc.ts | 3 +- .../nbstore/src/impls/idb/indexer/index.ts | 6 +- .../nbstore/src/impls/idb/indexer/utils.ts | 31 +-------- .../common/nbstore/src/utils/from-promise.ts | 30 +++++++++ packages/common/nbstore/src/worker/client.ts | 64 ++++++++++++------- .../common/nbstore/src/worker/consumer.ts | 52 ++++----------- packages/common/nbstore/src/worker/ops.ts | 5 +- 9 files changed, 150 insertions(+), 104 deletions(-) create mode 100644 packages/common/nbstore/src/utils/from-promise.ts diff --git a/packages/common/nbstore/src/frontend/blob.ts b/packages/common/nbstore/src/frontend/blob.ts index f2ce64cbe8..011234c4ca 100644 --- a/packages/common/nbstore/src/frontend/blob.ts +++ b/packages/common/nbstore/src/frontend/blob.ts @@ -16,6 +16,7 @@ export class BlobFrontend { } async get(blobId: string) { + await this.waitForConnected(); await using lock = await this.lock.lock('blob', blobId); const local = await this.storage.get(blobId); if (local) { @@ -30,6 +31,7 @@ export class BlobFrontend { } async set(blob: BlobRecord) { + await this.waitForConnected(); if (blob.data.byteLength > this.maxBlobSize) { for (const cb of this.onReachedMaxBlobSizeCallbacks) { cb(blob.data.byteLength); @@ -57,6 +59,7 @@ export class BlobFrontend { * @throws This method will throw an error if the blob is not found locally, if the upload is aborted, or if it fails due to storage limitations. */ async upload(blobIdOrRecord: string | BlobRecord): Promise { + await this.waitForConnected(); const blob = typeof blobIdOrRecord === 'string' ? await this.storage.get(blobIdOrRecord) @@ -84,4 +87,8 @@ export class BlobFrontend { this.onReachedMaxBlobSizeCallbacks.add(cb); return () => this.onReachedMaxBlobSizeCallbacks.delete(cb); } + + private waitForConnected(signal?: AbortSignal) { + return this.storage.connection.waitForConnected(signal); + } } diff --git a/packages/common/nbstore/src/frontend/indexer.ts b/packages/common/nbstore/src/frontend/indexer.ts index 75862ea492..0ba6ccd007 100644 --- a/packages/common/nbstore/src/frontend/indexer.ts +++ b/packages/common/nbstore/src/frontend/indexer.ts @@ -1,5 +1,14 @@ -import type { IndexerStorage } from '../storage'; +import { switchMap } from 'rxjs'; + +import type { + AggregateOptions, + IndexerSchema, + IndexerStorage, + Query, + SearchOptions, +} from '../storage'; import type { IndexerSync } from '../sync/indexer'; +import { fromPromise } from '../utils/from-promise'; export class IndexerFrontend { constructor( @@ -15,17 +24,50 @@ export class IndexerFrontend { return this.sync.docState$(docId); } - search = this.storage.search.bind(this.storage); - aggregate = this.storage.aggregate.bind(this.storage); - // eslint-disable-next-line rxjs/finnish - search$ = this.storage.search$.bind(this.storage); - // eslint-disable-next-line rxjs/finnish - aggregate$ = this.storage.aggregate$.bind(this.storage); + async search>( + table: T, + query: Query, + options?: O + ) { + await this.waitForConnected(); + return this.storage.search(table, query, options); + } + + async aggregate< + T extends keyof IndexerSchema, + const O extends AggregateOptions, + >(table: T, query: Query, field: keyof IndexerSchema[T], options?: O) { + await this.waitForConnected(); + return this.storage.aggregate(table, query, field, options); + } + + search$>( + table: T, + query: Query, + options?: O + ) { + return fromPromise(signal => this.waitForConnected(signal)).pipe( + switchMap(() => this.storage.search$(table, query, options)) + ); + } + + aggregate$< + T extends keyof IndexerSchema, + const O extends AggregateOptions, + >(table: T, query: Query, field: keyof IndexerSchema[T], options?: O) { + return fromPromise(signal => this.waitForConnected(signal)).pipe( + switchMap(() => this.storage.aggregate$(table, query, field, options)) + ); + } addPriority(docId: string, priority: number) { return this.sync.addPriority(docId, priority); } + private waitForConnected(signal?: AbortSignal) { + return this.storage.connection.waitForConnected(signal); + } + waitForCompleted(signal?: AbortSignal) { return this.sync.waitForCompleted(signal); } diff --git a/packages/common/nbstore/src/impls/idb/doc.ts b/packages/common/nbstore/src/impls/idb/doc.ts index e5abc63f2e..46ca0bc4bc 100644 --- a/packages/common/nbstore/src/impls/idb/doc.ts +++ b/packages/common/nbstore/src/impls/idb/doc.ts @@ -1,3 +1,4 @@ +import { share } from '../../connection'; import { type DocClock, type DocClocks, @@ -17,7 +18,7 @@ interface ChannelMessage { export class IndexedDBDocStorage extends DocStorageBase { static readonly identifier = 'IndexedDBDocStorage'; - readonly connection = new IDBConnection(this.options); + readonly connection = share(new IDBConnection(this.options)); get db() { return this.connection.inner.db; diff --git a/packages/common/nbstore/src/impls/idb/indexer/index.ts b/packages/common/nbstore/src/impls/idb/indexer/index.ts index 587db67a16..6b31666a68 100644 --- a/packages/common/nbstore/src/impls/idb/indexer/index.ts +++ b/packages/common/nbstore/src/impls/idb/indexer/index.ts @@ -1,5 +1,6 @@ import { merge, Observable, of, Subject, throttleTime } from 'rxjs'; +import { share } from '../../../connection'; import type { AggregateOptions, AggregateResult, @@ -10,13 +11,14 @@ import type { SearchResult, } from '../../../storage'; import { IndexerStorageBase } from '../../../storage'; +import { fromPromise } from '../../../utils/from-promise'; import { IDBConnection, type IDBConnectionOptions } from '../db'; import { DataStruct } from './data-struct'; -import { backoffRetry, exhaustMapWithTrailing, fromPromise } from './utils'; +import { backoffRetry, exhaustMapWithTrailing } from './utils'; export class IndexedDBIndexerStorage extends IndexerStorageBase { static readonly identifier = 'IndexedDBIndexerStorage'; - readonly connection = new IDBConnection(this.options); + readonly connection = share(new IDBConnection(this.options)); override isReadonly = false; private readonly data = new DataStruct(); private readonly tableUpdate$ = new Subject(); diff --git a/packages/common/nbstore/src/impls/idb/indexer/utils.ts b/packages/common/nbstore/src/impls/idb/indexer/utils.ts index c30c92d847..d1b76feca4 100644 --- a/packages/common/nbstore/src/impls/idb/indexer/utils.ts +++ b/packages/common/nbstore/src/impls/idb/indexer/utils.ts @@ -3,7 +3,7 @@ import { defer, exhaustMap, finalize, - Observable, + type Observable, type ObservableInput, type OperatorFunction, retry, @@ -14,8 +14,6 @@ import { timer, } from 'rxjs'; -import { MANUALLY_STOP } from '../../../utils/throw-if-aborted'; - /** * Like exhaustMap, but also includes the trailing value emitted from the source observable while waiting for the preceding inner observable to complete * @@ -45,33 +43,6 @@ export function exhaustMapWithTrailing( }); } -/** - * Convert a promise to an observable. - * - * like `from` but support `AbortSignal`. - */ -export function fromPromise( - promise: Promise | ((signal: AbortSignal) => Promise) -): Observable { - return new Observable(subscriber => { - const abortController = new AbortController(); - - const rawPromise = - promise instanceof Function ? promise(abortController.signal) : promise; - - rawPromise - .then(value => { - subscriber.next(value); - subscriber.complete(); - }) - .catch(error => { - subscriber.error(error); - }); - - return () => abortController.abort(MANUALLY_STOP); - }); -} - /** * An operator that retries the source observable when an error occurs. * diff --git a/packages/common/nbstore/src/utils/from-promise.ts b/packages/common/nbstore/src/utils/from-promise.ts new file mode 100644 index 0000000000..dc9dd1ab74 --- /dev/null +++ b/packages/common/nbstore/src/utils/from-promise.ts @@ -0,0 +1,30 @@ +import { Observable } from 'rxjs'; + +import { MANUALLY_STOP } from './throw-if-aborted'; + +/** + * Convert a promise to an observable. + * + * like `from` but support `AbortSignal`. + */ +export function fromPromise( + promise: Promise | ((signal: AbortSignal) => Promise) +): Observable { + return new Observable(subscriber => { + const abortController = new AbortController(); + + const rawPromise = + promise instanceof Function ? promise(abortController.signal) : promise; + + rawPromise + .then(value => { + subscriber.next(value); + subscriber.complete(); + }) + .catch(error => { + subscriber.error(error); + }); + + return () => abortController.abort(MANUALLY_STOP); + }); +} diff --git a/packages/common/nbstore/src/worker/client.ts b/packages/common/nbstore/src/worker/client.ts index 5ded685d22..b179842a82 100644 --- a/packages/common/nbstore/src/worker/client.ts +++ b/packages/common/nbstore/src/worker/client.ts @@ -172,28 +172,14 @@ class WorkerDocConnection extends DummyConnection { super(); } - override waitForConnected(signal?: AbortSignal): Promise { - return new Promise((resolve, reject) => { - const abortListener = () => { - reject(signal?.reason); - subscription.unsubscribe(); - }; + promise: Promise | undefined; - signal?.addEventListener('abort', abortListener); - - const subscription = this.client - .ob$('docStorage.waitForConnected') - .subscribe({ - next() { - signal?.removeEventListener('abort', abortListener); - resolve(); - }, - error(err) { - signal?.removeEventListener('abort', abortListener); - reject(err); - }, - }); - }); + override waitForConnected(): Promise { + if (this.promise) { + return this.promise; + } + this.promise = this.client.call('docStorage.waitForConnected'); + return this.promise; } } @@ -226,7 +212,23 @@ class WorkerBlobStorage implements BlobStorage { return this.client.call('blobStorage.listBlobs'); } - connection = new DummyConnection(); + connection = new WorkerBlobConnection(this.client); +} + +class WorkerBlobConnection extends DummyConnection { + constructor(private readonly client: OpClient) { + super(); + } + + promise: Promise | undefined; + + override waitForConnected(): Promise { + if (this.promise) { + return this.promise; + } + this.promise = this.client.call('blobStorage.waitForConnected'); + return this.promise; + } } class WorkerDocSync implements DocSync { @@ -346,7 +348,7 @@ class WorkerIndexerStorage implements IndexerStorage { constructor(private readonly client: OpClient) {} readonly storageType = 'indexer'; readonly isReadonly = true; - connection = new DummyConnection(); + connection = new WorkerIndexerConnection(this.client); search>( table: T, @@ -421,6 +423,22 @@ class WorkerIndexerStorage implements IndexerStorage { } } +class WorkerIndexerConnection extends DummyConnection { + constructor(private readonly client: OpClient) { + super(); + } + + promise: Promise | undefined; + + override waitForConnected(): Promise { + if (this.promise) { + return this.promise; + } + this.promise = this.client.call('indexerStorage.waitForConnected'); + return this.promise; + } +} + class WorkerIndexerSync implements IndexerSync { constructor(private readonly client: OpClient) {} waitForCompleted(signal?: AbortSignal): Promise { diff --git a/packages/common/nbstore/src/worker/consumer.ts b/packages/common/nbstore/src/worker/consumer.ts index ebe8ea8fc4..0fea4d91fe 100644 --- a/packages/common/nbstore/src/worker/consumer.ts +++ b/packages/common/nbstore/src/worker/consumer.ts @@ -158,26 +158,16 @@ class StoreConsumer { subscriber.next({ update, origin }); }); }), - 'docStorage.waitForConnected': () => - new Observable(subscriber => { - const abortController = new AbortController(); - this.docStorage.connection - .waitForConnected(abortController.signal) - .then(() => { - subscriber.next(true); - subscriber.complete(); - }) - .catch((error: any) => { - subscriber.error(error); - }); - return () => abortController.abort(MANUALLY_STOP); - }), + 'docStorage.waitForConnected': (_, ctx) => + this.docStorage.connection.waitForConnected(ctx.signal), 'blobStorage.getBlob': key => this.blobStorage.get(key), 'blobStorage.setBlob': blob => this.blobStorage.set(blob), 'blobStorage.deleteBlob': ({ key, permanently }) => this.blobStorage.delete(key, permanently), 'blobStorage.releaseBlobs': () => this.blobStorage.release(), 'blobStorage.listBlobs': () => this.blobStorage.list(), + 'blobStorage.waitForConnected': (_, ctx) => + this.blobStorage.connection.waitForConnected(ctx.signal), 'awarenessStorage.update': ({ awareness, origin }) => this.awarenessStorage.update(awareness, origin), 'awarenessStorage.subscribeUpdate': docId => @@ -205,6 +195,8 @@ class StoreConsumer { }), 'awarenessStorage.collect': ({ collectId, awareness }) => collectJobs.get(collectId)?.(awareness), + 'awarenessStorage.waitForConnected': (_, ctx) => + this.awarenessStorage.connection.waitForConnected(ctx.signal), 'docSync.state': () => this.docSync.state$, 'docSync.docState': docId => new Observable(subscriber => { @@ -278,6 +270,8 @@ class StoreConsumer { this.indexerStorage.search$(table, query, options), 'indexerStorage.subscribeAggregate': ({ table, query, field, options }) => this.indexerStorage.aggregate$(table, query, field, options), + 'indexerStorage.waitForConnected': (_, ctx) => + this.indexerStorage.connection.waitForConnected(ctx.signal), 'indexerSync.state': () => this.indexerSync.state$, 'indexerSync.docState': (docId: string) => this.indexerSync.docState$(docId), @@ -286,32 +280,10 @@ class StoreConsumer { const undo = this.indexerSync.addPriority(docId, priority); return () => undo(); }), - 'indexerSync.waitForCompleted': () => - new Observable(subscriber => { - this.indexerSync - .waitForCompleted() - .then(() => { - subscriber.next(); - subscriber.complete(); - }) - .catch(error => { - subscriber.error(error); - }); - }), - 'indexerSync.waitForDocCompleted': (docId: string) => - new Observable(subscriber => { - const abortController = new AbortController(); - this.indexerSync - .waitForDocCompleted(docId, abortController.signal) - .then(() => { - subscriber.next(); - subscriber.complete(); - }) - .catch(error => { - subscriber.error(error); - }); - return () => abortController.abort(MANUALLY_STOP); - }), + 'indexerSync.waitForCompleted': (_, ctx) => + this.indexerSync.waitForCompleted(ctx.signal), + 'indexerSync.waitForDocCompleted': (docId: string, ctx) => + this.indexerSync.waitForDocCompleted(docId, ctx.signal), }); } } diff --git a/packages/common/nbstore/src/worker/ops.ts b/packages/common/nbstore/src/worker/ops.ts index 1f5ebb4da8..8e8730a369 100644 --- a/packages/common/nbstore/src/worker/ops.ts +++ b/packages/common/nbstore/src/worker/ops.ts @@ -40,7 +40,7 @@ interface GroupedWorkerOps { getDocTimestamp: [string, DocClock | null]; deleteDoc: [string, void]; subscribeDocUpdate: [void, { update: DocRecord; origin?: string }]; - waitForConnected: [void, boolean]; + waitForConnected: [void, void]; }; blobStorage: { @@ -49,6 +49,7 @@ interface GroupedWorkerOps { deleteBlob: [{ key: string; permanently: boolean }, void]; releaseBlobs: [void, void]; listBlobs: [void, ListedBlobRecord[]]; + waitForConnected: [void, void]; }; awarenessStorage: { @@ -65,6 +66,7 @@ interface GroupedWorkerOps { ), ]; collect: [{ collectId: string; awareness: AwarenessRecord }, void]; + waitForConnected: [void, void]; }; indexerStorage: { @@ -94,6 +96,7 @@ interface GroupedWorkerOps { }, AggregateResult, ]; + waitForConnected: [void, void]; }; docSync: {