diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts index 8bc7ec8a56..45a2313c03 100644 --- a/packages/common/nbstore/src/__tests__/sync.spec.ts +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -1,18 +1,235 @@ import 'fake-indexeddb/auto'; -import { expect, test } from 'vitest'; +import * as reader from '@affine/reader'; +import { NEVER } from 'rxjs'; +import { afterEach, expect, test, vi } from 'vitest'; import { Doc as YDoc, encodeStateAsUpdate } from 'yjs'; +import { DummyConnection } from '../connection'; import { IndexedDBBlobStorage, IndexedDBBlobSyncStorage, IndexedDBDocStorage, IndexedDBDocSyncStorage, } from '../impls/idb'; -import { SpaceStorage } from '../storage'; +import { + type AggregateOptions, + type AggregateResult, + type CrawlResult, + type DocClock, + type DocClocks, + type DocDiff, + type DocIndexedClock, + type DocRecord, + type DocStorage, + type DocUpdate, + type IndexerDocument, + type IndexerSchema, + IndexerStorageBase, + IndexerSyncStorageBase, + type Query, + type SearchOptions, + type SearchResult, + SpaceStorage, +} from '../storage'; import { Sync } from '../sync'; +import { IndexerSyncImpl } from '../sync/indexer'; import { expectYjsEqual } from './utils'; +afterEach(() => { + vi.restoreAllMocks(); +}); + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +class TestDocStorage implements DocStorage { + readonly storageType = 'doc' as const; + readonly connection = new DummyConnection(); + readonly isReadonly = false; + private readonly subscribers = new Set< + (update: DocRecord, origin?: string) => void + >(); + + constructor( + readonly spaceId: string, + private readonly timestamps: Map, + private readonly crawlDocDataImpl: ( + docId: string + ) => Promise + ) {} + + async getDoc(_docId: string): Promise { + return null; + } + + async getDocDiff( + _docId: string, + _state?: Uint8Array + ): Promise { + return null; + } + + async pushDocUpdate(update: DocUpdate, origin?: string): Promise { + const timestamp = this.timestamps.get(update.docId) ?? new Date(); + const record = { ...update, timestamp }; + this.timestamps.set(update.docId, timestamp); + for (const subscriber of this.subscribers) { + subscriber(record, origin); + } + return { docId: update.docId, timestamp }; + } + + async getDocTimestamp(docId: string): Promise { + const timestamp = this.timestamps.get(docId); + return timestamp ? { docId, timestamp } : null; + } + + async getDocTimestamps(): Promise { + return Object.fromEntries(this.timestamps); + } + + async deleteDoc(docId: string): Promise { + this.timestamps.delete(docId); + } + + subscribeDocUpdate(callback: (update: DocRecord, origin?: string) => void) { + this.subscribers.add(callback); + return () => { + this.subscribers.delete(callback); + }; + } + + async crawlDocData(docId: string): Promise { + return this.crawlDocDataImpl(docId); + } +} + +class TrackingIndexerStorage extends IndexerStorageBase { + override readonly connection = new DummyConnection(); + override readonly isReadonly = false; + + constructor( + private readonly calls: string[], + override readonly recommendRefreshInterval: number + ) { + super(); + } + + override async search< + T extends keyof IndexerSchema, + const O extends SearchOptions, + >(_table: T, _query: Query, _options?: O): Promise> { + return { + pagination: { count: 0, limit: 0, skip: 0, hasMore: false }, + nodes: [], + } as SearchResult; + } + + override async aggregate< + T extends keyof IndexerSchema, + const O extends AggregateOptions, + >( + _table: T, + _query: Query, + _field: keyof IndexerSchema[T], + _options?: O + ): Promise> { + return { + pagination: { count: 0, limit: 0, skip: 0, hasMore: false }, + buckets: [], + } as AggregateResult; + } + + override search$< + T extends keyof IndexerSchema, + const O extends SearchOptions, + >(_table: T, _query: Query, _options?: O) { + return NEVER; + } + + override aggregate$< + T extends keyof IndexerSchema, + const O extends AggregateOptions, + >(_table: T, _query: Query, _field: keyof IndexerSchema[T], _options?: O) { + return NEVER; + } + + override async deleteByQuery( + table: T, + _query: Query + ): Promise { + this.calls.push(`deleteByQuery:${String(table)}`); + } + + override async insert( + table: T, + document: IndexerDocument + ): Promise { + this.calls.push(`insert:${String(table)}:${document.id}`); + } + + override async delete( + table: T, + id: string + ): Promise { + this.calls.push(`delete:${String(table)}:${id}`); + } + + override async update( + table: T, + document: IndexerDocument + ): Promise { + this.calls.push(`update:${String(table)}:${document.id}`); + } + + override async refresh( + _table: T + ): Promise { + return; + } + + override async refreshIfNeed(): Promise { + this.calls.push('refresh'); + } + + override async indexVersion(): Promise { + return 1; + } +} + +class TrackingIndexerSyncStorage extends IndexerSyncStorageBase { + override readonly connection = new DummyConnection(); + private readonly clocks = new Map(); + + constructor(private readonly calls: string[]) { + super(); + } + + override async getDocIndexedClock( + docId: string + ): Promise { + return this.clocks.get(docId) ?? null; + } + + override async setDocIndexedClock(clock: DocIndexedClock): Promise { + this.calls.push(`setClock:${clock.docId}`); + this.clocks.set(clock.docId, clock); + } + + override async clearDocIndexedClock(docId: string): Promise { + this.calls.push(`clearClock:${docId}`); + this.clocks.delete(docId); + } +} + test('doc', async () => { const doc = new YDoc(); doc.getMap('test').set('hello', 'world'); @@ -207,3 +424,114 @@ test('blob', async () => { expect(c?.data).toEqual(new Uint8Array([4, 3, 2, 1])); } }); + +test('indexer defers indexed clock persistence until a refresh happens on delayed refresh storages', async () => { + const calls: string[] = []; + const docsInRootDoc = new Map([['doc1', { title: 'Doc 1' }]]); + const docStorage = new TestDocStorage( + 'workspace-id', + new Map([['doc1', new Date('2026-01-01T00:00:00.000Z')]]), + async () => ({ + title: 'Doc 1', + summary: 'summary', + blocks: [ + { blockId: 'block-1', flavour: 'affine:image', blob: ['blob-1'] }, + ], + }) + ); + const indexer = new TrackingIndexerStorage(calls, 30_000); + const indexerSyncStorage = new TrackingIndexerSyncStorage(calls); + const sync = new IndexerSyncImpl( + docStorage, + { + local: indexer, + remotes: {}, + }, + indexerSyncStorage + ); + + vi.spyOn(reader, 'readAllDocsFromRootDoc').mockImplementation( + () => new Map(docsInRootDoc) + ); + + try { + sync.start(); + await sync.waitForCompleted(); + + expect(calls).not.toContain('setClock:doc1'); + + sync.stop(); + + await vi.waitFor(() => { + expect(calls).toContain('setClock:doc1'); + }); + + const lastRefreshIndex = calls.lastIndexOf('refresh'); + const setClockIndex = calls.indexOf('setClock:doc1'); + + expect(lastRefreshIndex).toBeGreaterThanOrEqual(0); + expect(setClockIndex).toBeGreaterThan(lastRefreshIndex); + } finally { + sync.stop(); + } +}); + +test('indexer completion waits for the current job to finish', async () => { + const docsInRootDoc = new Map([['doc1', { title: 'Doc 1' }]]); + const crawlStarted = deferred(); + const releaseCrawl = deferred(); + const docStorage = new TestDocStorage( + 'workspace-id', + new Map([['doc1', new Date('2026-01-01T00:00:00.000Z')]]), + async () => { + crawlStarted.resolve(); + await releaseCrawl.promise; + return { + title: 'Doc 1', + summary: 'summary', + blocks: [ + { blockId: 'block-1', flavour: 'affine:image', blob: ['blob-1'] }, + ], + }; + } + ); + const sync = new IndexerSyncImpl( + docStorage, + { + local: new TrackingIndexerStorage([], 30_000), + remotes: {}, + }, + new TrackingIndexerSyncStorage([]) + ); + + vi.spyOn(reader, 'readAllDocsFromRootDoc').mockImplementation( + () => new Map(docsInRootDoc) + ); + + try { + sync.start(); + await crawlStarted.promise; + + let completed = false; + let docCompleted = false; + + const waitForCompleted = sync.waitForCompleted().then(() => { + completed = true; + }); + const waitForDocCompleted = sync.waitForDocCompleted('doc1').then(() => { + docCompleted = true; + }); + + await new Promise(resolve => setTimeout(resolve, 20)); + + expect(completed).toBe(false); + expect(docCompleted).toBe(false); + + releaseCrawl.resolve(); + + await waitForCompleted; + await waitForDocCompleted; + } finally { + sync.stop(); + } +}); diff --git a/packages/common/nbstore/src/sync/indexer/index.ts b/packages/common/nbstore/src/sync/indexer/index.ts index dac3fe8745..a02208b912 100644 --- a/packages/common/nbstore/src/sync/indexer/index.ts +++ b/packages/common/nbstore/src/sync/indexer/index.ts @@ -112,6 +112,10 @@ export class IndexerSyncImpl implements IndexerSync { private readonly indexer: IndexerStorage; private readonly remote?: IndexerStorage; + private readonly pendingIndexedClocks = new Map< + string, + { docId: string; timestamp: Date; indexerVersion: number } + >(); private lastRefreshed = Date.now(); @@ -372,12 +376,13 @@ export class IndexerSyncImpl implements IndexerSync { field: 'docId', match: docId, }); + this.pendingIndexedClocks.delete(docId); await this.indexerSync.clearDocIndexedClock(docId); this.status.docsInIndexer.delete(docId); this.status.statusUpdatedSubject$.next(docId); } } - await this.refreshIfNeed(); + await this.refreshIfNeed(true); // #endregion } else { // #region crawl doc @@ -394,7 +399,8 @@ export class IndexerSyncImpl implements IndexerSync { } const docIndexedClock = - await this.indexerSync.getDocIndexedClock(docId); + this.pendingIndexedClocks.get(docId) ?? + (await this.indexerSync.getDocIndexedClock(docId)); if ( docIndexedClock && docIndexedClock.timestamp.getTime() === @@ -460,13 +466,12 @@ export class IndexerSyncImpl implements IndexerSync { ); } - await this.refreshIfNeed(); - - await this.indexerSync.setDocIndexedClock({ + this.pendingIndexedClocks.set(docId, { docId, timestamp: docClock.timestamp, indexerVersion: indexVersion, }); + await this.refreshIfNeed(); // #endregion } @@ -476,7 +481,7 @@ export class IndexerSyncImpl implements IndexerSync { this.status.completeJob(); } } finally { - await this.refreshIfNeed(); + await this.refreshIfNeed(true); unsubscribe(); } } @@ -484,18 +489,27 @@ export class IndexerSyncImpl implements IndexerSync { // ensure the indexer is refreshed according to recommendRefreshInterval // recommendRefreshInterval <= 0 means force refresh on each operation // recommendRefreshInterval > 0 means refresh if the last refresh is older than recommendRefreshInterval - private async refreshIfNeed(): Promise { + private async refreshIfNeed(force = false): Promise { const recommendRefreshInterval = this.indexer.recommendRefreshInterval ?? 0; const needRefresh = recommendRefreshInterval > 0 && this.lastRefreshed + recommendRefreshInterval < Date.now(); const forceRefresh = recommendRefreshInterval <= 0; - if (needRefresh || forceRefresh) { + if (force || needRefresh || forceRefresh) { await this.indexer.refreshIfNeed(); + await this.flushPendingIndexedClocks(); this.lastRefreshed = Date.now(); } } + private async flushPendingIndexedClocks() { + if (this.pendingIndexedClocks.size === 0) return; + for (const [docId, clock] of this.pendingIndexedClocks) { + await this.indexerSync.setDocIndexedClock(clock); + this.pendingIndexedClocks.delete(docId); + } + } + /** * Get all docs from the root doc, without deleted docs */ @@ -706,7 +720,10 @@ class IndexerSyncStatus { indexing: this.jobs.length() + (this.currentJob ? 1 : 0), total: this.docsInRootDoc.size + 1, errorMessage: this.errorMessage, - completed: this.rootDocReady && this.jobs.length() === 0, + completed: + this.rootDocReady && + this.jobs.length() === 0 && + this.currentJob === null, batterySaveMode: this.batterySaveMode, paused: this.paused !== null, }); @@ -734,9 +751,10 @@ class IndexerSyncStatus { completed: true, }); } else { + const indexing = this.jobs.has(docId) || this.currentJob === docId; subscribe.next({ - indexing: this.jobs.has(docId), - completed: this.docsInIndexer.has(docId) && !this.jobs.has(docId), + indexing, + completed: this.docsInIndexer.has(docId) && !indexing, }); } }; diff --git a/packages/frontend/native/nbstore/src/indexer.rs b/packages/frontend/native/nbstore/src/indexer.rs index a54bb199b9..10c31fcf19 100644 --- a/packages/frontend/native/nbstore/src/indexer.rs +++ b/packages/frontend/native/nbstore/src/indexer.rs @@ -5,6 +5,10 @@ use serde::Serialize; use sqlx::Row; use y_octo::DocOptions; +// Increment this whenever there is a breaking change in the index format or how +// updates are applied +const NBSTORE_INDEXER_VERSION: u32 = 1; + use super::{ error::{Error, Result}, storage::SqliteDocStorage, @@ -192,7 +196,7 @@ impl SqliteDocStorage { } pub fn index_version() -> u32 { - memory_indexer::InMemoryIndex::snapshot_version() + memory_indexer::InMemoryIndex::snapshot_version() + NBSTORE_INDEXER_VERSION } pub async fn fts_add(&self, index_name: &str, doc_id: &str, text: &str, index: bool) -> Result<()> {