fix(nbstore): connect before do operation (#11569)

This commit is contained in:
EYHN
2025-04-10 16:05:46 +08:00
committed by GitHub
parent 3629a725d2
commit a759a1988e
9 changed files with 150 additions and 104 deletions

View File

@@ -16,6 +16,7 @@ export class BlobFrontend {
} }
async get(blobId: string) { async get(blobId: string) {
await this.waitForConnected();
await using lock = await this.lock.lock('blob', blobId); await using lock = await this.lock.lock('blob', blobId);
const local = await this.storage.get(blobId); const local = await this.storage.get(blobId);
if (local) { if (local) {
@@ -30,6 +31,7 @@ export class BlobFrontend {
} }
async set(blob: BlobRecord) { async set(blob: BlobRecord) {
await this.waitForConnected();
if (blob.data.byteLength > this.maxBlobSize) { if (blob.data.byteLength > this.maxBlobSize) {
for (const cb of this.onReachedMaxBlobSizeCallbacks) { for (const cb of this.onReachedMaxBlobSizeCallbacks) {
cb(blob.data.byteLength); cb(blob.data.byteLength);
@@ -57,6 +59,7 @@ export class BlobFrontend {
* @throws This method will throw an error if the blob is not found locally, if the upload is aborted, or if it fails due to storage limitations. * @throws This method will throw an error if the blob is not found locally, if the upload is aborted, or if it fails due to storage limitations.
*/ */
async upload(blobIdOrRecord: string | BlobRecord): Promise<boolean> { async upload(blobIdOrRecord: string | BlobRecord): Promise<boolean> {
await this.waitForConnected();
const blob = const blob =
typeof blobIdOrRecord === 'string' typeof blobIdOrRecord === 'string'
? await this.storage.get(blobIdOrRecord) ? await this.storage.get(blobIdOrRecord)
@@ -84,4 +87,8 @@ export class BlobFrontend {
this.onReachedMaxBlobSizeCallbacks.add(cb); this.onReachedMaxBlobSizeCallbacks.add(cb);
return () => this.onReachedMaxBlobSizeCallbacks.delete(cb); return () => this.onReachedMaxBlobSizeCallbacks.delete(cb);
} }
private waitForConnected(signal?: AbortSignal) {
return this.storage.connection.waitForConnected(signal);
}
} }

View File

@@ -1,5 +1,14 @@
import type { IndexerStorage } from '../storage'; import { switchMap } from 'rxjs';
import type {
AggregateOptions,
IndexerSchema,
IndexerStorage,
Query,
SearchOptions,
} from '../storage';
import type { IndexerSync } from '../sync/indexer'; import type { IndexerSync } from '../sync/indexer';
import { fromPromise } from '../utils/from-promise';
export class IndexerFrontend { export class IndexerFrontend {
constructor( constructor(
@@ -15,17 +24,50 @@ export class IndexerFrontend {
return this.sync.docState$(docId); return this.sync.docState$(docId);
} }
search = this.storage.search.bind(this.storage); async search<T extends keyof IndexerSchema, const O extends SearchOptions<T>>(
aggregate = this.storage.aggregate.bind(this.storage); table: T,
// eslint-disable-next-line rxjs/finnish query: Query<T>,
search$ = this.storage.search$.bind(this.storage); options?: O
// eslint-disable-next-line rxjs/finnish ) {
aggregate$ = this.storage.aggregate$.bind(this.storage); await this.waitForConnected();
return this.storage.search(table, query, options);
}
async aggregate<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(table: T, query: Query<T>, field: keyof IndexerSchema[T], options?: O) {
await this.waitForConnected();
return this.storage.aggregate(table, query, field, options);
}
search$<T extends keyof IndexerSchema, const O extends SearchOptions<T>>(
table: T,
query: Query<T>,
options?: O
) {
return fromPromise(signal => this.waitForConnected(signal)).pipe(
switchMap(() => this.storage.search$(table, query, options))
);
}
aggregate$<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(table: T, query: Query<T>, field: keyof IndexerSchema[T], options?: O) {
return fromPromise(signal => this.waitForConnected(signal)).pipe(
switchMap(() => this.storage.aggregate$(table, query, field, options))
);
}
addPriority(docId: string, priority: number) { addPriority(docId: string, priority: number) {
return this.sync.addPriority(docId, priority); return this.sync.addPriority(docId, priority);
} }
private waitForConnected(signal?: AbortSignal) {
return this.storage.connection.waitForConnected(signal);
}
waitForCompleted(signal?: AbortSignal) { waitForCompleted(signal?: AbortSignal) {
return this.sync.waitForCompleted(signal); return this.sync.waitForCompleted(signal);
} }

View File

@@ -1,3 +1,4 @@
import { share } from '../../connection';
import { import {
type DocClock, type DocClock,
type DocClocks, type DocClocks,
@@ -17,7 +18,7 @@ interface ChannelMessage {
export class IndexedDBDocStorage extends DocStorageBase<IDBConnectionOptions> { export class IndexedDBDocStorage extends DocStorageBase<IDBConnectionOptions> {
static readonly identifier = 'IndexedDBDocStorage'; static readonly identifier = 'IndexedDBDocStorage';
readonly connection = new IDBConnection(this.options); readonly connection = share(new IDBConnection(this.options));
get db() { get db() {
return this.connection.inner.db; return this.connection.inner.db;

View File

@@ -1,5 +1,6 @@
import { merge, Observable, of, Subject, throttleTime } from 'rxjs'; import { merge, Observable, of, Subject, throttleTime } from 'rxjs';
import { share } from '../../../connection';
import type { import type {
AggregateOptions, AggregateOptions,
AggregateResult, AggregateResult,
@@ -10,13 +11,14 @@ import type {
SearchResult, SearchResult,
} from '../../../storage'; } from '../../../storage';
import { IndexerStorageBase } from '../../../storage'; import { IndexerStorageBase } from '../../../storage';
import { fromPromise } from '../../../utils/from-promise';
import { IDBConnection, type IDBConnectionOptions } from '../db'; import { IDBConnection, type IDBConnectionOptions } from '../db';
import { DataStruct } from './data-struct'; import { DataStruct } from './data-struct';
import { backoffRetry, exhaustMapWithTrailing, fromPromise } from './utils'; import { backoffRetry, exhaustMapWithTrailing } from './utils';
export class IndexedDBIndexerStorage extends IndexerStorageBase { export class IndexedDBIndexerStorage extends IndexerStorageBase {
static readonly identifier = 'IndexedDBIndexerStorage'; static readonly identifier = 'IndexedDBIndexerStorage';
readonly connection = new IDBConnection(this.options); readonly connection = share(new IDBConnection(this.options));
override isReadonly = false; override isReadonly = false;
private readonly data = new DataStruct(); private readonly data = new DataStruct();
private readonly tableUpdate$ = new Subject<string>(); private readonly tableUpdate$ = new Subject<string>();

View File

@@ -3,7 +3,7 @@ import {
defer, defer,
exhaustMap, exhaustMap,
finalize, finalize,
Observable, type Observable,
type ObservableInput, type ObservableInput,
type OperatorFunction, type OperatorFunction,
retry, retry,
@@ -14,8 +14,6 @@ import {
timer, timer,
} from 'rxjs'; } from 'rxjs';
import { MANUALLY_STOP } from '../../../utils/throw-if-aborted';
/** /**
* Like exhaustMap, but also includes the trailing value emitted from the source observable while waiting for the preceding inner observable to complete * Like exhaustMap, but also includes the trailing value emitted from the source observable while waiting for the preceding inner observable to complete
* *
@@ -45,33 +43,6 @@ export function exhaustMapWithTrailing<T, R>(
}); });
} }
/**
* Convert a promise to an observable.
*
* like `from` but support `AbortSignal`.
*/
export function fromPromise<T>(
promise: Promise<T> | ((signal: AbortSignal) => Promise<T>)
): Observable<T> {
return new Observable(subscriber => {
const abortController = new AbortController();
const rawPromise =
promise instanceof Function ? promise(abortController.signal) : promise;
rawPromise
.then(value => {
subscriber.next(value);
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
});
}
/** /**
* An operator that retries the source observable when an error occurs. * An operator that retries the source observable when an error occurs.
* *

View File

@@ -0,0 +1,30 @@
import { Observable } from 'rxjs';
import { MANUALLY_STOP } from './throw-if-aborted';
/**
* Convert a promise to an observable.
*
* like `from` but support `AbortSignal`.
*/
export function fromPromise<T>(
promise: Promise<T> | ((signal: AbortSignal) => Promise<T>)
): Observable<T> {
return new Observable(subscriber => {
const abortController = new AbortController();
const rawPromise =
promise instanceof Function ? promise(abortController.signal) : promise;
rawPromise
.then(value => {
subscriber.next(value);
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
});
}

View File

@@ -172,28 +172,14 @@ class WorkerDocConnection extends DummyConnection {
super(); super();
} }
override waitForConnected(signal?: AbortSignal): Promise<void> { promise: Promise<void> | undefined;
return new Promise((resolve, reject) => {
const abortListener = () => {
reject(signal?.reason);
subscription.unsubscribe();
};
signal?.addEventListener('abort', abortListener); override waitForConnected(): Promise<void> {
if (this.promise) {
const subscription = this.client return this.promise;
.ob$('docStorage.waitForConnected') }
.subscribe({ this.promise = this.client.call('docStorage.waitForConnected');
next() { return this.promise;
signal?.removeEventListener('abort', abortListener);
resolve();
},
error(err) {
signal?.removeEventListener('abort', abortListener);
reject(err);
},
});
});
} }
} }
@@ -226,7 +212,23 @@ class WorkerBlobStorage implements BlobStorage {
return this.client.call('blobStorage.listBlobs'); return this.client.call('blobStorage.listBlobs');
} }
connection = new DummyConnection(); connection = new WorkerBlobConnection(this.client);
}
class WorkerBlobConnection extends DummyConnection {
constructor(private readonly client: OpClient<WorkerOps>) {
super();
}
promise: Promise<void> | undefined;
override waitForConnected(): Promise<void> {
if (this.promise) {
return this.promise;
}
this.promise = this.client.call('blobStorage.waitForConnected');
return this.promise;
}
} }
class WorkerDocSync implements DocSync { class WorkerDocSync implements DocSync {
@@ -346,7 +348,7 @@ class WorkerIndexerStorage implements IndexerStorage {
constructor(private readonly client: OpClient<WorkerOps>) {} constructor(private readonly client: OpClient<WorkerOps>) {}
readonly storageType = 'indexer'; readonly storageType = 'indexer';
readonly isReadonly = true; readonly isReadonly = true;
connection = new DummyConnection(); connection = new WorkerIndexerConnection(this.client);
search<T extends keyof IndexerSchema, const O extends SearchOptions<T>>( search<T extends keyof IndexerSchema, const O extends SearchOptions<T>>(
table: T, table: T,
@@ -421,6 +423,22 @@ class WorkerIndexerStorage implements IndexerStorage {
} }
} }
class WorkerIndexerConnection extends DummyConnection {
constructor(private readonly client: OpClient<WorkerOps>) {
super();
}
promise: Promise<void> | undefined;
override waitForConnected(): Promise<void> {
if (this.promise) {
return this.promise;
}
this.promise = this.client.call('indexerStorage.waitForConnected');
return this.promise;
}
}
class WorkerIndexerSync implements IndexerSync { class WorkerIndexerSync implements IndexerSync {
constructor(private readonly client: OpClient<WorkerOps>) {} constructor(private readonly client: OpClient<WorkerOps>) {}
waitForCompleted(signal?: AbortSignal): Promise<void> { waitForCompleted(signal?: AbortSignal): Promise<void> {

View File

@@ -158,26 +158,16 @@ class StoreConsumer {
subscriber.next({ update, origin }); subscriber.next({ update, origin });
}); });
}), }),
'docStorage.waitForConnected': () => 'docStorage.waitForConnected': (_, ctx) =>
new Observable(subscriber => { this.docStorage.connection.waitForConnected(ctx.signal),
const abortController = new AbortController();
this.docStorage.connection
.waitForConnected(abortController.signal)
.then(() => {
subscriber.next(true);
subscriber.complete();
})
.catch((error: any) => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
}),
'blobStorage.getBlob': key => this.blobStorage.get(key), 'blobStorage.getBlob': key => this.blobStorage.get(key),
'blobStorage.setBlob': blob => this.blobStorage.set(blob), 'blobStorage.setBlob': blob => this.blobStorage.set(blob),
'blobStorage.deleteBlob': ({ key, permanently }) => 'blobStorage.deleteBlob': ({ key, permanently }) =>
this.blobStorage.delete(key, permanently), this.blobStorage.delete(key, permanently),
'blobStorage.releaseBlobs': () => this.blobStorage.release(), 'blobStorage.releaseBlobs': () => this.blobStorage.release(),
'blobStorage.listBlobs': () => this.blobStorage.list(), 'blobStorage.listBlobs': () => this.blobStorage.list(),
'blobStorage.waitForConnected': (_, ctx) =>
this.blobStorage.connection.waitForConnected(ctx.signal),
'awarenessStorage.update': ({ awareness, origin }) => 'awarenessStorage.update': ({ awareness, origin }) =>
this.awarenessStorage.update(awareness, origin), this.awarenessStorage.update(awareness, origin),
'awarenessStorage.subscribeUpdate': docId => 'awarenessStorage.subscribeUpdate': docId =>
@@ -205,6 +195,8 @@ class StoreConsumer {
}), }),
'awarenessStorage.collect': ({ collectId, awareness }) => 'awarenessStorage.collect': ({ collectId, awareness }) =>
collectJobs.get(collectId)?.(awareness), collectJobs.get(collectId)?.(awareness),
'awarenessStorage.waitForConnected': (_, ctx) =>
this.awarenessStorage.connection.waitForConnected(ctx.signal),
'docSync.state': () => this.docSync.state$, 'docSync.state': () => this.docSync.state$,
'docSync.docState': docId => 'docSync.docState': docId =>
new Observable(subscriber => { new Observable(subscriber => {
@@ -278,6 +270,8 @@ class StoreConsumer {
this.indexerStorage.search$(table, query, options), this.indexerStorage.search$(table, query, options),
'indexerStorage.subscribeAggregate': ({ table, query, field, options }) => 'indexerStorage.subscribeAggregate': ({ table, query, field, options }) =>
this.indexerStorage.aggregate$(table, query, field, options), this.indexerStorage.aggregate$(table, query, field, options),
'indexerStorage.waitForConnected': (_, ctx) =>
this.indexerStorage.connection.waitForConnected(ctx.signal),
'indexerSync.state': () => this.indexerSync.state$, 'indexerSync.state': () => this.indexerSync.state$,
'indexerSync.docState': (docId: string) => 'indexerSync.docState': (docId: string) =>
this.indexerSync.docState$(docId), this.indexerSync.docState$(docId),
@@ -286,32 +280,10 @@ class StoreConsumer {
const undo = this.indexerSync.addPriority(docId, priority); const undo = this.indexerSync.addPriority(docId, priority);
return () => undo(); return () => undo();
}), }),
'indexerSync.waitForCompleted': () => 'indexerSync.waitForCompleted': (_, ctx) =>
new Observable(subscriber => { this.indexerSync.waitForCompleted(ctx.signal),
this.indexerSync 'indexerSync.waitForDocCompleted': (docId: string, ctx) =>
.waitForCompleted() this.indexerSync.waitForDocCompleted(docId, ctx.signal),
.then(() => {
subscriber.next();
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
}),
'indexerSync.waitForDocCompleted': (docId: string) =>
new Observable(subscriber => {
const abortController = new AbortController();
this.indexerSync
.waitForDocCompleted(docId, abortController.signal)
.then(() => {
subscriber.next();
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
}),
}); });
} }
} }

View File

@@ -40,7 +40,7 @@ interface GroupedWorkerOps {
getDocTimestamp: [string, DocClock | null]; getDocTimestamp: [string, DocClock | null];
deleteDoc: [string, void]; deleteDoc: [string, void];
subscribeDocUpdate: [void, { update: DocRecord; origin?: string }]; subscribeDocUpdate: [void, { update: DocRecord; origin?: string }];
waitForConnected: [void, boolean]; waitForConnected: [void, void];
}; };
blobStorage: { blobStorage: {
@@ -49,6 +49,7 @@ interface GroupedWorkerOps {
deleteBlob: [{ key: string; permanently: boolean }, void]; deleteBlob: [{ key: string; permanently: boolean }, void];
releaseBlobs: [void, void]; releaseBlobs: [void, void];
listBlobs: [void, ListedBlobRecord[]]; listBlobs: [void, ListedBlobRecord[]];
waitForConnected: [void, void];
}; };
awarenessStorage: { awarenessStorage: {
@@ -65,6 +66,7 @@ interface GroupedWorkerOps {
), ),
]; ];
collect: [{ collectId: string; awareness: AwarenessRecord }, void]; collect: [{ collectId: string; awareness: AwarenessRecord }, void];
waitForConnected: [void, void];
}; };
indexerStorage: { indexerStorage: {
@@ -94,6 +96,7 @@ interface GroupedWorkerOps {
}, },
AggregateResult<any, any>, AggregateResult<any, any>,
]; ];
waitForConnected: [void, void];
}; };
docSync: { docSync: {