feat(server): realtime notification & task status (#14934)

#### PR Dependency Tree


* **PR #14934** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Full realtime platform added: live notifications, comments, embedding
progress, and transcription task updates via realtime subscriptions.

* **Chores**
* Frontend switched from polling/GraphQL queries to realtime channels;
legacy query fields marked deprecated and client libs updated to use
realtime APIs.

[![Review Change
Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/toeverything/AFFiNE/pull/14934)
<!-- end of auto-generated comment: release notes by coderabbit.ai -->


#### PR Dependency Tree


* **PR #14934** 👈
  * **PR #14936**

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)
This commit is contained in:
DarkSky
2026-05-10 23:21:50 +08:00
committed by GitHub
parent 417d31cabe
commit 8cf00738c2
70 changed files with 2378 additions and 283 deletions
+1
View File
@@ -114,6 +114,7 @@
"@affine-tools/cli": "workspace:*",
"@affine-tools/utils": "workspace:*",
"@affine/graphql": "workspace:*",
"@affine/realtime": "workspace:*",
"@faker-js/faker": "^10.1.0",
"@nestjs/swagger": "^11.2.7",
"@nestjs/testing": "patch:@nestjs/testing@npm%3A11.1.18#~/.yarn/patches/@nestjs-testing-npm-11.1.18-32c0f6af12.patch",
@@ -273,7 +273,7 @@ e2e('should mark notification as read', async t => {
const count = await app.gql({
query: notificationCountQuery,
});
t.is(count.currentUser!.notifications.totalCount, 0);
t.is(count.currentUser!.notificationCount, 0);
// read again should work
for (const notification of notifications) {
@@ -42,6 +42,7 @@ import { NotificationModule } from './core/notification';
import { PermissionModule } from './core/permission';
import { QueueDashboardModule } from './core/queue-dashboard';
import { QuotaModule } from './core/quota';
import { RealtimeModule } from './core/realtime';
import { SelfhostModule } from './core/selfhost';
import { StaticFileModule } from './core/static-files';
import { StorageModule } from './core/storage';
@@ -117,6 +118,7 @@ export const FunctionalityModules = [
ErrorModule,
WebSocketModule,
JobModule.forRoot(),
RealtimeModule,
ModelsModule,
ScheduleModule.forRoot(),
MonitorModule,
@@ -3,12 +3,13 @@ import { Module } from '@nestjs/common';
import { ServerConfigModule } from '../config';
import { PermissionModule } from '../permission';
import { StorageModule } from '../storage';
import { CommentRealtimeProvider } from './realtime';
import { CommentResolver } from './resolver';
import { CommentService } from './service';
@Module({
imports: [PermissionModule, StorageModule, ServerConfigModule],
providers: [CommentResolver, CommentService],
providers: [CommentResolver, CommentService, CommentRealtimeProvider],
exports: [CommentService],
})
export class CommentModule {}
@@ -0,0 +1,99 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { z } from 'zod';
import { decodeWithJson, encodeWithJson } from '../../base/graphql';
import { AccessController } from '../permission';
import type { RealtimePublisher, RealtimeRegistry } from '../realtime';
import type { CommentCursor } from './resolver';
import { CommentService } from './service';
export function commentRoom(workspaceId: string, docId: string) {
return `workspace:${workspaceId}:doc:${docId}:comment`;
}
@Injectable()
export class CommentRealtimeProvider implements OnModuleInit {
constructor(
private readonly service: CommentService,
private readonly ac: AccessController,
@Optional() private readonly registry?: RealtimeRegistry
) {}
onModuleInit() {
const input = z.object({
workspaceId: z.string(),
docId: z.string(),
after: z.string().optional(),
first: z.number().optional(),
});
this.registry?.registerRequest({
name: 'comment.changes.get',
input,
handle: async (user, payload) => {
await this.assertRead(user.id, payload.workspaceId, payload.docId);
const cursor: CommentCursor = decodeWithJson(payload.after) ?? {};
const changes = await this.service.listCommentChanges(
payload.workspaceId,
payload.docId,
{
commentUpdatedAt: cursor.commentUpdatedAt,
replyUpdatedAt: cursor.replyUpdatedAt,
take: payload.first,
}
);
const endCursor = cursor;
for (const change of changes) {
if (change.commentId) {
endCursor.replyUpdatedAt = change.item.updatedAt;
} else {
endCursor.commentUpdatedAt = change.item.updatedAt;
}
}
return {
changes: changes.map(change => ({
id: change.id,
action: change.action,
item: change.item,
commentId: change.commentId ?? undefined,
})) as never,
startCursor: '',
endCursor: encodeWithJson(endCursor),
hasNextPage: changes.length > 0,
};
},
});
this.registry?.registerTopic({
name: 'comment.changed',
input: z.object({
workspaceId: z.string(),
docId: z.string(),
}),
authorize: async (user, payload) => {
await this.assertRead(user.id, payload.workspaceId, payload.docId);
},
room: (_user, payload) => commentRoom(payload.workspaceId, payload.docId),
});
}
private async assertRead(userId: string, workspaceId: string, docId: string) {
await this.ac
.user(userId)
.workspace(workspaceId)
.doc(docId)
.assert('Doc.Comments.Read');
}
}
export function publishCommentChanged(
publisher: RealtimePublisher | undefined,
workspaceId: string,
docId: string
) {
publisher?.publish(
'comment.changed',
{ workspaceId, docId },
{ changed: true }
);
}
@@ -1,5 +1,6 @@
import { randomUUID } from 'node:crypto';
import { Optional } from '@nestjs/common';
import {
Args,
Mutation,
@@ -26,9 +27,11 @@ import { Comment, DocMode, Models, Reply } from '../../models';
import { CurrentUser } from '../auth/session';
import { ServerFeature, ServerService } from '../config';
import { AccessController, DocAction } from '../permission';
import type { RealtimePublisher } from '../realtime';
import { CommentAttachmentStorage } from '../storage';
import { UserType } from '../user';
import { WorkspaceType } from '../workspaces';
import { publishCommentChanged } from './realtime';
import { CommentService } from './service';
import {
CommentCreateInput,
@@ -56,7 +59,8 @@ export class CommentResolver {
private readonly commentAttachmentStorage: CommentAttachmentStorage,
private readonly queue: JobQueue,
private readonly models: Models,
private readonly server: ServerService
private readonly server: ServerService,
@Optional() private readonly realtime?: RealtimePublisher
) {
// enable comment feature by default
this.server.enableFeature(ServerFeature.Comment);
@@ -81,6 +85,7 @@ export class CommentResolver {
input.docMode,
input.mentions
);
publishCommentChanged(this.realtime, comment.workspaceId, comment.docId);
return {
...comment,
@@ -108,6 +113,7 @@ export class CommentResolver {
await this.assertPermission(me, comment, 'Doc.Comments.Update');
await this.service.updateComment(input);
publishCommentChanged(this.realtime, comment.workspaceId, comment.docId);
return true;
}
@@ -126,6 +132,7 @@ export class CommentResolver {
await this.assertPermission(me, comment, 'Doc.Comments.Resolve');
await this.service.resolveComment(input);
publishCommentChanged(this.realtime, comment.workspaceId, comment.docId);
return true;
}
@@ -141,6 +148,7 @@ export class CommentResolver {
await this.assertPermission(me, comment, 'Doc.Comments.Delete');
await this.service.deleteComment(id);
publishCommentChanged(this.realtime, comment.workspaceId, comment.docId);
return true;
}
@@ -169,6 +177,7 @@ export class CommentResolver {
input.mentions,
reply
);
publishCommentChanged(this.realtime, comment.workspaceId, comment.docId);
return {
...reply,
@@ -195,6 +204,7 @@ export class CommentResolver {
await this.assertPermission(me, reply, 'Doc.Comments.Update');
await this.service.updateReply(input);
publishCommentChanged(this.realtime, reply.workspaceId, reply.docId);
return true;
}
@@ -210,6 +220,7 @@ export class CommentResolver {
await this.assertPermission(me, reply, 'Doc.Comments.Delete');
await this.service.deleteReply(id);
publishCommentChanged(this.realtime, reply.workspaceId, reply.docId);
return true;
}
@@ -294,6 +305,7 @@ export class CommentResolver {
})
pagination: PaginationInput
): Promise<PaginatedCommentChangeObjectType> {
// DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients.
await this.assertPermission(
me,
{
@@ -5,6 +5,7 @@ import { MailModule } from '../mail';
import { PermissionModule } from '../permission';
import { StorageModule } from '../storage';
import { NotificationJob } from './job';
import { NotificationRealtimeProvider } from './realtime';
import { NotificationResolver, UserNotificationResolver } from './resolver';
import { NotificationService } from './service';
@@ -15,6 +16,7 @@ import { NotificationService } from './service';
NotificationResolver,
NotificationService,
NotificationJob,
NotificationRealtimeProvider,
],
exports: [NotificationService],
})
@@ -0,0 +1,3 @@
export function notificationCountRoom(userId: string) {
return `user:${userId}:notification`;
}
@@ -0,0 +1,36 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { z } from 'zod';
import type { RealtimeRegistry } from '../realtime';
import { notificationCountRoom } from './realtime-room';
import { NotificationService } from './service';
@Injectable()
export class NotificationRealtimeProvider implements OnModuleInit {
constructor(
private readonly service: NotificationService,
@Optional() private readonly registry?: RealtimeRegistry
) {}
onModuleInit() {
this.registry?.registerRequest({
name: 'notification.count.get',
input: z.object({}).strict(),
handle: async user => ({
count: await this.service.countByUserId(user.id),
}),
});
this.registry?.registerTopic({
name: 'notification.count.changed',
input: z.object({}).strict(),
authorize: async () => {},
room: user => {
if (!user) {
throw new Error('User is required for notification count room');
}
return notificationCountRoom(user.id);
},
});
}
}
@@ -47,8 +47,11 @@ export class UserNotificationResolver {
@ResolveField(() => Int, {
description: 'Get user notification count',
deprecationReason:
'Use realtime subscription "notification.count.changed" instead.',
})
async notificationCount(@CurrentUser() me: UserType): Promise<number> {
// DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients.
return await this.service.countByUserId(me.id);
}
@@ -1,4 +1,4 @@
import { Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger, Optional } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { NotificationNotFound, PaginationInput, URLHelper } from '../../base';
@@ -17,11 +17,13 @@ import {
} from '../../models';
import { DocReader } from '../doc';
import { Mailer } from '../mail';
import type { RealtimePublisher } from '../realtime';
import { generateDocPath } from '../utils/doc';
import {
generateWorkspaceSettingsPath,
WorkspaceSettingsTab,
} from '../utils/workspace';
import { notificationCountRoom } from './realtime-room';
@Injectable()
export class NotificationService {
@@ -31,11 +33,22 @@ export class NotificationService {
private readonly models: Models,
private readonly docReader: DocReader,
private readonly mailer: Mailer,
private readonly url: URLHelper
private readonly url: URLHelper,
@Optional() private readonly realtime?: RealtimePublisher
) {}
async cleanExpiredNotifications() {
return await this.models.notification.cleanExpiredNotifications();
const userIds =
await this.models.notification.findExpiredNotificationUserIds();
const count = await this.models.notification.cleanExpiredNotifications();
if (count > 0) {
await Promise.all(
userIds.map(userId =>
this.publishCountChanged(userId, 'expired-cleanup')
)
);
}
return count;
}
async createComment(input: CommentNotificationCreate, isMention?: boolean) {
@@ -43,6 +56,7 @@ export class NotificationService {
? await this.models.notification.createCommentMention(input)
: await this.models.notification.createComment(input);
await this.sendCommentEmail(input, isMention);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
@@ -93,6 +107,7 @@ export class NotificationService {
async createMention(input: MentionNotificationCreate) {
const notification = await this.models.notification.createMention(input);
await this.sendMentionEmail(input);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
@@ -147,6 +162,7 @@ export class NotificationService {
NotificationType.Invitation
);
await this.sendInvitationEmail(input);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
@@ -198,6 +214,7 @@ export class NotificationService {
NotificationType.InvitationAccepted
);
await this.sendInvitationAcceptedEmail(input);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
@@ -240,18 +257,22 @@ export class NotificationService {
async createInvitationBlocked(input: InvitationNotificationCreate) {
await this.ensureWorkspaceContentExists(input.body.workspaceId);
return await this.models.notification.createInvitation(
const notification = await this.models.notification.createInvitation(
input,
NotificationType.InvitationBlocked
);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
async createInvitationRejected(input: InvitationNotificationCreate) {
await this.ensureWorkspaceContentExists(input.body.workspaceId);
return await this.models.notification.createInvitation(
const notification = await this.models.notification.createInvitation(
input,
NotificationType.InvitationRejected
);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
async createInvitationReviewRequest(input: InvitationNotificationCreate) {
@@ -267,6 +288,7 @@ export class NotificationService {
NotificationType.InvitationReviewRequest
);
await this.sendInvitationReviewRequestEmail(input);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
@@ -315,6 +337,7 @@ export class NotificationService {
NotificationType.InvitationReviewApproved
);
await this.sendInvitationReviewApprovedEmail(input);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
@@ -354,6 +377,7 @@ export class NotificationService {
const notification =
await this.models.notification.createInvitationReviewDeclined(input);
await this.sendInvitationReviewDeclinedEmail(input);
await this.publishCountChanged(input.userId, 'created');
return notification;
}
@@ -397,10 +421,12 @@ export class NotificationService {
}
throw err;
}
await this.publishCountChanged(userId, 'read');
}
async markAllAsRead(userId: string) {
await this.models.notification.markAllAsRead(userId);
await this.publishCountChanged(userId, 'read-all');
}
/**
@@ -463,6 +489,26 @@ export class NotificationService {
return await this.models.notification.countByUserId(userId);
}
private async publishCountChanged(
userId: string,
reason: 'created' | 'read' | 'read-all' | 'expired-cleanup'
) {
if (!this.realtime) return;
try {
this.realtime.publish(
'notification.count.changed',
{},
{ count: await this.countByUserId(userId), reason },
{ room: notificationCountRoom(userId) }
);
} catch (error) {
this.logger.error(
`Failed to publish notification count for user ${userId}`,
error
);
}
}
private formatWorkspaceInfo(workspace: Workspace) {
return {
id: workspace.id,
@@ -0,0 +1,170 @@
import test from 'ava';
import { z } from 'zod';
import type { CurrentUser } from '../../auth';
import { RealtimeGateway } from '../gateway';
import type { RealtimePublisher } from '../publisher';
import { RealtimeRegistry } from '../registry';
import { stableStringify } from '../stable-stringify';
const user: CurrentUser = {
id: 'u1',
email: 'u1@affine.pro',
name: 'User',
avatarUrl: null,
disabled: false,
hasPassword: true,
emailVerified: true,
};
function createGateway(registry: RealtimeRegistry) {
return new RealtimeGateway(registry, {
attachServer() {},
publishLocal() {},
} as unknown as RealtimePublisher);
}
test('registry rejects duplicate request and topic handlers', t => {
const registry = new RealtimeRegistry();
const request = {
name: 'notification.count.get' as const,
input: z.object({}).strict(),
handle: async () => ({ count: 0 }),
};
const topic = {
name: 'notification.count.changed' as const,
input: z.object({}).strict(),
authorize: async () => {},
room: () => 'room',
};
registry.registerRequest(request);
registry.registerTopic(topic);
t.throws(() => registry.registerRequest(request), {
message: /already registered/,
});
t.throws(() => registry.registerTopic(topic), {
message: /already registered/,
});
});
test('gateway handles registered request with version gate', async t => {
const registry = new RealtimeRegistry();
registry.registerRequest({
name: 'notification.count.get',
input: z.object({}).strict(),
handle: async currentUser => ({ count: currentUser.id === 'u1' ? 1 : 0 }),
});
const gateway = createGateway(registry);
t.deepEqual(
await gateway.onRequest(user, {
op: 'notification.count.get',
input: {},
clientVersion: '0.26.0',
}),
{ data: { count: 1 } }
);
t.like(
await gateway.onRequest(user, {
op: 'notification.count.get',
input: {},
clientVersion: '0.25.0',
}),
{ error: { code: 'UNSUPPORTED_CLIENT_VERSION' } }
);
});
test('gateway authorizes subscription and joins room', async t => {
const registry = new RealtimeRegistry();
registry.registerTopic({
name: 'comment.changed',
input: z.object({ workspaceId: z.string(), docId: z.string() }),
authorize: async (_currentUser, input) => {
if (input.workspaceId !== 'space') {
throw new Error('denied');
}
},
room: (_currentUser, input) => `workspace:${input.workspaceId}`,
});
const gateway = createGateway(registry);
const joined: string[] = [];
const client = {
id: 'socket-1',
join: async (room: string) => {
joined.push(room);
},
leave: async (room: string) => {
joined.splice(joined.indexOf(room), 1);
},
};
const result = await gateway.onSubscribe(user, client as never, {
topic: 'comment.changed',
input: { workspaceId: 'space', docId: 'doc' },
clientVersion: '0.26.0',
});
t.deepEqual(joined, ['workspace:space']);
t.deepEqual(result, {
data: {
subscriptionId: `socket-1:comment.changed:${stableStringify({
workspaceId: 'space',
docId: 'doc',
})}`,
},
});
t.like(
await gateway.onSubscribe(user, client as never, {
topic: 'comment.changed',
input: { workspaceId: 'other', docId: 'doc' },
clientVersion: '0.26.0',
}),
{ error: { code: 'INTERNAL_SERVER_ERROR' } }
);
});
test('stableStringify is deterministic for subscription input keys', t => {
t.is(
stableStringify({ docId: 'doc', workspaceId: 'space' }),
stableStringify({ workspaceId: 'space', docId: 'doc' })
);
});
test('stableStringify follows JSON semantics for subscription input keys', t => {
t.is(stableStringify({ after: undefined }), stableStringify({}));
t.is(stableStringify([undefined]), '[null]');
t.is(
stableStringify(new Date('2026-01-02T03:04:05.000Z')),
'"2026-01-02T03:04:05.000Z"'
);
});
test('gateway removes subscriptions on socket disconnect', async t => {
const registry = new RealtimeRegistry();
registry.registerTopic({
name: 'notification.count.changed',
input: z.object({}).strict(),
authorize: async () => {},
room: () => 'user:u1:notification-count',
});
const gateway = createGateway(registry);
const client = {
id: 'socket-1',
join: async () => {},
leave: async () => {},
};
await gateway.onSubscribe(user, client as never, {
topic: 'notification.count.changed',
input: {},
clientVersion: '0.26.0',
});
t.is((gateway as any).subscriptions.size, 1);
gateway.handleDisconnect(client as never);
t.is((gateway as any).subscriptions.size, 0);
});
@@ -0,0 +1,136 @@
import type {
RealtimeRequestEnvelope,
RealtimeSubscribeEnvelope,
RealtimeUnsubscribeEnvelope,
} from '@affine/realtime';
import { applyDecorators, Logger, UseInterceptors } from '@nestjs/common';
import {
ConnectedSocket,
MessageBody,
OnGatewayDisconnect,
OnGatewayInit,
SubscribeMessage as RawSubscribeMessage,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { ClsInterceptor } from 'nestjs-cls';
import semver from 'semver';
import type { Server, Socket } from 'socket.io';
import {
GatewayErrorWrapper,
OnEvent,
UnsupportedClientVersion,
} from '../../base';
import { CurrentUser } from '../auth';
import { RealtimePublisher } from './publisher';
import { RealtimeRegistry } from './registry';
import { stableStringify } from './stable-stringify';
import type { RealtimePublishPayload } from './types';
const SubscribeMessage = (event: string) =>
applyDecorators(GatewayErrorWrapper(event), RawSubscribeMessage(event));
const MIN_REALTIME_CLIENT_VERSION = new semver.Range('>=0.26.0-0', {
includePrerelease: true,
});
@WebSocketGateway()
@UseInterceptors(ClsInterceptor)
export class RealtimeGateway implements OnGatewayInit, OnGatewayDisconnect {
private readonly logger = new Logger(RealtimeGateway.name);
private readonly subscriptions = new Map<
string,
{ socketId: string; room: string }
>();
@WebSocketServer()
private readonly server!: Server;
constructor(
private readonly registry: RealtimeRegistry,
private readonly publisher: RealtimePublisher
) {}
afterInit(_server: Server) {
this.publisher.attachServer(this.server);
}
handleDisconnect(client: Socket) {
for (const [subscriptionId, subscription] of this.subscriptions) {
if (subscription.socketId === client.id) {
this.subscriptions.delete(subscriptionId);
}
}
}
@SubscribeMessage('realtime:request')
async onRequest(
@CurrentUser() user: CurrentUser,
@MessageBody() envelope: RealtimeRequestEnvelope
) {
this.assertVersion(envelope.clientVersion);
const handler = this.registry.getRequest(envelope.op);
const input = handler.input.parse(envelope.input);
return { data: await handler.handle(user, input as never) };
}
@SubscribeMessage('realtime:subscribe')
async onSubscribe(
@CurrentUser() user: CurrentUser,
@ConnectedSocket() client: Socket,
@MessageBody() envelope: RealtimeSubscribeEnvelope
) {
this.assertVersion(envelope.clientVersion);
const handler = this.registry.getTopic(envelope.topic);
const input = handler.input.parse(envelope.input);
await handler.authorize(user, input as never);
const room = handler.room(user, input as never);
await client.join(room);
const subscriptionId = `${client.id}:${envelope.topic}:${stableStringify(input)}`;
this.subscriptions.set(subscriptionId, {
socketId: client.id,
room,
});
return { data: { subscriptionId } };
}
@SubscribeMessage('realtime:unsubscribe')
async onUnsubscribe(
@ConnectedSocket() client: Socket,
@MessageBody() envelope: RealtimeUnsubscribeEnvelope
) {
this.assertVersion(envelope.clientVersion);
if (!envelope.subscriptionId) {
return { data: { ok: true } };
}
const subscription = this.subscriptions.get(envelope.subscriptionId);
if (subscription?.socketId === client.id) {
await client.leave(subscription.room);
this.subscriptions.delete(envelope.subscriptionId);
}
return { data: { ok: true } };
}
@OnEvent('realtime.topic.changed', { suppressError: true })
onRealtimeTopicChanged(payload: RealtimePublishPayload) {
try {
this.publisher.publishLocal(payload);
} catch (error) {
this.logger.error('Failed to publish realtime event', error);
}
}
private assertVersion(clientVersion?: string) {
if (
!clientVersion ||
!semver.valid(clientVersion) ||
!MIN_REALTIME_CLIENT_VERSION.test(clientVersion)
) {
throw new UnsupportedClientVersion({
clientVersion: clientVersion ?? 'unset_or_invalid',
requiredVersion: '>=0.26.0',
});
}
}
}
@@ -0,0 +1,16 @@
import { Global, Module } from '@nestjs/common';
import { RealtimeGateway } from './gateway';
import { RealtimePublisher } from './publisher';
import { RealtimeRegistry } from './registry';
@Global()
@Module({
providers: [RealtimeRegistry, RealtimePublisher, RealtimeGateway],
exports: [RealtimeRegistry, RealtimePublisher],
})
export class RealtimeModule {}
export { RealtimePublisher } from './publisher';
export { RealtimeRegistry } from './registry';
export type { RealtimeRequestHandler, RealtimeTopicHandler } from './types';
@@ -0,0 +1,55 @@
import type { RealtimeEvent, RealtimeTopicName } from '@affine/realtime';
import { Injectable, Logger } from '@nestjs/common';
import type { Server } from 'socket.io';
import { EventBus } from '../../base';
import { RealtimeRegistry } from './registry';
import { stableStringify } from './stable-stringify';
import type { RealtimePublishPayload } from './types';
@Injectable()
export class RealtimePublisher {
private readonly logger = new Logger(RealtimePublisher.name);
private server?: Server;
constructor(
private readonly registry: RealtimeRegistry,
private readonly event: EventBus
) {}
attachServer(server: Server) {
this.server = server;
}
publish<Topic extends RealtimeTopicName>(
topic: Topic,
input: RealtimePublishPayload<Topic>['input'],
event: RealtimePublishPayload<Topic>['event'],
options?: { room?: string }
) {
const payload = {
topic,
input,
event,
room: options?.room,
} as RealtimePublishPayload<Topic>;
try {
this.publishLocal(payload);
this.event.broadcast('realtime.topic.changed', payload);
} catch (error) {
this.logger.error(`Failed to publish realtime topic ${topic}`, error);
}
}
publishLocal(payload: RealtimePublishPayload) {
const handler = this.registry.getTopic(payload.topic);
const room = payload.room ?? handler.room(null, payload.input as never);
const envelope: RealtimeEvent = {
topic: payload.topic,
inputKey: stableStringify(payload.input),
sentAt: Date.now(),
event: payload.event as never,
};
this.server?.to(room).emit('realtime:event', envelope);
}
}
@@ -0,0 +1,60 @@
import type { RealtimeRequestName, RealtimeTopicName } from '@affine/realtime';
import { Injectable } from '@nestjs/common';
import type { RealtimeRequestHandler, RealtimeTopicHandler } from './types';
@Injectable()
export class RealtimeRegistry {
private readonly requests = new Map<
RealtimeRequestName,
RealtimeRequestHandler<RealtimeRequestName>
>();
private readonly topics = new Map<
RealtimeTopicName,
RealtimeTopicHandler<RealtimeTopicName>
>();
registerRequest<Op extends RealtimeRequestName>(
handler: RealtimeRequestHandler<Op>
) {
if (this.requests.has(handler.name)) {
throw new Error(
`Realtime request handler already registered: ${handler.name}`
);
}
this.requests.set(
handler.name,
handler as RealtimeRequestHandler<RealtimeRequestName>
);
}
registerTopic<Topic extends RealtimeTopicName>(
handler: RealtimeTopicHandler<Topic>
) {
if (this.topics.has(handler.name)) {
throw new Error(
`Realtime topic handler already registered: ${handler.name}`
);
}
this.topics.set(
handler.name,
handler as RealtimeTopicHandler<RealtimeTopicName>
);
}
getRequest(name: RealtimeRequestName) {
const handler = this.requests.get(name);
if (!handler) {
throw new Error(`Realtime request handler not found: ${name}`);
}
return handler;
}
getTopic(name: RealtimeTopicName) {
const handler = this.topics.get(name);
if (!handler) {
throw new Error(`Realtime topic handler not found: ${name}`);
}
return handler;
}
}
@@ -0,0 +1,31 @@
export function stableStringify(value: unknown): string {
if (
value === undefined ||
typeof value === 'function' ||
typeof value === 'symbol'
) {
return 'null';
}
if (value === null || typeof value !== 'object') {
return JSON.stringify(value);
}
if (Array.isArray(value)) {
return `[${value.map(stableStringify).join(',')}]`;
}
if (value instanceof Date) {
return JSON.stringify(value.toJSON());
}
const record = value as Record<string, unknown>;
return `{${Object.keys(record)
.filter(key => {
const property = record[key];
return (
property !== undefined &&
typeof property !== 'function' &&
typeof property !== 'symbol'
);
})
.sort()
.map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`)
.join(',')}}`;
}
@@ -0,0 +1,45 @@
import type {
RealtimeRequestInputOf,
RealtimeRequestName,
RealtimeRequestOutputOf,
RealtimeTopicEventOf,
RealtimeTopicInputOf,
RealtimeTopicName,
} from '@affine/realtime';
import type { z } from 'zod';
import type { CurrentUser } from '../auth';
declare global {
interface Events {
'realtime.topic.changed': RealtimePublishPayload;
}
}
export type RealtimeRequestHandler<Op extends RealtimeRequestName> = {
name: Op;
input: z.ZodType<RealtimeRequestInputOf<Op>>;
handle(
user: CurrentUser,
input: RealtimeRequestInputOf<Op>
): Promise<RealtimeRequestOutputOf<Op>>;
};
export type RealtimeTopicHandler<Topic extends RealtimeTopicName> = {
name: Topic;
input: z.ZodType<RealtimeTopicInputOf<Topic>>;
authorize(
user: CurrentUser,
input: RealtimeTopicInputOf<Topic>
): Promise<void>;
room(user: CurrentUser | null, input: RealtimeTopicInputOf<Topic>): string;
};
export type RealtimePublishPayload<
Topic extends RealtimeTopicName = RealtimeTopicName,
> = {
topic: Topic;
input: RealtimeTopicInputOf<Topic>;
event: RealtimeTopicEventOf<Topic>;
room?: string;
};
@@ -311,6 +311,15 @@ export class NotificationModel extends BaseModel {
return row as UnionNotification;
}
async findExpiredNotificationUserIds() {
const rows = await this.db.notification.findMany({
distinct: ['userId'],
select: { userId: true },
where: { createdAt: { lte: new Date(Date.now() - ONE_YEAR) } },
});
return rows.map(row => row.userId);
}
async cleanExpiredNotifications() {
const { count } = await this.db.notification.deleteMany({
// delete notifications that are older than one year
@@ -1,2 +1,3 @@
export { CopilotEmbeddingRealtimeProvider } from './realtime';
export { CopilotContextResolver, CopilotContextRootResolver } from './resolver';
export { CopilotContextService } from './service';
@@ -0,0 +1,105 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { z } from 'zod';
import { OnEvent } from '../../../base';
import { AccessController } from '../../../core/permission';
import type {
RealtimePublisher,
RealtimeRegistry,
} from '../../../core/realtime';
import { Models } from '../../../models';
import { CopilotContextService } from './service';
export function workspaceEmbeddingRoom(workspaceId: string) {
return `workspace:${workspaceId}:embedding-progress`;
}
@Injectable()
export class CopilotEmbeddingRealtimeProvider implements OnModuleInit {
constructor(
private readonly ac: AccessController,
private readonly models: Models,
private readonly context: CopilotContextService,
@Optional() private readonly registry?: RealtimeRegistry,
@Optional() private readonly publisher?: RealtimePublisher
) {}
onModuleInit() {
const input = z.object({ workspaceId: z.string() });
this.registry?.registerRequest({
name: 'workspace.embedding.progress.get',
input,
handle: async (user, payload) => {
await this.assertCopilot(user.id, payload.workspaceId);
if (!this.context.canEmbedding) {
return { total: 0, embedded: 0 };
}
return await this.models.copilotWorkspace.getEmbeddingStatus(
payload.workspaceId
);
},
});
this.registry?.registerTopic({
name: 'workspace.embedding.progress.changed',
input,
authorize: async (user, payload) => {
await this.assertCopilot(user.id, payload.workspaceId);
},
room: (_user, payload) => workspaceEmbeddingRoom(payload.workspaceId),
});
}
@OnEvent('workspace.doc.embed.finished', { suppressError: true })
async onDocEmbedFinished(payload: Events['workspace.doc.embed.finished']) {
await this.publishContext(payload.contextId, 'finished');
}
@OnEvent('workspace.doc.embed.failed', { suppressError: true })
async onDocEmbedFailed(payload: Events['workspace.doc.embed.failed']) {
await this.publishContext(payload.contextId, 'failed');
}
@OnEvent('workspace.file.embed.finished', { suppressError: true })
async onFileEmbedFinished(payload: Events['workspace.file.embed.finished']) {
await this.publishContext(payload.contextId, 'finished');
}
@OnEvent('workspace.file.embed.failed', { suppressError: true })
async onFileEmbedFailed(payload: Events['workspace.file.embed.failed']) {
await this.publishContext(payload.contextId, 'failed');
}
@OnEvent('workspace.blob.embed.finished', { suppressError: true })
async onBlobEmbedFinished(payload: Events['workspace.blob.embed.finished']) {
await this.publishContext(payload.contextId, 'finished');
}
@OnEvent('workspace.blob.embed.failed', { suppressError: true })
async onBlobEmbedFailed(payload: Events['workspace.blob.embed.failed']) {
await this.publishContext(payload.contextId, 'failed');
}
private async publishContext(
contextId: string,
reason: 'finished' | 'failed'
) {
if (!this.publisher) return;
const context = await this.context.get(contextId);
this.publisher.publish(
'workspace.embedding.progress.changed',
{ workspaceId: context.workspaceId },
{ reason },
{ room: workspaceEmbeddingRoom(context.workspaceId) }
);
}
private async assertCopilot(userId: string, workspaceId: string) {
await this.ac
.user(userId)
.workspace(workspaceId)
.allowLocal()
.assert('Workspace.Copilot');
}
}
@@ -412,12 +412,15 @@ export class CopilotContextRootResolver {
@Throttle('strict')
@Query(() => ContextWorkspaceEmbeddingStatus, {
description: 'query workspace embedding status',
deprecationReason:
'Use realtime subscription "workspace.embedding.progress.changed" instead.',
})
@CallMetric('ai', 'context_query_workspace_embedding_status')
async queryWorkspaceEmbeddingStatus(
@CurrentUser() user: CurrentUser,
@Args('workspaceId') workspaceId: string
): Promise<ContextWorkspaceEmbeddingStatus> {
// DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients.
await this.ac
.user(user.id)
.workspace(workspaceId)
@@ -13,6 +13,7 @@ import {
CopilotContextResolver,
CopilotContextRootResolver,
CopilotContextService,
CopilotEmbeddingRealtimeProvider,
} from './context';
import { ConversationInboxService } from './conversation/inbox';
import { ConversationPolicy } from './conversation/policy';
@@ -55,6 +56,7 @@ import { CopilotStorage } from './storage';
import {
CopilotTranscriptionResolver,
CopilotTranscriptionService,
CopilotTranscriptRealtimeProvider,
} from './transcript';
import {
CopilotWorkspaceEmbeddingConfigResolver,
@@ -106,11 +108,15 @@ export const COPILOT_RUNTIME_PROVIDERS = [
TurnPersistence,
];
export const COPILOT_CONTEXT_PROVIDERS = [CopilotContextResolver];
export const COPILOT_CONTEXT_PROVIDERS = [
CopilotContextResolver,
CopilotEmbeddingRealtimeProvider,
];
export const COPILOT_TRANSCRIPT_PROVIDERS = [
CopilotTranscriptionService,
CopilotTranscriptionResolver,
CopilotTranscriptRealtimeProvider,
];
export const COPILOT_WORKSPACE_PROVIDERS = [
@@ -1,2 +1,3 @@
export { CopilotTranscriptRealtimeProvider } from './realtime';
export { CopilotTranscriptionResolver } from './resolver';
export { CopilotTranscriptionService } from './service';
@@ -0,0 +1,69 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { z } from 'zod';
import { CopilotTranscriptionJobNotFound } from '../../../base';
import { AccessController } from '../../../core/permission';
import type { RealtimeRegistry } from '../../../core/realtime';
import { CopilotTranscriptionService, transcriptTaskRoom } from './service';
@Injectable()
export class CopilotTranscriptRealtimeProvider implements OnModuleInit {
constructor(
private readonly ac: AccessController,
private readonly transcript: CopilotTranscriptionService,
@Optional() private readonly registry?: RealtimeRegistry
) {}
onModuleInit() {
this.registry?.registerRequest({
name: 'copilot.transcript.task.get',
input: z
.object({
workspaceId: z.string(),
blobId: z.string().optional(),
taskId: z.string().optional(),
})
.refine(input => input.blobId || input.taskId),
handle: async (user, input) => {
await this.assertCopilot(user.id, input.workspaceId);
return {
task: await this.transcript.queryTask(
user.id,
input.workspaceId,
input.taskId,
input.blobId
),
};
},
});
this.registry?.registerTopic({
name: 'copilot.transcript.task.changed',
input: z.object({
workspaceId: z.string(),
taskId: z.string(),
}),
authorize: async (user, input) => {
await this.assertCopilot(user.id, input.workspaceId);
const task = await this.transcript.queryTask(
user.id,
input.workspaceId,
input.taskId
);
if (!task) {
throw new CopilotTranscriptionJobNotFound();
}
},
room: (_user, input) =>
transcriptTaskRoom(input.workspaceId, input.taskId),
});
}
private async assertCopilot(userId: string, workspaceId: string) {
await this.ac
.user(userId)
.workspace(workspaceId)
.allowLocal()
.assert('Workspace.Copilot');
}
}
@@ -416,6 +416,8 @@ export class CopilotTranscriptionResolver {
@ResolveField(() => TranscriptionResultType, {
nullable: true,
deprecationReason:
'Use realtime subscription "copilot.transcript.task.changed" instead.',
})
async transcriptTask(
@Parent() copilot: CopilotType,
@@ -425,6 +427,7 @@ export class CopilotTranscriptionResolver {
@Args('blobId', { nullable: true })
blobId?: string
): Promise<TranscriptionResultType | null> {
// DEPRECATED-0.26-COMPAT(realtime): remove after server no longer supports 0.26.x clients.
if (!copilot.workspaceId) return null;
if (!taskId && !blobId) return null;
@@ -9,6 +9,7 @@ import {
OnJob,
sniffMime,
} from '../../../base';
import type { RealtimePublisher } from '../../../core/realtime';
import { Models } from '../../../models';
import { CopilotAccessPolicy } from '../access';
import { PromptService } from '../prompt';
@@ -32,6 +33,10 @@ const TRANSCRIPT_ACTION_ID = 'transcript.audio.gemini';
const TRANSCRIPT_ACTION_VERSION = 'v1';
const TRANSCRIPT_STRATEGY = 'gemini';
export function transcriptTaskRoom(workspaceId: string, taskId: string) {
return `copilot:transcript:${workspaceId}:${taskId}`;
}
export type TranscriptionJob = {
id: string;
status: AiJobStatus;
@@ -62,7 +67,8 @@ export class CopilotTranscriptionService {
private readonly tasks: TaskPolicy,
private readonly prompts: PromptService,
private readonly actionBridge: ActionRuntimeBridge,
@Optional() private readonly access?: CopilotAccessPolicy
@Optional() private readonly access?: CopilotAccessPolicy,
@Optional() private readonly realtime?: RealtimePublisher
) {}
private parseTaskPayload(payload: unknown): TranscriptionPayloadV2 {
@@ -252,6 +258,7 @@ export class CopilotTranscriptionService {
modelId: model,
});
await this.models.copilotTranscriptTask.markRunning(task.id);
this.publishTaskChanged(workspaceId, task.id, AiJobStatus.running);
return { id: task.id, status: AiJobStatus.running, infos };
}
@@ -294,6 +301,7 @@ export class CopilotTranscriptionService {
retryOf: task.actionRunId ?? undefined,
});
await this.models.copilotTranscriptTask.markRunning(taskId);
this.publishTaskChanged(workspaceId, taskId, AiJobStatus.running);
return {
id: taskId,
status: AiJobStatus.running,
@@ -389,6 +397,11 @@ export class CopilotTranscriptionService {
},
onRunCreated: async ({ runId }) => {
await this.models.copilotTranscriptTask.markRunning(taskId, runId);
this.publishTaskChanged(
task.workspaceId,
taskId,
AiJobStatus.running
);
},
prepareStructuredRoutes: {
stepId: 'transcribe',
@@ -425,6 +438,7 @@ export class CopilotTranscriptionService {
protectedResult: parsedResult,
errorCode: null,
});
this.publishTaskChanged(task.workspaceId, taskId, AiJobStatus.finished);
} catch (error) {
await this.models.copilotTranscriptTask.complete(taskId, {
status: 'failed',
@@ -434,7 +448,27 @@ export class CopilotTranscriptionService {
errorCode:
error instanceof Error ? error.message : 'transcript_task_failed',
});
this.publishTaskChanged(
task.workspaceId,
taskId,
AiJobStatus.failed,
error instanceof Error ? error.message : 'transcript_task_failed'
);
throw error;
}
}
private publishTaskChanged(
workspaceId: string,
taskId: string,
status: AiJobStatus,
error?: string
) {
this.realtime?.publish(
'copilot.transcript.task.changed',
{ workspaceId, taskId },
{ taskId, status, error },
{ room: transcriptTaskRoom(workspaceId, taskId) }
);
}
}
+3 -3
View File
@@ -519,7 +519,7 @@ type Copilot {
"""Get the session list in the workspace"""
sessions(docId: String, options: QueryChatSessionsInput): [CopilotSessionType!]! @deprecated(reason: "use `chats` instead")
transcriptTask(blobId: String, taskId: String): TranscriptionResultType
transcriptTask(blobId: String, taskId: String): TranscriptionResultType @deprecated(reason: "Use realtime subscription \"copilot.transcript.task.changed\" instead.")
workspaceId: ID
}
@@ -1975,7 +1975,7 @@ type Query {
publicUserById(id: String!): PublicUserType
"""query workspace embedding status"""
queryWorkspaceEmbeddingStatus(workspaceId: String!): ContextWorkspaceEmbeddingStatus!
queryWorkspaceEmbeddingStatus(workspaceId: String!): ContextWorkspaceEmbeddingStatus! @deprecated(reason: "Use realtime subscription \"workspace.embedding.progress.changed\" instead.")
revealedAccessTokens: [RevealedAccessToken!]! @deprecated(reason: "use currentUser.revealedAccessTokens")
"""server config"""
@@ -2668,7 +2668,7 @@ type UserType {
name: String!
"""Get user notification count"""
notificationCount: Int!
notificationCount: Int! @deprecated(reason: "Use realtime subscription \"notification.count.changed\" instead.")
"""Get current user notifications"""
notifications(pagination: PaginationInput!): PaginatedNotificationObjectType!
+2 -1
View File
@@ -16,6 +16,7 @@
{ "path": "../native" },
{ "path": "../../../tools/cli" },
{ "path": "../../../tools/utils" },
{ "path": "../../common/graphql" }
{ "path": "../../common/graphql" },
{ "path": "../../common/realtime" }
]
}
+4 -3
View File
@@ -1296,6 +1296,7 @@ export const getWorkspaceEmbeddingStatusQuery = {
embedded
}
}`,
deprecations: ["'queryWorkspaceEmbeddingStatus' is deprecated: Use realtime subscription \"workspace.embedding.progress.changed\" instead."],
};
export const queueWorkspaceEmbeddingMutation = {
@@ -1621,6 +1622,7 @@ export const getTranscriptTaskQuery = {
}
}
}`,
deprecations: ["'transcriptTask' is deprecated: Use realtime subscription \"copilot.transcript.task.changed\" instead."],
};
export const retryTranscriptTaskMutation = {
@@ -2646,11 +2648,10 @@ export const notificationCountQuery = {
op: 'notificationCount',
query: `query notificationCount {
currentUser {
notifications(pagination: {first: 1}) {
totalCount
}
notificationCount
}
}`,
deprecations: ["'notificationCount' is deprecated: Use realtime subscription \"notification.count.changed\" instead."],
};
export const pricesQuery = {
@@ -1,7 +1,5 @@
query notificationCount {
currentUser {
notifications(pagination: { first: 1 }) {
totalCount
}
notificationCount
}
}
+10 -9
View File
@@ -580,6 +580,7 @@ export interface Copilot {
* @deprecated use `chats` instead
*/
sessions: Array<CopilotSessionType>;
/** @deprecated Use realtime subscription "copilot.transcript.task.changed" instead. */
transcriptTask: Maybe<TranscriptionResultType>;
workspaceId: Maybe<Scalars['ID']['output']>;
}
@@ -2635,7 +2636,10 @@ export interface Query {
prices: Array<SubscriptionPrice>;
/** Get public user by id */
publicUserById: Maybe<PublicUserType>;
/** query workspace embedding status */
/**
* query workspace embedding status
* @deprecated Use realtime subscription "workspace.embedding.progress.changed" instead.
*/
queryWorkspaceEmbeddingStatus: ContextWorkspaceEmbeddingStatus;
/** @deprecated use currentUser.revealedAccessTokens */
revealedAccessTokens: Array<RevealedAccessToken>;
@@ -3390,7 +3394,10 @@ export interface UserType {
invoices: Array<InvoiceType>;
/** User name */
name: Scalars['String']['output'];
/** Get user notification count */
/**
* Get user notification count
* @deprecated Use realtime subscription "notification.count.changed" instead.
*/
notificationCount: Scalars['Int']['output'];
/** Get current user notifications */
notifications: PaginatedNotificationObjectType;
@@ -7263,13 +7270,7 @@ export type NotificationCountQueryVariables = Exact<{ [key: string]: never }>;
export type NotificationCountQuery = {
__typename?: 'Query';
currentUser: {
__typename?: 'UserType';
notifications: {
__typename?: 'PaginatedNotificationObjectType';
totalCount: number;
};
} | null;
currentUser: { __typename?: 'UserType'; notificationCount: number } | null;
};
export type PricesQueryVariables = Exact<{ [key: string]: never }>;
+3
View File
@@ -19,6 +19,7 @@
},
"dependencies": {
"@affine/reader": "workspace:*",
"@affine/realtime": "workspace:*",
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"graphemer": "^1.4.0",
@@ -32,6 +33,7 @@
"devDependencies": {
"@affine/error": "workspace:*",
"@affine/graphql": "workspace:*",
"@affine/realtime": "workspace:*",
"@blocksuite/affine": "workspace:*",
"fake-indexeddb": "^6.0.0",
"idb": "^8.0.0",
@@ -41,6 +43,7 @@
"peerDependencies": {
"@affine/error": "workspace:*",
"@affine/graphql": "workspace:*",
"@affine/realtime": "workspace:*",
"@blocksuite/affine": "workspace:*",
"idb": "^8.0.0",
"socket.io-client": "^4.8.3"
@@ -1,3 +1,9 @@
import {
type RealtimeEvent,
type RealtimeRequestEnvelope,
type RealtimeSubscribeEnvelope,
type RealtimeUnsubscribeEnvelope,
} from '@affine/realtime';
import {
Manager as SocketIOManager,
type Socket as SocketIO,
@@ -52,6 +58,8 @@ interface ServerEvents {
docId: string;
awarenessUpdate: string;
};
'realtime:event': RealtimeEvent;
}
interface ClientEvents {
@@ -116,6 +124,10 @@ interface ClientEvents {
'space:delete-doc': { spaceType: string; spaceId: string; docId: string };
'telemetry:batch': [TelemetryBatch, TelemetryAck];
'realtime:request': [RealtimeRequestEnvelope, unknown];
'realtime:subscribe': [RealtimeSubscribeEnvelope, { subscriptionId: string }];
'realtime:unsubscribe': [RealtimeUnsubscribeEnvelope, { ok: true }];
}
export type ServerEventsMap = {
@@ -227,10 +239,11 @@ class SocketManager {
const SOCKET_MANAGER_CACHE = new Map<string, SocketManager>();
function getSocketManager(endpoint: string, isSelfHosted: boolean) {
let manager = SOCKET_MANAGER_CACHE.get(endpoint);
const key = `${endpoint}:${isSelfHosted ? 'selfhosted' : 'cloud'}`;
let manager = SOCKET_MANAGER_CACHE.get(key);
if (!manager) {
manager = new SocketManager(endpoint, isSelfHosted);
SOCKET_MANAGER_CACHE.set(endpoint, manager);
SOCKET_MANAGER_CACHE.set(key, manager);
}
return manager;
}
@@ -0,0 +1,296 @@
import type { RealtimeEvent } from '@affine/realtime';
import { beforeEach, expect, test, vi } from 'vitest';
import { RealtimeManager, stableStringify } from '../manager';
type Handler = (payload?: unknown) => void;
class FakeSocket {
readonly handlers = new Map<string, Handler>();
readonly emitted: Array<{ event: string; payload: unknown }> = [];
connected = true;
disconnected = false;
nextRequestAck: unknown = { data: { count: 1 } };
subscribeAcks: unknown[] = [];
nextSubscriptionId = 0;
on(event: string, handler: Handler) {
this.handlers.set(event, handler);
}
off(event: string) {
this.handlers.delete(event);
}
async emitWithAck(event: string, payload: unknown) {
this.emitted.push({ event, payload });
if (event === 'realtime:subscribe') {
const ack = this.subscribeAcks.shift();
if (ack) return ack;
this.nextSubscriptionId += 1;
return { data: { subscriptionId: `sub-${this.nextSubscriptionId}` } };
}
if (event === 'realtime:request') {
return this.nextRequestAck;
}
return { data: {} };
}
emit(event: string, payload?: unknown) {
this.handlers.get(event)?.(payload);
}
}
const socket = new FakeSocket();
vi.mock('../../impls/cloud/socket', () => ({
SocketConnection: class {
readonly inner = { socket };
status = 'connected';
readonly maybeConnection = { socket };
connect() {}
async waitForConnected() {}
disconnect() {
socket.disconnected = true;
}
},
}));
beforeEach(() => {
vi.stubGlobal('BUILD_CONFIG', { appVersion: 'test' });
socket.handlers.clear();
socket.emitted.length = 0;
socket.nextRequestAck = { data: { count: 1 } };
socket.subscribeAcks = [];
socket.nextSubscriptionId = 0;
socket.connected = true;
socket.disconnected = false;
});
test('stableStringify is deterministic for realtime subscription inputs', () => {
expect(stableStringify({ workspaceId: 'space', docId: 'doc' })).toBe(
stableStringify({ docId: 'doc', workspaceId: 'space' })
);
});
test('stableStringify follows JSON semantics for edge values', () => {
expect(stableStringify({ a: undefined })).toBe(stableStringify({}));
expect(stableStringify([undefined])).toBe('[null]');
expect(stableStringify(new Date('2026-01-02T03:04:05.000Z'))).toBe(
'"2026-01-02T03:04:05.000Z"'
);
});
test('request sends generic realtime request with client version', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
await expect(manager.request('notification.count.get', {})).resolves.toEqual({
count: 1,
});
expect(socket.emitted).toEqual([
{
event: 'realtime:request',
payload: {
op: 'notification.count.get',
input: {},
clientVersion: 'test',
},
},
]);
});
test('request rejects server ack error', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
socket.nextRequestAck = {
error: { name: 'Forbidden', message: 'No access' },
};
await expect(manager.request('notification.count.get', {})).rejects.toThrow(
'No access'
);
});
test('request rejects when aborted', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
const controller = new AbortController();
socket.nextRequestAck = new Promise(() => {});
const request = manager.request(
'notification.count.get',
{},
{ signal: controller.signal }
);
controller.abort();
await expect(request).rejects.toThrow('Realtime request aborted');
});
test('subscribe routes events by topic and stable input key', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
const received: unknown[] = [];
const subscription = manager
.subscribe('comment.changed', { workspaceId: 'space', docId: 'doc' })
.subscribe(event => received.push(event));
await vi.waitFor(() => expect(received).toEqual([{ type: 'ready' }]));
socket.emit('realtime:event', {
topic: 'comment.changed',
inputKey: stableStringify({ workspaceId: 'space', docId: 'other' }),
sentAt: 1,
event: { changed: true },
} satisfies RealtimeEvent);
socket.emit('realtime:event', {
topic: 'comment.changed',
inputKey: stableStringify({ workspaceId: 'space', docId: 'doc' }),
sentAt: 2,
event: { changed: true },
} satisfies RealtimeEvent);
expect(received).toEqual([{ type: 'ready' }, { changed: true }]);
subscription.unsubscribe();
});
test('unsubscribe leaves server room and clears status', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
const subscription = manager
.subscribe('notification.count.changed', {})
.subscribe();
await vi.waitFor(() => expect(manager.getStatus().subscriptions).toBe(1));
subscription.unsubscribe();
expect(manager.getStatus()).toMatchObject({
connected: true,
subscriptions: 0,
});
expect(socket.emitted.at(-1)).toEqual({
event: 'realtime:unsubscribe',
payload: {
subscriptionId: 'sub-1',
topic: 'notification.count.changed',
input: {},
clientVersion: 'test',
},
});
});
test('context switch disconnects socket and completes subscriptions', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
const completed = vi.fn();
manager
.subscribe('notification.count.changed', {})
.subscribe({ complete: completed });
await vi.waitFor(() => expect(manager.getStatus().subscriptions).toBe(1));
manager.setContext({
endpoint: 'http://other-server',
isSelfHosted: false,
authenticated: true,
});
expect(socket.disconnected).toBe(true);
expect(completed).toHaveBeenCalled();
expect(manager.getStatus()).toMatchObject({
endpoint: 'http://other-server',
connected: false,
subscriptions: 0,
});
});
test('subscribe registers server room again after reconnect', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
const received: unknown[] = [];
const subscription = manager
.subscribe('notification.count.changed', {})
.subscribe(event => received.push(event));
await vi.waitFor(() => expect(received).toEqual([{ type: 'ready' }]));
socket.emit('connect');
await vi.waitFor(() =>
expect(
socket.emitted.filter(item => item.event === 'realtime:subscribe')
).toHaveLength(2)
);
expect(received).toEqual([{ type: 'ready' }, { type: 'ready' }]);
subscription.unsubscribe();
});
test('failed reconnect only errors the affected subscription', async () => {
const manager = new RealtimeManager();
manager.setContext({
endpoint: 'http://server',
isSelfHosted: false,
authenticated: true,
});
const first: unknown[] = [];
const firstErrors: unknown[] = [];
const second: unknown[] = [];
const secondErrors: unknown[] = [];
const firstSubscription = manager
.subscribe('notification.count.changed', {})
.subscribe({
next: event => first.push(event),
error: error => firstErrors.push(error),
});
const secondSubscription = manager
.subscribe('comment.changed', { workspaceId: 'space', docId: 'doc' })
.subscribe({
next: event => second.push(event),
error: error => secondErrors.push(error),
});
await vi.waitFor(() => expect(manager.getStatus().subscriptions).toBe(2));
socket.subscribeAcks = [
{ data: { subscriptionId: 'resub-1' } },
{ error: { name: 'Forbidden', message: 'No access' } },
];
socket.emit('connect');
await vi.waitFor(() => expect(first).toHaveLength(2));
await vi.waitFor(() => expect(secondErrors).toHaveLength(1));
expect(firstErrors).toEqual([]);
expect(manager.getStatus().subscriptions).toBe(1);
firstSubscription.unsubscribe();
secondSubscription.unsubscribe();
});
@@ -0,0 +1 @@
export { RealtimeManager, stableStringify } from './manager';
@@ -0,0 +1,336 @@
import type {
RealtimeConfigureInput,
RealtimeEvent,
RealtimeRequestInputOf,
RealtimeRequestName,
RealtimeRequestOutputOf,
RealtimeStatus,
RealtimeSubscriptionReady,
RealtimeTopicEventOf,
RealtimeTopicInputOf,
RealtimeTopicName,
} from '@affine/realtime';
import { Observable, Subject } from 'rxjs';
import { SocketConnection } from '../impls/cloud/socket';
const DEFAULT_REQUEST_TIMEOUT = 10_000;
type RealtimeContext = RealtimeConfigureInput;
function normalizeError(error: unknown) {
if (error instanceof Error) {
return { name: error.name, message: error.message };
}
return { name: 'RealtimeError', message: String(error) };
}
function rejectAck(error: { name?: string; message?: string; code?: string }) {
const err = new Error(error.message ?? 'Realtime request failed');
err.name = error.name ?? 'RealtimeError';
return err;
}
export class RealtimeManager {
private context?: RealtimeContext;
private socketConnection?: SocketConnection;
private socketKey?: string;
private lastError?: { name: string; message: string };
private readonly subscriptions = new Map<
string,
{
topic: RealtimeTopicName;
input: RealtimeTopicInputOf<RealtimeTopicName>;
inputKey: string;
subject$: Subject<RealtimeEvent | RealtimeSubscriptionReady>;
}
>();
setContext(context: RealtimeContext) {
const nextContext = { ...context };
const changed =
!this.context ||
this.context.endpoint !== nextContext.endpoint ||
this.context.isSelfHosted !== nextContext.isSelfHosted ||
this.context.authenticated !== nextContext.authenticated;
this.context = nextContext;
if (changed) {
this.resetConnection();
}
}
async request<Op extends RealtimeRequestName>(
op: Op,
input: RealtimeRequestInputOf<Op>,
options?: { timeoutMs?: number; signal?: AbortSignal }
): Promise<RealtimeRequestOutputOf<Op>> {
const socket = await this.connect();
const timeoutMs = options?.timeoutMs ?? DEFAULT_REQUEST_TIMEOUT;
let timeoutId: ReturnType<typeof setTimeout> | undefined;
let abortHandler: (() => void) | undefined;
const abort = () => {
const error = new Error(`Realtime request aborted: ${op}`);
error.name = 'AbortError';
return error;
};
if (options?.signal?.aborted) {
throw abort();
}
const timeout = new Promise<never>((_resolve, reject) => {
timeoutId = setTimeout(() => {
const error = new Error(`Realtime request timed out: ${op}`);
error.name = 'RealtimeRequestTimeout';
reject(error);
}, timeoutMs);
timeoutId.unref?.();
});
const aborted = new Promise<never>((_resolve, reject) => {
abortHandler = () => reject(abort());
options?.signal?.addEventListener('abort', abortHandler, { once: true });
});
const ack = await Promise.race([
socket.emitWithAck('realtime:request', {
op,
input,
clientVersion: BUILD_CONFIG.appVersion,
}),
timeout,
aborted,
]).finally(() => {
if (timeoutId) {
clearTimeout(timeoutId);
}
if (abortHandler) {
options?.signal?.removeEventListener('abort', abortHandler);
}
});
if ('error' in ack) {
throw rejectAck(ack.error);
}
return ack.data as unknown as RealtimeRequestOutputOf<Op>;
}
subscribe<Topic extends RealtimeTopicName>(
topic: Topic,
input: RealtimeTopicInputOf<Topic>
): Observable<RealtimeTopicEventOf<Topic> | RealtimeSubscriptionReady> {
return new Observable(subscriber => {
let subscriptionId: string | undefined;
let subject$: Subject<RealtimeEvent | RealtimeSubscriptionReady>;
let closed = false;
const setup = async () => {
try {
const socket = await this.connect();
const ack = await socket.emitWithAck('realtime:subscribe', {
topic,
input,
clientVersion: BUILD_CONFIG.appVersion,
});
if ('error' in ack) {
throw rejectAck(ack.error);
}
const data = ack.data;
subscriptionId = data.subscriptionId;
if (closed) {
await socket.emitWithAck('realtime:unsubscribe', {
subscriptionId: data.subscriptionId,
topic,
input,
clientVersion: BUILD_CONFIG.appVersion,
});
return;
}
subject$ = new Subject();
this.subscriptions.set(subscriptionId, {
topic,
input,
inputKey: stableStringify(input),
subject$,
});
subscriber.next({
type: 'ready',
});
subject$.subscribe({
next: event => {
if ('type' in event) {
subscriber.next(event);
} else {
subscriber.next(event.event as RealtimeTopicEventOf<Topic>);
}
},
error: error => subscriber.error(error),
complete: () => subscriber.complete(),
});
} catch (error) {
this.lastError = normalizeError(error);
subscriber.error(error);
}
};
setup().catch(error => subscriber.error(error));
return () => {
closed = true;
if (!subscriptionId) {
return;
}
const currentSubscriptionId = subscriptionId;
this.subscriptions.delete(currentSubscriptionId);
subject$?.complete();
if (this.socketConnection?.inner?.socket.connected) {
this.socketConnection.inner.socket
.emitWithAck('realtime:unsubscribe', {
subscriptionId: currentSubscriptionId,
topic,
input,
clientVersion: BUILD_CONFIG.appVersion,
})
.catch(() => {});
}
};
});
}
getStatus(): RealtimeStatus {
return {
endpoint: this.context?.endpoint,
connected: this.socketConnection?.status === 'connected',
connecting: this.socketConnection?.status === 'connecting',
subscriptions: this.subscriptions.size,
lastError: this.lastError,
};
}
private async connect() {
if (!this.context?.endpoint || !this.context.authenticated) {
const error = new Error('Realtime is not authenticated');
error.name = 'RealtimeUnauthenticated';
throw error;
}
const key = `${this.context.endpoint}:${this.context.isSelfHosted}`;
if (!this.socketConnection || this.socketKey !== key) {
this.resetConnection();
this.socketKey = key;
this.socketConnection = new SocketConnection(
this.context.endpoint,
this.context.isSelfHosted
);
this.socketConnection.connect();
}
await this.socketConnection.waitForConnected();
this.socketConnection.inner.socket.off('realtime:event', this.handleEvent);
this.socketConnection.inner.socket.on('realtime:event', this.handleEvent);
this.socketConnection.inner.socket.off('connect', this.handleReconnect);
this.socketConnection.inner.socket.on('connect', this.handleReconnect);
return this.socketConnection.inner.socket;
}
private readonly handleEvent = (event: RealtimeEvent) => {
for (const subscription of this.subscriptions.values()) {
if (
subscription.topic === event.topic &&
subscription.inputKey === event.inputKey
) {
subscription.subject$.next(event);
}
}
};
private readonly handleReconnect = () => {
this.resubscribeAll().catch(error => {
this.lastError = normalizeError(error);
});
};
private async resubscribeAll() {
const socket = this.socketConnection?.inner.socket;
if (!socket?.connected || this.subscriptions.size === 0) {
return;
}
const subscriptions = Array.from(this.subscriptions.entries());
for (const [subscriptionId, subscription] of subscriptions) {
try {
const ack = await socket.emitWithAck('realtime:subscribe', {
topic: subscription.topic,
input: subscription.input,
clientVersion: BUILD_CONFIG.appVersion,
});
if ('error' in ack) {
throw rejectAck(ack.error);
}
this.subscriptions.delete(subscriptionId);
this.subscriptions.set(ack.data.subscriptionId, subscription);
subscription.subject$.next({
type: 'ready',
});
} catch (error) {
this.lastError = normalizeError(error);
this.subscriptions.delete(subscriptionId);
subscription.subject$.error(error);
}
}
}
private resetConnection() {
if (this.socketConnection) {
this.socketConnection.maybeConnection?.socket.off(
'realtime:event',
this.handleEvent
);
this.socketConnection.maybeConnection?.socket.off(
'connect',
this.handleReconnect
);
this.socketConnection.disconnect(true);
}
this.socketConnection = undefined;
this.socketKey = undefined;
for (const subscription of this.subscriptions.values()) {
subscription.subject$.complete();
}
this.subscriptions.clear();
}
}
export function stableStringify(value: unknown): string {
if (
value === undefined ||
typeof value === 'function' ||
typeof value === 'symbol'
) {
return 'null';
}
if (value === null || typeof value !== 'object') {
return JSON.stringify(value);
}
if (Array.isArray(value)) {
return `[${value.map(stableStringify).join(',')}]`;
}
if (value instanceof Date) {
return JSON.stringify(value.toJSON());
}
const record = value as Record<string, unknown>;
return `{${Object.keys(record)
.filter(key => {
const property = record[key];
return (
property !== undefined &&
typeof property !== 'function' &&
typeof property !== 'symbol'
);
})
.sort()
.map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`)
.join(',')}}`;
}
@@ -1,3 +1,14 @@
import type {
RealtimeConfigureInput,
RealtimeRequestInputOf,
RealtimeRequestName,
RealtimeRequestOutputOf,
RealtimeStatus,
RealtimeSubscriptionReady,
RealtimeTopicEventOf,
RealtimeTopicInputOf,
RealtimeTopicName,
} from '@affine/realtime';
import { OpClient, transfer } from '@toeverything/infra/op';
import type { Observable } from 'rxjs';
import { v4 as uuid } from 'uuid';
@@ -38,6 +49,30 @@ import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
export type { StoreInitOptions as WorkerInitOptions } from './ops';
type RealtimeWorkerClient = {
call<Op extends RealtimeRequestName>(
name: 'realtime.request',
payload: {
op: Op;
input: RealtimeRequestInputOf<Op>;
timeoutMs?: number;
}
): Promise<RealtimeRequestOutputOf<Op>>;
ob$<Topic extends RealtimeTopicName>(
name: 'realtime.subscribe',
payload: {
topic: Topic;
input: RealtimeTopicInputOf<Topic>;
}
): Observable<RealtimeTopicEventOf<Topic> | RealtimeSubscriptionReady>;
};
function realtimeAbortError(op: RealtimeRequestName) {
const error = new Error(`Realtime request aborted: ${op}`);
error.name = 'AbortError';
return error;
}
export class StoreManagerClient {
private readonly connections = new Map<
string,
@@ -49,9 +84,11 @@ export class StoreManagerClient {
constructor(private readonly client: OpClient<WorkerManagerOps>) {
this.telemetry = new TelemetryClient(this.client);
this.realtime = new RealtimeClient(this.client);
}
readonly telemetry: TelemetryClient;
readonly realtime: RealtimeClient;
open(key: string, options: StoreInitOptions) {
const { port1, port2 } = new MessageChannel();
@@ -138,6 +175,62 @@ class TelemetryClient {
}
}
export class RealtimeClient {
constructor(private readonly client: OpClient<WorkerManagerOps>) {}
configure(context: RealtimeConfigureInput): Promise<void> {
return this.client.call('realtime.configure', context);
}
request<Op extends RealtimeRequestName>(
op: Op,
input: RealtimeRequestInputOf<Op>,
options?: { timeoutMs?: number; signal?: AbortSignal }
): Promise<RealtimeRequestOutputOf<Op>> {
const request = (this.client as unknown as RealtimeWorkerClient).call(
'realtime.request',
{
op,
input,
timeoutMs: options?.timeoutMs,
}
);
if (!options?.signal) {
return request;
}
if (options.signal.aborted) {
return Promise.reject(realtimeAbortError(op));
}
let abortHandler: (() => void) | undefined;
const aborted = new Promise<never>((_resolve, reject) => {
abortHandler = () => reject(realtimeAbortError(op));
options.signal?.addEventListener('abort', abortHandler, { once: true });
});
return Promise.race([request, aborted]).finally(() => {
if (abortHandler) {
options.signal?.removeEventListener('abort', abortHandler);
}
});
}
subscribe<Topic extends RealtimeTopicName>(
topic: Topic,
input: RealtimeTopicInputOf<Topic>
): Observable<RealtimeTopicEventOf<Topic> | RealtimeSubscriptionReady> {
return (this.client as unknown as RealtimeWorkerClient).ob$(
'realtime.subscribe',
{
topic,
input,
}
);
}
status(): Promise<RealtimeStatus> {
return this.client.call('realtime.status');
}
}
export class StoreClient {
constructor(private readonly client: OpClient<WorkerOps>) {
this.docStorage = new WorkerDocStorage(this.client);
@@ -2,6 +2,7 @@ import { OpConsumer } from '@toeverything/infra/op';
import { Observable } from 'rxjs';
import { type StorageConstructor } from '../impls';
import { RealtimeManager } from '../realtime';
import { SpaceStorage } from '../storage';
import type { AwarenessRecord } from '../storage/awareness';
import { Sync } from '../sync';
@@ -340,6 +341,7 @@ export class StoreManagerConsumer {
{ store: StoreConsumer; refCount: number }
>();
private readonly telemetry = new TelemetryManager();
private readonly realtime = new RealtimeManager();
constructor(
private readonly availableStorageImplementations: StorageConstructor[]
@@ -393,6 +395,12 @@ export class StoreManagerConsumer {
'telemetry.pageview': event => this.telemetry.pageview(event),
'telemetry.flush': () => this.telemetry.flush(),
'telemetry.getQueueState': () => this.telemetry.getQueueState(),
'realtime.configure': context => this.realtime.setContext(context),
'realtime.request': ({ op, input, timeoutMs }) =>
this.realtime.request(op, input, { timeoutMs }),
'realtime.subscribe': ({ topic, input }) =>
this.realtime.subscribe(topic, input),
'realtime.status': () => this.realtime.getStatus(),
});
}
}
+33
View File
@@ -1,3 +1,15 @@
import type {
RealtimeConfigureInput,
RealtimeRequestInputOf,
RealtimeRequestName,
RealtimeRequestOutputOf,
RealtimeStatus,
RealtimeSubscriptionReady,
RealtimeTopicEventOf,
RealtimeTopicInputOf,
RealtimeTopicName,
} from '@affine/realtime';
import type { AvailableStorageImplementations } from '../impls';
import type {
AggregateResult,
@@ -189,4 +201,25 @@ export type WorkerManagerOps = {
'telemetry.pageview': [TelemetryEvent, { queued: boolean }];
'telemetry.flush': [void, TelemetryAck];
'telemetry.getQueueState': [void, TelemetryQueueState];
'realtime.configure': [RealtimeConfigureInput, void];
'realtime.request': [
{
[Op in RealtimeRequestName]: {
op: Op;
input: RealtimeRequestInputOf<Op>;
timeoutMs?: number;
};
}[RealtimeRequestName],
RealtimeRequestOutputOf<RealtimeRequestName>,
];
'realtime.subscribe': [
{
[Topic in RealtimeTopicName]: {
topic: Topic;
input: RealtimeTopicInputOf<Topic>;
};
}[RealtimeTopicName],
RealtimeTopicEventOf<RealtimeTopicName> | RealtimeSubscriptionReady,
];
'realtime.status': [void, RealtimeStatus];
};
+1
View File
@@ -9,6 +9,7 @@
},
"references": [
{ "path": "../reader" },
{ "path": "../realtime" },
{ "path": "../infra" },
{ "path": "../error" },
{ "path": "../graphql" },
+13
View File
@@ -0,0 +1,13 @@
{
"name": "@affine/realtime",
"type": "module",
"version": "0.26.3",
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"@affine/graphql": "workspace:*"
}
}
+227
View File
@@ -0,0 +1,227 @@
import type { CommentChangeObjectType } from '@affine/graphql';
export type RealtimeRequestName = keyof RealtimeRequestMap;
export type RealtimeTopicName = keyof RealtimeTopicMap;
export interface RealtimeRequestMap {
'notification.count.get': {
input: Record<string, never>;
output: { count: number };
};
'comment.changes.get': {
input: {
workspaceId: string;
docId: string;
after?: string;
first?: number;
};
output: {
changes: CommentChangeObjectType[];
startCursor: string;
endCursor: string;
hasNextPage: boolean;
};
};
'workspace.embedding.progress.get': {
input: { workspaceId: string };
output: { total: number; embedded: number };
};
'copilot.transcript.task.get': {
input: {
workspaceId: string;
blobId?: string;
taskId?: string;
};
output: { task: unknown | null };
};
}
export type NotificationCountChangedReason =
| 'created'
| 'read'
| 'read-all'
| 'expired-cleanup'
| 'resync';
export type WorkspaceEmbeddingProgressReason =
| 'queued'
| 'progress'
| 'finished'
| 'failed'
| 'resync';
export interface RealtimeTopicMap {
'notification.count.changed': {
input: Record<string, never>;
event: {
count: number;
reason: NotificationCountChangedReason;
};
};
'comment.changed': {
input: {
workspaceId: string;
docId: string;
};
event: {
changed: true;
cursor?: string;
};
};
'workspace.embedding.progress.changed': {
input: { workspaceId: string };
event: {
total?: number;
embedded?: number;
reason: WorkspaceEmbeddingProgressReason;
};
};
'copilot.transcript.task.changed': {
input: {
workspaceId: string;
taskId: string;
};
event: {
taskId: string;
status: string;
error?: string;
};
};
}
export type RealtimeRequestInputOf<Op extends RealtimeRequestName> =
RealtimeRequestMap[Op]['input'];
export type RealtimeRequestOutputOf<Op extends RealtimeRequestName> =
RealtimeRequestMap[Op]['output'];
export type RealtimeTopicInputOf<Topic extends RealtimeTopicName> =
RealtimeTopicMap[Topic]['input'];
export type RealtimeTopicEventOf<Topic extends RealtimeTopicName> =
RealtimeTopicMap[Topic]['event'];
export type RealtimeError = {
name: string;
message: string;
code?: string;
};
export type RealtimeAck<T> = { data: T } | { error: RealtimeError };
export type RealtimeRequestId = string;
export type RealtimeSubscriptionId = string;
export type RealtimeRequestEnvelope<
Op extends RealtimeRequestName = RealtimeRequestName,
> = {
requestId?: RealtimeRequestId;
op: Op;
input: RealtimeRequestInputOf<Op>;
clientVersion?: string;
};
export type RealtimeRequestInput<
Op extends RealtimeRequestName = RealtimeRequestName,
> = Op extends RealtimeRequestName
? {
op: Op;
input: RealtimeRequestInputOf<Op>;
timeoutMs?: number;
}
: never;
export type RealtimeRequestOutput<
Op extends RealtimeRequestName = RealtimeRequestName,
> = RealtimeRequestOutputOf<Op>;
export type RealtimeSubscribeEnvelope<
Topic extends RealtimeTopicName = RealtimeTopicName,
> = {
subscriptionId?: RealtimeSubscriptionId;
topic: Topic;
input: RealtimeTopicInputOf<Topic>;
clientVersion?: string;
};
export type RealtimeSubscribeInput<
Topic extends RealtimeTopicName = RealtimeTopicName,
> = Topic extends RealtimeTopicName
? {
topic: Topic;
input: RealtimeTopicInputOf<Topic>;
}
: never;
export type RealtimeUnsubscribeEnvelope = {
subscriptionId?: RealtimeSubscriptionId;
topic: RealtimeTopicName;
input: RealtimeTopicInputOf<RealtimeTopicName>;
clientVersion?: string;
};
export type RealtimeReadyEvent = {
type: 'ready';
snapshot?: unknown;
};
export type RealtimeSubscriptionReady = RealtimeReadyEvent;
export type RealtimeEvent<Topic extends RealtimeTopicName = RealtimeTopicName> =
Topic extends RealtimeTopicName
? {
topic: Topic;
inputKey: string;
seq?: number;
sentAt: number;
event: RealtimeTopicEventOf<Topic>;
}
: never;
export type RealtimeTopicEvent<
Topic extends RealtimeTopicName = RealtimeTopicName,
> = RealtimeTopicEventOf<Topic> | RealtimeReadyEvent;
export type RealtimeStatus = {
endpoint?: string;
connected: boolean;
connecting: boolean;
subscriptions: number;
lastError?: RealtimeError;
};
export type RealtimeConfigureInput = {
endpoint: string;
isSelfHosted: boolean;
authenticated: boolean;
clientVersion?: string;
};
export function getRealtimeInputKey(input: unknown): string {
if (
input === undefined ||
typeof input === 'function' ||
typeof input === 'symbol'
) {
return 'null';
}
if (input === null || typeof input !== 'object') {
return JSON.stringify(input);
}
if (Array.isArray(input)) {
return `[${input.map(getRealtimeInputKey).join(',')}]`;
}
if (input instanceof Date) {
return JSON.stringify(input.toJSON());
}
const record = input as Record<string, unknown>;
return `{${Object.keys(record)
.filter(key => {
const property = record[key];
return (
property !== undefined &&
typeof property !== 'function' &&
typeof property !== 'symbol'
);
})
.sort()
.map(key => `${JSON.stringify(key)}:${getRealtimeInputKey(record[key])}`)
.join(',')}}`;
}
+10
View File
@@ -0,0 +1,10 @@
{
"extends": "../../../tsconfig.web.json",
"include": ["./src"],
"compilerOptions": {
"rootDir": "./src",
"outDir": "./dist",
"tsBuildInfoFile": "./dist/tsconfig.tsbuildinfo"
},
"references": [{ "path": "../graphql" }]
}
@@ -76,6 +76,7 @@ configureLocalStorageStateStorageImpls(framework);
configureBrowserWorkspaceFlavours(framework);
configureMobileModules(framework);
framework.impl(NbstoreProvider, {
realtime: storeManagerClient.realtime,
openStore(key, options) {
const { store, dispose } = storeManagerClient.open(key, options);
return {
@@ -59,6 +59,7 @@ export function setupStoreManager(framework: Framework) {
});
framework.impl(NbstoreProvider, {
realtime: storeManagerClient.realtime,
openStore(key, options) {
const { store, dispose } = storeManagerClient.open(key, options);
+1
View File
@@ -94,6 +94,7 @@ configureLocalStorageStateStorageImpls(framework);
configureBrowserWorkspaceFlavours(framework);
configureMobileModules(framework);
framework.impl(NbstoreProvider, {
realtime: storeManagerClient.realtime,
openStore(key, options) {
const { store, dispose } = storeManagerClient.open(key, options);
return {
@@ -48,6 +48,7 @@ configureLocalStorageStateStorageImpls(framework);
configureBrowserWorkspaceFlavours(framework);
configureMobileModules(framework);
framework.impl(NbstoreProvider, {
realtime: storeManagerClient.realtime,
openStore(key, options) {
const { store, dispose } = storeManagerClient.open(key, options);
return {
+1
View File
@@ -63,6 +63,7 @@ configureBrowserWorkbenchModule(framework);
configureLocalStorageStateStorageImpls(framework);
configureBrowserWorkspaceFlavours(framework);
framework.impl(NbstoreProvider, {
realtime: storeManagerClient.realtime,
openStore(key, options) {
return storeManagerClient.open(key, options);
},
@@ -19,6 +19,7 @@ export { InvitationService } from './services/invitation';
export { InvoicesService } from './services/invoices';
export type { PublicUserInfo } from './services/public-user';
export { PublicUserService } from './services/public-user';
export { RealtimeService } from './services/realtime';
export { SelfhostGenerateLicenseService } from './services/selfhost-generate-license';
export { SelfhostLicenseService } from './services/selfhost-license';
export { ServerService } from './services/server';
@@ -41,6 +42,7 @@ import { type Framework } from '@toeverything/infra';
import { GlobalCache, GlobalState } from '../storage/providers/global';
import { GlobalStateService } from '../storage/services/global';
import { GlobalContextService } from '../global-context';
import { UrlService } from '../url';
import { WorkspaceScope, WorkspaceService } from '../workspace';
import { CloudDocMeta } from './entities/cloud-doc-meta';
@@ -69,6 +71,7 @@ import { FetchService } from './services/fetch';
import { GraphQLService } from './services/graphql';
import { InvoicesService } from './services/invoices';
import { PublicUserService } from './services/public-user';
import { RealtimeService } from './services/realtime';
import { SelfhostGenerateLicenseService } from './services/selfhost-generate-license';
import { SelfhostLicenseService } from './services/selfhost-license';
import { ServerService } from './services/server';
@@ -100,6 +103,7 @@ import { DocCreatedByService } from './services/doc-created-by';
import { DocUpdatedByService } from './services/doc-updated-by';
import { DocCreatedByUpdatedBySyncService } from './services/doc-created-by-updated-by-sync';
import { WorkspacePermissionService } from '../permissions';
import { NbstoreService } from '../storage';
import { DocScope, DocService, DocsService } from '../doc';
import { DocCreatedByUpdatedBySyncStore } from './stores/doc-created-by-updated-by-sync';
import { GlobalDialogService } from '../dialogs';
@@ -111,6 +115,11 @@ export function configureCloudModule(framework: Framework) {
framework
.service(ServersService, [ServerListStore, ServerConfigStore])
.service(RealtimeService, [
GlobalContextService,
ServersService,
NbstoreService,
])
.service(DefaultServerService, [ServersService])
.store(ServerListStore, [GlobalStateService])
.store(ServerConfigStore)
@@ -0,0 +1,54 @@
import { shallowEqual } from '@affine/component';
import { ServerDeploymentType } from '@affine/graphql';
import { LiveData, OnEvent, Service } from '@toeverything/infra';
import type { GlobalContextService } from '../../global-context';
import { ApplicationStarted } from '../../lifecycle';
import type { NbstoreService } from '../../storage';
import type { Server } from '../entities/server';
import type { ServersService } from './servers';
@OnEvent(ApplicationStarted, service => service.onApplicationStarted)
export class RealtimeService extends Service {
private readonly currentServer$ =
this.globalContextService.globalContext.serverId.$.selector(id =>
id
? this.serversService.server$(id)
: new LiveData<Server | undefined>(undefined)
)
.flat()
.selector(
server =>
[
server,
server?.account$,
server?.config$.selector(
c => c.type === ServerDeploymentType.Selfhosted
),
] as const
)
.flat()
.map(([server, account, selfHosted]) => ({
endpoint: server?.baseUrl ?? '',
authenticated: !!account,
isSelfHosted: !!selfHosted,
}))
.distinctUntilChanged(shallowEqual);
constructor(
private readonly globalContextService: GlobalContextService,
private readonly serversService: ServersService,
private readonly nbstoreService: NbstoreService
) {
super();
const subscription = this.currentServer$.subscribe(context => {
this.nbstoreService.realtime.configure(context).catch(error => {
console.error('Failed to configure realtime context', error);
});
});
this.disposables.push(() => subscription.unsubscribe());
}
onApplicationStarted() {}
}
@@ -4,7 +4,6 @@ import {
deleteCommentMutation,
deleteReplyMutation,
type DocMode,
listCommentChangesQuery,
type ListCommentsQuery,
listCommentsQuery,
resolveCommentMutation,
@@ -13,9 +12,11 @@ import {
uploadCommentAttachmentMutation,
} from '@affine/graphql';
import { Entity } from '@toeverything/infra';
import type { Observable } from 'rxjs';
import type { DefaultServerService, WorkspaceServerService } from '../../cloud';
import { GraphQLService } from '../../cloud/services/graphql';
import type { NbstoreService } from '../../storage';
import type { WorkspaceService } from '../../workspace';
import type {
DocComment,
@@ -75,7 +76,8 @@ export class DocCommentStore extends Entity<{
constructor(
private readonly workspaceService: WorkspaceService,
private readonly workspaceServerService: WorkspaceServerService,
private readonly defaultServerService: DefaultServerService
private readonly defaultServerService: DefaultServerService,
private readonly nbstoreService: NbstoreService
) {
super();
}
@@ -132,51 +134,35 @@ export class DocCommentStore extends Entity<{
};
}
// pool every 30s
async listCommentChanges({
after,
}: {
after?: string;
}): Promise<DocCommentChangeListResult> {
const graphql = this.graphqlService;
if (!graphql) {
throw new Error('GraphQL service not found');
}
const response = await graphql.gql({
query: listCommentChangesQuery,
variables: {
pagination: {
after,
},
workspaceId: this.currentWorkspaceId,
docId: this.props.docId,
},
});
const commentChanges = response.workspace?.commentChanges;
if (!commentChanges) {
return {
changes: [],
startCursor: '',
endCursor: after ?? '',
hasNextPage: false,
};
}
const commentChanges = await this.nbstoreService.realtime.request(
'comment.changes.get',
{ after, workspaceId: this.currentWorkspaceId, docId: this.props.docId }
);
return {
changes: commentChanges.edges.map(edge => ({
id: edge.node.id,
action: edge.node.action,
comment: normalizeComment(edge.node.item),
commentId: edge.node.commentId || undefined,
changes: commentChanges.changes.map((change: any) => ({
id: change.id,
action: change.action,
comment: normalizeComment(change.item as GQLCommentType),
commentId: change.commentId,
})),
startCursor: commentChanges.pageInfo.startCursor || '',
endCursor: commentChanges.pageInfo.endCursor || '',
hasNextPage: commentChanges.pageInfo.hasNextPage,
startCursor: commentChanges.startCursor,
endCursor: commentChanges.endCursor,
hasNextPage: commentChanges.hasNextPage,
};
}
subscribeCommentChanged(): Observable<unknown> {
return this.nbstoreService.realtime.subscribe('comment.changed', {
workspaceId: this.currentWorkspaceId,
docId: this.props.docId,
});
}
async createComment(commentInput: {
content: DocCommentContent;
mentions?: string[];
@@ -22,9 +22,9 @@ import {
first,
of,
Subject,
type Subscription,
switchMap,
tap,
timer,
} from 'rxjs';
import { type DocDisplayMetaService } from '../../doc-display-meta';
@@ -93,8 +93,11 @@ export class DocCommentEntity extends Entity<{
private readonly commentDeleted$ = new Subject<CommentId>();
readonly commentHighlighted$ = new LiveData<CommentId | null>(null);
private pollingDisposable?: DisposeCallback;
private realtimeDisposable?: DisposeCallback;
private realtimeSubscription?: Subscription;
private startCursor?: string;
private fetchingCommentChanges = false;
private pendingCommentChangeFetch = false;
async addComment(
selections?: BaseSelection[],
@@ -486,84 +489,53 @@ export class DocCommentEntity extends Entity<{
return () => subscription.unsubscribe();
}
// Start polling comments every 30s
// 1. when comments$ is empty, fetch all comments
// 2. when comments$ is not empty, fetch changes (using end cursor)
// 3. loop. when doc is not loaded, skip
start(): void {
if (this.pollingDisposable) {
this.pollingDisposable();
if (this.realtimeDisposable) {
this.realtimeDisposable();
}
this.realtimeSubscription?.unsubscribe();
// Initial load
this.revalidate();
this.revalidateCommentsInEditor();
// Set up polling every 10 seconds
const polling$ = timer(10000, 10000).pipe(
switchMap(() => {
// If we have comments, fetch changes; otherwise fetch all
if (this.comments$.value.length > 0) {
return fromPromise(async () => {
const res = await this.store.listCommentChanges({
after: this.startCursor,
});
return res;
}).pipe(
tap(result => {
if (result) {
this.handleCommentChanges(result);
this.startCursor = result.endCursor;
}
}),
catchError(error => {
console.error('Failed to fetch comment changes:', error);
return of(null);
})
);
} else {
return fromPromise(async () => {
const allComments: DocComment[] = [];
let cursor = '';
let firstResult: DocCommentListResult | null = null;
// Fetch all pages of comments
while (true) {
const result = await this.store.listComments({ after: cursor });
if (!firstResult) {
firstResult = result;
// Store the startCursor from the first page for future polling
this.startCursor = result.startCursor;
}
allComments.push(...result.comments);
cursor = result.endCursor;
if (!result.hasNextPage) {
break;
}
}
// Update state with all comments
this.comments$.setValue(allComments);
return allComments;
}).pipe(
tap(() => this.revalidateCommentsInEditor()),
catchError(error => {
console.error('Failed to fetch comments:', error);
return of(null);
})
);
}
})
);
const subscription = polling$.subscribe();
this.pollingDisposable = () => subscription.unsubscribe();
this.realtimeSubscription = this.store.subscribeCommentChanged().subscribe({
next: () => {
this.fetchCommentChanges().catch(() => {});
},
error: error => {
console.error('Failed to subscribe comment changes:', error);
},
});
this.realtimeDisposable = () => this.realtimeSubscription?.unsubscribe();
}
stop(): void {
if (this.pollingDisposable) {
this.pollingDisposable();
if (this.realtimeDisposable) {
this.realtimeDisposable();
}
this.realtimeSubscription?.unsubscribe();
}
private async fetchCommentChanges() {
if (this.fetchingCommentChanges) {
this.pendingCommentChangeFetch = true;
return;
}
this.fetchingCommentChanges = true;
try {
do {
this.pendingCommentChangeFetch = false;
const result = await this.store.listCommentChanges({
after: this.startCursor,
});
this.handleCommentChanges(result);
this.startCursor = result.endCursor;
} while (this.pendingCommentChangeFetch);
} catch (error) {
console.error('Failed to fetch comment changes:', error);
} finally {
this.fetchingCommentChanges = false;
}
}
@@ -688,7 +660,7 @@ export class DocCommentEntity extends Entity<{
const result = await this.store.listComments({ after: cursor });
if (!firstResult) {
firstResult = result;
// Store the startCursor from the first page for polling
// Store the startCursor from the first page for incremental changes
this.startCursor = result.startCursor;
}
allComments.push(...result.comments);
@@ -2,6 +2,7 @@ import type { Framework } from '@toeverything/infra';
import { DefaultServerService, WorkspaceServerService } from '../cloud';
import { DocDisplayMetaService } from '../doc-display-meta';
import { NbstoreService } from '../storage';
import { WorkbenchService } from '../workbench';
import { WorkspaceScope, WorkspaceService } from '../workspace';
import { DocCommentEntity } from './entities/doc-comment';
@@ -25,5 +26,6 @@ export function configureCommentModule(framework: Framework) {
WorkspaceService,
WorkspaceServerService,
DefaultServerService,
NbstoreService,
]);
}
@@ -1,5 +1,4 @@
import {
getTranscriptTaskQuery,
retryTranscriptTaskMutation,
settleTranscriptTaskMutation,
submitTranscriptTaskMutation,
@@ -10,6 +9,7 @@ import { describe, expect, test, vi } from 'vitest';
import { DefaultServerService } from '../../cloud/services/default-server';
import { GraphQLService } from '../../cloud/services/graphql';
import { WorkspaceServerService } from '../../cloud/services/workspace-server';
import { NbstoreService } from '../../storage';
import { WorkspaceService } from '../../workspace';
import { AudioTranscriptionJobStore } from './audio-transcription-job-store';
@@ -30,6 +30,10 @@ function createStore(
get: (key: unknown) => (key === GraphQLService ? { gql } : null),
},
};
const realtime = {
request: vi.fn().mockResolvedValue({ task: { id: 'task-2' } }),
subscribe: vi.fn(),
};
framework
.service(WorkspaceService, {
workspace: { id: 'workspace-1' },
@@ -42,10 +46,14 @@ function createStore(
.service(DefaultServerService, {
server: null,
} as unknown as DefaultServerService)
.service(NbstoreService, {
realtime,
} as unknown as NbstoreService)
.entity(AudioTranscriptionJobStore, [
WorkspaceService,
WorkspaceServerService,
DefaultServerService,
NbstoreService,
]);
return framework.provider().createEntity(AudioTranscriptionJobStore, {
blobId: 'blob-1',
@@ -60,13 +68,6 @@ describe('AudioTranscriptionJobStore transcript task API', () => {
.fn()
.mockResolvedValueOnce({ submitTranscriptTask: { id: 'task-1' } })
.mockResolvedValueOnce({ retryTranscriptTask: { id: 'task-2' } })
.mockResolvedValueOnce({
currentUser: {
copilot: {
transcriptTask: { id: 'task-2' },
},
},
})
.mockResolvedValueOnce({ settleTranscriptTask: { id: 'task-2' } });
const store = createStore(gql, async () => ({
files: [file],
@@ -102,17 +103,6 @@ describe('AudioTranscriptionJobStore transcript task API', () => {
);
expect(gql).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
query: getTranscriptTaskQuery,
variables: {
workspaceId: 'workspace-1',
taskId: 'task-2',
blobId: 'blob-1',
},
})
);
expect(gql).toHaveBeenNthCalledWith(
4,
expect.objectContaining({
query: settleTranscriptTaskMutation,
variables: {
@@ -1,13 +1,14 @@
import {
getTranscriptTaskQuery,
retryTranscriptTaskMutation,
settleTranscriptTaskMutation,
submitTranscriptTaskMutation,
type TranscriptionResultType,
} from '@affine/graphql';
import { Entity } from '@toeverything/infra';
import type { DefaultServerService, WorkspaceServerService } from '../../cloud';
import { GraphQLService } from '../../cloud/services/graphql';
import type { NbstoreService } from '../../storage';
import type { WorkspaceService } from '../../workspace';
export class AudioTranscriptionJobStore extends Entity<{
@@ -20,7 +21,8 @@ export class AudioTranscriptionJobStore extends Entity<{
constructor(
private readonly workspaceService: WorkspaceService,
private readonly workspaceServerService: WorkspaceServerService,
private readonly defaultServerService: DefaultServerService
private readonly defaultServerService: DefaultServerService,
private readonly nbstoreService: NbstoreService
) {
super();
}
@@ -79,27 +81,27 @@ export class AudioTranscriptionJobStore extends Entity<{
return response.retryTranscriptTask;
};
getTranscriptTask = async (blobId: string, taskId?: string) => {
const graphqlService = this.graphqlService;
if (!graphqlService) {
throw new Error('No graphql service available');
}
getTranscriptTask = async (
blobId: string,
taskId?: string
): Promise<TranscriptionResultType | null> => {
const currentWorkspaceId = this.currentWorkspaceId;
if (!currentWorkspaceId) {
throw new Error('No current workspace id');
}
const response = await graphqlService.gql({
query: getTranscriptTaskQuery,
variables: {
workspaceId: currentWorkspaceId,
taskId,
blobId,
},
});
if (!response.currentUser?.copilot?.transcriptTask) {
return null;
}
return response.currentUser.copilot.transcriptTask;
const response = await this.nbstoreService.realtime.request(
'copilot.transcript.task.get',
{ workspaceId: currentWorkspaceId, taskId, blobId },
{ timeoutMs: 10000 }
);
return response.task as TranscriptionResultType | null;
};
subscribeTranscriptTask = (taskId: string) => {
return this.nbstoreService.realtime.subscribe(
'copilot.transcript.task.changed',
{ workspaceId: this.currentWorkspaceId, taskId }
);
};
settleTranscriptTask = async (taskId: string) => {
const graphqlService = this.graphqlService;
@@ -4,6 +4,7 @@ import { DebugLogger } from '@affine/debug';
import { UserFriendlyError } from '@affine/error';
import { AiJobStatus } from '@affine/graphql';
import { Entity, LiveData } from '@toeverything/infra';
import type { Subscription } from 'rxjs';
import type { DefaultServerService, WorkspaceServerService } from '../../cloud';
import { AuthService } from '../../cloud/services/auth';
@@ -59,10 +60,13 @@ export class AudioTranscriptionJob extends Entity<{
super();
this.disposables.push(() => {
this.disposed = true;
this.rejectTaskWait(new Error('Job disposed'));
});
}
disposed = false;
private taskSubscription?: Subscription;
private taskWaitReject?: (error: unknown) => void;
private readonly _status$ = new LiveData<TranscriptionStatus>({
status: 'waiting-for-job',
@@ -190,38 +194,77 @@ export class AudioTranscriptionJob extends Entity<{
}
private async untilTaskReadyOrSettled() {
while (
!this.disposed &&
this.props.blockProps.jobId &&
this.props.blockProps.createdBy === this.currentUserId
const taskId = this.props.blockProps.jobId;
if (!taskId || this.props.blockProps.createdBy !== this.currentUserId) {
return;
}
await this.checkTranscriptTask(taskId);
this.rejectTaskWait(new Error('Transcript task wait replaced'));
await new Promise<void>((resolve, reject) => {
this.taskWaitReject = reject;
this.taskSubscription = this.store
.subscribeTranscriptTask(taskId)
.subscribe({
next: event => {
if (
'type' in event ||
(event.status as AiJobStatus) === AiJobStatus.finished
) {
this.checkTranscriptTask(taskId).then(() => {
if (this.status$.value.status === AiJobStatus.finished) {
resolve();
}
}, reject);
} else if ((event.status as AiJobStatus) === AiJobStatus.failed) {
reject(
UserFriendlyError.fromAny(
event.error ?? 'Transcription job failed'
)
);
}
},
error: reject,
});
}).finally(() => {
this.taskWaitReject = undefined;
this.taskSubscription?.unsubscribe();
this.taskSubscription = undefined;
});
}
private rejectTaskWait(error: unknown) {
const reject = this.taskWaitReject;
this.taskWaitReject = undefined;
this.taskSubscription?.unsubscribe();
this.taskSubscription = undefined;
reject?.(error);
}
private async checkTranscriptTask(taskId: string) {
if (
this.disposed ||
this.props.blockProps.jobId !== taskId ||
this.props.blockProps.createdBy !== this.currentUserId
) {
logger.debug('Polling job status', {
jobId: this.props.blockProps.jobId,
return;
}
const job = await this.store.getTranscriptTask(this.props.blobId, taskId);
if (!job || job.status === AiJobStatus.failed) {
logger.debug('Job failed during realtime status check', {
jobId: taskId,
});
const job = await this.store.getTranscriptTask(
this.props.blobId,
this.props.blockProps.jobId
);
throw UserFriendlyError.fromAny('Transcription job failed');
}
if (!job || job?.status === 'failed') {
logger.debug('Job failed during polling', {
jobId: this.props.blockProps.jobId,
});
throw UserFriendlyError.fromAny('Transcription job failed');
}
if (job?.status === AiJobStatus.finished) {
logger.debug('Transcript task is ready to settle', {
jobId: this.props.blockProps.jobId,
});
this._status$.value = {
status: AiJobStatus.finished,
};
return;
}
// Add delay between polling attempts
await new Promise(resolve => setTimeout(resolve, 3000));
if (job.status === AiJobStatus.finished) {
logger.debug('Transcript task is ready to settle', { jobId: taskId });
this._status$.value = {
status: AiJobStatus.finished,
};
}
}
@@ -1,7 +1,7 @@
import type { Framework } from '@toeverything/infra';
import { DefaultServerService, WorkspaceServerService } from '../cloud';
import { GlobalState, GlobalStateService } from '../storage';
import { GlobalState, GlobalStateService, NbstoreService } from '../storage';
import { WorkbenchService } from '../workbench';
import { WorkspaceScope, WorkspaceService } from '../workspace';
import { AudioAttachmentBlock } from './entities/audio-attachment-block';
@@ -35,6 +35,7 @@ export function configureMediaModule(framework: Framework) {
WorkspaceService,
WorkspaceServerService,
DefaultServerService,
NbstoreService,
])
.service(AudioAttachmentService)
.service(AudioMediaManagerService, [
@@ -12,7 +12,7 @@ import {
ServerScope,
ServerService,
} from '../cloud';
import { GlobalSessionState } from '../storage';
import { GlobalSessionState, NbstoreService } from '../storage';
import { NotificationCountService } from './services/count';
import { NotificationListService } from './services/list';
import { NotificationService } from './services/notification';
@@ -22,7 +22,11 @@ export function configureNotificationModule(framework: Framework) {
framework
.scope(ServerScope)
.service(NotificationService, [NotificationStore])
.service(NotificationCountService, [NotificationStore, AuthService])
.service(NotificationCountService, [
NotificationStore,
AuthService,
NbstoreService,
])
.service(NotificationListService, [
NotificationStore,
NotificationCountService,
@@ -10,11 +10,13 @@ import {
Service,
smartRetry,
} from '@toeverything/infra';
import { switchMap, tap, timer } from 'rxjs';
import type { Subscription } from 'rxjs';
import { tap } from 'rxjs';
import { AccountChanged, type AuthService } from '../../cloud';
import { ServerStarted } from '../../cloud/events/server-started';
import { ApplicationFocused } from '../../lifecycle';
import type { NbstoreService } from '../../storage';
import type { NotificationStore } from '../stores/notification';
@OnEvent(ApplicationFocused, s => s.handleApplicationFocused)
@@ -23,7 +25,8 @@ import type { NotificationStore } from '../stores/notification';
export class NotificationCountService extends Service {
constructor(
private readonly store: NotificationStore,
private readonly authService: AuthService
private readonly authService: AuthService,
private readonly nbstoreService: NbstoreService
) {
super();
}
@@ -33,17 +36,17 @@ export class NotificationCountService extends Service {
readonly count$ = LiveData.from(this.store.watchNotificationCountCache(), 0);
readonly isLoading$ = new LiveData(false);
readonly error$ = new LiveData<any>(null);
private subscription?: Subscription;
revalidate = effect(
switchMap(() => {
return timer(0, 30000); // revalidate every 30 seconds
}),
exhaustMapWithTrailing(() => {
return fromPromise(signal => {
return fromPromise(() => {
if (!this.loggedIn$.value) {
return Promise.resolve(0);
}
return this.store.getNotificationCount(signal);
return this.nbstoreService.realtime
.request('notification.count.get', {}, { timeoutMs: 10000 })
.then(result => result.count);
}).pipe(
tap(result => {
this.setCount(result ?? 0);
@@ -63,10 +66,12 @@ export class NotificationCountService extends Service {
}
handleServerStarted() {
this.subscribe();
this.revalidate();
}
handleAccountChanged() {
this.subscribe();
this.revalidate();
}
@@ -77,5 +82,27 @@ export class NotificationCountService extends Service {
override dispose(): void {
super.dispose();
this.revalidate.unsubscribe();
this.subscription?.unsubscribe();
}
private subscribe() {
this.subscription?.unsubscribe();
if (!this.loggedIn$.value) {
return;
}
this.subscription = this.nbstoreService.realtime
.subscribe('notification.count.changed', {})
.subscribe({
next: event => {
if ('type' in event) {
this.revalidate();
} else {
this.setCount(event.count);
}
},
error: error => {
this.error$.setValue(error);
},
});
}
}
@@ -3,7 +3,6 @@ import {
type ListNotificationsQuery,
listNotificationsQuery,
mentionUserMutation,
notificationCountQuery,
type PaginationInput,
readAllNotificationsMutation,
readNotificationMutation,
@@ -52,17 +51,6 @@ export class NotificationStore extends Store {
);
}
async getNotificationCount(signal?: AbortSignal) {
const result = await this.gqlService.gql({
query: notificationCountQuery,
context: {
signal,
},
});
return result.currentUser?.notifications.totalCount;
}
async listNotification(pagination: PaginationInput, signal?: AbortSignal) {
const result = await this.gqlService.gql({
query: listNotificationsQuery,
@@ -1,10 +1,13 @@
import type {
StoreClient,
StoreManagerClient,
WorkerInitOptions,
} from '@affine/nbstore/worker/client';
import { createIdentifier } from '@toeverything/infra';
export interface NbstoreProvider {
readonly realtime: StoreManagerClient['realtime'];
/**
* Open a nbstore with the given options, if the store with the given key already exists, it will be returned.
*
@@ -11,4 +11,8 @@ export class NbstoreService extends Service {
openStore(key: string, options: WorkerInitOptions) {
return this.nbstoreProvider.openStore(key, options);
}
get realtime() {
return this.nbstoreProvider.realtime;
}
}
@@ -10,8 +10,8 @@ import {
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY, interval, Subject } from 'rxjs';
import { exhaustMap, mergeMap, switchMap, takeUntil } from 'rxjs/operators';
import { EMPTY, type Subscription } from 'rxjs';
import { exhaustMap, mergeMap } from 'rxjs/operators';
import type { EmbeddingStore } from '../stores/embedding';
import type { LocalAttachmentFile } from '../types';
@@ -26,8 +26,7 @@ export class EmbeddingProgress extends Entity {
error$ = new LiveData<any>(null);
loading$ = new LiveData(true);
private readonly EMBEDDING_PROGRESS_POLL_INTERVAL = 3000;
private readonly stopEmbeddingProgress$ = new Subject<void>();
private progressSubscription?: Subscription;
uploadingAttachments$ = new LiveData<LocalAttachmentFile[]>([]);
constructor(
@@ -37,50 +36,49 @@ export class EmbeddingProgress extends Entity {
super();
}
startEmbeddingProgressPolling() {
this.stopEmbeddingProgressPolling();
startEmbeddingProgress() {
this.stopEmbeddingProgress();
this.progressSubscription = this.store
.subscribeEmbeddingProgress(this.workspaceService.workspace.id)
.subscribe({
next: () => this.getEmbeddingProgress(),
error: error => this.error$.setValue(error),
});
this.getEmbeddingProgress();
}
stopEmbeddingProgressPolling() {
this.stopEmbeddingProgress$.next();
stopEmbeddingProgress() {
this.progressSubscription?.unsubscribe();
this.progressSubscription = undefined;
}
getEmbeddingProgress = effect(
exhaustMap(() => {
return interval(this.EMBEDDING_PROGRESS_POLL_INTERVAL).pipe(
takeUntil(this.stopEmbeddingProgress$),
switchMap(() =>
fromPromise(signal =>
this.store.getEmbeddingProgress(
this.workspaceService.workspace.id,
signal
)
).pipe(
smartRetry(),
mergeMap(value => {
this.progress$.next(value);
if (value && value.embedded === value.total && value.total > 0) {
this.stopEmbeddingProgressPolling();
}
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to fetch workspace embedding progress',
error
);
}),
onStart(() => this.loading$.setValue(true)),
onComplete(() => this.loading$.setValue(false))
)
return fromPromise(signal =>
this.store.getEmbeddingProgress(
this.workspaceService.workspace.id,
signal
)
).pipe(
smartRetry(),
mergeMap(value => {
this.progress$.next(value);
if (value && value.embedded === value.total && value.total > 0) {
this.stopEmbeddingProgress();
}
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error('Failed to fetch workspace embedding progress', error);
}),
onStart(() => this.loading$.setValue(true)),
onComplete(() => this.loading$.setValue(false))
);
})
);
override dispose(): void {
this.stopEmbeddingProgress$.next();
this.progressSubscription?.unsubscribe();
this.getEmbeddingProgress.unsubscribe();
}
}
@@ -1,4 +1,5 @@
import { WorkspaceServerService } from '@affine/core/modules/cloud';
import { NbstoreService } from '@affine/core/modules/storage';
import {
WorkspaceScope,
WorkspaceService,
@@ -16,7 +17,7 @@ export function configureIndexerEmbeddingModule(framework: Framework) {
framework
.scope(WorkspaceScope)
.service(EmbeddingService)
.store(EmbeddingStore, [WorkspaceServerService])
.store(EmbeddingStore, [WorkspaceServerService, NbstoreService])
.entity(EmbeddingEnabled, [WorkspaceService, EmbeddingStore])
.entity(AdditionalAttachments, [WorkspaceService, EmbeddingStore])
.entity(IgnoredDocs, [WorkspaceService, EmbeddingStore])
@@ -1,11 +1,11 @@
import type { WorkspaceServerService } from '@affine/core/modules/cloud';
import type { NbstoreService } from '@affine/core/modules/storage';
import {
addWorkspaceEmbeddingFilesMutation,
addWorkspaceEmbeddingIgnoredDocsMutation,
getAllWorkspaceEmbeddingIgnoredDocsQuery,
getWorkspaceConfigQuery,
getWorkspaceEmbeddingFilesQuery,
getWorkspaceEmbeddingStatusQuery,
type PaginationInput,
removeWorkspaceEmbeddingFilesMutation,
removeWorkspaceEmbeddingIgnoredDocsMutation,
@@ -14,7 +14,10 @@ import {
import { Store } from '@toeverything/infra';
export class EmbeddingStore extends Store {
constructor(private readonly workspaceServerService: WorkspaceServerService) {
constructor(
private readonly workspaceServerService: WorkspaceServerService,
private readonly nbstoreService: NbstoreService
) {
super();
}
@@ -178,17 +181,17 @@ export class EmbeddingStore extends Store {
}
async getEmbeddingProgress(workspaceId: string, signal?: AbortSignal) {
if (!this.workspaceServerService.server) {
throw new Error('No Server');
}
return await this.nbstoreService.realtime.request(
'workspace.embedding.progress.get',
{ workspaceId },
{ signal, timeoutMs: 10000 }
);
}
const data = await this.workspaceServerService.server.gql({
query: getWorkspaceEmbeddingStatusQuery,
variables: {
workspaceId,
},
context: { signal },
});
return data.queryWorkspaceEmbeddingStatus;
subscribeEmbeddingProgress(workspaceId: string) {
return this.nbstoreService.realtime.subscribe(
'workspace.embedding.progress.changed',
{ workspaceId }
);
}
}
@@ -66,7 +66,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => {
.setEnabled(checked)
.then(() => {
if (checked) {
embeddingService.embeddingProgress.startEmbeddingProgressPolling();
embeddingService.embeddingProgress.startEmbeddingProgress();
}
})
.catch(error => {
@@ -91,8 +91,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => {
docType: file.type,
});
embeddingService.additionalAttachments.addAttachments([file]);
// Restart polling to track progress of newly uploaded files
embeddingService.embeddingProgress.startEmbeddingProgressPolling();
embeddingService.embeddingProgress.startEmbeddingProgress();
},
[embeddingService.additionalAttachments, embeddingService.embeddingProgress]
);
@@ -168,7 +167,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => {
]);
useEffect(() => {
embeddingService.embeddingProgress.startEmbeddingProgressPolling();
embeddingService.embeddingProgress.startEmbeddingProgress();
embeddingService.embeddingEnabled.getEnabled();
embeddingService.additionalAttachments.getAttachments({
first: COUNT_PER_PAGE,
@@ -178,7 +177,7 @@ const EmbeddingCloud: React.FC<{ disabled: boolean }> = ({ disabled }) => {
embeddingService.embeddingProgress.getEmbeddingProgress();
return () => {
embeddingService.embeddingProgress.stopEmbeddingProgressPolling();
embeddingService.embeddingProgress.stopEmbeddingProgress();
};
}, [
embeddingService.embeddingProgress,
+8
View File
@@ -1111,6 +1111,7 @@ export const PackageList = [
'tools/cli',
'tools/utils',
'packages/common/graphql',
'packages/common/realtime',
],
},
{
@@ -1148,6 +1149,7 @@ export const PackageList = [
name: '@affine/nbstore',
workspaceDependencies: [
'packages/common/reader',
'packages/common/realtime',
'packages/common/infra',
'packages/common/error',
'packages/common/graphql',
@@ -1159,6 +1161,11 @@ export const PackageList = [
name: '@affine/reader',
workspaceDependencies: ['blocksuite/affine/all'],
},
{
location: 'packages/common/realtime',
name: '@affine/realtime',
workspaceDependencies: ['packages/common/graphql'],
},
{
location: 'packages/common/s3-compat',
name: '@affine/s3-compat',
@@ -1524,6 +1531,7 @@ export type PackageName =
| '@toeverything/infra'
| '@affine/nbstore'
| '@affine/reader'
| '@affine/realtime'
| '@affine/s3-compat'
| '@affine/admin'
| '@affine/android'
+1
View File
@@ -132,6 +132,7 @@
{ "path": "./packages/common/infra" },
{ "path": "./packages/common/nbstore" },
{ "path": "./packages/common/reader" },
{ "path": "./packages/common/realtime" },
{ "path": "./packages/common/s3-compat" },
{ "path": "./packages/frontend/admin" },
{ "path": "./packages/frontend/apps/android" },
+11
View File
@@ -818,6 +818,7 @@ __metadata:
"@affine/error": "workspace:*"
"@affine/graphql": "workspace:*"
"@affine/reader": "workspace:*"
"@affine/realtime": "workspace:*"
"@blocksuite/affine": "workspace:*"
"@toeverything/infra": "workspace:*"
eventemitter2: "npm:^6.4.9"
@@ -835,6 +836,7 @@ __metadata:
peerDependencies:
"@affine/error": "workspace:*"
"@affine/graphql": "workspace:*"
"@affine/realtime": "workspace:*"
"@blocksuite/affine": "workspace:*"
idb: ^8.0.0
socket.io-client: ^4.8.3
@@ -866,6 +868,14 @@ __metadata:
languageName: unknown
linkType: soft
"@affine/realtime@workspace:*, @affine/realtime@workspace:packages/common/realtime":
version: 0.0.0-use.local
resolution: "@affine/realtime@workspace:packages/common/realtime"
dependencies:
"@affine/graphql": "workspace:*"
languageName: unknown
linkType: soft
"@affine/revert-update@workspace:tools/revert-update":
version: 0.0.0-use.local
resolution: "@affine/revert-update@workspace:tools/revert-update"
@@ -918,6 +928,7 @@ __metadata:
"@affine-tools/cli": "workspace:*"
"@affine-tools/utils": "workspace:*"
"@affine/graphql": "workspace:*"
"@affine/realtime": "workspace:*"
"@affine/s3-compat": "workspace:*"
"@affine/server-native": "workspace:*"
"@apollo/server": "npm:^5.5.0"