diff --git a/packages/common/nbstore/src/impls/cloud/awareness.ts b/packages/common/nbstore/src/impls/cloud/awareness.ts index 40b562f5c7..65c60cedb6 100644 --- a/packages/common/nbstore/src/impls/cloud/awareness.ts +++ b/packages/common/nbstore/src/impls/cloud/awareness.ts @@ -1,4 +1,3 @@ -import { share } from '../../connection'; import { type AwarenessRecord, AwarenessStorageBase, @@ -23,10 +22,10 @@ export class CloudAwarenessStorage extends AwarenessStorageBase { super(); } - connection = share(new SocketConnection(`${this.options.serverBaseUrl}/`)); + connection = new SocketConnection(`${this.options.serverBaseUrl}/`); private get socket() { - return this.connection.inner; + return this.connection.inner.socket; } override async update(record: AwarenessRecord): Promise { diff --git a/packages/common/nbstore/src/impls/cloud/doc.ts b/packages/common/nbstore/src/impls/cloud/doc.ts index 4f0ac0be94..353b8e91bb 100644 --- a/packages/common/nbstore/src/impls/cloud/doc.ts +++ b/packages/common/nbstore/src/impls/cloud/doc.ts @@ -25,7 +25,7 @@ export class CloudDocStorage extends DocStorageBase { static readonly identifier = 'CloudDocStorage'; get socket() { - return this.connection.inner; + return this.connection.inner.socket; } get idConverter() { if (!this.connection.idConverter) { @@ -191,7 +191,7 @@ class CloudDocStorageConnection extends SocketConnection { idConverter: IdConverter | null = null; override async doConnect(signal?: AbortSignal) { - const socket = await super.doConnect(signal); + const { socket, disconnect } = await super.doConnect(signal); try { const res = await socket.emitWithAck('space:join', { @@ -210,20 +210,26 @@ class CloudDocStorageConnection extends SocketConnection { socket.on('space:broadcast-doc-update', this.onServerUpdate); - return socket; + return { socket, disconnect }; } catch (e) { - socket.close(); + disconnect(); throw e; } } - override doDisconnect(socket: Socket) { + override doDisconnect({ + socket, + disconnect, + }: { + socket: Socket; + disconnect: () => void; + }) { socket.emit('space:leave', { spaceType: this.options.type, spaceId: this.options.id, }); socket.off('space:broadcast-doc-update', this.onServerUpdate); - super.disconnect(); + super.doDisconnect({ socket, disconnect }); } async getIdConverter(socket: Socket) { diff --git a/packages/common/nbstore/src/impls/cloud/socket.ts b/packages/common/nbstore/src/impls/cloud/socket.ts index 528d3be039..683bb486b0 100644 --- a/packages/common/nbstore/src/impls/cloud/socket.ts +++ b/packages/common/nbstore/src/impls/cloud/socket.ts @@ -1,7 +1,6 @@ import { Manager as SocketIOManager, type Socket as SocketIO, - type SocketOptions, } from 'socket.io-client'; import { AutoReconnectConnection } from '../../connection'; @@ -151,49 +150,78 @@ export function base64ToUint8Array(base64: string) { return new Uint8Array(binaryArray); } -const SOCKET_IOMANAGER_CACHE = new Map(); -function getSocketIOManager(endpoint: string) { - let manager = SOCKET_IOMANAGER_CACHE.get(endpoint); - if (!manager) { - manager = new SocketIOManager(endpoint, { +class SocketManager { + private readonly socketIOManager: SocketIOManager; + socket: Socket; + refCount = 0; + + constructor(endpoint: string) { + this.socketIOManager = new SocketIOManager(endpoint, { autoConnect: false, transports: ['websocket'], secure: new URL(endpoint).protocol === 'https:', // we will handle reconnection by ourselves reconnection: false, }); - SOCKET_IOMANAGER_CACHE.set(endpoint, manager); + this.socket = this.socketIOManager.socket('/'); + } + + connect() { + let disconnected = false; + this.refCount++; + this.socket.connect(); + return { + socket: this.socket, + disconnect: () => { + if (disconnected) { + return; + } + disconnected = true; + this.refCount--; + if (this.refCount === 0) { + this.socket.disconnect(); + } + }, + }; + } +} + +const SOCKET_MANAGER_CACHE = new Map(); +function getSocketManager(endpoint: string) { + let manager = SOCKET_MANAGER_CACHE.get(endpoint); + if (!manager) { + manager = new SocketManager(endpoint); + SOCKET_MANAGER_CACHE.set(endpoint, manager); } return manager; } -export class SocketConnection extends AutoReconnectConnection { - manager = getSocketIOManager(this.endpoint); +export class SocketConnection extends AutoReconnectConnection<{ + socket: Socket; + disconnect: () => void; +}> { + manager = getSocketManager(this.endpoint); - constructor( - private readonly endpoint: string, - private readonly socketOptions?: SocketOptions - ) { + constructor(private readonly endpoint: string) { super(); } - override get shareId() { - return `socket:${this.endpoint}`; - } - override async doConnect(signal?: AbortSignal) { - const socket = this.manager.socket('/', this.socketOptions); + const { socket, disconnect } = this.manager.connect(); try { throwIfAborted(signal); await Promise.race([ new Promise((resolve, reject) => { + if (socket.connected) { + resolve(); + return; + } socket.once('connect', () => { resolve(); }); socket.once('connect_error', err => { reject(err); }); - socket.open(); }), new Promise((_resolve, reject) => { signal?.addEventListener('abort', () => { @@ -202,18 +230,21 @@ export class SocketConnection extends AutoReconnectConnection { }), ]); } catch (err) { - socket.close(); + disconnect(); throw err; } socket.on('disconnect', this.handleDisconnect); - return socket; + return { + socket, + disconnect, + }; } - override doDisconnect(conn: Socket) { - conn.off('disconnect', this.handleDisconnect); - conn.close(); + override doDisconnect(conn: { socket: Socket; disconnect: () => void }) { + conn.socket.off('disconnect', this.handleDisconnect); + conn.disconnect(); } handleDisconnect = (reason: SocketIO.DisconnectReason) => { diff --git a/packages/common/nbstore/src/impls/sqlite/doc.ts b/packages/common/nbstore/src/impls/sqlite/doc.ts index 7b94d15d26..aa2e954e5c 100644 --- a/packages/common/nbstore/src/impls/sqlite/doc.ts +++ b/packages/common/nbstore/src/impls/sqlite/doc.ts @@ -15,7 +15,7 @@ export class SqliteDocStorage extends DocStorageBase { return this.connection.apis; } - override async pushDocUpdate(update: DocUpdate) { + override async pushDocUpdate(update: DocUpdate, origin?: string) { const timestamp = await this.db.pushUpdate(update.docId, update.bin); this.emit( diff --git a/packages/common/nbstore/src/worker/client.ts b/packages/common/nbstore/src/worker/client.ts index 8e92c62caa..72004fced9 100644 --- a/packages/common/nbstore/src/worker/client.ts +++ b/packages/common/nbstore/src/worker/client.ts @@ -1,4 +1,4 @@ -import type { OpClient } from '@toeverything/infra/op'; +import { OpClient, transfer } from '@toeverything/infra/op'; import { DummyConnection } from '../connection'; import { AwarenessFrontend, BlobFrontend, DocFrontend } from '../frontend'; @@ -14,18 +14,68 @@ import { import type { AwarenessSync } from '../sync/awareness'; import type { BlobSync } from '../sync/blob'; import type { DocSync } from '../sync/doc'; -import type { WorkerInitOptions, WorkerOps } from './ops'; +import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops'; -export type { WorkerInitOptions } from './ops'; +export type { StoreInitOptions as WorkerInitOptions } from './ops'; -export class WorkerClient { - constructor( - private readonly client: OpClient, - options: WorkerInitOptions - ) { - this.client.call('worker.init', options).catch(err => { - console.error('error initializing worker', err); +export class StoreManagerClient { + private readonly connections = new Map< + string, + { + store: StoreClient; + dispose: () => void; + } + >(); + + constructor(private readonly client: OpClient) {} + + open(key: string, options: StoreInitOptions) { + const { port1, port2 } = new MessageChannel(); + + const client = new OpClient(port1); + const closeKey = crypto.randomUUID(); + + this.client + .call( + 'open', + transfer( + { + key, + closeKey, + options, + port: port2, + }, + [port2] + ) + ) + .catch(err => { + console.error('error opening', err); + }); + + const connection = { + store: new StoreClient(client), + dispose: () => { + this.client.call('close', closeKey).catch(err => { + console.error('error closing', err); + }); + this.connections.delete(closeKey); + }, + }; + + this.connections.set(closeKey, connection); + + return connection; + } + + dispose() { + this.connections.forEach(connection => { + connection.dispose(); }); + } +} + +export class StoreClient { + constructor(private readonly client: OpClient) { this.docStorage = new WorkerDocStorage(this.client); this.blobStorage = new WorkerBlobStorage(this.client); this.docSync = new WorkerDocSync(this.client); diff --git a/packages/common/nbstore/src/worker/consumer.ts b/packages/common/nbstore/src/worker/consumer.ts index 409cab58d9..106be1ef8b 100644 --- a/packages/common/nbstore/src/worker/consumer.ts +++ b/packages/common/nbstore/src/worker/consumer.ts @@ -1,5 +1,5 @@ import { MANUALLY_STOP } from '@toeverything/infra'; -import type { OpConsumer } from '@toeverything/infra/op'; +import { OpConsumer } from '@toeverything/infra/op'; import { Observable } from 'rxjs'; import { type StorageConstructor } from '../impls'; @@ -7,14 +7,13 @@ import { SpaceStorage } from '../storage'; import type { AwarenessRecord } from '../storage/awareness'; import { Sync } from '../sync'; import type { PeerStorageOptions } from '../sync/types'; -import type { WorkerInitOptions, WorkerOps } from './ops'; +import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops'; -export type { WorkerOps }; +export type { WorkerManagerOps }; -export class WorkerConsumer { - private inited = false; - private storages: PeerStorageOptions | null = null; - private sync: Sync | null = null; +class StoreConsumer { + private readonly storages: PeerStorageOptions; + private readonly sync: Sync; get ensureLocal() { if (!this.storages) { @@ -59,18 +58,9 @@ export class WorkerConsumer { } constructor( - private readonly availableStorageImplementations: StorageConstructor[] - ) {} - - bindConsumer(consumer: OpConsumer) { - this.registerHandlers(consumer); - } - - init(init: WorkerInitOptions) { - if (this.inited) { - return; - } - this.inited = true; + private readonly availableStorageImplementations: StorageConstructor[], + init: StoreInitOptions + ) { this.storages = { local: new SpaceStorage( Object.fromEntries( @@ -122,6 +112,10 @@ export class WorkerConsumer { this.sync.start(); } + bindConsumer(consumer: OpConsumer) { + this.registerHandlers(consumer); + } + async destroy() { this.sync?.stop(); this.storages?.local.disconnect(); @@ -139,8 +133,6 @@ export class WorkerConsumer { >(); let collectId = 0; consumer.registerAll({ - 'worker.init': this.init.bind(this), - 'worker.destroy': this.destroy.bind(this), 'docStorage.getDoc': (docId: string) => this.docStorage.getDoc(docId), 'docStorage.getDocDiff': ({ docId, state }) => this.docStorage.getDocDiff(docId, state), @@ -298,3 +290,61 @@ export class WorkerConsumer { }); } } + +export class StoreManagerConsumer { + private readonly storeDisposers = new Map void>(); + private readonly storePool = new Map< + string, + { store: StoreConsumer; refCount: number } + >(); + + constructor( + private readonly availableStorageImplementations: StorageConstructor[] + ) {} + + bindConsumer(consumer: OpConsumer) { + this.registerHandlers(consumer); + } + + private registerHandlers(consumer: OpConsumer) { + consumer.registerAll({ + open: ({ port, key, closeKey, options }) => { + console.debug('open store', key, closeKey); + let storeRef = this.storePool.get(key); + + if (!storeRef) { + const store = new StoreConsumer( + this.availableStorageImplementations, + options + ); + storeRef = { store, refCount: 0 }; + } + storeRef.refCount++; + + const workerConsumer = new OpConsumer(port); + storeRef.store.bindConsumer(workerConsumer); + + this.storeDisposers.set(closeKey, () => { + storeRef.refCount--; + if (storeRef.refCount === 0) { + storeRef.store.destroy().catch(error => { + console.error(error); + }); + this.storePool.delete(key); + } + }); + this.storePool.set(key, storeRef); + return closeKey; + }, + close: key => { + console.debug('close store', key); + const workerDisposer = this.storeDisposers.get(key); + if (!workerDisposer) { + throw new Error('Worker not found'); + } + workerDisposer(); + this.storeDisposers.delete(key); + }, + }); + } +} diff --git a/packages/common/nbstore/src/worker/ops.ts b/packages/common/nbstore/src/worker/ops.ts index 5de0baaa56..1618d89a07 100644 --- a/packages/common/nbstore/src/worker/ops.ts +++ b/packages/common/nbstore/src/worker/ops.ts @@ -20,17 +20,12 @@ type StorageInitOptions = Values<{ }; }>; -export interface WorkerInitOptions { +export interface StoreInitOptions { local: { [key in StorageType]?: StorageInitOptions }; remotes: Record; } interface GroupedWorkerOps { - worker: { - init: [WorkerInitOptions, void]; - destroy: [void, void]; - }; - docStorage: { getDoc: [string, DocRecord | null]; getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null]; @@ -132,3 +127,16 @@ export type WorkerOps = UnionToIntersection< }> > >; + +export type WorkerManagerOps = { + open: [ + { + port: MessagePort; + key: string; + closeKey: string; + options: StoreInitOptions; + }, + string, + ]; + close: [string, void]; +}; diff --git a/packages/frontend/apps/android/src/app.tsx b/packages/frontend/apps/android/src/app.tsx index 8e0c809632..26bd12581b 100644 --- a/packages/frontend/apps/android/src/app.tsx +++ b/packages/frontend/apps/android/src/app.tsx @@ -14,7 +14,7 @@ import { PopupWindowProvider } from '@affine/core/modules/url'; import { ClientSchemeProvider } from '@affine/core/modules/url/providers/client-schema'; import { configureBrowserWorkbenchModule } from '@affine/core/modules/workbench'; import { configureBrowserWorkspaceFlavours } from '@affine/core/modules/workspace-engine'; -import { WorkerClient } from '@affine/nbstore/worker/client'; +import { StoreManagerClient } from '@affine/nbstore/worker/client'; import { App as CapacitorApp } from '@capacitor/app'; import { InAppBrowser } from '@capgo/inappbrowser'; import { Framework, FrameworkRoot, getCurrentStore } from '@toeverything/infra'; @@ -22,6 +22,17 @@ import { OpClient } from '@toeverything/infra/op'; import { Suspense } from 'react'; import { RouterProvider } from 'react-router-dom'; +const storeManagerClient = new StoreManagerClient( + new OpClient( + new Worker( + new URL(/* webpackChunkName: "nbstore" */ './nbstore.ts', import.meta.url) + ) + ) +); +window.addEventListener('beforeunload', () => { + storeManagerClient.dispose(); +}); + const future = { v7_startTransition: true, } as const; @@ -33,15 +44,12 @@ configureLocalStorageStateStorageImpls(framework); configureBrowserWorkspaceFlavours(framework); configureMobileModules(framework); framework.impl(NbstoreProvider, { - openStore(_key, options) { - const worker = new Worker( - new URL(/* webpackChunkName: "nbstore" */ './nbstore.ts', import.meta.url) - ); - const client = new WorkerClient(new OpClient(worker), options); + openStore(key, options) { + const { store, dispose } = storeManagerClient.open(key, options); return { - store: client, + store, dispose: () => { - worker.terminate(); + dispose(); }, }; }, diff --git a/packages/frontend/apps/android/src/nbstore.ts b/packages/frontend/apps/android/src/nbstore.ts index 7b3fefd3fe..30b53ac08e 100644 --- a/packages/frontend/apps/android/src/nbstore.ts +++ b/packages/frontend/apps/android/src/nbstore.ts @@ -4,18 +4,18 @@ import { broadcastChannelStorages } from '@affine/nbstore/broadcast-channel'; import { cloudStorages } from '@affine/nbstore/cloud'; import { idbStorages } from '@affine/nbstore/idb'; import { - WorkerConsumer, - type WorkerOps, + StoreManagerConsumer, + type WorkerManagerOps, } from '@affine/nbstore/worker/consumer'; import { type MessageCommunicapable, OpConsumer } from '@toeverything/infra/op'; -const consumer = new WorkerConsumer([ +const consumer = new StoreManagerConsumer([ ...idbStorages, ...broadcastChannelStorages, ...cloudStorages, ]); -const opConsumer = new OpConsumer( +const opConsumer = new OpConsumer( globalThis as MessageCommunicapable ); diff --git a/packages/frontend/apps/electron-renderer/src/app.tsx b/packages/frontend/apps/electron-renderer/src/app.tsx index 336c414a2f..345a12cbf1 100644 --- a/packages/frontend/apps/electron-renderer/src/app.tsx +++ b/packages/frontend/apps/electron-renderer/src/app.tsx @@ -36,7 +36,7 @@ import { WorkspacesService } from '@affine/core/modules/workspace'; import { configureBrowserWorkspaceFlavours } from '@affine/core/modules/workspace-engine'; import createEmotionCache from '@affine/core/utils/create-emotion-cache'; import { apis, events } from '@affine/electron-api'; -import { WorkerClient } from '@affine/nbstore/worker/client'; +import { StoreManagerClient } from '@affine/nbstore/worker/client'; import { CacheProvider } from '@emotion/react'; import { Framework, FrameworkRoot, getCurrentStore } from '@toeverything/infra'; import { OpClient } from '@toeverything/infra/op'; @@ -45,6 +45,11 @@ import { RouterProvider } from 'react-router-dom'; import { DesktopThemeSync } from './theme-sync'; +const storeManagerClient = createStoreManagerClient(); +window.addEventListener('beforeunload', () => { + storeManagerClient.dispose(); +}); + const desktopWhiteList = [ '/open-app/signin-redirect', '/open-app/url', @@ -81,50 +86,12 @@ configureSpellCheckSettingModule(framework); configureDesktopBackupModule(framework); framework.impl(NbstoreProvider, { openStore(key, options) { - const { port1: portForOpClient, port2: portForWorker } = - new MessageChannel(); - let portFromWorker: MessagePort | null = null; - let portId = crypto.randomUUID(); + const { store, dispose } = storeManagerClient.open(key, options); - const handleMessage = (ev: MessageEvent) => { - if ( - ev.data.type === 'electron:worker-connect' && - ev.data.portId === portId - ) { - portFromWorker = ev.ports[0]; - // connect portForWorker and portFromWorker - portFromWorker.addEventListener('message', ev => { - portForWorker.postMessage(ev.data); - }); - portForWorker.addEventListener('message', ev => { - // oxlint-disable-next-line no-non-null-assertion - portFromWorker!.postMessage(ev.data); - }); - portForWorker.start(); - portFromWorker.start(); - } - }; - - window.addEventListener('message', handleMessage); - - // oxlint-disable-next-line no-non-null-assertion - apis!.worker.connectWorker(key, portId).catch(err => { - console.error('failed to connect worker', err); - }); - - const store = new WorkerClient(new OpClient(portForOpClient), options); - portForOpClient.start(); return { store, dispose: () => { - window.removeEventListener('message', handleMessage); - portForOpClient.close(); - portForWorker.close(); - portFromWorker?.close(); - // oxlint-disable-next-line no-non-null-assertion - apis!.worker.disconnectWorker(key, portId).catch(err => { - console.error('failed to disconnect worker', err); - }); + dispose(); }, }; }, @@ -238,3 +205,39 @@ export function App() { ); } + +function createStoreManagerClient() { + const { port1: portForOpClient, port2: portForWorker } = new MessageChannel(); + let portFromWorker: MessagePort | null = null; + let portId = crypto.randomUUID(); + + const handleMessage = (ev: MessageEvent) => { + if ( + ev.data.type === 'electron:worker-connect' && + ev.data.portId === portId + ) { + portFromWorker = ev.ports[0]; + // connect portForWorker and portFromWorker + portFromWorker.addEventListener('message', ev => { + portForWorker.postMessage(ev.data, [...ev.ports]); + }); + portForWorker.addEventListener('message', ev => { + // oxlint-disable-next-line no-non-null-assertion + portFromWorker!.postMessage(ev.data, [...ev.ports]); + }); + portForWorker.start(); + portFromWorker.start(); + } + }; + + window.addEventListener('message', handleMessage); + + // oxlint-disable-next-line no-non-null-assertion + apis!.worker.connectWorker('affine-shared-worker', portId).catch(err => { + console.error('failed to connect worker', err); + }); + + const storeManager = new StoreManagerClient(new OpClient(portForOpClient)); + portForOpClient.start(); + return storeManager; +} diff --git a/packages/frontend/apps/electron-renderer/src/background-worker/index.ts b/packages/frontend/apps/electron-renderer/src/background-worker/index.ts index f9b2aaff07..6df4d0e61c 100644 --- a/packages/frontend/apps/electron-renderer/src/background-worker/index.ts +++ b/packages/frontend/apps/electron-renderer/src/background-worker/index.ts @@ -9,8 +9,8 @@ import { sqliteV1Storages, } from '@affine/nbstore/sqlite/v1'; import { - WorkerConsumer, - type WorkerOps, + StoreManagerConsumer, + type WorkerManagerOps, } from '@affine/nbstore/worker/consumer'; import { OpConsumer } from '@toeverything/infra/op'; @@ -19,7 +19,7 @@ bindNativeDBApis(apis!.nbstore); // oxlint-disable-next-line no-non-null-assertion bindNativeDBV1Apis(apis!.db); -const worker = new WorkerConsumer([ +const storeManager = new StoreManagerConsumer([ ...sqliteStorages, ...sqliteV1Storages, ...broadcastChannelStorages, @@ -30,7 +30,7 @@ window.addEventListener('message', ev => { if (ev.data.type === 'electron:worker-connect') { const port = ev.ports[0]; - const consumer = new OpConsumer(port); - worker.bindConsumer(consumer); + const consumer = new OpConsumer(port); + storeManager.bindConsumer(consumer); } }); diff --git a/packages/frontend/apps/electron-renderer/src/nbstore.ts b/packages/frontend/apps/electron-renderer/src/nbstore.ts deleted file mode 100644 index d4c33c5513..0000000000 --- a/packages/frontend/apps/electron-renderer/src/nbstore.ts +++ /dev/null @@ -1,96 +0,0 @@ -import '@affine/core/bootstrap/electron'; - -import type { ClientHandler } from '@affine/electron-api'; -import { broadcastChannelStorages } from '@affine/nbstore/broadcast-channel'; -import { cloudStorages } from '@affine/nbstore/cloud'; -import { bindNativeDBApis, sqliteStorages } from '@affine/nbstore/sqlite'; -import { - bindNativeDBV1Apis, - sqliteV1Storages, -} from '@affine/nbstore/sqlite/v1'; -import { - WorkerConsumer, - type WorkerOps, -} from '@affine/nbstore/worker/consumer'; -import { OpConsumer } from '@toeverything/infra/op'; -import { AsyncCall } from 'async-call-rpc'; - -const worker = new WorkerConsumer([ - ...sqliteStorages, - ...sqliteV1Storages, - ...broadcastChannelStorages, - ...cloudStorages, -]); - -let activeConnectionCount = 0; -let electronAPIsInitialized = false; - -function connectElectronAPIs(port: MessagePort) { - if (electronAPIsInitialized) { - return; - } - electronAPIsInitialized = true; - port.postMessage({ type: '__electron-apis-init__' }); - - const { promise, resolve } = Promise.withResolvers(); - port.addEventListener('message', event => { - if (event.data.type === '__electron-apis__') { - const [port] = event.ports; - resolve(port); - } - }); - - const rpc = AsyncCall>(null, { - channel: promise.then(p => ({ - on(listener) { - p.onmessage = e => { - listener(e.data); - }; - p.start(); - return () => { - p.onmessage = null; - try { - p.close(); - } catch (err) { - console.error('close port error', err); - } - }; - }, - send(data) { - p.postMessage(data); - }, - })), - log: false, - }); - - const electronAPIs = new Proxy(rpc as any, { - get(_, namespace: string) { - return new Proxy(rpc as any, { - get(_, method: string) { - return rpc[`${namespace}:${method}`]; - }, - }); - }, - }); - - bindNativeDBApis(electronAPIs.nbstore); - bindNativeDBV1Apis(electronAPIs.db); -} - -(globalThis as any).onconnect = (event: MessageEvent) => { - activeConnectionCount++; - const port = event.ports[0]; - port.addEventListener('message', (event: MessageEvent) => { - if (event.data.type === '__close__') { - activeConnectionCount--; - if (activeConnectionCount === 0) { - globalThis.close(); - } - } - }); - - connectElectronAPIs(port); - - const consumer = new OpConsumer(port); - worker.bindConsumer(consumer); -}; diff --git a/packages/frontend/apps/electron/src/main/windows-manager/main-window.ts b/packages/frontend/apps/electron/src/main/windows-manager/main-window.ts index 7e775e118f..7c57406b78 100644 --- a/packages/frontend/apps/electron/src/main/windows-manager/main-window.ts +++ b/packages/frontend/apps/electron/src/main/windows-manager/main-window.ts @@ -269,12 +269,6 @@ export async function openUrlInHiddenWindow(urlObj: URL) { win.webContents.openDevTools(); } - win.on('close', e => { - e.preventDefault(); - if (win && !win.isDestroyed()) { - win.destroy(); - } - }); logger.info('loading page at', url); win.loadURL(url).catch(e => { logger.error('failed to load url', e); diff --git a/packages/frontend/apps/electron/src/main/windows-manager/tab-views.ts b/packages/frontend/apps/electron/src/main/windows-manager/tab-views.ts index 8b65633636..0b113b4b50 100644 --- a/packages/frontend/apps/electron/src/main/windows-manager/tab-views.ts +++ b/packages/frontend/apps/electron/src/main/windows-manager/tab-views.ts @@ -395,7 +395,9 @@ export class WebContentViewsManager { if (this.mainWindow && view) { this.mainWindow.contentView.removeChildView(view); - view?.webContents.close(); + view?.webContents.close({ + waitForBeforeUnload: true, + }); } }, 500); // delay a bit to get rid of the flicker }; diff --git a/packages/frontend/apps/electron/src/main/worker/pool.ts b/packages/frontend/apps/electron/src/main/worker/pool.ts index 2ee3c965cc..63b86323e4 100644 --- a/packages/frontend/apps/electron/src/main/worker/pool.ts +++ b/packages/frontend/apps/electron/src/main/worker/pool.ts @@ -43,13 +43,9 @@ export class WorkerManager { show: false, }); let disconnectHelperProcess: (() => void) | null = null; - worker.on('close', e => { - e.preventDefault(); - if (worker && !worker.isDestroyed()) { - worker.destroy(); - this.workers.delete(key); - disconnectHelperProcess?.(); - } + worker.on('closed', () => { + this.workers.delete(key); + disconnectHelperProcess?.(); }); worker.loadURL(backgroundWorkerViewUrl).catch(e => { logger.error('failed to load url', e); @@ -74,6 +70,7 @@ export class WorkerManager { this.disconnectWorker(key, portId); }); const worker = await this.getOrCreateWorker(key); + worker.ports.add(portId); const { port1: portForWorker, port2: portForRenderer } = new MessageChannelMain(); diff --git a/packages/frontend/apps/ios/src/app.tsx b/packages/frontend/apps/ios/src/app.tsx index c304210e2a..68a0761f1c 100644 --- a/packages/frontend/apps/ios/src/app.tsx +++ b/packages/frontend/apps/ios/src/app.tsx @@ -27,7 +27,7 @@ import { configureBrowserWorkbenchModule } from '@affine/core/modules/workbench' import { WorkspacesService } from '@affine/core/modules/workspace'; import { configureBrowserWorkspaceFlavours } from '@affine/core/modules/workspace-engine'; import { I18n } from '@affine/i18n'; -import { WorkerClient } from '@affine/nbstore/worker/client'; +import { StoreManagerClient } from '@affine/nbstore/worker/client'; import { defaultBlockMarkdownAdapterMatchers, docLinkBaseURLMiddleware, @@ -56,6 +56,11 @@ import { Intelligents } from './plugins/intelligents'; import { NbStoreNativeDBApis } from './plugins/nbstore'; import { enableNavigationGesture$ } from './web-navigation-control'; +const storeManagerClient = createStoreManagerClient(); +window.addEventListener('beforeunload', () => { + storeManagerClient.dispose(); +}); + const future = { v7_startTransition: true, } as const; @@ -67,46 +72,12 @@ configureLocalStorageStateStorageImpls(framework); configureBrowserWorkspaceFlavours(framework); configureMobileModules(framework); framework.impl(NbstoreProvider, { - openStore(_key, options) { - const worker = new Worker( - new URL( - /* webpackChunkName: "nbstore-worker" */ './worker.ts', - import.meta.url - ) - ); - const { port1: nativeDBApiChannelServer, port2: nativeDBApiChannelClient } = - new MessageChannel(); - AsyncCall(NbStoreNativeDBApis, { - channel: { - on(listener) { - const f = (e: MessageEvent) => { - listener(e.data); - }; - nativeDBApiChannelServer.addEventListener('message', f); - return () => { - nativeDBApiChannelServer.removeEventListener('message', f); - }; - }, - send(data) { - nativeDBApiChannelServer.postMessage(data); - }, - }, - log: false, - }); - nativeDBApiChannelServer.start(); - worker.postMessage( - { - type: 'native-db-api-channel', - port: nativeDBApiChannelClient, - }, - [nativeDBApiChannelClient] - ); - const client = new WorkerClient(new OpClient(worker), options); + openStore(key, options) { + const { store, dispose } = storeManagerClient.open(key, options); return { - store: client, + store, dispose: () => { - worker.terminate(); - nativeDBApiChannelServer.close(); + dispose(); }, }; }, @@ -336,3 +307,40 @@ export function App() { ); } + +function createStoreManagerClient() { + const worker = new Worker( + new URL( + /* webpackChunkName: "nbstore-worker" */ './worker.ts', + import.meta.url + ) + ); + const { port1: nativeDBApiChannelServer, port2: nativeDBApiChannelClient } = + new MessageChannel(); + AsyncCall(NbStoreNativeDBApis, { + channel: { + on(listener) { + const f = (e: MessageEvent) => { + listener(e.data); + }; + nativeDBApiChannelServer.addEventListener('message', f); + return () => { + nativeDBApiChannelServer.removeEventListener('message', f); + }; + }, + send(data) { + nativeDBApiChannelServer.postMessage(data); + }, + }, + log: false, + }); + nativeDBApiChannelServer.start(); + worker.postMessage( + { + type: 'native-db-api-channel', + port: nativeDBApiChannelClient, + }, + [nativeDBApiChannelClient] + ); + return new StoreManagerClient(new OpClient(worker)); +} diff --git a/packages/frontend/apps/ios/src/worker.ts b/packages/frontend/apps/ios/src/worker.ts index 65f7ffe38a..e55b69dd8a 100644 --- a/packages/frontend/apps/ios/src/worker.ts +++ b/packages/frontend/apps/ios/src/worker.ts @@ -8,8 +8,8 @@ import { sqliteStorages, } from '@affine/nbstore/sqlite'; import { - WorkerConsumer, - type WorkerOps, + StoreManagerConsumer, + type WorkerManagerOps, } from '@affine/nbstore/worker/consumer'; import { type MessageCommunicapable, OpConsumer } from '@toeverything/infra/op'; import { AsyncCall } from 'async-call-rpc'; @@ -41,12 +41,14 @@ globalThis.addEventListener('message', e => { } }); -const consumer = new OpConsumer(globalThis as MessageCommunicapable); +const consumer = new OpConsumer( + globalThis as MessageCommunicapable +); -const worker = new WorkerConsumer([ +const storeManager = new StoreManagerConsumer([ ...sqliteStorages, ...broadcastChannelStorages, ...cloudStorages, ]); -worker.bindConsumer(consumer); +storeManager.bindConsumer(consumer); diff --git a/packages/frontend/apps/mobile/src/app.tsx b/packages/frontend/apps/mobile/src/app.tsx index e0d4997e35..9625cc503e 100644 --- a/packages/frontend/apps/mobile/src/app.tsx +++ b/packages/frontend/apps/mobile/src/app.tsx @@ -13,12 +13,30 @@ import { import { PopupWindowProvider } from '@affine/core/modules/url'; import { configureBrowserWorkbenchModule } from '@affine/core/modules/workbench'; import { configureBrowserWorkspaceFlavours } from '@affine/core/modules/workspace-engine'; -import { WorkerClient } from '@affine/nbstore/worker/client'; +import { StoreManagerClient } from '@affine/nbstore/worker/client'; import { Framework, FrameworkRoot, getCurrentStore } from '@toeverything/infra'; import { OpClient } from '@toeverything/infra/op'; import { Suspense } from 'react'; import { RouterProvider } from 'react-router-dom'; +let storeManagerClient: StoreManagerClient; + +if (window.SharedWorker) { + const worker = new SharedWorker( + new URL(/* webpackChunkName: "nbstore" */ './nbstore.ts', import.meta.url), + { name: 'affine-shared-worker' } + ); + storeManagerClient = new StoreManagerClient(new OpClient(worker.port)); +} else { + const worker = new Worker( + new URL(/* webpackChunkName: "nbstore" */ './nbstore.ts', import.meta.url) + ); + storeManagerClient = new StoreManagerClient(new OpClient(worker)); +} +window.addEventListener('beforeunload', () => { + storeManagerClient.dispose(); +}); + const future = { v7_startTransition: true, } as const; @@ -31,38 +49,13 @@ configureBrowserWorkspaceFlavours(framework); configureMobileModules(framework); framework.impl(NbstoreProvider, { openStore(key, options) { - if (window.SharedWorker) { - const worker = new SharedWorker( - new URL( - /* webpackChunkName: "nbstore" */ './nbstore.ts', - import.meta.url - ), - { name: key } - ); - const client = new WorkerClient(new OpClient(worker.port), options); - worker.port.start(); - return { - store: client, - dispose: () => { - worker.port.postMessage({ type: '__close__' }); - worker.port.close(); - }, - }; - } else { - const worker = new Worker( - new URL( - /* webpackChunkName: "nbstore" */ './nbstore.ts', - import.meta.url - ) - ); - const client = new WorkerClient(new OpClient(worker), options); - return { - store: client, - dispose: () => { - worker.terminate(); - }, - }; - } + const { store, dispose } = storeManagerClient.open(key, options); + return { + store: store, + dispose: () => { + dispose(); + }, + }; }, }); framework.impl(PopupWindowProvider, { diff --git a/packages/frontend/apps/mobile/src/nbstore.ts b/packages/frontend/apps/mobile/src/nbstore.ts index df075f9400..caea0d8d4f 100644 --- a/packages/frontend/apps/mobile/src/nbstore.ts +++ b/packages/frontend/apps/mobile/src/nbstore.ts @@ -5,12 +5,12 @@ import { cloudStorages } from '@affine/nbstore/cloud'; import { idbStorages } from '@affine/nbstore/idb'; import { idbV1Storages } from '@affine/nbstore/idb/v1'; import { - WorkerConsumer, - type WorkerOps, + StoreManagerConsumer, + type WorkerManagerOps, } from '@affine/nbstore/worker/consumer'; import { type MessageCommunicapable, OpConsumer } from '@toeverything/infra/op'; -const consumer = new WorkerConsumer([ +const consumer = new StoreManagerConsumer([ ...idbStorages, ...idbV1Storages, ...broadcastChannelStorages, @@ -19,28 +19,14 @@ const consumer = new WorkerConsumer([ if ('onconnect' in globalThis) { // if in shared worker - let activeConnectionCount = 0; (globalThis as any).onconnect = (event: MessageEvent) => { - activeConnectionCount++; const port = event.ports[0]; - port.addEventListener('message', (event: MessageEvent) => { - if (event.data.type === '__close__') { - activeConnectionCount--; - if (activeConnectionCount === 0) { - globalThis.close(); - } - } - }); - - const opConsumer = new OpConsumer(port); - consumer.bindConsumer(opConsumer); + consumer.bindConsumer(new OpConsumer(port)); }; } else { // if in worker - const opConsumer = new OpConsumer( - globalThis as MessageCommunicapable + consumer.bindConsumer( + new OpConsumer(globalThis as MessageCommunicapable) ); - - consumer.bindConsumer(opConsumer); } diff --git a/packages/frontend/apps/web/src/app.tsx b/packages/frontend/apps/web/src/app.tsx index 3391230928..8e82d8d147 100644 --- a/packages/frontend/apps/web/src/app.tsx +++ b/packages/frontend/apps/web/src/app.tsx @@ -12,7 +12,7 @@ import { PopupWindowProvider } from '@affine/core/modules/url'; import { configureBrowserWorkbenchModule } from '@affine/core/modules/workbench'; import { configureBrowserWorkspaceFlavours } from '@affine/core/modules/workspace-engine'; import createEmotionCache from '@affine/core/utils/create-emotion-cache'; -import { WorkerClient } from '@affine/nbstore/worker/client'; +import { StoreManagerClient } from '@affine/nbstore/worker/client'; import { CacheProvider } from '@emotion/react'; import { Framework, FrameworkRoot, getCurrentStore } from '@toeverything/infra'; import { OpClient } from '@toeverything/infra/op'; @@ -21,6 +21,24 @@ import { RouterProvider } from 'react-router-dom'; const cache = createEmotionCache(); +let storeManagerClient: StoreManagerClient; + +if (window.SharedWorker) { + const worker = new SharedWorker( + new URL(/* webpackChunkName: "nbstore" */ './nbstore.ts', import.meta.url), + { name: 'affine-shared-worker' } + ); + storeManagerClient = new StoreManagerClient(new OpClient(worker.port)); +} else { + const worker = new Worker( + new URL(/* webpackChunkName: "nbstore" */ './nbstore.ts', import.meta.url) + ); + storeManagerClient = new StoreManagerClient(new OpClient(worker)); +} +window.addEventListener('beforeunload', () => { + storeManagerClient.dispose(); +}); + const future = { v7_startTransition: true, } as const; @@ -32,37 +50,7 @@ configureLocalStorageStateStorageImpls(framework); configureBrowserWorkspaceFlavours(framework); framework.impl(NbstoreProvider, { openStore(key, options) { - if (window.SharedWorker) { - const worker = new SharedWorker( - new URL( - /* webpackChunkName: "nbstore" */ './nbstore.ts', - import.meta.url - ), - { name: key } - ); - const client = new WorkerClient(new OpClient(worker.port), options); - return { - store: client, - dispose: () => { - worker.port.postMessage({ type: '__close__' }); - worker.port.close(); - }, - }; - } else { - const worker = new Worker( - new URL( - /* webpackChunkName: "nbstore" */ './nbstore.ts', - import.meta.url - ) - ); - const client = new WorkerClient(new OpClient(worker), options); - return { - store: client, - dispose: () => { - worker.terminate(); - }, - }; - } + return storeManagerClient.open(key, options); }, }); framework.impl(PopupWindowProvider, { diff --git a/packages/frontend/apps/web/src/nbstore.ts b/packages/frontend/apps/web/src/nbstore.ts index df075f9400..caea0d8d4f 100644 --- a/packages/frontend/apps/web/src/nbstore.ts +++ b/packages/frontend/apps/web/src/nbstore.ts @@ -5,12 +5,12 @@ import { cloudStorages } from '@affine/nbstore/cloud'; import { idbStorages } from '@affine/nbstore/idb'; import { idbV1Storages } from '@affine/nbstore/idb/v1'; import { - WorkerConsumer, - type WorkerOps, + StoreManagerConsumer, + type WorkerManagerOps, } from '@affine/nbstore/worker/consumer'; import { type MessageCommunicapable, OpConsumer } from '@toeverything/infra/op'; -const consumer = new WorkerConsumer([ +const consumer = new StoreManagerConsumer([ ...idbStorages, ...idbV1Storages, ...broadcastChannelStorages, @@ -19,28 +19,14 @@ const consumer = new WorkerConsumer([ if ('onconnect' in globalThis) { // if in shared worker - let activeConnectionCount = 0; (globalThis as any).onconnect = (event: MessageEvent) => { - activeConnectionCount++; const port = event.ports[0]; - port.addEventListener('message', (event: MessageEvent) => { - if (event.data.type === '__close__') { - activeConnectionCount--; - if (activeConnectionCount === 0) { - globalThis.close(); - } - } - }); - - const opConsumer = new OpConsumer(port); - consumer.bindConsumer(opConsumer); + consumer.bindConsumer(new OpConsumer(port)); }; } else { // if in worker - const opConsumer = new OpConsumer( - globalThis as MessageCommunicapable + consumer.bindConsumer( + new OpConsumer(globalThis as MessageCommunicapable) ); - - consumer.bindConsumer(opConsumer); } diff --git a/packages/frontend/core/src/bootstrap/browser.ts b/packages/frontend/core/src/bootstrap/browser.ts index 5e824eaa70..35073fbc88 100644 --- a/packages/frontend/core/src/bootstrap/browser.ts +++ b/packages/frontend/core/src/bootstrap/browser.ts @@ -1,5 +1,4 @@ // ORDER MATTERS import './env'; import './public-path'; -import './shared-worker'; import './polyfill/browser'; diff --git a/packages/frontend/core/src/bootstrap/electron.ts b/packages/frontend/core/src/bootstrap/electron.ts index 5a8e151d8d..f0391ff462 100644 --- a/packages/frontend/core/src/bootstrap/electron.ts +++ b/packages/frontend/core/src/bootstrap/electron.ts @@ -1,5 +1,4 @@ // ORDER MATTERS import './env'; import './public-path'; -import './shared-worker'; import './polyfill/electron'; diff --git a/packages/frontend/core/src/bootstrap/shared-worker.ts b/packages/frontend/core/src/bootstrap/shared-worker.ts deleted file mode 100644 index 8ae7500a18..0000000000 --- a/packages/frontend/core/src/bootstrap/shared-worker.ts +++ /dev/null @@ -1,25 +0,0 @@ -/** - * This is a wrapper for SharedWorker, - * added the `name` parameter to the `SharedWorker` URL so that - * multiple `SharedWorkers` can share one script file. - */ -const rawSharedWorker = globalThis.SharedWorker; - -// TODO(@eyhn): remove this when we can use single shared worker for all workspaces -function PatchedSharedWorker( - urlParam: URL | string, - options?: string | { name: string } -) { - const url = typeof urlParam === 'string' ? new URL(urlParam) : urlParam; - if (options) { - url.searchParams.append( - typeof options === 'string' ? options : options.name, - '' - ); - } - return new rawSharedWorker(url, options); -} -// if SharedWorker is not supported, do nothing -if (rawSharedWorker) { - globalThis.SharedWorker = PatchedSharedWorker as any; -} diff --git a/packages/frontend/core/src/modules/cloud/constant.ts b/packages/frontend/core/src/modules/cloud/constant.ts index 239b1c3752..89efeb0850 100644 --- a/packages/frontend/core/src/modules/cloud/constant.ts +++ b/packages/frontend/core/src/modules/cloud/constant.ts @@ -33,7 +33,9 @@ export const BUILD_IN_SERVERS: (ServerMetadata & { config: ServerConfig })[] = ? [ { id: 'affine-cloud', - baseUrl: location.origin, + baseUrl: BUILD_CONFIG.isElectron + ? 'http://localhost:8080' + : location.origin, config: { serverName: 'Affine Cloud', features: [ diff --git a/packages/frontend/core/src/modules/storage/providers/nbstore.ts b/packages/frontend/core/src/modules/storage/providers/nbstore.ts index 9d2a3cac95..756ecd779f 100644 --- a/packages/frontend/core/src/modules/storage/providers/nbstore.ts +++ b/packages/frontend/core/src/modules/storage/providers/nbstore.ts @@ -1,5 +1,5 @@ import type { - WorkerClient, + StoreClient, WorkerInitOptions, } from '@affine/nbstore/worker/client'; import { createIdentifier } from '@toeverything/infra'; @@ -17,7 +17,7 @@ export interface NbstoreProvider { key: string, options: WorkerInitOptions ): { - store: WorkerClient; + store: StoreClient; dispose: () => void; }; } diff --git a/packages/frontend/core/src/modules/userspace/entities/current-user-db.ts b/packages/frontend/core/src/modules/userspace/entities/current-user-db.ts index b60ee4c880..aaa8da4bc2 100644 --- a/packages/frontend/core/src/modules/userspace/entities/current-user-db.ts +++ b/packages/frontend/core/src/modules/userspace/entities/current-user-db.ts @@ -1,8 +1,9 @@ import { Entity, LiveData } from '@toeverything/infra'; -import { finalize, of, switchMap } from 'rxjs'; +import { Observable, of, switchMap } from 'rxjs'; import type { AuthService } from '../../cloud'; import type { UserspaceService } from '../services/userspace'; +import type { UserDBWithTables } from './user-db'; export class CurrentUserDB extends Entity { constructor( @@ -19,11 +20,12 @@ export class CurrentUserDB extends Entity { switchMap(userId => { if (userId) { const ref = this.userDBService.openDB(userId); - return of(ref.obj).pipe( - finalize(() => { + return new Observable(subscriber => { + subscriber.next(ref.obj); + return () => { ref.release(); - }) - ); + }; + }); } else { return of(null); } diff --git a/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts b/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts index 375720bc23..04c590ee61 100644 --- a/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts +++ b/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts @@ -1,6 +1,6 @@ import { IndexedDBDocStorage } from '@affine/nbstore/idb'; import { SqliteDocStorage } from '@affine/nbstore/sqlite'; -import type { WorkerClient } from '@affine/nbstore/worker/client'; +import type { StoreClient } from '@affine/nbstore/worker/client'; import { Entity } from '@toeverything/infra'; import type { ServerService } from '../../cloud'; @@ -10,7 +10,7 @@ export class UserDBEngine extends Entity<{ userId: string; }> { private readonly userId = this.props.userId; - readonly client: WorkerClient; + readonly client: StoreClient; DocStorageType = BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS diff --git a/packages/frontend/core/src/modules/userspace/entities/user-db.ts b/packages/frontend/core/src/modules/userspace/entities/user-db.ts index a436b56a76..067590151b 100644 --- a/packages/frontend/core/src/modules/userspace/entities/user-db.ts +++ b/packages/frontend/core/src/modules/userspace/entities/user-db.ts @@ -39,6 +39,10 @@ export class UserDB extends Entity<{ }); }); } + + override dispose(): void { + this.engine.dispose(); + } } export type UserDBWithTables = UserDB & { diff --git a/packages/frontend/core/src/modules/workspace/entities/engine.ts b/packages/frontend/core/src/modules/workspace/entities/engine.ts index 324cbf62ea..f2d8ec7dd6 100644 --- a/packages/frontend/core/src/modules/workspace/entities/engine.ts +++ b/packages/frontend/core/src/modules/workspace/entities/engine.ts @@ -1,5 +1,5 @@ import type { - WorkerClient, + StoreClient, WorkerInitOptions, } from '@affine/nbstore/worker/client'; import { Entity } from '@toeverything/infra'; @@ -12,7 +12,7 @@ export class WorkspaceEngine extends Entity<{ isSharedMode?: boolean; engineWorkerInitOptions: WorkerInitOptions; }> { - worker?: WorkerClient; + client?: StoreClient; started = false; constructor( @@ -23,24 +23,24 @@ export class WorkspaceEngine extends Entity<{ } get doc() { - if (!this.worker) { + if (!this.client) { throw new Error('Engine is not initialized'); } - return this.worker.docFrontend; + return this.client.docFrontend; } get blob() { - if (!this.worker) { + if (!this.client) { throw new Error('Engine is not initialized'); } - return this.worker.blobFrontend; + return this.client.blobFrontend; } get awareness() { - if (!this.worker) { + if (!this.client) { throw new Error('Engine is not initialized'); } - return this.worker.awarenessFrontend; + return this.client.awarenessFrontend; } start() { @@ -54,7 +54,7 @@ export class WorkspaceEngine extends Entity<{ `workspace:${this.workspaceService.workspace.flavour}:${this.workspaceService.workspace.id}`, this.props.engineWorkerInitOptions ); - this.worker = store; + this.client = store; this.disposables.push(dispose); this.eventBus.emit(WorkspaceEngineBeforeStart, this);