refactor(server): use events system (#5149)

This commit is contained in:
liuyi
2023-12-08 05:00:58 +00:00
parent 52cfe4521a
commit 17d584b336
8 changed files with 122 additions and 96 deletions

View File

@@ -1,23 +1,18 @@
import type { Snapshot, User, Workspace } from '@prisma/client';
import type { Snapshot, Workspace } from '@prisma/client';
import { ChangePayload, Flatten, Payload } from './types';
import { Flatten, Payload } from './types';
interface EventDefinitions {
user: {
created: Payload<User>;
updated: Payload<ChangePayload<User>>;
deleted: Payload<User['id']>;
};
workspace: {
created: Payload<Workspace>;
updated: Payload<ChangePayload<Workspace>>;
deleted: Payload<Workspace['id']>;
};
snapshot: {
created: Payload<Snapshot>;
updated: Payload<ChangePayload<Snapshot>>;
updated: Payload<
Pick<Snapshot, 'id' | 'workspaceId'> & {
previous: Pick<Snapshot, 'blob' | 'state' | 'updatedAt'>;
}
>;
deleted: Payload<Pick<Snapshot, 'id' | 'workspaceId'>>;
};
}

View File

@@ -5,7 +5,7 @@ import {
OnEvent as RawOnEvent,
} from '@nestjs/event-emitter';
import { Event, EventPayload } from './events';
import type { Event, EventPayload } from './events';
@Injectable()
export class EventEmitter {
@@ -42,3 +42,4 @@ export const OnEvent = (
exports: [EventEmitter],
})
export class EventModule {}
export { EventPayload };

View File

@@ -3,11 +3,6 @@ export type Payload<T> = {
data: T;
};
export type ChangePayload<T> = {
from: Partial<T>;
to: Partial<T>;
};
export type Join<A extends string, B extends string> = A extends ''
? B
: `${A}.${B}`;
@@ -33,6 +28,6 @@ export type Leaves<T, P extends string = ''> = T extends Payload<any>
export type Flatten<T> = Leaves<T> extends infer R
? {
// @ts-expect-error yo, ts can't make it
[K in R]: PathType<T, K> extends Payload<infer U> ? { data: U } : never;
[K in R]: PathType<T, K> extends Payload<infer U> ? U : never;
}
: never;

View File

@@ -1,15 +1,15 @@
import { isDeepStrictEqual } from 'node:util';
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { Cron, CronExpression } from '@nestjs/schedule';
import type { Snapshot } from '@prisma/client';
import { Config } from '../../config';
import { type EventPayload, OnEvent } from '../../event';
import { metrics } from '../../metrics';
import { PrismaService } from '../../prisma';
import { SubscriptionStatus } from '../payment/service';
import { Permission } from '../workspaces/types';
import { isEmptyBuffer } from './manager';
@Injectable()
export class DocHistoryManager {
@@ -19,16 +19,38 @@ export class DocHistoryManager {
private readonly db: PrismaService
) {}
@OnEvent('doc:manager:snapshot:beforeUpdate')
async onDocUpdated(snapshot: Snapshot, forceCreate = false) {
const last = await this.last(snapshot.workspaceId, snapshot.id);
@OnEvent('workspace.deleted')
onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) {
return this.db.snapshotHistory.deleteMany({
where: {
workspaceId,
},
});
}
@OnEvent('snapshot.deleted')
onSnapshotDeleted({ workspaceId, id }: EventPayload<'snapshot.deleted'>) {
return this.db.snapshotHistory.deleteMany({
where: {
workspaceId,
id,
},
});
}
@OnEvent('snapshot.updated')
async onDocUpdated(
{ workspaceId, id, previous }: EventPayload<'snapshot.updated'>,
forceCreate = false
) {
const last = await this.last(workspaceId, id);
let shouldCreateHistory = false;
if (!last) {
// never created
shouldCreateHistory = true;
} else if (last.timestamp === snapshot.updatedAt) {
} else if (last.timestamp === previous.updatedAt) {
// no change
shouldCreateHistory = false;
} else if (
@@ -36,16 +58,23 @@ export class DocHistoryManager {
forceCreate ||
// last history created before interval in configs
last.timestamp.getTime() <
snapshot.updatedAt.getTime() - this.config.doc.history.interval
previous.updatedAt.getTime() - this.config.doc.history.interval
) {
shouldCreateHistory = true;
}
if (shouldCreateHistory) {
// skip the history recording when no actual update on snapshot happended
if (last && isDeepStrictEqual(last.state, snapshot.state)) {
if (last && isDeepStrictEqual(last.state, previous.state)) {
this.logger.debug(
`State matches, skip creating history record for ${snapshot.id} in workspace ${snapshot.workspaceId}`
`State matches, skip creating history record for ${id} in workspace ${workspaceId}`
);
return;
}
if (isEmptyBuffer(previous.blob)) {
this.logger.debug(
`Doc is empty, skip creating history record for ${id} in workspace ${workspaceId}`
);
return;
}
@@ -56,12 +85,12 @@ export class DocHistoryManager {
timestamp: true,
},
data: {
workspaceId: snapshot.workspaceId,
id: snapshot.id,
timestamp: snapshot.updatedAt,
blob: snapshot.blob,
state: snapshot.state,
expiredAt: await this.getExpiredDateFromNow(snapshot.workspaceId),
workspaceId,
id,
timestamp: previous.updatedAt,
blob: previous.blob,
state: previous.state,
expiredAt: await this.getExpiredDateFromNow(workspaceId),
},
})
.catch(() => {
@@ -73,9 +102,7 @@ export class DocHistoryManager {
description: 'How many times the snapshot history created',
})
.add(1);
this.logger.log(
`History created for ${snapshot.id} in workspace ${snapshot.workspaceId}.`
);
this.logger.log(`History created for ${id} in workspace ${workspaceId}.`);
}
}
@@ -180,7 +207,7 @@ export class DocHistoryManager {
}
// save old snapshot as one history record
await this.onDocUpdated(oldSnapshot, true);
await this.onDocUpdated({ workspaceId, id, previous: oldSnapshot }, true);
// WARN:
// we should never do the snapshot updating in recovering,
// which is not the solution in CRDT.

View File

@@ -5,7 +5,6 @@ import {
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Snapshot, Update } from '@prisma/client';
import { chunk } from 'lodash-es';
import { defer, retry } from 'rxjs';
@@ -20,6 +19,7 @@ import {
import { Cache } from '../../cache';
import { Config } from '../../config';
import { EventEmitter, type EventPayload, OnEvent } from '../../event';
import { metrics } from '../../metrics/metrics';
import { PrismaService } from '../../prisma';
import { mergeUpdatesInApplyWay as jwstMergeUpdates } from '../../storage';
@@ -71,7 +71,7 @@ function isStateNewer(lhs: Buffer, rhs: Buffer): boolean {
return false;
}
function isEmptyBuffer(buf: Buffer): boolean {
export function isEmptyBuffer(buf: Buffer): boolean {
return (
buf.length === 0 ||
// 0x0000
@@ -102,7 +102,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
private readonly db: PrismaService,
private readonly config: Config,
private readonly cache: Cache,
private readonly event: EventEmitter2
private readonly event: EventEmitter
) {}
onModuleInit() {
@@ -224,6 +224,33 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
}
}
@OnEvent('workspace.deleted')
async onWorkspaceDeleted(workspaceId: string) {
await this.db.snapshot.deleteMany({
where: {
workspaceId,
},
});
await this.db.update.deleteMany({
where: {
workspaceId,
},
});
}
@OnEvent('snapshot.deleted')
async onSnapshotDeleted({
id,
workspaceId,
}: EventPayload<'snapshot.deleted'>) {
await this.db.update.deleteMany({
where: {
id,
workspaceId,
},
});
}
/**
* add update to manager for later processing.
*/
@@ -538,8 +565,17 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
...updates.map(u => u.blob)
);
await this.upsert(workspaceId, id, doc, last.seq);
if (snapshot) {
this.event.emit('doc:manager:snapshot:beforeUpdate', snapshot);
this.event.emit('snapshot.updated', {
id,
workspaceId,
previous: {
blob: snapshot.blob,
state: snapshot.state,
updatedAt: snapshot.updatedAt,
},
});
}
const done = await this.upsert(workspaceId, id, doc, last.seq);

View File

@@ -33,6 +33,7 @@ import type {
import GraphQLUpload from 'graphql-upload/GraphQLUpload.mjs';
import { applyUpdate, Doc } from 'yjs';
import { EventEmitter } from '../../event';
import { PrismaService } from '../../prisma';
import { StorageProvide } from '../../storage';
import { CloudThrottlerGuard, Throttle } from '../../throttler';
@@ -146,6 +147,7 @@ export class WorkspaceResolver {
private readonly prisma: PrismaService,
private readonly permissions: PermissionService,
private readonly users: UsersService,
private readonly event: EventEmitter,
@Inject(StorageProvide) private readonly storage: Storage
) {}
@@ -388,18 +390,7 @@ export class WorkspaceResolver {
},
});
await this.prisma.$transaction([
this.prisma.update.deleteMany({
where: {
workspaceId: id,
},
}),
this.prisma.snapshot.deleteMany({
where: {
workspaceId: id,
},
}),
]);
this.event.emit('workspace.deleted', id);
return true;
}