Compare commits

...

2 Commits

Author SHA1 Message Date
EYHN
b427a89c9a fix(core): fix awareness send message repeatedly (#10643) 2025-03-10 12:26:57 +08:00
forehalo
00398fc63a fix(server): reschedule busy doc merging 2025-03-03 18:38:05 +08:00
2 changed files with 34 additions and 3 deletions

View File

@@ -1,6 +1,8 @@
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import { OnJob } from '../../base';
import { JobQueue, OnJob } from '../../base';
import { PgWorkspaceDocStorageAdapter } from '../doc';
declare global {
@@ -14,7 +16,11 @@ declare global {
@Injectable()
export class DocServiceCronJob {
constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {}
constructor(
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly prisma: PrismaClient,
private readonly job: JobQueue
) {}
@OnJob('doc.mergePendingDocUpdates')
async mergePendingDocUpdates({
@@ -23,4 +29,29 @@ export class DocServiceCronJob {
}: Jobs['doc.mergePendingDocUpdates']) {
await this.workspace.getDoc(workspaceId, docId);
}
@Cron(CronExpression.EVERY_10_SECONDS)
async schedule() {
const group = await this.prisma.update.groupBy({
by: ['workspaceId', 'id'],
_count: true,
});
for (const update of group) {
if (update._count > 100) {
await this.job.add(
'doc.mergePendingDocUpdates',
{
workspaceId: update.workspaceId,
docId: update.id,
},
{
jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`,
priority: update._count,
delay: 0,
}
);
}
}
}
}

View File

@@ -47,7 +47,7 @@ export class AwarenessFrontend {
return;
}
applyAwarenessUpdate(awareness, update.bin, origin);
applyAwarenessUpdate(awareness, update.bin, uniqueId);
};
const handleSyncCollect = () => {
return Promise.resolve({