feat(native): native reader for indexer (#14055)

Author: DarkSky
Date: 2025-12-07 16:22:11 +08:00
Committed by: GitHub
Parent: 69cdeedc4e
Commit: cf4e37c584
28 changed files with 1376 additions and 569 deletions

View File

@@ -4,7 +4,7 @@ name = "affine_common"
version = "0.1.0"
[features]
default = ["hashcash"]
default = []
doc-loader = [
"docx-parser",
"infer",
@@ -35,7 +35,7 @@ tree-sitter = [
"dep:tree-sitter-scala",
"dep:tree-sitter-typescript",
]
ydoc-loader = ["assert-json-diff", "y-octo"]
ydoc-loader = ["assert-json-diff", "serde", "serde_json", "thiserror", "y-octo"]
[dependencies]
chrono = { workspace = true }

View File

@@ -17,14 +17,6 @@ const BOOKMARK_FLAVOURS: [&str; 5] = [
"affine:embed-loom",
];
#[derive(Debug, Clone)]
pub struct CrawlDocInput {
pub doc_bin: Vec<u8>,
pub root_doc_bin: Option<Vec<u8>>,
pub space_id: String,
pub doc_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockInfo {
pub block_id: String,
@@ -87,15 +79,8 @@ impl From<JwstCodecError> for ParseError {
}
}
pub fn parse_doc_from_binary(input: CrawlDocInput) -> Result<CrawlResult, ParseError> {
let CrawlDocInput {
doc_bin,
root_doc_bin: _,
space_id: _,
doc_id,
} = input;
if doc_bin.is_empty() {
pub fn parse_doc_from_binary(doc_bin: Vec<u8>, doc_id: String) -> Result<CrawlResult, ParseError> {
if doc_bin.is_empty() || doc_bin == [0, 0] {
return Err(ParseError::InvalidBinary);
}
@@ -509,14 +494,10 @@ mod tests {
#[test]
fn test_parse_doc_from_binary() {
let json = include_bytes!("../fixtures/demo.ydoc.json");
let input = CrawlDocInput {
doc_bin: include_bytes!("../fixtures/demo.ydoc").to_vec(),
root_doc_bin: None,
space_id: "o9WCLGyxkLxdULZ-f2B9V".to_string(),
doc_id: "dYpV7PPhk8amRkY5IAcVO".to_string(),
};
let doc_bin = include_bytes!("../fixtures/demo.ydoc").to_vec();
let doc_id = "dYpV7PPhk8amRkY5IAcVO".to_string();
let result = parse_doc_from_binary(input).unwrap();
let result = parse_doc_from_binary(doc_bin, doc_id).unwrap();
let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
.numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
assert_json_diff::assert_json_matches!(
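
For orientation, a sketch of what the serialized CrawlResult may look like once it crosses the napi boundary. This is a hypothetical payload, not from the commit: field names mirror the Rust struct's snake_case, which is why the TypeScript normalizer further down accepts snake_case keys alongside camelCase.

// Hypothetical raw payload as the TS side may receive it (values illustrative):
const rawCrawlResult = {
  title: 'Demo doc',
  summary: 'Hello world',
  blocks: [
    {
      block_id: 'block-1',
      flavour: 'affine:paragraph',
      content: ['Hello world'],
    },
  ],
};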

View File

@@ -139,4 +139,8 @@ export class CloudIndexerStorage extends IndexerStorageBase {
override refresh<T extends keyof IndexerSchema>(_table: T): Promise<void> {
return Promise.resolve();
}
override async refreshIfNeed(): Promise<void> {
return Promise.resolve();
}
}

View File

@@ -176,6 +176,21 @@ export class IndexedDBIndexerStorage extends IndexerStorageBase {
this.emitTableUpdated(table);
}
override async refreshIfNeed(): Promise<void> {
const needRefreshTable = Object.entries(this.pendingUpdates)
.filter(
([, updates]) =>
updates.deleteByQueries.length > 0 ||
updates.deletes.length > 0 ||
updates.inserts.length > 0 ||
updates.updates.length > 0
)
.map(([table]) => table as keyof IndexerSchema);
for (const table of needRefreshTable) {
await this.refresh(table);
}
}
private watchTableUpdated(table: keyof IndexerSchema) {
return new Observable(subscriber => {
const listener = (ev: MessageEvent) => {
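
A usage sketch for the new method, with hypothetical documents: after a batch of writes, refreshIfNeed flushes only the tables whose pendingUpdates queues are non-empty, rather than refreshing every table unconditionally.

// storage is an IndexedDBIndexerStorage; blockDoc and docDoc are hypothetical.
await storage.insert('block', blockDoc);
await storage.insert('doc', docDoc);
await storage.refreshIfNeed(); // refreshes 'block' and 'doc'; clean tables are skipped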

View File

@@ -1,6 +1,7 @@
import { AutoReconnectConnection } from '../../connection';
import type {
BlobRecord,
CrawlResult,
DocClock,
DocRecord,
ListedBlobRecord,
@@ -81,6 +82,7 @@ export interface NativeDBApis {
peer: string,
blobId: string
) => Promise<Date | null>;
crawlDocData: (id: string, docId: string) => Promise<CrawlResult>;
}
type NativeDBApisWrapper = NativeDBApis extends infer APIs
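
Note the raw binding takes two arguments while SqliteDocStorage below passes only a docId; presumably the first parameter is the storage's universal id, bound away by the NativeDBApisWrapper. A sketch with hypothetical ids:

declare const apis: NativeDBApis;
// Direct call against the unwrapped binding (both ids are illustrative):
const crawled = await apis.crawlDocData('universal-id', 'doc-1');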

View File

@@ -1,5 +1,7 @@
import { share } from '../../connection';
import {
type BlockInfo,
type CrawlResult,
type DocClocks,
type DocRecord,
DocStorageBase,
@@ -79,4 +81,127 @@ export class SqliteDocStorage extends DocStorageBase<SqliteNativeDBOptions> {
updates.map(update => update.timestamp)
);
}
override async crawlDocData(docId: string): Promise<CrawlResult | null> {
const result = await this.db.crawlDocData(docId);
return normalizeNativeCrawlResult(result);
}
}
function normalizeNativeCrawlResult(result: unknown): CrawlResult | null {
if (!isRecord(result)) {
console.warn('[nbstore] crawlDocData returned non-object result');
return null;
}
if (
typeof result.title !== 'string' ||
typeof result.summary !== 'string' ||
!Array.isArray(result.blocks)
) {
console.warn('[nbstore] crawlDocData result missing basic fields');
return null;
}
const { title, summary } = result as { title: string; summary: string };
const rawBlocks = result.blocks as unknown[];
const blocks: BlockInfo[] = [];
for (const block of rawBlocks) {
const normalized = normalizeBlock(block);
if (normalized) {
blocks.push(normalized);
}
}
if (blocks.length === 0) {
console.warn('[nbstore] crawlDocData has no valid blocks');
return null;
}
return {
blocks,
title,
summary,
};
}
function normalizeBlock(block: unknown): BlockInfo | null {
if (!isRecord(block)) {
return null;
}
const blockId = readStringField(block, 'blockId');
const flavour = readStringField(block, 'flavour');
if (!blockId || !flavour) {
return null;
}
return {
blockId,
flavour,
content: readStringArrayField(block, 'content'),
blob: readStringArrayField(block, 'blob'),
refDocId: readStringArrayField(block, 'refDocId'),
refInfo: readStringArrayField(block, 'refInfo'),
parentFlavour: readStringField(block, 'parentFlavour'),
parentBlockId: readStringField(block, 'parentBlockId'),
additional: safeAdditionalField(block),
};
}
function readStringField(
target: Record<string, unknown>,
key: string
): string | undefined {
const value = readField(target, key);
return typeof value === 'string' && value ? value : undefined;
}
function readStringArrayField(
target: Record<string, unknown>,
key: string
): string[] | undefined {
const value = readField(target, key);
if (Array.isArray(value)) {
const filtered = value.filter(
(item): item is string => typeof item === 'string' && item.length > 0
);
return filtered.length ? filtered : undefined;
}
if (typeof value === 'string' && value.length > 0) {
return [value];
}
return undefined;
}
function safeAdditionalField(
target: Record<string, unknown>
): string | undefined {
const value = readField(target, 'additional');
if (typeof value !== 'string' || value.length === 0) {
return undefined;
}
try {
const parsed = JSON.parse(value);
return JSON.stringify(parsed);
} catch {
console.warn(
'[nbstore] ignore invalid additional payload in crawlDocData block'
);
return undefined;
}
}
function readField(target: Record<string, unknown>, key: string) {
return target[key] ?? target[toSnakeCase(key)];
}
function toSnakeCase(key: string) {
return key.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null;
}
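
Two illustrative probes of the normalizer, assuming hypothetical payloads (the helper is module-private; shown here as if it were callable directly): snake_case keys and bare strings are tolerated, while a result with no valid block collapses to null.

// Tolerant path: snake_case keys resolve via readField's toSnakeCase fallback,
// and a bare string in `content` is coerced to a one-element array.
const ok = normalizeNativeCrawlResult({
  title: 't',
  summary: 's',
  blocks: [{ block_id: 'b1', flavour: 'affine:paragraph', content: 'text' }],
});
// ok?.blocks[0] -> { blockId: 'b1', flavour: 'affine:paragraph', content: ['text'], ... }

// Rejecting path: a block without blockId/flavour is dropped; with zero valid
// blocks the whole result is discarded (null) after a console warning.
const bad = normalizeNativeCrawlResult({ title: 't', summary: 's', blocks: [{}] });
// bad -> null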

View File

@@ -7,6 +7,24 @@ import type { Locker } from './lock';
import { SingletonLocker } from './lock';
import { type Storage } from './storage';
export interface BlockInfo {
blockId: string;
flavour: string;
content?: string[];
blob?: string[];
refDocId?: string[];
refInfo?: string[];
parentFlavour?: string;
parentBlockId?: string;
additional?: string;
}
export interface CrawlResult {
blocks: BlockInfo[];
title: string;
summary: string;
}
export interface DocClock {
docId: string;
timestamp: Date;
@@ -94,6 +112,8 @@ export interface DocStorage extends Storage {
subscribeDocUpdate(
callback: (update: DocRecord, origin?: string) => void
): () => void;
crawlDocData?(docId: string): Promise<CrawlResult | null>;
}
export abstract class DocStorageBase<Opts = {}> implements DocStorage {
@@ -174,6 +194,10 @@ export abstract class DocStorageBase<Opts = {}> implements DocStorage {
};
}
async crawlDocData(_docId: string): Promise<CrawlResult | null> {
return null;
}
// REGION: api for internal usage
protected on(
event: 'update',
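
Because crawlDocData is optional on the interface and the base class resolves to null, callers probe it defensively. A minimal sketch (docStorage and the fallback branch are illustrative):

const crawled = await docStorage.crawlDocData?.('doc-1');
if (crawled) {
  // index crawled.blocks, crawled.title and crawled.summary directly
} else {
  // fall back to the TypeScript crawler path, as the indexer sync does below
}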

View File

@@ -85,4 +85,7 @@ export class DummyIndexerStorage extends IndexerStorageBase {
override refresh<T extends keyof IndexerSchema>(_table: T): Promise<void> {
return Promise.resolve();
}
override async refreshIfNeed(): Promise<void> {
return Promise.resolve();
}
}

View File

@@ -62,6 +62,7 @@ export interface IndexerStorage extends Storage {
): Promise<void>;
refresh<T extends keyof IndexerSchema>(table: T): Promise<void>;
refreshIfNeed(): Promise<void>;
}
type ResultPagination = {
@@ -173,4 +174,6 @@ export abstract class IndexerStorageBase implements IndexerStorage {
): Promise<void>;
abstract refresh<T extends keyof IndexerSchema>(table: T): Promise<void>;
abstract refreshIfNeed(): Promise<void>;
}

View File

@@ -117,6 +117,8 @@ export class IndexerSyncImpl implements IndexerSync {
private readonly indexer: IndexerStorage;
private readonly remote?: IndexerStorage;
private lastRefreshed = Date.now();
state$ = this.status.state$.pipe(
// throttle the state to 1 second to avoid spamming the UI
throttleTime(1000, undefined, {
@@ -378,8 +380,7 @@ export class IndexerSyncImpl implements IndexerSync {
this.status.statusUpdatedSubject$.next(docId);
}
}
await this.indexer.refresh('block');
await this.indexer.refresh('doc');
await this.refreshIfNeed();
// #endregion
} else {
// #region crawl doc
@@ -407,33 +408,40 @@ export class IndexerSyncImpl implements IndexerSync {
continue;
}
const docBin = await this.doc.getDoc(docId);
if (!docBin) {
// doc is deleted, just skip
continue;
}
console.log('[indexer] start indexing doc', docId);
const docYDoc = new YDoc({ guid: docId });
applyUpdate(docYDoc, docBin.bin);
let blocks: IndexerDocument<'block'>[] = [];
let preview: string | undefined;
try {
const result = await crawlingDocData({
ydoc: docYDoc,
rootYDoc: this.status.rootDoc,
spaceId: this.status.rootDocId,
docId,
});
if (!result) {
// doc is empty without root block, just skip
const nativeResult = await this.tryNativeCrawlDocData(docId);
if (nativeResult) {
blocks = nativeResult.block;
preview = nativeResult.summary;
} else {
const docBin = await this.doc.getDoc(docId);
if (!docBin) {
// doc is deleted, just skip
continue;
}
blocks = result.blocks;
preview = result.preview;
} catch (error) {
console.error('error crawling doc', error);
const docYDoc = new YDoc({ guid: docId });
applyUpdate(docYDoc, docBin.bin);
try {
const result = await crawlingDocData({
ydoc: docYDoc,
rootYDoc: this.status.rootDoc,
spaceId: this.status.rootDocId,
docId,
});
if (!result) {
// doc is empty without root block, just skip
continue;
}
blocks = result.blocks;
preview = result.preview;
} catch (error) {
console.error('error crawling doc', error);
}
}
await this.indexer.deleteByQuery('block', {
@@ -446,8 +454,6 @@ export class IndexerSyncImpl implements IndexerSync {
await this.indexer.insert('block', block);
}
await this.indexer.refresh('block');
if (preview) {
await this.indexer.update(
'doc',
@@ -455,9 +461,10 @@ export class IndexerSyncImpl implements IndexerSync {
summary: preview,
})
);
await this.indexer.refresh('doc');
}
await this.refreshIfNeed();
await this.indexerSync.setDocIndexedClock({
docId,
timestamp: docClock.timestamp,
@@ -471,10 +478,19 @@ export class IndexerSyncImpl implements IndexerSync {
this.status.completeJob();
}
} finally {
await this.refreshIfNeed();
unsubscribe();
}
}
private async refreshIfNeed(): Promise<void> {
if (this.lastRefreshed + 100 < Date.now()) {
console.log('[indexer] refreshing indexer');
await this.indexer.refreshIfNeed();
this.lastRefreshed = Date.now();
}
}
/**
* Get all docs from the root doc, without deleted docs
*/
@@ -484,6 +500,36 @@ export class IndexerSyncImpl implements IndexerSync {
});
}
private async tryNativeCrawlDocData(docId: string) {
try {
const result = await this.doc.crawlDocData?.(docId);
if (result) {
return {
title: result.title,
block: result.blocks.map(block =>
IndexerDocument.from<'block'>(`${docId}:${block.blockId}`, {
docId,
blockId: block.blockId,
content: block.content,
flavour: block.flavour,
blob: block.blob,
refDocId: block.refDocId,
ref: block.refInfo,
parentFlavour: block.parentFlavour,
parentBlockId: block.parentBlockId,
additional: block.additional,
})
),
summary: result.summary,
};
}
return null;
} catch (error) {
console.warn('[indexer] native crawlDocData failed', docId, error);
return null;
}
}
private async getAllDocsFromIndexer() {
const docs = await this.indexer.search(
'doc',
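
One detail of tryNativeCrawlDocData's mapping worth calling out: the native block's refInfo lands on the indexer document's ref field, while the other fields keep their names. A sketch with hypothetical values:

// Mapping a native BlockInfo onto an indexer block document (illustrative):
const block = {
  blockId: 'b1',
  flavour: 'affine:paragraph',
  content: ['Hello'],
  refInfo: ['{"docId":"other-doc"}'],
};
const indexed = IndexerDocument.from<'block'>(`doc-1:${block.blockId}`, {
  docId: 'doc-1',
  blockId: block.blockId,
  content: block.content,
  flavour: block.flavour,
  ref: block.refInfo,
});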