fix(nbstore): adjust doc sync logic (#10342)

This commit is contained in:
EYHN
2025-02-21 08:17:07 +00:00
parent f3218ab3bc
commit 244d683d83
2 changed files with 47 additions and 27 deletions

View File

@@ -399,23 +399,31 @@ export class DocFrontend {
this.statusUpdatedSubject$.next(job.docId);
}
/**
* skip listen doc update when apply update
*/
private skipDocUpdate = false;
applyUpdate(docId: string, update: Uint8Array) {
const doc = this.status.docs.get(docId);
if (doc && !isEmptyUpdate(update)) {
try {
this.skipDocUpdate = true;
applyUpdate(doc, update, NBSTORE_ORIGIN);
} catch (err) {
console.error('failed to apply update yjs doc', err);
} finally {
this.skipDocUpdate = false;
}
}
}
private readonly handleDocUpdate = (
update: Uint8Array,
origin: any,
_origin: any,
doc: YDoc
) => {
if (origin === NBSTORE_ORIGIN) {
if (this.skipDocUpdate) {
return;
}
if (!this.status.docs.has(doc.guid)) {

View File

@@ -17,7 +17,7 @@ type Job =
| {
type: 'push';
docId: string;
update: Uint8Array;
update?: Uint8Array;
clock: Date;
}
| {
@@ -278,7 +278,9 @@ export class DocSyncPeer {
);
const merged = await this.mergeUpdates(
jobs.map(j => j.update).filter(update => !isEmptyUpdate(update))
jobs
.map(j => j.update ?? new Uint8Array())
.filter(update => !isEmptyUpdate(update))
);
if (!isEmptyUpdate(merged)) {
const { timestamp } = await this.remote.pushDocUpdate(
@@ -316,11 +318,6 @@ export class DocSyncPeer {
state: serverStateVector,
timestamp: remoteClock,
} = remoteDocRecord;
this.schedule({
type: 'save',
docId,
remoteClock,
});
throwIfAborted(signal);
const { timestamp: localClock } = await this.local.pushDocUpdate(
{
@@ -359,9 +356,10 @@ export class DocSyncPeer {
});
}
throwIfAborted(signal);
await this.syncMetadata.setPeerPushedClock(this.peerId, {
this.schedule({
type: 'push',
docId,
timestamp: localClock,
clock: localClock,
});
} else {
if (localDocRecord) {
@@ -380,6 +378,11 @@ export class DocSyncPeer {
remoteClock,
});
}
this.schedule({
type: 'push',
docId,
clock: localDocRecord.timestamp,
});
await this.syncMetadata.setPeerPushedClock(this.peerId, {
docId,
timestamp: localDocRecord.timestamp,
@@ -400,7 +403,7 @@ export class DocSyncPeer {
}
const { missing: newData, timestamp: remoteClock } = serverDoc;
throwIfAborted(signal);
await this.local.pushDocUpdate(
const { timestamp } = await this.local.pushDocUpdate(
{
docId,
bin: newData,
@@ -413,9 +416,9 @@ export class DocSyncPeer {
timestamp: remoteClock,
});
this.schedule({
type: 'save',
type: 'push',
docId,
remoteClock: remoteClock,
clock: timestamp,
});
},
save: async (
@@ -438,13 +441,20 @@ export class DocSyncPeer {
throwIfAborted(signal);
if (!isEmptyUpdate(update)) {
await this.local.pushDocUpdate(
const { timestamp } = await this.local.pushDocUpdate(
{
docId,
bin: update,
},
this.uniqueId
);
// schedule push job to mark the timestamp as pushed timestamp
this.schedule({
type: 'push',
docId,
clock: timestamp,
});
}
throwIfAborted(signal);
@@ -457,15 +467,9 @@ export class DocSyncPeer {
});
private readonly actions = {
updateRemoteClock: async (docId: string, remoteClock: Date) => {
const updated = this.status.remoteClocks.setIfBigger(docId, remoteClock);
if (updated) {
await this.syncMetadata.setPeerRemoteClock(this.peerId, {
docId,
timestamp: remoteClock,
});
this.statusUpdatedSubject$.next(docId);
}
updateRemoteClock: (docId: string, remoteClock: Date) => {
this.status.remoteClocks.setIfBigger(docId, remoteClock);
this.statusUpdatedSubject$.next(docId);
},
addDoc: (docId: string) => {
if (!this.status.docs.has(docId)) {
@@ -511,6 +515,7 @@ export class DocSyncPeer {
}) => {
// try add doc for new doc
this.actions.addDoc(docId);
this.actions.updateRemoteClock(docId, remoteClock);
// schedule push job
this.schedule({
@@ -684,7 +689,14 @@ export class DocSyncPeer {
const maxClockValue = this.status.remoteClocks.max;
const newClocks = await this.remote.getDocTimestamps(maxClockValue);
for (const [id, v] of Object.entries(newClocks)) {
await this.actions.updateRemoteClock(id, v);
this.actions.updateRemoteClock(id, v);
}
for (const [id, v] of Object.entries(newClocks)) {
await this.syncMetadata.setPeerRemoteClock(this.peerId, {
docId: id,
timestamp: v,
});
}
// add all docs from remote
@@ -778,9 +790,9 @@ export class DocSyncPeer {
};
}
protected mergeUpdates(updates: Uint8Array[]) {
protected mergeUpdates = (updates: Uint8Array[]) => {
const merge = this.options?.mergeUpdates ?? mergeUpdates;
return merge(updates.filter(bin => !isEmptyUpdate(bin)));
}
};
}