feat(server): improve calendar sync queue (#14783)

#### PR Dependency Tree


* **PR #14783** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
  * Configurable request timeout for calendar integrations.
* Calendar polling now enqueues per-subscription sync jobs (larger
batch) for improved throughput.

* **Bug Fixes / Improvements**
* Persisted next-sync timestamps and retry counts for more reliable
scheduling and retry behavior.
* Exponential backoff and webhook renewal now update scheduling
consistently.

* **Refactor**
* Calendar sync flow moved to a job-queue-driven design for better
concurrency and observability.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-04-05 10:52:18 +08:00
committed by GitHub
parent bfcf7fc2ba
commit d975bf46fb
17 changed files with 352 additions and 307 deletions
+18 -2
View File
@@ -62,6 +62,18 @@
"concurrency": 10
}
},
"queues.calendar": {
"type": "object",
"description": "The config for calendar job queue\n@default {\"concurrency\":4}",
"properties": {
"concurrency": {
"type": "number"
}
},
"default": {
"concurrency": 4
}
},
"queues.doc": {
"type": "object",
"description": "The config for doc job queue\n@default {\"concurrency\":1}",
@@ -843,7 +855,7 @@
"properties": {
"google": {
"type": "object",
"description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\"}\n@link https://developers.google.com/calendar/api/guides/push",
"description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\",\"requestTimeoutMs\":10000}\n@link https://developers.google.com/calendar/api/guides/push",
"properties": {
"enabled": {
"type": "boolean"
@@ -859,6 +871,9 @@
},
"webhookVerificationToken": {
"type": "string"
},
"requestTimeoutMs": {
"type": "number"
}
},
"default": {
@@ -866,7 +881,8 @@
"clientId": "",
"clientSecret": "",
"externalWebhookUrl": "",
"webhookVerificationToken": ""
"webhookVerificationToken": "",
"requestTimeoutMs": 10000
}
},
"caldav": {
@@ -0,0 +1,27 @@
-- AlterTable
ALTER TABLE
"calendar_subscriptions"
ADD
COLUMN "next_sync_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD
COLUMN "sync_retry_count" INTEGER NOT NULL DEFAULT 0;
UPDATE
"calendar_subscriptions" AS s
SET
"next_sync_at" = CASE
WHEN s."last_sync_at" IS NULL THEN CURRENT_TIMESTAMP
ELSE s."last_sync_at" + make_interval(
mins => COALESCE(a."refresh_interval_minutes", 30)
)
END
FROM
"calendar_accounts" AS a
WHERE
a."id" = s."account_id";
-- CreateIndex
CREATE INDEX "calendar_subscriptions_custom_channel_id_idx" ON "calendar_subscriptions"("custom_channel_id");
-- CreateIndex
CREATE INDEX "calendar_subscriptions_enabled_next_sync_at_idx" ON "calendar_subscriptions"("enabled", "next_sync_at");
+4
View File
@@ -1037,6 +1037,8 @@ model CalendarSubscription {
enabled Boolean @default(true)
syncToken String? @map("sync_token") @db.Text
lastSyncAt DateTime? @map("last_sync_at") @db.Timestamptz(3)
nextSyncAt DateTime @default(now()) @map("next_sync_at") @db.Timestamptz(3)
syncRetryCount Int @default(0) @map("sync_retry_count")
customChannelId String? @map("custom_channel_id") @db.VarChar
customResourceId String? @map("custom_resource_id") @db.VarChar
channelExpiration DateTime? @map("channel_expiration") @db.Timestamptz(3)
@@ -1050,6 +1052,8 @@ model CalendarSubscription {
@@unique([accountId, externalCalendarId])
@@index([accountId])
@@index([provider, externalCalendarId])
@@index([customChannelId])
@@index([enabled, nextSyncAt])
@@map("calendar_subscriptions")
}
@@ -55,6 +55,14 @@ defineModuleConfig('job', {
schema,
},
'queues.calendar': {
desc: 'The config for calendar job queue',
default: {
concurrency: 4,
},
schema,
},
'queues.doc': {
desc: 'The config for doc job queue',
default: {
@@ -28,6 +28,7 @@ export enum Queue {
DOC = 'doc',
COPILOT = 'copilot',
INDEXER = 'indexer',
CALENDAR = 'calendar',
}
export const QUEUES = Object.values(Queue);
@@ -92,7 +92,7 @@ export class CalendarAccountModel extends BaseModel {
scope: input.scope ?? null,
status: input.status ?? 'active',
lastError: input.lastError ?? null,
refreshIntervalMinutes: input.refreshIntervalMinutes ?? 60,
refreshIntervalMinutes: input.refreshIntervalMinutes ?? 30,
};
const updateData: Prisma.CalendarAccountUncheckedUpdateInput = {
@@ -17,6 +17,8 @@ export interface UpsertCalendarSubscriptionInput {
export interface UpdateCalendarSubscriptionSyncInput {
syncToken?: string | null;
lastSyncAt?: Date | null;
nextSyncAt?: Date;
syncRetryCount?: number;
}
export interface UpdateCalendarSubscriptionChannelInput {
@@ -81,13 +83,21 @@ export class CalendarSubscriptionModel extends BaseModel {
}
async updateSync(id: string, input: UpdateCalendarSubscriptionSyncInput) {
return await this.db.calendarSubscription.update({
where: { id },
data: {
syncToken: input.syncToken ?? null,
lastSyncAt: input.lastSyncAt ?? null,
},
});
const data: Prisma.CalendarSubscriptionUncheckedUpdateInput = {};
if (input.syncToken !== undefined) {
data.syncToken = input.syncToken ?? null;
}
if (input.lastSyncAt !== undefined) {
data.lastSyncAt = input.lastSyncAt ?? null;
}
if (input.nextSyncAt !== undefined) {
data.nextSyncAt = input.nextSyncAt;
}
if (input.syncRetryCount !== undefined) {
data.syncRetryCount = input.syncRetryCount;
}
return await this.db.calendarSubscription.update({ where: { id }, data });
}
async updateChannel(
@@ -155,10 +165,16 @@ export class CalendarSubscriptionModel extends BaseModel {
});
}
async listAllWithAccountForSync() {
async listDueForSync(now: Date, limit: number) {
return await this.db.calendarSubscription.findMany({
where: { enabled: true },
include: { account: true },
where: {
enabled: true,
nextSyncAt: { lte: now },
account: { status: 'active' },
},
select: { id: true },
orderBy: { nextSyncAt: 'asc' },
take: limit,
});
}
@@ -169,13 +185,6 @@ export class CalendarSubscriptionModel extends BaseModel {
});
}
async updateLastSyncAt(id: string, lastSyncAt: Date) {
return await this.db.calendarSubscription.update({
where: { id },
data: { lastSyncAt },
});
}
async clearSyncTokensByAccount(accountId: string) {
return await this.db.calendarSubscription.updateMany({
where: { accountId },
@@ -200,6 +209,7 @@ export class CalendarSubscriptionModel extends BaseModel {
data: {
enabled: false,
syncToken: null,
syncRetryCount: 0,
customChannelId: null,
customResourceId: null,
channelExpiration: null,
@@ -5,11 +5,7 @@ import test from 'ava';
import { createModule } from '../../../__tests__/create-module';
import { Mockers } from '../../../__tests__/mocks';
import {
CalendarProviderRequestError,
CryptoHelper,
Mutex,
} from '../../../base';
import { CalendarProviderRequestError, CryptoHelper } from '../../../base';
import { ConfigModule } from '../../../base/config';
import { ServerConfigModule } from '../../../core/config';
import type {
@@ -93,7 +89,6 @@ const calendarService = module.get(CalendarService);
const calendarCronJobs = module.get(CalendarCronJobs);
const providerFactory = module.get(CalendarProviderFactory);
const models = module.get(Models);
const mutex = module.get(Mutex);
module.get(CryptoHelper).onConfigInit();
const createAccount = async (
@@ -120,6 +115,8 @@ const createSubscription = async (
accountId: string,
overrides: Partial<UpsertCalendarSubscriptionInput> & {
syncToken?: string | null;
nextSyncAt?: Date;
syncRetryCount?: number;
customChannelId?: string | null;
customResourceId?: string | null;
channelExpiration?: Date | null;
@@ -141,6 +138,20 @@ const createSubscription = async (
});
}
if (
overrides.nextSyncAt !== undefined ||
overrides.syncRetryCount !== undefined
) {
await models.calendarSubscription.updateSync(subscription.id, {
...(overrides.nextSyncAt !== undefined
? { nextSyncAt: overrides.nextSyncAt }
: {}),
...(overrides.syncRetryCount !== undefined
? { syncRetryCount: overrides.syncRetryCount }
: {}),
});
}
if (
overrides.customChannelId !== undefined ||
overrides.customResourceId !== undefined ||
@@ -158,6 +169,8 @@ const createSubscription = async (
test.afterEach.always(() => {
mock.reset();
module.queue.add.resetHistory();
module.queue.remove.resetHistory();
});
test.after.always(async () => {
@@ -259,6 +272,9 @@ test('syncSubscription resets invalid sync token and maps events', async t => {
const updated = await models.calendarSubscription.get(subscription.id);
t.is(updated?.syncToken, 'next-token');
t.truthy(updated?.lastSyncAt);
t.is(updated?.syncRetryCount, 0);
t.truthy(updated?.nextSyncAt);
t.true(updated!.nextSyncAt.getTime() > updated!.lastSyncAt!.getTime());
const events = await models.calendarEvent.listBySubscriptionsInRange(
[subscription.id],
@@ -500,51 +516,22 @@ test('syncSubscription applies exponential backoff for repeated failures', async
mock.method(Date, 'now', () => now);
await calendarService.syncSubscription(subscription.id);
await calendarService.syncSubscription(subscription.id);
let updated = await models.calendarSubscription.get(subscription.id);
t.is(listEventsMock.mock.callCount(), 1);
t.is(updated?.syncRetryCount, 1);
t.is(
updated?.nextSyncAt.toISOString(),
new Date(now + baseDelayMs).toISOString()
);
now += baseDelayMs + 1000;
await calendarService.syncSubscription(subscription.id);
updated = await models.calendarSubscription.get(subscription.id);
t.is(listEventsMock.mock.callCount(), 2);
now += baseDelayMs + 1000;
await calendarService.syncSubscription(subscription.id);
t.is(listEventsMock.mock.callCount(), 2);
});
test('syncSubscription skips token refresh while in backoff window', async t => {
let now = new Date('2026-01-01T00:00:00.000Z').getTime();
mock.method(Date, 'now', () => now);
const user = await module.create(Mockers.User);
const account = await createAccount(user.id, {
accessToken: 'expired-access-token',
expiresAt: new Date(now - 5 * 60 * 1000),
});
const subscription = await createSubscription(account.id, {
syncToken: 'sync-token',
});
const provider = new MockCalendarProvider();
const refreshMock = mock.method(provider, 'refreshTokens', async () => ({
accessToken: `refreshed-${randomUUID()}`,
}));
const listEventsMock = mock.method(provider, 'listEvents', async () => {
throw new Error('upstream timeout');
});
mock.method(providerFactory, 'get', () => provider);
const baseDelayMs = 5 * 60 * 1000;
await calendarService.syncSubscription(subscription.id);
await calendarService.syncSubscription(subscription.id);
t.is(refreshMock.mock.callCount(), 1);
t.is(listEventsMock.mock.callCount(), 1);
now += baseDelayMs + 1000;
await calendarService.syncSubscription(subscription.id);
t.is(refreshMock.mock.callCount(), 2);
t.is(listEventsMock.mock.callCount(), 2);
t.is(updated?.syncRetryCount, 2);
t.is(
updated?.nextSyncAt.toISOString(),
new Date(now + baseDelayMs * 2).toISOString()
);
});
test('syncSubscription renews webhook channel when expiring', async t => {
@@ -607,64 +594,72 @@ test('syncSubscription renews webhook channel when expiring', async t => {
t.truthy(updated?.channelExpiration);
});
test('pollAccounts skips syncing when cluster lock is unavailable', async t => {
mock.method(mutex, 'acquire', async () => undefined);
mock.method(
models.calendarSubscription,
'listAllWithAccountForSync',
async () => []
);
const syncAccountMock = mock.method(
calendarService,
'syncAccount',
async () => {
return;
}
);
test('syncSubscription keeps schedule moving when webhook renewal fails', async t => {
const now = new Date('2026-01-01T00:00:00.000Z').getTime();
mock.method(Date, 'now', () => now);
await calendarCronJobs.pollAccounts();
const user = await module.create(Mockers.User);
const account = await createAccount(user.id, {
refreshIntervalMinutes: 60,
});
const subscription = await createSubscription(account.id, {
syncToken: 'sync-token',
channelExpiration: new Date(Date.now() + 60 * 60 * 1000),
});
t.is(syncAccountMock.mock.callCount(), 0);
});
test('pollAccounts only syncs due accounts', async t => {
mock.method(mutex, 'acquire', async () => ({
[Symbol.asyncDispose]: async () => {},
const provider = new MockCalendarProvider();
mock.method(provider, 'listEvents', async () => ({
events: [],
nextSyncToken: 'next-sync',
}));
mock.method(
models.calendarSubscription,
'listAllWithAccountForSync',
async () =>
[
{
accountId: 'due-account',
lastSyncAt: new Date(Date.now() - 31 * 60 * 1000),
account: {
refreshIntervalMinutes: 30,
},
},
{
accountId: 'fresh-account',
lastSyncAt: new Date(Date.now() - 5 * 60 * 1000),
account: {
refreshIntervalMinutes: 30,
},
},
] as any
);
mock.method(provider, 'watchCalendar', async () => {
throw new Error('watch failed');
});
mock.method(providerFactory, 'get', () => provider);
const syncAccountMock = mock.method(
calendarService,
'syncAccount',
async () => {
return;
}
await calendarService.syncSubscription(subscription.id);
const updated = await models.calendarSubscription.get(subscription.id);
t.truthy(updated?.lastSyncAt);
t.is(updated?.syncRetryCount, 0);
t.is(
updated?.nextSyncAt.toISOString(),
new Date(now + 15 * 60 * 1000).toISOString()
);
});
test('pollAccounts skips when nothing is due', async t => {
mock.method(models.calendarSubscription, 'listDueForSync', async () => []);
await calendarCronJobs.pollAccounts();
t.is(module.queue.count('calendar.syncSubscription'), 0);
});
test('pollAccounts enqueues due subscriptions only', async t => {
mock.method(models.calendarSubscription, 'listDueForSync', async () => [
{ id: 'due-subscription-a' },
{ id: 'due-subscription-b' },
]);
await calendarCronJobs.pollAccounts();
t.is(module.queue.count('calendar.syncSubscription'), 2);
t.deepEqual(
syncAccountMock.mock.calls.map(call => call.arguments[0]),
['due-account']
module.queue.add
.getCalls()
.map(call => [call.args[0], call.args[1], call.args[2]]),
[
[
'calendar.syncSubscription',
{ subscriptionId: 'due-subscription-a', reason: 'polling' },
{ jobId: 'due-subscription-a' },
],
[
'calendar.syncSubscription',
{ subscriptionId: 'due-subscription-b', reason: 'polling' },
{ jobId: 'due-subscription-b' },
],
]
);
});
@@ -8,6 +8,7 @@ export interface CalendarGoogleConfig {
clientSecret: string;
externalWebhookUrl?: string;
webhookVerificationToken?: string;
requestTimeoutMs?: number;
}
export type CalendarCalDAVAuthType = 'auto' | 'basic' | 'digest';
@@ -49,6 +50,7 @@ const schema: JSONSchema = {
clientSecret: { type: 'string' },
externalWebhookUrl: { type: 'string' },
webhookVerificationToken: { type: 'string' },
requestTimeoutMs: { type: 'number' },
},
};
@@ -88,6 +90,7 @@ defineModuleConfig('calendar', {
clientSecret: '',
externalWebhookUrl: '',
webhookVerificationToken: '',
requestTimeoutMs: 10_000,
},
schema,
shape: z.object({
@@ -101,6 +104,7 @@ defineModuleConfig('calendar', {
.or(z.string().length(0))
.optional(),
webhookVerificationToken: z.string().optional(),
requestTimeoutMs: z.number().int().positive().optional(),
}),
link: 'https://developers.google.com/calendar/api/guides/push',
},
@@ -1,72 +1,33 @@
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { chunk } from 'lodash-es';
import { Mutex } from '../../base';
import { JobQueue } from '../../base';
import { Models } from '../../models';
import { CalendarService } from './service';
const CALENDAR_POLL_LOCK_KEY = 'calendar:poll-accounts';
const CALENDAR_POLL_BATCH_SIZE = 10;
const CALENDAR_POLL_BATCH_SIZE = 200;
@Injectable()
export class CalendarCronJobs {
constructor(
private readonly models: Models,
private readonly calendar: CalendarService,
private readonly mutex: Mutex
private readonly queue: JobQueue
) {}
@Cron(CronExpression.EVERY_MINUTE)
async pollAccounts() {
await using lock = await this.mutex.acquire(CALENDAR_POLL_LOCK_KEY);
if (!lock) return;
const subscriptions = await this.models.calendarSubscription.listDueForSync(
new Date(),
CALENDAR_POLL_BATCH_SIZE
);
const subscriptions =
await this.models.calendarSubscription.listAllWithAccountForSync();
const accountDueAt = new Map<
string,
{ refreshInterval: number; lastSyncAt: Date | null }
>();
for (const subscription of subscriptions) {
const interval = subscription.account.refreshIntervalMinutes ?? 60;
const lastSyncAt = subscription.lastSyncAt ?? null;
const existing = accountDueAt.get(subscription.accountId);
if (!existing) {
accountDueAt.set(subscription.accountId, {
refreshInterval: interval,
lastSyncAt,
});
continue;
}
const earliest =
existing.lastSyncAt && lastSyncAt
? existing.lastSyncAt < lastSyncAt
? existing.lastSyncAt
: lastSyncAt
: (existing.lastSyncAt ?? lastSyncAt);
accountDueAt.set(subscription.accountId, {
refreshInterval: interval,
lastSyncAt: earliest,
});
}
const now = Date.now();
const dueAccountIds = Array.from(accountDueAt.entries())
.filter(
([, info]) =>
!info.lastSyncAt ||
now - info.lastSyncAt.getTime() >= info.refreshInterval * 60 * 1000
await Promise.allSettled(
subscriptions.map(({ id }) =>
this.queue.add(
'calendar.syncSubscription',
{ subscriptionId: id, reason: 'polling' },
{ jobId: id }
)
)
.map(([accountId]) => accountId);
for (const accountIds of chunk(dueAccountIds, CALENDAR_POLL_BATCH_SIZE)) {
await Promise.allSettled(
accountIds.map(accountId => this.calendar.syncAccount(accountId))
);
}
);
}
}
@@ -7,6 +7,7 @@ import { PermissionModule } from '../../core/permission';
import { WorkspaceModule } from '../../core/workspaces';
import { CalendarController } from './controller';
import { CalendarCronJobs } from './cron';
import { CalendarJob } from './job';
import { CalendarOAuthService } from './oauth';
import { CalendarProviderFactory, CalendarProviders } from './providers';
import {
@@ -25,6 +26,7 @@ import { CalendarService } from './service';
...CalendarProviders,
CalendarProviderFactory,
CalendarService,
CalendarJob,
CalendarOAuthService,
CalendarCronJobs,
CalendarServerConfigResolver,
@@ -0,0 +1,26 @@
import { Injectable } from '@nestjs/common';
import { OnJob } from '../../base';
import { CalendarService } from './service';
declare global {
interface Jobs {
'calendar.syncSubscription': {
subscriptionId: string;
reason?: 'polling' | 'webhook' | 'on-demand';
};
}
}
@Injectable()
export class CalendarJob {
constructor(private readonly calendar: CalendarService) {}
@OnJob('calendar.syncSubscription')
async syncSubscription({
subscriptionId,
reason,
}: Jobs['calendar.syncSubscription']) {
await this.calendar.syncSubscription(subscriptionId, { reason });
}
}
@@ -152,9 +152,26 @@ export abstract class CalendarProvider {
}
}
protected get requestTimeoutMs() {
const timeout = (this.config as { requestTimeoutMs?: number } | undefined)
?.requestTimeoutMs;
return typeof timeout === 'number' && timeout > 0 ? timeout : undefined;
}
protected withTimeout(signal?: AbortSignal | null) {
const timeoutMs = this.requestTimeoutMs;
if (!timeoutMs) return signal;
const timeoutSignal = AbortSignal.timeout(timeoutMs);
if (!signal) return timeoutSignal;
return AbortSignal.any([signal, timeoutSignal]);
}
protected async fetchJson<T>(url: string, init?: RequestInit) {
const response = await fetch(url, {
...init,
signal: this.withTimeout(init?.signal),
headers: { ...init?.headers, Accept: 'application/json' },
});
const body = await response.text();
@@ -329,6 +329,7 @@ export class GoogleCalendarProvider extends CalendarProvider {
private async fetchWithTokenHandling<T>(url: string, accessToken: string) {
const response = await fetch(url, {
signal: this.withTimeout(),
headers: {
Accept: 'application/json',
Authorization: `Bearer ${accessToken}`,
@@ -4,16 +4,15 @@ import { Injectable, Logger } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import type { CalendarAccount, Prisma } from '@prisma/client';
import { addDays, subDays } from 'date-fns';
import { chunk } from 'lodash-es';
import {
CalendarProviderRequestError,
Config,
exponentialBackoffDelay,
GraphqlBadRequest,
Mutex,
JobQueue,
URLHelper,
} from '../../base';
import { SessionRedis } from '../../base/redis';
import { Models } from '../../models';
import type { CalendarCalDAVProviderPreset } from './config';
import {
@@ -29,11 +28,10 @@ import type { LinkCalDAVAccountInput } from './types';
const TOKEN_REFRESH_SKEW_MS = 60 * 1000;
const DEFAULT_PAST_DAYS = 90;
const DEFAULT_FUTURE_DAYS = 180;
const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:';
const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000;
const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000;
const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60;
const ACCOUNT_SYNC_BATCH_SIZE = 10;
const DEFAULT_REFRESH_INTERVAL_MINUTES = 30;
const CHANNEL_RENEW_RETRY_MS = 15 * 60 * 1000;
@Injectable()
export class CalendarService {
@@ -43,8 +41,7 @@ export class CalendarService {
constructor(
private readonly models: Models,
private readonly providerFactory: CalendarProviderFactory<CalendarProvider>,
private readonly mutex: Mutex,
private readonly redis: SessionRedis,
private readonly queue: JobQueue,
private readonly config: Config,
private readonly url: URLHelper
) {}
@@ -87,10 +84,24 @@ export class CalendarService {
return null;
}
return await this.models.calendarAccount.updateRefreshInterval(
accountId,
refreshIntervalMinutes
const updatedAccount =
await this.models.calendarAccount.updateRefreshInterval(
accountId,
refreshIntervalMinutes
);
const subscriptions =
await this.models.calendarSubscription.listByAccountForSync(accountId);
await Promise.all(
subscriptions.map(subscription =>
this.models.calendarSubscription.updateSync(subscription.id, {
nextSyncAt: this.calculateNextSyncAt(
subscription.lastSyncAt ?? this.now(),
refreshIntervalMinutes
),
})
)
);
return updatedAccount;
}
async unlinkAccount(userId: string, accountId: string) {
@@ -315,25 +326,6 @@ export class CalendarService {
return;
}
const now = Date.now();
const backoff = await this.getSyncFailureBackoff(subscription.id);
if (backoff && now < backoff.nextRetryAt.getTime()) {
return;
}
await using lock = await this.mutex.acquire(
`calendar:subscription:${subscriptionId}`
);
if (!lock) {
return;
}
const lockedNow = Date.now();
const lockedBackoff = await this.getSyncFailureBackoff(subscription.id);
if (lockedBackoff && lockedNow < lockedBackoff.nextRetryAt.getTime()) {
return;
}
const provider = this.providerFactory.get(
account.provider as CalendarProviderName
);
@@ -417,30 +409,27 @@ export class CalendarService {
}
if (synced) {
await this.clearSyncFailureBackoff(subscription.id);
await this.ensureWebhookChannel(subscription, provider, accessToken);
}
await this.models.calendarSubscription.updateLastSyncAt(
subscription.id,
new Date()
);
}
async syncAccount(accountId: string) {
const account = await this.models.calendarAccount.get(accountId);
if (!account || account.status !== 'active') {
return;
}
const subscriptions =
await this.models.calendarSubscription.listByAccountForSync(accountId);
for (const batch of chunk(subscriptions, ACCOUNT_SYNC_BATCH_SIZE)) {
await Promise.allSettled(
batch.map(subscription =>
this.syncSubscription(subscription.id, { reason: 'polling' })
)
const syncedAt = this.now();
let nextSyncAt = this.calculateNextSyncAt(
syncedAt,
account.refreshIntervalMinutes
);
try {
await this.ensureWebhookChannel(subscription, provider, accessToken);
} catch (error) {
nextSyncAt = this.calculateChannelRetryAt(nextSyncAt);
this.logger.warn(
`Failed to ensure webhook channel for subscription ${subscription.id}`,
this.toError(error)
);
}
await this.models.calendarSubscription.updateSync(subscription.id, {
lastSyncAt: syncedAt,
nextSyncAt,
syncRetryCount: 0,
});
}
}
@@ -459,9 +448,18 @@ export class CalendarService {
params.to
);
const subscriptions =
await this.models.calendarSubscription.listWithAccounts(subscriptionIds);
const staleSubscriptions = subscriptions.filter(
subscription =>
subscription.enabled &&
subscription.account.status === 'active' &&
subscription.nextSyncAt.getTime() <= this.nowMs()
);
Promise.allSettled(
subscriptionIds.map(subscriptionId =>
this.syncSubscription(subscriptionId, { reason: 'on-demand' })
staleSubscriptions.map(subscription =>
this.enqueueSyncSubscription(subscription.id, 'on-demand')
)
).catch(error => {
this.logger.warn('Calendar on-demand sync failed', error as Error);
@@ -517,7 +515,7 @@ export class CalendarService {
return;
}
await this.syncSubscription(subscription.id, { reason: 'webhook' });
await this.enqueueSyncSubscription(subscription.id, 'webhook');
}
getWebhookToken() {
@@ -751,7 +749,7 @@ export class CalendarService {
}
private getSyncWindow() {
const now = new Date();
const now = this.now();
return {
timeMin: subDays(now, DEFAULT_PAST_DAYS).toISOString(),
timeMax: addDays(now, DEFAULT_FUTURE_DAYS).toISOString(),
@@ -771,7 +769,7 @@ export class CalendarService {
if (
accessToken &&
account.expiresAt &&
account.expiresAt.getTime() > Date.now() + TOKEN_REFRESH_SKEW_MS
account.expiresAt.getTime() > this.nowMs() + TOKEN_REFRESH_SKEW_MS
) {
return { accessToken };
}
@@ -835,7 +833,7 @@ export class CalendarService {
return;
}
const renewThreshold = Date.now() + 24 * 60 * 60 * 1000;
const renewThreshold = this.nowMs() + 24 * 60 * 60 * 1000;
if (
subscription.channelExpiration &&
subscription.channelExpiration.getTime() > renewThreshold
@@ -877,6 +875,7 @@ export class CalendarService {
subscription: {
id: string;
externalCalendarId: string;
syncRetryCount: number;
customChannelId: string | null;
customResourceId: string | null;
};
@@ -899,7 +898,6 @@ export class CalendarService {
}
if (this.isTokenInvalidError(params.error)) {
await this.clearSyncFailureBackoff(params.subscription.id);
await this.models.calendarAccount.invalidateAndPurge(
params.account.id,
this.formatSyncError(params.error)
@@ -907,18 +905,14 @@ export class CalendarService {
return;
}
const backoff = await this.bumpSyncFailureBackoff(params.subscription.id);
const interval = params.account.refreshIntervalMinutes ?? 60;
const lastSyncAt = this.calculateLastSyncAtForRetry(
backoff.nextRetryAt,
interval
);
await this.models.calendarSubscription.updateLastSyncAt(
params.subscription.id,
lastSyncAt
);
const attempt = params.subscription.syncRetryCount + 1;
const nextRetryAt = this.calculateFailureRetryAt(attempt);
await this.models.calendarSubscription.updateSync(params.subscription.id, {
nextSyncAt: nextRetryAt,
syncRetryCount: attempt,
});
this.logger.warn(
`Calendar sync failed for subscription ${params.subscription.id}, attempt ${backoff.attempt}, next retry at ${backoff.nextRetryAt.toISOString()}`,
`Calendar sync failed for subscription ${params.subscription.id}, attempt ${attempt}, next retry at ${nextRetryAt.toISOString()}`,
this.toError(params.error)
);
}
@@ -931,15 +925,6 @@ export class CalendarService {
return status === 404;
}
private calculateLastSyncAtForRetry(
nextRetryAt: Date,
refreshIntervalMinutes: number
) {
// Cron schedules by `now - lastSyncAt >= refreshInterval`, so back-calculate
// a synthetic lastSyncAt to defer the next attempt to `nextRetryAt`.
return new Date(nextRetryAt.getTime() - refreshIntervalMinutes * 60 * 1000);
}
private async disableSubscription(params: {
subscriptionId: string;
provider: CalendarProvider;
@@ -970,68 +955,52 @@ export class CalendarService {
await this.models.calendarSubscription.disableAndPurge(
params.subscriptionId
);
await this.clearSyncFailureBackoff(params.subscriptionId);
}
private getSyncFailureBackoffKey(subscriptionId: string) {
return `${SYNC_FAILURE_BACKOFF_KEY_PREFIX}${subscriptionId}`;
}
private async getSyncFailureBackoff(subscriptionId: string) {
const key = this.getSyncFailureBackoffKey(subscriptionId);
const value = await this.redis.get(key);
if (!value) {
return null;
}
try {
const parsed = JSON.parse(value) as {
attempt?: number;
nextRetryAt?: string;
};
if (!parsed.attempt || !parsed.nextRetryAt) {
return null;
async enqueueSyncSubscription(
subscriptionId: string,
reason: 'polling' | 'webhook' | 'on-demand'
) {
await this.queue.add(
'calendar.syncSubscription',
{
subscriptionId,
reason,
},
{
jobId: subscriptionId,
}
const nextRetryAt = new Date(parsed.nextRetryAt);
if (Number.isNaN(nextRetryAt.getTime())) {
return null;
}
return {
attempt: parsed.attempt,
nextRetryAt,
};
} catch {
return null;
}
);
}
private async bumpSyncFailureBackoff(subscriptionId: string) {
const state = await this.getSyncFailureBackoff(subscriptionId);
const attempt = (state?.attempt ?? 0) + 1;
const delay = Math.min(
SYNC_FAILURE_BACKOFF_BASE_MS * 2 ** (attempt - 1),
SYNC_FAILURE_BACKOFF_MAX_MS
);
const nextRetryAt = new Date(Date.now() + delay);
const key = this.getSyncFailureBackoffKey(subscriptionId);
await this.redis.set(
key,
JSON.stringify({
attempt,
nextRetryAt: nextRetryAt.toISOString(),
}),
'EX',
SYNC_FAILURE_BACKOFF_TTL_SECONDS
);
return {
attempt,
nextRetryAt,
};
private calculateNextSyncAt(base: Date, refreshIntervalMinutes?: number) {
const intervalMinutes =
refreshIntervalMinutes ?? DEFAULT_REFRESH_INTERVAL_MINUTES;
return new Date(base.getTime() + intervalMinutes * 60 * 1000);
}
private async clearSyncFailureBackoff(subscriptionId: string) {
const key = this.getSyncFailureBackoffKey(subscriptionId);
await this.redis.del(key);
private calculateChannelRetryAt(nextSyncAt: Date) {
return new Date(
Math.min(nextSyncAt.getTime(), this.nowMs() + CHANNEL_RENEW_RETRY_MS)
);
}
private calculateFailureRetryAt(attempt: number) {
return new Date(
this.nowMs() +
exponentialBackoffDelay(attempt - 1, {
baseDelayMs: SYNC_FAILURE_BACKOFF_BASE_MS,
maxDelayMs: SYNC_FAILURE_BACKOFF_MAX_MS,
})
);
}
private now() {
return new Date(this.nowMs());
}
private nowMs() {
return Date.now();
}
private formatSyncError(error: unknown) {
+4
View File
@@ -27,6 +27,10 @@
"type": "Object",
"desc": "The config for copilot job queue"
},
"queues.calendar": {
"type": "Object",
"desc": "The config for calendar job queue"
},
"queues.doc": {
"type": "Object",
"desc": "The config for doc job queue"
@@ -17,7 +17,7 @@
"nb-NO": 47,
"pl": 98,
"pt-BR": 96,
"ru": 97,
"ru": 98,
"sv-SE": 96,
"uk": 96,
"ur": 2,