fix(server): realtime loading (#14959)

#### PR Dependency Tree


* **PR #14959** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Refactor**
* Rewired realtime and copilot services to require their runtime
dependencies, improving reliability and removing nullable/optional
runtime paths.

* **Tests**
* Centralized service creation in tests with helper factories and added
checks ensuring realtime dependency injection is configured as expected.

<!-- review_stack_entry_start -->

[![Review Change
Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/toeverything/AFFiNE/pull/14959)

<!-- review_stack_entry_end -->
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-05-14 11:54:45 +08:00
committed by GitHub
parent 419fc5d5e0
commit f626dbd590
11 changed files with 83 additions and 45 deletions
@@ -137,6 +137,21 @@ function createSuccessfulTranscriptBridge(
};
}
function createCopilotTranscriptionService(...deps: unknown[]) {
return new CopilotTranscriptionService(
deps[0] as never,
deps[1] as never,
deps[2] as never,
deps[3] as never,
deps[4] as never,
deps[5] as never,
(deps[6] ?? {
assertQuotaOrByok: Sinon.stub().resolves(undefined),
}) as never,
(deps[7] ?? { publish: Sinon.stub() }) as never
);
}
test('queryTask hides ready transcript task result until settlement', async t => {
const payload = TranscriptPayloadSchema.parse({
infos: [
@@ -148,7 +163,7 @@ test('queryTask hides ready transcript task result until settlement', async t =>
],
normalizedTranscript: '00:00:05 A: Kickoff',
});
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -181,7 +196,7 @@ test('settleTask unlocks ready transcript task result idempotently', async t =>
status: 'settled',
protectedResult: payload,
});
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -216,7 +231,7 @@ test('settleTask checks copilot quota before unlocking ready task', async t => {
protectedResult: payload,
});
const assertQuotaOrByok = Sinon.stub().rejects(new Error('quota exceeded'));
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -248,7 +263,7 @@ test('settleTask checks copilot quota before unlocking ready task', async t => {
});
test('retryTask rejects ready transcript tasks', async t => {
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -272,7 +287,7 @@ test('retryTask rejects ready transcript tasks', async t => {
});
test('retryTask rejects settled transcript tasks', async t => {
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -306,7 +321,7 @@ test('retryTask reuses failed task and queues a new action attempt', async t =>
summaryJson: null,
providerMeta: { provider: 'gemini', model: 'gemini-2.5-flash' },
});
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -352,7 +367,7 @@ test('retryTask prechecks quota or BYOK before queueing provider work', async t
const payload = TranscriptPayloadSchema.parse({
normalizedTranscript: '00:00:05 A: Kickoff',
});
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -391,7 +406,7 @@ for (const status of ['ready', 'settled']) {
test(`submitTask allows a new task for the same blob after ${status} task`, async t => {
const createdTasks: unknown[] = [];
const queuedJobs: unknown[] = [];
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves({
@@ -439,7 +454,7 @@ for (const status of ['ready', 'settled']) {
test('submitTask prechecks quota or BYOK before persisting uploads', async t => {
const assertQuotaOrByok = Sinon.stub().rejects(new Error('quota exceeded'));
const resolveTranscriptionModel = Sinon.stub().resolves('gemini-2.5-flash');
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves(null),
@@ -468,7 +483,7 @@ test('submitTask prechecks quota or BYOK before persisting uploads', async t =>
});
test('submitTask rejects unavailable transcript strategy', async t => {
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
getWithUser: Sinon.stub().resolves(null),
@@ -515,7 +530,7 @@ test('transcriptTask runs native transcript recipe through action bridge when av
const bridgeInputs: unknown[] = [];
const markRunning = Sinon.stub().resolves({ id: 'task-1' });
const complete = Sinon.stub().resolves({ id: 'task-1', status: 'ready' });
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
get: Sinon.stub().resolves({
@@ -586,7 +601,7 @@ test('transcriptTask fails task when native action bridge reports an error event
normalizedTranscript: '00:00:05 A: Kickoff',
});
const complete = Sinon.stub().resolves({ id: 'task-1', status: 'failed' });
const service = new CopilotTranscriptionService(
const service = createCopilotTranscriptionService(
{
copilotTranscriptTask: {
get: Sinon.stub().resolves({
+4 -1
View File
@@ -185,7 +185,10 @@ export function buildAppModule(env: Env) {
.useIf(
() => env.flavors.sync || env.flavors.front,
SyncModule,
TelemetryModule,
TelemetryModule
)
.useIf(
() => !env.flavors.graphql && (env.flavors.sync || env.flavors.front),
CopilotRealtimeModule
)
// graphql server only
@@ -1,12 +1,12 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { z } from 'zod';
import { decodeWithJson, encodeWithJson } from '../../base/graphql';
import { AccessController } from '../permission';
import {
realtimeCommentRoom,
type RealtimePublisher,
type RealtimeRegistry,
RealtimePublisher,
RealtimeRegistry,
registerRealtimeLiveQuery,
} from '../realtime';
import type { CommentCursor } from './resolver';
@@ -21,7 +21,7 @@ export class CommentRealtimeProvider implements OnModuleInit {
constructor(
private readonly service: CommentService,
private readonly ac: AccessController,
@Optional() private readonly registry?: RealtimeRegistry
private readonly registry: RealtimeRegistry
) {}
onModuleInit() {
@@ -1,6 +1,5 @@
import { randomUUID } from 'node:crypto';
import { Optional } from '@nestjs/common';
import {
Args,
Mutation,
@@ -27,7 +26,7 @@ import { Comment, DocMode, Models, Reply } from '../../models';
import { CurrentUser } from '../auth/session';
import { ServerFeature, ServerService } from '../config';
import { AccessController, DocAction } from '../permission';
import type { RealtimePublisher } from '../realtime';
import { RealtimePublisher } from '../realtime';
import { CommentAttachmentStorage } from '../storage';
import { UserType } from '../user';
import { WorkspaceType } from '../workspaces';
@@ -60,7 +59,7 @@ export class CommentResolver {
private readonly queue: JobQueue,
private readonly models: Models,
private readonly server: ServerService,
@Optional() private readonly realtime?: RealtimePublisher
private readonly realtime: RealtimePublisher
) {
// enable comment feature by default
this.server.enableFeature(ServerFeature.Comment);
@@ -1,9 +1,9 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { z } from 'zod';
import {
realtimeNotificationRoom,
type RealtimeRegistry,
RealtimeRegistry,
registerRealtimeLiveQuery,
} from '../realtime';
import { NotificationService } from './service';
@@ -12,7 +12,7 @@ import { NotificationService } from './service';
export class NotificationRealtimeProvider implements OnModuleInit {
constructor(
private readonly service: NotificationService,
@Optional() private readonly registry?: RealtimeRegistry
private readonly registry: RealtimeRegistry
) {}
onModuleInit() {
@@ -1,4 +1,4 @@
import { Injectable, Logger, Optional } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { NotificationNotFound, PaginationInput, URLHelper } from '../../base';
@@ -17,8 +17,7 @@ import {
} from '../../models';
import { DocReader } from '../doc';
import { Mailer } from '../mail';
import type { RealtimePublisher } from '../realtime';
import { realtimeNotificationRoom } from '../realtime';
import { realtimeNotificationRoom, RealtimePublisher } from '../realtime';
import { generateDocPath } from '../utils/doc';
import {
generateWorkspaceSettingsPath,
@@ -34,7 +33,7 @@ export class NotificationService {
private readonly docReader: DocReader,
private readonly mailer: Mailer,
private readonly url: URLHelper,
@Optional() private readonly realtime?: RealtimePublisher
private readonly realtime: RealtimePublisher
) {}
async cleanExpiredNotifications() {
@@ -5,6 +5,8 @@ import { z } from 'zod';
import type { CopilotTranscriptionReader } from '../../../plugins/copilot/transcript';
import { CopilotTranscriptRealtimeProvider } from '../../../plugins/copilot/transcript';
import type { CurrentUser } from '../../auth';
import { CommentRealtimeProvider } from '../../comment/realtime';
import { NotificationRealtimeProvider } from '../../notification/realtime';
import type { AccessController } from '../../permission';
import { RealtimeGateway } from '../gateway';
import {
@@ -194,6 +196,26 @@ test('registerRealtimeLiveQuery registers paired request and topic handlers', as
);
});
test('realtime providers expose runtime injection metadata for registry dependencies', t => {
t.true(
Reflect.getMetadata(
'design:paramtypes',
NotificationRealtimeProvider
).includes(RealtimeRegistry)
);
t.true(
Reflect.getMetadata('design:paramtypes', CommentRealtimeProvider).includes(
RealtimeRegistry
)
);
t.true(
Reflect.getMetadata(
'design:paramtypes',
CopilotTranscriptRealtimeProvider
).includes(RealtimeRegistry)
);
});
test('copilot transcript realtime provider registers task live query handlers', async t => {
const registry = new RealtimeRegistry();
const assertions: unknown[] = [];
@@ -1,6 +1,6 @@
import type { RealtimeRequestName, RealtimeTopicName } from '@affine/realtime';
import type { RealtimeRegistry } from './registry';
import { RealtimeRegistry } from './registry';
import type { RealtimeRequestHandler, RealtimeTopicHandler } from './types';
export type RealtimeLiveQueryDefinition<
@@ -15,9 +15,9 @@ export function registerRealtimeLiveQuery<
Request extends RealtimeRequestName,
Topic extends RealtimeTopicName,
>(
registry: RealtimeRegistry | undefined,
registry: RealtimeRegistry,
definition: RealtimeLiveQueryDefinition<Request, Topic>
) {
registry?.registerRequest(definition.request);
registry?.registerTopic(definition.topic);
registry.registerRequest(definition.request);
registry.registerTopic(definition.topic);
}
@@ -1,4 +1,4 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { z } from 'zod';
import { OnEvent } from '../../../base';
@@ -22,8 +22,8 @@ export class CopilotEmbeddingRealtimeProvider implements OnModuleInit {
private readonly ac: AccessController,
private readonly models: Models,
private readonly context: CopilotContextService,
@Optional() private readonly registry?: RealtimeRegistry,
@Optional() private readonly publisher?: RealtimePublisher
private readonly registry: RealtimeRegistry,
private readonly publisher: RealtimePublisher
) {}
onModuleInit() {
@@ -1,10 +1,10 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { z } from 'zod';
import { CopilotTranscriptionJobNotFound } from '../../../base';
import { AccessController } from '../../../core/permission';
import {
type RealtimeRegistry,
RealtimeRegistry,
realtimeTranscriptTaskRoom,
registerRealtimeLiveQuery,
} from '../../../core/realtime';
@@ -15,7 +15,7 @@ export class CopilotTranscriptRealtimeProvider implements OnModuleInit {
constructor(
private readonly ac: AccessController,
private readonly transcript: CopilotTranscriptionReader,
@Optional() private readonly registry?: RealtimeRegistry
private readonly registry: RealtimeRegistry
) {}
onModuleInit() {
@@ -1,4 +1,4 @@
import { BadRequestException, Injectable, Optional } from '@nestjs/common';
import { BadRequestException, Injectable } from '@nestjs/common';
import { AiJobStatus } from '@prisma/client';
import {
@@ -10,7 +10,7 @@ import {
sniffMime,
} from '../../../base';
import {
type RealtimePublisher,
RealtimePublisher,
realtimeTranscriptTaskRoom,
} from '../../../core/realtime';
import { Models } from '../../../models';
@@ -45,8 +45,8 @@ export class CopilotTranscriptionService {
private readonly tasks: TaskPolicy,
private readonly prompts: PromptService,
private readonly actionBridge: ActionRuntimeBridge,
@Optional() private readonly access?: CopilotAccessPolicy,
@Optional() private readonly realtime?: RealtimePublisher
private readonly access: CopilotAccessPolicy,
private readonly realtime: RealtimePublisher
) {}
private parseTaskPayload(payload: unknown): TranscriptionPayloadV2 {
@@ -180,7 +180,7 @@ export class CopilotTranscriptionService {
throw new CopilotTranscriptionJobExists();
}
await this.access?.assertQuotaOrByok({
await this.access.assertQuotaOrByok({
userId,
workspaceId,
featureKind: 'transcript',
@@ -234,7 +234,7 @@ export class CopilotTranscriptionService {
);
}
await this.access?.assertQuotaOrByok({
await this.access.assertQuotaOrByok({
userId,
workspaceId,
featureKind: 'transcript',
@@ -282,7 +282,7 @@ export class CopilotTranscriptionService {
return taskToJob(task);
}
await this.access?.assertQuotaOrByok({
await this.access.assertQuotaOrByok({
userId,
workspaceId,
featureKind: 'transcript',
@@ -412,7 +412,7 @@ export class CopilotTranscriptionService {
status: AiJobStatus,
error?: string
) {
this.realtime?.publish(
this.realtime.publish(
'copilot.transcript.task.changed',
{ workspaceId, taskId },
{ taskId, status, error },