fix(server): query & backfill perf (#15144)

#### PR Dependency Tree


* **PR #15144** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Document history retention is now explicitly controlled via
caller-provided max-age parameters during pending doc compaction.

* **Improvements**
* Quota state backfilling/reconciliation was improved to reduce
unnecessary work and ensure missing quota states are created in batches.
* Permission context loading now more strictly respects “known” vs
“stale” quota runtime state.

* **Bug Fixes**
* Workspace member responses now populate invite IDs correctly from the
nested user information.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-06-23 10:08:24 +08:00
committed by GitHub
parent fa488aee64
commit f44a7978d9
10 changed files with 331 additions and 207 deletions
+5 -1
View File
@@ -13,8 +13,12 @@ export declare class BackendRuntime {
*
* Do not use this for snapshots that will be sent back to yjs clients until
* the y-octo/yjs round-trip compatibility issue is resolved.
*
* The caller owns quota reconciliation and must pass a fresh
* historyMaxAgeSeconds value. The compactor intentionally does not read
* effective workspace quota state.
*/
compactPendingDocUpdates(workspaceId: string, docId: string, batchLimit: number, historyMinIntervalMs: number, owner: string, leaseTtlMs: number): Promise<RuntimeDocCompactionResult>
compactPendingDocUpdates(workspaceId: string, docId: string, batchLimit: number, historyMinIntervalMs: number, historyMaxAgeSeconds: number, owner: string, leaseTtlMs: number): Promise<RuntimeDocCompactionResult>
upsertDocSnapshot(workspaceId: string, docId: string, blob: Buffer, timestampMs: number, editorId?: string | undefined | null): Promise<boolean>
createDocHistory(input: RuntimeDocHistoryInput): Promise<boolean>
deleteDocStorage(workspaceId: string, docId: string): Promise<void>
@@ -1,4 +1,3 @@
pub(super) const DEFAULT_HISTORY_PERIOD_SECONDS: i32 = 7 * 24 * 60 * 60;
pub(super) const BYOK_LOCAL_LEASE_ACTIVE_PURPOSE: &str = "copilot_byok_local_lease:active";
pub(super) const BYOK_LOCAL_LEASE_PURPOSE: &str = "copilot_byok_local_lease";
pub(super) const MAGIC_LINK_OTP_PURPOSE: &str = "magic_link_otp";
@@ -3,9 +3,7 @@ use napi::Result;
use sqlx::{FromRow, PgPool, Postgres, Row, Transaction};
use y_octo::Doc;
use super::{
BackendRuntime, constants::DEFAULT_HISTORY_PERIOD_SECONDS, error::napi_error, types::RuntimeDocCompactionResult,
};
use super::{BackendRuntime, error::napi_error, types::RuntimeDocCompactionResult};
#[derive(FromRow)]
struct SnapshotRow {
@@ -36,6 +34,7 @@ impl DocCompactorStore {
doc_id: &str,
batch_limit: i64,
history_min_interval_ms: i64,
history_max_age_seconds: i64,
) -> Result<(i64, bool)> {
compact_doc(
self.pool.clone(),
@@ -43,6 +42,7 @@ impl DocCompactorStore {
doc_id,
batch_limit,
history_min_interval_ms,
history_max_age_seconds,
)
.await
}
@@ -64,6 +64,14 @@ fn apply_updates(updates: impl IntoIterator<Item = Vec<u8>>) -> Result<Vec<u8>>
.map_err(|err| napi_error(format!("DocCompactor encode failed: {err}")))
}
fn checked_milliseconds(value: i64, field: &str) -> Result<Duration> {
Duration::try_milliseconds(value).ok_or_else(|| napi_error(format!("DocCompactor {field} is too large")))
}
fn checked_seconds(value: i64, field: &str) -> Result<Duration> {
Duration::try_seconds(value).ok_or_else(|| napi_error(format!("DocCompactor {field} is too large")))
}
async fn load_snapshot(
tx: &mut Transaction<'_, Postgres>,
workspace_id: &str,
@@ -186,27 +194,13 @@ async fn should_create_history(
return Ok(false);
}
Ok(last_timestamp < snapshot.updated_at - Duration::milliseconds(history_min_interval_ms))
}
let min_interval = checked_milliseconds(history_min_interval_ms, "history interval")?;
let threshold = snapshot
.updated_at
.checked_sub_signed(min_interval)
.ok_or_else(|| napi_error("DocCompactor history interval is out of range"))?;
async fn history_max_age_seconds(tx: &mut Transaction<'_, Postgres>, workspace_id: &str) -> Result<i32> {
let row = sqlx::query(
r#"
SELECT history_period_seconds
FROM effective_workspace_quota_states
WHERE workspace_id = $1
"#,
)
.bind(workspace_id)
.fetch_optional(&mut **tx)
.await
.map_err(|err| napi_error(format!("DocCompactor load history quota failed: {err}")))?;
Ok(
row
.map(|row| row.get("history_period_seconds"))
.unwrap_or(DEFAULT_HISTORY_PERIOD_SECONDS),
)
Ok(last_timestamp < threshold)
}
async fn create_history(
@@ -214,13 +208,16 @@ async fn create_history(
workspace_id: &str,
doc_id: &str,
snapshot: &SnapshotRow,
max_age_seconds: i64,
) -> Result<bool> {
let max_age_seconds = history_max_age_seconds(tx, workspace_id).await?;
if max_age_seconds <= 0 {
return Ok(false);
}
let expired_at = Utc::now() + Duration::seconds(max_age_seconds as i64);
let max_age = checked_seconds(max_age_seconds, "history max age")?;
let expired_at = Utc::now()
.checked_add_signed(max_age)
.ok_or_else(|| napi_error("DocCompactor history max age is out of range"))?;
sqlx::query(
r#"
INSERT INTO snapshot_histories
@@ -274,6 +271,7 @@ async fn compact_doc(
doc_id: &str,
batch_limit: i64,
history_min_interval_ms: i64,
history_max_age_seconds: i64,
) -> Result<(i64, bool)> {
let mut tx = pool
.begin()
@@ -317,7 +315,7 @@ async fn compact_doc(
&& let Some(snapshot) = &snapshot
&& should_create_history(&mut tx, snapshot, workspace_id, doc_id, history_min_interval_ms).await?
{
history_created = create_history(&mut tx, workspace_id, doc_id, snapshot).await?;
history_created = create_history(&mut tx, workspace_id, doc_id, snapshot, history_max_age_seconds).await?;
}
let timestamps = updates.iter().map(|update| update.created_at).collect::<Vec<_>>();
@@ -336,13 +334,20 @@ impl BackendRuntime {
///
/// Do not use this for snapshots that will be sent back to yjs clients until
/// the y-octo/yjs round-trip compatibility issue is resolved.
///
/// The caller owns quota reconciliation and must pass a fresh
/// history_max_age_seconds value. The compactor intentionally does not read
/// effective_workspace_quota_states; if a future caller cannot provide a
/// fresh quota state, fail and retry after Node reconciles it.
#[napi]
#[allow(clippy::too_many_arguments)]
pub async fn compact_pending_doc_updates(
&self,
workspace_id: String,
doc_id: String,
batch_limit: i64,
history_min_interval_ms: i64,
history_max_age_seconds: i64,
owner: String,
lease_ttl_ms: i64,
) -> Result<RuntimeDocCompactionResult> {
@@ -352,6 +357,16 @@ impl BackendRuntime {
if history_min_interval_ms < 0 {
return Err(napi_error("doc compactor history interval must be non-negative"));
}
if history_max_age_seconds < 0 {
return Err(napi_error("doc compactor history max age must be non-negative"));
}
checked_milliseconds(history_min_interval_ms, "history interval")?;
if history_max_age_seconds > 0 {
let max_age = checked_seconds(history_max_age_seconds, "history max age")?;
Utc::now()
.checked_add_signed(max_age)
.ok_or_else(|| napi_error("DocCompactor history max age is out of range"))?;
}
let lease_key = format!("doc:update:{workspace_id}:{doc_id}");
let Some(lease) = self.acquire_coordination_lease(lease_key, owner, lease_ttl_ms).await? else {
@@ -366,7 +381,13 @@ impl BackendRuntime {
};
let result = DocCompactorStore::new(self.pool().await?)
.compact_doc(&workspace_id, &doc_id, batch_limit, history_min_interval_ms)
.compact_doc(
&workspace_id,
&doc_id,
batch_limit,
history_min_interval_ms,
history_max_age_seconds,
)
.await;
let released = self
@@ -20,6 +20,8 @@ type Metadata = {
legacyProjected?: boolean;
};
const BACKFILL_BATCH_SIZE = 1000;
@Injectable()
export class LegacyEntitlementProjectionService {
constructor(
@@ -111,11 +113,23 @@ export class LegacyEntitlementProjectionService {
}: {
cleanupLegacy: boolean;
}) {
const [subscriptions, users, workspaces] = await Promise.all([
this.db.subscription.findMany(),
this.db.user.findMany({ select: { id: true } }),
this.db.workspace.findMany({ select: { id: true } }),
]);
const [subscriptionCount, invoiceCount, installedLicenseCount] =
await Promise.all([
this.db.subscription.count(),
this.db.invoice.count(),
this.db.installedLicense.count(),
]);
if (
subscriptionCount === 0 &&
invoiceCount === 0 &&
installedLicenseCount === 0
) {
await this.#backfillQuotaStateStaleFlags();
return;
}
const subscriptions = await this.db.subscription.findMany();
for (const subscription of subscriptions) {
if (!(await this.#subscriptionTargetExists(subscription))) {
@@ -148,44 +162,89 @@ export class LegacyEntitlementProjectionService {
await this.#backfillPaymentEvents();
await this.scanInstalledLicenses({ emit: cleanupLegacy });
await this.#backfillQuotaStateStaleFlags();
}
async #backfillQuotaStateStaleFlags() {
await Promise.all([
...users.map(user =>
this.db.effectiveUserQuotaState.upsert({
where: { userId: user.id },
update: { stale: true },
create: {
userId: user.id,
plan: 'free',
blobLimit: BigInt(0),
storageQuota: BigInt(0),
usedStorageQuota: BigInt(0),
historyPeriodSeconds: 0,
known: false,
stale: true,
},
})
),
...workspaces.map(workspace =>
this.db.effectiveWorkspaceQuotaState.upsert({
where: { workspaceId: workspace.id },
update: { stale: true },
create: {
workspaceId: workspace.id,
plan: 'free',
usesOwnerQuota: true,
seatLimit: 0,
memberCount: 0,
overcapacityMemberCount: 0,
blobLimit: BigInt(0),
storageQuota: BigInt(0),
usedStorageQuota: BigInt(0),
historyPeriodSeconds: 0,
known: false,
stale: true,
},
})
),
this.db.effectiveUserQuotaState.updateMany({
data: { stale: true },
}),
this.db.effectiveWorkspaceQuotaState.updateMany({
data: { stale: true },
}),
]);
await Promise.all([
this.#createMissingUserQuotaStates(),
this.#createMissingWorkspaceQuotaStates(),
]);
}
async #createMissingUserQuotaStates() {
let lastId: string | undefined;
while (true) {
const users = await this.db.user.findMany({
select: { id: true },
where: lastId ? { id: { gt: lastId } } : undefined,
orderBy: { id: 'asc' },
take: BACKFILL_BATCH_SIZE,
});
if (!users.length) {
break;
}
await this.db.effectiveUserQuotaState.createMany({
data: users.map(user => ({
userId: user.id,
plan: 'free',
blobLimit: BigInt(0),
storageQuota: BigInt(0),
usedStorageQuota: BigInt(0),
historyPeriodSeconds: 0,
known: false,
stale: true,
})),
skipDuplicates: true,
});
lastId = users.at(-1)?.id;
}
}
async #createMissingWorkspaceQuotaStates() {
let lastId: string | undefined;
while (true) {
const workspaces = await this.db.workspace.findMany({
select: { id: true },
where: lastId ? { id: { gt: lastId } } : undefined,
orderBy: { id: 'asc' },
take: BACKFILL_BATCH_SIZE,
});
if (!workspaces.length) {
break;
}
await this.db.effectiveWorkspaceQuotaState.createMany({
data: workspaces.map(workspace => ({
workspaceId: workspace.id,
plan: 'free',
usesOwnerQuota: true,
seatLimit: 0,
memberCount: 0,
overcapacityMemberCount: 0,
blobLimit: BigInt(0),
storageQuota: BigInt(0),
usedStorageQuota: BigInt(0),
historyPeriodSeconds: 0,
known: false,
stale: true,
})),
skipDuplicates: true,
});
lastId = workspaces.at(-1)?.id;
}
}
async #backfillProviderSubscription(subscription: {
@@ -310,7 +310,7 @@ export class PermissionContextLoader {
private async workspaceRuntime(workspaceId: string) {
return this.memo(this.cache.workspaceRuntime, workspaceId, () =>
this.models.workspaceRuntimeState.get(workspaceId).then(async state => {
if (state.known || !state.stale) {
if (state.known && !state.stale) {
return state;
}
+33 -60
View File
@@ -49,31 +49,21 @@ export class QuotaStateService {
};
const now = new Date();
const update = {
plan: resolved.plan,
sourceEntitlementId: entitlement?.id ?? null,
...this.quotaData(resolved.quota),
usedStorageQuota,
flags,
known: true,
stale: false,
lastReconciledAt: now,
staleAfter: this.staleAfter(now),
};
const state = await this.db.effectiveUserQuotaState.upsert({
where: { userId },
update: {
plan: resolved.plan,
sourceEntitlementId: entitlement?.id ?? null,
...this.quotaData(resolved.quota),
usedStorageQuota,
flags,
known: true,
stale: false,
lastReconciledAt: now,
staleAfter: this.staleAfter(now),
},
create: {
userId,
plan: resolved.plan,
sourceEntitlementId: entitlement?.id ?? null,
...this.quotaData(resolved.quota),
usedStorageQuota,
flags,
known: true,
stale: false,
lastReconciledAt: now,
staleAfter: this.staleAfter(now),
},
update,
create: { userId, ...update },
});
if ((options.emit ?? true) && this.userQuotaStateChanged(previous, state)) {
await this.event.emitAsync('user.quota_state.changed', { userId });
@@ -122,45 +112,28 @@ export class QuotaStateService {
].filter((reason): reason is string => !!reason);
const now = new Date();
const update = {
plan,
sourceEntitlementId: entitlement?.id ?? null,
ownerUserId: owner.id,
usesOwnerQuota,
seatLimit,
memberCount,
overcapacityMemberCount,
...this.workspaceQuotaData(quota),
usedStorageQuota,
readonly: readonlyReasons.length > 0,
readonlyReasons,
flags: resolved.flags,
known: true,
stale: false,
lastReconciledAt: now,
staleAfter: this.staleAfter(now),
};
const state = await this.db.effectiveWorkspaceQuotaState.upsert({
where: { workspaceId },
update: {
plan,
sourceEntitlementId: entitlement?.id ?? null,
ownerUserId: owner.id,
usesOwnerQuota,
seatLimit,
memberCount,
overcapacityMemberCount,
...this.workspaceQuotaData(quota),
usedStorageQuota,
readonly: readonlyReasons.length > 0,
readonlyReasons,
flags: resolved.flags,
known: true,
stale: false,
lastReconciledAt: now,
staleAfter: this.staleAfter(now),
},
create: {
workspaceId,
plan,
sourceEntitlementId: entitlement?.id ?? null,
ownerUserId: owner.id,
usesOwnerQuota,
seatLimit,
memberCount,
overcapacityMemberCount,
...this.workspaceQuotaData(quota),
usedStorageQuota,
readonly: readonlyReasons.length > 0,
readonlyReasons,
flags: resolved.flags,
known: true,
stale: false,
lastReconciledAt: now,
staleAfter: this.staleAfter(now),
},
update,
create: { workspaceId, ...update },
});
if (
(options.emit ?? true) &&
@@ -48,7 +48,7 @@ function serializeWorkspaceMember(
avatarUrl: row.user.avatarUrl ?? null,
permission: role,
role,
inviteId: row.id,
inviteId: row.user.id,
emailVerified: null,
status: row.status,
};
@@ -156,11 +156,11 @@ export class WorkspaceMemberResolver {
first: take ?? 8,
});
return list.map(({ id, status, type, user }) => ({
return list.map(({ status, type, user }) => ({
...user,
permission: Number(type),
role: Number(type),
inviteId: id,
inviteId: user?.id ?? '',
status,
}));
} else {
@@ -169,11 +169,11 @@ export class WorkspaceMemberResolver {
first: take ?? 8,
});
return list.map(({ id, status, type, user }) => ({
return list.map(({ status, type, user }) => ({
...user,
permission: Number(type),
role: Number(type),
inviteId: id,
inviteId: user?.id ?? '',
status,
}));
}
@@ -1,7 +1,6 @@
import { ModuleRef } from '@nestjs/core';
import { PrismaClient } from '@prisma/client';
import { WorkspacePolicyService } from '../../core/permission/policy';
import { Models } from '../../models';
export class BackfillPermissionProjection1765500000000 {
@@ -10,20 +9,7 @@ export class BackfillPermissionProjection1765500000000 {
await models.permissionProjection.backfillLegacyProjection();
await ensureWorkspaceAdminStatsDirtyTriggerGuard(db);
await repairOwnerlessWorkspaces(db);
const policy = ref.get(WorkspacePolicyService, { strict: false });
const workspaces = await db.workspace.findMany({
select: { id: true },
});
for (const workspace of workspaces) {
const state = await policy.getWorkspaceState(workspace.id);
await models.workspaceRuntimeState.upsert(workspace.id, {
readonly: state.isReadonly,
readonlyReasons: state.readonlyReasons,
known: true,
staleAfter: null,
});
}
await backfillUnknownQuotaRuntimeStates(db);
}
static async down(_db: PrismaClient) {}
@@ -55,40 +41,125 @@ async function ensureWorkspaceAdminStatsDirtyTriggerGuard(db: PrismaClient) {
`;
}
async function repairOwnerlessWorkspaces(db: PrismaClient) {
async function backfillUnknownQuotaRuntimeStates(db: PrismaClient) {
await db.$executeRaw`
WITH ownerless AS (
SELECT w.id
FROM workspaces w
WHERE NOT EXISTS (
SELECT 1
FROM workspace_members owner
WHERE owner.workspace_id = w.id
AND owner.role = 'owner'
AND owner.state = 'active'
)
),
accepted_members AS (
SELECT id
FROM (
SELECT
wm.id,
row_number() OVER (
PARTITION BY wm.workspace_id
ORDER BY wm.created_at ASC, wm.id ASC
) AS rn
FROM workspace_members wm
JOIN ownerless o ON o.id = wm.workspace_id
WHERE wm.state = 'active'
) ranked
WHERE rn = 1
)
UPDATE workspace_members wm
SET role = 'owner', updated_at = now()
FROM accepted_members am
WHERE wm.id = am.id
`;
INSERT INTO effective_user_quota_states (
user_id,
plan,
source_entitlement_id,
blob_limit,
storage_quota,
used_storage_quota,
history_period_seconds,
copilot_action_limit,
flags,
known,
stale,
last_reconciled_at,
stale_after
)
SELECT
users.id,
'free',
NULL,
0,
0,
0,
0,
NULL,
'{}'::jsonb,
false,
true,
NULL,
NULL
FROM users
ON CONFLICT (user_id)
DO UPDATE SET
stale = true,
updated_at = now()
`;
await db.$executeRaw`
WITH owners AS (
SELECT workspace_id, user_id
FROM workspace_members
WHERE role = 'owner'
AND state = 'active'
)
INSERT INTO effective_workspace_quota_states (
workspace_id,
plan,
source_entitlement_id,
owner_user_id,
uses_owner_quota,
seat_limit,
member_count,
overcapacity_member_count,
blob_limit,
storage_quota,
used_storage_quota,
history_period_seconds,
readonly,
readonly_reasons,
flags,
known,
stale,
last_reconciled_at,
stale_after
)
SELECT
workspaces.id,
'free',
NULL,
owners.user_id,
true,
0,
0,
0,
0,
0,
0,
0,
false,
ARRAY[]::text[],
'{}'::jsonb,
false,
true,
NULL,
NULL
FROM workspaces
JOIN owners ON owners.workspace_id = workspaces.id
ON CONFLICT (workspace_id)
DO UPDATE SET
stale = true,
updated_at = now()
`;
await db.$executeRaw`
INSERT INTO workspace_runtime_states (
workspace_id,
known,
readonly,
readonly_reasons,
last_reconciled_at,
stale_after,
updated_at
)
SELECT
workspace_id,
false,
false,
ARRAY[]::text[],
NULL,
NULL,
now()
FROM effective_workspace_quota_states
ON CONFLICT (workspace_id)
DO NOTHING
`;
}
async function repairOwnerlessWorkspaces(db: PrismaClient) {
await db.$executeRaw`
DELETE FROM workspaces w
WHERE NOT EXISTS (
@@ -105,4 +176,24 @@ async function repairOwnerlessWorkspaces(db: PrismaClient) {
AND member.state = 'active'
)
`;
await db.$executeRaw`
WITH accepted_members AS (
SELECT DISTINCT ON (wm.workspace_id) wm.id
FROM workspace_members wm
WHERE wm.state = 'active'
AND NOT EXISTS (
SELECT 1
FROM workspace_members owner
WHERE owner.workspace_id = wm.workspace_id
AND owner.role = 'owner'
AND owner.state = 'active'
)
ORDER BY wm.workspace_id, wm.created_at ASC, wm.id ASC
)
UPDATE workspace_members wm
SET role = 'owner', updated_at = now()
FROM accepted_members am
WHERE wm.id = am.id
`;
}
@@ -2,36 +2,13 @@ import { ModuleRef } from '@nestjs/core';
import { PrismaClient } from '@prisma/client';
import { LegacyEntitlementProjectionService } from '../../core/entitlement';
import { QuotaStateService } from '../../core/quota/state';
export class BackfillEntitlementProjection1765600000000 {
static async up(db: PrismaClient, ref: ModuleRef) {
static async up(_db: PrismaClient, ref: ModuleRef) {
const projection = ref.get(LegacyEntitlementProjectionService, {
strict: false,
});
await projection.shadowBackfillEntitlementsAndQuotaStates();
const quota = ref.get(QuotaStateService, { strict: false });
const [users, workspaces] = await Promise.all([
db.user.findMany({ select: { id: true } }),
db.workspace.findMany({ select: { id: true } }),
]);
const tasks = [
...users.map(
user => () => quota.reconcileUserQuotaState(user.id, { emit: false })
),
...workspaces.map(
workspace => () =>
quota.reconcileWorkspaceQuotaState(workspace.id, { emit: false })
),
];
const batchSize = 16;
for (let index = 0; index < tasks.length; index += batchSize) {
await Promise.all(
tasks.slice(index, index + batchSize).map(task => task())
);
}
}
static async down(_db: PrismaClient) {}