Compare commits

...

2 Commits

Author SHA1 Message Date
DarkSky b98ab495bb fix(server): race condition for sync 2026-04-03 02:00:02 +08:00
DarkSky 99b07c2ee1 fix: ios marketing version 2026-03-04 01:17:14 +08:00
6 changed files with 173 additions and 32 deletions
@@ -5,7 +5,11 @@ import test from 'ava';
import { createModule } from '../../../__tests__/create-module';
import { Mockers } from '../../../__tests__/mocks';
import { CalendarProviderRequestError, CryptoHelper } from '../../../base';
import {
CalendarProviderRequestError,
CryptoHelper,
Mutex,
} from '../../../base';
import { ConfigModule } from '../../../base/config';
import { ServerConfigModule } from '../../../core/config';
import type {
@@ -14,6 +18,7 @@ import type {
} from '../../../models';
import { Models } from '../../../models';
import { CalendarModule } from '../index';
import { CalendarCronJobs } from '../cron';
import {
CalendarProvider,
CalendarProviderFactory,
@@ -85,8 +90,10 @@ const module = await createModule({
],
});
const calendarService = module.get(CalendarService);
const calendarCronJobs = module.get(CalendarCronJobs);
const providerFactory = module.get(CalendarProviderFactory);
const models = module.get(Models);
const mutex = module.get(Mutex);
module.get(CryptoHelper).onConfigInit();
const createAccount = async (
@@ -599,3 +606,57 @@ test('syncSubscription renews webhook channel when expiring', async t => {
t.is(updated?.customResourceId, 'new-resource');
t.truthy(updated?.channelExpiration);
});
test('pollAccounts skips syncing when cluster lock is unavailable', async t => {
mock.method(mutex, 'acquire', async () => undefined);
mock.method(
models.calendarSubscription,
'listAllWithAccountForSync',
async () => []
);
const syncAccountMock = mock.method(calendarService, 'syncAccount', async () => {
return;
});
await calendarCronJobs.pollAccounts();
t.is(syncAccountMock.mock.callCount(), 0);
});
test('pollAccounts only syncs due accounts', async t => {
mock.method(mutex, 'acquire', async () => ({
[Symbol.asyncDispose]: async () => {},
}));
mock.method(
models.calendarSubscription,
'listAllWithAccountForSync',
async () =>
[
{
accountId: 'due-account',
lastSyncAt: new Date(Date.now() - 31 * 60 * 1000),
account: {
refreshIntervalMinutes: 30,
},
},
{
accountId: 'fresh-account',
lastSyncAt: new Date(Date.now() - 5 * 60 * 1000),
account: {
refreshIntervalMinutes: 30,
},
},
] as any
);
const syncAccountMock = mock.method(calendarService, 'syncAccount', async () => {
return;
});
await calendarCronJobs.pollAccounts();
t.deepEqual(
syncAccountMock.mock.calls.map(call => call.arguments[0]),
['due-account']
);
});
@@ -1,18 +1,27 @@
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { chunk } from 'lodash-es';
import { Mutex } from '../../base';
import { Models } from '../../models';
import { CalendarService } from './service';
const CALENDAR_POLL_LOCK_KEY = 'calendar:poll-accounts';
const CALENDAR_POLL_BATCH_SIZE = 10;
@Injectable()
export class CalendarCronJobs {
constructor(
private readonly models: Models,
private readonly calendar: CalendarService
private readonly calendar: CalendarService,
private readonly mutex: Mutex
) {}
@Cron(CronExpression.EVERY_MINUTE)
async pollAccounts() {
await using lock = await this.mutex.acquire(CALENDAR_POLL_LOCK_KEY);
if (!lock) return;
const subscriptions =
await this.models.calendarSubscription.listAllWithAccountForSync();
@@ -46,16 +55,18 @@ export class CalendarCronJobs {
}
const now = Date.now();
await Promise.allSettled(
Array.from(accountDueAt.entries()).map(([accountId, info]) => {
if (
const dueAccountIds = Array.from(accountDueAt.entries())
.filter(
([, info]) =>
!info.lastSyncAt ||
now - info.lastSyncAt.getTime() >= info.refreshInterval * 60 * 1000
) {
return this.calendar.syncAccount(accountId);
}
return Promise.resolve();
})
);
)
.map(([accountId]) => accountId);
for (const accountIds of chunk(dueAccountIds, CALENDAR_POLL_BATCH_SIZE)) {
await Promise.allSettled(
accountIds.map(accountId => this.calendar.syncAccount(accountId))
);
}
}
}
@@ -4,6 +4,7 @@ import { Injectable, Logger } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import type { CalendarAccount, Prisma } from '@prisma/client';
import { addDays, subDays } from 'date-fns';
import { chunk } from 'lodash-es';
import {
CalendarProviderRequestError,
@@ -32,6 +33,7 @@ const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:';
const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000;
const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000;
const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60;
const ACCOUNT_SYNC_BATCH_SIZE = 10;
@Injectable()
export class CalendarService {
@@ -433,11 +435,13 @@ export class CalendarService {
const subscriptions =
await this.models.calendarSubscription.listByAccountForSync(accountId);
await Promise.allSettled(
subscriptions.map(subscription =>
this.syncSubscription(subscription.id, { reason: 'polling' })
)
);
for (const batch of chunk(subscriptions, ACCOUNT_SYNC_BATCH_SIZE)) {
await Promise.allSettled(
batch.map(subscription =>
this.syncSubscription(subscription.id, { reason: 'polling' })
)
);
}
}
async listWorkspaceEvents(params: {
@@ -33,19 +33,37 @@ test('should not index workspace if indexer is disabled', async t => {
const count = module.queue.count('indexer.indexWorkspace');
// @ts-expect-error ignore missing fields
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
await indexerEvent.indexWorkspace({
workspaceId: 'test-workspace',
docId: 'test-workspace',
});
t.is(module.queue.count('indexer.indexWorkspace'), count);
});
test('should index workspace if indexer is enabled', async t => {
test('should index workspace when root snapshot is updated', async t => {
// @ts-expect-error ignore missing fields
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
await indexerEvent.indexWorkspace({
workspaceId: 'test-workspace',
docId: 'test-workspace',
});
const { payload } = await module.queue.waitFor('indexer.indexWorkspace');
t.is(payload.workspaceId, 'test-workspace');
});
test('should not index workspace when non-root snapshot is updated', async t => {
const count = module.queue.count('indexer.indexWorkspace');
// @ts-expect-error ignore missing fields
await indexerEvent.indexWorkspace({
workspaceId: 'test-workspace',
docId: 'child-doc',
});
t.is(module.queue.count('indexer.indexWorkspace'), count);
});
test('should not delete workspace if indexer is disabled', async t => {
Sinon.stub(config.indexer, 'enabled').value(false);
const count = module.queue.count('indexer.deleteWorkspace');
@@ -29,21 +29,20 @@ export class IndexerEvent {
);
}
@OnEvent('workspace.updated')
async indexWorkspace({ id }: Events['workspace.updated']) {
@OnEvent('doc.snapshot.updated')
async indexWorkspace({ workspaceId, docId }: Events['doc.snapshot.updated']) {
if (!this.config.indexer.enabled) {
return;
}
if (workspaceId !== docId) {
return;
}
await this.queue.add(
'indexer.indexWorkspace',
{
workspaceId: id,
},
{
jobId: `indexWorkspace/${id}`,
priority: 100,
}
{ workspaceId },
{ jobId: `indexWorkspace/${workspaceId}`, priority: 100 }
);
}
+52 -4
View File
@@ -73,8 +73,8 @@ update_app_stream_version() {
update_ios_marketing_version() {
local file_path=$1
# Remove everything after the "-"
local new_version=$(echo "$2" | sed -E 's/-.*$//')
# Normalize inputs like "v0.26.4-beta.1" to "0.26.4"
local new_version=$(echo "$2" | sed -E 's/^v//; s/-.*$//')
# Check if file exists
if [ ! -f "$file_path" ]; then
@@ -98,8 +98,56 @@ update_ios_marketing_version() {
rm "$file_path".bak
}
# Derive a date-based iOS MARKETING_VERSION from the latest stable/beta tag.
# Apple requires CFBundleShortVersionString to increase monotonically. Using
# date-based versions (YYYY.M.D) derived from the last stable/beta release tag
# ensures this. The user-facing App Store version is set separately in
# App Store Connect.
get_ios_version_from_git() {
# Find the most recent stable/beta tag reachable from HEAD (exclude canary/nightly)
local latest_tag
latest_tag=$(git describe --tags --match 'v[0-9]*' \
--exclude '*canary*' --exclude '*nightly*' \
--abbrev=0 HEAD 2>/dev/null)
if [ -z "$latest_tag" ]; then
# No stable/beta tag found, fall back to today's date
date +"%Y.%-m.%-d"
return
fi
# Get the tag creation date (tagger date for annotated tags, commit date for lightweight)
local tag_date
tag_date=$(git for-each-ref --format='%(creatordate:short)' "refs/tags/$latest_tag")
if [ -z "$tag_date" ]; then
date +"%Y.%-m.%-d"
return
fi
# Format as YYYY.M.D (no leading zeros for month/day)
local year month day
year=$(echo "$tag_date" | cut -d'-' -f1)
month=$((10#$(echo "$tag_date" | cut -d'-' -f2)))
day=$((10#$(echo "$tag_date" | cut -d'-' -f3)))
echo "${year}.${month}.${day}"
}
new_version=$1
ios_new_version=${IOS_APP_VERSION:-$new_version}
if [ -n "$IOS_APP_VERSION" ]; then
# Manual override via environment variable
ios_new_version=$IOS_APP_VERSION
elif echo "$new_version" | grep -qE '(canary|nightly)'; then
# Canary/nightly: use the date of the last stable/beta tag
ios_new_version=$(get_ios_version_from_git)
else
# Stable/beta release: use today's date
ios_new_version=$(date +"%Y.%-m.%-d")
fi
echo "iOS MARKETING_VERSION: $ios_new_version (app version: $new_version)"
update_app_version_in_helm_charts ".github/helm/affine/Chart.yaml" "$new_version"
update_app_version_in_helm_charts ".github/helm/affine/charts/graphql/Chart.yaml" "$new_version"
@@ -108,4 +156,4 @@ update_app_version_in_helm_charts ".github/helm/affine/charts/doc/Chart.yaml" "$
update_app_stream_version "packages/frontend/apps/electron/resources/affine.metainfo.xml" "$new_version"
update_ios_marketing_version "packages/frontend/apps/ios/App/App.xcodeproj/project.pbxproj" "$new_version"
update_ios_marketing_version "packages/frontend/apps/ios/App/App.xcodeproj/project.pbxproj" "$ios_new_version"