feat(server): auto index all workspaces to indexer (#12205)

close CLOUD-207

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Introduced automated periodic indexing of workspaces with a new job type and a scheduled cron job running every 30 seconds.
  - Added a unique sequential identifier (`sid`) and an "indexed" flag to workspaces to track indexing status.
- **Improvements**
  - Enhanced workspace indexing to handle missing workspaces and snapshots distinctly and selectively index documents.
  - Added ability to query workspaces after a given identifier with result limits.
- **Bug Fixes**
  - Improved error handling and logging during workspace indexing operations.
- **Tests**
  - Expanded test coverage for workspace indexing and auto-indexing, including scheduling and edge cases.
- **Chores**
  - Updated data models and schema to support new workspace fields and indexing features.
  - Enhanced mock data utilities to allow custom timestamps.
  - Improved type safety and flexibility in document snapshot retrieval.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
fengmk2
2025-05-19 04:39:38 +00:00
parent 34686f3d85
commit a34c3ea200
8 changed files with 211 additions and 13 deletions

View File

@@ -6,7 +6,9 @@ import Sinon from 'sinon';
import { createModule } from '../../../__tests__/create-module';
import { Mockers } from '../../../__tests__/mocks';
import { JOB_SIGNAL } from '../../../base';
import { ServerConfigModule } from '../../../core/config';
import { Models } from '../../../models';
import { IndexerModule, IndexerService } from '..';
import { SearchProviderFactory } from '../factory';
import { IndexerJob } from '../job';
@@ -20,6 +22,7 @@ const indexerService = module.get(IndexerService);
const indexerJob = module.get(IndexerJob);
const searchProviderFactory = module.get(SearchProviderFactory);
const manticoresearch = module.get(ManticoresearchProvider);
const models = module.get(Models);
const user = await module.create(Mockers.User);
const workspace = await module.create(Mockers.Workspace, {
@@ -63,15 +66,21 @@ test('should handle indexer.deleteDoc job', async t => {
test('should handle indexer.indexWorkspace job', async t => {
const count = module.queue.count('indexer.deleteDoc');
const spy = Sinon.spy(indexerService, 'listDocIds');
await indexerJob.indexWorkspace({
workspaceId: workspace.id,
});
t.is(spy.callCount, 1);
const { payload } = await module.queue.waitFor('indexer.indexDoc');
t.is(payload.workspaceId, workspace.id);
t.is(payload.docId, '5nS9BSp3Px');
// no delete job
t.is(module.queue.count('indexer.deleteDoc'), count);
// workspace should be indexed
const ws = await models.workspace.get(workspace.id);
t.is(ws!.indexed, true);
});
test('should not sync existing doc', async t => {
@@ -79,9 +88,11 @@ test('should not sync existing doc', async t => {
mock.method(indexerService, 'listDocIds', async () => {
return ['5nS9BSp3Px'];
});
await indexerJob.indexWorkspace({
workspaceId: workspace.id,
});
t.is(module.queue.count('indexer.indexDoc'), count);
});
@@ -90,9 +101,11 @@ test('should delete doc from indexer when docId is not in workspace', async t =>
mock.method(indexerService, 'listDocIds', async () => {
return ['mock-doc-id1', 'mock-doc-id2'];
});
await indexerJob.indexWorkspace({
workspaceId: workspace.id,
});
const { payload } = await module.queue.waitFor('indexer.indexDoc');
t.is(payload.workspaceId, workspace.id);
t.is(payload.docId, '5nS9BSp3Px');
@@ -101,8 +114,71 @@ test('should delete doc from indexer when docId is not in workspace', async t =>
test('should handle indexer.deleteWorkspace job', async t => {
const spy = Sinon.spy(indexerService, 'deleteWorkspace');
await indexerJob.deleteWorkspace({
workspaceId: workspace.id,
});
t.is(spy.callCount, 1);
});
test('should handle indexer.autoIndexWorkspaces job', async t => {
const workspace = await module.create(Mockers.Workspace, {
snapshot: true,
});
const result = await indexerJob.autoIndexWorkspaces({
lastIndexedWorkspaceSid: workspace.sid - 1,
});
t.is(result, JOB_SIGNAL.Repeat);
const { payload } = await module.queue.waitFor('indexer.indexWorkspace');
t.is(payload.workspaceId, workspace.id);
// no new auto index job
const count = module.queue.count('indexer.autoIndexWorkspaces');
await indexerJob.autoIndexWorkspaces({
lastIndexedWorkspaceSid: workspace.sid,
});
t.is(module.queue.count('indexer.autoIndexWorkspaces'), count);
});
test('should not index workspace if it is not updated in 180 days', async t => {
const workspace = await module.create(Mockers.Workspace);
await module.create(Mockers.DocSnapshot, {
user,
workspaceId: workspace.id,
docId: workspace.id,
updatedAt: new Date(Date.now() - 180 * 24 * 60 * 60 * 1000 - 1),
});
const count = module.queue.count('indexer.indexWorkspace');
await indexerJob.autoIndexWorkspaces({
lastIndexedWorkspaceSid: workspace.sid - 1,
});
t.is(module.queue.count('indexer.indexWorkspace'), count);
});
test('should not index workspace if snapshot not exists', async t => {
// not create snapshot
const workspace = await module.create(Mockers.Workspace);
const count = module.queue.count('indexer.indexWorkspace');
await indexerJob.autoIndexWorkspaces({
lastIndexedWorkspaceSid: workspace.sid - 1,
});
t.is(module.queue.count('indexer.indexWorkspace'), count);
});
test('should schedule auto index workspaces', async t => {
await indexerJob.scheduleAutoIndexWorkspaces();
const { payload } = await module.queue.waitFor('indexer.autoIndexWorkspaces');
t.is(payload.lastIndexedWorkspaceSid, undefined);
});

View File

@@ -1,6 +1,7 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { JobQueue, OnJob } from '../../base';
import { Config, JOB_SIGNAL, JobQueue, OnJob } from '../../base';
import { readAllDocIdsFromWorkspaceSnapshot } from '../../core/utils/blocksuite';
import { Models } from '../../models';
import { IndexerService } from './service';
@@ -21,6 +22,9 @@ declare global {
'indexer.deleteWorkspace': {
workspaceId: string;
};
'indexer.autoIndexWorkspaces': {
lastIndexedWorkspaceSid?: number;
};
}
}
@@ -31,7 +35,8 @@ export class IndexerJob {
constructor(
private readonly models: Models,
private readonly service: IndexerService,
private readonly queue: JobQueue
private readonly queue: JobQueue,
private readonly config: Config
) {}
@OnJob('indexer.indexDoc')
@@ -51,22 +56,30 @@ export class IndexerJob {
@OnJob('indexer.indexWorkspace')
async indexWorkspace({ workspaceId }: Jobs['indexer.indexWorkspace']) {
await this.queue.remove(workspaceId, 'indexer.deleteWorkspace');
const workspace = await this.models.workspace.get(workspaceId);
if (!workspace) {
this.logger.warn(`workspace ${workspaceId} not found`);
return;
}
const snapshot = await this.models.doc.getSnapshot(
workspaceId,
workspaceId
);
if (!snapshot) {
this.logger.warn(`workspace ${workspaceId} not found`);
this.logger.warn(`workspace snapshot ${workspaceId} not found`);
return;
}
const docIdsInWorkspace = readAllDocIdsFromWorkspaceSnapshot(snapshot.blob);
const docIdsInIndexer = await this.service.listDocIds(workspaceId);
const docIdsInWorkspaceSet = new Set(docIdsInWorkspace);
const docIdsInIndexerSet = new Set(docIdsInIndexer);
// diff the docIdsInWorkspace and docIdsInIndexer
const missingDocIds = docIdsInWorkspace.filter(
docId => !docIdsInIndexerSet.has(docId)
);
// diff the docIdsInWorkspace and docIdsInIndexer, if the workspace is not indexed, all the docIdsInWorkspace should be indexed
const missingDocIds = workspace.indexed
? docIdsInWorkspace.filter(docId => !docIdsInIndexerSet.has(docId))
: docIdsInWorkspace;
const deletedDocIds = docIdsInIndexer.filter(
docId => !docIdsInWorkspaceSet.has(docId)
);
@@ -97,6 +110,11 @@ export class IndexerJob {
}
);
}
if (!workspace.indexed) {
await this.models.workspace.update(workspaceId, {
indexed: true,
});
}
this.logger.debug(
`indexed workspace ${workspaceId} with ${missingDocIds.length} missing docs and ${deletedDocIds.length} deleted docs`
);
@@ -107,4 +125,67 @@ export class IndexerJob {
await this.queue.remove(workspaceId, 'indexer.indexWorkspace');
await this.service.deleteWorkspace(workspaceId);
}
@OnJob('indexer.autoIndexWorkspaces')
async autoIndexWorkspaces(payload: Jobs['indexer.autoIndexWorkspaces']) {
const startSid = payload.lastIndexedWorkspaceSid ?? 0;
const workspaces = await this.models.workspace.listAfterSid(startSid, 100);
if (workspaces.length === 0) {
// Keep the current sid value when repeating
return JOB_SIGNAL.Repeat;
}
let addedCount = 0;
for (const workspace of workspaces) {
if (workspace.indexed) {
continue;
}
const snapshotMeta = await this.models.doc.getSnapshot(
workspace.id,
workspace.id,
{
select: {
updatedAt: true,
},
}
);
// ignore 180 days not updated workspaces
if (
!snapshotMeta?.updatedAt ||
Date.now() - snapshotMeta.updatedAt.getTime() >
180 * 24 * 60 * 60 * 1000
) {
continue;
}
await this.queue.add(
'indexer.indexWorkspace',
{ workspaceId: workspace.id },
{ jobId: workspace.id }
);
addedCount++;
}
const nextSid = workspaces[workspaces.length - 1].sid;
this.logger.log(
`Auto added ${addedCount} workspaces to queue, lastIndexedWorkspaceSid: ${startSid} -> ${nextSid}`
);
// update the lastIndexedWorkspaceSid in the payload and repeat the job after 30 seconds
payload.lastIndexedWorkspaceSid = nextSid;
return JOB_SIGNAL.Repeat;
}
@Cron(CronExpression.EVERY_30_SECONDS)
async scheduleAutoIndexWorkspaces() {
if (!this.config.indexer.enabled) {
return;
}
await this.queue.add(
'indexer.autoIndexWorkspaces',
{},
{
// make sure only one job is running at a time
delay: 30 * 1000,
jobId: 'indexer:auto-index-workspaces',
}
);
}
}