From 869d98d019d9003a3c50fac81dcc293e229a5780 Mon Sep 17 00:00:00 2001 From: Peng Xiao Date: Fri, 21 Jul 2023 13:23:18 +0800 Subject: [PATCH] perf: lazy doc provider factory (#3330) Co-authored-by: Alex Yang --- .../src/providers/datasource-doc-adapter.ts | 19 ++ .../workspace/src/providers/lazy-provider.ts | 148 ++++++++++++++ packages/y-provider/README.md | 8 + packages/y-provider/package.json | 17 ++ .../y-provider/src/__tests__/index.spec.ts | 181 +++++++++++++++++ packages/y-provider/src/index.ts | 2 + packages/y-provider/src/lazy-provider.ts | 182 ++++++++++++++++++ packages/y-provider/src/types.ts | 19 ++ packages/y-provider/src/utils.ts | 14 ++ packages/y-provider/tsconfig.json | 9 + yarn.lock | 10 + 11 files changed, 609 insertions(+) create mode 100644 packages/workspace/src/providers/datasource-doc-adapter.ts create mode 100644 packages/workspace/src/providers/lazy-provider.ts create mode 100644 packages/y-provider/README.md create mode 100644 packages/y-provider/package.json create mode 100644 packages/y-provider/src/__tests__/index.spec.ts create mode 100644 packages/y-provider/src/index.ts create mode 100644 packages/y-provider/src/lazy-provider.ts create mode 100644 packages/y-provider/src/types.ts create mode 100644 packages/y-provider/src/utils.ts create mode 100644 packages/y-provider/tsconfig.json diff --git a/packages/workspace/src/providers/datasource-doc-adapter.ts b/packages/workspace/src/providers/datasource-doc-adapter.ts new file mode 100644 index 0000000000..c0af541c75 --- /dev/null +++ b/packages/workspace/src/providers/datasource-doc-adapter.ts @@ -0,0 +1,19 @@ +export interface DatasourceDocAdapter { + // request diff update from other clients + queryDocState: ( + guid: string, + options?: { + stateVector?: Uint8Array; + targetClientId?: number; + } + ) => Promise; + + // send update to the datasource + sendDocUpdate: (guid: string, update: Uint8Array) => Promise; + + // listen to update from the datasource. Returns a function to unsubscribe. + // this is optional because some datasource might not support it + onDocUpdate?( + callback: (guid: string, update: Uint8Array) => void + ): () => void; +} diff --git a/packages/workspace/src/providers/lazy-provider.ts b/packages/workspace/src/providers/lazy-provider.ts new file mode 100644 index 0000000000..376c61669c --- /dev/null +++ b/packages/workspace/src/providers/lazy-provider.ts @@ -0,0 +1,148 @@ +import type { PassiveDocProvider } from '@blocksuite/store'; +import { + applyUpdate, + type Doc, + encodeStateAsUpdate, + encodeStateVectorFromUpdate, +} from 'yjs'; + +import type { DatasourceDocAdapter } from './datasource-doc-adapter'; + +const selfUpdateOrigin = 'lazy-provider-self-origin'; + +function getDoc(doc: Doc, guid: string): Doc | undefined { + if (doc.guid === guid) { + return doc; + } + for (const subdoc of doc.subdocs) { + const found = getDoc(subdoc, guid); + if (found) { + return found; + } + } + return undefined; +} + +/** + * Creates a lazy provider that connects to a datasource and synchronizes a root document. + */ +export const createLazyProvider = ( + rootDoc: Doc, + datasource: DatasourceDocAdapter +): Omit => { + let connected = false; + const pendingMap = new Map(); // guid -> pending-updates + const disposableMap = new Map void>>(); + let datasourceUnsub: (() => void) | undefined; + + async function syncDoc(doc: Doc) { + const guid = doc.guid; + // perf: optimize me + const currentUpdate = encodeStateAsUpdate(doc); + + const remoteUpdate = await datasource.queryDocState(guid, { + stateVector: encodeStateVectorFromUpdate(currentUpdate), + }); + + const updates = [currentUpdate]; + pendingMap.set(guid, []); + + if (remoteUpdate) { + applyUpdate(doc, remoteUpdate, selfUpdateOrigin); + const newUpdate = encodeStateAsUpdate( + doc, + encodeStateVectorFromUpdate(remoteUpdate) + ); + updates.push(newUpdate); + await datasource.sendDocUpdate(guid, newUpdate); + } + } + + function setupDocListener(doc: Doc) { + const disposables = new Set<() => void>(); + disposableMap.set(doc.guid, disposables); + const updateHandler = async (update: Uint8Array, origin: unknown) => { + if (origin === selfUpdateOrigin) { + return; + } + datasource.sendDocUpdate(doc.guid, update).catch(console.error); + }; + + const subdocLoadHandler = (event: { loaded: Set }) => { + event.loaded.forEach(subdoc => { + connectDoc(subdoc).catch(console.error); + }); + }; + + doc.on('update', updateHandler); + doc.on('subdocs', subdocLoadHandler); + // todo: handle destroy? + disposables.add(() => { + doc.off('update', updateHandler); + doc.off('subdocs', subdocLoadHandler); + }); + } + + function setupDatasourceListeners() { + datasourceUnsub = datasource.onDocUpdate?.((guid, update) => { + const doc = getDoc(rootDoc, guid); + if (doc) { + applyUpdate(doc, update); + // + if (pendingMap.has(guid)) { + pendingMap.get(guid)?.forEach(update => applyUpdate(doc, update)); + pendingMap.delete(guid); + } + } else { + // This case happens when the father doc is not yet updated, + // so that the child doc is not yet created. + // We need to put it into cache so that it can be applied later. + console.warn('idb: doc not found', guid); + pendingMap.set(guid, (pendingMap.get(guid) ?? []).concat(update)); + } + }); + } + + // when a subdoc is loaded, we need to sync it with the datasource and setup listeners + async function connectDoc(doc: Doc) { + setupDocListener(doc); + await syncDoc(doc); + await Promise.all( + [...doc.subdocs] + .filter(subdoc => subdoc.shouldLoad) + .map(subdoc => connectDoc(subdoc)) + ); + } + + function disposeAll() { + disposableMap.forEach(disposables => { + disposables.forEach(dispose => dispose()); + }); + disposableMap.clear(); + } + + function connect() { + connected = true; + + // root doc should be already loaded, + // but we want to populate the cache for later update events + connectDoc(rootDoc).catch(console.error); + setupDatasourceListeners(); + } + + async function disconnect() { + connected = false; + disposeAll(); + datasourceUnsub?.(); + datasourceUnsub = undefined; + } + + return { + get connected() { + return connected; + }, + passive: true, + connect, + disconnect, + }; +}; diff --git a/packages/y-provider/README.md b/packages/y-provider/README.md new file mode 100644 index 0000000000..91338abb2b --- /dev/null +++ b/packages/y-provider/README.md @@ -0,0 +1,8 @@ +# A set of provider utilities for Yjs + +## createLazyProvider + +A factory function to create a lazy provider. It will not download the document from the provider until the first time a document is loaded at the parent doc. + +To use it, first define a `DatasourceDocAdapter`. +Then, create a `LazyProvider` with `createLazyProvider(rootDoc, datasource)`. diff --git a/packages/y-provider/package.json b/packages/y-provider/package.json new file mode 100644 index 0000000000..56b1264c4f --- /dev/null +++ b/packages/y-provider/package.json @@ -0,0 +1,17 @@ +{ + "name": "@affine/y-provider", + "type": "module", + "version": "0.7.0-canary.47", + "description": "Yjs provider utilities for AFFiNE", + "exports": { + ".": "./src/index.ts" + }, + "main": "./src/index.ts", + "module": "./src/index.ts", + "devDependencies": { + "@blocksuite/store": "0.0.0-20230719163314-76d863fc-nightly" + }, + "peerDependencies": { + "yjs": "^13.5.51" + } +} diff --git a/packages/y-provider/src/__tests__/index.spec.ts b/packages/y-provider/src/__tests__/index.spec.ts new file mode 100644 index 0000000000..f8f068b475 --- /dev/null +++ b/packages/y-provider/src/__tests__/index.spec.ts @@ -0,0 +1,181 @@ +import { setTimeout } from 'node:timers/promises'; + +import { describe, expect, test } from 'vitest'; +import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs'; + +import { createLazyProvider } from '../lazy-provider'; +import type { DatasourceDocAdapter } from '../types'; +import { getDoc } from '../utils'; + +const createMemoryDatasource = (rootDoc: Doc) => { + const selfUpdateOrigin = Symbol('self-origin'); + const listeners = new Set<(guid: string, update: Uint8Array) => void>(); + + function trackDoc(doc: Doc) { + doc.on('update', (update, origin) => { + if (origin === selfUpdateOrigin) { + return; + } + for (const listener of listeners) { + listener(doc.guid, update); + } + }); + + doc.on('subdocs', () => { + for (const subdoc of rootDoc.subdocs) { + trackDoc(subdoc); + } + }); + } + + trackDoc(rootDoc); + + const adapter = { + queryDocState: async (guid, options) => { + const subdoc = getDoc(rootDoc, guid); + if (!subdoc) { + return false; + } + return encodeStateAsUpdate(subdoc, options?.stateVector); + }, + sendDocUpdate: async (guid, update) => { + const subdoc = getDoc(rootDoc, guid); + if (!subdoc) { + return; + } + applyUpdate(subdoc, update, selfUpdateOrigin); + }, + onDocUpdate: callback => { + listeners.add(callback); + return () => { + listeners.delete(callback); + }; + }, + } satisfies DatasourceDocAdapter; + return { + rootDoc, // expose rootDoc for testing + ...adapter, + }; +}; + +describe('y-provider', () => { + test('should sync a subdoc if it is loaded after connect', async () => { + const remoteRootDoc = new Doc(); // this is the remote doc lives in remote + const datasource = createMemoryDatasource(remoteRootDoc); + + const remotesubdoc = new Doc(); + remotesubdoc.getText('text').insert(0, 'test-subdoc-value'); + // populate remote doc with simple data + remoteRootDoc.getMap('map').set('test-0', 'test-0-value'); + remoteRootDoc.getMap('map').set('subdoc', remotesubdoc); + + const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync + const provider = createLazyProvider(rootDoc, datasource); + + provider.connect(); + + await setTimeout(); // wait for the provider to sync + + const subdoc = rootDoc.getMap('map').get('subdoc') as Doc; + + expect(rootDoc.getMap('map').get('test-0')).toBe('test-0-value'); + expect(subdoc.getText('text').toJSON()).toBe(''); + + // onload, the provider should sync the subdoc + subdoc.load(); + await setTimeout(); + expect(subdoc.getText('text').toJSON()).toBe('test-subdoc-value'); + + remotesubdoc.getText('text').insert(0, 'prefix-'); + await setTimeout(); + expect(subdoc.getText('text').toJSON()).toBe('prefix-test-subdoc-value'); + }); + + test('should sync a shouldLoad=true subdoc on connect', async () => { + const remoteRootDoc = new Doc(); // this is the remote doc lives in remote + const datasource = createMemoryDatasource(remoteRootDoc); + + const remotesubdoc = new Doc(); + remotesubdoc.getText('text').insert(0, 'test-subdoc-value'); + + // populate remote doc with simple data + remoteRootDoc.getMap('map').set('test-0', 'test-0-value'); + remoteRootDoc.getMap('map').set('subdoc', remotesubdoc); + + const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync + applyUpdate(rootDoc, encodeStateAsUpdate(remoteRootDoc)); // sync rootDoc with remoteRootDoc + + const subdoc = rootDoc.getMap('map').get('subdoc') as Doc; + expect(subdoc.getText('text').toJSON()).toBe(''); + + subdoc.load(); + const provider = createLazyProvider(rootDoc, datasource); + + provider.connect(); + await setTimeout(); // wait for the provider to sync + expect(subdoc.getText('text').toJSON()).toBe('test-subdoc-value'); + }); + + test('should send existing local update to remote on connect', async () => { + const remoteRootDoc = new Doc(); // this is the remote doc lives in remote + const datasource = createMemoryDatasource(remoteRootDoc); + + const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync + applyUpdate(rootDoc, encodeStateAsUpdate(remoteRootDoc)); // sync rootDoc with remoteRootDoc + + rootDoc.getText('text').insert(0, 'test-value'); + const provider = createLazyProvider(rootDoc, datasource); + provider.connect(); + await setTimeout(); // wait for the provider to sync + + expect(remoteRootDoc.getText('text').toJSON()).toBe('test-value'); + }); + + test('should send local update to remote for subdoc after connect', async () => { + const remoteRootDoc = new Doc(); // this is the remote doc lives in remote + const datasource = createMemoryDatasource(remoteRootDoc); + + const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync + const provider = createLazyProvider(rootDoc, datasource); + + provider.connect(); + + await setTimeout(); // wait for the provider to sync + + const subdoc = new Doc(); + rootDoc.getMap('map').set('subdoc', subdoc); + subdoc.getText('text').insert(0, 'test-subdoc-value'); + + await setTimeout(); // wait for the provider to sync + + const remoteSubdoc = remoteRootDoc.getMap('map').get('subdoc') as Doc; + expect(remoteSubdoc.getText('text').toJSON()).toBe('test-subdoc-value'); + }); + + test('should not send local update to remote for subdoc after disconnect', async () => { + const remoteRootDoc = new Doc(); // this is the remote doc lives in remote + const datasource = createMemoryDatasource(remoteRootDoc); + + const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync + const provider = createLazyProvider(rootDoc, datasource); + + provider.connect(); + + await setTimeout(); // wait for the provider to sync + + const subdoc = new Doc(); + rootDoc.getMap('map').set('subdoc', subdoc); + + await setTimeout(); // wait for the provider to sync + + const remoteSubdoc = remoteRootDoc.getMap('map').get('subdoc') as Doc; + expect(remoteSubdoc.getText('text').toJSON()).toBe(''); + + provider.disconnect(); + subdoc.getText('text').insert(0, 'test-subdoc-value'); + setTimeout(); + expect(remoteSubdoc.getText('text').toJSON()).toBe(''); + + expect(provider.connected).toBe(false); + }); +}); diff --git a/packages/y-provider/src/index.ts b/packages/y-provider/src/index.ts new file mode 100644 index 0000000000..2397bb712e --- /dev/null +++ b/packages/y-provider/src/index.ts @@ -0,0 +1,2 @@ +export * from './lazy-provider'; +export * from './types'; diff --git a/packages/y-provider/src/lazy-provider.ts b/packages/y-provider/src/lazy-provider.ts new file mode 100644 index 0000000000..241f428229 --- /dev/null +++ b/packages/y-provider/src/lazy-provider.ts @@ -0,0 +1,182 @@ +import type { PassiveDocProvider } from '@blocksuite/store'; +import { + applyUpdate, + type Doc, + encodeStateAsUpdate, + encodeStateVectorFromUpdate, +} from 'yjs'; + +import type { DatasourceDocAdapter } from './types'; + +const selfUpdateOrigin = 'lazy-provider-self-origin'; + +function getDoc(doc: Doc, guid: string): Doc | undefined { + if (doc.guid === guid) { + return doc; + } + for (const subdoc of doc.subdocs) { + const found = getDoc(subdoc, guid); + if (found) { + return found; + } + } + return undefined; +} + +/** + * Creates a lazy provider that connects to a datasource and synchronizes a root document. + */ +export const createLazyProvider = ( + rootDoc: Doc, + datasource: DatasourceDocAdapter +): Omit => { + let connected = false; + const pendingMap = new Map(); // guid -> pending-updates + const disposableMap = new Map void>>(); + const connectedDocs = new Set(); + let datasourceUnsub: (() => void) | undefined; + + async function syncDoc(doc: Doc) { + const guid = doc.guid; + // perf: optimize me + const currentUpdate = encodeStateAsUpdate(doc); + + const remoteUpdate = await datasource.queryDocState(guid, { + stateVector: encodeStateVectorFromUpdate(currentUpdate), + }); + + const updates = [currentUpdate]; + pendingMap.set(guid, []); + + if (remoteUpdate) { + applyUpdate(doc, remoteUpdate, selfUpdateOrigin); + const newUpdate = encodeStateAsUpdate( + doc, + encodeStateVectorFromUpdate(remoteUpdate) + ); + updates.push(newUpdate); + await datasource.sendDocUpdate(guid, newUpdate); + } + } + + /** + * Sets up event listeners for a Yjs document. + * @param doc - The Yjs document to set up listeners for. + */ + function setupDocListener(doc: Doc) { + const disposables = new Set<() => void>(); + disposableMap.set(doc.guid, disposables); + const updateHandler = async (update: Uint8Array, origin: unknown) => { + if (origin === selfUpdateOrigin) { + return; + } + datasource.sendDocUpdate(doc.guid, update).catch(console.error); + }; + + const subdocLoadHandler = (event: { + loaded: Set; + removed: Set; + }) => { + event.loaded.forEach(subdoc => { + connectDoc(subdoc).catch(console.error); + }); + event.removed.forEach(subdoc => { + disposeDoc(subdoc); + }); + }; + + doc.on('update', updateHandler); + doc.on('subdocs', subdocLoadHandler); + // todo: handle destroy? + disposables.add(() => { + doc.off('update', updateHandler); + doc.off('subdocs', subdocLoadHandler); + }); + } + + /** + * Sets up event listeners for the datasource. + * Specifically, listens for updates to documents and applies them to the corresponding Yjs document. + */ + function setupDatasourceListeners() { + datasourceUnsub = datasource.onDocUpdate?.((guid, update) => { + const doc = getDoc(rootDoc, guid); + if (doc) { + applyUpdate(doc, update); + // + if (pendingMap.has(guid)) { + pendingMap.get(guid)?.forEach(update => applyUpdate(doc, update)); + pendingMap.delete(guid); + } + } else { + // This case happens when the father doc is not yet updated, + // so that the child doc is not yet created. + // We need to put it into cache so that it can be applied later. + console.warn('idb: doc not found', guid); + pendingMap.set(guid, (pendingMap.get(guid) ?? []).concat(update)); + } + }); + } + + // when a subdoc is loaded, we need to sync it with the datasource and setup listeners + async function connectDoc(doc: Doc) { + // skip if already connected + if (connectedDocs.has(doc.guid)) { + return; + } + connectedDocs.add(doc.guid); + setupDocListener(doc); + await syncDoc(doc); + await Promise.all( + [...doc.subdocs] + .filter(subdoc => subdoc.shouldLoad) + .map(subdoc => connectDoc(subdoc)) + ); + } + + function disposeDoc(doc: Doc) { + connectedDocs.delete(doc.guid); + const disposables = disposableMap.get(doc.guid); + if (disposables) { + disposables.forEach(dispose => dispose()); + disposableMap.delete(doc.guid); + } + // also dispose all subdocs + doc.subdocs.forEach(disposeDoc); + } + + function disposeAll() { + disposableMap.forEach(disposables => { + disposables.forEach(dispose => dispose()); + }); + disposableMap.clear(); + } + + /** + * Connects to the datasource and sets up event listeners for document updates. + */ + function connect() { + connected = true; + + // root doc should be already loaded, + // but we want to populate the cache for later update events + connectDoc(rootDoc).catch(console.error); + setupDatasourceListeners(); + } + + async function disconnect() { + connected = false; + disposeAll(); + datasourceUnsub?.(); + datasourceUnsub = undefined; + } + + return { + get connected() { + return connected; + }, + passive: true, + connect, + disconnect, + }; +}; diff --git a/packages/y-provider/src/types.ts b/packages/y-provider/src/types.ts new file mode 100644 index 0000000000..c0af541c75 --- /dev/null +++ b/packages/y-provider/src/types.ts @@ -0,0 +1,19 @@ +export interface DatasourceDocAdapter { + // request diff update from other clients + queryDocState: ( + guid: string, + options?: { + stateVector?: Uint8Array; + targetClientId?: number; + } + ) => Promise; + + // send update to the datasource + sendDocUpdate: (guid: string, update: Uint8Array) => Promise; + + // listen to update from the datasource. Returns a function to unsubscribe. + // this is optional because some datasource might not support it + onDocUpdate?( + callback: (guid: string, update: Uint8Array) => void + ): () => void; +} diff --git a/packages/y-provider/src/utils.ts b/packages/y-provider/src/utils.ts new file mode 100644 index 0000000000..2be791444e --- /dev/null +++ b/packages/y-provider/src/utils.ts @@ -0,0 +1,14 @@ +import type { Doc } from 'yjs'; + +export function getDoc(doc: Doc, guid: string): Doc | undefined { + if (doc.guid === guid) { + return doc; + } + for (const subdoc of doc.subdocs) { + const found = getDoc(subdoc, guid); + if (found) { + return found; + } + } + return undefined; +} diff --git a/packages/y-provider/tsconfig.json b/packages/y-provider/tsconfig.json new file mode 100644 index 0000000000..8034f93ce4 --- /dev/null +++ b/packages/y-provider/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "include": ["./src"], + "compilerOptions": { + "composite": true, + "noEmit": false, + "outDir": "lib" + } +} diff --git a/yarn.lock b/yarn.lock index 31d6a3eb34..6409fd333d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -636,6 +636,16 @@ __metadata: languageName: unknown linkType: soft +"@affine/y-provider@workspace:packages/y-provider": + version: 0.0.0-use.local + resolution: "@affine/y-provider@workspace:packages/y-provider" + dependencies: + "@blocksuite/store": 0.0.0-20230719163314-76d863fc-nightly + peerDependencies: + yjs: ^13.5.51 + languageName: unknown + linkType: soft + "@alloc/quick-lru@npm:^5.2.0": version: 5.2.0 resolution: "@alloc/quick-lru@npm:5.2.0"