From d975bf46fbe4273a0b4e706c45eff8a1da0d84c1 Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Sun, 5 Apr 2026 10:52:18 +0800 Subject: [PATCH] feat(server): improve calendar sync queue (#14783) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### PR Dependency Tree * **PR #14783** 👈 This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal) ## Summary by CodeRabbit * **New Features** * Configurable request timeout for calendar integrations. * Calendar polling now enqueues per-subscription sync jobs (larger batch) for improved throughput. * **Bug Fixes / Improvements** * Persisted next-sync timestamps and retry counts for more reliable scheduling and retry behavior. * Exponential backoff and webhook renewal now update scheduling consistently. * **Refactor** * Calendar sync flow moved to a job-queue-driven design for better concurrency and observability. --- .docker/selfhost/schema.json | 20 +- .../migration.sql | 27 ++ packages/backend/server/schema.prisma | 4 + .../server/src/base/job/queue/config.ts | 8 + .../backend/server/src/base/job/queue/def.ts | 1 + .../server/src/models/calendar-account.ts | 2 +- .../src/models/calendar-subscription.ts | 44 ++-- .../calendar/__tests__/service.spec.ts | 189 +++++++------- .../server/src/plugins/calendar/config.ts | 4 + .../server/src/plugins/calendar/cron.ts | 69 ++--- .../server/src/plugins/calendar/index.ts | 2 + .../server/src/plugins/calendar/job.ts | 26 ++ .../src/plugins/calendar/providers/def.ts | 17 ++ .../src/plugins/calendar/providers/google.ts | 1 + .../server/src/plugins/calendar/service.ts | 239 ++++++++---------- packages/frontend/admin/src/config.json | 4 + .../i18n/src/i18n-completenesses.json | 2 +- 17 files changed, 352 insertions(+), 307 deletions(-) create mode 100644 packages/backend/server/migrations/20260405013956_calendar_queue_schedule/migration.sql create mode 100644 packages/backend/server/src/plugins/calendar/job.ts diff --git a/.docker/selfhost/schema.json b/.docker/selfhost/schema.json index 362e29f0b6..e959273733 100644 --- a/.docker/selfhost/schema.json +++ b/.docker/selfhost/schema.json @@ -62,6 +62,18 @@ "concurrency": 10 } }, + "queues.calendar": { + "type": "object", + "description": "The config for calendar job queue\n@default {\"concurrency\":4}", + "properties": { + "concurrency": { + "type": "number" + } + }, + "default": { + "concurrency": 4 + } + }, "queues.doc": { "type": "object", "description": "The config for doc job queue\n@default {\"concurrency\":1}", @@ -843,7 +855,7 @@ "properties": { "google": { "type": "object", - "description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\"}\n@link https://developers.google.com/calendar/api/guides/push", + "description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\",\"requestTimeoutMs\":10000}\n@link https://developers.google.com/calendar/api/guides/push", "properties": { "enabled": { "type": "boolean" @@ -859,6 +871,9 @@ }, "webhookVerificationToken": { "type": "string" + }, + "requestTimeoutMs": { + "type": "number" } }, "default": { @@ -866,7 +881,8 @@ "clientId": "", "clientSecret": "", "externalWebhookUrl": "", - "webhookVerificationToken": "" + "webhookVerificationToken": "", + "requestTimeoutMs": 10000 } }, "caldav": { diff --git a/packages/backend/server/migrations/20260405013956_calendar_queue_schedule/migration.sql b/packages/backend/server/migrations/20260405013956_calendar_queue_schedule/migration.sql new file mode 100644 index 0000000000..2b34bcc9e6 --- /dev/null +++ b/packages/backend/server/migrations/20260405013956_calendar_queue_schedule/migration.sql @@ -0,0 +1,27 @@ +-- AlterTable +ALTER TABLE + "calendar_subscriptions" +ADD + COLUMN "next_sync_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, +ADD + COLUMN "sync_retry_count" INTEGER NOT NULL DEFAULT 0; + +UPDATE + "calendar_subscriptions" AS s +SET + "next_sync_at" = CASE + WHEN s."last_sync_at" IS NULL THEN CURRENT_TIMESTAMP + ELSE s."last_sync_at" + make_interval( + mins => COALESCE(a."refresh_interval_minutes", 30) + ) + END +FROM + "calendar_accounts" AS a +WHERE + a."id" = s."account_id"; + +-- CreateIndex +CREATE INDEX "calendar_subscriptions_custom_channel_id_idx" ON "calendar_subscriptions"("custom_channel_id"); + +-- CreateIndex +CREATE INDEX "calendar_subscriptions_enabled_next_sync_at_idx" ON "calendar_subscriptions"("enabled", "next_sync_at"); diff --git a/packages/backend/server/schema.prisma b/packages/backend/server/schema.prisma index c8f82c77fb..6657227ccd 100644 --- a/packages/backend/server/schema.prisma +++ b/packages/backend/server/schema.prisma @@ -1037,6 +1037,8 @@ model CalendarSubscription { enabled Boolean @default(true) syncToken String? @map("sync_token") @db.Text lastSyncAt DateTime? @map("last_sync_at") @db.Timestamptz(3) + nextSyncAt DateTime @default(now()) @map("next_sync_at") @db.Timestamptz(3) + syncRetryCount Int @default(0) @map("sync_retry_count") customChannelId String? @map("custom_channel_id") @db.VarChar customResourceId String? @map("custom_resource_id") @db.VarChar channelExpiration DateTime? @map("channel_expiration") @db.Timestamptz(3) @@ -1050,6 +1052,8 @@ model CalendarSubscription { @@unique([accountId, externalCalendarId]) @@index([accountId]) @@index([provider, externalCalendarId]) + @@index([customChannelId]) + @@index([enabled, nextSyncAt]) @@map("calendar_subscriptions") } diff --git a/packages/backend/server/src/base/job/queue/config.ts b/packages/backend/server/src/base/job/queue/config.ts index c677bb3062..ebb9b95ebb 100644 --- a/packages/backend/server/src/base/job/queue/config.ts +++ b/packages/backend/server/src/base/job/queue/config.ts @@ -55,6 +55,14 @@ defineModuleConfig('job', { schema, }, + 'queues.calendar': { + desc: 'The config for calendar job queue', + default: { + concurrency: 4, + }, + schema, + }, + 'queues.doc': { desc: 'The config for doc job queue', default: { diff --git a/packages/backend/server/src/base/job/queue/def.ts b/packages/backend/server/src/base/job/queue/def.ts index 50e18eb8ea..f19ae5b832 100644 --- a/packages/backend/server/src/base/job/queue/def.ts +++ b/packages/backend/server/src/base/job/queue/def.ts @@ -28,6 +28,7 @@ export enum Queue { DOC = 'doc', COPILOT = 'copilot', INDEXER = 'indexer', + CALENDAR = 'calendar', } export const QUEUES = Object.values(Queue); diff --git a/packages/backend/server/src/models/calendar-account.ts b/packages/backend/server/src/models/calendar-account.ts index d92e2ee4ca..cae4c4ba53 100644 --- a/packages/backend/server/src/models/calendar-account.ts +++ b/packages/backend/server/src/models/calendar-account.ts @@ -92,7 +92,7 @@ export class CalendarAccountModel extends BaseModel { scope: input.scope ?? null, status: input.status ?? 'active', lastError: input.lastError ?? null, - refreshIntervalMinutes: input.refreshIntervalMinutes ?? 60, + refreshIntervalMinutes: input.refreshIntervalMinutes ?? 30, }; const updateData: Prisma.CalendarAccountUncheckedUpdateInput = { diff --git a/packages/backend/server/src/models/calendar-subscription.ts b/packages/backend/server/src/models/calendar-subscription.ts index 5a1f413678..87d6e86287 100644 --- a/packages/backend/server/src/models/calendar-subscription.ts +++ b/packages/backend/server/src/models/calendar-subscription.ts @@ -17,6 +17,8 @@ export interface UpsertCalendarSubscriptionInput { export interface UpdateCalendarSubscriptionSyncInput { syncToken?: string | null; lastSyncAt?: Date | null; + nextSyncAt?: Date; + syncRetryCount?: number; } export interface UpdateCalendarSubscriptionChannelInput { @@ -81,13 +83,21 @@ export class CalendarSubscriptionModel extends BaseModel { } async updateSync(id: string, input: UpdateCalendarSubscriptionSyncInput) { - return await this.db.calendarSubscription.update({ - where: { id }, - data: { - syncToken: input.syncToken ?? null, - lastSyncAt: input.lastSyncAt ?? null, - }, - }); + const data: Prisma.CalendarSubscriptionUncheckedUpdateInput = {}; + if (input.syncToken !== undefined) { + data.syncToken = input.syncToken ?? null; + } + if (input.lastSyncAt !== undefined) { + data.lastSyncAt = input.lastSyncAt ?? null; + } + if (input.nextSyncAt !== undefined) { + data.nextSyncAt = input.nextSyncAt; + } + if (input.syncRetryCount !== undefined) { + data.syncRetryCount = input.syncRetryCount; + } + + return await this.db.calendarSubscription.update({ where: { id }, data }); } async updateChannel( @@ -155,10 +165,16 @@ export class CalendarSubscriptionModel extends BaseModel { }); } - async listAllWithAccountForSync() { + async listDueForSync(now: Date, limit: number) { return await this.db.calendarSubscription.findMany({ - where: { enabled: true }, - include: { account: true }, + where: { + enabled: true, + nextSyncAt: { lte: now }, + account: { status: 'active' }, + }, + select: { id: true }, + orderBy: { nextSyncAt: 'asc' }, + take: limit, }); } @@ -169,13 +185,6 @@ export class CalendarSubscriptionModel extends BaseModel { }); } - async updateLastSyncAt(id: string, lastSyncAt: Date) { - return await this.db.calendarSubscription.update({ - where: { id }, - data: { lastSyncAt }, - }); - } - async clearSyncTokensByAccount(accountId: string) { return await this.db.calendarSubscription.updateMany({ where: { accountId }, @@ -200,6 +209,7 @@ export class CalendarSubscriptionModel extends BaseModel { data: { enabled: false, syncToken: null, + syncRetryCount: 0, customChannelId: null, customResourceId: null, channelExpiration: null, diff --git a/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts b/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts index bc8ba059a9..8604497f3f 100644 --- a/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts +++ b/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts @@ -5,11 +5,7 @@ import test from 'ava'; import { createModule } from '../../../__tests__/create-module'; import { Mockers } from '../../../__tests__/mocks'; -import { - CalendarProviderRequestError, - CryptoHelper, - Mutex, -} from '../../../base'; +import { CalendarProviderRequestError, CryptoHelper } from '../../../base'; import { ConfigModule } from '../../../base/config'; import { ServerConfigModule } from '../../../core/config'; import type { @@ -93,7 +89,6 @@ const calendarService = module.get(CalendarService); const calendarCronJobs = module.get(CalendarCronJobs); const providerFactory = module.get(CalendarProviderFactory); const models = module.get(Models); -const mutex = module.get(Mutex); module.get(CryptoHelper).onConfigInit(); const createAccount = async ( @@ -120,6 +115,8 @@ const createSubscription = async ( accountId: string, overrides: Partial & { syncToken?: string | null; + nextSyncAt?: Date; + syncRetryCount?: number; customChannelId?: string | null; customResourceId?: string | null; channelExpiration?: Date | null; @@ -141,6 +138,20 @@ const createSubscription = async ( }); } + if ( + overrides.nextSyncAt !== undefined || + overrides.syncRetryCount !== undefined + ) { + await models.calendarSubscription.updateSync(subscription.id, { + ...(overrides.nextSyncAt !== undefined + ? { nextSyncAt: overrides.nextSyncAt } + : {}), + ...(overrides.syncRetryCount !== undefined + ? { syncRetryCount: overrides.syncRetryCount } + : {}), + }); + } + if ( overrides.customChannelId !== undefined || overrides.customResourceId !== undefined || @@ -158,6 +169,8 @@ const createSubscription = async ( test.afterEach.always(() => { mock.reset(); + module.queue.add.resetHistory(); + module.queue.remove.resetHistory(); }); test.after.always(async () => { @@ -259,6 +272,9 @@ test('syncSubscription resets invalid sync token and maps events', async t => { const updated = await models.calendarSubscription.get(subscription.id); t.is(updated?.syncToken, 'next-token'); t.truthy(updated?.lastSyncAt); + t.is(updated?.syncRetryCount, 0); + t.truthy(updated?.nextSyncAt); + t.true(updated!.nextSyncAt.getTime() > updated!.lastSyncAt!.getTime()); const events = await models.calendarEvent.listBySubscriptionsInRange( [subscription.id], @@ -500,51 +516,22 @@ test('syncSubscription applies exponential backoff for repeated failures', async mock.method(Date, 'now', () => now); await calendarService.syncSubscription(subscription.id); - await calendarService.syncSubscription(subscription.id); + let updated = await models.calendarSubscription.get(subscription.id); t.is(listEventsMock.mock.callCount(), 1); + t.is(updated?.syncRetryCount, 1); + t.is( + updated?.nextSyncAt.toISOString(), + new Date(now + baseDelayMs).toISOString() + ); - now += baseDelayMs + 1000; await calendarService.syncSubscription(subscription.id); + updated = await models.calendarSubscription.get(subscription.id); t.is(listEventsMock.mock.callCount(), 2); - - now += baseDelayMs + 1000; - await calendarService.syncSubscription(subscription.id); - t.is(listEventsMock.mock.callCount(), 2); -}); - -test('syncSubscription skips token refresh while in backoff window', async t => { - let now = new Date('2026-01-01T00:00:00.000Z').getTime(); - mock.method(Date, 'now', () => now); - - const user = await module.create(Mockers.User); - const account = await createAccount(user.id, { - accessToken: 'expired-access-token', - expiresAt: new Date(now - 5 * 60 * 1000), - }); - const subscription = await createSubscription(account.id, { - syncToken: 'sync-token', - }); - - const provider = new MockCalendarProvider(); - const refreshMock = mock.method(provider, 'refreshTokens', async () => ({ - accessToken: `refreshed-${randomUUID()}`, - })); - const listEventsMock = mock.method(provider, 'listEvents', async () => { - throw new Error('upstream timeout'); - }); - mock.method(providerFactory, 'get', () => provider); - - const baseDelayMs = 5 * 60 * 1000; - - await calendarService.syncSubscription(subscription.id); - await calendarService.syncSubscription(subscription.id); - t.is(refreshMock.mock.callCount(), 1); - t.is(listEventsMock.mock.callCount(), 1); - - now += baseDelayMs + 1000; - await calendarService.syncSubscription(subscription.id); - t.is(refreshMock.mock.callCount(), 2); - t.is(listEventsMock.mock.callCount(), 2); + t.is(updated?.syncRetryCount, 2); + t.is( + updated?.nextSyncAt.toISOString(), + new Date(now + baseDelayMs * 2).toISOString() + ); }); test('syncSubscription renews webhook channel when expiring', async t => { @@ -607,64 +594,72 @@ test('syncSubscription renews webhook channel when expiring', async t => { t.truthy(updated?.channelExpiration); }); -test('pollAccounts skips syncing when cluster lock is unavailable', async t => { - mock.method(mutex, 'acquire', async () => undefined); - mock.method( - models.calendarSubscription, - 'listAllWithAccountForSync', - async () => [] - ); - const syncAccountMock = mock.method( - calendarService, - 'syncAccount', - async () => { - return; - } - ); +test('syncSubscription keeps schedule moving when webhook renewal fails', async t => { + const now = new Date('2026-01-01T00:00:00.000Z').getTime(); + mock.method(Date, 'now', () => now); - await calendarCronJobs.pollAccounts(); + const user = await module.create(Mockers.User); + const account = await createAccount(user.id, { + refreshIntervalMinutes: 60, + }); + const subscription = await createSubscription(account.id, { + syncToken: 'sync-token', + channelExpiration: new Date(Date.now() + 60 * 60 * 1000), + }); - t.is(syncAccountMock.mock.callCount(), 0); -}); - -test('pollAccounts only syncs due accounts', async t => { - mock.method(mutex, 'acquire', async () => ({ - [Symbol.asyncDispose]: async () => {}, + const provider = new MockCalendarProvider(); + mock.method(provider, 'listEvents', async () => ({ + events: [], + nextSyncToken: 'next-sync', })); - mock.method( - models.calendarSubscription, - 'listAllWithAccountForSync', - async () => - [ - { - accountId: 'due-account', - lastSyncAt: new Date(Date.now() - 31 * 60 * 1000), - account: { - refreshIntervalMinutes: 30, - }, - }, - { - accountId: 'fresh-account', - lastSyncAt: new Date(Date.now() - 5 * 60 * 1000), - account: { - refreshIntervalMinutes: 30, - }, - }, - ] as any - ); + mock.method(provider, 'watchCalendar', async () => { + throw new Error('watch failed'); + }); + mock.method(providerFactory, 'get', () => provider); - const syncAccountMock = mock.method( - calendarService, - 'syncAccount', - async () => { - return; - } + await calendarService.syncSubscription(subscription.id); + + const updated = await models.calendarSubscription.get(subscription.id); + t.truthy(updated?.lastSyncAt); + t.is(updated?.syncRetryCount, 0); + t.is( + updated?.nextSyncAt.toISOString(), + new Date(now + 15 * 60 * 1000).toISOString() ); +}); + +test('pollAccounts skips when nothing is due', async t => { + mock.method(models.calendarSubscription, 'listDueForSync', async () => []); await calendarCronJobs.pollAccounts(); + t.is(module.queue.count('calendar.syncSubscription'), 0); +}); + +test('pollAccounts enqueues due subscriptions only', async t => { + mock.method(models.calendarSubscription, 'listDueForSync', async () => [ + { id: 'due-subscription-a' }, + { id: 'due-subscription-b' }, + ]); + + await calendarCronJobs.pollAccounts(); + + t.is(module.queue.count('calendar.syncSubscription'), 2); t.deepEqual( - syncAccountMock.mock.calls.map(call => call.arguments[0]), - ['due-account'] + module.queue.add + .getCalls() + .map(call => [call.args[0], call.args[1], call.args[2]]), + [ + [ + 'calendar.syncSubscription', + { subscriptionId: 'due-subscription-a', reason: 'polling' }, + { jobId: 'due-subscription-a' }, + ], + [ + 'calendar.syncSubscription', + { subscriptionId: 'due-subscription-b', reason: 'polling' }, + { jobId: 'due-subscription-b' }, + ], + ] ); }); diff --git a/packages/backend/server/src/plugins/calendar/config.ts b/packages/backend/server/src/plugins/calendar/config.ts index 8ee8872019..4136d2298f 100644 --- a/packages/backend/server/src/plugins/calendar/config.ts +++ b/packages/backend/server/src/plugins/calendar/config.ts @@ -8,6 +8,7 @@ export interface CalendarGoogleConfig { clientSecret: string; externalWebhookUrl?: string; webhookVerificationToken?: string; + requestTimeoutMs?: number; } export type CalendarCalDAVAuthType = 'auto' | 'basic' | 'digest'; @@ -49,6 +50,7 @@ const schema: JSONSchema = { clientSecret: { type: 'string' }, externalWebhookUrl: { type: 'string' }, webhookVerificationToken: { type: 'string' }, + requestTimeoutMs: { type: 'number' }, }, }; @@ -88,6 +90,7 @@ defineModuleConfig('calendar', { clientSecret: '', externalWebhookUrl: '', webhookVerificationToken: '', + requestTimeoutMs: 10_000, }, schema, shape: z.object({ @@ -101,6 +104,7 @@ defineModuleConfig('calendar', { .or(z.string().length(0)) .optional(), webhookVerificationToken: z.string().optional(), + requestTimeoutMs: z.number().int().positive().optional(), }), link: 'https://developers.google.com/calendar/api/guides/push', }, diff --git a/packages/backend/server/src/plugins/calendar/cron.ts b/packages/backend/server/src/plugins/calendar/cron.ts index b9474a170d..6fed172975 100644 --- a/packages/backend/server/src/plugins/calendar/cron.ts +++ b/packages/backend/server/src/plugins/calendar/cron.ts @@ -1,72 +1,33 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; -import { chunk } from 'lodash-es'; -import { Mutex } from '../../base'; +import { JobQueue } from '../../base'; import { Models } from '../../models'; -import { CalendarService } from './service'; -const CALENDAR_POLL_LOCK_KEY = 'calendar:poll-accounts'; -const CALENDAR_POLL_BATCH_SIZE = 10; +const CALENDAR_POLL_BATCH_SIZE = 200; @Injectable() export class CalendarCronJobs { constructor( private readonly models: Models, - private readonly calendar: CalendarService, - private readonly mutex: Mutex + private readonly queue: JobQueue ) {} @Cron(CronExpression.EVERY_MINUTE) async pollAccounts() { - await using lock = await this.mutex.acquire(CALENDAR_POLL_LOCK_KEY); - if (!lock) return; + const subscriptions = await this.models.calendarSubscription.listDueForSync( + new Date(), + CALENDAR_POLL_BATCH_SIZE + ); - const subscriptions = - await this.models.calendarSubscription.listAllWithAccountForSync(); - - const accountDueAt = new Map< - string, - { refreshInterval: number; lastSyncAt: Date | null } - >(); - - for (const subscription of subscriptions) { - const interval = subscription.account.refreshIntervalMinutes ?? 60; - const lastSyncAt = subscription.lastSyncAt ?? null; - const existing = accountDueAt.get(subscription.accountId); - if (!existing) { - accountDueAt.set(subscription.accountId, { - refreshInterval: interval, - lastSyncAt, - }); - continue; - } - - const earliest = - existing.lastSyncAt && lastSyncAt - ? existing.lastSyncAt < lastSyncAt - ? existing.lastSyncAt - : lastSyncAt - : (existing.lastSyncAt ?? lastSyncAt); - accountDueAt.set(subscription.accountId, { - refreshInterval: interval, - lastSyncAt: earliest, - }); - } - - const now = Date.now(); - const dueAccountIds = Array.from(accountDueAt.entries()) - .filter( - ([, info]) => - !info.lastSyncAt || - now - info.lastSyncAt.getTime() >= info.refreshInterval * 60 * 1000 + await Promise.allSettled( + subscriptions.map(({ id }) => + this.queue.add( + 'calendar.syncSubscription', + { subscriptionId: id, reason: 'polling' }, + { jobId: id } + ) ) - .map(([accountId]) => accountId); - - for (const accountIds of chunk(dueAccountIds, CALENDAR_POLL_BATCH_SIZE)) { - await Promise.allSettled( - accountIds.map(accountId => this.calendar.syncAccount(accountId)) - ); - } + ); } } diff --git a/packages/backend/server/src/plugins/calendar/index.ts b/packages/backend/server/src/plugins/calendar/index.ts index 25e82ad5ab..a01a57141f 100644 --- a/packages/backend/server/src/plugins/calendar/index.ts +++ b/packages/backend/server/src/plugins/calendar/index.ts @@ -7,6 +7,7 @@ import { PermissionModule } from '../../core/permission'; import { WorkspaceModule } from '../../core/workspaces'; import { CalendarController } from './controller'; import { CalendarCronJobs } from './cron'; +import { CalendarJob } from './job'; import { CalendarOAuthService } from './oauth'; import { CalendarProviderFactory, CalendarProviders } from './providers'; import { @@ -25,6 +26,7 @@ import { CalendarService } from './service'; ...CalendarProviders, CalendarProviderFactory, CalendarService, + CalendarJob, CalendarOAuthService, CalendarCronJobs, CalendarServerConfigResolver, diff --git a/packages/backend/server/src/plugins/calendar/job.ts b/packages/backend/server/src/plugins/calendar/job.ts new file mode 100644 index 0000000000..9296b5a851 --- /dev/null +++ b/packages/backend/server/src/plugins/calendar/job.ts @@ -0,0 +1,26 @@ +import { Injectable } from '@nestjs/common'; + +import { OnJob } from '../../base'; +import { CalendarService } from './service'; + +declare global { + interface Jobs { + 'calendar.syncSubscription': { + subscriptionId: string; + reason?: 'polling' | 'webhook' | 'on-demand'; + }; + } +} + +@Injectable() +export class CalendarJob { + constructor(private readonly calendar: CalendarService) {} + + @OnJob('calendar.syncSubscription') + async syncSubscription({ + subscriptionId, + reason, + }: Jobs['calendar.syncSubscription']) { + await this.calendar.syncSubscription(subscriptionId, { reason }); + } +} diff --git a/packages/backend/server/src/plugins/calendar/providers/def.ts b/packages/backend/server/src/plugins/calendar/providers/def.ts index 02ad30482a..240fc47e66 100644 --- a/packages/backend/server/src/plugins/calendar/providers/def.ts +++ b/packages/backend/server/src/plugins/calendar/providers/def.ts @@ -152,9 +152,26 @@ export abstract class CalendarProvider { } } + protected get requestTimeoutMs() { + const timeout = (this.config as { requestTimeoutMs?: number } | undefined) + ?.requestTimeoutMs; + return typeof timeout === 'number' && timeout > 0 ? timeout : undefined; + } + + protected withTimeout(signal?: AbortSignal | null) { + const timeoutMs = this.requestTimeoutMs; + if (!timeoutMs) return signal; + + const timeoutSignal = AbortSignal.timeout(timeoutMs); + if (!signal) return timeoutSignal; + + return AbortSignal.any([signal, timeoutSignal]); + } + protected async fetchJson(url: string, init?: RequestInit) { const response = await fetch(url, { ...init, + signal: this.withTimeout(init?.signal), headers: { ...init?.headers, Accept: 'application/json' }, }); const body = await response.text(); diff --git a/packages/backend/server/src/plugins/calendar/providers/google.ts b/packages/backend/server/src/plugins/calendar/providers/google.ts index 475f55e060..92f5bc3e67 100644 --- a/packages/backend/server/src/plugins/calendar/providers/google.ts +++ b/packages/backend/server/src/plugins/calendar/providers/google.ts @@ -329,6 +329,7 @@ export class GoogleCalendarProvider extends CalendarProvider { private async fetchWithTokenHandling(url: string, accessToken: string) { const response = await fetch(url, { + signal: this.withTimeout(), headers: { Accept: 'application/json', Authorization: `Bearer ${accessToken}`, diff --git a/packages/backend/server/src/plugins/calendar/service.ts b/packages/backend/server/src/plugins/calendar/service.ts index 0eea2aae20..7fdf6683ce 100644 --- a/packages/backend/server/src/plugins/calendar/service.ts +++ b/packages/backend/server/src/plugins/calendar/service.ts @@ -4,16 +4,15 @@ import { Injectable, Logger } from '@nestjs/common'; import { Transactional } from '@nestjs-cls/transactional'; import type { CalendarAccount, Prisma } from '@prisma/client'; import { addDays, subDays } from 'date-fns'; -import { chunk } from 'lodash-es'; import { CalendarProviderRequestError, Config, + exponentialBackoffDelay, GraphqlBadRequest, - Mutex, + JobQueue, URLHelper, } from '../../base'; -import { SessionRedis } from '../../base/redis'; import { Models } from '../../models'; import type { CalendarCalDAVProviderPreset } from './config'; import { @@ -29,11 +28,10 @@ import type { LinkCalDAVAccountInput } from './types'; const TOKEN_REFRESH_SKEW_MS = 60 * 1000; const DEFAULT_PAST_DAYS = 90; const DEFAULT_FUTURE_DAYS = 180; -const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:'; const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000; const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000; -const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60; -const ACCOUNT_SYNC_BATCH_SIZE = 10; +const DEFAULT_REFRESH_INTERVAL_MINUTES = 30; +const CHANNEL_RENEW_RETRY_MS = 15 * 60 * 1000; @Injectable() export class CalendarService { @@ -43,8 +41,7 @@ export class CalendarService { constructor( private readonly models: Models, private readonly providerFactory: CalendarProviderFactory, - private readonly mutex: Mutex, - private readonly redis: SessionRedis, + private readonly queue: JobQueue, private readonly config: Config, private readonly url: URLHelper ) {} @@ -87,10 +84,24 @@ export class CalendarService { return null; } - return await this.models.calendarAccount.updateRefreshInterval( - accountId, - refreshIntervalMinutes + const updatedAccount = + await this.models.calendarAccount.updateRefreshInterval( + accountId, + refreshIntervalMinutes + ); + const subscriptions = + await this.models.calendarSubscription.listByAccountForSync(accountId); + await Promise.all( + subscriptions.map(subscription => + this.models.calendarSubscription.updateSync(subscription.id, { + nextSyncAt: this.calculateNextSyncAt( + subscription.lastSyncAt ?? this.now(), + refreshIntervalMinutes + ), + }) + ) ); + return updatedAccount; } async unlinkAccount(userId: string, accountId: string) { @@ -315,25 +326,6 @@ export class CalendarService { return; } - const now = Date.now(); - const backoff = await this.getSyncFailureBackoff(subscription.id); - if (backoff && now < backoff.nextRetryAt.getTime()) { - return; - } - - await using lock = await this.mutex.acquire( - `calendar:subscription:${subscriptionId}` - ); - if (!lock) { - return; - } - - const lockedNow = Date.now(); - const lockedBackoff = await this.getSyncFailureBackoff(subscription.id); - if (lockedBackoff && lockedNow < lockedBackoff.nextRetryAt.getTime()) { - return; - } - const provider = this.providerFactory.get( account.provider as CalendarProviderName ); @@ -417,30 +409,27 @@ export class CalendarService { } if (synced) { - await this.clearSyncFailureBackoff(subscription.id); - await this.ensureWebhookChannel(subscription, provider, accessToken); - } - - await this.models.calendarSubscription.updateLastSyncAt( - subscription.id, - new Date() - ); - } - - async syncAccount(accountId: string) { - const account = await this.models.calendarAccount.get(accountId); - if (!account || account.status !== 'active') { - return; - } - - const subscriptions = - await this.models.calendarSubscription.listByAccountForSync(accountId); - for (const batch of chunk(subscriptions, ACCOUNT_SYNC_BATCH_SIZE)) { - await Promise.allSettled( - batch.map(subscription => - this.syncSubscription(subscription.id, { reason: 'polling' }) - ) + const syncedAt = this.now(); + let nextSyncAt = this.calculateNextSyncAt( + syncedAt, + account.refreshIntervalMinutes ); + + try { + await this.ensureWebhookChannel(subscription, provider, accessToken); + } catch (error) { + nextSyncAt = this.calculateChannelRetryAt(nextSyncAt); + this.logger.warn( + `Failed to ensure webhook channel for subscription ${subscription.id}`, + this.toError(error) + ); + } + + await this.models.calendarSubscription.updateSync(subscription.id, { + lastSyncAt: syncedAt, + nextSyncAt, + syncRetryCount: 0, + }); } } @@ -459,9 +448,18 @@ export class CalendarService { params.to ); + const subscriptions = + await this.models.calendarSubscription.listWithAccounts(subscriptionIds); + const staleSubscriptions = subscriptions.filter( + subscription => + subscription.enabled && + subscription.account.status === 'active' && + subscription.nextSyncAt.getTime() <= this.nowMs() + ); + Promise.allSettled( - subscriptionIds.map(subscriptionId => - this.syncSubscription(subscriptionId, { reason: 'on-demand' }) + staleSubscriptions.map(subscription => + this.enqueueSyncSubscription(subscription.id, 'on-demand') ) ).catch(error => { this.logger.warn('Calendar on-demand sync failed', error as Error); @@ -517,7 +515,7 @@ export class CalendarService { return; } - await this.syncSubscription(subscription.id, { reason: 'webhook' }); + await this.enqueueSyncSubscription(subscription.id, 'webhook'); } getWebhookToken() { @@ -751,7 +749,7 @@ export class CalendarService { } private getSyncWindow() { - const now = new Date(); + const now = this.now(); return { timeMin: subDays(now, DEFAULT_PAST_DAYS).toISOString(), timeMax: addDays(now, DEFAULT_FUTURE_DAYS).toISOString(), @@ -771,7 +769,7 @@ export class CalendarService { if ( accessToken && account.expiresAt && - account.expiresAt.getTime() > Date.now() + TOKEN_REFRESH_SKEW_MS + account.expiresAt.getTime() > this.nowMs() + TOKEN_REFRESH_SKEW_MS ) { return { accessToken }; } @@ -835,7 +833,7 @@ export class CalendarService { return; } - const renewThreshold = Date.now() + 24 * 60 * 60 * 1000; + const renewThreshold = this.nowMs() + 24 * 60 * 60 * 1000; if ( subscription.channelExpiration && subscription.channelExpiration.getTime() > renewThreshold @@ -877,6 +875,7 @@ export class CalendarService { subscription: { id: string; externalCalendarId: string; + syncRetryCount: number; customChannelId: string | null; customResourceId: string | null; }; @@ -899,7 +898,6 @@ export class CalendarService { } if (this.isTokenInvalidError(params.error)) { - await this.clearSyncFailureBackoff(params.subscription.id); await this.models.calendarAccount.invalidateAndPurge( params.account.id, this.formatSyncError(params.error) @@ -907,18 +905,14 @@ export class CalendarService { return; } - const backoff = await this.bumpSyncFailureBackoff(params.subscription.id); - const interval = params.account.refreshIntervalMinutes ?? 60; - const lastSyncAt = this.calculateLastSyncAtForRetry( - backoff.nextRetryAt, - interval - ); - await this.models.calendarSubscription.updateLastSyncAt( - params.subscription.id, - lastSyncAt - ); + const attempt = params.subscription.syncRetryCount + 1; + const nextRetryAt = this.calculateFailureRetryAt(attempt); + await this.models.calendarSubscription.updateSync(params.subscription.id, { + nextSyncAt: nextRetryAt, + syncRetryCount: attempt, + }); this.logger.warn( - `Calendar sync failed for subscription ${params.subscription.id}, attempt ${backoff.attempt}, next retry at ${backoff.nextRetryAt.toISOString()}`, + `Calendar sync failed for subscription ${params.subscription.id}, attempt ${attempt}, next retry at ${nextRetryAt.toISOString()}`, this.toError(params.error) ); } @@ -931,15 +925,6 @@ export class CalendarService { return status === 404; } - private calculateLastSyncAtForRetry( - nextRetryAt: Date, - refreshIntervalMinutes: number - ) { - // Cron schedules by `now - lastSyncAt >= refreshInterval`, so back-calculate - // a synthetic lastSyncAt to defer the next attempt to `nextRetryAt`. - return new Date(nextRetryAt.getTime() - refreshIntervalMinutes * 60 * 1000); - } - private async disableSubscription(params: { subscriptionId: string; provider: CalendarProvider; @@ -970,68 +955,52 @@ export class CalendarService { await this.models.calendarSubscription.disableAndPurge( params.subscriptionId ); - await this.clearSyncFailureBackoff(params.subscriptionId); } - private getSyncFailureBackoffKey(subscriptionId: string) { - return `${SYNC_FAILURE_BACKOFF_KEY_PREFIX}${subscriptionId}`; - } - - private async getSyncFailureBackoff(subscriptionId: string) { - const key = this.getSyncFailureBackoffKey(subscriptionId); - const value = await this.redis.get(key); - if (!value) { - return null; - } - - try { - const parsed = JSON.parse(value) as { - attempt?: number; - nextRetryAt?: string; - }; - if (!parsed.attempt || !parsed.nextRetryAt) { - return null; + async enqueueSyncSubscription( + subscriptionId: string, + reason: 'polling' | 'webhook' | 'on-demand' + ) { + await this.queue.add( + 'calendar.syncSubscription', + { + subscriptionId, + reason, + }, + { + jobId: subscriptionId, } - const nextRetryAt = new Date(parsed.nextRetryAt); - if (Number.isNaN(nextRetryAt.getTime())) { - return null; - } - return { - attempt: parsed.attempt, - nextRetryAt, - }; - } catch { - return null; - } + ); } - private async bumpSyncFailureBackoff(subscriptionId: string) { - const state = await this.getSyncFailureBackoff(subscriptionId); - const attempt = (state?.attempt ?? 0) + 1; - const delay = Math.min( - SYNC_FAILURE_BACKOFF_BASE_MS * 2 ** (attempt - 1), - SYNC_FAILURE_BACKOFF_MAX_MS - ); - const nextRetryAt = new Date(Date.now() + delay); - const key = this.getSyncFailureBackoffKey(subscriptionId); - await this.redis.set( - key, - JSON.stringify({ - attempt, - nextRetryAt: nextRetryAt.toISOString(), - }), - 'EX', - SYNC_FAILURE_BACKOFF_TTL_SECONDS - ); - return { - attempt, - nextRetryAt, - }; + private calculateNextSyncAt(base: Date, refreshIntervalMinutes?: number) { + const intervalMinutes = + refreshIntervalMinutes ?? DEFAULT_REFRESH_INTERVAL_MINUTES; + return new Date(base.getTime() + intervalMinutes * 60 * 1000); } - private async clearSyncFailureBackoff(subscriptionId: string) { - const key = this.getSyncFailureBackoffKey(subscriptionId); - await this.redis.del(key); + private calculateChannelRetryAt(nextSyncAt: Date) { + return new Date( + Math.min(nextSyncAt.getTime(), this.nowMs() + CHANNEL_RENEW_RETRY_MS) + ); + } + + private calculateFailureRetryAt(attempt: number) { + return new Date( + this.nowMs() + + exponentialBackoffDelay(attempt - 1, { + baseDelayMs: SYNC_FAILURE_BACKOFF_BASE_MS, + maxDelayMs: SYNC_FAILURE_BACKOFF_MAX_MS, + }) + ); + } + + private now() { + return new Date(this.nowMs()); + } + + private nowMs() { + return Date.now(); } private formatSyncError(error: unknown) { diff --git a/packages/frontend/admin/src/config.json b/packages/frontend/admin/src/config.json index b3af7a15f5..9230308ef3 100644 --- a/packages/frontend/admin/src/config.json +++ b/packages/frontend/admin/src/config.json @@ -27,6 +27,10 @@ "type": "Object", "desc": "The config for copilot job queue" }, + "queues.calendar": { + "type": "Object", + "desc": "The config for calendar job queue" + }, "queues.doc": { "type": "Object", "desc": "The config for doc job queue" diff --git a/packages/frontend/i18n/src/i18n-completenesses.json b/packages/frontend/i18n/src/i18n-completenesses.json index 5fb6bcb216..afb78e7497 100644 --- a/packages/frontend/i18n/src/i18n-completenesses.json +++ b/packages/frontend/i18n/src/i18n-completenesses.json @@ -17,7 +17,7 @@ "nb-NO": 47, "pl": 98, "pt-BR": 96, - "ru": 97, + "ru": 98, "sv-SE": 96, "uk": 96, "ur": 2,