feat(server): job system (#10134)

This commit is contained in:
forehalo
2025-02-18 05:41:56 +00:00
parent f6a86c10fe
commit cb895d4cb0
26 changed files with 1045 additions and 131 deletions

View File

@@ -29,6 +29,7 @@
"@nestjs-cls/transactional": "^2.4.4", "@nestjs-cls/transactional": "^2.4.4",
"@nestjs-cls/transactional-adapter-prisma": "^1.2.7", "@nestjs-cls/transactional-adapter-prisma": "^1.2.7",
"@nestjs/apollo": "^12.2.2", "@nestjs/apollo": "^12.2.2",
"@nestjs/bullmq": "^10.2.3",
"@nestjs/common": "^10.4.15", "@nestjs/common": "^10.4.15",
"@nestjs/core": "^10.4.15", "@nestjs/core": "^10.4.15",
"@nestjs/graphql": "^12.2.2", "@nestjs/graphql": "^12.2.2",
@@ -59,6 +60,7 @@
"@prisma/instrumentation": "^5.22.0", "@prisma/instrumentation": "^5.22.0",
"@react-email/components": "0.0.33", "@react-email/components": "0.0.33",
"@socket.io/redis-adapter": "^8.3.0", "@socket.io/redis-adapter": "^8.3.0",
"bullmq": "^5.40.2",
"cookie-parser": "^1.4.7", "cookie-parser": "^1.4.7",
"dotenv": "^16.4.7", "dotenv": "^16.4.7",
"eventemitter2": "^6.4.9", "eventemitter2": "^6.4.9",

View File

@@ -14,13 +14,8 @@ test.before('start app', async t => {
// @ts-expect-error override // @ts-expect-error override
AFFiNE.flavor = { AFFiNE.flavor = {
type: 'doc', type: 'doc',
allinone: false,
graphql: false,
sync: false,
renderer: false,
doc: true, doc: true,
script: false, } as typeof AFFiNE.flavor;
} satisfies typeof AFFiNE.flavor;
const app = await createTestingApp({ const app = await createTestingApp({
imports: [buildAppModule()], imports: [buildAppModule()],
}); });

View File

@@ -15,13 +15,8 @@ test.before('start app', async t => {
// @ts-expect-error override // @ts-expect-error override
AFFiNE.flavor = { AFFiNE.flavor = {
type: 'graphql', type: 'graphql',
allinone: false,
graphql: true, graphql: true,
sync: false, } as typeof AFFiNE.flavor;
renderer: false,
doc: false,
script: false,
} satisfies typeof AFFiNE.flavor;
const app = await createTestingApp({ const app = await createTestingApp({
imports: [buildAppModule()], imports: [buildAppModule()],
}); });

View File

@@ -14,13 +14,8 @@ test.before('start app', async t => {
// @ts-expect-error override // @ts-expect-error override
AFFiNE.flavor = { AFFiNE.flavor = {
type: 'renderer', type: 'renderer',
allinone: false,
graphql: false,
sync: false,
renderer: true, renderer: true,
doc: false, } as typeof AFFiNE.flavor;
script: false,
} satisfies typeof AFFiNE.flavor;
const app = await createTestingApp({ const app = await createTestingApp({
imports: [buildAppModule()], imports: [buildAppModule()],
}); });

View File

@@ -14,13 +14,8 @@ test.before('start app', async t => {
// @ts-expect-error override // @ts-expect-error override
AFFiNE.flavor = { AFFiNE.flavor = {
type: 'sync', type: 'sync',
allinone: false,
graphql: false,
sync: true, sync: true,
renderer: false, } as typeof AFFiNE.flavor;
doc: false,
script: false,
} satisfies typeof AFFiNE.flavor;
const app = await createTestingApp({ const app = await createTestingApp({
imports: [buildAppModule()], imports: [buildAppModule()],
}); });

View File

@@ -18,6 +18,7 @@ import {
getOptionalModuleMetadata, getOptionalModuleMetadata,
getRequestIdFromHost, getRequestIdFromHost,
getRequestIdFromRequest, getRequestIdFromRequest,
ScannerModule,
} from './base'; } from './base';
import { CacheModule } from './base/cache'; import { CacheModule } from './base/cache';
import { AFFiNEConfig, ConfigModule, mergeConfigOverride } from './base/config'; import { AFFiNEConfig, ConfigModule, mergeConfigOverride } from './base/config';
@@ -25,6 +26,7 @@ import { ErrorModule } from './base/error';
import { EventModule } from './base/event'; import { EventModule } from './base/event';
import { GqlModule } from './base/graphql'; import { GqlModule } from './base/graphql';
import { HelpersModule } from './base/helpers'; import { HelpersModule } from './base/helpers';
import { JobModule } from './base/job';
import { LoggerModule } from './base/logger'; import { LoggerModule } from './base/logger';
import { MailModule } from './base/mailer'; import { MailModule } from './base/mailer';
import { MetricsModule } from './base/metrics'; import { MetricsModule } from './base/metrics';
@@ -89,6 +91,7 @@ export const FunctionalityModules = [
}), }),
ConfigModule.forRoot(), ConfigModule.forRoot(),
RuntimeModule, RuntimeModule,
ScannerModule,
EventModule, EventModule,
RedisModule, RedisModule,
CacheModule, CacheModule,
@@ -102,6 +105,7 @@ export const FunctionalityModules = [
ErrorModule, ErrorModule,
LoggerModule, LoggerModule,
WebSocketModule, WebSocketModule,
JobModule.forRoot(),
]; ];
function filterOptionalModule( function filterOptionalModule(

View File

@@ -1,3 +1,7 @@
import { OnOptions } from 'eventemitter2';
import { PushMetadata, sliceMetadata } from '../nestjs';
declare global { declare global {
/** /**
* Event definitions can be extended by * Event definitions can be extended by
@@ -16,3 +20,29 @@ declare global {
} }
export type EventName = keyof Events; export type EventName = keyof Events;
export const EVENT_LISTENER_METADATA = Symbol('event_listener');
interface EventHandlerMetadata {
namespace: string;
event: EventName;
opts?: OnOptions;
}
export interface EventOptions extends OnOptions {
prepend?: boolean;
name?: string;
suppressError?: boolean;
}
export const OnEvent = (event: EventName, opts?: EventOptions) => {
const namespace = event.split('.')[0];
return PushMetadata<EventHandlerMetadata>(EVENT_LISTENER_METADATA, {
namespace,
event,
opts,
});
};
export function getEventHandlerMetadata(target: any): EventHandlerMetadata[] {
return sliceMetadata<EventHandlerMetadata>(EVENT_LISTENER_METADATA, target);
}

View File

@@ -4,42 +4,20 @@ import {
OnApplicationBootstrap, OnApplicationBootstrap,
OnModuleInit, OnModuleInit,
} from '@nestjs/common'; } from '@nestjs/common';
import { DiscoveryService, MetadataScanner } from '@nestjs/core';
import { import {
OnGatewayConnection, OnGatewayConnection,
WebSocketGateway, WebSocketGateway,
WebSocketServer, WebSocketServer,
} from '@nestjs/websockets'; } from '@nestjs/websockets';
import EventEmitter2, { type OnOptions } from 'eventemitter2'; import EventEmitter2 from 'eventemitter2';
import { once } from 'lodash-es';
import { CLS_ID, ClsService, ClsServiceManager } from 'nestjs-cls'; import { CLS_ID, ClsService, ClsServiceManager } from 'nestjs-cls';
import type { Server, Socket } from 'socket.io'; import type { Server, Socket } from 'socket.io';
import { wrapCallMetric } from '../metrics'; import { wrapCallMetric } from '../metrics';
import { PushMetadata, sliceMetadata } from '../nestjs';
import { genRequestId } from '../utils'; import { genRequestId } from '../utils';
import type { EventName } from './def'; import { type EventName, type EventOptions } from './def';
import { EventHandlerScanner } from './scanner';
const EVENT_LISTENER_METADATA = Symbol('event_listener');
interface EventHandlerMetadata {
namespace: string;
event: EventName;
opts?: OnOptions;
}
interface EventOptions extends OnOptions {
prepend?: boolean;
name?: string;
suppressError?: boolean;
}
export const OnEvent = (event: EventName, opts?: EventOptions) => {
const namespace = event.split('.')[0];
return PushMetadata<EventHandlerMetadata>(EVENT_LISTENER_METADATA, {
namespace,
event,
opts,
});
};
/** /**
* We use socket.io system to auto pub/sub on server to server broadcast events * We use socket.io system to auto pub/sub on server to server broadcast events
@@ -59,8 +37,7 @@ export class EventBus
constructor( constructor(
private readonly emitter: EventEmitter2, private readonly emitter: EventEmitter2,
private readonly cls: ClsService, private readonly cls: ClsService,
private readonly discovery: DiscoveryService, private readonly scanner: EventHandlerScanner
private readonly scanner: MetadataScanner
) {} ) {}
handleConnection(client: Socket) { handleConnection(client: Socket) {
@@ -119,13 +96,14 @@ export class EventBus
) { ) {
const namespace = event.split('.')[0]; const namespace = event.split('.')[0];
const { name, prepend, suppressError } = opts; const { name, prepend, suppressError } = opts;
let signature = name ?? listener.name ?? 'anonymous fn'; const handlerName = name ?? listener.name ?? 'anonymous fn';
let signature = `[${event}] (${handlerName})`;
const add = prepend ? this.emitter.prependListener : this.emitter.on; const add = prepend ? this.emitter.prependListener : this.emitter.on;
const handler = wrapCallMetric( const handler = wrapCallMetric(
async (payload: any) => { async (payload: any) => {
this.logger.verbose(`Handle event [${event}] (${signature})`); this.logger.verbose(`Handle event ${signature}`);
const cls = ClsServiceManager.getClsService(); const cls = ClsServiceManager.getClsService();
return await cls.run({ ifNested: 'reuse' }, async () => { return await cls.run({ ifNested: 'reuse' }, async () => {
@@ -138,7 +116,7 @@ export class EventBus
} catch (e) { } catch (e) {
if (suppressError) { if (suppressError) {
this.logger.error( this.logger.error(
`Error happened when handling event [${event}] (${signature})`, `Error happened when handling event ${signature}`,
e e
); );
} else { } else {
@@ -152,15 +130,13 @@ export class EventBus
{ {
event, event,
namespace, namespace,
handler: signature, handler: handlerName,
} }
); );
add.call(this.emitter, event, handler as any, opts); add.call(this.emitter, event, handler as any, opts);
this.logger.verbose( this.logger.verbose(`Event handler registered ${signature}`);
`Event handler for [${event}] registered ${name ? `in [${name}]` : ''}`
);
return () => { return () => {
this.emitter.off(event, handler as any); this.emitter.off(event, handler as any);
@@ -171,60 +147,9 @@ export class EventBus
return this.emitter.waitFor(name, timeout); return this.emitter.waitFor(name, timeout);
} }
private bindEventHandlers() { private readonly bindEventHandlers = once(() => {
// make sure all our job handlers defined in [Providers] to make the code organization clean. this.scanner.scan().forEach(({ event, handler, opts }) => {
// const providers = [...this.discovery.getProviders(), this.discovery.getControllers()] this.on(event, handler, opts);
const providers = this.discovery.getProviders();
providers.forEach(wrapper => {
const { instance, name } = wrapper;
if (!instance || wrapper.isAlias) {
return;
}
const proto = Object.getPrototypeOf(instance);
const methods = this.scanner.getAllMethodNames(proto);
methods.forEach(method => {
const fn = instance[method];
let defs = sliceMetadata<EventHandlerMetadata>(
EVENT_LISTENER_METADATA,
fn
);
if (defs.length === 0) {
return;
}
const signature = `${name}.${method}`;
if (typeof fn !== 'function') {
throw new Error(`Event handler [${signature}] is not a function.`);
}
if (!wrapper.isDependencyTreeStatic()) {
throw new Error(
`Provider [${name}] could not be RequestScoped or TransientScoped injectable if it contains event handlers.`
);
}
defs.forEach(({ event, opts }) => {
this.on(
event,
(payload: any) => {
// NOTE(@forehalo):
// we might create spies on the event handlers when testing,
// avoid reusing `fn` variable to fail the spies or stubs
return instance[method](payload);
},
{
...opts,
name: signature,
}
);
});
});
}); });
} });
} }

View File

@@ -1,20 +1,20 @@
import { Global, Module } from '@nestjs/common'; import { Global, Module } from '@nestjs/common';
import { DiscoveryModule } from '@nestjs/core';
import EventEmitter2 from 'eventemitter2'; import EventEmitter2 from 'eventemitter2';
import { EventBus, OnEvent } from './eventbus'; import { EventBus } from './eventbus';
import { EventHandlerScanner } from './scanner';
const EmitProvider = { const EmitProvider = {
provide: EventEmitter2, provide: EventEmitter2,
useValue: new EventEmitter2(), useFactory: () => new EventEmitter2(),
}; };
@Global() @Global()
@Module({ @Module({
imports: [DiscoveryModule], providers: [EventBus, EventHandlerScanner, EmitProvider],
providers: [EventBus, EmitProvider],
exports: [EventBus], exports: [EventBus],
}) })
export class EventModule {} export class EventModule {}
export { EventBus, OnEvent }; export { EventBus };
export { OnEvent } from './def';

View File

@@ -0,0 +1,71 @@
import { Injectable } from '@nestjs/common';
import { once } from 'lodash-es';
import { ModuleScanner } from '../nestjs';
import {
type EventName,
type EventOptions,
getEventHandlerMetadata,
} from './def';
@Injectable()
export class EventHandlerScanner {
constructor(private readonly scanner: ModuleScanner) {}
scan = once(() => {
const handlers: Array<{
event: EventName;
handler: (payload: any) => any;
opts?: EventOptions;
}> = [];
const providers = this.scanner.getAtInjectables();
providers.forEach(wrapper => {
const { instance, name } = wrapper;
if (!instance || wrapper.isAlias) {
return;
}
const methods = this.scanner.getAllMethodNames(instance);
methods.forEach(method => {
const fn = instance[method];
let defs = getEventHandlerMetadata(instance[method]);
if (defs.length === 0) {
return;
}
const signature = `${name}.${method}`;
if (typeof fn !== 'function') {
throw new Error(`Event handler [${signature}] is not a function.`);
}
if (!wrapper.isDependencyTreeStatic()) {
throw new Error(
`Provider [${name}] could not be RequestScoped or TransientScoped injectable if it contains event handlers.`
);
}
defs.forEach(({ event, opts }) => {
handlers.push({
event,
handler: (payload: any) => {
// NOTE(@forehalo):
// we might create spies on the event handlers when testing,
// avoid reusing `fn` variable to fail the spies or stubs
return instance[method].bind(instance)(payload);
},
opts: {
name: signature,
...opts,
},
});
});
});
});
return handlers;
});
}

View File

@@ -24,6 +24,7 @@ export {
} from './graphql'; } from './graphql';
export * from './guard'; export * from './guard';
export { CryptoHelper, URLHelper } from './helpers'; export { CryptoHelper, URLHelper } from './helpers';
export * from './job';
export { AFFiNELogger } from './logger'; export { AFFiNELogger } from './logger';
export { MailService } from './mailer'; export { MailService } from './mailer';
export { CallMetric, metrics } from './metrics'; export { CallMetric, metrics } from './metrics';

View File

@@ -0,0 +1 @@
export { JobModule, JobQueue, OnJob } from './queue';

View File

@@ -0,0 +1,243 @@
import { Injectable } from '@nestjs/common';
import { TestingModule } from '@nestjs/testing';
import test from 'ava';
import { CLS_ID, ClsServiceManager } from 'nestjs-cls';
import Sinon from 'sinon';
import { createTestingModule } from '../../../../__tests__/utils';
import { ConfigModule } from '../../../config';
import { metrics } from '../../../metrics';
import { genRequestId } from '../../../utils';
import { JobModule, JobQueue, OnJob } from '..';
import { JobExecutor } from '../executor';
import { JobHandlerScanner } from '../scanner';
let module: TestingModule;
let queue: JobQueue;
let executor: JobExecutor;
declare global {
interface Jobs {
'nightly.__test__job': {
name: string;
};
'nightly.__test__job2': {
name: string;
};
'nightly.__test__throw': any;
'nightly.__test__requestId': any;
}
}
@Injectable()
class JobHandlers {
@OnJob('nightly.__test__job')
@OnJob('nightly.__test__job2')
async handleJob(job: Jobs['nightly.__test__job']) {
return job.name;
}
@OnJob('nightly.__test__throw')
async throwJob() {
throw new Error('Throw in job handler');
}
@OnJob('nightly.__test__requestId')
onRequestId() {
const cls = ClsServiceManager.getClsService();
return cls.getId() ?? genRequestId('job');
}
}
test.before(async () => {
module = await createTestingModule({
imports: [
ConfigModule.forRoot({
job: {
worker: {
// NOTE(@forehalo):
// bullmq will hold the connection to check stalled jobs,
// which will keep the test process alive to timeout.
stalledInterval: 100,
},
},
}),
JobModule.forRoot(),
],
providers: [JobHandlers],
});
queue = module.get(JobQueue);
executor = module.get(JobExecutor);
});
test.afterEach(async () => {
// @ts-expect-error private api
const inner = queue.getQueue('nightly');
await inner.obliterate({ force: true });
inner.resume();
});
test.after.always(async () => {
await module.close();
});
// #region scanner
test('should register job handler', async t => {
const scanner = module.get(JobHandlerScanner);
const handler = scanner.getHandler('nightly.__test__job');
t.is(handler!.name, 'JobHandlers.handleJob');
t.is(typeof handler!.fn, 'function');
const result = await handler!.fn({ name: 'test' });
t.is(result, 'test');
});
// #endregion
// #region queue
test('should add job to queue', async t => {
const job = await queue.add('nightly.__test__job', { name: 'test' });
// @ts-expect-error private api
const innerQueue = queue.getQueue('nightly');
const queuedJob = await innerQueue.getJob(job.id!);
t.is(queuedJob.name, job.name);
});
test('should remove job from queue', async t => {
const job = await queue.add('nightly.__test__job', { name: 'test' });
// @ts-expect-error private api
const innerQueue = queue.getQueue('nightly');
const data = await queue.remove(job.id!, job.name as JobName);
t.deepEqual(data, { name: 'test' });
const nullData = await queue.remove(job.id!, job.name as JobName);
const nullJob = await innerQueue.getJob(job.id!);
t.is(nullData, undefined);
t.is(nullJob, undefined);
});
// #endregion
// #region executor
test('should start workers', async t => {
// @ts-expect-error private api
const worker = executor.workers['nightly'];
t.truthy(worker);
t.true(worker.isRunning());
});
test('should dispatch job handler', async t => {
const handlers = module.get(JobHandlers);
const spy = Sinon.spy(handlers, 'handleJob');
await executor.run('nightly.__test__job', { name: 'test executor' });
t.true(spy.calledOnceWithExactly({ name: 'test executor' }));
});
test('should be able to record job metrics', async t => {
const counterStub = Sinon.stub(metrics.job.counter('function_calls'), 'add');
const timerStub = Sinon.stub(
metrics.job.histogram('function_timer'),
'record'
);
await executor.run('nightly.__test__job', { name: 'test executor' });
t.deepEqual(counterStub.firstCall.args[1], {
name: 'job_handler',
job: 'nightly.__test__job',
namespace: 'nightly',
handler: 'JobHandlers.handleJob',
error: false,
});
t.deepEqual(timerStub.firstCall.args[1], {
name: 'job_handler',
job: 'nightly.__test__job',
namespace: 'nightly',
handler: 'JobHandlers.handleJob',
error: false,
});
counterStub.reset();
timerStub.reset();
await executor.run('nightly.__test__job2', { name: 'test executor' });
t.deepEqual(counterStub.firstCall.args[1], {
name: 'job_handler',
job: 'nightly.__test__job2',
namespace: 'nightly',
handler: 'JobHandlers.handleJob',
error: false,
});
t.deepEqual(timerStub.firstCall.args[1], {
name: 'job_handler',
job: 'nightly.__test__job2',
namespace: 'nightly',
handler: 'JobHandlers.handleJob',
error: false,
});
counterStub.reset();
timerStub.reset();
await t.throwsAsync(
executor.run('nightly.__test__throw', { name: 'test executor' }),
{
message: 'Throw in job handler',
}
);
t.deepEqual(counterStub.firstCall.args[1], {
name: 'job_handler',
job: 'nightly.__test__throw',
namespace: 'nightly',
handler: 'JobHandlers.throwJob',
error: true,
});
t.deepEqual(timerStub.firstCall.args[1], {
name: 'job_handler',
job: 'nightly.__test__throw',
namespace: 'nightly',
handler: 'JobHandlers.throwJob',
error: true,
});
});
test('should generate request id', async t => {
const handlers = module.get(JobHandlers);
const spy = Sinon.spy(handlers, 'onRequestId');
await executor.run('nightly.__test__requestId', {});
t.true(spy.returnValues.some(v => v.includes(':job/')));
spy.restore();
});
test('should continuously use request id', async t => {
const handlers = module.get(JobHandlers);
const spy = Sinon.spy(handlers, 'onRequestId');
const cls = ClsServiceManager.getClsService();
await cls.run(async () => {
cls.set(CLS_ID, 'test-request-id');
await executor.run('nightly.__test__requestId', {});
});
t.true(spy.returned('test-request-id'));
spy.restore();
});
// #endregion

View File

@@ -0,0 +1,53 @@
import { QueueOptions, WorkerOptions } from 'bullmq';
import {
defineRuntimeConfig,
defineStartupConfig,
ModuleConfig,
} from '../../config';
import { Queue } from './def';
declare module '../../config' {
interface AppConfig {
job: ModuleConfig<
{
queue: Omit<QueueOptions, 'connection'>;
worker: Omit<WorkerOptions, 'connection'>;
},
{
queues: {
[key in Queue]: {
concurrency: number;
};
};
}
>;
}
}
defineStartupConfig('job', {
queue: {
prefix: 'affine_job',
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
removeOnFail: false,
},
},
worker: {},
});
defineRuntimeConfig('job', {
'queues.nightly.concurrency': {
default: 1,
desc: 'Concurrency of worker consuming of nightly checking job queue',
},
'queues.notification.concurrency': {
default: 10,
desc: 'Concurrency of worker consuming of notification job queue',
},
'queues.doc.concurrency': {
default: 1,
desc: 'Concurrency of worker consuming of doc job queue',
},
});

View File

@@ -0,0 +1,64 @@
import { join } from 'node:path';
import { PushMetadata, sliceMetadata } from '../../nestjs';
declare global {
/**
* Job definitions can be extended by
*
* @example
*
* declare global {
* interface Jobs {
* 'nightly.deleteExpiredUserSessions': {}
* ^^^^^^^ first segment must be namespace and a standalone queue will be created for each namespace
* }
* }
*/
interface Jobs {}
type JobName = keyof Jobs;
}
export const JOB_METADATA = Symbol('JOB');
export enum Queue {
NIGHTLY_JOB = 'nightly',
NOTIFICATION = 'notification',
DOC = 'doc',
}
export const QUEUES = Object.values(Queue);
export function namespace(job: JobName) {
const parts = job.split('.');
// no namespace
if (parts.length === 1) {
throw new Error(
`Job name must contain at least one namespace like [namespace].[job], get [${job}].`
);
}
return parts[0];
}
export const OnJob = (job: JobName) => {
const ns = namespace(job);
if (!QUEUES.includes(ns as Queue)) {
throw new Error(
`Invalid job queue: ${ns}, must be one of [${QUEUES.join(', ')}].
If you want to introduce new job queue, please modify the Queue enum first in ${join(AFFiNE.projectRoot, 'src/base/job/queue/def.ts')}`
);
}
if (job === ns) {
throw new Error("The job name must not be the same as it's namespace.");
}
return PushMetadata(JOB_METADATA, job);
};
export function getJobHandlerMetadata(target: any): JobName[] {
return sliceMetadata<JobName>(JOB_METADATA, target);
}

View File

@@ -0,0 +1,149 @@
import {
Injectable,
Logger,
OnApplicationBootstrap,
OnApplicationShutdown,
} from '@nestjs/common';
import { Worker } from 'bullmq';
import { difference } from 'lodash-es';
import { CLS_ID, ClsServiceManager } from 'nestjs-cls';
import { Config } from '../../config';
import { metrics, wrapCallMetric } from '../../metrics';
import { QueueRedis } from '../../redis';
import { Runtime } from '../../runtime';
import { genRequestId } from '../../utils';
import { namespace, Queue, QUEUES } from './def';
import { JobHandlerScanner } from './scanner';
@Injectable()
export class JobExecutor
implements OnApplicationBootstrap, OnApplicationShutdown
{
private readonly logger = new Logger('job');
private readonly workers: Record<string, Worker> = {};
constructor(
private readonly config: Config,
private readonly redis: QueueRedis,
private readonly scanner: JobHandlerScanner,
private readonly runtime: Runtime
) {}
async onApplicationBootstrap() {
const queues = this.config.flavor.graphql
? difference(QUEUES, [Queue.DOC])
: [];
// NOTE(@forehalo): only enable doc queue in doc service
if (this.config.flavor.doc) {
queues.push(Queue.DOC);
}
await this.startWorkers(queues);
}
async onApplicationShutdown() {
await this.stopWorkers();
}
async run(name: JobName, payload: any) {
const ns = namespace(name);
const handler = this.scanner.getHandler(name);
if (!handler) {
this.logger.warn(`Job handler for [${name}] not found.`);
return;
}
const fn = wrapCallMetric(
async () => {
const cls = ClsServiceManager.getClsService();
await cls.run({ ifNested: 'reuse' }, async () => {
const requestId = cls.getId();
if (!requestId) {
cls.set(CLS_ID, genRequestId('job'));
}
const signature = `[${name}] (${handler.name})`;
try {
this.logger.debug(`Job started: ${signature}`);
const result = await handler.fn(payload);
this.logger.debug(`Job finished: ${signature}`);
return result;
} catch (e) {
this.logger.error(`Job failed: ${signature}`, e);
throw e;
}
});
},
'job',
'job_handler',
{
job: name,
namespace: ns,
handler: handler.name,
}
);
const activeJobs = metrics.job.gauge('queue_active_jobs');
activeJobs.record(1, { queue: ns });
try {
return await fn();
} finally {
activeJobs.record(-1, { queue: ns });
}
}
private async startWorkers(queues: Queue[]) {
const configs =
(await this.runtime.fetchAll(
queues.reduce(
(ret, queue) => {
ret[`job/queues.${queue}.concurrency`] = true;
return ret;
},
{} as {
[key in `job/queues.${Queue}.concurrency`]: true;
}
)
// TODO(@forehalo): fix the override by [payment/service.spec.ts]
)) ?? {};
for (const queue of queues) {
const concurrency =
(configs[`job/queues.${queue}.concurrency`] as number) ??
this.config.job.worker.concurrency ??
1;
const worker = new Worker(
queue,
async job => {
await this.run(job.name as JobName, job.data);
},
{
...this.config.job.worker,
connection: this.redis,
concurrency,
}
);
worker.on('error', error => {
this.logger.error(`Queue Worker [${queue}] error`, error);
});
this.logger.log(
`Queue Worker [${queue}] started; concurrency=${concurrency};`
);
this.workers[queue] = worker;
}
}
private async stopWorkers() {
await Promise.all(
Object.values(this.workers).map(async worker => {
await worker.close(true);
})
);
}
}

View File

@@ -0,0 +1,37 @@
import './config';
import { BullModule } from '@nestjs/bullmq';
import { DynamicModule } from '@nestjs/common';
import { Config } from '../../config';
import { QueueRedis } from '../../redis';
import { QUEUES } from './def';
import { JobExecutor } from './executor';
import { JobQueue } from './queue';
import { JobHandlerScanner } from './scanner';
export class JobModule {
static forRoot(): DynamicModule {
return {
global: true,
module: JobModule,
imports: [
BullModule.forRootAsync({
useFactory: (config: Config, redis: QueueRedis) => {
return {
...config.job.queue,
connection: redis,
};
},
inject: [Config, QueueRedis],
}),
BullModule.registerQueue(...QUEUES.map(name => ({ name }))),
],
providers: [JobQueue, JobExecutor, JobHandlerScanner],
exports: [JobQueue],
};
}
}
export { JobQueue };
export { OnJob } from './def';

View File

@@ -0,0 +1,43 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { Job, JobsOptions, Queue } from 'bullmq';
import { namespace } from './def';
@Injectable()
export class JobQueue {
private readonly logger = new Logger(JobQueue.name);
constructor(private readonly moduleRef: ModuleRef) {}
async add<T extends JobName>(name: T, payload: Jobs[T], opts?: JobsOptions) {
const ns = namespace(name);
const queue = this.getQueue(ns);
const job = await queue.add(name, payload, opts);
this.logger.debug(`Job [${name}] added; id=${job.id}`);
return job;
}
async remove<T extends JobName>(jobId: string, jobName: T) {
const ns = namespace(jobName);
const queue = this.getQueue(ns);
const job = (await queue.getJob(jobId)) as Job<Jobs[T]> | undefined;
if (!job) {
return;
}
const removed = await queue.remove(jobId);
if (removed) {
this.logger.log(`Job ${jobName} removed from queue ${ns}`);
return job.data;
}
return undefined;
}
private getQueue(ns: string): Queue {
return this.moduleRef.get(getQueueToken(ns), { strict: false });
}
}

View File

@@ -0,0 +1,77 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ModuleScanner } from '../../nestjs';
import { getJobHandlerMetadata } from './def';
interface JobHandler {
name: string;
fn: (payload: any) => any;
}
@Injectable()
export class JobHandlerScanner implements OnModuleInit {
private readonly handlers: Record<string, JobHandler> = {};
constructor(private readonly scanner: ModuleScanner) {}
async onModuleInit() {
this.scan();
}
getHandler(jobName: JobName): JobHandler | undefined {
return this.handlers[jobName];
}
private scan() {
const providers = this.scanner.getAtInjectables();
providers.forEach(wrapper => {
const { instance, name } = wrapper;
if (!instance || wrapper.isAlias) {
return;
}
const methods = this.scanner.getAllMethodNames(instance);
methods.forEach(method => {
const fn = instance[method];
let jobNames = getJobHandlerMetadata(instance[method]);
if (jobNames.length === 0) {
return;
}
const signature = `${name}.${method}`;
if (typeof fn !== 'function') {
throw new Error(`Job handler [${signature}] is not a function.`);
}
if (!wrapper.isDependencyTreeStatic()) {
throw new Error(
`Provider [${name}] could not be RequestScoped or TransientScoped injectable if it contains job handlers.`
);
}
jobNames.forEach(jobName => {
if (this.handlers[jobName]) {
throw new Error(
`Job handler ${jobName} already defined in [${this.handlers[jobName].name}].`
);
}
this.handlers[jobName] = {
name: signature,
fn: (payload: any) => {
// NOTE(@forehalo):
// we might create spies on the job handlers when testing,
// avoid reusing `fn` variable to fail the spies or stubs
return instance[method].bind(instance)(payload);
},
};
});
});
});
}
}

View File

@@ -38,7 +38,8 @@ export type KnownMetricScopes =
| 'sse' | 'sse'
| 'mail' | 'mail'
| 'ai' | 'ai'
| 'event'; | 'event'
| 'job';
const metricCreators: MetricCreators = { const metricCreators: MetricCreators = {
counter(meter: Meter, name: string, opts?: MetricOptions) { counter(meter: Meter, name: string, opts?: MetricOptions) {

View File

@@ -2,3 +2,4 @@ import './config';
export * from './decorator'; export * from './decorator';
export * from './exception'; export * from './exception';
export * from './optional-module'; export * from './optional-module';
export * from './scanner';

View File

@@ -0,0 +1,60 @@
import { Global, Injectable, Module } from '@nestjs/common';
import {
DiscoveryModule,
DiscoveryService,
MetadataScanner,
} from '@nestjs/core';
import { RESOLVER_TYPE_METADATA } from '@nestjs/graphql';
@Injectable()
export class ModuleScanner {
constructor(
private readonly discovery: DiscoveryService,
private readonly scanner: MetadataScanner
) {}
getClassProviders() {
return this.discovery
.getProviders()
.filter(
wrapper =>
wrapper.instance && !wrapper.isAlias && !wrapper.isNotMetatype
);
}
getAtInjectables() {
return this.getClassProviders().filter(
wrapper => !this.isResolver(wrapper.instance)
);
}
getControllers() {
return this.discovery.getControllers();
}
getResolvers() {
return this.getClassProviders().filter(wrapper =>
this.isResolver(wrapper.instance)
);
}
getAllMethodNames(instance: any) {
return this.scanner.getAllMethodNames(Object.getPrototypeOf(instance));
}
isResolver(instance: any) {
if (typeof instance !== 'object') {
return false;
}
const metadata = Reflect.getMetadata(RESOLVER_TYPE_METADATA, instance);
return metadata !== undefined;
}
}
@Global()
@Module({
imports: [DiscoveryModule],
providers: [ModuleScanner],
exports: [ModuleScanner],
})
export class ScannerModule {}

View File

@@ -2,13 +2,18 @@ import './config';
import { Global, Module } from '@nestjs/common'; import { Global, Module } from '@nestjs/common';
import { CacheRedis, SessionRedis, SocketIoRedis } from './instances'; import {
CacheRedis,
QueueRedis,
SessionRedis,
SocketIoRedis,
} from './instances';
@Global() @Global()
@Module({ @Module({
providers: [CacheRedis, SessionRedis, SocketIoRedis], providers: [CacheRedis, SessionRedis, SocketIoRedis, QueueRedis],
exports: [CacheRedis, SessionRedis, SocketIoRedis], exports: [CacheRedis, SessionRedis, SocketIoRedis, QueueRedis],
}) })
export class RedisModule {} export class RedisModule {}
export { CacheRedis, SessionRedis, SocketIoRedis }; export { CacheRedis, QueueRedis, SessionRedis, SocketIoRedis };

View File

@@ -31,6 +31,16 @@ class Redis extends IORedis implements OnModuleInit, OnModuleDestroy {
client.on('error', this.errorHandler); client.on('error', this.errorHandler);
return client; return client;
} }
assertValidDBIndex(db: number) {
if (db && db > 15) {
throw new Error(
// Redis allows [0..16) by default
// we separate the db for different usages by `this.options.db + [0..4]`
`Invalid database index: ${db}, must be between 0 and 11`
);
}
}
} }
@Injectable() @Injectable()
@@ -53,3 +63,15 @@ export class SocketIoRedis extends Redis {
super({ ...config.redis, db: (config.redis.db ?? 0) + 3 }); super({ ...config.redis, db: (config.redis.db ?? 0) + 3 });
} }
} }
@Injectable()
export class QueueRedis extends Redis {
constructor(config: Config) {
super({
...config.redis,
db: (config.redis.db ?? 0) + 4,
// required explicitly set to `null` by bullmq
maxRetriesPerRequest: null,
});
}
}

View File

@@ -2,7 +2,6 @@
import './prelude'; import './prelude';
import { Logger } from '@nestjs/common'; import { Logger } from '@nestjs/common';
import { omit } from 'lodash-es';
import { createApp } from './app'; import { createApp } from './app';
import { URLHelper } from './base'; import { URLHelper } from './base';
@@ -15,9 +14,5 @@ const url = app.get(URLHelper);
const logger = new Logger('App'); const logger = new Logger('App');
logger.log(`AFFiNE Server is running in [${AFFiNE.type}] mode`); logger.log(`AFFiNE Server is running in [${AFFiNE.type}] mode`);
if (AFFiNE.node.dev) {
logger.log('Startup Configuration:');
logger.log(omit(globalThis.AFFiNE, 'ENV_MAP'));
}
logger.log(`Listening on http://${listeningHost}:${AFFiNE.server.port}`); logger.log(`Listening on http://${listeningHost}:${AFFiNE.server.port}`);
logger.log(`And the public server should be recognized as ${url.home}`); logger.log(`And the public server should be recognized as ${url.home}`);

152
yarn.lock
View File

@@ -785,6 +785,7 @@ __metadata:
"@nestjs-cls/transactional": "npm:^2.4.4" "@nestjs-cls/transactional": "npm:^2.4.4"
"@nestjs-cls/transactional-adapter-prisma": "npm:^1.2.7" "@nestjs-cls/transactional-adapter-prisma": "npm:^1.2.7"
"@nestjs/apollo": "npm:^12.2.2" "@nestjs/apollo": "npm:^12.2.2"
"@nestjs/bullmq": "npm:^10.2.3"
"@nestjs/common": "npm:^10.4.15" "@nestjs/common": "npm:^10.4.15"
"@nestjs/core": "npm:^10.4.15" "@nestjs/core": "npm:^10.4.15"
"@nestjs/graphql": "npm:^12.2.2" "@nestjs/graphql": "npm:^12.2.2"
@@ -830,6 +831,7 @@ __metadata:
"@types/sinon": "npm:^17.0.3" "@types/sinon": "npm:^17.0.3"
"@types/supertest": "npm:^6.0.2" "@types/supertest": "npm:^6.0.2"
ava: "npm:^6.2.0" ava: "npm:^6.2.0"
bullmq: "npm:^5.40.2"
c8: "npm:^10.1.3" c8: "npm:^10.1.3"
cookie-parser: "npm:^1.4.7" cookie-parser: "npm:^1.4.7"
cross-env: "npm:^7.0.3" cross-env: "npm:^7.0.3"
@@ -7784,6 +7786,48 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"@msgpackr-extract/msgpackr-extract-darwin-arm64@npm:3.0.3":
version: 3.0.3
resolution: "@msgpackr-extract/msgpackr-extract-darwin-arm64@npm:3.0.3"
conditions: os=darwin & cpu=arm64
languageName: node
linkType: hard
"@msgpackr-extract/msgpackr-extract-darwin-x64@npm:3.0.3":
version: 3.0.3
resolution: "@msgpackr-extract/msgpackr-extract-darwin-x64@npm:3.0.3"
conditions: os=darwin & cpu=x64
languageName: node
linkType: hard
"@msgpackr-extract/msgpackr-extract-linux-arm64@npm:3.0.3":
version: 3.0.3
resolution: "@msgpackr-extract/msgpackr-extract-linux-arm64@npm:3.0.3"
conditions: os=linux & cpu=arm64
languageName: node
linkType: hard
"@msgpackr-extract/msgpackr-extract-linux-arm@npm:3.0.3":
version: 3.0.3
resolution: "@msgpackr-extract/msgpackr-extract-linux-arm@npm:3.0.3"
conditions: os=linux & cpu=arm
languageName: node
linkType: hard
"@msgpackr-extract/msgpackr-extract-linux-x64@npm:3.0.3":
version: 3.0.3
resolution: "@msgpackr-extract/msgpackr-extract-linux-x64@npm:3.0.3"
conditions: os=linux & cpu=x64
languageName: node
linkType: hard
"@msgpackr-extract/msgpackr-extract-win32-x64@npm:3.0.3":
version: 3.0.3
resolution: "@msgpackr-extract/msgpackr-extract-win32-x64@npm:3.0.3"
conditions: os=win32 & cpu=x64
languageName: node
linkType: hard
"@mswjs/interceptors@npm:^0.37.0": "@mswjs/interceptors@npm:^0.37.0":
version: 0.37.6 version: 0.37.6
resolution: "@mswjs/interceptors@npm:0.37.6" resolution: "@mswjs/interceptors@npm:0.37.6"
@@ -8814,6 +8858,32 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"@nestjs/bull-shared@npm:^10.2.3":
version: 10.2.3
resolution: "@nestjs/bull-shared@npm:10.2.3"
dependencies:
tslib: "npm:2.8.1"
peerDependencies:
"@nestjs/common": ^8.0.0 || ^9.0.0 || ^10.0.0
"@nestjs/core": ^8.0.0 || ^9.0.0 || ^10.0.0
checksum: 10/bbd69f6eae80b4e356682f4c33b66cc1a07d85b182d1bcc80f942ec7dc7eff4613d5d64a33f7dc0dc1959079fb0195983e840aea0bf3cea69e3bf757bd20d302
languageName: node
linkType: hard
"@nestjs/bullmq@npm:^10.2.3":
version: 10.2.3
resolution: "@nestjs/bullmq@npm:10.2.3"
dependencies:
"@nestjs/bull-shared": "npm:^10.2.3"
tslib: "npm:2.8.1"
peerDependencies:
"@nestjs/common": ^8.0.0 || ^9.0.0 || ^10.0.0
"@nestjs/core": ^8.0.0 || ^9.0.0 || ^10.0.0
bullmq: ^3.0.0 || ^4.0.0 || ^5.0.0
checksum: 10/b1fd4cc1adc6189720c9ce15848e43f85a926b39b4bb4293912ffd0ae8f94d5af23cee69fbfa936b174d27dbfcc032e9390365815f387f524c1de7de596e708a
languageName: node
linkType: hard
"@nestjs/common@npm:^10.4.15": "@nestjs/common@npm:^10.4.15":
version: 10.4.15 version: 10.4.15
resolution: "@nestjs/common@npm:10.4.15" resolution: "@nestjs/common@npm:10.4.15"
@@ -17619,6 +17689,21 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"bullmq@npm:^5.40.2":
version: 5.40.2
resolution: "bullmq@npm:5.40.2"
dependencies:
cron-parser: "npm:^4.9.0"
ioredis: "npm:^5.4.1"
msgpackr: "npm:^1.11.2"
node-abort-controller: "npm:^3.1.1"
semver: "npm:^7.5.4"
tslib: "npm:^2.0.0"
uuid: "npm:^9.0.0"
checksum: 10/b3362252450f0d10269448a3295e9c4d5495a510c0a156a8cad6ef4ab88347dc8ee4406f0f39a2deb2ca407d6c0d2f90beb60398b5c8b5089c60c59106878509
languageName: node
linkType: hard
"bundle-name@npm:^4.1.0": "bundle-name@npm:^4.1.0":
version: 4.1.0 version: 4.1.0
resolution: "bundle-name@npm:4.1.0" resolution: "bundle-name@npm:4.1.0"
@@ -19119,6 +19204,15 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"cron-parser@npm:^4.9.0":
version: 4.9.0
resolution: "cron-parser@npm:4.9.0"
dependencies:
luxon: "npm:^3.2.1"
checksum: 10/ffca5e532a5ee0923412ee6e4c7f9bbceacc6ddf8810c16d3e9fb4fe5ec7e2de1b6896d7956f304bb6bc96b0ce37ad7e3935304179d52951c18d84107184faa7
languageName: node
linkType: hard
"cron@npm:3.2.1": "cron@npm:3.2.1":
version: 3.2.1 version: 3.2.1
resolution: "cron@npm:3.2.1" resolution: "cron@npm:3.2.1"
@@ -25592,7 +25686,7 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"luxon@npm:~3.5.0": "luxon@npm:^3.2.1, luxon@npm:~3.5.0":
version: 3.5.0 version: 3.5.0
resolution: "luxon@npm:3.5.0" resolution: "luxon@npm:3.5.0"
checksum: 10/48f86e6c1c96815139f8559456a3354a276ba79bcef0ae0d4f2172f7652f3ba2be2237b0e103b8ea0b79b47715354ac9fac04eb1db3485dcc72d5110491dd47f checksum: 10/48f86e6c1c96815139f8559456a3354a276ba79bcef0ae0d4f2172f7652f3ba2be2237b0e103b8ea0b79b47715354ac9fac04eb1db3485dcc72d5110491dd47f
@@ -26919,6 +27013,49 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"msgpackr-extract@npm:^3.0.2":
version: 3.0.3
resolution: "msgpackr-extract@npm:3.0.3"
dependencies:
"@msgpackr-extract/msgpackr-extract-darwin-arm64": "npm:3.0.3"
"@msgpackr-extract/msgpackr-extract-darwin-x64": "npm:3.0.3"
"@msgpackr-extract/msgpackr-extract-linux-arm": "npm:3.0.3"
"@msgpackr-extract/msgpackr-extract-linux-arm64": "npm:3.0.3"
"@msgpackr-extract/msgpackr-extract-linux-x64": "npm:3.0.3"
"@msgpackr-extract/msgpackr-extract-win32-x64": "npm:3.0.3"
node-gyp: "npm:latest"
node-gyp-build-optional-packages: "npm:5.2.2"
dependenciesMeta:
"@msgpackr-extract/msgpackr-extract-darwin-arm64":
optional: true
"@msgpackr-extract/msgpackr-extract-darwin-x64":
optional: true
"@msgpackr-extract/msgpackr-extract-linux-arm":
optional: true
"@msgpackr-extract/msgpackr-extract-linux-arm64":
optional: true
"@msgpackr-extract/msgpackr-extract-linux-x64":
optional: true
"@msgpackr-extract/msgpackr-extract-win32-x64":
optional: true
bin:
download-msgpackr-prebuilds: bin/download-prebuilds.js
checksum: 10/4bfe45cf6968310570765951691f1b8e85b6a837e5197b8232fc9285eef4b457992e73118d9d07c92a52cc23f9e837897b135e17ea0f73e3604540434051b62f
languageName: node
linkType: hard
"msgpackr@npm:^1.11.2":
version: 1.11.2
resolution: "msgpackr@npm:1.11.2"
dependencies:
msgpackr-extract: "npm:^3.0.2"
dependenciesMeta:
msgpackr-extract:
optional: true
checksum: 10/7602f1e91e5ba13f4289ec9cab0d3f3db87d4ed323bebcb40a0c43ba2f6153192bffb63a5bb4755faacb6e0985f307c35084f40eaba1c325b7035da91381f01a
languageName: node
linkType: hard
"msw@npm:^2.6.8, msw@npm:^2.7.0": "msw@npm:^2.6.8, msw@npm:^2.7.0":
version: 2.7.0 version: 2.7.0
resolution: "msw@npm:2.7.0" resolution: "msw@npm:2.7.0"
@@ -27317,6 +27454,19 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"node-gyp-build-optional-packages@npm:5.2.2":
version: 5.2.2
resolution: "node-gyp-build-optional-packages@npm:5.2.2"
dependencies:
detect-libc: "npm:^2.0.1"
bin:
node-gyp-build-optional-packages: bin.js
node-gyp-build-optional-packages-optional: optional.js
node-gyp-build-optional-packages-test: build-test.js
checksum: 10/f448a328cf608071dc8cc4426ac5be0daec4788e4e1759e9f7ffcd286822cc799384edce17a8c79e610c4bbfc8e3aff788f3681f1d88290e0ca7aaa5342a090f
languageName: node
linkType: hard
"node-gyp-build@npm:^4.2.2": "node-gyp-build@npm:^4.2.2":
version: 4.8.4 version: 4.8.4
resolution: "node-gyp-build@npm:4.8.4" resolution: "node-gyp-build@npm:4.8.4"