From da67c781528cfc1ed3ee98770c3a57b0dc519a27 Mon Sep 17 00:00:00 2001 From: forehalo Date: Tue, 18 Feb 2025 05:41:57 +0000 Subject: [PATCH] feat(server): use job system (#10218) --- .../charts/graphql/templates/deployment.yaml | 2 - .../helm/affine/charts/graphql/values.yaml | 2 - .../server/src/__tests__/doc/cron.spec.ts | 2 +- .../src/__tests__/doc/userspace.spec.ts | 12 +- .../src/__tests__/doc/workspace.spec.ts | 12 +- .../server/src/base/metrics/metrics.ts | 24 ++-- .../backend/server/src/config/affine.env.ts | 1 - packages/backend/server/src/core/auth/job.ts | 24 +++- .../core/doc-service/__tests__/job.spec.ts | 109 ------------------ .../server/src/core/doc-service/job.ts | 77 ++++--------- .../server/src/core/doc/adapters/workspace.ts | 40 +++---- .../backend/server/src/core/doc/config.ts | 27 ----- packages/backend/server/src/core/doc/job.ts | 31 +++-- .../backend/server/src/core/sync/gateway.ts | 4 +- .../server/src/plugins/payment/cron.ts | 38 +++++- 15 files changed, 124 insertions(+), 281 deletions(-) delete mode 100644 packages/backend/server/src/core/doc-service/__tests__/job.spec.ts diff --git a/.github/helm/affine/charts/graphql/templates/deployment.yaml b/.github/helm/affine/charts/graphql/templates/deployment.yaml index 7ead6812e1..a6cba6eb4f 100644 --- a/.github/helm/affine/charts/graphql/templates/deployment.yaml +++ b/.github/helm/affine/charts/graphql/templates/deployment.yaml @@ -116,8 +116,6 @@ spec: secretKeyRef: name: "{{ .Values.app.payment.stripe.secretName }}" key: stripeWebhookKey - - name: DOC_MERGE_INTERVAL - value: "{{ .Values.app.doc.mergeInterval }}" - name: DOC_SERVICE_ENDPOINT value: "http://{{ .Values.global.docService.name }}:{{ .Values.global.docService.port }}" {{ if .Values.app.experimental.enableJwstCodec }} diff --git a/.github/helm/affine/charts/graphql/values.yaml b/.github/helm/affine/charts/graphql/values.yaml index 9dd154d616..86915f4d8e 100644 --- a/.github/helm/affine/charts/graphql/values.yaml +++ b/.github/helm/affine/charts/graphql/values.yaml @@ -17,8 +17,6 @@ app: # AFFINE_SERVER_HOST host: '0.0.0.0' https: true - doc: - mergeInterval: "3000" captcha: enabled: false secretName: captcha diff --git a/packages/backend/server/src/__tests__/doc/cron.spec.ts b/packages/backend/server/src/__tests__/doc/cron.spec.ts index 5b90f8db10..c182abdf0f 100644 --- a/packages/backend/server/src/__tests__/doc/cron.spec.ts +++ b/packages/backend/server/src/__tests__/doc/cron.spec.ts @@ -65,7 +65,7 @@ test('should be able to cleanup expired history', async t => { let count = await db.snapshotHistory.count(); t.is(count, 20); - await t.context.cronJob.cleanupExpiredHistory(); + await t.context.cronJob.cleanExpiredHistories(); count = await db.snapshotHistory.count(); t.is(count, 10); diff --git a/packages/backend/server/src/__tests__/doc/userspace.spec.ts b/packages/backend/server/src/__tests__/doc/userspace.spec.ts index 14f97d74d6..9eedf35f0c 100644 --- a/packages/backend/server/src/__tests__/doc/userspace.spec.ts +++ b/packages/backend/server/src/__tests__/doc/userspace.spec.ts @@ -3,7 +3,6 @@ import { randomUUID } from 'node:crypto'; import ava, { TestFn } from 'ava'; import { applyUpdate, Doc as YDoc } from 'yjs'; -import { ConfigModule } from '../../base/config'; import { DocStorageModule, PgUserspaceDocStorageAdapter as Adapter, @@ -21,16 +20,7 @@ const test = ava as TestFn; test.before(async t => { const module = await createTestingModule({ - imports: [ - ConfigModule.forRoot({ - doc: { - manager: { - enableUpdateAutoMerging: false, - }, - }, - }), - DocStorageModule, - ], + imports: [DocStorageModule], }); t.context.models = module.get(Models); diff --git a/packages/backend/server/src/__tests__/doc/workspace.spec.ts b/packages/backend/server/src/__tests__/doc/workspace.spec.ts index 60d3b9ed99..cb94f08519 100644 --- a/packages/backend/server/src/__tests__/doc/workspace.spec.ts +++ b/packages/backend/server/src/__tests__/doc/workspace.spec.ts @@ -3,7 +3,6 @@ import test from 'ava'; import * as Sinon from 'sinon'; import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs'; -import { ConfigModule } from '../../base/config'; import { DocStorageModule, PgWorkspaceDocStorageAdapter as Adapter, @@ -16,16 +15,7 @@ let adapter: Adapter; test.before('init testing module', async () => { m = await createTestingModule({ - imports: [ - ConfigModule.forRoot({ - doc: { - manager: { - enableUpdateAutoMerging: false, - }, - }, - }), - DocStorageModule, - ], + imports: [DocStorageModule], }); db = m.get(PrismaClient); adapter = m.get(Adapter); diff --git a/packages/backend/server/src/base/metrics/metrics.ts b/packages/backend/server/src/base/metrics/metrics.ts index 8129e5859a..84a7239dc1 100644 --- a/packages/backend/server/src/base/metrics/metrics.ts +++ b/packages/backend/server/src/base/metrics/metrics.ts @@ -1,6 +1,6 @@ import { - Attributes, Counter, + Gauge, Histogram, Meter, MetricOptions, @@ -12,14 +12,17 @@ type MetricType = 'counter' | 'gauge' | 'histogram'; type Metric = T extends 'counter' ? Counter : T extends 'gauge' - ? Histogram + ? Gauge : T extends 'histogram' ? Histogram : never; export type ScopedMetrics = { - [T in MetricType]: (name: string, opts?: MetricOptions) => Metric; + counter: (name: string, opts?: MetricOptions) => Counter; + gauge: (name: string, opts?: MetricOptions) => Gauge; + histogram: (name: string, opts?: MetricOptions) => Histogram; }; + type MetricCreators = { [T in MetricType]: ( meter: Meter, @@ -46,20 +49,7 @@ const metricCreators: MetricCreators = { return meter.createCounter(name, opts); }, gauge(meter: Meter, name: string, opts?: MetricOptions) { - let value: any; - let attrs: Attributes | undefined; - const ob$ = meter.createObservableGauge(name, opts); - - ob$.addCallback(result => { - result.observe(value, attrs); - }); - - return { - record: (newValue, newAttrs) => { - value = newValue; - attrs = newAttrs; - }, - } satisfies Histogram; + return meter.createGauge(name, opts); }, histogram(meter: Meter, name: string, opts?: MetricOptions) { return meter.createHistogram(name, opts); diff --git a/packages/backend/server/src/config/affine.env.ts b/packages/backend/server/src/config/affine.env.ts index 80fcb5543a..38e039136f 100644 --- a/packages/backend/server/src/config/affine.env.ts +++ b/packages/backend/server/src/config/affine.env.ts @@ -35,7 +35,6 @@ AFFiNE.ENV_MAP = { REDIS_SERVER_USER: 'redis.username', REDIS_SERVER_PASSWORD: 'redis.password', REDIS_SERVER_DATABASE: ['redis.db', 'int'], - DOC_MERGE_INTERVAL: ['doc.manager.updatePollInterval', 'int'], DOC_SERVICE_ENDPOINT: 'docService.endpoint', STRIPE_API_KEY: 'plugins.payment.stripe.keys.APIKey', STRIPE_WEBHOOK_KEY: 'plugins.payment.stripe.keys.webhookKey', diff --git a/packages/backend/server/src/core/auth/job.ts b/packages/backend/server/src/core/auth/job.ts index 1e59279dd5..550709aabe 100644 --- a/packages/backend/server/src/core/auth/job.ts +++ b/packages/backend/server/src/core/auth/job.ts @@ -1,13 +1,35 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; +import { JobQueue, OnJob } from '../../base'; import { Models } from '../../models'; +declare global { + interface Jobs { + 'nightly.cleanExpiredUserSessions': {}; + } +} + @Injectable() export class AuthCronJob { - constructor(private readonly models: Models) {} + constructor( + private readonly models: Models, + private readonly queue: JobQueue + ) {} @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async nightlyJob() { + await this.queue.add( + 'nightly.cleanExpiredUserSessions', + {}, + { + // avoid duplicated jobs + jobId: 'nightly-auth-clean-expired-user-sessions', + } + ); + } + + @OnJob('nightly.cleanExpiredUserSessions') async cleanExpiredUserSessions() { await this.models.session.cleanExpiredUserSessions(); } diff --git a/packages/backend/server/src/core/doc-service/__tests__/job.spec.ts b/packages/backend/server/src/core/doc-service/__tests__/job.spec.ts deleted file mode 100644 index 8c4f2ed7e8..0000000000 --- a/packages/backend/server/src/core/doc-service/__tests__/job.spec.ts +++ /dev/null @@ -1,109 +0,0 @@ -import { mock } from 'node:test'; - -import { ScheduleModule } from '@nestjs/schedule'; -import ava, { TestFn } from 'ava'; -import * as Sinon from 'sinon'; -import { Doc as YDoc } from 'yjs'; - -import { - createTestingModule, - type TestingModule, -} from '../../../__tests__/utils'; -import { Config } from '../../../base'; -import { - DocStorageModule, - PgWorkspaceDocStorageAdapter, -} from '../../../core/doc'; -import { Models } from '../../../models'; -import { DocServiceModule } from '..'; -import { DocServiceCronJob } from '../job'; - -interface Context { - timer: Sinon.SinonFakeTimers; - module: TestingModule; - cronJob: DocServiceCronJob; - config: Config; - adapter: PgWorkspaceDocStorageAdapter; - models: Models; -} - -const test = ava as TestFn; - -// cleanup database before each test -test.before(async t => { - t.context.timer = Sinon.useFakeTimers({ - toFake: ['setInterval'], - }); - t.context.module = await createTestingModule({ - imports: [ScheduleModule.forRoot(), DocStorageModule, DocServiceModule], - }); - - t.context.cronJob = t.context.module.get(DocServiceCronJob); - t.context.config = t.context.module.get(Config); - t.context.adapter = t.context.module.get(PgWorkspaceDocStorageAdapter); - t.context.models = t.context.module.get(Models); -}); - -test.beforeEach(async t => { - await t.context.module.initTestingDB(); -}); - -test.afterEach(async t => { - t.context.timer.restore(); - Sinon.restore(); - mock.reset(); -}); - -test.after.always(async t => { - await t.context.module.close(); -}); - -test('should poll when interval due', async t => { - const cronJob = t.context.cronJob; - const interval = t.context.config.doc.manager.updatePollInterval; - - let resolve: any; - const fake = mock.method(cronJob, 'autoMergePendingDocUpdates', () => { - return new Promise(_resolve => { - resolve = _resolve; - }); - }); - - t.context.timer.tick(interval); - t.is(fake.mock.callCount(), 1); - - // busy - t.context.timer.tick(interval); - // @ts-expect-error private member - t.is(cronJob.busy, true); - t.is(fake.mock.callCount(), 1); - - resolve(); - await t.context.timer.tickAsync(1); - - // @ts-expect-error private member - t.is(cronJob.busy, false); - t.context.timer.tick(interval); - t.is(fake.mock.callCount(), 2); -}); - -test('should auto merge pending doc updates', async t => { - const doc = new YDoc(); - const text = doc.getText('content'); - const updates: Buffer[] = []; - - doc.on('update', update => { - updates.push(Buffer.from(update)); - }); - - text.insert(0, 'hello'); - text.insert(5, 'world'); - text.insert(5, ' '); - - await t.context.adapter.pushDocUpdates('2', '2', updates); - await t.context.cronJob.autoMergePendingDocUpdates(); - const rows = await t.context.models.doc.findUpdates('2', '2'); - t.is(rows.length, 0); - // again should merge nothing - await t.context.cronJob.autoMergePendingDocUpdates(); -}); diff --git a/packages/backend/server/src/core/doc-service/job.ts b/packages/backend/server/src/core/doc-service/job.ts index 45f4ee5d6a..088c7630b2 100644 --- a/packages/backend/server/src/core/doc-service/job.ts +++ b/packages/backend/server/src/core/doc-service/job.ts @@ -1,61 +1,26 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { CLS_ID, ClsService } from 'nestjs-cls'; +import { Injectable } from '@nestjs/common'; -import { CallMetric, Config, genRequestId, metrics } from '../../base'; +import { OnJob } from '../../base'; import { PgWorkspaceDocStorageAdapter } from '../doc'; -@Injectable() -export class DocServiceCronJob implements OnModuleInit { - private busy = false; - private readonly logger = new Logger(DocServiceCronJob.name); - - constructor( - private readonly config: Config, - private readonly cls: ClsService, - private readonly workspace: PgWorkspaceDocStorageAdapter, - private readonly registry: SchedulerRegistry - ) {} - - onModuleInit() { - if (this.config.doc.manager.enableUpdateAutoMerging) { - this.registry.addInterval( - this.autoMergePendingDocUpdates.name, - // scheduler registry will clean up the interval when the app is stopped - setInterval(() => { - if (this.busy) { - return; - } - this.busy = true; - this.autoMergePendingDocUpdates() - .catch(() => { - /* never fail */ - }) - .finally(() => { - this.busy = false; - }); - }, this.config.doc.manager.updatePollInterval) - ); - - this.logger.log('Updates pending queue auto merging cron started'); - } - } - - @CallMetric('doc', 'auto_merge_pending_doc_updates') - async autoMergePendingDocUpdates() { - await this.cls.run(async () => { - this.cls.set(CLS_ID, genRequestId('job')); - try { - const randomDoc = await this.workspace.randomDoc(); - if (!randomDoc) { - return; - } - - await this.workspace.getDoc(randomDoc.workspaceId, randomDoc.docId); - } catch (e) { - metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1); - this.logger.error('Failed to auto merge pending doc updates', e); - } - }); +declare global { + interface Jobs { + 'doc.mergePendingDocUpdates': { + workspaceId: string; + docId: string; + }; + } +} + +@Injectable() +export class DocServiceCronJob { + constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {} + + @OnJob('doc.mergePendingDocUpdates') + async mergePendingDocUpdates({ + workspaceId, + docId, + }: Jobs['doc.mergePendingDocUpdates']) { + await this.workspace.getDoc(workspaceId, docId); } } diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts index 14b3e21352..860f28bf6d 100644 --- a/packages/backend/server/src/core/doc/adapters/workspace.ts +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -8,6 +8,7 @@ import { EventBus, FailedToSaveUpdates, FailedToUpsertSnapshot, + JobQueue, metrics, Mutex, } from '../../../base'; @@ -50,7 +51,8 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { private readonly mutex: Mutex, private readonly cache: Cache, private readonly event: EventBus, - protected override readonly options: DocStorageOptions + protected override readonly options: DocStorageOptions, + private readonly queue: JobQueue ) { super(options); } @@ -95,9 +97,19 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { }; }) ); + await this.queue.add( + 'doc.mergePendingDocUpdates', + { + workspaceId, + docId, + }, + { + // keep it simple to let all update merged in one job + jobId: `doc:merge-pending-updates:${workspaceId}:${docId}`, + } + ); turn++; done += batch.length; - await this.updateCachedUpdatesCount(workspaceId, docId, batch.length); } }); @@ -146,14 +158,11 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { docId: string, updates: DocUpdate[] ) { - const count = await this.models.doc.deleteUpdates( + return await this.models.doc.deleteUpdates( workspaceId, docId, updates.map(u => u.timestamp) ); - - await this.updateCachedUpdatesCount(workspaceId, docId, -count); - return count; } async listDocHistories( @@ -395,23 +404,4 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { return null; } - - private async updateCachedUpdatesCount( - workspaceId: string, - guid: string, - count: number - ) { - const result = await this.cache.mapIncrease( - UPDATES_QUEUE_CACHE_KEY, - `${workspaceId}::${guid}`, - count - ); - - if (result <= 0) { - await this.cache.mapDelete( - UPDATES_QUEUE_CACHE_KEY, - `${workspaceId}::${guid}` - ); - } - } } diff --git a/packages/backend/server/src/core/doc/config.ts b/packages/backend/server/src/core/doc/config.ts index 302c35eed1..1add392c4f 100644 --- a/packages/backend/server/src/core/doc/config.ts +++ b/packages/backend/server/src/core/doc/config.ts @@ -5,28 +5,6 @@ import { } from '../../base/config'; interface DocStartupConfigurations { - manager: { - /** - * Whether auto merge updates into doc snapshot. - */ - enableUpdateAutoMerging: boolean; - - /** - * How often the [DocManager] will start a new turn of merging pending updates into doc snapshot. - * - * This is not the latency a new joint client will take to see the latest doc, - * but the buffer time we introduced to reduce the load of our service. - * - * in {ms} - */ - updatePollInterval: number; - - /** - * The maximum number of updates that will be pulled from the server at once. - * Existing for avoiding the server to be overloaded when there are too many updates for one doc. - */ - maxUpdatesPullCount: number; - }; history: { /** * How long the buffer time of creating a new history snapshot when doc get updated. @@ -53,11 +31,6 @@ declare module '../../base/config' { } defineStartupConfig('doc', { - manager: { - enableUpdateAutoMerging: true, - updatePollInterval: 3000, - maxUpdatesPullCount: 500, - }, history: { interval: 1000 * 60 * 10 /* 10 mins */, }, diff --git a/packages/backend/server/src/core/doc/job.ts b/packages/backend/server/src/core/doc/job.ts index 10f0f52bd2..028cc65be9 100644 --- a/packages/backend/server/src/core/doc/job.ts +++ b/packages/backend/server/src/core/doc/job.ts @@ -1,27 +1,38 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; -import { metrics, OnEvent } from '../../base'; +import { JobQueue, OnEvent, OnJob } from '../../base'; import { Models } from '../../models'; import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; +declare global { + interface Jobs { + 'nightly.cleanExpiredHistories': {}; + } +} + @Injectable() export class DocStorageCronJob { constructor( private readonly models: Models, - private readonly workspace: PgWorkspaceDocStorageAdapter + private readonly workspace: PgWorkspaceDocStorageAdapter, + private readonly queue: JobQueue ) {} - @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */) - async cleanupExpiredHistory() { - await this.models.doc.deleteExpiredHistories(); + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async nightlyJob() { + await this.queue.add( + 'nightly.cleanExpiredHistories', + {}, + { + jobId: 'nightly-doc-clean-expired-histories', + } + ); } - @Cron(CronExpression.EVERY_MINUTE) - async reportUpdatesQueueCount() { - metrics.doc - .gauge('updates_queue_count') - .record(await this.models.doc.getGlobalUpdateCount()); + @OnJob('nightly.cleanExpiredHistories') + async cleanExpiredHistories() { + await this.models.doc.deleteExpiredHistories(); } @OnEvent('user.deleted') diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts index 9e0cd1887f..a9a4a1e0e4 100644 --- a/packages/backend/server/src/core/sync/gateway.ts +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -152,13 +152,13 @@ export class SpaceSyncGateway handleConnection() { this.connectionCount++; this.logger.log(`New connection, total: ${this.connectionCount}`); - metrics.socketio.gauge('realtime_connections').record(this.connectionCount); + metrics.socketio.gauge('connections').record(1); } handleDisconnect() { this.connectionCount--; this.logger.log(`Connection disconnected, total: ${this.connectionCount}`); - metrics.socketio.gauge('realtime_connections').record(this.connectionCount); + metrics.socketio.gauge('connections').record(-1); } selectAdapter(client: Socket, spaceType: SpaceType): SyncSocketAdapter { diff --git a/packages/backend/server/src/plugins/payment/cron.ts b/packages/backend/server/src/plugins/payment/cron.ts index ebae9d2933..3e3a77eef7 100644 --- a/packages/backend/server/src/plugins/payment/cron.ts +++ b/packages/backend/server/src/plugins/payment/cron.ts @@ -2,18 +2,26 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { PrismaClient } from '@prisma/client'; -import { EventBus, OnEvent } from '../../base'; +import { EventBus, JobQueue, OnEvent, OnJob } from '../../base'; import { SubscriptionPlan, SubscriptionRecurring, SubscriptionVariant, } from './types'; +declare global { + interface Jobs { + 'nightly.cleanExpiredOnetimeSubscriptions': {}; + 'nightly.notifyAboutToExpireWorkspaceSubscriptions': {}; + } +} + @Injectable() export class SubscriptionCronJobs { constructor( private readonly db: PrismaClient, - private readonly event: EventBus + private readonly event: EventBus, + private readonly queue: JobQueue ) {} private getDateRange(after: number, base: number | Date = Date.now()) { @@ -27,9 +35,27 @@ export class SubscriptionCronJobs { return { start, end }; } - // TODO(@darkskygit): enable this after the cluster event system is ready - // @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */) - async notifyExpiredWorkspace() { + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async nightlyJob() { + await this.queue.add( + 'nightly.cleanExpiredOnetimeSubscriptions', + {}, + { + jobId: 'nightly-payment-clean-expired-onetime-subscriptions', + } + ); + + await this.queue.add( + 'nightly.notifyAboutToExpireWorkspaceSubscriptions', + {}, + { + jobId: 'nightly-payment-notify-about-to-expire-workspace-subscriptions', + } + ); + } + + @OnJob('nightly.notifyAboutToExpireWorkspaceSubscriptions') + async notifyAboutToExpireWorkspaceSubscriptions() { const { start: after30DayStart, end: after30DayEnd } = this.getDateRange(30); const { start: todayStart, end: todayEnd } = this.getDateRange(0); @@ -87,7 +113,7 @@ export class SubscriptionCronJobs { } } - @Cron(CronExpression.EVERY_HOUR) + @OnJob('nightly.cleanExpiredOnetimeSubscriptions') async cleanExpiredOnetimeSubscriptions() { const subscriptions = await this.db.subscription.findMany({ where: {