feat: support get datasource status (#3645)

This commit is contained in:
Alex Yang
2023-08-10 01:05:34 -04:00
committed by GitHub
parent 05144abd6a
commit dafd5619e6
34 changed files with 836 additions and 46 deletions

View File

@@ -7,7 +7,8 @@ import {
encodeStateVectorFromUpdate,
} from 'yjs';
import type { DatasourceDocAdapter } from './types';
import type { DatasourceDocAdapter, StatusAdapter } from './types';
import type { Status } from './types';
function getDoc(doc: Doc, guid: string): Doc | undefined {
if (doc.guid === guid) {
@@ -33,7 +34,7 @@ export const createLazyProvider = (
rootDoc: Doc,
datasource: DatasourceDocAdapter,
options: LazyProviderOptions = {}
): Omit<PassiveDocProvider, 'flavour'> => {
): Omit<PassiveDocProvider, 'flavour'> & StatusAdapter => {
let connected = false;
const pendingMap = new Map<string, Uint8Array[]>(); // guid -> pending-updates
const disposableMap = new Map<string, Set<() => void>>();
@@ -42,11 +43,59 @@ export const createLazyProvider = (
const { origin = 'lazy-provider' } = options;
// todo: should we use a real state machine here like `xstate`?
let currentStatus: Status = {
type: 'idle',
};
let syncingStack = 0;
const callbackSet = new Set<() => void>();
const changeStatus = (newStatus: Status) => {
// simulate a stack, each syncing and synced should be paired
if (newStatus.type === 'idle') {
if (syncingStack !== 0) {
console.error('syncingStatus !== 0, this should not happen');
}
syncingStack = 0;
}
if (newStatus.type === 'syncing') {
syncingStack++;
}
if (newStatus.type === 'synced' || newStatus.type === 'error') {
syncingStack--;
}
if (syncingStack < 0) {
console.error('syncingStatus < 0, this should not happen');
}
if (syncingStack === 0) {
currentStatus = newStatus;
}
if (newStatus.type !== 'synced') {
currentStatus = newStatus;
}
callbackSet.forEach(cb => cb());
};
async function syncDoc(doc: Doc) {
const guid = doc.guid;
const remoteUpdate = await datasource.queryDocState(guid, {
stateVector: encodeStateVector(doc),
changeStatus({
type: 'syncing',
});
const remoteUpdate = await datasource
.queryDocState(guid, {
stateVector: encodeStateVector(doc),
})
.catch(error => {
changeStatus({
type: 'error',
error,
});
throw error;
});
changeStatus({
type: 'synced',
});
pendingMap.set(guid, []);
@@ -59,6 +108,9 @@ export const createLazyProvider = (
? encodeStateVectorFromUpdate(remoteUpdate)
: undefined;
if (!connected) {
return;
}
// perf: optimize me
// it is possible the doc is only in memory but not yet in the datasource
// we need to send the whole update to the datasource
@@ -76,7 +128,23 @@ export const createLazyProvider = (
if (origin === updateOrigin) {
return;
}
datasource.sendDocUpdate(doc.guid, update).catch(console.error);
changeStatus({
type: 'syncing',
});
datasource
.sendDocUpdate(doc.guid, update)
.then(() => {
changeStatus({
type: 'synced',
});
})
.catch(error => {
changeStatus({
type: 'error',
error,
});
console.error(error);
});
};
const subdocsHandler = (event: { loaded: Set<Doc>; removed: Set<Doc> }) => {
@@ -103,6 +171,9 @@ export const createLazyProvider = (
*/
function setupDatasourceListeners() {
datasourceUnsub = datasource.onDocUpdate?.((guid, update) => {
changeStatus({
type: 'syncing',
});
const doc = getDoc(rootDoc, guid);
if (doc) {
applyUpdate(doc, update, origin);
@@ -120,6 +191,9 @@ export const createLazyProvider = (
console.warn('idb: doc not found', guid);
pendingMap.set(guid, (pendingMap.get(guid) ?? []).concat(update));
}
changeStatus({
type: 'synced',
});
});
}
@@ -165,20 +239,44 @@ export const createLazyProvider = (
function connect() {
connected = true;
changeStatus({
type: 'syncing',
});
// root doc should be already loaded,
// but we want to populate the cache for later update events
connectDoc(rootDoc).catch(console.error);
connectDoc(rootDoc).catch(error => {
changeStatus({
type: 'error',
error,
});
console.error(error);
});
changeStatus({
type: 'synced',
});
setupDatasourceListeners();
}
async function disconnect() {
connected = false;
changeStatus({
type: 'idle',
});
disposeAll();
datasourceUnsub?.();
datasourceUnsub = undefined;
}
return {
get status() {
return currentStatus;
},
subscribeStatusChange(cb: () => void) {
callbackSet.add(cb);
return () => {
callbackSet.delete(cb);
};
},
get connected() {
return connected;
},

View File

@@ -1,4 +1,24 @@
export interface DatasourceDocAdapter {
export type Status =
| {
type: 'idle';
}
| {
type: 'syncing';
}
| {
type: 'synced';
}
| {
type: 'error';
error: Error;
};
export interface StatusAdapter {
readonly status: Status;
subscribeStatusChange(onStatusChange: () => void): () => void;
}
export interface DatasourceDocAdapter extends Partial<StatusAdapter> {
// request diff update from other clients
queryDocState: (
guid: string,