feat(server): trigger workspace embedding (#12328)

fix AI-127

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Added automated event handling for workspace updates and document embedding, streamlining document embedding workflows.
  - Introduced detection and queuing of documents needing embedding, excluding ignored documents.
- **Improvements**
  - Enhanced performance of embedding-related searches by filtering results at the database level.
  - Increased concurrency for embedding job processing to improve throughput.
- **Bug Fixes**
  - Improved error handling and fallback for missing document titles during embedding.
  - Added safeguards to skip invalid embedding jobs based on document identifiers.
- **Tests**
  - Expanded test coverage for document embedding and ignored document filtering.
  - Updated end-to-end tests to use dynamic content for improved reliability.
  - Added synchronization waits in document creation utilities to improve test stability.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
darkskygit
2025-05-20 05:16:45 +00:00
parent 3c982d2b91
commit 6f9361caee
13 changed files with 213 additions and 35 deletions

View File

@@ -73,3 +73,19 @@ Generated by [AVA](https://avajs.dev).
name: 'file1',
},
]
> should find docs to embed
1
> should not find docs to embed
0
> should find docs to embed
1
> should not find docs to embed
0

View File

@@ -1,8 +1,12 @@
import { randomUUID } from 'node:crypto';
import { PrismaClient, User, Workspace } from '@prisma/client';
import ava, { TestFn } from 'ava';
import { Config } from '../../base';
import { CopilotContextModel } from '../../models/copilot-context';
import { CopilotWorkspaceConfigModel } from '../../models/copilot-workspace';
import { DocModel } from '../../models/doc';
import { UserModel } from '../../models/user';
import { WorkspaceModel } from '../../models/workspace';
import { createTestingModule, type TestingModule } from '../utils';
@@ -12,8 +16,10 @@ interface Context {
config: Config;
module: TestingModule;
db: PrismaClient;
doc: DocModel;
user: UserModel;
workspace: WorkspaceModel;
copilotContext: CopilotContextModel;
copilotWorkspace: CopilotWorkspaceConfigModel;
}
@@ -23,8 +29,10 @@ test.before(async t => {
const module = await createTestingModule();
t.context.user = module.get(UserModel);
t.context.workspace = module.get(WorkspaceModel);
t.context.copilotContext = module.get(CopilotContextModel);
t.context.copilotWorkspace = module.get(CopilotWorkspaceConfigModel);
t.context.db = module.get(PrismaClient);
t.context.doc = module.get(DocModel);
t.context.config = module.get(Config);
t.context.module = module;
});
@@ -136,6 +144,61 @@ test('should insert and search embedding', async t => {
);
}
}
{
const docId = randomUUID();
await t.context.doc.upsert({
spaceId: workspace.id,
docId,
blob: Uint8Array.from([1, 2, 3]),
timestamp: Date.now(),
editorId: user.id,
});
const toBeEmbedDocIds = await t.context.copilotWorkspace.findDocsToEmbed(
workspace.id
);
t.snapshot(toBeEmbedDocIds.length, 'should find docs to embed');
await t.context.copilotContext.insertWorkspaceEmbedding(
workspace.id,
docId,
[
{
index: 0,
content: 'content',
embedding: Array.from({ length: 1024 }, () => 1),
},
]
);
const afterInsertEmbedding =
await t.context.copilotWorkspace.findDocsToEmbed(workspace.id);
t.snapshot(afterInsertEmbedding.length, 'should not find docs to embed');
}
{
const docId = randomUUID();
await t.context.doc.upsert({
spaceId: workspace.id,
docId,
blob: Uint8Array.from([1, 2, 3]),
timestamp: Date.now(),
editorId: user.id,
});
const toBeEmbedDocIds = await t.context.copilotWorkspace.findDocsToEmbed(
workspace.id
);
t.snapshot(toBeEmbedDocIds.length, 'should find docs to embed');
await t.context.copilotWorkspace.updateIgnoredDocs(workspace.id, [docId]);
const afterAddIgnoreDocs = await t.context.copilotWorkspace.findDocsToEmbed(
workspace.id
);
t.snapshot(afterAddIgnoreDocs.length, 'should not find docs to embed');
}
});
test('should check embedding table', async t => {

View File

@@ -48,7 +48,7 @@ defineModuleConfig('job', {
'queues.copilot': {
desc: 'The config for copilot job queue',
default: {
concurrency: 1,
concurrency: 5,
},
schema,
},

View File

@@ -119,6 +119,11 @@ export class CopilotContextModel extends BaseModel {
}
async hasWorkspaceEmbedding(workspaceId: string, docIds: string[]) {
const canEmbedding = await this.checkEmbeddingAvailable();
if (!canEmbedding) {
return new Set();
}
const existsIds = await this.db.aiWorkspaceEmbedding
.findMany({
where: {
@@ -238,10 +243,11 @@ export class CopilotContextModel extends BaseModel {
WHERE
w."workspace_id" = ${workspaceId}
AND i."doc_id" IS NULL
AND (w."embedding" <=> ${embedding}::vector) <= ${threshold}
ORDER BY "distance" ASC
LIMIT ${topK};
`;
return similarityChunks.filter(c => Number(c.distance) <= threshold);
return similarityChunks;
}
}

View File

@@ -34,6 +34,33 @@ export class CopilotWorkspaceConfigModel extends BaseModel {
});
}
/**
* find docs to embed, excluding ignored and already embedded docs
* newer docs will be list first
* @param workspaceId id of the workspace
* @returns docIds
*/
@Transactional()
async findDocsToEmbed(workspaceId: string): Promise<string[]> {
const docIds = await this.db.snapshot
.findMany({
where: {
workspaceId,
embedding: {
is: null,
},
},
select: { id: true },
})
.then(r => r.map(doc => doc.id));
const skipDocIds = await this.listIgnoredDocIds(workspaceId).then(
r => new Set(r.map(r => r.docId))
);
return docIds.filter(id => !skipDocIds.has(id));
}
@Transactional()
async updateIgnoredDocs(
workspaceId: string,

View File

@@ -7,6 +7,7 @@ import { BaseModel } from './base';
declare global {
interface Events {
'workspace.updated': Workspace;
'workspace.deleted': {
id: string;
};
@@ -58,6 +59,9 @@ export class WorkspaceModel extends BaseModel {
this.logger.debug(
`Updated workspace ${workspaceId} with data ${JSON.stringify(data)}`
);
this.event.emit('workspace.updated', workspace);
return workspace;
}

View File

@@ -4,6 +4,7 @@ import {
AFFiNELogger,
BlobNotFound,
Config,
DocNotFound,
EventBus,
JobQueue,
mapAnyError,
@@ -87,11 +88,37 @@ export class CopilotContextDocJob {
}
}
// @OnEvent('doc.indexer.updated')
async addDocEmbeddingQueueFromEvent(
// TODO(@darkskygit): replace this with real event type
doc: { workspaceId: string; docId: string } //Events['doc.indexer.updated'],
) {
@OnEvent('workspace.updated')
async onWorkspaceConfigUpdate({
id,
enableDocEmbedding,
}: Events['workspace.updated']) {
if (enableDocEmbedding) {
// trigger workspace embedding
this.event.emit('workspace.embedding', {
workspaceId: id,
});
}
}
@OnEvent('workspace.embedding')
async addWorkspaceEmbeddingQueue({
workspaceId,
}: Events['workspace.embedding']) {
if (!this.supportEmbedding) return;
const toBeEmbedDocIds =
await this.models.copilotWorkspace.findDocsToEmbed(workspaceId);
for (const docId of toBeEmbedDocIds) {
await this.queue.add('copilot.embedding.docs', {
workspaceId,
docId,
});
}
}
@OnEvent('doc.indexer.updated')
async addDocEmbeddingQueueFromEvent(doc: Events['doc.indexer.updated']) {
if (!this.supportEmbedding) return;
await this.queue.add('copilot.embedding.docs', {
@@ -100,11 +127,10 @@ export class CopilotContextDocJob {
});
}
// @OnEvent('doc.indexer.deleted')
async deleteDocEmbeddingQueueFromEvent(
// TODO(@darkskygit): replace this with real event type
doc: { workspaceId: string; docId: string } //Events['doc.indexer.deleted'],
) {
@OnEvent('doc.indexer.deleted')
async deleteDocEmbeddingQueueFromEvent(doc: Events['doc.indexer.deleted']) {
if (!this.supportEmbedding) return;
await this.models.copilotContext.deleteWorkspaceEmbedding(
doc.workspaceId,
doc.docId
@@ -193,13 +219,14 @@ export class CopilotContextDocJob {
docId,
}: Jobs['copilot.embedding.docs']) {
if (!this.supportEmbedding) return;
if (workspaceId === docId || docId.includes('$')) return;
try {
const content = await this.doc.getFullDocContent(workspaceId, docId);
if (content) {
// no need to check if embeddings is empty, will throw internally
const embeddings = await this.embeddingClient.getFileEmbeddings(
new File([content.summary], `${content.title}.md`)
new File([content.summary], `${content.title || 'Untitled'}.md`)
);
for (const chunks of embeddings) {
@@ -209,6 +236,8 @@ export class CopilotContextDocJob {
chunks
);
}
} else if (contextId) {
throw new DocNotFound({ spaceId: workspaceId, docId });
}
} catch (error: any) {
if (contextId) {

View File

@@ -8,6 +8,10 @@ import { parseDoc } from '../../../native';
declare global {
interface Events {
'workspace.embedding': {
workspaceId: string;
};
'workspace.doc.embedding': Array<{
workspaceId: string;
docId: string;