diff --git a/packages/common/infra/src/op/consumer.ts b/packages/common/infra/src/op/consumer.ts index 0b24c4b5ea..4a0d2cc283 100644 --- a/packages/common/infra/src/op/consumer.ts +++ b/packages/common/infra/src/op/consumer.ts @@ -126,6 +126,16 @@ export class OpConsumer extends AutoMessageHandler { this.registeredOpHandlers.set(op, handler); } + registerAll( + handlers: OpNames extends string + ? { [K in OpNames]: OpHandler } + : never + ) { + for (const [op, handler] of Object.entries(handlers)) { + this.register(op as any, handler as any); + } + } + before>( op: Op, handler: (...input: OpInput) => void diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index 5a9c66757c..21bf934bb2 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -6,7 +6,7 @@ "sideEffects": false, "exports": { ".": "./src/index.ts", - "./op": "./src/op/index.ts", + "./worker": "./src/worker/index.ts", "./idb": "./src/impls/idb/index.ts", "./idb/v1": "./src/impls/idb/v1/index.ts", "./cloud": "./src/impls/cloud/index.ts", diff --git a/packages/common/nbstore/src/__tests__/frontend.spec.ts b/packages/common/nbstore/src/__tests__/frontend.spec.ts index cedd7ad903..9acfca646f 100644 --- a/packages/common/nbstore/src/__tests__/frontend.spec.ts +++ b/packages/common/nbstore/src/__tests__/frontend.spec.ts @@ -8,7 +8,7 @@ import { AwarenessFrontend } from '../frontend/awareness'; import { DocFrontend } from '../frontend/doc'; import { BroadcastChannelAwarenessStorage } from '../impls/broadcast-channel/awareness'; import { IndexedDBDocStorage } from '../impls/idb'; -import { AwarenessSync } from '../sync/awareness'; +import { AwarenessSyncImpl } from '../sync/awareness'; import { expectYjsEqual } from './utils'; test('doc', async () => { @@ -23,9 +23,9 @@ test('doc', async () => { type: 'workspace', }); - docStorage.connect(); + docStorage.connection.connect(); - await docStorage.waitForConnected(); + await docStorage.connection.waitForConnected(); const frontend1 = new DocFrontend(docStorage, null); frontend1.start(); @@ -68,11 +68,11 @@ test('awareness', async () => { type: 'workspace', }); - storage1.connect(); - storage2.connect(); + storage1.connection.connect(); + storage2.connection.connect(); - await storage1.waitForConnected(); - await storage2.waitForConnected(); + await storage1.connection.waitForConnected(); + await storage2.connection.waitForConnected(); // peer a const docA = new YDoc({ guid: 'test-doc' }); @@ -90,13 +90,13 @@ test('awareness', async () => { const awarenessC = new Awareness(docC); { - const sync = new AwarenessSync(storage1, [storage2]); + const sync = new AwarenessSyncImpl(storage1, [storage2]); const frontend = new AwarenessFrontend(sync); frontend.connect(awarenessA); frontend.connect(awarenessB); } { - const sync = new AwarenessSync(storage2, [storage1]); + const sync = new AwarenessSyncImpl(storage2, [storage1]); const frontend = new AwarenessFrontend(sync); frontend.connect(awarenessC); } diff --git a/packages/common/nbstore/src/connection/connection.ts b/packages/common/nbstore/src/connection/connection.ts index 0df9900521..cd15248d32 100644 --- a/packages/common/nbstore/src/connection/connection.ts +++ b/packages/common/nbstore/src/connection/connection.ts @@ -8,7 +8,20 @@ export type ConnectionStatus = | 'error' | 'closed'; -export abstract class Connection { +export interface Connection { + readonly status: ConnectionStatus; + readonly inner: T; + connect(): void; + disconnect(): void; + waitForConnected(signal?: AbortSignal): Promise; + onStatusChanged( + cb: (status: ConnectionStatus, error?: Error) => void + ): () => void; +} + +export abstract class AutoReconnectConnection + implements Connection +{ private readonly event = new EventEmitter2(); private _inner: T | null = null; private _status: ConnectionStatus = 'idle'; @@ -160,12 +173,22 @@ export abstract class Connection { }; } -export class DummyConnection extends Connection { - doConnect() { - return Promise.resolve(undefined); - } +export class DummyConnection implements Connection { + readonly status: ConnectionStatus = 'connected'; + readonly inner: undefined; - doDisconnect() { + connect(): void { return; } + disconnect(): void { + return; + } + waitForConnected(_signal?: AbortSignal): Promise { + return Promise.resolve(); + } + onStatusChanged( + _cb: (status: ConnectionStatus, error?: Error) => void + ): () => void { + return () => {}; + } } diff --git a/packages/common/nbstore/src/connection/shared-connection.ts b/packages/common/nbstore/src/connection/shared-connection.ts index da69e84905..1f11bb4a43 100644 --- a/packages/common/nbstore/src/connection/shared-connection.ts +++ b/packages/common/nbstore/src/connection/shared-connection.ts @@ -1,7 +1,7 @@ -import type { Connection } from './connection'; +import type { AutoReconnectConnection } from './connection'; -const CONNECTIONS: Map> = new Map(); -export function share>(conn: T): T { +const CONNECTIONS: Map> = new Map(); +export function share>(conn: T): T { if (!conn.shareId) { throw new Error( `Connection ${conn.constructor.name} is not shareable.\nIf you want to make it shareable, please override [shareId].` diff --git a/packages/common/nbstore/src/frontend/awareness.ts b/packages/common/nbstore/src/frontend/awareness.ts index 19a092789b..524c790fba 100644 --- a/packages/common/nbstore/src/frontend/awareness.ts +++ b/packages/common/nbstore/src/frontend/awareness.ts @@ -51,10 +51,10 @@ export class AwarenessFrontend { applyAwarenessUpdate(awareness, update.bin, origin); }; const handleSyncCollect = () => { - return { + return Promise.resolve({ docId: awareness.doc.guid, bin: encodeAwarenessUpdate(awareness, [awareness.clientID]), - }; + }); }; const unsubscribe = this.sync.subscribeUpdate( awareness.doc.guid, diff --git a/packages/common/nbstore/src/frontend/blob.ts b/packages/common/nbstore/src/frontend/blob.ts index 40af3f7773..7bfc76ab45 100644 --- a/packages/common/nbstore/src/frontend/blob.ts +++ b/packages/common/nbstore/src/frontend/blob.ts @@ -17,7 +17,7 @@ export class BlobFrontend { return this.sync ? this.sync.uploadBlob(blob) : this.storage.set(blob); } - addPriority(id: string, priority: number) { - return this.sync?.addPriority(id, priority); + addPriority(_id: string, _priority: number) { + // not support yet } } diff --git a/packages/common/nbstore/src/frontend/doc.ts b/packages/common/nbstore/src/frontend/doc.ts index c95aee1c18..9cbfc301b7 100644 --- a/packages/common/nbstore/src/frontend/doc.ts +++ b/packages/common/nbstore/src/frontend/doc.ts @@ -37,7 +37,7 @@ interface DocFrontendOptions { } export class DocFrontend { - private readonly uniqueId = `frontend:${this.storage.peer}:${nanoid()}`; + private readonly uniqueId = `frontend:${nanoid()}`; private readonly prioritySettings = new Map(); @@ -88,7 +88,6 @@ export class DocFrontend { }), ]); - // eslint-disable-next-line no-constant-condition while (true) { throwIfAborted(signal); const docId = await this.status.jobDocQueue.asyncPop(signal); diff --git a/packages/common/nbstore/src/impls/broadcast-channel/awareness.ts b/packages/common/nbstore/src/impls/broadcast-channel/awareness.ts index 837dc04784..b4ec7a67de 100644 --- a/packages/common/nbstore/src/impls/broadcast-channel/awareness.ts +++ b/packages/common/nbstore/src/impls/broadcast-channel/awareness.ts @@ -1,9 +1,6 @@ import { nanoid } from 'nanoid'; -import { - type AwarenessRecord, - AwarenessStorage, -} from '../../storage/awareness'; +import { type AwarenessRecord, AwarenessStorageBase } from '../../storage'; import { BroadcastChannelConnection } from './channel'; type ChannelMessage = @@ -19,13 +16,13 @@ type ChannelMessage = collectId: string; } | { - type: 'awareness-collect-fallback'; + type: 'awareness-collect-feedback'; docId: string; bin: Uint8Array; collectId: string; }; -export class BroadcastChannelAwarenessStorage extends AwarenessStorage { +export class BroadcastChannelAwarenessStorage extends AwarenessStorageBase { override readonly storageType = 'awareness'; override readonly connection = new BroadcastChannelConnection(this.options); get channel() { @@ -36,7 +33,7 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage { string, Set<{ onUpdate: (update: AwarenessRecord, origin?: string) => void; - onCollect: () => AwarenessRecord; + onCollect: () => Promise; }> >(); @@ -57,12 +54,20 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage { override subscribeUpdate( id: string, onUpdate: (update: AwarenessRecord, origin?: string) => void, - onCollect: () => AwarenessRecord + onCollect: () => Promise ): () => void { const subscribers = this.subscriptions.get(id) ?? new Set(); subscribers.forEach(subscriber => { - const fallback = subscriber.onCollect(); - onUpdate(fallback); + subscriber + .onCollect() + .then(awareness => { + if (awareness) { + onUpdate(awareness); + } + }) + .catch(error => { + console.error('error in on collect awareness', error); + }); }); const collectUniqueId = nanoid(); @@ -84,18 +89,23 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage { message.data.type === 'awareness-collect' && message.data.docId === id ) { - const fallback = onCollect(); - if (fallback) { - this.channel.postMessage({ - type: 'awareness-collect-fallback', - docId: message.data.docId, - bin: fallback.bin, - collectId: collectUniqueId, - } satisfies ChannelMessage); - } + onCollect() + .then(awareness => { + if (awareness) { + this.channel.postMessage({ + type: 'awareness-collect-feedback', + docId: message.data.docId, + bin: awareness.bin, + collectId: collectUniqueId, + } satisfies ChannelMessage); + } + }) + .catch(error => { + console.error('error in on collect awareness', error); + }); } if ( - message.data.type === 'awareness-collect-fallback' && + message.data.type === 'awareness-collect-feedback' && message.data.docId === id && message.data.collectId === collectUniqueId ) { diff --git a/packages/common/nbstore/src/impls/broadcast-channel/channel.ts b/packages/common/nbstore/src/impls/broadcast-channel/channel.ts index cd40bd7f21..fae9fbb750 100644 --- a/packages/common/nbstore/src/impls/broadcast-channel/channel.ts +++ b/packages/common/nbstore/src/impls/broadcast-channel/channel.ts @@ -1,7 +1,7 @@ -import { Connection } from '../../connection'; +import { AutoReconnectConnection } from '../../connection'; import type { StorageOptions } from '../../storage'; -export class BroadcastChannelConnection extends Connection { +export class BroadcastChannelConnection extends AutoReconnectConnection { readonly channelName = `channel:${this.opts.peer}:${this.opts.type}:${this.opts.id}`; constructor(private readonly opts: StorageOptions) { diff --git a/packages/common/nbstore/src/impls/cloud/awareness.ts b/packages/common/nbstore/src/impls/cloud/awareness.ts index be9a56ef45..5ee8ccd0e9 100644 --- a/packages/common/nbstore/src/impls/cloud/awareness.ts +++ b/packages/common/nbstore/src/impls/cloud/awareness.ts @@ -3,7 +3,7 @@ import type { SocketOptions } from 'socket.io-client'; import { share } from '../../connection'; import { type AwarenessRecord, - AwarenessStorage, + AwarenessStorageBase, type AwarenessStorageOptions, } from '../../storage/awareness'; import { @@ -16,7 +16,7 @@ interface CloudAwarenessStorageOptions extends AwarenessStorageOptions { socketOptions: SocketOptions; } -export class CloudAwarenessStorage extends AwarenessStorage { +export class CloudAwarenessStorage extends AwarenessStorageBase { connection = share( new SocketConnection(this.peer, this.options.socketOptions) ); @@ -38,7 +38,7 @@ export class CloudAwarenessStorage extends AwarenessStorage void, - onCollect: () => AwarenessRecord + onCollect: () => Promise ): () => void { // TODO: handle disconnect // leave awareness @@ -92,14 +92,16 @@ export class CloudAwarenessStorage extends AwarenessStorage { - const record = onCollect(); - const encodedUpdate = await uint8ArrayToBase64(record.bin); - this.socket.emit('space:update-awareness', { - spaceType: this.spaceType, - spaceId: this.spaceId, - docId: record.docId, - awarenessUpdate: encodedUpdate, - }); + const record = await onCollect(); + if (record) { + const encodedUpdate = await uint8ArrayToBase64(record.bin); + this.socket.emit('space:update-awareness', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId: record.docId, + awarenessUpdate: encodedUpdate, + }); + } })().catch(err => console.error('awareness upload failed', err)); } }; diff --git a/packages/common/nbstore/src/impls/cloud/blob.ts b/packages/common/nbstore/src/impls/cloud/blob.ts index 91cf113b9c..2dac5f6ed9 100644 --- a/packages/common/nbstore/src/impls/cloud/blob.ts +++ b/packages/common/nbstore/src/impls/cloud/blob.ts @@ -7,16 +7,31 @@ import { } from '@affine/graphql'; import { DummyConnection } from '../../connection'; -import { type BlobRecord, BlobStorage } from '../../storage'; +import { + type BlobRecord, + BlobStorageBase, + type BlobStorageOptions, +} from '../../storage'; -export class CloudBlobStorage extends BlobStorage { - private readonly gql = gqlFetcherFactory(this.options.peer + '/graphql'); +interface CloudBlobStorageOptions extends BlobStorageOptions { + apiBaseUrl: string; +} + +export class CloudBlobStorage extends BlobStorageBase { + private readonly gql = gqlFetcherFactory( + this.options.apiBaseUrl + '/graphql' + ); override connection = new DummyConnection(); override async get(key: string) { const res = await fetch( - this.options.peer + '/api/workspaces/' + this.spaceId + '/blobs/' + key, + this.options.apiBaseUrl + + '/api/workspaces/' + + this.spaceId + + '/blobs/' + + key, { + cache: 'default', headers: { 'x-affine-version': BUILD_CONFIG.appVersion, }, diff --git a/packages/common/nbstore/src/impls/cloud/doc.ts b/packages/common/nbstore/src/impls/cloud/doc.ts index 7be0d02cb2..757b93351c 100644 --- a/packages/common/nbstore/src/impls/cloud/doc.ts +++ b/packages/common/nbstore/src/impls/cloud/doc.ts @@ -1,10 +1,14 @@ -import type { SocketOptions } from 'socket.io-client'; +import type { Socket, SocketOptions } from 'socket.io-client'; -import { share } from '../../connection'; +import { + type Connection, + type ConnectionStatus, + share, +} from '../../connection'; import { type DocClock, type DocClocks, - DocStorage, + DocStorageBase, type DocStorageOptions, type DocUpdate, } from '../../storage'; @@ -17,63 +21,14 @@ import { interface CloudDocStorageOptions extends DocStorageOptions { socketOptions: SocketOptions; + serverBaseUrl: string; } -export class CloudDocStorage extends DocStorage { - connection = share( - new SocketConnection(this.peer, this.options.socketOptions) - ); - - private disposeConnectionStatusListener?: () => void; - - private get socket() { +export class CloudDocStorage extends DocStorageBase { + get socket() { return this.connection.inner; } - override connect() { - if (!this.disposeConnectionStatusListener) { - this.disposeConnectionStatusListener = this.connection.onStatusChanged( - status => { - if (status === 'connected') { - this.join().catch(err => { - console.error('doc storage join failed', err); - }); - this.socket.on('space:broadcast-doc-update', this.onServerUpdate); - } - } - ); - } - super.connect(); - } - - override disconnect() { - if (this.disposeConnectionStatusListener) { - this.disposeConnectionStatusListener(); - } - this.socket.emit('space:leave', { - spaceType: this.spaceType, - spaceId: this.spaceId, - }); - this.socket.off('space:broadcast-doc-update', this.onServerUpdate); - super.disconnect(); - } - - async join() { - try { - const res = await this.socket.emitWithAck('space:join', { - spaceType: this.spaceType, - spaceId: this.spaceId, - clientVersion: BUILD_CONFIG.appVersion, - }); - - if ('error' in res) { - this.connection.setStatus('closed', new Error(res.error.message)); - } - } catch (e) { - this.connection.setStatus('error', e as Error); - } - } - onServerUpdate: ServerEventsMap['space:broadcast-doc-update'] = message => { if ( this.spaceType === message.spaceType && @@ -88,6 +43,11 @@ export class CloudDocStorage extends DocStorage { } }; + readonly connection = new CloudDocStorageConnection( + this.options, + this.onServerUpdate + ); + override async getDocSnapshot(docId: string) { const response = await this.socket.emitWithAck('space:load-doc', { spaceType: this.spaceType, @@ -207,3 +167,84 @@ export class CloudDocStorage extends DocStorage { return 0; } } + +class CloudDocStorageConnection implements Connection { + connection = share( + new SocketConnection( + `${this.options.serverBaseUrl}/`, + this.options.socketOptions + ) + ); + + private disposeConnectionStatusListener?: () => void; + + private get socket() { + return this.connection.inner; + } + + constructor( + private readonly options: CloudDocStorageOptions, + private readonly onServerUpdate: ServerEventsMap['space:broadcast-doc-update'] + ) {} + + get status() { + return this.connection.status; + } + + get inner() { + return this.connection.inner; + } + + connect(): void { + if (!this.disposeConnectionStatusListener) { + this.disposeConnectionStatusListener = this.connection.onStatusChanged( + status => { + if (status === 'connected') { + this.join().catch(err => { + console.error('doc storage join failed', err); + }); + this.socket.on('space:broadcast-doc-update', this.onServerUpdate); + } + } + ); + } + return this.connection.connect(); + } + + async join() { + try { + const res = await this.socket.emitWithAck('space:join', { + spaceType: this.options.type, + spaceId: this.options.id, + clientVersion: BUILD_CONFIG.appVersion, + }); + + if ('error' in res) { + this.connection.setStatus('closed', new Error(res.error.message)); + } + } catch (e) { + this.connection.setStatus('error', e as Error); + } + } + + disconnect() { + if (this.disposeConnectionStatusListener) { + this.disposeConnectionStatusListener(); + } + this.socket.emit('space:leave', { + spaceType: this.options.type, + spaceId: this.options.id, + }); + this.socket.off('space:broadcast-doc-update', this.onServerUpdate); + this.connection.disconnect(); + } + + waitForConnected(signal?: AbortSignal): Promise { + return this.connection.waitForConnected(signal); + } + onStatusChanged( + cb: (status: ConnectionStatus, error?: Error) => void + ): () => void { + return this.connection.onStatusChanged(cb); + } +} diff --git a/packages/common/nbstore/src/impls/cloud/socket.ts b/packages/common/nbstore/src/impls/cloud/socket.ts index 79d31057ce..558afb56f2 100644 --- a/packages/common/nbstore/src/impls/cloud/socket.ts +++ b/packages/common/nbstore/src/impls/cloud/socket.ts @@ -4,7 +4,10 @@ import { type SocketOptions, } from 'socket.io-client'; -import { Connection, type ConnectionStatus } from '../../connection'; +import { + AutoReconnectConnection, + type ConnectionStatus, +} from '../../connection'; // TODO(@forehalo): use [UserFriendlyError] interface EventError { @@ -150,7 +153,7 @@ export function base64ToUint8Array(base64: string) { return new Uint8Array(binaryArray); } -export class SocketConnection extends Connection { +export class SocketConnection extends AutoReconnectConnection { manager = new SocketIOManager(this.endpoint, { autoConnect: false, transports: ['websocket'], diff --git a/packages/common/nbstore/src/impls/idb/blob.ts b/packages/common/nbstore/src/impls/idb/blob.ts index 02c67267d5..0ea7fc7821 100644 --- a/packages/common/nbstore/src/impls/idb/blob.ts +++ b/packages/common/nbstore/src/impls/idb/blob.ts @@ -1,12 +1,12 @@ import { share } from '../../connection'; import { type BlobRecord, - BlobStorage, + BlobStorageBase, type ListedBlobRecord, } from '../../storage'; import { IDBConnection } from './db'; -export class IndexedDBBlobStorage extends BlobStorage { +export class IndexedDBBlobStorage extends BlobStorageBase { readonly connection = share(new IDBConnection(this.options)); get db() { diff --git a/packages/common/nbstore/src/impls/idb/db.ts b/packages/common/nbstore/src/impls/idb/db.ts index 52d3d088c8..c7ba4282aa 100644 --- a/packages/common/nbstore/src/impls/idb/db.ts +++ b/packages/common/nbstore/src/impls/idb/db.ts @@ -1,10 +1,10 @@ import { type IDBPDatabase, openDB } from 'idb'; -import { Connection } from '../../connection'; +import { AutoReconnectConnection } from '../../connection'; import type { StorageOptions } from '../../storage'; import { type DocStorageSchema, migrator } from './schema'; -export class IDBConnection extends Connection<{ +export class IDBConnection extends AutoReconnectConnection<{ db: IDBPDatabase; channel: BroadcastChannel; }> { diff --git a/packages/common/nbstore/src/impls/idb/doc.ts b/packages/common/nbstore/src/impls/idb/doc.ts index b188b5b84b..086b4ca29a 100644 --- a/packages/common/nbstore/src/impls/idb/doc.ts +++ b/packages/common/nbstore/src/impls/idb/doc.ts @@ -2,7 +2,7 @@ import { type DocClock, type DocClocks, type DocRecord, - DocStorage, + DocStorageBase, type DocStorageOptions, type DocUpdate, } from '../../storage'; @@ -15,7 +15,7 @@ interface ChannelMessage { origin?: string; } -export class IndexedDBDocStorage extends DocStorage { +export class IndexedDBDocStorage extends DocStorageBase { readonly connection = new IDBConnection(this.options); get db() { diff --git a/packages/common/nbstore/src/impls/idb/sync.ts b/packages/common/nbstore/src/impls/idb/sync.ts index b359a1554d..77c9568bdc 100644 --- a/packages/common/nbstore/src/impls/idb/sync.ts +++ b/packages/common/nbstore/src/impls/idb/sync.ts @@ -1,7 +1,7 @@ import { share } from '../../connection'; -import { type DocClock, type DocClocks, SyncStorage } from '../../storage'; +import { BasicSyncStorage, type DocClock, type DocClocks } from '../../storage'; import { IDBConnection } from './db'; -export class IndexedDBSyncStorage extends SyncStorage { +export class IndexedDBSyncStorage extends BasicSyncStorage { readonly connection = share(new IDBConnection(this.options)); get db() { diff --git a/packages/common/nbstore/src/impls/idb/v1/blob.ts b/packages/common/nbstore/src/impls/idb/v1/blob.ts index fcd370f62b..508bb851e6 100644 --- a/packages/common/nbstore/src/impls/idb/v1/blob.ts +++ b/packages/common/nbstore/src/impls/idb/v1/blob.ts @@ -1,11 +1,11 @@ import { share } from '../../../connection'; -import { BlobStorage, type ListedBlobRecord } from '../../../storage'; +import { BlobStorageBase, type ListedBlobRecord } from '../../../storage'; import { BlobIDBConnection } from './db'; /** * @deprecated readonly */ -export class IndexedDBV1BlobStorage extends BlobStorage { +export class IndexedDBV1BlobStorage extends BlobStorageBase { readonly connection = share(new BlobIDBConnection(this.spaceId)); get db() { diff --git a/packages/common/nbstore/src/impls/idb/v1/db.ts b/packages/common/nbstore/src/impls/idb/v1/db.ts index b12dbf6d04..b0934fd21d 100644 --- a/packages/common/nbstore/src/impls/idb/v1/db.ts +++ b/packages/common/nbstore/src/impls/idb/v1/db.ts @@ -1,6 +1,6 @@ import { type DBSchema, type IDBPDatabase, openDB } from 'idb'; -import { Connection } from '../../../connection'; +import { AutoReconnectConnection } from '../../../connection'; export interface DocDBSchema extends DBSchema { workspace: { @@ -15,7 +15,9 @@ export interface DocDBSchema extends DBSchema { }; } -export class DocIDBConnection extends Connection> { +export class DocIDBConnection extends AutoReconnectConnection< + IDBPDatabase +> { override get shareId() { return 'idb(old):affine-local'; } @@ -40,7 +42,9 @@ export interface BlobDBSchema extends DBSchema { }; } -export class BlobIDBConnection extends Connection> { +export class BlobIDBConnection extends AutoReconnectConnection< + IDBPDatabase +> { constructor(private readonly workspaceId: string) { super(); } diff --git a/packages/common/nbstore/src/impls/idb/v1/doc.ts b/packages/common/nbstore/src/impls/idb/v1/doc.ts index 7dc538830b..cd21db5527 100644 --- a/packages/common/nbstore/src/impls/idb/v1/doc.ts +++ b/packages/common/nbstore/src/impls/idb/v1/doc.ts @@ -1,11 +1,15 @@ import { share } from '../../../connection'; -import { type DocRecord, DocStorage, type DocUpdate } from '../../../storage'; +import { + type DocRecord, + DocStorageBase, + type DocUpdate, +} from '../../../storage'; import { DocIDBConnection } from './db'; /** * @deprecated readonly */ -export class IndexedDBV1DocStorage extends DocStorage { +export class IndexedDBV1DocStorage extends DocStorageBase { readonly connection = share(new DocIDBConnection()); get db() { diff --git a/packages/common/nbstore/src/impls/index.ts b/packages/common/nbstore/src/impls/index.ts index bff2874ad4..e43dd7df34 100644 --- a/packages/common/nbstore/src/impls/index.ts +++ b/packages/common/nbstore/src/impls/index.ts @@ -1,5 +1,10 @@ import type { Storage } from '../storage'; -import { CloudBlobStorage, CloudDocStorage } from './cloud'; +import { BroadcastChannelAwarenessStorage } from './broadcast-channel/awareness'; +import { + CloudAwarenessStorage, + CloudBlobStorage, + CloudDocStorage, +} from './cloud'; import { IndexedDBBlobStorage, IndexedDBDocStorage, @@ -13,6 +18,7 @@ const idb: StorageConstructor[] = [ IndexedDBDocStorage, IndexedDBBlobStorage, IndexedDBSyncStorage, + BroadcastChannelAwarenessStorage, ]; const idbv1: StorageConstructor[] = [ @@ -20,7 +26,11 @@ const idbv1: StorageConstructor[] = [ IndexedDBV1BlobStorage, ]; -const cloud: StorageConstructor[] = [CloudDocStorage, CloudBlobStorage]; +const cloud: StorageConstructor[] = [ + CloudDocStorage, + CloudBlobStorage, + CloudAwarenessStorage, +]; export const storages: StorageConstructor[] = cloud.concat(idbv1, idb); diff --git a/packages/common/nbstore/src/impls/sqlite/blob.ts b/packages/common/nbstore/src/impls/sqlite/blob.ts index 803f433fa5..40f9f44cc0 100644 --- a/packages/common/nbstore/src/impls/sqlite/blob.ts +++ b/packages/common/nbstore/src/impls/sqlite/blob.ts @@ -1,8 +1,8 @@ import { share } from '../../connection'; -import { type BlobRecord, BlobStorage } from '../../storage'; +import { type BlobRecord, BlobStorageBase } from '../../storage'; import { NativeDBConnection } from './db'; -export class SqliteBlobStorage extends BlobStorage { +export class SqliteBlobStorage extends BlobStorageBase { override connection = share( new NativeDBConnection(this.peer, this.spaceType, this.spaceId) ); diff --git a/packages/common/nbstore/src/impls/sqlite/db.ts b/packages/common/nbstore/src/impls/sqlite/db.ts index e7f94a1e89..861d41ed12 100644 --- a/packages/common/nbstore/src/impls/sqlite/db.ts +++ b/packages/common/nbstore/src/impls/sqlite/db.ts @@ -1,6 +1,6 @@ import { apis } from '@affine/electron-api'; -import { Connection } from '../../connection'; +import { AutoReconnectConnection } from '../../connection'; import { type SpaceType, universalId } from '../../storage'; type NativeDBApis = NonNullable['nbstore'] extends infer APIs @@ -13,7 +13,7 @@ type NativeDBApis = NonNullable['nbstore'] extends infer APIs } : never; -export class NativeDBConnection extends Connection { +export class NativeDBConnection extends AutoReconnectConnection { readonly apis: NativeDBApis; constructor( diff --git a/packages/common/nbstore/src/impls/sqlite/doc.ts b/packages/common/nbstore/src/impls/sqlite/doc.ts index 3147130e63..1c2bd4f0df 100644 --- a/packages/common/nbstore/src/impls/sqlite/doc.ts +++ b/packages/common/nbstore/src/impls/sqlite/doc.ts @@ -1,8 +1,8 @@ import { share } from '../../connection'; -import { type DocClock, DocStorage, type DocUpdate } from '../../storage'; +import { type DocClock, DocStorageBase, type DocUpdate } from '../../storage'; import { NativeDBConnection } from './db'; -export class SqliteDocStorage extends DocStorage { +export class SqliteDocStorage extends DocStorageBase { override connection = share( new NativeDBConnection(this.peer, this.spaceType, this.spaceId) ); diff --git a/packages/common/nbstore/src/impls/sqlite/sync.ts b/packages/common/nbstore/src/impls/sqlite/sync.ts index 26da3f6377..6344fdb526 100644 --- a/packages/common/nbstore/src/impls/sqlite/sync.ts +++ b/packages/common/nbstore/src/impls/sqlite/sync.ts @@ -1,8 +1,8 @@ import { share } from '../../connection'; -import { type DocClock, SyncStorage } from '../../storage'; +import { BasicSyncStorage, type DocClock } from '../../storage'; import { NativeDBConnection } from './db'; -export class SqliteSyncStorage extends SyncStorage { +export class SqliteSyncStorage extends BasicSyncStorage { override connection = share( new NativeDBConnection(this.peer, this.spaceType, this.spaceId) ); diff --git a/packages/common/nbstore/src/impls/sqlite/v1/blob.ts b/packages/common/nbstore/src/impls/sqlite/v1/blob.ts index d01ab58198..7f3de15bb5 100644 --- a/packages/common/nbstore/src/impls/sqlite/v1/blob.ts +++ b/packages/common/nbstore/src/impls/sqlite/v1/blob.ts @@ -1,13 +1,13 @@ import { apis } from '@affine/electron-api'; -import { DummyConnection, share } from '../../../connection'; -import { BlobStorage } from '../../../storage'; +import { DummyConnection } from '../../../connection'; +import { BlobStorageBase } from '../../../storage'; /** * @deprecated readonly */ -export class SqliteV1BlobStorage extends BlobStorage { - override connection = share(new DummyConnection()); +export class SqliteV1BlobStorage extends BlobStorageBase { + override connection = new DummyConnection(); get db() { if (!apis) { diff --git a/packages/common/nbstore/src/impls/sqlite/v1/doc.ts b/packages/common/nbstore/src/impls/sqlite/v1/doc.ts index 085a76ce41..bcf108cbb9 100644 --- a/packages/common/nbstore/src/impls/sqlite/v1/doc.ts +++ b/packages/common/nbstore/src/impls/sqlite/v1/doc.ts @@ -1,13 +1,17 @@ import { apis } from '@affine/electron-api'; -import { DummyConnection, share } from '../../../connection'; -import { type DocRecord, DocStorage, type DocUpdate } from '../../../storage'; +import { DummyConnection } from '../../../connection'; +import { + type DocRecord, + DocStorageBase, + type DocUpdate, +} from '../../../storage'; /** * @deprecated readonly */ -export class SqliteV1DocStorage extends DocStorage { - override connection = share(new DummyConnection()); +export class SqliteV1DocStorage extends DocStorageBase { + override connection = new DummyConnection(); get db() { if (!apis) { diff --git a/packages/common/nbstore/src/op/consumer.ts b/packages/common/nbstore/src/op/consumer.ts deleted file mode 100644 index 812af778ed..0000000000 --- a/packages/common/nbstore/src/op/consumer.ts +++ /dev/null @@ -1,128 +0,0 @@ -import type { OpConsumer } from '@toeverything/infra/op'; -import { Observable } from 'rxjs'; - -import { getAvailableStorageImplementations } from '../impls'; -import { - BlobStorage, - DocStorage, - HistoricalDocStorage, - SpaceStorage, - type Storage, - type StorageOptions, - SyncStorage, -} from '../storage'; -import type { SpaceStorageOps } from './ops'; - -export class SpaceStorageConsumer extends SpaceStorage { - constructor(private readonly consumer: OpConsumer) { - super([]); - this.registerConnectionHandlers(); - this.listen(); - } - - listen() { - this.consumer.listen(); - } - - add(name: string, options: StorageOptions) { - const Storage = getAvailableStorageImplementations(name); - const storage = new Storage(options); - this.storages.set(storage.storageType, storage); - this.registerStorageHandlers(storage); - } - - override async destroy() { - await super.destroy(); - this.consumer.destroy(); - } - - private registerConnectionHandlers() { - this.consumer.register('addStorage', ({ name, opts }) => { - this.add(name, opts); - }); - this.consumer.register('connect', this.connect.bind(this)); - this.consumer.register('disconnect', this.disconnect.bind(this)); - this.consumer.register('destroy', this.destroy.bind(this)); - } - - private registerStorageHandlers(storage: Storage) { - if (storage instanceof DocStorage) { - this.registerDocHandlers(storage); - } else if (storage instanceof BlobStorage) { - this.registerBlobHandlers(storage); - } else if (storage instanceof SyncStorage) { - this.registerSyncHandlers(storage); - } - } - - private registerDocHandlers(storage: DocStorage) { - this.consumer.register('getDoc', storage.getDoc.bind(storage)); - this.consumer.register('getDocDiff', ({ docId, state }) => { - return storage.getDocDiff(docId, state); - }); - this.consumer.register('pushDocUpdate', ({ update, origin }) => { - return storage.pushDocUpdate(update, origin); - }); - this.consumer.register( - 'getDocTimestamps', - storage.getDocTimestamps.bind(storage) - ); - this.consumer.register('deleteDoc', storage.deleteDoc.bind(storage)); - this.consumer.register('subscribeDocUpdate', () => { - return new Observable(subscriber => { - subscriber.add( - storage.subscribeDocUpdate((update, origin) => { - subscriber.next({ update, origin }); - }) - ); - }); - }); - - if (storage instanceof HistoricalDocStorage) { - this.consumer.register('listHistory', ({ docId, filter }) => { - return storage.listHistories(docId, filter); - }); - this.consumer.register('getHistory', ({ docId, timestamp }) => { - return storage.getHistory(docId, timestamp); - }); - this.consumer.register('deleteHistory', ({ docId, timestamp }) => { - return storage.deleteHistory(docId, timestamp); - }); - this.consumer.register('rollbackDoc', ({ docId, timestamp }) => { - return storage.rollbackDoc(docId, timestamp); - }); - } - } - - private registerBlobHandlers(storage: BlobStorage) { - this.consumer.register('getBlob', key => { - return storage.get(key); - }); - this.consumer.register('setBlob', blob => { - return storage.set(blob); - }); - this.consumer.register('deleteBlob', ({ key, permanently }) => { - return storage.delete(key, permanently); - }); - this.consumer.register('listBlobs', storage.list.bind(storage)); - this.consumer.register('releaseBlobs', storage.release.bind(storage)); - } - - private registerSyncHandlers(storage: SyncStorage) { - this.consumer.register( - 'getPeerClocks', - storage.getPeerRemoteClocks.bind(storage) - ); - this.consumer.register('setPeerClock', ({ peer, ...clock }) => { - return storage.setPeerRemoteClock(peer, clock); - }); - this.consumer.register( - 'getPeerPushedClocks', - storage.getPeerPushedClocks.bind(storage) - ); - this.consumer.register('setPeerPushedClock', ({ peer, ...clock }) => { - return storage.setPeerPushedClock(peer, clock); - }); - this.consumer.register('clearClocks', storage.clearClocks.bind(storage)); - } -} diff --git a/packages/common/nbstore/src/op/index.ts b/packages/common/nbstore/src/op/index.ts deleted file mode 100644 index f07cbee357..0000000000 --- a/packages/common/nbstore/src/op/index.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { OpClient } from '@toeverything/infra/op'; - -import type { Storage } from '../storage'; -import type { SpaceStorageOps } from './ops'; - -export { SpaceStorageConsumer } from './consumer'; - -export class SpaceStorageClient extends OpClient { - /** - * Adding a storage implementation to the backend. - * - * NOTE: - * Because the storage beckend might be put behind a worker, we cant pass the instance but only - * the constructor name and its options to let the backend construct the instance. - */ - async addStorage Storage>( - Impl: T, - ...opts: ConstructorParameters - ) { - await this.call('addStorage', { name: Impl.name, opts: opts[0] }); - } - - async connect() { - await this.call('connect'); - } - - async disconnect() { - await this.call('disconnect'); - } - - override destroy() { - this.call('destroy').catch(console.error); - super.destroy(); - } - - connection$() { - return this.ob$('connection'); - } -} - -export class SpaceStorageWorkerClient extends SpaceStorageClient { - private readonly worker: Worker; - constructor() { - const worker = new Worker(new URL('./worker.ts', import.meta.url)); - super(worker); - this.worker = worker; - } - - override destroy() { - super.destroy(); - this.worker.terminate(); - } -} diff --git a/packages/common/nbstore/src/op/ops.ts b/packages/common/nbstore/src/op/ops.ts deleted file mode 100644 index 6509acc5f4..0000000000 --- a/packages/common/nbstore/src/op/ops.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { type OpSchema } from '@toeverything/infra/op'; - -import type { ConnectionStatus } from '../connection'; -import type { - BlobRecord, - DocClock, - DocClocks, - DocDiff, - DocRecord, - DocUpdate, - HistoryFilter, - ListedBlobRecord, - ListedHistory, - StorageOptions, - StorageType, -} from '../storage'; - -export interface SpaceStorageOps extends OpSchema { - // init - addStorage: [{ name: string; opts: StorageOptions }, void]; - - // connection - connect: [void, void]; - disconnect: [void, void]; - connection: [ - void, - { storage: StorageType; status: ConnectionStatus; error?: Error }, - ]; - destroy: [void, void]; - - // doc - getDoc: [string, DocRecord | null]; - getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null]; - pushDocUpdate: [{ update: DocUpdate; origin?: string }, DocClock]; - getDocTimestamps: [Date, DocClocks]; - deleteDoc: [string, void]; - subscribeDocUpdate: [void, { update: DocRecord; origin?: string }]; - - // history - listHistory: [{ docId: string; filter?: HistoryFilter }, ListedHistory[]]; - getHistory: [DocClock, DocRecord | null]; - deleteHistory: [DocClock, void]; - rollbackDoc: [DocClock & { editor?: string }, void]; - - // blob - getBlob: [string, BlobRecord | null]; - setBlob: [BlobRecord, void]; - deleteBlob: [{ key: string; permanently: boolean }, void]; - releaseBlobs: [void, void]; - listBlobs: [void, ListedBlobRecord[]]; - - // sync - getPeerClocks: [string, DocClocks]; - setPeerClock: [{ peer: string } & DocClock, void]; - getPeerPushedClocks: [string, DocClocks]; - setPeerPushedClock: [{ peer: string } & DocClock, void]; - clearClocks: [void, void]; -} diff --git a/packages/common/nbstore/src/op/worker.ts b/packages/common/nbstore/src/op/worker.ts deleted file mode 100644 index 62b85b2c5b..0000000000 --- a/packages/common/nbstore/src/op/worker.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { OpConsumer } from '@toeverything/infra/op'; - -import { SpaceStorageConsumer } from './consumer'; -import type { SpaceStorageOps } from './ops'; - -const consumer = new SpaceStorageConsumer( - // @ts-expect-error safe - new OpConsumer(self) -); - -consumer.listen(); diff --git a/packages/common/nbstore/src/storage/awareness.ts b/packages/common/nbstore/src/storage/awareness.ts index 5b47f3a450..489de1a0aa 100644 --- a/packages/common/nbstore/src/storage/awareness.ts +++ b/packages/common/nbstore/src/storage/awareness.ts @@ -1,4 +1,4 @@ -import { Storage, type StorageOptions } from './storage'; +import { type Storage, StorageBase, type StorageOptions } from './storage'; export interface AwarenessStorageOptions extends StorageOptions {} @@ -7,21 +7,35 @@ export type AwarenessRecord = { bin: Uint8Array; }; -export abstract class AwarenessStorage< - Options extends AwarenessStorageOptions = AwarenessStorageOptions, -> extends Storage { - override readonly storageType = 'awareness'; +export interface AwarenessStorage extends Storage { + readonly storageType: 'awareness'; /** * Update the awareness record. * * @param origin - Internal identifier to recognize the source in the "update" event. Will not be stored or transferred. */ + update(record: AwarenessRecord, origin?: string): Promise; + subscribeUpdate( + id: string, + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => Promise + ): () => void; +} + +export abstract class AwarenessStorageBase< + Options extends AwarenessStorageOptions = AwarenessStorageOptions, + > + extends StorageBase + implements AwarenessStorage +{ + override readonly storageType = 'awareness'; + abstract update(record: AwarenessRecord, origin?: string): Promise; abstract subscribeUpdate( id: string, onUpdate: (update: AwarenessRecord, origin?: string) => void, - onCollect: () => AwarenessRecord + onCollect: () => Promise ): () => void; } diff --git a/packages/common/nbstore/src/storage/blob.ts b/packages/common/nbstore/src/storage/blob.ts index 8625e4b5c2..4ad70517ee 100644 --- a/packages/common/nbstore/src/storage/blob.ts +++ b/packages/common/nbstore/src/storage/blob.ts @@ -1,4 +1,4 @@ -import { Storage, type StorageOptions } from './storage'; +import { type Storage, StorageBase, type StorageOptions } from './storage'; export interface BlobStorageOptions extends StorageOptions {} @@ -16,9 +16,25 @@ export interface ListedBlobRecord { createdAt?: Date; } -export abstract class BlobStorage< - Options extends BlobStorageOptions = BlobStorageOptions, -> extends Storage { +export interface BlobStorage extends Storage { + readonly storageType: 'blob'; + get(key: string, signal?: AbortSignal): Promise; + set(blob: BlobRecord, signal?: AbortSignal): Promise; + delete( + key: string, + permanently: boolean, + signal?: AbortSignal + ): Promise; + release(signal?: AbortSignal): Promise; + list(signal?: AbortSignal): Promise; +} + +export abstract class BlobStorageBase< + Options extends BlobStorageOptions = BlobStorageOptions, + > + extends StorageBase + implements BlobStorage +{ override readonly storageType = 'blob'; abstract get(key: string, signal?: AbortSignal): Promise; diff --git a/packages/common/nbstore/src/storage/doc.ts b/packages/common/nbstore/src/storage/doc.ts index 3f5bfabe07..9a2c5a0818 100644 --- a/packages/common/nbstore/src/storage/doc.ts +++ b/packages/common/nbstore/src/storage/doc.ts @@ -4,7 +4,7 @@ import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs'; import { isEmptyUpdate } from '../utils/is-empty-update'; import type { Locker } from './lock'; import { SingletonLocker } from './lock'; -import { Storage, type StorageOptions } from './storage'; +import { type Storage, StorageBase, type StorageOptions } from './storage'; export interface DocClock { docId: string; @@ -37,17 +37,67 @@ export interface DocStorageOptions extends StorageOptions { mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; } -export abstract class DocStorage< - Opts extends DocStorageOptions = DocStorageOptions, -> extends Storage { +export interface DocStorage extends Storage { + readonly storageType: 'doc'; + + /** + * Get a doc record with latest binary. + */ + getDoc(docId: string): Promise; + /** + * Get a yjs binary diff with the given state vector. + */ + getDocDiff(docId: string, state?: Uint8Array): Promise; + /** + * Push updates into storage + * + * @param origin - Internal identifier to recognize the source in the "update" event. Will not be stored or transferred. + */ + pushDocUpdate(update: DocUpdate, origin?: string): Promise; + + /** + * Get the timestamp of the latest update of a doc. + */ + getDocTimestamp(docId: string): Promise; + + /** + * Get all docs timestamps info. especially for useful in sync process. + */ + getDocTimestamps(after?: Date): Promise; + + /** + * Delete a specific doc data with all snapshots and updates + */ + deleteDoc(docId: string): Promise; + + /** + * Subscribe on doc updates emitted from storage itself. + * + * NOTE: + * + * There is not always update emitted from storage itself. + * + * For example, in Sqlite storage, the update will only come from user's updating on docs, + * in other words, the update will never somehow auto generated in storage internally. + * + * But for Cloud storage, there will be updates broadcasted from other clients, + * so the storage will emit updates to notify the client to integrate them. + */ + subscribeDocUpdate( + callback: (update: DocRecord, origin?: string) => void + ): () => void; +} + +export abstract class DocStorageBase< + Opts extends DocStorageOptions = DocStorageOptions, + > + extends StorageBase + implements DocStorage +{ private readonly event = new EventEmitter2(); override readonly storageType = 'doc'; protected readonly locker: Locker = new SingletonLocker(); - // REGION: open apis by Op system - /** - * Get a doc record with latest binary. - */ async getDoc(docId: string) { await using _lock = await this.lockDocForUpdate(docId); @@ -78,9 +128,6 @@ export abstract class DocStorage< return snapshot; } - /** - * Get a yjs binary diff with the given state vector. - */ async getDocDiff(docId: string, state?: Uint8Array) { const doc = await this.getDoc(docId); @@ -96,41 +143,14 @@ export abstract class DocStorage< }; } - /** - * Push updates into storage - * - * @param origin - Internal identifier to recognize the source in the "update" event. Will not be stored or transferred. - */ abstract pushDocUpdate(update: DocUpdate, origin?: string): Promise; - /** - * Get the timestamp of the latest update of a doc. - */ abstract getDocTimestamp(docId: string): Promise; - /** - * Get all docs timestamps info. especially for useful in sync process. - */ abstract getDocTimestamps(after?: Date): Promise; - /** - * Delete a specific doc data with all snapshots and updates - */ abstract deleteDoc(docId: string): Promise; - /** - * Subscribe on doc updates emitted from storage itself. - * - * NOTE: - * - * There is not always update emitted from storage itself. - * - * For example, in Sqlite storage, the update will only come from user's updating on docs, - * in other words, the update will never somehow auto generated in storage internally. - * - * But for Cloud storage, there will be updates broadcasted from other clients, - * so the storage will emit updates to notify the client to integrate them. - */ subscribeDocUpdate(callback: (update: DocRecord, origin?: string) => void) { this.event.on('update', callback); @@ -138,7 +158,6 @@ export abstract class DocStorage< this.event.off('update', callback); }; } - // ENDREGION // REGION: api for internal usage protected on( diff --git a/packages/common/nbstore/src/storage/history.ts b/packages/common/nbstore/src/storage/history.ts index f112b3c83b..5908b75f70 100644 --- a/packages/common/nbstore/src/storage/history.ts +++ b/packages/common/nbstore/src/storage/history.ts @@ -7,7 +7,7 @@ import { UndoManager, } from 'yjs'; -import { type DocRecord, DocStorage, type DocStorageOptions } from './doc'; +import { type DocRecord, DocStorageBase, type DocStorageOptions } from './doc'; export interface HistoryFilter { before?: Date; @@ -21,7 +21,7 @@ export interface ListedHistory { export abstract class HistoricalDocStorage< Options extends DocStorageOptions = DocStorageOptions, -> extends DocStorage { +> extends DocStorageBase { constructor(opts: Options) { super(opts); diff --git a/packages/common/nbstore/src/storage/index.ts b/packages/common/nbstore/src/storage/index.ts index 46da1f065c..eaea342bb9 100644 --- a/packages/common/nbstore/src/storage/index.ts +++ b/packages/common/nbstore/src/storage/index.ts @@ -40,20 +40,20 @@ export class SpaceStorage { connect() { Array.from(this.storages.values()).forEach(storage => { - storage.connect(); + storage.connection.connect(); }); } disconnect() { Array.from(this.storages.values()).forEach(storage => { - storage.disconnect(); + storage.connection.disconnect(); }); } - async waitForConnected() { + async waitForConnected(signal?: AbortSignal) { await Promise.all( Array.from(this.storages.values()).map(storage => - storage.waitForConnected() + storage.connection.waitForConnected(signal) ) ); } @@ -65,6 +65,7 @@ export class SpaceStorage { } } +export * from './awareness'; export * from './blob'; export * from './doc'; export * from './history'; diff --git a/packages/common/nbstore/src/storage/storage.ts b/packages/common/nbstore/src/storage/storage.ts index b1c3ac7e27..8cc5a7e87e 100644 --- a/packages/common/nbstore/src/storage/storage.ts +++ b/packages/common/nbstore/src/storage/storage.ts @@ -80,7 +80,18 @@ export function parseUniversalId(id: string) { return result as any; } -export abstract class Storage { +export interface Storage { + readonly storageType: StorageType; + readonly connection: Connection; + readonly peer: string; + readonly spaceType: string; + readonly spaceId: string; + readonly universalId: string; +} + +export abstract class StorageBase + implements Storage +{ abstract readonly storageType: StorageType; abstract readonly connection: Connection; @@ -101,16 +112,4 @@ export abstract class Storage { } constructor(public readonly options: Opts) {} - - connect() { - this.connection.connect(); - } - - disconnect() { - this.connection.disconnect(); - } - - async waitForConnected() { - await this.connection.waitForConnected(); - } } diff --git a/packages/common/nbstore/src/storage/sync.ts b/packages/common/nbstore/src/storage/sync.ts index cc10c3cf41..9edda9b630 100644 --- a/packages/common/nbstore/src/storage/sync.ts +++ b/packages/common/nbstore/src/storage/sync.ts @@ -1,11 +1,32 @@ import type { DocClock, DocClocks } from './doc'; -import { Storage, type StorageOptions } from './storage'; +import { type Storage, StorageBase, type StorageOptions } from './storage'; export interface SyncStorageOptions extends StorageOptions {} -export abstract class SyncStorage< - Opts extends SyncStorageOptions = SyncStorageOptions, -> extends Storage { +export interface SyncStorage extends Storage { + readonly storageType: 'sync'; + + getPeerRemoteClock(peer: string, docId: string): Promise; + getPeerRemoteClocks(peer: string): Promise; + setPeerRemoteClock(peer: string, clock: DocClock): Promise; + getPeerPulledRemoteClock( + peer: string, + docId: string + ): Promise; + getPeerPulledRemoteClocks(peer: string): Promise; + setPeerPulledRemoteClock(peer: string, clock: DocClock): Promise; + getPeerPushedClock(peer: string, docId: string): Promise; + getPeerPushedClocks(peer: string): Promise; + setPeerPushedClock(peer: string, clock: DocClock): Promise; + clearClocks(): Promise; +} + +export abstract class BasicSyncStorage< + Opts extends SyncStorageOptions = SyncStorageOptions, + > + extends StorageBase + implements SyncStorage +{ override readonly storageType = 'sync'; abstract getPeerRemoteClock( diff --git a/packages/common/nbstore/src/sync/awareness/index.ts b/packages/common/nbstore/src/sync/awareness/index.ts index cfdcbf9047..51971448f3 100644 --- a/packages/common/nbstore/src/sync/awareness/index.ts +++ b/packages/common/nbstore/src/sync/awareness/index.ts @@ -3,7 +3,16 @@ import type { AwarenessStorage, } from '../../storage/awareness'; -export class AwarenessSync { +export interface AwarenessSync { + update(record: AwarenessRecord, origin?: string): Promise; + subscribeUpdate( + id: string, + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => Promise + ): () => void; +} + +export class AwarenessSyncImpl implements AwarenessSync { constructor( readonly local: AwarenessStorage, readonly remotes: AwarenessStorage[] @@ -18,7 +27,7 @@ export class AwarenessSync { subscribeUpdate( id: string, onUpdate: (update: AwarenessRecord, origin?: string) => void, - onCollect: () => AwarenessRecord + onCollect: () => Promise ): () => void { const unsubscribes = [this.local, ...this.remotes].map(peer => peer.subscribeUpdate(id, onUpdate, onCollect) diff --git a/packages/common/nbstore/src/sync/blob/index.ts b/packages/common/nbstore/src/sync/blob/index.ts index 7a337b5bef..8cafa4816b 100644 --- a/packages/common/nbstore/src/sync/blob/index.ts +++ b/packages/common/nbstore/src/sync/blob/index.ts @@ -3,7 +3,15 @@ import { difference } from 'lodash-es'; import type { BlobRecord, BlobStorage } from '../../storage'; import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; -export class BlobSync { +export interface BlobSync { + downloadBlob( + blobId: string, + signal?: AbortSignal + ): Promise; + uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise; +} + +export class BlobSyncImpl implements BlobSync { private abort: AbortController | null = null; constructor( diff --git a/packages/common/nbstore/src/sync/doc/index.ts b/packages/common/nbstore/src/sync/doc/index.ts index 0728487bda..e5465051f9 100644 --- a/packages/common/nbstore/src/sync/doc/index.ts +++ b/packages/common/nbstore/src/sync/doc/index.ts @@ -17,7 +17,13 @@ export interface DocSyncDocState { errorMessage: string | null; } -export class DocSync { +export interface DocSync { + readonly state$: Observable; + docState$(docId: string): Observable; + addPriority(id: string, priority: number): () => void; +} + +export class DocSyncImpl implements DocSync { private readonly peers: DocSyncPeer[] = this.remotes.map( remote => new DocSyncPeer(this.local, this.sync, remote) ); diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts index b11d65cdb6..2ffff7299a 100644 --- a/packages/common/nbstore/src/sync/doc/peer.ts +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -92,7 +92,7 @@ export class DocSyncPeer { /** * random unique id for recognize self in "update" event */ - private readonly uniqueId = `sync:${this.local.peer}:${this.remote.peer}:${nanoid()}`; + private readonly uniqueId = `sync:${this.local.universalId}:${this.remote.universalId}:${nanoid()}`; private readonly prioritySettings = new Map(); constructor( @@ -435,7 +435,6 @@ export class DocSyncPeer { }; async mainLoop(signal?: AbortSignal) { - // eslint-disable-next-line no-constant-condition while (true) { try { await this.retryLoop(signal); @@ -594,12 +593,12 @@ export class DocSyncPeer { } // begin to process jobs - // eslint-disable-next-line no-constant-condition + while (true) { throwIfAborted(signal); const docId = await this.status.jobDocQueue.asyncPop(signal); - // eslint-disable-next-line no-constant-condition + while (true) { // batch process jobs for the same doc const jobs = this.status.jobMap.get(docId); diff --git a/packages/common/nbstore/src/sync/index.ts b/packages/common/nbstore/src/sync/index.ts index 00c76dbb52..d787f1ff98 100644 --- a/packages/common/nbstore/src/sync/index.ts +++ b/packages/common/nbstore/src/sync/index.ts @@ -1,19 +1,23 @@ import { combineLatest, map, type Observable, of } from 'rxjs'; -import type { BlobStorage, DocStorage, SpaceStorage } from '../storage'; -import type { AwarenessStorage } from '../storage/awareness'; -import { AwarenessSync } from './awareness'; -import { BlobSync } from './blob'; -import { DocSync, type DocSyncState } from './doc'; +import type { + AwarenessStorage, + BlobStorage, + DocStorage, + SpaceStorage, +} from '../storage'; +import { AwarenessSyncImpl } from './awareness'; +import { BlobSyncImpl } from './blob'; +import { DocSyncImpl, type DocSyncState } from './doc'; export interface SyncState { doc?: DocSyncState; } export class Sync { - readonly doc: DocSync | null; - readonly blob: BlobSync | null; - readonly awareness: AwarenessSync | null; + readonly doc: DocSyncImpl | null; + readonly blob: BlobSyncImpl | null; + readonly awareness: AwarenessSyncImpl | null; readonly state$: Observable; @@ -28,7 +32,7 @@ export class Sync { this.doc = doc && sync - ? new DocSync( + ? new DocSyncImpl( doc, sync, peers @@ -37,7 +41,7 @@ export class Sync { ) : null; this.blob = blob - ? new BlobSync( + ? new BlobSyncImpl( blob, peers .map(peer => peer.tryGet('blob')) @@ -45,7 +49,7 @@ export class Sync { ) : null; this.awareness = awareness - ? new AwarenessSync( + ? new AwarenessSyncImpl( awareness, peers .map(peer => peer.tryGet('awareness')) diff --git a/packages/common/nbstore/src/worker/client.ts b/packages/common/nbstore/src/worker/client.ts new file mode 100644 index 0000000000..f4f8ef8603 --- /dev/null +++ b/packages/common/nbstore/src/worker/client.ts @@ -0,0 +1,294 @@ +import type { OpClient } from '@toeverything/infra/op'; + +import { DummyConnection } from '../connection'; +import { DocFrontend } from '../frontend/doc'; +import { + type AwarenessRecord, + type AwarenessStorage, + type BlobRecord, + type BlobStorage, + type DocRecord, + type DocStorage, + type DocUpdate, + type ListedBlobRecord, + type StorageOptions, + universalId, +} from '../storage'; +import type { AwarenessSync } from '../sync/awareness'; +import type { BlobSync } from '../sync/blob'; +import type { DocSync } from '../sync/doc'; +import type { WorkerOps } from './ops'; + +export class WorkerClient { + constructor( + private readonly client: OpClient, + private readonly options: StorageOptions + ) {} + + readonly docStorage = new WorkerDocStorage(this.client, this.options); + readonly blobStorage = new WorkerBlobStorage(this.client, this.options); + readonly awarenessStorage = new WorkerAwarenessStorage( + this.client, + this.options + ); + readonly docSync = new WorkerDocSync(this.client); + readonly blobSync = new WorkerBlobSync(this.client); + readonly awarenessSync = new WorkerAwarenessSync(this.client); + + readonly docFrontend = new DocFrontend(this.docStorage, this.docSync); +} + +class WorkerDocStorage implements DocStorage { + constructor( + private readonly client: OpClient, + private readonly options: StorageOptions + ) {} + + readonly peer = this.options.peer; + readonly spaceType = this.options.type; + readonly spaceId = this.options.id; + readonly universalId = universalId(this.options); + readonly storageType = 'doc'; + + async getDoc(docId: string) { + return this.client.call('docStorage.getDoc', docId); + } + + async getDocDiff(docId: string, state?: Uint8Array) { + return this.client.call('docStorage.getDocDiff', { docId, state }); + } + + async pushDocUpdate(update: DocUpdate, origin?: string) { + return this.client.call('docStorage.pushDocUpdate', { update, origin }); + } + + async getDocTimestamp(docId: string) { + return this.client.call('docStorage.getDocTimestamp', docId); + } + + async getDocTimestamps(after?: Date) { + return this.client.call('docStorage.getDocTimestamps', after ?? null); + } + + async deleteDoc(docId: string) { + return this.client.call('docStorage.deleteDoc', docId); + } + + subscribeDocUpdate(callback: (update: DocRecord, origin?: string) => void) { + const subscription = this.client + .ob$('docStorage.subscribeDocUpdate') + .subscribe(value => { + callback(value.update, value.origin); + }); + return () => { + subscription.unsubscribe(); + }; + } + + connection = new WorkerDocConnection(this.client); +} + +class WorkerDocConnection extends DummyConnection { + constructor(private readonly client: OpClient) { + super(); + } + + override waitForConnected(signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + const abortListener = () => { + reject(signal?.reason); + subscription.unsubscribe(); + }; + + signal?.addEventListener('abort', abortListener); + + const subscription = this.client + .ob$('docStorage.waitForConnected') + .subscribe({ + next() { + signal?.removeEventListener('abort', abortListener); + resolve(); + }, + error(err) { + signal?.removeEventListener('abort', abortListener); + reject(err); + }, + }); + }); + } +} + +class WorkerBlobStorage implements BlobStorage { + constructor( + private readonly client: OpClient, + private readonly options: StorageOptions + ) {} + + readonly storageType = 'blob'; + readonly peer = this.options.peer; + readonly spaceType = this.options.type; + readonly spaceId = this.options.id; + readonly universalId = universalId(this.options); + + get(key: string, _signal?: AbortSignal): Promise { + return this.client.call('blobStorage.getBlob', key); + } + set(blob: BlobRecord, _signal?: AbortSignal): Promise { + return this.client.call('blobStorage.setBlob', blob); + } + + delete( + key: string, + permanently: boolean, + _signal?: AbortSignal + ): Promise { + return this.client.call('blobStorage.deleteBlob', { key, permanently }); + } + + release(_signal?: AbortSignal): Promise { + return this.client.call('blobStorage.releaseBlobs'); + } + + list(_signal?: AbortSignal): Promise { + return this.client.call('blobStorage.listBlobs'); + } + + connection = new DummyConnection(); +} + +class WorkerAwarenessStorage implements AwarenessStorage { + constructor( + private readonly client: OpClient, + private readonly options: StorageOptions + ) {} + + readonly storageType = 'awareness'; + readonly peer = this.options.peer; + readonly spaceType = this.options.type; + readonly spaceId = this.options.id; + readonly universalId = universalId(this.options); + + update(record: AwarenessRecord, origin?: string): Promise { + return this.client.call('awarenessStorage.update', { + awareness: record, + origin, + }); + } + subscribeUpdate( + id: string, + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => Promise + ): () => void { + const subscription = this.client + .ob$('awarenessStorage.subscribeUpdate', id) + .subscribe({ + next: update => { + if (update.type === 'awareness-update') { + onUpdate(update.awareness, update.origin); + } + if (update.type === 'awareness-collect') { + onCollect() + .then(record => { + if (record) { + this.client + .call('awarenessStorage.collect', { + awareness: record, + collectId: update.collectId, + }) + .catch(err => { + console.error('error feedback collected awareness', err); + }); + } + }) + .catch(err => { + console.error('error collecting awareness', err); + }); + } + }, + }); + return () => { + subscription.unsubscribe(); + }; + } + connection = new DummyConnection(); +} + +class WorkerDocSync implements DocSync { + constructor(private readonly client: OpClient) {} + + readonly state$ = this.client.ob$('docSync.state'); + + docState$(docId: string) { + return this.client.ob$('docSync.docState', docId); + } + + addPriority(docId: string, priority: number) { + const subscription = this.client + .ob$('docSync.addPriority', { docId, priority }) + .subscribe(); + return () => { + subscription.unsubscribe(); + }; + } +} + +class WorkerBlobSync implements BlobSync { + constructor(private readonly client: OpClient) {} + downloadBlob( + blobId: string, + _signal?: AbortSignal + ): Promise { + return this.client.call('blobSync.downloadBlob', blobId); + } + uploadBlob(blob: BlobRecord, _signal?: AbortSignal): Promise { + return this.client.call('blobSync.uploadBlob', blob); + } +} + +class WorkerAwarenessSync implements AwarenessSync { + constructor(private readonly client: OpClient) {} + + update(record: AwarenessRecord, origin?: string): Promise { + return this.client.call('awarenessSync.update', { + awareness: record, + origin, + }); + } + + subscribeUpdate( + id: string, + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => Promise + ): () => void { + const subscription = this.client + .ob$('awarenessSync.subscribeUpdate', id) + .subscribe({ + next: update => { + if (update.type === 'awareness-update') { + onUpdate(update.awareness, update.origin); + } + if (update.type === 'awareness-collect') { + onCollect() + .then(record => { + if (record) { + this.client + .call('awarenessSync.collect', { + awareness: record, + collectId: update.collectId, + }) + .catch(err => { + console.error('error feedback collected awareness', err); + }); + } + }) + .catch(err => { + console.error('error collecting awareness', err); + }); + } + }, + }); + return () => { + subscription.unsubscribe(); + }; + } +} diff --git a/packages/common/nbstore/src/worker/consumer.ts b/packages/common/nbstore/src/worker/consumer.ts new file mode 100644 index 0000000000..f83e94f435 --- /dev/null +++ b/packages/common/nbstore/src/worker/consumer.ts @@ -0,0 +1,256 @@ +import type { OpConsumer } from '@toeverything/infra/op'; +import { Observable } from 'rxjs'; + +import { getAvailableStorageImplementations } from '../impls'; +import { SpaceStorage, type StorageOptions } from '../storage'; +import type { AwarenessRecord } from '../storage/awareness'; +import { Sync } from '../sync'; +import type { WorkerOps } from './ops'; + +export class WorkerConsumer { + private remotes: SpaceStorage[] = []; + private local: SpaceStorage | null = null; + private sync: Sync | null = null; + + get ensureLocal() { + if (!this.local) { + throw new Error('Not initialized'); + } + return this.local; + } + + get ensureSync() { + if (!this.sync) { + throw new Error('Not initialized'); + } + return this.sync; + } + + get docStorage() { + return this.ensureLocal.get('doc'); + } + + get docSync() { + const docSync = this.ensureSync.doc; + if (!docSync) { + throw new Error('Doc sync not initialized'); + } + return docSync; + } + + get blobStorage() { + return this.ensureLocal.get('blob'); + } + + get blobSync() { + const blobSync = this.ensureSync.blob; + if (!blobSync) { + throw new Error('Blob sync not initialized'); + } + return blobSync; + } + + get syncStorage() { + return this.ensureLocal.get('sync'); + } + + get awarenessStorage() { + return this.ensureLocal.get('awareness'); + } + + get awarenessSync() { + const awarenessSync = this.ensureSync.awareness; + if (!awarenessSync) { + throw new Error('Awareness sync not initialized'); + } + return awarenessSync; + } + + constructor(private readonly consumer: OpConsumer) {} + + listen() { + this.registerHandlers(); + this.consumer.listen(); + } + + async init(init: { + local: { name: string; opts: StorageOptions }[]; + remotes: { name: string; opts: StorageOptions }[][]; + }) { + this.local = new SpaceStorage( + init.local.map(opt => { + const Storage = getAvailableStorageImplementations(opt.name); + return new Storage(opt.opts); + }) + ); + this.remotes = init.remotes.map(opts => { + return new SpaceStorage( + opts.map(opt => { + const Storage = getAvailableStorageImplementations(opt.name); + return new Storage(opt.opts); + }) + ); + }); + this.sync = new Sync(this.local, this.remotes); + this.local.connect(); + for (const remote of this.remotes) { + remote.connect(); + } + this.sync.start(); + } + + async destroy() { + this.sync?.stop(); + this.local?.disconnect(); + await this.local?.destroy(); + for (const remote of this.remotes) { + remote.disconnect(); + await remote.destroy(); + } + } + + private registerHandlers() { + const collectJobs = new Map< + string, + (awareness: AwarenessRecord | null) => void + >(); + let collectId = 0; + this.consumer.registerAll({ + 'worker.init': this.init.bind(this), + 'worker.destroy': this.destroy.bind(this), + 'docStorage.getDoc': (docId: string) => this.docStorage.getDoc(docId), + 'docStorage.getDocDiff': ({ docId, state }) => + this.docStorage.getDocDiff(docId, state), + 'docStorage.pushDocUpdate': ({ update, origin }) => + this.docStorage.pushDocUpdate(update, origin), + 'docStorage.getDocTimestamps': after => + this.docStorage.getDocTimestamps(after ?? undefined), + 'docStorage.getDocTimestamp': docId => + this.docStorage.getDocTimestamp(docId), + 'docStorage.deleteDoc': (docId: string) => + this.docStorage.deleteDoc(docId), + 'docStorage.subscribeDocUpdate': () => + new Observable(subscriber => { + return this.docStorage.subscribeDocUpdate((update, origin) => { + subscriber.next({ update, origin }); + }); + }), + 'docStorage.waitForConnected': () => + new Observable(subscriber => { + const abortController = new AbortController(); + this.docStorage.connection + .waitForConnected(abortController.signal) + .then(() => { + subscriber.next(true); + subscriber.complete(); + }) + .catch(error => { + subscriber.error(error); + }); + return () => abortController.abort(); + }), + 'blobStorage.getBlob': key => this.blobStorage.get(key), + 'blobStorage.setBlob': blob => this.blobStorage.set(blob), + 'blobStorage.deleteBlob': ({ key, permanently }) => + this.blobStorage.delete(key, permanently), + 'blobStorage.releaseBlobs': () => this.blobStorage.release(), + 'blobStorage.listBlobs': () => this.blobStorage.list(), + 'syncStorage.clearClocks': () => this.syncStorage.clearClocks(), + 'syncStorage.getPeerPulledRemoteClock': ({ peer, docId }) => + this.syncStorage.getPeerPulledRemoteClock(peer, docId), + 'syncStorage.getPeerPulledRemoteClocks': ({ peer }) => + this.syncStorage.getPeerPulledRemoteClocks(peer), + 'syncStorage.setPeerPulledRemoteClock': ({ peer, clock }) => + this.syncStorage.setPeerPulledRemoteClock(peer, clock), + 'syncStorage.getPeerRemoteClock': ({ peer, docId }) => + this.syncStorage.getPeerRemoteClock(peer, docId), + 'syncStorage.getPeerRemoteClocks': ({ peer }) => + this.syncStorage.getPeerRemoteClocks(peer), + 'syncStorage.setPeerRemoteClock': ({ peer, clock }) => + this.syncStorage.setPeerRemoteClock(peer, clock), + 'syncStorage.getPeerPushedClock': ({ peer, docId }) => + this.syncStorage.getPeerPushedClock(peer, docId), + 'syncStorage.getPeerPushedClocks': ({ peer }) => + this.syncStorage.getPeerPushedClocks(peer), + 'syncStorage.setPeerPushedClock': ({ peer, clock }) => + this.syncStorage.setPeerPushedClock(peer, clock), + 'awarenessStorage.update': ({ awareness, origin }) => + this.awarenessStorage.update(awareness, origin), + 'awarenessStorage.subscribeUpdate': docId => + new Observable(subscriber => { + return this.awarenessStorage.subscribeUpdate( + docId, + (update, origin) => { + subscriber.next({ + type: 'awareness-update', + awareness: update, + origin, + }); + }, + () => { + const currentCollectId = collectId++; + const promise = new Promise(resolve => { + collectJobs.set(currentCollectId.toString(), awareness => { + resolve(awareness); + collectJobs.delete(currentCollectId.toString()); + }); + }); + return promise; + } + ); + }), + 'awarenessStorage.collect': ({ collectId, awareness }) => + collectJobs.get(collectId)?.(awareness), + 'docSync.state': () => + new Observable(subscriber => { + const subscription = this.docSync.state$.subscribe(state => { + subscriber.next(state); + }); + return () => subscription.unsubscribe(); + }), + 'docSync.docState': docId => + new Observable(subscriber => { + const subscription = this.docSync + .docState$(docId) + .subscribe(state => { + subscriber.next(state); + }); + return () => subscription.unsubscribe(); + }), + 'docSync.addPriority': ({ docId, priority }) => + new Observable(() => { + const undo = this.docSync.addPriority(docId, priority); + return () => undo(); + }), + 'blobSync.downloadBlob': key => this.blobSync.downloadBlob(key), + 'blobSync.uploadBlob': blob => this.blobSync.uploadBlob(blob), + 'awarenessSync.update': ({ awareness, origin }) => + this.awarenessSync.update(awareness, origin), + 'awarenessSync.subscribeUpdate': docId => + new Observable(subscriber => { + return this.awarenessStorage.subscribeUpdate( + docId, + (update, origin) => { + subscriber.next({ + type: 'awareness-update', + awareness: update, + origin, + }); + }, + () => { + const currentCollectId = collectId++; + const promise = new Promise(resolve => { + collectJobs.set(currentCollectId.toString(), awareness => { + resolve(awareness); + collectJobs.delete(currentCollectId.toString()); + }); + }); + return promise; + } + ); + }), + 'awarenessSync.collect': ({ collectId, awareness }) => + collectJobs.get(collectId)?.(awareness), + }); + } +} diff --git a/packages/common/nbstore/src/worker/ops.ts b/packages/common/nbstore/src/worker/ops.ts new file mode 100644 index 0000000000..aabf7c2889 --- /dev/null +++ b/packages/common/nbstore/src/worker/ops.ts @@ -0,0 +1,122 @@ +import type { + BlobRecord, + DocClock, + DocClocks, + DocDiff, + DocRecord, + DocUpdate, + ListedBlobRecord, + StorageOptions, +} from '../storage'; +import type { AwarenessRecord } from '../storage/awareness'; +import type { DocSyncDocState, DocSyncState } from '../sync/doc'; + +interface GroupedWorkerOps { + worker: { + init: [ + { + local: { name: string; opts: StorageOptions }[]; + remotes: { name: string; opts: StorageOptions }[][]; + }, + void, + ]; + destroy: [void, void]; + }; + + docStorage: { + getDoc: [string, DocRecord | null]; + getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null]; + pushDocUpdate: [{ update: DocUpdate; origin?: string }, DocClock]; + getDocTimestamps: [Date | null, DocClocks]; + getDocTimestamp: [string, DocClock | null]; + deleteDoc: [string, void]; + subscribeDocUpdate: [void, { update: DocRecord; origin?: string }]; + waitForConnected: [void, boolean]; + }; + + blobStorage: { + getBlob: [string, BlobRecord | null]; + setBlob: [BlobRecord, void]; + deleteBlob: [{ key: string; permanently: boolean }, void]; + releaseBlobs: [void, void]; + listBlobs: [void, ListedBlobRecord[]]; + }; + + syncStorage: { + getPeerPulledRemoteClocks: [{ peer: string }, DocClocks]; + getPeerPulledRemoteClock: [ + { peer: string; docId: string }, + DocClock | null, + ]; + setPeerPulledRemoteClock: [{ peer: string; clock: DocClock }, void]; + getPeerRemoteClocks: [{ peer: string }, DocClocks]; + getPeerRemoteClock: [{ peer: string; docId: string }, DocClock | null]; + setPeerRemoteClock: [{ peer: string; clock: DocClock }, void]; + getPeerPushedClocks: [{ peer: string }, DocClocks]; + getPeerPushedClock: [{ peer: string; docId: string }, DocClock | null]; + setPeerPushedClock: [{ peer: string; clock: DocClock }, void]; + clearClocks: [void, void]; + }; + + awarenessStorage: { + update: [{ awareness: AwarenessRecord; origin?: string }, void]; + subscribeUpdate: [ + string, + ( + | { + type: 'awareness-update'; + awareness: AwarenessRecord; + origin?: string; + } + | { type: 'awareness-collect'; collectId: string } + ), + ]; + collect: [{ collectId: string; awareness: AwarenessRecord }, void]; + }; + + docSync: { + state: [void, DocSyncState]; + docState: [string, DocSyncDocState]; + addPriority: [{ docId: string; priority: number }, boolean]; + }; + + blobSync: { + downloadBlob: [string, BlobRecord | null]; + uploadBlob: [BlobRecord, void]; + }; + + awarenessSync: { + update: [{ awareness: AwarenessRecord; origin?: string }, void]; + subscribeUpdate: [ + string, + ( + | { + type: 'awareness-update'; + awareness: AwarenessRecord; + origin?: string; + } + | { type: 'awareness-collect'; collectId: string } + ), + ]; + collect: [{ collectId: string; awareness: AwarenessRecord }, void]; + }; +} + +type Values = T extends { [k in keyof T]: any } ? T[keyof T] : never; +type UnionToIntersection = (U extends any ? (x: U) => void : never) extends ( + x: infer I +) => void + ? I + : never; + +export type WorkerOps = UnionToIntersection< + Values< + Values<{ + [k in keyof GroupedWorkerOps]: { + [k2 in keyof GroupedWorkerOps[k]]: k2 extends string + ? Record<`${k}.${k2}`, GroupedWorkerOps[k][k2]> + : never; + }; + }> + > +>; diff --git a/packages/frontend/apps/electron/src/helper/nbstore/blob.ts b/packages/frontend/apps/electron/src/helper/nbstore/blob.ts index 6e41097b45..c1e0641db9 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/blob.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/blob.ts @@ -1,8 +1,8 @@ -import { type BlobRecord, BlobStorage, share } from '@affine/nbstore'; +import { type BlobRecord, BlobStorageBase, share } from '@affine/nbstore'; import { NativeDBConnection } from './db'; -export class SqliteBlobStorage extends BlobStorage { +export class SqliteBlobStorage extends BlobStorageBase { override connection = share( new NativeDBConnection(this.peer, this.spaceType, this.spaceId) ); diff --git a/packages/frontend/apps/electron/src/helper/nbstore/db.ts b/packages/frontend/apps/electron/src/helper/nbstore/db.ts index af03271a74..d5edb4c333 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/db.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/db.ts @@ -1,13 +1,13 @@ import path from 'node:path'; import { DocStorage as NativeDocStorage } from '@affine/native'; -import { Connection, type SpaceType } from '@affine/nbstore'; +import { AutoReconnectConnection, type SpaceType } from '@affine/nbstore'; import fs from 'fs-extra'; import { logger } from '../logger'; import { getSpaceDBPath } from '../workspace/meta'; -export class NativeDBConnection extends Connection { +export class NativeDBConnection extends AutoReconnectConnection { constructor( private readonly peer: string, private readonly type: SpaceType, diff --git a/packages/frontend/apps/electron/src/helper/nbstore/doc.ts b/packages/frontend/apps/electron/src/helper/nbstore/doc.ts index 016ef9efd6..4078f50513 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/doc.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/doc.ts @@ -1,14 +1,14 @@ import { type DocClocks, type DocRecord, - DocStorage, + DocStorageBase, type DocUpdate, share, } from '@affine/nbstore'; import { NativeDBConnection } from './db'; -export class SqliteDocStorage extends DocStorage { +export class SqliteDocStorage extends DocStorageBase { override connection = share( new NativeDBConnection(this.peer, this.spaceType, this.spaceId) ); diff --git a/packages/frontend/apps/electron/src/helper/nbstore/sync.ts b/packages/frontend/apps/electron/src/helper/nbstore/sync.ts index 2ffcf259e9..2942371b59 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/sync.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/sync.ts @@ -1,13 +1,13 @@ import { + BasicSyncStorage, type DocClock, type DocClocks, share, - SyncStorage, } from '@affine/nbstore'; import { NativeDBConnection } from './db'; -export class SqliteSyncStorage extends SyncStorage { +export class SqliteSyncStorage extends BasicSyncStorage { override connection = share( new NativeDBConnection(this.peer, this.spaceType, this.spaceId) );