feat(server): copilot job models (#10732)

This commit is contained in:
darkskygit
2025-03-19 14:34:14 +00:00
parent dd31ef95db
commit b099546164
8 changed files with 372 additions and 5 deletions

View File

@@ -0,0 +1,132 @@
import { AiJobStatus, AiJobType, PrismaClient } from '@prisma/client';
import ava, { TestFn } from 'ava';
import { Config } from '../../base';
import { CopilotJobModel } from '../../models';
import { UserModel } from '../../models/user';
import { WorkspaceModel } from '../../models/workspace';
import { createTestingModule, type TestingModule } from '../utils';
interface Context {
config: Config;
module: TestingModule;
db: PrismaClient;
user: UserModel;
workspace: WorkspaceModel;
copilotJob: CopilotJobModel;
}
const test = ava as TestFn<Context>;
test.before(async t => {
const module = await createTestingModule();
t.context.user = module.get(UserModel);
t.context.workspace = module.get(WorkspaceModel);
t.context.copilotJob = module.get(CopilotJobModel);
t.context.db = module.get(PrismaClient);
t.context.config = module.get(Config);
t.context.module = module;
});
test.beforeEach(async t => {
await t.context.module.initTestingDB();
});
test.after(async t => {
await t.context.module.close();
});
test('should create a copilot job', async t => {
const user = await t.context.user.create({
email: 'test@affine.pro',
});
const workspace = await t.context.workspace.create(user.id);
const data = {
workspaceId: workspace.id,
blobId: 'blob-id',
createdBy: user.id,
type: AiJobType.transcription,
};
const job = await t.context.copilotJob.create(data);
t.truthy(job.id);
const job1 = await t.context.copilotJob.get(job.id);
t.deepEqual(
{
...data,
id: job.id,
status: AiJobStatus.pending,
payload: {},
},
job1
);
});
test('should get null for non-exist job', async t => {
const job = await t.context.copilotJob.get('non-exist');
t.is(job, null);
});
test('should update job', async t => {
const user = await t.context.user.create({
email: 'test@affine.pro',
});
const workspace = await t.context.workspace.create(user.id);
const { id: jobId } = await t.context.copilotJob.create({
workspaceId: workspace.id,
blobId: 'blob-id',
createdBy: user.id,
type: AiJobType.transcription,
});
const hasJob = await t.context.copilotJob.has(workspace.id, 'blob-id');
t.true(hasJob);
const job = await t.context.copilotJob.get(jobId);
const data = {
status: AiJobStatus.running,
payload: { foo: 'bar' },
};
await t.context.copilotJob.update(jobId, data);
const job1 = await t.context.copilotJob.get(jobId);
t.deepEqual(job1, { ...job, ...data });
});
test('should claim job', async t => {
const user = await t.context.user.create({
email: 'test@affine.pro',
});
const workspace = await t.context.workspace.create(user.id);
const { id: jobId } = await t.context.copilotJob.create({
workspaceId: workspace.id,
blobId: 'blob-id',
createdBy: user.id,
type: AiJobType.transcription,
});
const status = await t.context.copilotJob.claim(jobId, user.id);
t.is(status, AiJobStatus.pending, 'should not claim non-finished job');
await t.context.copilotJob.update(jobId, { status: AiJobStatus.finished });
const status1 = await t.context.copilotJob.claim(jobId, 'non-exist-user');
t.is(
status1,
AiJobStatus.finished,
'should not claim job created by other user'
);
const status2 = await t.context.copilotJob.claim(jobId, user.id);
t.is(status2, AiJobStatus.claimed, 'should claim finished job');
const status3 = await t.context.copilotJob.get(jobId);
t.is(
status3?.status,
AiJobStatus.claimed,
'should update job status to claimed'
);
});

View File

@@ -54,4 +54,8 @@ defineRuntimeConfig('job', {
default: 1,
desc: 'Concurrency of worker consuming of doc job queue',
},
'queues.copilot.concurrency': {
default: 1,
desc: 'Concurrency of worker consuming of copilot job queue',
},
});

View File

@@ -26,6 +26,7 @@ export enum Queue {
NIGHTLY_JOB = 'nightly',
NOTIFICATION = 'notification',
DOC = 'doc',
COPILOT = 'copilot',
}
export const QUEUES = Object.values(Queue);

View File

@@ -0,0 +1,12 @@
import { AiJobStatus, AiJobType } from '@prisma/client';
import type { JsonValue } from '@prisma/client/runtime/library';
export interface CopilotJob {
id?: string;
workspaceId: string;
blobId: string;
createdBy?: string;
type: AiJobType;
status?: AiJobStatus;
payload?: JsonValue;
}

View File

@@ -0,0 +1,151 @@
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import { AiJobStatus, AiJobType } from '@prisma/client';
import type { ZodType } from 'zod';
import { BaseModel } from './base';
import { CopilotJob } from './common/copilot';
type CreateCopilotJobInput = Omit<CopilotJob, 'id' | 'status' | 'payload'>;
type UpdateCopilotJobInput = Pick<CopilotJob, 'status' | 'payload'>;
/**
* Copilot Job Model
*/
@Injectable()
export class CopilotJobModel extends BaseModel {
async create(job: CreateCopilotJobInput) {
const row = await this.db.aiJobs.create({
data: {
workspaceId: job.workspaceId,
blobId: job.blobId,
createdBy: job.createdBy,
type: job.type,
status: AiJobStatus.pending,
payload: {},
},
select: {
id: true,
status: true,
},
});
return row;
}
async has(workspaceId: string, blobId: string) {
const row = await this.db.aiJobs.findFirst({
where: {
workspaceId,
blobId,
},
});
return !!row;
}
async update(jobId: string, data: UpdateCopilotJobInput) {
const ret = await this.db.aiJobs.updateMany({
where: {
id: jobId,
},
data: {
status: data.status || undefined,
payload: data.payload || undefined,
},
});
return ret.count > 0;
}
@Transactional()
async claim(jobId: string, userId: string) {
const job = await this.get(jobId);
if (
job &&
job.createdBy === userId &&
job.status === AiJobStatus.finished
) {
await this.update(jobId, { status: AiJobStatus.claimed });
}
const ret = await this.db.aiJobs.findFirst({
where: { id: jobId },
select: { status: true },
});
return ret?.status;
}
async getWithUser(
userId: string,
workspaceId: string,
jobId?: string,
type?: AiJobType
) {
const row = await this.db.aiJobs.findFirst({
where: {
id: jobId,
workspaceId,
type,
OR: [
{
createdBy: userId,
status: { in: [AiJobStatus.finished, AiJobStatus.claimed] },
},
{ createdBy: { not: userId }, status: AiJobStatus.claimed },
],
},
});
if (!row) {
return null;
}
return {
id: row.id,
workspaceId: row.workspaceId,
blobId: row.blobId,
createdBy: row.createdBy || undefined,
type: row.type,
status: row.status,
payload: row.payload,
};
}
async get(jobId: string): Promise<CopilotJob | null> {
const row = await this.db.aiJobs.findFirst({
where: {
id: jobId,
},
});
if (!row) {
return null;
}
return {
id: row.id,
workspaceId: row.workspaceId,
blobId: row.blobId,
createdBy: row.createdBy || undefined,
type: row.type,
status: row.status,
payload: row.payload,
};
}
async getPayload<
C extends ZodType<any>,
O = C extends ZodType<infer T> ? T : never,
>(jobId: string, schema: C): Promise<O> {
const row = await this.db.aiJobs.findUnique({
where: {
id: jobId,
},
select: {
payload: true,
},
});
const ret = schema.safeParse(row?.payload);
return ret.success ? ret.data : ({} as O);
}
}

View File

@@ -7,6 +7,7 @@ import {
import { ModuleRef } from '@nestjs/core';
import { ApplyType } from '../base';
import { CopilotJobModel } from './copilot-job';
import { DocModel } from './doc';
import { DocUserModel } from './doc-user';
import { FeatureModel } from './feature';
@@ -38,6 +39,7 @@ const MODELS = {
history: HistoryModel,
notification: NotificationModel,
settings: SettingsModel,
copilotJob: CopilotJobModel,
};
type ModelsType = {
@@ -90,6 +92,7 @@ const ModelsSymbolProvider: ExistingProvider = {
export class ModelsModule {}
export * from './common';
export * from './copilot-job';
export * from './doc';
export * from './doc-user';
export * from './feature';