fix(server): config & update handle (#15173)

#### PR Dependency Tree


* **PR #15173** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Added native document update validation to check incoming Yjs updates
for decodability before applying them.
* Introduced support for validation timeouts and cancellation during
update checks.
* Blob maintenance jobs now detect when object storage is unavailable
and skip related work gracefully.

* **Bug Fixes**
* Invalid (and oversized) updates are now filtered out earlier during
document ingestion.
* Background blob maintenance continues processing other work even if
one workspace fails.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-06-29 22:59:17 +08:00
committed by GitHub
parent 1b9e21f2de
commit a1363b3873
13 changed files with 517 additions and 60 deletions
+6
View File
@@ -514,6 +514,12 @@ export declare function llmValidateJsonSchema(schema: any, value: any): any
*/
export declare function mergeUpdatesInApplyWay(updates: Array<Buffer>): Buffer
/**
* Check whether a Yjs update binary can be decoded without applying it to a
* document state.
*/
export declare function validateDocUpdate(update: Buffer): Promise<boolean>
export declare function mintChallengeResponse(resource: string, bits?: number | undefined | null): Promise<string>
export interface ModelConditionsContract {
@@ -6,6 +6,8 @@ use std::{
use napi::Result;
use serde::Deserialize;
use serde_json::Map;
use sqlx::{PgPool, Row};
use super::{
error::napi_error,
@@ -20,26 +22,56 @@ pub(super) struct RuntimeConfig {
impl RuntimeConfig {
pub(super) fn from_config_files() -> Result<Self> {
let app_config = app_config_from_config_files()?;
let database_url = database_url_from_env()
.or(database_url_from_config_files()?)
.or(app_config.database_url())
.unwrap_or_else(|| "postgresql://localhost:5432/affine".to_string());
let storage = ObjectStorageConfig::from_config_files()?;
let storage = ObjectStorageConfig::from_provider_config(app_config.blob_storage_provider_config())?;
Ok(Self { database_url, storage })
}
pub(super) async fn with_db_overrides(&self, pool: &PgPool) -> Result<Self> {
let mut app_config = app_config_from_config_files()?;
app_config.apply_file_config(load_app_config_overrides_from_db(pool).await?);
Ok(Self {
// The DB override is loaded after this connection already exists, so it
// must not rewrite the active datasource URL.
database_url: self.database_url.clone(),
storage: ObjectStorageConfig::from_provider_config(app_config.blob_storage_provider_config())?,
})
}
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Default, Deserialize)]
struct AppConfigFile {
db: Option<DbConfigFile>,
#[serde(default)]
storages: Option<HashMap<String, StorageProviderConfig>>,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
struct DbConfigFile {
datasource_url: Option<String>,
}
impl AppConfigFile {
fn database_url(&self) -> Option<String> {
self
.db
.as_ref()
.and_then(|db| db.datasource_url.clone())
.and_then(non_empty_string)
}
fn blob_storage_provider_config(&self) -> Option<StorageProviderConfig> {
self
.storages
.as_ref()
.and_then(|storages| storages.get("blob.storage").cloned())
}
}
fn database_url_from_env() -> Option<String> {
env::var("DATABASE_URL").ok().and_then(non_empty_string)
}
@@ -48,8 +80,8 @@ fn non_empty_string(value: String) -> Option<String> {
if value.trim().is_empty() { None } else { Some(value) }
}
fn database_url_from_config_files() -> Result<Option<String>> {
let mut database_url = None;
fn app_config_from_config_files() -> Result<AppConfigFile> {
let mut merged = AppConfigFile::default();
for path in config_json_paths() {
if !path.exists() {
continue;
@@ -58,30 +90,59 @@ fn database_url_from_config_files() -> Result<Option<String>> {
.map_err(|err| napi_error(format!("failed to read config file {}: {err}", path.display())))?;
let config: AppConfigFile = serde_json::from_str(&raw)
.map_err(|err| napi_error(format!("failed to parse config file {}: {err}", path.display())))?;
if let Some(next) = config.db.and_then(|db| db.datasource_url).and_then(non_empty_string) {
database_url = Some(next);
}
merged.apply_file_config(config);
}
Ok(database_url)
Ok(merged)
}
pub(super) fn blob_storage_config_from_config_files() -> Result<Option<StorageProviderConfig>> {
let mut storage = None;
for path in config_json_paths() {
if !path.exists() {
continue;
impl AppConfigFile {
fn apply_file_config(&mut self, config: AppConfigFile) {
if config.db.is_some() {
self.db = config.db;
}
let raw = fs::read_to_string(&path)
.map_err(|err| napi_error(format!("failed to read config file {}: {err}", path.display())))?;
let config: AppConfigFile = serde_json::from_str(&raw)
.map_err(|err| napi_error(format!("failed to parse config file {}: {err}", path.display())))?;
if let Some(next) = config.storages.and_then(|mut storages| storages.remove("blob.storage")) {
storage = Some(next);
if let Some(storages) = config.storages
&& !storages.is_empty()
{
self.storages.get_or_insert_with(HashMap::new).extend(storages);
}
}
}
async fn load_app_config_overrides_from_db(pool: &PgPool) -> Result<AppConfigFile> {
let rows = match sqlx::query("SELECT id, value FROM app_configs").fetch_all(pool).await {
Ok(rows) => rows,
Err(sqlx::Error::Database(err)) if err.code().as_deref() == Some("42P01") => return Ok(AppConfigFile::default()),
Err(err) => return Err(napi_error(format!("failed to load app config overrides: {err}"))),
};
app_config_from_flat_overrides(rows.into_iter().map(|row| {
let id: String = row.get("id");
let value: serde_json::Value = row.get("value");
(id, value)
}))
}
fn app_config_from_flat_overrides<I, S>(rows: I) -> Result<AppConfigFile>
where
I: IntoIterator<Item = (S, serde_json::Value)>,
S: AsRef<str>,
{
let mut root = Map::new();
for (path, value) in rows {
let Some((module, key)) = path.as_ref().split_once('.') else {
continue;
};
root
.entry(module.to_string())
.or_insert_with(|| serde_json::Value::Object(Map::new()));
if let Some(serde_json::Value::Object(module_object)) = root.get_mut(module) {
module_object.insert(key.to_string(), value);
}
}
Ok(storage)
serde_json::from_value(serde_json::Value::Object(root))
.map_err(|err| napi_error(format!("invalid app config overrides: {err}")))
}
pub(super) fn config_json_paths() -> Vec<PathBuf> {
@@ -142,4 +203,48 @@ mod tests {
Some("postgresql://affine:affine@localhost:5432/affine".to_string())
);
}
#[test]
fn parses_blob_storage_app_config_value() {
let app_config = app_config_from_flat_overrides([
(
"unknown.future.config",
serde_json::json!({
"shape": "ignored"
}),
),
(
"storages.blob.storage",
serde_json::json!({
"provider": "cloudflare-r2",
"bucket": "workspace-blobs-canary",
"config": {
"accountId": "account",
"credentials": {
"accessKeyId": "key",
"secretAccessKey": "secret"
},
"usePresignedURL": {
"enabled": true
}
}
}),
),
])
.unwrap();
let storage = app_config.blob_storage_provider_config().unwrap();
let config = ObjectStorageConfig::from_provider_config(Some(storage))
.unwrap()
.unwrap();
let health = config.health();
assert!(health.configured);
assert_eq!(health.provider.as_deref(), Some("cloudflare-r2"));
assert_eq!(health.bucket.as_deref(), Some("workspace-blobs-canary"));
assert_eq!(
health.endpoint.as_deref(),
Some("https://account.r2.cloudflarestorage.com")
);
assert!(health.use_presigned_url);
}
}
@@ -18,7 +18,7 @@ mod tests;
mod types;
mod workspace_stats;
use std::time::Duration;
use std::{sync::RwLock, time::Duration};
use napi::Result;
use sha2::{Digest, Sha256};
@@ -33,7 +33,7 @@ pub(super) fn token_hash(token: &str) -> String {
#[napi_derive::napi]
pub struct BackendRuntime {
config: RuntimeConfig,
config: RwLock<RuntimeConfig>,
pool: Mutex<Option<PgPool>>,
}
@@ -42,7 +42,7 @@ impl BackendRuntime {
#[napi(constructor)]
pub fn new() -> Result<Self> {
Ok(Self {
config: RuntimeConfig::from_config_files()?,
config: RwLock::new(RuntimeConfig::from_config_files()?),
pool: Mutex::new(None),
})
}
@@ -54,10 +54,11 @@ impl BackendRuntime {
return Ok(());
}
let database_url = self.config()?.database_url;
let pool = PgPoolOptions::new()
.max_connections(5)
.acquire_timeout(Duration::from_secs(5))
.connect(&self.config.database_url)
.connect(&database_url)
.await
.map_err(|err| napi_error(format!("BackendRuntime failed to connect postgres: {err}")))?;
@@ -66,6 +67,9 @@ impl BackendRuntime {
.await
.map_err(|err| napi_error(format!("BackendRuntime postgres health check failed: {err}")))?;
let config = self.config()?.with_db_overrides(&pool).await?;
self.update_config(config)?;
*guard = Some(pool);
Ok(())
}
@@ -94,7 +98,7 @@ impl BackendRuntime {
Ok(BackendRuntimeHealth {
started: pool.is_some(),
database_connected,
object_storage_configured: self.config.storage.is_some(),
object_storage_configured: self.config()?.storage.is_some(),
})
}
@@ -113,6 +117,22 @@ impl BackendRuntime {
.cloned()
.ok_or_else(|| napi_error("BackendRuntime must be started before using postgres operations"))
}
pub(in crate::backend_runtime) fn config(&self) -> Result<RuntimeConfig> {
self
.config
.read()
.map(|config| config.clone())
.map_err(|_| napi_error("BackendRuntime config lock poisoned"))
}
fn update_config(&self, config: RuntimeConfig) -> Result<()> {
*self
.config
.write()
.map_err(|_| napi_error("BackendRuntime config lock poisoned"))? = config;
Ok(())
}
}
async fn migrate_runtime_tables(pool: &PgPool) -> Result<()> {
@@ -5,9 +5,7 @@ use napi::Result;
use serde::Deserialize;
use super::{client::ObjectStorageClient, types::StorageProviderConfig};
use crate::backend_runtime::{
config::blob_storage_config_from_config_files, error::napi_error, types::RuntimeObjectStorageHealth,
};
use crate::backend_runtime::{error::napi_error, types::RuntimeObjectStorageHealth};
#[derive(Clone, Debug)]
pub(in crate::backend_runtime) struct ObjectStorageConfig {
@@ -75,8 +73,10 @@ struct UsePresignedUrlConfigFile {
}
impl ObjectStorageConfig {
pub(in crate::backend_runtime) fn from_config_files() -> Result<Option<Self>> {
let Some(storage) = blob_storage_config_from_config_files()? else {
pub(in crate::backend_runtime) fn from_provider_config(
storage: Option<StorageProviderConfig>,
) -> Result<Option<Self>> {
let Some(storage) = storage else {
return Ok(None);
};
@@ -190,7 +190,7 @@ impl ObjectStorageConfig {
))
}
pub(super) fn health(&self) -> RuntimeObjectStorageHealth {
pub(in crate::backend_runtime) fn health(&self) -> RuntimeObjectStorageHealth {
let client_buildable = self
.build_client()
.map(|client| client.non_destructive_health())
@@ -21,12 +21,11 @@ use super::{
#[napi_derive::napi]
impl BackendRuntime {
fn object_storage_client(&self) -> Result<ObjectStorageClient> {
self
.config
let storage = self
.config()?
.storage
.as_ref()
.ok_or_else(|| super::error::napi_error("ObjectStorageClient is not configured"))?
.build_client()
.ok_or_else(|| super::error::napi_error("ObjectStorageClient is not configured"))?;
storage.build_client()
}
pub(super) async fn object_storage_delete_object(&self, key: &str) -> Result<()> {
@@ -55,7 +54,7 @@ impl BackendRuntime {
#[napi]
pub fn object_storage_health(&self) -> RuntimeObjectStorageHealth {
match &self.config.storage {
match self.config().ok().and_then(|config| config.storage) {
Some(storage) => storage.health(),
None => RuntimeObjectStorageHealth {
configured: false,
@@ -70,10 +70,10 @@ async fn runtime_from_database_url() -> AnyResult<Option<BackendRuntime>> {
.context("cleanup runtime_leases for backend runtime tests")?;
Ok(Some(BackendRuntime {
config: RuntimeConfig {
config: std::sync::RwLock::new(RuntimeConfig {
database_url,
storage: None,
},
}),
pool: Mutex::new(Some(pool)),
}))
}
@@ -130,7 +130,7 @@ async fn runtime_gate_sql_semantics_are_atomic_and_ttl_bound() {
let mut tasks = Vec::new();
for _ in 0..16 {
let runtime = BackendRuntime {
config: runtime.config.clone(),
config: std::sync::RwLock::new(runtime.config().unwrap()),
pool: Mutex::new(Some(runtime.pool().await.unwrap())),
};
tasks.push(tokio::spawn(async move {
@@ -189,7 +189,7 @@ async fn coordination_lease_sql_semantics_are_fenced_and_ttl_bound() {
let mut tasks = Vec::new();
for index in 0..16 {
let runtime = BackendRuntime {
config: runtime.config.clone(),
config: std::sync::RwLock::new(runtime.config().unwrap()),
pool: Mutex::new(Some(runtime.pool().await.unwrap())),
};
tasks.push(tokio::spawn(async move {
@@ -393,7 +393,7 @@ async fn verification_token_sql_state_machine_handles_keep_verify_and_cleanup()
let mut tasks = Vec::new();
for _ in 0..16 {
let runtime = BackendRuntime {
config: runtime.config.clone(),
config: std::sync::RwLock::new(runtime.config().unwrap()),
pool: Mutex::new(Some(runtime.pool().await.unwrap())),
};
let token = concurrent_token.clone();
+17 -1
View File
@@ -18,7 +18,7 @@ pub mod tiktoken;
use affine_common::napi_utils::map_napi_err;
use napi::{Result, Status, bindgen_prelude::*};
use y_octo::Doc;
use y_octo::{Doc, Update};
#[cfg(not(target_arch = "arm"))]
#[global_allocator]
@@ -41,6 +41,16 @@ pub fn merge_updates_in_apply_way(updates: Vec<Buffer>) -> Result<Buffer> {
Ok(buf.into())
}
/// Check whether a Yjs update binary can be decoded without applying it to a
/// document state.
#[napi(catch_unwind)]
pub async fn validate_doc_update(update: Buffer) -> Result<bool> {
let update = update.to_vec();
tokio::task::spawn_blocking(move || Update::decode_v1(update).is_ok())
.await
.map_err(|err| napi::Error::from_reason(format!("Doc update validation task failed: {err}")))
}
#[napi]
pub const AFFINE_PRO_PUBLIC_KEY: Option<&'static str> = std::option_env!("AFFINE_PRO_PUBLIC_KEY");
@@ -59,4 +69,10 @@ mod tests {
};
assert_eq!(err.status, Status::GenericFailure);
}
#[test]
fn y_octo_update_decode_accepts_valid_update_and_rejects_invalid_update() {
assert!(Update::decode_v1(vec![0, 0]).is_ok());
assert!(Update::decode_v1(vec![0]).is_err());
}
}
@@ -0,0 +1,156 @@
import ava, { TestFn } from 'ava';
import Sinon from 'sinon';
import { BackendRuntimeBlobJob } from '../blob-job';
interface Context {
runtime: {
health: Sinon.SinonStub;
backfillMissingBlobMetadata: Sinon.SinonStub;
rebuildWorkspaceDocBlobRefs: Sinon.SinonStub;
planUnreferencedWorkspaceBlobs: Sinon.SinonStub;
executeBlobCleanupCandidates: Sinon.SinonStub;
};
event: {
emitAsync: Sinon.SinonStub;
};
queue: {
add: Sinon.SinonStub;
};
db: {
workspace: {
findMany: Sinon.SinonStub;
};
};
job: BackendRuntimeBlobJob;
}
const test = ava as TestFn<Context>;
test.beforeEach(t => {
t.context.runtime = {
health: Sinon.stub().resolves({
databaseConnected: true,
objectStorageConfigured: true,
}),
backfillMissingBlobMetadata: Sinon.stub(),
rebuildWorkspaceDocBlobRefs: Sinon.stub(),
planUnreferencedWorkspaceBlobs: Sinon.stub(),
executeBlobCleanupCandidates: Sinon.stub(),
};
t.context.event = {
emitAsync: Sinon.stub().resolves(undefined),
};
t.context.queue = {
add: Sinon.stub().resolves(undefined),
};
t.context.db = {
workspace: {
findMany: Sinon.stub(),
},
};
t.context.job = new BackendRuntimeBlobJob(
t.context.runtime as any,
t.context.event as any,
t.context.queue as any,
t.context.db as any
);
});
const objectStorageRequiredCases: {
name: string;
run: (context: Context) => Promise<unknown>;
untouched: (context: Context) => Sinon.SinonStub[];
}[] = [
{
name: 'blob metadata backfill sweep',
run: context => context.job.backfillMissingBlobMetadataBySid({}),
untouched: context => [
context.db.workspace.findMany,
context.runtime.backfillMissingBlobMetadata,
context.queue.add,
],
},
{
name: 'blob cleanup execution',
run: context =>
context.job.executeBlobCleanupCandidates({ runId: 'run-1' }),
untouched: context => [
context.runtime.executeBlobCleanupCandidates,
context.event.emitAsync,
],
},
{
name: 'blob cleanup planning sweep',
run: context => context.job.planUnreferencedWorkspaceBlobsBySid({}),
untouched: context => [
context.db.workspace.findMany,
context.runtime.planUnreferencedWorkspaceBlobs,
context.queue.add,
],
},
{
name: 'blob cleanup planning',
run: context =>
context.job.planUnreferencedWorkspaceBlobs({
workspaceId: 'workspace-1',
}),
untouched: context => [context.runtime.planUnreferencedWorkspaceBlobs],
},
];
for (const scenario of objectStorageRequiredCases) {
test(`${scenario.name} skips when object storage is not configured`, async t => {
t.context.runtime.health.resolves({
databaseConnected: true,
objectStorageConfigured: false,
});
await scenario.run(t.context);
t.true(t.context.runtime.health.calledOnce);
for (const stub of scenario.untouched(t.context)) {
t.false(stub.called);
}
});
}
test('doc blob refs sweep continues after one workspace fails', async t => {
t.context.db.workspace.findMany.resolves([
{ id: 'workspace-1', sid: 1 },
{ id: 'workspace-2', sid: 2 },
]);
t.context.runtime.rebuildWorkspaceDocBlobRefs
.onFirstCall()
.rejects(new Error('bad root doc'))
.onSecondCall()
.resolves({
scannedDocs: 1,
parsedDocs: 1,
refsWritten: 0,
refsDeleted: 0,
failedDocs: 0,
nextCursor: null,
});
await t.context.job.rebuildWorkspaceDocBlobRefsBySid({
workspaceLimit: 2,
docLimit: 100,
});
t.is(t.context.runtime.rebuildWorkspaceDocBlobRefs.callCount, 2);
t.deepEqual(t.context.runtime.rebuildWorkspaceDocBlobRefs.firstCall.args, [
'workspace-1',
100,
]);
t.deepEqual(t.context.runtime.rebuildWorkspaceDocBlobRefs.secondCall.args, [
'workspace-2',
100,
]);
t.true(
t.context.queue.add.calledWith(
'backendRuntime.rebuildWorkspaceDocBlobRefsBySid',
{ lastSid: 2, workspaceLimit: 2, docLimit: 100 }
)
);
});
@@ -99,6 +99,10 @@ export class BackendRuntimeBlobJob {
workspaceLimit = 100,
objectLimit = 1000,
}: Jobs['backendRuntime.backfillMissingBlobMetadataBySid']) {
if (!(await this.hasObjectStorage('blob metadata backfill sweep'))) {
return;
}
const workspaces = await this.db.workspace.findMany({
where: { sid: { gt: lastSid } },
orderBy: { sid: 'asc' },
@@ -107,9 +111,16 @@ export class BackendRuntimeBlobJob {
});
for (const workspace of workspaces) {
await this.drainBlobMetadataBackfill(workspace.id, objectLimit, {
sid: workspace.sid,
});
try {
await this.drainBlobMetadataBackfill(workspace.id, objectLimit, {
sid: workspace.sid,
});
} catch (err) {
this.logger.error(
`blob metadata backfill failed workspace=${workspace.id} sid=${workspace.sid}`,
err
);
}
}
const nextSid = workspaces.at(-1)?.sid;
@@ -192,6 +203,10 @@ export class BackendRuntimeBlobJob {
workspaceId,
limit = 1000,
}: Jobs['backendRuntime.backfillMissingBlobMetadata']) {
if (!(await this.hasObjectStorage('blob metadata backfill'))) {
return;
}
await this.drainBlobMetadataBackfill(workspaceId, limit);
}
@@ -226,9 +241,16 @@ export class BackendRuntimeBlobJob {
});
for (const workspace of workspaces) {
await this.drainWorkspaceDocBlobRefs(workspace.id, docLimit, {
sid: workspace.sid,
});
try {
await this.drainWorkspaceDocBlobRefs(workspace.id, docLimit, {
sid: workspace.sid,
});
} catch (err) {
this.logger.error(
`doc blob refs rebuild failed workspace=${workspace.id} sid=${workspace.sid}`,
err
);
}
}
const nextSid = workspaces.at(-1)?.sid;
@@ -247,6 +269,10 @@ export class BackendRuntimeBlobJob {
gracePeriodDays = 30,
limit = 1000,
}: Jobs['backendRuntime.planUnreferencedWorkspaceBlobs']) {
if (!(await this.hasObjectStorage('blob cleanup planning'))) {
return;
}
const result = await this.rt.planUnreferencedWorkspaceBlobs(
workspaceId,
gracePeriodDays,
@@ -264,6 +290,10 @@ export class BackendRuntimeBlobJob {
gracePeriodDays = 30,
limit = 1000,
}: Jobs['backendRuntime.planUnreferencedWorkspaceBlobsBySid']) {
if (!(await this.hasObjectStorage('blob cleanup planning sweep'))) {
return;
}
const workspaces = await this.db.workspace.findMany({
where: {
sid: {
@@ -281,14 +311,21 @@ export class BackendRuntimeBlobJob {
});
for (const workspace of workspaces) {
const result = await this.rt.planUnreferencedWorkspaceBlobs(
workspace.id,
gracePeriodDays,
limit
);
this.logger.log(
`planned blob cleanup workspace=${workspace.id} sid=${workspace.sid} run=${result.runId} candidates=${result.candidatesMarked} scanned=${result.scannedBlobs}`
);
try {
const result = await this.rt.planUnreferencedWorkspaceBlobs(
workspace.id,
gracePeriodDays,
limit
);
this.logger.log(
`planned blob cleanup workspace=${workspace.id} sid=${workspace.sid} run=${result.runId} candidates=${result.candidatesMarked} scanned=${result.scannedBlobs}`
);
} catch (err) {
this.logger.error(
`blob cleanup planning failed workspace=${workspace.id} sid=${workspace.sid}`,
err
);
}
}
const nextSid = workspaces.at(-1)?.sid;
@@ -308,6 +345,10 @@ export class BackendRuntimeBlobJob {
gracePeriodDays = 30,
limit = 1000,
}: Jobs['backendRuntime.executeBlobCleanupCandidates']) {
if (!(await this.hasObjectStorage('blob cleanup execution'))) {
return;
}
const result = await this.rt.executeBlobCleanupCandidates(
runId,
gracePeriodDays,
@@ -365,4 +406,16 @@ export class BackendRuntimeBlobJob {
}
}
}
private async hasObjectStorage(operation: string) {
const health = await this.rt.health();
if (health.objectStorageConfigured) {
return true;
}
this.logger.warn(
`skip ${operation}: BackendRuntime object storage is not configured`
);
return false;
}
}
@@ -55,6 +55,9 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
return 0;
}
updates = await this.filterValidDocUpdates(userId, docId, updates);
if (!updates.length) return 0;
await using _lock = await this.lockDocForUpdate(userId, docId);
const snapshot = await this.getDocSnapshot(userId, docId);
const now = Date.now();
@@ -61,6 +61,9 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
return 0;
}
updates = await this.filterValidDocUpdates(workspaceId, docId, updates);
if (!updates.length) return 0;
const isNewDoc = !(await this.models.doc.exists(workspaceId, docId));
let pendings = updates;
@@ -11,11 +11,15 @@ import {
UndoManager,
} from 'yjs';
import { CallMetric } from '../../../base';
import { CallMetric, metrics } from '../../../base';
import { validateDocUpdate } from '../../../native';
import { applyUpdatesWithNative, mergeUpdatesWithYjs } from '../merge-updates';
import { Connection } from './connection';
import { SingletonLocker } from './lock';
const DOC_UPDATE_VALIDATE_TIMEOUT_MS = 1000;
const DOC_UPDATE_VALIDATE_MAX_BYTES = 32 * 1024 * 1024;
async function nativeApplyUpdates(updates: Uint8Array[]): Promise<Uint8Array> {
return applyUpdatesWithNative(updates, 'doc.storage.squash.native');
}
@@ -81,6 +85,47 @@ export abstract class DocStorageAdapter extends Connection {
);
}
protected async filterValidDocUpdates(
spaceId: string,
docId: string,
updates: Uint8Array[]
) {
const valid: Uint8Array[] = [];
for (const update of updates) {
const reason = await this.invalidDocUpdateReason(update);
if (reason) {
metrics.doc.counter('doc_update_rejected').add(1, { reason });
this.logger.warn(
`Dropped invalid doc update, spaceId: ${spaceId}, docId: ${docId}, reason: ${reason}, size: ${update.length}`
);
continue;
}
valid.push(update);
}
return valid;
}
private async invalidDocUpdateReason(update: Uint8Array) {
if (update.length === 2 && update[0] === 0 && update[1] === 0) {
return null;
}
if (update.length > DOC_UPDATE_VALIDATE_MAX_BYTES) {
return 'oversized';
}
try {
return (await validateDocUpdate(Buffer.from(update), {
timeoutMs: DOC_UPDATE_VALIDATE_TIMEOUT_MS,
}))
? null
: 'invalid';
} catch (err) {
this.logger.warn('Doc update validation failed', err);
metrics.doc.counter('doc_update_validation_failed').add(1);
return null;
}
}
async getDoc(spaceId: string, docId: string): Promise<DocRecord | null> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);
+51
View File
@@ -162,6 +162,57 @@ import type {
export const mergeUpdatesInApplyWay = serverNativeModule.mergeUpdatesInApplyWay;
export async function validateDocUpdate(
update: Buffer,
options: { signal?: AbortSignal; timeoutMs?: number } = {}
): Promise<boolean> {
const signals = [];
if (options.signal) {
signals.push(options.signal);
}
if (options.timeoutMs !== undefined) {
signals.push(AbortSignal.timeout(options.timeoutMs));
}
const signal =
signals.length === 0
? undefined
: signals.length === 1
? signals[0]
: AbortSignal.any(signals);
if (signal?.aborted) {
throw signal.reason;
}
return await new Promise<boolean>((resolve, reject) => {
let settled = false;
const settle = (callback: () => void) => {
if (settled) return;
settled = true;
callback();
};
const onAbort = () => {
settle(() =>
reject(
signal?.reason instanceof Error
? signal.reason
: new Error('Doc update validation aborted')
)
);
};
signal?.addEventListener('abort', onAbort, { once: true });
serverNativeModule
.validateDocUpdate(update)
.then(
result => settle(() => resolve(result)),
error => settle(() => reject(error))
)
.finally(() => {
signal?.removeEventListener('abort', onAbort);
});
});
}
export const verifyChallengeResponse = async (
response: any,
bits: number,