diff --git a/packages/backend/server/migrations/20250303121921_ai_jobs/migration.sql b/packages/backend/server/migrations/20250303121921_ai_jobs/migration.sql new file mode 100644 index 0000000000..d640d36b46 --- /dev/null +++ b/packages/backend/server/migrations/20250303121921_ai_jobs/migration.sql @@ -0,0 +1,29 @@ +-- CreateEnum +CREATE TYPE "AiJobStatus" AS ENUM ('pending', 'running', 'finished', 'claimed', 'failed'); + +-- CreateEnum +CREATE TYPE "AiJobType" AS ENUM ('transcription'); + +-- CreateTable +CREATE TABLE "ai_jobs" ( + "id" VARCHAR NOT NULL, + "workspace_id" VARCHAR NOT NULL, + "blob_id" VARCHAR NOT NULL, + "created_by" VARCHAR, + "type" "AiJobType" NOT NULL, + "status" "AiJobStatus" NOT NULL DEFAULT 'pending', + "payload" JSON NOT NULL, + "started_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "finished_at" TIMESTAMPTZ(3), + + CONSTRAINT "ai_jobs_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "ai_jobs_created_by_workspace_id_blob_id_idx" ON "ai_jobs"("created_by", "workspace_id", "blob_id"); + +-- CreateIndex +CREATE UNIQUE INDEX "ai_jobs_workspace_id_blob_id_key" ON "ai_jobs"("workspace_id", "blob_id"); + +-- AddForeignKey +ALTER TABLE "ai_jobs" ADD CONSTRAINT "ai_jobs_created_by_fkey" FOREIGN KEY ("created_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/packages/backend/server/schema.prisma b/packages/backend/server/schema.prisma index 7b6080839e..e057310865 100644 --- a/packages/backend/server/schema.prisma +++ b/packages/backend/server/schema.prisma @@ -37,6 +37,7 @@ model User { updatedSnapshot Snapshot[] @relation("updatedSnapshot") createdUpdate Update[] @relation("createdUpdate") createdHistory SnapshotHistory[] @relation("createdHistory") + createdAiJobs AiJobs[] @relation("createdAiJobs") // receive notifications notifications Notification[] @relation("user_notifications") settings Settings? @@ -475,6 +476,40 @@ model AiWorkspaceEmbedding { @@map("ai_workspace_embeddings") } +enum AiJobStatus { + pending + running + finished + claimed + failed +} + +enum AiJobType { + transcription +} + +model AiJobs { + id String @id @default(uuid()) @db.VarChar + workspaceId String @map("workspace_id") @db.VarChar + blobId String @map("blob_id") @db.VarChar + createdBy String? @map("created_by") @db.VarChar + // job type, like "transcription" + type AiJobType + status AiJobStatus @default(pending) + // job result + payload Json @db.Json + + startedAt DateTime @default(now()) @map("started_at") @db.Timestamptz(3) + finishedAt DateTime? @map("finished_at") @db.Timestamptz(3) + + // will delete creator record if creator's account is deleted + createdByUser User? @relation(name: "createdAiJobs", fields: [createdBy], references: [id], onDelete: SetNull) + + @@unique([workspaceId, blobId]) + @@index([createdBy, workspaceId, blobId]) + @@map("ai_jobs") +} + model DataMigration { id String @id @default(uuid()) @db.VarChar name String @unique @db.VarChar @@ -709,12 +744,12 @@ model Notification { } model Settings { - userId String @id @map("user_id") @db.VarChar - createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) - updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(3) - payload Json @db.JsonB + userId String @id @map("user_id") @db.VarChar + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) + updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(3) + payload Json @db.JsonB - user User @relation(fields: [userId], references: [id], onDelete: Cascade) + user User @relation(fields: [userId], references: [id], onDelete: Cascade) @@map("settings") } diff --git a/packages/backend/server/src/__tests__/models/copilot-job.spec.ts b/packages/backend/server/src/__tests__/models/copilot-job.spec.ts new file mode 100644 index 0000000000..691beb4123 --- /dev/null +++ b/packages/backend/server/src/__tests__/models/copilot-job.spec.ts @@ -0,0 +1,132 @@ +import { AiJobStatus, AiJobType, PrismaClient } from '@prisma/client'; +import ava, { TestFn } from 'ava'; + +import { Config } from '../../base'; +import { CopilotJobModel } from '../../models'; +import { UserModel } from '../../models/user'; +import { WorkspaceModel } from '../../models/workspace'; +import { createTestingModule, type TestingModule } from '../utils'; + +interface Context { + config: Config; + module: TestingModule; + db: PrismaClient; + user: UserModel; + workspace: WorkspaceModel; + copilotJob: CopilotJobModel; +} + +const test = ava as TestFn; + +test.before(async t => { + const module = await createTestingModule(); + t.context.user = module.get(UserModel); + t.context.workspace = module.get(WorkspaceModel); + t.context.copilotJob = module.get(CopilotJobModel); + t.context.db = module.get(PrismaClient); + t.context.config = module.get(Config); + t.context.module = module; +}); + +test.beforeEach(async t => { + await t.context.module.initTestingDB(); +}); + +test.after(async t => { + await t.context.module.close(); +}); + +test('should create a copilot job', async t => { + const user = await t.context.user.create({ + email: 'test@affine.pro', + }); + const workspace = await t.context.workspace.create(user.id); + + const data = { + workspaceId: workspace.id, + blobId: 'blob-id', + createdBy: user.id, + type: AiJobType.transcription, + }; + + const job = await t.context.copilotJob.create(data); + + t.truthy(job.id); + + const job1 = await t.context.copilotJob.get(job.id); + t.deepEqual( + { + ...data, + id: job.id, + status: AiJobStatus.pending, + payload: {}, + }, + job1 + ); +}); + +test('should get null for non-exist job', async t => { + const job = await t.context.copilotJob.get('non-exist'); + t.is(job, null); +}); + +test('should update job', async t => { + const user = await t.context.user.create({ + email: 'test@affine.pro', + }); + const workspace = await t.context.workspace.create(user.id); + const { id: jobId } = await t.context.copilotJob.create({ + workspaceId: workspace.id, + blobId: 'blob-id', + createdBy: user.id, + type: AiJobType.transcription, + }); + + const hasJob = await t.context.copilotJob.has(workspace.id, 'blob-id'); + t.true(hasJob); + + const job = await t.context.copilotJob.get(jobId); + + const data = { + status: AiJobStatus.running, + payload: { foo: 'bar' }, + }; + await t.context.copilotJob.update(jobId, data); + const job1 = await t.context.copilotJob.get(jobId); + t.deepEqual(job1, { ...job, ...data }); +}); + +test('should claim job', async t => { + const user = await t.context.user.create({ + email: 'test@affine.pro', + }); + const workspace = await t.context.workspace.create(user.id); + const { id: jobId } = await t.context.copilotJob.create({ + workspaceId: workspace.id, + blobId: 'blob-id', + createdBy: user.id, + type: AiJobType.transcription, + }); + + const status = await t.context.copilotJob.claim(jobId, user.id); + t.is(status, AiJobStatus.pending, 'should not claim non-finished job'); + + await t.context.copilotJob.update(jobId, { status: AiJobStatus.finished }); + + const status1 = await t.context.copilotJob.claim(jobId, 'non-exist-user'); + t.is( + status1, + AiJobStatus.finished, + 'should not claim job created by other user' + ); + + const status2 = await t.context.copilotJob.claim(jobId, user.id); + t.is(status2, AiJobStatus.claimed, 'should claim finished job'); + + const status3 = await t.context.copilotJob.get(jobId); + t.is( + status3?.status, + AiJobStatus.claimed, + 'should update job status to claimed' + ); +}); diff --git a/packages/backend/server/src/base/job/queue/config.ts b/packages/backend/server/src/base/job/queue/config.ts index a6e7969282..5c3cd3b301 100644 --- a/packages/backend/server/src/base/job/queue/config.ts +++ b/packages/backend/server/src/base/job/queue/config.ts @@ -54,4 +54,8 @@ defineRuntimeConfig('job', { default: 1, desc: 'Concurrency of worker consuming of doc job queue', }, + 'queues.copilot.concurrency': { + default: 1, + desc: 'Concurrency of worker consuming of copilot job queue', + }, }); diff --git a/packages/backend/server/src/base/job/queue/def.ts b/packages/backend/server/src/base/job/queue/def.ts index a108284136..01e9b70050 100644 --- a/packages/backend/server/src/base/job/queue/def.ts +++ b/packages/backend/server/src/base/job/queue/def.ts @@ -26,6 +26,7 @@ export enum Queue { NIGHTLY_JOB = 'nightly', NOTIFICATION = 'notification', DOC = 'doc', + COPILOT = 'copilot', } export const QUEUES = Object.values(Queue); diff --git a/packages/backend/server/src/models/common/copilot.ts b/packages/backend/server/src/models/common/copilot.ts new file mode 100644 index 0000000000..38078486ea --- /dev/null +++ b/packages/backend/server/src/models/common/copilot.ts @@ -0,0 +1,12 @@ +import { AiJobStatus, AiJobType } from '@prisma/client'; +import type { JsonValue } from '@prisma/client/runtime/library'; + +export interface CopilotJob { + id?: string; + workspaceId: string; + blobId: string; + createdBy?: string; + type: AiJobType; + status?: AiJobStatus; + payload?: JsonValue; +} diff --git a/packages/backend/server/src/models/copilot-job.ts b/packages/backend/server/src/models/copilot-job.ts new file mode 100644 index 0000000000..05ab5e0d93 --- /dev/null +++ b/packages/backend/server/src/models/copilot-job.ts @@ -0,0 +1,151 @@ +import { Injectable } from '@nestjs/common'; +import { Transactional } from '@nestjs-cls/transactional'; +import { AiJobStatus, AiJobType } from '@prisma/client'; +import type { ZodType } from 'zod'; + +import { BaseModel } from './base'; +import { CopilotJob } from './common/copilot'; + +type CreateCopilotJobInput = Omit; +type UpdateCopilotJobInput = Pick; + +/** + * Copilot Job Model + */ +@Injectable() +export class CopilotJobModel extends BaseModel { + async create(job: CreateCopilotJobInput) { + const row = await this.db.aiJobs.create({ + data: { + workspaceId: job.workspaceId, + blobId: job.blobId, + createdBy: job.createdBy, + type: job.type, + status: AiJobStatus.pending, + payload: {}, + }, + select: { + id: true, + status: true, + }, + }); + return row; + } + + async has(workspaceId: string, blobId: string) { + const row = await this.db.aiJobs.findFirst({ + where: { + workspaceId, + blobId, + }, + }); + return !!row; + } + + async update(jobId: string, data: UpdateCopilotJobInput) { + const ret = await this.db.aiJobs.updateMany({ + where: { + id: jobId, + }, + data: { + status: data.status || undefined, + payload: data.payload || undefined, + }, + }); + return ret.count > 0; + } + + @Transactional() + async claim(jobId: string, userId: string) { + const job = await this.get(jobId); + + if ( + job && + job.createdBy === userId && + job.status === AiJobStatus.finished + ) { + await this.update(jobId, { status: AiJobStatus.claimed }); + } + + const ret = await this.db.aiJobs.findFirst({ + where: { id: jobId }, + select: { status: true }, + }); + return ret?.status; + } + + async getWithUser( + userId: string, + workspaceId: string, + jobId?: string, + type?: AiJobType + ) { + const row = await this.db.aiJobs.findFirst({ + where: { + id: jobId, + workspaceId, + type, + OR: [ + { + createdBy: userId, + status: { in: [AiJobStatus.finished, AiJobStatus.claimed] }, + }, + { createdBy: { not: userId }, status: AiJobStatus.claimed }, + ], + }, + }); + + if (!row) { + return null; + } + + return { + id: row.id, + workspaceId: row.workspaceId, + blobId: row.blobId, + createdBy: row.createdBy || undefined, + type: row.type, + status: row.status, + payload: row.payload, + }; + } + + async get(jobId: string): Promise { + const row = await this.db.aiJobs.findFirst({ + where: { + id: jobId, + }, + }); + + if (!row) { + return null; + } + + return { + id: row.id, + workspaceId: row.workspaceId, + blobId: row.blobId, + createdBy: row.createdBy || undefined, + type: row.type, + status: row.status, + payload: row.payload, + }; + } + + async getPayload< + C extends ZodType, + O = C extends ZodType ? T : never, + >(jobId: string, schema: C): Promise { + const row = await this.db.aiJobs.findUnique({ + where: { + id: jobId, + }, + select: { + payload: true, + }, + }); + + const ret = schema.safeParse(row?.payload); + return ret.success ? ret.data : ({} as O); + } +} diff --git a/packages/backend/server/src/models/index.ts b/packages/backend/server/src/models/index.ts index 571b449441..3706046ce4 100644 --- a/packages/backend/server/src/models/index.ts +++ b/packages/backend/server/src/models/index.ts @@ -7,6 +7,7 @@ import { import { ModuleRef } from '@nestjs/core'; import { ApplyType } from '../base'; +import { CopilotJobModel } from './copilot-job'; import { DocModel } from './doc'; import { DocUserModel } from './doc-user'; import { FeatureModel } from './feature'; @@ -38,6 +39,7 @@ const MODELS = { history: HistoryModel, notification: NotificationModel, settings: SettingsModel, + copilotJob: CopilotJobModel, }; type ModelsType = { @@ -90,6 +92,7 @@ const ModelsSymbolProvider: ExistingProvider = { export class ModelsModule {} export * from './common'; +export * from './copilot-job'; export * from './doc'; export * from './doc-user'; export * from './feature';