From 7091111f85a6f498c586f6a4eda73eb06600182f Mon Sep 17 00:00:00 2001 From: EYHN Date: Thu, 27 Mar 2025 12:11:30 +0000 Subject: [PATCH] feat(nbstore): add upload function to blob frontend (#11247) --- packages/common/nbstore/src/frontend/blob.ts | 27 +++++-- .../common/nbstore/src/sync/blob/index.ts | 48 ++++++++----- packages/common/nbstore/src/sync/blob/peer.ts | 71 ++++++++++++++++--- packages/common/nbstore/src/worker/client.ts | 4 +- packages/common/nbstore/src/worker/ops.ts | 4 +- 5 files changed, 116 insertions(+), 38 deletions(-) diff --git a/packages/common/nbstore/src/frontend/blob.ts b/packages/common/nbstore/src/frontend/blob.ts index 6ee6bd6f4d..f2ce64cbe8 100644 --- a/packages/common/nbstore/src/frontend/blob.ts +++ b/packages/common/nbstore/src/frontend/blob.ts @@ -23,7 +23,9 @@ export class BlobFrontend { } await lock[Symbol.asyncDispose](); - await this.sync.downloadBlob(blobId); + await this.sync.downloadBlob(blobId).catch(() => { + // ignore the error as it has already been recorded in the sync status + }); return await this.storage.get(blobId); } @@ -40,14 +42,31 @@ export class BlobFrontend { // We don't wait for the upload to complete, // as the upload process runs asynchronously in the background - this.sync.uploadBlob(blob).catch(err => { - // never reach here - console.error(err); + this.sync.uploadBlob(blob, true /* force upload */).catch(() => { + // ignore the error as it has already been recorded in the sync status }); return; } + /** + * Uploads a blob to the peer. Do nothing if the blob has already been uploaded. + * + * @returns Always resolves to true when successful + * + * @throws This method will throw an error if the blob is not found locally, if the upload is aborted, or if it fails due to storage limitations. + */ + async upload(blobIdOrRecord: string | BlobRecord): Promise { + const blob = + typeof blobIdOrRecord === 'string' + ? await this.storage.get(blobIdOrRecord) + : blobIdOrRecord; + if (!blob) { + throw new Error(`Blob ${blobIdOrRecord} not found`); + } + return this.sync.uploadBlob(blob, false); + } + fullDownload(peerId?: string, signal?: AbortSignal) { return this.sync.fullDownload(peerId, signal); } diff --git a/packages/common/nbstore/src/sync/blob/index.ts b/packages/common/nbstore/src/sync/blob/index.ts index 1464eaa693..b96edbba44 100644 --- a/packages/common/nbstore/src/sync/blob/index.ts +++ b/packages/common/nbstore/src/sync/blob/index.ts @@ -8,7 +8,6 @@ import { } from 'rxjs'; import type { BlobRecord, BlobStorage, BlobSyncStorage } from '../../storage'; -import { MANUALLY_STOP } from '../../utils/throw-if-aborted'; import type { PeerStorageOptions } from '../types'; import { BlobSyncPeer } from './peer'; @@ -29,13 +28,30 @@ export interface BlobSyncBlobState { export interface BlobSync { readonly state$: Observable; blobState$(blobId: string): Observable; - downloadBlob(blobId: string): Promise; - uploadBlob(blob: BlobRecord): Promise; + /** + * Downloads a blob from all peers + * @param blobId - The blob ID to download + * @returns A promise that resolves to true when the download is complete from any peer, false if no peer has the blob + * + * @throws This method will throw an error if the download is aborted or fails due to network issues. + */ + downloadBlob(blobId: string): Promise; + /** + * Upload a blob to all peers + * @param blob - The blob to upload + * @param force - Whether to force upload the blob, even if it has already been uploaded + * @returns A promise that resolves when the upload is complete, should always resolve to true + * + * @throws This method will throw an error if the upload is aborted or fails due to storage limitations. + */ + uploadBlob(blob: BlobRecord, force?: boolean): Promise; /** * Download all blobs from a peer * @param peerId - The peer id to download from, if not provided, all peers will be downloaded * @param signal - The abort signal * @returns A promise that resolves when the download is complete + * + * @throws This method will never throw an error, but the promise will reject if the signal is aborted. */ fullDownload(peerId?: string, signal?: AbortSignal): Promise; } @@ -102,15 +118,15 @@ export class BlobSyncImpl implements BlobSync { readonly blobSync: BlobSyncStorage ) {} - downloadBlob(blobId: string): Promise { + downloadBlob(blobId: string): Promise { const signal = this.abortController.signal; - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { let completed = 0; const totalPeers = this.peers.length; if (totalPeers === 0) { - resolve(); + resolve(false); return; } @@ -122,35 +138,29 @@ export class BlobSyncImpl implements BlobSync { .then(result => { if (result === true) { // resolve if the peer has success - resolve(); + resolve(true); } }) .catch(err => { - // should never throw - // unless the signal is aborted reject(err); }) .finally(() => { completed++; if (completed === totalPeers) { // resolve if all peers finish - resolve(); + resolve(false); } }); }); }); } - uploadBlob(blob: BlobRecord) { + uploadBlob(blob: BlobRecord, force = false): Promise { return Promise.all( - this.peers.map(p => p.uploadBlob(blob, this.abortController.signal)) - ).catch(err => { - if (err === MANUALLY_STOP) { - return; - } - // should never reach here, `uploadBlob()` should never throw - console.error(err); - }) as Promise; + this.peers.map(p => + p.uploadBlob(blob, force, this.abortController.signal) + ) + ).then(() => true as const); } // start the upload loop diff --git a/packages/common/nbstore/src/sync/blob/peer.ts b/packages/common/nbstore/src/sync/blob/peer.ts index ad6f62dd30..b1b709afc1 100644 --- a/packages/common/nbstore/src/sync/blob/peer.ts +++ b/packages/common/nbstore/src/sync/blob/peer.ts @@ -41,10 +41,12 @@ export class BlobSyncPeer { private readonly downloadingPromise = new Map>(); /** - * Downloads a blob from the peer with retry logic - * @returns true if the blob is downloaded successfully, false if the blob is not found or encounters an error + * Downloads a blob from the peer with exponential backoff retry logic + * @param blobId - The ID of the blob to download + * @param signal - Optional AbortSignal to cancel the download + * @returns true if the blob is downloaded successfully, false if the blob is not found after retries * - * @throws This method will never throw (errors are saved to the sync status) unless the signal is aborted + * @throws This method will throw an error if the download operation fails due to network issues or is aborted */ downloadBlob(blobId: string, signal?: AbortSignal): Promise { // if the blob is already downloading, return the existing promise @@ -111,7 +113,7 @@ export class BlobSyncPeer { blobId, error instanceof Error ? error.message : String(error) ); - return false; + throw error; }) .finally(() => { this.status.blobDownloadFinish(blobId); @@ -122,11 +124,35 @@ export class BlobSyncPeer { return promise; } - uploadingPromise = new Map>(); + uploadingPromise = new Map>(); - uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise { + /** + * Upload a blob to the peer + * @param blob - The blob to upload + * @param force - Whether to force upload the blob, even if it has already been uploaded + * @param signal - The abort signal + * @returns The promise should always resolve to true when the upload is complete. + * + * @throws This method will throw an error if the upload is aborted or fails due to storage limitations. + */ + async uploadBlob( + blob: BlobRecord, + force = false, + signal?: AbortSignal + ): Promise { if (this.remote.isReadonly) { - return Promise.resolve(); + return true; + } + + if (!force) { + // if the blob has been uploaded, skip the upload + const uploadedAt = await this.blobSync.getBlobUploadedAt( + this.peerId, + blob.key + ); + if (uploadedAt) { + return true; + } } const existing = this.uploadingPromise.get(blob.key); @@ -149,6 +175,7 @@ export class BlobSyncPeer { // free the remote storage over capacity flag this.status.remoteOverCapacityFree(); + return true; } catch (err) { if (err === MANUALLY_STOP) { throw err; @@ -166,12 +193,13 @@ export class BlobSyncPeer { err instanceof Error ? err.message : String(err) ); } + throw err; } finally { this.status.blobUploadFinish(blob.key); } })().finally(() => { this.uploadingPromise.delete(blob.key); - }); + }) as Promise; this.uploadingPromise.set(blob.key, promise); return promise; @@ -246,7 +274,14 @@ export class BlobSyncPeer { const data = await this.local.get(blobKey); throwIfAborted(signal); if (data) { - await this.uploadBlob(data, signal); + try { + await this.uploadBlob(data, false, signal); + } catch (err) { + if (err === MANUALLY_STOP) { + throw err; + } + // ignore the error as it has already been recorded in the sync status + } } } } else { @@ -275,7 +310,14 @@ export class BlobSyncPeer { const data = await this.local.get(blobKey); throwIfAborted(signal); if (data) { - await this.uploadBlob(data, signal); + try { + await this.uploadBlob(data, false, signal); + } catch (err) { + if (err === MANUALLY_STOP) { + throw err; + } + // ignore the error as it has already been recorded in the sync status + } } } } @@ -305,7 +347,14 @@ export class BlobSyncPeer { for (const blobKey of needDownload) { throwIfAborted(signal); // download the blobs - await this.downloadBlob(blobKey, signal); + try { + await this.downloadBlob(blobKey, signal); + } catch (err) { + if (err === MANUALLY_STOP) { + throw err; + } + // ignore the error as it has already been recorded in the sync status + } } } finally { // remove all will download flags diff --git a/packages/common/nbstore/src/worker/client.ts b/packages/common/nbstore/src/worker/client.ts index 0891bf1440..6852c8c1dd 100644 --- a/packages/common/nbstore/src/worker/client.ts +++ b/packages/common/nbstore/src/worker/client.ts @@ -238,10 +238,10 @@ class WorkerBlobSync implements BlobSync { return this.client.ob$('blobSync.blobState', blobId); } - downloadBlob(blobId: string): Promise { + downloadBlob(blobId: string): Promise { return this.client.call('blobSync.downloadBlob', blobId); } - uploadBlob(blob: BlobRecord): Promise { + uploadBlob(blob: BlobRecord): Promise { return this.client.call('blobSync.uploadBlob', blob); } fullDownload(peerId?: string, signal?: AbortSignal): Promise { diff --git a/packages/common/nbstore/src/worker/ops.ts b/packages/common/nbstore/src/worker/ops.ts index 2765f12bb0..019e58ec49 100644 --- a/packages/common/nbstore/src/worker/ops.ts +++ b/packages/common/nbstore/src/worker/ops.ts @@ -71,8 +71,8 @@ interface GroupedWorkerOps { blobSync: { state: [void, BlobSyncState]; blobState: [string, BlobSyncBlobState]; - downloadBlob: [string, void]; - uploadBlob: [BlobRecord, void]; + downloadBlob: [string, boolean]; + uploadBlob: [BlobRecord, true]; fullDownload: [string | null, void]; };