feat(server): clean up dirty data from legacy version (#15078)

#### PR Dependency Tree


* **PR #15078** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
  * Persist and replay incoming payment webhooks for reliability.
* Track provider-level subscriptions, payment events, and per-target
trial usage across providers.
  * Nightly replay job to reprocess stuck payment events.
* Shadow backfill mode and emit-suppression options to control
projection/backfill side effects.
  * Subscriptions now derived from entitlements + provider facts.

* **Bug Fixes**
* Improved error propagation, retry tracking, and safer owner-grant
projection handling.

* **Tests**
* Added webhook failure/replay, provider integration, entitlement
projection, and trial/checkout tests.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-06-04 16:38:44 +08:00
committed by GitHub
parent 489702eb66
commit 65c3271beb
24 changed files with 2359 additions and 175 deletions
@@ -637,6 +637,18 @@ BEGIN
RAISE EXCEPTION 'Cannot project unknown doc role % for %.% user %', NEW.type, NEW.workspace_id, NEW.page_id, NEW.user_id;
END IF;
IF projected_role = 'owner' AND EXISTS (
SELECT 1
FROM doc_grants
WHERE workspace_id = NEW.workspace_id
AND doc_id = NEW.page_id
AND principal_type = 'user'
AND role = 'owner'
AND principal_id <> NEW.user_id
) THEN
RETURN NEW;
END IF;
INSERT INTO doc_grants (
workspace_id,
doc_id,
@@ -0,0 +1,103 @@
-- CreateTable
CREATE TABLE "provider_subscriptions" (
"id" VARCHAR NOT NULL,
"provider" "Provider" NOT NULL,
"target_type" TEXT NOT NULL,
"target_id" VARCHAR NOT NULL,
"plan" VARCHAR(20) NOT NULL,
"recurring" VARCHAR(20),
"status" VARCHAR(20) NOT NULL,
"external_customer_id" VARCHAR,
"external_subscription_id" VARCHAR,
"external_product_id" VARCHAR,
"external_price_id" VARCHAR,
"iap_store" "IapStore",
"external_ref" VARCHAR,
"currency" VARCHAR(3),
"amount" INTEGER,
"quantity" INTEGER,
"period_start" TIMESTAMPTZ(3),
"period_end" TIMESTAMPTZ(3),
"trial_start" TIMESTAMPTZ(3),
"trial_end" TIMESTAMPTZ(3),
"canceled_at" TIMESTAMPTZ(3),
"metadata" JSONB NOT NULL DEFAULT '{}',
"created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "provider_subscriptions_pkey" PRIMARY KEY ("id"),
CONSTRAINT "provider_subscriptions_target_type_check" CHECK ("target_type" IN ('user', 'workspace', 'instance')),
CONSTRAINT "provider_subscriptions_stripe_identity_check" CHECK ("provider" <> 'stripe' OR "external_subscription_id" IS NOT NULL),
CONSTRAINT "provider_subscriptions_revenuecat_identity_check" CHECK ("provider" <> 'revenuecat' OR ("iap_store" IS NOT NULL AND "external_ref" IS NOT NULL AND "external_product_id" IS NOT NULL AND "external_customer_id" IS NOT NULL))
);
-- CreateTable
CREATE TABLE "payment_events" (
"id" VARCHAR NOT NULL,
"provider" "Provider" NOT NULL,
"event_type" VARCHAR NOT NULL,
"external_event_id" VARCHAR NOT NULL,
"target_type" TEXT,
"target_id" VARCHAR,
"external_invoice_id" VARCHAR,
"external_payment_id" VARCHAR,
"plan" VARCHAR(20),
"amount" INTEGER,
"currency" VARCHAR(3),
"occurred_at" TIMESTAMPTZ(3),
"processing_status" VARCHAR(20) NOT NULL DEFAULT 'pending',
"processing_attempts" INTEGER NOT NULL DEFAULT 0,
"processed_at" TIMESTAMPTZ(3),
"last_error" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
"created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "payment_events_pkey" PRIMARY KEY ("id"),
CONSTRAINT "payment_events_target_type_check" CHECK ("target_type" IS NULL OR "target_type" IN ('user', 'workspace', 'instance')),
CONSTRAINT "payment_events_processing_status_check" CHECK ("processing_status" IN ('pending', 'processing', 'processed', 'failed'))
);
-- CreateTable
CREATE TABLE "subscription_trial_usages" (
"id" VARCHAR NOT NULL,
"target_type" TEXT NOT NULL,
"target_id" VARCHAR NOT NULL,
"plan" VARCHAR(20) NOT NULL,
"provider" "Provider" NOT NULL,
"external_ref" VARCHAR,
"first_used_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"metadata" JSONB NOT NULL DEFAULT '{}',
"created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "subscription_trial_usages_pkey" PRIMARY KEY ("id"),
CONSTRAINT "subscription_trial_usages_target_type_check" CHECK ("target_type" IN ('user', 'workspace', 'instance'))
);
-- CreateIndex
CREATE INDEX "provider_subscriptions_target_type_target_id_plan_status_idx" ON "provider_subscriptions"("target_type", "target_id", "plan", "status");
-- CreateIndex
CREATE INDEX "provider_subscriptions_provider_external_customer_id_idx" ON "provider_subscriptions"("provider", "external_customer_id");
-- CreateIndex
CREATE UNIQUE INDEX "provider_subscriptions_provider_external_subscription_id_key" ON "provider_subscriptions"("provider", "external_subscription_id");
-- CreateIndex
CREATE UNIQUE INDEX "provider_subscriptions_revenuecat_external_identity_key" ON "provider_subscriptions"("provider", "iap_store", "external_ref", "external_product_id", "external_customer_id");
-- CreateIndex
CREATE UNIQUE INDEX "payment_events_provider_external_event_id_key" ON "payment_events"("provider", "external_event_id");
-- CreateIndex
CREATE INDEX "payment_events_processing_status_updated_at_idx" ON "payment_events"("processing_status", "updated_at");
-- CreateIndex
CREATE INDEX "payment_events_target_type_target_id_idx" ON "payment_events"("target_type", "target_id");
-- CreateIndex
CREATE UNIQUE INDEX "subscription_trial_usages_target_type_target_id_plan_key" ON "subscription_trial_usages"("target_type", "target_id", "plan");
-- CreateIndex
CREATE INDEX "subscription_trial_usages_provider_external_ref_idx" ON "subscription_trial_usages"("provider", "external_ref");
+77
View File
@@ -1117,6 +1117,39 @@ model Subscription {
@@map("subscriptions")
}
model ProviderSubscription {
id String @id @default(uuid()) @db.VarChar
provider Provider
targetType String @map("target_type") @db.Text
targetId String @map("target_id") @db.VarChar
plan String @db.VarChar(20)
recurring String? @db.VarChar(20)
status String @db.VarChar(20)
externalCustomerId String? @map("external_customer_id") @db.VarChar
externalSubscriptionId String? @map("external_subscription_id") @db.VarChar
externalProductId String? @map("external_product_id") @db.VarChar
externalPriceId String? @map("external_price_id") @db.VarChar
iapStore IapStore? @map("iap_store")
externalRef String? @map("external_ref") @db.VarChar
currency String? @db.VarChar(3)
amount Int? @db.Integer
quantity Int? @db.Integer
periodStart DateTime? @map("period_start") @db.Timestamptz(3)
periodEnd DateTime? @map("period_end") @db.Timestamptz(3)
trialStart DateTime? @map("trial_start") @db.Timestamptz(3)
trialEnd DateTime? @map("trial_end") @db.Timestamptz(3)
canceledAt DateTime? @map("canceled_at") @db.Timestamptz(3)
metadata Json @default("{}") @db.JsonB
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3)
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(3)
@@unique([provider, externalSubscriptionId])
@@unique([provider, iapStore, externalRef, externalProductId, externalCustomerId])
@@index([targetType, targetId, plan, status])
@@index([provider, externalCustomerId])
@@map("provider_subscriptions")
}
enum Provider {
stripe
revenuecat
@@ -1148,6 +1181,50 @@ model Invoice {
@@map("invoices")
}
model PaymentEvent {
id String @id @default(uuid()) @db.VarChar
provider Provider
eventType String @map("event_type") @db.VarChar
externalEventId String @map("external_event_id") @db.VarChar
targetType String? @map("target_type") @db.Text
targetId String? @map("target_id") @db.VarChar
externalInvoiceId String? @map("external_invoice_id") @db.VarChar
externalPaymentId String? @map("external_payment_id") @db.VarChar
plan String? @db.VarChar(20)
amount Int? @db.Integer
currency String? @db.VarChar(3)
occurredAt DateTime? @map("occurred_at") @db.Timestamptz(3)
processingStatus String @default("pending") @map("processing_status") @db.VarChar(20)
processingAttempts Int @default(0) @map("processing_attempts") @db.Integer
processedAt DateTime? @map("processed_at") @db.Timestamptz(3)
lastError String? @map("last_error") @db.Text
metadata Json @default("{}") @db.JsonB
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3)
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(3)
@@unique([provider, externalEventId])
@@index([processingStatus, updatedAt])
@@index([targetType, targetId])
@@map("payment_events")
}
model SubscriptionTrialUsage {
id String @id @default(uuid()) @db.VarChar
targetType String @map("target_type") @db.Text
targetId String @map("target_id") @db.VarChar
plan String @db.VarChar(20)
provider Provider
externalRef String? @map("external_ref") @db.VarChar
firstUsedAt DateTime @default(now()) @map("first_used_at") @db.Timestamptz(3)
metadata Json @default("{}") @db.JsonB
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3)
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(3)
@@unique([targetType, targetId, plan])
@@index([provider, externalRef])
@@map("subscription_trial_usages")
}
model License {
key String @id @map("key") @db.VarChar
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3)
@@ -23,6 +23,33 @@ test.after.always(async () => {
await module.close();
});
test('payment provider facts migration makes nullable provider identities explicit', t => {
const migration = readFileSync(
join(
process.cwd(),
'migrations/20260604000000_payment_provider_facts/migration.sql'
),
'utf8'
);
t.regex(
migration,
/provider_subscriptions_stripe_identity_check[\s\S]*"provider" <> 'stripe' OR "external_subscription_id" IS NOT NULL/
);
t.regex(
migration,
/provider_subscriptions_revenuecat_identity_check[\s\S]*"provider" <> 'revenuecat' OR \("iap_store" IS NOT NULL AND "external_ref" IS NOT NULL AND "external_product_id" IS NOT NULL AND "external_customer_id" IS NOT NULL\)/
);
t.regex(
migration,
/CREATE UNIQUE INDEX "provider_subscriptions_provider_external_subscription_id_key" ON "provider_subscriptions"\("provider", "external_subscription_id"\)/
);
t.regex(
migration,
/CREATE UNIQUE INDEX "provider_subscriptions_revenuecat_external_identity_key" ON "provider_subscriptions"\("provider", "iap_store", "external_ref", "external_product_id", "external_customer_id"\)/
);
});
class TestPermissionProjectionModel extends PermissionProjectionModel {
constructor(private readonly fakeDb: unknown) {
super();
@@ -1,13 +1,15 @@
import { PrismaClient } from '@prisma/client';
import ava, { TestFn } from 'ava';
import { CryptoHelper, EventBus } from '../../base';
import { CryptoHelper, EventBus, JobQueue } from '../../base';
import { EntitlementService } from '../../core/entitlement';
import { WorkspacePolicyService } from '../../core/permission';
import { QuotaStateService } from '../../core/quota/state';
import { WorkspaceService } from '../../core/workspaces';
import { Models } from '../../models';
import { licenseClient, LicenseService } from '../../plugins/license/service';
import { StripeWebhookController } from '../../plugins/payment/controller';
import { SubscriptionCronJobs } from '../../plugins/payment/cron';
import { PaymentEventHandlers } from '../../plugins/payment/event';
import {
SubscriptionPlan,
@@ -196,3 +198,269 @@ test('recurring selfhost license activation returns activation projection withou
},
]);
});
test('stripe webhook persists failed async processing for retry visibility', async t => {
const event = {
id: 'evt_1',
type: 'invoice.paid',
created: 1710000000,
data: { object: { id: 'in_1' } },
};
const updates: unknown[] = [];
const db = {
paymentEvent: {
findUnique: async () => null,
create: async (input: unknown) => {
updates.push(input);
return { id: 'payment_event_1' };
},
updateMany: async (input: unknown) => {
updates.push(input);
return { count: 1 };
},
update: async (input: unknown) => {
updates.push(input);
return {};
},
},
} as unknown as PrismaClient;
const controller = new StripeWebhookController(
{ payment: { stripe: { webhookKey: 'whsec' } } } as never,
db,
{
stripe: {
webhooks: {
constructEvent: () => event,
},
},
} as never,
{
emitAsync: async () => {
throw new Error('handler failed');
},
} as unknown as EventBus
);
await controller.handleWebhook({
rawBody: Buffer.from('{}'),
headers: { 'stripe-signature': 'sig' },
} as never);
await new Promise(resolve => setImmediate(resolve));
t.like(updates[0], {
data: {
provider: 'stripe',
eventType: 'invoice.paid',
externalEventId: 'evt_1',
},
});
t.deepEqual(
updates.slice(1).map(update => (update as { data: unknown }).data),
[
{
processingStatus: 'processing',
processingAttempts: { increment: 1 },
},
{
processingStatus: 'failed',
lastError: 'handler failed',
},
]
);
});
test('stripe webhook skips already processed events', async t => {
const event = {
id: 'evt_processed',
type: 'invoice.paid',
created: 1710000000,
data: { object: { id: 'in_1' } },
};
const controller = new StripeWebhookController(
{ payment: { stripe: { webhookKey: 'whsec' } } } as never,
{
paymentEvent: {
findUnique: async () => ({
id: 'payment_event_processed',
processingStatus: 'processed',
}),
},
} as unknown as PrismaClient,
{
stripe: {
webhooks: {
constructEvent: () => event,
},
},
} as never,
{
emitAsync: async () => {
t.fail('processed event should not be emitted again');
},
} as unknown as EventBus
);
await controller.handleWebhook({
rawBody: Buffer.from('{}'),
headers: { 'stripe-signature': 'sig' },
} as never);
await new Promise(resolve => setImmediate(resolve));
t.pass();
});
test('stripe webhook skips events already claimed by another processor', async t => {
const event = {
id: 'evt_claimed',
type: 'invoice.paid',
created: 1710000000,
data: { object: { id: 'in_1' } },
};
const controller = new StripeWebhookController(
{ payment: { stripe: { webhookKey: 'whsec' } } } as never,
{
paymentEvent: {
findUnique: async () => null,
create: async () => ({ id: 'payment_event_claimed' }),
updateMany: async () => ({ count: 0 }),
},
} as unknown as PrismaClient,
{
stripe: {
webhooks: {
constructEvent: () => event,
},
},
} as never,
{
emitAsync: async () => {
t.fail('unclaimed event should not be emitted');
},
} as unknown as EventBus
);
await controller.handleWebhook({
rawBody: Buffer.from('{}'),
headers: { 'stripe-signature': 'sig' },
} as never);
await new Promise(resolve => setImmediate(resolve));
t.pass();
});
test('stripe webhook replay job reprocesses pending events', async t => {
const updates: unknown[] = [];
const emitted: unknown[] = [];
let findManyInput: unknown;
const cron = new SubscriptionCronJobs(
{
paymentEvent: {
findMany: async (input: unknown) => {
findManyInput = input;
return [
{
id: 'payment_event_1',
eventType: 'invoice.paid',
metadata: { id: 'in_1' },
},
];
},
updateMany: async (input: unknown) => {
updates.push(input);
return { count: 1 };
},
update: async (input: unknown) => {
updates.push(input);
return {};
},
},
} as unknown as PrismaClient,
{
emitAsync: async (name: string, payload: unknown) => {
emitted.push({ name, payload });
},
} as unknown as EventBus,
{} as unknown as JobQueue,
{} as never,
{} as never,
{} as never,
{} as never
);
await cron.replayStripeWebhookEvents();
t.deepEqual(emitted, [
{ name: 'stripe.invoice.paid', payload: { id: 'in_1' } },
]);
t.like(findManyInput, {
where: {
OR: [
{ processingStatus: { in: ['pending', 'failed'] } },
{ processingStatus: 'processing' },
],
},
});
t.deepEqual((updates[0] as { data: unknown }).data, {
processingStatus: 'processing',
processingAttempts: { increment: 1 },
});
t.like((updates[1] as { data: unknown }).data, {
processingStatus: 'processed',
lastError: null,
});
t.true(
(updates[1] as { data: { processedAt: Date } }).data.processedAt instanceof
Date
);
});
test('stripe webhook replay job keeps failed events retryable', async t => {
const updates: unknown[] = [];
const cron = new SubscriptionCronJobs(
{
paymentEvent: {
findMany: async () => [
{
id: 'payment_event_1',
eventType: 'invoice.paid',
metadata: { id: 'in_1' },
},
],
updateMany: async (input: unknown) => {
updates.push(input);
return { count: 1 };
},
update: async (input: unknown) => {
updates.push(input);
return {};
},
},
} as unknown as PrismaClient,
{
emitAsync: async () => {
throw new Error('handler still failing');
},
} as unknown as EventBus,
{} as unknown as JobQueue,
{} as never,
{} as never,
{} as never,
{} as never
);
await cron.replayStripeWebhookEvents();
t.deepEqual(
updates.map(update => (update as { data: unknown }).data),
[
{
processingStatus: 'processing',
processingAttempts: { increment: 1 },
},
{
processingStatus: 'failed',
lastError: 'handler still failing',
},
]
);
});
@@ -27,6 +27,7 @@ import { SubscriptionService } from '../../plugins/payment/service';
import {
SubscriptionPlan,
SubscriptionRecurring,
SubscriptionStatus,
} from '../../plugins/payment/types';
import { createTestingApp, TestingApp } from '../utils';
@@ -1084,3 +1085,40 @@ test('user subscriptions ignore active rows after their current period ended', a
});
t.is(activeAI, null);
});
test('user subscriptions preserve provider trialing status', async t => {
const { db, models, subResolver } = t.context;
const trialUser = await models.user.create({
email: `${Date.now()}-trial-status@affine.pro`,
});
await db.subscription.create({
data: {
targetId: trialUser.id,
plan: SubscriptionPlan.Pro,
provider: 'stripe',
status: SubscriptionStatus.Trialing,
recurring: SubscriptionRecurring.Yearly,
start: new Date('2026-01-01T00:00:00.000Z'),
end: new Date('2099-01-01T00:00:00.000Z'),
stripeSubscriptionId: 'sub_trialing_status',
},
});
await db.providerSubscription.create({
data: {
provider: 'stripe',
targetType: 'user',
targetId: trialUser.id,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
status: SubscriptionStatus.Trialing,
externalSubscriptionId: 'sub_trialing_status',
periodStart: new Date('2026-01-01T00:00:00.000Z'),
periodEnd: new Date('2099-01-01T00:00:00.000Z'),
},
});
const subscriptions = await subResolver.subscriptions(trialUser, trialUser);
t.is(subscriptions[0]?.status, SubscriptionStatus.Trialing);
});
@@ -11,6 +11,7 @@ import { ConfigFactory, ConfigModule } from '../../base/config';
import { CurrentUser } from '../../core/auth';
import { AuthService } from '../../core/auth/service';
import { SubscriptionCronJobs } from '../../plugins/payment/cron';
import { RevenueCatService } from '../../plugins/payment/revenuecat';
import { SubscriptionService } from '../../plugins/payment/service';
import { StripeFactory } from '../../plugins/payment/stripe';
import {
@@ -135,6 +136,7 @@ const test = ava as TestFn<{
app: TestingApp;
service: SubscriptionService;
event: Sinon.SinonStubbedInstance<EventBus>;
revenueCat: Sinon.SinonStubbedInstance<RevenueCatService>;
stripe: {
customers: Sinon.SinonStubbedInstance<Stripe.CustomersResource>;
prices: Sinon.SinonStubbedInstance<Stripe.PricesResource>;
@@ -157,6 +159,11 @@ function getLastCheckoutPrice(checkoutStub: Sinon.SinonStub) {
};
}
function getLastCheckoutParams(checkoutStub: Sinon.SinonStub) {
const call = checkoutStub.getCall(checkoutStub.callCount - 1);
return call.args[0] as Stripe.Checkout.SessionCreateParams;
}
test.before(async t => {
const app = await createTestingApp({
imports: [
@@ -179,6 +186,7 @@ test.before(async t => {
t.context.event = app.get(EventBus);
t.context.service = app.get(SubscriptionService);
t.context.revenueCat = Sinon.stub(app.get(RevenueCatService));
t.context.db = app.get(PrismaClient);
t.context.app = app;
@@ -209,6 +217,9 @@ test.beforeEach(async t => {
app.get(ConfigFactory).override({
payment: {
showLifetimePrice: true,
revenuecat: {
enabled: false,
},
},
});
@@ -240,6 +251,8 @@ test.beforeEach(async t => {
// @ts-expect-error stub
stripe.subscriptions.list.resolves({ data: [] });
// @ts-expect-error stub
stripe.checkout.sessions.create.resolves({ id: 'cs_1' });
});
test.after.always(async t => {
@@ -409,6 +422,90 @@ test('should allow checkout after local subscription period ended', async t => {
});
});
test('should reject checkout when stripe already has current subscription', async t => {
const { service, u1, stripe } = t.context;
stripe.subscriptions.list.resolves({
data: [
{
...sub,
id: 'sub_pending_webhook',
status: SubscriptionStatus.Active,
items: {
data: [
{
// @ts-expect-error stub
price: {
lookup_key: PRO_YEARLY,
},
},
],
},
},
],
});
await t.throwsAsync(
() =>
service.checkout(
{
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
successCallbackLink: '',
},
{ user: u1 }
),
{ message: 'You have already subscribed to the pro plan.' }
);
t.false(stripe.checkout.sessions.create.called);
});
test('should reject checkout when revenuecat already has active subscription', async t => {
const { app, revenueCat, service, u1, stripe } = t.context;
app.get(ConfigFactory).override({
payment: {
revenuecat: {
enabled: true,
},
},
});
revenueCat.getSubscriptions.resolves([
{
identifier: 'Pro',
isTrial: false,
isActive: true,
latestPurchaseDate: new Date(),
expirationDate: new Date(Date.now() + 100000),
customerId: 'rc_customer',
productId: 'app.affine.pro.Annual',
store: 'app_store',
willRenew: true,
duration: 'P1Y',
},
]);
await t.throwsAsync(
() =>
service.checkout(
{
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
successCallbackLink: '',
},
{ user: u1 }
),
{
message:
'This subscription is managed by App Store or Google Play. Please manage it in the corresponding store.',
}
);
t.false(stripe.checkout.sessions.create.called);
});
test('should get correct pro plan price for checking out', async t => {
const { app, service, u1, stripe } = t.context;
// monthly
@@ -523,29 +620,15 @@ test('should get correct ai plan price for checking out', async t => {
price: AI_YEARLY,
coupon: undefined,
});
t.is(
getLastCheckoutParams(stripe.checkout.sessions.create).subscription_data
?.trial_period_days,
7
);
}
// user with old subscription
// user with recorded trial usage
{
stripe.subscriptions.list.resolves({
data: [
{
id: 'sub_1',
status: 'canceled',
items: {
data: [
{
// @ts-expect-error stub
price: {
lookup_key: AI_YEARLY,
},
},
],
},
},
],
});
await service.checkout(
{
plan: SubscriptionPlan.AI,
@@ -559,9 +642,38 @@ test('should get correct ai plan price for checking out', async t => {
price: AI_YEARLY,
coupon: undefined,
});
t.is(
getLastCheckoutParams(stripe.checkout.sessions.create).subscription_data
?.trial_period_days,
undefined
);
}
});
test('should record AI trial usage when checkout grants trial', async t => {
const { db, service, u1 } = t.context;
await service.checkout(
{
plan: SubscriptionPlan.AI,
recurring: SubscriptionRecurring.Yearly,
successCallbackLink: '',
},
{ user: u1 }
);
const usage = await db.subscriptionTrialUsage.findUnique({
where: {
targetType_targetId_plan: {
targetType: 'user',
targetId: u1.id,
plan: SubscriptionPlan.AI,
},
},
});
t.is(usage?.externalRef, 'cs_1');
});
test('should apply user coupon for checking out', async t => {
const { service, u1, stripe } = t.context;
@@ -610,6 +722,22 @@ test('should be able to create subscription', async t => {
})
);
t.is(subInDB?.stripeSubscriptionId, sub.id);
const providerFact = await db.providerSubscription.findUnique({
where: {
provider_externalSubscriptionId: {
provider: 'stripe',
externalSubscriptionId: sub.id,
},
},
});
t.like(providerFact, {
targetType: 'user',
targetId: u1.id,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Monthly,
status: SubscriptionStatus.Active,
});
});
test('should be able to update subscription', async t => {
@@ -640,6 +768,49 @@ test('should be able to update subscription', async t => {
t.is(subInDB?.canceledAt?.getTime(), canceledAt * 1000);
});
test('should replace old subscription row when stripe creates a new subscription for the same plan', async t => {
const { service, db, u1 } = t.context;
const old = await db.subscription.create({
data: {
targetId: u1.id,
stripeSubscriptionId: 'sub_old',
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
status: SubscriptionStatus.Canceled,
start: new Date('2026-03-26T08:23:57.000Z'),
end: new Date('2027-03-26T08:23:57.000Z'),
},
});
await service.saveStripeSubscription({
...sub,
id: 'sub_new',
status: SubscriptionStatus.Active,
items: {
...sub.items,
data: [
{
...sub.items.data[0],
// @ts-expect-error stub
price: {
lookup_key: PRO_YEARLY,
},
},
],
},
});
const subscriptions = await db.subscription.findMany({
where: { targetId: u1.id, plan: SubscriptionPlan.Pro },
});
t.is(subscriptions.length, 1);
t.is(subscriptions[0].id, old.id);
t.is(subscriptions[0].stripeSubscriptionId, 'sub_new');
t.is(subscriptions[0].status, SubscriptionStatus.Active);
});
test('should be able to delete subscription', async t => {
const { event, service, db, u1 } = t.context;
await service.saveStripeSubscription(sub);
@@ -662,6 +833,19 @@ test('should be able to delete subscription', async t => {
});
t.is(subInDB, null);
t.like(
await db.providerSubscription.findUnique({
where: {
provider_externalSubscriptionId: {
provider: 'stripe',
externalSubscriptionId: sub.id,
},
},
}),
{
status: SubscriptionStatus.Canceled,
}
);
});
test('should be able to cancel subscription', async t => {
@@ -1118,6 +1302,23 @@ test('should be able to subscribe to lifetime recurring', async t => {
t.is(subInDB?.recurring, SubscriptionRecurring.Lifetime);
t.is(subInDB?.status, SubscriptionStatus.Active);
t.is(subInDB?.stripeSubscriptionId, null);
const paymentFact = await db.paymentEvent.findUnique({
where: {
provider_externalEventId: {
provider: 'stripe',
externalEventId: `stripe_invoice:${lifetimeInvoice.id}`,
},
},
});
t.like(paymentFact, {
targetType: 'user',
targetId: u1.id,
plan: SubscriptionPlan.Pro,
amount: lifetimeInvoice.total,
currency: lifetimeInvoice.currency,
processingStatus: 'processed',
});
});
test('should be able to subscribe to lifetime recurring with old subscription', async t => {
@@ -133,3 +133,43 @@ test('checker reports legal legacy facts missing entitlements', async t => {
t.is(report.cloudSubscriptionEntitlementMissing, 1);
t.is(report.selfhostLicenseEntitlementMissing, 1);
});
test('checker reports provider facts missing entitlements', async t => {
const user = await t.context.models.user.create({
email: `${randomUUID()}@affine.pro`,
});
await t.context.db.providerSubscription.create({
data: {
provider: 'stripe',
targetType: 'user',
targetId: user.id,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
status: SubscriptionStatus.Active,
externalSubscriptionId: 'sub_provider_without_entitlement',
periodStart: new Date(),
periodEnd: new Date('2099-01-01T00:00:00.000Z'),
},
});
const report = await t.context.checker.checkEntitlementProjection();
t.is(report.providerActiveEntitlementMissing, 1);
});
test('checker reports entitlements missing active provider facts', async t => {
const user = await t.context.models.user.create({
email: `${randomUUID()}@affine.pro`,
});
await t.context.entitlement.upsertFromCloudSubscription({
targetId: user.id,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
status: SubscriptionStatus.Active,
stripeSubscriptionId: 'sub_entitlement_without_active_provider',
});
const report = await t.context.checker.checkEntitlementProjection();
t.is(report.entitlementProviderMissing, 1);
});
@@ -60,6 +60,10 @@ test('projects user entitlement to legacy user features and subscriptions', asyn
recurring: SubscriptionRecurring.Monthly,
status: 'active',
});
await t.context.projection.onEntitlementChanged({
targetType: 'user',
targetId: user.id,
});
t.true(await t.context.models.userFeature.has(user.id, 'pro_plan_v1'));
t.true(await t.context.models.userFeature.has(user.id, 'unlimited_copilot'));
@@ -95,6 +99,10 @@ test('projects workspace entitlement and readonly state to legacy workspace feat
status: 'active',
quantity: 8,
});
await t.context.projection.onEntitlementChanged({
targetType: 'workspace',
targetId: workspace.id,
});
const teamFeature = await t.context.models.workspaceFeature.get(
workspace.id,
@@ -296,6 +304,135 @@ test('backfill removes dangling legacy subscriptions and entitlements', async t
t.is(await t.context.db.entitlement.count(), 0);
});
test('shadow backfill preserves legacy rows and records provider facts', async t => {
const user = await t.context.models.user.create({
email: `${randomUUID()}@affine.pro`,
});
const paidAiUser = await t.context.models.user.create({
email: `${randomUUID()}@affine.pro`,
});
const owner = await t.context.models.user.create({
email: `${randomUUID()}@affine.pro`,
});
const workspace = await t.context.models.workspace.create(owner.id);
const danglingTargetId = randomUUID();
await t.context.db.subscription.createMany({
data: [
{
targetId: user.id,
stripeSubscriptionId: 'sub_ai_trial',
plan: SubscriptionPlan.AI,
recurring: SubscriptionRecurring.Yearly,
status: SubscriptionStatus.Active,
start: new Date('2026-01-01T00:00:00.000Z'),
trialStart: new Date('2026-01-01T00:00:00.000Z'),
trialEnd: new Date('2026-01-08T00:00:00.000Z'),
},
{
targetId: paidAiUser.id,
stripeSubscriptionId: 'sub_ai_paid',
plan: SubscriptionPlan.AI,
recurring: SubscriptionRecurring.Yearly,
status: SubscriptionStatus.Active,
start: new Date('2026-01-01T00:00:00.000Z'),
},
{
targetId: danglingTargetId,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
status: SubscriptionStatus.Active,
start: new Date('2026-01-01T00:00:00.000Z'),
},
],
});
await t.context.db.invoice.create({
data: {
stripeInvoiceId: 'in_backfill_lifetime',
targetId: user.id,
currency: 'usd',
amount: 9999,
status: 'paid',
reason: 'subscription_create',
},
});
await t.context.db.installedLicense.create({
data: {
key: 'shadow-license-key',
workspaceId: workspace.id,
quantity: 3,
recurring: SubscriptionRecurring.Yearly,
validateKey: 'shadow-validate-key',
validatedAt: new Date(),
},
});
await t.context.projection.shadowBackfillEntitlementsAndQuotaStates();
t.truthy(
await t.context.db.subscription.findFirst({
where: { targetId: danglingTargetId },
})
);
t.like(
await t.context.db.providerSubscription.findUnique({
where: {
provider_externalSubscriptionId: {
provider: 'stripe',
externalSubscriptionId: 'sub_ai_trial',
},
},
}),
{
targetType: 'user',
targetId: user.id,
plan: SubscriptionPlan.AI,
status: SubscriptionStatus.Active,
}
);
t.truthy(
await t.context.db.subscriptionTrialUsage.findUnique({
where: {
targetType_targetId_plan: {
targetType: 'user',
targetId: user.id,
plan: SubscriptionPlan.AI,
},
},
})
);
t.falsy(
await t.context.db.subscriptionTrialUsage.findUnique({
where: {
targetType_targetId_plan: {
targetType: 'user',
targetId: paidAiUser.id,
plan: SubscriptionPlan.AI,
},
},
})
);
t.like(
await t.context.db.paymentEvent.findUnique({
where: {
provider_externalEventId: {
provider: 'stripe',
externalEventId: 'stripe_invoice:in_backfill_lifetime',
},
},
}),
{
targetId: user.id,
externalInvoiceId: 'in_backfill_lifetime',
amount: 9999,
processingStatus: 'processed',
}
);
t.false(
await t.context.models.workspaceFeature.has(workspace.id, 'team_plan_v1')
);
});
test('key based selfhost entitlements without raw payload need reupload', async t => {
const owner = await t.context.models.user.create({
email: `${randomUUID()}@affine.pro`,
@@ -16,6 +16,8 @@ export class EntitlementProjectionChecker {
selfhostLicenseProjectionMissing,
cloudSubscriptionEntitlementMissing,
selfhostLicenseEntitlementMissing,
providerActiveEntitlementMissing,
entitlementProviderMissing,
dirtyLegacyUserFeatures,
dirtyLegacyWorkspaceFeatures,
missingUserFeatureProjection,
@@ -41,6 +43,8 @@ export class EntitlementProjectionChecker {
this.selfhostLicenseProjectionMissing(),
this.cloudSubscriptionEntitlementMissing(),
this.selfhostLicenseEntitlementMissing(),
this.providerActiveEntitlementMissing(),
this.entitlementProviderMissing(),
this.dirtyLegacyUserFeatures(),
this.dirtyLegacyWorkspaceFeatures(),
this.missingUserFeatureProjection(),
@@ -56,6 +60,8 @@ export class EntitlementProjectionChecker {
selfhostLicenseProjectionMissing,
cloudSubscriptionEntitlementMissing,
selfhostLicenseEntitlementMissing,
providerActiveEntitlementMissing,
entitlementProviderMissing,
dirtyLegacyUserFeatures,
dirtyLegacyWorkspaceFeatures,
missingUserFeatureProjection,
@@ -147,6 +153,39 @@ export class EntitlementProjectionChecker {
return licenses.filter(license => !validKeys.has(license.key)).length;
}
private async providerActiveEntitlementMissing() {
const activeProviderKeys = await this.activeProviderSubscriptionKeys();
const valid = new Set(
(
await this.validEntitlements({
source: 'cloud_subscription',
})
).map(
entitlement =>
`${entitlement.targetId}:${this.subscriptionPlan(entitlement.plan)}`
)
);
return activeProviderKeys.filter(key => !valid.has(key)).length;
}
private async entitlementProviderMissing() {
const activeProviderKeys = new Set(
await this.activeProviderSubscriptionKeys()
);
const entitlements = await this.validEntitlements({
source: 'cloud_subscription',
});
return entitlements.filter(
entitlement =>
entitlement.targetId &&
!activeProviderKeys.has(
`${entitlement.targetId}:${this.subscriptionPlan(entitlement.plan)}`
)
).length;
}
private async dirtyLegacyUserFeatures() {
const rows = await this.db.userFeature.findMany({
where: {
@@ -287,4 +326,22 @@ export class EntitlementProjectionChecker {
private subscriptionPlan(plan: string) {
return plan === 'lifetime_pro' ? 'pro' : plan;
}
private async activeProviderSubscriptionKeys() {
const now = new Date();
const subscriptions = await this.db.providerSubscription.findMany({
where: {
status: { in: ['active', 'trialing', 'past_due'] },
OR: [{ periodEnd: null }, { periodEnd: { gt: now } }],
},
select: {
targetId: true,
plan: true,
},
});
return subscriptions.map(
subscription => `${subscription.targetId}:${subscription.plan}`
);
}
}
@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Entitlement, PrismaClient } from '@prisma/client';
import { Entitlement, IapStore, PrismaClient, Provider } from '@prisma/client';
import { OnEvent } from '../../base';
import { Models } from '../../models';
@@ -52,44 +52,65 @@ export class LegacyEntitlementProjectionService {
await this.#projectReadonlyFeature(workspaceId);
}
async scanInstalledLicenses() {
async scanInstalledLicenses(options: { emit?: boolean } = {}) {
const licenses = await this.db.installedLicense.findMany();
const emit = options.emit ?? true;
await Promise.all(
licenses.map(async license =>
license.license
? await this.entitlement.upsertFromSelfhostLicense({
workspaceId: license.workspaceId,
licenseKey: license.key,
recurring: license.recurring,
quantity: license.quantity,
expiresAt: license.expiredAt,
validatedAt: license.validatedAt,
license: Buffer.from(license.license),
})
: license.validateKey
? await this.entitlement.upsertFromValidatedSelfhostLicense({
? await this.entitlement.upsertFromSelfhostLicense(
{
workspaceId: license.workspaceId,
licenseKey: license.key,
recurring: license.recurring,
quantity: license.quantity,
expiresAt: license.expiredAt,
validatedAt: license.validatedAt,
validateKey: license.validateKey,
variant: license.variant,
})
: await this.entitlement.markSelfhostLicenseNeedsReupload({
workspaceId: license.workspaceId,
licenseKey: license.key,
reason: 'Installed license has no raw payload to verify.',
})
license: Buffer.from(license.license),
},
{ emit }
)
: license.validateKey
? await this.entitlement.upsertFromValidatedSelfhostLicense(
{
workspaceId: license.workspaceId,
licenseKey: license.key,
recurring: license.recurring,
quantity: license.quantity,
expiresAt: license.expiredAt,
validatedAt: license.validatedAt,
validateKey: license.validateKey,
variant: license.variant,
},
{ emit }
)
: await this.entitlement.markSelfhostLicenseNeedsReupload(
{
workspaceId: license.workspaceId,
licenseKey: license.key,
reason: 'Installed license has no raw payload to verify.',
},
{ emit }
)
)
);
}
async backfillEntitlementsAndQuotaStates() {
await this.#cleanupDanglingLegacyEntitlements();
await this.#backfillEntitlementsAndQuotaStates({ cleanupLegacy: true });
}
async shadowBackfillEntitlementsAndQuotaStates() {
await this.#backfillEntitlementsAndQuotaStates({ cleanupLegacy: false });
}
async #backfillEntitlementsAndQuotaStates({
cleanupLegacy,
}: {
cleanupLegacy: boolean;
}) {
const [subscriptions, users, workspaces] = await Promise.all([
this.db.subscription.findMany(),
this.db.user.findMany({ select: { id: true } }),
@@ -101,17 +122,31 @@ export class LegacyEntitlementProjectionService {
continue;
}
if (subscription.plan === SubscriptionPlan.SelfHostedTeam) {
await this.entitlement.markSelfhostLicenseNeedsReupload({
licenseKey: subscription.targetId,
reason:
'Historical self-hosted team subscription needs license activation or revalidation.',
});
await this.entitlement.markSelfhostLicenseNeedsReupload(
{
licenseKey: subscription.targetId,
reason:
'Historical self-hosted team subscription needs license activation or revalidation.',
},
{ emit: cleanupLegacy }
);
continue;
}
await this.entitlement.upsertFromCloudSubscription(subscription);
await this.entitlement.upsertFromCloudSubscription(subscription, {
emit: cleanupLegacy,
legacySync: true,
});
await this.#backfillProviderSubscription(subscription);
if (
subscription.plan === SubscriptionPlan.AI &&
(subscription.trialStart || subscription.trialEnd)
) {
await this.#backfillTrialUsage(subscription);
}
}
await this.scanInstalledLicenses();
await this.#backfillPaymentEvents();
await this.scanInstalledLicenses({ emit: cleanupLegacy });
await Promise.all([
...users.map(user =>
@@ -153,6 +188,206 @@ export class LegacyEntitlementProjectionService {
]);
}
async #backfillProviderSubscription(subscription: {
targetId: string;
plan: string;
recurring: string;
status: string;
provider: Provider | string;
iapStore?: IapStore | null;
rcExternalRef?: string | null;
rcProductId?: string | null;
stripeSubscriptionId?: string | null;
quantity: number;
start: Date;
end?: Date | null;
trialStart?: Date | null;
trialEnd?: Date | null;
canceledAt?: Date | null;
}) {
const targetType =
subscription.plan === SubscriptionPlan.Team ? 'workspace' : 'user';
if (
subscription.provider === 'stripe' &&
subscription.stripeSubscriptionId
) {
await this.db.providerSubscription.upsert({
where: {
provider_externalSubscriptionId: {
provider: 'stripe',
externalSubscriptionId: subscription.stripeSubscriptionId,
},
},
update: {
targetType,
targetId: subscription.targetId,
plan: subscription.plan,
recurring: subscription.recurring,
status: subscription.status,
quantity: subscription.quantity,
periodStart: subscription.start,
periodEnd: subscription.end,
trialStart: subscription.trialStart,
trialEnd: subscription.trialEnd,
canceledAt: subscription.canceledAt,
metadata: { legacySync: true },
},
create: {
provider: 'stripe',
targetType,
targetId: subscription.targetId,
plan: subscription.plan,
recurring: subscription.recurring,
status: subscription.status,
externalSubscriptionId: subscription.stripeSubscriptionId,
quantity: subscription.quantity,
periodStart: subscription.start,
periodEnd: subscription.end,
trialStart: subscription.trialStart,
trialEnd: subscription.trialEnd,
canceledAt: subscription.canceledAt,
metadata: { legacySync: true },
},
});
return;
}
if (
subscription.provider === 'revenuecat' &&
subscription.iapStore &&
subscription.rcExternalRef &&
subscription.rcProductId
) {
await this.db.providerSubscription.upsert({
where: {
provider_iapStore_externalRef_externalProductId_externalCustomerId: {
provider: 'revenuecat',
iapStore: subscription.iapStore,
externalRef: subscription.rcExternalRef,
externalProductId: subscription.rcProductId,
externalCustomerId: subscription.targetId,
},
},
update: {
targetType,
targetId: subscription.targetId,
plan: subscription.plan,
recurring: subscription.recurring,
status: subscription.status,
quantity: subscription.quantity,
periodStart: subscription.start,
periodEnd: subscription.end,
trialStart: subscription.trialStart,
trialEnd: subscription.trialEnd,
canceledAt: subscription.canceledAt,
metadata: { legacySync: true },
},
create: {
provider: 'revenuecat',
targetType,
targetId: subscription.targetId,
plan: subscription.plan,
recurring: subscription.recurring,
status: subscription.status,
externalCustomerId: subscription.targetId,
iapStore: subscription.iapStore,
externalRef: subscription.rcExternalRef,
externalProductId: subscription.rcProductId,
quantity: subscription.quantity,
periodStart: subscription.start,
periodEnd: subscription.end,
trialStart: subscription.trialStart,
trialEnd: subscription.trialEnd,
canceledAt: subscription.canceledAt,
metadata: { legacySync: true },
},
});
}
}
async #backfillTrialUsage(subscription: {
targetId: string;
plan: string;
provider: Provider | string;
stripeSubscriptionId?: string | null;
rcExternalRef?: string | null;
trialStart?: Date | null;
trialEnd?: Date | null;
start: Date;
}) {
await this.db.subscriptionTrialUsage.upsert({
where: {
targetType_targetId_plan: {
targetType: 'user',
targetId: subscription.targetId,
plan: subscription.plan,
},
},
update: {},
create: {
targetType: 'user',
targetId: subscription.targetId,
plan: subscription.plan,
provider:
subscription.provider === 'revenuecat' ? 'revenuecat' : 'stripe',
externalRef:
subscription.stripeSubscriptionId ??
subscription.rcExternalRef ??
null,
firstUsedAt:
subscription.trialStart ??
subscription.trialEnd ??
subscription.start,
metadata: { legacySync: true },
},
});
}
async #backfillPaymentEvents() {
const invoices = await this.db.invoice.findMany();
for (const invoice of invoices) {
await this.db.paymentEvent.upsert({
where: {
provider_externalEventId: {
provider: 'stripe',
externalEventId: `stripe_invoice:${invoice.stripeInvoiceId}`,
},
},
update: {
targetId: invoice.targetId,
externalInvoiceId: invoice.stripeInvoiceId,
amount: invoice.amount,
currency: invoice.currency,
processingStatus: 'processed',
processedAt: invoice.updatedAt,
metadata: {
legacySync: true,
status: invoice.status,
reason: invoice.reason,
},
},
create: {
provider: 'stripe',
eventType: 'invoice.backfill',
externalEventId: `stripe_invoice:${invoice.stripeInvoiceId}`,
targetId: invoice.targetId,
externalInvoiceId: invoice.stripeInvoiceId,
amount: invoice.amount,
currency: invoice.currency,
occurredAt: invoice.createdAt,
processingStatus: 'processed',
processedAt: invoice.updatedAt,
metadata: {
legacySync: true,
status: invoice.status,
reason: invoice.reason,
},
},
});
}
}
async #cleanupDanglingLegacyEntitlements() {
await this.db.$executeRaw`
DELETE FROM entitlements entitlement
@@ -220,6 +455,7 @@ export class LegacyEntitlementProjectionService {
}
async #projectUserFeatures(userId: string) {
// TODO(stable-upgrade): contract legacy feature projection after old clients/resolvers are gone.
const entitlements = await this.#activeEntitlements('user', userId);
const quotaEntitlement = entitlements.find(entitlement =>
['lifetime_pro', 'pro'].includes(entitlement.plan)
@@ -262,6 +498,7 @@ export class LegacyEntitlementProjectionService {
}
async #projectWorkspaceFeatures(workspaceId: string) {
// TODO(stable-upgrade): contract legacy feature projection after old clients/resolvers are gone.
const [entitlement, resolved] = await Promise.all([
this.entitlement.getBestEntitlement('workspace', workspaceId),
this.entitlement.resolveWorkspaceEntitlement(workspaceId),
@@ -290,6 +527,7 @@ export class LegacyEntitlementProjectionService {
targetType: 'user' | 'workspace',
targetId: string
) {
// TODO(stable-upgrade): remove reverse projection after stable no longer depends on old subscriptions.
if (env.selfhosted) return;
const entitlements = await this.db.entitlement.findMany({
where: {
@@ -298,6 +298,7 @@ export class EntitlementService {
targetType: TargetType,
targetId: string
) {
// TODO(stable-upgrade): remove legacy subscription import after stable no longer writes old subscriptions.
if (env.selfhosted || targetType === 'instance') {
return;
}
@@ -324,7 +325,11 @@ export class EntitlementService {
return task;
}
async upsertFromSelfhostLicense(input: SelfhostLicenseEntitlementInput) {
async upsertFromSelfhostLicense(
input: SelfhostLicenseEntitlementInput,
options: { emit?: boolean } = {}
) {
const emit = options.emit ?? true;
const resolved = input.license
? resolveEntitlementV1({
deploymentType: 'selfhosted',
@@ -372,12 +377,16 @@ export class EntitlementService {
where: { id: entitlement.id },
data,
});
await this.emitEntitlementChanged(updated);
if (emit) {
await this.emitEntitlementChanged(updated);
}
return updated;
}
const created = await this.db.entitlement.create({ data });
await this.emitEntitlementChanged(created);
if (emit) {
await this.emitEntitlementChanged(created);
}
return created;
}
@@ -385,8 +394,10 @@ export class EntitlementService {
input: Omit<SelfhostLicenseEntitlementInput, 'license'> & {
licenseKey: string;
quantity: number;
}
},
options: { emit?: boolean } = {}
) {
const emit = options.emit ?? true;
const entitlement = await this.findBySubject(
'selfhost_license',
input.licenseKey
@@ -415,20 +426,28 @@ export class EntitlementService {
where: { id: entitlement.id },
data,
});
await this.emitEntitlementChanged(updated);
if (emit) {
await this.emitEntitlementChanged(updated);
}
return updated;
}
const created = await this.db.entitlement.create({ data });
await this.emitEntitlementChanged(created);
if (emit) {
await this.emitEntitlementChanged(created);
}
return created;
}
async markSelfhostLicenseNeedsReupload(input: {
workspaceId?: string;
licenseKey: string;
reason: string;
}) {
async markSelfhostLicenseNeedsReupload(
input: {
workspaceId?: string;
licenseKey: string;
reason: string;
},
options: { emit?: boolean } = {}
) {
const emit = options.emit ?? true;
const entitlement = await this.findBySubject(
'selfhost_license',
input.licenseKey
@@ -454,12 +473,16 @@ export class EntitlementService {
where: { id: entitlement.id },
data,
});
await this.emitEntitlementChanged(updated);
if (emit) {
await this.emitEntitlementChanged(updated);
}
return updated;
}
const created = await this.db.entitlement.create({ data });
await this.emitEntitlementChanged(created);
if (emit) {
await this.emitEntitlementChanged(created);
}
return created;
}
@@ -524,3 +524,73 @@ test('should filter docs by Doc.Publish', async t => {
t.is(docs3.length, 0);
});
test('legacy duplicate doc owner grants do not block projection', async t => {
const owner = await module.create(Mockers.User);
const secondOwner = await module.create(Mockers.User);
const workspace = await module.create(Mockers.Workspace, {
owner,
});
const docId = randomUUID();
await db.$executeRaw`
INSERT INTO workspace_pages (
workspace_id,
page_id,
public,
"defaultRole"
)
VALUES (${workspace.id}, ${docId}, false, ${DocRole.Manager})
`;
await resetProjection(workspace.id);
await db.$transaction(async tx => {
await tx.$executeRaw`
SELECT set_config('affine.permission_projection.enabled', 'off', true)
`;
await tx.$executeRaw`
INSERT INTO workspace_page_user_permissions (
workspace_id,
page_id,
user_id,
type,
created_at
)
VALUES (
${workspace.id},
${docId},
${owner.id},
${DocRole.Owner},
${new Date('2026-01-02T00:00:00Z')}
)
`;
await tx.$executeRaw`
INSERT INTO workspace_page_user_permissions (
workspace_id,
page_id,
user_id,
type,
created_at
)
VALUES (
${workspace.id},
${docId},
${secondOwner.id},
${DocRole.Owner},
${new Date('2026-01-01T00:00:00Z')}
)
`;
});
await models.permissionProjection.backfillLegacyProjection();
const projectedOwners = await db.$queryRaw<{ principalId: string }[]>`
SELECT principal_id AS "principalId"
FROM doc_grants
WHERE workspace_id = ${workspace.id}
AND doc_id = ${docId}
AND role = 'owner'
`;
t.deepEqual(projectedOwners, [{ principalId: secondOwner.id }]);
});
@@ -29,7 +29,10 @@ export class QuotaStateService {
private readonly event: EventBus
) {}
async reconcileUserQuotaState(userId: string) {
async reconcileUserQuotaState(
userId: string,
options: { emit?: boolean } = {}
) {
const [previous, entitlement, entitlements, resolved, usedStorageQuota] =
await Promise.all([
this.db.effectiveUserQuotaState.findUnique({ where: { userId } }),
@@ -72,13 +75,16 @@ export class QuotaStateService {
staleAfter: this.staleAfter(now),
},
});
if (this.userQuotaStateChanged(previous, state)) {
if ((options.emit ?? true) && this.userQuotaStateChanged(previous, state)) {
await this.event.emitAsync('user.quota_state.changed', { userId });
}
return state;
}
async reconcileWorkspaceQuotaState(workspaceId: string) {
async reconcileWorkspaceQuotaState(
workspaceId: string,
options: { emit?: boolean } = {}
) {
const owner = await this.getWorkspaceOwner(workspaceId);
const [
previous,
@@ -98,7 +104,7 @@ export class QuotaStateService {
const usesOwnerQuota = !this.hasStandaloneWorkspaceQuota(resolved.plan);
const [ownerState, ownerEntitlement] = usesOwnerQuota
? await Promise.all([
this.reconcileUserQuotaState(owner.id),
this.reconcileUserQuotaState(owner.id, options),
this.entitlement.resolveUserEntitlement(owner.id),
])
: [null, null];
@@ -156,7 +162,10 @@ export class QuotaStateService {
staleAfter: this.staleAfter(now),
},
});
if (this.workspaceQuotaStateChanged(previous, state)) {
if (
(options.emit ?? true) &&
this.workspaceQuotaStateChanged(previous, state)
) {
await this.event.emitAsync('workspace.quota_state.changed', {
workspaceId,
});
@@ -9,7 +9,7 @@ export class BackfillEntitlementProjection1765600000000 {
const projection = ref.get(LegacyEntitlementProjectionService, {
strict: false,
});
await projection.backfillEntitlementsAndQuotaStates();
await projection.shadowBackfillEntitlementsAndQuotaStates();
const quota = ref.get(QuotaStateService, { strict: false });
const [users, workspaces] = await Promise.all([
@@ -18,9 +18,12 @@ export class BackfillEntitlementProjection1765600000000 {
]);
const tasks = [
...users.map(user => () => quota.reconcileUserQuotaState(user.id)),
...users.map(
user => () => quota.reconcileUserQuotaState(user.id, { emit: false })
),
...workspaces.map(
workspace => () => quota.reconcileWorkspaceQuotaState(workspace.id)
workspace => () =>
quota.reconcileWorkspaceQuotaState(workspace.id, { emit: false })
),
];
const batchSize = 16;
@@ -268,6 +268,20 @@ export class PermissionProjectionModel extends BaseModel {
`;
await tx.$executeRaw`
WITH legacy_doc_grants AS (
SELECT
workspace_id,
page_id,
user_id,
type,
created_at,
row_number() OVER (
PARTITION BY workspace_id, page_id, affine_permission_legacy_doc_role(type)
ORDER BY created_at ASC, user_id ASC
) AS role_rank
FROM workspace_page_user_permissions
WHERE affine_permission_legacy_doc_role(type) IS NOT NULL
)
INSERT INTO doc_grants (
workspace_id,
doc_id,
@@ -291,8 +305,12 @@ export class PermissionProjectionModel extends BaseModel {
user_id,
created_at,
now()
FROM workspace_page_user_permissions
FROM legacy_doc_grants
WHERE affine_permission_legacy_doc_role(type) IS NOT NULL
AND (
affine_permission_legacy_doc_role(type) <> 'owner'
OR role_rank = 1
)
ON CONFLICT (workspace_id, doc_id, principal_type, principal_id)
DO UPDATE SET
role = EXCLUDED.role,
@@ -1,6 +1,8 @@
import type { RawBodyRequest } from '@nestjs/common';
import { Controller, Logger, Post, Req } from '@nestjs/common';
import { Prisma, PrismaClient, Provider } from '@prisma/client';
import type { Request } from 'express';
import Stripe from 'stripe';
import { Config, EventBus, InternalServerError } from '../../base';
import { Public } from '../../core/auth';
@@ -12,6 +14,7 @@ export class StripeWebhookController {
constructor(
private readonly config: Config,
private readonly db: PrismaClient,
private readonly stripeProvider: StripeFactory,
private readonly event: EventBus
) {}
@@ -33,14 +36,98 @@ export class StripeWebhookController {
`[${event.id}] Stripe Webhook {${event.type}} received.`
);
// Stripe requires responseing webhook immediately and handle event asynchronously.
const existingPaymentEvent = await this.db.paymentEvent.findUnique({
where: {
provider_externalEventId: {
provider: Provider.stripe,
externalEventId: event.id,
},
},
});
if (existingPaymentEvent?.processingStatus === 'processed') {
return;
}
const paymentEvent = existingPaymentEvent
? await this.db.paymentEvent.update({
where: { id: existingPaymentEvent.id },
data: {
eventType: event.type,
lastError: null,
metadata: event as unknown as Prisma.InputJsonValue,
},
})
: await this.db.paymentEvent.create({
data: {
provider: Provider.stripe,
eventType: event.type,
externalEventId: event.id,
occurredAt: new Date(event.created * 1000),
metadata: event as unknown as Prisma.InputJsonValue,
},
});
if (paymentEvent.processingStatus === 'processing') {
return;
}
// Stripe requires responding to webhooks immediately and handling events asynchronously.
setImmediate(() => {
this.event.emitAsync(`stripe.${event.type}` as any, event).catch(e => {
this.logger.error('Failed to handle Stripe Webhook event.', e);
this.processEvent(paymentEvent.id, event).catch(e => {
this.logger.error('Failed to persist Stripe Webhook failure.', e);
});
});
} catch (err: any) {
throw new InternalServerError(err.message);
} catch (err: unknown) {
throw new InternalServerError(
err instanceof Error ? err.message : String(err)
);
}
}
async processEvent(id: string, event: Stripe.Event) {
const stuckBefore = new Date(Date.now() - 60 * 60 * 1000);
const locked = await this.db.paymentEvent.updateMany({
where: {
id,
OR: [
{ processingStatus: { in: ['pending', 'failed'] } },
{
processingStatus: 'processing',
updatedAt: { lt: stuckBefore },
},
],
},
data: {
processingStatus: 'processing',
processingAttempts: { increment: 1 },
},
});
if (locked.count === 0) {
return;
}
try {
await this.event.emitAsync(
`stripe.${event.type}` as keyof Events,
event as never
);
await this.db.paymentEvent.update({
where: { id },
data: {
processingStatus: 'processed',
processedAt: new Date(),
lastError: null,
},
});
} catch (e) {
await this.db.paymentEvent.update({
where: { id },
data: {
processingStatus: 'failed',
lastError: e instanceof Error ? e.message : String(e),
},
});
this.logger.error('Failed to handle Stripe Webhook event.', e);
}
}
}
@@ -21,6 +21,7 @@ declare global {
'nightly.reconcileRevenueCatSubscriptions': {};
'nightly.reconcileStripeSubscriptions': {};
'nightly.reconcileStripeRefunds': {};
'nightly.replayStripeWebhookEvents': {};
'nightly.revenuecat.syncUser': { userId: string };
}
}
@@ -78,6 +79,12 @@ export class SubscriptionCronJobs {
{ jobId: 'nightly-payment-reconcile-stripe-refunds' }
);
await this.queue.add(
'nightly.replayStripeWebhookEvents',
{},
{ jobId: 'nightly-payment-replay-stripe-webhook-events' }
);
// FIXME(@forehalo): the strategy is totally wrong, for monthly plan. redesign required
// await this.queue.add(
// 'nightly.notifyAboutToExpireWorkspaceSubscriptions',
@@ -219,6 +226,64 @@ export class SubscriptionCronJobs {
await this.rcHandler.syncAppUser(payload.userId);
}
@OnJob('nightly.replayStripeWebhookEvents')
async replayStripeWebhookEvents() {
const stuckBefore = new Date(Date.now() - OneHour);
const events = await this.db.paymentEvent.findMany({
where: {
provider: Provider.stripe,
OR: [
{ processingStatus: { in: ['pending', 'failed'] } },
{ processingStatus: 'processing', updatedAt: { lt: stuckBefore } },
],
},
orderBy: { createdAt: 'asc' },
take: 100,
});
for (const event of events) {
const locked = await this.db.paymentEvent.updateMany({
where: {
id: event.id,
OR: [
{ processingStatus: { in: ['pending', 'failed'] } },
{ processingStatus: 'processing', updatedAt: { lt: stuckBefore } },
],
},
data: {
processingStatus: 'processing',
processingAttempts: { increment: 1 },
},
});
if (locked.count === 0) {
continue;
}
try {
await this.event.emitAsync(
`stripe.${event.eventType}` as keyof Events,
event.metadata as never
);
await this.db.paymentEvent.update({
where: { id: event.id },
data: {
processingStatus: 'processed',
processedAt: new Date(),
lastError: null,
},
});
} catch (e) {
await this.db.paymentEvent.update({
where: { id: event.id },
data: {
processingStatus: 'failed',
lastError: e instanceof Error ? e.message : String(e),
},
});
}
}
}
@OnJob('nightly.reconcileStripeSubscriptions')
async reconcileStripeSubscriptions() {
const stripe = this.stripeFactory.stripe;
@@ -15,6 +15,7 @@ import {
LookupKey,
SubscriptionPlan,
SubscriptionRecurring,
SubscriptionStatus,
} from '../types';
import {
activeSubscriptionWhere,
@@ -129,8 +130,9 @@ export class SelfhostTeamSubscriptionManager extends SubscriptionManager {
if (!existingSubscription) {
const key = randomUUID();
const [subscription] = await this.db.$transaction([
const [saved] = await this.db.$transaction([
this.db.subscription.create({
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
data: {
provider: Provider.stripe,
targetId: key,
@@ -148,9 +150,16 @@ export class SelfhostTeamSubscriptionManager extends SubscriptionManager {
props: { license: key },
});
return subscription;
await this.upsertStripeProviderSubscription(
key,
subscription,
subscriptionData
);
return saved;
} else {
return this.db.subscription.update({
const saved = await this.db.subscription.update({
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
where: {
stripeSubscriptionId: stripeSubscription.id,
},
@@ -162,12 +171,30 @@ export class SelfhostTeamSubscriptionManager extends SubscriptionManager {
'end',
]),
});
await this.upsertStripeProviderSubscription(
saved.targetId,
subscription,
subscriptionData
);
return saved;
}
}
async deleteStripeSubscription({
stripeSubscription,
}: KnownStripeSubscription) {
await this.db.providerSubscription.updateMany({
where: {
provider: Provider.stripe,
externalSubscriptionId: stripeSubscription.id,
},
data: {
status: SubscriptionStatus.Canceled,
canceledAt: new Date(),
periodEnd: new Date(),
},
});
const subscription = await this.db.subscription.findFirst({
where: { stripeSubscriptionId: stripeSubscription.id },
});
@@ -248,4 +275,74 @@ export class SelfhostTeamSubscriptionManager extends SubscriptionManager {
return invoiceData;
}
private async upsertStripeProviderSubscription(
targetId: string,
known: KnownStripeSubscription,
subscriptionData: Subscription
) {
const { lookupKey, stripeSubscription } = known;
const price = stripeSubscription.items.data[0]?.price;
await this.db.providerSubscription.upsert({
where: {
provider_externalSubscriptionId: {
provider: Provider.stripe,
externalSubscriptionId: stripeSubscription.id,
},
},
update: {
targetType: 'instance',
targetId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
status: stripeSubscription.status,
externalCustomerId:
typeof stripeSubscription.customer === 'string'
? stripeSubscription.customer
: stripeSubscription.customer.id,
externalProductId:
typeof price?.product === 'string'
? price.product
: price?.product?.id,
externalPriceId: price?.id,
currency: price?.currency,
amount: price?.unit_amount ?? null,
quantity: known.quantity,
periodStart: subscriptionData.start,
periodEnd: subscriptionData.end,
trialStart: subscriptionData.trialStart,
trialEnd: subscriptionData.trialEnd,
canceledAt: subscriptionData.canceledAt,
metadata: known.metadata,
},
create: {
provider: Provider.stripe,
targetType: 'instance',
targetId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
status: stripeSubscription.status,
externalCustomerId:
typeof stripeSubscription.customer === 'string'
? stripeSubscription.customer
: stripeSubscription.customer.id,
externalSubscriptionId: stripeSubscription.id,
externalProductId:
typeof price?.product === 'string'
? price.product
: price?.product?.id,
externalPriceId: price?.id,
currency: price?.currency,
amount: price?.unit_amount ?? null,
quantity: known.quantity,
periodStart: subscriptionData.start,
periodEnd: subscriptionData.end,
trialStart: subscriptionData.trialStart,
trialEnd: subscriptionData.trialEnd,
canceledAt: subscriptionData.canceledAt,
metadata: known.metadata,
},
});
}
}
@@ -1,5 +1,10 @@
import { Injectable } from '@nestjs/common';
import { PrismaClient, Provider, UserStripeCustomer } from '@prisma/client';
import { Injectable, Logger } from '@nestjs/common';
import {
Prisma,
PrismaClient,
Provider,
UserStripeCustomer,
} from '@prisma/client';
import { omit, pick } from 'lodash-es';
import Stripe from 'stripe';
import { z } from 'zod';
@@ -17,6 +22,7 @@ import {
URLHelper,
} from '../../../base';
import { EntitlementService } from '../../../core/entitlement';
import { resolveProductMapping, RevenueCatService } from '../revenuecat';
import { StripeFactory } from '../stripe';
import {
KnownStripeInvoice,
@@ -33,13 +39,9 @@ import {
CheckoutParams,
Subscription,
SubscriptionManager,
visibleSubscriptionWhere,
} from './common';
interface PriceStrategyStatus {
proSubscribed: boolean;
aiSubscribed: boolean;
}
export const UserSubscriptionIdentity = z.object({
plan: z.enum([SubscriptionPlan.Pro, SubscriptionPlan.AI]),
userId: z.string(),
@@ -54,6 +56,8 @@ export const UserSubscriptionCheckoutArgs = z.object({
@Injectable()
export class UserSubscriptionManager extends SubscriptionManager {
private readonly logger = new Logger(UserSubscriptionManager.name);
constructor(
stripeProvider: StripeFactory,
db: PrismaClient,
@@ -61,7 +65,8 @@ export class UserSubscriptionManager extends SubscriptionManager {
private readonly event: EventBus,
private readonly url: URLHelper,
private readonly mutex: Mutex,
private readonly entitlement: EntitlementService
private readonly entitlement: EntitlementService,
private readonly revenueCat: RevenueCatService
) {
super(stripeProvider, db);
}
@@ -94,27 +99,29 @@ export class UserSubscriptionManager extends SubscriptionManager {
throw new InvalidCheckoutParameters();
}
const active = await this.getActiveSubscription({
const active = await this.getVisibleSubscription({
plan: lookupKey.plan,
userId: user.id,
});
await this.assertNoActiveLocalEntitlement(user.id, lookupKey);
if (active?.provider === 'revenuecat') {
throw new ManagedByAppStoreOrPlay();
}
if (
active &&
// do not allow to re-subscribe unless
!(
active.recurring !== SubscriptionRecurring.Lifetime &&
lookupKey.recurring === SubscriptionRecurring.Lifetime
)
!this.canCheckoutWithExistingSubscription(active.recurring, lookupKey)
) {
throw new SubscriptionAlreadyExists({ plan: lookupKey.plan });
}
const customer = await this.getOrCreateCustomer(user.id);
const strategy = await this.strategyStatus(customer);
const stripeSubscriptions = await this.stripe.subscriptions.list({
customer: customer.stripeCustomerId,
status: 'all',
});
this.assertNoActiveStripeSubscription(stripeSubscriptions.data, lookupKey);
await this.assertNoActiveRevenueCatSubscription(user.id, lookupKey);
const price = await this.getPrice(lookupKey);
if (!price || !(await this.isPriceAvailable(price))) {
@@ -138,8 +145,11 @@ export class UserSubscriptionManager extends SubscriptionManager {
return { allow_promotion_codes: true };
})();
const trials = (() => {
if (lookupKey.plan === SubscriptionPlan.AI && !strategy.aiSubscribed) {
const subscriptionData = await (async () => {
if (
lookupKey.plan === SubscriptionPlan.AI &&
!(await this.hasUsedTrial(user.id, lookupKey.plan))
) {
return {
trial_period_days: 7,
} as Stripe.Checkout.SessionCreateParams.SubscriptionData;
@@ -158,12 +168,10 @@ export class UserSubscriptionManager extends SubscriptionManager {
}
: {
mode: 'subscription' as const,
subscription_data: {
...trials,
},
subscription_data: subscriptionData,
};
return this.stripe.checkout.sessions.create({
const session = await this.stripe.checkout.sessions.create({
customer: customer.stripeCustomerId,
line_items: [
{
@@ -175,6 +183,17 @@ export class UserSubscriptionManager extends SubscriptionManager {
...discounts,
success_url: this.url.safeLink(params.successCallbackLink || '/'),
});
if (subscriptionData?.trial_period_days) {
await this.recordTrialUsage({
userId: user.id,
provider: Provider.stripe,
externalRef: session.id,
metadata: { source: 'checkout_session' },
});
}
return session;
}
async getSubscription(args: z.infer<typeof UserSubscriptionIdentity>) {
@@ -196,6 +215,16 @@ export class UserSubscriptionManager extends SubscriptionManager {
});
}
async getVisibleSubscription(args: z.infer<typeof UserSubscriptionIdentity>) {
return this.db.subscription.findFirst({
where: {
targetId: args.userId,
plan: args.plan,
...visibleSubscriptionWhere(),
},
});
}
async saveStripeSubscription(subscription: KnownStripeSubscription) {
const { userId, lookupKey, stripeSubscription } = subscription;
this.assertUserIdExists(userId);
@@ -220,23 +249,54 @@ export class UserSubscriptionManager extends SubscriptionManager {
}
const subscriptionData = this.transformSubscription(subscription);
await this.upsertStripeProviderSubscription(subscription, subscriptionData);
const saved = await this.db.subscription.upsert({
where: {
stripeSubscriptionId: stripeSubscription.id,
},
update: pick(subscriptionData, [
'status',
'stripeScheduleId',
'nextBillAt',
'canceledAt',
'end',
]),
create: {
targetId: userId,
...omit(subscriptionData, ['provider', 'iapStore']),
},
if (
lookupKey.plan === SubscriptionPlan.AI &&
(stripeSubscription.status === SubscriptionStatus.Trialing ||
stripeSubscription.trial_start ||
stripeSubscription.trial_end)
) {
await this.recordTrialUsage({
userId,
provider: Provider.stripe,
externalRef: stripeSubscription.id,
metadata: { source: 'stripe_subscription' },
});
}
const existingByStripeId = await this.db.subscription.findUnique({
where: { stripeSubscriptionId: stripeSubscription.id },
});
const saved = existingByStripeId
? await this.db.subscription.update({
where: { id: existingByStripeId.id },
data: pick(subscriptionData, [
'status',
'stripeScheduleId',
'nextBillAt',
'canceledAt',
'end',
]),
})
: await this.db.subscription.upsert({
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
// TODO(stable-upgrade): remove reliance on target_id_plan unique slot after contract cleanup.
where: { targetId_plan: { targetId: userId, plan: lookupKey.plan } },
update: {
...omit(subscriptionData, ['provider', 'iapStore']),
provider: Provider.stripe,
iapStore: null,
rcEntitlement: null,
rcProductId: null,
rcExternalRef: null,
},
create: {
targetId: userId,
...omit(subscriptionData, ['provider', 'iapStore']),
},
});
await this.entitlement.upsertFromCloudSubscription(saved);
return saved;
}
@@ -247,6 +307,17 @@ export class UserSubscriptionManager extends SubscriptionManager {
stripeSubscription,
}: KnownStripeSubscription) {
this.assertUserIdExists(userId);
await this.db.providerSubscription.updateMany({
where: {
provider: Provider.stripe,
externalSubscriptionId: stripeSubscription.id,
},
data: {
status: SubscriptionStatus.Canceled,
canceledAt: new Date(),
periodEnd: new Date(),
},
});
const result = await this.db.subscription.deleteMany({
where: {
stripeSubscriptionId: stripeSubscription.id,
@@ -311,6 +382,7 @@ export class UserSubscriptionManager extends SubscriptionManager {
this.assertUserIdExists(userId);
const invoiceData = await this.transformInvoice(knownInvoice);
await this.upsertStripePaymentEvent(knownInvoice, invoiceData);
const invoice = await this.db.invoice.upsert({
where: {
@@ -357,6 +429,7 @@ export class UserSubscriptionManager extends SubscriptionManager {
if (prevSubscription) {
if (prevSubscription.stripeSubscriptionId) {
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
const subscription = await this.db.subscription.update({
where: {
id: prevSubscription.id,
@@ -382,6 +455,7 @@ export class UserSubscriptionManager extends SubscriptionManager {
);
}
} else {
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
const subscription = await this.db.subscription.create({
data: {
targetId: knownInvoice.userId,
@@ -420,6 +494,7 @@ export class UserSubscriptionManager extends SubscriptionManager {
return;
}
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
await this.db.subscription.update({
where: {
id: subscription.id,
@@ -463,6 +538,7 @@ export class UserSubscriptionManager extends SubscriptionManager {
: Date.now() / 1000);
if (subscription) {
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
const saved = await this.db.subscription.update({
where: { id: subscription.id },
data: {
@@ -475,6 +551,7 @@ export class UserSubscriptionManager extends SubscriptionManager {
});
await this.entitlement.upsertFromCloudSubscription(saved);
} else {
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
const saved = await this.db.subscription.create({
data: {
targetId: userId,
@@ -530,33 +607,282 @@ export class UserSubscriptionManager extends SubscriptionManager {
return lookupKey.variant === null;
}
private async strategyStatus(
customer: UserStripeCustomer
): Promise<PriceStrategyStatus> {
let proSubscribed = false;
let aiSubscribed = false;
const subscriptions = await this.stripe.subscriptions.list({
customer: customer.stripeCustomerId,
status: 'all',
private async assertNoActiveLocalEntitlement(
userId: string,
lookupKey: LookupKey
) {
const entitlements = await this.entitlement.getActiveEntitlements(
'user',
userId
);
const existing = entitlements.find(entitlement => {
if (lookupKey.plan === SubscriptionPlan.Pro) {
return (
entitlement.plan === 'pro' || entitlement.plan === 'lifetime_pro'
);
}
if (lookupKey.plan === SubscriptionPlan.AI) {
return entitlement.plan === 'ai';
}
return false;
});
if (!existing) {
return;
}
for (const sub of subscriptions.data) {
const lookupKey = retriveLookupKeyFromStripeSubscription(sub);
if (!lookupKey) {
const metadata = existing.metadata as { provider?: string | null };
if (metadata.provider === Provider.revenuecat) {
throw new ManagedByAppStoreOrPlay();
}
if (
!this.canCheckoutWithExistingSubscription(
(existing.metadata as { recurring?: string | null }).recurring ??
SubscriptionRecurring.Monthly,
lookupKey
)
) {
throw new SubscriptionAlreadyExists({ plan: lookupKey.plan });
}
}
private async hasUsedTrial(userId: string, plan: SubscriptionPlan) {
return !!(await this.db.subscriptionTrialUsage.findUnique({
where: {
targetType_targetId_plan: {
targetType: 'user',
targetId: userId,
plan,
},
},
select: { id: true },
}));
}
private async recordTrialUsage(input: {
userId: string;
provider: Provider;
externalRef: string | null;
metadata: Record<string, unknown>;
}) {
await this.db.subscriptionTrialUsage.upsert({
where: {
targetType_targetId_plan: {
targetType: 'user',
targetId: input.userId,
plan: SubscriptionPlan.AI,
},
},
update: {
provider: input.provider,
externalRef: input.externalRef,
metadata: input.metadata as Prisma.InputJsonObject,
},
create: {
targetType: 'user',
targetId: input.userId,
plan: SubscriptionPlan.AI,
provider: input.provider,
externalRef: input.externalRef,
metadata: input.metadata as Prisma.InputJsonObject,
},
});
}
private async upsertStripeProviderSubscription(
known: KnownStripeSubscription,
subscriptionData: Subscription
) {
const { userId, lookupKey, stripeSubscription } = known;
this.assertUserIdExists(userId);
const price = stripeSubscription.items.data[0]?.price;
await this.db.providerSubscription.upsert({
where: {
provider_externalSubscriptionId: {
provider: Provider.stripe,
externalSubscriptionId: stripeSubscription.id,
},
},
update: {
targetType: 'user',
targetId: userId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
status: stripeSubscription.status,
externalCustomerId:
typeof stripeSubscription.customer === 'string'
? stripeSubscription.customer
: stripeSubscription.customer.id,
externalProductId:
typeof price?.product === 'string'
? price.product
: price?.product?.id,
externalPriceId: price?.id,
currency: price?.currency,
amount: price?.unit_amount ?? null,
quantity: known.quantity,
periodStart: subscriptionData.start,
periodEnd: subscriptionData.end,
trialStart: subscriptionData.trialStart,
trialEnd: subscriptionData.trialEnd,
canceledAt: subscriptionData.canceledAt,
metadata: known.metadata,
},
create: {
provider: Provider.stripe,
targetType: 'user',
targetId: userId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
status: stripeSubscription.status,
externalCustomerId:
typeof stripeSubscription.customer === 'string'
? stripeSubscription.customer
: stripeSubscription.customer.id,
externalSubscriptionId: stripeSubscription.id,
externalProductId:
typeof price?.product === 'string'
? price.product
: price?.product?.id,
externalPriceId: price?.id,
currency: price?.currency,
amount: price?.unit_amount ?? null,
quantity: known.quantity,
periodStart: subscriptionData.start,
periodEnd: subscriptionData.end,
trialStart: subscriptionData.trialStart,
trialEnd: subscriptionData.trialEnd,
canceledAt: subscriptionData.canceledAt,
metadata: known.metadata,
},
});
}
private async upsertStripePaymentEvent(
known: KnownStripeInvoice,
invoiceData: Awaited<
ReturnType<UserSubscriptionManager['transformInvoice']>
>
) {
const { userId, lookupKey, stripeInvoice } = known;
this.assertUserIdExists(userId);
await this.db.paymentEvent.upsert({
where: {
provider_externalEventId: {
provider: Provider.stripe,
externalEventId: `stripe_invoice:${stripeInvoice.id}`,
},
},
update: {
eventType: `invoice.${invoiceData.status}`,
targetType: 'user',
targetId: userId,
externalInvoiceId: stripeInvoice.id,
plan: lookupKey.plan,
amount: invoiceData.amount,
currency: invoiceData.currency,
occurredAt:
typeof stripeInvoice.created === 'number'
? new Date(stripeInvoice.created * 1000)
: undefined,
processingStatus: 'processed',
processedAt: new Date(),
metadata: known.metadata,
},
create: {
provider: Provider.stripe,
eventType: `invoice.${invoiceData.status}`,
externalEventId: `stripe_invoice:${stripeInvoice.id}`,
targetType: 'user',
targetId: userId,
externalInvoiceId: stripeInvoice.id,
plan: lookupKey.plan,
amount: invoiceData.amount,
currency: invoiceData.currency,
occurredAt:
typeof stripeInvoice.created === 'number'
? new Date(stripeInvoice.created * 1000)
: undefined,
processingStatus: 'processed',
processedAt: new Date(),
metadata: known.metadata,
},
});
}
private isCurrentStripeSubscription(subscription: Stripe.Subscription) {
return [
SubscriptionStatus.Active,
SubscriptionStatus.Trialing,
SubscriptionStatus.PastDue,
].includes(subscription.status as SubscriptionStatus);
}
private canCheckoutWithExistingSubscription(
existingRecurring: string,
lookupKey: LookupKey
) {
return (
existingRecurring !== SubscriptionRecurring.Lifetime &&
lookupKey.recurring === SubscriptionRecurring.Lifetime
);
}
private assertNoActiveStripeSubscription(
subscriptions: Stripe.Subscription[],
lookupKey: LookupKey
) {
for (const subscription of subscriptions) {
if (!this.isCurrentStripeSubscription(subscription)) {
continue;
}
if (lookupKey.plan === SubscriptionPlan.Pro) {
proSubscribed = true;
}
if (lookupKey.plan === SubscriptionPlan.AI) {
aiSubscribed = true;
const activeLookupKey =
retriveLookupKeyFromStripeSubscription(subscription);
if (
activeLookupKey?.plan === lookupKey.plan &&
!this.canCheckoutWithExistingSubscription(
activeLookupKey.recurring,
lookupKey
)
) {
throw new SubscriptionAlreadyExists({ plan: lookupKey.plan });
}
}
}
return { proSubscribed, aiSubscribed };
private async assertNoActiveRevenueCatSubscription(
userId: string,
lookupKey: LookupKey
) {
if (!this.config.payment.revenuecat?.enabled) {
return;
}
let subscriptions: Awaited<
ReturnType<RevenueCatService['getSubscriptions']>
>;
try {
subscriptions = await this.revenueCat.getSubscriptions(userId);
} catch (e) {
this.logger.warn(
`Failed to fetch RevenueCat subscriptions for ${userId}`,
e
);
return;
}
const productMap = this.config.payment.revenuecat?.productMap;
if (
subscriptions?.some(subscription => {
if (!subscription.isActive) return false;
const mapping = resolveProductMapping(subscription, productMap);
return mapping?.plan === lookupKey.plan;
})
) {
throw new ManagedByAppStoreOrPlay();
}
}
private assertUserIdExists(
@@ -139,6 +139,11 @@ export class WorkspaceSubscriptionManager extends SubscriptionManager {
}
const subscriptionData = this.transformSubscription(subscription);
await this.upsertStripeProviderSubscription(
workspaceId,
subscription,
subscriptionData
);
if (
stripeSubscription.status === SubscriptionStatus.Active ||
@@ -159,6 +164,8 @@ export class WorkspaceSubscriptionManager extends SubscriptionManager {
}
const saved = await this.db.subscription.upsert({
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
// TODO(stable-upgrade): remove reliance on target_id_plan unique slot after contract cleanup.
where: {
provider: Provider.stripe,
stripeSubscriptionId: stripeSubscription.id,
@@ -194,6 +201,17 @@ export class WorkspaceSubscriptionManager extends SubscriptionManager {
);
}
await this.db.providerSubscription.updateMany({
where: {
provider: Provider.stripe,
externalSubscriptionId: stripeSubscription.id,
},
data: {
status: SubscriptionStatus.Canceled,
canceledAt: new Date(),
periodEnd: new Date(),
},
});
const result = await this.db.subscription.deleteMany({
where: { stripeSubscriptionId: stripeSubscription.id },
});
@@ -337,4 +355,74 @@ export class WorkspaceSubscriptionManager extends SubscriptionManager {
await schedule.updateQuantity(count);
}
}
private async upsertStripeProviderSubscription(
workspaceId: string,
known: KnownStripeSubscription,
subscriptionData: Subscription
) {
const { lookupKey, stripeSubscription } = known;
const price = stripeSubscription.items.data[0]?.price;
await this.db.providerSubscription.upsert({
where: {
provider_externalSubscriptionId: {
provider: Provider.stripe,
externalSubscriptionId: stripeSubscription.id,
},
},
update: {
targetType: 'workspace',
targetId: workspaceId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
status: stripeSubscription.status,
externalCustomerId:
typeof stripeSubscription.customer === 'string'
? stripeSubscription.customer
: stripeSubscription.customer.id,
externalProductId:
typeof price?.product === 'string'
? price.product
: price?.product?.id,
externalPriceId: price?.id,
currency: price?.currency,
amount: price?.unit_amount ?? null,
quantity: known.quantity,
periodStart: subscriptionData.start,
periodEnd: subscriptionData.end,
trialStart: subscriptionData.trialStart,
trialEnd: subscriptionData.trialEnd,
canceledAt: subscriptionData.canceledAt,
metadata: known.metadata,
},
create: {
provider: Provider.stripe,
targetType: 'workspace',
targetId: workspaceId,
plan: lookupKey.plan,
recurring: lookupKey.recurring,
status: stripeSubscription.status,
externalCustomerId:
typeof stripeSubscription.customer === 'string'
? stripeSubscription.customer
: stripeSubscription.customer.id,
externalSubscriptionId: stripeSubscription.id,
externalProductId:
typeof price?.product === 'string'
? price.product
: price?.product?.id,
externalPriceId: price?.id,
currency: price?.currency,
amount: price?.unit_amount ?? null,
quantity: known.quantity,
periodStart: subscriptionData.start,
periodEnd: subscriptionData.end,
trialStart: subscriptionData.trialStart,
trialEnd: subscriptionData.trialEnd,
canceledAt: subscriptionData.canceledAt,
metadata: known.metadata,
},
});
}
}
@@ -12,7 +12,8 @@ import {
ResolveField,
Resolver,
} from '@nestjs/graphql';
import { PrismaClient, Provider, type User } from '@prisma/client';
import type { Entitlement, User } from '@prisma/client';
import { PrismaClient, Provider } from '@prisma/client';
import { GraphQLJSONObject } from 'graphql-scalars';
import { groupBy } from 'lodash-es';
import Stripe from 'stripe';
@@ -27,15 +28,11 @@ import {
WorkspaceIdRequiredToUpdateTeamSubscription,
} from '../../base';
import { CurrentUser, Public } from '../../core/auth';
import { EntitlementService } from '../../core/entitlement';
import { PermissionAccess } from '../../core/permission';
import { UserType } from '../../core/user';
import { WorkspaceType } from '../../core/workspaces';
import {
Invoice,
Subscription,
visibleSubscriptionWhere,
WorkspaceSubscriptionManager,
} from './manager';
import { Invoice, Subscription, visibleSubscriptionWhere } from './manager';
import { RevenueCatWebhookHandler } from './revenuecat';
import { CheckoutParams, SubscriptionService } from './service';
import {
@@ -463,6 +460,7 @@ export class SubscriptionResolver {
export class UserSubscriptionResolver {
constructor(
private readonly db: PrismaClient,
private readonly entitlement: EntitlementService,
private readonly rcHandler: RevenueCatWebhookHandler
) {}
@@ -473,6 +471,90 @@ export class UserSubscriptionResolver {
return s;
}
private async currentUserSubscriptions(userId: string) {
const entitlements = (
await this.entitlement.getActiveEntitlements('user', userId)
).filter(
entitlement =>
entitlement.source === 'cloud_subscription' &&
['pro', 'lifetime_pro', 'ai'].includes(entitlement.plan)
);
const providerFacts = await this.db.providerSubscription.findMany({
where: {
targetType: 'user',
targetId: userId,
plan: {
in: entitlements.map(entitlement =>
this.subscriptionPlan(entitlement.plan)
),
},
status: {
in: [
SubscriptionStatus.Active,
SubscriptionStatus.Trialing,
SubscriptionStatus.PastDue,
],
},
OR: [{ periodEnd: null }, { periodEnd: { gt: new Date() } }],
},
orderBy: { updatedAt: 'desc' },
});
return entitlements.map(entitlement => {
const plan = this.subscriptionPlan(entitlement.plan);
const providerFact = providerFacts.find(
fact => fact.targetId === userId && fact.plan === plan
);
const metadata = entitlement.metadata as {
provider?: string | null;
recurring?: string | null;
variant?: string | null;
stripeSubscriptionId?: string | null;
};
return this.normalizeSubscription({
stripeSubscriptionId:
providerFact?.externalSubscriptionId ??
metadata.stripeSubscriptionId ??
null,
stripeScheduleId: null,
status: providerFact?.status ?? this.subscriptionStatus(entitlement),
plan,
recurring:
providerFact?.recurring ??
metadata.recurring ??
(entitlement.plan === 'lifetime_pro'
? SubscriptionRecurring.Lifetime
: SubscriptionRecurring.Monthly),
variant:
metadata.variant ??
(entitlement.plan === 'lifetime_pro'
? SubscriptionVariant.Onetime
: null),
quantity: entitlement.quantity ?? 1,
start: entitlement.startsAt ?? entitlement.createdAt,
end: entitlement.expiresAt,
trialStart: providerFact?.trialStart ?? null,
trialEnd: providerFact?.trialEnd ?? entitlement.graceUntil,
nextBillAt: providerFact?.periodEnd ?? entitlement.expiresAt,
canceledAt: providerFact?.canceledAt ?? null,
provider: providerFact?.provider ?? metadata.provider ?? null,
iapStore: providerFact?.iapStore ?? null,
});
});
}
private subscriptionPlan(plan: string) {
return plan === 'lifetime_pro' ? SubscriptionPlan.Pro : plan;
}
private subscriptionStatus(entitlement: Entitlement) {
if (entitlement.status === 'grace') {
return SubscriptionStatus.PastDue;
}
return SubscriptionStatus.Active;
}
@ResolveField(() => [SubscriptionType])
async subscriptions(
@CurrentUser() me: User,
@@ -482,18 +564,7 @@ export class UserSubscriptionResolver {
throw new AccessDenied();
}
const subscriptions = await this.db.subscription.findMany({
where: {
targetId: user.id,
...visibleSubscriptionWhere(),
},
});
subscriptions.forEach(subscription =>
this.normalizeSubscription(subscription)
);
return subscriptions;
return this.currentUserSubscriptions(user.id);
}
@ResolveField(() => Int, {
@@ -560,17 +631,10 @@ export class UserSubscriptionResolver {
try {
await this.rcHandler.syncAppUserWithExternalRef(user.id, transactionId);
current = await this.db.subscription.findMany({
where: {
targetId: user.id,
...visibleSubscriptionWhere(),
},
});
current = await this.currentUserSubscriptions(user.id);
// ignore errors
} catch {}
current.forEach(subscription => this.normalizeSubscription(subscription));
return current;
}
@@ -612,39 +676,93 @@ export class UserSubscriptionResolver {
if (shouldSync) {
try {
await this.rcHandler.syncAppUser(user.id);
current = await this.db.subscription.findMany({
where: {
targetId: user.id,
...visibleSubscriptionWhere(),
},
});
// ignore errors
} catch {}
}
current.forEach(subscription => this.normalizeSubscription(subscription));
return current;
return this.currentUserSubscriptions(user.id);
}
}
@Resolver(() => WorkspaceType)
export class WorkspaceSubscriptionResolver {
constructor(
private readonly service: WorkspaceSubscriptionManager,
private readonly db: PrismaClient,
private readonly entitlement: EntitlementService,
private readonly ac: PermissionAccess
) {}
private async currentWorkspaceSubscription(workspaceId: string) {
const entitlement = await this.entitlement.getBestEntitlement(
'workspace',
workspaceId
);
if (
!entitlement ||
entitlement.source !== 'cloud_subscription' ||
entitlement.plan !== 'team'
) {
return null;
}
const providerFact = await this.db.providerSubscription.findFirst({
where: {
targetType: 'workspace',
targetId: workspaceId,
plan: SubscriptionPlan.Team,
status: {
in: [
SubscriptionStatus.Active,
SubscriptionStatus.Trialing,
SubscriptionStatus.PastDue,
],
},
OR: [{ periodEnd: null }, { periodEnd: { gt: new Date() } }],
},
orderBy: { updatedAt: 'desc' },
});
const metadata = entitlement.metadata as {
provider?: string | null;
recurring?: string | null;
variant?: string | null;
stripeSubscriptionId?: string | null;
};
return {
stripeSubscriptionId:
providerFact?.externalSubscriptionId ??
metadata.stripeSubscriptionId ??
null,
stripeScheduleId: null,
status:
providerFact?.status ??
(entitlement.status === 'grace'
? SubscriptionStatus.PastDue
: SubscriptionStatus.Active),
plan: SubscriptionPlan.Team,
recurring:
providerFact?.recurring ??
metadata.recurring ??
SubscriptionRecurring.Monthly,
variant: metadata.variant ?? null,
quantity: entitlement.quantity ?? 1,
start: entitlement.startsAt ?? entitlement.createdAt,
end: entitlement.expiresAt,
trialStart: providerFact?.trialStart ?? null,
trialEnd: providerFact?.trialEnd ?? entitlement.graceUntil,
nextBillAt: providerFact?.periodEnd ?? entitlement.expiresAt,
canceledAt: providerFact?.canceledAt ?? null,
provider: providerFact?.provider ?? metadata.provider ?? null,
iapStore: providerFact?.iapStore ?? null,
};
}
@ResolveField(() => SubscriptionType, {
nullable: true,
description: 'The team subscription of the workspace, if exists.',
})
async subscription(@Parent() workspace: WorkspaceType) {
return this.service.getActiveSubscription({
plan: SubscriptionPlan.Team,
workspaceId: workspace.id,
});
return this.currentWorkspaceSubscription(workspace.id);
}
@ResolveField(() => Int, {
@@ -165,6 +165,85 @@ export class RevenueCatWebhookHandler {
const end = overrideExpirationDate || sub.expirationDate || null;
const nextBillAt = end; // period end serves as next bill anchor for IAP
if (rcExternalRef && iapStore) {
await this.db.providerSubscription.upsert({
where: {
provider_iapStore_externalRef_externalProductId_externalCustomerId:
{
provider: Provider.revenuecat,
iapStore,
externalRef: rcExternalRef,
externalProductId: sub.productId,
externalCustomerId: sub.customerId,
},
},
update: {
targetType: 'user',
targetId: appUserId,
plan: mapping.plan,
recurring: mapping.recurring,
status,
externalCustomerId: sub.customerId,
externalProductId: sub.productId,
iapStore,
externalRef: rcExternalRef,
periodStart: start,
periodEnd: end,
canceledAt,
metadata: {
entitlement: sub.identifier,
isTrial: sub.isTrial,
willRenew: sub.willRenew,
},
},
create: {
provider: Provider.revenuecat,
targetType: 'user',
targetId: appUserId,
plan: mapping.plan,
recurring: mapping.recurring,
status,
externalCustomerId: sub.customerId,
externalProductId: sub.productId,
iapStore,
externalRef: rcExternalRef,
periodStart: start,
periodEnd: end,
canceledAt,
metadata: {
entitlement: sub.identifier,
isTrial: sub.isTrial,
willRenew: sub.willRenew,
},
},
});
}
if (mapping.plan === SubscriptionPlan.AI && sub.isTrial) {
await this.db.subscriptionTrialUsage.upsert({
where: {
targetType_targetId_plan: {
targetType: 'user',
targetId: appUserId,
plan: SubscriptionPlan.AI,
},
},
update: {},
create: {
targetType: 'user',
targetId: appUserId,
plan: SubscriptionPlan.AI,
provider: Provider.revenuecat,
externalRef: rcExternalRef,
firstUsedAt: start,
metadata: {
entitlement: sub.identifier,
productId: sub.productId,
},
},
});
}
// Mutual exclusion: skip if Stripe already active for the same plan
const conflict = await this.db.subscription.findFirst({
where: {
@@ -214,6 +293,8 @@ export class RevenueCatWebhookHandler {
}
const saved = await this.db.subscription.upsert({
// TODO(stable-upgrade): remove legacy subscriptions dual-write after stable supports provider facts.
// TODO(stable-upgrade): remove reliance on target_id_plan unique slot after contract cleanup.
where: {
targetId_plan: { targetId: appUserId, plan: mapping.plan },
},
@@ -629,6 +629,7 @@ export class SubscriptionService {
`Failed to handle ${reason} for invoice ${invoiceId}`,
e
);
throw e;
}
}