From 0a422aa158288d4f256094a71aa3310f049b33f8 Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Mon, 29 Jun 2026 00:02:38 +0800 Subject: [PATCH] feat(server): blob reconciliation (#15165) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### PR Dependency Tree * **PR #15165** 👈 This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal) ## Summary by CodeRabbit * **New Features** * Added automated backend maintenance for missing blob metadata backfill, document-to-blob reference rebuilding, and unreferenced blob cleanup planning/execution. * Introduced scheduled batch processing (workspace-paged) and paginated object-storage listing. * **Bug Fixes** * Improved reliability of object-storage reads by treating expected “not found” results as non-errors. * Strengthened blob/expired cleanup flows with runtime-driven batching and reduced coupling to metadata synchronization. * **Tests** * Expanded unit and e2e coverage for partial blob metadata and updated runtime/job cleanup test assertions. --- .docker/selfhost/schema.json | 12 + packages/backend/native/dts-header.d.ts | 6 + packages/backend/native/index.d.ts | 48 ++ packages/backend/native/package.json | 1 + .../src/backend_runtime/blob_cleanup.rs | 647 ++++++++++++++++++ .../backend_runtime/blob_reconciliation.rs | 276 ++++++++ .../native/src/backend_runtime/config.rs | 27 +- .../src/backend_runtime/doc_blob_refs.rs | 416 +++++++++++ .../backend/native/src/backend_runtime/mod.rs | 3 + .../backend_runtime/object_storage/client.rs | 72 +- .../src/backend_runtime/object_storage/mod.rs | 14 + .../backend_runtime/object_storage/types.rs | 14 +- .../sql/runtime_migrations.sql | 72 ++ .../native/src/backend_runtime/tests.rs | 4 + .../native/src/backend_runtime/types.rs | 43 ++ .../server/src/__tests__/auth/job.spec.ts | 27 +- .../server/src/__tests__/doc/cron.spec.ts | 24 +- .../src/__tests__/models/session.spec.ts | 25 - .../storage/blob-upload-cleanup.spec.ts | 33 +- .../src/__tests__/workspace/blobs.e2e.ts | 53 +- packages/backend/server/src/app.module.ts | 2 + .../server/src/base/job/queue/config.ts | 8 + .../backend/server/src/base/job/queue/def.ts | 1 + .../backend/server/src/core/auth/index.ts | 9 +- packages/backend/server/src/core/auth/job.ts | 9 +- .../backend-runtime/__tests__/job.spec.ts | 57 ++ .../__tests__/provider.spec.ts | 43 ++ .../src/core/backend-runtime/blob-job.ts | 368 ++++++++++ .../server/src/core/backend-runtime/index.ts | 18 + .../server/src/core/backend-runtime/job.ts | 58 ++ .../src/core/backend-runtime/provider.ts | 137 ++++ packages/backend/server/src/core/doc/index.ts | 3 +- packages/backend/server/src/core/doc/job.ts | 9 +- .../backend/server/src/core/storage/index.ts | 2 + .../backend/server/src/core/storage/job.ts | 39 +- .../server/src/core/storage/wrappers/blob.ts | 85 +-- .../server/src/models/__tests__/blob.spec.ts | 25 - packages/backend/server/src/models/blob.ts | 26 - packages/backend/server/src/models/history.ts | 17 - packages/backend/server/src/models/session.ts | 13 - packages/backend/server/src/native.ts | 8 + packages/frontend/admin/src/config.json | 4 + 42 files changed, 2494 insertions(+), 264 deletions(-) create mode 100644 packages/backend/native/dts-header.d.ts create mode 100644 packages/backend/native/src/backend_runtime/blob_cleanup.rs create mode 100644 packages/backend/native/src/backend_runtime/blob_reconciliation.rs create mode 100644 packages/backend/native/src/backend_runtime/doc_blob_refs.rs create mode 100644 packages/backend/server/src/core/backend-runtime/__tests__/job.spec.ts create mode 100644 packages/backend/server/src/core/backend-runtime/__tests__/provider.spec.ts create mode 100644 packages/backend/server/src/core/backend-runtime/blob-job.ts create mode 100644 packages/backend/server/src/core/backend-runtime/index.ts create mode 100644 packages/backend/server/src/core/backend-runtime/job.ts create mode 100644 packages/backend/server/src/core/backend-runtime/provider.ts diff --git a/.docker/selfhost/schema.json b/.docker/selfhost/schema.json index c10bef1e3b..a72973aa20 100644 --- a/.docker/selfhost/schema.json +++ b/.docker/selfhost/schema.json @@ -121,6 +121,18 @@ "default": { "concurrency": 1 } + }, + "queues.backendRuntime": { + "type": "object", + "description": "The config for backend runtime job queue\n@default {\"concurrency\":1}", + "properties": { + "concurrency": { + "type": "number" + } + }, + "default": { + "concurrency": 1 + } } } }, diff --git a/packages/backend/native/dts-header.d.ts b/packages/backend/native/dts-header.d.ts new file mode 100644 index 0000000000..fe97a9944f --- /dev/null +++ b/packages/backend/native/dts-header.d.ts @@ -0,0 +1,6 @@ +/* auto-generated by NAPI-RS */ +/* eslint-disable */ + +declare const _default: typeof import('./index') +export default _default + diff --git a/packages/backend/native/index.d.ts b/packages/backend/native/index.d.ts index a3c0e7d045..5d10672190 100644 --- a/packages/backend/native/index.d.ts +++ b/packages/backend/native/index.d.ts @@ -1,13 +1,22 @@ /* auto-generated by NAPI-RS */ /* eslint-disable */ + +declare const _default: typeof import('./index') +export default _default + export declare class BackendRuntime { + planUnreferencedWorkspaceBlobs(workspaceId: string, gracePeriodDays: number, limit: number): Promise + executeBlobCleanupCandidates(runId: string, gracePeriodDays: number, limit: number): Promise completeBlobUpload(workspaceId: string, key: string, expectedSize: number, expectedMime: string): Promise completeFsBlobUpload(root: string, bucket: string, workspaceId: string, key: string, expectedSize: number, expectedMime: string): Promise cleanupExpiredPendingBlobs(cutoffMs: number, limit: number): Promise releaseDeletedBlobs(workspaceId: string, limit: number): Promise + backfillMissingBlobMetadata(workspaceId: string | undefined | null, limit: number): Promise acquireCoordinationLease(key: string, owner: string, ttlMs: number): Promise releaseCoordinationLease(key: string, owner: string, fencingToken: bigint | number): Promise renewCoordinationLease(key: string, owner: string, fencingToken: bigint | number, ttlMs: number): Promise + rebuildDocBlobRefs(workspaceId: string, docId: string): Promise + rebuildWorkspaceDocBlobRefs(workspaceId: string, limit: number): Promise /** * Merge pending doc updates with y-octo and persist the merged snapshot. * @@ -816,6 +825,25 @@ export declare function resolveEntitlementV1(input: ResolveEntitlementInput): Re export declare function runNativeActionRecipePreparedStream(input: ActionRuntimeInput, callback: ((err: Error | null, arg: string) => void)): LlmStreamHandle +export interface RuntimeBlobCleanupExecuteResult { + scannedCandidates: number + deletedObjects: number + deletedMetadata: number + skippedStillReferenced: number + failed: number + workspaceIds: Array +} + +export interface RuntimeBlobCleanupPlanResult { + runId?: string + scannedBlobs: number + candidatesMarked: number + protectedByDocRefs: number + protectedByMetadata: number + protectedByOtherRefs: number + nextCursor?: string +} + export interface RuntimeBlobCleanupResult { scanned: number deleted: number @@ -831,12 +859,32 @@ export interface RuntimeBlobCompleteResult { lastModifiedMs?: number } +export interface RuntimeBlobMetadataBackfillResult { + scannedObjects: number + headedObjects: number + upsertedMetadata: number + skippedExisting: number + skippedWorkspaceMissing: number + failed: number + nextCursor?: string + workspaceIds: Array +} + export interface RuntimeByokLocalLeaseRecord { leaseId: string payload: any expiresAtMs: number } +export interface RuntimeDocBlobRefsResult { + scannedDocs: number + parsedDocs: number + refsWritten: number + refsDeleted: number + failedDocs: number + nextCursor?: string +} + export interface RuntimeDocCompactionResult { leaseAcquired: boolean merged: boolean diff --git a/packages/backend/native/package.json b/packages/backend/native/package.json index e769fd3d9d..2c8dd200fe 100644 --- a/packages/backend/native/package.json +++ b/packages/backend/native/package.json @@ -16,6 +16,7 @@ }, "napi": { "binaryName": "server-native", + "dtsHeaderFile": "dts-header.d.ts", "targets": [ "aarch64-apple-darwin", "aarch64-unknown-linux-gnu", diff --git a/packages/backend/native/src/backend_runtime/blob_cleanup.rs b/packages/backend/native/src/backend_runtime/blob_cleanup.rs new file mode 100644 index 0000000000..203e077b56 --- /dev/null +++ b/packages/backend/native/src/backend_runtime/blob_cleanup.rs @@ -0,0 +1,647 @@ +use chrono::{DateTime, Duration, Utc}; +use napi::Result; +use sqlx::{FromRow, PgPool}; + +use super::{ + BackendRuntime, + error::napi_error, + types::{RuntimeBlobCleanupExecuteResult, RuntimeBlobCleanupPlanResult}, +}; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn blob_cleanup_plan_result_keeps_run_id_for_execute() { + let result = RuntimeBlobCleanupPlanResult { + run_id: Some("00000000-0000-0000-0000-000000000000".to_string()), + scanned_blobs: 1, + candidates_marked: 1, + protected_by_doc_refs: 0, + protected_by_metadata: 0, + protected_by_other_refs: 0, + next_cursor: None, + }; + + assert!(result.run_id.is_some()); + assert_eq!(result.candidates_marked, 1); + } + + #[test] + fn blob_cleanup_execute_result_tracks_skipped_and_failed_counts() { + let result = RuntimeBlobCleanupExecuteResult { + scanned_candidates: 3, + deleted_objects: 1, + deleted_metadata: 1, + skipped_still_referenced: 1, + failed: 1, + workspace_ids: vec!["workspace".to_string()], + }; + + assert_eq!(result.scanned_candidates, 3); + assert_eq!( + result.skipped_still_referenced + result.failed + result.deleted_objects, + 3 + ); + } +} + +#[derive(FromRow)] +struct BlobCandidateRow { + workspace_id: String, + key: String, + size: i32, +} + +#[derive(FromRow)] +struct MarkedCandidateRow { + workspace_id: String, + blob_key: String, +} + +fn push_workspace_once(workspace_ids: &mut Vec, workspace_id: &str) { + if !workspace_ids.iter().any(|id| id == workspace_id) { + workspace_ids.push(workspace_id.to_string()); + } +} + +async fn checkpoint_completed(pool: &PgPool, kind: &str, scope: &str) -> Result { + sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM blob_reconciliation_checkpoints WHERE kind = $1 AND scope = $2 AND status = \ + 'completed')", + ) + .bind(kind) + .bind(scope) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup checkpoint check failed: {err}"))) +} + +async fn projection_is_stale(pool: &PgPool, workspace_id: &str) -> Result { + let checkpoint_fresh = checkpoint_completed(pool, "doc_blob_refs", workspace_id).await?; + let has_stale_rows = sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM doc_blob_refs WHERE workspace_id = $1 AND status <> 'fresh')", + ) + .bind(workspace_id) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup projection freshness check failed: {err}")))?; + Ok(!checkpoint_fresh || has_stale_rows) +} + +async fn stale_projection_workspaces(pool: &PgPool, workspace_id: &str) -> Result> { + if projection_is_stale(pool, workspace_id).await? { + Ok(vec![workspace_id.to_string()]) + } else { + Ok(Vec::new()) + } +} + +async fn metadata_backfill_is_complete(pool: &PgPool, workspace_id: &str) -> Result { + checkpoint_completed(pool, "blob_metadata_backfill", workspace_id).await +} + +async fn has_doc_ref(pool: &PgPool, workspace_id: &str, key: &str) -> Result { + sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM doc_blob_refs WHERE workspace_id = $1 AND blob_key = $2 AND status = 'fresh')", + ) + .bind(workspace_id) + .bind(key) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup doc ref check failed: {err}"))) +} + +async fn has_other_ref(pool: &PgPool, workspace_id: &str, key: &str) -> Result { + let required_ref = sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND avatar_key = $2) + OR EXISTS(SELECT 1 FROM ai_transcript_tasks WHERE workspace_id = $1 AND blob_id = $2) + OR EXISTS(SELECT 1 FROM ai_jobs WHERE workspace_id = $1 AND blob_id = $2) + OR EXISTS( + SELECT 1 + FROM ai_contexts c + JOIN ai_sessions_metadata s ON s.id = c.session_id + WHERE s.workspace_id = $1 + AND jsonb_path_exists( + c.config::jsonb, + '$.** ? (@ == $blobKey)', + jsonb_build_object('blobKey', to_jsonb($2::text)) + ) + ) + "#, + ) + .bind(workspace_id) + .bind(key) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup protected ref check failed: {err}")))?; + if required_ref { + return Ok(true); + } + if table_exists(pool, "ai_workspace_files").await? + && sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM ai_workspace_files WHERE workspace_id = $1 AND blob_id = $2)", + ) + .bind(workspace_id) + .bind(key) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup workspace file ref check failed: {err}")))? + { + return Ok(true); + } + if table_exists(pool, "ai_workspace_blob_embeddings").await? + && sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM ai_workspace_blob_embeddings WHERE workspace_id = $1 AND blob_id = $2)", + ) + .bind(workspace_id) + .bind(key) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup workspace blob embedding ref check failed: {err}")))? + { + return Ok(true); + } + Ok(false) +} + +async fn table_exists(pool: &PgPool, table: &str) -> Result { + sqlx::query_scalar::<_, bool>("SELECT to_regclass($1) IS NOT NULL") + .bind(format!("public.{table}")) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup table existence check failed: {err}"))) +} + +async fn load_completed_blobs( + pool: &PgPool, + workspace_id: &str, + after_key: Option<&str>, + limit: i64, +) -> Result> { + sqlx::query_as::<_, BlobCandidateRow>( + r#" + SELECT workspace_id, key, size + FROM blobs + WHERE workspace_id = $1 + AND status = 'completed' + AND deleted_at IS NULL + AND ($2::text IS NULL OR key > $2) + ORDER BY key ASC + LIMIT $3 + "#, + ) + .bind(workspace_id) + .bind(after_key) + .bind(limit) + .fetch_all(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup load completed blobs failed: {err}"))) +} + +async fn load_plan_cursor(pool: &PgPool, workspace_id: &str) -> Result> { + let row = sqlx::query_as::<_, (String, serde_json::Value)>( + "SELECT status, cursor FROM blob_reconciliation_checkpoints WHERE kind = 'blob_cleanup_plan' AND scope = $1", + ) + .bind(workspace_id) + .fetch_optional(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup plan checkpoint load failed: {err}")))?; + let Some((status, cursor)) = row else { + return Ok(None); + }; + if status == "completed" { + return Ok(None); + } + Ok({ + cursor + .get("lastBlobKey") + .and_then(|value| value.as_str()) + .map(ToString::to_string) + }) +} + +async fn upsert_plan_checkpoint( + pool: &PgPool, + workspace_id: &str, + last_blob_key: Option<&str>, + completed: bool, +) -> Result<()> { + let status = if completed { "completed" } else { "running" }; + sqlx::query( + r#" + INSERT INTO blob_reconciliation_checkpoints + (kind, scope, status, cursor, last_key, completed_at) + VALUES ('blob_cleanup_plan', $1, $2, $3, $4, CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END) + ON CONFLICT (kind, scope) DO UPDATE + SET status = EXCLUDED.status, + cursor = EXCLUDED.cursor, + last_key = COALESCE(EXCLUDED.last_key, blob_reconciliation_checkpoints.last_key), + completed_at = CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END, + updated_at = CURRENT_TIMESTAMP + "#, + ) + .bind(workspace_id) + .bind(status) + .bind(serde_json::json!({ "lastBlobKey": last_blob_key })) + .bind(last_blob_key) + .bind(completed) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup plan checkpoint write failed: {err}")))?; + Ok(()) +} + +async fn create_run(pool: &PgPool, workspace_id: &str) -> Result { + sqlx::query_scalar::<_, String>( + r#" + INSERT INTO blob_reconciliation_runs (kind, mode, status, workspace_id) + VALUES ('blob_cleanup_plan', 'mark_only', 'running', $1) + RETURNING id::text + "#, + ) + .bind(workspace_id) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup create run failed: {err}"))) +} + +async fn finish_run( + pool: &PgPool, + run_id: &str, + workspace_id: &str, + result: &RuntimeBlobCleanupPlanResult, + stale_projection_workspaces: Vec, +) -> Result<()> { + let candidate_bytes = sqlx::query_scalar::<_, Option>( + "SELECT SUM(object_size)::bigint FROM blob_cleanup_candidates WHERE run_id = $1::uuid AND status = 'marked'", + ) + .bind(run_id) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup candidate bytes audit failed: {err}")))? + .unwrap_or(0); + sqlx::query( + r#" + UPDATE blob_reconciliation_runs + SET status = 'finished', + finished_at = CURRENT_TIMESTAMP, + scanned = $2, + changed = $3, + metadata = $4 + WHERE id = $1::uuid + "#, + ) + .bind(run_id) + .bind(result.scanned_blobs as i32) + .bind(result.candidates_marked as i32) + .bind(serde_json::json!({ + "protectedByDocRefs": result.protected_by_doc_refs, + "protectedByMetadata": result.protected_by_metadata, + "protectedByOtherRefs": result.protected_by_other_refs, + "topWorkspaceCandidateBytes": [{ + "workspaceId": workspace_id, + "candidateBytes": candidate_bytes, + }], + "staleOrFailedProjectionWorkspaces": stale_projection_workspaces, + })) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup finish run failed: {err}")))?; + Ok(()) +} + +async fn mark_candidate_status( + pool: &PgPool, + run_id: &str, + workspace_id: &str, + blob_key: &str, + status: &str, + evidence: serde_json::Value, + error: Option<&str>, +) -> Result<()> { + sqlx::query( + r#" + UPDATE blob_cleanup_candidates + SET status = $3, + executed_at = CURRENT_TIMESTAMP, + evidence = evidence || $4, + error = $5 + WHERE workspace_id = $1 AND blob_key = $2 AND run_id = $6::uuid + "#, + ) + .bind(workspace_id) + .bind(blob_key) + .bind(status) + .bind(evidence) + .bind(error) + .bind(run_id) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup mark candidate status failed: {err}")))?; + Ok(()) +} + +async fn finish_execute_run(pool: &PgPool, run_id: &str, result: &RuntimeBlobCleanupExecuteResult) -> Result<()> { + sqlx::query( + r#" + UPDATE blob_reconciliation_runs + SET status = 'finished', + finished_at = CURRENT_TIMESTAMP, + scanned = $2, + changed = $3, + failed = $4, + metadata = metadata || $5 + WHERE id = $1::uuid + "#, + ) + .bind(run_id) + .bind(result.scanned_candidates as i32) + .bind(result.deleted_metadata as i32) + .bind(result.failed as i32) + .bind(serde_json::json!({ + "deletedObjects": result.deleted_objects, + "deletedMetadata": result.deleted_metadata, + "skippedStillReferenced": result.skipped_still_referenced, + "failed": result.failed, + })) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup execute run finish failed: {err}")))?; + Ok(()) +} + +async fn mark_candidate( + pool: &PgPool, + run_id: &str, + row: &BlobCandidateRow, + object_size: i64, + object_last_modified: DateTime, +) -> Result { + let result = sqlx::query( + r#" + INSERT INTO blob_cleanup_candidates + (workspace_id, blob_key, reason, status, object_size, object_last_modified, run_id, evidence) + VALUES ($1, $2, 'unreferenced_completed_blob', 'marked', $3, $4, $5::uuid, $6) + ON CONFLICT (workspace_id, blob_key) DO UPDATE + SET reason = EXCLUDED.reason, + status = 'marked', + object_size = EXCLUDED.object_size, + object_last_modified = EXCLUDED.object_last_modified, + planned_at = CURRENT_TIMESTAMP, + run_id = EXCLUDED.run_id, + evidence = EXCLUDED.evidence, + error = NULL + "#, + ) + .bind(&row.workspace_id) + .bind(&row.key) + .bind(object_size) + .bind(object_last_modified) + .bind(run_id) + .bind(serde_json::json!({ "metadataSize": row.size })) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup mark candidate failed: {err}")))?; + Ok(result.rows_affected() as i64) +} + +async fn load_marked_candidates(pool: &PgPool, run_id: &str, limit: i64) -> Result> { + sqlx::query_as::<_, MarkedCandidateRow>( + r#" + SELECT workspace_id, blob_key + FROM blob_cleanup_candidates + WHERE run_id = $1::uuid AND status IN ('marked', 'failed') + ORDER BY CASE WHEN status = 'marked' THEN 0 ELSE 1 END, planned_at ASC + LIMIT $2 + "#, + ) + .bind(run_id) + .bind(limit) + .fetch_all(pool) + .await + .map_err(|err| napi_error(format!("Blob cleanup load marked candidates failed: {err}"))) +} + +#[napi_derive::napi] +impl BackendRuntime { + #[napi] + pub async fn plan_unreferenced_workspace_blobs( + &self, + workspace_id: String, + grace_period_days: i64, + limit: i64, + ) -> Result { + if limit <= 0 { + return Err(napi_error("blob cleanup plan limit must be positive")); + } + if grace_period_days < 0 { + return Err(napi_error("blob cleanup grace period must be non-negative")); + } + + let pool = self.pool().await?; + let run_id = create_run(&pool, &workspace_id).await?; + let mut result = RuntimeBlobCleanupPlanResult { + run_id: Some(run_id.clone()), + scanned_blobs: 0, + candidates_marked: 0, + protected_by_doc_refs: 0, + protected_by_metadata: 0, + protected_by_other_refs: 0, + next_cursor: None, + }; + + let cursor = load_plan_cursor(&pool, &workspace_id).await?; + let stale_projection_workspaces = stale_projection_workspaces(&pool, &workspace_id).await?; + if !metadata_backfill_is_complete(&pool, &workspace_id).await? || !stale_projection_workspaces.is_empty() { + result.protected_by_metadata = load_completed_blobs(&pool, &workspace_id, cursor.as_deref(), limit) + .await? + .len() as i64; + finish_run(&pool, &run_id, &workspace_id, &result, stale_projection_workspaces).await?; + return Ok(result); + } + + let min_last_modified = Utc::now() - Duration::days(grace_period_days); + let rows = load_completed_blobs(&pool, &workspace_id, cursor.as_deref(), limit).await?; + let has_more = rows.len() == limit as usize; + let mut last_blob_key = None; + for row in rows { + result.scanned_blobs += 1; + last_blob_key = Some(row.key.clone()); + if has_doc_ref(&pool, &row.workspace_id, &row.key).await? { + result.protected_by_doc_refs += 1; + continue; + } + if has_other_ref(&pool, &row.workspace_id, &row.key).await? { + result.protected_by_other_refs += 1; + continue; + } + let object_key = format!("{}/{}", row.workspace_id, row.key); + let Some(metadata) = self.object_storage_head(object_key).await? else { + result.protected_by_metadata += 1; + continue; + }; + let last_modified = DateTime::::from_timestamp_millis(metadata.last_modified_ms) + .ok_or_else(|| napi_error("blob cleanup object last modified is invalid"))?; + if metadata.content_length != row.size as i64 || last_modified > min_last_modified { + result.protected_by_metadata += 1; + continue; + } + result.candidates_marked += mark_candidate(&pool, &run_id, &row, metadata.content_length, last_modified).await?; + } + if has_more { + result.next_cursor = last_blob_key.clone(); + } + upsert_plan_checkpoint(&pool, &workspace_id, last_blob_key.as_deref(), !has_more).await?; + + finish_run(&pool, &run_id, &workspace_id, &result, Vec::new()).await?; + Ok(result) + } + + #[napi] + pub async fn execute_blob_cleanup_candidates( + &self, + run_id: String, + grace_period_days: i64, + limit: i64, + ) -> Result { + if limit <= 0 { + return Err(napi_error("blob cleanup execute limit must be positive")); + } + if grace_period_days < 0 { + return Err(napi_error("blob cleanup grace period must be non-negative")); + } + + let pool = self.pool().await?; + let min_last_modified = Utc::now() - Duration::days(grace_period_days); + let rows = load_marked_candidates(&pool, &run_id, limit).await?; + let mut result = RuntimeBlobCleanupExecuteResult { + scanned_candidates: rows.len() as i64, + deleted_objects: 0, + deleted_metadata: 0, + skipped_still_referenced: 0, + failed: 0, + workspace_ids: Vec::new(), + }; + + for row in rows { + if projection_is_stale(&pool, &row.workspace_id).await? + || has_doc_ref(&pool, &row.workspace_id, &row.blob_key).await? + || has_other_ref(&pool, &row.workspace_id, &row.blob_key).await? + { + result.skipped_still_referenced += 1; + mark_candidate_status( + &pool, + &run_id, + &row.workspace_id, + &row.blob_key, + "skipped", + serde_json::json!({ "skipReason": "referenced_or_projection_stale" }), + None, + ) + .await?; + continue; + } + + let object_key = format!("{}/{}", row.workspace_id, row.blob_key); + let mut object_was_missing = false; + let metadata = match self.object_storage_head(object_key.clone()).await { + Ok(metadata) => metadata, + Err(err) => { + result.failed += 1; + mark_candidate_status( + &pool, + &run_id, + &row.workspace_id, + &row.blob_key, + "failed", + serde_json::json!({ "failure": "object_head_failed" }), + Some(&err.to_string()), + ) + .await?; + continue; + } + }; + if let Some(metadata) = metadata { + let last_modified = DateTime::::from_timestamp_millis(metadata.last_modified_ms) + .ok_or_else(|| napi_error("blob cleanup execute object last modified is invalid"))?; + if last_modified > min_last_modified { + result.skipped_still_referenced += 1; + mark_candidate_status( + &pool, + &run_id, + &row.workspace_id, + &row.blob_key, + "skipped", + serde_json::json!({ "skipReason": "object_inside_grace_period" }), + None, + ) + .await?; + continue; + } + if let Err(err) = self.object_storage_delete(object_key).await { + result.failed += 1; + mark_candidate_status( + &pool, + &run_id, + &row.workspace_id, + &row.blob_key, + "failed", + serde_json::json!({ "failure": "object_delete_failed" }), + Some(&err.to_string()), + ) + .await?; + continue; + } + result.deleted_objects += 1; + } else { + object_was_missing = true; + } + + let deleted_metadata = + match sqlx::query("DELETE FROM blobs WHERE workspace_id = $1 AND key = $2 AND deleted_at IS NULL") + .bind(&row.workspace_id) + .bind(&row.blob_key) + .execute(&pool) + .await + { + Ok(result) => result.rows_affected() as i64, + Err(err) => { + result.failed += 1; + mark_candidate_status( + &pool, + &run_id, + &row.workspace_id, + &row.blob_key, + "failed", + serde_json::json!({ "failure": "metadata_delete_failed" }), + Some(&err.to_string()), + ) + .await?; + continue; + } + }; + result.deleted_metadata += deleted_metadata; + push_workspace_once(&mut result.workspace_ids, &row.workspace_id); + + mark_candidate_status( + &pool, + &run_id, + &row.workspace_id, + &row.blob_key, + "executed", + serde_json::json!({ + "deletedMetadata": deleted_metadata, + "objectMissingBeforeDelete": object_was_missing, + }), + None, + ) + .await?; + } + + finish_execute_run(&pool, &run_id, &result).await?; + Ok(result) + } +} diff --git a/packages/backend/native/src/backend_runtime/blob_reconciliation.rs b/packages/backend/native/src/backend_runtime/blob_reconciliation.rs new file mode 100644 index 0000000000..93031e6610 --- /dev/null +++ b/packages/backend/native/src/backend_runtime/blob_reconciliation.rs @@ -0,0 +1,276 @@ +use chrono::{DateTime, Utc}; +use napi::Result; +use sqlx::{FromRow, PgPool}; + +use super::{ + BackendRuntime, + error::napi_error, + types::{RuntimeBlobMetadataBackfillResult, RuntimeObjectMetadata}, +}; + +async fn workspace_exists(pool: &PgPool, workspace_id: &str) -> Result { + sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1)") + .bind(workspace_id) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob metadata backfill workspace check failed: {err}"))) +} + +async fn blob_exists(pool: &PgPool, workspace_id: &str, key: &str) -> Result { + sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM blobs WHERE workspace_id = $1 AND key = $2)") + .bind(workspace_id) + .bind(key) + .fetch_one(pool) + .await + .map_err(|err| napi_error(format!("Blob metadata backfill blob check failed: {err}"))) +} + +async fn upsert_blob_metadata( + pool: &PgPool, + workspace_id: &str, + key: &str, + metadata: RuntimeObjectMetadata, +) -> Result { + let last_modified = DateTime::::from_timestamp_millis(metadata.last_modified_ms) + .ok_or_else(|| napi_error("Blob metadata backfill object last modified is invalid"))?; + let result = sqlx::query( + r#" + INSERT INTO blobs (workspace_id, key, size, mime, status, upload_id, created_at, deleted_at) + VALUES ($1, $2, $3, $4, 'completed', NULL, $5, NULL) + ON CONFLICT (workspace_id, key) DO UPDATE + SET size = EXCLUDED.size, + mime = EXCLUDED.mime, + status = 'completed', + upload_id = NULL, + deleted_at = NULL + WHERE blobs.deleted_at IS NULL + "#, + ) + .bind(workspace_id) + .bind(key) + .bind(metadata.content_length as i32) + .bind(metadata.content_type) + .bind(last_modified) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Blob metadata backfill upsert failed: {err}")))?; + + Ok(result.rows_affected() as i64) +} + +fn split_workspace_blob_key(full_key: &str) -> Option<(&str, &str)> { + let (workspace_id, key) = full_key.split_once('/')?; + if workspace_id.is_empty() || key.is_empty() || key.contains('/') { + return None; + } + Some((workspace_id, key)) +} + +fn checkpoint_scope(workspace_id: Option<&str>) -> String { + workspace_id.unwrap_or("__all__").to_string() +} + +#[derive(FromRow)] +struct BackfillCheckpoint { + last_key: Option, + cursor: serde_json::Value, +} + +impl BackfillCheckpoint { + fn continuation_token(&self) -> Option { + self + .cursor + .get("continuationToken") + .and_then(|value| value.as_str()) + .map(ToString::to_string) + } +} + +async fn load_checkpoint(pool: &PgPool, scope: &str) -> Result> { + sqlx::query_as::<_, BackfillCheckpoint>( + "SELECT last_key, cursor FROM blob_reconciliation_checkpoints WHERE kind = 'blob_metadata_backfill' AND scope = $1", + ) + .bind(scope) + .fetch_optional(pool) + .await + .map_err(|err| napi_error(format!("Blob metadata backfill checkpoint load failed: {err}"))) +} + +async fn upsert_checkpoint( + pool: &PgPool, + scope: &str, + last_key: Option<&str>, + continuation_token: Option<&str>, + completed: bool, +) -> Result<()> { + let status = if completed { "completed" } else { "running" }; + sqlx::query( + r#" + INSERT INTO blob_reconciliation_checkpoints + (kind, scope, status, cursor, last_key, completed_at, metadata) + VALUES ('blob_metadata_backfill', $1, $2, $3, $4, CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END, $6) + ON CONFLICT (kind, scope) DO UPDATE + SET status = EXCLUDED.status, + cursor = EXCLUDED.cursor, + last_key = COALESCE(EXCLUDED.last_key, blob_reconciliation_checkpoints.last_key), + completed_at = CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END, + updated_at = CURRENT_TIMESTAMP, + metadata = EXCLUDED.metadata + "#, + ) + .bind(scope) + .bind(status) + .bind(serde_json::json!({ + "lastKey": last_key, + "continuationToken": continuation_token, + })) + .bind(last_key) + .bind(completed) + .bind(serde_json::json!({ + "quotaReportingReconciliationRequired": true, + })) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Blob metadata backfill checkpoint write failed: {err}")))?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn blob_metadata_backfill_splits_workspace_blob_keys() { + assert_eq!( + split_workspace_blob_key("workspace/blob-key"), + Some(("workspace", "blob-key")) + ); + assert_eq!(split_workspace_blob_key("workspace/nested/blob-key"), None); + assert_eq!(split_workspace_blob_key("workspace/"), None); + assert_eq!(split_workspace_blob_key("blob-key"), None); + } + + #[test] + fn blob_metadata_backfill_checkpoint_scope_is_explicit() { + assert_eq!(checkpoint_scope(Some("workspace")), "workspace"); + assert_eq!(checkpoint_scope(None), "__all__"); + } +} + +fn push_workspace_once(workspace_ids: &mut Vec, workspace_id: &str) { + if !workspace_ids.iter().any(|id| id == workspace_id) { + workspace_ids.push(workspace_id.to_string()); + } +} + +fn checked_list_page_limit(limit: i64) -> Result { + i32::try_from(limit).map_err(|_| napi_error("blob metadata backfill limit exceeds i32::MAX")) +} + +#[napi_derive::napi] +impl BackendRuntime { + #[napi] + pub async fn backfill_missing_blob_metadata( + &self, + workspace_id: Option, + limit: i64, + ) -> Result { + if limit <= 0 { + return Err(napi_error("blob metadata backfill limit must be positive")); + } + let page_limit = checked_list_page_limit(limit)?; + + let pool = self.pool().await?; + let prefix = workspace_id.as_ref().map(|id| format!("{id}/")); + let scope = checkpoint_scope(workspace_id.as_deref()); + let checkpoint = load_checkpoint(&pool, &scope).await?; + let page = self + .object_storage_list_page( + prefix, + checkpoint.as_ref().and_then(BackfillCheckpoint::continuation_token), + checkpoint.as_ref().and_then(|checkpoint| checkpoint.last_key.clone()), + page_limit, + ) + .await?; + let has_more = page.next_continuation_token.is_some(); + + let mut result = RuntimeBlobMetadataBackfillResult { + scanned_objects: 0, + headed_objects: 0, + upserted_metadata: 0, + skipped_existing: 0, + skipped_workspace_missing: 0, + failed: 0, + next_cursor: None, + workspace_ids: Vec::new(), + }; + + let mut last_scanned_key = None; + for object in &page.entries { + result.scanned_objects += 1; + last_scanned_key = Some(object.key.clone()); + let Some((object_workspace_id, key)) = split_workspace_blob_key(&object.key) else { + result.failed += 1; + continue; + }; + if workspace_id.as_deref().is_some_and(|id| id != object_workspace_id) { + result.failed += 1; + continue; + } + if !workspace_exists(&pool, object_workspace_id).await? { + result.skipped_workspace_missing += 1; + continue; + } + if blob_exists(&pool, object_workspace_id, key).await? { + result.skipped_existing += 1; + continue; + } + result.headed_objects += 1; + let Some(metadata) = self.object_storage_head(object.key.clone()).await? else { + result.failed += 1; + continue; + }; + let affected = upsert_blob_metadata(&pool, object_workspace_id, key, metadata).await?; + if affected > 0 { + result.upserted_metadata += affected; + push_workspace_once(&mut result.workspace_ids, object_workspace_id); + } + } + if has_more { + result.next_cursor = last_scanned_key.clone(); + } + upsert_checkpoint( + &pool, + &scope, + last_scanned_key.as_deref(), + page.next_continuation_token.as_deref(), + !has_more, + ) + .await?; + + sqlx::query( + r#" + INSERT INTO blob_reconciliation_runs + (kind, mode, status, workspace_id, finished_at, scanned, changed, failed, metadata) + VALUES ('blob_metadata_backfill', 'execute', 'finished', $1, CURRENT_TIMESTAMP, $2, $3, $4, $5) + "#, + ) + .bind(workspace_id) + .bind(result.scanned_objects as i32) + .bind(result.upserted_metadata as i32) + .bind(result.failed as i32) + .bind(serde_json::json!({ + "headedObjects": result.headed_objects, + "skippedExisting": result.skipped_existing, + "skippedWorkspaceMissing": result.skipped_workspace_missing, + "checkpointScope": scope, + "nextCursor": result.next_cursor, + "quotaReportingReconciliationRequired": true, + })) + .execute(&pool) + .await + .map_err(|err| napi_error(format!("Blob metadata backfill run record failed: {err}")))?; + + Ok(result) + } +} diff --git a/packages/backend/native/src/backend_runtime/config.rs b/packages/backend/native/src/backend_runtime/config.rs index 9543f72c8e..5e6b77d3f2 100644 --- a/packages/backend/native/src/backend_runtime/config.rs +++ b/packages/backend/native/src/backend_runtime/config.rs @@ -20,8 +20,9 @@ pub(super) struct RuntimeConfig { impl RuntimeConfig { pub(super) fn from_config_files() -> Result { - let database_url = - database_url_from_config_files()?.unwrap_or_else(|| "postgresql://localhost:5432/affine".to_string()); + let database_url = database_url_from_env() + .or(database_url_from_config_files()?) + .unwrap_or_else(|| "postgresql://localhost:5432/affine".to_string()); let storage = ObjectStorageConfig::from_config_files()?; Ok(Self { database_url, storage }) } @@ -39,6 +40,14 @@ struct DbConfigFile { datasource_url: Option, } +fn database_url_from_env() -> Option { + env::var("DATABASE_URL").ok().and_then(non_empty_string) +} + +fn non_empty_string(value: String) -> Option { + if value.trim().is_empty() { None } else { Some(value) } +} + fn database_url_from_config_files() -> Result> { let mut database_url = None; for path in config_json_paths() { @@ -49,9 +58,7 @@ fn database_url_from_config_files() -> Result> { .map_err(|err| napi_error(format!("failed to read config file {}: {err}", path.display())))?; let config: AppConfigFile = serde_json::from_str(&raw) .map_err(|err| napi_error(format!("failed to parse config file {}: {err}", path.display())))?; - if let Some(next) = config.db.and_then(|db| db.datasource_url) - && !next.trim().is_empty() - { + if let Some(next) = config.db.and_then(|db| db.datasource_url).and_then(non_empty_string) { database_url = Some(next); } } @@ -125,4 +132,14 @@ mod tests { .all(|path| !path.to_string_lossy().contains("packages/backend/server")) ); } + + #[test] + fn blank_database_urls_are_ignored() { + assert_eq!(non_empty_string("".to_string()), None); + assert_eq!(non_empty_string(" ".to_string()), None); + assert_eq!( + non_empty_string("postgresql://affine:affine@localhost:5432/affine".to_string()), + Some("postgresql://affine:affine@localhost:5432/affine".to_string()) + ); + } } diff --git a/packages/backend/native/src/backend_runtime/doc_blob_refs.rs b/packages/backend/native/src/backend_runtime/doc_blob_refs.rs new file mode 100644 index 0000000000..7e77118aa2 --- /dev/null +++ b/packages/backend/native/src/backend_runtime/doc_blob_refs.rs @@ -0,0 +1,416 @@ +use affine_common::doc_parser; +use chrono::{DateTime, Utc}; +use napi::Result; +use sqlx::{FromRow, PgPool}; +use y_octo::Doc; + +use super::{BackendRuntime, error::napi_error, types::RuntimeDocBlobRefsResult}; + +const PARSER_VERSION: i32 = 1; + +#[derive(FromRow)] +struct SnapshotRow { + workspace_id: String, + doc_id: String, + blob: Vec, + updated_at: DateTime, +} + +#[derive(FromRow)] +struct UpdateRow { + blob: Vec, + created_at: DateTime, +} + +struct ExtractedRef { + blob_key: String, + block_id: String, + flavour: String, +} + +async fn load_snapshot(pool: &PgPool, workspace_id: &str, doc_id: &str) -> Result> { + sqlx::query_as::<_, SnapshotRow>( + r#" + SELECT workspace_id, guid AS doc_id, blob, updated_at + FROM snapshots + WHERE workspace_id = $1 AND guid = $2 + "#, + ) + .bind(workspace_id) + .bind(doc_id) + .fetch_optional(pool) + .await + .map_err(|err| napi_error(format!("Doc blob refs load snapshot failed: {err}"))) +} + +async fn load_updates(pool: &PgPool, workspace_id: &str, doc_id: &str) -> Result> { + sqlx::query_as::<_, UpdateRow>( + r#" + SELECT blob, created_at + FROM updates + WHERE workspace_id = $1 AND guid = $2 + ORDER BY created_at ASC + "#, + ) + .bind(workspace_id) + .bind(doc_id) + .fetch_all(pool) + .await + .map_err(|err| napi_error(format!("Doc blob refs load updates failed: {err}"))) +} + +fn apply_doc_updates(updates: impl IntoIterator>) -> Result> { + let mut doc = Doc::default(); + for update in updates { + doc + .apply_update_from_binary_v1(&update) + .map_err(|err| napi_error(format!("Doc blob refs merge failed: {err}")))?; + } + doc + .encode_update_v1() + .map_err(|err| napi_error(format!("Doc blob refs encode failed: {err}"))) +} + +async fn load_current_doc(pool: &PgPool, workspace_id: &str, doc_id: &str) -> Result> { + let snapshot = load_snapshot(pool, workspace_id, doc_id).await?; + let updates = load_updates(pool, workspace_id, doc_id).await?; + if snapshot.is_none() && updates.is_empty() { + return Ok(None); + } + + let mut merge_inputs = Vec::with_capacity(updates.len() + usize::from(snapshot.is_some())); + let mut updated_at = snapshot + .as_ref() + .map(|snapshot| snapshot.updated_at) + .unwrap_or_else(Utc::now); + if let Some(snapshot) = snapshot { + merge_inputs.push(snapshot.blob); + } + for update in updates { + updated_at = update.created_at; + merge_inputs.push(update.blob); + } + + Ok(Some(SnapshotRow { + workspace_id: workspace_id.to_string(), + doc_id: doc_id.to_string(), + blob: apply_doc_updates(merge_inputs)?, + updated_at, + })) +} + +async fn load_workspace_doc_ids(pool: &PgPool, workspace_id: &str) -> Result> { + let Some(root) = load_current_doc(pool, workspace_id, workspace_id).await? else { + return Ok(Vec::new()); + }; + let ids = doc_parser::get_doc_ids_from_binary(root.blob, false) + .map_err(|err| napi_error(format!("Doc blob refs root doc parse failed: {err}")))?; + let mut ids = ids; + ids.sort(); + Ok(ids) +} + +async fn upsert_projection_checkpoint( + pool: &PgPool, + workspace_id: &str, + result: &RuntimeDocBlobRefsResult, +) -> Result<()> { + let completed = result.next_cursor.is_none(); + let status = if completed && result.failed_docs == 0 { + "completed" + } else if result.failed_docs > 0 { + "failed" + } else { + "running" + }; + sqlx::query( + r#" + INSERT INTO blob_reconciliation_checkpoints + (kind, scope, status, cursor, completed_at, metadata) + VALUES ('doc_blob_refs', $1, $2, $3, CASE WHEN $4 THEN CURRENT_TIMESTAMP ELSE NULL END, $5) + ON CONFLICT (kind, scope) DO UPDATE + SET status = EXCLUDED.status, + cursor = EXCLUDED.cursor, + completed_at = CASE WHEN $4 THEN CURRENT_TIMESTAMP ELSE NULL END, + updated_at = CURRENT_TIMESTAMP, + metadata = EXCLUDED.metadata + "#, + ) + .bind(workspace_id) + .bind(status) + .bind(serde_json::json!({ "lastDocId": result.next_cursor })) + .bind(completed && result.failed_docs == 0) + .bind(serde_json::json!({ + "parserVersion": PARSER_VERSION, + })) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Doc blob refs checkpoint write failed: {err}")))?; + Ok(()) +} + +async fn upsert_projection_failure_checkpoint(pool: &PgPool, workspace_id: &str, error: &str) -> Result<()> { + sqlx::query( + r#" + INSERT INTO blob_reconciliation_checkpoints + (kind, scope, status, cursor, completed_at, metadata) + VALUES ('doc_blob_refs', $1, 'failed', '{}', NULL, $2) + ON CONFLICT (kind, scope) DO UPDATE + SET status = 'failed', + cursor = '{}', + completed_at = NULL, + updated_at = CURRENT_TIMESTAMP, + metadata = EXCLUDED.metadata + "#, + ) + .bind(workspace_id) + .bind(serde_json::json!({ + "parserVersion": PARSER_VERSION, + "error": error, + })) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Doc blob refs failure checkpoint write failed: {err}")))?; + Ok(()) +} + +async fn load_projection_cursor(pool: &PgPool, workspace_id: &str) -> Result> { + let cursor = sqlx::query_scalar::<_, serde_json::Value>( + "SELECT cursor FROM blob_reconciliation_checkpoints WHERE kind = 'doc_blob_refs' AND scope = $1", + ) + .bind(workspace_id) + .fetch_optional(pool) + .await + .map_err(|err| napi_error(format!("Doc blob refs checkpoint load failed: {err}")))?; + Ok(cursor.and_then(|cursor| { + cursor + .get("lastDocId") + .and_then(|value| value.as_str()) + .map(ToString::to_string) + })) +} + +async fn purge_removed_doc_refs(pool: &PgPool, workspace_id: &str, current_doc_ids: &[String]) -> Result { + let result = sqlx::query( + r#" + DELETE FROM doc_blob_refs + WHERE workspace_id = $1 + AND NOT (doc_id = ANY($2)) + "#, + ) + .bind(workspace_id) + .bind(current_doc_ids) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Doc blob refs purge removed docs failed: {err}")))?; + Ok(result.rows_affected() as i64) +} + +fn extract_refs(snapshot: &SnapshotRow) -> Result> { + let parsed = doc_parser::parse_doc_from_binary(snapshot.blob.clone(), snapshot.doc_id.clone()) + .map_err(|err| napi_error(format!("Doc blob refs parse failed: {err}")))?; + let mut refs = Vec::new(); + for block in parsed.blocks { + let Some(blob_keys) = block.blob else { + continue; + }; + for blob_key in blob_keys { + refs.push(ExtractedRef { + blob_key, + block_id: block.block_id.clone(), + flavour: block.flavour.clone(), + }); + } + } + Ok(refs) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn doc_blob_refs_extracts_image_refs() { + let doc_id = "doc-blob-ref-test".to_string(); + let blob = + doc_parser::build_full_doc("Doc", "![Alt](blob://image-blob-key)", &doc_id).expect("doc fixture should build"); + let snapshot = SnapshotRow { + workspace_id: "workspace".to_string(), + doc_id, + blob, + updated_at: Utc::now(), + }; + + let refs = extract_refs(&snapshot).expect("refs should parse"); + + assert!( + refs + .iter() + .any(|reference| { reference.blob_key == "image-blob-key" && reference.flavour == "affine:image" }) + ); + } +} + +async fn replace_doc_refs(pool: &PgPool, snapshot: &SnapshotRow, refs: Vec) -> Result<(i64, i64)> { + let mut tx = pool + .begin() + .await + .map_err(|err| napi_error(format!("Doc blob refs transaction failed: {err}")))?; + + let deleted = sqlx::query("DELETE FROM doc_blob_refs WHERE workspace_id = $1 AND doc_id = $2") + .bind(&snapshot.workspace_id) + .bind(&snapshot.doc_id) + .execute(&mut *tx) + .await + .map_err(|err| napi_error(format!("Doc blob refs delete failed: {err}")))? + .rows_affected() as i64; + + let mut written = 0; + for reference in refs { + let affected = sqlx::query( + r#" + INSERT INTO doc_blob_refs + (workspace_id, doc_id, blob_key, block_id, flavour, snapshot_updated_at, parser_version, status) + VALUES ($1, $2, $3, $4, $5, $6, $7, 'fresh') + ON CONFLICT (workspace_id, doc_id, blob_key, block_id) DO UPDATE + SET flavour = EXCLUDED.flavour, + snapshot_updated_at = EXCLUDED.snapshot_updated_at, + indexed_at = CURRENT_TIMESTAMP, + parser_version = EXCLUDED.parser_version, + status = 'fresh', + error = NULL + "#, + ) + .bind(&snapshot.workspace_id) + .bind(&snapshot.doc_id) + .bind(reference.blob_key) + .bind(reference.block_id) + .bind(reference.flavour) + .bind(snapshot.updated_at) + .bind(PARSER_VERSION) + .execute(&mut *tx) + .await + .map_err(|err| napi_error(format!("Doc blob refs insert failed: {err}")))? + .rows_affected() as i64; + written += affected; + } + + tx.commit() + .await + .map_err(|err| napi_error(format!("Doc blob refs transaction commit failed: {err}")))?; + Ok((written, deleted)) +} + +async fn mark_doc_failed(pool: &PgPool, workspace_id: &str, doc_id: &str, error: &str) -> Result<()> { + sqlx::query( + r#" + INSERT INTO doc_blob_refs + (workspace_id, doc_id, blob_key, block_id, flavour, snapshot_updated_at, parser_version, status, error) + VALUES ($1, $2, '__parse_failed__', '__parse_failed__', '__parse_failed__', CURRENT_TIMESTAMP, $3, 'failed', $4) + ON CONFLICT (workspace_id, doc_id, blob_key, block_id) DO UPDATE + SET indexed_at = CURRENT_TIMESTAMP, + status = 'failed', + error = EXCLUDED.error + "#, + ) + .bind(workspace_id) + .bind(doc_id) + .bind(PARSER_VERSION) + .bind(error) + .execute(pool) + .await + .map_err(|err| napi_error(format!("Doc blob refs mark failure failed: {err}")))?; + Ok(()) +} + +#[napi_derive::napi] +impl BackendRuntime { + #[napi] + pub async fn rebuild_doc_blob_refs(&self, workspace_id: String, doc_id: String) -> Result { + let pool = self.pool().await?; + let mut result = RuntimeDocBlobRefsResult { + scanned_docs: 1, + parsed_docs: 0, + refs_written: 0, + refs_deleted: 0, + failed_docs: 0, + next_cursor: None, + }; + + let Some(snapshot) = load_current_doc(&pool, &workspace_id, &doc_id).await? else { + result.failed_docs = 1; + mark_doc_failed(&pool, &workspace_id, &doc_id, "snapshot_missing").await?; + return Ok(result); + }; + + match extract_refs(&snapshot) { + Ok(refs) => { + let (written, deleted) = replace_doc_refs(&pool, &snapshot, refs).await?; + result.parsed_docs = 1; + result.refs_written = written; + result.refs_deleted = deleted; + } + Err(err) => { + result.failed_docs = 1; + mark_doc_failed(&pool, &workspace_id, &doc_id, &err.to_string()).await?; + } + } + + Ok(result) + } + + #[napi] + pub async fn rebuild_workspace_doc_blob_refs( + &self, + workspace_id: String, + limit: i64, + ) -> Result { + if limit <= 0 { + return Err(napi_error("doc blob refs rebuild limit must be positive")); + } + + let pool = self.pool().await?; + let doc_ids = match load_workspace_doc_ids(&pool, &workspace_id).await { + Ok(doc_ids) => doc_ids, + Err(err) => { + upsert_projection_failure_checkpoint(&pool, &workspace_id, &err.to_string()).await?; + return Err(err); + } + }; + let cursor = load_projection_cursor(&pool, &workspace_id).await?; + let current_doc_ids = doc_ids.clone(); + let doc_ids = doc_ids + .into_iter() + .filter(|doc_id| cursor.as_ref().is_none_or(|cursor| doc_id > cursor)) + .collect::>(); + let has_more = doc_ids.len() > limit as usize; + let mut total = RuntimeDocBlobRefsResult { + scanned_docs: 0, + parsed_docs: 0, + refs_written: 0, + refs_deleted: 0, + failed_docs: 0, + next_cursor: None, + }; + + let mut last_doc_id = None; + for doc_id in doc_ids.into_iter().take(limit as usize) { + last_doc_id = Some(doc_id.clone()); + let result = self.rebuild_doc_blob_refs(workspace_id.clone(), doc_id).await?; + total.scanned_docs += result.scanned_docs; + total.parsed_docs += result.parsed_docs; + total.refs_written += result.refs_written; + total.refs_deleted += result.refs_deleted; + total.failed_docs += result.failed_docs; + } + if has_more { + total.next_cursor = last_doc_id; + } else if total.failed_docs == 0 { + total.refs_deleted += purge_removed_doc_refs(&pool, &workspace_id, ¤t_doc_ids).await?; + } + + upsert_projection_checkpoint(&pool, &workspace_id, &total).await?; + + Ok(total) + } +} diff --git a/packages/backend/native/src/backend_runtime/mod.rs b/packages/backend/native/src/backend_runtime/mod.rs index ec547f0916..e249b7fc73 100644 --- a/packages/backend/native/src/backend_runtime/mod.rs +++ b/packages/backend/native/src/backend_runtime/mod.rs @@ -1,8 +1,11 @@ +mod blob_cleanup; mod blob_complete; mod blob_reclaimer; +mod blob_reconciliation; mod config; mod constants; mod coordination_lease; +mod doc_blob_refs; mod doc_compactor; mod doc_storage; mod error; diff --git a/packages/backend/native/src/backend_runtime/object_storage/client.rs b/packages/backend/native/src/backend_runtime/object_storage/client.rs index a92c3e15f7..300c19b852 100644 --- a/packages/backend/native/src/backend_runtime/object_storage/client.rs +++ b/packages/backend/native/src/backend_runtime/object_storage/client.rs @@ -9,8 +9,8 @@ use aws_sdk_s3::{ use napi::Result; use super::types::{ - MultipartUploadInitResult, MultipartUploadPart, ObjectGetResult, ObjectListEntry, ObjectMetadata, ObjectPutMetadata, - PresignedObjectRequest, completed_multipart_parts, trim_etag, + MultipartUploadInitResult, MultipartUploadPart, ObjectGetResult, ObjectListEntry, ObjectListPage, ObjectMetadata, + ObjectPutMetadata, PresignedObjectRequest, completed_multipart_parts, trim_etag, }; use crate::backend_runtime::error::napi_error; @@ -233,14 +233,11 @@ impl ObjectStorageClient { } pub(super) async fn head(&self, key: &str) -> Result> { - let result = self - .client - .head_object() - .bucket(&self.bucket) - .key(key) - .send() - .await - .map_err(|err| napi_error(format!("ObjectStorage head failed for {key}: {err:?}")))?; + let result = match self.client.head_object().bucket(&self.bucket).key(key).send().await { + Ok(result) => result, + Err(err) if is_not_found_error(&err) => return Ok(None), + Err(err) => return Err(napi_error(format!("ObjectStorage head failed for {key}: {err:?}"))), + }; Ok(Some(ObjectMetadata { content_type: result @@ -253,14 +250,11 @@ impl ObjectStorageClient { } pub(super) async fn get(&self, key: &str) -> Result> { - let result = self - .client - .get_object() - .bucket(&self.bucket) - .key(key) - .send() - .await - .map_err(|err| napi_error(format!("ObjectStorage get failed for {key}: {err:?}")))?; + let result = match self.client.get_object().bucket(&self.bucket).key(key).send().await { + Ok(result) => result, + Err(err) if is_not_found_error(&err) => return Ok(None), + Err(err) => return Err(napi_error(format!("ObjectStorage get failed for {key}: {err:?}"))), + }; let metadata = ObjectMetadata { content_type: result .content_type @@ -314,6 +308,43 @@ impl ObjectStorageClient { Ok(entries) } + pub(super) async fn list_page( + &self, + prefix: Option, + continuation_token: Option, + start_after: Option, + max_keys: i32, + ) -> Result { + let mut request = self.client.list_objects_v2().bucket(&self.bucket).max_keys(max_keys); + if let Some(prefix) = prefix { + request = request.prefix(prefix); + } + if let Some(continuation_token) = continuation_token { + request = request.continuation_token(continuation_token); + } else if let Some(start_after) = start_after { + request = request.start_after(start_after); + } + let result = request + .send() + .await + .map_err(|err| napi_error(format!("ObjectStorage list page failed: {err:?}")))?; + + Ok(ObjectListPage { + entries: result + .contents() + .iter() + .filter_map(|object| { + Some(ObjectListEntry { + key: object.key.as_ref()?.clone(), + content_length: object.size.unwrap_or(0), + last_modified_ms: optional_datetime_ms(object.last_modified), + }) + }) + .collect(), + next_continuation_token: result.next_continuation_token, + }) + } + pub(super) async fn delete(&self, key: &str) -> Result<()> { self .client @@ -327,6 +358,11 @@ impl ObjectStorageClient { } } +fn is_not_found_error(error: &impl std::fmt::Debug) -> bool { + let message = format!("{error:?}"); + message.contains("NoSuchKey") || (message.contains("NotFound") && !message.contains("NoSuchBucket")) +} + fn expires_at_ms(expires_in_seconds: u64) -> Result { let expires_at = SystemTime::now() .checked_add(Duration::from_secs(expires_in_seconds)) diff --git a/packages/backend/native/src/backend_runtime/object_storage/mod.rs b/packages/backend/native/src/backend_runtime/object_storage/mod.rs index 9016f5fa1b..2962ff1c90 100644 --- a/packages/backend/native/src/backend_runtime/object_storage/mod.rs +++ b/packages/backend/native/src/backend_runtime/object_storage/mod.rs @@ -7,6 +7,7 @@ mod types; use client::ObjectStorageClient; pub(super) use config::ObjectStorageConfig; use napi::{Result, bindgen_prelude::Buffer}; +use types::ObjectListPage; pub(super) use types::StorageProviderConfig; use super::{ @@ -32,6 +33,19 @@ impl BackendRuntime { self.object_storage_client()?.delete(key).await } + pub(super) async fn object_storage_list_page( + &self, + prefix: Option, + continuation_token: Option, + start_after: Option, + max_keys: i32, + ) -> Result { + self + .object_storage_client()? + .list_page(prefix, continuation_token, start_after, max_keys) + .await + } + pub(super) async fn object_storage_abort_upload(&self, key: &str, upload_id: &str) -> Result<()> { self .object_storage_client()? diff --git a/packages/backend/native/src/backend_runtime/object_storage/types.rs b/packages/backend/native/src/backend_runtime/object_storage/types.rs index 0bf3d02a35..f16715f39b 100644 --- a/packages/backend/native/src/backend_runtime/object_storage/types.rs +++ b/packages/backend/native/src/backend_runtime/object_storage/types.rs @@ -28,10 +28,16 @@ pub(super) struct ObjectMetadata { } #[derive(Clone, Debug, PartialEq)] -pub(super) struct ObjectListEntry { - pub(super) key: String, - pub(super) content_length: i64, - pub(super) last_modified_ms: i64, +pub(in crate::backend_runtime) struct ObjectListEntry { + pub(in crate::backend_runtime) key: String, + pub(in crate::backend_runtime) content_length: i64, + pub(in crate::backend_runtime) last_modified_ms: i64, +} + +#[derive(Clone, Debug, PartialEq)] +pub(in crate::backend_runtime) struct ObjectListPage { + pub(in crate::backend_runtime) entries: Vec, + pub(in crate::backend_runtime) next_continuation_token: Option, } #[derive(Clone, Debug, PartialEq)] diff --git a/packages/backend/native/src/backend_runtime/sql/runtime_migrations.sql b/packages/backend/native/src/backend_runtime/sql/runtime_migrations.sql index 4912f1026d..859ff753d9 100644 --- a/packages/backend/native/src/backend_runtime/sql/runtime_migrations.sql +++ b/packages/backend/native/src/backend_runtime/sql/runtime_migrations.sql @@ -38,3 +38,75 @@ CREATE TABLE IF NOT EXISTS runtime_leases ( CREATE INDEX IF NOT EXISTS runtime_leases_expires_at_idx ON runtime_leases (expires_at); + +CREATE TABLE IF NOT EXISTS blob_reconciliation_runs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + kind TEXT NOT NULL, + mode TEXT NOT NULL, + status TEXT NOT NULL, + workspace_id TEXT, + started_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + finished_at TIMESTAMPTZ(3), + cursor JSONB NOT NULL DEFAULT '{}', + scanned INTEGER NOT NULL DEFAULT 0, + changed INTEGER NOT NULL DEFAULT 0, + failed INTEGER NOT NULL DEFAULT 0, + metadata JSONB NOT NULL DEFAULT '{}' +); + +CREATE INDEX IF NOT EXISTS blob_reconciliation_runs_workspace_idx + ON blob_reconciliation_runs (workspace_id, started_at DESC); + +CREATE TABLE IF NOT EXISTS blob_reconciliation_checkpoints ( + kind TEXT NOT NULL, + scope TEXT NOT NULL, + status TEXT NOT NULL, + cursor JSONB NOT NULL DEFAULT '{}', + last_key TEXT, + last_sid INTEGER, + completed_at TIMESTAMPTZ(3), + updated_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata JSONB NOT NULL DEFAULT '{}', + PRIMARY KEY (kind, scope) +); + +CREATE INDEX IF NOT EXISTS blob_reconciliation_checkpoints_status_idx + ON blob_reconciliation_checkpoints (kind, status, updated_at DESC); + +CREATE TABLE IF NOT EXISTS doc_blob_refs ( + workspace_id TEXT NOT NULL, + doc_id TEXT NOT NULL, + blob_key TEXT NOT NULL, + block_id TEXT NOT NULL, + flavour TEXT NOT NULL, + snapshot_updated_at TIMESTAMPTZ(3) NOT NULL, + indexed_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + parser_version INTEGER NOT NULL, + status TEXT NOT NULL DEFAULT 'fresh', + error TEXT, + PRIMARY KEY (workspace_id, doc_id, blob_key, block_id) +); + +CREATE INDEX IF NOT EXISTS doc_blob_refs_workspace_blob_idx + ON doc_blob_refs (workspace_id, blob_key); + +CREATE INDEX IF NOT EXISTS doc_blob_refs_workspace_status_idx + ON doc_blob_refs (workspace_id, status); + +CREATE TABLE IF NOT EXISTS blob_cleanup_candidates ( + workspace_id TEXT NOT NULL, + blob_key TEXT NOT NULL, + reason TEXT NOT NULL, + status TEXT NOT NULL, + object_size BIGINT NOT NULL, + object_last_modified TIMESTAMPTZ(3), + planned_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + executed_at TIMESTAMPTZ(3), + run_id UUID NOT NULL, + evidence JSONB NOT NULL DEFAULT '{}', + error TEXT, + PRIMARY KEY (workspace_id, blob_key) +); + +CREATE INDEX IF NOT EXISTS blob_cleanup_candidates_run_idx + ON blob_cleanup_candidates (run_id, status); diff --git a/packages/backend/native/src/backend_runtime/tests.rs b/packages/backend/native/src/backend_runtime/tests.rs index a734259ab9..b29ad9ef8b 100644 --- a/packages/backend/native/src/backend_runtime/tests.rs +++ b/packages/backend/native/src/backend_runtime/tests.rs @@ -14,6 +14,10 @@ fn migrations_include_runtime_tables_without_worker_heartbeats() { assert!(RUNTIME_MIGRATIONS.contains("runtime_states")); assert!(RUNTIME_MIGRATIONS.contains("runtime_gates")); assert!(RUNTIME_MIGRATIONS.contains("runtime_leases")); + assert!(RUNTIME_MIGRATIONS.contains("blob_reconciliation_runs")); + assert!(RUNTIME_MIGRATIONS.contains("blob_reconciliation_checkpoints")); + assert!(RUNTIME_MIGRATIONS.contains("doc_blob_refs")); + assert!(RUNTIME_MIGRATIONS.contains("blob_cleanup_candidates")); assert!(!RUNTIME_MIGRATIONS.contains("runtime_worker_heartbeats")); } diff --git a/packages/backend/native/src/backend_runtime/types.rs b/packages/backend/native/src/backend_runtime/types.rs index 513859a8da..8e0bec64e8 100644 --- a/packages/backend/native/src/backend_runtime/types.rs +++ b/packages/backend/native/src/backend_runtime/types.rs @@ -138,6 +138,49 @@ pub struct RuntimeBlobCompleteResult { pub last_modified_ms: Option, } +#[napi_derive::napi(object)] +pub struct RuntimeBlobMetadataBackfillResult { + pub scanned_objects: i64, + pub headed_objects: i64, + pub upserted_metadata: i64, + pub skipped_existing: i64, + pub skipped_workspace_missing: i64, + pub failed: i64, + pub next_cursor: Option, + pub workspace_ids: Vec, +} + +#[napi_derive::napi(object)] +pub struct RuntimeDocBlobRefsResult { + pub scanned_docs: i64, + pub parsed_docs: i64, + pub refs_written: i64, + pub refs_deleted: i64, + pub failed_docs: i64, + pub next_cursor: Option, +} + +#[napi_derive::napi(object)] +pub struct RuntimeBlobCleanupPlanResult { + pub run_id: Option, + pub scanned_blobs: i64, + pub candidates_marked: i64, + pub protected_by_doc_refs: i64, + pub protected_by_metadata: i64, + pub protected_by_other_refs: i64, + pub next_cursor: Option, +} + +#[napi_derive::napi(object)] +pub struct RuntimeBlobCleanupExecuteResult { + pub scanned_candidates: i64, + pub deleted_objects: i64, + pub deleted_metadata: i64, + pub skipped_still_referenced: i64, + pub failed: i64, + pub workspace_ids: Vec, +} + #[napi_derive::napi(object)] pub struct RuntimeDocCompactionResult { pub lease_acquired: bool, diff --git a/packages/backend/server/src/__tests__/auth/job.spec.ts b/packages/backend/server/src/__tests__/auth/job.spec.ts index 930b2b5e7b..c11c1ba5da 100644 --- a/packages/backend/server/src/__tests__/auth/job.spec.ts +++ b/packages/backend/server/src/__tests__/auth/job.spec.ts @@ -2,17 +2,25 @@ import { ScheduleModule } from '@nestjs/schedule'; import { TestingModule } from '@nestjs/testing'; import { PrismaClient } from '@prisma/client'; import test from 'ava'; +import Sinon from 'sinon'; import { AuthModule, AuthService } from '../../core/auth'; import { AuthCronJob } from '../../core/auth/job'; +import { BackendRuntimeProvider } from '../../core/backend-runtime'; import { createTestingModule } from '../utils'; let m: TestingModule; let db: PrismaClient; +const runtime = { + cleanupExpiredUserSessions: Sinon.stub(), +}; test.before(async () => { m = await createTestingModule({ imports: [ScheduleModule.forRoot(), AuthModule], + tapModule: builder => { + builder.overrideProvider(BackendRuntimeProvider).useValue(runtime); + }, }); db = m.get(PrismaClient); @@ -32,16 +40,17 @@ test('should clean expired user sessions', async t => { let userSessions = await db.userSession.findMany(); t.is(userSessions.length, 2); - // no expired sessions + runtime.cleanupExpiredUserSessions.reset(); + runtime.cleanupExpiredUserSessions.resolves(0); await job.cleanExpiredUserSessions(); - userSessions = await db.userSession.findMany(); - t.is(userSessions.length, 2); + t.true(runtime.cleanupExpiredUserSessions.calledOnce); + t.deepEqual(runtime.cleanupExpiredUserSessions.firstCall.args, [1000]); - // clean all expired sessions - await db.userSession.updateMany({ - data: { expiresAt: new Date(Date.now() - 1000) }, - }); + runtime.cleanupExpiredUserSessions.reset(); + runtime.cleanupExpiredUserSessions.onCall(0).resolves(1000); + runtime.cleanupExpiredUserSessions.onCall(1).resolves(2); await job.cleanExpiredUserSessions(); - userSessions = await db.userSession.findMany(); - t.is(userSessions.length, 0); + t.is(runtime.cleanupExpiredUserSessions.callCount, 2); + t.deepEqual(runtime.cleanupExpiredUserSessions.firstCall.args, [1000]); + t.deepEqual(runtime.cleanupExpiredUserSessions.secondCall.args, [1000]); }); diff --git a/packages/backend/server/src/__tests__/doc/cron.spec.ts b/packages/backend/server/src/__tests__/doc/cron.spec.ts index c182abdf0f..ea3f5e8cd5 100644 --- a/packages/backend/server/src/__tests__/doc/cron.spec.ts +++ b/packages/backend/server/src/__tests__/doc/cron.spec.ts @@ -1,7 +1,9 @@ import { ScheduleModule } from '@nestjs/schedule'; import { PrismaClient } from '@prisma/client'; import ava, { TestFn } from 'ava'; +import Sinon from 'sinon'; +import { BackendRuntimeProvider } from '../../core/backend-runtime'; import { DocStorageModule } from '../../core/doc'; import { DocStorageCronJob } from '../../core/doc/job'; import { createTestingModule, type TestingModule } from '../utils'; @@ -10,14 +12,23 @@ interface Context { module: TestingModule; db: PrismaClient; cronJob: DocStorageCronJob; + runtime: { cleanupExpiredSnapshotHistories: Sinon.SinonStub }; } const test = ava as TestFn; // cleanup database before each test test.before(async t => { + t.context.runtime = { + cleanupExpiredSnapshotHistories: Sinon.stub(), + }; t.context.module = await createTestingModule({ imports: [ScheduleModule.forRoot(), DocStorageModule], + tapModule: builder => { + builder + .overrideProvider(BackendRuntimeProvider) + .useValue(t.context.runtime); + }, }); t.context.db = t.context.module.get(PrismaClient); @@ -26,6 +37,7 @@ test.before(async t => { test.beforeEach(async t => { await t.context.module.initTestingDB(); + t.context.runtime.cleanupExpiredSnapshotHistories.reset(); }); test.after.always(async t => { @@ -33,7 +45,7 @@ test.after.always(async t => { }); test('should be able to cleanup expired history', async t => { - const { db } = t.context; + const { db, runtime } = t.context; const timestamp = Date.now(); // insert expired data @@ -65,12 +77,10 @@ test('should be able to cleanup expired history', async t => { let count = await db.snapshotHistory.count(); t.is(count, 20); + runtime.cleanupExpiredSnapshotHistories.onCall(0).resolves(1000); + runtime.cleanupExpiredSnapshotHistories.onCall(1).resolves(10); + await t.context.cronJob.cleanExpiredHistories(); - count = await db.snapshotHistory.count(); - t.is(count, 10); - - const example = await db.snapshotHistory.findFirst(); - t.truthy(example); - t.true(example!.expiredAt > new Date()); + t.is(runtime.cleanupExpiredSnapshotHistories.callCount, 2); }); diff --git a/packages/backend/server/src/__tests__/models/session.spec.ts b/packages/backend/server/src/__tests__/models/session.spec.ts index 50481ddd94..4c17b1d33d 100644 --- a/packages/backend/server/src/__tests__/models/session.spec.ts +++ b/packages/backend/server/src/__tests__/models/session.spec.ts @@ -303,28 +303,3 @@ test('should delete userSession fail when sessionId not match', async t => { ); t.is(count, 0); }); - -test('should cleanup expired userSessions', async t => { - const user = await t.context.user.create({ - email: 'test@affine.pro', - }); - const session = await t.context.db.session.create({ - data: {}, - }); - const userSession = await t.context.session.createOrRefreshUserSession( - user.id, - session.id - ); - await t.context.session.cleanExpiredUserSessions(); - let count = await t.context.db.userSession.count(); - t.is(count, 1); - - // Set expiresAt to past time - await t.context.db.userSession.update({ - where: { id: userSession.id }, - data: { expiresAt: new Date('2022-01-01') }, - }); - await t.context.session.cleanExpiredUserSessions(); - count = await t.context.db.userSession.count(); - t.is(count, 0); -}); diff --git a/packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts b/packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts index 5bb790c373..186a6e555e 100644 --- a/packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts +++ b/packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts @@ -4,6 +4,7 @@ import ava, { TestFn } from 'ava'; import Sinon from 'sinon'; import { OneDay } from '../../base'; +import { BackendRuntimeProvider } from '../../core/backend-runtime'; import { StorageModule, WorkspaceBlobStorage } from '../../core/storage'; import { BlobUploadCleanupJob } from '../../core/storage/job'; import { MockUser, MockWorkspace } from '../mocks'; @@ -14,13 +15,22 @@ interface Context { db: PrismaClient; job: BlobUploadCleanupJob; storage: WorkspaceBlobStorage; + runtime: { cleanupExpiredPendingBlobs: Sinon.SinonStub }; } const test = ava as TestFn; test.before(async t => { + t.context.runtime = { + cleanupExpiredPendingBlobs: Sinon.stub(), + }; t.context.module = await createTestingModule({ imports: [ScheduleModule.forRoot(), StorageModule], + tapModule: builder => { + builder + .overrideProvider(BackendRuntimeProvider) + .useValue(t.context.runtime); + }, }); t.context.db = t.context.module.get(PrismaClient); @@ -30,6 +40,7 @@ test.before(async t => { test.beforeEach(async t => { await t.context.module.initTestingDB(); + t.context.runtime.cleanupExpiredPendingBlobs.reset(); }); test.after.always(async t => { @@ -86,24 +97,14 @@ test('should cleanup expired pending blobs', async t => { ], }); - const abortSpy = Sinon.stub( - t.context.storage, - 'abortMultipartUpload' - ).resolves(); - const deleteSpy = Sinon.spy(t.context.storage, 'delete'); - t.teardown(() => { - abortSpy.restore(); - deleteSpy.restore(); + t.context.runtime.cleanupExpiredPendingBlobs.resolves({ + scanned: 2, + deleted: 2, + abortedMultipart: 1, + workspaceIds: [workspace.id], }); await t.context.job.cleanExpiredPendingBlobs(); - t.is(abortSpy.callCount, 1); - t.is(deleteSpy.callCount, 2); - - const remaining = await t.context.db.blob.findMany({ - where: { workspaceId: workspace.id }, - }); - const remainingKeys = remaining.map(record => record.key).sort(); - t.deepEqual(remainingKeys, ['completed-keep', 'pending-active']); + t.true(t.context.runtime.cleanupExpiredPendingBlobs.calledOnce); }); diff --git a/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts b/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts index 93d3035cc4..e75b6cec4e 100644 --- a/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts +++ b/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts @@ -119,6 +119,50 @@ test('should list blobs', async t => { t.deepEqual(ret.map(x => x.key).sort(), [hash1, hash2].sort()); }); +test('should keep partial blob metadata listing on DB path without storage scan', async t => { + await app.signupV1('u1@affine.pro'); + + const workspace = await createWorkspace(app); + const storage = app.get(WorkspaceBlobStorage); + const rawProvider = (storage as any).provider; + const listSpy = Sinon.spy(rawProvider, 'list'); + t.teardown(() => listSpy.restore()); + + const buffer1 = Buffer.from('with metadata'); + const buffer2 = Buffer.from('without metadata'); + const key1 = sha256Base64urlWithPadding(buffer1); + const key2 = sha256Base64urlWithPadding(buffer2); + const config = app.get(Config); + const factory = app.get(StorageProviderFactory); + const provider = factory.create(config.storages.blob.storage); + await provider.put(`${workspace.id}/${key1}`, buffer1, { + contentType: 'text/plain', + contentLength: buffer1.length, + }); + await provider.put(`${workspace.id}/${key2}`, buffer2, { + contentType: 'text/plain', + contentLength: buffer2.length, + }); + + const blobModel = app.get(BlobModel); + await blobModel.upsert({ + workspaceId: workspace.id, + key: key1, + mime: 'text/plain', + size: buffer1.length, + status: 'completed', + uploadId: null, + }); + + const listed = await storage.list(workspace.id); + + t.deepEqual( + listed.map(blob => blob.key), + [key1] + ); + t.true(listSpy.notCalled); +}); + test('should create pending blob upload with graphql fallback', async t => { await app.signupV1('u1@affine.pro'); @@ -221,10 +265,13 @@ test('should auto delete blobs when workspace is deleted', async t => { const blobs = await listBlobs(app, workspace.id); t.is(blobs.length, 2); - const workspaceBlobStorage = Sinon.spy(app.get(WorkspaceBlobStorage)); + const storage = app.get(WorkspaceBlobStorage); + const rawProvider = (storage as any).provider; + const listSpy = Sinon.spy(rawProvider, 'list'); + t.teardown(() => listSpy.restore()); + await deleteWorkspace(app, workspace.id); - // should not emit workspace.blob.sync event - t.is(workspaceBlobStorage.syncBlobMeta.callCount, 0); + t.is(listSpy.callCount, 0); }); test('should calc blobs size', async t => { diff --git a/packages/backend/server/src/app.module.ts b/packages/backend/server/src/app.module.ts index 837a5345d3..0c38924eda 100644 --- a/packages/backend/server/src/app.module.ts +++ b/packages/backend/server/src/app.module.ts @@ -30,6 +30,7 @@ import { RateLimiterModule } from './base/throttler'; import { WebSocketModule } from './base/websocket'; import { AccessTokenModule } from './core/access-token'; import { AuthModule } from './core/auth'; +import { BackendRuntimeModule } from './core/backend-runtime'; import { CommentModule } from './core/comment'; import { ServerConfigModule, ServerConfigResolverModule } from './core/config'; import { DocStorageModule } from './core/doc'; @@ -120,6 +121,7 @@ export const FunctionalityModules = [ JobModule.forRoot(), RealtimeModule, ModelsModule, + BackendRuntimeModule, ScheduleModule.forRoot(), MonitorModule, ]; diff --git a/packages/backend/server/src/base/job/queue/config.ts b/packages/backend/server/src/base/job/queue/config.ts index ebb9b95ebb..2752bd2b38 100644 --- a/packages/backend/server/src/base/job/queue/config.ts +++ b/packages/backend/server/src/base/job/queue/config.ts @@ -94,4 +94,12 @@ defineModuleConfig('job', { }, schema, }, + + 'queues.backendRuntime': { + desc: 'The config for backend runtime job queue', + default: { + concurrency: 1, + }, + schema, + }, }); diff --git a/packages/backend/server/src/base/job/queue/def.ts b/packages/backend/server/src/base/job/queue/def.ts index f19ae5b832..e26c50b8fb 100644 --- a/packages/backend/server/src/base/job/queue/def.ts +++ b/packages/backend/server/src/base/job/queue/def.ts @@ -29,6 +29,7 @@ export enum Queue { COPILOT = 'copilot', INDEXER = 'indexer', CALENDAR = 'calendar', + BACKENDRUNTIME = 'backendRuntime', } export const QUEUES = Object.values(Queue); diff --git a/packages/backend/server/src/core/auth/index.ts b/packages/backend/server/src/core/auth/index.ts index f10c304bf2..e6bba8a0a6 100644 --- a/packages/backend/server/src/core/auth/index.ts +++ b/packages/backend/server/src/core/auth/index.ts @@ -2,6 +2,7 @@ import './config'; import { Module } from '@nestjs/common'; +import { BackendRuntimeModule } from '../backend-runtime'; import { FeatureModule } from '../features'; import { MailModule } from '../mail'; import { QuotaModule } from '../quota'; @@ -20,7 +21,13 @@ import { AuthService } from './service'; import { SessionIssuer } from './session-issuer'; @Module({ - imports: [FeatureModule, UserModule, QuotaModule, MailModule], + imports: [ + BackendRuntimeModule, + FeatureModule, + UserModule, + QuotaModule, + MailModule, + ], providers: [ AuthService, AuthResolver, diff --git a/packages/backend/server/src/core/auth/job.ts b/packages/backend/server/src/core/auth/job.ts index 550709aabe..91948d7969 100644 --- a/packages/backend/server/src/core/auth/job.ts +++ b/packages/backend/server/src/core/auth/job.ts @@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { JobQueue, OnJob } from '../../base'; -import { Models } from '../../models'; +import { BackendRuntimeProvider } from '../backend-runtime'; declare global { interface Jobs { @@ -13,7 +13,7 @@ declare global { @Injectable() export class AuthCronJob { constructor( - private readonly models: Models, + private readonly rt: BackendRuntimeProvider, private readonly queue: JobQueue ) {} @@ -31,6 +31,9 @@ export class AuthCronJob { @OnJob('nightly.cleanExpiredUserSessions') async cleanExpiredUserSessions() { - await this.models.session.cleanExpiredUserSessions(); + for (;;) { + const count = await this.rt.cleanupExpiredUserSessions(1000); + if (count < 1000) break; + } } } diff --git a/packages/backend/server/src/core/backend-runtime/__tests__/job.spec.ts b/packages/backend/server/src/core/backend-runtime/__tests__/job.spec.ts new file mode 100644 index 0000000000..f3fcf28d22 --- /dev/null +++ b/packages/backend/server/src/core/backend-runtime/__tests__/job.spec.ts @@ -0,0 +1,57 @@ +import { ScheduleModule } from '@nestjs/schedule'; +import ava, { TestFn } from 'ava'; +import Sinon from 'sinon'; + +import { + createTestingModule, + type TestingModule, +} from '../../../__tests__/utils'; +import { BackendRuntimeModule, BackendRuntimeProvider } from '../index'; +import { BackendRuntimeHousekeepingJob } from '../job'; + +interface Context { + module: TestingModule; + job: BackendRuntimeHousekeepingJob; + runtime: { + cleanupExpiredRuntimeStates: Sinon.SinonStub; + cleanupExpiredRuntimeGates: Sinon.SinonStub; + }; +} + +const test = ava as TestFn; + +test.before(async t => { + t.context.runtime = { + cleanupExpiredRuntimeStates: Sinon.stub(), + cleanupExpiredRuntimeGates: Sinon.stub(), + }; + t.context.module = await createTestingModule({ + imports: [ScheduleModule.forRoot(), BackendRuntimeModule], + tapModule: builder => { + builder + .overrideProvider(BackendRuntimeProvider) + .useValue(t.context.runtime); + }, + }); + t.context.job = t.context.module.get(BackendRuntimeHousekeepingJob); +}); + +test.beforeEach(t => { + t.context.runtime.cleanupExpiredRuntimeStates.reset(); + t.context.runtime.cleanupExpiredRuntimeGates.reset(); +}); + +test.after.always(async t => { + await t.context.module.close(); +}); + +test('backend-runtime housekeeping cleans runtime state and gate batches', async t => { + t.context.runtime.cleanupExpiredRuntimeStates.onCall(0).resolves(1000); + t.context.runtime.cleanupExpiredRuntimeStates.onCall(1).resolves(2); + t.context.runtime.cleanupExpiredRuntimeGates.resolves(1); + + await t.context.job.cleanExpiredRuntimeHousekeeping(); + + t.is(t.context.runtime.cleanupExpiredRuntimeStates.callCount, 2); + t.is(t.context.runtime.cleanupExpiredRuntimeGates.callCount, 1); +}); diff --git a/packages/backend/server/src/core/backend-runtime/__tests__/provider.spec.ts b/packages/backend/server/src/core/backend-runtime/__tests__/provider.spec.ts new file mode 100644 index 0000000000..36b932dbb3 --- /dev/null +++ b/packages/backend/server/src/core/backend-runtime/__tests__/provider.spec.ts @@ -0,0 +1,43 @@ +import test from 'ava'; +import Sinon from 'sinon'; + +import { BackendRuntimeProvider } from '../provider'; + +test('backend-runtime provider starts once, runs migrations once, and reports health', async t => { + const provider = new BackendRuntimeProvider(); + const runtime = { + start: Sinon.stub().resolves(), + stop: Sinon.stub().resolves(), + runMigrations: Sinon.stub().resolves(), + health: Sinon.stub().resolves({ + started: true, + databaseConnected: true, + objectStorageConfigured: true, + }), + }; + (provider as any).runtime = runtime; + + await provider.start(); + await provider.start(); + const health = await provider.health(); + await provider.stop(); + + t.is(runtime.start.callCount, 2); + t.is(runtime.runMigrations.callCount, 1); + t.true(health.databaseConnected); + t.true(health.objectStorageConfigured); + t.is(runtime.stop.callCount, 1); +}); + +test('backend-runtime provider measures explicit typed methods', async t => { + const provider = new BackendRuntimeProvider(); + const runtime = { + cleanupExpiredRuntimeStates: Sinon.stub().resolves(3), + }; + (provider as any).runtime = runtime; + + const result = await provider.cleanupExpiredRuntimeStates(1000); + + t.is(result, 3); + t.true(runtime.cleanupExpiredRuntimeStates.calledOnceWithExactly(1000)); +}); diff --git a/packages/backend/server/src/core/backend-runtime/blob-job.ts b/packages/backend/server/src/core/backend-runtime/blob-job.ts new file mode 100644 index 0000000000..d1e0a0918e --- /dev/null +++ b/packages/backend/server/src/core/backend-runtime/blob-job.ts @@ -0,0 +1,368 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { PrismaClient } from '@prisma/client'; + +import { EventBus, JobQueue, OnJob } from '../../base'; +import { BackendRuntimeProvider } from './provider'; + +declare global { + interface Jobs { + 'backendRuntime.backfillMissingBlobMetadata': { + workspaceId: string; + limit?: number; + }; + 'backendRuntime.backfillMissingBlobMetadataBySid': { + lastSid?: number; + workspaceLimit?: number; + objectLimit?: number; + }; + 'backendRuntime.rebuildWorkspaceDocBlobRefs': { + workspaceId: string; + limit?: number; + }; + 'backendRuntime.rebuildWorkspaceDocBlobRefsBySid': { + lastSid?: number; + workspaceLimit?: number; + docLimit?: number; + }; + 'backendRuntime.planUnreferencedWorkspaceBlobs': { + workspaceId: string; + gracePeriodDays?: number; + limit?: number; + }; + 'backendRuntime.planUnreferencedWorkspaceBlobsBySid': { + lastSid?: number; + workspaceLimit?: number; + gracePeriodDays?: number; + limit?: number; + }; + 'backendRuntime.executeBlobCleanupCandidates': { + runId: string; + gracePeriodDays?: number; + limit?: number; + }; + } +} + +@Injectable() +export class BackendRuntimeBlobJob { + private readonly logger = new Logger(BackendRuntimeBlobJob.name); + + constructor( + private readonly rt: BackendRuntimeProvider, + private readonly event: EventBus, + private readonly queue: JobQueue, + private readonly db: PrismaClient + ) {} + + async enqueueBackfillMissingBlobMetadata(workspaceId: string, limit = 1000) { + await this.queue.add('backendRuntime.backfillMissingBlobMetadata', { + workspaceId, + limit, + }); + } + + async enqueueBackfillMissingBlobMetadataBySid( + lastSid = 0, + workspaceLimit = 100, + objectLimit = 1000 + ) { + await this.queue.add('backendRuntime.backfillMissingBlobMetadataBySid', { + lastSid, + workspaceLimit, + objectLimit, + }); + } + + async enqueueRebuildWorkspaceDocBlobRefs(workspaceId: string, limit = 1000) { + await this.queue.add('backendRuntime.rebuildWorkspaceDocBlobRefs', { + workspaceId, + limit, + }); + } + + async enqueueRebuildWorkspaceDocBlobRefsBySid( + lastSid = 0, + workspaceLimit = 100, + docLimit = 1000 + ) { + await this.queue.add('backendRuntime.rebuildWorkspaceDocBlobRefsBySid', { + lastSid, + workspaceLimit, + docLimit, + }); + } + + @OnJob('backendRuntime.backfillMissingBlobMetadataBySid') + async backfillMissingBlobMetadataBySid({ + lastSid = 0, + workspaceLimit = 100, + objectLimit = 1000, + }: Jobs['backendRuntime.backfillMissingBlobMetadataBySid']) { + const workspaces = await this.db.workspace.findMany({ + where: { sid: { gt: lastSid } }, + orderBy: { sid: 'asc' }, + select: { id: true, sid: true }, + take: workspaceLimit, + }); + + for (const workspace of workspaces) { + await this.drainBlobMetadataBackfill(workspace.id, objectLimit, { + sid: workspace.sid, + }); + } + + const nextSid = workspaces.at(-1)?.sid; + if (nextSid !== undefined && workspaces.length === workspaceLimit) { + await this.enqueueBackfillMissingBlobMetadataBySid( + nextSid, + workspaceLimit, + objectLimit + ); + } + } + + async enqueuePlanUnreferencedWorkspaceBlobs( + workspaceId: string, + gracePeriodDays = 30, + limit = 1000 + ) { + await this.queue.add('backendRuntime.planUnreferencedWorkspaceBlobs', { + workspaceId, + gracePeriodDays, + limit, + }); + } + + async enqueuePlanUnreferencedWorkspaceBlobsBySid( + lastSid = 0, + workspaceLimit = 100, + gracePeriodDays = 30, + limit = 1000 + ) { + await this.queue.add('backendRuntime.planUnreferencedWorkspaceBlobsBySid', { + lastSid, + workspaceLimit, + gracePeriodDays, + limit, + }); + } + + async enqueueExecuteBlobCleanupCandidates( + runId: string, + gracePeriodDays = 30, + limit = 1000 + ) { + await this.queue.add('backendRuntime.executeBlobCleanupCandidates', { + runId, + gracePeriodDays, + limit, + }); + } + + @Cron(CronExpression.EVERY_DAY_AT_1AM) + async dailyBlobMetadataBackfill() { + await this.queue.add( + 'backendRuntime.backfillMissingBlobMetadataBySid', + {}, + { jobId: 'daily-backend-runtime-blob-metadata-backfill' } + ); + } + + @Cron(CronExpression.EVERY_DAY_AT_2AM) + async dailyDocBlobRefsRebuild() { + await this.queue.add( + 'backendRuntime.rebuildWorkspaceDocBlobRefsBySid', + {}, + { jobId: 'daily-backend-runtime-doc-blob-refs-rebuild' } + ); + } + + @Cron(CronExpression.EVERY_DAY_AT_3AM) + async dailyBlobCleanupPlanning() { + await this.queue.add( + 'backendRuntime.planUnreferencedWorkspaceBlobsBySid', + {}, + { jobId: 'daily-backend-runtime-blob-cleanup-planning' } + ); + } + + @OnJob('backendRuntime.backfillMissingBlobMetadata') + async backfillMissingBlobMetadata({ + workspaceId, + limit = 1000, + }: Jobs['backendRuntime.backfillMissingBlobMetadata']) { + await this.drainBlobMetadataBackfill(workspaceId, limit); + } + + @OnJob('backendRuntime.rebuildWorkspaceDocBlobRefs') + async rebuildWorkspaceDocBlobRefs({ + workspaceId, + limit = 1000, + }: Jobs['backendRuntime.rebuildWorkspaceDocBlobRefs']) { + await this.drainWorkspaceDocBlobRefs(workspaceId, limit); + } + + @OnJob('backendRuntime.rebuildWorkspaceDocBlobRefsBySid') + async rebuildWorkspaceDocBlobRefsBySid({ + lastSid = 0, + workspaceLimit = 100, + docLimit = 1000, + }: Jobs['backendRuntime.rebuildWorkspaceDocBlobRefsBySid']) { + const workspaces = await this.db.workspace.findMany({ + where: { + sid: { + gt: lastSid, + }, + }, + orderBy: { + sid: 'asc', + }, + select: { + id: true, + sid: true, + }, + take: workspaceLimit, + }); + + for (const workspace of workspaces) { + await this.drainWorkspaceDocBlobRefs(workspace.id, docLimit, { + sid: workspace.sid, + }); + } + + const nextSid = workspaces.at(-1)?.sid; + if (nextSid !== undefined && workspaces.length === workspaceLimit) { + await this.enqueueRebuildWorkspaceDocBlobRefsBySid( + nextSid, + workspaceLimit, + docLimit + ); + } + } + + @OnJob('backendRuntime.planUnreferencedWorkspaceBlobs') + async planUnreferencedWorkspaceBlobs({ + workspaceId, + gracePeriodDays = 30, + limit = 1000, + }: Jobs['backendRuntime.planUnreferencedWorkspaceBlobs']) { + const result = await this.rt.planUnreferencedWorkspaceBlobs( + workspaceId, + gracePeriodDays, + limit + ); + this.logger.log( + `planned blob cleanup workspace=${workspaceId} run=${result.runId} candidates=${result.candidatesMarked} scanned=${result.scannedBlobs}` + ); + } + + @OnJob('backendRuntime.planUnreferencedWorkspaceBlobsBySid') + async planUnreferencedWorkspaceBlobsBySid({ + lastSid = 0, + workspaceLimit = 100, + gracePeriodDays = 30, + limit = 1000, + }: Jobs['backendRuntime.planUnreferencedWorkspaceBlobsBySid']) { + const workspaces = await this.db.workspace.findMany({ + where: { + sid: { + gt: lastSid, + }, + }, + orderBy: { + sid: 'asc', + }, + select: { + id: true, + sid: true, + }, + take: workspaceLimit, + }); + + for (const workspace of workspaces) { + const result = await this.rt.planUnreferencedWorkspaceBlobs( + workspace.id, + gracePeriodDays, + limit + ); + this.logger.log( + `planned blob cleanup workspace=${workspace.id} sid=${workspace.sid} run=${result.runId} candidates=${result.candidatesMarked} scanned=${result.scannedBlobs}` + ); + } + + const nextSid = workspaces.at(-1)?.sid; + if (nextSid !== undefined && workspaces.length === workspaceLimit) { + await this.enqueuePlanUnreferencedWorkspaceBlobsBySid( + nextSid, + workspaceLimit, + gracePeriodDays, + limit + ); + } + } + + @OnJob('backendRuntime.executeBlobCleanupCandidates') + async executeBlobCleanupCandidates({ + runId, + gracePeriodDays = 30, + limit = 1000, + }: Jobs['backendRuntime.executeBlobCleanupCandidates']) { + const result = await this.rt.executeBlobCleanupCandidates( + runId, + gracePeriodDays, + limit + ); + await Promise.all( + result.workspaceIds.map(workspaceId => + this.event.emitAsync('workspace.blobs.updated', { workspaceId }) + ) + ); + this.logger.log( + `executed blob cleanup run=${runId} deleted=${result.deletedObjects} skipped=${result.skippedStillReferenced} failed=${result.failed}` + ); + } + + private async drainBlobMetadataBackfill( + workspaceId: string, + limit: number, + context: { sid?: number } = {} + ) { + for (;;) { + const result = await this.rt.backfillMissingBlobMetadata( + workspaceId, + limit + ); + await Promise.all( + result.workspaceIds.map(workspaceId => + this.event.emitAsync('workspace.blobs.updated', { workspaceId }) + ) + ); + this.logger.log( + `backfilled blob metadata workspace=${workspaceId}${context.sid === undefined ? '' : ` sid=${context.sid}`} upserted=${result.upsertedMetadata} scanned=${result.scannedObjects}` + ); + if (!result.nextCursor) { + break; + } + } + } + + private async drainWorkspaceDocBlobRefs( + workspaceId: string, + limit: number, + context: { sid?: number } = {} + ) { + for (;;) { + const result = await this.rt.rebuildWorkspaceDocBlobRefs( + workspaceId, + limit + ); + this.logger.log( + `rebuilt doc blob refs workspace=${workspaceId}${context.sid === undefined ? '' : ` sid=${context.sid}`} parsed=${result.parsedDocs} failed=${result.failedDocs}` + ); + if (!result.nextCursor) { + break; + } + } + } +} diff --git a/packages/backend/server/src/core/backend-runtime/index.ts b/packages/backend/server/src/core/backend-runtime/index.ts new file mode 100644 index 0000000000..c9245ee54c --- /dev/null +++ b/packages/backend/server/src/core/backend-runtime/index.ts @@ -0,0 +1,18 @@ +import { Global, Module } from '@nestjs/common'; + +import { BackendRuntimeBlobJob } from './blob-job'; +import { BackendRuntimeHousekeepingJob } from './job'; +import { BackendRuntimeProvider } from './provider'; + +@Global() +@Module({ + providers: [ + BackendRuntimeProvider, + BackendRuntimeBlobJob, + BackendRuntimeHousekeepingJob, + ], + exports: [BackendRuntimeProvider, BackendRuntimeBlobJob], +}) +export class BackendRuntimeModule {} + +export { BackendRuntimeProvider } from './provider'; diff --git a/packages/backend/server/src/core/backend-runtime/job.ts b/packages/backend/server/src/core/backend-runtime/job.ts new file mode 100644 index 0000000000..fce8da818a --- /dev/null +++ b/packages/backend/server/src/core/backend-runtime/job.ts @@ -0,0 +1,58 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; + +import { JobQueue, OnJob } from '../../base'; +import { BackendRuntimeProvider } from './provider'; + +declare global { + interface Jobs { + 'nightly.cleanExpiredBackendRuntimeHousekeeping': {}; + } +} + +@Injectable() +export class BackendRuntimeHousekeepingJob { + private readonly logger = new Logger(BackendRuntimeHousekeepingJob.name); + + constructor( + private readonly rt: BackendRuntimeProvider, + private readonly queue: JobQueue + ) {} + + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async nightlyJob() { + await this.queue.add( + 'nightly.cleanExpiredBackendRuntimeHousekeeping', + {}, + { + jobId: 'nightly-backend-runtime-housekeeping', + } + ); + } + + @OnJob('nightly.cleanExpiredBackendRuntimeHousekeeping') + async cleanExpiredRuntimeHousekeeping() { + const states = await this.cleanBatches(() => + this.rt.cleanupExpiredRuntimeStates(1000) + ); + const gates = await this.cleanBatches(() => + this.rt.cleanupExpiredRuntimeGates(1000) + ); + + this.logger.log( + `cleaned runtime housekeeping states=${states} gates=${gates}` + ); + } + + private async cleanBatches(fn: () => Promise) { + let total = 0; + for (;;) { + const count = Number(await fn()); + total += count; + if (count < 1000) { + break; + } + } + return total; + } +} diff --git a/packages/backend/server/src/core/backend-runtime/provider.ts b/packages/backend/server/src/core/backend-runtime/provider.ts new file mode 100644 index 0000000000..30c85dc2c6 --- /dev/null +++ b/packages/backend/server/src/core/backend-runtime/provider.ts @@ -0,0 +1,137 @@ +import { + Injectable, + Logger, + type OnApplicationBootstrap, + type OnApplicationShutdown, +} from '@nestjs/common'; + +import { wrapCallMetric } from '../../base/metrics'; +import { BackendRuntime, type BackendRuntimeHealth } from '../../native'; + +type RuntimeInstance = InstanceType; + +@Injectable() +export class BackendRuntimeProvider + implements OnApplicationBootstrap, OnApplicationShutdown +{ + private readonly logger = new Logger(BackendRuntimeProvider.name); + private readonly runtime: RuntimeInstance = new BackendRuntime(); + private migrationsStarted = false; + + async onApplicationBootstrap() { + await this.start(); + } + + async onApplicationShutdown() { + await this.stop(); + } + + async start() { + await this.runtime.start(); + await this.runMigrationsOnce(); + const health = await this.runtime.health(); + this.logger.log( + `backend runtime started: db=${health.databaseConnected} objectStorage=${health.objectStorageConfigured}` + ); + } + + async stop() { + await this.runtime.stop(); + this.logger.log('backend runtime stopped'); + } + + async health(): Promise { + return await this.runtime.health(); + } + + async cleanupExpiredPendingBlobs(cutoffMs: number, limit: number) { + return await this.measured('cleanupExpiredPendingBlobs', rt => + rt.cleanupExpiredPendingBlobs(cutoffMs, limit) + ); + } + + async releaseDeletedBlobs(workspaceId: string, limit: number) { + return await this.measured('releaseDeletedBlobs', rt => + rt.releaseDeletedBlobs(workspaceId, limit) + ); + } + + async cleanupExpiredSnapshotHistories(limit: number) { + return await this.measured('cleanupExpiredSnapshotHistories', rt => + rt.cleanupExpiredSnapshotHistories(limit) + ); + } + + async cleanupExpiredUserSessions(limit: number) { + return await this.measured('cleanupExpiredUserSessions', rt => + rt.cleanupExpiredUserSessions(limit) + ); + } + + async cleanupExpiredRuntimeStates(limit: number) { + return await this.measured('cleanupExpiredRuntimeStates', rt => + rt.cleanupExpiredRuntimeStates(limit) + ); + } + + async cleanupExpiredRuntimeGates(limit: number) { + return await this.measured('cleanupExpiredRuntimeGates', rt => + rt.cleanupExpiredRuntimeGates(limit) + ); + } + + async backfillMissingBlobMetadata( + workspaceId: string | null | undefined, + limit: number + ) { + return await this.measured('backfillMissingBlobMetadata', rt => + rt.backfillMissingBlobMetadata(workspaceId, limit) + ); + } + + async rebuildWorkspaceDocBlobRefs(workspaceId: string, limit: number) { + return await this.measured('rebuildWorkspaceDocBlobRefs', rt => + rt.rebuildWorkspaceDocBlobRefs(workspaceId, limit) + ); + } + + async planUnreferencedWorkspaceBlobs( + workspaceId: string, + gracePeriodDays: number, + limit: number + ) { + return await this.measured('planUnreferencedWorkspaceBlobs', rt => + rt.planUnreferencedWorkspaceBlobs(workspaceId, gracePeriodDays, limit) + ); + } + + async executeBlobCleanupCandidates( + runId: string, + gracePeriodDays: number, + limit: number + ) { + return await this.measured('executeBlobCleanupCandidates', rt => + rt.executeBlobCleanupCandidates(runId, gracePeriodDays, limit) + ); + } + + private async measured( + method: string, + fn: (runtime: RuntimeInstance) => Promise + ): Promise { + return await wrapCallMetric( + () => fn(this.runtime), + 'storage', + 'backend_runtime', + { method } + )(); + } + + private async runMigrationsOnce() { + if (this.migrationsStarted) { + return; + } + await this.runtime.runMigrations(); + this.migrationsStarted = true; + } +} diff --git a/packages/backend/server/src/core/doc/index.ts b/packages/backend/server/src/core/doc/index.ts index 8058bed7dc..04dc620070 100644 --- a/packages/backend/server/src/core/doc/index.ts +++ b/packages/backend/server/src/core/doc/index.ts @@ -2,6 +2,7 @@ import './config'; import { Module } from '@nestjs/common'; +import { BackendRuntimeModule } from '../backend-runtime'; import { PermissionModule } from '../permission'; import { QuotaModule } from '../quota'; import { StorageModule } from '../storage'; @@ -14,7 +15,7 @@ import { DatabaseDocReader, DocReader, DocReaderProvider } from './reader'; import { DocWriter } from './writer'; @Module({ - imports: [QuotaModule, PermissionModule, StorageModule], + imports: [BackendRuntimeModule, QuotaModule, PermissionModule, StorageModule], providers: [ DocStorageOptions, PgWorkspaceDocStorageAdapter, diff --git a/packages/backend/server/src/core/doc/job.ts b/packages/backend/server/src/core/doc/job.ts index 1d0a5c184c..ef516286ca 100644 --- a/packages/backend/server/src/core/doc/job.ts +++ b/packages/backend/server/src/core/doc/job.ts @@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { JobQueue, OnJob } from '../../base'; -import { Models } from '../../models'; +import { BackendRuntimeProvider } from '../backend-runtime'; declare global { interface Jobs { @@ -13,7 +13,7 @@ declare global { @Injectable() export class DocStorageCronJob { constructor( - private readonly models: Models, + private readonly rt: BackendRuntimeProvider, private readonly queue: JobQueue ) {} @@ -30,6 +30,9 @@ export class DocStorageCronJob { @OnJob('nightly.cleanExpiredHistories') async cleanExpiredHistories() { - await this.models.history.cleanExpired(); + for (;;) { + const count = await this.rt.cleanupExpiredSnapshotHistories(1000); + if (count < 1000) break; + } } } diff --git a/packages/backend/server/src/core/storage/index.ts b/packages/backend/server/src/core/storage/index.ts index ffa3e1fb9b..cbc1495b75 100644 --- a/packages/backend/server/src/core/storage/index.ts +++ b/packages/backend/server/src/core/storage/index.ts @@ -2,6 +2,7 @@ import './config'; import { Module } from '@nestjs/common'; +import { BackendRuntimeModule } from '../backend-runtime'; import { BlobUploadCleanupJob } from './job'; import { R2UploadController } from './r2-proxy'; import { @@ -11,6 +12,7 @@ import { } from './wrappers'; @Module({ + imports: [BackendRuntimeModule], controllers: [R2UploadController], providers: [ WorkspaceBlobStorage, diff --git a/packages/backend/server/src/core/storage/job.ts b/packages/backend/server/src/core/storage/job.ts index 8fef65cddd..5e01520860 100644 --- a/packages/backend/server/src/core/storage/job.ts +++ b/packages/backend/server/src/core/storage/job.ts @@ -1,9 +1,8 @@ import { Injectable, Logger } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; -import { JobQueue, OneDay, OnJob } from '../../base'; -import { Models } from '../../models'; -import { WorkspaceBlobStorage } from './wrappers/blob'; +import { EventBus, JobQueue, OneDay, OnJob } from '../../base'; +import { BackendRuntimeProvider } from '../backend-runtime'; declare global { interface Jobs { @@ -16,8 +15,8 @@ export class BlobUploadCleanupJob { private readonly logger = new Logger(BlobUploadCleanupJob.name); constructor( - private readonly models: Models, - private readonly storage: WorkspaceBlobStorage, + private readonly rt: BackendRuntimeProvider, + private readonly event: EventBus, private readonly queue: JobQueue ) {} @@ -34,21 +33,25 @@ export class BlobUploadCleanupJob { @OnJob('nightly.cleanExpiredPendingBlobs') async cleanExpiredPendingBlobs() { - const cutoff = new Date(Date.now() - OneDay); - const pending = await this.models.blob.listPendingExpired(cutoff); - - for (const blob of pending) { - if (blob.uploadId) { - await this.storage.abortMultipartUpload( - blob.workspaceId, - blob.key, - blob.uploadId - ); + const cutoff = Date.now() - OneDay; + let scanned = 0; + let deleted = 0; + for (;;) { + const result = await this.rt.cleanupExpiredPendingBlobs(cutoff, 1000); + scanned += result.scanned; + deleted += result.deleted; + await Promise.all( + result.workspaceIds.map(workspaceId => + this.event.emitAsync('workspace.blobs.updated', { workspaceId }) + ) + ); + if (result.scanned < 1000) { + break; } - - await this.storage.delete(blob.workspaceId, blob.key, true); } - this.logger.log(`cleaned ${pending.length} expired pending blobs`); + this.logger.log( + `cleaned ${deleted} expired pending blobs, scanned ${scanned}` + ); } } diff --git a/packages/backend/server/src/core/storage/wrappers/blob.ts b/packages/backend/server/src/core/storage/wrappers/blob.ts index 1386c791d1..dacee8fd3f 100644 --- a/packages/backend/server/src/core/storage/wrappers/blob.ts +++ b/packages/backend/server/src/core/storage/wrappers/blob.ts @@ -7,7 +7,6 @@ import { Config, EventBus, type GetObjectMetadata, - ListObjectsMetadata, OnEvent, PutObjectMetadata, type StorageProvider, @@ -15,13 +14,10 @@ import { URLHelper, } from '../../../base'; import { Models } from '../../../models'; +import { BackendRuntimeProvider } from '../../backend-runtime'; declare global { interface Events { - 'workspace.blob.sync': { - workspaceId: string; - key: string; - }; 'workspace.blob.delete': { workspaceId: string; key: string; @@ -57,7 +53,8 @@ export class WorkspaceBlobStorage { private readonly event: EventBus, private readonly storageFactory: StorageProviderFactory, private readonly models: Models, - private readonly url: URLHelper + private readonly url: URLHelper, + private readonly rt: BackendRuntimeProvider ) {} @OnEvent('config.init') @@ -223,34 +220,8 @@ export class WorkspaceBlobStorage { return { ok: true, metadata }; } - async list(workspaceId: string, syncBlobMeta = true) { - const blobsInDb = await this.models.blob.list(workspaceId); - - if (blobsInDb.length > 0) { - return blobsInDb; - } - - // all blobs are uploading but not completed yet - const hasDbBlobs = await this.models.blob.hasAny(workspaceId); - if (hasDbBlobs) { - return blobsInDb; - } - - const blobs = await this.provider.list(workspaceId + '/'); - blobs.forEach(blob => { - blob.key = blob.key.slice(workspaceId.length + 1); - }); - - if (syncBlobMeta) { - this.trySyncBlobsMeta(workspaceId, blobs); - } - - return blobs.map(blob => ({ - key: blob.key, - size: blob.contentLength, - createdAt: blob.lastModified, - mime: 'application/octet-stream', - })); + async list(workspaceId: string) { + return await this.models.blob.list(workspaceId); } async delete(workspaceId: string, key: string, permanently = false) { @@ -264,17 +235,17 @@ export class WorkspaceBlobStorage { } async release(workspaceId: string) { - const deletedBlobs = await this.models.blob.listDeleted(workspaceId); - - deletedBlobs.forEach(blob => { - this.event.emit('workspace.blob.delete', { - workspaceId: workspaceId, - key: blob.key, - }); - }); + let scanned = 0; + let deleted = 0; + for (;;) { + const result = await this.rt.releaseDeletedBlobs(workspaceId, 1000); + scanned += result.scanned; + deleted += result.deleted; + if (result.scanned < 1000) break; + } this.logger.log( - `released ${deletedBlobs.length} blobs for workspace ${workspaceId}` + `released ${deleted}/${scanned} blobs for workspace ${workspaceId}` ); await this.event.emitAsync('workspace.blobs.updated', { workspaceId }); @@ -291,15 +262,6 @@ export class WorkspaceBlobStorage { return this.url.link(`/api/workspaces/${workspaceId}/blobs/${avatarKey}`); } - private trySyncBlobsMeta(workspaceId: string, blobs: ListObjectsMetadata[]) { - for (const blob of blobs) { - this.event.emit('workspace.blob.sync', { - workspaceId, - key: blob.key, - }); - } - } - private async upsert( workspaceId: string, key: string, @@ -315,26 +277,9 @@ export class WorkspaceBlobStorage { }); } - @OnEvent('workspace.blob.sync') - async syncBlobMeta({ workspaceId, key }: Events['workspace.blob.sync']) { - try { - const meta = await this.provider.head(`${workspaceId}/${key}`); - - if (meta) { - await this.upsert(workspaceId, key, meta); - } else { - await this.models.blob.delete(workspaceId, key, true); - } - } catch (e) { - // never throw - this.logger.error('failed to sync blob meta to DB', e); - } - } - @OnEvent('workspace.deleted') async onWorkspaceDeleted({ id }: Events['workspace.deleted']) { - // do not sync blob meta to DB - const blobs = await this.list(id, false); + const blobs = await this.list(id); // to reduce cpu time holding blobs.forEach(blob => { diff --git a/packages/backend/server/src/models/__tests__/blob.spec.ts b/packages/backend/server/src/models/__tests__/blob.spec.ts index 04697f87a1..acd1300868 100644 --- a/packages/backend/server/src/models/__tests__/blob.spec.ts +++ b/packages/backend/server/src/models/__tests__/blob.spec.ts @@ -105,31 +105,6 @@ test('should list blobs', async t => { t.is(blobs[1].key, blob2.key); }); -test('should list deleted blobs', async t => { - const workspace = await module.create(Mockers.Workspace); - const blob = await models.blob.upsert({ - workspaceId: workspace.id, - key: 'test-key', - mime: 'text/plain', - size: 100, - }); - - await models.blob.delete(workspace.id, blob.key); - - const blobs = await models.blob.listDeleted(workspace.id); - - t.is(blobs.length, 1); - t.is(blobs[0].key, blob.key); - t.truthy(blobs[0].deletedAt); - - // delete permanently - await models.blob.delete(workspace.id, blob.key, true); - - const blobs2 = await models.blob.listDeleted(workspace.id); - - t.is(blobs2.length, 0); -}); - test('should get blob', async t => { const workspace = await module.create(Mockers.Workspace); const blob = await models.blob.upsert({ diff --git a/packages/backend/server/src/models/blob.ts b/packages/backend/server/src/models/blob.ts index 0ec2704fcb..5ae22df41e 100644 --- a/packages/backend/server/src/models/blob.ts +++ b/packages/backend/server/src/models/blob.ts @@ -96,32 +96,6 @@ export class BlobModel extends BaseModel { return count > 0; } - async listPendingExpired(before: Date) { - return await this.db.blob.findMany({ - where: { - status: 'pending', - deletedAt: null, - createdAt: { - lt: before, - }, - }, - select: { - workspaceId: true, - key: true, - uploadId: true, - }, - }); - } - - async listDeleted(workspaceId: string) { - return await this.db.blob.findMany({ - where: { - workspaceId, - deletedAt: { not: null }, - }, - }); - } - async totalSize(workspaceId: string) { const sum = await this.db.blob.aggregate({ where: { diff --git a/packages/backend/server/src/models/history.ts b/packages/backend/server/src/models/history.ts index bb8086f3d8..083faa607f 100644 --- a/packages/backend/server/src/models/history.ts +++ b/packages/backend/server/src/models/history.ts @@ -162,21 +162,4 @@ export class HistoryModel extends BaseModel { editor: row.createdByUser, }; } - - /** - * Clean expired histories. - */ - async cleanExpired() { - const { count } = await this.db.snapshotHistory.deleteMany({ - where: { - expiredAt: { - lte: new Date(), - }, - }, - }); - if (count > 0) { - this.logger.log(`Deleted ${count} expired histories`); - } - return count; - } } diff --git a/packages/backend/server/src/models/session.ts b/packages/backend/server/src/models/session.ts index e37cec0e77..1194aa512b 100644 --- a/packages/backend/server/src/models/session.ts +++ b/packages/backend/server/src/models/session.ts @@ -148,17 +148,4 @@ export class SessionModel extends BaseModel { } return count; } - - async cleanExpiredUserSessions() { - const { count } = await this.db.userSession.deleteMany({ - where: { - expiresAt: { - lte: new Date(), - }, - }, - }); - if (count > 0) { - this.logger.log(`Cleaned ${count} expired user sessions`); - } - } } diff --git a/packages/backend/server/src/native.ts b/packages/backend/server/src/native.ts index 8675ebbbf9..fcaf6635db 100644 --- a/packages/backend/server/src/native.ts +++ b/packages/backend/server/src/native.ts @@ -46,9 +46,13 @@ import serverNativeModule, { type RequestedModelMatchResponse, type ResolvedEntitlement, type ResolveEntitlementInput, + type RuntimeBlobCleanupExecuteResult, + type RuntimeBlobCleanupPlanResult, type RuntimeBlobCleanupResult, type RuntimeBlobCompleteResult, + type RuntimeBlobMetadataBackfillResult, type RuntimeByokLocalLeaseRecord, + type RuntimeDocBlobRefsResult, type RuntimeDocCompactionResult, type RuntimeMagicLinkOtpConsumeResult, type RuntimeMultipartUploadInit, @@ -91,9 +95,13 @@ export type { RemoteMimeTypeRequest, ResolvedEntitlement, ResolveEntitlementInput, + RuntimeBlobCleanupExecuteResult, + RuntimeBlobCleanupPlanResult, RuntimeBlobCleanupResult, RuntimeBlobCompleteResult, + RuntimeBlobMetadataBackfillResult, RuntimeByokLocalLeaseRecord, + RuntimeDocBlobRefsResult, RuntimeDocCompactionResult, RuntimeMagicLinkOtpConsumeResult, RuntimeMultipartUploadInit, diff --git a/packages/frontend/admin/src/config.json b/packages/frontend/admin/src/config.json index b792db7b98..b24b005f62 100644 --- a/packages/frontend/admin/src/config.json +++ b/packages/frontend/admin/src/config.json @@ -46,6 +46,10 @@ "queues.nightly": { "type": "Object", "desc": "The config for nightly job queue" + }, + "queues.backendRuntime": { + "type": "Object", + "desc": "The config for backend runtime job queue" } }, "throttle": {