mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-07-02 02:00:49 +08:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 43704d60fb | |||
| 46e7e35357 | |||
| b98ab495bb | |||
| 99b07c2ee1 | |||
| e1e0ac2345 | |||
| bdccf4e9fd | |||
| 11cf1928b5 | |||
| 5215c73166 |
@@ -62,6 +62,18 @@
|
||||
"concurrency": 10
|
||||
}
|
||||
},
|
||||
"queues.calendar": {
|
||||
"type": "object",
|
||||
"description": "The config for calendar job queue\n@default {\"concurrency\":4}",
|
||||
"properties": {
|
||||
"concurrency": {
|
||||
"type": "number"
|
||||
}
|
||||
},
|
||||
"default": {
|
||||
"concurrency": 4
|
||||
}
|
||||
},
|
||||
"queues.doc": {
|
||||
"type": "object",
|
||||
"description": "The config for doc job queue\n@default {\"concurrency\":1}",
|
||||
@@ -843,7 +855,7 @@
|
||||
"properties": {
|
||||
"google": {
|
||||
"type": "object",
|
||||
"description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\"}\n@link https://developers.google.com/calendar/api/guides/push",
|
||||
"description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\",\"requestTimeoutMs\":10000}\n@link https://developers.google.com/calendar/api/guides/push",
|
||||
"properties": {
|
||||
"enabled": {
|
||||
"type": "boolean"
|
||||
@@ -859,6 +871,9 @@
|
||||
},
|
||||
"webhookVerificationToken": {
|
||||
"type": "string"
|
||||
},
|
||||
"requestTimeoutMs": {
|
||||
"type": "number"
|
||||
}
|
||||
},
|
||||
"default": {
|
||||
@@ -866,7 +881,8 @@
|
||||
"clientId": "",
|
||||
"clientSecret": "",
|
||||
"externalWebhookUrl": "",
|
||||
"webhookVerificationToken": ""
|
||||
"webhookVerificationToken": "",
|
||||
"requestTimeoutMs": 10000
|
||||
}
|
||||
},
|
||||
"caldav": {
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
"@blocksuite/sync": "workspace:*",
|
||||
"@floating-ui/dom": "^1.6.13",
|
||||
"@lit/context": "^1.1.2",
|
||||
"@lottiefiles/dotlottie-wc": "^0.5.0",
|
||||
"@lottiefiles/dotlottie-wc": "^0.9.4",
|
||||
"@preact/signals-core": "^1.8.0",
|
||||
"@toeverything/theme": "^1.1.23",
|
||||
"@types/hast": "^3.0.4",
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
"@blocksuite/icons": "^2.2.17",
|
||||
"@floating-ui/dom": "^1.6.13",
|
||||
"@lit/context": "^1.1.3",
|
||||
"@lottiefiles/dotlottie-wc": "^0.5.0",
|
||||
"@lottiefiles/dotlottie-wc": "^0.9.4",
|
||||
"@preact/signals-core": "^1.8.0",
|
||||
"@toeverything/theme": "^1.1.23",
|
||||
"@vanilla-extract/css": "^1.17.0",
|
||||
|
||||
+27
@@ -0,0 +1,27 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE
|
||||
"calendar_subscriptions"
|
||||
ADD
|
||||
COLUMN "next_sync_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
ADD
|
||||
COLUMN "sync_retry_count" INTEGER NOT NULL DEFAULT 0;
|
||||
|
||||
UPDATE
|
||||
"calendar_subscriptions" AS s
|
||||
SET
|
||||
"next_sync_at" = CASE
|
||||
WHEN s."last_sync_at" IS NULL THEN CURRENT_TIMESTAMP
|
||||
ELSE s."last_sync_at" + make_interval(
|
||||
mins => COALESCE(a."refresh_interval_minutes", 30)
|
||||
)
|
||||
END
|
||||
FROM
|
||||
"calendar_accounts" AS a
|
||||
WHERE
|
||||
a."id" = s."account_id";
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "calendar_subscriptions_custom_channel_id_idx" ON "calendar_subscriptions"("custom_channel_id");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "calendar_subscriptions_enabled_next_sync_at_idx" ON "calendar_subscriptions"("enabled", "next_sync_at");
|
||||
@@ -110,7 +110,7 @@
|
||||
"react-dom": "19.2.1",
|
||||
"reflect-metadata": "^0.2.2",
|
||||
"rxjs": "^7.8.2",
|
||||
"semver": "^7.7.3",
|
||||
"semver": "^7.7.4",
|
||||
"ses": "^1.14.0",
|
||||
"socket.io": "^4.8.1",
|
||||
"stripe": "^17.7.0",
|
||||
@@ -126,7 +126,6 @@
|
||||
"@faker-js/faker": "^10.1.0",
|
||||
"@nestjs/swagger": "^11.2.0",
|
||||
"@nestjs/testing": "patch:@nestjs/testing@npm%3A10.4.15#~/.yarn/patches/@nestjs-testing-npm-10.4.15-d591a1705a.patch",
|
||||
"@react-email/preview-server": "^4.3.2",
|
||||
"@types/cookie-parser": "^1.4.8",
|
||||
"@types/express": "^5.0.1",
|
||||
"@types/express-serve-static-core": "^5.0.6",
|
||||
@@ -139,8 +138,7 @@
|
||||
"@types/nodemailer": "^7.0.0",
|
||||
"@types/on-headers": "^1.0.3",
|
||||
"@types/react": "^19.0.1",
|
||||
"@types/react-dom": "^19.0.2",
|
||||
"@types/semver": "^7.5.8",
|
||||
"@types/semver": "^7.7.1",
|
||||
"@types/sinon": "^21.0.0",
|
||||
"@types/supertest": "^6.0.2",
|
||||
"ava": "^6.4.0",
|
||||
|
||||
@@ -1037,6 +1037,8 @@ model CalendarSubscription {
|
||||
enabled Boolean @default(true)
|
||||
syncToken String? @map("sync_token") @db.Text
|
||||
lastSyncAt DateTime? @map("last_sync_at") @db.Timestamptz(3)
|
||||
nextSyncAt DateTime @default(now()) @map("next_sync_at") @db.Timestamptz(3)
|
||||
syncRetryCount Int @default(0) @map("sync_retry_count")
|
||||
customChannelId String? @map("custom_channel_id") @db.VarChar
|
||||
customResourceId String? @map("custom_resource_id") @db.VarChar
|
||||
channelExpiration DateTime? @map("channel_expiration") @db.Timestamptz(3)
|
||||
@@ -1050,6 +1052,8 @@ model CalendarSubscription {
|
||||
@@unique([accountId, externalCalendarId])
|
||||
@@index([accountId])
|
||||
@@index([provider, externalCalendarId])
|
||||
@@index([customChannelId])
|
||||
@@index([enabled, nextSyncAt])
|
||||
@@map("calendar_subscriptions")
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ export class MockEventBus {
|
||||
|
||||
emit = this.stub.emitAsync;
|
||||
emitAsync = this.stub.emitAsync;
|
||||
emitDetached = this.stub.emitAsync;
|
||||
broadcast = this.stub.broadcast;
|
||||
|
||||
last<Event extends EventName>(
|
||||
|
||||
@@ -6,6 +6,7 @@ import { AppModule } from '../app.module';
|
||||
import {
|
||||
CANARY_CLIENT_VERSION_MAX_AGE_DAYS,
|
||||
ConfigFactory,
|
||||
hasNewerVersion,
|
||||
UseNamedGuard,
|
||||
} from '../base';
|
||||
import { Public } from '../core/auth/guard';
|
||||
@@ -249,3 +250,11 @@ test('should reject old canary date version in canary namespace', async t => {
|
||||
env.NAMESPACE = prevNamespace;
|
||||
}
|
||||
});
|
||||
|
||||
test('should compare release versions for available upgrades', t => {
|
||||
t.false(hasNewerVersion('0.26.5', '0.26.4'));
|
||||
t.false(hasNewerVersion('0.26.5', '0.26.5'));
|
||||
t.true(hasNewerVersion('0.26.5', '0.26.6'));
|
||||
t.true(hasNewerVersion('0.26.5', '0.26.6-beta.1'));
|
||||
t.false(hasNewerVersion('0.26.6-beta.2', '0.26.6-beta.1'));
|
||||
});
|
||||
|
||||
@@ -88,12 +88,21 @@ export class EventBus
|
||||
emit<T extends EventName>(event: T, payload: Events[T]) {
|
||||
this.logger.debug(`Dispatch event: ${event}`);
|
||||
|
||||
// NOTE(@forehalo):
|
||||
// Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick.
|
||||
// In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting.
|
||||
// So we catch it here with `emitAsync`
|
||||
this.emitter.emitAsync(event, payload).catch(e => {
|
||||
this.emitter.emit('error', { event, payload, error: e });
|
||||
this.dispatchAsync(event, payload);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit event in detached cls context to avoid inheriting current transaction.
|
||||
*/
|
||||
emitDetached<T extends EventName>(event: T, payload: Events[T]) {
|
||||
this.logger.debug(`Dispatch event: ${event} (detached)`);
|
||||
|
||||
const requestId = this.cls.getId();
|
||||
this.cls.run({ ifNested: 'override' }, () => {
|
||||
this.cls.set(CLS_ID, requestId ?? genRequestId('event'));
|
||||
this.dispatchAsync(event, payload);
|
||||
});
|
||||
|
||||
return true;
|
||||
@@ -166,6 +175,16 @@ export class EventBus
|
||||
return this.emitter.waitFor(name, timeout);
|
||||
}
|
||||
|
||||
private dispatchAsync<T extends EventName>(event: T, payload: Events[T]) {
|
||||
// NOTE:
|
||||
// Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick.
|
||||
// In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting.
|
||||
// So we catch it here with `emitAsync`
|
||||
this.emitter.emitAsync(event, payload).catch(e => {
|
||||
this.emitter.emit('error', { event, payload, error: e });
|
||||
});
|
||||
}
|
||||
|
||||
private readonly bindEventHandlers = once(() => {
|
||||
this.scanner.scan().forEach(({ event, handler, opts }) => {
|
||||
this.on(event, handler, opts);
|
||||
|
||||
@@ -55,6 +55,14 @@ defineModuleConfig('job', {
|
||||
schema,
|
||||
},
|
||||
|
||||
'queues.calendar': {
|
||||
desc: 'The config for calendar job queue',
|
||||
default: {
|
||||
concurrency: 4,
|
||||
},
|
||||
schema,
|
||||
},
|
||||
|
||||
'queues.doc': {
|
||||
desc: 'The config for doc job queue',
|
||||
default: {
|
||||
|
||||
@@ -28,6 +28,7 @@ export enum Queue {
|
||||
DOC = 'doc',
|
||||
COPILOT = 'copilot',
|
||||
INDEXER = 'indexer',
|
||||
CALENDAR = 'calendar',
|
||||
}
|
||||
|
||||
export const QUEUES = Object.values(Queue);
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import semver from 'semver';
|
||||
|
||||
const DAY_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
// Example: 2026.2.6-canary.015
|
||||
@@ -89,3 +91,26 @@ export function checkCanaryDateClientVersion(
|
||||
normalized: parsed.normalized,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeComparableVersion(version: string): string | null {
|
||||
const canary = parseCanaryDateClientVersion(version);
|
||||
return semver.valid(canary?.normalized ?? version.trim(), {
|
||||
loose: true,
|
||||
});
|
||||
}
|
||||
|
||||
export function hasNewerVersion(
|
||||
currentVersion: string,
|
||||
nextVersion: string
|
||||
): boolean {
|
||||
const current = normalizeComparableVersion(currentVersion);
|
||||
const next = normalizeComparableVersion(nextVersion);
|
||||
|
||||
if (!current || !next) {
|
||||
return currentVersion.trim() !== nextVersion.trim();
|
||||
}
|
||||
|
||||
return semver.gt(next, current, {
|
||||
loose: true,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import {
|
||||
} from '@nestjs/graphql';
|
||||
import { GraphQLJSON, GraphQLJSONObject } from 'graphql-scalars';
|
||||
|
||||
import { Config, URLHelper } from '../../base';
|
||||
import { Config, hasNewerVersion, URLHelper } from '../../base';
|
||||
import { Namespace } from '../../env';
|
||||
import { Feature, type WorkspaceFeatureName } from '../../models';
|
||||
import { CurrentUser, Public } from '../auth';
|
||||
@@ -143,7 +143,7 @@ export class ServerConfigResolver {
|
||||
}>;
|
||||
|
||||
const latest = releases.at(0);
|
||||
if (!latest || latest.name === env.version) {
|
||||
if (!latest || !hasNewerVersion(env.version, latest.name)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -68,7 +68,7 @@ test('should update doc content to database when doc is updated', async t => {
|
||||
|
||||
const docId = randomUUID();
|
||||
await adapter.pushDocUpdates(workspace.id, docId, updates);
|
||||
await adapter.getDoc(workspace.id, docId);
|
||||
await adapter.getDocBinNative(workspace.id, docId);
|
||||
|
||||
mock.method(docReader, 'parseDocContent', () => {
|
||||
return {
|
||||
@@ -181,3 +181,22 @@ test('should ignore update workspace content to database when parse workspace co
|
||||
t.is(content!.name, null);
|
||||
t.is(content!.avatarKey, null);
|
||||
});
|
||||
|
||||
test('should ignore stale workspace when updating doc meta from snapshot event', async t => {
|
||||
const { docReader, listener, models } = t.context;
|
||||
const docId = randomUUID();
|
||||
mock.method(docReader, 'parseDocContent', () => ({
|
||||
title: 'test title',
|
||||
summary: 'test summary',
|
||||
}));
|
||||
|
||||
await models.workspace.delete(workspace.id);
|
||||
|
||||
await t.notThrowsAsync(async () => {
|
||||
await listener.markDocContentCacheStale({
|
||||
workspaceId: workspace.id,
|
||||
docId,
|
||||
blob: Buffer.from([0x01]),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -110,7 +110,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
|
||||
});
|
||||
|
||||
if (isNewDoc) {
|
||||
this.event.emit('doc.created', {
|
||||
this.event.emitDetached('doc.created', {
|
||||
workspaceId,
|
||||
docId,
|
||||
editor: editorId,
|
||||
@@ -334,7 +334,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
|
||||
});
|
||||
|
||||
if (updatedSnapshot) {
|
||||
this.event.emit('doc.snapshot.updated', {
|
||||
this.event.emitDetached('doc.snapshot.updated', {
|
||||
workspaceId: snapshot.spaceId,
|
||||
docId: snapshot.docId,
|
||||
blob,
|
||||
|
||||
@@ -1,12 +1,29 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Prisma } from '@prisma/client';
|
||||
|
||||
import { OnEvent } from '../../base';
|
||||
import { Models } from '../../models';
|
||||
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
|
||||
import { DocReader } from './reader';
|
||||
|
||||
const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']);
|
||||
|
||||
function isIgnorableDocEventError(error: unknown) {
|
||||
if (error instanceof Prisma.PrismaClientKnownRequestError) {
|
||||
return IGNORED_PRISMA_CODES.has(error.code);
|
||||
}
|
||||
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
|
||||
return /transaction is aborted|transaction already closed/i.test(
|
||||
error.message
|
||||
);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class DocEventsListener {
|
||||
private readonly logger = new Logger(DocEventsListener.name);
|
||||
|
||||
constructor(
|
||||
private readonly docReader: DocReader,
|
||||
private readonly models: Models,
|
||||
@@ -20,21 +37,39 @@ export class DocEventsListener {
|
||||
blob,
|
||||
}: Events['doc.snapshot.updated']) {
|
||||
await this.docReader.markDocContentCacheStale(workspaceId, docId);
|
||||
const workspace = await this.models.workspace.get(workspaceId);
|
||||
if (!workspace) {
|
||||
this.logger.warn(
|
||||
`Skip stale doc snapshot event for missing workspace ${workspaceId}/${docId}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
const isDoc = workspaceId !== docId;
|
||||
// update doc content to database
|
||||
if (isDoc) {
|
||||
const content = this.docReader.parseDocContent(blob);
|
||||
if (!content) {
|
||||
try {
|
||||
if (isDoc) {
|
||||
const content = this.docReader.parseDocContent(blob);
|
||||
if (!content) {
|
||||
return;
|
||||
}
|
||||
await this.models.doc.upsertMeta(workspaceId, docId, content);
|
||||
} else {
|
||||
// update workspace content to database
|
||||
const content = this.docReader.parseWorkspaceContent(blob);
|
||||
if (!content) {
|
||||
return;
|
||||
}
|
||||
await this.models.workspace.update(workspaceId, content);
|
||||
}
|
||||
} catch (error) {
|
||||
if (isIgnorableDocEventError(error)) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(
|
||||
`Ignore stale doc snapshot event for ${workspaceId}/${docId}: ${message}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
await this.models.doc.upsertMeta(workspaceId, docId, content);
|
||||
} else {
|
||||
// update workspace content to database
|
||||
const content = this.docReader.parseWorkspaceContent(blob);
|
||||
if (!content) {
|
||||
return;
|
||||
}
|
||||
await this.models.workspace.update(workspaceId, content);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
|
||||
import ava, { TestFn } from 'ava';
|
||||
|
||||
import {
|
||||
createTestingModule,
|
||||
type TestingModule,
|
||||
} from '../../../__tests__/utils';
|
||||
import { DocRole, Models, User, Workspace } from '../../../models';
|
||||
import { EventsListener } from '../event';
|
||||
import { PermissionModule } from '../index';
|
||||
|
||||
interface Context {
|
||||
module: TestingModule;
|
||||
models: Models;
|
||||
listener: EventsListener;
|
||||
}
|
||||
|
||||
const test = ava as TestFn<Context>;
|
||||
|
||||
let owner: User;
|
||||
let workspace: Workspace;
|
||||
|
||||
test.before(async t => {
|
||||
const module = await createTestingModule({ imports: [PermissionModule] });
|
||||
t.context.module = module;
|
||||
t.context.models = module.get(Models);
|
||||
t.context.listener = module.get(EventsListener);
|
||||
});
|
||||
|
||||
test.beforeEach(async t => {
|
||||
await t.context.module.initTestingDB();
|
||||
owner = await t.context.models.user.create({
|
||||
email: `${randomUUID()}@affine.pro`,
|
||||
});
|
||||
workspace = await t.context.models.workspace.create(owner.id);
|
||||
});
|
||||
|
||||
test.after.always(async t => {
|
||||
await t.context.module.close();
|
||||
});
|
||||
|
||||
test('should ignore default owner event when workspace does not exist', async t => {
|
||||
await t.notThrowsAsync(async () => {
|
||||
await t.context.listener.setDefaultPageOwner({
|
||||
workspaceId: randomUUID(),
|
||||
docId: randomUUID(),
|
||||
editor: owner.id,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
test('should ignore default owner event when editor does not exist', async t => {
|
||||
await t.notThrowsAsync(async () => {
|
||||
await t.context.listener.setDefaultPageOwner({
|
||||
workspaceId: workspace.id,
|
||||
docId: randomUUID(),
|
||||
editor: randomUUID(),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
test('should set owner when workspace and editor exist', async t => {
|
||||
const docId = randomUUID();
|
||||
await t.context.listener.setDefaultPageOwner({
|
||||
workspaceId: workspace.id,
|
||||
docId,
|
||||
editor: owner.id,
|
||||
});
|
||||
|
||||
const role = await t.context.models.docUser.get(
|
||||
workspace.id,
|
||||
docId,
|
||||
owner.id
|
||||
);
|
||||
t.is(role?.type, DocRole.Owner);
|
||||
});
|
||||
@@ -1,10 +1,27 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Prisma } from '@prisma/client';
|
||||
|
||||
import { OnEvent } from '../../base';
|
||||
import { Models } from '../../models';
|
||||
|
||||
const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']);
|
||||
|
||||
function isIgnorablePermissionEventError(error: unknown) {
|
||||
if (error instanceof Prisma.PrismaClientKnownRequestError) {
|
||||
return IGNORED_PRISMA_CODES.has(error.code);
|
||||
}
|
||||
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
|
||||
return /transaction is aborted|transaction already closed/i.test(
|
||||
error.message
|
||||
);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class EventsListener {
|
||||
private readonly logger = new Logger(EventsListener.name);
|
||||
|
||||
constructor(private readonly models: Models) {}
|
||||
|
||||
@OnEvent('doc.created')
|
||||
@@ -15,6 +32,33 @@ export class EventsListener {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.models.docUser.setOwner(workspaceId, docId, editor);
|
||||
const workspace = await this.models.workspace.get(workspaceId);
|
||||
if (!workspace) {
|
||||
this.logger.warn(
|
||||
`Skip default doc owner event for missing workspace ${workspaceId}/${docId}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const user = await this.models.user.get(editor);
|
||||
if (!user) {
|
||||
this.logger.warn(
|
||||
`Skip default doc owner event for missing editor ${workspaceId}/${docId}/${editor}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await this.models.docUser.setOwner(workspaceId, docId, editor);
|
||||
} catch (error) {
|
||||
if (isIgnorablePermissionEventError(error)) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(
|
||||
`Ignore stale doc owner event for ${workspaceId}/${docId}/${editor}: ${message}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ export class CalendarAccountModel extends BaseModel {
|
||||
scope: input.scope ?? null,
|
||||
status: input.status ?? 'active',
|
||||
lastError: input.lastError ?? null,
|
||||
refreshIntervalMinutes: input.refreshIntervalMinutes ?? 60,
|
||||
refreshIntervalMinutes: input.refreshIntervalMinutes ?? 30,
|
||||
};
|
||||
|
||||
const updateData: Prisma.CalendarAccountUncheckedUpdateInput = {
|
||||
|
||||
@@ -17,6 +17,8 @@ export interface UpsertCalendarSubscriptionInput {
|
||||
export interface UpdateCalendarSubscriptionSyncInput {
|
||||
syncToken?: string | null;
|
||||
lastSyncAt?: Date | null;
|
||||
nextSyncAt?: Date;
|
||||
syncRetryCount?: number;
|
||||
}
|
||||
|
||||
export interface UpdateCalendarSubscriptionChannelInput {
|
||||
@@ -81,13 +83,21 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
}
|
||||
|
||||
async updateSync(id: string, input: UpdateCalendarSubscriptionSyncInput) {
|
||||
return await this.db.calendarSubscription.update({
|
||||
where: { id },
|
||||
data: {
|
||||
syncToken: input.syncToken ?? null,
|
||||
lastSyncAt: input.lastSyncAt ?? null,
|
||||
},
|
||||
});
|
||||
const data: Prisma.CalendarSubscriptionUncheckedUpdateInput = {};
|
||||
if (input.syncToken !== undefined) {
|
||||
data.syncToken = input.syncToken ?? null;
|
||||
}
|
||||
if (input.lastSyncAt !== undefined) {
|
||||
data.lastSyncAt = input.lastSyncAt ?? null;
|
||||
}
|
||||
if (input.nextSyncAt !== undefined) {
|
||||
data.nextSyncAt = input.nextSyncAt;
|
||||
}
|
||||
if (input.syncRetryCount !== undefined) {
|
||||
data.syncRetryCount = input.syncRetryCount;
|
||||
}
|
||||
|
||||
return await this.db.calendarSubscription.update({ where: { id }, data });
|
||||
}
|
||||
|
||||
async updateChannel(
|
||||
@@ -155,10 +165,16 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
});
|
||||
}
|
||||
|
||||
async listAllWithAccountForSync() {
|
||||
async listDueForSync(now: Date, limit: number) {
|
||||
return await this.db.calendarSubscription.findMany({
|
||||
where: { enabled: true },
|
||||
include: { account: true },
|
||||
where: {
|
||||
enabled: true,
|
||||
nextSyncAt: { lte: now },
|
||||
account: { status: 'active' },
|
||||
},
|
||||
select: { id: true },
|
||||
orderBy: { nextSyncAt: 'asc' },
|
||||
take: limit,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -169,13 +185,6 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
});
|
||||
}
|
||||
|
||||
async updateLastSyncAt(id: string, lastSyncAt: Date) {
|
||||
return await this.db.calendarSubscription.update({
|
||||
where: { id },
|
||||
data: { lastSyncAt },
|
||||
});
|
||||
}
|
||||
|
||||
async clearSyncTokensByAccount(accountId: string) {
|
||||
return await this.db.calendarSubscription.updateMany({
|
||||
where: { accountId },
|
||||
@@ -200,6 +209,7 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
data: {
|
||||
enabled: false,
|
||||
syncToken: null,
|
||||
syncRetryCount: 0,
|
||||
customChannelId: null,
|
||||
customResourceId: null,
|
||||
channelExpiration: null,
|
||||
|
||||
@@ -2,6 +2,7 @@ import assert from 'node:assert';
|
||||
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Transactional } from '@nestjs-cls/transactional';
|
||||
import type { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
|
||||
import { WorkspaceDocUserRole } from '@prisma/client';
|
||||
|
||||
import { CanNotBatchGrantDocOwnerPermissions, PaginationInput } from '../base';
|
||||
@@ -14,31 +15,20 @@ export class DocUserModel extends BaseModel {
|
||||
* Set or update the [Owner] of a doc.
|
||||
* The old [Owner] will be changed to [Manager] if there is already an [Owner].
|
||||
*/
|
||||
@Transactional()
|
||||
@Transactional<TransactionalAdapterPrisma>({ timeout: 15000 })
|
||||
async setOwner(workspaceId: string, docId: string, userId: string) {
|
||||
const oldOwner = await this.db.workspaceDocUserRole.findFirst({
|
||||
await this.db.workspaceDocUserRole.updateMany({
|
||||
where: {
|
||||
workspaceId,
|
||||
docId,
|
||||
type: DocRole.Owner,
|
||||
userId: { not: userId },
|
||||
},
|
||||
data: {
|
||||
type: DocRole.Manager,
|
||||
},
|
||||
});
|
||||
|
||||
if (oldOwner) {
|
||||
await this.db.workspaceDocUserRole.update({
|
||||
where: {
|
||||
workspaceId_docId_userId: {
|
||||
workspaceId,
|
||||
docId,
|
||||
userId: oldOwner.userId,
|
||||
},
|
||||
},
|
||||
data: {
|
||||
type: DocRole.Manager,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
await this.db.workspaceDocUserRole.upsert({
|
||||
where: {
|
||||
workspaceId_docId_userId: {
|
||||
@@ -57,16 +47,9 @@ export class DocUserModel extends BaseModel {
|
||||
type: DocRole.Owner,
|
||||
},
|
||||
});
|
||||
|
||||
if (oldOwner) {
|
||||
this.logger.log(
|
||||
`Transfer doc owner of [${workspaceId}/${docId}] from [${oldOwner.userId}] to [${userId}]`
|
||||
);
|
||||
} else {
|
||||
this.logger.log(
|
||||
`Set doc owner of [${workspaceId}/${docId}] to [${userId}]`
|
||||
);
|
||||
}
|
||||
this.logger.log(
|
||||
`Set doc owner of [${workspaceId}/${docId}] to [${userId}]`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -14,6 +14,7 @@ import type {
|
||||
} from '../../../models';
|
||||
import { Models } from '../../../models';
|
||||
import { CalendarModule } from '../index';
|
||||
import { CalendarCronJobs } from '../cron';
|
||||
import {
|
||||
CalendarProvider,
|
||||
CalendarProviderFactory,
|
||||
@@ -85,6 +86,7 @@ const module = await createModule({
|
||||
],
|
||||
});
|
||||
const calendarService = module.get(CalendarService);
|
||||
const calendarCronJobs = module.get(CalendarCronJobs);
|
||||
const providerFactory = module.get(CalendarProviderFactory);
|
||||
const models = module.get(Models);
|
||||
module.get(CryptoHelper).onConfigInit();
|
||||
@@ -113,6 +115,8 @@ const createSubscription = async (
|
||||
accountId: string,
|
||||
overrides: Partial<UpsertCalendarSubscriptionInput> & {
|
||||
syncToken?: string | null;
|
||||
nextSyncAt?: Date;
|
||||
syncRetryCount?: number;
|
||||
customChannelId?: string | null;
|
||||
customResourceId?: string | null;
|
||||
channelExpiration?: Date | null;
|
||||
@@ -134,6 +138,20 @@ const createSubscription = async (
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
overrides.nextSyncAt !== undefined ||
|
||||
overrides.syncRetryCount !== undefined
|
||||
) {
|
||||
await models.calendarSubscription.updateSync(subscription.id, {
|
||||
...(overrides.nextSyncAt !== undefined
|
||||
? { nextSyncAt: overrides.nextSyncAt }
|
||||
: {}),
|
||||
...(overrides.syncRetryCount !== undefined
|
||||
? { syncRetryCount: overrides.syncRetryCount }
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
overrides.customChannelId !== undefined ||
|
||||
overrides.customResourceId !== undefined ||
|
||||
@@ -151,6 +169,8 @@ const createSubscription = async (
|
||||
|
||||
test.afterEach.always(() => {
|
||||
mock.reset();
|
||||
module.queue.add.resetHistory();
|
||||
module.queue.remove.resetHistory();
|
||||
});
|
||||
|
||||
test.after.always(async () => {
|
||||
@@ -252,6 +272,9 @@ test('syncSubscription resets invalid sync token and maps events', async t => {
|
||||
const updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.is(updated?.syncToken, 'next-token');
|
||||
t.truthy(updated?.lastSyncAt);
|
||||
t.is(updated?.syncRetryCount, 0);
|
||||
t.truthy(updated?.nextSyncAt);
|
||||
t.true(updated!.nextSyncAt.getTime() > updated!.lastSyncAt!.getTime());
|
||||
|
||||
const events = await models.calendarEvent.listBySubscriptionsInRange(
|
||||
[subscription.id],
|
||||
@@ -493,51 +516,22 @@ test('syncSubscription applies exponential backoff for repeated failures', async
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
let updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 1);
|
||||
t.is(updated?.syncRetryCount, 1);
|
||||
t.is(
|
||||
updated?.nextSyncAt.toISOString(),
|
||||
new Date(now + baseDelayMs).toISOString()
|
||||
);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
});
|
||||
|
||||
test('syncSubscription skips token refresh while in backoff window', async t => {
|
||||
let now = new Date('2026-01-01T00:00:00.000Z').getTime();
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id, {
|
||||
accessToken: 'expired-access-token',
|
||||
expiresAt: new Date(now - 5 * 60 * 1000),
|
||||
});
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
const refreshMock = mock.method(provider, 'refreshTokens', async () => ({
|
||||
accessToken: `refreshed-${randomUUID()}`,
|
||||
}));
|
||||
const listEventsMock = mock.method(provider, 'listEvents', async () => {
|
||||
throw new Error('upstream timeout');
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
const baseDelayMs = 5 * 60 * 1000;
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(refreshMock.mock.callCount(), 1);
|
||||
t.is(listEventsMock.mock.callCount(), 1);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(refreshMock.mock.callCount(), 2);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
t.is(updated?.syncRetryCount, 2);
|
||||
t.is(
|
||||
updated?.nextSyncAt.toISOString(),
|
||||
new Date(now + baseDelayMs * 2).toISOString()
|
||||
);
|
||||
});
|
||||
|
||||
test('syncSubscription renews webhook channel when expiring', async t => {
|
||||
@@ -599,3 +593,73 @@ test('syncSubscription renews webhook channel when expiring', async t => {
|
||||
t.is(updated?.customResourceId, 'new-resource');
|
||||
t.truthy(updated?.channelExpiration);
|
||||
});
|
||||
|
||||
test('syncSubscription keeps schedule moving when webhook renewal fails', async t => {
|
||||
const now = new Date('2026-01-01T00:00:00.000Z').getTime();
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id, {
|
||||
refreshIntervalMinutes: 60,
|
||||
});
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
channelExpiration: new Date(Date.now() + 60 * 60 * 1000),
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
mock.method(provider, 'listEvents', async () => ({
|
||||
events: [],
|
||||
nextSyncToken: 'next-sync',
|
||||
}));
|
||||
mock.method(provider, 'watchCalendar', async () => {
|
||||
throw new Error('watch failed');
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
|
||||
const updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.truthy(updated?.lastSyncAt);
|
||||
t.is(updated?.syncRetryCount, 0);
|
||||
t.is(
|
||||
updated?.nextSyncAt.toISOString(),
|
||||
new Date(now + 15 * 60 * 1000).toISOString()
|
||||
);
|
||||
});
|
||||
|
||||
test('pollAccounts skips when nothing is due', async t => {
|
||||
mock.method(models.calendarSubscription, 'listDueForSync', async () => []);
|
||||
|
||||
await calendarCronJobs.pollAccounts();
|
||||
|
||||
t.is(module.queue.count('calendar.syncSubscription'), 0);
|
||||
});
|
||||
|
||||
test('pollAccounts enqueues due subscriptions only', async t => {
|
||||
mock.method(models.calendarSubscription, 'listDueForSync', async () => [
|
||||
{ id: 'due-subscription-a' },
|
||||
{ id: 'due-subscription-b' },
|
||||
]);
|
||||
|
||||
await calendarCronJobs.pollAccounts();
|
||||
|
||||
t.is(module.queue.count('calendar.syncSubscription'), 2);
|
||||
t.deepEqual(
|
||||
module.queue.add
|
||||
.getCalls()
|
||||
.map(call => [call.args[0], call.args[1], call.args[2]]),
|
||||
[
|
||||
[
|
||||
'calendar.syncSubscription',
|
||||
{ subscriptionId: 'due-subscription-a', reason: 'polling' },
|
||||
{ jobId: 'due-subscription-a' },
|
||||
],
|
||||
[
|
||||
'calendar.syncSubscription',
|
||||
{ subscriptionId: 'due-subscription-b', reason: 'polling' },
|
||||
{ jobId: 'due-subscription-b' },
|
||||
],
|
||||
]
|
||||
);
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@ export interface CalendarGoogleConfig {
|
||||
clientSecret: string;
|
||||
externalWebhookUrl?: string;
|
||||
webhookVerificationToken?: string;
|
||||
requestTimeoutMs?: number;
|
||||
}
|
||||
|
||||
export type CalendarCalDAVAuthType = 'auto' | 'basic' | 'digest';
|
||||
@@ -49,6 +50,7 @@ const schema: JSONSchema = {
|
||||
clientSecret: { type: 'string' },
|
||||
externalWebhookUrl: { type: 'string' },
|
||||
webhookVerificationToken: { type: 'string' },
|
||||
requestTimeoutMs: { type: 'number' },
|
||||
},
|
||||
};
|
||||
|
||||
@@ -88,6 +90,7 @@ defineModuleConfig('calendar', {
|
||||
clientSecret: '',
|
||||
externalWebhookUrl: '',
|
||||
webhookVerificationToken: '',
|
||||
requestTimeoutMs: 10_000,
|
||||
},
|
||||
schema,
|
||||
shape: z.object({
|
||||
@@ -101,6 +104,7 @@ defineModuleConfig('calendar', {
|
||||
.or(z.string().length(0))
|
||||
.optional(),
|
||||
webhookVerificationToken: z.string().optional(),
|
||||
requestTimeoutMs: z.number().int().positive().optional(),
|
||||
}),
|
||||
link: 'https://developers.google.com/calendar/api/guides/push',
|
||||
},
|
||||
|
||||
@@ -1,61 +1,33 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
|
||||
import { JobQueue } from '../../base';
|
||||
import { Models } from '../../models';
|
||||
import { CalendarService } from './service';
|
||||
|
||||
const CALENDAR_POLL_BATCH_SIZE = 200;
|
||||
|
||||
@Injectable()
|
||||
export class CalendarCronJobs {
|
||||
constructor(
|
||||
private readonly models: Models,
|
||||
private readonly calendar: CalendarService
|
||||
private readonly queue: JobQueue
|
||||
) {}
|
||||
|
||||
@Cron(CronExpression.EVERY_MINUTE)
|
||||
async pollAccounts() {
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listAllWithAccountForSync();
|
||||
const subscriptions = await this.models.calendarSubscription.listDueForSync(
|
||||
new Date(),
|
||||
CALENDAR_POLL_BATCH_SIZE
|
||||
);
|
||||
|
||||
const accountDueAt = new Map<
|
||||
string,
|
||||
{ refreshInterval: number; lastSyncAt: Date | null }
|
||||
>();
|
||||
|
||||
for (const subscription of subscriptions) {
|
||||
const interval = subscription.account.refreshIntervalMinutes ?? 60;
|
||||
const lastSyncAt = subscription.lastSyncAt ?? null;
|
||||
const existing = accountDueAt.get(subscription.accountId);
|
||||
if (!existing) {
|
||||
accountDueAt.set(subscription.accountId, {
|
||||
refreshInterval: interval,
|
||||
lastSyncAt,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
const earliest =
|
||||
existing.lastSyncAt && lastSyncAt
|
||||
? existing.lastSyncAt < lastSyncAt
|
||||
? existing.lastSyncAt
|
||||
: lastSyncAt
|
||||
: (existing.lastSyncAt ?? lastSyncAt);
|
||||
accountDueAt.set(subscription.accountId, {
|
||||
refreshInterval: interval,
|
||||
lastSyncAt: earliest,
|
||||
});
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
await Promise.allSettled(
|
||||
Array.from(accountDueAt.entries()).map(([accountId, info]) => {
|
||||
if (
|
||||
!info.lastSyncAt ||
|
||||
now - info.lastSyncAt.getTime() >= info.refreshInterval * 60 * 1000
|
||||
) {
|
||||
return this.calendar.syncAccount(accountId);
|
||||
}
|
||||
return Promise.resolve();
|
||||
})
|
||||
subscriptions.map(({ id }) =>
|
||||
this.queue.add(
|
||||
'calendar.syncSubscription',
|
||||
{ subscriptionId: id, reason: 'polling' },
|
||||
{ jobId: id }
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import { PermissionModule } from '../../core/permission';
|
||||
import { WorkspaceModule } from '../../core/workspaces';
|
||||
import { CalendarController } from './controller';
|
||||
import { CalendarCronJobs } from './cron';
|
||||
import { CalendarJob } from './job';
|
||||
import { CalendarOAuthService } from './oauth';
|
||||
import { CalendarProviderFactory, CalendarProviders } from './providers';
|
||||
import {
|
||||
@@ -25,6 +26,7 @@ import { CalendarService } from './service';
|
||||
...CalendarProviders,
|
||||
CalendarProviderFactory,
|
||||
CalendarService,
|
||||
CalendarJob,
|
||||
CalendarOAuthService,
|
||||
CalendarCronJobs,
|
||||
CalendarServerConfigResolver,
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { OnJob } from '../../base';
|
||||
import { CalendarService } from './service';
|
||||
|
||||
declare global {
|
||||
interface Jobs {
|
||||
'calendar.syncSubscription': {
|
||||
subscriptionId: string;
|
||||
reason?: 'polling' | 'webhook' | 'on-demand';
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class CalendarJob {
|
||||
constructor(private readonly calendar: CalendarService) {}
|
||||
|
||||
@OnJob('calendar.syncSubscription')
|
||||
async syncSubscription({
|
||||
subscriptionId,
|
||||
reason,
|
||||
}: Jobs['calendar.syncSubscription']) {
|
||||
await this.calendar.syncSubscription(subscriptionId, { reason });
|
||||
}
|
||||
}
|
||||
@@ -152,10 +152,28 @@ export abstract class CalendarProvider {
|
||||
}
|
||||
}
|
||||
|
||||
protected get requestTimeoutMs() {
|
||||
const timeout = (this.config as { requestTimeoutMs?: number } | undefined)
|
||||
?.requestTimeoutMs;
|
||||
return typeof timeout === 'number' && timeout > 0 ? timeout : undefined;
|
||||
}
|
||||
|
||||
protected withTimeout(signal?: AbortSignal | null) {
|
||||
const timeoutMs = this.requestTimeoutMs;
|
||||
if (!timeoutMs) return signal;
|
||||
|
||||
const timeoutSignal = AbortSignal.timeout(timeoutMs);
|
||||
if (!signal) return timeoutSignal;
|
||||
|
||||
return AbortSignal.any([signal, timeoutSignal]);
|
||||
}
|
||||
|
||||
protected async fetchJson<T>(url: string, init?: RequestInit) {
|
||||
const response = await fetch(url, {
|
||||
headers: { Accept: 'application/json', ...init?.headers },
|
||||
...init,
|
||||
signal: this.withTimeout(init?.signal),
|
||||
headers: { ...init?.headers, Accept: 'application/json' },
|
||||
});
|
||||
const body = await response.text();
|
||||
if (!response.ok) {
|
||||
|
||||
@@ -329,6 +329,7 @@ export class GoogleCalendarProvider extends CalendarProvider {
|
||||
|
||||
private async fetchWithTokenHandling<T>(url: string, accessToken: string) {
|
||||
const response = await fetch(url, {
|
||||
signal: this.withTimeout(),
|
||||
headers: {
|
||||
Accept: 'application/json',
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
|
||||
@@ -8,11 +8,11 @@ import { addDays, subDays } from 'date-fns';
|
||||
import {
|
||||
CalendarProviderRequestError,
|
||||
Config,
|
||||
exponentialBackoffDelay,
|
||||
GraphqlBadRequest,
|
||||
Mutex,
|
||||
JobQueue,
|
||||
URLHelper,
|
||||
} from '../../base';
|
||||
import { SessionRedis } from '../../base/redis';
|
||||
import { Models } from '../../models';
|
||||
import type { CalendarCalDAVProviderPreset } from './config';
|
||||
import {
|
||||
@@ -28,10 +28,10 @@ import type { LinkCalDAVAccountInput } from './types';
|
||||
const TOKEN_REFRESH_SKEW_MS = 60 * 1000;
|
||||
const DEFAULT_PAST_DAYS = 90;
|
||||
const DEFAULT_FUTURE_DAYS = 180;
|
||||
const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:';
|
||||
const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60;
|
||||
const DEFAULT_REFRESH_INTERVAL_MINUTES = 30;
|
||||
const CHANNEL_RENEW_RETRY_MS = 15 * 60 * 1000;
|
||||
|
||||
@Injectable()
|
||||
export class CalendarService {
|
||||
@@ -41,8 +41,7 @@ export class CalendarService {
|
||||
constructor(
|
||||
private readonly models: Models,
|
||||
private readonly providerFactory: CalendarProviderFactory<CalendarProvider>,
|
||||
private readonly mutex: Mutex,
|
||||
private readonly redis: SessionRedis,
|
||||
private readonly queue: JobQueue,
|
||||
private readonly config: Config,
|
||||
private readonly url: URLHelper
|
||||
) {}
|
||||
@@ -85,10 +84,24 @@ export class CalendarService {
|
||||
return null;
|
||||
}
|
||||
|
||||
return await this.models.calendarAccount.updateRefreshInterval(
|
||||
accountId,
|
||||
refreshIntervalMinutes
|
||||
const updatedAccount =
|
||||
await this.models.calendarAccount.updateRefreshInterval(
|
||||
accountId,
|
||||
refreshIntervalMinutes
|
||||
);
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listByAccountForSync(accountId);
|
||||
await Promise.all(
|
||||
subscriptions.map(subscription =>
|
||||
this.models.calendarSubscription.updateSync(subscription.id, {
|
||||
nextSyncAt: this.calculateNextSyncAt(
|
||||
subscription.lastSyncAt ?? this.now(),
|
||||
refreshIntervalMinutes
|
||||
),
|
||||
})
|
||||
)
|
||||
);
|
||||
return updatedAccount;
|
||||
}
|
||||
|
||||
async unlinkAccount(userId: string, accountId: string) {
|
||||
@@ -313,25 +326,6 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const backoff = await this.getSyncFailureBackoff(subscription.id);
|
||||
if (backoff && now < backoff.nextRetryAt.getTime()) {
|
||||
return;
|
||||
}
|
||||
|
||||
await using lock = await this.mutex.acquire(
|
||||
`calendar:subscription:${subscriptionId}`
|
||||
);
|
||||
if (!lock) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lockedNow = Date.now();
|
||||
const lockedBackoff = await this.getSyncFailureBackoff(subscription.id);
|
||||
if (lockedBackoff && lockedNow < lockedBackoff.nextRetryAt.getTime()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const provider = this.providerFactory.get(
|
||||
account.provider as CalendarProviderName
|
||||
);
|
||||
@@ -415,29 +409,28 @@ export class CalendarService {
|
||||
}
|
||||
|
||||
if (synced) {
|
||||
await this.clearSyncFailureBackoff(subscription.id);
|
||||
await this.ensureWebhookChannel(subscription, provider, accessToken);
|
||||
const syncedAt = this.now();
|
||||
let nextSyncAt = this.calculateNextSyncAt(
|
||||
syncedAt,
|
||||
account.refreshIntervalMinutes
|
||||
);
|
||||
|
||||
try {
|
||||
await this.ensureWebhookChannel(subscription, provider, accessToken);
|
||||
} catch (error) {
|
||||
nextSyncAt = this.calculateChannelRetryAt(nextSyncAt);
|
||||
this.logger.warn(
|
||||
`Failed to ensure webhook channel for subscription ${subscription.id}`,
|
||||
this.toError(error)
|
||||
);
|
||||
}
|
||||
|
||||
await this.models.calendarSubscription.updateSync(subscription.id, {
|
||||
lastSyncAt: syncedAt,
|
||||
nextSyncAt,
|
||||
syncRetryCount: 0,
|
||||
});
|
||||
}
|
||||
|
||||
await this.models.calendarSubscription.updateLastSyncAt(
|
||||
subscription.id,
|
||||
new Date()
|
||||
);
|
||||
}
|
||||
|
||||
async syncAccount(accountId: string) {
|
||||
const account = await this.models.calendarAccount.get(accountId);
|
||||
if (!account || account.status !== 'active') {
|
||||
return;
|
||||
}
|
||||
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listByAccountForSync(accountId);
|
||||
await Promise.allSettled(
|
||||
subscriptions.map(subscription =>
|
||||
this.syncSubscription(subscription.id, { reason: 'polling' })
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
async listWorkspaceEvents(params: {
|
||||
@@ -455,9 +448,18 @@ export class CalendarService {
|
||||
params.to
|
||||
);
|
||||
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listWithAccounts(subscriptionIds);
|
||||
const staleSubscriptions = subscriptions.filter(
|
||||
subscription =>
|
||||
subscription.enabled &&
|
||||
subscription.account.status === 'active' &&
|
||||
subscription.nextSyncAt.getTime() <= this.nowMs()
|
||||
);
|
||||
|
||||
Promise.allSettled(
|
||||
subscriptionIds.map(subscriptionId =>
|
||||
this.syncSubscription(subscriptionId, { reason: 'on-demand' })
|
||||
staleSubscriptions.map(subscription =>
|
||||
this.enqueueSyncSubscription(subscription.id, 'on-demand')
|
||||
)
|
||||
).catch(error => {
|
||||
this.logger.warn('Calendar on-demand sync failed', error as Error);
|
||||
@@ -513,7 +515,7 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.syncSubscription(subscription.id, { reason: 'webhook' });
|
||||
await this.enqueueSyncSubscription(subscription.id, 'webhook');
|
||||
}
|
||||
|
||||
getWebhookToken() {
|
||||
@@ -747,7 +749,7 @@ export class CalendarService {
|
||||
}
|
||||
|
||||
private getSyncWindow() {
|
||||
const now = new Date();
|
||||
const now = this.now();
|
||||
return {
|
||||
timeMin: subDays(now, DEFAULT_PAST_DAYS).toISOString(),
|
||||
timeMax: addDays(now, DEFAULT_FUTURE_DAYS).toISOString(),
|
||||
@@ -767,7 +769,7 @@ export class CalendarService {
|
||||
if (
|
||||
accessToken &&
|
||||
account.expiresAt &&
|
||||
account.expiresAt.getTime() > Date.now() + TOKEN_REFRESH_SKEW_MS
|
||||
account.expiresAt.getTime() > this.nowMs() + TOKEN_REFRESH_SKEW_MS
|
||||
) {
|
||||
return { accessToken };
|
||||
}
|
||||
@@ -831,7 +833,7 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const renewThreshold = Date.now() + 24 * 60 * 60 * 1000;
|
||||
const renewThreshold = this.nowMs() + 24 * 60 * 60 * 1000;
|
||||
if (
|
||||
subscription.channelExpiration &&
|
||||
subscription.channelExpiration.getTime() > renewThreshold
|
||||
@@ -873,6 +875,7 @@ export class CalendarService {
|
||||
subscription: {
|
||||
id: string;
|
||||
externalCalendarId: string;
|
||||
syncRetryCount: number;
|
||||
customChannelId: string | null;
|
||||
customResourceId: string | null;
|
||||
};
|
||||
@@ -895,7 +898,6 @@ export class CalendarService {
|
||||
}
|
||||
|
||||
if (this.isTokenInvalidError(params.error)) {
|
||||
await this.clearSyncFailureBackoff(params.subscription.id);
|
||||
await this.models.calendarAccount.invalidateAndPurge(
|
||||
params.account.id,
|
||||
this.formatSyncError(params.error)
|
||||
@@ -903,18 +905,14 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const backoff = await this.bumpSyncFailureBackoff(params.subscription.id);
|
||||
const interval = params.account.refreshIntervalMinutes ?? 60;
|
||||
const lastSyncAt = this.calculateLastSyncAtForRetry(
|
||||
backoff.nextRetryAt,
|
||||
interval
|
||||
);
|
||||
await this.models.calendarSubscription.updateLastSyncAt(
|
||||
params.subscription.id,
|
||||
lastSyncAt
|
||||
);
|
||||
const attempt = params.subscription.syncRetryCount + 1;
|
||||
const nextRetryAt = this.calculateFailureRetryAt(attempt);
|
||||
await this.models.calendarSubscription.updateSync(params.subscription.id, {
|
||||
nextSyncAt: nextRetryAt,
|
||||
syncRetryCount: attempt,
|
||||
});
|
||||
this.logger.warn(
|
||||
`Calendar sync failed for subscription ${params.subscription.id}, attempt ${backoff.attempt}, next retry at ${backoff.nextRetryAt.toISOString()}`,
|
||||
`Calendar sync failed for subscription ${params.subscription.id}, attempt ${attempt}, next retry at ${nextRetryAt.toISOString()}`,
|
||||
this.toError(params.error)
|
||||
);
|
||||
}
|
||||
@@ -927,15 +925,6 @@ export class CalendarService {
|
||||
return status === 404;
|
||||
}
|
||||
|
||||
private calculateLastSyncAtForRetry(
|
||||
nextRetryAt: Date,
|
||||
refreshIntervalMinutes: number
|
||||
) {
|
||||
// Cron schedules by `now - lastSyncAt >= refreshInterval`, so back-calculate
|
||||
// a synthetic lastSyncAt to defer the next attempt to `nextRetryAt`.
|
||||
return new Date(nextRetryAt.getTime() - refreshIntervalMinutes * 60 * 1000);
|
||||
}
|
||||
|
||||
private async disableSubscription(params: {
|
||||
subscriptionId: string;
|
||||
provider: CalendarProvider;
|
||||
@@ -966,68 +955,52 @@ export class CalendarService {
|
||||
await this.models.calendarSubscription.disableAndPurge(
|
||||
params.subscriptionId
|
||||
);
|
||||
await this.clearSyncFailureBackoff(params.subscriptionId);
|
||||
}
|
||||
|
||||
private getSyncFailureBackoffKey(subscriptionId: string) {
|
||||
return `${SYNC_FAILURE_BACKOFF_KEY_PREFIX}${subscriptionId}`;
|
||||
}
|
||||
|
||||
private async getSyncFailureBackoff(subscriptionId: string) {
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
const value = await this.redis.get(key);
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(value) as {
|
||||
attempt?: number;
|
||||
nextRetryAt?: string;
|
||||
};
|
||||
if (!parsed.attempt || !parsed.nextRetryAt) {
|
||||
return null;
|
||||
async enqueueSyncSubscription(
|
||||
subscriptionId: string,
|
||||
reason: 'polling' | 'webhook' | 'on-demand'
|
||||
) {
|
||||
await this.queue.add(
|
||||
'calendar.syncSubscription',
|
||||
{
|
||||
subscriptionId,
|
||||
reason,
|
||||
},
|
||||
{
|
||||
jobId: subscriptionId,
|
||||
}
|
||||
const nextRetryAt = new Date(parsed.nextRetryAt);
|
||||
if (Number.isNaN(nextRetryAt.getTime())) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
attempt: parsed.attempt,
|
||||
nextRetryAt,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private async bumpSyncFailureBackoff(subscriptionId: string) {
|
||||
const state = await this.getSyncFailureBackoff(subscriptionId);
|
||||
const attempt = (state?.attempt ?? 0) + 1;
|
||||
const delay = Math.min(
|
||||
SYNC_FAILURE_BACKOFF_BASE_MS * 2 ** (attempt - 1),
|
||||
SYNC_FAILURE_BACKOFF_MAX_MS
|
||||
);
|
||||
const nextRetryAt = new Date(Date.now() + delay);
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
await this.redis.set(
|
||||
key,
|
||||
JSON.stringify({
|
||||
attempt,
|
||||
nextRetryAt: nextRetryAt.toISOString(),
|
||||
}),
|
||||
'EX',
|
||||
SYNC_FAILURE_BACKOFF_TTL_SECONDS
|
||||
);
|
||||
return {
|
||||
attempt,
|
||||
nextRetryAt,
|
||||
};
|
||||
private calculateNextSyncAt(base: Date, refreshIntervalMinutes?: number) {
|
||||
const intervalMinutes =
|
||||
refreshIntervalMinutes ?? DEFAULT_REFRESH_INTERVAL_MINUTES;
|
||||
return new Date(base.getTime() + intervalMinutes * 60 * 1000);
|
||||
}
|
||||
|
||||
private async clearSyncFailureBackoff(subscriptionId: string) {
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
await this.redis.del(key);
|
||||
private calculateChannelRetryAt(nextSyncAt: Date) {
|
||||
return new Date(
|
||||
Math.min(nextSyncAt.getTime(), this.nowMs() + CHANNEL_RENEW_RETRY_MS)
|
||||
);
|
||||
}
|
||||
|
||||
private calculateFailureRetryAt(attempt: number) {
|
||||
return new Date(
|
||||
this.nowMs() +
|
||||
exponentialBackoffDelay(attempt - 1, {
|
||||
baseDelayMs: SYNC_FAILURE_BACKOFF_BASE_MS,
|
||||
maxDelayMs: SYNC_FAILURE_BACKOFF_MAX_MS,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private now() {
|
||||
return new Date(this.nowMs());
|
||||
}
|
||||
|
||||
private nowMs() {
|
||||
return Date.now();
|
||||
}
|
||||
|
||||
private formatSyncError(error: unknown) {
|
||||
|
||||
@@ -33,19 +33,37 @@ test('should not index workspace if indexer is disabled', async t => {
|
||||
const count = module.queue.count('indexer.indexWorkspace');
|
||||
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'test-workspace',
|
||||
});
|
||||
|
||||
t.is(module.queue.count('indexer.indexWorkspace'), count);
|
||||
});
|
||||
|
||||
test('should index workspace if indexer is enabled', async t => {
|
||||
test('should index workspace when root snapshot is updated', async t => {
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'test-workspace',
|
||||
});
|
||||
|
||||
const { payload } = await module.queue.waitFor('indexer.indexWorkspace');
|
||||
t.is(payload.workspaceId, 'test-workspace');
|
||||
});
|
||||
|
||||
test('should not index workspace when non-root snapshot is updated', async t => {
|
||||
const count = module.queue.count('indexer.indexWorkspace');
|
||||
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'child-doc',
|
||||
});
|
||||
|
||||
t.is(module.queue.count('indexer.indexWorkspace'), count);
|
||||
});
|
||||
|
||||
test('should not delete workspace if indexer is disabled', async t => {
|
||||
Sinon.stub(config.indexer, 'enabled').value(false);
|
||||
const count = module.queue.count('indexer.deleteWorkspace');
|
||||
|
||||
@@ -29,21 +29,20 @@ export class IndexerEvent {
|
||||
);
|
||||
}
|
||||
|
||||
@OnEvent('workspace.updated')
|
||||
async indexWorkspace({ id }: Events['workspace.updated']) {
|
||||
@OnEvent('doc.snapshot.updated')
|
||||
async indexWorkspace({ workspaceId, docId }: Events['doc.snapshot.updated']) {
|
||||
if (!this.config.indexer.enabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (workspaceId !== docId) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.queue.add(
|
||||
'indexer.indexWorkspace',
|
||||
{
|
||||
workspaceId: id,
|
||||
},
|
||||
{
|
||||
jobId: `indexWorkspace/${id}`,
|
||||
priority: 100,
|
||||
}
|
||||
{ workspaceId },
|
||||
{ jobId: `indexWorkspace/${workspaceId}`, priority: 100 }
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,10 @@
|
||||
"type": "Object",
|
||||
"desc": "The config for copilot job queue"
|
||||
},
|
||||
"queues.calendar": {
|
||||
"type": "Object",
|
||||
"desc": "The config for calendar job queue"
|
||||
},
|
||||
"queues.doc": {
|
||||
"type": "Object",
|
||||
"desc": "The config for doc job queue"
|
||||
|
||||
@@ -31,7 +31,7 @@ extension AFFiNEViewController: IntelligentsButtonDelegate {
|
||||
private func showAIConsentAlert() {
|
||||
let alert = UIAlertController(
|
||||
title: "AI Feature Data Usage",
|
||||
message: "To provide AI-powered features, your input (such as document content and conversation messages) will be sent to a third-party AI service for processing. This data is used solely to generate responses and is not used for any other purpose.\n\nBy continuing, you agree to share this data with the AI service.",
|
||||
message: "To provide AI-powered features, your input (such as document content and conversation messages) will be sent to our third-party AI service providers (Google, Anthropic, or OpenAI, based on your choice) for processing. This data is used solely to generate responses and is not used for any other purpose.\n\nBy continuing, you agree to share this data with these AI services.",
|
||||
preferredStyle: .alert
|
||||
)
|
||||
alert.addAction(UIAlertAction(title: "Cancel", style: .cancel))
|
||||
|
||||
@@ -26,13 +26,13 @@
|
||||
"@blocksuite/global": "workspace:*",
|
||||
"@blocksuite/icons": "^2.2.17",
|
||||
"@blocksuite/std": "workspace:*",
|
||||
"@dotlottie/player-component": "^2.7.12",
|
||||
"@emotion/cache": "^11.14.0",
|
||||
"@emotion/css": "^11.13.5",
|
||||
"@emotion/react": "^11.14.0",
|
||||
"@floating-ui/dom": "^1.6.13",
|
||||
"@juggle/resize-observer": "^3.4.0",
|
||||
"@lit/context": "^1.1.4",
|
||||
"@lottiefiles/dotlottie-wc": "^0.9.4",
|
||||
"@marsidev/react-turnstile": "^1.1.0",
|
||||
"@myriaddreamin/typst-ts-renderer": "^0.7.0-rc2",
|
||||
"@myriaddreamin/typst-ts-web-compiler": "^0.7.0-rc2",
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -18,7 +18,7 @@
|
||||
"pl": 98,
|
||||
"pt-BR": 96,
|
||||
"ru": 98,
|
||||
"sv-SE": 97,
|
||||
"sv-SE": 96,
|
||||
"uk": 96,
|
||||
"ur": 2,
|
||||
"zh-Hans": 98,
|
||||
|
||||
+52
-4
@@ -73,8 +73,8 @@ update_app_stream_version() {
|
||||
|
||||
update_ios_marketing_version() {
|
||||
local file_path=$1
|
||||
# Remove everything after the "-"
|
||||
local new_version=$(echo "$2" | sed -E 's/-.*$//')
|
||||
# Normalize inputs like "v0.26.4-beta.1" to "0.26.4"
|
||||
local new_version=$(echo "$2" | sed -E 's/^v//; s/-.*$//')
|
||||
|
||||
# Check if file exists
|
||||
if [ ! -f "$file_path" ]; then
|
||||
@@ -98,8 +98,56 @@ update_ios_marketing_version() {
|
||||
rm "$file_path".bak
|
||||
}
|
||||
|
||||
# Derive a date-based iOS MARKETING_VERSION from the latest stable/beta tag.
|
||||
# Apple requires CFBundleShortVersionString to increase monotonically. Using
|
||||
# date-based versions (YYYY.M.D) derived from the last stable/beta release tag
|
||||
# ensures this. The user-facing App Store version is set separately in
|
||||
# App Store Connect.
|
||||
get_ios_version_from_git() {
|
||||
# Find the most recent stable/beta tag reachable from HEAD (exclude canary/nightly)
|
||||
local latest_tag
|
||||
latest_tag=$(git describe --tags --match 'v[0-9]*' \
|
||||
--exclude '*canary*' --exclude '*nightly*' \
|
||||
--abbrev=0 HEAD 2>/dev/null)
|
||||
|
||||
if [ -z "$latest_tag" ]; then
|
||||
# No stable/beta tag found, fall back to today's date
|
||||
date +"%Y.%-m.%-d"
|
||||
return
|
||||
fi
|
||||
|
||||
# Get the tag creation date (tagger date for annotated tags, commit date for lightweight)
|
||||
local tag_date
|
||||
tag_date=$(git for-each-ref --format='%(creatordate:short)' "refs/tags/$latest_tag")
|
||||
|
||||
if [ -z "$tag_date" ]; then
|
||||
date +"%Y.%-m.%-d"
|
||||
return
|
||||
fi
|
||||
|
||||
# Format as YYYY.M.D (no leading zeros for month/day)
|
||||
local year month day
|
||||
year=$(echo "$tag_date" | cut -d'-' -f1)
|
||||
month=$((10#$(echo "$tag_date" | cut -d'-' -f2)))
|
||||
day=$((10#$(echo "$tag_date" | cut -d'-' -f3)))
|
||||
|
||||
echo "${year}.${month}.${day}"
|
||||
}
|
||||
|
||||
new_version=$1
|
||||
ios_new_version=${IOS_APP_VERSION:-$new_version}
|
||||
|
||||
if [ -n "$IOS_APP_VERSION" ]; then
|
||||
# Manual override via environment variable
|
||||
ios_new_version=$IOS_APP_VERSION
|
||||
elif echo "$new_version" | grep -qE '(canary|nightly)'; then
|
||||
# Canary/nightly: use the date of the last stable/beta tag
|
||||
ios_new_version=$(get_ios_version_from_git)
|
||||
else
|
||||
# Stable/beta release: use today's date
|
||||
ios_new_version=$(date +"%Y.%-m.%-d")
|
||||
fi
|
||||
|
||||
echo "iOS MARKETING_VERSION: $ios_new_version (app version: $new_version)"
|
||||
|
||||
update_app_version_in_helm_charts ".github/helm/affine/Chart.yaml" "$new_version"
|
||||
update_app_version_in_helm_charts ".github/helm/affine/charts/graphql/Chart.yaml" "$new_version"
|
||||
@@ -108,4 +156,4 @@ update_app_version_in_helm_charts ".github/helm/affine/charts/doc/Chart.yaml" "$
|
||||
|
||||
update_app_stream_version "packages/frontend/apps/electron/resources/affine.metainfo.xml" "$new_version"
|
||||
|
||||
update_ios_marketing_version "packages/frontend/apps/ios/App/App.xcodeproj/project.pbxproj" "$new_version"
|
||||
update_ios_marketing_version "packages/frontend/apps/ios/App/App.xcodeproj/project.pbxproj" "$ios_new_version"
|
||||
|
||||
Reference in New Issue
Block a user