From 90e4a9b181ebb5bde5724376aecd541459ed035c Mon Sep 17 00:00:00 2001
From: EYHN
Date: Tue, 2 Jul 2024 08:52:33 +0000
Subject: [PATCH] feat: doc search infra (#7166)

detail in `packages/common/infra/src/sync/indexer/README.md`
---
 packages/common/infra/package.json                 |   4 +
 .../common/infra/src/sync/indexer/README.md        | 147 +++++
 .../sync/indexer/__tests__/black-box.spec.ts       | 554 ++++++++++++++++++
 .../common/infra/src/sync/indexer/document.ts      |  48 ++
 .../infra/src/sync/indexer/field-type.ts           |   1 +
 .../impl/indexeddb/__tests__/bm25.spec.ts          |   9 +
 .../indexeddb/__tests__/highlighter.spec.ts        |  32 +
 .../indexeddb/__tests__/tokenizer.spec.ts          | 128 ++++
 .../src/sync/indexer/impl/indexeddb/bm25.ts        |  62 ++
 .../indexer/impl/indexeddb/data-struct.ts          | 465 +++++++++++++++
 .../indexer/impl/indexeddb/highlighter.ts          |  77 +++
 .../src/sync/indexer/impl/indexeddb/index.ts       | 171 ++++++
 .../indexer/impl/indexeddb/inverted-index.ts       | 429 ++++++++++++++
 .../src/sync/indexer/impl/indexeddb/match.ts       | 127 ++++
 .../sync/indexer/impl/indexeddb/tokenizer.ts       | 162 +++++
 .../sync/indexer/impl/memory/data-struct.ts        | 282 +++++++++
 .../src/sync/indexer/impl/memory/index.ts          | 141 +++++
 .../indexer/impl/memory/inverted-index.ts          | 220 +++++++
 .../src/sync/indexer/impl/memory/match.ts          | 108 ++++
 .../common/infra/src/sync/indexer/index.ts         |   6 +
 .../common/infra/src/sync/indexer/indexer.ts       |  38 ++
 .../common/infra/src/sync/indexer/query.ts         |  35 ++
 .../common/infra/src/sync/indexer/schema.ts        |   7 +
 .../common/infra/src/sync/indexer/searcher.ts      |  83 +++
 .../src/utils/exhaustmap-with-trailing.ts          |  41 ++
 yarn.lock                                          |  11 +
 26 files changed, 3388 insertions(+)
 create mode 100644 packages/common/infra/src/sync/indexer/README.md
 create mode 100644 packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts
 create mode 100644 packages/common/infra/src/sync/indexer/document.ts
 create mode 100644 packages/common/infra/src/sync/indexer/field-type.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/bm25.spec.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/highlighter.spec.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/tokenizer.spec.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/bm25.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/highlighter.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/indexeddb/tokenizer.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/memory/index.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/memory/inverted-index.ts
 create mode 100644 packages/common/infra/src/sync/indexer/impl/memory/match.ts
 create mode 100644 packages/common/infra/src/sync/indexer/index.ts
 create mode 100644 packages/common/infra/src/sync/indexer/indexer.ts
 create mode 100644 packages/common/infra/src/sync/indexer/query.ts
 create mode 100644 packages/common/infra/src/sync/indexer/schema.ts
 create mode 100644 packages/common/infra/src/sync/indexer/searcher.ts
 create mode 100644 packages/common/infra/src/utils/exhaustmap-with-trailing.ts

diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json
index 15ebb8affc..de0d7c120b 100644
--- a/packages/common/infra/package.json
+++ b/packages/common/infra/package.json
@@ -18,6 +18,9 @@
     "@blocksuite/store": "0.15.0-canary-202407011031-17e7b65",
     "@datastructures-js/binary-search-tree": "^5.3.2",
     "foxact": "^0.2.33",
+    "fuse.js": "^7.0.0",
+    "graphemer": "^1.4.0",
+    "idb": "^8.0.0",
     "jotai": "^2.8.0",
     "jotai-effect": "^1.0.0",
     "lodash-es": "^4.17.21",
@@ -33,6 +36,7 @@
     "@blocksuite/presets": "0.15.0-canary-202407011031-17e7b65",
     "@testing-library/react": "^16.0.0",
     "async-call-rpc": "^6.4.0",
+    "fake-indexeddb": "^6.0.0",
     "react": "^18.2.0",
     "rxjs": "^7.8.1",
     "vite": "^5.2.8",

diff --git a/packages/common/infra/src/sync/indexer/README.md b/packages/common/infra/src/sync/indexer/README.md
new file mode 100644
index 0000000000..dc258fac7b
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/README.md
@@ -0,0 +1,147 @@
+# indexer
+
+Search engine abstraction layer for AFFiNE.
+
+## Usage
+
+1. Define schema
+
+First, we need to define the shape of the data. Currently, the following data types are available:
+
+- `Integer`
+- `Boolean`
+- `FullText`: for full-text search; values are tokenized and stemmed.
+- `String`: for exact-match search, e.g. tags, ids.
+
+```typescript
+const schema = defineSchema({
+  title: 'FullText',
+  tag: 'String',
+  size: 'Integer',
+});
+```
+
+> **Array type**
+> All types can contain one or more values, so each field can store an array.
+>
+> This design conforms to the usual conventions of search engine APIs, such as in Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html
+
+2. Pick a backend
+
+Currently, there are two backends available.
+
+- `MemoryIndex`: in-memory indexer, useful for testing.
+- `IndexedDBIndex`: persistent indexer using IndexedDB.
+
+> **Underlying Data Table**
+> Some backends need to maintain underlying data tables, including table creation and migration. This is done silently the first time the indexer is invoked; callers do not need to worry about these details.
+
+3. Write data
+
+To write data to the indexer, first start a write transaction with `await index.write()`, then complete the batch write with `await writer.commit()`.
+
+> **Transactional**
+> Typically, the indexer does not provide transactional guarantees; reliable locking logic needs to be implemented at a higher level.
+
+```typescript
+const index = new IndexedDBIndex(schema);
+
+const writer = await index.write();
+writer.insert(
+  Document.from('id', {
+    title: 'hello world',
+    tag: ['doc', 'page'],
+    size: '100',
+  })
+);
+await writer.commit();
+```
+
+4. Search data
+
+To search for content in the indexer, you use a specific **query language**. Here are some examples:
+
+```typescript
+// match title == 'hello world'
+{
+  type: 'match',
+  field: 'title',
+  match: 'hello world',
+}
+
+// match title == 'hello world' && tag == 'doc'
+{
+  type: 'boolean',
+  occur: 'must',
+  queries: [
+    {
+      type: 'match',
+      field: 'title',
+      match: 'hello world',
+    },
+    {
+      type: 'match',
+      field: 'tag',
+      match: 'doc',
+    },
+  ],
+}
+```
+
+There are two ways to perform a search: `index.search()` and `index.aggregate()`.
+
+- **search**: returns each matched node and pagination information.
+- **aggregate**: groups all matched results into buckets by a given field, and returns the count and score of the items in each bucket.
+
+Examples:
+
+```typescript
+const result = await index.search({
+  type: 'match',
+  field: 'title',
+  match: 'hello world',
+});
+// result = {
+//   nodes: [
+//     {
+//       id: '1',
+//       score: 1,
+//     },
+//   ],
+//   pagination: {
+//     count: 1,
+//     hasMore: false,
+//     limit: 10,
+//     skip: 0,
+//   },
+// }
+```
+
+```typescript
+const result = await index.aggregate(
+  {
+    type: 'match',
+    field: 'title',
+    match: 'affine',
+  },
+  'tag'
+);
+// result = {
+//   buckets: [
+//     { key: 'motorcycle', count: 2, score: 1 },
+//     { key: 'bike', count: 1, score: 1 },
+//     { key: 'airplane', count: 1, score: 1 },
+//   ],
+//   pagination: {
+//     count: 3,
+//     hasMore: false,
+//     limit: 10,
+//     skip: 0,
+//   },
+// }
+```
+
+More usage examples:
+
+[black-box.spec.ts](./__tests__/black-box.spec.ts)

diff --git a/packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts b/packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts
new file mode 100644
index 0000000000..110f0d286c
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts
@@ -0,0 +1,554 @@
+/**
+ * @vitest-environment happy-dom
+ */
+import 'fake-indexeddb/auto';
+
+import { map } from 'rxjs';
+import { beforeEach, describe, expect, test, vitest } from 'vitest';
+
+import { defineSchema, Document, type Index } from '..';
+import { IndexedDBIndex } from '../impl/indexeddb';
+import { MemoryIndex } from '../impl/memory';
+
+const schema = defineSchema({
+  title: 'FullText',
+  tag: 'String',
+  size: 'Integer',
+});
+
+let index: Index<typeof schema> = null!;
+
+describe.each([
+  { name: 'memory', backend: MemoryIndex },
+  { name: 'idb', backend: IndexedDBIndex },
+])('index tests($name)', ({ backend }) => {
+  async function writeData(
+    data: Record<
+      string,
+      Partial<Record<keyof typeof schema, string | string[]>>
+    >
+  ) {
+    const writer = await index.write();
+    for (const [id, item] of Object.entries(data)) {
+      const doc = new Document(id);
+      for (const [key, value] of Object.entries(item)) {
+        if (Array.isArray(value)) {
+          for (const v of value) {
+            doc.insert(key, v);
+          }
+        } else {
+          doc.insert(key, value);
+        }
+      }
+      writer.insert(doc);
+    }
+    await writer.commit();
+  }
+
+  beforeEach(async () => {
+    index = new backend(schema);
+    await index.clear();
+  });
+
+  test('basic', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+      },
+    });
+
+    const result = await index.search({
+      type: 'match',
+      field: 'title',
+      match: 'hello world',
+    });
+
+    expect(result).toEqual({
+      nodes: [
+        {
+          id: '1',
+          score: expect.anything(),
+        },
+      ],
+      pagination: {
+        count: 1,
+        hasMore: false,
+        limit: expect.anything(),
+        skip: 0,
+      },
+    });
+  });
+
+  test('basic integer', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+        size: '100',
+      },
+    });
+
+    const result = await index.search({
+      type: 'match',
+      field: 'size',
+      match: '100',
+    });
+
+    expect(result).toEqual({
+      nodes: [
+        {
+          id: '1',
+          score: expect.anything(),
+        },
+      ],
+      pagination: {
+        count: 1,
+        hasMore: false,
+        limit: expect.anything(),
+        skip: 0,
+      },
+    });
+  });
+
+  test('fuzz', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+      },
+    });
+    const result = await index.search({
+      type: 'match',
+      field: 'title',
+      match: 'hell',
+    });
+
+    expect(result).toEqual({
+      nodes: [
+        {
+          id: '1',
+          score: expect.anything(),
+        },
+      ],
+      pagination: {
+        count: 1,
+        hasMore: false,
+        limit: expect.anything(),
+        skip: 0,
+      },
+    });
+  });
+
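+  // A minimal sketch, added for illustration and not part of the original
+  // suite: it exercises the `boolean` query type described in the README.
+  // `occur: 'must'` intersects the matches of its sub-queries, so only
+  // document '1', which matches both the title and the tag, is returned.
+  test('boolean must (sketch)', async () => {
+    await writeData({
+      '1': { title: 'hello world', tag: ['doc'] },
+      '2': { title: 'hello world', tag: ['page'] },
+    });
+
+    const result = await index.search({
+      type: 'boolean',
+      occur: 'must',
+      queries: [
+        { type: 'match', field: 'title', match: 'hello world' },
+        { type: 'match', field: 'tag', match: 'doc' },
+      ],
+    });
+
+    expect(result.nodes).toEqual([{ id: '1', score: expect.anything() }]);
+  });
+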
+  test('highlight', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+        size: '100',
+      },
+    });
+
+    const result = await index.search(
+      {
+        type: 'match',
+        field: 'title',
+        match: 'hello',
+      },
+      {
+        highlights: [
+          {
+            field: 'title',
+            before: '<b>',
+            end: '</b>',
+          },
+        ],
+      }
+    );
+
+    expect(result).toEqual({
+      nodes: expect.arrayContaining([
+        {
+          id: '1',
+          score: expect.anything(),
+          highlights: {
+            title: [expect.stringContaining('hello')],
+          },
+        },
+      ]),
+      pagination: {
+        count: 1,
+        hasMore: false,
+        limit: expect.anything(),
+        skip: 0,
+      },
+    });
+  });
+
+  test('fields', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+        tag: ['car', 'bike'],
+      },
+    });
+
+    const result = await index.search(
+      {
+        type: 'match',
+        field: 'title',
+        match: 'hello',
+      },
+      {
+        fields: ['title', 'tag'],
+      }
+    );
+
+    expect(result.nodes[0].fields).toEqual({
+      title: 'hello world',
+      tag: expect.arrayContaining(['bike', 'car']),
+    });
+  });
+
+  test('pagination', async () => {
+    await writeData(
+      Array.from({ length: 100 }).reduce((acc: any, _, i) => {
+        acc['apple' + i] = {
+          tag: ['apple'],
+        };
+        return acc;
+      }, {}) as any
+    );
+
+    const result = await index.search(
+      {
+        type: 'match',
+        field: 'tag',
+        match: 'apple',
+      },
+      {
+        pagination: {
+          skip: 0,
+          limit: 10,
+        },
+      }
+    );
+
+    expect(result).toEqual({
+      nodes: expect.arrayContaining(
+        Array.from({ length: 10 }).fill({
+          id: expect.stringContaining('apple'),
+          score: expect.anything(),
+        })
+      ),
+      pagination: {
+        count: 100,
+        hasMore: true,
+        limit: 10,
+        skip: 0,
+      },
+    });
+
+    const result2 = await index.search(
+      {
+        type: 'match',
+        field: 'tag',
+        match: 'apple',
+      },
+      {
+        pagination: {
+          skip: 10,
+          limit: 10,
+        },
+      }
+    );
+
+    expect(result2).toEqual({
+      nodes: expect.arrayContaining(
+        Array.from({ length: 10 }).fill({
+          id: expect.stringContaining('apple'),
+          score: expect.anything(),
+        })
+      ),
+      pagination: {
+        count: 100,
+        hasMore: true,
+        limit: 10,
+        skip: 10,
+      },
+    });
+  });
+
+  test('aggr', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+        tag: ['car', 'bike'],
+      },
+      affine1: {
+        title: 'affine',
+        tag: ['motorcycle', 'bike'],
+      },
+      affine2: {
+        title: 'affine',
+        tag: ['motorcycle', 'airplane'],
+      },
+    });
+
+    const result = await index.aggregate(
+      {
+        type: 'match',
+        field: 'title',
+        match: 'affine',
+      },
+      'tag'
+    );
+
+    expect(result).toEqual({
+      buckets: expect.arrayContaining([
+        { key: 'motorcycle', count: 2, score: expect.anything() },
+        { key: 'bike', count: 1, score: expect.anything() },
+        { key: 'airplane', count: 1, score: expect.anything() },
+      ]),
+      pagination: {
+        count: 3,
+        hasMore: false,
+        limit: expect.anything(),
+        skip: 0,
+      },
+    });
+  });
+
+  test('hits', async () => {
+    await writeData(
+      Array.from({ length: 100 }).reduce((acc: any, _, i) => {
+        acc['apple' + i] = {
+          title: 'apple',
+          tag: ['apple', 'fruit'],
+        };
+        return acc;
+      }, {}) as any
+    );
+    const result = await index.aggregate(
+      {
+        type: 'match',
+        field: 'title',
+        match: 'apple',
+      },
+      'tag',
+      {
+        hits: {
+          pagination: {
+            skip: 0,
+            limit: 5,
+          },
+          highlights: [
+            {
+              field: 'title',
+              before: '<b>',
+              end: '</b>',
+            },
+          ],
+          fields: ['title', 'tag'],
+        },
+      }
+    );
+
+    expect(result).toEqual({
+      buckets: expect.arrayContaining([
+        {
+          key: 'apple',
+          count: 100,
+          score: expect.anything(),
+          hits: {
+            pagination: {
+              count: 100,
+              hasMore: true,
+              limit: 5,
+              skip: 0,
+            },
+            nodes: expect.arrayContaining(
+              Array.from({ length: 5 }).fill({
+                id: expect.stringContaining('apple'),
+                score: expect.anything(),
+                highlights: {
+                  title: [expect.stringContaining('apple')],
+                },
+                fields: {
+                  title: expect.stringContaining('apple'),
+                  tag: expect.arrayContaining(['apple', 'fruit']),
+                },
+              })
+            ),
+          },
+        },
+        {
+          key: 'fruit',
+          count: 100,
+          score: expect.anything(),
+          hits: {
+            pagination: {
+              count: 100,
+              hasMore: true,
+              limit: 5,
+              skip: 0,
+            },
+            nodes: expect.arrayContaining(
+              Array.from({ length: 5 }).fill({
+                id: expect.stringContaining('apple'),
+                score: expect.anything(),
+                highlights: {
+                  title: [expect.stringContaining('apple')],
+                },
+                fields: {
+                  title: expect.stringContaining('apple'),
+                  tag: expect.arrayContaining(['apple', 'fruit']),
+                },
+              })
+            ),
+          },
+        },
+      ]),
+      pagination: {
+        count: 2,
+        hasMore: false,
+        limit: expect.anything(),
+        skip: 0,
+      },
+    });
+  });
+
+  test('exists', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+        tag: '111',
+      },
+      '2': {
+        tag: '222',
+      },
+      '3': {
+        title: 'hello world',
+        tag: '333',
+      },
+    });
+
+    const result = await index.search({
+      type: 'exists',
+      field: 'title',
+    });
+
+    expect(result).toEqual({
+      nodes: expect.arrayContaining([
+        {
+          id: '1',
+          score: expect.anything(),
+        },
+        {
+          id: '3',
+          score: expect.anything(),
+        },
+      ]),
+      pagination: {
+        count: 2,
+        hasMore: false,
+        limit: expect.anything(),
+        skip: 0,
+      },
+    });
+  });
+
+  test('subscribe', async () => {
+    await writeData({
+      '1': {
+        title: 'hello world',
+      },
+    });
+
+    let value = null as any;
+    index
+      .search$({
+        type: 'match',
+        field: 'title',
+        match: 'hello world',
+      })
+      .pipe(map(v => (value = v)))
+      .subscribe();
+
+    await vitest.waitFor(
+      () => {
+        expect(value).toEqual({
+          nodes: [
+            {
+              id: '1',
+              score: expect.anything(),
+            },
+          ],
+          pagination: {
+            count: 1,
+            hasMore: false,
+            limit: expect.anything(),
+            skip: 0,
+          },
+        });
+      },
+      {
+        timeout: 5000,
+      }
+    );
+
+    await writeData({
+      '2': {
+        title: 'hello world',
+      },
+    });
+
+    await vitest.waitFor(
+      () => {
+        expect(value).toEqual({
+          nodes: [
+            {
+              id: '1',
+              score: expect.anything(),
+            },
+            {
+              id: '2',
+              score: expect.anything(),
+            },
+          ],
+          pagination: {
+            count: 2,
+            hasMore: false,
+            limit: expect.anything(),
+            skip: 0,
+          },
+        });
+      },
+      {
+        timeout: 5000,
+      }
+    );
+
+    const writer = await index.write();
+    writer.delete('1');
+    await writer.commit();
+
+    await vitest.waitFor(
+      () => {
+        expect(value).toEqual({
+          nodes: [
+            {
+              id: '2',
+              score: expect.anything(),
+            },
+          ],
+          pagination: {
+            count: 1,
+            hasMore: false,
+            limit: expect.anything(),
+            skip: 0,
+          },
+        });
+      },
+      {
+        timeout: 5000,
+      }
+    );
+  });
+});

diff --git a/packages/common/infra/src/sync/indexer/document.ts b/packages/common/infra/src/sync/indexer/document.ts
new file mode 100644
index 0000000000..5983800b33
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/document.ts
@@ -0,0 +1,48 @@
+import type { Schema } from './schema';
+
+export class Document<S extends Schema = any> {
+  constructor(public readonly id: string) {}
+
+  fields = new Map<keyof S, string[]>();
+
+  public insert<F extends keyof S>(field: F, value: string | string[]) {
+    const values = this.fields.get(field) ?? [];
+    if (Array.isArray(value)) {
+      values.push(...value);
+    } else {
+      values.push(value);
+    }
+    this.fields.set(field, values);
+  }
+
+  get<F extends keyof S>(field: F): string[] | string | undefined {
+    const values = this.fields.get(field);
+    if (values === undefined) {
+      return undefined;
+    } else if (values.length === 1) {
+      return values[0];
+    } else {
+      return values;
+    }
+  }
+
+  static from<S extends Schema>(
+    id: string,
+    map:
+      | Partial<Record<keyof S, string | string[]>>
+      | Map<string, string | string[]>
+  ): Document<S> {
+    const doc = new Document(id);
+
+    if (map instanceof Map) {
+      for (const [key, value] of map) {
+        doc.insert(key, value);
+      }
+    } else {
+      for (const key in map) {
+        doc.insert(key, map[key] as string | string[]);
+      }
+    }
+    return doc;
+  }
+}

diff --git a/packages/common/infra/src/sync/indexer/field-type.ts b/packages/common/infra/src/sync/indexer/field-type.ts
new file mode 100644
index 0000000000..2953440cdf
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/field-type.ts
@@ -0,0 +1 @@
+export type FieldType = 'Integer' | 'FullText' | 'String' | 'Boolean';

diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/bm25.spec.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/bm25.spec.ts
new file mode 100644
index 0000000000..d6856167cc
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/bm25.spec.ts
@@ -0,0 +1,9 @@
+import { expect, test } from 'vitest';
+
+import { bm25 } from '../bm25';
+
+test('bm25', () => {
+  expect(bm25(1, 1, 10, 10, 15)).toEqual(3.2792079793859643);
+  expect(bm25(2, 1, 10, 10, 15) > bm25(1, 1, 10, 10, 15)).toBeTruthy();
+  expect(bm25(1, 1, 10, 10, 15) > bm25(2, 1, 10, 100, 15)).toBeTruthy();
+});

diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/highlighter.spec.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/highlighter.spec.ts
new file mode 100644
index 0000000000..8b60e73883
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/highlighter.spec.ts
@@ -0,0 +1,32 @@
+import { expect, test } from 'vitest';
+
+import { highlighter } from '../highlighter';
+
+test('highlighter', () => {
+  expect(highlighter('0123456789', '<b>', '</b>', [[3, 5]])).toEqual(
+    '012<b>34</b>56789'
+  );
+
+  expect(
+    highlighter(
+      '012345678901234567890123456789012345678901234567890123456789',
+      '<b>',
+      '</b>',
+      [[59, 60]]
+    )
+  ).toEqual('...0123456789012345678901234567890123456789012345678<b>9</b>');
+
+  expect(
+    highlighter(
+      '012345678901234567890123456789012345678901234567890123456789',
+      '<b>',
+      '</b>',
+      [
+        [10, 11],
+        [49, 51],
+      ]
+    )
+  ).toEqual(
+    '0123456789<b>0</b>12345678901234567890123456789012345678<b>9</b>...'
+ ); +}); diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/tokenizer.spec.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/tokenizer.spec.ts new file mode 100644 index 0000000000..99fb4bce2b --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/__tests__/tokenizer.spec.ts @@ -0,0 +1,128 @@ +import { expect, test } from 'vitest'; + +import { GeneralTokenizer } from '../tokenizer'; + +test('tokenizer', () => { + { + const tokens = new GeneralTokenizer().tokenize('hello world,\n AFFiNE'); + + expect(tokens).toEqual([ + { term: 'hello', start: 0, end: 5 }, + { term: 'world', start: 7, end: 12 }, + { term: 'affine', start: 15, end: 21 }, + ]); + } + + { + const tokens = new GeneralTokenizer().tokenize('你好世界,阿芬'); + + expect(tokens).toEqual([ + { + end: 2, + start: 0, + term: '你好', + }, + { + end: 3, + start: 1, + term: '好世', + }, + { + end: 4, + start: 2, + term: '世界', + }, + { + end: 7, + start: 5, + term: '阿芬', + }, + ]); + } + + { + const tokens = new GeneralTokenizer().tokenize('1阿2芬'); + + expect(tokens).toEqual([ + { term: '1', start: 0, end: 1 }, + { term: '阿', start: 1, end: 2 }, + { term: '2', start: 2, end: 3 }, + { term: '芬', start: 3, end: 4 }, + ]); + } + + { + const tokens = new GeneralTokenizer().tokenize('안녕하세요 세계'); + + expect(tokens).toEqual([ + { + end: 2, + start: 0, + term: '안녕', + }, + { + end: 3, + start: 1, + term: '녕하', + }, + { + end: 4, + start: 2, + term: '하세', + }, + { + end: 5, + start: 3, + term: '세요', + }, + { + end: 8, + start: 6, + term: '세계', + }, + ]); + } + + { + const tokens = new GeneralTokenizer().tokenize('ハローワールド'); + + expect(tokens).toEqual([ + { term: 'ハロ', start: 0, end: 2 }, + { term: 'ロー', start: 1, end: 3 }, + { term: 'ーワ', start: 2, end: 4 }, + { term: 'ワー', start: 3, end: 5 }, + { term: 'ール', start: 4, end: 6 }, + { term: 'ルド', start: 5, end: 7 }, + ]); + } + + { + const tokens = new GeneralTokenizer().tokenize('はろーわーるど'); + + expect(tokens).toEqual([ + { term: 'はろ', start: 0, end: 2 }, + { term: 'ろー', start: 1, end: 3 }, + { term: 'ーわ', start: 2, end: 4 }, + { term: 'わー', start: 3, end: 5 }, + { term: 'ーる', start: 4, end: 6 }, + { term: 'るど', start: 5, end: 7 }, + ]); + } + + { + const tokens = new GeneralTokenizer().tokenize('👋1️⃣🚪👋🏿'); + + expect(tokens).toEqual([ + { term: '👋', start: 0, end: 2 }, + { term: '1️⃣', start: 2, end: 5 }, + { term: '🚪', start: 5, end: 7 }, + { term: '👋🏿', start: 7, end: 11 }, + ]); + } + + { + const tokens = new GeneralTokenizer().tokenize('1️'); + + expect(tokens).toEqual([{ term: '1️', start: 0, end: 2 }]); + } +}); diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/bm25.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/bm25.ts new file mode 100644 index 0000000000..9f69b56424 --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/bm25.ts @@ -0,0 +1,62 @@ +/** + * Parameters of the BM25+ scoring algorithm. Customizing these is almost never + * necessary, and finetuning them requires an understanding of the BM25 scoring + * model. + * + * Some information about BM25 (and BM25+) can be found at these links: + * + * - https://en.wikipedia.org/wiki/Okapi_BM25 + * - https://opensourceconnections.com/blog/2015/10/16/bm25-the-next-generation-of-lucene-relevation/ + */ +export type BM25Params = { + /** Term frequency saturation point. + * + * Recommended values are between `1.2` and `2`. Higher values increase the + * difference in score between documents with higher and lower term + * frequencies. 
Setting this to `0` or a negative value is invalid. Defaults + * to `1.2` + */ + k: number; + + /** + * Length normalization impact. + * + * Recommended values are around `0.75`. Higher values increase the weight + * that field length has on scoring. Setting this to `0` (not recommended) + * means that the field length has no effect on scoring. Negative values are + * invalid. Defaults to `0.7`. + */ + b: number; + + /** + * BM25+ frequency normalization lower bound (usually called δ). + * + * Recommended values are between `0.5` and `1`. Increasing this parameter + * increases the minimum relevance of one occurrence of a search term + * regardless of its (possibly very long) field length. Negative values are + * invalid. Defaults to `0.5`. + */ + d: number; +}; + +const defaultBM25params: BM25Params = { k: 1.2, b: 0.7, d: 0.5 }; + +export const bm25 = ( + termFreq: number, + matchingCount: number, + totalCount: number, + fieldLength: number, + avgFieldLength: number, + bm25params: BM25Params = defaultBM25params +): number => { + const { k, b, d } = bm25params; + const invDocFreq = Math.log( + 1 + (totalCount - matchingCount + 0.5) / (matchingCount + 0.5) + ); + return ( + invDocFreq * + (d + + (termFreq * (k + 1)) / + (termFreq + k * (1 - b + (b * fieldLength) / avgFieldLength))) + ); +}; diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts new file mode 100644 index 0000000000..f418bd0674 --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/data-struct.ts @@ -0,0 +1,465 @@ +import { + type DBSchema, + type IDBPDatabase, + type IDBPTransaction, + openDB, + type StoreNames, +} from 'idb'; + +import { + type AggregateOptions, + type AggregateResult, + Document, + type Query, + type Schema, + type SearchOptions, + type SearchResult, +} from '../../'; +import { highlighter } from './highlighter'; +import { + BooleanInvertedIndex, + FullTextInvertedIndex, + IntegerInvertedIndex, + type InvertedIndex, + StringInvertedIndex, +} from './inverted-index'; +import { Match } from './match'; + +export interface IndexDB extends DBSchema { + kvMetadata: { + key: string; + value: { + key: string; + value: any; + }; + }; + records: { + key: number; + value: { + id: string; + data: Map; + }; + indexes: { id: string }; + }; + invertedIndex: { + key: number; + value: { + nid: number; + pos?: { + i: number /* index */; + l: number /* length */; + rs: [number, number][] /* ranges: [start, end] */; + }; + key: ArrayBuffer; + }; + indexes: { key: ArrayBuffer; nid: number }; + }; +} + +export type DataStructRWTransaction = IDBPTransaction< + IndexDB, + ArrayLike>, + 'readwrite' +>; + +export type DataStructROTransaction = IDBPTransaction< + IndexDB, + ArrayLike>, + 'readonly' | 'readwrite' +>; + +export class DataStruct { + private initializePromise: Promise | null = null; + database: IDBPDatabase = null as any; + invertedIndex = new Map(); + + constructor( + readonly databaseName: string, + schema: Schema + ) { + for (const [key, type] of Object.entries(schema)) { + if (type === 'String') { + this.invertedIndex.set(key, new StringInvertedIndex(key)); + } else if (type === 'Integer') { + this.invertedIndex.set(key, new IntegerInvertedIndex(key)); + } else if (type === 'FullText') { + this.invertedIndex.set(key, new FullTextInvertedIndex(key)); + } else if (type === 'Boolean') { + this.invertedIndex.set(key, new BooleanInvertedIndex(key)); + } else { + throw new Error(`Field type '${type}' not 
supported`); + } + } + } + + async insert(trx: DataStructRWTransaction, document: Document) { + const exists = await trx + .objectStore('records') + .index('id') + .get(document.id); + + if (exists) { + throw new Error('Document already exists'); + } + + const nid = await trx.objectStore('records').add({ + id: document.id, + data: new Map(document.fields as Map), + }); + + for (const [key, values] of document.fields) { + const iidx = this.invertedIndex.get(key as string); + if (!iidx) { + throw new Error( + `Inverted index '${key.toString()}' not found, document not match schema` + ); + } + await iidx.insert(trx, nid, values); + } + } + + async delete(trx: DataStructRWTransaction, id: string) { + const nid = await trx.objectStore('records').index('id').getKey(id); + + if (nid) { + await trx.objectStore('records').delete(nid); + } + + const indexIds = await trx + .objectStore('invertedIndex') + .index('nid') + .getAllKeys(nid); + for (const indexId of indexIds) { + await trx.objectStore('invertedIndex').delete(indexId); + } + } + + async batchWrite( + trx: DataStructRWTransaction, + deletes: string[], + inserts: Document[] + ) { + for (const del of deletes) { + await this.delete(trx, del); + } + for (const inst of inserts) { + await this.insert(trx, inst); + } + } + + async matchAll(trx: DataStructROTransaction): Promise { + const allNids = await trx.objectStore('records').getAllKeys(); + const match = new Match(); + + for (const nid of allNids) { + match.addScore(nid, 1); + } + return match; + } + + private async queryRaw( + trx: DataStructROTransaction, + query: Query + ): Promise { + if (query.type === 'match') { + const iidx = this.invertedIndex.get(query.field as string); + if (!iidx) { + throw new Error(`Field '${query.field as string}' not found`); + } + return await iidx.match(trx, query.match); + } else if (query.type === 'boolean') { + const weights = []; + for (const q of query.queries) { + weights.push(await this.queryRaw(trx, q)); + } + if (query.occur === 'must') { + return weights.reduce((acc, w) => acc.and(w)); + } else if (query.occur === 'must_not') { + const total = weights.reduce((acc, w) => acc.and(w)); + return (await this.matchAll(trx)).exclude(total); + } else if (query.occur === 'should') { + return weights.reduce((acc, w) => acc.or(w)); + } + } else if (query.type === 'all') { + return await this.matchAll(trx); + } else if (query.type === 'boost') { + return (await this.queryRaw(trx, query.query)).boost(query.boost); + } else if (query.type === 'exists') { + const iidx = this.invertedIndex.get(query.field as string); + if (!iidx) { + throw new Error(`Field '${query.field as string}' not found`); + } + return await iidx.all(trx); + } + throw new Error(`Query type '${query.type}' not supported`); + } + + private async query( + trx: DataStructROTransaction, + query: Query + ): Promise { + const match = await this.queryRaw(trx, query); + const filteredMatch = match.asyncFilter(async nid => { + const record = await trx.objectStore('records').getKey(nid); + return record !== undefined; + }); + return filteredMatch; + } + + async clear(trx: DataStructRWTransaction) { + await trx.objectStore('records').clear(); + await trx.objectStore('invertedIndex').clear(); + await trx.objectStore('kvMetadata').clear(); + } + + async search( + trx: DataStructROTransaction, + query: Query, + options: SearchOptions + ): Promise> { + const pagination = { + skip: options.pagination?.skip ?? 0, + limit: options.pagination?.limit ?? 
100, + }; + + const match = await this.query(trx, query); + + const nids = match + .toArray() + .slice(pagination.skip, pagination.skip + pagination.limit); + + const nodes = []; + for (const nid of nids) { + nodes.push(await this.resultNode(trx, match, nid, options)); + } + + return { + pagination: { + count: match.size(), + hasMore: match.size() > pagination.limit + pagination.skip, + limit: pagination.limit, + skip: pagination.skip, + }, + nodes: nodes, + }; + } + + async aggregate( + trx: DataStructROTransaction, + query: Query, + field: string, + options: AggregateOptions + ): Promise> { + const pagination = { + skip: options.pagination?.skip ?? 0, + limit: options.pagination?.limit ?? 100, + }; + + const hitPagination = options.hits + ? { + skip: options.hits.pagination?.skip ?? 0, + limit: options.hits.pagination?.limit ?? 3, + } + : { + skip: 0, + limit: 0, + }; + + const match = await this.query(trx, query); + + const nids = match.toArray(); + + const buckets: { + key: string; + nids: number[]; + hits: SearchResult['nodes']; + }[] = []; + + for (const nid of nids) { + const values = (await trx.objectStore('records').get(nid))?.data.get( + field + ); + for (const value of values ?? []) { + let bucket; + let bucketIndex = buckets.findIndex(b => b.key === value); + if (bucketIndex === -1) { + bucket = { key: value, nids: [], hits: [] }; + buckets.push(bucket); + bucketIndex = buckets.length - 1; + } else { + bucket = buckets[bucketIndex]; + } + + if ( + bucketIndex >= pagination.skip && + bucketIndex < pagination.skip + pagination.limit + ) { + bucket.nids.push(nid); + if ( + bucket.nids.length - 1 >= hitPagination.skip && + bucket.nids.length - 1 < hitPagination.skip + hitPagination.limit + ) { + bucket.hits.push( + await this.resultNode(trx, match, nid, options.hits ?? 
{}) + ); + } + } + } + } + + return { + buckets: buckets + .slice(pagination.skip, pagination.skip + pagination.limit) + .map(bucket => { + const result = { + key: bucket.key, + score: match.getScore(bucket.nids[0]), + count: bucket.nids.length, + } as AggregateResult['buckets'][number]; + + if (options.hits) { + (result as any).hits = { + pagination: { + count: bucket.nids.length, + hasMore: + bucket.nids.length > hitPagination.limit + hitPagination.skip, + limit: hitPagination.limit, + skip: hitPagination.skip, + }, + nodes: bucket.hits, + } as SearchResult; + } + + return result; + }), + pagination: { + count: buckets.length, + hasMore: buckets.length > pagination.limit + pagination.skip, + limit: pagination.limit, + skip: pagination.skip, + }, + }; + } + + async getAll( + trx: DataStructROTransaction, + ids: string[] + ): Promise { + const docs = []; + for (const id of ids) { + const record = await trx.objectStore('records').index('id').get(id); + if (record) { + docs.push(Document.from(record.id, record.data)); + } + } + + return docs; + } + + async has(trx: DataStructROTransaction, id: string): Promise { + const nid = await trx.objectStore('records').index('id').getKey(id); + return nid !== undefined; + } + + async readonly() { + await this.ensureInitialized(); + return this.database.transaction( + ['records', 'invertedIndex', 'kvMetadata'], + 'readonly' + ); + } + + async readwrite() { + await this.ensureInitialized(); + return this.database.transaction( + ['records', 'invertedIndex', 'kvMetadata'], + 'readwrite' + ); + } + + private async ensureInitialized() { + if (this.database) { + return; + } + this.initializePromise ??= this.initialize(); + await this.initializePromise; + } + + private async initialize() { + this.database = await openDB(this.databaseName, 1, { + upgrade(database) { + database.createObjectStore('kvMetadata', { + keyPath: 'key', + }); + const recordsStore = database.createObjectStore('records', { + autoIncrement: true, + }); + recordsStore.createIndex('id', 'id', { + unique: true, + }); + const invertedIndexStore = database.createObjectStore('invertedIndex', { + autoIncrement: true, + }); + invertedIndexStore.createIndex('key', 'key', { unique: false }); + invertedIndexStore.createIndex('nid', 'nid', { unique: false }); + }, + }); + } + + private async resultNode( + trx: DataStructROTransaction, + match: Match, + nid: number, + options: SearchOptions + ): Promise['nodes'][number]> { + const record = await trx.objectStore('records').get(nid); + if (!record) { + throw new Error(`Record not found for nid ${nid}`); + } + + const node = { + id: record.id, + score: match.getScore(nid), + } as any; + + if (options.fields) { + const fields = {} as Record; + for (const field of options.fields as string[]) { + fields[field] = record.data.get(field) ?? ['']; + if (fields[field].length === 1) { + fields[field] = fields[field][0]; + } + } + node.fields = fields; + } + + if (options.highlights) { + const highlights = {} as Record; + for (const { field, before, end } of options.highlights) { + const highlightValues = match.getHighlighters(nid, field); + if (highlightValues) { + const rawValues = record.data.get(field) ?? []; + highlights[field] = Array.from(highlightValues) + .map(([index, ranges]) => { + const raw = rawValues[index]; + + if (raw) { + return ( + highlighter(raw, before, end, ranges, { + maxPrefix: 20, + maxLength: 50, + }) ?? 
'' + ); + } + + return ''; + }) + .filter(Boolean); + } + } + node.highlights = highlights; + } + + return node; + } +} diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/highlighter.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/highlighter.ts new file mode 100644 index 0000000000..6986dfcd84 --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/highlighter.ts @@ -0,0 +1,77 @@ +export function highlighter( + originText: string, + before: string, + after: string, + matches: [number, number][], + { + maxLength = 50, + maxPrefix = 20, + }: { maxLength?: number; maxPrefix?: number } = {} +) { + const merged = mergeRanges(matches); + + if (merged.length === 0) { + return null; + } + + const firstMatch = merged[0][0]; + const start = Math.max( + 0, + Math.min(firstMatch - maxPrefix, originText.length - maxLength) + ); + const end = Math.min(start + maxLength, originText.length); + const text = originText.substring(start, end); + + let result = ''; + + let pointer = 0; + for (const match of merged) { + const matchStart = match[0] - start; + const matchEnd = match[1] - start; + if (matchStart >= text.length) { + break; + } + result += text.substring(pointer, matchStart); + pointer = matchStart; + const highlighted = text.substring(matchStart, matchEnd); + + if (highlighted.length === 0) { + continue; + } + + result += `${before}${highlighted}${after}`; + pointer = matchEnd; + } + result += text.substring(pointer); + + if (start > 0) { + result = `...${result}`; + } + + if (end < originText.length) { + result = `${result}...`; + } + + return result; +} + +function mergeRanges(intervals: [number, number][]) { + if (intervals.length === 0) return []; + + intervals.sort((a, b) => a[0] - b[0]); + + const merged = [intervals[0]]; + + for (let i = 1; i < intervals.length; i++) { + const last = merged[merged.length - 1]; + const current = intervals[i]; + + if (current[0] <= last[1]) { + last[1] = Math.max(last[1], current[1]); + } else { + merged.push(current); + } + } + + return merged; +} diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts new file mode 100644 index 0000000000..8b017bc2dd --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts @@ -0,0 +1,171 @@ +import type { Observable } from 'rxjs'; +import { from, merge, of, Subject, throttleTime } from 'rxjs'; + +import { exhaustMapWithTrailing } from '../../../../utils/exhaustmap-with-trailing'; +import { + type AggregateOptions, + type AggregateResult, + type Document, + type Index, + type IndexStorage, + type IndexWriter, + type Query, + type Schema, + type SearchOptions, + type SearchResult, +} from '../../'; +import { DataStruct, type DataStructRWTransaction } from './data-struct'; + +export class IndexedDBIndex implements Index { + data: DataStruct = new DataStruct(this.databaseName, this.schema); + broadcast$ = new Subject(); + + constructor( + private readonly schema: S, + private readonly databaseName: string = 'indexer' + ) { + const channel = new BroadcastChannel(this.databaseName + ':indexer'); + channel.onmessage = () => { + this.broadcast$.next(1); + }; + } + + async get(id: string): Promise | null> { + return (await this.getAll([id]))[0] ?? 
null; + } + + async getAll(ids: string[]): Promise[]> { + const trx = await this.data.readonly(); + return this.data.getAll(trx, ids); + } + + async write(): Promise> { + return new IndexedDBIndexWriter(this.data, await this.data.readwrite()); + } + + async has(id: string): Promise { + const trx = await this.data.readonly(); + return this.data.has(trx, id); + } + + async search( + query: Query, + options: SearchOptions = {} + ): Promise>> { + const trx = await this.data.readonly(); + return this.data.search(trx, query, options); + } + + search$( + query: Query, + options: SearchOptions = {} + ): Observable>> { + return merge(of(1), this.broadcast$).pipe( + throttleTime(500, undefined, { leading: true, trailing: true }), + exhaustMapWithTrailing(() => { + return from( + (async () => { + const trx = await this.data.readonly(); + return this.data.search(trx, query, options); + })() + ); + }) + ); + } + + async aggregate( + query: Query, + field: string, + options: AggregateOptions = {} + ): Promise>> { + const trx = await this.data.readonly(); + return this.data.aggregate(trx, query, field, options); + } + + aggregate$( + query: Query, + field: string, + options: AggregateOptions = {} + ): Observable>> { + return merge(of(1), this.broadcast$).pipe( + throttleTime(500, undefined, { leading: true, trailing: true }), + exhaustMapWithTrailing(() => { + return from( + (async () => { + const trx = await this.data.readonly(); + return this.data.aggregate(trx, query, field, options); + })() + ); + }) + ); + } + + async clear(): Promise { + const trx = await this.data.readwrite(); + return this.data.clear(trx); + } +} + +export class IndexedDBIndexWriter implements IndexWriter { + inserts: Document[] = []; + deletes: string[] = []; + channel = new BroadcastChannel(this.data.databaseName + ':indexer'); + + constructor( + private readonly data: DataStruct, + private readonly trx: DataStructRWTransaction + ) {} + + async get(id: string): Promise | null> { + return (await this.getAll([id]))[0] ?? 
null; + } + + async getAll(ids: string[]): Promise[]> { + const trx = await this.data.readonly(); + return this.data.getAll(trx, ids); + } + + insert(document: Document): void { + this.inserts.push(document); + } + delete(id: string): void { + this.deletes.push(id); + } + put(document: Document): void { + this.delete(document.id); + this.insert(document); + } + + async commit(): Promise { + await this.data.batchWrite(this.trx, this.deletes, this.inserts); + this.channel.postMessage(1); + } + + rollback(): void {} + + has(id: string): Promise { + return this.data.has(this.trx, id); + } + + async search( + query: Query, + options: SearchOptions = {} + ): Promise>> { + return this.data.search(this.trx, query, options); + } + + async aggregate( + query: Query, + field: string, + options: AggregateOptions = {} + ): Promise>> { + return this.data.aggregate(this.trx, query, field, options); + } +} + +export class IndexedDBIndexStorage implements IndexStorage { + constructor(private readonly databaseName: string) {} + getIndex(name: string, s: S): Index { + return new IndexedDBIndex(s, this.databaseName + ':' + name); + } +} diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts new file mode 100644 index 0000000000..88f5373304 --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/inverted-index.ts @@ -0,0 +1,429 @@ +import { bm25 } from './bm25'; +import type { + DataStructROTransaction, + DataStructRWTransaction, +} from './data-struct'; +import { Match } from './match'; +import { GeneralTokenizer, type Token } from './tokenizer'; + +export interface InvertedIndex { + fieldKey: string; + + match(trx: DataStructROTransaction, term: string): Promise; + + all(trx: DataStructROTransaction): Promise; + + insert( + trx: DataStructRWTransaction, + id: number, + terms: string[] + ): Promise; +} + +export class StringInvertedIndex implements InvertedIndex { + constructor(readonly fieldKey: string) {} + + async match(trx: DataStructROTransaction, term: string): Promise { + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll(InvertedIndexKey.forString(this.fieldKey, term).buffer()); + const match = new Match(); + for (const obj of objs) { + match.addScore(obj.nid, 1); + } + return match; + } + + async all(trx: DataStructROTransaction): Promise { + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll( + IDBKeyRange.bound( + InvertedIndexKey.forPrefix(this.fieldKey).buffer(), + InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer() + ) + ); + + const set = new Set(); + for (const obj of objs) { + set.add(obj.nid); + } + + const match = new Match(); + for (const nid of set) { + match.addScore(nid, 1); + } + return match; + } + + async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { + for (const term of terms) { + await trx.objectStore('invertedIndex').add({ + key: InvertedIndexKey.forString(this.fieldKey, term).buffer(), + nid: id, + }); + } + } +} + +export class IntegerInvertedIndex implements InvertedIndex { + constructor(readonly fieldKey: string) {} + + async match(trx: DataStructROTransaction, term: string): Promise { + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll(InvertedIndexKey.forInt64(this.fieldKey, BigInt(term)).buffer()); + const match = new Match(); + for (const obj of objs) { + match.addScore(obj.nid, 1); + } + return match; + } + + // 
eslint-disable-next-line sonarjs/no-identical-functions + async all(trx: DataStructROTransaction): Promise { + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll( + IDBKeyRange.bound( + InvertedIndexKey.forPrefix(this.fieldKey).buffer(), + InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer() + ) + ); + + const set = new Set(); + for (const obj of objs) { + set.add(obj.nid); + } + + const match = new Match(); + for (const nid of set) { + match.addScore(nid, 1); + } + return match; + } + + async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { + for (const term of terms) { + await trx.objectStore('invertedIndex').add({ + key: InvertedIndexKey.forInt64(this.fieldKey, BigInt(term)).buffer(), + nid: id, + }); + } + } +} + +export class BooleanInvertedIndex implements InvertedIndex { + constructor(readonly fieldKey: string) {} + + // eslint-disable-next-line sonarjs/no-identical-functions + async all(trx: DataStructROTransaction): Promise { + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll( + IDBKeyRange.bound( + InvertedIndexKey.forPrefix(this.fieldKey).buffer(), + InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer() + ) + ); + + const set = new Set(); + for (const obj of objs) { + set.add(obj.nid); + } + + const match = new Match(); + for (const nid of set) { + match.addScore(nid, 1); + } + return match; + } + + async match(trx: DataStructROTransaction, term: string): Promise { + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll( + InvertedIndexKey.forBoolean(this.fieldKey, term === 'true').buffer() + ); + const match = new Match(); + for (const obj of objs) { + match.addScore(obj.nid, 1); + } + return match; + } + + async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { + for (const term of terms) { + await trx.objectStore('invertedIndex').add({ + key: InvertedIndexKey.forBoolean( + this.fieldKey, + term === 'true' + ).buffer(), + nid: id, + }); + } + } +} + +export class FullTextInvertedIndex implements InvertedIndex { + constructor(readonly fieldKey: string) {} + + async match(trx: DataStructROTransaction, term: string): Promise { + const queryTokens = new GeneralTokenizer().tokenize(term); + const matched = new Map< + number, + { + score: number[]; + positions: Map; + } + >(); + for (const token of queryTokens) { + const key = InvertedIndexKey.forString(this.fieldKey, token.term); + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll( + IDBKeyRange.bound(key.buffer(), key.add1().buffer(), false, true) + ); + const submatched: { + nid: number; + score: number; + position: { + index: number; + ranges: [number, number][]; + }; + }[] = []; + for (const obj of objs) { + const key = InvertedIndexKey.fromBuffer(obj.key); + const originTokenTerm = key.asString(); + const matchLength = token.term.length; + const position = obj.pos ?? { + i: 0, + l: 0, + rs: [], + }; + const termFreq = position.rs.length; + const totalCount = objs.length; + const avgFieldLength = + ( + await trx + .objectStore('kvMetadata') + .get(`full-text:avg-field-length:${this.fieldKey}`) + )?.value ?? 
0; + const fieldLength = position.l; + const score = + bm25(termFreq, 1, totalCount, fieldLength, avgFieldLength) * + (matchLength / originTokenTerm.length); + const match = { + score, + positions: new Map(), + }; + const ranges = match.positions.get(position.i) || []; + ranges.push( + ...position.rs.map(([start, _end]) => [start, start + matchLength]) + ); + match.positions.set(position.i, ranges); + submatched.push({ + nid: obj.nid, + score, + position: { + index: position.i, + ranges: position.rs.map(([start, _end]) => [ + start, + start + matchLength, + ]), + }, + }); + } + + // normalize score + const maxScore = submatched.reduce((acc, s) => Math.max(acc, s.score), 0); + const minScore = submatched.reduce((acc, s) => Math.min(acc, s.score), 1); + for (const { nid, score, position } of submatched) { + const normalizedScore = (score - minScore) / (maxScore - minScore); + const match = matched.get(nid) || { + score: [] as number[], + positions: new Map(), + }; + match.score.push(normalizedScore); + const ranges = match.positions.get(position.index) || []; + ranges.push(...position.ranges); + match.positions.set(position.index, ranges); + matched.set(nid, match); + } + } + const match = new Match(); + for (const [nid, { score, positions }] of matched) { + match.addScore( + nid, + score.reduce((acc, s) => acc + s, 0) + ); + + for (const [index, ranges] of positions) { + match.addHighlighter(nid, this.fieldKey, index, ranges); + } + } + return match; + } + + // eslint-disable-next-line sonarjs/no-identical-functions + async all(trx: DataStructROTransaction): Promise { + const objs = await trx + .objectStore('invertedIndex') + .index('key') + .getAll( + IDBKeyRange.bound( + InvertedIndexKey.forPrefix(this.fieldKey).buffer(), + InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer() + ) + ); + + const set = new Set(); + for (const obj of objs) { + set.add(obj.nid); + } + + const match = new Match(); + for (const nid of set) { + match.addScore(nid, 1); + } + return match; + } + + async insert(trx: DataStructRWTransaction, id: number, terms: string[]) { + for (let i = 0; i < terms.length; i++) { + const tokenMap = new Map(); + const term = terms[i]; + + const tokens = new GeneralTokenizer().tokenize(term); + + for (const token of tokens) { + const tokens = tokenMap.get(token.term) || []; + tokens.push(token); + tokenMap.set(token.term, tokens); + } + + for (const [term, tokens] of tokenMap) { + await trx.objectStore('invertedIndex').add({ + key: InvertedIndexKey.forString(this.fieldKey, term).buffer(), + nid: id, + pos: { + l: term.length, + i: i, + rs: tokens.map(token => [token.start, token.end]), + }, + }); + } + + const kvMetadataStore = trx.objectStore('kvMetadata'); + // update avg-field-length + const totalCount = + (await kvMetadataStore.get(`full-text:field-count:${this.fieldKey}`)) + ?.value ?? 0; + const avgFieldLength = + ( + await kvMetadataStore.get( + `full-text:avg-field-length:${this.fieldKey}` + ) + )?.value ?? 
0; + await kvMetadataStore.put({ + key: `full-text:field-count:${this.fieldKey}`, + value: totalCount + 1, + }); + await kvMetadataStore.put({ + key: `full-text:avg-field-length:${this.fieldKey}`, + value: + avgFieldLength + + (terms.reduce((acc, term) => acc + term.length, 0) - avgFieldLength) / + (totalCount + 1), + }); + } + } +} + +export class InvertedIndexKey { + constructor( + readonly field: ArrayBuffer, + readonly value: ArrayBuffer, + readonly gap: ArrayBuffer = new Uint8Array([58]) + ) {} + + asString() { + return new TextDecoder().decode(this.value); + } + + asInt64() { + return new DataView(this.value).getBigInt64(0, false); /* big-endian */ + } + + add1() { + if (this.value.byteLength > 0) { + const bytes = new Uint8Array(this.value.slice(0)); + let carry = 1; + for (let i = bytes.length - 1; i >= 0 && carry > 0; i--) { + const sum = bytes[i] + carry; + bytes[i] = sum % 256; + carry = sum >> 8; + } + return new InvertedIndexKey(this.field, bytes); + } else { + return new InvertedIndexKey( + this.field, + new ArrayBuffer(0), + new Uint8Array([59]) + ); + } + } + + static forPrefix(field: string) { + return new InvertedIndexKey( + new TextEncoder().encode(field), + new ArrayBuffer(0) + ); + } + + static forString(field: string, value: string) { + return new InvertedIndexKey( + new TextEncoder().encode(field), + new TextEncoder().encode(value) + ); + } + + static forBoolean(field: string, value: boolean) { + const bytes = new Uint8Array(1); + bytes.set([value ? 1 : 0]); + return new InvertedIndexKey(new TextEncoder().encode(field), bytes); + } + + static forInt64(field: string, value: bigint) { + const bytes = new ArrayBuffer(8); + new DataView(bytes).setBigInt64(0, value, false); /* big-endian */ + return new InvertedIndexKey(new TextEncoder().encode(field), bytes); + } + + buffer() { + const tmp = new Uint8Array( + this.field.byteLength + (this.value?.byteLength ?? 0) + 1 + ); + tmp.set(new Uint8Array(this.field), 0); + tmp.set(new Uint8Array(this.gap), this.field.byteLength); + if (this.value.byteLength > 0) { + tmp.set(new Uint8Array(this.value), this.field.byteLength + 1); + } + return tmp.buffer; + } + + static fromBuffer(buffer: ArrayBuffer) { + const array = new Uint8Array(buffer); + const fieldLength = array.indexOf(58); + const field = array.slice(0, fieldLength); + const value = array.slice(fieldLength + 1); + return new InvertedIndexKey(field, value); + } +} diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts new file mode 100644 index 0000000000..10dbce0ca2 --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/match.ts @@ -0,0 +1,127 @@ +export class Match { + scores = new Map(); + /** + * nid -> field -> index(multi value field) -> [start, end][] + */ + highlighters = new Map< + number, + Map> + >(); + + constructor() {} + + size() { + return this.scores.size; + } + + getScore(id: number) { + return this.scores.get(id) ?? 
0; + } + + addScore(id: number, score: number) { + const currentScore = this.scores.get(id) || 0; + this.scores.set(id, currentScore + score); + } + + getHighlighters(id: number, field: string) { + return this.highlighters.get(id)?.get(field); + } + + addHighlighter( + id: number, + field: string, + index: number, + newRanges: [number, number][] + ) { + const fields = + this.highlighters.get(id) || + new Map>(); + const values = fields.get(field) || new Map(); + const ranges = values.get(index) || []; + ranges.push(...newRanges); + values.set(index, ranges); + fields.set(field, values); + this.highlighters.set(id, fields); + } + + and(other: Match) { + const newWeight = new Match(); + for (const [id, score] of this.scores) { + if (other.scores.has(id)) { + newWeight.addScore(id, score + (other.scores.get(id) ?? 0)); + newWeight.copyExtData(this, id); + newWeight.copyExtData(other, id); + } + } + return newWeight; + } + + or(other: Match) { + const newWeight = new Match(); + for (const [id, score] of this.scores) { + newWeight.addScore(id, score); + newWeight.copyExtData(this, id); + } + for (const [id, score] of other.scores) { + newWeight.addScore(id, score); + newWeight.copyExtData(other, id); + } + return newWeight; + } + + exclude(other: Match) { + const newWeight = new Match(); + for (const [id, score] of this.scores) { + if (!other.scores.has(id)) { + newWeight.addScore(id, score); + newWeight.copyExtData(this, id); + } + } + return newWeight; + } + + boost(boost: number) { + const newWeight = new Match(); + for (const [id, score] of this.scores) { + newWeight.addScore(id, score * boost); + newWeight.copyExtData(this, id); + } + return newWeight; + } + + toArray() { + return Array.from(this.scores.entries()) + .sort((a, b) => b[1] - a[1]) + .map(e => e[0]); + } + + filter(predicate: (id: number) => boolean) { + const newWeight = new Match(); + for (const [id, score] of this.scores) { + if (predicate(id)) { + newWeight.addScore(id, score); + newWeight.copyExtData(this, id); + } + } + return newWeight; + } + + async asyncFilter(predicate: (id: number) => Promise) { + const newWeight = new Match(); + for (const [id, score] of this.scores) { + if (await predicate(id)) { + newWeight.addScore(id, score); + newWeight.copyExtData(this, id); + } + } + return newWeight; + } + + private copyExtData(from: Match, id: number) { + for (const [field, values] of from.highlighters.get(id) ?? 
[]) { + for (const [index, ranges] of values) { + this.addHighlighter(id, field, index, ranges); + } + } + } +} diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/tokenizer.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/tokenizer.ts new file mode 100644 index 0000000000..35dd5986e0 --- /dev/null +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/tokenizer.ts @@ -0,0 +1,162 @@ +import Graphemer from 'graphemer'; + +export interface Tokenizer { + tokenize(text: string): Token[]; +} + +export interface Token { + term: string; + start: number; + end: number; +} + +export class SimpleTokenizer implements Tokenizer { + tokenize(text: string): Token[] { + const tokens: Token[] = []; + let start = 0; + let end = 0; + let inWord = false; + for (let i = 0; i < text.length; i++) { + const c = text[i]; + if (c.match(/[\n\r\p{Z}\p{P}]/u)) { + if (inWord) { + end = i; + tokens.push({ + term: text.substring(start, end).toLowerCase(), + start, + end, + }); + inWord = false; + } + } else { + if (!inWord) { + start = i; + end = i; + inWord = true; + } + } + } + if (inWord) { + tokens.push({ + term: text.substring(start).toLowerCase(), + start, + end: text.length, + }); + } + return tokens; + } +} + +export class NGramTokenizer implements Tokenizer { + constructor(private readonly n: number) {} + + tokenize(text: string): Token[] { + const splitted: Token[] = []; + for (let i = 0; i < text.length; ) { + const nextBreak = Graphemer.nextBreak(text, i); + const c = text.substring(i, nextBreak); + + splitted.push({ + term: c, + start: i, + end: nextBreak, + }); + + i = nextBreak; + } + const tokens: Token[] = []; + for (let i = 0; i < splitted.length - this.n + 1; i++) { + tokens.push( + splitted.slice(i, i + this.n).reduce( + (acc, t) => ({ + term: acc.term + t.term, + start: Math.min(acc.start, t.start), + end: Math.max(acc.end, t.end), + }), + { term: '', start: Infinity, end: -Infinity } + ) + ); + } + return tokens; + } +} + +export class GeneralTokenizer implements Tokenizer { + constructor() {} + + tokenizeWord(word: string, lang: string): Token[] { + if (lang === 'en') { + return [{ term: word.toLowerCase(), start: 0, end: word.length }]; + } else if (lang === 'cjk') { + if (word.length < 3) { + return [{ term: word, start: 0, end: word.length }]; + } + return new NGramTokenizer(2).tokenize(word); + } else if (lang === 'emoji') { + return new NGramTokenizer(1).tokenize(word); + } else if (lang === '-') { + return []; + } + + throw new Error('Not implemented'); + } + + testLang(c: string): string { + if (c.match(/[\p{Emoji}]/u)) { + return 'emoji'; + } else if (c.match(/[\p{sc=Han}\p{scx=Hira}\p{scx=Kana}\p{sc=Hang}]/u)) { + return 'cjk'; + } else if (c.match(/[\n\r\p{Z}\p{P}]/u)) { + return '-'; + } else { + return 'en'; + } + } + + tokenize(text: string): Token[] { + const tokens: Token[] = []; + let start = 0; + let end = 0; + let lang: string | null = null; + + for (let i = 0; i < text.length; ) { + const nextBreak = Graphemer.nextBreak(text, i); + const c = text.substring(i, nextBreak); + + const l = this.testLang(c); + if (lang !== l) { + if (lang !== null) { + end = i; + tokens.push( + ...this.tokenizeWord(text.substring(start, end), lang).map( + token => ({ + ...token, + start: token.start + start, + end: token.end + start, + }) + ) + ); + } + + start = i; + end = i; + lang = l; + } + + i = nextBreak; + } + if (lang !== null) { + tokens.push( + ...this.tokenizeWord(text.substring(start, text.length), lang).map( + token => ({ + ...token, + start: token.start + 
diff --git a/packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts b/packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts
new file mode 100644
index 0000000000..ffa150b531
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/impl/memory/data-struct.ts
@@ -0,0 +1,282 @@
+import {
+  type AggregateOptions,
+  type AggregateResult,
+  Document,
+  type Query,
+  type Schema,
+  type SearchOptions,
+  type SearchResult,
+} from '../../';
+import {
+  BooleanInvertedIndex,
+  FullTextInvertedIndex,
+  IntegerInvertedIndex,
+  type InvertedIndex,
+  StringInvertedIndex,
+} from './inverted-index';
+import { Match } from './match';
+
+type DataRecord = {
+  id: string;
+  data: Map<string, string[]>;
+  deleted: boolean;
+};
+
+export class DataStruct {
+  records: DataRecord[] = [];
+
+  idMap = new Map<string, number>();
+
+  invertedIndex = new Map<string, InvertedIndex>();
+
+  constructor(schema: Schema) {
+    for (const [key, type] of Object.entries(schema)) {
+      if (type === 'String') {
+        this.invertedIndex.set(key, new StringInvertedIndex(key));
+      } else if (type === 'Integer') {
+        this.invertedIndex.set(key, new IntegerInvertedIndex(key));
+      } else if (type === 'FullText') {
+        this.invertedIndex.set(key, new FullTextInvertedIndex(key));
+      } else if (type === 'Boolean') {
+        this.invertedIndex.set(key, new BooleanInvertedIndex(key));
+      } else {
+        throw new Error(`Field type '${type}' not supported`);
+      }
+    }
+  }
+
+  getAll(ids: string[]): Document[] {
+    return ids
+      .map(id => {
+        const nid = this.idMap.get(id);
+        if (nid === undefined) {
+          return undefined;
+        }
+        return Document.from(id, this.records[nid].data);
+      })
+      .filter((v): v is Document => v !== undefined);
+  }
+
+  insert(document: Document) {
+    if (this.idMap.has(document.id)) {
+      throw new Error('Document already exists');
+    }
+
+    this.records.push({
+      id: document.id,
+      data: document.fields as Map<string, string[]>,
+      deleted: false,
+    });
+
+    const nid = this.records.length - 1;
+    this.idMap.set(document.id, nid);
+    for (const [key, values] of document.fields) {
+      for (const value of values) {
+        const iidx = this.invertedIndex.get(key as string);
+        if (!iidx) {
+          throw new Error(
+            `Inverted index '${key.toString()}' not found, document does not match schema`
+          );
+        }
+        iidx.insert(nid, value);
+      }
+    }
+  }
+
+  delete(id: string) {
+    const nid = this.idMap.get(id);
+    if (nid === undefined) {
+      throw new Error('Document not found');
+    }
+
+    this.records[nid].deleted = true;
+    this.records[nid].data = new Map();
+  }
+
+  matchAll(): Match {
+    const weight = new Match();
+    for (let i = 0; i < this.records.length; i++) {
+      weight.addScore(i, 1);
+    }
+    return weight;
+  }
+
+  clear() {
+    this.records = [];
+    this.idMap.clear();
+    this.invertedIndex.forEach(v => v.clear());
+  }
+
+  private queryRaw(query: Query<any>): Match {
+    if (query.type === 'match') {
+      const iidx = this.invertedIndex.get(query.field as string);
+      if (!iidx) {
+        throw new Error(`Field '${query.field as string}' not found`);
+      }
+      return iidx.match(query.match);
+    } else if (query.type === 'boolean') {
+      const weights = query.queries.map(q => this.queryRaw(q));
+      if (query.occur === 'must') {
+        return weights.reduce((acc, w) => acc.and(w));
+      } else if (query.occur === 'must_not') {
+        const total = weights.reduce((acc, w) => acc.and(w));
+        return this.matchAll().exclude(total);
+      } else if (query.occur === 'should') {
+        return weights.reduce((acc, w) => acc.or(w));
+      }
+    } else if (query.type === 'all') {
+      return this.matchAll();
+    } else if (query.type === 'boost') {
+      return this.queryRaw(query.query).boost(query.boost);
+    } else if (query.type === 'exists') {
+      const iidx = this.invertedIndex.get(query.field as string);
+      if (!iidx) {
+        throw new Error(`Field '${query.field as string}' not found`);
+      }
+      return iidx.all();
+    }
+    throw new Error(`Query type '${query.type}' not supported`);
+  }
+
+  query(query: Query<any>): Match {
+    return this.queryRaw(query).filter(id => !this.records[id].deleted);
+  }
+
+  search(
+    query: Query<any>,
+    options: SearchOptions<any> = {}
+  ): SearchResult<any, any> {
+    const pagination = {
+      skip: options.pagination?.skip ?? 0,
+      limit: options.pagination?.limit ?? 100,
+    };
+
+    const match = this.query(query);
+
+    const nids = match
+      .toArray()
+      .slice(pagination.skip, pagination.skip + pagination.limit);
+
+    return {
+      pagination: {
+        count: match.size(),
+        hasMore: match.size() > pagination.limit + pagination.skip,
+        limit: pagination.limit,
+        skip: pagination.skip,
+      },
+      nodes: nids.map(nid => this.resultNode(match, nid, options)),
+    };
+  }
+
+  aggregate(
+    query: Query<any>,
+    field: string,
+    options: AggregateOptions<any> = {}
+  ): AggregateResult<any, any> {
+    const pagination = {
+      skip: options.pagination?.skip ?? 0,
+      limit: options.pagination?.limit ?? 100,
+    };
+
+    const match = this.query(query);
+
+    const nids = match.toArray();
+
+    const buckets: { key: string; nids: number[] }[] = [];
+
+    for (const nid of nids) {
+      for (const value of this.records[nid].data.get(field) ?? []) {
+        let bucket = buckets.find(b => b.key === value);
+        if (!bucket) {
+          bucket = { key: value, nids: [] };
+          buckets.push(bucket);
+        }
+        bucket.nids.push(nid);
+      }
+    }
+
+    return {
+      buckets: buckets
+        .slice(pagination.skip, pagination.skip + pagination.limit)
+        .map(bucket => {
+          const result = {
+            key: bucket.key,
+            score: match.getScore(bucket.nids[0]),
+            count: bucket.nids.length,
+          } as AggregateResult<any, any>['buckets'][number];
+
+          if (options.hits) {
+            const hitsOptions = options.hits;
+            const pagination = {
+              skip: options.hits.pagination?.skip ?? 0,
+              limit: options.hits.pagination?.limit ?? 3,
+            };
+
+            const hits = bucket.nids.slice(
+              pagination.skip,
+              pagination.skip + pagination.limit
+            );
+
+            (result as any).hits = {
+              pagination: {
+                count: bucket.nids.length,
+                hasMore:
+                  bucket.nids.length > pagination.limit + pagination.skip,
+                limit: pagination.limit,
+                skip: pagination.skip,
+              },
+              nodes: hits.map(nid => this.resultNode(match, nid, hitsOptions)),
+            } as SearchResult<any, any>;
+          }
+
+          return result;
+        }),
+      pagination: {
+        count: buckets.length,
+        hasMore: buckets.length > pagination.limit + pagination.skip,
+        limit: pagination.limit,
+        skip: pagination.skip,
+      },
+    };
+  }
+
+  has(id: string): boolean {
+    return this.idMap.has(id);
+  }
+
+  private resultNode(
+    match: Match,
+    nid: number,
+    options: SearchOptions<any>
+  ): SearchResult<any, any>['nodes'][number] {
+    const node = {
+      id: this.records[nid].id,
+      score: match.getScore(nid),
+    } as any;
+
+    if (options.fields) {
+      const fields = {} as Record<string, string | string[]>;
+      for (const field of options.fields as string[]) {
+        fields[field] = this.records[nid].data.get(field) ?? [''];
+        if (fields[field].length === 1) {
+          fields[field] = fields[field][0];
+        }
+      }
+      node.fields = fields;
+    }
+
+    if (options.highlights) {
+      const highlights = {} as Record<string, string[]>;
+      for (const { field, before, end } of options.highlights) {
+        highlights[field] = match
+          .getHighlighters(nid, field)
+          .flatMap(highlighter => {
+            return highlighter(before, end);
+          });
+      }
+      node.highlights = highlights;
+    }
+
+    return node;
+  }
+}
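A small usage sketch of `DataStruct`, which is normally reached only through `MemoryIndex`; the schema, ids, and values here are made up for illustration.

```typescript
// Sketch only: exercising DataStruct directly.
const data = new DataStruct({ title: 'FullText', tag: 'String' });
data.insert(Document.from('doc1', { title: 'hello world', tag: 'page' }));

data.search({ type: 'match', field: 'title', match: 'hello' });
// => { pagination: { count: 1, ... }, nodes: [{ id: 'doc1', score: ... }] }

data.aggregate({ type: 'all' }, 'tag');
// => { pagination: { ... }, buckets: [{ key: 'page', score: 1, count: 1 }] }
```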
diff --git a/packages/common/infra/src/sync/indexer/impl/memory/index.ts b/packages/common/infra/src/sync/indexer/impl/memory/index.ts
new file mode 100644
index 0000000000..2d05557cde
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/impl/memory/index.ts
@@ -0,0 +1,141 @@
+import { map, merge, type Observable, of, Subject, throttleTime } from 'rxjs';
+
+import type {
+  AggregateOptions,
+  AggregateResult,
+  Document,
+  Index,
+  IndexStorage,
+  IndexWriter,
+  Query,
+  Schema,
+  SearchOptions,
+  SearchResult,
+} from '../../';
+import { DataStruct } from './data-struct';
+
+export class MemoryIndex<S extends Schema> implements Index<S> {
+  private readonly data: DataStruct = new DataStruct(this.schema);
+  broadcast$ = new Subject<number>();
+
+  constructor(private readonly schema: Schema) {}
+
+  write(): Promise<IndexWriter<S>> {
+    return Promise.resolve(new MemoryIndexWriter(this.data, this.broadcast$));
+  }
+
+  async get(id: string): Promise<Document<S> | null> {
+    return (await this.getAll([id]))[0] ?? null;
+  }
+
+  getAll(ids: string[]): Promise<Document<S>[]> {
+    return Promise.resolve(this.data.getAll(ids));
+  }
+
+  has(id: string): Promise<boolean> {
+    return Promise.resolve(this.data.has(id));
+  }
+
+  async search(
+    query: Query<S>,
+    options: SearchOptions<S> = {}
+  ): Promise<SearchResult<S, any>> {
+    return this.data.search(query, options);
+  }
+
+  search$(
+    query: Query<S>,
+    options: SearchOptions<S> = {}
+  ): Observable<SearchResult<S, any>> {
+    return merge(of(1), this.broadcast$).pipe(
+      throttleTime(500, undefined, { leading: false, trailing: true }),
+      map(() => this.data.search(query, options))
+    );
+  }
+
+  async aggregate(
+    query: Query<S>,
+    field: string,
+    options: AggregateOptions<S> = {}
+  ): Promise<AggregateResult<S, any>> {
+    return this.data.aggregate(query, field, options);
+  }
+
+  aggregate$(
+    query: Query<S>,
+    field: string,
+    options: AggregateOptions<S> = {}
+  ): Observable<AggregateResult<S, AggregateOptions<S>>> {
+    return merge(of(1), this.broadcast$).pipe(
+      throttleTime(500, undefined, { leading: false, trailing: true }),
+      map(() => this.data.aggregate(query, field, options))
+    );
+  }
+
+  clear(): Promise<void> {
+    this.data.clear();
+    return Promise.resolve();
+  }
+}
+
+export class MemoryIndexWriter<S extends Schema> implements IndexWriter<S> {
+  inserts: Document[] = [];
+  deletes: string[] = [];
+
+  constructor(
+    private readonly data: DataStruct,
+    private readonly broadcast$: Subject<number>
+  ) {}
+
+  async get(id: string): Promise<Document<S> | null> {
+    return (await this.getAll([id]))[0] ?? null;
+  }
+
+  getAll(ids: string[]): Promise<Document<S>[]> {
+    return Promise.resolve(this.data.getAll(ids));
+  }
+
+  insert(document: Document): void {
+    this.inserts.push(document);
+  }
+  delete(id: string): void {
+    this.deletes.push(id);
+  }
+  put(document: Document): void {
+    this.delete(document.id);
+    this.insert(document);
+  }
+  async search(
+    query: Query<S>,
+    options: SearchOptions<S> = {}
+  ): Promise<SearchResult<S, any>> {
+    return this.data.search(query, options);
+  }
+  async aggregate(
+    query: Query<S>,
+    field: string,
+    options: AggregateOptions<S> = {}
+  ): Promise<AggregateResult<S, any>> {
+    return this.data.aggregate(query, field, options);
+  }
+  commit(): Promise<void> {
+    for (const del of this.deletes) {
+      this.data.delete(del);
+    }
+    for (const inst of this.inserts) {
+      this.data.insert(inst);
+    }
+    this.broadcast$.next(1);
+    return Promise.resolve();
+  }
+  rollback(): void {}
+  has(id: string): Promise<boolean> {
+    return Promise.resolve(this.data.has(id));
+  }
+}
+
+export class MemoryIndexStorage implements IndexStorage {
+  getIndex<S extends Schema>(_: string, schema: S): Index<S> {
+    return new MemoryIndex(schema);
+  }
+}
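Putting the memory backend together, a hedged end-to-end sketch (assumes an async context; `defineSchema` and `Document` come from the package root, and the document contents are made up):

```typescript
// Sketch only: the whole in-memory pipeline.
const index = new MemoryIndex(defineSchema({ title: 'FullText' }));

const writer = await index.write();
writer.insert(Document.from('doc1', { title: 'hello world' }));
await writer.commit(); // applies deletes, then inserts, then notifies broadcast$

// search$ re-runs the query on every commit, throttled to 500ms.
index
  .search$({ type: 'match', field: 'title', match: 'hello' })
  .subscribe(result => console.log(result.nodes));
```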
diff --git a/packages/common/infra/src/sync/indexer/impl/memory/inverted-index.ts b/packages/common/infra/src/sync/indexer/impl/memory/inverted-index.ts
new file mode 100644
index 0000000000..14c17e1dc4
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/impl/memory/inverted-index.ts
@@ -0,0 +1,220 @@
+import Fuse from 'fuse.js';
+
+import { Match } from './match';
+
+export interface InvertedIndex {
+  fieldKey: string;
+
+  match(term: string): Match;
+
+  all(): Match;
+
+  insert(id: number, term: string): void;
+
+  clear(): void;
+}
+
+export class StringInvertedIndex implements InvertedIndex {
+  index: Map<string, number[]> = new Map();
+
+  constructor(readonly fieldKey: string) {}
+
+  match(term: string): Match {
+    const match = new Match();
+
+    for (const id of this.index.get(term) ?? []) {
+      match.addScore(id, 1);
+    }
+
+    return match;
+  }
+
+  all(): Match {
+    const match = new Match();
+
+    for (const [_term, ids] of this.index) {
+      for (const id of ids) {
+        if (match.getScore(id) === 0) {
+          match.addScore(id, 1);
+        }
+      }
+    }
+
+    return match;
+  }
+
+  insert(id: number, term: string): void {
+    const ids = this.index.get(term) ?? [];
+    ids.push(id);
+    this.index.set(term, ids);
+  }
+
+  clear(): void {
+    this.index.clear();
+  }
+}
+
+export class IntegerInvertedIndex implements InvertedIndex {
+  index: Map<string, number[]> = new Map();
+
+  constructor(readonly fieldKey: string) {}
+
+  // eslint-disable-next-line sonarjs/no-identical-functions
+  match(term: string): Match {
+    const match = new Match();
+
+    for (const id of this.index.get(term) ?? []) {
+      match.addScore(id, 1);
+    }
+
+    return match;
+  }
+
+  // eslint-disable-next-line sonarjs/no-identical-functions
+  all(): Match {
+    const match = new Match();
+
+    for (const [_term, ids] of this.index) {
+      for (const id of ids) {
+        if (match.getScore(id) === 0) {
+          match.addScore(id, 1);
+        }
+      }
+    }
+
+    return match;
+  }
+
+  // eslint-disable-next-line sonarjs/no-identical-functions
+  insert(id: number, term: string): void {
+    const ids = this.index.get(term) ?? [];
+    ids.push(id);
+    this.index.set(term, ids);
+  }
+
+  clear(): void {
+    this.index.clear();
+  }
+}
+
+export class BooleanInvertedIndex implements InvertedIndex {
+  index: Map<boolean, number[]> = new Map();
+
+  constructor(readonly fieldKey: string) {}
+
+  // eslint-disable-next-line sonarjs/no-identical-functions
+  match(term: string): Match {
+    const match = new Match();
+
+    for (const id of this.index.get(term === 'true') ?? []) {
+      match.addScore(id, 1);
+    }
+
+    return match;
+  }
+
+  // eslint-disable-next-line sonarjs/no-identical-functions
+  all(): Match {
+    const match = new Match();
+
+    for (const [_term, ids] of this.index) {
+      for (const id of ids) {
+        if (match.getScore(id) === 0) {
+          match.addScore(id, 1);
+        }
+      }
+    }
+
+    return match;
+  }
+
+  // eslint-disable-next-line sonarjs/no-identical-functions
+  insert(id: number, term: string): void {
+    const ids = this.index.get(term === 'true') ?? [];
+    ids.push(id);
+    this.index.set(term === 'true', ids);
+  }
+
+  clear(): void {
+    this.index.clear();
+  }
+}
+
+export class FullTextInvertedIndex implements InvertedIndex {
+  records = [] as { id: number; v: string }[];
+  index = Fuse.createIndex(['v'], [] as { id: number; v: string }[]);
+
+  constructor(readonly fieldKey: string) {}
+
+  match(term: string): Match {
+    const searcher = new Fuse(
+      this.records,
+      {
+        includeScore: true,
+        includeMatches: true,
+        shouldSort: true,
+        keys: ['v'],
+      },
+      this.index
+    );
+    const result = searcher.search(term);
+
+    const match = new Match();
+
+    for (const value of result) {
+      match.addScore(value.item.id, 1 - (value.score ?? 1));
+
+      match.addHighlighter(value.item.id, this.fieldKey, (before, after) => {
+        const matches = value.matches;
+        if (!matches || matches.length === 0) {
+          return [''];
+        }
+
+        const firstMatch = matches[0];
+
+        const text = firstMatch.value;
+        if (!text) {
+          return [''];
+        }
+
+        let result = '';
+        let pointer = 0;
+        for (const match of matches) {
+          for (const [start, end] of match.indices) {
+            result += text.substring(pointer, start);
+            result += `${before}${text.substring(start, end + 1)}${after}`;
+            pointer = end + 1;
+          }
+        }
+        result += text.substring(pointer);
+
+        return [result];
+      });
+    }
+
+    return match;
+  }
+
+  // eslint-disable-next-line sonarjs/no-identical-functions
+  all(): Match {
+    const match = new Match();
+
+    for (const { id } of this.records) {
+      if (match.getScore(id) === 0) {
+        match.addScore(id, 1);
+      }
+    }
+
+    return match;
+  }
+
+  insert(id: number, term: string): void {
+    this.index.add({ id, v: term });
+    this.records.push({ id, v: term });
+  }
+
+  clear(): void {
+    this.records = [];
+    this.index = Fuse.createIndex(['v'], [] as { id: number; v: string }[]);
+  }
+}
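For intuition, a sketch of how the exact-match indexes above behave; ids and terms are arbitrary. `FullTextInvertedIndex` instead delegates to Fuse.js and converts its distance-like score with `1 - score`, so closer matches rank higher.

```typescript
// Sketch only: exact-match indexes map terms to the ids that contain them.
const idx = new StringInvertedIndex('tag');
idx.insert(0, 'doc');
idx.insert(1, 'doc');
idx.insert(1, 'page');

idx.match('doc').toArray(); // => [0, 1], each with score 1
idx.match('page').toArray(); // => [1]
idx.all().size(); // => 2 — every id that has the field, deduplicated
```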
diff --git a/packages/common/infra/src/sync/indexer/impl/memory/match.ts b/packages/common/infra/src/sync/indexer/impl/memory/match.ts
new file mode 100644
index 0000000000..917a62ebab
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/impl/memory/match.ts
@@ -0,0 +1,108 @@
+export class Match {
+  scores = new Map<number, number>();
+  highlighters = new Map<
+    number,
+    Map<string, ((before: string, after: string) => string[])[]>
+  >();
+
+  constructor() {}
+
+  size() {
+    return this.scores.size;
+  }
+
+  getScore(id: number) {
+    return this.scores.get(id) ?? 0;
+  }
+
+  addScore(id: number, score: number) {
+    const currentScore = this.scores.get(id) || 0;
+    this.scores.set(id, currentScore + score);
+  }
+
+  getHighlighters(id: number, field: string) {
+    return this.highlighters.get(id)?.get(field) ?? [];
+  }
+
+  addHighlighter(
+    id: number,
+    field: string,
+    highlighter: (before: string, after: string) => string[]
+  ) {
+    const fields = this.highlighters.get(id) || new Map();
+    const highlighters = fields.get(field) || [];
+    highlighters.push(highlighter);
+    fields.set(field, highlighters);
+    this.highlighters.set(id, fields);
+  }
+
+  and(other: Match) {
+    const newWeight = new Match();
+    for (const [id, score] of this.scores) {
+      if (other.scores.has(id)) {
+        newWeight.addScore(id, score + (other.scores.get(id) ?? 0));
+        newWeight.copyExtData(this, id);
+        newWeight.copyExtData(other, id);
+      }
+    }
+    return newWeight;
+  }
+
+  or(other: Match) {
+    const newWeight = new Match();
+    for (const [id, score] of this.scores) {
+      newWeight.addScore(id, score);
+      newWeight.copyExtData(this, id);
+    }
+    for (const [id, score] of other.scores) {
+      newWeight.addScore(id, score);
+      newWeight.copyExtData(other, id);
+    }
+    return newWeight;
+  }
+
+  exclude(other: Match) {
+    const newWeight = new Match();
+    for (const [id, score] of this.scores) {
+      if (!other.scores.has(id)) {
+        newWeight.addScore(id, score);
+        newWeight.copyExtData(this, id);
+      }
+    }
+    return newWeight;
+  }
+
+  boost(boost: number) {
+    const newWeight = new Match();
+    for (const [id, score] of this.scores) {
+      newWeight.addScore(id, score * boost);
+      newWeight.copyExtData(this, id);
+    }
+    return newWeight;
+  }
+
+  toArray() {
+    return Array.from(this.scores.entries())
+      .sort((a, b) => b[1] - a[1])
+      .map(e => e[0]);
+  }
+
+  filter(predicate: (id: number) => boolean) {
+    const newWeight = new Match();
+    for (const [id, score] of this.scores) {
+      if (predicate(id)) {
+        newWeight.addScore(id, score);
+        newWeight.copyExtData(this, id);
+      }
+    }
+    return newWeight;
+  }
+
+  private copyExtData(from: Match, id: number) {
+    for (const [field, highlighters] of from.highlighters.get(id) ?? []) {
+      for (const highlighter of highlighters) {
+        this.addHighlighter(id, field, highlighter);
+      }
+    }
+  }
+}
diff --git a/packages/common/infra/src/sync/indexer/index.ts b/packages/common/infra/src/sync/indexer/index.ts
new file mode 100644
index 0000000000..e4cfbadfbc
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/index.ts
@@ -0,0 +1,6 @@
+export * from './document';
+export * from './field-type';
+export * from './indexer';
+export * from './query';
+export * from './schema';
+export * from './searcher';
diff --git a/packages/common/infra/src/sync/indexer/indexer.ts b/packages/common/infra/src/sync/indexer/indexer.ts
new file mode 100644
index 0000000000..027bd74419
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/indexer.ts
@@ -0,0 +1,38 @@
+import type { Document } from './document';
+import type { Schema } from './schema';
+import type { Searcher, Subscriber } from './searcher';
+
+export interface Index<S extends Schema>
+  extends IndexReader<S>,
+    Searcher<S>,
+    Subscriber<S> {
+  write(): Promise<IndexWriter<S>>;
+
+  clear(): Promise<void>;
+}
+
+export interface IndexWriter<S extends Schema>
+  extends IndexReader<S>,
+    Searcher<S> {
+  insert(document: Document<S>): void;
+
+  put(document: Document<S>): void;
+
+  delete(id: string): void;
+
+  commit(): Promise<void>;
+
+  rollback(): void;
+}
+
+export interface IndexReader<S extends Schema> {
+  get(id: string): Promise<Document<S> | null>;
+
+  getAll(ids: string[]): Promise<Document<S>[]>;
+
+  has(id: string): Promise<boolean>;
+}
+
+export interface IndexStorage {
+  getIndex<S extends Schema>(name: string, schema: S): Index<S>;
+}
diff --git a/packages/common/infra/src/sync/indexer/query.ts b/packages/common/infra/src/sync/indexer/query.ts
new file mode 100644
index 0000000000..921154894e
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/query.ts
@@ -0,0 +1,35 @@
+import type { Schema } from './schema';
+
+export type MatchQuery<S extends Schema> = {
+  type: 'match';
+  field: keyof S;
+  match: string;
+};
+
+export type BoostQuery<S extends Schema> = {
+  type: 'boost';
+  query: Query<S>;
+  boost: number;
+};
+
+export type BooleanQuery<S extends Schema> = {
+  type: 'boolean';
+  occur: 'should' | 'must' | 'must_not';
+  queries: Query<S>[];
+};
+
+export type ExistsQuery<S extends Schema> = {
+  type: 'exists';
+  field: keyof S;
+};
+
+export type AllQuery = {
+  type: 'all';
+};
+
+export type Query<S extends Schema> =
+  | BooleanQuery<S>
+  | MatchQuery<S>
+  | AllQuery
+  | ExistsQuery<S>
+  | BoostQuery<S>;
diff --git a/packages/common/infra/src/sync/indexer/schema.ts b/packages/common/infra/src/sync/indexer/schema.ts
new file mode 100644
index 0000000000..dd534c48a4
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/schema.ts
@@ -0,0 +1,7 @@
+import type { FieldType } from './field-type';
+
+export type Schema = Record<string, FieldType>;
+
+export function defineSchema<T extends Schema>(schema: T): T {
+  return schema;
+}
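A sketch of how the schema type threads through the query language: `Query<S>` constrains `field` to the schema's keys, so a typo fails to typecheck. The schema below is illustrative.

```typescript
// Sketch only: field names are checked against the schema at compile time.
const schema = defineSchema({ title: 'FullText', tag: 'String' });

const query: Query<typeof schema> = {
  type: 'boolean',
  occur: 'should',
  queries: [
    {
      type: 'boost',
      boost: 2,
      query: { type: 'match', field: 'title', match: 'hello' },
    },
    { type: 'exists', field: 'tag' },
    // { type: 'match', field: 'size', match: '1' } // error: not in the schema
  ],
};
```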
diff --git a/packages/common/infra/src/sync/indexer/searcher.ts b/packages/common/infra/src/sync/indexer/searcher.ts
new file mode 100644
index 0000000000..dec72ad758
--- /dev/null
+++ b/packages/common/infra/src/sync/indexer/searcher.ts
@@ -0,0 +1,83 @@
+import type { Observable } from 'rxjs';
+
+import type { Query } from './query';
+import type { Schema } from './schema';
+
+type HighlightAbleField<S extends Schema> = {
+  [K in keyof S]: S[K] extends 'FullText' ? K : never;
+}[keyof S];
+
+export interface Searcher<S extends Schema> {
+  search<O extends SearchOptions<S>>(
+    query: Query<S>,
+    options?: O
+  ): Promise<SearchResult<S, O>>;
+  aggregate<O extends AggregateOptions<S>>(
+    query: Query<S>,
+    field: keyof S,
+    options?: O
+  ): Promise<AggregateResult<S, O>>;
+}
+
+export interface Subscriber<S extends Schema> {
+  search$<O extends SearchOptions<S>>(
+    query: Query<S>,
+    options?: O
+  ): Observable<SearchResult<S, O>>;
+  aggregate$<O extends AggregateOptions<S>>(
+    query: Query<S>,
+    field: keyof S,
+    options?: O
+  ): Observable<AggregateResult<S, O>>;
+}
+
+type ResultPagination = {
+  count: number;
+  limit: number;
+  skip: number;
+  hasMore: boolean;
+};
+
+type PaginationOption = {
+  limit?: number;
+  skip?: number;
+};
+
+export type SearchOptions<S extends Schema> = {
+  pagination?: PaginationOption;
+  highlights?: {
+    field: HighlightAbleField<S>;
+    before: string;
+    end: string;
+  }[];
+  fields?: (keyof S)[];
+};
+
+export type SearchResult<S extends Schema, O extends SearchOptions<S>> = {
+  pagination: ResultPagination;
+  nodes: ({
+    id: string;
+    score: number;
+  } & (O['fields'] extends any[]
+    ? { fields: { [key in O['fields'][number]]: string | string[] } }
+    : unknown) &
+    (O['highlights'] extends any[]
+      ? { highlights: { [key in O['highlights'][number]['field']]: string[] } }
+      : unknown))[];
+};
+
+export interface AggregateOptions<S extends Schema> {
+  pagination?: PaginationOption;
+  hits?: SearchOptions<S>;
+}
+
+export type AggregateResult<S extends Schema, O extends AggregateOptions<S>> = {
+  pagination: ResultPagination;
+  buckets: ({
+    key: string;
+    score: number;
+    count: number;
+  } & (O['hits'] extends object
+    ? { hits: SearchResult<S, O['hits']> }
+    : unknown))[];
+};
diff --git a/packages/common/infra/src/utils/exhaustmap-with-trailing.ts b/packages/common/infra/src/utils/exhaustmap-with-trailing.ts
new file mode 100644
index 0000000000..f03b161a19
--- /dev/null
+++ b/packages/common/infra/src/utils/exhaustmap-with-trailing.ts
@@ -0,0 +1,41 @@
+import {
+  asyncScheduler,
+  defer,
+  exhaustMap,
+  finalize,
+  type Observable,
+  type ObservableInput,
+  type OperatorFunction,
+  scheduled,
+  Subject,
+  throttle,
+} from 'rxjs';
+
+/**
+ * Like exhaustMap, but also emits the trailing value that arrived from the
+ * source observable while the preceding inner observable was still running.
+ *
+ * Original code adapted from https://github.com/ReactiveX/rxjs/issues/5004
+ * @param project - A function that, when applied to an item emitted by the
+ * source Observable, returns a projected Observable.
+ */
+export function exhaustMapWithTrailing<T, R>(
+  project: (value: T, index: number) => ObservableInput<R>
+): OperatorFunction<T, R> {
+  return (source$): Observable<R> =>
+    defer(() => {
+      const release$ = new Subject<void>();
+      return source$.pipe(
+        throttle(() => release$, {
+          leading: true,
+          trailing: true,
+        }),
+        exhaustMap((value, index) =>
+          scheduled(project(value, index), asyncScheduler).pipe(
+            finalize(() => {
+              release$.next();
+            })
+          )
+        )
+      );
+    });
+}
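A usage sketch for `exhaustMapWithTrailing`; the `slowSearch` helper is hypothetical, only there to make the timing visible, and any `ObservableInput` (including a Promise) works as the projection.

```typescript
// Sketch only: slowSearch is a made-up ~250ms job.
import { interval, take } from 'rxjs';

const slowSearch = (i: number) =>
  new Promise<number>(resolve => setTimeout(() => resolve(i), 250));

interval(100)
  .pipe(take(5), exhaustMapWithTrailing(slowSearch))
  .subscribe(v => console.log(v));
// logs 0, 2, 4 — plain exhaustMap would log only 0 and 3,
// dropping the values that arrived while the inner job was busy
```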
diff --git a/yarn.lock b/yarn.lock
index fdf35bfb75..0251be314a 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -15659,7 +15659,11 @@ __metadata:
     "@datastructures-js/binary-search-tree": "npm:^5.3.2"
     "@testing-library/react": "npm:^16.0.0"
     async-call-rpc: "npm:^6.4.0"
+    fake-indexeddb: "npm:^6.0.0"
     foxact: "npm:^0.2.33"
+    fuse.js: "npm:^7.0.0"
+    graphemer: "npm:^1.4.0"
+    idb: "npm:^8.0.0"
     jotai: "npm:^2.8.0"
     jotai-effect: "npm:^1.0.0"
     lodash-es: "npm:^4.17.21"
@@ -24567,6 +24571,13 @@ __metadata:
   languageName: node
   linkType: hard
 
+"fuse.js@npm:^7.0.0":
+  version: 7.0.0
+  resolution: "fuse.js@npm:7.0.0"
+  checksum: 10/d75d35f2d61afa85b8248f9cbfc7d4df29ae47ea574a15ad5c3c2a41930c5ed78668346295508b59ec4929fcb1a5cd6d9a8c649b5a3bc8b18e515f4e4cb9809d
+  languageName: node
+  linkType: hard
+
 "galactus@npm:^1.0.0":
   version: 1.0.0
   resolution: "galactus@npm:1.0.0"