feat(nbstore): add indexer storage (#10953)

Author: EYHN
Date: 2025-03-31 12:59:51 +00:00
parent c9e14ac0db
commit 8957d0645f
82 changed files with 3393 additions and 4753 deletions

View File

@@ -1,3 +1,4 @@
export * from './awareness';
export * from './blob';
export * from './doc';
export * from './indexer';

View File

@@ -0,0 +1,45 @@
import type { IndexerStorage } from '../storage';
import type { IndexerSync } from '../sync/indexer';
export class IndexerFrontend {
constructor(
public readonly storage: IndexerStorage,
public readonly sync: IndexerSync
) {}
get state$() {
return this.sync.state$;
}
docState$(docId: string) {
return this.sync.docState$(docId);
}
search = this.storage.search.bind(this.storage);
aggregate = this.storage.aggregate.bind(this.storage);
// eslint-disable-next-line rxjs/finnish
search$ = this.storage.search$.bind(this.storage);
// eslint-disable-next-line rxjs/finnish
aggregate$ = this.storage.aggregate$.bind(this.storage);
addPriority(docId: string, priority: number) {
return this.sync.addPriority(docId, priority);
}
waitForCompleted(signal?: AbortSignal) {
return this.sync.waitForCompleted(signal);
}
waitForDocCompleted(docId: string, signal?: AbortSignal) {
return this.sync.waitForDocCompleted(docId, signal);
}
waitForDocCompletedWithPriority(
docId: string,
priority: number,
signal?: AbortSignal
) {
const undo = this.addPriority(docId, priority);
return this.sync.waitForDocCompleted(docId, signal).finally(() => undo());
}
}

View File

@@ -3,15 +3,26 @@ import { IndexedDBBlobStorage } from './blob';
import { IndexedDBBlobSyncStorage } from './blob-sync';
import { IndexedDBDocStorage } from './doc';
import { IndexedDBDocSyncStorage } from './doc-sync';
import { IndexedDBIndexerStorage } from './indexer';
import { IndexedDBIndexerSyncStorage } from './indexer-sync';
export * from './blob';
export * from './blob-sync';
export * from './doc';
export * from './doc-sync';
export * from './indexer';
export * from './indexer-sync';
export const idbStorages = [
IndexedDBDocStorage,
IndexedDBBlobStorage,
IndexedDBDocSyncStorage,
IndexedDBBlobSyncStorage,
IndexedDBIndexerStorage,
IndexedDBIndexerSyncStorage,
] satisfies StorageConstructor[];
export const idbStoragesIndexerOnly = [
IndexedDBIndexerStorage,
IndexedDBIndexerSyncStorage,
] satisfies StorageConstructor[];

View File

@@ -0,0 +1,38 @@
import { share } from '../../connection';
import type { DocClock } from '../../storage/doc';
import { IndexerSyncStorageBase } from '../../storage/indexer-sync';
import { IDBConnection, type IDBConnectionOptions } from './db';
export class IndexedDBIndexerSyncStorage extends IndexerSyncStorageBase {
static readonly identifier = 'IndexedDBIndexerSyncStorage';
readonly connection = share(new IDBConnection(this.options));
constructor(private readonly options: IDBConnectionOptions) {
super();
}
async getDocIndexedClock(docId: string): Promise<DocClock | null> {
const tx = this.connection.inner.db.transaction('indexerSync', 'readonly');
const store = tx.store;
const result = await store.get(docId);
return result
? { docId: result.docId, timestamp: result.indexedClock }
: null;
}
async setDocIndexedClock(docClock: DocClock): Promise<void> {
const tx = this.connection.inner.db.transaction('indexerSync', 'readwrite');
const store = tx.store;
await store.put({
docId: docClock.docId,
indexedClock: docClock.timestamp,
});
}
async clearDocIndexedClock(docId: string): Promise<void> {
const tx = this.connection.inner.db.transaction('indexerSync', 'readwrite');
const store = tx.store;
await store.delete(docId);
}
}

View File

@@ -0,0 +1,10 @@
import { expect, test } from 'vitest';
import { bm25 } from '../bm25';
test('bm25', () => {
expect(bm25(1, 1, 10, 10, 15)).toEqual(3.2792079793859643);
expect(bm25(2, 1, 10, 10, 15) > bm25(1, 1, 10, 10, 15)).toBeTruthy();
expect(bm25(1, 1, 10, 10, 15) > bm25(2, 1, 10, 100, 15)).toBeTruthy();
expect(bm25(1, 1, 10, 10, 15) > bm25(1, 1, 10, 100, 15)).toBeTruthy();
});

View File

@@ -0,0 +1,32 @@
import { expect, test } from 'vitest';
import { highlighter } from '../highlighter';
test('highlighter', () => {
expect(highlighter('0123456789', '<b>', '</b>', [[3, 5]])).toEqual(
'012<b>34</b>56789'
);
expect(
highlighter(
'012345678901234567890123456789012345678901234567890123456789',
'<b>',
'</b>',
[[59, 60]]
)
).toEqual('...0123456789012345678901234567890123456789012345678<b>9</b>');
expect(
highlighter(
'012345678901234567890123456789012345678901234567890123456789',
'<b>',
'</b>',
[
[10, 11],
[49, 51],
]
)
).toEqual(
'0123456789<b>0</b>12345678901234567890123456789012345678<b>9</b>...'
);
});

View File

@@ -0,0 +1,128 @@
import { expect, test } from 'vitest';
import { GeneralTokenizer } from '../tokenizer';
test('tokenizer', () => {
{
const tokens = new GeneralTokenizer().tokenize('hello world,\n AFFiNE');
expect(tokens).toEqual([
{ term: 'hello', start: 0, end: 5 },
{ term: 'world', start: 7, end: 12 },
{ term: 'affine', start: 15, end: 21 },
]);
}
{
const tokens = new GeneralTokenizer().tokenize('你好世界,阿芬');
expect(tokens).toEqual([
{
end: 2,
start: 0,
term: '你好',
},
{
end: 3,
start: 1,
term: '好世',
},
{
end: 4,
start: 2,
term: '世界',
},
{
end: 7,
start: 5,
term: '阿芬',
},
]);
}
{
const tokens = new GeneralTokenizer().tokenize('1阿2芬');
expect(tokens).toEqual([
{ term: '1', start: 0, end: 1 },
{ term: '阿', start: 1, end: 2 },
{ term: '2', start: 2, end: 3 },
{ term: '芬', start: 3, end: 4 },
]);
}
{
const tokens = new GeneralTokenizer().tokenize('안녕하세요 세계');
expect(tokens).toEqual([
{
end: 2,
start: 0,
term: '안녕',
},
{
end: 3,
start: 1,
term: '녕하',
},
{
end: 4,
start: 2,
term: '하세',
},
{
end: 5,
start: 3,
term: '세요',
},
{
end: 8,
start: 6,
term: '세계',
},
]);
}
{
const tokens = new GeneralTokenizer().tokenize('ハローワールド');
expect(tokens).toEqual([
{ term: 'ハロ', start: 0, end: 2 },
{ term: 'ロー', start: 1, end: 3 },
{ term: 'ーワ', start: 2, end: 4 },
{ term: 'ワー', start: 3, end: 5 },
{ term: 'ール', start: 4, end: 6 },
{ term: 'ルド', start: 5, end: 7 },
]);
}
{
const tokens = new GeneralTokenizer().tokenize('はろーわーるど');
expect(tokens).toEqual([
{ term: 'はろ', start: 0, end: 2 },
{ term: 'ろー', start: 1, end: 3 },
{ term: 'ーわ', start: 2, end: 4 },
{ term: 'わー', start: 3, end: 5 },
{ term: 'ーる', start: 4, end: 6 },
{ term: 'るど', start: 5, end: 7 },
]);
}
{
const tokens = new GeneralTokenizer().tokenize('👋1⃣🚪👋🏿');
expect(tokens).toEqual([
{ term: '👋', start: 0, end: 2 },
{ term: '1⃣', start: 2, end: 5 },
{ term: '🚪', start: 5, end: 7 },
{ term: '👋🏿', start: 7, end: 11 },
]);
}
{
const tokens = new GeneralTokenizer().tokenize('1');
expect(tokens).toEqual([{ term: '1', start: 0, end: 2 }]);
}
});

View File

@@ -0,0 +1,62 @@
/**
* Parameters of the BM25+ scoring algorithm. Customizing these is almost never
* necessary, and fine-tuning them requires an understanding of the BM25 scoring
* model.
*
* Some information about BM25 (and BM25+) can be found at these links:
*
* - https://en.wikipedia.org/wiki/Okapi_BM25
* - https://opensourceconnections.com/blog/2015/10/16/bm25-the-next-generation-of-lucene-relevation/
*/
export type BM25Params = {
/**
* Term frequency saturation point.
*
* Recommended values are between `1.2` and `2`. Higher values increase the
* difference in score between documents with higher and lower term
* frequencies. Setting this to `0` or a negative value is invalid. Defaults
* to `1.2`
*/
k: number;
/**
* Length normalization impact.
*
* Recommended values are around `0.75`. Higher values increase the weight
* that field length has on scoring. Setting this to `0` (not recommended)
* means that the field length has no effect on scoring. Negative values are
* invalid. Defaults to `0.7`.
*/
b: number;
/**
* BM25+ frequency normalization lower bound (usually called δ).
*
* Recommended values are between `0.5` and `1`. Increasing this parameter
* increases the minimum relevance of one occurrence of a search term
* regardless of its (possibly very long) field length. Negative values are
* invalid. Defaults to `0.5`.
*/
d: number;
};
const defaultBM25params: BM25Params = { k: 1.2, b: 0.7, d: 0.5 };
export const bm25 = (
termFreq: number,
matchingCount: number,
totalCount: number,
fieldLength: number,
avgFieldLength: number,
bm25params: BM25Params = defaultBM25params
): number => {
const { k, b, d } = bm25params;
const invDocFreq = Math.log(
1 + (totalCount - matchingCount + 0.5) / (matchingCount + 0.5)
);
return (
invDocFreq *
(d +
(termFreq * (k + 1)) /
(termFreq + k * (1 - b + b * (fieldLength / avgFieldLength))))
);
};
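A quick numeric sanity check of the formula above, using the same inputs as the bm25 unit test earlier in this diff and the default parameters (k = 1.2, b = 0.7, d = 0.5):
// Worked example (values match the bm25 unit test earlier in this diff):
// termFreq = 1, matchingCount = 1, totalCount = 10, fieldLength = 10, avgFieldLength = 15
//
//   idf   = ln(1 + (10 - 1 + 0.5) / (1 + 0.5))     ≈ 1.9924
//   norm  = 1 - 0.7 + 0.7 * (10 / 15)              ≈ 0.7667
//   tf    = (1 * (1.2 + 1)) / (1 + 1.2 * 0.7667)   ≈ 1.1458
//   score = idf * (0.5 + tf)                       ≈ 3.2792
const score = bm25(1, 1, 10, 10, 15); // 3.2792079793859643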

View File

@@ -0,0 +1,493 @@
import { type IDBPDatabase, type IDBPTransaction, type StoreNames } from 'idb';
import {
type AggregateOptions,
type AggregateResult,
type IndexerDocument,
type IndexerFieldSchema,
IndexerSchema,
type Query,
type SearchOptions,
type SearchResult,
} from '../../../storage';
import type { DocStorageSchema } from '../schema';
import { highlighter } from './highlighter';
import {
BooleanInvertedIndex,
FullTextInvertedIndex,
IntegerInvertedIndex,
type InvertedIndex,
StringInvertedIndex,
} from './inverted-index';
import { Match } from './match';
export type DataStructRWTransaction = IDBPTransaction<
DocStorageSchema,
ArrayLike<StoreNames<DocStorageSchema>>,
'readwrite'
>;
export type DataStructROTransaction = IDBPTransaction<
DocStorageSchema,
ArrayLike<StoreNames<DocStorageSchema>>,
'readonly' | 'readwrite'
>;
export class DataStruct {
database: IDBPDatabase<DocStorageSchema> = null as any;
invertedIndex = new Map<string, Map<string, InvertedIndex>>();
constructor() {
for (const [tableName, table] of Object.entries(IndexerSchema)) {
const tableInvertedIndex = new Map<string, InvertedIndex>();
for (const [fieldName, type] of Object.entries(table)) {
const typeInfo: IndexerFieldSchema =
typeof type === 'string' ? { type } : type;
if ('index' in typeInfo && typeInfo.index === false) {
// If index is false, we don't need to create an inverted index for this field.
continue;
}
if (typeInfo.type === 'String') {
tableInvertedIndex.set(
fieldName,
new StringInvertedIndex(tableName, fieldName)
);
} else if (typeInfo.type === 'Integer') {
tableInvertedIndex.set(
fieldName,
new IntegerInvertedIndex(tableName, fieldName)
);
} else if (typeInfo.type === 'FullText') {
tableInvertedIndex.set(
fieldName,
new FullTextInvertedIndex(tableName, fieldName)
);
} else if (typeInfo.type === 'Boolean') {
tableInvertedIndex.set(
fieldName,
new BooleanInvertedIndex(tableName, fieldName)
);
} else {
throw new Error(`Field type '${typeInfo.type}' not supported`);
}
}
this.invertedIndex.set(tableName, tableInvertedIndex);
}
}
private async update(
trx: DataStructRWTransaction,
table: keyof IndexerSchema,
document: IndexerDocument
) {
const existsNid = await trx
.objectStore('indexerRecords')
.index('id')
.getKey([table, document.id]);
const exists = existsNid
? await trx.objectStore('indexerRecords').get(existsNid)
: null;
if (!existsNid || !exists) {
// the document does not exist yet; nothing to update
return;
}
// delete the existing record first
await this.deleteByNid(trx, existsNid);
const dataMap = new Map([...exists.data, ...document.fields]); // merge existing data with the new fields
const nid = await trx
.objectStore('indexerRecords')
.put({ table, id: document.id, data: dataMap });
for (const [key, values] of dataMap) {
const type = IndexerSchema[table][
key as keyof IndexerSchema[typeof table]
] as IndexerFieldSchema;
if (!type) {
continue;
}
const typeInfo = typeof type === 'string' ? { type } : type;
if (typeInfo.index !== false) {
// If index is false, the field will not be indexed
const iidx = this.invertedIndex.get(table)?.get(key);
if (!iidx) {
continue;
}
await iidx.insert(trx, nid, values);
}
}
}
private async insert(
trx: DataStructRWTransaction,
table: keyof IndexerSchema,
document: IndexerDocument
) {
const existsNid = await trx
.objectStore('indexerRecords')
.index('id')
.getKey([table, document.id]);
if (existsNid) {
// delete the existing record first
await this.deleteByNid(trx, existsNid);
}
const dataMap = document.fields;
const nid = await trx
.objectStore('indexerRecords')
.put({ table, id: document.id, data: dataMap });
for (const [key, values] of dataMap) {
const type = IndexerSchema[table][
key as keyof IndexerSchema[typeof table]
] as IndexerFieldSchema;
if (!type) {
continue;
}
const typeInfo = typeof type === 'string' ? { type } : type;
if (typeInfo.index !== false) {
// If index is false, the field will not be indexed
const iidx = this.invertedIndex.get(table)?.get(key);
if (!iidx) {
continue;
}
await iidx.insert(trx, nid, values);
}
}
}
private async deleteByNid(trx: DataStructRWTransaction, nid: number) {
await trx.objectStore('indexerRecords').delete(nid);
const indexIds = await trx
.objectStore('invertedIndex')
.index('nid')
.getAllKeys(nid);
for (const indexId of indexIds) {
await trx.objectStore('invertedIndex').delete(indexId);
}
}
private async delete(
trx: DataStructRWTransaction,
table: keyof IndexerSchema,
id: string
) {
const nid = await trx
.objectStore('indexerRecords')
.index('id')
.getKey([table, id]);
if (nid) {
await this.deleteByNid(trx, nid);
} else {
return;
}
}
async deleteByQuery(
trx: DataStructRWTransaction,
table: keyof IndexerSchema,
query: Query<any>
) {
const match = await this.queryRaw(trx, table, query);
for (const nid of match.scores.keys()) {
await this.deleteByNid(trx, nid);
}
}
async batchWrite(
trx: DataStructRWTransaction,
table: keyof IndexerSchema,
deleteByQueries: Query<any>[],
deletes: string[],
inserts: IndexerDocument<any>[],
updates: IndexerDocument<any>[]
) {
for (const query of deleteByQueries) {
await this.deleteByQuery(trx, table, query);
}
for (const del of deletes) {
await this.delete(trx, table, del);
}
for (const inst of inserts) {
await this.insert(trx, table, inst);
}
for (const update of updates) {
await this.update(trx, table, update);
}
}
async matchAll(
trx: DataStructROTransaction,
table: keyof IndexerSchema
): Promise<Match> {
const allNids = await trx
.objectStore('indexerRecords')
.index('table')
.getAllKeys(table);
const match = new Match();
for (const nid of allNids) {
match.addScore(nid, 1);
}
return match;
}
async queryRaw(
trx: DataStructROTransaction,
table: keyof IndexerSchema,
query: Query<any>
): Promise<Match> {
if (query.type === 'match') {
const iidx = this.invertedIndex.get(table)?.get(query.field as string);
if (!iidx) {
return new Match();
}
return await iidx.match(trx, query.match);
} else if (query.type === 'boolean') {
const weights = [];
for (const q of query.queries) {
weights.push(await this.queryRaw(trx, table, q));
}
if (query.occur === 'must') {
return weights.reduce((acc, w) => acc.and(w));
} else if (query.occur === 'must_not') {
const total = weights.reduce((acc, w) => acc.and(w));
return (await this.matchAll(trx, table)).exclude(total);
} else if (query.occur === 'should') {
return weights.reduce((acc, w) => acc.or(w));
}
} else if (query.type === 'all') {
return await this.matchAll(trx, table);
} else if (query.type === 'boost') {
return (await this.queryRaw(trx, table, query.query)).boost(query.boost);
} else if (query.type === 'exists') {
const iidx = this.invertedIndex.get(table)?.get(query.field as string);
if (!iidx) {
return new Match();
}
return await iidx.all(trx);
}
throw new Error(`Query type '${query.type}' not supported`);
}
async clear(trx: DataStructRWTransaction) {
await trx.objectStore('indexerRecords').clear();
await trx.objectStore('invertedIndex').clear();
await trx.objectStore('indexerMetadata').clear();
}
async search(
trx: DataStructROTransaction,
table: keyof IndexerSchema,
query: Query<any>,
options: SearchOptions<any> = {}
): Promise<SearchResult<any, any>> {
const pagination = {
skip: options.pagination?.skip ?? 0,
limit: options.pagination?.limit ?? 100,
};
const match = await this.queryRaw(trx, table, query);
const nids = match
.toArray()
.slice(pagination.skip, pagination.skip + pagination.limit);
const nodes = [];
for (const nid of nids) {
const record = await trx.objectStore('indexerRecords').get(nid);
if (!record) {
continue;
}
nodes.push(this.resultNode(record, options, match, nid));
}
return {
pagination: {
count: match.size(),
hasMore: match.size() > pagination.limit + pagination.skip,
limit: pagination.limit,
skip: pagination.skip,
},
nodes: nodes,
};
}
async aggregate(
trx: DataStructROTransaction,
table: keyof IndexerSchema,
query: Query<any>,
field: string,
options: AggregateOptions<any> = {}
): Promise<AggregateResult<any, any>> {
const pagination = {
skip: options.pagination?.skip ?? 0,
limit: options.pagination?.limit ?? 100,
};
const hitPagination = options.hits
? {
skip: options.hits.pagination?.skip ?? 0,
limit: options.hits.pagination?.limit ?? 3,
}
: { skip: 0, limit: 0 };
const match = await this.queryRaw(trx, table, query);
const nids = match.toArray();
const buckets: {
key: string;
nids: number[];
hits: SearchResult<any, any>['nodes'];
}[] = [];
for (const nid of nids) {
const record = await trx.objectStore('indexerRecords').get(nid);
if (!record) {
continue;
}
const values = record.data.get(field);
for (const value of values ?? []) {
let bucket;
let bucketIndex = buckets.findIndex(b => b.key === value);
if (bucketIndex === -1) {
bucket = { key: value, nids: [], hits: [] };
buckets.push(bucket);
bucketIndex = buckets.length - 1;
} else {
bucket = buckets[bucketIndex];
}
if (
bucketIndex >= pagination.skip &&
bucketIndex < pagination.skip + pagination.limit
) {
bucket.nids.push(nid);
if (
bucket.nids.length - 1 >= hitPagination.skip &&
bucket.nids.length - 1 < hitPagination.skip + hitPagination.limit
) {
bucket.hits.push(
this.resultNode(record, options.hits ?? {}, match, nid)
);
}
}
}
}
return {
buckets: buckets
.slice(pagination.skip, pagination.skip + pagination.limit)
.map(bucket => {
const result = {
key: bucket.key,
score: match.getScore(bucket.nids[0]),
count: bucket.nids.length,
} as AggregateResult<any, any>['buckets'][number];
if (options.hits) {
(result as any).hits = {
pagination: {
count: bucket.nids.length,
hasMore:
bucket.nids.length > hitPagination.limit + hitPagination.skip,
limit: hitPagination.limit,
skip: hitPagination.skip,
},
nodes: bucket.hits,
} as SearchResult<any, any>;
}
return result;
}),
pagination: {
count: buckets.length,
hasMore: buckets.length > pagination.limit + pagination.skip,
limit: pagination.limit,
skip: pagination.skip,
},
};
}
async readonly(database: IDBPDatabase<DocStorageSchema>) {
return database.transaction(
['indexerRecords', 'invertedIndex', 'indexerMetadata'],
'readonly',
{ durability: 'relaxed' }
);
}
async readwrite(database: IDBPDatabase<DocStorageSchema>) {
return database.transaction(
['indexerRecords', 'invertedIndex', 'indexerMetadata'],
'readwrite',
{ durability: 'relaxed' }
);
}
private resultNode(
record: { id: string; data: Map<string, string[]> },
options: SearchOptions<any>,
match?: Match,
nid?: number
): SearchResult<any, any>['nodes'][number] {
const node = {
id: record.id,
score: match && nid ? match.getScore(nid) : 1,
} as any;
if (options.fields) {
const fields = {} as Record<string, string | string[]>;
for (const field of options.fields as string[]) {
fields[field] = record.data.get(field) ?? [''];
if (fields[field].length === 1) {
fields[field] = fields[field][0];
}
}
node.fields = fields;
}
if (match && nid && options.highlights) {
const highlights = {} as Record<string, string[]>;
for (const { field, before, end } of options.highlights) {
const highlightValues = match.getHighlighters(nid, field);
if (highlightValues) {
const rawValues = record.data.get(field) ?? [];
highlights[field] = Array.from(highlightValues)
.map(([index, ranges]) => {
const raw = rawValues[index];
if (raw) {
return (
highlighter(raw, before, end, ranges, {
maxPrefix: 20,
maxLength: 50,
}) ?? ''
);
}
return '';
})
.filter(Boolean);
}
}
node.highlights = highlights;
}
return node;
}
}

View File

@@ -0,0 +1,77 @@
export function highlighter(
originText: string,
before: string,
after: string,
matches: [number, number][],
{
maxLength = 50,
maxPrefix = 20,
}: { maxLength?: number; maxPrefix?: number } = {}
) {
const merged = mergeRanges(matches);
if (merged.length === 0) {
return null;
}
const firstMatch = merged[0][0];
const start = Math.max(
0,
Math.min(firstMatch - maxPrefix, originText.length - maxLength)
);
const end = Math.min(start + maxLength, originText.length);
const text = originText.substring(start, end);
let result = '';
let pointer = 0;
for (const match of merged) {
const matchStart = match[0] - start;
const matchEnd = match[1] - start;
if (matchStart >= text.length) {
break;
}
result += text.substring(pointer, matchStart);
pointer = matchStart;
const highlighted = text.substring(matchStart, matchEnd);
if (highlighted.length === 0) {
continue;
}
result += `${before}${highlighted}${after}`;
pointer = matchEnd;
}
result += text.substring(pointer);
if (start > 0) {
result = `...${result}`;
}
if (end < originText.length) {
result = `${result}...`;
}
return result;
}
function mergeRanges(intervals: [number, number][]) {
if (intervals.length === 0) return [];
intervals.sort((a, b) => a[0] - b[0]);
const merged = [intervals[0]];
for (let i = 1; i < intervals.length; i++) {
const last = merged[merged.length - 1];
const current = intervals[i];
if (current[0] <= last[1]) {
last[1] = Math.max(last[1], current[1]);
} else {
merged.push(current);
}
}
return merged;
}

View File

@@ -0,0 +1,203 @@
import { merge, Observable, of, Subject, throttleTime } from 'rxjs';
import type {
AggregateOptions,
AggregateResult,
IndexerDocument,
IndexerSchema,
Query,
SearchOptions,
SearchResult,
} from '../../../storage';
import { IndexerStorageBase } from '../../../storage';
import { IDBConnection, type IDBConnectionOptions } from '../db';
import { DataStruct } from './data-struct';
import { backoffRetry, exhaustMapWithTrailing, fromPromise } from './utils';
export class IndexedDBIndexerStorage extends IndexerStorageBase {
static readonly identifier = 'IndexedDBIndexerStorage';
readonly connection = new IDBConnection(this.options);
override isReadonly = false;
private readonly data = new DataStruct();
private readonly tableUpdate$ = new Subject<string>();
/**
* Write operations on IndexedDBIndexerStorage are first buffered in `pendingUpdates`,
* then committed to IndexedDB in a single batch when `refresh` is called.
*/
private readonly pendingUpdates: Record<
keyof IndexerSchema,
{
deleteByQueries: Query<any>[];
deletes: string[];
inserts: IndexerDocument[];
updates: IndexerDocument[];
}
> = {
doc: { deleteByQueries: [], deletes: [], inserts: [], updates: [] },
block: { deleteByQueries: [], deletes: [], inserts: [], updates: [] },
};
get channel() {
return this.connection.inner.channel;
}
get database() {
return this.connection.inner.db;
}
constructor(private readonly options: IDBConnectionOptions) {
super();
}
override async search<
T extends keyof IndexerSchema,
const O extends SearchOptions<T>,
>(table: T, query: Query<T>, options?: O): Promise<SearchResult<T, O>> {
const trx = await this.data.readonly(this.database);
return this.data.search(trx, table, query, options);
}
override async aggregate<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Promise<AggregateResult<T, O>> {
const trx = await this.data.readonly(this.database);
return this.data.aggregate(trx, table, query, field as string, options);
}
override search$<
T extends keyof IndexerSchema,
const O extends SearchOptions<T>,
>(table: T, query: Query<T>, options?: O): Observable<SearchResult<T, O>> {
return merge(of(1), this.watchTableUpdated(table)).pipe(
throttleTime(3000, undefined, { leading: true, trailing: true }),
exhaustMapWithTrailing(() => {
return fromPromise(async () => {
try {
const trx = await this.data.readonly(this.database);
return await this.data.search(trx, table, query, options);
} catch (error) {
console.error('search error', error);
throw error;
}
}).pipe(backoffRetry());
})
);
}
override aggregate$<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Observable<AggregateResult<T, O>> {
return merge(of(1), this.watchTableUpdated(table)).pipe(
throttleTime(3000, undefined, { leading: true, trailing: true }),
exhaustMapWithTrailing(() => {
return fromPromise(async () => {
try {
const trx = await this.data.readonly(this.database);
return await this.data.aggregate(
trx,
table,
query,
field as string,
options
);
} catch (error) {
console.error('aggregate error', error);
throw error;
}
}).pipe(backoffRetry());
})
);
}
override async deleteByQuery<T extends keyof IndexerSchema>(
table: T,
query: Query<T>
): Promise<void> {
this.pendingUpdates[table].deleteByQueries.push(query);
}
override insert<T extends keyof IndexerSchema>(
table: T,
document: IndexerDocument
): Promise<void> {
this.pendingUpdates[table].inserts.push(document);
return Promise.resolve();
}
override delete<T extends keyof IndexerSchema>(
table: T,
id: string
): Promise<void> {
this.pendingUpdates[table].deletes.push(id);
return Promise.resolve();
}
override update<T extends keyof IndexerSchema>(
table: T,
document: IndexerDocument
): Promise<void> {
this.pendingUpdates[table].updates.push(document);
return Promise.resolve();
}
override async refresh<T extends keyof IndexerSchema>(
table: T
): Promise<void> {
const trx = await this.data.readwrite(this.database);
const tables = table ? [table] : (['doc', 'block'] as const);
for (const table of tables) {
await this.data.batchWrite(
trx,
table,
this.pendingUpdates[table].deleteByQueries,
this.pendingUpdates[table].deletes,
this.pendingUpdates[table].inserts,
this.pendingUpdates[table].updates
);
this.pendingUpdates[table] = {
deleteByQueries: [],
deletes: [],
inserts: [],
updates: [],
};
}
this.emitTableUpdated(table);
}
private watchTableUpdated(table: keyof IndexerSchema) {
return new Observable(subscriber => {
const listener = (ev: MessageEvent) => {
if (ev.data.type === 'indexer-updated' && ev.data.table === table) {
subscriber.next(1);
}
};
const subscription = this.tableUpdate$.subscribe(updatedTable => {
if (updatedTable === table) {
subscriber.next(1);
}
});
this.channel.addEventListener('message', listener);
return () => {
this.channel.removeEventListener('message', listener);
subscription.unsubscribe();
};
});
}
emitTableUpdated(table: keyof IndexerSchema) {
this.tableUpdate$.next(table);
this.channel.postMessage({ type: 'indexer-updated', table });
}
}
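A hedged usage sketch of the buffering behaviour described above: writes only reach IndexedDB once refresh() commits the batch, after which search$/aggregate$ subscribers re-query. The options value and the document fields are illustrative; IndexerDocument and the 'doc' table come from the storage schema added elsewhere in this PR.
// Hedged sketch; `options` stands for a valid IDBConnectionOptions (defined in ./db).
declare const options: IDBConnectionOptions;
const indexer = new IndexedDBIndexerStorage(options);

// Buffered in pendingUpdates only; nothing is written to IndexedDB yet.
await indexer.insert(
  'doc',
  IndexerDocument.from<'doc'>('doc-1', { docId: 'doc-1', title: 'Hello AFFiNE' })
);

// Commits the batch in one readwrite transaction and notifies other tabs via the channel.
await indexer.refresh('doc');

// Re-query; search$('doc', ...) subscribers are refreshed the same way after each commit.
const result = await indexer.search('doc', {
  type: 'match',
  field: 'title',
  match: 'hello',
});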

View File

@@ -0,0 +1,496 @@
import { bm25 } from './bm25';
import type {
DataStructROTransaction,
DataStructRWTransaction,
} from './data-struct';
import { Match } from './match';
import { GeneralTokenizer, type Token } from './tokenizer';
export interface InvertedIndex {
fieldKey: string;
match(trx: DataStructROTransaction, term: string): Promise<Match>;
all(trx: DataStructROTransaction): Promise<Match>;
insert(
trx: DataStructRWTransaction,
id: number,
terms: string[]
): Promise<void>;
}
export class StringInvertedIndex implements InvertedIndex {
constructor(
readonly table: string,
readonly fieldKey: string
) {}
async match(trx: DataStructROTransaction, term: string): Promise<Match> {
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll([
this.table,
InvertedIndexKey.forString(this.fieldKey, term).buffer(),
]);
const match = new Match();
for (const obj of objs) {
match.addScore(obj.nid, 1);
}
return match;
}
async all(trx: DataStructROTransaction): Promise<Match> {
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll(
IDBKeyRange.bound(
[this.table, InvertedIndexKey.forPrefix(this.fieldKey).buffer()],
[
this.table,
InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer(),
]
)
);
const set = new Set<number>();
for (const obj of objs) {
set.add(obj.nid);
}
const match = new Match();
for (const nid of set) {
match.addScore(nid, 1);
}
return match;
}
async insert(trx: DataStructRWTransaction, id: number, terms: string[]) {
for (const term of terms) {
await trx.objectStore('invertedIndex').put({
table: this.table,
key: InvertedIndexKey.forString(this.fieldKey, term).buffer(),
nid: id,
});
}
}
}
export class IntegerInvertedIndex implements InvertedIndex {
constructor(
readonly table: string,
readonly fieldKey: string
) {}
async match(trx: DataStructROTransaction, term: string): Promise<Match> {
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll([
this.table,
InvertedIndexKey.forInt64(this.fieldKey, BigInt(term)).buffer(),
]);
const match = new Match();
for (const obj of objs) {
match.addScore(obj.nid, 1);
}
return match;
}
// eslint-disable-next-line sonarjs/no-identical-functions
async all(trx: DataStructROTransaction): Promise<Match> {
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll(
IDBKeyRange.bound(
[this.table, InvertedIndexKey.forPrefix(this.fieldKey).buffer()],
[
this.table,
InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer(),
]
)
);
const set = new Set<number>();
for (const obj of objs) {
set.add(obj.nid);
}
const match = new Match();
for (const nid of set) {
match.addScore(nid, 1);
}
return match;
}
async insert(trx: DataStructRWTransaction, id: number, terms: string[]) {
for (const term of terms) {
await trx.objectStore('invertedIndex').put({
table: this.table,
key: InvertedIndexKey.forInt64(this.fieldKey, BigInt(term)).buffer(),
nid: id,
});
}
}
}
export class BooleanInvertedIndex implements InvertedIndex {
constructor(
readonly table: string,
readonly fieldKey: string
) {}
// eslint-disable-next-line sonarjs/no-identical-functions
async all(trx: DataStructROTransaction): Promise<Match> {
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll(
IDBKeyRange.bound(
[this.table, InvertedIndexKey.forPrefix(this.fieldKey).buffer()],
[
this.table,
InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer(),
]
)
);
const set = new Set<number>();
for (const obj of objs) {
set.add(obj.nid);
}
const match = new Match();
for (const nid of set) {
match.addScore(nid, 1);
}
return match;
}
async match(trx: DataStructROTransaction, term: string): Promise<Match> {
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll([
this.table,
InvertedIndexKey.forBoolean(this.fieldKey, term === 'true').buffer(),
]);
const match = new Match();
for (const obj of objs) {
match.addScore(obj.nid, 1);
}
return match;
}
async insert(trx: DataStructRWTransaction, id: number, terms: string[]) {
for (const term of terms) {
await trx.objectStore('invertedIndex').put({
table: this.table,
key: InvertedIndexKey.forBoolean(
this.fieldKey,
term === 'true'
).buffer(),
nid: id,
});
}
}
}
export class FullTextInvertedIndex implements InvertedIndex {
constructor(
readonly table: string,
readonly fieldKey: string
) {}
async match(trx: DataStructROTransaction, term: string): Promise<Match> {
const queryTokens = new GeneralTokenizer().tokenize(term);
const matched = new Map<
number,
Map<
number, // index
{
score: number;
ranges: [number, number][];
}
>
>();
const avgFieldLength =
(
await trx
.objectStore('indexerMetadata')
.get(`full-text:avg-field-length:${this.table}:${this.fieldKey}`)
)?.value ?? 0;
for (const token of queryTokens) {
const key = InvertedIndexKey.forString(this.fieldKey, token.term);
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll(
IDBKeyRange.bound(
[this.table, key.buffer()],
[this.table, key.add1().buffer()],
false,
true
)
);
const submatched: {
nid: number;
score: number;
position: {
index: number;
ranges: [number, number][];
};
}[] = [];
for (const obj of objs) {
const key = InvertedIndexKey.fromBuffer(obj.key);
const originTokenTerm = key.asString();
const matchLength = token.term.length;
const position = obj.pos ?? {
i: 0,
l: 0,
rs: [],
};
const termFreq = position.rs.length;
const totalCount = objs.length;
const fieldLength = position.l;
const score =
bm25(termFreq, 1, totalCount, fieldLength, avgFieldLength) *
(matchLength / originTokenTerm.length);
const match = {
score,
positions: new Map(),
};
const ranges = match.positions.get(position.i) || [];
ranges.push(
...position.rs.map(([start, _end]) => [start, start + matchLength])
);
match.positions.set(position.i, ranges);
submatched.push({
nid: obj.nid,
score,
position: {
index: position.i,
ranges: position.rs.map(([start, _end]) => [
start,
start + matchLength,
]),
},
});
}
// normalize score
const maxScore = submatched.reduce((acc, s) => Math.max(acc, s.score), 0);
const minScore = submatched.reduce((acc, s) => Math.min(acc, s.score), 0);
for (const { nid, score, position } of submatched) {
const normalizedScore =
maxScore === minScore
? score
: (score - minScore) / (maxScore - minScore);
const match =
matched.get(nid) ??
new Map<
number, // index
{
score: number;
ranges: [number, number][];
}
>();
const item = match.get(position.index) || {
score: 0,
ranges: [],
};
item.score += normalizedScore;
item.ranges.push(...position.ranges);
match.set(position.index, item);
matched.set(nid, match);
}
}
const match = new Match();
for (const [nid, items] of matched) {
if (items.size === 0) {
break;
}
let highestScore = -1;
let highestIndex = -1;
let highestRanges: [number, number][] = [];
for (const [index, { score, ranges }] of items) {
if (score > highestScore) {
highestScore = score;
highestIndex = index;
highestRanges = ranges;
}
}
match.addScore(nid, highestScore);
match.addHighlighter(nid, this.fieldKey, highestIndex, highestRanges);
}
return match;
}
// eslint-disable-next-line sonarjs/no-identical-functions
async all(trx: DataStructROTransaction): Promise<Match> {
const objs = await trx
.objectStore('invertedIndex')
.index('key')
.getAll(
IDBKeyRange.bound(
[this.table, InvertedIndexKey.forPrefix(this.fieldKey).buffer()],
[
this.table,
InvertedIndexKey.forPrefix(this.fieldKey).add1().buffer(),
]
)
);
const set = new Set<number>();
for (const obj of objs) {
set.add(obj.nid);
}
const match = new Match();
for (const nid of set) {
match.addScore(nid, 1);
}
return match;
}
async insert(trx: DataStructRWTransaction, id: number, terms: string[]) {
for (let i = 0; i < terms.length; i++) {
const tokenMap = new Map<string, Token[]>();
const originString = terms[i];
const tokens = new GeneralTokenizer().tokenize(originString);
for (const token of tokens) {
const tokens = tokenMap.get(token.term) || [];
tokens.push(token);
tokenMap.set(token.term, tokens);
}
for (const [term, tokens] of tokenMap) {
await trx.objectStore('invertedIndex').put({
table: this.table,
key: InvertedIndexKey.forString(this.fieldKey, term).buffer(),
nid: id,
pos: {
l: originString.length,
i: i,
rs: tokens.map(token => [token.start, token.end]),
},
});
}
const indexerMetadataStore = trx.objectStore('indexerMetadata');
// update avg-field-length
const totalCount =
(
await indexerMetadataStore.get(
`full-text:field-count:${this.table}:${this.fieldKey}`
)
)?.value ?? 0;
const avgFieldLength =
(
await indexerMetadataStore.get(
`full-text:avg-field-length:${this.table}:${this.fieldKey}`
)
)?.value ?? 0;
await indexerMetadataStore.put({
key: `full-text:field-count:${this.table}:${this.fieldKey}`,
value: totalCount + 1,
});
await indexerMetadataStore.put({
key: `full-text:avg-field-length:${this.table}:${this.fieldKey}`,
value:
avgFieldLength +
(terms.reduce((acc, term) => acc + term.length, 0) - avgFieldLength) /
(totalCount + 1),
});
}
}
}
export class InvertedIndexKey {
constructor(
readonly field: Uint8Array,
readonly value: Uint8Array,
readonly gap: Uint8Array = new Uint8Array([58])
) {}
asString() {
return new TextDecoder().decode(this.value);
}
asInt64() {
return new DataView(this.value.buffer).getBigInt64(
0,
false
); /* big-endian */
}
add1() {
if (this.value.byteLength > 0) {
const bytes = new Uint8Array(this.value.slice(0));
let carry = 1;
for (let i = bytes.length - 1; i >= 0 && carry > 0; i--) {
const sum = bytes[i] + carry;
bytes[i] = sum % 256;
carry = sum >> 8;
}
return new InvertedIndexKey(this.field, bytes);
} else {
return new InvertedIndexKey(
this.field,
new Uint8Array(0),
new Uint8Array([59])
);
}
}
static forPrefix(field: string) {
return new InvertedIndexKey(
new TextEncoder().encode(field),
new Uint8Array(0)
);
}
static forString(field: string, value: string) {
return new InvertedIndexKey(
new TextEncoder().encode(field),
new TextEncoder().encode(value)
);
}
static forBoolean(field: string, value: boolean) {
const bytes = new Uint8Array(1);
bytes.set([value ? 1 : 0]);
return new InvertedIndexKey(new TextEncoder().encode(field), bytes);
}
static forInt64(field: string, value: bigint) {
const bytes = new Uint8Array(8);
new DataView(bytes.buffer).setBigInt64(0, value, false); /* big-endian */
return new InvertedIndexKey(new TextEncoder().encode(field), bytes);
}
buffer() {
const tmp = new Uint8Array(
this.field.byteLength + (this.value?.byteLength ?? 0) + 1
);
tmp.set(new Uint8Array(this.field), 0);
tmp.set(new Uint8Array(this.gap), this.field.byteLength);
if (this.value.byteLength > 0) {
tmp.set(new Uint8Array(this.value), this.field.byteLength + 1);
}
return tmp.buffer;
}
static fromBuffer(buffer: ArrayBuffer) {
const array = new Uint8Array(buffer);
const fieldLength = array.indexOf(58);
const field = array.slice(0, fieldLength);
const value = array.slice(fieldLength + 1);
return new InvertedIndexKey(field, value);
}
}
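The keys stored above are laid out as field bytes + 0x3a (':') + value bytes; forPrefix(field).add1() swaps the separator for 0x3b (';'), which is how the all() methods build an IDBKeyRange covering every key of one field. A small hedged illustration:
// Hedged illustration of the key layout and the prefix-range trick used by all().
const exact = InvertedIndexKey.forString('content', 'hello').buffer();
// bytes: 'c' 'o' 'n' 't' 'e' 'n' 't' 0x3a 'h' 'e' 'l' 'l' 'o'

const lower = InvertedIndexKey.forPrefix('content').buffer();        // 'content' + 0x3a
const upper = InvertedIndexKey.forPrefix('content').add1().buffer(); // 'content' + 0x3b
// Every key written for the 'content' field sorts between the two bounds:
const range = IDBKeyRange.bound(['block', lower], ['block', upper]);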

View File

@@ -0,0 +1,105 @@
export class Match {
scores = new Map<number, number>();
/**
* nid -> field -> index(multi value field) -> [start, end][]
*/
highlighters = new Map<
number,
Map<string, Map<number, [number, number][]>>
>();
constructor() {}
size() {
return this.scores.size;
}
getScore(id: number) {
return this.scores.get(id) ?? 0;
}
addScore(id: number, score: number) {
const currentScore = this.scores.get(id) || 0;
this.scores.set(id, currentScore + score);
}
getHighlighters(id: number, field: string) {
return this.highlighters.get(id)?.get(field);
}
addHighlighter(
id: number,
field: string,
index: number,
newRanges: [number, number][]
) {
const fields =
this.highlighters.get(id) ||
new Map<string, Map<number, [number, number][]>>();
const values = fields.get(field) || new Map<number, [number, number][]>();
const ranges = values.get(index) || [];
ranges.push(...newRanges);
values.set(index, ranges);
fields.set(field, values);
this.highlighters.set(id, fields);
}
and(other: Match) {
const newWeight = new Match();
for (const [id, score] of this.scores) {
if (other.scores.has(id)) {
newWeight.addScore(id, score + (other.scores.get(id) ?? 0));
newWeight.copyExtData(this, id);
newWeight.copyExtData(other, id);
}
}
return newWeight;
}
or(other: Match) {
const newWeight = new Match();
for (const [id, score] of this.scores) {
newWeight.addScore(id, score);
newWeight.copyExtData(this, id);
}
for (const [id, score] of other.scores) {
newWeight.addScore(id, score);
newWeight.copyExtData(other, id);
}
return newWeight;
}
exclude(other: Match) {
const newWeight = new Match();
for (const [id, score] of this.scores) {
if (!other.scores.has(id)) {
newWeight.addScore(id, score);
newWeight.copyExtData(this, id);
}
}
return newWeight;
}
boost(boost: number) {
const newWeight = new Match();
for (const [id, score] of this.scores) {
newWeight.addScore(id, score * boost);
newWeight.copyExtData(this, id);
}
return newWeight;
}
toArray() {
return Array.from(this.scores.entries())
.sort((a, b) => b[1] - a[1])
.map(e => e[0]);
}
private copyExtData(from: Match, id: number) {
for (const [field, values] of from.highlighters.get(id) ?? []) {
for (const [index, ranges] of values) {
this.addHighlighter(id, field, index, ranges);
}
}
}
}
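A hedged sketch of how DataStruct.queryRaw combines these Match objects: and keeps ids present on both sides and sums their scores, or merges everything, exclude backs must_not, and boost scales scores; toArray then orders ids by descending score.
// Hedged sketch of the combination semantics used by boolean queries.
const a = new Match();
a.addScore(1, 1);
a.addScore(2, 1);

const b = new Match();
b.addScore(2, 0.5);

a.and(b).getScore(2);     // 1.5 (id 2 is on both sides, scores are summed)
a.or(b).getScore(1);      // 1   (union keeps every id)
a.exclude(b).getScore(2); // 0   (id 2 removed; this backs `must_not`)
a.boost(2).getScore(1);   // 2   (scores scaled; this backs `boost` queries)
a.toArray();              // ids ordered by descending score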

View File

@@ -0,0 +1,162 @@
import Graphemer from 'graphemer';
export interface Tokenizer {
tokenize(text: string): Token[];
}
export interface Token {
term: string;
start: number;
end: number;
}
export class SimpleTokenizer implements Tokenizer {
tokenize(text: string): Token[] {
const tokens: Token[] = [];
let start = 0;
let end = 0;
let inWord = false;
for (let i = 0; i < text.length; i++) {
const c = text[i];
if (c.match(/[\n\r\p{Z}\p{P}]/u)) {
if (inWord) {
end = i;
tokens.push({
term: text.substring(start, end).toLowerCase(),
start,
end,
});
inWord = false;
}
} else {
if (!inWord) {
start = i;
end = i;
inWord = true;
}
}
}
if (inWord) {
tokens.push({
term: text.substring(start).toLowerCase(),
start,
end: text.length,
});
}
return tokens;
}
}
export class NGramTokenizer implements Tokenizer {
constructor(private readonly n: number) {}
tokenize(text: string): Token[] {
const splitted: Token[] = [];
for (let i = 0; i < text.length; ) {
const nextBreak = Graphemer.nextBreak(text, i);
const c = text.substring(i, nextBreak);
splitted.push({
term: c,
start: i,
end: nextBreak,
});
i = nextBreak;
}
const tokens: Token[] = [];
for (let i = 0; i < splitted.length - this.n + 1; i++) {
tokens.push(
splitted.slice(i, i + this.n).reduce(
(acc, t) => ({
term: acc.term + t.term,
start: Math.min(acc.start, t.start),
end: Math.max(acc.end, t.end),
}),
{ term: '', start: Infinity, end: -Infinity }
)
);
}
return tokens;
}
}
export class GeneralTokenizer implements Tokenizer {
constructor() {}
tokenizeWord(word: string, lang: string): Token[] {
if (lang === 'en') {
return [{ term: word.toLowerCase(), start: 0, end: word.length }];
} else if (lang === 'cjk') {
if (word.length < 3) {
return [{ term: word, start: 0, end: word.length }];
}
return new NGramTokenizer(2).tokenize(word);
} else if (lang === 'emoji') {
return new NGramTokenizer(1).tokenize(word);
} else if (lang === '-') {
return [];
}
throw new Error('Not implemented');
}
testLang(c: string): string {
if (c.match(/[\p{Emoji}]/u)) {
return 'emoji';
} else if (c.match(/[\p{sc=Han}\p{scx=Hira}\p{scx=Kana}\p{sc=Hang}]/u)) {
return 'cjk';
} else if (c.match(/[\n\r\p{Z}\p{P}]/u)) {
return '-';
} else {
return 'en';
}
}
tokenize(text: string): Token[] {
const tokens: Token[] = [];
let start = 0;
let end = 0;
let lang: string | null = null;
for (let i = 0; i < text.length; ) {
const nextBreak = Graphemer.nextBreak(text, i);
const c = text.substring(i, nextBreak);
const l = this.testLang(c);
if (lang !== l) {
if (lang !== null) {
end = i;
tokens.push(
...this.tokenizeWord(text.substring(start, end), lang).map(
token => ({
...token,
start: token.start + start,
end: token.end + start,
})
)
);
}
start = i;
end = i;
lang = l;
}
i = nextBreak;
}
if (lang !== null) {
tokens.push(
...this.tokenizeWord(text.substring(start, text.length), lang).map(
token => ({
...token,
start: token.start + start,
end: token.end + start,
})
)
);
}
return tokens;
}
}

View File

@@ -0,0 +1,104 @@
import {
asyncScheduler,
defer,
exhaustMap,
finalize,
Observable,
type ObservableInput,
type OperatorFunction,
retry,
scheduled,
Subject,
throttle,
throwError,
timer,
} from 'rxjs';
import { MANUALLY_STOP } from '../../../utils/throw-if-aborted';
/**
* Like exhaustMap, but also includes the trailing value emitted from the source observable while waiting for the preceding inner observable to complete
*
* Original code adapted from https://github.com/ReactiveX/rxjs/issues/5004
* @param {function<T, K>(value: T, ?index: number): ObservableInput<K>} project - A function that, when applied to an item emitted by the
* source Observable, returns a projected Observable.
*/
export function exhaustMapWithTrailing<T, R>(
project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
return (source$): Observable<R> =>
defer(() => {
const release$ = new Subject<void>();
return source$.pipe(
throttle(() => release$, {
leading: true,
trailing: true,
}),
exhaustMap((value, index) =>
scheduled(project(value, index), asyncScheduler).pipe(
finalize(() => {
release$.next();
})
)
)
);
});
}
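A hedged usage sketch of the operator above, mirroring how search$ uses it: while one re-query is in flight, further notifications collapse into a single trailing run.
// Hedged usage sketch. `slowQuery` is a stand-in for an IndexedDB read such as data.search(...).
import { Subject } from 'rxjs';

const slowQuery = () =>
  new Promise<number>(resolve => setTimeout(() => resolve(Date.now()), 100));

const updates$ = new Subject<void>();
updates$
  .pipe(exhaustMapWithTrailing(() => fromPromise(slowQuery)))
  .subscribe(finishedAt => console.log('re-queried at', finishedAt));

// Three rapid notifications while the first query runs produce one leading and one trailing run.
updates$.next();
updates$.next();
updates$.next();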
/**
* Convert a promise to an observable.
*
* like `from` but support `AbortSignal`.
*/
export function fromPromise<T>(
promise: Promise<T> | ((signal: AbortSignal) => Promise<T>)
): Observable<T> {
return new Observable(subscriber => {
const abortController = new AbortController();
const rawPromise =
promise instanceof Function ? promise(abortController.signal) : promise;
rawPromise
.then(value => {
subscriber.next(value);
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
});
}
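A hedged usage sketch: with the factory form, the AbortSignal is aborted (with MANUALLY_STOP) when the subscriber unsubscribes, so the underlying work can be cancelled. The URL below is purely illustrative.
// Hedged usage sketch (illustrative URL).
const user$ = fromPromise(signal =>
  fetch('https://example.com/api/user', { signal }).then(res => res.json())
);
const subscription = user$.subscribe(user => console.log(user));
subscription.unsubscribe(); // aborts the in-flight fetch via the signal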
/**
* An operator that retries the source observable when an error occurs.
*
* https://en.wikipedia.org/wiki/Exponential_backoff
*/
export function backoffRetry<T>({
when,
count = 3,
delay = 200,
maxDelay = 15000,
}: {
when?: (err: any) => boolean;
count?: number;
delay?: number;
maxDelay?: number;
} = {}) {
return (obs$: Observable<T>) =>
obs$.pipe(
retry({
count,
delay: (err, retryIndex) => {
if (when && !when(err)) {
return throwError(() => err);
}
const d = Math.pow(2, retryIndex - 1) * delay;
return timer(Math.min(d, maxDelay));
},
})
);
}
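With the defaults the retry delay is min(2^(retryIndex - 1) * 200, 15000) ms, i.e. 200, 400, 800 ms for the three attempts. A hedged usage sketch; loadFromIndexedDB is a hypothetical stand-in:
// Hedged usage sketch; `loadFromIndexedDB` is a hypothetical stand-in for a real read.
declare function loadFromIndexedDB(): Promise<unknown>;

fromPromise(() => loadFromIndexedDB())
  .pipe(
    backoffRetry({
      count: 5,
      delay: 100,
      // Skip retrying errors we consider fatal; they are rethrown immediately.
      when: err => err?.name !== 'AbortError',
    })
  )
  .subscribe({
    next: value => console.log(value),
    error: err => console.error('gave up after retries', err),
  });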

View File

@@ -37,6 +37,11 @@ Table(PeerClocks)
|------|-------|-----------|-----------|
| str | str | Date | Date |
Table(IndexerSync)
| docId | indexedClock |
|-------|--------------|
| str | Date |
Table(BlobSync)
| peer | key | uploadedAt |
|------|-----|------------|
@@ -124,6 +129,43 @@ export interface DocStorageSchema extends DBSchema {
lock: Date;
};
};
indexerSync: {
key: string;
value: {
docId: string;
indexedClock: Date;
};
};
indexerMetadata: {
key: string;
value: {
key: string;
value: any;
};
};
indexerRecords: {
key: number;
value: {
table: string;
id: string;
data: Map<string, string[]>;
};
indexes: { table: string; id: [string, string] };
};
invertedIndex: {
key: number;
value: {
table: string;
nid: number;
pos?: {
i: number /* index */;
l: number /* length */;
rs: [number, number][] /* ranges: [start, end] */;
};
key: ArrayBuffer;
};
indexes: { key: [string, ArrayBuffer]; nid: number };
};
}
const migrate: OpenDBCallbacks<DocStorageSchema>['upgrade'] = (
@@ -199,11 +241,36 @@ const initBlobSync: Migrate = db => {
blobSync.createIndex('peer', 'peer', { unique: false });
};
const initIndexer: Migrate = db => {
db.createObjectStore('indexerMetadata', {
keyPath: 'key',
});
const indexRecordsStore = db.createObjectStore('indexerRecords', {
autoIncrement: true,
});
indexRecordsStore.createIndex('table', 'table', {
unique: false,
});
indexRecordsStore.createIndex('id', ['table', 'id'], {
unique: true,
});
const invertedIndexStore = db.createObjectStore('invertedIndex', {
autoIncrement: true,
});
invertedIndexStore.createIndex('key', ['table', 'key'], {
unique: false,
});
invertedIndexStore.createIndex('nid', 'nid', { unique: false });
db.createObjectStore('indexerSync', {
keyPath: 'docId',
autoIncrement: false,
});
};
// END REGION
// 1. all schema changed should be put in migrations
// 2. order matters
const migrations: Migrate[] = [init, initBlobSync];
const migrations: Migrate[] = [init, initBlobSync, initIndexer];
export const migrator = {
version: migrations.length,

View File

@@ -1,5 +1,5 @@
export * from './connection';
export * from './frontend';
export type * from './frontend'; // Only export types. For implementation, please import from '@affine/nbstore/frontend'
export * from './storage';
export * from './sync';
export type * from './sync'; // Only export types. For implementation, please import from '@affine/nbstore/sync'
export * from './utils/universal-id';

View File

@@ -47,6 +47,7 @@ export interface DocStorageOptions {
export interface DocStorage extends Storage {
readonly storageType: 'doc';
readonly isReadonly: boolean;
readonly spaceId: string;
/**
* Get a doc record with latest binary.
*/
@@ -103,7 +104,7 @@ export abstract class DocStorageBase<Opts = {}> implements DocStorage {
readonly storageType = 'doc';
abstract readonly connection: Connection;
protected readonly locker: Locker = new SingletonLocker();
protected readonly spaceId = this.options.id;
readonly spaceId = this.options.id;
constructor(protected readonly options: Opts & DocStorageOptions) {}

View File

@@ -0,0 +1,16 @@
import { DummyConnection } from '../../connection';
import type { DocClock } from '../doc';
import { IndexerSyncStorageBase } from '../indexer-sync';
export class DummyIndexerSyncStorage extends IndexerSyncStorageBase {
override connection = new DummyConnection();
override getDocIndexedClock(_docId: string): Promise<DocClock | null> {
return Promise.resolve(null);
}
override setDocIndexedClock(_docClock: DocClock): Promise<void> {
return Promise.resolve();
}
override clearDocIndexedClock(_docId: string): Promise<void> {
return Promise.resolve();
}
}

View File

@@ -0,0 +1,88 @@
import { NEVER, type Observable } from 'rxjs';
import { DummyConnection } from '../../connection';
import {
type AggregateOptions,
type AggregateResult,
type IndexerDocument,
type IndexerSchema,
IndexerStorageBase,
type Query,
type SearchOptions,
type SearchResult,
} from '../indexer';
export class DummyIndexerStorage extends IndexerStorageBase {
readonly isReadonly = true;
readonly connection = new DummyConnection();
override search<
T extends keyof IndexerSchema,
const O extends SearchOptions<T>,
>(_table: T, _query: Query<T>, _options?: O): Promise<SearchResult<T, O>> {
return Promise.resolve({
pagination: { count: 0, limit: 0, skip: 0, hasMore: false },
nodes: [],
});
}
override aggregate<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
_table: T,
_query: Query<T>,
_field: keyof IndexerSchema[T],
_options?: O
): Promise<AggregateResult<T, O>> {
return Promise.resolve({
pagination: { count: 0, limit: 0, skip: 0, hasMore: false },
buckets: [],
});
}
override search$<
T extends keyof IndexerSchema,
const O extends SearchOptions<T>,
>(_table: T, _query: Query<T>, _options?: O): Observable<SearchResult<T, O>> {
return NEVER;
}
override aggregate$<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
_table: T,
_query: Query<T>,
_field: keyof IndexerSchema[T],
_options?: O
): Observable<AggregateResult<T, O>> {
return NEVER;
}
override deleteByQuery<T extends keyof IndexerSchema>(
_table: T,
_query: Query<T>
): Promise<void> {
return Promise.resolve();
}
override insert<T extends keyof IndexerSchema>(
_table: T,
_document: IndexerDocument<T>
): Promise<void> {
return Promise.resolve();
}
override delete<T extends keyof IndexerSchema>(
_table: T,
_id: string
): Promise<void> {
return Promise.resolve();
}
override update<T extends keyof IndexerSchema>(
_table: T,
_document: IndexerDocument<T>
): Promise<void> {
return Promise.resolve();
}
override refresh<T extends keyof IndexerSchema>(_table: T): Promise<void> {
return Promise.resolve();
}
}

View File

@@ -10,6 +10,10 @@ import { DummyBlobStorage } from './dummy/blob';
import { DummyBlobSyncStorage } from './dummy/blob-sync';
import { DummyDocStorage } from './dummy/doc';
import { DummyDocSyncStorage } from './dummy/doc-sync';
import { DummyIndexerStorage } from './dummy/indexer';
import { DummyIndexerSyncStorage } from './dummy/indexer-sync';
import type { IndexerStorage } from './indexer';
import type { IndexerSyncStorage } from './indexer-sync';
import type { StorageType } from './storage';
type Storages =
@@ -17,7 +21,9 @@ type Storages =
| BlobStorage
| BlobSyncStorage
| DocSyncStorage
| AwarenessStorage;
| AwarenessStorage
| IndexerStorage
| IndexerSyncStorage;
export type SpaceStorageOptions = {
[K in StorageType]?: Storages & { storageType: K };
@@ -37,6 +43,8 @@ export class SpaceStorage {
blobSync: storages.blobSync ?? new DummyBlobSyncStorage(),
doc: storages.doc ?? new DummyDocStorage(),
docSync: storages.docSync ?? new DummyDocSyncStorage(),
indexer: storages.indexer ?? new DummyIndexerStorage(),
indexerSync: storages.indexerSync ?? new DummyIndexerSyncStorage(),
};
}
@@ -83,4 +91,5 @@ export * from './doc';
export * from './doc-sync';
export * from './errors';
export * from './history';
export * from './indexer';
export * from './storage';

View File

@@ -0,0 +1,21 @@
import type { Connection } from '../connection';
import type { DocClock } from './doc';
import type { Storage } from './storage';
export interface IndexerSyncStorage extends Storage {
readonly storageType: 'indexerSync';
getDocIndexedClock(docId: string): Promise<DocClock | null>;
setDocIndexedClock(docClock: DocClock): Promise<void>;
clearDocIndexedClock(docId: string): Promise<void>;
}
export abstract class IndexerSyncStorageBase implements IndexerSyncStorage {
readonly storageType = 'indexerSync';
abstract connection: Connection<any>;
abstract getDocIndexedClock(docId: string): Promise<DocClock | null>;
abstract setDocIndexedClock(docClock: DocClock): Promise<void>;
abstract clearDocIndexedClock(docId: string): Promise<void>;
}

View File

@@ -0,0 +1,176 @@
export * from './indexer/document';
export * from './indexer/field-type';
export * from './indexer/query';
export * from './indexer/schema';
import type { Observable } from 'rxjs';
import type { Connection } from '../connection';
import type { IndexerDocument } from './indexer/document';
import type { Query } from './indexer/query';
import type { IndexerSchema } from './indexer/schema';
import type { Storage } from './storage';
export interface IndexerStorage extends Storage {
readonly storageType: 'indexer';
readonly isReadonly: boolean;
search<T extends keyof IndexerSchema, const O extends SearchOptions<T>>(
table: T,
query: Query<T>,
options?: O
): Promise<SearchResult<T, O>>;
aggregate<T extends keyof IndexerSchema, const O extends AggregateOptions<T>>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Promise<AggregateResult<T, O>>;
search$<T extends keyof IndexerSchema, const O extends SearchOptions<T>>(
table: T,
query: Query<T>,
options?: O
): Observable<SearchResult<T, O>>;
aggregate$<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Observable<AggregateResult<T, O>>;
deleteByQuery<T extends keyof IndexerSchema>(
table: T,
query: Query<T>
): Promise<void>;
insert<T extends keyof IndexerSchema>(
table: T,
document: IndexerDocument<T>
): Promise<void>;
delete<T extends keyof IndexerSchema>(table: T, id: string): Promise<void>;
update<T extends keyof IndexerSchema>(
table: T,
document: IndexerDocument<T>
): Promise<void>;
refresh<T extends keyof IndexerSchema>(table: T): Promise<void>;
}
type ResultPagination = {
count: number;
limit: number;
skip: number;
hasMore: boolean;
};
type PaginationOption = { limit?: number; skip?: number };
type HighlightAbleField<T extends keyof IndexerSchema> = {
[K in keyof IndexerSchema[T]]: IndexerSchema[T][K] extends 'FullText'
? K
: never;
}[keyof IndexerSchema[T]];
export type SearchOptions<T extends keyof IndexerSchema> = {
pagination?: PaginationOption;
highlights?: { field: HighlightAbleField<T>; before: string; end: string }[];
fields?: (keyof IndexerSchema[T])[];
};
export type SearchResult<
T extends keyof IndexerSchema,
O extends SearchOptions<T>,
> = {
pagination: ResultPagination;
nodes: ({ id: string; score: number } & (O['fields'] extends any[]
? { fields: { [key in O['fields'][number]]: string | string[] } }
: unknown) &
(O['highlights'] extends any[]
? { highlights: { [key in O['highlights'][number]['field']]: string[] } }
: unknown))[];
};
export interface AggregateOptions<T extends keyof IndexerSchema> {
pagination?: PaginationOption;
hits?: SearchOptions<T>;
}
export type AggregateResult<
T extends keyof IndexerSchema,
O extends AggregateOptions<T>,
> = {
pagination: ResultPagination;
buckets: ({
key: string;
score: number;
count: number;
} & (O['hits'] extends object
? { hits: SearchResult<T, O['hits']> }
: unknown))[];
};
export abstract class IndexerStorageBase implements IndexerStorage {
readonly storageType = 'indexer';
abstract readonly connection: Connection;
abstract readonly isReadonly: boolean;
abstract search<
T extends keyof IndexerSchema,
const O extends SearchOptions<T>,
>(table: T, query: Query<T>, options?: O): Promise<SearchResult<T, O>>;
abstract aggregate<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Promise<AggregateResult<T, O>>;
abstract search$<
T extends keyof IndexerSchema,
const O extends SearchOptions<T>,
>(table: T, query: Query<T>, options?: O): Observable<SearchResult<T, O>>;
abstract aggregate$<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Observable<AggregateResult<T, O>>;
abstract deleteByQuery<T extends keyof IndexerSchema>(
table: T,
query: Query<T>
): Promise<void>;
abstract insert<T extends keyof IndexerSchema>(
table: T,
document: IndexerDocument<T>
): Promise<void>;
abstract delete<T extends keyof IndexerSchema>(
table: T,
id: string
): Promise<void>;
abstract update<T extends keyof IndexerSchema>(
table: T,
document: IndexerDocument<T>
): Promise<void>;
abstract refresh<T extends keyof IndexerSchema>(table: T): Promise<void>;
}

View File

@@ -0,0 +1,58 @@
import type { IndexerSchema } from './schema';
export class IndexerDocument<
S extends keyof IndexerSchema = keyof IndexerSchema,
> {
constructor(public readonly id: string) {}
fields = new Map<keyof IndexerSchema[S], string[]>();
public insert<F extends keyof IndexerSchema[S]>(
field: F,
value: string | string[]
) {
const values = this.fields.get(field) ?? [];
if (Array.isArray(value)) {
values.push(...value);
} else {
values.push(value);
}
this.fields.set(field, values);
}
get<F extends keyof IndexerSchema[S]>(
field: F
): string[] | string | undefined {
const values = this.fields.get(field);
if (values === undefined) {
return undefined;
} else if (values.length === 1) {
return values[0];
} else {
return values;
}
}
static from<S extends keyof IndexerSchema>(
id: string,
map:
| Partial<Record<keyof IndexerSchema[S], string | string[]>>
| Map<keyof IndexerSchema[S], string | string[]>
): IndexerDocument<S> {
const doc = new IndexerDocument<S>(id);
if (map instanceof Map) {
for (const [key, value] of map) {
doc.insert(key, value);
}
} else {
for (const key in map) {
if (map[key] === undefined || map[key] === null) {
continue;
}
doc.insert(key, map[key]);
}
}
return doc;
}
}
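// Illustrative usage sketch (not definitive): building a 'block' document.
// Fields may carry multiple values; `get` returns a single string when only
// one value was inserted, otherwise the full array.
const exampleBlock = IndexerDocument.from<'block'>('doc-1:block-1', {
  docId: 'doc-1',
  blockId: 'block-1',
  flavour: 'affine:paragraph',
  content: 'Hello indexer',
  refDocId: ['doc-2', 'doc-3'],
});
// exampleBlock.get('content')  -> 'Hello indexer'
// exampleBlock.get('refDocId') -> ['doc-2', 'doc-3']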

View File

@@ -0,0 +1 @@
export type IndexFieldType = 'Integer' | 'FullText' | 'String' | 'Boolean';

View File

@@ -0,0 +1,35 @@
import type { IndexerSchema } from './schema';
export type MatchQuery<T extends keyof IndexerSchema> = {
type: 'match';
field: keyof IndexerSchema[T];
match: string;
};
export type BoostQuery = {
type: 'boost';
query: Query<any>;
boost: number;
};
export type BooleanQuery<T extends keyof IndexerSchema> = {
type: 'boolean';
occur: 'should' | 'must' | 'must_not';
queries: Query<T>[];
};
export type ExistsQuery<T extends keyof IndexerSchema> = {
type: 'exists';
field: keyof IndexerSchema[T];
};
export type AllQuery = {
type: 'all';
};
export type Query<T extends keyof IndexerSchema> =
| BooleanQuery<T>
| MatchQuery<T>
| AllQuery
| ExistsQuery<T>
| BoostQuery;
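// Illustrative sketch (not definitive): a composite query that restricts
// results to a single doc and matches the full-text `content` field.
const exampleQuery: Query<'block'> = {
  type: 'boolean',
  occur: 'must',
  queries: [
    { type: 'match', field: 'docId', match: 'doc-1' },
    { type: 'match', field: 'content', match: 'indexer' },
  ],
};
// A BoostQuery can wrap any sub-query to change its score weight, e.g.
// { type: 'boost', boost: 2,
//   query: { type: 'match', field: 'flavour', match: 'affine:paragraph' } }.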

View File

@@ -0,0 +1,51 @@
import type { IndexFieldType } from './field-type';
export const IndexerSchema = {
doc: {
docId: 'String',
title: 'FullText',
// summary of the doc, used for preview
summary: { type: 'String', index: false },
},
block: {
docId: 'String',
blockId: 'String',
content: 'FullText',
flavour: 'String',
blob: 'String',
// reference doc id
// ['xxx','yyy']
refDocId: 'String',
// reference info, used for backlink to specific block
// [{"docId":"xxx","mode":"page","blockIds":["gt5Yfq1maYvgNgpi13rIq"]},{"docId":"yyy","mode":"edgeless","blockIds":["k5prpOlDF-9CzfatmO0W7"]}]
ref: { type: 'String', index: false },
// parent block flavour
parentFlavour: 'String',
// parent block id
parentBlockId: 'String',
// additional info
// { "databaseName": "xxx", "displayMode": "page/edgeless", "noteBlockId": "xxx" }
additional: { type: 'String', index: false },
markdownPreview: { type: 'String', index: false },
},
} satisfies Record<string, Record<string, IndexerFieldSchema>>;
export type IndexerFieldSchema =
| IndexFieldType
| {
type: IndexFieldType;
/**
* If false, the field will not be indexed, and thus not searchable.
*
* default: true
*/
index?: boolean;
/**
* If false, the field will not be stored and will not be included in the search result.
*
* default: true
*/
store?: boolean;
};
export type IndexerSchema = typeof IndexerSchema;
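// Illustrative sketch (not definitive; assumes IndexerStorage is in scope):
// `doc.summary` is declared with `index: false`, so it cannot be matched
// against, but it is still stored and can be requested via the `fields`
// search option.
async function getDocSummary(storage: IndexerStorage, docId: string) {
  const result = await storage.search(
    'doc',
    { type: 'match', field: 'docId', match: docId },
    { fields: ['title', 'summary'] }
  );
  return result.nodes.at(0)?.fields.summary;
}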

View File

@@ -1,6 +1,13 @@
import type { Connection } from '../connection';
export type StorageType = 'blob' | 'blobSync' | 'doc' | 'docSync' | 'awareness';
export type StorageType =
| 'blob'
| 'blobSync'
| 'doc'
| 'docSync'
| 'awareness'
| 'indexer'
| 'indexerSync';
export interface Storage {
readonly storageType: StorageType;

View File

@@ -420,7 +420,11 @@ class BlobSyncPeerStatus {
return () => {
dispose.unsubscribe();
};
});
}).pipe(
share({
connector: () => new ReplaySubject(1),
})
);
}
private readonly statusUpdatedSubject$ = new Subject<string | true>();

View File

@@ -4,10 +4,12 @@ import type { SpaceStorage } from '../storage';
import { AwarenessSyncImpl } from './awareness';
import { BlobSyncImpl } from './blob';
import { DocSyncImpl, type DocSyncState } from './doc';
import { IndexerSyncImpl } from './indexer';
import type { PeerStorageOptions } from './types';
export type { BlobSyncState } from './blob';
export type { DocSyncDocState, DocSyncState } from './doc';
export type { IndexerDocSyncState, IndexerSyncState } from './indexer';
export interface SyncState {
doc?: DocSyncState;
@@ -17,6 +19,7 @@ export class Sync {
readonly doc: DocSyncImpl;
readonly blob: BlobSyncImpl;
readonly awareness: AwarenessSyncImpl;
readonly indexer: IndexerSyncImpl;
readonly state$: Observable<SyncState>;
@@ -26,6 +29,8 @@ export class Sync {
const docSync = storages.local.get('docSync');
const blobSync = storages.local.get('blobSync');
const awareness = storages.local.get('awareness');
const indexer = storages.local.get('indexer');
const indexerSync = storages.local.get('indexerSync');
this.doc = new DocSyncImpl(
{
@@ -60,6 +65,7 @@ export class Sync {
])
),
});
this.indexer = new IndexerSyncImpl(doc, indexer, indexerSync);
this.state$ = this.doc.state$.pipe(map(doc => ({ doc })));
}
@@ -67,10 +73,12 @@ export class Sync {
start() {
this.doc?.start();
this.blob?.start();
this.indexer?.start();
}
stop() {
this.doc?.stop();
this.blob?.stop();
this.indexer?.stop();
}
}

View File

@@ -0,0 +1,862 @@
import { defaultBlockMarkdownAdapterMatchers } from '@blocksuite/affine/adapters';
import { Container } from '@blocksuite/affine/global/di';
import {
InlineDeltaToMarkdownAdapterExtensions,
MarkdownInlineToDeltaAdapterExtensions,
} from '@blocksuite/affine/inlines/preset';
import type {
AttachmentBlockModel,
BookmarkBlockModel,
EmbedBlockModel,
ImageBlockModel,
TableBlockModel,
} from '@blocksuite/affine/model';
import { AffineSchemas } from '@blocksuite/affine/schemas';
import { MarkdownAdapter } from '@blocksuite/affine/shared/adapters';
import type { AffineTextAttributes } from '@blocksuite/affine/shared/types';
import {
createYProxy,
type DeltaInsert,
type DraftModel,
Schema,
Transformer,
type TransformerMiddleware,
type YBlock,
} from '@blocksuite/affine/store';
import { uniq } from 'lodash-es';
import {
Array as YArray,
type Doc as YDoc,
Map as YMap,
Text as YText,
} from 'yjs';
import { IndexerDocument } from '../../storage';
const blocksuiteSchema = new Schema();
blocksuiteSchema.register([...AffineSchemas]);
interface BlockDocumentInfo {
docId: string;
blockId: string;
content?: string | string[];
flavour: string;
blob?: string[];
refDocId?: string[];
ref?: string[];
parentFlavour?: string;
parentBlockId?: string;
additional?: {
databaseName?: string;
displayMode?: string;
noteBlockId?: string;
};
yblock: YMap<any>;
markdownPreview?: string;
}
const bookmarkFlavours = new Set([
'affine:bookmark',
'affine:embed-youtube',
'affine:embed-figma',
'affine:embed-github',
'affine:embed-loom',
]);
function generateMarkdownPreviewBuilder(
yRootDoc: YDoc,
workspaceId: string,
blocks: BlockDocumentInfo[]
) {
function yblockToDraftModal(yblock: YBlock): DraftModel | null {
const flavour = yblock.get('sys:flavour') as string;
const blockSchema = blocksuiteSchema.flavourSchemaMap.get(flavour);
if (!blockSchema) {
return null;
}
const keys = Array.from(yblock.keys())
.filter(key => key.startsWith('prop:'))
.map(key => key.substring(5));
const props = Object.fromEntries(
keys.map(key => [key, createYProxy(yblock.get(`prop:${key}`))])
);
return {
props,
id: yblock.get('sys:id') as string,
flavour,
children: [],
role: blockSchema.model.role,
version: (yblock.get('sys:version') as number) ?? blockSchema.version,
keys: Array.from(yblock.keys())
.filter(key => key.startsWith('prop:'))
.map(key => key.substring(5)),
} as unknown as DraftModel;
}
const titleMiddleware: TransformerMiddleware = ({ adapterConfigs }) => {
const pages = yRootDoc.getMap('meta').get('pages');
if (!(pages instanceof YArray)) {
return;
}
for (const meta of pages.toArray()) {
adapterConfigs.set(
'title:' + meta.get('id'),
meta.get('title')?.toString() ?? 'Untitled'
);
}
};
const baseUrl = `/workspace/${workspaceId}`;
function getDocLink(docId: string, blockId: string) {
const searchParams = new URLSearchParams();
searchParams.set('blockIds', blockId);
return `${baseUrl}/${docId}?${searchParams.toString()}`;
}
const docLinkBaseURLMiddleware: TransformerMiddleware = ({
adapterConfigs,
}) => {
adapterConfigs.set('docLinkBaseUrl', baseUrl);
};
const container = new Container();
[
...MarkdownInlineToDeltaAdapterExtensions,
...defaultBlockMarkdownAdapterMatchers,
...InlineDeltaToMarkdownAdapterExtensions,
].forEach(ext => {
ext.setup(container);
});
const provider = container.provider();
const markdownAdapter = new MarkdownAdapter(
new Transformer({
schema: blocksuiteSchema,
blobCRUD: {
delete: () => Promise.resolve(),
get: () => Promise.resolve(null),
list: () => Promise.resolve([]),
set: () => Promise.resolve(''),
},
docCRUD: {
create: () => {
throw new Error('Not implemented');
},
get: () => null,
delete: () => {},
},
middlewares: [docLinkBaseURLMiddleware, titleMiddleware],
}),
provider
);
const markdownPreviewCache = new WeakMap<BlockDocumentInfo, string | null>();
function trimCodeBlock(markdown: string) {
const lines = markdown.split('\n').filter(line => line.trim() !== '');
if (lines.length > 5) {
return [...lines.slice(0, 4), '...', lines.at(-1), ''].join('\n');
}
return [...lines, ''].join('\n');
}
function trimParagraph(markdown: string) {
const lines = markdown.split('\n').filter(line => line.trim() !== '');
if (lines.length > 3) {
return [...lines.slice(0, 3), '...', lines.at(-1), ''].join('\n');
}
return [...lines, ''].join('\n');
}
function getListDepth(block: BlockDocumentInfo) {
let parentBlockCount = 0;
let currentBlock: BlockDocumentInfo | undefined = block;
do {
currentBlock = blocks.find(
b => b.blockId === currentBlock?.parentBlockId
);
// reached the root or a non-list ancestor; do not count it.
if (!currentBlock || currentBlock.flavour !== 'affine:list') {
break;
}
parentBlockCount++;
} while (currentBlock);
return parentBlockCount;
}
// only works for list block
function indentMarkdown(markdown: string, depth: number) {
if (depth <= 0) {
return markdown;
}
return (
markdown
.split('\n')
.map(line => ' '.repeat(depth) + line)
.join('\n') + '\n'
);
}
const generateDatabaseMarkdownPreview = (block: BlockDocumentInfo) => {
const isDatabaseBlock = (block: BlockDocumentInfo) => {
return block.flavour === 'affine:database';
};
const model = yblockToDraftModal(block.yblock);
if (!model) {
return null;
}
let dbBlock: BlockDocumentInfo | null = null;
if (isDatabaseBlock(block)) {
dbBlock = block;
} else {
const parentBlock = blocks.find(b => b.blockId === block.parentBlockId);
if (parentBlock && isDatabaseBlock(parentBlock)) {
dbBlock = parentBlock;
}
}
if (!dbBlock) {
return null;
}
const url = getDocLink(block.docId, dbBlock.blockId);
const title = dbBlock.additional?.databaseName;
return `[database · ${title || 'Untitled'}][](${url})\n`;
};
const generateImageMarkdownPreview = (block: BlockDocumentInfo) => {
const isImageModel = (
model: DraftModel | null
): model is DraftModel<ImageBlockModel> => {
return model?.flavour === 'affine:image';
};
const model = yblockToDraftModal(block.yblock);
if (!isImageModel(model)) {
return null;
}
const info = ['an image block'];
if (model.props.sourceId) {
info.push(`file id ${model.props.sourceId}`);
}
if (model.props.caption) {
info.push(`with caption ${model.props.caption}`);
}
return info.join(', ') + '\n';
};
const generateEmbedMarkdownPreview = (block: BlockDocumentInfo) => {
const isEmbedModel = (
model: DraftModel | null
): model is DraftModel<EmbedBlockModel> => {
return (
model?.flavour === 'affine:embed-linked-doc' ||
model?.flavour === 'affine:embed-synced-doc'
);
};
const draftModel = yblockToDraftModal(block.yblock);
if (!isEmbedModel(draftModel)) {
return null;
}
const url = getDocLink(block.docId, draftModel.id);
return `[](${url})\n`;
};
const generateLatexMarkdownPreview = (block: BlockDocumentInfo) => {
let content =
typeof block.content === 'string'
? block.content.trim()
: block.content?.join('').trim();
content = content?.split('\n').join(' ') ?? '';
return `LaTeX, with value ${content}\n`;
};
const generateBookmarkMarkdownPreview = (block: BlockDocumentInfo) => {
const isBookmarkModel = (
model: DraftModel | null
): model is DraftModel<BookmarkBlockModel> => {
return bookmarkFlavours.has(model?.flavour ?? '');
};
const draftModel = yblockToDraftModal(block.yblock);
if (!isBookmarkModel(draftModel)) {
return null;
}
const title = draftModel.props.title;
const url = draftModel.props.url;
return `[${title}](${url})\n`;
};
const generateAttachmentMarkdownPreview = (block: BlockDocumentInfo) => {
const isAttachmentModel = (
model: DraftModel | null
): model is DraftModel<AttachmentBlockModel> => {
return model?.flavour === 'affine:attachment';
};
const draftModel = yblockToDraftModal(block.yblock);
if (!isAttachmentModel(draftModel)) {
return null;
}
return `[${draftModel.props.name}](${draftModel.props.sourceId})\n`;
};
const generateTableMarkdownPreview = (block: BlockDocumentInfo) => {
const isTableModel = (
model: DraftModel | null
): model is DraftModel<TableBlockModel> => {
return model?.flavour === 'affine:table';
};
const draftModel = yblockToDraftModal(block.yblock);
if (!isTableModel(draftModel)) {
return null;
}
const url = getDocLink(block.docId, draftModel.id);
return `[table][](${url})\n`;
};
const generateMarkdownPreview = async (block: BlockDocumentInfo) => {
if (markdownPreviewCache.has(block)) {
return markdownPreviewCache.get(block);
}
const flavour = block.flavour;
let markdown: string | null = null;
if (
flavour === 'affine:paragraph' ||
flavour === 'affine:list' ||
flavour === 'affine:code'
) {
const draftModel = yblockToDraftModal(block.yblock);
markdown =
block.parentFlavour === 'affine:database'
? generateDatabaseMarkdownPreview(block)
: ((draftModel ? await markdownAdapter.fromBlock(draftModel) : null)
?.file ?? null);
if (markdown) {
if (flavour === 'affine:code') {
markdown = trimCodeBlock(markdown);
} else if (flavour === 'affine:paragraph') {
markdown = trimParagraph(markdown);
}
}
} else if (flavour === 'affine:database') {
markdown = generateDatabaseMarkdownPreview(block);
} else if (
flavour === 'affine:embed-linked-doc' ||
flavour === 'affine:embed-synced-doc'
) {
markdown = generateEmbedMarkdownPreview(block);
} else if (flavour === 'affine:attachment') {
markdown = generateAttachmentMarkdownPreview(block);
} else if (flavour === 'affine:image') {
markdown = generateImageMarkdownPreview(block);
} else if (flavour === 'affine:surface' || flavour === 'affine:page') {
// skip
} else if (flavour === 'affine:latex') {
markdown = generateLatexMarkdownPreview(block);
} else if (bookmarkFlavours.has(flavour)) {
markdown = generateBookmarkMarkdownPreview(block);
} else if (flavour === 'affine:table') {
markdown = generateTableMarkdownPreview(block);
} else {
console.warn(`unknown flavour: ${flavour}`);
}
if (markdown && flavour === 'affine:list') {
const blockDepth = getListDepth(block);
markdown = indentMarkdown(markdown, Math.max(0, blockDepth));
}
markdownPreviewCache.set(block, markdown);
return markdown;
};
return generateMarkdownPreview;
}
// remove the indent of the first line of a list, keeping the relative
// indentation of the following lines
// e.g.,
// ```
//    - list item 1
//      - list item 2
// ```
// becomes
// ```
// - list item 1
//   - list item 2
// ```
function unindentMarkdown(markdown: string) {
const lines = markdown.split('\n');
const res: string[] = [];
let firstListFound = false;
let baseIndent = 0;
for (let current of lines) {
const indent = current.match(/^\s*/)?.[0]?.length ?? 0;
if (indent > 0) {
if (!firstListFound) {
// For the first list item, remove all indentation
firstListFound = true;
baseIndent = indent;
current = current.trimStart();
} else {
// For subsequent list items, maintain relative indentation
current =
' '.repeat(Math.max(0, indent - baseIndent)) + current.trimStart();
}
}
res.push(current);
}
return res.join('\n');
}
export async function crawlingDocData({
ydoc,
rootYDoc,
spaceId,
docId,
}: {
ydoc: YDoc;
rootYDoc: YDoc;
spaceId: string;
docId: string;
}): Promise<{
blocks: IndexerDocument<'block'>[];
preview?: string;
}> {
let docTitle = '';
let summaryLenNeeded = 1000;
let summary = '';
const blockDocuments: BlockDocumentInfo[] = [];
const generateMarkdownPreview = generateMarkdownPreviewBuilder(
rootYDoc,
spaceId,
blockDocuments
);
const blocks = ydoc.getMap<any>('blocks');
// build a parent map for quick lookup
// for each block, record its parent id
const parentMap: Record<string, string | null> = {};
for (const [id, block] of blocks.entries()) {
const children = block.get('sys:children') as YArray<string> | undefined;
if (children instanceof YArray && children.length) {
for (const child of children) {
parentMap[child] = id;
}
}
}
if (blocks.size === 0) {
return { blocks: [] };
}
// find the nearest ancestor block (including the block itself) that satisfies the predicate
const nearest = (
blockId: string,
predicate: (block: YMap<any>) => boolean
) => {
let current: string | null = blockId;
while (current) {
const block = blocks.get(current);
if (block && predicate(block)) {
return block;
}
current = parentMap[current] ?? null;
}
return null;
};
const nearestByFlavour = (blockId: string, flavour: string) =>
nearest(blockId, block => block.get('sys:flavour') === flavour);
let rootBlockId: string | null = null;
for (const block of blocks.values()) {
const flavour = block.get('sys:flavour')?.toString();
const blockId = block.get('sys:id')?.toString();
if (flavour === 'affine:page' && blockId) {
rootBlockId = blockId;
}
}
if (!rootBlockId) {
return { blocks: [] };
}
const queue: { parent?: string; id: string }[] = [{ id: rootBlockId }];
const visited = new Set<string>(); // avoid loop
const pushChildren = (id: string, block: YMap<any>) => {
const children = block.get('sys:children');
if (children instanceof YArray && children.length) {
for (let i = children.length - 1; i >= 0; i--) {
const childId = children.get(i);
if (childId && !visited.has(childId)) {
queue.push({ parent: id, id: childId });
visited.add(childId);
}
}
}
};
// #region first loop - generate block base info
while (queue.length) {
const next = queue.pop();
if (!next) {
break;
}
const { parent: parentBlockId, id: blockId } = next;
const block = blockId ? blocks.get(blockId) : null;
const parentBlock = parentBlockId ? blocks.get(parentBlockId) : null;
if (!block) {
break;
}
const flavour = block.get('sys:flavour')?.toString();
const parentFlavour = parentBlock?.get('sys:flavour')?.toString();
const noteBlock = nearestByFlavour(blockId, 'affine:note');
// display mode:
// - both: page and edgeless -> fallback to page
// - page: only page -> page
// - edgeless: only edgeless -> edgeless
// - undefined: edgeless (assuming it is a normal element on the edgeless)
let displayMode = noteBlock?.get('prop:displayMode') ?? 'edgeless';
if (displayMode === 'both') {
displayMode = 'page';
}
const noteBlockId: string | undefined = noteBlock
?.get('sys:id')
?.toString();
pushChildren(blockId, block);
const commonBlockProps = {
docId: ydoc.guid,
flavour,
blockId,
yblock: block,
additional: { displayMode, noteBlockId },
};
if (flavour === 'affine:page') {
docTitle = block.get('prop:title').toString();
blockDocuments.push({ ...commonBlockProps, content: docTitle });
} else if (
flavour === 'affine:paragraph' ||
flavour === 'affine:list' ||
flavour === 'affine:code'
) {
const text = block.get('prop:text') as YText;
if (!text) {
continue;
}
const deltas: DeltaInsert<AffineTextAttributes>[] = text.toDelta();
const refs = uniq(
deltas
.flatMap(delta => {
if (
delta.attributes &&
delta.attributes.reference &&
delta.attributes.reference.pageId
) {
const { pageId: refDocId, params = {} } =
delta.attributes.reference;
return {
refDocId,
ref: JSON.stringify({ docId: refDocId, ...params }),
};
}
return null;
})
.filter(ref => !!ref)
);
const databaseName =
flavour === 'affine:paragraph' && parentFlavour === 'affine:database' // if block is a database row
? parentBlock?.get('prop:title')?.toString()
: undefined;
blockDocuments.push({
...commonBlockProps,
content: text.toString(),
...refs.reduce<{ refDocId: string[]; ref: string[] }>(
(prev, curr) => {
prev.refDocId.push(curr.refDocId);
prev.ref.push(curr.ref);
return prev;
},
{ refDocId: [], ref: [] }
),
parentFlavour,
parentBlockId,
additional: { ...commonBlockProps.additional, databaseName },
});
if (summaryLenNeeded > 0) {
summary += text.toString();
summaryLenNeeded -= text.length;
}
} else if (
flavour === 'affine:embed-linked-doc' ||
flavour === 'affine:embed-synced-doc'
) {
const pageId = block.get('prop:pageId');
if (typeof pageId === 'string') {
// reference info
const params = block.get('prop:params') ?? {};
blockDocuments.push({
...commonBlockProps,
refDocId: [pageId],
ref: [JSON.stringify({ docId: pageId, ...params })],
parentFlavour,
parentBlockId,
});
}
} else if (flavour === 'affine:attachment' || flavour === 'affine:image') {
const blobId = block.get('prop:sourceId');
if (typeof blobId === 'string') {
blockDocuments.push({
...commonBlockProps,
blob: [blobId],
parentFlavour,
parentBlockId,
});
}
} else if (flavour === 'affine:surface') {
const texts = [];
const elementsObj = block.get('prop:elements');
if (
!(
elementsObj instanceof YMap &&
elementsObj.get('type') === '$blocksuite:internal:native$'
)
) {
continue;
}
const elements = elementsObj.get('value') as YMap<any>;
if (!(elements instanceof YMap)) {
continue;
}
for (const element of elements.values()) {
if (!(element instanceof YMap)) {
continue;
}
const text = element.get('text') as YText;
if (!text) {
continue;
}
texts.push(text.toString());
}
blockDocuments.push({
...commonBlockProps,
content: texts,
parentFlavour,
parentBlockId,
});
} else if (flavour === 'affine:database') {
const texts = [];
const columnsObj = block.get('prop:columns');
const databaseTitle = block.get('prop:title');
if (databaseTitle instanceof YText) {
texts.push(databaseTitle.toString());
}
if (columnsObj instanceof YArray) {
for (const column of columnsObj) {
if (!(column instanceof YMap)) {
continue;
}
if (typeof column.get('name') === 'string') {
texts.push(column.get('name'));
}
const data = column.get('data');
if (!(data instanceof YMap)) {
continue;
}
const options = data.get('options');
if (!(options instanceof YArray)) {
continue;
}
for (const option of options) {
if (!(option instanceof YMap)) {
continue;
}
const value = option.get('value');
if (typeof value === 'string') {
texts.push(value);
}
}
}
}
blockDocuments.push({
...commonBlockProps,
content: texts,
additional: {
...commonBlockProps.additional,
databaseName: databaseTitle?.toString(),
},
});
} else if (flavour === 'affine:latex') {
blockDocuments.push({
...commonBlockProps,
content: block.get('prop:latex')?.toString() ?? '',
});
} else if (flavour === 'affine:table') {
const contents = Array.from<string>(block.keys())
.map(key => {
if (key.startsWith('prop:cells.') && key.endsWith('.text')) {
return block.get(key)?.toString() ?? '';
}
return '';
})
.filter(Boolean);
blockDocuments.push({
...commonBlockProps,
content: contents,
});
} else if (bookmarkFlavours.has(flavour)) {
blockDocuments.push({ ...commonBlockProps });
}
}
// #endregion
// #region second loop - generate markdown preview
const TARGET_PREVIEW_CHARACTER = 500;
const TARGET_PREVIOUS_BLOCK = 1;
const TARGET_FOLLOW_BLOCK = 4;
for (const block of blockDocuments) {
if (block.ref?.length) {
const target = block;
// only generate the markdown preview from blocks that belong to the same affine:note
const noteBlock = nearestByFlavour(block.blockId, 'affine:note');
const sameNoteBlocks = noteBlock
? blockDocuments.filter(
candidate =>
nearestByFlavour(candidate.blockId, 'affine:note') === noteBlock
)
: [];
// only generate markdown preview for reference blocks
let previewText = (await generateMarkdownPreview(target)) ?? '';
let previousBlock = 0;
let followBlock = 0;
let previousIndex = sameNoteBlocks.findIndex(
block => block.blockId === target.blockId
);
let followIndex = previousIndex;
while (
!(
(
previewText.length > TARGET_PREVIEW_CHARACTER || // stop if preview text reaches the limit
((previousBlock >= TARGET_PREVIOUS_BLOCK || previousIndex < 0) &&
(followBlock >= TARGET_FOLLOW_BLOCK ||
followIndex >= sameNoteBlocks.length))
) // stop if no more blocks, or preview block reaches the limit
)
) {
if (previousBlock < TARGET_PREVIOUS_BLOCK) {
previousIndex--;
const block =
previousIndex >= 0 ? sameNoteBlocks.at(previousIndex) : null;
const markdown = block ? await generateMarkdownPreview(block) : null;
if (
markdown &&
!previewText.startsWith(
markdown
) /* A small hack to skip blocks with the same content */
) {
previewText = markdown + '\n' + previewText;
previousBlock++;
}
}
if (followBlock < TARGET_FOLLOW_BLOCK) {
followIndex++;
const block = sameNoteBlocks.at(followIndex);
const markdown = block ? await generateMarkdownPreview(block) : null;
if (
markdown &&
!previewText.endsWith(
markdown
) /* A small hack to skip blocks with the same content */
) {
previewText = previewText + '\n' + markdown;
followBlock++;
}
}
}
block.markdownPreview = unindentMarkdown(previewText);
}
}
// #endregion
return {
blocks: blockDocuments.map(block =>
IndexerDocument.from<'block'>(`${docId}:${block.blockId}`, {
docId: block.docId,
blockId: block.blockId,
content: block.content,
flavour: block.flavour,
blob: block.blob,
refDocId: block.refDocId,
ref: block.ref,
parentFlavour: block.parentFlavour,
parentBlockId: block.parentBlockId,
additional: block.additional
? JSON.stringify(block.additional)
: undefined,
markdownPreview: block.markdownPreview,
})
),
preview: summary,
};
}

View File

@@ -0,0 +1,579 @@
import {
filter,
first,
Observable,
ReplaySubject,
share,
Subject,
throttleTime,
} from 'rxjs';
import {
applyUpdate,
type Array as YArray,
Doc as YDoc,
type Map as YMap,
} from 'yjs';
import {
type DocStorage,
IndexerDocument,
type IndexerStorage,
} from '../../storage';
import type { IndexerSyncStorage } from '../../storage/indexer-sync';
import { AsyncPriorityQueue } from '../../utils/async-priority-queue';
import { takeUntilAbort } from '../../utils/take-until-abort';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
import { crawlingDocData } from './crawler';
export interface IndexerSyncState {
/**
* Number of documents currently in the indexing queue
*/
indexing: number;
/**
* Indicates whether all documents have been successfully indexed
*
* This is only for UI display purposes. For logical operations, please use `waitForCompleted()`
*/
completed: boolean;
/**
* Total number of documents in the workspace
*/
total: number;
errorMessage: string | null;
}
export interface IndexerDocSyncState {
/**
* Indicates whether this document is currently in the indexing queue
*/
indexing: boolean;
/**
* Indicates whether this document has been successfully indexed
*
* This is only for UI display purposes. For logical operations, please use `waitForDocCompleted()`
*/
completed: boolean;
}
export interface IndexerSync {
state$: Observable<IndexerSyncState>;
docState$(docId: string): Observable<IndexerDocSyncState>;
addPriority(docId: string, priority: number): () => void;
waitForCompleted(signal?: AbortSignal): Promise<void>;
waitForDocCompleted(docId: string, signal?: AbortSignal): Promise<void>;
}
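// Illustrative sketch (not definitive): driving a simple progress indicator
// from any IndexerSync implementation.
function reportIndexerProgress(sync: IndexerSync) {
  const subscription = sync.state$.subscribe(state => {
    if (state.errorMessage) {
      console.warn('indexer error:', state.errorMessage);
    }
    console.log(
      state.completed
        ? `indexing completed (${state.total} docs)`
        : `indexing ${state.indexing} of ${state.total} docs`
    );
  });
  return () => subscription.unsubscribe();
}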
export class IndexerSyncImpl implements IndexerSync {
private abort: AbortController | null = null;
private readonly rootDocId = this.doc.spaceId;
private readonly status = new IndexerSyncStatus(this.rootDocId);
state$ = this.status.state$.pipe(
// throttle the state to 1 second to avoid spamming the UI
throttleTime(1000)
);
docState$(docId: string) {
return this.status.docState$(docId).pipe(
// throttle the state to 1 second to avoid spamming the UI
throttleTime(1000)
);
}
waitForCompleted(signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
this.status.state$
.pipe(
filter(state => state.completed),
takeUntilAbort(signal),
first()
)
.subscribe({
next: () => {
resolve();
},
error: err => {
reject(err);
},
});
});
}
waitForDocCompleted(docId: string, signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
this.status
.docState$(docId)
.pipe(
filter(state => state.completed),
takeUntilAbort(signal),
first()
)
.subscribe({
next: () => {
resolve();
},
error: err => {
reject(err);
},
});
});
}
readonly interval = () =>
new Promise<void>(resolve =>
requestIdleCallback(() => resolve(), {
timeout: 200,
})
);
constructor(
readonly doc: DocStorage,
readonly indexer: IndexerStorage,
readonly indexerSync: IndexerSyncStorage
) {}
start() {
if (this.abort) {
this.abort.abort(MANUALLY_STOP);
}
const abort = new AbortController();
this.abort = abort;
this.mainLoop(abort.signal).catch(error => {
if (error === MANUALLY_STOP) {
return;
}
console.error('index error', error);
});
}
stop() {
this.abort?.abort(MANUALLY_STOP);
this.abort = null;
}
addPriority(id: string, priority: number) {
return this.status.addPriority(id, priority);
}
private async mainLoop(signal?: AbortSignal) {
if (this.indexer.isReadonly) {
return;
}
while (true) {
try {
await this.retryLoop(signal);
} catch (error) {
if (signal?.aborted) {
return;
}
console.error('index error, retry in 5s', error);
this.status.errorMessage =
error instanceof Error ? error.message : `${error}`;
this.status.statusUpdatedSubject$.next(true);
} finally {
// reset all status
this.status.reset();
// wait for 5s before next retry
await Promise.race([
new Promise<void>(resolve => {
setTimeout(resolve, 5000);
}),
new Promise((_, reject) => {
// exit if manually stopped
if (signal?.aborted) {
reject(signal.reason);
}
signal?.addEventListener('abort', () => {
reject(signal.reason);
});
}),
]);
}
}
}
private async retryLoop(signal?: AbortSignal) {
await Promise.race([
Promise.all([
this.doc.connection.waitForConnected(signal),
this.indexer.connection.waitForConnected(signal),
this.indexerSync.connection.waitForConnected(signal),
]),
new Promise<void>((_, reject) => {
setTimeout(() => {
reject(new Error('Connect to remote timeout'));
}, 1000 * 30);
}),
new Promise((_, reject) => {
signal?.addEventListener('abort', () => {
reject(signal.reason);
});
}),
]);
this.status.errorMessage = null;
this.status.statusUpdatedSubject$.next(true);
console.log('indexer sync start');
const unsubscribe = this.doc.subscribeDocUpdate(update => {
if (!this.status.rootDocReady) {
return;
}
if (update.docId === this.rootDocId) {
applyUpdate(this.status.rootDoc, update.bin);
const allDocs = this.getAllDocsFromRootDoc();
for (const [docId, { title }] of allDocs) {
const existingDoc = this.status.docsInRootDoc.get(docId);
if (!existingDoc) {
this.status.scheduleJob(docId);
this.status.docsInRootDoc.set(docId, { title });
this.status.statusUpdatedSubject$.next(docId);
} else {
if (existingDoc.title !== title) {
this.status.docsInRootDoc.set(docId, { title });
this.status.statusUpdatedSubject$.next(docId);
}
}
}
for (const docId of this.status.docsInRootDoc.keys()) {
if (!allDocs.has(docId)) {
this.status.docsInRootDoc.delete(docId);
this.status.statusUpdatedSubject$.next(docId);
}
}
this.status.scheduleJob(this.rootDocId);
} else {
const docId = update.docId;
const existingDoc = this.status.docsInRootDoc.get(docId);
if (existingDoc) {
this.status.scheduleJob(docId);
}
}
});
try {
const rootDocBin = (await this.doc.getDoc(this.rootDocId))?.bin;
if (rootDocBin) {
applyUpdate(this.status.rootDoc, rootDocBin);
}
this.status.scheduleJob(this.rootDocId);
const allDocs = this.getAllDocsFromRootDoc();
this.status.docsInRootDoc = allDocs;
this.status.statusUpdatedSubject$.next(true);
for (const docId of allDocs.keys()) {
this.status.scheduleJob(docId);
}
this.status.rootDocReady = true;
this.status.statusUpdatedSubject$.next(true);
const allIndexedDocs = await this.getAllDocsFromIndexer();
this.status.docsInIndexer = allIndexedDocs;
this.status.statusUpdatedSubject$.next(true);
while (true) {
throwIfAborted(signal);
const docId = await this.status.acceptJob(signal);
if (docId === this.rootDocId) {
// #region crawl root doc
for (const [docId, { title }] of this.status.docsInRootDoc) {
const existingDoc = this.status.docsInIndexer.get(docId);
if (existingDoc) {
if (existingDoc.title !== title) {
// need update
await this.indexer.update(
'doc',
IndexerDocument.from(docId, {
docId,
title,
})
);
this.status.docsInIndexer.set(docId, { title });
this.status.statusUpdatedSubject$.next(docId);
}
} else {
// need add
await this.indexer.insert(
'doc',
IndexerDocument.from(docId, {
docId,
title,
})
);
this.status.docsInIndexer.set(docId, { title });
this.status.statusUpdatedSubject$.next(docId);
}
}
for (const docId of this.status.docsInIndexer.keys()) {
if (!this.status.docsInRootDoc.has(docId)) {
await this.indexer.delete('doc', docId);
await this.indexer.deleteByQuery('block', {
type: 'match',
field: 'docId',
match: docId,
});
await this.indexerSync.clearDocIndexedClock(docId);
this.status.docsInIndexer.delete(docId);
this.status.statusUpdatedSubject$.next(docId);
}
}
await this.indexer.refresh('block');
await this.indexer.refresh('doc');
// #endregion
} else {
// #region crawl doc
const existingDoc = this.status.docsInIndexer.get(docId);
if (!existingDoc) {
// doc is deleted, just skip
continue;
}
const docClock = await this.doc.getDocTimestamp(docId);
if (!docClock) {
// doc is deleted, just skip
continue;
}
const docIndexedClock =
await this.indexerSync.getDocIndexedClock(docId);
if (
docIndexedClock &&
docIndexedClock.timestamp.getTime() === docClock.timestamp.getTime()
) {
// doc is already indexed, just skip
continue;
}
const docBin = await this.doc.getDoc(docId);
if (!docBin) {
// doc is deleted, just skip
continue;
}
const docYDoc = new YDoc({ guid: docId });
applyUpdate(docYDoc, docBin.bin);
let blocks: IndexerDocument<'block'>[] = [];
let preview: string | undefined;
try {
const result = await crawlingDocData({
ydoc: docYDoc,
rootYDoc: this.status.rootDoc,
spaceId: this.status.rootDocId,
docId,
});
blocks = result.blocks;
preview = result.preview;
} catch (error) {
console.error('error crawling doc', error);
}
await this.indexer.deleteByQuery('block', {
type: 'match',
field: 'docId',
match: docId,
});
for (const block of blocks) {
await this.indexer.insert('block', block);
}
await this.indexer.refresh('block');
if (preview) {
await this.indexer.update(
'doc',
IndexerDocument.from(docId, {
summary: preview,
})
);
await this.indexer.refresh('doc');
}
await this.indexerSync.setDocIndexedClock({
docId,
timestamp: docClock.timestamp,
});
// #endregion
}
this.status.completeJob();
}
} finally {
unsubscribe();
}
}
/**
* Get all docs from the root doc, excluding docs in the trash
*/
private getAllDocsFromRootDoc() {
const docs = this.status.rootDoc.getMap('meta').get('pages') as
| YArray<YMap<any>>
| undefined;
const availableDocs = new Map<string, { title: string | undefined }>();
if (docs) {
for (const page of docs) {
const docId = page.get('id');
if (typeof docId !== 'string') {
continue;
}
const inTrash = page.get('trash') ?? false;
const title = page.get('title');
if (!inTrash) {
availableDocs.set(docId, { title });
}
}
}
return availableDocs;
}
private async getAllDocsFromIndexer() {
const docs = await this.indexer.search(
'doc',
{
type: 'all',
},
{
pagination: {
limit: Infinity,
},
fields: ['docId', 'title'],
}
);
return new Map(
docs.nodes.map(node => {
const title = node.fields.title;
return [
node.id,
{
title: typeof title === 'string' ? title : title.at(0),
},
];
})
);
}
}
class IndexerSyncStatus {
prioritySettings = new Map<string, number>();
jobs = new AsyncPriorityQueue();
rootDoc = new YDoc({ guid: this.rootDocId });
rootDocReady = false;
docsInIndexer = new Map<string, { title: string | undefined }>();
docsInRootDoc = new Map<string, { title: string | undefined }>();
currentJob: string | null = null;
errorMessage: string | null = null;
statusUpdatedSubject$ = new Subject<string | true>();
state$ = new Observable<IndexerSyncState>(subscribe => {
const next = () => {
subscribe.next({
indexing: this.jobs.length() + (this.currentJob ? 1 : 0),
total: this.docsInRootDoc.size + 1,
errorMessage: this.errorMessage,
completed: this.rootDocReady && this.jobs.length() === 0,
});
};
next();
const dispose = this.statusUpdatedSubject$.subscribe(() => {
next();
});
return () => {
dispose.unsubscribe();
};
}).pipe(
share({
connector: () => new ReplaySubject(1),
})
);
docState$(docId: string) {
return new Observable<IndexerDocSyncState>(subscribe => {
const next = () => {
subscribe.next({
indexing: this.jobs.has(docId),
completed: this.docsInIndexer.has(docId) && !this.jobs.has(docId),
});
};
next();
const dispose = this.statusUpdatedSubject$.subscribe(updatedDocId => {
if (updatedDocId === docId || updatedDocId === true) {
next();
}
});
return () => {
dispose.unsubscribe();
};
}).pipe(
share({
connector: () => new ReplaySubject(1),
})
);
}
constructor(readonly rootDocId: string) {
this.prioritySettings.set(this.rootDocId, Infinity);
}
scheduleJob(docId: string) {
const priority = this.prioritySettings.get(docId) ?? 0;
this.jobs.push(docId, priority);
this.statusUpdatedSubject$.next(docId);
}
async acceptJob(abort?: AbortSignal) {
const job = await this.jobs.asyncPop(abort);
this.currentJob = job;
this.statusUpdatedSubject$.next(job);
return job;
}
completeJob() {
const job = this.currentJob;
this.currentJob = null;
this.statusUpdatedSubject$.next(job ?? true);
}
addPriority(id: string, priority: number) {
const oldPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, priority);
this.jobs.setPriority(id, oldPriority + priority);
return () => {
const currentPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, currentPriority - priority);
this.jobs.setPriority(id, currentPriority - priority);
};
}
reset() {
// reset all state, except prioritySettings
this.jobs.clear();
this.docsInRootDoc.clear();
this.docsInIndexer.clear();
this.rootDoc = new YDoc();
this.rootDocReady = false;
this.currentJob = null;
this.statusUpdatedSubject$.next(true);
}
}

View File

@@ -52,6 +52,10 @@ export class PriorityQueue {
return removed;
}
has(id: string) {
return this.priorityMap.has(id);
}
clear() {
this.tree.clear();
this.priorityMap.clear();
@@ -64,6 +68,6 @@ export class PriorityQueue {
}
get length() {
return this.tree.count;
return this.tree.count.bind(this.tree);
}
}

View File

@@ -0,0 +1,42 @@
import { Observable, type OperatorFunction } from 'rxjs';
/**
* Creates an operator that takes values from the source Observable until the given AbortSignal aborts.
* When the signal aborts, the Observable errors with the signal's abort reason.
*
* @param signal - The AbortSignal that will trigger completion when aborted
* @returns An operator function that takes values until the signal aborts
*/
export function takeUntilAbort<T>(
signal?: AbortSignal
): OperatorFunction<T, T> {
return (source$: Observable<T>) => {
return new Observable<T>(subscriber => {
if (signal?.aborted) {
subscriber.error(signal.reason);
return;
}
const abortHandler = () => {
subscriber.error(signal?.reason);
};
if (signal) {
signal.addEventListener('abort', abortHandler);
}
const subscription = source$.subscribe({
next: value => subscriber.next(value),
error: err => subscriber.error(err),
complete: () => subscriber.complete(),
});
return () => {
if (signal) {
signal.removeEventListener('abort', abortHandler);
}
subscription.unsubscribe();
};
});
};
}
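// Illustrative usage sketch (not definitive): once the signal aborts, the
// subscriber's error handler receives the abort reason.
function exampleTakeUntilAbort(source$: Observable<number>) {
  const controller = new AbortController();
  source$.pipe(takeUntilAbort(controller.signal)).subscribe({
    next: value => console.log('value', value),
    error: reason => console.log('aborted with reason:', reason),
  });
  // Stop consuming the source; downstream sees this reason as an error.
  controller.abort('stopped by user');
}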

View File

@@ -1,20 +1,35 @@
import { OpClient, transfer } from '@toeverything/infra/op';
import type { Observable } from 'rxjs';
import { v4 as uuid } from 'uuid';
import { DummyConnection } from '../connection';
import { AwarenessFrontend, BlobFrontend, DocFrontend } from '../frontend';
import {
AwarenessFrontend,
BlobFrontend,
DocFrontend,
IndexerFrontend,
} from '../frontend';
import {
type AggregateOptions,
type AggregateResult,
type AwarenessRecord,
type BlobRecord,
type BlobStorage,
type DocRecord,
type DocStorage,
type DocUpdate,
type IndexerDocument,
type IndexerSchema,
type IndexerStorage,
type ListedBlobRecord,
type Query,
type SearchOptions,
type SearchResult,
} from '../storage';
import type { AwarenessSync } from '../sync/awareness';
import type { BlobSync } from '../sync/blob';
import type { DocSync } from '../sync/doc';
import type { IndexerSync } from '../sync/indexer';
import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
export type { StoreInitOptions as WorkerInitOptions } from './ops';
@@ -85,6 +100,12 @@ export class StoreClient {
this.docFrontend = new DocFrontend(this.docStorage, this.docSync);
this.blobFrontend = new BlobFrontend(this.blobStorage, this.blobSync);
this.awarenessFrontend = new AwarenessFrontend(this.awarenessSync);
this.indexerStorage = new WorkerIndexerStorage(this.client);
this.indexerSync = new WorkerIndexerSync(this.client);
this.indexerFrontend = new IndexerFrontend(
this.indexerStorage,
this.indexerSync
);
}
private readonly docStorage: WorkerDocStorage;
@@ -92,14 +113,18 @@ export class StoreClient {
private readonly docSync: WorkerDocSync;
private readonly blobSync: WorkerBlobSync;
private readonly awarenessSync: WorkerAwarenessSync;
private readonly indexerStorage: WorkerIndexerStorage;
private readonly indexerSync: WorkerIndexerSync;
readonly docFrontend: DocFrontend;
readonly blobFrontend: BlobFrontend;
readonly awarenessFrontend: AwarenessFrontend;
readonly indexerFrontend: IndexerFrontend;
}
class WorkerDocStorage implements DocStorage {
constructor(private readonly client: OpClient<WorkerOps>) {}
spaceId = '';
readonly storageType = 'doc';
readonly isReadonly = false;
@@ -316,3 +341,146 @@ class WorkerAwarenessSync implements AwarenessSync {
};
}
}
class WorkerIndexerStorage implements IndexerStorage {
constructor(private readonly client: OpClient<WorkerOps>) {}
readonly storageType = 'indexer';
readonly isReadonly = true;
connection = new DummyConnection();
search<T extends keyof IndexerSchema, const O extends SearchOptions<T>>(
table: T,
query: Query<T>,
options?: O
): Promise<SearchResult<T, O>> {
return this.client.call('indexerStorage.search', { table, query, options });
}
aggregate<T extends keyof IndexerSchema, const O extends AggregateOptions<T>>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Promise<AggregateResult<T, O>> {
return this.client.call('indexerStorage.aggregate', {
table,
query,
field: field as string,
options,
});
}
search$<T extends keyof IndexerSchema, const O extends SearchOptions<T>>(
table: T,
query: Query<T>,
options?: O
): Observable<SearchResult<T, O>> {
return this.client.ob$('indexerStorage.subscribeSearch', {
table,
query,
options,
});
}
aggregate$<
T extends keyof IndexerSchema,
const O extends AggregateOptions<T>,
>(
table: T,
query: Query<T>,
field: keyof IndexerSchema[T],
options?: O
): Observable<AggregateResult<T, O>> {
return this.client.ob$('indexerStorage.subscribeAggregate', {
table,
query,
field: field as string,
options,
});
}
deleteByQuery<T extends keyof IndexerSchema>(
_table: T,
_query: Query<T>
): Promise<void> {
throw new Error('Method not implemented.');
}
insert<T extends keyof IndexerSchema>(
_table: T,
_document: IndexerDocument<T>
): Promise<void> {
throw new Error('Method not implemented.');
}
delete<T extends keyof IndexerSchema>(_table: T, _id: string): Promise<void> {
throw new Error('Method not implemented.');
}
update<T extends keyof IndexerSchema>(
_table: T,
_document: IndexerDocument<T>
): Promise<void> {
throw new Error('Method not implemented.');
}
refresh<T extends keyof IndexerSchema>(_table: T): Promise<void> {
throw new Error('Method not implemented.');
}
}
class WorkerIndexerSync implements IndexerSync {
constructor(private readonly client: OpClient<WorkerOps>) {}
waitForCompleted(signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
const abortListener = () => {
reject(signal?.reason);
subscription.unsubscribe();
};
signal?.addEventListener('abort', abortListener);
const subscription = this.client
.ob$('indexerSync.waitForCompleted')
.subscribe({
complete() {
signal?.removeEventListener('abort', abortListener);
resolve();
},
error(err) {
signal?.removeEventListener('abort', abortListener);
reject(err);
},
});
});
}
waitForDocCompleted(docId: string, signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
const abortListener = () => {
reject(signal?.reason);
subscription.unsubscribe();
};
signal?.addEventListener('abort', abortListener);
const subscription = this.client
.ob$('indexerSync.waitForDocCompleted', docId)
.subscribe({
complete() {
signal?.removeEventListener('abort', abortListener);
resolve();
},
error(err) {
signal?.removeEventListener('abort', abortListener);
reject(err);
},
});
});
}
get state$() {
return this.client.ob$('indexerSync.state');
}
docState$(docId: string) {
return this.client.ob$('indexerSync.docState', docId);
}
addPriority(docId: string, priority: number) {
const subscription = this.client
.ob$('indexerSync.addPriority', { docId, priority })
.subscribe();
return () => {
subscription.unsubscribe();
};
}
}

View File

@@ -1,4 +1,3 @@
import { MANUALLY_STOP } from '@toeverything/infra';
import { OpConsumer } from '@toeverything/infra/op';
import { Observable } from 'rxjs';
@@ -7,6 +6,7 @@ import { SpaceStorage } from '../storage';
import type { AwarenessRecord } from '../storage/awareness';
import { Sync } from '../sync';
import type { PeerStorageOptions } from '../sync/types';
import { MANUALLY_STOP } from '../utils/throw-if-aborted';
import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
export type { WorkerManagerOps };
@@ -57,6 +57,14 @@ class StoreConsumer {
return this.ensureSync.awareness;
}
get indexerStorage() {
return this.ensureLocal.get('indexer');
}
get indexerSync() {
return this.ensureSync.indexer;
}
constructor(
private readonly availableStorageImplementations: StorageConstructor[],
init: StoreInitOptions
@@ -262,6 +270,48 @@ class StoreConsumer {
}),
'awarenessSync.collect': ({ collectId, awareness }) =>
collectJobs.get(collectId)?.(awareness),
'indexerStorage.aggregate': ({ table, query, field, options }) =>
this.indexerStorage.aggregate(table, query, field, options),
'indexerStorage.search': ({ table, query, options }) =>
this.indexerStorage.search(table, query, options),
'indexerStorage.subscribeSearch': ({ table, query, options }) =>
this.indexerStorage.search$(table, query, options),
'indexerStorage.subscribeAggregate': ({ table, query, field, options }) =>
this.indexerStorage.aggregate$(table, query, field, options),
'indexerSync.state': () => this.indexerSync.state$,
'indexerSync.docState': (docId: string) =>
this.indexerSync.docState$(docId),
'indexerSync.addPriority': ({ docId, priority }) =>
new Observable(() => {
const undo = this.indexerSync.addPriority(docId, priority);
return () => undo();
}),
'indexerSync.waitForCompleted': () =>
new Observable(subscriber => {
this.indexerSync
.waitForCompleted()
.then(() => {
subscriber.next();
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
}),
'indexerSync.waitForDocCompleted': (docId: string) =>
new Observable(subscriber => {
const abortController = new AbortController();
this.indexerSync
.waitForDocCompleted(docId, abortController.signal)
.then(() => {
subscriber.next();
subscriber.complete();
})
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort(MANUALLY_STOP);
}),
});
}
}

View File

@@ -1,5 +1,7 @@
import type { AvailableStorageImplementations } from '../impls';
import type {
AggregateOptions,
AggregateResult,
BlobRecord,
DocClock,
DocClocks,
@@ -7,11 +9,15 @@ import type {
DocRecord,
DocUpdate,
ListedBlobRecord,
Query,
SearchOptions,
SearchResult,
StorageType,
} from '../storage';
import type { AwarenessRecord } from '../storage/awareness';
import type { BlobSyncBlobState, BlobSyncState } from '../sync/blob';
import type { DocSyncDocState, DocSyncState } from '../sync/doc';
import type { IndexerDocSyncState, IndexerSyncState } from '../sync/indexer';
type StorageInitOptions = Values<{
[key in keyof AvailableStorageImplementations]: {
@@ -61,6 +67,35 @@ interface GroupedWorkerOps {
collect: [{ collectId: string; awareness: AwarenessRecord }, void];
};
indexerStorage: {
search: [
{ table: string; query: Query<any>; options?: SearchOptions<any> },
SearchResult<any, any>,
];
aggregate: [
{
table: string;
query: Query<any>;
field: string;
options?: AggregateOptions<any>;
},
AggregateResult<any, any>,
];
subscribeSearch: [
{ table: string; query: Query<any>; options?: SearchOptions<any> },
SearchResult<any, any>,
];
subscribeAggregate: [
{
table: string;
query: Query<any>;
field: string;
options?: AggregateOptions<any>;
},
AggregateResult<any, any>,
];
};
docSync: {
state: [void, DocSyncState];
docState: [string, DocSyncDocState];
@@ -91,6 +126,14 @@ interface GroupedWorkerOps {
];
collect: [{ collectId: string; awareness: AwarenessRecord }, void];
};
indexerSync: {
state: [void, IndexerSyncState];
docState: [string, IndexerDocSyncState];
addPriority: [{ docId: string; priority: number }, boolean];
waitForCompleted: [void, void];
waitForDocCompleted: [string, void];
};
}
type Values<T> = T extends { [k in keyof T]: any } ? T[keyof T] : never;