Compare commits

...

1 Commits

Author SHA1 Message Date
DarkSky b98ab495bb fix(server): race condition for sync 2026-04-03 02:00:02 +08:00
5 changed files with 121 additions and 28 deletions
@@ -5,7 +5,11 @@ import test from 'ava';
import { createModule } from '../../../__tests__/create-module';
import { Mockers } from '../../../__tests__/mocks';
import { CalendarProviderRequestError, CryptoHelper } from '../../../base';
import {
CalendarProviderRequestError,
CryptoHelper,
Mutex,
} from '../../../base';
import { ConfigModule } from '../../../base/config';
import { ServerConfigModule } from '../../../core/config';
import type {
@@ -14,6 +18,7 @@ import type {
} from '../../../models';
import { Models } from '../../../models';
import { CalendarModule } from '../index';
import { CalendarCronJobs } from '../cron';
import {
CalendarProvider,
CalendarProviderFactory,
@@ -85,8 +90,10 @@ const module = await createModule({
],
});
const calendarService = module.get(CalendarService);
const calendarCronJobs = module.get(CalendarCronJobs);
const providerFactory = module.get(CalendarProviderFactory);
const models = module.get(Models);
const mutex = module.get(Mutex);
module.get(CryptoHelper).onConfigInit();
const createAccount = async (
@@ -599,3 +606,57 @@ test('syncSubscription renews webhook channel when expiring', async t => {
t.is(updated?.customResourceId, 'new-resource');
t.truthy(updated?.channelExpiration);
});
test('pollAccounts skips syncing when cluster lock is unavailable', async t => {
mock.method(mutex, 'acquire', async () => undefined);
mock.method(
models.calendarSubscription,
'listAllWithAccountForSync',
async () => []
);
const syncAccountMock = mock.method(calendarService, 'syncAccount', async () => {
return;
});
await calendarCronJobs.pollAccounts();
t.is(syncAccountMock.mock.callCount(), 0);
});
test('pollAccounts only syncs due accounts', async t => {
mock.method(mutex, 'acquire', async () => ({
[Symbol.asyncDispose]: async () => {},
}));
mock.method(
models.calendarSubscription,
'listAllWithAccountForSync',
async () =>
[
{
accountId: 'due-account',
lastSyncAt: new Date(Date.now() - 31 * 60 * 1000),
account: {
refreshIntervalMinutes: 30,
},
},
{
accountId: 'fresh-account',
lastSyncAt: new Date(Date.now() - 5 * 60 * 1000),
account: {
refreshIntervalMinutes: 30,
},
},
] as any
);
const syncAccountMock = mock.method(calendarService, 'syncAccount', async () => {
return;
});
await calendarCronJobs.pollAccounts();
t.deepEqual(
syncAccountMock.mock.calls.map(call => call.arguments[0]),
['due-account']
);
});
@@ -1,18 +1,27 @@
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { chunk } from 'lodash-es';
import { Mutex } from '../../base';
import { Models } from '../../models';
import { CalendarService } from './service';
const CALENDAR_POLL_LOCK_KEY = 'calendar:poll-accounts';
const CALENDAR_POLL_BATCH_SIZE = 10;
@Injectable()
export class CalendarCronJobs {
constructor(
private readonly models: Models,
private readonly calendar: CalendarService
private readonly calendar: CalendarService,
private readonly mutex: Mutex
) {}
@Cron(CronExpression.EVERY_MINUTE)
async pollAccounts() {
await using lock = await this.mutex.acquire(CALENDAR_POLL_LOCK_KEY);
if (!lock) return;
const subscriptions =
await this.models.calendarSubscription.listAllWithAccountForSync();
@@ -46,16 +55,18 @@ export class CalendarCronJobs {
}
const now = Date.now();
await Promise.allSettled(
Array.from(accountDueAt.entries()).map(([accountId, info]) => {
if (
const dueAccountIds = Array.from(accountDueAt.entries())
.filter(
([, info]) =>
!info.lastSyncAt ||
now - info.lastSyncAt.getTime() >= info.refreshInterval * 60 * 1000
) {
return this.calendar.syncAccount(accountId);
}
return Promise.resolve();
})
);
)
.map(([accountId]) => accountId);
for (const accountIds of chunk(dueAccountIds, CALENDAR_POLL_BATCH_SIZE)) {
await Promise.allSettled(
accountIds.map(accountId => this.calendar.syncAccount(accountId))
);
}
}
}
@@ -4,6 +4,7 @@ import { Injectable, Logger } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import type { CalendarAccount, Prisma } from '@prisma/client';
import { addDays, subDays } from 'date-fns';
import { chunk } from 'lodash-es';
import {
CalendarProviderRequestError,
@@ -32,6 +33,7 @@ const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:';
const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000;
const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000;
const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60;
const ACCOUNT_SYNC_BATCH_SIZE = 10;
@Injectable()
export class CalendarService {
@@ -433,11 +435,13 @@ export class CalendarService {
const subscriptions =
await this.models.calendarSubscription.listByAccountForSync(accountId);
await Promise.allSettled(
subscriptions.map(subscription =>
this.syncSubscription(subscription.id, { reason: 'polling' })
)
);
for (const batch of chunk(subscriptions, ACCOUNT_SYNC_BATCH_SIZE)) {
await Promise.allSettled(
batch.map(subscription =>
this.syncSubscription(subscription.id, { reason: 'polling' })
)
);
}
}
async listWorkspaceEvents(params: {
@@ -33,19 +33,37 @@ test('should not index workspace if indexer is disabled', async t => {
const count = module.queue.count('indexer.indexWorkspace');
// @ts-expect-error ignore missing fields
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
await indexerEvent.indexWorkspace({
workspaceId: 'test-workspace',
docId: 'test-workspace',
});
t.is(module.queue.count('indexer.indexWorkspace'), count);
});
test('should index workspace if indexer is enabled', async t => {
test('should index workspace when root snapshot is updated', async t => {
// @ts-expect-error ignore missing fields
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
await indexerEvent.indexWorkspace({
workspaceId: 'test-workspace',
docId: 'test-workspace',
});
const { payload } = await module.queue.waitFor('indexer.indexWorkspace');
t.is(payload.workspaceId, 'test-workspace');
});
test('should not index workspace when non-root snapshot is updated', async t => {
const count = module.queue.count('indexer.indexWorkspace');
// @ts-expect-error ignore missing fields
await indexerEvent.indexWorkspace({
workspaceId: 'test-workspace',
docId: 'child-doc',
});
t.is(module.queue.count('indexer.indexWorkspace'), count);
});
test('should not delete workspace if indexer is disabled', async t => {
Sinon.stub(config.indexer, 'enabled').value(false);
const count = module.queue.count('indexer.deleteWorkspace');
@@ -29,21 +29,20 @@ export class IndexerEvent {
);
}
@OnEvent('workspace.updated')
async indexWorkspace({ id }: Events['workspace.updated']) {
@OnEvent('doc.snapshot.updated')
async indexWorkspace({ workspaceId, docId }: Events['doc.snapshot.updated']) {
if (!this.config.indexer.enabled) {
return;
}
if (workspaceId !== docId) {
return;
}
await this.queue.add(
'indexer.indexWorkspace',
{
workspaceId: id,
},
{
jobId: `indexWorkspace/${id}`,
priority: 100,
}
{ workspaceId },
{ jobId: `indexWorkspace/${workspaceId}`, priority: 100 }
);
}