diff --git a/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md b/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md index da49bbba31..1809ad50c0 100644 --- a/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md +++ b/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md @@ -396,6 +396,15 @@ Generated by [AVA](https://avajs.dev). }, ], }, + { + args: [ + 'copilot.workspace.cleanupTrashedDocEmbeddings', + {}, + { + jobId: 'daily-copilot-cleanup-trashed-doc-embeddings', + }, + ], + }, ] > cleanup empty sessions calls diff --git a/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.snap b/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.snap index 1383cf492c..11157ea0b6 100644 Binary files a/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.snap and b/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.snap differ diff --git a/packages/backend/server/src/__tests__/models/copilot-context.spec.ts b/packages/backend/server/src/__tests__/models/copilot-context.spec.ts index 1c7465f993..61f8ec55dc 100644 --- a/packages/backend/server/src/__tests__/models/copilot-context.spec.ts +++ b/packages/backend/server/src/__tests__/models/copilot-context.spec.ts @@ -164,11 +164,14 @@ test('should insert embedding by doc id', async t => { ); { - const ret = await t.context.copilotContext.hasWorkspaceEmbedding( + const ret = await t.context.copilotContext.listWorkspaceEmbedding( workspace.id, [docId] ); - t.true(ret.has(docId), 'should return doc id when embedding is inserted'); + t.true( + ret.includes(docId), + 'should return doc id when embedding is inserted' + ); } { @@ -317,8 +320,8 @@ test('should merge doc status correctly', async t => { const hasEmbeddingStub = Sinon.stub( t.context.copilotContext, - 'hasWorkspaceEmbedding' - ).resolves(new Set()); + 'listWorkspaceEmbedding' + ).resolves([]); const stubResult = await 
t.context.copilotContext.mergeDocStatus( workspace.id, diff --git a/packages/backend/server/src/models/copilot-context.ts b/packages/backend/server/src/models/copilot-context.ts index 908e0f7b2e..53dc9cf745 100644 --- a/packages/backend/server/src/models/copilot-context.ts +++ b/packages/backend/server/src/models/copilot-context.ts @@ -84,11 +84,17 @@ export class CopilotContextModel extends BaseModel { } async mergeDocStatus(workspaceId: string, docs: ContextDoc[]) { - const docIds = Array.from(new Set(docs.map(doc => doc.id))); - const finishedDoc = await this.hasWorkspaceEmbedding(workspaceId, docIds); + const canEmbedding = await this.checkEmbeddingAvailable(); + const finishedDoc = canEmbedding + ? await this.listWorkspaceEmbedding( + workspaceId, + Array.from(new Set(docs.map(doc => doc.id))) + ) + : []; + const finishedDocSet = new Set(finishedDoc); for (const doc of docs) { - const status = finishedDoc.has(doc.id) + const status = finishedDocSet.has(doc.id) ? ContextEmbedStatus.finished : undefined; // NOTE: when the document has not been synchronized to the server or is in the embedding queue @@ -120,24 +126,17 @@ export class CopilotContextModel extends BaseModel { return Number(count) === 2; } - async hasWorkspaceEmbedding(workspaceId: string, docIds: string[]) { - const canEmbedding = await this.checkEmbeddingAvailable(); - if (!canEmbedding) { - return new Set(); - } - + async listWorkspaceEmbedding(workspaceId: string, docIds?: string[]) { const existsIds = await this.db.aiWorkspaceEmbedding - .findMany({ + .groupBy({ where: { workspaceId, - docId: { in: docIds }, - }, - select: { - docId: true, + docId: docIds ? 
{ in: docIds } : undefined, }, + by: ['docId'], }) .then(r => r.map(r => r.docId)); - return new Set(existsIds); + return existsIds; } private processEmbeddings( diff --git a/packages/backend/server/src/models/workspace.ts b/packages/backend/server/src/models/workspace.ts index 05bb8545db..5d86b388ee 100644 --- a/packages/backend/server/src/models/workspace.ts +++ b/packages/backend/server/src/models/workspace.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { Transactional } from '@nestjs-cls/transactional'; -import { type Workspace } from '@prisma/client'; +import { Prisma, type Workspace } from '@prisma/client'; import { EventBus } from '../base'; import { BaseModel } from './base'; @@ -93,6 +93,19 @@ export class WorkspaceModel extends BaseModel { }); } + async list<S extends Prisma.WorkspaceSelect>( + where: Prisma.WorkspaceWhereInput = {}, + select?: S + ) { + return (await this.db.workspace.findMany({ + where, + select, + orderBy: { + sid: 'asc', + }, + })) as Prisma.WorkspaceGetPayload<{ select: S }>[]; + } + async delete(workspaceId: string) { const rawResult = await this.db.workspace.deleteMany({ where: { diff --git a/packages/backend/server/src/plugins/copilot/cron.ts b/packages/backend/server/src/plugins/copilot/cron.ts index 686fac1660..3b0a4558a0 100644 --- a/packages/backend/server/src/plugins/copilot/cron.ts +++ b/packages/backend/server/src/plugins/copilot/cron.ts @@ -8,6 +8,7 @@ declare global { interface Jobs { 'copilot.session.cleanupEmptySessions': {}; 'copilot.session.generateMissingTitles': {}; + 'copilot.workspace.cleanupTrashedDocEmbeddings': {}; } } @@ -20,6 +21,14 @@ export class CopilotCronJobs { private readonly jobs: JobQueue ) {} + async triggerCleanupTrashedDocEmbeddings() { + await this.jobs.add( + 'copilot.workspace.cleanupTrashedDocEmbeddings', + {}, + { jobId: 'daily-copilot-cleanup-trashed-doc-embeddings' } + ); + } + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) async dailyCleanupJob() { await this.jobs.add( @@ -33,6 +42,12 @@ export class 
CopilotCronJobs { {}, { jobId: 'daily-copilot-generate-missing-titles' } ); + + await this.jobs.add( + 'copilot.workspace.cleanupTrashedDocEmbeddings', + {}, + { jobId: 'daily-copilot-cleanup-trashed-doc-embeddings' } + ); } async triggerGenerateMissingTitles() { @@ -68,4 +83,18 @@ export class CopilotCronJobs { `Scheduled title generation for ${sessions.length} sessions` ); } + + @OnJob('copilot.workspace.cleanupTrashedDocEmbeddings') + async cleanupTrashedDocEmbeddings() { + const workspaces = await this.models.workspace.list(undefined, { + id: true, + }); + for (const { id: workspaceId } of workspaces) { + await this.jobs.add( + 'copilot.embedding.cleanupTrashedDocEmbeddings', + { workspaceId }, + { jobId: `cleanup-trashed-doc-embeddings-${workspaceId}` } + ); + } + } } diff --git a/packages/backend/server/src/plugins/copilot/embedding/job.ts b/packages/backend/server/src/plugins/copilot/embedding/job.ts index f6c2c5951b..e4e33c86ba 100644 --- a/packages/backend/server/src/plugins/copilot/embedding/job.ts +++ b/packages/backend/server/src/plugins/copilot/embedding/job.ts @@ -12,6 +12,7 @@ import { OnJob, } from '../../../base'; import { DocReader } from '../../../core/doc'; +import { readAllDocIdsFromWorkspaceSnapshot } from '../../../core/utils/blocksuite'; import { Models } from '../../../models'; import { CopilotStorage } from '../storage'; import { readStream } from '../utils'; @@ -134,10 +135,30 @@ export class CopilotEmbeddingJob { if (enableDocEmbedding) { const toBeEmbedDocIds = await this.models.copilotWorkspace.findDocsToEmbed(workspaceId); + if (!toBeEmbedDocIds.length) { + return; + } + // filter out trashed docs + const rootSnapshot = await this.models.doc.getSnapshot( + workspaceId, + workspaceId + ); + if (!rootSnapshot) { + this.logger.warn( + `Root snapshot for workspace ${workspaceId} not found, skipping embedding.` + ); + return; + } + const allDocIds = new Set( + readAllDocIdsFromWorkspaceSnapshot(rootSnapshot.blob) + ); this.logger.log( 
`Trigger embedding for ${toBeEmbedDocIds.length} docs in workspace ${workspaceId}` ); - for (const docId of toBeEmbedDocIds) { + const finalToBeEmbedDocIds = toBeEmbedDocIds.filter(docId => + allDocIds.has(docId) + ); + for (const docId of finalToBeEmbedDocIds) { await this.queue.add( 'copilot.embedding.docs', { @@ -422,4 +443,39 @@ export class CopilotEmbeddingJob { ); } } + + @OnJob('copilot.embedding.cleanupTrashedDocEmbeddings') + async cleanupTrashedDocEmbeddings({ + workspaceId, + }: Jobs['copilot.embedding.cleanupTrashedDocEmbeddings']) { + const workspace = await this.models.workspace.get(workspaceId); + if (!workspace) { + this.logger.warn(`workspace ${workspaceId} not found`); + return; + } + + const snapshot = await this.models.doc.getSnapshot( + workspaceId, + workspaceId + ); + if (!snapshot) { + this.logger.warn(`workspace snapshot ${workspaceId} not found`); + return; + } + + const docIdsInWorkspace = readAllDocIdsFromWorkspaceSnapshot(snapshot.blob); + const docIdsInEmbedding = + await this.models.copilotContext.listWorkspaceEmbedding(workspaceId); + const docIdsInWorkspaceSet = new Set(docIdsInWorkspace); + + const deletedDocIds = docIdsInEmbedding.filter( + docId => !docIdsInWorkspaceSet.has(docId) + ); + for (const docId of deletedDocIds) { + await this.models.copilotContext.deleteWorkspaceEmbedding( + workspaceId, + docId + ); + } + } } diff --git a/packages/backend/server/src/plugins/copilot/embedding/types.ts b/packages/backend/server/src/plugins/copilot/embedding/types.ts index 751f23f75c..f037fc0682 100644 --- a/packages/backend/server/src/plugins/copilot/embedding/types.ts +++ b/packages/backend/server/src/plugins/copilot/embedding/types.ts @@ -61,6 +61,10 @@ declare global { fileId: string; fileName: string; }; + + 'copilot.embedding.cleanupTrashedDocEmbeddings': { + workspaceId: string; + }; } } diff --git a/packages/backend/server/src/plugins/copilot/index.ts b/packages/backend/server/src/plugins/copilot/index.ts index 
bee64e4828..02982bbbb8 100644 --- a/packages/backend/server/src/plugins/copilot/index.ts +++ b/packages/backend/server/src/plugins/copilot/index.ts @@ -64,8 +64,8 @@ import { // context CopilotContextResolver, CopilotContextService, + // jobs CopilotEmbeddingJob, - // cron jobs CopilotCronJobs, // transcription CopilotTranscriptionService, diff --git a/packages/backend/server/src/plugins/copilot/resolver.ts b/packages/backend/server/src/plugins/copilot/resolver.ts index aa59d7e882..a05cc7c30f 100644 --- a/packages/backend/server/src/plugins/copilot/resolver.ts +++ b/packages/backend/server/src/plugins/copilot/resolver.ts @@ -852,6 +852,14 @@ export class PromptsManagementResolver { return true; } + @Mutation(() => Boolean, { + description: 'Trigger cleanup of trashed doc embeddings', + }) + async triggerCleanupTrashedDocEmbeddings() { + await this.cron.triggerCleanupTrashedDocEmbeddings(); + return true; + } + @Query(() => [CopilotPromptType], { description: 'List all copilot prompts', }) diff --git a/packages/backend/server/src/schema.gql b/packages/backend/server/src/schema.gql index f5d5223f4d..edf792e578 100644 --- a/packages/backend/server/src/schema.gql +++ b/packages/backend/server/src/schema.gql @@ -1297,6 +1297,9 @@ type Mutation { setBlob(blob: Upload!, workspaceId: String!): String! submitAudioTranscription(blob: Upload, blobId: String!, blobs: [Upload!], workspaceId: String!): TranscriptionResultType + """Trigger cleanup of trashed doc embeddings""" + triggerCleanupTrashedDocEmbeddings: Boolean! + """Trigger generate missing titles cron job""" triggerGenerateTitleCron: Boolean! 
diff --git a/packages/common/graphql/src/schema.ts b/packages/common/graphql/src/schema.ts index c0acb18c90..0e7d456427 100644 --- a/packages/common/graphql/src/schema.ts +++ b/packages/common/graphql/src/schema.ts @@ -1440,6 +1440,8 @@ export interface Mutation { sendVerifyEmail: Scalars['Boolean']['output']; setBlob: Scalars['String']['output']; submitAudioTranscription: Maybe<TranscriptionResultType>; + /** Trigger cleanup of trashed doc embeddings */ + triggerCleanupTrashedDocEmbeddings: Scalars['Boolean']['output']; /** Trigger generate missing titles cron job */ triggerGenerateTitleCron: Scalars['Boolean']['output']; /** update app configuration */