diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index 81c23fead7..de0d7c120b 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -2,7 +2,6 @@ "name": "@toeverything/infra", "type": "module", "private": true, - "sideEffects": false, "exports": { "./blocksuite": "./src/blocksuite/index.ts", "./storage": "./src/storage/index.ts", diff --git a/packages/common/infra/src/livedata/ops.ts b/packages/common/infra/src/livedata/ops.ts index 711d5b2526..34e08674c2 100644 --- a/packages/common/infra/src/livedata/ops.ts +++ b/packages/common/infra/src/livedata/ops.ts @@ -18,7 +18,6 @@ import { timer, } from 'rxjs'; -import { MANUALLY_STOP } from '../utils'; import type { LiveData } from './livedata'; /** @@ -108,8 +107,7 @@ export function fromPromise( .catch(error => { subscriber.error(error); }); - - return () => abortController.abort(MANUALLY_STOP); + return () => abortController.abort('Aborted'); }); } diff --git a/packages/common/infra/src/sync/blob/blob.ts b/packages/common/infra/src/sync/blob/blob.ts index 658e9989b9..4bd1615949 100644 --- a/packages/common/infra/src/sync/blob/blob.ts +++ b/packages/common/infra/src/sync/blob/blob.ts @@ -4,7 +4,6 @@ import { difference } from 'lodash-es'; import { LiveData } from '../../livedata'; import type { Memento } from '../../storage'; -import { MANUALLY_STOP } from '../../utils'; import { BlobStorageOverCapacity } from './error'; const logger = new DebugLogger('affine:blob-engine'); @@ -71,7 +70,7 @@ export class BlobEngine { } stop() { - this.abort?.abort(MANUALLY_STOP); + this.abort?.abort(); this.abort = null; } diff --git a/packages/common/infra/src/sync/doc/old-id.md b/packages/common/infra/src/sync/doc/old-id.md deleted file mode 100644 index 87aed16c7e..0000000000 --- a/packages/common/infra/src/sync/doc/old-id.md +++ /dev/null @@ -1,24 +0,0 @@ -AFFiNE currently has a lot of data stored using the old ID format. Here, we record the usage of IDs to avoid forgetting. - -## Old ID Format - -The format is: - -- `{workspace-id}:space:{nanoid}` Common -- `{workspace-id}:space:page:{nanoid}` - -> Note: sometimes the `workspace-id` is not same with current workspace id. - -## Usage - -- Local Storage - - indexeddb: Both new and old IDs coexist - - sqlite: Both new and old IDs coexist - - server-clock: Only new IDs are stored - - sync-metadata: Both new and old IDs coexist -- Server Storage - - Only stores new IDs but accepts writes using old IDs -- Protocols - - When the client submits an update, both new and old IDs are used. - - When the server broadcasts updates sent by other clients, both new and old IDs are used. - - When the server responds to `client-pre-sync` (listing all updated docids), only new IDs are used. diff --git a/packages/common/infra/src/sync/indexer/indexer.ts b/packages/common/infra/src/sync/indexer/indexer.ts index cba453022f..027bd74419 100644 --- a/packages/common/infra/src/sync/indexer/indexer.ts +++ b/packages/common/infra/src/sync/indexer/indexer.ts @@ -20,9 +20,6 @@ export interface IndexWriter delete(id: string): void; - // TODO(@eyhn) - // deleteByQuery(query: Query): void; - commit(): Promise; rollback(): void; diff --git a/packages/common/infra/src/sync/job/impl/indexeddb/index.ts b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts index 3d0799f101..3708bdf311 100644 --- a/packages/common/infra/src/sync/job/impl/indexeddb/index.ts +++ b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts @@ -155,9 +155,6 @@ export class IndexedDBJobQueue implements JobQueue { .objectStore('jobs') .delete(typeof id === 'string' ? parseInt(id) : id); } - - trx.commit(); - this.broadcast.postMessage('job-completed'); } async return(jobs: Job[], retry: boolean = false): Promise { @@ -177,10 +174,6 @@ export class IndexedDBJobQueue implements JobQueue { .delete(typeof id === 'string' ? parseInt(id) : id); } } - - trx.commit(); - - this.broadcast.postMessage('job-completed'); } async clear(): Promise { diff --git a/packages/common/infra/src/sync/job/runner.ts b/packages/common/infra/src/sync/job/runner.ts index 3220e21709..7bf04c62d7 100644 --- a/packages/common/infra/src/sync/job/runner.ts +++ b/packages/common/infra/src/sync/job/runner.ts @@ -29,7 +29,7 @@ export class JobRunner { } stop() { - this.abort?.abort(MANUALLY_STOP); + this.abort?.abort(); this.abort = null; } diff --git a/packages/frontend/core/src/modules/cloud/services/fetch.ts b/packages/frontend/core/src/modules/cloud/services/fetch.ts index 8fc8f781b4..988bf97260 100644 --- a/packages/frontend/core/src/modules/cloud/services/fetch.ts +++ b/packages/frontend/core/src/modules/cloud/services/fetch.ts @@ -41,8 +41,8 @@ export class FetchService extends Service { throw externalSignal.reason; } const abortController = new AbortController(); - externalSignal?.addEventListener('abort', reason => { - abortController.abort(reason); + externalSignal?.addEventListener('abort', () => { + abortController.abort(); }); const timeout = init?.timeout ?? 15000; diff --git a/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts index 5c02227c28..58bc596b98 100644 --- a/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts +++ b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts @@ -1,32 +1,32 @@ import { DebugLogger } from '@affine/debug'; +import type { AffineTextAttributes } from '@blocksuite/blocks'; +import type { DeltaInsert } from '@blocksuite/inline'; import type { Job, JobQueue, WorkspaceService } from '@toeverything/infra'; import { + Document, Entity, IndexedDBIndexStorage, IndexedDBJobQueue, JobRunner, LiveData, } from '@toeverything/infra'; +import { difference } from 'lodash-es'; import { map } from 'rxjs'; +import { Array as YArray, Map as YMap, type Text as YText } from 'yjs'; +import { applyUpdate, Doc as YDoc } from 'yjs'; -import { blockIndexSchema, docIndexSchema } from '../schema'; -import { createWorker, type IndexerWorker } from '../worker/out-worker'; - -export function isEmptyUpdate(binary: Uint8Array) { - return ( - binary.byteLength === 0 || - (binary.byteLength === 2 && binary[0] === 0 && binary[1] === 0) - ); -} +import { + type BlockIndexSchema, + blockIndexSchema, + docIndexSchema, +} from '../schema'; const logger = new DebugLogger('crawler'); interface IndexerJobPayload { docId: string; - storageDocId: string; } -// TODO(@eyhn): simplify this, it's too complex export class DocsIndexer extends Entity { private readonly jobQueue: JobQueue = new IndexedDBJobQueue( @@ -51,8 +51,6 @@ export class DocsIndexer extends Entity { private readonly workspaceId = this.workspaceService.workspace.id; - private worker: IndexerWorker | null = null; - readonly status$ = LiveData.from<{ remaining?: number }>( this.jobQueue.status$.pipe( map(status => ({ @@ -69,13 +67,13 @@ export class DocsIndexer extends Entity { setupListener() { this.workspaceEngine.doc.storage.eventBus.on(event => { if (event.clientId === this.workspaceEngine.doc.clientId) { - const docId = normalizeDocId(event.docId); + const docId = event.docId; this.jobQueue .enqueue([ { batchKey: docId, - payload: { docId, storageDocId: event.docId }, + payload: { docId }, }, ]) .catch(err => { @@ -85,140 +83,23 @@ export class DocsIndexer extends Entity { }); } - async execJob(jobs: Job[], signal: AbortSignal) { + async execJob(jobs: Job[], _signal: AbortSignal) { if (jobs.length === 0) { return; } // jobs should have the same docId, so we just pick the first one const docId = jobs[0].payload.docId; - const storageDocId = jobs[0].payload.storageDocId; - - const worker = await this.ensureWorker(signal); const startTime = performance.now(); logger.debug('Start crawling job for docId:', docId); - let workerOutput; - - if (docId === this.workspaceId) { - const rootDocBuffer = - await this.workspaceEngine.doc.storage.loadDocFromLocal( - this.workspaceId - ); - - if (!rootDocBuffer) { - return; + if (docId) { + if (docId === this.workspaceId) { + await this.crawlingRootDocData(); + } else { + await this.crawlingDocData(docId); } - - const allIndexedDocs = ( - await this.docIndex.search( - { - type: 'all', - }, - { - pagination: { - limit: Number.MAX_SAFE_INTEGER, - skip: 0, - }, - } - ) - ).nodes.map(n => n.id); - - workerOutput = await worker.run({ - type: 'rootDoc', - allIndexedDocs, - rootDocBuffer, - }); - } else { - const rootDocBuffer = - await this.workspaceEngine.doc.storage.loadDocFromLocal( - this.workspaceId - ); - - const docBuffer = - (await this.workspaceEngine.doc.storage.loadDocFromLocal( - storageDocId - )) ?? new Uint8Array(0); - - if (!rootDocBuffer) { - return; - } - - workerOutput = await worker.run({ - type: 'doc', - docBuffer, - docId, - rootDocBuffer, - }); - } - - if (workerOutput.deletedDoc || workerOutput.addedDoc) { - if (workerOutput.deletedDoc) { - const docIndexWriter = await this.docIndex.write(); - for (const docId of workerOutput.deletedDoc) { - docIndexWriter.delete(docId); - } - await docIndexWriter.commit(); - const blockIndexWriter = await this.blockIndex.write(); - for (const docId of workerOutput.deletedDoc) { - const oldBlocks = await blockIndexWriter.search( - { - type: 'match', - field: 'docId', - match: docId, - }, - { - pagination: { - limit: Number.MAX_SAFE_INTEGER, - }, - } - ); - for (const block of oldBlocks.nodes) { - docIndexWriter.delete(block.id); - } - } - await blockIndexWriter.commit(); - } - if (workerOutput.addedDoc) { - const docIndexWriter = await this.docIndex.write(); - for (const { doc } of workerOutput.addedDoc) { - docIndexWriter.put(doc); - } - await docIndexWriter.commit(); - const blockIndexWriter = await this.blockIndex.write(); - for (const { blocks } of workerOutput.addedDoc) { - // delete old blocks - const oldBlocks = await blockIndexWriter.search( - { - type: 'match', - field: 'docId', - match: docId, - }, - { - pagination: { - limit: Number.MAX_SAFE_INTEGER, - }, - } - ); - for (const block of oldBlocks.nodes) { - blockIndexWriter.delete(block.id); - } - for (const block of blocks) { - blockIndexWriter.insert(block); - } - } - await blockIndexWriter.commit(); - } - } - - if (workerOutput.reindexDoc) { - await this.jobQueue.enqueue( - workerOutput.reindexDoc.map(({ docId, storageDocId }) => ({ - batchKey: docId, - payload: { docId, storageDocId }, - })) - ); } const duration = performance.now() - startTime; @@ -233,7 +114,7 @@ export class DocsIndexer extends Entity { .enqueue([ { batchKey: this.workspaceId, - payload: { docId: this.workspaceId, storageDocId: this.workspaceId }, + payload: { docId: this.workspaceId }, }, ]) .catch(err => { @@ -241,58 +122,341 @@ export class DocsIndexer extends Entity { }); } - async ensureWorker(signal: AbortSignal): Promise { - if (!this.worker) { - this.worker = await createWorker(signal); - } - return this.worker; - } + async crawlingDocData(docId: string) { + const rootDocBuffer = + await this.workspaceEngine.doc.storage.loadDocFromLocal(this.workspaceId); - override dispose(): void { - this.runner.stop(); - } -} - -function normalizeDocId(raw: string) { - enum DocVariant { - Workspace = 'workspace', - Page = 'page', - Space = 'space', - Settings = 'settings', - Unknown = 'unknown', - } - - try { - if (!raw.length) { - throw new Error('Invalid Empty Doc ID'); + if (!rootDocBuffer) { + return; } - let parts = raw.split(':'); + const yRootDoc = new YDoc(); + applyUpdate(yRootDoc, rootDocBuffer); - if (parts.length > 3) { - // special adapt case `wsId:space:page:pageId` - if (parts[1] === DocVariant.Space && parts[2] === DocVariant.Page) { - parts = [parts[0], DocVariant.Space, parts[3]]; - } else { - throw new Error(`Invalid format of Doc ID: ${raw}`); + const docStoragePossibleIds = Array.from(yRootDoc.getSubdocs()) + .map(doc => doc.guid) + .filter(id => id.endsWith(docId)); + + let docBuffer; + + for (const id of docStoragePossibleIds) { + docBuffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(id); + + if (docBuffer) { + break; } - } else if (parts.length === 2) { - // `${variant}:${guid}` - throw new Error('not supported'); - } else if (parts.length === 1) { - // ${ws} or ${pageId} - parts = ['', DocVariant.Unknown, parts[0]]; } - const docId = parts.at(2); - - if (!docId) { - throw new Error('ID is required'); + if (!docBuffer) { + return; } - return docId; - } catch (err) { - logger.error('Error on normalize docId ' + raw, err); - return raw; + const ydoc = new YDoc(); + + applyUpdate(ydoc, docBuffer); + + let docExists: boolean | null = null; + + ( + yRootDoc.getMap('meta').get('pages') as YArray> | undefined + )?.forEach(page => { + if (page.get('id') === docId) { + docExists = !(page.get('trash') ?? false); + } + }); + + if (!docExists) { + const indexWriter = await this.docIndex.write(); + indexWriter.delete(docId); + await indexWriter.commit(); + + const blockIndexWriter = await this.blockIndex.write(); + const oldBlocks = await blockIndexWriter.search( + { + type: 'match', + field: 'docId', + match: docId, + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + }, + } + ); + for (const block of oldBlocks.nodes) { + blockIndexWriter.delete(block.id); + } + await blockIndexWriter.commit(); + } else { + const blocks = ydoc.getMap('blocks'); + + if (blocks.size === 0) { + return; + } + + let docTitle = ''; + + const blockDocuments: Document[] = []; + + for (const block of blocks.values()) { + const flavour = block.get('sys:flavour')?.toString(); + const blockId = block.get('sys:id')?.toString(); + + if (!flavour || !blockId) { + continue; + } + + if (flavour === 'affine:page') { + docTitle = block.get('prop:title').toString(); + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: docTitle, + }) + ); + } + + if ( + flavour === 'affine:paragraph' || + flavour === 'affine:list' || + flavour === 'affine:code' + ) { + const text = block.get('prop:text') as YText; + if (!text) { + continue; + } + + const deltas: DeltaInsert[] = text.toDelta(); + const ref = deltas + .map(delta => { + if ( + delta.attributes && + delta.attributes.reference && + delta.attributes.reference.pageId + ) { + return delta.attributes.reference.pageId; + } + return null; + }) + .filter((link): link is string => !!link); + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: text.toString(), + ref, + }) + ); + } + + if ( + flavour === 'affine:embed-linked-doc' || + flavour === 'affine:embed-synced-doc' + ) { + const pageId = block.get('prop:pageId'); + if (typeof pageId === 'string') { + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + ref: pageId, + }) + ); + } + } + + if (flavour === 'affine:attachment' || flavour === 'affine:image') { + const blobId = block.get('prop:sourceId'); + if (typeof blobId === 'string') { + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + blob: [blobId], + }) + ); + } + } + + if (flavour === 'affine:surface') { + const texts = []; + + const elementsObj = block.get('prop:elements'); + if ( + !( + elementsObj instanceof YMap && + elementsObj.get('type') === '$blocksuite:internal:native$' + ) + ) { + continue; + } + const elements = elementsObj.get('value') as YMap; + if (!(elements instanceof YMap)) { + continue; + } + + for (const element of elements.values()) { + if (!(element instanceof YMap)) { + continue; + } + const text = element.get('text') as YText; + if (!text) { + continue; + } + + texts.push(text.toString()); + } + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: texts, + }) + ); + } + + if (flavour === 'affine:database') { + const texts = []; + const columnsObj = block.get('prop:columns'); + if (!(columnsObj instanceof YArray)) { + continue; + } + for (const column of columnsObj) { + if (!(column instanceof YMap)) { + continue; + } + if (typeof column.get('name') === 'string') { + texts.push(column.get('name')); + } + + const data = column.get('data'); + if (!(data instanceof YMap)) { + continue; + } + const options = data.get('options'); + if (!(options instanceof YArray)) { + continue; + } + for (const option of options) { + if (!(option instanceof YMap)) { + continue; + } + const value = option.get('value'); + if (typeof value === 'string') { + texts.push(value); + } + } + } + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: texts, + }) + ); + } + } + + const docIndexWriter = await this.docIndex.write(); + docIndexWriter.put( + Document.from(docId, { + title: docTitle, + }) + ); + await docIndexWriter.commit(); + + const blockIndexWriter = await this.blockIndex.write(); + const oldBlocks = await blockIndexWriter.search( + { + type: 'match', + field: 'docId', + match: docId, + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + }, + } + ); + for (const block of oldBlocks.nodes) { + blockIndexWriter.delete(block.id); + } + for (const block of blockDocuments) { + blockIndexWriter.insert(block); + } + await blockIndexWriter.commit(); + } + } + + async crawlingRootDocData() { + const buffer = await this.workspaceEngine.doc.storage.loadDocFromLocal( + this.workspaceId + ); + if (!buffer) { + return; + } + + const ydoc = new YDoc(); + + applyUpdate(ydoc, buffer); + + const docs = ydoc.getMap('meta').get('pages') as + | YArray> + | undefined; + + if (!docs) { + return; + } + + const availableDocs = []; + + for (const page of docs) { + const docId = page.get('id'); + + if (typeof docId !== 'string') { + continue; + } + + const inTrash = page.get('trash') ?? false; + + if (!inTrash) { + availableDocs.push(docId); + } + } + + // a hack to get all docs in index + const allIndexedDocs = ( + await this.docIndex.search( + { + type: 'all', + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + skip: 0, + }, + } + ) + ).nodes.map(n => n.id); + + const needDelete = difference(allIndexedDocs, availableDocs); + const needAdd = difference(availableDocs, allIndexedDocs); + + await this.jobQueue.enqueue( + [...needAdd, ...needDelete].map(docId => ({ + batchKey: docId, + payload: { docId }, + })) + ); } } diff --git a/packages/frontend/core/src/modules/docs-search/services/docs-search.ts b/packages/frontend/core/src/modules/docs-search/services/docs-search.ts index 0a5f7e42a9..8e10fac808 100644 --- a/packages/frontend/core/src/modules/docs-search/services/docs-search.ts +++ b/packages/frontend/core/src/modules/docs-search/services/docs-search.ts @@ -419,8 +419,4 @@ export class DocsSearchService extends Service { const title = doc?.get('title'); return typeof title === 'string' ? title : title?.[0]; } - - override dispose(): void { - this.indexer.dispose(); - } } diff --git a/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts b/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts deleted file mode 100644 index cdb5c1ad60..0000000000 --- a/packages/frontend/core/src/modules/docs-search/worker/in-worker.ts +++ /dev/null @@ -1,313 +0,0 @@ -import type { AffineTextAttributes } from '@blocksuite/blocks'; -import type { DeltaInsert } from '@blocksuite/inline'; -import { Document } from '@toeverything/infra'; -import { difference } from 'lodash-es'; -import { - applyUpdate, - Array as YArray, - Doc as YDoc, - Map as YMap, - type Text as YText, -} from 'yjs'; - -import type { BlockIndexSchema, docIndexSchema } from '../schema'; -import type { - WorkerIngoingMessage, - WorkerInput, - WorkerOutgoingMessage, - WorkerOutput, -} from './types'; - -function crawlingDocData({ - docBuffer, - docId, - rootDocBuffer, -}: WorkerInput & { type: 'doc' }): WorkerOutput { - const yRootDoc = new YDoc(); - applyUpdate(yRootDoc, rootDocBuffer); - - const ydoc = new YDoc(); - - applyUpdate(ydoc, docBuffer); - - let docExists: boolean | null = null; - - ( - yRootDoc.getMap('meta').get('pages') as YArray> | undefined - )?.forEach(page => { - if (page.get('id') === docId) { - docExists = !(page.get('trash') ?? false); - } - }); - - if (!docExists) { - return { - deletedDoc: [docId], - }; - } else { - const blocks = ydoc.getMap('blocks'); - - if (blocks.size === 0) { - return {}; - } - - let docTitle = ''; - - const blockDocuments: Document[] = []; - - for (const block of blocks.values()) { - const flavour = block.get('sys:flavour')?.toString(); - const blockId = block.get('sys:id')?.toString(); - - if (!flavour || !blockId) { - continue; - } - - if (flavour === 'affine:page') { - docTitle = block.get('prop:title').toString(); - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: docTitle, - }) - ); - } - - if ( - flavour === 'affine:paragraph' || - flavour === 'affine:list' || - flavour === 'affine:code' - ) { - const text = block.get('prop:text') as YText; - if (!text) { - continue; - } - - const deltas: DeltaInsert[] = text.toDelta(); - const ref = deltas - .map(delta => { - if ( - delta.attributes && - delta.attributes.reference && - delta.attributes.reference.pageId - ) { - return delta.attributes.reference.pageId; - } - return null; - }) - .filter((link): link is string => !!link); - - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: text.toString(), - ref, - }) - ); - } - - if ( - flavour === 'affine:embed-linked-doc' || - flavour === 'affine:embed-synced-doc' - ) { - const pageId = block.get('prop:pageId'); - if (typeof pageId === 'string') { - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - ref: pageId, - }) - ); - } - } - - if (flavour === 'affine:attachment' || flavour === 'affine:image') { - const blobId = block.get('prop:sourceId'); - if (typeof blobId === 'string') { - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - blob: [blobId], - }) - ); - } - } - - if (flavour === 'affine:surface') { - const texts = []; - - const elementsObj = block.get('prop:elements'); - if ( - !( - elementsObj instanceof YMap && - elementsObj.get('type') === '$blocksuite:internal:native$' - ) - ) { - continue; - } - const elements = elementsObj.get('value') as YMap; - if (!(elements instanceof YMap)) { - continue; - } - - for (const element of elements.values()) { - if (!(element instanceof YMap)) { - continue; - } - const text = element.get('text') as YText; - if (!text) { - continue; - } - - texts.push(text.toString()); - } - - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: texts, - }) - ); - } - - if (flavour === 'affine:database') { - const texts = []; - const columnsObj = block.get('prop:columns'); - if (!(columnsObj instanceof YArray)) { - continue; - } - for (const column of columnsObj) { - if (!(column instanceof YMap)) { - continue; - } - if (typeof column.get('name') === 'string') { - texts.push(column.get('name')); - } - - const data = column.get('data'); - if (!(data instanceof YMap)) { - continue; - } - const options = data.get('options'); - if (!(options instanceof YArray)) { - continue; - } - for (const option of options) { - if (!(option instanceof YMap)) { - continue; - } - const value = option.get('value'); - if (typeof value === 'string') { - texts.push(value); - } - } - } - - blockDocuments.push( - Document.from(`${docId}:${blockId}`, { - docId, - flavour, - blockId, - content: texts, - }) - ); - } - } - - return { - addedDoc: [ - { - id: docId, - doc: Document.from(docId, { - title: docTitle, - }), - blocks: blockDocuments, - }, - ], - }; - } -} - -function crawlingRootDocData({ - allIndexedDocs, - rootDocBuffer, -}: WorkerInput & { - type: 'rootDoc'; -}): WorkerOutput { - const ydoc = new YDoc(); - - applyUpdate(ydoc, rootDocBuffer); - - const docs = ydoc.getMap('meta').get('pages') as - | YArray> - | undefined; - - if (!docs) { - return {}; - } - - const availableDocs = []; - - for (const page of docs) { - const docId = page.get('id'); - - if (typeof docId !== 'string') { - continue; - } - - const inTrash = page.get('trash') ?? false; - - if (!inTrash) { - availableDocs.push(docId); - } - } - - const needDelete = difference(allIndexedDocs, availableDocs); - const needAdd = difference(availableDocs, allIndexedDocs); - - return { - reindexDoc: [...needAdd, ...needDelete].map(docId => ({ - docId, - storageDocId: ydoc.getMap('spaces').get(docId)?.guid ?? docId, - })), - }; -} - -globalThis.onmessage = (event: MessageEvent) => { - const message = event.data; - if (message.type === 'init') { - postMessage({ type: 'init', msgId: message.msgId }); - return; - } - if (message.type === 'run') { - const { input } = message; - try { - let data; - if (input.type === 'rootDoc') { - data = crawlingRootDocData(input); - } else { - data = crawlingDocData(input); - } - - postMessage({ type: 'done', msgId: message.msgId, output: data }); - } catch (error) { - postMessage({ - type: 'failed', - msgId: message.msgId, - error: error instanceof Error ? error.message : error + '', - }); - } - } -}; - -declare function postMessage(message: WorkerOutgoingMessage): void; diff --git a/packages/frontend/core/src/modules/docs-search/worker/out-worker.ts b/packages/frontend/core/src/modules/docs-search/worker/out-worker.ts deleted file mode 100644 index a154395f3c..0000000000 --- a/packages/frontend/core/src/modules/docs-search/worker/out-worker.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { DebugLogger } from '@affine/debug'; -import { MANUALLY_STOP, throwIfAborted } from '@toeverything/infra'; - -import type { - WorkerIngoingMessage, - WorkerInput, - WorkerOutgoingMessage, - WorkerOutput, -} from './types'; - -const logger = new DebugLogger('affine:indexer-worker'); - -export async function createWorker(abort: AbortSignal) { - let worker: Worker | null = null; - while (throwIfAborted(abort)) { - try { - worker = await new Promise((resolve, reject) => { - const worker = new Worker(new URL('./in-worker.ts', import.meta.url)); - worker.addEventListener('error', reject); - worker.addEventListener('message', event => { - if (event.data.type === 'init') { - resolve(worker); - } - }); - worker.postMessage({ type: 'init', msgId: 0 } as WorkerIngoingMessage); - setTimeout(() => { - reject('timeout'); - }, 1000 * 30 /* 30 sec */); - }); - } catch (err) { - logger.debug( - `Indexer worker init failed, ${err}, will retry in 5 seconds.` - ); - await new Promise(resolve => setTimeout(resolve, 5000)); - } - if (worker) { - break; - } - } - - if (!worker) { - // never reach here - throw new Error('Worker is not created'); - } - - const terminateAbort = new AbortController(); - - let msgId = 1; - - return { - run: async (input: WorkerInput) => { - const dispose: (() => void)[] = []; - return new Promise((resolve, reject) => { - const currentMsgId = msgId++; - const msgHandler = (event: MessageEvent) => { - if (event.data.msgId === currentMsgId) { - if (event.data.type === 'done') { - resolve(event.data.output); - } else if (event.data.type === 'failed') { - reject(new Error(event.data.error)); - } else { - reject(new Error('Unknown message type')); - } - } - }; - const abortHandler = (reason: any) => { - reject(reason); - }; - - worker.addEventListener('message', msgHandler); - dispose.push(() => { - worker?.removeEventListener('message', msgHandler); - }); - - terminateAbort.signal.addEventListener('abort', abortHandler); - dispose.push(() => { - terminateAbort.signal.removeEventListener('abort', abortHandler); - }); - - worker.postMessage({ - type: 'run', - msgId: currentMsgId, - input, - } as WorkerIngoingMessage); - }).finally(() => { - for (const d of dispose) { - d(); - } - }); - }, - dispose: () => { - worker.terminate(); - terminateAbort.abort(MANUALLY_STOP); - }, - }; -} - -export type IndexerWorker = Awaited>; diff --git a/packages/frontend/core/src/modules/docs-search/worker/types.ts b/packages/frontend/core/src/modules/docs-search/worker/types.ts deleted file mode 100644 index d5ff7f10dd..0000000000 --- a/packages/frontend/core/src/modules/docs-search/worker/types.ts +++ /dev/null @@ -1,50 +0,0 @@ -import type { Document } from '@toeverything/infra'; - -import type { BlockIndexSchema, DocIndexSchema } from '../schema'; - -export type WorkerIngoingMessage = ( - | { - type: 'init'; - } - | { - type: 'run'; - input: WorkerInput; - } -) & { msgId: number }; - -export type WorkerOutgoingMessage = ( - | { - type: 'init'; - } - | { - type: 'done'; - output: WorkerOutput; - } - | { - type: 'failed'; - error: string; - } -) & { msgId: number }; - -export type WorkerInput = - | { - type: 'rootDoc'; - rootDocBuffer: Uint8Array; - allIndexedDocs: string[]; - } - | { - type: 'doc'; - docId: string; - rootDocBuffer: Uint8Array; - docBuffer: Uint8Array; - }; - -export interface WorkerOutput { - reindexDoc?: { docId: string; storageDocId: string }[]; - addedDoc?: { - id: string; - blocks: Document[]; - doc: Document; - }[]; - deletedDoc?: string[]; -}