diff --git a/packages/backend/server/src/__tests__/event/cluster.spec.ts b/packages/backend/server/src/__tests__/event/cluster.spec.ts index 7c83337012..3e4394c0b4 100644 --- a/packages/backend/server/src/__tests__/event/cluster.spec.ts +++ b/packages/backend/server/src/__tests__/event/cluster.spec.ts @@ -1,5 +1,6 @@ import { INestApplication } from '@nestjs/common'; import ava, { TestFn } from 'ava'; +import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; import Sinon from 'sinon'; import { EventBus } from '../../base'; @@ -11,6 +12,7 @@ const test = ava as TestFn<{ app1: INestApplication; app2: INestApplication; }>; + async function createApp() { const m = await createTestingModule( { @@ -49,14 +51,20 @@ test('should broadcast event to cluster instances', async t => { // app 2 for broadcasting const eventbus2 = app2.get(EventBus); - eventbus2.broadcast('__test__.event', { count: 0 }); + const cls = ClsServiceManager.getClsService(); + cls.run(() => { + cls.set(CLS_ID, 'test-request-id'); + eventbus2.broadcast('__test__.event', { count: 0, requestId: cls.getId() }); + }); // cause the cross instances broadcasting is asynchronization calling // we should wait for the event's arriving before asserting await eventbus1.waitFor('__test__.event'); - t.true(listener.calledOnceWith({ count: 0 })); - t.true(runtimeListener.calledOnceWith({ count: 0 })); + t.true(listener.calledOnceWith({ count: 0, requestId: 'test-request-id' })); + t.true( + runtimeListener.calledOnceWith({ count: 0, requestId: 'test-request-id' }) + ); off(); }); diff --git a/packages/backend/server/src/__tests__/event/provider.ts b/packages/backend/server/src/__tests__/event/provider.ts index 296c55d1bd..4c71744832 100644 --- a/packages/backend/server/src/__tests__/event/provider.ts +++ b/packages/backend/server/src/__tests__/event/provider.ts @@ -4,7 +4,7 @@ import { OnEvent } from '../../base'; declare global { interface Events { - '__test__.event': { count: number }; + '__test__.event': { count: number; requestId?: string }; '__test__.event2': { count: number }; '__test__.throw': { count: number }; } @@ -13,10 +13,15 @@ declare global { @Injectable() export class Listeners { @OnEvent('__test__.event') - onTestEvent({ count }: Events['__test__.event']) { - return { - count: count + 1, - }; + onTestEvent({ count, requestId }: Events['__test__.event']) { + return requestId + ? { + count: count + 1, + requestId, + } + : { + count: count + 1, + }; } @OnEvent('__test__.throw') diff --git a/packages/backend/server/src/base/event/eventbus.ts b/packages/backend/server/src/base/event/eventbus.ts index c7fb0f5429..e1ee65612e 100644 --- a/packages/backend/server/src/base/event/eventbus.ts +++ b/packages/backend/server/src/base/event/eventbus.ts @@ -1,3 +1,5 @@ +import { randomUUID } from 'node:crypto'; + import { applyDecorators, Injectable, @@ -15,6 +17,7 @@ import { WebSocketGateway, WebSocketServer, } from '@nestjs/websockets'; +import { CLS_ID, ClsService } from 'nestjs-cls'; import type { Server, Socket } from 'socket.io'; import { CallMetric } from '../metrics'; @@ -69,7 +72,8 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { constructor( private readonly emitter: EventEmitter2, - private readonly watcher: EventEmitterReadinessWatcher + private readonly watcher: EventEmitterReadinessWatcher, + private readonly cls: ClsService ) {} handleConnection(client: Socket) { @@ -88,9 +92,13 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { events.forEach(event => { // Proxy all events received from server(trigger by `server.serverSideEmit`) // to internal event system - this.server?.on(event, payload => { - this.logger.log(`Server Event: ${event} (Received)`); - this.emit(event, payload); + this.server?.on(event, (payload, requestId?: string) => { + this.cls.run(() => { + requestId = requestId ?? `server_event-${randomUUID()}`; + this.cls.set(CLS_ID, requestId); + this.logger.log(`Server Event: ${event} (Received)`); + this.emit(event, payload); + }); }); }); }) @@ -120,7 +128,7 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { */ broadcast(event: T, payload: Events[T]) { this.logger.log(`Server Event: ${event} (Send)`); - this.server?.serverSideEmit(event, payload); + this.server?.serverSideEmit(event, payload, this.cls.getId()); } on(