From 05200ad7b792f75ee2f570083459e5d2e2629f68 Mon Sep 17 00:00:00 2001 From: EYHN Date: Fri, 14 Mar 2025 18:05:54 +0800 Subject: [PATCH] feat(nbstore): add blob sync storage (#10752) --- .../framework/store/src/transformer/assets.ts | 5 - packages/common/error/src/index.ts | 5 +- packages/common/nbstore/package.json | 2 + .../common/nbstore/src/__tests__/sync.spec.ts | 8 + packages/common/nbstore/src/frontend/blob.ts | 61 ++- .../common/nbstore/src/impls/cloud/blob.ts | 71 ++- .../common/nbstore/src/impls/cloud/http.ts | 32 +- .../common/nbstore/src/impls/idb/blob-sync.ts | 36 ++ packages/common/nbstore/src/impls/idb/blob.ts | 1 + .../common/nbstore/src/impls/idb/index.ts | 3 + .../common/nbstore/src/impls/idb/schema.ts | 26 +- .../common/nbstore/src/impls/idb/v1/blob.ts | 1 + .../nbstore/src/impls/sqlite/blob-sync.ts | 32 ++ .../common/nbstore/src/impls/sqlite/blob.ts | 1 + .../common/nbstore/src/impls/sqlite/db.ts | 82 ++-- .../common/nbstore/src/impls/sqlite/index.ts | 3 + .../nbstore/src/impls/sqlite/v1/blob.ts | 1 + .../common/nbstore/src/storage/blob-sync.ts | 30 ++ packages/common/nbstore/src/storage/blob.ts | 3 +- .../nbstore/src/storage/dummy/blob-sync.ts | 14 + .../common/nbstore/src/storage/dummy/blob.ts | 1 + .../nbstore/src/storage/errors/index.ts | 1 + .../src/storage/errors/over-capacity.ts | 2 +- .../nbstore/src/storage/errors/over-size.ts | 5 + packages/common/nbstore/src/storage/index.ts | 13 +- .../common/nbstore/src/storage/storage.ts | 2 +- .../common/nbstore/src/sync/blob/index.ts | 357 +++++++------- packages/common/nbstore/src/sync/blob/peer.ts | 463 ++++++++++++++++++ packages/common/nbstore/src/sync/doc/peer.ts | 4 +- packages/common/nbstore/src/sync/index.ts | 22 +- packages/common/nbstore/src/worker/client.ts | 60 +-- .../common/nbstore/src/worker/consumer.ts | 44 +- packages/common/nbstore/src/worker/ops.ts | 28 +- packages/common/nbstore/tsconfig.json | 6 +- .../electron/src/helper/nbstore/handlers.ts | 2 + .../App/Plugins/NBStore/NBStorePlugin.swift | 45 ++ .../App/App/uniffi/affine_mobile_native.swift | 44 ++ .../App/App/uniffi/affine_mobile_nativeFFI.h | 22 + .../ios/src/plugins/nbstore/definitions.ts | 11 + .../apps/ios/src/plugins/nbstore/index.ts | 25 + .../core/src/bootstrap/polyfill/browser.ts | 1 + .../core/src/bootstrap/polyfill/set-union.ts | 1 + .../src/components/over-capacity/index.tsx | 4 +- .../workspace-setting/storage/export.tsx | 41 +- .../src/modules/quicksearch/services/cmdk.ts | 1 + .../modules/workspace-engine/impls/cloud.ts | 14 + .../modules/workspace-engine/impls/local.ts | 14 + .../src/modules/workspace/entities/engine.ts | 5 + packages/frontend/mobile-native/src/lib.rs | 44 ++ packages/frontend/native/index.d.ts | 2 + .../frontend/native/nbstore/src/blob_sync.rs | 92 ++++ .../nbstore/src/{sync.rs => doc_sync.rs} | 0 packages/frontend/native/nbstore/src/lib.rs | 35 +- packages/frontend/native/schema/src/lib.rs | 14 + tools/utils/src/workspace.gen.ts | 1 + yarn.lock | 2 + 56 files changed, 1441 insertions(+), 404 deletions(-) create mode 100644 packages/common/nbstore/src/impls/idb/blob-sync.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/blob-sync.ts create mode 100644 packages/common/nbstore/src/storage/blob-sync.ts create mode 100644 packages/common/nbstore/src/storage/dummy/blob-sync.ts create mode 100644 packages/common/nbstore/src/storage/errors/over-size.ts create mode 100644 packages/common/nbstore/src/sync/blob/peer.ts create mode 100644 packages/frontend/core/src/bootstrap/polyfill/set-union.ts create mode 
100644 packages/frontend/native/nbstore/src/blob_sync.rs rename packages/frontend/native/nbstore/src/{sync.rs => doc_sync.rs} (100%) diff --git a/blocksuite/framework/store/src/transformer/assets.ts b/blocksuite/framework/store/src/transformer/assets.ts index 62be8e3ce8..fe942dca2e 100644 --- a/blocksuite/framework/store/src/transformer/assets.ts +++ b/blocksuite/framework/store/src/transformer/assets.ts @@ -89,11 +89,6 @@ export class AssetsManager { ); } - const exists = (await this._blob.get(blobId)) !== null; - if (exists) { - return; - } - await this._blob.set(blobId, blob); } } diff --git a/packages/common/error/src/index.ts b/packages/common/error/src/index.ts index 70a22e32af..7a2366a16c 100644 --- a/packages/common/error/src/index.ts +++ b/packages/common/error/src/index.ts @@ -1,7 +1,10 @@ import type { ErrorDataUnion, ErrorNames } from '@affine/graphql'; import { GraphQLError as BaseGraphQLError } from 'graphql'; -export type ErrorName = keyof typeof ErrorNames | 'NETWORK_ERROR'; +export type ErrorName = + | keyof typeof ErrorNames + | 'NETWORK_ERROR' + | 'CONTENT_TOO_LARGE'; export interface UserFriendlyErrorResponse { status: number; diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index d0d917b242..25a658c90f 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -27,6 +27,7 @@ "yjs": "^13.6.21" }, "devDependencies": { + "@affine/error": "workspace:*", "@affine/graphql": "workspace:*", "fake-indexeddb": "^6.0.0", "idb": "^8.0.0", @@ -34,6 +35,7 @@ "vitest": "3.0.8" }, "peerDependencies": { + "@affine/error": "workspace:*", "@affine/graphql": "workspace:*", "idb": "^8.0.0", "socket.io-client": "^4.7.5" diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts index 63a820e485..8bc7ec8a56 100644 --- a/packages/common/nbstore/src/__tests__/sync.spec.ts +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -5,6 +5,7 @@ import { Doc as YDoc, encodeStateAsUpdate } from 'yjs'; import { IndexedDBBlobStorage, + IndexedDBBlobSyncStorage, IndexedDBDocStorage, IndexedDBDocSyncStorage, } from '../impls/idb'; @@ -138,8 +139,15 @@ test('blob', async () => { type: 'workspace', }); + const blobSync = new IndexedDBBlobSyncStorage({ + id: 'ws1', + flavour: 'a', + type: 'workspace', + }); + const peerA = new SpaceStorage({ blob: a, + blobSync, }); const peerB = new SpaceStorage({ blob: b, diff --git a/packages/common/nbstore/src/frontend/blob.ts b/packages/common/nbstore/src/frontend/blob.ts index 5ae4ad4759..6ee6bd6f4d 100644 --- a/packages/common/nbstore/src/frontend/blob.ts +++ b/packages/common/nbstore/src/frontend/blob.ts @@ -1,39 +1,68 @@ import type { BlobRecord, BlobStorage } from '../storage'; +import { SingletonLocker } from '../storage/lock'; import type { BlobSync } from '../sync/blob'; export class BlobFrontend { + // Since 'set' and 'get' operations may be called in rapid succession, we use a lock mechanism + // to ensure that 'get' requests for the same blob are paused when a 'set' operation is in progress. 
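The comment above describes BlobFrontend's per-key locking. As a minimal sketch of the pattern, assuming only the SingletonLocker behavior visible in this diff (lock() resolves once the previous holder of the same key releases):

// Hypothetical usage sketch, not part of the patch.
const locker = new SingletonLocker();
async function readBlobExclusively(storage: BlobStorage, key: string) {
  await using lock = await locker.lock('blob', key); // waits for any in-flight set() on `key`
  return storage.get(key); // lock is released when the scope exits
}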
+ private readonly lock = new SingletonLocker(); constructor( - public readonly storage: BlobStorage, + readonly storage: BlobStorage, private readonly sync: BlobSync ) {} - get(blobId: string) { - return this.sync.downloadBlob(blobId); + get state$() { + return this.sync.state$; } - set(blob: BlobRecord) { - return this.sync.uploadBlob(blob); + async get(blobId: string) { + await using lock = await this.lock.lock('blob', blobId); + const local = await this.storage.get(blobId); + if (local) { + return local; + } + await lock[Symbol.asyncDispose](); + + await this.sync.downloadBlob(blobId); + return await this.storage.get(blobId); } - fullDownload() { - return this.sync.fullDownload(); + async set(blob: BlobRecord) { + if (blob.data.byteLength > this.maxBlobSize) { + for (const cb of this.onReachedMaxBlobSizeCallbacks) { + cb(blob.data.byteLength); + } + throw new Error('Blob size exceeds the maximum limit'); + } + await using lock = await this.lock.lock('blob', blob.key); + await this.storage.set(blob); + await lock[Symbol.asyncDispose](); + + // We don't wait for the upload to complete, + // as the upload process runs asynchronously in the background + this.sync.uploadBlob(blob).catch(err => { + // never reach here + console.error(err); + }); + + return; } - fullUpload() { - return this.sync.fullUpload(); + fullDownload(peerId?: string, signal?: AbortSignal) { + return this.sync.fullDownload(peerId, signal); } - addPriority(_id: string, _priority: number) { - // not support yet - } - - readonly state$ = this.sync.state$; + private maxBlobSize = 1024 * 1024 * 100; // 100MB + private readonly onReachedMaxBlobSizeCallbacks: Set< + (byteSize: number) => void + > = new Set(); setMaxBlobSize(max: number) { - this.sync.setMaxBlobSize(max); + this.maxBlobSize = max; } onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void { - return this.sync.onReachedMaxBlobSize(cb); + this.onReachedMaxBlobSizeCallbacks.add(cb); + return () => this.onReachedMaxBlobSizeCallbacks.delete(cb); } } diff --git a/packages/common/nbstore/src/impls/cloud/blob.ts b/packages/common/nbstore/src/impls/cloud/blob.ts index b6ed2ce448..a11684f09b 100644 --- a/packages/common/nbstore/src/impls/cloud/blob.ts +++ b/packages/common/nbstore/src/impls/cloud/blob.ts @@ -1,11 +1,18 @@ +import { UserFriendlyError } from '@affine/error'; import { deleteBlobMutation, listBlobsQuery, releaseDeletedBlobsMutation, setBlobMutation, + workspaceQuotaQuery, } from '@affine/graphql'; -import { type BlobRecord, BlobStorageBase } from '../../storage'; +import { + type BlobRecord, + BlobStorageBase, + OverCapacityError, + OverSizeError, +} from '../../storage'; import { HttpConnection } from './http'; interface CloudBlobStorageOptions { @@ -15,6 +22,7 @@ interface CloudBlobStorageOptions { export class CloudBlobStorage extends BlobStorageBase { static readonly identifier = 'CloudBlobStorage'; + override readonly isReadonly = false; constructor(private readonly options: CloudBlobStorageOptions) { super(); @@ -22,7 +30,7 @@ export class CloudBlobStorage extends BlobStorageBase { readonly connection = new HttpConnection(this.options.serverBaseUrl); - override async get(key: string) { + override async get(key: string, signal?: AbortSignal) { const res = await this.connection.fetch( '/api/workspaces/' + this.options.id + '/blobs/' + key, { @@ -30,6 +38,7 @@ export class CloudBlobStorage extends BlobStorageBase { headers: { 'x-affine-version': BUILD_CONFIG.appVersion, }, + signal, } ); @@ -52,14 +61,32 @@ export class CloudBlobStorage extends 
BlobStorageBase { } } - override async set(blob: BlobRecord) { - await this.connection.gql({ - query: setBlobMutation, - variables: { - workspaceId: this.options.id, - blob: new File([blob.data], blob.key, { type: blob.mime }), - }, - }); + override async set(blob: BlobRecord, signal?: AbortSignal) { + try { + const blobSizeLimit = await this.getBlobSizeLimit(); + if (blob.data.byteLength > blobSizeLimit) { + throw new OverSizeError(); + } + await this.connection.gql({ + query: setBlobMutation, + variables: { + workspaceId: this.options.id, + blob: new File([blob.data], blob.key, { type: blob.mime }), + }, + context: { + signal, + }, + }); + } catch (err) { + const userFriendlyError = UserFriendlyError.fromAny(err); + if (userFriendlyError.is('BLOB_QUOTA_EXCEEDED')) { + throw new OverCapacityError(); + } + if (userFriendlyError.is('CONTENT_TOO_LARGE')) { + throw new OverSizeError(); + } + throw err; + } } override async delete(key: string, permanently: boolean) { @@ -87,4 +114,28 @@ export class CloudBlobStorage extends BlobStorageBase { createdAt: new Date(blob.createdAt), })); } + + private blobSizeLimitCache: number | null = null; + private blobSizeLimitCacheTime = 0; + private async getBlobSizeLimit() { + // If cache time is less than 120 seconds, return the cached value directly + if ( + this.blobSizeLimitCache !== null && + Date.now() - this.blobSizeLimitCacheTime < 120 * 1000 + ) { + return this.blobSizeLimitCache; + } + try { + const res = await this.connection.gql({ + query: workspaceQuotaQuery, + variables: { id: this.options.id }, + }); + + this.blobSizeLimitCache = res.workspace.quota.blobLimit; + this.blobSizeLimitCacheTime = Date.now(); + return this.blobSizeLimitCache; + } catch (err) { + throw UserFriendlyError.fromAny(err); + } + } } diff --git a/packages/common/nbstore/src/impls/cloud/http.ts b/packages/common/nbstore/src/impls/cloud/http.ts index 62e38a8db3..2ef3f6d97f 100644 --- a/packages/common/nbstore/src/impls/cloud/http.ts +++ b/packages/common/nbstore/src/impls/cloud/http.ts @@ -1,3 +1,4 @@ +import { UserFriendlyError } from '@affine/error'; import { gqlFetcherFactory } from '@affine/graphql'; import { DummyConnection } from '../../connection'; @@ -29,19 +30,32 @@ export class HttpConnection extends DummyConnection { }, }) .catch(err => { - throw new Error('fetch error: ' + err); + throw new UserFriendlyError({ + status: 504, + code: 'NETWORK_ERROR', + type: 'NETWORK_ERROR', + name: 'NETWORK_ERROR', + message: `Network error: ${err.message}`, + stacktrace: err.stack, + }); }); clearTimeout(timeoutId); if (!res.ok && res.status !== 404) { - let reason: string | any = ''; - if (res.headers.get('Content-Type')?.includes('application/json')) { - try { - reason = JSON.stringify(await res.json()); - } catch { - // ignore - } + if (res.status === 413) { + throw new UserFriendlyError({ + status: 413, + code: 'CONTENT_TOO_LARGE', + type: 'CONTENT_TOO_LARGE', + name: 'CONTENT_TOO_LARGE', + message: 'Content too large', + }); + } else if ( + res.headers.get('Content-Type')?.startsWith('application/json') + ) { + throw UserFriendlyError.fromAny(await res.json()); + } else { + throw UserFriendlyError.fromAny(await res.text()); } - throw new Error('fetch error status: ' + res.status + ' ' + reason); } return res; }; diff --git a/packages/common/nbstore/src/impls/idb/blob-sync.ts b/packages/common/nbstore/src/impls/idb/blob-sync.ts new file mode 100644 index 0000000000..3c320f81de --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/blob-sync.ts @@ -0,0 +1,36 @@ +import { 
share } from '../../connection'; +import { BlobSyncStorageBase } from '../../storage'; +import { IDBConnection, type IDBConnectionOptions } from './db'; + +export class IndexedDBBlobSyncStorage extends BlobSyncStorageBase { + static readonly identifier = 'IndexedDBBlobSyncStorage'; + + readonly connection = share(new IDBConnection(this.options)); + + constructor(private readonly options: IDBConnectionOptions) { + super(); + } + + get db() { + return this.connection; + } + + async setBlobUploadedAt( + peer: string, + blobId: string, + uploadedAt: Date | null + ): Promise<void> { + const trx = this.db.inner.db.transaction('blobSync', 'readwrite'); + await trx.store.put({ + peer, + key: blobId, + uploadedAt, + }); + } + + async getBlobUploadedAt(peer: string, blobId: string): Promise<Date | null> { + const trx = this.db.inner.db.transaction('blobSync', 'readonly'); + const record = await trx.store.get([peer, blobId]); + return record?.uploadedAt ?? null; + } +} diff --git a/packages/common/nbstore/src/impls/idb/blob.ts b/packages/common/nbstore/src/impls/idb/blob.ts index d3ff484ff2..9abb02c773 100644 --- a/packages/common/nbstore/src/impls/idb/blob.ts +++ b/packages/common/nbstore/src/impls/idb/blob.ts @@ -8,6 +8,7 @@ import { IDBConnection, type IDBConnectionOptions } from './db'; export class IndexedDBBlobStorage extends BlobStorageBase { static readonly identifier = 'IndexedDBBlobStorage'; + override readonly isReadonly = false; readonly connection = share(new IDBConnection(this.options)); diff --git a/packages/common/nbstore/src/impls/idb/index.ts b/packages/common/nbstore/src/impls/idb/index.ts index fa9be9a301..de8f2ee724 100644 --- a/packages/common/nbstore/src/impls/idb/index.ts +++ b/packages/common/nbstore/src/impls/idb/index.ts @@ -1,9 +1,11 @@ import type { StorageConstructor } from '..'; import { IndexedDBBlobStorage } from './blob'; +import { IndexedDBBlobSyncStorage } from './blob-sync'; import { IndexedDBDocStorage } from './doc'; import { IndexedDBDocSyncStorage } from './doc-sync'; export * from './blob'; +export * from './blob-sync'; export * from './doc'; export * from './doc-sync'; @@ -11,4 +13,5 @@ export const idbStorages = [ IndexedDBDocStorage, IndexedDBBlobStorage, IndexedDBDocSyncStorage, + IndexedDBBlobSyncStorage, ] satisfies StorageConstructor[]; diff --git a/packages/common/nbstore/src/impls/idb/schema.ts b/packages/common/nbstore/src/impls/idb/schema.ts index 6867a28af2..4420b72e8e 100644 --- a/packages/common/nbstore/src/impls/idb/schema.ts +++ b/packages/common/nbstore/src/impls/idb/schema.ts @@ -36,6 +36,11 @@ Table(PeerClocks) | peer | docId | clock | pushed | |------|-------|-----------|-----------| | str | str | Date | Date | + +Table(BlobSync) +| peer | key | uploadedAt | +|------|-----|------------| +| str | str | Date | */ export interface DocStorageSchema extends DBSchema { snapshots: { @@ -81,6 +86,17 @@ export interface DocStorageSchema extends DBSchema { deletedAt: Date | null; }; }; + blobSync: { + key: [string, string]; + value: { + peer: string; + key: string; + uploadedAt: Date | null; + }; + indexes: { + peer: string; + }; + }; blobData: { key: string; value: { @@ -175,11 +191,19 @@ const init: Migrate = db => { autoIncrement: false, }); }; +const initBlobSync: Migrate = db => { + const blobSync = db.createObjectStore('blobSync', { + keyPath: ['peer', 'key'], + autoIncrement: false, + }); + + blobSync.createIndex('peer', 'peer', { unique: false }); +}; // END REGION // 1. all schema changes should be put in migrations // 2. order matters
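For context, reading the new blobSync store straight through idb looks roughly like the sketch below; the composite ['peer', 'key'] keyPath above means lookups take a [peer, key] tuple. The database name and version here are placeholders, the real ones are managed by the migrator.

import { openDB } from 'idb';

// Sketch only: peek at a blob's upload timestamp for one peer.
async function peekUploadedAt(peer: string, key: string) {
  const db = await openDB('nbstore-db-placeholder', 2); // placeholder name/version
  const record = await db.get('blobSync', [peer, key]); // composite primary key
  return record?.uploadedAt ?? null;
}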
-const migrations: Migrate[] = [init]; +const migrations: Migrate[] = [init, initBlobSync]; export const migrator = { version: migrations.length, diff --git a/packages/common/nbstore/src/impls/idb/v1/blob.ts b/packages/common/nbstore/src/impls/idb/v1/blob.ts index ba85d8c3e1..5e8b929813 100644 --- a/packages/common/nbstore/src/impls/idb/v1/blob.ts +++ b/packages/common/nbstore/src/impls/idb/v1/blob.ts @@ -7,6 +7,7 @@ import { BlobIDBConnection, type BlobIDBConnectionOptions } from './db'; */ export class IndexedDBV1BlobStorage extends BlobStorageBase { static readonly identifier = 'IndexedDBV1BlobStorage'; + override readonly isReadonly = true; constructor(private readonly options: BlobIDBConnectionOptions) { super(); diff --git a/packages/common/nbstore/src/impls/sqlite/blob-sync.ts b/packages/common/nbstore/src/impls/sqlite/blob-sync.ts new file mode 100644 index 0000000000..d2b4e6211b --- /dev/null +++ b/packages/common/nbstore/src/impls/sqlite/blob-sync.ts @@ -0,0 +1,32 @@ +import { share } from '../../connection'; +import { BlobSyncStorageBase } from '../../storage'; +import { NativeDBConnection, type SqliteNativeDBOptions } from './db'; + +export class SqliteBlobSyncStorage extends BlobSyncStorageBase { + static readonly identifier = 'SqliteBlobSyncStorage'; + + override connection = share(new NativeDBConnection(this.options)); + + constructor(private readonly options: SqliteNativeDBOptions) { + super(); + } + + get db() { + return this.connection.apis; + } + + override async setBlobUploadedAt( + peer: string, + blobId: string, + uploadedAt: Date | null + ): Promise<void> { + await this.db.setBlobUploadedAt(peer, blobId, uploadedAt); + } + + override async getBlobUploadedAt( + peer: string, + blobId: string + ): Promise<Date | null> { + return this.db.getBlobUploadedAt(peer, blobId); + } +} diff --git a/packages/common/nbstore/src/impls/sqlite/blob.ts b/packages/common/nbstore/src/impls/sqlite/blob.ts index ebfb7ca090..2e937689d7 100644 --- a/packages/common/nbstore/src/impls/sqlite/blob.ts +++ b/packages/common/nbstore/src/impls/sqlite/blob.ts @@ -4,6 +4,7 @@ import { NativeDBConnection, type SqliteNativeDBOptions } from './db'; export class SqliteBlobStorage extends BlobStorageBase { static readonly identifier = 'SqliteBlobStorage'; + override readonly isReadonly = false; override connection = share(new NativeDBConnection(this.options)); diff --git a/packages/common/nbstore/src/impls/sqlite/db.ts b/packages/common/nbstore/src/impls/sqlite/db.ts index b89d52804e..ed4f838ff8 100644 --- a/packages/common/nbstore/src/impls/sqlite/db.ts +++ b/packages/common/nbstore/src/impls/sqlite/db.ts @@ -13,67 +13,75 @@ export interface SqliteNativeDBOptions { readonly id: string; } -export type NativeDBApis = { - connect(id: string): Promise<void>; - disconnect(id: string): Promise<void>; - pushUpdate(id: string, docId: string, update: Uint8Array): Promise<Date>; - getDocSnapshot(id: string, docId: string): Promise<DocRecord | null>; - setDocSnapshot(id: string, snapshot: DocRecord): Promise<boolean>; - getDocUpdates(id: string, docId: string): Promise<DocUpdate[]>; - markUpdatesMerged( +export interface NativeDBApis { + connect: (id: string) => Promise<void>; + disconnect: (id: string) => Promise<void>; + pushUpdate: (id: string, docId: string, update: Uint8Array) => Promise<Date>; + getDocSnapshot: (id: string, docId: string) => Promise<DocRecord | null>; + setDocSnapshot: (id: string, snapshot: DocRecord) => Promise<boolean>; + getDocUpdates: (id: string, docId: string) => Promise<DocUpdate[]>; + markUpdatesMerged: ( id: string, docId: string, updates: Date[] - ): Promise<number>; - deleteDoc(id: string, docId: string): 
Promise<void>; - getDocClocks( - id: string, - after?: Date | undefined | null - ): Promise<DocClock[]>; - getDocClock(id: string, docId: string): Promise<DocClock | null>; - getBlob(id: string, key: string): Promise<BlobRecord | null>; - setBlob(id: string, blob: BlobRecord): Promise<void>; - deleteBlob(id: string, key: string, permanently: boolean): Promise<void>; - releaseBlobs(id: string): Promise<void>; - listBlobs(id: string): Promise<ListedBlobRecord[]>; - getPeerRemoteClocks(id: string, peer: string): Promise<DocClock[]>; - getPeerRemoteClock( + ) => Promise<number>; + deleteDoc: (id: string, docId: string) => Promise<void>; + getDocClocks: (id: string, after?: Date | null) => Promise<DocClock[]>; + getDocClock: (id: string, docId: string) => Promise<DocClock | null>; + getBlob: (id: string, key: string) => Promise<BlobRecord | null>; + setBlob: (id: string, blob: BlobRecord) => Promise<void>; + deleteBlob: (id: string, key: string, permanently: boolean) => Promise<void>; + releaseBlobs: (id: string) => Promise<void>; + listBlobs: (id: string) => Promise<ListedBlobRecord[]>; + getPeerRemoteClocks: (id: string, peer: string) => Promise<DocClock[]>; + getPeerRemoteClock: ( id: string, peer: string, docId: string - ): Promise<DocClock | null>; - setPeerRemoteClock( + ) => Promise<DocClock | null>; + setPeerRemoteClock: ( id: string, peer: string, docId: string, clock: Date - ): Promise<void>; - getPeerPulledRemoteClocks(id: string, peer: string): Promise<DocClock[]>; - getPeerPulledRemoteClock( + ) => Promise<void>; + getPeerPulledRemoteClocks: (id: string, peer: string) => Promise<DocClock[]>; + getPeerPulledRemoteClock: ( id: string, peer: string, docId: string - ): Promise<DocClock | null>; - setPeerPulledRemoteClock( + ) => Promise<DocClock | null>; + setPeerPulledRemoteClock: ( id: string, peer: string, docId: string, clock: Date - ): Promise<void>; - getPeerPushedClocks(id: string, peer: string): Promise<DocClock[]>; - getPeerPushedClock( + ) => Promise<void>; + getPeerPushedClocks: (id: string, peer: string) => Promise<DocClock[]>; + getPeerPushedClock: ( id: string, peer: string, docId: string - ): Promise<DocClock | null>; - setPeerPushedClock( + ) => Promise<DocClock | null>; + setPeerPushedClock: ( id: string, peer: string, docId: string, clock: Date - ): Promise<void>; - clearClocks(id: string): Promise<void>; -}; + ) => Promise<void>; + clearClocks: (id: string) => Promise<void>; + setBlobUploadedAt: ( + id: string, + peer: string, + blobId: string, + uploadedAt: Date | null + ) => Promise<void>; + getBlobUploadedAt: ( + id: string, + peer: string, + blobId: string + ) => Promise<Date | null>; +}
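The body of NativeDBApisWrapper just below is cut off in this patch. As a hypothetical sketch of the shape such a wrapper computes, a mapped type can strip the leading universal `id` parameter that the connection binds once per database:

// Sketch, not the actual NativeDBApisWrapper definition.
type FirstArgBound<T> = {
  [K in keyof T]: T[K] extends (id: string, ...rest: infer A) => infer R
    ? (...rest: A) => R
    : never;
};
// e.g. FirstArgBound<NativeDBApis>['getBlobUploadedAt'] is
// (peer: string, blobId: string) => Promise<Date | null>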
type NativeDBApisWrapper = NativeDBApis extends infer APIs ? { diff --git a/packages/common/nbstore/src/impls/sqlite/index.ts b/packages/common/nbstore/src/impls/sqlite/index.ts index aa312d7c24..8797983ca2 100644 --- a/packages/common/nbstore/src/impls/sqlite/index.ts +++ b/packages/common/nbstore/src/impls/sqlite/index.ts @@ -1,9 +1,11 @@ import type { StorageConstructor } from '..'; import { SqliteBlobStorage } from './blob'; +import { SqliteBlobSyncStorage } from './blob-sync'; import { SqliteDocStorage } from './doc'; import { SqliteDocSyncStorage } from './doc-sync'; export * from './blob'; +export * from './blob-sync'; export { bindNativeDBApis, type NativeDBApis } from './db'; export * from './doc'; export * from './doc-sync'; @@ -12,4 +14,5 @@ export const sqliteStorages = [ SqliteDocStorage, SqliteBlobStorage, SqliteDocSyncStorage, + SqliteBlobSyncStorage, ] satisfies StorageConstructor[]; diff --git a/packages/common/nbstore/src/impls/sqlite/v1/blob.ts b/packages/common/nbstore/src/impls/sqlite/v1/blob.ts index d955888219..5397359c9f 100644 --- a/packages/common/nbstore/src/impls/sqlite/v1/blob.ts +++ b/packages/common/nbstore/src/impls/sqlite/v1/blob.ts @@ -9,6 +9,7 @@ import { apis } from './db'; export class SqliteV1BlobStorage extends BlobStorageBase { static identifier = 'SqliteV1BlobStorage'; override connection = new DummyConnection(); + override readonly isReadonly = true; constructor(private readonly options: { type: SpaceType; id: string }) { super(); diff --git a/packages/common/nbstore/src/storage/blob-sync.ts b/packages/common/nbstore/src/storage/blob-sync.ts new file mode 100644 index 0000000000..d0f6e6cda8 --- /dev/null +++ b/packages/common/nbstore/src/storage/blob-sync.ts @@ -0,0 +1,30 @@ +import type { Connection } from '../connection'; +import type { Storage } from './storage'; + +export interface BlobSyncStorage extends Storage { + readonly storageType: 'blobSync'; + + setBlobUploadedAt( + peer: string, + blobId: string, + uploadedAt: Date | null + ): Promise<void>; + + getBlobUploadedAt(peer: string, blobId: string): Promise<Date | null>; +} + +export abstract class BlobSyncStorageBase implements BlobSyncStorage { + readonly storageType = 'blobSync'; + abstract readonly connection: Connection; + + abstract setBlobUploadedAt( + peer: string, + blobId: string, + uploadedAt: Date | null + ): Promise<void>; + + abstract getBlobUploadedAt( + peer: string, + blobId: string + ): Promise<Date | null>; +}
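A short sketch of the contract just defined, as the sync peer later in this patch uses it: a null uploadedAt marks a blob as pending upload to a peer, while a timestamp marks it as done and lets the upload loop skip it.

// Sketch; `storage` is any BlobSyncStorage implementation from this patch.
async function uploadOnce(storage: BlobSyncStorage, peer: string, key: string) {
  await storage.setBlobUploadedAt(peer, key, null); // pending: retried by the loop
  // ... push the blob bytes to the remote peer here ...
  await storage.setBlobUploadedAt(peer, key, new Date()); // done: skipped next pass
}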
diff --git a/packages/common/nbstore/src/storage/blob.ts b/packages/common/nbstore/src/storage/blob.ts index 553ee05905..4b53874179 100644 --- a/packages/common/nbstore/src/storage/blob.ts +++ b/packages/common/nbstore/src/storage/blob.ts @@ -17,6 +17,7 @@ export interface ListedBlobRecord { export interface BlobStorage extends Storage { readonly storageType: 'blob'; + readonly isReadonly: boolean; get(key: string, signal?: AbortSignal): Promise<BlobRecord | null>; set(blob: BlobRecord, signal?: AbortSignal): Promise<void>; delete( @@ -31,7 +32,7 @@ export abstract class BlobStorageBase implements BlobStorage { readonly storageType = 'blob'; abstract readonly connection: Connection; - + abstract readonly isReadonly: boolean; abstract get(key: string, signal?: AbortSignal): Promise<BlobRecord | null>; abstract set(blob: BlobRecord, signal?: AbortSignal): Promise<void>; abstract delete( diff --git a/packages/common/nbstore/src/storage/dummy/blob-sync.ts b/packages/common/nbstore/src/storage/dummy/blob-sync.ts new file mode 100644 index 0000000000..c6f0e6f609 --- /dev/null +++ b/packages/common/nbstore/src/storage/dummy/blob-sync.ts @@ -0,0 +1,14 @@ +import { type Connection, DummyConnection } from '../../connection'; +import type { BlobSyncStorage } from '../blob-sync'; + +export class DummyBlobSyncStorage implements BlobSyncStorage { + storageType = 'blobSync' as const; + connection: Connection = new DummyConnection(); + + setBlobUploadedAt(): Promise<void> { + return Promise.resolve(); + } + getBlobUploadedAt(): Promise<Date | null> { + return Promise.resolve(new Date()); + } +} diff --git a/packages/common/nbstore/src/storage/dummy/blob.ts b/packages/common/nbstore/src/storage/dummy/blob.ts index b4f9354d31..b4553febe3 100644 --- a/packages/common/nbstore/src/storage/dummy/blob.ts +++ b/packages/common/nbstore/src/storage/dummy/blob.ts @@ -6,6 +6,7 @@ import { } from '../blob'; export class DummyBlobStorage extends BlobStorageBase { + override readonly isReadonly = true; override get( _key: string, _signal?: AbortSignal ) diff --git a/packages/common/nbstore/src/storage/errors/index.ts b/packages/common/nbstore/src/storage/errors/index.ts index 38d63766cf..97b5a0d3fa 100644 --- a/packages/common/nbstore/src/storage/errors/index.ts +++ b/packages/common/nbstore/src/storage/errors/index.ts @@ -1 +1,2 @@ export * from './over-capacity'; +export * from './over-size'; diff --git a/packages/common/nbstore/src/storage/errors/over-capacity.ts b/packages/common/nbstore/src/storage/errors/over-capacity.ts index 574ce93332..e62c023e58 100644 --- a/packages/common/nbstore/src/storage/errors/over-capacity.ts +++ b/packages/common/nbstore/src/storage/errors/over-capacity.ts @@ -1,5 +1,5 @@ export class OverCapacityError extends Error { constructor(public originError?: any) { - super('Storage over capacity. Origin error: ' + originError); + super('Storage over capacity.'); } } diff --git a/packages/common/nbstore/src/storage/errors/over-size.ts b/packages/common/nbstore/src/storage/errors/over-size.ts new file mode 100644 index 0000000000..67ac4bf2b2 --- /dev/null +++ b/packages/common/nbstore/src/storage/errors/over-size.ts @@ -0,0 +1,5 @@ +export class OverSizeError extends Error { + constructor(public originError?: any) { + super('Blob size exceeds the limit.'); + } +} diff --git a/packages/common/nbstore/src/storage/index.ts b/packages/common/nbstore/src/storage/index.ts index fb19c87172..c15bd76cab 100644 --- a/packages/common/nbstore/src/storage/index.ts +++ b/packages/common/nbstore/src/storage/index.ts @@ -2,15 +2,22 @@ import EventEmitter2 from 'eventemitter2'; import type { AwarenessStorage } from './awareness'; import type { BlobStorage } from './blob'; +import type { BlobSyncStorage } from './blob-sync'; import type { DocStorage } from './doc'; import type { DocSyncStorage } from './doc-sync'; import { DummyAwarenessStorage } from './dummy/awareness'; import { DummyBlobStorage } from './dummy/blob'; +import { DummyBlobSyncStorage } from './dummy/blob-sync'; import { DummyDocStorage } from './dummy/doc'; import { DummyDocSyncStorage } from './dummy/doc-sync'; import type { StorageType } from './storage'; -type Storages = DocStorage | BlobStorage | DocSyncStorage | AwarenessStorage; +type Storages = + | DocStorage + | BlobStorage + | BlobSyncStorage + | DocSyncStorage + | AwarenessStorage; export type SpaceStorageOptions = { [K in StorageType]?: Storages & { storageType: K }; }; @@ -27,8 +34,9 @@ export class SpaceStorage { this.storages = { awareness: storages.awareness ?? new DummyAwarenessStorage(), blob: storages.blob ?? new DummyBlobStorage(), + blobSync: storages.blobSync ?? new DummyBlobSyncStorage(), doc: storages.doc ?? 
new DummyDocStorage(), - ['docSync']: storages['docSync'] ?? new DummyDocSyncStorage(), + docSync: storages.docSync ?? new DummyDocSyncStorage(), }; } @@ -70,6 +78,7 @@ export class SpaceStorage { export * from './awareness'; export * from './blob'; +export * from './blob-sync'; export * from './doc'; export * from './doc-sync'; export * from './errors'; diff --git a/packages/common/nbstore/src/storage/storage.ts b/packages/common/nbstore/src/storage/storage.ts index 5c501fe5cd..dca0e3c115 100644 --- a/packages/common/nbstore/src/storage/storage.ts +++ b/packages/common/nbstore/src/storage/storage.ts @@ -1,6 +1,6 @@ import type { Connection } from '../connection'; -export type StorageType = 'blob' | 'doc' | 'docSync' | 'awareness'; +export type StorageType = 'blob' | 'blobSync' | 'doc' | 'docSync' | 'awareness'; export interface Storage { readonly storageType: StorageType; diff --git a/packages/common/nbstore/src/sync/blob/index.ts b/packages/common/nbstore/src/sync/blob/index.ts index 3ccc03f282..ead12a5a74 100644 --- a/packages/common/nbstore/src/sync/blob/index.ts +++ b/packages/common/nbstore/src/sync/blob/index.ts @@ -1,213 +1,196 @@ -import EventEmitter2 from 'eventemitter2'; -import { difference } from 'lodash-es'; -import { BehaviorSubject, type Observable } from 'rxjs'; +import { + combineLatest, + map, + type Observable, + ReplaySubject, + share, + throttleTime, +} from 'rxjs'; -import type { BlobRecord, BlobStorage } from '../../storage'; -import { OverCapacityError } from '../../storage'; -import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; +import type { BlobRecord, BlobStorage, BlobSyncStorage } from '../../storage'; +import { MANUALLY_STOP } from '../../utils/throw-if-aborted'; import type { PeerStorageOptions } from '../types'; +import { BlobSyncPeer } from './peer'; export interface BlobSyncState { - isStorageOverCapacity: boolean; - total: number; - synced: number; + uploading: number; + downloading: number; + error: number; + overCapacity: boolean; +} + +export interface BlobSyncBlobState { + uploading: boolean; + downloading: boolean; + errorMessage?: string | null; + overSize: boolean; } export interface BlobSync { readonly state$: Observable<BlobSyncState>; - downloadBlob( - blobId: string, - signal?: AbortSignal - ): Promise<BlobRecord | null>; - uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise<void>; - fullDownload(signal?: AbortSignal): Promise<void>; - fullUpload(signal?: AbortSignal): Promise<void>; - setMaxBlobSize(size: number): void; - onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void; + blobState$(blobId: string): Observable<BlobSyncBlobState>; + downloadBlob(blobId: string): Promise<void>; + uploadBlob(blob: BlobRecord): Promise<void>; + /** + * Download all blobs from a peer + * @param peerId - The peer id to download from, if not provided, all peers will be downloaded + * @param signal - The abort signal + * @returns A promise that resolves when the download is complete + */ + fullDownload(peerId?: string, signal?: AbortSignal): Promise<void>; }
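A consumer-side sketch of the interface above (the `sync` instance stands in for whatever implements BlobSync):

const sub = sync.state$.subscribe(s => {
  console.log(`blobs: ${s.uploading} uploading, ${s.downloading} downloading, ${s.error} failed`);
  if (s.overCapacity) console.warn('remote blob storage is over capacity');
});
await sync.fullDownload(); // no peerId: pull blobs from every peer
sub.unsubscribe();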
export class BlobSyncImpl implements BlobSync { - readonly state$ = new BehaviorSubject<BlobSyncState>({ - isStorageOverCapacity: false, - total: Object.values(this.storages.remotes).length ? 1 : 0, - synced: 0, - }); - private abort: AbortController | null = null; - private maxBlobSize: number = 1024 * 1024 * 100; // 100MB - readonly event = new EventEmitter2(); + // abort all pending jobs when the sync is destroyed + private abortController = new AbortController(); + private started = false; + private readonly peers: BlobSyncPeer[] = Object.entries( + this.storages.remotes + ).map( + ([peerId, remote]) => + new BlobSyncPeer(peerId, this.storages.local, remote, this.blobSync) + ); - constructor(readonly storages: PeerStorageOptions<BlobStorage>) {} - - async downloadBlob(blobId: string, signal?: AbortSignal) { - try { - const localBlob = await this.storages.local.get(blobId, signal); - if (localBlob) { - return localBlob; - } - - for (const storage of Object.values(this.storages.remotes)) { - const data = await storage.get(blobId, signal); - if (data) { - await this.storages.local.set(data, signal); - return data; - } - } - return null; - } catch (e) { - console.error('error when download blob', e); - return null; - } - } - - async uploadBlob(blob: BlobRecord, signal?: AbortSignal) { - if (blob.data.length > this.maxBlobSize) { - this.event.emit('abort-large-blob', blob.data.length); - console.error('blob over limit, abort set'); - } - - await this.storages.local.set(blob); - await Promise.allSettled( - Object.values(this.storages.remotes).map(async remote => { - try { - return await remote.set(blob, signal); - } catch (err) { - if (err instanceof OverCapacityError) { - this.state$.next({ - isStorageOverCapacity: true, - total: this.state$.value.total, - synced: this.state$.value.synced, - }); + readonly state$ = combineLatest(this.peers.map(peer => peer.peerState$)).pipe( + // throttle the state to 1 second to avoid spamming the UI + throttleTime(1000), + map(allPeers => + allPeers.length === 0 ? 
{ + uploading: 0, + downloading: 0, + error: 0, + overCapacity: false, } - throw err; - } + : { + uploading: allPeers.reduce((acc, peer) => acc + peer.uploading, 0), + downloading: allPeers.reduce( + (acc, peer) => acc + peer.downloading, + 0 + ), + error: allPeers.reduce((acc, peer) => acc + peer.error, 0), + overCapacity: allPeers.some(p => p.overCapacity), + } + ), + share({ + connector: () => new ReplaySubject(1), + }) + ) as Observable<BlobSyncState>; + + blobState$(blobId: string) { + return combineLatest( + this.peers.map(peer => peer.blobPeerState$(blobId)) + ).pipe( + throttleTime(1000), + map( + peers => + ({ + uploading: peers.some(p => p.uploading), + downloading: peers.some(p => p.downloading), + errorMessage: peers.find(p => p.errorMessage)?.errorMessage, + overSize: peers.some(p => p.overSize), + }) satisfies BlobSyncBlobState + ), + share({ + connector: () => new ReplaySubject(1), + }) ); } - async fullDownload(signal?: AbortSignal) { - throwIfAborted(signal); + constructor( + readonly storages: PeerStorageOptions<BlobStorage>, + readonly blobSync: BlobSyncStorage + ) {} - await this.storages.local.connection.waitForConnected(signal); - const localList = (await this.storages.local.list(signal)).map(b => b.key); - this.state$.next({ - ...this.state$.value, - synced: localList.length, - }); - - await Promise.allSettled( - Object.entries(this.storages.remotes).map( - async ([remotePeer, remote]) => { - await remote.connection.waitForConnected(signal); - - const remoteList = (await remote.list(signal)).map(b => b.key); - - this.state$.next({ - ...this.state$.value, - total: Math.max(this.state$.value.total, remoteList.length), - }); - - throwIfAborted(signal); - - const needDownload = difference(remoteList, localList); - for (const key of needDownload) { - try { - const data = await remote.get(key, signal); - throwIfAborted(signal); - if (data) { - await this.storages.local.set(data, signal); - this.state$.next({ - ...this.state$.value, - synced: this.state$.value.synced + 1, - }); - throwIfAborted(signal); - } - } catch (err) { - if (err === MANUALLY_STOP) { - throw err; - } - console.error( - `error when sync ${key} from [${remotePeer}] to [local]`, - err - ); - } - } - } - ) - ); - } - - async fullUpload(signal?: AbortSignal) { - throwIfAborted(signal); - - await this.storages.local.connection.waitForConnected(signal); - const localList = (await this.storages.local.list(signal)).map(b => b.key); - - await Promise.allSettled( - Object.entries(this.storages.remotes).map( - async ([remotePeer, remote]) => { - await remote.connection.waitForConnected(signal); - - const remoteList = (await remote.list(signal)).map(b => b.key); - - throwIfAborted(signal); - - const needUpload = difference(localList, remoteList); - for (const key of needUpload) { - try { - const data = await this.storages.local.get(key, signal); - throwIfAborted(signal); - if (data) { - await remote.set(data, signal); - throwIfAborted(signal); - } - } catch (err) { - if (err === MANUALLY_STOP) { - throw err; - } - console.error( - `error when sync ${key} from [local] to [${remotePeer}]`, - err - ); - } - } - } - ) - ); - } - start() { - if (this.abort) { - this.abort.abort(MANUALLY_STOP); - } - - const abort = new AbortController(); - this.abort = abort; - this.fullUpload(abort.signal).catch(error => { - if (error === MANUALLY_STOP) { + downloadBlob(blobId: string) { + const signal = this.abortController.signal; + return Promise.race( + this.peers.map(p => p.downloadBlob(blobId, signal)) + ).catch(err => { + if (err === MANUALLY_STOP) { 
return; } - console.error('sync blob error', error); + // should never reach here, `downloadBlob()` should never throw + console.error(err); }); } + uploadBlob(blob: BlobRecord) { + return Promise.all( + this.peers.map(p => p.uploadBlob(blob, this.abortController.signal)) + ).catch(err => { + if (err === MANUALLY_STOP) { + return; + } + // should never reach here, `uploadBlob()` should never throw + console.error(err); + }) as Promise<void>; + } + + // start the upload loop + start() { + if (this.started) { + return; + } + this.started = true; + + const signal = this.abortController.signal; + Promise.allSettled(this.peers.map(p => p.fullUploadLoop(signal))).catch( + err => { + // should never reach here + console.error(err); + } + ); + } + + // download all blobs from a peer + async fullDownload( + peerId?: string, + outerSignal?: AbortSignal + ): Promise<void> { + return Promise.race([ + Promise.all( + peerId + ? [this.fullDownloadPeer(peerId)] + : this.peers.map(p => this.fullDownloadPeer(p.peerId)) + ), + new Promise((_, reject) => { + // Reject the promise if the outer signal is aborted + // The outer signal only controls the API promise, not the actual download process + if (outerSignal?.aborted) { + reject(outerSignal.reason); + } + outerSignal?.addEventListener('abort', () => { + reject(outerSignal.reason); + }); + }), + ]) as Promise<void>; + } + + // cache the download promise for each peer + // this is used to avoid downloading the same peer multiple times + private readonly fullDownloadPromise = new Map<string, Promise<void>>(); + private fullDownloadPeer(peerId: string) { + const peer = this.peers.find(p => p.peerId === peerId); + if (!peer) { + return; + } + const existing = this.fullDownloadPromise.get(peerId); + if (existing) { + return existing; + } + const promise = peer + .fullDownload(this.abortController.signal) + .finally(() => { + this.fullDownloadPromise.delete(peerId); + }); + this.fullDownloadPromise.set(peerId, promise); + return promise; + } + stop() { + this.abortController.abort(); + this.abortController = new AbortController(); + this.started = false; + } } diff --git a/packages/common/nbstore/src/sync/blob/peer.ts b/packages/common/nbstore/src/sync/blob/peer.ts new file mode 100644 index 0000000000..cc49fc6be9 --- /dev/null +++ b/packages/common/nbstore/src/sync/blob/peer.ts @@ -0,0 +1,463 @@ +import { difference } from 'lodash-es'; +import { Observable, ReplaySubject, share, Subject } from 'rxjs'; + +import type { BlobRecord, BlobStorage } from '../../storage'; +import { OverCapacityError, OverSizeError } from '../../storage'; +import type { BlobSyncStorage } from '../../storage/blob-sync'; +import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; + +export interface BlobSyncPeerState { + uploading: number; + downloading: number; + error: number; + overCapacity: boolean; +} + +export interface BlobSyncPeerBlobState { + uploading: boolean; + downloading: boolean; + overSize: boolean; + errorMessage?: string | null; +} + +export class BlobSyncPeer { + private readonly status = new BlobSyncPeerStatus(); + + get peerState$() { + return this.status.peerState$; + } + + 
blobPeerState$(blobId: string) { + return this.status.blobPeerState$(blobId); + } + + constructor( + readonly peerId: string, + readonly local: BlobStorage, + readonly remote: BlobStorage, + readonly blobSync: BlobSyncStorage + ) {} + + private readonly downloadingPromise = new Map<string, Promise<void>>(); + + downloadBlob(blobId: string, signal?: AbortSignal): Promise<void> { + // if the blob is already downloading, return the existing promise + const existing = this.downloadingPromise.get(blobId); + if (existing) { + return existing; + } + + const backoffRetry = { + delay: 1000, + maxDelay: 10000, + count: this.remote.isReadonly ? 1 : 5, // readonly remote storage will not retry + }; + + const promise = new Promise<void>((resolve, reject) => { + // mark the blob as downloading + this.status.blobDownloading(blobId); + + let attempts = 0; + + const attempt = async () => { + try { + throwIfAborted(signal); + const data = await this.remote.get(blobId, signal); + throwIfAborted(signal); + if (data) { + // mark the blob as uploaded to avoid uploading the same blob again + await this.blobSync.setBlobUploadedAt( + this.peerId, + blobId, + new Date() + ); + await this.local.set(data, signal); + } else { + // if the blob is not found, maybe the uploader hasn't uploaded the blob yet; we will retry several times + attempts++; + if (attempts < backoffRetry.count) { + const waitTime = Math.min( + Math.pow(2, attempts - 1) * backoffRetry.delay, + backoffRetry.maxDelay + ); + // eslint-disable-next-line @typescript-eslint/no-misused-promises + setTimeout(attempt, waitTime); + } + } + resolve(); + } catch (error) { + // if we encounter any error, reject without retry + reject(error); + } + }; + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + attempt(); + }) + .catch(error => { + if (error === MANUALLY_STOP) { + throw error; + } + this.status.blobError( + blobId, + error instanceof Error ? error.message : String(error) + ); + }) + .finally(() => { + this.status.blobDownloadFinish(blobId); + this.downloadingPromise.delete(blobId); + }); + + this.downloadingPromise.set(blobId, promise); + return promise; + }
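The not-found branch above retries with capped exponential backoff. A sketch of the schedule it produces with the defaults (delay 1000, maxDelay 10000, count 5):

const waits = [1, 2, 3, 4].map(attempt =>
  Math.min(Math.pow(2, attempt - 1) * 1000, 10000)
); // -> [1000, 2000, 4000, 8000]; a further attempt would be capped at 10000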
+ + uploadingPromise = new Map<string, Promise<void>>(); + + uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise<void> { + if (this.remote.isReadonly) { + return Promise.resolve(); + } + + const existing = this.uploadingPromise.get(blob.key); + if (existing) { + return existing; + } + + const promise = (async () => { + // mark the blob as uploading + this.status.blobUploading(blob.key); + await this.blobSync.setBlobUploadedAt(this.peerId, blob.key, null); + try { + throwIfAborted(signal); + await this.remote.set(blob, signal); + await this.blobSync.setBlobUploadedAt( + this.peerId, + blob.key, + new Date() + ); + + // free the remote storage over capacity flag + this.status.remoteOverCapacityFree(); + } catch (err) { + if (err === MANUALLY_STOP) { + throw err; + } + if (err instanceof OverCapacityError) { + // mark the remote storage as over capacity, this will stop the upload loop + this.status.remoteOverCapacity(); + this.status.blobError(blob.key, 'Remote storage over capacity'); + } else if (err instanceof OverSizeError) { + this.status.blobOverSize(blob.key); + this.status.blobError(blob.key, 'Blob size too large'); + } else { + this.status.blobError( + blob.key, + err instanceof Error ? err.message : String(err) + ); + } + } finally { + this.status.blobUploadFinish(blob.key); + } + })().finally(() => { + this.uploadingPromise.delete(blob.key); + }); + + this.uploadingPromise.set(blob.key, promise); + return promise; + } + + async fullUploadLoop(signal?: AbortSignal) { + while (true) { + try { + await this.fullUpload(signal); + } catch (err) { + if (signal?.aborted) { + return; + } + // should never reach here + console.warn('Blob full upload error, retry in 15s', err); + } + // wait for 15s before next loop + await new Promise(resolve => { + setTimeout(resolve, 15000); + }); + if (signal?.aborted) { + return; + } + } + } + + private async fullUpload(signal?: AbortSignal) { + if (this.remote.isReadonly) { + return; + } + + // if the remote storage is over capacity, skip the upload loop + if (this.status.overCapacity) { + return; + } + + await this.local.connection.waitForConnected(signal); + await this.remote.connection.waitForConnected(signal); + + const localList = await this.local.list(); + + const needUpload: string[] = []; + for (const blob of localList) { + const uploadedAt = await this.blobSync.getBlobUploadedAt( + this.peerId, + blob.key + ); + if (uploadedAt === null) { + needUpload.push(blob.key); + } else { + // if the blob has been uploaded, we clear its error flag here. + // this ensures that the sync status seen by the user is clean. + this.status.blobErrorFree(blob.key); + } + } + + if (needUpload.length === 0) { + return; + } + + // mark all blobs as will upload + for (const blobKey of needUpload) { + this.status.blobWillUpload(blobKey); + } + + try { + if (needUpload.length <= 3) { + // if there are only a few blobs to upload, upload them one by one + + // upload the blobs + for (const blobKey of needUpload) { + const data = await this.local.get(blobKey); + throwIfAborted(signal); + if (data) { + await this.uploadBlob(data, signal); + } + } + } else { + // if there are many blobs to upload, call remote list to reduce unnecessary uploads + const remoteList = new Set((await this.remote.list()).map(b => b.key)); + + for (const blobKey of needUpload) { + if (remoteList.has(blobKey)) { + // if the blob is already uploaded, set the blob as uploaded + await this.blobSync.setBlobUploadedAt( + this.peerId, + blobKey, + new Date() + ); + + // mark the blob as uploaded + this.status.blobUploadFinish(blobKey); + continue; + } + + // if the blob is over size, skip it + if (this.status.overSize.has(blobKey)) { + continue; + } + + const data = await this.local.get(blobKey); + throwIfAborted(signal); + if (data) { + await this.uploadBlob(data, signal); + } + } + } + } finally { + // remove all will upload flags + for (const blobKey of needUpload) { + this.status.blobWillUploadFinish(blobKey); + } + } + } + + async fullDownload(signal?: AbortSignal) { + await this.local.connection.waitForConnected(signal); + await this.remote.connection.waitForConnected(signal); + + const localList = (await this.local.list()).map(b => b.key); + const remoteList = (await this.remote.list()).map(b => b.key); + + const needDownload = difference(remoteList, localList); + + // mark all blobs as will download + for (const blobKey of needDownload) { + this.status.blobWillDownload(blobKey); + } + + try { + for (const blobKey of needDownload) { + throwIfAborted(signal); + // download the blobs + await this.downloadBlob(blobKey, signal); + } + } finally { + // remove all will download flags + for (const blobKey of needDownload) { + this.status.blobWillDownloadFinish(blobKey); + } + } + }
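fullUpload above switches strategy on batch size: below the threshold the remote list() round-trip costs more than it saves, above it one list() call filters out blobs that are already remote. The decision alone, as a sketch:

// Sketch only; the threshold of 3 mirrors the code above.
async function planUpload(pending: string[], listRemote: () => Promise<string[]>) {
  if (pending.length <= 3) return pending; // few blobs: upload one by one
  const remote = new Set(await listRemote()); // many blobs: list once
  return pending.filter(key => !remote.has(key));
}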
+ + async markBlobUploaded(blobKey: string): Promise<void> { + await this.blobSync.setBlobUploadedAt(this.peerId, blobKey, new Date()); + } +} + +class BlobSyncPeerStatus { + overCapacity = false; + willUpload = new Set<string>(); + uploading = new Set<string>(); + downloading = new Set<string>(); + willDownload = new Set<string>(); + error = new Map<string, string>(); + overSize = new Set<string>(); + + peerState$ = new Observable<BlobSyncPeerState>(subscribe => { + const next = () => { + subscribe.next({ + uploading: this.willUpload.union(this.uploading).size, + downloading: this.willDownload.union(this.downloading).size, + error: this.error.size, + overCapacity: this.overCapacity, + }); + }; + next(); + const dispose = this.statusUpdatedSubject$.subscribe(() => { + next(); + }); + return () => { + dispose.unsubscribe(); + }; + }).pipe( + share({ + connector: () => new ReplaySubject(1), + }) + ); + + blobPeerState$(blobId: string) { + return new Observable<BlobSyncPeerBlobState>(subscribe => { + const next = () => { + subscribe.next({ + uploading: this.willUpload.has(blobId) || this.uploading.has(blobId), + downloading: + this.willDownload.has(blobId) || this.downloading.has(blobId), + errorMessage: this.error.get(blobId) ?? null, + overSize: this.overSize.has(blobId), + }); + }; + next(); + const dispose = this.statusUpdatedSubject$.subscribe(updatedBlobId => { + if (updatedBlobId === blobId || updatedBlobId === true) { + next(); + } + }); + return () => { + dispose.unsubscribe(); + }; + }); + } + + private readonly statusUpdatedSubject$ = new Subject<string | true>(); + + blobUploading(blobId: string) { + if (!this.uploading.has(blobId)) { + this.uploading.add(blobId); + this.statusUpdatedSubject$.next(blobId); + } + } + + blobUploadFinish(blobId: string) { + let deleted = false; + deleted = this.uploading.delete(blobId) || deleted; + deleted = this.willUpload.delete(blobId) || deleted; + if (deleted) { + this.statusUpdatedSubject$.next(blobId); + } + this.blobErrorFree(blobId); + } + + blobWillUpload(blobId: string) { + if (!this.willUpload.has(blobId)) { + this.willUpload.add(blobId); + this.statusUpdatedSubject$.next(blobId); + } + } + + blobWillUploadFinish(blobId: string) { + const deleted = this.willUpload.delete(blobId); + if (deleted) { + this.statusUpdatedSubject$.next(blobId); + } + } + + blobDownloading(blobId: string) { + if (!this.downloading.has(blobId)) { + this.downloading.add(blobId); + this.statusUpdatedSubject$.next(blobId); + } + } + + blobDownloadFinish(blobId: string) { + let deleted = false; + deleted = this.willDownload.delete(blobId) || deleted; + deleted = this.downloading.delete(blobId) || deleted; + if (deleted) { + this.statusUpdatedSubject$.next(blobId); + } + this.blobErrorFree(blobId); + } + + blobWillDownload(blobId: string) { + if (!this.willDownload.has(blobId)) { + this.willDownload.add(blobId); + this.statusUpdatedSubject$.next(blobId); + } + } + + blobWillDownloadFinish(blobId: string) { + const deleted = this.willDownload.delete(blobId); + if (deleted) { + this.statusUpdatedSubject$.next(blobId); + } + } + + blobError(blobId: string, errorMessage: string) { + this.error.set(blobId, errorMessage); + this.statusUpdatedSubject$.next(blobId); + } + + remoteOverCapacity() { + if (!this.overCapacity) { + this.overCapacity = true; + this.statusUpdatedSubject$.next(true); + } + } + + remoteOverCapacityFree() { + if (this.overCapacity) { + this.overCapacity = false; + this.statusUpdatedSubject$.next(true); + } + } + + blobOverSize(blobId: string) { + this.overSize.add(blobId); + this.statusUpdatedSubject$.next(blobId); + }
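peerState$ above relies on Set.prototype.union, an ES2025 addition, which is why this patch also ships a set-union polyfill in the frontend bootstrap (set-union.ts). A minimal fallback sketch (the shipped polyfill may differ):

if (!(Set.prototype as any).union) {
  (Set.prototype as any).union = function <T>(this: Set<T>, other: Set<T>) {
    const merged = new Set<T>(this); // copy the receiver, then add the other set's values
    for (const value of other) merged.add(value);
    return merged;
  };
}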
+ + blobErrorFree(blobId: string) { + let deleted = false; + deleted = this.error.delete(blobId) || deleted; + deleted = this.overSize.delete(blobId) || deleted; + if (deleted) { + this.statusUpdatedSubject$.next(blobId); + } + } +} diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts index b5e260c822..3acead8bdf 100644 --- a/packages/common/nbstore/src/sync/doc/peer.ts +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -557,10 +557,10 @@ export class DocSyncPeer { }; this.statusUpdatedSubject$.next(true); } - // wait for 1s before next retry + // wait for 5s before next retry await Promise.race([ new Promise(resolve => { - setTimeout(resolve, 1000); + setTimeout(resolve, 5000); }), new Promise((_, reject) => { // exit if manually stopped diff --git a/packages/common/nbstore/src/sync/index.ts b/packages/common/nbstore/src/sync/index.ts index 0ee391a02f..ad5513dcbe 100644 --- a/packages/common/nbstore/src/sync/index.ts +++ b/packages/common/nbstore/src/sync/index.ts @@ -24,6 +24,7 @@ export class Sync { const doc = storages.local.get('doc'); const blob = storages.local.get('blob'); const docSync = storages.local.get('docSync'); + const blobSync = storages.local.get('blobSync'); const awareness = storages.local.get('awareness'); this.doc = new DocSyncImpl( @@ -38,15 +39,18 @@ export class Sync { }, docSync ); - this.blob = new BlobSyncImpl({ - local: blob, - remotes: Object.fromEntries( - Object.entries(storages.remotes).map(([peerId, remote]) => [ - peerId, - remote.get('blob'), - ]) - ), - }); + this.blob = new BlobSyncImpl( + { + local: blob, + remotes: Object.fromEntries( + Object.entries(storages.remotes).map(([peerId, remote]) => [ + peerId, + remote.get('blob'), + ]) + ), + }, + blobSync + ); this.awareness = new AwarenessSyncImpl({ local: awareness, remotes: Object.fromEntries( diff --git a/packages/common/nbstore/src/worker/client.ts b/packages/common/nbstore/src/worker/client.ts index 43279bbdbc..0891bf1440 100644 --- a/packages/common/nbstore/src/worker/client.ts +++ b/packages/common/nbstore/src/worker/client.ts @@ -176,6 +176,7 @@ class WorkerBlobStorage implements BlobStorage { constructor(private readonly client: OpClient) {} readonly storageType = 'blob'; + readonly isReadonly = false; get(key: string, _signal?: AbortSignal): Promise<BlobRecord | null> { return this.client.call('blobStorage.getBlob', key); } @@ -233,47 +234,38 @@ class WorkerBlobSync implements BlobSync { get state$() { return this.client.ob$('blobSync.state'); } - setMaxBlobSize(size: number): void { - this.client.call('blobSync.setMaxBlobSize', size).catch(err => { - console.error('error setting max blob size', err); - }); + blobState$(blobId: string) { + return this.client.ob$('blobSync.blobState', blobId); } - onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void { - const subscription = this.client - .ob$('blobSync.onReachedMaxBlobSize') - .subscribe(byteSize => { - cb(byteSize); - }); - return () => { - subscription.unsubscribe(); - }; - } - downloadBlob( - blobId: string, - _signal?: AbortSignal - ): Promise<BlobRecord | null> { + + downloadBlob(blobId: string): Promise<void> { return this.client.call('blobSync.downloadBlob', blobId); } - uploadBlob(blob: BlobRecord, _signal?: AbortSignal): Promise<void> { + uploadBlob(blob: BlobRecord): Promise<void> { return this.client.call('blobSync.uploadBlob', blob); } - fullDownload(signal?: AbortSignal): Promise<void> { - const download = this.client.call('blobSync.fullDownload'); + fullDownload(peerId?: string, signal?: AbortSignal): Promise<void> { + return new Promise<void>((resolve, reject) => { + const abortListener = () => { 
+ reject(signal?.reason); + subscription.unsubscribe(); + }; - signal?.addEventListener('abort', () => { - download.cancel(); + signal?.addEventListener('abort', abortListener); + + const subscription = this.client + .ob$('blobSync.fullDownload', peerId ?? null) + .subscribe({ + next() { + signal?.removeEventListener('abort', abortListener); + resolve(); + }, + error(err) { + signal?.removeEventListener('abort', abortListener); + reject(err); + }, + }); }); - - return download; - } - fullUpload(signal?: AbortSignal): Promise { - const upload = this.client.call('blobSync.fullUpload'); - - signal?.addEventListener('abort', () => { - upload.cancel(); - }); - - return upload; } } diff --git a/packages/common/nbstore/src/worker/consumer.ts b/packages/common/nbstore/src/worker/consumer.ts index 76d2d69418..315c1d0297 100644 --- a/packages/common/nbstore/src/worker/consumer.ts +++ b/packages/common/nbstore/src/worker/consumer.ts @@ -170,25 +170,6 @@ class StoreConsumer { this.blobStorage.delete(key, permanently), 'blobStorage.releaseBlobs': () => this.blobStorage.release(), 'blobStorage.listBlobs': () => this.blobStorage.list(), - 'docSyncStorage.clearClocks': () => this.docSyncStorage.clearClocks(), - 'docSyncStorage.getPeerPulledRemoteClock': ({ peer, docId }) => - this.docSyncStorage.getPeerPulledRemoteClock(peer, docId), - 'docSyncStorage.getPeerPulledRemoteClocks': ({ peer }) => - this.docSyncStorage.getPeerPulledRemoteClocks(peer), - 'docSyncStorage.setPeerPulledRemoteClock': ({ peer, clock }) => - this.docSyncStorage.setPeerPulledRemoteClock(peer, clock), - 'docSyncStorage.getPeerRemoteClock': ({ peer, docId }) => - this.docSyncStorage.getPeerRemoteClock(peer, docId), - 'docSyncStorage.getPeerRemoteClocks': ({ peer }) => - this.docSyncStorage.getPeerRemoteClocks(peer), - 'docSyncStorage.setPeerRemoteClock': ({ peer, clock }) => - this.docSyncStorage.setPeerRemoteClock(peer, clock), - 'docSyncStorage.getPeerPushedClock': ({ peer, docId }) => - this.docSyncStorage.getPeerPushedClock(peer, docId), - 'docSyncStorage.getPeerPushedClocks': ({ peer }) => - this.docSyncStorage.getPeerPushedClocks(peer), - 'docSyncStorage.setPeerPushedClock': ({ peer, clock }) => - this.docSyncStorage.setPeerPushedClock(peer, clock), 'awarenessStorage.update': ({ awareness, origin }) => this.awarenessStorage.update(awareness, origin), 'awarenessStorage.subscribeUpdate': docId => @@ -232,20 +213,23 @@ class StoreConsumer { return () => undo(); }), 'docSync.resetSync': () => this.docSync.resetSync(), + 'blobSync.state': () => this.blobSync.state$, + 'blobSync.blobState': blobId => this.blobSync.blobState$(blobId), 'blobSync.downloadBlob': key => this.blobSync.downloadBlob(key), 'blobSync.uploadBlob': blob => this.blobSync.uploadBlob(blob), - 'blobSync.fullDownload': (_, { signal }) => - this.blobSync.fullDownload(signal), - 'blobSync.fullUpload': (_, { signal }) => - this.blobSync.fullUpload(signal), - 'blobSync.state': () => this.blobSync.state$, - 'blobSync.setMaxBlobSize': size => this.blobSync.setMaxBlobSize(size), - 'blobSync.onReachedMaxBlobSize': () => + 'blobSync.fullDownload': peerId => new Observable(subscriber => { - const undo = this.blobSync.onReachedMaxBlobSize(byteSize => { - subscriber.next(byteSize); - }); - return () => undo(); + const abortController = new AbortController(); + this.blobSync + .fullDownload(peerId ?? 
undefined, abortController.signal) + .then(() => { + subscriber.next(); + subscriber.complete(); + }) + .catch(error => { + subscriber.error(error); + }); + return () => abortController.abort(MANUALLY_STOP); }), 'awarenessSync.update': ({ awareness, origin }) => this.awarenessSync.update(awareness, origin), diff --git a/packages/common/nbstore/src/worker/ops.ts b/packages/common/nbstore/src/worker/ops.ts index 964377924d..2765f12bb0 100644 --- a/packages/common/nbstore/src/worker/ops.ts +++ b/packages/common/nbstore/src/worker/ops.ts @@ -10,7 +10,7 @@ import type { StorageType, } from '../storage'; import type { AwarenessRecord } from '../storage/awareness'; -import type { BlobSyncState } from '../sync/blob'; +import type { BlobSyncBlobState, BlobSyncState } from '../sync/blob'; import type { DocSyncDocState, DocSyncState } from '../sync/doc'; type StorageInitOptions = Values<{ @@ -45,22 +45,6 @@ interface GroupedWorkerOps { listBlobs: [void, ListedBlobRecord[]]; }; - docSyncStorage: { - getPeerPulledRemoteClocks: [{ peer: string }, DocClocks]; - getPeerPulledRemoteClock: [ - { peer: string; docId: string }, - DocClock | null, - ]; - setPeerPulledRemoteClock: [{ peer: string; clock: DocClock }, void]; - getPeerRemoteClocks: [{ peer: string }, DocClocks]; - getPeerRemoteClock: [{ peer: string; docId: string }, DocClock | null]; - setPeerRemoteClock: [{ peer: string; clock: DocClock }, void]; - getPeerPushedClocks: [{ peer: string }, DocClocks]; - getPeerPushedClock: [{ peer: string; docId: string }, DocClock | null]; - setPeerPushedClock: [{ peer: string; clock: DocClock }, void]; - clearClocks: [void, void]; - }; - awarenessStorage: { update: [{ awareness: AwarenessRecord; origin?: string }, void]; subscribeUpdate: [ @@ -85,13 +69,11 @@ interface GroupedWorkerOps { }; blobSync: { - downloadBlob: [string, BlobRecord | null]; - uploadBlob: [BlobRecord, void]; - fullDownload: [void, void]; - fullUpload: [void, void]; - setMaxBlobSize: [number, void]; - onReachedMaxBlobSize: [void, number]; state: [void, BlobSyncState]; + blobState: [string, BlobSyncBlobState]; + downloadBlob: [string, void]; + uploadBlob: [BlobRecord, void]; + fullDownload: [string | null, void]; }; awarenessSync: { diff --git a/packages/common/nbstore/tsconfig.json b/packages/common/nbstore/tsconfig.json index 5c091c14fb..9586a8f303 100644 --- a/packages/common/nbstore/tsconfig.json +++ b/packages/common/nbstore/tsconfig.json @@ -6,5 +6,9 @@ "outDir": "./dist", "tsBuildInfoFile": "./dist/tsconfig.tsbuildinfo" }, - "references": [{ "path": "../infra" }, { "path": "../../frontend/graphql" }] + "references": [ + { "path": "../infra" }, + { "path": "../error" }, + { "path": "../../frontend/graphql" } + ] } diff --git a/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts index e0bb134ca7..21040120eb 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts @@ -45,4 +45,6 @@ export const nbstoreHandlers: NativeDBApis = { getPeerPushedClock: POOL.getPeerPushedClock.bind(POOL), setPeerPushedClock: POOL.setPeerPushedClock.bind(POOL), clearClocks: POOL.clearClocks.bind(POOL), + setBlobUploadedAt: POOL.setBlobUploadedAt.bind(POOL), + getBlobUploadedAt: POOL.getBlobUploadedAt.bind(POOL), }; diff --git a/packages/frontend/apps/ios/App/App/Plugins/NBStore/NBStorePlugin.swift b/packages/frontend/apps/ios/App/App/Plugins/NBStore/NBStorePlugin.swift index a81c47c114..a5bfcfcf31 
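For reference, the consumer-side 'blobSync.fullDownload' handler above bridges an abortable promise API into an Observable whose teardown cancels the in-flight task. A minimal standalone sketch of that pattern (the helper name fromAbortableTask and the stop reason are illustrative, not part of this patch):

import { Observable } from 'rxjs';

// Wrap an abortable async task in an Observable: subscribing starts the
// task, and unsubscribing aborts it through the teardown function.
function fromAbortableTask<T>(
  task: (signal: AbortSignal) => Promise<T>
): Observable<T> {
  return new Observable<T>(subscriber => {
    const controller = new AbortController();
    task(controller.signal)
      .then(value => {
        subscriber.next(value);
        subscriber.complete();
      })
      .catch(error => subscriber.error(error));
    // Teardown runs when the subscriber unsubscribes; aborting here is what
    // lets the worker stop a full download once the client side goes away.
    return () => controller.abort('manually stopped');
  });
}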
100644 --- a/packages/frontend/apps/ios/App/App/Plugins/NBStore/NBStorePlugin.swift +++ b/packages/frontend/apps/ios/App/App/Plugins/NBStore/NBStorePlugin.swift @@ -34,6 +34,8 @@ public class NbStorePlugin: CAPPlugin, CAPBridgedPlugin { CAPPluginMethod(name: "getPeerPushedClocks", returnType: CAPPluginReturnPromise), CAPPluginMethod(name: "setPeerPushedClock", returnType: CAPPluginReturnPromise), CAPPluginMethod(name: "clearClocks", returnType: CAPPluginReturnPromise), + CAPPluginMethod(name: "getBlobUploadedAt", returnType: CAPPluginReturnPromise), + CAPPluginMethod(name: "setBlobUploadedAt", returnType: CAPPluginReturnPromise), ] @objc func connect(_ call: CAPPluginCall) { @@ -490,6 +492,49 @@ public class NbStorePlugin: CAPPlugin, CAPBridgedPlugin { } } + @objc func getBlobUploadedAt(_ call: CAPPluginCall) { + Task { + do { + let id = try call.getStringEnsure("id") + let peer = try call.getStringEnsure("peer") + let blobId = try call.getStringEnsure("blobId") + + let uploadedAt = try await docStoragePool.getBlobUploadedAt( + universalId: id, + peer: peer, + blobId: blobId + ) + + call.resolve([ + "uploadedAt": uploadedAt as Any + ]) + } catch { + call.reject("Failed to get blob uploadedAt, \(error)", nil, error) + } + } + } + + @objc func setBlobUploadedAt(_ call: CAPPluginCall) { + Task { + do { + let id = try call.getStringEnsure("id") + let peer = try call.getStringEnsure("peer") + let blobId = try call.getStringEnsure("blobId") + let uploadedAt = call.getInt("uploadedAt") + + try await docStoragePool.setBlobUploadedAt( + universalId: id, + peer: peer, + blobId: blobId, + uploadedAt: uploadedAt == nil ? nil : Int64(uploadedAt!) + ) + call.resolve() + } catch { + call.reject("Failed to set blob uploadedAt, \(error)", nil, error) + } + } + } + + @objc func clearClocks(_ call: CAPPluginCall) { Task { do { diff --git a/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_native.swift b/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_native.swift index 942f0688ed..b3251eedf8 100644 --- a/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_native.swift +++ b/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_native.swift @@ -514,6 +514,8 @@ public protocol DocStoragePoolProtocol: AnyObject { func getBlob(universalId: String, key: String) async throws -> Blob? + func getBlobUploadedAt(universalId: String, peer: String, blobId: String) async throws -> Int64? + func getDocClock(universalId: String, docId: String) async throws -> DocClock? func getDocClocks(universalId: String, after: Int64?) async throws -> [DocClock] @@ -544,6 +546,8 @@ public protocol DocStoragePoolProtocol: AnyObject { func setBlob(universalId: String, blob: SetBlob) async throws + func setBlobUploadedAt(universalId: String, peer: String, blobId: String, uploadedAt: Int64?) async throws + func setDocSnapshot(universalId: String, snapshot: DocRecord) async throws -> Bool func setPeerPulledRemoteClock(universalId: String, peer: String, docId: String, clock: Int64) async throws @@ -709,6 +713,23 @@ open func getBlob(universalId: String, key: String)async throws -> Blob? { ) } +open func getBlobUploadedAt(universalId: String, peer: String, blobId: String)async throws -> Int64? 
{ + return + try await uniffiRustCallAsync( + rustFutureFunc: { + uniffi_affine_mobile_native_fn_method_docstoragepool_get_blob_uploaded_at( + self.uniffiClonePointer(), + FfiConverterString.lower(universalId),FfiConverterString.lower(peer),FfiConverterString.lower(blobId) + ) + }, + pollFunc: ffi_affine_mobile_native_rust_future_poll_rust_buffer, + completeFunc: ffi_affine_mobile_native_rust_future_complete_rust_buffer, + freeFunc: ffi_affine_mobile_native_rust_future_free_rust_buffer, + liftFunc: FfiConverterOptionInt64.lift, + errorHandler: FfiConverterTypeUniffiError.lift + ) +} + open func getDocClock(universalId: String, docId: String)async throws -> DocClock? { return try await uniffiRustCallAsync( @@ -964,6 +985,23 @@ open func setBlob(universalId: String, blob: SetBlob)async throws { ) } +open func setBlobUploadedAt(universalId: String, peer: String, blobId: String, uploadedAt: Int64?)async throws { + return + try await uniffiRustCallAsync( + rustFutureFunc: { + uniffi_affine_mobile_native_fn_method_docstoragepool_set_blob_uploaded_at( + self.uniffiClonePointer(), + FfiConverterString.lower(universalId),FfiConverterString.lower(peer),FfiConverterString.lower(blobId),FfiConverterOptionInt64.lower(uploadedAt) + ) + }, + pollFunc: ffi_affine_mobile_native_rust_future_poll_void, + completeFunc: ffi_affine_mobile_native_rust_future_complete_void, + freeFunc: ffi_affine_mobile_native_rust_future_free_void, + liftFunc: { $0 }, + errorHandler: FfiConverterTypeUniffiError.lift + ) +} + open func setDocSnapshot(universalId: String, snapshot: DocRecord)async throws -> Bool { return try await uniffiRustCallAsync( @@ -1972,6 +2010,9 @@ private let initializationResult: InitializationResult = { if (uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob() != 56927) { return InitializationResult.apiChecksumMismatch } + if (uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob_uploaded_at() != 41270) { + return InitializationResult.apiChecksumMismatch + } if (uniffi_affine_mobile_native_checksum_method_docstoragepool_get_doc_clock() != 48394) { return InitializationResult.apiChecksumMismatch } @@ -2017,6 +2058,9 @@ private let initializationResult: InitializationResult = { if (uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob() != 31398) { return InitializationResult.apiChecksumMismatch } + if (uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob_uploaded_at() != 7188) { + return InitializationResult.apiChecksumMismatch + } if (uniffi_affine_mobile_native_checksum_method_docstoragepool_set_doc_snapshot() != 5287) { return InitializationResult.apiChecksumMismatch } diff --git a/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_nativeFFI.h b/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_nativeFFI.h index 4b7c59eeee..bff076d8ad 100644 --- a/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_nativeFFI.h +++ b/packages/frontend/apps/ios/App/App/uniffi/affine_mobile_nativeFFI.h @@ -291,6 +291,11 @@ uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_disconnect(void*_N uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_get_blob(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer key ); #endif +#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT +#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT +uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_get_blob_uploaded_at(void*_Nonnull ptr, RustBuffer 
universal_id, RustBuffer peer, RustBuffer blob_id +); +#endif #ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_DOC_CLOCK #define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_DOC_CLOCK uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_get_doc_clock(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer doc_id @@ -366,6 +371,11 @@ uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_release_blobs(void uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_set_blob(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer blob ); #endif +#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT +#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT +uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_set_blob_uploaded_at(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer peer, RustBuffer blob_id, RustBuffer uploaded_at +); +#endif #ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_DOC_SNAPSHOT #define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_DOC_SNAPSHOT uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_set_doc_snapshot(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer snapshot @@ -728,6 +738,12 @@ uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_disconnect(v #define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_BLOB uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob(void +); +#endif +#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT +#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT +uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob_uploaded_at(void + ); #endif #ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_DOC_CLOCK @@ -818,6 +834,12 @@ uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_release_blob #define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_BLOB uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob(void +); +#endif +#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT +#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT +uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob_uploaded_at(void + ); #endif #ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_DOC_SNAPSHOT diff --git a/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts b/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts index e5347730c5..a90f271fe0 100644 --- a/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts +++ b/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts @@ -137,5 +137,16 @@ export interface NbStorePlugin { docId: string; timestamp: number; }) => Promise<void>; + getBlobUploadedAt: (options: { + id: string; + peer: string; + blobId: string; + }) => Promise<{ uploadedAt: number | null }>; + setBlobUploadedAt: (options: { + id: string; + peer: string; + blobId: string; + uploadedAt: number | null; + }) => Promise<void>; clearClocks: (options: { id: string }) => Promise<void>; } diff --git a/packages/frontend/apps/ios/src/plugins/nbstore/index.ts 
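The NbStorePlugin interface above exchanges uploadedAt as epoch milliseconds (number | null) across the Capacitor bridge, while the NativeDBApis wrapper that follows works with Date | null. A minimal sketch of that round-trip (helper names are illustrative; the != null guard is a suggested hardening so an epoch-zero timestamp is not coerced to null):

// Wire format (epoch ms | null) -> Date | null.
function uploadedAtToDate(uploadedAt: number | null): Date | null {
  return uploadedAt != null ? new Date(uploadedAt) : null;
}

// Date | null -> wire format (epoch ms | null).
function uploadedAtToEpochMs(uploadedAt: Date | null): number | null {
  return uploadedAt ? uploadedAt.getTime() : null;
}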
b/packages/frontend/apps/ios/src/plugins/nbstore/index.ts index 23ff58b55c..4491240967 100644 --- a/packages/frontend/apps/ios/src/plugins/nbstore/index.ts +++ b/packages/frontend/apps/ios/src/plugins/nbstore/index.ts @@ -311,4 +311,29 @@ export const NbStoreNativeDBApis: NativeDBApis = { id, }); }, + getBlobUploadedAt: async function ( + id: string, + peer: string, + blobId: string + ): Promise<Date | null> { + const result = await NbStore.getBlobUploadedAt({ + id, + peer, + blobId, + }); + return result.uploadedAt ? new Date(result.uploadedAt) : null; + }, + setBlobUploadedAt: async function ( + id: string, + peer: string, + blobId: string, + uploadedAt: Date | null + ): Promise<void> { + await NbStore.setBlobUploadedAt({ + id, + peer, + blobId, + uploadedAt: uploadedAt ? uploadedAt.getTime() : null, + }); + }, }; diff --git a/packages/frontend/core/src/bootstrap/polyfill/browser.ts b/packages/frontend/core/src/bootstrap/polyfill/browser.ts index 24ffb099f0..b2d966ff9f 100644 --- a/packages/frontend/core/src/bootstrap/polyfill/browser.ts +++ b/packages/frontend/core/src/bootstrap/polyfill/browser.ts @@ -3,6 +3,7 @@ import './array-to-spliced'; import './dispose'; import './iterator-helpers'; import './promise-with-resolvers'; +import './set-union'; import { polyfillEventLoop } from './request-idle-callback'; import { polyfillResizeObserver } from './resize-observer'; diff --git a/packages/frontend/core/src/bootstrap/polyfill/set-union.ts b/packages/frontend/core/src/bootstrap/polyfill/set-union.ts new file mode 100644 index 0000000000..4d167c81c4 --- /dev/null +++ b/packages/frontend/core/src/bootstrap/polyfill/set-union.ts @@ -0,0 +1 @@ +import 'core-js/es/set/union.js'; diff --git a/packages/frontend/core/src/components/over-capacity/index.tsx b/packages/frontend/core/src/components/over-capacity/index.tsx index 72feef5bd4..96c0ffcf7e 100644 --- a/packages/frontend/core/src/components/over-capacity/index.tsx +++ b/packages/frontend/core/src/components/over-capacity/index.tsx @@ -33,8 +33,8 @@ export const OverCapacityNotification = () => { useEffect(() => { const disposableOverCapacity = currentWorkspace.engine.blob.state$.subscribe( - debounce(({ isStorageOverCapacity }: BlobSyncState) => { - const isOver = isStorageOverCapacity; + debounce(({ overCapacity }: BlobSyncState) => { + const isOver = overCapacity; if (!isOver) { return; } diff --git a/packages/frontend/core/src/desktop/dialogs/setting/workspace-setting/storage/export.tsx b/packages/frontend/core/src/desktop/dialogs/setting/workspace-setting/storage/export.tsx index f47ffb2c09..fea3962fc6 100644 --- a/packages/frontend/core/src/desktop/dialogs/setting/workspace-setting/storage/export.tsx +++ b/packages/frontend/core/src/desktop/dialogs/setting/workspace-setting/storage/export.tsx @@ -8,8 +8,8 @@ import type { Workspace } from '@affine/core/modules/workspace'; import { useI18n } from '@affine/i18n'; import { universalId } from '@affine/nbstore'; import track from '@affine/track'; -import { LiveData, useLiveData, useService } from '@toeverything/infra'; -import { useMemo, useState } from 'react'; +import { useService } from '@toeverything/infra'; +import { useState } from 'react'; interface ExportPanelProps { workspace: Workspace; @@ -22,39 +22,18 @@ export const DesktopExportPanel = ({ workspace }: ExportPanelProps) => { const desktopApi = useService(DesktopApiService); const isLocalWorkspace = workspace.flavour === 'local'; - const docSyncState = useLiveData( - useMemo(() => { - return workspace - ? 
LiveData.from(workspace.engine.doc.state$, null).throttleTime(500) - : null; - }, [workspace]) - ); - - const blobSyncState = useLiveData( - useMemo(() => { - return workspace - ? LiveData.from(workspace.engine.blob.state$, null).throttleTime(500) - : null; - }, [workspace]) - ); - - const docSynced = !docSyncState?.syncing; - const blobSynced = - !blobSyncState || blobSyncState.synced === blobSyncState.total; + const [fullSyncing, setFullSyncing] = useState(false); const [fullSynced, setFullSynced] = useState(false); - const shouldWaitForFullSync = - isLocalWorkspace || !isOnline || (fullSynced && docSynced && blobSynced); - const fullSyncing = fullSynced && (!docSynced || !blobSynced); + const shouldWaitForFullSync = !isLocalWorkspace && isOnline && !fullSynced; const fullSync = useAsyncCallback(async () => { - // NOTE: doc full sync is always started by default - // await workspace.engine.doc.waitForSynced(); - workspace.engine.blob.fullDownload().catch(() => { - /* noop */ - }); + setFullSyncing(true); + await workspace.engine.blob.fullDownload(); + await workspace.engine.doc.waitForSynced(); setFullSynced(true); - }, [workspace.engine.blob]); + setFullSyncing(false); + }, [workspace.engine.blob, workspace.engine.doc]); const onExport = useAsyncCallback(async () => { if (saving) { @@ -86,7 +65,7 @@ export const DesktopExportPanel = ({ workspace }: ExportPanelProps) => { } }, [desktopApi, saving, t, workspace]); - if (!shouldWaitForFullSync) { + if (shouldWaitForFullSync) { return (
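One note on the fullSync callback above: setFullSyncing(false) only runs when both awaited steps resolve, so a rejected fullDownload or waitForSynced would leave the syncing flag set. A defensive variant (a sketch against the same engine API, not part of this patch) resets the flag in finally:

const fullSync = useAsyncCallback(async () => {
  setFullSyncing(true);
  try {
    await workspace.engine.blob.fullDownload();
    await workspace.engine.doc.waitForSynced();
    setFullSynced(true);
  } finally {
    // Always clear the in-progress flag, even if a sync step rejects.
    setFullSyncing(false);
  }
}, [workspace.engine.blob, workspace.engine.doc]);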