diff --git a/.docker/selfhost/schema.json b/.docker/selfhost/schema.json index 05e9d91e24..56ff4eb0d9 100644 --- a/.docker/selfhost/schema.json +++ b/.docker/selfhost/schema.json @@ -56,15 +56,9 @@ "properties": { "apolloDriverConfig": { "type": "object", - "description": "The config for underlying nestjs GraphQL and apollo driver engine.\n@default {\"buildSchemaOptions\":{\"numberScalarMode\":\"integer\"},\"useGlobalPrefix\":true,\"playground\":true,\"introspection\":true,\"sortSchema\":true}\n@link https://docs.nestjs.com/graphql/quick-start", + "description": "The config for underlying nestjs GraphQL and apollo driver engine.\n@default {\"introspection\":true}\n@link https://docs.nestjs.com/graphql/quick-start", "default": { - "buildSchemaOptions": { - "numberScalarMode": "integer" - }, - "useGlobalPrefix": true, - "playground": true, - "introspection": true, - "sortSchema": true + "introspection": true } } } @@ -86,16 +80,13 @@ "properties": { "queue": { "type": "object", - "description": "The config for job queues\n@default {\"prefix\":\"affine_job\",\"defaultJobOptions\":{\"attempts\":5,\"removeOnComplete\":true,\"removeOnFail\":{\"age\":86400,\"count\":500}}}\n@link https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html", + "description": "The config for job queues\n@default {\"attempts\":5,\"removeOnComplete\":true,\"removeOnFail\":{\"age\":86400,\"count\":500}}\n@link https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html", "default": { - "prefix": "affine_job", - "defaultJobOptions": { - "attempts": 5, - "removeOnComplete": true, - "removeOnFail": { - "age": 86400, - "count": 500 - } + "attempts": 5, + "removeOnComplete": true, + "removeOnFail": { + "age": 86400, + "count": 500 } } }, diff --git a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts index 23b1b06f10..af4415bac5 100644 --- a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts +++ b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts @@ -58,12 +58,10 @@ test.before(async () => { ConfigModule.override({ job: { worker: { - defaultWorkerOptions: { - // NOTE(@forehalo): - // bullmq will hold the connection to check stalled jobs, - // which will keep the test process alive to timeout. - stalledInterval: 100, - }, + // NOTE(@forehalo): + // bullmq will hold the connection to check stalled jobs, + // which will keep the test process alive to timeout. + stalledInterval: 100, }, }, }), diff --git a/packages/backend/server/src/base/job/queue/config.ts b/packages/backend/server/src/base/job/queue/config.ts index 51da1bb037..0f32961fce 100644 --- a/packages/backend/server/src/base/job/queue/config.ts +++ b/packages/backend/server/src/base/job/queue/config.ts @@ -1,4 +1,4 @@ -import { QueueOptions, WorkerOptions } from 'bullmq'; +import { DefaultJobOptions, WorkerOptions } from 'bullmq'; import { defineModuleConfig, JSONSchema } from '../../config'; import { Queue } from './def'; @@ -6,10 +6,8 @@ import { Queue } from './def'; declare global { interface AppConfigSchema { job: { - queue: ConfigItem>; - worker: ConfigItem<{ - defaultWorkerOptions: Omit; - }>; + queue: ConfigItem>; + worker: ConfigItem>; queues: { [key in Queue]: ConfigItem<{ concurrency: number; @@ -30,15 +28,12 @@ defineModuleConfig('job', { queue: { desc: 'The config for job queues', default: { - prefix: env.testing ? 'affine_job_test' : 'affine_job', - defaultJobOptions: { - attempts: 5, - // should remove job after it's completed, because we will add a new job with the same job id - removeOnComplete: true, - removeOnFail: { - age: 24 * 3600 /* 1 day */, - count: 500, - }, + attempts: 5, + // should remove job after it's completed, because we will add a new job with the same job id + removeOnComplete: true, + removeOnFail: { + age: 24 * 3600 /* 1 day */, + count: 500, }, }, link: 'https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html', @@ -46,9 +41,7 @@ defineModuleConfig('job', { worker: { desc: 'The config for job workers', - default: { - defaultWorkerOptions: {}, - }, + default: {}, link: 'https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html', }, diff --git a/packages/backend/server/src/base/job/queue/executor.ts b/packages/backend/server/src/base/job/queue/executor.ts index 5e82e49b33..c6a00eee5b 100644 --- a/packages/backend/server/src/base/job/queue/executor.ts +++ b/packages/backend/server/src/base/job/queue/executor.ts @@ -1,7 +1,7 @@ -import { getQueueToken } from '@nestjs/bullmq'; +import { getQueueToken, getSharedConfigToken } from '@nestjs/bullmq'; import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; import { ModuleRef } from '@nestjs/core'; -import { Job, Queue as Bullmq, Worker } from 'bullmq'; +import { Job, Queue as Bullmq, Worker, WorkerOptions } from 'bullmq'; import { difference, merge } from 'lodash-es'; import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; @@ -118,16 +118,12 @@ export class JobExecutor implements OnModuleDestroy { async job => { return await this.run(job.name as JobName, job.data); }, - merge( - {}, - this.config.job.queue, - this.config.job.worker.defaultWorkerOptions, - queueOptions, - { - concurrency, - connection: this.redis, - } - ) + merge({}, this.config.job.queue, this.config.job.worker, queueOptions, { + prefix: this.ref.get(getSharedConfigToken(), { strict: false }) + .prefix, + concurrency, + connection: this.redis, + } as WorkerOptions) ); worker.on('error', error => { diff --git a/packages/backend/server/src/base/job/queue/index.ts b/packages/backend/server/src/base/job/queue/index.ts index 70124a6364..f6f954e4ad 100644 --- a/packages/backend/server/src/base/job/queue/index.ts +++ b/packages/backend/server/src/base/job/queue/index.ts @@ -2,6 +2,7 @@ import './config'; import { BullModule } from '@nestjs/bullmq'; import { DynamicModule } from '@nestjs/common'; +import { type QueueOptions } from 'bullmq'; import { Config } from '../../config'; import { QueueRedis } from '../../redis'; @@ -17,9 +18,19 @@ export class JobModule { module: JobModule, imports: [ BullModule.forRootAsync({ - useFactory: (config: Config, redis: QueueRedis) => { + useFactory: (config: Config, redis: QueueRedis): QueueOptions => { + let prefix = 'affine_job'; + if (env.testing) { + prefix += '_test'; + } else if (!env.namespaces.production) { + prefix += '_' + env.NAMESPACE; + } return { - ...config.job.queue, + // NOTE(@forehalo): + // we distinguish jobs by namespace, + // to avoid new jobs been dropped by old deployments + prefix, + defaultJobOptions: config.job.queue, connection: redis, }; },