mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-13 04:48:53 +00:00
feat(server): skip cleanup for stale workspace (#13418)
fix AI-408

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **New Features**
  * Added a new field to workspaces to track the last time embeddings were checked.
  * Cleanup jobs for workspace embeddings now skip workspaces that haven't changed in over 30 days or have no embeddings, improving efficiency.
  * Cleanup jobs are now automatically triggered when a workspace is updated.
* **Improvements**
  * Enhanced workspace selection for cleanup and indexing tasks to use more precise filters and batching.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
@@ -24,6 +24,7 @@ export type UpdateWorkspaceInput = Pick<
|
||||
| 'name'
|
||||
| 'avatarKey'
|
||||
| 'indexed'
|
||||
| 'lastCheckEmbeddings'
|
||||
>;
|
||||
|
||||
@Injectable()
|
||||
@@ -49,7 +50,11 @@ export class WorkspaceModel extends BaseModel {
|
||||
/**
|
||||
* Update the workspace with the given data.
|
||||
*/
|
||||
async update(workspaceId: string, data: UpdateWorkspaceInput) {
|
||||
async update(
|
||||
workspaceId: string,
|
||||
data: UpdateWorkspaceInput,
|
||||
notifyUpdate = true
|
||||
) {
|
||||
const workspace = await this.db.workspace.update({
|
||||
where: {
|
||||
id: workspaceId,
|
||||
@@ -60,7 +65,9 @@ export class WorkspaceModel extends BaseModel {
|
||||
`Updated workspace ${workspaceId} with data ${JSON.stringify(data)}`
|
||||
);
|
||||
|
||||
this.event.emit('workspace.updated', workspace);
|
||||
if (notifyUpdate) {
|
||||
this.event.emit('workspace.updated', workspace);
|
||||
}
|
||||
|
||||
return workspace;
|
||||
}
|
||||
@@ -81,25 +88,15 @@ export class WorkspaceModel extends BaseModel {
|
||||
});
|
||||
}
|
||||
|
||||
async listAfterSid(sid: number, limit: number) {
|
||||
return await this.db.workspace.findMany({
|
||||
where: {
|
||||
sid: { gt: sid },
|
||||
},
|
||||
take: limit,
|
||||
orderBy: {
|
||||
sid: 'asc',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async list<S extends Prisma.WorkspaceSelect>(
|
||||
where: Prisma.WorkspaceWhereInput = {},
|
||||
select?: S
|
||||
select?: S,
|
||||
limit?: number
|
||||
) {
|
||||
return (await this.db.workspace.findMany({
|
||||
where,
|
||||
select,
|
||||
take: limit,
|
||||
orderBy: {
|
||||
sid: 'asc',
|
||||
},
|
||||
|
||||
@@ -93,8 +93,11 @@ export class CopilotCronJobs {
|
||||
params: Jobs['copilot.workspace.cleanupTrashedDocEmbeddings']
|
||||
) {
|
||||
const nextSid = params.nextSid ?? 0;
|
||||
let workspaces = await this.models.workspace.listAfterSid(
|
||||
nextSid,
|
||||
// only consider workspaces that cleared their embeddings more than 24 hours ago
|
||||
const oneDayAgo = new Date(Date.now() - OneDay);
|
||||
const workspaces = await this.models.workspace.list(
|
||||
{ sid: { gt: nextSid }, lastCheckEmbeddings: { lt: oneDayAgo } },
|
||||
{ id: true, sid: true },
|
||||
CLEANUP_EMBEDDING_JOB_BATCH_SIZE
|
||||
);
|
||||
if (!workspaces.length) {
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
EventBus,
|
||||
JobQueue,
|
||||
mapAnyError,
|
||||
OneDay,
|
||||
OnEvent,
|
||||
OnJob,
|
||||
} from '../../../base';
|
||||
@@ -511,6 +512,7 @@ export class CopilotEmbeddingJob {
|
||||
return;
|
||||
}
|
||||
|
||||
const oneMonthAgo = new Date(Date.now() - OneDay * 30);
|
||||
const snapshot = await this.models.doc.getSnapshot(
|
||||
workspaceId,
|
||||
workspaceId
|
||||
@@ -518,11 +520,37 @@ export class CopilotEmbeddingJob {
|
||||
if (!snapshot) {
|
||||
this.logger.warn(`workspace snapshot ${workspaceId} not found`);
|
||||
return;
|
||||
} else if (
|
||||
// always check if never cleared
|
||||
workspace.lastCheckEmbeddings > new Date(0) &&
|
||||
snapshot.updatedAt < oneMonthAgo
|
||||
) {
|
||||
this.logger.verbose(
|
||||
`workspace ${workspaceId} is too old, skipping embeddings cleanup`
|
||||
);
|
||||
await this.models.workspace.update(
|
||||
workspaceId,
|
||||
{ lastCheckEmbeddings: new Date() },
|
||||
false
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const docIdsInEmbedding =
|
||||
await this.models.copilotContext.listWorkspaceDocEmbedding(workspaceId);
|
||||
if (!docIdsInEmbedding.length) {
|
||||
this.logger.verbose(
|
||||
`No doc embeddings found in workspace ${workspaceId}, skipping cleanup`
|
||||
);
|
||||
await this.models.workspace.update(
|
||||
workspaceId,
|
||||
{ lastCheckEmbeddings: new Date() },
|
||||
false
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const docIdsInWorkspace = readAllDocIdsFromWorkspaceSnapshot(snapshot.blob);
|
||||
const docIdsInEmbedding =
|
||||
await this.models.copilotContext.listWorkspaceDocEmbedding(workspaceId);
|
||||
const docIdsInWorkspaceSet = new Set(docIdsInWorkspace);
|
||||
|
||||
const deletedDocIds = docIdsInEmbedding.filter(
|
||||
@@ -534,5 +562,20 @@ export class CopilotEmbeddingJob {
|
||||
docId
|
||||
);
|
||||
}
|
||||
|
||||
await this.models.workspace.update(
|
||||
workspaceId,
|
||||
{ lastCheckEmbeddings: new Date() },
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Handler registered for the 'workspace.updated' event.
 *
 * Queues a 'copilot.embedding.cleanupTrashedDocEmbeddings' job for the
 * workspace whose id arrives in the event payload. Returns early without
 * queuing anything when `supportEmbedding` is falsy.
 */
@OnEvent('workspace.updated')
|
||||
async onWorkspaceUpdated({ id }: Events['workspace.updated']) {
|
||||
if (!this.supportEmbedding) return;
|
||||
|
||||
await this.queue.add('copilot.embedding.cleanupTrashedDocEmbeddings', {
|
||||
workspaceId: id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,10 +157,12 @@ export class IndexerJob {
|
||||
}
|
||||
|
||||
const startSid = payload.lastIndexedWorkspaceSid ?? 0;
|
||||
const workspaces = await this.models.workspace.listAfterSid(
|
||||
startSid,
|
||||
const workspaces = await this.models.workspace.list(
|
||||
{ sid: { gt: startSid } },
|
||||
{ id: true, indexed: true, sid: true },
|
||||
this.config.indexer.autoIndex.batchSize
|
||||
);
|
||||
|
||||
if (workspaces.length === 0) {
|
||||
// Keep the current sid value when repeating
|
||||
return JOB_SIGNAL.Repeat;
|
||||
|
||||
Reference in New Issue
Block a user