feat(nbstore): add upload function to blob frontend (#11247)

This commit is contained in:
EYHN
2025-03-27 12:11:30 +00:00
parent f2cdf67c2a
commit 7091111f85
5 changed files with 116 additions and 38 deletions

View File

@@ -23,7 +23,9 @@ export class BlobFrontend {
}
await lock[Symbol.asyncDispose]();
await this.sync.downloadBlob(blobId);
await this.sync.downloadBlob(blobId).catch(() => {
// ignore the error as it has already been recorded in the sync status
});
return await this.storage.get(blobId);
}
@@ -40,14 +42,31 @@ export class BlobFrontend {
// We don't wait for the upload to complete,
// as the upload process runs asynchronously in the background
this.sync.uploadBlob(blob).catch(err => {
// never reach here
console.error(err);
this.sync.uploadBlob(blob, true /* force upload */).catch(() => {
// ignore the error as it has already been recorded in the sync status
});
return;
}
/**
* Uploads a blob to the peer. Do nothing if the blob has already been uploaded.
*
* @returns Always resolves to true when successful
*
* @throws This method will throw an error if the blob is not found locally, if the upload is aborted, or if it fails due to storage limitations.
*/
async upload(blobIdOrRecord: string | BlobRecord): Promise<boolean> {
const blob =
typeof blobIdOrRecord === 'string'
? await this.storage.get(blobIdOrRecord)
: blobIdOrRecord;
if (!blob) {
throw new Error(`Blob ${blobIdOrRecord} not found`);
}
return this.sync.uploadBlob(blob, false);
}
fullDownload(peerId?: string, signal?: AbortSignal) {
return this.sync.fullDownload(peerId, signal);
}

View File

@@ -8,7 +8,6 @@ import {
} from 'rxjs';
import type { BlobRecord, BlobStorage, BlobSyncStorage } from '../../storage';
import { MANUALLY_STOP } from '../../utils/throw-if-aborted';
import type { PeerStorageOptions } from '../types';
import { BlobSyncPeer } from './peer';
@@ -29,13 +28,30 @@ export interface BlobSyncBlobState {
export interface BlobSync {
readonly state$: Observable<BlobSyncState>;
blobState$(blobId: string): Observable<BlobSyncBlobState>;
downloadBlob(blobId: string): Promise<void>;
uploadBlob(blob: BlobRecord): Promise<void>;
/**
* Downloads a blob from all peers
* @param blobId - The blob ID to download
* @returns A promise that resolves to true when the download is complete from any peer, false if no peer has the blob
*
* @throws This method will throw an error if the download is aborted or fails due to network issues.
*/
downloadBlob(blobId: string): Promise<boolean>;
/**
* Upload a blob to all peers
* @param blob - The blob to upload
* @param force - Whether to force upload the blob, even if it has already been uploaded
* @returns A promise that resolves when the upload is complete, should always resolve to true
*
* @throws This method will throw an error if the upload is aborted or fails due to storage limitations.
*/
uploadBlob(blob: BlobRecord, force?: boolean): Promise<true>;
/**
* Download all blobs from a peer
* @param peerId - The peer id to download from, if not provided, all peers will be downloaded
* @param signal - The abort signal
* @returns A promise that resolves when the download is complete
*
* @throws This method will never throw an error, but the promise will reject if the signal is aborted.
*/
fullDownload(peerId?: string, signal?: AbortSignal): Promise<void>;
}
@@ -102,15 +118,15 @@ export class BlobSyncImpl implements BlobSync {
readonly blobSync: BlobSyncStorage
) {}
downloadBlob(blobId: string): Promise<void> {
downloadBlob(blobId: string): Promise<boolean> {
const signal = this.abortController.signal;
return new Promise<void>((resolve, reject) => {
return new Promise<boolean>((resolve, reject) => {
let completed = 0;
const totalPeers = this.peers.length;
if (totalPeers === 0) {
resolve();
resolve(false);
return;
}
@@ -122,35 +138,29 @@ export class BlobSyncImpl implements BlobSync {
.then(result => {
if (result === true) {
// resolve if the peer has success
resolve();
resolve(true);
}
})
.catch(err => {
// should never throw
// unless the signal is aborted
reject(err);
})
.finally(() => {
completed++;
if (completed === totalPeers) {
// resolve if all peers finish
resolve();
resolve(false);
}
});
});
});
}
uploadBlob(blob: BlobRecord) {
uploadBlob(blob: BlobRecord, force = false): Promise<true> {
return Promise.all(
this.peers.map(p => p.uploadBlob(blob, this.abortController.signal))
).catch(err => {
if (err === MANUALLY_STOP) {
return;
}
// should never reach here, `uploadBlob()` should never throw
console.error(err);
}) as Promise<void>;
this.peers.map(p =>
p.uploadBlob(blob, force, this.abortController.signal)
)
).then(() => true as const);
}
// start the upload loop

View File

@@ -41,10 +41,12 @@ export class BlobSyncPeer {
private readonly downloadingPromise = new Map<string, Promise<boolean>>();
/**
* Downloads a blob from the peer with retry logic
* @returns true if the blob is downloaded successfully, false if the blob is not found or encounters an error
* Downloads a blob from the peer with exponential backoff retry logic
* @param blobId - The ID of the blob to download
* @param signal - Optional AbortSignal to cancel the download
* @returns true if the blob is downloaded successfully, false if the blob is not found after retries
*
* @throws This method will never throw (errors are saved to the sync status) unless the signal is aborted
* @throws This method will throw an error if the download operation fails due to network issues or is aborted
*/
downloadBlob(blobId: string, signal?: AbortSignal): Promise<boolean> {
// if the blob is already downloading, return the existing promise
@@ -111,7 +113,7 @@ export class BlobSyncPeer {
blobId,
error instanceof Error ? error.message : String(error)
);
return false;
throw error;
})
.finally(() => {
this.status.blobDownloadFinish(blobId);
@@ -122,11 +124,35 @@ export class BlobSyncPeer {
return promise;
}
uploadingPromise = new Map<string, Promise<void>>();
uploadingPromise = new Map<string, Promise<true>>();
uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise<void> {
/**
* Upload a blob to the peer
* @param blob - The blob to upload
* @param force - Whether to force upload the blob, even if it has already been uploaded
* @param signal - The abort signal
* @returns The promise should always resolve to true when the upload is complete.
*
* @throws This method will throw an error if the upload is aborted or fails due to storage limitations.
*/
async uploadBlob(
blob: BlobRecord,
force = false,
signal?: AbortSignal
): Promise<true> {
if (this.remote.isReadonly) {
return Promise.resolve();
return true;
}
if (!force) {
// if the blob has been uploaded, skip the upload
const uploadedAt = await this.blobSync.getBlobUploadedAt(
this.peerId,
blob.key
);
if (uploadedAt) {
return true;
}
}
const existing = this.uploadingPromise.get(blob.key);
@@ -149,6 +175,7 @@ export class BlobSyncPeer {
// free the remote storage over capacity flag
this.status.remoteOverCapacityFree();
return true;
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
@@ -166,12 +193,13 @@ export class BlobSyncPeer {
err instanceof Error ? err.message : String(err)
);
}
throw err;
} finally {
this.status.blobUploadFinish(blob.key);
}
})().finally(() => {
this.uploadingPromise.delete(blob.key);
});
}) as Promise<true>;
this.uploadingPromise.set(blob.key, promise);
return promise;
@@ -246,7 +274,14 @@ export class BlobSyncPeer {
const data = await this.local.get(blobKey);
throwIfAborted(signal);
if (data) {
await this.uploadBlob(data, signal);
try {
await this.uploadBlob(data, false, signal);
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
// ignore the error as it has already been recorded in the sync status
}
}
}
} else {
@@ -275,7 +310,14 @@ export class BlobSyncPeer {
const data = await this.local.get(blobKey);
throwIfAborted(signal);
if (data) {
await this.uploadBlob(data, signal);
try {
await this.uploadBlob(data, false, signal);
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
// ignore the error as it has already been recorded in the sync status
}
}
}
}
@@ -305,7 +347,14 @@ export class BlobSyncPeer {
for (const blobKey of needDownload) {
throwIfAborted(signal);
// download the blobs
await this.downloadBlob(blobKey, signal);
try {
await this.downloadBlob(blobKey, signal);
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
// ignore the error as it has already been recorded in the sync status
}
}
} finally {
// remove all will download flags

View File

@@ -238,10 +238,10 @@ class WorkerBlobSync implements BlobSync {
return this.client.ob$('blobSync.blobState', blobId);
}
downloadBlob(blobId: string): Promise<void> {
downloadBlob(blobId: string): Promise<boolean> {
return this.client.call('blobSync.downloadBlob', blobId);
}
uploadBlob(blob: BlobRecord): Promise<void> {
uploadBlob(blob: BlobRecord): Promise<true> {
return this.client.call('blobSync.uploadBlob', blob);
}
fullDownload(peerId?: string, signal?: AbortSignal): Promise<void> {

View File

@@ -71,8 +71,8 @@ interface GroupedWorkerOps {
blobSync: {
state: [void, BlobSyncState];
blobState: [string, BlobSyncBlobState];
downloadBlob: [string, void];
uploadBlob: [BlobRecord, void];
downloadBlob: [string, boolean];
uploadBlob: [BlobRecord, true];
fullDownload: [string | null, void];
};