feat(server): workspace embedding improve (#12022)

fix AI-10
fix AI-109
fix PD-2484

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Added a method to check if a document requires embedding, improving embedding efficiency.
  - Enhanced document embeddings with enriched metadata, including title, summary, creation/update dates, and author information.
  - Introduced a new type for document fragments with extended metadata fields.

- **Improvements**
  - Embedding logic now conditionally processes only documents needing updates.
  - Embedding content now includes document metadata for more informative context.
  - Expanded and improved test coverage for embedding scenarios and workspace behaviors.
  - Event emission added for workspace embedding updates on client version mismatch.
  - Job queueing enhanced with prioritization and explicit job IDs for better management.
  - Job queue calls updated to include priority and context identifiers in a structured format.

- **Bug Fixes**
  - Improved handling of ignored documents in embedding matches.
  - Fixed incorrect document ID assignment in embedding job queueing.

- **Tests**
  - Added and updated snapshot and behavioral tests for embedding and workspace document handling.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
darkskygit
2025-05-23 10:16:14 +00:00
parent 262f1a47a4
commit 2a80fbb993
9 changed files with 326 additions and 54 deletions

View File

@@ -4,19 +4,52 @@ The actual snapshot is saved in `copilot-context.spec.ts.snap`.
Generated by [AVA](https://avajs.dev). Generated by [AVA](https://avajs.dev).
## should get null for non-exist job
> should return null for non-exist job
null
## should insert embedding by doc id ## should insert embedding by doc id
> should match file embedding > should match file embedding
[ [
{ {
chunk: 0,
content: 'content',
distance: 0,
fileId: 'file-id', fileId: 'file-id',
}, },
] ]
> should return empty array when embedding is deleted
[]
> should match workspace embedding
[
{
docId: 'doc1',
},
]
> should return empty array when doc is ignored
[]
> should return workspace embedding
[
{
docId: 'doc1',
},
]
> should return empty array when embedding deleted > should return empty array when embedding deleted
[] []
## should check embedding table
> should return true when embedding table is available
true

View File

@@ -6,9 +6,11 @@ import ava, { TestFn } from 'ava';
import { Config } from '../../base'; import { Config } from '../../base';
import { CopilotContextModel } from '../../models/copilot-context'; import { CopilotContextModel } from '../../models/copilot-context';
import { CopilotSessionModel } from '../../models/copilot-session'; import { CopilotSessionModel } from '../../models/copilot-session';
import { CopilotWorkspaceConfigModel } from '../../models/copilot-workspace';
import { UserModel } from '../../models/user'; import { UserModel } from '../../models/user';
import { WorkspaceModel } from '../../models/workspace'; import { WorkspaceModel } from '../../models/workspace';
import { createTestingModule, type TestingModule } from '../utils'; import { createTestingModule, type TestingModule } from '../utils';
import { cleanObject } from '../utils/copilot';
interface Context { interface Context {
config: Config; config: Config;
@@ -18,6 +20,7 @@ interface Context {
workspace: WorkspaceModel; workspace: WorkspaceModel;
copilotSession: CopilotSessionModel; copilotSession: CopilotSessionModel;
copilotContext: CopilotContextModel; copilotContext: CopilotContextModel;
copilotWorkspace: CopilotWorkspaceConfigModel;
} }
const test = ava as TestFn<Context>; const test = ava as TestFn<Context>;
@@ -28,6 +31,7 @@ test.before(async t => {
t.context.workspace = module.get(WorkspaceModel); t.context.workspace = module.get(WorkspaceModel);
t.context.copilotSession = module.get(CopilotSessionModel); t.context.copilotSession = module.get(CopilotSessionModel);
t.context.copilotContext = module.get(CopilotContextModel); t.context.copilotContext = module.get(CopilotContextModel);
t.context.copilotWorkspace = module.get(CopilotWorkspaceConfigModel);
t.context.db = module.get(PrismaClient); t.context.db = module.get(PrismaClient);
t.context.config = module.get(Config); t.context.config = module.get(Config);
t.context.module = module; t.context.module = module;
@@ -74,7 +78,7 @@ test('should create a copilot context', async t => {
test('should get null for non-exist job', async t => { test('should get null for non-exist job', async t => {
const job = await t.context.copilotContext.get('non-exist'); const job = await t.context.copilotContext.get('non-exist');
t.is(job, null); t.snapshot(job, 'should return null for non-exist job');
}); });
test('should update context', async t => { test('should update context', async t => {
@@ -111,7 +115,10 @@ test('should insert embedding by doc id', async t => {
1, 1,
1 1
); );
t.snapshot(ret, 'should match file embedding'); t.snapshot(
cleanObject(ret, ['chunk', 'content', 'distance']),
'should match file embedding'
);
} }
{ {
@@ -122,7 +129,7 @@ test('should insert embedding by doc id', async t => {
1, 1,
1 1
); );
t.is(ret.length, 0); t.snapshot(ret, 'should return empty array when embedding is deleted');
} }
} }
@@ -155,7 +162,7 @@ test('should insert embedding by doc id', async t => {
workspace.id, workspace.id,
[docId] [docId]
); );
t.true(ret.has(docId), 'should return true when embedding exists'); t.true(ret.has(docId), 'should return doc id when embedding is inserted');
} }
{ {
@@ -165,8 +172,39 @@ test('should insert embedding by doc id', async t => {
1, 1,
1 1
); );
t.is(ret.length, 1); t.snapshot(
t.is(ret[0].content, 'content'); cleanObject(ret, ['chunk', 'content', 'distance']),
'should match workspace embedding'
);
}
{
await t.context.copilotWorkspace.updateIgnoredDocs(workspace.id, [docId]);
const ret = await t.context.copilotContext.matchWorkspaceEmbedding(
Array.from({ length: 1024 }, () => 0.9),
workspace.id,
1,
1
);
t.snapshot(ret, 'should return empty array when doc is ignored');
}
{
await t.context.copilotWorkspace.updateIgnoredDocs(
workspace.id,
undefined,
[docId]
);
const ret = await t.context.copilotContext.matchWorkspaceEmbedding(
Array.from({ length: 1024 }, () => 0.9),
workspace.id,
1,
1
);
t.snapshot(
cleanObject(ret, ['chunk', 'content', 'distance']),
'should return workspace embedding'
);
} }
{ {
@@ -188,7 +226,7 @@ test('should insert embedding by doc id', async t => {
test('should check embedding table', async t => { test('should check embedding table', async t => {
{ {
const ret = await t.context.copilotContext.checkEmbeddingAvailable(); const ret = await t.context.copilotContext.checkEmbeddingAvailable();
t.true(ret, 'should return true when embedding table is available'); t.snapshot(ret, 'should return true when embedding table is available');
} }
// { // {

View File

@@ -201,6 +201,68 @@ test('should insert and search embedding', async t => {
} }
}); });
test('should check need to be embedded', async t => {
const docId = randomUUID();
await t.context.doc.upsert({
spaceId: workspace.id,
docId,
blob: Uint8Array.from([1, 2, 3]),
timestamp: Date.now(),
editorId: user.id,
});
{
let needsEmbedding = await t.context.copilotWorkspace.checkDocNeedEmbedded(
workspace.id,
docId
);
t.true(needsEmbedding, 'document with no embedding should need embedding');
}
{
await t.context.copilotContext.insertWorkspaceEmbedding(
workspace.id,
docId,
[
{
index: 0,
content: 'content',
embedding: Array.from({ length: 1024 }, () => 1),
},
]
);
let needsEmbedding = await t.context.copilotWorkspace.checkDocNeedEmbedded(
workspace.id,
docId
);
t.false(
needsEmbedding,
'document with recent embedding should not need embedding'
);
}
{
await t.context.doc.upsert({
spaceId: workspace.id,
docId,
blob: Uint8Array.from([4, 5, 6]),
timestamp: Date.now() + 1000, // Ensure timestamp is later
editorId: user.id,
});
let needsEmbedding = await t.context.copilotWorkspace.checkDocNeedEmbedded(
workspace.id,
docId
);
t.true(
needsEmbedding,
'document updated after embedding should need embedding'
);
}
});
test('should check embedding table', async t => { test('should check embedding table', async t => {
{ {
const ret = await t.context.copilotWorkspace.checkEmbeddingAvailable(); const ret = await t.context.copilotWorkspace.checkEmbeddingAvailable();

View File

@@ -14,6 +14,7 @@ import {
CallMetric, CallMetric,
DocNotFound, DocNotFound,
DocUpdateBlocked, DocUpdateBlocked,
EventBus,
GatewayErrorWrapper, GatewayErrorWrapper,
metrics, metrics,
NotInSpace, NotInSpace,
@@ -144,6 +145,7 @@ export class SpaceSyncGateway
constructor( constructor(
private readonly ac: AccessController, private readonly ac: AccessController,
private readonly event: EventBus,
private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly userspace: PgUserspaceDocStorageAdapter, private readonly userspace: PgUserspaceDocStorageAdapter,
private readonly docReader: DocReader, private readonly docReader: DocReader,
@@ -201,6 +203,7 @@ export class SpaceSyncGateway
await client.join(room); await client.join(room);
} }
} else { } else {
this.event.emit('workspace.embedding', { workspaceId: spaceId });
await this.selectAdapter(client, spaceType).join(user.id, spaceId); await this.selectAdapter(client, spaceType).join(user.id, spaceId);
} }

View File

@@ -175,6 +175,55 @@ export class CopilotWorkspaceConfigModel extends BaseModel {
}; };
} }
@Transactional()
async checkDocNeedEmbedded(workspaceId: string, docId: string) {
// NOTE: check if the document needs re-embedding.
// 1. check if there have been any recent updates to the document snapshot and update
// 2. check if the embedding is older than the snapshot and update
// 3. check if the embedding is older than 10 minutes (avoid frequent updates)
// if all conditions are met, re-embedding is required.
const result = await this.db.$queryRaw<{ needs_embedding: boolean }[]>`
SELECT
EXISTS (
WITH docs AS (
SELECT
s.workspace_id,
s.guid AS doc_id,
s.updated_at
FROM
snapshots s
WHERE
s.workspace_id = ${workspaceId}
AND s.guid = ${docId}
UNION
ALL
SELECT
u.workspace_id,
u.guid AS doc_id,
u.created_at AS updated_at
FROM
"updates" u
WHERE
u.workspace_id = ${workspaceId}
AND u.guid = ${docId}
)
SELECT
1
FROM
docs
LEFT JOIN ai_workspace_embeddings e
ON e.workspace_id = docs.workspace_id
AND e.doc_id = docs.doc_id
WHERE
e.updated_at IS NULL
OR docs.updated_at > e.updated_at
OR e.updated_at < NOW() - INTERVAL '10 minutes'
) AS needs_embedding;
`;
return result[0]?.needs_embedding ?? false;
}
// ================ embeddings ================ // ================ embeddings ================
async checkEmbeddingAvailable(): Promise<boolean> { async checkEmbeddingAvailable(): Promise<boolean> {

View File

@@ -16,6 +16,7 @@ import { Models } from '../../../models';
import { CopilotStorage } from '../storage'; import { CopilotStorage } from '../storage';
import { readStream } from '../utils'; import { readStream } from '../utils';
import { OpenAIEmbeddingClient } from './embedding'; import { OpenAIEmbeddingClient } from './embedding';
import type { Chunk, DocFragment } from './types';
import { EMBEDDING_DIMENSIONS, EmbeddingClient } from './types'; import { EMBEDDING_DIMENSIONS, EmbeddingClient } from './types';
@Injectable() @Injectable()
@@ -78,16 +79,23 @@ export class CopilotContextDocJob {
@OnEvent('workspace.doc.embedding') @OnEvent('workspace.doc.embedding')
async addDocEmbeddingQueue( async addDocEmbeddingQueue(
docs: Events['workspace.doc.embedding'], docs: Events['workspace.doc.embedding'],
contextId?: string options?: { contextId: string; priority: number }
) { ) {
if (!this.supportEmbedding) return; if (!this.supportEmbedding) return;
for (const { workspaceId, docId } of docs) { for (const { workspaceId, docId } of docs) {
await this.queue.add('copilot.embedding.docs', { await this.queue.add(
contextId, 'copilot.embedding.docs',
workspaceId, {
docId, contextId: options?.contextId,
}); workspaceId,
docId,
},
{
jobId: `workspace:embedding:${workspaceId}:${docId}`,
priority: options?.priority ?? 1,
}
);
} }
} }
@@ -110,14 +118,26 @@ export class CopilotContextDocJob {
}: Events['workspace.embedding']) { }: Events['workspace.embedding']) {
if (!this.supportEmbedding || !this.embeddingClient) return; if (!this.supportEmbedding || !this.embeddingClient) return;
if (enableDocEmbedding === undefined) {
enableDocEmbedding =
await this.models.workspace.allowEmbedding(workspaceId);
}
if (enableDocEmbedding) { if (enableDocEmbedding) {
const toBeEmbedDocIds = const toBeEmbedDocIds =
await this.models.copilotWorkspace.findDocsToEmbed(workspaceId); await this.models.copilotWorkspace.findDocsToEmbed(workspaceId);
for (const docId of toBeEmbedDocIds) { for (const docId of toBeEmbedDocIds) {
await this.queue.add('copilot.embedding.docs', { await this.queue.add(
workspaceId, 'copilot.embedding.docs',
docId, {
}); workspaceId,
docId,
},
{
jobId: `workspace:embedding:${workspaceId}:${docId}`,
priority: 1,
}
);
} }
} else { } else {
const controller = this.workspaceJobAbortController.get(workspaceId); const controller = this.workspaceJobAbortController.get(workspaceId);
@@ -132,14 +152,25 @@ export class CopilotContextDocJob {
async addDocEmbeddingQueueFromEvent(doc: Events['doc.indexer.updated']) { async addDocEmbeddingQueueFromEvent(doc: Events['doc.indexer.updated']) {
if (!this.supportEmbedding || !this.embeddingClient) return; if (!this.supportEmbedding || !this.embeddingClient) return;
await this.queue.add('copilot.embedding.docs', { await this.queue.add(
workspaceId: doc.workspaceId, 'copilot.embedding.docs',
docId: doc.workspaceId, {
}); workspaceId: doc.workspaceId,
docId: doc.docId,
},
{
jobId: `workspace:embedding:${doc.workspaceId}:${doc.docId}`,
priority: 2,
}
);
} }
@OnEvent('doc.indexer.deleted') @OnEvent('doc.indexer.deleted')
async deleteDocEmbeddingQueueFromEvent(doc: Events['doc.indexer.deleted']) { async deleteDocEmbeddingQueueFromEvent(doc: Events['doc.indexer.deleted']) {
await this.queue.remove(
`workspace:embedding:${doc.workspaceId}:${doc.docId}`,
'copilot.embedding.docs'
);
await this.models.copilotContext.deleteWorkspaceEmbedding( await this.models.copilotContext.deleteWorkspaceEmbedding(
doc.workspaceId, doc.workspaceId,
doc.docId doc.docId
@@ -221,6 +252,43 @@ export class CopilotContextDocJob {
} }
} }
private async getDocFragment(
workspaceId: string,
docId: string
): Promise<DocFragment | null> {
const docContent = await this.doc.getFullDocContent(workspaceId, docId);
const authors = await this.models.doc.getAuthors(workspaceId, docId);
if (docContent?.summary && authors) {
const { title = 'Untitled', summary } = docContent;
const { createdAt, updatedAt, createdByUser, updatedByUser } = authors;
return {
title,
summary,
createdAt: createdAt.toDateString(),
updatedAt: updatedAt.toDateString(),
createdBy: createdByUser?.name,
updatedBy: updatedByUser?.name,
};
}
return null;
}
private formatDocChunks(chunks: Chunk[], fragment: DocFragment): Chunk[] {
return chunks.map(chunk => ({
index: chunk.index,
content: [
`Title: ${fragment.title}`,
`Created at: ${fragment.createdAt}`,
`Updated at: ${fragment.updatedAt}`,
fragment.createdBy ? `Created by: ${fragment.createdBy}` : undefined,
fragment.updatedBy ? `Updated by: ${fragment.updatedBy}` : undefined,
chunk.content,
]
.filter(Boolean)
.join('\n'),
}));
}
private getWorkspaceSignal(workspaceId: string) { private getWorkspaceSignal(workspaceId: string) {
let controller = this.workspaceJobAbortController.get(workspaceId); let controller = this.workspaceJobAbortController.get(workspaceId);
if (!controller) { if (!controller) {
@@ -241,39 +309,49 @@ export class CopilotContextDocJob {
const signal = this.getWorkspaceSignal(workspaceId); const signal = this.getWorkspaceSignal(workspaceId);
try { try {
const content = await this.doc.getFullDocContent(workspaceId, docId); const needEmbedding =
if (signal.aborted) { await this.models.copilotWorkspace.checkDocNeedEmbedded(
return; workspaceId,
} else if (content) { docId
// fast fall for empty doc, journal is easily to create a empty doc );
if (content.summary) { if (needEmbedding) {
const embeddings = await this.embeddingClient.getFileEmbeddings( if (signal.aborted) return;
new File([content.summary], `${content.title || 'Untitled'}.md`), const fragment = await this.getDocFragment(workspaceId, docId);
signal if (fragment) {
); // fast fall for empty doc, journal is easily to create a empty doc
if (fragment.summary) {
const embeddings = await this.embeddingClient.getFileEmbeddings(
new File(
[fragment.summary],
`${fragment.title || 'Untitled'}.md`
),
chunks => this.formatDocChunks(chunks, fragment),
signal
);
for (const chunks of embeddings) { for (const chunks of embeddings) {
await this.models.copilotContext.insertWorkspaceEmbedding(
workspaceId,
docId,
chunks
);
}
} else {
// for empty doc, insert empty embedding
const emptyEmbedding = {
index: 0,
content: '',
embedding: Array.from({ length: EMBEDDING_DIMENSIONS }, () => 0),
};
await this.models.copilotContext.insertWorkspaceEmbedding( await this.models.copilotContext.insertWorkspaceEmbedding(
workspaceId, workspaceId,
docId, docId,
chunks [emptyEmbedding]
); );
} }
} else { } else if (contextId) {
// for empty doc, insert empty embedding throw new DocNotFound({ spaceId: workspaceId, docId });
const emptyEmbedding = {
index: 0,
content: '',
embedding: Array.from({ length: EMBEDDING_DIMENSIONS }, () => 0),
};
await this.models.copilotContext.insertWorkspaceEmbedding(
workspaceId,
docId,
[emptyEmbedding]
);
} }
} else if (contextId) {
throw new DocNotFound({ spaceId: workspaceId, docId });
} }
} catch (error: any) { } catch (error: any) {
if (contextId) { if (contextId) {

View File

@@ -498,7 +498,7 @@ export class CopilotContextResolver {
workspaceId: session.workspaceId, workspaceId: session.workspaceId,
docId, docId,
})), })),
session.id { contextId: session.id, priority: 0 }
); );
} }
@@ -559,7 +559,7 @@ export class CopilotContextResolver {
await this.jobs.addDocEmbeddingQueue( await this.jobs.addDocEmbeddingQueue(
[{ workspaceId: session.workspaceId, docId: options.docId }], [{ workspaceId: session.workspaceId, docId: options.docId }],
session.id { contextId: session.id, priority: 0 }
); );
return { ...record, status: record.status || null }; return { ...record, status: record.status || null };

View File

@@ -3,6 +3,7 @@ import { File } from 'node:buffer';
import { z } from 'zod'; import { z } from 'zod';
import { CopilotContextFileNotSupported } from '../../../base'; import { CopilotContextFileNotSupported } from '../../../base';
import type { PageDocContent } from '../../../core/utils/blocksuite';
import { ChunkSimilarity, Embedding } from '../../../models'; import { ChunkSimilarity, Embedding } from '../../../models';
import { parseDoc } from '../../../native'; import { parseDoc } from '../../../native';
@@ -10,7 +11,7 @@ declare global {
interface Events { interface Events {
'workspace.embedding': { 'workspace.embedding': {
workspaceId: string; workspaceId: string;
enableDocEmbedding: boolean; enableDocEmbedding?: boolean;
}; };
'workspace.doc.embedding': Array<{ 'workspace.doc.embedding': Array<{
@@ -53,6 +54,13 @@ declare global {
} }
} }
export type DocFragment = PageDocContent & {
createdAt: string;
createdBy?: string;
updatedAt: string;
updatedBy?: string;
};
export type Chunk = { export type Chunk = {
index: number; index: number;
content: string; content: string;
@@ -63,11 +71,12 @@ export const EMBEDDING_DIMENSIONS = 1024;
export abstract class EmbeddingClient { export abstract class EmbeddingClient {
async getFileEmbeddings( async getFileEmbeddings(
file: File, file: File,
chunkMapper: (chunk: Chunk[]) => Chunk[],
signal?: AbortSignal signal?: AbortSignal
): Promise<Embedding[][]> { ): Promise<Embedding[][]> {
const chunks = await this.getFileChunks(file, signal); const chunks = await this.getFileChunks(file, signal);
const chunkedEmbeddings = await Promise.all( const chunkedEmbeddings = await Promise.all(
chunks.map(chunk => this.generateEmbeddings(chunk)) chunks.map(chunk => this.generateEmbeddings(chunkMapper(chunk)))
); );
return chunkedEmbeddings; return chunkedEmbeddings;
} }