fix(server): realtime handler (#15146)

#### PR Dependency Tree


* **PR #15146** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

* **Refactor**
* Reworked real-time backend wiring to centralize workspace, comments,
and Copilot embedding handlers under a unified server setup.
* Updated Copilot embedding real-time handling to use context
configuration when publishing updates.
* **New Features**
* Added automatic startup validation to ensure all required real-time
request/topic handlers are registered (for applicable server flavors).
* **Bug Fixes**
* Workspace real-time access now determines team status from quota
state.
* Improved Copilot embedding progress publishing (including completion
events).
* **Tests**
* Expanded real-time registry completeness and Copilot embedding
provider coverage.
  * Added quota-state restoration coverage after clearing stale expiry.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-06-24 15:18:35 +08:00
committed by GitHub
parent c1c19be271
commit c41d613b6e
18 changed files with 425 additions and 38 deletions
+3 -2
View File
@@ -55,13 +55,14 @@ import { Env } from './env';
import { ModelsModule } from './models';
import { CalendarModule } from './plugins/calendar';
import { CaptchaModule } from './plugins/captcha';
import { CopilotModule, CopilotRealtimeModule } from './plugins/copilot';
import { CopilotModule } from './plugins/copilot';
import { GCloudModule } from './plugins/gcloud';
import { IndexerModule } from './plugins/indexer';
import { LicenseModule } from './plugins/license';
import { OAuthModule } from './plugins/oauth';
import { PaymentModule } from './plugins/payment';
import { WorkerModule } from './plugins/worker';
import { ServerRealtimeHandlersModule } from './realtime-handlers.module';
export const FunctionalityModules = [
ClsModule.forRoot({
@@ -188,7 +189,7 @@ export function buildAppModule(env: Env) {
)
.useIf(
() => !env.flavors.graphql && (env.flavors.sync || env.flavors.front),
CopilotRealtimeModule
ServerRealtimeHandlersModule
)
// graphql server only
.useIf(
@@ -3,13 +3,20 @@ import { Module } from '@nestjs/common';
import { ServerConfigModule } from '../config';
import { PermissionModule } from '../permission';
import { StorageModule } from '../storage';
import { CommentRealtimeProvider } from './realtime';
import { CommentRealtimeModule } from './realtime.module';
import { CommentResolver } from './resolver';
import { CommentService } from './service';
@Module({
imports: [PermissionModule, StorageModule, ServerConfigModule],
providers: [CommentResolver, CommentService, CommentRealtimeProvider],
exports: [CommentService],
imports: [
PermissionModule,
StorageModule,
ServerConfigModule,
CommentRealtimeModule,
],
providers: [CommentResolver],
exports: [CommentRealtimeModule],
})
export class CommentModule {}
export { CommentRealtimeModule } from './realtime.module';
export { CommentService } from './service';
@@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { PermissionModule } from '../permission';
import { CommentRealtimeProvider } from './realtime';
import { CommentService } from './service';
@Module({
imports: [PermissionModule],
providers: [CommentService, CommentRealtimeProvider],
exports: [CommentService],
})
export class CommentRealtimeModule {}
@@ -175,7 +175,7 @@ export class EntitlementService {
quantity:
targetType === 'workspace'
? this.normalizedQuantity(input.quantity)
: undefined,
: null,
metadata: {
provider: input.provider ?? 'stripe',
recurring: input.recurring,
@@ -184,8 +184,8 @@ export class EntitlementService {
stripeSubscriptionId: input.stripeSubscriptionId ?? null,
legacySync: options.legacySync ?? false,
},
startsAt: input.start ?? undefined,
expiresAt: input.end ?? undefined,
startsAt: input.start ?? null,
expiresAt: input.end ?? null,
graceUntil:
status === 'grace' ? (input.trialEnd ?? input.end ?? new Date()) : null,
validatedAt: new Date(),
@@ -246,6 +246,46 @@ test('user quota state keeps ai capability alongside pro entitlement', async t =
t.is(quota.copilotActionLimit, undefined);
});
test('user quota state restores ai overlay after stale expiry is cleared', async t => {
const { owner } = await createWorkspace(t);
await t.context.entitlement.upsertFromCloudSubscription({
targetId: owner.id,
plan: SubscriptionPlan.Pro,
recurring: SubscriptionRecurring.Yearly,
status: 'active',
});
await t.context.db.entitlement.create({
data: {
targetType: 'user',
targetId: owner.id,
source: 'cloud_subscription',
plan: 'ai',
status: 'active',
subjectId: `${owner.id}:${SubscriptionPlan.AI}`,
metadata: {},
expiresAt: new Date('2020-01-01T00:00:00Z'),
},
});
await t.context.entitlement.upsertFromCloudSubscription({
targetId: owner.id,
plan: SubscriptionPlan.AI,
recurring: SubscriptionRecurring.Monthly,
status: 'active',
});
const state = await t.context.state.reconcileUserQuotaState(owner.id);
const ai = await t.context.db.entitlement.findFirstOrThrow({
where: {
targetId: owner.id,
source: 'cloud_subscription',
plan: 'ai',
},
});
t.is(ai.expiresAt, null);
t.deepEqual(state.flags, { unlimitedCopilot: true });
});
test('ai entitlement is a capability overlay on free quota', async t => {
const { owner } = await createWorkspace(t);
await t.context.entitlement.upsertFromCloudSubscription({
@@ -29,6 +29,12 @@ export class QuotaStateService {
private readonly event: EventBus
) {}
async getWorkspaceQuotaState(workspaceId: string) {
return await this.db.effectiveWorkspaceQuotaState.findUnique({
where: { workspaceId },
});
}
async reconcileUserQuotaState(
userId: string,
options: { emit?: boolean } = {}
@@ -5,7 +5,9 @@ import {
import test from 'ava';
import { z } from 'zod';
import { Flavor } from '../../../env';
import { PublicDocMode } from '../../../models';
import { CopilotEmbeddingRealtimeProvider } from '../../../plugins/copilot/context';
import type { CopilotTranscriptionReader } from '../../../plugins/copilot/transcript';
import { CopilotTranscriptRealtimeProvider } from '../../../plugins/copilot/transcript';
import type { CurrentUser } from '../../auth';
@@ -27,8 +29,11 @@ import {
WorkspaceConfigRealtimeProvider,
WorkspaceMembersRealtimeProvider,
} from '../../workspaces/realtime';
import { RealtimeRegistryCompletenessChecker } from '../completeness';
import { RealtimeGateway } from '../gateway';
import {
REALTIME_GATEWAY_REQUIRED_REQUESTS,
REALTIME_GATEWAY_REQUIRED_TOPICS,
realtimeCommentRoom,
realtimeDocGrantsRoom,
realtimeDocShareStateRoom,
@@ -90,6 +95,26 @@ test('registry rejects duplicate request and topic handlers', t => {
});
});
test('realtime registry completeness check only runs for explicit gateway flavors', t => {
const env = globalThis.env as unknown as { FLAVOR: Flavor };
const originalFlavor = globalThis.env.FLAVOR;
try {
const checker = new RealtimeRegistryCompletenessChecker(
new RealtimeRegistry()
);
env.FLAVOR = Flavor.AllInOne;
t.notThrows(() => checker.onApplicationBootstrap());
env.FLAVOR = Flavor.Front;
t.throws(() => checker.onApplicationBootstrap(), {
message: /Realtime gateway missing handlers/,
});
} finally {
env.FLAVOR = originalFlavor;
}
});
test('gateway handles registered request with version gate', async t => {
const registry = new RealtimeRegistry();
registry.registerRequest({
@@ -255,6 +280,12 @@ test('realtime providers expose runtime injection metadata for registry dependen
CopilotTranscriptRealtimeProvider
).includes(RealtimeRegistry)
);
t.true(
Reflect.getMetadata(
'design:paramtypes',
CopilotEmbeddingRealtimeProvider
).includes(RealtimeRegistry)
);
t.true(
Reflect.getMetadata(
'design:paramtypes',
@@ -297,6 +328,73 @@ test('realtime providers expose runtime injection metadata for registry dependen
);
});
test('front and sync realtime gateway required handlers are registered by lightweight providers', t => {
const registry = new RealtimeRegistry();
new WorkspaceAccessRealtimeProvider(
{} as never,
{} as never,
registry
).onModuleInit();
new WorkspaceConfigRealtimeProvider(
{} as never,
{} as never,
registry
).onModuleInit();
new WorkspaceMembersRealtimeProvider(
{} as never,
{} as never,
{} as never,
{} as never,
registry
).onModuleInit();
new DocShareRealtimeProvider(
{} as never,
{} as never,
registry
).onModuleInit();
new DocGrantsRealtimeProvider(
{} as never,
{} as never,
{} as never,
registry
).onModuleInit();
new UserRealtimeProvider({} as never, registry).onModuleInit();
new NotificationRealtimeProvider({} as never, registry).onModuleInit();
new CommentRealtimeProvider(
{} as never,
{} as never,
registry
).onModuleInit();
new CopilotEmbeddingRealtimeProvider(
{} as never,
{} as never,
registry,
{} as never
).onModuleInit();
new CopilotTranscriptRealtimeProvider(
{} as never,
{} as never,
registry
).onModuleInit();
new QuotaStateRealtimeProvider(
{} as never,
{} as never,
registry
).onModuleInit();
t.deepEqual(
REALTIME_GATEWAY_REQUIRED_REQUESTS.filter(
name => !registry.hasRequest(name)
),
[]
);
t.deepEqual(
REALTIME_GATEWAY_REQUIRED_TOPICS.filter(name => !registry.hasTopic(name)),
[]
);
});
test('workspace realtime providers register access, config, members and invite link handlers', async t => {
const registry = new RealtimeRegistry();
const assertions: unknown[] = [];
@@ -349,8 +447,11 @@ test('workspace realtime providers register access, config, members and invite l
count: async () => 1,
},
};
const workspaceService = {
isTeamWorkspace: async () => true,
const quotaState = {
getWorkspaceQuotaState: async () => ({ known: true, plan: 'team' }),
reconcileWorkspaceQuotaState: async () => {
throw new Error('workspace.access.get should not reconcile quota state');
},
};
const cache = {
get: async () => ({ inviteId: 'invite-link' }),
@@ -362,7 +463,7 @@ test('workspace realtime providers register access, config, members and invite l
new WorkspaceAccessRealtimeProvider(
ac,
workspaceService as never,
quotaState as never,
registry
).onModuleInit();
new WorkspaceConfigRealtimeProvider(
@@ -832,6 +933,76 @@ test('quota realtime provider exposes effective quota state snapshots', async t
);
});
test('copilot embedding realtime provider uses lightweight model reads', async t => {
const registry = new RealtimeRegistry();
const published: unknown[][] = [];
const assertions: unknown[] = [];
const ac = {
user(userId: string) {
return {
workspace(workspaceId: string) {
return {
allowLocal() {
return this;
},
async assert(action: string) {
assertions.push({ userId, workspaceId, action });
},
};
},
};
},
} as unknown as PermissionAccess;
const models = {
copilotWorkspace: {
checkEmbeddingAvailable: async () => true,
getEmbeddingStatus: async () => ({ total: 5, embedded: 3 }),
},
copilotContext: {
getConfig: async () => ({ workspaceId: 'space' }),
},
};
const publisher = {
publish: (...args: unknown[]) => published.push(args),
} as unknown as RealtimePublisher;
const provider = new CopilotEmbeddingRealtimeProvider(
ac,
models as never,
registry,
publisher
);
provider.onModuleInit();
t.deepEqual(
await registry
.getRequest('workspace.embedding.progress.get')
.handle(user, { workspaceId: 'space' }),
{
total: 5,
embedded: 3,
}
);
t.is(
registry
.getTopic('workspace.embedding.progress.changed')
.room(user, { workspaceId: 'space' }),
realtimeWorkspaceEmbeddingProgressRoom('space')
);
await provider.onDocEmbedFinished({ contextId: 'context', docId: 'doc' });
t.deepEqual(assertions, [
{ userId: 'u1', workspaceId: 'space', action: 'Workspace.Copilot' },
]);
t.deepEqual(published[0], [
'workspace.embedding.progress.changed',
{ workspaceId: 'space' },
{ reason: 'finished' },
{ room: realtimeWorkspaceEmbeddingProgressRoom('space') },
]);
});
test('copilot transcript realtime provider registers task live query handlers', async t => {
const registry = new RealtimeRegistry();
const assertions: unknown[] = [];
@@ -0,0 +1,43 @@
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { Flavor } from '../../env';
import { RealtimeRegistry } from './registry';
import {
REALTIME_GATEWAY_REQUIRED_REQUESTS,
REALTIME_GATEWAY_REQUIRED_TOPICS,
} from './required-handlers';
@Injectable()
export class RealtimeRegistryCompletenessChecker implements OnApplicationBootstrap {
constructor(private readonly registry: RealtimeRegistry) {}
onApplicationBootstrap() {
if (
globalThis.env.FLAVOR !== Flavor.Front &&
globalThis.env.FLAVOR !== Flavor.Sync
) {
return;
}
const missingRequests = REALTIME_GATEWAY_REQUIRED_REQUESTS.filter(
name => !this.registry.hasRequest(name)
);
const missingTopics = REALTIME_GATEWAY_REQUIRED_TOPICS.filter(
name => !this.registry.hasTopic(name)
);
if (missingRequests.length || missingTopics.length) {
throw new Error(
[
'Realtime gateway missing handlers.',
missingRequests.length
? `requests: ${missingRequests.join(', ')}.`
: null,
missingTopics.length ? `topics: ${missingTopics.join(', ')}.` : null,
]
.filter(Boolean)
.join(' ')
);
}
}
}
@@ -1,19 +1,30 @@
import { Global, Module } from '@nestjs/common';
import { RealtimeRegistryCompletenessChecker } from './completeness';
import { RealtimeGateway } from './gateway';
import { RealtimePublisher } from './publisher';
import { RealtimeRegistry } from './registry';
@Global()
@Module({
providers: [RealtimeRegistry, RealtimePublisher, RealtimeGateway],
providers: [
RealtimeRegistry,
RealtimePublisher,
RealtimeGateway,
RealtimeRegistryCompletenessChecker,
],
exports: [RealtimeRegistry, RealtimePublisher],
})
export class RealtimeModule {}
export { RealtimeRegistryCompletenessChecker } from './completeness';
export { registerRealtimeLiveQuery } from './provider';
export { RealtimePublisher } from './publisher';
export { RealtimeRegistry } from './registry';
export {
REALTIME_GATEWAY_REQUIRED_REQUESTS,
REALTIME_GATEWAY_REQUIRED_TOPICS,
} from './required-handlers';
export {
realtimeCommentRoom,
realtimeDocGrantsRoom,
@@ -50,6 +50,10 @@ export class RealtimeRegistry {
return handler;
}
hasRequest(name: RealtimeRequestName) {
return this.requests.has(name);
}
getTopic(name: RealtimeTopicName) {
const handler = this.topics.get(name);
if (!handler) {
@@ -57,4 +61,8 @@ export class RealtimeRegistry {
}
return handler;
}
hasTopic(name: RealtimeTopicName) {
return this.topics.has(name);
}
}
@@ -0,0 +1,37 @@
import type { RealtimeRequestName, RealtimeTopicName } from '@affine/realtime';
export const REALTIME_GATEWAY_REQUIRED_REQUESTS = [
'workspace.access.get',
'workspace.config.get',
'workspace.members.get',
'workspace.invite-link.get',
'doc.share-state.get',
'doc.grants.get',
'user.profile.get',
'user.settings.get',
'user.access-tokens.get',
'notification.count.get',
'comment.changes.get',
'workspace.embedding.progress.get',
'copilot.transcript.task.get',
'user.quota-state.get',
'workspace.quota-state.get',
] as const satisfies readonly RealtimeRequestName[];
export const REALTIME_GATEWAY_REQUIRED_TOPICS = [
'workspace.access.changed',
'workspace.config.changed',
'workspace.members.changed',
'workspace.invite-link.changed',
'doc.share-state.changed',
'doc.grants.changed',
'user.profile.changed',
'user.settings.changed',
'user.access-tokens.changed',
'notification.count.changed',
'comment.changed',
'workspace.embedding.progress.changed',
'copilot.transcript.task.changed',
'user.quota-state.changed',
'workspace.quota-state.changed',
] as const satisfies readonly RealtimeTopicName[];
@@ -10,17 +10,8 @@ import { QuotaModule } from '../quota';
import { StorageModule } from '../storage';
import { UserModule } from '../user';
import { WorkspacesController } from './controller';
import { DocGrantsService } from './doc-grants';
import {
DocGrantsRealtimeProvider,
DocShareRealtimeProvider,
} from './doc-realtime';
import { WorkspaceEvents } from './event';
import {
WorkspaceAccessRealtimeProvider,
WorkspaceConfigRealtimeProvider,
WorkspaceMembersRealtimeProvider,
} from './realtime';
import { WorkspaceRealtimeModule } from './realtime.module';
import {
DocHistoryResolver,
DocResolver,
@@ -44,6 +35,7 @@ import { WorkspaceStatsJob } from './stats.job';
PermissionModule,
NotificationModule,
MailModule,
WorkspaceRealtimeModule,
],
controllers: [WorkspacesController],
providers: [
@@ -54,13 +46,7 @@ import { WorkspaceStatsJob } from './stats.job';
DocHistoryResolver,
WorkspaceBlobResolver,
WorkspaceService,
DocGrantsService,
WorkspaceEvents,
WorkspaceAccessRealtimeProvider,
WorkspaceConfigRealtimeProvider,
WorkspaceMembersRealtimeProvider,
DocShareRealtimeProvider,
DocGrantsRealtimeProvider,
AdminWorkspaceResolver,
WorkspaceStatsJob,
],
@@ -68,5 +54,6 @@ import { WorkspaceStatsJob } from './stats.job';
})
export class WorkspaceModule {}
export { WorkspaceRealtimeModule } from './realtime.module';
export { WorkspaceService } from './service';
export { InvitationType, WorkspaceType } from './types';
@@ -0,0 +1,28 @@
import { Module } from '@nestjs/common';
import { PermissionModule } from '../permission';
import { QuotaServiceModule } from '../quota';
import { DocGrantsService } from './doc-grants';
import {
DocGrantsRealtimeProvider,
DocShareRealtimeProvider,
} from './doc-realtime';
import {
WorkspaceAccessRealtimeProvider,
WorkspaceConfigRealtimeProvider,
WorkspaceMembersRealtimeProvider,
} from './realtime';
@Module({
imports: [PermissionModule, QuotaServiceModule],
providers: [
DocGrantsService,
WorkspaceAccessRealtimeProvider,
WorkspaceConfigRealtimeProvider,
WorkspaceMembersRealtimeProvider,
DocShareRealtimeProvider,
DocGrantsRealtimeProvider,
],
exports: [DocGrantsService],
})
export class WorkspaceRealtimeModule {}
@@ -23,6 +23,7 @@ import {
PermissionAccess,
WorkspaceRole,
} from '../permission';
import { QuotaStateService } from '../quota';
import { registerRealtimeLiveQuery } from '../realtime/provider';
import { RealtimePublisher } from '../realtime/publisher';
import { RealtimeRegistry } from '../realtime/registry';
@@ -32,7 +33,6 @@ import {
realtimeWorkspaceInviteLinkRoom,
realtimeWorkspaceMembersRoom,
} from '../realtime/rooms';
import { WorkspaceService } from './service';
const workspaceInput = z.object({ workspaceId: z.string() }).strict();
@@ -58,7 +58,7 @@ function serializeWorkspaceMember(
export class WorkspaceAccessRealtimeProvider implements OnModuleInit {
constructor(
private readonly ac: PermissionAccess,
private readonly workspaceService: WorkspaceService,
private readonly quotaState: QuotaStateService,
@Optional() private readonly registry?: RealtimeRegistry,
@Optional() private readonly publisher?: RealtimePublisher
) {}
@@ -125,10 +125,16 @@ export class WorkspaceAccessRealtimeProvider implements OnModuleInit {
return {
role: role ? WorkspaceRole[role] : WorkspaceRole[WorkspaceRole.External],
permissions: mapPermissionsToGraphqlPermissions(permissions),
team: await this.workspaceService.isTeamWorkspace(workspaceId),
team: await this.isTeamWorkspace(workspaceId),
};
}
private async isTeamWorkspace(workspaceId: string) {
const state = await this.quotaState.getWorkspaceQuotaState(workspaceId);
if (!state?.known) return false;
return ['team', 'selfhost_team'].includes(state.plan);
}
private publish(workspaceId: string, reason: string) {
this.publisher?.publishChanged(
'workspace.access.changed',
@@ -10,7 +10,6 @@ import {
registerRealtimeLiveQuery,
} from '../../../core/realtime';
import { Models } from '../../../models';
import { CopilotContextService } from './service';
export function workspaceEmbeddingRoom(workspaceId: string) {
return realtimeWorkspaceEmbeddingProgressRoom(workspaceId);
@@ -21,7 +20,6 @@ export class CopilotEmbeddingRealtimeProvider implements OnModuleInit {
constructor(
private readonly ac: PermissionAccess,
private readonly models: Models,
private readonly context: CopilotContextService,
private readonly registry: RealtimeRegistry,
private readonly publisher: RealtimePublisher
) {}
@@ -35,7 +33,9 @@ export class CopilotEmbeddingRealtimeProvider implements OnModuleInit {
input,
handle: async (user, payload) => {
await this.assertCopilot(user.id, payload.workspaceId);
if (!this.context.canEmbedding) {
const canEmbedding =
await this.models.copilotWorkspace.checkEmbeddingAvailable();
if (!canEmbedding) {
return { total: 0, embedded: 0 };
}
return await this.models.copilotWorkspace.getEmbeddingStatus(
@@ -89,7 +89,8 @@ export class CopilotEmbeddingRealtimeProvider implements OnModuleInit {
reason: 'finished' | 'failed'
) {
if (!this.publisher) return;
const context = await this.context.get(contextId);
const context = await this.models.copilotContext.getConfig(contextId);
if (!context) return;
this.publishWorkspace(context.workspaceId, reason);
}
@@ -14,6 +14,7 @@ import { CopilotController } from './controller';
import { WorkspaceMcpController } from './mcp/controller';
import {
COPILOT_API_PROVIDERS,
COPILOT_CONTEXT_REALTIME_PROVIDERS,
COPILOT_FEATURE_PROVIDERS,
COPILOT_KERNEL_PROVIDERS,
COPILOT_TRANSCRIPT_REALTIME_PROVIDERS,
@@ -43,6 +44,12 @@ export class CopilotKernelModule {}
})
export class CopilotRealtimeModule {}
@Module({
imports: [PermissionModule],
providers: [...COPILOT_CONTEXT_REALTIME_PROVIDERS],
})
export class CopilotEmbeddingRealtimeModule {}
@Module({
imports: [...COPILOT_SHARED_IMPORTS, CopilotKernelModule],
providers: [...COPILOT_FEATURE_PROVIDERS],
@@ -109,9 +109,13 @@ export const COPILOT_RUNTIME_PROVIDERS = [
TurnPersistence,
];
export const COPILOT_CONTEXT_REALTIME_PROVIDERS = [
CopilotEmbeddingRealtimeProvider,
];
export const COPILOT_CONTEXT_PROVIDERS = [
CopilotContextResolver,
CopilotEmbeddingRealtimeProvider,
...COPILOT_CONTEXT_REALTIME_PROVIDERS,
];
export const COPILOT_TRANSCRIPT_REALTIME_PROVIDERS = [
@@ -0,0 +1,18 @@
import { Module } from '@nestjs/common';
import { CommentRealtimeModule } from './core/comment';
import { WorkspaceRealtimeModule } from './core/workspaces';
import {
CopilotEmbeddingRealtimeModule,
CopilotRealtimeModule,
} from './plugins/copilot';
@Module({
imports: [
WorkspaceRealtimeModule,
CommentRealtimeModule,
CopilotEmbeddingRealtimeModule,
CopilotRealtimeModule,
],
})
export class ServerRealtimeHandlersModule {}