Compare commits

..

5 Commits

Author SHA1 Message Date
DarkSky
a0ec463ec2 chore: add more test 2026-02-20 01:59:21 +08:00
DarkSky
e9ea299ce9 chore: improve codes 2026-02-20 01:40:48 +08:00
DarkSky
849699e93f fix: cache path check 2026-02-20 00:58:04 +08:00
DarkSky
7a8db38891 chore: doc binary perf 2026-02-19 23:56:40 +08:00
DarkSky
ad52c46f0a feat: improve mobile oom 2026-02-19 22:51:57 +08:00
23 changed files with 1087 additions and 178 deletions

10
Cargo.lock generated
View File

@@ -111,6 +111,7 @@ dependencies = [
"base64-simd",
"chrono",
"homedir",
"lru",
"objc2",
"objc2-foundation",
"sqlx",
@@ -2572,6 +2573,15 @@ dependencies = [
"weezl",
]
[[package]]
name = "lru"
version = "0.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593"
dependencies = [
"hashbrown 0.16.1",
]
[[package]]
name = "mac"
version = "0.1.1"

View File

@@ -46,6 +46,7 @@ resolver = "3"
libc = "0.2"
log = "0.4"
loom = { version = "0.7", features = ["checkpoint"] }
lru = "0.16"
memory-indexer = "0.3.0"
mimalloc = "0.1"
mp4parse = "0.17"

View File

@@ -15,6 +15,7 @@
"@affine/core": "workspace:*",
"@affine/env": "workspace:*",
"@affine/i18n": "workspace:*",
"@affine/mobile-shared": "workspace:*",
"@affine/nbstore": "workspace:*",
"@affine/track": "workspace:*",
"@blocksuite/affine": "workspace:*",

View File

@@ -2,7 +2,7 @@ import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
export interface Blob {
key: string;
// base64 encoded data
// base64 encoded data, or "__AFFINE_BLOB_FILE__:<absolutePath>" for large blobs
data: string;
mime: string;
size: number;
@@ -41,12 +41,13 @@ export interface NbStorePlugin {
pushUpdate: (options: {
id: string;
docId: string;
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
data: string;
}) => Promise<{ timestamp: number }>;
getDocSnapshot: (options: { id: string; docId: string }) => Promise<
| {
docId: string;
// base64 encoded data
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
bin: string;
timestamp: number;
}
@@ -55,6 +56,7 @@ export interface NbStorePlugin {
setDocSnapshot: (options: {
id: string;
docId: string;
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
bin: string;
timestamp: number;
}) => Promise<{ success: boolean }>;
@@ -62,7 +64,7 @@ export interface NbStorePlugin {
updates: {
docId: string;
timestamp: number;
// base64 encoded data
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
bin: string;
}[];
}>;

View File

@@ -1,7 +1,9 @@
import { uint8ArrayToBase64 } from '@affine/core/modules/workspace-engine';
import {
base64ToUint8Array,
uint8ArrayToBase64,
} from '@affine/core/modules/workspace-engine';
decodePayload,
MOBILE_BLOB_FILE_PREFIX,
MOBILE_DOC_FILE_PREFIX,
} from '@affine/mobile-shared/nbstore/payload';
import {
type BlobRecord,
type CrawlResult,
@@ -45,13 +47,15 @@ export const NbStoreNativeDBApis: NativeDBApis = {
docId: string
): Promise<DocRecord | null> {
const snapshot = await NbStore.getDocSnapshot({ id, docId });
return snapshot
? {
bin: base64ToUint8Array(snapshot.bin),
docId: snapshot.docId,
timestamp: new Date(snapshot.timestamp),
}
: null;
if (!snapshot) {
return null;
}
return {
bin: await decodePayload(snapshot.bin, MOBILE_DOC_FILE_PREFIX),
docId: snapshot.docId,
timestamp: new Date(snapshot.timestamp),
};
},
setDocSnapshot: async function (
id: string,
@@ -70,11 +74,13 @@ export const NbStoreNativeDBApis: NativeDBApis = {
docId: string
): Promise<DocRecord[]> {
const { updates } = await NbStore.getDocUpdates({ id, docId });
return updates.map(update => ({
bin: base64ToUint8Array(update.bin),
docId: update.docId,
timestamp: new Date(update.timestamp),
}));
return Promise.all(
updates.map(async update => ({
bin: await decodePayload(update.bin, MOBILE_DOC_FILE_PREFIX),
docId: update.docId,
timestamp: new Date(update.timestamp),
}))
);
},
markUpdatesMerged: async function (
id: string,
@@ -132,14 +138,16 @@ export const NbStoreNativeDBApis: NativeDBApis = {
id,
key,
});
return record
? {
data: base64ToUint8Array(record.data),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
}
: null;
if (!record) {
return null;
}
return {
data: await decodePayload(record.data, MOBILE_BLOB_FILE_PREFIX),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
};
},
setBlob: async function (id: string, blob: BlobRecord): Promise<void> {
await NbStore.setBlob({

View File

@@ -11,6 +11,7 @@
{ "path": "../../core" },
{ "path": "../../../common/env" },
{ "path": "../../i18n" },
{ "path": "../mobile-shared" },
{ "path": "../../../common/nbstore" },
{ "path": "../../track" },
{ "path": "../../../../blocksuite/affine/all" },

View File

@@ -19,6 +19,7 @@
"@affine/env": "workspace:*",
"@affine/graphql": "workspace:*",
"@affine/i18n": "workspace:*",
"@affine/mobile-shared": "workspace:*",
"@affine/nbstore": "workspace:*",
"@affine/track": "workspace:*",
"@blocksuite/affine": "workspace:*",

View File

@@ -2,7 +2,7 @@ import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
export interface Blob {
key: string;
// base64 encoded data
// base64 encoded data, or "__AFFINE_BLOB_FILE__:<absolutePath>" for large blobs
data: string;
mime: string;
size: number;
@@ -41,12 +41,13 @@ export interface NbStorePlugin {
pushUpdate: (options: {
id: string;
docId: string;
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
data: string;
}) => Promise<{ timestamp: number }>;
getDocSnapshot: (options: { id: string; docId: string }) => Promise<
| {
docId: string;
// base64 encoded data
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
bin: string;
timestamp: number;
}
@@ -55,6 +56,7 @@ export interface NbStorePlugin {
setDocSnapshot: (options: {
id: string;
docId: string;
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
bin: string;
timestamp: number;
}) => Promise<{ success: boolean }>;
@@ -62,7 +64,7 @@ export interface NbStorePlugin {
updates: {
docId: string;
timestamp: number;
// base64 encoded data
// base64 encoded data, or "__AFFINE_DOC_FILE__:<absolutePath>" for large doc payloads
bin: string;
}[];
}>;

View File

@@ -1,7 +1,9 @@
import { uint8ArrayToBase64 } from '@affine/core/modules/workspace-engine';
import {
base64ToUint8Array,
uint8ArrayToBase64,
} from '@affine/core/modules/workspace-engine';
decodePayload,
MOBILE_BLOB_FILE_PREFIX,
MOBILE_DOC_FILE_PREFIX,
} from '@affine/mobile-shared/nbstore/payload';
import {
type BlobRecord,
type CrawlResult,
@@ -45,13 +47,15 @@ export const NbStoreNativeDBApis: NativeDBApis = {
docId: string
): Promise<DocRecord | null> {
const snapshot = await NbStore.getDocSnapshot({ id, docId });
return snapshot
? {
bin: base64ToUint8Array(snapshot.bin),
docId: snapshot.docId,
timestamp: new Date(snapshot.timestamp),
}
: null;
if (!snapshot) {
return null;
}
return {
bin: await decodePayload(snapshot.bin, MOBILE_DOC_FILE_PREFIX),
docId: snapshot.docId,
timestamp: new Date(snapshot.timestamp),
};
},
setDocSnapshot: async function (
id: string,
@@ -70,11 +74,13 @@ export const NbStoreNativeDBApis: NativeDBApis = {
docId: string
): Promise<DocRecord[]> {
const { updates } = await NbStore.getDocUpdates({ id, docId });
return updates.map(update => ({
bin: base64ToUint8Array(update.bin),
docId: update.docId,
timestamp: new Date(update.timestamp),
}));
return Promise.all(
updates.map(async update => ({
bin: await decodePayload(update.bin, MOBILE_DOC_FILE_PREFIX),
docId: update.docId,
timestamp: new Date(update.timestamp),
}))
);
},
markUpdatesMerged: async function (
id: string,
@@ -132,14 +138,16 @@ export const NbStoreNativeDBApis: NativeDBApis = {
id,
key,
});
return record
? {
data: base64ToUint8Array(record.data),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
}
: null;
if (!record) {
return null;
}
return {
data: await decodePayload(record.data, MOBILE_BLOB_FILE_PREFIX),
key: record.key,
mime: record.mime,
createdAt: new Date(record.createdAt),
};
},
setBlob: async function (id: string, blob: BlobRecord): Promise<void> {
await NbStore.setBlob({

View File

@@ -12,6 +12,7 @@
{ "path": "../../../common/env" },
{ "path": "../../../common/graphql" },
{ "path": "../../i18n" },
{ "path": "../mobile-shared" },
{ "path": "../../../common/nbstore" },
{ "path": "../../track" },
{ "path": "../../../../blocksuite/affine/all" },

View File

@@ -0,0 +1,18 @@
{
"name": "@affine/mobile-shared",
"version": "0.26.1",
"type": "module",
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./nbstore/payload": "./src/nbstore/payload.ts"
},
"dependencies": {
"@affine/core": "workspace:*",
"@capacitor/core": "^7.0.0"
},
"devDependencies": {
"typescript": "^5.7.2"
}
}

View File

@@ -0,0 +1 @@
export * from './nbstore/payload';

View File

@@ -0,0 +1,58 @@
import { base64ToUint8Array } from '@affine/core/modules/workspace-engine';
import { Capacitor } from '@capacitor/core';
export const MOBILE_BLOB_FILE_PREFIX = '__AFFINE_BLOB_FILE__:';
export const MOBILE_DOC_FILE_PREFIX = '__AFFINE_DOC_FILE__:';
const MOBILE_PAYLOAD_CACHE_PATH_PATTERN =
/\/nbstore-blob-cache\/[0-9a-f]{16}\/[0-9a-f]{16}\.(blob|docbin)$/;
function normalizeTokenFilePath(rawPath: string): string {
const trimmedPath = rawPath.trim();
if (!trimmedPath) {
throw new Error('Invalid mobile payload token: empty file path');
}
return trimmedPath.startsWith('file://')
? trimmedPath
: `file://${trimmedPath}`;
}
function assertMobileCachePath(fileUrl: string): void {
let pathname: string;
try {
pathname = decodeURIComponent(new URL(fileUrl).pathname);
} catch {
throw new Error('Invalid mobile payload token: malformed file URL');
}
if (
pathname.includes('/../') ||
pathname.includes('/./') ||
!MOBILE_PAYLOAD_CACHE_PATH_PATTERN.test(pathname)
) {
throw new Error(
`Refusing to read mobile payload outside cache dir: ${fileUrl}`
);
}
}
export async function decodePayload(
data: string,
prefix: string
): Promise<Uint8Array> {
if (!data.startsWith(prefix)) {
return base64ToUint8Array(data);
}
const normalizedPath = normalizeTokenFilePath(data.slice(prefix.length));
assertMobileCachePath(normalizedPath);
const response = await fetch(Capacitor.convertFileSrc(normalizedPath));
if (!response.ok) {
throw new Error(
`Failed to read mobile payload file: ${normalizedPath} (status ${response.status})`
);
}
return new Uint8Array(await response.arrayBuffer());
}

View File

@@ -0,0 +1,10 @@
{
"extends": "../../../../tsconfig.web.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./dist",
"tsBuildInfoFile": "./dist/tsconfig.tsbuildinfo"
},
"include": ["./src"],
"references": [{ "path": "../../core" }]
}

View File

@@ -38,5 +38,11 @@ objc2-foundation = { workspace = true, features = [
[target.'cfg(not(any(target_os = "ios", target_os = "macos")))'.dependencies]
homedir = { workspace = true }
[target.'cfg(any(target_os = "android", target_os = "ios"))'.dependencies]
lru = { workspace = true }
[build-dependencies]
uniffi = { workspace = true, features = ["build"] }
[dev-dependencies]
lru = { workspace = true }

View File

@@ -1,5 +1,10 @@
use affine_common::hashcash::Stamp;
use affine_nbstore::{Data, pool::SqliteDocStoragePool};
#[cfg(any(target_os = "android", target_os = "ios", test))]
#[cfg_attr(all(test, not(any(target_os = "android", target_os = "ios"))), allow(dead_code))]
pub(crate) mod mobile_blob_cache;
#[cfg(any(target_os = "android", target_os = "ios"))]
use mobile_blob_cache::{MOBILE_BLOB_INLINE_THRESHOLD_BYTES, MobileBlobCache, is_mobile_binary_file_token};
#[derive(uniffi::Error, thiserror::Error, Debug)]
pub enum UniffiError {
@@ -21,6 +26,12 @@ type Result<T> = std::result::Result<T, UniffiError>;
uniffi::setup_scaffolding!("affine_mobile_native");
fn decode_base64_data(data: &str) -> Result<Vec<u8>> {
base64_simd::STANDARD
.decode_to_vec(data)
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))
}
#[uniffi::export]
pub fn hashcash_mint(resource: String, bits: u32) -> String {
Stamp::mint(resource, Some(bits)).format()
@@ -29,7 +40,8 @@ pub fn hashcash_mint(resource: String, bits: u32) -> String {
#[derive(uniffi::Record)]
pub struct DocRecord {
pub doc_id: String,
// base64 encoded data
// base64 encoded data; on mobile large payloads this can be a file-path token
// prefixed with "__AFFINE_DOC_FILE__:"
pub bin: String,
pub timestamp: i64,
}
@@ -50,11 +62,7 @@ impl TryFrom<DocRecord> for affine_nbstore::DocRecord {
fn try_from(record: DocRecord) -> Result<Self> {
Ok(Self {
doc_id: record.doc_id,
bin: Into::<Data>::into(
base64_simd::STANDARD
.decode_to_vec(record.bin)
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
),
bin: Into::<Data>::into(decode_base64_data(&record.bin)?),
timestamp: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(record.timestamp)
.ok_or(UniffiError::TimestampDecodingError)?
.naive_utc(),
@@ -66,7 +74,8 @@ impl TryFrom<DocRecord> for affine_nbstore::DocRecord {
pub struct DocUpdate {
pub doc_id: String,
pub timestamp: i64,
// base64 encoded data
// base64 encoded data; on mobile large payloads this can be a file-path token
// prefixed with "__AFFINE_DOC_FILE__:"
pub bin: String,
}
@@ -89,11 +98,7 @@ impl TryFrom<DocUpdate> for affine_nbstore::DocUpdate {
timestamp: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(update.timestamp)
.ok_or(UniffiError::TimestampDecodingError)?
.naive_utc(),
bin: Into::<Data>::into(
base64_simd::STANDARD
.decode_to_vec(update.bin)
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
),
bin: Into::<Data>::into(decode_base64_data(&update.bin)?),
})
}
}
@@ -171,7 +176,8 @@ impl TryFrom<DocClock> for affine_nbstore::DocClock {
#[derive(uniffi::Record)]
pub struct Blob {
pub key: String,
// base64 encoded data
// base64 encoded data; on mobile large blobs this is a file-path token prefixed
// with "__AFFINE_BLOB_FILE__:"
pub data: String,
pub mime: String,
pub size: i64,
@@ -193,7 +199,7 @@ impl From<affine_nbstore::Blob> for Blob {
#[derive(uniffi::Record)]
pub struct SetBlob {
pub key: String,
// base64 encoded data
// base64 encoded data; mobile file-path tokens are also accepted
pub data: String,
pub mime: String,
}
@@ -204,11 +210,7 @@ impl TryFrom<SetBlob> for affine_nbstore::SetBlob {
fn try_from(blob: SetBlob) -> Result<Self> {
Ok(Self {
key: blob.key,
data: Into::<Data>::into(
base64_simd::STANDARD
.decode_to_vec(blob.data)
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
),
data: Into::<Data>::into(decode_base64_data(&blob.data)?),
mime: blob.mime,
})
}
@@ -314,23 +316,63 @@ impl From<affine_nbstore::indexer::NativeMatch> for MatchRange {
#[derive(uniffi::Object)]
pub struct DocStoragePool {
inner: SqliteDocStoragePool,
#[cfg(any(target_os = "android", target_os = "ios"))]
mobile_blob_cache: MobileBlobCache,
}
#[uniffi::export]
pub fn new_doc_storage_pool() -> DocStoragePool {
DocStoragePool {
inner: Default::default(),
#[cfg(any(target_os = "android", target_os = "ios"))]
mobile_blob_cache: MobileBlobCache::new(),
}
}
#[uniffi::export(async_runtime = "tokio")]
impl DocStoragePool {
fn decode_mobile_data(&self, universal_id: &str, data: &str) -> Result<Vec<u8>> {
#[cfg(any(target_os = "android", target_os = "ios"))]
if is_mobile_binary_file_token(data) {
return self
.mobile_blob_cache
.read_binary_file(universal_id, data)
.map_err(|err| UniffiError::Err(format!("Failed to read mobile file token: {err}")));
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
let _ = universal_id;
decode_base64_data(data)
}
fn encode_doc_data(&self, universal_id: &str, doc_id: &str, timestamp: i64, data: &[u8]) -> Result<String> {
#[cfg(any(target_os = "android", target_os = "ios"))]
if data.len() >= MOBILE_BLOB_INLINE_THRESHOLD_BYTES {
return self
.mobile_blob_cache
.cache_doc_bin(universal_id, doc_id, timestamp, data)
.map_err(|err| UniffiError::Err(format!("Failed to cache doc file: {err}")));
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
let _ = (universal_id, doc_id, timestamp);
Ok(base64_simd::STANDARD.encode_to_string(data))
}
/// Initialize the database and run migrations.
pub async fn connect(&self, universal_id: String, path: String) -> Result<()> {
Ok(self.inner.connect(universal_id, path).await?)
self.inner.connect(universal_id.clone(), path.clone()).await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self
.mobile_blob_cache
.register_workspace(&universal_id, &path)
.map_err(|err| UniffiError::Err(format!("Failed to initialize mobile blob cache: {err}")))?;
Ok(())
}
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_workspace(&universal_id);
self.inner.disconnect(universal_id).await?;
Ok(())
}
@@ -340,17 +382,13 @@ impl DocStoragePool {
}
pub async fn push_update(&self, universal_id: String, doc_id: String, update: String) -> Result<i64> {
let decoded_update = self.decode_mobile_data(&universal_id, &update)?;
Ok(
self
.inner
.get(universal_id)
.await?
.push_update(
doc_id,
base64_simd::STANDARD
.decode_to_vec(update)
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
)
.push_update(doc_id, decoded_update)
.await?
.and_utc()
.timestamp_millis(),
@@ -358,40 +396,54 @@ impl DocStoragePool {
}
pub async fn get_doc_snapshot(&self, universal_id: String, doc_id: String) -> Result<Option<DocRecord>> {
Ok(
self
.inner
.get(universal_id)
.await?
.get_doc_snapshot(doc_id)
.await?
.map(Into::into),
)
let Some(record) = self
.inner
.get(universal_id.clone())
.await?
.get_doc_snapshot(doc_id)
.await?
else {
return Ok(None);
};
let timestamp = record.timestamp.and_utc().timestamp_millis();
let bin = self.encode_doc_data(&universal_id, &record.doc_id, timestamp, &record.bin)?;
Ok(Some(DocRecord {
doc_id: record.doc_id,
bin,
timestamp,
}))
}
pub async fn set_doc_snapshot(&self, universal_id: String, snapshot: DocRecord) -> Result<bool> {
Ok(
self
.inner
.get(universal_id)
.await?
.set_doc_snapshot(snapshot.try_into()?)
.await?,
)
let doc_record = affine_nbstore::DocRecord {
doc_id: snapshot.doc_id,
bin: Into::<Data>::into(self.decode_mobile_data(&universal_id, &snapshot.bin)?),
timestamp: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(snapshot.timestamp)
.ok_or(UniffiError::TimestampDecodingError)?
.naive_utc(),
};
Ok(self.inner.get(universal_id).await?.set_doc_snapshot(doc_record).await?)
}
pub async fn get_doc_updates(&self, universal_id: String, doc_id: String) -> Result<Vec<DocUpdate>> {
Ok(
self
.inner
.get(universal_id)
.await?
.get_doc_updates(doc_id)
.await?
.into_iter()
.map(Into::into)
.collect(),
)
self
.inner
.get(universal_id.clone())
.await?
.get_doc_updates(doc_id)
.await?
.into_iter()
.map(|update| {
let timestamp = update.timestamp.and_utc().timestamp_millis();
let bin = self.encode_doc_data(&universal_id, &update.doc_id, timestamp, &update.bin)?;
Ok(DocUpdate {
doc_id: update.doc_id,
timestamp,
bin,
})
})
.collect()
}
pub async fn mark_updates_merged(&self, universal_id: String, doc_id: String, updates: Vec<i64>) -> Result<u32> {
@@ -454,26 +506,70 @@ impl DocStoragePool {
}
pub async fn get_blob(&self, universal_id: String, key: String) -> Result<Option<Blob>> {
Ok(self.inner.get(universal_id).await?.get_blob(key).await?.map(Into::into))
#[cfg(any(target_os = "android", target_os = "ios"))]
{
if let Some(blob) = self.mobile_blob_cache.get_blob(&universal_id, &key) {
return Ok(Some(blob));
}
let Some(blob) = self
.inner
.get(universal_id.clone())
.await?
.get_blob(key.clone())
.await?
else {
return Ok(None);
};
if blob.data.len() < MOBILE_BLOB_INLINE_THRESHOLD_BYTES {
return Ok(Some(blob.into()));
}
return self
.mobile_blob_cache
.cache_blob(&universal_id, &blob)
.map(Some)
.map_err(|err| UniffiError::Err(format!("Failed to cache blob file: {err}")));
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
{
Ok(self.inner.get(universal_id).await?.get_blob(key).await?.map(Into::into))
}
}
pub async fn set_blob(&self, universal_id: String, blob: SetBlob) -> Result<()> {
Ok(self.inner.get(universal_id).await?.set_blob(blob.try_into()?).await?)
#[cfg(any(target_os = "android", target_os = "ios"))]
let key = blob.key.clone();
let blob = affine_nbstore::SetBlob {
key: blob.key,
data: Into::<Data>::into(self.decode_mobile_data(&universal_id, &blob.data)?),
mime: blob.mime,
};
self.inner.get(universal_id.clone()).await?.set_blob(blob).await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_blob(&universal_id, &key);
Ok(())
}
pub async fn delete_blob(&self, universal_id: String, key: String, permanently: bool) -> Result<()> {
Ok(
self
.inner
.get(universal_id)
.await?
.delete_blob(key, permanently)
.await?,
)
self
.inner
.get(universal_id.clone())
.await?
.delete_blob(key.clone(), permanently)
.await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_blob(&universal_id, &key);
Ok(())
}
pub async fn release_blobs(&self, universal_id: String) -> Result<()> {
Ok(self.inner.get(universal_id).await?.release_blobs().await?)
self.inner.get(universal_id.clone()).await?.release_blobs().await?;
#[cfg(any(target_os = "android", target_os = "ios"))]
self.mobile_blob_cache.invalidate_workspace(&universal_id);
Ok(())
}
pub async fn list_blobs(&self, universal_id: String) -> Result<Vec<ListedBlob>> {

View File

@@ -0,0 +1,635 @@
use std::{
collections::{HashMap, hash_map::DefaultHasher},
hash::{Hash, Hasher},
num::NonZeroUsize,
path::{Path, PathBuf},
sync::Mutex,
};
use lru::LruCache;
pub(crate) const MOBILE_BLOB_INLINE_THRESHOLD_BYTES: usize = 1024 * 1024;
const MOBILE_BLOB_MAX_READ_BYTES: u64 = 64 * 1024 * 1024;
const MOBILE_BLOB_CACHE_CAPACITY: usize = 32;
const MOBILE_BLOB_CACHE_DIR: &str = "nbstore-blob-cache";
pub(crate) const MOBILE_BLOB_FILE_PREFIX: &str = "__AFFINE_BLOB_FILE__:";
pub(crate) const MOBILE_DOC_FILE_PREFIX: &str = "__AFFINE_DOC_FILE__:";
#[derive(Clone)]
struct MobileBlobCacheEntry {
key: String,
path: String,
mime: String,
size: i64,
created_at: i64,
}
impl MobileBlobCacheEntry {
fn to_blob(&self) -> crate::Blob {
crate::Blob {
key: self.key.clone(),
data: format!("{MOBILE_BLOB_FILE_PREFIX}{}", self.path),
mime: self.mime.clone(),
size: self.size,
created_at: self.created_at,
}
}
}
pub(crate) struct MobileBlobCache {
workspace_dirs: Mutex<HashMap<String, PathBuf>>,
blob_entries: Mutex<LruCache<String, MobileBlobCacheEntry>>,
doc_entries: Mutex<LruCache<String, String>>,
}
impl MobileBlobCache {
pub(crate) fn new() -> Self {
Self {
workspace_dirs: Mutex::new(HashMap::new()),
blob_entries: Mutex::new(LruCache::new(
NonZeroUsize::new(MOBILE_BLOB_CACHE_CAPACITY).expect("cache capacity is non-zero"),
)),
doc_entries: Mutex::new(LruCache::new(
NonZeroUsize::new(MOBILE_BLOB_CACHE_CAPACITY).expect("cache capacity is non-zero"),
)),
}
}
pub(crate) fn register_workspace(&self, universal_id: &str, database_path: &str) -> std::io::Result<()> {
let cache_dir = Self::system_cache_dir(database_path, universal_id);
std::fs::create_dir_all(&cache_dir)?;
Self::cleanup_cache_dir(&cache_dir)?;
self
.workspace_dirs
.lock()
.expect("workspace cache lock poisoned")
.insert(universal_id.to_string(), cache_dir);
Ok(())
}
pub(crate) fn get_blob(&self, universal_id: &str, key: &str) -> Option<crate::Blob> {
let cache_key = Self::cache_key(universal_id, key);
let mut entries = self.blob_entries.lock().expect("blob cache lock poisoned");
if let Some(entry) = entries.get(&cache_key) {
if Path::new(&entry.path).exists() {
return Some(entry.to_blob());
}
}
if let Some(entry) = entries.pop(&cache_key) {
Self::delete_blob_file(&entry.path);
}
None
}
pub(crate) fn cache_blob(&self, universal_id: &str, blob: &affine_nbstore::Blob) -> std::io::Result<crate::Blob> {
let cache_key = Self::cache_key(universal_id, &blob.key);
let cache_dir = self.ensure_cache_dir(universal_id)?;
let file_path = Self::blob_file_path(&cache_dir, &cache_key);
std::fs::write(&file_path, &blob.data)?;
let entry = MobileBlobCacheEntry {
key: blob.key.clone(),
path: file_path.to_string_lossy().into_owned(),
mime: blob.mime.clone(),
size: blob.size,
created_at: blob.created_at.and_utc().timestamp_millis(),
};
let mut entries = self.blob_entries.lock().expect("blob cache lock poisoned");
if let Some((_previous_key, previous)) = entries.push(cache_key, entry.clone()) {
if previous.path != entry.path {
Self::delete_blob_file(&previous.path);
}
}
Ok(entry.to_blob())
}
pub(crate) fn cache_doc_bin(
&self,
universal_id: &str,
doc_id: &str,
timestamp: i64,
data: &[u8],
) -> std::io::Result<String> {
let cache_key = Self::cache_key(universal_id, &format!("doc\u{1f}{doc_id}\u{1f}{timestamp}"));
let cache_dir = self.ensure_cache_dir(universal_id)?;
let file_path = Self::doc_file_path(&cache_dir, &cache_key);
std::fs::write(&file_path, data)?;
let path = file_path.to_string_lossy().into_owned();
let mut entries = self.doc_entries.lock().expect("doc cache lock poisoned");
if let Some((_previous_key, previous_path)) = entries.push(cache_key, path.clone())
&& previous_path != path
{
Self::delete_blob_file(&previous_path);
}
Ok(format!("{MOBILE_DOC_FILE_PREFIX}{path}"))
}
pub(crate) fn invalidate_blob(&self, universal_id: &str, key: &str) {
let cache_key = Self::cache_key(universal_id, key);
if let Some(entry) = self
.blob_entries
.lock()
.expect("blob cache lock poisoned")
.pop(&cache_key)
{
Self::delete_blob_file(&entry.path);
}
}
pub(crate) fn invalidate_workspace(&self, universal_id: &str) {
let prefix = format!("{universal_id}\u{1f}");
let mut blob_entries = self.blob_entries.lock().expect("blob cache lock poisoned");
let keys = blob_entries
.iter()
.filter_map(|(key, _)| key.starts_with(&prefix).then_some(key.clone()))
.collect::<Vec<_>>();
for key in keys {
if let Some(entry) = blob_entries.pop(&key) {
Self::delete_blob_file(&entry.path);
}
}
let mut doc_entries = self.doc_entries.lock().expect("doc cache lock poisoned");
let doc_keys = doc_entries
.iter()
.filter_map(|(key, _)| key.starts_with(&prefix).then_some(key.clone()))
.collect::<Vec<_>>();
for key in doc_keys {
if let Some(path) = doc_entries.pop(&key) {
Self::delete_blob_file(&path);
}
}
if let Some(cache_dir) = self
.workspace_dirs
.lock()
.expect("workspace cache lock poisoned")
.remove(universal_id)
{
let _ = std::fs::remove_dir_all(&cache_dir);
if let Some(parent) = cache_dir.parent() {
let _ = std::fs::remove_dir(parent);
}
}
}
fn cache_key(universal_id: &str, key: &str) -> String {
format!("{universal_id}\u{1f}{key}")
}
#[cfg(target_os = "android")]
fn system_cache_dir(database_path: &str, universal_id: &str) -> PathBuf {
// Android DB lives in "<app>/files/..."; cache should live in
// "<app>/cache/...".
let mut current = Path::new(database_path).parent();
while let Some(path) = current {
if path.file_name().and_then(|n| n.to_str()) == Some("files") {
if let Some(app_root) = path.parent() {
return app_root
.join("cache")
.join(MOBILE_BLOB_CACHE_DIR)
.join(Self::workspace_bucket(universal_id));
}
}
current = path.parent();
}
Self::fallback_temp_cache_dir(universal_id)
}
#[cfg(target_os = "ios")]
fn system_cache_dir(database_path: &str, universal_id: &str) -> PathBuf {
// iOS DB lives in ".../Documents/..."; cache should live in
// ".../Library/Caches/...".
let mut current = Path::new(database_path).parent();
while let Some(path) = current {
if path.file_name().and_then(|n| n.to_str()) == Some("Documents") {
if let Some(container_root) = path.parent() {
return container_root
.join("Library")
.join("Caches")
.join(MOBILE_BLOB_CACHE_DIR)
.join(Self::workspace_bucket(universal_id));
}
}
current = path.parent();
}
Self::fallback_temp_cache_dir(universal_id)
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
fn system_cache_dir(_database_path: &str, universal_id: &str) -> PathBuf {
Self::fallback_temp_cache_dir(universal_id)
}
fn fallback_temp_cache_dir(universal_id: &str) -> PathBuf {
std::env::temp_dir()
.join(MOBILE_BLOB_CACHE_DIR)
.join(Self::workspace_bucket(universal_id))
}
fn workspace_bucket(universal_id: &str) -> String {
let mut hasher = DefaultHasher::new();
universal_id.hash(&mut hasher);
format!("{:016x}", hasher.finish())
}
fn resolve_cache_dir(&self, universal_id: &str) -> PathBuf {
let mut workspace_dirs = self.workspace_dirs.lock().expect("workspace cache lock poisoned");
workspace_dirs
.entry(universal_id.to_string())
.or_insert_with(|| Self::fallback_temp_cache_dir(universal_id))
.clone()
}
fn ensure_cache_dir(&self, universal_id: &str) -> std::io::Result<PathBuf> {
let cache_dir = self.resolve_cache_dir(universal_id);
std::fs::create_dir_all(&cache_dir)?;
Ok(cache_dir)
}
fn blob_file_path(cache_dir: &Path, cache_key: &str) -> PathBuf {
let mut hasher = DefaultHasher::new();
cache_key.hash(&mut hasher);
cache_dir.join(format!("{:016x}.blob", hasher.finish()))
}
fn doc_file_path(cache_dir: &Path, cache_key: &str) -> PathBuf {
let mut hasher = DefaultHasher::new();
cache_key.hash(&mut hasher);
cache_dir.join(format!("{:016x}.docbin", hasher.finish()))
}
fn delete_blob_file(path: &str) {
let _ = std::fs::remove_file(path);
}
fn cleanup_cache_dir(cache_dir: &Path) -> std::io::Result<()> {
for entry in std::fs::read_dir(cache_dir)? {
let entry = entry?;
if entry.path().is_file() {
let _ = std::fs::remove_file(entry.path());
}
}
Ok(())
}
}
pub(crate) fn is_mobile_binary_file_token(value: &str) -> bool {
value.starts_with(MOBILE_BLOB_FILE_PREFIX) || value.starts_with(MOBILE_DOC_FILE_PREFIX)
}
impl MobileBlobCache {
pub(crate) fn read_binary_file(&self, universal_id: &str, value: &str) -> std::io::Result<Vec<u8>> {
let path = value
.strip_prefix(MOBILE_BLOB_FILE_PREFIX)
.or_else(|| value.strip_prefix(MOBILE_DOC_FILE_PREFIX))
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid mobile file token"))?;
let path = path.strip_prefix("file://").unwrap_or(path);
let canonical = std::fs::canonicalize(path)?;
let workspace_dir = {
self
.workspace_dirs
.lock()
.expect("workspace cache lock poisoned")
.get(universal_id)
.cloned()
}
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "workspace cache directory not registered"))?;
let workspace_dir = std::fs::canonicalize(workspace_dir)?;
if !is_valid_mobile_cache_path(&canonical, &workspace_dir) {
return Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"mobile file token points outside the workspace cache directory",
));
}
let metadata = std::fs::metadata(&canonical)?;
if !metadata.is_file() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"mobile file token does not resolve to a file",
));
}
if metadata.len() > MOBILE_BLOB_MAX_READ_BYTES {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"mobile file token exceeds max size: {} > {}",
metadata.len(),
MOBILE_BLOB_MAX_READ_BYTES
),
));
}
std::fs::read(canonical)
}
}
fn is_valid_mobile_cache_path(path: &Path, workspace_dir: &Path) -> bool {
if !path.starts_with(workspace_dir) {
return false;
}
let Ok(relative) = path.strip_prefix(workspace_dir) else {
return false;
};
let mut components = relative.components();
let Some(std::path::Component::Normal(file_name)) = components.next() else {
return false;
};
if components.next().is_some() {
return false;
}
let Some(file_name) = file_name.to_str() else {
return false;
};
let Some((stem, extension)) = file_name.rsplit_once('.') else {
return false;
};
if extension != "blob" && extension != "docbin" {
return false;
}
stem.len() == 16 && stem.chars().all(|c| c.is_ascii_hexdigit())
}
#[cfg(test)]
mod tests {
#[cfg(unix)]
use std::os::unix::fs::{PermissionsExt, symlink};
use std::{
fs,
io::ErrorKind,
path::PathBuf,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
thread,
time::{SystemTime, UNIX_EPOCH},
};
use super::*;
static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
fn unique_id(prefix: &str) -> String {
let counter = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock before unix epoch")
.as_nanos();
format!("{prefix}-{now}-{counter}")
}
fn build_blob(key: &str, data: Vec<u8>) -> affine_nbstore::Blob {
affine_nbstore::Blob {
key: key.to_string(),
data: data.clone(),
mime: "application/octet-stream".to_string(),
size: data.len() as i64,
created_at: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(0)
.expect("valid timestamp")
.naive_utc(),
}
}
fn workspace_dir(cache: &MobileBlobCache, universal_id: &str) -> PathBuf {
cache
.workspace_dirs
.lock()
.expect("workspace cache lock poisoned")
.get(universal_id)
.cloned()
.expect("workspace should be registered")
}
fn token_path(token: &str) -> PathBuf {
token
.strip_prefix(MOBILE_BLOB_FILE_PREFIX)
.or_else(|| token.strip_prefix(MOBILE_DOC_FILE_PREFIX))
.map(PathBuf::from)
.expect("token should contain file path")
}
fn setup_cache(prefix: &str) -> (MobileBlobCache, String, PathBuf) {
let cache = MobileBlobCache::new();
let universal_id = unique_id(prefix);
let db_path = std::env::temp_dir()
.join("affine-mobile-cache-tests")
.join(unique_id("db"))
.join("workspace.sqlite");
if let Some(parent) = db_path.parent() {
fs::create_dir_all(parent).expect("create test db parent");
}
cache
.register_workspace(&universal_id, db_path.to_string_lossy().as_ref())
.expect("register workspace should succeed");
let workspace = workspace_dir(&cache, &universal_id);
(cache, universal_id, workspace)
}
#[test]
fn read_binary_file_rejects_path_traversal_and_malformed_name() {
let (cache, universal_id, workspace) = setup_cache("path-validation");
let outside_name = unique_id("outside");
let outside_dir = workspace
.parent()
.expect("workspace should have parent")
.join(&outside_name);
fs::create_dir_all(&outside_dir).expect("create outside dir");
let outside_file = outside_dir.join("1234567890abcdef.blob");
fs::write(&outside_file, b"outside-data").expect("write outside file");
let traversal = workspace.join(format!("../{outside_name}/1234567890abcdef.blob"));
let traversal_token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", traversal.display());
let traversal_err = cache
.read_binary_file(&universal_id, &traversal_token)
.expect_err("path traversal should be rejected");
assert_eq!(traversal_err.kind(), ErrorKind::PermissionDenied);
let malformed = workspace.join("invalid-name.blob");
fs::write(&malformed, b"bad").expect("write malformed file");
let malformed_token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", malformed.display());
let malformed_err = cache
.read_binary_file(&universal_id, &malformed_token)
.expect_err("malformed cache path should be rejected");
assert_eq!(malformed_err.kind(), ErrorKind::PermissionDenied);
cache.invalidate_workspace(&universal_id);
let _ = fs::remove_dir_all(outside_dir);
}
#[cfg(unix)]
#[test]
fn read_binary_file_rejects_symlink_escape() {
let (cache, universal_id, workspace) = setup_cache("symlink");
let outside_dir = workspace
.parent()
.expect("workspace should have parent")
.join(unique_id("symlink-outside"));
fs::create_dir_all(&outside_dir).expect("create outside dir");
let outside_file = outside_dir.join("1234567890abcdef.blob");
fs::write(&outside_file, b"outside-data").expect("write outside file");
let symlink_path = workspace.join("aaaaaaaaaaaaaaaa.blob");
symlink(&outside_file, &symlink_path).expect("create symlink");
let token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", symlink_path.display());
let err = cache
.read_binary_file(&universal_id, &token)
.expect_err("symlink escaping cache dir should be rejected");
assert_eq!(err.kind(), ErrorKind::PermissionDenied);
cache.invalidate_workspace(&universal_id);
let _ = fs::remove_dir_all(outside_dir);
}
#[test]
fn cache_blob_evicts_lru_entry_and_deletes_file() {
let (cache, universal_id, _workspace) = setup_cache("lru-eviction");
let mut first_path = None;
for i in 0..=MOBILE_BLOB_CACHE_CAPACITY {
let key = format!("blob-{i}");
let blob = build_blob(&key, vec![i as u8]);
let cached = cache.cache_blob(&universal_id, &blob).expect("cache blob");
if i == 0 {
first_path = Some(token_path(&cached.data));
}
}
let first_path = first_path.expect("first path should exist");
assert!(!first_path.exists(), "evicted blob file should be deleted");
assert!(cache.get_blob(&universal_id, "blob-0").is_none());
assert!(cache.get_blob(&universal_id, "blob-1").is_some());
cache.invalidate_workspace(&universal_id);
}
#[test]
fn invalidate_workspace_removes_cached_files_and_workspace_dir() {
let (cache, universal_id, workspace) = setup_cache("invalidate");
let cached_blob = cache
.cache_blob(&universal_id, &build_blob("blob", vec![1, 2, 3]))
.expect("cache blob");
let blob_path = token_path(&cached_blob.data);
let doc_token = cache
.cache_doc_bin(&universal_id, "doc", 123, b"doc-bytes")
.expect("cache doc bin");
let doc_path = token_path(&doc_token);
assert!(blob_path.exists());
assert!(doc_path.exists());
cache.invalidate_workspace(&universal_id);
assert!(!blob_path.exists());
assert!(!doc_path.exists());
assert!(!workspace.exists());
assert!(
!cache
.workspace_dirs
.lock()
.expect("workspace cache lock poisoned")
.contains_key(&universal_id)
);
}
#[test]
fn read_binary_file_returns_not_found_for_missing_file() {
let (cache, universal_id, _workspace) = setup_cache("missing-file");
let cached_blob = cache
.cache_blob(&universal_id, &build_blob("blob", vec![9, 8, 7]))
.expect("cache blob");
let path = token_path(&cached_blob.data);
fs::remove_file(&path).expect("remove cached file");
let err = cache
.read_binary_file(&universal_id, &cached_blob.data)
.expect_err("missing file should error");
assert_eq!(err.kind(), ErrorKind::NotFound);
cache.invalidate_workspace(&universal_id);
}
#[cfg(unix)]
#[test]
fn read_binary_file_returns_permission_denied_for_unreadable_file() {
let (cache, universal_id, workspace) = setup_cache("permissions");
let file_path = workspace.join("1234567890abcdef.blob");
fs::write(&file_path, b"secret").expect("write file");
let mut permissions = fs::metadata(&file_path).expect("read metadata").permissions();
permissions.set_mode(0o000);
fs::set_permissions(&file_path, permissions).expect("set restrictive permissions");
let token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", file_path.display());
let err = cache
.read_binary_file(&universal_id, &token)
.expect_err("unreadable file should error");
assert_eq!(err.kind(), ErrorKind::PermissionDenied);
let mut restore = fs::metadata(&file_path).expect("read metadata").permissions();
restore.set_mode(0o600);
let _ = fs::set_permissions(&file_path, restore);
cache.invalidate_workspace(&universal_id);
}
#[test]
fn concurrent_cache_and_read_is_consistent() {
let cache = Arc::new(MobileBlobCache::new());
let universal_id = Arc::new(unique_id("concurrent"));
cache
.register_workspace(universal_id.as_str(), ":memory:")
.expect("register workspace");
let workers = 8;
let iterations = 24;
let mut handles = Vec::with_capacity(workers);
for worker in 0..workers {
let cache = Arc::clone(&cache);
let universal_id = Arc::clone(&universal_id);
handles.push(thread::spawn(move || {
for i in 0..iterations {
let key = format!("blob-{worker}-{i}");
let data = vec![worker as u8, i as u8, 42];
let blob = build_blob(&key, data.clone());
let cached = cache
.cache_blob(universal_id.as_str(), &blob)
.expect("cache blob in worker");
let read_back = cache
.read_binary_file(universal_id.as_str(), &cached.data)
.expect("read cached blob");
assert_eq!(read_back, data);
assert!(cache.get_blob(universal_id.as_str(), &key).is_some());
}
}));
}
for handle in handles {
handle.join().expect("worker thread should succeed");
}
cache.invalidate_workspace(universal_id.as_str());
}
}

View File

@@ -106,7 +106,7 @@ impl DocStoragePool {
})
}
async fn get(&self, universal_id: String) -> Result<Ref<'_, SqliteDocStorage>> {
async fn get(&self, universal_id: String) -> Result<Ref<SqliteDocStorage>> {
Ok(self.pool.get(universal_id).await?)
}

View File

@@ -1,92 +1,121 @@
use core::ops::{Deref, DerefMut};
use std::collections::hash_map::{Entry, HashMap};
use core::ops::Deref;
use std::{
collections::hash_map::{Entry, HashMap},
sync::Arc,
};
use tokio::sync::{RwLock, RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard};
use tokio::sync::RwLock;
use super::{
error::{Error, Result},
storage::SqliteDocStorage,
};
pub struct Ref<'a, V> {
_guard: RwLockReadGuard<'a, V>,
pub struct Ref<V> {
inner: Arc<V>,
}
impl<V> Deref for Ref<'_, V> {
impl<V> Deref for Ref<V> {
type Target = V;
fn deref(&self) -> &Self::Target {
self._guard.deref()
}
}
pub struct RefMut<'a, V> {
_guard: RwLockMappedWriteGuard<'a, V>,
}
impl<V> Deref for RefMut<'_, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
&self._guard
}
}
impl<V> DerefMut for RefMut<'_, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self._guard
self.inner.deref()
}
}
#[derive(Default)]
pub struct SqliteDocStoragePool {
inner: RwLock<HashMap<String, SqliteDocStorage>>,
inner: RwLock<HashMap<String, StorageState>>,
}
enum StorageState {
Connecting(Arc<SqliteDocStorage>),
Connected(Arc<SqliteDocStorage>),
}
impl SqliteDocStoragePool {
async fn get_or_create_storage<'a>(&'a self, universal_id: String, path: &str) -> RefMut<'a, SqliteDocStorage> {
let lock = RwLockWriteGuard::map(self.inner.write().await, |lock| match lock.entry(universal_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let storage = SqliteDocStorage::new(path.to_string());
entry.insert(storage)
}
});
RefMut { _guard: lock }
}
pub async fn get(&self, universal_id: String) -> Result<Ref<'_, SqliteDocStorage>> {
let lock = RwLockReadGuard::try_map(self.inner.read().await, |lock| {
if let Some(storage) = lock.get(&universal_id) {
Some(storage)
} else {
None
}
});
match lock {
Ok(guard) => Ok(Ref { _guard: guard }),
Err(_) => Err(Error::InvalidOperation),
}
pub async fn get(&self, universal_id: String) -> Result<Ref<SqliteDocStorage>> {
let lock = self.inner.read().await;
let Some(state) = lock.get(&universal_id) else {
return Err(Error::InvalidOperation);
};
let StorageState::Connected(storage) = state else {
return Err(Error::InvalidOperation);
};
Ok(Ref {
inner: Arc::clone(storage),
})
}
/// Initialize the database and run migrations.
pub async fn connect(&self, universal_id: String, path: String) -> Result<()> {
let storage = self.get_or_create_storage(universal_id.to_owned(), &path).await;
let storage = {
let mut lock = self.inner.write().await;
match lock.entry(universal_id.clone()) {
Entry::Occupied(entry) => match entry.get() {
StorageState::Connected(_) => return Ok(()),
StorageState::Connecting(_) => return Err(Error::InvalidOperation),
},
Entry::Vacant(entry) => {
let storage = Arc::new(SqliteDocStorage::new(path));
entry.insert(StorageState::Connecting(Arc::clone(&storage)));
storage
}
}
};
storage.connect().await?;
Ok(())
}
if let Err(err) = storage.connect().await {
let mut lock = self.inner.write().await;
if matches!(
lock.get(&universal_id),
Some(StorageState::Connecting(existing)) if Arc::ptr_eq(existing, &storage)
) {
lock.remove(&universal_id);
}
return Err(err);
}
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
let mut lock = self.inner.write().await;
let mut transitioned = false;
{
let mut lock = self.inner.write().await;
if matches!(
lock.get(&universal_id),
Some(StorageState::Connecting(existing)) if Arc::ptr_eq(existing, &storage)
) {
lock.insert(universal_id, StorageState::Connected(Arc::clone(&storage)));
transitioned = true;
}
}
if let Entry::Occupied(entry) = lock.entry(universal_id) {
let storage = entry.remove();
if !transitioned {
storage.close().await;
return Err(Error::InvalidOperation);
}
Ok(())
}
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
let storage = {
let mut lock = self.inner.write().await;
match lock.get(&universal_id) {
None => return Ok(()),
Some(StorageState::Connecting(_)) => return Err(Error::InvalidOperation),
Some(StorageState::Connected(storage)) => {
// Prevent shutting down the shared storage while requests still hold refs.
if Arc::strong_count(storage) > 1 {
return Err(Error::InvalidOperation);
}
}
}
let Some(StorageState::Connected(storage)) = lock.remove(&universal_id) else {
return Err(Error::InvalidOperation);
};
storage
};
storage.close().await;
Ok(())
}
}

View File

@@ -41,7 +41,7 @@
"build": "napi build -p affine_native --platform --release",
"build:debug": "napi build -p affine_native --platform",
"universal": "napi universal",
"test": "ava",
"test": "ava --no-worker-threads --concurrency=2",
"version": "napi version"
},
"version": "0.26.1"

View File

@@ -1248,6 +1248,7 @@ export const PackageList = [
'packages/frontend/core',
'packages/common/env',
'packages/frontend/i18n',
'packages/frontend/apps/mobile-shared',
'packages/common/nbstore',
'packages/frontend/track',
'blocksuite/affine/all',
@@ -1290,6 +1291,7 @@ export const PackageList = [
'packages/common/env',
'packages/common/graphql',
'packages/frontend/i18n',
'packages/frontend/apps/mobile-shared',
'packages/common/nbstore',
'packages/frontend/track',
'blocksuite/affine/all',
@@ -1313,6 +1315,11 @@ export const PackageList = [
'packages/common/infra',
],
},
{
location: 'packages/frontend/apps/mobile-shared',
name: '@affine/mobile-shared',
workspaceDependencies: ['packages/frontend/core'],
},
{
location: 'packages/frontend/apps/web',
name: '@affine/web',
@@ -1593,6 +1600,7 @@ export type PackageName =
| '@affine/electron-renderer'
| '@affine/ios'
| '@affine/mobile'
| '@affine/mobile-shared'
| '@affine/web'
| '@affine/component'
| '@affine/core'

View File

@@ -139,6 +139,7 @@
{ "path": "./packages/frontend/apps/electron-renderer" },
{ "path": "./packages/frontend/apps/ios" },
{ "path": "./packages/frontend/apps/mobile" },
{ "path": "./packages/frontend/apps/mobile-shared" },
{ "path": "./packages/frontend/apps/web" },
{ "path": "./packages/frontend/component" },
{ "path": "./packages/frontend/core" },

View File

@@ -255,6 +255,7 @@ __metadata:
"@affine/core": "workspace:*"
"@affine/env": "workspace:*"
"@affine/i18n": "workspace:*"
"@affine/mobile-shared": "workspace:*"
"@affine/nbstore": "workspace:*"
"@affine/track": "workspace:*"
"@blocksuite/affine": "workspace:*"
@@ -703,6 +704,7 @@ __metadata:
"@affine/env": "workspace:*"
"@affine/graphql": "workspace:*"
"@affine/i18n": "workspace:*"
"@affine/mobile-shared": "workspace:*"
"@affine/native": "workspace:*"
"@affine/nbstore": "workspace:*"
"@affine/track": "workspace:*"
@@ -764,6 +766,16 @@ __metadata:
languageName: unknown
linkType: soft
"@affine/mobile-shared@workspace:*, @affine/mobile-shared@workspace:packages/frontend/apps/mobile-shared":
version: 0.0.0-use.local
resolution: "@affine/mobile-shared@workspace:packages/frontend/apps/mobile-shared"
dependencies:
"@affine/core": "workspace:*"
"@capacitor/core": "npm:^7.0.0"
typescript: "npm:^5.7.2"
languageName: unknown
linkType: soft
"@affine/mobile@workspace:packages/frontend/apps/mobile":
version: 0.0.0-use.local
resolution: "@affine/mobile@workspace:packages/frontend/apps/mobile"