mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-13 21:05:19 +00:00
feat(server): cleanup trashed doc's embedding (#13201)
fix AI-359

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **New Features**
  * Added automated cleanup of embeddings for documents deleted or trashed from workspaces.
  * Introduced a new job to schedule and perform this cleanup per workspace daily and on demand.
  * Added a new GraphQL mutation to manually trigger the cleanup process.
  * Added the ability to list workspaces with flexible filtering and selection options.
* **Improvements**
  * Enhanced document status handling to more accurately reflect embedding presence.
  * Refined internal methods for managing and checking document embeddings.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
@@ -396,6 +396,15 @@ Generated by [AVA](https://avajs.dev).
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
args: [
|
||||
'copilot.workspace.cleanupTrashedDocEmbeddings',
|
||||
{},
|
||||
{
|
||||
jobId: 'daily-copilot-cleanup-trashed-doc-embeddings',
|
||||
},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
> cleanup empty sessions calls
|
||||
|
||||
Binary file not shown.
@@ -164,11 +164,14 @@ test('should insert embedding by doc id', async t => {
|
||||
);
|
||||
|
||||
{
|
||||
const ret = await t.context.copilotContext.hasWorkspaceEmbedding(
|
||||
const ret = await t.context.copilotContext.listWorkspaceEmbedding(
|
||||
workspace.id,
|
||||
[docId]
|
||||
);
|
||||
t.true(ret.has(docId), 'should return doc id when embedding is inserted');
|
||||
t.true(
|
||||
ret.includes(docId),
|
||||
'should return doc id when embedding is inserted'
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
@@ -317,8 +320,8 @@ test('should merge doc status correctly', async t => {
|
||||
|
||||
const hasEmbeddingStub = Sinon.stub(
|
||||
t.context.copilotContext,
|
||||
'hasWorkspaceEmbedding'
|
||||
).resolves(new Set<string>());
|
||||
'listWorkspaceEmbedding'
|
||||
).resolves([]);
|
||||
|
||||
const stubResult = await t.context.copilotContext.mergeDocStatus(
|
||||
workspace.id,
|
||||
|
||||
@@ -84,11 +84,17 @@ export class CopilotContextModel extends BaseModel {
|
||||
}
|
||||
|
||||
async mergeDocStatus(workspaceId: string, docs: ContextDoc[]) {
|
||||
const docIds = Array.from(new Set(docs.map(doc => doc.id)));
|
||||
const finishedDoc = await this.hasWorkspaceEmbedding(workspaceId, docIds);
|
||||
const canEmbedding = await this.checkEmbeddingAvailable();
|
||||
const finishedDoc = canEmbedding
|
||||
? await this.listWorkspaceEmbedding(
|
||||
workspaceId,
|
||||
Array.from(new Set(docs.map(doc => doc.id)))
|
||||
)
|
||||
: [];
|
||||
const finishedDocSet = new Set(finishedDoc);
|
||||
|
||||
for (const doc of docs) {
|
||||
const status = finishedDoc.has(doc.id)
|
||||
const status = finishedDocSet.has(doc.id)
|
||||
? ContextEmbedStatus.finished
|
||||
: undefined;
|
||||
// NOTE: when the document has not been synchronized to the server or is in the embedding queue
|
||||
@@ -120,24 +126,17 @@ export class CopilotContextModel extends BaseModel {
|
||||
return Number(count) === 2;
|
||||
}
|
||||
|
||||
async hasWorkspaceEmbedding(workspaceId: string, docIds: string[]) {
|
||||
const canEmbedding = await this.checkEmbeddingAvailable();
|
||||
if (!canEmbedding) {
|
||||
return new Set();
|
||||
}
|
||||
|
||||
async listWorkspaceEmbedding(workspaceId: string, docIds?: string[]) {
|
||||
const existsIds = await this.db.aiWorkspaceEmbedding
|
||||
.findMany({
|
||||
.groupBy({
|
||||
where: {
|
||||
workspaceId,
|
||||
docId: { in: docIds },
|
||||
},
|
||||
select: {
|
||||
docId: true,
|
||||
docId: docIds ? { in: docIds } : undefined,
|
||||
},
|
||||
by: ['docId'],
|
||||
})
|
||||
.then(r => r.map(r => r.docId));
|
||||
return new Set(existsIds);
|
||||
return existsIds;
|
||||
}
|
||||
|
||||
private processEmbeddings(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Transactional } from '@nestjs-cls/transactional';
|
||||
import { type Workspace } from '@prisma/client';
|
||||
import { Prisma, type Workspace } from '@prisma/client';
|
||||
|
||||
import { EventBus } from '../base';
|
||||
import { BaseModel } from './base';
|
||||
@@ -93,6 +93,19 @@ export class WorkspaceModel extends BaseModel {
|
||||
});
|
||||
}
|
||||
|
||||
async list<S extends Prisma.WorkspaceSelect>(
|
||||
where: Prisma.WorkspaceWhereInput = {},
|
||||
select?: S
|
||||
) {
|
||||
return (await this.db.workspace.findMany({
|
||||
where,
|
||||
select,
|
||||
orderBy: {
|
||||
sid: 'asc',
|
||||
},
|
||||
})) as Prisma.WorkspaceGetPayload<{ select: S }>[];
|
||||
}
|
||||
|
||||
async delete(workspaceId: string) {
|
||||
const rawResult = await this.db.workspace.deleteMany({
|
||||
where: {
|
||||
|
||||
@@ -8,6 +8,7 @@ declare global {
|
||||
interface Jobs {
|
||||
'copilot.session.cleanupEmptySessions': {};
|
||||
'copilot.session.generateMissingTitles': {};
|
||||
'copilot.workspace.cleanupTrashedDocEmbeddings': {};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +21,14 @@ export class CopilotCronJobs {
|
||||
private readonly jobs: JobQueue
|
||||
) {}
|
||||
|
||||
async triggerCleanupTrashedDocEmbeddings() {
|
||||
await this.jobs.add(
|
||||
'copilot.workspace.cleanupTrashedDocEmbeddings',
|
||||
{},
|
||||
{ jobId: 'daily-copilot-cleanup-trashed-doc-embeddings' }
|
||||
);
|
||||
}
|
||||
|
||||
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
|
||||
async dailyCleanupJob() {
|
||||
await this.jobs.add(
|
||||
@@ -33,6 +42,12 @@ export class CopilotCronJobs {
|
||||
{},
|
||||
{ jobId: 'daily-copilot-generate-missing-titles' }
|
||||
);
|
||||
|
||||
await this.jobs.add(
|
||||
'copilot.workspace.cleanupTrashedDocEmbeddings',
|
||||
{},
|
||||
{ jobId: 'daily-copilot-cleanup-trashed-doc-embeddings' }
|
||||
);
|
||||
}
|
||||
|
||||
async triggerGenerateMissingTitles() {
|
||||
@@ -68,4 +83,18 @@ export class CopilotCronJobs {
|
||||
`Scheduled title generation for ${sessions.length} sessions`
|
||||
);
|
||||
}
|
||||
|
||||
@OnJob('copilot.workspace.cleanupTrashedDocEmbeddings')
|
||||
async cleanupTrashedDocEmbeddings() {
|
||||
const workspaces = await this.models.workspace.list(undefined, {
|
||||
id: true,
|
||||
});
|
||||
for (const { id: workspaceId } of workspaces) {
|
||||
await this.jobs.add(
|
||||
'copilot.embedding.cleanupTrashedDocEmbeddings',
|
||||
{ workspaceId },
|
||||
{ jobId: `cleanup-trashed-doc-embeddings-${workspaceId}` }
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
OnJob,
|
||||
} from '../../../base';
|
||||
import { DocReader } from '../../../core/doc';
|
||||
import { readAllDocIdsFromWorkspaceSnapshot } from '../../../core/utils/blocksuite';
|
||||
import { Models } from '../../../models';
|
||||
import { CopilotStorage } from '../storage';
|
||||
import { readStream } from '../utils';
|
||||
@@ -134,10 +135,30 @@ export class CopilotEmbeddingJob {
|
||||
if (enableDocEmbedding) {
|
||||
const toBeEmbedDocIds =
|
||||
await this.models.copilotWorkspace.findDocsToEmbed(workspaceId);
|
||||
if (!toBeEmbedDocIds.length) {
|
||||
return;
|
||||
}
|
||||
// filter out trashed docs
|
||||
const rootSnapshot = await this.models.doc.getSnapshot(
|
||||
workspaceId,
|
||||
workspaceId
|
||||
);
|
||||
if (!rootSnapshot) {
|
||||
this.logger.warn(
|
||||
`Root snapshot for workspace ${workspaceId} not found, skipping embedding.`
|
||||
);
|
||||
return;
|
||||
}
|
||||
const allDocIds = new Set(
|
||||
readAllDocIdsFromWorkspaceSnapshot(rootSnapshot.blob)
|
||||
);
|
||||
this.logger.log(
|
||||
`Trigger embedding for ${toBeEmbedDocIds.length} docs in workspace ${workspaceId}`
|
||||
);
|
||||
for (const docId of toBeEmbedDocIds) {
|
||||
const finalToBeEmbedDocIds = toBeEmbedDocIds.filter(docId =>
|
||||
allDocIds.has(docId)
|
||||
);
|
||||
for (const docId of finalToBeEmbedDocIds) {
|
||||
await this.queue.add(
|
||||
'copilot.embedding.docs',
|
||||
{
|
||||
@@ -422,4 +443,39 @@ export class CopilotEmbeddingJob {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@OnJob('copilot.embedding.cleanupTrashedDocEmbeddings')
|
||||
async cleanupTrashedDocEmbeddings({
|
||||
workspaceId,
|
||||
}: Jobs['copilot.embedding.cleanupTrashedDocEmbeddings']) {
|
||||
const workspace = await this.models.workspace.get(workspaceId);
|
||||
if (!workspace) {
|
||||
this.logger.warn(`workspace ${workspaceId} not found`);
|
||||
return;
|
||||
}
|
||||
|
||||
const snapshot = await this.models.doc.getSnapshot(
|
||||
workspaceId,
|
||||
workspaceId
|
||||
);
|
||||
if (!snapshot) {
|
||||
this.logger.warn(`workspace snapshot ${workspaceId} not found`);
|
||||
return;
|
||||
}
|
||||
|
||||
const docIdsInWorkspace = readAllDocIdsFromWorkspaceSnapshot(snapshot.blob);
|
||||
const docIdsInEmbedding =
|
||||
await this.models.copilotContext.listWorkspaceEmbedding(workspaceId);
|
||||
const docIdsInWorkspaceSet = new Set(docIdsInWorkspace);
|
||||
|
||||
const deletedDocIds = docIdsInEmbedding.filter(
|
||||
docId => !docIdsInWorkspaceSet.has(docId)
|
||||
);
|
||||
for (const docId of deletedDocIds) {
|
||||
await this.models.copilotContext.deleteWorkspaceEmbedding(
|
||||
workspaceId,
|
||||
docId
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,6 +61,10 @@ declare global {
|
||||
fileId: string;
|
||||
fileName: string;
|
||||
};
|
||||
|
||||
'copilot.embedding.cleanupTrashedDocEmbeddings': {
|
||||
workspaceId: string;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -64,8 +64,8 @@ import {
|
||||
// context
|
||||
CopilotContextResolver,
|
||||
CopilotContextService,
|
||||
// jobs
|
||||
CopilotEmbeddingJob,
|
||||
// cron jobs
|
||||
CopilotCronJobs,
|
||||
// transcription
|
||||
CopilotTranscriptionService,
|
||||
|
||||
@@ -852,6 +852,14 @@ export class PromptsManagementResolver {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Mutation(() => Boolean, {
|
||||
description: 'Trigger cleanup of trashed doc embeddings',
|
||||
})
|
||||
async triggerCleanupTrashedDocEmbeddings() {
|
||||
await this.cron.triggerCleanupTrashedDocEmbeddings();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Query(() => [CopilotPromptType], {
|
||||
description: 'List all copilot prompts',
|
||||
})
|
||||
|
||||
@@ -1297,6 +1297,9 @@ type Mutation {
|
||||
setBlob(blob: Upload!, workspaceId: String!): String!
|
||||
submitAudioTranscription(blob: Upload, blobId: String!, blobs: [Upload!], workspaceId: String!): TranscriptionResultType
|
||||
|
||||
"""Trigger cleanup of trashed doc embeddings"""
|
||||
triggerCleanupTrashedDocEmbeddings: Boolean!
|
||||
|
||||
"""Trigger generate missing titles cron job"""
|
||||
triggerGenerateTitleCron: Boolean!
|
||||
|
||||
|
||||
Reference in New Issue
Block a user