mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-12 04:18:54 +00:00
fix(infra): avoid data loss (#6111)
This commit is contained in:
@@ -13,6 +13,9 @@ export function fixWorkspaceVersion(rootDoc: YDoc) {
|
||||
* Blocksuite just set the value, do nothing else.
|
||||
*/
|
||||
function doFix() {
|
||||
if (meta.size === 0) {
|
||||
return;
|
||||
}
|
||||
const workspaceVersion = meta.get('workspaceVersion');
|
||||
if (typeof workspaceVersion !== 'number' || workspaceVersion < 2) {
|
||||
transact(
|
||||
|
||||
@@ -68,15 +68,17 @@ export class SyncEngine {
|
||||
this.onStatusChange.emit(s);
|
||||
}
|
||||
isRootDocLoaded = LiveData.from(
|
||||
new Observable(observer => {
|
||||
new Observable<boolean>(observer => {
|
||||
observer.next(
|
||||
this.status.local
|
||||
? this.status.local.step > SyncPeerStep.LoadingRootDoc
|
||||
: false
|
||||
[this.status?.local, ...(this.status?.remotes ?? [])].some(
|
||||
p => p?.rootDocLoaded === true
|
||||
)
|
||||
);
|
||||
this.onStatusChange.on(status => {
|
||||
observer.next(
|
||||
status.local ? status.local.step > SyncPeerStep.LoadingRootDoc : false
|
||||
[status?.local, ...(status?.remotes ?? [])].some(
|
||||
p => p?.rootDocLoaded === true
|
||||
)
|
||||
);
|
||||
});
|
||||
}),
|
||||
|
||||
@@ -20,6 +20,7 @@ export interface SyncPeerStatus {
|
||||
pendingPullUpdates: number;
|
||||
pendingPushUpdates: number;
|
||||
lastError: string | null;
|
||||
rootDocLoaded: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -56,6 +57,7 @@ export class SyncPeer {
|
||||
pendingPullUpdates: 0,
|
||||
pendingPushUpdates: 0,
|
||||
lastError: null,
|
||||
rootDocLoaded: false,
|
||||
};
|
||||
onStatusChange = new Slot<SyncPeerStatus>();
|
||||
readonly abort = new AbortController();
|
||||
@@ -122,6 +124,7 @@ export class SyncPeer {
|
||||
pendingPullUpdates: 0,
|
||||
pendingPushUpdates: 0,
|
||||
lastError: 'Retrying sync after 5 seconds',
|
||||
rootDocLoaded: this.status.rootDocLoaded,
|
||||
};
|
||||
await Promise.race([
|
||||
new Promise<void>(resolve => {
|
||||
@@ -295,6 +298,13 @@ export class SyncPeer {
|
||||
(await this.storage.pull(doc.guid, encodeStateVector(doc))) ?? {};
|
||||
throwIfAborted(abort);
|
||||
|
||||
if (docData !== undefined && doc.guid === this.rootDoc.guid) {
|
||||
this.status = {
|
||||
...this.status,
|
||||
rootDocLoaded: true,
|
||||
};
|
||||
}
|
||||
|
||||
if (docData) {
|
||||
applyUpdate(doc, docData, 'load');
|
||||
}
|
||||
@@ -400,6 +410,7 @@ export class SyncPeer {
|
||||
pendingPushUpdates:
|
||||
this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0),
|
||||
lastError,
|
||||
rootDocLoaded: this.status.rootDocLoaded,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@ import { base64ToUint8Array, uint8ArrayToBase64 } from '../utils/base64';
|
||||
|
||||
const logger = new DebugLogger('affine:storage:socketio');
|
||||
|
||||
(window as any)._TEST_SIMULATE_SYNC_LAG = Promise.resolve();
|
||||
|
||||
export class AffineSyncStorage implements SyncStorage {
|
||||
name = 'affine-cloud';
|
||||
|
||||
@@ -57,6 +59,9 @@ export class AffineSyncStorage implements SyncStorage {
|
||||
docId: string,
|
||||
state: Uint8Array
|
||||
): Promise<{ data: Uint8Array; state?: Uint8Array } | null> {
|
||||
// for testing
|
||||
await (window as any)._TEST_SIMULATE_SYNC_LAG;
|
||||
|
||||
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
|
||||
|
||||
logger.debug('doc-load-v2', {
|
||||
|
||||
@@ -20,6 +20,13 @@ export class SQLiteSyncStorage implements SyncStorage {
|
||||
);
|
||||
|
||||
if (update) {
|
||||
if (
|
||||
update.byteLength === 0 ||
|
||||
(update.byteLength === 2 && update[0] === 0 && update[1] === 0)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
data: update,
|
||||
state: encodeStateVectorFromUpdate(update),
|
||||
|
||||
Reference in New Issue
Block a user