fix(native): misalignment between index clock and snapshot clock (#14688)

fix #14191

#### PR Dependency Tree


* **PR #14688** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Bug Fixes**
  * Improved indexer synchronization timing for clock persistence to
    prevent premature completion signals
  * Enhanced document-level indexing status tracking accuracy
  * Optimized refresh behavior for better state consistency

* **Chores**
  * Updated indexer versioning system

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-03-20 02:09:11 +08:00
committed by GitHub
parent 0d2d4bb6a1
commit daf536f77a
3 changed files with 364 additions and 14 deletions

View File

@@ -1,18 +1,235 @@
import 'fake-indexeddb/auto';
import { expect, test } from 'vitest';
import * as reader from '@affine/reader';
import { NEVER } from 'rxjs';
import { afterEach, expect, test, vi } from 'vitest';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { DummyConnection } from '../connection';
import {
IndexedDBBlobStorage,
IndexedDBBlobSyncStorage,
IndexedDBDocStorage,
IndexedDBDocSyncStorage,
} from '../impls/idb';
import { SpaceStorage } from '../storage';
import {
type AggregateOptions,
type AggregateResult,
type CrawlResult,
type DocClock,
type DocClocks,
type DocDiff,
type DocIndexedClock,
type DocRecord,
type DocStorage,
type DocUpdate,
type IndexerDocument,
type IndexerSchema,
IndexerStorageBase,
IndexerSyncStorageBase,
type Query,
type SearchOptions,
type SearchResult,
SpaceStorage,
} from '../storage';
import { Sync } from '../sync';
import { IndexerSyncImpl } from '../sync/indexer';
import { expectYjsEqual } from './utils';
// Undo all vi.spyOn replacements (e.g. the reader.readAllDocsFromRootDoc mock
// installed inside individual tests) so mocks never leak between tests.
afterEach(() => {
vi.restoreAllMocks();
});
/**
 * Create a manually-settled promise (the "deferred" pattern): the returned
 * `resolve`/`reject` callbacks settle `promise` from outside the executor,
 * letting tests gate async code at precise points.
 */
function deferred<T = void>() {
  let settle!: (value: T | PromiseLike<T>) => void;
  let fail!: (reason?: unknown) => void;
  const promise = new Promise<T>((resolveFn, rejectFn) => {
    settle = resolveFn;
    fail = rejectFn;
  });
  return { promise, resolve: settle, reject: fail };
}
/**
 * In-memory DocStorage stub for indexer-sync tests.
 *
 * Doc clocks live in the injected `timestamps` map, and crawling is delegated
 * to the injected `crawlDocDataImpl` so each test controls when a crawl
 * starts and what it yields. `getDoc`/`getDocDiff` are intentionally inert.
 */
class TestDocStorage implements DocStorage {
  readonly storageType = 'doc' as const;
  readonly connection = new DummyConnection();
  readonly isReadonly = false;

  // Callbacks registered via subscribeDocUpdate; invoked on every push.
  private readonly listeners = new Set<
    (update: DocRecord, origin?: string) => void
  >();

  constructor(
    readonly spaceId: string,
    private readonly timestamps: Map<string, Date>,
    private readonly crawlDocDataImpl: (
      docId: string
    ) => Promise<CrawlResult | null>
  ) {}

  async getDoc(_docId: string): Promise<DocRecord | null> {
    return null;
  }

  async getDocDiff(
    _docId: string,
    _state?: Uint8Array
  ): Promise<DocDiff | null> {
    return null;
  }

  async pushDocUpdate(update: DocUpdate, origin?: string): Promise<DocClock> {
    // Reuse a pre-seeded timestamp when present so the doc's clock is stable.
    const timestamp = this.timestamps.get(update.docId) ?? new Date();
    this.timestamps.set(update.docId, timestamp);
    const record = { ...update, timestamp };
    for (const listener of this.listeners) {
      listener(record, origin);
    }
    return { docId: update.docId, timestamp };
  }

  async getDocTimestamp(docId: string): Promise<DocClock | null> {
    const ts = this.timestamps.get(docId);
    return ts === undefined ? null : { docId, timestamp: ts };
  }

  async getDocTimestamps(): Promise<DocClocks> {
    return Object.fromEntries(this.timestamps.entries());
  }

  async deleteDoc(docId: string): Promise<void> {
    this.timestamps.delete(docId);
  }

  subscribeDocUpdate(callback: (update: DocRecord, origin?: string) => void) {
    this.listeners.add(callback);
    return () => {
      this.listeners.delete(callback);
    };
  }

  async crawlDocData(docId: string): Promise<CrawlResult | null> {
    return this.crawlDocDataImpl(docId);
  }
}
/**
 * IndexerStorage stub that appends every mutating call to the shared `calls`
 * log (e.g. "insert:<table>:<id>", "refresh") so tests can assert call order.
 * Searches/aggregations return empty results; the observable variants never
 * emit.
 */
class TrackingIndexerStorage extends IndexerStorageBase {
  override readonly connection = new DummyConnection();
  override readonly isReadonly = false;

  constructor(
    private readonly calls: string[],
    override readonly recommendRefreshInterval: number
  ) {
    super();
  }

  // Append one entry to the shared call log.
  private record(entry: string): void {
    this.calls.push(entry);
  }

  override async search<
    T extends keyof IndexerSchema,
    const O extends SearchOptions<T>,
  >(_table: T, _query: Query<T>, _options?: O): Promise<SearchResult<T, O>> {
    // Always an empty page; tests never inspect search results.
    const pagination = { count: 0, limit: 0, skip: 0, hasMore: false };
    return { pagination, nodes: [] } as SearchResult<T, O>;
  }

  override async aggregate<
    T extends keyof IndexerSchema,
    const O extends AggregateOptions<T>,
  >(
    _table: T,
    _query: Query<T>,
    _field: keyof IndexerSchema[T],
    _options?: O
  ): Promise<AggregateResult<T, O>> {
    const pagination = { count: 0, limit: 0, skip: 0, hasMore: false };
    return { pagination, buckets: [] } as AggregateResult<T, O>;
  }

  override search$<
    T extends keyof IndexerSchema,
    const O extends SearchOptions<T>,
  >(_table: T, _query: Query<T>, _options?: O) {
    // An observable that never emits and never completes.
    return NEVER;
  }

  override aggregate$<
    T extends keyof IndexerSchema,
    const O extends AggregateOptions<T>,
  >(_table: T, _query: Query<T>, _field: keyof IndexerSchema[T], _options?: O) {
    return NEVER;
  }

  override async deleteByQuery<T extends keyof IndexerSchema>(
    table: T,
    _query: Query<T>
  ): Promise<void> {
    this.record(`deleteByQuery:${String(table)}`);
  }

  override async insert<T extends keyof IndexerSchema>(
    table: T,
    document: IndexerDocument<T>
  ): Promise<void> {
    this.record(`insert:${String(table)}:${document.id}`);
  }

  override async delete<T extends keyof IndexerSchema>(
    table: T,
    id: string
  ): Promise<void> {
    this.record(`delete:${String(table)}:${id}`);
  }

  override async update<T extends keyof IndexerSchema>(
    table: T,
    document: IndexerDocument<T>
  ): Promise<void> {
    this.record(`update:${String(table)}:${document.id}`);
  }

  override async refresh<T extends keyof IndexerSchema>(
    _table: T
  ): Promise<void> {
    // Deliberately untracked: only refreshIfNeed matters to the tests.
    return;
  }

  override async refreshIfNeed(): Promise<void> {
    this.record('refresh');
  }

  override async indexVersion(): Promise<number> {
    return 1;
  }
}
/**
 * IndexerSyncStorage stub backed by an in-memory docId -> indexed-clock map.
 * Logs "setClock:<docId>" / "clearClock:<docId>" into the shared `calls`
 * array so tests can assert WHEN clock persistence happens relative to
 * other indexer operations.
 */
class TrackingIndexerSyncStorage extends IndexerSyncStorageBase {
  override readonly connection = new DummyConnection();

  // docId -> most recently persisted indexed clock.
  private readonly clocks = new Map<string, DocIndexedClock>();

  constructor(private readonly calls: string[]) {
    super();
  }

  override async getDocIndexedClock(
    docId: string
  ): Promise<DocIndexedClock | null> {
    const clock = this.clocks.get(docId);
    return clock === undefined ? null : clock;
  }

  override async setDocIndexedClock(clock: DocIndexedClock): Promise<void> {
    this.calls.push(`setClock:${clock.docId}`);
    this.clocks.set(clock.docId, clock);
  }

  override async clearDocIndexedClock(docId: string): Promise<void> {
    this.calls.push(`clearClock:${docId}`);
    this.clocks.delete(docId);
  }
}
test('doc', async () => {
const doc = new YDoc();
doc.getMap('test').set('hello', 'world');
@@ -207,3 +424,114 @@ test('blob', async () => {
expect(c?.data).toEqual(new Uint8Array([4, 3, 2, 1]));
}
});
// Regression test for the index-clock / snapshot-clock misalignment: on a
// storage with a delayed refresh window (recommendRefreshInterval > 0) the
// per-doc indexed clock must NOT be persisted before the index itself has
// been refreshed, otherwise a crash between the two would leave the clock
// ahead of the actual index contents.
test('indexer defers indexed clock persistence until a refresh happens on delayed refresh storages', async () => {
// Shared, ordered log of indexer + sync-storage operations.
const calls: string[] = [];
const docsInRootDoc = new Map([['doc1', { title: 'Doc 1' }]]);
const docStorage = new TestDocStorage(
'workspace-id',
// Pre-seeded clock for doc1 so crawling has a stable timestamp.
new Map([['doc1', new Date('2026-01-01T00:00:00.000Z')]]),
async () => ({
title: 'Doc 1',
summary: 'summary',
blocks: [
{ blockId: 'block-1', flavour: 'affine:image', blob: ['blob-1'] },
],
})
);
// 30s refresh interval => refreshIfNeed will not refresh during the test run.
const indexer = new TrackingIndexerStorage(calls, 30_000);
const indexerSyncStorage = new TrackingIndexerSyncStorage(calls);
const sync = new IndexerSyncImpl(
docStorage,
{
local: indexer,
remotes: {},
},
indexerSyncStorage
);
// Make the sync loop see exactly one doc ('doc1') in the root doc.
vi.spyOn(reader, 'readAllDocsFromRootDoc').mockImplementation(
() => new Map(docsInRootDoc)
);
try {
sync.start();
await sync.waitForCompleted();
// Completion alone must not have persisted the clock yet — the 30s
// refresh window has not elapsed, so no refresh has happened.
expect(calls).not.toContain('setClock:doc1');
sync.stop();
// After stop, the pending clock is expected to be flushed eventually.
await vi.waitFor(() => {
expect(calls).toContain('setClock:doc1');
});
// Core invariant: the clock write happens only AFTER an index refresh.
const lastRefreshIndex = calls.lastIndexOf('refresh');
const setClockIndex = calls.indexOf('setClock:doc1');
expect(lastRefreshIndex).toBeGreaterThanOrEqual(0);
expect(setClockIndex).toBeGreaterThan(lastRefreshIndex);
} finally {
// Safety net so a failing assertion above cannot leak a running sync.
sync.stop();
}
});
// Regression test: neither waitForCompleted() nor waitForDocCompleted() may
// resolve while a crawl job is still in flight. The crawl is gated by two
// deferreds so the test can hold the job open and observe the wait promises.
test('indexer completion waits for the current job to finish', async () => {
const docsInRootDoc = new Map([['doc1', { title: 'Doc 1' }]]);
// Resolved by the crawl callback once it starts running.
const crawlStarted = deferred<void>();
// Resolved by the test to let the crawl finish.
const releaseCrawl = deferred<void>();
const docStorage = new TestDocStorage(
'workspace-id',
new Map([['doc1', new Date('2026-01-01T00:00:00.000Z')]]),
async () => {
crawlStarted.resolve();
// Block here until the test releases the crawl.
await releaseCrawl.promise;
return {
title: 'Doc 1',
summary: 'summary',
blocks: [
{ blockId: 'block-1', flavour: 'affine:image', blob: ['blob-1'] },
],
};
}
);
const sync = new IndexerSyncImpl(
docStorage,
{
local: new TrackingIndexerStorage([], 30_000),
remotes: {},
},
new TrackingIndexerSyncStorage([])
);
// Make the sync loop see exactly one doc ('doc1') in the root doc.
vi.spyOn(reader, 'readAllDocsFromRootDoc').mockImplementation(
() => new Map(docsInRootDoc)
);
try {
sync.start();
// Wait until the crawl job for doc1 is actually in flight.
await crawlStarted.promise;
let completed = false;
let docCompleted = false;
const waitForCompleted = sync.waitForCompleted().then(() => {
completed = true;
});
const waitForDocCompleted = sync.waitForDocCompleted('doc1').then(() => {
docCompleted = true;
});
// Give the wait promises a chance to (incorrectly) resolve early.
await new Promise(resolve => setTimeout(resolve, 20));
expect(completed).toBe(false);
expect(docCompleted).toBe(false);
// Unblock the crawl; both waits must now resolve.
releaseCrawl.resolve();
await waitForCompleted;
await waitForDocCompleted;
} finally {
sync.stop();
}
});

View File

@@ -112,6 +112,10 @@ export class IndexerSyncImpl implements IndexerSync {
private readonly indexer: IndexerStorage;
private readonly remote?: IndexerStorage;
private readonly pendingIndexedClocks = new Map<
string,
{ docId: string; timestamp: Date; indexerVersion: number }
>();
private lastRefreshed = Date.now();
@@ -372,12 +376,13 @@ export class IndexerSyncImpl implements IndexerSync {
field: 'docId',
match: docId,
});
this.pendingIndexedClocks.delete(docId);
await this.indexerSync.clearDocIndexedClock(docId);
this.status.docsInIndexer.delete(docId);
this.status.statusUpdatedSubject$.next(docId);
}
}
await this.refreshIfNeed();
await this.refreshIfNeed(true);
// #endregion
} else {
// #region crawl doc
@@ -394,7 +399,8 @@ export class IndexerSyncImpl implements IndexerSync {
}
const docIndexedClock =
await this.indexerSync.getDocIndexedClock(docId);
this.pendingIndexedClocks.get(docId) ??
(await this.indexerSync.getDocIndexedClock(docId));
if (
docIndexedClock &&
docIndexedClock.timestamp.getTime() ===
@@ -460,13 +466,12 @@ export class IndexerSyncImpl implements IndexerSync {
);
}
await this.refreshIfNeed();
await this.indexerSync.setDocIndexedClock({
this.pendingIndexedClocks.set(docId, {
docId,
timestamp: docClock.timestamp,
indexerVersion: indexVersion,
});
await this.refreshIfNeed();
// #endregion
}
@@ -476,7 +481,7 @@ export class IndexerSyncImpl implements IndexerSync {
this.status.completeJob();
}
} finally {
await this.refreshIfNeed();
await this.refreshIfNeed(true);
unsubscribe();
}
}
@@ -484,18 +489,27 @@ export class IndexerSyncImpl implements IndexerSync {
// ensure the indexer is refreshed according to recommendRefreshInterval
// recommendRefreshInterval <= 0 means force refresh on each operation
// recommendRefreshInterval > 0 means refresh if the last refresh is older than recommendRefreshInterval
private async refreshIfNeed(): Promise<void> {
private async refreshIfNeed(force = false): Promise<void> {
const recommendRefreshInterval = this.indexer.recommendRefreshInterval ?? 0;
const needRefresh =
recommendRefreshInterval > 0 &&
this.lastRefreshed + recommendRefreshInterval < Date.now();
const forceRefresh = recommendRefreshInterval <= 0;
if (needRefresh || forceRefresh) {
if (force || needRefresh || forceRefresh) {
await this.indexer.refreshIfNeed();
await this.flushPendingIndexedClocks();
this.lastRefreshed = Date.now();
}
}
// Persist all indexed clocks that were deferred until an index refresh.
// Each entry is deleted immediately after it is written, so if
// setDocIndexedClock throws mid-loop the remaining clocks stay pending and
// will be retried on the next flush. (Deleting the current entry while
// iterating a Map is safe in JS.)
private async flushPendingIndexedClocks() {
if (this.pendingIndexedClocks.size === 0) return;
for (const [docId, clock] of this.pendingIndexedClocks) {
await this.indexerSync.setDocIndexedClock(clock);
this.pendingIndexedClocks.delete(docId);
}
}
/**
* Get all docs from the root doc, without deleted docs
*/
@@ -706,7 +720,10 @@ class IndexerSyncStatus {
indexing: this.jobs.length() + (this.currentJob ? 1 : 0),
total: this.docsInRootDoc.size + 1,
errorMessage: this.errorMessage,
completed: this.rootDocReady && this.jobs.length() === 0,
completed:
this.rootDocReady &&
this.jobs.length() === 0 &&
this.currentJob === null,
batterySaveMode: this.batterySaveMode,
paused: this.paused !== null,
});
@@ -734,9 +751,10 @@ class IndexerSyncStatus {
completed: true,
});
} else {
const indexing = this.jobs.has(docId) || this.currentJob === docId;
subscribe.next({
indexing: this.jobs.has(docId),
completed: this.docsInIndexer.has(docId) && !this.jobs.has(docId),
indexing,
completed: this.docsInIndexer.has(docId) && !indexing,
});
}
};

View File

@@ -5,6 +5,10 @@ use serde::Serialize;
use sqlx::Row;
use y_octo::DocOptions;
// Increment this whenever there is a breaking change in the index format or how
// updates are applied
const NBSTORE_INDEXER_VERSION: u32 = 1;
use super::{
error::{Error, Result},
storage::SqliteDocStorage,
@@ -192,7 +196,7 @@ impl SqliteDocStorage {
}
pub fn index_version() -> u32 {
memory_indexer::InMemoryIndex::snapshot_version()
memory_indexer::InMemoryIndex::snapshot_version() + NBSTORE_INDEXER_VERSION
}
pub async fn fts_add(&self, index_name: &str, doc_id: &str, text: &str, index: bool) -> Result<()> {