diff --git a/packages/backend/server/src/models/calendar-account.ts b/packages/backend/server/src/models/calendar-account.ts index b526545bff..d92e2ee4ca 100644 --- a/packages/backend/server/src/models/calendar-account.ts +++ b/packages/backend/server/src/models/calendar-account.ts @@ -1,4 +1,5 @@ import { Injectable } from '@nestjs/common'; +import { Transactional } from '@nestjs-cls/transactional'; import type { CalendarAccount, Prisma } from '@prisma/client'; import { CryptoHelper } from '../base'; @@ -174,6 +175,18 @@ export class CalendarAccountModel extends BaseModel { }); } + @Transactional() + async invalidateAndPurge(id: string, lastError?: string | null) { + await this.updateStatus(id, 'invalid', lastError ?? null); + const subscriptions = + await this.models.calendarSubscription.listByAccount(id); + const subscriptionIds = subscriptions.map(subscription => subscription.id); + if (subscriptionIds.length > 0) { + await this.models.calendarEvent.deleteBySubscriptionIds(subscriptionIds); + } + await this.models.calendarSubscription.clearSyncTokensByAccount(id); + } + async delete(id: string) { return await this.db.calendarAccount.delete({ where: { id }, diff --git a/packages/backend/server/src/models/calendar-subscription.ts b/packages/backend/server/src/models/calendar-subscription.ts index 982fa02358..5a1f413678 100644 --- a/packages/backend/server/src/models/calendar-subscription.ts +++ b/packages/backend/server/src/models/calendar-subscription.ts @@ -1,4 +1,5 @@ import { Injectable } from '@nestjs/common'; +import { Transactional } from '@nestjs-cls/transactional'; import type { CalendarSubscription, Prisma } from '@prisma/client'; import { BaseModel } from './base'; @@ -191,4 +192,20 @@ export class CalendarSubscriptionModel extends BaseModel { data, }); } + + @Transactional() + async disableAndPurge(subscriptionId: string) { + await this.db.calendarSubscription.update({ + where: { id: subscriptionId }, + data: { + enabled: false, + syncToken: null, + customChannelId: null, + customResourceId: null, + channelExpiration: null, + }, + }); + + await this.models.calendarEvent.deleteBySubscriptionIds([subscriptionId]); + } } diff --git a/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts b/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts index 03ef923319..f0a09993c2 100644 --- a/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts +++ b/packages/backend/server/src/plugins/calendar/__tests__/service.spec.ts @@ -5,7 +5,7 @@ import test from 'ava'; import { createModule } from '../../../__tests__/create-module'; import { Mockers } from '../../../__tests__/mocks'; -import { CryptoHelper } from '../../../base'; +import { CalendarProviderRequestError, CryptoHelper } from '../../../base'; import { ConfigModule } from '../../../base/config'; import { ServerConfigModule } from '../../../core/config'; import type { @@ -319,6 +319,227 @@ test('syncSubscription invalidates account on invalid grant', async t => { t.is(events.length, 0); }); +test('syncSubscription invalidates account when refresh token is invalid', async t => { + const user = await module.create(Mockers.User); + const account = await createAccount(user.id, { + accessToken: 'expired-access-token', + expiresAt: new Date(Date.now() - 5 * 60 * 1000), + }); + const subscription = await createSubscription(account.id, { + syncToken: 'sync-token', + }); + + await models.calendarEvent.upsert({ + subscriptionId: subscription.id, + externalEventId: randomUUID(), + recurrenceId: null, + etag: null, + status: 'confirmed', + title: 'existing', + description: null, + location: null, + startAtUtc: new Date('2024-01-02T00:00:00.000Z'), + endAtUtc: new Date('2024-01-02T01:00:00.000Z'), + originalTimezone: 'UTC', + allDay: false, + providerUpdatedAt: null, + raw: {}, + }); + + const provider = new MockCalendarProvider(); + const refreshMock = mock.method(provider, 'refreshTokens', async () => { + throw new Error('invalid_grant'); + }); + const listEventsMock = mock.method(provider, 'listEvents', async () => ({ + events: [], + })); + mock.method(providerFactory, 'get', () => provider); + + await calendarService.syncSubscription(subscription.id); + + t.is(refreshMock.mock.callCount(), 1); + t.is(listEventsMock.mock.callCount(), 0); + + const updatedAccount = await models.calendarAccount.get(account.id); + t.is(updatedAccount?.status, 'invalid'); + t.truthy(updatedAccount?.lastError); + + const updatedSubscription = await models.calendarSubscription.get( + subscription.id + ); + t.is(updatedSubscription?.syncToken, null); + + const events = await models.calendarEvent.listBySubscriptionsInRange( + [subscription.id], + new Date('2024-01-01T00:00:00.000Z'), + new Date('2024-01-03T00:00:00.000Z') + ); + t.is(events.length, 0); +}); + +test('syncSubscription disables subscription on provider 404', async t => { + const user = await module.create(Mockers.User); + const account = await createAccount(user.id); + const subscription = await createSubscription(account.id, { + syncToken: 'sync-token', + }); + + await models.calendarEvent.upsert({ + subscriptionId: subscription.id, + externalEventId: randomUUID(), + recurrenceId: null, + etag: null, + status: 'confirmed', + title: 'to remove', + description: null, + location: null, + startAtUtc: new Date('2026-01-02T00:00:00.000Z'), + endAtUtc: new Date('2026-01-02T01:00:00.000Z'), + originalTimezone: 'UTC', + allDay: false, + providerUpdatedAt: null, + raw: {}, + }); + + const provider = new MockCalendarProvider(); + const listEventsMock = mock.method(provider, 'listEvents', async () => { + throw new CalendarProviderRequestError({ + status: 404, + message: JSON.stringify({ + error: { + code: 404, + message: 'Not Found', + errors: [{ reason: 'notFound' }], + }, + }), + }); + }); + mock.method(providerFactory, 'get', () => provider); + + await calendarService.syncSubscription(subscription.id); + await calendarService.syncSubscription(subscription.id); + + t.is(listEventsMock.mock.callCount(), 1); + + const updatedSubscription = await models.calendarSubscription.get( + subscription.id + ); + t.truthy(updatedSubscription); + t.is(updatedSubscription?.enabled, false); + t.is(updatedSubscription?.syncToken, null); + + const events = await models.calendarEvent.listBySubscriptionsInRange( + [subscription.id], + new Date('2024-01-01T00:00:00.000Z'), + new Date('2026-12-31T00:00:00.000Z') + ); + t.is(events.length, 0); +}); + +test('syncSubscription rolls back disable when event cleanup fails', async t => { + const user = await module.create(Mockers.User); + const account = await createAccount(user.id); + const subscription = await createSubscription(account.id, { + syncToken: 'sync-token', + }); + + const provider = new MockCalendarProvider(); + mock.method(provider, 'listEvents', async () => { + throw new CalendarProviderRequestError({ + status: 404, + message: JSON.stringify({ + error: { + code: 404, + message: 'Not Found', + errors: [{ reason: 'notFound' }], + }, + }), + }); + }); + mock.method(providerFactory, 'get', () => provider); + mock.method(models.calendarEvent, 'deleteBySubscriptionIds', async () => { + throw new Error('delete events failed'); + }); + + await t.throwsAsync(calendarService.syncSubscription(subscription.id), { + message: 'delete events failed', + }); + + const updatedSubscription = await models.calendarSubscription.get( + subscription.id + ); + t.truthy(updatedSubscription); + t.is(updatedSubscription?.enabled, true); + t.is(updatedSubscription?.syncToken, 'sync-token'); +}); + +test('syncSubscription applies exponential backoff for repeated failures', async t => { + const user = await module.create(Mockers.User); + const account = await createAccount(user.id, { + refreshIntervalMinutes: 1, + }); + const subscription = await createSubscription(account.id, { + syncToken: 'sync-token', + }); + + const provider = new MockCalendarProvider(); + const listEventsMock = mock.method(provider, 'listEvents', async () => { + throw new Error('upstream timeout'); + }); + mock.method(providerFactory, 'get', () => provider); + + const baseDelayMs = 5 * 60 * 1000; + let now = new Date('2026-01-01T00:00:00.000Z').getTime(); + mock.method(Date, 'now', () => now); + + await calendarService.syncSubscription(subscription.id); + await calendarService.syncSubscription(subscription.id); + t.is(listEventsMock.mock.callCount(), 1); + + now += baseDelayMs + 1000; + await calendarService.syncSubscription(subscription.id); + t.is(listEventsMock.mock.callCount(), 2); + + now += baseDelayMs + 1000; + await calendarService.syncSubscription(subscription.id); + t.is(listEventsMock.mock.callCount(), 2); +}); + +test('syncSubscription skips token refresh while in backoff window', async t => { + let now = new Date('2026-01-01T00:00:00.000Z').getTime(); + mock.method(Date, 'now', () => now); + + const user = await module.create(Mockers.User); + const account = await createAccount(user.id, { + accessToken: 'expired-access-token', + expiresAt: new Date(now - 5 * 60 * 1000), + }); + const subscription = await createSubscription(account.id, { + syncToken: 'sync-token', + }); + + const provider = new MockCalendarProvider(); + const refreshMock = mock.method(provider, 'refreshTokens', async () => ({ + accessToken: `refreshed-${randomUUID()}`, + })); + const listEventsMock = mock.method(provider, 'listEvents', async () => { + throw new Error('upstream timeout'); + }); + mock.method(providerFactory, 'get', () => provider); + + const baseDelayMs = 5 * 60 * 1000; + + await calendarService.syncSubscription(subscription.id); + await calendarService.syncSubscription(subscription.id); + t.is(refreshMock.mock.callCount(), 1); + t.is(listEventsMock.mock.callCount(), 1); + + now += baseDelayMs + 1000; + await calendarService.syncSubscription(subscription.id); + t.is(refreshMock.mock.callCount(), 2); + t.is(listEventsMock.mock.callCount(), 2); +}); + test('syncSubscription renews webhook channel when expiring', async t => { const user = await module.create(Mockers.User); const account = await createAccount(user.id); diff --git a/packages/backend/server/src/plugins/calendar/service.ts b/packages/backend/server/src/plugins/calendar/service.ts index f73746114d..dce5877758 100644 --- a/packages/backend/server/src/plugins/calendar/service.ts +++ b/packages/backend/server/src/plugins/calendar/service.ts @@ -12,6 +12,7 @@ import { Mutex, URLHelper, } from '../../base'; +import { SessionRedis } from '../../base/redis'; import { Models } from '../../models'; import type { CalendarCalDAVProviderPreset } from './config'; import { @@ -27,6 +28,10 @@ import type { LinkCalDAVAccountInput } from './types'; const TOKEN_REFRESH_SKEW_MS = 60 * 1000; const DEFAULT_PAST_DAYS = 90; const DEFAULT_FUTURE_DAYS = 180; +const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:'; +const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000; +const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000; +const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60; @Injectable() export class CalendarService { @@ -37,6 +42,7 @@ export class CalendarService { private readonly models: Models, private readonly providerFactory: CalendarProviderFactory, private readonly mutex: Mutex, + private readonly redis: SessionRedis, private readonly config: Config, private readonly url: URLHelper ) {} @@ -307,6 +313,12 @@ export class CalendarService { return; } + const now = Date.now(); + const backoff = await this.getSyncFailureBackoff(subscription.id); + if (backoff && now < backoff.nextRetryAt.getTime()) { + return; + } + await using lock = await this.mutex.acquire( `calendar:subscription:${subscriptionId}` ); @@ -314,6 +326,12 @@ export class CalendarService { return; } + const lockedNow = Date.now(); + const lockedBackoff = await this.getSyncFailureBackoff(subscription.id); + if (lockedBackoff && lockedNow < lockedBackoff.nextRetryAt.getTime()) { + return; + } + const provider = this.providerFactory.get( account.provider as CalendarProviderName ); @@ -321,8 +339,18 @@ export class CalendarService { return; } - const { accessToken } = await this.ensureAccessToken(account); - if (!accessToken) { + let accessToken: string | null = null; + try { + const tokens = await this.ensureAccessToken(account); + if (!tokens.accessToken) return; + accessToken = tokens.accessToken; + } catch (error) { + await this.handleSubscriptionSyncFailure({ + error, + subscription, + account, + provider, + }); return; } @@ -352,31 +380,42 @@ export class CalendarService { await this.models.calendarSubscription.updateSync(subscription.id, { syncToken: null, }); - await this.syncWithProvider({ - provider, - subscriptionId: subscription.id, - calendarId: subscription.externalCalendarId, - accessToken, - account, - timeMin, - timeMax, - subscriptionTimezone: subscription.timezone ?? undefined, - }); - synced = true; - } else { - if (this.isTokenInvalidError(error)) { - await this.invalidateAccount(account.id, (error as Error).message); - } else { - this.logger.warn( - `Calendar sync failed for subscription ${subscription.id}`, - error as Error - ); + try { + await this.syncWithProvider({ + provider, + subscriptionId: subscription.id, + calendarId: subscription.externalCalendarId, + accessToken, + account, + timeMin, + timeMax, + subscriptionTimezone: subscription.timezone ?? undefined, + }); + synced = true; + } catch (syncTokenRetryError) { + await this.handleSubscriptionSyncFailure({ + error: syncTokenRetryError, + subscription, + account, + provider, + accessToken, + }); + return; } + } else { + await this.handleSubscriptionSyncFailure({ + error, + subscription, + account, + provider, + accessToken, + }); return; } } if (synced) { + await this.clearSyncFailureBackoff(subscription.id); await this.ensureWebhookChannel(subscription, provider, accessToken); } @@ -764,19 +803,6 @@ export class CalendarService { return false; } - private async invalidateAccount(accountId: string, lastError?: string) { - await this.models.calendarAccount.updateStatus( - accountId, - 'invalid', - lastError ?? null - ); - const subscriptions = - await this.models.calendarSubscription.listByAccount(accountId); - const subscriptionIds = subscriptions.map(s => s.id); - await this.models.calendarEvent.deleteBySubscriptionIds(subscriptionIds); - await this.models.calendarSubscription.clearSyncTokensByAccount(accountId); - } - private requireProvider(name: CalendarProviderName) { const provider = this.providerFactory.get(name); if (!provider) { @@ -841,4 +867,180 @@ export class CalendarService { channelExpiration: result.expiration ?? null, }); } + + private async handleSubscriptionSyncFailure(params: { + error: unknown; + subscription: { + id: string; + externalCalendarId: string; + customChannelId: string | null; + customResourceId: string | null; + }; + account: CalendarAccount; + provider: CalendarProvider; + accessToken?: string; + }) { + if (this.isSubscriptionMissingError(params.error)) { + await this.disableSubscription({ + subscriptionId: params.subscription.id, + provider: params.provider, + accessToken: params.accessToken, + customChannelId: params.subscription.customChannelId, + customResourceId: params.subscription.customResourceId, + }); + this.logger.warn( + `Calendar subscription ${params.subscription.id} was disabled because provider returned 404 for calendar ${params.subscription.externalCalendarId}` + ); + return; + } + + if (this.isTokenInvalidError(params.error)) { + await this.clearSyncFailureBackoff(params.subscription.id); + await this.models.calendarAccount.invalidateAndPurge( + params.account.id, + this.formatSyncError(params.error) + ); + return; + } + + const backoff = await this.bumpSyncFailureBackoff(params.subscription.id); + const interval = params.account.refreshIntervalMinutes ?? 60; + const lastSyncAt = this.calculateLastSyncAtForRetry( + backoff.nextRetryAt, + interval + ); + await this.models.calendarSubscription.updateLastSyncAt( + params.subscription.id, + lastSyncAt + ); + this.logger.warn( + `Calendar sync failed for subscription ${params.subscription.id}, attempt ${backoff.attempt}, next retry at ${backoff.nextRetryAt.toISOString()}`, + this.toError(params.error) + ); + } + + private isSubscriptionMissingError(error: unknown) { + if (!(error instanceof CalendarProviderRequestError)) { + return false; + } + const status = error.data?.status ?? error.status; + return status === 404; + } + + private calculateLastSyncAtForRetry( + nextRetryAt: Date, + refreshIntervalMinutes: number + ) { + // Cron schedules by `now - lastSyncAt >= refreshInterval`, so back-calculate + // a synthetic lastSyncAt to defer the next attempt to `nextRetryAt`. + return new Date(nextRetryAt.getTime() - refreshIntervalMinutes * 60 * 1000); + } + + private async disableSubscription(params: { + subscriptionId: string; + provider: CalendarProvider; + accessToken?: string; + customChannelId: string | null; + customResourceId: string | null; + }) { + if ( + params.provider.stopChannel && + params.accessToken && + params.customChannelId && + params.customResourceId + ) { + try { + await params.provider.stopChannel({ + accessToken: params.accessToken, + channelId: params.customChannelId, + resourceId: params.customResourceId, + }); + } catch (error) { + this.logger.warn( + `Failed to stop webhook channel for disabled calendar subscription ${params.subscriptionId}`, + this.toError(error) + ); + } + } + + await this.models.calendarSubscription.disableAndPurge( + params.subscriptionId + ); + await this.clearSyncFailureBackoff(params.subscriptionId); + } + + private getSyncFailureBackoffKey(subscriptionId: string) { + return `${SYNC_FAILURE_BACKOFF_KEY_PREFIX}${subscriptionId}`; + } + + private async getSyncFailureBackoff(subscriptionId: string) { + const key = this.getSyncFailureBackoffKey(subscriptionId); + const value = await this.redis.get(key); + if (!value) { + return null; + } + + try { + const parsed = JSON.parse(value) as { + attempt?: number; + nextRetryAt?: string; + }; + if (!parsed.attempt || !parsed.nextRetryAt) { + return null; + } + const nextRetryAt = new Date(parsed.nextRetryAt); + if (Number.isNaN(nextRetryAt.getTime())) { + return null; + } + return { + attempt: parsed.attempt, + nextRetryAt, + }; + } catch { + return null; + } + } + + private async bumpSyncFailureBackoff(subscriptionId: string) { + const state = await this.getSyncFailureBackoff(subscriptionId); + const attempt = (state?.attempt ?? 0) + 1; + const delay = Math.min( + SYNC_FAILURE_BACKOFF_BASE_MS * 2 ** (attempt - 1), + SYNC_FAILURE_BACKOFF_MAX_MS + ); + const nextRetryAt = new Date(Date.now() + delay); + const key = this.getSyncFailureBackoffKey(subscriptionId); + await this.redis.set( + key, + JSON.stringify({ + attempt, + nextRetryAt: nextRetryAt.toISOString(), + }), + 'EX', + SYNC_FAILURE_BACKOFF_TTL_SECONDS + ); + return { + attempt, + nextRetryAt, + }; + } + + private async clearSyncFailureBackoff(subscriptionId: string) { + const key = this.getSyncFailureBackoffKey(subscriptionId); + await this.redis.del(key); + } + + private formatSyncError(error: unknown) { + if (error instanceof Error && error.message) { + return error.message; + } + return String(error); + } + + private toError(error: unknown) { + if (error instanceof Error) { + return error; + } + return new Error(this.formatSyncError(error)); + } }