From 215541d331d7afcaf4f1244507dcea2c5e48655e Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Tue, 9 Dec 2025 22:04:50 +0800 Subject: [PATCH] feat: improve indexing perf with native indexer (#14066) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix #12132, #14006, #13496, #12375, #12132 The previous idb indexer generated a large number of scattered writes when flushing to disk, which caused CPU and disk write spikes. If the document volume is extremely large, the accumulation of write transactions will cause memory usage to continuously increase. This PR introduces batch writes to mitigate write performance on the web side, and adds a native indexer on the Electron side to greatly improve performance. ## Summary by CodeRabbit * **New Features** * Full-text search (FTS) added across storage layers and native plugins: indexing, search, document retrieval, match ranges, and index flushing. * New SQLite-backed indexer storage, streaming search/aggregate APIs, and in-memory index with node-building and highlighting. * **Performance** * Indexing rewritten for batched, concurrent writes and parallel metadata updates. * Search scoring enhanced to consider multiple term positions and aggregated term data. * **Other** * Configurable refresh interval and indexer version bump. ✏️ Tip: You can customize this high-level summary in your review settings. --- Cargo.lock | 357 +++++++++++++++--- .../src/impls/idb/indexer/data-struct.ts | 6 +- .../nbstore/src/impls/idb/indexer/index.ts | 1 + .../src/impls/idb/indexer/inverted-index.ts | 207 +++++----- .../common/nbstore/src/impls/idb/schema.ts | 2 +- .../common/nbstore/src/impls/sqlite/db.ts | 29 ++ .../common/nbstore/src/impls/sqlite/index.ts | 3 + .../nbstore/src/impls/sqlite/indexer/index.ts | 244 ++++++++++++ .../nbstore/src/impls/sqlite/indexer/match.ts | 105 ++++++ .../src/impls/sqlite/indexer/node-builder.ts | 97 +++++ .../nbstore/src/impls/sqlite/indexer/query.ts | 72 ++++ .../nbstore/src/impls/sqlite/indexer/utils.ts | 20 + .../common/nbstore/src/storage/indexer.ts | 2 + .../common/nbstore/src/sync/indexer/index.ts | 13 +- .../src/plugins/nbstore/definitions.ts | 29 ++ .../apps/android/src/plugins/nbstore/index.ts | 67 ++++ .../electron/src/helper/nbstore/handlers.ts | 6 + .../ios/src/plugins/nbstore/definitions.ts | 29 ++ .../apps/ios/src/plugins/nbstore/index.ts | 67 ++++ .../modules/workspace-engine/impls/local.ts | 7 +- packages/frontend/native/index.d.ts | 16 + packages/frontend/native/nbstore/Cargo.toml | 5 + packages/frontend/native/nbstore/src/error.rs | 2 + .../frontend/native/nbstore/src/indexer.rs | 180 --------- .../nbstore/src/indexer/memory_indexer.rs | 261 +++++++++++++ .../native/nbstore/src/indexer/mod.rs | 266 +++++++++++++ .../native/nbstore/src/indexer/tokenizer.rs | 85 +++++ .../native/nbstore/src/indexer/types.rs | 79 ++++ packages/frontend/native/nbstore/src/lib.rs | 71 ++++ .../frontend/native/nbstore/src/storage.rs | 11 +- packages/frontend/native/schema/src/lib.rs | 12 + 31 files changed, 2030 insertions(+), 321 deletions(-) create mode 100644 packages/common/nbstore/src/impls/sqlite/indexer/index.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/indexer/match.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/indexer/node-builder.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/indexer/query.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/indexer/utils.ts delete mode 100644 
packages/frontend/native/nbstore/src/indexer.rs create mode 100644 packages/frontend/native/nbstore/src/indexer/memory_indexer.rs create mode 100644 packages/frontend/native/nbstore/src/indexer/mod.rs create mode 100644 packages/frontend/native/nbstore/src/indexer/tokenizer.rs create mode 100644 packages/frontend/native/nbstore/src/indexer/types.rs diff --git a/Cargo.lock b/Cargo.lock index 557c2ef60e..a3e256a9a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "adobe-cmap-parser" version = "0.4.1" @@ -144,19 +150,24 @@ dependencies = [ "affine_common", "affine_schema", "anyhow", + "bincode", "chrono", "dotenvy", + "jieba-rs", "napi", "napi-build", "napi-derive", + "once_cell", "serde", "serde_json", "sqlx", "thiserror 2.0.12", + "tiniestsegmenter", "tokio", "uniffi", "uuid", "y-octo", + "zstd", ] [[package]] @@ -384,7 +395,7 @@ dependencies = [ "rustc-hash 2.1.1", "serde", "serde_derive", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -442,7 +453,7 @@ checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -463,7 +474,7 @@ dependencies = [ "derive_utils", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -518,6 +529,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bindgen" version = "0.70.1" @@ -533,7 +564,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -731,6 +762,15 @@ dependencies = [ "shlex", ] +[[package]] +name = "cedarwood" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d910bedd62c24733263d0bed247460853c9d22e8956bd4cd964302095e04e90" +dependencies = [ + "smallvec", +] + [[package]] name = "cesu8" version = "1.1.0" @@ -877,7 +917,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -1019,6 +1059,15 @@ dependencies = [ "thiserror 2.0.12", ] +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "core_maths" version = "0.1.1" @@ -1243,6 +1292,12 @@ version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f211af61d8efdd104f96e57adf5e426ba1bc3ed7a4ead616e15e5881fd79c4d" +[[package]] +name = "dary_heap" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d2e3287df1c007e74221c49ca10a95d557349e54b3a75dc2fb14712c751f04" + [[package]] name = 
"dashmap" version = "6.1.0" @@ -1282,7 +1337,7 @@ checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -1295,7 +1350,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -1306,7 +1361,7 @@ checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -1347,7 +1402,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -1555,6 +1610,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.5.0" @@ -1573,7 +1634,7 @@ checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -1818,7 +1879,18 @@ checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", ] [[package]] @@ -2078,6 +2150,43 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "include-flate" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01b7cb6ca682a621e7cda1c358c9724b53a7b4409be9be1dd443b7f3a26f998" +dependencies = [ + "include-flate-codegen", + "include-flate-compress", + "libflate", + "zstd", +] + +[[package]] +name = "include-flate-codegen" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f49bf5274aebe468d6e6eba14a977eaf1efa481dc173f361020de70c1c48050" +dependencies = [ + "include-flate-compress", + "libflate", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.111", + "zstd", +] + +[[package]] +name = "include-flate-compress" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae6a40e716bcd5931f5dbb79cd921512a4f647e2e9413fded3171fca3824dbc" +dependencies = [ + "libflate", + "zstd", +] + [[package]] name = "indexmap" version = "2.9.0" @@ -2175,6 +2284,29 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47f142fe24a9c9944451e8349de0a56af5f3e7226dc46f3ed4d4ecc0b85af75e" +[[package]] +name = "jieba-macros" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348294e44ee7e3c42685da656490f8febc7359632544019621588902216da95c" +dependencies = [ + "phf_codegen 0.13.1", +] + +[[package]] +name = "jieba-rs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "766bd7012aa5ba49411ebdf4e93bddd59b182d2918e085d58dec5bb9b54b7105" +dependencies = [ + "cedarwood", + "include-flate", + "jieba-macros", + "phf 0.13.1", 
+ "regex", + "rustc-hash 2.1.1", +] + [[package]] name = "jni" version = "0.21.1" @@ -2267,6 +2399,30 @@ version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +[[package]] +name = "libflate" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3248b8d211bd23a104a42d81b4fa8bb8ac4a3b75e7a43d85d2c9ccb6179cd74" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a599cb10a9cd92b1300debcef28da8f70b935ec937f44fcd1b70a7c986a11c5c" +dependencies = [ + "core2", + "hashbrown 0.16.1", + "rle-decode-fast", +] + [[package]] name = "libloading" version = "0.8.8" @@ -2405,7 +2561,7 @@ checksum = "7a2629bb1404f3d34c2e921f21fd34ba00b206124c81f65c50b43b6aaefeb016" dependencies = [ "log", "phf 0.10.1", - "phf_codegen", + "phf_codegen 0.10.0", "string_cache", "string_cache_codegen", "tendril", @@ -2541,7 +2697,7 @@ dependencies = [ "napi-derive-backend", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -2554,7 +2710,7 @@ dependencies = [ "proc-macro2", "quote", "semver", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -2693,7 +2849,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -2744,7 +2900,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -2977,6 +3133,16 @@ dependencies = [ "phf_shared 0.11.3", ] +[[package]] +name = "phf" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1562dc717473dbaa4c1f85a36410e03c047b2e7df7f45ee938fbef64ae7fadf" +dependencies = [ + "phf_shared 0.13.1", + "serde", +] + [[package]] name = "phf_codegen" version = "0.10.0" @@ -2987,6 +3153,16 @@ dependencies = [ "phf_shared 0.10.0", ] +[[package]] +name = "phf_codegen" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49aa7f9d80421bca176ca8dbfebe668cc7a2684708594ec9f3c0db0805d5d6e1" +dependencies = [ + "phf_generator 0.13.1", + "phf_shared 0.13.1", +] + [[package]] name = "phf_generator" version = "0.10.0" @@ -3007,6 +3183,16 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "phf_generator" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135ace3a761e564ec88c03a77317a7c6b80bb7f7135ef2544dbe054243b89737" +dependencies = [ + "fastrand", + "phf_shared 0.13.1", +] + [[package]] name = "phf_macros" version = "0.11.3" @@ -3017,7 +3203,7 @@ dependencies = [ "phf_shared 0.11.3", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -3038,6 +3224,15 @@ dependencies = [ "siphasher 1.0.1", ] +[[package]] +name = "phf_shared" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e57fef6bc5981e38c2ce2d63bfa546861309f875b8a75f092d1d54ae2d64f266" +dependencies = [ + "siphasher 1.0.1", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -3169,6 +3364,30 @@ dependencies = [ "toml_edit", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -3206,7 +3425,7 @@ checksum = "4ee1c9ac207483d5e7db4940700de86a9aae46ef90c48b57f99fe7edb8345e49" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -3434,6 +3653,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rsa" version = "0.9.8" @@ -3486,7 +3711,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.101", + "syn 2.0.111", "walkdir", ] @@ -3667,7 +3892,7 @@ checksum = "1783eabc414609e28a5ba76aee5ddd52199f7107a0b24c2e9746a1ecc34a683d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -3696,7 +3921,7 @@ checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -3934,7 +4159,7 @@ dependencies = [ "quote", "sqlx-core", "sqlx-macros-core", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -3957,7 +4182,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 2.0.101", + "syn 2.0.111", "tokio", "url", ] @@ -4153,7 +4378,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -4371,9 +4596,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.101" +version = "2.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" dependencies = [ "proc-macro2", "quote", @@ -4388,7 +4613,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -4469,7 +4694,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -4480,7 +4705,7 @@ checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -4508,6 +4733,12 @@ dependencies = [ "rustc-hash 1.1.0", ] +[[package]] +name = "tiniestsegmenter" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f39721481fb54f0e9f3a1da5a6ac6063c61ec62ec828cd5e1860acce9458f40" + [[package]] name = "tinystr" version = "0.8.1" @@ -4569,7 +4800,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -4629,7 +4860,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -4954,7 
+5185,7 @@ dependencies = [ "indexmap", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -4969,7 +5200,7 @@ dependencies = [ "proc-macro2", "quote", "serde", - "syn 2.0.101", + "syn 2.0.111", "toml", "uniffi_meta", ] @@ -5017,6 +5248,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "url" version = "2.5.4" @@ -5081,6 +5318,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "vsimd" version = "0.8.0" @@ -5149,7 +5392,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", "wasm-bindgen-shared", ] @@ -5184,7 +5427,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5384,7 +5627,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -5395,7 +5638,7 @@ checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -5406,7 +5649,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -5417,7 +5660,7 @@ checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -5806,7 +6049,7 @@ checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", "synstructure", ] @@ -5845,7 +6088,7 @@ checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -5865,7 +6108,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", "synstructure", ] @@ -5905,7 +6148,7 @@ checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.111", ] [[package]] @@ -5923,3 +6166,31 @@ dependencies = [ "num_enum", "thiserror 1.0.69", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = 
"2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/packages/common/nbstore/src/impls/idb/indexer/data-struct.ts b/packages/common/nbstore/src/impls/idb/indexer/data-struct.ts index 09e3d9f5f0..c349a868b4 100644 --- a/packages/common/nbstore/src/impls/idb/indexer/data-struct.ts +++ b/packages/common/nbstore/src/impls/idb/indexer/data-struct.ts @@ -180,9 +180,9 @@ export class DataStruct { .index('nid') .getAllKeys(nid); - for (const indexId of indexIds) { - await trx.objectStore('invertedIndex').delete(indexId); - } + await Promise.all( + indexIds.map(indexId => trx.objectStore('invertedIndex').delete(indexId)) + ); } private async delete( diff --git a/packages/common/nbstore/src/impls/idb/indexer/index.ts b/packages/common/nbstore/src/impls/idb/indexer/index.ts index 3da48a5709..35bc031e65 100644 --- a/packages/common/nbstore/src/impls/idb/indexer/index.ts +++ b/packages/common/nbstore/src/impls/idb/indexer/index.ts @@ -18,6 +18,7 @@ import { backoffRetry, exhaustMapWithTrailing } from './utils'; export class IndexedDBIndexerStorage extends IndexerStorageBase { static readonly identifier = 'IndexedDBIndexerStorage'; + override recommendRefreshInterval: number = 0; // force refresh on each indexer operation readonly connection = share(new IDBConnection(this.options)); override isReadonly = false; private readonly data = new DataStruct(); diff --git a/packages/common/nbstore/src/impls/idb/indexer/inverted-index.ts b/packages/common/nbstore/src/impls/idb/indexer/inverted-index.ts index 09ec637cd1..3680c5f538 100644 --- a/packages/common/nbstore/src/impls/idb/indexer/inverted-index.ts +++ b/packages/common/nbstore/src/impls/idb/indexer/inverted-index.ts @@ -68,13 +68,16 @@ export class StringInvertedIndex implements InvertedIndex { } async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { - for (const term of terms) { - await trx.objectStore('invertedIndex').put({ - table: this.table, - key: InvertedIndexKey.forString(this.fieldKey, term).buffer(), - nid: id, - }); - } + const uniqueTerms = new Set(terms); + await Promise.all( + Array.from(uniqueTerms).map(term => + trx.objectStore('invertedIndex').put({ + table: this.table, + key: InvertedIndexKey.forString(this.fieldKey, term).buffer(), + nid: id, + }) + ) + ); } } @@ -127,13 +130,16 @@ export class IntegerInvertedIndex implements InvertedIndex { } async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { - for (const term of terms) { - await trx.objectStore('invertedIndex').put({ - table: this.table, - key: InvertedIndexKey.forInt64(this.fieldKey, BigInt(term)).buffer(), - nid: id, - }); - } + const uniqueTerms = new Set(terms); + await Promise.all( + Array.from(uniqueTerms).map(term => + trx.objectStore('invertedIndex').put({ + table: this.table, + key: InvertedIndexKey.forInt64(this.fieldKey, BigInt(term)).buffer(), + nid: id, + }) + ) + ); } } @@ -186,16 +192,19 @@ export class BooleanInvertedIndex implements InvertedIndex { } async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { - for (const term of terms) { - await trx.objectStore('invertedIndex').put({ - table: this.table, - key: InvertedIndexKey.forBoolean( - this.fieldKey, - term === 'true' - ).buffer(), - nid: id, - }); - } + const uniqueTerms = new Set(terms); + await Promise.all( + Array.from(uniqueTerms).map(term => + trx.objectStore('invertedIndex').put({ 
+ table: this.table, + key: InvertedIndexKey.forBoolean( + this.fieldKey, + term === 'true' + ).buffer(), + nid: id, + }) + ) + ); } } @@ -260,37 +269,37 @@ export class FullTextInvertedIndex implements InvertedIndex { const key = InvertedIndexKey.fromBuffer(obj.key); const originTokenTerm = key.asString(); const matchLength = token.term.length; - const position = obj.pos ?? { - i: 0, - l: 0, - rs: [], - }; - const termFreq = position.rs.length; - const totalCount = objs.length; - const fieldLength = position.l; - const score = - bm25(termFreq, 1, totalCount, fieldLength, avgFieldLength) * - (matchLength / originTokenTerm.length); - const match = { - score, - positions: new Map(), - }; - const ranges = match.positions.get(position.i) || []; - ranges.push( - ...position.rs.map(([start, _end]) => [start, start + matchLength]) - ); - match.positions.set(position.i, ranges); - submatched.push({ - nid: obj.nid, - score, - position: { - index: position.i, - ranges: position.rs.map(([start, _end]) => [ - start, - start + matchLength, - ]), - }, - }); + let positions = obj.pos + ? Array.isArray(obj.pos) + ? obj.pos + : [obj.pos] + : [ + { + i: 0, + l: 0, + rs: [] as [number, number][], + }, + ]; + + for (const position of positions) { + const termFreq = position.rs.length; + const totalCount = objs.length; + const fieldLength = position.l; + const score = + bm25(termFreq, 1, totalCount, fieldLength, avgFieldLength) * + (matchLength / originTokenTerm.length); + submatched.push({ + nid: obj.nid, + score, + position: { + index: position.i, + ranges: position.rs.map(([start, _end]) => [ + start, + start + matchLength, + ]), + }, + }); + } } // normalize score @@ -369,6 +378,13 @@ export class FullTextInvertedIndex implements InvertedIndex { } async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { + const promises: Promise[] = []; + const totalTermLength = terms.reduce((acc, term) => acc + term.length, 0); + const globalTokenMap = new Map< + string, + { l: number; i: number; rs: [number, number][] }[] + >(); + for (let i = 0; i < terms.length; i++) { const tokenMap = new Map(); const originString = terms[i]; @@ -382,44 +398,57 @@ export class FullTextInvertedIndex implements InvertedIndex { } for (const [term, tokens] of tokenMap) { - await trx.objectStore('invertedIndex').put({ + const entry = globalTokenMap.get(term) || []; + entry.push({ + l: originString.length, + i: i, + rs: tokens.map(token => [token.start, token.end]), + }); + globalTokenMap.set(term, entry); + } + } + + for (const [term, positions] of globalTokenMap) { + promises.push( + trx.objectStore('invertedIndex').put({ table: this.table, key: InvertedIndexKey.forString(this.fieldKey, term).buffer(), nid: id, - pos: { - l: originString.length, - i: i, - rs: tokens.map(token => [token.start, token.end]), - }, - }); - } - - const indexerMetadataStore = trx.objectStore('indexerMetadata'); - // update avg-field-length - const totalCount = - ( - await indexerMetadataStore.get( - `full-text:field-count:${this.table}:${this.fieldKey}` - ) - )?.value ?? 0; - const avgFieldLength = - ( - await indexerMetadataStore.get( - `full-text:avg-field-length:${this.table}:${this.fieldKey}` - ) - )?.value ?? 
0; - await indexerMetadataStore.put({ - key: `full-text:field-count:${this.table}:${this.fieldKey}`, - value: totalCount + 1, - }); - await indexerMetadataStore.put({ - key: `full-text:avg-field-length:${this.table}:${this.fieldKey}`, - value: - avgFieldLength + - (terms.reduce((acc, term) => acc + term.length, 0) - avgFieldLength) / - (totalCount + 1), - }); + pos: positions, + }) + ); } + + const indexerMetadataStore = trx.objectStore('indexerMetadata'); + const countKey = `full-text:field-count:${this.table}:${this.fieldKey}`; + const avgKey = `full-text:avg-field-length:${this.table}:${this.fieldKey}`; + + const [countObj, avgObj] = await Promise.all([ + indexerMetadataStore.get(countKey), + indexerMetadataStore.get(avgKey), + ]); + + const totalCount = countObj?.value ?? 0; + const avgFieldLength = avgObj?.value ?? 0; + + const newTotalCount = totalCount + terms.length; + const newAvgFieldLength = + (avgFieldLength * totalCount + totalTermLength) / newTotalCount; + + promises.push( + indexerMetadataStore.put({ + key: countKey, + value: newTotalCount, + }) + ); + promises.push( + indexerMetadataStore.put({ + key: avgKey, + value: isNaN(newAvgFieldLength) ? 0 : newAvgFieldLength, + }) + ); + + await Promise.all(promises); } } diff --git a/packages/common/nbstore/src/impls/idb/schema.ts b/packages/common/nbstore/src/impls/idb/schema.ts index a27cf6fce5..d2be99a8cc 100644 --- a/packages/common/nbstore/src/impls/idb/schema.ts +++ b/packages/common/nbstore/src/impls/idb/schema.ts @@ -162,7 +162,7 @@ export interface DocStorageSchema extends DBSchema { i: number /* index */; l: number /* length */; rs: [number, number][] /* ranges: [start, end] */; - }; + }[]; key: ArrayBuffer; }; indexes: { key: [string, ArrayBuffer]; nid: number }; diff --git a/packages/common/nbstore/src/impls/sqlite/db.ts b/packages/common/nbstore/src/impls/sqlite/db.ts index f8d501ab50..6138ca6d51 100644 --- a/packages/common/nbstore/src/impls/sqlite/db.ts +++ b/packages/common/nbstore/src/impls/sqlite/db.ts @@ -83,6 +83,35 @@ export interface NativeDBApis { blobId: string ) => Promise; crawlDocData: (id: string, docId: string) => Promise; + ftsAddDocument: ( + id: string, + indexName: string, + docId: string, + text: string, + index: boolean + ) => Promise; + ftsDeleteDocument: ( + id: string, + indexName: string, + docId: string + ) => Promise; + ftsSearch: ( + id: string, + indexName: string, + query: string + ) => Promise<{ id: string; score: number }[]>; + ftsGetDocument: ( + id: string, + indexName: string, + docId: string + ) => Promise; + ftsGetMatches: ( + id: string, + indexName: string, + docId: string, + query: string + ) => Promise<{ start: number; end: number }[]>; + ftsFlushIndex: (id: string) => Promise; } type NativeDBApisWrapper = NativeDBApis extends infer APIs diff --git a/packages/common/nbstore/src/impls/sqlite/index.ts b/packages/common/nbstore/src/impls/sqlite/index.ts index 8797983ca2..7d233b4269 100644 --- a/packages/common/nbstore/src/impls/sqlite/index.ts +++ b/packages/common/nbstore/src/impls/sqlite/index.ts @@ -3,16 +3,19 @@ import { SqliteBlobStorage } from './blob'; import { SqliteBlobSyncStorage } from './blob-sync'; import { SqliteDocStorage } from './doc'; import { SqliteDocSyncStorage } from './doc-sync'; +import { SqliteIndexerStorage } from './indexer'; export * from './blob'; export * from './blob-sync'; export { bindNativeDBApis, type NativeDBApis } from './db'; export * from './doc'; export * from './doc-sync'; +export * from './indexer'; export const sqliteStorages = [ 
SqliteDocStorage, SqliteBlobStorage, SqliteDocSyncStorage, SqliteBlobSyncStorage, + SqliteIndexerStorage, ] satisfies StorageConstructor[]; diff --git a/packages/common/nbstore/src/impls/sqlite/indexer/index.ts b/packages/common/nbstore/src/impls/sqlite/indexer/index.ts new file mode 100644 index 0000000000..e11e8445c2 --- /dev/null +++ b/packages/common/nbstore/src/impls/sqlite/indexer/index.ts @@ -0,0 +1,244 @@ +import { merge, Observable, of, Subject } from 'rxjs'; +import { filter, throttleTime } from 'rxjs/operators'; + +import { share } from '../../../connection'; +import type { + AggregateOptions, + AggregateResult, + IndexerDocument, + Query, + SearchOptions, + SearchResult, +} from '../../../storage'; +import { IndexerStorageBase } from '../../../storage'; +import { IndexerSchema } from '../../../storage/indexer/schema'; +import { fromPromise } from '../../../utils/from-promise'; +import { backoffRetry, exhaustMapWithTrailing } from '../../idb/indexer/utils'; +import { NativeDBConnection, type SqliteNativeDBOptions } from '../db'; +import { createNode } from './node-builder'; +import { queryRaw } from './query'; +import { getText, tryParseArrayField } from './utils'; + +export class SqliteIndexerStorage extends IndexerStorageBase { + static readonly identifier = 'SqliteIndexerStorage'; + override readonly recommendRefreshInterval = 30 * 1000; // 5 seconds + readonly connection: NativeDBConnection; + readonly isReadonly = false; + private readonly tableUpdate$ = new Subject(); + + constructor(options: SqliteNativeDBOptions) { + super(); + this.connection = share(new NativeDBConnection(options)); + } + + private watchTableUpdated(table: string) { + return this.tableUpdate$.asObservable().pipe(filter(t => t === table)); + } + + async search>( + table: T, + query: Query, + options?: O + ): Promise> { + const match = await queryRaw(this.connection, table, query); + + // Pagination + const limit = options?.pagination?.limit ?? 10; + const skip = options?.pagination?.skip ?? 0; + const ids = match.toArray(); + const pagedIds = ids.slice(skip, skip + limit); + + const nodes = []; + for (const id of pagedIds) { + const node = await createNode( + this.connection, + table, + id, + match.getScore(id), + options ?? {}, + query + ); + nodes.push(node); + } + + return { + pagination: { + count: ids.length, + limit, + skip, + hasMore: ids.length > skip + limit, + }, + nodes, + }; + } + + async aggregate< + T extends keyof IndexerSchema, + const O extends AggregateOptions, + >( + table: T, + query: Query, + field: keyof IndexerSchema[T], + options?: O + ): Promise> { + const match = await queryRaw(this.connection, table, query); + const ids = match.toArray(); + + const buckets: any[] = []; + + for (const id of ids) { + const text = await this.connection.apis.ftsGetDocument( + `${table}:${field as string}`, + id + ); + if (text) { + let values: string[] = [text]; + const parsed = tryParseArrayField(text); + if (parsed) { + values = parsed; + } + + for (const val of values) { + let bucket = buckets.find(b => b.key === val); + if (!bucket) { + bucket = { key: val, count: 0, score: 0 }; + if (options?.hits) { + bucket.hits = { + pagination: { count: 0, limit: 0, skip: 0, hasMore: false }, + nodes: [], + }; + } + buckets.push(bucket); + } + bucket.count++; + + if (options?.hits) { + const hitLimit = options.hits.pagination?.limit ?? 
3; + if (bucket.hits.nodes.length < hitLimit) { + const node = await createNode( + this.connection, + table, + id, + match.getScore(id), + options.hits, + query + ); + bucket.hits.nodes.push(node); + bucket.hits.pagination.count++; + } + } + } + } + } + + return { + pagination: { + count: buckets.length, + limit: 0, + skip: 0, + hasMore: false, + }, + buckets, + }; + } + + search$>( + table: T, + query: Query, + options?: O + ): Observable> { + return merge(of(1), this.watchTableUpdated(table)).pipe( + throttleTime(3000, undefined, { leading: true, trailing: true }), + exhaustMapWithTrailing(() => { + return fromPromise(async () => { + return await this.search(table, query, options); + }).pipe(backoffRetry()); + }) + ); + } + + aggregate$< + T extends keyof IndexerSchema, + const O extends AggregateOptions, + >( + table: T, + query: Query, + field: keyof IndexerSchema[T], + options?: O + ): Observable> { + return merge(of(1), this.watchTableUpdated(table)).pipe( + throttleTime(3000, undefined, { leading: true, trailing: true }), + exhaustMapWithTrailing(() => { + return fromPromise(async () => { + return await this.aggregate(table, query, field, options); + }).pipe(backoffRetry()); + }) + ); + } + + async deleteByQuery( + table: T, + query: Query + ): Promise { + const match = await queryRaw(this.connection, table, query); + const ids = match.toArray(); + for (const id of ids) { + await this.delete(table, id); + } + } + + async insert( + table: T, + document: IndexerDocument + ): Promise { + const schema = IndexerSchema[table]; + for (const [field, values] of document.fields) { + const fieldSchema = schema[field]; + // @ts-expect-error + const shouldIndex = fieldSchema.index !== false; + // @ts-expect-error + const shouldStore = fieldSchema.store !== false; + + if (!shouldStore && !shouldIndex) continue; + + const text = getText(values); + + if (typeof text === 'string') { + await this.connection.apis.ftsAddDocument( + `${table}:${field as string}`, + document.id, + text, + shouldIndex + ); + } + } + this.tableUpdate$.next(table); + } + + async delete( + table: T, + id: string + ): Promise { + const schema = IndexerSchema[table]; + for (const field of Object.keys(schema)) { + await this.connection.apis.ftsDeleteDocument(`${table}:${field}`, id); + } + this.tableUpdate$.next(table); + } + + async update( + table: T, + document: IndexerDocument + ): Promise { + // Update is essentially insert (overwrite) + return this.insert(table, document); + } + + async refresh(_table: T): Promise { + // No-op for memory index + } + + async refreshIfNeed(): Promise { + await this.connection.apis.ftsFlushIndex(); + } +} diff --git a/packages/common/nbstore/src/impls/sqlite/indexer/match.ts b/packages/common/nbstore/src/impls/sqlite/indexer/match.ts new file mode 100644 index 0000000000..51fead106b --- /dev/null +++ b/packages/common/nbstore/src/impls/sqlite/indexer/match.ts @@ -0,0 +1,105 @@ +export class Match { + scores = new Map(); + /** + * id -> field -> index(multi value field) -> [start, end][] + */ + highlighters = new Map< + string, + Map> + >(); + + constructor() {} + + size() { + return this.scores.size; + } + + getScore(id: string) { + return this.scores.get(id) ?? 
0; + } + + addScore(id: string, score: number) { + const currentScore = this.scores.get(id) || 0; + this.scores.set(id, currentScore + score); + } + + getHighlighters(id: string, field: string) { + return this.highlighters.get(id)?.get(field); + } + + addHighlighter( + id: string, + field: string, + index: number, + newRanges: [number, number][] + ) { + const fields = + this.highlighters.get(id) || + new Map>(); + const values = fields.get(field) || new Map(); + const ranges = values.get(index) || []; + ranges.push(...newRanges); + values.set(index, ranges); + fields.set(field, values); + this.highlighters.set(id, fields); + } + + and(other: Match) { + const newMatch = new Match(); + for (const [id, score] of this.scores) { + if (other.scores.has(id)) { + newMatch.addScore(id, score + (other.scores.get(id) ?? 0)); + newMatch.copyExtData(this, id); + newMatch.copyExtData(other, id); + } + } + return newMatch; + } + + or(other: Match) { + const newMatch = new Match(); + for (const [id, score] of this.scores) { + newMatch.addScore(id, score); + newMatch.copyExtData(this, id); + } + for (const [id, score] of other.scores) { + newMatch.addScore(id, score); + newMatch.copyExtData(other, id); + } + return newMatch; + } + + exclude(other: Match) { + const newMatch = new Match(); + for (const [id, score] of this.scores) { + if (!other.scores.has(id)) { + newMatch.addScore(id, score); + newMatch.copyExtData(this, id); + } + } + return newMatch; + } + + boost(boost: number) { + const newMatch = new Match(); + for (const [id, score] of this.scores) { + newMatch.addScore(id, score * boost); + newMatch.copyExtData(this, id); + } + return newMatch; + } + + toArray() { + return Array.from(this.scores.entries()) + .sort((a, b) => b[1] - a[1]) + .map(e => e[0]); + } + + private copyExtData(from: Match, id: string) { + for (const [field, values] of from.highlighters.get(id) ?? 
[]) { + for (const [index, ranges] of values) { + this.addHighlighter(id, field, index, ranges); + } + } + } +} diff --git a/packages/common/nbstore/src/impls/sqlite/indexer/node-builder.ts b/packages/common/nbstore/src/impls/sqlite/indexer/node-builder.ts new file mode 100644 index 0000000000..716b892ab8 --- /dev/null +++ b/packages/common/nbstore/src/impls/sqlite/indexer/node-builder.ts @@ -0,0 +1,97 @@ +import { type Query, type SearchOptions } from '../../../storage'; +import { highlighter } from '../../idb/indexer/highlighter'; +import { type NativeDBConnection } from '../db'; +import { tryParseArrayField } from './utils'; + +export async function createNode( + connection: NativeDBConnection, + table: string, + id: string, + score: number, + options: SearchOptions, + query: Query +) { + const node: any = { id, score }; + + if (options.fields) { + const fields: Record = {}; + for (const field of options.fields) { + const text = await connection.apis.ftsGetDocument( + `${table}:${field as string}`, + id + ); + if (text !== null) { + const parsed = tryParseArrayField(text); + if (parsed) { + fields[field as string] = parsed; + } else { + fields[field as string] = text; + } + } else { + fields[field as string] = ''; + } + } + node.fields = fields; + } + + if (options.highlights) { + const highlights: Record = {}; + const queryStrings = extractQueryStrings(query); + + for (const h of options.highlights) { + const text = await connection.apis.ftsGetDocument( + `${table}:${h.field as string}`, + id + ); + if (text) { + const queryString = Array.from(queryStrings).join(' '); + const matches = await connection.apis.ftsGetMatches( + `${table}:${h.field as string}`, + id, + queryString + ); + + if (matches.length > 0) { + const highlighted = highlighter( + text, + h.before, + h.end, + matches.map(m => [m.start, m.end]), + { + maxPrefix: 20, + maxLength: 50, + } + ); + highlights[h.field as string] = highlighted ? [highlighted] : []; + } else { + highlights[h.field as string] = []; + } + } else { + highlights[h.field as string] = []; + } + } + node.highlights = highlights; + } + + return node; +} + +function extractQueryStrings(query: Query): Set { + const terms = new Set(); + if (query.type === 'match') { + terms.add(query.match); + } else if (query.type === 'boolean') { + for (const q of query.queries) { + const subTerms = extractQueryStrings(q); + for (const term of subTerms) { + terms.add(term); + } + } + } else if (query.type === 'boost') { + const subTerms = extractQueryStrings(query.query); + for (const term of subTerms) { + terms.add(term); + } + } + return terms; +} diff --git a/packages/common/nbstore/src/impls/sqlite/indexer/query.ts b/packages/common/nbstore/src/impls/sqlite/indexer/query.ts new file mode 100644 index 0000000000..d2f04e9e25 --- /dev/null +++ b/packages/common/nbstore/src/impls/sqlite/indexer/query.ts @@ -0,0 +1,72 @@ +import { IndexerSchema, type Query } from '../../../storage'; +import { type NativeDBConnection } from '../db'; +import { Match } from './match'; + +export async function queryRaw( + connection: NativeDBConnection, + table: string, + query: Query +): Promise { + if (query.type === 'match') { + const indexName = `${table}:${String(query.field)}`; + const hits = await connection.apis.ftsSearch(indexName, query.match); + const match = new Match(); + for (const hit of hits ?? 
[]) { + match.addScore(hit.id, hit.score); + } + return match; + } else if (query.type === 'boolean') { + const matches: Match[] = []; + for (const q of query.queries) { + matches.push(await queryRaw(connection, table, q)); + } + + if (query.occur === 'must') { + if (matches.length === 0) return new Match(); + return matches.reduce((acc, m) => acc.and(m)); + } else if (query.occur === 'should') { + if (matches.length === 0) return new Match(); + return matches.reduce((acc, m) => acc.or(m)); + } else if (query.occur === 'must_not') { + const union = matches.reduce((acc, m) => acc.or(m), new Match()); + const all = await matchAll(connection, table); + return all.exclude(union); + } + } else if (query.type === 'all') { + return matchAll(connection, table); + } else if (query.type === 'boost') { + const match = await queryRaw(connection, table, query.query); + return match.boost(query.boost); + } else if (query.type === 'exists') { + const indexName = `${table}:${String(query.field)}`; + const hits = await connection.apis.ftsSearch(indexName, '*'); + const match = new Match(); + for (const hit of hits ?? []) { + match.addScore(hit.id, 1); + } + return match; + } + + return new Match(); +} + +export async function matchAll( + connection: NativeDBConnection, + table: string +): Promise { + const schema = IndexerSchema[table as keyof IndexerSchema]; + if (!schema) return new Match(); + + const match = new Match(); + for (const field of Object.keys(schema)) { + const indexName = `${table}:${field}`; + let hits = await connection.apis.ftsSearch(indexName, ''); + if (!hits || hits.length === 0) { + hits = await connection.apis.ftsSearch(indexName, '*'); + } + for (const hit of hits ?? []) { + match.addScore(hit.id, 1); + } + } + return match; +} diff --git a/packages/common/nbstore/src/impls/sqlite/indexer/utils.ts b/packages/common/nbstore/src/impls/sqlite/indexer/utils.ts new file mode 100644 index 0000000000..eb05fc0881 --- /dev/null +++ b/packages/common/nbstore/src/impls/sqlite/indexer/utils.ts @@ -0,0 +1,20 @@ +export function getText( + val: string | string[] | undefined +): string | undefined { + if (Array.isArray(val)) { + return JSON.stringify(val); + } + return val; +} + +export function tryParseArrayField(text: string): any[] | null { + if (text.startsWith('[') && text.endsWith(']')) { + try { + const parsed = JSON.parse(text); + if (Array.isArray(parsed)) { + return parsed; + } + } catch {} + } + return null; +} diff --git a/packages/common/nbstore/src/storage/indexer.ts b/packages/common/nbstore/src/storage/indexer.ts index edfc6c1ed0..aa031358a8 100644 --- a/packages/common/nbstore/src/storage/indexer.ts +++ b/packages/common/nbstore/src/storage/indexer.ts @@ -14,6 +14,7 @@ import type { Storage } from './storage'; export interface IndexerStorage extends Storage { readonly storageType: 'indexer'; readonly isReadonly: boolean; + readonly recommendRefreshInterval: number; // 100ms search>( table: T, @@ -120,6 +121,7 @@ export type AggregateResult< export abstract class IndexerStorageBase implements IndexerStorage { readonly storageType = 'indexer'; + readonly recommendRefreshInterval: number = 100; // 100ms abstract readonly connection: Connection; abstract readonly isReadonly: boolean; diff --git a/packages/common/nbstore/src/sync/indexer/index.ts b/packages/common/nbstore/src/sync/indexer/index.ts index 7ce9a3d433..d0f6cc3b01 100644 --- a/packages/common/nbstore/src/sync/indexer/index.ts +++ b/packages/common/nbstore/src/sync/indexer/index.ts @@ -109,7 +109,7 @@ export class 
IndexerSyncImpl implements IndexerSync { /** * increase this number to re-index all docs */ - readonly INDEXER_VERSION = 1; + readonly INDEXER_VERSION = 2; private abort: AbortController | null = null; private readonly rootDocId = this.doc.spaceId; private readonly status = new IndexerSyncStatus(this.rootDocId); @@ -484,9 +484,16 @@ export class IndexerSyncImpl implements IndexerSync { } } + // ensure the indexer is refreshed according to recommendRefreshInterval + // recommendRefreshInterval <= 0 means force refresh on each operation + // recommendRefreshInterval > 0 means refresh if the last refresh is older than recommendRefreshInterval private async refreshIfNeed(): Promise { - if (this.lastRefreshed + 100 < Date.now()) { - console.log('[indexer] refreshing indexer'); + const recommendRefreshInterval = this.indexer.recommendRefreshInterval ?? 0; + const needRefresh = + recommendRefreshInterval > 0 && + this.lastRefreshed + recommendRefreshInterval < Date.now(); + const forceRefresh = recommendRefreshInterval <= 0; + if (needRefresh || forceRefresh) { await this.indexer.refreshIfNeed(); this.lastRefreshed = Date.now(); } diff --git a/packages/frontend/apps/android/src/plugins/nbstore/definitions.ts b/packages/frontend/apps/android/src/plugins/nbstore/definitions.ts index bc2c1892e8..0db7082376 100644 --- a/packages/frontend/apps/android/src/plugins/nbstore/definitions.ts +++ b/packages/frontend/apps/android/src/plugins/nbstore/definitions.ts @@ -155,4 +155,33 @@ export interface NbStorePlugin { id: string; docId: string; }) => Promise; + ftsAddDocument: (options: { + id: string; + indexName: string; + docId: string; + text: string; + index: boolean; + }) => Promise; + ftsDeleteDocument: (options: { + id: string; + indexName: string; + docId: string; + }) => Promise; + ftsSearch: (options: { + id: string; + indexName: string; + query: string; + }) => Promise<{ id: string; score: number }[]>; + ftsGetDocument: (options: { + id: string; + indexName: string; + docId: string; + }) => Promise<{ text: string | null }>; + ftsGetMatches: (options: { + id: string; + indexName: string; + docId: string; + query: string; + }) => Promise>; + ftsFlushIndex: (options: { id: string }) => Promise; } diff --git a/packages/frontend/apps/android/src/plugins/nbstore/index.ts b/packages/frontend/apps/android/src/plugins/nbstore/index.ts index 3390afb40d..881162c258 100644 --- a/packages/frontend/apps/android/src/plugins/nbstore/index.ts +++ b/packages/frontend/apps/android/src/plugins/nbstore/index.ts @@ -339,4 +339,71 @@ export const NbStoreNativeDBApis: NativeDBApis = { crawlDocData: async function (id: string, docId: string) { return NbStore.crawlDocData({ id, docId }); }, + ftsAddDocument: async function ( + id: string, + indexName: string, + docId: string, + text: string, + index: boolean + ): Promise { + await NbStore.ftsAddDocument({ + id, + indexName, + docId, + text, + index, + }); + }, + ftsDeleteDocument: async function ( + id: string, + indexName: string, + docId: string + ): Promise { + await NbStore.ftsDeleteDocument({ + id, + indexName, + docId, + }); + }, + ftsSearch: async function ( + id: string, + indexName: string, + query: string + ): Promise<{ id: string; score: number }[]> { + return await NbStore.ftsSearch({ + id, + indexName, + query, + }); + }, + ftsGetDocument: async function ( + id: string, + indexName: string, + docId: string + ): Promise { + const result = await NbStore.ftsGetDocument({ + id, + indexName, + docId, + }); + return result.text; + }, + ftsGetMatches: async function ( 
+ id: string, + indexName: string, + docId: string, + query: string + ): Promise<{ start: number; end: number }[]> { + return await NbStore.ftsGetMatches({ + id, + indexName, + docId, + query, + }); + }, + ftsFlushIndex: async function (id: string): Promise { + await NbStore.ftsFlushIndex({ + id, + }); + }, }; diff --git a/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts index 5a9ed91296..2735d80788 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts @@ -48,4 +48,10 @@ export const nbstoreHandlers: NativeDBApis = { setBlobUploadedAt: POOL.setBlobUploadedAt.bind(POOL), getBlobUploadedAt: POOL.getBlobUploadedAt.bind(POOL), crawlDocData: POOL.crawlDocData.bind(POOL), + ftsAddDocument: POOL.ftsAddDocument.bind(POOL), + ftsDeleteDocument: POOL.ftsDeleteDocument.bind(POOL), + ftsSearch: POOL.ftsSearch.bind(POOL), + ftsGetDocument: POOL.ftsGetDocument.bind(POOL), + ftsGetMatches: POOL.ftsGetMatches.bind(POOL), + ftsFlushIndex: POOL.ftsFlushIndex.bind(POOL), }; diff --git a/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts b/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts index bc2c1892e8..0db7082376 100644 --- a/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts +++ b/packages/frontend/apps/ios/src/plugins/nbstore/definitions.ts @@ -155,4 +155,33 @@ export interface NbStorePlugin { id: string; docId: string; }) => Promise; + ftsAddDocument: (options: { + id: string; + indexName: string; + docId: string; + text: string; + index: boolean; + }) => Promise; + ftsDeleteDocument: (options: { + id: string; + indexName: string; + docId: string; + }) => Promise; + ftsSearch: (options: { + id: string; + indexName: string; + query: string; + }) => Promise<{ id: string; score: number }[]>; + ftsGetDocument: (options: { + id: string; + indexName: string; + docId: string; + }) => Promise<{ text: string | null }>; + ftsGetMatches: (options: { + id: string; + indexName: string; + docId: string; + query: string; + }) => Promise>; + ftsFlushIndex: (options: { id: string }) => Promise; } diff --git a/packages/frontend/apps/ios/src/plugins/nbstore/index.ts b/packages/frontend/apps/ios/src/plugins/nbstore/index.ts index abc7461dce..766c2cabea 100644 --- a/packages/frontend/apps/ios/src/plugins/nbstore/index.ts +++ b/packages/frontend/apps/ios/src/plugins/nbstore/index.ts @@ -343,4 +343,71 @@ export const NbStoreNativeDBApis: NativeDBApis = { ): Promise { return await NbStore.crawlDocData({ id, docId }); }, + ftsAddDocument: async function ( + id: string, + indexName: string, + docId: string, + text: string, + index: boolean + ): Promise { + await NbStore.ftsAddDocument({ + id, + indexName, + docId, + text, + index, + }); + }, + ftsDeleteDocument: async function ( + id: string, + indexName: string, + docId: string + ): Promise { + await NbStore.ftsDeleteDocument({ + id, + indexName, + docId, + }); + }, + ftsSearch: async function ( + id: string, + indexName: string, + query: string + ): Promise<{ id: string; score: number }[]> { + return await NbStore.ftsSearch({ + id, + indexName, + query, + }); + }, + ftsGetDocument: async function ( + id: string, + indexName: string, + docId: string + ): Promise { + const result = await NbStore.ftsGetDocument({ + id, + indexName, + docId, + }); + return result.text; + }, + ftsGetMatches: async function ( + id: string, + indexName: string, + docId: string, + query: string + ): 
Promise<{ start: number; end: number }[]> { + return await NbStore.ftsGetMatches({ + id, + indexName, + docId, + query, + }); + }, + ftsFlushIndex: async function (id: string): Promise { + await NbStore.ftsFlushIndex({ + id, + }); + }, }; diff --git a/packages/frontend/core/src/modules/workspace-engine/impls/local.ts b/packages/frontend/core/src/modules/workspace-engine/impls/local.ts index a2a2fb8cfc..2f57657b6c 100644 --- a/packages/frontend/core/src/modules/workspace-engine/impls/local.ts +++ b/packages/frontend/core/src/modules/workspace-engine/impls/local.ts @@ -10,6 +10,7 @@ import { IndexedDBBlobSyncStorage, IndexedDBDocStorage, IndexedDBDocSyncStorage, + IndexedDBIndexerStorage, } from '@affine/nbstore/idb'; import { IndexedDBV1BlobStorage, @@ -20,6 +21,7 @@ import { SqliteBlobSyncStorage, SqliteDocStorage, SqliteDocSyncStorage, + SqliteIndexerStorage, } from '@affine/nbstore/sqlite'; import { SqliteV1BlobStorage, @@ -107,6 +109,9 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider { BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS || BUILD_CONFIG.isAndroid ? SqliteBlobSyncStorage : IndexedDBBlobSyncStorage; + IndexerStorageType = BUILD_CONFIG.isElectron + ? SqliteIndexerStorage + : IndexedDBIndexerStorage; async deleteWorkspace(id: string): Promise { setLocalWorkspaceIds(ids => ids.filter(x => x !== id)); @@ -351,7 +356,7 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider { }, }, indexer: { - name: 'IndexedDBIndexerStorage', + name: this.IndexerStorageType.identifier, opts: { flavour: this.flavour, type: 'workspace', diff --git a/packages/frontend/native/index.d.ts b/packages/frontend/native/index.d.ts index 36f2f775c8..32c9d6b877 100644 --- a/packages/frontend/native/index.d.ts +++ b/packages/frontend/native/index.d.ts @@ -82,6 +82,12 @@ export declare class DocStoragePool { clearClocks(universalId: string): Promise setBlobUploadedAt(universalId: string, peer: string, blobId: string, uploadedAt?: Date | undefined | null): Promise getBlobUploadedAt(universalId: string, peer: string, blobId: string): Promise + ftsAddDocument(id: string, indexName: string, docId: string, text: string, index: boolean): Promise + ftsFlushIndex(id: string): Promise + ftsDeleteDocument(id: string, indexName: string, docId: string): Promise + ftsGetDocument(id: string, indexName: string, docId: string): Promise + ftsSearch(id: string, indexName: string, query: string): Promise> + ftsGetMatches(id: string, indexName: string, docId: string, query: string): Promise> } export interface Blob { @@ -134,6 +140,16 @@ export interface NativeCrawlResult { summary: string } +export interface NativeMatch { + start: number + end: number +} + +export interface NativeSearchHit { + id: string + score: number +} + export interface SetBlob { key: string data: Uint8Array diff --git a/packages/frontend/native/nbstore/Cargo.toml b/packages/frontend/native/nbstore/Cargo.toml index 7b74d970db..f8400a39ea 100644 --- a/packages/frontend/native/nbstore/Cargo.toml +++ b/packages/frontend/native/nbstore/Cargo.toml @@ -13,9 +13,12 @@ use-as-lib = ["napi-derive/noop", "napi/noop"] affine_common = { workspace = true, features = ["ydoc-loader"] } affine_schema = { path = "../schema" } anyhow = { workspace = true } +bincode = { version = "2.0.1", features = ["serde"] } chrono = { workspace = true } +jieba-rs = "0.8.1" napi = { workspace = true } napi-derive = { workspace = true } +once_cell = { workspace = true } serde = { workspace = true, features = ["derive"] } sqlx = { workspace = true, 
default-features = false, features = [ "chrono", @@ -26,8 +29,10 @@ sqlx = { workspace = true, default-features = false, features = [ "tls-rustls", ] } thiserror = { workspace = true } +tiniestsegmenter = "0.3" tokio = { workspace = true, features = ["full"] } y-octo = { workspace = true } +zstd = "0.13" [target.'cfg(any(target_os = "ios", target_os = "android"))'.dependencies] uniffi = { workspace = true } diff --git a/packages/frontend/native/nbstore/src/error.rs b/packages/frontend/native/nbstore/src/error.rs index 608368c911..60b46d80f0 100644 --- a/packages/frontend/native/nbstore/src/error.rs +++ b/packages/frontend/native/nbstore/src/error.rs @@ -10,6 +10,8 @@ pub enum Error { MigrateError(#[from] sqlx::migrate::MigrateError), #[error("Invalid operation")] InvalidOperation, + #[error("Serialization Error: {0}")] + Serialization(String), #[error(transparent)] Parse(#[from] ParseError), } diff --git a/packages/frontend/native/nbstore/src/indexer.rs b/packages/frontend/native/nbstore/src/indexer.rs deleted file mode 100644 index 149c8e305f..0000000000 --- a/packages/frontend/native/nbstore/src/indexer.rs +++ /dev/null @@ -1,180 +0,0 @@ -use affine_common::doc_parser::{parse_doc_from_binary, BlockInfo, CrawlResult, ParseError}; -use napi_derive::napi; -use serde::Serialize; -use y_octo::DocOptions; - -use super::{error::Result, storage::SqliteDocStorage}; - -#[napi(object)] -#[derive(Debug, Serialize)] -pub struct NativeBlockInfo { - pub block_id: String, - pub flavour: String, - pub content: Option>, - pub blob: Option>, - pub ref_doc_id: Option>, - pub ref_info: Option>, - pub parent_flavour: Option, - pub parent_block_id: Option, - pub additional: Option, -} - -#[napi(object)] -#[derive(Debug, Serialize)] -pub struct NativeCrawlResult { - pub blocks: Vec, - pub title: String, - pub summary: String, -} - -impl From for NativeBlockInfo { - fn from(value: BlockInfo) -> Self { - Self { - block_id: value.block_id, - flavour: value.flavour, - content: value.content, - blob: value.blob, - ref_doc_id: value.ref_doc_id, - ref_info: value.ref_info, - parent_flavour: value.parent_flavour, - parent_block_id: value.parent_block_id, - additional: value.additional, - } - } -} - -impl From for NativeCrawlResult { - fn from(value: CrawlResult) -> Self { - Self { - blocks: value.blocks.into_iter().map(Into::into).collect(), - title: value.title, - summary: value.summary, - } - } -} - -impl SqliteDocStorage { - pub async fn crawl_doc_data(&self, doc_id: &str) -> Result { - let doc_bin = self - .load_doc_binary(doc_id) - .await? 
- .ok_or(ParseError::DocNotFound)?; - - let result = parse_doc_from_binary(doc_bin, doc_id.to_string())?; - Ok(result.into()) - } - - async fn load_doc_binary(&self, doc_id: &str) -> Result>> { - let snapshot = self.get_doc_snapshot(doc_id.to_string()).await?; - let mut updates = self.get_doc_updates(doc_id.to_string()).await?; - - if snapshot.is_none() && updates.is_empty() { - return Ok(None); - } - - updates.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); - - let mut segments = - Vec::with_capacity(snapshot.as_ref().map(|_| 1).unwrap_or(0) + updates.len()); - if let Some(record) = snapshot { - segments.push(record.bin.to_vec()); - } - segments.extend(updates.into_iter().map(|update| update.bin.to_vec())); - - merge_updates(segments, doc_id).map(Some) - } -} - -fn merge_updates(mut segments: Vec>, guid: &str) -> Result> { - if segments.is_empty() { - return Err(ParseError::DocNotFound.into()); - } - - if segments.len() == 1 { - return segments.pop().ok_or(ParseError::DocNotFound.into()); - } - - let mut doc = DocOptions::new().with_guid(guid.to_string()).build(); - for update in segments.iter() { - doc - .apply_update_from_binary_v1(update) - .map_err(|_| ParseError::InvalidBinary)?; - } - - let buffer = doc - .encode_update_v1() - .map_err(|err| ParseError::ParserError(err.to_string()))?; - - Ok(buffer) -} - -#[cfg(test)] -mod tests { - use std::path::{Path, PathBuf}; - - use affine_common::doc_parser::ParseError; - use chrono::Utc; - use serde_json::Value; - use tokio::fs; - use uuid::Uuid; - - use super::{super::error::Error, *}; - - const DEMO_BIN: &[u8] = include_bytes!("../../../../common/native/fixtures/demo.ydoc"); - const DEMO_JSON: &[u8] = include_bytes!("../../../../common/native/fixtures/demo.ydoc.json"); - - fn temp_workspace_dir() -> PathBuf { - std::env::temp_dir().join(format!("affine-native-{}", Uuid::new_v4())) - } - - async fn init_db(path: &Path) -> SqliteDocStorage { - fs::create_dir_all(path.parent().unwrap()).await.unwrap(); - let storage = SqliteDocStorage::new(path.to_string_lossy().into_owned()); - storage.connect().await.unwrap(); - storage - } - - async fn cleanup(path: &Path) { - let _ = fs::remove_dir_all(path.parent().unwrap()).await; - } - - #[tokio::test] - async fn parse_demo_snapshot_matches_fixture() { - let base = temp_workspace_dir(); - fs::create_dir_all(&base).await.unwrap(); - let db_path = base.join("storage.db"); - - let storage = init_db(&db_path).await; - sqlx::query(r#"INSERT INTO snapshots (doc_id, data, updated_at) VALUES (?, ?, ?)"#) - .bind("demo-doc") - .bind(DEMO_BIN) - .bind(Utc::now().naive_utc()) - .execute(&storage.pool) - .await - .unwrap(); - - let result = storage.crawl_doc_data("demo-doc").await.unwrap(); - - let expected: Value = serde_json::from_slice(DEMO_JSON).unwrap(); - let actual = serde_json::to_value(&result).unwrap(); - assert_eq!(expected, actual); - - storage.close().await; - cleanup(&db_path).await; - } - - #[tokio::test] - async fn missing_doc_returns_error() { - let base = temp_workspace_dir(); - fs::create_dir_all(&base).await.unwrap(); - let db_path = base.join("storage.db"); - - let storage = init_db(&db_path).await; - - let err = storage.crawl_doc_data("absent-doc").await.unwrap_err(); - assert!(matches!(err, Error::Parse(ParseError::DocNotFound))); - - storage.close().await; - cleanup(&db_path).await; - } -} diff --git a/packages/frontend/native/nbstore/src/indexer/memory_indexer.rs b/packages/frontend/native/nbstore/src/indexer/memory_indexer.rs new file mode 100644 index 0000000000..f4c927ce88 --- /dev/null 
+++ b/packages/frontend/native/nbstore/src/indexer/memory_indexer.rs
@@ -0,0 +1,261 @@
+use std::collections::{HashMap, HashSet};
+
+use super::{
+  tokenizer::tokenize,
+  types::{DocData, SnapshotData},
+};
+
+type DirtyDoc = (String, String, String, i64);
+type DeletedDoc = HashMap<String, HashSet<String>>;
+
+#[derive(Default, Debug)]
+pub struct InMemoryIndex {
+  pub docs: HashMap<String, HashMap<String, DocData>>,
+  pub inverted: HashMap<String, HashMap<String, HashMap<String, i64>>>,
+  pub total_lens: HashMap<String, i64>,
+  pub dirty: HashMap<String, HashSet<String>>,
+  pub deleted: HashMap<String, HashSet<String>>,
+}
+
+impl InMemoryIndex {
+  pub fn add_doc(&mut self, index_name: &str, doc_id: &str, text: &str, index: bool) {
+    let tokens = if index { tokenize(text) } else { vec![] };
+    // doc_len should be the number of tokens (including duplicates)
+    let doc_len = tokens.len() as i64;
+
+    let mut pos_map: HashMap<String, Vec<(u32, u32)>> = HashMap::new();
+    for token in tokens {
+      pos_map
+        .entry(token.term)
+        .or_default()
+        .push((token.start as u32, token.end as u32));
+    }
+
+    if let Some(docs) = self.docs.get_mut(index_name) {
+      if let Some(old_data) = docs.remove(doc_id) {
+        *self.total_lens.entry(index_name.to_string()).or_default() -= old_data.doc_len;
+
+        if let Some(inverted) = self.inverted.get_mut(index_name) {
+          for (term, _) in old_data.term_pos {
+            if let Some(doc_map) = inverted.get_mut(&term) {
+              doc_map.remove(doc_id);
+              if doc_map.is_empty() {
+                inverted.remove(&term);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    let doc_data = DocData {
+      content: text.to_string(),
+      doc_len,
+      term_pos: pos_map.clone(),
+    };
+
+    self
+      .docs
+      .entry(index_name.to_string())
+      .or_default()
+      .insert(doc_id.to_string(), doc_data);
+    *self.total_lens.entry(index_name.to_string()).or_default() += doc_len;
+
+    let inverted = self.inverted.entry(index_name.to_string()).or_default();
+    for (term, positions) in pos_map {
+      inverted
+        .entry(term)
+        .or_default()
+        .insert(doc_id.to_string(), positions.len() as i64);
+    }
+
+    self
+      .dirty
+      .entry(index_name.to_string())
+      .or_default()
+      .insert(doc_id.to_string());
+    if let Some(deleted) = self.deleted.get_mut(index_name) {
+      deleted.remove(doc_id);
+    }
+  }
+
+  pub fn remove_doc(&mut self, index_name: &str, doc_id: &str) {
+    if let Some(docs) = self.docs.get_mut(index_name) {
+      if let Some(old_data) = docs.remove(doc_id) {
+        *self.total_lens.entry(index_name.to_string()).or_default() -= old_data.doc_len;
+
+        if let Some(inverted) = self.inverted.get_mut(index_name) {
+          for (term, _) in old_data.term_pos {
+            if let Some(doc_map) = inverted.get_mut(&term) {
+              doc_map.remove(doc_id);
+              if doc_map.is_empty() {
+                inverted.remove(&term);
+              }
+            }
+          }
+        }
+
+        self
+          .deleted
+          .entry(index_name.to_string())
+          .or_default()
+          .insert(doc_id.to_string());
+        if let Some(dirty) = self.dirty.get_mut(index_name) {
+          dirty.remove(doc_id);
+        }
+      }
+    }
+  }
+
+  pub fn get_doc(&self, index_name: &str, doc_id: &str) -> Option<String> {
+    self
+      .docs
+      .get(index_name)
+      .and_then(|docs| docs.get(doc_id))
+      .map(|d| d.content.clone())
+  }
+
+  pub fn search(&self, index_name: &str, query: &str) -> Vec<(String, f64)> {
+    if query == "*" || query.is_empty() {
+      if let Some(docs) = self.docs.get(index_name) {
+        return docs.keys().map(|k| (k.clone(), 1.0)).collect();
+      }
+      return vec![];
+    }
+
+    let query_terms = tokenize(query);
+    if query_terms.is_empty() {
+      return vec![];
+    }
+
+    let inverted = match self.inverted.get(index_name) {
+      Some(i) => i,
+      None => return vec![],
+    };
+
+    let mut candidates: Option<HashSet<String>> = None;
+
+    for token in &query_terms {
+      if let Some(doc_map) = inverted.get(&token.term) {
+        let docs: HashSet<String> = doc_map.keys().cloned().collect();
+
match candidates { + None => candidates = Some(docs), + Some(ref mut c) => { + c.retain(|id| docs.contains(id)); + } + } + if candidates.as_ref().unwrap().is_empty() { + return vec![]; + } + } else { + return vec![]; + } + } + + let candidates = candidates.unwrap_or_default(); + if candidates.is_empty() { + return vec![]; + } + + let docs = self.docs.get(index_name).unwrap(); + let total_len = *self.total_lens.get(index_name).unwrap_or(&0); + let n = docs.len() as f64; + let avgdl = if n > 0.0 { total_len as f64 / n } else { 0.0 }; + + let k1 = 1.2; + let b = 0.75; + + let mut scores: Vec<(String, f64)> = Vec::with_capacity(candidates.len()); + + let mut idfs = HashMap::new(); + for token in &query_terms { + let n_q = inverted.get(&token.term).map(|m| m.len()).unwrap_or(0) as f64; + let idf = ((n - n_q + 0.5) / (n_q + 0.5) + 1.0).ln(); + idfs.insert(&token.term, idf); + } + + for doc_id in candidates { + let doc_data = docs.get(&doc_id).unwrap(); + let mut score = 0.0; + + for token in &query_terms { + if let Some(positions) = doc_data.term_pos.get(&token.term) { + let freq = positions.len() as f64; + let idf = idfs.get(&token.term).unwrap(); + let numerator = freq * (k1 + 1.0); + let denominator = freq + k1 * (1.0 - b + b * (doc_data.doc_len as f64 / avgdl)); + score += idf * (numerator / denominator); + } + } + scores.push((doc_id, score)); + } + + scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); + + scores + } + + pub fn take_dirty_and_deleted(&mut self) -> (Vec, DeletedDoc) { + let dirty = std::mem::take(&mut self.dirty); + let deleted = std::mem::take(&mut self.deleted); + + let mut dirty_data = Vec::new(); + for (index_name, doc_ids) in &dirty { + if let Some(docs) = self.docs.get(index_name) { + for doc_id in doc_ids { + if let Some(data) = docs.get(doc_id) { + dirty_data.push(( + index_name.clone(), + doc_id.clone(), + data.content.clone(), + data.doc_len, + )); + } + } + } + } + (dirty_data, deleted) + } + + pub fn get_matches(&self, index_name: &str, doc_id: &str, query: &str) -> Vec<(u32, u32)> { + let mut matches = Vec::new(); + if let Some(docs) = self.docs.get(index_name) { + if let Some(doc_data) = docs.get(doc_id) { + let query_tokens = tokenize(query); + for token in query_tokens { + if let Some(positions) = doc_data.term_pos.get(&token.term) { + matches.extend(positions.iter().cloned()); + } + } + } + } + matches.sort_by(|a, b| a.0.cmp(&b.0)); + matches + } + + pub fn load_snapshot(&mut self, index_name: &str, snapshot: SnapshotData) { + let docs = self.docs.entry(index_name.to_string()).or_default(); + let inverted = self.inverted.entry(index_name.to_string()).or_default(); + let total_len = self.total_lens.entry(index_name.to_string()).or_default(); + + for (doc_id, doc_data) in snapshot.docs { + *total_len += doc_data.doc_len; + + for (term, positions) in &doc_data.term_pos { + inverted + .entry(term.clone()) + .or_default() + .insert(doc_id.clone(), positions.len() as i64); + } + + docs.insert(doc_id, doc_data); + } + } + + pub fn get_snapshot_data(&self, index_name: &str) -> Option { + self + .docs + .get(index_name) + .map(|docs| SnapshotData { docs: docs.clone() }) + } +} diff --git a/packages/frontend/native/nbstore/src/indexer/mod.rs b/packages/frontend/native/nbstore/src/indexer/mod.rs new file mode 100644 index 0000000000..38da761112 --- /dev/null +++ b/packages/frontend/native/nbstore/src/indexer/mod.rs @@ -0,0 +1,266 @@ +mod memory_indexer; +mod tokenizer; +mod types; + +use affine_common::doc_parser::{parse_doc_from_binary, 
ParseError}; +pub use memory_indexer::InMemoryIndex; +use sqlx::Row; +pub use types::{ + DocData, NativeBlockInfo, NativeCrawlResult, NativeMatch, NativeSearchHit, SnapshotData, +}; +use y_octo::DocOptions; + +use super::{ + error::{Error, Result}, + storage::SqliteDocStorage, +}; + +impl SqliteDocStorage { + pub async fn crawl_doc_data(&self, doc_id: &str) -> Result { + let doc_bin = self + .load_doc_binary(doc_id) + .await? + .ok_or(ParseError::DocNotFound)?; + + let result = parse_doc_from_binary(doc_bin, doc_id.to_string())?; + Ok(result.into()) + } + + async fn load_doc_binary(&self, doc_id: &str) -> Result>> { + let snapshot = self.get_doc_snapshot(doc_id.to_string()).await?; + let mut updates = self.get_doc_updates(doc_id.to_string()).await?; + + if snapshot.is_none() && updates.is_empty() { + return Ok(None); + } + + updates.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); + + let mut segments = + Vec::with_capacity(snapshot.as_ref().map(|_| 1).unwrap_or(0) + updates.len()); + if let Some(record) = snapshot { + segments.push(record.bin.to_vec()); + } + segments.extend(updates.into_iter().map(|update| update.bin.to_vec())); + + merge_updates(segments, doc_id).map(Some) + } + + pub async fn init_index(&self) -> Result<()> { + let snapshots = sqlx::query("SELECT index_name, data FROM idx_snapshots") + .fetch_all(&self.pool) + .await?; + + { + let mut index = self.index.write().await; + for row in snapshots { + let index_name: String = row.get("index_name"); + let data: Vec = row.get("data"); + if let Ok(decompressed) = zstd::stream::decode_all(std::io::Cursor::new(&data)) { + if let Ok((snapshot, _)) = bincode::serde::decode_from_slice::( + &decompressed, + bincode::config::standard(), + ) { + index.load_snapshot(&index_name, snapshot); + } + } + } + } + + Ok(()) + } + + async fn compact_index(&self, index_name: &str) -> Result<()> { + let snapshot_data = { + let index = self.index.read().await; + index.get_snapshot_data(index_name) + }; + + if let Some(data) = snapshot_data { + let blob = bincode::serde::encode_to_vec(&data, bincode::config::standard()) + .map_err(|e| Error::Serialization(e.to_string()))?; + let compressed = zstd::stream::encode_all(std::io::Cursor::new(&blob), 0) + .map_err(|e| Error::Serialization(e.to_string()))?; + + let mut tx = self.pool.begin().await?; + + sqlx::query("INSERT OR REPLACE INTO idx_snapshots (index_name, data) VALUES (?, ?)") + .bind(index_name) + .bind(compressed) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + } + Ok(()) + } + + pub async fn flush_index(&self) -> Result<()> { + let (dirty_docs, deleted_docs) = { + let mut index = self.index.write().await; + index.take_dirty_and_deleted() + }; + + if dirty_docs.is_empty() && deleted_docs.is_empty() { + return Ok(()); + } + + let mut modified_indices = std::collections::HashSet::new(); + for index_name in deleted_docs.keys() { + modified_indices.insert(index_name.clone()); + } + for (index_name, _, _, _) in &dirty_docs { + modified_indices.insert(index_name.clone()); + } + + for index_name in modified_indices { + self.compact_index(&index_name).await?; + } + + Ok(()) + } + + pub async fn fts_add( + &self, + index_name: &str, + doc_id: &str, + text: &str, + index: bool, + ) -> Result<()> { + let mut idx = self.index.write().await; + idx.add_doc(index_name, doc_id, text, index); + Ok(()) + } + + pub async fn fts_delete(&self, index_name: &str, doc_id: &str) -> Result<()> { + let mut idx = self.index.write().await; + idx.remove_doc(index_name, doc_id); + Ok(()) + } + + pub async fn 
fts_get(&self, index_name: &str, doc_id: &str) -> Result> { + let idx = self.index.read().await; + Ok(idx.get_doc(index_name, doc_id)) + } + + pub async fn fts_search(&self, index_name: &str, query: &str) -> Result> { + let idx = self.index.read().await; + Ok( + idx + .search(index_name, query) + .into_iter() + .map(|(id, score)| NativeSearchHit { id, score }) + .collect(), + ) + } + + pub async fn fts_get_matches( + &self, + index_name: &str, + doc_id: &str, + query: &str, + ) -> Result> { + let idx = self.index.read().await; + Ok( + idx + .get_matches(index_name, doc_id, query) + .into_iter() + .map(|(start, end)| NativeMatch { start, end }) + .collect(), + ) + } +} + +fn merge_updates(mut segments: Vec>, guid: &str) -> Result> { + if segments.is_empty() { + return Err(ParseError::DocNotFound.into()); + } + + if segments.len() == 1 { + return segments.pop().ok_or(ParseError::DocNotFound.into()); + } + + let mut doc = DocOptions::new().with_guid(guid.to_string()).build(); + for update in segments.iter() { + doc + .apply_update_from_binary_v1(update) + .map_err(|_| ParseError::InvalidBinary)?; + } + + let buffer = doc + .encode_update_v1() + .map_err(|err| ParseError::ParserError(err.to_string()))?; + + Ok(buffer) +} + +#[cfg(test)] +mod tests { + use std::path::{Path, PathBuf}; + + use affine_common::doc_parser::ParseError; + use chrono::Utc; + use serde_json::Value; + use tokio::fs; + use uuid::Uuid; + + use super::{super::error::Error, *}; + + const DEMO_BIN: &[u8] = include_bytes!("../../../../../common/native/fixtures/demo.ydoc"); + const DEMO_JSON: &[u8] = include_bytes!("../../../../../common/native/fixtures/demo.ydoc.json"); + + fn temp_workspace_dir() -> PathBuf { + std::env::temp_dir().join(format!("affine-native-{}", Uuid::new_v4())) + } + + async fn init_db(path: &Path) -> SqliteDocStorage { + fs::create_dir_all(path.parent().unwrap()).await.unwrap(); + let storage = SqliteDocStorage::new(path.to_string_lossy().into_owned()); + storage.connect().await.unwrap(); + storage + } + + async fn cleanup(path: &Path) { + let _ = fs::remove_dir_all(path.parent().unwrap()).await; + } + + #[tokio::test] + async fn parse_demo_snapshot_matches_fixture() { + let base = temp_workspace_dir(); + fs::create_dir_all(&base).await.unwrap(); + let db_path = base.join("storage.db"); + + let storage = init_db(&db_path).await; + sqlx::query(r#"INSERT INTO snapshots (doc_id, data, updated_at) VALUES (?, ?, ?)"#) + .bind("demo-doc") + .bind(DEMO_BIN) + .bind(Utc::now().naive_utc()) + .execute(&storage.pool) + .await + .unwrap(); + + let result = storage.crawl_doc_data("demo-doc").await.unwrap(); + + let expected: Value = serde_json::from_slice(DEMO_JSON).unwrap(); + let actual = serde_json::to_value(&result).unwrap(); + assert_eq!(expected, actual); + + storage.close().await; + cleanup(&db_path).await; + } + + #[tokio::test] + async fn missing_doc_returns_error() { + let base = temp_workspace_dir(); + fs::create_dir_all(&base).await.unwrap(); + let db_path = base.join("storage.db"); + + let storage = init_db(&db_path).await; + + let err = storage.crawl_doc_data("absent-doc").await.unwrap_err(); + assert!(matches!(err, Error::Parse(ParseError::DocNotFound))); + + storage.close().await; + cleanup(&db_path).await; + } +} diff --git a/packages/frontend/native/nbstore/src/indexer/tokenizer.rs b/packages/frontend/native/nbstore/src/indexer/tokenizer.rs new file mode 100644 index 0000000000..5a4753ad1b --- /dev/null +++ b/packages/frontend/native/nbstore/src/indexer/tokenizer.rs @@ -0,0 +1,85 @@ +use 
jieba_rs::Jieba;
+use once_cell::sync::Lazy;
+use tiniestsegmenter::tokenize as ts_tokenize;
+
+static JIEBA: Lazy<Jieba> = Lazy::new(Jieba::new);
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Token {
+  pub term: String,
+  pub start: usize,
+  pub end: usize,
+}
+
+pub fn tokenize(text: &str) -> Vec<Token> {
+  let mut tokens = Vec::new();
+
+  // Use jieba for Chinese/English
+  // Jieba tokenize returns tokens with offsets
+  let jieba_tokens = JIEBA.tokenize(text, jieba_rs::TokenizeMode::Search, false);
+  for token in jieba_tokens {
+    if token.word.chars().any(|c| c.is_alphanumeric()) {
+      tokens.push(Token {
+        term: token.word.to_lowercase(),
+        start: token.start,
+        end: token.end,
+      });
+    }
+  }
+
+  // Use TinySegmenter for Japanese
+  // TinySegmenter does not provide offsets, so we have to find them manually
+  // This is a simplified approach and might not be perfect for repeated terms
+  let mut last_pos = 0;
+  for term in ts_tokenize(text) {
+    if term.chars().any(|c| c.is_alphanumeric()) {
+      if let Some(pos) = text[last_pos..].find(term) {
+        let start = last_pos + pos;
+        let end = start + term.len();
+        tokens.push(Token {
+          term: term.to_lowercase(),
+          start,
+          end,
+        });
+        last_pos = end;
+      }
+    }
+  }
+
+  // Manually handle Korean bigrams and unigrams
+  let chars: Vec<char> = text.chars().collect();
+  let mut byte_offset = 0;
+  for (i, &c) in chars.iter().enumerate() {
+    let char_len = c.len_utf8();
+    if is_hangul(c) {
+      tokens.push(Token {
+        term: c.to_string().to_lowercase(),
+        start: byte_offset,
+        end: byte_offset + char_len,
+      });
+      if i + 1 < chars.len() {
+        let next = chars[i + 1];
+        if is_hangul(next) {
+          let next_len = next.len_utf8();
+          tokens.push(Token {
+            term: format!("{}{}", c, next).to_lowercase(),
+            start: byte_offset,
+            end: byte_offset + char_len + next_len,
+          });
+        }
+      }
+    }
+    byte_offset += char_len;
+  }
+
+  tokens
+}
+
+fn is_hangul(c: char) -> bool {
+  // Hangul Syllables
+  ('\u{AC00}'..='\u{D7AF}').contains(&c)
+  // Hangul Jamo
+  || ('\u{1100}'..='\u{11FF}').contains(&c)
+  // Hangul Compatibility Jamo
+  || ('\u{3130}'..='\u{318F}').contains(&c)
+}
diff --git a/packages/frontend/native/nbstore/src/indexer/types.rs b/packages/frontend/native/nbstore/src/indexer/types.rs
new file mode 100644
index 0000000000..9099b0625a
--- /dev/null
+++ b/packages/frontend/native/nbstore/src/indexer/types.rs
@@ -0,0 +1,79 @@
+use std::collections::HashMap;
+
+use affine_common::doc_parser::{BlockInfo, CrawlResult};
+use napi_derive::napi;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DocData {
+  pub content: String,
+  pub doc_len: i64,
+  pub term_pos: HashMap<String, Vec<(u32, u32)>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SnapshotData {
+  pub docs: HashMap<String, DocData>,
+}
+
+#[napi(object)]
+#[derive(Debug, Serialize)]
+pub struct NativeBlockInfo {
+  pub block_id: String,
+  pub flavour: String,
+  pub content: Option<Vec<String>>,
+  pub blob: Option<Vec<String>>,
+  pub ref_doc_id: Option<Vec<String>>,
+  pub ref_info: Option<Vec<String>>,
+  pub parent_flavour: Option<String>,
+  pub parent_block_id: Option<String>,
+  pub additional: Option<String>,
+}
+
+#[napi(object)]
+#[derive(Debug, Serialize)]
+pub struct NativeCrawlResult {
+  pub blocks: Vec<NativeBlockInfo>,
+  pub title: String,
+  pub summary: String,
+}
+
+#[napi(object)]
+#[derive(Debug, Serialize)]
+pub struct NativeSearchHit {
+  pub id: String,
+  pub score: f64,
+}
+
+#[napi(object)]
+#[derive(Debug, Serialize)]
+pub struct NativeMatch {
+  pub start: u32,
+  pub end: u32,
+}
+
+impl From<BlockInfo> for NativeBlockInfo {
+  fn from(value: BlockInfo) -> Self {
+    Self {
+      block_id: value.block_id,
+
flavour: value.flavour, + content: value.content, + blob: value.blob, + ref_doc_id: value.ref_doc_id, + ref_info: value.ref_info, + parent_flavour: value.parent_flavour, + parent_block_id: value.parent_block_id, + additional: value.additional, + } + } +} + +impl From for NativeCrawlResult { + fn from(value: CrawlResult) -> Self { + Self { + blocks: value.blocks.into_iter().map(Into::into).collect(), + title: value.title, + summary: value.summary, + } + } +} diff --git a/packages/frontend/native/nbstore/src/lib.rs b/packages/frontend/native/nbstore/src/lib.rs index 01336d930c..7b9976f0dd 100644 --- a/packages/frontend/native/nbstore/src/lib.rs +++ b/packages/frontend/native/nbstore/src/lib.rs @@ -450,6 +450,77 @@ impl DocStoragePool { Ok(result) } + + #[napi] + pub async fn fts_add_document( + &self, + id: String, + index_name: String, + doc_id: String, + text: String, + index: bool, + ) -> Result<()> { + let storage = self.pool.get(id).await?; + storage.fts_add(&index_name, &doc_id, &text, index).await?; + Ok(()) + } + + #[napi] + pub async fn fts_flush_index(&self, id: String) -> Result<()> { + let storage = self.pool.get(id).await?; + storage.flush_index().await?; + Ok(()) + } + + #[napi] + pub async fn fts_delete_document( + &self, + id: String, + index_name: String, + doc_id: String, + ) -> Result<()> { + let storage = self.pool.get(id).await?; + storage.fts_delete(&index_name, &doc_id).await?; + Ok(()) + } + + #[napi] + pub async fn fts_get_document( + &self, + id: String, + index_name: String, + doc_id: String, + ) -> Result> { + let storage = self.pool.get(id).await?; + Ok(storage.fts_get(&index_name, &doc_id).await?) + } + + #[napi] + pub async fn fts_search( + &self, + id: String, + index_name: String, + query: String, + ) -> Result> { + let storage = self.pool.get(id).await?; + Ok(storage.fts_search(&index_name, &query).await?) 
+ } + + #[napi] + pub async fn fts_get_matches( + &self, + id: String, + index_name: String, + doc_id: String, + query: String, + ) -> Result> { + let storage = self.pool.get(id).await?; + Ok( + storage + .fts_get_matches(&index_name, &doc_id, &query) + .await?, + ) + } } #[napi] diff --git a/packages/frontend/native/nbstore/src/storage.rs b/packages/frontend/native/nbstore/src/storage.rs index 7c72cada4d..664f3db3ac 100644 --- a/packages/frontend/native/nbstore/src/storage.rs +++ b/packages/frontend/native/nbstore/src/storage.rs @@ -1,15 +1,19 @@ +use std::sync::Arc; + use affine_schema::get_migrator; use sqlx::{ migrate::MigrateDatabase, sqlite::{Sqlite, SqliteConnectOptions, SqlitePoolOptions}, Pool, Row, }; +use tokio::sync::RwLock; -use super::error::Result; +use super::{error::Result, indexer::InMemoryIndex}; pub struct SqliteDocStorage { pub pool: Pool, path: String, + pub index: Arc>, } impl SqliteDocStorage { @@ -20,6 +24,8 @@ impl SqliteDocStorage { let mut pool_options = SqlitePoolOptions::new(); + let index = Arc::new(RwLock::new(InMemoryIndex::default())); + if path == ":memory:" { pool_options = pool_options .min_connections(1) @@ -30,6 +36,7 @@ impl SqliteDocStorage { Self { pool: pool_options.connect_lazy_with(sqlite_options), path, + index, } } else { Self { @@ -37,6 +44,7 @@ impl SqliteDocStorage { .max_connections(4) .connect_lazy_with(sqlite_options.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)), path, + index, } } } @@ -61,6 +69,7 @@ impl SqliteDocStorage { }; self.migrate().await?; + self.init_index().await?; Ok(()) } diff --git a/packages/frontend/native/schema/src/lib.rs b/packages/frontend/native/schema/src/lib.rs index 7951635da8..5c9c26f6ef 100644 --- a/packages/frontend/native/schema/src/lib.rs +++ b/packages/frontend/native/schema/src/lib.rs @@ -71,6 +71,18 @@ CREATE TABLE "peer_blob_sync" ( PRIMARY KEY (peer, blob_id) ); CREATE INDEX peer_blob_sync_peer ON peer_blob_sync (peer); + "#, + None, + ), + // add idx snapshots + ( + "add_idx_snapshots", + r#" +CREATE TABLE idx_snapshots ( + index_name TEXT PRIMARY KEY NOT NULL, + data BLOB NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +); "#, None, ),
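
Note on the `idx_snapshots` migration above: `compact_index` persists each index as a single row whose `data` column holds a bincode-serialized `SnapshotData` compressed with zstd, and `init_index` reverses that on startup. A minimal round-trip sketch of that encoding, assuming bincode 2's serde integration and zstd's default level (0); `MiniSnapshot` is a hypothetical stand-in for the real `SnapshotData`/`DocData` types, not the actual schema:

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Hypothetical stand-in for SnapshotData: doc id -> stored text.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct MiniSnapshot {
  docs: HashMap<String, String>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
  let snapshot = MiniSnapshot {
    docs: HashMap::from([("doc-1".to_string(), "hello full text search".to_string())]),
  };

  // Write path (mirrors compact_index): serde -> bincode -> zstd blob for the `data` column.
  let blob = bincode::serde::encode_to_vec(&snapshot, bincode::config::standard())?;
  let compressed = zstd::stream::encode_all(std::io::Cursor::new(&blob), 0)?;

  // Read path (mirrors init_index): zstd -> bincode -> snapshot loaded back into memory.
  let decompressed = zstd::stream::decode_all(std::io::Cursor::new(&compressed))?;
  let (restored, _): (MiniSnapshot, usize) =
    bincode::serde::decode_from_slice(&decompressed, bincode::config::standard())?;

  assert_eq!(snapshot, restored);
  Ok(())
}
```

Storing one compressed blob per index keeps a flush to a single `INSERT OR REPLACE` per modified index rather than many small writes.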