feat(server): increase embedding jobs concurrency & handle empty content after trim (#12574)

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **Improvements**
  - Increased the default concurrency for background tasks, enhancing processing efficiency.
  - Improved handling of empty or unsupported documents to ensure consistent processing.
  - Optimized document filtering to exclude certain documents from processing, improving performance.

- **Bug Fixes**
  - Enhanced detection of empty document summaries, reducing errors during processing.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
darkskygit
2025-05-27 14:28:34 +00:00
parent 7eb6b268a6
commit 9220b973c7
6 changed files with 54 additions and 23 deletions

View File

@@ -199,6 +199,21 @@ test('should insert and search embedding', async t => {
);
t.snapshot(afterAddIgnoreDocs.length, 'should not find docs to embed');
}
{
const docId = `foo$bar`;
await t.context.doc.upsert({
spaceId: workspace.id,
docId: docId,
blob: Uint8Array.from([1, 2, 3]),
timestamp: Date.now(),
editorId: user.id,
});
const results = await t.context.copilotWorkspace.findDocsToEmbed(
workspace.id
);
t.false(results.includes(docId), 'docs containing `$` should be excluded');
}
});
test('should check need to be embedded', async t => {

View File

@@ -50,7 +50,7 @@ defineModuleConfig('job', {
'queues.copilot': {
desc: 'The config for copilot job queue',
default: {
concurrency: 5,
concurrency: 10,
},
schema,
},

View File

@@ -42,23 +42,26 @@ export class CopilotWorkspaceConfigModel extends BaseModel {
*/
@Transactional()
async findDocsToEmbed(workspaceId: string): Promise<string[]> {
// Doc ids returned by listIgnoredDocIds for this workspace; used in the
// `notIn` filter below.
const ignoredDocIds = (await this.listIgnoredDocIds(workspaceId)).map(
d => d.docId
);
// Select the ids of snapshots in this workspace that match the filters below.
const docIds = await this.db.snapshot
.findMany({
where: {
workspaceId,
embedding: {
none: {},
},
// Skip ignored docs, the snapshot whose id equals the workspace id,
// and any snapshot whose id contains `$`.
AND: [
{ id: { notIn: ignoredDocIds } },
{ id: { not: workspaceId } },
{ id: { not: { contains: '$' } } },
],
// NOTE(review): `embedding` appears twice in this object literal — this
// looks like the removed and added lines of a diff shown together;
// confirm against the real file that only one copy remains.
embedding: { none: {} },
},
select: { id: true },
})
.then(r => r.map(doc => doc.id));
// Builds a Set of ignored doc ids and filters the query result with it.
// NOTE(review): this repeats the `notIn: ignoredDocIds` filter above and is
// presumably the pre-change implementation — confirm.
const skipDocIds = await this.listIgnoredDocIds(workspaceId).then(
r => new Set(r.map(r => r.docId))
);
return docIds.filter(id => !skipDocIds.has(id));
// NOTE(review): unreachable after the return above as shown here; presumably
// the post-change return statement — confirm.
return docIds;
}
@Transactional()

View File

@@ -4,6 +4,7 @@ import {
AFFiNELogger,
BlobNotFound,
Config,
CopilotContextFileNotSupported,
DocNotFound,
EventBus,
JobQueue,
@@ -300,6 +301,19 @@ export class CopilotContextDocJob {
return controller.signal;
}
/**
 * Inserts a single placeholder embedding for the given doc.
 *
 * The placeholder has index 0, empty content and a vector of
 * `EMBEDDING_DIMENSIONS` zeros.
 *
 * @param workspaceId - workspace the doc belongs to
 * @param docId - doc that receives the placeholder embedding
 */
private async fulfillEmptyEmbedding(workspaceId: string, docId: string) {
  // Zero-filled vector with one entry per embedding dimension.
  const zeroVector = Array.from({ length: EMBEDDING_DIMENSIONS }, () => 0);
  await this.models.copilotContext.insertWorkspaceEmbedding(
    workspaceId,
    docId,
    [{ index: 0, content: '', embedding: zeroVector }]
  );
}
@OnJob('copilot.embedding.docs')
async embedPendingDocs({
contextId,
@@ -321,7 +335,7 @@ export class CopilotContextDocJob {
const fragment = await this.getDocFragment(workspaceId, docId);
if (fragment) {
// fast path for empty docs: a journal can easily create an empty doc
if (fragment.summary) {
if (fragment.summary.trim()) {
const embeddings = await this.embeddingClient.getFileEmbeddings(
new File(
[fragment.summary],
@@ -340,16 +354,7 @@ export class CopilotContextDocJob {
}
} else {
// for empty doc, insert empty embedding
const emptyEmbedding = {
index: 0,
content: '',
embedding: Array.from({ length: EMBEDDING_DIMENSIONS }, () => 0),
};
await this.models.copilotContext.insertWorkspaceEmbedding(
workspaceId,
docId,
[emptyEmbedding]
);
await this.fulfillEmptyEmbedding(workspaceId, docId);
}
} else if (contextId) {
throw new DocNotFound({ spaceId: workspaceId, docId });
@@ -362,6 +367,14 @@ export class CopilotContextDocJob {
docId,
});
}
if (
error instanceof CopilotContextFileNotSupported &&
error.message.includes('no content found')
) {
// if the doc is empty, we still need to fulfill the embedding
await this.fulfillEmptyEmbedding(workspaceId, docId);
return;
}
// passthrough error to job queue
throw error;