chore(server): reschedule doc merging jobs (#11318)

This commit is contained in:
forehalo
2025-04-01 10:57:54 +00:00
parent d38458b733
commit 6276732efc
8 changed files with 90 additions and 50 deletions

View File

@@ -1,6 +1,8 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { TestingModule } from '@nestjs/testing'; import { TestingModule } from '@nestjs/testing';
import test from 'ava'; import test from 'ava';
import { Queue as Bullmq } from 'bullmq';
import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; import { CLS_ID, ClsServiceManager } from 'nestjs-cls';
import Sinon from 'sinon'; import Sinon from 'sinon';
@@ -15,6 +17,7 @@ import { JobHandlerScanner } from '../scanner';
let module: TestingModule; let module: TestingModule;
let queue: JobQueue; let queue: JobQueue;
let executor: JobExecutor; let executor: JobExecutor;
let bullmq: Bullmq;
declare global { declare global {
interface Jobs { interface Jobs {
@@ -62,9 +65,6 @@ test.before(async () => {
stalledInterval: 100, stalledInterval: 100,
}, },
}, },
queue: {
defaultJobOptions: { delay: 1000 },
},
}, },
}), }),
JobModule.forRoot(), JobModule.forRoot(),
@@ -78,13 +78,12 @@ test.before(async () => {
queue = module.get(JobQueue); queue = module.get(JobQueue);
executor = module.get(JobExecutor); executor = module.get(JobExecutor);
bullmq = module.get(getQueueToken('nightly'), { strict: false });
}); });
test.afterEach(async () => { test.beforeEach(async () => {
// @ts-expect-error private api await bullmq.obliterate({ force: true });
const inner = queue.getQueue('nightly'); await bullmq.resume();
await inner.obliterate({ force: true });
await inner.resume();
}); });
test.after.always(async () => { test.after.always(async () => {
@@ -106,25 +105,20 @@ test('should register job handler', async t => {
test('should add job to queue', async t => { test('should add job to queue', async t => {
const job = await queue.add('nightly.__test__job', { name: 'test' }); const job = await queue.add('nightly.__test__job', { name: 'test' });
// @ts-expect-error private api const queuedJob = await queue.get(job.id!, job.name as JobName);
const innerQueue = queue.getQueue('nightly');
const queuedJob = await innerQueue.getJob(job.id!);
t.is(queuedJob.name, job.name); t.is(queuedJob!.name, job.name);
}); });
test('should remove job from queue', async t => { test('should remove job from queue', async t => {
const job = await queue.add('nightly.__test__job', { name: 'test' }); const job = await queue.add('nightly.__test__job', { name: 'test' });
// @ts-expect-error private api
const innerQueue = queue.getQueue('nightly');
const data = await queue.remove(job.id!, job.name as JobName); const data = await queue.remove(job.id!, job.name as JobName);
t.deepEqual(data, { name: 'test' }); t.deepEqual(data, { name: 'test' });
const nullData = await queue.remove(job.id!, job.name as JobName); const nullData = await queue.remove(job.id!, job.name as JobName);
const nullJob = await innerQueue.getJob(job.id!); const nullJob = await bullmq.getJob(job.id!);
t.is(nullData, undefined); t.is(nullData, undefined);
t.is(nullJob, undefined); t.is(nullJob, undefined);
@@ -137,7 +131,6 @@ test('should start workers', async t => {
const worker = executor.workers.get('nightly')!; const worker = executor.workers.get('nightly')!;
t.truthy(worker); t.truthy(worker);
t.true(worker.isRunning());
}); });
test('should dispatch job handler', async t => { test('should dispatch job handler', async t => {

View File

@@ -65,5 +65,7 @@ export function getJobHandlerMetadata(target: any): JobName[] {
} }
export enum JOB_SIGNAL { export enum JOB_SIGNAL {
RETRY = 'retry', Retry = 'retry',
Repeat = 'repeat',
Done = 'done',
} }

View File

@@ -1,5 +1,7 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import { Worker } from 'bullmq'; import { ModuleRef } from '@nestjs/core';
import { Job, Queue as Bullmq, Worker } from 'bullmq';
import { difference, merge } from 'lodash-es'; import { difference, merge } from 'lodash-es';
import { CLS_ID, ClsServiceManager } from 'nestjs-cls'; import { CLS_ID, ClsServiceManager } from 'nestjs-cls';
@@ -19,7 +21,8 @@ export class JobExecutor implements OnModuleDestroy {
constructor( constructor(
private readonly config: Config, private readonly config: Config,
private readonly redis: QueueRedis, private readonly redis: QueueRedis,
private readonly scanner: JobHandlerScanner private readonly scanner: JobHandlerScanner,
private readonly ref: ModuleRef
) {} ) {}
@OnEvent('config.init') @OnEvent('config.init')
@@ -49,7 +52,7 @@ export class JobExecutor implements OnModuleDestroy {
await this.stopWorkers(); await this.stopWorkers();
} }
async run(name: JobName, payload: any) { async run(name: JobName, payload: any): Promise<JOB_SIGNAL | undefined> {
const ns = namespace(name); const ns = namespace(name);
const handler = this.scanner.getHandler(name); const handler = this.scanner.getHandler(name);
@@ -70,13 +73,9 @@ export class JobExecutor implements OnModuleDestroy {
const signature = `[${name}] (${handler.name})`; const signature = `[${name}] (${handler.name})`;
try { try {
this.logger.debug(`Job started: ${signature}`); this.logger.debug(`Job started: ${signature}`);
const result = await handler.fn(payload); const ret = await handler.fn(payload);
if (result === JOB_SIGNAL.RETRY) {
throw new Error(`Manually job retry`);
}
this.logger.debug(`Job finished: ${signature}`); this.logger.debug(`Job finished: ${signature}`);
return ret;
} catch (e) { } catch (e) {
this.logger.error(`Job failed: ${signature}`, e); this.logger.error(`Job failed: ${signature}`, e);
throw e; throw e;
@@ -94,7 +93,7 @@ export class JobExecutor implements OnModuleDestroy {
const activeJobs = metrics.queue.counter('active_jobs'); const activeJobs = metrics.queue.counter('active_jobs');
activeJobs.add(1, { queue: ns }); activeJobs.add(1, { queue: ns });
try { try {
await fn(); return await fn();
} finally { } finally {
activeJobs.add(-1, { queue: ns }); activeJobs.add(-1, { queue: ns });
} }
@@ -117,7 +116,7 @@ export class JobExecutor implements OnModuleDestroy {
const worker = new Worker( const worker = new Worker(
queue, queue,
async job => { async job => {
await this.run(job.name as JobName, job.data); return await this.run(job.name as JobName, job.data);
}, },
merge( merge(
{}, {},
@@ -135,6 +134,12 @@ export class JobExecutor implements OnModuleDestroy {
this.logger.error(`Queue Worker [${queue}] error`, error); this.logger.error(`Queue Worker [${queue}] error`, error);
}); });
worker.on('completed', (job, result) => {
this.handleJobReturn(job, result).catch(() => {
/* noop */
});
});
this.logger.log( this.logger.log(
`Queue Worker [${queue}] started; concurrency=${concurrency};` `Queue Worker [${queue}] started; concurrency=${concurrency};`
); );
@@ -143,6 +148,16 @@ export class JobExecutor implements OnModuleDestroy {
} }
} }
async handleJobReturn(job: Job, result: JOB_SIGNAL) {
if (result === JOB_SIGNAL.Repeat || result === JOB_SIGNAL.Retry) {
try {
await this.getQueue(job.name).add(job.name, job.data, job.opts);
} catch (e) {
this.logger.error(`Failed to add job [${job.name}]`, e);
}
}
}
private async stopWorkers() { private async stopWorkers() {
await Promise.all( await Promise.all(
Array.from(this.workers.values()).map(async worker => { Array.from(this.workers.values()).map(async worker => {
@@ -150,4 +165,8 @@ export class JobExecutor implements OnModuleDestroy {
}) })
); );
} }
private getQueue(ns: string): Bullmq {
return this.ref.get(getQueueToken(ns), { strict: false });
}
} }

View File

@@ -37,6 +37,12 @@ export class JobQueue {
return undefined; return undefined;
} }
async get<T extends JobName>(jobId: string, jobName: T) {
const ns = namespace(jobName);
const queue = this.getQueue(ns);
return (await queue.getJob(jobId)) as Job<Jobs[T]> | undefined;
}
private getQueue(ns: string): Queue { private getQueue(ns: string): Queue {
return this.moduleRef.get(getQueueToken(ns), { strict: false }); return this.moduleRef.get(getQueueToken(ns), { strict: false });
} }

View File

@@ -2,7 +2,8 @@ import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule'; import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client'; import { PrismaClient } from '@prisma/client';
import { JobQueue, metrics, OnJob } from '../../base'; import { JOB_SIGNAL, JobQueue, metrics, OnJob } from '../../base';
import { Models } from '../../models';
import { PgWorkspaceDocStorageAdapter } from '../doc'; import { PgWorkspaceDocStorageAdapter } from '../doc';
declare global { declare global {
@@ -20,7 +21,8 @@ export class DocServiceCronJob {
constructor( constructor(
private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly prisma: PrismaClient, private readonly prisma: PrismaClient,
private readonly job: JobQueue private readonly job: JobQueue,
private readonly models: Models
) {} ) {}
@OnJob('doc.mergePendingDocUpdates') @OnJob('doc.mergePendingDocUpdates')
@@ -29,30 +31,40 @@ export class DocServiceCronJob {
docId, docId,
}: Jobs['doc.mergePendingDocUpdates']) { }: Jobs['doc.mergePendingDocUpdates']) {
await this.workspace.getDoc(workspaceId, docId); await this.workspace.getDoc(workspaceId, docId);
const updatesLeft = await this.models.doc.getUpdateCount(
workspaceId,
docId
);
return updatesLeft > 100 ? JOB_SIGNAL.Repeat : JOB_SIGNAL.Done;
} }
@Cron(CronExpression.EVERY_10_SECONDS) @Cron(CronExpression.EVERY_30_SECONDS)
async schedule() { async schedule() {
const group = await this.prisma.update.groupBy({ const group = await this.models.doc.groupedUpdatesCount();
by: ['workspaceId', 'id'],
_count: true,
});
for (const update of group) { for (const update of group) {
if (update._count > 100) { const jobId = `doc:merge-pending-updates:${update.workspaceId}:${update.id}`;
await this.job.add(
'doc.mergePendingDocUpdates', const job = await this.job.get(jobId, 'doc.mergePendingDocUpdates');
{
workspaceId: update.workspaceId, if (job && job.opts.priority !== 0 && update._count > 100) {
docId: update.id, // reschedule long pending doc with highest priority, 0 is the highest priority
}, await this.job.remove(jobId, 'doc.mergePendingDocUpdates');
{
jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`,
priority: update._count,
delay: 0,
}
);
} }
await this.job.add(
'doc.mergePendingDocUpdates',
{
workspaceId: update.workspaceId,
docId: update.id,
},
{
jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`,
priority: update._count > 100 ? 0 : 100,
delay: 0,
}
);
} }
} }

View File

@@ -105,6 +105,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
// keep it simple to let all update merged in one job // keep it simple to let all update merged in one job
jobId: `doc:merge-pending-updates:${workspaceId}:${docId}`, jobId: `doc:merge-pending-updates:${workspaceId}:${docId}`,
delay: 30 * 1000 /* 30s */, delay: 30 * 1000 /* 30s */,
priority: 100,
} }
); );
turn++; turn++;

View File

@@ -101,7 +101,7 @@ export class MailJob {
...options, ...options,
}); });
return result === false ? JOB_SIGNAL.RETRY : undefined; return result === false ? JOB_SIGNAL.Retry : undefined;
} }
private async fetchWorkspaceProps(workspaceId: string) { private async fetchWorkspaceProps(workspaceId: string) {

View File

@@ -88,6 +88,13 @@ export class DocModel extends BaseModel {
return await this.db.update.count(); return await this.db.update.count();
} }
async groupedUpdatesCount() {
return await this.db.update.groupBy({
by: ['workspaceId', 'id'],
_count: true,
});
}
/** /**
* Delete updates by workspaceId, docId, and createdAts. * Delete updates by workspaceId, docId, and createdAts.
*/ */