From 3dde47dd08a248c9f95f3b152f44d422f013e685 Mon Sep 17 00:00:00 2001 From: forehalo Date: Fri, 14 Feb 2025 11:29:02 +0000 Subject: [PATCH] fix(server): event handler bindings (#10165) --- packages/backend/server/package.json | 2 +- .../src/__tests__/event/cluster.spec.ts | 30 ++- .../src/__tests__/event/eventbus.spec.ts | 144 +++++++++--- .../server/src/__tests__/event/provider.ts | 31 ++- .../backend/server/src/base/event/eventbus.ts | 210 ++++++++++++------ .../backend/server/src/base/event/index.ts | 12 +- packages/backend/server/src/base/index.ts | 9 +- .../backend/server/src/base/metrics/utils.ts | 83 +++---- .../server/src/base/nestjs/decorator.ts | 45 ++++ .../backend/server/src/base/nestjs/index.ts | 1 + .../backend/server/src/base/utils/request.ts | 4 +- .../backend/server/src/core/sync/gateway.ts | 2 +- .../server/src/core/workspaces/event.ts | 81 +++++++ .../server/src/core/workspaces/index.ts | 2 + .../src/core/workspaces/resolvers/team.ts | 69 ------ .../server/src/plugins/payment/cron.ts | 3 +- .../server/src/plugins/payment/quota.ts | 2 +- yarn.lock | 16 +- 18 files changed, 486 insertions(+), 260 deletions(-) create mode 100644 packages/backend/server/src/base/nestjs/decorator.ts create mode 100644 packages/backend/server/src/core/workspaces/event.ts diff --git a/packages/backend/server/package.json b/packages/backend/server/package.json index 94e17998f3..80be0714ad 100644 --- a/packages/backend/server/package.json +++ b/packages/backend/server/package.json @@ -31,7 +31,6 @@ "@nestjs/apollo": "^12.2.2", "@nestjs/common": "^10.4.15", "@nestjs/core": "^10.4.15", - "@nestjs/event-emitter": "^2.1.1", "@nestjs/graphql": "^12.2.2", "@nestjs/platform-express": "^10.4.15", "@nestjs/platform-socket.io": "^10.4.15", @@ -62,6 +61,7 @@ "@socket.io/redis-adapter": "^8.3.0", "cookie-parser": "^1.4.7", "dotenv": "^16.4.7", + "eventemitter2": "^6.4.9", "eventsource-parser": "^3.0.0", "express": "^4.21.2", "fast-xml-parser": "^4.5.0", diff --git a/packages/backend/server/src/__tests__/event/cluster.spec.ts b/packages/backend/server/src/__tests__/event/cluster.spec.ts index 3e4394c0b4..c51e919313 100644 --- a/packages/backend/server/src/__tests__/event/cluster.spec.ts +++ b/packages/backend/server/src/__tests__/event/cluster.spec.ts @@ -51,20 +51,32 @@ test('should broadcast event to cluster instances', async t => { // app 2 for broadcasting const eventbus2 = app2.get(EventBus); - const cls = ClsServiceManager.getClsService(); - cls.run(() => { - cls.set(CLS_ID, 'test-request-id'); - eventbus2.broadcast('__test__.event', { count: 0, requestId: cls.getId() }); - }); + eventbus2.broadcast('__test__.event', { count: 0 }); // cause the cross instances broadcasting is asynchronization calling // we should wait for the event's arriving before asserting await eventbus1.waitFor('__test__.event'); - t.true(listener.calledOnceWith({ count: 0, requestId: 'test-request-id' })); - t.true( - runtimeListener.calledOnceWith({ count: 0, requestId: 'test-request-id' }) - ); + t.true(listener.calledOnceWith({ count: 0 })); + t.true(runtimeListener.calledOnceWith({ count: 0 })); off(); }); + +test('should continuously use the same request id', async t => { + const { app1, app2 } = t.context; + + const eventbus1 = app1.get(EventBus); + const eventbus2 = app2.get(EventBus); + + const listener = Sinon.spy(app1.get(Listeners), 'onRequestId'); + + const cls = ClsServiceManager.getClsService(); + cls.run(() => { + cls.set(CLS_ID, 'test-request-id'); + eventbus2.broadcast('__test__.requestId', {}); + }); + + await eventbus1.waitFor('__test__.requestId'); + t.true(listener.lastCall.returned('test-request-id')); +}); diff --git a/packages/backend/server/src/__tests__/event/eventbus.spec.ts b/packages/backend/server/src/__tests__/event/eventbus.spec.ts index dc9e021771..be247227a7 100644 --- a/packages/backend/server/src/__tests__/event/eventbus.spec.ts +++ b/packages/backend/server/src/__tests__/event/eventbus.spec.ts @@ -1,5 +1,6 @@ import { TestingModule } from '@nestjs/testing'; import ava, { TestFn } from 'ava'; +import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; import Sinon from 'sinon'; import { EventBus, metrics } from '../../base'; @@ -9,7 +10,7 @@ import { Listeners } from './provider'; export const test = ava as TestFn<{ module: TestingModule; eventbus: EventBus; - listener: Sinon.SinonSpy; + listeners: Sinon.SinonSpiedInstance; }>; test.before(async t => { @@ -19,30 +20,20 @@ test.before(async t => { const eventbus = m.get(EventBus); t.context.module = m; t.context.eventbus = eventbus; - t.context.listener = Sinon.spy(m.get(Listeners), 'onTestEvent'); }); -test.afterEach(() => { - Sinon.reset(); +test.beforeEach(t => { + Sinon.restore(); + const { module } = t.context; + t.context.listeners = Sinon.spy(module.get(Listeners)); }); test.after(async t => { await t.context.module.close(); }); -test('should register event listener', t => { - const { eventbus } = t.context; - - // @ts-expect-error private member - t.true(eventbus.emitter.eventNames().includes('__test__.event')); - - eventbus.on('__test__.event2', () => {}); - // @ts-expect-error private member - t.true(eventbus.emitter.eventNames().includes('__test__.event2')); -}); - test('should dispatch event listener', t => { - const { eventbus, listener } = t.context; + const { eventbus, listeners } = t.context; const runtimeListener = Sinon.stub(); const off = eventbus.on('__test__.event', runtimeListener); @@ -50,29 +41,53 @@ test('should dispatch event listener', t => { const payload = { count: 0 }; eventbus.emit('__test__.event', payload); - t.true(listener.calledOnceWithExactly(payload)); + t.true(listeners.onTestEvent.calledOnceWithExactly(payload)); t.true(runtimeListener.calledOnceWithExactly(payload)); off(); }); test('should dispatch async event listener', async t => { - const { eventbus, listener } = t.context; + const { eventbus, listeners } = t.context; - const runtimeListener = Sinon.stub().returns({ count: 2 }); + const runtimeListener = Sinon.stub().returnsArg(0); const off = eventbus.on('__test__.event', runtimeListener); const payload = { count: 0 }; const returns = await eventbus.emitAsync('__test__.event', payload); - t.true(listener.calledOnceWithExactly(payload)); + t.true(listeners.onTestEvent.calledOnceWithExactly(payload)); + t.true(listeners.onTestEventAndEvent2.calledOnceWithExactly(payload)); t.true(runtimeListener.calledOnceWithExactly(payload)); - t.deepEqual(returns, [{ count: 1 }, { count: 2 }]); + t.deepEqual(returns, [payload, payload, payload]); off(); }); +test('should dispatch multiple event handlers with same name', async t => { + const { eventbus, listeners } = t.context; + + const payload = { count: 0 }; + await eventbus.emitAsync('__test__.event', payload); + + t.true(listeners.onTestEvent.calledOnceWithExactly(payload)); + t.true(listeners.onTestEventAndEvent2.calledOnceWithExactly(payload)); +}); + +test('should dispatch event listener with multiple event names', async t => { + const { eventbus, listeners } = t.context; + + const payload = { count: 0 }; + await eventbus.emitAsync('__test__.event', payload); + + t.like(listeners.onTestEventAndEvent2.lastCall.args[0], payload); + + await eventbus.emitAsync('__test__.event2', payload); + + t.like(listeners.onTestEventAndEvent2.lastCall.args[0], payload); +}); + test('should record event handler call metrics', async t => { const { eventbus } = t.context; const timerStub = Sinon.stub( @@ -86,26 +101,103 @@ test('should record event handler call metrics', async t => { await eventbus.emitAsync('__test__.event', { count: 0 }); - t.deepEqual(timerStub.getCall(0).args[1], { + t.true(timerStub.calledTwice); + t.deepEqual(timerStub.firstCall.args[1], { name: 'event_handler', event: '__test__.event', namespace: '__test__', + handler: 'Listeners.onTestEvent', + error: false, + }); + t.deepEqual(timerStub.lastCall.args[1], { + name: 'event_handler', + event: '__test__.event', + namespace: '__test__', + handler: 'Listeners.onTestEventAndEvent2', error: false, }); - t.deepEqual(counterStub.getCall(0).args[1], { + t.true(counterStub.calledTwice); + t.deepEqual(counterStub.firstCall.args[1], { + name: 'event_handler', event: '__test__.event', namespace: '__test__', + handler: 'Listeners.onTestEvent', + error: false, + }); + t.deepEqual(counterStub.lastCall.args[1], { + name: 'event_handler', + event: '__test__.event', + namespace: '__test__', + handler: 'Listeners.onTestEventAndEvent2', + error: false, }); - Sinon.reset(); + timerStub.reset(); + counterStub.reset(); + await eventbus.emitAsync('__test__.event2', { count: 0 }); - await eventbus.emitAsync('__test__.throw', { count: 0 }); + t.true(timerStub.calledOnce); + t.deepEqual(timerStub.firstCall.args[1], { + name: 'event_handler', + event: '__test__.event2', + namespace: '__test__', + handler: 'Listeners.onTestEventAndEvent2', + error: false, + }); - t.deepEqual(timerStub.getCall(0).args[1], { + t.true(counterStub.calledOnce); + t.deepEqual(counterStub.firstCall.args[1], { + name: 'event_handler', + event: '__test__.event2', + namespace: '__test__', + handler: 'Listeners.onTestEventAndEvent2', + error: false, + }); + + timerStub.reset(); + counterStub.reset(); + try { + await eventbus.emitAsync('__test__.throw', { count: 0 }); + } catch { + // noop + } + + t.true(timerStub.calledOnce); + t.deepEqual(timerStub.firstCall.args[1], { name: 'event_handler', event: '__test__.throw', namespace: '__test__', + handler: 'Listeners.onThrow', + error: true, + }); + + t.true(counterStub.calledOnce); + t.deepEqual(counterStub.firstCall.args[1], { + name: 'event_handler', + event: '__test__.throw', + namespace: '__test__', + handler: 'Listeners.onThrow', error: true, }); }); + +test('should generate request id for event', async t => { + const { eventbus, listeners } = t.context; + + await eventbus.emitAsync('__test__.requestId', {}); + + t.true(listeners.onRequestId.lastCall.returnValue.includes(':event/')); +}); + +test('should continuously use the same request id', async t => { + const { eventbus, listeners } = t.context; + + const cls = ClsServiceManager.getClsService(); + await cls.run(async () => { + cls.set(CLS_ID, 'test-request-id'); + await eventbus.emitAsync('__test__.requestId', {}); + }); + + t.true(listeners.onRequestId.lastCall.returned('test-request-id')); +}); diff --git a/packages/backend/server/src/__tests__/event/provider.ts b/packages/backend/server/src/__tests__/event/provider.ts index 4c71744832..58b67e9b62 100644 --- a/packages/backend/server/src/__tests__/event/provider.ts +++ b/packages/backend/server/src/__tests__/event/provider.ts @@ -1,31 +1,40 @@ import { Injectable } from '@nestjs/common'; +import { ClsServiceManager } from 'nestjs-cls'; -import { OnEvent } from '../../base'; +import { genRequestId, OnEvent } from '../../base'; declare global { interface Events { - '__test__.event': { count: number; requestId?: string }; + '__test__.event': { count: number }; '__test__.event2': { count: number }; '__test__.throw': { count: number }; + '__test__.requestId': {}; } } @Injectable() export class Listeners { @OnEvent('__test__.event') - onTestEvent({ count, requestId }: Events['__test__.event']) { - return requestId - ? { - count: count + 1, - requestId, - } - : { - count: count + 1, - }; + onTestEvent(payload: Events['__test__.event']) { + return payload; + } + + @OnEvent('__test__.event') + @OnEvent('__test__.event2') + onTestEventAndEvent2( + payload: Events['__test__.event'] | Events['__test__.event2'] + ) { + return payload; } @OnEvent('__test__.throw') onThrow() { throw new Error('Error in event handler'); } + + @OnEvent('__test__.requestId') + onRequestId() { + const cls = ClsServiceManager.getClsService(); + return cls.getId() ?? genRequestId('event'); + } } diff --git a/packages/backend/server/src/base/event/eventbus.ts b/packages/backend/server/src/base/event/eventbus.ts index 8b017369f8..d7c2799d6c 100644 --- a/packages/backend/server/src/base/event/eventbus.ts +++ b/packages/backend/server/src/base/event/eventbus.ts @@ -1,59 +1,44 @@ import { - applyDecorators, Injectable, Logger, OnApplicationBootstrap, + OnModuleInit, } from '@nestjs/common'; -import { - EventEmitter2, - EventEmitterReadinessWatcher, - OnEvent as RawOnEvent, - OnEventMetadata, -} from '@nestjs/event-emitter'; +import { DiscoveryService, MetadataScanner } from '@nestjs/core'; import { OnGatewayConnection, WebSocketGateway, WebSocketServer, } from '@nestjs/websockets'; -import { CLS_ID, ClsService } from 'nestjs-cls'; +import EventEmitter2, { type OnOptions } from 'eventemitter2'; +import { CLS_ID, ClsService, ClsServiceManager } from 'nestjs-cls'; import type { Server, Socket } from 'socket.io'; -import { CallMetric } from '../metrics'; +import { wrapCallMetric } from '../metrics'; +import { PushMetadata, sliceMetadata } from '../nestjs'; import { genRequestId } from '../utils'; import type { EventName } from './def'; -const EventHandlerWrapper = (event: EventName): MethodDecorator => { - // @ts-expect-error allow - return ( - _target, - key, - desc: TypedPropertyDescriptor<(...args: any[]) => any> - ) => { - const originalMethod = desc.value; - if (!originalMethod) { - return desc; - } +const EVENT_LISTENER_METADATA = Symbol('event_listener'); +interface EventHandlerMetadata { + namespace: string; + event: EventName; + opts?: OnOptions; +} - desc.value = function (...args: any[]) { - new Logger(EventBus.name).log( - `Event handler: ${event} (${key.toString()})` - ); - return originalMethod.apply(this, args); - }; - }; -}; +interface EventOptions extends OnOptions { + prepend?: boolean; + name?: string; + suppressError?: boolean; +} -export const OnEvent = ( - event: EventName, - opts?: OnEventMetadata['options'] -) => { +export const OnEvent = (event: EventName, opts?: EventOptions) => { const namespace = event.split('.')[0]; - - return applyDecorators( - EventHandlerWrapper(event), - CallMetric('event', 'event_handler', undefined, { event, namespace }), - RawOnEvent(event, opts) - ); + return PushMetadata(EVENT_LISTENER_METADATA, { + namespace, + event, + opts, + }); }; /** @@ -63,7 +48,9 @@ export const OnEvent = ( namespace: 's2s', }) @Injectable() -export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { +export class EventBus + implements OnGatewayConnection, OnApplicationBootstrap, OnModuleInit +{ private readonly logger = new Logger(EventBus.name); @WebSocketServer() @@ -71,8 +58,9 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { constructor( private readonly emitter: EventEmitter2, - private readonly watcher: EventEmitterReadinessWatcher, - private readonly cls: ClsService + private readonly cls: ClsService, + private readonly discovery: DiscoveryService, + private readonly scanner: MetadataScanner ) {} handleConnection(client: Socket) { @@ -83,27 +71,21 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { client.disconnect(); } + async onModuleInit() { + this.bindEventHandlers(); + } + async onApplicationBootstrap() { - this.watcher - .waitUntilReady() - .then(() => { - const events = this.emitter.eventNames() as EventName[]; - events.forEach(event => { - // Proxy all events received from server(trigger by `server.serverSideEmit`) - // to internal event system - this.server?.on(event, (payload, requestId?: string) => { - this.cls.run(() => { - requestId = requestId ?? genRequestId('se'); - this.cls.set(CLS_ID, requestId); - this.logger.log(`Server Event: ${event} (Received)`); - this.emit(event, payload); - }); - }); - }); - }) - .catch(() => { - // startup time promise, never throw at runtime + // Proxy all events received from server(trigger by `server.serverSideEmit`) + // to internal event system + this.server?.on('broadcast', (event, payload, requestId?: string) => { + this.cls.run(() => { + requestId = requestId ?? genRequestId('event'); + this.cls.set(CLS_ID, requestId); + this.logger.log(`Server Event: ${event} (Received)`); + this.emit(event, payload); }); + }); } /** @@ -127,22 +109,122 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { */ broadcast(event: T, payload: Events[T]) { this.logger.log(`Server Event: ${event} (Send)`); - this.server?.serverSideEmit(event, payload, this.cls.getId()); + this.server?.serverSideEmit('broadcast', event, payload, this.cls.getId()); } on( event: T, listener: (payload: Events[T]) => void | Promise, - opts?: OnEventMetadata['options'] + opts: EventOptions = {} ) { - this.emitter.on(event, listener as any, opts); + const namespace = event.split('.')[0]; + const { name, prepend, suppressError } = opts; + let signature = name ?? listener.name ?? 'anonymous fn'; + + const add = prepend ? this.emitter.prependListener : this.emitter.on; + + const handler = wrapCallMetric( + async (payload: any) => { + this.logger.verbose(`Handle event [${event}] (${signature})`); + + const cls = ClsServiceManager.getClsService(); + return await cls.run({ ifNested: 'reuse' }, async () => { + const requestId = cls.getId(); + if (!requestId) { + cls.set(CLS_ID, genRequestId('event')); + } + try { + return await listener(payload); + } catch (e) { + if (suppressError) { + this.logger.error( + `Error happened when handling event [${event}] (${signature})`, + e + ); + } else { + throw e; + } + } + }); + }, + 'event', + 'event_handler', + { + event, + namespace, + handler: signature, + } + ); + + add.call(this.emitter, event, handler as any, opts); + + this.logger.verbose( + `Event handler for [${event}] registered ${name ? `in [${name}]` : ''}` + ); return () => { - this.emitter.off(event, listener as any); + this.emitter.off(event, handler as any); }; } waitFor(name: T, timeout?: number) { return this.emitter.waitFor(name, timeout); } + + private bindEventHandlers() { + // make sure all our job handlers defined in [Providers] to make the code organization clean. + // const providers = [...this.discovery.getProviders(), this.discovery.getControllers()] + const providers = this.discovery.getProviders(); + + providers.forEach(wrapper => { + const { instance, name } = wrapper; + if (!instance || wrapper.isAlias) { + return; + } + + const proto = Object.getPrototypeOf(instance); + const methods = this.scanner.getAllMethodNames(proto); + + methods.forEach(method => { + const fn = instance[method]; + + let defs = sliceMetadata( + EVENT_LISTENER_METADATA, + fn + ); + + if (defs.length === 0) { + return; + } + + const signature = `${name}.${method}`; + + if (typeof fn !== 'function') { + throw new Error(`Event handler [${signature}] is not a function.`); + } + + if (!wrapper.isDependencyTreeStatic()) { + throw new Error( + `Provider [${name}] could not be RequestScoped or TransientScoped injectable if it contains event handlers.` + ); + } + + defs.forEach(({ event, opts }) => { + this.on( + event, + (payload: any) => { + // NOTE(@forehalo): + // we might create spies on the event handlers when testing, + // avoid reusing `fn` variable to fail the spies or stubs + return instance[method](payload); + }, + { + ...opts, + name: signature, + } + ); + }); + }); + }); + } } diff --git a/packages/backend/server/src/base/event/index.ts b/packages/backend/server/src/base/event/index.ts index 9dea08d1eb..5c6f008309 100644 --- a/packages/backend/server/src/base/event/index.ts +++ b/packages/backend/server/src/base/event/index.ts @@ -1,12 +1,18 @@ import { Global, Module } from '@nestjs/common'; -import { EventEmitterModule } from '@nestjs/event-emitter'; +import { DiscoveryModule } from '@nestjs/core'; +import EventEmitter2 from 'eventemitter2'; import { EventBus, OnEvent } from './eventbus'; +const EmitProvider = { + provide: EventEmitter2, + useValue: new EventEmitter2(), +}; + @Global() @Module({ - imports: [EventEmitterModule.forRoot({ global: false })], - providers: [EventBus], + imports: [DiscoveryModule], + providers: [EventBus, EmitProvider], exports: [EventBus], }) export class EventModule {} diff --git a/packages/backend/server/src/base/index.ts b/packages/backend/server/src/base/index.ts index e33a27dfe9..a9a6f60817 100644 --- a/packages/backend/server/src/base/index.ts +++ b/packages/backend/server/src/base/index.ts @@ -28,14 +28,7 @@ export { AFFiNELogger } from './logger'; export { MailService } from './mailer'; export { CallMetric, metrics } from './metrics'; export { Lock, Locker, Mutex, RequestMutex } from './mutex'; -export { - GatewayErrorWrapper, - getOptionalModuleMetadata, - GlobalExceptionFilter, - mapAnyError, - mapSseError, - OptionalModule, -} from './nestjs'; +export * from './nestjs'; export { type PrismaTransaction } from './prisma'; export { Runtime } from './runtime'; export * from './storage'; diff --git a/packages/backend/server/src/base/metrics/utils.ts b/packages/backend/server/src/base/metrics/utils.ts index 59ff8ba945..f054dee6f3 100644 --- a/packages/backend/server/src/base/metrics/utils.ts +++ b/packages/backend/server/src/base/metrics/utils.ts @@ -1,5 +1,6 @@ import type { Attributes } from '@opentelemetry/api'; +import { makeMethodDecorator } from '../nestjs/decorator'; import { type KnownMetricScopes, metrics } from './metrics'; /** @@ -9,57 +10,41 @@ import { type KnownMetricScopes, metrics } from './metrics'; * @param attrs attributes * @returns */ -export const CallMetric = ( +export const CallMetric = makeMethodDecorator( + (scope: KnownMetricScopes, name: string, attrs?: Attributes) => { + return (_target, _key, fn) => { + return wrapCallMetric(fn, scope, name, attrs); + }; + } +); + +export function wrapCallMetric any>( + fn: Fn, scope: KnownMetricScopes, name: string, - record?: { timer?: boolean; count?: boolean; error?: boolean }, attrs?: Attributes -): MethodDecorator => { - // @ts-expect-error allow - return ( - _target, - _key, - desc: TypedPropertyDescriptor<(...args: any[]) => any> - ) => { - const originalMethod = desc.value; - if (!originalMethod) { - return desc; +) { + return async function (this: any, ...args: any[]) { + const start = Date.now(); + let error = false; + + try { + return await fn.call(this, ...args); + } catch (err) { + error = true; + throw err; + } finally { + const count = metrics[scope].counter('function_calls', { + description: 'function call counter', + }); + + const timer = metrics[scope].histogram('function_timer', { + description: 'function call time costs', + unit: 'ms', + }); + + count.add(1, { ...attrs, name, error }); + timer.record(Date.now() - start, { ...attrs, name, error }); } - - const timer = metrics[scope].histogram('function_timer', { - description: 'function call time costs', - unit: 'ms', - }); - const count = metrics[scope].counter('function_calls', { - description: 'function call counter', - }); - - desc.value = async function (...args: any[]) { - const start = Date.now(); - let error = false; - - const end = () => { - timer?.record(Date.now() - start, { ...attrs, name, error }); - }; - - try { - if (!record || !!record.count) { - count.add(1, attrs); - } - return await originalMethod.apply(this, args); - } catch (err) { - if (!record || !!record.error) { - error = true; - } - throw err; - } finally { - count.add(1, { ...attrs, name, error }); - if (!record || !!record.timer) { - end(); - } - } - }; - - return desc; }; -}; +} diff --git a/packages/backend/server/src/base/nestjs/decorator.ts b/packages/backend/server/src/base/nestjs/decorator.ts new file mode 100644 index 0000000000..21b2da2df6 --- /dev/null +++ b/packages/backend/server/src/base/nestjs/decorator.ts @@ -0,0 +1,45 @@ +export function makeMethodDecorator< + T extends any[], + Fn extends (...args: any[]) => any, +>( + decorator: (...args: T) => (target: any, key: string | symbol, fn: Fn) => Fn +) { + return (...args: T) => { + return ( + target: any, + key: string | symbol, + desc: TypedPropertyDescriptor + ) => { + const originalFn = desc.value; + if (!originalFn || typeof originalFn !== 'function') { + throw new Error( + `MethodDecorator must be applied to a function but got ${typeof originalFn}` + ); + } + + const decoratedFn = decorator(...args)(target, key, originalFn); + desc.value = decoratedFn; + return desc; + }; + }; +} + +export function PushMetadata(key: string | symbol, value: T) { + const decorator: ClassDecorator | MethodDecorator = ( + target, + _, + descriptor + ) => { + const metadataTarget = descriptor?.value ?? target; + + const metadataArray = Reflect.getMetadata(key, metadataTarget) || []; + metadataArray.push(value); + Reflect.defineMetadata(key, metadataArray, metadataTarget); + }; + + return decorator; +} + +export function sliceMetadata(key: string | symbol, target: any): T[] { + return Reflect.getMetadata(key, target) || []; +} diff --git a/packages/backend/server/src/base/nestjs/index.ts b/packages/backend/server/src/base/nestjs/index.ts index dc26fb62cf..94b5282c9b 100644 --- a/packages/backend/server/src/base/nestjs/index.ts +++ b/packages/backend/server/src/base/nestjs/index.ts @@ -1,3 +1,4 @@ import './config'; +export * from './decorator'; export * from './exception'; export * from './optional-module'; diff --git a/packages/backend/server/src/base/utils/request.ts b/packages/backend/server/src/base/utils/request.ts index 80b1aa503f..f39646d42d 100644 --- a/packages/backend/server/src/base/utils/request.ts +++ b/packages/backend/server/src/base/utils/request.ts @@ -87,11 +87,11 @@ export function parseCookies( * - `graphql`: graphql request * - `http`: http request * - `ws`: websocket request - * - `se`: server event + * - `event`: event * - `job`: cron job * - `rpc`: rpc request */ -export type RequestType = GqlContextType | 'se' | 'job'; +export type RequestType = GqlContextType | 'event' | 'job'; export function genRequestId(type: RequestType) { return `${AFFiNE.flavor.type}:${type}/${randomUUID()}`; diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts index f871b98e5e..9e0cd1887f 100644 --- a/packages/backend/server/src/core/sync/gateway.ts +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -34,7 +34,7 @@ import { DocID } from '../utils/doc'; const SubscribeMessage = (event: string) => applyDecorators( GatewayErrorWrapper(event), - CallMetric('socketio', 'event_duration', undefined, { event }), + CallMetric('socketio', 'event_duration', { event }), RawSubscribeMessage(event) ); diff --git a/packages/backend/server/src/core/workspaces/event.ts b/packages/backend/server/src/core/workspaces/event.ts new file mode 100644 index 0000000000..324ba94347 --- /dev/null +++ b/packages/backend/server/src/core/workspaces/event.ts @@ -0,0 +1,81 @@ +import { Injectable } from '@nestjs/common'; + +import { OnEvent } from '../../base'; +import { Models } from '../../models'; +import { WorkspaceService } from './resolvers/service'; + +@Injectable() +export class WorkspaceEvents { + constructor( + private readonly workspaceService: WorkspaceService, + private readonly models: Models + ) {} + + @OnEvent('workspace.members.reviewRequested') + async onReviewRequested({ + inviteId, + }: Events['workspace.members.reviewRequested']) { + // send review request mail to owner and admin + await this.workspaceService.sendReviewRequestedEmail(inviteId); + } + + @OnEvent('workspace.members.requestApproved') + async onApproveRequest({ + inviteId, + }: Events['workspace.members.requestApproved']) { + // send approve mail + await this.workspaceService.sendReviewApproveEmail(inviteId); + } + + @OnEvent('workspace.members.requestDeclined') + async onDeclineRequest({ + userId, + workspaceId, + }: Events['workspace.members.requestDeclined']) { + const user = await this.models.user.getPublicUser(userId); + // send decline mail + await this.workspaceService.sendReviewDeclinedEmail( + user?.email, + workspaceId + ); + } + + @OnEvent('workspace.members.roleChanged') + async onRoleChanged({ + userId, + workspaceId, + permission, + }: Events['workspace.members.roleChanged']) { + // send role changed mail + await this.workspaceService.sendRoleChangedEmail(userId, { + id: workspaceId, + role: permission, + }); + } + + @OnEvent('workspace.members.ownershipTransferred') + async onOwnerTransferred({ + workspaceId, + from, + to, + }: Events['workspace.members.ownershipTransferred']) { + // send ownership transferred mail + const fromUser = await this.models.user.getPublicUser(from); + const toUser = await this.models.user.getPublicUser(to); + + if (fromUser) { + await this.workspaceService.sendOwnershipTransferredEmail( + fromUser.email, + { + id: workspaceId, + } + ); + } + + if (toUser) { + await this.workspaceService.sendOwnershipReceivedEmail(toUser.email, { + id: workspaceId, + }); + } + } +} diff --git a/packages/backend/server/src/core/workspaces/index.ts b/packages/backend/server/src/core/workspaces/index.ts index fb5423cb40..0879672f0a 100644 --- a/packages/backend/server/src/core/workspaces/index.ts +++ b/packages/backend/server/src/core/workspaces/index.ts @@ -8,6 +8,7 @@ import { QuotaModule } from '../quota'; import { StorageModule } from '../storage'; import { UserModule } from '../user'; import { WorkspacesController } from './controller'; +import { WorkspaceEvents } from './event'; import { DocHistoryResolver, DocResolver, @@ -37,6 +38,7 @@ import { DocHistoryResolver, WorkspaceBlobResolver, WorkspaceService, + WorkspaceEvents, ], exports: [WorkspaceService], }) diff --git a/packages/backend/server/src/core/workspaces/resolvers/team.ts b/packages/backend/server/src/core/workspaces/resolvers/team.ts index a89e33de0c..3b0fe5d89b 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/team.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/team.ts @@ -14,7 +14,6 @@ import { Cache, EventBus, MemberNotFoundInSpace, - OnEvent, RequestMutex, TooManyRequest, URLHelper, @@ -350,72 +349,4 @@ export class TeamWorkspaceResolver { return new TooManyRequest(); } } - - @OnEvent('workspace.members.reviewRequested') - async onReviewRequested({ - inviteId, - }: Events['workspace.members.reviewRequested']) { - // send review request mail to owner and admin - await this.workspaceService.sendReviewRequestedEmail(inviteId); - } - - @OnEvent('workspace.members.requestApproved') - async onApproveRequest({ - inviteId, - }: Events['workspace.members.requestApproved']) { - // send approve mail - await this.workspaceService.sendReviewApproveEmail(inviteId); - } - - @OnEvent('workspace.members.requestDeclined') - async onDeclineRequest({ - userId, - workspaceId, - }: Events['workspace.members.requestDeclined']) { - const user = await this.models.user.getPublicUser(userId); - // send decline mail - await this.workspaceService.sendReviewDeclinedEmail( - user?.email, - workspaceId - ); - } - - @OnEvent('workspace.members.roleChanged') - async onRoleChanged({ - userId, - workspaceId, - permission, - }: Events['workspace.members.roleChanged']) { - // send role changed mail - await this.workspaceService.sendRoleChangedEmail(userId, { - id: workspaceId, - role: permission, - }); - } - - @OnEvent('workspace.members.ownershipTransferred') - async onOwnerTransferred({ - workspaceId, - from, - to, - }: Events['workspace.members.ownershipTransferred']) { - // send ownership transferred mail - const fromUser = await this.models.user.getPublicUser(from); - const toUser = await this.models.user.getPublicUser(to); - - if (fromUser) { - await this.workspaceService.sendOwnershipTransferredEmail( - fromUser.email, - { - id: workspaceId, - } - ); - } - - if (toUser) { - await this.workspaceService.sendOwnershipReceivedEmail(toUser.email, { - id: workspaceId, - }); - } - } } diff --git a/packages/backend/server/src/plugins/payment/cron.ts b/packages/backend/server/src/plugins/payment/cron.ts index 20fbd10b4b..ebae9d2933 100644 --- a/packages/backend/server/src/plugins/payment/cron.ts +++ b/packages/backend/server/src/plugins/payment/cron.ts @@ -1,9 +1,8 @@ import { Injectable } from '@nestjs/common'; -import { OnEvent } from '@nestjs/event-emitter'; import { Cron, CronExpression } from '@nestjs/schedule'; import { PrismaClient } from '@prisma/client'; -import { EventBus } from '../../base'; +import { EventBus, OnEvent } from '../../base'; import { SubscriptionPlan, SubscriptionRecurring, diff --git a/packages/backend/server/src/plugins/payment/quota.ts b/packages/backend/server/src/plugins/payment/quota.ts index 0294139fe1..2f6e8dabfd 100644 --- a/packages/backend/server/src/plugins/payment/quota.ts +++ b/packages/backend/server/src/plugins/payment/quota.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { OnEvent } from '@nestjs/event-emitter'; +import { OnEvent } from '../../base'; import { PermissionService } from '../../core/permission'; import { WorkspaceService } from '../../core/workspaces/resolvers'; import { Models } from '../../models'; diff --git a/yarn.lock b/yarn.lock index 2dc0d76b60..5610f0956c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -786,7 +786,6 @@ __metadata: "@nestjs/apollo": "npm:^12.2.2" "@nestjs/common": "npm:^10.4.15" "@nestjs/core": "npm:^10.4.15" - "@nestjs/event-emitter": "npm:^2.1.1" "@nestjs/graphql": "npm:^12.2.2" "@nestjs/platform-express": "npm:^10.4.15" "@nestjs/platform-socket.io": "npm:^10.4.15" @@ -834,6 +833,7 @@ __metadata: cookie-parser: "npm:^1.4.7" cross-env: "npm:^7.0.3" dotenv: "npm:^16.4.7" + eventemitter2: "npm:^6.4.9" eventsource-parser: "npm:^3.0.0" express: "npm:^4.21.2" fast-xml-parser: "npm:^4.5.0" @@ -8852,18 +8852,6 @@ __metadata: languageName: node linkType: hard -"@nestjs/event-emitter@npm:^2.1.1": - version: 2.1.1 - resolution: "@nestjs/event-emitter@npm:2.1.1" - dependencies: - eventemitter2: "npm:6.4.9" - peerDependencies: - "@nestjs/common": ^8.0.0 || ^9.0.0 || ^10.0.0 - "@nestjs/core": ^8.0.0 || ^9.0.0 || ^10.0.0 - checksum: 10/79b1ae873515a11b577d9e57b2c605b6aa9830ba60017cc54097c49e543dc23abcfe625425e375fa3525998b87319b1b1c0437a75deabdedf7ebeb02ab487e0c - languageName: node - linkType: hard - "@nestjs/graphql@npm:^12.2.2": version: 12.2.2 resolution: "@nestjs/graphql@npm:12.2.2" @@ -21181,7 +21169,7 @@ __metadata: languageName: node linkType: hard -"eventemitter2@npm:6.4.9, eventemitter2@npm:^6.4.9": +"eventemitter2@npm:^6.4.9": version: 6.4.9 resolution: "eventemitter2@npm:6.4.9" checksum: 10/b829b1c6b11e15926b635092b5ad62b4463d1c928859831dcae606e988cf41893059e3541f5a8209d21d2f15314422ddd4d84d20830b4bf44978608d15b06b08