From cc1d5b497aaba53619ba412a45b8b69aabde1202 Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Tue, 15 Jul 2025 20:00:33 +0800 Subject: [PATCH] feat(server): cleanup trashed doc's embedding (#13201) fix AI-359 ## Summary by CodeRabbit * **New Features** * Added automated cleanup of embeddings for documents deleted or trashed from workspaces. * Introduced a new job to schedule and perform this cleanup per workspace daily and on demand. * Added new GraphQL mutation to manually trigger the cleanup process. * Added the ability to list workspaces with flexible filtering and selection options. * **Improvements** * Enhanced document status handling to more accurately reflect embedding presence. * Refined internal methods for managing and checking document embeddings. --- .../__snapshots__/copilot.spec.ts.md | 9 +++ .../__snapshots__/copilot.spec.ts.snap | Bin 2278 -> 2366 bytes .../__tests__/models/copilot-context.spec.ts | 11 ++-- .../server/src/models/copilot-context.ts | 29 +++++---- .../backend/server/src/models/workspace.ts | 15 ++++- .../server/src/plugins/copilot/cron.ts | 29 +++++++++ .../src/plugins/copilot/embedding/job.ts | 58 +++++++++++++++++- .../src/plugins/copilot/embedding/types.ts | 4 ++ .../server/src/plugins/copilot/index.ts | 2 +- .../server/src/plugins/copilot/resolver.ts | 8 +++ packages/backend/server/src/schema.gql | 3 + packages/common/graphql/src/schema.ts | 2 + 12 files changed, 148 insertions(+), 22 deletions(-) diff --git a/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md b/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md index da49bbba31..1809ad50c0 100644 --- a/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md +++ b/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.md @@ -396,6 +396,15 @@ Generated by [AVA](https://avajs.dev). }, ], }, + { + args: [ + 'copilot.workspace.cleanupTrashedDocEmbeddings', + {}, + { + jobId: 'daily-copilot-cleanup-trashed-doc-embeddings', + }, + ], + }, ] > cleanup empty sessions calls diff --git a/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.snap b/packages/backend/server/src/__tests__/__snapshots__/copilot.spec.ts.snap index 1383cf492ce037d9940efeeffd50f4a3edd0c777..11157ea0b6d7d4fab32eb5c06dc831074f367862 100644 GIT binary patch literal 2366 zcmV-E3BmS3RzVybUkh98Rv00000000B+TWf3_R~7!wow2>KW5=Nt&?vM+n}~pHr+GmdMU<9UK--YE ziIk=#mF~`7?~XH%W$vsWJOxcE6_Bb5#7p@>BVJ1TBS=UfUX@5IRS2jAYAb$3AW&QJ zs*0%iQ3S#>b7yvace9gq>NG*?AM07)Gv~~?=R4m$_s;R7Gi5ihkI4s5iO7juv9MXz zf`(hM%EIDhS6ESCdEDn!Vc9hvSyD*p1imAEUYE5XlAk?Qsqly&p~npjA^xEf88-rW zH-Ps6xQYhX)4(lZ$QOySt99$vfrdd0G(vQHdgBPu$Ku}<>oC6s;4uKt0eCsnGkBmp zE9{6^emOoqo|s%uz*`8|OPZ*;Tyzv?QTBbLCR{hL=7Z3! zfX0Z95|9lJ)_KT1si`o-z!kZOT$GXU5?idm)<9ybn>Mo{MHsutjuY@X-94ktJ<_$! z2Vo_*mv|*yUKZ}c+f}fu!WSWrgca3HOX`W>yGu(^3j2=qeGX11^eLc zgZuv$+~!0JxCi$FxD~)101g8vFCo(_0{K9K=Y<^P-)VyUIR)|&q5(pd4?E*aGXbYf zfU~p^e~Ey{i=qDv0naKNze%6#cIfY5z)cLep8<2a_+Xp(Nd}x^z*7u3r;C@`#Q)0x z(*WLK05=<&`0&LimM|_wVtt`ED=SK@)4kbfO{@zrWN^+CGf%IUD@M6oPAt5*az~a* zJ<|k+&DCQFZFEi7(UCs8^x37)E`4^n64^y|RYm`c0FN{)3#5wloPTE8XKw|H(o^GM*sdDFR+Btn!T^28=RbN@o^5#$+5~z`YFk zcp+yQ8E2ipy203}MeYKfv zXPe3PvP!lRnb#0-9Ras3?Rd+@8Fqqz&v&X0+B57|1pJnOS9Q_$4BNnfcw2l+L58g) zZOZ+$>3EJBZ@nKb*nanf^oe|N@q#&Fid#ah3V>6bE>Gbi^;nSLqL zFJ=0rOuv-5Y%%-Bmon-gXH$FELD=vu$BSyfOEy{ znF98oEmq8kS7n0OjZ5|R^$d6qTR!W9g@}!H>}Y9xB+c<2NTY1x5aaUv)1Ws`zVqyWF? z$F2B|yW?VNiAm+S4oWHv@zrsY)nO;9-I+M^e(t&=h1bxiCORk<92v>vD_f@x4kRvT z->x|vw{AJo+RyEp;|uE!!9(Bit2e9JoNPO9PiAAjCj1uHEgr`8u__PlIde(;X5%aO zdu%!ub^NN8L{~YxTRH0oQAdYWi<`yWQ1Hs4RpT-pR(^lu^7h*p)w%%B=cgZ7ake*i zJc)rT%*j2)PPWw_%RT*9YlCZCJjC0&v&CK()nLc&i)Z)f9&37`?sCVsKHA`Z)bK1j z@al~OmF3V8Gw$N>efc9E-DBO=Cf1Nb=68F-u5sV7rBx3d-*)P*kW<>D45w81VeaLr zFn+M%c|2USGBB(8n^N=FkU9BfL^>0MY9`ghI)R^Y={uKmsU6fEH;61d418-gD9cVS z;2y|20uHZ6LGayXLGTQKXNwDhzXJG&F4lHx#D)kM)fd9{x9{mfcth(#$isLc%*XER zGWN;#u@af535XZ)7YKNTx|S|v{+p5sNLJfhwH0?r7v9f+gA90t0Vg%#kq)!Ie0koa z;ytCPa>{lE_Z#(TuO2O`^;D+Ei7XFL-1HC!sFT9Yy231#_d8NLel?-*RO8>88i$SZ zYn;q9&IjQ!S?9LM9`aDgWldD}1@^R87L|(QS5wW~j9l{prMXnF&t!83@Cp4ys{1?j zwmuK`GXr>1SJM6E`r3Awn!u?3?tiqfgdLa-%FP0HQ-wS3Vm5b^YQ`o-9M?&i2ewep z=H+^_Xij=*I42_&&q75LogXMlV>np}>`9SFQBm54it?Yh z*gYe&0XW6`#WcA;jSk7pz6s!T*ZoLpSF4Qo@hIc&jcPiqBtBW)B3oTOV%*kgdAs|9 zmbWn?zva9l&8+%YaP`xSMs` z?3c(m$^cstc^Ag-PVjB}ueY6#j}szJCjBJKqp*_O&$8?vg4kae@Jiu}9=5>%ES)7w kg#@=oJwuaIO6o7$dX(Rurrn%g6Q%j~KZ?GO%TPA}0Otgq%m4rY literal 2278 zcmVtKMt~-0S4-kkpsZ>C!DiEb85^V%UYWtx`ASw!~gsD^^C;~()K13i8 zt%{&RqT)j(D!jY1dv|-6+nk-G2|9l`@7=t8Z{M5W`^}r#_t-+w_RM4Q{!=2b0$Z%? zkQJ|HmyDt?c+nO{;292gd07}{g$ITdQd*vCNtah;#S7%Ur%ELr@FVn;h9QK1R3Pmd z03QbMF#ua=a5D{D@B6$?lw6IEkAp@)3^Yn~bA00{(dWYd6zb5w2jEEnF9CQx(KC2! zaZ#87vGj6cVj?oRnSgf@u%9xbdaLLrkOmsZ7Gjr&R|A`A$n@MmxIyxLv?6TVGnPHy zE`i2~juMaz4pzC(9hp&~`<^XQ5m}dka3WhQ$JRh(tD82xCWRlm$c_{6m8^SOn|qYi zw(R+()L!J3uz69~EALg4T^6qJc_56SB8&yg<+kAnDS27+XZO;b-M!h_Em;c-!WV9k z%&xI)1r>w4UQiLf5xbrc!TZ{YV8Qc`oq-09E1<9^n}DLv0+<7E1Atrez@kMExibaJ z0}3poM9+kmEK>O@fPVsbHv!jVqt%8R{U8BH2>2oa-^{^If%G2}@G}DbLcp6@@xhHl z&0y<>TAr;0YGXt<_GU#v@N8@}JQ2Ow80J!1GT?3yVf{V^JixkRz0f*3&~VFq@+u~EHW z5t6p=d=k>{8ie#;q=dAyK}bpI9&Y+i<`R8XRT&M+iTOF3Fx);?;G;YUufp+GBa1t?jDFFSJ|F(^iZ=skivupyfeFc zY~*xlr%JgBUkVe`7a~bJX%xkR=L?mM$>#Y7O$j%!55PwOd=kJx0C(pln;vD5l_uM_ z8p*cWNVeBivK2_bf`F?CxM^+2Tg=a}69jyZP3tqcga#nvwuE+N9`9Lvjt+wV*9pYtzXLYOPPzAll!Gi zzm(~hGW}AfU&>sxn0@<88Fi4ey}j$euepZhgg1y4Vb%iA@6dfzYr-g-1h6{~n=uUc zr%f$oRVSUuX-0|TK1TEyasd$3t8uvX80zc=054_@bPVzs(O;VkXvai7dd$nhBk8pg zYMxsC(>wIuToyh|s^aQO5LD&du3chaw`I@i;^~rsFSwJi$boalB42K>$m1!CuwMZ9Wo{!8djr6m z8H-wr#hzcj4mn;)*q=(XV5AY$CZy?g}r;@DbJ;^}$@khB{{3Fd6>3S0V3%DaW`a;Ya7gRo`=} zLEPCCdus5|O2RPnX|d;)Y>{I2VBFDkM<~-cH#TaK-X-o^UQHUN=a#I%@?6=Vpc65` z4?o+C@31>Aq?V9Wj%Pthxl??3*kpCc3M!w8ocRQ|Z4tw3Xj4XXP^?%okkMB*o*5j7 zTu#1SaVTuvvZQfulj{ce!}?g22Zx`&Abzun z_4_?GABtLT*@$LWIlE6e>v};)hgFN4!EImgQr)O<84oMJAHBH!);49j0MF#7>ltCT zH+DRcflJKExqK(v>W@}W|I^&y8Ws=Xwr(}w%e)%w*j-_CkIotMD^;6Yu5nL|yFtw{ zOwXy-B2*TAODx#+k-O4IJUVCG)+SbyLZ){+!mM!DGNn=VE!VWFwvf}AM;VE!@YCE$ zQ(@v@&2hM2Hxe+b_)V+$)nrP336U;%zKWy@tmU~0mwup?OHHq8*> z0e4?g5WIFyT8jO&rIfetn-y!{#Kue7Z<}f`o%`XEmX5vEK2{+8Lj=qa zut2~))V0tc{R;%#OTaS(tY(CVI@Cd}>*ID6@M*1D2GQTaNfM*0_bG8fTa)!g97_?6B&Q=;76KTRQ}MlmU-tl?>+= zX#-m4Os#eU4-z)nrs8Qt@k~}y z+3e~O?WRtId;3`t?ii8V^yq~}uNbZ-G8%cfX=Eg-%+*+dw4Va_Ie=ef++&%%WG@3a zoe>)DOhvDzR5YfNES;9UNy*PX+v(&k+mbau8SIZ; z#u)1W4(& { ); { - const ret = await t.context.copilotContext.hasWorkspaceEmbedding( + const ret = await t.context.copilotContext.listWorkspaceEmbedding( workspace.id, [docId] ); - t.true(ret.has(docId), 'should return doc id when embedding is inserted'); + t.true( + ret.includes(docId), + 'should return doc id when embedding is inserted' + ); } { @@ -317,8 +320,8 @@ test('should merge doc status correctly', async t => { const hasEmbeddingStub = Sinon.stub( t.context.copilotContext, - 'hasWorkspaceEmbedding' - ).resolves(new Set()); + 'listWorkspaceEmbedding' + ).resolves([]); const stubResult = await t.context.copilotContext.mergeDocStatus( workspace.id, diff --git a/packages/backend/server/src/models/copilot-context.ts b/packages/backend/server/src/models/copilot-context.ts index 908e0f7b2e..53dc9cf745 100644 --- a/packages/backend/server/src/models/copilot-context.ts +++ b/packages/backend/server/src/models/copilot-context.ts @@ -84,11 +84,17 @@ export class CopilotContextModel extends BaseModel { } async mergeDocStatus(workspaceId: string, docs: ContextDoc[]) { - const docIds = Array.from(new Set(docs.map(doc => doc.id))); - const finishedDoc = await this.hasWorkspaceEmbedding(workspaceId, docIds); + const canEmbedding = await this.checkEmbeddingAvailable(); + const finishedDoc = canEmbedding + ? await this.listWorkspaceEmbedding( + workspaceId, + Array.from(new Set(docs.map(doc => doc.id))) + ) + : []; + const finishedDocSet = new Set(finishedDoc); for (const doc of docs) { - const status = finishedDoc.has(doc.id) + const status = finishedDocSet.has(doc.id) ? ContextEmbedStatus.finished : undefined; // NOTE: when the document has not been synchronized to the server or is in the embedding queue @@ -120,24 +126,17 @@ export class CopilotContextModel extends BaseModel { return Number(count) === 2; } - async hasWorkspaceEmbedding(workspaceId: string, docIds: string[]) { - const canEmbedding = await this.checkEmbeddingAvailable(); - if (!canEmbedding) { - return new Set(); - } - + async listWorkspaceEmbedding(workspaceId: string, docIds?: string[]) { const existsIds = await this.db.aiWorkspaceEmbedding - .findMany({ + .groupBy({ where: { workspaceId, - docId: { in: docIds }, - }, - select: { - docId: true, + docId: docIds ? { in: docIds } : undefined, }, + by: ['docId'], }) .then(r => r.map(r => r.docId)); - return new Set(existsIds); + return existsIds; } private processEmbeddings( diff --git a/packages/backend/server/src/models/workspace.ts b/packages/backend/server/src/models/workspace.ts index 05bb8545db..5d86b388ee 100644 --- a/packages/backend/server/src/models/workspace.ts +++ b/packages/backend/server/src/models/workspace.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { Transactional } from '@nestjs-cls/transactional'; -import { type Workspace } from '@prisma/client'; +import { Prisma, type Workspace } from '@prisma/client'; import { EventBus } from '../base'; import { BaseModel } from './base'; @@ -93,6 +93,19 @@ export class WorkspaceModel extends BaseModel { }); } + async list( + where: Prisma.WorkspaceWhereInput = {}, + select?: S + ) { + return (await this.db.workspace.findMany({ + where, + select, + orderBy: { + sid: 'asc', + }, + })) as Prisma.WorkspaceGetPayload<{ select: S }>[]; + } + async delete(workspaceId: string) { const rawResult = await this.db.workspace.deleteMany({ where: { diff --git a/packages/backend/server/src/plugins/copilot/cron.ts b/packages/backend/server/src/plugins/copilot/cron.ts index 686fac1660..3b0a4558a0 100644 --- a/packages/backend/server/src/plugins/copilot/cron.ts +++ b/packages/backend/server/src/plugins/copilot/cron.ts @@ -8,6 +8,7 @@ declare global { interface Jobs { 'copilot.session.cleanupEmptySessions': {}; 'copilot.session.generateMissingTitles': {}; + 'copilot.workspace.cleanupTrashedDocEmbeddings': {}; } } @@ -20,6 +21,14 @@ export class CopilotCronJobs { private readonly jobs: JobQueue ) {} + async triggerCleanupTrashedDocEmbeddings() { + await this.jobs.add( + 'copilot.workspace.cleanupTrashedDocEmbeddings', + {}, + { jobId: 'daily-copilot-cleanup-trashed-doc-embeddings' } + ); + } + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) async dailyCleanupJob() { await this.jobs.add( @@ -33,6 +42,12 @@ export class CopilotCronJobs { {}, { jobId: 'daily-copilot-generate-missing-titles' } ); + + await this.jobs.add( + 'copilot.workspace.cleanupTrashedDocEmbeddings', + {}, + { jobId: 'daily-copilot-cleanup-trashed-doc-embeddings' } + ); } async triggerGenerateMissingTitles() { @@ -68,4 +83,18 @@ export class CopilotCronJobs { `Scheduled title generation for ${sessions.length} sessions` ); } + + @OnJob('copilot.workspace.cleanupTrashedDocEmbeddings') + async cleanupTrashedDocEmbeddings() { + const workspaces = await this.models.workspace.list(undefined, { + id: true, + }); + for (const { id: workspaceId } of workspaces) { + await this.jobs.add( + 'copilot.embedding.cleanupTrashedDocEmbeddings', + { workspaceId }, + { jobId: `cleanup-trashed-doc-embeddings-${workspaceId}` } + ); + } + } } diff --git a/packages/backend/server/src/plugins/copilot/embedding/job.ts b/packages/backend/server/src/plugins/copilot/embedding/job.ts index f6c2c5951b..e4e33c86ba 100644 --- a/packages/backend/server/src/plugins/copilot/embedding/job.ts +++ b/packages/backend/server/src/plugins/copilot/embedding/job.ts @@ -12,6 +12,7 @@ import { OnJob, } from '../../../base'; import { DocReader } from '../../../core/doc'; +import { readAllDocIdsFromWorkspaceSnapshot } from '../../../core/utils/blocksuite'; import { Models } from '../../../models'; import { CopilotStorage } from '../storage'; import { readStream } from '../utils'; @@ -134,10 +135,30 @@ export class CopilotEmbeddingJob { if (enableDocEmbedding) { const toBeEmbedDocIds = await this.models.copilotWorkspace.findDocsToEmbed(workspaceId); + if (!toBeEmbedDocIds.length) { + return; + } + // filter out trashed docs + const rootSnapshot = await this.models.doc.getSnapshot( + workspaceId, + workspaceId + ); + if (!rootSnapshot) { + this.logger.warn( + `Root snapshot for workspace ${workspaceId} not found, skipping embedding.` + ); + return; + } + const allDocIds = new Set( + readAllDocIdsFromWorkspaceSnapshot(rootSnapshot.blob) + ); this.logger.log( `Trigger embedding for ${toBeEmbedDocIds.length} docs in workspace ${workspaceId}` ); - for (const docId of toBeEmbedDocIds) { + const finalToBeEmbedDocIds = toBeEmbedDocIds.filter(docId => + allDocIds.has(docId) + ); + for (const docId of finalToBeEmbedDocIds) { await this.queue.add( 'copilot.embedding.docs', { @@ -422,4 +443,39 @@ export class CopilotEmbeddingJob { ); } } + + @OnJob('copilot.embedding.cleanupTrashedDocEmbeddings') + async cleanupTrashedDocEmbeddings({ + workspaceId, + }: Jobs['copilot.embedding.cleanupTrashedDocEmbeddings']) { + const workspace = await this.models.workspace.get(workspaceId); + if (!workspace) { + this.logger.warn(`workspace ${workspaceId} not found`); + return; + } + + const snapshot = await this.models.doc.getSnapshot( + workspaceId, + workspaceId + ); + if (!snapshot) { + this.logger.warn(`workspace snapshot ${workspaceId} not found`); + return; + } + + const docIdsInWorkspace = readAllDocIdsFromWorkspaceSnapshot(snapshot.blob); + const docIdsInEmbedding = + await this.models.copilotContext.listWorkspaceEmbedding(workspaceId); + const docIdsInWorkspaceSet = new Set(docIdsInWorkspace); + + const deletedDocIds = docIdsInEmbedding.filter( + docId => !docIdsInWorkspaceSet.has(docId) + ); + for (const docId of deletedDocIds) { + await this.models.copilotContext.deleteWorkspaceEmbedding( + workspaceId, + docId + ); + } + } } diff --git a/packages/backend/server/src/plugins/copilot/embedding/types.ts b/packages/backend/server/src/plugins/copilot/embedding/types.ts index 751f23f75c..f037fc0682 100644 --- a/packages/backend/server/src/plugins/copilot/embedding/types.ts +++ b/packages/backend/server/src/plugins/copilot/embedding/types.ts @@ -61,6 +61,10 @@ declare global { fileId: string; fileName: string; }; + + 'copilot.embedding.cleanupTrashedDocEmbeddings': { + workspaceId: string; + }; } } diff --git a/packages/backend/server/src/plugins/copilot/index.ts b/packages/backend/server/src/plugins/copilot/index.ts index bee64e4828..02982bbbb8 100644 --- a/packages/backend/server/src/plugins/copilot/index.ts +++ b/packages/backend/server/src/plugins/copilot/index.ts @@ -64,8 +64,8 @@ import { // context CopilotContextResolver, CopilotContextService, + // jobs CopilotEmbeddingJob, - // cron jobs CopilotCronJobs, // transcription CopilotTranscriptionService, diff --git a/packages/backend/server/src/plugins/copilot/resolver.ts b/packages/backend/server/src/plugins/copilot/resolver.ts index aa59d7e882..a05cc7c30f 100644 --- a/packages/backend/server/src/plugins/copilot/resolver.ts +++ b/packages/backend/server/src/plugins/copilot/resolver.ts @@ -852,6 +852,14 @@ export class PromptsManagementResolver { return true; } + @Mutation(() => Boolean, { + description: 'Trigger cleanup of trashed doc embeddings', + }) + async triggerCleanupTrashedDocEmbeddings() { + await this.cron.triggerCleanupTrashedDocEmbeddings(); + return true; + } + @Query(() => [CopilotPromptType], { description: 'List all copilot prompts', }) diff --git a/packages/backend/server/src/schema.gql b/packages/backend/server/src/schema.gql index f5d5223f4d..edf792e578 100644 --- a/packages/backend/server/src/schema.gql +++ b/packages/backend/server/src/schema.gql @@ -1297,6 +1297,9 @@ type Mutation { setBlob(blob: Upload!, workspaceId: String!): String! submitAudioTranscription(blob: Upload, blobId: String!, blobs: [Upload!], workspaceId: String!): TranscriptionResultType + """Trigger cleanup of trashed doc embeddings""" + triggerCleanupTrashedDocEmbeddings: Boolean! + """Trigger generate missing titles cron job""" triggerGenerateTitleCron: Boolean! diff --git a/packages/common/graphql/src/schema.ts b/packages/common/graphql/src/schema.ts index c0acb18c90..0e7d456427 100644 --- a/packages/common/graphql/src/schema.ts +++ b/packages/common/graphql/src/schema.ts @@ -1440,6 +1440,8 @@ export interface Mutation { sendVerifyEmail: Scalars['Boolean']['output']; setBlob: Scalars['String']['output']; submitAudioTranscription: Maybe; + /** Trigger cleanup of trashed doc embeddings */ + triggerCleanupTrashedDocEmbeddings: Scalars['Boolean']['output']; /** Trigger generate missing titles cron job */ triggerGenerateTitleCron: Scalars['Boolean']['output']; /** update app configuration */