From 6370f4592808652ea612d1bd409b1a2f9832aa78 Mon Sep 17 00:00:00 2001 From: forehalo Date: Sat, 25 Jan 2025 14:51:03 +0000 Subject: [PATCH] feat(server): cluster level event system (#9884) --- .../src/__tests__/event/cluster.spec.ts | 62 ++++++++ .../src/__tests__/event/eventbus.spec.ts | 111 ++++++++++++++ .../server/src/__tests__/event/provider.ts | 26 ++++ .../server/src/__tests__/models/user.spec.ts | 11 +- .../src/__tests__/models/workspace.spec.ts | 11 +- .../__tests__/nestjs/error-handler.spec.ts | 10 +- .../src/__tests__/payment/service.spec.ts | 10 +- .../backend/server/src/__tests__/team.e2e.ts | 10 +- .../server/src/__tests__/utils/utils.ts | 9 +- packages/backend/server/src/app.module.ts | 2 +- packages/backend/server/src/app.ts | 6 +- packages/backend/server/src/base/event/def.ts | 85 ++--------- .../backend/server/src/base/event/eventbus.ts | 141 ++++++++++++++++++ .../backend/server/src/base/event/index.ts | 45 +----- .../backend/server/src/base/event/types.ts | 22 --- packages/backend/server/src/base/index.ts | 2 +- .../server/src/base/metrics/metrics.ts | 3 +- .../server/src/base/redis/instances.ts | 24 ++- .../backend/server/src/base/runtime/event.ts | 21 +-- .../server/src/base/websocket/adapter.ts | 17 ++- .../server/src/core/doc-renderer/service.ts | 12 +- .../server/src/core/doc/adapters/workspace.ts | 21 ++- packages/backend/server/src/core/doc/job.ts | 10 +- .../server/src/core/features/management.ts | 7 +- .../server/src/core/permission/service.ts | 4 +- .../server/src/core/selfhost/controller.ts | 10 +- .../src/core/storage/wrappers/avatar.ts | 3 +- .../server/src/core/storage/wrappers/blob.ts | 24 ++- .../backend/server/src/core/user/event.ts | 6 +- .../backend/server/src/core/user/types.ts | 9 -- .../src/core/workspaces/resolvers/service.ts | 5 +- .../src/core/workspaces/resolvers/team.ts | 15 +- .../core/workspaces/resolvers/workspace.ts | 6 +- packages/backend/server/src/models/user.ts | 29 ++-- .../backend/server/src/models/workspace.ts | 51 ++++++- .../server/src/plugins/license/service.ts | 17 ++- .../server/src/plugins/payment/controller.ts | 7 +- .../server/src/plugins/payment/cron.ts | 22 +-- .../src/plugins/payment/manager/user.ts | 4 +- .../src/plugins/payment/manager/workspace.ts | 7 +- .../server/src/plugins/payment/quota.ts | 9 +- .../server/src/plugins/payment/types.ts | 69 ++++----- .../server/src/plugins/payment/webhook.ts | 23 ++- 43 files changed, 634 insertions(+), 364 deletions(-) create mode 100644 packages/backend/server/src/__tests__/event/cluster.spec.ts create mode 100644 packages/backend/server/src/__tests__/event/eventbus.spec.ts create mode 100644 packages/backend/server/src/__tests__/event/provider.ts create mode 100644 packages/backend/server/src/base/event/eventbus.ts delete mode 100644 packages/backend/server/src/base/event/types.ts diff --git a/packages/backend/server/src/__tests__/event/cluster.spec.ts b/packages/backend/server/src/__tests__/event/cluster.spec.ts new file mode 100644 index 0000000000..7c83337012 --- /dev/null +++ b/packages/backend/server/src/__tests__/event/cluster.spec.ts @@ -0,0 +1,62 @@ +import { INestApplication } from '@nestjs/common'; +import ava, { TestFn } from 'ava'; +import Sinon from 'sinon'; + +import { EventBus } from '../../base'; +import { SocketIoAdapter } from '../../base/websocket'; +import { createTestingModule } from '../utils'; +import { Listeners } from './provider'; + +const test = ava as TestFn<{ + app1: INestApplication; + app2: INestApplication; +}>; +async function createApp() { + const m = await createTestingModule( + { + providers: [Listeners], + }, + false + ); + + const app = m.createNestApplication({ logger: false }); + + app.useWebSocketAdapter(new SocketIoAdapter(app)); + await app.init(); + + return app; +} + +test.before(async t => { + t.context.app1 = await createApp(); + t.context.app2 = await createApp(); +}); + +test.after(async t => { + await t.context.app1.close(); + await t.context.app2.close(); +}); + +test('should broadcast event to cluster instances', async t => { + const { app1, app2 } = t.context; + + // app 1 for listening + const eventbus1 = app1.get(EventBus); + + const listener = Sinon.spy(app1.get(Listeners), 'onTestEvent'); + const runtimeListener = Sinon.stub().returns({ count: 2 }); + const off = eventbus1.on('__test__.event', runtimeListener); + + // app 2 for broadcasting + const eventbus2 = app2.get(EventBus); + eventbus2.broadcast('__test__.event', { count: 0 }); + + // cause the cross instances broadcasting is asynchronization calling + // we should wait for the event's arriving before asserting + await eventbus1.waitFor('__test__.event'); + + t.true(listener.calledOnceWith({ count: 0 })); + t.true(runtimeListener.calledOnceWith({ count: 0 })); + + off(); +}); diff --git a/packages/backend/server/src/__tests__/event/eventbus.spec.ts b/packages/backend/server/src/__tests__/event/eventbus.spec.ts new file mode 100644 index 0000000000..dc9e021771 --- /dev/null +++ b/packages/backend/server/src/__tests__/event/eventbus.spec.ts @@ -0,0 +1,111 @@ +import { TestingModule } from '@nestjs/testing'; +import ava, { TestFn } from 'ava'; +import Sinon from 'sinon'; + +import { EventBus, metrics } from '../../base'; +import { createTestingModule } from '../utils'; +import { Listeners } from './provider'; + +export const test = ava as TestFn<{ + module: TestingModule; + eventbus: EventBus; + listener: Sinon.SinonSpy; +}>; + +test.before(async t => { + const m = await createTestingModule({ + providers: [Listeners], + }); + const eventbus = m.get(EventBus); + t.context.module = m; + t.context.eventbus = eventbus; + t.context.listener = Sinon.spy(m.get(Listeners), 'onTestEvent'); +}); + +test.afterEach(() => { + Sinon.reset(); +}); + +test.after(async t => { + await t.context.module.close(); +}); + +test('should register event listener', t => { + const { eventbus } = t.context; + + // @ts-expect-error private member + t.true(eventbus.emitter.eventNames().includes('__test__.event')); + + eventbus.on('__test__.event2', () => {}); + // @ts-expect-error private member + t.true(eventbus.emitter.eventNames().includes('__test__.event2')); +}); + +test('should dispatch event listener', t => { + const { eventbus, listener } = t.context; + + const runtimeListener = Sinon.stub(); + const off = eventbus.on('__test__.event', runtimeListener); + + const payload = { count: 0 }; + eventbus.emit('__test__.event', payload); + + t.true(listener.calledOnceWithExactly(payload)); + t.true(runtimeListener.calledOnceWithExactly(payload)); + + off(); +}); + +test('should dispatch async event listener', async t => { + const { eventbus, listener } = t.context; + + const runtimeListener = Sinon.stub().returns({ count: 2 }); + const off = eventbus.on('__test__.event', runtimeListener); + + const payload = { count: 0 }; + const returns = await eventbus.emitAsync('__test__.event', payload); + + t.true(listener.calledOnceWithExactly(payload)); + t.true(runtimeListener.calledOnceWithExactly(payload)); + + t.deepEqual(returns, [{ count: 1 }, { count: 2 }]); + + off(); +}); + +test('should record event handler call metrics', async t => { + const { eventbus } = t.context; + const timerStub = Sinon.stub( + metrics.event.histogram('function_timer'), + 'record' + ); + const counterStub = Sinon.stub( + metrics.event.counter('function_calls'), + 'add' + ); + + await eventbus.emitAsync('__test__.event', { count: 0 }); + + t.deepEqual(timerStub.getCall(0).args[1], { + name: 'event_handler', + event: '__test__.event', + namespace: '__test__', + error: false, + }); + + t.deepEqual(counterStub.getCall(0).args[1], { + event: '__test__.event', + namespace: '__test__', + }); + + Sinon.reset(); + + await eventbus.emitAsync('__test__.throw', { count: 0 }); + + t.deepEqual(timerStub.getCall(0).args[1], { + name: 'event_handler', + event: '__test__.throw', + namespace: '__test__', + error: true, + }); +}); diff --git a/packages/backend/server/src/__tests__/event/provider.ts b/packages/backend/server/src/__tests__/event/provider.ts new file mode 100644 index 0000000000..296c55d1bd --- /dev/null +++ b/packages/backend/server/src/__tests__/event/provider.ts @@ -0,0 +1,26 @@ +import { Injectable } from '@nestjs/common'; + +import { OnEvent } from '../../base'; + +declare global { + interface Events { + '__test__.event': { count: number }; + '__test__.event2': { count: number }; + '__test__.throw': { count: number }; + } +} + +@Injectable() +export class Listeners { + @OnEvent('__test__.event') + onTestEvent({ count }: Events['__test__.event']) { + return { + count: count + 1, + }; + } + + @OnEvent('__test__.throw') + onThrow() { + throw new Error('Error in event handler'); + } +} diff --git a/packages/backend/server/src/__tests__/models/user.spec.ts b/packages/backend/server/src/__tests__/models/user.spec.ts index 1ff71207e1..a12d5f0641 100644 --- a/packages/backend/server/src/__tests__/models/user.spec.ts +++ b/packages/backend/server/src/__tests__/models/user.spec.ts @@ -1,10 +1,9 @@ -import { EventEmitter2 } from '@nestjs/event-emitter'; import { TestingModule } from '@nestjs/testing'; import { PrismaClient } from '@prisma/client'; import ava, { TestFn } from 'ava'; import Sinon from 'sinon'; -import { EmailAlreadyUsed } from '../../base'; +import { EmailAlreadyUsed, EventBus } from '../../base'; import { Permission } from '../../models/common'; import { UserModel } from '../../models/user'; import { WorkspaceMemberStatus } from '../../models/workspace'; @@ -46,7 +45,7 @@ test('should create a new user', async t => { }); test('should trigger user.created event', async t => { - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const spy = Sinon.spy(); event.on('user.created', spy); @@ -117,7 +116,7 @@ test('should not update email to an existing one', async t => { }); test('should trigger user.updated event', async t => { - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const spy = Sinon.spy(); event.on('user.updated', spy); @@ -217,7 +216,7 @@ test('should fulfill user', async t => { }); test('should trigger user.updated event when fulfilling user', async t => { - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const createSpy = Sinon.spy(); const updateSpy = Sinon.spy(); event.on('user.created', createSpy); @@ -250,7 +249,7 @@ test('should delete user', async t => { }); test('should trigger user.deleted event', async t => { - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const spy = Sinon.spy(); event.on('user.deleted', spy); diff --git a/packages/backend/server/src/__tests__/models/workspace.spec.ts b/packages/backend/server/src/__tests__/models/workspace.spec.ts index cf095b3a7d..fccb5acf0a 100644 --- a/packages/backend/server/src/__tests__/models/workspace.spec.ts +++ b/packages/backend/server/src/__tests__/models/workspace.spec.ts @@ -1,10 +1,9 @@ -import { EventEmitter2 } from '@nestjs/event-emitter'; import { TestingModule } from '@nestjs/testing'; import { PrismaClient, WorkspaceMemberStatus } from '@prisma/client'; import ava, { TestFn } from 'ava'; import Sinon from 'sinon'; -import { Config } from '../../base/config'; +import { Config, EventBus } from '../../base'; import { Permission } from '../../models/common'; import { UserModel } from '../../models/user'; import { WorkspaceModel } from '../../models/workspace'; @@ -305,7 +304,7 @@ test('should grant member with read permission and Pending status by default', a email: 'test2@affine.pro', }); - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const updatedSpy = Sinon.spy(); event.on('workspace.members.updated', updatedSpy); const member1 = await t.context.workspace.grantMember( @@ -829,7 +828,7 @@ test('should delete workspace member in Pending, Accepted status', async t => { ); t.is(member.status, WorkspaceMemberStatus.Pending); - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const updatedSpy = Sinon.spy(); event.on('workspace.members.updated', updatedSpy); let success = await t.context.workspace.deleteMember( @@ -880,7 +879,7 @@ test('should trigger workspace.members.requestDeclined event when delete workspa ); t.is(member.status, WorkspaceMemberStatus.UnderReview); - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const updatedSpy = Sinon.spy(); const requestDeclinedSpy = Sinon.spy(); event.on('workspace.members.updated', updatedSpy); @@ -925,7 +924,7 @@ test('should trigger workspace.members.requestDeclined event when delete workspa ); t.is(member.status, WorkspaceMemberStatus.NeedMoreSeatAndReview); - const event = t.context.module.get(EventEmitter2); + const event = t.context.module.get(EventBus); const updatedSpy = Sinon.spy(); const requestDeclinedSpy = Sinon.spy(); event.on('workspace.members.updated', updatedSpy); diff --git a/packages/backend/server/src/__tests__/nestjs/error-handler.spec.ts b/packages/backend/server/src/__tests__/nestjs/error-handler.spec.ts index 31c252bb3d..a06dd1d313 100644 --- a/packages/backend/server/src/__tests__/nestjs/error-handler.spec.ts +++ b/packages/backend/server/src/__tests__/nestjs/error-handler.spec.ts @@ -105,18 +105,21 @@ function gql(app: INestApplication, query: string) { .expect(200); } -test.beforeEach(async ({ context }) => { +test.before(async ({ context }) => { const { app } = await createTestingApp({ providers: [TestResolver, TestGateway], controllers: [TestController], }); context.logger = Sinon.stub(new Logger().localInstance); - context.app = app; }); -test.afterEach.always(async ctx => { +test.beforeEach(() => { + Sinon.reset(); +}); + +test.after.always(async ctx => { await ctx.context.app.close(); }); @@ -131,6 +134,7 @@ test('should be able to handle known user error in graphql query', async t => { t.is(err.message, 'You do not have permission to access this resource.'); t.is(err.extensions.status, HttpStatus.FORBIDDEN); t.is(err.extensions.name, 'ACCESS_DENIED'); + // console.log(t.context.logger.error.getCalls()); t.true(t.context.logger.error.notCalled); }); diff --git a/packages/backend/server/src/__tests__/payment/service.spec.ts b/packages/backend/server/src/__tests__/payment/service.spec.ts index 383f524c50..334afa39c8 100644 --- a/packages/backend/server/src/__tests__/payment/service.spec.ts +++ b/packages/backend/server/src/__tests__/payment/service.spec.ts @@ -7,7 +7,7 @@ import Sinon from 'sinon'; import Stripe from 'stripe'; import { AppModule } from '../../app.module'; -import { EventEmitter, Runtime } from '../../base'; +import { EventBus, Runtime } from '../../base'; import { ConfigModule } from '../../base/config'; import { CurrentUser } from '../../core/auth'; import { AuthService } from '../../core/auth/service'; @@ -158,7 +158,7 @@ const test = ava as TestFn<{ db: PrismaClient; app: INestApplication; service: SubscriptionService; - event: Sinon.SinonStubbedInstance; + event: Sinon.SinonStubbedInstance; feature: Sinon.SinonStubbedInstance; runtime: Sinon.SinonStubbedInstance; stripe: { @@ -203,14 +203,12 @@ test.before(async t => { m.overrideProvider(FeatureManagementService).useValue( Sinon.createStubInstance(FeatureManagementService) ); - m.overrideProvider(EventEmitter).useValue( - Sinon.createStubInstance(EventEmitter) - ); + m.overrideProvider(EventBus).useValue(Sinon.createStubInstance(EventBus)); m.overrideProvider(Runtime).useValue(Sinon.createStubInstance(Runtime)); }, }); - t.context.event = app.get(EventEmitter); + t.context.event = app.get(EventBus); t.context.service = app.get(SubscriptionService); t.context.feature = app.get(FeatureManagementService); t.context.runtime = app.get(Runtime); diff --git a/packages/backend/server/src/__tests__/team.e2e.ts b/packages/backend/server/src/__tests__/team.e2e.ts index 7271be322f..87c0c68c22 100644 --- a/packages/backend/server/src/__tests__/team.e2e.ts +++ b/packages/backend/server/src/__tests__/team.e2e.ts @@ -10,7 +10,7 @@ import ava from 'ava'; import Sinon from 'sinon'; import { AppModule } from '../app.module'; -import { EventEmitter } from '../base'; +import { EventBus } from '../base'; import { AuthService } from '../core/auth'; import { DocContentService } from '../core/doc-renderer'; import { Permission, PermissionService } from '../core/permission'; @@ -41,7 +41,7 @@ import { const test = ava as TestFn<{ app: INestApplication; auth: AuthService; - event: Sinon.SinonStubbedInstance; + event: Sinon.SinonStubbedInstance; quota: QuotaService; quotaManager: QuotaManagementService; permissions: PermissionService; @@ -52,8 +52,8 @@ test.beforeEach(async t => { imports: [AppModule], tapModule: module => { module - .overrideProvider(EventEmitter) - .useValue(Sinon.createStubInstance(EventEmitter)); + .overrideProvider(EventBus) + .useValue(Sinon.createStubInstance(EventBus)); module.overrideProvider(DocContentService).useValue({ getWorkspaceContent() { return { @@ -67,7 +67,7 @@ test.beforeEach(async t => { t.context.app = app; t.context.auth = app.get(AuthService); - t.context.event = app.get(EventEmitter); + t.context.event = app.get(EventBus); t.context.quota = app.get(QuotaService); t.context.quotaManager = app.get(QuotaManagementService); t.context.permissions = app.get(PermissionService); diff --git a/packages/backend/server/src/__tests__/utils/utils.ts b/packages/backend/server/src/__tests__/utils/utils.ts index bb8918f0c4..d1261af230 100644 --- a/packages/backend/server/src/__tests__/utils/utils.ts +++ b/packages/backend/server/src/__tests__/utils/utils.ts @@ -112,7 +112,12 @@ export async function createTestingModule( if (init) { await m.init(); - + // we got a lot smoking tests try to break nestjs + // can't tolerate the noisy logs + // @ts-expect-error private + m.applyLogger({ + logger: ['fatal'], + }); const runtime = m.get(Runtime); // by pass password min length validation await runtime.set('auth/password.min', 1); @@ -128,7 +133,7 @@ export async function createTestingApp(moduleDef: TestingModuleMeatdata = {}) { cors: true, bodyParser: true, rawBody: true, - logger: ['warn'], + logger: ['fatal'], }); app.useGlobalFilters(new GlobalExceptionFilter(app.getHttpAdapter())); diff --git a/packages/backend/server/src/app.module.ts b/packages/backend/server/src/app.module.ts index 98719c3d80..8af87b6328 100644 --- a/packages/backend/server/src/app.module.ts +++ b/packages/backend/server/src/app.module.ts @@ -97,6 +97,7 @@ export const FunctionalityModules = [ HelpersModule, ErrorModule, LoggerModule, + WebSocketModule, ]; function filterOptionalModule( @@ -197,7 +198,6 @@ export function buildAppModule() { // basic .use(...FunctionalityModules) .use(ModelsModule) - .useIf(config => config.flavor.sync, WebSocketModule) // auth .use(UserModule, AuthModule, PermissionModule) diff --git a/packages/backend/server/src/app.ts b/packages/backend/server/src/app.ts index 36833e9c6f..824eb29afd 100644 --- a/packages/backend/server/src/app.ts +++ b/packages/backend/server/src/app.ts @@ -45,10 +45,8 @@ export async function createApp() { app.useGlobalFilters(new GlobalExceptionFilter(app.getHttpAdapter())); app.use(cookieParser()); - if (AFFiNE.flavor.sync) { - const adapter = new SocketIoAdapter(app); - app.useWebSocketAdapter(adapter); - } + const adapter = new SocketIoAdapter(app); + app.useWebSocketAdapter(adapter); if (AFFiNE.isSelfhosted && AFFiNE.metrics.telemetry.enabled) { const mixpanel = await import('mixpanel'); diff --git a/packages/backend/server/src/base/event/def.ts b/packages/backend/server/src/base/event/def.ts index f89c3fbed1..5dace9605c 100644 --- a/packages/backend/server/src/base/event/def.ts +++ b/packages/backend/server/src/base/event/def.ts @@ -1,71 +1,18 @@ -import type { Snapshot, User, Workspace } from '@prisma/client'; - -import { Flatten, Payload } from './types'; - -export interface WorkspaceEvents { - members: { - reviewRequested: Payload<{ inviteId: string }>; - requestDeclined: Payload<{ - userId: User['id']; - workspaceId: Workspace['id']; - }>; - requestApproved: Payload<{ inviteId: string }>; - roleChanged: Payload<{ - userId: User['id']; - workspaceId: Workspace['id']; - permission: number; - }>; - ownershipTransferred: Payload<{ - from: User['id']; - to: User['id']; - workspaceId: Workspace['id']; - }>; - ownershipReceived: Payload<{ workspaceId: Workspace['id'] }>; - updated: Payload<{ workspaceId: Workspace['id']; count: number }>; - leave: Payload<{ - user: Pick; - workspaceId: Workspace['id']; - }>; - removed: Payload<{ workspaceId: Workspace['id']; userId: User['id'] }>; - }; - deleted: Payload; - blob: { - deleted: Payload<{ - workspaceId: Workspace['id']; - key: string; - }>; - sync: Payload<{ - workspaceId: Workspace['id']; - key: string; - }>; - }; +declare global { + /** + * Event definitions can be extended by + * + * @example + * + * declare global { + * interface Events { + * 'user.subscription.created': { + * userId: User['id']; + * } + * } + * } + */ + interface Events {} } -export interface DocEvents { - deleted: Payload>; - updated: Payload>; -} - -/** - * Event definitions can be extended by - * - * @example - * - * declare module './event/def' { - * interface UserEvents { - * created: Payload; - * } - * } - * - * assert() - */ -export interface EventDefinitions { - workspace: WorkspaceEvents; - snapshot: DocEvents; -} - -export type EventKV = Flatten; - -export type Event = keyof EventKV; -export type EventPayload = EventKV[E]; -export type { Payload }; +export type EventName = keyof Events; diff --git a/packages/backend/server/src/base/event/eventbus.ts b/packages/backend/server/src/base/event/eventbus.ts new file mode 100644 index 0000000000..c7fb0f5429 --- /dev/null +++ b/packages/backend/server/src/base/event/eventbus.ts @@ -0,0 +1,141 @@ +import { + applyDecorators, + Injectable, + Logger, + OnApplicationBootstrap, +} from '@nestjs/common'; +import { + EventEmitter2, + EventEmitterReadinessWatcher, + OnEvent as RawOnEvent, + OnEventMetadata, +} from '@nestjs/event-emitter'; +import { + OnGatewayConnection, + WebSocketGateway, + WebSocketServer, +} from '@nestjs/websockets'; +import type { Server, Socket } from 'socket.io'; + +import { CallMetric } from '../metrics'; +import type { EventName } from './def'; + +const EventHandlerWrapper = (event: EventName): MethodDecorator => { + // @ts-expect-error allow + return ( + _target, + key, + desc: TypedPropertyDescriptor<(...args: any[]) => any> + ) => { + const originalMethod = desc.value; + if (!originalMethod) { + return desc; + } + + desc.value = function (...args: any[]) { + new Logger(EventBus.name).log( + `Event handler: ${event} (${key.toString()})` + ); + return originalMethod.apply(this, args); + }; + }; +}; + +export const OnEvent = ( + event: EventName, + opts?: OnEventMetadata['options'] +) => { + const namespace = event.split('.')[0]; + + return applyDecorators( + EventHandlerWrapper(event), + CallMetric('event', 'event_handler', undefined, { event, namespace }), + RawOnEvent(event, opts) + ); +}; + +/** + * We use socket.io system to auto pub/sub on server to server broadcast events + */ +@WebSocketGateway({ + namespace: 's2s', +}) +@Injectable() +export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { + private readonly logger = new Logger(EventBus.name); + + @WebSocketServer() + private readonly server?: Server; + + constructor( + private readonly emitter: EventEmitter2, + private readonly watcher: EventEmitterReadinessWatcher + ) {} + + handleConnection(client: Socket) { + // for internal usage only, disallow any connection from client + this.logger.warn( + `EventBus get suspicious connection from client ${client.id}, disconnecting...` + ); + client.disconnect(); + } + + async onApplicationBootstrap() { + this.watcher + .waitUntilReady() + .then(() => { + const events = this.emitter.eventNames() as EventName[]; + events.forEach(event => { + // Proxy all events received from server(trigger by `server.serverSideEmit`) + // to internal event system + this.server?.on(event, payload => { + this.logger.log(`Server Event: ${event} (Received)`); + this.emit(event, payload); + }); + }); + }) + .catch(() => { + // startup time promise, never throw at runtime + }); + } + + /** + * Emit event to trigger all listeners on current instance + */ + async emitAsync(event: T, payload: Events[T]) { + this.logger.log(`Dispatch event: ${event} (async)`); + return await this.emitter.emitAsync(event, payload); + } + + /** + * Emit event to trigger all listeners on current instance + */ + emit(event: T, payload: Events[T]) { + this.logger.log(`Dispatch event: ${event}`); + return this.emitter.emit(event, payload); + } + + /** + * Broadcast event to trigger all listeners on all instance in cluster + */ + broadcast(event: T, payload: Events[T]) { + this.logger.log(`Server Event: ${event} (Send)`); + this.server?.serverSideEmit(event, payload); + } + + on( + event: T, + listener: (payload: Events[T]) => void | Promise, + opts?: OnEventMetadata['options'] + ) { + this.emitter.on(event, listener as any, opts); + + return () => { + this.emitter.off(event, listener as any); + }; + } + + waitFor(name: T, timeout?: number) { + return this.emitter.waitFor(name, timeout); + } +} diff --git a/packages/backend/server/src/base/event/index.ts b/packages/backend/server/src/base/event/index.ts index 768d9b60a4..9dea08d1eb 100644 --- a/packages/backend/server/src/base/event/index.ts +++ b/packages/backend/server/src/base/event/index.ts @@ -1,43 +1,14 @@ -import { Global, Injectable, Module } from '@nestjs/common'; -import { - EventEmitter2, - EventEmitterModule, - OnEvent as RawOnEvent, -} from '@nestjs/event-emitter'; +import { Global, Module } from '@nestjs/common'; +import { EventEmitterModule } from '@nestjs/event-emitter'; -import type { Event, EventPayload } from './def'; - -@Injectable() -export class EventEmitter { - constructor(private readonly emitter: EventEmitter2) {} - - emit(event: E, payload: EventPayload) { - return this.emitter.emit(event, payload); - } - - emitAsync(event: E, payload: EventPayload) { - return this.emitter.emitAsync(event, payload); - } - - on(event: E, handler: (payload: EventPayload) => void) { - return this.emitter.on(event, handler); - } - - once(event: E, handler: (payload: EventPayload) => void) { - return this.emitter.once(event, handler); - } -} - -export const OnEvent = RawOnEvent as ( - event: Event, - opts?: Parameters[1] -) => MethodDecorator; +import { EventBus, OnEvent } from './eventbus'; @Global() @Module({ - imports: [EventEmitterModule.forRoot()], - providers: [EventEmitter], - exports: [EventEmitter], + imports: [EventEmitterModule.forRoot({ global: false })], + providers: [EventBus], + exports: [EventBus], }) export class EventModule {} -export { Event, EventPayload }; + +export { EventBus, OnEvent }; diff --git a/packages/backend/server/src/base/event/types.ts b/packages/backend/server/src/base/event/types.ts deleted file mode 100644 index 94e66e3756..0000000000 --- a/packages/backend/server/src/base/event/types.ts +++ /dev/null @@ -1,22 +0,0 @@ -import type { Join, PathType } from '../utils/types'; - -export type Payload = { - __payload: true; - data: T; -}; - -export type Leaves = - T extends Record - ? { - [K in keyof T]: K extends string - ? T[K] extends Payload - ? K - : Join> - : never; - }[keyof T] - : never; - -export type Flatten> = { - // @ts-expect-error allow - [K in Leaves]: PathType extends Payload ? U : never; -}; diff --git a/packages/backend/server/src/base/index.ts b/packages/backend/server/src/base/index.ts index af1b7f2335..d6ad06de8a 100644 --- a/packages/backend/server/src/base/index.ts +++ b/packages/backend/server/src/base/index.ts @@ -14,7 +14,7 @@ export { getAFFiNEConfigModifier, } from './config'; export * from './error'; -export { EventEmitter, type EventPayload, OnEvent } from './event'; +export { EventBus, OnEvent } from './event'; export type { GraphqlContext } from './graphql'; export * from './guard'; export { CryptoHelper, URLHelper } from './helpers'; diff --git a/packages/backend/server/src/base/metrics/metrics.ts b/packages/backend/server/src/base/metrics/metrics.ts index 555bfe1bcc..a73979424b 100644 --- a/packages/backend/server/src/base/metrics/metrics.ts +++ b/packages/backend/server/src/base/metrics/metrics.ts @@ -37,7 +37,8 @@ export type KnownMetricScopes = | 'doc' | 'sse' | 'mail' - | 'ai'; + | 'ai' + | 'event'; const metricCreators: MetricCreators = { counter(meter: Meter, name: string, opts?: MetricOptions) { diff --git a/packages/backend/server/src/base/redis/instances.ts b/packages/backend/server/src/base/redis/instances.ts index 7bec65f3f5..5fb3967c8b 100644 --- a/packages/backend/server/src/base/redis/instances.ts +++ b/packages/backend/server/src/base/redis/instances.ts @@ -1,16 +1,36 @@ -import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { + Injectable, + Logger, + OnModuleDestroy, + OnModuleInit, +} from '@nestjs/common'; import { Redis as IORedis, RedisOptions } from 'ioredis'; import { Config } from '../../base/config'; -class Redis extends IORedis implements OnModuleDestroy { +class Redis extends IORedis implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(this.constructor.name); constructor(opts: RedisOptions) { super(opts); } + errorHandler = (err: Error) => { + this.logger.error(err); + }; + + onModuleInit() { + this.on('error', this.errorHandler); + } + onModuleDestroy() { this.disconnect(); } + + override duplicate(override?: Partial): IORedis { + const client = super.duplicate(override); + client.on('error', this.errorHandler); + return client; + } } @Injectable() diff --git a/packages/backend/server/src/base/runtime/event.ts b/packages/backend/server/src/base/runtime/event.ts index 6401c502b5..6697747a00 100644 --- a/packages/backend/server/src/base/runtime/event.ts +++ b/packages/backend/server/src/base/runtime/event.ts @@ -1,22 +1,7 @@ import { FlattenedAppRuntimeConfig } from '../config/types'; -import { OnEvent } from '../event'; -import { Payload } from '../event/def'; -declare module '../event/def' { - interface EventDefinitions { - runtime: { - [K in keyof FlattenedAppRuntimeConfig]: { - changed: Payload; - }; - }; +declare global { + interface Events { + 'runtime.changed__NOT_IMPLEMENTED__': Partial; } } - -/** - * not implemented yet - */ -export const OnRuntimeConfigChange_DO_NOT_USE = ( - nameWithModule: keyof FlattenedAppRuntimeConfig -) => { - return OnEvent(`runtime.${nameWithModule}.changed`); -}; diff --git a/packages/backend/server/src/base/websocket/adapter.ts b/packages/backend/server/src/base/websocket/adapter.ts index 9153b000a0..b466d557fc 100644 --- a/packages/backend/server/src/base/websocket/adapter.ts +++ b/packages/backend/server/src/base/websocket/adapter.ts @@ -39,17 +39,18 @@ export class SocketIoAdapter extends IoAdapter { } const pubClient = this.app.get(SocketIoRedis); - - pubClient.on('error', err => { - console.error(err); - }); - const subClient = pubClient.duplicate(); - subClient.on('error', err => { - console.error(err); - }); server.adapter(createAdapter(pubClient, subClient)); + const close = server.close; + + server.close = async fn => { + await close.call(server, fn); + // NOTE(@forehalo): + // the lifecycle of duplicated redis client will not be controlled by nestjs lifecycle + // we've got to manually disconnect it + subClient.disconnect(); + }; return server; } diff --git a/packages/backend/server/src/core/doc-renderer/service.ts b/packages/backend/server/src/core/doc-renderer/service.ts index bac7da524f..131f52f853 100644 --- a/packages/backend/server/src/core/doc-renderer/service.ts +++ b/packages/backend/server/src/core/doc-renderer/service.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import { applyUpdate, Doc } from 'yjs'; -import { Cache, type EventPayload, OnEvent } from '../../base'; +import { Cache, OnEvent } from '../../base'; import { PgWorkspaceDocStorageAdapter } from '../doc'; import { type PageDocContent, @@ -78,15 +78,15 @@ export class DocContentService { return content; } - @OnEvent('snapshot.updated') + @OnEvent('doc.snapshot.updated') async markDocContentCacheStale({ workspaceId, - id, - }: EventPayload<'snapshot.updated'>) { + docId, + }: Events['doc.snapshot.updated']) { const key = - workspaceId === id + workspaceId === docId ? `workspace:${workspaceId}:content` - : `workspace:${workspaceId}:doc:${id}:content`; + : `workspace:${workspaceId}:doc:${docId}:content`; await this.cache.delete(key); } } diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts index 7bf9f6828e..c2d1c4e277 100644 --- a/packages/backend/server/src/core/doc/adapters/workspace.ts +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -6,7 +6,7 @@ import { Cache, DocHistoryNotFound, DocNotFound, - EventEmitter, + EventBus, FailedToSaveUpdates, FailedToUpsertSnapshot, metrics, @@ -22,7 +22,18 @@ import { } from '../storage'; const UPDATES_QUEUE_CACHE_KEY = 'doc:manager:updates'; - +declare global { + interface Events { + 'doc.snapshot.deleted': { + workspaceId: string; + docId: string; + }; + 'doc.snapshot.updated': { + workspaceId: string; + docId: string; + }; + } +} @Injectable() export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { private readonly logger = new Logger(PgWorkspaceDocStorageAdapter.name); @@ -31,7 +42,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { private readonly db: PrismaClient, private readonly mutex: Mutex, private readonly cache: Cache, - private readonly event: EventEmitter, + private readonly event: EventBus, protected override readonly options: DocStorageOptions ) { super(options); @@ -470,9 +481,9 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { const updatedSnapshot = result.at(0); if (updatedSnapshot) { - this.event.emit('snapshot.updated', { + this.event.emit('doc.snapshot.updated', { workspaceId: snapshot.spaceId, - id: snapshot.docId, + docId: snapshot.docId, }); } diff --git a/packages/backend/server/src/core/doc/job.ts b/packages/backend/server/src/core/doc/job.ts index f4551f1484..9b3a20ec37 100644 --- a/packages/backend/server/src/core/doc/job.ts +++ b/packages/backend/server/src/core/doc/job.ts @@ -2,13 +2,7 @@ import { Injectable, Logger, OnModuleInit, Optional } from '@nestjs/common'; import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'; import { PrismaClient } from '@prisma/client'; -import { - CallMetric, - Config, - type EventPayload, - metrics, - OnEvent, -} from '../../base'; +import { CallMetric, Config, metrics, OnEvent } from '../../base'; import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; @Injectable() @@ -81,7 +75,7 @@ export class DocStorageCronJob implements OnModuleInit { } @OnEvent('user.deleted') - async clearUserWorkspaces(payload: EventPayload<'user.deleted'>) { + async clearUserWorkspaces(payload: Events['user.deleted']) { for (const workspace of payload.ownedWorkspaces) { await this.workspace.deleteSpace(workspace); } diff --git a/packages/backend/server/src/core/features/management.ts b/packages/backend/server/src/core/features/management.ts index c8cf4ceea4..898c139e58 100644 --- a/packages/backend/server/src/core/features/management.ts +++ b/packages/backend/server/src/core/features/management.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; -import { type EventPayload, OnEvent, Runtime } from '../../base'; +import { Runtime } from '../../base'; import { Models } from '../../models'; import { FeatureService } from './service'; import { FeatureType } from './types'; @@ -167,9 +167,4 @@ export class FeatureManagementService { async listFeatureWorkspaces(feature: FeatureType) { return this.feature.listWorkspacesByFeature(feature); } - - @OnEvent('user.admin.created') - async onAdminUserCreated({ id }: EventPayload<'user.admin.created'>) { - await this.addAdmin(id); - } } diff --git a/packages/backend/server/src/core/permission/service.ts b/packages/backend/server/src/core/permission/service.ts index 8e15ec6fe3..489453c877 100644 --- a/packages/backend/server/src/core/permission/service.ts +++ b/packages/backend/server/src/core/permission/service.ts @@ -5,7 +5,7 @@ import { groupBy } from 'lodash-es'; import { DocAccessDenied, - EventEmitter, + EventBus, SpaceAccessDenied, SpaceOwnerNotFound, } from '../../base'; @@ -15,7 +15,7 @@ import { Permission, PublicPageMode } from './types'; export class PermissionService { constructor( private readonly prisma: PrismaClient, - private readonly event: EventEmitter + private readonly event: EventBus ) {} private get acceptedCondition() { diff --git a/packages/backend/server/src/core/selfhost/controller.ts b/packages/backend/server/src/core/selfhost/controller.ts index e1d80abbd9..35ba8a3136 100644 --- a/packages/backend/server/src/core/selfhost/controller.ts +++ b/packages/backend/server/src/core/selfhost/controller.ts @@ -3,7 +3,6 @@ import type { Request, Response } from 'express'; import { ActionForbidden, - EventEmitter, InternalServerError, Mutex, PasswordRequired, @@ -24,7 +23,6 @@ export class CustomSetupController { constructor( private readonly models: Models, private readonly auth: AuthService, - private readonly event: EventEmitter, private readonly mutex: Mutex, private readonly server: ServerService, private readonly runtime: Runtime @@ -62,7 +60,6 @@ export class CustomSetupController { if (!lock) { throw new InternalServerError(); } - const user = await this.models.user.create({ email: input.email, password: input.password, @@ -70,7 +67,12 @@ export class CustomSetupController { }); try { - await this.event.emitAsync('user.admin.created', user); + await this.models.userFeature.add( + user.id, + 'administrator', + 'selfhost setup' + ); + await this.auth.setCookies(req, res, user.id); res.send({ id: user.id, email: user.email, name: user.name }); } catch (e) { diff --git a/packages/backend/server/src/core/storage/wrappers/avatar.ts b/packages/backend/server/src/core/storage/wrappers/avatar.ts index ee69794268..ca4e9f3c81 100644 --- a/packages/backend/server/src/core/storage/wrappers/avatar.ts +++ b/packages/backend/server/src/core/storage/wrappers/avatar.ts @@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common'; import type { BlobInputType, - EventPayload, PutObjectMetadata, StorageProvider, } from '../../../base'; @@ -47,7 +46,7 @@ export class AvatarStorage { } @OnEvent('user.deleted') - async onUserDeleted(user: EventPayload<'user.deleted'>) { + async onUserDeleted(user: Events['user.deleted']) { if (user.avatarUrl) { await this.delete(user.avatarUrl); } diff --git a/packages/backend/server/src/core/storage/wrappers/blob.ts b/packages/backend/server/src/core/storage/wrappers/blob.ts index 85e4d50ad1..caf8ca972b 100644 --- a/packages/backend/server/src/core/storage/wrappers/blob.ts +++ b/packages/backend/server/src/core/storage/wrappers/blob.ts @@ -4,8 +4,7 @@ import { PrismaClient } from '@prisma/client'; import { autoMetadata, Config, - EventEmitter, - type EventPayload, + EventBus, type GetObjectMetadata, ListObjectsMetadata, OnEvent, @@ -21,7 +20,7 @@ export class WorkspaceBlobStorage { constructor( private readonly config: Config, - private readonly event: EventEmitter, + private readonly event: EventBus, private readonly storageFactory: StorageProviderFactory, private readonly db: PrismaClient ) { @@ -105,7 +104,7 @@ export class WorkspaceBlobStorage { }); deletedBlobs.forEach(blob => { - this.event.emit('workspace.blob.deleted', { + this.event.emit('workspace.blob.delete', { workspaceId: workspaceId, key: blob.key, }); @@ -152,10 +151,7 @@ export class WorkspaceBlobStorage { } @OnEvent('workspace.blob.sync') - async syncBlobMeta({ - workspaceId, - key, - }: EventPayload<'workspace.blob.sync'>) { + async syncBlobMeta({ workspaceId, key }: Events['workspace.blob.sync']) { try { const meta = await this.provider.head(`${workspaceId}/${key}`); @@ -176,23 +172,23 @@ export class WorkspaceBlobStorage { } @OnEvent('workspace.deleted') - async onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) { - const blobs = await this.list(workspaceId); + async onWorkspaceDeleted({ id }: Events['workspace.deleted']) { + const blobs = await this.list(id); // to reduce cpu time holding blobs.forEach(blob => { - this.event.emit('workspace.blob.deleted', { - workspaceId: workspaceId, + this.event.emit('workspace.blob.delete', { + workspaceId: id, key: blob.key, }); }); } - @OnEvent('workspace.blob.deleted') + @OnEvent('workspace.blob.delete') async onDeleteWorkspaceBlob({ workspaceId, key, - }: EventPayload<'workspace.blob.deleted'>) { + }: Events['workspace.blob.delete']) { await this.delete(workspaceId, key, true); } } diff --git a/packages/backend/server/src/core/user/event.ts b/packages/backend/server/src/core/user/event.ts index 89ec485221..6dc124f25d 100644 --- a/packages/backend/server/src/core/user/event.ts +++ b/packages/backend/server/src/core/user/event.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; -import { Config, type EventPayload, OnEvent } from '../../base'; +import { Config, OnEvent } from '../../base'; @Injectable() export class UserEventsListener { @@ -9,7 +9,7 @@ export class UserEventsListener { constructor(private readonly config: Config) {} @OnEvent('user.updated') - async onUserUpdated(user: EventPayload<'user.updated'>) { + async onUserUpdated(user: Events['user.updated']) { const { enabled, customerIo } = this.config.metrics; if (enabled && customerIo?.token) { const payload = { @@ -33,7 +33,7 @@ export class UserEventsListener { } @OnEvent('user.deleted') - async onUserDeleted(user: EventPayload<'user.deleted'>) { + async onUserDeleted(user: Events['user.deleted']) { const { enabled, customerIo } = this.config.metrics; if (enabled && customerIo?.token) { try { diff --git a/packages/backend/server/src/core/user/types.ts b/packages/backend/server/src/core/user/types.ts index 58062ba2bb..fd246f58c4 100644 --- a/packages/backend/server/src/core/user/types.ts +++ b/packages/backend/server/src/core/user/types.ts @@ -7,7 +7,6 @@ import { } from '@nestjs/graphql'; import type { User } from '@prisma/client'; -import type { Payload } from '../../base/event/def'; import { type CurrentUser } from '../auth/session'; @ObjectType() @@ -91,11 +90,3 @@ export class ManageUserInput { @Field({ description: 'User name', nullable: true }) name?: string; } - -declare module '../../base/event/def' { - interface UserEvents { - admin: { - created: Payload<{ id: string }>; - }; - } -} diff --git a/packages/backend/server/src/core/workspaces/resolvers/service.ts b/packages/backend/server/src/core/workspaces/resolvers/service.ts index d0c2fdacb5..bffb8d16f2 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/service.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/service.ts @@ -4,7 +4,6 @@ import { getStreamAsBuffer } from 'get-stream'; import { Cache, - type EventPayload, MailService, OnEvent, URLHelper, @@ -256,7 +255,7 @@ export class WorkspaceService { async onMemberLeave({ user, workspaceId, - }: EventPayload<'workspace.members.leave'>) { + }: Events['workspace.members.leave']) { const workspace = await this.getWorkspaceInfo(workspaceId); const owner = await this.permission.getWorkspaceOwner(workspaceId); await this.mailer.sendMemberLeaveEmail(owner.email, { @@ -269,7 +268,7 @@ export class WorkspaceService { async onMemberRemoved({ userId, workspaceId, - }: EventPayload<'workspace.members.requestDeclined'>) { + }: Events['workspace.members.requestDeclined']) { const user = await this.models.user.get(userId); if (!user) return; diff --git a/packages/backend/server/src/core/workspaces/resolvers/team.ts b/packages/backend/server/src/core/workspaces/resolvers/team.ts index dcb5d5edf8..81848fb476 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/team.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/team.ts @@ -11,8 +11,7 @@ import { nanoid } from 'nanoid'; import { Cache, - EventEmitter, - type EventPayload, + EventBus, MemberNotFoundInSpace, OnEvent, RequestMutex, @@ -43,7 +42,7 @@ export class TeamWorkspaceResolver { constructor( private readonly cache: Cache, - private readonly event: EventEmitter, + private readonly event: EventBus, private readonly url: URLHelper, private readonly prisma: PrismaClient, private readonly permissions: PermissionService, @@ -344,7 +343,7 @@ export class TeamWorkspaceResolver { @OnEvent('workspace.members.reviewRequested') async onReviewRequested({ inviteId, - }: EventPayload<'workspace.members.reviewRequested'>) { + }: Events['workspace.members.reviewRequested']) { // send review request mail to owner and admin await this.workspaceService.sendReviewRequestedEmail(inviteId); } @@ -352,7 +351,7 @@ export class TeamWorkspaceResolver { @OnEvent('workspace.members.requestApproved') async onApproveRequest({ inviteId, - }: EventPayload<'workspace.members.requestApproved'>) { + }: Events['workspace.members.requestApproved']) { // send approve mail await this.workspaceService.sendReviewApproveEmail(inviteId); } @@ -361,7 +360,7 @@ export class TeamWorkspaceResolver { async onDeclineRequest({ userId, workspaceId, - }: EventPayload<'workspace.members.requestDeclined'>) { + }: Events['workspace.members.requestDeclined']) { const user = await this.models.user.getPublicUser(userId); // send decline mail await this.workspaceService.sendReviewDeclinedEmail( @@ -375,7 +374,7 @@ export class TeamWorkspaceResolver { userId, workspaceId, permission, - }: EventPayload<'workspace.members.roleChanged'>) { + }: Events['workspace.members.roleChanged']) { // send role changed mail await this.workspaceService.sendRoleChangedEmail(userId, { id: workspaceId, @@ -388,7 +387,7 @@ export class TeamWorkspaceResolver { workspaceId, from, to, - }: EventPayload<'workspace.members.ownershipTransferred'>) { + }: Events['workspace.members.ownershipTransferred']) { // send ownership transferred mail const fromUser = await this.models.user.getPublicUser(from); const toUser = await this.models.user.getPublicUser(to); diff --git a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts index a73bc619ab..fbac828fd5 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts @@ -18,7 +18,7 @@ import { AlreadyInSpace, Cache, DocNotFound, - EventEmitter, + EventBus, InternalServerError, MemberQuotaExceeded, QueryTooLong, @@ -83,7 +83,7 @@ export class WorkspaceResolver { private readonly permissions: PermissionService, private readonly quota: QuotaManagementService, private readonly models: Models, - private readonly event: EventEmitter, + private readonly event: EventBus, private readonly mutex: RequestMutex, private readonly workspaceService: WorkspaceService, private readonly workspaceStorage: PgWorkspaceDocStorageAdapter @@ -389,7 +389,7 @@ export class WorkspaceResolver { }); await this.workspaceStorage.deleteSpace(id); - this.event.emit('workspace.deleted', id); + this.event.emit('workspace.deleted', { id }); return true; } diff --git a/packages/backend/server/src/models/user.ts b/packages/backend/server/src/models/user.ts index 5b5bded615..021b77958c 100644 --- a/packages/backend/server/src/models/user.ts +++ b/packages/backend/server/src/models/user.ts @@ -5,11 +5,10 @@ import { pick } from 'lodash-es'; import { CryptoHelper, EmailAlreadyUsed, - EventEmitter, + EventBus, WrongSignInCredentials, WrongSignInMethod, } from '../base'; -import type { Payload } from '../base/event/def'; import { Quota_FreePlanV1_1 } from '../core/quota/schema'; import { BaseModel } from './base'; import type { Workspace } from './workspace'; @@ -40,21 +39,15 @@ const defaultUserCreatingData = { }, }; -declare module '../base/event/def' { - interface UserEvents { - created: Payload; - updated: Payload; - deleted: Payload< - User & { - // TODO(@forehalo): unlink foreign key constraint on [WorkspaceUserPermission] to delegate - // dealing of owned workspaces of deleted users to workspace model - ownedWorkspaces: Workspace['id'][]; - } - >; - } - - interface EventDefinitions { - user: UserEvents; +declare global { + interface Events { + 'user.created': User; + 'user.updated': User; + 'user.deleted': User & { + // TODO(@forehalo): unlink foreign key constraint on [WorkspaceUserPermission] to delegate + // dealing of owned workspaces of deleted users to workspace model + ownedWorkspaces: Workspace['id'][]; + }; } } @@ -65,7 +58,7 @@ export type { User }; export class UserModel extends BaseModel { constructor( private readonly crypto: CryptoHelper, - private readonly event: EventEmitter + private readonly event: EventBus ) { super(); } diff --git a/packages/backend/server/src/models/workspace.ts b/packages/backend/server/src/models/workspace.ts index 9ce268e18e..ccd7da3da4 100644 --- a/packages/backend/server/src/models/workspace.ts +++ b/packages/backend/server/src/models/workspace.ts @@ -7,10 +7,57 @@ import { } from '@prisma/client'; import { groupBy } from 'lodash-es'; -import { EventEmitter } from '../base'; +import { EventBus } from '../base'; import { BaseModel } from './base'; import { Permission } from './common'; +declare global { + interface Events { + 'workspace.members.reviewRequested': { inviteId: string }; + 'workspace.members.requestDeclined': { + userId: string; + workspaceId: string; + }; + 'workspace.members.requestApproved': { inviteId: string }; + 'workspace.members.roleChanged': { + userId: string; + workspaceId: string; + permission: number; + }; + 'workspace.members.ownershipTransferred': { + from: string; + to: string; + workspaceId: string; + }; + 'workspace.members.updated': { + workspaceId: string; + count: number; + }; + 'workspace.members.leave': { + user: { + id: string; + email: string; + }; + workspaceId: string; + }; + 'workspace.members.removed': { + workspaceId: string; + userId: string; + }; + 'workspace.deleted': { + id: string; + }; + 'workspace.blob.delete': { + workspaceId: string; + key: string; + }; + 'workspace.blob.sync': { + workspaceId: string; + key: string; + }; + } +} + export { WorkspaceMemberStatus }; export type { Workspace }; export type UpdateWorkspaceInput = Pick< @@ -28,7 +75,7 @@ export interface FindWorkspaceMembersOptions { @Injectable() export class WorkspaceModel extends BaseModel { - constructor(private readonly event: EventEmitter) { + constructor(private readonly event: EventBus) { super(); } diff --git a/packages/backend/server/src/plugins/license/service.ts b/packages/backend/server/src/plugins/license/service.ts index 5d1491224b..e31992af78 100644 --- a/packages/backend/server/src/plugins/license/service.ts +++ b/packages/backend/server/src/plugins/license/service.ts @@ -3,8 +3,8 @@ import { Cron, CronExpression } from '@nestjs/schedule'; import { InstalledLicense, PrismaClient } from '@prisma/client'; import { - EventEmitter, - type EventPayload, + Config, + EventBus, InternalServerError, LicenseNotFound, OnEvent, @@ -27,9 +27,10 @@ export class LicenseService { private readonly logger = new Logger(LicenseService.name); constructor( + private readonly config: Config, private readonly db: PrismaClient, private readonly quota: QuotaManagementService, - private readonly event: EventEmitter, + private readonly event: EventBus, private readonly permission: PermissionService ) {} @@ -151,7 +152,11 @@ export class LicenseService { } @OnEvent('workspace.members.updated') - async updateTeamSeats(payload: EventPayload<'workspace.members.updated'>) { + async updateTeamSeats(payload: Events['workspace.members.updated']) { + if (!this.config.isSelfhosted) { + return; + } + const { workspaceId, count } = payload; const license = await this.db.installedLicense.findUnique({ @@ -308,7 +313,7 @@ export class LicenseService { plan, recurring, quantity, - }: EventPayload<'workspace.subscription.activated'>) { + }: Events['workspace.subscription.activated']) { switch (plan) { case SubscriptionPlan.SelfHostedTeam: await this.quota.addTeamWorkspace( @@ -331,7 +336,7 @@ export class LicenseService { async onWorkspaceSubscriptionCanceled({ workspaceId, plan, - }: EventPayload<'workspace.subscription.canceled'>) { + }: Events['workspace.subscription.canceled']) { switch (plan) { case SubscriptionPlan.SelfHostedTeam: await this.quota.removeTeamWorkspace(workspaceId); diff --git a/packages/backend/server/src/plugins/payment/controller.ts b/packages/backend/server/src/plugins/payment/controller.ts index 1708cea947..2225aef444 100644 --- a/packages/backend/server/src/plugins/payment/controller.ts +++ b/packages/backend/server/src/plugins/payment/controller.ts @@ -2,11 +2,10 @@ import assert from 'node:assert'; import type { RawBodyRequest } from '@nestjs/common'; import { Controller, Logger, Post, Req } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; import type { Request } from 'express'; import Stripe from 'stripe'; -import { Config, InternalServerError } from '../../base'; +import { Config, EventBus, InternalServerError } from '../../base'; import { Public } from '../../core/auth'; @Controller('/api/stripe') @@ -17,7 +16,7 @@ export class StripeWebhookController { constructor( config: Config, private readonly stripe: Stripe, - private readonly event: EventEmitter2 + private readonly event: EventBus ) { assert(config.plugins.payment.stripe); this.webhookKey = config.plugins.payment.stripe.keys.webhookKey; @@ -41,7 +40,7 @@ export class StripeWebhookController { // Stripe requires responseing webhook immediately and handle event asynchronously. setImmediate(() => { - this.event.emitAsync(`stripe:${event.type}`, event).catch(e => { + this.event.emitAsync(`stripe.${event.type}` as any, event).catch(e => { this.logger.error('Failed to handle Stripe Webhook event.', e); }); }); diff --git a/packages/backend/server/src/plugins/payment/cron.ts b/packages/backend/server/src/plugins/payment/cron.ts index 22118c76de..20fbd10b4b 100644 --- a/packages/backend/server/src/plugins/payment/cron.ts +++ b/packages/backend/server/src/plugins/payment/cron.ts @@ -3,7 +3,7 @@ import { OnEvent } from '@nestjs/event-emitter'; import { Cron, CronExpression } from '@nestjs/schedule'; import { PrismaClient } from '@prisma/client'; -import { EventEmitter, type EventPayload } from '../../base'; +import { EventBus } from '../../base'; import { SubscriptionPlan, SubscriptionRecurring, @@ -14,7 +14,7 @@ import { export class SubscriptionCronJobs { constructor( private readonly db: PrismaClient, - private readonly event: EventEmitter + private readonly event: EventBus ) {} private getDateRange(after: number, base: number | Date = Date.now()) { @@ -77,14 +77,14 @@ export class SubscriptionCronJobs { // should not reach here continue; } - this.event.emit('workspace.subscription.notify', { - workspaceId: subscription.targetId, - expirationDate: end, - deletionDate: - subscription.status === 'canceled' - ? this.getDateRange(180, end).end - : undefined, - }); + + if (!subscription.nextBillAt) { + this.event.emit('workspace.subscription.notify', { + workspaceId: subscription.targetId, + expirationDate: end, + deletionDate: this.getDateRange(180, end).end, + }); + } } } @@ -112,7 +112,7 @@ export class SubscriptionCronJobs { async handleUserSubscriptionCanceled({ userId, plan, - }: EventPayload<'user.subscription.canceled'>) { + }: Events['user.subscription.canceled']) { await this.db.subscription.delete({ where: { targetId_plan: { diff --git a/packages/backend/server/src/plugins/payment/manager/user.ts b/packages/backend/server/src/plugins/payment/manager/user.ts index 75b699a91b..f2ec0c66f0 100644 --- a/packages/backend/server/src/plugins/payment/manager/user.ts +++ b/packages/backend/server/src/plugins/payment/manager/user.ts @@ -5,7 +5,7 @@ import Stripe from 'stripe'; import { z } from 'zod'; import { - EventEmitter, + EventBus, InternalServerError, InvalidCheckoutParameters, Runtime, @@ -58,7 +58,7 @@ export class UserSubscriptionManager extends SubscriptionManager { db: PrismaClient, private readonly runtime: Runtime, private readonly feature: FeatureManagementService, - private readonly event: EventEmitter, + private readonly event: EventBus, private readonly url: URLHelper ) { super(stripe, db); diff --git a/packages/backend/server/src/plugins/payment/manager/workspace.ts b/packages/backend/server/src/plugins/payment/manager/workspace.ts index d40183136b..ed031211ef 100644 --- a/packages/backend/server/src/plugins/payment/manager/workspace.ts +++ b/packages/backend/server/src/plugins/payment/manager/workspace.ts @@ -5,8 +5,7 @@ import Stripe from 'stripe'; import { z } from 'zod'; import { - EventEmitter, - type EventPayload, + EventBus, OnEvent, SubscriptionAlreadyExists, SubscriptionPlanNotFound, @@ -49,7 +48,7 @@ export class WorkspaceSubscriptionManager extends SubscriptionManager { stripe: Stripe, db: PrismaClient, private readonly url: URLHelper, - private readonly event: EventEmitter + private readonly event: EventBus ) { super(stripe, db); } @@ -269,7 +268,7 @@ export class WorkspaceSubscriptionManager extends SubscriptionManager { async onMembersUpdated({ workspaceId, count, - }: EventPayload<'workspace.members.updated'>) { + }: Events['workspace.members.updated']) { const subscription = await this.getSubscription({ plan: SubscriptionPlan.Team, workspaceId, diff --git a/packages/backend/server/src/plugins/payment/quota.ts b/packages/backend/server/src/plugins/payment/quota.ts index a7ba9b745a..a7296d82c4 100644 --- a/packages/backend/server/src/plugins/payment/quota.ts +++ b/packages/backend/server/src/plugins/payment/quota.ts @@ -1,7 +1,6 @@ import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; -import type { EventPayload } from '../../base'; import { FeatureManagementService } from '../../core/features'; import { PermissionService } from '../../core/permission'; import { @@ -29,7 +28,7 @@ export class QuotaOverride { plan, recurring, quantity, - }: EventPayload<'workspace.subscription.activated'>) { + }: Events['workspace.subscription.activated']) { switch (plan) { case 'team': { const hasTeamWorkspace = await this.quota.hasWorkspaceQuota( @@ -62,7 +61,7 @@ export class QuotaOverride { async onWorkspaceSubscriptionCanceled({ workspaceId, plan, - }: EventPayload<'workspace.subscription.canceled'>) { + }: Events['workspace.subscription.canceled']) { switch (plan) { case SubscriptionPlan.Team: await this.manager.removeTeamWorkspace(workspaceId); @@ -77,7 +76,7 @@ export class QuotaOverride { userId, plan, recurring, - }: EventPayload<'user.subscription.activated'>) { + }: Events['user.subscription.activated']) { switch (plan) { case SubscriptionPlan.AI: await this.feature.addCopilot(userId, 'subscription activated'); @@ -100,7 +99,7 @@ export class QuotaOverride { async onUserSubscriptionCanceled({ userId, plan, - }: EventPayload<'user.subscription.canceled'>) { + }: Events['user.subscription.canceled']) { switch (plan) { case SubscriptionPlan.AI: await this.feature.removeCopilot(userId); diff --git a/packages/backend/server/src/plugins/payment/types.ts b/packages/backend/server/src/plugins/payment/types.ts index faf0dcc5ca..b599a2047a 100644 --- a/packages/backend/server/src/plugins/payment/types.ts +++ b/packages/backend/server/src/plugins/payment/types.ts @@ -1,8 +1,6 @@ import type { User, Workspace } from '@prisma/client'; import Stripe from 'stripe'; -import type { Payload } from '../../base/event/def'; - export enum SubscriptionRecurring { Monthly = 'monthly', Yearly = 'yearly', @@ -50,41 +48,44 @@ export enum CouponType { ProEarlyAccessAIOneYearFree = 'ai_pro_ea_one_year_free', } -declare module '../../base/event/def' { - interface UserEvents { - subscription: { - activated: Payload<{ - userId: User['id']; - plan: SubscriptionPlan; - recurring: SubscriptionRecurring; - }>; - canceled: Payload<{ - userId: User['id']; - plan: SubscriptionPlan; - recurring: SubscriptionRecurring; - }>; +declare global { + interface Events { + 'user.subscription.activated': { + userId: User['id']; + plan: SubscriptionPlan; + recurring: SubscriptionRecurring; + }; + 'user.subscription.canceled': { + userId: User['id']; + plan: SubscriptionPlan; + recurring: SubscriptionRecurring; }; - } - interface WorkspaceEvents { - subscription: { - activated: Payload<{ - workspaceId: Workspace['id']; - plan: SubscriptionPlan; - recurring: SubscriptionRecurring; - quantity: number; - }>; - canceled: Payload<{ - workspaceId: Workspace['id']; - plan: SubscriptionPlan; - recurring: SubscriptionRecurring; - }>; - notify: Payload<{ - workspaceId: Workspace['id']; - expirationDate: Date; - deletionDate: Date | undefined; - }>; + 'workspace.subscription.activated': { + workspaceId: Workspace['id']; + plan: SubscriptionPlan; + recurring: SubscriptionRecurring; + quantity: number; }; + 'workspace.subscription.canceled': { + workspaceId: Workspace['id']; + plan: SubscriptionPlan; + recurring: SubscriptionRecurring; + }; + 'workspace.subscription.notify': { + workspaceId: Workspace['id']; + expirationDate: Date; + deletionDate: Date; + }; + + 'stripe.invoice.created': Stripe.InvoiceCreatedEvent; + 'stripe.invoice.updated': Stripe.InvoiceUpdatedEvent; + 'stripe.invoice.finalization_failed': Stripe.InvoiceFinalizationFailedEvent; + 'stripe.invoice.payment_failed': Stripe.InvoicePaymentFailedEvent; + 'stripe.invoice.paid': Stripe.InvoicePaidEvent; + 'stripe.customer.subscription.created': Stripe.CustomerSubscriptionCreatedEvent; + 'stripe.customer.subscription.updated': Stripe.CustomerSubscriptionUpdatedEvent; + 'stripe.customer.subscription.deleted': Stripe.CustomerSubscriptionDeletedEvent; } } diff --git a/packages/backend/server/src/plugins/payment/webhook.ts b/packages/backend/server/src/plugins/payment/webhook.ts index 6c18fe7dfb..c295ea8646 100644 --- a/packages/backend/server/src/plugins/payment/webhook.ts +++ b/packages/backend/server/src/plugins/payment/webhook.ts @@ -1,14 +1,9 @@ import { Injectable } from '@nestjs/common'; -import { OnEvent } from '@nestjs/event-emitter'; import Stripe from 'stripe'; +import { OnEvent } from '../../base'; import { SubscriptionService } from './service'; -const OnStripeEvent = ( - event: Stripe.Event.Type, - opts?: Parameters[1] -) => OnEvent(`stripe:${event}`, opts); - /** * Stripe webhook events sent in random order, and may be even sent more than once. * @@ -22,11 +17,11 @@ export class StripeWebhook { private readonly stripe: Stripe ) {} - @OnStripeEvent('invoice.created') - @OnStripeEvent('invoice.updated') - @OnStripeEvent('invoice.finalization_failed') - @OnStripeEvent('invoice.payment_failed') - @OnStripeEvent('invoice.paid') + @OnEvent('stripe.invoice.created') + @OnEvent('stripe.invoice.updated') + @OnEvent('stripe.invoice.finalization_failed') + @OnEvent('stripe.invoice.payment_failed') + @OnEvent('stripe.invoice.paid') async onInvoiceUpdated( event: | Stripe.InvoiceCreatedEvent @@ -39,8 +34,8 @@ export class StripeWebhook { await this.service.saveStripeInvoice(invoice); } - @OnStripeEvent('customer.subscription.created') - @OnStripeEvent('customer.subscription.updated') + @OnEvent('stripe.customer.subscription.created') + @OnEvent('stripe.customer.subscription.updated') async onSubscriptionChanges( event: | Stripe.CustomerSubscriptionUpdatedEvent @@ -56,7 +51,7 @@ export class StripeWebhook { await this.service.saveStripeSubscription(subscription); } - @OnStripeEvent('customer.subscription.deleted') + @OnEvent('stripe.customer.subscription.deleted') async onSubscriptionDeleted(event: Stripe.CustomerSubscriptionDeletedEvent) { await this.service.deleteStripeSubscription(event.data.object); }