feat: jwst connectivity

This commit is contained in:
DarkSky
2022-07-27 20:35:44 +08:00
parent a220ea517f
commit 79b1547721
6 changed files with 76 additions and 5 deletions

View File

@@ -23,6 +23,13 @@ export abstract class ServiceBaseClass {
return db.getWorkspace();
}
async listenConnectivity(
workspace: string,
callback: (state: string) => void
) {
this.database.listenConnectivity(workspace, workspace, callback);
}
async onHistoryChange(
workspace: string,
name: string,

View File

@@ -6,6 +6,7 @@ import {
BlockContentExporter,
BlockMatcher,
BlockInitOptions,
Connectivity,
} from '@toeverything/datasource/jwt';
import { sleep } from '@toeverything/utils';
@@ -81,6 +82,18 @@ export class Database {
return db;
}
async listenConnectivity(
workspace: string,
name: string,
listener: (connectivity: Connectivity) => void
) {
const db = await _getBlockDatabase(workspace);
return db.addConnectivityListener(name, state => {
const connectivity = state.get(name);
if (connectivity) listener(connectivity);
});
}
async registerContentExporter(
workspace: string,
name: string,

View File

@@ -9,6 +9,8 @@ export type BlockListener<S = ChangedStateKeys, R = unknown> = (
states: ChangedStates<S>
) => Promise<R> | R;
export type Connectivity = 'disconnect' | 'connecting' | 'connected' | 'retry';
export type Operable<T, Base = YAbstractType<any>> = T extends Base
? ContentOperation
: T;
@@ -145,7 +147,10 @@ interface AsyncDatabaseAdapter<C extends ContentOperation> {
getBlockByType(type: BlockItem<C>['type']): Promise<string[]>;
checkBlocks(keys: string[]): Promise<boolean>;
deleteBlocks(keys: string[]): Promise<string[]>;
on<S, R>(key: 'editing' | 'updated', listener: BlockListener<S, R>): void;
on<S, R>(
key: 'editing' | 'updated' | 'connectivity',
listener: BlockListener<S, R>
): void;
suspend(suspend: boolean): void;
history(): HistoryManager;
getUserId(): string;

View File

@@ -25,6 +25,7 @@ import {
AsyncDatabaseAdapter,
BlockListener,
ChangedStateKeys,
Connectivity,
HistoryManager,
} from '../../adapter';
import { BucketBackend, BlockItem, BlockTypes } from '../../types';
@@ -63,6 +64,7 @@ async function _initWebsocketProvider(
params?: YjsInitOptions['params']
): Promise<[Awareness, WebsocketProvider | undefined]> {
const awareness = new Awareness(doc);
if (token) {
const ws = new WebsocketProvider(token, url, room, doc, {
awareness,
@@ -75,7 +77,7 @@ async function _initWebsocketProvider(
// There needs to be an event mechanism to emit the synchronization state to the upper layer
ws.once('synced', () => resolve([awareness, ws]));
ws.once('lost-connection', () => resolve([awareness, ws]));
ws.on('connection-error', reject);
ws.once('connection-error', () => reject());
});
} else {
return [awareness, undefined];
@@ -226,6 +228,19 @@ export class YjsAdapter implements AsyncDatabaseAdapter<YjsContentOperation> {
this.#listener = new Map();
const ws = providers.ws as any;
if (ws) {
const workspace = providers.idb.name;
const emitState = (connectivity: Connectivity) => {
this.#listener.get('connectivity')?.(
new Map([[workspace, connectivity]])
);
};
ws.on('synced', () => emitState('connected'));
ws.on('lost-connection', () => emitState('retry'));
ws.on('connection-error', () => emitState('retry'));
}
const debounced_editing_notifier = debounce(
() => {
const listener: BlockListener<Set<string>> | undefined =
@@ -612,7 +627,10 @@ export class YjsAdapter implements AsyncDatabaseAdapter<YjsContentOperation> {
return fail;
}
on<S, R>(key: 'editing' | 'updated', listener: BlockListener<S, R>): void {
on<S, R>(
key: 'editing' | 'updated' | 'connectivity',
listener: BlockListener<S, R>
): void {
this.#listener.set(key, listener);
}

View File

@@ -13,6 +13,7 @@ import {
ContentOperation,
HistoryManager,
ContentTypes,
Connectivity,
} from './adapter';
import { YjsBlockInstance } from './adapter/yjs';
import {
@@ -124,6 +125,12 @@ export class BlockClient<
this.#event_bus.topic('updated').emit(states)
);
this.#adapter.on(
'connectivity',
(states: ChangedStates<Connectivity>) =>
this.#event_bus.topic('connectivity').emit(states)
);
this.#event_bus
.topic<string[]>('rebuild_index')
.on('rebuild_index', this.rebuild_index.bind(this), {
@@ -136,7 +143,7 @@ export class BlockClient<
public addBlockListener(tag: string, listener: BlockListener) {
const bus = this.#event_bus.topic<ChangedStates>('updated');
if (tag !== 'index' || !bus.has(tag)) bus.on(tag, listener);
else console.error(`block listener ${tag} is reserved`);
else console.error(`block listener ${tag} is reserved or exists`);
}
public removeBlockListener(tag: string) {
@@ -150,13 +157,28 @@ export class BlockClient<
const bus =
this.#event_bus.topic<ChangedStates<Set<string>>>('editing');
if (tag !== 'index' || !bus.has(tag)) bus.on(tag, listener);
else console.error(`editing listener ${tag} is reserved`);
else console.error(`editing listener ${tag} is reserved or exists`);
}
public removeEditingListener(tag: string) {
this.#event_bus.topic('editing').off(tag);
}
public addConnectivityListener(
tag: string,
listener: BlockListener<Connectivity>
) {
const bus =
this.#event_bus.topic<ChangedStates<Connectivity>>('connectivity');
if (tag !== 'index' || !bus.has(tag)) bus.on(tag, listener);
else
console.error(`connectivity listener ${tag} is reserved or exists`);
}
public removeConnectivityListener(tag: string) {
this.#event_bus.topic('connectivity').off(tag);
}
private inspector() {
return {
...this.#adapter.inspector(),
@@ -580,6 +602,7 @@ export type {
ArrayOperation,
MapOperation,
ChangedStates,
Connectivity,
} from './adapter';
export type {
BlockSearchItem,