mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-04 00:28:33 +00:00
feat(server): refresh subscription if event is from anonymous account (#13840)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { Body, Controller, Headers, Logger, Post } from '@nestjs/common';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { Config, EventBus } from '../../../base';
|
||||
import { Config, EventBus, JobQueue } from '../../../base';
|
||||
import { Public } from '../../../core/auth';
|
||||
import { FeatureService } from '../../../core/features';
|
||||
import { Models } from '../../../models';
|
||||
@@ -55,6 +55,7 @@ export class RevenueCatWebhookController {
|
||||
constructor(
|
||||
private readonly config: Config,
|
||||
private readonly event: EventBus,
|
||||
private readonly queue: JobQueue,
|
||||
private readonly models: Models,
|
||||
private readonly feature: FeatureService
|
||||
) {}
|
||||
@@ -82,11 +83,12 @@ export class RevenueCatWebhookController {
|
||||
appUserId,
|
||||
familyShare: event.is_family_share,
|
||||
environment: event.environment,
|
||||
transactionId: event.transaction_id,
|
||||
};
|
||||
this.logger.log(
|
||||
`[${id}] RevenueCat Webhook {${type}} received for appUserId=${appUserId}.`
|
||||
);
|
||||
if (appUserId) {
|
||||
if (appUserId && !appUserId.startsWith('$RCAnonymousID:')) {
|
||||
const user = await this.models.user.get(appUserId);
|
||||
if (user) {
|
||||
if (
|
||||
@@ -112,6 +114,19 @@ export class RevenueCatWebhookController {
|
||||
);
|
||||
}
|
||||
}
|
||||
} else if (event.transaction_id) {
|
||||
await this.queue
|
||||
.add('nightly.revenuecat.subscription.refresh.anonymous', {
|
||||
externalRef: event.transaction_id,
|
||||
startTime: Date.now(),
|
||||
})
|
||||
.catch((e: Error) => {
|
||||
this.logger.error(
|
||||
'Failed to handle RevenueCat Webhook event.',
|
||||
e
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
this.logger.warn(
|
||||
`RevenueCat Webhook received for unknown user`,
|
||||
|
||||
@@ -48,6 +48,7 @@ const zRcV2RawSubscription = z
|
||||
.object({
|
||||
object: z.enum(['subscription']),
|
||||
id: z.string().nonempty(),
|
||||
customer_id: z.string().nonempty().nullish(),
|
||||
product_id: z.string().nonempty().nullable(),
|
||||
entitlements: zRcV2RawEntitlements,
|
||||
starts_at: z.number(),
|
||||
@@ -75,7 +76,7 @@ const zRcV2RawSubscription = z
|
||||
})
|
||||
.passthrough();
|
||||
|
||||
const zRcV2RawEnvelope = z
|
||||
const zRcV2RawSubscriptionEnvelope = z
|
||||
.object({
|
||||
app_user_id: z.string().optional(),
|
||||
id: z.string().optional(),
|
||||
@@ -83,6 +84,20 @@ const zRcV2RawEnvelope = z
|
||||
})
|
||||
.passthrough();
|
||||
|
||||
const zRcV2RawCustomerAlias = z
|
||||
.object({
|
||||
object: z.literal('customer.alias'),
|
||||
id: z.string().nonempty(),
|
||||
created_at: z.number(),
|
||||
})
|
||||
.passthrough();
|
||||
|
||||
const zRcV2RawCustomerAliasEnvelope = z
|
||||
.object({
|
||||
items: z.array(zRcV2RawCustomerAlias).default([]),
|
||||
})
|
||||
.passthrough();
|
||||
|
||||
// v2 minimal, simplified structure exposed to callers
|
||||
export const Subscription = z.object({
|
||||
identifier: z.string(),
|
||||
@@ -90,6 +105,7 @@ export const Subscription = z.object({
|
||||
isActive: z.boolean(),
|
||||
latestPurchaseDate: z.date().nullable(),
|
||||
expirationDate: z.date().nullable(),
|
||||
customerId: z.string().optional(),
|
||||
productId: z.string(),
|
||||
store: Store,
|
||||
willRenew: z.boolean(),
|
||||
@@ -166,6 +182,40 @@ export class RevenueCatService {
|
||||
return null;
|
||||
}
|
||||
|
||||
async getCustomerAlias(customerId: string): Promise<string[] | null> {
|
||||
const res = await fetch(
|
||||
`https://api.revenuecat.com/v2/projects/${this.projectId}/customers/${customerId}/aliases`,
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Bearer ${this.apiKey}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
if (!res.ok) {
|
||||
const text = await res.text();
|
||||
throw new Error(
|
||||
`RevenueCat getCustomerAlias failed: ${res.status} ${res.statusText} - ${text}`
|
||||
);
|
||||
}
|
||||
|
||||
const json = await res.json();
|
||||
const customerParsed = zRcV2RawCustomerAliasEnvelope.safeParse(json);
|
||||
|
||||
if (customerParsed.success) {
|
||||
return customerParsed.data.items
|
||||
.map(alias => alias.id)
|
||||
.filter(id => !id.startsWith('$RCAnonymousID:'));
|
||||
}
|
||||
this.logger.error(
|
||||
`RevenueCat customer ${customerId} parse failed: ${JSON.stringify(
|
||||
customerParsed.error.format()
|
||||
)}`
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
async getSubscriptionByExternalRef(
|
||||
externalRef: string
|
||||
): Promise<Subscription[] | null> {
|
||||
@@ -182,12 +232,12 @@ export class RevenueCatService {
|
||||
if (!res.ok) {
|
||||
const text = await res.text();
|
||||
throw new Error(
|
||||
`RevenueCat getSubscriber failed: ${res.status} ${res.statusText} - ${text}`
|
||||
`RevenueCat getSubscriptionByExternalRef failed: ${res.status} ${res.statusText} - ${text}`
|
||||
);
|
||||
}
|
||||
|
||||
const json = await res.json();
|
||||
const envParsed = zRcV2RawEnvelope.safeParse(json);
|
||||
const envParsed = zRcV2RawSubscriptionEnvelope.safeParse(json);
|
||||
|
||||
if (envParsed.success) {
|
||||
const parsedSubs = await Promise.all(
|
||||
@@ -222,7 +272,7 @@ export class RevenueCatService {
|
||||
}
|
||||
|
||||
const json = await res.json();
|
||||
const envParsed = zRcV2RawEnvelope.safeParse(json);
|
||||
const envParsed = zRcV2RawSubscriptionEnvelope.safeParse(json);
|
||||
|
||||
if (envParsed.success) {
|
||||
const parsedSubs = await Promise.all(
|
||||
@@ -248,7 +298,8 @@ export class RevenueCatService {
|
||||
const product = products.find(p => p.id === sub.product_id);
|
||||
if (!product) {
|
||||
this.logger.warn(
|
||||
`RevenueCat subscription ${sub.id} missing product for product_id=${sub.product_id}`
|
||||
`RevenueCat subscription ${sub.id} missing product for product_id=${sub.product_id}`,
|
||||
products
|
||||
);
|
||||
return null;
|
||||
}
|
||||
@@ -264,6 +315,7 @@ export class RevenueCatService {
|
||||
expirationDate: sub.current_period_ends_at
|
||||
? new Date(sub.current_period_ends_at)
|
||||
: null,
|
||||
customerId: sub.customer_id || undefined,
|
||||
productId: product.store_identifier,
|
||||
store: sub.store ?? product.app?.type,
|
||||
willRenew: sub.auto_renewal_status === 'will_renew',
|
||||
|
||||
@@ -1,12 +1,24 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { IapStore, PrismaClient, Provider } from '@prisma/client';
|
||||
|
||||
import { Config, EventBus, OneMinute, OnEvent } from '../../../base';
|
||||
import {
|
||||
Config,
|
||||
EventBus,
|
||||
JOB_SIGNAL,
|
||||
JobQueue,
|
||||
OneMinute,
|
||||
OnEvent,
|
||||
OnJob,
|
||||
sleep,
|
||||
} from '../../../base';
|
||||
import { SubscriptionStatus } from '../types';
|
||||
import { RcEvent } from './controller';
|
||||
import { resolveProductMapping } from './map';
|
||||
import { RevenueCatService, Subscription } from './service';
|
||||
|
||||
const REFRESH_INTERVAL = 5 * 1000; // 5 seconds
|
||||
const REFRESH_MAX_TIMES = 10 * OneMinute;
|
||||
|
||||
@Injectable()
|
||||
export class RevenueCatWebhookHandler {
|
||||
private readonly logger = new Logger(RevenueCatWebhookHandler.name);
|
||||
@@ -15,7 +27,8 @@ export class RevenueCatWebhookHandler {
|
||||
private readonly rc: RevenueCatService,
|
||||
private readonly db: PrismaClient,
|
||||
private readonly config: Config,
|
||||
private readonly event: EventBus
|
||||
private readonly event: EventBus,
|
||||
private readonly queue: JobQueue
|
||||
) {}
|
||||
|
||||
@OnEvent('revenuecat.webhook')
|
||||
@@ -39,39 +52,47 @@ export class RevenueCatWebhookHandler {
|
||||
>;
|
||||
try {
|
||||
subscriptions = await this.rc.getSubscriptionByExternalRef(externalRef);
|
||||
if (!subscriptions) return;
|
||||
if (!subscriptions) {
|
||||
throw new Error(`No transaction found: ${externalRef}`);
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to fetch RC subscriptions for ${appUserId} by ${externalRef}`,
|
||||
e
|
||||
);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
await this.syncSubscription(
|
||||
const success = await this.syncSubscription(
|
||||
appUserId,
|
||||
subscriptions,
|
||||
undefined,
|
||||
externalRef,
|
||||
new Date(Date.now() + 10 * OneMinute) // expire after 10 minutes
|
||||
);
|
||||
await this.queue.add('nightly.revenuecat.subscription.refresh', {
|
||||
userId: appUserId,
|
||||
startTime: Date.now(),
|
||||
});
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
// Exposed for reuse by reconcile job
|
||||
async syncAppUser(appUserId: string, event?: RcEvent) {
|
||||
async syncAppUser(appUserId: string, event?: RcEvent): Promise<boolean> {
|
||||
// Pull latest state to be resilient to reorder/duplicate events
|
||||
let subscriptions: Awaited<
|
||||
ReturnType<RevenueCatService['getSubscriptions']>
|
||||
>;
|
||||
try {
|
||||
subscriptions = await this.rc.getSubscriptions(appUserId);
|
||||
if (!subscriptions) return;
|
||||
if (!subscriptions) return false;
|
||||
} catch (e) {
|
||||
this.logger.error(`Failed to fetch RC subscription for ${appUserId}`, e);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
await this.syncSubscription(appUserId, subscriptions, event);
|
||||
return await this.syncSubscription(appUserId, subscriptions, event);
|
||||
}
|
||||
|
||||
private async syncSubscription(
|
||||
@@ -80,10 +101,26 @@ export class RevenueCatWebhookHandler {
|
||||
event?: RcEvent,
|
||||
externalRef?: string,
|
||||
overrideExpirationDate?: Date
|
||||
) {
|
||||
): Promise<boolean> {
|
||||
const productOverride = this.config.payment.revenuecat?.productMap;
|
||||
|
||||
let success = 0;
|
||||
for (const sub of subscriptions) {
|
||||
if (!sub.customerId) {
|
||||
this.logger.warn(`RevenueCat subscription missing customerId`, {
|
||||
subscription: sub,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
const customerAlias = await this.rc.getCustomerAlias(sub.customerId);
|
||||
if (customerAlias && !customerAlias.includes(appUserId)) {
|
||||
this.logger.warn(`RevenueCat subscription customer alias mismatch`, {
|
||||
customerId: sub.customerId,
|
||||
customerAlias,
|
||||
appUserId,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
const mapping = resolveProductMapping(sub, productOverride);
|
||||
// ignore non-whitelisted and non-fallbackable products
|
||||
if (!mapping) continue;
|
||||
@@ -197,6 +234,7 @@ export class RevenueCatWebhookHandler {
|
||||
plan: mapping.plan,
|
||||
recurring: mapping.recurring,
|
||||
});
|
||||
success += 1;
|
||||
} else if (status !== SubscriptionStatus.PastDue) {
|
||||
// Do not emit canceled for PastDue (still within retry/grace window)
|
||||
this.event.emit('user.subscription.canceled', {
|
||||
@@ -206,6 +244,7 @@ export class RevenueCatWebhookHandler {
|
||||
});
|
||||
}
|
||||
}
|
||||
return success > 0;
|
||||
}
|
||||
|
||||
private pickExternalRef(e?: RcEvent): string | null {
|
||||
@@ -270,4 +309,90 @@ export class RevenueCatWebhookHandler {
|
||||
deleteInstead: true,
|
||||
};
|
||||
}
|
||||
|
||||
@OnJob('nightly.revenuecat.subscription.refresh.anonymous')
|
||||
async onSubscriptionRefreshAnonymousUser(evt: {
|
||||
externalRef: string;
|
||||
startTime: number;
|
||||
}) {
|
||||
if (!this.config.payment.revenuecat?.enabled) return;
|
||||
if (Date.now() - evt.startTime > REFRESH_MAX_TIMES) {
|
||||
this.logger.warn(
|
||||
`RevenueCat subscription refresh timed out for externalRef ${evt.externalRef}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
const startTime = Date.now();
|
||||
try {
|
||||
const subscriptions = await this.rc.getSubscriptionByExternalRef(
|
||||
evt.externalRef
|
||||
);
|
||||
let success = 0;
|
||||
if (subscriptions) {
|
||||
for (const sub of subscriptions) {
|
||||
if (!sub.customerId) {
|
||||
this.logger.warn(`RevenueCat subscription missing customerId`, {
|
||||
subscription: sub,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
const customerAlias = await this.rc.getCustomerAlias(sub.customerId);
|
||||
if (customerAlias) {
|
||||
if (
|
||||
customerAlias.length === 0 ||
|
||||
customerAlias.length > 1 ||
|
||||
!customerAlias[0]
|
||||
) {
|
||||
this.logger.warn(
|
||||
`RevenueCat anonymous subscription has invalid customer alias`,
|
||||
{ customerId: sub.customerId, customerAlias }
|
||||
);
|
||||
continue;
|
||||
}
|
||||
const appUserId = customerAlias[0];
|
||||
const saved = await this.syncSubscription(
|
||||
appUserId,
|
||||
[sub],
|
||||
undefined,
|
||||
evt.externalRef
|
||||
);
|
||||
if (saved) success += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (success > 0) return;
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to fetch RC anonymous subscriptions by ${evt.externalRef}`,
|
||||
e
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const elapsed = Date.now() - startTime;
|
||||
if (elapsed < REFRESH_INTERVAL) {
|
||||
await sleep(REFRESH_INTERVAL - elapsed);
|
||||
}
|
||||
return JOB_SIGNAL.Retry;
|
||||
}
|
||||
|
||||
@OnJob('nightly.revenuecat.subscription.refresh')
|
||||
async onSubscriptionRefresh(evt: { userId: string; startTime: number }) {
|
||||
if (!this.config.payment.revenuecat?.enabled) return;
|
||||
if (Date.now() - evt.startTime > REFRESH_MAX_TIMES) {
|
||||
this.logger.warn(
|
||||
`RevenueCat subscription refresh timed out for user ${evt.userId}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
const startTime = Date.now();
|
||||
const success = await this.syncAppUser(evt.userId);
|
||||
if (success) return;
|
||||
|
||||
const elapsed = Date.now() - startTime;
|
||||
if (elapsed < REFRESH_INTERVAL) {
|
||||
await sleep(REFRESH_INTERVAL - elapsed);
|
||||
}
|
||||
return JOB_SIGNAL.Retry;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,6 +95,17 @@ declare global {
|
||||
event: RcEvent;
|
||||
};
|
||||
}
|
||||
|
||||
interface Jobs {
|
||||
'nightly.revenuecat.subscription.refresh': {
|
||||
userId: User['id'];
|
||||
startTime: number;
|
||||
};
|
||||
'nightly.revenuecat.subscription.refresh.anonymous': {
|
||||
externalRef: string;
|
||||
startTime: number;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export interface LookupKey {
|
||||
|
||||
Reference in New Issue
Block a user