feat(nbstore): add doc sync frontend (#9070)

EYHN
2024-12-11 07:53:25 +00:00
parent eee0ed45ee
commit 331e674e8b
8 changed files with 499 additions and 58 deletions


@@ -0,0 +1,318 @@
import { groupBy } from 'lodash-es';
import { nanoid } from 'nanoid';
import { Subject } from 'rxjs';
import {
applyUpdate,
type Doc as YDoc,
encodeStateAsUpdate,
mergeUpdates,
} from 'yjs';
import type { DocRecord, DocStorage } from '../storage';
import type { DocSyncEngine } from '../sync/doc';
import { AsyncPriorityQueue } from '../utils/async-priority-queue';
import { isEmptyUpdate } from '../utils/is-empty-update';
import { throwIfAborted } from '../utils/throw-if-aborted';

const NBSTORE_ORIGIN = 'nbstore-frontend';

type Job =
| {
type: 'load';
docId: string;
}
| {
type: 'save';
docId: string;
update: Uint8Array;
}
| {
type: 'apply';
docId: string;
update: Uint8Array;
};

interface DocFrontendOptions {
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}

export class DocFrontend {
private readonly uniqueId = `frontend:${this.storage.peer}:${nanoid()}`;
private readonly prioritySettings = new Map<string, number>();
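// In-memory sync state: attached YDocs, docs that finished their initial load from
// storage, docs holding non-empty data, and the per-doc job queue/map the main loop drains.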
private readonly status = {
docs: new Map<string, YDoc>(),
connectedDocs: new Set<string>(),
readyDocs: new Set<string>(),
jobDocQueue: new AsyncPriorityQueue(),
jobMap: new Map<string, Job[]>(),
currentJob: null as { docId: string; jobs: Job[] } | null,
};
private readonly statusUpdatedSubject$ = new Subject<string>();
private readonly abort = new AbortController();
constructor(
private readonly storage: DocStorage,
private readonly sync: DocSyncEngine | null,
readonly options: DocFrontendOptions = {}
) {}
start() {
if (this.abort.signal.aborted) {
throw new Error('doc frontend can only start once');
}
this.mainLoop(this.abort.signal).catch(error => {
console.error(error);
});
}
stop() {
this.abort.abort();
}
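// Single consumer loop: subscribe to storage updates, wait for the storage connection,
// then repeatedly pop the highest-priority doc and run its pending jobs.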
private async mainLoop(signal?: AbortSignal) {
const dispose = this.storage.subscribeDocUpdate((record, origin) => {
this.event.onStorageUpdate(record, origin);
});
try {
// wait for storage to connect
await Promise.race([
this.storage.connection.waitForConnected(signal),
new Promise((_, reject) => {
signal?.addEventListener('abort', () => {
reject(signal.reason);
});
}),
]);
// eslint-disable-next-line no-constant-condition
while (true) {
throwIfAborted(signal);
const docId = await this.status.jobDocQueue.asyncPop(signal);
const jobs = this.status.jobMap.get(docId);
this.status.jobMap.delete(docId);
if (!jobs) {
this.statusUpdatedSubject$.next(docId);
continue;
}
this.status.currentJob = { docId, jobs };
this.statusUpdatedSubject$.next(docId);
const { apply, load, save } = groupBy(jobs, job => job.type) as {
[key in Job['type']]?: Job[];
};
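// Coalesce this doc's pending jobs: a single 'load' is enough, 'apply' jobs are
// replayed in order, and all 'save' jobs are merged into one storage write.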
if (load?.length) {
await this.jobs.load(load[0] as any, signal);
}
for (const applyJob of apply ?? []) {
await this.jobs.apply(applyJob as any, signal);
}
if (save?.length) {
await this.jobs.save(docId, save as any, signal);
}
}
} finally {
dispose();
}
}
/**
* Add a doc to the frontend; the doc will be kept in sync with the doc storage.
* @param doc - The doc to add
* @param withSubDoc - Whether to also add the doc's subdocs
*/
addDoc(doc: YDoc, withSubDoc: boolean = false) {
this._addDoc(doc);
if (withSubDoc) {
doc.on('subdocs', ({ loaded }) => {
for (const subdoc of loaded) {
this._addDoc(subdoc);
}
});
}
}
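// Illustrative usage (not part of this diff): frontend.addDoc(new YDoc({ guid: 'page:home' }), true)
// schedules a 'load' job for 'page:home' and attaches each of its subdocs once loaded.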
readonly jobs = {
load: async (job: Job & { type: 'load' }, signal?: AbortSignal) => {
const doc = this.status.docs.get(job.docId);
if (!doc) {
return;
}
const existingData = encodeStateAsUpdate(doc);
if (!isEmptyUpdate(existingData)) {
this.schedule({
type: 'save',
docId: doc.guid,
update: existingData,
});
}
// mark doc as loaded
doc.emit('sync', [true, doc]);
this.status.connectedDocs.add(job.docId);
this.statusUpdatedSubject$.next(job.docId);
const docRecord = await this.storage.getDoc(job.docId);
throwIfAborted(signal);
if (!docRecord || isEmptyUpdate(docRecord.bin)) {
return;
}
this.applyUpdate(job.docId, docRecord.bin);
this.status.readyDocs.add(job.docId);
this.statusUpdatedSubject$.next(job.docId);
},
save: async (
docId: string,
jobs: (Job & { type: 'save' })[],
signal?: AbortSignal
) => {
if (!this.status.docs.has(docId)) {
return;
}
if (this.status.connectedDocs.has(docId)) {
const merged = await this.mergeUpdates(
jobs.map(j => j.update).filter(update => !isEmptyUpdate(update))
);
throwIfAborted(signal);
await this.storage.pushDocUpdate(
{
docId,
bin: merged,
},
this.uniqueId
);
}
},
apply: async (job: Job & { type: 'apply' }, signal?: AbortSignal) => {
throwIfAborted(signal);
if (!this.status.docs.has(job.docId)) {
return;
}
if (this.status.connectedDocs.has(job.docId)) {
this.applyUpdate(job.docId, job.update);
}
if (!isEmptyUpdate(job.update)) {
this.status.readyDocs.add(job.docId);
this.statusUpdatedSubject$.next(job.docId);
}
},
};
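// Updates pushed into storage by anything other than this frontend (origin !== uniqueId)
// are scheduled as 'apply' jobs so they reach the in-memory YDoc.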
event = {
onStorageUpdate: (update: DocRecord, origin?: string) => {
if (origin !== this.uniqueId) {
this.schedule({
type: 'apply',
docId: update.docId,
update: update.bin,
});
}
},
};
/**
* Remove a doc from the frontend; the doc will stop syncing with the doc storage.
* It's not recommended to call this method directly; prefer `doc.destroy()`.
*
* @param doc - The doc to remove
*/
removeDoc(doc: YDoc) {
this.status.docs.delete(doc.guid);
this.status.connectedDocs.delete(doc.guid);
this.status.readyDocs.delete(doc.guid);
this.status.jobDocQueue.remove(doc.guid);
this.status.jobMap.delete(doc.guid);
this.statusUpdatedSubject$.next(doc.guid);
doc.off('update', this.handleDocUpdate);
}
addPriority(id: string, priority: number) {
const undoSyncPriority = this.sync?.addPriority(id, priority);
const oldPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, priority);
this.status.jobDocQueue.setPriority(id, oldPriority + priority);
return () => {
const currentPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, currentPriority - priority);
this.status.jobDocQueue.setPriority(id, currentPriority - priority);
undoSyncPriority?.();
};
}
private _addDoc(doc: YDoc) {
this.schedule({
type: 'load',
docId: doc.guid,
});
this.status.docs.set(doc.guid, doc);
this.statusUpdatedSubject$.next(doc.guid);
doc.on('update', this.handleDocUpdate);
doc.on('destroy', () => {
this.removeDoc(doc);
});
}
private schedule(job: Job) {
const priority = this.prioritySettings.get(job.docId) ?? 0;
this.status.jobDocQueue.push(job.docId, priority);
const existingJobs = this.status.jobMap.get(job.docId) ?? [];
existingJobs.push(job);
this.status.jobMap.set(job.docId, existingJobs);
this.statusUpdatedSubject$.next(job.docId);
}
applyUpdate(docId: string, update: Uint8Array) {
const doc = this.status.docs.get(docId);
if (doc && !isEmptyUpdate(update)) {
try {
applyUpdate(doc, update, NBSTORE_ORIGIN);
} catch (err) {
console.error('failed to apply update to yjs doc', err);
}
}
}
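// Local YDoc edits: ignore updates this frontend applied itself (tagged NBSTORE_ORIGIN)
// and schedule a 'save' job for everything else.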
private readonly handleDocUpdate = (
update: Uint8Array,
origin: any,
doc: YDoc
) => {
if (origin === NBSTORE_ORIGIN) {
return;
}
if (!this.status.docs.has(doc.guid)) {
return;
}
this.schedule({
type: 'save',
docId: doc.guid,
update,
});
};
protected mergeUpdates(updates: Uint8Array[]) {
const merge = this.options?.mergeUpdates ?? mergeUpdates;
return merge(updates.filter(bin => !isEmptyUpdate(bin)));
}
}
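
For context, a minimal usage sketch of the new frontend (illustrative only; createDocStorage is a placeholder for constructing any concrete DocStorage implementation, and the sync engine is omitted by passing null):

// assuming DocFrontend is imported from the module added in this diff
import { Doc as YDoc } from 'yjs';

const storage = createDocStorage(); // placeholder: any DocStorage implementation
const frontend = new DocFrontend(storage, null);
frontend.start();

const doc = new YDoc({ guid: 'page:home' });
frontend.addDoc(doc, true); // load from storage, then persist local edits

const undoPriority = frontend.addPriority(doc.guid, 10); // prefer this doc in the job queue
// ...
undoPriority();
frontend.stop();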