diff --git a/packages/backend/native/index.d.ts b/packages/backend/native/index.d.ts index 5d10672190..32de3166ed 100644 --- a/packages/backend/native/index.d.ts +++ b/packages/backend/native/index.d.ts @@ -514,6 +514,12 @@ export declare function llmValidateJsonSchema(schema: any, value: any): any */ export declare function mergeUpdatesInApplyWay(updates: Array): Buffer +/** + * Check whether a Yjs update binary can be decoded without applying it to a + * document state. + */ +export declare function validateDocUpdate(update: Buffer): Promise + export declare function mintChallengeResponse(resource: string, bits?: number | undefined | null): Promise export interface ModelConditionsContract { diff --git a/packages/backend/native/src/backend_runtime/config.rs b/packages/backend/native/src/backend_runtime/config.rs index 5e6b77d3f2..347d650093 100644 --- a/packages/backend/native/src/backend_runtime/config.rs +++ b/packages/backend/native/src/backend_runtime/config.rs @@ -6,6 +6,8 @@ use std::{ use napi::Result; use serde::Deserialize; +use serde_json::Map; +use sqlx::{PgPool, Row}; use super::{ error::napi_error, @@ -20,26 +22,56 @@ pub(super) struct RuntimeConfig { impl RuntimeConfig { pub(super) fn from_config_files() -> Result { + let app_config = app_config_from_config_files()?; let database_url = database_url_from_env() - .or(database_url_from_config_files()?) + .or(app_config.database_url()) .unwrap_or_else(|| "postgresql://localhost:5432/affine".to_string()); - let storage = ObjectStorageConfig::from_config_files()?; + let storage = ObjectStorageConfig::from_provider_config(app_config.blob_storage_provider_config())?; Ok(Self { database_url, storage }) } + + pub(super) async fn with_db_overrides(&self, pool: &PgPool) -> Result { + let mut app_config = app_config_from_config_files()?; + app_config.apply_file_config(load_app_config_overrides_from_db(pool).await?); + Ok(Self { + // The DB override is loaded after this connection already exists, so it + // must not rewrite the active datasource URL. + database_url: self.database_url.clone(), + storage: ObjectStorageConfig::from_provider_config(app_config.blob_storage_provider_config())?, + }) + } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Default, Deserialize)] struct AppConfigFile { db: Option, + #[serde(default)] storages: Option>, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Default, Deserialize)] #[serde(rename_all = "camelCase")] struct DbConfigFile { datasource_url: Option, } +impl AppConfigFile { + fn database_url(&self) -> Option { + self + .db + .as_ref() + .and_then(|db| db.datasource_url.clone()) + .and_then(non_empty_string) + } + + fn blob_storage_provider_config(&self) -> Option { + self + .storages + .as_ref() + .and_then(|storages| storages.get("blob.storage").cloned()) + } +} + fn database_url_from_env() -> Option { env::var("DATABASE_URL").ok().and_then(non_empty_string) } @@ -48,8 +80,8 @@ fn non_empty_string(value: String) -> Option { if value.trim().is_empty() { None } else { Some(value) } } -fn database_url_from_config_files() -> Result> { - let mut database_url = None; +fn app_config_from_config_files() -> Result { + let mut merged = AppConfigFile::default(); for path in config_json_paths() { if !path.exists() { continue; @@ -58,30 +90,59 @@ fn database_url_from_config_files() -> Result> { .map_err(|err| napi_error(format!("failed to read config file {}: {err}", path.display())))?; let config: AppConfigFile = serde_json::from_str(&raw) .map_err(|err| napi_error(format!("failed to parse config file {}: {err}", path.display())))?; - if let Some(next) = config.db.and_then(|db| db.datasource_url).and_then(non_empty_string) { - database_url = Some(next); - } + merged.apply_file_config(config); } - Ok(database_url) + Ok(merged) } -pub(super) fn blob_storage_config_from_config_files() -> Result> { - let mut storage = None; - for path in config_json_paths() { - if !path.exists() { - continue; +impl AppConfigFile { + fn apply_file_config(&mut self, config: AppConfigFile) { + if config.db.is_some() { + self.db = config.db; } - let raw = fs::read_to_string(&path) - .map_err(|err| napi_error(format!("failed to read config file {}: {err}", path.display())))?; - let config: AppConfigFile = serde_json::from_str(&raw) - .map_err(|err| napi_error(format!("failed to parse config file {}: {err}", path.display())))?; - if let Some(next) = config.storages.and_then(|mut storages| storages.remove("blob.storage")) { - storage = Some(next); + if let Some(storages) = config.storages + && !storages.is_empty() + { + self.storages.get_or_insert_with(HashMap::new).extend(storages); + } + } +} + +async fn load_app_config_overrides_from_db(pool: &PgPool) -> Result { + let rows = match sqlx::query("SELECT id, value FROM app_configs").fetch_all(pool).await { + Ok(rows) => rows, + Err(sqlx::Error::Database(err)) if err.code().as_deref() == Some("42P01") => return Ok(AppConfigFile::default()), + Err(err) => return Err(napi_error(format!("failed to load app config overrides: {err}"))), + }; + + app_config_from_flat_overrides(rows.into_iter().map(|row| { + let id: String = row.get("id"); + let value: serde_json::Value = row.get("value"); + (id, value) + })) +} + +fn app_config_from_flat_overrides(rows: I) -> Result +where + I: IntoIterator, + S: AsRef, +{ + let mut root = Map::new(); + for (path, value) in rows { + let Some((module, key)) = path.as_ref().split_once('.') else { + continue; + }; + root + .entry(module.to_string()) + .or_insert_with(|| serde_json::Value::Object(Map::new())); + if let Some(serde_json::Value::Object(module_object)) = root.get_mut(module) { + module_object.insert(key.to_string(), value); } } - Ok(storage) + serde_json::from_value(serde_json::Value::Object(root)) + .map_err(|err| napi_error(format!("invalid app config overrides: {err}"))) } pub(super) fn config_json_paths() -> Vec { @@ -142,4 +203,48 @@ mod tests { Some("postgresql://affine:affine@localhost:5432/affine".to_string()) ); } + + #[test] + fn parses_blob_storage_app_config_value() { + let app_config = app_config_from_flat_overrides([ + ( + "unknown.future.config", + serde_json::json!({ + "shape": "ignored" + }), + ), + ( + "storages.blob.storage", + serde_json::json!({ + "provider": "cloudflare-r2", + "bucket": "workspace-blobs-canary", + "config": { + "accountId": "account", + "credentials": { + "accessKeyId": "key", + "secretAccessKey": "secret" + }, + "usePresignedURL": { + "enabled": true + } + } + }), + ), + ]) + .unwrap(); + let storage = app_config.blob_storage_provider_config().unwrap(); + let config = ObjectStorageConfig::from_provider_config(Some(storage)) + .unwrap() + .unwrap(); + + let health = config.health(); + assert!(health.configured); + assert_eq!(health.provider.as_deref(), Some("cloudflare-r2")); + assert_eq!(health.bucket.as_deref(), Some("workspace-blobs-canary")); + assert_eq!( + health.endpoint.as_deref(), + Some("https://account.r2.cloudflarestorage.com") + ); + assert!(health.use_presigned_url); + } } diff --git a/packages/backend/native/src/backend_runtime/mod.rs b/packages/backend/native/src/backend_runtime/mod.rs index e249b7fc73..69c6ab83eb 100644 --- a/packages/backend/native/src/backend_runtime/mod.rs +++ b/packages/backend/native/src/backend_runtime/mod.rs @@ -18,7 +18,7 @@ mod tests; mod types; mod workspace_stats; -use std::time::Duration; +use std::{sync::RwLock, time::Duration}; use napi::Result; use sha2::{Digest, Sha256}; @@ -33,7 +33,7 @@ pub(super) fn token_hash(token: &str) -> String { #[napi_derive::napi] pub struct BackendRuntime { - config: RuntimeConfig, + config: RwLock, pool: Mutex>, } @@ -42,7 +42,7 @@ impl BackendRuntime { #[napi(constructor)] pub fn new() -> Result { Ok(Self { - config: RuntimeConfig::from_config_files()?, + config: RwLock::new(RuntimeConfig::from_config_files()?), pool: Mutex::new(None), }) } @@ -54,10 +54,11 @@ impl BackendRuntime { return Ok(()); } + let database_url = self.config()?.database_url; let pool = PgPoolOptions::new() .max_connections(5) .acquire_timeout(Duration::from_secs(5)) - .connect(&self.config.database_url) + .connect(&database_url) .await .map_err(|err| napi_error(format!("BackendRuntime failed to connect postgres: {err}")))?; @@ -66,6 +67,9 @@ impl BackendRuntime { .await .map_err(|err| napi_error(format!("BackendRuntime postgres health check failed: {err}")))?; + let config = self.config()?.with_db_overrides(&pool).await?; + self.update_config(config)?; + *guard = Some(pool); Ok(()) } @@ -94,7 +98,7 @@ impl BackendRuntime { Ok(BackendRuntimeHealth { started: pool.is_some(), database_connected, - object_storage_configured: self.config.storage.is_some(), + object_storage_configured: self.config()?.storage.is_some(), }) } @@ -113,6 +117,22 @@ impl BackendRuntime { .cloned() .ok_or_else(|| napi_error("BackendRuntime must be started before using postgres operations")) } + + pub(in crate::backend_runtime) fn config(&self) -> Result { + self + .config + .read() + .map(|config| config.clone()) + .map_err(|_| napi_error("BackendRuntime config lock poisoned")) + } + + fn update_config(&self, config: RuntimeConfig) -> Result<()> { + *self + .config + .write() + .map_err(|_| napi_error("BackendRuntime config lock poisoned"))? = config; + Ok(()) + } } async fn migrate_runtime_tables(pool: &PgPool) -> Result<()> { diff --git a/packages/backend/native/src/backend_runtime/object_storage/config.rs b/packages/backend/native/src/backend_runtime/object_storage/config.rs index c29a8af97a..ae65328a82 100644 --- a/packages/backend/native/src/backend_runtime/object_storage/config.rs +++ b/packages/backend/native/src/backend_runtime/object_storage/config.rs @@ -5,9 +5,7 @@ use napi::Result; use serde::Deserialize; use super::{client::ObjectStorageClient, types::StorageProviderConfig}; -use crate::backend_runtime::{ - config::blob_storage_config_from_config_files, error::napi_error, types::RuntimeObjectStorageHealth, -}; +use crate::backend_runtime::{error::napi_error, types::RuntimeObjectStorageHealth}; #[derive(Clone, Debug)] pub(in crate::backend_runtime) struct ObjectStorageConfig { @@ -75,8 +73,10 @@ struct UsePresignedUrlConfigFile { } impl ObjectStorageConfig { - pub(in crate::backend_runtime) fn from_config_files() -> Result> { - let Some(storage) = blob_storage_config_from_config_files()? else { + pub(in crate::backend_runtime) fn from_provider_config( + storage: Option, + ) -> Result> { + let Some(storage) = storage else { return Ok(None); }; @@ -190,7 +190,7 @@ impl ObjectStorageConfig { )) } - pub(super) fn health(&self) -> RuntimeObjectStorageHealth { + pub(in crate::backend_runtime) fn health(&self) -> RuntimeObjectStorageHealth { let client_buildable = self .build_client() .map(|client| client.non_destructive_health()) diff --git a/packages/backend/native/src/backend_runtime/object_storage/mod.rs b/packages/backend/native/src/backend_runtime/object_storage/mod.rs index 2962ff1c90..4918c68efd 100644 --- a/packages/backend/native/src/backend_runtime/object_storage/mod.rs +++ b/packages/backend/native/src/backend_runtime/object_storage/mod.rs @@ -21,12 +21,11 @@ use super::{ #[napi_derive::napi] impl BackendRuntime { fn object_storage_client(&self) -> Result { - self - .config + let storage = self + .config()? .storage - .as_ref() - .ok_or_else(|| super::error::napi_error("ObjectStorageClient is not configured"))? - .build_client() + .ok_or_else(|| super::error::napi_error("ObjectStorageClient is not configured"))?; + storage.build_client() } pub(super) async fn object_storage_delete_object(&self, key: &str) -> Result<()> { @@ -55,7 +54,7 @@ impl BackendRuntime { #[napi] pub fn object_storage_health(&self) -> RuntimeObjectStorageHealth { - match &self.config.storage { + match self.config().ok().and_then(|config| config.storage) { Some(storage) => storage.health(), None => RuntimeObjectStorageHealth { configured: false, diff --git a/packages/backend/native/src/backend_runtime/tests.rs b/packages/backend/native/src/backend_runtime/tests.rs index b29ad9ef8b..a74cb0a185 100644 --- a/packages/backend/native/src/backend_runtime/tests.rs +++ b/packages/backend/native/src/backend_runtime/tests.rs @@ -70,10 +70,10 @@ async fn runtime_from_database_url() -> AnyResult> { .context("cleanup runtime_leases for backend runtime tests")?; Ok(Some(BackendRuntime { - config: RuntimeConfig { + config: std::sync::RwLock::new(RuntimeConfig { database_url, storage: None, - }, + }), pool: Mutex::new(Some(pool)), })) } @@ -130,7 +130,7 @@ async fn runtime_gate_sql_semantics_are_atomic_and_ttl_bound() { let mut tasks = Vec::new(); for _ in 0..16 { let runtime = BackendRuntime { - config: runtime.config.clone(), + config: std::sync::RwLock::new(runtime.config().unwrap()), pool: Mutex::new(Some(runtime.pool().await.unwrap())), }; tasks.push(tokio::spawn(async move { @@ -189,7 +189,7 @@ async fn coordination_lease_sql_semantics_are_fenced_and_ttl_bound() { let mut tasks = Vec::new(); for index in 0..16 { let runtime = BackendRuntime { - config: runtime.config.clone(), + config: std::sync::RwLock::new(runtime.config().unwrap()), pool: Mutex::new(Some(runtime.pool().await.unwrap())), }; tasks.push(tokio::spawn(async move { @@ -393,7 +393,7 @@ async fn verification_token_sql_state_machine_handles_keep_verify_and_cleanup() let mut tasks = Vec::new(); for _ in 0..16 { let runtime = BackendRuntime { - config: runtime.config.clone(), + config: std::sync::RwLock::new(runtime.config().unwrap()), pool: Mutex::new(Some(runtime.pool().await.unwrap())), }; let token = concurrent_token.clone(); diff --git a/packages/backend/native/src/lib.rs b/packages/backend/native/src/lib.rs index ed3cc66bb7..b9c8c9fc2c 100644 --- a/packages/backend/native/src/lib.rs +++ b/packages/backend/native/src/lib.rs @@ -18,7 +18,7 @@ pub mod tiktoken; use affine_common::napi_utils::map_napi_err; use napi::{Result, Status, bindgen_prelude::*}; -use y_octo::Doc; +use y_octo::{Doc, Update}; #[cfg(not(target_arch = "arm"))] #[global_allocator] @@ -41,6 +41,16 @@ pub fn merge_updates_in_apply_way(updates: Vec) -> Result { Ok(buf.into()) } +/// Check whether a Yjs update binary can be decoded without applying it to a +/// document state. +#[napi(catch_unwind)] +pub async fn validate_doc_update(update: Buffer) -> Result { + let update = update.to_vec(); + tokio::task::spawn_blocking(move || Update::decode_v1(update).is_ok()) + .await + .map_err(|err| napi::Error::from_reason(format!("Doc update validation task failed: {err}"))) +} + #[napi] pub const AFFINE_PRO_PUBLIC_KEY: Option<&'static str> = std::option_env!("AFFINE_PRO_PUBLIC_KEY"); @@ -59,4 +69,10 @@ mod tests { }; assert_eq!(err.status, Status::GenericFailure); } + + #[test] + fn y_octo_update_decode_accepts_valid_update_and_rejects_invalid_update() { + assert!(Update::decode_v1(vec![0, 0]).is_ok()); + assert!(Update::decode_v1(vec![0]).is_err()); + } } diff --git a/packages/backend/server/src/core/backend-runtime/__tests__/blob-job.spec.ts b/packages/backend/server/src/core/backend-runtime/__tests__/blob-job.spec.ts new file mode 100644 index 0000000000..740aa947f8 --- /dev/null +++ b/packages/backend/server/src/core/backend-runtime/__tests__/blob-job.spec.ts @@ -0,0 +1,156 @@ +import ava, { TestFn } from 'ava'; +import Sinon from 'sinon'; + +import { BackendRuntimeBlobJob } from '../blob-job'; + +interface Context { + runtime: { + health: Sinon.SinonStub; + backfillMissingBlobMetadata: Sinon.SinonStub; + rebuildWorkspaceDocBlobRefs: Sinon.SinonStub; + planUnreferencedWorkspaceBlobs: Sinon.SinonStub; + executeBlobCleanupCandidates: Sinon.SinonStub; + }; + event: { + emitAsync: Sinon.SinonStub; + }; + queue: { + add: Sinon.SinonStub; + }; + db: { + workspace: { + findMany: Sinon.SinonStub; + }; + }; + job: BackendRuntimeBlobJob; +} + +const test = ava as TestFn; + +test.beforeEach(t => { + t.context.runtime = { + health: Sinon.stub().resolves({ + databaseConnected: true, + objectStorageConfigured: true, + }), + backfillMissingBlobMetadata: Sinon.stub(), + rebuildWorkspaceDocBlobRefs: Sinon.stub(), + planUnreferencedWorkspaceBlobs: Sinon.stub(), + executeBlobCleanupCandidates: Sinon.stub(), + }; + t.context.event = { + emitAsync: Sinon.stub().resolves(undefined), + }; + t.context.queue = { + add: Sinon.stub().resolves(undefined), + }; + t.context.db = { + workspace: { + findMany: Sinon.stub(), + }, + }; + t.context.job = new BackendRuntimeBlobJob( + t.context.runtime as any, + t.context.event as any, + t.context.queue as any, + t.context.db as any + ); +}); + +const objectStorageRequiredCases: { + name: string; + run: (context: Context) => Promise; + untouched: (context: Context) => Sinon.SinonStub[]; +}[] = [ + { + name: 'blob metadata backfill sweep', + run: context => context.job.backfillMissingBlobMetadataBySid({}), + untouched: context => [ + context.db.workspace.findMany, + context.runtime.backfillMissingBlobMetadata, + context.queue.add, + ], + }, + { + name: 'blob cleanup execution', + run: context => + context.job.executeBlobCleanupCandidates({ runId: 'run-1' }), + untouched: context => [ + context.runtime.executeBlobCleanupCandidates, + context.event.emitAsync, + ], + }, + { + name: 'blob cleanup planning sweep', + run: context => context.job.planUnreferencedWorkspaceBlobsBySid({}), + untouched: context => [ + context.db.workspace.findMany, + context.runtime.planUnreferencedWorkspaceBlobs, + context.queue.add, + ], + }, + { + name: 'blob cleanup planning', + run: context => + context.job.planUnreferencedWorkspaceBlobs({ + workspaceId: 'workspace-1', + }), + untouched: context => [context.runtime.planUnreferencedWorkspaceBlobs], + }, +]; + +for (const scenario of objectStorageRequiredCases) { + test(`${scenario.name} skips when object storage is not configured`, async t => { + t.context.runtime.health.resolves({ + databaseConnected: true, + objectStorageConfigured: false, + }); + + await scenario.run(t.context); + + t.true(t.context.runtime.health.calledOnce); + for (const stub of scenario.untouched(t.context)) { + t.false(stub.called); + } + }); +} + +test('doc blob refs sweep continues after one workspace fails', async t => { + t.context.db.workspace.findMany.resolves([ + { id: 'workspace-1', sid: 1 }, + { id: 'workspace-2', sid: 2 }, + ]); + t.context.runtime.rebuildWorkspaceDocBlobRefs + .onFirstCall() + .rejects(new Error('bad root doc')) + .onSecondCall() + .resolves({ + scannedDocs: 1, + parsedDocs: 1, + refsWritten: 0, + refsDeleted: 0, + failedDocs: 0, + nextCursor: null, + }); + + await t.context.job.rebuildWorkspaceDocBlobRefsBySid({ + workspaceLimit: 2, + docLimit: 100, + }); + + t.is(t.context.runtime.rebuildWorkspaceDocBlobRefs.callCount, 2); + t.deepEqual(t.context.runtime.rebuildWorkspaceDocBlobRefs.firstCall.args, [ + 'workspace-1', + 100, + ]); + t.deepEqual(t.context.runtime.rebuildWorkspaceDocBlobRefs.secondCall.args, [ + 'workspace-2', + 100, + ]); + t.true( + t.context.queue.add.calledWith( + 'backendRuntime.rebuildWorkspaceDocBlobRefsBySid', + { lastSid: 2, workspaceLimit: 2, docLimit: 100 } + ) + ); +}); diff --git a/packages/backend/server/src/core/backend-runtime/blob-job.ts b/packages/backend/server/src/core/backend-runtime/blob-job.ts index d1e0a0918e..b097e715de 100644 --- a/packages/backend/server/src/core/backend-runtime/blob-job.ts +++ b/packages/backend/server/src/core/backend-runtime/blob-job.ts @@ -99,6 +99,10 @@ export class BackendRuntimeBlobJob { workspaceLimit = 100, objectLimit = 1000, }: Jobs['backendRuntime.backfillMissingBlobMetadataBySid']) { + if (!(await this.hasObjectStorage('blob metadata backfill sweep'))) { + return; + } + const workspaces = await this.db.workspace.findMany({ where: { sid: { gt: lastSid } }, orderBy: { sid: 'asc' }, @@ -107,9 +111,16 @@ export class BackendRuntimeBlobJob { }); for (const workspace of workspaces) { - await this.drainBlobMetadataBackfill(workspace.id, objectLimit, { - sid: workspace.sid, - }); + try { + await this.drainBlobMetadataBackfill(workspace.id, objectLimit, { + sid: workspace.sid, + }); + } catch (err) { + this.logger.error( + `blob metadata backfill failed workspace=${workspace.id} sid=${workspace.sid}`, + err + ); + } } const nextSid = workspaces.at(-1)?.sid; @@ -192,6 +203,10 @@ export class BackendRuntimeBlobJob { workspaceId, limit = 1000, }: Jobs['backendRuntime.backfillMissingBlobMetadata']) { + if (!(await this.hasObjectStorage('blob metadata backfill'))) { + return; + } + await this.drainBlobMetadataBackfill(workspaceId, limit); } @@ -226,9 +241,16 @@ export class BackendRuntimeBlobJob { }); for (const workspace of workspaces) { - await this.drainWorkspaceDocBlobRefs(workspace.id, docLimit, { - sid: workspace.sid, - }); + try { + await this.drainWorkspaceDocBlobRefs(workspace.id, docLimit, { + sid: workspace.sid, + }); + } catch (err) { + this.logger.error( + `doc blob refs rebuild failed workspace=${workspace.id} sid=${workspace.sid}`, + err + ); + } } const nextSid = workspaces.at(-1)?.sid; @@ -247,6 +269,10 @@ export class BackendRuntimeBlobJob { gracePeriodDays = 30, limit = 1000, }: Jobs['backendRuntime.planUnreferencedWorkspaceBlobs']) { + if (!(await this.hasObjectStorage('blob cleanup planning'))) { + return; + } + const result = await this.rt.planUnreferencedWorkspaceBlobs( workspaceId, gracePeriodDays, @@ -264,6 +290,10 @@ export class BackendRuntimeBlobJob { gracePeriodDays = 30, limit = 1000, }: Jobs['backendRuntime.planUnreferencedWorkspaceBlobsBySid']) { + if (!(await this.hasObjectStorage('blob cleanup planning sweep'))) { + return; + } + const workspaces = await this.db.workspace.findMany({ where: { sid: { @@ -281,14 +311,21 @@ export class BackendRuntimeBlobJob { }); for (const workspace of workspaces) { - const result = await this.rt.planUnreferencedWorkspaceBlobs( - workspace.id, - gracePeriodDays, - limit - ); - this.logger.log( - `planned blob cleanup workspace=${workspace.id} sid=${workspace.sid} run=${result.runId} candidates=${result.candidatesMarked} scanned=${result.scannedBlobs}` - ); + try { + const result = await this.rt.planUnreferencedWorkspaceBlobs( + workspace.id, + gracePeriodDays, + limit + ); + this.logger.log( + `planned blob cleanup workspace=${workspace.id} sid=${workspace.sid} run=${result.runId} candidates=${result.candidatesMarked} scanned=${result.scannedBlobs}` + ); + } catch (err) { + this.logger.error( + `blob cleanup planning failed workspace=${workspace.id} sid=${workspace.sid}`, + err + ); + } } const nextSid = workspaces.at(-1)?.sid; @@ -308,6 +345,10 @@ export class BackendRuntimeBlobJob { gracePeriodDays = 30, limit = 1000, }: Jobs['backendRuntime.executeBlobCleanupCandidates']) { + if (!(await this.hasObjectStorage('blob cleanup execution'))) { + return; + } + const result = await this.rt.executeBlobCleanupCandidates( runId, gracePeriodDays, @@ -365,4 +406,16 @@ export class BackendRuntimeBlobJob { } } } + + private async hasObjectStorage(operation: string) { + const health = await this.rt.health(); + if (health.objectStorageConfigured) { + return true; + } + + this.logger.warn( + `skip ${operation}: BackendRuntime object storage is not configured` + ); + return false; + } } diff --git a/packages/backend/server/src/core/doc/adapters/userspace.ts b/packages/backend/server/src/core/doc/adapters/userspace.ts index 6853d05f93..52f33d13dc 100644 --- a/packages/backend/server/src/core/doc/adapters/userspace.ts +++ b/packages/backend/server/src/core/doc/adapters/userspace.ts @@ -55,6 +55,9 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter { return 0; } + updates = await this.filterValidDocUpdates(userId, docId, updates); + if (!updates.length) return 0; + await using _lock = await this.lockDocForUpdate(userId, docId); const snapshot = await this.getDocSnapshot(userId, docId); const now = Date.now(); diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts index ae00fb5986..c5c576b8a7 100644 --- a/packages/backend/server/src/core/doc/adapters/workspace.ts +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -61,6 +61,9 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { return 0; } + updates = await this.filterValidDocUpdates(workspaceId, docId, updates); + if (!updates.length) return 0; + const isNewDoc = !(await this.models.doc.exists(workspaceId, docId)); let pendings = updates; diff --git a/packages/backend/server/src/core/doc/storage/doc.ts b/packages/backend/server/src/core/doc/storage/doc.ts index fc204d96de..67b42bce6d 100644 --- a/packages/backend/server/src/core/doc/storage/doc.ts +++ b/packages/backend/server/src/core/doc/storage/doc.ts @@ -11,11 +11,15 @@ import { UndoManager, } from 'yjs'; -import { CallMetric } from '../../../base'; +import { CallMetric, metrics } from '../../../base'; +import { validateDocUpdate } from '../../../native'; import { applyUpdatesWithNative, mergeUpdatesWithYjs } from '../merge-updates'; import { Connection } from './connection'; import { SingletonLocker } from './lock'; +const DOC_UPDATE_VALIDATE_TIMEOUT_MS = 1000; +const DOC_UPDATE_VALIDATE_MAX_BYTES = 32 * 1024 * 1024; + async function nativeApplyUpdates(updates: Uint8Array[]): Promise { return applyUpdatesWithNative(updates, 'doc.storage.squash.native'); } @@ -81,6 +85,47 @@ export abstract class DocStorageAdapter extends Connection { ); } + protected async filterValidDocUpdates( + spaceId: string, + docId: string, + updates: Uint8Array[] + ) { + const valid: Uint8Array[] = []; + for (const update of updates) { + const reason = await this.invalidDocUpdateReason(update); + if (reason) { + metrics.doc.counter('doc_update_rejected').add(1, { reason }); + this.logger.warn( + `Dropped invalid doc update, spaceId: ${spaceId}, docId: ${docId}, reason: ${reason}, size: ${update.length}` + ); + continue; + } + valid.push(update); + } + return valid; + } + + private async invalidDocUpdateReason(update: Uint8Array) { + if (update.length === 2 && update[0] === 0 && update[1] === 0) { + return null; + } + if (update.length > DOC_UPDATE_VALIDATE_MAX_BYTES) { + return 'oversized'; + } + + try { + return (await validateDocUpdate(Buffer.from(update), { + timeoutMs: DOC_UPDATE_VALIDATE_TIMEOUT_MS, + })) + ? null + : 'invalid'; + } catch (err) { + this.logger.warn('Doc update validation failed', err); + metrics.doc.counter('doc_update_validation_failed').add(1); + return null; + } + } + async getDoc(spaceId: string, docId: string): Promise { await using _lock = await this.lockDocForUpdate(spaceId, docId); diff --git a/packages/backend/server/src/native.ts b/packages/backend/server/src/native.ts index fcaf6635db..9dcdba821d 100644 --- a/packages/backend/server/src/native.ts +++ b/packages/backend/server/src/native.ts @@ -162,6 +162,57 @@ import type { export const mergeUpdatesInApplyWay = serverNativeModule.mergeUpdatesInApplyWay; +export async function validateDocUpdate( + update: Buffer, + options: { signal?: AbortSignal; timeoutMs?: number } = {} +): Promise { + const signals = []; + if (options.signal) { + signals.push(options.signal); + } + if (options.timeoutMs !== undefined) { + signals.push(AbortSignal.timeout(options.timeoutMs)); + } + const signal = + signals.length === 0 + ? undefined + : signals.length === 1 + ? signals[0] + : AbortSignal.any(signals); + + if (signal?.aborted) { + throw signal.reason; + } + + return await new Promise((resolve, reject) => { + let settled = false; + const settle = (callback: () => void) => { + if (settled) return; + settled = true; + callback(); + }; + const onAbort = () => { + settle(() => + reject( + signal?.reason instanceof Error + ? signal.reason + : new Error('Doc update validation aborted') + ) + ); + }; + signal?.addEventListener('abort', onAbort, { once: true }); + serverNativeModule + .validateDocUpdate(update) + .then( + result => settle(() => resolve(result)), + error => settle(() => reject(error)) + ) + .finally(() => { + signal?.removeEventListener('abort', onAbort); + }); + }); +} + export const verifyChallengeResponse = async ( response: any, bits: number,