feat(nbstore): share worker between workspaces (#9947)

This commit is contained in:
EYHN
2025-02-05 07:57:03 +00:00
parent 972d76d685
commit ee0cfe4dc7
30 changed files with 430 additions and 434 deletions

View File

@@ -1,4 +1,3 @@
import { share } from '../../connection';
import {
type AwarenessRecord,
AwarenessStorageBase,
@@ -23,10 +22,10 @@ export class CloudAwarenessStorage extends AwarenessStorageBase {
super();
}
connection = share(new SocketConnection(`${this.options.serverBaseUrl}/`));
connection = new SocketConnection(`${this.options.serverBaseUrl}/`);
private get socket() {
return this.connection.inner;
return this.connection.inner.socket;
}
override async update(record: AwarenessRecord): Promise<void> {

View File

@@ -25,7 +25,7 @@ export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
static readonly identifier = 'CloudDocStorage';
get socket() {
return this.connection.inner;
return this.connection.inner.socket;
}
get idConverter() {
if (!this.connection.idConverter) {
@@ -191,7 +191,7 @@ class CloudDocStorageConnection extends SocketConnection {
idConverter: IdConverter | null = null;
override async doConnect(signal?: AbortSignal) {
const socket = await super.doConnect(signal);
const { socket, disconnect } = await super.doConnect(signal);
try {
const res = await socket.emitWithAck('space:join', {
@@ -210,20 +210,26 @@ class CloudDocStorageConnection extends SocketConnection {
socket.on('space:broadcast-doc-update', this.onServerUpdate);
return socket;
return { socket, disconnect };
} catch (e) {
socket.close();
disconnect();
throw e;
}
}
override doDisconnect(socket: Socket) {
override doDisconnect({
socket,
disconnect,
}: {
socket: Socket;
disconnect: () => void;
}) {
socket.emit('space:leave', {
spaceType: this.options.type,
spaceId: this.options.id,
});
socket.off('space:broadcast-doc-update', this.onServerUpdate);
super.disconnect();
super.doDisconnect({ socket, disconnect });
}
async getIdConverter(socket: Socket) {

View File

@@ -1,7 +1,6 @@
import {
Manager as SocketIOManager,
type Socket as SocketIO,
type SocketOptions,
} from 'socket.io-client';
import { AutoReconnectConnection } from '../../connection';
@@ -151,49 +150,78 @@ export function base64ToUint8Array(base64: string) {
return new Uint8Array(binaryArray);
}
const SOCKET_IOMANAGER_CACHE = new Map<string, SocketIOManager>();
function getSocketIOManager(endpoint: string) {
let manager = SOCKET_IOMANAGER_CACHE.get(endpoint);
if (!manager) {
manager = new SocketIOManager(endpoint, {
class SocketManager {
private readonly socketIOManager: SocketIOManager;
socket: Socket;
refCount = 0;
constructor(endpoint: string) {
this.socketIOManager = new SocketIOManager(endpoint, {
autoConnect: false,
transports: ['websocket'],
secure: new URL(endpoint).protocol === 'https:',
// we will handle reconnection by ourselves
reconnection: false,
});
SOCKET_IOMANAGER_CACHE.set(endpoint, manager);
this.socket = this.socketIOManager.socket('/');
}
connect() {
let disconnected = false;
this.refCount++;
this.socket.connect();
return {
socket: this.socket,
disconnect: () => {
if (disconnected) {
return;
}
disconnected = true;
this.refCount--;
if (this.refCount === 0) {
this.socket.disconnect();
}
},
};
}
}
const SOCKET_MANAGER_CACHE = new Map<string, SocketManager>();
function getSocketManager(endpoint: string) {
let manager = SOCKET_MANAGER_CACHE.get(endpoint);
if (!manager) {
manager = new SocketManager(endpoint);
SOCKET_MANAGER_CACHE.set(endpoint, manager);
}
return manager;
}
export class SocketConnection extends AutoReconnectConnection<Socket> {
manager = getSocketIOManager(this.endpoint);
export class SocketConnection extends AutoReconnectConnection<{
socket: Socket;
disconnect: () => void;
}> {
manager = getSocketManager(this.endpoint);
constructor(
private readonly endpoint: string,
private readonly socketOptions?: SocketOptions
) {
constructor(private readonly endpoint: string) {
super();
}
override get shareId() {
return `socket:${this.endpoint}`;
}
override async doConnect(signal?: AbortSignal) {
const socket = this.manager.socket('/', this.socketOptions);
const { socket, disconnect } = this.manager.connect();
try {
throwIfAborted(signal);
await Promise.race([
new Promise<void>((resolve, reject) => {
if (socket.connected) {
resolve();
return;
}
socket.once('connect', () => {
resolve();
});
socket.once('connect_error', err => {
reject(err);
});
socket.open();
}),
new Promise<void>((_resolve, reject) => {
signal?.addEventListener('abort', () => {
@@ -202,18 +230,21 @@ export class SocketConnection extends AutoReconnectConnection<Socket> {
}),
]);
} catch (err) {
socket.close();
disconnect();
throw err;
}
socket.on('disconnect', this.handleDisconnect);
return socket;
return {
socket,
disconnect,
};
}
override doDisconnect(conn: Socket) {
conn.off('disconnect', this.handleDisconnect);
conn.close();
override doDisconnect(conn: { socket: Socket; disconnect: () => void }) {
conn.socket.off('disconnect', this.handleDisconnect);
conn.disconnect();
}
handleDisconnect = (reason: SocketIO.DisconnectReason) => {

View File

@@ -15,7 +15,7 @@ export class SqliteDocStorage extends DocStorageBase<SqliteNativeDBOptions> {
return this.connection.apis;
}
override async pushDocUpdate(update: DocUpdate) {
override async pushDocUpdate(update: DocUpdate, origin?: string) {
const timestamp = await this.db.pushUpdate(update.docId, update.bin);
this.emit(

View File

@@ -1,4 +1,4 @@
import type { OpClient } from '@toeverything/infra/op';
import { OpClient, transfer } from '@toeverything/infra/op';
import { DummyConnection } from '../connection';
import { AwarenessFrontend, BlobFrontend, DocFrontend } from '../frontend';
@@ -14,18 +14,68 @@ import {
import type { AwarenessSync } from '../sync/awareness';
import type { BlobSync } from '../sync/blob';
import type { DocSync } from '../sync/doc';
import type { WorkerInitOptions, WorkerOps } from './ops';
import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
export type { WorkerInitOptions } from './ops';
export type { StoreInitOptions as WorkerInitOptions } from './ops';
export class WorkerClient {
constructor(
private readonly client: OpClient<WorkerOps>,
options: WorkerInitOptions
) {
this.client.call('worker.init', options).catch(err => {
console.error('error initializing worker', err);
export class StoreManagerClient {
private readonly connections = new Map<
string,
{
store: StoreClient;
dispose: () => void;
}
>();
constructor(private readonly client: OpClient<WorkerManagerOps>) {}
open(key: string, options: StoreInitOptions) {
const { port1, port2 } = new MessageChannel();
const client = new OpClient<WorkerOps>(port1);
const closeKey = crypto.randomUUID();
this.client
.call(
'open',
transfer(
{
key,
closeKey,
options,
port: port2,
},
[port2]
)
)
.catch(err => {
console.error('error opening', err);
});
const connection = {
store: new StoreClient(client),
dispose: () => {
this.client.call('close', closeKey).catch(err => {
console.error('error closing', err);
});
this.connections.delete(closeKey);
},
};
this.connections.set(closeKey, connection);
return connection;
}
dispose() {
this.connections.forEach(connection => {
connection.dispose();
});
}
}
export class StoreClient {
constructor(private readonly client: OpClient<WorkerOps>) {
this.docStorage = new WorkerDocStorage(this.client);
this.blobStorage = new WorkerBlobStorage(this.client);
this.docSync = new WorkerDocSync(this.client);

View File

@@ -1,5 +1,5 @@
import { MANUALLY_STOP } from '@toeverything/infra';
import type { OpConsumer } from '@toeverything/infra/op';
import { OpConsumer } from '@toeverything/infra/op';
import { Observable } from 'rxjs';
import { type StorageConstructor } from '../impls';
@@ -7,14 +7,13 @@ import { SpaceStorage } from '../storage';
import type { AwarenessRecord } from '../storage/awareness';
import { Sync } from '../sync';
import type { PeerStorageOptions } from '../sync/types';
import type { WorkerInitOptions, WorkerOps } from './ops';
import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
export type { WorkerOps };
export type { WorkerManagerOps };
export class WorkerConsumer {
private inited = false;
private storages: PeerStorageOptions<SpaceStorage> | null = null;
private sync: Sync | null = null;
class StoreConsumer {
private readonly storages: PeerStorageOptions<SpaceStorage>;
private readonly sync: Sync;
get ensureLocal() {
if (!this.storages) {
@@ -59,18 +58,9 @@ export class WorkerConsumer {
}
constructor(
private readonly availableStorageImplementations: StorageConstructor[]
) {}
bindConsumer(consumer: OpConsumer<WorkerOps>) {
this.registerHandlers(consumer);
}
init(init: WorkerInitOptions) {
if (this.inited) {
return;
}
this.inited = true;
private readonly availableStorageImplementations: StorageConstructor[],
init: StoreInitOptions
) {
this.storages = {
local: new SpaceStorage(
Object.fromEntries(
@@ -122,6 +112,10 @@ export class WorkerConsumer {
this.sync.start();
}
bindConsumer(consumer: OpConsumer<WorkerOps>) {
this.registerHandlers(consumer);
}
async destroy() {
this.sync?.stop();
this.storages?.local.disconnect();
@@ -139,8 +133,6 @@ export class WorkerConsumer {
>();
let collectId = 0;
consumer.registerAll({
'worker.init': this.init.bind(this),
'worker.destroy': this.destroy.bind(this),
'docStorage.getDoc': (docId: string) => this.docStorage.getDoc(docId),
'docStorage.getDocDiff': ({ docId, state }) =>
this.docStorage.getDocDiff(docId, state),
@@ -298,3 +290,61 @@ export class WorkerConsumer {
});
}
}
export class StoreManagerConsumer {
private readonly storeDisposers = new Map<string, () => void>();
private readonly storePool = new Map<
string,
{ store: StoreConsumer; refCount: number }
>();
constructor(
private readonly availableStorageImplementations: StorageConstructor[]
) {}
bindConsumer(consumer: OpConsumer<WorkerManagerOps>) {
this.registerHandlers(consumer);
}
private registerHandlers(consumer: OpConsumer<WorkerManagerOps>) {
consumer.registerAll({
open: ({ port, key, closeKey, options }) => {
console.debug('open store', key, closeKey);
let storeRef = this.storePool.get(key);
if (!storeRef) {
const store = new StoreConsumer(
this.availableStorageImplementations,
options
);
storeRef = { store, refCount: 0 };
}
storeRef.refCount++;
const workerConsumer = new OpConsumer<WorkerOps>(port);
storeRef.store.bindConsumer(workerConsumer);
this.storeDisposers.set(closeKey, () => {
storeRef.refCount--;
if (storeRef.refCount === 0) {
storeRef.store.destroy().catch(error => {
console.error(error);
});
this.storePool.delete(key);
}
});
this.storePool.set(key, storeRef);
return closeKey;
},
close: key => {
console.debug('close store', key);
const workerDisposer = this.storeDisposers.get(key);
if (!workerDisposer) {
throw new Error('Worker not found');
}
workerDisposer();
this.storeDisposers.delete(key);
},
});
}
}

View File

@@ -20,17 +20,12 @@ type StorageInitOptions = Values<{
};
}>;
export interface WorkerInitOptions {
export interface StoreInitOptions {
local: { [key in StorageType]?: StorageInitOptions };
remotes: Record<string, { [key in StorageType]?: StorageInitOptions }>;
}
interface GroupedWorkerOps {
worker: {
init: [WorkerInitOptions, void];
destroy: [void, void];
};
docStorage: {
getDoc: [string, DocRecord | null];
getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null];
@@ -132,3 +127,16 @@ export type WorkerOps = UnionToIntersection<
}>
>
>;
export type WorkerManagerOps = {
open: [
{
port: MessagePort;
key: string;
closeKey: string;
options: StoreInitOptions;
},
string,
];
close: [string, void];
};