diff --git a/libs/components/editor-core/src/editor/editor.ts b/libs/components/editor-core/src/editor/editor.ts index 37d70da2bc..29950c0395 100644 --- a/libs/components/editor-core/src/editor/editor.ts +++ b/libs/components/editor-core/src/editor/editor.ts @@ -107,6 +107,11 @@ export class Editor implements Virgo { ); } this.bdCommands = new Commands(props.workspace); + + services.api.editorBlock.listenConnectivity(this.workspace, state => { + console.log(this.workspace, state); + }); + services.api.editorBlock.onHistoryChange( this.workspace, 'affine', diff --git a/libs/datasource/db-service/src/services/base.ts b/libs/datasource/db-service/src/services/base.ts index 81c8f3e9ac..dcf53dcb4c 100644 --- a/libs/datasource/db-service/src/services/base.ts +++ b/libs/datasource/db-service/src/services/base.ts @@ -23,6 +23,13 @@ export abstract class ServiceBaseClass { return db.getWorkspace(); } + async listenConnectivity( + workspace: string, + callback: (state: string) => void + ) { + this.database.listenConnectivity(workspace, workspace, callback); + } + async onHistoryChange( workspace: string, name: string, diff --git a/libs/datasource/db-service/src/services/database/index.ts b/libs/datasource/db-service/src/services/database/index.ts index 83d832a68b..de83b24db0 100644 --- a/libs/datasource/db-service/src/services/database/index.ts +++ b/libs/datasource/db-service/src/services/database/index.ts @@ -6,6 +6,7 @@ import { BlockContentExporter, BlockMatcher, BlockInitOptions, + Connectivity, } from '@toeverything/datasource/jwt'; import { sleep } from '@toeverything/utils'; @@ -81,6 +82,18 @@ export class Database { return db; } + async listenConnectivity( + workspace: string, + name: string, + listener: (connectivity: Connectivity) => void + ) { + const db = await _getBlockDatabase(workspace); + return db.addConnectivityListener(name, state => { + const connectivity = state.get(name); + if (connectivity) listener(connectivity); + }); + } + async registerContentExporter( workspace: string, name: string, diff --git a/libs/datasource/jwt/src/adapter/index.ts b/libs/datasource/jwt/src/adapter/index.ts index 101808a71e..ddf4f29ca4 100644 --- a/libs/datasource/jwt/src/adapter/index.ts +++ b/libs/datasource/jwt/src/adapter/index.ts @@ -9,6 +9,8 @@ export type BlockListener = ( states: ChangedStates ) => Promise | R; +export type Connectivity = 'disconnect' | 'connecting' | 'connected' | 'retry'; + export type Operable> = T extends Base ? ContentOperation : T; @@ -145,7 +147,10 @@ interface AsyncDatabaseAdapter { getBlockByType(type: BlockItem['type']): Promise; checkBlocks(keys: string[]): Promise; deleteBlocks(keys: string[]): Promise; - on(key: 'editing' | 'updated', listener: BlockListener): void; + on( + key: 'editing' | 'updated' | 'connectivity', + listener: BlockListener + ): void; suspend(suspend: boolean): void; history(): HistoryManager; getUserId(): string; diff --git a/libs/datasource/jwt/src/adapter/yjs/index.ts b/libs/datasource/jwt/src/adapter/yjs/index.ts index 12b2cce557..2d14aec56a 100644 --- a/libs/datasource/jwt/src/adapter/yjs/index.ts +++ b/libs/datasource/jwt/src/adapter/yjs/index.ts @@ -25,6 +25,7 @@ import { AsyncDatabaseAdapter, BlockListener, ChangedStateKeys, + Connectivity, HistoryManager, } from '../../adapter'; import { BucketBackend, BlockItem, BlockTypes } from '../../types'; @@ -63,6 +64,7 @@ async function _initWebsocketProvider( params?: YjsInitOptions['params'] ): Promise<[Awareness, WebsocketProvider | undefined]> { const awareness = new Awareness(doc); + if (token) { const ws = new WebsocketProvider(token, url, room, doc, { awareness, @@ -75,7 +77,7 @@ async function _initWebsocketProvider( // There needs to be an event mechanism to emit the synchronization state to the upper layer ws.once('synced', () => resolve([awareness, ws])); ws.once('lost-connection', () => resolve([awareness, ws])); - ws.on('connection-error', reject); + ws.once('connection-error', () => reject()); }); } else { return [awareness, undefined]; @@ -226,6 +228,19 @@ export class YjsAdapter implements AsyncDatabaseAdapter { this.#listener = new Map(); + const ws = providers.ws as any; + if (ws) { + const workspace = providers.idb.name; + const emitState = (connectivity: Connectivity) => { + this.#listener.get('connectivity')?.( + new Map([[workspace, connectivity]]) + ); + }; + ws.on('synced', () => emitState('connected')); + ws.on('lost-connection', () => emitState('retry')); + ws.on('connection-error', () => emitState('retry')); + } + const debounced_editing_notifier = debounce( () => { const listener: BlockListener> | undefined = @@ -612,7 +627,10 @@ export class YjsAdapter implements AsyncDatabaseAdapter { return fail; } - on(key: 'editing' | 'updated', listener: BlockListener): void { + on( + key: 'editing' | 'updated' | 'connectivity', + listener: BlockListener + ): void { this.#listener.set(key, listener); } diff --git a/libs/datasource/jwt/src/index.ts b/libs/datasource/jwt/src/index.ts index b092bf9776..c8d868cff7 100644 --- a/libs/datasource/jwt/src/index.ts +++ b/libs/datasource/jwt/src/index.ts @@ -13,6 +13,7 @@ import { ContentOperation, HistoryManager, ContentTypes, + Connectivity, } from './adapter'; import { YjsBlockInstance } from './adapter/yjs'; import { @@ -124,6 +125,12 @@ export class BlockClient< this.#event_bus.topic('updated').emit(states) ); + this.#adapter.on( + 'connectivity', + (states: ChangedStates) => + this.#event_bus.topic('connectivity').emit(states) + ); + this.#event_bus .topic('rebuild_index') .on('rebuild_index', this.rebuild_index.bind(this), { @@ -136,7 +143,7 @@ export class BlockClient< public addBlockListener(tag: string, listener: BlockListener) { const bus = this.#event_bus.topic('updated'); if (tag !== 'index' || !bus.has(tag)) bus.on(tag, listener); - else console.error(`block listener ${tag} is reserved`); + else console.error(`block listener ${tag} is reserved or exists`); } public removeBlockListener(tag: string) { @@ -150,13 +157,28 @@ export class BlockClient< const bus = this.#event_bus.topic>>('editing'); if (tag !== 'index' || !bus.has(tag)) bus.on(tag, listener); - else console.error(`editing listener ${tag} is reserved`); + else console.error(`editing listener ${tag} is reserved or exists`); } public removeEditingListener(tag: string) { this.#event_bus.topic('editing').off(tag); } + public addConnectivityListener( + tag: string, + listener: BlockListener + ) { + const bus = + this.#event_bus.topic>('connectivity'); + if (tag !== 'index' || !bus.has(tag)) bus.on(tag, listener); + else + console.error(`connectivity listener ${tag} is reserved or exists`); + } + + public removeConnectivityListener(tag: string) { + this.#event_bus.topic('connectivity').off(tag); + } + private inspector() { return { ...this.#adapter.inspector(), @@ -580,6 +602,7 @@ export type { ArrayOperation, MapOperation, ChangedStates, + Connectivity, } from './adapter'; export type { BlockSearchItem,