fix(server): avoid global rejection when event handler errors (#10467)

This commit is contained in:
liuyi
2025-02-27 14:25:46 +08:00
committed by GitHub
parent caa4dfedfc
commit b3821ad619
5 changed files with 103 additions and 24 deletions

View File

@@ -4,7 +4,7 @@ import { CLS_ID, ClsServiceManager } from 'nestjs-cls';
import Sinon from 'sinon';
import { EventBus, metrics } from '../../base';
import { createTestingModule } from '../utils';
import { createTestingModule, sleep } from '../utils';
import { Listeners } from './provider';
export const test = ava as TestFn<{
@@ -201,3 +201,55 @@ test('should continuously use the same request id', async t => {
t.true(listeners.onRequestId.lastCall.returned('test-request-id'));
});
test('should throw when emitting async event with uncaught error', async t => {
const { eventbus } = t.context;
await t.throwsAsync(
() => eventbus.emitAsync('__test__.throw', { count: 0 }),
{
message: 'Error in event handler',
}
);
});
test('should suppress thrown error when emitting async event', async t => {
const { eventbus } = t.context;
const spy = Sinon.spy();
// @ts-expect-error internal event
const off = eventbus.on('error', spy);
const promise = eventbus.emitAsync('__test__.suppressThrow', {});
await t.notThrowsAsync(promise);
t.true(spy.calledOnce);
const args = spy.firstCall.args[0];
t.is(args.event, '__test__.suppressThrow');
t.deepEqual(args.payload, {});
t.is(args.error.message, 'Error in event handler');
const returns = await promise;
t.deepEqual(returns, [undefined]);
off();
});
test('should catch thrown error when emitting sync event', async t => {
const { eventbus } = t.context;
const spy = Sinon.spy();
// @ts-expect-error internal event
const off = eventbus.on('error', spy);
t.notThrows(() => eventbus.emit('__test__.throw', { count: 0 }));
// wait a tick
await sleep(1);
t.true(spy.calledOnce);
const args = spy.firstCall.args[0];
t.is(args.event, '__test__.throw');
t.deepEqual(args.payload, { count: 0 });
t.is(args.error.message, 'Error in event handler');
off();
});

View File

@@ -8,6 +8,7 @@ declare global {
'__test__.event': { count: number };
'__test__.event2': { count: number };
'__test__.throw': { count: number };
'__test__.suppressThrow': {};
'__test__.requestId': {};
}
}
@@ -32,6 +33,11 @@ export class Listeners {
throw new Error('Error in event handler');
}
@OnEvent('__test__.suppressThrow', { suppressError: true })
onSuppressThrow() {
throw new Error('Error in event handler');
}
@OnEvent('__test__.requestId')
onRequestId() {
const cls = ClsServiceManager.getClsService();

View File

@@ -19,6 +19,12 @@ import { genRequestId } from '../utils';
import { type EventName, type EventOptions } from './def';
import { EventHandlerScanner } from './scanner';
interface EventHandlerErrorPayload {
event: string;
payload: any;
error: Error;
}
/**
* We use socket.io system to auto pub/sub on server to server broadcast events
*/
@@ -50,6 +56,9 @@ export class EventBus
async onModuleInit() {
this.bindEventHandlers();
this.emitter.on('error', ({ event, error }: EventHandlerErrorPayload) => {
this.logger.error(`Error happened when handling event ${event}`, error);
});
}
async onApplicationBootstrap() {
@@ -78,7 +87,16 @@ export class EventBus
*/
emit<T extends EventName>(event: T, payload: Events[T]) {
this.logger.log(`Dispatch event: ${event}`);
return this.emitter.emit(event, payload);
// NOTE(@forehalo):
// Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick.
// In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting.
// So we catch it here with `emitAsync`
this.emitter.emitAsync(event, payload).catch(e => {
this.emitter.emit('error', { event, payload, error: e });
});
return true;
}
/**
@@ -115,10 +133,11 @@ export class EventBus
return await listener(payload);
} catch (e) {
if (suppressError) {
this.logger.error(
`Error happened when handling event ${signature}`,
e
);
this.emitter.emit('error', {
event,
payload,
error: e,
} as EventHandlerErrorPayload);
} else {
throw e;
}

View File

@@ -5,7 +5,7 @@ import { DynamicModule } from '@nestjs/common';
import { Config } from '../../config';
import { QueueRedis } from '../../redis';
import { QUEUES } from './def';
import { Queue, QUEUES } from './def';
import { JobExecutor } from './executor';
import { JobQueue } from './queue';
import { JobHandlerScanner } from './scanner';
@@ -25,7 +25,15 @@ export class JobModule {
},
inject: [Config, QueueRedis],
}),
BullModule.registerQueue(...QUEUES.map(name => ({ name }))),
BullModule.registerQueue(
...QUEUES.map(name => {
if (name === Queue.NIGHTLY_JOB) {
// avoid nightly jobs been run multiple times
return { name, removeOnComplete: { age: 1000 * 60 * 60 } };
}
return { name };
})
),
],
providers: [JobQueue, JobExecutor, JobHandlerScanner],
exports: [JobQueue],

View File

@@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import { EventBus, JobQueue, OnEvent, OnJob } from '../../base';
import { EventBus, JobQueue, OnJob } from '../../base';
import {
SubscriptionPlan,
SubscriptionRecurring,
@@ -126,6 +126,15 @@ export class SubscriptionCronJobs {
});
for (const subscription of subscriptions) {
await this.db.subscription.delete({
where: {
targetId_plan: {
targetId: subscription.targetId,
plan: subscription.plan,
},
},
});
this.event.emit('user.subscription.canceled', {
userId: subscription.targetId,
plan: subscription.plan as SubscriptionPlan,
@@ -133,19 +142,4 @@ export class SubscriptionCronJobs {
});
}
}
@OnEvent('user.subscription.canceled')
async handleUserSubscriptionCanceled({
userId,
plan,
}: Events['user.subscription.canceled']) {
await this.db.subscription.delete({
where: {
targetId_plan: {
targetId: userId,
plan,
},
},
});
}
}