feat(infra): opti indexer performance (#8557)

EYHN
2024-10-22 06:41:07 +00:00
parent d482e2f82e
commit 6ecdc8db7a
11 changed files with 264 additions and 217 deletions

View File

@@ -454,101 +454,107 @@ describe.each([
     });
   });
-  test('subscribe', async () => {
-    await writeData({
-      '1': {
-        title: 'hello world',
-      },
-    });
-    let value = null as any;
-    index
-      .search$({
-        type: 'match',
-        field: 'title',
-        match: 'hello world',
-      })
-      .pipe(map(v => (value = v)))
-      .subscribe();
-    await vitest.waitFor(
-      () => {
-        expect(value).toEqual({
-          nodes: [
-            {
-              id: '1',
-              score: expect.anything(),
-            },
-          ],
-          pagination: {
-            count: 1,
-            hasMore: false,
-            limit: expect.anything(),
-            skip: 0,
-          },
-        });
-      },
-      {
-        timeout: 5000,
-      }
-    );
-    await writeData({
-      '2': {
-        title: 'hello world',
-      },
-    });
-    await vitest.waitFor(
-      () => {
-        expect(value).toEqual({
-          nodes: [
-            {
-              id: '1',
-              score: expect.anything(),
-            },
-            {
-              id: '2',
-              score: expect.anything(),
-            },
-          ],
-          pagination: {
-            count: 2,
-            hasMore: false,
-            limit: expect.anything(),
-            skip: 0,
-          },
-        });
-      },
-      {
-        timeout: 5000,
-      }
-    );
-    const writer = await index.write();
-    writer.delete('1');
-    await writer.commit();
-    await vitest.waitFor(
-      () => {
-        expect(value).toEqual({
-          nodes: [
-            {
-              id: '2',
-              score: expect.anything(),
-            },
-          ],
-          pagination: {
-            count: 1,
-            hasMore: false,
-            limit: expect.anything(),
-            skip: 0,
-          },
-        });
-      },
-      {
-        timeout: 5000,
-      }
-    );
-  });
+  test(
+    'subscribe',
+    {
+      timeout: 30000,
+    },
+    async () => {
+      await writeData({
+        '1': {
+          title: 'hello world',
+        },
+      });
+      let value = null as any;
+      index
+        .search$({
+          type: 'match',
+          field: 'title',
+          match: 'hello world',
+        })
+        .pipe(map(v => (value = v)))
+        .subscribe();
+      await vitest.waitFor(
+        () => {
+          expect(value).toEqual({
+            nodes: [
+              {
+                id: '1',
+                score: expect.anything(),
+              },
+            ],
+            pagination: {
+              count: 1,
+              hasMore: false,
+              limit: expect.anything(),
+              skip: 0,
+            },
+          });
+        },
+        {
+          timeout: 10000,
+        }
+      );
+      await writeData({
+        '2': {
+          title: 'hello world',
+        },
+      });
+      await vitest.waitFor(
+        () => {
+          expect(value).toEqual({
+            nodes: [
+              {
+                id: '1',
+                score: expect.anything(),
+              },
+              {
+                id: '2',
+                score: expect.anything(),
+              },
+            ],
+            pagination: {
+              count: 2,
+              hasMore: false,
+              limit: expect.anything(),
+              skip: 0,
+            },
+          });
+        },
+        {
+          timeout: 10000,
+        }
+      );
+      const writer = await index.write();
+      writer.delete('1');
+      await writer.commit();
+      await vitest.waitFor(
+        () => {
+          expect(value).toEqual({
+            nodes: [
+              {
+                id: '2',
+                score: expect.anything(),
+              },
+            ],
+            pagination: {
+              count: 1,
+              hasMore: false,
+              limit: expect.anything(),
+              skip: 0,
+            },
+          });
+        },
+        {
+          timeout: 10000,
+        }
+      );
+    }
+  );
 });
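Note: the test moves from test(name, fn) to vitest's test(name, options, fn) overload, so the whole test gets a 30 s budget while each inner vitest.waitFor poll gets 10 s. A minimal sketch of the same pattern (writeData and index above are fixtures local to this spec file):

import { expect, test, vitest } from 'vitest';

test(
  'eventually consistent',
  { timeout: 30000 }, // per-test timeout via the options object
  async () => {
    let value = 0;
    setTimeout(() => (value = 1), 100);
    // waitFor retries the callback until it stops throwing or 10 s elapse.
    await vitest.waitFor(() => expect(value).toBe(1), { timeout: 10000 });
  }
);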

View File

@@ -100,7 +100,7 @@ export class DataStruct {
     }
   }
-  async insert(trx: DataStructRWTransaction, document: Document) {
+  private async insert(trx: DataStructRWTransaction, document: Document) {
     const exists = await trx
       .objectStore('records')
       .index('id')
@@ -138,7 +138,7 @@ export class DataStruct {
     }
   }
-  async delete(trx: DataStructRWTransaction, id: string) {
+  private async delete(trx: DataStructRWTransaction, id: string) {
     const nid = await trx.objectStore('records').index('id').getKey(id);
     if (nid) {
@@ -159,11 +159,30 @@ export class DataStruct {
     deletes: string[],
     inserts: Document[]
   ) {
-    for (const del of deletes) {
-      await this.delete(trx, del);
-    }
-    for (const inst of inserts) {
-      await this.insert(trx, inst);
+    const startTime = performance.now();
+    try {
+      for (const del of deletes) {
+        await this.delete(trx, del);
+      }
+      for (const inst of inserts) {
+        await this.insert(trx, inst);
+      }
+    } finally {
+      const endTime = performance.now();
+      if (BUILD_CONFIG.debug) {
+        performance.measure(
+          `[IndexedDB Indexer] Batch Write (${this.databaseName})`,
+          {
+            start: startTime,
+            end: endTime,
+          }
+        );
+      }
+      logger.debug(
+        `[indexer ${this.databaseName}] batchWrite`,
+        endTime - startTime,
+        'ms'
+      );
     }
   }
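The batch write is wrapped in try/finally so the duration is logged even when a write throws, and performance.measure(name, { start, end }) (the User Timing Level 3 overload) surfaces the span in the DevTools Performance panel when the debug flag is set. A minimal sketch of the same instrumentation pattern, with a hypothetical debug flag standing in for BUILD_CONFIG.debug:

const DEBUG = true; // stand-in for BUILD_CONFIG.debug

async function timed<T>(name: string, fn: () => Promise<T>): Promise<T> {
  const start = performance.now();
  try {
    return await fn();
  } finally {
    const end = performance.now();
    if (DEBUG) {
      // Appears as a named span on the performance timeline.
      performance.measure(name, { start, end });
    }
    console.debug(name, end - start, 'ms');
  }
}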
@@ -214,18 +233,6 @@ export class DataStruct {
     throw new Error(`Query type '${query.type}' not supported`);
   }
-  private async query(
-    trx: DataStructROTransaction,
-    query: Query<any>
-  ): Promise<Match> {
-    const match = await this.queryRaw(trx, query);
-    const filteredMatch = match.asyncFilter(async nid => {
-      const record = await trx.objectStore('records').getKey(nid);
-      return record !== undefined;
-    });
-    return filteredMatch;
-  }
   async clear(trx: DataStructRWTransaction) {
     await trx.objectStore('records').clear();
     await trx.objectStore('invertedIndex').clear();
@@ -244,7 +251,7 @@ export class DataStruct {
         limit: options.pagination?.limit ?? 100,
       };
-      const match = await this.query(trx, query);
+      const match = await this.queryRaw(trx, query);
       const nids = match
         .toArray()
@@ -252,7 +259,11 @@ export class DataStruct {
       const nodes = [];
       for (const nid of nids) {
-        nodes.push(await this.resultNode(trx, match, nid, options));
+        const record = await trx.objectStore('records').get(nid);
+        if (!record) {
+          continue;
+        }
+        nodes.push(this.resultNode(record, options, match, nid));
       }
       return {
@@ -265,9 +276,20 @@ export class DataStruct {
         nodes: nodes,
       };
     } finally {
+      const endTime = performance.now();
+      if (BUILD_CONFIG.debug) {
+        performance.measure(
+          `[IndexedDB Indexer] Search (${this.databaseName})`,
+          {
+            detail: { query, options },
+            start: startTime,
+            end: endTime,
+          }
+        );
+      }
       logger.debug(
         `[indexer ${this.databaseName}] search`,
-        performance.now() - startTime,
+        endTime - startTime,
         'ms',
         query
       );
@@ -297,7 +319,7 @@ export class DataStruct {
         limit: 0,
       };
-      const match = await this.query(trx, query);
+      const match = await this.queryRaw(trx, query);
       const nids = match.toArray();
@@ -308,9 +330,11 @@ export class DataStruct {
       }[] = [];
       for (const nid of nids) {
-        const values = (await trx.objectStore('records').get(nid))?.data.get(
-          field
-        );
+        const record = await trx.objectStore('records').get(nid);
+        if (!record) {
+          continue;
+        }
+        const values = record.data.get(field);
         for (const value of values ?? []) {
           let bucket;
           let bucketIndex = buckets.findIndex(b => b.key === value);
@@ -332,7 +356,7 @@ export class DataStruct {
           bucket.nids.length - 1 < hitPagination.skip + hitPagination.limit
         ) {
           bucket.hits.push(
-            await this.resultNode(trx, match, nid, options.hits ?? {})
+            this.resultNode(record, options.hits ?? {}, match, nid)
           );
         }
       }
@@ -373,9 +397,20 @@ export class DataStruct {
         },
       };
     } finally {
+      const endTime = performance.now();
+      if (BUILD_CONFIG.debug) {
+        performance.measure(
+          `[IndexedDB Indexer] Aggregate (${this.databaseName})`,
+          {
+            detail: { query, field, options },
+            start: startTime,
+            end: endTime,
+          }
+        );
+      }
       logger.debug(
         `[indexer ${this.databaseName}] aggregate`,
-        performance.now() - startTime,
+        endTime - startTime,
         'ms'
       );
     }
@@ -383,12 +418,19 @@ export class DataStruct {
   async getAll(
     trx: DataStructROTransaction,
-    ids: string[]
+    ids?: string[]
   ): Promise<Document[]> {
     const docs = [];
-    for (const id of ids) {
-      const record = await trx.objectStore('records').index('id').get(id);
-      if (record) {
+    if (ids) {
+      for (const id of ids) {
+        const record = await trx.objectStore('records').index('id').get(id);
+        if (record) {
+          docs.push(Document.from(record.id, record.data));
+        }
+      }
+    } else {
+      const records = await trx.objectStore('records').getAll();
+      for (const record of records) {
         docs.push(Document.from(record.id, record.data));
       }
     }
@@ -405,7 +447,10 @@ export class DataStruct {
     await this.ensureInitialized();
     return this.database.transaction(
       ['records', 'invertedIndex', 'kvMetadata'],
-      'readonly'
+      'readonly',
+      {
+        durability: 'relaxed',
+      }
     );
   }
@@ -413,7 +458,10 @@ export class DataStruct {
     await this.ensureInitialized();
     return this.database.transaction(
       ['records', 'invertedIndex', 'kvMetadata'],
-      'readwrite'
+      'readwrite',
+      {
+        durability: 'relaxed',
+      }
     );
   }
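Both transaction factories now pass durability: 'relaxed', an IDBDatabase.transaction() option that lets the browser report success before data reaches disk ('strict' and 'default' are the other accepted values). For an index that can always be rebuilt from source documents, trading crash durability for lower commit latency is a sensible default. The call shape, assuming an open IDBDatabase handle named db:

const trx = db.transaction(['records'], 'readwrite', {
  durability: 'relaxed', // acknowledge before the OS-level flush
});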
@@ -446,20 +494,15 @@ export class DataStruct {
     });
   }
-  private async resultNode(
-    trx: DataStructROTransaction,
-    match: Match,
-    nid: number,
-    options: SearchOptions<any>
-  ): Promise<SearchResult<any, any>['nodes'][number]> {
-    const record = await trx.objectStore('records').get(nid);
-    if (!record) {
-      throw new Error(`Record not found for nid ${nid}`);
-    }
+  private resultNode(
+    record: { id: string; data: Map<string, string[]> },
+    options: SearchOptions<any>,
+    match?: Match,
+    nid?: number
+  ): SearchResult<any, any>['nodes'][number] {
     const node = {
       id: record.id,
-      score: match.getScore(nid),
+      score: match && nid ? match.getScore(nid) : 1,
     } as any;
     if (options.fields) {
@@ -473,7 +516,7 @@ export class DataStruct {
       node.fields = fields;
     }
-    if (options.highlights) {
+    if (match && nid && options.highlights) {
       const highlights = {} as Record<string, string[]>;
       for (const { field, before, end } of options.highlights) {
         const highlightValues = match.getHighlighters(nid, field);

View File

@@ -61,7 +61,7 @@ export class IndexedDBIndex<S extends Schema> implements Index<S> {
     options: SearchOptions<any> = {}
   ): Observable<SearchResult<any, SearchOptions<any>>> {
     return merge(of(1), this.broadcast$).pipe(
-      throttleTime(500, undefined, { leading: true, trailing: true }),
+      throttleTime(3000, undefined, { leading: true, trailing: true }),
       exhaustMapWithTrailing(() => {
         return from(
           (async () => {
@@ -88,7 +88,7 @@ export class IndexedDBIndex<S extends Schema> implements Index<S> {
     options: AggregateOptions<any> = {}
   ): Observable<AggregateResult<S, AggregateOptions<any>>> {
     return merge(of(1), this.broadcast$).pipe(
-      throttleTime(500, undefined, { leading: true, trailing: true }),
+      throttleTime(3000, undefined, { leading: true, trailing: true }),
       exhaustMapWithTrailing(() => {
         return from(
           (async () => {
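Raising the throttle window from 500 ms to 3000 ms means a burst of index-update broadcasts re-runs the live query at most once every three seconds; { leading: true, trailing: true } keeps the first result immediate and still emits once more after the burst settles. A small RxJS sketch of that emission pattern:

import { interval } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

// Emits the first value immediately (leading), then at most one value
// per 3 s window, plus the last value of each window (trailing).
interval(500)
  .pipe(throttleTime(3000, undefined, { leading: true, trailing: true }))
  .subscribe(v => console.log(v));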
@@ -120,7 +120,7 @@ export class IndexedDBIndexWriter<S extends Schema> implements IndexWriter<S> {
     return (await this.getAll([id]))[0] ?? null;
   }
-  async getAll(ids: string[]): Promise<Document<S>[]> {
+  async getAll(ids?: string[]): Promise<Document<S>[]> {
     const trx = await this.data.readonly();
     return this.data.getAll(trx, ids);
   }
@@ -138,6 +138,7 @@ export class IndexedDBIndexWriter<S extends Schema> implements IndexWriter<S> {
   async commit(): Promise<void> {
     await this.data.batchWrite(this.trx, this.deletes, this.inserts);
+    this.trx.commit();
     this.channel.postMessage(1);
   }
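The added this.trx.commit() uses IDBTransaction.commit() to request an immediate commit instead of waiting for the transaction to auto-commit when control returns to the event loop, trimming latency from the write path before subscribers are notified. A sketch, assuming a 'records' store with an inline 'id' key:

const trx = db.transaction(['records'], 'readwrite', { durability: 'relaxed' });
trx.objectStore('records').put({ id: '1', title: 'hello' });
trx.commit(); // don't wait for auto-commit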

View File

@@ -202,6 +202,12 @@ export class FullTextInvertedIndex implements InvertedIndex {
         }
       >
     >();
+    const avgFieldLength =
+      (
+        await trx
+          .objectStore('kvMetadata')
+          .get(`full-text:avg-field-length:${this.fieldKey}`)
+      )?.value ?? 0;
     for (const token of queryTokens) {
       const key = InvertedIndexKey.forString(this.fieldKey, token.term);
       const objs = await trx
@@ -229,12 +235,6 @@ export class FullTextInvertedIndex implements InvertedIndex {
         };
         const termFreq = position.rs.length;
         const totalCount = objs.length;
-        const avgFieldLength =
-          (
-            await trx
-              .objectStore('kvMetadata')
-              .get(`full-text:avg-field-length:${this.fieldKey}`)
-          )?.value ?? 0;
         const fieldLength = position.l;
         const score =
           bm25(termFreq, 1, totalCount, fieldLength, avgFieldLength) *
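Moving the avg-field-length lookup above the token loop hoists a loop-invariant IndexedDB read: it is now fetched once per query instead of once per matched posting. For reference, the hoisted value feeds BM25's term-frequency normalization; a textbook sketch of that factor (the repo's own bm25() takes (termFreq, k, totalCount, fieldLength, avgFieldLength) per the call above, and its exact weighting and IDF handling may differ):

// Term-frequency component of BM25; k1 and b are the usual constants.
function bm25Tf(
  termFreq: number,
  fieldLength: number,
  avgFieldLength: number,
  k1 = 1.2,
  b = 0.75
): number {
  return (
    (termFreq * (k1 + 1)) /
    (termFreq + k1 * (1 - b + (b * fieldLength) / avgFieldLength))
  );
}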

View File

@@ -95,28 +95,6 @@ export class Match {
       .map(e => e[0]);
   }
-  filter(predicate: (id: number) => boolean) {
-    const newWeight = new Match();
-    for (const [id, score] of this.scores) {
-      if (predicate(id)) {
-        newWeight.addScore(id, score);
-        newWeight.copyExtData(this, id);
-      }
-    }
-    return newWeight;
-  }
-  async asyncFilter(predicate: (id: number) => Promise<boolean>) {
-    const newWeight = new Match();
-    for (const [id, score] of this.scores) {
-      if (await predicate(id)) {
-        newWeight.addScore(id, score);
-        newWeight.copyExtData(this, id);
-      }
-    }
-    return newWeight;
-  }
   private copyExtData(from: Match, id: number) {
     for (const [field, values] of from.highlighters.get(id) ?? []) {
       for (const [index, ranges] of values) {

View File

@@ -47,16 +47,22 @@ export class DataStruct {
     }
   }
-  getAll(ids: string[]): Document[] {
-    return ids
-      .map(id => {
-        const nid = this.idMap.get(id);
-        if (nid === undefined) {
-          return undefined;
-        }
-        return Document.from(id, this.records[nid].data);
-      })
-      .filter((v): v is Document => v !== undefined);
+  getAll(ids?: string[]): Document[] {
+    if (ids) {
+      return ids
+        .map(id => {
+          const nid = this.idMap.get(id);
+          if (nid === undefined) {
+            return undefined;
+          }
+          return Document.from(id, this.records[nid].data);
+        })
+        .filter((v): v is Document => v !== undefined);
+    } else {
+      return this.records
+        .filter(record => !record.deleted)
+        .map(record => Document.from(record.id, record.data));
+    }
   }
   insert(document: Document) {

View File

@@ -28,7 +28,7 @@ export class MemoryIndex<S extends Schema> implements Index<S> {
     return (await this.getAll([id]))[0] ?? null;
   }
-  getAll(ids: string[]): Promise<Document<S>[]> {
+  getAll(ids?: string[]): Promise<Document<S>[]> {
     return Promise.resolve(this.data.getAll(ids));
   }

View File

@@ -31,7 +31,7 @@ export interface IndexWriter<S extends Schema>
 export interface IndexReader<S extends Schema> {
   get(id: string): Promise<Document<S> | null>;
-  getAll(ids: string[]): Promise<Document<S>[]>;
+  getAll(ids?: string[]): Promise<Document<S>[]>;
   has(id: string): Promise<boolean>;
 }

View File

@@ -50,7 +50,9 @@ export class IndexedDBJobQueue<J> implements JobQueue<J> {
   async accept(): Promise<Job[] | null> {
     await this.ensureInitialized();
     const jobs = [];
-    const trx = this.database.transaction(['jobs'], 'readwrite');
+    const trx = this.database.transaction(['jobs'], 'readwrite', {
+      durability: 'relaxed',
+    });
     // if no priority jobs
@@ -148,7 +150,9 @@ export class IndexedDBJobQueue<J> implements JobQueue<J> {
   async complete(jobs: Job[]): Promise<void> {
     await this.ensureInitialized();
-    const trx = this.database.transaction(['jobs'], 'readwrite');
+    const trx = this.database.transaction(['jobs'], 'readwrite', {
+      durability: 'relaxed',
+    });
     for (const { id } of jobs) {
       await trx
async return(jobs: Job[], retry: boolean = false): Promise<void> { async return(jobs: Job[], retry: boolean = false): Promise<void> {
await this.ensureInitialized(); await this.ensureInitialized();
const trx = this.database.transaction(['jobs'], 'readwrite'); const trx = this.database.transaction(['jobs'], 'readwrite', {
durability: 'relaxed',
});
for (const { id } of jobs) { for (const { id } of jobs) {
if (retry) { if (retry) {
@@ -185,7 +191,9 @@ export class IndexedDBJobQueue<J> implements JobQueue<J> {
async clear(): Promise<void> { async clear(): Promise<void> {
await this.ensureInitialized(); await this.ensureInitialized();
const trx = this.database.transaction(['jobs'], 'readwrite'); const trx = this.database.transaction(['jobs'], 'readwrite', {
durability: 'relaxed',
});
await trx.objectStore('jobs').clear(); await trx.objectStore('jobs').clear();
} }

View File

@@ -139,19 +139,7 @@ export class DocsIndexer extends Entity {
       return;
     }
-    const allIndexedDocs = (
-      await this.docIndex.search(
-        {
-          type: 'all',
-        },
-        {
-          pagination: {
-            limit: Number.MAX_SAFE_INTEGER,
-            skip: 0,
-          },
-        }
-      )
-    ).nodes.map(n => n.id);
+    const allIndexedDocs = (await this.docIndex.getAll()).map(d => d.id);
     workerOutput = await worker.run({
       type: 'rootDoc',

View File

@@ -20,7 +20,10 @@ import type {
   WorkerOutput,
 } from './types';
-let cachedRootDoc: { doc: YDoc; hash: string } | null = null;
+const LRU_CACHE_SIZE = 5;
+// lru cache for ydoc instances, last used at the end of the array
+const lruCache = [] as { doc: YDoc; hash: string }[];
 async function digest(data: Uint8Array) {
   if (
@@ -35,6 +38,29 @@ async function digest(data: Uint8Array) {
   return lib0Digest(data);
 }
+async function getOrCreateCachedYDoc(data: Uint8Array) {
+  try {
+    const hash = toHexString(await digest(data));
+    const cachedIndex = lruCache.findIndex(item => item.hash === hash);
+    if (cachedIndex !== -1) {
+      const cached = lruCache.splice(cachedIndex, 1)[0];
+      lruCache.push(cached);
+      return cached.doc;
+    } else {
+      const doc = new YDoc();
+      if (!isEmptyUpdate(data)) {
+        applyUpdate(doc, data);
+      }
+      lruCache.push({ doc, hash });
+      return doc;
+    }
+  } finally {
+    if (lruCache.length > LRU_CACHE_SIZE) {
+      lruCache.shift();
+    }
+  }
+}
 async function crawlingDocData({
   docBuffer,
   storageDocId,
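The single cachedRootDoc slot becomes a five-entry LRU keyed by update hash, so the root doc and recently crawled subdocs all skip the expensive applyUpdate parse on repeat visits; most-recently-used entries sit at the array tail, and the finally block caps the size even if hashing throws. The access pattern in miniature, as a hypothetical string cache:

const CACHE_SIZE = 2;
const cache: { key: string; value: string }[] = [];

function getOrInsert(key: string, make: () => string): string {
  const hit = cache.findIndex(e => e.key === key);
  if (hit !== -1) {
    const [entry] = cache.splice(hit, 1);
    cache.push(entry); // refresh: move hit to the MRU tail
    return entry.value;
  }
  const entry = { key, value: make() };
  cache.push(entry);
  if (cache.length > CACHE_SIZE) cache.shift(); // evict the LRU head
  return entry.value;
}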
@@ -45,16 +71,7 @@ async function crawlingDocData({
     return {};
   }
-  const rootDocBufferHash = toHexString(await digest(rootDocBuffer));
-  let yRootDoc;
-  if (cachedRootDoc && cachedRootDoc.hash === rootDocBufferHash) {
-    yRootDoc = cachedRootDoc.doc;
-  } else {
-    yRootDoc = new YDoc();
-    applyUpdate(yRootDoc, rootDocBuffer);
-    cachedRootDoc = { doc: yRootDoc, hash: rootDocBufferHash };
-  }
+  const yRootDoc = await getOrCreateCachedYDoc(rootDocBuffer);
   let docId = null;
   for (const [id, subdoc] of yRootDoc.getMap('spaces')) {
@@ -83,16 +100,18 @@ async function crawlingDocData({
         deletedDoc: [docId],
       };
     } else {
-      const ydoc = new YDoc();
+      if (isEmptyUpdate(docBuffer)) {
+        return {
+          deletedDoc: [docId],
+        };
+      }
+      const ydoc = await getOrCreateCachedYDoc(docBuffer);
       let docTitle = '';
       let summaryLenNeeded = 1000;
       let summary = '';
       const blockDocuments: Document<BlockIndexSchema>[] = [];
-      if (!isEmptyUpdate(docBuffer)) {
-        applyUpdate(ydoc, docBuffer);
-      }
       const blocks = ydoc.getMap<any>('blocks');
       if (blocks.size === 0) {
@@ -363,16 +382,14 @@ async function crawlingDocData({
   }
 }
-function crawlingRootDocData({
+async function crawlingRootDocData({
   allIndexedDocs,
   rootDocBuffer,
   reindexAll,
 }: WorkerInput & {
   type: 'rootDoc';
-}): WorkerOutput {
-  const ydoc = new YDoc();
-  applyUpdate(ydoc, rootDocBuffer);
+}): Promise<WorkerOutput> {
+  const ydoc = await getOrCreateCachedYDoc(rootDocBuffer);
   const docs = ydoc.getMap('meta').get('pages') as
     | YArray<YMap<any>>
@@ -422,7 +439,7 @@ globalThis.onmessage = async (event: MessageEvent<WorkerIngoingMessage>) => {
   try {
     let data;
     if (input.type === 'rootDoc') {
-      data = crawlingRootDocData(input);
+      data = await crawlingRootDocData(input);
     } else {
       data = await crawlingDocData(input);
     }