feat(server): use doc service (#9967)

close CLOUD-94
This commit is contained in:
fengmk2
2025-02-08 05:27:56 +00:00
parent ee0df52531
commit e5d1cd9ea2
17 changed files with 423 additions and 127 deletions

View File

@@ -28,7 +28,7 @@ jobs:
extra-flags: workspaces focus @affine/server extra-flags: workspaces focus @affine/server
- name: Build Server - name: Build Server
run: | run: |
rm -rf packages/backend/server/src/__tests__ find packages/backend/server -type d -name "__tests__" -exec rm -rf {} +
yarn workspace @affine/server build yarn workspace @affine/server build
- name: Upload server dist - name: Upload server dist
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4

View File

@@ -1,67 +1,39 @@
import { mock } from 'node:test';
import { ScheduleModule } from '@nestjs/schedule'; import { ScheduleModule } from '@nestjs/schedule';
import { TestingModule } from '@nestjs/testing';
import { PrismaClient } from '@prisma/client'; import { PrismaClient } from '@prisma/client';
import test from 'ava'; import ava, { TestFn } from 'ava';
import * as Sinon from 'sinon';
import { Config } from '../../base/config';
import { DocStorageModule } from '../../core/doc'; import { DocStorageModule } from '../../core/doc';
import { DocStorageCronJob } from '../../core/doc/job'; import { DocStorageCronJob } from '../../core/doc/job';
import { createTestingModule } from '../utils'; import { createTestingModule, type TestingModule } from '../utils';
let m: TestingModule; interface Context {
let timer: Sinon.SinonFakeTimers; module: TestingModule;
let db: PrismaClient; db: PrismaClient;
cronJob: DocStorageCronJob;
}
const test = ava as TestFn<Context>;
// cleanup database before each test // cleanup database before each test
test.before(async () => { test.before(async t => {
timer = Sinon.useFakeTimers({ t.context.module = await createTestingModule({
toFake: ['setInterval'],
});
m = await createTestingModule({
imports: [ScheduleModule.forRoot(), DocStorageModule], imports: [ScheduleModule.forRoot(), DocStorageModule],
}); });
db = m.get(PrismaClient); t.context.db = t.context.module.get(PrismaClient);
t.context.cronJob = t.context.module.get(DocStorageCronJob);
}); });
test.after.always(async () => { test.beforeEach(async t => {
await m.close(); await t.context.module.initTestingDB();
timer.restore();
}); });
test('should poll when intervel due', async t => { test.after.always(async t => {
const manager = m.get(DocStorageCronJob); await t.context.module.close();
const interval = m.get(Config).doc.manager.updatePollInterval;
let resolve: any;
const fake = mock.method(manager, 'autoMergePendingDocUpdates', () => {
return new Promise(_resolve => {
resolve = _resolve;
});
});
timer.tick(interval);
t.is(fake.mock.callCount(), 1);
// busy
timer.tick(interval);
// @ts-expect-error private member
t.is(manager.busy, true);
t.is(fake.mock.callCount(), 1);
resolve();
await timer.tickAsync(1);
// @ts-expect-error private member
t.is(manager.busy, false);
timer.tick(interval);
t.is(fake.mock.callCount(), 2);
}); });
test('should be able to cleanup expired history', async t => { test('should be able to cleanup expired history', async t => {
const { db } = t.context;
const timestamp = Date.now(); const timestamp = Date.now();
// insert expired data // insert expired data
@@ -93,7 +65,7 @@ test('should be able to cleanup expired history', async t => {
let count = await db.snapshotHistory.count(); let count = await db.snapshotHistory.count();
t.is(count, 20); t.is(count, 20);
await m.get(DocStorageCronJob).cleanupExpiredHistory(); await t.context.cronJob.cleanupExpiredHistory();
count = await db.snapshotHistory.count(); count = await db.snapshotHistory.count();
t.is(count, 10); t.is(count, 10);

View File

@@ -1,8 +1,4 @@
import { import { INestApplication, LogLevel, ModuleMetadata } from '@nestjs/common';
ConsoleLogger,
INestApplication,
ModuleMetadata,
} from '@nestjs/common';
import { APP_GUARD, ModuleRef } from '@nestjs/core'; import { APP_GUARD, ModuleRef } from '@nestjs/core';
import { Query, Resolver } from '@nestjs/graphql'; import { Query, Resolver } from '@nestjs/graphql';
import { import {
@@ -17,12 +13,15 @@ import type { Response } from 'supertest';
import supertest from 'supertest'; import supertest from 'supertest';
import { AppModule, FunctionalityModules } from '../../app.module'; import { AppModule, FunctionalityModules } from '../../app.module';
import { GlobalExceptionFilter, Runtime } from '../../base'; import { AFFiNELogger, GlobalExceptionFilter, Runtime } from '../../base';
import { GqlModule } from '../../base/graphql'; import { GqlModule } from '../../base/graphql';
import { AuthGuard, AuthModule } from '../../core/auth'; import { AuthGuard, AuthModule } from '../../core/auth';
import { RefreshFeatures0001 } from '../../data/migrations/0001-refresh-features'; import { RefreshFeatures0001 } from '../../data/migrations/0001-refresh-features';
import { ModelsModule } from '../../models'; import { ModelsModule } from '../../models';
const TEST_LOG_LEVEL: LogLevel =
(process.env.TEST_LOG_LEVEL as LogLevel) ?? 'fatal';
async function flushDB(client: PrismaClient) { async function flushDB(client: PrismaClient) {
const result: { tablename: string }[] = const result: { tablename: string }[] =
await client.$queryRaw`SELECT tablename await client.$queryRaw`SELECT tablename
@@ -39,7 +38,7 @@ async function flushDB(client: PrismaClient) {
); );
} }
interface TestingModuleMeatdata extends ModuleMetadata { interface TestingModuleMetadata extends ModuleMetadata {
tapModule?(m: TestingModuleBuilder): void; tapModule?(m: TestingModuleBuilder): void;
tapApp?(app: INestApplication): void; tapApp?(app: INestApplication): void;
} }
@@ -85,7 +84,7 @@ class MockResolver {
} }
export async function createTestingModule( export async function createTestingModule(
moduleDef: TestingModuleMeatdata = {}, moduleDef: TestingModuleMetadata = {},
autoInitialize = true autoInitialize = true
): Promise<TestingModule> { ): Promise<TestingModule> {
// setting up // setting up
@@ -127,7 +126,7 @@ export async function createTestingModule(
// can't tolerate the noisy logs // can't tolerate the noisy logs
// @ts-expect-error private // @ts-expect-error private
m.applyLogger({ m.applyLogger({
logger: ['fatal'], logger: [TEST_LOG_LEVEL],
}); });
const runtime = m.get(Runtime); const runtime = m.get(Runtime);
// by pass password min length validation // by pass password min length validation
@@ -146,7 +145,7 @@ export async function createTestingModule(
} }
export async function createTestingApp( export async function createTestingApp(
moduleDef: TestingModuleMeatdata = {} moduleDef: TestingModuleMetadata = {}
): Promise<{ module: TestingModule; app: TestingApp }> { ): Promise<{ module: TestingModule; app: TestingApp }> {
const m = await createTestingModule(moduleDef, false); const m = await createTestingModule(moduleDef, false);
@@ -155,9 +154,9 @@ export async function createTestingApp(
bodyParser: true, bodyParser: true,
rawBody: true, rawBody: true,
}) as TestingApp; }) as TestingApp;
const logger = new ConsoleLogger(); const logger = new AFFiNELogger();
logger.setLogLevels(['fatal']); logger.setLogLevels([TEST_LOG_LEVEL]);
app.useLogger(logger); app.useLogger(logger);
app.useGlobalFilters(new GlobalExceptionFilter(app.getHttpAdapter())); app.useGlobalFilters(new GlobalExceptionFilter(app.getHttpAdapter()));

View File

@@ -1,5 +1,3 @@
import { randomUUID } from 'node:crypto';
import { import {
DynamicModule, DynamicModule,
ForwardReference, ForwardReference,
@@ -15,7 +13,7 @@ import { get } from 'lodash-es';
import { ClsModule } from 'nestjs-cls'; import { ClsModule } from 'nestjs-cls';
import { AppController } from './app.controller'; import { AppController } from './app.controller';
import { getOptionalModuleMetadata } from './base'; import { genRequestId, getOptionalModuleMetadata } from './base';
import { CacheModule } from './base/cache'; import { CacheModule } from './base/cache';
import { AFFiNEConfig, ConfigModule, mergeConfigOverride } from './base/config'; import { AFFiNEConfig, ConfigModule, mergeConfigOverride } from './base/config';
import { ErrorModule } from './base/error'; import { ErrorModule } from './base/error';
@@ -59,7 +57,7 @@ export const FunctionalityModules = [
generateId: true, generateId: true,
idGenerator(req: Request) { idGenerator(req: Request) {
// make every request has a unique id to tracing // make every request has a unique id to tracing
return req.get('x-rpc-trace-id') ?? `req-${randomUUID()}`; return req.get('x-rpc-trace-id') ?? genRequestId('req');
}, },
setup(cls, _req, res: Response) { setup(cls, _req, res: Response) {
res.setHeader('X-Request-Id', cls.getId()); res.setHeader('X-Request-Id', cls.getId());
@@ -72,7 +70,7 @@ export const FunctionalityModules = [
generateId: true, generateId: true,
idGenerator() { idGenerator() {
// make every request has a unique id to tracing // make every request has a unique id to tracing
return `ws-${randomUUID()}`; return genRequestId('ws');
}, },
}, },
plugins: [ plugins: [
@@ -200,6 +198,12 @@ export function buildAppModule() {
.use(...FunctionalityModules) .use(...FunctionalityModules)
.use(ModelsModule) .use(ModelsModule)
// enable schedule module on graphql server and doc service
.useIf(
config => config.flavor.graphql || config.flavor.doc,
ScheduleModule.forRoot()
)
// auth // auth
.use(UserModule, AuthModule, PermissionModule) .use(UserModule, AuthModule, PermissionModule)
@@ -212,7 +216,6 @@ export function buildAppModule() {
// graphql server only // graphql server only
.useIf( .useIf(
config => config.flavor.graphql, config => config.flavor.graphql,
ScheduleModule.forRoot(),
GqlModule, GqlModule,
StorageModule, StorageModule,
ServerConfigModule, ServerConfigModule,

View File

@@ -1,5 +1,3 @@
import { randomUUID } from 'node:crypto';
import { import {
applyDecorators, applyDecorators,
Injectable, Injectable,
@@ -21,6 +19,7 @@ import { CLS_ID, ClsService } from 'nestjs-cls';
import type { Server, Socket } from 'socket.io'; import type { Server, Socket } from 'socket.io';
import { CallMetric } from '../metrics'; import { CallMetric } from '../metrics';
import { genRequestId } from '../utils';
import type { EventName } from './def'; import type { EventName } from './def';
const EventHandlerWrapper = (event: EventName): MethodDecorator => { const EventHandlerWrapper = (event: EventName): MethodDecorator => {
@@ -94,7 +93,7 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap {
// to internal event system // to internal event system
this.server?.on(event, (payload, requestId?: string) => { this.server?.on(event, (payload, requestId?: string) => {
this.cls.run(() => { this.cls.run(() => {
requestId = requestId ?? `server_event-${randomUUID()}`; requestId = requestId ?? genRequestId('se');
this.cls.set(CLS_ID, requestId); this.cls.set(CLS_ID, requestId);
this.logger.log(`Server Event: ${event} (Received)`); this.logger.log(`Server Event: ${event} (Received)`);
this.emit(event, payload); this.emit(event, payload);

View File

@@ -1,3 +1,4 @@
import { randomUUID } from 'node:crypto';
import { IncomingMessage } from 'node:http'; import { IncomingMessage } from 'node:http';
import type { ArgumentsHost, ExecutionContext } from '@nestjs/common'; import type { ArgumentsHost, ExecutionContext } from '@nestjs/common';
@@ -77,3 +78,18 @@ export function parseCookies(
{} as Record<string, string> {} as Record<string, string>
); );
} }
/**
* Request type
*
* @description
* - `req`: http request
* - `ws`: websocket request
* - `se`: server event
* - `job`: cron job
*/
export type RequestType = 'req' | 'ws' | 'se' | 'job';
export function genRequestId(type: RequestType) {
return `${AFFiNE.flavor.type}:${type}-${randomUUID()}`;
}

View File

@@ -0,0 +1,85 @@
import { randomUUID } from 'node:crypto';
import { User, Workspace } from '@prisma/client';
import ava, { TestFn } from 'ava';
import request from 'supertest';
import { Doc as YDoc } from 'yjs';
import { createTestingApp, type TestingApp } from '../../../__tests__/utils';
import { AppModule } from '../../../app.module';
import { Config } from '../../../base';
import { ConfigModule } from '../../../base/config';
import { Models } from '../../../models';
import { PgWorkspaceDocStorageAdapter } from '../../doc';
import { PermissionService } from '../../permission';
const test = ava as TestFn<{
models: Models;
app: TestingApp;
config: Config;
adapter: PgWorkspaceDocStorageAdapter;
permission: PermissionService;
}>;
test.before(async t => {
const { app } = await createTestingApp({
imports: [
ConfigModule.forRoot({
flavor: {
doc: false,
},
docService: {
endpoint: '',
},
}),
AppModule,
],
});
t.context.models = app.get(Models);
t.context.config = app.get(Config);
t.context.adapter = app.get(PgWorkspaceDocStorageAdapter);
t.context.permission = app.get(PermissionService);
t.context.app = app;
});
let user: User;
let workspace: Workspace;
test.beforeEach(async t => {
t.context.config.docService.endpoint = t.context.app.getHttpServerUrl();
await t.context.app.initTestingDB();
user = await t.context.models.user.create({
email: 'test@affine.pro',
});
workspace = await t.context.models.workspace.create(user.id);
});
test.after.always(async t => {
await t.context.app.close();
});
test('should render page success', async t => {
const docId = randomUUID();
const { app, adapter, permission } = t.context;
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await adapter.pushDocUpdates(workspace.id, docId, updates, user.id);
await permission.publishPage(workspace.id, docId);
await request(app.getHttpServer())
.get(`/workspace/${workspace.id}/${docId}`)
.expect(200);
t.pass();
});

View File

@@ -0,0 +1,85 @@
import { randomUUID } from 'node:crypto';
import { User, Workspace } from '@prisma/client';
import ava, { TestFn } from 'ava';
import { Doc as YDoc } from 'yjs';
import { createTestingApp, type TestingApp } from '../../../__tests__/utils';
import { AppModule } from '../../../app.module';
import { Config } from '../../../base';
import { ConfigModule } from '../../../base/config';
import { Models } from '../../../models';
import { PgWorkspaceDocStorageAdapter } from '../../doc';
import { DocContentService } from '..';
const test = ava as TestFn<{
models: Models;
app: TestingApp;
docContentService: DocContentService;
config: Config;
adapter: PgWorkspaceDocStorageAdapter;
}>;
test.before(async t => {
const { app } = await createTestingApp({
imports: [
ConfigModule.forRoot({
flavor: {
doc: false,
},
docService: {
endpoint: '',
},
}),
AppModule,
],
});
t.context.models = app.get(Models);
t.context.docContentService = app.get(DocContentService);
t.context.config = app.get(Config);
t.context.adapter = app.get(PgWorkspaceDocStorageAdapter);
t.context.app = app;
});
let user: User;
let workspace: Workspace;
test.beforeEach(async t => {
t.context.config.docService.endpoint = t.context.app.getHttpServerUrl();
await t.context.app.initTestingDB();
user = await t.context.models.user.create({
email: 'test@affine.pro',
});
workspace = await t.context.models.workspace.create(user.id);
});
test.after.always(async t => {
await t.context.app.close();
});
test('should get doc content from doc service rpc', async t => {
const docId = randomUUID();
const { docContentService } = t.context;
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await t.context.adapter.pushDocUpdates(workspace.id, docId, updates, user.id);
const docContent = await docContentService.getPageContent(
workspace.id,
docId
);
// TODO(@fengmk2): should create a test ydoc with blocks
t.is(docContent, null);
});

View File

@@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common';
import { applyUpdate, Doc } from 'yjs'; import { applyUpdate, Doc } from 'yjs';
import { Cache, OnEvent } from '../../base'; import { Cache, OnEvent } from '../../base';
import { PgWorkspaceDocStorageAdapter } from '../doc'; import { DocReader } from '../doc';
import { import {
type PageDocContent, type PageDocContent,
parsePageDoc, parsePageDoc,
@@ -14,7 +14,7 @@ import {
export class DocContentService { export class DocContentService {
constructor( constructor(
private readonly cache: Cache, private readonly cache: Cache,
private readonly workspace: PgWorkspaceDocStorageAdapter private readonly docReader: DocReader
) {} ) {}
async getPageContent( async getPageContent(
@@ -28,7 +28,7 @@ export class DocContentService {
return cachedResult; return cachedResult;
} }
const docRecord = await this.workspace.getDoc(workspaceId, guid); const docRecord = await this.docReader.getDoc(workspaceId, guid);
if (!docRecord) { if (!docRecord) {
return null; return null;
} }
@@ -61,7 +61,7 @@ export class DocContentService {
return cachedResult; return cachedResult;
} }
const docRecord = await this.workspace.getDoc(workspaceId, workspaceId); const docRecord = await this.docReader.getDoc(workspaceId, workspaceId);
if (!docRecord) { if (!docRecord) {
return null; return null;
} }

View File

@@ -0,0 +1,109 @@
import { mock } from 'node:test';
import { ScheduleModule } from '@nestjs/schedule';
import ava, { TestFn } from 'ava';
import * as Sinon from 'sinon';
import { Doc as YDoc } from 'yjs';
import {
createTestingModule,
type TestingModule,
} from '../../../__tests__/utils';
import { Config } from '../../../base';
import {
DocStorageModule,
PgWorkspaceDocStorageAdapter,
} from '../../../core/doc';
import { Models } from '../../../models';
import { DocServiceModule } from '..';
import { DocServiceCronJob } from '../job';
interface Context {
timer: Sinon.SinonFakeTimers;
module: TestingModule;
cronJob: DocServiceCronJob;
config: Config;
adapter: PgWorkspaceDocStorageAdapter;
models: Models;
}
const test = ava as TestFn<Context>;
// cleanup database before each test
test.before(async t => {
t.context.timer = Sinon.useFakeTimers({
toFake: ['setInterval'],
});
t.context.module = await createTestingModule({
imports: [ScheduleModule.forRoot(), DocStorageModule, DocServiceModule],
});
t.context.cronJob = t.context.module.get(DocServiceCronJob);
t.context.config = t.context.module.get(Config);
t.context.adapter = t.context.module.get(PgWorkspaceDocStorageAdapter);
t.context.models = t.context.module.get(Models);
});
test.beforeEach(async t => {
await t.context.module.initTestingDB();
});
test.afterEach(async t => {
t.context.timer.restore();
Sinon.restore();
mock.reset();
});
test.after.always(async t => {
await t.context.module.close();
});
test('should poll when interval due', async t => {
const cronJob = t.context.cronJob;
const interval = t.context.config.doc.manager.updatePollInterval;
let resolve: any;
const fake = mock.method(cronJob, 'autoMergePendingDocUpdates', () => {
return new Promise(_resolve => {
resolve = _resolve;
});
});
t.context.timer.tick(interval);
t.is(fake.mock.callCount(), 1);
// busy
t.context.timer.tick(interval);
// @ts-expect-error private member
t.is(cronJob.busy, true);
t.is(fake.mock.callCount(), 1);
resolve();
await t.context.timer.tickAsync(1);
// @ts-expect-error private member
t.is(cronJob.busy, false);
t.context.timer.tick(interval);
t.is(fake.mock.callCount(), 2);
});
test('should auto merge pending doc updates', async t => {
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await t.context.adapter.pushDocUpdates('2', '2', updates);
await t.context.cronJob.autoMergePendingDocUpdates();
const rows = await t.context.models.doc.findUpdates('2', '2');
t.is(rows.length, 0);
// again should merge nothing
await t.context.cronJob.autoMergePendingDocUpdates();
});

View File

@@ -1,4 +1,4 @@
import { Controller, Get, Param, Res } from '@nestjs/common'; import { Controller, Get, Logger, Param, Res } from '@nestjs/common';
import type { Response } from 'express'; import type { Response } from 'express';
import { NotFound, SkipThrottle } from '../../base'; import { NotFound, SkipThrottle } from '../../base';
@@ -7,12 +7,14 @@ import { PgWorkspaceDocStorageAdapter } from '../doc';
@Controller('/rpc') @Controller('/rpc')
export class DocRpcController { export class DocRpcController {
private readonly logger = new Logger(DocRpcController.name);
constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {} constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {}
@SkipThrottle() @SkipThrottle()
@Internal() @Internal()
@Get('/workspaces/:workspaceId/docs/:docId') @Get('/workspaces/:workspaceId/docs/:docId')
async render( async getDoc(
@Param('workspaceId') workspaceId: string, @Param('workspaceId') workspaceId: string,
@Param('docId') docId: string, @Param('docId') docId: string,
@Res() res: Response @Res() res: Response
@@ -21,6 +23,7 @@ export class DocRpcController {
if (!doc) { if (!doc) {
throw new NotFound('Doc not found'); throw new NotFound('Doc not found');
} }
this.logger.log(`get doc ${docId} from workspace ${workspaceId}`);
res.setHeader('x-doc-timestamp', doc.timestamp.toString()); res.setHeader('x-doc-timestamp', doc.timestamp.toString());
if (doc.editor) { if (doc.editor) {
res.setHeader('x-doc-editor-id', doc.editor); res.setHeader('x-doc-editor-id', doc.editor);

View File

@@ -2,9 +2,11 @@ import { Module } from '@nestjs/common';
import { DocStorageModule } from '../doc'; import { DocStorageModule } from '../doc';
import { DocRpcController } from './controller'; import { DocRpcController } from './controller';
import { DocServiceCronJob } from './job';
@Module({ @Module({
imports: [DocStorageModule], imports: [DocStorageModule],
providers: [DocServiceCronJob],
controllers: [DocRpcController], controllers: [DocRpcController],
}) })
export class DocServiceModule {} export class DocServiceModule {}

View File

@@ -0,0 +1,61 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
import { CLS_ID, ClsService } from 'nestjs-cls';
import { CallMetric, Config, genRequestId, metrics } from '../../base';
import { PgWorkspaceDocStorageAdapter } from '../doc';
@Injectable()
export class DocServiceCronJob implements OnModuleInit {
private busy = false;
private readonly logger = new Logger(DocServiceCronJob.name);
constructor(
private readonly config: Config,
private readonly cls: ClsService,
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly registry: SchedulerRegistry
) {}
onModuleInit() {
if (this.config.doc.manager.enableUpdateAutoMerging) {
this.registry.addInterval(
this.autoMergePendingDocUpdates.name,
// scheduler registry will clean up the interval when the app is stopped
setInterval(() => {
if (this.busy) {
return;
}
this.busy = true;
this.autoMergePendingDocUpdates()
.catch(() => {
/* never fail */
})
.finally(() => {
this.busy = false;
});
}, this.config.doc.manager.updatePollInterval)
);
this.logger.log('Updates pending queue auto merging cron started');
}
}
@CallMetric('doc', 'auto_merge_pending_doc_updates')
async autoMergePendingDocUpdates() {
await this.cls.run(async () => {
this.cls.set(CLS_ID, genRequestId('job'));
try {
const randomDoc = await this.workspace.randomDoc();
if (!randomDoc) {
return;
}
await this.workspace.getDoc(randomDoc.workspaceId, randomDoc.docId);
} catch (e) {
metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1);
this.logger.error('Failed to auto merge pending doc updates', e);
}
});
}
}

View File

@@ -334,7 +334,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
}); });
if (updatedSnapshot) { if (updatedSnapshot) {
this.event.emit('doc.snapshot.updated', { this.event.broadcast('doc.snapshot.updated', {
workspaceId: snapshot.spaceId, workspaceId: snapshot.spaceId,
docId: snapshot.docId, docId: snapshot.docId,
}); });

View File

@@ -19,7 +19,11 @@ import { DocReader, DocReaderProvider } from './reader';
DocStorageCronJob, DocStorageCronJob,
DocReaderProvider, DocReaderProvider,
], ],
exports: [PgWorkspaceDocStorageAdapter, PgUserspaceDocStorageAdapter], exports: [
DocReader,
PgWorkspaceDocStorageAdapter,
PgUserspaceDocStorageAdapter,
],
}) })
export class DocStorageModule {} export class DocStorageModule {}
export { export {

View File

@@ -1,61 +1,17 @@
import { Injectable, Logger, OnModuleInit, Optional } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'; import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client'; import { PrismaClient } from '@prisma/client';
import { CallMetric, Config, metrics, OnEvent } from '../../base'; import { metrics, OnEvent } from '../../base';
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
@Injectable() @Injectable()
export class DocStorageCronJob implements OnModuleInit { export class DocStorageCronJob {
private busy = false;
private readonly logger = new Logger(DocStorageCronJob.name);
constructor( constructor(
private readonly config: Config,
private readonly db: PrismaClient, private readonly db: PrismaClient,
private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly workspace: PgWorkspaceDocStorageAdapter
@Optional() private readonly registry?: SchedulerRegistry
) {} ) {}
onModuleInit() {
if (this.registry && this.config.doc.manager.enableUpdateAutoMerging) {
this.registry.addInterval(
this.autoMergePendingDocUpdates.name,
// scheduler registry will clean up the interval when the app is stopped
setInterval(() => {
if (this.busy) {
return;
}
this.busy = true;
this.autoMergePendingDocUpdates()
.catch(() => {
/* never fail */
})
.finally(() => {
this.busy = false;
});
}, this.config.doc.manager.updatePollInterval)
);
this.logger.log('Updates pending queue auto merging cron started');
}
}
@CallMetric('doc', 'auto_merge_pending_doc_updates')
async autoMergePendingDocUpdates() {
try {
const randomDoc = await this.workspace.randomDoc();
if (!randomDoc) {
return;
}
await this.workspace.getDoc(randomDoc.workspaceId, randomDoc.docId);
} catch (e) {
metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1);
this.logger.error('Failed to auto merge pending doc updates', e);
}
}
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */) @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */)
async cleanupExpiredHistory() { async cleanupExpiredHistory() {
await this.db.snapshotHistory.deleteMany({ await this.db.snapshotHistory.deleteMany({

View File

@@ -13,6 +13,7 @@ import {
} from '../../base'; } from '../../base';
import { CurrentUser, Public } from '../auth'; import { CurrentUser, Public } from '../auth';
import { PgWorkspaceDocStorageAdapter } from '../doc'; import { PgWorkspaceDocStorageAdapter } from '../doc';
import { DocReader } from '../doc/reader';
import { PermissionService, PublicDocMode } from '../permission'; import { PermissionService, PublicDocMode } from '../permission';
import { WorkspaceBlobStorage } from '../storage'; import { WorkspaceBlobStorage } from '../storage';
import { DocID } from '../utils/doc'; import { DocID } from '../utils/doc';
@@ -24,6 +25,7 @@ export class WorkspacesController {
private readonly storage: WorkspaceBlobStorage, private readonly storage: WorkspaceBlobStorage,
private readonly permission: PermissionService, private readonly permission: PermissionService,
private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly docReader: DocReader,
private readonly prisma: PrismaClient private readonly prisma: PrismaClient
) {} ) {}
@@ -95,7 +97,7 @@ export class WorkspacesController {
throw new AccessDenied(); throw new AccessDenied();
} }
const binResponse = await this.workspace.getDoc( const binResponse = await this.docReader.getDoc(
docId.workspace, docId.workspace,
docId.guid docId.guid
); );