From 17d584b3363b6869d58c9aa3c2252bd248574ad4 Mon Sep 17 00:00:00 2001 From: liuyi Date: Fri, 8 Dec 2023 05:00:58 +0000 Subject: [PATCH] refactor(server): use events system (#5149) --- packages/backend/server/src/event/events.ts | 19 ++---- packages/backend/server/src/event/index.ts | 3 +- packages/backend/server/src/event/types.ts | 7 +- .../backend/server/src/modules/doc/history.ts | 65 +++++++++++++------ .../backend/server/src/modules/doc/manager.ts | 44 +++++++++++-- .../server/src/modules/workspaces/resolver.ts | 15 +---- packages/backend/server/tests/doc.spec.ts | 4 +- packages/backend/server/tests/history.spec.ts | 61 ++++++----------- 8 files changed, 122 insertions(+), 96 deletions(-) diff --git a/packages/backend/server/src/event/events.ts b/packages/backend/server/src/event/events.ts index e98bb72440..025d664663 100644 --- a/packages/backend/server/src/event/events.ts +++ b/packages/backend/server/src/event/events.ts @@ -1,23 +1,18 @@ -import type { Snapshot, User, Workspace } from '@prisma/client'; +import type { Snapshot, Workspace } from '@prisma/client'; -import { ChangePayload, Flatten, Payload } from './types'; +import { Flatten, Payload } from './types'; interface EventDefinitions { - user: { - created: Payload; - updated: Payload>; - deleted: Payload; - }; - workspace: { - created: Payload; - updated: Payload>; deleted: Payload; }; snapshot: { - created: Payload; - updated: Payload>; + updated: Payload< + Pick & { + previous: Pick; + } + >; deleted: Payload>; }; } diff --git a/packages/backend/server/src/event/index.ts b/packages/backend/server/src/event/index.ts index 06b398ad04..0c70582e59 100644 --- a/packages/backend/server/src/event/index.ts +++ b/packages/backend/server/src/event/index.ts @@ -5,7 +5,7 @@ import { OnEvent as RawOnEvent, } from '@nestjs/event-emitter'; -import { Event, EventPayload } from './events'; +import type { Event, EventPayload } from './events'; @Injectable() export class EventEmitter { @@ -42,3 +42,4 @@ export const OnEvent = ( exports: [EventEmitter], }) export class EventModule {} +export { EventPayload }; diff --git a/packages/backend/server/src/event/types.ts b/packages/backend/server/src/event/types.ts index 2a4bb5771c..65d48b293d 100644 --- a/packages/backend/server/src/event/types.ts +++ b/packages/backend/server/src/event/types.ts @@ -3,11 +3,6 @@ export type Payload = { data: T; }; -export type ChangePayload = { - from: Partial; - to: Partial; -}; - export type Join = A extends '' ? B : `${A}.${B}`; @@ -33,6 +28,6 @@ export type Leaves = T extends Payload export type Flatten = Leaves extends infer R ? { // @ts-expect-error yo, ts can't make it - [K in R]: PathType extends Payload ? { data: U } : never; + [K in R]: PathType extends Payload ? U : never; } : never; diff --git a/packages/backend/server/src/modules/doc/history.ts b/packages/backend/server/src/modules/doc/history.ts index 7502355950..fa52ecda97 100644 --- a/packages/backend/server/src/modules/doc/history.ts +++ b/packages/backend/server/src/modules/doc/history.ts @@ -1,15 +1,15 @@ import { isDeepStrictEqual } from 'node:util'; import { Injectable, Logger } from '@nestjs/common'; -import { OnEvent } from '@nestjs/event-emitter'; import { Cron, CronExpression } from '@nestjs/schedule'; -import type { Snapshot } from '@prisma/client'; import { Config } from '../../config'; +import { type EventPayload, OnEvent } from '../../event'; import { metrics } from '../../metrics'; import { PrismaService } from '../../prisma'; import { SubscriptionStatus } from '../payment/service'; import { Permission } from '../workspaces/types'; +import { isEmptyBuffer } from './manager'; @Injectable() export class DocHistoryManager { @@ -19,16 +19,38 @@ export class DocHistoryManager { private readonly db: PrismaService ) {} - @OnEvent('doc:manager:snapshot:beforeUpdate') - async onDocUpdated(snapshot: Snapshot, forceCreate = false) { - const last = await this.last(snapshot.workspaceId, snapshot.id); + @OnEvent('workspace.deleted') + onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) { + return this.db.snapshotHistory.deleteMany({ + where: { + workspaceId, + }, + }); + } + + @OnEvent('snapshot.deleted') + onSnapshotDeleted({ workspaceId, id }: EventPayload<'snapshot.deleted'>) { + return this.db.snapshotHistory.deleteMany({ + where: { + workspaceId, + id, + }, + }); + } + + @OnEvent('snapshot.updated') + async onDocUpdated( + { workspaceId, id, previous }: EventPayload<'snapshot.updated'>, + forceCreate = false + ) { + const last = await this.last(workspaceId, id); let shouldCreateHistory = false; if (!last) { // never created shouldCreateHistory = true; - } else if (last.timestamp === snapshot.updatedAt) { + } else if (last.timestamp === previous.updatedAt) { // no change shouldCreateHistory = false; } else if ( @@ -36,16 +58,23 @@ export class DocHistoryManager { forceCreate || // last history created before interval in configs last.timestamp.getTime() < - snapshot.updatedAt.getTime() - this.config.doc.history.interval + previous.updatedAt.getTime() - this.config.doc.history.interval ) { shouldCreateHistory = true; } if (shouldCreateHistory) { // skip the history recording when no actual update on snapshot happended - if (last && isDeepStrictEqual(last.state, snapshot.state)) { + if (last && isDeepStrictEqual(last.state, previous.state)) { this.logger.debug( - `State matches, skip creating history record for ${snapshot.id} in workspace ${snapshot.workspaceId}` + `State matches, skip creating history record for ${id} in workspace ${workspaceId}` + ); + return; + } + + if (isEmptyBuffer(previous.blob)) { + this.logger.debug( + `Doc is empty, skip creating history record for ${id} in workspace ${workspaceId}` ); return; } @@ -56,12 +85,12 @@ export class DocHistoryManager { timestamp: true, }, data: { - workspaceId: snapshot.workspaceId, - id: snapshot.id, - timestamp: snapshot.updatedAt, - blob: snapshot.blob, - state: snapshot.state, - expiredAt: await this.getExpiredDateFromNow(snapshot.workspaceId), + workspaceId, + id, + timestamp: previous.updatedAt, + blob: previous.blob, + state: previous.state, + expiredAt: await this.getExpiredDateFromNow(workspaceId), }, }) .catch(() => { @@ -73,9 +102,7 @@ export class DocHistoryManager { description: 'How many times the snapshot history created', }) .add(1); - this.logger.log( - `History created for ${snapshot.id} in workspace ${snapshot.workspaceId}.` - ); + this.logger.log(`History created for ${id} in workspace ${workspaceId}.`); } } @@ -180,7 +207,7 @@ export class DocHistoryManager { } // save old snapshot as one history record - await this.onDocUpdated(oldSnapshot, true); + await this.onDocUpdated({ workspaceId, id, previous: oldSnapshot }, true); // WARN: // we should never do the snapshot updating in recovering, // which is not the solution in CRDT. diff --git a/packages/backend/server/src/modules/doc/manager.ts b/packages/backend/server/src/modules/doc/manager.ts index a1fd0faf4b..57151bb760 100644 --- a/packages/backend/server/src/modules/doc/manager.ts +++ b/packages/backend/server/src/modules/doc/manager.ts @@ -5,7 +5,6 @@ import { OnModuleDestroy, OnModuleInit, } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; import { Snapshot, Update } from '@prisma/client'; import { chunk } from 'lodash-es'; import { defer, retry } from 'rxjs'; @@ -20,6 +19,7 @@ import { import { Cache } from '../../cache'; import { Config } from '../../config'; +import { EventEmitter, type EventPayload, OnEvent } from '../../event'; import { metrics } from '../../metrics/metrics'; import { PrismaService } from '../../prisma'; import { mergeUpdatesInApplyWay as jwstMergeUpdates } from '../../storage'; @@ -71,7 +71,7 @@ function isStateNewer(lhs: Buffer, rhs: Buffer): boolean { return false; } -function isEmptyBuffer(buf: Buffer): boolean { +export function isEmptyBuffer(buf: Buffer): boolean { return ( buf.length === 0 || // 0x0000 @@ -102,7 +102,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { private readonly db: PrismaService, private readonly config: Config, private readonly cache: Cache, - private readonly event: EventEmitter2 + private readonly event: EventEmitter ) {} onModuleInit() { @@ -224,6 +224,33 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { } } + @OnEvent('workspace.deleted') + async onWorkspaceDeleted(workspaceId: string) { + await this.db.snapshot.deleteMany({ + where: { + workspaceId, + }, + }); + await this.db.update.deleteMany({ + where: { + workspaceId, + }, + }); + } + + @OnEvent('snapshot.deleted') + async onSnapshotDeleted({ + id, + workspaceId, + }: EventPayload<'snapshot.deleted'>) { + await this.db.update.deleteMany({ + where: { + id, + workspaceId, + }, + }); + } + /** * add update to manager for later processing. */ @@ -538,8 +565,17 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { ...updates.map(u => u.blob) ); + await this.upsert(workspaceId, id, doc, last.seq); if (snapshot) { - this.event.emit('doc:manager:snapshot:beforeUpdate', snapshot); + this.event.emit('snapshot.updated', { + id, + workspaceId, + previous: { + blob: snapshot.blob, + state: snapshot.state, + updatedAt: snapshot.updatedAt, + }, + }); } const done = await this.upsert(workspaceId, id, doc, last.seq); diff --git a/packages/backend/server/src/modules/workspaces/resolver.ts b/packages/backend/server/src/modules/workspaces/resolver.ts index e1aae88a19..bbb772d76f 100644 --- a/packages/backend/server/src/modules/workspaces/resolver.ts +++ b/packages/backend/server/src/modules/workspaces/resolver.ts @@ -33,6 +33,7 @@ import type { import GraphQLUpload from 'graphql-upload/GraphQLUpload.mjs'; import { applyUpdate, Doc } from 'yjs'; +import { EventEmitter } from '../../event'; import { PrismaService } from '../../prisma'; import { StorageProvide } from '../../storage'; import { CloudThrottlerGuard, Throttle } from '../../throttler'; @@ -146,6 +147,7 @@ export class WorkspaceResolver { private readonly prisma: PrismaService, private readonly permissions: PermissionService, private readonly users: UsersService, + private readonly event: EventEmitter, @Inject(StorageProvide) private readonly storage: Storage ) {} @@ -388,18 +390,7 @@ export class WorkspaceResolver { }, }); - await this.prisma.$transaction([ - this.prisma.update.deleteMany({ - where: { - workspaceId: id, - }, - }), - this.prisma.snapshot.deleteMany({ - where: { - workspaceId: id, - }, - }), - ]); + this.event.emit('workspace.deleted', id); return true; } diff --git a/packages/backend/server/tests/doc.spec.ts b/packages/backend/server/tests/doc.spec.ts index 2fbec0be8a..76ff2a1ca1 100644 --- a/packages/backend/server/tests/doc.spec.ts +++ b/packages/backend/server/tests/doc.spec.ts @@ -1,7 +1,6 @@ import { mock } from 'node:test'; import type { INestApplication } from '@nestjs/common'; -import { EventEmitterModule } from '@nestjs/event-emitter'; import { Test, TestingModule } from '@nestjs/testing'; import test from 'ava'; import { register } from 'prom-client'; @@ -15,6 +14,7 @@ import { import { CacheModule } from '../src/cache'; import { Config, ConfigModule } from '../src/config'; +import { EventModule } from '../src/event'; import { DocManager, DocModule } from '../src/modules/doc'; import { PrismaModule, PrismaService } from '../src/prisma'; import { flushDB } from './utils'; @@ -24,7 +24,7 @@ const createModule = () => { imports: [ PrismaModule, CacheModule, - EventEmitterModule.forRoot(), + EventModule, ConfigModule.forRoot(), DocModule.forRoot(), ], diff --git a/packages/backend/server/tests/history.spec.ts b/packages/backend/server/tests/history.spec.ts index c2ed148574..0c77b1facd 100644 --- a/packages/backend/server/tests/history.spec.ts +++ b/packages/backend/server/tests/history.spec.ts @@ -6,6 +6,7 @@ import test from 'ava'; import * as Sinon from 'sinon'; import { ConfigModule } from '../src/config'; +import type { EventPayload } from '../src/event'; import { DocHistoryManager } from '../src/modules/doc'; import { PrismaModule, PrismaService } from '../src/prisma'; import { flushDB } from './utils'; @@ -41,21 +42,28 @@ test.afterEach(async () => { const snapshot: Snapshot = { workspaceId: '1', id: 'doc1', - blob: Buffer.from([0, 0]), - state: Buffer.from([0, 0]), + blob: Buffer.from([1, 0]), + state: Buffer.from([0]), seq: 0, updatedAt: new Date(), createdAt: new Date(), }; +function getEventData( + timestamp: Date = new Date() +): EventPayload<'snapshot.updated'> { + return { + workspaceId: snapshot.workspaceId, + id: snapshot.id, + previous: { ...snapshot, updatedAt: timestamp }, + }; +} + test('should create doc history if never created before', async t => { Sinon.stub(manager, 'last').resolves(null); const timestamp = new Date(); - await manager.onDocUpdated({ - ...snapshot, - updatedAt: timestamp, - }); + await manager.onDocUpdated(getEventData(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -72,10 +80,7 @@ test('should not create history if timestamp equals to last record', async t => const timestamp = new Date(); Sinon.stub(manager, 'last').resolves({ timestamp, state: null }); - await manager.onDocUpdated({ - ...snapshot, - updatedAt: timestamp, - }); + await manager.onDocUpdated(getEventData(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -94,10 +99,7 @@ test('should not create history if state equals to last record', async t => { state: snapshot.state, }); - await manager.onDocUpdated({ - ...snapshot, - updatedAt: timestamp, - }); + await manager.onDocUpdated(getEventData(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -116,10 +118,7 @@ test('should not create history if time diff is less than interval config', asyn state: Buffer.from([0, 1]), }); - await manager.onDocUpdated({ - ...snapshot, - updatedAt: timestamp, - }); + await manager.onDocUpdated(getEventData(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -138,10 +137,7 @@ test('should create history if time diff is larger than interval config and stat state: Buffer.from([0, 1]), }); - await manager.onDocUpdated({ - ...snapshot, - updatedAt: timestamp, - }); + await manager.onDocUpdated(getEventData(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -160,13 +156,7 @@ test('should create history with force flag even if time diff in small', async t state: Buffer.from([0, 1]), }); - await manager.onDocUpdated( - { - ...snapshot, - updatedAt: timestamp, - }, - true - ); + await manager.onDocUpdated(getEventData(timestamp), true); const history = await db.snapshotHistory.findFirst({ where: { @@ -224,13 +214,7 @@ test('should correctly list all history records', async t => { test('should be able to get history data', async t => { const timestamp = new Date(); - await manager.onDocUpdated( - { - ...snapshot, - updatedAt: timestamp, - }, - true - ); + await manager.onDocUpdated(getEventData(timestamp), true); const history = await manager.get( snapshot.workspaceId, @@ -274,10 +258,7 @@ test('should be able to recover from history', async t => { }, }); const history1Timestamp = snapshot.updatedAt.getTime() - 10; - await manager.onDocUpdated({ - ...snapshot, - updatedAt: new Date(history1Timestamp), - }); + await manager.onDocUpdated(getEventData(new Date(history1Timestamp))); await manager.recover( snapshot.workspaceId,