feat: improve event handle (#14177)

This commit is contained in:
DarkSky
2025-12-29 18:54:59 +08:00
committed by GitHub
parent 20a80015c0
commit 6951f1002f
8 changed files with 465 additions and 8 deletions

View File

@@ -172,6 +172,7 @@ const test = ava as TestFn<{
checkout: {
sessions: Sinon.SinonStubbedInstance<Stripe.Checkout.SessionsResource>;
};
invoices: Sinon.SinonStubbedInstance<Stripe.InvoicesResource>;
promotionCodes: Sinon.SinonStubbedInstance<Stripe.PromotionCodesResource>;
};
}>;
@@ -223,6 +224,7 @@ test.before(async t => {
prices: Sinon.stub(stripe.prices),
subscriptions: Sinon.stub(stripe.subscriptions),
subscriptionSchedules: Sinon.stub(stripe.subscriptionSchedules),
invoices: Sinon.stub(stripe.invoices),
checkout: {
sessions: Sinon.stub(stripe.checkout.sessions),
},
@@ -1766,6 +1768,88 @@ test('should be able to update team subscription', async t => {
);
});
test('should suspend on dispute and restore when dispute won', async t => {
const { service, db, stripe, event } = t.context;
const invoice: Stripe.Invoice = {
id: 'in_dispute_1',
object: 'invoice',
status: 'paid',
customer_email: 'u1@affine.pro',
subscription: 'sub_1',
lines: {
object: 'list',
data: [
{
id: 'il_1',
object: 'line_item',
amount: 799,
currency: 'usd',
description: '',
discount_amounts: [],
discountable: false,
livemode: false,
metadata: {},
period: {
start: unixNow() - 60 * 60 * 24,
end: unixNow() + 60 * 60 * 24 * 30,
},
price: {
...PRICES[PRO_MONTHLY],
} as any,
quantity: 1,
} as any,
],
has_more: false,
total_count: 1,
url: '',
},
} as any;
stripe.invoices.retrieve.resolves(invoice as any);
stripe.subscriptions.retrieve.resolves(sub as any);
stripe.subscriptions.cancel.resolves(sub as any);
await service.saveStripeSubscription(sub as any);
event.emit.resetHistory();
stripe.subscriptions.cancel.resetHistory();
await service.handleRefundedInvoice(invoice.id, 'dispute_open');
const removed = await db.subscription.findFirst({
where: { stripeSubscriptionId: 'sub_1' },
});
t.is(removed, null);
t.true(
event.emit.calledWith('user.subscription.canceled', {
userId: t.context.u1.id,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Monthly,
})
);
t.false(stripe.subscriptions.cancel.called);
event.emit.resetHistory();
await service.handleRefundedInvoice(invoice.id, 'dispute_won');
const restored = await db.subscription.findFirst({
where: { stripeSubscriptionId: 'sub_1' },
});
t.truthy(restored);
t.is(restored?.status, SubscriptionStatus.Active);
t.true(
event.emit.calledWith('user.subscription.activated', {
userId: t.context.u1.id,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Monthly,
})
);
});
// NOTE(@forehalo): cancel and resume a team subscription share the same logic with user subscription
test.skip('should be able to cancel team subscription', async () => {});
test.skip('should be able to resume team subscription', async () => {});

View File

@@ -2,4 +2,8 @@ export const OneKB = 1024;
export const OneMB = OneKB * OneKB;
export const OneGB = OneKB * OneMB;
export const OneMinute = 1000 * 60;
export const OneDay = OneMinute * 60 * 24;
export const OneHour = OneMinute * 60;
export const OneDay = OneHour * 24;
export const OneWeek = OneDay * 7;
export const OneMonth = OneDay * 30;
export const OneYear = OneDay * 365;

View File

@@ -2,8 +2,10 @@ import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient, Provider } from '@prisma/client';
import { EventBus, JobQueue, OnJob } from '../../base';
import { EventBus, JobQueue, OneHour, OnJob } from '../../base';
import { RevenueCatWebhookHandler } from './revenuecat';
import { SubscriptionService } from './service';
import { StripeFactory } from './stripe';
import {
SubscriptionPlan,
SubscriptionRecurring,
@@ -16,6 +18,7 @@ declare global {
'nightly.cleanExpiredOnetimeSubscriptions': {};
'nightly.notifyAboutToExpireWorkspaceSubscriptions': {};
'nightly.reconcileRevenueCatSubscriptions': {};
'nightly.reconcileStripeRefunds': {};
'nightly.revenuecat.syncUser': { userId: string };
}
}
@@ -26,7 +29,9 @@ export class SubscriptionCronJobs {
private readonly db: PrismaClient,
private readonly event: EventBus,
private readonly queue: JobQueue,
private readonly rcHandler: RevenueCatWebhookHandler
private readonly rcHandler: RevenueCatWebhookHandler,
private readonly stripeFactory: StripeFactory,
private readonly subscription: SubscriptionService
) {}
private getDateRange(after: number, base: number | Date = Date.now()) {
@@ -56,6 +61,12 @@ export class SubscriptionCronJobs {
{ jobId: 'nightly-payment-reconcile-revenuecat-subscriptions' }
);
await this.queue.add(
'nightly.reconcileStripeRefunds',
{},
{ jobId: 'nightly-payment-reconcile-stripe-refunds' }
);
// FIXME(@forehalo): the strategy is totally wrong, for monthly plan. redesign required
// await this.queue.add(
// 'nightly.notifyAboutToExpireWorkspaceSubscriptions',
@@ -190,4 +201,62 @@ export class SubscriptionCronJobs {
async reconcileRevenueCatSubscriptionOfUser(payload: { userId: string }) {
await this.rcHandler.syncAppUser(payload.userId);
}
@OnJob('nightly.reconcileStripeRefunds')
async reconcileStripeRefunds() {
const stripe = this.stripeFactory.stripe;
const since = Math.floor((Date.now() - 36 * OneHour) / 1000);
const seen = new Set<string>();
const refunds = await stripe.refunds.list({
created: { gte: since },
limit: 100,
expand: ['data.charge'],
});
for (const refund of refunds.data) {
const charge = refund.charge;
const invoiceId =
typeof charge !== 'string'
? typeof charge?.invoice === 'string'
? charge.invoice
: charge?.invoice?.id
: undefined;
if (invoiceId && !seen.has(invoiceId)) {
seen.add(invoiceId);
await this.subscription.handleRefundedInvoice(invoiceId, 'refund');
}
}
const disputes = await stripe.disputes.list({
created: { gte: since },
limit: 100,
expand: ['data.charge'],
});
for (const dispute of disputes.data) {
const charge = dispute.charge;
const invoiceId =
typeof charge !== 'string'
? typeof charge?.invoice === 'string'
? charge.invoice
: charge?.invoice?.id
: undefined;
if (!invoiceId || seen.has(invoiceId)) {
continue;
}
seen.add(invoiceId);
const reason =
dispute.status === 'won'
? 'dispute_won'
: dispute.status === 'lost'
? 'dispute_lost'
: ('dispute_open' as const);
await this.subscription.handleRefundedInvoice(invoiceId, reason);
}
}
}

View File

@@ -3,7 +3,7 @@ import { Injectable } from '@nestjs/common';
import { EventBus, OnEvent } from '../../base';
import { WorkspaceService } from '../../core/workspaces';
import { Models } from '../../models';
import { SubscriptionPlan } from './types';
import { SubscriptionPlan, SubscriptionRecurring } from './types';
@Injectable()
export class PaymentEventHandlers {
@@ -91,12 +91,23 @@ export class PaymentEventHandlers {
async onUserSubscriptionCanceled({
userId,
plan,
recurring,
}: Events['user.subscription.canceled']) {
switch (plan) {
case SubscriptionPlan.AI:
await this.models.userFeature.remove(userId, 'unlimited_copilot');
break;
case SubscriptionPlan.Pro: {
// if user disputed a lifetime plan, we just switch them to free plan directly
if (recurring === SubscriptionRecurring.Lifetime) {
await this.models.userFeature.switchQuota(
userId,
'free_plan_v1',
'lifetime subscription canceled'
);
break;
}
// edge case: when user switch from recurring Pro plan to `Lifetime` plan,
// a subscription canceled event will be triggered because `Lifetime` plan is not subscription based
const isLifetimeUser = await this.models.userFeature.has(

View File

@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { PrismaClient, UserStripeCustomer } from '@prisma/client';
import { PrismaClient, Provider, UserStripeCustomer } from '@prisma/client';
import { omit, pick } from 'lodash-es';
import Stripe from 'stripe';
import { z } from 'zod';
@@ -11,7 +11,9 @@ import {
InvalidCheckoutParameters,
ManagedByAppStoreOrPlay,
Mutex,
OneMonth,
OnEvent,
OneYear,
SubscriptionAlreadyExists,
SubscriptionPlanNotFound,
TooManyRequest,
@@ -557,6 +559,102 @@ export class UserSubscriptionManager extends SubscriptionManager {
return subscription;
}
async revokeOnetimeOrLifetime(knownInvoice: KnownStripeInvoice) {
this.assertUserIdExists(knownInvoice.userId);
const { userId, lookupKey } = knownInvoice;
const subscription = await this.db.subscription.findFirst({
where: {
targetId: userId,
plan: lookupKey.plan,
provider: Provider.stripe,
},
});
if (!subscription) {
return;
}
await this.db.subscription.update({
where: {
id: subscription.id,
},
data: {
status: SubscriptionStatus.Canceled,
nextBillAt: null,
canceledAt: new Date(),
},
});
this.event.emit('user.subscription.canceled', {
userId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
});
}
async restoreOnetimeOrLifetime(knownInvoice: KnownStripeInvoice) {
this.assertUserIdExists(knownInvoice.userId);
const { userId, lookupKey, stripeInvoice } = knownInvoice;
const subscription = await this.db.subscription.findFirst({
where: {
targetId: userId,
plan: lookupKey.plan,
provider: Provider.stripe,
},
});
const start =
stripeInvoice.lines.data[0]?.period?.start ??
(typeof stripeInvoice.created === 'number'
? stripeInvoice.created
: Date.now() / 1000);
let end: Date | null = null;
if (lookupKey.recurring === SubscriptionRecurring.Lifetime) {
end = null;
} else if (lookupKey.variant === SubscriptionVariant.Onetime) {
const isMonthly = lookupKey.recurring === SubscriptionRecurring.Monthly;
const duration = isMonthly ? OneMonth : OneYear;
end = subscription?.end ?? new Date(start * 1000 + duration);
} else {
end = subscription?.end ?? null;
}
if (subscription) {
await this.db.subscription.update({
where: { id: subscription.id },
data: {
status: SubscriptionStatus.Active,
canceledAt: null,
nextBillAt: null,
start: subscription.start ?? new Date(start * 1000),
end,
},
});
} else {
await this.db.subscription.create({
data: {
targetId: userId,
stripeSubscriptionId: null,
...lookupKey,
start: new Date(start * 1000),
end,
status: SubscriptionStatus.Active,
nextBillAt: null,
},
});
}
this.event.emit('user.subscription.activated', {
userId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
});
}
private async autoPrice(lookupKey: LookupKey, strategy: PriceStrategyStatus) {
// auto select ea variant when available if not specified
let variant: SubscriptionVariant | null = lookupKey.variant;

View File

@@ -55,6 +55,7 @@ import {
SubscriptionPlan,
SubscriptionRecurring,
SubscriptionStatus,
SubscriptionVariant,
} from './types';
export const CheckoutExtraArgs = z.union([
@@ -509,6 +510,128 @@ export class SubscriptionService {
await manager.deleteStripeSubscription(knownSubscription);
}
async handleRefundedInvoice(
invoiceId: string,
reason: 'refund' | 'dispute_open' | 'dispute_lost' | 'dispute_won'
) {
try {
const invoice = await this.stripe.invoices.retrieve(invoiceId, {
expand: ['subscription', 'customer', 'lines.data.price'],
});
const knownInvoice = await this.parseStripeInvoice(invoice);
if (!knownInvoice) {
this.logger.warn(
`Skip handling ${reason}: unable to parse invoice ${invoiceId}`
);
return;
}
const cancelOnStripe = reason === 'refund' || reason === 'dispute_lost';
const revokeLocal =
reason === 'refund' ||
reason === 'dispute_open' ||
reason === 'dispute_lost';
const restore = reason === 'dispute_won';
const isOneTimeOrLifetime =
knownInvoice.lookupKey.recurring === SubscriptionRecurring.Lifetime ||
knownInvoice.lookupKey.variant === SubscriptionVariant.Onetime;
if (restore) {
if (invoice.subscription) {
const subscription =
typeof invoice.subscription === 'string'
? await this.stripe.subscriptions.retrieve(invoice.subscription, {
expand: ['customer'],
})
: invoice.subscription;
const knownSubscription =
await this.parseStripeSubscription(subscription);
if (!knownSubscription) {
this.logger.warn(
`Skip restore: unable to parse subscription ${invoice.subscription} from invoice ${invoiceId}`
);
return;
}
await this.saveStripeSubscription(subscription);
return;
}
if (
isOneTimeOrLifetime &&
(knownInvoice.lookupKey.plan === SubscriptionPlan.Pro ||
knownInvoice.lookupKey.plan === SubscriptionPlan.AI)
) {
await this.userManager.restoreOnetimeOrLifetime(knownInvoice);
}
return;
}
if (!revokeLocal) {
return;
}
if (invoice.subscription) {
const subscription =
typeof invoice.subscription === 'string'
? await this.stripe.subscriptions.retrieve(invoice.subscription, {
expand: ['customer'],
})
: invoice.subscription;
const knownSubscription =
await this.parseStripeSubscription(subscription);
if (!knownSubscription) {
this.logger.warn(
`Skip handling ${reason}: unable to parse subscription ${invoice.subscription} from invoice ${invoiceId}`
);
return;
}
if (cancelOnStripe) {
try {
await this.stripe.subscriptions.cancel(
knownSubscription.stripeSubscription.id
);
} catch (e) {
this.logger.warn(
`Failed to cancel refunded subscription on Stripe ${knownSubscription.stripeSubscription.id}`,
e
);
}
}
const manager = this.select(knownSubscription.lookupKey.plan);
await manager.deleteStripeSubscription(knownSubscription);
return;
}
if (
isOneTimeOrLifetime &&
(knownInvoice.lookupKey.plan === SubscriptionPlan.Pro ||
knownInvoice.lookupKey.plan === SubscriptionPlan.AI)
) {
await this.userManager.revokeOnetimeOrLifetime(knownInvoice);
return;
}
this.logger.warn(
`Handled ${reason} for invoice ${invoiceId}, but no local subscription to cancel`
);
} catch (e) {
this.logger.error(
`Failed to handle ${reason} for invoice ${invoiceId}`,
e
);
}
}
async getOrCreateCustomer({
userId,
userEmail,
@@ -618,8 +741,16 @@ export class SubscriptionService {
private async parseStripeInvoice(
invoice: Stripe.Invoice
): Promise<KnownStripeInvoice | null> {
const customerEmail =
invoice.customer_email ??
(typeof invoice.customer !== 'string' &&
invoice.customer &&
!invoice.customer.deleted
? (invoice.customer.email ?? null)
: null);
// we can't do anything if we can't recognize the customer
if (!invoice.customer_email) {
if (!customerEmail) {
return null;
}
@@ -638,11 +769,11 @@ export class SubscriptionService {
return null;
}
const user = await this.models.user.getUserByEmail(invoice.customer_email);
const user = await this.models.user.getUserByEmail(customerEmail);
return {
userId: user?.id,
userEmail: invoice.customer_email,
userEmail: customerEmail,
stripeInvoice: invoice,
lookupKey,
metadata: invoice.subscription_details?.metadata ?? {},

View File

@@ -88,6 +88,9 @@ declare global {
'stripe.customer.subscription.created': Stripe.CustomerSubscriptionCreatedEvent;
'stripe.customer.subscription.updated': Stripe.CustomerSubscriptionUpdatedEvent;
'stripe.customer.subscription.deleted': Stripe.CustomerSubscriptionDeletedEvent;
'stripe.charge.refunded': Stripe.ChargeRefundedEvent;
'stripe.charge.dispute.created': Stripe.ChargeDisputeCreatedEvent;
'stripe.charge.dispute.closed': Stripe.ChargeDisputeClosedEvent;
// RevenueCat integration
'revenuecat.webhook': {

View File

@@ -36,6 +36,11 @@ export class StripeWebhook {
| Stripe.InvoicePaidEvent
) {
const invoice = await this.stripe.invoices.retrieve(event.data.object.id);
if (invoice.status === 'void' || invoice.status === 'uncollectible') {
await this.service.handleRefundedInvoice(invoice.id, 'refund');
}
await this.service.saveStripeInvoice(invoice);
}
@@ -60,4 +65,56 @@ export class StripeWebhook {
async onSubscriptionDeleted(event: Stripe.CustomerSubscriptionDeletedEvent) {
await this.service.deleteStripeSubscription(event.data.object);
}
private extractInvoiceId(charge: Stripe.Charge) {
return typeof charge.invoice === 'string'
? charge.invoice
: charge.invoice?.id;
}
@OnEvent('stripe.charge.refunded')
async onChargeRefunded(event: Stripe.ChargeRefundedEvent) {
const charge = event.data.object;
const invoiceId = this.extractInvoiceId(charge);
if (invoiceId) {
await this.service.handleRefundedInvoice(invoiceId, 'refund');
}
}
@OnEvent('stripe.charge.dispute.created')
async onChargeDisputed(event: Stripe.ChargeDisputeCreatedEvent) {
const ref = event.data.object.charge;
if (!ref) return;
const chargeId = typeof ref === 'string' ? ref : ref.id;
const charge = await this.stripe.charges.retrieve(chargeId, {
expand: ['invoice'],
});
const invoiceId = this.extractInvoiceId(charge);
if (invoiceId) {
await this.service.handleRefundedInvoice(invoiceId, 'dispute_open');
}
}
@OnEvent('stripe.charge.dispute.closed')
async onChargeDisputeClosed(event: Stripe.ChargeDisputeClosedEvent) {
const ref = event.data.object.charge;
if (!ref) return;
const chargeId = typeof ref === 'string' ? ref : ref.id;
const status = event.data.object.status;
const charge = await this.stripe.charges.retrieve(chargeId, {
expand: ['invoice'],
});
const invoiceId = this.extractInvoiceId(charge);
if (invoiceId) {
const reason =
status === 'won' ? 'dispute_won' : ('dispute_lost' as const);
await this.service.handleRefundedInvoice(invoiceId, reason);
}
}
}