diff --git a/packages/backend/server/src/core/doc-service/job.ts b/packages/backend/server/src/core/doc-service/job.ts index 088c7630b2..4cef938059 100644 --- a/packages/backend/server/src/core/doc-service/job.ts +++ b/packages/backend/server/src/core/doc-service/job.ts @@ -1,6 +1,8 @@ import { Injectable } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { PrismaClient } from '@prisma/client'; -import { OnJob } from '../../base'; +import { JobQueue, OnJob } from '../../base'; import { PgWorkspaceDocStorageAdapter } from '../doc'; declare global { @@ -14,7 +16,11 @@ declare global { @Injectable() export class DocServiceCronJob { - constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {} + constructor( + private readonly workspace: PgWorkspaceDocStorageAdapter, + private readonly prisma: PrismaClient, + private readonly job: JobQueue + ) {} @OnJob('doc.mergePendingDocUpdates') async mergePendingDocUpdates({ @@ -23,4 +29,29 @@ export class DocServiceCronJob { }: Jobs['doc.mergePendingDocUpdates']) { await this.workspace.getDoc(workspaceId, docId); } + + @Cron(CronExpression.EVERY_10_SECONDS) + async schedule() { + const group = await this.prisma.update.groupBy({ + by: ['workspaceId', 'id'], + _count: true, + }); + + for (const update of group) { + if (update._count > 100) { + await this.job.add( + 'doc.mergePendingDocUpdates', + { + workspaceId: update.workspaceId, + docId: update.id, + }, + { + jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`, + priority: update._count, + delay: 0, + } + ); + } + } + } }