diff --git a/libs/datasource/jwt-rpc/src/indexeddb.ts b/libs/datasource/jwt-rpc/src/indexeddb.ts index bb38fb8a88..512daea6d4 100644 --- a/libs/datasource/jwt-rpc/src/indexeddb.ts +++ b/libs/datasource/jwt-rpc/src/indexeddb.ts @@ -134,7 +134,7 @@ export class IndexedDBProvider extends Observable { } /** - * Destroys this instance and removes all data from SQLite. + * Destroys this instance and removes all data from indexeddb. * * @return {Promise} */ diff --git a/libs/datasource/jwt-rpc/src/sqlite.ts b/libs/datasource/jwt-rpc/src/sqlite.ts index 36e374efef..0d47c4615b 100644 --- a/libs/datasource/jwt-rpc/src/sqlite.ts +++ b/libs/datasource/jwt-rpc/src/sqlite.ts @@ -41,6 +41,7 @@ const initSQLiteInstance = async () => { _sqliteProcessing = true; _sqliteInstance = await sqlite({ locateFile: () => + // @ts-ignore new URL('sql.js/dist/sql-wasm.wasm', import.meta.url).href, }); _sqliteProcessing = false; diff --git a/libs/datasource/jwt/src/adapter/yjs/index.ts b/libs/datasource/jwt/src/adapter/yjs/index.ts index d815fe4738..2e7681ae53 100644 --- a/libs/datasource/jwt/src/adapter/yjs/index.ts +++ b/libs/datasource/jwt/src/adapter/yjs/index.ts @@ -18,11 +18,7 @@ import { snapshot, } from 'yjs'; -import { - IndexedDBProvider, - SQLiteProvider, - WebsocketProvider, -} from '@toeverything/datasource/jwt-rpc'; +import { IndexedDBProvider } from '@toeverything/datasource/jwt-rpc'; import { AsyncDatabaseAdapter, @@ -31,7 +27,7 @@ import { Connectivity, HistoryManager, } from '../../adapter'; -import { BucketBackend, BlockItem, BlockTypes } from '../../types'; +import { BlockItem, BlockTypes } from '../../types'; import { getLogger, sha3, sleep } from '../../utils'; import { YjsRemoteBinaries } from './binary'; @@ -43,51 +39,26 @@ import { } from './operation'; import { EmitEvents, Suspend } from './listener'; import { YjsHistoryManager } from './history'; +import { YjsProvider } from './provider'; declare const JWT_DEV: boolean; const logger = getLogger('BlockDB:yjs'); +type ConnectivityListener = ( + workspace: string, + connectivity: Connectivity +) => void; type YjsProviders = { awareness: Awareness; idb: IndexedDBProvider; binariesIdb: IndexedDBProvider; - fstore?: SQLiteProvider; - ws?: WebsocketProvider; - backend: string; gatekeeper: GateKeeper; + connListener: { listeners?: ConnectivityListener }; userId: string; remoteToken?: string; // remote storage token }; const _yjsDatabaseInstance = new Map(); -async function _initWebsocketProvider( - url: string, - room: string, - doc: Doc, - token?: string, - params?: YjsInitOptions['params'] -): Promise<[Awareness, WebsocketProvider | undefined]> { - const awareness = new Awareness(doc); - - if (token) { - const ws = new WebsocketProvider(token, url, room, doc, { - awareness, - params, - }) as any; // TODO: type is erased after cascading references - - // Wait for ws synchronization to complete, otherwise the data will be modified in reverse, which can be optimized later - return new Promise((resolve, reject) => { - // TODO: synced will also be triggered on reconnection after losing sync - // There needs to be an event mechanism to emit the synchronization state to the upper layer - ws.once('synced', () => resolve([awareness, ws])); - ws.once('lost-connection', () => resolve([awareness, ws])); - ws.once('connection-error', () => reject()); - }); - } else { - return [awareness, undefined]; - } -} - const _asyncInitLoading = new Set(); const _waitLoading = async (workspace: string) => { while (_asyncInitLoading.has(workspace)) { @@ -96,14 +67,11 @@ const _waitLoading = async (workspace: string) => { }; async function _initYjsDatabase( - backend: string, workspace: string, options: { - params: YjsInitOptions['params']; userId: string; token?: string; - importData?: Uint8Array; - exportData?: (binary: Uint8Array) => void; + provider?: Record; } ): Promise { if (_asyncInitLoading.has(workspace)) { @@ -119,28 +87,10 @@ async function _initYjsDatabase( } // if (instance) return instance; _asyncInitLoading.add(workspace); - const { params, userId, token: remoteToken } = options; + const { userId, token } = options; const doc = new Doc({ autoLoad: true, shouldLoad: true }); - - const idbp = new IndexedDBProvider(workspace, doc).whenSynced; - - const fs = new SQLiteProvider(workspace, doc, options.importData); - if (options.exportData) fs.registerExporter(options.exportData); - - const wsp = _initWebsocketProvider( - backend, - workspace, - doc, - remoteToken, - params - ); - - const [idb, [awareness, ws], fstore] = await Promise.all([ - idbp, - wsp, - fs.whenSynced, - ]); + const idb = await new IndexedDBProvider(workspace, doc).whenSynced; const binaries = new Doc({ autoLoad: true, shouldLoad: true }); const binariesIdb = await new IndexedDBProvider( @@ -148,6 +98,8 @@ async function _initYjsDatabase( binaries ).whenSynced; + const awareness = new Awareness(doc); + const gateKeeperData = doc.getMap>('gatekeeper'); const gatekeeper = new GateKeeper( @@ -157,44 +109,45 @@ async function _initYjsDatabase( gateKeeperData.get('common') || gateKeeperData.set('common', new YMap()) ); - _yjsDatabaseInstance.set(workspace, { + const connListener: { listeners?: ConnectivityListener } = {}; + if (options.provider) { + const emitState = (c: Connectivity) => + connListener.listeners?.(workspace, c); + await Promise.all( + Object.entries(options.provider).map(async ([, p]) => + p({ awareness, doc, token, workspace, emitState }) + ) + ); + } + const newInstance = { awareness, idb, binariesIdb, - fstore, - ws, - backend, gatekeeper, + connListener, userId, - remoteToken, - }); + remoteToken: token, + }; + + _yjsDatabaseInstance.set(workspace, newInstance); + _asyncInitLoading.delete(workspace); - return { - awareness, - idb, - binariesIdb, - fstore, - ws, - backend, - gatekeeper, - userId, - remoteToken, - }; + return newInstance; } export type { YjsBlockInstance } from './block'; export type { YjsContentOperation } from './operation'; export type YjsInitOptions = { - backend: typeof BucketBackend[keyof typeof BucketBackend]; - params?: Record; userId?: string; token?: string; - importData?: Uint8Array; - exportData?: (binary: Uint8Array) => void; + provider?: Record; }; +export { getYjsProviders } from './provider'; +export type { YjsProviderOptions } from './provider'; + export class YjsAdapter implements AsyncDatabaseAdapter { private readonly _provider: YjsProviders; private readonly _doc: Doc; // doc instance @@ -217,20 +170,11 @@ export class YjsAdapter implements AsyncDatabaseAdapter { workspace: string, options: YjsInitOptions ): Promise { - const { - backend, - params = {}, - userId = 'default', - token, - importData, - exportData, - } = options; - const providers = await _initYjsDatabase(backend, workspace, { - params, + const { userId = 'default', token, provider } = options; + const providers = await _initYjsDatabase(workspace, { userId, token, - importData, - exportData, + provider, }); return new YjsAdapter(providers); } @@ -255,18 +199,14 @@ export class YjsAdapter implements AsyncDatabaseAdapter { this._listener = new Map(); - const ws = providers.ws as any; - if (ws) { - const workspace = providers.idb.name; - const emitState = (connectivity: Connectivity) => { - this._listener.get('connectivity')?.( - new Map([[workspace, connectivity]]) - ); - }; - ws.on('synced', () => emitState('connected')); - ws.on('lost-connection', () => emitState('retry')); - ws.on('connection-error', () => emitState('retry')); - } + providers.connListener.listeners = ( + workspace: string, + connectivity: Connectivity + ) => { + this._listener.get('connectivity')?.( + new Map([[workspace, connectivity]]) + ); + }; const debounced_editing_notifier = debounce( () => { diff --git a/libs/datasource/jwt/src/adapter/yjs/provider.ts b/libs/datasource/jwt/src/adapter/yjs/provider.ts new file mode 100644 index 0000000000..d0f7f9d3af --- /dev/null +++ b/libs/datasource/jwt/src/adapter/yjs/provider.ts @@ -0,0 +1,75 @@ +import { Doc } from 'yjs'; +import { Awareness } from 'y-protocols/awareness.js'; + +import { + SQLiteProvider, + WebsocketProvider, +} from '@toeverything/datasource/jwt-rpc'; + +import { Connectivity } from '../../adapter'; +import { BucketBackend } from '../../types'; + +type YjsDefaultInstances = { + awareness: Awareness; + doc: Doc; + token?: string; + workspace: string; + emitState: (connectivity: Connectivity) => void; +}; + +export type YjsProvider = (instances: YjsDefaultInstances) => Promise; + +export type YjsProviderOptions = { + backend: typeof BucketBackend[keyof typeof BucketBackend]; + params?: Record; + importData?: Uint8Array; + exportData?: (binary: Uint8Array) => void; +}; + +export const getYjsProviders = ( + options: YjsProviderOptions +): Record => { + return { + sqlite: async (instances: YjsDefaultInstances) => { + const fs = new SQLiteProvider( + instances.workspace, + instances.doc, + options.importData + ); + if (options.exportData) fs.registerExporter(options.exportData); + await fs.whenSynced; + }, + ws: async (instances: YjsDefaultInstances) => { + if (instances.token) { + const ws = new WebsocketProvider( + instances.token, + options.backend, + instances.workspace, + instances.doc, + { + awareness: instances.awareness, + params: options.params, + } + ) as any; // TODO: type is erased after cascading references + + // Wait for ws synchronization to complete, otherwise the data will be modified in reverse, which can be optimized later + return new Promise((resolve, reject) => { + // TODO: synced will also be triggered on reconnection after losing sync + // There needs to be an event mechanism to emit the synchronization state to the upper layer + ws.once('synced', () => resolve()); + ws.once('lost-connection', () => resolve()); + ws.once('connection-error', () => reject()); + ws.on('synced', () => instances.emitState('connected')); + ws.on('lost-connection', () => + instances.emitState('retry') + ); + ws.on('connection-error', () => + instances.emitState('retry') + ); + }); + } else { + return; + } + }, + }; +}; diff --git a/libs/datasource/jwt/src/index.ts b/libs/datasource/jwt/src/index.ts index 6ced321c2c..acd3ce5f53 100644 --- a/libs/datasource/jwt/src/index.ts +++ b/libs/datasource/jwt/src/index.ts @@ -15,7 +15,11 @@ import { ContentTypes, Connectivity, } from './adapter'; -import { YjsBlockInstance } from './adapter/yjs'; +import { + getYjsProviders, + YjsBlockInstance, + YjsProviderOptions, +} from './adapter/yjs'; import { BaseBlock, BlockIndexer, @@ -27,11 +31,11 @@ import { BlockTypes, BlockTypeKeys, BlockFlavors, - BucketBackend, UUID, BlockFlavorKeys, BlockItem, ExcludeFunction, + BucketBackend, } from './types'; import { BlockEventBus, genUUID, getLogger } from './utils'; @@ -588,10 +592,16 @@ export class BlockClient< public static async init( workspace: string, - options: Partial = {} + options: Partial< + YjsInitOptions & YjsProviderOptions & BlockClientOptions + > = {} ): Promise { const instance = await YjsAdapter.init(workspace, { - backend: BucketBackend.YjsWebSocketAffine, + provider: getYjsProviders({ + backend: BucketBackend.YjsWebSocketAffine, + exportData: console.log.bind(console), + ...options, + }), ...options, }); return new BlockClient(instance, workspace, options);