feat(server): use job system (#10218)

This commit is contained in:
forehalo
2025-02-18 05:41:57 +00:00
parent cb895d4cb0
commit da67c78152
15 changed files with 124 additions and 281 deletions

View File

@@ -65,7 +65,7 @@ test('should be able to cleanup expired history', async t => {
let count = await db.snapshotHistory.count();
t.is(count, 20);
await t.context.cronJob.cleanupExpiredHistory();
await t.context.cronJob.cleanExpiredHistories();
count = await db.snapshotHistory.count();
t.is(count, 10);

View File

@@ -3,7 +3,6 @@ import { randomUUID } from 'node:crypto';
import ava, { TestFn } from 'ava';
import { applyUpdate, Doc as YDoc } from 'yjs';
import { ConfigModule } from '../../base/config';
import {
DocStorageModule,
PgUserspaceDocStorageAdapter as Adapter,
@@ -21,16 +20,7 @@ const test = ava as TestFn<Context>;
test.before(async t => {
const module = await createTestingModule({
imports: [
ConfigModule.forRoot({
doc: {
manager: {
enableUpdateAutoMerging: false,
},
},
}),
DocStorageModule,
],
imports: [DocStorageModule],
});
t.context.models = module.get(Models);

View File

@@ -3,7 +3,6 @@ import test from 'ava';
import * as Sinon from 'sinon';
import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { ConfigModule } from '../../base/config';
import {
DocStorageModule,
PgWorkspaceDocStorageAdapter as Adapter,
@@ -16,16 +15,7 @@ let adapter: Adapter;
test.before('init testing module', async () => {
m = await createTestingModule({
imports: [
ConfigModule.forRoot({
doc: {
manager: {
enableUpdateAutoMerging: false,
},
},
}),
DocStorageModule,
],
imports: [DocStorageModule],
});
db = m.get(PrismaClient);
adapter = m.get(Adapter);

View File

@@ -1,6 +1,6 @@
import {
Attributes,
Counter,
Gauge,
Histogram,
Meter,
MetricOptions,
@@ -12,14 +12,17 @@ type MetricType = 'counter' | 'gauge' | 'histogram';
type Metric<T extends MetricType> = T extends 'counter'
? Counter
: T extends 'gauge'
? Histogram
? Gauge
: T extends 'histogram'
? Histogram
: never;
export type ScopedMetrics = {
[T in MetricType]: (name: string, opts?: MetricOptions) => Metric<T>;
counter: (name: string, opts?: MetricOptions) => Counter;
gauge: (name: string, opts?: MetricOptions) => Gauge;
histogram: (name: string, opts?: MetricOptions) => Histogram;
};
type MetricCreators = {
[T in MetricType]: (
meter: Meter,
@@ -46,20 +49,7 @@ const metricCreators: MetricCreators = {
return meter.createCounter(name, opts);
},
gauge(meter: Meter, name: string, opts?: MetricOptions) {
let value: any;
let attrs: Attributes | undefined;
const ob$ = meter.createObservableGauge(name, opts);
ob$.addCallback(result => {
result.observe(value, attrs);
});
return {
record: (newValue, newAttrs) => {
value = newValue;
attrs = newAttrs;
},
} satisfies Histogram;
return meter.createGauge(name, opts);
},
histogram(meter: Meter, name: string, opts?: MetricOptions) {
return meter.createHistogram(name, opts);

View File

@@ -35,7 +35,6 @@ AFFiNE.ENV_MAP = {
REDIS_SERVER_USER: 'redis.username',
REDIS_SERVER_PASSWORD: 'redis.password',
REDIS_SERVER_DATABASE: ['redis.db', 'int'],
DOC_MERGE_INTERVAL: ['doc.manager.updatePollInterval', 'int'],
DOC_SERVICE_ENDPOINT: 'docService.endpoint',
STRIPE_API_KEY: 'plugins.payment.stripe.keys.APIKey',
STRIPE_WEBHOOK_KEY: 'plugins.payment.stripe.keys.webhookKey',

View File

@@ -1,13 +1,35 @@
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { JobQueue, OnJob } from '../../base';
import { Models } from '../../models';
declare global {
interface Jobs {
'nightly.cleanExpiredUserSessions': {};
}
}
@Injectable()
export class AuthCronJob {
constructor(private readonly models: Models) {}
constructor(
private readonly models: Models,
private readonly queue: JobQueue
) {}
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async nightlyJob() {
await this.queue.add(
'nightly.cleanExpiredUserSessions',
{},
{
// avoid duplicated jobs
jobId: 'nightly-auth-clean-expired-user-sessions',
}
);
}
@OnJob('nightly.cleanExpiredUserSessions')
async cleanExpiredUserSessions() {
await this.models.session.cleanExpiredUserSessions();
}

View File

@@ -1,109 +0,0 @@
import { mock } from 'node:test';
import { ScheduleModule } from '@nestjs/schedule';
import ava, { TestFn } from 'ava';
import * as Sinon from 'sinon';
import { Doc as YDoc } from 'yjs';
import {
createTestingModule,
type TestingModule,
} from '../../../__tests__/utils';
import { Config } from '../../../base';
import {
DocStorageModule,
PgWorkspaceDocStorageAdapter,
} from '../../../core/doc';
import { Models } from '../../../models';
import { DocServiceModule } from '..';
import { DocServiceCronJob } from '../job';
interface Context {
timer: Sinon.SinonFakeTimers;
module: TestingModule;
cronJob: DocServiceCronJob;
config: Config;
adapter: PgWorkspaceDocStorageAdapter;
models: Models;
}
const test = ava as TestFn<Context>;
// cleanup database before each test
test.before(async t => {
t.context.timer = Sinon.useFakeTimers({
toFake: ['setInterval'],
});
t.context.module = await createTestingModule({
imports: [ScheduleModule.forRoot(), DocStorageModule, DocServiceModule],
});
t.context.cronJob = t.context.module.get(DocServiceCronJob);
t.context.config = t.context.module.get(Config);
t.context.adapter = t.context.module.get(PgWorkspaceDocStorageAdapter);
t.context.models = t.context.module.get(Models);
});
test.beforeEach(async t => {
await t.context.module.initTestingDB();
});
test.afterEach(async t => {
t.context.timer.restore();
Sinon.restore();
mock.reset();
});
test.after.always(async t => {
await t.context.module.close();
});
test('should poll when interval due', async t => {
const cronJob = t.context.cronJob;
const interval = t.context.config.doc.manager.updatePollInterval;
let resolve: any;
const fake = mock.method(cronJob, 'autoMergePendingDocUpdates', () => {
return new Promise(_resolve => {
resolve = _resolve;
});
});
t.context.timer.tick(interval);
t.is(fake.mock.callCount(), 1);
// busy
t.context.timer.tick(interval);
// @ts-expect-error private member
t.is(cronJob.busy, true);
t.is(fake.mock.callCount(), 1);
resolve();
await t.context.timer.tickAsync(1);
// @ts-expect-error private member
t.is(cronJob.busy, false);
t.context.timer.tick(interval);
t.is(fake.mock.callCount(), 2);
});
test('should auto merge pending doc updates', async t => {
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await t.context.adapter.pushDocUpdates('2', '2', updates);
await t.context.cronJob.autoMergePendingDocUpdates();
const rows = await t.context.models.doc.findUpdates('2', '2');
t.is(rows.length, 0);
// again should merge nothing
await t.context.cronJob.autoMergePendingDocUpdates();
});

View File

@@ -1,61 +1,26 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
import { CLS_ID, ClsService } from 'nestjs-cls';
import { Injectable } from '@nestjs/common';
import { CallMetric, Config, genRequestId, metrics } from '../../base';
import { OnJob } from '../../base';
import { PgWorkspaceDocStorageAdapter } from '../doc';
@Injectable()
export class DocServiceCronJob implements OnModuleInit {
private busy = false;
private readonly logger = new Logger(DocServiceCronJob.name);
constructor(
private readonly config: Config,
private readonly cls: ClsService,
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly registry: SchedulerRegistry
) {}
onModuleInit() {
if (this.config.doc.manager.enableUpdateAutoMerging) {
this.registry.addInterval(
this.autoMergePendingDocUpdates.name,
// scheduler registry will clean up the interval when the app is stopped
setInterval(() => {
if (this.busy) {
return;
}
this.busy = true;
this.autoMergePendingDocUpdates()
.catch(() => {
/* never fail */
})
.finally(() => {
this.busy = false;
});
}, this.config.doc.manager.updatePollInterval)
);
this.logger.log('Updates pending queue auto merging cron started');
}
}
@CallMetric('doc', 'auto_merge_pending_doc_updates')
async autoMergePendingDocUpdates() {
await this.cls.run(async () => {
this.cls.set(CLS_ID, genRequestId('job'));
try {
const randomDoc = await this.workspace.randomDoc();
if (!randomDoc) {
return;
}
await this.workspace.getDoc(randomDoc.workspaceId, randomDoc.docId);
} catch (e) {
metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1);
this.logger.error('Failed to auto merge pending doc updates', e);
}
});
declare global {
interface Jobs {
'doc.mergePendingDocUpdates': {
workspaceId: string;
docId: string;
};
}
}
@Injectable()
export class DocServiceCronJob {
constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {}
@OnJob('doc.mergePendingDocUpdates')
async mergePendingDocUpdates({
workspaceId,
docId,
}: Jobs['doc.mergePendingDocUpdates']) {
await this.workspace.getDoc(workspaceId, docId);
}
}

View File

@@ -8,6 +8,7 @@ import {
EventBus,
FailedToSaveUpdates,
FailedToUpsertSnapshot,
JobQueue,
metrics,
Mutex,
} from '../../../base';
@@ -50,7 +51,8 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
private readonly mutex: Mutex,
private readonly cache: Cache,
private readonly event: EventBus,
protected override readonly options: DocStorageOptions
protected override readonly options: DocStorageOptions,
private readonly queue: JobQueue
) {
super(options);
}
@@ -95,9 +97,19 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
};
})
);
await this.queue.add(
'doc.mergePendingDocUpdates',
{
workspaceId,
docId,
},
{
// keep it simple to let all update merged in one job
jobId: `doc:merge-pending-updates:${workspaceId}:${docId}`,
}
);
turn++;
done += batch.length;
await this.updateCachedUpdatesCount(workspaceId, docId, batch.length);
}
});
@@ -146,14 +158,11 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
docId: string,
updates: DocUpdate[]
) {
const count = await this.models.doc.deleteUpdates(
return await this.models.doc.deleteUpdates(
workspaceId,
docId,
updates.map(u => u.timestamp)
);
await this.updateCachedUpdatesCount(workspaceId, docId, -count);
return count;
}
async listDocHistories(
@@ -395,23 +404,4 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
return null;
}
private async updateCachedUpdatesCount(
workspaceId: string,
guid: string,
count: number
) {
const result = await this.cache.mapIncrease(
UPDATES_QUEUE_CACHE_KEY,
`${workspaceId}::${guid}`,
count
);
if (result <= 0) {
await this.cache.mapDelete(
UPDATES_QUEUE_CACHE_KEY,
`${workspaceId}::${guid}`
);
}
}
}

View File

@@ -5,28 +5,6 @@ import {
} from '../../base/config';
interface DocStartupConfigurations {
manager: {
/**
* Whether auto merge updates into doc snapshot.
*/
enableUpdateAutoMerging: boolean;
/**
* How often the [DocManager] will start a new turn of merging pending updates into doc snapshot.
*
* This is not the latency a new joint client will take to see the latest doc,
* but the buffer time we introduced to reduce the load of our service.
*
* in {ms}
*/
updatePollInterval: number;
/**
* The maximum number of updates that will be pulled from the server at once.
* Existing for avoiding the server to be overloaded when there are too many updates for one doc.
*/
maxUpdatesPullCount: number;
};
history: {
/**
* How long the buffer time of creating a new history snapshot when doc get updated.
@@ -53,11 +31,6 @@ declare module '../../base/config' {
}
defineStartupConfig('doc', {
manager: {
enableUpdateAutoMerging: true,
updatePollInterval: 3000,
maxUpdatesPullCount: 500,
},
history: {
interval: 1000 * 60 * 10 /* 10 mins */,
},

View File

@@ -1,27 +1,38 @@
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { metrics, OnEvent } from '../../base';
import { JobQueue, OnEvent, OnJob } from '../../base';
import { Models } from '../../models';
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
declare global {
interface Jobs {
'nightly.cleanExpiredHistories': {};
}
}
@Injectable()
export class DocStorageCronJob {
constructor(
private readonly models: Models,
private readonly workspace: PgWorkspaceDocStorageAdapter
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly queue: JobQueue
) {}
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */)
async cleanupExpiredHistory() {
await this.models.doc.deleteExpiredHistories();
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async nightlyJob() {
await this.queue.add(
'nightly.cleanExpiredHistories',
{},
{
jobId: 'nightly-doc-clean-expired-histories',
}
);
}
@Cron(CronExpression.EVERY_MINUTE)
async reportUpdatesQueueCount() {
metrics.doc
.gauge('updates_queue_count')
.record(await this.models.doc.getGlobalUpdateCount());
@OnJob('nightly.cleanExpiredHistories')
async cleanExpiredHistories() {
await this.models.doc.deleteExpiredHistories();
}
@OnEvent('user.deleted')

View File

@@ -152,13 +152,13 @@ export class SpaceSyncGateway
handleConnection() {
this.connectionCount++;
this.logger.log(`New connection, total: ${this.connectionCount}`);
metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
metrics.socketio.gauge('connections').record(1);
}
handleDisconnect() {
this.connectionCount--;
this.logger.log(`Connection disconnected, total: ${this.connectionCount}`);
metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
metrics.socketio.gauge('connections').record(-1);
}
selectAdapter(client: Socket, spaceType: SpaceType): SyncSocketAdapter {

View File

@@ -2,18 +2,26 @@ import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import { EventBus, OnEvent } from '../../base';
import { EventBus, JobQueue, OnEvent, OnJob } from '../../base';
import {
SubscriptionPlan,
SubscriptionRecurring,
SubscriptionVariant,
} from './types';
declare global {
interface Jobs {
'nightly.cleanExpiredOnetimeSubscriptions': {};
'nightly.notifyAboutToExpireWorkspaceSubscriptions': {};
}
}
@Injectable()
export class SubscriptionCronJobs {
constructor(
private readonly db: PrismaClient,
private readonly event: EventBus
private readonly event: EventBus,
private readonly queue: JobQueue
) {}
private getDateRange(after: number, base: number | Date = Date.now()) {
@@ -27,9 +35,27 @@ export class SubscriptionCronJobs {
return { start, end };
}
// TODO(@darkskygit): enable this after the cluster event system is ready
// @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */)
async notifyExpiredWorkspace() {
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async nightlyJob() {
await this.queue.add(
'nightly.cleanExpiredOnetimeSubscriptions',
{},
{
jobId: 'nightly-payment-clean-expired-onetime-subscriptions',
}
);
await this.queue.add(
'nightly.notifyAboutToExpireWorkspaceSubscriptions',
{},
{
jobId: 'nightly-payment-notify-about-to-expire-workspace-subscriptions',
}
);
}
@OnJob('nightly.notifyAboutToExpireWorkspaceSubscriptions')
async notifyAboutToExpireWorkspaceSubscriptions() {
const { start: after30DayStart, end: after30DayEnd } =
this.getDateRange(30);
const { start: todayStart, end: todayEnd } = this.getDateRange(0);
@@ -87,7 +113,7 @@ export class SubscriptionCronJobs {
}
}
@Cron(CronExpression.EVERY_HOUR)
@OnJob('nightly.cleanExpiredOnetimeSubscriptions')
async cleanExpiredOnetimeSubscriptions() {
const subscriptions = await this.db.subscription.findMany({
where: {