perf(core): only full sync before exporting (#10408)

This commit is contained in:
liuyi
2025-02-25 12:41:56 +08:00
committed by GitHub
parent c644a46b8d
commit e5e5c0a8ba
8 changed files with 222 additions and 120 deletions

View File

@@ -15,8 +15,12 @@ export class BlobFrontend {
return this.sync.uploadBlob(blob);
}
fullSync() {
return this.sync.fullSync();
fullDownload() {
return this.sync.fullDownload();
}
fullUpload() {
return this.sync.fullUpload();
}
addPriority(_id: string, _priority: number) {

View File

@@ -9,6 +9,8 @@ import type { PeerStorageOptions } from '../types';
export interface BlobSyncState {
isStorageOverCapacity: boolean;
total: number;
synced: number;
}
export interface BlobSync {
@@ -18,7 +20,8 @@ export interface BlobSync {
signal?: AbortSignal
): Promise<BlobRecord | null>;
uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise<void>;
fullSync(signal?: AbortSignal): Promise<void>;
fullDownload(signal?: AbortSignal): Promise<void>;
fullUpload(signal?: AbortSignal): Promise<void>;
setMaxBlobSize(size: number): void;
onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void;
}
@@ -26,6 +29,8 @@ export interface BlobSync {
export class BlobSyncImpl implements BlobSync {
readonly state$ = new BehaviorSubject<BlobSyncState>({
isStorageOverCapacity: false,
total: Object.values(this.storages.remotes).length ? 1 : 0,
synced: 0,
});
private abort: AbortController | null = null;
private maxBlobSize: number = 1024 * 1024 * 100; // 100MB
@@ -34,19 +39,24 @@ export class BlobSyncImpl implements BlobSync {
constructor(readonly storages: PeerStorageOptions<BlobStorage>) {}
async downloadBlob(blobId: string, signal?: AbortSignal) {
const localBlob = await this.storages.local.get(blobId, signal);
if (localBlob) {
return localBlob;
}
for (const storage of Object.values(this.storages.remotes)) {
const data = await storage.get(blobId, signal);
if (data) {
await this.storages.local.set(data, signal);
return data;
try {
const localBlob = await this.storages.local.get(blobId, signal);
if (localBlob) {
return localBlob;
}
for (const storage of Object.values(this.storages.remotes)) {
const data = await storage.get(blobId, signal);
if (data) {
await this.storages.local.set(data, signal);
return data;
}
}
return null;
} catch (e) {
console.error('error when download blob', e);
return null;
}
return null;
}
async uploadBlob(blob: BlobRecord, signal?: AbortSignal) {
@@ -62,7 +72,11 @@ export class BlobSyncImpl implements BlobSync {
return await remote.set(blob, signal);
} catch (err) {
if (err instanceof OverCapacityError) {
this.state$.next({ isStorageOverCapacity: true });
this.state$.next({
isStorageOverCapacity: true,
total: this.state$.value.total,
synced: this.state$.value.synced,
});
}
throw err;
}
@@ -70,71 +84,95 @@ export class BlobSyncImpl implements BlobSync {
);
}
async fullSync(signal?: AbortSignal) {
async fullDownload(signal?: AbortSignal) {
throwIfAborted(signal);
await this.storages.local.connection.waitForConnected(signal);
const localList = (await this.storages.local.list(signal)).map(b => b.key);
this.state$.next({
...this.state$.value,
synced: localList.length,
});
for (const [remotePeer, remote] of Object.entries(this.storages.remotes)) {
let localList: string[] = [];
let remoteList: string[] = [];
await Promise.allSettled(
Object.entries(this.storages.remotes).map(
async ([remotePeer, remote]) => {
await remote.connection.waitForConnected(signal);
await remote.connection.waitForConnected(signal);
const remoteList = (await remote.list(signal)).map(b => b.key);
try {
localList = (await this.storages.local.list(signal)).map(b => b.key);
throwIfAborted(signal);
remoteList = (await remote.list(signal)).map(b => b.key);
throwIfAborted(signal);
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(`error when sync`, err);
continue;
}
this.state$.next({
...this.state$.value,
total: Math.max(this.state$.value.total, remoteList.length),
});
const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.storages.local.get(key, signal);
throwIfAborted(signal);
if (data) {
await remote.set(data, signal);
throwIfAborted(signal);
const needDownload = difference(remoteList, localList);
for (const key of needDownload) {
try {
const data = await remote.get(key, signal);
throwIfAborted(signal);
if (data) {
await this.storages.local.set(data, signal);
this.state$.next({
...this.state$.value,
synced: this.state$.value.synced + 1,
});
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [${remotePeer}] to [local]`,
err
);
}
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [local] to [${remotePeer}]`,
err
);
}
}
)
);
}
const needDownload = difference(remoteList, localList);
async fullUpload(signal?: AbortSignal) {
throwIfAborted(signal);
await this.storages.local.connection.waitForConnected(signal);
const localList = (await this.storages.local.list(signal)).map(b => b.key);
await Promise.allSettled(
Object.entries(this.storages.remotes).map(
async ([remotePeer, remote]) => {
await remote.connection.waitForConnected(signal);
const remoteList = (await remote.list(signal)).map(b => b.key);
for (const key of needDownload) {
try {
const data = await remote.get(key, signal);
throwIfAborted(signal);
if (data) {
await this.storages.local.set(data, signal);
throwIfAborted(signal);
const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.storages.local.get(key, signal);
throwIfAborted(signal);
if (data) {
await remote.set(data, signal);
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [local] to [${remotePeer}]`,
err
);
}
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [${remotePeer}] to [local]`,
err
);
}
}
}
)
);
}
start() {
@@ -144,16 +182,12 @@ export class BlobSyncImpl implements BlobSync {
const abort = new AbortController();
this.abort = abort;
// TODO(@eyhn): fix this, large blob may cause iOS to crash?
if (!BUILD_CONFIG.isIOS) {
this.fullSync(abort.signal).catch(error => {
if (error === MANUALLY_STOP) {
return;
}
console.error('sync blob error', error);
});
}
this.fullUpload(abort.signal).catch(error => {
if (error === MANUALLY_STOP) {
return;
}
console.error('sync blob error', error);
});
}
stop() {

View File

@@ -257,26 +257,23 @@ class WorkerBlobSync implements BlobSync {
uploadBlob(blob: BlobRecord, _signal?: AbortSignal): Promise<void> {
return this.client.call('blobSync.uploadBlob', blob);
}
fullSync(signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
const abortListener = () => {
reject(signal?.reason);
subscription.unsubscribe();
};
fullDownload(signal?: AbortSignal): Promise<void> {
const download = this.client.call('blobSync.fullDownload');
signal?.addEventListener('abort', abortListener);
const subscription = this.client.ob$('blobSync.fullSync').subscribe({
next() {
signal?.removeEventListener('abort', abortListener);
resolve();
},
error(err) {
signal?.removeEventListener('abort', abortListener);
reject(err);
},
});
signal?.addEventListener('abort', () => {
download.cancel();
});
return download;
}
fullUpload(signal?: AbortSignal): Promise<void> {
const upload = this.client.call('blobSync.fullUpload');
signal?.addEventListener('abort', () => {
upload.cancel();
});
return upload;
}
}

View File

@@ -234,20 +234,10 @@ class StoreConsumer {
'docSync.resetSync': () => this.docSync.resetSync(),
'blobSync.downloadBlob': key => this.blobSync.downloadBlob(key),
'blobSync.uploadBlob': blob => this.blobSync.uploadBlob(blob),
'blobSync.fullSync': () =>
new Observable(subscriber => {
const abortController = new AbortController();
this.blobSync
.fullSync(abortController.signal)
.then(() => {
subscriber.next(true);
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
}),
'blobSync.fullDownload': (_, { signal }) =>
this.blobSync.fullDownload(signal),
'blobSync.fullUpload': (_, { signal }) =>
this.blobSync.fullUpload(signal),
'blobSync.state': () => this.blobSync.state$,
'blobSync.setMaxBlobSize': size => this.blobSync.setMaxBlobSize(size),
'blobSync.onReachedMaxBlobSize': () =>

View File

@@ -87,7 +87,8 @@ interface GroupedWorkerOps {
blobSync: {
downloadBlob: [string, BlobRecord | null];
uploadBlob: [BlobRecord, void];
fullSync: [void, boolean];
fullDownload: [void, void];
fullUpload: [void, void];
setMaxBlobSize: [number, void];
onReachedMaxBlobSize: [void, number];
state: [void, BlobSyncState];