mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-07-02 02:00:49 +08:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b98ab495bb |
@@ -5,7 +5,11 @@ import test from 'ava';
|
||||
|
||||
import { createModule } from '../../../__tests__/create-module';
|
||||
import { Mockers } from '../../../__tests__/mocks';
|
||||
import { CalendarProviderRequestError, CryptoHelper } from '../../../base';
|
||||
import {
|
||||
CalendarProviderRequestError,
|
||||
CryptoHelper,
|
||||
Mutex,
|
||||
} from '../../../base';
|
||||
import { ConfigModule } from '../../../base/config';
|
||||
import { ServerConfigModule } from '../../../core/config';
|
||||
import type {
|
||||
@@ -14,6 +18,7 @@ import type {
|
||||
} from '../../../models';
|
||||
import { Models } from '../../../models';
|
||||
import { CalendarModule } from '../index';
|
||||
import { CalendarCronJobs } from '../cron';
|
||||
import {
|
||||
CalendarProvider,
|
||||
CalendarProviderFactory,
|
||||
@@ -85,8 +90,10 @@ const module = await createModule({
|
||||
],
|
||||
});
|
||||
const calendarService = module.get(CalendarService);
|
||||
const calendarCronJobs = module.get(CalendarCronJobs);
|
||||
const providerFactory = module.get(CalendarProviderFactory);
|
||||
const models = module.get(Models);
|
||||
const mutex = module.get(Mutex);
|
||||
module.get(CryptoHelper).onConfigInit();
|
||||
|
||||
const createAccount = async (
|
||||
@@ -599,3 +606,57 @@ test('syncSubscription renews webhook channel when expiring', async t => {
|
||||
t.is(updated?.customResourceId, 'new-resource');
|
||||
t.truthy(updated?.channelExpiration);
|
||||
});
|
||||
|
||||
test('pollAccounts skips syncing when cluster lock is unavailable', async t => {
|
||||
mock.method(mutex, 'acquire', async () => undefined);
|
||||
mock.method(
|
||||
models.calendarSubscription,
|
||||
'listAllWithAccountForSync',
|
||||
async () => []
|
||||
);
|
||||
const syncAccountMock = mock.method(calendarService, 'syncAccount', async () => {
|
||||
return;
|
||||
});
|
||||
|
||||
await calendarCronJobs.pollAccounts();
|
||||
|
||||
t.is(syncAccountMock.mock.callCount(), 0);
|
||||
});
|
||||
|
||||
test('pollAccounts only syncs due accounts', async t => {
|
||||
mock.method(mutex, 'acquire', async () => ({
|
||||
[Symbol.asyncDispose]: async () => {},
|
||||
}));
|
||||
mock.method(
|
||||
models.calendarSubscription,
|
||||
'listAllWithAccountForSync',
|
||||
async () =>
|
||||
[
|
||||
{
|
||||
accountId: 'due-account',
|
||||
lastSyncAt: new Date(Date.now() - 31 * 60 * 1000),
|
||||
account: {
|
||||
refreshIntervalMinutes: 30,
|
||||
},
|
||||
},
|
||||
{
|
||||
accountId: 'fresh-account',
|
||||
lastSyncAt: new Date(Date.now() - 5 * 60 * 1000),
|
||||
account: {
|
||||
refreshIntervalMinutes: 30,
|
||||
},
|
||||
},
|
||||
] as any
|
||||
);
|
||||
|
||||
const syncAccountMock = mock.method(calendarService, 'syncAccount', async () => {
|
||||
return;
|
||||
});
|
||||
|
||||
await calendarCronJobs.pollAccounts();
|
||||
|
||||
t.deepEqual(
|
||||
syncAccountMock.mock.calls.map(call => call.arguments[0]),
|
||||
['due-account']
|
||||
);
|
||||
});
|
||||
|
||||
@@ -1,18 +1,27 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { chunk } from 'lodash-es';
|
||||
|
||||
import { Mutex } from '../../base';
|
||||
import { Models } from '../../models';
|
||||
import { CalendarService } from './service';
|
||||
|
||||
const CALENDAR_POLL_LOCK_KEY = 'calendar:poll-accounts';
|
||||
const CALENDAR_POLL_BATCH_SIZE = 10;
|
||||
|
||||
@Injectable()
|
||||
export class CalendarCronJobs {
|
||||
constructor(
|
||||
private readonly models: Models,
|
||||
private readonly calendar: CalendarService
|
||||
private readonly calendar: CalendarService,
|
||||
private readonly mutex: Mutex
|
||||
) {}
|
||||
|
||||
@Cron(CronExpression.EVERY_MINUTE)
|
||||
async pollAccounts() {
|
||||
await using lock = await this.mutex.acquire(CALENDAR_POLL_LOCK_KEY);
|
||||
if (!lock) return;
|
||||
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listAllWithAccountForSync();
|
||||
|
||||
@@ -46,16 +55,18 @@ export class CalendarCronJobs {
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
await Promise.allSettled(
|
||||
Array.from(accountDueAt.entries()).map(([accountId, info]) => {
|
||||
if (
|
||||
const dueAccountIds = Array.from(accountDueAt.entries())
|
||||
.filter(
|
||||
([, info]) =>
|
||||
!info.lastSyncAt ||
|
||||
now - info.lastSyncAt.getTime() >= info.refreshInterval * 60 * 1000
|
||||
) {
|
||||
return this.calendar.syncAccount(accountId);
|
||||
}
|
||||
return Promise.resolve();
|
||||
})
|
||||
);
|
||||
)
|
||||
.map(([accountId]) => accountId);
|
||||
|
||||
for (const accountIds of chunk(dueAccountIds, CALENDAR_POLL_BATCH_SIZE)) {
|
||||
await Promise.allSettled(
|
||||
accountIds.map(accountId => this.calendar.syncAccount(accountId))
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Transactional } from '@nestjs-cls/transactional';
|
||||
import type { CalendarAccount, Prisma } from '@prisma/client';
|
||||
import { addDays, subDays } from 'date-fns';
|
||||
import { chunk } from 'lodash-es';
|
||||
|
||||
import {
|
||||
CalendarProviderRequestError,
|
||||
@@ -32,6 +33,7 @@ const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:';
|
||||
const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60;
|
||||
const ACCOUNT_SYNC_BATCH_SIZE = 10;
|
||||
|
||||
@Injectable()
|
||||
export class CalendarService {
|
||||
@@ -433,11 +435,13 @@ export class CalendarService {
|
||||
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listByAccountForSync(accountId);
|
||||
await Promise.allSettled(
|
||||
subscriptions.map(subscription =>
|
||||
this.syncSubscription(subscription.id, { reason: 'polling' })
|
||||
)
|
||||
);
|
||||
for (const batch of chunk(subscriptions, ACCOUNT_SYNC_BATCH_SIZE)) {
|
||||
await Promise.allSettled(
|
||||
batch.map(subscription =>
|
||||
this.syncSubscription(subscription.id, { reason: 'polling' })
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async listWorkspaceEvents(params: {
|
||||
|
||||
@@ -33,19 +33,37 @@ test('should not index workspace if indexer is disabled', async t => {
|
||||
const count = module.queue.count('indexer.indexWorkspace');
|
||||
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'test-workspace',
|
||||
});
|
||||
|
||||
t.is(module.queue.count('indexer.indexWorkspace'), count);
|
||||
});
|
||||
|
||||
test('should index workspace if indexer is enabled', async t => {
|
||||
test('should index workspace when root snapshot is updated', async t => {
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'test-workspace',
|
||||
});
|
||||
|
||||
const { payload } = await module.queue.waitFor('indexer.indexWorkspace');
|
||||
t.is(payload.workspaceId, 'test-workspace');
|
||||
});
|
||||
|
||||
test('should not index workspace when non-root snapshot is updated', async t => {
|
||||
const count = module.queue.count('indexer.indexWorkspace');
|
||||
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'child-doc',
|
||||
});
|
||||
|
||||
t.is(module.queue.count('indexer.indexWorkspace'), count);
|
||||
});
|
||||
|
||||
test('should not delete workspace if indexer is disabled', async t => {
|
||||
Sinon.stub(config.indexer, 'enabled').value(false);
|
||||
const count = module.queue.count('indexer.deleteWorkspace');
|
||||
|
||||
@@ -29,21 +29,20 @@ export class IndexerEvent {
|
||||
);
|
||||
}
|
||||
|
||||
@OnEvent('workspace.updated')
|
||||
async indexWorkspace({ id }: Events['workspace.updated']) {
|
||||
@OnEvent('doc.snapshot.updated')
|
||||
async indexWorkspace({ workspaceId, docId }: Events['doc.snapshot.updated']) {
|
||||
if (!this.config.indexer.enabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (workspaceId !== docId) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.queue.add(
|
||||
'indexer.indexWorkspace',
|
||||
{
|
||||
workspaceId: id,
|
||||
},
|
||||
{
|
||||
jobId: `indexWorkspace/${id}`,
|
||||
priority: 100,
|
||||
}
|
||||
{ workspaceId },
|
||||
{ jobId: `indexWorkspace/${workspaceId}`, priority: 100 }
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user