feat(nbstore): better doc sync logic (#9037)

This commit is contained in:
EYHN
2024-12-10 06:49:21 +00:00
parent 0a7a2c3083
commit 35edf389b5
10 changed files with 305 additions and 99 deletions

View File

@@ -32,7 +32,7 @@ type Job =
type: 'save';
docId: string;
update?: Uint8Array;
serverClock: Date;
remoteClock: Date;
};
interface Status {
@@ -41,8 +41,6 @@ interface Status {
jobDocQueue: AsyncPriorityQueue;
jobMap: Map<string, Job[]>;
remoteClocks: ClockMap;
pulledRemoteClocks: ClockMap;
pushedClocks: ClockMap;
syncing: boolean;
retrying: boolean;
errorMessage: string | null;
@@ -81,7 +79,7 @@ export class DocSyncPeer {
/**
* random unique id for recognize self in "update" event
*/
private readonly uniqueId = nanoid();
private readonly uniqueId = `sync:${this.local.peer}:${this.remote.peer}:${nanoid()}`;
private readonly prioritySettings = new Map<string, number>();
constructor(
@@ -97,8 +95,6 @@ export class DocSyncPeer {
jobDocQueue: new AsyncPriorityQueue(),
jobMap: new Map(),
remoteClocks: new ClockMap(new Map()),
pulledRemoteClocks: new ClockMap(new Map()),
pushedClocks: new ClockMap(new Map()),
syncing: false,
retrying: false,
errorMessage: null,
@@ -107,14 +103,23 @@ export class DocSyncPeer {
private readonly jobs = createJobErrorCatcher({
connect: async (docId: string, signal?: AbortSignal) => {
const pushedClock = this.status.pushedClocks.get(docId);
const pushedClock =
(await this.syncMetadata.getPeerPushedClock(this.remote.peer, docId))
?.timestamp ?? null;
const clock = await this.local.getDocTimestamp(docId);
throwIfAborted(signal);
if (pushedClock === null || pushedClock !== clock?.timestamp) {
await this.jobs.pullAndPush(docId, signal);
} else {
const pulled = this.status.pulledRemoteClocks.get(docId);
// no need to push
const pulled =
(
await this.syncMetadata.getPeerPulledRemoteClock(
this.remote.peer,
docId
)
)?.timestamp ?? null;
if (pulled === null || pulled !== this.status.remoteClocks.get(docId)) {
await this.jobs.pull(docId, signal);
}
@@ -133,6 +138,7 @@ export class DocSyncPeer {
(a, b) => (a.getTime() > b.clock.getTime() ? a : b.clock),
new Date(0)
);
const merged = await this.mergeUpdates(
jobs.map(j => j.update).filter(update => !isEmptyUpdate(update))
);
@@ -147,19 +153,22 @@ export class DocSyncPeer {
this.schedule({
type: 'save',
docId,
serverClock: timestamp,
remoteClock: timestamp,
});
}
throwIfAborted(signal);
await this.actions.updatePushedClock(docId, maxClock);
await this.syncMetadata.setPeerPushedClock(this.remote.peer, {
docId,
timestamp: maxClock,
});
}
},
pullAndPush: async (docId: string, signal?: AbortSignal) => {
const docRecord = await this.local.getDoc(docId);
const localDocRecord = await this.local.getDoc(docId);
const stateVector =
docRecord && !isEmptyUpdate(docRecord.bin)
? encodeStateVectorFromUpdate(docRecord.bin)
localDocRecord && !isEmptyUpdate(localDocRecord.bin)
? encodeStateVectorFromUpdate(localDocRecord.bin)
: new Uint8Array();
const remoteDocRecord = await this.remote.getDocDiff(docId, stateVector);
@@ -167,12 +176,12 @@ export class DocSyncPeer {
const {
missing: newData,
state: serverStateVector,
timestamp: serverClock,
timestamp: remoteClock,
} = remoteDocRecord;
this.schedule({
type: 'save',
docId,
serverClock,
remoteClock,
});
throwIfAborted(signal);
const { timestamp: localClock } = await this.local.pushDocUpdate(
@@ -183,14 +192,17 @@ export class DocSyncPeer {
this.uniqueId
);
throwIfAborted(signal);
await this.actions.updatePulledRemoteClock(docId, serverClock);
await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, {
docId,
timestamp: remoteClock,
});
const diff =
docRecord && serverStateVector && serverStateVector.length > 0
? diffUpdate(docRecord.bin, serverStateVector)
: docRecord?.bin;
localDocRecord && serverStateVector && serverStateVector.length > 0
? diffUpdate(localDocRecord.bin, serverStateVector)
: localDocRecord?.bin;
if (diff && !isEmptyUpdate(diff)) {
throwIfAborted(signal);
const { timestamp: serverClock } = await this.remote.pushDocUpdate(
const { timestamp: remoteClock } = await this.remote.pushDocUpdate(
{
bin: diff,
docId,
@@ -200,18 +212,21 @@ export class DocSyncPeer {
this.schedule({
type: 'save',
docId,
serverClock,
remoteClock,
});
}
throwIfAborted(signal);
await this.actions.updatePushedClock(docId, localClock);
await this.syncMetadata.setPeerPushedClock(this.remote.peer, {
docId,
timestamp: localClock,
});
} else {
if (docRecord) {
if (!isEmptyUpdate(docRecord.bin)) {
if (localDocRecord) {
if (!isEmptyUpdate(localDocRecord.bin)) {
throwIfAborted(signal);
const { timestamp: serverClock } = await this.remote.pushDocUpdate(
const { timestamp: remoteClock } = await this.remote.pushDocUpdate(
{
bin: docRecord.bin,
bin: localDocRecord.bin,
docId,
},
this.uniqueId
@@ -219,10 +234,13 @@ export class DocSyncPeer {
this.schedule({
type: 'save',
docId,
serverClock,
remoteClock,
});
}
await this.actions.updatePushedClock(docId, docRecord.timestamp);
await this.syncMetadata.setPeerPushedClock(this.remote.peer, {
docId,
timestamp: localDocRecord.timestamp,
});
}
}
},
@@ -237,7 +255,7 @@ export class DocSyncPeer {
if (!serverDoc) {
return;
}
const { missing: newData, timestamp: serverClock } = serverDoc;
const { missing: newData, timestamp: remoteClock } = serverDoc;
throwIfAborted(signal);
await this.local.pushDocUpdate(
{
@@ -247,11 +265,14 @@ export class DocSyncPeer {
this.uniqueId
);
throwIfAborted(signal);
await this.actions.updatePulledRemoteClock(docId, serverClock);
await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, {
docId,
timestamp: remoteClock,
});
this.schedule({
type: 'save',
docId,
serverClock,
remoteClock: remoteClock,
});
},
save: async (
@@ -259,8 +280,8 @@ export class DocSyncPeer {
jobs: (Job & { type: 'save' })[],
signal?: AbortSignal
) => {
const serverClock = jobs.reduce(
(a, b) => (a.getTime() > b.serverClock.getTime() ? a : b.serverClock),
const remoteClock = jobs.reduce(
(a, b) => (a.getTime() > b.remoteClock.getTime() ? a : b.remoteClock),
new Date(0)
);
if (this.status.connectedDocs.has(docId)) {
@@ -282,7 +303,10 @@ export class DocSyncPeer {
);
throwIfAborted(signal);
await this.actions.updatePulledRemoteClock(docId, serverClock);
await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, {
docId,
timestamp: remoteClock,
});
}
},
});
@@ -298,29 +322,6 @@ export class DocSyncPeer {
this.statusUpdatedSubject$.next(docId);
}
},
updatePushedClock: async (docId: string, pushedClock: Date) => {
const updated = this.status.pushedClocks.setIfBigger(docId, pushedClock);
if (updated) {
await this.syncMetadata.setPeerPushedClock(this.remote.peer, {
docId,
timestamp: pushedClock,
});
this.statusUpdatedSubject$.next(docId);
}
},
updatePulledRemoteClock: async (docId: string, pulledClock: Date) => {
const updated = this.status.pulledRemoteClocks.setIfBigger(
docId,
pulledClock
);
if (updated) {
await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, {
docId,
timestamp: pulledClock,
});
this.statusUpdatedSubject$.next(docId);
}
},
addDoc: (docId: string) => {
if (!this.status.docs.has(docId)) {
this.status.docs.add(docId);
@@ -370,7 +371,7 @@ export class DocSyncPeer {
this.schedule({
type: 'save',
docId,
serverClock: remoteClock,
remoteClock: remoteClock,
update,
});
},
@@ -396,8 +397,6 @@ export class DocSyncPeer {
connectedDocs: new Set(),
jobDocQueue: new AsyncPriorityQueue(),
jobMap: new Map(),
pulledRemoteClocks: new ClockMap(new Map()),
pushedClocks: new ClockMap(new Map()),
remoteClocks: new ClockMap(new Map()),
syncing: false,
// tell ui to show retrying status
@@ -478,7 +477,13 @@ export class DocSyncPeer {
// subscribe local doc updates
disposes.push(
this.local.subscribeDocUpdate((update, origin) => {
if (origin === this.uniqueId) {
if (
origin === this.uniqueId ||
origin?.startsWith(
`sync:${this.local.peer}:${this.remote.peer}:`
// skip if local and remote is same
)
) {
return;
}
this.events.localUpdated({
@@ -517,19 +522,6 @@ export class DocSyncPeer {
for (const [id, v] of Object.entries(cachedClocks)) {
this.status.remoteClocks.set(id, v);
}
const pulledClocks = await this.syncMetadata.getPeerPulledRemoteClocks(
this.remote.peer
);
for (const [id, v] of Object.entries(pulledClocks)) {
this.status.pulledRemoteClocks.set(id, v);
}
const pushedClocks = await this.syncMetadata.getPeerPushedClocks(
this.remote.peer
);
throwIfAborted(signal);
for (const [id, v] of Object.entries(pushedClocks)) {
this.status.pushedClocks.set(id, v);
}
this.statusUpdatedSubject$.next(true);
// get new clocks from server