diff --git a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts index af4415bac5..e58d7c3862 100644 --- a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts +++ b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts @@ -3,13 +3,11 @@ import { Injectable } from '@nestjs/common'; import { TestingModule } from '@nestjs/testing'; import test from 'ava'; import { Queue as Bullmq } from 'bullmq'; -import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; import Sinon from 'sinon'; import { createTestingModule } from '../../../../__tests__/utils'; import { ConfigModule } from '../../../config'; import { metrics } from '../../../metrics'; -import { genRequestId } from '../../../utils'; import { JobModule, JobQueue, OnJob } from '..'; import { JobExecutor } from '../executor'; import { JobHandlerScanner } from '../scanner'; @@ -44,12 +42,6 @@ class JobHandlers { async throwJob() { throw new Error('Throw in job handler'); } - - @OnJob('nightly.__test__requestId') - onRequestId() { - const cls = ClsServiceManager.getClsService(); - return cls.getId() ?? genRequestId('job'); - } } test.before(async () => { @@ -111,7 +103,7 @@ test('should add job to queue', async t => { test('should remove job from queue', async t => { const job = await queue.add('nightly.__test__job', { name: 'test' }); - const data = await queue.remove(job.id!, job.name as JobName); + const data = await queue.remove(job.id!, 'nightly.__test__job'); t.deepEqual(data, { name: 'test' }); @@ -197,28 +189,4 @@ test('should be able to record job metrics', async t => { error: true, }); }); - -test('should generate request id', async t => { - const handlers = module.get(JobHandlers); - const spy = Sinon.spy(handlers, 'onRequestId'); - - await executor.run('nightly.__test__requestId', {}); - - t.true(spy.returnValues.some(v => v.includes(':job:'))); - - spy.restore(); -}); - -test('should continuously use request id', async t => { - const handlers = module.get(JobHandlers); - const spy = Sinon.spy(handlers, 'onRequestId'); - - const cls = ClsServiceManager.getClsService(); - await cls.run(async () => { - cls.set(CLS_ID, 'test-request-id'); - await executor.run('nightly.__test__requestId', {}); - }); - t.true(spy.returned('test-request-id')); - spy.restore(); -}); // #endregion diff --git a/packages/backend/server/src/base/job/queue/executor.ts b/packages/backend/server/src/base/job/queue/executor.ts index c6a00eee5b..990a325a30 100644 --- a/packages/backend/server/src/base/job/queue/executor.ts +++ b/packages/backend/server/src/base/job/queue/executor.ts @@ -52,7 +52,10 @@ export class JobExecutor implements OnModuleDestroy { await this.stopWorkers(); } - async run(name: JobName, payload: any): Promise { + async run( + name: T, + payload: Jobs[T] + ): Promise { const ns = namespace(name); const handler = this.scanner.getHandler(name); @@ -63,24 +66,16 @@ export class JobExecutor implements OnModuleDestroy { const fn = wrapCallMetric( async () => { - const cls = ClsServiceManager.getClsService(); - await cls.run({ ifNested: 'reuse' }, async () => { - const requestId = cls.getId(); - if (!requestId) { - cls.set(CLS_ID, genRequestId('job')); - } - - const signature = `[${name}] (${handler.name})`; - try { - this.logger.log(`Job started: ${signature}`); - const ret = await handler.fn(payload); - this.logger.log(`Job finished: ${signature}, signal=${ret}`); - return ret; - } catch (e) { - this.logger.error(`Job failed: ${signature}`, e); - throw e; - } - }); + const signature = `[${name}] (${handler.name})`; + try { + this.logger.log(`Job started: ${signature}`); + const ret = await handler.fn(payload); + this.logger.log(`Job finished: ${signature}, signal=${ret}`); + return ret; + } catch (e) { + this.logger.error(`Job failed: ${signature}`, e); + throw e; + } }, 'queue', 'job_handler', @@ -116,7 +111,22 @@ export class JobExecutor implements OnModuleDestroy { const worker = new Worker( queue, async job => { - return await this.run(job.name as JobName, job.data); + const cls = ClsServiceManager.getClsService(); + let payload: any; + let requestId: string; + + if (job.data.$$requestId) { + requestId = job.data.$$requestId; + payload = job.data.payload; + } else { + requestId = genRequestId('job'); + payload = job.data; + } + + return await cls.run(async () => { + cls.set(CLS_ID, requestId); + return await this.run(job.name as JobName, payload); + }); }, merge({}, this.config.job.queue, this.config.job.worker, queueOptions, { prefix: this.ref.get(getSharedConfigToken(), { strict: false }) diff --git a/packages/backend/server/src/base/job/queue/queue.ts b/packages/backend/server/src/base/job/queue/queue.ts index 9900cfecbe..9d5a0d5240 100644 --- a/packages/backend/server/src/base/job/queue/queue.ts +++ b/packages/backend/server/src/base/job/queue/queue.ts @@ -2,9 +2,16 @@ import { getQueueToken } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { ModuleRef } from '@nestjs/core'; import { Job, JobsOptions, Queue } from 'bullmq'; +import { ClsServiceManager } from 'nestjs-cls'; +import { genRequestId } from '../../utils'; import { namespace } from './def'; +interface JobData { + $$requestId: string; + payload: Jobs[T]; +} + @Injectable() export class JobQueue { private readonly logger = new Logger(JobQueue.name); @@ -14,15 +21,26 @@ export class JobQueue { async add(name: T, payload: Jobs[T], opts?: JobsOptions) { const ns = namespace(name); const queue = this.getQueue(ns); - const job = await queue.add(name, payload, opts); + const job = await queue.add( + name, + { + $$requestId: + ClsServiceManager.getClsService().getId() ?? genRequestId('job'), + payload, + } as JobData, + opts + ); this.logger.log(`Job [${name}] added; id=${job.id}`); return job; } - async remove(jobId: string, jobName: T) { + async remove( + jobId: string, + jobName: T + ): Promise { const ns = namespace(jobName); const queue = this.getQueue(ns); - const job = (await queue.getJob(jobId)) as Job | undefined; + const job = (await queue.getJob(jobId)) as Job> | undefined; if (!job) { return; @@ -31,7 +49,7 @@ export class JobQueue { const removed = await queue.remove(jobId); if (removed) { this.logger.log(`Job ${jobName} removed from queue ${ns}`); - return job.data; + return job.data.payload; } return undefined; @@ -40,7 +58,7 @@ export class JobQueue { async get(jobId: string, jobName: T) { const ns = namespace(jobName); const queue = this.getQueue(ns); - return (await queue.getJob(jobId)) as Job | undefined; + return (await queue.getJob(jobId)) as Job> | undefined; } private getQueue(ns: string): Queue {