From c62d79ab149ef636d13243af628ef25cb3504fbe Mon Sep 17 00:00:00 2001 From: EYHN Date: Tue, 2 Jul 2024 09:18:08 +0000 Subject: [PATCH] feat(core): run indexer in worker (#7295) --- packages/common/infra/package.json | 1 + packages/common/infra/src/livedata/ops.ts | 4 +- packages/common/infra/src/sync/blob/blob.ts | 3 +- packages/common/infra/src/sync/doc/old-id.md | 24 + .../common/infra/src/sync/indexer/indexer.ts | 3 + .../src/sync/job/impl/indexeddb/index.ts | 7 + packages/common/infra/src/sync/job/runner.ts | 2 +- .../core/src/modules/cloud/services/fetch.ts | 4 +- .../docs-search/entities/docs-indexer.ts | 542 ++++++------------ .../docs-search/services/docs-search.ts | 4 + .../modules/docs-search/worker/in-worker.ts | 313 ++++++++++ .../modules/docs-search/worker/out-worker.ts | 98 ++++ .../src/modules/docs-search/worker/types.ts | 50 ++ 13 files changed, 697 insertions(+), 358 deletions(-) create mode 100644 packages/common/infra/src/sync/doc/old-id.md create mode 100644 packages/frontend/core/src/modules/docs-search/worker/in-worker.ts create mode 100644 packages/frontend/core/src/modules/docs-search/worker/out-worker.ts create mode 100644 packages/frontend/core/src/modules/docs-search/worker/types.ts diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index de0d7c120b..81c23fead7 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -2,6 +2,7 @@ "name": "@toeverything/infra", "type": "module", "private": true, + "sideEffects": false, "exports": { "./blocksuite": "./src/blocksuite/index.ts", "./storage": "./src/storage/index.ts", diff --git a/packages/common/infra/src/livedata/ops.ts b/packages/common/infra/src/livedata/ops.ts index 34e08674c2..711d5b2526 100644 --- a/packages/common/infra/src/livedata/ops.ts +++ b/packages/common/infra/src/livedata/ops.ts @@ -18,6 +18,7 @@ import { timer, } from 'rxjs'; +import { MANUALLY_STOP } from '../utils'; import type { LiveData } from './livedata'; /** @@ -107,7 +108,8 @@ export function fromPromise( .catch(error => { subscriber.error(error); }); - return () => abortController.abort('Aborted'); + + return () => abortController.abort(MANUALLY_STOP); }); } diff --git a/packages/common/infra/src/sync/blob/blob.ts b/packages/common/infra/src/sync/blob/blob.ts index 4bd1615949..658e9989b9 100644 --- a/packages/common/infra/src/sync/blob/blob.ts +++ b/packages/common/infra/src/sync/blob/blob.ts @@ -4,6 +4,7 @@ import { difference } from 'lodash-es'; import { LiveData } from '../../livedata'; import type { Memento } from '../../storage'; +import { MANUALLY_STOP } from '../../utils'; import { BlobStorageOverCapacity } from './error'; const logger = new DebugLogger('affine:blob-engine'); @@ -70,7 +71,7 @@ export class BlobEngine { } stop() { - this.abort?.abort(); + this.abort?.abort(MANUALLY_STOP); this.abort = null; } diff --git a/packages/common/infra/src/sync/doc/old-id.md b/packages/common/infra/src/sync/doc/old-id.md new file mode 100644 index 0000000000..87aed16c7e --- /dev/null +++ b/packages/common/infra/src/sync/doc/old-id.md @@ -0,0 +1,24 @@ +AFFiNE currently has a lot of data stored using the old ID format. Here, we record the usage of IDs to avoid forgetting. + +## Old ID Format + +The format is: + +- `{workspace-id}:space:{nanoid}` Common +- `{workspace-id}:space:page:{nanoid}` + +> Note: sometimes the `workspace-id` is not same with current workspace id. + +## Usage + +- Local Storage + - indexeddb: Both new and old IDs coexist + - sqlite: Both new and old IDs coexist + - server-clock: Only new IDs are stored + - sync-metadata: Both new and old IDs coexist +- Server Storage + - Only stores new IDs but accepts writes using old IDs +- Protocols + - When the client submits an update, both new and old IDs are used. + - When the server broadcasts updates sent by other clients, both new and old IDs are used. + - When the server responds to `client-pre-sync` (listing all updated docids), only new IDs are used. diff --git a/packages/common/infra/src/sync/indexer/indexer.ts b/packages/common/infra/src/sync/indexer/indexer.ts index 027bd74419..cba453022f 100644 --- a/packages/common/infra/src/sync/indexer/indexer.ts +++ b/packages/common/infra/src/sync/indexer/indexer.ts @@ -20,6 +20,9 @@ export interface IndexWriter delete(id: string): void; + // TODO(@eyhn) + // deleteByQuery(query: Query): void; + commit(): Promise; rollback(): void; diff --git a/packages/common/infra/src/sync/job/impl/indexeddb/index.ts b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts index 3708bdf311..3d0799f101 100644 --- a/packages/common/infra/src/sync/job/impl/indexeddb/index.ts +++ b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts @@ -155,6 +155,9 @@ export class IndexedDBJobQueue implements JobQueue { .objectStore('jobs') .delete(typeof id === 'string' ? parseInt(id) : id); } + + trx.commit(); + this.broadcast.postMessage('job-completed'); } async return(jobs: Job[], retry: boolean = false): Promise { @@ -174,6 +177,10 @@ export class IndexedDBJobQueue implements JobQueue { .delete(typeof id === 'string' ? parseInt(id) : id); } } + + trx.commit(); + + this.broadcast.postMessage('job-completed'); } async clear(): Promise { diff --git a/packages/common/infra/src/sync/job/runner.ts b/packages/common/infra/src/sync/job/runner.ts index 7bf04c62d7..3220e21709 100644 --- a/packages/common/infra/src/sync/job/runner.ts +++ b/packages/common/infra/src/sync/job/runner.ts @@ -29,7 +29,7 @@ export class JobRunner { } stop() { - this.abort?.abort(); + this.abort?.abort(MANUALLY_STOP); this.abort = null; } diff --git a/packages/frontend/core/src/modules/cloud/services/fetch.ts b/packages/frontend/core/src/modules/cloud/services/fetch.ts index 988bf97260..8fc8f781b4 100644 --- a/packages/frontend/core/src/modules/cloud/services/fetch.ts +++ b/packages/frontend/core/src/modules/cloud/services/fetch.ts @@ -41,8 +41,8 @@ export class FetchService extends Service { throw externalSignal.reason; } const abortController = new AbortController(); - externalSignal?.addEventListener('abort', () => { - abortController.abort(); + externalSignal?.addEventListener('abort', reason => { + abortController.abort(reason); }); const timeout = init?.timeout ?? 15000; diff --git a/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts index 58bc596b98..5c02227c28 100644 --- a/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts +++ b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts @@ -1,32 +1,32 @@ import { DebugLogger } from '@affine/debug'; -import type { AffineTextAttributes } from '@blocksuite/blocks'; -import type { DeltaInsert } from '@blocksuite/inline'; import type { Job, JobQueue, WorkspaceService } from '@toeverything/infra'; import { - Document, Entity, IndexedDBIndexStorage, IndexedDBJobQueue, JobRunner, LiveData, } from '@toeverything/infra'; -import { difference } from 'lodash-es'; import { map } from 'rxjs'; -import { Array as YArray, Map as YMap, type Text as YText } from 'yjs'; -import { applyUpdate, Doc as YDoc } from 'yjs'; -import { - type BlockIndexSchema, - blockIndexSchema, - docIndexSchema, -} from '../schema'; +import { blockIndexSchema, docIndexSchema } from '../schema'; +import { createWorker, type IndexerWorker } from '../worker/out-worker'; + +export function isEmptyUpdate(binary: Uint8Array) { + return ( + binary.byteLength === 0 || + (binary.byteLength === 2 && binary[0] === 0 && binary[1] === 0) + ); +} const logger = new DebugLogger('crawler'); interface IndexerJobPayload { docId: string; + storageDocId: string; } +// TODO(@eyhn): simplify this, it's too complex export class DocsIndexer extends Entity { private readonly jobQueue: JobQueue = new IndexedDBJobQueue( @@ -51,6 +51,8 @@ export class DocsIndexer extends Entity { private readonly workspaceId = this.workspaceService.workspace.id; + private worker: IndexerWorker | null = null; + readonly status$ = LiveData.from<{ remaining?: number }>( this.jobQueue.status$.pipe( map(status => ({ @@ -67,13 +69,13 @@ export class DocsIndexer extends Entity { setupListener() { this.workspaceEngine.doc.storage.eventBus.on(event => { if (event.clientId === this.workspaceEngine.doc.clientId) { - const docId = event.docId; + const docId = normalizeDocId(event.docId); this.jobQueue .enqueue([ { batchKey: docId, - payload: { docId }, + payload: { docId, storageDocId: event.docId }, }, ]) .catch(err => { @@ -83,23 +85,140 @@ export class DocsIndexer extends Entity { }); } - async execJob(jobs: Job[], _signal: AbortSignal) { + async execJob(jobs: Job[], signal: AbortSignal) { if (jobs.length === 0) { return; } // jobs should have the same docId, so we just pick the first one const docId = jobs[0].payload.docId; + const storageDocId = jobs[0].payload.storageDocId; + + const worker = await this.ensureWorker(signal); const startTime = performance.now(); logger.debug('Start crawling job for docId:', docId); - if (docId) { - if (docId === this.workspaceId) { - await this.crawlingRootDocData(); - } else { - await this.crawlingDocData(docId); + let workerOutput; + + if (docId === this.workspaceId) { + const rootDocBuffer = + await this.workspaceEngine.doc.storage.loadDocFromLocal( + this.workspaceId + ); + + if (!rootDocBuffer) { + return; } + + const allIndexedDocs = ( + await this.docIndex.search( + { + type: 'all', + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + skip: 0, + }, + } + ) + ).nodes.map(n => n.id); + + workerOutput = await worker.run({ + type: 'rootDoc', + allIndexedDocs, + rootDocBuffer, + }); + } else { + const rootDocBuffer = + await this.workspaceEngine.doc.storage.loadDocFromLocal( + this.workspaceId + ); + + const docBuffer = + (await this.workspaceEngine.doc.storage.loadDocFromLocal( + storageDocId + )) ?? new Uint8Array(0); + + if (!rootDocBuffer) { + return; + } + + workerOutput = await worker.run({ + type: 'doc', + docBuffer, + docId, + rootDocBuffer, + }); + } + + if (workerOutput.deletedDoc || workerOutput.addedDoc) { + if (workerOutput.deletedDoc) { + const docIndexWriter = await this.docIndex.write(); + for (const docId of workerOutput.deletedDoc) { + docIndexWriter.delete(docId); + } + await docIndexWriter.commit(); + const blockIndexWriter = await this.blockIndex.write(); + for (const docId of workerOutput.deletedDoc) { + const oldBlocks = await blockIndexWriter.search( + { + type: 'match', + field: 'docId', + match: docId, + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + }, + } + ); + for (const block of oldBlocks.nodes) { + docIndexWriter.delete(block.id); + } + } + await blockIndexWriter.commit(); + } + if (workerOutput.addedDoc) { + const docIndexWriter = await this.docIndex.write(); + for (const { doc } of workerOutput.addedDoc) { + docIndexWriter.put(doc); + } + await docIndexWriter.commit(); + const blockIndexWriter = await this.blockIndex.write(); + for (const { blocks } of workerOutput.addedDoc) { + // delete old blocks + const oldBlocks = await blockIndexWriter.search( + { + type: 'match', + field: 'docId', + match: docId, + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + }, + } + ); + for (const block of oldBlocks.nodes) { + blockIndexWriter.delete(block.id); + } + for (const block of blocks) { + blockIndexWriter.insert(block); + } + } + await blockIndexWriter.commit(); + } + } + + if (workerOutput.reindexDoc) { + await this.jobQueue.enqueue( + workerOutput.reindexDoc.map(({ docId, storageDocId }) => ({ + batchKey: docId, + payload: { docId, storageDocId }, + })) + ); } const duration = performance.now() - startTime; @@ -114,7 +233,7 @@ export class DocsIndexer extends Entity { .enqueue([ { batchKey: this.workspaceId, - payload: { docId: this.workspaceId }, + payload: { docId: this.workspaceId, storageDocId: this.workspaceId }, }, ]) .catch(err => { @@ -122,341 +241,58 @@ export class DocsIndexer extends Entity { }); } - async crawlingDocData(docId: string) { - const rootDocBuffer = - await this.workspaceEngine.doc.storage.loadDocFromLocal(this.workspaceId); - - if (!rootDocBuffer) { - return; - } - - const yRootDoc = new YDoc(); - applyUpdate(yRootDoc, rootDocBuffer); - - const docStoragePossibleIds = Array.from(yRootDoc.getSubdocs()) - .map(doc => doc.guid) - .filter(id => id.endsWith(docId)); - - let docBuffer; - - for (const id of docStoragePossibleIds) { - docBuffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(id); - - if (docBuffer) { - break; - } - } - - if (!docBuffer) { - return; - } - - const ydoc = new YDoc(); - - applyUpdate(ydoc, docBuffer); - - let docExists: boolean | null = null; - - ( - yRootDoc.getMap('meta').get('pages') as YArray> | undefined - )?.forEach(page => { - if (page.get('id') === docId) { - docExists = !(page.get('trash') ?? false); - } - }); - - if (!docExists) { - const indexWriter = await this.docIndex.write(); - indexWriter.delete(docId); - await indexWriter.commit(); - - const blockIndexWriter = await this.blockIndex.write(); - const oldBlocks = await blockIndexWriter.search( - { - type: 'match', - field: 'docId', - match: docId, - }, - { - pagination: { - limit: Number.MAX_SAFE_INTEGER, - }, - } - ); - for (const block of oldBlocks.nodes) { - blockIndexWriter.delete(block.id); - } - await blockIndexWriter.commit(); - } else { - const blocks = ydoc.getMap('blocks'); - - if (blocks.size === 0) { - return; - } - - let docTitle = ''; - - const blockDocuments: Document[] = []; - - for (const block of blocks.values()) { - const flavour = block.get('sys:flavour')?.toString(); - const blockId = block.get('sys:id')?.toString(); - - if (!flavour || !blockId) { - continue; - } - - if (flavour === 'affine:page') { - docTitle = block.get('prop:title').toString(); - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: docTitle, - }) - ); - } - - if ( - flavour === 'affine:paragraph' || - flavour === 'affine:list' || - flavour === 'affine:code' - ) { - const text = block.get('prop:text') as YText; - if (!text) { - continue; - } - - const deltas: DeltaInsert[] = text.toDelta(); - const ref = deltas - .map(delta => { - if ( - delta.attributes && - delta.attributes.reference && - delta.attributes.reference.pageId - ) { - return delta.attributes.reference.pageId; - } - return null; - }) - .filter((link): link is string => !!link); - - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: text.toString(), - ref, - }) - ); - } - - if ( - flavour === 'affine:embed-linked-doc' || - flavour === 'affine:embed-synced-doc' - ) { - const pageId = block.get('prop:pageId'); - if (typeof pageId === 'string') { - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - ref: pageId, - }) - ); - } - } - - if (flavour === 'affine:attachment' || flavour === 'affine:image') { - const blobId = block.get('prop:sourceId'); - if (typeof blobId === 'string') { - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - blob: [blobId], - }) - ); - } - } - - if (flavour === 'affine:surface') { - const texts = []; - - const elementsObj = block.get('prop:elements'); - if ( - !( - elementsObj instanceof YMap && - elementsObj.get('type') === '$blocksuite:internal:native$' - ) - ) { - continue; - } - const elements = elementsObj.get('value') as YMap; - if (!(elements instanceof YMap)) { - continue; - } - - for (const element of elements.values()) { - if (!(element instanceof YMap)) { - continue; - } - const text = element.get('text') as YText; - if (!text) { - continue; - } - - texts.push(text.toString()); - } - - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: texts, - }) - ); - } - - if (flavour === 'affine:database') { - const texts = []; - const columnsObj = block.get('prop:columns'); - if (!(columnsObj instanceof YArray)) { - continue; - } - for (const column of columnsObj) { - if (!(column instanceof YMap)) { - continue; - } - if (typeof column.get('name') === 'string') { - texts.push(column.get('name')); - } - - const data = column.get('data'); - if (!(data instanceof YMap)) { - continue; - } - const options = data.get('options'); - if (!(options instanceof YArray)) { - continue; - } - for (const option of options) { - if (!(option instanceof YMap)) { - continue; - } - const value = option.get('value'); - if (typeof value === 'string') { - texts.push(value); - } - } - } - - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: texts, - }) - ); - } - } - - const docIndexWriter = await this.docIndex.write(); - docIndexWriter.put( - Document.from(docId, { - title: docTitle, - }) - ); - await docIndexWriter.commit(); - - const blockIndexWriter = await this.blockIndex.write(); - const oldBlocks = await blockIndexWriter.search( - { - type: 'match', - field: 'docId', - match: docId, - }, - { - pagination: { - limit: Number.MAX_SAFE_INTEGER, - }, - } - ); - for (const block of oldBlocks.nodes) { - blockIndexWriter.delete(block.id); - } - for (const block of blockDocuments) { - blockIndexWriter.insert(block); - } - await blockIndexWriter.commit(); + async ensureWorker(signal: AbortSignal): Promise { + if (!this.worker) { + this.worker = await createWorker(signal); } + return this.worker; } - async crawlingRootDocData() { - const buffer = await this.workspaceEngine.doc.storage.loadDocFromLocal( - this.workspaceId - ); - if (!buffer) { - return; - } - - const ydoc = new YDoc(); - - applyUpdate(ydoc, buffer); - - const docs = ydoc.getMap('meta').get('pages') as - | YArray> - | undefined; - - if (!docs) { - return; - } - - const availableDocs = []; - - for (const page of docs) { - const docId = page.get('id'); - - if (typeof docId !== 'string') { - continue; - } - - const inTrash = page.get('trash') ?? false; - - if (!inTrash) { - availableDocs.push(docId); - } - } - - // a hack to get all docs in index - const allIndexedDocs = ( - await this.docIndex.search( - { - type: 'all', - }, - { - pagination: { - limit: Number.MAX_SAFE_INTEGER, - skip: 0, - }, - } - ) - ).nodes.map(n => n.id); - - const needDelete = difference(allIndexedDocs, availableDocs); - const needAdd = difference(availableDocs, allIndexedDocs); - - await this.jobQueue.enqueue( - [...needAdd, ...needDelete].map(docId => ({ - batchKey: docId, - payload: { docId }, - })) - ); + override dispose(): void { + this.runner.stop(); + } +} + +function normalizeDocId(raw: string) { + enum DocVariant { + Workspace = 'workspace', + Page = 'page', + Space = 'space', + Settings = 'settings', + Unknown = 'unknown', + } + + try { + if (!raw.length) { + throw new Error('Invalid Empty Doc ID'); + } + + let parts = raw.split(':'); + + if (parts.length > 3) { + // special adapt case `wsId:space:page:pageId` + if (parts[1] === DocVariant.Space && parts[2] === DocVariant.Page) { + parts = [parts[0], DocVariant.Space, parts[3]]; + } else { + throw new Error(`Invalid format of Doc ID: ${raw}`); + } + } else if (parts.length === 2) { + // `${variant}:${guid}` + throw new Error('not supported'); + } else if (parts.length === 1) { + // ${ws} or ${pageId} + parts = ['', DocVariant.Unknown, parts[0]]; + } + + const docId = parts.at(2); + + if (!docId) { + throw new Error('ID is required'); + } + + return docId; + } catch (err) { + logger.error('Error on normalize docId ' + raw, err); + return raw; } } diff --git a/packages/frontend/core/src/modules/docs-search/services/docs-search.ts b/packages/frontend/core/src/modules/docs-search/services/docs-search.ts index 8e10fac808..0a5f7e42a9 100644 --- a/packages/frontend/core/src/modules/docs-search/services/docs-search.ts +++ b/packages/frontend/core/src/modules/docs-search/services/docs-search.ts @@ -419,4 +419,8 @@ export class DocsSearchService extends Service { const title = doc?.get('title'); return typeof title === 'string' ? title : title?.[0]; } + + override dispose(): void { + this.indexer.dispose(); + } } diff --git a/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts b/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts new file mode 100644 index 0000000000..cdb5c1ad60 --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts @@ -0,0 +1,313 @@ +import type { AffineTextAttributes } from '@blocksuite/blocks'; +import type { DeltaInsert } from '@blocksuite/inline'; +import { Document } from '@toeverything/infra'; +import { difference } from 'lodash-es'; +import { + applyUpdate, + Array as YArray, + Doc as YDoc, + Map as YMap, + type Text as YText, +} from 'yjs'; + +import type { BlockIndexSchema, docIndexSchema } from '../schema'; +import type { + WorkerIngoingMessage, + WorkerInput, + WorkerOutgoingMessage, + WorkerOutput, +} from './types'; + +function crawlingDocData({ + docBuffer, + docId, + rootDocBuffer, +}: WorkerInput & { type: 'doc' }): WorkerOutput { + const yRootDoc = new YDoc(); + applyUpdate(yRootDoc, rootDocBuffer); + + const ydoc = new YDoc(); + + applyUpdate(ydoc, docBuffer); + + let docExists: boolean | null = null; + + ( + yRootDoc.getMap('meta').get('pages') as YArray> | undefined + )?.forEach(page => { + if (page.get('id') === docId) { + docExists = !(page.get('trash') ?? false); + } + }); + + if (!docExists) { + return { + deletedDoc: [docId], + }; + } else { + const blocks = ydoc.getMap('blocks'); + + if (blocks.size === 0) { + return {}; + } + + let docTitle = ''; + + const blockDocuments: Document[] = []; + + for (const block of blocks.values()) { + const flavour = block.get('sys:flavour')?.toString(); + const blockId = block.get('sys:id')?.toString(); + + if (!flavour || !blockId) { + continue; + } + + if (flavour === 'affine:page') { + docTitle = block.get('prop:title').toString(); + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: docTitle, + }) + ); + } + + if ( + flavour === 'affine:paragraph' || + flavour === 'affine:list' || + flavour === 'affine:code' + ) { + const text = block.get('prop:text') as YText; + if (!text) { + continue; + } + + const deltas: DeltaInsert[] = text.toDelta(); + const ref = deltas + .map(delta => { + if ( + delta.attributes && + delta.attributes.reference && + delta.attributes.reference.pageId + ) { + return delta.attributes.reference.pageId; + } + return null; + }) + .filter((link): link is string => !!link); + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: text.toString(), + ref, + }) + ); + } + + if ( + flavour === 'affine:embed-linked-doc' || + flavour === 'affine:embed-synced-doc' + ) { + const pageId = block.get('prop:pageId'); + if (typeof pageId === 'string') { + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + ref: pageId, + }) + ); + } + } + + if (flavour === 'affine:attachment' || flavour === 'affine:image') { + const blobId = block.get('prop:sourceId'); + if (typeof blobId === 'string') { + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + blob: [blobId], + }) + ); + } + } + + if (flavour === 'affine:surface') { + const texts = []; + + const elementsObj = block.get('prop:elements'); + if ( + !( + elementsObj instanceof YMap && + elementsObj.get('type') === '$blocksuite:internal:native$' + ) + ) { + continue; + } + const elements = elementsObj.get('value') as YMap; + if (!(elements instanceof YMap)) { + continue; + } + + for (const element of elements.values()) { + if (!(element instanceof YMap)) { + continue; + } + const text = element.get('text') as YText; + if (!text) { + continue; + } + + texts.push(text.toString()); + } + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: texts, + }) + ); + } + + if (flavour === 'affine:database') { + const texts = []; + const columnsObj = block.get('prop:columns'); + if (!(columnsObj instanceof YArray)) { + continue; + } + for (const column of columnsObj) { + if (!(column instanceof YMap)) { + continue; + } + if (typeof column.get('name') === 'string') { + texts.push(column.get('name')); + } + + const data = column.get('data'); + if (!(data instanceof YMap)) { + continue; + } + const options = data.get('options'); + if (!(options instanceof YArray)) { + continue; + } + for (const option of options) { + if (!(option instanceof YMap)) { + continue; + } + const value = option.get('value'); + if (typeof value === 'string') { + texts.push(value); + } + } + } + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: texts, + }) + ); + } + } + + return { + addedDoc: [ + { + id: docId, + doc: Document.from(docId, { + title: docTitle, + }), + blocks: blockDocuments, + }, + ], + }; + } +} + +function crawlingRootDocData({ + allIndexedDocs, + rootDocBuffer, +}: WorkerInput & { + type: 'rootDoc'; +}): WorkerOutput { + const ydoc = new YDoc(); + + applyUpdate(ydoc, rootDocBuffer); + + const docs = ydoc.getMap('meta').get('pages') as + | YArray> + | undefined; + + if (!docs) { + return {}; + } + + const availableDocs = []; + + for (const page of docs) { + const docId = page.get('id'); + + if (typeof docId !== 'string') { + continue; + } + + const inTrash = page.get('trash') ?? false; + + if (!inTrash) { + availableDocs.push(docId); + } + } + + const needDelete = difference(allIndexedDocs, availableDocs); + const needAdd = difference(availableDocs, allIndexedDocs); + + return { + reindexDoc: [...needAdd, ...needDelete].map(docId => ({ + docId, + storageDocId: ydoc.getMap('spaces').get(docId)?.guid ?? docId, + })), + }; +} + +globalThis.onmessage = (event: MessageEvent) => { + const message = event.data; + if (message.type === 'init') { + postMessage({ type: 'init', msgId: message.msgId }); + return; + } + if (message.type === 'run') { + const { input } = message; + try { + let data; + if (input.type === 'rootDoc') { + data = crawlingRootDocData(input); + } else { + data = crawlingDocData(input); + } + + postMessage({ type: 'done', msgId: message.msgId, output: data }); + } catch (error) { + postMessage({ + type: 'failed', + msgId: message.msgId, + error: error instanceof Error ? error.message : error + '', + }); + } + } +}; + +declare function postMessage(message: WorkerOutgoingMessage): void; diff --git a/packages/frontend/core/src/modules/docs-search/worker/out-worker.ts b/packages/frontend/core/src/modules/docs-search/worker/out-worker.ts new file mode 100644 index 0000000000..a154395f3c --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/worker/out-worker.ts @@ -0,0 +1,98 @@ +import { DebugLogger } from '@affine/debug'; +import { MANUALLY_STOP, throwIfAborted } from '@toeverything/infra'; + +import type { + WorkerIngoingMessage, + WorkerInput, + WorkerOutgoingMessage, + WorkerOutput, +} from './types'; + +const logger = new DebugLogger('affine:indexer-worker'); + +export async function createWorker(abort: AbortSignal) { + let worker: Worker | null = null; + while (throwIfAborted(abort)) { + try { + worker = await new Promise((resolve, reject) => { + const worker = new Worker(new URL('./in-worker.ts', import.meta.url)); + worker.addEventListener('error', reject); + worker.addEventListener('message', event => { + if (event.data.type === 'init') { + resolve(worker); + } + }); + worker.postMessage({ type: 'init', msgId: 0 } as WorkerIngoingMessage); + setTimeout(() => { + reject('timeout'); + }, 1000 * 30 /* 30 sec */); + }); + } catch (err) { + logger.debug( + `Indexer worker init failed, ${err}, will retry in 5 seconds.` + ); + await new Promise(resolve => setTimeout(resolve, 5000)); + } + if (worker) { + break; + } + } + + if (!worker) { + // never reach here + throw new Error('Worker is not created'); + } + + const terminateAbort = new AbortController(); + + let msgId = 1; + + return { + run: async (input: WorkerInput) => { + const dispose: (() => void)[] = []; + return new Promise((resolve, reject) => { + const currentMsgId = msgId++; + const msgHandler = (event: MessageEvent) => { + if (event.data.msgId === currentMsgId) { + if (event.data.type === 'done') { + resolve(event.data.output); + } else if (event.data.type === 'failed') { + reject(new Error(event.data.error)); + } else { + reject(new Error('Unknown message type')); + } + } + }; + const abortHandler = (reason: any) => { + reject(reason); + }; + + worker.addEventListener('message', msgHandler); + dispose.push(() => { + worker?.removeEventListener('message', msgHandler); + }); + + terminateAbort.signal.addEventListener('abort', abortHandler); + dispose.push(() => { + terminateAbort.signal.removeEventListener('abort', abortHandler); + }); + + worker.postMessage({ + type: 'run', + msgId: currentMsgId, + input, + } as WorkerIngoingMessage); + }).finally(() => { + for (const d of dispose) { + d(); + } + }); + }, + dispose: () => { + worker.terminate(); + terminateAbort.abort(MANUALLY_STOP); + }, + }; +} + +export type IndexerWorker = Awaited>; diff --git a/packages/frontend/core/src/modules/docs-search/worker/types.ts b/packages/frontend/core/src/modules/docs-search/worker/types.ts new file mode 100644 index 0000000000..d5ff7f10dd --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/worker/types.ts @@ -0,0 +1,50 @@ +import type { Document } from '@toeverything/infra'; + +import type { BlockIndexSchema, DocIndexSchema } from '../schema'; + +export type WorkerIngoingMessage = ( + | { + type: 'init'; + } + | { + type: 'run'; + input: WorkerInput; + } +) & { msgId: number }; + +export type WorkerOutgoingMessage = ( + | { + type: 'init'; + } + | { + type: 'done'; + output: WorkerOutput; + } + | { + type: 'failed'; + error: string; + } +) & { msgId: number }; + +export type WorkerInput = + | { + type: 'rootDoc'; + rootDocBuffer: Uint8Array; + allIndexedDocs: string[]; + } + | { + type: 'doc'; + docId: string; + rootDocBuffer: Uint8Array; + docBuffer: Uint8Array; + }; + +export interface WorkerOutput { + reindexDoc?: { docId: string; storageDocId: string }[]; + addedDoc?: { + id: string; + blocks: Document[]; + doc: Document; + }[]; + deletedDoc?: string[]; +}