From d6618b6891a4356f9fd411efa1133f978590d320 Mon Sep 17 00:00:00 2001 From: liuyi Date: Sat, 9 Nov 2024 15:23:38 +0800 Subject: [PATCH] feat(infra): introduce op pattern (#8734) --- packages/common/infra/package.json | 1 + packages/common/infra/src/op/README.md | 159 +++++++++++++ .../infra/src/op/__tests__/client.spec.ts | 215 ++++++++++++++++++ .../infra/src/op/__tests__/consumer.spec.ts | 197 ++++++++++++++++ .../infra/src/op/__tests__/message.spec.ts | 76 +++++++ packages/common/infra/src/op/client.ts | 206 +++++++++++++++++ packages/common/infra/src/op/consumer.ts | 188 +++++++++++++++ packages/common/infra/src/op/index.ts | 4 + packages/common/infra/src/op/message.ts | 155 +++++++++++++ packages/common/infra/src/op/types.ts | 36 +++ yarn.lock | 3 +- 11 files changed, 1239 insertions(+), 1 deletion(-) create mode 100644 packages/common/infra/src/op/README.md create mode 100644 packages/common/infra/src/op/__tests__/client.spec.ts create mode 100644 packages/common/infra/src/op/__tests__/consumer.spec.ts create mode 100644 packages/common/infra/src/op/__tests__/message.spec.ts create mode 100644 packages/common/infra/src/op/client.ts create mode 100644 packages/common/infra/src/op/consumer.ts create mode 100644 packages/common/infra/src/op/index.ts create mode 100644 packages/common/infra/src/op/message.ts create mode 100644 packages/common/infra/src/op/types.ts diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index 9e1a68e0c2..a2a471ac0b 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -16,6 +16,7 @@ "@affine/templates": "workspace:*", "@blocksuite/affine": "0.17.28", "@datastructures-js/binary-search-tree": "^5.3.2", + "eventemitter2": "^6.4.9", "foxact": "^0.2.33", "fractional-indexing": "^3.2.0", "fuse.js": "^7.0.0", diff --git a/packages/common/infra/src/op/README.md b/packages/common/infra/src/op/README.md new file mode 100644 index 0000000000..ba7f92af76 --- /dev/null +++ b/packages/common/infra/src/op/README.md @@ -0,0 +1,159 @@ +# Introduction + +Operation Pattern is a tiny `RPC` framework available both in frontend and backend. + +It introduces super simple call and listen signatures to make Worker, cross tabs SharedWorker or BroadcastChannel easier to use and reduce boilerplate. + +# usage + +## Register Op Handlers + +### Function call handler + +```ts +interface Ops extends OpSchema { + add: [{ a: number; b: number }, number] +} + +// register +const consumer: OpConsumer; +consumer.register('add', ({ a, b }) => a + b); + +// call +const client: OpClient; +const ret = client.call('add', { a: 1, b: 2 })); // Promise<3> +``` + +### Stream call handler + +```ts +interface Ops extends OpSchema { + subscribeStatus: [number, string]; +} + +// register +const consumer: OpConsumer; +consumer.register('subscribeStatus', (id: number) => { + return interval(3000).pipe(map(() => 'connected')); +}); + +// subscribe +const client: OpClient; +client.subscribe('subscribeStatus', 123, { + next: status => { + ui.setServerStatus(status); + }, + error: error => { + ui.setServerError(error); + }, + complete: () => { + // + }, +}); +``` + +### Transfer variables + +> [Transferable Objects](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects) + +#### Client transferables + +```ts +interface Ops extends OpSchema { + heavyWork: [{ name: string; data: Uint8Array; data2: Uint8Array }, void]; +} + +const client: OpClient; +const data = new Uint8Array([1, 2, 3]); +const nonTransferredData = new Uint8Array([1, 2, 3]); +client.call( + 'heavyWork', + transfer( + { + name: '', + data: data, + data2: nonTransferredData, + }, + [data.buffer] + ) +); + +// after transferring, you can not use the transferred variables anymore!!! +// moved +assertEq(data.byteLength, 0); +// copied +assertEq(nonTransferredData.byteLength, 3); +``` + +#### Consumer transferables + +```ts +interface Ops extends OpSchema { + job: [{ id: string }, Uint8Array]; +} + +const consumer: OpConsumer; +consumer.register('ops', ({ id }) => { + return interval(3000).pipe( + map(() => { + const data = new Uint8Array([1, 2, 3]); + transfer(data, [data.buffer]); + }) + ); +}); +``` + +## Communication + +### BroadcastChannel + +:::CAUTION + +BroadcastChannel doesn't support transfer transferable objects. All data passed through it's `postMessage` api would be structured cloned + +see [Structured_clone_algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm) + +::: + +```ts +const channel = new BroadcastChannel('domain'); +const consumer = new OpConsumer(channel); +consumer.listen(); + +const client = new OpClient(channel); +client.listen(); +``` + +### MessagePort + +```ts +const { port1, port2 } = new MessagePort(); + +const client = new OpClient(port1); +const consumer = new OpConsumer(port2); +``` + +### Worker + +```ts +const worker = new Worker('./xxx-worker'); +const client = new OpClient(worker); + +// in worker +const consumer = new OpConsumer(globalThis); +consumer.listen(); +``` + +### SharedWorker + +```ts +const worker = new SharedWorker('./xxx-worker'); +const client = new OpClient(worker.port); + +// in worker +globalThis.addEventListener('connect', event => { + const port = event.ports[0]; + const consumer = new OpConsumer(port); + consumer.listen(); +}); +``` diff --git a/packages/common/infra/src/op/__tests__/client.spec.ts b/packages/common/infra/src/op/__tests__/client.spec.ts new file mode 100644 index 0000000000..2803d0ba5d --- /dev/null +++ b/packages/common/infra/src/op/__tests__/client.spec.ts @@ -0,0 +1,215 @@ +import { afterEach } from 'node:test'; + +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { OpClient } from '../client'; +import { type MessageHandlers, transfer } from '../message'; +import type { OpSchema } from '../types'; + +interface TestOps extends OpSchema { + add: [{ a: number; b: number }, number]; + bin: [Uint8Array, Uint8Array]; + sub: [Uint8Array, number]; +} + +declare module 'vitest' { + interface TestContext { + producer: OpClient; + handlers: MessageHandlers; + postMessage: ReturnType; + } +} + +describe('op client', () => { + beforeEach(ctx => { + const { port1 } = new MessageChannel(); + // @ts-expect-error patch postMessage + port1.postMessage = vi.fn(port1.postMessage); + // @ts-expect-error patch postMessage + ctx.postMessage = port1.postMessage; + ctx.producer = new OpClient(port1); + // @ts-expect-error internal api + ctx.handlers = ctx.producer.handlers; + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('should send call op', async ctx => { + // @ts-expect-error internal api + const pendingCalls = ctx.producer.pendingCalls; + const result = ctx.producer.call('add', { a: 1, b: 2 }); + + expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(` + { + "id": "add:1", + "name": "add", + "payload": { + "a": 1, + "b": 2, + }, + "type": "call", + } + `); + expect(pendingCalls.has('add:1')).toBe(true); + + // fake consumer return + ctx.handlers.return({ type: 'return', id: 'add:1', data: 3 }); + + await expect(result).resolves.toBe(3); + + expect(pendingCalls.has('add:1')).toBe(false); + }); + + it('should transfer transferables with call op', async ctx => { + const data = new Uint8Array([1, 2, 3]); + const result = ctx.producer.call('bin', transfer(data, [data.buffer])); + + expect(ctx.postMessage.mock.calls[0][1].transfer[0]).toBeInstanceOf( + ArrayBuffer + ); + + // fake consumer return + ctx.handlers.return({ + type: 'return', + id: 'bin:1', + data: new Uint8Array([3, 2, 1]), + }); + + await expect(result).resolves.toEqual(new Uint8Array([3, 2, 1])); + expect(data.byteLength).toBe(0); + }); + + it('should cancel call', async ctx => { + const promise = ctx.producer.call('add', { a: 1, b: 2 }); + + promise.cancel(); + + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "id": "add:1", + "type": "cancel", + }, + ] + `); + + await expect(promise).rejects.toThrow('canceled'); + }); + + it('should timeout call', async ctx => { + const promise = ctx.producer.call('add', { a: 1, b: 2 }); + + vi.advanceTimersByTime(4000); + + await expect(promise).rejects.toThrow('timeout'); + }); + + it('should send subscribe op', async ctx => { + let ob = { + next: vi.fn(), + error: vi.fn(), + complete: vi.fn(), + }; + + // @ts-expect-error internal api + const subscriptions = ctx.producer.obs; + ctx.producer.subscribe('sub', new Uint8Array([1, 2, 3]), ob); + + expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(` + { + "id": "sub:1", + "name": "sub", + "payload": Uint8Array [ + 1, + 2, + 3, + ], + "type": "subscribe", + } + `); + expect(subscriptions.has('sub:1')).toBe(true); + + // fake consumer return + ctx.handlers.next({ type: 'next', id: 'sub:1', data: 1 }); + ctx.handlers.next({ type: 'next', id: 'sub:1', data: 2 }); + ctx.handlers.next({ type: 'next', id: 'sub:1', data: 3 }); + + expect(subscriptions.has('sub:1')).toBe(true); + + ctx.handlers.complete({ type: 'complete', id: 'sub:1' }); + + expect(ob.next).toHaveBeenCalledTimes(3); + expect(ob.complete).toHaveBeenCalledTimes(1); + + expect(subscriptions.has('sub:1')).toBe(false); + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "id": "sub:1", + "type": "unsubscribe", + }, + ] + `); + + // smoking + ob = { + next: vi.fn(), + error: vi.fn(), + complete: vi.fn(), + }; + ctx.producer.subscribe('sub', new Uint8Array([1, 2, 3]), ob); + + expect(subscriptions.has('sub:2')).toBe(true); + + ctx.handlers.next({ type: 'next', id: 'sub:2', data: 1 }); + ctx.handlers.error({ + type: 'error', + id: 'sub:2', + error: new Error('test'), + }); + + expect(ob.next).toHaveBeenCalledTimes(1); + expect(ob.error).toHaveBeenCalledTimes(1); + + expect(subscriptions.has('sub')).toBe(false); + }); + + it('should transfer transferables with subscribe op', async ctx => { + const data = new Uint8Array([1, 2, 3]); + const unsubscribe = ctx.producer.subscribe( + 'bin', + transfer(data, [data.buffer]), + { + next: vi.fn(), + } + ); + + expect(data.byteLength).toBe(0); + + unsubscribe(); + }); + + it('should unsubscribe subscription op', ctx => { + const unsubscribe = ctx.producer.subscribe( + 'sub', + new Uint8Array([1, 2, 3]), + { + next: vi.fn(), + } + ); + + unsubscribe(); + + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "id": "sub:1", + "type": "unsubscribe", + }, + ] + `); + }); +}); diff --git a/packages/common/infra/src/op/__tests__/consumer.spec.ts b/packages/common/infra/src/op/__tests__/consumer.spec.ts new file mode 100644 index 0000000000..0c779059b0 --- /dev/null +++ b/packages/common/infra/src/op/__tests__/consumer.spec.ts @@ -0,0 +1,197 @@ +import { afterEach } from 'node:test'; + +import { Observable } from 'rxjs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { OpConsumer } from '../consumer'; +import { type MessageHandlers, transfer } from '../message'; +import type { OpSchema } from '../types'; + +interface TestOps extends OpSchema { + add: [{ a: number; b: number }, number]; + any: [any, any]; +} + +declare module 'vitest' { + interface TestContext { + consumer: OpConsumer; + handlers: MessageHandlers; + postMessage: ReturnType; + } +} + +describe('op consumer', () => { + beforeEach(ctx => { + const { port2 } = new MessageChannel(); + // @ts-expect-error patch postMessage + port2.postMessage = vi.fn(port2.postMessage); + // @ts-expect-error patch postMessage + ctx.postMessage = port2.postMessage; + ctx.consumer = new OpConsumer(port2); + // @ts-expect-error internal api + ctx.handlers = ctx.consumer.handlers; + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('should throw if no handler registered', async ctx => { + ctx.handlers.call({ type: 'call', id: 'add:1', name: 'add', payload: {} }); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "error": [Error: Handler for operation [add] is not registered.], + "id": "add:1", + "type": "return", + }, + ] + `); + }); + + it('should handle call message', async ctx => { + ctx.consumer.register('add', ({ a, b }) => a + b); + + ctx.handlers.call({ + type: 'call', + id: 'add:1', + name: 'add', + payload: { a: 1, b: 2 }, + }); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(` + { + "data": 3, + "id": "add:1", + "type": "return", + } + `); + }); + + it('should handle cancel message', async ctx => { + ctx.consumer.register('add', ({ a, b }, { signal }) => { + const { reject, resolve, promise } = Promise.withResolvers(); + + signal?.addEventListener('abort', () => { + reject(new Error('canceled')); + }); + + setTimeout(() => { + resolve(a + b); + }, Number.MAX_SAFE_INTEGER); + + return promise; + }); + + ctx.handlers.call({ + type: 'call', + id: 'add:1', + name: 'add', + payload: { a: 1, b: 2 }, + }); + ctx.handlers.cancel({ type: 'cancel', id: 'add:1' }); + + await vi.advanceTimersByTimeAsync(1); + + expect(ctx.postMessage).not.toBeCalled(); + }); + + it('should transfer transferables in return', async ctx => { + const data = new Uint8Array([1, 2, 3]); + const nonTransferred = new Uint8Array([4, 5, 6]); + + ctx.consumer.register('any', () => { + return transfer({ data: { data, nonTransferred } }, [data.buffer]); + }); + + ctx.handlers.call({ type: 'call', id: 'any:1', name: 'any', payload: {} }); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage).toHaveBeenCalledOnce(); + + expect(data.byteLength).toBe(0); + expect(nonTransferred.byteLength).toBe(3); + }); + + it('should handle subscribe message', async ctx => { + ctx.consumer.register('any', data => { + return new Observable(observer => { + data.forEach((v: number) => observer.next(v)); + observer.complete(); + }); + }); + + ctx.handlers.subscribe({ + type: 'subscribe', + id: 'any:1', + name: 'any', + payload: transfer(new Uint8Array([1, 2, 3]), [ + new Uint8Array([1, 2, 3]).buffer, + ]), + }); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.calls.map(call => call[0])) + .toMatchInlineSnapshot(` + [ + { + "data": 1, + "id": "any:1", + "type": "next", + }, + { + "data": 2, + "id": "any:1", + "type": "next", + }, + { + "data": 3, + "id": "any:1", + "type": "next", + }, + { + "id": "any:1", + "type": "complete", + }, + ] + `); + }); + + it('should handle unsubscribe message', async ctx => { + ctx.consumer.register('any', data => { + return new Observable(observer => { + data.forEach((v: number) => { + setTimeout(() => { + observer.next(v); + }, 1); + }); + setTimeout(() => { + observer.complete(); + }, 1); + }); + }); + + ctx.handlers.subscribe({ + type: 'subscribe', + id: 'any:1', + name: 'any', + payload: transfer(new Uint8Array([1, 2, 3]), [ + new Uint8Array([1, 2, 3]).buffer, + ]), + }); + + ctx.handlers.unsubscribe({ type: 'unsubscribe', id: 'any:1' }); + + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.calls).toMatchInlineSnapshot(` + [ + [ + { + "id": "any:1", + "type": "complete", + }, + ], + ] + `); + }); +}); diff --git a/packages/common/infra/src/op/__tests__/message.spec.ts b/packages/common/infra/src/op/__tests__/message.spec.ts new file mode 100644 index 0000000000..677ab0fc23 --- /dev/null +++ b/packages/common/infra/src/op/__tests__/message.spec.ts @@ -0,0 +1,76 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + AutoMessageHandler, + ignoreUnknownEvent, + KNOWN_MESSAGE_TYPES, + type MessageCommunicapable, + type MessageHandlers, +} from '../message'; + +class CustomMessageHandler extends AutoMessageHandler { + public handlers: Partial = { + call: vi.fn(), + cancel: vi.fn(), + subscribe: vi.fn(), + unsubscribe: vi.fn(), + return: vi.fn(), + next: vi.fn(), + error: vi.fn(), + complete: vi.fn(), + }; +} + +declare module 'vitest' { + interface TestContext { + sendPort: MessageCommunicapable; + receivePort: MessageCommunicapable; + handler: CustomMessageHandler; + } +} + +describe('message', () => { + beforeEach(ctx => { + const listeners: ((event: MessageEvent) => void)[] = []; + ctx.sendPort = { + postMessage: (msg: any) => { + listeners.forEach(listener => { + listener(new MessageEvent('message', { data: msg })); + }); + }, + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + }; + + ctx.receivePort = { + postMessage: vi.fn(), + addEventListener: vi.fn((_event, handler) => { + listeners.push(handler); + }), + removeEventListener: vi.fn(), + }; + ctx.handler = new CustomMessageHandler(ctx.receivePort); + ctx.handler.listen(); + }); + + it('should ignore unknown message type', ctx => { + const handler = vi.fn(); + // @ts-expect-error internal api + ctx.handler.handleMessage = ignoreUnknownEvent(handler); + + ctx.sendPort.postMessage('connected'); + ctx.sendPort.postMessage({ type: 'call1' }); + ctx.sendPort.postMessage(new Uint8Array()); + ctx.sendPort.postMessage(null); + ctx.sendPort.postMessage(undefined); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('should handle known message type', async ctx => { + for (const type of KNOWN_MESSAGE_TYPES) { + ctx.sendPort.postMessage({ type }); + expect(ctx.handler.handlers[type]).toBeCalled(); + } + }); +}); diff --git a/packages/common/infra/src/op/client.ts b/packages/common/infra/src/op/client.ts new file mode 100644 index 0000000000..bdf66b0d0a --- /dev/null +++ b/packages/common/infra/src/op/client.ts @@ -0,0 +1,206 @@ +import { merge } from 'lodash-es'; +import { Observable, type Observer } from 'rxjs'; + +import { + AutoMessageHandler, + type CallMessage, + type CancelMessage, + fetchTransferables, + type MessageCommunicapable, + type MessageHandlers, + type SubscribeMessage, + type UnsubscribeMessage, +} from './message'; +import type { OpInput, OpNames, OpOutput, OpSchema } from './types'; + +export interface CancelablePromise extends Promise { + cancel(): void; +} + +interface PendingCall extends PromiseWithResolvers { + id: string; + timeout: number | NodeJS.Timeout; +} + +interface OpClientOptions { + timeout?: number; +} + +export class OpClient extends AutoMessageHandler { + private readonly callIds = new Map, number>(); + private readonly pendingCalls = new Map(); + private readonly obs = new Map>(); + private readonly options: OpClientOptions = { + timeout: 3000, + }; + + constructor(port: MessageCommunicapable, options: OpClientOptions = {}) { + super(port); + merge(this.options, options); + } + + protected override get handlers() { + return { + return: this.handleReturnMessage, + next: this.handleSubscriptionNextMessage, + error: this.handleSubscriptionErrorMessage, + complete: this.handleSubscriptionCompleteMessage, + }; + } + + private readonly handleReturnMessage: MessageHandlers['return'] = msg => { + const pending = this.pendingCalls.get(msg.id); + if (!pending) { + return; + } + + if ('error' in msg) { + pending.reject(msg.error); + } else { + pending.resolve(msg.data); + } + clearTimeout(pending.timeout); + this.pendingCalls.delete(msg.id); + }; + + private readonly handleSubscriptionNextMessage: MessageHandlers['next'] = + msg => { + const ob = this.obs.get(msg.id); + if (!ob) { + return; + } + + ob.next(msg.data); + }; + + private readonly handleSubscriptionErrorMessage: MessageHandlers['error'] = + msg => { + const ob = this.obs.get(msg.id); + if (!ob) { + return; + } + + ob.error(msg.error); + }; + + private readonly handleSubscriptionCompleteMessage: MessageHandlers['complete'] = + msg => { + const ob = this.obs.get(msg.id); + if (!ob) { + return; + } + + ob.complete(); + }; + + protected nextCallId(op: OpNames) { + let id = this.callIds.get(op) ?? 0; + id++; + this.callIds.set(op, id); + + return `${op}:${id}`; + } + + protected currentCallId(op: OpNames) { + return this.callIds.get(op) ?? 0; + } + + call>( + op: Op, + ...args: OpInput + ): CancelablePromise> { + const promiseWithResolvers = Promise.withResolvers(); + const payload = args[0]; + + const msg = { + type: 'call', + id: this.nextCallId(op), + name: op as string, + payload, + } satisfies CallMessage; + + const promise = promiseWithResolvers.promise as CancelablePromise; + + const raise = (reason: string) => { + const pending = this.pendingCalls.get(msg.id); + if (!pending) { + return; + } + this.port.postMessage({ + type: 'cancel', + id: msg.id, + } satisfies CancelMessage); + promiseWithResolvers.reject(new Error(reason)); + clearTimeout(pending.timeout); + this.pendingCalls.delete(msg.id); + }; + + promise.cancel = () => { + raise('canceled'); + }; + + const timeout = setTimeout(() => { + raise('timeout'); + }, this.options.timeout); + + const transferables = fetchTransferables(payload); + + this.port.postMessage(msg, { transfer: transferables }); + this.pendingCalls.set(msg.id, { + ...promiseWithResolvers, + timeout, + id: msg.id, + }); + + return promise; + } + + subscribe, Out extends OpOutput>( + op: Op, + ...args: [ + ...OpInput, + Partial> | ((value: Out) => void), + ] + ): () => void { + const payload = args[0]; + const observer = args[1] as Partial> | ((value: Out) => void); + + const msg = { + type: 'subscribe', + id: this.nextCallId(op), + name: op as string, + payload, + } satisfies SubscribeMessage; + + const sub = new Observable(ob => { + this.obs.set(msg.id, ob); + }).subscribe(observer); + + sub.add(() => { + this.obs.delete(msg.id); + this.port.postMessage({ + type: 'unsubscribe', + id: msg.id, + } satisfies UnsubscribeMessage); + }); + + const transferables = fetchTransferables(payload); + this.port.postMessage(msg, { transfer: transferables }); + + return () => { + sub.unsubscribe(); + }; + } + + destroy() { + super.close(); + this.pendingCalls.forEach(call => { + call.reject(new Error('client destroyed')); + }); + this.pendingCalls.clear(); + this.obs.forEach(ob => { + ob.complete(); + }); + this.obs.clear(); + } +} diff --git a/packages/common/infra/src/op/consumer.ts b/packages/common/infra/src/op/consumer.ts new file mode 100644 index 0000000000..2f94300e34 --- /dev/null +++ b/packages/common/infra/src/op/consumer.ts @@ -0,0 +1,188 @@ +import EventEmitter2 from 'eventemitter2'; +import { + defer, + from, + fromEvent, + Observable, + of, + share, + take, + takeUntil, +} from 'rxjs'; + +import { + AutoMessageHandler, + type CallMessage, + fetchTransferables, + type MessageHandlers, + type ReturnMessage, + type SubscribeMessage, + type SubscriptionCompleteMessage, + type SubscriptionErrorMessage, + type SubscriptionNextMessage, +} from './message'; +import type { OpInput, OpNames, OpOutput, OpSchema } from './types'; + +interface OpCallContext { + signal: AbortSignal; +} + +export type OpHandler> = ( + payload: OpInput[0], + ctx: OpCallContext +) => + | OpOutput + | Promise> + | Observable>; + +export class OpConsumer extends AutoMessageHandler { + private readonly eventBus = new EventEmitter2(); + + private readonly registeredOpHandlers = new Map< + OpNames, + OpHandler + >(); + + private readonly processing = new Map(); + + override get handlers() { + return { + call: this.handleCallMessage, + cancel: this.handleCancelMessage, + subscribe: this.handleSubscribeMessage, + unsubscribe: this.handleCancelMessage, + }; + } + + private readonly handleCallMessage: MessageHandlers['call'] = async msg => { + const abortController = new AbortController(); + this.processing.set(msg.id, abortController); + + this.eventBus.emit(`before:${msg.name}`, msg.payload); + this.ob$(msg, abortController.signal) + .pipe(take(1)) + .subscribe({ + next: data => { + this.eventBus.emit(`after:${msg.name}`, msg.payload, data); + const transferables = fetchTransferables(data); + this.port.postMessage( + { + type: 'return', + id: msg.id, + data, + } satisfies ReturnMessage, + { transfer: transferables } + ); + }, + error: error => { + this.port.postMessage({ + type: 'return', + id: msg.id, + error: error as Error, + } satisfies ReturnMessage); + }, + complete: () => { + this.processing.delete(msg.id); + }, + }); + }; + + private readonly handleSubscribeMessage: MessageHandlers['subscribe'] = + msg => { + const abortController = new AbortController(); + this.processing.set(msg.id, abortController); + + this.ob$(msg, abortController.signal).subscribe({ + next: data => { + const transferables = fetchTransferables(data); + this.port.postMessage( + { + type: 'next', + id: msg.id, + data, + } satisfies SubscriptionNextMessage, + { transfer: transferables } + ); + }, + error: error => { + this.port.postMessage({ + type: 'error', + id: msg.id, + error: error as Error, + } satisfies SubscriptionErrorMessage); + }, + complete: () => { + this.port.postMessage({ + type: 'complete', + id: msg.id, + } satisfies SubscriptionCompleteMessage); + this.processing.delete(msg.id); + }, + }); + }; + + private readonly handleCancelMessage: MessageHandlers['cancel'] & + MessageHandlers['unsubscribe'] = msg => { + const abortController = this.processing.get(msg.id); + if (!abortController) { + return; + } + + abortController.abort(); + }; + + register>(op: Op, handler: OpHandler) { + this.registeredOpHandlers.set(op, handler); + } + + before>( + op: Op, + handler: (...input: OpInput) => void + ) { + this.eventBus.on(`before:${op}`, handler); + } + + after>( + op: Op, + handler: (...args: [...OpInput, OpOutput]) => void + ) { + this.eventBus.on(`after:${op}`, handler); + } + + /** + * @internal + */ + ob$(op: CallMessage | SubscribeMessage, signal: AbortSignal) { + return defer(() => { + const handler = this.registeredOpHandlers.get(op.name as any); + if (!handler) { + throw new Error( + `Handler for operation [${op.name}] is not registered.` + ); + } + + const ret$ = handler(op.payload, { signal }); + + let ob$: Observable; + if (ret$ instanceof Promise) { + ob$ = from(ret$); + } else if (ret$ instanceof Observable) { + ob$ = ret$; + } else { + ob$ = of(ret$); + } + + return ob$.pipe(share(), takeUntil(fromEvent(signal, 'abort'))); + }); + } + + destroy() { + super.close(); + this.registeredOpHandlers.clear(); + this.processing.forEach(controller => { + controller.abort(); + }); + this.processing.clear(); + this.eventBus.removeAllListeners(); + } +} diff --git a/packages/common/infra/src/op/index.ts b/packages/common/infra/src/op/index.ts new file mode 100644 index 0000000000..501aa75437 --- /dev/null +++ b/packages/common/infra/src/op/index.ts @@ -0,0 +1,4 @@ +export * from './client'; +export * from './consumer'; +export { type MessageCommunicapable, transfer } from './message'; +export type { OpSchema } from './types'; diff --git a/packages/common/infra/src/op/message.ts b/packages/common/infra/src/op/message.ts new file mode 100644 index 0000000000..852cdc6d29 --- /dev/null +++ b/packages/common/infra/src/op/message.ts @@ -0,0 +1,155 @@ +const PRODUCER_MESSAGE_TYPES = [ + 'call', + 'cancel', + 'subscribe', + 'unsubscribe', +] as const; +const CONSUMER_MESSAGE_TYPES = ['return', 'next', 'error', 'complete'] as const; +export const KNOWN_MESSAGE_TYPES = new Set([ + ...PRODUCER_MESSAGE_TYPES, + ...CONSUMER_MESSAGE_TYPES, +]); + +type MessageType = + | (typeof PRODUCER_MESSAGE_TYPES)[number] + | (typeof CONSUMER_MESSAGE_TYPES)[number]; + +export interface Message { + type: MessageType; +} + +// in +export interface CallMessage extends Message { + type: 'call'; + id: string; + name: string; + payload: any; +} + +export interface CancelMessage extends Message { + type: 'cancel'; + id: string; +} + +export interface SubscribeMessage extends Message { + type: 'subscribe'; + id: string; + name: string; + payload: any; +} + +export interface UnsubscribeMessage extends Message { + type: 'unsubscribe'; + id: string; +} + +// out +export type ReturnMessage = { + type: 'return'; + id: string; +} & ( + | { + data: any; + } + | { + error: Error; + } +); + +export interface SubscriptionNextMessage extends Message { + type: 'next'; + id: string; + data: any; +} + +export interface SubscriptionErrorMessage extends Message { + type: 'error'; + id: string; + error: Error; +} + +export type SubscriptionCompleteMessage = { + type: 'complete'; + id: string; +}; + +export type Messages = + | CallMessage + | CancelMessage + | SubscribeMessage + | UnsubscribeMessage + | ReturnMessage + | SubscriptionNextMessage + | SubscriptionErrorMessage + | SubscriptionCompleteMessage; + +export type MessageHandlers = { + [Type in Messages['type']]: ( + message: Extract + ) => void; +}; + +export type MessageCommunicapable = Pick< + MessagePort, + 'postMessage' | 'addEventListener' | 'removeEventListener' +> & { + start?(): void; + close?(): void; +}; + +export function ignoreUnknownEvent(handler: (data: Messages) => void) { + return (event: MessageEvent) => { + const data = event.data; + + if ( + !data || + typeof data !== 'object' || + typeof data.type !== 'string' || + !KNOWN_MESSAGE_TYPES.has(data.type) + ) { + return; + } + + handler(data as any); + }; +} + +const TRANSFERABLES_CACHE = new Map(); +export function transfer(data: T, transferables: Transferable[]): T { + TRANSFERABLES_CACHE.set(data, transferables); + return data; +} + +export function fetchTransferables(data: any): Transferable[] | undefined { + const transferables = TRANSFERABLES_CACHE.get(data); + if (transferables) { + TRANSFERABLES_CACHE.delete(data); + } + + return transferables; +} + +export abstract class AutoMessageHandler { + protected abstract handlers: Partial; + + constructor(protected readonly port: MessageCommunicapable) {} + + protected handleMessage = ignoreUnknownEvent((msg: Messages) => { + const handler = this.handlers[msg.type]; + if (!handler) { + return; + } + + handler(msg as any); + }); + + listen() { + this.port.addEventListener('message', this.handleMessage); + this.port.start?.(); + } + + close() { + this.port.close?.(); + this.port.removeEventListener('message', this.handleMessage); + } +} diff --git a/packages/common/infra/src/op/types.ts b/packages/common/infra/src/op/types.ts new file mode 100644 index 0000000000..7de14e03e2 --- /dev/null +++ b/packages/common/infra/src/op/types.ts @@ -0,0 +1,36 @@ +type KeyToKey = { + [K in keyof T]: string extends K ? never : K; +}; + +declare type ValuesOf = T extends { + [K in keyof T]: infer _U; +} + ? _U + : never; + +export interface OpSchema { + [key: string]: [any, any?]; +} + +type RequiredInput = In extends void ? [] : In extends never ? [] : [In]; + +export type OpNames = ValuesOf>; +export type OpInput< + Ops extends OpSchema, + Type extends OpNames, +> = Type extends keyof Ops + ? Ops[Type] extends [infer In] + ? RequiredInput + : Ops[Type] extends [infer In, infer _Out] + ? RequiredInput + : never + : never; + +export type OpOutput< + Ops extends OpSchema, + Type extends OpNames, +> = Type extends keyof Ops + ? Ops[Type] extends [infer _In, infer Out] + ? Out + : never + : never; diff --git a/yarn.lock b/yarn.lock index 162b865915..655c6411d9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -13075,6 +13075,7 @@ __metadata: "@blocksuite/affine": "npm:0.17.28" "@datastructures-js/binary-search-tree": "npm:^5.3.2" "@testing-library/react": "npm:^16.0.0" + eventemitter2: "npm:^6.4.9" fake-indexeddb: "npm:^6.0.0" foxact: "npm:^0.2.33" fractional-indexing: "npm:^3.2.0" @@ -19984,7 +19985,7 @@ __metadata: languageName: node linkType: hard -"eventemitter2@npm:6.4.9": +"eventemitter2@npm:6.4.9, eventemitter2@npm:^6.4.9": version: 6.4.9 resolution: "eventemitter2@npm:6.4.9" checksum: 10/b829b1c6b11e15926b635092b5ad62b4463d1c928859831dcae606e988cf41893059e3541f5a8209d21d2f15314422ddd4d84d20830b4bf44978608d15b06b08