diff --git a/packages/common/nbstore/src/frontend/blob.ts b/packages/common/nbstore/src/frontend/blob.ts new file mode 100644 index 0000000000..6d4c1e319d --- /dev/null +++ b/packages/common/nbstore/src/frontend/blob.ts @@ -0,0 +1,23 @@ +import type { BlobRecord, BlobStorage } from '../storage'; +import type { BlobSyncEngine } from '../sync/blob'; + +export class BlobFrontend { + constructor( + readonly storage: BlobStorage, + readonly sync?: BlobSyncEngine + ) {} + + get(blobId: string) { + return this.sync + ? this.sync.downloadBlob(blobId) + : this.storage.get(blobId); + } + + set(blob: BlobRecord) { + return this.sync ? this.sync.uploadBlob(blob) : this.storage.set(blob); + } + + addPriority(id: string, priority: number) { + return this.sync?.addPriority(id, priority); + } +} diff --git a/packages/common/nbstore/src/storage/blob.ts b/packages/common/nbstore/src/storage/blob.ts index 6926468aa9..8625e4b5c2 100644 --- a/packages/common/nbstore/src/storage/blob.ts +++ b/packages/common/nbstore/src/storage/blob.ts @@ -6,14 +6,14 @@ export interface BlobRecord { key: string; data: Uint8Array; mime: string; - createdAt: Date; + createdAt?: Date; } export interface ListedBlobRecord { key: string; mime: string; size: number; - createdAt: Date; + createdAt?: Date; } export abstract class BlobStorage< diff --git a/packages/common/nbstore/src/sync/blob/index.ts b/packages/common/nbstore/src/sync/blob/index.ts index b54e1868a9..a04a9920df 100644 --- a/packages/common/nbstore/src/sync/blob/index.ts +++ b/packages/common/nbstore/src/sync/blob/index.ts @@ -1,6 +1,6 @@ import { difference } from 'lodash-es'; -import type { BlobStorage } from '../../storage'; +import type { BlobRecord, BlobStorage } from '../../storage'; import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; export class BlobSyncEngine { @@ -11,6 +11,29 @@ export class BlobSyncEngine { readonly remotes: BlobStorage[] ) {} + async downloadBlob(blobId: string, signal?: AbortSignal) { + const localBlob = await this.local.get(blobId, signal); + if (localBlob) { + return localBlob; + } + + for (const storage of this.remotes) { + const data = await storage.get(blobId, signal); + if (data) { + await this.local.set(data, signal); + return data; + } + } + return null; + } + + async uploadBlob(blob: BlobRecord, signal?: AbortSignal) { + await this.local.set(blob); + await Promise.allSettled( + this.remotes.map(remote => remote.set(blob, signal)) + ); + } + private async sync(signal?: AbortSignal) { throwIfAborted(signal); @@ -94,4 +117,9 @@ export class BlobSyncEngine { this.abort?.abort(); this.abort = null; } + + addPriority(_id: string, _priority: number): () => void { + // TODO: implement + return () => {}; + } }