mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-07-02 02:00:49 +08:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c4c9e3c36d | |||
| 1a8d884f8e | |||
| 91acb88a2d | |||
| 43704d60fb | |||
| 46e7e35357 | |||
| b98ab495bb | |||
| 99b07c2ee1 | |||
| e1e0ac2345 | |||
| bdccf4e9fd | |||
| 11cf1928b5 | |||
| 5215c73166 | |||
| 895e774569 |
@@ -62,6 +62,18 @@
|
||||
"concurrency": 10
|
||||
}
|
||||
},
|
||||
"queues.calendar": {
|
||||
"type": "object",
|
||||
"description": "The config for calendar job queue\n@default {\"concurrency\":4}",
|
||||
"properties": {
|
||||
"concurrency": {
|
||||
"type": "number"
|
||||
}
|
||||
},
|
||||
"default": {
|
||||
"concurrency": 4
|
||||
}
|
||||
},
|
||||
"queues.doc": {
|
||||
"type": "object",
|
||||
"description": "The config for doc job queue\n@default {\"concurrency\":1}",
|
||||
@@ -843,7 +855,7 @@
|
||||
"properties": {
|
||||
"google": {
|
||||
"type": "object",
|
||||
"description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\"}\n@link https://developers.google.com/calendar/api/guides/push",
|
||||
"description": "Google Calendar integration config\n@default {\"enabled\":false,\"clientId\":\"\",\"clientSecret\":\"\",\"externalWebhookUrl\":\"\",\"webhookVerificationToken\":\"\",\"requestTimeoutMs\":10000}\n@link https://developers.google.com/calendar/api/guides/push",
|
||||
"properties": {
|
||||
"enabled": {
|
||||
"type": "boolean"
|
||||
@@ -859,6 +871,9 @@
|
||||
},
|
||||
"webhookVerificationToken": {
|
||||
"type": "string"
|
||||
},
|
||||
"requestTimeoutMs": {
|
||||
"type": "number"
|
||||
}
|
||||
},
|
||||
"default": {
|
||||
@@ -866,7 +881,8 @@
|
||||
"clientId": "",
|
||||
"clientSecret": "",
|
||||
"externalWebhookUrl": "",
|
||||
"webhookVerificationToken": ""
|
||||
"webhookVerificationToken": "",
|
||||
"requestTimeoutMs": 10000
|
||||
}
|
||||
},
|
||||
"caldav": {
|
||||
|
||||
@@ -48,6 +48,7 @@ testem.log
|
||||
/typings
|
||||
tsconfig.tsbuildinfo
|
||||
.context
|
||||
*.md
|
||||
|
||||
# System Files
|
||||
.DS_Store
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
"@blocksuite/sync": "workspace:*",
|
||||
"@floating-ui/dom": "^1.6.13",
|
||||
"@lit/context": "^1.1.2",
|
||||
"@lottiefiles/dotlottie-wc": "^0.5.0",
|
||||
"@lottiefiles/dotlottie-wc": "^0.9.4",
|
||||
"@preact/signals-core": "^1.8.0",
|
||||
"@toeverything/theme": "^1.1.23",
|
||||
"@types/hast": "^3.0.4",
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
"@blocksuite/icons": "^2.2.17",
|
||||
"@floating-ui/dom": "^1.6.13",
|
||||
"@lit/context": "^1.1.3",
|
||||
"@lottiefiles/dotlottie-wc": "^0.5.0",
|
||||
"@lottiefiles/dotlottie-wc": "^0.9.4",
|
||||
"@preact/signals-core": "^1.8.0",
|
||||
"@toeverything/theme": "^1.1.23",
|
||||
"@vanilla-extract/css": "^1.17.0",
|
||||
|
||||
+27
@@ -0,0 +1,27 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE
|
||||
"calendar_subscriptions"
|
||||
ADD
|
||||
COLUMN "next_sync_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
ADD
|
||||
COLUMN "sync_retry_count" INTEGER NOT NULL DEFAULT 0;
|
||||
|
||||
UPDATE
|
||||
"calendar_subscriptions" AS s
|
||||
SET
|
||||
"next_sync_at" = CASE
|
||||
WHEN s."last_sync_at" IS NULL THEN CURRENT_TIMESTAMP
|
||||
ELSE s."last_sync_at" + make_interval(
|
||||
mins => COALESCE(a."refresh_interval_minutes", 30)
|
||||
)
|
||||
END
|
||||
FROM
|
||||
"calendar_accounts" AS a
|
||||
WHERE
|
||||
a."id" = s."account_id";
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "calendar_subscriptions_custom_channel_id_idx" ON "calendar_subscriptions"("custom_channel_id");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "calendar_subscriptions_enabled_next_sync_at_idx" ON "calendar_subscriptions"("enabled", "next_sync_at");
|
||||
@@ -110,7 +110,7 @@
|
||||
"react-dom": "19.2.1",
|
||||
"reflect-metadata": "^0.2.2",
|
||||
"rxjs": "^7.8.2",
|
||||
"semver": "^7.7.3",
|
||||
"semver": "^7.7.4",
|
||||
"ses": "^1.14.0",
|
||||
"socket.io": "^4.8.1",
|
||||
"stripe": "^17.7.0",
|
||||
@@ -126,7 +126,6 @@
|
||||
"@faker-js/faker": "^10.1.0",
|
||||
"@nestjs/swagger": "^11.2.0",
|
||||
"@nestjs/testing": "patch:@nestjs/testing@npm%3A10.4.15#~/.yarn/patches/@nestjs-testing-npm-10.4.15-d591a1705a.patch",
|
||||
"@react-email/preview-server": "^4.3.2",
|
||||
"@types/cookie-parser": "^1.4.8",
|
||||
"@types/express": "^5.0.1",
|
||||
"@types/express-serve-static-core": "^5.0.6",
|
||||
@@ -139,8 +138,7 @@
|
||||
"@types/nodemailer": "^7.0.0",
|
||||
"@types/on-headers": "^1.0.3",
|
||||
"@types/react": "^19.0.1",
|
||||
"@types/react-dom": "^19.0.2",
|
||||
"@types/semver": "^7.5.8",
|
||||
"@types/semver": "^7.7.1",
|
||||
"@types/sinon": "^21.0.0",
|
||||
"@types/supertest": "^6.0.2",
|
||||
"ava": "^6.4.0",
|
||||
|
||||
@@ -1037,6 +1037,8 @@ model CalendarSubscription {
|
||||
enabled Boolean @default(true)
|
||||
syncToken String? @map("sync_token") @db.Text
|
||||
lastSyncAt DateTime? @map("last_sync_at") @db.Timestamptz(3)
|
||||
nextSyncAt DateTime @default(now()) @map("next_sync_at") @db.Timestamptz(3)
|
||||
syncRetryCount Int @default(0) @map("sync_retry_count")
|
||||
customChannelId String? @map("custom_channel_id") @db.VarChar
|
||||
customResourceId String? @map("custom_resource_id") @db.VarChar
|
||||
channelExpiration DateTime? @map("channel_expiration") @db.Timestamptz(3)
|
||||
@@ -1050,6 +1052,8 @@ model CalendarSubscription {
|
||||
@@unique([accountId, externalCalendarId])
|
||||
@@index([accountId])
|
||||
@@index([provider, externalCalendarId])
|
||||
@@index([customChannelId])
|
||||
@@index([enabled, nextSyncAt])
|
||||
@@map("calendar_subscriptions")
|
||||
}
|
||||
|
||||
|
||||
@@ -858,7 +858,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> test@test.com invited you to join Test Workspace
|
||||
> You were invited to join a workspace on AFFiNE
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -973,7 +973,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> test@test.com accepted your invitation
|
||||
> Your workspace invitation was accepted
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1072,7 +1072,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> test@test.com left Test Workspace
|
||||
> A workspace member left
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1132,7 +1132,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> New request to join Test Workspace
|
||||
> New request to join a workspace
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1239,7 +1239,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> Your request to join Test Workspace has been approved
|
||||
> Your request to join a workspace has been approved
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1294,7 +1294,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> Your request to join Test Workspace was declined
|
||||
> Your request to join a workspace was declined
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1348,7 +1348,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> You have been removed from Test Workspace
|
||||
> You have been removed from a workspace
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1401,7 +1401,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> Your ownership of Test Workspace has been transferred
|
||||
> Your workspace ownership has been transferred
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1454,7 +1454,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> You are now the owner of Test Workspace
|
||||
> You are now the owner of a workspace
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1506,7 +1506,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> test@test.com mentioned you in Test Doc
|
||||
> You were mentioned in AFFiNE
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<!--$-->␊
|
||||
@@ -1601,7 +1601,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> test@test.com commented on Test Doc
|
||||
> New comment in AFFiNE
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<!--$-->␊
|
||||
@@ -1695,7 +1695,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> test@test.com mentioned you in a comment on Test Doc
|
||||
> You were mentioned in a comment
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<!--$-->␊
|
||||
@@ -1894,7 +1894,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> You are now an admin of Test Workspace
|
||||
> You are now a workspace admin
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -1993,7 +1993,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> Your role has been changed in Test Workspace
|
||||
> Your workspace role has been changed
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -2094,7 +2094,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> [Action Required] Final warning: Your workspace Test Workspace will be deleted in 24 hours
|
||||
> [Action Required] Final warning: Your workspace will be deleted in 24 hours
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -2208,7 +2208,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> [Action Required] Important: Your workspace Test Workspace will be deleted soon
|
||||
> [Action Required] Important: Your workspace will be deleted soon
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -2324,7 +2324,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> Your workspace Test Workspace has been deleted
|
||||
> Your workspace has been deleted
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -2408,7 +2408,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> [Action Required] Your Test Workspace team workspace will expire soon
|
||||
> [Action Required] Your team workspace will expire soon
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -2511,7 +2511,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
<!--/$-->␊
|
||||
`
|
||||
|
||||
> Your Test Workspace team workspace has expired
|
||||
> Your team workspace has expired
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<link␊
|
||||
@@ -2689,7 +2689,7 @@ Generated by [AVA](https://avajs.dev).
|
||||
|
||||
## should render mention email with empty doc title
|
||||
|
||||
> test@test.com mentioned you in
|
||||
> You were mentioned in AFFiNE
|
||||
|
||||
`<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">␊
|
||||
<!--$-->␊
|
||||
|
||||
Binary file not shown.
@@ -1,5 +1,6 @@
|
||||
import test from 'ava';
|
||||
|
||||
import { normalizeSMTPHeloHostname } from '../core/mail/utils';
|
||||
import { Renderers } from '../mails';
|
||||
import { TEST_DOC, TEST_USER } from '../mails/common';
|
||||
|
||||
@@ -21,3 +22,23 @@ test('should render mention email with empty doc title', async t => {
|
||||
});
|
||||
t.snapshot(content.html, content.subject);
|
||||
});
|
||||
|
||||
test('should normalize valid SMTP HELO hostnames', t => {
|
||||
t.is(normalizeSMTPHeloHostname('mail.example.com'), 'mail.example.com');
|
||||
t.is(normalizeSMTPHeloHostname(' localhost '), 'localhost');
|
||||
t.is(normalizeSMTPHeloHostname('[127.0.0.1]'), '[127.0.0.1]');
|
||||
t.is(normalizeSMTPHeloHostname('[IPv6:2001:db8::1]'), '[IPv6:2001:db8::1]');
|
||||
});
|
||||
|
||||
test('should reject invalid SMTP HELO hostnames', t => {
|
||||
t.is(normalizeSMTPHeloHostname(), undefined);
|
||||
t.is(normalizeSMTPHeloHostname(''), undefined);
|
||||
t.is(normalizeSMTPHeloHostname(' '), undefined);
|
||||
t.is(normalizeSMTPHeloHostname('AFFiNE Server'), undefined);
|
||||
t.is(normalizeSMTPHeloHostname('-example.com'), undefined);
|
||||
t.is(normalizeSMTPHeloHostname('example-.com'), undefined);
|
||||
t.is(normalizeSMTPHeloHostname('example..com'), undefined);
|
||||
t.is(normalizeSMTPHeloHostname('[bad host]'), undefined);
|
||||
t.is(normalizeSMTPHeloHostname('[foo]'), undefined);
|
||||
t.is(normalizeSMTPHeloHostname('[IPv6:foo]'), undefined);
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@ export class MockEventBus {
|
||||
|
||||
emit = this.stub.emitAsync;
|
||||
emitAsync = this.stub.emitAsync;
|
||||
emitDetached = this.stub.emitAsync;
|
||||
broadcast = this.stub.broadcast;
|
||||
|
||||
last<Event extends EventName>(
|
||||
|
||||
@@ -6,6 +6,7 @@ import { JobQueue } from '../../base';
|
||||
export class MockJobQueue {
|
||||
add = Sinon.createStubInstance(JobQueue).add.resolves();
|
||||
remove = Sinon.createStubInstance(JobQueue).remove.resolves();
|
||||
removeWhere = Sinon.createStubInstance(JobQueue).removeWhere.resolves([]);
|
||||
|
||||
last<Job extends JobName>(name: Job): { name: Job; payload: Jobs[Job] } {
|
||||
const addJobName = this.add.lastCall?.args[0];
|
||||
|
||||
@@ -437,6 +437,37 @@ test('should throw if user has subscription already', async t => {
|
||||
);
|
||||
});
|
||||
|
||||
test('should allow checkout after local subscription period ended', async t => {
|
||||
const { service, u1, db, stripe } = t.context;
|
||||
|
||||
await db.subscription.create({
|
||||
data: {
|
||||
targetId: u1.id,
|
||||
stripeSubscriptionId: 'sub_expired_ai',
|
||||
plan: SubscriptionPlan.AI,
|
||||
recurring: SubscriptionRecurring.Yearly,
|
||||
status: SubscriptionStatus.Active,
|
||||
start: new Date('2026-05-04T13:11:45.000Z'),
|
||||
end: new Date('2026-05-11T13:11:45.000Z'),
|
||||
},
|
||||
});
|
||||
|
||||
await service.checkout(
|
||||
{
|
||||
plan: SubscriptionPlan.AI,
|
||||
recurring: SubscriptionRecurring.Yearly,
|
||||
successCallbackLink: '',
|
||||
},
|
||||
{ user: u1 }
|
||||
);
|
||||
|
||||
t.true(stripe.checkout.sessions.create.calledOnce);
|
||||
t.deepEqual(getLastCheckoutPrice(stripe.checkout.sessions.create), {
|
||||
price: AI_YEARLY,
|
||||
coupon: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
test('should get correct pro plan price for checking out', async t => {
|
||||
const { app, service, u1, stripe, feature } = t.context;
|
||||
// non-ea user
|
||||
|
||||
@@ -6,6 +6,7 @@ import { AppModule } from '../app.module';
|
||||
import {
|
||||
CANARY_CLIENT_VERSION_MAX_AGE_DAYS,
|
||||
ConfigFactory,
|
||||
hasNewerVersion,
|
||||
UseNamedGuard,
|
||||
} from '../base';
|
||||
import { Public } from '../core/auth/guard';
|
||||
@@ -249,3 +250,11 @@ test('should reject old canary date version in canary namespace', async t => {
|
||||
env.NAMESPACE = prevNamespace;
|
||||
}
|
||||
});
|
||||
|
||||
test('should compare release versions for available upgrades', t => {
|
||||
t.false(hasNewerVersion('0.26.5', '0.26.4'));
|
||||
t.false(hasNewerVersion('0.26.5', '0.26.5'));
|
||||
t.true(hasNewerVersion('0.26.5', '0.26.6'));
|
||||
t.true(hasNewerVersion('0.26.5', '0.26.6-beta.1'));
|
||||
t.false(hasNewerVersion('0.26.6-beta.2', '0.26.6-beta.1'));
|
||||
});
|
||||
|
||||
@@ -88,12 +88,21 @@ export class EventBus
|
||||
emit<T extends EventName>(event: T, payload: Events[T]) {
|
||||
this.logger.debug(`Dispatch event: ${event}`);
|
||||
|
||||
// NOTE(@forehalo):
|
||||
// Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick.
|
||||
// In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting.
|
||||
// So we catch it here with `emitAsync`
|
||||
this.emitter.emitAsync(event, payload).catch(e => {
|
||||
this.emitter.emit('error', { event, payload, error: e });
|
||||
this.dispatchAsync(event, payload);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit event in detached cls context to avoid inheriting current transaction.
|
||||
*/
|
||||
emitDetached<T extends EventName>(event: T, payload: Events[T]) {
|
||||
this.logger.debug(`Dispatch event: ${event} (detached)`);
|
||||
|
||||
const requestId = this.cls.getId();
|
||||
this.cls.run({ ifNested: 'override' }, () => {
|
||||
this.cls.set(CLS_ID, requestId ?? genRequestId('event'));
|
||||
this.dispatchAsync(event, payload);
|
||||
});
|
||||
|
||||
return true;
|
||||
@@ -166,6 +175,16 @@ export class EventBus
|
||||
return this.emitter.waitFor(name, timeout);
|
||||
}
|
||||
|
||||
private dispatchAsync<T extends EventName>(event: T, payload: Events[T]) {
|
||||
// NOTE:
|
||||
// Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick.
|
||||
// In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting.
|
||||
// So we catch it here with `emitAsync`
|
||||
this.emitter.emitAsync(event, payload).catch(e => {
|
||||
this.emitter.emit('error', { event, payload, error: e });
|
||||
});
|
||||
}
|
||||
|
||||
private readonly bindEventHandlers = once(() => {
|
||||
this.scanner.scan().forEach(({ event, handler, opts }) => {
|
||||
this.on(event, handler, opts);
|
||||
|
||||
@@ -117,6 +117,22 @@ test('should remove job from queue', async t => {
|
||||
t.is(nullData, undefined);
|
||||
t.is(nullJob, undefined);
|
||||
});
|
||||
|
||||
test('should remove jobs by payload predicate', async t => {
|
||||
const keep = await queue.add('nightly.__test__job', { name: 'keep' });
|
||||
const remove = await queue.add('nightly.__test__job', { name: 'remove' });
|
||||
const other = await queue.add('nightly.__test__job2', { name: 'remove' });
|
||||
|
||||
const removed = await queue.removeWhere(
|
||||
'nightly.__test__job',
|
||||
job => job.name === 'remove'
|
||||
);
|
||||
|
||||
t.deepEqual(removed, [{ name: 'remove' }]);
|
||||
t.truthy(await queue.get(keep.id!, 'nightly.__test__job'));
|
||||
t.is(await queue.get(remove.id!, 'nightly.__test__job'), undefined);
|
||||
t.truthy(await queue.get(other.id!, 'nightly.__test__job2'));
|
||||
});
|
||||
// #endregion
|
||||
|
||||
// #region executor
|
||||
|
||||
@@ -55,6 +55,14 @@ defineModuleConfig('job', {
|
||||
schema,
|
||||
},
|
||||
|
||||
'queues.calendar': {
|
||||
desc: 'The config for calendar job queue',
|
||||
default: {
|
||||
concurrency: 4,
|
||||
},
|
||||
schema,
|
||||
},
|
||||
|
||||
'queues.doc': {
|
||||
desc: 'The config for doc job queue',
|
||||
default: {
|
||||
|
||||
@@ -28,6 +28,7 @@ export enum Queue {
|
||||
DOC = 'doc',
|
||||
COPILOT = 'copilot',
|
||||
INDEXER = 'indexer',
|
||||
CALENDAR = 'calendar',
|
||||
}
|
||||
|
||||
export const QUEUES = Object.values(Queue);
|
||||
|
||||
@@ -55,6 +55,39 @@ export class JobQueue {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async removeWhere<T extends JobName>(
|
||||
jobName: T,
|
||||
predicate: (payload: Jobs[T]) => boolean | Promise<boolean>
|
||||
): Promise<Jobs[T][]> {
|
||||
const ns = namespace(jobName);
|
||||
const queue = this.getQueue(ns);
|
||||
const jobs = (await queue.getJobs([
|
||||
'waiting',
|
||||
'delayed',
|
||||
'prioritized',
|
||||
'paused',
|
||||
'waiting-children',
|
||||
])) as Job<JobData<T>>[];
|
||||
const removed: Jobs[T][] = [];
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job.name !== jobName) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const payload = job.data.payload;
|
||||
if (!(await predicate(payload))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await job.remove();
|
||||
this.logger.log(`Job ${jobName}(id=${job.id}) removed from queue ${ns}`);
|
||||
removed.push(payload);
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
async get<T extends JobName>(jobId: string, jobName: T) {
|
||||
const ns = namespace(jobName);
|
||||
const queue = this.getQueue(ns);
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import semver from 'semver';
|
||||
|
||||
const DAY_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
// Example: 2026.2.6-canary.015
|
||||
@@ -89,3 +91,26 @@ export function checkCanaryDateClientVersion(
|
||||
normalized: parsed.normalized,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeComparableVersion(version: string): string | null {
|
||||
const canary = parseCanaryDateClientVersion(version);
|
||||
return semver.valid(canary?.normalized ?? version.trim(), {
|
||||
loose: true,
|
||||
});
|
||||
}
|
||||
|
||||
export function hasNewerVersion(
|
||||
currentVersion: string,
|
||||
nextVersion: string
|
||||
): boolean {
|
||||
const current = normalizeComparableVersion(currentVersion);
|
||||
const next = normalizeComparableVersion(nextVersion);
|
||||
|
||||
if (!current || !next) {
|
||||
return currentVersion.trim() !== nextVersion.trim();
|
||||
}
|
||||
|
||||
return semver.gt(next, current, {
|
||||
loose: true,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import {
|
||||
} from '@nestjs/graphql';
|
||||
import { GraphQLJSON, GraphQLJSONObject } from 'graphql-scalars';
|
||||
|
||||
import { Config, URLHelper } from '../../base';
|
||||
import { Config, hasNewerVersion, URLHelper } from '../../base';
|
||||
import { Namespace } from '../../env';
|
||||
import { Feature, type WorkspaceFeatureName } from '../../models';
|
||||
import { CurrentUser, Public } from '../auth';
|
||||
@@ -143,7 +143,7 @@ export class ServerConfigResolver {
|
||||
}>;
|
||||
|
||||
const latest = releases.at(0);
|
||||
if (!latest || latest.name === env.version) {
|
||||
if (!latest || !hasNewerVersion(env.version, latest.name)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -68,7 +68,7 @@ test('should update doc content to database when doc is updated', async t => {
|
||||
|
||||
const docId = randomUUID();
|
||||
await adapter.pushDocUpdates(workspace.id, docId, updates);
|
||||
await adapter.getDoc(workspace.id, docId);
|
||||
await adapter.getDocBinNative(workspace.id, docId);
|
||||
|
||||
mock.method(docReader, 'parseDocContent', () => {
|
||||
return {
|
||||
@@ -181,3 +181,22 @@ test('should ignore update workspace content to database when parse workspace co
|
||||
t.is(content!.name, null);
|
||||
t.is(content!.avatarKey, null);
|
||||
});
|
||||
|
||||
test('should ignore stale workspace when updating doc meta from snapshot event', async t => {
|
||||
const { docReader, listener, models } = t.context;
|
||||
const docId = randomUUID();
|
||||
mock.method(docReader, 'parseDocContent', () => ({
|
||||
title: 'test title',
|
||||
summary: 'test summary',
|
||||
}));
|
||||
|
||||
await models.workspace.delete(workspace.id);
|
||||
|
||||
await t.notThrowsAsync(async () => {
|
||||
await listener.markDocContentCacheStale({
|
||||
workspaceId: workspace.id,
|
||||
docId,
|
||||
blob: Buffer.from([0x01]),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -110,7 +110,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
|
||||
});
|
||||
|
||||
if (isNewDoc) {
|
||||
this.event.emit('doc.created', {
|
||||
this.event.emitDetached('doc.created', {
|
||||
workspaceId,
|
||||
docId,
|
||||
editor: editorId,
|
||||
@@ -334,7 +334,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
|
||||
});
|
||||
|
||||
if (updatedSnapshot) {
|
||||
this.event.emit('doc.snapshot.updated', {
|
||||
this.event.emitDetached('doc.snapshot.updated', {
|
||||
workspaceId: snapshot.spaceId,
|
||||
docId: snapshot.docId,
|
||||
blob,
|
||||
|
||||
@@ -1,12 +1,29 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Prisma } from '@prisma/client';
|
||||
|
||||
import { OnEvent } from '../../base';
|
||||
import { Models } from '../../models';
|
||||
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
|
||||
import { DocReader } from './reader';
|
||||
|
||||
const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']);
|
||||
|
||||
function isIgnorableDocEventError(error: unknown) {
|
||||
if (error instanceof Prisma.PrismaClientKnownRequestError) {
|
||||
return IGNORED_PRISMA_CODES.has(error.code);
|
||||
}
|
||||
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
|
||||
return /transaction is aborted|transaction already closed/i.test(
|
||||
error.message
|
||||
);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class DocEventsListener {
|
||||
private readonly logger = new Logger(DocEventsListener.name);
|
||||
|
||||
constructor(
|
||||
private readonly docReader: DocReader,
|
||||
private readonly models: Models,
|
||||
@@ -20,21 +37,39 @@ export class DocEventsListener {
|
||||
blob,
|
||||
}: Events['doc.snapshot.updated']) {
|
||||
await this.docReader.markDocContentCacheStale(workspaceId, docId);
|
||||
const workspace = await this.models.workspace.get(workspaceId);
|
||||
if (!workspace) {
|
||||
this.logger.warn(
|
||||
`Skip stale doc snapshot event for missing workspace ${workspaceId}/${docId}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
const isDoc = workspaceId !== docId;
|
||||
// update doc content to database
|
||||
if (isDoc) {
|
||||
const content = this.docReader.parseDocContent(blob);
|
||||
if (!content) {
|
||||
try {
|
||||
if (isDoc) {
|
||||
const content = this.docReader.parseDocContent(blob);
|
||||
if (!content) {
|
||||
return;
|
||||
}
|
||||
await this.models.doc.upsertMeta(workspaceId, docId, content);
|
||||
} else {
|
||||
// update workspace content to database
|
||||
const content = this.docReader.parseWorkspaceContent(blob);
|
||||
if (!content) {
|
||||
return;
|
||||
}
|
||||
await this.models.workspace.update(workspaceId, content);
|
||||
}
|
||||
} catch (error) {
|
||||
if (isIgnorableDocEventError(error)) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(
|
||||
`Ignore stale doc snapshot event for ${workspaceId}/${docId}: ${message}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
await this.models.doc.upsertMeta(workspaceId, docId, content);
|
||||
} else {
|
||||
// update workspace content to database
|
||||
const content = this.docReader.parseWorkspaceContent(blob);
|
||||
if (!content) {
|
||||
return;
|
||||
}
|
||||
await this.models.workspace.update(workspaceId, content);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,243 @@
|
||||
import test from 'ava';
|
||||
import Sinon from 'sinon';
|
||||
|
||||
import { Mockers } from '../../../__tests__/mocks';
|
||||
import { createTestingModule } from '../../../__tests__/utils';
|
||||
import { Cache } from '../../../base';
|
||||
import { Models } from '../../../models';
|
||||
import { MailJob } from '../job';
|
||||
import { MailSender } from '../sender';
|
||||
|
||||
let module: Awaited<ReturnType<typeof createTestingModule>>;
|
||||
let cache: Cache;
|
||||
let mailJob: MailJob;
|
||||
let sender: MailSender;
|
||||
let models: Models;
|
||||
|
||||
test.before(async () => {
|
||||
module = await createTestingModule();
|
||||
cache = module.get(Cache);
|
||||
mailJob = module.get(MailJob);
|
||||
sender = module.get(MailSender);
|
||||
models = module.get(Models);
|
||||
});
|
||||
|
||||
test.after.always(async () => {
|
||||
await module.close();
|
||||
});
|
||||
|
||||
test.afterEach(() => {
|
||||
Sinon.restore();
|
||||
});
|
||||
|
||||
test('should clear pending mail records when user is deleted', async t => {
|
||||
const user = await module.create(Mockers.User);
|
||||
const another = await module.create(Mockers.User);
|
||||
const sendMailKey = 'mailjob:sendMail';
|
||||
const retryMailKey = 'mailjob:sendMail:retry';
|
||||
const userKey = `${sendMailKey}:SignIn:${user.email}`;
|
||||
const userRetryKey = `${sendMailKey}:VerifyEmail:${user.email}`;
|
||||
const anotherKey = `${sendMailKey}:SignIn:${another.email}`;
|
||||
|
||||
await cache.mapSet(sendMailKey, userKey, 1);
|
||||
await cache.mapSet(sendMailKey, anotherKey, 1);
|
||||
await cache.mapSet(
|
||||
retryMailKey,
|
||||
userRetryKey,
|
||||
JSON.stringify({
|
||||
startTime: Date.now(),
|
||||
name: 'VerifyEmail',
|
||||
to: user.email,
|
||||
props: { url: 'https://affine.pro/verify' },
|
||||
})
|
||||
);
|
||||
|
||||
await mailJob.onUserDeleted({ ...user, ownedWorkspaces: [] });
|
||||
|
||||
t.true(module.queue.removeWhere.calledOnce);
|
||||
t.is(module.queue.removeWhere.firstCall.args[0], 'notification.sendMail');
|
||||
const shouldRemove = module.queue.removeWhere.firstCall.args[1];
|
||||
t.true(
|
||||
await shouldRemove({
|
||||
to: user.email,
|
||||
} as Jobs['notification.sendMail'])
|
||||
);
|
||||
t.false(
|
||||
await shouldRemove({
|
||||
to: another.email,
|
||||
} as Jobs['notification.sendMail'])
|
||||
);
|
||||
t.is(await cache.mapGet(sendMailKey, userKey), undefined);
|
||||
t.is(await cache.mapGet(retryMailKey, userRetryKey), undefined);
|
||||
t.is(await cache.mapGet(sendMailKey, anotherKey), 1);
|
||||
});
|
||||
|
||||
test('should skip queued mail for disabled recipient', async t => {
|
||||
const user = await module.create(Mockers.User, { disabled: true });
|
||||
const send = Sinon.stub(sender, 'send').resolves(true);
|
||||
|
||||
await mailJob.sendMail({
|
||||
startTime: Date.now(),
|
||||
name: 'SignIn',
|
||||
to: user.email,
|
||||
props: {
|
||||
url: 'https://affine.pro/sign-in',
|
||||
otp: '123456',
|
||||
},
|
||||
});
|
||||
|
||||
t.false(send.called);
|
||||
t.truthy(await models.user.get(user.id, { withDisabled: true }));
|
||||
});
|
||||
|
||||
test('should drop expired mail retry', async t => {
|
||||
const send = Sinon.stub(sender, 'send').resolves(true);
|
||||
|
||||
await mailJob.sendMail({
|
||||
startTime: Date.now() - 25 * 60 * 60 * 1000,
|
||||
name: 'SignIn',
|
||||
to: 'expired-retry@example.com',
|
||||
props: {
|
||||
url: 'https://affine.pro/sign-in',
|
||||
otp: '123456',
|
||||
},
|
||||
});
|
||||
|
||||
t.false(send.called);
|
||||
});
|
||||
|
||||
test('should drop time-sensitive mail after its business expiration', async t => {
|
||||
const send = Sinon.stub(sender, 'send').resolves(true);
|
||||
|
||||
await mailJob.sendMail({
|
||||
startTime: Date.now() - 31 * 60 * 1000,
|
||||
name: 'SignIn',
|
||||
to: 'expired-sign-in@example.com',
|
||||
props: {
|
||||
url: 'https://affine.pro/sign-in',
|
||||
otp: '123456',
|
||||
},
|
||||
});
|
||||
|
||||
t.false(send.called);
|
||||
});
|
||||
|
||||
test('should use explicit mail expiration when provided', async t => {
|
||||
const send = Sinon.stub(sender, 'send').resolves(true);
|
||||
|
||||
await mailJob.sendMail({
|
||||
startTime: Date.now(),
|
||||
expiresAt: Date.now() - 1,
|
||||
name: 'MemberInvitation',
|
||||
to: 'expired-invitation@example.com',
|
||||
props: {
|
||||
user: {
|
||||
$$userId: 'owner-id',
|
||||
},
|
||||
workspace: {
|
||||
$$workspaceId: 'workspace-id',
|
||||
},
|
||||
url: 'https://affine.pro/invite/test',
|
||||
},
|
||||
});
|
||||
|
||||
t.false(send.called);
|
||||
});
|
||||
|
||||
test('should drop mail retry after max attempts', async t => {
|
||||
const send = Sinon.stub(sender, 'send').resolves(true);
|
||||
|
||||
await mailJob.sendMail({
|
||||
startTime: Date.now(),
|
||||
retryCount: 12,
|
||||
name: 'SignIn',
|
||||
to: 'max-retry@example.com',
|
||||
props: {
|
||||
url: 'https://affine.pro/sign-in',
|
||||
otp: '123456',
|
||||
},
|
||||
});
|
||||
|
||||
t.false(send.called);
|
||||
});
|
||||
|
||||
test('should requeue legacy stringified retry mail', async t => {
|
||||
const retryMailKey = 'mailjob:sendMail:retry';
|
||||
const job: Jobs['notification.sendMail'] = {
|
||||
startTime: Date.now(),
|
||||
name: 'SignIn',
|
||||
to: 'legacy-retry@example.com',
|
||||
props: {
|
||||
url: 'https://affine.pro/sign-in',
|
||||
otp: '123456',
|
||||
},
|
||||
};
|
||||
const cacheKey = `${retryMailKey}:SignIn:${job.to}`;
|
||||
|
||||
Sinon.stub(cache, 'mapRandomKey')
|
||||
.onFirstCall()
|
||||
.resolves(cacheKey)
|
||||
.onSecondCall()
|
||||
.resolves(undefined);
|
||||
await cache.mapSet(retryMailKey, cacheKey, JSON.stringify(job));
|
||||
await mailJob.sendRetryMails();
|
||||
|
||||
t.true(module.queue.add.calledWith('notification.sendMail', job));
|
||||
t.is(await cache.mapGet(retryMailKey, cacheKey), undefined);
|
||||
});
|
||||
|
||||
test('should skip member invitation mail when rendered workspace name contains domain', async t => {
|
||||
const owner = await module.create(Mockers.User);
|
||||
const member = await module.create(Mockers.User);
|
||||
const workspace = await module.create(Mockers.Workspace, {
|
||||
owner: { id: owner.id },
|
||||
name: 'BTC https://spam.example',
|
||||
});
|
||||
const send = Sinon.stub(sender, 'send').resolves(true);
|
||||
|
||||
await mailJob.sendMail({
|
||||
startTime: Date.now(),
|
||||
name: 'MemberInvitation',
|
||||
to: member.email,
|
||||
props: {
|
||||
user: {
|
||||
$$userId: owner.id,
|
||||
},
|
||||
workspace: {
|
||||
$$workspaceId: workspace.id,
|
||||
},
|
||||
url: 'https://affine.pro/invite/test',
|
||||
},
|
||||
});
|
||||
|
||||
t.false(send.called);
|
||||
});
|
||||
|
||||
test('should keep dynamic mail props untouched for retry', async t => {
|
||||
const owner = await module.create(Mockers.User);
|
||||
const member = await module.create(Mockers.User);
|
||||
const workspace = await module.create(Mockers.Workspace, {
|
||||
owner: { id: owner.id },
|
||||
name: 'Safe Workspace',
|
||||
});
|
||||
Sinon.stub(sender, 'send').resolves(false);
|
||||
const job: Jobs['notification.sendMail'] = {
|
||||
startTime: Date.now(),
|
||||
name: 'MemberInvitation',
|
||||
to: member.email,
|
||||
props: {
|
||||
user: {
|
||||
$$userId: owner.id,
|
||||
},
|
||||
workspace: {
|
||||
$$workspaceId: workspace.id,
|
||||
},
|
||||
url: 'https://affine.pro/invite/test',
|
||||
},
|
||||
};
|
||||
|
||||
await mailJob.sendMail(job);
|
||||
|
||||
t.deepEqual(job.props.user, { $$userId: owner.id });
|
||||
t.deepEqual(job.props.workspace, { $$workspaceId: workspace.id });
|
||||
});
|
||||
@@ -2,12 +2,13 @@ import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { getStreamAsBuffer } from 'get-stream';
|
||||
|
||||
import { Cache, JOB_SIGNAL, JobQueue, OnJob, sleep } from '../../base';
|
||||
import { Cache, JOB_SIGNAL, JobQueue, OnEvent, OnJob, sleep } from '../../base';
|
||||
import { type MailName, MailProps, Renderers } from '../../mails';
|
||||
import { UserProps, WorkspaceProps } from '../../mails/components';
|
||||
import { Models } from '../../models';
|
||||
import { DocReader } from '../doc/reader';
|
||||
import { WorkspaceBlobStorage } from '../storage';
|
||||
import { containsUrlOrDomain } from '../workspaces/abuse';
|
||||
import { MailSender, SendOptions } from './sender';
|
||||
|
||||
type DynamicallyFetchedProps<Props> = {
|
||||
@@ -35,7 +36,11 @@ type SendMailJob<Mail extends MailName = MailName, Props = MailProps<Mail>> = {
|
||||
|
||||
declare global {
|
||||
interface Jobs {
|
||||
'notification.sendMail': { startTime: number } & {
|
||||
'notification.sendMail': {
|
||||
startTime: number;
|
||||
retryCount?: number;
|
||||
expiresAt?: number;
|
||||
} & {
|
||||
[K in MailName]: SendMailJob<K>;
|
||||
}[MailName];
|
||||
}
|
||||
@@ -47,6 +52,19 @@ const sendMailCacheKey = (name: string, to: string) =>
|
||||
`${sendMailKey}:${name}:${to}`;
|
||||
const retryMaxPerTick = 20;
|
||||
const retryFirstTime = 3;
|
||||
const retryMaxAttempts = 12;
|
||||
const retryMaxAge = 24 * 60 * 60 * 1000;
|
||||
const magicLinkExpiresIn = 30 * 60 * 1000;
|
||||
|
||||
const mailExpiresIn: Partial<Record<MailName, number>> = {
|
||||
SignIn: magicLinkExpiresIn,
|
||||
SignUp: magicLinkExpiresIn,
|
||||
SetPassword: magicLinkExpiresIn,
|
||||
ChangePassword: magicLinkExpiresIn,
|
||||
VerifyEmail: magicLinkExpiresIn,
|
||||
ChangeEmail: magicLinkExpiresIn,
|
||||
VerifyChangeEmail: magicLinkExpiresIn,
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class MailJob {
|
||||
@@ -66,17 +84,65 @@ export class MailJob {
|
||||
return Math.min(30 * 1000, Math.round(elapsed / 2000) * 1000);
|
||||
}
|
||||
|
||||
private getRetryExhaustedReason({
|
||||
startTime,
|
||||
retryCount,
|
||||
expiresAt,
|
||||
name,
|
||||
}: Jobs['notification.sendMail']) {
|
||||
const expiredAt =
|
||||
expiresAt ?? startTime + (mailExpiresIn[name] ?? retryMaxAge);
|
||||
if (Date.now() > expiredAt) {
|
||||
return 'expired';
|
||||
}
|
||||
|
||||
if ((retryCount ?? 0) > retryMaxAttempts) {
|
||||
return 'max attempts reached';
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
private async shouldSkipRecipient(to: string) {
|
||||
const user = await this.models.user.getUserByEmail(to, {
|
||||
withDisabled: true,
|
||||
});
|
||||
|
||||
return user?.disabled === true;
|
||||
}
|
||||
|
||||
private async deleteRecipientMailCache(to: string) {
|
||||
const suffix = `:${to}`;
|
||||
|
||||
await Promise.all(
|
||||
[sendMailKey, retryMailKey].map(async map => {
|
||||
const keys = await this.cache.mapKeys(map);
|
||||
await Promise.all(
|
||||
keys
|
||||
.filter(key => key.endsWith(suffix))
|
||||
.map(key => this.cache.mapDelete(map, key))
|
||||
);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private async sendMailInternal({
|
||||
startTime,
|
||||
name,
|
||||
to,
|
||||
props,
|
||||
}: Jobs['notification.sendMail']) {
|
||||
let options: Partial<SendOptions> = {};
|
||||
if (await this.shouldSkipRecipient(to)) {
|
||||
this.logger.debug(`Skip mail [${name}] to disabled user [${to}]`);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const key in props) {
|
||||
let options: Partial<SendOptions> = {};
|
||||
const renderedProps = { ...props };
|
||||
|
||||
for (const key in renderedProps) {
|
||||
// @ts-expect-error allow
|
||||
const val = props[key];
|
||||
const val = renderedProps[key];
|
||||
if (val && typeof val === 'object') {
|
||||
if ('$$workspaceId' in val) {
|
||||
const workspaceProps = await this.fetchWorkspaceProps(
|
||||
@@ -87,6 +153,16 @@ export class MailJob {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
name === 'MemberInvitation' &&
|
||||
containsUrlOrDomain(workspaceProps.name)
|
||||
) {
|
||||
this.logger.warn(
|
||||
`Skip mail [${name}] to [${to}], reason=workspace name contains url or domain`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (workspaceProps.avatar) {
|
||||
options.attachments = [
|
||||
{
|
||||
@@ -99,7 +175,7 @@ export class MailJob {
|
||||
workspaceProps.avatar = 'cid:workspaceAvatar';
|
||||
}
|
||||
// @ts-expect-error replacement
|
||||
props[key] = workspaceProps;
|
||||
renderedProps[key] = workspaceProps;
|
||||
} else if ('$$userId' in val) {
|
||||
const userProps = await this.fetchUserProps(val.$$userId);
|
||||
|
||||
@@ -108,17 +184,30 @@ export class MailJob {
|
||||
}
|
||||
|
||||
// @ts-expect-error replacement
|
||||
props[key] = userProps;
|
||||
renderedProps[key] = userProps;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
name === 'MemberInvitation' &&
|
||||
'workspace' in renderedProps &&
|
||||
containsUrlOrDomain(
|
||||
(renderedProps.workspace as WorkspaceProps | undefined)?.name
|
||||
)
|
||||
) {
|
||||
this.logger.warn(
|
||||
`Skip mail [${name}] to [${to}], reason=workspace name contains url or domain`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await this.sender.send(name, {
|
||||
to,
|
||||
...(await Renderers[name](
|
||||
// @ts-expect-error the job trigger part has been typechecked
|
||||
props
|
||||
renderedProps
|
||||
)),
|
||||
...options,
|
||||
});
|
||||
@@ -130,7 +219,7 @@ export class MailJob {
|
||||
}
|
||||
return undefined;
|
||||
} catch (e) {
|
||||
this.logger.error(`Failed to send mail [${name}] to [${to}]`, e);
|
||||
this.logger.error(`Failed to send mail [${name}] to [${to}]`, e, props);
|
||||
// wait for a while before retrying
|
||||
const retryDelay = this.calculateRetryDelay(startTime);
|
||||
await sleep(retryDelay);
|
||||
@@ -177,17 +266,41 @@ export class MailJob {
|
||||
@OnJob('notification.sendMail')
|
||||
async sendMail(job: Jobs['notification.sendMail']) {
|
||||
const cacheKey = sendMailCacheKey(job.name, job.to);
|
||||
job.retryCount = (job.retryCount ?? 0) + 1;
|
||||
const exhaustedReason = this.getRetryExhaustedReason(job);
|
||||
if (exhaustedReason) {
|
||||
this.logger.warn(
|
||||
`Drop mail [${job.name}] to [${job.to}], reason=${exhaustedReason}`
|
||||
);
|
||||
await Promise.all([
|
||||
this.cache.mapDelete(sendMailKey, cacheKey),
|
||||
this.cache.mapDelete(retryMailKey, cacheKey),
|
||||
]);
|
||||
return;
|
||||
}
|
||||
|
||||
const retried = await this.cache.mapIncrease(sendMailKey, cacheKey, 1);
|
||||
if (retried <= retryFirstTime) {
|
||||
const ret = await this.sendMailInternal(job);
|
||||
if (!ret) await this.cache.mapDelete(sendMailKey, cacheKey);
|
||||
return ret;
|
||||
}
|
||||
await this.cache.mapSet(retryMailKey, cacheKey, JSON.stringify(job));
|
||||
await this.cache.mapSet(retryMailKey, cacheKey, job);
|
||||
await this.cache.mapDelete(sendMailKey, cacheKey);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@OnEvent('user.deleted')
|
||||
async onUserDeleted(user: Events['user.deleted']) {
|
||||
await Promise.all([
|
||||
this.deleteRecipientMailCache(user.email),
|
||||
this.queue.removeWhere(
|
||||
'notification.sendMail',
|
||||
job => job.to === user.email
|
||||
),
|
||||
]);
|
||||
}
|
||||
|
||||
@Cron(CronExpression.EVERY_MINUTE)
|
||||
async sendRetryMails() {
|
||||
// pick random one from the retry map
|
||||
@@ -195,9 +308,14 @@ export class MailJob {
|
||||
let key = await this.cache.mapRandomKey(retryMailKey);
|
||||
while (key && processed < retryMaxPerTick) {
|
||||
try {
|
||||
const job = await this.cache.mapGet<string>(retryMailKey, key);
|
||||
const job = await this.cache.mapGet<
|
||||
Jobs['notification.sendMail'] | string
|
||||
>(retryMailKey, key);
|
||||
if (job) {
|
||||
const jobData = JSON.parse(job) as Jobs['notification.sendMail'];
|
||||
const jobData =
|
||||
typeof job === 'string'
|
||||
? (JSON.parse(job) as Jobs['notification.sendMail'])
|
||||
: job;
|
||||
await this.queue.add('notification.sendMail', jobData);
|
||||
// wait for a while before retrying
|
||||
const retryDelay = this.calculateRetryDelay(jobData.startTime);
|
||||
|
||||
@@ -140,7 +140,11 @@ export class MailSender {
|
||||
return true;
|
||||
} catch (e) {
|
||||
metrics.mail.counter('failed_total').add(1, { name });
|
||||
this.logger.error(`Failed to send mail [${name}].`, e);
|
||||
this.logger.error(`Failed to send mail [${name}].`, e, {
|
||||
subject: options.subject,
|
||||
from: options.from,
|
||||
to: options.to,
|
||||
});
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
import { isIP } from 'node:net';
|
||||
import { hostname as getHostname } from 'node:os';
|
||||
|
||||
const hostnameLabelRegexp = /^[A-Za-z0-9-]+$/;
|
||||
|
||||
function isValidSMTPAddressLiteral(hostname: string) {
|
||||
if (!hostname.startsWith('[') || !hostname.endsWith(']')) return false;
|
||||
|
||||
const literal = hostname.slice(1, -1);
|
||||
if (!literal || literal.includes(' ')) return false;
|
||||
if (isIP(literal) === 4) return true;
|
||||
|
||||
if (literal.startsWith('IPv6:')) {
|
||||
return isIP(literal.slice('IPv6:'.length)) === 6;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export function normalizeSMTPHeloHostname(hostname?: string) {
|
||||
if (!hostname) return undefined;
|
||||
|
||||
const normalized = hostname.trim().replace(/\.$/, '');
|
||||
if (!normalized) return undefined;
|
||||
if (isValidSMTPAddressLiteral(normalized)) return normalized;
|
||||
if (normalized.length > 253) return undefined;
|
||||
|
||||
const labels = normalized.split('.');
|
||||
for (const label of labels) {
|
||||
if (!label || label.length > 63) return undefined;
|
||||
if (
|
||||
!hostnameLabelRegexp.test(label) ||
|
||||
label.startsWith('-') ||
|
||||
label.endsWith('-')
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function readSystemHostname() {
|
||||
try {
|
||||
return getHostname();
|
||||
} catch {
|
||||
return '';
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveSMTPHeloHostname(configuredName: string) {
|
||||
const normalizedConfiguredName = normalizeSMTPHeloHostname(configuredName);
|
||||
if (normalizedConfiguredName) return normalizedConfiguredName;
|
||||
return normalizeSMTPHeloHostname(readSystemHostname());
|
||||
}
|
||||
@@ -87,6 +87,29 @@ test('should create invitation notification and email', async t => {
|
||||
t.is(invitationMail.payload.name, 'MemberInvitation');
|
||||
});
|
||||
|
||||
test('should not send invitation email when workspace name contains domain', async t => {
|
||||
const spamWorkspace = await module.create(Mockers.Workspace, {
|
||||
owner: {
|
||||
id: owner.id,
|
||||
},
|
||||
name: 'BTC https://spam.example',
|
||||
});
|
||||
const inviteId = randomUUID();
|
||||
const invitationMailCount = module.queue.count('notification.sendMail');
|
||||
|
||||
const notification = await notificationService.createInvitation({
|
||||
userId: member.id,
|
||||
body: {
|
||||
workspaceId: spamWorkspace.id,
|
||||
createdByUserId: owner.id,
|
||||
inviteId,
|
||||
},
|
||||
});
|
||||
|
||||
t.truthy(notification);
|
||||
t.is(module.queue.count('notification.sendMail'), invitationMailCount);
|
||||
});
|
||||
|
||||
test('should not send invitation email if user setting is not to receive invitation email', async t => {
|
||||
const inviteId = randomUUID();
|
||||
await module.create(Mockers.UserSettings, {
|
||||
|
||||
@@ -22,6 +22,7 @@ import {
|
||||
generateWorkspaceSettingsPath,
|
||||
WorkspaceSettingsTab,
|
||||
} from '../utils/workspace';
|
||||
import { containsUrlOrDomain } from '../workspaces/abuse';
|
||||
|
||||
@Injectable()
|
||||
export class NotificationService {
|
||||
@@ -151,6 +152,16 @@ export class NotificationService {
|
||||
}
|
||||
|
||||
private async sendInvitationEmail(input: InvitationNotificationCreate) {
|
||||
const workspace = await this.docReader.getWorkspaceContent(
|
||||
input.body.workspaceId
|
||||
);
|
||||
if (containsUrlOrDomain(workspace?.name)) {
|
||||
this.logger.warn(
|
||||
`Skip invitation email for workspace ${input.body.workspaceId}, reason=workspace name contains url or domain`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const inviteUrl = this.url.link(`/invite/${input.body.inviteId}`);
|
||||
if (env.dev) {
|
||||
// make it easier to test in dev mode
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
|
||||
import ava, { TestFn } from 'ava';
|
||||
|
||||
import {
|
||||
createTestingModule,
|
||||
type TestingModule,
|
||||
} from '../../../__tests__/utils';
|
||||
import { DocRole, Models, User, Workspace } from '../../../models';
|
||||
import { EventsListener } from '../event';
|
||||
import { PermissionModule } from '../index';
|
||||
|
||||
interface Context {
|
||||
module: TestingModule;
|
||||
models: Models;
|
||||
listener: EventsListener;
|
||||
}
|
||||
|
||||
const test = ava as TestFn<Context>;
|
||||
|
||||
let owner: User;
|
||||
let workspace: Workspace;
|
||||
|
||||
test.before(async t => {
|
||||
const module = await createTestingModule({ imports: [PermissionModule] });
|
||||
t.context.module = module;
|
||||
t.context.models = module.get(Models);
|
||||
t.context.listener = module.get(EventsListener);
|
||||
});
|
||||
|
||||
test.beforeEach(async t => {
|
||||
await t.context.module.initTestingDB();
|
||||
owner = await t.context.models.user.create({
|
||||
email: `${randomUUID()}@affine.pro`,
|
||||
});
|
||||
workspace = await t.context.models.workspace.create(owner.id);
|
||||
});
|
||||
|
||||
test.after.always(async t => {
|
||||
await t.context.module.close();
|
||||
});
|
||||
|
||||
test('should ignore default owner event when workspace does not exist', async t => {
|
||||
await t.notThrowsAsync(async () => {
|
||||
await t.context.listener.setDefaultPageOwner({
|
||||
workspaceId: randomUUID(),
|
||||
docId: randomUUID(),
|
||||
editor: owner.id,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
test('should ignore default owner event when editor does not exist', async t => {
|
||||
await t.notThrowsAsync(async () => {
|
||||
await t.context.listener.setDefaultPageOwner({
|
||||
workspaceId: workspace.id,
|
||||
docId: randomUUID(),
|
||||
editor: randomUUID(),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
test('should set owner when workspace and editor exist', async t => {
|
||||
const docId = randomUUID();
|
||||
await t.context.listener.setDefaultPageOwner({
|
||||
workspaceId: workspace.id,
|
||||
docId,
|
||||
editor: owner.id,
|
||||
});
|
||||
|
||||
const role = await t.context.models.docUser.get(
|
||||
workspace.id,
|
||||
docId,
|
||||
owner.id
|
||||
);
|
||||
t.is(role?.type, DocRole.Owner);
|
||||
});
|
||||
@@ -1,10 +1,27 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Prisma } from '@prisma/client';
|
||||
|
||||
import { OnEvent } from '../../base';
|
||||
import { Models } from '../../models';
|
||||
|
||||
const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']);
|
||||
|
||||
function isIgnorablePermissionEventError(error: unknown) {
|
||||
if (error instanceof Prisma.PrismaClientKnownRequestError) {
|
||||
return IGNORED_PRISMA_CODES.has(error.code);
|
||||
}
|
||||
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
|
||||
return /transaction is aborted|transaction already closed/i.test(
|
||||
error.message
|
||||
);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class EventsListener {
|
||||
private readonly logger = new Logger(EventsListener.name);
|
||||
|
||||
constructor(private readonly models: Models) {}
|
||||
|
||||
@OnEvent('doc.created')
|
||||
@@ -15,6 +32,33 @@ export class EventsListener {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.models.docUser.setOwner(workspaceId, docId, editor);
|
||||
const workspace = await this.models.workspace.get(workspaceId);
|
||||
if (!workspace) {
|
||||
this.logger.warn(
|
||||
`Skip default doc owner event for missing workspace ${workspaceId}/${docId}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const user = await this.models.user.get(editor);
|
||||
if (!user) {
|
||||
this.logger.warn(
|
||||
`Skip default doc owner event for missing editor ${workspaceId}/${docId}/${editor}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await this.models.docUser.setOwner(workspaceId, docId, editor);
|
||||
} catch (error) {
|
||||
if (isIgnorablePermissionEventError(error)) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(
|
||||
`Ignore stale doc owner event for ${workspaceId}/${docId}/${editor}: ${message}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,16 @@ import { isMobileRequest } from '../utils/user-agent';
|
||||
|
||||
const staticPathRegex = /^\/(_plugin|assets|imgs|js|plugins|static)\//;
|
||||
|
||||
function isMissingStaticAssetError(error: unknown) {
|
||||
if (!error || typeof error !== 'object') {
|
||||
return false;
|
||||
}
|
||||
|
||||
const err = error as { code?: string; status?: number; statusCode?: number };
|
||||
|
||||
return err.code === 'ENOENT' || err.status === 404 || err.statusCode === 404;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class StaticFilesResolver implements OnModuleInit {
|
||||
constructor(
|
||||
@@ -86,7 +96,18 @@ export class StaticFilesResolver implements OnModuleInit {
|
||||
next();
|
||||
return;
|
||||
}
|
||||
routeByUA(req, res, next, true);
|
||||
routeByUA(
|
||||
req,
|
||||
res,
|
||||
error => {
|
||||
if (isMissingStaticAssetError(error)) {
|
||||
res.status(404).end();
|
||||
return;
|
||||
}
|
||||
next(error);
|
||||
},
|
||||
true
|
||||
);
|
||||
});
|
||||
|
||||
// /
|
||||
|
||||
@@ -210,6 +210,9 @@ export class SpaceSyncGateway
|
||||
private readonly server!: Server;
|
||||
|
||||
private connectionCount = 0;
|
||||
private readonly socketUsers = new Map<string, string>();
|
||||
private readonly localUserConnectionCounts = new Map<string, number>();
|
||||
private unresolvedPresenceSockets = 0;
|
||||
private flushTimer?: NodeJS.Timeout;
|
||||
|
||||
constructor(
|
||||
@@ -224,7 +227,9 @@ export class SpaceSyncGateway
|
||||
onModuleInit() {
|
||||
this.flushTimer = setInterval(() => {
|
||||
this.flushActiveUsersMinute().catch(error => {
|
||||
this.logger.warn('Failed to flush active users minute', error as Error);
|
||||
this.logger.warn(
|
||||
`Failed to flush active users minute: ${this.formatError(error)}`
|
||||
);
|
||||
});
|
||||
}, 60_000);
|
||||
this.flushTimer.unref?.();
|
||||
@@ -278,8 +283,7 @@ export class SpaceSyncGateway
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
'Failed to merge updates for broadcast, falling back to batch',
|
||||
error as Error
|
||||
`Failed to merge updates for broadcast, falling back to batch: ${this.formatError(error)}`
|
||||
);
|
||||
return {
|
||||
spaceType,
|
||||
@@ -302,14 +306,20 @@ export class SpaceSyncGateway
|
||||
this.connectionCount++;
|
||||
this.logger.debug(`New connection, total: ${this.connectionCount}`);
|
||||
metrics.socketio.gauge('connections').record(this.connectionCount);
|
||||
this.attachPresenceUserId(client);
|
||||
this.flushActiveUsersMinute().catch(error => {
|
||||
this.logger.warn('Failed to flush active users minute', error as Error);
|
||||
const userId = this.attachPresenceUserId(client);
|
||||
this.trackConnectedSocket(client.id, userId);
|
||||
void this.flushActiveUsersMinute({
|
||||
aggregateAcrossCluster: false,
|
||||
}).catch(error => {
|
||||
this.logger.warn(
|
||||
`Failed to flush active users minute: ${this.formatError(error)}`
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
handleDisconnect(_client: Socket) {
|
||||
handleDisconnect(client: Socket) {
|
||||
this.connectionCount = Math.max(0, this.connectionCount - 1);
|
||||
this.trackDisconnectedSocket(client.id);
|
||||
this.logger.debug(
|
||||
`Connection disconnected, total: ${this.connectionCount}`
|
||||
);
|
||||
@@ -317,21 +327,24 @@ export class SpaceSyncGateway
|
||||
void this.flushActiveUsersMinute({
|
||||
aggregateAcrossCluster: false,
|
||||
}).catch(error => {
|
||||
this.logger.warn('Failed to flush active users minute', error as Error);
|
||||
this.logger.warn(
|
||||
`Failed to flush active users minute: ${this.formatError(error)}`
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private attachPresenceUserId(client: Socket) {
|
||||
private attachPresenceUserId(client: Socket): string | null {
|
||||
const request = client.request as Request;
|
||||
const userId = request.session?.user.id ?? request.token?.user.id;
|
||||
if (typeof userId !== 'string' || !userId) {
|
||||
this.logger.warn(
|
||||
`Unable to resolve authenticated user id for socket ${client.id}`
|
||||
);
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
client.data[SOCKET_PRESENCE_USER_ID_KEY] = userId;
|
||||
return userId;
|
||||
}
|
||||
|
||||
private resolvePresenceUserId(socket: { data?: unknown }) {
|
||||
@@ -345,6 +358,60 @@ export class SpaceSyncGateway
|
||||
return typeof userId === 'string' && userId ? userId : null;
|
||||
}
|
||||
|
||||
private trackConnectedSocket(socketId: string, userId: string | null) {
|
||||
if (!userId) {
|
||||
this.unresolvedPresenceSockets++;
|
||||
return;
|
||||
}
|
||||
|
||||
this.socketUsers.set(socketId, userId);
|
||||
const prev = this.localUserConnectionCounts.get(userId) ?? 0;
|
||||
this.localUserConnectionCounts.set(userId, prev + 1);
|
||||
}
|
||||
|
||||
private trackDisconnectedSocket(socketId: string) {
|
||||
const userId = this.socketUsers.get(socketId);
|
||||
if (!userId) {
|
||||
this.unresolvedPresenceSockets = Math.max(
|
||||
0,
|
||||
this.unresolvedPresenceSockets - 1
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
this.socketUsers.delete(socketId);
|
||||
const next = (this.localUserConnectionCounts.get(userId) ?? 1) - 1;
|
||||
if (next <= 0) {
|
||||
this.localUserConnectionCounts.delete(userId);
|
||||
} else {
|
||||
this.localUserConnectionCounts.set(userId, next);
|
||||
}
|
||||
}
|
||||
|
||||
private resolveLocalActiveUsers() {
|
||||
if (this.unresolvedPresenceSockets > 0) {
|
||||
return Math.max(0, this.connectionCount);
|
||||
}
|
||||
|
||||
return this.localUserConnectionCounts.size;
|
||||
}
|
||||
|
||||
private formatError(error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
return error.stack ?? error.message;
|
||||
}
|
||||
|
||||
if (typeof error === 'string') {
|
||||
return error;
|
||||
}
|
||||
|
||||
try {
|
||||
return JSON.stringify(error);
|
||||
} catch {
|
||||
return String(error);
|
||||
}
|
||||
}
|
||||
|
||||
private async flushActiveUsersMinute(options?: {
|
||||
aggregateAcrossCluster?: boolean;
|
||||
}) {
|
||||
@@ -352,7 +419,7 @@ export class SpaceSyncGateway
|
||||
minute.setSeconds(0, 0);
|
||||
|
||||
const aggregateAcrossCluster = options?.aggregateAcrossCluster ?? true;
|
||||
let activeUsers = Math.max(0, this.connectionCount);
|
||||
let activeUsers = this.resolveLocalActiveUsers();
|
||||
if (aggregateAcrossCluster) {
|
||||
try {
|
||||
const sockets = await this.server.fetchSockets();
|
||||
@@ -377,8 +444,7 @@ export class SpaceSyncGateway
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
'Failed to aggregate active users from sockets, using local value',
|
||||
error as Error
|
||||
`Failed to aggregate active users from sockets, using local value: ${this.formatError(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
export const SHARE_ACTION_ACCOUNT_AGE_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
const URL_OR_DOMAIN_PATTERN =
|
||||
/(?:https?:\/\/|www\.|(?<![@\w-])(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,63}(?:[/?#:]|$))/i;
|
||||
|
||||
export function containsUrlOrDomain(value: string | null | undefined) {
|
||||
return URL_OR_DOMAIN_PATTERN.test(value ?? '');
|
||||
}
|
||||
|
||||
export function isUserOldEnoughForShareActions(user: { createdAt: Date }) {
|
||||
return Date.now() - user.createdAt.getTime() >= SHARE_ACTION_ACCOUNT_AGE_MS;
|
||||
}
|
||||
@@ -15,6 +15,7 @@ import { PrismaClient } from '@prisma/client';
|
||||
import { SafeIntResolver } from 'graphql-scalars';
|
||||
|
||||
import {
|
||||
ActionForbidden,
|
||||
Cache,
|
||||
DocActionDenied,
|
||||
DocDefaultRoleCanNotBeOwner,
|
||||
@@ -40,6 +41,7 @@ import {
|
||||
DocRole,
|
||||
} from '../../permission';
|
||||
import { PublicUserType, WorkspaceUserType } from '../../user';
|
||||
import { isUserOldEnoughForShareActions } from '../abuse';
|
||||
import { WorkspaceType } from '../types';
|
||||
import { TimeBucket, TimeWindow } from './analytics-types';
|
||||
import {
|
||||
@@ -299,6 +301,15 @@ export class WorkspaceDocResolver {
|
||||
private readonly cache: Cache
|
||||
) {}
|
||||
|
||||
private async assertCanShare(userId: string) {
|
||||
const user = await this.models.user.get(userId);
|
||||
if (!user || !isUserOldEnoughForShareActions(user)) {
|
||||
throw new ActionForbidden(
|
||||
'Sharing links is unavailable during the first 24 hours after signup.'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ResolveField(() => WorkspaceDocMeta, {
|
||||
description: 'Cloud page metadata of workspace',
|
||||
complexity: 2,
|
||||
@@ -413,6 +424,7 @@ export class WorkspaceDocResolver {
|
||||
}
|
||||
|
||||
await this.ac.user(user.id).doc(workspaceId, docId).assert('Doc.Publish');
|
||||
await this.assertCanShare(user.id);
|
||||
|
||||
const doc = await this.models.doc.publish(workspaceId, docId, mode);
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import {
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import {
|
||||
ActionForbidden,
|
||||
ActionForbiddenOnNonTeamWorkspace,
|
||||
AlreadyInSpace,
|
||||
AuthenticationRequired,
|
||||
@@ -40,6 +41,7 @@ import { AccessController, WorkspaceRole } from '../../permission';
|
||||
import { QuotaService } from '../../quota';
|
||||
import { UserType } from '../../user';
|
||||
import { validators } from '../../utils/validators';
|
||||
import { containsUrlOrDomain, isUserOldEnoughForShareActions } from '../abuse';
|
||||
import { WorkspaceService } from '../service';
|
||||
import {
|
||||
InvitationType,
|
||||
@@ -68,6 +70,24 @@ export class WorkspaceMemberResolver {
|
||||
private readonly quota: QuotaService
|
||||
) {}
|
||||
|
||||
private async assertCanInviteOrShare(userId: string) {
|
||||
const user = await this.models.user.get(userId);
|
||||
if (!user || !isUserOldEnoughForShareActions(user)) {
|
||||
throw new ActionForbidden(
|
||||
'Inviting members and creating share links are unavailable during the first 24 hours after signup.'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async assertWorkspaceNameCanInvite(workspaceId: string) {
|
||||
const workspace = await this.workspaceService.getWorkspaceInfo(workspaceId);
|
||||
if (containsUrlOrDomain(workspace.name)) {
|
||||
throw new ActionForbidden(
|
||||
'Workspace names containing links or domains cannot be used to invite members.'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ResolveField(() => UserType, {
|
||||
description: 'Owner of workspace',
|
||||
complexity: 2,
|
||||
@@ -141,6 +161,8 @@ export class WorkspaceMemberResolver {
|
||||
.user(me.id)
|
||||
.workspace(workspaceId)
|
||||
.assert('Workspace.Users.Manage');
|
||||
await this.assertCanInviteOrShare(me.id);
|
||||
await this.assertWorkspaceNameCanInvite(workspaceId);
|
||||
|
||||
if (emails.length > 512) {
|
||||
throw new TooManyRequest();
|
||||
@@ -272,6 +294,8 @@ export class WorkspaceMemberResolver {
|
||||
.user(user.id)
|
||||
.workspace(workspaceId)
|
||||
.assert('Workspace.Users.Manage');
|
||||
await this.assertCanInviteOrShare(user.id);
|
||||
await this.assertWorkspaceNameCanInvite(workspaceId);
|
||||
|
||||
const cacheWorkspaceId = `workspace:inviteLink:${workspaceId}`;
|
||||
const invite = await this.cache.get<{ inviteId: string }>(cacheWorkspaceId);
|
||||
|
||||
@@ -83,95 +83,70 @@ export const Renderers = {
|
||||
//#region Workspace
|
||||
MemberInvitation: make(
|
||||
Invitation,
|
||||
props => `${props.user.email} invited you to join ${props.workspace.name}`
|
||||
'You were invited to join a workspace on AFFiNE'
|
||||
),
|
||||
MemberAccepted: make(
|
||||
InvitationAccepted,
|
||||
props => `${props.user.email} accepted your invitation`
|
||||
),
|
||||
MemberLeave: make(
|
||||
MemberLeave,
|
||||
props => `${props.user.email} left ${props.workspace.name}`
|
||||
'Your workspace invitation was accepted'
|
||||
),
|
||||
MemberLeave: make(MemberLeave, 'A workspace member left'),
|
||||
LinkInvitationReviewRequest: make(
|
||||
LinkInvitationReviewRequest,
|
||||
props => `New request to join ${props.workspace.name}`
|
||||
'New request to join a workspace'
|
||||
),
|
||||
LinkInvitationApprove: make(
|
||||
LinkInvitationApproved,
|
||||
props => `Your request to join ${props.workspace.name} has been approved`
|
||||
'Your request to join a workspace has been approved'
|
||||
),
|
||||
LinkInvitationDecline: make(
|
||||
LinkInvitationReviewDeclined,
|
||||
props => `Your request to join ${props.workspace.name} was declined`
|
||||
),
|
||||
MemberRemoved: make(
|
||||
MemberRemoved,
|
||||
props => `You have been removed from ${props.workspace.name}`
|
||||
'Your request to join a workspace was declined'
|
||||
),
|
||||
MemberRemoved: make(MemberRemoved, 'You have been removed from a workspace'),
|
||||
OwnershipTransferred: make(
|
||||
OwnershipTransferred,
|
||||
props => `Your ownership of ${props.workspace.name} has been transferred`
|
||||
'Your workspace ownership has been transferred'
|
||||
),
|
||||
OwnershipReceived: make(
|
||||
OwnershipReceived,
|
||||
props => `You are now the owner of ${props.workspace.name}`
|
||||
'You are now the owner of a workspace'
|
||||
),
|
||||
//#endregion
|
||||
|
||||
//#region Doc
|
||||
Mention: make(
|
||||
Mention,
|
||||
props => `${props.user.email} mentioned you in ${props.doc.title}`
|
||||
),
|
||||
Comment: make(
|
||||
Comment,
|
||||
props => `${props.user.email} commented on ${props.doc.title}`
|
||||
),
|
||||
CommentMention: make(
|
||||
CommentMention,
|
||||
props =>
|
||||
`${props.user.email} mentioned you in a comment on ${props.doc.title}`
|
||||
),
|
||||
Mention: make(Mention, 'You were mentioned in AFFiNE'),
|
||||
Comment: make(Comment, 'New comment in AFFiNE'),
|
||||
CommentMention: make(CommentMention, 'You were mentioned in a comment'),
|
||||
//#endregion
|
||||
|
||||
//#region Team
|
||||
TeamWorkspaceUpgraded: make(TeamWorkspaceUpgraded, props =>
|
||||
props.isOwner
|
||||
? 'Your workspace has been upgraded to team workspace! 🎉'
|
||||
: `${props.workspace.name} has been upgraded to team workspace! 🎉`
|
||||
),
|
||||
TeamBecomeAdmin: make(
|
||||
TeamBecomeAdmin,
|
||||
props => `You are now an admin of ${props.workspace.name}`
|
||||
: 'A workspace has been upgraded to team workspace! 🎉'
|
||||
),
|
||||
TeamBecomeAdmin: make(TeamBecomeAdmin, 'You are now a workspace admin'),
|
||||
TeamBecomeCollaborator: make(
|
||||
TeamBecomeCollaborator,
|
||||
props => `Your role has been changed in ${props.workspace.name}`
|
||||
'Your workspace role has been changed'
|
||||
),
|
||||
TeamDeleteIn24Hours: make(
|
||||
TeamDeleteIn24Hours,
|
||||
props =>
|
||||
`[Action Required] Final warning: Your workspace ${props.workspace.name} will be deleted in 24 hours`
|
||||
'[Action Required] Final warning: Your workspace will be deleted in 24 hours'
|
||||
),
|
||||
TeamDeleteInOneMonth: make(
|
||||
TeamDeleteInOneMonth,
|
||||
props =>
|
||||
`[Action Required] Important: Your workspace ${props.workspace.name} will be deleted soon`
|
||||
'[Action Required] Important: Your workspace will be deleted soon'
|
||||
),
|
||||
TeamWorkspaceDeleted: make(
|
||||
TeamWorkspaceDeleted,
|
||||
props => `Your workspace ${props.workspace.name} has been deleted`
|
||||
'Your workspace has been deleted'
|
||||
),
|
||||
TeamWorkspaceExpireSoon: make(
|
||||
TeamExpireSoon,
|
||||
props =>
|
||||
`[Action Required] Your ${props.workspace.name} team workspace will expire soon`
|
||||
),
|
||||
TeamWorkspaceExpired: make(
|
||||
TeamExpired,
|
||||
props => `Your ${props.workspace.name} team workspace has expired`
|
||||
'[Action Required] Your team workspace will expire soon'
|
||||
),
|
||||
TeamWorkspaceExpired: make(TeamExpired, 'Your team workspace has expired'),
|
||||
//#endregion
|
||||
|
||||
//#region License
|
||||
|
||||
@@ -92,7 +92,7 @@ export class CalendarAccountModel extends BaseModel {
|
||||
scope: input.scope ?? null,
|
||||
status: input.status ?? 'active',
|
||||
lastError: input.lastError ?? null,
|
||||
refreshIntervalMinutes: input.refreshIntervalMinutes ?? 60,
|
||||
refreshIntervalMinutes: input.refreshIntervalMinutes ?? 30,
|
||||
};
|
||||
|
||||
const updateData: Prisma.CalendarAccountUncheckedUpdateInput = {
|
||||
|
||||
@@ -17,6 +17,8 @@ export interface UpsertCalendarSubscriptionInput {
|
||||
export interface UpdateCalendarSubscriptionSyncInput {
|
||||
syncToken?: string | null;
|
||||
lastSyncAt?: Date | null;
|
||||
nextSyncAt?: Date;
|
||||
syncRetryCount?: number;
|
||||
}
|
||||
|
||||
export interface UpdateCalendarSubscriptionChannelInput {
|
||||
@@ -81,13 +83,21 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
}
|
||||
|
||||
async updateSync(id: string, input: UpdateCalendarSubscriptionSyncInput) {
|
||||
return await this.db.calendarSubscription.update({
|
||||
where: { id },
|
||||
data: {
|
||||
syncToken: input.syncToken ?? null,
|
||||
lastSyncAt: input.lastSyncAt ?? null,
|
||||
},
|
||||
});
|
||||
const data: Prisma.CalendarSubscriptionUncheckedUpdateInput = {};
|
||||
if (input.syncToken !== undefined) {
|
||||
data.syncToken = input.syncToken ?? null;
|
||||
}
|
||||
if (input.lastSyncAt !== undefined) {
|
||||
data.lastSyncAt = input.lastSyncAt ?? null;
|
||||
}
|
||||
if (input.nextSyncAt !== undefined) {
|
||||
data.nextSyncAt = input.nextSyncAt;
|
||||
}
|
||||
if (input.syncRetryCount !== undefined) {
|
||||
data.syncRetryCount = input.syncRetryCount;
|
||||
}
|
||||
|
||||
return await this.db.calendarSubscription.update({ where: { id }, data });
|
||||
}
|
||||
|
||||
async updateChannel(
|
||||
@@ -155,10 +165,16 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
});
|
||||
}
|
||||
|
||||
async listAllWithAccountForSync() {
|
||||
async listDueForSync(now: Date, limit: number) {
|
||||
return await this.db.calendarSubscription.findMany({
|
||||
where: { enabled: true },
|
||||
include: { account: true },
|
||||
where: {
|
||||
enabled: true,
|
||||
nextSyncAt: { lte: now },
|
||||
account: { status: 'active' },
|
||||
},
|
||||
select: { id: true },
|
||||
orderBy: { nextSyncAt: 'asc' },
|
||||
take: limit,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -169,13 +185,6 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
});
|
||||
}
|
||||
|
||||
async updateLastSyncAt(id: string, lastSyncAt: Date) {
|
||||
return await this.db.calendarSubscription.update({
|
||||
where: { id },
|
||||
data: { lastSyncAt },
|
||||
});
|
||||
}
|
||||
|
||||
async clearSyncTokensByAccount(accountId: string) {
|
||||
return await this.db.calendarSubscription.updateMany({
|
||||
where: { accountId },
|
||||
@@ -200,6 +209,7 @@ export class CalendarSubscriptionModel extends BaseModel {
|
||||
data: {
|
||||
enabled: false,
|
||||
syncToken: null,
|
||||
syncRetryCount: 0,
|
||||
customChannelId: null,
|
||||
customResourceId: null,
|
||||
channelExpiration: null,
|
||||
|
||||
@@ -2,6 +2,7 @@ import assert from 'node:assert';
|
||||
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Transactional } from '@nestjs-cls/transactional';
|
||||
import type { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
|
||||
import { WorkspaceDocUserRole } from '@prisma/client';
|
||||
|
||||
import { CanNotBatchGrantDocOwnerPermissions, PaginationInput } from '../base';
|
||||
@@ -14,31 +15,20 @@ export class DocUserModel extends BaseModel {
|
||||
* Set or update the [Owner] of a doc.
|
||||
* The old [Owner] will be changed to [Manager] if there is already an [Owner].
|
||||
*/
|
||||
@Transactional()
|
||||
@Transactional<TransactionalAdapterPrisma>({ timeout: 15000 })
|
||||
async setOwner(workspaceId: string, docId: string, userId: string) {
|
||||
const oldOwner = await this.db.workspaceDocUserRole.findFirst({
|
||||
await this.db.workspaceDocUserRole.updateMany({
|
||||
where: {
|
||||
workspaceId,
|
||||
docId,
|
||||
type: DocRole.Owner,
|
||||
userId: { not: userId },
|
||||
},
|
||||
data: {
|
||||
type: DocRole.Manager,
|
||||
},
|
||||
});
|
||||
|
||||
if (oldOwner) {
|
||||
await this.db.workspaceDocUserRole.update({
|
||||
where: {
|
||||
workspaceId_docId_userId: {
|
||||
workspaceId,
|
||||
docId,
|
||||
userId: oldOwner.userId,
|
||||
},
|
||||
},
|
||||
data: {
|
||||
type: DocRole.Manager,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
await this.db.workspaceDocUserRole.upsert({
|
||||
where: {
|
||||
workspaceId_docId_userId: {
|
||||
@@ -57,16 +47,9 @@ export class DocUserModel extends BaseModel {
|
||||
type: DocRole.Owner,
|
||||
},
|
||||
});
|
||||
|
||||
if (oldOwner) {
|
||||
this.logger.log(
|
||||
`Transfer doc owner of [${workspaceId}/${docId}] from [${oldOwner.userId}] to [${userId}]`
|
||||
);
|
||||
} else {
|
||||
this.logger.log(
|
||||
`Set doc owner of [${workspaceId}/${docId}] to [${userId}]`
|
||||
);
|
||||
}
|
||||
this.logger.log(
|
||||
`Set doc owner of [${workspaceId}/${docId}] to [${userId}]`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -14,6 +14,7 @@ import type {
|
||||
} from '../../../models';
|
||||
import { Models } from '../../../models';
|
||||
import { CalendarModule } from '../index';
|
||||
import { CalendarCronJobs } from '../cron';
|
||||
import {
|
||||
CalendarProvider,
|
||||
CalendarProviderFactory,
|
||||
@@ -85,6 +86,7 @@ const module = await createModule({
|
||||
],
|
||||
});
|
||||
const calendarService = module.get(CalendarService);
|
||||
const calendarCronJobs = module.get(CalendarCronJobs);
|
||||
const providerFactory = module.get(CalendarProviderFactory);
|
||||
const models = module.get(Models);
|
||||
module.get(CryptoHelper).onConfigInit();
|
||||
@@ -113,6 +115,8 @@ const createSubscription = async (
|
||||
accountId: string,
|
||||
overrides: Partial<UpsertCalendarSubscriptionInput> & {
|
||||
syncToken?: string | null;
|
||||
nextSyncAt?: Date;
|
||||
syncRetryCount?: number;
|
||||
customChannelId?: string | null;
|
||||
customResourceId?: string | null;
|
||||
channelExpiration?: Date | null;
|
||||
@@ -134,6 +138,20 @@ const createSubscription = async (
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
overrides.nextSyncAt !== undefined ||
|
||||
overrides.syncRetryCount !== undefined
|
||||
) {
|
||||
await models.calendarSubscription.updateSync(subscription.id, {
|
||||
...(overrides.nextSyncAt !== undefined
|
||||
? { nextSyncAt: overrides.nextSyncAt }
|
||||
: {}),
|
||||
...(overrides.syncRetryCount !== undefined
|
||||
? { syncRetryCount: overrides.syncRetryCount }
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
overrides.customChannelId !== undefined ||
|
||||
overrides.customResourceId !== undefined ||
|
||||
@@ -151,6 +169,8 @@ const createSubscription = async (
|
||||
|
||||
test.afterEach.always(() => {
|
||||
mock.reset();
|
||||
module.queue.add.resetHistory();
|
||||
module.queue.remove.resetHistory();
|
||||
});
|
||||
|
||||
test.after.always(async () => {
|
||||
@@ -252,6 +272,9 @@ test('syncSubscription resets invalid sync token and maps events', async t => {
|
||||
const updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.is(updated?.syncToken, 'next-token');
|
||||
t.truthy(updated?.lastSyncAt);
|
||||
t.is(updated?.syncRetryCount, 0);
|
||||
t.truthy(updated?.nextSyncAt);
|
||||
t.true(updated!.nextSyncAt.getTime() > updated!.lastSyncAt!.getTime());
|
||||
|
||||
const events = await models.calendarEvent.listBySubscriptionsInRange(
|
||||
[subscription.id],
|
||||
@@ -493,51 +516,22 @@ test('syncSubscription applies exponential backoff for repeated failures', async
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
let updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 1);
|
||||
t.is(updated?.syncRetryCount, 1);
|
||||
t.is(
|
||||
updated?.nextSyncAt.toISOString(),
|
||||
new Date(now + baseDelayMs).toISOString()
|
||||
);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
});
|
||||
|
||||
test('syncSubscription skips token refresh while in backoff window', async t => {
|
||||
let now = new Date('2026-01-01T00:00:00.000Z').getTime();
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id, {
|
||||
accessToken: 'expired-access-token',
|
||||
expiresAt: new Date(now - 5 * 60 * 1000),
|
||||
});
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
const refreshMock = mock.method(provider, 'refreshTokens', async () => ({
|
||||
accessToken: `refreshed-${randomUUID()}`,
|
||||
}));
|
||||
const listEventsMock = mock.method(provider, 'listEvents', async () => {
|
||||
throw new Error('upstream timeout');
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
const baseDelayMs = 5 * 60 * 1000;
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(refreshMock.mock.callCount(), 1);
|
||||
t.is(listEventsMock.mock.callCount(), 1);
|
||||
|
||||
now += baseDelayMs + 1000;
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
t.is(refreshMock.mock.callCount(), 2);
|
||||
t.is(listEventsMock.mock.callCount(), 2);
|
||||
t.is(updated?.syncRetryCount, 2);
|
||||
t.is(
|
||||
updated?.nextSyncAt.toISOString(),
|
||||
new Date(now + baseDelayMs * 2).toISOString()
|
||||
);
|
||||
});
|
||||
|
||||
test('syncSubscription renews webhook channel when expiring', async t => {
|
||||
@@ -599,3 +593,73 @@ test('syncSubscription renews webhook channel when expiring', async t => {
|
||||
t.is(updated?.customResourceId, 'new-resource');
|
||||
t.truthy(updated?.channelExpiration);
|
||||
});
|
||||
|
||||
test('syncSubscription keeps schedule moving when webhook renewal fails', async t => {
|
||||
const now = new Date('2026-01-01T00:00:00.000Z').getTime();
|
||||
mock.method(Date, 'now', () => now);
|
||||
|
||||
const user = await module.create(Mockers.User);
|
||||
const account = await createAccount(user.id, {
|
||||
refreshIntervalMinutes: 60,
|
||||
});
|
||||
const subscription = await createSubscription(account.id, {
|
||||
syncToken: 'sync-token',
|
||||
channelExpiration: new Date(Date.now() + 60 * 60 * 1000),
|
||||
});
|
||||
|
||||
const provider = new MockCalendarProvider();
|
||||
mock.method(provider, 'listEvents', async () => ({
|
||||
events: [],
|
||||
nextSyncToken: 'next-sync',
|
||||
}));
|
||||
mock.method(provider, 'watchCalendar', async () => {
|
||||
throw new Error('watch failed');
|
||||
});
|
||||
mock.method(providerFactory, 'get', () => provider);
|
||||
|
||||
await calendarService.syncSubscription(subscription.id);
|
||||
|
||||
const updated = await models.calendarSubscription.get(subscription.id);
|
||||
t.truthy(updated?.lastSyncAt);
|
||||
t.is(updated?.syncRetryCount, 0);
|
||||
t.is(
|
||||
updated?.nextSyncAt.toISOString(),
|
||||
new Date(now + 15 * 60 * 1000).toISOString()
|
||||
);
|
||||
});
|
||||
|
||||
test('pollAccounts skips when nothing is due', async t => {
|
||||
mock.method(models.calendarSubscription, 'listDueForSync', async () => []);
|
||||
|
||||
await calendarCronJobs.pollAccounts();
|
||||
|
||||
t.is(module.queue.count('calendar.syncSubscription'), 0);
|
||||
});
|
||||
|
||||
test('pollAccounts enqueues due subscriptions only', async t => {
|
||||
mock.method(models.calendarSubscription, 'listDueForSync', async () => [
|
||||
{ id: 'due-subscription-a' },
|
||||
{ id: 'due-subscription-b' },
|
||||
]);
|
||||
|
||||
await calendarCronJobs.pollAccounts();
|
||||
|
||||
t.is(module.queue.count('calendar.syncSubscription'), 2);
|
||||
t.deepEqual(
|
||||
module.queue.add
|
||||
.getCalls()
|
||||
.map(call => [call.args[0], call.args[1], call.args[2]]),
|
||||
[
|
||||
[
|
||||
'calendar.syncSubscription',
|
||||
{ subscriptionId: 'due-subscription-a', reason: 'polling' },
|
||||
{ jobId: 'due-subscription-a' },
|
||||
],
|
||||
[
|
||||
'calendar.syncSubscription',
|
||||
{ subscriptionId: 'due-subscription-b', reason: 'polling' },
|
||||
{ jobId: 'due-subscription-b' },
|
||||
],
|
||||
]
|
||||
);
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@ export interface CalendarGoogleConfig {
|
||||
clientSecret: string;
|
||||
externalWebhookUrl?: string;
|
||||
webhookVerificationToken?: string;
|
||||
requestTimeoutMs?: number;
|
||||
}
|
||||
|
||||
export type CalendarCalDAVAuthType = 'auto' | 'basic' | 'digest';
|
||||
@@ -49,6 +50,7 @@ const schema: JSONSchema = {
|
||||
clientSecret: { type: 'string' },
|
||||
externalWebhookUrl: { type: 'string' },
|
||||
webhookVerificationToken: { type: 'string' },
|
||||
requestTimeoutMs: { type: 'number' },
|
||||
},
|
||||
};
|
||||
|
||||
@@ -88,6 +90,7 @@ defineModuleConfig('calendar', {
|
||||
clientSecret: '',
|
||||
externalWebhookUrl: '',
|
||||
webhookVerificationToken: '',
|
||||
requestTimeoutMs: 10_000,
|
||||
},
|
||||
schema,
|
||||
shape: z.object({
|
||||
@@ -101,6 +104,7 @@ defineModuleConfig('calendar', {
|
||||
.or(z.string().length(0))
|
||||
.optional(),
|
||||
webhookVerificationToken: z.string().optional(),
|
||||
requestTimeoutMs: z.number().int().positive().optional(),
|
||||
}),
|
||||
link: 'https://developers.google.com/calendar/api/guides/push',
|
||||
},
|
||||
|
||||
@@ -1,61 +1,33 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
|
||||
import { JobQueue } from '../../base';
|
||||
import { Models } from '../../models';
|
||||
import { CalendarService } from './service';
|
||||
|
||||
const CALENDAR_POLL_BATCH_SIZE = 200;
|
||||
|
||||
@Injectable()
|
||||
export class CalendarCronJobs {
|
||||
constructor(
|
||||
private readonly models: Models,
|
||||
private readonly calendar: CalendarService
|
||||
private readonly queue: JobQueue
|
||||
) {}
|
||||
|
||||
@Cron(CronExpression.EVERY_MINUTE)
|
||||
async pollAccounts() {
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listAllWithAccountForSync();
|
||||
const subscriptions = await this.models.calendarSubscription.listDueForSync(
|
||||
new Date(),
|
||||
CALENDAR_POLL_BATCH_SIZE
|
||||
);
|
||||
|
||||
const accountDueAt = new Map<
|
||||
string,
|
||||
{ refreshInterval: number; lastSyncAt: Date | null }
|
||||
>();
|
||||
|
||||
for (const subscription of subscriptions) {
|
||||
const interval = subscription.account.refreshIntervalMinutes ?? 60;
|
||||
const lastSyncAt = subscription.lastSyncAt ?? null;
|
||||
const existing = accountDueAt.get(subscription.accountId);
|
||||
if (!existing) {
|
||||
accountDueAt.set(subscription.accountId, {
|
||||
refreshInterval: interval,
|
||||
lastSyncAt,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
const earliest =
|
||||
existing.lastSyncAt && lastSyncAt
|
||||
? existing.lastSyncAt < lastSyncAt
|
||||
? existing.lastSyncAt
|
||||
: lastSyncAt
|
||||
: (existing.lastSyncAt ?? lastSyncAt);
|
||||
accountDueAt.set(subscription.accountId, {
|
||||
refreshInterval: interval,
|
||||
lastSyncAt: earliest,
|
||||
});
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
await Promise.allSettled(
|
||||
Array.from(accountDueAt.entries()).map(([accountId, info]) => {
|
||||
if (
|
||||
!info.lastSyncAt ||
|
||||
now - info.lastSyncAt.getTime() >= info.refreshInterval * 60 * 1000
|
||||
) {
|
||||
return this.calendar.syncAccount(accountId);
|
||||
}
|
||||
return Promise.resolve();
|
||||
})
|
||||
subscriptions.map(({ id }) =>
|
||||
this.queue.add(
|
||||
'calendar.syncSubscription',
|
||||
{ subscriptionId: id, reason: 'polling' },
|
||||
{ jobId: id }
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import { PermissionModule } from '../../core/permission';
|
||||
import { WorkspaceModule } from '../../core/workspaces';
|
||||
import { CalendarController } from './controller';
|
||||
import { CalendarCronJobs } from './cron';
|
||||
import { CalendarJob } from './job';
|
||||
import { CalendarOAuthService } from './oauth';
|
||||
import { CalendarProviderFactory, CalendarProviders } from './providers';
|
||||
import {
|
||||
@@ -25,6 +26,7 @@ import { CalendarService } from './service';
|
||||
...CalendarProviders,
|
||||
CalendarProviderFactory,
|
||||
CalendarService,
|
||||
CalendarJob,
|
||||
CalendarOAuthService,
|
||||
CalendarCronJobs,
|
||||
CalendarServerConfigResolver,
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { OnJob } from '../../base';
|
||||
import { CalendarService } from './service';
|
||||
|
||||
declare global {
|
||||
interface Jobs {
|
||||
'calendar.syncSubscription': {
|
||||
subscriptionId: string;
|
||||
reason?: 'polling' | 'webhook' | 'on-demand';
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class CalendarJob {
|
||||
constructor(private readonly calendar: CalendarService) {}
|
||||
|
||||
@OnJob('calendar.syncSubscription')
|
||||
async syncSubscription({
|
||||
subscriptionId,
|
||||
reason,
|
||||
}: Jobs['calendar.syncSubscription']) {
|
||||
await this.calendar.syncSubscription(subscriptionId, { reason });
|
||||
}
|
||||
}
|
||||
@@ -152,10 +152,28 @@ export abstract class CalendarProvider {
|
||||
}
|
||||
}
|
||||
|
||||
protected get requestTimeoutMs() {
|
||||
const timeout = (this.config as { requestTimeoutMs?: number } | undefined)
|
||||
?.requestTimeoutMs;
|
||||
return typeof timeout === 'number' && timeout > 0 ? timeout : undefined;
|
||||
}
|
||||
|
||||
protected withTimeout(signal?: AbortSignal | null) {
|
||||
const timeoutMs = this.requestTimeoutMs;
|
||||
if (!timeoutMs) return signal;
|
||||
|
||||
const timeoutSignal = AbortSignal.timeout(timeoutMs);
|
||||
if (!signal) return timeoutSignal;
|
||||
|
||||
return AbortSignal.any([signal, timeoutSignal]);
|
||||
}
|
||||
|
||||
protected async fetchJson<T>(url: string, init?: RequestInit) {
|
||||
const response = await fetch(url, {
|
||||
headers: { Accept: 'application/json', ...init?.headers },
|
||||
...init,
|
||||
signal: this.withTimeout(init?.signal),
|
||||
headers: { ...init?.headers, Accept: 'application/json' },
|
||||
});
|
||||
const body = await response.text();
|
||||
if (!response.ok) {
|
||||
|
||||
@@ -329,6 +329,7 @@ export class GoogleCalendarProvider extends CalendarProvider {
|
||||
|
||||
private async fetchWithTokenHandling<T>(url: string, accessToken: string) {
|
||||
const response = await fetch(url, {
|
||||
signal: this.withTimeout(),
|
||||
headers: {
|
||||
Accept: 'application/json',
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
|
||||
@@ -8,11 +8,11 @@ import { addDays, subDays } from 'date-fns';
|
||||
import {
|
||||
CalendarProviderRequestError,
|
||||
Config,
|
||||
exponentialBackoffDelay,
|
||||
GraphqlBadRequest,
|
||||
Mutex,
|
||||
JobQueue,
|
||||
URLHelper,
|
||||
} from '../../base';
|
||||
import { SessionRedis } from '../../base/redis';
|
||||
import { Models } from '../../models';
|
||||
import type { CalendarCalDAVProviderPreset } from './config';
|
||||
import {
|
||||
@@ -28,10 +28,10 @@ import type { LinkCalDAVAccountInput } from './types';
|
||||
const TOKEN_REFRESH_SKEW_MS = 60 * 1000;
|
||||
const DEFAULT_PAST_DAYS = 90;
|
||||
const DEFAULT_FUTURE_DAYS = 180;
|
||||
const SYNC_FAILURE_BACKOFF_KEY_PREFIX = 'calendar:sync:backoff:';
|
||||
const SYNC_FAILURE_BACKOFF_BASE_MS = 5 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_MAX_MS = 6 * 60 * 60 * 1000;
|
||||
const SYNC_FAILURE_BACKOFF_TTL_SECONDS = 24 * 60 * 60;
|
||||
const DEFAULT_REFRESH_INTERVAL_MINUTES = 30;
|
||||
const CHANNEL_RENEW_RETRY_MS = 15 * 60 * 1000;
|
||||
|
||||
@Injectable()
|
||||
export class CalendarService {
|
||||
@@ -41,8 +41,7 @@ export class CalendarService {
|
||||
constructor(
|
||||
private readonly models: Models,
|
||||
private readonly providerFactory: CalendarProviderFactory<CalendarProvider>,
|
||||
private readonly mutex: Mutex,
|
||||
private readonly redis: SessionRedis,
|
||||
private readonly queue: JobQueue,
|
||||
private readonly config: Config,
|
||||
private readonly url: URLHelper
|
||||
) {}
|
||||
@@ -85,10 +84,24 @@ export class CalendarService {
|
||||
return null;
|
||||
}
|
||||
|
||||
return await this.models.calendarAccount.updateRefreshInterval(
|
||||
accountId,
|
||||
refreshIntervalMinutes
|
||||
const updatedAccount =
|
||||
await this.models.calendarAccount.updateRefreshInterval(
|
||||
accountId,
|
||||
refreshIntervalMinutes
|
||||
);
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listByAccountForSync(accountId);
|
||||
await Promise.all(
|
||||
subscriptions.map(subscription =>
|
||||
this.models.calendarSubscription.updateSync(subscription.id, {
|
||||
nextSyncAt: this.calculateNextSyncAt(
|
||||
subscription.lastSyncAt ?? this.now(),
|
||||
refreshIntervalMinutes
|
||||
),
|
||||
})
|
||||
)
|
||||
);
|
||||
return updatedAccount;
|
||||
}
|
||||
|
||||
async unlinkAccount(userId: string, accountId: string) {
|
||||
@@ -313,25 +326,6 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const backoff = await this.getSyncFailureBackoff(subscription.id);
|
||||
if (backoff && now < backoff.nextRetryAt.getTime()) {
|
||||
return;
|
||||
}
|
||||
|
||||
await using lock = await this.mutex.acquire(
|
||||
`calendar:subscription:${subscriptionId}`
|
||||
);
|
||||
if (!lock) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lockedNow = Date.now();
|
||||
const lockedBackoff = await this.getSyncFailureBackoff(subscription.id);
|
||||
if (lockedBackoff && lockedNow < lockedBackoff.nextRetryAt.getTime()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const provider = this.providerFactory.get(
|
||||
account.provider as CalendarProviderName
|
||||
);
|
||||
@@ -415,29 +409,28 @@ export class CalendarService {
|
||||
}
|
||||
|
||||
if (synced) {
|
||||
await this.clearSyncFailureBackoff(subscription.id);
|
||||
await this.ensureWebhookChannel(subscription, provider, accessToken);
|
||||
const syncedAt = this.now();
|
||||
let nextSyncAt = this.calculateNextSyncAt(
|
||||
syncedAt,
|
||||
account.refreshIntervalMinutes
|
||||
);
|
||||
|
||||
try {
|
||||
await this.ensureWebhookChannel(subscription, provider, accessToken);
|
||||
} catch (error) {
|
||||
nextSyncAt = this.calculateChannelRetryAt(nextSyncAt);
|
||||
this.logger.warn(
|
||||
`Failed to ensure webhook channel for subscription ${subscription.id}`,
|
||||
this.toError(error)
|
||||
);
|
||||
}
|
||||
|
||||
await this.models.calendarSubscription.updateSync(subscription.id, {
|
||||
lastSyncAt: syncedAt,
|
||||
nextSyncAt,
|
||||
syncRetryCount: 0,
|
||||
});
|
||||
}
|
||||
|
||||
await this.models.calendarSubscription.updateLastSyncAt(
|
||||
subscription.id,
|
||||
new Date()
|
||||
);
|
||||
}
|
||||
|
||||
async syncAccount(accountId: string) {
|
||||
const account = await this.models.calendarAccount.get(accountId);
|
||||
if (!account || account.status !== 'active') {
|
||||
return;
|
||||
}
|
||||
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listByAccountForSync(accountId);
|
||||
await Promise.allSettled(
|
||||
subscriptions.map(subscription =>
|
||||
this.syncSubscription(subscription.id, { reason: 'polling' })
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
async listWorkspaceEvents(params: {
|
||||
@@ -455,9 +448,18 @@ export class CalendarService {
|
||||
params.to
|
||||
);
|
||||
|
||||
const subscriptions =
|
||||
await this.models.calendarSubscription.listWithAccounts(subscriptionIds);
|
||||
const staleSubscriptions = subscriptions.filter(
|
||||
subscription =>
|
||||
subscription.enabled &&
|
||||
subscription.account.status === 'active' &&
|
||||
subscription.nextSyncAt.getTime() <= this.nowMs()
|
||||
);
|
||||
|
||||
Promise.allSettled(
|
||||
subscriptionIds.map(subscriptionId =>
|
||||
this.syncSubscription(subscriptionId, { reason: 'on-demand' })
|
||||
staleSubscriptions.map(subscription =>
|
||||
this.enqueueSyncSubscription(subscription.id, 'on-demand')
|
||||
)
|
||||
).catch(error => {
|
||||
this.logger.warn('Calendar on-demand sync failed', error as Error);
|
||||
@@ -513,7 +515,7 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.syncSubscription(subscription.id, { reason: 'webhook' });
|
||||
await this.enqueueSyncSubscription(subscription.id, 'webhook');
|
||||
}
|
||||
|
||||
getWebhookToken() {
|
||||
@@ -747,7 +749,7 @@ export class CalendarService {
|
||||
}
|
||||
|
||||
private getSyncWindow() {
|
||||
const now = new Date();
|
||||
const now = this.now();
|
||||
return {
|
||||
timeMin: subDays(now, DEFAULT_PAST_DAYS).toISOString(),
|
||||
timeMax: addDays(now, DEFAULT_FUTURE_DAYS).toISOString(),
|
||||
@@ -767,7 +769,7 @@ export class CalendarService {
|
||||
if (
|
||||
accessToken &&
|
||||
account.expiresAt &&
|
||||
account.expiresAt.getTime() > Date.now() + TOKEN_REFRESH_SKEW_MS
|
||||
account.expiresAt.getTime() > this.nowMs() + TOKEN_REFRESH_SKEW_MS
|
||||
) {
|
||||
return { accessToken };
|
||||
}
|
||||
@@ -831,7 +833,7 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const renewThreshold = Date.now() + 24 * 60 * 60 * 1000;
|
||||
const renewThreshold = this.nowMs() + 24 * 60 * 60 * 1000;
|
||||
if (
|
||||
subscription.channelExpiration &&
|
||||
subscription.channelExpiration.getTime() > renewThreshold
|
||||
@@ -873,6 +875,7 @@ export class CalendarService {
|
||||
subscription: {
|
||||
id: string;
|
||||
externalCalendarId: string;
|
||||
syncRetryCount: number;
|
||||
customChannelId: string | null;
|
||||
customResourceId: string | null;
|
||||
};
|
||||
@@ -895,7 +898,6 @@ export class CalendarService {
|
||||
}
|
||||
|
||||
if (this.isTokenInvalidError(params.error)) {
|
||||
await this.clearSyncFailureBackoff(params.subscription.id);
|
||||
await this.models.calendarAccount.invalidateAndPurge(
|
||||
params.account.id,
|
||||
this.formatSyncError(params.error)
|
||||
@@ -903,18 +905,14 @@ export class CalendarService {
|
||||
return;
|
||||
}
|
||||
|
||||
const backoff = await this.bumpSyncFailureBackoff(params.subscription.id);
|
||||
const interval = params.account.refreshIntervalMinutes ?? 60;
|
||||
const lastSyncAt = this.calculateLastSyncAtForRetry(
|
||||
backoff.nextRetryAt,
|
||||
interval
|
||||
);
|
||||
await this.models.calendarSubscription.updateLastSyncAt(
|
||||
params.subscription.id,
|
||||
lastSyncAt
|
||||
);
|
||||
const attempt = params.subscription.syncRetryCount + 1;
|
||||
const nextRetryAt = this.calculateFailureRetryAt(attempt);
|
||||
await this.models.calendarSubscription.updateSync(params.subscription.id, {
|
||||
nextSyncAt: nextRetryAt,
|
||||
syncRetryCount: attempt,
|
||||
});
|
||||
this.logger.warn(
|
||||
`Calendar sync failed for subscription ${params.subscription.id}, attempt ${backoff.attempt}, next retry at ${backoff.nextRetryAt.toISOString()}`,
|
||||
`Calendar sync failed for subscription ${params.subscription.id}, attempt ${attempt}, next retry at ${nextRetryAt.toISOString()}`,
|
||||
this.toError(params.error)
|
||||
);
|
||||
}
|
||||
@@ -927,15 +925,6 @@ export class CalendarService {
|
||||
return status === 404;
|
||||
}
|
||||
|
||||
private calculateLastSyncAtForRetry(
|
||||
nextRetryAt: Date,
|
||||
refreshIntervalMinutes: number
|
||||
) {
|
||||
// Cron schedules by `now - lastSyncAt >= refreshInterval`, so back-calculate
|
||||
// a synthetic lastSyncAt to defer the next attempt to `nextRetryAt`.
|
||||
return new Date(nextRetryAt.getTime() - refreshIntervalMinutes * 60 * 1000);
|
||||
}
|
||||
|
||||
private async disableSubscription(params: {
|
||||
subscriptionId: string;
|
||||
provider: CalendarProvider;
|
||||
@@ -966,68 +955,52 @@ export class CalendarService {
|
||||
await this.models.calendarSubscription.disableAndPurge(
|
||||
params.subscriptionId
|
||||
);
|
||||
await this.clearSyncFailureBackoff(params.subscriptionId);
|
||||
}
|
||||
|
||||
private getSyncFailureBackoffKey(subscriptionId: string) {
|
||||
return `${SYNC_FAILURE_BACKOFF_KEY_PREFIX}${subscriptionId}`;
|
||||
}
|
||||
|
||||
private async getSyncFailureBackoff(subscriptionId: string) {
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
const value = await this.redis.get(key);
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(value) as {
|
||||
attempt?: number;
|
||||
nextRetryAt?: string;
|
||||
};
|
||||
if (!parsed.attempt || !parsed.nextRetryAt) {
|
||||
return null;
|
||||
async enqueueSyncSubscription(
|
||||
subscriptionId: string,
|
||||
reason: 'polling' | 'webhook' | 'on-demand'
|
||||
) {
|
||||
await this.queue.add(
|
||||
'calendar.syncSubscription',
|
||||
{
|
||||
subscriptionId,
|
||||
reason,
|
||||
},
|
||||
{
|
||||
jobId: subscriptionId,
|
||||
}
|
||||
const nextRetryAt = new Date(parsed.nextRetryAt);
|
||||
if (Number.isNaN(nextRetryAt.getTime())) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
attempt: parsed.attempt,
|
||||
nextRetryAt,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private async bumpSyncFailureBackoff(subscriptionId: string) {
|
||||
const state = await this.getSyncFailureBackoff(subscriptionId);
|
||||
const attempt = (state?.attempt ?? 0) + 1;
|
||||
const delay = Math.min(
|
||||
SYNC_FAILURE_BACKOFF_BASE_MS * 2 ** (attempt - 1),
|
||||
SYNC_FAILURE_BACKOFF_MAX_MS
|
||||
);
|
||||
const nextRetryAt = new Date(Date.now() + delay);
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
await this.redis.set(
|
||||
key,
|
||||
JSON.stringify({
|
||||
attempt,
|
||||
nextRetryAt: nextRetryAt.toISOString(),
|
||||
}),
|
||||
'EX',
|
||||
SYNC_FAILURE_BACKOFF_TTL_SECONDS
|
||||
);
|
||||
return {
|
||||
attempt,
|
||||
nextRetryAt,
|
||||
};
|
||||
private calculateNextSyncAt(base: Date, refreshIntervalMinutes?: number) {
|
||||
const intervalMinutes =
|
||||
refreshIntervalMinutes ?? DEFAULT_REFRESH_INTERVAL_MINUTES;
|
||||
return new Date(base.getTime() + intervalMinutes * 60 * 1000);
|
||||
}
|
||||
|
||||
private async clearSyncFailureBackoff(subscriptionId: string) {
|
||||
const key = this.getSyncFailureBackoffKey(subscriptionId);
|
||||
await this.redis.del(key);
|
||||
private calculateChannelRetryAt(nextSyncAt: Date) {
|
||||
return new Date(
|
||||
Math.min(nextSyncAt.getTime(), this.nowMs() + CHANNEL_RENEW_RETRY_MS)
|
||||
);
|
||||
}
|
||||
|
||||
private calculateFailureRetryAt(attempt: number) {
|
||||
return new Date(
|
||||
this.nowMs() +
|
||||
exponentialBackoffDelay(attempt - 1, {
|
||||
baseDelayMs: SYNC_FAILURE_BACKOFF_BASE_MS,
|
||||
maxDelayMs: SYNC_FAILURE_BACKOFF_MAX_MS,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private now() {
|
||||
return new Date(this.nowMs());
|
||||
}
|
||||
|
||||
private nowMs() {
|
||||
return Date.now();
|
||||
}
|
||||
|
||||
private formatSyncError(error: unknown) {
|
||||
|
||||
@@ -33,19 +33,37 @@ test('should not index workspace if indexer is disabled', async t => {
|
||||
const count = module.queue.count('indexer.indexWorkspace');
|
||||
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'test-workspace',
|
||||
});
|
||||
|
||||
t.is(module.queue.count('indexer.indexWorkspace'), count);
|
||||
});
|
||||
|
||||
test('should index workspace if indexer is enabled', async t => {
|
||||
test('should index workspace when root snapshot is updated', async t => {
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({ id: 'test-workspace' });
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'test-workspace',
|
||||
});
|
||||
|
||||
const { payload } = await module.queue.waitFor('indexer.indexWorkspace');
|
||||
t.is(payload.workspaceId, 'test-workspace');
|
||||
});
|
||||
|
||||
test('should not index workspace when non-root snapshot is updated', async t => {
|
||||
const count = module.queue.count('indexer.indexWorkspace');
|
||||
|
||||
// @ts-expect-error ignore missing fields
|
||||
await indexerEvent.indexWorkspace({
|
||||
workspaceId: 'test-workspace',
|
||||
docId: 'child-doc',
|
||||
});
|
||||
|
||||
t.is(module.queue.count('indexer.indexWorkspace'), count);
|
||||
});
|
||||
|
||||
test('should not delete workspace if indexer is disabled', async t => {
|
||||
Sinon.stub(config.indexer, 'enabled').value(false);
|
||||
const count = module.queue.count('indexer.deleteWorkspace');
|
||||
|
||||
@@ -29,21 +29,20 @@ export class IndexerEvent {
|
||||
);
|
||||
}
|
||||
|
||||
@OnEvent('workspace.updated')
|
||||
async indexWorkspace({ id }: Events['workspace.updated']) {
|
||||
@OnEvent('doc.snapshot.updated')
|
||||
async indexWorkspace({ workspaceId, docId }: Events['doc.snapshot.updated']) {
|
||||
if (!this.config.indexer.enabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (workspaceId !== docId) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.queue.add(
|
||||
'indexer.indexWorkspace',
|
||||
{
|
||||
workspaceId: id,
|
||||
},
|
||||
{
|
||||
jobId: `indexWorkspace/${id}`,
|
||||
priority: 100,
|
||||
}
|
||||
{ workspaceId },
|
||||
{ jobId: `indexWorkspace/${workspaceId}`, priority: 100 }
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -114,22 +114,17 @@ export class UserSubscriptionManager extends SubscriptionManager {
|
||||
throw new ManagedByAppStoreOrPlay();
|
||||
}
|
||||
|
||||
const subscription = await this.getSubscription({
|
||||
plan: lookupKey.plan,
|
||||
userId: user.id,
|
||||
});
|
||||
|
||||
if (
|
||||
subscription &&
|
||||
active &&
|
||||
// do not allow to re-subscribe unless
|
||||
!(
|
||||
/* current subscription is a onetime subscription and so as the one that's checking out */
|
||||
(
|
||||
(subscription.variant === SubscriptionVariant.Onetime &&
|
||||
(active.variant === SubscriptionVariant.Onetime &&
|
||||
lookupKey.variant === SubscriptionVariant.Onetime) ||
|
||||
/* current subscription is normal subscription and is checking-out a lifetime subscription */
|
||||
(subscription.recurring !== SubscriptionRecurring.Lifetime &&
|
||||
subscription.variant !== SubscriptionVariant.Onetime &&
|
||||
(active.recurring !== SubscriptionRecurring.Lifetime &&
|
||||
active.variant !== SubscriptionVariant.Onetime &&
|
||||
lookupKey.recurring === SubscriptionRecurring.Lifetime)
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1,10 +1,21 @@
|
||||
mutation sendTestEmail($host: String!, $port: Int!, $sender: String!, $username: String!, $password: String!, $ignoreTLS: Boolean!) {
|
||||
sendTestEmail(config: {
|
||||
host: $host,
|
||||
port: $port,
|
||||
sender: $sender,
|
||||
username: $username,
|
||||
password: $password,
|
||||
ignoreTLS: $ignoreTLS,
|
||||
})
|
||||
}
|
||||
mutation sendTestEmail(
|
||||
$name: String!
|
||||
$host: String!
|
||||
$port: Int!
|
||||
$sender: String!
|
||||
$username: String!
|
||||
$password: String!
|
||||
$ignoreTLS: Boolean!
|
||||
) {
|
||||
sendTestEmail(
|
||||
config: {
|
||||
name: $name
|
||||
host: $host
|
||||
port: $port
|
||||
sender: $sender
|
||||
username: $username
|
||||
password: $password
|
||||
ignoreTLS: $ignoreTLS
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -549,9 +549,9 @@ export const listUsersQuery = {
|
||||
export const sendTestEmailMutation = {
|
||||
id: 'sendTestEmailMutation' as const,
|
||||
op: 'sendTestEmail',
|
||||
query: `mutation sendTestEmail($host: String!, $port: Int!, $sender: String!, $username: String!, $password: String!, $ignoreTLS: Boolean!) {
|
||||
query: `mutation sendTestEmail($name: String!, $host: String!, $port: Int!, $sender: String!, $username: String!, $password: String!, $ignoreTLS: Boolean!) {
|
||||
sendTestEmail(
|
||||
config: {host: $host, port: $port, sender: $sender, username: $username, password: $password, ignoreTLS: $ignoreTLS}
|
||||
config: {name: $name, host: $host, port: $port, sender: $sender, username: $username, password: $password, ignoreTLS: $ignoreTLS}
|
||||
)
|
||||
}`,
|
||||
};
|
||||
|
||||
@@ -4027,6 +4027,7 @@ export type ListUsersQuery = {
|
||||
};
|
||||
|
||||
export type SendTestEmailMutationVariables = Exact<{
|
||||
name: Scalars['String']['input'];
|
||||
host: Scalars['String']['input'];
|
||||
port: Scalars['Int']['input'];
|
||||
sender: Scalars['String']['input'];
|
||||
|
||||
@@ -27,6 +27,10 @@
|
||||
"type": "Object",
|
||||
"desc": "The config for copilot job queue"
|
||||
},
|
||||
"queues.calendar": {
|
||||
"type": "Object",
|
||||
"desc": "The config for calendar job queue"
|
||||
},
|
||||
"queues.doc": {
|
||||
"type": "Object",
|
||||
"desc": "The config for doc job queue"
|
||||
|
||||
@@ -31,7 +31,7 @@ extension AFFiNEViewController: IntelligentsButtonDelegate {
|
||||
private func showAIConsentAlert() {
|
||||
let alert = UIAlertController(
|
||||
title: "AI Feature Data Usage",
|
||||
message: "To provide AI-powered features, your input (such as document content and conversation messages) will be sent to a third-party AI service for processing. This data is used solely to generate responses and is not used for any other purpose.\n\nBy continuing, you agree to share this data with the AI service.",
|
||||
message: "To provide AI-powered features, your input (such as document content and conversation messages) will be sent to our third-party AI service providers (Google, Anthropic, or OpenAI, based on your choice) for processing. This data is used solely to generate responses and is not used for any other purpose.\n\nBy continuing, you agree to share this data with these AI services.",
|
||||
preferredStyle: .alert
|
||||
)
|
||||
alert.addAction(UIAlertAction(title: "Cancel", style: .cancel))
|
||||
|
||||
@@ -26,13 +26,13 @@
|
||||
"@blocksuite/global": "workspace:*",
|
||||
"@blocksuite/icons": "^2.2.17",
|
||||
"@blocksuite/std": "workspace:*",
|
||||
"@dotlottie/player-component": "^2.7.12",
|
||||
"@emotion/cache": "^11.14.0",
|
||||
"@emotion/css": "^11.13.5",
|
||||
"@emotion/react": "^11.14.0",
|
||||
"@floating-ui/dom": "^1.6.13",
|
||||
"@juggle/resize-observer": "^3.4.0",
|
||||
"@lit/context": "^1.1.4",
|
||||
"@lottiefiles/dotlottie-wc": "^0.9.4",
|
||||
"@marsidev/react-turnstile": "^1.1.0",
|
||||
"@myriaddreamin/typst-ts-renderer": "^0.7.0-rc2",
|
||||
"@myriaddreamin/typst-ts-web-compiler": "^0.7.0-rc2",
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -18,7 +18,7 @@
|
||||
"pl": 98,
|
||||
"pt-BR": 96,
|
||||
"ru": 98,
|
||||
"sv-SE": 97,
|
||||
"sv-SE": 96,
|
||||
"uk": 96,
|
||||
"ur": 2,
|
||||
"zh-Hans": 98,
|
||||
|
||||
+52
-4
@@ -73,8 +73,8 @@ update_app_stream_version() {
|
||||
|
||||
update_ios_marketing_version() {
|
||||
local file_path=$1
|
||||
# Remove everything after the "-"
|
||||
local new_version=$(echo "$2" | sed -E 's/-.*$//')
|
||||
# Normalize inputs like "v0.26.4-beta.1" to "0.26.4"
|
||||
local new_version=$(echo "$2" | sed -E 's/^v//; s/-.*$//')
|
||||
|
||||
# Check if file exists
|
||||
if [ ! -f "$file_path" ]; then
|
||||
@@ -98,8 +98,56 @@ update_ios_marketing_version() {
|
||||
rm "$file_path".bak
|
||||
}
|
||||
|
||||
# Derive a date-based iOS MARKETING_VERSION from the latest stable/beta tag.
|
||||
# Apple requires CFBundleShortVersionString to increase monotonically. Using
|
||||
# date-based versions (YYYY.M.D) derived from the last stable/beta release tag
|
||||
# ensures this. The user-facing App Store version is set separately in
|
||||
# App Store Connect.
|
||||
get_ios_version_from_git() {
|
||||
# Find the most recent stable/beta tag reachable from HEAD (exclude canary/nightly)
|
||||
local latest_tag
|
||||
latest_tag=$(git describe --tags --match 'v[0-9]*' \
|
||||
--exclude '*canary*' --exclude '*nightly*' \
|
||||
--abbrev=0 HEAD 2>/dev/null)
|
||||
|
||||
if [ -z "$latest_tag" ]; then
|
||||
# No stable/beta tag found, fall back to today's date
|
||||
date +"%Y.%-m.%-d"
|
||||
return
|
||||
fi
|
||||
|
||||
# Get the tag creation date (tagger date for annotated tags, commit date for lightweight)
|
||||
local tag_date
|
||||
tag_date=$(git for-each-ref --format='%(creatordate:short)' "refs/tags/$latest_tag")
|
||||
|
||||
if [ -z "$tag_date" ]; then
|
||||
date +"%Y.%-m.%-d"
|
||||
return
|
||||
fi
|
||||
|
||||
# Format as YYYY.M.D (no leading zeros for month/day)
|
||||
local year month day
|
||||
year=$(echo "$tag_date" | cut -d'-' -f1)
|
||||
month=$((10#$(echo "$tag_date" | cut -d'-' -f2)))
|
||||
day=$((10#$(echo "$tag_date" | cut -d'-' -f3)))
|
||||
|
||||
echo "${year}.${month}.${day}"
|
||||
}
|
||||
|
||||
new_version=$1
|
||||
ios_new_version=${IOS_APP_VERSION:-$new_version}
|
||||
|
||||
if [ -n "$IOS_APP_VERSION" ]; then
|
||||
# Manual override via environment variable
|
||||
ios_new_version=$IOS_APP_VERSION
|
||||
elif echo "$new_version" | grep -qE '(canary|nightly)'; then
|
||||
# Canary/nightly: use the date of the last stable/beta tag
|
||||
ios_new_version=$(get_ios_version_from_git)
|
||||
else
|
||||
# Stable/beta release: use today's date
|
||||
ios_new_version=$(date +"%Y.%-m.%-d")
|
||||
fi
|
||||
|
||||
echo "iOS MARKETING_VERSION: $ios_new_version (app version: $new_version)"
|
||||
|
||||
update_app_version_in_helm_charts ".github/helm/affine/Chart.yaml" "$new_version"
|
||||
update_app_version_in_helm_charts ".github/helm/affine/charts/graphql/Chart.yaml" "$new_version"
|
||||
@@ -108,4 +156,4 @@ update_app_version_in_helm_charts ".github/helm/affine/charts/doc/Chart.yaml" "$
|
||||
|
||||
update_app_stream_version "packages/frontend/apps/electron/resources/affine.metainfo.xml" "$new_version"
|
||||
|
||||
update_ios_marketing_version "packages/frontend/apps/ios/App/App.xcodeproj/project.pbxproj" "$new_version"
|
||||
update_ios_marketing_version "packages/frontend/apps/ios/App/App.xcodeproj/project.pbxproj" "$ios_new_version"
|
||||
|
||||
Reference in New Issue
Block a user