feat(server): event on snapshot upserted (#5002)

This commit is contained in:
liuyi
2023-11-22 07:23:44 +00:00
parent 525b196cae
commit 946b7b4004
3 changed files with 19 additions and 10 deletions

View File

@@ -5,6 +5,7 @@ import {
OnModuleDestroy, OnModuleDestroy,
OnModuleInit, OnModuleInit,
} from '@nestjs/common'; } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Snapshot, Update } from '@prisma/client'; import { Snapshot, Update } from '@prisma/client';
import { chunk } from 'lodash-es'; import { chunk } from 'lodash-es';
import { defer, retry } from 'rxjs'; import { defer, retry } from 'rxjs';
@@ -70,7 +71,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
private readonly db: PrismaService, private readonly db: PrismaService,
private readonly config: Config, private readonly config: Config,
private readonly metrics: Metrics, private readonly metrics: Metrics,
private readonly cache: Cache private readonly cache: Cache,
private readonly event: EventEmitter2
) {} ) {}
onModuleInit() { onModuleInit() {
@@ -411,7 +413,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
workspaceId: string, workspaceId: string,
guid: string, guid: string,
doc: Doc, doc: Doc,
seq?: number initialSeq?: number
) { ) {
const blob = Buffer.from(encodeStateAsUpdate(doc)); const blob = Buffer.from(encodeStateAsUpdate(doc));
const state = Buffer.from(encodeStateVector(doc)); const state = Buffer.from(encodeStateVector(doc));
@@ -435,7 +437,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
workspaceId, workspaceId,
blob, blob,
state, state,
seq, seq: initialSeq,
}, },
update: { update: {
blob, blob,
@@ -479,6 +481,10 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
...updates.map(u => u.blob) ...updates.map(u => u.blob)
); );
if (snapshot) {
this.event.emit('doc:manager:snapshot:beforeUpdate', snapshot);
}
await this.upsert(workspaceId, id, doc, last.seq); await this.upsert(workspaceId, id, doc, last.seq);
this.logger.debug( this.logger.debug(
`Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}` `Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}`
@@ -519,6 +525,9 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
// reset // reset
if (seq >= MAX_SEQ_NUM) { if (seq >= MAX_SEQ_NUM) {
await this.db.snapshot.update({ await this.db.snapshot.update({
select: {
seq: true,
},
where: { where: {
id_workspaceId: { id_workspaceId: {
workspaceId, workspaceId,

View File

@@ -11,7 +11,11 @@ import { WorkspaceModule } from './workspaces';
const { SERVER_FLAVOR } = process.env; const { SERVER_FLAVOR } = process.env;
const BusinessModules: (Type | DynamicModule)[] = []; const BusinessModules: (Type | DynamicModule)[] = [
EventEmitterModule.forRoot({
global: true,
}),
];
switch (SERVER_FLAVOR) { switch (SERVER_FLAVOR) {
case 'sync': case 'sync':
@@ -19,9 +23,6 @@ switch (SERVER_FLAVOR) {
break; break;
case 'graphql': case 'graphql':
BusinessModules.push( BusinessModules.push(
EventEmitterModule.forRoot({
global: true,
}),
GqlModule, GqlModule,
WorkspaceModule, WorkspaceModule,
UsersModule, UsersModule,
@@ -33,9 +34,6 @@ switch (SERVER_FLAVOR) {
case 'allinone': case 'allinone':
default: default:
BusinessModules.push( BusinessModules.push(
EventEmitterModule.forRoot({
global: true,
}),
GqlModule, GqlModule,
WorkspaceModule, WorkspaceModule,
UsersModule, UsersModule,

View File

@@ -1,6 +1,7 @@
import { mock } from 'node:test'; import { mock } from 'node:test';
import type { INestApplication } from '@nestjs/common'; import type { INestApplication } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { Test, TestingModule } from '@nestjs/testing'; import { Test, TestingModule } from '@nestjs/testing';
import test from 'ava'; import test from 'ava';
import { register } from 'prom-client'; import { register } from 'prom-client';
@@ -20,6 +21,7 @@ const createModule = () => {
PrismaModule, PrismaModule,
MetricsModule, MetricsModule,
CacheModule, CacheModule,
EventEmitterModule.forRoot(),
ConfigModule.forRoot(), ConfigModule.forRoot(),
DocModule.forRoot(), DocModule.forRoot(),
], ],