feat(server): blob reconciliation (#15165)

#### PR Dependency Tree


* **PR #15165** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Added automated backend maintenance for missing blob metadata
backfill, document-to-blob reference rebuilding, and unreferenced blob
cleanup planning/execution.
* Introduced scheduled batch processing (workspace-paged) and paginated
object-storage listing.
* **Bug Fixes**
* Improved reliability of object-storage reads by treating expected “not
found” results as non-errors.
* Strengthened blob/expired cleanup flows with runtime-driven batching
and reduced coupling to metadata synchronization.
* **Tests**
* Expanded unit and e2e coverage for partial blob metadata and updated
runtime/job cleanup test assertions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-06-29 00:02:38 +08:00
committed by GitHub
parent 4a7c931eca
commit 0a422aa158
42 changed files with 2494 additions and 264 deletions
+12
View File
@@ -121,6 +121,18 @@
"default": {
"concurrency": 1
}
},
"queues.backendRuntime": {
"type": "object",
"description": "The config for backend runtime job queue\n@default {\"concurrency\":1}",
"properties": {
"concurrency": {
"type": "number"
}
},
"default": {
"concurrency": 1
}
}
}
},
+6
View File
@@ -0,0 +1,6 @@
/* auto-generated by NAPI-RS */
/* eslint-disable */
// The default export is typed as the entire `./index` module, so a default
// import is typed with the same members as that module's named exports.
declare const _default: typeof import('./index')
export default _default
+48
View File
@@ -1,13 +1,22 @@
/* auto-generated by NAPI-RS */
/* eslint-disable */
declare const _default: typeof import('./index')
export default _default
export declare class BackendRuntime {
planUnreferencedWorkspaceBlobs(workspaceId: string, gracePeriodDays: number, limit: number): Promise<RuntimeBlobCleanupPlanResult>
executeBlobCleanupCandidates(runId: string, gracePeriodDays: number, limit: number): Promise<RuntimeBlobCleanupExecuteResult>
completeBlobUpload(workspaceId: string, key: string, expectedSize: number, expectedMime: string): Promise<RuntimeBlobCompleteResult>
completeFsBlobUpload(root: string, bucket: string, workspaceId: string, key: string, expectedSize: number, expectedMime: string): Promise<RuntimeBlobCompleteResult>
cleanupExpiredPendingBlobs(cutoffMs: number, limit: number): Promise<RuntimeBlobCleanupResult>
releaseDeletedBlobs(workspaceId: string, limit: number): Promise<RuntimeBlobCleanupResult>
backfillMissingBlobMetadata(workspaceId: string | undefined | null, limit: number): Promise<RuntimeBlobMetadataBackfillResult>
acquireCoordinationLease(key: string, owner: string, ttlMs: number): Promise<CoordinationLeaseGrant | null>
releaseCoordinationLease(key: string, owner: string, fencingToken: bigint | number): Promise<boolean>
renewCoordinationLease(key: string, owner: string, fencingToken: bigint | number, ttlMs: number): Promise<boolean>
rebuildDocBlobRefs(workspaceId: string, docId: string): Promise<RuntimeDocBlobRefsResult>
rebuildWorkspaceDocBlobRefs(workspaceId: string, limit: number): Promise<RuntimeDocBlobRefsResult>
/**
* Merge pending doc updates with y-octo and persist the merged snapshot.
*
@@ -816,6 +825,25 @@ export declare function resolveEntitlementV1(input: ResolveEntitlementInput): Re
export declare function runNativeActionRecipePreparedStream(input: ActionRuntimeInput, callback: ((err: Error | null, arg: string) => void)): LlmStreamHandle
/** Counters reported by `BackendRuntime.executeBlobCleanupCandidates` for one batch of candidates. */
export interface RuntimeBlobCleanupExecuteResult {
/** Number of candidate rows loaded for this batch. */
scannedCandidates: number
/** Objects successfully deleted from object storage. */
deletedObjects: number
/** Rows removed from the `blobs` table. */
deletedMetadata: number
/** Candidates left alone because a reference was found, the projection was stale, or the object was still inside the grace period. */
skippedStillReferenced: number
/** Candidates whose HEAD, object delete, or metadata delete step failed. */
failed: number
/** Distinct workspace ids for which the metadata delete step completed. */
workspaceIds: Array<string>
}
/** Counters reported by `BackendRuntime.planUnreferencedWorkspaceBlobs` for one page of completed blobs. */
export interface RuntimeBlobCleanupPlanResult {
/** Id of the reconciliation run created for this plan; pass it to the execute step. */
runId?: string
/** Blobs examined in this page. */
scannedBlobs: number
/** Candidate rows written (inserted or re-marked) by this plan. */
candidatesMarked: number
/** Blobs kept because a fresh doc reference exists. */
protectedByDocRefs: number
/**
* Blobs kept because the stored object is missing, its size differs from the recorded size,
* or it is newer than the grace period. When planning is blocked (metadata backfill not
* completed or doc-ref projection stale) this holds the size of the page that would have been scanned.
*/
protectedByMetadata: number
/** Blobs kept because another table (avatar, AI tasks/jobs/contexts, workspace files, embeddings) references them. */
protectedByOtherRefs: number
/** Last blob key of this page; only set when the page was full and more blobs may follow. */
nextCursor?: string
}
export interface RuntimeBlobCleanupResult {
scanned: number
deleted: number
@@ -831,12 +859,32 @@ export interface RuntimeBlobCompleteResult {
lastModifiedMs?: number
}
/** Counters reported by `BackendRuntime.backfillMissingBlobMetadata` for one listing page. */
export interface RuntimeBlobMetadataBackfillResult {
/** Objects returned by the storage listing and iterated in this page. */
scannedObjects: number
/** Objects for which a HEAD request was issued. */
headedObjects: number
/** Total rows affected by the blob metadata upserts. */
upsertedMetadata: number
/** Objects skipped because a `blobs` row already exists. */
skippedExisting: number
/** Objects skipped because their workspace does not exist. */
skippedWorkspaceMissing: number
/** Objects that could not be processed (for example an unparsable key, a workspace mismatch, or a HEAD that found nothing). */
failed: number
/** Last scanned object key; only set when the listing reported a continuation token. */
nextCursor?: string
/** Distinct workspace ids that received at least one upserted row. */
workspaceIds: Array<string>
}
/** Lease record shape; the code that produces it is outside this view. */
export interface RuntimeByokLocalLeaseRecord {
leaseId: string
/** Untyped payload; its schema is not visible here. */
payload: any
/** NOTE(review): presumably an epoch timestamp in milliseconds — confirm against the producer. */
expiresAtMs: number
}
/**
* Result returned by `BackendRuntime.rebuildDocBlobRefs` and `BackendRuntime.rebuildWorkspaceDocBlobRefs`.
* NOTE(review): the code that fills these counters is outside this view; the field notes
* below are read from the names and should be confirmed.
*/
export interface RuntimeDocBlobRefsResult {
/** Presumably the number of docs visited. */
scannedDocs: number
/** Presumably the number of docs parsed successfully. */
parsedDocs: number
/** Presumably the number of doc-to-blob reference rows written. */
refsWritten: number
/** Presumably the number of doc-to-blob reference rows removed. */
refsDeleted: number
/** Presumably the number of docs that failed processing. */
failedDocs: number
/** Optional pagination cursor for a follow-up call. */
nextCursor?: string
}
export interface RuntimeDocCompactionResult {
leaseAcquired: boolean
merged: boolean
+1
View File
@@ -16,6 +16,7 @@
},
"napi": {
"binaryName": "server-native",
"dtsHeaderFile": "dts-header.d.ts",
"targets": [
"aarch64-apple-darwin",
"aarch64-unknown-linux-gnu",
@@ -0,0 +1,647 @@
use chrono::{DateTime, Duration, Utc};
use napi::Result;
use sqlx::{FromRow, PgPool};
use super::{
BackendRuntime,
error::napi_error,
types::{RuntimeBlobCleanupExecuteResult, RuntimeBlobCleanupPlanResult},
};
#[cfg(test)]
mod tests {
    use super::*;

    /// The plan result has to carry the run id so the execute phase can target the same run.
    #[test]
    fn blob_cleanup_plan_result_keeps_run_id_for_execute() {
        let run_id = "00000000-0000-0000-0000-000000000000".to_string();
        let plan = RuntimeBlobCleanupPlanResult {
            run_id: Some(run_id),
            scanned_blobs: 1,
            candidates_marked: 1,
            protected_by_doc_refs: 0,
            protected_by_metadata: 0,
            protected_by_other_refs: 0,
            next_cursor: None,
        };

        assert!(plan.run_id.is_some());
        assert_eq!(plan.candidates_marked, 1);
    }

    /// Skipped, failed and deleted counters are tracked separately and add up to the scanned total.
    #[test]
    fn blob_cleanup_execute_result_tracks_skipped_and_failed_counts() {
        let outcome = RuntimeBlobCleanupExecuteResult {
            scanned_candidates: 3,
            deleted_objects: 1,
            deleted_metadata: 1,
            skipped_still_referenced: 1,
            failed: 1,
            workspace_ids: vec!["workspace".to_string()],
        };

        assert_eq!(outcome.scanned_candidates, 3);
        let accounted_for = outcome.skipped_still_referenced + outcome.failed + outcome.deleted_objects;
        assert_eq!(accounted_for, 3);
    }
}
/// One completed, non-deleted row of the `blobs` table, as selected by `load_completed_blobs`.
#[derive(FromRow)]
struct BlobCandidateRow {
/// Workspace that owns the blob; also the first segment of the object-storage key.
workspace_id: String,
/// Blob key inside the workspace.
key: String,
/// Size recorded in the metadata row; planning compares it with the stored object's length.
size: i32,
}
/// One row of `blob_cleanup_candidates`, as selected by `load_marked_candidates` for the execute phase.
#[derive(FromRow)]
struct MarkedCandidateRow {
/// Workspace that owns the candidate blob.
workspace_id: String,
/// Key of the candidate blob inside the workspace.
blob_key: String,
}
/// Appends `workspace_id` to `workspace_ids` unless an equal id is already in the list.
///
/// Insertion order of first occurrences is preserved.
fn push_workspace_once(workspace_ids: &mut Vec<String>, workspace_id: &str) {
    let already_listed = workspace_ids.iter().any(|existing| existing.as_str() == workspace_id);
    if already_listed {
        return;
    }
    workspace_ids.push(workspace_id.to_owned());
}
/// Reports whether a reconciliation checkpoint for `kind` and `scope` exists with status `completed`.
///
/// Database failures are converted into a napi error.
async fn checkpoint_completed(pool: &PgPool, kind: &str, scope: &str) -> Result<bool> {
    const SQL: &str = "SELECT EXISTS(SELECT 1 FROM blob_reconciliation_checkpoints WHERE kind = $1 AND scope = $2 AND status = 'completed')";
    let lookup = sqlx::query_scalar::<_, bool>(SQL).bind(kind).bind(scope);
    match lookup.fetch_one(pool).await {
        Ok(is_completed) => Ok(is_completed),
        Err(err) => Err(napi_error(format!("Blob cleanup checkpoint check failed: {err}"))),
    }
}
/// Decides whether the doc-to-blob reference projection of a workspace can be trusted.
///
/// The projection is stale when the `doc_blob_refs` checkpoint for the workspace is not
/// completed, or when any `doc_blob_refs` row of the workspace has a status other than `fresh`.
/// Both checks are always executed, checkpoint first.
async fn projection_is_stale(pool: &PgPool, workspace_id: &str) -> Result<bool> {
    let checkpoint_done = checkpoint_completed(pool, "doc_blob_refs", workspace_id).await?;

    let non_fresh_lookup = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS(SELECT 1 FROM doc_blob_refs WHERE workspace_id = $1 AND status <> 'fresh')",
    )
    .bind(workspace_id);
    let any_non_fresh_row = non_fresh_lookup
        .fetch_one(pool)
        .await
        .map_err(|err| napi_error(format!("Blob cleanup projection freshness check failed: {err}")))?;

    // Trustworthy only when the checkpoint is done AND no row is non-fresh.
    let trustworthy = checkpoint_done && !any_non_fresh_row;
    Ok(!trustworthy)
}
/// Returns the workspace id wrapped in a list when its projection is stale, or an empty list otherwise.
async fn stale_projection_workspaces(pool: &PgPool, workspace_id: &str) -> Result<Vec<String>> {
    let stale = projection_is_stale(pool, workspace_id).await?;
    let mut workspaces = Vec::new();
    if stale {
        workspaces.push(workspace_id.to_string());
    }
    Ok(workspaces)
}
/// Reports whether the `blob_metadata_backfill` checkpoint scoped to this workspace is completed.
async fn metadata_backfill_is_complete(pool: &PgPool, workspace_id: &str) -> Result<bool> {
    let backfill_done = checkpoint_completed(pool, "blob_metadata_backfill", workspace_id).await?;
    Ok(backfill_done)
}
/// Reports whether a `fresh` row in `doc_blob_refs` points at the given blob of the workspace.
async fn has_doc_ref(pool: &PgPool, workspace_id: &str, key: &str) -> Result<bool> {
    const SQL: &str =
        "SELECT EXISTS(SELECT 1 FROM doc_blob_refs WHERE workspace_id = $1 AND blob_key = $2 AND status = 'fresh')";
    let lookup = sqlx::query_scalar::<_, bool>(SQL).bind(workspace_id).bind(key);
    match lookup.fetch_one(pool).await {
        Ok(referenced) => Ok(referenced),
        Err(err) => Err(napi_error(format!("Blob cleanup doc ref check failed: {err}"))),
    }
}
/// Reports whether anything other than a document references the blob.
///
/// The first query covers tables that are always queried: the workspace avatar, AI transcript
/// tasks, AI jobs, and any value inside an AI context config equal to the blob key. Two further
/// tables (`ai_workspace_files`, `ai_workspace_blob_embeddings`) are consulted only when
/// `table_exists` reports them present. The function returns on the first hit.
async fn has_other_ref(pool: &PgPool, workspace_id: &str, key: &str) -> Result<bool> {
    let core_lookup = sqlx::query_scalar::<_, bool>(
        r#"
SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND avatar_key = $2)
OR EXISTS(SELECT 1 FROM ai_transcript_tasks WHERE workspace_id = $1 AND blob_id = $2)
OR EXISTS(SELECT 1 FROM ai_jobs WHERE workspace_id = $1 AND blob_id = $2)
OR EXISTS(
SELECT 1
FROM ai_contexts c
JOIN ai_sessions_metadata s ON s.id = c.session_id
WHERE s.workspace_id = $1
AND jsonb_path_exists(
c.config::jsonb,
'$.** ? (@ == $blobKey)',
jsonb_build_object('blobKey', to_jsonb($2::text))
)
)
"#,
    )
    .bind(workspace_id)
    .bind(key);
    let referenced_by_core_tables = core_lookup
        .fetch_one(pool)
        .await
        .map_err(|err| napi_error(format!("Blob cleanup protected ref check failed: {err}")))?;
    if referenced_by_core_tables {
        return Ok(true);
    }

    // Optional table: only queried when it exists in the `public` schema.
    if table_exists(pool, "ai_workspace_files").await? {
        let referenced_by_workspace_file = sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS(SELECT 1 FROM ai_workspace_files WHERE workspace_id = $1 AND blob_id = $2)",
        )
        .bind(workspace_id)
        .bind(key)
        .fetch_one(pool)
        .await
        .map_err(|err| napi_error(format!("Blob cleanup workspace file ref check failed: {err}")))?;
        if referenced_by_workspace_file {
            return Ok(true);
        }
    }

    // Optional table: only queried when it exists in the `public` schema.
    if table_exists(pool, "ai_workspace_blob_embeddings").await? {
        let referenced_by_embedding = sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS(SELECT 1 FROM ai_workspace_blob_embeddings WHERE workspace_id = $1 AND blob_id = $2)",
        )
        .bind(workspace_id)
        .bind(key)
        .fetch_one(pool)
        .await
        .map_err(|err| napi_error(format!("Blob cleanup workspace blob embedding ref check failed: {err}")))?;
        return Ok(referenced_by_embedding);
    }

    Ok(false)
}
/// Reports whether `public.<table>` resolves to a relation, using `to_regclass`.
async fn table_exists(pool: &PgPool, table: &str) -> Result<bool> {
    let qualified_name = format!("public.{table}");
    let lookup = sqlx::query_scalar::<_, bool>("SELECT to_regclass($1) IS NOT NULL").bind(qualified_name);
    match lookup.fetch_one(pool).await {
        Ok(exists) => Ok(exists),
        Err(err) => Err(napi_error(format!("Blob cleanup table existence check failed: {err}"))),
    }
}
/// Loads one page of completed, non-deleted blobs of a workspace in ascending key order.
///
/// `after_key` is an exclusive lower bound on the key (no bound when `None`); at most `limit`
/// rows are returned.
async fn load_completed_blobs(
    pool: &PgPool,
    workspace_id: &str,
    after_key: Option<&str>,
    limit: i64,
) -> Result<Vec<BlobCandidateRow>> {
    let page_query = sqlx::query_as::<_, BlobCandidateRow>(
        r#"
SELECT workspace_id, key, size
FROM blobs
WHERE workspace_id = $1
AND status = 'completed'
AND deleted_at IS NULL
AND ($2::text IS NULL OR key > $2)
ORDER BY key ASC
LIMIT $3
"#,
    )
    .bind(workspace_id)
    .bind(after_key)
    .bind(limit);

    match page_query.fetch_all(pool).await {
        Ok(blobs) => Ok(blobs),
        Err(err) => Err(napi_error(format!("Blob cleanup load completed blobs failed: {err}"))),
    }
}
/// Reads the resume position of the cleanup plan for a workspace.
///
/// Returns `None` when no checkpoint exists or when the checkpoint is `completed`, so the next
/// plan starts from the first key. Otherwise returns the `lastBlobKey` string stored in the
/// checkpoint cursor, if there is one.
async fn load_plan_cursor(pool: &PgPool, workspace_id: &str) -> Result<Option<String>> {
    let checkpoint = sqlx::query_as::<_, (String, serde_json::Value)>(
        "SELECT status, cursor FROM blob_reconciliation_checkpoints WHERE kind = 'blob_cleanup_plan' AND scope = $1",
    )
    .bind(workspace_id)
    .fetch_optional(pool)
    .await
    .map_err(|err| napi_error(format!("Blob cleanup plan checkpoint load failed: {err}")))?;

    match checkpoint {
        Some((status, cursor)) if status != "completed" => {
            let last_blob_key = cursor
                .get("lastBlobKey")
                .and_then(serde_json::Value::as_str)
                .map(str::to_owned);
            Ok(last_blob_key)
        }
        _ => Ok(None),
    }
}
/// Writes (inserts or updates) the `blob_cleanup_plan` checkpoint of a workspace.
///
/// The status becomes `completed` or `running` according to `completed`, the cursor is replaced
/// by `{ "lastBlobKey": last_blob_key }`, and the stored `last_key` is only overwritten when a
/// new key is supplied. `completed_at` is stamped when `completed` is true and cleared otherwise.
async fn upsert_plan_checkpoint(
    pool: &PgPool,
    workspace_id: &str,
    last_blob_key: Option<&str>,
    completed: bool,
) -> Result<()> {
    let status = match completed {
        true => "completed",
        false => "running",
    };
    let cursor = serde_json::json!({ "lastBlobKey": last_blob_key });

    let upsert = sqlx::query(
        r#"
INSERT INTO blob_reconciliation_checkpoints
(kind, scope, status, cursor, last_key, completed_at)
VALUES ('blob_cleanup_plan', $1, $2, $3, $4, CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END)
ON CONFLICT (kind, scope) DO UPDATE
SET status = EXCLUDED.status,
cursor = EXCLUDED.cursor,
last_key = COALESCE(EXCLUDED.last_key, blob_reconciliation_checkpoints.last_key),
completed_at = CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END,
updated_at = CURRENT_TIMESTAMP
"#,
    )
    .bind(workspace_id)
    .bind(status)
    .bind(cursor)
    .bind(last_blob_key)
    .bind(completed);

    if let Err(err) = upsert.execute(pool).await {
        return Err(napi_error(format!("Blob cleanup plan checkpoint write failed: {err}")));
    }
    Ok(())
}
/// Inserts a `running`, `mark_only` cleanup-plan run for the workspace and returns its id as text.
async fn create_run(pool: &PgPool, workspace_id: &str) -> Result<String> {
    let insert = sqlx::query_scalar::<_, String>(
        r#"
INSERT INTO blob_reconciliation_runs (kind, mode, status, workspace_id)
VALUES ('blob_cleanup_plan', 'mark_only', 'running', $1)
RETURNING id::text
"#,
    )
    .bind(workspace_id);

    match insert.fetch_one(pool).await {
        Ok(run_id) => Ok(run_id),
        Err(err) => Err(napi_error(format!("Blob cleanup create run failed: {err}"))),
    }
}
/// Marks a plan run as `finished` and stores its counters and audit metadata.
///
/// The metadata records the protection counters from `result`, the total `object_size` of
/// candidates still `marked` for this run (0 when there are none), and the list of workspaces
/// whose projection was stale. `scanned` and `changed` are written as 32-bit integers.
async fn finish_run(
    pool: &PgPool,
    run_id: &str,
    workspace_id: &str,
    result: &RuntimeBlobCleanupPlanResult,
    stale_projection_workspaces: Vec<String>,
) -> Result<()> {
    let marked_bytes = sqlx::query_scalar::<_, Option<i64>>(
        "SELECT SUM(object_size)::bigint FROM blob_cleanup_candidates WHERE run_id = $1::uuid AND status = 'marked'",
    )
    .bind(run_id)
    .fetch_one(pool)
    .await
    .map_err(|err| napi_error(format!("Blob cleanup candidate bytes audit failed: {err}")))?;
    // SUM over zero rows is NULL; report that as zero bytes.
    let candidate_bytes = marked_bytes.unwrap_or(0);

    let audit_metadata = serde_json::json!({
        "protectedByDocRefs": result.protected_by_doc_refs,
        "protectedByMetadata": result.protected_by_metadata,
        "protectedByOtherRefs": result.protected_by_other_refs,
        "topWorkspaceCandidateBytes": [{
            "workspaceId": workspace_id,
            "candidateBytes": candidate_bytes,
        }],
        "staleOrFailedProjectionWorkspaces": stale_projection_workspaces,
    });

    let update = sqlx::query(
        r#"
UPDATE blob_reconciliation_runs
SET status = 'finished',
finished_at = CURRENT_TIMESTAMP,
scanned = $2,
changed = $3,
metadata = $4
WHERE id = $1::uuid
"#,
    )
    .bind(run_id)
    .bind(result.scanned_blobs as i32)
    .bind(result.candidates_marked as i32)
    .bind(audit_metadata);

    if let Err(err) = update.execute(pool).await {
        return Err(napi_error(format!("Blob cleanup finish run failed: {err}")));
    }
    Ok(())
}
/// Records the outcome of the execute phase on one candidate row.
///
/// Sets the candidate's `status`, stamps `executed_at`, merges `evidence` into the stored
/// evidence with the JSONB `||` operator, and replaces the `error` text (cleared when `None`).
/// Only the row matching workspace, blob key and run id is touched.
async fn mark_candidate_status(
    pool: &PgPool,
    run_id: &str,
    workspace_id: &str,
    blob_key: &str,
    status: &str,
    evidence: serde_json::Value,
    error: Option<&str>,
) -> Result<()> {
    let update = sqlx::query(
        r#"
UPDATE blob_cleanup_candidates
SET status = $3,
executed_at = CURRENT_TIMESTAMP,
evidence = evidence || $4,
error = $5
WHERE workspace_id = $1 AND blob_key = $2 AND run_id = $6::uuid
"#,
    )
    .bind(workspace_id)
    .bind(blob_key)
    .bind(status)
    .bind(evidence)
    .bind(error)
    .bind(run_id);

    match update.execute(pool).await {
        Ok(_) => Ok(()),
        Err(err) => Err(napi_error(format!("Blob cleanup mark candidate status failed: {err}"))),
    }
}
/// Marks a run as `finished` after an execute batch and stores the batch counters.
///
/// `scanned`, `changed` (deleted metadata rows) and `failed` are written as 32-bit integers, and
/// a summary object is merged into the run's existing metadata with the JSONB `||` operator.
async fn finish_execute_run(pool: &PgPool, run_id: &str, result: &RuntimeBlobCleanupExecuteResult) -> Result<()> {
    let execute_summary = serde_json::json!({
        "deletedObjects": result.deleted_objects,
        "deletedMetadata": result.deleted_metadata,
        "skippedStillReferenced": result.skipped_still_referenced,
        "failed": result.failed,
    });

    let update = sqlx::query(
        r#"
UPDATE blob_reconciliation_runs
SET status = 'finished',
finished_at = CURRENT_TIMESTAMP,
scanned = $2,
changed = $3,
failed = $4,
metadata = metadata || $5
WHERE id = $1::uuid
"#,
    )
    .bind(run_id)
    .bind(result.scanned_candidates as i32)
    .bind(result.deleted_metadata as i32)
    .bind(result.failed as i32)
    .bind(execute_summary);

    match update.execute(pool).await {
        Ok(_) => Ok(()),
        Err(err) => Err(napi_error(format!("Blob cleanup execute run finish failed: {err}"))),
    }
}
/// Records a blob as a cleanup candidate for the given run and returns the number of rows affected.
///
/// A new candidate row is inserted with status `marked`; if a row already exists for the same
/// workspace and blob key it is re-marked: reason, size, last-modified time, run id and evidence
/// are replaced, `planned_at` is refreshed and any previous error is cleared. The evidence
/// stores the size recorded in the blob metadata row.
async fn mark_candidate(
    pool: &PgPool,
    run_id: &str,
    row: &BlobCandidateRow,
    object_size: i64,
    object_last_modified: DateTime<Utc>,
) -> Result<i64> {
    let evidence = serde_json::json!({ "metadataSize": row.size });

    let upsert = sqlx::query(
        r#"
INSERT INTO blob_cleanup_candidates
(workspace_id, blob_key, reason, status, object_size, object_last_modified, run_id, evidence)
VALUES ($1, $2, 'unreferenced_completed_blob', 'marked', $3, $4, $5::uuid, $6)
ON CONFLICT (workspace_id, blob_key) DO UPDATE
SET reason = EXCLUDED.reason,
status = 'marked',
object_size = EXCLUDED.object_size,
object_last_modified = EXCLUDED.object_last_modified,
planned_at = CURRENT_TIMESTAMP,
run_id = EXCLUDED.run_id,
evidence = EXCLUDED.evidence,
error = NULL
"#,
    )
    .bind(&row.workspace_id)
    .bind(&row.key)
    .bind(object_size)
    .bind(object_last_modified)
    .bind(run_id)
    .bind(evidence);

    let outcome = upsert
        .execute(pool)
        .await
        .map_err(|err| napi_error(format!("Blob cleanup mark candidate failed: {err}")))?;
    let affected = outcome.rows_affected() as i64;
    Ok(affected)
}
/// Loads up to `limit` candidates of a run that still need executing.
///
/// Both `marked` and previously `failed` candidates are selected; `marked` ones come first and,
/// within each group, older `planned_at` values come first.
async fn load_marked_candidates(pool: &PgPool, run_id: &str, limit: i64) -> Result<Vec<MarkedCandidateRow>> {
    let pending_query = sqlx::query_as::<_, MarkedCandidateRow>(
        r#"
SELECT workspace_id, blob_key
FROM blob_cleanup_candidates
WHERE run_id = $1::uuid AND status IN ('marked', 'failed')
ORDER BY CASE WHEN status = 'marked' THEN 0 ELSE 1 END, planned_at ASC
LIMIT $2
"#,
    )
    .bind(run_id)
    .bind(limit);

    match pending_query.fetch_all(pool).await {
        Ok(candidates) => Ok(candidates),
        Err(err) => Err(napi_error(format!("Blob cleanup load marked candidates failed: {err}"))),
    }
}
#[napi_derive::napi]
impl BackendRuntime {
    /// Scans one page of a workspace's completed blobs and marks unreferenced ones as cleanup candidates.
    ///
    /// Nothing is deleted here. A run row is created first and finished before returning. A blob
    /// becomes a candidate only when it has no fresh doc reference, no other reference, its stored
    /// object exists with the same size as the metadata row, and the object is at least
    /// `grace_period_days` old.
    ///
    /// When the metadata backfill checkpoint is not completed or the doc-ref projection is stale,
    /// no blob is evaluated: the size of the page that would have been scanned is reported in
    /// `protected_by_metadata` and the checkpoint is left untouched.
    ///
    /// # Errors
    /// Fails when `limit <= 0` or `grace_period_days < 0`, on any database or object-storage
    /// error, and when an object's last-modified timestamp cannot be represented. In those error
    /// cases the run row stays `running` and the checkpoint is not advanced.
    #[napi]
    pub async fn plan_unreferenced_workspace_blobs(
        &self,
        workspace_id: String,
        grace_period_days: i64,
        limit: i64,
    ) -> Result<RuntimeBlobCleanupPlanResult> {
        if limit <= 0 {
            return Err(napi_error("blob cleanup plan limit must be positive"));
        }
        if grace_period_days < 0 {
            return Err(napi_error("blob cleanup grace period must be non-negative"));
        }
        let pool = self.pool().await?;
        let run_id = create_run(&pool, &workspace_id).await?;
        let mut result = RuntimeBlobCleanupPlanResult {
            run_id: Some(run_id.clone()),
            scanned_blobs: 0,
            candidates_marked: 0,
            protected_by_doc_refs: 0,
            protected_by_metadata: 0,
            protected_by_other_refs: 0,
            next_cursor: None,
        };
        let cursor = load_plan_cursor(&pool, &workspace_id).await?;
        let stale_projection_workspaces = stale_projection_workspaces(&pool, &workspace_id).await?;
        // Without complete metadata and a fresh projection the reference checks below cannot be
        // trusted, so the whole page is reported as protected and no candidate is marked.
        if !metadata_backfill_is_complete(&pool, &workspace_id).await? || !stale_projection_workspaces.is_empty() {
            result.protected_by_metadata = load_completed_blobs(&pool, &workspace_id, cursor.as_deref(), limit)
                .await?
                .len() as i64;
            finish_run(&pool, &run_id, &workspace_id, &result, stale_projection_workspaces).await?;
            return Ok(result);
        }
        // Objects modified after this instant are still inside the grace period.
        let min_last_modified = Utc::now() - Duration::days(grace_period_days);
        let rows = load_completed_blobs(&pool, &workspace_id, cursor.as_deref(), limit).await?;
        // A full page means there may be more blobs after the last key.
        let has_more = rows.len() == limit as usize;
        let mut last_blob_key = None;
        for row in rows {
            result.scanned_blobs += 1;
            last_blob_key = Some(row.key.clone());
            if has_doc_ref(&pool, &row.workspace_id, &row.key).await? {
                result.protected_by_doc_refs += 1;
                continue;
            }
            if has_other_ref(&pool, &row.workspace_id, &row.key).await? {
                result.protected_by_other_refs += 1;
                continue;
            }
            let object_key = format!("{}/{}", row.workspace_id, row.key);
            // A missing object cannot be validated, so the metadata row is left alone.
            let Some(metadata) = self.object_storage_head(object_key).await? else {
                result.protected_by_metadata += 1;
                continue;
            };
            let last_modified = DateTime::<Utc>::from_timestamp_millis(metadata.last_modified_ms)
                .ok_or_else(|| napi_error("blob cleanup object last modified is invalid"))?;
            // A size mismatch or a recently modified object keeps the blob protected.
            if metadata.content_length != row.size as i64 || last_modified > min_last_modified {
                result.protected_by_metadata += 1;
                continue;
            }
            result.candidates_marked += mark_candidate(&pool, &run_id, &row, metadata.content_length, last_modified).await?;
        }
        if has_more {
            result.next_cursor = last_blob_key.clone();
        }
        upsert_plan_checkpoint(&pool, &workspace_id, last_blob_key.as_deref(), !has_more).await?;
        finish_run(&pool, &run_id, &workspace_id, &result, Vec::new()).await?;
        Ok(result)
    }

    /// Executes up to `limit` cleanup candidates of a run: deletes the stored object and the blob metadata row.
    ///
    /// Every candidate is re-validated before deletion. It is marked `skipped` when the
    /// workspace projection is stale, a reference exists, or the object is newer than the grace
    /// period. Per-candidate problems (HEAD failure, unrepresentable last-modified timestamp,
    /// object delete failure, metadata delete failure) mark the candidate `failed`, are counted in
    /// `failed`, and do not stop the batch. A candidate whose object is already gone still has its
    /// metadata row removed. The run row is finished with the batch counters before returning.
    ///
    /// # Errors
    /// Fails when `limit <= 0` or `grace_period_days < 0`, and on database errors raised by
    /// the validation queries or by the bookkeeping writes.
    #[napi]
    pub async fn execute_blob_cleanup_candidates(
        &self,
        run_id: String,
        grace_period_days: i64,
        limit: i64,
    ) -> Result<RuntimeBlobCleanupExecuteResult> {
        if limit <= 0 {
            return Err(napi_error("blob cleanup execute limit must be positive"));
        }
        if grace_period_days < 0 {
            return Err(napi_error("blob cleanup grace period must be non-negative"));
        }
        let pool = self.pool().await?;
        // Objects modified after this instant are still inside the grace period.
        let min_last_modified = Utc::now() - Duration::days(grace_period_days);
        let rows = load_marked_candidates(&pool, &run_id, limit).await?;
        let mut result = RuntimeBlobCleanupExecuteResult {
            scanned_candidates: rows.len() as i64,
            deleted_objects: 0,
            deleted_metadata: 0,
            skipped_still_referenced: 0,
            failed: 0,
            workspace_ids: Vec::new(),
        };
        for row in rows {
            // Re-check references at execution time: the plan may be out of date by now.
            if projection_is_stale(&pool, &row.workspace_id).await?
                || has_doc_ref(&pool, &row.workspace_id, &row.blob_key).await?
                || has_other_ref(&pool, &row.workspace_id, &row.blob_key).await?
            {
                result.skipped_still_referenced += 1;
                mark_candidate_status(
                    &pool,
                    &run_id,
                    &row.workspace_id,
                    &row.blob_key,
                    "skipped",
                    serde_json::json!({ "skipReason": "referenced_or_projection_stale" }),
                    None,
                )
                .await?;
                continue;
            }
            let object_key = format!("{}/{}", row.workspace_id, row.blob_key);
            let mut object_was_missing = false;
            let metadata = match self.object_storage_head(object_key.clone()).await {
                Ok(metadata) => metadata,
                Err(err) => {
                    result.failed += 1;
                    mark_candidate_status(
                        &pool,
                        &run_id,
                        &row.workspace_id,
                        &row.blob_key,
                        "failed",
                        serde_json::json!({ "failure": "object_head_failed" }),
                        Some(&err.to_string()),
                    )
                    .await?;
                    continue;
                }
            };
            if let Some(metadata) = metadata {
                // An unrepresentable timestamp is a per-candidate failure like any other: record
                // it and keep going, so one bad object cannot abort the batch and leave the run
                // unfinished.
                let Some(last_modified) = DateTime::<Utc>::from_timestamp_millis(metadata.last_modified_ms) else {
                    result.failed += 1;
                    mark_candidate_status(
                        &pool,
                        &run_id,
                        &row.workspace_id,
                        &row.blob_key,
                        "failed",
                        serde_json::json!({ "failure": "object_last_modified_invalid" }),
                        Some("blob cleanup execute object last modified is invalid"),
                    )
                    .await?;
                    continue;
                };
                if last_modified > min_last_modified {
                    result.skipped_still_referenced += 1;
                    mark_candidate_status(
                        &pool,
                        &run_id,
                        &row.workspace_id,
                        &row.blob_key,
                        "skipped",
                        serde_json::json!({ "skipReason": "object_inside_grace_period" }),
                        None,
                    )
                    .await?;
                    continue;
                }
                if let Err(err) = self.object_storage_delete(object_key).await {
                    result.failed += 1;
                    mark_candidate_status(
                        &pool,
                        &run_id,
                        &row.workspace_id,
                        &row.blob_key,
                        "failed",
                        serde_json::json!({ "failure": "object_delete_failed" }),
                        Some(&err.to_string()),
                    )
                    .await?;
                    continue;
                }
                result.deleted_objects += 1;
            } else {
                // Nothing to delete in storage; the metadata row is still removed below.
                object_was_missing = true;
            }
            let deleted_metadata =
                match sqlx::query("DELETE FROM blobs WHERE workspace_id = $1 AND key = $2 AND deleted_at IS NULL")
                    .bind(&row.workspace_id)
                    .bind(&row.blob_key)
                    .execute(&pool)
                    .await
                {
                    Ok(result) => result.rows_affected() as i64,
                    Err(err) => {
                        result.failed += 1;
                        mark_candidate_status(
                            &pool,
                            &run_id,
                            &row.workspace_id,
                            &row.blob_key,
                            "failed",
                            serde_json::json!({ "failure": "metadata_delete_failed" }),
                            Some(&err.to_string()),
                        )
                        .await?;
                        continue;
                    }
                };
            result.deleted_metadata += deleted_metadata;
            push_workspace_once(&mut result.workspace_ids, &row.workspace_id);
            mark_candidate_status(
                &pool,
                &run_id,
                &row.workspace_id,
                &row.blob_key,
                "executed",
                serde_json::json!({
                    "deletedMetadata": deleted_metadata,
                    "objectMissingBeforeDelete": object_was_missing,
                }),
                None,
            )
            .await?;
        }
        finish_execute_run(&pool, &run_id, &result).await?;
        Ok(result)
    }
}
@@ -0,0 +1,276 @@
use chrono::{DateTime, Utc};
use napi::Result;
use sqlx::{FromRow, PgPool};
use super::{
BackendRuntime,
error::napi_error,
types::{RuntimeBlobMetadataBackfillResult, RuntimeObjectMetadata},
};
/// Reports whether a row with this id exists in the `workspaces` table.
async fn workspace_exists(pool: &PgPool, workspace_id: &str) -> Result<bool> {
    let lookup =
        sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1)").bind(workspace_id);
    match lookup.fetch_one(pool).await {
        Ok(found) => Ok(found),
        Err(err) => Err(napi_error(format!("Blob metadata backfill workspace check failed: {err}"))),
    }
}
/// Reports whether the `blobs` table already has a row for this workspace and key, in any status.
async fn blob_exists(pool: &PgPool, workspace_id: &str, key: &str) -> Result<bool> {
    const SQL: &str = "SELECT EXISTS(SELECT 1 FROM blobs WHERE workspace_id = $1 AND key = $2)";
    let lookup = sqlx::query_scalar::<_, bool>(SQL).bind(workspace_id).bind(key);
    match lookup.fetch_one(pool).await {
        Ok(found) => Ok(found),
        Err(err) => Err(napi_error(format!("Blob metadata backfill blob check failed: {err}"))),
    }
}
/// Writes a `completed` blob metadata row built from an object's storage metadata.
///
/// The object's last-modified time becomes `created_at` on insert. If a row already exists for
/// the workspace and key, it is updated only when it is not soft-deleted. Returns the number of
/// rows affected.
///
/// NOTE: the object length is narrowed with `as i32`, so a length outside the `i32` range is
/// stored as a wrapped value.
///
/// # Errors
/// Fails when the last-modified timestamp cannot be represented or when the database write fails.
async fn upsert_blob_metadata(
    pool: &PgPool,
    workspace_id: &str,
    key: &str,
    metadata: RuntimeObjectMetadata,
) -> Result<i64> {
    let created_at = match DateTime::<Utc>::from_timestamp_millis(metadata.last_modified_ms) {
        Some(timestamp) => timestamp,
        None => return Err(napi_error("Blob metadata backfill object last modified is invalid")),
    };
    let stored_size = metadata.content_length as i32;

    let upsert = sqlx::query(
        r#"
INSERT INTO blobs (workspace_id, key, size, mime, status, upload_id, created_at, deleted_at)
VALUES ($1, $2, $3, $4, 'completed', NULL, $5, NULL)
ON CONFLICT (workspace_id, key) DO UPDATE
SET size = EXCLUDED.size,
mime = EXCLUDED.mime,
status = 'completed',
upload_id = NULL,
deleted_at = NULL
WHERE blobs.deleted_at IS NULL
"#,
    )
    .bind(workspace_id)
    .bind(key)
    .bind(stored_size)
    .bind(metadata.content_type)
    .bind(created_at);

    let outcome = upsert
        .execute(pool)
        .await
        .map_err(|err| napi_error(format!("Blob metadata backfill upsert failed: {err}")))?;
    let affected = outcome.rows_affected() as i64;
    Ok(affected)
}
/// Splits an object-storage key of the form `<workspace_id>/<blob_key>` into its two parts.
///
/// Returns `None` when there is no `/`, when either part is empty, or when the blob key itself
/// contains another `/` (nested keys are not blob objects).
fn split_workspace_blob_key(full_key: &str) -> Option<(&str, &str)> {
    match full_key.split_once('/') {
        Some((workspace_id, key)) if !workspace_id.is_empty() && !key.is_empty() && !key.contains('/') => {
            Some((workspace_id, key))
        }
        _ => None,
    }
}
/// Maps an optional workspace filter to the checkpoint scope string.
///
/// A specific workspace uses its own id as scope; no filter uses the literal `__all__`.
fn checkpoint_scope(workspace_id: Option<&str>) -> String {
    match workspace_id {
        Some(id) => id.to_owned(),
        None => String::from("__all__"),
    }
}
/// Stored progress of the metadata backfill for one scope, as selected by `load_checkpoint`.
#[derive(FromRow)]
struct BackfillCheckpoint {
/// Last object key recorded by a previous page, if any.
last_key: Option<String>,
/// JSON cursor; `continuation_token` reads its `continuationToken` entry.
cursor: serde_json::Value,
}
impl BackfillCheckpoint {
    /// Returns the `continuationToken` stored in the cursor when it is present and a string.
    fn continuation_token(&self) -> Option<String> {
        let token = self.cursor.get("continuationToken")?;
        token.as_str().map(str::to_owned)
    }
}
/// Loads the `blob_metadata_backfill` checkpoint of a scope, if one exists.
///
/// The checkpoint is returned regardless of its status.
async fn load_checkpoint(pool: &PgPool, scope: &str) -> Result<Option<BackfillCheckpoint>> {
    const SQL: &str =
        "SELECT last_key, cursor FROM blob_reconciliation_checkpoints WHERE kind = 'blob_metadata_backfill' AND scope = $1";
    let lookup = sqlx::query_as::<_, BackfillCheckpoint>(SQL).bind(scope);
    match lookup.fetch_optional(pool).await {
        Ok(checkpoint) => Ok(checkpoint),
        Err(err) => Err(napi_error(format!("Blob metadata backfill checkpoint load failed: {err}"))),
    }
}
/// Writes (inserts or updates) the `blob_metadata_backfill` checkpoint of a scope.
///
/// The status becomes `completed` or `running` according to `completed`; the cursor is replaced
/// with the last key and continuation token; the stored `last_key` is only overwritten when a
/// new key is supplied; `completed_at` is stamped when `completed` is true and cleared
/// otherwise. The metadata always flags that quota reporting needs reconciliation.
async fn upsert_checkpoint(
    pool: &PgPool,
    scope: &str,
    last_key: Option<&str>,
    continuation_token: Option<&str>,
    completed: bool,
) -> Result<()> {
    let status = match completed {
        true => "completed",
        false => "running",
    };
    let cursor = serde_json::json!({
        "lastKey": last_key,
        "continuationToken": continuation_token,
    });
    let checkpoint_metadata = serde_json::json!({
        "quotaReportingReconciliationRequired": true,
    });

    let upsert = sqlx::query(
        r#"
INSERT INTO blob_reconciliation_checkpoints
(kind, scope, status, cursor, last_key, completed_at, metadata)
VALUES ('blob_metadata_backfill', $1, $2, $3, $4, CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END, $6)
ON CONFLICT (kind, scope) DO UPDATE
SET status = EXCLUDED.status,
cursor = EXCLUDED.cursor,
last_key = COALESCE(EXCLUDED.last_key, blob_reconciliation_checkpoints.last_key),
completed_at = CASE WHEN $5 THEN CURRENT_TIMESTAMP ELSE NULL END,
updated_at = CURRENT_TIMESTAMP,
metadata = EXCLUDED.metadata
"#,
    )
    .bind(scope)
    .bind(status)
    .bind(cursor)
    .bind(last_key)
    .bind(completed)
    .bind(checkpoint_metadata);

    if let Err(err) = upsert.execute(pool).await {
        return Err(napi_error(format!("Blob metadata backfill checkpoint write failed: {err}")));
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Only flat `<workspace>/<key>` object keys are accepted.
    #[test]
    fn blob_metadata_backfill_splits_workspace_blob_keys() {
        let accepted = split_workspace_blob_key("workspace/blob-key");
        assert_eq!(accepted, Some(("workspace", "blob-key")));

        assert_eq!(split_workspace_blob_key("workspace/nested/blob-key"), None);
        assert_eq!(split_workspace_blob_key("workspace/"), None);
        assert_eq!(split_workspace_blob_key("blob-key"), None);
    }

    /// A workspace filter scopes the checkpoint to that id; no filter uses the `__all__` scope.
    #[test]
    fn blob_metadata_backfill_checkpoint_scope_is_explicit() {
        let scoped = checkpoint_scope(Some("workspace"));
        let unscoped = checkpoint_scope(None);
        assert_eq!(scoped, "workspace");
        assert_eq!(unscoped, "__all__");
    }
}
/// Appends `workspace_id` to `workspace_ids` unless an equal id is already in the list.
///
/// Insertion order of first occurrences is preserved.
fn push_workspace_once(workspace_ids: &mut Vec<String>, workspace_id: &str) {
    let already_listed = workspace_ids.iter().any(|existing| existing.as_str() == workspace_id);
    if already_listed {
        return;
    }
    workspace_ids.push(workspace_id.to_owned());
}
/// Converts the caller's `i64` limit to the `i32` page size passed to the storage listing.
///
/// # Errors
/// Fails when the value does not fit in an `i32`.
fn checked_list_page_limit(limit: i64) -> Result<i32> {
    match i32::try_from(limit) {
        Ok(page_limit) => Ok(page_limit),
        Err(_) => Err(napi_error("blob metadata backfill limit exceeds i32::MAX")),
    }
}
#[napi_derive::napi]
impl BackendRuntime {
    /// Lists one page of stored objects and creates blob metadata rows for objects that have none.
    ///
    /// With a `workspace_id` only objects under `<workspace_id>/` are listed and progress is
    /// tracked in a checkpoint scoped to that workspace; without it the whole bucket is listed
    /// under the `__all__` scope. The listing resumes from the stored checkpoint. For each object:
    /// keys that are not `<workspace>/<key>` or that belong to another workspace count as
    /// `failed`; objects of unknown workspaces and objects that already have a `blobs` row are
    /// skipped; the rest are HEADed and upserted as `completed` blobs. After the page the
    /// checkpoint is advanced (and completed when the listing has no continuation token) and a
    /// finished run row is recorded.
    ///
    /// # Errors
    /// Fails when `limit <= 0` or exceeds `i32::MAX`, and on any database or object-storage
    /// error; in that case neither the checkpoint nor the run row is written.
    #[napi]
    pub async fn backfill_missing_blob_metadata(
        &self,
        workspace_id: Option<String>,
        limit: i64,
    ) -> Result<RuntimeBlobMetadataBackfillResult> {
        if limit <= 0 {
            return Err(napi_error("blob metadata backfill limit must be positive"));
        }
        let page_limit = checked_list_page_limit(limit)?;
        let pool = self.pool().await?;
        let prefix = workspace_id.as_ref().map(|id| format!("{id}/"));
        let scope = checkpoint_scope(workspace_id.as_deref());
        let checkpoint = load_checkpoint(&pool, &scope).await?;
        let page = self
            .object_storage_list_page(
                prefix,
                checkpoint.as_ref().and_then(BackfillCheckpoint::continuation_token),
                checkpoint.as_ref().and_then(|checkpoint| checkpoint.last_key.clone()),
                page_limit,
            )
            .await?;
        let has_more = page.next_continuation_token.is_some();
        let mut result = RuntimeBlobMetadataBackfillResult {
            scanned_objects: 0,
            headed_objects: 0,
            upserted_metadata: 0,
            skipped_existing: 0,
            skipped_workspace_missing: 0,
            failed: 0,
            next_cursor: None,
            workspace_ids: Vec::new(),
        };
        let mut last_scanned_key = None;
        for object in &page.entries {
            result.scanned_objects += 1;
            last_scanned_key = Some(object.key.clone());
            let Some((object_workspace_id, key)) = split_workspace_blob_key(&object.key) else {
                result.failed += 1;
                continue;
            };
            // Defensive: the listing prefix should already restrict keys to the requested workspace.
            if workspace_id.as_deref().is_some_and(|id| id != object_workspace_id) {
                result.failed += 1;
                continue;
            }
            if !workspace_exists(&pool, object_workspace_id).await? {
                result.skipped_workspace_missing += 1;
                continue;
            }
            if blob_exists(&pool, object_workspace_id, key).await? {
                result.skipped_existing += 1;
                continue;
            }
            result.headed_objects += 1;
            // The object may have disappeared between the listing and the HEAD.
            let Some(metadata) = self.object_storage_head(object.key.clone()).await? else {
                result.failed += 1;
                continue;
            };
            // The metadata row stores the size as a 32-bit integer. A length that does not fit
            // would be silently wrapped by the upsert, so count the object as failed instead of
            // recording a corrupt size.
            if i32::try_from(metadata.content_length).is_err() {
                result.failed += 1;
                continue;
            }
            let affected = upsert_blob_metadata(&pool, object_workspace_id, key, metadata).await?;
            if affected > 0 {
                result.upserted_metadata += affected;
                push_workspace_once(&mut result.workspace_ids, object_workspace_id);
            }
        }
        if has_more {
            result.next_cursor = last_scanned_key.clone();
        }
        upsert_checkpoint(
            &pool,
            &scope,
            last_scanned_key.as_deref(),
            page.next_continuation_token.as_deref(),
            !has_more,
        )
        .await?;
        // Audit trail: one finished run row per processed page.
        sqlx::query(
            r#"
INSERT INTO blob_reconciliation_runs
(kind, mode, status, workspace_id, finished_at, scanned, changed, failed, metadata)
VALUES ('blob_metadata_backfill', 'execute', 'finished', $1, CURRENT_TIMESTAMP, $2, $3, $4, $5)
"#,
        )
        .bind(workspace_id)
        .bind(result.scanned_objects as i32)
        .bind(result.upserted_metadata as i32)
        .bind(result.failed as i32)
        .bind(serde_json::json!({
            "headedObjects": result.headed_objects,
            "skippedExisting": result.skipped_existing,
            "skippedWorkspaceMissing": result.skipped_workspace_missing,
            "checkpointScope": scope,
            "nextCursor": result.next_cursor,
            "quotaReportingReconciliationRequired": true,
        }))
        .execute(&pool)
        .await
        .map_err(|err| napi_error(format!("Blob metadata backfill run record failed: {err}")))?;
        Ok(result)
    }
}
@@ -20,8 +20,9 @@ pub(super) struct RuntimeConfig {
impl RuntimeConfig {
pub(super) fn from_config_files() -> Result<Self> {
let database_url =
database_url_from_config_files()?.unwrap_or_else(|| "postgresql://localhost:5432/affine".to_string());
let database_url = database_url_from_env()
.or(database_url_from_config_files()?)
.unwrap_or_else(|| "postgresql://localhost:5432/affine".to_string());
let storage = ObjectStorageConfig::from_config_files()?;
Ok(Self { database_url, storage })
}
@@ -39,6 +40,14 @@ struct DbConfigFile {
datasource_url: Option<String>,
}
fn database_url_from_env() -> Option<String> {
env::var("DATABASE_URL").ok().and_then(non_empty_string)
}
/// Passes the string through unchanged unless it is empty or whitespace-only, in which case `None` is returned.
fn non_empty_string(value: String) -> Option<String> {
    let is_blank = value.trim().is_empty();
    (!is_blank).then_some(value)
}
fn database_url_from_config_files() -> Result<Option<String>> {
let mut database_url = None;
for path in config_json_paths() {
@@ -49,9 +58,7 @@ fn database_url_from_config_files() -> Result<Option<String>> {
.map_err(|err| napi_error(format!("failed to read config file {}: {err}", path.display())))?;
let config: AppConfigFile = serde_json::from_str(&raw)
.map_err(|err| napi_error(format!("failed to parse config file {}: {err}", path.display())))?;
if let Some(next) = config.db.and_then(|db| db.datasource_url)
&& !next.trim().is_empty()
{
if let Some(next) = config.db.and_then(|db| db.datasource_url).and_then(non_empty_string) {
database_url = Some(next);
}
}
@@ -125,4 +132,14 @@ mod tests {
.all(|path| !path.to_string_lossy().contains("packages/backend/server"))
);
}
/// Empty and whitespace-only URLs are dropped; a real URL is returned unchanged.
#[test]
fn blank_database_urls_are_ignored() {
    assert_eq!(non_empty_string("".to_string()), None);
    assert_eq!(non_empty_string(" ".to_string()), None);

    let url = "postgresql://affine:affine@localhost:5432/affine";
    assert_eq!(non_empty_string(url.to_string()), Some(url.to_string()));
}
}
@@ -0,0 +1,416 @@
use affine_common::doc_parser;
use chrono::{DateTime, Utc};
use napi::Result;
use sqlx::{FromRow, PgPool};
use y_octo::Doc;
use super::{BackendRuntime, error::napi_error, types::RuntimeDocBlobRefsResult};
// NOTE(review): presumably a version tag for the blob-reference extraction logic; its use is outside this view.
const PARSER_VERSION: i32 = 1;
/// A document snapshot as selected by `load_snapshot` from the `snapshots` table.
#[derive(FromRow)]
struct SnapshotRow {
/// Workspace that owns the document.
workspace_id: String,
/// Document id; selected from the `guid` column.
doc_id: String,
/// Binary document content; `load_current_doc` merges it together with pending update blobs.
blob: Vec<u8>,
/// Value of the snapshot's `updated_at` column.
updated_at: DateTime<Utc>,
}
/// A pending document update as selected by `load_updates` from the `updates` table.
#[derive(FromRow)]
struct UpdateRow {
/// Binary update payload.
blob: Vec<u8>,
/// Creation time of the update; `load_updates` orders rows by it, oldest first.
created_at: DateTime<Utc>,
}
/// A single blob reference found in a parsed doc block (see `extract_refs`).
struct ExtractedRef {
// Key of the referenced blob.
blob_key: String,
// Id of the block that carries the reference.
block_id: String,
// Flavour of that block (the unit test expects "affine:image" for images).
flavour: String,
}
/// Fetches the stored snapshot row for `(workspace_id, doc_id)`, if one exists.
///
/// Database failures are converted into a napi error with a
/// "Doc blob refs load snapshot failed" prefix.
async fn load_snapshot(pool: &PgPool, workspace_id: &str, doc_id: &str) -> Result<Option<SnapshotRow>> {
  let sql = r#"
SELECT workspace_id, guid AS doc_id, blob, updated_at
FROM snapshots
WHERE workspace_id = $1 AND guid = $2
"#;

  let fetched = sqlx::query_as::<_, SnapshotRow>(sql)
    .bind(workspace_id)
    .bind(doc_id)
    .fetch_optional(pool)
    .await;

  fetched.map_err(|err| napi_error(format!("Doc blob refs load snapshot failed: {err}")))
}
/// Fetches every pending update row for `(workspace_id, doc_id)`, oldest first.
///
/// Database failures are converted into a napi error with a
/// "Doc blob refs load updates failed" prefix.
async fn load_updates(pool: &PgPool, workspace_id: &str, doc_id: &str) -> Result<Vec<UpdateRow>> {
  let sql = r#"
SELECT blob, created_at
FROM updates
WHERE workspace_id = $1 AND guid = $2
ORDER BY created_at ASC
"#;

  let fetched = sqlx::query_as::<_, UpdateRow>(sql)
    .bind(workspace_id)
    .bind(doc_id)
    .fetch_all(pool)
    .await;

  fetched.map_err(|err| napi_error(format!("Doc blob refs load updates failed: {err}")))
}
/// Applies the given v1 update binaries, in iteration order, to a fresh `Doc`
/// and returns the merged document encoded as a single v1 update.
///
/// Stops at the first binary that fails to apply and reports it as a napi error.
fn apply_doc_updates(updates: impl IntoIterator<Item = Vec<u8>>) -> Result<Vec<u8>> {
  let mut merged = Doc::default();

  updates.into_iter().try_for_each(|binary| {
    merged
      .apply_update_from_binary_v1(&binary)
      .map(|_| ())
      .map_err(|err| napi_error(format!("Doc blob refs merge failed: {err}")))
  })?;

  let encoded = merged.encode_update_v1();
  encoded.map_err(|err| napi_error(format!("Doc blob refs encode failed: {err}")))
}
/// Builds the current state of a doc by merging its stored snapshot (if any)
/// with all pending updates.
///
/// Returns `Ok(None)` when neither a snapshot nor any update exists. The
/// resulting `updated_at` is the `created_at` of the last update when updates
/// exist, otherwise the snapshot's own `updated_at`.
async fn load_current_doc(pool: &PgPool, workspace_id: &str, doc_id: &str) -> Result<Option<SnapshotRow>> {
  let snapshot = load_snapshot(pool, workspace_id, doc_id).await?;
  let updates = load_updates(pool, workspace_id, doc_id).await?;

  let updated_at = match (updates.last(), snapshot.as_ref()) {
    (Some(latest), _) => latest.created_at,
    (None, Some(base)) => base.updated_at,
    // Nothing stored for this doc at all.
    (None, None) => return Ok(None),
  };

  // Snapshot first, then updates in the order they were loaded.
  let merge_inputs: Vec<Vec<u8>> = snapshot
    .into_iter()
    .map(|base| base.blob)
    .chain(updates.into_iter().map(|update| update.blob))
    .collect();
  let blob = apply_doc_updates(merge_inputs)?;

  Ok(Some(SnapshotRow {
    workspace_id: workspace_id.to_string(),
    doc_id: doc_id.to_string(),
    blob,
    updated_at,
  }))
}
/// Lists the doc ids recorded in a workspace's root doc, sorted and de-duplicated.
///
/// The root doc is the doc whose id equals `workspace_id`. Returns an empty
/// list when the root doc has neither a snapshot nor updates.
///
/// The ids are sorted so callers can page through them with a "last doc id"
/// cursor. Duplicates are removed so a doc id that appears more than once in
/// the root doc is not rebuilt and counted twice within the same batch.
async fn load_workspace_doc_ids(pool: &PgPool, workspace_id: &str) -> Result<Vec<String>> {
  let Some(root) = load_current_doc(pool, workspace_id, workspace_id).await? else {
    return Ok(Vec::new());
  };
  let mut ids = doc_parser::get_doc_ids_from_binary(root.blob, false)
    .map_err(|err| napi_error(format!("Doc blob refs root doc parse failed: {err}")))?;
  ids.sort();
  // Sorting puts equal ids next to each other, so `dedup` removes all repeats.
  ids.dedup();
  Ok(ids)
}
/// Records the outcome of one workspace rebuild batch in
/// `blob_reconciliation_checkpoints` (kind `doc_blob_refs`, scope = workspace id).
///
/// Status is "completed" when there is no next cursor and no doc failed,
/// "failed" when at least one doc failed, and "running" otherwise.
/// `completed_at` is set only for the "completed" case and cleared otherwise.
async fn upsert_projection_checkpoint(
  pool: &PgPool,
  workspace_id: &str,
  result: &RuntimeDocBlobRefsResult,
) -> Result<()> {
  let finished = result.next_cursor.is_none() && result.failed_docs == 0;
  let status = match result.failed_docs {
    0 if result.next_cursor.is_none() => "completed",
    failed if failed > 0 => "failed",
    _ => "running",
  };

  let cursor = serde_json::json!({ "lastDocId": result.next_cursor });
  let metadata = serde_json::json!({
    "parserVersion": PARSER_VERSION,
  });

  let sql = r#"
INSERT INTO blob_reconciliation_checkpoints
(kind, scope, status, cursor, completed_at, metadata)
VALUES ('doc_blob_refs', $1, $2, $3, CASE WHEN $4 THEN CURRENT_TIMESTAMP ELSE NULL END, $5)
ON CONFLICT (kind, scope) DO UPDATE
SET status = EXCLUDED.status,
cursor = EXCLUDED.cursor,
completed_at = CASE WHEN $4 THEN CURRENT_TIMESTAMP ELSE NULL END,
updated_at = CURRENT_TIMESTAMP,
metadata = EXCLUDED.metadata
"#;

  sqlx::query(sql)
    .bind(workspace_id)
    .bind(status)
    .bind(cursor)
    .bind(finished)
    .bind(metadata)
    .execute(pool)
    .await
    .map_err(|err| napi_error(format!("Doc blob refs checkpoint write failed: {err}")))?;
  Ok(())
}
/// Marks the workspace's `doc_blob_refs` checkpoint as failed.
///
/// The cursor is reset to `{}`, `completed_at` is cleared, and `error` is
/// stored in the checkpoint metadata next to the parser version.
async fn upsert_projection_failure_checkpoint(pool: &PgPool, workspace_id: &str, error: &str) -> Result<()> {
  let metadata = serde_json::json!({
    "parserVersion": PARSER_VERSION,
    "error": error,
  });

  let sql = r#"
INSERT INTO blob_reconciliation_checkpoints
(kind, scope, status, cursor, completed_at, metadata)
VALUES ('doc_blob_refs', $1, 'failed', '{}', NULL, $2)
ON CONFLICT (kind, scope) DO UPDATE
SET status = 'failed',
cursor = '{}',
completed_at = NULL,
updated_at = CURRENT_TIMESTAMP,
metadata = EXCLUDED.metadata
"#;

  sqlx::query(sql)
    .bind(workspace_id)
    .bind(metadata)
    .execute(pool)
    .await
    .map_err(|err| napi_error(format!("Doc blob refs failure checkpoint write failed: {err}")))?;
  Ok(())
}
/// Reads the saved paging cursor for a workspace's `doc_blob_refs` checkpoint.
///
/// Returns the `lastDocId` string from the checkpoint's `cursor` JSON, or
/// `None` when there is no checkpoint row or `lastDocId` is absent or not a string.
async fn load_projection_cursor(pool: &PgPool, workspace_id: &str) -> Result<Option<String>> {
  let stored = sqlx::query_scalar::<_, serde_json::Value>(
    "SELECT cursor FROM blob_reconciliation_checkpoints WHERE kind = 'doc_blob_refs' AND scope = $1",
  )
  .bind(workspace_id)
  .fetch_optional(pool)
  .await
  .map_err(|err| napi_error(format!("Doc blob refs checkpoint load failed: {err}")))?;

  let Some(cursor) = stored else {
    return Ok(None);
  };
  let last_doc_id = cursor
    .get("lastDocId")
    .and_then(serde_json::Value::as_str)
    .map(str::to_owned);
  Ok(last_doc_id)
}
/// Deletes `doc_blob_refs` rows of the workspace whose `doc_id` is not in
/// `current_doc_ids`, and returns how many rows were removed.
async fn purge_removed_doc_refs(pool: &PgPool, workspace_id: &str, current_doc_ids: &[String]) -> Result<i64> {
  let sql = r#"
DELETE FROM doc_blob_refs
WHERE workspace_id = $1
AND NOT (doc_id = ANY($2))
"#;

  let outcome = sqlx::query(sql)
    .bind(workspace_id)
    .bind(current_doc_ids)
    .execute(pool)
    .await
    .map_err(|err| napi_error(format!("Doc blob refs purge removed docs failed: {err}")))?;

  let removed = outcome.rows_affected() as i64;
  Ok(removed)
}
/// Parses the doc binary and collects one `ExtractedRef` per blob key found on
/// each parsed block. Blocks without blob keys contribute nothing.
///
/// Parse failures are reported as a napi error ("Doc blob refs parse failed").
fn extract_refs(snapshot: &SnapshotRow) -> Result<Vec<ExtractedRef>> {
  let parsed = doc_parser::parse_doc_from_binary(snapshot.blob.clone(), snapshot.doc_id.clone())
    .map_err(|err| napi_error(format!("Doc blob refs parse failed: {err}")))?;

  let refs = parsed
    .blocks
    .into_iter()
    .flat_map(|block| {
      let block_id = block.block_id;
      let flavour = block.flavour;
      // `block.blob` is optional; flattening skips blocks that have none.
      block.blob.into_iter().flatten().map(move |blob_key| ExtractedRef {
        blob_key,
        block_id: block_id.clone(),
        flavour: flavour.clone(),
      })
    })
    .collect();
  Ok(refs)
}
#[cfg(test)]
mod tests {
  use super::*;

  /// A markdown image that points at `blob://image-blob-key` must surface as
  /// an `affine:image` reference to `image-blob-key`.
  #[test]
  fn doc_blob_refs_extracts_image_refs() {
    let doc_id = String::from("doc-blob-ref-test");
    let blob =
      doc_parser::build_full_doc("Doc", "![Alt](blob://image-blob-key)", &doc_id).expect("doc fixture should build");
    let snapshot = SnapshotRow {
      workspace_id: String::from("workspace"),
      doc_id,
      blob,
      updated_at: Utc::now(),
    };

    let refs = extract_refs(&snapshot).expect("refs should parse");
    let found_image_ref = refs
      .iter()
      .filter(|reference| reference.flavour == "affine:image")
      .any(|reference| reference.blob_key == "image-blob-key");

    assert!(found_image_ref);
  }
}
async fn replace_doc_refs(pool: &PgPool, snapshot: &SnapshotRow, refs: Vec<ExtractedRef>) -> Result<(i64, i64)> {
let mut tx = pool
.begin()
.await
.map_err(|err| napi_error(format!("Doc blob refs transaction failed: {err}")))?;
let deleted = sqlx::query("DELETE FROM doc_blob_refs WHERE workspace_id = $1 AND doc_id = $2")
.bind(&snapshot.workspace_id)
.bind(&snapshot.doc_id)
.execute(&mut *tx)
.await
.map_err(|err| napi_error(format!("Doc blob refs delete failed: {err}")))?
.rows_affected() as i64;
let mut written = 0;
for reference in refs {
let affected = sqlx::query(
r#"
INSERT INTO doc_blob_refs
(workspace_id, doc_id, blob_key, block_id, flavour, snapshot_updated_at, parser_version, status)
VALUES ($1, $2, $3, $4, $5, $6, $7, 'fresh')
ON CONFLICT (workspace_id, doc_id, blob_key, block_id) DO UPDATE
SET flavour = EXCLUDED.flavour,
snapshot_updated_at = EXCLUDED.snapshot_updated_at,
indexed_at = CURRENT_TIMESTAMP,
parser_version = EXCLUDED.parser_version,
status = 'fresh',
error = NULL
"#,
)
.bind(&snapshot.workspace_id)
.bind(&snapshot.doc_id)
.bind(reference.blob_key)
.bind(reference.block_id)
.bind(reference.flavour)
.bind(snapshot.updated_at)
.bind(PARSER_VERSION)
.execute(&mut *tx)
.await
.map_err(|err| napi_error(format!("Doc blob refs insert failed: {err}")))?
.rows_affected() as i64;
written += affected;
}
tx.commit()
.await
.map_err(|err| napi_error(format!("Doc blob refs transaction commit failed: {err}")))?;
Ok((written, deleted))
}
/// Records that a doc could not be processed by upserting a marker row into
/// `doc_blob_refs`.
///
/// The marker uses the placeholder '__parse_failed__' for `blob_key`,
/// `block_id` and `flavour`, has status 'failed', and stores `error`. An
/// existing marker only has `indexed_at`, `status` and `error` refreshed.
async fn mark_doc_failed(pool: &PgPool, workspace_id: &str, doc_id: &str, error: &str) -> Result<()> {
  let sql = r#"
INSERT INTO doc_blob_refs
(workspace_id, doc_id, blob_key, block_id, flavour, snapshot_updated_at, parser_version, status, error)
VALUES ($1, $2, '__parse_failed__', '__parse_failed__', '__parse_failed__', CURRENT_TIMESTAMP, $3, 'failed', $4)
ON CONFLICT (workspace_id, doc_id, blob_key, block_id) DO UPDATE
SET indexed_at = CURRENT_TIMESTAMP,
status = 'failed',
error = EXCLUDED.error
"#;

  sqlx::query(sql)
    .bind(workspace_id)
    .bind(doc_id)
    .bind(PARSER_VERSION)
    .bind(error)
    .execute(pool)
    .await
    .map_err(|err| napi_error(format!("Doc blob refs mark failure failed: {err}")))?;
  Ok(())
}
#[napi_derive::napi]
impl BackendRuntime {
  /// Rebuilds the stored blob references of a single doc.
  ///
  /// Always reports `scanned_docs = 1`. When the doc has no snapshot or
  /// updates, or its binary cannot be parsed, a failure marker is written and
  /// `failed_docs = 1` is returned instead of an error. Database errors are
  /// returned as `Err`.
  #[napi]
  pub async fn rebuild_doc_blob_refs(&self, workspace_id: String, doc_id: String) -> Result<RuntimeDocBlobRefsResult> {
    let pool = self.pool().await?;

    // `Some((written, deleted))` when refs were replaced, `None` when the doc
    // was marked as failed.
    let outcome = match load_current_doc(&pool, &workspace_id, &doc_id).await? {
      None => {
        mark_doc_failed(&pool, &workspace_id, &doc_id, "snapshot_missing").await?;
        None
      }
      Some(snapshot) => match extract_refs(&snapshot) {
        Ok(refs) => Some(replace_doc_refs(&pool, &snapshot, refs).await?),
        Err(err) => {
          mark_doc_failed(&pool, &workspace_id, &doc_id, &err.to_string()).await?;
          None
        }
      },
    };

    let (refs_written, refs_deleted) = outcome.unwrap_or((0, 0));
    Ok(RuntimeDocBlobRefsResult {
      scanned_docs: 1,
      parsed_docs: i64::from(outcome.is_some()),
      refs_written,
      refs_deleted,
      failed_docs: i64::from(outcome.is_none()),
      next_cursor: None,
    })
  }

  /// Rebuilds blob references for up to `limit` docs of a workspace, resuming
  /// after the doc id saved in the workspace checkpoint.
  ///
  /// `next_cursor` is set to the last processed doc id while more docs remain.
  /// On the final batch, if no doc failed in this batch, references that belong
  /// to docs no longer listed in the root doc are purged. The checkpoint is
  /// updated with the batch totals before returning.
  ///
  /// Fails when `limit` is not positive. If the doc id list cannot be loaded,
  /// a failure checkpoint is written and the original error is returned.
  #[napi]
  pub async fn rebuild_workspace_doc_blob_refs(
    &self,
    workspace_id: String,
    limit: i64,
  ) -> Result<RuntimeDocBlobRefsResult> {
    if limit <= 0 {
      return Err(napi_error("doc blob refs rebuild limit must be positive"));
    }
    let batch_size = limit as usize;

    let pool = self.pool().await?;
    let all_doc_ids = match load_workspace_doc_ids(&pool, &workspace_id).await {
      Ok(ids) => ids,
      Err(err) => {
        upsert_projection_failure_checkpoint(&pool, &workspace_id, &err.to_string()).await?;
        return Err(err);
      }
    };
    let cursor = load_projection_cursor(&pool, &workspace_id).await?;

    // Docs strictly after the cursor; everything when no cursor is stored.
    let pending: Vec<&String> = all_doc_ids
      .iter()
      .filter(|doc_id| match cursor.as_deref() {
        Some(last_done) => doc_id.as_str() > last_done,
        None => true,
      })
      .collect();
    let has_more = pending.len() > batch_size;

    let mut total = RuntimeDocBlobRefsResult {
      scanned_docs: 0,
      parsed_docs: 0,
      refs_written: 0,
      refs_deleted: 0,
      failed_docs: 0,
      next_cursor: None,
    };
    for doc_id in pending.iter().take(batch_size) {
      let result = self
        .rebuild_doc_blob_refs(workspace_id.clone(), (*doc_id).clone())
        .await?;
      total.scanned_docs += result.scanned_docs;
      total.parsed_docs += result.parsed_docs;
      total.refs_written += result.refs_written;
      total.refs_deleted += result.refs_deleted;
      total.failed_docs += result.failed_docs;
    }

    if has_more {
      // `has_more` implies at least `batch_size` pending docs were processed.
      total.next_cursor = Some(pending[batch_size - 1].clone());
    } else if total.failed_docs == 0 {
      total.refs_deleted += purge_removed_doc_refs(&pool, &workspace_id, &all_doc_ids).await?;
    }

    upsert_projection_checkpoint(&pool, &workspace_id, &total).await?;
    Ok(total)
  }
}
@@ -1,8 +1,11 @@
mod blob_cleanup;
mod blob_complete;
mod blob_reclaimer;
mod blob_reconciliation;
mod config;
mod constants;
mod coordination_lease;
mod doc_blob_refs;
mod doc_compactor;
mod doc_storage;
mod error;
@@ -9,8 +9,8 @@ use aws_sdk_s3::{
use napi::Result;
use super::types::{
MultipartUploadInitResult, MultipartUploadPart, ObjectGetResult, ObjectListEntry, ObjectMetadata, ObjectPutMetadata,
PresignedObjectRequest, completed_multipart_parts, trim_etag,
MultipartUploadInitResult, MultipartUploadPart, ObjectGetResult, ObjectListEntry, ObjectListPage, ObjectMetadata,
ObjectPutMetadata, PresignedObjectRequest, completed_multipart_parts, trim_etag,
};
use crate::backend_runtime::error::napi_error;
@@ -233,14 +233,11 @@ impl ObjectStorageClient {
}
pub(super) async fn head(&self, key: &str) -> Result<Option<ObjectMetadata>> {
let result = self
.client
.head_object()
.bucket(&self.bucket)
.key(key)
.send()
.await
.map_err(|err| napi_error(format!("ObjectStorage head failed for {key}: {err:?}")))?;
let result = match self.client.head_object().bucket(&self.bucket).key(key).send().await {
Ok(result) => result,
Err(err) if is_not_found_error(&err) => return Ok(None),
Err(err) => return Err(napi_error(format!("ObjectStorage head failed for {key}: {err:?}"))),
};
Ok(Some(ObjectMetadata {
content_type: result
@@ -253,14 +250,11 @@ impl ObjectStorageClient {
}
pub(super) async fn get(&self, key: &str) -> Result<Option<ObjectGetResult>> {
let result = self
.client
.get_object()
.bucket(&self.bucket)
.key(key)
.send()
.await
.map_err(|err| napi_error(format!("ObjectStorage get failed for {key}: {err:?}")))?;
let result = match self.client.get_object().bucket(&self.bucket).key(key).send().await {
Ok(result) => result,
Err(err) if is_not_found_error(&err) => return Ok(None),
Err(err) => return Err(napi_error(format!("ObjectStorage get failed for {key}: {err:?}"))),
};
let metadata = ObjectMetadata {
content_type: result
.content_type
@@ -314,6 +308,43 @@ impl ObjectStorageClient {
Ok(entries)
}
/// Lists one page of objects from the bucket via `list_objects_v2`.
///
/// `prefix` narrows the listing when given. `continuation_token` takes
/// precedence over `start_after`; the latter is only applied when no token is
/// supplied. Objects without a key are skipped, and a missing size is
/// reported as 0.
pub(super) async fn list_page(
  &self,
  prefix: Option<String>,
  continuation_token: Option<String>,
  start_after: Option<String>,
  max_keys: i32,
) -> Result<ObjectListPage> {
  let base = self.client.list_objects_v2().bucket(&self.bucket).max_keys(max_keys);
  let scoped = match prefix {
    Some(prefix) => base.prefix(prefix),
    None => base,
  };
  let request = match (continuation_token, start_after) {
    (Some(continuation_token), _) => scoped.continuation_token(continuation_token),
    (None, Some(start_after)) => scoped.start_after(start_after),
    (None, None) => scoped,
  };

  let result = request
    .send()
    .await
    .map_err(|err| napi_error(format!("ObjectStorage list page failed: {err:?}")))?;

  let mut entries = Vec::new();
  for object in result.contents() {
    let Some(key) = object.key.as_ref() else {
      continue;
    };
    entries.push(ObjectListEntry {
      key: key.clone(),
      content_length: object.size.unwrap_or(0),
      last_modified_ms: optional_datetime_ms(object.last_modified),
    });
  }

  Ok(ObjectListPage {
    entries,
    next_continuation_token: result.next_continuation_token,
  })
}
pub(super) async fn delete(&self, key: &str) -> Result<()> {
self
.client
@@ -327,6 +358,11 @@ impl ObjectStorageClient {
}
}
/// Decides whether an error means "object not found" by inspecting its
/// `Debug` rendering.
///
/// Returns true when the text mentions "NoSuchKey", or mentions "NotFound"
/// without also mentioning "NoSuchBucket".
fn is_not_found_error(error: &impl std::fmt::Debug) -> bool {
  let rendered = format!("{error:?}");
  if rendered.contains("NoSuchKey") {
    return true;
  }
  rendered.contains("NotFound") && !rendered.contains("NoSuchBucket")
}
fn expires_at_ms(expires_in_seconds: u64) -> Result<i64> {
let expires_at = SystemTime::now()
.checked_add(Duration::from_secs(expires_in_seconds))
@@ -7,6 +7,7 @@ mod types;
use client::ObjectStorageClient;
pub(super) use config::ObjectStorageConfig;
use napi::{Result, bindgen_prelude::Buffer};
use types::ObjectListPage;
pub(super) use types::StorageProviderConfig;
use super::{
@@ -32,6 +33,19 @@ impl BackendRuntime {
self.object_storage_client()?.delete(key).await
}
/// Lists one page of objects through the runtime's object storage client.
///
/// Arguments are forwarded unchanged to the client's `list_page`. Fails
/// early when the object storage client cannot be obtained.
pub(super) async fn object_storage_list_page(
  &self,
  prefix: Option<String>,
  continuation_token: Option<String>,
  start_after: Option<String>,
  max_keys: i32,
) -> Result<ObjectListPage> {
  let client = self.object_storage_client()?;
  client
    .list_page(prefix, continuation_token, start_after, max_keys)
    .await
}
pub(super) async fn object_storage_abort_upload(&self, key: &str, upload_id: &str) -> Result<()> {
self
.object_storage_client()?
@@ -28,10 +28,16 @@ pub(super) struct ObjectMetadata {
}
#[derive(Clone, Debug, PartialEq)]
pub(super) struct ObjectListEntry {
pub(super) key: String,
pub(super) content_length: i64,
pub(super) last_modified_ms: i64,
pub(in crate::backend_runtime) struct ObjectListEntry {
pub(in crate::backend_runtime) key: String,
pub(in crate::backend_runtime) content_length: i64,
pub(in crate::backend_runtime) last_modified_ms: i64,
}
/// One page of an object listing, as produced by the storage client's `list_page`.
#[derive(Clone, Debug, PartialEq)]
pub(in crate::backend_runtime) struct ObjectListPage {
// Objects on this page; listed objects without a key are omitted.
pub(in crate::backend_runtime) entries: Vec<ObjectListEntry>,
// Token returned by the listing call for requesting the following page, if any.
pub(in crate::backend_runtime) next_continuation_token: Option<String>,
}
#[derive(Clone, Debug, PartialEq)]
@@ -38,3 +38,75 @@ CREATE TABLE IF NOT EXISTS runtime_leases (
CREATE INDEX IF NOT EXISTS runtime_leases_expires_at_idx
ON runtime_leases (expires_at);
-- One row per reconciliation run, with a generated UUID id, kind, mode and status,
-- an optional workspace, start and finish timestamps, a JSONB cursor,
-- scanned / changed / failed counters and free-form JSONB metadata.
CREATE TABLE IF NOT EXISTS blob_reconciliation_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
kind TEXT NOT NULL,
mode TEXT NOT NULL,
status TEXT NOT NULL,
workspace_id TEXT,
started_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at TIMESTAMPTZ(3),
cursor JSONB NOT NULL DEFAULT '{}',
scanned INTEGER NOT NULL DEFAULT 0,
changed INTEGER NOT NULL DEFAULT 0,
failed INTEGER NOT NULL DEFAULT 0,
metadata JSONB NOT NULL DEFAULT '{}'
);
-- Supports looking up a workspace's runs, newest first.
CREATE INDEX IF NOT EXISTS blob_reconciliation_runs_workspace_idx
ON blob_reconciliation_runs (workspace_id, started_at DESC);
-- Resumable progress marker, one row per (kind, scope).
-- The doc blob refs rebuild writes kind 'doc_blob_refs' with the workspace id as scope,
-- a cursor of the form {"lastDocId": ...} and the parser version in metadata.
CREATE TABLE IF NOT EXISTS blob_reconciliation_checkpoints (
kind TEXT NOT NULL,
scope TEXT NOT NULL,
status TEXT NOT NULL,
cursor JSONB NOT NULL DEFAULT '{}',
last_key TEXT,
last_sid INTEGER,
completed_at TIMESTAMPTZ(3),
updated_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
metadata JSONB NOT NULL DEFAULT '{}',
PRIMARY KEY (kind, scope)
);
-- Supports listing checkpoints of a kind by status, most recently updated first.
CREATE INDEX IF NOT EXISTS blob_reconciliation_checkpoints_status_idx
ON blob_reconciliation_checkpoints (kind, status, updated_at DESC);
-- Projection of which doc block references which blob, one row per
-- (workspace_id, doc_id, blob_key, block_id).
-- Rows are written with status 'fresh' by the rebuild. A doc that could not be
-- processed gets a marker row with status 'failed' and '__parse_failed__' placeholders.
CREATE TABLE IF NOT EXISTS doc_blob_refs (
workspace_id TEXT NOT NULL,
doc_id TEXT NOT NULL,
blob_key TEXT NOT NULL,
block_id TEXT NOT NULL,
flavour TEXT NOT NULL,
snapshot_updated_at TIMESTAMPTZ(3) NOT NULL,
indexed_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
parser_version INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'fresh',
error TEXT,
PRIMARY KEY (workspace_id, doc_id, blob_key, block_id)
);
-- Supports finding the references to a given blob within a workspace.
CREATE INDEX IF NOT EXISTS doc_blob_refs_workspace_blob_idx
ON doc_blob_refs (workspace_id, blob_key);
-- Supports filtering a workspace's rows by status.
CREATE INDEX IF NOT EXISTS doc_blob_refs_workspace_status_idx
ON doc_blob_refs (workspace_id, status);
-- Blob cleanup candidates, at most one row per (workspace_id, blob_key).
-- Each row carries a reason, a status, the object size and last-modified time,
-- planning and execution timestamps, the run id, JSONB evidence and an optional error.
-- NOTE(review): the code that writes and consumes these rows is outside this section - confirm semantics there.
CREATE TABLE IF NOT EXISTS blob_cleanup_candidates (
workspace_id TEXT NOT NULL,
blob_key TEXT NOT NULL,
reason TEXT NOT NULL,
status TEXT NOT NULL,
object_size BIGINT NOT NULL,
object_last_modified TIMESTAMPTZ(3),
planned_at TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
executed_at TIMESTAMPTZ(3),
run_id UUID NOT NULL,
evidence JSONB NOT NULL DEFAULT '{}',
error TEXT,
PRIMARY KEY (workspace_id, blob_key)
);
-- Supports selecting the candidates of one run by status.
CREATE INDEX IF NOT EXISTS blob_cleanup_candidates_run_idx
ON blob_cleanup_candidates (run_id, status);
@@ -14,6 +14,10 @@ fn migrations_include_runtime_tables_without_worker_heartbeats() {
assert!(RUNTIME_MIGRATIONS.contains("runtime_states"));
assert!(RUNTIME_MIGRATIONS.contains("runtime_gates"));
assert!(RUNTIME_MIGRATIONS.contains("runtime_leases"));
assert!(RUNTIME_MIGRATIONS.contains("blob_reconciliation_runs"));
assert!(RUNTIME_MIGRATIONS.contains("blob_reconciliation_checkpoints"));
assert!(RUNTIME_MIGRATIONS.contains("doc_blob_refs"));
assert!(RUNTIME_MIGRATIONS.contains("blob_cleanup_candidates"));
assert!(!RUNTIME_MIGRATIONS.contains("runtime_worker_heartbeats"));
}
@@ -138,6 +138,49 @@ pub struct RuntimeBlobCompleteResult {
pub last_modified_ms: Option<i64>,
}
// Counters returned to JS as a plain object for a blob metadata backfill batch.
// NOTE(review): the producer is not in this section; the per-field meanings below are
// inferred from the field names - confirm against the backfill implementation.
#[napi_derive::napi(object)]
pub struct RuntimeBlobMetadataBackfillResult {
// presumably the number of storage objects listed
pub scanned_objects: i64,
// presumably the number of objects whose metadata was fetched from storage
pub headed_objects: i64,
// presumably the number of metadata rows inserted or updated
pub upserted_metadata: i64,
// presumably objects skipped because metadata already existed
pub skipped_existing: i64,
// presumably objects skipped because their workspace does not exist
pub skipped_workspace_missing: i64,
// presumably the number of objects that could not be processed
pub failed: i64,
// presumably the resume position for the next batch; None when finished - confirm
pub next_cursor: Option<String>,
// presumably the workspaces touched by this batch
pub workspace_ids: Vec<String>,
}
// Counters returned to JS as a plain object by the doc blob refs rebuild methods.
#[napi_derive::napi(object)]
pub struct RuntimeDocBlobRefsResult {
// Docs attempted (1 per single-doc rebuild; summed for a workspace batch).
pub scanned_docs: i64,
// Docs whose references were parsed and stored.
pub parsed_docs: i64,
// Rows affected by reference upserts.
pub refs_written: i64,
// Rows deleted while replacing references, plus any purged rows of removed docs.
pub refs_deleted: i64,
// Docs marked as failed (missing snapshot or parse error).
pub failed_docs: i64,
// Last processed doc id while more docs remain in the workspace; None otherwise.
pub next_cursor: Option<String>,
}
// Result object for blob cleanup planning (the typings show it as the return type of
// planUnreferencedWorkspaceBlobs).
// NOTE(review): the producer is not in this section; the per-field meanings below are
// inferred from the field names - confirm against the planner implementation.
#[napi_derive::napi(object)]
pub struct RuntimeBlobCleanupPlanResult {
// presumably the id of the run that recorded the candidates, if one was created
pub run_id: Option<String>,
// presumably the number of blobs examined
pub scanned_blobs: i64,
// presumably the number of blobs recorded as cleanup candidates
pub candidates_marked: i64,
// presumably blobs kept because a doc reference exists
pub protected_by_doc_refs: i64,
// presumably blobs kept because of their metadata
pub protected_by_metadata: i64,
// presumably blobs kept because of some other kind of reference
pub protected_by_other_refs: i64,
// presumably the resume position for the next batch; None when finished - confirm
pub next_cursor: Option<String>,
}
// Result object for executing blob cleanup candidates (the typings show it as the
// return type of executeBlobCleanupCandidates).
// NOTE(review): the producer is not in this section; the per-field meanings below are
// inferred from the field names - confirm against the executor implementation.
#[napi_derive::napi(object)]
pub struct RuntimeBlobCleanupExecuteResult {
// presumably the number of candidates examined
pub scanned_candidates: i64,
// presumably the number of storage objects deleted
pub deleted_objects: i64,
// presumably the number of metadata rows deleted
pub deleted_metadata: i64,
// presumably candidates skipped because a reference was found again
pub skipped_still_referenced: i64,
// presumably the number of candidates that could not be processed
pub failed: i64,
// presumably the workspaces touched by this batch
pub workspace_ids: Vec<String>,
}
#[napi_derive::napi(object)]
pub struct RuntimeDocCompactionResult {
pub lease_acquired: bool,
@@ -2,17 +2,25 @@ import { ScheduleModule } from '@nestjs/schedule';
import { TestingModule } from '@nestjs/testing';
import { PrismaClient } from '@prisma/client';
import test from 'ava';
import Sinon from 'sinon';
import { AuthModule, AuthService } from '../../core/auth';
import { AuthCronJob } from '../../core/auth/job';
import { BackendRuntimeProvider } from '../../core/backend-runtime';
import { createTestingModule } from '../utils';
let m: TestingModule;
let db: PrismaClient;
// Stand-in injected in place of BackendRuntimeProvider; only the single
// method this spec asserts on is stubbed.
const runtime = {
cleanupExpiredUserSessions: Sinon.stub(),
};
test.before(async () => {
m = await createTestingModule({
imports: [ScheduleModule.forRoot(), AuthModule],
tapModule: builder => {
builder.overrideProvider(BackendRuntimeProvider).useValue(runtime);
},
});
db = m.get(PrismaClient);
@@ -32,16 +40,17 @@ test('should clean expired user sessions', async t => {
let userSessions = await db.userSession.findMany();
t.is(userSessions.length, 2);
// no expired sessions
runtime.cleanupExpiredUserSessions.reset();
runtime.cleanupExpiredUserSessions.resolves(0);
await job.cleanExpiredUserSessions();
userSessions = await db.userSession.findMany();
t.is(userSessions.length, 2);
t.true(runtime.cleanupExpiredUserSessions.calledOnce);
t.deepEqual(runtime.cleanupExpiredUserSessions.firstCall.args, [1000]);
// clean all expired sessions
await db.userSession.updateMany({
data: { expiresAt: new Date(Date.now() - 1000) },
});
runtime.cleanupExpiredUserSessions.reset();
runtime.cleanupExpiredUserSessions.onCall(0).resolves(1000);
runtime.cleanupExpiredUserSessions.onCall(1).resolves(2);
await job.cleanExpiredUserSessions();
userSessions = await db.userSession.findMany();
t.is(userSessions.length, 0);
t.is(runtime.cleanupExpiredUserSessions.callCount, 2);
t.deepEqual(runtime.cleanupExpiredUserSessions.firstCall.args, [1000]);
t.deepEqual(runtime.cleanupExpiredUserSessions.secondCall.args, [1000]);
});
@@ -1,7 +1,9 @@
import { ScheduleModule } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import ava, { TestFn } from 'ava';
import Sinon from 'sinon';
import { BackendRuntimeProvider } from '../../core/backend-runtime';
import { DocStorageModule } from '../../core/doc';
import { DocStorageCronJob } from '../../core/doc/job';
import { createTestingModule, type TestingModule } from '../utils';
@@ -10,14 +12,23 @@ interface Context {
module: TestingModule;
db: PrismaClient;
cronJob: DocStorageCronJob;
runtime: { cleanupExpiredSnapshotHistories: Sinon.SinonStub };
}
const test = ava as TestFn<Context>;
// cleanup database before each test
test.before(async t => {
t.context.runtime = {
cleanupExpiredSnapshotHistories: Sinon.stub(),
};
t.context.module = await createTestingModule({
imports: [ScheduleModule.forRoot(), DocStorageModule],
tapModule: builder => {
builder
.overrideProvider(BackendRuntimeProvider)
.useValue(t.context.runtime);
},
});
t.context.db = t.context.module.get(PrismaClient);
@@ -26,6 +37,7 @@ test.before(async t => {
test.beforeEach(async t => {
await t.context.module.initTestingDB();
t.context.runtime.cleanupExpiredSnapshotHistories.reset();
});
test.after.always(async t => {
@@ -33,7 +45,7 @@ test.after.always(async t => {
});
test('should be able to cleanup expired history', async t => {
const { db } = t.context;
const { db, runtime } = t.context;
const timestamp = Date.now();
// insert expired data
@@ -65,12 +77,10 @@ test('should be able to cleanup expired history', async t => {
let count = await db.snapshotHistory.count();
t.is(count, 20);
runtime.cleanupExpiredSnapshotHistories.onCall(0).resolves(1000);
runtime.cleanupExpiredSnapshotHistories.onCall(1).resolves(10);
await t.context.cronJob.cleanExpiredHistories();
count = await db.snapshotHistory.count();
t.is(count, 10);
const example = await db.snapshotHistory.findFirst();
t.truthy(example);
t.true(example!.expiredAt > new Date());
t.is(runtime.cleanupExpiredSnapshotHistories.callCount, 2);
});
@@ -303,28 +303,3 @@ test('should delete userSession fail when sessionId not match', async t => {
);
t.is(count, 0);
});
test('should cleanup expired userSessions', async t => {
const user = await t.context.user.create({
email: 'test@affine.pro',
});
const session = await t.context.db.session.create({
data: {},
});
const userSession = await t.context.session.createOrRefreshUserSession(
user.id,
session.id
);
await t.context.session.cleanExpiredUserSessions();
let count = await t.context.db.userSession.count();
t.is(count, 1);
// Set expiresAt to past time
await t.context.db.userSession.update({
where: { id: userSession.id },
data: { expiresAt: new Date('2022-01-01') },
});
await t.context.session.cleanExpiredUserSessions();
count = await t.context.db.userSession.count();
t.is(count, 0);
});
@@ -4,6 +4,7 @@ import ava, { TestFn } from 'ava';
import Sinon from 'sinon';
import { OneDay } from '../../base';
import { BackendRuntimeProvider } from '../../core/backend-runtime';
import { StorageModule, WorkspaceBlobStorage } from '../../core/storage';
import { BlobUploadCleanupJob } from '../../core/storage/job';
import { MockUser, MockWorkspace } from '../mocks';
@@ -14,13 +15,22 @@ interface Context {
db: PrismaClient;
job: BlobUploadCleanupJob;
storage: WorkspaceBlobStorage;
runtime: { cleanupExpiredPendingBlobs: Sinon.SinonStub };
}
const test = ava as TestFn<Context>;
test.before(async t => {
t.context.runtime = {
cleanupExpiredPendingBlobs: Sinon.stub(),
};
t.context.module = await createTestingModule({
imports: [ScheduleModule.forRoot(), StorageModule],
tapModule: builder => {
builder
.overrideProvider(BackendRuntimeProvider)
.useValue(t.context.runtime);
},
});
t.context.db = t.context.module.get(PrismaClient);
@@ -30,6 +40,7 @@ test.before(async t => {
test.beforeEach(async t => {
await t.context.module.initTestingDB();
t.context.runtime.cleanupExpiredPendingBlobs.reset();
});
test.after.always(async t => {
@@ -86,24 +97,14 @@ test('should cleanup expired pending blobs', async t => {
],
});
const abortSpy = Sinon.stub(
t.context.storage,
'abortMultipartUpload'
).resolves();
const deleteSpy = Sinon.spy(t.context.storage, 'delete');
t.teardown(() => {
abortSpy.restore();
deleteSpy.restore();
t.context.runtime.cleanupExpiredPendingBlobs.resolves({
scanned: 2,
deleted: 2,
abortedMultipart: 1,
workspaceIds: [workspace.id],
});
await t.context.job.cleanExpiredPendingBlobs();
t.is(abortSpy.callCount, 1);
t.is(deleteSpy.callCount, 2);
const remaining = await t.context.db.blob.findMany({
where: { workspaceId: workspace.id },
});
const remainingKeys = remaining.map(record => record.key).sort();
t.deepEqual(remainingKeys, ['completed-keep', 'pending-active']);
t.true(t.context.runtime.cleanupExpiredPendingBlobs.calledOnce);
});
@@ -119,6 +119,50 @@ test('should list blobs', async t => {
t.deepEqual(ret.map(x => x.key).sort(), [hash1, hash2].sort());
});
// Two objects exist in storage but only one has a metadata row: listing must
// return just that one and must not call the provider's `list`.
test('should keep partial blob metadata listing on DB path without storage scan', async t => {
  await app.signupV1('u1@affine.pro');
  const workspace = await createWorkspace(app);

  // Spy on the provider instance held by WorkspaceBlobStorage.
  const storage = app.get(WorkspaceBlobStorage);
  const listSpy = Sinon.spy((storage as any).provider, 'list');
  t.teardown(() => listSpy.restore());

  const fixture = (content: string) => {
    const body = Buffer.from(content);
    return { body, key: sha256Base64urlWithPadding(body) };
  };
  const tracked = fixture('with metadata');
  const untracked = fixture('without metadata');

  // Write both objects through a separately created provider so no metadata
  // row is produced as a side effect.
  const config = app.get(Config);
  const provider = app
    .get(StorageProviderFactory)
    .create(config.storages.blob.storage);
  for (const { key, body } of [tracked, untracked]) {
    await provider.put(`${workspace.id}/${key}`, body, {
      contentType: 'text/plain',
      contentLength: body.length,
    });
  }

  // Only the first object gets a metadata record.
  await app.get(BlobModel).upsert({
    workspaceId: workspace.id,
    key: tracked.key,
    mime: 'text/plain',
    size: tracked.body.length,
    status: 'completed',
    uploadId: null,
  });

  const listed = await storage.list(workspace.id);
  const listedKeys = listed.map(blob => blob.key);

  t.deepEqual(listedKeys, [tracked.key]);
  t.true(listSpy.notCalled);
});
test('should create pending blob upload with graphql fallback', async t => {
await app.signupV1('u1@affine.pro');
@@ -221,10 +265,13 @@ test('should auto delete blobs when workspace is deleted', async t => {
const blobs = await listBlobs(app, workspace.id);
t.is(blobs.length, 2);
const workspaceBlobStorage = Sinon.spy(app.get(WorkspaceBlobStorage));
const storage = app.get(WorkspaceBlobStorage);
const rawProvider = (storage as any).provider;
const listSpy = Sinon.spy(rawProvider, 'list');
t.teardown(() => listSpy.restore());
await deleteWorkspace(app, workspace.id);
// should not emit workspace.blob.sync event
t.is(workspaceBlobStorage.syncBlobMeta.callCount, 0);
t.is(listSpy.callCount, 0);
});
test('should calc blobs size', async t => {
@@ -30,6 +30,7 @@ import { RateLimiterModule } from './base/throttler';
import { WebSocketModule } from './base/websocket';
import { AccessTokenModule } from './core/access-token';
import { AuthModule } from './core/auth';
import { BackendRuntimeModule } from './core/backend-runtime';
import { CommentModule } from './core/comment';
import { ServerConfigModule, ServerConfigResolverModule } from './core/config';
import { DocStorageModule } from './core/doc';
@@ -120,6 +121,7 @@ export const FunctionalityModules = [
JobModule.forRoot(),
RealtimeModule,
ModelsModule,
BackendRuntimeModule,
ScheduleModule.forRoot(),
MonitorModule,
];
@@ -94,4 +94,12 @@ defineModuleConfig('job', {
},
schema,
},
'queues.backendRuntime': {
desc: 'The config for backend runtime job queue',
default: {
concurrency: 1,
},
schema,
},
});
@@ -29,6 +29,7 @@ export enum Queue {
COPILOT = 'copilot',
INDEXER = 'indexer',
CALENDAR = 'calendar',
BACKENDRUNTIME = 'backendRuntime',
}
export const QUEUES = Object.values(Queue);
@@ -2,6 +2,7 @@ import './config';
import { Module } from '@nestjs/common';
import { BackendRuntimeModule } from '../backend-runtime';
import { FeatureModule } from '../features';
import { MailModule } from '../mail';
import { QuotaModule } from '../quota';
@@ -20,7 +21,13 @@ import { AuthService } from './service';
import { SessionIssuer } from './session-issuer';
@Module({
imports: [FeatureModule, UserModule, QuotaModule, MailModule],
imports: [
BackendRuntimeModule,
FeatureModule,
UserModule,
QuotaModule,
MailModule,
],
providers: [
AuthService,
AuthResolver,
+6 -3
View File
@@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { JobQueue, OnJob } from '../../base';
import { Models } from '../../models';
import { BackendRuntimeProvider } from '../backend-runtime';
declare global {
interface Jobs {
@@ -13,7 +13,7 @@ declare global {
@Injectable()
export class AuthCronJob {
constructor(
private readonly models: Models,
private readonly rt: BackendRuntimeProvider,
private readonly queue: JobQueue
) {}
@@ -31,6 +31,9 @@ export class AuthCronJob {
@OnJob('nightly.cleanExpiredUserSessions')
async cleanExpiredUserSessions() {
await this.models.session.cleanExpiredUserSessions();
for (;;) {
const count = await this.rt.cleanupExpiredUserSessions(1000);
if (count < 1000) break;
}
}
}
@@ -0,0 +1,57 @@
import { ScheduleModule } from '@nestjs/schedule';
import ava, { TestFn } from 'ava';
import Sinon from 'sinon';
import {
createTestingModule,
type TestingModule,
} from '../../../__tests__/utils';
import { BackendRuntimeModule, BackendRuntimeProvider } from '../index';
import { BackendRuntimeHousekeepingJob } from '../job';
/** Shared ava context for the backend-runtime housekeeping spec. */
interface Context {
// Testing module created once in `test.before` and closed in `test.after.always`.
module: TestingModule;
// Job under test, resolved from the testing module.
job: BackendRuntimeHousekeepingJob;
// Stubbed runtime injected in place of BackendRuntimeProvider.
runtime: {
cleanupExpiredRuntimeStates: Sinon.SinonStub;
cleanupExpiredRuntimeGates: Sinon.SinonStub;
};
}
const test = ava as TestFn<Context>;
test.before(async t => {
t.context.runtime = {
cleanupExpiredRuntimeStates: Sinon.stub(),
cleanupExpiredRuntimeGates: Sinon.stub(),
};
t.context.module = await createTestingModule({
imports: [ScheduleModule.forRoot(), BackendRuntimeModule],
tapModule: builder => {
builder
.overrideProvider(BackendRuntimeProvider)
.useValue(t.context.runtime);
},
});
t.context.job = t.context.module.get(BackendRuntimeHousekeepingJob);
});
test.beforeEach(t => {
t.context.runtime.cleanupExpiredRuntimeStates.reset();
t.context.runtime.cleanupExpiredRuntimeGates.reset();
});
test.after.always(async t => {
await t.context.module.close();
});
test('backend-runtime housekeeping cleans runtime state and gate batches', async t => {
t.context.runtime.cleanupExpiredRuntimeStates.onCall(0).resolves(1000);
t.context.runtime.cleanupExpiredRuntimeStates.onCall(1).resolves(2);
t.context.runtime.cleanupExpiredRuntimeGates.resolves(1);
await t.context.job.cleanExpiredRuntimeHousekeeping();
t.is(t.context.runtime.cleanupExpiredRuntimeStates.callCount, 2);
t.is(t.context.runtime.cleanupExpiredRuntimeGates.callCount, 1);
});
@@ -0,0 +1,43 @@
import test from 'ava';
import Sinon from 'sinon';
import { BackendRuntimeProvider } from '../provider';
// The provider forwards every start() call to the native runtime (asserted
// below: two starts -> two runtime.start calls) but runs migrations only on
// the first one. The title states exactly what the assertions check.
test('backend-runtime provider runs migrations once across repeated starts and reports health', async t => {
  const provider = new BackendRuntimeProvider();
  const runtime = {
    start: Sinon.stub().resolves(),
    stop: Sinon.stub().resolves(),
    runMigrations: Sinon.stub().resolves(),
    health: Sinon.stub().resolves({
      started: true,
      databaseConnected: true,
      objectStorageConfigured: true,
    }),
  };
  // Swap the private native runtime for the stub object.
  (provider as any).runtime = runtime;

  await provider.start();
  await provider.start();
  const health = await provider.health();
  await provider.stop();

  t.is(runtime.start.callCount, 2);
  t.is(runtime.runMigrations.callCount, 1);
  t.true(health.databaseConnected);
  t.true(health.objectStorageConfigured);
  t.is(runtime.stop.callCount, 1);
});
// A typed provider method must hand its argument to the native runtime
// unchanged and return the runtime's result.
test('backend-runtime provider measures explicit typed methods', async t => {
  const cleanupStub = Sinon.stub().resolves(3);
  const provider = new BackendRuntimeProvider();
  // Swap the private native runtime for an object exposing only the stub.
  (provider as any).runtime = { cleanupExpiredRuntimeStates: cleanupStub };

  const cleaned = await provider.cleanupExpiredRuntimeStates(1000);

  t.is(cleaned, 3);
  t.true(cleanupStub.calledOnceWithExactly(1000));
});
@@ -0,0 +1,368 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import { EventBus, JobQueue, OnJob } from '../../base';
import { BackendRuntimeProvider } from './provider';
// Payload types for the blob reconciliation jobs handled by
// BackendRuntimeBlobJob. Every optional field has a default applied by the
// corresponding @OnJob handler when it is omitted.
declare global {
interface Jobs {
// Backfill blob metadata for a single workspace.
'backendRuntime.backfillMissingBlobMetadata': {
workspaceId: string;
limit?: number;
};
// Backfill blob metadata for one page of workspaces with sid > lastSid.
'backendRuntime.backfillMissingBlobMetadataBySid': {
lastSid?: number;
workspaceLimit?: number;
objectLimit?: number;
};
// Rebuild doc -> blob references for a single workspace.
'backendRuntime.rebuildWorkspaceDocBlobRefs': {
workspaceId: string;
limit?: number;
};
// Rebuild doc -> blob references for one page of workspaces with sid > lastSid.
'backendRuntime.rebuildWorkspaceDocBlobRefsBySid': {
lastSid?: number;
workspaceLimit?: number;
docLimit?: number;
};
// Plan cleanup of unreferenced blobs for a single workspace.
'backendRuntime.planUnreferencedWorkspaceBlobs': {
workspaceId: string;
gracePeriodDays?: number;
limit?: number;
};
// Plan cleanup of unreferenced blobs for one page of workspaces with sid > lastSid.
'backendRuntime.planUnreferencedWorkspaceBlobsBySid': {
lastSid?: number;
workspaceLimit?: number;
gracePeriodDays?: number;
limit?: number;
};
// Execute the cleanup candidates recorded under a planning run id.
'backendRuntime.executeBlobCleanupCandidates': {
runId: string;
gracePeriodDays?: number;
limit?: number;
};
}
}
/**
 * Queue-driven blob reconciliation jobs backed by the backend runtime.
 *
 * Three daily crons enqueue workspace-paged ("BySid") jobs: blob metadata
 * backfill, doc -> blob reference rebuild, and unreferenced-blob cleanup
 * planning. Each BySid job handles one page of workspaces ordered by `sid`
 * and, when the page came back full, enqueues itself again starting after the
 * last `sid` it saw. Single-workspace variants and the cleanup execution job
 * are exposed through the `enqueue*` helpers.
 */
@Injectable()
export class BackendRuntimeBlobJob {
  private readonly logger = new Logger(BackendRuntimeBlobJob.name);

  constructor(
    private readonly rt: BackendRuntimeProvider,
    private readonly event: EventBus,
    private readonly queue: JobQueue,
    private readonly db: PrismaClient
  ) {}

  // ---------------------------------------------------------------------
  // enqueue helpers
  // ---------------------------------------------------------------------

  /** Enqueue a metadata backfill for a single workspace. */
  async enqueueBackfillMissingBlobMetadata(workspaceId: string, limit = 1000) {
    await this.queue.add('backendRuntime.backfillMissingBlobMetadata', {
      workspaceId,
      limit,
    });
  }

  /** Enqueue a metadata backfill page for workspaces with `sid > lastSid`. */
  async enqueueBackfillMissingBlobMetadataBySid(
    lastSid = 0,
    workspaceLimit = 100,
    objectLimit = 1000
  ) {
    await this.queue.add('backendRuntime.backfillMissingBlobMetadataBySid', {
      lastSid,
      workspaceLimit,
      objectLimit,
    });
  }

  /** Enqueue a doc -> blob reference rebuild for a single workspace. */
  async enqueueRebuildWorkspaceDocBlobRefs(workspaceId: string, limit = 1000) {
    await this.queue.add('backendRuntime.rebuildWorkspaceDocBlobRefs', {
      workspaceId,
      limit,
    });
  }

  /** Enqueue a reference rebuild page for workspaces with `sid > lastSid`. */
  async enqueueRebuildWorkspaceDocBlobRefsBySid(
    lastSid = 0,
    workspaceLimit = 100,
    docLimit = 1000
  ) {
    await this.queue.add('backendRuntime.rebuildWorkspaceDocBlobRefsBySid', {
      lastSid,
      workspaceLimit,
      docLimit,
    });
  }

  /** Enqueue cleanup planning for a single workspace. */
  async enqueuePlanUnreferencedWorkspaceBlobs(
    workspaceId: string,
    gracePeriodDays = 30,
    limit = 1000
  ) {
    await this.queue.add('backendRuntime.planUnreferencedWorkspaceBlobs', {
      workspaceId,
      gracePeriodDays,
      limit,
    });
  }

  /** Enqueue a cleanup planning page for workspaces with `sid > lastSid`. */
  async enqueuePlanUnreferencedWorkspaceBlobsBySid(
    lastSid = 0,
    workspaceLimit = 100,
    gracePeriodDays = 30,
    limit = 1000
  ) {
    await this.queue.add('backendRuntime.planUnreferencedWorkspaceBlobsBySid', {
      lastSid,
      workspaceLimit,
      gracePeriodDays,
      limit,
    });
  }

  /** Enqueue execution of the cleanup candidates recorded under `runId`. */
  async enqueueExecuteBlobCleanupCandidates(
    runId: string,
    gracePeriodDays = 30,
    limit = 1000
  ) {
    await this.queue.add('backendRuntime.executeBlobCleanupCandidates', {
      runId,
      gracePeriodDays,
      limit,
    });
  }

  // ---------------------------------------------------------------------
  // daily schedules
  // ---------------------------------------------------------------------

  // Each cron passes a fixed jobId and an empty payload, so the handler
  // defaults apply (start from sid 0, default page sizes).

  @Cron(CronExpression.EVERY_DAY_AT_1AM)
  async dailyBlobMetadataBackfill() {
    await this.queue.add(
      'backendRuntime.backfillMissingBlobMetadataBySid',
      {},
      { jobId: 'daily-backend-runtime-blob-metadata-backfill' }
    );
  }

  @Cron(CronExpression.EVERY_DAY_AT_2AM)
  async dailyDocBlobRefsRebuild() {
    await this.queue.add(
      'backendRuntime.rebuildWorkspaceDocBlobRefsBySid',
      {},
      { jobId: 'daily-backend-runtime-doc-blob-refs-rebuild' }
    );
  }

  @Cron(CronExpression.EVERY_DAY_AT_3AM)
  async dailyBlobCleanupPlanning() {
    await this.queue.add(
      'backendRuntime.planUnreferencedWorkspaceBlobsBySid',
      {},
      { jobId: 'daily-backend-runtime-blob-cleanup-planning' }
    );
  }

  // ---------------------------------------------------------------------
  // job handlers
  // ---------------------------------------------------------------------

  @OnJob('backendRuntime.backfillMissingBlobMetadata')
  async backfillMissingBlobMetadata({
    workspaceId,
    limit = 1000,
  }: Jobs['backendRuntime.backfillMissingBlobMetadata']) {
    await this.drainBlobMetadataBackfill(workspaceId, limit);
  }

  @OnJob('backendRuntime.backfillMissingBlobMetadataBySid')
  async backfillMissingBlobMetadataBySid({
    lastSid = 0,
    workspaceLimit = 100,
    objectLimit = 1000,
  }: Jobs['backendRuntime.backfillMissingBlobMetadataBySid']) {
    const workspaces = await this.listWorkspacesAfterSid(
      lastSid,
      workspaceLimit
    );

    for (const workspace of workspaces) {
      await this.drainBlobMetadataBackfill(workspace.id, objectLimit, {
        sid: workspace.sid,
      });
    }

    const nextSid = this.nextPageSid(workspaces, workspaceLimit);
    if (nextSid !== undefined) {
      await this.enqueueBackfillMissingBlobMetadataBySid(
        nextSid,
        workspaceLimit,
        objectLimit
      );
    }
  }

  @OnJob('backendRuntime.rebuildWorkspaceDocBlobRefs')
  async rebuildWorkspaceDocBlobRefs({
    workspaceId,
    limit = 1000,
  }: Jobs['backendRuntime.rebuildWorkspaceDocBlobRefs']) {
    await this.drainWorkspaceDocBlobRefs(workspaceId, limit);
  }

  @OnJob('backendRuntime.rebuildWorkspaceDocBlobRefsBySid')
  async rebuildWorkspaceDocBlobRefsBySid({
    lastSid = 0,
    workspaceLimit = 100,
    docLimit = 1000,
  }: Jobs['backendRuntime.rebuildWorkspaceDocBlobRefsBySid']) {
    const workspaces = await this.listWorkspacesAfterSid(
      lastSid,
      workspaceLimit
    );

    for (const workspace of workspaces) {
      await this.drainWorkspaceDocBlobRefs(workspace.id, docLimit, {
        sid: workspace.sid,
      });
    }

    const nextSid = this.nextPageSid(workspaces, workspaceLimit);
    if (nextSid !== undefined) {
      await this.enqueueRebuildWorkspaceDocBlobRefsBySid(
        nextSid,
        workspaceLimit,
        docLimit
      );
    }
  }

  @OnJob('backendRuntime.planUnreferencedWorkspaceBlobs')
  async planUnreferencedWorkspaceBlobs({
    workspaceId,
    gracePeriodDays = 30,
    limit = 1000,
  }: Jobs['backendRuntime.planUnreferencedWorkspaceBlobs']) {
    await this.planWorkspaceCleanup(workspaceId, gracePeriodDays, limit);
  }

  @OnJob('backendRuntime.planUnreferencedWorkspaceBlobsBySid')
  async planUnreferencedWorkspaceBlobsBySid({
    lastSid = 0,
    workspaceLimit = 100,
    gracePeriodDays = 30,
    limit = 1000,
  }: Jobs['backendRuntime.planUnreferencedWorkspaceBlobsBySid']) {
    const workspaces = await this.listWorkspacesAfterSid(
      lastSid,
      workspaceLimit
    );

    for (const workspace of workspaces) {
      await this.planWorkspaceCleanup(workspace.id, gracePeriodDays, limit, {
        sid: workspace.sid,
      });
    }

    const nextSid = this.nextPageSid(workspaces, workspaceLimit);
    if (nextSid !== undefined) {
      await this.enqueuePlanUnreferencedWorkspaceBlobsBySid(
        nextSid,
        workspaceLimit,
        gracePeriodDays,
        limit
      );
    }
  }

  @OnJob('backendRuntime.executeBlobCleanupCandidates')
  async executeBlobCleanupCandidates({
    runId,
    gracePeriodDays = 30,
    limit = 1000,
  }: Jobs['backendRuntime.executeBlobCleanupCandidates']) {
    const result = await this.rt.executeBlobCleanupCandidates(
      runId,
      gracePeriodDays,
      limit
    );
    await this.notifyBlobsUpdated(result.workspaceIds);
    this.logger.log(
      `executed blob cleanup run=${runId} deleted=${result.deletedObjects} skipped=${result.skippedStillReferenced} failed=${result.failed}`
    );
  }

  // ---------------------------------------------------------------------
  // shared helpers
  // ---------------------------------------------------------------------

  /** Fetch up to `limit` workspaces with `sid > lastSid`, ascending by `sid`. */
  private async listWorkspacesAfterSid(lastSid: number, limit: number) {
    return await this.db.workspace.findMany({
      where: { sid: { gt: lastSid } },
      orderBy: { sid: 'asc' },
      select: { id: true, sid: true },
      take: limit,
    });
  }

  /**
   * Decide where the next page starts.
   *
   * @returns the last `sid` of the page when the page is full (more
   * workspaces may follow), otherwise `undefined` (scan finished).
   */
  private nextPageSid(
    workspaces: readonly { sid: number }[],
    workspaceLimit: number
  ): number | undefined {
    if (workspaces.length !== workspaceLimit) {
      return undefined;
    }
    return workspaces.at(-1)?.sid;
  }

  /** Emit `workspace.blobs.updated` for every listed workspace and wait for all. */
  private async notifyBlobsUpdated(workspaceIds: readonly string[]) {
    await Promise.all(
      workspaceIds.map(workspaceId =>
        this.event.emitAsync('workspace.blobs.updated', { workspaceId })
      )
    );
  }

  /** Log fragment ` sid=<n>` when a sid is known, otherwise the empty string. */
  private sidSuffix(context: { sid?: number }) {
    return context.sid === undefined ? '' : ` sid=${context.sid}`;
  }

  /** Run one cleanup-planning call for a workspace and log its result. */
  private async planWorkspaceCleanup(
    workspaceId: string,
    gracePeriodDays: number,
    limit: number,
    context: { sid?: number } = {}
  ) {
    const result = await this.rt.planUnreferencedWorkspaceBlobs(
      workspaceId,
      gracePeriodDays,
      limit
    );
    this.logger.log(
      `planned blob cleanup workspace=${workspaceId}${this.sidSuffix(context)} run=${result.runId} candidates=${result.candidatesMarked} scanned=${result.scannedBlobs}`
    );
  }

  /**
   * Repeat the metadata backfill for one workspace until the runtime stops
   * returning a `nextCursor`, notifying affected workspaces after each batch.
   */
  private async drainBlobMetadataBackfill(
    workspaceId: string,
    limit: number,
    context: { sid?: number } = {}
  ) {
    for (;;) {
      // NOTE(review): no cursor is passed back in, so this presumably relies
      // on the runtime tracking its own scan position — confirm.
      const result = await this.rt.backfillMissingBlobMetadata(
        workspaceId,
        limit
      );
      await this.notifyBlobsUpdated(result.workspaceIds);
      this.logger.log(
        `backfilled blob metadata workspace=${workspaceId}${this.sidSuffix(context)} upserted=${result.upsertedMetadata} scanned=${result.scannedObjects}`
      );
      if (!result.nextCursor) {
        break;
      }
    }
  }

  /**
   * Repeat the doc -> blob reference rebuild for one workspace until the
   * runtime stops returning a `nextCursor`.
   */
  private async drainWorkspaceDocBlobRefs(
    workspaceId: string,
    limit: number,
    context: { sid?: number } = {}
  ) {
    for (;;) {
      // NOTE(review): same cursor assumption as drainBlobMetadataBackfill.
      const result = await this.rt.rebuildWorkspaceDocBlobRefs(
        workspaceId,
        limit
      );
      this.logger.log(
        `rebuilt doc blob refs workspace=${workspaceId}${this.sidSuffix(context)} parsed=${result.parsedDocs} failed=${result.failedDocs}`
      );
      if (!result.nextCursor) {
        break;
      }
    }
  }
}
@@ -0,0 +1,18 @@
import { Global, Module } from '@nestjs/common';
import { BackendRuntimeBlobJob } from './blob-job';
import { BackendRuntimeHousekeepingJob } from './job';
import { BackendRuntimeProvider } from './provider';
// Nest module for the backend runtime, marked @Global(). It provides the
// runtime provider plus the blob and housekeeping job classes; only the
// provider and the blob job are exported (the housekeeping job is not).
@Global()
@Module({
providers: [
BackendRuntimeProvider,
BackendRuntimeBlobJob,
BackendRuntimeHousekeepingJob,
],
exports: [BackendRuntimeProvider, BackendRuntimeBlobJob],
})
export class BackendRuntimeModule {}
export { BackendRuntimeProvider } from './provider';
@@ -0,0 +1,58 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { JobQueue, OnJob } from '../../base';
import { BackendRuntimeProvider } from './provider';
declare global {
interface Jobs {
// Nightly housekeeping job handled by BackendRuntimeHousekeepingJob; it
// carries no payload.
'nightly.cleanExpiredBackendRuntimeHousekeeping': {};
}
}
/**
 * Nightly housekeeping for the backend runtime: deletes expired runtime
 * states and runtime gates in batches through the runtime provider.
 */
@Injectable()
export class BackendRuntimeHousekeepingJob {
  private readonly logger = new Logger(BackendRuntimeHousekeepingJob.name);

  constructor(
    private readonly rt: BackendRuntimeProvider,
    private readonly queue: JobQueue
  ) {}

  /** Enqueue the housekeeping job every midnight under a fixed job id. */
  @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
  async nightlyJob() {
    await this.queue.add(
      'nightly.cleanExpiredBackendRuntimeHousekeeping',
      {},
      {
        jobId: 'nightly-backend-runtime-housekeeping',
      }
    );
  }

  /** Drain expired runtime states, then expired runtime gates, and log totals. */
  @OnJob('nightly.cleanExpiredBackendRuntimeHousekeeping')
  async cleanExpiredRuntimeHousekeeping() {
    const states = await this.cleanBatches(limit =>
      this.rt.cleanupExpiredRuntimeStates(limit)
    );
    const gates = await this.cleanBatches(limit =>
      this.rt.cleanupExpiredRuntimeGates(limit)
    );
    this.logger.log(
      `cleaned runtime housekeeping states=${states} gates=${gates}`
    );
  }

  /**
   * Call `fn` repeatedly until it reports a batch smaller than `batchSize`.
   *
   * The batch size is passed to `fn` and used for the stop condition, so the
   * two cannot drift apart. The loop also stops when `fn` yields a value that
   * is not a positive finite number: `Number(undefined)` is `NaN`, and
   * `NaN < batchSize` is false, so a plain `count < batchSize` check would
   * spin forever on such a value.
   *
   * @param fn cleanup call taking the batch size and resolving to the number
   *   of rows it removed
   * @param batchSize rows requested per call
   * @returns total rows removed across all batches
   */
  private async cleanBatches(
    fn: (limit: number) => Promise<number>,
    batchSize = 1000
  ) {
    let total = 0;
    for (;;) {
      // Number() is kept from the original code; presumably the runtime may
      // hand back a non-number numeric such as bigint — TODO confirm.
      const count = Number(await fn(batchSize));
      if (!Number.isFinite(count) || count <= 0) {
        break;
      }
      total += count;
      if (count < batchSize) {
        break;
      }
    }
    return total;
  }
}
@@ -0,0 +1,137 @@
import {
Injectable,
Logger,
type OnApplicationBootstrap,
type OnApplicationShutdown,
} from '@nestjs/common';
import { wrapCallMetric } from '../../base/metrics';
import { BackendRuntime, type BackendRuntimeHealth } from '../../native';
type RuntimeInstance = InstanceType<typeof BackendRuntime>;
/**
 * Nest provider that owns the native `BackendRuntime` instance.
 *
 * It starts the runtime on application bootstrap, stops it on shutdown, and
 * exposes typed wrappers around the runtime's maintenance calls. Every
 * wrapper goes through `measured`, which wraps the call with
 * `wrapCallMetric` tagged by method name.
 */
@Injectable()
export class BackendRuntimeProvider
  implements OnApplicationBootstrap, OnApplicationShutdown
{
  private readonly logger = new Logger(BackendRuntimeProvider.name);
  private readonly runtime: RuntimeInstance = new BackendRuntime();
  private migrationsStarted = false;

  async onApplicationBootstrap() {
    await this.start();
  }

  async onApplicationShutdown() {
    await this.stop();
  }

  /** Start the runtime, run migrations (first call only), then log health. */
  async start() {
    await this.runtime.start();
    await this.runMigrationsOnce();
    const report = await this.runtime.health();
    const db = report.databaseConnected;
    const objectStorage = report.objectStorageConfigured;
    this.logger.log(
      `backend runtime started: db=${db} objectStorage=${objectStorage}`
    );
  }

  /** Stop the runtime and log that it stopped. */
  async stop() {
    await this.runtime.stop();
    this.logger.log('backend runtime stopped');
  }

  /** Return the runtime's health report (not wrapped in a call metric). */
  async health(): Promise<BackendRuntimeHealth> {
    const report = await this.runtime.health();
    return report;
  }

  async cleanupExpiredPendingBlobs(cutoffMs: number, limit: number) {
    return await this.measured('cleanupExpiredPendingBlobs', native =>
      native.cleanupExpiredPendingBlobs(cutoffMs, limit)
    );
  }

  async releaseDeletedBlobs(workspaceId: string, limit: number) {
    return await this.measured('releaseDeletedBlobs', native =>
      native.releaseDeletedBlobs(workspaceId, limit)
    );
  }

  async cleanupExpiredSnapshotHistories(limit: number) {
    return await this.measured('cleanupExpiredSnapshotHistories', native =>
      native.cleanupExpiredSnapshotHistories(limit)
    );
  }

  async cleanupExpiredUserSessions(limit: number) {
    return await this.measured('cleanupExpiredUserSessions', native =>
      native.cleanupExpiredUserSessions(limit)
    );
  }

  async cleanupExpiredRuntimeStates(limit: number) {
    return await this.measured('cleanupExpiredRuntimeStates', native =>
      native.cleanupExpiredRuntimeStates(limit)
    );
  }

  async cleanupExpiredRuntimeGates(limit: number) {
    return await this.measured('cleanupExpiredRuntimeGates', native =>
      native.cleanupExpiredRuntimeGates(limit)
    );
  }

  async backfillMissingBlobMetadata(
    workspaceId: string | null | undefined,
    limit: number
  ) {
    return await this.measured('backfillMissingBlobMetadata', native =>
      native.backfillMissingBlobMetadata(workspaceId, limit)
    );
  }

  async rebuildWorkspaceDocBlobRefs(workspaceId: string, limit: number) {
    return await this.measured('rebuildWorkspaceDocBlobRefs', native =>
      native.rebuildWorkspaceDocBlobRefs(workspaceId, limit)
    );
  }

  async planUnreferencedWorkspaceBlobs(
    workspaceId: string,
    gracePeriodDays: number,
    limit: number
  ) {
    return await this.measured('planUnreferencedWorkspaceBlobs', native =>
      native.planUnreferencedWorkspaceBlobs(workspaceId, gracePeriodDays, limit)
    );
  }

  async executeBlobCleanupCandidates(
    runId: string,
    gracePeriodDays: number,
    limit: number
  ) {
    return await this.measured('executeBlobCleanupCandidates', native =>
      native.executeBlobCleanupCandidates(runId, gracePeriodDays, limit)
    );
  }

  /**
   * Run `fn` against the native runtime through `wrapCallMetric`, labelled
   * 'storage' / 'backend_runtime' with the given method name as attribute.
   */
  private async measured<T>(
    method: string,
    fn: (runtime: RuntimeInstance) => Promise<T>
  ): Promise<T> {
    const instrumented = wrapCallMetric(
      () => fn(this.runtime),
      'storage',
      'backend_runtime',
      { method }
    );
    return await instrumented();
  }

  /**
   * Run runtime migrations unless a previous call already completed them.
   * The flag is set only after `runMigrations` resolves, so a failed attempt
   * is retried by the next call.
   */
  private async runMigrationsOnce() {
    if (!this.migrationsStarted) {
      await this.runtime.runMigrations();
      this.migrationsStarted = true;
    }
  }
}
@@ -2,6 +2,7 @@ import './config';
import { Module } from '@nestjs/common';
import { BackendRuntimeModule } from '../backend-runtime';
import { PermissionModule } from '../permission';
import { QuotaModule } from '../quota';
import { StorageModule } from '../storage';
@@ -14,7 +15,7 @@ import { DatabaseDocReader, DocReader, DocReaderProvider } from './reader';
import { DocWriter } from './writer';
@Module({
imports: [QuotaModule, PermissionModule, StorageModule],
imports: [BackendRuntimeModule, QuotaModule, PermissionModule, StorageModule],
providers: [
DocStorageOptions,
PgWorkspaceDocStorageAdapter,
+6 -3
View File
@@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { JobQueue, OnJob } from '../../base';
import { Models } from '../../models';
import { BackendRuntimeProvider } from '../backend-runtime';
declare global {
interface Jobs {
@@ -13,7 +13,7 @@ declare global {
@Injectable()
export class DocStorageCronJob {
constructor(
private readonly models: Models,
private readonly rt: BackendRuntimeProvider,
private readonly queue: JobQueue
) {}
@@ -30,6 +30,9 @@ export class DocStorageCronJob {
@OnJob('nightly.cleanExpiredHistories')
async cleanExpiredHistories() {
await this.models.history.cleanExpired();
for (;;) {
const count = await this.rt.cleanupExpiredSnapshotHistories(1000);
if (count < 1000) break;
}
}
}
@@ -2,6 +2,7 @@ import './config';
import { Module } from '@nestjs/common';
import { BackendRuntimeModule } from '../backend-runtime';
import { BlobUploadCleanupJob } from './job';
import { R2UploadController } from './r2-proxy';
import {
@@ -11,6 +12,7 @@ import {
} from './wrappers';
@Module({
imports: [BackendRuntimeModule],
controllers: [R2UploadController],
providers: [
WorkspaceBlobStorage,
+21 -18
View File
@@ -1,9 +1,8 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { JobQueue, OneDay, OnJob } from '../../base';
import { Models } from '../../models';
import { WorkspaceBlobStorage } from './wrappers/blob';
import { EventBus, JobQueue, OneDay, OnJob } from '../../base';
import { BackendRuntimeProvider } from '../backend-runtime';
declare global {
interface Jobs {
@@ -16,8 +15,8 @@ export class BlobUploadCleanupJob {
private readonly logger = new Logger(BlobUploadCleanupJob.name);
constructor(
private readonly models: Models,
private readonly storage: WorkspaceBlobStorage,
private readonly rt: BackendRuntimeProvider,
private readonly event: EventBus,
private readonly queue: JobQueue
) {}
@@ -34,21 +33,25 @@ export class BlobUploadCleanupJob {
@OnJob('nightly.cleanExpiredPendingBlobs')
async cleanExpiredPendingBlobs() {
const cutoff = new Date(Date.now() - OneDay);
const pending = await this.models.blob.listPendingExpired(cutoff);
for (const blob of pending) {
if (blob.uploadId) {
await this.storage.abortMultipartUpload(
blob.workspaceId,
blob.key,
blob.uploadId
);
const cutoff = Date.now() - OneDay;
let scanned = 0;
let deleted = 0;
for (;;) {
const result = await this.rt.cleanupExpiredPendingBlobs(cutoff, 1000);
scanned += result.scanned;
deleted += result.deleted;
await Promise.all(
result.workspaceIds.map(workspaceId =>
this.event.emitAsync('workspace.blobs.updated', { workspaceId })
)
);
if (result.scanned < 1000) {
break;
}
await this.storage.delete(blob.workspaceId, blob.key, true);
}
this.logger.log(`cleaned ${pending.length} expired pending blobs`);
this.logger.log(
`cleaned ${deleted} expired pending blobs, scanned ${scanned}`
);
}
}
@@ -7,7 +7,6 @@ import {
Config,
EventBus,
type GetObjectMetadata,
ListObjectsMetadata,
OnEvent,
PutObjectMetadata,
type StorageProvider,
@@ -15,13 +14,10 @@ import {
URLHelper,
} from '../../../base';
import { Models } from '../../../models';
import { BackendRuntimeProvider } from '../../backend-runtime';
declare global {
interface Events {
'workspace.blob.sync': {
workspaceId: string;
key: string;
};
'workspace.blob.delete': {
workspaceId: string;
key: string;
@@ -57,7 +53,8 @@ export class WorkspaceBlobStorage {
private readonly event: EventBus,
private readonly storageFactory: StorageProviderFactory,
private readonly models: Models,
private readonly url: URLHelper
private readonly url: URLHelper,
private readonly rt: BackendRuntimeProvider
) {}
@OnEvent('config.init')
@@ -223,34 +220,8 @@ export class WorkspaceBlobStorage {
return { ok: true, metadata };
}
async list(workspaceId: string, syncBlobMeta = true) {
const blobsInDb = await this.models.blob.list(workspaceId);
if (blobsInDb.length > 0) {
return blobsInDb;
}
// all blobs are uploading but not completed yet
const hasDbBlobs = await this.models.blob.hasAny(workspaceId);
if (hasDbBlobs) {
return blobsInDb;
}
const blobs = await this.provider.list(workspaceId + '/');
blobs.forEach(blob => {
blob.key = blob.key.slice(workspaceId.length + 1);
});
if (syncBlobMeta) {
this.trySyncBlobsMeta(workspaceId, blobs);
}
return blobs.map(blob => ({
key: blob.key,
size: blob.contentLength,
createdAt: blob.lastModified,
mime: 'application/octet-stream',
}));
async list(workspaceId: string) {
return await this.models.blob.list(workspaceId);
}
async delete(workspaceId: string, key: string, permanently = false) {
@@ -264,17 +235,17 @@ export class WorkspaceBlobStorage {
}
async release(workspaceId: string) {
const deletedBlobs = await this.models.blob.listDeleted(workspaceId);
deletedBlobs.forEach(blob => {
this.event.emit('workspace.blob.delete', {
workspaceId: workspaceId,
key: blob.key,
});
});
let scanned = 0;
let deleted = 0;
for (;;) {
const result = await this.rt.releaseDeletedBlobs(workspaceId, 1000);
scanned += result.scanned;
deleted += result.deleted;
if (result.scanned < 1000) break;
}
this.logger.log(
`released ${deletedBlobs.length} blobs for workspace ${workspaceId}`
`released ${deleted}/${scanned} blobs for workspace ${workspaceId}`
);
await this.event.emitAsync('workspace.blobs.updated', { workspaceId });
@@ -291,15 +262,6 @@ export class WorkspaceBlobStorage {
return this.url.link(`/api/workspaces/${workspaceId}/blobs/${avatarKey}`);
}
private trySyncBlobsMeta(workspaceId: string, blobs: ListObjectsMetadata[]) {
for (const blob of blobs) {
this.event.emit('workspace.blob.sync', {
workspaceId,
key: blob.key,
});
}
}
private async upsert(
workspaceId: string,
key: string,
@@ -315,26 +277,9 @@ export class WorkspaceBlobStorage {
});
}
@OnEvent('workspace.blob.sync')
async syncBlobMeta({ workspaceId, key }: Events['workspace.blob.sync']) {
try {
const meta = await this.provider.head(`${workspaceId}/${key}`);
if (meta) {
await this.upsert(workspaceId, key, meta);
} else {
await this.models.blob.delete(workspaceId, key, true);
}
} catch (e) {
// never throw
this.logger.error('failed to sync blob meta to DB', e);
}
}
@OnEvent('workspace.deleted')
async onWorkspaceDeleted({ id }: Events['workspace.deleted']) {
// do not sync blob meta to DB
const blobs = await this.list(id, false);
const blobs = await this.list(id);
// to reduce cpu time holding
blobs.forEach(blob => {
@@ -105,31 +105,6 @@ test('should list blobs', async t => {
t.is(blobs[1].key, blob2.key);
});
test('should list deleted blobs', async t => {
const workspace = await module.create(Mockers.Workspace);
const blob = await models.blob.upsert({
workspaceId: workspace.id,
key: 'test-key',
mime: 'text/plain',
size: 100,
});
await models.blob.delete(workspace.id, blob.key);
const blobs = await models.blob.listDeleted(workspace.id);
t.is(blobs.length, 1);
t.is(blobs[0].key, blob.key);
t.truthy(blobs[0].deletedAt);
// delete permanently
await models.blob.delete(workspace.id, blob.key, true);
const blobs2 = await models.blob.listDeleted(workspace.id);
t.is(blobs2.length, 0);
});
test('should get blob', async t => {
const workspace = await module.create(Mockers.Workspace);
const blob = await models.blob.upsert({
@@ -96,32 +96,6 @@ export class BlobModel extends BaseModel {
return count > 0;
}
async listPendingExpired(before: Date) {
return await this.db.blob.findMany({
where: {
status: 'pending',
deletedAt: null,
createdAt: {
lt: before,
},
},
select: {
workspaceId: true,
key: true,
uploadId: true,
},
});
}
async listDeleted(workspaceId: string) {
return await this.db.blob.findMany({
where: {
workspaceId,
deletedAt: { not: null },
},
});
}
async totalSize(workspaceId: string) {
const sum = await this.db.blob.aggregate({
where: {
@@ -162,21 +162,4 @@ export class HistoryModel extends BaseModel {
editor: row.createdByUser,
};
}
/**
* Clean expired histories.
*/
async cleanExpired() {
const { count } = await this.db.snapshotHistory.deleteMany({
where: {
expiredAt: {
lte: new Date(),
},
},
});
if (count > 0) {
this.logger.log(`Deleted ${count} expired histories`);
}
return count;
}
}
@@ -148,17 +148,4 @@ export class SessionModel extends BaseModel {
}
return count;
}
async cleanExpiredUserSessions() {
const { count } = await this.db.userSession.deleteMany({
where: {
expiresAt: {
lte: new Date(),
},
},
});
if (count > 0) {
this.logger.log(`Cleaned ${count} expired user sessions`);
}
}
}
+8
View File
@@ -46,9 +46,13 @@ import serverNativeModule, {
type RequestedModelMatchResponse,
type ResolvedEntitlement,
type ResolveEntitlementInput,
type RuntimeBlobCleanupExecuteResult,
type RuntimeBlobCleanupPlanResult,
type RuntimeBlobCleanupResult,
type RuntimeBlobCompleteResult,
type RuntimeBlobMetadataBackfillResult,
type RuntimeByokLocalLeaseRecord,
type RuntimeDocBlobRefsResult,
type RuntimeDocCompactionResult,
type RuntimeMagicLinkOtpConsumeResult,
type RuntimeMultipartUploadInit,
@@ -91,9 +95,13 @@ export type {
RemoteMimeTypeRequest,
ResolvedEntitlement,
ResolveEntitlementInput,
RuntimeBlobCleanupExecuteResult,
RuntimeBlobCleanupPlanResult,
RuntimeBlobCleanupResult,
RuntimeBlobCompleteResult,
RuntimeBlobMetadataBackfillResult,
RuntimeByokLocalLeaseRecord,
RuntimeDocBlobRefsResult,
RuntimeDocCompactionResult,
RuntimeMagicLinkOtpConsumeResult,
RuntimeMultipartUploadInit,
+4
View File
@@ -46,6 +46,10 @@
"queues.nightly": {
"type": "Object",
"desc": "The config for nightly job queue"
},
"queues.backendRuntime": {
"type": "Object",
"desc": "The config for backend runtime job queue"
}
},
"throttle": {