feat(server): cluster level event system (#9884)

This commit is contained in:
forehalo
2025-01-25 14:51:03 +00:00
parent 0d2c2ea21e
commit 6370f45928
43 changed files with 634 additions and 364 deletions

View File

@@ -1,71 +1,18 @@
import type { Snapshot, User, Workspace } from '@prisma/client';
import { Flatten, Payload } from './types';
export interface WorkspaceEvents {
members: {
reviewRequested: Payload<{ inviteId: string }>;
requestDeclined: Payload<{
userId: User['id'];
workspaceId: Workspace['id'];
}>;
requestApproved: Payload<{ inviteId: string }>;
roleChanged: Payload<{
userId: User['id'];
workspaceId: Workspace['id'];
permission: number;
}>;
ownershipTransferred: Payload<{
from: User['id'];
to: User['id'];
workspaceId: Workspace['id'];
}>;
ownershipReceived: Payload<{ workspaceId: Workspace['id'] }>;
updated: Payload<{ workspaceId: Workspace['id']; count: number }>;
leave: Payload<{
user: Pick<User, 'id' | 'email'>;
workspaceId: Workspace['id'];
}>;
removed: Payload<{ workspaceId: Workspace['id']; userId: User['id'] }>;
};
deleted: Payload<Workspace['id']>;
blob: {
deleted: Payload<{
workspaceId: Workspace['id'];
key: string;
}>;
sync: Payload<{
workspaceId: Workspace['id'];
key: string;
}>;
};
declare global {
/**
* Event definitions can be extended by
*
* @example
*
* declare global {
* interface Events {
* 'user.subscription.created': {
* userId: User['id'];
* }
* }
* }
*/
interface Events {}
}
export interface DocEvents {
deleted: Payload<Pick<Snapshot, 'id' | 'workspaceId'>>;
updated: Payload<Pick<Snapshot, 'id' | 'workspaceId'>>;
}
/**
* Event definitions can be extended by
*
* @example
*
* declare module './event/def' {
* interface UserEvents {
* created: Payload<User>;
* }
* }
*
* assert<Event, 'user.created'>()
*/
export interface EventDefinitions {
workspace: WorkspaceEvents;
snapshot: DocEvents;
}
export type EventKV = Flatten<EventDefinitions>;
export type Event = keyof EventKV;
export type EventPayload<E extends Event> = EventKV[E];
export type { Payload };
export type EventName = keyof Events;

View File

@@ -0,0 +1,141 @@
import {
applyDecorators,
Injectable,
Logger,
OnApplicationBootstrap,
} from '@nestjs/common';
import {
EventEmitter2,
EventEmitterReadinessWatcher,
OnEvent as RawOnEvent,
OnEventMetadata,
} from '@nestjs/event-emitter';
import {
OnGatewayConnection,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import type { Server, Socket } from 'socket.io';
import { CallMetric } from '../metrics';
import type { EventName } from './def';
const EventHandlerWrapper = (event: EventName): MethodDecorator => {
// @ts-expect-error allow
return (
_target,
key,
desc: TypedPropertyDescriptor<(...args: any[]) => any>
) => {
const originalMethod = desc.value;
if (!originalMethod) {
return desc;
}
desc.value = function (...args: any[]) {
new Logger(EventBus.name).log(
`Event handler: ${event} (${key.toString()})`
);
return originalMethod.apply(this, args);
};
};
};
export const OnEvent = (
event: EventName,
opts?: OnEventMetadata['options']
) => {
const namespace = event.split('.')[0];
return applyDecorators(
EventHandlerWrapper(event),
CallMetric('event', 'event_handler', undefined, { event, namespace }),
RawOnEvent(event, opts)
);
};
/**
* We use socket.io system to auto pub/sub on server to server broadcast events
*/
@WebSocketGateway({
namespace: 's2s',
})
@Injectable()
export class EventBus implements OnGatewayConnection, OnApplicationBootstrap {
private readonly logger = new Logger(EventBus.name);
@WebSocketServer()
private readonly server?: Server;
constructor(
private readonly emitter: EventEmitter2,
private readonly watcher: EventEmitterReadinessWatcher
) {}
handleConnection(client: Socket) {
// for internal usage only, disallow any connection from client
this.logger.warn(
`EventBus get suspicious connection from client ${client.id}, disconnecting...`
);
client.disconnect();
}
async onApplicationBootstrap() {
this.watcher
.waitUntilReady()
.then(() => {
const events = this.emitter.eventNames() as EventName[];
events.forEach(event => {
// Proxy all events received from server(trigger by `server.serverSideEmit`)
// to internal event system
this.server?.on(event, payload => {
this.logger.log(`Server Event: ${event} (Received)`);
this.emit(event, payload);
});
});
})
.catch(() => {
// startup time promise, never throw at runtime
});
}
/**
* Emit event to trigger all listeners on current instance
*/
async emitAsync<T extends EventName>(event: T, payload: Events[T]) {
this.logger.log(`Dispatch event: ${event} (async)`);
return await this.emitter.emitAsync(event, payload);
}
/**
* Emit event to trigger all listeners on current instance
*/
emit<T extends EventName>(event: T, payload: Events[T]) {
this.logger.log(`Dispatch event: ${event}`);
return this.emitter.emit(event, payload);
}
/**
* Broadcast event to trigger all listeners on all instance in cluster
*/
broadcast<T extends EventName>(event: T, payload: Events[T]) {
this.logger.log(`Server Event: ${event} (Send)`);
this.server?.serverSideEmit(event, payload);
}
on<T extends EventName>(
event: T,
listener: (payload: Events[T]) => void | Promise<any>,
opts?: OnEventMetadata['options']
) {
this.emitter.on(event, listener as any, opts);
return () => {
this.emitter.off(event, listener as any);
};
}
waitFor<T extends EventName>(name: T, timeout?: number) {
return this.emitter.waitFor(name, timeout);
}
}

View File

@@ -1,43 +1,14 @@
import { Global, Injectable, Module } from '@nestjs/common';
import {
EventEmitter2,
EventEmitterModule,
OnEvent as RawOnEvent,
} from '@nestjs/event-emitter';
import { Global, Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import type { Event, EventPayload } from './def';
@Injectable()
export class EventEmitter {
constructor(private readonly emitter: EventEmitter2) {}
emit<E extends Event>(event: E, payload: EventPayload<E>) {
return this.emitter.emit(event, payload);
}
emitAsync<E extends Event>(event: E, payload: EventPayload<E>) {
return this.emitter.emitAsync(event, payload);
}
on<E extends Event>(event: E, handler: (payload: EventPayload<E>) => void) {
return this.emitter.on(event, handler);
}
once<E extends Event>(event: E, handler: (payload: EventPayload<E>) => void) {
return this.emitter.once(event, handler);
}
}
export const OnEvent = RawOnEvent as (
event: Event,
opts?: Parameters<typeof RawOnEvent>[1]
) => MethodDecorator;
import { EventBus, OnEvent } from './eventbus';
@Global()
@Module({
imports: [EventEmitterModule.forRoot()],
providers: [EventEmitter],
exports: [EventEmitter],
imports: [EventEmitterModule.forRoot({ global: false })],
providers: [EventBus],
exports: [EventBus],
})
export class EventModule {}
export { Event, EventPayload };
export { EventBus, OnEvent };

View File

@@ -1,22 +0,0 @@
import type { Join, PathType } from '../utils/types';
export type Payload<T> = {
__payload: true;
data: T;
};
export type Leaves<T, P extends string = ''> =
T extends Record<string, any>
? {
[K in keyof T]: K extends string
? T[K] extends Payload<any>
? K
: Join<K, Leaves<T[K], P>>
: never;
}[keyof T]
: never;
export type Flatten<T extends Record<string, any>> = {
// @ts-expect-error allow
[K in Leaves<T>]: PathType<T, K> extends Payload<infer U> ? U : never;
};

View File

@@ -14,7 +14,7 @@ export {
getAFFiNEConfigModifier,
} from './config';
export * from './error';
export { EventEmitter, type EventPayload, OnEvent } from './event';
export { EventBus, OnEvent } from './event';
export type { GraphqlContext } from './graphql';
export * from './guard';
export { CryptoHelper, URLHelper } from './helpers';

View File

@@ -37,7 +37,8 @@ export type KnownMetricScopes =
| 'doc'
| 'sse'
| 'mail'
| 'ai';
| 'ai'
| 'event';
const metricCreators: MetricCreators = {
counter(meter: Meter, name: string, opts?: MetricOptions) {

View File

@@ -1,16 +1,36 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import {
Injectable,
Logger,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { Redis as IORedis, RedisOptions } from 'ioredis';
import { Config } from '../../base/config';
class Redis extends IORedis implements OnModuleDestroy {
class Redis extends IORedis implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(this.constructor.name);
constructor(opts: RedisOptions) {
super(opts);
}
errorHandler = (err: Error) => {
this.logger.error(err);
};
onModuleInit() {
this.on('error', this.errorHandler);
}
onModuleDestroy() {
this.disconnect();
}
override duplicate(override?: Partial<RedisOptions>): IORedis {
const client = super.duplicate(override);
client.on('error', this.errorHandler);
return client;
}
}
@Injectable()

View File

@@ -1,22 +1,7 @@
import { FlattenedAppRuntimeConfig } from '../config/types';
import { OnEvent } from '../event';
import { Payload } from '../event/def';
declare module '../event/def' {
interface EventDefinitions {
runtime: {
[K in keyof FlattenedAppRuntimeConfig]: {
changed: Payload<FlattenedAppRuntimeConfig[K]>;
};
};
declare global {
interface Events {
'runtime.changed__NOT_IMPLEMENTED__': Partial<FlattenedAppRuntimeConfig>;
}
}
/**
* not implemented yet
*/
export const OnRuntimeConfigChange_DO_NOT_USE = (
nameWithModule: keyof FlattenedAppRuntimeConfig
) => {
return OnEvent(`runtime.${nameWithModule}.changed`);
};

View File

@@ -39,17 +39,18 @@ export class SocketIoAdapter extends IoAdapter {
}
const pubClient = this.app.get(SocketIoRedis);
pubClient.on('error', err => {
console.error(err);
});
const subClient = pubClient.duplicate();
subClient.on('error', err => {
console.error(err);
});
server.adapter(createAdapter(pubClient, subClient));
const close = server.close;
server.close = async fn => {
await close.call(server, fn);
// NOTE(@forehalo):
// the lifecycle of duplicated redis client will not be controlled by nestjs lifecycle
// we've got to manually disconnect it
subClient.disconnect();
};
return server;
}