chore(server): adjust job config (#11405)

This commit is contained in:
forehalo
2025-04-02 09:48:45 +00:00
parent 85d176ce6f
commit 35bea20b80
5 changed files with 43 additions and 54 deletions

View File

@@ -58,12 +58,10 @@ test.before(async () => {
ConfigModule.override({
job: {
worker: {
defaultWorkerOptions: {
// NOTE(@forehalo):
// bullmq will hold the connection to check stalled jobs,
// which will keep the test process alive to timeout.
stalledInterval: 100,
},
// NOTE(@forehalo):
// bullmq will hold the connection to check stalled jobs,
// which will keep the test process alive to timeout.
stalledInterval: 100,
},
},
}),

View File

@@ -1,4 +1,4 @@
import { QueueOptions, WorkerOptions } from 'bullmq';
import { DefaultJobOptions, WorkerOptions } from 'bullmq';
import { defineModuleConfig, JSONSchema } from '../../config';
import { Queue } from './def';
@@ -6,10 +6,8 @@ import { Queue } from './def';
declare global {
interface AppConfigSchema {
job: {
queue: ConfigItem<Omit<QueueOptions, 'connection' | 'telemetry'>>;
worker: ConfigItem<{
defaultWorkerOptions: Omit<WorkerOptions, 'connection' | 'telemetry'>;
}>;
queue: ConfigItem<Omit<DefaultJobOptions, 'connection' | 'telemetry'>>;
worker: ConfigItem<Omit<WorkerOptions, 'connection' | 'telemetry'>>;
queues: {
[key in Queue]: ConfigItem<{
concurrency: number;
@@ -30,15 +28,12 @@ defineModuleConfig('job', {
queue: {
desc: 'The config for job queues',
default: {
prefix: env.testing ? 'affine_job_test' : 'affine_job',
defaultJobOptions: {
attempts: 5,
// should remove job after it's completed, because we will add a new job with the same job id
removeOnComplete: true,
removeOnFail: {
age: 24 * 3600 /* 1 day */,
count: 500,
},
attempts: 5,
// should remove job after it's completed, because we will add a new job with the same job id
removeOnComplete: true,
removeOnFail: {
age: 24 * 3600 /* 1 day */,
count: 500,
},
},
link: 'https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html',
@@ -46,9 +41,7 @@ defineModuleConfig('job', {
worker: {
desc: 'The config for job workers',
default: {
defaultWorkerOptions: {},
},
default: {},
link: 'https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html',
},

View File

@@ -1,7 +1,7 @@
import { getQueueToken } from '@nestjs/bullmq';
import { getQueueToken, getSharedConfigToken } from '@nestjs/bullmq';
import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { Job, Queue as Bullmq, Worker } from 'bullmq';
import { Job, Queue as Bullmq, Worker, WorkerOptions } from 'bullmq';
import { difference, merge } from 'lodash-es';
import { CLS_ID, ClsServiceManager } from 'nestjs-cls';
@@ -118,16 +118,12 @@ export class JobExecutor implements OnModuleDestroy {
async job => {
return await this.run(job.name as JobName, job.data);
},
merge(
{},
this.config.job.queue,
this.config.job.worker.defaultWorkerOptions,
queueOptions,
{
concurrency,
connection: this.redis,
}
)
merge({}, this.config.job.queue, this.config.job.worker, queueOptions, {
prefix: this.ref.get(getSharedConfigToken(), { strict: false })
.prefix,
concurrency,
connection: this.redis,
} as WorkerOptions)
);
worker.on('error', error => {

View File

@@ -2,6 +2,7 @@ import './config';
import { BullModule } from '@nestjs/bullmq';
import { DynamicModule } from '@nestjs/common';
import { type QueueOptions } from 'bullmq';
import { Config } from '../../config';
import { QueueRedis } from '../../redis';
@@ -17,9 +18,19 @@ export class JobModule {
module: JobModule,
imports: [
BullModule.forRootAsync({
useFactory: (config: Config, redis: QueueRedis) => {
useFactory: (config: Config, redis: QueueRedis): QueueOptions => {
let prefix = 'affine_job';
if (env.testing) {
prefix += '_test';
} else if (!env.namespaces.production) {
prefix += '_' + env.NAMESPACE;
}
return {
...config.job.queue,
// NOTE(@forehalo):
// we distinguish jobs by namespace,
// to avoid new jobs been dropped by old deployments
prefix,
defaultJobOptions: config.job.queue,
connection: redis,
};
},