diff --git a/packages/backend/server/src/core/comment/realtime.ts b/packages/backend/server/src/core/comment/realtime.ts index dcb6165bc8..f6ce858a43 100644 --- a/packages/backend/server/src/core/comment/realtime.ts +++ b/packages/backend/server/src/core/comment/realtime.ts @@ -3,12 +3,17 @@ import { z } from 'zod'; import { decodeWithJson, encodeWithJson } from '../../base/graphql'; import { AccessController } from '../permission'; -import type { RealtimePublisher, RealtimeRegistry } from '../realtime'; +import { + realtimeCommentRoom, + type RealtimePublisher, + type RealtimeRegistry, + registerRealtimeLiveQuery, +} from '../realtime'; import type { CommentCursor } from './resolver'; import { CommentService } from './service'; export function commentRoom(workspaceId: string, docId: string) { - return `workspace:${workspaceId}:doc:${docId}:comment`; + return realtimeCommentRoom(workspaceId, docId); } @Injectable() @@ -27,53 +32,57 @@ export class CommentRealtimeProvider implements OnModuleInit { first: z.number().optional(), }); - this.registry?.registerRequest({ - name: 'comment.changes.get', - input, - handle: async (user, payload) => { - await this.assertRead(user.id, payload.workspaceId, payload.docId); - const cursor: CommentCursor = decodeWithJson(payload.after) ?? {}; - const changes = await this.service.listCommentChanges( - payload.workspaceId, - payload.docId, - { - commentUpdatedAt: cursor.commentUpdatedAt, - replyUpdatedAt: cursor.replyUpdatedAt, - take: payload.first, + registerRealtimeLiveQuery(this.registry, { + request: { + name: 'comment.changes.get', + input, + handle: async (user, payload) => { + await this.assertRead(user.id, payload.workspaceId, payload.docId); + const cursor: CommentCursor = decodeWithJson(payload.after) ?? {}; + const limit = payload.first; + const changes = await this.service.listCommentChanges( + payload.workspaceId, + payload.docId, + { + commentUpdatedAt: cursor.commentUpdatedAt, + replyUpdatedAt: cursor.replyUpdatedAt, + take: limit ? limit + 1 : undefined, + } + ); + const pageChanges = limit ? changes.slice(0, limit) : changes; + const endCursor = cursor; + for (const change of pageChanges) { + if (change.commentId) { + endCursor.replyUpdatedAt = change.item.updatedAt; + } else { + endCursor.commentUpdatedAt = change.item.updatedAt; + } } - ); - const endCursor = cursor; - for (const change of changes) { - if (change.commentId) { - endCursor.replyUpdatedAt = change.item.updatedAt; - } else { - endCursor.commentUpdatedAt = change.item.updatedAt; - } - } - return { - changes: changes.map(change => ({ - id: change.id, - action: change.action, - item: change.item, - commentId: change.commentId ?? undefined, - })) as never, - startCursor: '', - endCursor: encodeWithJson(endCursor), - hasNextPage: changes.length > 0, - }; + return { + changes: pageChanges.map(change => ({ + id: change.id, + action: change.action, + item: change.item, + commentId: change.commentId ?? null, + })), + startCursor: '', + endCursor: encodeWithJson(endCursor), + hasNextPage: limit ? changes.length > limit : false, + }; + }, }, - }); - - this.registry?.registerTopic({ - name: 'comment.changed', - input: z.object({ - workspaceId: z.string(), - docId: z.string(), - }), - authorize: async (user, payload) => { - await this.assertRead(user.id, payload.workspaceId, payload.docId); + topic: { + name: 'comment.changed', + input: z.object({ + workspaceId: z.string(), + docId: z.string(), + }), + authorize: async (user, payload) => { + await this.assertRead(user.id, payload.workspaceId, payload.docId); + }, + room: (_user, payload) => + commentRoom(payload.workspaceId, payload.docId), }, - room: (_user, payload) => commentRoom(payload.workspaceId, payload.docId), }); } diff --git a/packages/backend/server/src/core/notification/realtime-room.ts b/packages/backend/server/src/core/notification/realtime-room.ts deleted file mode 100644 index de24be2033..0000000000 --- a/packages/backend/server/src/core/notification/realtime-room.ts +++ /dev/null @@ -1,3 +0,0 @@ -export function notificationCountRoom(userId: string) { - return `user:${userId}:notification`; -} diff --git a/packages/backend/server/src/core/notification/realtime.ts b/packages/backend/server/src/core/notification/realtime.ts index 79e77c0822..9cda7bfa1f 100644 --- a/packages/backend/server/src/core/notification/realtime.ts +++ b/packages/backend/server/src/core/notification/realtime.ts @@ -1,8 +1,11 @@ import { Injectable, OnModuleInit, Optional } from '@nestjs/common'; import { z } from 'zod'; -import type { RealtimeRegistry } from '../realtime'; -import { notificationCountRoom } from './realtime-room'; +import { + realtimeNotificationRoom, + type RealtimeRegistry, + registerRealtimeLiveQuery, +} from '../realtime'; import { NotificationService } from './service'; @Injectable() @@ -13,23 +16,25 @@ export class NotificationRealtimeProvider implements OnModuleInit { ) {} onModuleInit() { - this.registry?.registerRequest({ - name: 'notification.count.get', - input: z.object({}).strict(), - handle: async user => ({ - count: await this.service.countByUserId(user.id), - }), - }); - - this.registry?.registerTopic({ - name: 'notification.count.changed', - input: z.object({}).strict(), - authorize: async () => {}, - room: user => { - if (!user) { - throw new Error('User is required for notification count room'); - } - return notificationCountRoom(user.id); + const input = z.object({}).strict(); + registerRealtimeLiveQuery(this.registry, { + request: { + name: 'notification.count.get', + input, + handle: async user => ({ + count: await this.service.countByUserId(user.id), + }), + }, + topic: { + name: 'notification.count.changed', + input, + authorize: async () => {}, + room: user => { + if (!user) { + throw new Error('User is required for notification count room'); + } + return realtimeNotificationRoom(user.id); + }, }, }); } diff --git a/packages/backend/server/src/core/notification/service.ts b/packages/backend/server/src/core/notification/service.ts index 439d7b282c..131a9e79ec 100644 --- a/packages/backend/server/src/core/notification/service.ts +++ b/packages/backend/server/src/core/notification/service.ts @@ -18,12 +18,12 @@ import { import { DocReader } from '../doc'; import { Mailer } from '../mail'; import type { RealtimePublisher } from '../realtime'; +import { realtimeNotificationRoom } from '../realtime'; import { generateDocPath } from '../utils/doc'; import { generateWorkspaceSettingsPath, WorkspaceSettingsTab, } from '../utils/workspace'; -import { notificationCountRoom } from './realtime-room'; @Injectable() export class NotificationService { @@ -499,7 +499,7 @@ export class NotificationService { 'notification.count.changed', {}, { count: await this.countByUserId(userId), reason }, - { room: notificationCountRoom(userId) } + { room: realtimeNotificationRoom(userId) } ); } catch (error) { this.logger.error( diff --git a/packages/backend/server/src/core/realtime/__tests__/registry.spec.ts b/packages/backend/server/src/core/realtime/__tests__/registry.spec.ts index b63b0186f1..e89039ff6b 100644 --- a/packages/backend/server/src/core/realtime/__tests__/registry.spec.ts +++ b/packages/backend/server/src/core/realtime/__tests__/registry.spec.ts @@ -1,11 +1,18 @@ +import { getRealtimeInputKey } from '@affine/realtime'; import test from 'ava'; import { z } from 'zod'; import type { CurrentUser } from '../../auth'; import { RealtimeGateway } from '../gateway'; -import type { RealtimePublisher } from '../publisher'; +import { + realtimeCommentRoom, + realtimeNotificationRoom, + realtimeTranscriptTaskRoom, + realtimeWorkspaceEmbeddingProgressRoom, + registerRealtimeLiveQuery, +} from '../index'; +import { RealtimePublisher } from '../publisher'; import { RealtimeRegistry } from '../registry'; -import { stableStringify } from '../stable-stringify'; const user: CurrentUser = { id: 'u1', @@ -109,7 +116,7 @@ test('gateway authorizes subscription and joins room', async t => { t.deepEqual(joined, ['workspace:space']); t.deepEqual(result, { data: { - subscriptionId: `socket-1:comment.changed:${stableStringify({ + subscriptionId: `socket-1:comment.changed:${getRealtimeInputKey({ workspaceId: 'space', docId: 'doc', })}`, @@ -126,22 +133,101 @@ test('gateway authorizes subscription and joins room', async t => { ); }); -test('stableStringify is deterministic for subscription input keys', t => { +test('getRealtimeInputKey is deterministic for subscription input keys', t => { t.is( - stableStringify({ docId: 'doc', workspaceId: 'space' }), - stableStringify({ workspaceId: 'space', docId: 'doc' }) + getRealtimeInputKey({ docId: 'doc', workspaceId: 'space' }), + getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' }) ); }); -test('stableStringify follows JSON semantics for subscription input keys', t => { - t.is(stableStringify({ after: undefined }), stableStringify({})); - t.is(stableStringify([undefined]), '[null]'); +test('getRealtimeInputKey follows JSON semantics for subscription input keys', t => { + t.is(getRealtimeInputKey({ after: undefined }), getRealtimeInputKey({})); + t.is(getRealtimeInputKey([undefined]), '[null]'); t.is( - stableStringify(new Date('2026-01-02T03:04:05.000Z')), + getRealtimeInputKey(new Date('2026-01-02T03:04:05.000Z')), '"2026-01-02T03:04:05.000Z"' ); }); +test('room helpers produce stable realtime room names', t => { + t.is(realtimeNotificationRoom('u1'), 'user:u1:notification'); + t.is(realtimeCommentRoom('space', 'doc'), 'workspace:space:doc:doc:comment'); + t.is( + realtimeWorkspaceEmbeddingProgressRoom('space'), + 'workspace:space:embedding-progress' + ); + t.is( + realtimeTranscriptTaskRoom('space', 'task'), + 'copilot:transcript:space:task' + ); +}); + +test('registerRealtimeLiveQuery registers paired request and topic handlers', async t => { + const registry = new RealtimeRegistry(); + + registerRealtimeLiveQuery(registry, { + request: { + name: 'notification.count.get', + input: z.object({}).strict(), + handle: async () => ({ count: 7 }), + }, + topic: { + name: 'notification.count.changed', + input: z.object({}).strict(), + authorize: async () => {}, + room: currentUser => `user:${currentUser?.id}:notification`, + }, + }); + + t.deepEqual( + await registry.getRequest('notification.count.get').handle(user, {}), + { + count: 7, + } + ); + t.is( + registry.getTopic('notification.count.changed').room(user, {}), + 'user:u1:notification' + ); +}); + +test('publisher emits realtime event with shared input key', t => { + const registry = new RealtimeRegistry(); + registry.registerTopic({ + name: 'comment.changed', + input: z.object({ workspaceId: z.string(), docId: z.string() }), + authorize: async () => {}, + room: (_currentUser, input) => + realtimeCommentRoom(input.workspaceId, input.docId), + }); + const emitted: unknown[] = []; + const publisher = new RealtimePublisher(registry, { + broadcast: () => {}, + } as never); + publisher.attachServer({ + to: (room: string) => ({ + emit: (event: string, payload: unknown) => + emitted.push({ room, event, payload }), + }), + } as never); + + publisher.publishLocal({ + topic: 'comment.changed', + input: { docId: 'doc', workspaceId: 'space' }, + event: { changed: true }, + }); + + t.like(emitted[0], { + room: 'workspace:space:doc:doc:comment', + event: 'realtime:event', + payload: { + topic: 'comment.changed', + inputKey: getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' }), + event: { changed: true }, + }, + }); +}); + test('gateway removes subscriptions on socket disconnect', async t => { const registry = new RealtimeRegistry(); registry.registerTopic({ diff --git a/packages/backend/server/src/core/realtime/gateway.ts b/packages/backend/server/src/core/realtime/gateway.ts index bf04d16dac..ce399f1d19 100644 --- a/packages/backend/server/src/core/realtime/gateway.ts +++ b/packages/backend/server/src/core/realtime/gateway.ts @@ -3,6 +3,7 @@ import type { RealtimeSubscribeEnvelope, RealtimeUnsubscribeEnvelope, } from '@affine/realtime'; +import { getRealtimeInputKey } from '@affine/realtime'; import { applyDecorators, Logger, UseInterceptors } from '@nestjs/common'; import { ConnectedSocket, @@ -25,7 +26,6 @@ import { import { CurrentUser } from '../auth'; import { RealtimePublisher } from './publisher'; import { RealtimeRegistry } from './registry'; -import { stableStringify } from './stable-stringify'; import type { RealtimePublishPayload } from './types'; const SubscribeMessage = (event: string) => @@ -87,7 +87,7 @@ export class RealtimeGateway implements OnGatewayInit, OnGatewayDisconnect { await handler.authorize(user, input as never); const room = handler.room(user, input as never); await client.join(room); - const subscriptionId = `${client.id}:${envelope.topic}:${stableStringify(input)}`; + const subscriptionId = `${client.id}:${envelope.topic}:${getRealtimeInputKey(input)}`; this.subscriptions.set(subscriptionId, { socketId: client.id, room, diff --git a/packages/backend/server/src/core/realtime/index.ts b/packages/backend/server/src/core/realtime/index.ts index 6b96b4546c..d21afcdfc0 100644 --- a/packages/backend/server/src/core/realtime/index.ts +++ b/packages/backend/server/src/core/realtime/index.ts @@ -11,6 +11,16 @@ import { RealtimeRegistry } from './registry'; }) export class RealtimeModule {} +export { registerRealtimeLiveQuery } from './provider'; export { RealtimePublisher } from './publisher'; export { RealtimeRegistry } from './registry'; +export { + realtimeCommentRoom, + realtimeNotificationRoom, + realtimeTranscriptTaskRoom, + realtimeUserRoom, + realtimeWorkspaceDocRoom, + realtimeWorkspaceEmbeddingProgressRoom, + realtimeWorkspaceRoom, +} from './rooms'; export type { RealtimeRequestHandler, RealtimeTopicHandler } from './types'; diff --git a/packages/backend/server/src/core/realtime/provider.ts b/packages/backend/server/src/core/realtime/provider.ts new file mode 100644 index 0000000000..ff7f855dd1 --- /dev/null +++ b/packages/backend/server/src/core/realtime/provider.ts @@ -0,0 +1,23 @@ +import type { RealtimeRequestName, RealtimeTopicName } from '@affine/realtime'; + +import type { RealtimeRegistry } from './registry'; +import type { RealtimeRequestHandler, RealtimeTopicHandler } from './types'; + +export type RealtimeLiveQueryDefinition< + Request extends RealtimeRequestName, + Topic extends RealtimeTopicName, +> = { + request: RealtimeRequestHandler; + topic: RealtimeTopicHandler; +}; + +export function registerRealtimeLiveQuery< + Request extends RealtimeRequestName, + Topic extends RealtimeTopicName, +>( + registry: RealtimeRegistry | undefined, + definition: RealtimeLiveQueryDefinition +) { + registry?.registerRequest(definition.request); + registry?.registerTopic(definition.topic); +} diff --git a/packages/backend/server/src/core/realtime/publisher.ts b/packages/backend/server/src/core/realtime/publisher.ts index 8011dcb655..571f0134a5 100644 --- a/packages/backend/server/src/core/realtime/publisher.ts +++ b/packages/backend/server/src/core/realtime/publisher.ts @@ -1,10 +1,13 @@ -import type { RealtimeEvent, RealtimeTopicName } from '@affine/realtime'; +import { + getRealtimeInputKey, + type RealtimeEvent, + type RealtimeTopicName, +} from '@affine/realtime'; import { Injectable, Logger } from '@nestjs/common'; import type { Server } from 'socket.io'; import { EventBus } from '../../base'; import { RealtimeRegistry } from './registry'; -import { stableStringify } from './stable-stringify'; import type { RealtimePublishPayload } from './types'; @Injectable() @@ -46,7 +49,7 @@ export class RealtimePublisher { const room = payload.room ?? handler.room(null, payload.input as never); const envelope: RealtimeEvent = { topic: payload.topic, - inputKey: stableStringify(payload.input), + inputKey: getRealtimeInputKey(payload.input), sentAt: Date.now(), event: payload.event as never, }; diff --git a/packages/backend/server/src/core/realtime/rooms.ts b/packages/backend/server/src/core/realtime/rooms.ts new file mode 100644 index 0000000000..8a3e00f117 --- /dev/null +++ b/packages/backend/server/src/core/realtime/rooms.ts @@ -0,0 +1,34 @@ +export function realtimeUserRoom(userId: string, scope: string) { + return `user:${userId}:${scope}`; +} + +export function realtimeWorkspaceRoom(workspaceId: string, scope: string) { + return `workspace:${workspaceId}:${scope}`; +} + +export function realtimeWorkspaceDocRoom( + workspaceId: string, + docId: string, + scope: string +) { + return `workspace:${workspaceId}:doc:${docId}:${scope}`; +} + +export function realtimeTranscriptTaskRoom( + workspaceId: string, + taskId: string +) { + return `copilot:transcript:${workspaceId}:${taskId}`; +} + +export function realtimeNotificationRoom(userId: string) { + return realtimeUserRoom(userId, 'notification'); +} + +export function realtimeCommentRoom(workspaceId: string, docId: string) { + return realtimeWorkspaceDocRoom(workspaceId, docId, 'comment'); +} + +export function realtimeWorkspaceEmbeddingProgressRoom(workspaceId: string) { + return realtimeWorkspaceRoom(workspaceId, 'embedding-progress'); +} diff --git a/packages/backend/server/src/core/realtime/stable-stringify.ts b/packages/backend/server/src/core/realtime/stable-stringify.ts deleted file mode 100644 index aaaa7f1b5a..0000000000 --- a/packages/backend/server/src/core/realtime/stable-stringify.ts +++ /dev/null @@ -1,31 +0,0 @@ -export function stableStringify(value: unknown): string { - if ( - value === undefined || - typeof value === 'function' || - typeof value === 'symbol' - ) { - return 'null'; - } - if (value === null || typeof value !== 'object') { - return JSON.stringify(value); - } - if (Array.isArray(value)) { - return `[${value.map(stableStringify).join(',')}]`; - } - if (value instanceof Date) { - return JSON.stringify(value.toJSON()); - } - const record = value as Record; - return `{${Object.keys(record) - .filter(key => { - const property = record[key]; - return ( - property !== undefined && - typeof property !== 'function' && - typeof property !== 'symbol' - ); - }) - .sort() - .map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`) - .join(',')}}`; -} diff --git a/packages/backend/server/src/plugins/copilot/context/realtime.ts b/packages/backend/server/src/plugins/copilot/context/realtime.ts index 4e55a356ce..27a835e45a 100644 --- a/packages/backend/server/src/plugins/copilot/context/realtime.ts +++ b/packages/backend/server/src/plugins/copilot/context/realtime.ts @@ -3,15 +3,17 @@ import { z } from 'zod'; import { OnEvent } from '../../../base'; import { AccessController } from '../../../core/permission'; -import type { +import { RealtimePublisher, RealtimeRegistry, + realtimeWorkspaceEmbeddingProgressRoom, + registerRealtimeLiveQuery, } from '../../../core/realtime'; import { Models } from '../../../models'; import { CopilotContextService } from './service'; export function workspaceEmbeddingRoom(workspaceId: string) { - return `workspace:${workspaceId}:embedding-progress`; + return realtimeWorkspaceEmbeddingProgressRoom(workspaceId); } @Injectable() @@ -27,27 +29,28 @@ export class CopilotEmbeddingRealtimeProvider implements OnModuleInit { onModuleInit() { const input = z.object({ workspaceId: z.string() }); - this.registry?.registerRequest({ - name: 'workspace.embedding.progress.get', - input, - handle: async (user, payload) => { - await this.assertCopilot(user.id, payload.workspaceId); - if (!this.context.canEmbedding) { - return { total: 0, embedded: 0 }; - } - return await this.models.copilotWorkspace.getEmbeddingStatus( - payload.workspaceId - ); + registerRealtimeLiveQuery(this.registry, { + request: { + name: 'workspace.embedding.progress.get', + input, + handle: async (user, payload) => { + await this.assertCopilot(user.id, payload.workspaceId); + if (!this.context.canEmbedding) { + return { total: 0, embedded: 0 }; + } + return await this.models.copilotWorkspace.getEmbeddingStatus( + payload.workspaceId + ); + }, }, - }); - - this.registry?.registerTopic({ - name: 'workspace.embedding.progress.changed', - input, - authorize: async (user, payload) => { - await this.assertCopilot(user.id, payload.workspaceId); + topic: { + name: 'workspace.embedding.progress.changed', + input, + authorize: async (user, payload) => { + await this.assertCopilot(user.id, payload.workspaceId); + }, + room: (_user, payload) => workspaceEmbeddingRoom(payload.workspaceId), }, - room: (_user, payload) => workspaceEmbeddingRoom(payload.workspaceId), }); } diff --git a/packages/backend/server/src/plugins/copilot/transcript/realtime.ts b/packages/backend/server/src/plugins/copilot/transcript/realtime.ts index 17bcc05ba5..98a006f7bf 100644 --- a/packages/backend/server/src/plugins/copilot/transcript/realtime.ts +++ b/packages/backend/server/src/plugins/copilot/transcript/realtime.ts @@ -3,8 +3,12 @@ import { z } from 'zod'; import { CopilotTranscriptionJobNotFound } from '../../../base'; import { AccessController } from '../../../core/permission'; -import type { RealtimeRegistry } from '../../../core/realtime'; -import { CopilotTranscriptionService, transcriptTaskRoom } from './service'; +import { + type RealtimeRegistry, + realtimeTranscriptTaskRoom, + registerRealtimeLiveQuery, +} from '../../../core/realtime'; +import { CopilotTranscriptionService } from './service'; @Injectable() export class CopilotTranscriptRealtimeProvider implements OnModuleInit { @@ -15,47 +19,51 @@ export class CopilotTranscriptRealtimeProvider implements OnModuleInit { ) {} onModuleInit() { - this.registry?.registerRequest({ - name: 'copilot.transcript.task.get', - input: z - .object({ - workspaceId: z.string(), - blobId: z.string().optional(), - taskId: z.string().optional(), - }) - .refine(input => input.blobId || input.taskId), - handle: async (user, input) => { - await this.assertCopilot(user.id, input.workspaceId); - return { - task: await this.transcript.queryTask( - user.id, - input.workspaceId, - input.taskId, - input.blobId - ), - }; - }, + const requestInput = z + .object({ + workspaceId: z.string(), + blobId: z.string().optional(), + taskId: z.string().optional(), + }) + .refine(input => input.blobId || input.taskId); + const topicInput = z.object({ + workspaceId: z.string(), + taskId: z.string(), }); - this.registry?.registerTopic({ - name: 'copilot.transcript.task.changed', - input: z.object({ - workspaceId: z.string(), - taskId: z.string(), - }), - authorize: async (user, input) => { - await this.assertCopilot(user.id, input.workspaceId); - const task = await this.transcript.queryTask( - user.id, - input.workspaceId, - input.taskId - ); - if (!task) { - throw new CopilotTranscriptionJobNotFound(); - } + registerRealtimeLiveQuery(this.registry, { + request: { + name: 'copilot.transcript.task.get', + input: requestInput, + handle: async (user, input) => { + await this.assertCopilot(user.id, input.workspaceId); + return { + task: await this.transcript.queryTask( + user.id, + input.workspaceId, + input.taskId, + input.blobId + ), + }; + }, + }, + topic: { + name: 'copilot.transcript.task.changed', + input: topicInput, + authorize: async (user, input) => { + await this.assertCopilot(user.id, input.workspaceId); + const task = await this.transcript.queryTask( + user.id, + input.workspaceId, + input.taskId + ); + if (!task) { + throw new CopilotTranscriptionJobNotFound(); + } + }, + room: (_user, input) => + realtimeTranscriptTaskRoom(input.workspaceId, input.taskId), }, - room: (_user, input) => - transcriptTaskRoom(input.workspaceId, input.taskId), }); } diff --git a/packages/backend/server/src/plugins/copilot/transcript/service.ts b/packages/backend/server/src/plugins/copilot/transcript/service.ts index af509ec5a1..eb1d685f3d 100644 --- a/packages/backend/server/src/plugins/copilot/transcript/service.ts +++ b/packages/backend/server/src/plugins/copilot/transcript/service.ts @@ -9,7 +9,10 @@ import { OnJob, sniffMime, } from '../../../base'; -import type { RealtimePublisher } from '../../../core/realtime'; +import { + type RealtimePublisher, + realtimeTranscriptTaskRoom, +} from '../../../core/realtime'; import { Models } from '../../../models'; import { CopilotAccessPolicy } from '../access'; import { PromptService } from '../prompt'; @@ -33,10 +36,6 @@ const TRANSCRIPT_ACTION_ID = 'transcript.audio.gemini'; const TRANSCRIPT_ACTION_VERSION = 'v1'; const TRANSCRIPT_STRATEGY = 'gemini'; -export function transcriptTaskRoom(workspaceId: string, taskId: string) { - return `copilot:transcript:${workspaceId}:${taskId}`; -} - export type TranscriptionJob = { id: string; status: AiJobStatus; @@ -468,7 +467,7 @@ export class CopilotTranscriptionService { 'copilot.transcript.task.changed', { workspaceId, taskId }, { taskId, status, error }, - { room: transcriptTaskRoom(workspaceId, taskId) } + { room: realtimeTranscriptTaskRoom(workspaceId, taskId) } ); } } diff --git a/packages/common/nbstore/src/realtime/__tests__/manager.spec.ts b/packages/common/nbstore/src/realtime/__tests__/manager.spec.ts index 81074b8fdb..a4c4f306c1 100644 --- a/packages/common/nbstore/src/realtime/__tests__/manager.spec.ts +++ b/packages/common/nbstore/src/realtime/__tests__/manager.spec.ts @@ -1,7 +1,7 @@ -import type { RealtimeEvent } from '@affine/realtime'; +import { getRealtimeInputKey, type RealtimeEvent } from '@affine/realtime'; import { beforeEach, expect, test, vi } from 'vitest'; -import { RealtimeManager, stableStringify } from '../manager'; +import { RealtimeManager } from '../manager'; type Handler = (payload?: unknown) => void; @@ -70,16 +70,16 @@ beforeEach(() => { socket.disconnected = false; }); -test('stableStringify is deterministic for realtime subscription inputs', () => { - expect(stableStringify({ workspaceId: 'space', docId: 'doc' })).toBe( - stableStringify({ docId: 'doc', workspaceId: 'space' }) +test('getRealtimeInputKey is deterministic for realtime subscription inputs', () => { + expect(getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' })).toBe( + getRealtimeInputKey({ docId: 'doc', workspaceId: 'space' }) ); }); -test('stableStringify follows JSON semantics for edge values', () => { - expect(stableStringify({ a: undefined })).toBe(stableStringify({})); - expect(stableStringify([undefined])).toBe('[null]'); - expect(stableStringify(new Date('2026-01-02T03:04:05.000Z'))).toBe( +test('getRealtimeInputKey follows JSON semantics for edge values', () => { + expect(getRealtimeInputKey({ a: undefined })).toBe(getRealtimeInputKey({})); + expect(getRealtimeInputKey([undefined])).toBe('[null]'); + expect(getRealtimeInputKey(new Date('2026-01-02T03:04:05.000Z'))).toBe( '"2026-01-02T03:04:05.000Z"' ); }); @@ -159,13 +159,13 @@ test('subscribe routes events by topic and stable input key', async () => { socket.emit('realtime:event', { topic: 'comment.changed', - inputKey: stableStringify({ workspaceId: 'space', docId: 'other' }), + inputKey: getRealtimeInputKey({ workspaceId: 'space', docId: 'other' }), sentAt: 1, event: { changed: true }, } satisfies RealtimeEvent); socket.emit('realtime:event', { topic: 'comment.changed', - inputKey: stableStringify({ workspaceId: 'space', docId: 'doc' }), + inputKey: getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' }), sentAt: 2, event: { changed: true }, } satisfies RealtimeEvent); diff --git a/packages/common/nbstore/src/realtime/index.ts b/packages/common/nbstore/src/realtime/index.ts index 12a9bc210c..c2ad84dbdf 100644 --- a/packages/common/nbstore/src/realtime/index.ts +++ b/packages/common/nbstore/src/realtime/index.ts @@ -1 +1 @@ -export { RealtimeManager, stableStringify } from './manager'; +export { RealtimeManager } from './manager'; diff --git a/packages/common/nbstore/src/realtime/manager.ts b/packages/common/nbstore/src/realtime/manager.ts index b706a4c29a..502e17c2af 100644 --- a/packages/common/nbstore/src/realtime/manager.ts +++ b/packages/common/nbstore/src/realtime/manager.ts @@ -10,6 +10,7 @@ import type { RealtimeTopicInputOf, RealtimeTopicName, } from '@affine/realtime'; +import { getRealtimeInputKey } from '@affine/realtime'; import { Observable, Subject } from 'rxjs'; import { SocketConnection } from '../impls/cloud/socket'; @@ -151,7 +152,7 @@ export class RealtimeManager { this.subscriptions.set(subscriptionId, { topic, input, - inputKey: stableStringify(input), + inputKey: getRealtimeInputKey(input), subject$, }); subscriber.next({ @@ -302,35 +303,3 @@ export class RealtimeManager { this.subscriptions.clear(); } } - -export function stableStringify(value: unknown): string { - if ( - value === undefined || - typeof value === 'function' || - typeof value === 'symbol' - ) { - return 'null'; - } - if (value === null || typeof value !== 'object') { - return JSON.stringify(value); - } - if (Array.isArray(value)) { - return `[${value.map(stableStringify).join(',')}]`; - } - if (value instanceof Date) { - return JSON.stringify(value.toJSON()); - } - const record = value as Record; - return `{${Object.keys(record) - .filter(key => { - const property = record[key]; - return ( - property !== undefined && - typeof property !== 'function' && - typeof property !== 'symbol' - ); - }) - .sort() - .map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`) - .join(',')}}`; -} diff --git a/packages/frontend/core/src/modules/cloud/index.ts b/packages/frontend/core/src/modules/cloud/index.ts index d6bc7cc00c..ec60596ecd 100644 --- a/packages/frontend/core/src/modules/cloud/index.ts +++ b/packages/frontend/core/src/modules/cloud/index.ts @@ -6,6 +6,11 @@ export { AccountLoggedIn } from './events/account-logged-in'; export { AccountLoggedOut } from './events/account-logged-out'; export { AuthProvider } from './provider/auth'; export { ValidatorProvider } from './provider/validator'; +export { + RealtimeLiveQuery, + type RealtimeLiveQueryEventResult, + type RealtimeLiveQueryOptions, +} from './realtime/live-query'; export { ServerScope } from './scopes/server'; export { AccessTokenService } from './services/access-token'; export { AuthService } from './services/auth'; diff --git a/packages/frontend/core/src/modules/cloud/realtime/live-query.spec.ts b/packages/frontend/core/src/modules/cloud/realtime/live-query.spec.ts new file mode 100644 index 0000000000..efb655e695 --- /dev/null +++ b/packages/frontend/core/src/modules/cloud/realtime/live-query.spec.ts @@ -0,0 +1,156 @@ +import { Subject } from 'rxjs'; +import { describe, expect, test, vi } from 'vitest'; + +import { RealtimeLiveQuery } from './live-query'; + +describe('RealtimeLiveQuery', () => { + test('requests snapshot when subscription is ready', async () => { + const events$ = new Subject<{ type: 'ready' } | { count: number }>(); + const applySnapshot = vi.fn(); + const query = new RealtimeLiveQuery({ + request: vi.fn().mockResolvedValue({ count: 1 }), + subscribe: () => events$, + applySnapshot, + }); + + query.start(); + events$.next({ type: 'ready' }); + + await vi.waitFor(() => + expect(applySnapshot).toHaveBeenCalledWith({ count: 1 }) + ); + query.dispose(); + }); + + test('applies event without revalidating when applyEvent returns applied', async () => { + const events$ = new Subject<{ type: 'ready' } | { count: number }>(); + const request = vi.fn().mockResolvedValue({ count: 1 }); + const applyEvent = vi.fn().mockReturnValue('applied'); + const query = new RealtimeLiveQuery({ + request, + subscribe: () => events$, + applySnapshot: vi.fn(), + applyEvent, + }); + + query.start(); + events$.next({ count: 2 }); + + expect(applyEvent).toHaveBeenCalledWith({ count: 2 }); + expect(request).not.toHaveBeenCalled(); + query.dispose(); + }); + + test('applies business events that include a type field', () => { + const events$ = new Subject< + { type: 'ready' } | { type: 'updated'; count: number } + >(); + const request = vi.fn().mockResolvedValue({ count: 1 }); + const applyEvent = vi.fn().mockReturnValue('applied'); + const query = new RealtimeLiveQuery({ + request, + subscribe: () => events$, + applySnapshot: vi.fn(), + applyEvent, + }); + + query.start(); + events$.next({ type: 'updated', count: 2 }); + + expect(applyEvent).toHaveBeenCalledWith({ type: 'updated', count: 2 }); + expect(request).not.toHaveBeenCalled(); + query.dispose(); + }); + + test('revalidates event when applyEvent asks for it', async () => { + const events$ = new Subject<{ type: 'ready' } | { changed: true }>(); + const applySnapshot = vi.fn(); + const query = new RealtimeLiveQuery({ + request: vi.fn().mockResolvedValue({ changes: [] }), + subscribe: () => events$, + applySnapshot, + applyEvent: () => 'revalidate', + }); + + query.start(); + events$.next({ changed: true }); + + await vi.waitFor(() => + expect(applySnapshot).toHaveBeenCalledWith({ changes: [] }) + ); + query.dispose(); + }); + + test('passes subscription and request errors to onError', async () => { + const events$ = new Subject<{ type: 'ready' }>(); + const onError = vi.fn(); + const requestError = new Error('request failed'); + const query = new RealtimeLiveQuery({ + request: vi.fn().mockRejectedValue(requestError), + subscribe: () => events$, + applySnapshot: vi.fn(), + onError, + }); + + query.start(); + events$.next({ type: 'ready' }); + await vi.waitFor(() => expect(onError).toHaveBeenCalledWith(requestError)); + + const subscriptionError = new Error('subscribe failed'); + events$.error(subscriptionError); + expect(onError).toHaveBeenCalledWith(subscriptionError); + query.dispose(); + }); + + test('dispose aborts in-flight request and ignores stale result', async () => { + const events$ = new Subject<{ type: 'ready' }>(); + const applySnapshot = vi.fn(); + let resolveRequest: (value: { count: number }) => void = () => {}; + const query = new RealtimeLiveQuery({ + request: vi.fn( + () => + new Promise<{ count: number }>(resolve => { + resolveRequest = resolve; + }) + ), + subscribe: () => events$, + applySnapshot, + }); + + query.start(); + events$.next({ type: 'ready' }); + query.dispose(); + resolveRequest({ count: 1 }); + + await Promise.resolve(); + expect(applySnapshot).not.toHaveBeenCalled(); + }); + + test('new request supersedes older in-flight result', async () => { + const events$ = new Subject<{ type: 'ready' }>(); + const applySnapshot = vi.fn(); + const resolvers: Array<(value: { count: number }) => void> = []; + const query = new RealtimeLiveQuery({ + request: vi.fn( + () => + new Promise<{ count: number }>(resolve => { + resolvers.push(resolve); + }) + ), + subscribe: () => events$, + applySnapshot, + }); + + query.start(); + events$.next({ type: 'ready' }); + events$.next({ type: 'ready' }); + resolvers[0]({ count: 1 }); + resolvers[1]({ count: 2 }); + + await vi.waitFor(() => + expect(applySnapshot).toHaveBeenCalledWith({ count: 2 }) + ); + expect(applySnapshot).toHaveBeenCalledTimes(1); + query.dispose(); + }); +}); diff --git a/packages/frontend/core/src/modules/cloud/realtime/live-query.ts b/packages/frontend/core/src/modules/cloud/realtime/live-query.ts new file mode 100644 index 0000000000..ba30b53358 --- /dev/null +++ b/packages/frontend/core/src/modules/cloud/realtime/live-query.ts @@ -0,0 +1,100 @@ +import type { RealtimeSubscriptionReady } from '@affine/realtime'; +import type { Observable, Subscription } from 'rxjs'; + +export type RealtimeLiveQueryEventResult = 'applied' | 'revalidate'; + +export type RealtimeLiveQueryOptions = { + request: (signal: AbortSignal) => Promise; + subscribe: () => Observable; + applySnapshot: (snapshot: TSnapshot) => void; + applyEvent?: (event: TEvent) => RealtimeLiveQueryEventResult; + onError?: (error: unknown) => void; +}; + +function isReadyEvent( + event: TEvent | RealtimeSubscriptionReady +): event is RealtimeSubscriptionReady { + return 'type' in event && event.type === 'ready'; +} + +export class RealtimeLiveQuery { + private subscription?: Subscription; + private requestController?: AbortController; + private generation = 0; + private started = false; + + constructor( + private readonly options: RealtimeLiveQueryOptions + ) {} + + start() { + this.stop(); + this.started = true; + const generation = this.generation; + this.subscription = this.options.subscribe().subscribe({ + next: event => { + if (isReadyEvent(event)) { + this.revalidate(); + return; + } + if (!this.options.applyEvent) { + this.revalidate(); + return; + } + if (this.options.applyEvent(event) === 'revalidate') { + this.revalidate(); + } + }, + error: error => { + if (this.generation === generation) { + this.options.onError?.(error); + } + }, + }); + } + + revalidate() { + if (!this.started) { + return; + } + this.requestController?.abort(); + const controller = new AbortController(); + this.requestController = controller; + const generation = this.generation; + this.options.request(controller.signal).then( + snapshot => { + if ( + this.started && + this.generation === generation && + this.requestController === controller && + !controller.signal.aborted + ) { + this.options.applySnapshot(snapshot); + } + }, + error => { + if ( + this.started && + this.generation === generation && + this.requestController === controller && + !controller.signal.aborted + ) { + this.options.onError?.(error); + } + } + ); + } + + stop() { + this.started = false; + this.generation += 1; + this.subscription?.unsubscribe(); + this.subscription = undefined; + this.requestController?.abort(); + this.requestController = undefined; + } + + dispose() { + this.stop(); + } +} diff --git a/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts b/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts index e133a0f6ed..d324eeef41 100644 --- a/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts +++ b/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts @@ -11,6 +11,10 @@ import { updateReplyMutation, uploadCommentAttachmentMutation, } from '@affine/graphql'; +import type { + RealtimeSubscriptionReady, + RealtimeTopicEventOf, +} from '@affine/realtime'; import { Entity } from '@toeverything/infra'; import type { Observable } from 'rxjs'; @@ -144,11 +148,11 @@ export class DocCommentStore extends Entity<{ { after, workspaceId: this.currentWorkspaceId, docId: this.props.docId } ); return { - changes: commentChanges.changes.map((change: any) => ({ + changes: commentChanges.changes.map(change => ({ id: change.id, action: change.action, comment: normalizeComment(change.item as GQLCommentType), - commentId: change.commentId, + commentId: change.commentId ?? undefined, })), startCursor: commentChanges.startCursor, endCursor: commentChanges.endCursor, @@ -156,7 +160,9 @@ export class DocCommentStore extends Entity<{ }; } - subscribeCommentChanged(): Observable { + subscribeCommentChanged(): Observable< + RealtimeTopicEventOf<'comment.changed'> | RealtimeSubscriptionReady + > { return this.nbstoreService.realtime.subscribe('comment.changed', { workspaceId: this.currentWorkspaceId, docId: this.props.docId, diff --git a/packages/frontend/core/src/modules/comment/entities/doc-comment.ts b/packages/frontend/core/src/modules/comment/entities/doc-comment.ts index 92fe6a3ce2..6e40a6b2e2 100644 --- a/packages/frontend/core/src/modules/comment/entities/doc-comment.ts +++ b/packages/frontend/core/src/modules/comment/entities/doc-comment.ts @@ -16,17 +16,9 @@ import { onStart, } from '@toeverything/infra'; import { nanoid } from 'nanoid'; -import { - catchError, - filter, - first, - of, - Subject, - type Subscription, - switchMap, - tap, -} from 'rxjs'; +import { catchError, filter, first, of, Subject, switchMap, tap } from 'rxjs'; +import { RealtimeLiveQuery } from '../../cloud/realtime/live-query'; import { type DocDisplayMetaService } from '../../doc-display-meta'; import { GlobalContextService } from '../../global-context'; import type { SnapshotHelper } from '../services/snapshot-helper'; @@ -93,11 +85,19 @@ export class DocCommentEntity extends Entity<{ private readonly commentDeleted$ = new Subject(); readonly commentHighlighted$ = new LiveData(null); - private realtimeDisposable?: DisposeCallback; - private realtimeSubscription?: Subscription; + private readonly liveQuery = new RealtimeLiveQuery({ + request: () => this.store.listCommentChanges({ after: this.startCursor }), + subscribe: () => this.store.subscribeCommentChanged(), + applySnapshot: result => { + this.handleCommentChanges(result); + this.startCursor = result.endCursor; + }, + onError: error => { + console.error('Failed to sync comment changes:', error); + }, + }); private startCursor?: string; - private fetchingCommentChanges = false; - private pendingCommentChangeFetch = false; + private startVersion = 0; async addComment( selections?: BaseSelection[], @@ -490,52 +490,34 @@ export class DocCommentEntity extends Entity<{ } start(): void { - if (this.realtimeDisposable) { - this.realtimeDisposable(); - } - this.realtimeSubscription?.unsubscribe(); - - this.revalidate(); this.revalidateCommentsInEditor(); - - this.realtimeSubscription = this.store.subscribeCommentChanged().subscribe({ - next: () => { - this.fetchCommentChanges().catch(() => {}); - }, - error: error => { - console.error('Failed to subscribe comment changes:', error); - }, + this.startLiveQueryAfterInitialLoad().catch(error => { + console.error('Failed to start comment realtime:', error); }); - this.realtimeDisposable = () => this.realtimeSubscription?.unsubscribe(); } stop(): void { - if (this.realtimeDisposable) { - this.realtimeDisposable(); - } - this.realtimeSubscription?.unsubscribe(); + this.startVersion++; + this.liveQuery.stop(); + this.loading$.setValue(false); } - private async fetchCommentChanges() { - if (this.fetchingCommentChanges) { - this.pendingCommentChangeFetch = true; - return; - } - - this.fetchingCommentChanges = true; + private async startLiveQueryAfterInitialLoad() { + const version = ++this.startVersion; + this.liveQuery.stop(); + this.loading$.setValue(true); try { - do { - this.pendingCommentChangeFetch = false; - const result = await this.store.listCommentChanges({ - after: this.startCursor, - }); - this.handleCommentChanges(result); - this.startCursor = result.endCursor; - } while (this.pendingCommentChangeFetch); - } catch (error) { - console.error('Failed to fetch comment changes:', error); + const allComments = await this.loadAllComments(); + if (version !== this.startVersion) { + return; + } + this.revalidateCommentsInEditor(); + this.comments$.setValue(allComments); + this.liveQuery.start(); } finally { - this.fetchingCommentChanges = false; + if (version === this.startVersion) { + this.loading$.setValue(false); + } } } @@ -650,31 +632,9 @@ export class DocCommentEntity extends Entity<{ revalidate = effect( switchMap(() => { - return fromPromise(async () => { - const allComments: DocComment[] = []; - let cursor = ''; - let firstResult: DocCommentListResult | null = null; - this.revalidateCommentsInEditor(); - // Fetch all pages of comments - while (true) { - const result = await this.store.listComments({ after: cursor }); - if (!firstResult) { - firstResult = result; - // Store the startCursor from the first page for incremental changes - this.startCursor = result.startCursor; - } - allComments.push(...result.comments); - cursor = result.endCursor; - if (!result.hasNextPage) { - break; - } - } - - return allComments; - }).pipe( + return fromPromise(() => this.loadAllComments()).pipe( tap(allComments => { this.revalidateCommentsInEditor(); - // Update state with all comments this.comments$.setValue(allComments); }), onStart(() => this.loading$.setValue(true)), @@ -688,6 +648,27 @@ export class DocCommentEntity extends Entity<{ }) ); + private async loadAllComments() { + const allComments: DocComment[] = []; + let cursor = ''; + let firstResult: DocCommentListResult | null = null; + this.revalidateCommentsInEditor(); + while (true) { + const result = await this.store.listComments({ after: cursor }); + if (!firstResult) { + firstResult = result; + this.startCursor = result.startCursor; + } + allComments.push(...result.comments); + cursor = result.endCursor; + if (!result.hasNextPage) { + break; + } + } + + return allComments; + } + private readonly revalidateCommentsInEditor = () => { this.commentsInEditor$.setValue(this.getCommentsInEditor()); }; diff --git a/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts b/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts index fef525b794..f0d5a4ebbe 100644 --- a/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts +++ b/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts @@ -1,10 +1,11 @@ import { shallowEqual } from '@affine/component'; import type { TranscriptionBlockProps } from '@affine/core/blocksuite/ai/blocks/transcription-block/model'; +import { RealtimeLiveQuery } from '@affine/core/modules/cloud/realtime/live-query'; import { DebugLogger } from '@affine/debug'; import { UserFriendlyError } from '@affine/error'; -import { AiJobStatus } from '@affine/graphql'; +import { AiJobStatus, type TranscriptionResultType } from '@affine/graphql'; +import type { RealtimeTopicEventOf } from '@affine/realtime'; import { Entity, LiveData } from '@toeverything/infra'; -import type { Subscription } from 'rxjs'; import type { DefaultServerService, WorkspaceServerService } from '../../cloud'; import { AuthService } from '../../cloud/services/auth'; @@ -65,7 +66,10 @@ export class AudioTranscriptionJob extends Entity<{ } disposed = false; - private taskSubscription?: Subscription; + private taskLiveQuery?: RealtimeLiveQuery< + TranscriptionResultType | null, + RealtimeTopicEventOf<'copilot.transcript.task.changed'> + >; private taskWaitReject?: (error: unknown) => void; private readonly _status$ = new LiveData({ @@ -205,45 +209,56 @@ export class AudioTranscriptionJob extends Entity<{ await new Promise((resolve, reject) => { this.taskWaitReject = reject; - this.taskSubscription = this.store - .subscribeTranscriptTask(taskId) - .subscribe({ - next: event => { - if ( - 'type' in event || - (event.status as AiJobStatus) === AiJobStatus.finished - ) { - this.checkTranscriptTask(taskId).then(() => { - if (this.status$.value.status === AiJobStatus.finished) { - resolve(); - } - }, reject); - } else if ((event.status as AiJobStatus) === AiJobStatus.failed) { - reject( - UserFriendlyError.fromAny( - event.error ?? 'Transcription job failed' - ) - ); + this.taskLiveQuery = new RealtimeLiveQuery({ + request: () => this.store.getTranscriptTask(this.props.blobId, taskId), + subscribe: () => this.store.subscribeTranscriptTask(taskId), + applySnapshot: job => { + this.applyTranscriptTaskSnapshot(taskId, job).then(() => { + if (this.status$.value.status === AiJobStatus.finished) { + resolve(); } - }, - error: reject, - }); + }, reject); + }, + applyEvent: event => { + if (event.status === AiJobStatus.failed) { + reject( + UserFriendlyError.fromAny( + event.error ?? 'Transcription job failed' + ) + ); + return 'applied'; + } + return event.status === AiJobStatus.finished + ? 'revalidate' + : 'applied'; + }, + onError: reject, + }); + this.taskLiveQuery.start(); }).finally(() => { this.taskWaitReject = undefined; - this.taskSubscription?.unsubscribe(); - this.taskSubscription = undefined; + this.taskLiveQuery?.dispose(); + this.taskLiveQuery = undefined; }); } private rejectTaskWait(error: unknown) { const reject = this.taskWaitReject; this.taskWaitReject = undefined; - this.taskSubscription?.unsubscribe(); - this.taskSubscription = undefined; + this.taskLiveQuery?.dispose(); + this.taskLiveQuery = undefined; reject?.(error); } private async checkTranscriptTask(taskId: string) { + const job = await this.store.getTranscriptTask(this.props.blobId, taskId); + await this.applyTranscriptTaskSnapshot(taskId, job); + } + + private async applyTranscriptTaskSnapshot( + taskId: string, + job: TranscriptionResultType | null + ) { if ( this.disposed || this.props.blockProps.jobId !== taskId || @@ -251,7 +266,6 @@ export class AudioTranscriptionJob extends Entity<{ ) { return; } - const job = await this.store.getTranscriptTask(this.props.blobId, taskId); if (!job || job.status === AiJobStatus.failed) { logger.debug('Job failed during realtime status check', { diff --git a/packages/frontend/core/src/modules/notification/services/count.ts b/packages/frontend/core/src/modules/notification/services/count.ts index ef2f52a4bf..38e4b425e0 100644 --- a/packages/frontend/core/src/modules/notification/services/count.ts +++ b/packages/frontend/core/src/modules/notification/services/count.ts @@ -1,20 +1,8 @@ -import { - catchErrorInto, - effect, - exhaustMapWithTrailing, - fromPromise, - LiveData, - onComplete, - OnEvent, - onStart, - Service, - smartRetry, -} from '@toeverything/infra'; -import type { Subscription } from 'rxjs'; -import { tap } from 'rxjs'; +import { LiveData, OnEvent, Service } from '@toeverything/infra'; import { AccountChanged, type AuthService } from '../../cloud'; import { ServerStarted } from '../../cloud/events/server-started'; +import { RealtimeLiveQuery } from '../../cloud/realtime/live-query'; import { ApplicationFocused } from '../../lifecycle'; import type { NbstoreService } from '../../storage'; import type { NotificationStore } from '../stores/notification'; @@ -36,30 +24,25 @@ export class NotificationCountService extends Service { readonly count$ = LiveData.from(this.store.watchNotificationCountCache(), 0); readonly isLoading$ = new LiveData(false); readonly error$ = new LiveData(null); - private subscription?: Subscription; + private readonly liveQuery = new RealtimeLiveQuery({ + request: signal => this.requestCount(signal), + subscribe: () => + this.nbstoreService.realtime.subscribe('notification.count.changed', {}), + applySnapshot: result => this.setCount(result.count), + applyEvent: event => { + this.setCount(event.count); + return 'applied'; + }, + onError: error => this.error$.setValue(error), + }); - revalidate = effect( - exhaustMapWithTrailing(() => { - return fromPromise(() => { - if (!this.loggedIn$.value) { - return Promise.resolve(0); - } - return this.nbstoreService.realtime - .request('notification.count.get', {}, { timeoutMs: 10000 }) - .then(result => result.count); - }).pipe( - tap(result => { - this.setCount(result ?? 0); - }), - smartRetry(), - catchErrorInto(this.error$), - onStart(() => { - this.isLoading$.setValue(true); - }), - onComplete(() => this.isLoading$.setValue(false)) - ); - }) - ); + revalidate = () => { + if (!this.loggedIn$.value) { + this.setCount(0); + return; + } + this.liveQuery.revalidate(); + }; handleApplicationFocused() { this.revalidate(); @@ -76,33 +59,34 @@ export class NotificationCountService extends Service { } setCount(count: number) { + this.error$.setValue(null); this.store.setNotificationCountCache(count); } override dispose(): void { super.dispose(); - this.revalidate.unsubscribe(); - this.subscription?.unsubscribe(); + this.liveQuery.dispose(); } private subscribe() { - this.subscription?.unsubscribe(); if (!this.loggedIn$.value) { + this.liveQuery.stop(); + this.setCount(0); return; } - this.subscription = this.nbstoreService.realtime - .subscribe('notification.count.changed', {}) - .subscribe({ - next: event => { - if ('type' in event) { - this.revalidate(); - } else { - this.setCount(event.count); - } - }, - error: error => { - this.error$.setValue(error); - }, - }); + this.liveQuery.start(); + } + + private async requestCount(signal: AbortSignal) { + this.isLoading$.setValue(true); + try { + return await this.nbstoreService.realtime.request( + 'notification.count.get', + {}, + { signal, timeoutMs: 10000 } + ); + } finally { + this.isLoading$.setValue(false); + } } } diff --git a/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts b/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts index ce2cc8da07..bbaa15657a 100644 --- a/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts +++ b/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts @@ -1,17 +1,8 @@ +import { RealtimeLiveQuery } from '@affine/core/modules/cloud/realtime/live-query'; import type { WorkspaceService } from '@affine/core/modules/workspace'; +import type { RealtimeTopicEventOf } from '@affine/realtime'; import { logger } from '@sentry/react'; -import { - catchErrorInto, - effect, - Entity, - fromPromise, - LiveData, - onComplete, - onStart, - smartRetry, -} from '@toeverything/infra'; -import { EMPTY, type Subscription } from 'rxjs'; -import { exhaustMap, mergeMap } from 'rxjs/operators'; +import { Entity, LiveData } from '@toeverything/infra'; import type { EmbeddingStore } from '../stores/embedding'; import type { LocalAttachmentFile } from '../types'; @@ -26,8 +17,31 @@ export class EmbeddingProgress extends Entity { error$ = new LiveData(null); loading$ = new LiveData(true); - private progressSubscription?: Subscription; uploadingAttachments$ = new LiveData([]); + private started = false; + private readonly liveQuery = new RealtimeLiveQuery< + Progress, + RealtimeTopicEventOf<'workspace.embedding.progress.changed'> + >({ + request: signal => this.requestProgress(signal), + subscribe: () => + this.store.subscribeEmbeddingProgress(this.workspaceService.workspace.id), + applySnapshot: progress => this.applyProgress(progress), + applyEvent: event => { + if ( + typeof event.embedded === 'number' && + typeof event.total === 'number' + ) { + this.applyProgress({ embedded: event.embedded, total: event.total }); + return 'applied'; + } + return 'revalidate'; + }, + onError: error => { + this.error$.setValue(error); + logger.error('Failed to fetch workspace embedding progress', { error }); + }, + }); constructor( private readonly workspaceService: WorkspaceService, @@ -37,48 +51,46 @@ export class EmbeddingProgress extends Entity { } startEmbeddingProgress() { - this.stopEmbeddingProgress(); - this.progressSubscription = this.store - .subscribeEmbeddingProgress(this.workspaceService.workspace.id) - .subscribe({ - next: () => this.getEmbeddingProgress(), - error: error => this.error$.setValue(error), - }); - this.getEmbeddingProgress(); + this.started = true; + this.liveQuery.start(); } stopEmbeddingProgress() { - this.progressSubscription?.unsubscribe(); - this.progressSubscription = undefined; + this.started = false; + this.liveQuery.stop(); } - getEmbeddingProgress = effect( - exhaustMap(() => { - return fromPromise(signal => - this.store.getEmbeddingProgress( - this.workspaceService.workspace.id, - signal - ) - ).pipe( - smartRetry(), - mergeMap(value => { - this.progress$.next(value); - if (value && value.embedded === value.total && value.total > 0) { - this.stopEmbeddingProgress(); - } - return EMPTY; - }), - catchErrorInto(this.error$, error => { - logger.error('Failed to fetch workspace embedding progress', error); - }), - onStart(() => this.loading$.setValue(true)), - onComplete(() => this.loading$.setValue(false)) - ); - }) - ); + getEmbeddingProgress = () => { + if (this.started) { + this.liveQuery.revalidate(); + return; + } + this.requestProgress(new AbortController().signal).then( + progress => this.applyProgress(progress), + error => this.error$.setValue(error) + ); + }; override dispose(): void { - this.progressSubscription?.unsubscribe(); - this.getEmbeddingProgress.unsubscribe(); + this.liveQuery.dispose(); + } + + private applyProgress(value: Progress | null) { + this.progress$.next(value); + if (value && value.embedded === value.total && value.total > 0) { + this.stopEmbeddingProgress(); + } + } + + private async requestProgress(signal: AbortSignal) { + this.loading$.setValue(true); + try { + return await this.store.getEmbeddingProgress( + this.workspaceService.workspace.id, + signal + ); + } finally { + this.loading$.setValue(false); + } } } diff --git a/tests/affine-cloud-copilot/e2e/settings/embedding.spec.ts b/tests/affine-cloud-copilot/e2e/settings/embedding.spec.ts index 38517208c9..08625c0eb8 100644 --- a/tests/affine-cloud-copilot/e2e/settings/embedding.spec.ts +++ b/tests/affine-cloud-copilot/e2e/settings/embedding.spec.ts @@ -127,10 +127,8 @@ test.describe('AISettings/Embedding', () => { await page.getByTestId('embedding-progress-wrapper'); const progress = await page.getByTestId('embedding-progress'); - // wait for the progress to be loading const title = await page.getByTestId('embedding-progress-title'); - await expect(title).toHaveText(/Loading sync status/i); - await expect(progress).not.toBeVisible(); + await expect(title).not.toHaveAttribute('data-progress', 'loading'); const count = await page.getByTestId('embedding-progress-count'); await expect(count).toHaveText(/\d+\/\d+/);