feat(workspace): priority load opened page (#5156)

This commit is contained in:
EYHN
2023-12-04 11:32:10 +00:00
parent b4b4a3b625
commit 7878ce5c2c
5 changed files with 93 additions and 31 deletions

View File

@@ -3,6 +3,7 @@ import { Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import type { Storage } from '../storage';
import { SharedPriorityTarget } from '../utils/async-queue';
import { MANUALLY_STOP, SyncEngineStep } from './consts';
import { SyncPeer, type SyncPeerStatus, SyncPeerStep } from './peer';
@@ -56,6 +57,8 @@ export class SyncEngine {
this.onStatusChange.emit(s);
}
priorityTarget = new SharedPriorityTarget();
get status() {
return this._status;
}
@@ -110,7 +113,11 @@ export class SyncEngine {
const cleanUp: (() => void)[] = [];
try {
// Step 1: start local sync peer
state.localPeer = new SyncPeer(this.rootDoc, this.local);
state.localPeer = new SyncPeer(
this.rootDoc,
this.local,
this.priorityTarget
);
cleanUp.push(
state.localPeer.onStatusChange.on(() => {
@@ -126,7 +133,7 @@ export class SyncEngine {
// Step 3: start remote sync peer
state.remotePeers = this.remotes.map(remote => {
const peer = new SyncPeer(this.rootDoc, remote);
const peer = new SyncPeer(this.rootDoc, remote, this.priorityTarget);
cleanUp.push(
peer.onStatusChange.on(() => {
if (!signal.aborted)
@@ -237,4 +244,8 @@ export class SyncEngine {
]);
}
}
setPriorityRule(target: ((id: string) => boolean) | null) {
this.priorityTarget.priorityRule = target;
}
}

View File

@@ -5,7 +5,7 @@ import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import { mergeUpdates, type Storage } from '../storage';
import { AsyncQueue } from '../utils/async-queue';
import { PriorityAsyncQueue, SharedPriorityTarget } from '../utils/async-queue';
import { throwIfAborted } from '../utils/throw-if-aborted';
import { MANUALLY_STOP } from './consts';
@@ -70,7 +70,8 @@ export class SyncPeer {
constructor(
private readonly rootDoc: Doc,
private readonly storage: Storage
private readonly storage: Storage,
private readonly priorityTarget = new SharedPriorityTarget()
) {
this.logger.debug('peer start');
@@ -152,24 +153,24 @@ export class SyncPeer {
private readonly state: {
connectedDocs: Map<string, Doc>;
pushUpdatesQueue: AsyncQueue<{
docId: string;
pushUpdatesQueue: PriorityAsyncQueue<{
id: string;
data: Uint8Array[];
}>;
pushingUpdate: boolean;
pullUpdatesQueue: AsyncQueue<{
docId: string;
pullUpdatesQueue: PriorityAsyncQueue<{
id: string;
data: Uint8Array;
}>;
subdocLoading: boolean;
subdocsLoadQueue: AsyncQueue<Doc>;
subdocsLoadQueue: PriorityAsyncQueue<{ id: string; doc: Doc }>;
} = {
connectedDocs: new Map(),
pushUpdatesQueue: new AsyncQueue(),
pushUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
pushingUpdate: false,
pullUpdatesQueue: new AsyncQueue(),
pullUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
subdocLoading: false,
subdocsLoadQueue: new AsyncQueue(),
subdocsLoadQueue: new PriorityAsyncQueue([], this.priorityTarget),
};
initState() {
@@ -211,7 +212,10 @@ export class SyncPeer {
// Step 2: load subdocs
this.state.subdocsLoadQueue.push(
...Array.from(this.rootDoc.getSubdocs())
...Array.from(this.rootDoc.getSubdocs()).map(doc => ({
id: doc.guid,
doc,
}))
);
this.reportSyncStatus();
@@ -227,7 +231,7 @@ export class SyncPeer {
);
this.state.subdocLoading = true;
this.reportSyncStatus();
await this.connectDoc(subdoc, abortInner.signal);
await this.connectDoc(subdoc.doc, abortInner.signal);
this.state.subdocLoading = false;
this.reportSyncStatus();
}
@@ -235,7 +239,7 @@ export class SyncPeer {
// pull updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { docId, data } = await this.state.pullUpdatesQueue.next(
const { id, data } = await this.state.pullUpdatesQueue.next(
abortInner.signal
);
// don't apply empty data or Uint8Array([0, 0])
@@ -245,7 +249,7 @@ export class SyncPeer {
(data.byteLength === 2 && data[0] === 0 && data[1] === 0)
)
) {
const subdoc = this.state.connectedDocs.get(docId);
const subdoc = this.state.connectedDocs.get(id);
if (subdoc) {
applyUpdate(subdoc, data, this.name);
}
@@ -256,7 +260,7 @@ export class SyncPeer {
// push updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { docId, data } = await this.state.pushUpdatesQueue.next(
const { id, data } = await this.state.pushUpdatesQueue.next(
abortInner.signal
);
this.state.pushingUpdate = true;
@@ -271,7 +275,7 @@ export class SyncPeer {
(merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0)
)
) {
await this.storage.push(docId, merged);
await this.storage.push(id, merged);
}
this.state.pushingUpdate = false;
@@ -299,7 +303,7 @@ export class SyncPeer {
// diff root doc and in-storage, save updates to pendingUpdates
this.state.pushUpdatesQueue.push({
docId: doc.guid,
id: doc.guid,
data: [encodeStateAsUpdate(doc, inStorageState)],
});
@@ -327,14 +331,12 @@ export class SyncPeer {
return;
}
const exist = this.state.pushUpdatesQueue.find(
({ docId }) => docId === doc.guid
);
const exist = this.state.pushUpdatesQueue.find(({ id }) => id === doc.guid);
if (exist) {
exist.data.push(update);
} else {
this.state.pushUpdatesQueue.push({
docId: doc.guid,
id: doc.guid,
data: [update],
});
}
@@ -351,20 +353,20 @@ export class SyncPeer {
removed: Set<Doc>;
}) => {
for (const subdoc of added) {
this.state.subdocsLoadQueue.push(subdoc);
this.state.subdocsLoadQueue.push({ id: subdoc.guid, doc: subdoc });
}
for (const subdoc of removed) {
this.disconnectDoc(subdoc);
this.state.subdocsLoadQueue.remove(doc => doc === subdoc);
this.state.subdocsLoadQueue.remove(doc => doc.doc === subdoc);
}
this.reportSyncStatus();
};
// handle updates from storage
handleStorageUpdates = (docId: string, data: Uint8Array) => {
handleStorageUpdates = (id: string, data: Uint8Array) => {
this.state.pullUpdatesQueue.push({
docId,
id,
data,
});
this.reportSyncStatus();

View File

@@ -12,8 +12,11 @@ export class AsyncQueue<T> {
return this._queue.length;
}
async next(abort?: AbortSignal): Promise<T> {
const update = this._queue.shift();
async next(
abort?: AbortSignal,
dequeue: (arr: T[]) => T | undefined = a => a.shift()
): Promise<T> {
const update = dequeue(this._queue);
if (update) {
return update;
} else {
@@ -35,7 +38,7 @@ export class AsyncQueue<T> {
}),
]);
return this.next(abort);
return this.next(abort, dequeue);
}
}
@@ -64,3 +67,35 @@ export class AsyncQueue<T> {
this._queue = [];
}
}
export class PriorityAsyncQueue<
T extends { id: string },
> extends AsyncQueue<T> {
constructor(
init: T[] = [],
public readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget()
) {
super(init);
}
override next(abort?: AbortSignal | undefined): Promise<T> {
return super.next(abort, arr => {
if (this.priorityTarget.priorityRule !== null) {
const index = arr.findIndex(
update => this.priorityTarget.priorityRule?.(update.id)
);
if (index !== -1) {
return arr.splice(index, 1)[0];
}
}
return arr.shift();
});
}
}
/**
* Shared priority target can be shared by multiple queues.
*/
export class SharedPriorityTarget {
public priorityRule: ((id: string) => boolean) | null = null;
}