chore(server): never reuse cls id for job handler (#11449)

This commit is contained in:
forehalo
2025-04-03 12:26:02 +00:00
parent c8d22d97d5
commit 6939e80827
3 changed files with 54 additions and 58 deletions

View File

@@ -3,13 +3,11 @@ import { Injectable } from '@nestjs/common';
import { TestingModule } from '@nestjs/testing';
import test from 'ava';
import { Queue as Bullmq } from 'bullmq';
import { CLS_ID, ClsServiceManager } from 'nestjs-cls';
import Sinon from 'sinon';
import { createTestingModule } from '../../../../__tests__/utils';
import { ConfigModule } from '../../../config';
import { metrics } from '../../../metrics';
import { genRequestId } from '../../../utils';
import { JobModule, JobQueue, OnJob } from '..';
import { JobExecutor } from '../executor';
import { JobHandlerScanner } from '../scanner';
@@ -44,12 +42,6 @@ class JobHandlers {
async throwJob() {
throw new Error('Throw in job handler');
}
@OnJob('nightly.__test__requestId')
onRequestId() {
const cls = ClsServiceManager.getClsService();
return cls.getId() ?? genRequestId('job');
}
}
test.before(async () => {
@@ -111,7 +103,7 @@ test('should add job to queue', async t => {
test('should remove job from queue', async t => {
const job = await queue.add('nightly.__test__job', { name: 'test' });
const data = await queue.remove(job.id!, job.name as JobName);
const data = await queue.remove(job.id!, 'nightly.__test__job');
t.deepEqual(data, { name: 'test' });
@@ -197,28 +189,4 @@ test('should be able to record job metrics', async t => {
error: true,
});
});
test('should generate request id', async t => {
const handlers = module.get(JobHandlers);
const spy = Sinon.spy(handlers, 'onRequestId');
await executor.run('nightly.__test__requestId', {});
t.true(spy.returnValues.some(v => v.includes(':job:')));
spy.restore();
});
test('should continuously use request id', async t => {
const handlers = module.get(JobHandlers);
const spy = Sinon.spy(handlers, 'onRequestId');
const cls = ClsServiceManager.getClsService();
await cls.run(async () => {
cls.set(CLS_ID, 'test-request-id');
await executor.run('nightly.__test__requestId', {});
});
t.true(spy.returned('test-request-id'));
spy.restore();
});
// #endregion

View File

@@ -52,7 +52,10 @@ export class JobExecutor implements OnModuleDestroy {
await this.stopWorkers();
}
async run(name: JobName, payload: any): Promise<JOB_SIGNAL | undefined> {
async run<T extends JobName>(
name: T,
payload: Jobs[T]
): Promise<JOB_SIGNAL | undefined> {
const ns = namespace(name);
const handler = this.scanner.getHandler(name);
@@ -63,24 +66,16 @@ export class JobExecutor implements OnModuleDestroy {
const fn = wrapCallMetric(
async () => {
const cls = ClsServiceManager.getClsService();
await cls.run({ ifNested: 'reuse' }, async () => {
const requestId = cls.getId();
if (!requestId) {
cls.set(CLS_ID, genRequestId('job'));
}
const signature = `[${name}] (${handler.name})`;
try {
this.logger.log(`Job started: ${signature}`);
const ret = await handler.fn(payload);
this.logger.log(`Job finished: ${signature}, signal=${ret}`);
return ret;
} catch (e) {
this.logger.error(`Job failed: ${signature}`, e);
throw e;
}
});
const signature = `[${name}] (${handler.name})`;
try {
this.logger.log(`Job started: ${signature}`);
const ret = await handler.fn(payload);
this.logger.log(`Job finished: ${signature}, signal=${ret}`);
return ret;
} catch (e) {
this.logger.error(`Job failed: ${signature}`, e);
throw e;
}
},
'queue',
'job_handler',
@@ -116,7 +111,22 @@ export class JobExecutor implements OnModuleDestroy {
const worker = new Worker(
queue,
async job => {
return await this.run(job.name as JobName, job.data);
const cls = ClsServiceManager.getClsService();
let payload: any;
let requestId: string;
if (job.data.$$requestId) {
requestId = job.data.$$requestId;
payload = job.data.payload;
} else {
requestId = genRequestId('job');
payload = job.data;
}
return await cls.run(async () => {
cls.set(CLS_ID, requestId);
return await this.run(job.name as JobName, payload);
});
},
merge({}, this.config.job.queue, this.config.job.worker, queueOptions, {
prefix: this.ref.get(getSharedConfigToken(), { strict: false })

View File

@@ -2,9 +2,16 @@ import { getQueueToken } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { Job, JobsOptions, Queue } from 'bullmq';
import { ClsServiceManager } from 'nestjs-cls';
import { genRequestId } from '../../utils';
import { namespace } from './def';
interface JobData<T extends JobName> {
$$requestId: string;
payload: Jobs[T];
}
@Injectable()
export class JobQueue {
private readonly logger = new Logger(JobQueue.name);
@@ -14,15 +21,26 @@ export class JobQueue {
async add<T extends JobName>(name: T, payload: Jobs[T], opts?: JobsOptions) {
const ns = namespace(name);
const queue = this.getQueue(ns);
const job = await queue.add(name, payload, opts);
const job = await queue.add(
name,
{
$$requestId:
ClsServiceManager.getClsService().getId() ?? genRequestId('job'),
payload,
} as JobData<T>,
opts
);
this.logger.log(`Job [${name}] added; id=${job.id}`);
return job;
}
async remove<T extends JobName>(jobId: string, jobName: T) {
async remove<T extends JobName>(
jobId: string,
jobName: T
): Promise<Jobs[T] | undefined> {
const ns = namespace(jobName);
const queue = this.getQueue(ns);
const job = (await queue.getJob(jobId)) as Job<Jobs[T]> | undefined;
const job = (await queue.getJob(jobId)) as Job<JobData<T>> | undefined;
if (!job) {
return;
@@ -31,7 +49,7 @@ export class JobQueue {
const removed = await queue.remove(jobId);
if (removed) {
this.logger.log(`Job ${jobName} removed from queue ${ns}`);
return job.data;
return job.data.payload;
}
return undefined;
@@ -40,7 +58,7 @@ export class JobQueue {
async get<T extends JobName>(jobId: string, jobName: T) {
const ns = namespace(jobName);
const queue = this.getQueue(ns);
return (await queue.getJob(jobId)) as Job<Jobs[T]> | undefined;
return (await queue.getJob(jobId)) as Job<JobData<T>> | undefined;
}
private getQueue(ns: string): Queue {