feat(core): indexer upgrade (#8062)

This commit is contained in:
EYHN
2024-09-04 07:31:03 +00:00
parent 51f3566bec
commit fb64bc7e55
4 changed files with 60 additions and 5 deletions

View File

@@ -1,5 +1,10 @@
import { DebugLogger } from '@affine/debug'; import { DebugLogger } from '@affine/debug';
import type { Job, JobQueue, WorkspaceService } from '@toeverything/infra'; import type {
Job,
JobQueue,
WorkspaceLocalState,
WorkspaceService,
} from '@toeverything/infra';
import { import {
Entity, Entity,
IndexedDBIndexStorage, IndexedDBIndexStorage,
@@ -21,12 +26,18 @@ export function isEmptyUpdate(binary: Uint8Array) {
} }
const logger = new DebugLogger('crawler'); const logger = new DebugLogger('crawler');
const WORKSPACE_DOCS_INDEXER_VERSION_KEY = 'docs-indexer-version';
interface IndexerJobPayload { interface IndexerJobPayload {
storageDocId: string; storageDocId: string;
} }
export class DocsIndexer extends Entity { export class DocsIndexer extends Entity {
/**
 * Version number of this indexer implementation.
 *
 * Increase this number to re-index all docs.
 *
 * The value is compared with the version returned by `getVersion()`
 * (read from the workspace local state, `-1` when nothing numeric is stored):
 * - stored version lower than this value: the job is treated as an upgrade
 *   and the root-doc crawl is asked to re-index every available doc;
 * - stored version higher than this value: the runner is stopped and an
 *   'Indexer is outdated' error is thrown.
 */
static INDEXER_VERSION = 1;
private readonly jobQueue: JobQueue<IndexerJobPayload> = private readonly jobQueue: JobQueue<IndexerJobPayload> =
new IndexedDBJobQueue<IndexerJobPayload>( new IndexedDBJobQueue<IndexerJobPayload>(
'jq:' + this.workspaceService.workspace.id 'jq:' + this.workspaceService.workspace.id
@@ -66,7 +77,10 @@ export class DocsIndexer extends Entity {
{} {}
); );
constructor(private readonly workspaceService: WorkspaceService) { constructor(
private readonly workspaceService: WorkspaceService,
private readonly workspaceLocalState: WorkspaceLocalState
) {
super(); super();
} }
@@ -96,6 +110,16 @@ export class DocsIndexer extends Entity {
return; return;
} }
const dbVersion = this.getVersion();
if (dbVersion > DocsIndexer.INDEXER_VERSION) {
// stop if db version is higher than self
this.runner.stop();
throw new Error('Indexer is outdated');
}
const isUpgrade = dbVersion < DocsIndexer.INDEXER_VERSION;
// jobs should have the same storage docId, so we just pick the first one // jobs should have the same storage docId, so we just pick the first one
const storageDocId = jobs[0].payload.storageDocId; const storageDocId = jobs[0].payload.storageDocId;
@@ -111,7 +135,6 @@ export class DocsIndexer extends Entity {
await this.workspaceEngine.doc.storage.loadDocFromLocal( await this.workspaceEngine.doc.storage.loadDocFromLocal(
this.workspaceId this.workspaceId
); );
if (!rootDocBuffer) { if (!rootDocBuffer) {
return; return;
} }
@@ -134,6 +157,7 @@ export class DocsIndexer extends Entity {
type: 'rootDoc', type: 'rootDoc',
allIndexedDocs, allIndexedDocs,
rootDocBuffer, rootDocBuffer,
reindexAll: isUpgrade,
}); });
} else { } else {
const rootDocBuffer = const rootDocBuffer =
@@ -226,6 +250,10 @@ export class DocsIndexer extends Entity {
); );
} }
if (isUpgrade) {
this.setVersion();
}
const duration = performance.now() - startTime; const duration = performance.now() - startTime;
logger.debug( logger.debug(
'Finish crawling job for storageDocId:' + 'Finish crawling job for storageDocId:' +
@@ -238,6 +266,7 @@ export class DocsIndexer extends Entity {
startCrawling() { startCrawling() {
this.runner.start(); this.runner.start();
this.jobQueue this.jobQueue
.enqueue([ .enqueue([
{ {
@@ -257,6 +286,27 @@ export class DocsIndexer extends Entity {
return this.worker; return this.worker;
} }
/**
 * Reads the indexer version recorded in the workspace local state.
 *
 * @returns the stored version, or `-1` when the stored value is not a number
 * (for example when no version has been written yet).
 */
getVersion() {
  const stored = this.workspaceLocalState.get<number>(
    WORKSPACE_DOCS_INDEXER_VERSION_KEY
  );
  // Anything that is not a number is reported as "no version".
  return typeof stored === 'number' ? stored : -1;
}
/**
 * Records `version` in the workspace local state, unless the version that is
 * already stored is greater than or equal to it.
 *
 * @param version - version to store; defaults to `DocsIndexer.INDEXER_VERSION`.
 * @returns whatever the local state's `set` returns, or `undefined` when
 * nothing was written.
 */
setVersion(version = DocsIndexer.INDEXER_VERSION) {
  const alreadyUpToDate = this.getVersion() >= version;
  if (!alreadyUpToDate) {
    return this.workspaceLocalState.set(
      WORKSPACE_DOCS_INDEXER_VERSION_KEY,
      version
    );
  }
  // Never move the stored version backwards.
  return;
}
override dispose(): void { override dispose(): void {
this.runner.stop(); this.runner.stop();
} }

View File

@@ -2,6 +2,7 @@ export { DocsSearchService } from './services/docs-search';
import { import {
type Framework, type Framework,
WorkspaceLocalState,
WorkspaceScope, WorkspaceScope,
WorkspaceService, WorkspaceService,
} from '@toeverything/infra'; } from '@toeverything/infra';
@@ -13,5 +14,5 @@ export function configureDocsSearchModule(framework: Framework) {
framework framework
.scope(WorkspaceScope) .scope(WorkspaceScope)
.service(DocsSearchService, [WorkspaceService]) .service(DocsSearchService, [WorkspaceService])
.entity(DocsIndexer, [WorkspaceService]); .entity(DocsIndexer, [WorkspaceService, WorkspaceLocalState]);
} }

View File

@@ -285,6 +285,7 @@ async function crawlingDocData({
function crawlingRootDocData({ function crawlingRootDocData({
allIndexedDocs, allIndexedDocs,
rootDocBuffer, rootDocBuffer,
reindexAll,
}: WorkerInput & { }: WorkerInput & {
type: 'rootDoc'; type: 'rootDoc';
}): WorkerOutput { }): WorkerOutput {
@@ -317,7 +318,9 @@ function crawlingRootDocData({
} }
const needDelete = difference(allIndexedDocs, availableDocs); const needDelete = difference(allIndexedDocs, availableDocs);
const needAdd = difference(availableDocs, allIndexedDocs); const needAdd = reindexAll
? availableDocs
: difference(availableDocs, allIndexedDocs);
return { return {
reindexDoc: [...needAdd, ...needDelete].map(docId => ({ reindexDoc: [...needAdd, ...needDelete].map(docId => ({

View File

@@ -31,6 +31,7 @@ export type WorkerInput =
type: 'rootDoc'; type: 'rootDoc';
rootDocBuffer: Uint8Array; rootDocBuffer: Uint8Array;
allIndexedDocs: string[]; allIndexedDocs: string[];
reindexAll?: boolean;
} }
| { | {
type: 'doc'; type: 'doc';