feat(nbstore): add blob sync storage (#10752)

Author: EYHN
Date: 2025-03-14 18:05:54 +08:00
Committed by: GitHub
Parent: a2eb3fe1b2
Commit: 05200ad7b7

56 changed files with 1441 additions and 404 deletions

View File

@@ -1,7 +1,10 @@
import type { ErrorDataUnion, ErrorNames } from '@affine/graphql';
import { GraphQLError as BaseGraphQLError } from 'graphql';
export type ErrorName = keyof typeof ErrorNames | 'NETWORK_ERROR';
export type ErrorName =
| keyof typeof ErrorNames
| 'NETWORK_ERROR'
| 'CONTENT_TOO_LARGE';
export interface UserFriendlyErrorResponse {
status: number;

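The widened union adds 'CONTENT_TOO_LARGE' alongside the existing 'NETWORK_ERROR' escape hatch. A minimal sketch of consuming it — the union is narrowed to three members here for self-containment; the real type spans every key of ErrorNames:

```ts
// Simplified stand-in for the real union, which also includes every
// key of @affine/graphql's ErrorNames enum.
type ErrorName = 'NETWORK_ERROR' | 'CONTENT_TOO_LARGE' | 'BLOB_QUOTA_EXCEEDED';

function messageFor(name: ErrorName): string {
  switch (name) {
    case 'NETWORK_ERROR':
      return 'Network error, please check your connection.';
    case 'CONTENT_TOO_LARGE':
      return 'This file exceeds the maximum upload size.';
    case 'BLOB_QUOTA_EXCEEDED':
      return 'Workspace storage is full.';
  }
}
```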
View File

@@ -27,6 +27,7 @@
"yjs": "^13.6.21"
},
"devDependencies": {
"@affine/error": "workspace:*",
"@affine/graphql": "workspace:*",
"fake-indexeddb": "^6.0.0",
"idb": "^8.0.0",
@@ -34,6 +35,7 @@
"vitest": "3.0.8"
},
"peerDependencies": {
"@affine/error": "workspace:*",
"@affine/graphql": "workspace:*",
"idb": "^8.0.0",
"socket.io-client": "^4.7.5"

View File

@@ -5,6 +5,7 @@ import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import {
IndexedDBBlobStorage,
IndexedDBBlobSyncStorage,
IndexedDBDocStorage,
IndexedDBDocSyncStorage,
} from '../impls/idb';
@@ -138,8 +139,15 @@ test('blob', async () => {
type: 'workspace',
});
const blobSync = new IndexedDBBlobSyncStorage({
id: 'ws1',
flavour: 'a',
type: 'workspace',
});
const peerA = new SpaceStorage({
blob: a,
blobSync,
});
const peerB = new SpaceStorage({
blob: b,

View File

@@ -1,39 +1,68 @@
import type { BlobRecord, BlobStorage } from '../storage';
import { SingletonLocker } from '../storage/lock';
import type { BlobSync } from '../sync/blob';
export class BlobFrontend {
// Since 'set' and 'get' operations may be called in rapid succession, we use a lock mechanism
// to ensure that 'get' requests for the same blob are paused when a 'set' operation is in progress.
private readonly lock = new SingletonLocker();
constructor(
public readonly storage: BlobStorage,
readonly storage: BlobStorage,
private readonly sync: BlobSync
) {}
get(blobId: string) {
return this.sync.downloadBlob(blobId);
get state$() {
return this.sync.state$;
}
set(blob: BlobRecord) {
return this.sync.uploadBlob(blob);
async get(blobId: string) {
await using lock = await this.lock.lock('blob', blobId);
const local = await this.storage.get(blobId);
if (local) {
return local;
}
await lock[Symbol.asyncDispose]();
await this.sync.downloadBlob(blobId);
return await this.storage.get(blobId);
}
fullDownload() {
return this.sync.fullDownload();
async set(blob: BlobRecord) {
if (blob.data.byteLength > this.maxBlobSize) {
for (const cb of this.onReachedMaxBlobSizeCallbacks) {
cb(blob.data.byteLength);
}
throw new Error('Blob size exceeds the maximum limit');
}
await using lock = await this.lock.lock('blob', blob.key);
await this.storage.set(blob);
await lock[Symbol.asyncDispose]();
// We don't wait for the upload to complete,
// as the upload process runs asynchronously in the background
this.sync.uploadBlob(blob).catch(err => {
// should never reach here, `uploadBlob()` should never throw
console.error(err);
});
return;
}
fullUpload() {
return this.sync.fullUpload();
fullDownload(peerId?: string, signal?: AbortSignal) {
return this.sync.fullDownload(peerId, signal);
}
addPriority(_id: string, _priority: number) {
// not support yet
}
readonly state$ = this.sync.state$;
private maxBlobSize = 1024 * 1024 * 100; // 100MB
private readonly onReachedMaxBlobSizeCallbacks: Set<
(byteSize: number) => void
> = new Set();
setMaxBlobSize(max: number) {
this.sync.setMaxBlobSize(max);
this.maxBlobSize = max;
}
onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void {
return this.sync.onReachedMaxBlobSize(cb);
this.onReachedMaxBlobSizeCallbacks.add(cb);
return () => this.onReachedMaxBlobSizeCallbacks.delete(cb);
}
}
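SingletonLocker is imported from '../storage/lock' but not shown in this diff. A plausible sketch of its contract, assuming FIFO queuing per (domain, id) key and a handle compatible with TypeScript 5.2 `await using` (requires the esnext.disposable lib):

```ts
// Assumed shape of SingletonLocker: a FIFO lock per (domain, id) key whose
// handle supports `await using` via Symbol.asyncDispose.
class SingletonLockerSketch {
  private readonly tails = new Map<string, Promise<void>>();

  async lock(domain: string, id: string) {
    const key = `${domain}:${id}`;
    const prev = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    this.tails.set(key, new Promise<void>(resolve => (release = resolve)));
    await prev; // queue behind the current holder
    return {
      // disposing twice (explicitly and again at scope exit, as
      // BlobFrontend.get does) is harmless: resolving is idempotent
      [Symbol.asyncDispose]: () => {
        release();
        return Promise.resolve();
      },
    };
  }
}

// usage, mirroring BlobFrontend.get():
//   await using lock = await locker.lock('blob', blobId);
```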

View File

@@ -1,11 +1,18 @@
import { UserFriendlyError } from '@affine/error';
import {
deleteBlobMutation,
listBlobsQuery,
releaseDeletedBlobsMutation,
setBlobMutation,
workspaceQuotaQuery,
} from '@affine/graphql';
import { type BlobRecord, BlobStorageBase } from '../../storage';
import {
type BlobRecord,
BlobStorageBase,
OverCapacityError,
OverSizeError,
} from '../../storage';
import { HttpConnection } from './http';
interface CloudBlobStorageOptions {
@@ -15,6 +22,7 @@ interface CloudBlobStorageOptions {
export class CloudBlobStorage extends BlobStorageBase {
static readonly identifier = 'CloudBlobStorage';
override readonly isReadonly = false;
constructor(private readonly options: CloudBlobStorageOptions) {
super();
@@ -22,7 +30,7 @@ export class CloudBlobStorage extends BlobStorageBase {
readonly connection = new HttpConnection(this.options.serverBaseUrl);
override async get(key: string) {
override async get(key: string, signal?: AbortSignal) {
const res = await this.connection.fetch(
'/api/workspaces/' + this.options.id + '/blobs/' + key,
{
@@ -30,6 +38,7 @@ export class CloudBlobStorage extends BlobStorageBase {
headers: {
'x-affine-version': BUILD_CONFIG.appVersion,
},
signal,
}
);
@@ -52,14 +61,32 @@ export class CloudBlobStorage extends BlobStorageBase {
}
}
override async set(blob: BlobRecord) {
await this.connection.gql({
query: setBlobMutation,
variables: {
workspaceId: this.options.id,
blob: new File([blob.data], blob.key, { type: blob.mime }),
},
});
override async set(blob: BlobRecord, signal?: AbortSignal) {
try {
const blobSizeLimit = await this.getBlobSizeLimit();
if (blob.data.byteLength > blobSizeLimit) {
throw new OverSizeError();
}
await this.connection.gql({
query: setBlobMutation,
variables: {
workspaceId: this.options.id,
blob: new File([blob.data], blob.key, { type: blob.mime }),
},
context: {
signal,
},
});
} catch (err) {
const userFriendlyError = UserFriendlyError.fromAny(err);
if (userFriendlyError.is('BLOB_QUOTA_EXCEEDED')) {
throw new OverCapacityError();
}
if (userFriendlyError.is('CONTENT_TOO_LARGE')) {
throw new OverSizeError();
}
throw err;
}
}
override async delete(key: string, permanently: boolean) {
@@ -87,4 +114,28 @@ export class CloudBlobStorage extends BlobStorageBase {
createdAt: new Date(blob.createdAt),
}));
}
private blobSizeLimitCache: number | null = null;
private blobSizeLimitCacheTime = 0;
private async getBlobSizeLimit() {
// If the cached value is less than 120 seconds old, return it directly
if (
this.blobSizeLimitCache !== null &&
Date.now() - this.blobSizeLimitCacheTime < 120 * 1000
) {
return this.blobSizeLimitCache;
}
try {
const res = await this.connection.gql({
query: workspaceQuotaQuery,
variables: { id: this.options.id },
});
this.blobSizeLimitCache = res.workspace.quota.blobLimit;
this.blobSizeLimitCacheTime = Date.now();
return this.blobSizeLimitCache;
} catch (err) {
throw UserFriendlyError.fromAny(err);
}
}
}
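getBlobSizeLimit() caches the quota for two minutes to avoid a GraphQL round-trip on every upload. The same idea, factored into a generic helper (a sketch, not code from this commit):

```ts
// Cache an async loader's result for a fixed TTL. The loader below is a
// placeholder for the workspaceQuotaQuery call.
function cachedLoader<T>(load: () => Promise<T>, ttlMs: number): () => Promise<T> {
  let value: T | null = null;
  let fetchedAt = 0;
  return async () => {
    if (value !== null && Date.now() - fetchedAt < ttlMs) {
      return value; // fresh enough, skip the network
    }
    value = await load();
    fetchedAt = Date.now();
    return value;
  };
}

const getBlobSizeLimit = cachedLoader(
  async () => 100 * 1024 * 1024, // stand-in for res.workspace.quota.blobLimit
  120 * 1000
);
```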

View File

@@ -1,3 +1,4 @@
import { UserFriendlyError } from '@affine/error';
import { gqlFetcherFactory } from '@affine/graphql';
import { DummyConnection } from '../../connection';
@@ -29,19 +30,32 @@ export class HttpConnection extends DummyConnection {
},
})
.catch(err => {
throw new Error('fetch error: ' + err);
throw new UserFriendlyError({
status: 504,
code: 'NETWORK_ERROR',
type: 'NETWORK_ERROR',
name: 'NETWORK_ERROR',
message: `Network error: ${err.message}`,
stacktrace: err.stack,
});
});
clearTimeout(timeoutId);
if (!res.ok && res.status !== 404) {
let reason: string | any = '';
if (res.headers.get('Content-Type')?.includes('application/json')) {
try {
reason = JSON.stringify(await res.json());
} catch {
// ignore
}
if (res.status === 413) {
throw new UserFriendlyError({
status: 413,
code: 'CONTENT_TOO_LARGE',
type: 'CONTENT_TOO_LARGE',
name: 'CONTENT_TOO_LARGE',
message: 'Content too large',
});
} else if (
res.headers.get('Content-Type')?.startsWith('application/json')
) {
throw UserFriendlyError.fromAny(await res.json());
} else {
throw UserFriendlyError.fromAny(await res.text());
}
throw new Error('fetch error status: ' + res.status + ' ' + reason);
}
return res;
};
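The new branch maps HTTP 413 straight to CONTENT_TOO_LARGE before attempting to parse a body, since proxies often reject oversized uploads with non-JSON responses. The decision tree restated standalone, with plain Error standing in for UserFriendlyError:

```ts
// Sketch of the status handling in HttpConnection.fetch; UserFriendlyError
// is replaced with plain Error to stay self-contained.
async function mapErrorResponse(res: Response): Promise<Error> {
  if (res.status === 413) {
    // 413 bodies are often HTML emitted by a proxy, so don't try to parse them
    return new Error('CONTENT_TOO_LARGE');
  }
  if (res.headers.get('Content-Type')?.startsWith('application/json')) {
    return new Error(JSON.stringify(await res.json()));
  }
  return new Error(await res.text());
}
```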

View File

@@ -0,0 +1,36 @@
import { share } from '../../connection';
import { BlobSyncStorageBase } from '../../storage';
import { IDBConnection, type IDBConnectionOptions } from './db';
export class IndexedDBBlobSyncStorage extends BlobSyncStorageBase {
static readonly identifier = 'IndexedDBBlobSyncStorage';
readonly connection = share(new IDBConnection(this.options));
constructor(private readonly options: IDBConnectionOptions) {
super();
}
get db() {
return this.connection;
}
async setBlobUploadedAt(
peer: string,
blobId: string,
uploadedAt: Date | null
): Promise<void> {
const trx = this.db.inner.db.transaction('blobSync', 'readwrite');
await trx.store.put({
peer,
key: blobId,
uploadedAt,
});
}
async getBlobUploadedAt(peer: string, blobId: string): Promise<Date | null> {
const trx = this.db.inner.db.transaction('blobSync', 'readonly');
const record = await trx.store.get([peer, blobId]);
return record?.uploadedAt ?? null;
}
}
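A standalone round-trip against the same blobSync store shape, using the idb package directly (database name and version here are placeholders):

```ts
import { openDB } from 'idb';

async function demo() {
  const db = await openDB('blob-sync-demo', 1, {
    upgrade(db) {
      // same composite keyPath the real migration uses
      const store = db.createObjectStore('blobSync', { keyPath: ['peer', 'key'] });
      store.createIndex('peer', 'peer', { unique: false });
    },
  });
  await db.put('blobSync', { peer: 'cloud', key: 'blob-1', uploadedAt: new Date() });
  const record = await db.get('blobSync', ['cloud', 'blob-1']); // composite-key lookup
  return record?.uploadedAt ?? null;
}
```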

View File

@@ -8,6 +8,7 @@ import { IDBConnection, type IDBConnectionOptions } from './db';
export class IndexedDBBlobStorage extends BlobStorageBase {
static readonly identifier = 'IndexedDBBlobStorage';
override readonly isReadonly = false;
readonly connection = share(new IDBConnection(this.options));

View File

@@ -1,9 +1,11 @@
import type { StorageConstructor } from '..';
import { IndexedDBBlobStorage } from './blob';
import { IndexedDBBlobSyncStorage } from './blob-sync';
import { IndexedDBDocStorage } from './doc';
import { IndexedDBDocSyncStorage } from './doc-sync';
export * from './blob';
export * from './blob-sync';
export * from './doc';
export * from './doc-sync';
@@ -11,4 +13,5 @@ export const idbStorages = [
IndexedDBDocStorage,
IndexedDBBlobStorage,
IndexedDBDocSyncStorage,
IndexedDBBlobSyncStorage,
] satisfies StorageConstructor[];

View File

@@ -36,6 +36,11 @@ Table(PeerClocks)
| peer | docId | clock | pushed |
|------|-------|-----------|-----------|
| str | str | Date | Date |
Table(BlobSync)
| peer | key | uploadedAt |
|------|-----|------------|
| str | str | Date |
*/
export interface DocStorageSchema extends DBSchema {
snapshots: {
@@ -81,6 +86,17 @@ export interface DocStorageSchema extends DBSchema {
deletedAt: Date | null;
};
};
blobSync: {
key: [string, string];
value: {
peer: string;
key: string;
uploadedAt: Date | null;
};
indexes: {
peer: string;
};
};
blobData: {
key: string;
value: {
@@ -175,11 +191,19 @@ const init: Migrate = db => {
autoIncrement: false,
});
};
const initBlobSync: Migrate = db => {
const blobSync = db.createObjectStore('blobSync', {
keyPath: ['peer', 'key'],
autoIncrement: false,
});
blobSync.createIndex('peer', 'peer', { unique: false });
};
// END REGION
// 1. all schema changed should be put in migrations
// 2. order matters
const migrations: Migrate[] = [init];
const migrations: Migrate[] = [init, initBlobSync];
export const migrator = {
version: migrations.length,

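The migrator pins the database version to migrations.length, so appending initBlobSync bumps the version by one and IndexedDB replays only the missing steps on upgrade. A sketch of such a runner (the codebase's actual runner is not shown in this diff):

```ts
import { openDB } from 'idb';

type Migrate = (db: any) => void; // simplified; the real Migrate is schema-typed

async function openWithMigrations(name: string, migrations: Migrate[]) {
  return openDB(name, migrations.length, {
    upgrade(db, oldVersion) {
      // replay every migration the stored version has not seen yet;
      // order matters, which is why the array must only be appended to
      for (let i = oldVersion; i < migrations.length; i++) {
        migrations[i](db);
      }
    },
  });
}
```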
View File

@@ -7,6 +7,7 @@ import { BlobIDBConnection, type BlobIDBConnectionOptions } from './db';
*/
export class IndexedDBV1BlobStorage extends BlobStorageBase {
static readonly identifier = 'IndexedDBV1BlobStorage';
override readonly isReadonly = true;
constructor(private readonly options: BlobIDBConnectionOptions) {
super();

View File

@@ -0,0 +1,32 @@
import { share } from '../../connection';
import { BlobSyncStorageBase } from '../../storage';
import { NativeDBConnection, type SqliteNativeDBOptions } from './db';
export class SqliteBlobSyncStorage extends BlobSyncStorageBase {
static readonly identifier = 'SqliteBlobSyncStorage';
override connection = share(new NativeDBConnection(this.options));
constructor(private readonly options: SqliteNativeDBOptions) {
super();
}
get db() {
return this.connection.apis;
}
override async setBlobUploadedAt(
peer: string,
blobId: string,
uploadedAt: Date | null
): Promise<void> {
await this.db.setBlobUploadedAt(peer, blobId, uploadedAt);
}
override async getBlobUploadedAt(
peer: string,
blobId: string
): Promise<Date | null> {
return this.db.getBlobUploadedAt(peer, blobId);
}
}

View File

@@ -4,6 +4,7 @@ import { NativeDBConnection, type SqliteNativeDBOptions } from './db';
export class SqliteBlobStorage extends BlobStorageBase {
static readonly identifier = 'SqliteBlobStorage';
override readonly isReadonly = false;
override connection = share(new NativeDBConnection(this.options));

View File

@@ -13,67 +13,75 @@ export interface SqliteNativeDBOptions {
readonly id: string;
}
export type NativeDBApis = {
connect(id: string): Promise<void>;
disconnect(id: string): Promise<void>;
pushUpdate(id: string, docId: string, update: Uint8Array): Promise<Date>;
getDocSnapshot(id: string, docId: string): Promise<DocRecord | null>;
setDocSnapshot(id: string, snapshot: DocRecord): Promise<boolean>;
getDocUpdates(id: string, docId: string): Promise<DocRecord[]>;
markUpdatesMerged(
export interface NativeDBApis {
connect: (id: string) => Promise<void>;
disconnect: (id: string) => Promise<void>;
pushUpdate: (id: string, docId: string, update: Uint8Array) => Promise<Date>;
getDocSnapshot: (id: string, docId: string) => Promise<DocRecord | null>;
setDocSnapshot: (id: string, snapshot: DocRecord) => Promise<boolean>;
getDocUpdates: (id: string, docId: string) => Promise<DocRecord[]>;
markUpdatesMerged: (
id: string,
docId: string,
updates: Date[]
): Promise<number>;
deleteDoc(id: string, docId: string): Promise<void>;
getDocClocks(
id: string,
after?: Date | undefined | null
): Promise<DocClock[]>;
getDocClock(id: string, docId: string): Promise<DocClock | null>;
getBlob(id: string, key: string): Promise<BlobRecord | null>;
setBlob(id: string, blob: BlobRecord): Promise<void>;
deleteBlob(id: string, key: string, permanently: boolean): Promise<void>;
releaseBlobs(id: string): Promise<void>;
listBlobs(id: string): Promise<ListedBlobRecord[]>;
getPeerRemoteClocks(id: string, peer: string): Promise<DocClock[]>;
getPeerRemoteClock(
) => Promise<number>;
deleteDoc: (id: string, docId: string) => Promise<void>;
getDocClocks: (id: string, after?: Date | null) => Promise<DocClock[]>;
getDocClock: (id: string, docId: string) => Promise<DocClock | null>;
getBlob: (id: string, key: string) => Promise<BlobRecord | null>;
setBlob: (id: string, blob: BlobRecord) => Promise<void>;
deleteBlob: (id: string, key: string, permanently: boolean) => Promise<void>;
releaseBlobs: (id: string) => Promise<void>;
listBlobs: (id: string) => Promise<ListedBlobRecord[]>;
getPeerRemoteClocks: (id: string, peer: string) => Promise<DocClock[]>;
getPeerRemoteClock: (
id: string,
peer: string,
docId: string
): Promise<DocClock | null>;
setPeerRemoteClock(
) => Promise<DocClock | null>;
setPeerRemoteClock: (
id: string,
peer: string,
docId: string,
clock: Date
): Promise<void>;
getPeerPulledRemoteClocks(id: string, peer: string): Promise<DocClock[]>;
getPeerPulledRemoteClock(
) => Promise<void>;
getPeerPulledRemoteClocks: (id: string, peer: string) => Promise<DocClock[]>;
getPeerPulledRemoteClock: (
id: string,
peer: string,
docId: string
): Promise<DocClock | null>;
setPeerPulledRemoteClock(
) => Promise<DocClock | null>;
setPeerPulledRemoteClock: (
id: string,
peer: string,
docId: string,
clock: Date
): Promise<void>;
getPeerPushedClocks(id: string, peer: string): Promise<DocClock[]>;
getPeerPushedClock(
) => Promise<void>;
getPeerPushedClocks: (id: string, peer: string) => Promise<DocClock[]>;
getPeerPushedClock: (
id: string,
peer: string,
docId: string
): Promise<DocClock | null>;
setPeerPushedClock(
) => Promise<DocClock | null>;
setPeerPushedClock: (
id: string,
peer: string,
docId: string,
clock: Date
): Promise<void>;
clearClocks(id: string): Promise<void>;
};
) => Promise<void>;
clearClocks: (id: string) => Promise<void>;
setBlobUploadedAt: (
id: string,
peer: string,
blobId: string,
uploadedAt: Date | null
) => Promise<void>;
getBlobUploadedAt: (
id: string,
peer: string,
blobId: string
) => Promise<Date | null>;
}
type NativeDBApisWrapper = NativeDBApis extends infer APIs
? {

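The declaration switches from method shorthand to function-typed properties, which keeps strict (contravariant) parameter checking and makes conditional-type inference over each member straightforward. NativeDBApisWrapper is truncated in this hunk; one plausible shape (an assumption, not the committed definition) binds away the leading universal id:

```ts
// Hypothetical mapped type: strip the first `id: string` parameter from
// every API so callers bound to one store get the narrowed signatures.
type BindFirst<T> = {
  [K in keyof T]: T[K] extends (id: string, ...rest: infer A) => infer R
    ? (...rest: A) => R
    : never;
};
// e.g. BindFirst<NativeDBApis>['getBlobUploadedAt'] would be
//   (peer: string, blobId: string) => Promise<Date | null>
```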
View File

@@ -1,9 +1,11 @@
import type { StorageConstructor } from '..';
import { SqliteBlobStorage } from './blob';
import { SqliteBlobSyncStorage } from './blob-sync';
import { SqliteDocStorage } from './doc';
import { SqliteDocSyncStorage } from './doc-sync';
export * from './blob';
export * from './blob-sync';
export { bindNativeDBApis, type NativeDBApis } from './db';
export * from './doc';
export * from './doc-sync';
@@ -12,4 +14,5 @@ export const sqliteStorages = [
SqliteDocStorage,
SqliteBlobStorage,
SqliteDocSyncStorage,
SqliteBlobSyncStorage,
] satisfies StorageConstructor[];

View File

@@ -9,6 +9,7 @@ import { apis } from './db';
export class SqliteV1BlobStorage extends BlobStorageBase {
static identifier = 'SqliteV1BlobStorage';
override connection = new DummyConnection();
override readonly isReadonly = true;
constructor(private readonly options: { type: SpaceType; id: string }) {
super();

View File

@@ -0,0 +1,30 @@
import type { Connection } from '../connection';
import type { Storage } from './storage';
export interface BlobSyncStorage extends Storage {
readonly storageType: 'blobSync';
setBlobUploadedAt(
peer: string,
blobId: string,
uploadedAt: Date | null
): Promise<void>;
getBlobUploadedAt(peer: string, blobId: string): Promise<Date | null>;
}
export abstract class BlobSyncStorageBase implements BlobSyncStorage {
readonly storageType = 'blobSync';
abstract readonly connection: Connection;
abstract setBlobUploadedAt(
peer: string,
blobId: string,
uploadedAt: Date | null
): Promise<void>;
abstract getBlobUploadedAt(
peer: string,
blobId: string
): Promise<Date | null>;
}
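A minimal in-memory implementation of the contract, handy for tests (the Connection member is omitted to stay self-contained):

```ts
class InMemoryBlobSyncStorage {
  readonly storageType = 'blobSync' as const;
  private readonly uploadedAt = new Map<string, Date | null>();

  async setBlobUploadedAt(peer: string, blobId: string, at: Date | null) {
    this.uploadedAt.set(`${peer}:${blobId}`, at);
  }

  async getBlobUploadedAt(peer: string, blobId: string): Promise<Date | null> {
    return this.uploadedAt.get(`${peer}:${blobId}`) ?? null;
  }
}
```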

View File

@@ -17,6 +17,7 @@ export interface ListedBlobRecord {
export interface BlobStorage extends Storage {
readonly storageType: 'blob';
readonly isReadonly: boolean;
get(key: string, signal?: AbortSignal): Promise<BlobRecord | null>;
set(blob: BlobRecord, signal?: AbortSignal): Promise<void>;
delete(
@@ -31,7 +32,7 @@ export interface BlobStorage extends Storage {
export abstract class BlobStorageBase implements BlobStorage {
readonly storageType = 'blob';
abstract readonly connection: Connection;
abstract readonly isReadonly: boolean;
abstract get(key: string, signal?: AbortSignal): Promise<BlobRecord | null>;
abstract set(blob: BlobRecord, signal?: AbortSignal): Promise<void>;
abstract delete(

View File

@@ -0,0 +1,14 @@
import { type Connection, DummyConnection } from '../../connection';
import type { BlobSyncStorage } from '../blob-sync';
export class DummyBlobSyncStorage implements BlobSyncStorage {
storageType = 'blobSync' as const;
connection: Connection<any> = new DummyConnection();
setBlobUploadedAt(): Promise<void> {
return Promise.resolve();
}
getBlobUploadedAt(): Promise<Date | null> {
return Promise.resolve(new Date());
}
}

View File

@@ -6,6 +6,7 @@ import {
} from '../blob';
export class DummyBlobStorage extends BlobStorageBase {
override readonly isReadonly = true;
override get(
_key: string,
_signal?: AbortSignal

View File

@@ -1 +1,2 @@
export * from './over-capacity';
export * from './over-size';

View File

@@ -1,5 +1,5 @@
export class OverCapacityError extends Error {
constructor(public originError?: any) {
super('Storage over capacity. Origin error: ' + originError);
super('Storage over capacity.');
}
}

View File

@@ -0,0 +1,5 @@
export class OverSizeError extends Error {
constructor(public originError?: any) {
super('Blob size exceeds the limit.');
}
}

View File

@@ -2,15 +2,22 @@ import EventEmitter2 from 'eventemitter2';
import type { AwarenessStorage } from './awareness';
import type { BlobStorage } from './blob';
import type { BlobSyncStorage } from './blob-sync';
import type { DocStorage } from './doc';
import type { DocSyncStorage } from './doc-sync';
import { DummyAwarenessStorage } from './dummy/awareness';
import { DummyBlobStorage } from './dummy/blob';
import { DummyBlobSyncStorage } from './dummy/blob-sync';
import { DummyDocStorage } from './dummy/doc';
import { DummyDocSyncStorage } from './dummy/doc-sync';
import type { StorageType } from './storage';
type Storages = DocStorage | BlobStorage | DocSyncStorage | AwarenessStorage;
type Storages =
| DocStorage
| BlobStorage
| BlobSyncStorage
| DocSyncStorage
| AwarenessStorage;
export type SpaceStorageOptions = {
[K in StorageType]?: Storages & { storageType: K };
@@ -27,8 +34,9 @@ export class SpaceStorage {
this.storages = {
awareness: storages.awareness ?? new DummyAwarenessStorage(),
blob: storages.blob ?? new DummyBlobStorage(),
blobSync: storages.blobSync ?? new DummyBlobSyncStorage(),
doc: storages.doc ?? new DummyDocStorage(),
['docSync']: storages['docSync'] ?? new DummyDocSyncStorage(),
docSync: storages.docSync ?? new DummyDocSyncStorage(),
};
}
@@ -70,6 +78,7 @@ export class SpaceStorage {
export * from './awareness';
export * from './blob';
export * from './blob-sync';
export * from './doc';
export * from './doc-sync';
export * from './errors';

View File

@@ -1,6 +1,6 @@
import type { Connection } from '../connection';
export type StorageType = 'blob' | 'doc' | 'docSync' | 'awareness';
export type StorageType = 'blob' | 'blobSync' | 'doc' | 'docSync' | 'awareness';
export interface Storage {
readonly storageType: StorageType;

View File

@@ -1,213 +1,196 @@
import EventEmitter2 from 'eventemitter2';
import { difference } from 'lodash-es';
import { BehaviorSubject, type Observable } from 'rxjs';
import {
combineLatest,
map,
type Observable,
ReplaySubject,
share,
throttleTime,
} from 'rxjs';
import type { BlobRecord, BlobStorage } from '../../storage';
import { OverCapacityError } from '../../storage';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
import type { BlobRecord, BlobStorage, BlobSyncStorage } from '../../storage';
import { MANUALLY_STOP } from '../../utils/throw-if-aborted';
import type { PeerStorageOptions } from '../types';
import { BlobSyncPeer } from './peer';
export interface BlobSyncState {
isStorageOverCapacity: boolean;
total: number;
synced: number;
uploading: number;
downloading: number;
error: number;
overCapacity: boolean;
}
export interface BlobSyncBlobState {
uploading: boolean;
downloading: boolean;
errorMessage?: string | null;
overSize: boolean;
}
export interface BlobSync {
readonly state$: Observable<BlobSyncState>;
downloadBlob(
blobId: string,
signal?: AbortSignal
): Promise<BlobRecord | null>;
uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise<void>;
fullDownload(signal?: AbortSignal): Promise<void>;
fullUpload(signal?: AbortSignal): Promise<void>;
setMaxBlobSize(size: number): void;
onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void;
blobState$(blobId: string): Observable<BlobSyncBlobState>;
downloadBlob(blobId: string): Promise<void>;
uploadBlob(blob: BlobRecord): Promise<void>;
/**
* Download all blobs from a peer
* @param peerId - The peer id to download from; if not provided, all peers are downloaded
* @param signal - The abort signal
* @returns A promise that resolves when the download is complete
*/
fullDownload(peerId?: string, signal?: AbortSignal): Promise<void>;
}
export class BlobSyncImpl implements BlobSync {
readonly state$ = new BehaviorSubject<BlobSyncState>({
isStorageOverCapacity: false,
total: Object.values(this.storages.remotes).length ? 1 : 0,
synced: 0,
});
private abort: AbortController | null = null;
private maxBlobSize: number = 1024 * 1024 * 100; // 100MB
readonly event = new EventEmitter2();
// abort all pending jobs when the sync is destroyed
private abortController = new AbortController();
private started = false;
private readonly peers: BlobSyncPeer[] = Object.entries(
this.storages.remotes
).map(
([peerId, remote]) =>
new BlobSyncPeer(peerId, this.storages.local, remote, this.blobSync)
);
constructor(readonly storages: PeerStorageOptions<BlobStorage>) {}
async downloadBlob(blobId: string, signal?: AbortSignal) {
try {
const localBlob = await this.storages.local.get(blobId, signal);
if (localBlob) {
return localBlob;
}
for (const storage of Object.values(this.storages.remotes)) {
const data = await storage.get(blobId, signal);
if (data) {
await this.storages.local.set(data, signal);
return data;
}
}
return null;
} catch (e) {
console.error('error when download blob', e);
return null;
}
}
async uploadBlob(blob: BlobRecord, signal?: AbortSignal) {
if (blob.data.length > this.maxBlobSize) {
this.event.emit('abort-large-blob', blob.data.length);
console.error('blob over limit, abort set');
}
await this.storages.local.set(blob);
await Promise.allSettled(
Object.values(this.storages.remotes).map(async remote => {
try {
return await remote.set(blob, signal);
} catch (err) {
if (err instanceof OverCapacityError) {
this.state$.next({
isStorageOverCapacity: true,
total: this.state$.value.total,
synced: this.state$.value.synced,
});
readonly state$ = combineLatest(this.peers.map(peer => peer.peerState$)).pipe(
// throttle the state to 1 second to avoid spamming the UI
throttleTime(1000),
map(allPeers =>
allPeers.length === 0
? {
uploading: 0,
downloading: 0,
error: 0,
overCapacity: false,
}
throw err;
}
: {
uploading: allPeers.reduce((acc, peer) => acc + peer.uploading, 0),
downloading: allPeers.reduce(
(acc, peer) => acc + peer.downloading,
0
),
error: allPeers.reduce((acc, peer) => acc + peer.error, 0),
overCapacity: allPeers.some(p => p.overCapacity),
}
),
share({
connector: () => new ReplaySubject(1),
})
) as Observable<BlobSyncState>;
blobState$(blobId: string) {
return combineLatest(
this.peers.map(peer => peer.blobPeerState$(blobId))
).pipe(
throttleTime(1000),
map(
peers =>
({
uploading: peers.some(p => p.uploading),
downloading: peers.some(p => p.downloading),
errorMessage: peers.find(p => p.errorMessage)?.errorMessage,
overSize: peers.some(p => p.overSize),
}) satisfies BlobSyncBlobState
),
share({
connector: () => new ReplaySubject(1),
})
);
}
async fullDownload(signal?: AbortSignal) {
throwIfAborted(signal);
constructor(
readonly storages: PeerStorageOptions<BlobStorage>,
readonly blobSync: BlobSyncStorage
) {}
await this.storages.local.connection.waitForConnected(signal);
const localList = (await this.storages.local.list(signal)).map(b => b.key);
this.state$.next({
...this.state$.value,
synced: localList.length,
});
await Promise.allSettled(
Object.entries(this.storages.remotes).map(
async ([remotePeer, remote]) => {
await remote.connection.waitForConnected(signal);
const remoteList = (await remote.list(signal)).map(b => b.key);
this.state$.next({
...this.state$.value,
total: Math.max(this.state$.value.total, remoteList.length),
});
throwIfAborted(signal);
const needDownload = difference(remoteList, localList);
for (const key of needDownload) {
try {
const data = await remote.get(key, signal);
throwIfAborted(signal);
if (data) {
await this.storages.local.set(data, signal);
this.state$.next({
...this.state$.value,
synced: this.state$.value.synced + 1,
});
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [${remotePeer}] to [local]`,
err
);
}
}
}
)
);
}
async fullUpload(signal?: AbortSignal) {
throwIfAborted(signal);
await this.storages.local.connection.waitForConnected(signal);
const localList = (await this.storages.local.list(signal)).map(b => b.key);
await Promise.allSettled(
Object.entries(this.storages.remotes).map(
async ([remotePeer, remote]) => {
await remote.connection.waitForConnected(signal);
const remoteList = (await remote.list(signal)).map(b => b.key);
throwIfAborted(signal);
const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.storages.local.get(key, signal);
throwIfAborted(signal);
if (data) {
await remote.set(data, signal);
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [local] to [${remotePeer}]`,
err
);
}
}
}
)
);
}
start() {
if (this.abort) {
this.abort.abort(MANUALLY_STOP);
}
const abort = new AbortController();
this.abort = abort;
this.fullUpload(abort.signal).catch(error => {
if (error === MANUALLY_STOP) {
downloadBlob(blobId: string) {
const signal = this.abortController.signal;
return Promise.race(
this.peers.map(p => p.downloadBlob(blobId, signal))
).catch(err => {
if (err === MANUALLY_STOP) {
return;
}
console.error('sync blob error', error);
// should never reach here, `downloadBlob()` should never throw
console.error(err);
});
}
uploadBlob(blob: BlobRecord) {
return Promise.all(
this.peers.map(p => p.uploadBlob(blob, this.abortController.signal))
).catch(err => {
if (err === MANUALLY_STOP) {
return;
}
// should never reach here, `uploadBlob()` should never throw
console.error(err);
}) as Promise<void>;
}
// start the upload loop
start() {
if (this.started) {
return;
}
this.started = true;
const signal = this.abortController.signal;
Promise.allSettled(this.peers.map(p => p.fullUploadLoop(signal))).catch(
err => {
// should never reach here
console.error(err);
}
);
}
// download all blobs from a peer, or from every peer when peerId is omitted
async fullDownload(
peerId?: string,
outerSignal?: AbortSignal
): Promise<void> {
return Promise.race([
Promise.all(
peerId
? [this.fullDownloadPeer(peerId)]
: this.peers.map(p => this.fullDownloadPeer(p.peerId))
),
new Promise<void>((_, reject) => {
// Reject the promise if the outer signal is aborted
// The outer signal only controls the API promise, not the actual download process
if (outerSignal?.aborted) {
reject(outerSignal.reason);
}
outerSignal?.addEventListener('abort', reason => {
reject(reason);
});
}),
]) as Promise<void>;
}
// cache the download promise for each peer
// this is used to avoid downloading the same peer multiple times
private readonly fullDownloadPromise = new Map<string, Promise<void>>();
private fullDownloadPeer(peerId: string) {
const peer = this.peers.find(p => p.peerId === peerId);
if (!peer) {
return;
}
const existing = this.fullDownloadPromise.get(peerId);
if (existing) {
return existing;
}
const promise = peer
.fullDownload(this.abortController.signal)
.finally(() => {
this.fullDownloadPromise.delete(peerId);
});
this.fullDownloadPromise.set(peerId, promise);
return promise;
}
stop() {
this.abort?.abort(MANUALLY_STOP);
this.abort = null;
}
addPriority(_id: string, _priority: number): () => void {
// TODO: implement
return () => {};
}
setMaxBlobSize(size: number): void {
this.maxBlobSize = size;
}
onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void {
this.event.on('abort-large-blob', cb);
return () => {
this.event.off('abort-large-blob', cb);
};
this.abortController.abort();
this.abortController = new AbortController();
this.started = false;
}
}
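fullDownload() races the shared per-peer download promise against the caller's signal: aborting detaches this caller without cancelling work that other callers may still be awaiting. The pattern in isolation (a sketch, not the committed helper):

```ts
function detachOnAbort<T>(work: Promise<T>, signal?: AbortSignal): Promise<T> {
  if (!signal) return work;
  return Promise.race([
    work,
    new Promise<never>((_, reject) => {
      if (signal.aborted) reject(signal.reason);
      // reject with the abort reason; the underlying `work` keeps running
      signal.addEventListener('abort', () => reject(signal.reason), { once: true });
    }),
  ]);
}
```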

View File

@@ -0,0 +1,463 @@
import { difference } from 'lodash-es';
import { Observable, ReplaySubject, share, Subject } from 'rxjs';
import type { BlobRecord, BlobStorage } from '../../storage';
import { OverCapacityError, OverSizeError } from '../../storage';
import type { BlobSyncStorage } from '../../storage/blob-sync';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
export interface BlobSyncPeerState {
uploading: number;
downloading: number;
error: number;
overCapacity: boolean;
}
export interface BlobSyncPeerBlobState {
uploading: boolean;
downloading: boolean;
overSize: boolean;
errorMessage?: string | null;
}
export class BlobSyncPeer {
private readonly status = new BlobSyncPeerStatus();
get peerState$() {
return this.status.peerState$;
}
blobPeerState$(blobId: string) {
return this.status.blobPeerState$(blobId);
}
constructor(
readonly peerId: string,
readonly local: BlobStorage,
readonly remote: BlobStorage,
readonly blobSync: BlobSyncStorage
) {}
private readonly downloadingPromise = new Map<string, Promise<void>>();
downloadBlob(blobId: string, signal?: AbortSignal): Promise<void> {
// if the blob is already downloading, return the existing promise
const existing = this.downloadingPromise.get(blobId);
if (existing) {
return existing;
}
const backoffRetry = {
delay: 1000,
maxDelay: 10000,
count: this.remote.isReadonly ? 1 : 5, // readonly remote storage will not retry
};
const promise = new Promise<void>((resolve, reject) => {
// mark the blob as downloading
this.status.blobDownloading(blobId);
let attempts = 0;
const attempt = async () => {
try {
throwIfAborted(signal);
const data = await this.remote.get(blobId, signal);
throwIfAborted(signal);
if (data) {
// mark the blob as uploaded to avoid uploading the same blob again
await this.blobSync.setBlobUploadedAt(
this.peerId,
blobId,
new Date()
);
await this.local.set(data, signal);
} else {
// if the blob is not found, the uploader may not have uploaded it yet, so we retry several times
attempts++;
if (attempts < backoffRetry.count) {
const waitTime = Math.min(
Math.pow(2, attempts - 1) * backoffRetry.delay,
backoffRetry.maxDelay
);
// eslint-disable-next-line @typescript-eslint/no-misused-promises
setTimeout(attempt, waitTime);
}
}
resolve();
} catch (error) {
// if we encounter any error, reject without retry
reject(error);
}
};
// eslint-disable-next-line @typescript-eslint/no-floating-promises
attempt();
})
.catch(error => {
if (error === MANUALLY_STOP) {
throw error;
}
this.status.blobError(
blobId,
error instanceof Error ? error.message : String(error)
);
})
.finally(() => {
this.status.blobDownloadFinish(blobId);
this.downloadingPromise.delete(blobId);
});
this.downloadingPromise.set(blobId, promise);
return promise;
}
uploadingPromise = new Map<string, Promise<void>>();
uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise<void> {
if (this.remote.isReadonly) {
return Promise.resolve();
}
const existing = this.uploadingPromise.get(blob.key);
if (existing) {
return existing;
}
const promise = (async () => {
// mark the blob as uploading
this.status.blobUploading(blob.key);
await this.blobSync.setBlobUploadedAt(this.peerId, blob.key, null);
try {
throwIfAborted(signal);
await this.remote.set(blob, signal);
await this.blobSync.setBlobUploadedAt(
this.peerId,
blob.key,
new Date()
);
// clear the remote storage over-capacity flag
this.status.remoteOverCapacityFree();
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
if (err instanceof OverCapacityError) {
// mark the remote storage as over capacity, this will stop the upload loop
this.status.remoteOverCapacity();
this.status.blobError(blob.key, 'Remote storage over capacity');
} else if (err instanceof OverSizeError) {
this.status.blobOverSize(blob.key);
this.status.blobError(blob.key, 'Blob size too large');
} else {
this.status.blobError(
blob.key,
err instanceof Error ? err.message : String(err)
);
}
} finally {
this.status.blobUploadFinish(blob.key);
}
})().finally(() => {
this.uploadingPromise.delete(blob.key);
});
this.uploadingPromise.set(blob.key, promise);
return promise;
}
async fullUploadLoop(signal?: AbortSignal) {
while (true) {
try {
await this.fullUpload(signal);
} catch (err) {
if (signal?.aborted) {
return;
}
// should never reach here
console.warn('Blob full upload error, retry in 15s', err);
}
// wait for 15s before next loop
await new Promise<void>(resolve => {
setTimeout(resolve, 15000);
});
if (signal?.aborted) {
return;
}
}
}
private async fullUpload(signal?: AbortSignal) {
if (this.remote.isReadonly) {
return;
}
// if the remote storage is over capacity, skip the upload loop
if (this.status.overCapacity) {
return;
}
await this.local.connection.waitForConnected(signal);
await this.remote.connection.waitForConnected(signal);
const localList = await this.local.list();
const needUpload: string[] = [];
for (const blob of localList) {
const uploadedAt = await this.blobSync.getBlobUploadedAt(
this.peerId,
blob.key
);
if (uploadedAt === null) {
needUpload.push(blob.key);
} else {
// if the blob has been uploaded, we clear its error flag here.
// this ensures that the sync status seen by the user is clean.
this.status.blobErrorFree(blob.key);
}
}
if (needUpload.length === 0) {
return;
}
// mark all blobs as will upload
for (const blobKey of needUpload) {
this.status.blobWillUpload(blobKey);
}
try {
if (needUpload.length <= 3) {
// if there are only a few blobs to upload, upload them one by one
for (const blobKey of needUpload) {
const data = await this.local.get(blobKey);
throwIfAborted(signal);
if (data) {
await this.uploadBlob(data, signal);
}
}
} else {
// if there are many blobs to upload, call remote list to reduce unnecessary uploads
const remoteList = new Set((await this.remote.list()).map(b => b.key));
for (const blobKey of needUpload) {
if (remoteList.has(blobKey)) {
// the blob already exists remotely, so record it as uploaded
await this.blobSync.setBlobUploadedAt(
this.peerId,
blobKey,
new Date()
);
// mark the blob as uploaded
this.status.blobUploadFinish(blobKey);
continue;
}
// if the blob is over size, skip it
if (this.status.overSize.has(blobKey)) {
continue;
}
const data = await this.local.get(blobKey);
throwIfAborted(signal);
if (data) {
await this.uploadBlob(data, signal);
}
}
}
} finally {
// remove all will upload flags
for (const blobKey of needUpload) {
this.status.blobWillUploadFinish(blobKey);
}
}
}
async fullDownload(signal?: AbortSignal) {
await this.local.connection.waitForConnected(signal);
await this.remote.connection.waitForConnected(signal);
const localList = (await this.local.list()).map(b => b.key);
const remoteList = (await this.remote.list()).map(b => b.key);
const needDownload = difference(remoteList, localList);
// mark all blobs as will download
for (const blobKey of needDownload) {
this.status.blobWillDownload(blobKey);
}
try {
for (const blobKey of needDownload) {
throwIfAborted(signal);
// download the blobs
await this.downloadBlob(blobKey, signal);
}
} finally {
// remove all will download flags
for (const blobKey of needDownload) {
this.status.blobWillDownloadFinish(blobKey);
}
}
}
async markBlobUploaded(blobKey: string): Promise<void> {
await this.blobSync.setBlobUploadedAt(this.peerId, blobKey, new Date());
}
}
class BlobSyncPeerStatus {
overCapacity = false;
willUpload = new Set<string>();
uploading = new Set<string>();
downloading = new Set<string>();
willDownload = new Set<string>();
error = new Map<string, string>();
overSize = new Set<string>();
peerState$ = new Observable<BlobSyncPeerState>(subscribe => {
const next = () => {
subscribe.next({
uploading: this.willUpload.union(this.uploading).size,
downloading: this.willDownload.union(this.downloading).size,
error: this.error.size,
overCapacity: this.overCapacity,
});
};
next();
const dispose = this.statusUpdatedSubject$.subscribe(() => {
next();
});
return () => {
dispose.unsubscribe();
};
}).pipe(
share({
connector: () => new ReplaySubject(1),
})
);
blobPeerState$(blobId: string) {
return new Observable<BlobSyncPeerBlobState>(subscribe => {
const next = () => {
subscribe.next({
uploading: this.willUpload.has(blobId) || this.uploading.has(blobId),
downloading:
this.willDownload.has(blobId) || this.downloading.has(blobId),
errorMessage: this.error.get(blobId) ?? null,
overSize: this.overSize.has(blobId),
});
};
next();
const dispose = this.statusUpdatedSubject$.subscribe(updatedBlobId => {
if (updatedBlobId === blobId || updatedBlobId === true) {
next();
}
});
return () => {
dispose.unsubscribe();
};
});
}
private readonly statusUpdatedSubject$ = new Subject<string | true>();
blobUploading(blobId: string) {
if (!this.uploading.has(blobId)) {
this.uploading.add(blobId);
this.statusUpdatedSubject$.next(blobId);
}
}
blobUploadFinish(blobId: string) {
let deleted = false;
deleted = this.uploading.delete(blobId) || deleted;
deleted = this.willUpload.delete(blobId) || deleted;
if (deleted) {
this.statusUpdatedSubject$.next(blobId);
}
this.blobErrorFree(blobId);
}
blobWillUpload(blobId: string) {
if (!this.willUpload.has(blobId)) {
this.willUpload.add(blobId);
this.statusUpdatedSubject$.next(blobId);
}
}
blobWillUploadFinish(blobId: string) {
const deleted = this.willUpload.delete(blobId);
if (deleted) {
this.statusUpdatedSubject$.next(blobId);
}
}
blobDownloading(blobId: string) {
if (!this.downloading.has(blobId)) {
this.downloading.add(blobId);
this.statusUpdatedSubject$.next(blobId);
}
}
blobDownloadFinish(blobId: string) {
let deleted = false;
deleted = this.willDownload.delete(blobId) || deleted;
deleted = this.downloading.delete(blobId) || deleted;
if (deleted) {
this.statusUpdatedSubject$.next(blobId);
}
this.blobErrorFree(blobId);
}
blobWillDownload(blobId: string) {
if (!this.willDownload.has(blobId)) {
this.willDownload.add(blobId);
this.statusUpdatedSubject$.next(blobId);
}
}
blobWillDownloadFinish(blobId: string) {
const deleted = this.willDownload.delete(blobId);
if (deleted) {
this.statusUpdatedSubject$.next(blobId);
}
}
blobError(blobId: string, errorMessage: string) {
this.error.set(blobId, errorMessage);
this.statusUpdatedSubject$.next(blobId);
}
remoteOverCapacity() {
if (!this.overCapacity) {
this.overCapacity = true;
this.statusUpdatedSubject$.next(true);
}
}
remoteOverCapacityFree() {
if (this.overCapacity) {
this.overCapacity = false;
this.statusUpdatedSubject$.next(true);
}
}
blobOverSize(blobId: string) {
this.overSize.add(blobId);
this.statusUpdatedSubject$.next(blobId);
}
blobErrorFree(blobId: string) {
let deleted = false;
deleted = this.error.delete(blobId) || deleted;
deleted = this.overSize.delete(blobId) || deleted;
if (deleted) {
this.statusUpdatedSubject$.next(blobId);
}
}
}
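downloadBlob() retries a missing remote blob with capped exponential backoff, since the uploader may simply not have finished yet. The schedule in isolation:

```ts
// delay before the n-th retry attempt (n starts at 1), capped at maxDelay
function backoffDelay(attempt: number, delay = 1000, maxDelay = 10000): number {
  return Math.min(Math.pow(2, attempt - 1) * delay, maxDelay);
}
// attempts 1..5 -> 1000, 2000, 4000, 8000, 10000 ms
```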

View File

@@ -557,10 +557,10 @@ export class DocSyncPeer {
};
this.statusUpdatedSubject$.next(true);
}
// wait for 1s before next retry
// wait for 5s before next retry
await Promise.race([
new Promise<void>(resolve => {
setTimeout(resolve, 1000);
setTimeout(resolve, 5000);
}),
new Promise((_, reject) => {
// exit if manually stopped

View File

@@ -24,6 +24,7 @@ export class Sync {
const doc = storages.local.get('doc');
const blob = storages.local.get('blob');
const docSync = storages.local.get('docSync');
const blobSync = storages.local.get('blobSync');
const awareness = storages.local.get('awareness');
this.doc = new DocSyncImpl(
@@ -38,15 +39,18 @@ export class Sync {
},
docSync
);
this.blob = new BlobSyncImpl({
local: blob,
remotes: Object.fromEntries(
Object.entries(storages.remotes).map(([peerId, remote]) => [
peerId,
remote.get('blob'),
])
),
});
this.blob = new BlobSyncImpl(
{
local: blob,
remotes: Object.fromEntries(
Object.entries(storages.remotes).map(([peerId, remote]) => [
peerId,
remote.get('blob'),
])
),
},
blobSync
);
this.awareness = new AwarenessSyncImpl({
local: awareness,
remotes: Object.fromEntries(

View File

@@ -176,6 +176,7 @@ class WorkerBlobStorage implements BlobStorage {
constructor(private readonly client: OpClient<WorkerOps>) {}
readonly storageType = 'blob';
readonly isReadonly = false;
get(key: string, _signal?: AbortSignal): Promise<BlobRecord | null> {
return this.client.call('blobStorage.getBlob', key);
@@ -233,47 +234,38 @@ class WorkerBlobSync implements BlobSync {
get state$() {
return this.client.ob$('blobSync.state');
}
setMaxBlobSize(size: number): void {
this.client.call('blobSync.setMaxBlobSize', size).catch(err => {
console.error('error setting max blob size', err);
});
blobState$(blobId: string) {
return this.client.ob$('blobSync.blobState', blobId);
}
onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void {
const subscription = this.client
.ob$('blobSync.onReachedMaxBlobSize')
.subscribe(byteSize => {
cb(byteSize);
});
return () => {
subscription.unsubscribe();
};
}
downloadBlob(
blobId: string,
_signal?: AbortSignal
): Promise<BlobRecord | null> {
downloadBlob(blobId: string): Promise<void> {
return this.client.call('blobSync.downloadBlob', blobId);
}
uploadBlob(blob: BlobRecord, _signal?: AbortSignal): Promise<void> {
uploadBlob(blob: BlobRecord): Promise<void> {
return this.client.call('blobSync.uploadBlob', blob);
}
fullDownload(signal?: AbortSignal): Promise<void> {
const download = this.client.call('blobSync.fullDownload');
fullDownload(peerId?: string, signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
const abortListener = () => {
reject(signal?.reason);
subscription.unsubscribe();
};
signal?.addEventListener('abort', () => {
download.cancel();
signal?.addEventListener('abort', abortListener);
const subscription = this.client
.ob$('blobSync.fullDownload', peerId ?? null)
.subscribe({
next() {
signal?.removeEventListener('abort', abortListener);
resolve();
},
error(err) {
signal?.removeEventListener('abort', abortListener);
reject(err);
},
});
});
return download;
}
fullUpload(signal?: AbortSignal): Promise<void> {
const upload = this.client.call('blobSync.fullUpload');
signal?.addEventListener('abort', () => {
upload.cancel();
});
return upload;
}
}
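Unlike BlobSyncImpl.fullDownload(), aborting here unsubscribes from the worker observable, and the consumer-side teardown (see the 'blobSync.fullDownload' handler in the next file) then aborts the real download. A generic sketch of this observable-to-promise bridge:

```ts
import type { Observable } from 'rxjs';

// Resolve on the first emission, reject on error, and propagate an abort
// by unsubscribing, which triggers the producer's teardown.
function callWithAbort<T>(ob$: Observable<T>, signal?: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const sub = ob$.subscribe({ next: resolve, error: reject });
    signal?.addEventListener(
      'abort',
      () => {
        sub.unsubscribe();
        reject(signal.reason);
      },
      { once: true }
    );
  });
}
```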

View File

@@ -170,25 +170,6 @@ class StoreConsumer {
this.blobStorage.delete(key, permanently),
'blobStorage.releaseBlobs': () => this.blobStorage.release(),
'blobStorage.listBlobs': () => this.blobStorage.list(),
'docSyncStorage.clearClocks': () => this.docSyncStorage.clearClocks(),
'docSyncStorage.getPeerPulledRemoteClock': ({ peer, docId }) =>
this.docSyncStorage.getPeerPulledRemoteClock(peer, docId),
'docSyncStorage.getPeerPulledRemoteClocks': ({ peer }) =>
this.docSyncStorage.getPeerPulledRemoteClocks(peer),
'docSyncStorage.setPeerPulledRemoteClock': ({ peer, clock }) =>
this.docSyncStorage.setPeerPulledRemoteClock(peer, clock),
'docSyncStorage.getPeerRemoteClock': ({ peer, docId }) =>
this.docSyncStorage.getPeerRemoteClock(peer, docId),
'docSyncStorage.getPeerRemoteClocks': ({ peer }) =>
this.docSyncStorage.getPeerRemoteClocks(peer),
'docSyncStorage.setPeerRemoteClock': ({ peer, clock }) =>
this.docSyncStorage.setPeerRemoteClock(peer, clock),
'docSyncStorage.getPeerPushedClock': ({ peer, docId }) =>
this.docSyncStorage.getPeerPushedClock(peer, docId),
'docSyncStorage.getPeerPushedClocks': ({ peer }) =>
this.docSyncStorage.getPeerPushedClocks(peer),
'docSyncStorage.setPeerPushedClock': ({ peer, clock }) =>
this.docSyncStorage.setPeerPushedClock(peer, clock),
'awarenessStorage.update': ({ awareness, origin }) =>
this.awarenessStorage.update(awareness, origin),
'awarenessStorage.subscribeUpdate': docId =>
@@ -232,20 +213,23 @@ class StoreConsumer {
return () => undo();
}),
'docSync.resetSync': () => this.docSync.resetSync(),
'blobSync.state': () => this.blobSync.state$,
'blobSync.blobState': blobId => this.blobSync.blobState$(blobId),
'blobSync.downloadBlob': key => this.blobSync.downloadBlob(key),
'blobSync.uploadBlob': blob => this.blobSync.uploadBlob(blob),
'blobSync.fullDownload': (_, { signal }) =>
this.blobSync.fullDownload(signal),
'blobSync.fullUpload': (_, { signal }) =>
this.blobSync.fullUpload(signal),
'blobSync.state': () => this.blobSync.state$,
'blobSync.setMaxBlobSize': size => this.blobSync.setMaxBlobSize(size),
'blobSync.onReachedMaxBlobSize': () =>
'blobSync.fullDownload': peerId =>
new Observable(subscriber => {
const undo = this.blobSync.onReachedMaxBlobSize(byteSize => {
subscriber.next(byteSize);
});
return () => undo();
const abortController = new AbortController();
this.blobSync
.fullDownload(peerId ?? undefined, abortController.signal)
.then(() => {
subscriber.next();
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
}),
'awarenessSync.update': ({ awareness, origin }) =>
this.awarenessSync.update(awareness, origin),

View File

@@ -10,7 +10,7 @@ import type {
StorageType,
} from '../storage';
import type { AwarenessRecord } from '../storage/awareness';
import type { BlobSyncState } from '../sync/blob';
import type { BlobSyncBlobState, BlobSyncState } from '../sync/blob';
import type { DocSyncDocState, DocSyncState } from '../sync/doc';
type StorageInitOptions = Values<{
@@ -45,22 +45,6 @@ interface GroupedWorkerOps {
listBlobs: [void, ListedBlobRecord[]];
};
docSyncStorage: {
getPeerPulledRemoteClocks: [{ peer: string }, DocClocks];
getPeerPulledRemoteClock: [
{ peer: string; docId: string },
DocClock | null,
];
setPeerPulledRemoteClock: [{ peer: string; clock: DocClock }, void];
getPeerRemoteClocks: [{ peer: string }, DocClocks];
getPeerRemoteClock: [{ peer: string; docId: string }, DocClock | null];
setPeerRemoteClock: [{ peer: string; clock: DocClock }, void];
getPeerPushedClocks: [{ peer: string }, DocClocks];
getPeerPushedClock: [{ peer: string; docId: string }, DocClock | null];
setPeerPushedClock: [{ peer: string; clock: DocClock }, void];
clearClocks: [void, void];
};
awarenessStorage: {
update: [{ awareness: AwarenessRecord; origin?: string }, void];
subscribeUpdate: [
@@ -85,13 +69,11 @@ interface GroupedWorkerOps {
};
blobSync: {
downloadBlob: [string, BlobRecord | null];
uploadBlob: [BlobRecord, void];
fullDownload: [void, void];
fullUpload: [void, void];
setMaxBlobSize: [number, void];
onReachedMaxBlobSize: [void, number];
state: [void, BlobSyncState];
blobState: [string, BlobSyncBlobState];
downloadBlob: [string, void];
uploadBlob: [BlobRecord, void];
fullDownload: [string | null, void];
};
awarenessSync: {

View File

@@ -6,5 +6,9 @@
"outDir": "./dist",
"tsBuildInfoFile": "./dist/tsconfig.tsbuildinfo"
},
"references": [{ "path": "../infra" }, { "path": "../../frontend/graphql" }]
"references": [
{ "path": "../infra" },
{ "path": "../error" },
{ "path": "../../frontend/graphql" }
]
}

View File

@@ -45,4 +45,6 @@ export const nbstoreHandlers: NativeDBApis = {
getPeerPushedClock: POOL.getPeerPushedClock.bind(POOL),
setPeerPushedClock: POOL.setPeerPushedClock.bind(POOL),
clearClocks: POOL.clearClocks.bind(POOL),
setBlobUploadedAt: POOL.setBlobUploadedAt.bind(POOL),
getBlobUploadedAt: POOL.getBlobUploadedAt.bind(POOL),
};

View File

@@ -34,6 +34,8 @@ public class NbStorePlugin: CAPPlugin, CAPBridgedPlugin {
CAPPluginMethod(name: "getPeerPushedClocks", returnType: CAPPluginReturnPromise),
CAPPluginMethod(name: "setPeerPushedClock", returnType: CAPPluginReturnPromise),
CAPPluginMethod(name: "clearClocks", returnType: CAPPluginReturnPromise),
CAPPluginMethod(name: "getBlobUploadedAt", returnType: CAPPluginReturnPromise),
CAPPluginMethod(name: "setBlobUploadedAt", returnType: CAPPluginReturnPromise),
]
@objc func connect(_ call: CAPPluginCall) {
@@ -490,6 +492,49 @@ public class NbStorePlugin: CAPPlugin, CAPBridgedPlugin {
}
}
@objc func getBlobUploadedAt(_ call: CAPPluginCall) {
Task {
do {
let id = try call.getStringEnsure("id")
let peer = try call.getStringEnsure("peer")
let blobId = try call.getStringEnsure("blobId")
let uploadedAt = try await docStoragePool.getBlobUploadedAt(
universalId: id,
peer: peer,
blobId: blobId
)
call.resolve([
"uploadedAt": uploadedAt as Any
])
} catch {
call.reject("Failed to get blob uploaded, \(error)", nil, error)
}
}
}
@objc func setBlobUploadedAt(_ call: CAPPluginCall) {
Task {
do {
let id = try call.getStringEnsure("id")
let peer = try call.getStringEnsure("peer")
let blobId = try call.getStringEnsure("blobId")
let uploadedAt = call.getInt("uploadedAt")
try await docStoragePool.setBlobUploadedAt(
universalId: id,
peer: peer,
blobId: blobId,
uploadedAt: uploadedAt == nil ? nil : Int64(uploadedAt!)
)
call.resolve()
} catch {
call.reject("Failed to set blob uploaded, \(error)", nil, error)
}
}
}
@objc func clearClocks(_ call: CAPPluginCall) {
Task {
do {

View File

@@ -514,6 +514,8 @@ public protocol DocStoragePoolProtocol: AnyObject {
func getBlob(universalId: String, key: String) async throws -> Blob?
func getBlobUploadedAt(universalId: String, peer: String, blobId: String) async throws -> Int64?
func getDocClock(universalId: String, docId: String) async throws -> DocClock?
func getDocClocks(universalId: String, after: Int64?) async throws -> [DocClock]
@@ -544,6 +546,8 @@ public protocol DocStoragePoolProtocol: AnyObject {
func setBlob(universalId: String, blob: SetBlob) async throws
func setBlobUploadedAt(universalId: String, peer: String, blobId: String, uploadedAt: Int64?) async throws
func setDocSnapshot(universalId: String, snapshot: DocRecord) async throws -> Bool
func setPeerPulledRemoteClock(universalId: String, peer: String, docId: String, clock: Int64) async throws
@@ -709,6 +713,23 @@ open func getBlob(universalId: String, key: String)async throws -> Blob? {
)
}
open func getBlobUploadedAt(universalId: String, peer: String, blobId: String)async throws -> Int64? {
return
try await uniffiRustCallAsync(
rustFutureFunc: {
uniffi_affine_mobile_native_fn_method_docstoragepool_get_blob_uploaded_at(
self.uniffiClonePointer(),
FfiConverterString.lower(universalId),FfiConverterString.lower(peer),FfiConverterString.lower(blobId)
)
},
pollFunc: ffi_affine_mobile_native_rust_future_poll_rust_buffer,
completeFunc: ffi_affine_mobile_native_rust_future_complete_rust_buffer,
freeFunc: ffi_affine_mobile_native_rust_future_free_rust_buffer,
liftFunc: FfiConverterOptionInt64.lift,
errorHandler: FfiConverterTypeUniffiError.lift
)
}
open func getDocClock(universalId: String, docId: String)async throws -> DocClock? {
return
try await uniffiRustCallAsync(
@@ -964,6 +985,23 @@ open func setBlob(universalId: String, blob: SetBlob)async throws {
)
}
open func setBlobUploadedAt(universalId: String, peer: String, blobId: String, uploadedAt: Int64?)async throws {
return
try await uniffiRustCallAsync(
rustFutureFunc: {
uniffi_affine_mobile_native_fn_method_docstoragepool_set_blob_uploaded_at(
self.uniffiClonePointer(),
FfiConverterString.lower(universalId),FfiConverterString.lower(peer),FfiConverterString.lower(blobId),FfiConverterOptionInt64.lower(uploadedAt)
)
},
pollFunc: ffi_affine_mobile_native_rust_future_poll_void,
completeFunc: ffi_affine_mobile_native_rust_future_complete_void,
freeFunc: ffi_affine_mobile_native_rust_future_free_void,
liftFunc: { $0 },
errorHandler: FfiConverterTypeUniffiError.lift
)
}
open func setDocSnapshot(universalId: String, snapshot: DocRecord)async throws -> Bool {
return
try await uniffiRustCallAsync(
@@ -1972,6 +2010,9 @@ private let initializationResult: InitializationResult = {
if (uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob() != 56927) {
return InitializationResult.apiChecksumMismatch
}
if (uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob_uploaded_at() != 41270) {
return InitializationResult.apiChecksumMismatch
}
if (uniffi_affine_mobile_native_checksum_method_docstoragepool_get_doc_clock() != 48394) {
return InitializationResult.apiChecksumMismatch
}
@@ -2017,6 +2058,9 @@ private let initializationResult: InitializationResult = {
if (uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob() != 31398) {
return InitializationResult.apiChecksumMismatch
}
if (uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob_uploaded_at() != 7188) {
return InitializationResult.apiChecksumMismatch
}
if (uniffi_affine_mobile_native_checksum_method_docstoragepool_set_doc_snapshot() != 5287) {
return InitializationResult.apiChecksumMismatch
}

View File

@@ -291,6 +291,11 @@ uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_disconnect(void*_N
uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_get_blob(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer key
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT
uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_get_blob_uploaded_at(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer peer, RustBuffer blob_id
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_DOC_CLOCK
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_GET_DOC_CLOCK
uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_get_doc_clock(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer doc_id
@@ -366,6 +371,11 @@ uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_release_blobs(void
uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_set_blob(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer blob
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT
uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_set_blob_uploaded_at(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer peer, RustBuffer blob_id, RustBuffer uploaded_at
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_DOC_SNAPSHOT
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_FN_METHOD_DOCSTORAGEPOOL_SET_DOC_SNAPSHOT
uint64_t uniffi_affine_mobile_native_fn_method_docstoragepool_set_doc_snapshot(void*_Nonnull ptr, RustBuffer universal_id, RustBuffer snapshot
@@ -728,6 +738,12 @@ uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_disconnect(v
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_BLOB
uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob(void
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_BLOB_UPLOADED_AT
uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_get_blob_uploaded_at(void
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_GET_DOC_CLOCK
@@ -818,6 +834,12 @@ uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_release_blob
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_BLOB
uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob(void
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT
#define UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_BLOB_UPLOADED_AT
uint16_t uniffi_affine_mobile_native_checksum_method_docstoragepool_set_blob_uploaded_at(void
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_AFFINE_MOBILE_NATIVE_CHECKSUM_METHOD_DOCSTORAGEPOOL_SET_DOC_SNAPSHOT

View File

@@ -137,5 +137,16 @@ export interface NbStorePlugin {
docId: string;
timestamp: number;
}) => Promise<void>;
getBlobUploadedAt: (options: {
id: string;
peer: string;
blobId: string;
}) => Promise<{ uploadedAt: number | null }>;
setBlobUploadedAt: (options: {
id: string;
peer: string;
blobId: string;
uploadedAt: number | null;
}) => Promise<void>;
clearClocks: (options: { id: string }) => Promise<void>;
}
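
A minimal usage sketch of the two new plugin methods, assuming a registered NbStore plugin instance (as used in the web bridge below); uploadedAt crosses the bridge as epoch milliseconds, and null means the blob has not been uploaded to that peer:

declare const NbStore: NbStorePlugin; // registered plugin instance (assumed)
declare const id: string, peer: string, blobId: string;

const { uploadedAt } = await NbStore.getBlobUploadedAt({ id, peer, blobId });
if (uploadedAt === null) {
  // not uploaded to this peer yet: upload, then record the time in millis
  await NbStore.setBlobUploadedAt({ id, peer, blobId, uploadedAt: Date.now() });
}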

View File

@@ -311,4 +311,29 @@ export const NbStoreNativeDBApis: NativeDBApis = {
id,
});
},
getBlobUploadedAt: async function (
id: string,
peer: string,
blobId: string
): Promise<Date | null> {
const result = await NbStore.getBlobUploadedAt({
id,
peer,
blobId,
});
return result.uploadedAt ? new Date(result.uploadedAt) : null;
},
setBlobUploadedAt: async function (
id: string,
peer: string,
blobId: string,
uploadedAt: Date | null
): Promise<void> {
await NbStore.setBlobUploadedAt({
id,
peer,
blobId,
uploadedAt: uploadedAt ? uploadedAt.getTime() : null,
});
},
};

View File

@@ -3,6 +3,7 @@ import './array-to-spliced';
import './dispose';
import './iterator-helpers';
import './promise-with-resolvers';
import './set-union';
import { polyfillEventLoop } from './request-idle-callback';
import { polyfillResizeObserver } from './resize-observer';

View File

@@ -0,0 +1 @@
import 'core-js/es/set/union.js';
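
Set.prototype.union only recently reached browsers, hence the core-js entry above; presumably the blob sync logic unions local and remote blob-id sets. For illustration:

const local = new Set(['blob-a', 'blob-b']);
const remote = new Set(['blob-b', 'blob-c']);
// every blob id known to either side, deduplicated
const all = local.union(remote); // Set { 'blob-a', 'blob-b', 'blob-c' }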

View File

@@ -33,8 +33,8 @@ export const OverCapacityNotification = () => {
useEffect(() => {
const disposableOverCapacity =
currentWorkspace.engine.blob.state$.subscribe(
debounce(({ isStorageOverCapacity }: BlobSyncState) => {
const isOver = isStorageOverCapacity;
debounce(({ overCapacity }: BlobSyncState) => {
const isOver = overCapacity;
if (!isOver) {
return;
}
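
Condensed, the updated subscription (the BlobSyncState field is renamed from isStorageOverCapacity to a plain overCapacity boolean; debounce delay and the notification body as in the component above):

const subscription = currentWorkspace.engine.blob.state$.subscribe(
  debounce(({ overCapacity }: BlobSyncState) => {
    if (!overCapacity) return;
    // ...notify the user that cloud blob storage is over capacity
  })
);
// later, on unmount
subscription.unsubscribe();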

View File

@@ -8,8 +8,8 @@ import type { Workspace } from '@affine/core/modules/workspace';
import { useI18n } from '@affine/i18n';
import { universalId } from '@affine/nbstore';
import track from '@affine/track';
import { LiveData, useLiveData, useService } from '@toeverything/infra';
import { useMemo, useState } from 'react';
import { useService } from '@toeverything/infra';
import { useState } from 'react';
interface ExportPanelProps {
workspace: Workspace;
@@ -22,39 +22,18 @@ export const DesktopExportPanel = ({ workspace }: ExportPanelProps) => {
const desktopApi = useService(DesktopApiService);
const isLocalWorkspace = workspace.flavour === 'local';
const docSyncState = useLiveData(
useMemo(() => {
return workspace
? LiveData.from(workspace.engine.doc.state$, null).throttleTime(500)
: null;
}, [workspace])
);
const blobSyncState = useLiveData(
useMemo(() => {
return workspace
? LiveData.from(workspace.engine.blob.state$, null).throttleTime(500)
: null;
}, [workspace])
);
const docSynced = !docSyncState?.syncing;
const blobSynced =
!blobSyncState || blobSyncState.synced === blobSyncState.total;
const [fullSyncing, setFullSyncing] = useState(false);
const [fullSynced, setFullSynced] = useState(false);
const shouldWaitForFullSync =
isLocalWorkspace || !isOnline || (fullSynced && docSynced && blobSynced);
const fullSyncing = fullSynced && (!docSynced || !blobSynced);
const shouldWaitForFullSync = !isLocalWorkspace && isOnline && !fullSynced;
const fullSync = useAsyncCallback(async () => {
// NOTE: doc full sync is always started by default
// await workspace.engine.doc.waitForSynced();
workspace.engine.blob.fullDownload().catch(() => {
/* noop */
});
setFullSyncing(true);
await workspace.engine.blob.fullDownload();
await workspace.engine.doc.waitForSynced();
setFullSynced(true);
}, [workspace.engine.blob]);
setFullSyncing(false);
}, [workspace.engine.blob, workspace.engine.doc]);
const onExport = useAsyncCallback(async () => {
if (saving) {
@@ -86,7 +65,7 @@ export const DesktopExportPanel = ({ workspace }: ExportPanelProps) => {
}
}, [desktopApi, saving, t, workspace]);
if (!shouldWaitForFullSync) {
if (shouldWaitForFullSync) {
return (
<SettingRow name={t['Export']()} desc={t['Full Sync Description']()}>
<Button

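The removed and added lines are interleaved above (the old version derived sync state from doc.state$/blob.state$ live data); consolidated, the replacement gating reads:

const [fullSyncing, setFullSyncing] = useState(false);
const [fullSynced, setFullSynced] = useState(false);
const shouldWaitForFullSync = !isLocalWorkspace && isOnline && !fullSynced;

const fullSync = useAsyncCallback(async () => {
  setFullSyncing(true);
  // download all blobs, then wait for doc sync to settle
  await workspace.engine.blob.fullDownload();
  await workspace.engine.doc.waitForSynced();
  setFullSynced(true);
  setFullSyncing(false);
}, [workspace.engine.blob, workspace.engine.doc]);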
View File

@@ -106,6 +106,7 @@ export class CMDKQuickSearchService extends Service {
primaryMode: 'page',
docProps,
});
this.workbenchService.workbench.openDoc(newDoc.id);
} else if (result.id === 'creation:create-edgeless') {
const newDoc = this.docsService.createDoc({

View File

@@ -13,6 +13,7 @@ import type {
import { CloudBlobStorage, StaticCloudDocStorage } from '@affine/nbstore/cloud';
import {
IndexedDBBlobStorage,
IndexedDBBlobSyncStorage,
IndexedDBDocStorage,
IndexedDBDocSyncStorage,
} from '@affine/nbstore/idb';
@@ -22,6 +23,7 @@ import {
} from '@affine/nbstore/idb/v1';
import {
SqliteBlobStorage,
SqliteBlobSyncStorage,
SqliteDocStorage,
SqliteDocSyncStorage,
} from '@affine/nbstore/sqlite';
@@ -115,6 +117,10 @@ class CloudWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS
? SqliteDocSyncStorage
: IndexedDBDocSyncStorage;
BlobSyncStorageType =
BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS
? SqliteBlobSyncStorage
: IndexedDBBlobSyncStorage;
async deleteWorkspace(id: string): Promise<void> {
await this.graphqlService.gql({
@@ -439,6 +445,14 @@ class CloudWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
id: workspaceId,
},
},
blobSync: {
name: this.BlobSyncStorageType.identifier,
opts: {
flavour: this.flavour,
type: 'workspace',
id: workspaceId,
},
},
awareness: {
name: 'BroadcastChannelAwarenessStorage',
opts: {

View File

@@ -7,6 +7,7 @@ import {
} from '@affine/nbstore';
import {
IndexedDBBlobStorage,
IndexedDBBlobSyncStorage,
IndexedDBDocStorage,
IndexedDBDocSyncStorage,
} from '@affine/nbstore/idb';
@@ -16,6 +17,7 @@ import {
} from '@affine/nbstore/idb/v1';
import {
SqliteBlobStorage,
SqliteBlobSyncStorage,
SqliteDocStorage,
SqliteDocSyncStorage,
} from '@affine/nbstore/sqlite';
@@ -101,6 +103,10 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS
? SqliteDocSyncStorage
: IndexedDBDocSyncStorage;
BlobSyncStorageType =
BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS
? SqliteBlobSyncStorage
: IndexedDBBlobSyncStorage;
async deleteWorkspace(id: string): Promise<void> {
setLocalWorkspaceIds(ids => ids.filter(x => x !== id));
@@ -321,6 +327,14 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
id: workspaceId,
},
},
blobSync: {
name: this.BlobSyncStorageType.identifier,
opts: {
flavour: this.flavour,
type: 'workspace',
id: workspaceId,
},
},
docSync: {
name: this.DocSyncStorageType.identifier,
opts: {

View File

@@ -63,5 +63,10 @@ export class WorkspaceEngine extends Entity<{
this.doc.addPriority(rootDoc.guid, 100);
this.doc.start();
this.disposables.push(() => this.doc.stop());
// fully migrate blobs from v1 to v2; this won't do anything if the v1 storage does not exist
store.blobFrontend.fullDownload('v1').catch(() => {
// should never reach here
});
}
}
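
Here 'v1' is the peer id of the legacy v1 blob storage: a full download from that peer copies every blob it knows about into the active storage, and is effectively a no-op once nothing remains to migrate. Assuming the frontend's fullDownload also accepts an AbortSignal, the call can be made cancellable, as a sketch:

const controller = new AbortController();
store.blobFrontend.fullDownload('v1', controller.signal).catch(() => {
  // per-blob failures are handled inside the sync loop; nothing to do here
});
// e.g. on workspace teardown
controller.abort();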

View File

@@ -599,4 +599,48 @@ impl DocStoragePool {
pub async fn clear_clocks(&self, universal_id: String) -> Result<()> {
Ok(self.inner.get(universal_id).await?.clear_clocks().await?)
}
pub async fn set_blob_uploaded_at(
&self,
universal_id: String,
peer: String,
blob_id: String,
uploaded_at: Option<i64>,
) -> Result<()> {
Ok(
self
.inner
.get(universal_id)
.await?
.set_blob_uploaded_at(
peer,
blob_id,
uploaded_at
.map(|t| {
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(t)
.ok_or(UniffiError::TimestampDecodingError)
.map(|t| t.naive_utc())
})
.transpose()?,
)
.await?,
)
}
pub async fn get_blob_uploaded_at(
&self,
universal_id: String,
peer: String,
blob_id: String,
) -> Result<Option<i64>> {
Ok(
self
.inner
.get(universal_id)
.await?
.get_blob_uploaded_at(peer, blob_id)
.await?
.map(|t| t.and_utc().timestamp_millis()),
)
}
}

View File

@@ -59,6 +59,8 @@ export declare class DocStoragePool {
getPeerPushedClock(universalId: string, peer: string, docId: string): Promise<DocClock | null>
setPeerPushedClock(universalId: string, peer: string, docId: string, clock: Date): Promise<void>
clearClocks(universalId: string): Promise<void>
setBlobUploadedAt(universalId: string, peer: string, blobId: string, uploadedAt?: Date | undefined | null): Promise<void>
getBlobUploadedAt(universalId: string, peer: string, blobId: string): Promise<Date | null>
}
export declare class Mp3Encoder {
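
From the TypeScript side the new pool methods round-trip plain Dates (the Rust binding converts to and from NaiveDateTime); a brief sketch per the declarations above, with pool, universalId, and peer assumed:

declare const pool: DocStoragePool;
declare const universalId: string, peer: string;

// mark blob1 as uploaded to this peer right now
await pool.setBlobUploadedAt(universalId, peer, 'blob1', new Date());

// Date | null comes back; null means never uploaded (or cleared)
const uploadedAt = await pool.getBlobUploadedAt(universalId, peer, 'blob1');

// passing null clears the mark again
await pool.setBlobUploadedAt(universalId, peer, 'blob1', null);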

View File

@@ -0,0 +1,92 @@
use chrono::NaiveDateTime;
use super::{error::Result, storage::SqliteDocStorage};
impl SqliteDocStorage {
pub async fn set_blob_uploaded_at(
&self,
peer: String,
blob_id: String,
uploaded_at: Option<NaiveDateTime>,
) -> Result<()> {
sqlx::query(
r#"
INSERT INTO peer_blob_sync (peer, blob_id, uploaded_at)
VALUES ($1, $2, $3)
ON CONFLICT(peer, blob_id)
DO UPDATE SET uploaded_at=$3;"#,
)
.bind(peer)
.bind(blob_id)
.bind(uploaded_at)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn get_blob_uploaded_at(
&self,
peer: String,
blob_id: String,
) -> Result<Option<NaiveDateTime>> {
let result = sqlx::query_scalar!(
"SELECT uploaded_at FROM peer_blob_sync WHERE peer = ? AND blob_id = ?",
peer,
blob_id
)
.fetch_optional(&self.pool)
.await?;
Ok(result.flatten())
}
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use super::*;
async fn get_storage() -> SqliteDocStorage {
let storage = SqliteDocStorage::new(":memory:".to_string());
storage.connect().await.unwrap();
storage
}
#[tokio::test]
async fn blob_uploaded_at() {
let storage = get_storage().await;
let peer = String::from("peer1");
let blob_id = String::from("blob1");
let uploaded_at = storage
.get_blob_uploaded_at(peer.clone(), blob_id.clone())
.await
.unwrap();
assert!(uploaded_at.is_none());
let now = Utc::now().naive_utc();
storage
.set_blob_uploaded_at(peer.clone(), blob_id.clone(), Some(now))
.await
.unwrap();
let uploaded_at = storage
.get_blob_uploaded_at(peer.clone(), blob_id.clone())
.await
.unwrap();
assert!(uploaded_at.is_some());
assert_eq!(uploaded_at.unwrap(), now);
storage
.set_blob_uploaded_at(peer.clone(), blob_id.clone(), None)
.await
.unwrap();
let uploaded_at = storage
.get_blob_uploaded_at(peer.clone(), blob_id.clone())
.await
.unwrap();
assert!(uploaded_at.is_none());
}
}

View File

@@ -1,9 +1,10 @@
pub mod blob;
pub mod blob_sync;
pub mod doc;
pub mod doc_sync;
pub mod error;
pub mod pool;
pub mod storage;
pub mod sync;
use chrono::NaiveDateTime;
use napi::bindgen_prelude::*;
@@ -402,6 +403,38 @@ impl DocStoragePool {
self.get(universal_id).await?.clear_clocks().await?;
Ok(())
}
#[napi]
pub async fn set_blob_uploaded_at(
&self,
universal_id: String,
peer: String,
blob_id: String,
uploaded_at: Option<NaiveDateTime>,
) -> Result<()> {
self
.get(universal_id)
.await?
.set_blob_uploaded_at(peer, blob_id, uploaded_at)
.await?;
Ok(())
}
#[napi]
pub async fn get_blob_uploaded_at(
&self,
universal_id: String,
peer: String,
blob_id: String,
) -> Result<Option<NaiveDateTime>> {
let result = self
.get(universal_id)
.await?
.get_blob_uploaded_at(peer, blob_id)
.await?;
Ok(result)
}
}
#[napi]

View File

@@ -57,6 +57,20 @@ CREATE TABLE "peer_clocks" (
PRIMARY KEY (peer, doc_id)
);
CREATE INDEX peer_clocks_doc_id ON peer_clocks (doc_id);
"#,
None,
),
// add blob_sync table
(
"add_blob_sync",
r#"
CREATE TABLE "peer_blob_sync" (
peer VARCHAR NOT NULL,
blob_id VARCHAR NOT NULL,
uploaded_at TIMESTAMP,
PRIMARY KEY (peer, blob_id)
);
CREATE INDEX peer_blob_sync_peer ON peer_blob_sync (peer);
"#,
None,
),
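
Each peer_blob_sync row tracks, per (peer, blob_id), when that blob was last uploaded to that peer; a NULL uploaded_at (or an absent row) marks the upload as still pending. A hypothetical TypeScript mirror of that reading:

// hypothetical row shape mirroring the table above
interface PeerBlobSyncRow {
  peer: string;
  blobId: string;
  uploadedAt: Date | null;
}

const needsUpload = (row: PeerBlobSyncRow | undefined): boolean =>
  row === undefined || row.uploadedAt === null;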