feat(core): migrate more pull to realtime (#14936)

#### PR Dependency Tree


* **PR #14936** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Refactor**
* Consolidated realtime subscription patterns for consistent, more
reliable live updates across comments, notifications, transcription
tasks, and embedding progress.
* Standardized realtime room naming and subscription keys for
deterministic delivery.

* **New Features**
* Introduced a reusable live-query mechanism powering realtime snapshot
+ event workflows used by comments, notifications, transcript tasks, and
embedding progress.

* **Tests**
* Added tests covering live-query behavior and deterministic
subscription key generation.

[![Review Change
Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/toeverything/AFFiNE/pull/14936)
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-05-11 00:33:25 +08:00
committed by GitHub
parent 8cf00738c2
commit db0ff0a9df
26 changed files with 812 additions and 441 deletions
@@ -3,12 +3,17 @@ import { z } from 'zod';
import { decodeWithJson, encodeWithJson } from '../../base/graphql';
import { AccessController } from '../permission';
import type { RealtimePublisher, RealtimeRegistry } from '../realtime';
import {
realtimeCommentRoom,
type RealtimePublisher,
type RealtimeRegistry,
registerRealtimeLiveQuery,
} from '../realtime';
import type { CommentCursor } from './resolver';
import { CommentService } from './service';
export function commentRoom(workspaceId: string, docId: string) {
return `workspace:${workspaceId}:doc:${docId}:comment`;
return realtimeCommentRoom(workspaceId, docId);
}
@Injectable()
@@ -27,53 +32,57 @@ export class CommentRealtimeProvider implements OnModuleInit {
first: z.number().optional(),
});
this.registry?.registerRequest({
name: 'comment.changes.get',
input,
handle: async (user, payload) => {
await this.assertRead(user.id, payload.workspaceId, payload.docId);
const cursor: CommentCursor = decodeWithJson(payload.after) ?? {};
const changes = await this.service.listCommentChanges(
payload.workspaceId,
payload.docId,
{
commentUpdatedAt: cursor.commentUpdatedAt,
replyUpdatedAt: cursor.replyUpdatedAt,
take: payload.first,
registerRealtimeLiveQuery(this.registry, {
request: {
name: 'comment.changes.get',
input,
handle: async (user, payload) => {
await this.assertRead(user.id, payload.workspaceId, payload.docId);
const cursor: CommentCursor = decodeWithJson(payload.after) ?? {};
const limit = payload.first;
const changes = await this.service.listCommentChanges(
payload.workspaceId,
payload.docId,
{
commentUpdatedAt: cursor.commentUpdatedAt,
replyUpdatedAt: cursor.replyUpdatedAt,
take: limit ? limit + 1 : undefined,
}
);
const pageChanges = limit ? changes.slice(0, limit) : changes;
const endCursor = cursor;
for (const change of pageChanges) {
if (change.commentId) {
endCursor.replyUpdatedAt = change.item.updatedAt;
} else {
endCursor.commentUpdatedAt = change.item.updatedAt;
}
}
);
const endCursor = cursor;
for (const change of changes) {
if (change.commentId) {
endCursor.replyUpdatedAt = change.item.updatedAt;
} else {
endCursor.commentUpdatedAt = change.item.updatedAt;
}
}
return {
changes: changes.map(change => ({
id: change.id,
action: change.action,
item: change.item,
commentId: change.commentId ?? undefined,
})) as never,
startCursor: '',
endCursor: encodeWithJson(endCursor),
hasNextPage: changes.length > 0,
};
return {
changes: pageChanges.map(change => ({
id: change.id,
action: change.action,
item: change.item,
commentId: change.commentId ?? null,
})),
startCursor: '',
endCursor: encodeWithJson(endCursor),
hasNextPage: limit ? changes.length > limit : false,
};
},
},
});
this.registry?.registerTopic({
name: 'comment.changed',
input: z.object({
workspaceId: z.string(),
docId: z.string(),
}),
authorize: async (user, payload) => {
await this.assertRead(user.id, payload.workspaceId, payload.docId);
topic: {
name: 'comment.changed',
input: z.object({
workspaceId: z.string(),
docId: z.string(),
}),
authorize: async (user, payload) => {
await this.assertRead(user.id, payload.workspaceId, payload.docId);
},
room: (_user, payload) =>
commentRoom(payload.workspaceId, payload.docId),
},
room: (_user, payload) => commentRoom(payload.workspaceId, payload.docId),
});
}
@@ -1,3 +0,0 @@
export function notificationCountRoom(userId: string) {
return `user:${userId}:notification`;
}
@@ -1,8 +1,11 @@
import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { z } from 'zod';
import type { RealtimeRegistry } from '../realtime';
import { notificationCountRoom } from './realtime-room';
import {
realtimeNotificationRoom,
type RealtimeRegistry,
registerRealtimeLiveQuery,
} from '../realtime';
import { NotificationService } from './service';
@Injectable()
@@ -13,23 +16,25 @@ export class NotificationRealtimeProvider implements OnModuleInit {
) {}
onModuleInit() {
this.registry?.registerRequest({
name: 'notification.count.get',
input: z.object({}).strict(),
handle: async user => ({
count: await this.service.countByUserId(user.id),
}),
});
this.registry?.registerTopic({
name: 'notification.count.changed',
input: z.object({}).strict(),
authorize: async () => {},
room: user => {
if (!user) {
throw new Error('User is required for notification count room');
}
return notificationCountRoom(user.id);
const input = z.object({}).strict();
registerRealtimeLiveQuery(this.registry, {
request: {
name: 'notification.count.get',
input,
handle: async user => ({
count: await this.service.countByUserId(user.id),
}),
},
topic: {
name: 'notification.count.changed',
input,
authorize: async () => {},
room: user => {
if (!user) {
throw new Error('User is required for notification count room');
}
return realtimeNotificationRoom(user.id);
},
},
});
}
@@ -18,12 +18,12 @@ import {
import { DocReader } from '../doc';
import { Mailer } from '../mail';
import type { RealtimePublisher } from '../realtime';
import { realtimeNotificationRoom } from '../realtime';
import { generateDocPath } from '../utils/doc';
import {
generateWorkspaceSettingsPath,
WorkspaceSettingsTab,
} from '../utils/workspace';
import { notificationCountRoom } from './realtime-room';
@Injectable()
export class NotificationService {
@@ -499,7 +499,7 @@ export class NotificationService {
'notification.count.changed',
{},
{ count: await this.countByUserId(userId), reason },
{ room: notificationCountRoom(userId) }
{ room: realtimeNotificationRoom(userId) }
);
} catch (error) {
this.logger.error(
@@ -1,11 +1,18 @@
import { getRealtimeInputKey } from '@affine/realtime';
import test from 'ava';
import { z } from 'zod';
import type { CurrentUser } from '../../auth';
import { RealtimeGateway } from '../gateway';
import type { RealtimePublisher } from '../publisher';
import {
realtimeCommentRoom,
realtimeNotificationRoom,
realtimeTranscriptTaskRoom,
realtimeWorkspaceEmbeddingProgressRoom,
registerRealtimeLiveQuery,
} from '../index';
import { RealtimePublisher } from '../publisher';
import { RealtimeRegistry } from '../registry';
import { stableStringify } from '../stable-stringify';
const user: CurrentUser = {
id: 'u1',
@@ -109,7 +116,7 @@ test('gateway authorizes subscription and joins room', async t => {
t.deepEqual(joined, ['workspace:space']);
t.deepEqual(result, {
data: {
subscriptionId: `socket-1:comment.changed:${stableStringify({
subscriptionId: `socket-1:comment.changed:${getRealtimeInputKey({
workspaceId: 'space',
docId: 'doc',
})}`,
@@ -126,22 +133,101 @@ test('gateway authorizes subscription and joins room', async t => {
);
});
test('stableStringify is deterministic for subscription input keys', t => {
test('getRealtimeInputKey is deterministic for subscription input keys', t => {
t.is(
stableStringify({ docId: 'doc', workspaceId: 'space' }),
stableStringify({ workspaceId: 'space', docId: 'doc' })
getRealtimeInputKey({ docId: 'doc', workspaceId: 'space' }),
getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' })
);
});
test('stableStringify follows JSON semantics for subscription input keys', t => {
t.is(stableStringify({ after: undefined }), stableStringify({}));
t.is(stableStringify([undefined]), '[null]');
test('getRealtimeInputKey follows JSON semantics for subscription input keys', t => {
t.is(getRealtimeInputKey({ after: undefined }), getRealtimeInputKey({}));
t.is(getRealtimeInputKey([undefined]), '[null]');
t.is(
stableStringify(new Date('2026-01-02T03:04:05.000Z')),
getRealtimeInputKey(new Date('2026-01-02T03:04:05.000Z')),
'"2026-01-02T03:04:05.000Z"'
);
});
test('room helpers produce stable realtime room names', t => {
t.is(realtimeNotificationRoom('u1'), 'user:u1:notification');
t.is(realtimeCommentRoom('space', 'doc'), 'workspace:space:doc:doc:comment');
t.is(
realtimeWorkspaceEmbeddingProgressRoom('space'),
'workspace:space:embedding-progress'
);
t.is(
realtimeTranscriptTaskRoom('space', 'task'),
'copilot:transcript:space:task'
);
});
test('registerRealtimeLiveQuery registers paired request and topic handlers', async t => {
const registry = new RealtimeRegistry();
registerRealtimeLiveQuery(registry, {
request: {
name: 'notification.count.get',
input: z.object({}).strict(),
handle: async () => ({ count: 7 }),
},
topic: {
name: 'notification.count.changed',
input: z.object({}).strict(),
authorize: async () => {},
room: currentUser => `user:${currentUser?.id}:notification`,
},
});
t.deepEqual(
await registry.getRequest('notification.count.get').handle(user, {}),
{
count: 7,
}
);
t.is(
registry.getTopic('notification.count.changed').room(user, {}),
'user:u1:notification'
);
});
test('publisher emits realtime event with shared input key', t => {
const registry = new RealtimeRegistry();
registry.registerTopic({
name: 'comment.changed',
input: z.object({ workspaceId: z.string(), docId: z.string() }),
authorize: async () => {},
room: (_currentUser, input) =>
realtimeCommentRoom(input.workspaceId, input.docId),
});
const emitted: unknown[] = [];
const publisher = new RealtimePublisher(registry, {
broadcast: () => {},
} as never);
publisher.attachServer({
to: (room: string) => ({
emit: (event: string, payload: unknown) =>
emitted.push({ room, event, payload }),
}),
} as never);
publisher.publishLocal({
topic: 'comment.changed',
input: { docId: 'doc', workspaceId: 'space' },
event: { changed: true },
});
t.like(emitted[0], {
room: 'workspace:space:doc:doc:comment',
event: 'realtime:event',
payload: {
topic: 'comment.changed',
inputKey: getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' }),
event: { changed: true },
},
});
});
test('gateway removes subscriptions on socket disconnect', async t => {
const registry = new RealtimeRegistry();
registry.registerTopic({
@@ -3,6 +3,7 @@ import type {
RealtimeSubscribeEnvelope,
RealtimeUnsubscribeEnvelope,
} from '@affine/realtime';
import { getRealtimeInputKey } from '@affine/realtime';
import { applyDecorators, Logger, UseInterceptors } from '@nestjs/common';
import {
ConnectedSocket,
@@ -25,7 +26,6 @@ import {
import { CurrentUser } from '../auth';
import { RealtimePublisher } from './publisher';
import { RealtimeRegistry } from './registry';
import { stableStringify } from './stable-stringify';
import type { RealtimePublishPayload } from './types';
const SubscribeMessage = (event: string) =>
@@ -87,7 +87,7 @@ export class RealtimeGateway implements OnGatewayInit, OnGatewayDisconnect {
await handler.authorize(user, input as never);
const room = handler.room(user, input as never);
await client.join(room);
const subscriptionId = `${client.id}:${envelope.topic}:${stableStringify(input)}`;
const subscriptionId = `${client.id}:${envelope.topic}:${getRealtimeInputKey(input)}`;
this.subscriptions.set(subscriptionId, {
socketId: client.id,
room,
@@ -11,6 +11,16 @@ import { RealtimeRegistry } from './registry';
})
export class RealtimeModule {}
export { registerRealtimeLiveQuery } from './provider';
export { RealtimePublisher } from './publisher';
export { RealtimeRegistry } from './registry';
export {
realtimeCommentRoom,
realtimeNotificationRoom,
realtimeTranscriptTaskRoom,
realtimeUserRoom,
realtimeWorkspaceDocRoom,
realtimeWorkspaceEmbeddingProgressRoom,
realtimeWorkspaceRoom,
} from './rooms';
export type { RealtimeRequestHandler, RealtimeTopicHandler } from './types';
@@ -0,0 +1,23 @@
import type { RealtimeRequestName, RealtimeTopicName } from '@affine/realtime';
import type { RealtimeRegistry } from './registry';
import type { RealtimeRequestHandler, RealtimeTopicHandler } from './types';
export type RealtimeLiveQueryDefinition<
Request extends RealtimeRequestName,
Topic extends RealtimeTopicName,
> = {
request: RealtimeRequestHandler<Request>;
topic: RealtimeTopicHandler<Topic>;
};
export function registerRealtimeLiveQuery<
Request extends RealtimeRequestName,
Topic extends RealtimeTopicName,
>(
registry: RealtimeRegistry | undefined,
definition: RealtimeLiveQueryDefinition<Request, Topic>
) {
registry?.registerRequest(definition.request);
registry?.registerTopic(definition.topic);
}
@@ -1,10 +1,13 @@
import type { RealtimeEvent, RealtimeTopicName } from '@affine/realtime';
import {
getRealtimeInputKey,
type RealtimeEvent,
type RealtimeTopicName,
} from '@affine/realtime';
import { Injectable, Logger } from '@nestjs/common';
import type { Server } from 'socket.io';
import { EventBus } from '../../base';
import { RealtimeRegistry } from './registry';
import { stableStringify } from './stable-stringify';
import type { RealtimePublishPayload } from './types';
@Injectable()
@@ -46,7 +49,7 @@ export class RealtimePublisher {
const room = payload.room ?? handler.room(null, payload.input as never);
const envelope: RealtimeEvent = {
topic: payload.topic,
inputKey: stableStringify(payload.input),
inputKey: getRealtimeInputKey(payload.input),
sentAt: Date.now(),
event: payload.event as never,
};
@@ -0,0 +1,34 @@
export function realtimeUserRoom(userId: string, scope: string) {
return `user:${userId}:${scope}`;
}
export function realtimeWorkspaceRoom(workspaceId: string, scope: string) {
return `workspace:${workspaceId}:${scope}`;
}
export function realtimeWorkspaceDocRoom(
workspaceId: string,
docId: string,
scope: string
) {
return `workspace:${workspaceId}:doc:${docId}:${scope}`;
}
export function realtimeTranscriptTaskRoom(
workspaceId: string,
taskId: string
) {
return `copilot:transcript:${workspaceId}:${taskId}`;
}
export function realtimeNotificationRoom(userId: string) {
return realtimeUserRoom(userId, 'notification');
}
export function realtimeCommentRoom(workspaceId: string, docId: string) {
return realtimeWorkspaceDocRoom(workspaceId, docId, 'comment');
}
export function realtimeWorkspaceEmbeddingProgressRoom(workspaceId: string) {
return realtimeWorkspaceRoom(workspaceId, 'embedding-progress');
}
@@ -1,31 +0,0 @@
export function stableStringify(value: unknown): string {
if (
value === undefined ||
typeof value === 'function' ||
typeof value === 'symbol'
) {
return 'null';
}
if (value === null || typeof value !== 'object') {
return JSON.stringify(value);
}
if (Array.isArray(value)) {
return `[${value.map(stableStringify).join(',')}]`;
}
if (value instanceof Date) {
return JSON.stringify(value.toJSON());
}
const record = value as Record<string, unknown>;
return `{${Object.keys(record)
.filter(key => {
const property = record[key];
return (
property !== undefined &&
typeof property !== 'function' &&
typeof property !== 'symbol'
);
})
.sort()
.map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`)
.join(',')}}`;
}
@@ -3,15 +3,17 @@ import { z } from 'zod';
import { OnEvent } from '../../../base';
import { AccessController } from '../../../core/permission';
import type {
import {
RealtimePublisher,
RealtimeRegistry,
realtimeWorkspaceEmbeddingProgressRoom,
registerRealtimeLiveQuery,
} from '../../../core/realtime';
import { Models } from '../../../models';
import { CopilotContextService } from './service';
export function workspaceEmbeddingRoom(workspaceId: string) {
return `workspace:${workspaceId}:embedding-progress`;
return realtimeWorkspaceEmbeddingProgressRoom(workspaceId);
}
@Injectable()
@@ -27,27 +29,28 @@ export class CopilotEmbeddingRealtimeProvider implements OnModuleInit {
onModuleInit() {
const input = z.object({ workspaceId: z.string() });
this.registry?.registerRequest({
name: 'workspace.embedding.progress.get',
input,
handle: async (user, payload) => {
await this.assertCopilot(user.id, payload.workspaceId);
if (!this.context.canEmbedding) {
return { total: 0, embedded: 0 };
}
return await this.models.copilotWorkspace.getEmbeddingStatus(
payload.workspaceId
);
registerRealtimeLiveQuery(this.registry, {
request: {
name: 'workspace.embedding.progress.get',
input,
handle: async (user, payload) => {
await this.assertCopilot(user.id, payload.workspaceId);
if (!this.context.canEmbedding) {
return { total: 0, embedded: 0 };
}
return await this.models.copilotWorkspace.getEmbeddingStatus(
payload.workspaceId
);
},
},
});
this.registry?.registerTopic({
name: 'workspace.embedding.progress.changed',
input,
authorize: async (user, payload) => {
await this.assertCopilot(user.id, payload.workspaceId);
topic: {
name: 'workspace.embedding.progress.changed',
input,
authorize: async (user, payload) => {
await this.assertCopilot(user.id, payload.workspaceId);
},
room: (_user, payload) => workspaceEmbeddingRoom(payload.workspaceId),
},
room: (_user, payload) => workspaceEmbeddingRoom(payload.workspaceId),
});
}
@@ -3,8 +3,12 @@ import { z } from 'zod';
import { CopilotTranscriptionJobNotFound } from '../../../base';
import { AccessController } from '../../../core/permission';
import type { RealtimeRegistry } from '../../../core/realtime';
import { CopilotTranscriptionService, transcriptTaskRoom } from './service';
import {
type RealtimeRegistry,
realtimeTranscriptTaskRoom,
registerRealtimeLiveQuery,
} from '../../../core/realtime';
import { CopilotTranscriptionService } from './service';
@Injectable()
export class CopilotTranscriptRealtimeProvider implements OnModuleInit {
@@ -15,47 +19,51 @@ export class CopilotTranscriptRealtimeProvider implements OnModuleInit {
) {}
onModuleInit() {
this.registry?.registerRequest({
name: 'copilot.transcript.task.get',
input: z
.object({
workspaceId: z.string(),
blobId: z.string().optional(),
taskId: z.string().optional(),
})
.refine(input => input.blobId || input.taskId),
handle: async (user, input) => {
await this.assertCopilot(user.id, input.workspaceId);
return {
task: await this.transcript.queryTask(
user.id,
input.workspaceId,
input.taskId,
input.blobId
),
};
},
const requestInput = z
.object({
workspaceId: z.string(),
blobId: z.string().optional(),
taskId: z.string().optional(),
})
.refine(input => input.blobId || input.taskId);
const topicInput = z.object({
workspaceId: z.string(),
taskId: z.string(),
});
this.registry?.registerTopic({
name: 'copilot.transcript.task.changed',
input: z.object({
workspaceId: z.string(),
taskId: z.string(),
}),
authorize: async (user, input) => {
await this.assertCopilot(user.id, input.workspaceId);
const task = await this.transcript.queryTask(
user.id,
input.workspaceId,
input.taskId
);
if (!task) {
throw new CopilotTranscriptionJobNotFound();
}
registerRealtimeLiveQuery(this.registry, {
request: {
name: 'copilot.transcript.task.get',
input: requestInput,
handle: async (user, input) => {
await this.assertCopilot(user.id, input.workspaceId);
return {
task: await this.transcript.queryTask(
user.id,
input.workspaceId,
input.taskId,
input.blobId
),
};
},
},
topic: {
name: 'copilot.transcript.task.changed',
input: topicInput,
authorize: async (user, input) => {
await this.assertCopilot(user.id, input.workspaceId);
const task = await this.transcript.queryTask(
user.id,
input.workspaceId,
input.taskId
);
if (!task) {
throw new CopilotTranscriptionJobNotFound();
}
},
room: (_user, input) =>
realtimeTranscriptTaskRoom(input.workspaceId, input.taskId),
},
room: (_user, input) =>
transcriptTaskRoom(input.workspaceId, input.taskId),
});
}
@@ -9,7 +9,10 @@ import {
OnJob,
sniffMime,
} from '../../../base';
import type { RealtimePublisher } from '../../../core/realtime';
import {
type RealtimePublisher,
realtimeTranscriptTaskRoom,
} from '../../../core/realtime';
import { Models } from '../../../models';
import { CopilotAccessPolicy } from '../access';
import { PromptService } from '../prompt';
@@ -33,10 +36,6 @@ const TRANSCRIPT_ACTION_ID = 'transcript.audio.gemini';
const TRANSCRIPT_ACTION_VERSION = 'v1';
const TRANSCRIPT_STRATEGY = 'gemini';
export function transcriptTaskRoom(workspaceId: string, taskId: string) {
return `copilot:transcript:${workspaceId}:${taskId}`;
}
export type TranscriptionJob = {
id: string;
status: AiJobStatus;
@@ -468,7 +467,7 @@ export class CopilotTranscriptionService {
'copilot.transcript.task.changed',
{ workspaceId, taskId },
{ taskId, status, error },
{ room: transcriptTaskRoom(workspaceId, taskId) }
{ room: realtimeTranscriptTaskRoom(workspaceId, taskId) }
);
}
}
@@ -1,7 +1,7 @@
import type { RealtimeEvent } from '@affine/realtime';
import { getRealtimeInputKey, type RealtimeEvent } from '@affine/realtime';
import { beforeEach, expect, test, vi } from 'vitest';
import { RealtimeManager, stableStringify } from '../manager';
import { RealtimeManager } from '../manager';
type Handler = (payload?: unknown) => void;
@@ -70,16 +70,16 @@ beforeEach(() => {
socket.disconnected = false;
});
test('stableStringify is deterministic for realtime subscription inputs', () => {
expect(stableStringify({ workspaceId: 'space', docId: 'doc' })).toBe(
stableStringify({ docId: 'doc', workspaceId: 'space' })
test('getRealtimeInputKey is deterministic for realtime subscription inputs', () => {
expect(getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' })).toBe(
getRealtimeInputKey({ docId: 'doc', workspaceId: 'space' })
);
});
test('stableStringify follows JSON semantics for edge values', () => {
expect(stableStringify({ a: undefined })).toBe(stableStringify({}));
expect(stableStringify([undefined])).toBe('[null]');
expect(stableStringify(new Date('2026-01-02T03:04:05.000Z'))).toBe(
test('getRealtimeInputKey follows JSON semantics for edge values', () => {
expect(getRealtimeInputKey({ a: undefined })).toBe(getRealtimeInputKey({}));
expect(getRealtimeInputKey([undefined])).toBe('[null]');
expect(getRealtimeInputKey(new Date('2026-01-02T03:04:05.000Z'))).toBe(
'"2026-01-02T03:04:05.000Z"'
);
});
@@ -159,13 +159,13 @@ test('subscribe routes events by topic and stable input key', async () => {
socket.emit('realtime:event', {
topic: 'comment.changed',
inputKey: stableStringify({ workspaceId: 'space', docId: 'other' }),
inputKey: getRealtimeInputKey({ workspaceId: 'space', docId: 'other' }),
sentAt: 1,
event: { changed: true },
} satisfies RealtimeEvent);
socket.emit('realtime:event', {
topic: 'comment.changed',
inputKey: stableStringify({ workspaceId: 'space', docId: 'doc' }),
inputKey: getRealtimeInputKey({ workspaceId: 'space', docId: 'doc' }),
sentAt: 2,
event: { changed: true },
} satisfies RealtimeEvent);
@@ -1 +1 @@
export { RealtimeManager, stableStringify } from './manager';
export { RealtimeManager } from './manager';
@@ -10,6 +10,7 @@ import type {
RealtimeTopicInputOf,
RealtimeTopicName,
} from '@affine/realtime';
import { getRealtimeInputKey } from '@affine/realtime';
import { Observable, Subject } from 'rxjs';
import { SocketConnection } from '../impls/cloud/socket';
@@ -151,7 +152,7 @@ export class RealtimeManager {
this.subscriptions.set(subscriptionId, {
topic,
input,
inputKey: stableStringify(input),
inputKey: getRealtimeInputKey(input),
subject$,
});
subscriber.next({
@@ -302,35 +303,3 @@ export class RealtimeManager {
this.subscriptions.clear();
}
}
export function stableStringify(value: unknown): string {
if (
value === undefined ||
typeof value === 'function' ||
typeof value === 'symbol'
) {
return 'null';
}
if (value === null || typeof value !== 'object') {
return JSON.stringify(value);
}
if (Array.isArray(value)) {
return `[${value.map(stableStringify).join(',')}]`;
}
if (value instanceof Date) {
return JSON.stringify(value.toJSON());
}
const record = value as Record<string, unknown>;
return `{${Object.keys(record)
.filter(key => {
const property = record[key];
return (
property !== undefined &&
typeof property !== 'function' &&
typeof property !== 'symbol'
);
})
.sort()
.map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`)
.join(',')}}`;
}
@@ -6,6 +6,11 @@ export { AccountLoggedIn } from './events/account-logged-in';
export { AccountLoggedOut } from './events/account-logged-out';
export { AuthProvider } from './provider/auth';
export { ValidatorProvider } from './provider/validator';
export {
RealtimeLiveQuery,
type RealtimeLiveQueryEventResult,
type RealtimeLiveQueryOptions,
} from './realtime/live-query';
export { ServerScope } from './scopes/server';
export { AccessTokenService } from './services/access-token';
export { AuthService } from './services/auth';
@@ -0,0 +1,156 @@
import { Subject } from 'rxjs';
import { describe, expect, test, vi } from 'vitest';
import { RealtimeLiveQuery } from './live-query';
describe('RealtimeLiveQuery', () => {
test('requests snapshot when subscription is ready', async () => {
const events$ = new Subject<{ type: 'ready' } | { count: number }>();
const applySnapshot = vi.fn();
const query = new RealtimeLiveQuery({
request: vi.fn().mockResolvedValue({ count: 1 }),
subscribe: () => events$,
applySnapshot,
});
query.start();
events$.next({ type: 'ready' });
await vi.waitFor(() =>
expect(applySnapshot).toHaveBeenCalledWith({ count: 1 })
);
query.dispose();
});
test('applies event without revalidating when applyEvent returns applied', async () => {
const events$ = new Subject<{ type: 'ready' } | { count: number }>();
const request = vi.fn().mockResolvedValue({ count: 1 });
const applyEvent = vi.fn().mockReturnValue('applied');
const query = new RealtimeLiveQuery({
request,
subscribe: () => events$,
applySnapshot: vi.fn(),
applyEvent,
});
query.start();
events$.next({ count: 2 });
expect(applyEvent).toHaveBeenCalledWith({ count: 2 });
expect(request).not.toHaveBeenCalled();
query.dispose();
});
test('applies business events that include a type field', () => {
const events$ = new Subject<
{ type: 'ready' } | { type: 'updated'; count: number }
>();
const request = vi.fn().mockResolvedValue({ count: 1 });
const applyEvent = vi.fn().mockReturnValue('applied');
const query = new RealtimeLiveQuery({
request,
subscribe: () => events$,
applySnapshot: vi.fn(),
applyEvent,
});
query.start();
events$.next({ type: 'updated', count: 2 });
expect(applyEvent).toHaveBeenCalledWith({ type: 'updated', count: 2 });
expect(request).not.toHaveBeenCalled();
query.dispose();
});
test('revalidates event when applyEvent asks for it', async () => {
const events$ = new Subject<{ type: 'ready' } | { changed: true }>();
const applySnapshot = vi.fn();
const query = new RealtimeLiveQuery({
request: vi.fn().mockResolvedValue({ changes: [] }),
subscribe: () => events$,
applySnapshot,
applyEvent: () => 'revalidate',
});
query.start();
events$.next({ changed: true });
await vi.waitFor(() =>
expect(applySnapshot).toHaveBeenCalledWith({ changes: [] })
);
query.dispose();
});
test('passes subscription and request errors to onError', async () => {
const events$ = new Subject<{ type: 'ready' }>();
const onError = vi.fn();
const requestError = new Error('request failed');
const query = new RealtimeLiveQuery({
request: vi.fn().mockRejectedValue(requestError),
subscribe: () => events$,
applySnapshot: vi.fn(),
onError,
});
query.start();
events$.next({ type: 'ready' });
await vi.waitFor(() => expect(onError).toHaveBeenCalledWith(requestError));
const subscriptionError = new Error('subscribe failed');
events$.error(subscriptionError);
expect(onError).toHaveBeenCalledWith(subscriptionError);
query.dispose();
});
test('dispose aborts in-flight request and ignores stale result', async () => {
const events$ = new Subject<{ type: 'ready' }>();
const applySnapshot = vi.fn();
let resolveRequest: (value: { count: number }) => void = () => {};
const query = new RealtimeLiveQuery({
request: vi.fn(
() =>
new Promise<{ count: number }>(resolve => {
resolveRequest = resolve;
})
),
subscribe: () => events$,
applySnapshot,
});
query.start();
events$.next({ type: 'ready' });
query.dispose();
resolveRequest({ count: 1 });
await Promise.resolve();
expect(applySnapshot).not.toHaveBeenCalled();
});
test('new request supersedes older in-flight result', async () => {
const events$ = new Subject<{ type: 'ready' }>();
const applySnapshot = vi.fn();
const resolvers: Array<(value: { count: number }) => void> = [];
const query = new RealtimeLiveQuery({
request: vi.fn(
() =>
new Promise<{ count: number }>(resolve => {
resolvers.push(resolve);
})
),
subscribe: () => events$,
applySnapshot,
});
query.start();
events$.next({ type: 'ready' });
events$.next({ type: 'ready' });
resolvers[0]({ count: 1 });
resolvers[1]({ count: 2 });
await vi.waitFor(() =>
expect(applySnapshot).toHaveBeenCalledWith({ count: 2 })
);
expect(applySnapshot).toHaveBeenCalledTimes(1);
query.dispose();
});
});
@@ -0,0 +1,100 @@
import type { RealtimeSubscriptionReady } from '@affine/realtime';
import type { Observable, Subscription } from 'rxjs';
export type RealtimeLiveQueryEventResult = 'applied' | 'revalidate';
export type RealtimeLiveQueryOptions<TSnapshot, TEvent extends object> = {
request: (signal: AbortSignal) => Promise<TSnapshot>;
subscribe: () => Observable<TEvent | RealtimeSubscriptionReady>;
applySnapshot: (snapshot: TSnapshot) => void;
applyEvent?: (event: TEvent) => RealtimeLiveQueryEventResult;
onError?: (error: unknown) => void;
};
function isReadyEvent<TEvent extends object>(
event: TEvent | RealtimeSubscriptionReady
): event is RealtimeSubscriptionReady {
return 'type' in event && event.type === 'ready';
}
export class RealtimeLiveQuery<TSnapshot, TEvent extends object> {
private subscription?: Subscription;
private requestController?: AbortController;
private generation = 0;
private started = false;
constructor(
private readonly options: RealtimeLiveQueryOptions<TSnapshot, TEvent>
) {}
start() {
this.stop();
this.started = true;
const generation = this.generation;
this.subscription = this.options.subscribe().subscribe({
next: event => {
if (isReadyEvent(event)) {
this.revalidate();
return;
}
if (!this.options.applyEvent) {
this.revalidate();
return;
}
if (this.options.applyEvent(event) === 'revalidate') {
this.revalidate();
}
},
error: error => {
if (this.generation === generation) {
this.options.onError?.(error);
}
},
});
}
revalidate() {
if (!this.started) {
return;
}
this.requestController?.abort();
const controller = new AbortController();
this.requestController = controller;
const generation = this.generation;
this.options.request(controller.signal).then(
snapshot => {
if (
this.started &&
this.generation === generation &&
this.requestController === controller &&
!controller.signal.aborted
) {
this.options.applySnapshot(snapshot);
}
},
error => {
if (
this.started &&
this.generation === generation &&
this.requestController === controller &&
!controller.signal.aborted
) {
this.options.onError?.(error);
}
}
);
}
stop() {
this.started = false;
this.generation += 1;
this.subscription?.unsubscribe();
this.subscription = undefined;
this.requestController?.abort();
this.requestController = undefined;
}
dispose() {
this.stop();
}
}
@@ -11,6 +11,10 @@ import {
updateReplyMutation,
uploadCommentAttachmentMutation,
} from '@affine/graphql';
import type {
RealtimeSubscriptionReady,
RealtimeTopicEventOf,
} from '@affine/realtime';
import { Entity } from '@toeverything/infra';
import type { Observable } from 'rxjs';
@@ -144,11 +148,11 @@ export class DocCommentStore extends Entity<{
{ after, workspaceId: this.currentWorkspaceId, docId: this.props.docId }
);
return {
changes: commentChanges.changes.map((change: any) => ({
changes: commentChanges.changes.map(change => ({
id: change.id,
action: change.action,
comment: normalizeComment(change.item as GQLCommentType),
commentId: change.commentId,
commentId: change.commentId ?? undefined,
})),
startCursor: commentChanges.startCursor,
endCursor: commentChanges.endCursor,
@@ -156,7 +160,9 @@ export class DocCommentStore extends Entity<{
};
}
subscribeCommentChanged(): Observable<unknown> {
subscribeCommentChanged(): Observable<
RealtimeTopicEventOf<'comment.changed'> | RealtimeSubscriptionReady
> {
return this.nbstoreService.realtime.subscribe('comment.changed', {
workspaceId: this.currentWorkspaceId,
docId: this.props.docId,
@@ -16,17 +16,9 @@ import {
onStart,
} from '@toeverything/infra';
import { nanoid } from 'nanoid';
import {
catchError,
filter,
first,
of,
Subject,
type Subscription,
switchMap,
tap,
} from 'rxjs';
import { catchError, filter, first, of, Subject, switchMap, tap } from 'rxjs';
import { RealtimeLiveQuery } from '../../cloud/realtime/live-query';
import { type DocDisplayMetaService } from '../../doc-display-meta';
import { GlobalContextService } from '../../global-context';
import type { SnapshotHelper } from '../services/snapshot-helper';
@@ -93,11 +85,19 @@ export class DocCommentEntity extends Entity<{
private readonly commentDeleted$ = new Subject<CommentId>();
readonly commentHighlighted$ = new LiveData<CommentId | null>(null);
private realtimeDisposable?: DisposeCallback;
private realtimeSubscription?: Subscription;
private readonly liveQuery = new RealtimeLiveQuery({
request: () => this.store.listCommentChanges({ after: this.startCursor }),
subscribe: () => this.store.subscribeCommentChanged(),
applySnapshot: result => {
this.handleCommentChanges(result);
this.startCursor = result.endCursor;
},
onError: error => {
console.error('Failed to sync comment changes:', error);
},
});
private startCursor?: string;
private fetchingCommentChanges = false;
private pendingCommentChangeFetch = false;
private startVersion = 0;
async addComment(
selections?: BaseSelection[],
@@ -490,52 +490,34 @@ export class DocCommentEntity extends Entity<{
}
start(): void {
if (this.realtimeDisposable) {
this.realtimeDisposable();
}
this.realtimeSubscription?.unsubscribe();
this.revalidate();
this.revalidateCommentsInEditor();
this.realtimeSubscription = this.store.subscribeCommentChanged().subscribe({
next: () => {
this.fetchCommentChanges().catch(() => {});
},
error: error => {
console.error('Failed to subscribe comment changes:', error);
},
this.startLiveQueryAfterInitialLoad().catch(error => {
console.error('Failed to start comment realtime:', error);
});
this.realtimeDisposable = () => this.realtimeSubscription?.unsubscribe();
}
stop(): void {
if (this.realtimeDisposable) {
this.realtimeDisposable();
}
this.realtimeSubscription?.unsubscribe();
this.startVersion++;
this.liveQuery.stop();
this.loading$.setValue(false);
}
private async fetchCommentChanges() {
if (this.fetchingCommentChanges) {
this.pendingCommentChangeFetch = true;
return;
}
this.fetchingCommentChanges = true;
private async startLiveQueryAfterInitialLoad() {
const version = ++this.startVersion;
this.liveQuery.stop();
this.loading$.setValue(true);
try {
do {
this.pendingCommentChangeFetch = false;
const result = await this.store.listCommentChanges({
after: this.startCursor,
});
this.handleCommentChanges(result);
this.startCursor = result.endCursor;
} while (this.pendingCommentChangeFetch);
} catch (error) {
console.error('Failed to fetch comment changes:', error);
const allComments = await this.loadAllComments();
if (version !== this.startVersion) {
return;
}
this.revalidateCommentsInEditor();
this.comments$.setValue(allComments);
this.liveQuery.start();
} finally {
this.fetchingCommentChanges = false;
if (version === this.startVersion) {
this.loading$.setValue(false);
}
}
}
@@ -650,31 +632,9 @@ export class DocCommentEntity extends Entity<{
revalidate = effect(
switchMap(() => {
return fromPromise(async () => {
const allComments: DocComment[] = [];
let cursor = '';
let firstResult: DocCommentListResult | null = null;
this.revalidateCommentsInEditor();
// Fetch all pages of comments
while (true) {
const result = await this.store.listComments({ after: cursor });
if (!firstResult) {
firstResult = result;
// Store the startCursor from the first page for incremental changes
this.startCursor = result.startCursor;
}
allComments.push(...result.comments);
cursor = result.endCursor;
if (!result.hasNextPage) {
break;
}
}
return allComments;
}).pipe(
return fromPromise(() => this.loadAllComments()).pipe(
tap(allComments => {
this.revalidateCommentsInEditor();
// Update state with all comments
this.comments$.setValue(allComments);
}),
onStart(() => this.loading$.setValue(true)),
@@ -688,6 +648,27 @@ export class DocCommentEntity extends Entity<{
})
);
private async loadAllComments() {
const allComments: DocComment[] = [];
let cursor = '';
let firstResult: DocCommentListResult | null = null;
this.revalidateCommentsInEditor();
while (true) {
const result = await this.store.listComments({ after: cursor });
if (!firstResult) {
firstResult = result;
this.startCursor = result.startCursor;
}
allComments.push(...result.comments);
cursor = result.endCursor;
if (!result.hasNextPage) {
break;
}
}
return allComments;
}
private readonly revalidateCommentsInEditor = () => {
this.commentsInEditor$.setValue(this.getCommentsInEditor());
};
@@ -1,10 +1,11 @@
import { shallowEqual } from '@affine/component';
import type { TranscriptionBlockProps } from '@affine/core/blocksuite/ai/blocks/transcription-block/model';
import { RealtimeLiveQuery } from '@affine/core/modules/cloud/realtime/live-query';
import { DebugLogger } from '@affine/debug';
import { UserFriendlyError } from '@affine/error';
import { AiJobStatus } from '@affine/graphql';
import { AiJobStatus, type TranscriptionResultType } from '@affine/graphql';
import type { RealtimeTopicEventOf } from '@affine/realtime';
import { Entity, LiveData } from '@toeverything/infra';
import type { Subscription } from 'rxjs';
import type { DefaultServerService, WorkspaceServerService } from '../../cloud';
import { AuthService } from '../../cloud/services/auth';
@@ -65,7 +66,10 @@ export class AudioTranscriptionJob extends Entity<{
}
disposed = false;
private taskSubscription?: Subscription;
private taskLiveQuery?: RealtimeLiveQuery<
TranscriptionResultType | null,
RealtimeTopicEventOf<'copilot.transcript.task.changed'>
>;
private taskWaitReject?: (error: unknown) => void;
private readonly _status$ = new LiveData<TranscriptionStatus>({
@@ -205,45 +209,56 @@ export class AudioTranscriptionJob extends Entity<{
await new Promise<void>((resolve, reject) => {
this.taskWaitReject = reject;
this.taskSubscription = this.store
.subscribeTranscriptTask(taskId)
.subscribe({
next: event => {
if (
'type' in event ||
(event.status as AiJobStatus) === AiJobStatus.finished
) {
this.checkTranscriptTask(taskId).then(() => {
if (this.status$.value.status === AiJobStatus.finished) {
resolve();
}
}, reject);
} else if ((event.status as AiJobStatus) === AiJobStatus.failed) {
reject(
UserFriendlyError.fromAny(
event.error ?? 'Transcription job failed'
)
);
this.taskLiveQuery = new RealtimeLiveQuery({
request: () => this.store.getTranscriptTask(this.props.blobId, taskId),
subscribe: () => this.store.subscribeTranscriptTask(taskId),
applySnapshot: job => {
this.applyTranscriptTaskSnapshot(taskId, job).then(() => {
if (this.status$.value.status === AiJobStatus.finished) {
resolve();
}
},
error: reject,
});
}, reject);
},
applyEvent: event => {
if (event.status === AiJobStatus.failed) {
reject(
UserFriendlyError.fromAny(
event.error ?? 'Transcription job failed'
)
);
return 'applied';
}
return event.status === AiJobStatus.finished
? 'revalidate'
: 'applied';
},
onError: reject,
});
this.taskLiveQuery.start();
}).finally(() => {
this.taskWaitReject = undefined;
this.taskSubscription?.unsubscribe();
this.taskSubscription = undefined;
this.taskLiveQuery?.dispose();
this.taskLiveQuery = undefined;
});
}
private rejectTaskWait(error: unknown) {
const reject = this.taskWaitReject;
this.taskWaitReject = undefined;
this.taskSubscription?.unsubscribe();
this.taskSubscription = undefined;
this.taskLiveQuery?.dispose();
this.taskLiveQuery = undefined;
reject?.(error);
}
private async checkTranscriptTask(taskId: string) {
const job = await this.store.getTranscriptTask(this.props.blobId, taskId);
await this.applyTranscriptTaskSnapshot(taskId, job);
}
private async applyTranscriptTaskSnapshot(
taskId: string,
job: TranscriptionResultType | null
) {
if (
this.disposed ||
this.props.blockProps.jobId !== taskId ||
@@ -251,7 +266,6 @@ export class AudioTranscriptionJob extends Entity<{
) {
return;
}
const job = await this.store.getTranscriptTask(this.props.blobId, taskId);
if (!job || job.status === AiJobStatus.failed) {
logger.debug('Job failed during realtime status check', {
@@ -1,20 +1,8 @@
import {
catchErrorInto,
effect,
exhaustMapWithTrailing,
fromPromise,
LiveData,
onComplete,
OnEvent,
onStart,
Service,
smartRetry,
} from '@toeverything/infra';
import type { Subscription } from 'rxjs';
import { tap } from 'rxjs';
import { LiveData, OnEvent, Service } from '@toeverything/infra';
import { AccountChanged, type AuthService } from '../../cloud';
import { ServerStarted } from '../../cloud/events/server-started';
import { RealtimeLiveQuery } from '../../cloud/realtime/live-query';
import { ApplicationFocused } from '../../lifecycle';
import type { NbstoreService } from '../../storage';
import type { NotificationStore } from '../stores/notification';
@@ -36,30 +24,25 @@ export class NotificationCountService extends Service {
readonly count$ = LiveData.from(this.store.watchNotificationCountCache(), 0);
readonly isLoading$ = new LiveData(false);
readonly error$ = new LiveData<any>(null);
private subscription?: Subscription;
private readonly liveQuery = new RealtimeLiveQuery({
request: signal => this.requestCount(signal),
subscribe: () =>
this.nbstoreService.realtime.subscribe('notification.count.changed', {}),
applySnapshot: result => this.setCount(result.count),
applyEvent: event => {
this.setCount(event.count);
return 'applied';
},
onError: error => this.error$.setValue(error),
});
revalidate = effect(
exhaustMapWithTrailing(() => {
return fromPromise(() => {
if (!this.loggedIn$.value) {
return Promise.resolve(0);
}
return this.nbstoreService.realtime
.request('notification.count.get', {}, { timeoutMs: 10000 })
.then(result => result.count);
}).pipe(
tap(result => {
this.setCount(result ?? 0);
}),
smartRetry(),
catchErrorInto(this.error$),
onStart(() => {
this.isLoading$.setValue(true);
}),
onComplete(() => this.isLoading$.setValue(false))
);
})
);
revalidate = () => {
if (!this.loggedIn$.value) {
this.setCount(0);
return;
}
this.liveQuery.revalidate();
};
handleApplicationFocused() {
this.revalidate();
@@ -76,33 +59,34 @@ export class NotificationCountService extends Service {
}
setCount(count: number) {
this.error$.setValue(null);
this.store.setNotificationCountCache(count);
}
override dispose(): void {
super.dispose();
this.revalidate.unsubscribe();
this.subscription?.unsubscribe();
this.liveQuery.dispose();
}
private subscribe() {
this.subscription?.unsubscribe();
if (!this.loggedIn$.value) {
this.liveQuery.stop();
this.setCount(0);
return;
}
this.subscription = this.nbstoreService.realtime
.subscribe('notification.count.changed', {})
.subscribe({
next: event => {
if ('type' in event) {
this.revalidate();
} else {
this.setCount(event.count);
}
},
error: error => {
this.error$.setValue(error);
},
});
this.liveQuery.start();
}
private async requestCount(signal: AbortSignal) {
this.isLoading$.setValue(true);
try {
return await this.nbstoreService.realtime.request(
'notification.count.get',
{},
{ signal, timeoutMs: 10000 }
);
} finally {
this.isLoading$.setValue(false);
}
}
}
@@ -1,17 +1,8 @@
import { RealtimeLiveQuery } from '@affine/core/modules/cloud/realtime/live-query';
import type { WorkspaceService } from '@affine/core/modules/workspace';
import type { RealtimeTopicEventOf } from '@affine/realtime';
import { logger } from '@sentry/react';
import {
catchErrorInto,
effect,
Entity,
fromPromise,
LiveData,
onComplete,
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY, type Subscription } from 'rxjs';
import { exhaustMap, mergeMap } from 'rxjs/operators';
import { Entity, LiveData } from '@toeverything/infra';
import type { EmbeddingStore } from '../stores/embedding';
import type { LocalAttachmentFile } from '../types';
@@ -26,8 +17,31 @@ export class EmbeddingProgress extends Entity {
error$ = new LiveData<any>(null);
loading$ = new LiveData(true);
private progressSubscription?: Subscription;
uploadingAttachments$ = new LiveData<LocalAttachmentFile[]>([]);
private started = false;
private readonly liveQuery = new RealtimeLiveQuery<
Progress,
RealtimeTopicEventOf<'workspace.embedding.progress.changed'>
>({
request: signal => this.requestProgress(signal),
subscribe: () =>
this.store.subscribeEmbeddingProgress(this.workspaceService.workspace.id),
applySnapshot: progress => this.applyProgress(progress),
applyEvent: event => {
if (
typeof event.embedded === 'number' &&
typeof event.total === 'number'
) {
this.applyProgress({ embedded: event.embedded, total: event.total });
return 'applied';
}
return 'revalidate';
},
onError: error => {
this.error$.setValue(error);
logger.error('Failed to fetch workspace embedding progress', { error });
},
});
constructor(
private readonly workspaceService: WorkspaceService,
@@ -37,48 +51,46 @@ export class EmbeddingProgress extends Entity {
}
startEmbeddingProgress() {
this.stopEmbeddingProgress();
this.progressSubscription = this.store
.subscribeEmbeddingProgress(this.workspaceService.workspace.id)
.subscribe({
next: () => this.getEmbeddingProgress(),
error: error => this.error$.setValue(error),
});
this.getEmbeddingProgress();
this.started = true;
this.liveQuery.start();
}
stopEmbeddingProgress() {
this.progressSubscription?.unsubscribe();
this.progressSubscription = undefined;
this.started = false;
this.liveQuery.stop();
}
getEmbeddingProgress = effect(
exhaustMap(() => {
return fromPromise(signal =>
this.store.getEmbeddingProgress(
this.workspaceService.workspace.id,
signal
)
).pipe(
smartRetry(),
mergeMap(value => {
this.progress$.next(value);
if (value && value.embedded === value.total && value.total > 0) {
this.stopEmbeddingProgress();
}
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error('Failed to fetch workspace embedding progress', error);
}),
onStart(() => this.loading$.setValue(true)),
onComplete(() => this.loading$.setValue(false))
);
})
);
getEmbeddingProgress = () => {
if (this.started) {
this.liveQuery.revalidate();
return;
}
this.requestProgress(new AbortController().signal).then(
progress => this.applyProgress(progress),
error => this.error$.setValue(error)
);
};
override dispose(): void {
this.progressSubscription?.unsubscribe();
this.getEmbeddingProgress.unsubscribe();
this.liveQuery.dispose();
}
private applyProgress(value: Progress | null) {
this.progress$.next(value);
if (value && value.embedded === value.total && value.total > 0) {
this.stopEmbeddingProgress();
}
}
private async requestProgress(signal: AbortSignal) {
this.loading$.setValue(true);
try {
return await this.store.getEmbeddingProgress(
this.workspaceService.workspace.id,
signal
);
} finally {
this.loading$.setValue(false);
}
}
}
@@ -127,10 +127,8 @@ test.describe('AISettings/Embedding', () => {
await page.getByTestId('embedding-progress-wrapper');
const progress = await page.getByTestId('embedding-progress');
// wait for the progress to be loading
const title = await page.getByTestId('embedding-progress-title');
await expect(title).toHaveText(/Loading sync status/i);
await expect(progress).not.toBeVisible();
await expect(title).not.toHaveAttribute('data-progress', 'loading');
const count = await page.getByTestId('embedding-progress-count');
await expect(count).toHaveText(/\d+\/\d+/);