feat: improve mobile OOM handling

This commit is contained in:
DarkSky
2026-02-19 22:51:57 +08:00
parent d8cc0acdd0
commit ad52c46f0a
9 changed files with 374 additions and 33 deletions

10
Cargo.lock generated
View File

@@ -111,6 +111,7 @@ dependencies = [
"base64-simd",
"chrono",
"homedir",
"lru",
"objc2",
"objc2-foundation",
"sqlx",
@@ -2572,6 +2573,15 @@ dependencies = [
"weezl",
]
[[package]]
name = "lru"
version = "0.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593"
dependencies = [
"hashbrown 0.16.1",
]
[[package]]
name = "mac"
version = "0.1.1"

View File

@@ -46,6 +46,7 @@ resolver = "3"
libc = "0.2"
log = "0.4"
loom = { version = "0.7", features = ["checkpoint"] }
lru = "0.16"
memory-indexer = "0.3.0"
mimalloc = "0.1"
mp4parse = "0.17"

View File

@@ -2,7 +2,7 @@ import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
export interface Blob {
key: string;
// base64 encoded data
// base64 encoded data, or "__AFFINE_BLOB_FILE__:<absolutePath>" for large blobs
data: string;
mime: string;
size: number;

View File

@@ -12,13 +12,14 @@ import {
parseUniversalId,
} from '@affine/nbstore';
import { type NativeDBApis } from '@affine/nbstore/sqlite';
import { registerPlugin } from '@capacitor/core';
import { Capacitor, registerPlugin } from '@capacitor/core';
import type { NbStorePlugin } from './definitions';
export * from './definitions';
export const NbStore = registerPlugin<NbStorePlugin>('NbStoreDocStorage');
const ANDROID_BLOB_FILE_PREFIX = '__AFFINE_BLOB_FILE__:';
export const NbStoreNativeDBApis: NativeDBApis = {
connect: async function (id: string): Promise<void> {
@@ -132,14 +133,36 @@ export const NbStoreNativeDBApis: NativeDBApis = {
id,
key,
});
return record
? {
data: base64ToUint8Array(record.data),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
}
: null;
if (!record) {
return null;
}
if (record.data.startsWith(ANDROID_BLOB_FILE_PREFIX)) {
const filePath = record.data.slice(ANDROID_BLOB_FILE_PREFIX.length);
const normalizedPath = filePath.startsWith('file://')
? filePath
: `file://${filePath}`;
const response = await fetch(Capacitor.convertFileSrc(normalizedPath));
if (!response.ok) {
throw new Error(
`Failed to read blob file: ${filePath} (status ${response.status})`
);
}
const buffer = await response.arrayBuffer();
return {
data: new Uint8Array(buffer),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
};
}
return {
data: base64ToUint8Array(record.data),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
};
},
setBlob: async function (id: string, blob: BlobRecord): Promise<void> {
await NbStore.setBlob({

View File

@@ -2,7 +2,7 @@ import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
export interface Blob {
key: string;
// base64 encoded data
// base64 encoded data, or "__AFFINE_BLOB_FILE__:<absolutePath>" for large blobs
data: string;
mime: string;
size: number;

View File

@@ -12,13 +12,14 @@ import {
parseUniversalId,
} from '@affine/nbstore';
import { type NativeDBApis } from '@affine/nbstore/sqlite';
import { registerPlugin } from '@capacitor/core';
import { Capacitor, registerPlugin } from '@capacitor/core';
import type { NbStorePlugin } from './definitions';
export * from './definitions';
export const NbStore = registerPlugin<NbStorePlugin>('NbStoreDocStorage');
const MOBILE_BLOB_FILE_PREFIX = '__AFFINE_BLOB_FILE__:';
export const NbStoreNativeDBApis: NativeDBApis = {
connect: async function (id: string): Promise<void> {
@@ -132,14 +133,36 @@ export const NbStoreNativeDBApis: NativeDBApis = {
id,
key,
});
return record
? {
data: base64ToUint8Array(record.data),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
}
: null;
if (!record) {
return null;
}
if (record.data.startsWith(MOBILE_BLOB_FILE_PREFIX)) {
const filePath = record.data.slice(MOBILE_BLOB_FILE_PREFIX.length);
const normalizedPath = filePath.startsWith('file://')
? filePath
: `file://${filePath}`;
const response = await fetch(Capacitor.convertFileSrc(normalizedPath));
if (!response.ok) {
throw new Error(
`Failed to read blob file: ${filePath} (status ${response.status})`
);
}
const buffer = await response.arrayBuffer();
return {
data: new Uint8Array(buffer),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
};
}
return {
data: base64ToUint8Array(record.data),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
};
},
setBlob: async function (id: string, blob: BlobRecord): Promise<void> {
await NbStore.setBlob({

View File

@@ -38,5 +38,8 @@ objc2-foundation = { workspace = true, features = [
[target.'cfg(not(any(target_os = "ios", target_os = "macos")))'.dependencies]
homedir = { workspace = true }
[target.'cfg(any(target_os = "android", target_os = "ios"))'.dependencies]
lru = { workspace = true }
[build-dependencies]
uniffi = { workspace = true, features = ["build"] }

View File

@@ -1,5 +1,9 @@
use affine_common::hashcash::Stamp;
use affine_nbstore::{Data, pool::SqliteDocStoragePool};
#[cfg(any(target_os = "android", target_os = "ios"))]
pub(crate) mod mobile_blob_cache;
#[cfg(any(target_os = "android", target_os = "ios"))]
use mobile_blob_cache::{MOBILE_BLOB_INLINE_THRESHOLD_BYTES, MobileBlobCache};
#[derive(uniffi::Error, thiserror::Error, Debug)]
pub enum UniffiError {
@@ -171,7 +175,8 @@ impl TryFrom<DocClock> for affine_nbstore::DocClock {
#[derive(uniffi::Record)]
pub struct Blob {
pub key: String,
// base64 encoded data
// base64 encoded data; on mobile large blobs this is a file-path token prefixed
// with "__AFFINE_BLOB_FILE__:"
pub data: String,
pub mime: String,
pub size: i64,
@@ -314,12 +319,16 @@ impl From<affine_nbstore::indexer::NativeMatch> for MatchRange {
#[derive(uniffi::Object)]
pub struct DocStoragePool {
inner: SqliteDocStoragePool,
#[cfg(any(target_os = "android", target_os = "ios"))]
mobile_blob_cache: MobileBlobCache,
}
#[uniffi::export]
pub fn new_doc_storage_pool() -> DocStoragePool {
DocStoragePool {
inner: Default::default(),
#[cfg(any(target_os = "android", target_os = "ios"))]
mobile_blob_cache: MobileBlobCache::new(),
}
}
@@ -327,10 +336,18 @@ pub fn new_doc_storage_pool() -> DocStoragePool {
impl DocStoragePool {
/// Initialize the database and run migrations.
pub async fn connect(&self, universal_id: String, path: String) -> Result<()> {
Ok(self.inner.connect(universal_id, path).await?)
self.inner.connect(universal_id.clone(), path.clone()).await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self
.mobile_blob_cache
.register_workspace(&universal_id, &path)
.map_err(|err| UniffiError::Err(format!("Failed to initialize mobile blob cache: {err}")))?;
Ok(())
}
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_workspace(&universal_id);
self.inner.disconnect(universal_id).await?;
Ok(())
}
@@ -454,26 +471,70 @@ impl DocStoragePool {
}
pub async fn get_blob(&self, universal_id: String, key: String) -> Result<Option<Blob>> {
Ok(self.inner.get(universal_id).await?.get_blob(key).await?.map(Into::into))
#[cfg(any(target_os = "android", target_os = "ios"))]
{
if let Some(blob) = self.mobile_blob_cache.get_blob(&universal_id, &key) {
return Ok(Some(blob));
}
let Some(blob) = self
.inner
.get(universal_id.clone())
.await?
.get_blob(key.clone())
.await?
else {
return Ok(None);
};
if blob.data.len() < MOBILE_BLOB_INLINE_THRESHOLD_BYTES {
return Ok(Some(blob.into()));
}
return self
.mobile_blob_cache
.cache_blob(&universal_id, &blob)
.map(Some)
.map_err(|err| UniffiError::Err(format!("Failed to cache blob file: {err}")));
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
{
Ok(self.inner.get(universal_id).await?.get_blob(key).await?.map(Into::into))
}
}
pub async fn set_blob(&self, universal_id: String, blob: SetBlob) -> Result<()> {
Ok(self.inner.get(universal_id).await?.set_blob(blob.try_into()?).await?)
#[cfg(any(target_os = "android", target_os = "ios"))]
let key = blob.key.clone();
self
.inner
.get(universal_id.clone())
.await?
.set_blob(blob.try_into()?)
.await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_blob(&universal_id, &key);
Ok(())
}
pub async fn delete_blob(&self, universal_id: String, key: String, permanently: bool) -> Result<()> {
Ok(
self
.inner
.get(universal_id)
.await?
.delete_blob(key, permanently)
.await?,
)
self
.inner
.get(universal_id.clone())
.await?
.delete_blob(key.clone(), permanently)
.await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_blob(&universal_id, &key);
Ok(())
}
pub async fn release_blobs(&self, universal_id: String) -> Result<()> {
Ok(self.inner.get(universal_id).await?.release_blobs().await?)
self.inner.get(universal_id.clone()).await?.release_blobs().await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_workspace(&universal_id);
Ok(())
}
pub async fn list_blobs(&self, universal_id: String) -> Result<Vec<ListedBlob>> {

View File

@@ -0,0 +1,220 @@
use std::{
collections::{HashMap, hash_map::DefaultHasher},
hash::{Hash, Hasher},
num::NonZeroUsize,
path::{Path, PathBuf},
sync::Mutex,
};
// Blobs smaller than this (1 MiB) are returned inline (base64 across the FFI
// bridge); larger blobs are spilled to a cache file and referenced by path token.
pub(crate) const MOBILE_BLOB_INLINE_THRESHOLD_BYTES: usize = 1024 * 1024;
// Maximum number of spilled blob files tracked at once; the LRU evicts beyond this.
const MOBILE_BLOB_CACHE_CAPACITY: usize = 32;
// Directory name for the on-disk blob cache under the platform cache root.
const MOBILE_BLOB_CACHE_DIR: &str = "nbstore-blob-cache";
// Sentinel prefix marking `Blob.data` as a file path instead of base64 payload;
// the TypeScript bridge matches on this exact string.
const MOBILE_BLOB_FILE_PREFIX: &str = "__AFFINE_BLOB_FILE__:";
// One spilled blob: the payload lives in the file at `path`, not in memory.
#[derive(Clone)]
struct MobileBlobCacheEntry {
// Blob key as stored in nbstore.
key: String,
// Absolute path of the cache file holding the raw bytes.
path: String,
// MIME type recorded with the blob.
mime: String,
// Payload size in bytes, as reported by the store.
size: i64,
// Creation time in milliseconds since the Unix epoch (UTC).
created_at: i64,
}
impl MobileBlobCacheEntry {
    /// Materialize this entry as a `crate::Blob` whose `data` field carries the
    /// file-path token (`__AFFINE_BLOB_FILE__:<path>`) rather than inline base64.
    fn to_blob(&self) -> crate::Blob {
        let data = [MOBILE_BLOB_FILE_PREFIX, self.path.as_str()].concat();
        crate::Blob {
            key: self.key.clone(),
            data,
            mime: self.mime.clone(),
            size: self.size,
            created_at: self.created_at,
        }
    }
}
// Process-wide cache mapping "<universal_id>\u{1f}<key>" to spilled blob files.
// Both fields are Mutex-guarded so methods can take `&self` from concurrent callers.
pub(crate) struct MobileBlobCache {
// Per-workspace directory where blob files are written; populated by register_workspace.
workspace_dirs: Mutex<HashMap<String, PathBuf>>,
// Bounded LRU of spilled blobs; eviction deletes the backing file.
entries: Mutex<lru::LruCache<String, MobileBlobCacheEntry>>,
}
impl MobileBlobCache {
    /// Create an empty cache with a bounded LRU entry table.
    pub(crate) fn new() -> Self {
        Self {
            workspace_dirs: Mutex::new(HashMap::new()),
            entries: Mutex::new(lru::LruCache::new(
                NonZeroUsize::new(MOBILE_BLOB_CACHE_CAPACITY).expect("cache capacity is non-zero"),
            )),
        }
    }

    /// Resolve and create the on-disk cache directory for a workspace, wipe any
    /// stale files left over from a previous run, and remember the directory
    /// for later `cache_blob` calls.
    pub(crate) fn register_workspace(&self, universal_id: &str, database_path: &str) -> std::io::Result<()> {
        let cache_dir = Self::system_cache_dir(database_path, universal_id);
        std::fs::create_dir_all(&cache_dir)?;
        // Files from a previous process are orphans: the in-memory LRU that
        // tracked them is gone, so delete them up front.
        Self::cleanup_cache_dir(&cache_dir)?;
        self
            .workspace_dirs
            .lock()
            .expect("workspace cache lock poisoned")
            .insert(universal_id.to_string(), cache_dir);
        Ok(())
    }

    /// Look up a previously spilled blob. Returns `None` (and evicts the stale
    /// entry) when the backing file has disappeared, e.g. after the OS cleared
    /// the cache directory.
    pub(crate) fn get_blob(&self, universal_id: &str, key: &str) -> Option<crate::Blob> {
        let cache_key = Self::cache_key(universal_id, key);
        let mut entries = self.entries.lock().expect("blob cache lock poisoned");
        if let Some(entry) = entries.get(&cache_key) {
            if Path::new(&entry.path).exists() {
                return Some(entry.to_blob());
            }
        }
        // Either no entry, or its file vanished: drop the entry if present.
        if let Some(entry) = entries.pop(&cache_key) {
            Self::delete_blob_file(&entry.path);
        }
        None
    }

    /// Spill a blob's bytes to a cache file and record it in the LRU.
    /// Returns a `crate::Blob` whose `data` is the file-path token.
    pub(crate) fn cache_blob(&self, universal_id: &str, blob: &affine_nbstore::Blob) -> std::io::Result<crate::Blob> {
        let cache_key = Self::cache_key(universal_id, &blob.key);
        let cache_dir = self.resolve_cache_dir(universal_id);
        std::fs::create_dir_all(&cache_dir)?;
        let file_path = Self::blob_file_path(&cache_dir, &cache_key);
        std::fs::write(&file_path, &blob.data)?;
        let entry = MobileBlobCacheEntry {
            key: blob.key.clone(),
            path: file_path.to_string_lossy().into_owned(),
            mime: blob.mime.clone(),
            size: blob.size,
            created_at: blob.created_at.and_utc().timestamp_millis(),
        };
        let mut entries = self.entries.lock().expect("blob cache lock poisoned");
        // `push` returns either the evicted LRU entry or the previous value for
        // this key; delete its file unless it is the one we just wrote.
        if let Some((_previous_key, previous)) = entries.push(cache_key, entry.clone()) {
            if previous.path != entry.path {
                Self::delete_blob_file(&previous.path);
            }
        }
        Ok(entry.to_blob())
    }

    /// Drop a single blob from the cache and delete its backing file.
    pub(crate) fn invalidate_blob(&self, universal_id: &str, key: &str) {
        let cache_key = Self::cache_key(universal_id, key);
        if let Some(entry) = self.entries.lock().expect("blob cache lock poisoned").pop(&cache_key) {
            Self::delete_blob_file(&entry.path);
        }
    }

    /// Drop every cached blob belonging to a workspace (and forget its cache dir).
    pub(crate) fn invalidate_workspace(&self, universal_id: &str) {
        // Cache keys are "<universal_id>\u{1f}<key>", so a prefix scan finds them all.
        let prefix = format!("{universal_id}\u{1f}");
        let mut entries = self.entries.lock().expect("blob cache lock poisoned");
        let keys = entries
            .iter()
            .filter_map(|(key, _)| key.starts_with(&prefix).then_some(key.clone()))
            .collect::<Vec<_>>();
        for key in keys {
            if let Some(entry) = entries.pop(&key) {
                Self::delete_blob_file(&entry.path);
            }
        }
        self
            .workspace_dirs
            .lock()
            .expect("workspace cache lock poisoned")
            .remove(universal_id);
    }

    // Compose the LRU key; U+001F (unit separator) cannot appear in either part
    // by construction, so the mapping is unambiguous.
    fn cache_key(universal_id: &str, key: &str) -> String {
        format!("{universal_id}\u{1f}{key}")
    }

    #[cfg(target_os = "android")]
    fn system_cache_dir(database_path: &str, universal_id: &str) -> PathBuf {
        // Android DB lives in "<app>/files/..."; cache should live in "<app>/cache/...".
        let mut current = Path::new(database_path).parent();
        while let Some(path) = current {
            if path.file_name().and_then(|n| n.to_str()) == Some("files") {
                if let Some(app_root) = path.parent() {
                    return app_root
                        .join("cache")
                        .join(MOBILE_BLOB_CACHE_DIR)
                        .join(Self::workspace_bucket(universal_id));
                }
            }
            current = path.parent();
        }
        Self::fallback_temp_cache_dir(universal_id)
    }

    #[cfg(target_os = "ios")]
    fn system_cache_dir(database_path: &str, universal_id: &str) -> PathBuf {
        // iOS DB lives in ".../Documents/..."; cache should live in ".../Library/Caches/...".
        let mut current = Path::new(database_path).parent();
        while let Some(path) = current {
            if path.file_name().and_then(|n| n.to_str()) == Some("Documents") {
                if let Some(container_root) = path.parent() {
                    return container_root
                        .join("Library")
                        .join("Caches")
                        .join(MOBILE_BLOB_CACHE_DIR)
                        .join(Self::workspace_bucket(universal_id));
                }
            }
            current = path.parent();
        }
        Self::fallback_temp_cache_dir(universal_id)
    }

    #[cfg(not(any(target_os = "android", target_os = "ios")))]
    fn system_cache_dir(_database_path: &str, universal_id: &str) -> PathBuf {
        Self::fallback_temp_cache_dir(universal_id)
    }

    // Last-resort cache location when no platform cache root can be derived.
    fn fallback_temp_cache_dir(universal_id: &str) -> PathBuf {
        std::env::temp_dir()
            .join(MOBILE_BLOB_CACHE_DIR)
            .join(Self::workspace_bucket(universal_id))
    }

    // Hash the workspace id into a filesystem-safe directory name.
    fn workspace_bucket(universal_id: &str) -> String {
        let mut hasher = DefaultHasher::new();
        universal_id.hash(&mut hasher);
        format!("{:016x}", hasher.finish())
    }

    /// Directory used for this workspace's blob files. Normally set by
    /// `register_workspace`; if the workspace was never registered we have no
    /// database path here, so fall back to the temp-dir location.
    fn resolve_cache_dir(&self, universal_id: &str) -> PathBuf {
        let mut workspace_dirs = self.workspace_dirs.lock().expect("workspace cache lock poisoned");
        workspace_dirs
            .entry(universal_id.to_string())
            // BUGFIX: `system_cache_dir` needs the database path, which is not
            // available here — the previous call passed only `universal_id`.
            .or_insert_with(|| Self::fallback_temp_cache_dir(universal_id))
            .clone()
    }

    // Hash the cache key into a filesystem-safe file name.
    fn blob_file_path(cache_dir: &Path, cache_key: &str) -> PathBuf {
        let mut hasher = DefaultHasher::new();
        cache_key.hash(&mut hasher);
        cache_dir.join(format!("{:016x}.blob", hasher.finish()))
    }

    // Best-effort file removal; a missing file is not an error.
    fn delete_blob_file(path: &str) {
        let _ = std::fs::remove_file(path);
    }

    // Delete every regular file directly inside `cache_dir` (no recursion).
    fn cleanup_cache_dir(cache_dir: &Path) -> std::io::Result<()> {
        for entry in std::fs::read_dir(cache_dir)? {
            let entry = entry?;
            if entry.path().is_file() {
                let _ = std::fs::remove_file(entry.path());
            }
        }
        Ok(())
    }
}