mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-07-02 02:00:49 +08:00
feat: handle calendar sync failed (#14510)
#### PR Dependency Tree * **PR #14510** 👈 This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Bug Fixes** * Improved calendar sync reliability with exponential backoff for repeated failures. * Better handling of token refresh failures with automatic account invalidation and cleanup when needed. * Subscriptions are now automatically disabled and related events removed when the calendar provider reports missing resources. * **Tests** * Added comprehensive tests covering sync failures, backoff behavior, token refresh handling, skipping retries during backoff, and recovery. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Transactional } from '@nestjs-cls/transactional';
|
||||
import type { CalendarAccount, Prisma } from '@prisma/client';
|
||||
|
||||
import { CryptoHelper } from '../base';
|
||||
@@ -174,6 +175,18 @@ export class CalendarAccountModel extends BaseModel {
|
||||
});
|
||||
}
|
||||
|
||||
@Transactional()
|
||||
async invalidateAndPurge(id: string, lastError?: string | null) {
|
||||
await this.updateStatus(id, 'invalid', lastError ?? null);
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listByAccount(id);
|
||||
const subscriptionIds = subscriptions.map(subscription => subscription.id);
|
||||
if (subscriptionIds.length > 0) {
|
||||
await this.models.calendarEvent.deleteBySubscriptionIds(subscriptionIds);
|
||||
}
|
||||
await this.models.calendarSubscription.clearSyncTokensByAccount(id);
|
||||
}
|
||||
|
||||
async delete(id: string) {
|
||||
return await this.db.calendarAccount.delete({
|
||||
where: { id },
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Transactional } from '@nestjs-cls/transactional';
|
||||
import type { CalendarSubscription, Prisma } from '@prisma/client';
|
||||
|
||||
import { BaseModel } from './base';
|
||||
@@ -191,4 +192,20 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
@Transactional()
|
||||
async disableAndPurge(subscriptionId: string) {
|
||||
await this.db.calendarSubscription.update({
|
||||
where: { id: subscriptionId },
|
||||
data: {
|
||||
enabled: false,
|
||||
syncToken: null,
|
||||
customChannelId: null,
|
||||
customResourceId: null,
|
||||
channelExpiration: null,
|
||||
},
|
||||
});
|
||||
|
||||
await this.models.calendarEvent.deleteBySubscriptionIds([subscriptionId]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import test from 'ava';
|
||||
|
||||
import { createModule } from '../../../__tests__/create-module';
|
||||
import { Mockers } from '../../../__tests__/mocks';
|
||||
import { CryptoHelper } from '../../../base';
|
||||
import { CalendarProviderRequestError, CryptoHelper } from '../../../base';
|
||||
import { ConfigModule } from '../../../base/config';
|
||||
import { ServerConfigModule } from '../../../core/config';
|
||||
import type {
|
||||
@@ -319,6 +319,227 @@ test('syncSubscription invalidates account on invalid grant', async t => {
|
||||
t.is(events.length, 0);
|
||||
});
|
||||
|
||||
test('syncSubscription invalidates account when refresh token is invalid', async t => {
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id, {
|
||||
accessToken: 'expired-access-token',
|
||||
expiresAt: new Date(Date.now() - 5 * 60 * 1000),
|
||||
});
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
});
|
||||
|
||||
await models.calendarEvent.upsert({
|
||||
subscriptionId: subscription.id,
|
||||
externalEventId: randomUUID(),
|
||||
recurrenceId: null,
|
||||
etag: null,
|
||||
status: 'confirmed',
|
||||
title: 'existing',
|
||||
description: null,
|
||||
location: null,
|
||||
startAtUtc: new Date('2024-01-02T00:00:00.000Z'),
|
||||
endAtUtc: new Date('2024-01-02T01:00:00.000Z'),
|
||||
originalTimezone: 'UTC',
|
||||
allDay: false,
|
||||
providerUpdatedAt: null,
|
||||
raw: {},
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
const refreshMock = mock.method(provider, 'refreshTokens', async () => {
|
||||
throw new Error('invalid_grant');
|
||||
});
|
||||
const listEventsMock = mock.method(provider, 'listEvents', async () => ({
|
||||
events: [],
|
||||
}));
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
|
||||
t.is(refreshMock.mock.callCount(), 1);
|
||||
t.is(listEventsMock.mock.callCount(), 0);
|
||||
|
||||
const updatedAccount = await models.calendarAccount.get(account.id);
|
||||
t.is(updatedAccount?.status, 'invalid');
|
||||
t.truthy(updatedAccount?.lastError);
|
||||
|
||||
const updatedSubscription = await models.calendarSubscription.get(
|
||||
subscription.id
|
||||
);
|
||||
t.is(updatedSubscription?.syncToken, null);
|
||||
|
||||
const events = await models.calendarEvent.listBySubscriptionsInRange(
|
||||
[subscription.id],
|
||||
new Date('2024-01-01T00:00:00.000Z'),
|
||||
new Date('2024-01-03T00:00:00.000Z')
|
||||
);
|
||||
t.is(events.length, 0);
|
||||
});
|
||||
|
||||
test('syncSubscription disables subscription on provider 404', async t => {
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id);
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
});
|
||||
|
||||
await models.calendarEvent.upsert({
|
||||
subscriptionId: subscription.id,
|
||||
externalEventId: randomUUID(),
|
||||
recurrenceId: null,
|
||||
etag: null,
|
||||
status: 'confirmed',
|
||||
title: 'to remove',
|
||||
description: null,
|
||||
location: null,
|
||||
startAtUtc: new Date('2026-01-02T00:00:00.000Z'),
|
||||
endAtUtc: new Date('2026-01-02T01:00:00.000Z'),
|
||||
originalTimezone: 'UTC',
|
||||
allDay: false,
|
||||
providerUpdatedAt: null,
|
||||
raw: {},
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
const listEventsMock = mock.method(provider, 'listEvents', async () => {
|
||||
throw new CalendarProviderRequestError({
|
||||
status: 404,
|
||||
message: JSON.stringify({
|
||||
error: {
|
||||
code: 404,
|
||||
message: 'Not Found',
|
||||
errors: [{ reason: 'notFound' }],
|
||||
},
|
||||
}),
|
||||
});
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
|
||||
t.is(listEventsMock.mock.callCount(), 1);
|
||||
|
||||
const updatedSubscription = await models.calendarSubscription.get(
|
||||
subscription.id
|
||||
);
|
||||
t.truthy(updatedSubscription);
|
||||
t.is(updatedSubscription?.enabled, false);
|
||||
t.is(updatedSubscription?.syncToken, null);
|
||||
|
||||
const events = await models.calendarEvent.listBySubscriptionsInRange(
|
||||
[subscription.id],
|
||||
new Date('2024-01-01T00:00:00.000Z'),
|
||||
new Date('2026-12-31T00:00:00.000Z')
|
||||
);
|
||||
t.is(events.length, 0);
|
||||
});
|
||||
|
||||
test('syncSubscription rolls back disable when event cleanup fails', async t => {
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id);
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
mock.method(provider, 'listEvents', async () => {
|
||||
throw new CalendarProviderRequestError({
|
||||
status: 404,
|
||||
message: JSON.stringify({
|
||||
error: {
|
||||
code: 404,
|
||||
message: 'Not Found',
|
||||
errors: [{ reason: 'notFound' }],
|
||||
},
|
||||
}),
|
||||
});
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
mock.method(models.calendarEvent, 'deleteBySubscriptionIds', async () => {
|
||||
throw new Error('delete events failed');
|
||||
});
|
||||
|
||||
await t.throwsAsync(calendarService.syncSubscription(subscription.id), {
|
||||
message: 'delete events failed',
|
||||
});
|
||||
|
||||
const updatedSubscription = await models.calendarSubscription.get(
|
||||
subscription.id
|
||||
);
|
||||
t.truthy(updatedSubscription);
|
||||
t.is(updatedSubscription?.enabled, true);
|
||||
t.is(updatedSubscription?.syncToken, 'sync-token');
|
||||
});
|
||||
|
||||
test('syncSubscription applies exponential backoff for repeated failures', async t => {
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id, {
|
||||
refreshIntervalMinutes: 1,
|
||||
});
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
const listEventsMock = mock.method(provider, 'listEvents', async () => {
|
||||
throw new Error('upstream timeout');
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
const baseDelayMs = 5 * 60 * 1000;
|
||||
let now = new Date('2026-01-01T00:00:00.000Z').getTime();
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 1);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
});
|
||||
|
||||
test('syncSubscription skips token refresh while in backoff window', async t => {
|
||||
let now = new Date('2026-01-01T00:00:00.000Z').getTime();
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id, {
|
||||
accessToken: 'expired-access-token',
|
||||
expiresAt: new Date(now - 5 * 60 * 1000),
|
||||
});
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
const refreshMock = mock.method(provider, 'refreshTokens', async () => ({
|
||||
accessToken: `refreshed-${randomUUID()}`,
|
||||
}));
|
||||
const listEventsMock = mock.method(provider, 'listEvents', async () => {
|
||||
throw new Error('upstream timeout');
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
const baseDelayMs = 5 * 60 * 1000;
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(refreshMock.mock.callCount(), 1);
|
||||
t.is(listEventsMock.mock.callCount(), 1);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(refreshMock.mock.callCount(), 2);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
});
|
||||
|
||||
test('syncSubscription renews webhook channel when expiring', async t => {
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id);
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
Mutex,
|
||||
URLHelper,
|
||||
} from '../../base';
|
||||
import { SessionRedis } from '../../base/redis';
|
||||
import { Models } from '../../models';
|
||||
import type { CalendarCalDAVProviderPreset } from './config';
|
||||
import {
|
||||
@@ -27,6 +28,10 @@ import type { LinkCalDAVAccountInput } from './types';
|
||||
const TOKEN_REFRESH_SKEW_MS = 60 * 1000;
|
||||
const DEFAULT_PAST_DAYS = 90;
|
||||
const DEFAULT_FUTURE_DAYS = 180;
|
||||
const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:';
|
||||
const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60;
|
||||
|
||||
@Injectable()
|
||||
export class CalendarService {
|
||||
@@ -37,6 +42,7 @@ export class CalendarService {
|
||||
private readonly models: Models,
|
||||
private readonly providerFactory: CalendarProviderFactory<CalendarProvider>,
|
||||
private readonly mutex: Mutex,
|
||||
private readonly redis: SessionRedis,
|
||||
private readonly config: Config,
|
||||
private readonly url: URLHelper
|
||||
) {}
|
||||
@@ -307,6 +313,12 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const backoff = await this.getSyncFailureBackoff(subscription.id);
|
||||
if (backoff && now < backoff.nextRetryAt.getTime()) {
|
||||
return;
|
||||
}
|
||||
|
||||
await using lock = await this.mutex.acquire(
|
||||
`calendar:subscription:${subscriptionId}`
|
||||
);
|
||||
@@ -314,6 +326,12 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const lockedNow = Date.now();
|
||||
const lockedBackoff = await this.getSyncFailureBackoff(subscription.id);
|
||||
if (lockedBackoff && lockedNow < lockedBackoff.nextRetryAt.getTime()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const provider = this.providerFactory.get(
|
||||
account.provider as CalendarProviderName
|
||||
);
|
||||
@@ -321,8 +339,18 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const { accessToken } = await this.ensureAccessToken(account);
|
||||
if (!accessToken) {
|
||||
let accessToken: string | null = null;
|
||||
try {
|
||||
const tokens = await this.ensureAccessToken(account);
|
||||
if (!tokens.accessToken) return;
|
||||
accessToken = tokens.accessToken;
|
||||
} catch (error) {
|
||||
await this.handleSubscriptionSyncFailure({
|
||||
error,
|
||||
subscription,
|
||||
account,
|
||||
provider,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -352,31 +380,42 @@ export class CalendarService {
|
||||
await this.models.calendarSubscription.updateSync(subscription.id, {
|
||||
syncToken: null,
|
||||
});
|
||||
await this.syncWithProvider({
|
||||
provider,
|
||||
subscriptionId: subscription.id,
|
||||
calendarId: subscription.externalCalendarId,
|
||||
accessToken,
|
||||
account,
|
||||
timeMin,
|
||||
timeMax,
|
||||
subscriptionTimezone: subscription.timezone ?? undefined,
|
||||
});
|
||||
synced = true;
|
||||
} else {
|
||||
if (this.isTokenInvalidError(error)) {
|
||||
await this.invalidateAccount(account.id, (error as Error).message);
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Calendar sync failed for subscription ${subscription.id}`,
|
||||
error as Error
|
||||
);
|
||||
try {
|
||||
await this.syncWithProvider({
|
||||
provider,
|
||||
subscriptionId: subscription.id,
|
||||
calendarId: subscription.externalCalendarId,
|
||||
accessToken,
|
||||
account,
|
||||
timeMin,
|
||||
timeMax,
|
||||
subscriptionTimezone: subscription.timezone ?? undefined,
|
||||
});
|
||||
synced = true;
|
||||
} catch (syncTokenRetryError) {
|
||||
await this.handleSubscriptionSyncFailure({
|
||||
error: syncTokenRetryError,
|
||||
subscription,
|
||||
account,
|
||||
provider,
|
||||
accessToken,
|
||||
});
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
await this.handleSubscriptionSyncFailure({
|
||||
error,
|
||||
subscription,
|
||||
account,
|
||||
provider,
|
||||
accessToken,
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (synced) {
|
||||
await this.clearSyncFailureBackoff(subscription.id);
|
||||
await this.ensureWebhookChannel(subscription, provider, accessToken);
|
||||
}
|
||||
|
||||
@@ -764,19 +803,6 @@ export class CalendarService {
|
||||
return false;
|
||||
}
|
||||
|
||||
private async invalidateAccount(accountId: string, lastError?: string) {
|
||||
await this.models.calendarAccount.updateStatus(
|
||||
accountId,
|
||||
'invalid',
|
||||
lastError ?? null
|
||||
);
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listByAccount(accountId);
|
||||
const subscriptionIds = subscriptions.map(s => s.id);
|
||||
await this.models.calendarEvent.deleteBySubscriptionIds(subscriptionIds);
|
||||
await this.models.calendarSubscription.clearSyncTokensByAccount(accountId);
|
||||
}
|
||||
|
||||
private requireProvider(name: CalendarProviderName) {
|
||||
const provider = this.providerFactory.get(name);
|
||||
if (!provider) {
|
||||
@@ -841,4 +867,180 @@ export class CalendarService {
|
||||
channelExpiration: result.expiration ?? null,
|
||||
});
|
||||
}
|
||||
|
||||
private async handleSubscriptionSyncFailure(params: {
|
||||
error: unknown;
|
||||
subscription: {
|
||||
id: string;
|
||||
externalCalendarId: string;
|
||||
customChannelId: string | null;
|
||||
customResourceId: string | null;
|
||||
};
|
||||
account: CalendarAccount;
|
||||
provider: CalendarProvider;
|
||||
accessToken?: string;
|
||||
}) {
|
||||
if (this.isSubscriptionMissingError(params.error)) {
|
||||
await this.disableSubscription({
|
||||
subscriptionId: params.subscription.id,
|
||||
provider: params.provider,
|
||||
accessToken: params.accessToken,
|
||||
customChannelId: params.subscription.customChannelId,
|
||||
customResourceId: params.subscription.customResourceId,
|
||||
});
|
||||
this.logger.warn(
|
||||
`Calendar subscription ${params.subscription.id} was disabled because provider returned 404 for calendar ${params.subscription.externalCalendarId}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.isTokenInvalidError(params.error)) {
|
||||
await this.clearSyncFailureBackoff(params.subscription.id);
|
||||
await this.models.calendarAccount.invalidateAndPurge(
|
||||
params.account.id,
|
||||
this.formatSyncError(params.error)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const backoff = await this.bumpSyncFailureBackoff(params.subscription.id);
|
||||
const interval = params.account.refreshIntervalMinutes ?? 60;
|
||||
const lastSyncAt = this.calculateLastSyncAtForRetry(
|
||||
backoff.nextRetryAt,
|
||||
interval
|
||||
);
|
||||
await this.models.calendarSubscription.updateLastSyncAt(
|
||||
params.subscription.id,
|
||||
lastSyncAt
|
||||
);
|
||||
this.logger.warn(
|
||||
`Calendar sync failed for subscription ${params.subscription.id}, attempt ${backoff.attempt}, next retry at ${backoff.nextRetryAt.toISOString()}`,
|
||||
this.toError(params.error)
|
||||
);
|
||||
}
|
||||
|
||||
private isSubscriptionMissingError(error: unknown) {
|
||||
if (!(error instanceof CalendarProviderRequestError)) {
|
||||
return false;
|
||||
}
|
||||
const status = error.data?.status ?? error.status;
|
||||
return status === 404;
|
||||
}
|
||||
|
||||
private calculateLastSyncAtForRetry(
|
||||
nextRetryAt: Date,
|
||||
refreshIntervalMinutes: number
|
||||
) {
|
||||
// Cron schedules by `now - lastSyncAt >= refreshInterval`, so back-calculate
|
||||
// a synthetic lastSyncAt to defer the next attempt to `nextRetryAt`.
|
||||
return new Date(nextRetryAt.getTime() - refreshIntervalMinutes * 60 * 1000);
|
||||
}
|
||||
|
||||
private async disableSubscription(params: {
|
||||
subscriptionId: string;
|
||||
provider: CalendarProvider;
|
||||
accessToken?: string;
|
||||
customChannelId: string | null;
|
||||
customResourceId: string | null;
|
||||
}) {
|
||||
if (
|
||||
params.provider.stopChannel &&
|
||||
params.accessToken &&
|
||||
params.customChannelId &&
|
||||
params.customResourceId
|
||||
) {
|
||||
try {
|
||||
await params.provider.stopChannel({
|
||||
accessToken: params.accessToken,
|
||||
channelId: params.customChannelId,
|
||||
resourceId: params.customResourceId,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to stop webhook channel for disabled calendar subscription ${params.subscriptionId}`,
|
||||
this.toError(error)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await this.models.calendarSubscription.disableAndPurge(
|
||||
params.subscriptionId
|
||||
);
|
||||
await this.clearSyncFailureBackoff(params.subscriptionId);
|
||||
}
|
||||
|
||||
private getSyncFailureBackoffKey(subscriptionId: string) {
|
||||
return `${SYNC_FAILURE_BACKOFF_KEY_PREFIX}${subscriptionId}`;
|
||||
}
|
||||
|
||||
private async getSyncFailureBackoff(subscriptionId: string) {
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
const value = await this.redis.get(key);
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(value) as {
|
||||
attempt?: number;
|
||||
nextRetryAt?: string;
|
||||
};
|
||||
if (!parsed.attempt || !parsed.nextRetryAt) {
|
||||
return null;
|
||||
}
|
||||
const nextRetryAt = new Date(parsed.nextRetryAt);
|
||||
if (Number.isNaN(nextRetryAt.getTime())) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
attempt: parsed.attempt,
|
||||
nextRetryAt,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async bumpSyncFailureBackoff(subscriptionId: string) {
|
||||
const state = await this.getSyncFailureBackoff(subscriptionId);
|
||||
const attempt = (state?.attempt ?? 0) + 1;
|
||||
const delay = Math.min(
|
||||
SYNC_FAILURE_BACKOFF_BASE_MS * 2 ** (attempt - 1),
|
||||
SYNC_FAILURE_BACKOFF_MAX_MS
|
||||
);
|
||||
const nextRetryAt = new Date(Date.now() + delay);
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
await this.redis.set(
|
||||
key,
|
||||
JSON.stringify({
|
||||
attempt,
|
||||
nextRetryAt: nextRetryAt.toISOString(),
|
||||
}),
|
||||
'EX',
|
||||
SYNC_FAILURE_BACKOFF_TTL_SECONDS
|
||||
);
|
||||
return {
|
||||
attempt,
|
||||
nextRetryAt,
|
||||
};
|
||||
}
|
||||
|
||||
private async clearSyncFailureBackoff(subscriptionId: string) {
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
await this.redis.del(key);
|
||||
}
|
||||
|
||||
private formatSyncError(error: unknown) {
|
||||
if (error instanceof Error && error.message) {
|
||||
return error.message;
|
||||
}
|
||||
return String(error);
|
||||
}
|
||||
|
||||
private toError(error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
return error;
|
||||
}
|
||||
return new Error(this.formatSyncError(error));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user