diff --git a/packages/backend/server/package.json b/packages/backend/server/package.json index bf2c97d705..ce49d6699e 100644 --- a/packages/backend/server/package.json +++ b/packages/backend/server/package.json @@ -114,6 +114,7 @@ "@affine-tools/cli": "workspace:*", "@affine-tools/utils": "workspace:*", "@affine/graphql": "workspace:*", + "@affine/realtime": "workspace:*", "@faker-js/faker": "^10.1.0", "@nestjs/swagger": "^11.2.7", "@nestjs/testing": "patch:@nestjs/testing@npm%3A11.1.18#~/.yarn/patches/@nestjs-testing-npm-11.1.18-32c0f6af12.patch", diff --git a/packages/backend/server/src/__tests__/e2e/notification/resolver.spec.ts b/packages/backend/server/src/__tests__/e2e/notification/resolver.spec.ts index 8e2c2a53df..1dd0748f3a 100644 --- a/packages/backend/server/src/__tests__/e2e/notification/resolver.spec.ts +++ b/packages/backend/server/src/__tests__/e2e/notification/resolver.spec.ts @@ -273,7 +273,7 @@ e2e('should mark notification as read', async t => { const count = await app.gql({ query: notificationCountQuery, }); - t.is(count.currentUser!.notifications.totalCount, 0); + t.is(count.currentUser!.notificationCount, 0); // read again should work for (const notification of notifications) { diff --git a/packages/backend/server/src/app.module.ts b/packages/backend/server/src/app.module.ts index eaa5ac2849..0bc74a019d 100644 --- a/packages/backend/server/src/app.module.ts +++ b/packages/backend/server/src/app.module.ts @@ -42,6 +42,7 @@ import { NotificationModule } from './core/notification'; import { PermissionModule } from './core/permission'; import { QueueDashboardModule } from './core/queue-dashboard'; import { QuotaModule } from './core/quota'; +import { RealtimeModule } from './core/realtime'; import { SelfhostModule } from './core/selfhost'; import { StaticFileModule } from './core/static-files'; import { StorageModule } from './core/storage'; @@ -117,6 +118,7 @@ export const FunctionalityModules = [ ErrorModule, WebSocketModule, JobModule.forRoot(), + RealtimeModule, ModelsModule, ScheduleModule.forRoot(), MonitorModule, diff --git a/packages/backend/server/src/core/comment/index.ts b/packages/backend/server/src/core/comment/index.ts index d342d274db..562175b724 100644 --- a/packages/backend/server/src/core/comment/index.ts +++ b/packages/backend/server/src/core/comment/index.ts @@ -3,12 +3,13 @@ import { Module } from '@nestjs/common'; import { ServerConfigModule } from '../config'; import { PermissionModule } from '../permission'; import { StorageModule } from '../storage'; +import { CommentRealtimeProvider } from './realtime'; import { CommentResolver } from './resolver'; import { CommentService } from './service'; @Module({ imports: [PermissionModule, StorageModule, ServerConfigModule], - providers: [CommentResolver, CommentService], + providers: [CommentResolver, CommentService, CommentRealtimeProvider], exports: [CommentService], }) export class CommentModule {} diff --git a/packages/backend/server/src/core/comment/realtime.ts b/packages/backend/server/src/core/comment/realtime.ts new file mode 100644 index 0000000000..dcb6165bc8 --- /dev/null +++ b/packages/backend/server/src/core/comment/realtime.ts @@ -0,0 +1,99 @@ +import { Injectable, OnModuleInit, Optional } from '@nestjs/common'; +import { z } from 'zod'; + +import { decodeWithJson, encodeWithJson } from '../../base/graphql'; +import { AccessController } from '../permission'; +import type { RealtimePublisher, RealtimeRegistry } from '../realtime'; +import type { CommentCursor } from './resolver'; +import { CommentService } from './service'; + +export function commentRoom(workspaceId: string, docId: string) { + return `workspace:${workspaceId}:doc:${docId}:comment`; +} + +@Injectable() +export class CommentRealtimeProvider implements OnModuleInit { + constructor( + private readonly service: CommentService, + private readonly ac: AccessController, + @Optional() private readonly registry?: RealtimeRegistry + ) {} + + onModuleInit() { + const input = z.object({ + workspaceId: z.string(), + docId: z.string(), + after: z.string().optional(), + first: z.number().optional(), + }); + + this.registry?.registerRequest({ + name: 'comment.changes.get', + input, + handle: async (user, payload) => { + await this.assertRead(user.id, payload.workspaceId, payload.docId); + const cursor: CommentCursor = decodeWithJson(payload.after) ?? {}; + const changes = await this.service.listCommentChanges( + payload.workspaceId, + payload.docId, + { + commentUpdatedAt: cursor.commentUpdatedAt, + replyUpdatedAt: cursor.replyUpdatedAt, + take: payload.first, + } + ); + const endCursor = cursor; + for (const change of changes) { + if (change.commentId) { + endCursor.replyUpdatedAt = change.item.updatedAt; + } else { + endCursor.commentUpdatedAt = change.item.updatedAt; + } + } + return { + changes: changes.map(change => ({ + id: change.id, + action: change.action, + item: change.item, + commentId: change.commentId ?? undefined, + })) as never, + startCursor: '', + endCursor: encodeWithJson(endCursor), + hasNextPage: changes.length > 0, + }; + }, + }); + + this.registry?.registerTopic({ + name: 'comment.changed', + input: z.object({ + workspaceId: z.string(), + docId: z.string(), + }), + authorize: async (user, payload) => { + await this.assertRead(user.id, payload.workspaceId, payload.docId); + }, + room: (_user, payload) => commentRoom(payload.workspaceId, payload.docId), + }); + } + + private async assertRead(userId: string, workspaceId: string, docId: string) { + await this.ac + .user(userId) + .workspace(workspaceId) + .doc(docId) + .assert('Doc.Comments.Read'); + } +} + +export function publishCommentChanged( + publisher: RealtimePublisher | undefined, + workspaceId: string, + docId: string +) { + publisher?.publish( + 'comment.changed', + { workspaceId, docId }, + { changed: true } + ); +} diff --git a/packages/backend/server/src/core/comment/resolver.ts b/packages/backend/server/src/core/comment/resolver.ts index 6211bda2d7..1ae35871cc 100644 --- a/packages/backend/server/src/core/comment/resolver.ts +++ b/packages/backend/server/src/core/comment/resolver.ts @@ -1,5 +1,6 @@ import { randomUUID } from 'node:crypto'; +import { Optional } from '@nestjs/common'; import { Args, Mutation, @@ -26,9 +27,11 @@ import { Comment, DocMode, Models, Reply } from '../../models'; import { CurrentUser } from '../auth/session'; import { ServerFeature, ServerService } from '../config'; import { AccessController, DocAction } from '../permission'; +import type { RealtimePublisher } from '../realtime'; import { CommentAttachmentStorage } from '../storage'; import { UserType } from '../user'; import { WorkspaceType } from '../workspaces'; +import { publishCommentChanged } from './realtime'; import { CommentService } from './service'; import { CommentCreateInput, @@ -56,7 +59,8 @@ export class CommentResolver { private readonly commentAttachmentStorage: CommentAttachmentStorage, private readonly queue: JobQueue, private readonly models: Models, - private readonly server: ServerService + private readonly server: ServerService, + @Optional() private readonly realtime?: RealtimePublisher ) { // enable comment feature by default this.server.enableFeature(ServerFeature.Comment); @@ -81,6 +85,7 @@ export class CommentResolver { input.docMode, input.mentions ); + publishCommentChanged(this.realtime, comment.workspaceId, comment.docId); return { ...comment, @@ -108,6 +113,7 @@ export class CommentResolver { await this.assertPermission(me, comment, 'Doc.Comments.Update'); await this.service.updateComment(input); + publishCommentChanged(this.realtime, comment.workspaceId, comment.docId); return true; } @@ -126,6 +132,7 @@ export class CommentResolver { await this.assertPermission(me, comment, 'Doc.Comments.Resolve'); await this.service.resolveComment(input); + publishCommentChanged(this.realtime, comment.workspaceId, comment.docId); return true; } @@ -141,6 +148,7 @@ export class CommentResolver { await this.assertPermission(me, comment, 'Doc.Comments.Delete'); await this.service.deleteComment(id); + publishCommentChanged(this.realtime, comment.workspaceId, comment.docId); return true; } @@ -169,6 +177,7 @@ export class CommentResolver { input.mentions, reply ); + publishCommentChanged(this.realtime, comment.workspaceId, comment.docId); return { ...reply, @@ -195,6 +204,7 @@ export class CommentResolver { await this.assertPermission(me, reply, 'Doc.Comments.Update'); await this.service.updateReply(input); + publishCommentChanged(this.realtime, reply.workspaceId, reply.docId); return true; } @@ -210,6 +220,7 @@ export class CommentResolver { await this.assertPermission(me, reply, 'Doc.Comments.Delete'); await this.service.deleteReply(id); + publishCommentChanged(this.realtime, reply.workspaceId, reply.docId); return true; } @@ -294,6 +305,7 @@ export class CommentResolver { }) pagination: PaginationInput ): Promise { + // DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients. await this.assertPermission( me, { diff --git a/packages/backend/server/src/core/notification/index.ts b/packages/backend/server/src/core/notification/index.ts index 1c1a11f87f..57c0211024 100644 --- a/packages/backend/server/src/core/notification/index.ts +++ b/packages/backend/server/src/core/notification/index.ts @@ -5,6 +5,7 @@ import { MailModule } from '../mail'; import { PermissionModule } from '../permission'; import { StorageModule } from '../storage'; import { NotificationJob } from './job'; +import { NotificationRealtimeProvider } from './realtime'; import { NotificationResolver, UserNotificationResolver } from './resolver'; import { NotificationService } from './service'; @@ -15,6 +16,7 @@ import { NotificationService } from './service'; NotificationResolver, NotificationService, NotificationJob, + NotificationRealtimeProvider, ], exports: [NotificationService], }) diff --git a/packages/backend/server/src/core/notification/realtime-room.ts b/packages/backend/server/src/core/notification/realtime-room.ts new file mode 100644 index 0000000000..de24be2033 --- /dev/null +++ b/packages/backend/server/src/core/notification/realtime-room.ts @@ -0,0 +1,3 @@ +export function notificationCountRoom(userId: string) { + return `user:${userId}:notification`; +} diff --git a/packages/backend/server/src/core/notification/realtime.ts b/packages/backend/server/src/core/notification/realtime.ts new file mode 100644 index 0000000000..79e77c0822 --- /dev/null +++ b/packages/backend/server/src/core/notification/realtime.ts @@ -0,0 +1,36 @@ +import { Injectable, OnModuleInit, Optional } from '@nestjs/common'; +import { z } from 'zod'; + +import type { RealtimeRegistry } from '../realtime'; +import { notificationCountRoom } from './realtime-room'; +import { NotificationService } from './service'; + +@Injectable() +export class NotificationRealtimeProvider implements OnModuleInit { + constructor( + private readonly service: NotificationService, + @Optional() private readonly registry?: RealtimeRegistry + ) {} + + onModuleInit() { + this.registry?.registerRequest({ + name: 'notification.count.get', + input: z.object({}).strict(), + handle: async user => ({ + count: await this.service.countByUserId(user.id), + }), + }); + + this.registry?.registerTopic({ + name: 'notification.count.changed', + input: z.object({}).strict(), + authorize: async () => {}, + room: user => { + if (!user) { + throw new Error('User is required for notification count room'); + } + return notificationCountRoom(user.id); + }, + }); + } +} diff --git a/packages/backend/server/src/core/notification/resolver.ts b/packages/backend/server/src/core/notification/resolver.ts index 54829378c0..41bdd6d000 100644 --- a/packages/backend/server/src/core/notification/resolver.ts +++ b/packages/backend/server/src/core/notification/resolver.ts @@ -47,8 +47,11 @@ export class UserNotificationResolver { @ResolveField(() => Int, { description: 'Get user notification count', + deprecationReason: + 'Use realtime subscription "notification.count.changed" instead.', }) async notificationCount(@CurrentUser() me: UserType): Promise { + // DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients. return await this.service.countByUserId(me.id); } diff --git a/packages/backend/server/src/core/notification/service.ts b/packages/backend/server/src/core/notification/service.ts index 529d8397b1..439d7b282c 100644 --- a/packages/backend/server/src/core/notification/service.ts +++ b/packages/backend/server/src/core/notification/service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, Optional } from '@nestjs/common'; import { Prisma } from '@prisma/client'; import { NotificationNotFound, PaginationInput, URLHelper } from '../../base'; @@ -17,11 +17,13 @@ import { } from '../../models'; import { DocReader } from '../doc'; import { Mailer } from '../mail'; +import type { RealtimePublisher } from '../realtime'; import { generateDocPath } from '../utils/doc'; import { generateWorkspaceSettingsPath, WorkspaceSettingsTab, } from '../utils/workspace'; +import { notificationCountRoom } from './realtime-room'; @Injectable() export class NotificationService { @@ -31,11 +33,22 @@ export class NotificationService { private readonly models: Models, private readonly docReader: DocReader, private readonly mailer: Mailer, - private readonly url: URLHelper + private readonly url: URLHelper, + @Optional() private readonly realtime?: RealtimePublisher ) {} async cleanExpiredNotifications() { - return await this.models.notification.cleanExpiredNotifications(); + const userIds = + await this.models.notification.findExpiredNotificationUserIds(); + const count = await this.models.notification.cleanExpiredNotifications(); + if (count > 0) { + await Promise.all( + userIds.map(userId => + this.publishCountChanged(userId, 'expired-cleanup') + ) + ); + } + return count; } async createComment(input: CommentNotificationCreate, isMention?: boolean) { @@ -43,6 +56,7 @@ export class NotificationService { ? await this.models.notification.createCommentMention(input) : await this.models.notification.createComment(input); await this.sendCommentEmail(input, isMention); + await this.publishCountChanged(input.userId, 'created'); return notification; } @@ -93,6 +107,7 @@ export class NotificationService { async createMention(input: MentionNotificationCreate) { const notification = await this.models.notification.createMention(input); await this.sendMentionEmail(input); + await this.publishCountChanged(input.userId, 'created'); return notification; } @@ -147,6 +162,7 @@ export class NotificationService { NotificationType.Invitation ); await this.sendInvitationEmail(input); + await this.publishCountChanged(input.userId, 'created'); return notification; } @@ -198,6 +214,7 @@ export class NotificationService { NotificationType.InvitationAccepted ); await this.sendInvitationAcceptedEmail(input); + await this.publishCountChanged(input.userId, 'created'); return notification; } @@ -240,18 +257,22 @@ export class NotificationService { async createInvitationBlocked(input: InvitationNotificationCreate) { await this.ensureWorkspaceContentExists(input.body.workspaceId); - return await this.models.notification.createInvitation( + const notification = await this.models.notification.createInvitation( input, NotificationType.InvitationBlocked ); + await this.publishCountChanged(input.userId, 'created'); + return notification; } async createInvitationRejected(input: InvitationNotificationCreate) { await this.ensureWorkspaceContentExists(input.body.workspaceId); - return await this.models.notification.createInvitation( + const notification = await this.models.notification.createInvitation( input, NotificationType.InvitationRejected ); + await this.publishCountChanged(input.userId, 'created'); + return notification; } async createInvitationReviewRequest(input: InvitationNotificationCreate) { @@ -267,6 +288,7 @@ export class NotificationService { NotificationType.InvitationReviewRequest ); await this.sendInvitationReviewRequestEmail(input); + await this.publishCountChanged(input.userId, 'created'); return notification; } @@ -315,6 +337,7 @@ export class NotificationService { NotificationType.InvitationReviewApproved ); await this.sendInvitationReviewApprovedEmail(input); + await this.publishCountChanged(input.userId, 'created'); return notification; } @@ -354,6 +377,7 @@ export class NotificationService { const notification = await this.models.notification.createInvitationReviewDeclined(input); await this.sendInvitationReviewDeclinedEmail(input); + await this.publishCountChanged(input.userId, 'created'); return notification; } @@ -397,10 +421,12 @@ export class NotificationService { } throw err; } + await this.publishCountChanged(userId, 'read'); } async markAllAsRead(userId: string) { await this.models.notification.markAllAsRead(userId); + await this.publishCountChanged(userId, 'read-all'); } /** @@ -463,6 +489,26 @@ export class NotificationService { return await this.models.notification.countByUserId(userId); } + private async publishCountChanged( + userId: string, + reason: 'created' | 'read' | 'read-all' | 'expired-cleanup' + ) { + if (!this.realtime) return; + try { + this.realtime.publish( + 'notification.count.changed', + {}, + { count: await this.countByUserId(userId), reason }, + { room: notificationCountRoom(userId) } + ); + } catch (error) { + this.logger.error( + `Failed to publish notification count for user ${userId}`, + error + ); + } + } + private formatWorkspaceInfo(workspace: Workspace) { return { id: workspace.id, diff --git a/packages/backend/server/src/core/realtime/__tests__/registry.spec.ts b/packages/backend/server/src/core/realtime/__tests__/registry.spec.ts new file mode 100644 index 0000000000..b63b0186f1 --- /dev/null +++ b/packages/backend/server/src/core/realtime/__tests__/registry.spec.ts @@ -0,0 +1,170 @@ +import test from 'ava'; +import { z } from 'zod'; + +import type { CurrentUser } from '../../auth'; +import { RealtimeGateway } from '../gateway'; +import type { RealtimePublisher } from '../publisher'; +import { RealtimeRegistry } from '../registry'; +import { stableStringify } from '../stable-stringify'; + +const user: CurrentUser = { + id: 'u1', + email: 'u1@affine.pro', + name: 'User', + avatarUrl: null, + disabled: false, + hasPassword: true, + emailVerified: true, +}; + +function createGateway(registry: RealtimeRegistry) { + return new RealtimeGateway(registry, { + attachServer() {}, + publishLocal() {}, + } as unknown as RealtimePublisher); +} + +test('registry rejects duplicate request and topic handlers', t => { + const registry = new RealtimeRegistry(); + const request = { + name: 'notification.count.get' as const, + input: z.object({}).strict(), + handle: async () => ({ count: 0 }), + }; + const topic = { + name: 'notification.count.changed' as const, + input: z.object({}).strict(), + authorize: async () => {}, + room: () => 'room', + }; + + registry.registerRequest(request); + registry.registerTopic(topic); + + t.throws(() => registry.registerRequest(request), { + message: /already registered/, + }); + t.throws(() => registry.registerTopic(topic), { + message: /already registered/, + }); +}); + +test('gateway handles registered request with version gate', async t => { + const registry = new RealtimeRegistry(); + registry.registerRequest({ + name: 'notification.count.get', + input: z.object({}).strict(), + handle: async currentUser => ({ count: currentUser.id === 'u1' ? 1 : 0 }), + }); + const gateway = createGateway(registry); + + t.deepEqual( + await gateway.onRequest(user, { + op: 'notification.count.get', + input: {}, + clientVersion: '0.26.0', + }), + { data: { count: 1 } } + ); + t.like( + await gateway.onRequest(user, { + op: 'notification.count.get', + input: {}, + clientVersion: '0.25.0', + }), + { error: { code: 'UNSUPPORTED_CLIENT_VERSION' } } + ); +}); + +test('gateway authorizes subscription and joins room', async t => { + const registry = new RealtimeRegistry(); + registry.registerTopic({ + name: 'comment.changed', + input: z.object({ workspaceId: z.string(), docId: z.string() }), + authorize: async (_currentUser, input) => { + if (input.workspaceId !== 'space') { + throw new Error('denied'); + } + }, + room: (_currentUser, input) => `workspace:${input.workspaceId}`, + }); + const gateway = createGateway(registry); + const joined: string[] = []; + const client = { + id: 'socket-1', + join: async (room: string) => { + joined.push(room); + }, + leave: async (room: string) => { + joined.splice(joined.indexOf(room), 1); + }, + }; + + const result = await gateway.onSubscribe(user, client as never, { + topic: 'comment.changed', + input: { workspaceId: 'space', docId: 'doc' }, + clientVersion: '0.26.0', + }); + + t.deepEqual(joined, ['workspace:space']); + t.deepEqual(result, { + data: { + subscriptionId: `socket-1:comment.changed:${stableStringify({ + workspaceId: 'space', + docId: 'doc', + })}`, + }, + }); + + t.like( + await gateway.onSubscribe(user, client as never, { + topic: 'comment.changed', + input: { workspaceId: 'other', docId: 'doc' }, + clientVersion: '0.26.0', + }), + { error: { code: 'INTERNAL_SERVER_ERROR' } } + ); +}); + +test('stableStringify is deterministic for subscription input keys', t => { + t.is( + stableStringify({ docId: 'doc', workspaceId: 'space' }), + stableStringify({ workspaceId: 'space', docId: 'doc' }) + ); +}); + +test('stableStringify follows JSON semantics for subscription input keys', t => { + t.is(stableStringify({ after: undefined }), stableStringify({})); + t.is(stableStringify([undefined]), '[null]'); + t.is( + stableStringify(new Date('2026-01-02T03:04:05.000Z')), + '"2026-01-02T03:04:05.000Z"' + ); +}); + +test('gateway removes subscriptions on socket disconnect', async t => { + const registry = new RealtimeRegistry(); + registry.registerTopic({ + name: 'notification.count.changed', + input: z.object({}).strict(), + authorize: async () => {}, + room: () => 'user:u1:notification-count', + }); + const gateway = createGateway(registry); + const client = { + id: 'socket-1', + join: async () => {}, + leave: async () => {}, + }; + + await gateway.onSubscribe(user, client as never, { + topic: 'notification.count.changed', + input: {}, + clientVersion: '0.26.0', + }); + t.is((gateway as any).subscriptions.size, 1); + + gateway.handleDisconnect(client as never); + + t.is((gateway as any).subscriptions.size, 0); +}); diff --git a/packages/backend/server/src/core/realtime/gateway.ts b/packages/backend/server/src/core/realtime/gateway.ts new file mode 100644 index 0000000000..bf04d16dac --- /dev/null +++ b/packages/backend/server/src/core/realtime/gateway.ts @@ -0,0 +1,136 @@ +import type { + RealtimeRequestEnvelope, + RealtimeSubscribeEnvelope, + RealtimeUnsubscribeEnvelope, +} from '@affine/realtime'; +import { applyDecorators, Logger, UseInterceptors } from '@nestjs/common'; +import { + ConnectedSocket, + MessageBody, + OnGatewayDisconnect, + OnGatewayInit, + SubscribeMessage as RawSubscribeMessage, + WebSocketGateway, + WebSocketServer, +} from '@nestjs/websockets'; +import { ClsInterceptor } from 'nestjs-cls'; +import semver from 'semver'; +import type { Server, Socket } from 'socket.io'; + +import { + GatewayErrorWrapper, + OnEvent, + UnsupportedClientVersion, +} from '../../base'; +import { CurrentUser } from '../auth'; +import { RealtimePublisher } from './publisher'; +import { RealtimeRegistry } from './registry'; +import { stableStringify } from './stable-stringify'; +import type { RealtimePublishPayload } from './types'; + +const SubscribeMessage = (event: string) => + applyDecorators(GatewayErrorWrapper(event), RawSubscribeMessage(event)); + +const MIN_REALTIME_CLIENT_VERSION = new semver.Range('>=0.26.0-0', { + includePrerelease: true, +}); + +@WebSocketGateway() +@UseInterceptors(ClsInterceptor) +export class RealtimeGateway implements OnGatewayInit, OnGatewayDisconnect { + private readonly logger = new Logger(RealtimeGateway.name); + private readonly subscriptions = new Map< + string, + { socketId: string; room: string } + >(); + + @WebSocketServer() + private readonly server!: Server; + + constructor( + private readonly registry: RealtimeRegistry, + private readonly publisher: RealtimePublisher + ) {} + + afterInit(_server: Server) { + this.publisher.attachServer(this.server); + } + + handleDisconnect(client: Socket) { + for (const [subscriptionId, subscription] of this.subscriptions) { + if (subscription.socketId === client.id) { + this.subscriptions.delete(subscriptionId); + } + } + } + + @SubscribeMessage('realtime:request') + async onRequest( + @CurrentUser() user: CurrentUser, + @MessageBody() envelope: RealtimeRequestEnvelope + ) { + this.assertVersion(envelope.clientVersion); + const handler = this.registry.getRequest(envelope.op); + const input = handler.input.parse(envelope.input); + return { data: await handler.handle(user, input as never) }; + } + + @SubscribeMessage('realtime:subscribe') + async onSubscribe( + @CurrentUser() user: CurrentUser, + @ConnectedSocket() client: Socket, + @MessageBody() envelope: RealtimeSubscribeEnvelope + ) { + this.assertVersion(envelope.clientVersion); + const handler = this.registry.getTopic(envelope.topic); + const input = handler.input.parse(envelope.input); + await handler.authorize(user, input as never); + const room = handler.room(user, input as never); + await client.join(room); + const subscriptionId = `${client.id}:${envelope.topic}:${stableStringify(input)}`; + this.subscriptions.set(subscriptionId, { + socketId: client.id, + room, + }); + return { data: { subscriptionId } }; + } + + @SubscribeMessage('realtime:unsubscribe') + async onUnsubscribe( + @ConnectedSocket() client: Socket, + @MessageBody() envelope: RealtimeUnsubscribeEnvelope + ) { + this.assertVersion(envelope.clientVersion); + if (!envelope.subscriptionId) { + return { data: { ok: true } }; + } + const subscription = this.subscriptions.get(envelope.subscriptionId); + if (subscription?.socketId === client.id) { + await client.leave(subscription.room); + this.subscriptions.delete(envelope.subscriptionId); + } + return { data: { ok: true } }; + } + + @OnEvent('realtime.topic.changed', { suppressError: true }) + onRealtimeTopicChanged(payload: RealtimePublishPayload) { + try { + this.publisher.publishLocal(payload); + } catch (error) { + this.logger.error('Failed to publish realtime event', error); + } + } + + private assertVersion(clientVersion?: string) { + if ( + !clientVersion || + !semver.valid(clientVersion) || + !MIN_REALTIME_CLIENT_VERSION.test(clientVersion) + ) { + throw new UnsupportedClientVersion({ + clientVersion: clientVersion ?? 'unset_or_invalid', + requiredVersion: '>=0.26.0', + }); + } + } +} diff --git a/packages/backend/server/src/core/realtime/index.ts b/packages/backend/server/src/core/realtime/index.ts new file mode 100644 index 0000000000..6b96b4546c --- /dev/null +++ b/packages/backend/server/src/core/realtime/index.ts @@ -0,0 +1,16 @@ +import { Global, Module } from '@nestjs/common'; + +import { RealtimeGateway } from './gateway'; +import { RealtimePublisher } from './publisher'; +import { RealtimeRegistry } from './registry'; + +@Global() +@Module({ + providers: [RealtimeRegistry, RealtimePublisher, RealtimeGateway], + exports: [RealtimeRegistry, RealtimePublisher], +}) +export class RealtimeModule {} + +export { RealtimePublisher } from './publisher'; +export { RealtimeRegistry } from './registry'; +export type { RealtimeRequestHandler, RealtimeTopicHandler } from './types'; diff --git a/packages/backend/server/src/core/realtime/publisher.ts b/packages/backend/server/src/core/realtime/publisher.ts new file mode 100644 index 0000000000..8011dcb655 --- /dev/null +++ b/packages/backend/server/src/core/realtime/publisher.ts @@ -0,0 +1,55 @@ +import type { RealtimeEvent, RealtimeTopicName } from '@affine/realtime'; +import { Injectable, Logger } from '@nestjs/common'; +import type { Server } from 'socket.io'; + +import { EventBus } from '../../base'; +import { RealtimeRegistry } from './registry'; +import { stableStringify } from './stable-stringify'; +import type { RealtimePublishPayload } from './types'; + +@Injectable() +export class RealtimePublisher { + private readonly logger = new Logger(RealtimePublisher.name); + private server?: Server; + + constructor( + private readonly registry: RealtimeRegistry, + private readonly event: EventBus + ) {} + + attachServer(server: Server) { + this.server = server; + } + + publish( + topic: Topic, + input: RealtimePublishPayload['input'], + event: RealtimePublishPayload['event'], + options?: { room?: string } + ) { + const payload = { + topic, + input, + event, + room: options?.room, + } as RealtimePublishPayload; + try { + this.publishLocal(payload); + this.event.broadcast('realtime.topic.changed', payload); + } catch (error) { + this.logger.error(`Failed to publish realtime topic ${topic}`, error); + } + } + + publishLocal(payload: RealtimePublishPayload) { + const handler = this.registry.getTopic(payload.topic); + const room = payload.room ?? handler.room(null, payload.input as never); + const envelope: RealtimeEvent = { + topic: payload.topic, + inputKey: stableStringify(payload.input), + sentAt: Date.now(), + event: payload.event as never, + }; + this.server?.to(room).emit('realtime:event', envelope); + } +} diff --git a/packages/backend/server/src/core/realtime/registry.ts b/packages/backend/server/src/core/realtime/registry.ts new file mode 100644 index 0000000000..744e75f4f2 --- /dev/null +++ b/packages/backend/server/src/core/realtime/registry.ts @@ -0,0 +1,60 @@ +import type { RealtimeRequestName, RealtimeTopicName } from '@affine/realtime'; +import { Injectable } from '@nestjs/common'; + +import type { RealtimeRequestHandler, RealtimeTopicHandler } from './types'; + +@Injectable() +export class RealtimeRegistry { + private readonly requests = new Map< + RealtimeRequestName, + RealtimeRequestHandler + >(); + private readonly topics = new Map< + RealtimeTopicName, + RealtimeTopicHandler + >(); + + registerRequest( + handler: RealtimeRequestHandler + ) { + if (this.requests.has(handler.name)) { + throw new Error( + `Realtime request handler already registered: ${handler.name}` + ); + } + this.requests.set( + handler.name, + handler as RealtimeRequestHandler + ); + } + + registerTopic( + handler: RealtimeTopicHandler + ) { + if (this.topics.has(handler.name)) { + throw new Error( + `Realtime topic handler already registered: ${handler.name}` + ); + } + this.topics.set( + handler.name, + handler as RealtimeTopicHandler + ); + } + + getRequest(name: RealtimeRequestName) { + const handler = this.requests.get(name); + if (!handler) { + throw new Error(`Realtime request handler not found: ${name}`); + } + return handler; + } + + getTopic(name: RealtimeTopicName) { + const handler = this.topics.get(name); + if (!handler) { + throw new Error(`Realtime topic handler not found: ${name}`); + } + return handler; + } +} diff --git a/packages/backend/server/src/core/realtime/stable-stringify.ts b/packages/backend/server/src/core/realtime/stable-stringify.ts new file mode 100644 index 0000000000..aaaa7f1b5a --- /dev/null +++ b/packages/backend/server/src/core/realtime/stable-stringify.ts @@ -0,0 +1,31 @@ +export function stableStringify(value: unknown): string { + if ( + value === undefined || + typeof value === 'function' || + typeof value === 'symbol' + ) { + return 'null'; + } + if (value === null || typeof value !== 'object') { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return `[${value.map(stableStringify).join(',')}]`; + } + if (value instanceof Date) { + return JSON.stringify(value.toJSON()); + } + const record = value as Record; + return `{${Object.keys(record) + .filter(key => { + const property = record[key]; + return ( + property !== undefined && + typeof property !== 'function' && + typeof property !== 'symbol' + ); + }) + .sort() + .map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`) + .join(',')}}`; +} diff --git a/packages/backend/server/src/core/realtime/types.ts b/packages/backend/server/src/core/realtime/types.ts new file mode 100644 index 0000000000..e1b07eaadb --- /dev/null +++ b/packages/backend/server/src/core/realtime/types.ts @@ -0,0 +1,45 @@ +import type { + RealtimeRequestInputOf, + RealtimeRequestName, + RealtimeRequestOutputOf, + RealtimeTopicEventOf, + RealtimeTopicInputOf, + RealtimeTopicName, +} from '@affine/realtime'; +import type { z } from 'zod'; + +import type { CurrentUser } from '../auth'; + +declare global { + interface Events { + 'realtime.topic.changed': RealtimePublishPayload; + } +} + +export type RealtimeRequestHandler = { + name: Op; + input: z.ZodType>; + handle( + user: CurrentUser, + input: RealtimeRequestInputOf + ): Promise>; +}; + +export type RealtimeTopicHandler = { + name: Topic; + input: z.ZodType>; + authorize( + user: CurrentUser, + input: RealtimeTopicInputOf + ): Promise; + room(user: CurrentUser | null, input: RealtimeTopicInputOf): string; +}; + +export type RealtimePublishPayload< + Topic extends RealtimeTopicName = RealtimeTopicName, +> = { + topic: Topic; + input: RealtimeTopicInputOf; + event: RealtimeTopicEventOf; + room?: string; +}; diff --git a/packages/backend/server/src/models/notification.ts b/packages/backend/server/src/models/notification.ts index 6d1888bd87..5039131ff2 100644 --- a/packages/backend/server/src/models/notification.ts +++ b/packages/backend/server/src/models/notification.ts @@ -311,6 +311,15 @@ export class NotificationModel extends BaseModel { return row as UnionNotification; } + async findExpiredNotificationUserIds() { + const rows = await this.db.notification.findMany({ + distinct: ['userId'], + select: { userId: true }, + where: { createdAt: { lte: new Date(Date.now() - ONE_YEAR) } }, + }); + return rows.map(row => row.userId); + } + async cleanExpiredNotifications() { const { count } = await this.db.notification.deleteMany({ // delete notifications that are older than one year diff --git a/packages/backend/server/src/plugins/copilot/context/index.ts b/packages/backend/server/src/plugins/copilot/context/index.ts index d3afb3caa2..4cae01d15a 100644 --- a/packages/backend/server/src/plugins/copilot/context/index.ts +++ b/packages/backend/server/src/plugins/copilot/context/index.ts @@ -1,2 +1,3 @@ +export { CopilotEmbeddingRealtimeProvider } from './realtime'; export { CopilotContextResolver, CopilotContextRootResolver } from './resolver'; export { CopilotContextService } from './service'; diff --git a/packages/backend/server/src/plugins/copilot/context/realtime.ts b/packages/backend/server/src/plugins/copilot/context/realtime.ts new file mode 100644 index 0000000000..4e55a356ce --- /dev/null +++ b/packages/backend/server/src/plugins/copilot/context/realtime.ts @@ -0,0 +1,105 @@ +import { Injectable, OnModuleInit, Optional } from '@nestjs/common'; +import { z } from 'zod'; + +import { OnEvent } from '../../../base'; +import { AccessController } from '../../../core/permission'; +import type { + RealtimePublisher, + RealtimeRegistry, +} from '../../../core/realtime'; +import { Models } from '../../../models'; +import { CopilotContextService } from './service'; + +export function workspaceEmbeddingRoom(workspaceId: string) { + return `workspace:${workspaceId}:embedding-progress`; +} + +@Injectable() +export class CopilotEmbeddingRealtimeProvider implements OnModuleInit { + constructor( + private readonly ac: AccessController, + private readonly models: Models, + private readonly context: CopilotContextService, + @Optional() private readonly registry?: RealtimeRegistry, + @Optional() private readonly publisher?: RealtimePublisher + ) {} + + onModuleInit() { + const input = z.object({ workspaceId: z.string() }); + + this.registry?.registerRequest({ + name: 'workspace.embedding.progress.get', + input, + handle: async (user, payload) => { + await this.assertCopilot(user.id, payload.workspaceId); + if (!this.context.canEmbedding) { + return { total: 0, embedded: 0 }; + } + return await this.models.copilotWorkspace.getEmbeddingStatus( + payload.workspaceId + ); + }, + }); + + this.registry?.registerTopic({ + name: 'workspace.embedding.progress.changed', + input, + authorize: async (user, payload) => { + await this.assertCopilot(user.id, payload.workspaceId); + }, + room: (_user, payload) => workspaceEmbeddingRoom(payload.workspaceId), + }); + } + + @OnEvent('workspace.doc.embed.finished', { suppressError: true }) + async onDocEmbedFinished(payload: Events['workspace.doc.embed.finished']) { + await this.publishContext(payload.contextId, 'finished'); + } + + @OnEvent('workspace.doc.embed.failed', { suppressError: true }) + async onDocEmbedFailed(payload: Events['workspace.doc.embed.failed']) { + await this.publishContext(payload.contextId, 'failed'); + } + + @OnEvent('workspace.file.embed.finished', { suppressError: true }) + async onFileEmbedFinished(payload: Events['workspace.file.embed.finished']) { + await this.publishContext(payload.contextId, 'finished'); + } + + @OnEvent('workspace.file.embed.failed', { suppressError: true }) + async onFileEmbedFailed(payload: Events['workspace.file.embed.failed']) { + await this.publishContext(payload.contextId, 'failed'); + } + + @OnEvent('workspace.blob.embed.finished', { suppressError: true }) + async onBlobEmbedFinished(payload: Events['workspace.blob.embed.finished']) { + await this.publishContext(payload.contextId, 'finished'); + } + + @OnEvent('workspace.blob.embed.failed', { suppressError: true }) + async onBlobEmbedFailed(payload: Events['workspace.blob.embed.failed']) { + await this.publishContext(payload.contextId, 'failed'); + } + + private async publishContext( + contextId: string, + reason: 'finished' | 'failed' + ) { + if (!this.publisher) return; + const context = await this.context.get(contextId); + this.publisher.publish( + 'workspace.embedding.progress.changed', + { workspaceId: context.workspaceId }, + { reason }, + { room: workspaceEmbeddingRoom(context.workspaceId) } + ); + } + + private async assertCopilot(userId: string, workspaceId: string) { + await this.ac + .user(userId) + .workspace(workspaceId) + .allowLocal() + .assert('Workspace.Copilot'); + } +} diff --git a/packages/backend/server/src/plugins/copilot/context/resolver.ts b/packages/backend/server/src/plugins/copilot/context/resolver.ts index 6d401e4ef0..23caa4c79a 100644 --- a/packages/backend/server/src/plugins/copilot/context/resolver.ts +++ b/packages/backend/server/src/plugins/copilot/context/resolver.ts @@ -412,12 +412,15 @@ export class CopilotContextRootResolver { @Throttle('strict') @Query(() => ContextWorkspaceEmbeddingStatus, { description: 'query workspace embedding status', + deprecationReason: + 'Use realtime subscription "workspace.embedding.progress.changed" instead.', }) @CallMetric('ai', 'context_query_workspace_embedding_status') async queryWorkspaceEmbeddingStatus( @CurrentUser() user: CurrentUser, @Args('workspaceId') workspaceId: string ): Promise { + // DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients. await this.ac .user(user.id) .workspace(workspaceId) diff --git a/packages/backend/server/src/plugins/copilot/module-providers.ts b/packages/backend/server/src/plugins/copilot/module-providers.ts index c2d7bf4495..3cc1e8ca28 100644 --- a/packages/backend/server/src/plugins/copilot/module-providers.ts +++ b/packages/backend/server/src/plugins/copilot/module-providers.ts @@ -13,6 +13,7 @@ import { CopilotContextResolver, CopilotContextRootResolver, CopilotContextService, + CopilotEmbeddingRealtimeProvider, } from './context'; import { ConversationInboxService } from './conversation/inbox'; import { ConversationPolicy } from './conversation/policy'; @@ -55,6 +56,7 @@ import { CopilotStorage } from './storage'; import { CopilotTranscriptionResolver, CopilotTranscriptionService, + CopilotTranscriptRealtimeProvider, } from './transcript'; import { CopilotWorkspaceEmbeddingConfigResolver, @@ -106,11 +108,15 @@ export const COPILOT_RUNTIME_PROVIDERS = [ TurnPersistence, ]; -export const COPILOT_CONTEXT_PROVIDERS = [CopilotContextResolver]; +export const COPILOT_CONTEXT_PROVIDERS = [ + CopilotContextResolver, + CopilotEmbeddingRealtimeProvider, +]; export const COPILOT_TRANSCRIPT_PROVIDERS = [ CopilotTranscriptionService, CopilotTranscriptionResolver, + CopilotTranscriptRealtimeProvider, ]; export const COPILOT_WORKSPACE_PROVIDERS = [ diff --git a/packages/backend/server/src/plugins/copilot/transcript/index.ts b/packages/backend/server/src/plugins/copilot/transcript/index.ts index 12cabcc69c..11f1b5541a 100644 --- a/packages/backend/server/src/plugins/copilot/transcript/index.ts +++ b/packages/backend/server/src/plugins/copilot/transcript/index.ts @@ -1,2 +1,3 @@ +export { CopilotTranscriptRealtimeProvider } from './realtime'; export { CopilotTranscriptionResolver } from './resolver'; export { CopilotTranscriptionService } from './service'; diff --git a/packages/backend/server/src/plugins/copilot/transcript/realtime.ts b/packages/backend/server/src/plugins/copilot/transcript/realtime.ts new file mode 100644 index 0000000000..17bcc05ba5 --- /dev/null +++ b/packages/backend/server/src/plugins/copilot/transcript/realtime.ts @@ -0,0 +1,69 @@ +import { Injectable, OnModuleInit, Optional } from '@nestjs/common'; +import { z } from 'zod'; + +import { CopilotTranscriptionJobNotFound } from '../../../base'; +import { AccessController } from '../../../core/permission'; +import type { RealtimeRegistry } from '../../../core/realtime'; +import { CopilotTranscriptionService, transcriptTaskRoom } from './service'; + +@Injectable() +export class CopilotTranscriptRealtimeProvider implements OnModuleInit { + constructor( + private readonly ac: AccessController, + private readonly transcript: CopilotTranscriptionService, + @Optional() private readonly registry?: RealtimeRegistry + ) {} + + onModuleInit() { + this.registry?.registerRequest({ + name: 'copilot.transcript.task.get', + input: z + .object({ + workspaceId: z.string(), + blobId: z.string().optional(), + taskId: z.string().optional(), + }) + .refine(input => input.blobId || input.taskId), + handle: async (user, input) => { + await this.assertCopilot(user.id, input.workspaceId); + return { + task: await this.transcript.queryTask( + user.id, + input.workspaceId, + input.taskId, + input.blobId + ), + }; + }, + }); + + this.registry?.registerTopic({ + name: 'copilot.transcript.task.changed', + input: z.object({ + workspaceId: z.string(), + taskId: z.string(), + }), + authorize: async (user, input) => { + await this.assertCopilot(user.id, input.workspaceId); + const task = await this.transcript.queryTask( + user.id, + input.workspaceId, + input.taskId + ); + if (!task) { + throw new CopilotTranscriptionJobNotFound(); + } + }, + room: (_user, input) => + transcriptTaskRoom(input.workspaceId, input.taskId), + }); + } + + private async assertCopilot(userId: string, workspaceId: string) { + await this.ac + .user(userId) + .workspace(workspaceId) + .allowLocal() + .assert('Workspace.Copilot'); + } +} diff --git a/packages/backend/server/src/plugins/copilot/transcript/resolver.ts b/packages/backend/server/src/plugins/copilot/transcript/resolver.ts index 535f28d741..e8d952a844 100644 --- a/packages/backend/server/src/plugins/copilot/transcript/resolver.ts +++ b/packages/backend/server/src/plugins/copilot/transcript/resolver.ts @@ -416,6 +416,8 @@ export class CopilotTranscriptionResolver { @ResolveField(() => TranscriptionResultType, { nullable: true, + deprecationReason: + 'Use realtime subscription "copilot.transcript.task.changed" instead.', }) async transcriptTask( @Parent() copilot: CopilotType, @@ -425,6 +427,7 @@ export class CopilotTranscriptionResolver { @Args('blobId', { nullable: true }) blobId?: string ): Promise { + // DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients. if (!copilot.workspaceId) return null; if (!taskId && !blobId) return null; diff --git a/packages/backend/server/src/plugins/copilot/transcript/service.ts b/packages/backend/server/src/plugins/copilot/transcript/service.ts index a922bdd86b..af509ec5a1 100644 --- a/packages/backend/server/src/plugins/copilot/transcript/service.ts +++ b/packages/backend/server/src/plugins/copilot/transcript/service.ts @@ -9,6 +9,7 @@ import { OnJob, sniffMime, } from '../../../base'; +import type { RealtimePublisher } from '../../../core/realtime'; import { Models } from '../../../models'; import { CopilotAccessPolicy } from '../access'; import { PromptService } from '../prompt'; @@ -32,6 +33,10 @@ const TRANSCRIPT_ACTION_ID = 'transcript.audio.gemini'; const TRANSCRIPT_ACTION_VERSION = 'v1'; const TRANSCRIPT_STRATEGY = 'gemini'; +export function transcriptTaskRoom(workspaceId: string, taskId: string) { + return `copilot:transcript:${workspaceId}:${taskId}`; +} + export type TranscriptionJob = { id: string; status: AiJobStatus; @@ -62,7 +67,8 @@ export class CopilotTranscriptionService { private readonly tasks: TaskPolicy, private readonly prompts: PromptService, private readonly actionBridge: ActionRuntimeBridge, - @Optional() private readonly access?: CopilotAccessPolicy + @Optional() private readonly access?: CopilotAccessPolicy, + @Optional() private readonly realtime?: RealtimePublisher ) {} private parseTaskPayload(payload: unknown): TranscriptionPayloadV2 { @@ -252,6 +258,7 @@ export class CopilotTranscriptionService { modelId: model, }); await this.models.copilotTranscriptTask.markRunning(task.id); + this.publishTaskChanged(workspaceId, task.id, AiJobStatus.running); return { id: task.id, status: AiJobStatus.running, infos }; } @@ -294,6 +301,7 @@ export class CopilotTranscriptionService { retryOf: task.actionRunId ?? undefined, }); await this.models.copilotTranscriptTask.markRunning(taskId); + this.publishTaskChanged(workspaceId, taskId, AiJobStatus.running); return { id: taskId, status: AiJobStatus.running, @@ -389,6 +397,11 @@ export class CopilotTranscriptionService { }, onRunCreated: async ({ runId }) => { await this.models.copilotTranscriptTask.markRunning(taskId, runId); + this.publishTaskChanged( + task.workspaceId, + taskId, + AiJobStatus.running + ); }, prepareStructuredRoutes: { stepId: 'transcribe', @@ -425,6 +438,7 @@ export class CopilotTranscriptionService { protectedResult: parsedResult, errorCode: null, }); + this.publishTaskChanged(task.workspaceId, taskId, AiJobStatus.finished); } catch (error) { await this.models.copilotTranscriptTask.complete(taskId, { status: 'failed', @@ -434,7 +448,27 @@ export class CopilotTranscriptionService { errorCode: error instanceof Error ? error.message : 'transcript_task_failed', }); + this.publishTaskChanged( + task.workspaceId, + taskId, + AiJobStatus.failed, + error instanceof Error ? error.message : 'transcript_task_failed' + ); throw error; } } + + private publishTaskChanged( + workspaceId: string, + taskId: string, + status: AiJobStatus, + error?: string + ) { + this.realtime?.publish( + 'copilot.transcript.task.changed', + { workspaceId, taskId }, + { taskId, status, error }, + { room: transcriptTaskRoom(workspaceId, taskId) } + ); + } } diff --git a/packages/backend/server/src/schema.gql b/packages/backend/server/src/schema.gql index 879cb3a73c..ac1e4e47c1 100644 --- a/packages/backend/server/src/schema.gql +++ b/packages/backend/server/src/schema.gql @@ -519,7 +519,7 @@ type Copilot { """Get the session list in the workspace""" sessions(docId: String, options: QueryChatSessionsInput): [CopilotSessionType!]! @deprecated(reason: "use `chats` instead") - transcriptTask(blobId: String, taskId: String): TranscriptionResultType + transcriptTask(blobId: String, taskId: String): TranscriptionResultType @deprecated(reason: "Use realtime subscription \"copilot.transcript.task.changed\" instead.") workspaceId: ID } @@ -1975,7 +1975,7 @@ type Query { publicUserById(id: String!): PublicUserType """query workspace embedding status""" - queryWorkspaceEmbeddingStatus(workspaceId: String!): ContextWorkspaceEmbeddingStatus! + queryWorkspaceEmbeddingStatus(workspaceId: String!): ContextWorkspaceEmbeddingStatus! @deprecated(reason: "Use realtime subscription \"workspace.embedding.progress.changed\" instead.") revealedAccessTokens: [RevealedAccessToken!]! @deprecated(reason: "use currentUser.revealedAccessTokens") """server config""" @@ -2668,7 +2668,7 @@ type UserType { name: String! """Get user notification count""" - notificationCount: Int! + notificationCount: Int! @deprecated(reason: "Use realtime subscription \"notification.count.changed\" instead.") """Get current user notifications""" notifications(pagination: PaginationInput!): PaginatedNotificationObjectType! diff --git a/packages/backend/server/tsconfig.json b/packages/backend/server/tsconfig.json index 7c1c612283..79993fb638 100644 --- a/packages/backend/server/tsconfig.json +++ b/packages/backend/server/tsconfig.json @@ -16,6 +16,7 @@ { "path": "../native" }, { "path": "../../../tools/cli" }, { "path": "../../../tools/utils" }, - { "path": "../../common/graphql" } + { "path": "../../common/graphql" }, + { "path": "../../common/realtime" } ] } diff --git a/packages/common/graphql/src/graphql/index.ts b/packages/common/graphql/src/graphql/index.ts index 9f49ded6a5..46f99badb6 100644 --- a/packages/common/graphql/src/graphql/index.ts +++ b/packages/common/graphql/src/graphql/index.ts @@ -1296,6 +1296,7 @@ export const getWorkspaceEmbeddingStatusQuery = { embedded } }`, + deprecations: ["'queryWorkspaceEmbeddingStatus' is deprecated: Use realtime subscription \"workspace.embedding.progress.changed\" instead."], }; export const queueWorkspaceEmbeddingMutation = { @@ -1621,6 +1622,7 @@ export const getTranscriptTaskQuery = { } } }`, + deprecations: ["'transcriptTask' is deprecated: Use realtime subscription \"copilot.transcript.task.changed\" instead."], }; export const retryTranscriptTaskMutation = { @@ -2646,11 +2648,10 @@ export const notificationCountQuery = { op: 'notificationCount', query: `query notificationCount { currentUser { - notifications(pagination: {first: 1}) { - totalCount - } + notificationCount } }`, + deprecations: ["'notificationCount' is deprecated: Use realtime subscription \"notification.count.changed\" instead."], }; export const pricesQuery = { diff --git a/packages/common/graphql/src/graphql/notification-count.gql b/packages/common/graphql/src/graphql/notification-count.gql index b5c123fa08..1d36b833c6 100644 --- a/packages/common/graphql/src/graphql/notification-count.gql +++ b/packages/common/graphql/src/graphql/notification-count.gql @@ -1,7 +1,5 @@ query notificationCount { currentUser { - notifications(pagination: { first: 1 }) { - totalCount - } + notificationCount } } diff --git a/packages/common/graphql/src/schema.ts b/packages/common/graphql/src/schema.ts index 41bfc8fa09..a0d9762b10 100644 --- a/packages/common/graphql/src/schema.ts +++ b/packages/common/graphql/src/schema.ts @@ -580,6 +580,7 @@ export interface Copilot { * @deprecated use `chats` instead */ sessions: Array; + /** @deprecated Use realtime subscription "copilot.transcript.task.changed" instead. */ transcriptTask: Maybe; workspaceId: Maybe; } @@ -2635,7 +2636,10 @@ export interface Query { prices: Array; /** Get public user by id */ publicUserById: Maybe; - /** query workspace embedding status */ + /** + * query workspace embedding status + * @deprecated Use realtime subscription "workspace.embedding.progress.changed" instead. + */ queryWorkspaceEmbeddingStatus: ContextWorkspaceEmbeddingStatus; /** @deprecated use currentUser.revealedAccessTokens */ revealedAccessTokens: Array; @@ -3390,7 +3394,10 @@ export interface UserType { invoices: Array; /** User name */ name: Scalars['String']['output']; - /** Get user notification count */ + /** + * Get user notification count + * @deprecated Use realtime subscription "notification.count.changed" instead. + */ notificationCount: Scalars['Int']['output']; /** Get current user notifications */ notifications: PaginatedNotificationObjectType; @@ -7263,13 +7270,7 @@ export type NotificationCountQueryVariables = Exact<{ [key: string]: never }>; export type NotificationCountQuery = { __typename?: 'Query'; - currentUser: { - __typename?: 'UserType'; - notifications: { - __typename?: 'PaginatedNotificationObjectType'; - totalCount: number; - }; - } | null; + currentUser: { __typename?: 'UserType'; notificationCount: number } | null; }; export type PricesQueryVariables = Exact<{ [key: string]: never }>; diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index 314595a6d9..a5b7915bf9 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -19,6 +19,7 @@ }, "dependencies": { "@affine/reader": "workspace:*", + "@affine/realtime": "workspace:*", "@toeverything/infra": "workspace:*", "eventemitter2": "^6.4.9", "graphemer": "^1.4.0", @@ -32,6 +33,7 @@ "devDependencies": { "@affine/error": "workspace:*", "@affine/graphql": "workspace:*", + "@affine/realtime": "workspace:*", "@blocksuite/affine": "workspace:*", "fake-indexeddb": "^6.0.0", "idb": "^8.0.0", @@ -41,6 +43,7 @@ "peerDependencies": { "@affine/error": "workspace:*", "@affine/graphql": "workspace:*", + "@affine/realtime": "workspace:*", "@blocksuite/affine": "workspace:*", "idb": "^8.0.0", "socket.io-client": "^4.8.3" diff --git a/packages/common/nbstore/src/impls/cloud/socket.ts b/packages/common/nbstore/src/impls/cloud/socket.ts index 122361eb2d..9bef8945e7 100644 --- a/packages/common/nbstore/src/impls/cloud/socket.ts +++ b/packages/common/nbstore/src/impls/cloud/socket.ts @@ -1,3 +1,9 @@ +import { + type RealtimeEvent, + type RealtimeRequestEnvelope, + type RealtimeSubscribeEnvelope, + type RealtimeUnsubscribeEnvelope, +} from '@affine/realtime'; import { Manager as SocketIOManager, type Socket as SocketIO, @@ -52,6 +58,8 @@ interface ServerEvents { docId: string; awarenessUpdate: string; }; + + 'realtime:event': RealtimeEvent; } interface ClientEvents { @@ -116,6 +124,10 @@ interface ClientEvents { 'space:delete-doc': { spaceType: string; spaceId: string; docId: string }; 'telemetry:batch': [TelemetryBatch, TelemetryAck]; + + 'realtime:request': [RealtimeRequestEnvelope, unknown]; + 'realtime:subscribe': [RealtimeSubscribeEnvelope, { subscriptionId: string }]; + 'realtime:unsubscribe': [RealtimeUnsubscribeEnvelope, { ok: true }]; } export type ServerEventsMap = { @@ -227,10 +239,11 @@ class SocketManager { const SOCKET_MANAGER_CACHE = new Map(); function getSocketManager(endpoint: string, isSelfHosted: boolean) { - let manager = SOCKET_MANAGER_CACHE.get(endpoint); + const key = `${endpoint}:${isSelfHosted ? 'selfhosted' : 'cloud'}`; + let manager = SOCKET_MANAGER_CACHE.get(key); if (!manager) { manager = new SocketManager(endpoint, isSelfHosted); - SOCKET_MANAGER_CACHE.set(endpoint, manager); + SOCKET_MANAGER_CACHE.set(key, manager); } return manager; } diff --git a/packages/common/nbstore/src/realtime/__tests__/manager.spec.ts b/packages/common/nbstore/src/realtime/__tests__/manager.spec.ts new file mode 100644 index 0000000000..81074b8fdb --- /dev/null +++ b/packages/common/nbstore/src/realtime/__tests__/manager.spec.ts @@ -0,0 +1,296 @@ +import type { RealtimeEvent } from '@affine/realtime'; +import { beforeEach, expect, test, vi } from 'vitest'; + +import { RealtimeManager, stableStringify } from '../manager'; + +type Handler = (payload?: unknown) => void; + +class FakeSocket { + readonly handlers = new Map(); + readonly emitted: Array<{ event: string; payload: unknown }> = []; + connected = true; + disconnected = false; + nextRequestAck: unknown = { data: { count: 1 } }; + subscribeAcks: unknown[] = []; + nextSubscriptionId = 0; + + on(event: string, handler: Handler) { + this.handlers.set(event, handler); + } + + off(event: string) { + this.handlers.delete(event); + } + + async emitWithAck(event: string, payload: unknown) { + this.emitted.push({ event, payload }); + if (event === 'realtime:subscribe') { + const ack = this.subscribeAcks.shift(); + if (ack) return ack; + this.nextSubscriptionId += 1; + return { data: { subscriptionId: `sub-${this.nextSubscriptionId}` } }; + } + if (event === 'realtime:request') { + return this.nextRequestAck; + } + return { data: {} }; + } + + emit(event: string, payload?: unknown) { + this.handlers.get(event)?.(payload); + } +} + +const socket = new FakeSocket(); + +vi.mock('../../impls/cloud/socket', () => ({ + SocketConnection: class { + readonly inner = { socket }; + status = 'connected'; + readonly maybeConnection = { socket }; + + connect() {} + + async waitForConnected() {} + + disconnect() { + socket.disconnected = true; + } + }, +})); + +beforeEach(() => { + vi.stubGlobal('BUILD_CONFIG', { appVersion: 'test' }); + socket.handlers.clear(); + socket.emitted.length = 0; + socket.nextRequestAck = { data: { count: 1 } }; + socket.subscribeAcks = []; + socket.nextSubscriptionId = 0; + socket.connected = true; + socket.disconnected = false; +}); + +test('stableStringify is deterministic for realtime subscription inputs', () => { + expect(stableStringify({ workspaceId: 'space', docId: 'doc' })).toBe( + stableStringify({ docId: 'doc', workspaceId: 'space' }) + ); +}); + +test('stableStringify follows JSON semantics for edge values', () => { + expect(stableStringify({ a: undefined })).toBe(stableStringify({})); + expect(stableStringify([undefined])).toBe('[null]'); + expect(stableStringify(new Date('2026-01-02T03:04:05.000Z'))).toBe( + '"2026-01-02T03:04:05.000Z"' + ); +}); + +test('request sends generic realtime request with client version', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + + await expect(manager.request('notification.count.get', {})).resolves.toEqual({ + count: 1, + }); + + expect(socket.emitted).toEqual([ + { + event: 'realtime:request', + payload: { + op: 'notification.count.get', + input: {}, + clientVersion: 'test', + }, + }, + ]); +}); + +test('request rejects server ack error', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + socket.nextRequestAck = { + error: { name: 'Forbidden', message: 'No access' }, + }; + + await expect(manager.request('notification.count.get', {})).rejects.toThrow( + 'No access' + ); +}); + +test('request rejects when aborted', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + const controller = new AbortController(); + socket.nextRequestAck = new Promise(() => {}); + const request = manager.request( + 'notification.count.get', + {}, + { signal: controller.signal } + ); + + controller.abort(); + + await expect(request).rejects.toThrow('Realtime request aborted'); +}); + +test('subscribe routes events by topic and stable input key', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + const received: unknown[] = []; + const subscription = manager + .subscribe('comment.changed', { workspaceId: 'space', docId: 'doc' }) + .subscribe(event => received.push(event)); + await vi.waitFor(() => expect(received).toEqual([{ type: 'ready' }])); + + socket.emit('realtime:event', { + topic: 'comment.changed', + inputKey: stableStringify({ workspaceId: 'space', docId: 'other' }), + sentAt: 1, + event: { changed: true }, + } satisfies RealtimeEvent); + socket.emit('realtime:event', { + topic: 'comment.changed', + inputKey: stableStringify({ workspaceId: 'space', docId: 'doc' }), + sentAt: 2, + event: { changed: true }, + } satisfies RealtimeEvent); + + expect(received).toEqual([{ type: 'ready' }, { changed: true }]); + subscription.unsubscribe(); +}); + +test('unsubscribe leaves server room and clears status', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + const subscription = manager + .subscribe('notification.count.changed', {}) + .subscribe(); + await vi.waitFor(() => expect(manager.getStatus().subscriptions).toBe(1)); + + subscription.unsubscribe(); + + expect(manager.getStatus()).toMatchObject({ + connected: true, + subscriptions: 0, + }); + expect(socket.emitted.at(-1)).toEqual({ + event: 'realtime:unsubscribe', + payload: { + subscriptionId: 'sub-1', + topic: 'notification.count.changed', + input: {}, + clientVersion: 'test', + }, + }); +}); + +test('context switch disconnects socket and completes subscriptions', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + const completed = vi.fn(); + manager + .subscribe('notification.count.changed', {}) + .subscribe({ complete: completed }); + await vi.waitFor(() => expect(manager.getStatus().subscriptions).toBe(1)); + + manager.setContext({ + endpoint: 'http://other-server', + isSelfHosted: false, + authenticated: true, + }); + + expect(socket.disconnected).toBe(true); + expect(completed).toHaveBeenCalled(); + expect(manager.getStatus()).toMatchObject({ + endpoint: 'http://other-server', + connected: false, + subscriptions: 0, + }); +}); + +test('subscribe registers server room again after reconnect', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + const received: unknown[] = []; + const subscription = manager + .subscribe('notification.count.changed', {}) + .subscribe(event => received.push(event)); + await vi.waitFor(() => expect(received).toEqual([{ type: 'ready' }])); + + socket.emit('connect'); + + await vi.waitFor(() => + expect( + socket.emitted.filter(item => item.event === 'realtime:subscribe') + ).toHaveLength(2) + ); + expect(received).toEqual([{ type: 'ready' }, { type: 'ready' }]); + subscription.unsubscribe(); +}); + +test('failed reconnect only errors the affected subscription', async () => { + const manager = new RealtimeManager(); + manager.setContext({ + endpoint: 'http://server', + isSelfHosted: false, + authenticated: true, + }); + const first: unknown[] = []; + const firstErrors: unknown[] = []; + const second: unknown[] = []; + const secondErrors: unknown[] = []; + const firstSubscription = manager + .subscribe('notification.count.changed', {}) + .subscribe({ + next: event => first.push(event), + error: error => firstErrors.push(error), + }); + const secondSubscription = manager + .subscribe('comment.changed', { workspaceId: 'space', docId: 'doc' }) + .subscribe({ + next: event => second.push(event), + error: error => secondErrors.push(error), + }); + await vi.waitFor(() => expect(manager.getStatus().subscriptions).toBe(2)); + + socket.subscribeAcks = [ + { data: { subscriptionId: 'resub-1' } }, + { error: { name: 'Forbidden', message: 'No access' } }, + ]; + socket.emit('connect'); + + await vi.waitFor(() => expect(first).toHaveLength(2)); + await vi.waitFor(() => expect(secondErrors).toHaveLength(1)); + expect(firstErrors).toEqual([]); + expect(manager.getStatus().subscriptions).toBe(1); + + firstSubscription.unsubscribe(); + secondSubscription.unsubscribe(); +}); diff --git a/packages/common/nbstore/src/realtime/index.ts b/packages/common/nbstore/src/realtime/index.ts new file mode 100644 index 0000000000..12a9bc210c --- /dev/null +++ b/packages/common/nbstore/src/realtime/index.ts @@ -0,0 +1 @@ +export { RealtimeManager, stableStringify } from './manager'; diff --git a/packages/common/nbstore/src/realtime/manager.ts b/packages/common/nbstore/src/realtime/manager.ts new file mode 100644 index 0000000000..b706a4c29a --- /dev/null +++ b/packages/common/nbstore/src/realtime/manager.ts @@ -0,0 +1,336 @@ +import type { + RealtimeConfigureInput, + RealtimeEvent, + RealtimeRequestInputOf, + RealtimeRequestName, + RealtimeRequestOutputOf, + RealtimeStatus, + RealtimeSubscriptionReady, + RealtimeTopicEventOf, + RealtimeTopicInputOf, + RealtimeTopicName, +} from '@affine/realtime'; +import { Observable, Subject } from 'rxjs'; + +import { SocketConnection } from '../impls/cloud/socket'; + +const DEFAULT_REQUEST_TIMEOUT = 10_000; + +type RealtimeContext = RealtimeConfigureInput; + +function normalizeError(error: unknown) { + if (error instanceof Error) { + return { name: error.name, message: error.message }; + } + return { name: 'RealtimeError', message: String(error) }; +} + +function rejectAck(error: { name?: string; message?: string; code?: string }) { + const err = new Error(error.message ?? 'Realtime request failed'); + err.name = error.name ?? 'RealtimeError'; + return err; +} + +export class RealtimeManager { + private context?: RealtimeContext; + private socketConnection?: SocketConnection; + private socketKey?: string; + private lastError?: { name: string; message: string }; + private readonly subscriptions = new Map< + string, + { + topic: RealtimeTopicName; + input: RealtimeTopicInputOf; + inputKey: string; + subject$: Subject; + } + >(); + + setContext(context: RealtimeContext) { + const nextContext = { ...context }; + const changed = + !this.context || + this.context.endpoint !== nextContext.endpoint || + this.context.isSelfHosted !== nextContext.isSelfHosted || + this.context.authenticated !== nextContext.authenticated; + + this.context = nextContext; + + if (changed) { + this.resetConnection(); + } + } + + async request( + op: Op, + input: RealtimeRequestInputOf, + options?: { timeoutMs?: number; signal?: AbortSignal } + ): Promise> { + const socket = await this.connect(); + const timeoutMs = options?.timeoutMs ?? DEFAULT_REQUEST_TIMEOUT; + let timeoutId: ReturnType | undefined; + let abortHandler: (() => void) | undefined; + const abort = () => { + const error = new Error(`Realtime request aborted: ${op}`); + error.name = 'AbortError'; + return error; + }; + if (options?.signal?.aborted) { + throw abort(); + } + const timeout = new Promise((_resolve, reject) => { + timeoutId = setTimeout(() => { + const error = new Error(`Realtime request timed out: ${op}`); + error.name = 'RealtimeRequestTimeout'; + reject(error); + }, timeoutMs); + timeoutId.unref?.(); + }); + const aborted = new Promise((_resolve, reject) => { + abortHandler = () => reject(abort()); + options?.signal?.addEventListener('abort', abortHandler, { once: true }); + }); + + const ack = await Promise.race([ + socket.emitWithAck('realtime:request', { + op, + input, + clientVersion: BUILD_CONFIG.appVersion, + }), + timeout, + aborted, + ]).finally(() => { + if (timeoutId) { + clearTimeout(timeoutId); + } + if (abortHandler) { + options?.signal?.removeEventListener('abort', abortHandler); + } + }); + + if ('error' in ack) { + throw rejectAck(ack.error); + } + + return ack.data as unknown as RealtimeRequestOutputOf; + } + + subscribe( + topic: Topic, + input: RealtimeTopicInputOf + ): Observable | RealtimeSubscriptionReady> { + return new Observable(subscriber => { + let subscriptionId: string | undefined; + let subject$: Subject; + let closed = false; + + const setup = async () => { + try { + const socket = await this.connect(); + const ack = await socket.emitWithAck('realtime:subscribe', { + topic, + input, + clientVersion: BUILD_CONFIG.appVersion, + }); + if ('error' in ack) { + throw rejectAck(ack.error); + } + const data = ack.data; + subscriptionId = data.subscriptionId; + if (closed) { + await socket.emitWithAck('realtime:unsubscribe', { + subscriptionId: data.subscriptionId, + topic, + input, + clientVersion: BUILD_CONFIG.appVersion, + }); + return; + } + + subject$ = new Subject(); + this.subscriptions.set(subscriptionId, { + topic, + input, + inputKey: stableStringify(input), + subject$, + }); + subscriber.next({ + type: 'ready', + }); + subject$.subscribe({ + next: event => { + if ('type' in event) { + subscriber.next(event); + } else { + subscriber.next(event.event as RealtimeTopicEventOf); + } + }, + error: error => subscriber.error(error), + complete: () => subscriber.complete(), + }); + } catch (error) { + this.lastError = normalizeError(error); + subscriber.error(error); + } + }; + + setup().catch(error => subscriber.error(error)); + + return () => { + closed = true; + if (!subscriptionId) { + return; + } + const currentSubscriptionId = subscriptionId; + this.subscriptions.delete(currentSubscriptionId); + subject$?.complete(); + if (this.socketConnection?.inner?.socket.connected) { + this.socketConnection.inner.socket + .emitWithAck('realtime:unsubscribe', { + subscriptionId: currentSubscriptionId, + topic, + input, + clientVersion: BUILD_CONFIG.appVersion, + }) + .catch(() => {}); + } + }; + }); + } + + getStatus(): RealtimeStatus { + return { + endpoint: this.context?.endpoint, + connected: this.socketConnection?.status === 'connected', + connecting: this.socketConnection?.status === 'connecting', + subscriptions: this.subscriptions.size, + lastError: this.lastError, + }; + } + + private async connect() { + if (!this.context?.endpoint || !this.context.authenticated) { + const error = new Error('Realtime is not authenticated'); + error.name = 'RealtimeUnauthenticated'; + throw error; + } + + const key = `${this.context.endpoint}:${this.context.isSelfHosted}`; + if (!this.socketConnection || this.socketKey !== key) { + this.resetConnection(); + this.socketKey = key; + this.socketConnection = new SocketConnection( + this.context.endpoint, + this.context.isSelfHosted + ); + this.socketConnection.connect(); + } + + await this.socketConnection.waitForConnected(); + this.socketConnection.inner.socket.off('realtime:event', this.handleEvent); + this.socketConnection.inner.socket.on('realtime:event', this.handleEvent); + this.socketConnection.inner.socket.off('connect', this.handleReconnect); + this.socketConnection.inner.socket.on('connect', this.handleReconnect); + return this.socketConnection.inner.socket; + } + + private readonly handleEvent = (event: RealtimeEvent) => { + for (const subscription of this.subscriptions.values()) { + if ( + subscription.topic === event.topic && + subscription.inputKey === event.inputKey + ) { + subscription.subject$.next(event); + } + } + }; + + private readonly handleReconnect = () => { + this.resubscribeAll().catch(error => { + this.lastError = normalizeError(error); + }); + }; + + private async resubscribeAll() { + const socket = this.socketConnection?.inner.socket; + if (!socket?.connected || this.subscriptions.size === 0) { + return; + } + + const subscriptions = Array.from(this.subscriptions.entries()); + for (const [subscriptionId, subscription] of subscriptions) { + try { + const ack = await socket.emitWithAck('realtime:subscribe', { + topic: subscription.topic, + input: subscription.input, + clientVersion: BUILD_CONFIG.appVersion, + }); + if ('error' in ack) { + throw rejectAck(ack.error); + } + + this.subscriptions.delete(subscriptionId); + this.subscriptions.set(ack.data.subscriptionId, subscription); + subscription.subject$.next({ + type: 'ready', + }); + } catch (error) { + this.lastError = normalizeError(error); + this.subscriptions.delete(subscriptionId); + subscription.subject$.error(error); + } + } + } + + private resetConnection() { + if (this.socketConnection) { + this.socketConnection.maybeConnection?.socket.off( + 'realtime:event', + this.handleEvent + ); + this.socketConnection.maybeConnection?.socket.off( + 'connect', + this.handleReconnect + ); + this.socketConnection.disconnect(true); + } + this.socketConnection = undefined; + this.socketKey = undefined; + for (const subscription of this.subscriptions.values()) { + subscription.subject$.complete(); + } + this.subscriptions.clear(); + } +} + +export function stableStringify(value: unknown): string { + if ( + value === undefined || + typeof value === 'function' || + typeof value === 'symbol' + ) { + return 'null'; + } + if (value === null || typeof value !== 'object') { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return `[${value.map(stableStringify).join(',')}]`; + } + if (value instanceof Date) { + return JSON.stringify(value.toJSON()); + } + const record = value as Record; + return `{${Object.keys(record) + .filter(key => { + const property = record[key]; + return ( + property !== undefined && + typeof property !== 'function' && + typeof property !== 'symbol' + ); + }) + .sort() + .map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`) + .join(',')}}`; +} diff --git a/packages/common/nbstore/src/worker/client.ts b/packages/common/nbstore/src/worker/client.ts index 28545125ee..14c300cacd 100644 --- a/packages/common/nbstore/src/worker/client.ts +++ b/packages/common/nbstore/src/worker/client.ts @@ -1,3 +1,14 @@ +import type { + RealtimeConfigureInput, + RealtimeRequestInputOf, + RealtimeRequestName, + RealtimeRequestOutputOf, + RealtimeStatus, + RealtimeSubscriptionReady, + RealtimeTopicEventOf, + RealtimeTopicInputOf, + RealtimeTopicName, +} from '@affine/realtime'; import { OpClient, transfer } from '@toeverything/infra/op'; import type { Observable } from 'rxjs'; import { v4 as uuid } from 'uuid'; @@ -38,6 +49,30 @@ import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops'; export type { StoreInitOptions as WorkerInitOptions } from './ops'; +type RealtimeWorkerClient = { + call( + name: 'realtime.request', + payload: { + op: Op; + input: RealtimeRequestInputOf; + timeoutMs?: number; + } + ): Promise>; + ob$( + name: 'realtime.subscribe', + payload: { + topic: Topic; + input: RealtimeTopicInputOf; + } + ): Observable | RealtimeSubscriptionReady>; +}; + +function realtimeAbortError(op: RealtimeRequestName) { + const error = new Error(`Realtime request aborted: ${op}`); + error.name = 'AbortError'; + return error; +} + export class StoreManagerClient { private readonly connections = new Map< string, @@ -49,9 +84,11 @@ export class StoreManagerClient { constructor(private readonly client: OpClient) { this.telemetry = new TelemetryClient(this.client); + this.realtime = new RealtimeClient(this.client); } readonly telemetry: TelemetryClient; + readonly realtime: RealtimeClient; open(key: string, options: StoreInitOptions) { const { port1, port2 } = new MessageChannel(); @@ -138,6 +175,62 @@ class TelemetryClient { } } +export class RealtimeClient { + constructor(private readonly client: OpClient) {} + + configure(context: RealtimeConfigureInput): Promise { + return this.client.call('realtime.configure', context); + } + + request( + op: Op, + input: RealtimeRequestInputOf, + options?: { timeoutMs?: number; signal?: AbortSignal } + ): Promise> { + const request = (this.client as unknown as RealtimeWorkerClient).call( + 'realtime.request', + { + op, + input, + timeoutMs: options?.timeoutMs, + } + ); + if (!options?.signal) { + return request; + } + if (options.signal.aborted) { + return Promise.reject(realtimeAbortError(op)); + } + let abortHandler: (() => void) | undefined; + const aborted = new Promise((_resolve, reject) => { + abortHandler = () => reject(realtimeAbortError(op)); + options.signal?.addEventListener('abort', abortHandler, { once: true }); + }); + return Promise.race([request, aborted]).finally(() => { + if (abortHandler) { + options.signal?.removeEventListener('abort', abortHandler); + } + }); + } + + subscribe( + topic: Topic, + input: RealtimeTopicInputOf + ): Observable | RealtimeSubscriptionReady> { + return (this.client as unknown as RealtimeWorkerClient).ob$( + 'realtime.subscribe', + { + topic, + input, + } + ); + } + + status(): Promise { + return this.client.call('realtime.status'); + } +} + export class StoreClient { constructor(private readonly client: OpClient) { this.docStorage = new WorkerDocStorage(this.client); diff --git a/packages/common/nbstore/src/worker/consumer.ts b/packages/common/nbstore/src/worker/consumer.ts index 4153ddd04d..f98bf9cd16 100644 --- a/packages/common/nbstore/src/worker/consumer.ts +++ b/packages/common/nbstore/src/worker/consumer.ts @@ -2,6 +2,7 @@ import { OpConsumer } from '@toeverything/infra/op'; import { Observable } from 'rxjs'; import { type StorageConstructor } from '../impls'; +import { RealtimeManager } from '../realtime'; import { SpaceStorage } from '../storage'; import type { AwarenessRecord } from '../storage/awareness'; import { Sync } from '../sync'; @@ -340,6 +341,7 @@ export class StoreManagerConsumer { { store: StoreConsumer; refCount: number } >(); private readonly telemetry = new TelemetryManager(); + private readonly realtime = new RealtimeManager(); constructor( private readonly availableStorageImplementations: StorageConstructor[] @@ -393,6 +395,12 @@ export class StoreManagerConsumer { 'telemetry.pageview': event => this.telemetry.pageview(event), 'telemetry.flush': () => this.telemetry.flush(), 'telemetry.getQueueState': () => this.telemetry.getQueueState(), + 'realtime.configure': context => this.realtime.setContext(context), + 'realtime.request': ({ op, input, timeoutMs }) => + this.realtime.request(op, input, { timeoutMs }), + 'realtime.subscribe': ({ topic, input }) => + this.realtime.subscribe(topic, input), + 'realtime.status': () => this.realtime.getStatus(), }); } } diff --git a/packages/common/nbstore/src/worker/ops.ts b/packages/common/nbstore/src/worker/ops.ts index 13ace5ea32..3fca67479e 100644 --- a/packages/common/nbstore/src/worker/ops.ts +++ b/packages/common/nbstore/src/worker/ops.ts @@ -1,3 +1,15 @@ +import type { + RealtimeConfigureInput, + RealtimeRequestInputOf, + RealtimeRequestName, + RealtimeRequestOutputOf, + RealtimeStatus, + RealtimeSubscriptionReady, + RealtimeTopicEventOf, + RealtimeTopicInputOf, + RealtimeTopicName, +} from '@affine/realtime'; + import type { AvailableStorageImplementations } from '../impls'; import type { AggregateResult, @@ -189,4 +201,25 @@ export type WorkerManagerOps = { 'telemetry.pageview': [TelemetryEvent, { queued: boolean }]; 'telemetry.flush': [void, TelemetryAck]; 'telemetry.getQueueState': [void, TelemetryQueueState]; + 'realtime.configure': [RealtimeConfigureInput, void]; + 'realtime.request': [ + { + [Op in RealtimeRequestName]: { + op: Op; + input: RealtimeRequestInputOf; + timeoutMs?: number; + }; + }[RealtimeRequestName], + RealtimeRequestOutputOf, + ]; + 'realtime.subscribe': [ + { + [Topic in RealtimeTopicName]: { + topic: Topic; + input: RealtimeTopicInputOf; + }; + }[RealtimeTopicName], + RealtimeTopicEventOf | RealtimeSubscriptionReady, + ]; + 'realtime.status': [void, RealtimeStatus]; }; diff --git a/packages/common/nbstore/tsconfig.json b/packages/common/nbstore/tsconfig.json index aeba46152c..d8dea1fa58 100644 --- a/packages/common/nbstore/tsconfig.json +++ b/packages/common/nbstore/tsconfig.json @@ -9,6 +9,7 @@ }, "references": [ { "path": "../reader" }, + { "path": "../realtime" }, { "path": "../infra" }, { "path": "../error" }, { "path": "../graphql" }, diff --git a/packages/common/realtime/package.json b/packages/common/realtime/package.json new file mode 100644 index 0000000000..d142c763d5 --- /dev/null +++ b/packages/common/realtime/package.json @@ -0,0 +1,13 @@ +{ + "name": "@affine/realtime", + "type": "module", + "version": "0.26.3", + "private": true, + "sideEffects": false, + "exports": { + ".": "./src/index.ts" + }, + "dependencies": { + "@affine/graphql": "workspace:*" + } +} diff --git a/packages/common/realtime/src/index.ts b/packages/common/realtime/src/index.ts new file mode 100644 index 0000000000..0b19e4421a --- /dev/null +++ b/packages/common/realtime/src/index.ts @@ -0,0 +1,227 @@ +import type { CommentChangeObjectType } from '@affine/graphql'; + +export type RealtimeRequestName = keyof RealtimeRequestMap; +export type RealtimeTopicName = keyof RealtimeTopicMap; + +export interface RealtimeRequestMap { + 'notification.count.get': { + input: Record; + output: { count: number }; + }; + 'comment.changes.get': { + input: { + workspaceId: string; + docId: string; + after?: string; + first?: number; + }; + output: { + changes: CommentChangeObjectType[]; + startCursor: string; + endCursor: string; + hasNextPage: boolean; + }; + }; + 'workspace.embedding.progress.get': { + input: { workspaceId: string }; + output: { total: number; embedded: number }; + }; + 'copilot.transcript.task.get': { + input: { + workspaceId: string; + blobId?: string; + taskId?: string; + }; + output: { task: unknown | null }; + }; +} + +export type NotificationCountChangedReason = + | 'created' + | 'read' + | 'read-all' + | 'expired-cleanup' + | 'resync'; + +export type WorkspaceEmbeddingProgressReason = + | 'queued' + | 'progress' + | 'finished' + | 'failed' + | 'resync'; + +export interface RealtimeTopicMap { + 'notification.count.changed': { + input: Record; + event: { + count: number; + reason: NotificationCountChangedReason; + }; + }; + 'comment.changed': { + input: { + workspaceId: string; + docId: string; + }; + event: { + changed: true; + cursor?: string; + }; + }; + 'workspace.embedding.progress.changed': { + input: { workspaceId: string }; + event: { + total?: number; + embedded?: number; + reason: WorkspaceEmbeddingProgressReason; + }; + }; + 'copilot.transcript.task.changed': { + input: { + workspaceId: string; + taskId: string; + }; + event: { + taskId: string; + status: string; + error?: string; + }; + }; +} + +export type RealtimeRequestInputOf = + RealtimeRequestMap[Op]['input']; +export type RealtimeRequestOutputOf = + RealtimeRequestMap[Op]['output']; +export type RealtimeTopicInputOf = + RealtimeTopicMap[Topic]['input']; +export type RealtimeTopicEventOf = + RealtimeTopicMap[Topic]['event']; + +export type RealtimeError = { + name: string; + message: string; + code?: string; +}; + +export type RealtimeAck = { data: T } | { error: RealtimeError }; + +export type RealtimeRequestId = string; +export type RealtimeSubscriptionId = string; + +export type RealtimeRequestEnvelope< + Op extends RealtimeRequestName = RealtimeRequestName, +> = { + requestId?: RealtimeRequestId; + op: Op; + input: RealtimeRequestInputOf; + clientVersion?: string; +}; + +export type RealtimeRequestInput< + Op extends RealtimeRequestName = RealtimeRequestName, +> = Op extends RealtimeRequestName + ? { + op: Op; + input: RealtimeRequestInputOf; + timeoutMs?: number; + } + : never; + +export type RealtimeRequestOutput< + Op extends RealtimeRequestName = RealtimeRequestName, +> = RealtimeRequestOutputOf; + +export type RealtimeSubscribeEnvelope< + Topic extends RealtimeTopicName = RealtimeTopicName, +> = { + subscriptionId?: RealtimeSubscriptionId; + topic: Topic; + input: RealtimeTopicInputOf; + clientVersion?: string; +}; + +export type RealtimeSubscribeInput< + Topic extends RealtimeTopicName = RealtimeTopicName, +> = Topic extends RealtimeTopicName + ? { + topic: Topic; + input: RealtimeTopicInputOf; + } + : never; + +export type RealtimeUnsubscribeEnvelope = { + subscriptionId?: RealtimeSubscriptionId; + topic: RealtimeTopicName; + input: RealtimeTopicInputOf; + clientVersion?: string; +}; + +export type RealtimeReadyEvent = { + type: 'ready'; + snapshot?: unknown; +}; + +export type RealtimeSubscriptionReady = RealtimeReadyEvent; + +export type RealtimeEvent = + Topic extends RealtimeTopicName + ? { + topic: Topic; + inputKey: string; + seq?: number; + sentAt: number; + event: RealtimeTopicEventOf; + } + : never; + +export type RealtimeTopicEvent< + Topic extends RealtimeTopicName = RealtimeTopicName, +> = RealtimeTopicEventOf | RealtimeReadyEvent; + +export type RealtimeStatus = { + endpoint?: string; + connected: boolean; + connecting: boolean; + subscriptions: number; + lastError?: RealtimeError; +}; + +export type RealtimeConfigureInput = { + endpoint: string; + isSelfHosted: boolean; + authenticated: boolean; + clientVersion?: string; +}; + +export function getRealtimeInputKey(input: unknown): string { + if ( + input === undefined || + typeof input === 'function' || + typeof input === 'symbol' + ) { + return 'null'; + } + if (input === null || typeof input !== 'object') { + return JSON.stringify(input); + } + if (Array.isArray(input)) { + return `[${input.map(getRealtimeInputKey).join(',')}]`; + } + if (input instanceof Date) { + return JSON.stringify(input.toJSON()); + } + const record = input as Record; + return `{${Object.keys(record) + .filter(key => { + const property = record[key]; + return ( + property !== undefined && + typeof property !== 'function' && + typeof property !== 'symbol' + ); + }) + .sort() + .map(key => `${JSON.stringify(key)}:${getRealtimeInputKey(record[key])}`) + .join(',')}}`; +} diff --git a/packages/common/realtime/tsconfig.json b/packages/common/realtime/tsconfig.json new file mode 100644 index 0000000000..be4498036d --- /dev/null +++ b/packages/common/realtime/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../../tsconfig.web.json", + "include": ["./src"], + "compilerOptions": { + "rootDir": "./src", + "outDir": "./dist", + "tsBuildInfoFile": "./dist/tsconfig.tsbuildinfo" + }, + "references": [{ "path": "../graphql" }] +} diff --git a/packages/frontend/apps/android/src/app.tsx b/packages/frontend/apps/android/src/app.tsx index b0f05cfd66..2d2ab9a605 100644 --- a/packages/frontend/apps/android/src/app.tsx +++ b/packages/frontend/apps/android/src/app.tsx @@ -76,6 +76,7 @@ configureLocalStorageStateStorageImpls(framework); configureBrowserWorkspaceFlavours(framework); configureMobileModules(framework); framework.impl(NbstoreProvider, { + realtime: storeManagerClient.realtime, openStore(key, options) { const { store, dispose } = storeManagerClient.open(key, options); return { diff --git a/packages/frontend/apps/electron-renderer/src/app/effects/store-manager.ts b/packages/frontend/apps/electron-renderer/src/app/effects/store-manager.ts index 81a6b529e6..f0ae9dcd1b 100644 --- a/packages/frontend/apps/electron-renderer/src/app/effects/store-manager.ts +++ b/packages/frontend/apps/electron-renderer/src/app/effects/store-manager.ts @@ -59,6 +59,7 @@ export function setupStoreManager(framework: Framework) { }); framework.impl(NbstoreProvider, { + realtime: storeManagerClient.realtime, openStore(key, options) { const { store, dispose } = storeManagerClient.open(key, options); diff --git a/packages/frontend/apps/ios/src/app.tsx b/packages/frontend/apps/ios/src/app.tsx index eabcb42953..a4c99a07bb 100644 --- a/packages/frontend/apps/ios/src/app.tsx +++ b/packages/frontend/apps/ios/src/app.tsx @@ -94,6 +94,7 @@ configureLocalStorageStateStorageImpls(framework); configureBrowserWorkspaceFlavours(framework); configureMobileModules(framework); framework.impl(NbstoreProvider, { + realtime: storeManagerClient.realtime, openStore(key, options) { const { store, dispose } = storeManagerClient.open(key, options); return { diff --git a/packages/frontend/apps/mobile/src/app.tsx b/packages/frontend/apps/mobile/src/app.tsx index 698e38d078..3be2724474 100644 --- a/packages/frontend/apps/mobile/src/app.tsx +++ b/packages/frontend/apps/mobile/src/app.tsx @@ -48,6 +48,7 @@ configureLocalStorageStateStorageImpls(framework); configureBrowserWorkspaceFlavours(framework); configureMobileModules(framework); framework.impl(NbstoreProvider, { + realtime: storeManagerClient.realtime, openStore(key, options) { const { store, dispose } = storeManagerClient.open(key, options); return { diff --git a/packages/frontend/apps/web/src/app.tsx b/packages/frontend/apps/web/src/app.tsx index 348dd4e9fd..6b91cb6a82 100644 --- a/packages/frontend/apps/web/src/app.tsx +++ b/packages/frontend/apps/web/src/app.tsx @@ -63,6 +63,7 @@ configureBrowserWorkbenchModule(framework); configureLocalStorageStateStorageImpls(framework); configureBrowserWorkspaceFlavours(framework); framework.impl(NbstoreProvider, { + realtime: storeManagerClient.realtime, openStore(key, options) { return storeManagerClient.open(key, options); }, diff --git a/packages/frontend/core/src/modules/cloud/index.ts b/packages/frontend/core/src/modules/cloud/index.ts index fb5c1736fb..d6bc7cc00c 100644 --- a/packages/frontend/core/src/modules/cloud/index.ts +++ b/packages/frontend/core/src/modules/cloud/index.ts @@ -19,6 +19,7 @@ export { InvitationService } from './services/invitation'; export { InvoicesService } from './services/invoices'; export type { PublicUserInfo } from './services/public-user'; export { PublicUserService } from './services/public-user'; +export { RealtimeService } from './services/realtime'; export { SelfhostGenerateLicenseService } from './services/selfhost-generate-license'; export { SelfhostLicenseService } from './services/selfhost-license'; export { ServerService } from './services/server'; @@ -41,6 +42,7 @@ import { type Framework } from '@toeverything/infra'; import { GlobalCache, GlobalState } from '../storage/providers/global'; import { GlobalStateService } from '../storage/services/global'; +import { GlobalContextService } from '../global-context'; import { UrlService } from '../url'; import { WorkspaceScope, WorkspaceService } from '../workspace'; import { CloudDocMeta } from './entities/cloud-doc-meta'; @@ -69,6 +71,7 @@ import { FetchService } from './services/fetch'; import { GraphQLService } from './services/graphql'; import { InvoicesService } from './services/invoices'; import { PublicUserService } from './services/public-user'; +import { RealtimeService } from './services/realtime'; import { SelfhostGenerateLicenseService } from './services/selfhost-generate-license'; import { SelfhostLicenseService } from './services/selfhost-license'; import { ServerService } from './services/server'; @@ -100,6 +103,7 @@ import { DocCreatedByService } from './services/doc-created-by'; import { DocUpdatedByService } from './services/doc-updated-by'; import { DocCreatedByUpdatedBySyncService } from './services/doc-created-by-updated-by-sync'; import { WorkspacePermissionService } from '../permissions'; +import { NbstoreService } from '../storage'; import { DocScope, DocService, DocsService } from '../doc'; import { DocCreatedByUpdatedBySyncStore } from './stores/doc-created-by-updated-by-sync'; import { GlobalDialogService } from '../dialogs'; @@ -111,6 +115,11 @@ export function configureCloudModule(framework: Framework) { framework .service(ServersService, [ServerListStore, ServerConfigStore]) + .service(RealtimeService, [ + GlobalContextService, + ServersService, + NbstoreService, + ]) .service(DefaultServerService, [ServersService]) .store(ServerListStore, [GlobalStateService]) .store(ServerConfigStore) diff --git a/packages/frontend/core/src/modules/cloud/services/realtime.ts b/packages/frontend/core/src/modules/cloud/services/realtime.ts new file mode 100644 index 0000000000..1850339bbc --- /dev/null +++ b/packages/frontend/core/src/modules/cloud/services/realtime.ts @@ -0,0 +1,54 @@ +import { shallowEqual } from '@affine/component'; +import { ServerDeploymentType } from '@affine/graphql'; +import { LiveData, OnEvent, Service } from '@toeverything/infra'; + +import type { GlobalContextService } from '../../global-context'; +import { ApplicationStarted } from '../../lifecycle'; +import type { NbstoreService } from '../../storage'; +import type { Server } from '../entities/server'; +import type { ServersService } from './servers'; + +@OnEvent(ApplicationStarted, service => service.onApplicationStarted) +export class RealtimeService extends Service { + private readonly currentServer$ = + this.globalContextService.globalContext.serverId.$.selector(id => + id + ? this.serversService.server$(id) + : new LiveData(undefined) + ) + .flat() + .selector( + server => + [ + server, + server?.account$, + server?.config$.selector( + c => c.type === ServerDeploymentType.Selfhosted + ), + ] as const + ) + .flat() + .map(([server, account, selfHosted]) => ({ + endpoint: server?.baseUrl ?? '', + authenticated: !!account, + isSelfHosted: !!selfHosted, + })) + .distinctUntilChanged(shallowEqual); + + constructor( + private readonly globalContextService: GlobalContextService, + private readonly serversService: ServersService, + private readonly nbstoreService: NbstoreService + ) { + super(); + + const subscription = this.currentServer$.subscribe(context => { + this.nbstoreService.realtime.configure(context).catch(error => { + console.error('Failed to configure realtime context', error); + }); + }); + this.disposables.push(() => subscription.unsubscribe()); + } + + onApplicationStarted() {} +} diff --git a/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts b/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts index da1a3dc36d..e133a0f6ed 100644 --- a/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts +++ b/packages/frontend/core/src/modules/comment/entities/doc-comment-store.ts @@ -4,7 +4,6 @@ import { deleteCommentMutation, deleteReplyMutation, type DocMode, - listCommentChangesQuery, type ListCommentsQuery, listCommentsQuery, resolveCommentMutation, @@ -13,9 +12,11 @@ import { uploadCommentAttachmentMutation, } from '@affine/graphql'; import { Entity } from '@toeverything/infra'; +import type { Observable } from 'rxjs'; import type { DefaultServerService, WorkspaceServerService } from '../../cloud'; import { GraphQLService } from '../../cloud/services/graphql'; +import type { NbstoreService } from '../../storage'; import type { WorkspaceService } from '../../workspace'; import type { DocComment, @@ -75,7 +76,8 @@ export class DocCommentStore extends Entity<{ constructor( private readonly workspaceService: WorkspaceService, private readonly workspaceServerService: WorkspaceServerService, - private readonly defaultServerService: DefaultServerService + private readonly defaultServerService: DefaultServerService, + private readonly nbstoreService: NbstoreService ) { super(); } @@ -132,51 +134,35 @@ export class DocCommentStore extends Entity<{ }; } - // pool every 30s async listCommentChanges({ after, }: { after?: string; }): Promise { - const graphql = this.graphqlService; - if (!graphql) { - throw new Error('GraphQL service not found'); - } - - const response = await graphql.gql({ - query: listCommentChangesQuery, - variables: { - pagination: { - after, - }, - workspaceId: this.currentWorkspaceId, - docId: this.props.docId, - }, - }); - - const commentChanges = response.workspace?.commentChanges; - if (!commentChanges) { - return { - changes: [], - startCursor: '', - endCursor: after ?? '', - hasNextPage: false, - }; - } - + const commentChanges = await this.nbstoreService.realtime.request( + 'comment.changes.get', + { after, workspaceId: this.currentWorkspaceId, docId: this.props.docId } + ); return { - changes: commentChanges.edges.map(edge => ({ - id: edge.node.id, - action: edge.node.action, - comment: normalizeComment(edge.node.item), - commentId: edge.node.commentId || undefined, + changes: commentChanges.changes.map((change: any) => ({ + id: change.id, + action: change.action, + comment: normalizeComment(change.item as GQLCommentType), + commentId: change.commentId, })), - startCursor: commentChanges.pageInfo.startCursor || '', - endCursor: commentChanges.pageInfo.endCursor || '', - hasNextPage: commentChanges.pageInfo.hasNextPage, + startCursor: commentChanges.startCursor, + endCursor: commentChanges.endCursor, + hasNextPage: commentChanges.hasNextPage, }; } + subscribeCommentChanged(): Observable { + return this.nbstoreService.realtime.subscribe('comment.changed', { + workspaceId: this.currentWorkspaceId, + docId: this.props.docId, + }); + } + async createComment(commentInput: { content: DocCommentContent; mentions?: string[]; diff --git a/packages/frontend/core/src/modules/comment/entities/doc-comment.ts b/packages/frontend/core/src/modules/comment/entities/doc-comment.ts index 1dc732dcd3..92fe6a3ce2 100644 --- a/packages/frontend/core/src/modules/comment/entities/doc-comment.ts +++ b/packages/frontend/core/src/modules/comment/entities/doc-comment.ts @@ -22,9 +22,9 @@ import { first, of, Subject, + type Subscription, switchMap, tap, - timer, } from 'rxjs'; import { type DocDisplayMetaService } from '../../doc-display-meta'; @@ -93,8 +93,11 @@ export class DocCommentEntity extends Entity<{ private readonly commentDeleted$ = new Subject(); readonly commentHighlighted$ = new LiveData(null); - private pollingDisposable?: DisposeCallback; + private realtimeDisposable?: DisposeCallback; + private realtimeSubscription?: Subscription; private startCursor?: string; + private fetchingCommentChanges = false; + private pendingCommentChangeFetch = false; async addComment( selections?: BaseSelection[], @@ -486,84 +489,53 @@ export class DocCommentEntity extends Entity<{ return () => subscription.unsubscribe(); } - // Start polling comments every 30s - // 1. when comments$ is empty, fetch all comments - // 2. when comments$ is not empty, fetch changes (using end cursor) - // 3. loop. when doc is not loaded, skip start(): void { - if (this.pollingDisposable) { - this.pollingDisposable(); + if (this.realtimeDisposable) { + this.realtimeDisposable(); } + this.realtimeSubscription?.unsubscribe(); - // Initial load this.revalidate(); this.revalidateCommentsInEditor(); - // Set up polling every 10 seconds - const polling$ = timer(10000, 10000).pipe( - switchMap(() => { - // If we have comments, fetch changes; otherwise fetch all - if (this.comments$.value.length > 0) { - return fromPromise(async () => { - const res = await this.store.listCommentChanges({ - after: this.startCursor, - }); - return res; - }).pipe( - tap(result => { - if (result) { - this.handleCommentChanges(result); - this.startCursor = result.endCursor; - } - }), - catchError(error => { - console.error('Failed to fetch comment changes:', error); - return of(null); - }) - ); - } else { - return fromPromise(async () => { - const allComments: DocComment[] = []; - let cursor = ''; - let firstResult: DocCommentListResult | null = null; - - // Fetch all pages of comments - while (true) { - const result = await this.store.listComments({ after: cursor }); - if (!firstResult) { - firstResult = result; - // Store the startCursor from the first page for future polling - this.startCursor = result.startCursor; - } - allComments.push(...result.comments); - cursor = result.endCursor; - if (!result.hasNextPage) { - break; - } - } - - // Update state with all comments - this.comments$.setValue(allComments); - - return allComments; - }).pipe( - tap(() => this.revalidateCommentsInEditor()), - catchError(error => { - console.error('Failed to fetch comments:', error); - return of(null); - }) - ); - } - }) - ); - - const subscription = polling$.subscribe(); - this.pollingDisposable = () => subscription.unsubscribe(); + this.realtimeSubscription = this.store.subscribeCommentChanged().subscribe({ + next: () => { + this.fetchCommentChanges().catch(() => {}); + }, + error: error => { + console.error('Failed to subscribe comment changes:', error); + }, + }); + this.realtimeDisposable = () => this.realtimeSubscription?.unsubscribe(); } stop(): void { - if (this.pollingDisposable) { - this.pollingDisposable(); + if (this.realtimeDisposable) { + this.realtimeDisposable(); + } + this.realtimeSubscription?.unsubscribe(); + } + + private async fetchCommentChanges() { + if (this.fetchingCommentChanges) { + this.pendingCommentChangeFetch = true; + return; + } + + this.fetchingCommentChanges = true; + try { + do { + this.pendingCommentChangeFetch = false; + const result = await this.store.listCommentChanges({ + after: this.startCursor, + }); + this.handleCommentChanges(result); + this.startCursor = result.endCursor; + } while (this.pendingCommentChangeFetch); + } catch (error) { + console.error('Failed to fetch comment changes:', error); + } finally { + this.fetchingCommentChanges = false; } } @@ -688,7 +660,7 @@ export class DocCommentEntity extends Entity<{ const result = await this.store.listComments({ after: cursor }); if (!firstResult) { firstResult = result; - // Store the startCursor from the first page for polling + // Store the startCursor from the first page for incremental changes this.startCursor = result.startCursor; } allComments.push(...result.comments); diff --git a/packages/frontend/core/src/modules/comment/index.ts b/packages/frontend/core/src/modules/comment/index.ts index 8c45467d8b..b29e9aaee8 100644 --- a/packages/frontend/core/src/modules/comment/index.ts +++ b/packages/frontend/core/src/modules/comment/index.ts @@ -2,6 +2,7 @@ import type { Framework } from '@toeverything/infra'; import { DefaultServerService, WorkspaceServerService } from '../cloud'; import { DocDisplayMetaService } from '../doc-display-meta'; +import { NbstoreService } from '../storage'; import { WorkbenchService } from '../workbench'; import { WorkspaceScope, WorkspaceService } from '../workspace'; import { DocCommentEntity } from './entities/doc-comment'; @@ -25,5 +26,6 @@ export function configureCommentModule(framework: Framework) { WorkspaceService, WorkspaceServerService, DefaultServerService, + NbstoreService, ]); } diff --git a/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.spec.ts b/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.spec.ts index ca007e52f4..f3fe365391 100644 --- a/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.spec.ts +++ b/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.spec.ts @@ -1,5 +1,4 @@ import { - getTranscriptTaskQuery, retryTranscriptTaskMutation, settleTranscriptTaskMutation, submitTranscriptTaskMutation, @@ -10,6 +9,7 @@ import { describe, expect, test, vi } from 'vitest'; import { DefaultServerService } from '../../cloud/services/default-server'; import { GraphQLService } from '../../cloud/services/graphql'; import { WorkspaceServerService } from '../../cloud/services/workspace-server'; +import { NbstoreService } from '../../storage'; import { WorkspaceService } from '../../workspace'; import { AudioTranscriptionJobStore } from './audio-transcription-job-store'; @@ -30,6 +30,10 @@ function createStore( get: (key: unknown) => (key === GraphQLService ? { gql } : null), }, }; + const realtime = { + request: vi.fn().mockResolvedValue({ task: { id: 'task-2' } }), + subscribe: vi.fn(), + }; framework .service(WorkspaceService, { workspace: { id: 'workspace-1' }, @@ -42,10 +46,14 @@ function createStore( .service(DefaultServerService, { server: null, } as unknown as DefaultServerService) + .service(NbstoreService, { + realtime, + } as unknown as NbstoreService) .entity(AudioTranscriptionJobStore, [ WorkspaceService, WorkspaceServerService, DefaultServerService, + NbstoreService, ]); return framework.provider().createEntity(AudioTranscriptionJobStore, { blobId: 'blob-1', @@ -60,13 +68,6 @@ describe('AudioTranscriptionJobStore transcript task API', () => { .fn() .mockResolvedValueOnce({ submitTranscriptTask: { id: 'task-1' } }) .mockResolvedValueOnce({ retryTranscriptTask: { id: 'task-2' } }) - .mockResolvedValueOnce({ - currentUser: { - copilot: { - transcriptTask: { id: 'task-2' }, - }, - }, - }) .mockResolvedValueOnce({ settleTranscriptTask: { id: 'task-2' } }); const store = createStore(gql, async () => ({ files: [file], @@ -102,17 +103,6 @@ describe('AudioTranscriptionJobStore transcript task API', () => { ); expect(gql).toHaveBeenNthCalledWith( 3, - expect.objectContaining({ - query: getTranscriptTaskQuery, - variables: { - workspaceId: 'workspace-1', - taskId: 'task-2', - blobId: 'blob-1', - }, - }) - ); - expect(gql).toHaveBeenNthCalledWith( - 4, expect.objectContaining({ query: settleTranscriptTaskMutation, variables: { diff --git a/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.ts b/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.ts index cef12cb4a5..09d898e58e 100644 --- a/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.ts +++ b/packages/frontend/core/src/modules/media/entities/audio-transcription-job-store.ts @@ -1,13 +1,14 @@ import { - getTranscriptTaskQuery, retryTranscriptTaskMutation, settleTranscriptTaskMutation, submitTranscriptTaskMutation, + type TranscriptionResultType, } from '@affine/graphql'; import { Entity } from '@toeverything/infra'; import type { DefaultServerService, WorkspaceServerService } from '../../cloud'; import { GraphQLService } from '../../cloud/services/graphql'; +import type { NbstoreService } from '../../storage'; import type { WorkspaceService } from '../../workspace'; export class AudioTranscriptionJobStore extends Entity<{ @@ -20,7 +21,8 @@ export class AudioTranscriptionJobStore extends Entity<{ constructor( private readonly workspaceService: WorkspaceService, private readonly workspaceServerService: WorkspaceServerService, - private readonly defaultServerService: DefaultServerService + private readonly defaultServerService: DefaultServerService, + private readonly nbstoreService: NbstoreService ) { super(); } @@ -79,27 +81,27 @@ export class AudioTranscriptionJobStore extends Entity<{ return response.retryTranscriptTask; }; - getTranscriptTask = async (blobId: string, taskId?: string) => { - const graphqlService = this.graphqlService; - if (!graphqlService) { - throw new Error('No graphql service available'); - } + getTranscriptTask = async ( + blobId: string, + taskId?: string + ): Promise => { const currentWorkspaceId = this.currentWorkspaceId; if (!currentWorkspaceId) { throw new Error('No current workspace id'); } - const response = await graphqlService.gql({ - query: getTranscriptTaskQuery, - variables: { - workspaceId: currentWorkspaceId, - taskId, - blobId, - }, - }); - if (!response.currentUser?.copilot?.transcriptTask) { - return null; - } - return response.currentUser.copilot.transcriptTask; + const response = await this.nbstoreService.realtime.request( + 'copilot.transcript.task.get', + { workspaceId: currentWorkspaceId, taskId, blobId }, + { timeoutMs: 10000 } + ); + return response.task as TranscriptionResultType | null; + }; + + subscribeTranscriptTask = (taskId: string) => { + return this.nbstoreService.realtime.subscribe( + 'copilot.transcript.task.changed', + { workspaceId: this.currentWorkspaceId, taskId } + ); }; settleTranscriptTask = async (taskId: string) => { const graphqlService = this.graphqlService; diff --git a/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts b/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts index 7b6ffbd2bc..fef525b794 100644 --- a/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts +++ b/packages/frontend/core/src/modules/media/entities/audio-transcription-job.ts @@ -4,6 +4,7 @@ import { DebugLogger } from '@affine/debug'; import { UserFriendlyError } from '@affine/error'; import { AiJobStatus } from '@affine/graphql'; import { Entity, LiveData } from '@toeverything/infra'; +import type { Subscription } from 'rxjs'; import type { DefaultServerService, WorkspaceServerService } from '../../cloud'; import { AuthService } from '../../cloud/services/auth'; @@ -59,10 +60,13 @@ export class AudioTranscriptionJob extends Entity<{ super(); this.disposables.push(() => { this.disposed = true; + this.rejectTaskWait(new Error('Job disposed')); }); } disposed = false; + private taskSubscription?: Subscription; + private taskWaitReject?: (error: unknown) => void; private readonly _status$ = new LiveData({ status: 'waiting-for-job', @@ -190,38 +194,77 @@ export class AudioTranscriptionJob extends Entity<{ } private async untilTaskReadyOrSettled() { - while ( - !this.disposed && - this.props.blockProps.jobId && - this.props.blockProps.createdBy === this.currentUserId + const taskId = this.props.blockProps.jobId; + if (!taskId || this.props.blockProps.createdBy !== this.currentUserId) { + return; + } + + await this.checkTranscriptTask(taskId); + + this.rejectTaskWait(new Error('Transcript task wait replaced')); + + await new Promise((resolve, reject) => { + this.taskWaitReject = reject; + this.taskSubscription = this.store + .subscribeTranscriptTask(taskId) + .subscribe({ + next: event => { + if ( + 'type' in event || + (event.status as AiJobStatus) === AiJobStatus.finished + ) { + this.checkTranscriptTask(taskId).then(() => { + if (this.status$.value.status === AiJobStatus.finished) { + resolve(); + } + }, reject); + } else if ((event.status as AiJobStatus) === AiJobStatus.failed) { + reject( + UserFriendlyError.fromAny( + event.error ?? 'Transcription job failed' + ) + ); + } + }, + error: reject, + }); + }).finally(() => { + this.taskWaitReject = undefined; + this.taskSubscription?.unsubscribe(); + this.taskSubscription = undefined; + }); + } + + private rejectTaskWait(error: unknown) { + const reject = this.taskWaitReject; + this.taskWaitReject = undefined; + this.taskSubscription?.unsubscribe(); + this.taskSubscription = undefined; + reject?.(error); + } + + private async checkTranscriptTask(taskId: string) { + if ( + this.disposed || + this.props.blockProps.jobId !== taskId || + this.props.blockProps.createdBy !== this.currentUserId ) { - logger.debug('Polling job status', { - jobId: this.props.blockProps.jobId, + return; + } + const job = await this.store.getTranscriptTask(this.props.blobId, taskId); + + if (!job || job.status === AiJobStatus.failed) { + logger.debug('Job failed during realtime status check', { + jobId: taskId, }); - const job = await this.store.getTranscriptTask( - this.props.blobId, - this.props.blockProps.jobId - ); + throw UserFriendlyError.fromAny('Transcription job failed'); + } - if (!job || job?.status === 'failed') { - logger.debug('Job failed during polling', { - jobId: this.props.blockProps.jobId, - }); - throw UserFriendlyError.fromAny('Transcription job failed'); - } - - if (job?.status === AiJobStatus.finished) { - logger.debug('Transcript task is ready to settle', { - jobId: this.props.blockProps.jobId, - }); - this._status$.value = { - status: AiJobStatus.finished, - }; - return; - } - - // Add delay between polling attempts - await new Promise(resolve => setTimeout(resolve, 3000)); + if (job.status === AiJobStatus.finished) { + logger.debug('Transcript task is ready to settle', { jobId: taskId }); + this._status$.value = { + status: AiJobStatus.finished, + }; } } diff --git a/packages/frontend/core/src/modules/media/index.ts b/packages/frontend/core/src/modules/media/index.ts index e9ce8272a5..b78e831ebc 100644 --- a/packages/frontend/core/src/modules/media/index.ts +++ b/packages/frontend/core/src/modules/media/index.ts @@ -1,7 +1,7 @@ import type { Framework } from '@toeverything/infra'; import { DefaultServerService, WorkspaceServerService } from '../cloud'; -import { GlobalState, GlobalStateService } from '../storage'; +import { GlobalState, GlobalStateService, NbstoreService } from '../storage'; import { WorkbenchService } from '../workbench'; import { WorkspaceScope, WorkspaceService } from '../workspace'; import { AudioAttachmentBlock } from './entities/audio-attachment-block'; @@ -35,6 +35,7 @@ export function configureMediaModule(framework: Framework) { WorkspaceService, WorkspaceServerService, DefaultServerService, + NbstoreService, ]) .service(AudioAttachmentService) .service(AudioMediaManagerService, [ diff --git a/packages/frontend/core/src/modules/notification/index.ts b/packages/frontend/core/src/modules/notification/index.ts index f81d423a63..13343277a1 100644 --- a/packages/frontend/core/src/modules/notification/index.ts +++ b/packages/frontend/core/src/modules/notification/index.ts @@ -12,7 +12,7 @@ import { ServerScope, ServerService, } from '../cloud'; -import { GlobalSessionState } from '../storage'; +import { GlobalSessionState, NbstoreService } from '../storage'; import { NotificationCountService } from './services/count'; import { NotificationListService } from './services/list'; import { NotificationService } from './services/notification'; @@ -22,7 +22,11 @@ export function configureNotificationModule(framework: Framework) { framework .scope(ServerScope) .service(NotificationService, [NotificationStore]) - .service(NotificationCountService, [NotificationStore, AuthService]) + .service(NotificationCountService, [ + NotificationStore, + AuthService, + NbstoreService, + ]) .service(NotificationListService, [ NotificationStore, NotificationCountService, diff --git a/packages/frontend/core/src/modules/notification/services/count.ts b/packages/frontend/core/src/modules/notification/services/count.ts index dc33115fff..ef2f52a4bf 100644 --- a/packages/frontend/core/src/modules/notification/services/count.ts +++ b/packages/frontend/core/src/modules/notification/services/count.ts @@ -10,11 +10,13 @@ import { Service, smartRetry, } from '@toeverything/infra'; -import { switchMap, tap, timer } from 'rxjs'; +import type { Subscription } from 'rxjs'; +import { tap } from 'rxjs'; import { AccountChanged, type AuthService } from '../../cloud'; import { ServerStarted } from '../../cloud/events/server-started'; import { ApplicationFocused } from '../../lifecycle'; +import type { NbstoreService } from '../../storage'; import type { NotificationStore } from '../stores/notification'; @OnEvent(ApplicationFocused, s => s.handleApplicationFocused) @@ -23,7 +25,8 @@ import type { NotificationStore } from '../stores/notification'; export class NotificationCountService extends Service { constructor( private readonly store: NotificationStore, - private readonly authService: AuthService + private readonly authService: AuthService, + private readonly nbstoreService: NbstoreService ) { super(); } @@ -33,17 +36,17 @@ export class NotificationCountService extends Service { readonly count$ = LiveData.from(this.store.watchNotificationCountCache(), 0); readonly isLoading$ = new LiveData(false); readonly error$ = new LiveData(null); + private subscription?: Subscription; revalidate = effect( - switchMap(() => { - return timer(0, 30000); // revalidate every 30 seconds - }), exhaustMapWithTrailing(() => { - return fromPromise(signal => { + return fromPromise(() => { if (!this.loggedIn$.value) { return Promise.resolve(0); } - return this.store.getNotificationCount(signal); + return this.nbstoreService.realtime + .request('notification.count.get', {}, { timeoutMs: 10000 }) + .then(result => result.count); }).pipe( tap(result => { this.setCount(result ?? 0); @@ -63,10 +66,12 @@ export class NotificationCountService extends Service { } handleServerStarted() { + this.subscribe(); this.revalidate(); } handleAccountChanged() { + this.subscribe(); this.revalidate(); } @@ -77,5 +82,27 @@ export class NotificationCountService extends Service { override dispose(): void { super.dispose(); this.revalidate.unsubscribe(); + this.subscription?.unsubscribe(); + } + + private subscribe() { + this.subscription?.unsubscribe(); + if (!this.loggedIn$.value) { + return; + } + this.subscription = this.nbstoreService.realtime + .subscribe('notification.count.changed', {}) + .subscribe({ + next: event => { + if ('type' in event) { + this.revalidate(); + } else { + this.setCount(event.count); + } + }, + error: error => { + this.error$.setValue(error); + }, + }); } } diff --git a/packages/frontend/core/src/modules/notification/stores/notification.ts b/packages/frontend/core/src/modules/notification/stores/notification.ts index 7cd561ce15..2addc68b8f 100644 --- a/packages/frontend/core/src/modules/notification/stores/notification.ts +++ b/packages/frontend/core/src/modules/notification/stores/notification.ts @@ -3,7 +3,6 @@ import { type ListNotificationsQuery, listNotificationsQuery, mentionUserMutation, - notificationCountQuery, type PaginationInput, readAllNotificationsMutation, readNotificationMutation, @@ -52,17 +51,6 @@ export class NotificationStore extends Store { ); } - async getNotificationCount(signal?: AbortSignal) { - const result = await this.gqlService.gql({ - query: notificationCountQuery, - context: { - signal, - }, - }); - - return result.currentUser?.notifications.totalCount; - } - async listNotification(pagination: PaginationInput, signal?: AbortSignal) { const result = await this.gqlService.gql({ query: listNotificationsQuery, diff --git a/packages/frontend/core/src/modules/storage/providers/nbstore.ts b/packages/frontend/core/src/modules/storage/providers/nbstore.ts index 756ecd779f..13587484ff 100644 --- a/packages/frontend/core/src/modules/storage/providers/nbstore.ts +++ b/packages/frontend/core/src/modules/storage/providers/nbstore.ts @@ -1,10 +1,13 @@ import type { StoreClient, + StoreManagerClient, WorkerInitOptions, } from '@affine/nbstore/worker/client'; import { createIdentifier } from '@toeverything/infra'; export interface NbstoreProvider { + readonly realtime: StoreManagerClient['realtime']; + /** * Open a nbstore with the given options, if the store with the given key already exists, it will be returned. * diff --git a/packages/frontend/core/src/modules/storage/services/nbstore.ts b/packages/frontend/core/src/modules/storage/services/nbstore.ts index a5a95db322..e3363fba3c 100644 --- a/packages/frontend/core/src/modules/storage/services/nbstore.ts +++ b/packages/frontend/core/src/modules/storage/services/nbstore.ts @@ -11,4 +11,8 @@ export class NbstoreService extends Service { openStore(key: string, options: WorkerInitOptions) { return this.nbstoreProvider.openStore(key, options); } + + get realtime() { + return this.nbstoreProvider.realtime; + } } diff --git a/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts b/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts index eb803c9fee..ce2cc8da07 100644 --- a/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts +++ b/packages/frontend/core/src/modules/workspace-indexer-embedding/entities/embedding-progress.ts @@ -10,8 +10,8 @@ import { onStart, smartRetry, } from '@toeverything/infra'; -import { EMPTY, interval, Subject } from 'rxjs'; -import { exhaustMap, mergeMap, switchMap, takeUntil } from 'rxjs/operators'; +import { EMPTY, type Subscription } from 'rxjs'; +import { exhaustMap, mergeMap } from 'rxjs/operators'; import type { EmbeddingStore } from '../stores/embedding'; import type { LocalAttachmentFile } from '../types'; @@ -26,8 +26,7 @@ export class EmbeddingProgress extends Entity { error$ = new LiveData(null); loading$ = new LiveData(true); - private readonly EMBEDDING_PROGRESS_POLL_INTERVAL = 3000; - private readonly stopEmbeddingProgress$ = new Subject(); + private progressSubscription?: Subscription; uploadingAttachments$ = new LiveData([]); constructor( @@ -37,50 +36,49 @@ export class EmbeddingProgress extends Entity { super(); } - startEmbeddingProgressPolling() { - this.stopEmbeddingProgressPolling(); + startEmbeddingProgress() { + this.stopEmbeddingProgress(); + this.progressSubscription = this.store + .subscribeEmbeddingProgress(this.workspaceService.workspace.id) + .subscribe({ + next: () => this.getEmbeddingProgress(), + error: error => this.error$.setValue(error), + }); this.getEmbeddingProgress(); } - stopEmbeddingProgressPolling() { - this.stopEmbeddingProgress$.next(); + stopEmbeddingProgress() { + this.progressSubscription?.unsubscribe(); + this.progressSubscription = undefined; } getEmbeddingProgress = effect( exhaustMap(() => { - return interval(this.EMBEDDING_PROGRESS_POLL_INTERVAL).pipe( - takeUntil(this.stopEmbeddingProgress$), - switchMap(() => - fromPromise(signal => - this.store.getEmbeddingProgress( - this.workspaceService.workspace.id, - signal - ) - ).pipe( - smartRetry(), - mergeMap(value => { - this.progress$.next(value); - if (value && value.embedded === value.total && value.total > 0) { - this.stopEmbeddingProgressPolling(); - } - return EMPTY; - }), - catchErrorInto(this.error$, error => { - logger.error( - 'Failed to fetch workspace embedding progress', - error - ); - }), - onStart(() => this.loading$.setValue(true)), - onComplete(() => this.loading$.setValue(false)) - ) + return fromPromise(signal => + this.store.getEmbeddingProgress( + this.workspaceService.workspace.id, + signal ) + ).pipe( + smartRetry(), + mergeMap(value => { + this.progress$.next(value); + if (value && value.embedded === value.total && value.total > 0) { + this.stopEmbeddingProgress(); + } + return EMPTY; + }), + catchErrorInto(this.error$, error => { + logger.error('Failed to fetch workspace embedding progress', error); + }), + onStart(() => this.loading$.setValue(true)), + onComplete(() => this.loading$.setValue(false)) ); }) ); override dispose(): void { - this.stopEmbeddingProgress$.next(); + this.progressSubscription?.unsubscribe(); this.getEmbeddingProgress.unsubscribe(); } } diff --git a/packages/frontend/core/src/modules/workspace-indexer-embedding/index.ts b/packages/frontend/core/src/modules/workspace-indexer-embedding/index.ts index fdb3321b14..78f7c06032 100644 --- a/packages/frontend/core/src/modules/workspace-indexer-embedding/index.ts +++ b/packages/frontend/core/src/modules/workspace-indexer-embedding/index.ts @@ -1,4 +1,5 @@ import { WorkspaceServerService } from '@affine/core/modules/cloud'; +import { NbstoreService } from '@affine/core/modules/storage'; import { WorkspaceScope, WorkspaceService, @@ -16,7 +17,7 @@ export function configureIndexerEmbeddingModule(framework: Framework) { framework .scope(WorkspaceScope) .service(EmbeddingService) - .store(EmbeddingStore, [WorkspaceServerService]) + .store(EmbeddingStore, [WorkspaceServerService, NbstoreService]) .entity(EmbeddingEnabled, [WorkspaceService, EmbeddingStore]) .entity(AdditionalAttachments, [WorkspaceService, EmbeddingStore]) .entity(IgnoredDocs, [WorkspaceService, EmbeddingStore]) diff --git a/packages/frontend/core/src/modules/workspace-indexer-embedding/stores/embedding.ts b/packages/frontend/core/src/modules/workspace-indexer-embedding/stores/embedding.ts index 59446c2445..b6a9f70b91 100644 --- a/packages/frontend/core/src/modules/workspace-indexer-embedding/stores/embedding.ts +++ b/packages/frontend/core/src/modules/workspace-indexer-embedding/stores/embedding.ts @@ -1,11 +1,11 @@ import type { WorkspaceServerService } from '@affine/core/modules/cloud'; +import type { NbstoreService } from '@affine/core/modules/storage'; import { addWorkspaceEmbeddingFilesMutation, addWorkspaceEmbeddingIgnoredDocsMutation, getAllWorkspaceEmbeddingIgnoredDocsQuery, getWorkspaceConfigQuery, getWorkspaceEmbeddingFilesQuery, - getWorkspaceEmbeddingStatusQuery, type PaginationInput, removeWorkspaceEmbeddingFilesMutation, removeWorkspaceEmbeddingIgnoredDocsMutation, @@ -14,7 +14,10 @@ import { import { Store } from '@toeverything/infra'; export class EmbeddingStore extends Store { - constructor(private readonly workspaceServerService: WorkspaceServerService) { + constructor( + private readonly workspaceServerService: WorkspaceServerService, + private readonly nbstoreService: NbstoreService + ) { super(); } @@ -178,17 +181,17 @@ export class EmbeddingStore extends Store { } async getEmbeddingProgress(workspaceId: string, signal?: AbortSignal) { - if (!this.workspaceServerService.server) { - throw new Error('No Server'); - } + return await this.nbstoreService.realtime.request( + 'workspace.embedding.progress.get', + { workspaceId }, + { signal, timeoutMs: 10000 } + ); + } - const data = await this.workspaceServerService.server.gql({ - query: getWorkspaceEmbeddingStatusQuery, - variables: { - workspaceId, - }, - context: { signal }, - }); - return data.queryWorkspaceEmbeddingStatus; + subscribeEmbeddingProgress(workspaceId: string) { + return this.nbstoreService.realtime.subscribe( + 'workspace.embedding.progress.changed', + { workspaceId } + ); } } diff --git a/packages/frontend/core/src/modules/workspace-indexer-embedding/view/embedding-settings.tsx b/packages/frontend/core/src/modules/workspace-indexer-embedding/view/embedding-settings.tsx index 7d56c40b94..7076984ada 100644 --- a/packages/frontend/core/src/modules/workspace-indexer-embedding/view/embedding-settings.tsx +++ b/packages/frontend/core/src/modules/workspace-indexer-embedding/view/embedding-settings.tsx @@ -66,7 +66,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => { .setEnabled(checked) .then(() => { if (checked) { - embeddingService.embeddingProgress.startEmbeddingProgressPolling(); + embeddingService.embeddingProgress.startEmbeddingProgress(); } }) .catch(error => { @@ -91,8 +91,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => { docType: file.type, }); embeddingService.additionalAttachments.addAttachments([file]); - // Restart polling to track progress of newly uploaded files - embeddingService.embeddingProgress.startEmbeddingProgressPolling(); + embeddingService.embeddingProgress.startEmbeddingProgress(); }, [embeddingService.additionalAttachments, embeddingService.embeddingProgress] ); @@ -168,7 +167,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => { ]); useEffect(() => { - embeddingService.embeddingProgress.startEmbeddingProgressPolling(); + embeddingService.embeddingProgress.startEmbeddingProgress(); embeddingService.embeddingEnabled.getEnabled(); embeddingService.additionalAttachments.getAttachments({ first: COUNT_PER_PAGE, @@ -178,7 +177,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => { embeddingService.embeddingProgress.getEmbeddingProgress(); return () => { - embeddingService.embeddingProgress.stopEmbeddingProgressPolling(); + embeddingService.embeddingProgress.stopEmbeddingProgress(); }; }, [ embeddingService.embeddingProgress, diff --git a/tools/utils/src/workspace.gen.ts b/tools/utils/src/workspace.gen.ts index 2c52924ad6..7644ca4e68 100644 --- a/tools/utils/src/workspace.gen.ts +++ b/tools/utils/src/workspace.gen.ts @@ -1111,6 +1111,7 @@ export const PackageList = [ 'tools/cli', 'tools/utils', 'packages/common/graphql', + 'packages/common/realtime', ], }, { @@ -1148,6 +1149,7 @@ export const PackageList = [ name: '@affine/nbstore', workspaceDependencies: [ 'packages/common/reader', + 'packages/common/realtime', 'packages/common/infra', 'packages/common/error', 'packages/common/graphql', @@ -1159,6 +1161,11 @@ export const PackageList = [ name: '@affine/reader', workspaceDependencies: ['blocksuite/affine/all'], }, + { + location: 'packages/common/realtime', + name: '@affine/realtime', + workspaceDependencies: ['packages/common/graphql'], + }, { location: 'packages/common/s3-compat', name: '@affine/s3-compat', @@ -1524,6 +1531,7 @@ export type PackageName = | '@toeverything/infra' | '@affine/nbstore' | '@affine/reader' + | '@affine/realtime' | '@affine/s3-compat' | '@affine/admin' | '@affine/android' diff --git a/tsconfig.json b/tsconfig.json index d9f4650b7e..bfc35da0e0 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -132,6 +132,7 @@ { "path": "./packages/common/infra" }, { "path": "./packages/common/nbstore" }, { "path": "./packages/common/reader" }, + { "path": "./packages/common/realtime" }, { "path": "./packages/common/s3-compat" }, { "path": "./packages/frontend/admin" }, { "path": "./packages/frontend/apps/android" }, diff --git a/yarn.lock b/yarn.lock index d67ca21ff9..1112373225 100644 --- a/yarn.lock +++ b/yarn.lock @@ -818,6 +818,7 @@ __metadata: "@affine/error": "workspace:*" "@affine/graphql": "workspace:*" "@affine/reader": "workspace:*" + "@affine/realtime": "workspace:*" "@blocksuite/affine": "workspace:*" "@toeverything/infra": "workspace:*" eventemitter2: "npm:^6.4.9" @@ -835,6 +836,7 @@ __metadata: peerDependencies: "@affine/error": "workspace:*" "@affine/graphql": "workspace:*" + "@affine/realtime": "workspace:*" "@blocksuite/affine": "workspace:*" idb: ^8.0.0 socket.io-client: ^4.8.3 @@ -866,6 +868,14 @@ __metadata: languageName: unknown linkType: soft +"@affine/realtime@workspace:*, @affine/realtime@workspace:packages/common/realtime": + version: 0.0.0-use.local + resolution: "@affine/realtime@workspace:packages/common/realtime" + dependencies: + "@affine/graphql": "workspace:*" + languageName: unknown + linkType: soft + "@affine/revert-update@workspace:tools/revert-update": version: 0.0.0-use.local resolution: "@affine/revert-update@workspace:tools/revert-update" @@ -918,6 +928,7 @@ __metadata: "@affine-tools/cli": "workspace:*" "@affine-tools/utils": "workspace:*" "@affine/graphql": "workspace:*" + "@affine/realtime": "workspace:*" "@affine/s3-compat": "workspace:*" "@affine/server-native": "workspace:*" "@apollo/server": "npm:^5.5.0"