Files
AFFiNE-Mirror/packages/backend/server/src/plugins/payment/manager/user.ts
T
DarkSky 65c3271beb feat(server): clean up dirty data from legacy version (#15078)
#### PR Dependency Tree


* **PR #15078** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
  * Persist and replay incoming payment webhooks for reliability.
  * Track provider-level subscriptions, payment events, and per-target
    trial usage across providers.
  * Nightly replay job to reprocess stuck payment events.
  * Shadow backfill mode and emit-suppression options to control
    projection/backfill side effects.
  * Subscriptions now derived from entitlements + provider facts.

* **Bug Fixes**
  * Improved error propagation, retry tracking, and safer owner-grant
    projection handling.

* **Tests**
  * Added webhook failure/replay, provider integration, entitlement
    projection, and trial/checkout tests.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2026-06-04 16:38:44 +08:00

909 lines
26 KiB
TypeScript

import { Injectable, Logger } from '@nestjs/common';
import {
Prisma,
PrismaClient,
Provider,
UserStripeCustomer,
} from '@prisma/client';
import { omit, pick } from 'lodash-es';
import Stripe from 'stripe';
import { z } from 'zod';
import {
Config,
EventBus,
InvalidCheckoutParameters,
ManagedByAppStoreOrPlay,
Mutex,
OnEvent,
SubscriptionAlreadyExists,
SubscriptionPlanNotFound,
TooManyRequest,
URLHelper,
} from '../../../base';
import { EntitlementService } from '../../../core/entitlement';
import { resolveProductMapping, RevenueCatService } from '../revenuecat';
import { StripeFactory } from '../stripe';
import {
KnownStripeInvoice,
KnownStripePrice,
KnownStripeSubscription,
LookupKey,
retriveLookupKeyFromStripeSubscription,
SubscriptionPlan,
SubscriptionRecurring,
SubscriptionStatus,
} from '../types';
import {
activeSubscriptionWhere,
CheckoutParams,
Subscription,
SubscriptionManager,
visibleSubscriptionWhere,
} from './common';
/**
 * Schema identifying one user-level subscription: the owning user plus the
 * plan. Only the Pro and AI plans are accepted.
 */
export const UserSubscriptionIdentity = z.object({
plan: z.enum([SubscriptionPlan.Pro, SubscriptionPlan.AI]),
userId: z.string(),
});
/**
 * Schema for the extra argument `UserSubscriptionManager.checkout` receives
 * besides the lookup key and checkout params: the user starting the checkout.
 */
export const UserSubscriptionCheckoutArgs = z.object({
user: z.object({
id: z.string(),
email: z.string(),
}),
});
@Injectable()
export class UserSubscriptionManager extends SubscriptionManager {
private readonly logger = new Logger(UserSubscriptionManager.name);
/**
 * @param stripeProvider - Forwarded to the `SubscriptionManager` base class.
 * @param db - Prisma client; forwarded to the base class.
 * @param config - Server config; this class reads `config.payment`.
 * @param event - Event bus used to emit `user.subscription.*` events.
 * @param url - Used to build the checkout `success_url` via `safeLink`.
 * @param mutex - Used to take the lifetime-redeem lock in `saveInvoice`.
 * @param entitlement - Entitlement service kept in sync with subscription rows.
 * @param revenueCat - Queried for a user's RevenueCat subscriptions at checkout.
 */
constructor(
stripeProvider: StripeFactory,
db: PrismaClient,
private readonly config: Config,
private readonly event: EventBus,
private readonly url: URLHelper,
private readonly mutex: Mutex,
private readonly entitlement: EntitlementService,
private readonly revenueCat: RevenueCatService
) {
super(stripeProvider, db);
}
/**
 * Narrows a list of Stripe prices to the ones this manager offers.
 *
 * Candidates are checked one at a time, in input order, through
 * `isPriceAvailable`.
 *
 * @param prices - Candidate prices.
 * @param _customer - Not used by this implementation.
 * @returns The accepted prices, in their original relative order.
 */
async filterPrices(
  prices: KnownStripePrice[],
  _customer?: UserStripeCustomer
) {
  const offered: KnownStripePrice[] = [];

  for (const candidate of prices) {
    const available = await this.isPriceAvailable(candidate);
    if (!available) {
      continue;
    }
    offered.push(candidate);
  }

  return offered;
}
/**
 * Opens a Stripe Checkout session for a user-level Pro or AI plan.
 *
 * The call is rejected before any session is created when:
 * - the lookup key is not Pro/AI or carries a variant;
 * - the user's visible subscription, active entitlement, or an active
 *   RevenueCat subscription for the plan is managed by RevenueCat;
 * - an existing subscription, entitlement, or current Stripe subscription
 *   for the plan cannot be combined with the requested purchase
 *   (see `canCheckoutWithExistingSubscription`);
 * - no available price matches the lookup key.
 *
 * When a trial is attached to the session, trial usage is recorded right
 * after the session is created.
 *
 * @param lookupKey - Plan / recurring / variant being purchased.
 * @param params - Checkout options; `coupon` and `successCallbackLink` are read.
 * @param args - Carries the purchasing `user`.
 * @returns The created Stripe Checkout session.
 * @throws InvalidCheckoutParameters | ManagedByAppStoreOrPlay |
 *   SubscriptionAlreadyExists | SubscriptionPlanNotFound
 */
async checkout(
  lookupKey: LookupKey,
  params: z.infer<typeof CheckoutParams>,
  { user }: z.infer<typeof UserSubscriptionCheckoutArgs>
) {
  const { plan, recurring, variant } = lookupKey;
  if (
    variant !== null ||
    (plan !== SubscriptionPlan.Pro && plan !== SubscriptionPlan.AI)
  ) {
    throw new InvalidCheckoutParameters();
  }

  // Local state first: legacy subscription row, then entitlements.
  const active = await this.getVisibleSubscription({ plan, userId: user.id });
  await this.assertNoActiveLocalEntitlement(user.id, lookupKey);

  if (active?.provider === 'revenuecat') {
    throw new ManagedByAppStoreOrPlay();
  }
  const blockedByLocalRow =
    active &&
    !this.canCheckoutWithExistingSubscription(active.recurring, lookupKey);
  if (blockedByLocalRow) {
    throw new SubscriptionAlreadyExists({ plan });
  }

  // Then the providers themselves: Stripe, followed by RevenueCat.
  const customer = await this.getOrCreateCustomer(user.id);
  const { data: customerSubscriptions } = await this.stripe.subscriptions.list(
    {
      customer: customer.stripeCustomerId,
      status: 'all',
    }
  );
  this.assertNoActiveStripeSubscription(customerSubscriptions, lookupKey);
  await this.assertNoActiveRevenueCatSubscription(user.id, lookupKey);

  const price = await this.getPrice(lookupKey);
  if (!price || !(await this.isPriceAvailable(price))) {
    throw new SubscriptionPlanNotFound({ plan, recurring });
  }

  // A resolvable coupon is applied directly; otherwise the customer may type
  // a promotion code on the Stripe page.
  const couponId = params.coupon
    ? await this.getCouponFromPromotionCode(params.coupon, customer)
    : null;
  const discounts = couponId
    ? { discounts: [{ coupon: couponId }] }
    : { allow_promotion_codes: true };

  // Only AI gets a trial, and only when no trial usage is on record.
  const trialEligible =
    plan === SubscriptionPlan.AI && !(await this.hasUsedTrial(user.id, plan));
  const subscriptionData = trialEligible
    ? ({
        trial_period_days: 7,
      } as Stripe.Checkout.SessionCreateParams.SubscriptionData)
    : undefined;

  // Lifetime is a one-off payment; everything else is a Stripe subscription.
  const mode =
    recurring !== SubscriptionRecurring.Lifetime
      ? {
          mode: 'subscription' as const,
          subscription_data: subscriptionData,
        }
      : {
          mode: 'payment' as const,
          invoice_creation: {
            enabled: true,
          },
        };

  const session = await this.stripe.checkout.sessions.create({
    customer: customer.stripeCustomerId,
    line_items: [{ price: price.price.id, quantity: 1 }],
    ...mode,
    ...discounts,
    success_url: this.url.safeLink(params.successCallbackLink || '/'),
  });

  if (subscriptionData?.trial_period_days) {
    await this.recordTrialUsage({
      userId: user.id,
      provider: Provider.stripe,
      externalRef: session.id,
      metadata: { source: 'checkout_session' },
    });
  }

  return session;
}
/**
 * Looks up the subscription row for a user and plan, with no status filter.
 *
 * @param args - User id and plan.
 * @returns The first matching row, or `null` when there is none.
 */
async getSubscription(args: z.infer<typeof UserSubscriptionIdentity>) {
  const { userId: targetId, plan } = args;
  return this.db.subscription.findFirst({ where: { targetId, plan } });
}
/**
 * Looks up the subscription row for a user and plan, restricted by the
 * shared `activeSubscriptionWhere()` filter.
 *
 * @param args - User id and plan.
 * @returns The first matching row, or `null` when there is none.
 */
async getActiveSubscription(args: z.infer<typeof UserSubscriptionIdentity>) {
  const { userId: targetId, plan } = args;
  const where = { targetId, plan, ...activeSubscriptionWhere() };
  return this.db.subscription.findFirst({ where });
}
/**
 * Looks up the subscription row for a user and plan, restricted by the
 * shared `visibleSubscriptionWhere()` filter.
 *
 * @param args - User id and plan.
 * @returns The first matching row, or `null` when there is none.
 */
async getVisibleSubscription(args: z.infer<typeof UserSubscriptionIdentity>) {
  const { userId: targetId, plan } = args;
  const where = { targetId, plan, ...visibleSubscriptionWhere() };
  return this.db.subscription.findFirst({ where });
}
/**
 * Persists a Stripe subscription snapshot for a user.
 *
 * Steps, in order:
 * 1. emit `user.subscription.activated` when the Stripe status is active or
 *    trialing, otherwise `user.subscription.canceled`;
 * 2. upsert the provider-level subscription record;
 * 3. record AI trial usage when the subscription is, or has been, in a trial;
 * 4. write the legacy `subscription` row — a narrow update when a row with
 *    this Stripe subscription id exists, otherwise an upsert on the
 *    (targetId, plan) slot;
 * 5. project the saved row into entitlements.
 *
 * @param subscription - The Stripe subscription with its resolved user and lookup key.
 * @returns The saved legacy subscription row.
 * @throws Error when the subscription has no user id.
 */
async saveStripeSubscription(subscription: KnownStripeSubscription) {
  const { userId, lookupKey, stripeSubscription } = subscription;
  this.assertUserIdExists(userId);

  const { plan, recurring } = lookupKey;
  const { status } = stripeSubscription;

  // Feature updates go first: they are idempotent, so there is no need to
  // skip them when the subscription already exists.
  const eventPayload = { userId, plan, recurring };
  const grantsAccess =
    status === SubscriptionStatus.Active ||
    status === SubscriptionStatus.Trialing;
  if (grantsAccess) {
    this.event.emit('user.subscription.activated', eventPayload);
  } else {
    this.event.emit('user.subscription.canceled', eventPayload);
  }

  const subscriptionData = this.transformSubscription(subscription);
  await this.upsertStripeProviderSubscription(subscription, subscriptionData);

  const touchedTrial =
    status === SubscriptionStatus.Trialing ||
    stripeSubscription.trial_start ||
    stripeSubscription.trial_end;
  if (plan === SubscriptionPlan.AI && touchedTrial) {
    await this.recordTrialUsage({
      userId,
      provider: Provider.stripe,
      externalRef: stripeSubscription.id,
      metadata: { source: 'stripe_subscription' },
    });
  }

  const legacyRow = await this.db.subscription.findUnique({
    where: { stripeSubscriptionId: stripeSubscription.id },
  });
  const mutableFields = pick(subscriptionData, [
    'status',
    'stripeScheduleId',
    'nextBillAt',
    'canceledAt',
    'end',
  ]);
  const withoutProviderFields = omit(subscriptionData, ['provider', 'iapStore']);

  const saved = legacyRow
    ? await this.db.subscription.update({
        where: { id: legacyRow.id },
        data: mutableFields,
      })
    : await this.db.subscription.upsert({
        // TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
        // TODO(stable-upgrade): remove reliance on target_id_plan unique slot after contract cleanup.
        where: { targetId_plan: { targetId: userId, plan } },
        update: {
          ...withoutProviderFields,
          // The slot is taken over by Stripe: clear any RevenueCat traces.
          provider: Provider.stripe,
          iapStore: null,
          rcEntitlement: null,
          rcProductId: null,
          rcExternalRef: null,
        },
        create: {
          targetId: userId,
          ...withoutProviderFields,
        },
      });

  await this.entitlement.upsertFromCloudSubscription(saved);
  return saved;
}
/**
 * Handles removal of a Stripe subscription.
 *
 * Provider-level records for the Stripe subscription id are marked canceled
 * (with `canceledAt` and `periodEnd` set to now), and legacy rows with that
 * id are deleted. Entitlement revocation and the
 * `user.subscription.canceled` event only happen when at least one legacy
 * row was actually deleted.
 *
 * @param known - The Stripe subscription with its resolved user and lookup key.
 * @throws Error when the subscription has no user id.
 */
async deleteStripeSubscription(known: KnownStripeSubscription) {
  const { userId, lookupKey, stripeSubscription } = known;
  this.assertUserIdExists(userId);

  const stripeSubscriptionId = stripeSubscription.id;

  await this.db.providerSubscription.updateMany({
    where: {
      provider: Provider.stripe,
      externalSubscriptionId: stripeSubscriptionId,
    },
    data: {
      status: SubscriptionStatus.Canceled,
      canceledAt: new Date(),
      periodEnd: new Date(),
    },
  });

  const { count: deletedRows } = await this.db.subscription.deleteMany({
    where: { stripeSubscriptionId },
  });
  if (deletedRows <= 0) {
    return;
  }

  await this.entitlement.revokeCloudSubscription({
    targetId: userId,
    plan: lookupKey.plan,
    stripeSubscriptionId,
  });
  this.event.emit('user.subscription.canceled', {
    userId,
    plan: lookupKey.plan,
    recurring: lookupKey.recurring,
  });
}
/**
 * Marks the local subscription row as canceled: stamps `canceledAt` with the
 * current time and clears `nextBillAt`. Only the database row is written
 * here; this method makes no Stripe call.
 *
 * @param subscription - Row to cancel; located by its `stripeSubscriptionId`.
 * @returns The updated row.
 */
async cancelSubscription(subscription: Subscription) {
return this.db.subscription.update({
where: {
// NOTE(review): the directive below suppresses the type error on the next
// line; presumably callers guarantee `stripeSubscriptionId` is set — confirm.
// @ts-expect-error checked outside
stripeSubscriptionId: subscription.stripeSubscriptionId,
},
data: {
canceledAt: new Date(),
nextBillAt: null,
},
});
}
/**
 * Reverses a pending cancellation on the local subscription row: clears
 * `canceledAt` and sets `nextBillAt` to the subscription's `end`. Only the
 * database row is written here; this method makes no Stripe call.
 *
 * @param subscription - Row to resume; located by its `stripeSubscriptionId`.
 * @returns The updated row.
 */
async resumeSubscription(subscription: Subscription) {
return this.db.subscription.update({
where: {
// NOTE(review): the directive below suppresses the type error on the next
// line; presumably callers guarantee `stripeSubscriptionId` is set — confirm.
// @ts-expect-error checked outside
stripeSubscriptionId: subscription.stripeSubscriptionId,
},
data: {
canceledAt: null,
nextBillAt: subscription.end,
},
});
}
/**
 * Changes the `recurring` value stored on the local subscription row. Only
 * the database row is written here; this method makes no Stripe call.
 *
 * @param subscription - Row to update; located by its `stripeSubscriptionId`.
 * @param recurring - The new recurring value.
 * @returns The updated row.
 */
async updateSubscriptionRecurring(
subscription: Subscription,
recurring: SubscriptionRecurring
) {
return this.db.subscription.update({
where: {
// NOTE(review): the directive below suppresses the type error on the next
// line; presumably callers guarantee `stripeSubscriptionId` is set — confirm.
// @ts-expect-error checked outside
stripeSubscriptionId: subscription.stripeSubscriptionId,
},
data: { recurring },
});
}
/**
 * Persists a Stripe invoice for a user and, for paid lifetime invoices,
 * redeems the lifetime subscription.
 *
 * Steps, in order: upsert the provider payment event, upsert the legacy
 * invoice row, then — only for a paid invoice whose lookup key is lifetime —
 * take the per-invoice redeem lock and call `saveLifetimeSubscription`.
 *
 * The lock is taken only when there is lifetime work to protect. Paid
 * invoices of recurring plans do nothing under the lock, so locking them
 * would only make concurrent deliveries of the same invoice fail with
 * `TooManyRequest`.
 *
 * @param knownInvoice - The Stripe invoice with its resolved user and lookup key.
 * @returns The saved invoice row.
 * @throws Error when the invoice has no user id.
 * @throws TooManyRequest when the lifetime redeem lock cannot be acquired.
 */
async saveInvoice(knownInvoice: KnownStripeInvoice) {
  const { userId, lookupKey, stripeInvoice } = knownInvoice;
  this.assertUserIdExists(userId);

  const invoiceData = await this.transformInvoice(knownInvoice);
  await this.upsertStripePaymentEvent(knownInvoice, invoiceData);

  const invoice = await this.db.invoice.upsert({
    where: {
      stripeInvoiceId: stripeInvoice.id,
    },
    update: omit(invoiceData, 'stripeInvoiceId'),
    create: {
      targetId: userId,
      ...invoiceData,
    },
  });

  // Lifetime subscription does not get involved with the Stripe subscription
  // system. We track the deal by invoice only.
  const redeemsLifetime =
    stripeInvoice.status === 'paid' &&
    lookupKey.recurring === SubscriptionRecurring.Lifetime;
  if (redeemsLifetime) {
    // Released automatically when this block exits.
    await using lock = await this.mutex.acquire(
      `redeem-lifetime-subscription:${stripeInvoice.id}`
    );
    if (!lock) {
      throw new TooManyRequest();
    }
    await this.saveLifetimeSubscription(knownInvoice);
  }

  return invoice;
}
/**
 * Grants a lifetime subscription from a paid invoice.
 *
 * The user's existing Pro row decides what happens:
 * - no row: a lifetime row is created and projected into entitlements;
 * - a row backed by a Stripe subscription: the row is converted in place to
 *   lifetime, projected into entitlements, and the Stripe subscription is
 *   then canceled with proration;
 * - a row without a Stripe subscription id: nothing is written.
 *
 * `user.subscription.activated` is emitted in all three cases.
 *
 * @param knownInvoice - The paid invoice with its resolved user and lookup key.
 * @throws Error when the invoice has no user id.
 */
async saveLifetimeSubscription(knownInvoice: KnownStripeInvoice) {
  const { userId, lookupKey } = knownInvoice;
  this.assertUserIdExists(userId);

  // The previous non-lifetime subscription, if any, occupies the Pro slot.
  const previous = await this.db.subscription.findUnique({
    where: {
      targetId_plan: { targetId: userId, plan: SubscriptionPlan.Pro },
    },
  });

  if (!previous) {
    // TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
    const created = await this.db.subscription.create({
      data: {
        targetId: userId,
        stripeSubscriptionId: null,
        plan: lookupKey.plan,
        recurring: SubscriptionRecurring.Lifetime,
        start: new Date(),
        end: null,
        status: SubscriptionStatus.Active,
        nextBillAt: null,
      },
    });
    await this.entitlement.upsertFromCloudSubscription(created);
  } else if (previous.stripeSubscriptionId) {
    // TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
    const converted = await this.db.subscription.update({
      where: { id: previous.id },
      data: {
        stripeScheduleId: null,
        stripeSubscriptionId: null,
        plan: lookupKey.plan,
        recurring: SubscriptionRecurring.Lifetime,
        start: new Date(),
        end: null,
        status: SubscriptionStatus.Active,
        nextBillAt: null,
      },
    });
    await this.entitlement.upsertFromCloudSubscription(converted);
    // Cancel the replaced recurring subscription on Stripe's side.
    await this.stripe.subscriptions.cancel(previous.stripeSubscriptionId, {
      prorate: true,
    });
  }

  this.event.emit('user.subscription.activated', {
    userId,
    plan: lookupKey.plan,
    recurring: SubscriptionRecurring.Lifetime,
  });
}
/**
 * Revokes the subscription tied to an invoice's user and plan.
 *
 * Finds the first Stripe-provider subscription row for the user and plan.
 * When there is none the call is a no-op. Otherwise the row is marked
 * canceled, the matching cloud entitlement is revoked, and
 * `user.subscription.canceled` is emitted.
 *
 * @param knownInvoice - The invoice with its resolved user and lookup key.
 * @throws Error when the invoice has no user id.
 */
async revokeLifetime(knownInvoice: KnownStripeInvoice) {
  const { userId, lookupKey } = knownInvoice;
  this.assertUserIdExists(userId);

  const { plan } = lookupKey;
  const target = await this.db.subscription.findFirst({
    where: { targetId: userId, plan, provider: Provider.stripe },
  });
  if (!target) {
    return;
  }

  // TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
  await this.db.subscription.update({
    where: { id: target.id },
    data: {
      status: SubscriptionStatus.Canceled,
      nextBillAt: null,
      canceledAt: new Date(),
    },
  });

  await this.entitlement.revokeCloudSubscription({
    targetId: userId,
    plan,
    subscriptionId: target.id,
    stripeSubscriptionId: target.stripeSubscriptionId,
  });

  this.event.emit('user.subscription.canceled', {
    userId,
    plan,
    recurring: lookupKey.recurring,
  });
}
/**
 * Re-activates the subscription tied to an invoice's user and plan.
 *
 * When a Stripe-provider row exists for the user and plan it is set back to
 * active, keeping its `start` if it has one. Otherwise a new active row is
 * created from the lookup key. Either way the row is projected into
 * entitlements and `user.subscription.activated` is emitted.
 *
 * The start time, in epoch seconds, comes from the first invoice line's
 * period start, then the invoice's `created`, then the current time.
 *
 * @param knownInvoice - The invoice with its resolved user and lookup key.
 * @throws Error when the invoice has no user id.
 */
async restoreLifetime(knownInvoice: KnownStripeInvoice) {
  const { userId, lookupKey, stripeInvoice } = knownInvoice;
  this.assertUserIdExists(userId);

  const existing = await this.db.subscription.findFirst({
    where: {
      targetId: userId,
      plan: lookupKey.plan,
      provider: Provider.stripe,
    },
  });

  const linePeriodStart = stripeInvoice.lines.data[0]?.period?.start;
  const fallbackStart =
    typeof stripeInvoice.created === 'number'
      ? stripeInvoice.created
      : Date.now() / 1000;
  const startedAt = new Date((linePeriodStart ?? fallbackStart) * 1000);

  // TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
  const saved = existing
    ? await this.db.subscription.update({
        where: { id: existing.id },
        data: {
          status: SubscriptionStatus.Active,
          canceledAt: null,
          nextBillAt: null,
          start: existing.start ?? startedAt,
          end: null,
        },
      })
    : await this.db.subscription.create({
        data: {
          targetId: userId,
          stripeSubscriptionId: null,
          ...lookupKey,
          start: startedAt,
          end: null,
          status: SubscriptionStatus.Active,
          nextBillAt: null,
        },
      });
  await this.entitlement.upsertFromCloudSubscription(saved);

  this.event.emit('user.subscription.activated', {
    userId,
    plan: lookupKey.plan,
    recurring: lookupKey.recurring,
  });
}
/**
 * Routes a price to the availability rule of its plan. Plans other than
 * Pro and AI are never available from this manager.
 */
private async isPriceAvailable(price: KnownStripePrice) {
  switch (price.lookupKey.plan) {
    case SubscriptionPlan.Pro:
      return this.isProPriceAvailable(price);
    case SubscriptionPlan.AI:
      return this.isAIPriceAvailable(price);
    default:
      return false;
  }
}
/**
 * Availability rule for Pro prices:
 * - lifetime follows the `payment.showLifetimePrice` config flag;
 * - monthly is always available (there is no special monthly price);
 * - anything else is available only without a variant.
 */
private async isProPriceAvailable({ lookupKey }: KnownStripePrice) {
  switch (lookupKey.recurring) {
    case SubscriptionRecurring.Lifetime:
      return this.config.payment.showLifetimePrice;
    case SubscriptionRecurring.Monthly:
      return true;
    default:
      return lookupKey.variant === null;
  }
}
/**
 * Availability rule for AI prices: there is no lifetime price for AI, and
 * any other recurring is available only without a variant.
 */
private async isAIPriceAvailable({ lookupKey }: KnownStripePrice) {
  const isLifetime = lookupKey.recurring === SubscriptionRecurring.Lifetime;
  return !isLifetime && lookupKey.variant === null;
}
/**
 * Rejects a checkout that collides with an entitlement the user already
 * holds for the requested plan.
 *
 * For Pro, an entitlement with plan `pro` or `lifetime_pro` counts; for AI,
 * one with plan `ai`. Only the first matching entitlement is inspected.
 *
 * The entitlement's `metadata` is read defensively: a missing (null or
 * undefined) metadata value is treated as "no provider, no recurring"
 * instead of crashing the checkout with a TypeError.
 *
 * @param userId - User starting the checkout.
 * @param lookupKey - Plan / recurring being purchased.
 * @throws ManagedByAppStoreOrPlay when the entitlement's metadata names
 *   RevenueCat as provider.
 * @throws SubscriptionAlreadyExists when the entitlement cannot be combined
 *   with the requested purchase.
 */
private async assertNoActiveLocalEntitlement(
  userId: string,
  lookupKey: LookupKey
) {
  const entitlements = await this.entitlement.getActiveEntitlements(
    'user',
    userId
  );
  const existing = entitlements.find(entitlement => {
    if (lookupKey.plan === SubscriptionPlan.Pro) {
      return (
        entitlement.plan === 'pro' || entitlement.plan === 'lifetime_pro'
      );
    }
    if (lookupKey.plan === SubscriptionPlan.AI) {
      return entitlement.plan === 'ai';
    }
    return false;
  });
  if (!existing) {
    return;
  }

  const metadata = existing.metadata as
    | { provider?: string | null; recurring?: string | null }
    | null
    | undefined;

  if (metadata?.provider === Provider.revenuecat) {
    throw new ManagedByAppStoreOrPlay();
  }

  // NOTE(review): an entitlement without a recorded recurring is treated as
  // monthly, including `lifetime_pro` ones — confirm that projection always
  // stores `recurring` for lifetime entitlements.
  const existingRecurring =
    metadata?.recurring ?? SubscriptionRecurring.Monthly;
  if (!this.canCheckoutWithExistingSubscription(existingRecurring, lookupKey)) {
    throw new SubscriptionAlreadyExists({ plan: lookupKey.plan });
  }
}
/**
 * Tells whether a trial-usage record exists for the user and plan.
 *
 * @param userId - User to check.
 * @param plan - Plan to check.
 * @returns `true` when a record exists.
 */
private async hasUsedTrial(userId: string, plan: SubscriptionPlan) {
  const usage = await this.db.subscriptionTrialUsage.findUnique({
    where: {
      targetType_targetId_plan: { targetType: 'user', targetId: userId, plan },
    },
    // Existence is all that matters; fetch the smallest possible row.
    select: { id: true },
  });
  return Boolean(usage);
}
/**
 * Records that a user has consumed the AI trial.
 *
 * There is one record per (user, AI plan). A repeated call overwrites the
 * provider, external reference and metadata of the existing record.
 *
 * @param input - User id plus where the trial was observed.
 */
private async recordTrialUsage(input: {
  userId: string;
  provider: Provider;
  externalRef: string | null;
  metadata: Record<string, unknown>;
}) {
  const slot = {
    targetType: 'user' as const,
    targetId: input.userId,
    plan: SubscriptionPlan.AI,
  };
  const observation = {
    provider: input.provider,
    externalRef: input.externalRef,
    metadata: input.metadata as Prisma.InputJsonObject,
  };

  await this.db.subscriptionTrialUsage.upsert({
    where: { targetType_targetId_plan: slot },
    update: observation,
    create: { ...slot, ...observation },
  });
}
/**
 * Upserts the provider-level record of a Stripe subscription, keyed by
 * (provider, external subscription id).
 *
 * The same field values are written on create and on update; the create
 * path additionally sets the two key fields.
 *
 * @param known - The Stripe subscription with its resolved user and lookup key.
 * @param subscriptionData - Transformed subscription supplying the period,
 *   trial and cancellation timestamps.
 * @throws Error when the subscription has no user id.
 */
private async upsertStripeProviderSubscription(
  known: KnownStripeSubscription,
  subscriptionData: Subscription
) {
  const { userId, lookupKey, stripeSubscription } = known;
  this.assertUserIdExists(userId);

  // Customer and product may be expanded objects or bare ids.
  const { customer } = stripeSubscription;
  const price = stripeSubscription.items.data[0]?.price;
  const product = price?.product;

  const facts = {
    targetType: 'user' as const,
    targetId: userId,
    plan: lookupKey.plan,
    recurring: lookupKey.recurring,
    status: stripeSubscription.status,
    externalCustomerId: typeof customer === 'string' ? customer : customer.id,
    externalProductId: typeof product === 'string' ? product : product?.id,
    externalPriceId: price?.id,
    currency: price?.currency,
    amount: price?.unit_amount ?? null,
    quantity: known.quantity,
    periodStart: subscriptionData.start,
    periodEnd: subscriptionData.end,
    trialStart: subscriptionData.trialStart,
    trialEnd: subscriptionData.trialEnd,
    canceledAt: subscriptionData.canceledAt,
    metadata: known.metadata,
  };

  await this.db.providerSubscription.upsert({
    where: {
      provider_externalSubscriptionId: {
        provider: Provider.stripe,
        externalSubscriptionId: stripeSubscription.id,
      },
    },
    update: facts,
    create: {
      provider: Provider.stripe,
      externalSubscriptionId: stripeSubscription.id,
      ...facts,
    },
  });
}
/**
 * Upserts the payment-event record of a Stripe invoice, keyed by provider
 * and the synthetic event id `stripe_invoice:<invoice id>`.
 *
 * The record is always written as processed, with `processedAt` set to now.
 *
 * @param known - The Stripe invoice with its resolved user and lookup key.
 * @param invoiceData - Transformed invoice supplying status, amount and currency.
 * @throws Error when the invoice has no user id.
 */
private async upsertStripePaymentEvent(
  known: KnownStripeInvoice,
  invoiceData: Awaited<
    ReturnType<UserSubscriptionManager['transformInvoice']>
  >
) {
  const { userId, lookupKey, stripeInvoice } = known;
  this.assertUserIdExists(userId);

  const externalEventId = `stripe_invoice:${stripeInvoice.id}`;
  const occurredAt =
    typeof stripeInvoice.created === 'number'
      ? new Date(stripeInvoice.created * 1000)
      : undefined;

  const details = {
    eventType: `invoice.${invoiceData.status}`,
    targetType: 'user' as const,
    targetId: userId,
    externalInvoiceId: stripeInvoice.id,
    plan: lookupKey.plan,
    amount: invoiceData.amount,
    currency: invoiceData.currency,
    occurredAt,
    processingStatus: 'processed' as const,
    processedAt: new Date(),
    metadata: known.metadata,
  };

  await this.db.paymentEvent.upsert({
    where: {
      provider_externalEventId: {
        provider: Provider.stripe,
        externalEventId,
      },
    },
    update: details,
    create: {
      provider: Provider.stripe,
      externalEventId,
      ...details,
    },
  });
}
/**
 * A Stripe subscription is "current" while its status is active, trialing
 * or past due.
 */
private isCurrentStripeSubscription(subscription: Stripe.Subscription) {
  switch (subscription.status as SubscriptionStatus) {
    case SubscriptionStatus.Active:
    case SubscriptionStatus.Trialing:
    case SubscriptionStatus.PastDue:
      return true;
    default:
      return false;
  }
}
/**
 * Decides whether a new checkout may proceed although a subscription for
 * the same plan already exists. The only allowed combination is buying
 * lifetime on top of a non-lifetime subscription.
 *
 * @param existingRecurring - Recurring of the subscription already held.
 * @param lookupKey - What is being purchased.
 */
private canCheckoutWithExistingSubscription(
  existingRecurring: string,
  lookupKey: LookupKey
) {
  if (existingRecurring === SubscriptionRecurring.Lifetime) {
    return false;
  }
  return lookupKey.recurring === SubscriptionRecurring.Lifetime;
}
/**
 * Rejects a checkout when one of the customer's Stripe subscriptions is
 * current, belongs to the requested plan, and cannot be combined with the
 * requested purchase.
 *
 * @param subscriptions - The customer's Stripe subscriptions.
 * @param lookupKey - What is being purchased.
 * @throws SubscriptionAlreadyExists on the first conflicting subscription.
 */
private assertNoActiveStripeSubscription(
  subscriptions: Stripe.Subscription[],
  lookupKey: LookupKey
) {
  const conflicting = subscriptions.some(candidate => {
    if (!this.isCurrentStripeSubscription(candidate)) {
      return false;
    }
    const heldKey = retriveLookupKeyFromStripeSubscription(candidate);
    return (
      heldKey?.plan === lookupKey.plan &&
      !this.canCheckoutWithExistingSubscription(heldKey.recurring, lookupKey)
    );
  });

  if (conflicting) {
    throw new SubscriptionAlreadyExists({ plan: lookupKey.plan });
  }
}
/**
 * Rejects a checkout when RevenueCat reports an active subscription that
 * maps to the requested plan.
 *
 * The check is best-effort. It is skipped when RevenueCat is not enabled in
 * config, and a failure to fetch the subscriptions is logged as a warning
 * and treated as "no conflict".
 *
 * @param userId - User starting the checkout.
 * @param lookupKey - What is being purchased.
 * @throws ManagedByAppStoreOrPlay when such a subscription exists.
 */
private async assertNoActiveRevenueCatSubscription(
  userId: string,
  lookupKey: LookupKey
) {
  const enabled = this.config.payment.revenuecat?.enabled;
  if (!enabled) {
    return;
  }

  let subscriptions: Awaited<
    ReturnType<RevenueCatService['getSubscriptions']>
  >;
  try {
    subscriptions = await this.revenueCat.getSubscriptions(userId);
  } catch (e) {
    this.logger.warn(
      `Failed to fetch RevenueCat subscriptions for ${userId}`,
      e
    );
    return;
  }

  const productMap = this.config.payment.revenuecat?.productMap;
  const managedByStore = subscriptions?.some(
    held =>
      held.isActive &&
      resolveProductMapping(held, productMap)?.plan === lookupKey.plan
  );
  if (managedByStore) {
    throw new ManagedByAppStoreOrPlay();
  }
}
/**
 * Assertion guard: narrows `userId` to `string`.
 *
 * @throws Error when `userId` is undefined or empty.
 */
private assertUserIdExists(
  userId: string | undefined
): asserts userId is string {
  if (userId) {
    return;
  }
  throw new Error('user should exists for stripe subscription or invoice.');
}
/**
 * Cancels every Stripe subscription of a deleted user.
 *
 * A user can hold one subscription row per plan, so all rows are loaded
 * instead of just the first. A single-row lookup could miss a second
 * Stripe subscription, or land on a row without a Stripe id and cancel
 * nothing at all.
 *
 * Every cancellation is attempted even if one fails; the first failure is
 * rethrown afterwards so the error still reaches the event handler's caller.
 *
 * @param payload - The `user.deleted` event payload; only `id` is read.
 */
@OnEvent('user.deleted')
async onUserDeleted({ id }: Events['user.deleted']) {
  const rows = await this.db.subscription.findMany({
    where: {
      targetId: id,
      stripeSubscriptionId: { not: null },
    },
    select: { stripeSubscriptionId: true },
  });

  // The filter above already excludes nulls; this narrows the type as well.
  const stripeSubscriptionIds = rows.flatMap(row =>
    row.stripeSubscriptionId ? [row.stripeSubscriptionId] : []
  );

  const outcomes = await Promise.allSettled(
    stripeSubscriptionIds.map(stripeSubscriptionId =>
      this.stripe.subscriptions.cancel(stripeSubscriptionId)
    )
  );

  const failure = outcomes.find(
    (outcome): outcome is PromiseRejectedResult =>
      outcome.status === 'rejected'
  );
  if (failure) {
    throw failure.reason;
  }
}
}