diff --git a/packages/backend/server/src/core/mail/job.ts b/packages/backend/server/src/core/mail/job.ts index fdea182419..fe427a219b 100644 --- a/packages/backend/server/src/core/mail/job.ts +++ b/packages/backend/server/src/core/mail/job.ts @@ -1,7 +1,8 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; import { getStreamAsBuffer } from 'get-stream'; -import { JOB_SIGNAL, OnJob, sleep } from '../../base'; +import { Cache, JOB_SIGNAL, JobQueue, OnJob, sleep } from '../../base'; import { type MailName, MailProps, Renderers } from '../../mails'; import { UserProps, WorkspaceProps } from '../../mails/components'; import { Models } from '../../models'; @@ -40,17 +41,32 @@ declare global { } } +const sendMailKey = 'mailjob:sendMail'; +const retryMailKey = 'mailjob:sendMail:retry'; +const sendMailCacheKey = (name: string, to: string) => + `${sendMailKey}:${name}:${to}`; +const retryMaxPerTick = 20; +const retryFirstTime = 3; + @Injectable() export class MailJob { + private readonly logger = new Logger('MailJob'); + constructor( + private readonly cache: Cache, + private readonly queue: JobQueue, private readonly sender: MailSender, private readonly doc: DocReader, private readonly workspaceBlob: WorkspaceBlobStorage, private readonly models: Models ) {} - @OnJob('notification.sendMail') - async sendMail({ + private calculateRetryDelay(startTime: number) { + const elapsed = Date.now() - startTime; + return Math.min(30 * 1000, Math.round(elapsed / 2000) * 1000); + } + + private async sendMailInternal({ startTime, name, to, @@ -97,23 +113,29 @@ export class MailJob { } } - const result = await this.sender.send(name, { - to, - ...(await Renderers[name]( - // @ts-expect-error the job trigger part has been typechecked - props - )), - ...options, - }); - if (result === false) { + try { + const result = await this.sender.send(name, { + to, + ...(await Renderers[name]( + // @ts-expect-error the job trigger part has been typechecked + props + )), + ...options, + }); + if (!result) { + // wait for a while before retrying + const retryDelay = this.calculateRetryDelay(startTime); + await sleep(retryDelay); + return JOB_SIGNAL.Retry; + } + return undefined; + } catch (e) { + this.logger.error(`Failed to send mail [${name}] to [${to}]`, e); // wait for a while before retrying - const elapsed = Date.now() - startTime; - const retryDelay = Math.min(30 * 1000, Math.round(elapsed / 2000) * 1000); + const retryDelay = this.calculateRetryDelay(startTime); await sleep(retryDelay); return JOB_SIGNAL.Retry; } - - return undefined; } private async fetchWorkspaceProps(workspaceId: string) { @@ -151,4 +173,45 @@ export class MailJob { return { email: user.email } satisfies UserProps; } + + @OnJob('notification.sendMail') + async sendMail(job: Jobs['notification.sendMail']) { + const cacheKey = sendMailCacheKey(job.name, job.to); + const retried = await this.cache.mapIncrease(sendMailKey, cacheKey, 1); + if (retried <= retryFirstTime) { + const ret = await this.sendMailInternal(job); + if (!ret) await this.cache.mapDelete(sendMailKey, cacheKey); + return ret; + } + await this.cache.mapSet(retryMailKey, cacheKey, JSON.stringify(job)); + await this.cache.mapDelete(sendMailKey, cacheKey); + return undefined; + } + + @Cron(CronExpression.EVERY_MINUTE) + async sendRetryMails() { + // pick random one from the retry map + let processed = 0; + let key = await this.cache.mapRandomKey(retryMailKey); + while (key && processed < retryMaxPerTick) { + try { + const job = await this.cache.mapGet(retryMailKey, key); + if (job) { + const jobData = JSON.parse(job) as Jobs['notification.sendMail']; + await this.queue.add('notification.sendMail', jobData); + // wait for a while before retrying + const retryDelay = this.calculateRetryDelay(jobData.startTime); + await sleep(retryDelay); + } + await this.cache.mapDelete(retryMailKey, key); + } catch (e) { + this.logger.error( + `Failed to re-queue retry mail job for key [${key}]`, + e + ); + } + key = await this.cache.mapRandomKey(retryMailKey); + processed++; + } + } }