From 91acb88a2dee2c7dc72f4c5bcb05c767ff006221 Mon Sep 17 00:00:00 2001 From: DarkSky Date: Sat, 30 May 2026 16:24:46 +0800 Subject: [PATCH] fix(server): mail test & retry --- .gitignore | 1 + .../server/src/__tests__/mails.spec.ts | 21 +++ .../server/src/__tests__/mocks/queue.mock.ts | 1 + .../src/__tests__/payment/service.spec.ts | 31 ++++ .../base/job/queue/__tests__/queue.spec.ts | 16 ++ .../server/src/base/job/queue/queue.ts | 33 ++++ .../src/core/mail/__tests__/job.spec.ts | 149 ++++++++++++++++++ packages/backend/server/src/core/mail/job.ts | 84 +++++++++- .../backend/server/src/core/mail/utils.ts | 55 +++++++ .../src/plugins/payment/manager/user.ts | 13 +- .../src/graphql/admin/send-test-email.gql | 31 ++-- packages/common/graphql/src/graphql/index.ts | 4 +- packages/common/graphql/src/schema.ts | 1 + 13 files changed, 414 insertions(+), 26 deletions(-) create mode 100644 packages/backend/server/src/core/mail/__tests__/job.spec.ts create mode 100644 packages/backend/server/src/core/mail/utils.ts diff --git a/.gitignore b/.gitignore index 15e44010e7..d3f45b1acf 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,7 @@ testem.log /typings tsconfig.tsbuildinfo .context +*.md # System Files .DS_Store diff --git a/packages/backend/server/src/__tests__/mails.spec.ts b/packages/backend/server/src/__tests__/mails.spec.ts index 7ed3bb8893..904c9e77af 100644 --- a/packages/backend/server/src/__tests__/mails.spec.ts +++ b/packages/backend/server/src/__tests__/mails.spec.ts @@ -1,5 +1,6 @@ import test from 'ava'; +import { normalizeSMTPHeloHostname } from '../core/mail/utils'; import { Renderers } from '../mails'; import { TEST_DOC, TEST_USER } from '../mails/common'; @@ -21,3 +22,23 @@ test('should render mention email with empty doc title', async t => { }); t.snapshot(content.html, content.subject); }); + +test('should normalize valid SMTP HELO hostnames', t => { + t.is(normalizeSMTPHeloHostname('mail.example.com'), 'mail.example.com'); + t.is(normalizeSMTPHeloHostname(' localhost '), 'localhost'); + t.is(normalizeSMTPHeloHostname('[127.0.0.1]'), '[127.0.0.1]'); + t.is(normalizeSMTPHeloHostname('[IPv6:2001:db8::1]'), '[IPv6:2001:db8::1]'); +}); + +test('should reject invalid SMTP HELO hostnames', t => { + t.is(normalizeSMTPHeloHostname(), undefined); + t.is(normalizeSMTPHeloHostname(''), undefined); + t.is(normalizeSMTPHeloHostname(' '), undefined); + t.is(normalizeSMTPHeloHostname('AFFiNE Server'), undefined); + t.is(normalizeSMTPHeloHostname('-example.com'), undefined); + t.is(normalizeSMTPHeloHostname('example-.com'), undefined); + t.is(normalizeSMTPHeloHostname('example..com'), undefined); + t.is(normalizeSMTPHeloHostname('[bad host]'), undefined); + t.is(normalizeSMTPHeloHostname('[foo]'), undefined); + t.is(normalizeSMTPHeloHostname('[IPv6:foo]'), undefined); +}); diff --git a/packages/backend/server/src/__tests__/mocks/queue.mock.ts b/packages/backend/server/src/__tests__/mocks/queue.mock.ts index d44a9b6321..a2ec86b33b 100644 --- a/packages/backend/server/src/__tests__/mocks/queue.mock.ts +++ b/packages/backend/server/src/__tests__/mocks/queue.mock.ts @@ -6,6 +6,7 @@ import { JobQueue } from '../../base'; export class MockJobQueue { add = Sinon.createStubInstance(JobQueue).add.resolves(); remove = Sinon.createStubInstance(JobQueue).remove.resolves(); + removeWhere = Sinon.createStubInstance(JobQueue).removeWhere.resolves([]); last(name: Job): { name: Job; payload: Jobs[Job] } { const addJobName = this.add.lastCall?.args[0]; diff --git a/packages/backend/server/src/__tests__/payment/service.spec.ts b/packages/backend/server/src/__tests__/payment/service.spec.ts index b96e1ed353..ce0c65640a 100644 --- a/packages/backend/server/src/__tests__/payment/service.spec.ts +++ b/packages/backend/server/src/__tests__/payment/service.spec.ts @@ -437,6 +437,37 @@ test('should throw if user has subscription already', async t => { ); }); +test('should allow checkout after local subscription period ended', async t => { + const { service, u1, db, stripe } = t.context; + + await db.subscription.create({ + data: { + targetId: u1.id, + stripeSubscriptionId: 'sub_expired_ai', + plan: SubscriptionPlan.AI, + recurring: SubscriptionRecurring.Yearly, + status: SubscriptionStatus.Active, + start: new Date('2026-05-04T13:11:45.000Z'), + end: new Date('2026-05-11T13:11:45.000Z'), + }, + }); + + await service.checkout( + { + plan: SubscriptionPlan.AI, + recurring: SubscriptionRecurring.Yearly, + successCallbackLink: '', + }, + { user: u1 } + ); + + t.true(stripe.checkout.sessions.create.calledOnce); + t.deepEqual(getLastCheckoutPrice(stripe.checkout.sessions.create), { + price: AI_YEARLY, + coupon: undefined, + }); +}); + test('should get correct pro plan price for checking out', async t => { const { app, service, u1, stripe, feature } = t.context; // non-ea user diff --git a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts index 1d770fac14..65f2a2b13b 100644 --- a/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts +++ b/packages/backend/server/src/base/job/queue/__tests__/queue.spec.ts @@ -117,6 +117,22 @@ test('should remove job from queue', async t => { t.is(nullData, undefined); t.is(nullJob, undefined); }); + +test('should remove jobs by payload predicate', async t => { + const keep = await queue.add('nightly.__test__job', { name: 'keep' }); + const remove = await queue.add('nightly.__test__job', { name: 'remove' }); + const other = await queue.add('nightly.__test__job2', { name: 'remove' }); + + const removed = await queue.removeWhere( + 'nightly.__test__job', + job => job.name === 'remove' + ); + + t.deepEqual(removed, [{ name: 'remove' }]); + t.truthy(await queue.get(keep.id!, 'nightly.__test__job')); + t.is(await queue.get(remove.id!, 'nightly.__test__job'), undefined); + t.truthy(await queue.get(other.id!, 'nightly.__test__job2')); +}); // #endregion // #region executor diff --git a/packages/backend/server/src/base/job/queue/queue.ts b/packages/backend/server/src/base/job/queue/queue.ts index 73c6e64725..30a714fa05 100644 --- a/packages/backend/server/src/base/job/queue/queue.ts +++ b/packages/backend/server/src/base/job/queue/queue.ts @@ -55,6 +55,39 @@ export class JobQueue { return undefined; } + async removeWhere( + jobName: T, + predicate: (payload: Jobs[T]) => boolean | Promise + ): Promise { + const ns = namespace(jobName); + const queue = this.getQueue(ns); + const jobs = (await queue.getJobs([ + 'waiting', + 'delayed', + 'prioritized', + 'paused', + 'waiting-children', + ])) as Job>[]; + const removed: Jobs[T][] = []; + + for (const job of jobs) { + if (job.name !== jobName) { + continue; + } + + const payload = job.data.payload; + if (!(await predicate(payload))) { + continue; + } + + await job.remove(); + this.logger.log(`Job ${jobName}(id=${job.id}) removed from queue ${ns}`); + removed.push(payload); + } + + return removed; + } + async get(jobId: string, jobName: T) { const ns = namespace(jobName); const queue = this.getQueue(ns); diff --git a/packages/backend/server/src/core/mail/__tests__/job.spec.ts b/packages/backend/server/src/core/mail/__tests__/job.spec.ts new file mode 100644 index 0000000000..2bdbfc0573 --- /dev/null +++ b/packages/backend/server/src/core/mail/__tests__/job.spec.ts @@ -0,0 +1,149 @@ +import test from 'ava'; +import Sinon from 'sinon'; + +import { Mockers } from '../../../__tests__/mocks'; +import { createTestingModule } from '../../../__tests__/utils'; +import { Cache } from '../../../base'; +import { Models } from '../../../models'; +import { MailJob } from '../job'; +import { MailSender } from '../sender'; + +let module: Awaited>; +let cache: Cache; +let mailJob: MailJob; +let sender: MailSender; +let models: Models; + +test.before(async () => { + module = await createTestingModule(); + cache = module.get(Cache); + mailJob = module.get(MailJob); + sender = module.get(MailSender); + models = module.get(Models); +}); + +test.after.always(async () => { + await module.close(); +}); + +test.afterEach(() => { + Sinon.restore(); +}); + +test('should clear pending mail records when user is deleted', async t => { + const user = await module.create(Mockers.User); + const another = await module.create(Mockers.User); + const sendMailKey = 'mailjob:sendMail'; + const retryMailKey = 'mailjob:sendMail:retry'; + const userKey = `${sendMailKey}:SignIn:${user.email}`; + const userRetryKey = `${sendMailKey}:VerifyEmail:${user.email}`; + const anotherKey = `${sendMailKey}:SignIn:${another.email}`; + + await cache.mapSet(sendMailKey, userKey, 1); + await cache.mapSet(sendMailKey, anotherKey, 1); + await cache.mapSet( + retryMailKey, + userRetryKey, + JSON.stringify({ + startTime: Date.now(), + name: 'VerifyEmail', + to: user.email, + props: { url: 'https://affine.pro/verify' }, + }) + ); + + await mailJob.onUserDeleted({ ...user, ownedWorkspaces: [] }); + + t.true(module.queue.removeWhere.calledOnce); + t.is(module.queue.removeWhere.firstCall.args[0], 'notification.sendMail'); + const shouldRemove = module.queue.removeWhere.firstCall.args[1]; + t.true( + await shouldRemove({ + to: user.email, + } as Jobs['notification.sendMail']) + ); + t.false( + await shouldRemove({ + to: another.email, + } as Jobs['notification.sendMail']) + ); + t.is(await cache.mapGet(sendMailKey, userKey), undefined); + t.is(await cache.mapGet(retryMailKey, userRetryKey), undefined); + t.is(await cache.mapGet(sendMailKey, anotherKey), 1); +}); + +test('should skip queued mail for disabled recipient', async t => { + const user = await module.create(Mockers.User, { disabled: true }); + const send = Sinon.stub(sender, 'send').resolves(true); + + await mailJob.sendMail({ + startTime: Date.now(), + name: 'SignIn', + to: user.email, + props: { + url: 'https://affine.pro/sign-in', + otp: '123456', + }, + }); + + t.false(send.called); + t.truthy(await models.user.get(user.id, { withDisabled: true })); +}); + +test('should drop expired mail retry', async t => { + const send = Sinon.stub(sender, 'send').resolves(true); + + await mailJob.sendMail({ + startTime: Date.now() - 25 * 60 * 60 * 1000, + name: 'SignIn', + to: 'expired-retry@example.com', + props: { + url: 'https://affine.pro/sign-in', + otp: '123456', + }, + }); + + t.false(send.called); +}); + +test('should drop mail retry after max attempts', async t => { + const send = Sinon.stub(sender, 'send').resolves(true); + + await mailJob.sendMail({ + startTime: Date.now(), + retryCount: 12, + name: 'SignIn', + to: 'max-retry@example.com', + props: { + url: 'https://affine.pro/sign-in', + otp: '123456', + }, + }); + + t.false(send.called); +}); + +test('should requeue legacy stringified retry mail', async t => { + const retryMailKey = 'mailjob:sendMail:retry'; + const job: Jobs['notification.sendMail'] = { + startTime: Date.now(), + name: 'SignIn', + to: 'legacy-retry@example.com', + props: { + url: 'https://affine.pro/sign-in', + otp: '123456', + }, + }; + const cacheKey = `${retryMailKey}:SignIn:${job.to}`; + + Sinon.stub(cache, 'mapRandomKey') + .onFirstCall() + .resolves(cacheKey) + .onSecondCall() + .resolves(undefined); + await cache.mapSet(retryMailKey, cacheKey, JSON.stringify(job)); + await mailJob.sendRetryMails(); + + t.true(module.queue.add.calledWith('notification.sendMail', job)); + t.is(await cache.mapGet(retryMailKey, cacheKey), undefined); +}); diff --git a/packages/backend/server/src/core/mail/job.ts b/packages/backend/server/src/core/mail/job.ts index fe427a219b..fe6a412de1 100644 --- a/packages/backend/server/src/core/mail/job.ts +++ b/packages/backend/server/src/core/mail/job.ts @@ -2,7 +2,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { getStreamAsBuffer } from 'get-stream'; -import { Cache, JOB_SIGNAL, JobQueue, OnJob, sleep } from '../../base'; +import { Cache, JOB_SIGNAL, JobQueue, OnEvent, OnJob, sleep } from '../../base'; import { type MailName, MailProps, Renderers } from '../../mails'; import { UserProps, WorkspaceProps } from '../../mails/components'; import { Models } from '../../models'; @@ -35,7 +35,7 @@ type SendMailJob> = { declare global { interface Jobs { - 'notification.sendMail': { startTime: number } & { + 'notification.sendMail': { startTime: number; retryCount?: number } & { [K in MailName]: SendMailJob; }[MailName]; } @@ -47,6 +47,8 @@ const sendMailCacheKey = (name: string, to: string) => `${sendMailKey}:${name}:${to}`; const retryMaxPerTick = 20; const retryFirstTime = 3; +const retryMaxAttempts = 12; +const retryMaxAge = 24 * 60 * 60 * 1000; @Injectable() export class MailJob { @@ -66,12 +68,55 @@ export class MailJob { return Math.min(30 * 1000, Math.round(elapsed / 2000) * 1000); } + private getRetryExhaustedReason({ + startTime, + retryCount, + }: Jobs['notification.sendMail']) { + if (Date.now() - startTime > retryMaxAge) { + return 'expired'; + } + + if ((retryCount ?? 0) > retryMaxAttempts) { + return 'max attempts reached'; + } + + return; + } + + private async shouldSkipRecipient(to: string) { + const user = await this.models.user.getUserByEmail(to, { + withDisabled: true, + }); + + return user?.disabled === true; + } + + private async deleteRecipientMailCache(to: string) { + const suffix = `:${to}`; + + await Promise.all( + [sendMailKey, retryMailKey].map(async map => { + const keys = await this.cache.mapKeys(map); + await Promise.all( + keys + .filter(key => key.endsWith(suffix)) + .map(key => this.cache.mapDelete(map, key)) + ); + }) + ); + } + private async sendMailInternal({ startTime, name, to, props, }: Jobs['notification.sendMail']) { + if (await this.shouldSkipRecipient(to)) { + this.logger.debug(`Skip mail [${name}] to disabled user [${to}]`); + return; + } + let options: Partial = {}; for (const key in props) { @@ -177,17 +222,41 @@ export class MailJob { @OnJob('notification.sendMail') async sendMail(job: Jobs['notification.sendMail']) { const cacheKey = sendMailCacheKey(job.name, job.to); + job.retryCount = (job.retryCount ?? 0) + 1; + const exhaustedReason = this.getRetryExhaustedReason(job); + if (exhaustedReason) { + this.logger.warn( + `Drop mail [${job.name}] to [${job.to}], reason=${exhaustedReason}` + ); + await Promise.all([ + this.cache.mapDelete(sendMailKey, cacheKey), + this.cache.mapDelete(retryMailKey, cacheKey), + ]); + return; + } + const retried = await this.cache.mapIncrease(sendMailKey, cacheKey, 1); if (retried <= retryFirstTime) { const ret = await this.sendMailInternal(job); if (!ret) await this.cache.mapDelete(sendMailKey, cacheKey); return ret; } - await this.cache.mapSet(retryMailKey, cacheKey, JSON.stringify(job)); + await this.cache.mapSet(retryMailKey, cacheKey, job); await this.cache.mapDelete(sendMailKey, cacheKey); return undefined; } + @OnEvent('user.deleted') + async onUserDeleted(user: Events['user.deleted']) { + await Promise.all([ + this.deleteRecipientMailCache(user.email), + this.queue.removeWhere( + 'notification.sendMail', + job => job.to === user.email + ), + ]); + } + @Cron(CronExpression.EVERY_MINUTE) async sendRetryMails() { // pick random one from the retry map @@ -195,9 +264,14 @@ export class MailJob { let key = await this.cache.mapRandomKey(retryMailKey); while (key && processed < retryMaxPerTick) { try { - const job = await this.cache.mapGet(retryMailKey, key); + const job = await this.cache.mapGet< + Jobs['notification.sendMail'] | string + >(retryMailKey, key); if (job) { - const jobData = JSON.parse(job) as Jobs['notification.sendMail']; + const jobData = + typeof job === 'string' + ? (JSON.parse(job) as Jobs['notification.sendMail']) + : job; await this.queue.add('notification.sendMail', jobData); // wait for a while before retrying const retryDelay = this.calculateRetryDelay(jobData.startTime); diff --git a/packages/backend/server/src/core/mail/utils.ts b/packages/backend/server/src/core/mail/utils.ts new file mode 100644 index 0000000000..572406b751 --- /dev/null +++ b/packages/backend/server/src/core/mail/utils.ts @@ -0,0 +1,55 @@ +import { isIP } from 'node:net'; +import { hostname as getHostname } from 'node:os'; + +const hostnameLabelRegexp = /^[A-Za-z0-9-]+$/; + +function isValidSMTPAddressLiteral(hostname: string) { + if (!hostname.startsWith('[') || !hostname.endsWith(']')) return false; + + const literal = hostname.slice(1, -1); + if (!literal || literal.includes(' ')) return false; + if (isIP(literal) === 4) return true; + + if (literal.startsWith('IPv6:')) { + return isIP(literal.slice('IPv6:'.length)) === 6; + } + + return false; +} + +export function normalizeSMTPHeloHostname(hostname?: string) { + if (!hostname) return undefined; + + const normalized = hostname.trim().replace(/\.$/, ''); + if (!normalized) return undefined; + if (isValidSMTPAddressLiteral(normalized)) return normalized; + if (normalized.length > 253) return undefined; + + const labels = normalized.split('.'); + for (const label of labels) { + if (!label || label.length > 63) return undefined; + if ( + !hostnameLabelRegexp.test(label) || + label.startsWith('-') || + label.endsWith('-') + ) { + return undefined; + } + } + + return normalized; +} + +function readSystemHostname() { + try { + return getHostname(); + } catch { + return ''; + } +} + +export function resolveSMTPHeloHostname(configuredName: string) { + const normalizedConfiguredName = normalizeSMTPHeloHostname(configuredName); + if (normalizedConfiguredName) return normalizedConfiguredName; + return normalizeSMTPHeloHostname(readSystemHostname()); +} diff --git a/packages/backend/server/src/plugins/payment/manager/user.ts b/packages/backend/server/src/plugins/payment/manager/user.ts index e0647644b3..e442ffea09 100644 --- a/packages/backend/server/src/plugins/payment/manager/user.ts +++ b/packages/backend/server/src/plugins/payment/manager/user.ts @@ -114,22 +114,17 @@ export class UserSubscriptionManager extends SubscriptionManager { throw new ManagedByAppStoreOrPlay(); } - const subscription = await this.getSubscription({ - plan: lookupKey.plan, - userId: user.id, - }); - if ( - subscription && + active && // do not allow to re-subscribe unless !( /* current subscription is a onetime subscription and so as the one that's checking out */ ( - (subscription.variant === SubscriptionVariant.Onetime && + (active.variant === SubscriptionVariant.Onetime && lookupKey.variant === SubscriptionVariant.Onetime) || /* current subscription is normal subscription and is checking-out a lifetime subscription */ - (subscription.recurring !== SubscriptionRecurring.Lifetime && - subscription.variant !== SubscriptionVariant.Onetime && + (active.recurring !== SubscriptionRecurring.Lifetime && + active.variant !== SubscriptionVariant.Onetime && lookupKey.recurring === SubscriptionRecurring.Lifetime) ) ) diff --git a/packages/common/graphql/src/graphql/admin/send-test-email.gql b/packages/common/graphql/src/graphql/admin/send-test-email.gql index 7b0e94e9fa..8e861e500a 100644 --- a/packages/common/graphql/src/graphql/admin/send-test-email.gql +++ b/packages/common/graphql/src/graphql/admin/send-test-email.gql @@ -1,10 +1,21 @@ -mutation sendTestEmail($host: String!, $port: Int!, $sender: String!, $username: String!, $password: String!, $ignoreTLS: Boolean!) { - sendTestEmail(config: { - host: $host, - port: $port, - sender: $sender, - username: $username, - password: $password, - ignoreTLS: $ignoreTLS, - }) -} \ No newline at end of file +mutation sendTestEmail( + $name: String! + $host: String! + $port: Int! + $sender: String! + $username: String! + $password: String! + $ignoreTLS: Boolean! +) { + sendTestEmail( + config: { + name: $name + host: $host + port: $port + sender: $sender + username: $username + password: $password + ignoreTLS: $ignoreTLS + } + ) +} diff --git a/packages/common/graphql/src/graphql/index.ts b/packages/common/graphql/src/graphql/index.ts index 74647207bf..1d0c46ec24 100644 --- a/packages/common/graphql/src/graphql/index.ts +++ b/packages/common/graphql/src/graphql/index.ts @@ -549,9 +549,9 @@ export const listUsersQuery = { export const sendTestEmailMutation = { id: 'sendTestEmailMutation' as const, op: 'sendTestEmail', - query: `mutation sendTestEmail($host: String!, $port: Int!, $sender: String!, $username: String!, $password: String!, $ignoreTLS: Boolean!) { + query: `mutation sendTestEmail($name: String!, $host: String!, $port: Int!, $sender: String!, $username: String!, $password: String!, $ignoreTLS: Boolean!) { sendTestEmail( - config: {host: $host, port: $port, sender: $sender, username: $username, password: $password, ignoreTLS: $ignoreTLS} + config: {name: $name, host: $host, port: $port, sender: $sender, username: $username, password: $password, ignoreTLS: $ignoreTLS} ) }`, }; diff --git a/packages/common/graphql/src/schema.ts b/packages/common/graphql/src/schema.ts index f11bfbad85..d00a7f522b 100644 --- a/packages/common/graphql/src/schema.ts +++ b/packages/common/graphql/src/schema.ts @@ -4027,6 +4027,7 @@ export type ListUsersQuery = { }; export type SendTestEmailMutationVariables = Exact<{ + name: Scalars['String']['input']; host: Scalars['String']['input']; port: Scalars['Int']['input']; sender: Scalars['String']['input'];