feat: improve indexing perf with native indexer (#14066)

Fixes #12132, #14006, #13496, #12375

The previous IndexedDB (idb) indexer generated a large number of scattered writes when flushing to disk, causing CPU and disk-write spikes. When the document volume was very large, the accumulated write transactions also caused memory usage to grow continuously.

This PR introduces batched writes to improve write performance on the web side, and adds a native indexer on the Electron side for a much larger performance gain.
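
For illustration, batching here means flushing a set of pending records inside a single IndexedDB transaction instead of opening one transaction per record. A minimal sketch using the `idb` helper library (database/store names are placeholders, not the PR's actual code):

```ts
import { openDB } from 'idb';

// Placeholder database/store names for illustration only.
const db = await openDB('indexer-example', 1, {
  upgrade(db) {
    db.createObjectStore('records', { keyPath: 'id' });
  },
});

// Flush a whole batch of pending records in one readwrite transaction,
// instead of one transaction per record (the scattered-write pattern).
async function flushBatch(records: { id: string; data: unknown }[]) {
  const tx = db.transaction('records', 'readwrite');
  await Promise.all([...records.map(r => tx.store.put(r)), tx.done]);
}
```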


## Summary by CodeRabbit

* **New Features**
  * Full-text search (FTS) added across storage layers and native plugins: indexing, search, document retrieval, match ranges, and index flushing.
  * New SQLite-backed indexer storage, streaming search/aggregate APIs, and in-memory index with node-building and highlighting.

* **Performance**
  * Indexing rewritten for batched, concurrent writes and parallel metadata updates.
  * Search scoring enhanced to consider multiple term positions and aggregated term data.

* **Other**
  * Configurable refresh interval and indexer version bump.

Authored by DarkSky on 2025-12-09 22:04:50 +08:00; committed by GitHub.
Parent 90d0ca847a, commit 215541d331
31 changed files with 2030 additions and 321 deletions


@@ -155,4 +155,33 @@ export interface NbStorePlugin {
id: string;
docId: string;
}) => Promise<CrawlResult>;
ftsAddDocument: (options: {
id: string;
indexName: string;
docId: string;
text: string;
index: boolean;
}) => Promise<void>;
ftsDeleteDocument: (options: {
id: string;
indexName: string;
docId: string;
}) => Promise<void>;
ftsSearch: (options: {
id: string;
indexName: string;
query: string;
}) => Promise<{ id: string; score: number }[]>;
ftsGetDocument: (options: {
id: string;
indexName: string;
docId: string;
}) => Promise<{ text: string | null }>;
ftsGetMatches: (options: {
id: string;
indexName: string;
docId: string;
query: string;
}) => Promise<Array<{ start: number; end: number }>>;
ftsFlushIndex: (options: { id: string }) => Promise<void>;
}
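
For context, a sketch of how a caller might drive this FTS surface; `plugin`, `universalId`, and the index name are placeholders, not code from this PR:

```ts
declare const plugin: NbStorePlugin;
declare const universalId: string;

// Index a document, persist the index, then query and highlight it.
await plugin.ftsAddDocument({
  id: universalId,
  indexName: 'docs',
  docId: 'doc-1',
  text: 'hello full text search',
  index: true,
});
await plugin.ftsFlushIndex({ id: universalId });

const hits = await plugin.ftsSearch({
  id: universalId,
  indexName: 'docs',
  query: 'hello',
}); // [{ id, score }], highest score first

const ranges = await plugin.ftsGetMatches({
  id: universalId,
  indexName: 'docs',
  docId: hits[0].id,
  query: 'hello',
}); // [{ start, end }] offsets for highlighting
```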


@@ -339,4 +339,71 @@ export const NbStoreNativeDBApis: NativeDBApis = {
crawlDocData: async function (id: string, docId: string) {
return NbStore.crawlDocData({ id, docId });
},
ftsAddDocument: async function (
id: string,
indexName: string,
docId: string,
text: string,
index: boolean
): Promise<void> {
await NbStore.ftsAddDocument({
id,
indexName,
docId,
text,
index,
});
},
ftsDeleteDocument: async function (
id: string,
indexName: string,
docId: string
): Promise<void> {
await NbStore.ftsDeleteDocument({
id,
indexName,
docId,
});
},
ftsSearch: async function (
id: string,
indexName: string,
query: string
): Promise<{ id: string; score: number }[]> {
return await NbStore.ftsSearch({
id,
indexName,
query,
});
},
ftsGetDocument: async function (
id: string,
indexName: string,
docId: string
): Promise<string | null> {
const result = await NbStore.ftsGetDocument({
id,
indexName,
docId,
});
return result.text;
},
ftsGetMatches: async function (
id: string,
indexName: string,
docId: string,
query: string
): Promise<{ start: number; end: number }[]> {
return await NbStore.ftsGetMatches({
id,
indexName,
docId,
query,
});
},
ftsFlushIndex: async function (id: string): Promise<void> {
await NbStore.ftsFlushIndex({
id,
});
},
};


@@ -48,4 +48,10 @@ export const nbstoreHandlers: NativeDBApis = {
setBlobUploadedAt: POOL.setBlobUploadedAt.bind(POOL),
getBlobUploadedAt: POOL.getBlobUploadedAt.bind(POOL),
crawlDocData: POOL.crawlDocData.bind(POOL),
ftsAddDocument: POOL.ftsAddDocument.bind(POOL),
ftsDeleteDocument: POOL.ftsDeleteDocument.bind(POOL),
ftsSearch: POOL.ftsSearch.bind(POOL),
ftsGetDocument: POOL.ftsGetDocument.bind(POOL),
ftsGetMatches: POOL.ftsGetMatches.bind(POOL),
ftsFlushIndex: POOL.ftsFlushIndex.bind(POOL),
};


@@ -155,4 +155,33 @@ export interface NbStorePlugin {
id: string;
docId: string;
}) => Promise<CrawlResult>;
ftsAddDocument: (options: {
id: string;
indexName: string;
docId: string;
text: string;
index: boolean;
}) => Promise<void>;
ftsDeleteDocument: (options: {
id: string;
indexName: string;
docId: string;
}) => Promise<void>;
ftsSearch: (options: {
id: string;
indexName: string;
query: string;
}) => Promise<{ id: string; score: number }[]>;
ftsGetDocument: (options: {
id: string;
indexName: string;
docId: string;
}) => Promise<{ text: string | null }>;
ftsGetMatches: (options: {
id: string;
indexName: string;
docId: string;
query: string;
}) => Promise<Array<{ start: number; end: number }>>;
ftsFlushIndex: (options: { id: string }) => Promise<void>;
}


@@ -343,4 +343,71 @@ export const NbStoreNativeDBApis: NativeDBApis = {
): Promise<CrawlResult> {
return await NbStore.crawlDocData({ id, docId });
},
ftsAddDocument: async function (
id: string,
indexName: string,
docId: string,
text: string,
index: boolean
): Promise<void> {
await NbStore.ftsAddDocument({
id,
indexName,
docId,
text,
index,
});
},
ftsDeleteDocument: async function (
id: string,
indexName: string,
docId: string
): Promise<void> {
await NbStore.ftsDeleteDocument({
id,
indexName,
docId,
});
},
ftsSearch: async function (
id: string,
indexName: string,
query: string
): Promise<{ id: string; score: number }[]> {
return await NbStore.ftsSearch({
id,
indexName,
query,
});
},
ftsGetDocument: async function (
id: string,
indexName: string,
docId: string
): Promise<string | null> {
const result = await NbStore.ftsGetDocument({
id,
indexName,
docId,
});
return result.text;
},
ftsGetMatches: async function (
id: string,
indexName: string,
docId: string,
query: string
): Promise<{ start: number; end: number }[]> {
return await NbStore.ftsGetMatches({
id,
indexName,
docId,
query,
});
},
ftsFlushIndex: async function (id: string): Promise<void> {
await NbStore.ftsFlushIndex({
id,
});
},
};


@@ -10,6 +10,7 @@ import {
IndexedDBBlobSyncStorage,
IndexedDBDocStorage,
IndexedDBDocSyncStorage,
IndexedDBIndexerStorage,
} from '@affine/nbstore/idb';
import {
IndexedDBV1BlobStorage,
@@ -20,6 +21,7 @@ import {
SqliteBlobSyncStorage,
SqliteDocStorage,
SqliteDocSyncStorage,
SqliteIndexerStorage,
} from '@affine/nbstore/sqlite';
import {
SqliteV1BlobStorage,
@@ -107,6 +109,9 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS || BUILD_CONFIG.isAndroid
? SqliteBlobSyncStorage
: IndexedDBBlobSyncStorage;
IndexerStorageType = BUILD_CONFIG.isElectron
? SqliteIndexerStorage
: IndexedDBIndexerStorage;
async deleteWorkspace(id: string): Promise<void> {
setLocalWorkspaceIds(ids => ids.filter(x => x !== id));
@@ -351,7 +356,7 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
},
},
indexer: {
name: 'IndexedDBIndexerStorage',
name: this.IndexerStorageType.identifier,
opts: {
flavour: this.flavour,
type: 'workspace',


@@ -82,6 +82,12 @@ export declare class DocStoragePool {
clearClocks(universalId: string): Promise<void>
setBlobUploadedAt(universalId: string, peer: string, blobId: string, uploadedAt?: Date | undefined | null): Promise<void>
getBlobUploadedAt(universalId: string, peer: string, blobId: string): Promise<Date | null>
ftsAddDocument(id: string, indexName: string, docId: string, text: string, index: boolean): Promise<void>
ftsFlushIndex(id: string): Promise<void>
ftsDeleteDocument(id: string, indexName: string, docId: string): Promise<void>
ftsGetDocument(id: string, indexName: string, docId: string): Promise<string | null>
ftsSearch(id: string, indexName: string, query: string): Promise<Array<NativeSearchHit>>
ftsGetMatches(id: string, indexName: string, docId: string, query: string): Promise<Array<NativeMatch>>
}
export interface Blob {
@@ -134,6 +140,16 @@ export interface NativeCrawlResult {
summary: string
}
export interface NativeMatch {
start: number
end: number
}
export interface NativeSearchHit {
id: string
score: number
}
export interface SetBlob {
key: string
data: Uint8Array


@@ -13,9 +13,12 @@ use-as-lib = ["napi-derive/noop", "napi/noop"]
affine_common = { workspace = true, features = ["ydoc-loader"] }
affine_schema = { path = "../schema" }
anyhow = { workspace = true }
bincode = { version = "2.0.1", features = ["serde"] }
chrono = { workspace = true }
jieba-rs = "0.8.1"
napi = { workspace = true }
napi-derive = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sqlx = { workspace = true, default-features = false, features = [
"chrono",
@@ -26,8 +29,10 @@ sqlx = { workspace = true, default-features = false, features = [
"tls-rustls",
] }
thiserror = { workspace = true }
tiniestsegmenter = "0.3"
tokio = { workspace = true, features = ["full"] }
y-octo = { workspace = true }
zstd = "0.13"
[target.'cfg(any(target_os = "ios", target_os = "android"))'.dependencies]
uniffi = { workspace = true }


@@ -10,6 +10,8 @@ pub enum Error {
MigrateError(#[from] sqlx::migrate::MigrateError),
#[error("Invalid operation")]
InvalidOperation,
#[error("Serialization Error: {0}")]
Serialization(String),
#[error(transparent)]
Parse(#[from] ParseError),
}


@@ -1,180 +0,0 @@
use affine_common::doc_parser::{parse_doc_from_binary, BlockInfo, CrawlResult, ParseError};
use napi_derive::napi;
use serde::Serialize;
use y_octo::DocOptions;
use super::{error::Result, storage::SqliteDocStorage};
#[napi(object)]
#[derive(Debug, Serialize)]
pub struct NativeBlockInfo {
pub block_id: String,
pub flavour: String,
pub content: Option<Vec<String>>,
pub blob: Option<Vec<String>>,
pub ref_doc_id: Option<Vec<String>>,
pub ref_info: Option<Vec<String>>,
pub parent_flavour: Option<String>,
pub parent_block_id: Option<String>,
pub additional: Option<String>,
}
#[napi(object)]
#[derive(Debug, Serialize)]
pub struct NativeCrawlResult {
pub blocks: Vec<NativeBlockInfo>,
pub title: String,
pub summary: String,
}
impl From<BlockInfo> for NativeBlockInfo {
fn from(value: BlockInfo) -> Self {
Self {
block_id: value.block_id,
flavour: value.flavour,
content: value.content,
blob: value.blob,
ref_doc_id: value.ref_doc_id,
ref_info: value.ref_info,
parent_flavour: value.parent_flavour,
parent_block_id: value.parent_block_id,
additional: value.additional,
}
}
}
impl From<CrawlResult> for NativeCrawlResult {
fn from(value: CrawlResult) -> Self {
Self {
blocks: value.blocks.into_iter().map(Into::into).collect(),
title: value.title,
summary: value.summary,
}
}
}
impl SqliteDocStorage {
pub async fn crawl_doc_data(&self, doc_id: &str) -> Result<NativeCrawlResult> {
let doc_bin = self
.load_doc_binary(doc_id)
.await?
.ok_or(ParseError::DocNotFound)?;
let result = parse_doc_from_binary(doc_bin, doc_id.to_string())?;
Ok(result.into())
}
async fn load_doc_binary(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
let snapshot = self.get_doc_snapshot(doc_id.to_string()).await?;
let mut updates = self.get_doc_updates(doc_id.to_string()).await?;
if snapshot.is_none() && updates.is_empty() {
return Ok(None);
}
updates.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
let mut segments =
Vec::with_capacity(snapshot.as_ref().map(|_| 1).unwrap_or(0) + updates.len());
if let Some(record) = snapshot {
segments.push(record.bin.to_vec());
}
segments.extend(updates.into_iter().map(|update| update.bin.to_vec()));
merge_updates(segments, doc_id).map(Some)
}
}
fn merge_updates(mut segments: Vec<Vec<u8>>, guid: &str) -> Result<Vec<u8>> {
if segments.is_empty() {
return Err(ParseError::DocNotFound.into());
}
if segments.len() == 1 {
return segments.pop().ok_or(ParseError::DocNotFound.into());
}
let mut doc = DocOptions::new().with_guid(guid.to_string()).build();
for update in segments.iter() {
doc
.apply_update_from_binary_v1(update)
.map_err(|_| ParseError::InvalidBinary)?;
}
let buffer = doc
.encode_update_v1()
.map_err(|err| ParseError::ParserError(err.to_string()))?;
Ok(buffer)
}
#[cfg(test)]
mod tests {
use std::path::{Path, PathBuf};
use affine_common::doc_parser::ParseError;
use chrono::Utc;
use serde_json::Value;
use tokio::fs;
use uuid::Uuid;
use super::{super::error::Error, *};
const DEMO_BIN: &[u8] = include_bytes!("../../../../common/native/fixtures/demo.ydoc");
const DEMO_JSON: &[u8] = include_bytes!("../../../../common/native/fixtures/demo.ydoc.json");
fn temp_workspace_dir() -> PathBuf {
std::env::temp_dir().join(format!("affine-native-{}", Uuid::new_v4()))
}
async fn init_db(path: &Path) -> SqliteDocStorage {
fs::create_dir_all(path.parent().unwrap()).await.unwrap();
let storage = SqliteDocStorage::new(path.to_string_lossy().into_owned());
storage.connect().await.unwrap();
storage
}
async fn cleanup(path: &Path) {
let _ = fs::remove_dir_all(path.parent().unwrap()).await;
}
#[tokio::test]
async fn parse_demo_snapshot_matches_fixture() {
let base = temp_workspace_dir();
fs::create_dir_all(&base).await.unwrap();
let db_path = base.join("storage.db");
let storage = init_db(&db_path).await;
sqlx::query(r#"INSERT INTO snapshots (doc_id, data, updated_at) VALUES (?, ?, ?)"#)
.bind("demo-doc")
.bind(DEMO_BIN)
.bind(Utc::now().naive_utc())
.execute(&storage.pool)
.await
.unwrap();
let result = storage.crawl_doc_data("demo-doc").await.unwrap();
let expected: Value = serde_json::from_slice(DEMO_JSON).unwrap();
let actual = serde_json::to_value(&result).unwrap();
assert_eq!(expected, actual);
storage.close().await;
cleanup(&db_path).await;
}
#[tokio::test]
async fn missing_doc_returns_error() {
let base = temp_workspace_dir();
fs::create_dir_all(&base).await.unwrap();
let db_path = base.join("storage.db");
let storage = init_db(&db_path).await;
let err = storage.crawl_doc_data("absent-doc").await.unwrap_err();
assert!(matches!(err, Error::Parse(ParseError::DocNotFound)));
storage.close().await;
cleanup(&db_path).await;
}
}


@@ -0,0 +1,261 @@
use std::collections::{HashMap, HashSet};
use super::{
tokenizer::tokenize,
types::{DocData, SnapshotData},
};
type DirtyDoc = (String, String, String, i64);
type DeletedDoc = HashMap<String, HashSet<String>>;
#[derive(Default, Debug)]
pub struct InMemoryIndex {
pub docs: HashMap<String, HashMap<String, DocData>>,
pub inverted: HashMap<String, HashMap<String, HashMap<String, i64>>>,
pub total_lens: HashMap<String, i64>,
pub dirty: HashMap<String, HashSet<String>>,
pub deleted: HashMap<String, HashSet<String>>,
}
impl InMemoryIndex {
pub fn add_doc(&mut self, index_name: &str, doc_id: &str, text: &str, index: bool) {
let tokens = if index { tokenize(text) } else { vec![] };
// doc_len should be the number of tokens (including duplicates)
let doc_len = tokens.len() as i64;
let mut pos_map: HashMap<String, Vec<(u32, u32)>> = HashMap::new();
for token in tokens {
pos_map
.entry(token.term)
.or_default()
.push((token.start as u32, token.end as u32));
}
if let Some(docs) = self.docs.get_mut(index_name) {
if let Some(old_data) = docs.remove(doc_id) {
*self.total_lens.entry(index_name.to_string()).or_default() -= old_data.doc_len;
if let Some(inverted) = self.inverted.get_mut(index_name) {
for (term, _) in old_data.term_pos {
if let Some(doc_map) = inverted.get_mut(&term) {
doc_map.remove(doc_id);
if doc_map.is_empty() {
inverted.remove(&term);
}
}
}
}
}
}
let doc_data = DocData {
content: text.to_string(),
doc_len,
term_pos: pos_map.clone(),
};
self
.docs
.entry(index_name.to_string())
.or_default()
.insert(doc_id.to_string(), doc_data);
*self.total_lens.entry(index_name.to_string()).or_default() += doc_len;
let inverted = self.inverted.entry(index_name.to_string()).or_default();
for (term, positions) in pos_map {
inverted
.entry(term)
.or_default()
.insert(doc_id.to_string(), positions.len() as i64);
}
self
.dirty
.entry(index_name.to_string())
.or_default()
.insert(doc_id.to_string());
if let Some(deleted) = self.deleted.get_mut(index_name) {
deleted.remove(doc_id);
}
}
pub fn remove_doc(&mut self, index_name: &str, doc_id: &str) {
if let Some(docs) = self.docs.get_mut(index_name) {
if let Some(old_data) = docs.remove(doc_id) {
*self.total_lens.entry(index_name.to_string()).or_default() -= old_data.doc_len;
if let Some(inverted) = self.inverted.get_mut(index_name) {
for (term, _) in old_data.term_pos {
if let Some(doc_map) = inverted.get_mut(&term) {
doc_map.remove(doc_id);
if doc_map.is_empty() {
inverted.remove(&term);
}
}
}
}
self
.deleted
.entry(index_name.to_string())
.or_default()
.insert(doc_id.to_string());
if let Some(dirty) = self.dirty.get_mut(index_name) {
dirty.remove(doc_id);
}
}
}
}
pub fn get_doc(&self, index_name: &str, doc_id: &str) -> Option<String> {
self
.docs
.get(index_name)
.and_then(|docs| docs.get(doc_id))
.map(|d| d.content.clone())
}
pub fn search(&self, index_name: &str, query: &str) -> Vec<(String, f64)> {
if query == "*" || query.is_empty() {
if let Some(docs) = self.docs.get(index_name) {
return docs.keys().map(|k| (k.clone(), 1.0)).collect();
}
return vec![];
}
let query_terms = tokenize(query);
if query_terms.is_empty() {
return vec![];
}
let inverted = match self.inverted.get(index_name) {
Some(i) => i,
None => return vec![],
};
let mut candidates: Option<HashSet<String>> = None;
for token in &query_terms {
if let Some(doc_map) = inverted.get(&token.term) {
let docs: HashSet<String> = doc_map.keys().cloned().collect();
match candidates {
None => candidates = Some(docs),
Some(ref mut c) => {
c.retain(|id| docs.contains(id));
}
}
if candidates.as_ref().unwrap().is_empty() {
return vec![];
}
} else {
return vec![];
}
}
let candidates = candidates.unwrap_or_default();
if candidates.is_empty() {
return vec![];
}
let docs = self.docs.get(index_name).unwrap();
let total_len = *self.total_lens.get(index_name).unwrap_or(&0);
let n = docs.len() as f64;
let avgdl = if n > 0.0 { total_len as f64 / n } else { 0.0 };
let k1 = 1.2;
let b = 0.75;
let mut scores: Vec<(String, f64)> = Vec::with_capacity(candidates.len());
let mut idfs = HashMap::new();
for token in &query_terms {
let n_q = inverted.get(&token.term).map(|m| m.len()).unwrap_or(0) as f64;
let idf = ((n - n_q + 0.5) / (n_q + 0.5) + 1.0).ln();
idfs.insert(&token.term, idf);
}
for doc_id in candidates {
let doc_data = docs.get(&doc_id).unwrap();
let mut score = 0.0;
for token in &query_terms {
if let Some(positions) = doc_data.term_pos.get(&token.term) {
let freq = positions.len() as f64;
let idf = idfs.get(&token.term).unwrap();
let numerator = freq * (k1 + 1.0);
let denominator = freq + k1 * (1.0 - b + b * (doc_data.doc_len as f64 / avgdl));
score += idf * (numerator / denominator);
}
}
scores.push((doc_id, score));
}
scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
scores
}
pub fn take_dirty_and_deleted(&mut self) -> (Vec<DirtyDoc>, DeletedDoc) {
let dirty = std::mem::take(&mut self.dirty);
let deleted = std::mem::take(&mut self.deleted);
let mut dirty_data = Vec::new();
for (index_name, doc_ids) in &dirty {
if let Some(docs) = self.docs.get(index_name) {
for doc_id in doc_ids {
if let Some(data) = docs.get(doc_id) {
dirty_data.push((
index_name.clone(),
doc_id.clone(),
data.content.clone(),
data.doc_len,
));
}
}
}
}
(dirty_data, deleted)
}
pub fn get_matches(&self, index_name: &str, doc_id: &str, query: &str) -> Vec<(u32, u32)> {
let mut matches = Vec::new();
if let Some(docs) = self.docs.get(index_name) {
if let Some(doc_data) = docs.get(doc_id) {
let query_tokens = tokenize(query);
for token in query_tokens {
if let Some(positions) = doc_data.term_pos.get(&token.term) {
matches.extend(positions.iter().cloned());
}
}
}
}
matches.sort_by(|a, b| a.0.cmp(&b.0));
matches
}
pub fn load_snapshot(&mut self, index_name: &str, snapshot: SnapshotData) {
let docs = self.docs.entry(index_name.to_string()).or_default();
let inverted = self.inverted.entry(index_name.to_string()).or_default();
let total_len = self.total_lens.entry(index_name.to_string()).or_default();
for (doc_id, doc_data) in snapshot.docs {
*total_len += doc_data.doc_len;
for (term, positions) in &doc_data.term_pos {
inverted
.entry(term.clone())
.or_default()
.insert(doc_id.clone(), positions.len() as i64);
}
docs.insert(doc_id, doc_data);
}
}
pub fn get_snapshot_data(&self, index_name: &str) -> Option<SnapshotData> {
self
.docs
.get(index_name)
.map(|docs| SnapshotData { docs: docs.clone() })
}
}
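
For reference, the scoring loop in `search` above is standard BM25 with $k_1 = 1.2$ and $b = 0.75$:

$$
\mathrm{score}(D, Q) = \sum_{q \in Q} \ln\!\left(\frac{N - n_q + 0.5}{n_q + 0.5} + 1\right) \cdot \frac{f_{q,D}\,(k_1 + 1)}{f_{q,D} + k_1\left(1 - b + b\,\frac{|D|}{\mathrm{avgdl}}\right)}
$$

where $N$ is the number of documents in the index, $n_q$ the number of documents containing term $q$, $f_{q,D}$ the frequency of $q$ in document $D$, $|D|$ the document length in tokens, and $\mathrm{avgdl}$ the average document length. Wildcard or empty queries short-circuit to returning every document with a score of 1.0.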


@@ -0,0 +1,266 @@
mod memory_indexer;
mod tokenizer;
mod types;
use affine_common::doc_parser::{parse_doc_from_binary, ParseError};
pub use memory_indexer::InMemoryIndex;
use sqlx::Row;
pub use types::{
DocData, NativeBlockInfo, NativeCrawlResult, NativeMatch, NativeSearchHit, SnapshotData,
};
use y_octo::DocOptions;
use super::{
error::{Error, Result},
storage::SqliteDocStorage,
};
impl SqliteDocStorage {
pub async fn crawl_doc_data(&self, doc_id: &str) -> Result<NativeCrawlResult> {
let doc_bin = self
.load_doc_binary(doc_id)
.await?
.ok_or(ParseError::DocNotFound)?;
let result = parse_doc_from_binary(doc_bin, doc_id.to_string())?;
Ok(result.into())
}
async fn load_doc_binary(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
let snapshot = self.get_doc_snapshot(doc_id.to_string()).await?;
let mut updates = self.get_doc_updates(doc_id.to_string()).await?;
if snapshot.is_none() && updates.is_empty() {
return Ok(None);
}
updates.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
let mut segments =
Vec::with_capacity(snapshot.as_ref().map(|_| 1).unwrap_or(0) + updates.len());
if let Some(record) = snapshot {
segments.push(record.bin.to_vec());
}
segments.extend(updates.into_iter().map(|update| update.bin.to_vec()));
merge_updates(segments, doc_id).map(Some)
}
pub async fn init_index(&self) -> Result<()> {
let snapshots = sqlx::query("SELECT index_name, data FROM idx_snapshots")
.fetch_all(&self.pool)
.await?;
{
let mut index = self.index.write().await;
for row in snapshots {
let index_name: String = row.get("index_name");
let data: Vec<u8> = row.get("data");
if let Ok(decompressed) = zstd::stream::decode_all(std::io::Cursor::new(&data)) {
if let Ok((snapshot, _)) = bincode::serde::decode_from_slice::<SnapshotData, _>(
&decompressed,
bincode::config::standard(),
) {
index.load_snapshot(&index_name, snapshot);
}
}
}
}
Ok(())
}
async fn compact_index(&self, index_name: &str) -> Result<()> {
let snapshot_data = {
let index = self.index.read().await;
index.get_snapshot_data(index_name)
};
if let Some(data) = snapshot_data {
let blob = bincode::serde::encode_to_vec(&data, bincode::config::standard())
.map_err(|e| Error::Serialization(e.to_string()))?;
let compressed = zstd::stream::encode_all(std::io::Cursor::new(&blob), 0)
.map_err(|e| Error::Serialization(e.to_string()))?;
let mut tx = self.pool.begin().await?;
sqlx::query("INSERT OR REPLACE INTO idx_snapshots (index_name, data) VALUES (?, ?)")
.bind(index_name)
.bind(compressed)
.execute(&mut *tx)
.await?;
tx.commit().await?;
}
Ok(())
}
pub async fn flush_index(&self) -> Result<()> {
let (dirty_docs, deleted_docs) = {
let mut index = self.index.write().await;
index.take_dirty_and_deleted()
};
if dirty_docs.is_empty() && deleted_docs.is_empty() {
return Ok(());
}
let mut modified_indices = std::collections::HashSet::new();
for index_name in deleted_docs.keys() {
modified_indices.insert(index_name.clone());
}
for (index_name, _, _, _) in &dirty_docs {
modified_indices.insert(index_name.clone());
}
for index_name in modified_indices {
self.compact_index(&index_name).await?;
}
Ok(())
}
pub async fn fts_add(
&self,
index_name: &str,
doc_id: &str,
text: &str,
index: bool,
) -> Result<()> {
let mut idx = self.index.write().await;
idx.add_doc(index_name, doc_id, text, index);
Ok(())
}
pub async fn fts_delete(&self, index_name: &str, doc_id: &str) -> Result<()> {
let mut idx = self.index.write().await;
idx.remove_doc(index_name, doc_id);
Ok(())
}
pub async fn fts_get(&self, index_name: &str, doc_id: &str) -> Result<Option<String>> {
let idx = self.index.read().await;
Ok(idx.get_doc(index_name, doc_id))
}
pub async fn fts_search(&self, index_name: &str, query: &str) -> Result<Vec<NativeSearchHit>> {
let idx = self.index.read().await;
Ok(
idx
.search(index_name, query)
.into_iter()
.map(|(id, score)| NativeSearchHit { id, score })
.collect(),
)
}
pub async fn fts_get_matches(
&self,
index_name: &str,
doc_id: &str,
query: &str,
) -> Result<Vec<NativeMatch>> {
let idx = self.index.read().await;
Ok(
idx
.get_matches(index_name, doc_id, query)
.into_iter()
.map(|(start, end)| NativeMatch { start, end })
.collect(),
)
}
}
fn merge_updates(mut segments: Vec<Vec<u8>>, guid: &str) -> Result<Vec<u8>> {
if segments.is_empty() {
return Err(ParseError::DocNotFound.into());
}
if segments.len() == 1 {
return segments.pop().ok_or(ParseError::DocNotFound.into());
}
let mut doc = DocOptions::new().with_guid(guid.to_string()).build();
for update in segments.iter() {
doc
.apply_update_from_binary_v1(update)
.map_err(|_| ParseError::InvalidBinary)?;
}
let buffer = doc
.encode_update_v1()
.map_err(|err| ParseError::ParserError(err.to_string()))?;
Ok(buffer)
}
#[cfg(test)]
mod tests {
use std::path::{Path, PathBuf};
use affine_common::doc_parser::ParseError;
use chrono::Utc;
use serde_json::Value;
use tokio::fs;
use uuid::Uuid;
use super::{super::error::Error, *};
const DEMO_BIN: &[u8] = include_bytes!("../../../../../common/native/fixtures/demo.ydoc");
const DEMO_JSON: &[u8] = include_bytes!("../../../../../common/native/fixtures/demo.ydoc.json");
fn temp_workspace_dir() -> PathBuf {
std::env::temp_dir().join(format!("affine-native-{}", Uuid::new_v4()))
}
async fn init_db(path: &Path) -> SqliteDocStorage {
fs::create_dir_all(path.parent().unwrap()).await.unwrap();
let storage = SqliteDocStorage::new(path.to_string_lossy().into_owned());
storage.connect().await.unwrap();
storage
}
async fn cleanup(path: &Path) {
let _ = fs::remove_dir_all(path.parent().unwrap()).await;
}
#[tokio::test]
async fn parse_demo_snapshot_matches_fixture() {
let base = temp_workspace_dir();
fs::create_dir_all(&base).await.unwrap();
let db_path = base.join("storage.db");
let storage = init_db(&db_path).await;
sqlx::query(r#"INSERT INTO snapshots (doc_id, data, updated_at) VALUES (?, ?, ?)"#)
.bind("demo-doc")
.bind(DEMO_BIN)
.bind(Utc::now().naive_utc())
.execute(&storage.pool)
.await
.unwrap();
let result = storage.crawl_doc_data("demo-doc").await.unwrap();
let expected: Value = serde_json::from_slice(DEMO_JSON).unwrap();
let actual = serde_json::to_value(&result).unwrap();
assert_eq!(expected, actual);
storage.close().await;
cleanup(&db_path).await;
}
#[tokio::test]
async fn missing_doc_returns_error() {
let base = temp_workspace_dir();
fs::create_dir_all(&base).await.unwrap();
let db_path = base.join("storage.db");
let storage = init_db(&db_path).await;
let err = storage.crawl_doc_data("absent-doc").await.unwrap_err();
assert!(matches!(err, Error::Parse(ParseError::DocNotFound)));
storage.close().await;
cleanup(&db_path).await;
}
}


@@ -0,0 +1,85 @@
use jieba_rs::Jieba;
use once_cell::sync::Lazy;
use tiniestsegmenter::tokenize as ts_tokenize;
static JIEBA: Lazy<Jieba> = Lazy::new(Jieba::new);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Token {
pub term: String,
pub start: usize,
pub end: usize,
}
pub fn tokenize(text: &str) -> Vec<Token> {
let mut tokens = Vec::new();
// Use jieba for Chinese/English
// Jieba tokenize returns tokens with offsets
let jieba_tokens = JIEBA.tokenize(text, jieba_rs::TokenizeMode::Search, false);
for token in jieba_tokens {
if token.word.chars().any(|c| c.is_alphanumeric()) {
tokens.push(Token {
term: token.word.to_lowercase(),
start: token.start,
end: token.end,
});
}
}
// Use TinySegmenter for Japanese
// TinySegmenter does not provide offsets, so we have to find them manually
// This is a simplified approach and might not be perfect for repeated terms
let mut last_pos = 0;
for term in ts_tokenize(text) {
if term.chars().any(|c| c.is_alphanumeric()) {
if let Some(pos) = text[last_pos..].find(term) {
let start = last_pos + pos;
let end = start + term.len();
tokens.push(Token {
term: term.to_lowercase(),
start,
end,
});
last_pos = end;
}
}
}
// Manually handle Korean bigrams and unigrams
let chars: Vec<char> = text.chars().collect();
let mut byte_offset = 0;
for (i, &c) in chars.iter().enumerate() {
let char_len = c.len_utf8();
if is_hangul(c) {
tokens.push(Token {
term: c.to_string().to_lowercase(),
start: byte_offset,
end: byte_offset + char_len,
});
if i + 1 < chars.len() {
let next = chars[i + 1];
if is_hangul(next) {
let next_len = next.len_utf8();
tokens.push(Token {
term: format!("{}{}", c, next).to_lowercase(),
start: byte_offset,
end: byte_offset + char_len + next_len,
});
}
}
}
byte_offset += char_len;
}
tokens
}
fn is_hangul(c: char) -> bool {
// Hangul Syllables
('\u{AC00}'..='\u{D7AF}').contains(&c)
// Hangul Jamo
|| ('\u{1100}'..='\u{11FF}').contains(&c)
// Hangul Compatibility Jamo
|| ('\u{3130}'..='\u{318F}').contains(&c)
}
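
To restate the Korean handling above outside of Rust: every Hangul syllable is emitted as a unigram token, and each adjacent pair of Hangul syllables also becomes a bigram. A hedged TypeScript sketch of the same idea (offsets here are UTF-16 code units, whereas the Rust code tracks byte offsets):

```ts
interface Token {
  term: string;
  start: number; // UTF-16 code-unit offset
  end: number;
}

const isHangul = (c: string) =>
  /[\uAC00-\uD7AF\u1100-\u11FF\u3130-\u318F]/.test(c);

function hangulTokens(text: string): Token[] {
  const tokens: Token[] = [];
  const chars = Array.from(text);
  let offset = 0;
  for (let i = 0; i < chars.length; i++) {
    const c = chars[i];
    if (isHangul(c)) {
      // Unigram for the syllable itself.
      tokens.push({ term: c, start: offset, end: offset + c.length });
      // Bigram with the next syllable, if it is also Hangul.
      const next = chars[i + 1];
      if (next && isHangul(next)) {
        tokens.push({
          term: c + next,
          start: offset,
          end: offset + c.length + next.length,
        });
      }
    }
    offset += c.length;
  }
  return tokens;
}

// hangulTokens('안녕') -> tokens '안', '안녕', '녕'
```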


@@ -0,0 +1,79 @@
use std::collections::HashMap;
use affine_common::doc_parser::{BlockInfo, CrawlResult};
use napi_derive::napi;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocData {
pub content: String,
pub doc_len: i64,
pub term_pos: HashMap<String, Vec<(u32, u32)>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SnapshotData {
pub docs: HashMap<String, DocData>,
}
#[napi(object)]
#[derive(Debug, Serialize)]
pub struct NativeBlockInfo {
pub block_id: String,
pub flavour: String,
pub content: Option<Vec<String>>,
pub blob: Option<Vec<String>>,
pub ref_doc_id: Option<Vec<String>>,
pub ref_info: Option<Vec<String>>,
pub parent_flavour: Option<String>,
pub parent_block_id: Option<String>,
pub additional: Option<String>,
}
#[napi(object)]
#[derive(Debug, Serialize)]
pub struct NativeCrawlResult {
pub blocks: Vec<NativeBlockInfo>,
pub title: String,
pub summary: String,
}
#[napi(object)]
#[derive(Debug, Serialize)]
pub struct NativeSearchHit {
pub id: String,
pub score: f64,
}
#[napi(object)]
#[derive(Debug, Serialize)]
pub struct NativeMatch {
pub start: u32,
pub end: u32,
}
impl From<BlockInfo> for NativeBlockInfo {
fn from(value: BlockInfo) -> Self {
Self {
block_id: value.block_id,
flavour: value.flavour,
content: value.content,
blob: value.blob,
ref_doc_id: value.ref_doc_id,
ref_info: value.ref_info,
parent_flavour: value.parent_flavour,
parent_block_id: value.parent_block_id,
additional: value.additional,
}
}
}
impl From<CrawlResult> for NativeCrawlResult {
fn from(value: CrawlResult) -> Self {
Self {
blocks: value.blocks.into_iter().map(Into::into).collect(),
title: value.title,
summary: value.summary,
}
}
}


@@ -450,6 +450,77 @@ impl DocStoragePool {
Ok(result)
}
#[napi]
pub async fn fts_add_document(
&self,
id: String,
index_name: String,
doc_id: String,
text: String,
index: bool,
) -> Result<()> {
let storage = self.pool.get(id).await?;
storage.fts_add(&index_name, &doc_id, &text, index).await?;
Ok(())
}
#[napi]
pub async fn fts_flush_index(&self, id: String) -> Result<()> {
let storage = self.pool.get(id).await?;
storage.flush_index().await?;
Ok(())
}
#[napi]
pub async fn fts_delete_document(
&self,
id: String,
index_name: String,
doc_id: String,
) -> Result<()> {
let storage = self.pool.get(id).await?;
storage.fts_delete(&index_name, &doc_id).await?;
Ok(())
}
#[napi]
pub async fn fts_get_document(
&self,
id: String,
index_name: String,
doc_id: String,
) -> Result<Option<String>> {
let storage = self.pool.get(id).await?;
Ok(storage.fts_get(&index_name, &doc_id).await?)
}
#[napi]
pub async fn fts_search(
&self,
id: String,
index_name: String,
query: String,
) -> Result<Vec<indexer::NativeSearchHit>> {
let storage = self.pool.get(id).await?;
Ok(storage.fts_search(&index_name, &query).await?)
}
#[napi]
pub async fn fts_get_matches(
&self,
id: String,
index_name: String,
doc_id: String,
query: String,
) -> Result<Vec<indexer::NativeMatch>> {
let storage = self.pool.get(id).await?;
Ok(
storage
.fts_get_matches(&index_name, &doc_id, &query)
.await?,
)
}
}
#[napi]

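On the Electron side the same operations are reached through the generated `DocStoragePool` binding; a hedged usage sketch (the `pool` instance and ids below are placeholders):

```ts
declare const pool: DocStoragePool;
declare const universalId: string;

// Add documents to the in-memory index, then persist a compacted,
// zstd-compressed snapshot into the idx_snapshots table.
await pool.ftsAddDocument(universalId, 'docs', 'doc-1', 'hello native indexer', true);
await pool.ftsFlushIndex(universalId);

const hits = await pool.ftsSearch(universalId, 'docs', 'native'); // NativeSearchHit[]
const matches = await pool.ftsGetMatches(universalId, 'docs', hits[0].id, 'native'); // NativeMatch[]
```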

@@ -1,15 +1,19 @@
use std::sync::Arc;
use affine_schema::get_migrator;
use sqlx::{
migrate::MigrateDatabase,
sqlite::{Sqlite, SqliteConnectOptions, SqlitePoolOptions},
Pool, Row,
};
use tokio::sync::RwLock;
use super::error::Result;
use super::{error::Result, indexer::InMemoryIndex};
pub struct SqliteDocStorage {
pub pool: Pool<Sqlite>,
path: String,
pub index: Arc<RwLock<InMemoryIndex>>,
}
impl SqliteDocStorage {
@@ -20,6 +24,8 @@ impl SqliteDocStorage {
let mut pool_options = SqlitePoolOptions::new();
let index = Arc::new(RwLock::new(InMemoryIndex::default()));
if path == ":memory:" {
pool_options = pool_options
.min_connections(1)
@@ -30,6 +36,7 @@ impl SqliteDocStorage {
Self {
pool: pool_options.connect_lazy_with(sqlite_options),
path,
index,
}
} else {
Self {
@@ -37,6 +44,7 @@ impl SqliteDocStorage {
.max_connections(4)
.connect_lazy_with(sqlite_options.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)),
path,
index,
}
}
}
@@ -61,6 +69,7 @@ impl SqliteDocStorage {
};
self.migrate().await?;
self.init_index().await?;
Ok(())
}


@@ -71,6 +71,18 @@ CREATE TABLE "peer_blob_sync" (
PRIMARY KEY (peer, blob_id)
);
CREATE INDEX peer_blob_sync_peer ON peer_blob_sync (peer);
"#,
None,
),
// add idx snapshots
(
"add_idx_snapshots",
r#"
CREATE TABLE idx_snapshots (
index_name TEXT PRIMARY KEY NOT NULL,
data BLOB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
"#,
None,
),