mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-14 05:14:54 +00:00
fix(server): avoid a snowball effect of email sending failures (#13818)
fix #13802
This commit is contained in:
@@ -1,7 +1,8 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { getStreamAsBuffer } from 'get-stream';
|
||||
|
||||
import { JOB_SIGNAL, OnJob, sleep } from '../../base';
|
||||
import { Cache, JOB_SIGNAL, JobQueue, OnJob, sleep } from '../../base';
|
||||
import { type MailName, MailProps, Renderers } from '../../mails';
|
||||
import { UserProps, WorkspaceProps } from '../../mails/components';
|
||||
import { Models } from '../../models';
|
||||
@@ -40,17 +41,32 @@ declare global {
|
||||
}
|
||||
}
|
||||
|
||||
const sendMailKey = 'mailjob:sendMail';
|
||||
const retryMailKey = 'mailjob:sendMail:retry';
|
||||
const sendMailCacheKey = (name: string, to: string) =>
|
||||
`${sendMailKey}:${name}:${to}`;
|
||||
const retryMaxPerTick = 20;
|
||||
const retryFirstTime = 3;
|
||||
|
||||
@Injectable()
|
||||
export class MailJob {
|
||||
private readonly logger = new Logger('MailJob');
|
||||
|
||||
constructor(
|
||||
private readonly cache: Cache,
|
||||
private readonly queue: JobQueue,
|
||||
private readonly sender: MailSender,
|
||||
private readonly doc: DocReader,
|
||||
private readonly workspaceBlob: WorkspaceBlobStorage,
|
||||
private readonly models: Models
|
||||
) {}
|
||||
|
||||
@OnJob('notification.sendMail')
|
||||
async sendMail({
|
||||
private calculateRetryDelay(startTime: number) {
|
||||
const elapsed = Date.now() - startTime;
|
||||
return Math.min(30 * 1000, Math.round(elapsed / 2000) * 1000);
|
||||
}
|
||||
|
||||
private async sendMailInternal({
|
||||
startTime,
|
||||
name,
|
||||
to,
|
||||
@@ -97,23 +113,29 @@ export class MailJob {
|
||||
}
|
||||
}
|
||||
|
||||
const result = await this.sender.send(name, {
|
||||
to,
|
||||
...(await Renderers[name](
|
||||
// @ts-expect-error the job trigger part has been typechecked
|
||||
props
|
||||
)),
|
||||
...options,
|
||||
});
|
||||
if (result === false) {
|
||||
try {
|
||||
const result = await this.sender.send(name, {
|
||||
to,
|
||||
...(await Renderers[name](
|
||||
// @ts-expect-error the job trigger part has been typechecked
|
||||
props
|
||||
)),
|
||||
...options,
|
||||
});
|
||||
if (!result) {
|
||||
// wait for a while before retrying
|
||||
const retryDelay = this.calculateRetryDelay(startTime);
|
||||
await sleep(retryDelay);
|
||||
return JOB_SIGNAL.Retry;
|
||||
}
|
||||
return undefined;
|
||||
} catch (e) {
|
||||
this.logger.error(`Failed to send mail [${name}] to [${to}]`, e);
|
||||
// wait for a while before retrying
|
||||
const elapsed = Date.now() - startTime;
|
||||
const retryDelay = Math.min(30 * 1000, Math.round(elapsed / 2000) * 1000);
|
||||
const retryDelay = this.calculateRetryDelay(startTime);
|
||||
await sleep(retryDelay);
|
||||
return JOB_SIGNAL.Retry;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private async fetchWorkspaceProps(workspaceId: string) {
|
||||
@@ -151,4 +173,45 @@ export class MailJob {
|
||||
|
||||
return { email: user.email } satisfies UserProps;
|
||||
}
|
||||
|
||||
@OnJob('notification.sendMail')
|
||||
async sendMail(job: Jobs['notification.sendMail']) {
|
||||
const cacheKey = sendMailCacheKey(job.name, job.to);
|
||||
const retried = await this.cache.mapIncrease(sendMailKey, cacheKey, 1);
|
||||
if (retried <= retryFirstTime) {
|
||||
const ret = await this.sendMailInternal(job);
|
||||
if (!ret) await this.cache.mapDelete(sendMailKey, cacheKey);
|
||||
return ret;
|
||||
}
|
||||
await this.cache.mapSet(retryMailKey, cacheKey, JSON.stringify(job));
|
||||
await this.cache.mapDelete(sendMailKey, cacheKey);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@Cron(CronExpression.EVERY_MINUTE)
|
||||
async sendRetryMails() {
|
||||
// pick random one from the retry map
|
||||
let processed = 0;
|
||||
let key = await this.cache.mapRandomKey(retryMailKey);
|
||||
while (key && processed < retryMaxPerTick) {
|
||||
try {
|
||||
const job = await this.cache.mapGet<string>(retryMailKey, key);
|
||||
if (job) {
|
||||
const jobData = JSON.parse(job) as Jobs['notification.sendMail'];
|
||||
await this.queue.add('notification.sendMail', jobData);
|
||||
// wait for a while before retrying
|
||||
const retryDelay = this.calculateRetryDelay(jobData.startTime);
|
||||
await sleep(retryDelay);
|
||||
}
|
||||
await this.cache.mapDelete(retryMailKey, key);
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to re-queue retry mail job for key [${key}]`,
|
||||
e
|
||||
);
|
||||
}
|
||||
key = await this.cache.mapRandomKey(retryMailKey);
|
||||
processed++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user