diff --git a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts index e0c307a59d..23b1b06f10 100644 --- a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts +++ b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts @@ -1,6 +1,8 @@ +import { getQueueToken } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; import { TestingModule } from '@nestjs/testing'; import test from 'ava'; +import { Queue as Bullmq } from 'bullmq'; import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; import Sinon from 'sinon'; @@ -15,6 +17,7 @@ import { JobHandlerScanner } from '../scanner'; let module: TestingModule; let queue: JobQueue; let executor: JobExecutor; +let bullmq: Bullmq; declare global { interface Jobs { @@ -62,9 +65,6 @@ test.before(async () => { stalledInterval: 100, }, }, - queue: { - defaultJobOptions: { delay: 1000 }, - }, }, }), JobModule.forRoot(), @@ -78,13 +78,12 @@ test.before(async () => { queue = module.get(JobQueue); executor = module.get(JobExecutor); + bullmq = module.get(getQueueToken('nightly'), { strict: false }); }); -test.afterEach(async () => { - // @ts-expect-error private api - const inner = queue.getQueue('nightly'); - await inner.obliterate({ force: true }); - await inner.resume(); +test.beforeEach(async () => { + await bullmq.obliterate({ force: true }); + await bullmq.resume(); }); test.after.always(async () => { @@ -106,25 +105,20 @@ test('should register job handler', async t => { test('should add job to queue', async t => { const job = await queue.add('nightly.__test__job', { name: 'test' }); - // @ts-expect-error private api - const innerQueue = queue.getQueue('nightly'); - const queuedJob = await innerQueue.getJob(job.id!); + const queuedJob = await queue.get(job.id!, job.name as JobName); - t.is(queuedJob.name, job.name); + t.is(queuedJob!.name, job.name); }); test('should remove job from queue', async t => { const job = await queue.add('nightly.__test__job', { name: 'test' }); - // @ts-expect-error private api - const innerQueue = queue.getQueue('nightly'); - const data = await queue.remove(job.id!, job.name as JobName); t.deepEqual(data, { name: 'test' }); const nullData = await queue.remove(job.id!, job.name as JobName); - const nullJob = await innerQueue.getJob(job.id!); + const nullJob = await bullmq.getJob(job.id!); t.is(nullData, undefined); t.is(nullJob, undefined); @@ -137,7 +131,6 @@ test('should start workers', async t => { const worker = executor.workers.get('nightly')!; t.truthy(worker); - t.true(worker.isRunning()); }); test('should dispatch job handler', async t => { diff --git a/packages/backend/server/src/base/job/queue/def.ts b/packages/backend/server/src/base/job/queue/def.ts index c5994fb7f9..d1f2aa60eb 100644 --- a/packages/backend/server/src/base/job/queue/def.ts +++ b/packages/backend/server/src/base/job/queue/def.ts @@ -65,5 +65,7 @@ export function getJobHandlerMetadata(target: any): JobName[] { } export enum JOB_SIGNAL { - RETRY = 'retry', + Retry = 'retry', + Repeat = 'repeat', + Done = 'done', } diff --git a/packages/backend/server/src/base/job/queue/executor.ts b/packages/backend/server/src/base/job/queue/executor.ts index 08f4003a27..8a06bc3608 100644 --- a/packages/backend/server/src/base/job/queue/executor.ts +++ b/packages/backend/server/src/base/job/queue/executor.ts @@ -1,5 +1,7 @@ +import { getQueueToken } from '@nestjs/bullmq'; import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; -import { Worker } from 'bullmq'; +import { ModuleRef } from '@nestjs/core'; +import { Job, Queue as Bullmq, Worker } from 'bullmq'; import { difference, merge } from 'lodash-es'; import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; @@ -19,7 +21,8 @@ export class JobExecutor implements OnModuleDestroy { constructor( private readonly config: Config, private readonly redis: QueueRedis, - private readonly scanner: JobHandlerScanner + private readonly scanner: JobHandlerScanner, + private readonly ref: ModuleRef ) {} @OnEvent('config.init') @@ -49,7 +52,7 @@ export class JobExecutor implements OnModuleDestroy { await this.stopWorkers(); } - async run(name: JobName, payload: any) { + async run(name: JobName, payload: any): Promise { const ns = namespace(name); const handler = this.scanner.getHandler(name); @@ -70,13 +73,9 @@ export class JobExecutor implements OnModuleDestroy { const signature = `[${name}] (${handler.name})`; try { this.logger.debug(`Job started: ${signature}`); - const result = await handler.fn(payload); - - if (result === JOB_SIGNAL.RETRY) { - throw new Error(`Manually job retry`); - } - + const ret = await handler.fn(payload); this.logger.debug(`Job finished: ${signature}`); + return ret; } catch (e) { this.logger.error(`Job failed: ${signature}`, e); throw e; @@ -94,7 +93,7 @@ export class JobExecutor implements OnModuleDestroy { const activeJobs = metrics.queue.counter('active_jobs'); activeJobs.add(1, { queue: ns }); try { - await fn(); + return await fn(); } finally { activeJobs.add(-1, { queue: ns }); } @@ -117,7 +116,7 @@ export class JobExecutor implements OnModuleDestroy { const worker = new Worker( queue, async job => { - await this.run(job.name as JobName, job.data); + return await this.run(job.name as JobName, job.data); }, merge( {}, @@ -135,6 +134,12 @@ export class JobExecutor implements OnModuleDestroy { this.logger.error(`Queue Worker [${queue}] error`, error); }); + worker.on('completed', (job, result) => { + this.handleJobReturn(job, result).catch(() => { + /* noop */ + }); + }); + this.logger.log( `Queue Worker [${queue}] started; concurrency=${concurrency};` ); @@ -143,6 +148,16 @@ export class JobExecutor implements OnModuleDestroy { } } + async handleJobReturn(job: Job, result: JOB_SIGNAL) { + if (result === JOB_SIGNAL.Repeat || result === JOB_SIGNAL.Retry) { + try { + await this.getQueue(job.name).add(job.name, job.data, job.opts); + } catch (e) { + this.logger.error(`Failed to add job [${job.name}]`, e); + } + } + } + private async stopWorkers() { await Promise.all( Array.from(this.workers.values()).map(async worker => { @@ -150,4 +165,8 @@ export class JobExecutor implements OnModuleDestroy { }) ); } + + private getQueue(ns: string): Bullmq { + return this.ref.get(getQueueToken(ns), { strict: false }); + } } diff --git a/packages/backend/server/src/base/job/queue/queue.ts b/packages/backend/server/src/base/job/queue/queue.ts index be98195249..2515a55b0d 100644 --- a/packages/backend/server/src/base/job/queue/queue.ts +++ b/packages/backend/server/src/base/job/queue/queue.ts @@ -37,6 +37,12 @@ export class JobQueue { return undefined; } + async get(jobId: string, jobName: T) { + const ns = namespace(jobName); + const queue = this.getQueue(ns); + return (await queue.getJob(jobId)) as Job | undefined; + } + private getQueue(ns: string): Queue { return this.moduleRef.get(getQueueToken(ns), { strict: false }); } diff --git a/packages/backend/server/src/core/doc-service/job.ts b/packages/backend/server/src/core/doc-service/job.ts index f9f1e32cd8..af4809fa3d 100644 --- a/packages/backend/server/src/core/doc-service/job.ts +++ b/packages/backend/server/src/core/doc-service/job.ts @@ -2,7 +2,8 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { PrismaClient } from '@prisma/client'; -import { JobQueue, metrics, OnJob } from '../../base'; +import { JOB_SIGNAL, JobQueue, metrics, OnJob } from '../../base'; +import { Models } from '../../models'; import { PgWorkspaceDocStorageAdapter } from '../doc'; declare global { @@ -20,7 +21,8 @@ export class DocServiceCronJob { constructor( private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly prisma: PrismaClient, - private readonly job: JobQueue + private readonly job: JobQueue, + private readonly models: Models ) {} @OnJob('doc.mergePendingDocUpdates') @@ -29,30 +31,40 @@ export class DocServiceCronJob { docId, }: Jobs['doc.mergePendingDocUpdates']) { await this.workspace.getDoc(workspaceId, docId); + const updatesLeft = await this.models.doc.getUpdateCount( + workspaceId, + docId + ); + + return updatesLeft > 100 ? JOB_SIGNAL.Repeat : JOB_SIGNAL.Done; } - @Cron(CronExpression.EVERY_10_SECONDS) + @Cron(CronExpression.EVERY_30_SECONDS) async schedule() { - const group = await this.prisma.update.groupBy({ - by: ['workspaceId', 'id'], - _count: true, - }); + const group = await this.models.doc.groupedUpdatesCount(); for (const update of group) { - if (update._count > 100) { - await this.job.add( - 'doc.mergePendingDocUpdates', - { - workspaceId: update.workspaceId, - docId: update.id, - }, - { - jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`, - priority: update._count, - delay: 0, - } - ); + const jobId = `doc:merge-pending-updates:${update.workspaceId}:${update.id}`; + + const job = await this.job.get(jobId, 'doc.mergePendingDocUpdates'); + + if (job && job.opts.priority !== 0 && update._count > 100) { + // reschedule long pending doc with highest priority, 0 is the highest priority + await this.job.remove(jobId, 'doc.mergePendingDocUpdates'); } + + await this.job.add( + 'doc.mergePendingDocUpdates', + { + workspaceId: update.workspaceId, + docId: update.id, + }, + { + jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`, + priority: update._count > 100 ? 0 : 100, + delay: 0, + } + ); } } diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts index bed729ab77..08b6552c9b 100644 --- a/packages/backend/server/src/core/doc/adapters/workspace.ts +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -105,6 +105,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { // keep it simple to let all update merged in one job jobId: `doc:merge-pending-updates:${workspaceId}:${docId}`, delay: 30 * 1000 /* 30s */, + priority: 100, } ); turn++; diff --git a/packages/backend/server/src/core/mail/job.ts b/packages/backend/server/src/core/mail/job.ts index aa99ca6f0e..b2b7ef3da1 100644 --- a/packages/backend/server/src/core/mail/job.ts +++ b/packages/backend/server/src/core/mail/job.ts @@ -101,7 +101,7 @@ export class MailJob { ...options, }); - return result === false ? JOB_SIGNAL.RETRY : undefined; + return result === false ? JOB_SIGNAL.Retry : undefined; } private async fetchWorkspaceProps(workspaceId: string) { diff --git a/packages/backend/server/src/models/doc.ts b/packages/backend/server/src/models/doc.ts index eb489653c3..63a4cee422 100644 --- a/packages/backend/server/src/models/doc.ts +++ b/packages/backend/server/src/models/doc.ts @@ -88,6 +88,13 @@ export class DocModel extends BaseModel { return await this.db.update.count(); } + async groupedUpdatesCount() { + return await this.db.update.groupBy({ + by: ['workspaceId', 'id'], + _count: true, + }); + } + /** * Delete updates by workspaceId, docId, and createdAts. */