feat: add queue management for admin panel

This commit is contained in:
DarkSky
2026-01-01 06:13:50 +08:00
parent f745f7b669
commit 0b0ae5ea0a
11 changed files with 2460 additions and 34 deletions

View File

@@ -72,6 +72,7 @@
"@opentelemetry/semantic-conventions": "^1.38.0",
"@prisma/client": "^6.6.0",
"@prisma/instrumentation": "^6.7.0",
"@queuedash/api": "^3.14.0",
"@react-email/components": "0.0.38",
"@socket.io/redis-adapter": "^8.3.0",
"ai": "^5.0.108",

View File

@@ -40,6 +40,7 @@ import { MailModule } from './core/mail';
import { MonitorModule } from './core/monitor';
import { NotificationModule } from './core/notification';
import { PermissionModule } from './core/permission';
import { QueueDashboardModule } from './core/queue-dashboard';
import { QuotaModule } from './core/quota';
import { SelfhostModule } from './core/selfhost';
import { StorageModule } from './core/storage';
@@ -189,7 +190,8 @@ export function buildAppModule(env: Env) {
OAuthModule,
CustomerIoModule,
CommentModule,
AccessTokenModule
AccessTokenModule,
QueueDashboardModule
)
// doc service only
.useIf(() => env.flavors.doc, DocServiceModule)

View File

@@ -0,0 +1,100 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Injectable, Logger, Module, OnModuleInit } from '@nestjs/common';
import { HttpAdapterHost, ModuleRef } from '@nestjs/core';
import { createQueueDashExpressMiddleware } from '@queuedash/api';
import type { Queue as BullMQQueue } from 'bullmq';
import type { Application, NextFunction, Request, Response } from 'express';
import { Config } from '../../base/config';
import { QUEUES } from '../../base/job/queue/def';
import { AuthGuard, AuthModule } from '../auth';
import { FeatureModule, FeatureService } from '../features';
type QueueDashQueue = {
queue: BullMQQueue;
displayName: string;
type: 'bullmq';
};
@Injectable()
class QueueDashboardService implements OnModuleInit {
private readonly logger = new Logger(QueueDashboardService.name);
constructor(
private readonly adapterHost: HttpAdapterHost,
private readonly config: Config,
private readonly feature: FeatureService,
private readonly authGuard: AuthGuard,
private readonly moduleRef: ModuleRef
) {}
async onModuleInit() {
const httpAdapter = this.adapterHost.httpAdapter;
if (!httpAdapter) {
return;
}
const app = httpAdapter.getInstance<Application>();
const mountPath = `${this.config.server.path}/api/queue`;
const queues = this.collectQueues();
if (!queues.length) {
this.logger.warn('QueueDash not mounted: no queues available');
app.use(mountPath, (_req, res) => {
res.status(404).end();
});
return;
}
const guardMiddleware = async (
req: Request,
res: Response,
next: NextFunction
): Promise<void> => {
try {
const session = await this.authGuard.signIn(req, res);
const userId = session?.user?.id;
const isAdmin = userId ? await this.feature.isAdmin(userId) : false;
if (!isAdmin) {
res.status(404).end();
return;
}
} catch (error) {
this.logger.warn('QueueDash auth failed', error as Error);
res.status(404).end();
return;
}
next();
};
app.use(
mountPath,
guardMiddleware,
createQueueDashExpressMiddleware({ ctx: { queues } })
);
this.logger.log(`QueueDash mounted on ${mountPath}`);
}
private collectQueues(): QueueDashQueue[] {
const queues: QueueDashQueue[] = [];
for (const name of QUEUES) {
const queue = this.moduleRef.get<BullMQQueue>(getQueueToken(name), {
strict: false,
});
if (queue) {
queues.push({ queue, displayName: name, type: 'bullmq' });
}
}
return queues;
}
}
@Module({
imports: [AuthModule, FeatureModule],
providers: [QueueDashboardService],
})
export class QueueDashboardModule {}

View File

@@ -129,10 +129,12 @@ export class WorkspaceStatsJob {
private async withAdvisoryLock<T>(
callback: (tx: Prisma.TransactionClient) => Promise<T>
): Promise<T | null> {
const lockIdSql = Prisma.sql`(${LOCK_NAMESPACE}::bigint << 32) + ${LOCK_KEY}::bigint`;
return await this.prisma.$transaction(
async tx => {
const [lock] = await tx.$queryRaw<{ locked: boolean }[]>`
SELECT pg_try_advisory_lock(${LOCK_NAMESPACE}, ${LOCK_KEY}) AS locked
SELECT pg_try_advisory_lock(${lockIdSql}) AS locked
`;
if (!lock?.locked) {
@@ -142,7 +144,7 @@ export class WorkspaceStatsJob {
try {
return await callback(tx);
} finally {
await tx.$executeRaw`SELECT pg_advisory_unlock(${LOCK_NAMESPACE}, ${LOCK_KEY})`;
await tx.$executeRaw`SELECT pg_advisory_unlock(${lockIdSql})`;
}
},
{

View File

@@ -9,6 +9,7 @@
"@affine/graphql": "workspace:*",
"@affine/routes": "workspace:*",
"@blocksuite/icons": "^2.2.17",
"@queuedash/ui": "^3.14.0",
"@radix-ui/react-accordion": "^1.2.2",
"@radix-ui/react-alert-dialog": "^1.1.3",
"@radix-ui/react-aspect-ratio": "^1.1.1",

View File

@@ -26,6 +26,9 @@ export const Accounts = lazy(
export const Workspaces = lazy(
() => import(/* webpackChunkName: "workspaces" */ './modules/workspaces')
);
export const Queue = lazy(
() => import(/* webpackChunkName: "queue" */ './modules/queue')
);
export const AI = lazy(
() => import(/* webpackChunkName: "ai" */ './modules/ai')
);
@@ -98,6 +101,7 @@ export const App = () => {
path={ROUTES.admin.workspaces}
element={<Workspaces />}
/>
<Route path={`${ROUTES.admin.queue}/*`} element={<Queue />} />
<Route path={ROUTES.admin.ai} element={<AI />} />
<Route path={ROUTES.admin.about} element={<About />} />
<Route

View File

@@ -2,7 +2,7 @@ import { buttonVariants } from '@affine/admin/components/ui/button';
import { cn } from '@affine/admin/utils';
import { AccountIcon, SelfhostIcon } from '@blocksuite/icons/rc';
import { cssVarV2 } from '@toeverything/theme/v2';
import { LayoutDashboardIcon } from 'lucide-react';
import { LayoutDashboardIcon, ListChecksIcon } from 'lucide-react';
import { NavLink } from 'react-router-dom';
import { ServerVersion } from './server-version';
@@ -97,6 +97,12 @@ export function Nav({ isCollapsed = false }: NavProps) {
label="Workspaces"
isCollapsed={isCollapsed}
/>
<NavItem
to="/admin/queue"
icon={<ListChecksIcon size={18} />}
label="Queue"
isCollapsed={isCollapsed}
/>
{/* <NavItem
to="/admin/ai"
icon={<AiOutlineIcon fontSize={20} />}

View File

@@ -0,0 +1,23 @@
// eslint-disable-next-line @typescript-eslint/no-restricted-imports
import '@queuedash/ui/dist/styles.css';
import './queue.css';
import { QueueDashApp } from '@queuedash/ui';
import { Header } from '../header';
export function QueuePage() {
return (
<div className="h-screen flex-1 flex-col flex overflow-hidden">
<Header title="Queue" />
<div className="flex-1 overflow-hidden">
<QueueDashApp
apiUrl={`${environment.subPath}/api/queue/trpc`}
basename="/admin/queue"
/>
</div>
</div>
);
}
export { QueuePage as Component };

View File

@@ -0,0 +1,5 @@
/* Scoped queuedash modal alignment */
.react-aria-ModalOverlay section[role='dialog'] {
transform: unset;
}

View File

@@ -13,6 +13,7 @@ export const ROUTES = {
setup: '/admin/setup',
accounts: '/admin/accounts',
workspaces: '/admin/workspaces',
queue: '/admin/queue',
ai: '/admin/ai',
settings: { index: '/admin/settings', module: '/admin/settings/:module' },
about: '/admin/about',
@@ -30,6 +31,7 @@ export const RELATIVE_ROUTES = {
setup: 'setup',
accounts: 'accounts',
workspaces: 'workspaces',
queue: 'queue',
ai: 'ai',
settings: { index: 'settings', module: ':module' },
about: 'about',
@@ -45,6 +47,7 @@ admin.auth = () => '/admin/auth';
admin.setup = () => '/admin/setup';
admin.accounts = () => '/admin/accounts';
admin.workspaces = () => '/admin/workspaces';
admin.queue = () => '/admin/queue';
admin.ai = () => '/admin/ai';
const admin_settings = () => '/admin/settings';
admin_settings.module = (params: { module: string }) =>

2339
yarn.lock

File diff suppressed because it is too large Load Diff