From 6ecdc8db7a265000e02bb58b63cd7ea9920d868f Mon Sep 17 00:00:00 2001 From: EYHN Date: Tue, 22 Oct 2024 06:41:07 +0000 Subject: [PATCH] feat(infra): opti indexer performance (#8557) --- .../sync/indexer/__tests__/black-box.spec.ts | 182 +++++++++--------- .../indexer/impl/indexeddb/data-struct.ts | 137 ++++++++----- .../src/sync/indexer/impl/indexeddb/index.ts | 7 +- .../indexer/impl/indexeddb/inverted-index.ts | 12 +- .../src/sync/indexer/impl/indexeddb/match.ts | 22 --- .../sync/indexer/impl/memory/data-struct.ts | 26 ++- .../src/sync/indexer/impl/memory/index.ts | 2 +- .../common/infra/src/sync/indexer/indexer.ts | 2 +- .../src/sync/job/impl/indexeddb/index.ts | 16 +- .../docs-search/entities/docs-indexer.ts | 14 +- .../modules/docs-search/worker/in-worker.ts | 61 +++--- 11 files changed, 264 insertions(+), 217 deletions(-) diff --git a/packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts b/packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts index 110f0d286c..0f48aa04b7 100644 --- a/packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts +++ b/packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts @@ -454,101 +454,107 @@ describe.each([ }); }); - test('subscribe', async () => { - await writeData({ - '1': { - title: 'hello world', - }, - }); + test( + 'subscribe', + { + timeout: 30000, + }, + async () => { + await writeData({ + '1': { + title: 'hello world', + }, + }); - let value = null as any; - index - .search$({ - type: 'match', - field: 'title', - match: 'hello world', - }) - .pipe(map(v => (value = v))) - .subscribe(); + let value = null as any; + index + .search$({ + type: 'match', + field: 'title', + match: 'hello world', + }) + .pipe(map(v => (value = v))) + .subscribe(); - await vitest.waitFor( - () => { - expect(value).toEqual({ - nodes: [ - { - id: '1', - score: expect.anything(), + await vitest.waitFor( + () => { + expect(value).toEqual({ + nodes: [ + { + id: '1', + score: expect.anything(), + }, + ], + pagination: { + count: 1, + hasMore: false, + limit: expect.anything(), + skip: 0, }, - ], - pagination: { - count: 1, - hasMore: false, - limit: expect.anything(), - skip: 0, - }, - }); - }, - { - timeout: 5000, - } - ); + }); + }, + { + timeout: 10000, + } + ); - await writeData({ - '2': { - title: 'hello world', - }, - }); + await writeData({ + '2': { + title: 'hello world', + }, + }); - await vitest.waitFor( - () => { - expect(value).toEqual({ - nodes: [ - { - id: '1', - score: expect.anything(), + await vitest.waitFor( + () => { + expect(value).toEqual({ + nodes: [ + { + id: '1', + score: expect.anything(), + }, + { + id: '2', + score: expect.anything(), + }, + ], + pagination: { + count: 2, + hasMore: false, + limit: expect.anything(), + skip: 0, }, - { - id: '2', - score: expect.anything(), - }, - ], - pagination: { - count: 2, - hasMore: false, - limit: expect.anything(), - skip: 0, - }, - }); - }, - { - timeout: 5000, - } - ); + }); + }, + { + timeout: 10000, + } + ); - const writer = await index.write(); - writer.delete('1'); - await writer.commit(); + const writer = await index.write(); + writer.delete('1'); + await writer.commit(); - await vitest.waitFor( - () => { - expect(value).toEqual({ - nodes: [ - { - id: '2', - score: expect.anything(), + await vitest.waitFor( + () => { + expect(value).toEqual({ + nodes: [ + { + id: '2', + score: expect.anything(), + }, + ], + pagination: { + count: 1, + hasMore: false, + limit: expect.anything(), + skip: 0, }, - ], - pagination: { - count: 1, - hasMore: false, - limit: expect.anything(), - skip: 0, - }, - }); - }, - { - timeout: 5000, - } - ); - }); + }); + }, + { + timeout: 10000, + } + ); + } + ); }); diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts index 2fd5c031a3..9fe55bc1ec 100644 --- a/packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts @@ -100,7 +100,7 @@ export class DataStruct { } } - async insert(trx: DataStructRWTransaction, document: Document) { + private async insert(trx: DataStructRWTransaction, document: Document) { const exists = await trx .objectStore('records') .index('id') @@ -138,7 +138,7 @@ export class DataStruct { } } - async delete(trx: DataStructRWTransaction, id: string) { + private async delete(trx: DataStructRWTransaction, id: string) { const nid = await trx.objectStore('records').index('id').getKey(id); if (nid) { @@ -159,11 +159,30 @@ export class DataStruct { deletes: string[], inserts: Document[] ) { - for (const del of deletes) { - await this.delete(trx, del); - } - for (const inst of inserts) { - await this.insert(trx, inst); + const startTime = performance.now(); + try { + for (const del of deletes) { + await this.delete(trx, del); + } + for (const inst of inserts) { + await this.insert(trx, inst); + } + } finally { + const endTime = performance.now(); + if (BUILD_CONFIG.debug) { + performance.measure( + `[IndexedDB Indexer] Batch Write (${this.databaseName})`, + { + start: startTime, + end: endTime, + } + ); + } + logger.debug( + `[indexer ${this.databaseName}] batchWrite`, + endTime - startTime, + 'ms' + ); } } @@ -214,18 +233,6 @@ export class DataStruct { throw new Error(`Query type '${query.type}' not supported`); } - private async query( - trx: DataStructROTransaction, - query: Query - ): Promise { - const match = await this.queryRaw(trx, query); - const filteredMatch = match.asyncFilter(async nid => { - const record = await trx.objectStore('records').getKey(nid); - return record !== undefined; - }); - return filteredMatch; - } - async clear(trx: DataStructRWTransaction) { await trx.objectStore('records').clear(); await trx.objectStore('invertedIndex').clear(); @@ -244,7 +251,7 @@ export class DataStruct { limit: options.pagination?.limit ?? 100, }; - const match = await this.query(trx, query); + const match = await this.queryRaw(trx, query); const nids = match .toArray() @@ -252,7 +259,11 @@ export class DataStruct { const nodes = []; for (const nid of nids) { - nodes.push(await this.resultNode(trx, match, nid, options)); + const record = await trx.objectStore('records').get(nid); + if (!record) { + continue; + } + nodes.push(this.resultNode(record, options, match, nid)); } return { @@ -265,9 +276,20 @@ export class DataStruct { nodes: nodes, }; } finally { + const endTime = performance.now(); + if (BUILD_CONFIG.debug) { + performance.measure( + `[IndexedDB Indexer] Search (${this.databaseName})`, + { + detail: { query, options }, + start: startTime, + end: endTime, + } + ); + } logger.debug( `[indexer ${this.databaseName}] search`, - performance.now() - startTime, + endTime - startTime, 'ms', query ); @@ -297,7 +319,7 @@ export class DataStruct { limit: 0, }; - const match = await this.query(trx, query); + const match = await this.queryRaw(trx, query); const nids = match.toArray(); @@ -308,9 +330,11 @@ export class DataStruct { }[] = []; for (const nid of nids) { - const values = (await trx.objectStore('records').get(nid))?.data.get( - field - ); + const record = await trx.objectStore('records').get(nid); + if (!record) { + continue; + } + const values = record.data.get(field); for (const value of values ?? []) { let bucket; let bucketIndex = buckets.findIndex(b => b.key === value); @@ -332,7 +356,7 @@ export class DataStruct { bucket.nids.length - 1 < hitPagination.skip + hitPagination.limit ) { bucket.hits.push( - await this.resultNode(trx, match, nid, options.hits ?? {}) + this.resultNode(record, options.hits ?? {}, match, nid) ); } } @@ -373,9 +397,20 @@ export class DataStruct { }, }; } finally { + const endTime = performance.now(); + if (BUILD_CONFIG.debug) { + performance.measure( + `[IndexedDB Indexer] Aggregate (${this.databaseName})`, + { + detail: { query, field, options }, + start: startTime, + end: endTime, + } + ); + } logger.debug( `[indexer ${this.databaseName}] aggregate`, - performance.now() - startTime, + endTime - startTime, 'ms' ); } @@ -383,12 +418,19 @@ export class DataStruct { async getAll( trx: DataStructROTransaction, - ids: string[] + ids?: string[] ): Promise { const docs = []; - for (const id of ids) { - const record = await trx.objectStore('records').index('id').get(id); - if (record) { + if (ids) { + for (const id of ids) { + const record = await trx.objectStore('records').index('id').get(id); + if (record) { + docs.push(Document.from(record.id, record.data)); + } + } + } else { + const records = await trx.objectStore('records').getAll(); + for (const record of records) { docs.push(Document.from(record.id, record.data)); } } @@ -405,7 +447,10 @@ export class DataStruct { await this.ensureInitialized(); return this.database.transaction( ['records', 'invertedIndex', 'kvMetadata'], - 'readonly' + 'readonly', + { + durability: 'relaxed', + } ); } @@ -413,7 +458,10 @@ export class DataStruct { await this.ensureInitialized(); return this.database.transaction( ['records', 'invertedIndex', 'kvMetadata'], - 'readwrite' + 'readwrite', + { + durability: 'relaxed', + } ); } @@ -446,20 +494,15 @@ export class DataStruct { }); } - private async resultNode( - trx: DataStructROTransaction, - match: Match, - nid: number, - options: SearchOptions - ): Promise['nodes'][number]> { - const record = await trx.objectStore('records').get(nid); - if (!record) { - throw new Error(`Record not found for nid ${nid}`); - } - + private resultNode( + record: { id: string; data: Map }, + options: SearchOptions, + match?: Match, + nid?: number + ): SearchResult['nodes'][number] { const node = { id: record.id, - score: match.getScore(nid), + score: match && nid ? match.getScore(nid) : 1, } as any; if (options.fields) { @@ -473,7 +516,7 @@ export class DataStruct { node.fields = fields; } - if (options.highlights) { + if (match && nid && options.highlights) { const highlights = {} as Record; for (const { field, before, end } of options.highlights) { const highlightValues = match.getHighlighters(nid, field); diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts index b9244bee01..2cbca87a5b 100644 --- a/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts @@ -61,7 +61,7 @@ export class IndexedDBIndex implements Index { options: SearchOptions = {} ): Observable>> { return merge(of(1), this.broadcast$).pipe( - throttleTime(500, undefined, { leading: true, trailing: true }), + throttleTime(3000, undefined, { leading: true, trailing: true }), exhaustMapWithTrailing(() => { return from( (async () => { @@ -88,7 +88,7 @@ export class IndexedDBIndex implements Index { options: AggregateOptions = {} ): Observable>> { return merge(of(1), this.broadcast$).pipe( - throttleTime(500, undefined, { leading: true, trailing: true }), + throttleTime(3000, undefined, { leading: true, trailing: true }), exhaustMapWithTrailing(() => { return from( (async () => { @@ -120,7 +120,7 @@ export class IndexedDBIndexWriter implements IndexWriter { return (await this.getAll([id]))[0] ?? null; } - async getAll(ids: string[]): Promise[]> { + async getAll(ids?: string[]): Promise[]> { const trx = await this.data.readonly(); return this.data.getAll(trx, ids); } @@ -138,6 +138,7 @@ export class IndexedDBIndexWriter implements IndexWriter { async commit(): Promise { await this.data.batchWrite(this.trx, this.deletes, this.inserts); + this.trx.commit(); this.channel.postMessage(1); } diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts index 4230e18dfc..135f8ec631 100644 --- a/packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts @@ -202,6 +202,12 @@ export class FullTextInvertedIndex implements InvertedIndex { } > >(); + const avgFieldLength = + ( + await trx + .objectStore('kvMetadata') + .get(`full-text:avg-field-length:${this.fieldKey}`) + )?.value ?? 0; for (const token of queryTokens) { const key = InvertedIndexKey.forString(this.fieldKey, token.term); const objs = await trx @@ -229,12 +235,6 @@ export class FullTextInvertedIndex implements InvertedIndex { }; const termFreq = position.rs.length; const totalCount = objs.length; - const avgFieldLength = - ( - await trx - .objectStore('kvMetadata') - .get(`full-text:avg-field-length:${this.fieldKey}`) - )?.value ?? 0; const fieldLength = position.l; const score = bm25(termFreq, 1, totalCount, fieldLength, avgFieldLength) * diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts index 10dbce0ca2..707779a2ea 100644 --- a/packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts @@ -95,28 +95,6 @@ export class Match { .map(e => e[0]); } - filter(predicate: (id: number) => boolean) { - const newWeight = new Match(); - for (const [id, score] of this.scores) { - if (predicate(id)) { - newWeight.addScore(id, score); - newWeight.copyExtData(this, id); - } - } - return newWeight; - } - - async asyncFilter(predicate: (id: number) => Promise) { - const newWeight = new Match(); - for (const [id, score] of this.scores) { - if (await predicate(id)) { - newWeight.addScore(id, score); - newWeight.copyExtData(this, id); - } - } - return newWeight; - } - private copyExtData(from: Match, id: number) { for (const [field, values] of from.highlighters.get(id) ?? []) { for (const [index, ranges] of values) { diff --git a/packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts b/packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts index 96a77812f7..a22ced413c 100644 --- a/packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts +++ b/packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts @@ -47,16 +47,22 @@ export class DataStruct { } } - getAll(ids: string[]): Document[] { - return ids - .map(id => { - const nid = this.idMap.get(id); - if (nid === undefined) { - return undefined; - } - return Document.from(id, this.records[nid].data); - }) - .filter((v): v is Document => v !== undefined); + getAll(ids?: string[]): Document[] { + if (ids) { + return ids + .map(id => { + const nid = this.idMap.get(id); + if (nid === undefined) { + return undefined; + } + return Document.from(id, this.records[nid].data); + }) + .filter((v): v is Document => v !== undefined); + } else { + return this.records + .filter(record => !record.deleted) + .map(record => Document.from(record.id, record.data)); + } } insert(document: Document) { diff --git a/packages/common/infra/src/sync/indexer/impl/memory/index.ts b/packages/common/infra/src/sync/indexer/impl/memory/index.ts index 2d05557cde..c3bddd9dac 100644 --- a/packages/common/infra/src/sync/indexer/impl/memory/index.ts +++ b/packages/common/infra/src/sync/indexer/impl/memory/index.ts @@ -28,7 +28,7 @@ export class MemoryIndex implements Index { return (await this.getAll([id]))[0] ?? null; } - getAll(ids: string[]): Promise[]> { + getAll(ids?: string[]): Promise[]> { return Promise.resolve(this.data.getAll(ids)); } diff --git a/packages/common/infra/src/sync/indexer/indexer.ts b/packages/common/infra/src/sync/indexer/indexer.ts index cba453022f..843ed36655 100644 --- a/packages/common/infra/src/sync/indexer/indexer.ts +++ b/packages/common/infra/src/sync/indexer/indexer.ts @@ -31,7 +31,7 @@ export interface IndexWriter export interface IndexReader { get(id: string): Promise | null>; - getAll(ids: string[]): Promise[]>; + getAll(ids?: string[]): Promise[]>; has(id: string): Promise; } diff --git a/packages/common/infra/src/sync/job/impl/indexeddb/index.ts b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts index 20f6f8e471..f6215d52d0 100644 --- a/packages/common/infra/src/sync/job/impl/indexeddb/index.ts +++ b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts @@ -50,7 +50,9 @@ export class IndexedDBJobQueue implements JobQueue { async accept(): Promise { await this.ensureInitialized(); const jobs = []; - const trx = this.database.transaction(['jobs'], 'readwrite'); + const trx = this.database.transaction(['jobs'], 'readwrite', { + durability: 'relaxed', + }); // if no priority jobs @@ -148,7 +150,9 @@ export class IndexedDBJobQueue implements JobQueue { async complete(jobs: Job[]): Promise { await this.ensureInitialized(); - const trx = this.database.transaction(['jobs'], 'readwrite'); + const trx = this.database.transaction(['jobs'], 'readwrite', { + durability: 'relaxed', + }); for (const { id } of jobs) { await trx @@ -162,7 +166,9 @@ export class IndexedDBJobQueue implements JobQueue { async return(jobs: Job[], retry: boolean = false): Promise { await this.ensureInitialized(); - const trx = this.database.transaction(['jobs'], 'readwrite'); + const trx = this.database.transaction(['jobs'], 'readwrite', { + durability: 'relaxed', + }); for (const { id } of jobs) { if (retry) { @@ -185,7 +191,9 @@ export class IndexedDBJobQueue implements JobQueue { async clear(): Promise { await this.ensureInitialized(); - const trx = this.database.transaction(['jobs'], 'readwrite'); + const trx = this.database.transaction(['jobs'], 'readwrite', { + durability: 'relaxed', + }); await trx.objectStore('jobs').clear(); } diff --git a/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts index 19a99e4825..1c81110d5c 100644 --- a/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts +++ b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts @@ -139,19 +139,7 @@ export class DocsIndexer extends Entity { return; } - const allIndexedDocs = ( - await this.docIndex.search( - { - type: 'all', - }, - { - pagination: { - limit: Number.MAX_SAFE_INTEGER, - skip: 0, - }, - } - ) - ).nodes.map(n => n.id); + const allIndexedDocs = (await this.docIndex.getAll()).map(d => d.id); workerOutput = await worker.run({ type: 'rootDoc', diff --git a/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts b/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts index 21d03eccd1..49e6982e09 100644 --- a/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts +++ b/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts @@ -20,7 +20,10 @@ import type { WorkerOutput, } from './types'; -let cachedRootDoc: { doc: YDoc; hash: string } | null = null; +const LRU_CACHE_SIZE = 5; + +// lru cache for ydoc instances, last used at the end of the array +const lruCache = [] as { doc: YDoc; hash: string }[]; async function digest(data: Uint8Array) { if ( @@ -35,6 +38,29 @@ async function digest(data: Uint8Array) { return lib0Digest(data); } +async function getOrCreateCachedYDoc(data: Uint8Array) { + try { + const hash = toHexString(await digest(data)); + const cachedIndex = lruCache.findIndex(item => item.hash === hash); + if (cachedIndex !== -1) { + const cached = lruCache.splice(cachedIndex, 1)[0]; + lruCache.push(cached); + return cached.doc; + } else { + const doc = new YDoc(); + if (!isEmptyUpdate(data)) { + applyUpdate(doc, data); + } + lruCache.push({ doc, hash }); + return doc; + } + } finally { + if (lruCache.length > LRU_CACHE_SIZE) { + lruCache.shift(); + } + } +} + async function crawlingDocData({ docBuffer, storageDocId, @@ -45,16 +71,7 @@ async function crawlingDocData({ return {}; } - const rootDocBufferHash = toHexString(await digest(rootDocBuffer)); - - let yRootDoc; - if (cachedRootDoc && cachedRootDoc.hash === rootDocBufferHash) { - yRootDoc = cachedRootDoc.doc; - } else { - yRootDoc = new YDoc(); - applyUpdate(yRootDoc, rootDocBuffer); - cachedRootDoc = { doc: yRootDoc, hash: rootDocBufferHash }; - } + const yRootDoc = await getOrCreateCachedYDoc(rootDocBuffer); let docId = null; for (const [id, subdoc] of yRootDoc.getMap('spaces')) { @@ -83,16 +100,18 @@ async function crawlingDocData({ deletedDoc: [docId], }; } else { - const ydoc = new YDoc(); + if (isEmptyUpdate(docBuffer)) { + return { + deletedDoc: [docId], + }; + } + + const ydoc = await getOrCreateCachedYDoc(docBuffer); let docTitle = ''; let summaryLenNeeded = 1000; let summary = ''; const blockDocuments: Document[] = []; - if (!isEmptyUpdate(docBuffer)) { - applyUpdate(ydoc, docBuffer); - } - const blocks = ydoc.getMap('blocks'); if (blocks.size === 0) { @@ -363,16 +382,14 @@ async function crawlingDocData({ } } -function crawlingRootDocData({ +async function crawlingRootDocData({ allIndexedDocs, rootDocBuffer, reindexAll, }: WorkerInput & { type: 'rootDoc'; -}): WorkerOutput { - const ydoc = new YDoc(); - - applyUpdate(ydoc, rootDocBuffer); +}): Promise { + const ydoc = await getOrCreateCachedYDoc(rootDocBuffer); const docs = ydoc.getMap('meta').get('pages') as | YArray> @@ -422,7 +439,7 @@ globalThis.onmessage = async (event: MessageEvent) => { try { let data; if (input.type === 'rootDoc') { - data = crawlingRootDocData(input); + data = await crawlingRootDocData(input); } else { data = await crawlingDocData(input); }