feat: refresh index if version changed (#14150)

This commit is contained in:
DarkSky
2025-12-26 01:08:05 +08:00
committed by GitHub
parent e8693a3a25
commit 2e38898937
19 changed files with 57 additions and 19 deletions

4
Cargo.lock generated
View File

@@ -2643,9 +2643,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]] [[package]]
name = "memory-indexer" name = "memory-indexer"
version = "0.2.0" version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10feba381e2eeb6582f34379d62ee0658e57f63d776698150b985bd0f38664b3" checksum = "36308f8c9f537d7624a30cd4d6243c54143221e4e0dc2a699783c206604befbd"
dependencies = [ dependencies = [
"jieba-rs", "jieba-rs",
"once_cell", "once_cell",

View File

@@ -47,7 +47,7 @@ resolver = "3"
libc = "0.2" libc = "0.2"
log = "0.4" log = "0.4"
loom = { version = "0.7", features = ["checkpoint"] } loom = { version = "0.7", features = ["checkpoint"] }
memory-indexer = "0.2" memory-indexer = "0.2.1"
mimalloc = "0.1" mimalloc = "0.1"
mp4parse = "0.17" mp4parse = "0.17"
nanoid = "0.4" nanoid = "0.4"

View File

@@ -143,4 +143,8 @@ export class CloudIndexerStorage extends IndexerStorageBase {
override async refreshIfNeed(): Promise<void> { override async refreshIfNeed(): Promise<void> {
return Promise.resolve(); return Promise.resolve();
} }
override async indexVersion(): Promise<number> {
return Promise.resolve(1);
}
} }

View File

@@ -218,4 +218,10 @@ export class IndexedDBIndexerStorage extends IndexerStorageBase {
this.tableUpdate$.next(table); this.tableUpdate$.next(table);
this.channel.postMessage({ type: 'indexer-updated', table }); this.channel.postMessage({ type: 'indexer-updated', table });
} }
// Get the current indexer version
// increase this number to re-index all docs
async indexVersion(): Promise<number> {
return Promise.resolve(1);
}
} }

View File

@@ -112,6 +112,7 @@ export interface NativeDBApis {
query: string query: string
) => Promise<{ start: number; end: number }[]>; ) => Promise<{ start: number; end: number }[]>;
ftsFlushIndex: (id: string) => Promise<void>; ftsFlushIndex: (id: string) => Promise<void>;
ftsIndexVersion: () => Promise<number>;
} }
type NativeDBApisWrapper = NativeDBApis extends infer APIs type NativeDBApisWrapper = NativeDBApis extends infer APIs
@@ -119,7 +120,7 @@ type NativeDBApisWrapper = NativeDBApis extends infer APIs
[K in keyof APIs]: APIs[K] extends (...args: any[]) => any [K in keyof APIs]: APIs[K] extends (...args: any[]) => any
? Parameters<APIs[K]> extends [string, ...infer Rest] ? Parameters<APIs[K]> extends [string, ...infer Rest]
? (...args: Rest) => ReturnType<APIs[K]> ? (...args: Rest) => ReturnType<APIs[K]>
: never : (...args: Parameters<APIs[K]>) => ReturnType<APIs[K]>
: never; : never;
} }
: never; : never;

View File

@@ -241,4 +241,8 @@ export class SqliteIndexerStorage extends IndexerStorageBase {
async refreshIfNeed(): Promise<void> { async refreshIfNeed(): Promise<void> {
await this.connection.apis.ftsFlushIndex(); await this.connection.apis.ftsFlushIndex();
} }
async indexVersion(): Promise<number> {
return this.connection.apis.ftsIndexVersion();
}
} }

View File

@@ -88,4 +88,7 @@ export class DummyIndexerStorage extends IndexerStorageBase {
override async refreshIfNeed(): Promise<void> { override async refreshIfNeed(): Promise<void> {
return Promise.resolve(); return Promise.resolve();
} }
override async indexVersion(): Promise<number> {
return Promise.resolve(0);
}
} }

View File

@@ -64,6 +64,7 @@ export interface IndexerStorage extends Storage {
refresh<T extends keyof IndexerSchema>(table: T): Promise<void>; refresh<T extends keyof IndexerSchema>(table: T): Promise<void>;
refreshIfNeed(): Promise<void>; refreshIfNeed(): Promise<void>;
indexVersion(): Promise<number>;
} }
type ResultPagination = { type ResultPagination = {
@@ -178,4 +179,6 @@ export abstract class IndexerStorageBase implements IndexerStorage {
abstract refresh<T extends keyof IndexerSchema>(table: T): Promise<void>; abstract refresh<T extends keyof IndexerSchema>(table: T): Promise<void>;
abstract refreshIfNeed(): Promise<void>; abstract refreshIfNeed(): Promise<void>;
abstract indexVersion(): Promise<number>;
} }

View File

@@ -106,10 +106,6 @@ export interface IndexerSync {
} }
export class IndexerSyncImpl implements IndexerSync { export class IndexerSyncImpl implements IndexerSync {
/**
* increase this number to re-index all docs
*/
readonly INDEXER_VERSION = 2;
private abort: AbortController | null = null; private abort: AbortController | null = null;
private readonly rootDocId = this.doc.spaceId; private readonly rootDocId = this.doc.spaceId;
private readonly status = new IndexerSyncStatus(this.rootDocId); private readonly status = new IndexerSyncStatus(this.rootDocId);
@@ -266,7 +262,8 @@ export class IndexerSyncImpl implements IndexerSync {
this.status.errorMessage = null; this.status.errorMessage = null;
this.status.statusUpdatedSubject$.next(true); this.status.statusUpdatedSubject$.next(true);
console.log('indexer sync start'); const indexVersion = await this.indexer.indexVersion();
console.log('indexer sync start, version: ', indexVersion);
const unsubscribe = this.doc.subscribeDocUpdate(update => { const unsubscribe = this.doc.subscribeDocUpdate(update => {
if (!this.status.rootDocReady) { if (!this.status.rootDocReady) {
@@ -402,7 +399,7 @@ export class IndexerSyncImpl implements IndexerSync {
docIndexedClock && docIndexedClock &&
docIndexedClock.timestamp.getTime() === docIndexedClock.timestamp.getTime() ===
docClock.timestamp.getTime() && docClock.timestamp.getTime() &&
docIndexedClock.indexerVersion === this.INDEXER_VERSION docIndexedClock.indexerVersion === indexVersion
) { ) {
// doc is already indexed, just skip // doc is already indexed, just skip
continue; continue;
@@ -468,7 +465,7 @@ export class IndexerSyncImpl implements IndexerSync {
await this.indexerSync.setDocIndexedClock({ await this.indexerSync.setDocIndexedClock({
docId, docId,
timestamp: docClock.timestamp, timestamp: docClock.timestamp,
indexerVersion: this.INDEXER_VERSION, indexerVersion: indexVersion,
}); });
// #endregion // #endregion
} }

View File

@@ -184,4 +184,5 @@ export interface NbStorePlugin {
query: string; query: string;
}) => Promise<Array<{ start: number; end: number }>>; }) => Promise<Array<{ start: number; end: number }>>;
ftsFlushIndex: (options: { id: string }) => Promise<void>; ftsFlushIndex: (options: { id: string }) => Promise<void>;
ftsIndexVersion: () => Promise<number>;
} }

View File

@@ -406,4 +406,7 @@ export const NbStoreNativeDBApis: NativeDBApis = {
id, id,
}); });
}, },
ftsIndexVersion: function (): Promise<number> {
return NbStore.ftsIndexVersion();
},
}; };

View File

@@ -54,4 +54,5 @@ export const nbstoreHandlers: NativeDBApis = {
ftsGetDocument: POOL.ftsGetDocument.bind(POOL), ftsGetDocument: POOL.ftsGetDocument.bind(POOL),
ftsGetMatches: POOL.ftsGetMatches.bind(POOL), ftsGetMatches: POOL.ftsGetMatches.bind(POOL),
ftsFlushIndex: POOL.ftsFlushIndex.bind(POOL), ftsFlushIndex: POOL.ftsFlushIndex.bind(POOL),
ftsIndexVersion: POOL.ftsIndexVersion.bind(POOL),
}; };

View File

@@ -184,4 +184,5 @@ export interface NbStorePlugin {
query: string; query: string;
}) => Promise<Array<{ start: number; end: number }>>; }) => Promise<Array<{ start: number; end: number }>>;
ftsFlushIndex: (options: { id: string }) => Promise<void>; ftsFlushIndex: (options: { id: string }) => Promise<void>;
ftsIndexVersion: () => Promise<number>;
} }

View File

@@ -410,4 +410,7 @@ export const NbStoreNativeDBApis: NativeDBApis = {
id, id,
}); });
}, },
ftsIndexVersion: function (): Promise<number> {
return NbStore.ftsIndexVersion();
},
}; };

View File

@@ -84,6 +84,7 @@ export declare class DocStoragePool {
getBlobUploadedAt(universalId: string, peer: string, blobId: string): Promise<Date | null> getBlobUploadedAt(universalId: string, peer: string, blobId: string): Promise<Date | null>
ftsAddDocument(id: string, indexName: string, docId: string, text: string, index: boolean): Promise<void> ftsAddDocument(id: string, indexName: string, docId: string, text: string, index: boolean): Promise<void>
ftsFlushIndex(id: string): Promise<void> ftsFlushIndex(id: string): Promise<void>
ftsIndexVersion(): Promise<number>
ftsDeleteDocument(id: string, indexName: string, docId: string): Promise<void> ftsDeleteDocument(id: string, indexName: string, docId: string): Promise<void>
ftsGetDocument(id: string, indexName: string, docId: string): Promise<string | null> ftsGetDocument(id: string, indexName: string, docId: string): Promise<string | null>
ftsSearch(id: string, indexName: string, query: string): Promise<Array<NativeSearchHit>> ftsSearch(id: string, indexName: string, query: string): Promise<Array<NativeSearchHit>>

View File

@@ -74,7 +74,7 @@ impl SqliteDocStorage {
mod tests { mod tests {
use sqlx::Row; use sqlx::Row;
use super::*; use super::{super::Data, *};
async fn get_storage() -> SqliteDocStorage { async fn get_storage() -> SqliteDocStorage {
let storage = SqliteDocStorage::new(":memory:".to_string()); let storage = SqliteDocStorage::new(":memory:".to_string());
@@ -91,7 +91,7 @@ mod tests {
storage storage
.set_blob(SetBlob { .set_blob(SetBlob {
key: format!("test_{}", i), key: format!("test_{}", i),
data: vec![0, 0], data: Into::<Data>::into(vec![0, 0]),
mime: "text/plain".to_string(), mime: "text/plain".to_string(),
}) })
.await .await
@@ -131,7 +131,7 @@ mod tests {
storage storage
.set_blob(SetBlob { .set_blob(SetBlob {
key: format!("test_{}", i), key: format!("test_{}", i),
data: vec![0, 0], data: Into::<Data>::into(vec![0, 0]),
mime: "text/plain".to_string(), mime: "text/plain".to_string(),
}) })
.await .await
@@ -179,7 +179,7 @@ mod tests {
storage storage
.set_blob(SetBlob { .set_blob(SetBlob {
key: format!("test_{}", i), key: format!("test_{}", i),
data: vec![0, 0], data: Into::<Data>::into(vec![0, 0]),
mime: "text/plain".to_string(), mime: "text/plain".to_string(),
}) })
.await .await

View File

@@ -250,6 +250,7 @@ mod tests {
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use super::*; use super::*;
use crate::Data;
async fn get_storage() -> SqliteDocStorage { async fn get_storage() -> SqliteDocStorage {
let storage = SqliteDocStorage::new(":memory:".to_string()); let storage = SqliteDocStorage::new(":memory:".to_string());
@@ -293,7 +294,7 @@ mod tests {
storage storage
.set_doc_snapshot(DocRecord { .set_doc_snapshot(DocRecord {
doc_id: "test".to_string(), doc_id: "test".to_string(),
bin: vec![0, 0], bin: Into::<Data>::into(vec![0, 0]),
timestamp: Utc::now().naive_utc(), timestamp: Utc::now().naive_utc(),
}) })
.await .await
@@ -373,7 +374,7 @@ mod tests {
let snapshot = DocRecord { let snapshot = DocRecord {
doc_id: "test".to_string(), doc_id: "test".to_string(),
bin: vec![0, 0], bin: Into::<Data>::into(vec![0, 0]),
timestamp: Utc::now().naive_utc(), timestamp: Utc::now().naive_utc(),
}; };
@@ -391,7 +392,7 @@ mod tests {
let snapshot = DocRecord { let snapshot = DocRecord {
doc_id: "test".to_string(), doc_id: "test".to_string(),
bin: vec![0, 0], bin: Into::<Data>::into(vec![0, 0]),
timestamp: Utc::now().naive_utc(), timestamp: Utc::now().naive_utc(),
}; };
@@ -404,7 +405,7 @@ mod tests {
let snapshot = DocRecord { let snapshot = DocRecord {
doc_id: "test".to_string(), doc_id: "test".to_string(),
bin: vec![0, 1], bin: Into::<Data>::into(vec![0, 1]),
timestamp: DateTime::from_timestamp_millis(Utc::now().timestamp_millis() - 1000) timestamp: DateTime::from_timestamp_millis(Utc::now().timestamp_millis() - 1000)
.unwrap() .unwrap()
.naive_utc(), .naive_utc(),

View File

@@ -197,6 +197,10 @@ impl SqliteDocStorage {
Ok(()) Ok(())
} }
pub fn index_version() -> u32 {
memory_indexer::InMemoryIndex::snapshot_version()
}
pub async fn fts_add( pub async fn fts_add(
&self, &self,
index_name: &str, index_name: &str,

View File

@@ -472,6 +472,11 @@ impl DocStoragePool {
Ok(()) Ok(())
} }
#[napi]
pub async fn fts_index_version(&self) -> Result<u32> {
Ok(SqliteDocStorage::index_version())
}
#[napi] #[napi]
pub async fn fts_delete_document( pub async fn fts_delete_document(
&self, &self,