mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-27 02:42:25 +08:00
Revert "feat(core): run indexer in worker (#7295)"
This reverts commit c62d79ab14.
This commit is contained in:
@@ -2,7 +2,6 @@
|
|||||||
"name": "@toeverything/infra",
|
"name": "@toeverything/infra",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"private": true,
|
"private": true,
|
||||||
"sideEffects": false,
|
|
||||||
"exports": {
|
"exports": {
|
||||||
"./blocksuite": "./src/blocksuite/index.ts",
|
"./blocksuite": "./src/blocksuite/index.ts",
|
||||||
"./storage": "./src/storage/index.ts",
|
"./storage": "./src/storage/index.ts",
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ import {
|
|||||||
timer,
|
timer,
|
||||||
} from 'rxjs';
|
} from 'rxjs';
|
||||||
|
|
||||||
import { MANUALLY_STOP } from '../utils';
|
|
||||||
import type { LiveData } from './livedata';
|
import type { LiveData } from './livedata';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -108,8 +107,7 @@ export function fromPromise<T>(
|
|||||||
.catch(error => {
|
.catch(error => {
|
||||||
subscriber.error(error);
|
subscriber.error(error);
|
||||||
});
|
});
|
||||||
|
return () => abortController.abort('Aborted');
|
||||||
return () => abortController.abort(MANUALLY_STOP);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import { difference } from 'lodash-es';
|
|||||||
|
|
||||||
import { LiveData } from '../../livedata';
|
import { LiveData } from '../../livedata';
|
||||||
import type { Memento } from '../../storage';
|
import type { Memento } from '../../storage';
|
||||||
import { MANUALLY_STOP } from '../../utils';
|
|
||||||
import { BlobStorageOverCapacity } from './error';
|
import { BlobStorageOverCapacity } from './error';
|
||||||
|
|
||||||
const logger = new DebugLogger('affine:blob-engine');
|
const logger = new DebugLogger('affine:blob-engine');
|
||||||
@@ -71,7 +70,7 @@ export class BlobEngine {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
stop() {
|
||||||
this.abort?.abort(MANUALLY_STOP);
|
this.abort?.abort();
|
||||||
this.abort = null;
|
this.abort = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,24 +0,0 @@
|
|||||||
AFFiNE currently has a lot of data stored using the old ID format. Here, we record the usage of IDs to avoid forgetting.
|
|
||||||
|
|
||||||
## Old ID Format
|
|
||||||
|
|
||||||
The format is:
|
|
||||||
|
|
||||||
- `{workspace-id}:space:{nanoid}` Common
|
|
||||||
- `{workspace-id}:space:page:{nanoid}`
|
|
||||||
|
|
||||||
> Note: sometimes the `workspace-id` is not same with current workspace id.
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
- Local Storage
|
|
||||||
- indexeddb: Both new and old IDs coexist
|
|
||||||
- sqlite: Both new and old IDs coexist
|
|
||||||
- server-clock: Only new IDs are stored
|
|
||||||
- sync-metadata: Both new and old IDs coexist
|
|
||||||
- Server Storage
|
|
||||||
- Only stores new IDs but accepts writes using old IDs
|
|
||||||
- Protocols
|
|
||||||
- When the client submits an update, both new and old IDs are used.
|
|
||||||
- When the server broadcasts updates sent by other clients, both new and old IDs are used.
|
|
||||||
- When the server responds to `client-pre-sync` (listing all updated docids), only new IDs are used.
|
|
||||||
@@ -20,9 +20,6 @@ export interface IndexWriter<S extends Schema>
|
|||||||
|
|
||||||
delete(id: string): void;
|
delete(id: string): void;
|
||||||
|
|
||||||
// TODO(@eyhn)
|
|
||||||
// deleteByQuery(query: Query<S>): void;
|
|
||||||
|
|
||||||
commit(): Promise<void>;
|
commit(): Promise<void>;
|
||||||
|
|
||||||
rollback(): void;
|
rollback(): void;
|
||||||
|
|||||||
@@ -155,9 +155,6 @@ export class IndexedDBJobQueue<J> implements JobQueue<J> {
|
|||||||
.objectStore('jobs')
|
.objectStore('jobs')
|
||||||
.delete(typeof id === 'string' ? parseInt(id) : id);
|
.delete(typeof id === 'string' ? parseInt(id) : id);
|
||||||
}
|
}
|
||||||
|
|
||||||
trx.commit();
|
|
||||||
this.broadcast.postMessage('job-completed');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async return(jobs: Job[], retry: boolean = false): Promise<void> {
|
async return(jobs: Job[], retry: boolean = false): Promise<void> {
|
||||||
@@ -177,10 +174,6 @@ export class IndexedDBJobQueue<J> implements JobQueue<J> {
|
|||||||
.delete(typeof id === 'string' ? parseInt(id) : id);
|
.delete(typeof id === 'string' ? parseInt(id) : id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trx.commit();
|
|
||||||
|
|
||||||
this.broadcast.postMessage('job-completed');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async clear(): Promise<void> {
|
async clear(): Promise<void> {
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ export class JobRunner<J> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
stop() {
|
||||||
this.abort?.abort(MANUALLY_STOP);
|
this.abort?.abort();
|
||||||
this.abort = null;
|
this.abort = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -41,8 +41,8 @@ export class FetchService extends Service {
|
|||||||
throw externalSignal.reason;
|
throw externalSignal.reason;
|
||||||
}
|
}
|
||||||
const abortController = new AbortController();
|
const abortController = new AbortController();
|
||||||
externalSignal?.addEventListener('abort', reason => {
|
externalSignal?.addEventListener('abort', () => {
|
||||||
abortController.abort(reason);
|
abortController.abort();
|
||||||
});
|
});
|
||||||
|
|
||||||
const timeout = init?.timeout ?? 15000;
|
const timeout = init?.timeout ?? 15000;
|
||||||
|
|||||||
@@ -1,32 +1,32 @@
|
|||||||
import { DebugLogger } from '@affine/debug';
|
import { DebugLogger } from '@affine/debug';
|
||||||
|
import type { AffineTextAttributes } from '@blocksuite/blocks';
|
||||||
|
import type { DeltaInsert } from '@blocksuite/inline';
|
||||||
import type { Job, JobQueue, WorkspaceService } from '@toeverything/infra';
|
import type { Job, JobQueue, WorkspaceService } from '@toeverything/infra';
|
||||||
import {
|
import {
|
||||||
|
Document,
|
||||||
Entity,
|
Entity,
|
||||||
IndexedDBIndexStorage,
|
IndexedDBIndexStorage,
|
||||||
IndexedDBJobQueue,
|
IndexedDBJobQueue,
|
||||||
JobRunner,
|
JobRunner,
|
||||||
LiveData,
|
LiveData,
|
||||||
} from '@toeverything/infra';
|
} from '@toeverything/infra';
|
||||||
|
import { difference } from 'lodash-es';
|
||||||
import { map } from 'rxjs';
|
import { map } from 'rxjs';
|
||||||
|
import { Array as YArray, Map as YMap, type Text as YText } from 'yjs';
|
||||||
|
import { applyUpdate, Doc as YDoc } from 'yjs';
|
||||||
|
|
||||||
import { blockIndexSchema, docIndexSchema } from '../schema';
|
import {
|
||||||
import { createWorker, type IndexerWorker } from '../worker/out-worker';
|
type BlockIndexSchema,
|
||||||
|
blockIndexSchema,
|
||||||
export function isEmptyUpdate(binary: Uint8Array) {
|
docIndexSchema,
|
||||||
return (
|
} from '../schema';
|
||||||
binary.byteLength === 0 ||
|
|
||||||
(binary.byteLength === 2 && binary[0] === 0 && binary[1] === 0)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
const logger = new DebugLogger('crawler');
|
const logger = new DebugLogger('crawler');
|
||||||
|
|
||||||
interface IndexerJobPayload {
|
interface IndexerJobPayload {
|
||||||
docId: string;
|
docId: string;
|
||||||
storageDocId: string;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(@eyhn): simplify this, it's too complex
|
|
||||||
export class DocsIndexer extends Entity {
|
export class DocsIndexer extends Entity {
|
||||||
private readonly jobQueue: JobQueue<IndexerJobPayload> =
|
private readonly jobQueue: JobQueue<IndexerJobPayload> =
|
||||||
new IndexedDBJobQueue<IndexerJobPayload>(
|
new IndexedDBJobQueue<IndexerJobPayload>(
|
||||||
@@ -51,8 +51,6 @@ export class DocsIndexer extends Entity {
|
|||||||
|
|
||||||
private readonly workspaceId = this.workspaceService.workspace.id;
|
private readonly workspaceId = this.workspaceService.workspace.id;
|
||||||
|
|
||||||
private worker: IndexerWorker | null = null;
|
|
||||||
|
|
||||||
readonly status$ = LiveData.from<{ remaining?: number }>(
|
readonly status$ = LiveData.from<{ remaining?: number }>(
|
||||||
this.jobQueue.status$.pipe(
|
this.jobQueue.status$.pipe(
|
||||||
map(status => ({
|
map(status => ({
|
||||||
@@ -69,13 +67,13 @@ export class DocsIndexer extends Entity {
|
|||||||
setupListener() {
|
setupListener() {
|
||||||
this.workspaceEngine.doc.storage.eventBus.on(event => {
|
this.workspaceEngine.doc.storage.eventBus.on(event => {
|
||||||
if (event.clientId === this.workspaceEngine.doc.clientId) {
|
if (event.clientId === this.workspaceEngine.doc.clientId) {
|
||||||
const docId = normalizeDocId(event.docId);
|
const docId = event.docId;
|
||||||
|
|
||||||
this.jobQueue
|
this.jobQueue
|
||||||
.enqueue([
|
.enqueue([
|
||||||
{
|
{
|
||||||
batchKey: docId,
|
batchKey: docId,
|
||||||
payload: { docId, storageDocId: event.docId },
|
payload: { docId },
|
||||||
},
|
},
|
||||||
])
|
])
|
||||||
.catch(err => {
|
.catch(err => {
|
||||||
@@ -85,140 +83,23 @@ export class DocsIndexer extends Entity {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async execJob(jobs: Job<IndexerJobPayload>[], signal: AbortSignal) {
|
async execJob(jobs: Job<IndexerJobPayload>[], _signal: AbortSignal) {
|
||||||
if (jobs.length === 0) {
|
if (jobs.length === 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// jobs should have the same docId, so we just pick the first one
|
// jobs should have the same docId, so we just pick the first one
|
||||||
const docId = jobs[0].payload.docId;
|
const docId = jobs[0].payload.docId;
|
||||||
const storageDocId = jobs[0].payload.storageDocId;
|
|
||||||
|
|
||||||
const worker = await this.ensureWorker(signal);
|
|
||||||
|
|
||||||
const startTime = performance.now();
|
const startTime = performance.now();
|
||||||
logger.debug('Start crawling job for docId:', docId);
|
logger.debug('Start crawling job for docId:', docId);
|
||||||
|
|
||||||
let workerOutput;
|
if (docId) {
|
||||||
|
if (docId === this.workspaceId) {
|
||||||
if (docId === this.workspaceId) {
|
await this.crawlingRootDocData();
|
||||||
const rootDocBuffer =
|
} else {
|
||||||
await this.workspaceEngine.doc.storage.loadDocFromLocal(
|
await this.crawlingDocData(docId);
|
||||||
this.workspaceId
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!rootDocBuffer) {
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const allIndexedDocs = (
|
|
||||||
await this.docIndex.search(
|
|
||||||
{
|
|
||||||
type: 'all',
|
|
||||||
},
|
|
||||||
{
|
|
||||||
pagination: {
|
|
||||||
limit: Number.MAX_SAFE_INTEGER,
|
|
||||||
skip: 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
).nodes.map(n => n.id);
|
|
||||||
|
|
||||||
workerOutput = await worker.run({
|
|
||||||
type: 'rootDoc',
|
|
||||||
allIndexedDocs,
|
|
||||||
rootDocBuffer,
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
const rootDocBuffer =
|
|
||||||
await this.workspaceEngine.doc.storage.loadDocFromLocal(
|
|
||||||
this.workspaceId
|
|
||||||
);
|
|
||||||
|
|
||||||
const docBuffer =
|
|
||||||
(await this.workspaceEngine.doc.storage.loadDocFromLocal(
|
|
||||||
storageDocId
|
|
||||||
)) ?? new Uint8Array(0);
|
|
||||||
|
|
||||||
if (!rootDocBuffer) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
workerOutput = await worker.run({
|
|
||||||
type: 'doc',
|
|
||||||
docBuffer,
|
|
||||||
docId,
|
|
||||||
rootDocBuffer,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (workerOutput.deletedDoc || workerOutput.addedDoc) {
|
|
||||||
if (workerOutput.deletedDoc) {
|
|
||||||
const docIndexWriter = await this.docIndex.write();
|
|
||||||
for (const docId of workerOutput.deletedDoc) {
|
|
||||||
docIndexWriter.delete(docId);
|
|
||||||
}
|
|
||||||
await docIndexWriter.commit();
|
|
||||||
const blockIndexWriter = await this.blockIndex.write();
|
|
||||||
for (const docId of workerOutput.deletedDoc) {
|
|
||||||
const oldBlocks = await blockIndexWriter.search(
|
|
||||||
{
|
|
||||||
type: 'match',
|
|
||||||
field: 'docId',
|
|
||||||
match: docId,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
pagination: {
|
|
||||||
limit: Number.MAX_SAFE_INTEGER,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
);
|
|
||||||
for (const block of oldBlocks.nodes) {
|
|
||||||
docIndexWriter.delete(block.id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
await blockIndexWriter.commit();
|
|
||||||
}
|
|
||||||
if (workerOutput.addedDoc) {
|
|
||||||
const docIndexWriter = await this.docIndex.write();
|
|
||||||
for (const { doc } of workerOutput.addedDoc) {
|
|
||||||
docIndexWriter.put(doc);
|
|
||||||
}
|
|
||||||
await docIndexWriter.commit();
|
|
||||||
const blockIndexWriter = await this.blockIndex.write();
|
|
||||||
for (const { blocks } of workerOutput.addedDoc) {
|
|
||||||
// delete old blocks
|
|
||||||
const oldBlocks = await blockIndexWriter.search(
|
|
||||||
{
|
|
||||||
type: 'match',
|
|
||||||
field: 'docId',
|
|
||||||
match: docId,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
pagination: {
|
|
||||||
limit: Number.MAX_SAFE_INTEGER,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
);
|
|
||||||
for (const block of oldBlocks.nodes) {
|
|
||||||
blockIndexWriter.delete(block.id);
|
|
||||||
}
|
|
||||||
for (const block of blocks) {
|
|
||||||
blockIndexWriter.insert(block);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
await blockIndexWriter.commit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (workerOutput.reindexDoc) {
|
|
||||||
await this.jobQueue.enqueue(
|
|
||||||
workerOutput.reindexDoc.map(({ docId, storageDocId }) => ({
|
|
||||||
batchKey: docId,
|
|
||||||
payload: { docId, storageDocId },
|
|
||||||
}))
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const duration = performance.now() - startTime;
|
const duration = performance.now() - startTime;
|
||||||
@@ -233,7 +114,7 @@ export class DocsIndexer extends Entity {
|
|||||||
.enqueue([
|
.enqueue([
|
||||||
{
|
{
|
||||||
batchKey: this.workspaceId,
|
batchKey: this.workspaceId,
|
||||||
payload: { docId: this.workspaceId, storageDocId: this.workspaceId },
|
payload: { docId: this.workspaceId },
|
||||||
},
|
},
|
||||||
])
|
])
|
||||||
.catch(err => {
|
.catch(err => {
|
||||||
@@ -241,58 +122,341 @@ export class DocsIndexer extends Entity {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async ensureWorker(signal: AbortSignal): Promise<IndexerWorker> {
|
async crawlingDocData(docId: string) {
|
||||||
if (!this.worker) {
|
const rootDocBuffer =
|
||||||
this.worker = await createWorker(signal);
|
await this.workspaceEngine.doc.storage.loadDocFromLocal(this.workspaceId);
|
||||||
}
|
|
||||||
return this.worker;
|
|
||||||
}
|
|
||||||
|
|
||||||
override dispose(): void {
|
if (!rootDocBuffer) {
|
||||||
this.runner.stop();
|
return;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function normalizeDocId(raw: string) {
|
|
||||||
enum DocVariant {
|
|
||||||
Workspace = 'workspace',
|
|
||||||
Page = 'page',
|
|
||||||
Space = 'space',
|
|
||||||
Settings = 'settings',
|
|
||||||
Unknown = 'unknown',
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (!raw.length) {
|
|
||||||
throw new Error('Invalid Empty Doc ID');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let parts = raw.split(':');
|
const yRootDoc = new YDoc();
|
||||||
|
applyUpdate(yRootDoc, rootDocBuffer);
|
||||||
|
|
||||||
if (parts.length > 3) {
|
const docStoragePossibleIds = Array.from(yRootDoc.getSubdocs())
|
||||||
// special adapt case `wsId:space:page:pageId`
|
.map(doc => doc.guid)
|
||||||
if (parts[1] === DocVariant.Space && parts[2] === DocVariant.Page) {
|
.filter(id => id.endsWith(docId));
|
||||||
parts = [parts[0], DocVariant.Space, parts[3]];
|
|
||||||
} else {
|
let docBuffer;
|
||||||
throw new Error(`Invalid format of Doc ID: ${raw}`);
|
|
||||||
|
for (const id of docStoragePossibleIds) {
|
||||||
|
docBuffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(id);
|
||||||
|
|
||||||
|
if (docBuffer) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else if (parts.length === 2) {
|
|
||||||
// `${variant}:${guid}`
|
|
||||||
throw new Error('not supported');
|
|
||||||
} else if (parts.length === 1) {
|
|
||||||
// ${ws} or ${pageId}
|
|
||||||
parts = ['', DocVariant.Unknown, parts[0]];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const docId = parts.at(2);
|
if (!docBuffer) {
|
||||||
|
return;
|
||||||
if (!docId) {
|
|
||||||
throw new Error('ID is required');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return docId;
|
const ydoc = new YDoc();
|
||||||
} catch (err) {
|
|
||||||
logger.error('Error on normalize docId ' + raw, err);
|
applyUpdate(ydoc, docBuffer);
|
||||||
return raw;
|
|
||||||
|
let docExists: boolean | null = null;
|
||||||
|
|
||||||
|
(
|
||||||
|
yRootDoc.getMap('meta').get('pages') as YArray<YMap<any>> | undefined
|
||||||
|
)?.forEach(page => {
|
||||||
|
if (page.get('id') === docId) {
|
||||||
|
docExists = !(page.get('trash') ?? false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!docExists) {
|
||||||
|
const indexWriter = await this.docIndex.write();
|
||||||
|
indexWriter.delete(docId);
|
||||||
|
await indexWriter.commit();
|
||||||
|
|
||||||
|
const blockIndexWriter = await this.blockIndex.write();
|
||||||
|
const oldBlocks = await blockIndexWriter.search(
|
||||||
|
{
|
||||||
|
type: 'match',
|
||||||
|
field: 'docId',
|
||||||
|
match: docId,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pagination: {
|
||||||
|
limit: Number.MAX_SAFE_INTEGER,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
for (const block of oldBlocks.nodes) {
|
||||||
|
blockIndexWriter.delete(block.id);
|
||||||
|
}
|
||||||
|
await blockIndexWriter.commit();
|
||||||
|
} else {
|
||||||
|
const blocks = ydoc.getMap<any>('blocks');
|
||||||
|
|
||||||
|
if (blocks.size === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let docTitle = '';
|
||||||
|
|
||||||
|
const blockDocuments: Document<typeof blockIndexSchema>[] = [];
|
||||||
|
|
||||||
|
for (const block of blocks.values()) {
|
||||||
|
const flavour = block.get('sys:flavour')?.toString();
|
||||||
|
const blockId = block.get('sys:id')?.toString();
|
||||||
|
|
||||||
|
if (!flavour || !blockId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flavour === 'affine:page') {
|
||||||
|
docTitle = block.get('prop:title').toString();
|
||||||
|
blockDocuments.push(
|
||||||
|
Document.from(`${docId}:${blockId}`, {
|
||||||
|
docId,
|
||||||
|
flavour,
|
||||||
|
blockId,
|
||||||
|
content: docTitle,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
flavour === 'affine:paragraph' ||
|
||||||
|
flavour === 'affine:list' ||
|
||||||
|
flavour === 'affine:code'
|
||||||
|
) {
|
||||||
|
const text = block.get('prop:text') as YText;
|
||||||
|
if (!text) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deltas: DeltaInsert<AffineTextAttributes>[] = text.toDelta();
|
||||||
|
const ref = deltas
|
||||||
|
.map(delta => {
|
||||||
|
if (
|
||||||
|
delta.attributes &&
|
||||||
|
delta.attributes.reference &&
|
||||||
|
delta.attributes.reference.pageId
|
||||||
|
) {
|
||||||
|
return delta.attributes.reference.pageId;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
})
|
||||||
|
.filter((link): link is string => !!link);
|
||||||
|
|
||||||
|
blockDocuments.push(
|
||||||
|
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
||||||
|
docId,
|
||||||
|
flavour,
|
||||||
|
blockId,
|
||||||
|
content: text.toString(),
|
||||||
|
ref,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
flavour === 'affine:embed-linked-doc' ||
|
||||||
|
flavour === 'affine:embed-synced-doc'
|
||||||
|
) {
|
||||||
|
const pageId = block.get('prop:pageId');
|
||||||
|
if (typeof pageId === 'string') {
|
||||||
|
blockDocuments.push(
|
||||||
|
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
||||||
|
docId,
|
||||||
|
flavour,
|
||||||
|
blockId,
|
||||||
|
ref: pageId,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flavour === 'affine:attachment' || flavour === 'affine:image') {
|
||||||
|
const blobId = block.get('prop:sourceId');
|
||||||
|
if (typeof blobId === 'string') {
|
||||||
|
blockDocuments.push(
|
||||||
|
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
||||||
|
docId,
|
||||||
|
flavour,
|
||||||
|
blockId,
|
||||||
|
blob: [blobId],
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flavour === 'affine:surface') {
|
||||||
|
const texts = [];
|
||||||
|
|
||||||
|
const elementsObj = block.get('prop:elements');
|
||||||
|
if (
|
||||||
|
!(
|
||||||
|
elementsObj instanceof YMap &&
|
||||||
|
elementsObj.get('type') === '$blocksuite:internal:native$'
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const elements = elementsObj.get('value') as YMap<any>;
|
||||||
|
if (!(elements instanceof YMap)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const element of elements.values()) {
|
||||||
|
if (!(element instanceof YMap)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const text = element.get('text') as YText;
|
||||||
|
if (!text) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
texts.push(text.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDocuments.push(
|
||||||
|
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
||||||
|
docId,
|
||||||
|
flavour,
|
||||||
|
blockId,
|
||||||
|
content: texts,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flavour === 'affine:database') {
|
||||||
|
const texts = [];
|
||||||
|
const columnsObj = block.get('prop:columns');
|
||||||
|
if (!(columnsObj instanceof YArray)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (const column of columnsObj) {
|
||||||
|
if (!(column instanceof YMap)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (typeof column.get('name') === 'string') {
|
||||||
|
texts.push(column.get('name'));
|
||||||
|
}
|
||||||
|
|
||||||
|
const data = column.get('data');
|
||||||
|
if (!(data instanceof YMap)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const options = data.get('options');
|
||||||
|
if (!(options instanceof YArray)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (const option of options) {
|
||||||
|
if (!(option instanceof YMap)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const value = option.get('value');
|
||||||
|
if (typeof value === 'string') {
|
||||||
|
texts.push(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDocuments.push(
|
||||||
|
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
||||||
|
docId,
|
||||||
|
flavour,
|
||||||
|
blockId,
|
||||||
|
content: texts,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const docIndexWriter = await this.docIndex.write();
|
||||||
|
docIndexWriter.put(
|
||||||
|
Document.from<typeof docIndexSchema>(docId, {
|
||||||
|
title: docTitle,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
await docIndexWriter.commit();
|
||||||
|
|
||||||
|
const blockIndexWriter = await this.blockIndex.write();
|
||||||
|
const oldBlocks = await blockIndexWriter.search(
|
||||||
|
{
|
||||||
|
type: 'match',
|
||||||
|
field: 'docId',
|
||||||
|
match: docId,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pagination: {
|
||||||
|
limit: Number.MAX_SAFE_INTEGER,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
for (const block of oldBlocks.nodes) {
|
||||||
|
blockIndexWriter.delete(block.id);
|
||||||
|
}
|
||||||
|
for (const block of blockDocuments) {
|
||||||
|
blockIndexWriter.insert(block);
|
||||||
|
}
|
||||||
|
await blockIndexWriter.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async crawlingRootDocData() {
|
||||||
|
const buffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(
|
||||||
|
this.workspaceId
|
||||||
|
);
|
||||||
|
if (!buffer) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ydoc = new YDoc();
|
||||||
|
|
||||||
|
applyUpdate(ydoc, buffer);
|
||||||
|
|
||||||
|
const docs = ydoc.getMap('meta').get('pages') as
|
||||||
|
| YArray<YMap<any>>
|
||||||
|
| undefined;
|
||||||
|
|
||||||
|
if (!docs) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const availableDocs = [];
|
||||||
|
|
||||||
|
for (const page of docs) {
|
||||||
|
const docId = page.get('id');
|
||||||
|
|
||||||
|
if (typeof docId !== 'string') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const inTrash = page.get('trash') ?? false;
|
||||||
|
|
||||||
|
if (!inTrash) {
|
||||||
|
availableDocs.push(docId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// a hack to get all docs in index
|
||||||
|
const allIndexedDocs = (
|
||||||
|
await this.docIndex.search(
|
||||||
|
{
|
||||||
|
type: 'all',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pagination: {
|
||||||
|
limit: Number.MAX_SAFE_INTEGER,
|
||||||
|
skip: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
).nodes.map(n => n.id);
|
||||||
|
|
||||||
|
const needDelete = difference(allIndexedDocs, availableDocs);
|
||||||
|
const needAdd = difference(availableDocs, allIndexedDocs);
|
||||||
|
|
||||||
|
await this.jobQueue.enqueue(
|
||||||
|
[...needAdd, ...needDelete].map(docId => ({
|
||||||
|
batchKey: docId,
|
||||||
|
payload: { docId },
|
||||||
|
}))
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -419,8 +419,4 @@ export class DocsSearchService extends Service {
|
|||||||
const title = doc?.get('title');
|
const title = doc?.get('title');
|
||||||
return typeof title === 'string' ? title : title?.[0];
|
return typeof title === 'string' ? title : title?.[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
override dispose(): void {
|
|
||||||
this.indexer.dispose();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,313 +0,0 @@
|
|||||||
import type { AffineTextAttributes } from '@blocksuite/blocks';
|
|
||||||
import type { DeltaInsert } from '@blocksuite/inline';
|
|
||||||
import { Document } from '@toeverything/infra';
|
|
||||||
import { difference } from 'lodash-es';
|
|
||||||
import {
|
|
||||||
applyUpdate,
|
|
||||||
Array as YArray,
|
|
||||||
Doc as YDoc,
|
|
||||||
Map as YMap,
|
|
||||||
type Text as YText,
|
|
||||||
} from 'yjs';
|
|
||||||
|
|
||||||
import type { BlockIndexSchema, docIndexSchema } from '../schema';
|
|
||||||
import type {
|
|
||||||
WorkerIngoingMessage,
|
|
||||||
WorkerInput,
|
|
||||||
WorkerOutgoingMessage,
|
|
||||||
WorkerOutput,
|
|
||||||
} from './types';
|
|
||||||
|
|
||||||
function crawlingDocData({
|
|
||||||
docBuffer,
|
|
||||||
docId,
|
|
||||||
rootDocBuffer,
|
|
||||||
}: WorkerInput & { type: 'doc' }): WorkerOutput {
|
|
||||||
const yRootDoc = new YDoc();
|
|
||||||
applyUpdate(yRootDoc, rootDocBuffer);
|
|
||||||
|
|
||||||
const ydoc = new YDoc();
|
|
||||||
|
|
||||||
applyUpdate(ydoc, docBuffer);
|
|
||||||
|
|
||||||
let docExists: boolean | null = null;
|
|
||||||
|
|
||||||
(
|
|
||||||
yRootDoc.getMap('meta').get('pages') as YArray<YMap<any>> | undefined
|
|
||||||
)?.forEach(page => {
|
|
||||||
if (page.get('id') === docId) {
|
|
||||||
docExists = !(page.get('trash') ?? false);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!docExists) {
|
|
||||||
return {
|
|
||||||
deletedDoc: [docId],
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
const blocks = ydoc.getMap<any>('blocks');
|
|
||||||
|
|
||||||
if (blocks.size === 0) {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
let docTitle = '';
|
|
||||||
|
|
||||||
const blockDocuments: Document<BlockIndexSchema>[] = [];
|
|
||||||
|
|
||||||
for (const block of blocks.values()) {
|
|
||||||
const flavour = block.get('sys:flavour')?.toString();
|
|
||||||
const blockId = block.get('sys:id')?.toString();
|
|
||||||
|
|
||||||
if (!flavour || !blockId) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flavour === 'affine:page') {
|
|
||||||
docTitle = block.get('prop:title').toString();
|
|
||||||
blockDocuments.push(
|
|
||||||
Document.from(`${docId}:${blockId}`, {
|
|
||||||
docId,
|
|
||||||
flavour,
|
|
||||||
blockId,
|
|
||||||
content: docTitle,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
flavour === 'affine:paragraph' ||
|
|
||||||
flavour === 'affine:list' ||
|
|
||||||
flavour === 'affine:code'
|
|
||||||
) {
|
|
||||||
const text = block.get('prop:text') as YText;
|
|
||||||
if (!text) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const deltas: DeltaInsert<AffineTextAttributes>[] = text.toDelta();
|
|
||||||
const ref = deltas
|
|
||||||
.map(delta => {
|
|
||||||
if (
|
|
||||||
delta.attributes &&
|
|
||||||
delta.attributes.reference &&
|
|
||||||
delta.attributes.reference.pageId
|
|
||||||
) {
|
|
||||||
return delta.attributes.reference.pageId;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
})
|
|
||||||
.filter((link): link is string => !!link);
|
|
||||||
|
|
||||||
blockDocuments.push(
|
|
||||||
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
|
||||||
docId,
|
|
||||||
flavour,
|
|
||||||
blockId,
|
|
||||||
content: text.toString(),
|
|
||||||
ref,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
flavour === 'affine:embed-linked-doc' ||
|
|
||||||
flavour === 'affine:embed-synced-doc'
|
|
||||||
) {
|
|
||||||
const pageId = block.get('prop:pageId');
|
|
||||||
if (typeof pageId === 'string') {
|
|
||||||
blockDocuments.push(
|
|
||||||
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
|
||||||
docId,
|
|
||||||
flavour,
|
|
||||||
blockId,
|
|
||||||
ref: pageId,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flavour === 'affine:attachment' || flavour === 'affine:image') {
|
|
||||||
const blobId = block.get('prop:sourceId');
|
|
||||||
if (typeof blobId === 'string') {
|
|
||||||
blockDocuments.push(
|
|
||||||
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
|
||||||
docId,
|
|
||||||
flavour,
|
|
||||||
blockId,
|
|
||||||
blob: [blobId],
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flavour === 'affine:surface') {
|
|
||||||
const texts = [];
|
|
||||||
|
|
||||||
const elementsObj = block.get('prop:elements');
|
|
||||||
if (
|
|
||||||
!(
|
|
||||||
elementsObj instanceof YMap &&
|
|
||||||
elementsObj.get('type') === '$blocksuite:internal:native$'
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const elements = elementsObj.get('value') as YMap<any>;
|
|
||||||
if (!(elements instanceof YMap)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const element of elements.values()) {
|
|
||||||
if (!(element instanceof YMap)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const text = element.get('text') as YText;
|
|
||||||
if (!text) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
texts.push(text.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
blockDocuments.push(
|
|
||||||
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
|
||||||
docId,
|
|
||||||
flavour,
|
|
||||||
blockId,
|
|
||||||
content: texts,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flavour === 'affine:database') {
|
|
||||||
const texts = [];
|
|
||||||
const columnsObj = block.get('prop:columns');
|
|
||||||
if (!(columnsObj instanceof YArray)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (const column of columnsObj) {
|
|
||||||
if (!(column instanceof YMap)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (typeof column.get('name') === 'string') {
|
|
||||||
texts.push(column.get('name'));
|
|
||||||
}
|
|
||||||
|
|
||||||
const data = column.get('data');
|
|
||||||
if (!(data instanceof YMap)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const options = data.get('options');
|
|
||||||
if (!(options instanceof YArray)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (const option of options) {
|
|
||||||
if (!(option instanceof YMap)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const value = option.get('value');
|
|
||||||
if (typeof value === 'string') {
|
|
||||||
texts.push(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
blockDocuments.push(
|
|
||||||
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
|
|
||||||
docId,
|
|
||||||
flavour,
|
|
||||||
blockId,
|
|
||||||
content: texts,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
addedDoc: [
|
|
||||||
{
|
|
||||||
id: docId,
|
|
||||||
doc: Document.from<typeof docIndexSchema>(docId, {
|
|
||||||
title: docTitle,
|
|
||||||
}),
|
|
||||||
blocks: blockDocuments,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function crawlingRootDocData({
|
|
||||||
allIndexedDocs,
|
|
||||||
rootDocBuffer,
|
|
||||||
}: WorkerInput & {
|
|
||||||
type: 'rootDoc';
|
|
||||||
}): WorkerOutput {
|
|
||||||
const ydoc = new YDoc();
|
|
||||||
|
|
||||||
applyUpdate(ydoc, rootDocBuffer);
|
|
||||||
|
|
||||||
const docs = ydoc.getMap('meta').get('pages') as
|
|
||||||
| YArray<YMap<any>>
|
|
||||||
| undefined;
|
|
||||||
|
|
||||||
if (!docs) {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
const availableDocs = [];
|
|
||||||
|
|
||||||
for (const page of docs) {
|
|
||||||
const docId = page.get('id');
|
|
||||||
|
|
||||||
if (typeof docId !== 'string') {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const inTrash = page.get('trash') ?? false;
|
|
||||||
|
|
||||||
if (!inTrash) {
|
|
||||||
availableDocs.push(docId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const needDelete = difference(allIndexedDocs, availableDocs);
|
|
||||||
const needAdd = difference(availableDocs, allIndexedDocs);
|
|
||||||
|
|
||||||
return {
|
|
||||||
reindexDoc: [...needAdd, ...needDelete].map(docId => ({
|
|
||||||
docId,
|
|
||||||
storageDocId: ydoc.getMap<YDoc>('spaces').get(docId)?.guid ?? docId,
|
|
||||||
})),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
globalThis.onmessage = (event: MessageEvent<WorkerIngoingMessage>) => {
|
|
||||||
const message = event.data;
|
|
||||||
if (message.type === 'init') {
|
|
||||||
postMessage({ type: 'init', msgId: message.msgId });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (message.type === 'run') {
|
|
||||||
const { input } = message;
|
|
||||||
try {
|
|
||||||
let data;
|
|
||||||
if (input.type === 'rootDoc') {
|
|
||||||
data = crawlingRootDocData(input);
|
|
||||||
} else {
|
|
||||||
data = crawlingDocData(input);
|
|
||||||
}
|
|
||||||
|
|
||||||
postMessage({ type: 'done', msgId: message.msgId, output: data });
|
|
||||||
} catch (error) {
|
|
||||||
postMessage({
|
|
||||||
type: 'failed',
|
|
||||||
msgId: message.msgId,
|
|
||||||
error: error instanceof Error ? error.message : error + '',
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
declare function postMessage(message: WorkerOutgoingMessage): void;
|
|
||||||
@@ -1,98 +0,0 @@
|
|||||||
import { DebugLogger } from '@affine/debug';
|
|
||||||
import { MANUALLY_STOP, throwIfAborted } from '@toeverything/infra';
|
|
||||||
|
|
||||||
import type {
|
|
||||||
WorkerIngoingMessage,
|
|
||||||
WorkerInput,
|
|
||||||
WorkerOutgoingMessage,
|
|
||||||
WorkerOutput,
|
|
||||||
} from './types';
|
|
||||||
|
|
||||||
const logger = new DebugLogger('affine:indexer-worker');
|
|
||||||
|
|
||||||
export async function createWorker(abort: AbortSignal) {
|
|
||||||
let worker: Worker | null = null;
|
|
||||||
while (throwIfAborted(abort)) {
|
|
||||||
try {
|
|
||||||
worker = await new Promise<Worker>((resolve, reject) => {
|
|
||||||
const worker = new Worker(new URL('./in-worker.ts', import.meta.url));
|
|
||||||
worker.addEventListener('error', reject);
|
|
||||||
worker.addEventListener('message', event => {
|
|
||||||
if (event.data.type === 'init') {
|
|
||||||
resolve(worker);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
worker.postMessage({ type: 'init', msgId: 0 } as WorkerIngoingMessage);
|
|
||||||
setTimeout(() => {
|
|
||||||
reject('timeout');
|
|
||||||
}, 1000 * 30 /* 30 sec */);
|
|
||||||
});
|
|
||||||
} catch (err) {
|
|
||||||
logger.debug(
|
|
||||||
`Indexer worker init failed, ${err}, will retry in 5 seconds.`
|
|
||||||
);
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 5000));
|
|
||||||
}
|
|
||||||
if (worker) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!worker) {
|
|
||||||
// never reach here
|
|
||||||
throw new Error('Worker is not created');
|
|
||||||
}
|
|
||||||
|
|
||||||
const terminateAbort = new AbortController();
|
|
||||||
|
|
||||||
let msgId = 1;
|
|
||||||
|
|
||||||
return {
|
|
||||||
run: async (input: WorkerInput) => {
|
|
||||||
const dispose: (() => void)[] = [];
|
|
||||||
return new Promise<WorkerOutput>((resolve, reject) => {
|
|
||||||
const currentMsgId = msgId++;
|
|
||||||
const msgHandler = (event: MessageEvent<WorkerOutgoingMessage>) => {
|
|
||||||
if (event.data.msgId === currentMsgId) {
|
|
||||||
if (event.data.type === 'done') {
|
|
||||||
resolve(event.data.output);
|
|
||||||
} else if (event.data.type === 'failed') {
|
|
||||||
reject(new Error(event.data.error));
|
|
||||||
} else {
|
|
||||||
reject(new Error('Unknown message type'));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const abortHandler = (reason: any) => {
|
|
||||||
reject(reason);
|
|
||||||
};
|
|
||||||
|
|
||||||
worker.addEventListener('message', msgHandler);
|
|
||||||
dispose.push(() => {
|
|
||||||
worker?.removeEventListener('message', msgHandler);
|
|
||||||
});
|
|
||||||
|
|
||||||
terminateAbort.signal.addEventListener('abort', abortHandler);
|
|
||||||
dispose.push(() => {
|
|
||||||
terminateAbort.signal.removeEventListener('abort', abortHandler);
|
|
||||||
});
|
|
||||||
|
|
||||||
worker.postMessage({
|
|
||||||
type: 'run',
|
|
||||||
msgId: currentMsgId,
|
|
||||||
input,
|
|
||||||
} as WorkerIngoingMessage);
|
|
||||||
}).finally(() => {
|
|
||||||
for (const d of dispose) {
|
|
||||||
d();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
},
|
|
||||||
dispose: () => {
|
|
||||||
worker.terminate();
|
|
||||||
terminateAbort.abort(MANUALLY_STOP);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export type IndexerWorker = Awaited<ReturnType<typeof createWorker>>;
|
|
||||||
@@ -1,50 +0,0 @@
|
|||||||
import type { Document } from '@toeverything/infra';
|
|
||||||
|
|
||||||
import type { BlockIndexSchema, DocIndexSchema } from '../schema';
|
|
||||||
|
|
||||||
export type WorkerIngoingMessage = (
|
|
||||||
| {
|
|
||||||
type: 'init';
|
|
||||||
}
|
|
||||||
| {
|
|
||||||
type: 'run';
|
|
||||||
input: WorkerInput;
|
|
||||||
}
|
|
||||||
) & { msgId: number };
|
|
||||||
|
|
||||||
export type WorkerOutgoingMessage = (
|
|
||||||
| {
|
|
||||||
type: 'init';
|
|
||||||
}
|
|
||||||
| {
|
|
||||||
type: 'done';
|
|
||||||
output: WorkerOutput;
|
|
||||||
}
|
|
||||||
| {
|
|
||||||
type: 'failed';
|
|
||||||
error: string;
|
|
||||||
}
|
|
||||||
) & { msgId: number };
|
|
||||||
|
|
||||||
export type WorkerInput =
|
|
||||||
| {
|
|
||||||
type: 'rootDoc';
|
|
||||||
rootDocBuffer: Uint8Array;
|
|
||||||
allIndexedDocs: string[];
|
|
||||||
}
|
|
||||||
| {
|
|
||||||
type: 'doc';
|
|
||||||
docId: string;
|
|
||||||
rootDocBuffer: Uint8Array;
|
|
||||||
docBuffer: Uint8Array;
|
|
||||||
};
|
|
||||||
|
|
||||||
export interface WorkerOutput {
|
|
||||||
reindexDoc?: { docId: string; storageDocId: string }[];
|
|
||||||
addedDoc?: {
|
|
||||||
id: string;
|
|
||||||
blocks: Document<BlockIndexSchema>[];
|
|
||||||
doc: Document<DocIndexSchema>;
|
|
||||||
}[];
|
|
||||||
deletedDoc?: string[];
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user