feat(native): sync yocto code (#14243)

#### PR Dependency Tree


* **PR #14243** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
  * Batch management API for coordinated document mutations and change
tracking.
  * New document accessors (IDs, state snapshots, change/delete set
queries) and subscriber count.

* **Chores**
  * Upgraded Rust edition across packages to 2024.
  * Repository-wide formatting, stylistic cleanups and test adjustments.

* **Breaking Changes**
  * Removed the Node native bindings package and its JS/TS declarations
and tests (no longer published/available).

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-01-11 06:08:33 +08:00
committed by GitHub
parent d515d295ce
commit ca2462f987
143 changed files with 1396 additions and 4841 deletions

View File

@@ -1,6 +1,6 @@
use std::ops::Deref;
use super::{error::Result, storage::SqliteDocStorage, Blob, ListedBlob, SetBlob};
use super::{Blob, ListedBlob, SetBlob, error::Result, storage::SqliteDocStorage};
impl SqliteDocStorage {
pub async fn get_blob(&self, key: String) -> Result<Option<Blob>> {
@@ -60,8 +60,7 @@ impl SqliteDocStorage {
pub async fn list_blobs(&self) -> Result<Vec<ListedBlob>> {
let result = sqlx::query_as!(
ListedBlob,
"SELECT key, size, mime, created_at FROM blobs WHERE deleted_at IS NULL ORDER BY created_at \
DESC;"
"SELECT key, size, mime, created_at FROM blobs WHERE deleted_at IS NULL ORDER BY created_at DESC;"
)
.fetch_all(&self.pool)
.await?;
@@ -102,18 +101,12 @@ mod tests {
assert!(result.is_some());
storage
.delete_blob("test_".to_string(), false)
.await
.unwrap();
storage.delete_blob("test_".to_string(), false).await.unwrap();
let result = storage.get_blob("test".to_string()).await.unwrap();
assert!(result.is_none());
storage
.delete_blob("test_2".to_string(), true)
.await
.unwrap();
storage.delete_blob("test_2".to_string(), true).await.unwrap();
let result = storage.get_blob("test".to_string()).await.unwrap();
assert!(result.is_none());
@@ -146,15 +139,9 @@ mod tests {
vec!["test_1", "test_2", "test_3", "test_4"]
);
storage
.delete_blob("test_2".to_string(), false)
.await
.unwrap();
storage.delete_blob("test_2".to_string(), false).await.unwrap();
storage
.delete_blob("test_3".to_string(), true)
.await
.unwrap();
storage.delete_blob("test_3".to_string(), true).await.unwrap();
let query = sqlx::query("SELECT COUNT(*) as len FROM blobs;")
.fetch_one(&storage.pool)
@@ -186,10 +173,7 @@ mod tests {
.unwrap();
}
storage
.delete_blob("test_2".to_string(), false)
.await
.unwrap();
storage.delete_blob("test_2".to_string(), false).await.unwrap();
storage.release_blobs().await.unwrap();
let query = sqlx::query("SELECT COUNT(*) as len FROM blobs;")

View File

@@ -25,11 +25,7 @@ impl SqliteDocStorage {
Ok(())
}
pub async fn get_blob_uploaded_at(
&self,
peer: String,
blob_id: String,
) -> Result<Option<NaiveDateTime>> {
pub async fn get_blob_uploaded_at(&self, peer: String, blob_id: String) -> Result<Option<NaiveDateTime>> {
let result = sqlx::query_scalar!(
"SELECT uploaded_at FROM peer_blob_sync WHERE peer = ? AND blob_id = ?",
peer,

View File

@@ -3,7 +3,7 @@ use std::ops::Deref;
use chrono::{DateTime, NaiveDateTime};
use sqlx::{QueryBuilder, Row};
use super::{error::Result, storage::SqliteDocStorage, DocClock, DocRecord, DocUpdate};
use super::{DocClock, DocRecord, DocUpdate, error::Result, storage::SqliteDocStorage};
struct Meta {
space_id: String,
@@ -65,11 +65,7 @@ impl SqliteDocStorage {
Ok(())
}
pub async fn push_update<Update: AsRef<[u8]>>(
&self,
doc_id: String,
update: Update,
) -> Result<NaiveDateTime> {
pub async fn push_update<Update: AsRef<[u8]>>(&self, doc_id: String, update: Update) -> Result<NaiveDateTime> {
let mut timestamp = DateTime::from_timestamp_millis(chrono::Utc::now().timestamp_millis())
.unwrap()
.naive_utc();
@@ -171,11 +167,7 @@ impl SqliteDocStorage {
Ok(result)
}
pub async fn mark_updates_merged(
&self,
doc_id: String,
updates: Vec<NaiveDateTime>,
) -> Result<u32> {
pub async fn mark_updates_merged(&self, doc_id: String, updates: Vec<NaiveDateTime>) -> Result<u32> {
let mut qb = QueryBuilder::new("DELETE FROM updates");
qb.push(" WHERE doc_id = ");
@@ -297,10 +289,7 @@ mod tests {
let storage = get_storage().await;
storage.set_space_id("test".to_string()).await.unwrap();
storage
.push_update("test".to_string(), vec![0, 0])
.await
.unwrap();
storage.push_update("test".to_string(), vec![0, 0]).await.unwrap();
storage
.set_doc_snapshot(DocRecord {
doc_id: "test".to_string(),
@@ -311,11 +300,7 @@ mod tests {
.unwrap();
storage
.set_peer_pulled_remote_clock(
"remote".to_string(),
"test".to_string(),
Utc::now().naive_utc(),
)
.set_peer_pulled_remote_clock("remote".to_string(), "test".to_string(), Utc::now().naive_utc())
.await
.unwrap();
@@ -344,10 +329,7 @@ mod tests {
assert_eq!(updates.len(), 1);
let snapshot = storage
.get_doc_snapshot("new_id".to_string())
.await
.unwrap();
let snapshot = storage.get_doc_snapshot("new_id".to_string()).await.unwrap();
assert!(snapshot.is_some());
}
@@ -359,19 +341,13 @@ mod tests {
let updates = vec![vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]];
for update in updates.iter() {
storage
.push_update("test".to_string(), update)
.await
.unwrap();
storage.push_update("test".to_string(), update).await.unwrap();
}
let result = storage.get_doc_updates("test".to_string()).await.unwrap();
assert_eq!(result.len(), 4);
assert_eq!(
result.iter().map(|u| u.bin.to_vec()).collect::<Vec<_>>(),
updates
);
assert_eq!(result.iter().map(|u| u.bin.to_vec()).collect::<Vec<_>>(), updates);
}
#[tokio::test]
@@ -439,10 +415,7 @@ mod tests {
assert_eq!(clocks.len(), 0);
for i in 1..5u32 {
storage
.push_update(format!("test_{i}"), vec![0, 0])
.await
.unwrap();
storage.push_update(format!("test_{i}"), vec![0, 0]).await.unwrap();
}
let clocks = storage.get_doc_clocks(None).await.unwrap();
@@ -453,10 +426,7 @@ mod tests {
vec!["test_1", "test_2", "test_3", "test_4"]
);
let clocks = storage
.get_doc_clocks(Some(Utc::now().naive_utc()))
.await
.unwrap();
let clocks = storage.get_doc_clocks(Some(Utc::now().naive_utc())).await.unwrap();
assert_eq!(clocks.len(), 0);
@@ -473,10 +443,7 @@ mod tests {
let updates = [vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]];
for update in updates.iter() {
storage
.push_update("test".to_string(), update)
.await
.unwrap();
storage.push_update("test".to_string(), update).await.unwrap();
}
let updates = storage.get_doc_updates("test".to_string()).await.unwrap();
@@ -484,11 +451,7 @@ mod tests {
let result = storage
.mark_updates_merged(
"test".to_string(),
updates
.iter()
.skip(1)
.map(|u| u.timestamp)
.collect::<Vec<_>>(),
updates.iter().skip(1).map(|u| u.timestamp).collect::<Vec<_>>(),
)
.await
.unwrap();

View File

@@ -1,6 +1,6 @@
use chrono::NaiveDateTime;
use super::{error::Result, storage::SqliteDocStorage, DocClock};
use super::{DocClock, error::Result, storage::SqliteDocStorage};
impl SqliteDocStorage {
pub async fn get_peer_remote_clocks(&self, peer: String) -> Result<Vec<DocClock>> {
@@ -15,11 +15,7 @@ impl SqliteDocStorage {
Ok(result)
}
pub async fn get_peer_remote_clock(
&self,
peer: String,
doc_id: String,
) -> Result<Option<DocClock>> {
pub async fn get_peer_remote_clock(&self, peer: String, doc_id: String) -> Result<Option<DocClock>> {
let result = sqlx::query_as!(
DocClock,
"SELECT doc_id, remote_clock as timestamp FROM peer_clocks WHERE peer = ? AND doc_id = ?",
@@ -32,12 +28,7 @@ impl SqliteDocStorage {
Ok(result)
}
pub async fn set_peer_remote_clock(
&self,
peer: String,
doc_id: String,
clock: NaiveDateTime,
) -> Result<()> {
pub async fn set_peer_remote_clock(&self, peer: String, doc_id: String, clock: NaiveDateTime) -> Result<()> {
sqlx::query(
r#"
INSERT INTO peer_clocks (peer, doc_id, remote_clock)
@@ -66,11 +57,7 @@ impl SqliteDocStorage {
Ok(result)
}
pub async fn get_peer_pulled_remote_clock(
&self,
peer: String,
doc_id: String,
) -> Result<Option<DocClock>> {
pub async fn get_peer_pulled_remote_clock(&self, peer: String, doc_id: String) -> Result<Option<DocClock>> {
let result = sqlx::query_as!(
DocClock,
r#"SELECT doc_id, pulled_remote_clock as timestamp FROM peer_clocks WHERE peer = ? AND doc_id = ?"#,
@@ -83,12 +70,7 @@ impl SqliteDocStorage {
Ok(result)
}
pub async fn set_peer_pulled_remote_clock(
&self,
peer: String,
doc_id: String,
clock: NaiveDateTime,
) -> Result<()> {
pub async fn set_peer_pulled_remote_clock(&self, peer: String, doc_id: String, clock: NaiveDateTime) -> Result<()> {
sqlx::query(
r#"
INSERT INTO peer_clocks (peer, doc_id, pulled_remote_clock)
@@ -117,11 +99,7 @@ impl SqliteDocStorage {
Ok(result)
}
pub async fn get_peer_pushed_clock(
&self,
peer: String,
doc_id: String,
) -> Result<Option<DocClock>> {
pub async fn get_peer_pushed_clock(&self, peer: String, doc_id: String) -> Result<Option<DocClock>> {
let result = sqlx::query_as!(
DocClock,
"SELECT doc_id, pushed_clock as timestamp FROM peer_clocks WHERE peer = ? AND doc_id = ?",
@@ -134,12 +112,7 @@ impl SqliteDocStorage {
Ok(result)
}
pub async fn set_peer_pushed_clock(
&self,
peer: String,
doc_id: String,
clock: NaiveDateTime,
) -> Result<()> {
pub async fn set_peer_pushed_clock(&self, peer: String, doc_id: String, clock: NaiveDateTime) -> Result<()> {
sqlx::query(
r#"
INSERT INTO peer_clocks (peer, doc_id, pushed_clock)
@@ -157,9 +130,7 @@ impl SqliteDocStorage {
}
pub async fn clear_clocks(&self) -> Result<()> {
sqlx::query("DELETE FROM peer_clocks;")
.execute(&self.pool)
.await?;
sqlx::query("DELETE FROM peer_clocks;").execute(&self.pool).await?;
Ok(())
}

View File

@@ -1,4 +1,4 @@
use affine_common::doc_parser::{parse_doc_from_binary, BlockInfo, CrawlResult, ParseError};
use affine_common::doc_parser::{BlockInfo, CrawlResult, ParseError, parse_doc_from_binary};
use memory_indexer::{SearchHit, SnapshotData};
use napi_derive::napi;
use serde::Serialize;
@@ -94,10 +94,7 @@ impl From<(u32, u32)> for NativeMatch {
impl SqliteDocStorage {
pub async fn crawl_doc_data(&self, doc_id: &str) -> Result<NativeCrawlResult> {
let doc_bin = self
.load_doc_binary(doc_id)
.await?
.ok_or(ParseError::DocNotFound)?;
let doc_bin = self.load_doc_binary(doc_id).await?.ok_or(ParseError::DocNotFound)?;
let result = parse_doc_from_binary(doc_bin, doc_id.to_string())?;
Ok(result.into())
@@ -113,8 +110,7 @@ impl SqliteDocStorage {
updates.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
let mut segments =
Vec::with_capacity(snapshot.as_ref().map(|_| 1).unwrap_or(0) + updates.len());
let mut segments = Vec::with_capacity(snapshot.as_ref().map(|_| 1).unwrap_or(0) + updates.len());
if let Some(record) = snapshot {
segments.push(record.bin.to_vec());
}
@@ -134,12 +130,10 @@ impl SqliteDocStorage {
for row in snapshots {
let index_name: String = row.get("index_name");
let data: Vec<u8> = row.get("data");
if let Ok(decompressed) = zstd::stream::decode_all(std::io::Cursor::new(&data)) {
if let Ok((snapshot, _)) =
bincode::serde::decode_from_slice::<SnapshotData, _>(&decompressed, config)
{
index.load_snapshot(&index_name, snapshot);
}
if let Ok(decompressed) = zstd::stream::decode_all(std::io::Cursor::new(&data))
&& let Ok((snapshot, _)) = bincode::serde::decode_from_slice::<SnapshotData, _>(&decompressed, config)
{
index.load_snapshot(&index_name, snapshot);
}
}
}
@@ -156,8 +150,8 @@ impl SqliteDocStorage {
if let Some(data) = snapshot_data {
let blob = bincode::serde::encode_to_vec(&data, bincode::config::standard())
.map_err(|e| Error::Serialization(e.to_string()))?;
let compressed = zstd::stream::encode_all(std::io::Cursor::new(&blob), 4)
.map_err(|e| Error::Serialization(e.to_string()))?;
let compressed =
zstd::stream::encode_all(std::io::Cursor::new(&blob), 4).map_err(|e| Error::Serialization(e.to_string()))?;
let mut tx = self.pool.begin().await?;
@@ -201,13 +195,7 @@ impl SqliteDocStorage {
memory_indexer::InMemoryIndex::snapshot_version()
}
pub async fn fts_add(
&self,
index_name: &str,
doc_id: &str,
text: &str,
index: bool,
) -> Result<()> {
pub async fn fts_add(&self, index_name: &str, doc_id: &str, text: &str, index: bool) -> Result<()> {
let mut idx = self.index.write().await;
idx.add_doc(index_name, doc_id, text, index);
Ok(())
@@ -226,21 +214,10 @@ impl SqliteDocStorage {
pub async fn fts_search(&self, index_name: &str, query: &str) -> Result<Vec<NativeSearchHit>> {
let idx = self.index.read().await;
Ok(
idx
.search_hits(index_name, query)
.into_iter()
.map(Into::into)
.collect(),
)
Ok(idx.search_hits(index_name, query).into_iter().map(Into::into).collect())
}
pub async fn fts_get_matches(
&self,
index_name: &str,
doc_id: &str,
query: &str,
) -> Result<Vec<NativeMatch>> {
pub async fn fts_get_matches(&self, index_name: &str, doc_id: &str, query: &str) -> Result<Vec<NativeMatch>> {
let idx = self.index.read().await;
Ok(
idx

View File

@@ -1,6 +1,6 @@
use chrono::NaiveDateTime;
use super::{error::Result, storage::SqliteDocStorage, DocIndexedClock};
use super::{DocIndexedClock, error::Result, storage::SqliteDocStorage};
impl SqliteDocStorage {
pub async fn get_doc_indexed_clock(&self, doc_id: String) -> Result<Option<DocIndexedClock>> {
@@ -70,10 +70,7 @@ mod tests {
let storage = get_storage().await;
let ts = Utc::now().naive_utc();
storage
.set_doc_indexed_clock("doc1".to_string(), ts, 1)
.await
.unwrap();
storage.set_doc_indexed_clock("doc1".to_string(), ts, 1).await.unwrap();
let clock = storage
.get_doc_indexed_clock("doc1".to_string())
@@ -91,20 +88,11 @@ mod tests {
let storage = get_storage().await;
let ts = Utc::now().naive_utc();
storage
.set_doc_indexed_clock("doc1".to_string(), ts, 1)
.await
.unwrap();
storage.set_doc_indexed_clock("doc1".to_string(), ts, 1).await.unwrap();
storage
.clear_doc_indexed_clock("doc1".to_string())
.await
.unwrap();
storage.clear_doc_indexed_clock("doc1".to_string()).await.unwrap();
let clock = storage
.get_doc_indexed_clock("doc1".to_string())
.await
.unwrap();
let clock = storage.get_doc_indexed_clock("doc1".to_string()).await.unwrap();
assert!(clock.is_none());
}

View File

@@ -128,16 +128,8 @@ impl DocStoragePool {
}
#[napi]
pub async fn crawl_doc_data(
&self,
universal_id: String,
doc_id: String,
) -> Result<indexer::NativeCrawlResult> {
let result = self
.get(universal_id)
.await?
.crawl_doc_data(&doc_id)
.await?;
pub async fn crawl_doc_data(&self, universal_id: String, doc_id: String) -> Result<indexer::NativeCrawlResult> {
let result = self.get(universal_id).await?.crawl_doc_data(&doc_id).await?;
Ok(result)
}
@@ -148,60 +140,23 @@ impl DocStoragePool {
}
#[napi]
pub async fn push_update(
&self,
universal_id: String,
doc_id: String,
update: Uint8Array,
) -> Result<NaiveDateTime> {
Ok(
self
.get(universal_id)
.await?
.push_update(doc_id, update)
.await?,
)
pub async fn push_update(&self, universal_id: String, doc_id: String, update: Uint8Array) -> Result<NaiveDateTime> {
Ok(self.get(universal_id).await?.push_update(doc_id, update).await?)
}
#[napi]
pub async fn get_doc_snapshot(
&self,
universal_id: String,
doc_id: String,
) -> Result<Option<DocRecord>> {
Ok(
self
.get(universal_id)
.await?
.get_doc_snapshot(doc_id)
.await?,
)
pub async fn get_doc_snapshot(&self, universal_id: String, doc_id: String) -> Result<Option<DocRecord>> {
Ok(self.get(universal_id).await?.get_doc_snapshot(doc_id).await?)
}
#[napi]
pub async fn set_doc_snapshot(&self, universal_id: String, snapshot: DocRecord) -> Result<bool> {
Ok(
self
.get(universal_id)
.await?
.set_doc_snapshot(snapshot)
.await?,
)
Ok(self.get(universal_id).await?.set_doc_snapshot(snapshot).await?)
}
#[napi]
pub async fn get_doc_updates(
&self,
universal_id: String,
doc_id: String,
) -> Result<Vec<DocUpdate>> {
Ok(
self
.get(universal_id)
.await?
.get_doc_updates(doc_id)
.await?,
)
pub async fn get_doc_updates(&self, universal_id: String, doc_id: String) -> Result<Vec<DocUpdate>> {
Ok(self.get(universal_id).await?.get_doc_updates(doc_id).await?)
}
#[napi]
@@ -227,36 +182,18 @@ impl DocStoragePool {
}
#[napi]
pub async fn get_doc_clocks(
&self,
universal_id: String,
after: Option<NaiveDateTime>,
) -> Result<Vec<DocClock>> {
pub async fn get_doc_clocks(&self, universal_id: String, after: Option<NaiveDateTime>) -> Result<Vec<DocClock>> {
Ok(self.get(universal_id).await?.get_doc_clocks(after).await?)
}
#[napi]
pub async fn get_doc_clock(
&self,
universal_id: String,
doc_id: String,
) -> Result<Option<DocClock>> {
pub async fn get_doc_clock(&self, universal_id: String, doc_id: String) -> Result<Option<DocClock>> {
Ok(self.get(universal_id).await?.get_doc_clock(doc_id).await?)
}
#[napi]
pub async fn get_doc_indexed_clock(
&self,
universal_id: String,
doc_id: String,
) -> Result<Option<DocIndexedClock>> {
Ok(
self
.get(universal_id)
.await?
.get_doc_indexed_clock(doc_id)
.await?,
)
pub async fn get_doc_indexed_clock(&self, universal_id: String, doc_id: String) -> Result<Option<DocIndexedClock>> {
Ok(self.get(universal_id).await?.get_doc_indexed_clock(doc_id).await?)
}
#[napi]
@@ -277,11 +214,7 @@ impl DocStoragePool {
#[napi]
pub async fn clear_doc_indexed_clock(&self, universal_id: String, doc_id: String) -> Result<()> {
self
.get(universal_id)
.await?
.clear_doc_indexed_clock(doc_id)
.await?;
self.get(universal_id).await?.clear_doc_indexed_clock(doc_id).await?;
Ok(())
}
@@ -297,17 +230,8 @@ impl DocStoragePool {
}
#[napi]
pub async fn delete_blob(
&self,
universal_id: String,
key: String,
permanently: bool,
) -> Result<()> {
self
.get(universal_id)
.await?
.delete_blob(key, permanently)
.await?;
pub async fn delete_blob(&self, universal_id: String, key: String, permanently: bool) -> Result<()> {
self.get(universal_id).await?.delete_blob(key, permanently).await?;
Ok(())
}
@@ -323,18 +247,8 @@ impl DocStoragePool {
}
#[napi]
pub async fn get_peer_remote_clocks(
&self,
universal_id: String,
peer: String,
) -> Result<Vec<DocClock>> {
Ok(
self
.get(universal_id)
.await?
.get_peer_remote_clocks(peer)
.await?,
)
pub async fn get_peer_remote_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
Ok(self.get(universal_id).await?.get_peer_remote_clocks(peer).await?)
}
#[napi]
@@ -370,11 +284,7 @@ impl DocStoragePool {
}
#[napi]
pub async fn get_peer_pulled_remote_clocks(
&self,
universal_id: String,
peer: String,
) -> Result<Vec<DocClock>> {
pub async fn get_peer_pulled_remote_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
Ok(
self
.get(universal_id)
@@ -417,18 +327,8 @@ impl DocStoragePool {
}
#[napi]
pub async fn get_peer_pushed_clocks(
&self,
universal_id: String,
peer: String,
) -> Result<Vec<DocClock>> {
Ok(
self
.get(universal_id)
.await?
.get_peer_pushed_clocks(peer)
.await?,
)
pub async fn get_peer_pushed_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
Ok(self.get(universal_id).await?.get_peer_pushed_clocks(peer).await?)
}
#[napi]
@@ -528,24 +428,14 @@ impl DocStoragePool {
}
#[napi]
pub async fn fts_delete_document(
&self,
id: String,
index_name: String,
doc_id: String,
) -> Result<()> {
pub async fn fts_delete_document(&self, id: String, index_name: String, doc_id: String) -> Result<()> {
let storage = self.pool.get(id).await?;
storage.fts_delete(&index_name, &doc_id).await?;
Ok(())
}
#[napi]
pub async fn fts_get_document(
&self,
id: String,
index_name: String,
doc_id: String,
) -> Result<Option<String>> {
pub async fn fts_get_document(&self, id: String, index_name: String, doc_id: String) -> Result<Option<String>> {
let storage = self.pool.get(id).await?;
Ok(storage.fts_get(&index_name, &doc_id).await?)
}
@@ -570,11 +460,7 @@ impl DocStoragePool {
query: String,
) -> Result<Vec<indexer::NativeMatch>> {
let storage = self.pool.get(id).await?;
Ok(
storage
.fts_get_matches(&index_name, &doc_id, &query)
.await?,
)
Ok(storage.fts_get_matches(&index_name, &doc_id, &query).await?)
}
}

View File

@@ -44,18 +44,12 @@ pub struct SqliteDocStoragePool {
}
impl SqliteDocStoragePool {
async fn get_or_create_storage<'a>(
&'a self,
universal_id: String,
path: &str,
) -> RefMut<'a, SqliteDocStorage> {
let lock = RwLockWriteGuard::map(self.inner.write().await, |lock| {
match lock.entry(universal_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let storage = SqliteDocStorage::new(path.to_string());
entry.insert(storage)
}
async fn get_or_create_storage<'a>(&'a self, universal_id: String, path: &str) -> RefMut<'a, SqliteDocStorage> {
let lock = RwLockWriteGuard::map(self.inner.write().await, |lock| match lock.entry(universal_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let storage = SqliteDocStorage::new(path.to_string());
entry.insert(storage)
}
});
@@ -79,9 +73,7 @@ impl SqliteDocStoragePool {
/// Initialize the database and run migrations.
pub async fn connect(&self, universal_id: String, path: String) -> Result<()> {
let storage = self
.get_or_create_storage(universal_id.to_owned(), &path)
.await;
let storage = self.get_or_create_storage(universal_id.to_owned(), &path).await;
storage.connect().await?;
Ok(())

View File

@@ -3,9 +3,9 @@ use std::sync::Arc;
use affine_schema::get_migrator;
use memory_indexer::InMemoryIndex;
use sqlx::{
Pool, Row,
migrate::MigrateDatabase,
sqlite::{Sqlite, SqliteConnectOptions, SqlitePoolOptions},
Pool, Row,
};
use tokio::sync::RwLock;
@@ -19,9 +19,7 @@ pub struct SqliteDocStorage {
impl SqliteDocStorage {
pub fn new(path: String) -> Self {
let sqlite_options = SqliteConnectOptions::new()
.filename(&path)
.foreign_keys(false);
let sqlite_options = SqliteConnectOptions::new().filename(&path).foreign_keys(false);
let mut pool_options = SqlitePoolOptions::new();
@@ -94,9 +92,7 @@ impl SqliteDocStorage {
/// Flush the WAL file to the database file.
/// See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B
pub async fn checkpoint(&self) -> Result<()> {
sqlx::query("PRAGMA wal_checkpoint(FULL);")
.execute(&self.pool)
.await?;
sqlx::query("PRAGMA wal_checkpoint(FULL);").execute(&self.pool).await?;
Ok(())
}