refactor(core): workspace embedding entities (#12490)

### TL;DR

Refactor: split the single workspace embedding entity (`Embedding`) into four focused entities:

* additional-attachments
* ignored-docs
* embedding-enabled
* embedding-progress
This commit is contained in:
yoyoyohamapi
2025-05-26 07:17:38 +00:00
parent c06c72e108
commit 0e8f19b92c
9 changed files with 442 additions and 333 deletions

View File

@@ -0,0 +1,153 @@
import type { WorkspaceService } from '@affine/core/modules/workspace';
import type { PaginationInput } from '@affine/graphql';
import {
catchErrorInto,
effect,
Entity,
fromPromise,
LiveData,
onComplete,
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY } from 'rxjs';
import { exhaustMap, mergeMap } from 'rxjs/operators';
import { COUNT_PER_PAGE } from '../constants';
import type { EmbeddingStore } from '../stores/embedding';
import type { LocalAttachmentFile, PersistedAttachmentFile } from '../types';
import { logger } from '../utils';
/**
 * One page of persisted embedding attachments, as stored in
 * `AdditionalAttachments.attachments$`.
 */
interface Attachments {
  /** Total attachment count reported alongside the page. */
  totalCount: number;
  /** Cursor information for the page. */
  pageInfo: {
    endCursor: string | null;
    hasNextPage: boolean;
  };
  /** Page entries; each edge wraps a single persisted attachment. */
  edges: Array<{ node: PersistedAttachmentFile }>;
}
/**
 * Entity holding the additional attachment files of the current workspace's
 * embedding settings: the last fetched page of persisted files plus the files
 * that are still being uploaded (or whose upload failed) locally.
 */
export class AdditionalAttachments extends Entity {
  /** Error sink handed to `catchErrorInto` in {@link getAttachments}; starts as `null`. */
  error$ = new LiveData<any>(null);

  /** Most recently fetched page of persisted attachments; starts as an empty page. */
  attachments$ = new LiveData<Attachments>({
    edges: [],
    pageInfo: { endCursor: null, hasNextPage: false },
    totalCount: 0,
  });

  /** Toggled by the `onStart` / `onComplete` hooks of {@link getAttachments}; starts as `true`. */
  loading$ = new LiveData(true);

  /** Local placeholders for files passed to {@link addAttachments}. */
  uploadingAttachments$ = new LiveData<LocalAttachmentFile[]>([]);

  constructor(
    private readonly workspaceService: WorkspaceService,
    private readonly store: EmbeddingStore
  ) {
    super();
  }

  /**
   * Local placeholders followed by the persisted attachments of the current
   * page, truncated to the first 10 entries.
   *
   * NOTE(review): the limit 10 is hard-coded while page requests use
   * `COUNT_PER_PAGE` — confirm whether the two are meant to match.
   */
  mergedAttachments$ = LiveData.computed(get => {
    const pending = get(this.uploadingAttachments$);
    const persisted = get(this.attachments$).edges.map(({ node }) => node);
    return [...pending, ...persisted].slice(0, 10);
  });

  /**
   * Fetches one page of persisted attachments and publishes it to
   * `attachments$`, tagging every node with `status: 'uploaded'`.
   *
   * Invocations go through `exhaustMap`, so a call made while a previous
   * request is still running is presumably dropped rather than queued.
   */
  getAttachments = effect(
    exhaustMap((pagination: PaginationInput) => {
      const request$ = fromPromise(signal =>
        this.store.getEmbeddingFiles(
          this.workspaceService.workspace.id,
          pagination,
          signal
        )
      );

      return request$.pipe(
        smartRetry(),
        mergeMap(page => {
          const edges = page.edges.map(edge => ({
            ...edge,
            node: { ...edge.node, status: 'uploaded' as const },
          }));
          const patched = { ...page, edges };
          this.attachments$.next(patched);
          // Nothing is emitted downstream; the page is exposed via attachments$.
          return EMPTY;
        }),
        catchErrorInto(this.error$, error => {
          logger.error(
            'Failed to fetch workspace doc embedding attachments',
            error
          );
        }),
        onStart(() => this.loading$.setValue(true)),
        onComplete(() => this.loading$.setValue(false))
      );
    })
  );

  /**
   * Uploads `files` for the current workspace.
   *
   * Placeholders with status `'uploading'` are prepended to
   * `uploadingAttachments$` right away. On success they are removed and the
   * first page is re-requested; on failure they stay in the list with status
   * `'error'` and the stringified error as `errorMessage`.
   *
   * @param files - Files to upload.
   */
  addAttachments = (files: File[]) => {
    const createLocalId = () =>
      `${Math.random().toString(36).slice(2)}${Date.now()}`;

    const batch: LocalAttachmentFile[] = files.map(file => ({
      localId: createLocalId(),
      fileName: file.name,
      mimeType: file.type,
      size: file.size,
      createdAt: file.lastModified,
      status: 'uploading',
    }));

    // True for placeholders created by this particular call.
    const belongsToBatch = (candidate: LocalAttachmentFile) =>
      batch.some(local => local.localId === candidate.localId);

    this.uploadingAttachments$.next([
      ...batch,
      ...this.uploadingAttachments$.value,
    ]);

    this.store
      .addEmbeddingFiles(this.workspaceService.workspace.id, files)
      .then(() => {
        const remaining = this.uploadingAttachments$.value.filter(
          att => !belongsToBatch(att)
        );
        this.uploadingAttachments$.next(remaining);
        this.getAttachments({ first: COUNT_PER_PAGE, after: null });
      })
      .catch(error => {
        this.uploadingAttachments$.next(
          this.uploadingAttachments$.value.map(att =>
            belongsToBatch(att)
              ? { ...att, status: 'error', errorMessage: String(error) }
              : att
          )
        );
      });
  };

  /**
   * Removes an attachment by id.
   *
   * If `id` matches the `localId` of a local placeholder, only the placeholder
   * is dropped and a resolved promise is returned. Otherwise the file is
   * removed through the store and the first page is re-requested afterwards.
   *
   * @param id - Local placeholder id or persisted file id.
   * @returns A promise that settles when the removal has been handled.
   */
  removeAttachment = (id: string) => {
    const pending = this.uploadingAttachments$.value;

    if (pending.some(att => att.localId === id)) {
      this.uploadingAttachments$.next(
        pending.filter(att => att.localId !== id)
      );
      return Promise.resolve();
    }

    const removal = this.store.removeEmbeddingFile(
      this.workspaceService.workspace.id,
      id
    );
    return removal.then(() => {
      this.getAttachments({ first: COUNT_PER_PAGE, after: null });
    });
  };

  /** Unsubscribes the {@link getAttachments} effect. */
  override dispose(): void {
    this.getAttachments.unsubscribe();
  }
}

View File

@@ -0,0 +1,63 @@
import type { WorkspaceService } from '@affine/core/modules/workspace';
import { logger } from '@sentry/react';
import {
catchErrorInto,
effect,
Entity,
fromPromise,
LiveData,
onComplete,
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY } from 'rxjs';
import { exhaustMap, mergeMap } from 'rxjs/operators';
import type { EmbeddingStore } from '../stores/embedding';
/**
 * Entity tracking whether embedding is enabled for the current workspace.
 *
 * NOTE(review): `logger` in this file is imported from '@sentry/react',
 * whereas the sibling entities in this change import it from '../utils' —
 * confirm this is intentional.
 */
export class EmbeddingEnabled extends Entity {
  /** Last fetched flag; `null` until the first successful fetch. */
  enabled$ = new LiveData<boolean | null>(null);

  /** Toggled by the `onStart` / `onComplete` hooks of {@link getEnabled}; starts as `true`. */
  loading$ = new LiveData(true);

  /** Error sink handed to `catchErrorInto` in {@link getEnabled}; starts as `null`. */
  error$ = new LiveData<any>(null);

  constructor(
    private readonly workspaceService: WorkspaceService,
    private readonly store: EmbeddingStore
  ) {
    super();
  }

  /**
   * Fetches the enabled flag from the store and publishes it to `enabled$`.
   *
   * Invocations go through `exhaustMap`, so a call made while a previous
   * request is still running is presumably dropped rather than queued.
   */
  getEnabled = effect(
    exhaustMap(() => {
      const request$ = fromPromise(signal =>
        this.store.getEnabled(this.workspaceService.workspace.id, signal)
      );

      return request$.pipe(
        smartRetry(),
        mergeMap(enabled => {
          this.enabled$.next(enabled);
          // Nothing is emitted downstream; the flag is exposed via enabled$.
          return EMPTY;
        }),
        catchErrorInto(this.error$, error => {
          logger.error(
            'Failed to fetch workspace doc embedding enabled',
            error
          );
        }),
        onStart(() => this.loading$.setValue(true)),
        onComplete(() => this.loading$.setValue(false))
      );
    })
  );

  /**
   * Updates the flag through the store, then triggers {@link getEnabled}.
   *
   * @param enabled - New value of the flag.
   * @returns A promise that resolves once the update has finished and the
   * re-fetch has been triggered.
   */
  setEnabled = (enabled: boolean) => {
    const update = this.store.updateEnabled(
      this.workspaceService.workspace.id,
      enabled
    );
    return update.then(() => {
      this.getEnabled();
    });
  };

  /** Unsubscribes the {@link getEnabled} effect. */
  override dispose(): void {
    this.getEnabled.unsubscribe();
  }
}

View File

@@ -0,0 +1,86 @@
import type { WorkspaceService } from '@affine/core/modules/workspace';
import { logger } from '@sentry/react';
import {
catchErrorInto,
effect,
Entity,
fromPromise,
LiveData,
onComplete,
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY, interval, Subject } from 'rxjs';
import { exhaustMap, mergeMap, switchMap, takeUntil } from 'rxjs/operators';
import type { EmbeddingStore } from '../stores/embedding';
import type { LocalAttachmentFile } from '../types';
/** Embedding progress counters as published on `EmbeddingProgress.progress$`. */
interface Progress {
  /** Count compared against `total` to decide when polling stops. */
  embedded: number;
  /** Polling stops once `embedded` equals this value. */
  total: number;
}
/**
 * Entity that polls the embedding progress of the current workspace.
 *
 * NOTE(review): `logger` in this file is imported from '@sentry/react',
 * whereas the sibling entities in this change import it from '../utils' —
 * confirm this is intentional.
 */
export class EmbeddingProgress extends Entity {
  /** Last fetched progress; `null` until the first successful fetch. */
  progress$ = new LiveData<Progress | null>(null);

  /** Error sink handed to `catchErrorInto` in {@link getEmbeddingProgress}; starts as `null`. */
  error$ = new LiveData<any>(null);

  /** Toggled by the `onStart` / `onComplete` hooks around every single fetch; starts as `true`. */
  loading$ = new LiveData(true);

  /** Period passed to `interval()` (milliseconds per RxJS). */
  private readonly EMBEDDING_PROGRESS_POLL_INTERVAL = 3000;

  /** Emits to end the running poll loop (consumed by `takeUntil`). */
  private readonly stopEmbeddingProgress$ = new Subject<void>();

  /**
   * NOTE(review): nothing in this class reads or writes this field; it looks
   * carried over from the attachments logic — confirm whether it is needed.
   */
  uploadingAttachments$ = new LiveData<LocalAttachmentFile[]>([]);

  constructor(
    private readonly workspaceService: WorkspaceService,
    private readonly store: EmbeddingStore
  ) {
    super();
  }

  /** Stops any running poll loop, then triggers {@link getEmbeddingProgress}. */
  startEmbeddingProgressPolling() {
    this.stopEmbeddingProgressPolling();
    this.getEmbeddingProgress();
  }

  /** Signals the running poll loop (if any) to stop. */
  stopEmbeddingProgressPolling() {
    this.stopEmbeddingProgress$.next();
  }

  /**
   * Starts a poll loop: on every `interval` tick the progress is fetched and
   * published to `progress$`. The loop ends when the stop subject emits,
   * which also happens automatically once `embedded === total`.
   *
   * Invocations go through `exhaustMap`, so a call made while a loop is
   * still running is presumably ignored.
   */
  getEmbeddingProgress = effect(
    exhaustMap(() => {
      // One fetch of the current progress; created anew on every tick.
      const fetchProgress = () =>
        fromPromise(signal =>
          this.store.getEmbeddingProgress(
            this.workspaceService.workspace.id,
            signal
          )
        ).pipe(
          smartRetry(),
          mergeMap(progress => {
            this.progress$.next(progress);
            const finished = progress && progress.embedded === progress.total;
            if (finished) {
              this.stopEmbeddingProgressPolling();
            }
            // Nothing is emitted downstream; progress is exposed via progress$.
            return EMPTY;
          }),
          catchErrorInto(this.error$, error => {
            logger.error(
              'Failed to fetch workspace embedding progress',
              error
            );
          }),
          onStart(() => this.loading$.setValue(true)),
          onComplete(() => this.loading$.setValue(false))
        );

      return interval(this.EMBEDDING_PROGRESS_POLL_INTERVAL).pipe(
        takeUntil(this.stopEmbeddingProgress$),
        switchMap(() => fetchProgress())
      );
    })
  );

  /** Signals the poll loop to stop and unsubscribes the effect. */
  override dispose(): void {
    this.stopEmbeddingProgress$.next();
    this.getEmbeddingProgress.unsubscribe();
  }
}

View File

@@ -1,290 +0,0 @@
import type { WorkspaceService } from '@affine/core/modules/workspace';
import { DebugLogger } from '@affine/debug';
import type { PaginationInput } from '@affine/graphql';
import {
catchErrorInto,
effect,
Entity,
fromPromise,
LiveData,
onComplete,
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY, interval, Subject } from 'rxjs';
import { exhaustMap, mergeMap, switchMap, takeUntil } from 'rxjs/operators';
import { COUNT_PER_PAGE } from '../constants';
import type { EmbeddingStore } from '../stores/embedding';
import type {
IgnoredDoc,
LocalAttachmentFile,
PersistedAttachmentFile,
} from '../types';
// Module-local debug logger tagged 'WorkspaceEmbedding'.
// NOTE(review): this file is on the removed side of the diff (hunk `-1,290 +0,0`).
const logger = new DebugLogger('WorkspaceEmbedding');
/** Shape carrying the embedding on/off flag. */
export interface EmbeddingConfig {
enabled: boolean;
}
/** One page of persisted embedding attachments, as stored in `Embedding.attachments$`. */
interface Attachments {
totalCount: number;
pageInfo: {
endCursor: string | null;
hasNextPage: boolean;
};
edges: {
node: PersistedAttachmentFile;
}[];
}
/** List of ignored docs, as stored in `Embedding.ignoredDocs$`. */
type IgnoredDocs = IgnoredDoc[];
/** Progress counters; polling stops once `embedded === total`. */
interface EmbeddingProgress {
embedded: number;
total: number;
}
/**
 * Single entity that bundled all workspace embedding state: the enabled flag,
 * additional attachments, ignored docs and embedding progress polling.
 *
 * NOTE(review): this class is on the removed side of the diff; the same
 * members reappear in the EmbeddingEnabled, AdditionalAttachments, IgnoredDocs
 * and EmbeddingProgress entities added by this commit.
 */
export class Embedding extends Entity {
// Last fetched enabled flag; null until the first successful fetch.
enabled$ = new LiveData<boolean | null>(null);
// Error sink shared by all four effects below (passed to catchErrorInto).
error$ = new LiveData<any>(null);
// Most recently fetched page of persisted attachments; starts as an empty page.
attachments$ = new LiveData<Attachments>({
edges: [],
pageInfo: {
endCursor: null,
hasNextPage: false,
},
totalCount: 0,
});
ignoredDocs$ = new LiveData<IgnoredDocs>([]);
// One loading flag per effect, toggled by that effect's onStart/onComplete hooks.
isEnabledLoading$ = new LiveData(true);
isAttachmentsLoading$ = new LiveData(true);
isIgnoredDocsLoading$ = new LiveData(true);
embeddingProgress$ = new LiveData<EmbeddingProgress | null>(null);
isEmbeddingProgressLoading$ = new LiveData(true);
// Period passed to interval() for progress polling.
private readonly EMBEDDING_PROGRESS_POLL_INTERVAL = 3000;
// Emits to end the running progress poll loop (consumed by takeUntil).
private readonly stopEmbeddingProgress$ = new Subject<void>();
// Local placeholders for files passed to addAttachments.
uploadingAttachments$ = new LiveData<LocalAttachmentFile[]>([]);
constructor(
private readonly workspaceService: WorkspaceService,
private readonly store: EmbeddingStore
) {
super();
}
// Local placeholders followed by the persisted attachments of the current
// page, truncated to the first 10 entries.
mergedAttachments$ = LiveData.computed(get => {
const uploading = get(this.uploadingAttachments$);
const uploaded = get(this.attachments$).edges.map(edge => edge.node);
return [...uploading, ...uploaded].slice(0, 10);
});
// Fetches the enabled flag from the store and publishes it to enabled$.
getEnabled = effect(
exhaustMap(() => {
return fromPromise(signal =>
this.store.getEnabled(this.workspaceService.workspace.id, signal)
).pipe(
smartRetry(),
mergeMap(value => {
this.enabled$.next(value);
// Nothing is emitted downstream; the value is exposed via enabled$.
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to fetch workspace doc embedding enabled',
error
);
}),
onStart(() => this.isEnabledLoading$.setValue(true)),
onComplete(() => this.isEnabledLoading$.setValue(false))
);
})
);
// Updates the flag through the store, then triggers getEnabled.
setEnabled = (enabled: boolean) => {
return this.store
.updateEnabled(this.workspaceService.workspace.id, enabled)
.then(() => {
this.getEnabled();
});
};
// Fetches the ignored docs from the store and publishes them to ignoredDocs$.
getIgnoredDocs = effect(
exhaustMap(() => {
return fromPromise(signal =>
this.store.getIgnoredDocs(this.workspaceService.workspace.id, signal)
).pipe(
smartRetry(),
mergeMap(value => {
this.ignoredDocs$.next(value);
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to fetch workspace doc embedding ignored docs',
error
);
}),
onStart(() => this.isIgnoredDocsLoading$.setValue(true)),
onComplete(() => this.isIgnoredDocsLoading$.setValue(false))
);
})
);
// Applies the add/remove id lists through the store, then triggers getIgnoredDocs.
updateIgnoredDocs = ({
add,
remove,
}: {
add: string[];
remove: string[];
}) => {
return this.store
.updateIgnoredDocs(this.workspaceService.workspace.id, add, remove)
.then(() => {
this.getIgnoredDocs();
});
};
// Fetches one page of persisted attachments and publishes it to attachments$,
// tagging every node with status 'uploaded'.
getAttachments = effect(
exhaustMap((pagination: PaginationInput) => {
return fromPromise(signal =>
this.store.getEmbeddingFiles(
this.workspaceService.workspace.id,
pagination,
signal
)
).pipe(
smartRetry(),
mergeMap(value => {
const patched = {
...value,
edges: value.edges.map(edge => ({
...edge,
node: {
...edge.node,
status: 'uploaded' as const,
},
})),
};
this.attachments$.next(patched);
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to fetch workspace doc embedding attachments',
error
);
}),
onStart(() => this.isAttachmentsLoading$.setValue(true)),
onComplete(() => this.isAttachmentsLoading$.setValue(false))
);
})
);
// Uploads files: placeholders with status 'uploading' are prepended at once;
// on success they are removed and the first page is re-requested, on failure
// they are kept with status 'error' and the stringified error.
addAttachments = (files: File[]) => {
const generateLocalId = () =>
Math.random().toString(36).slice(2) + Date.now();
const localAttachments: LocalAttachmentFile[] = files.map(file => ({
localId: generateLocalId(),
fileName: file.name,
mimeType: file.type,
size: file.size,
createdAt: file.lastModified,
status: 'uploading',
}));
this.uploadingAttachments$.next([
...localAttachments,
...this.uploadingAttachments$.value,
]);
this.store
.addEmbeddingFiles(this.workspaceService.workspace.id, files)
.then(() => {
// Drop only the placeholders created by this call.
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.filter(
att => !localAttachments.some(l => l.localId === att.localId)
)
);
this.getAttachments({ first: COUNT_PER_PAGE, after: null });
})
.catch(error => {
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.map(att =>
localAttachments.some(l => l.localId === att.localId)
? { ...att, status: 'error', errorMessage: String(error) }
: att
)
);
});
};
// Removes an attachment: a matching local placeholder is dropped without a
// store call; otherwise the file is removed through the store and the first
// page is re-requested.
removeAttachment = (id: string) => {
const localIndex = this.uploadingAttachments$.value.findIndex(
att => att.localId === id
);
if (localIndex !== -1) {
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.filter(att => att.localId !== id)
);
return Promise.resolve();
}
return this.store
.removeEmbeddingFile(this.workspaceService.workspace.id, id)
.then(() => {
this.getAttachments({ first: COUNT_PER_PAGE, after: null });
});
};
// Stops any running poll loop, then triggers getEmbeddingProgress.
startEmbeddingProgressPolling() {
this.stopEmbeddingProgressPolling();
this.getEmbeddingProgress();
}
// Signals the running poll loop (if any) to stop.
stopEmbeddingProgressPolling() {
this.stopEmbeddingProgress$.next();
}
// Poll loop: on every interval tick the progress is fetched and published to
// embeddingProgress$; the loop ends when stopEmbeddingProgress$ emits.
getEmbeddingProgress = effect(
exhaustMap(() => {
return interval(this.EMBEDDING_PROGRESS_POLL_INTERVAL).pipe(
takeUntil(this.stopEmbeddingProgress$),
switchMap(() =>
fromPromise(signal =>
this.store.getEmbeddingProgress(
this.workspaceService.workspace.id,
signal
)
).pipe(
smartRetry(),
mergeMap(value => {
this.embeddingProgress$.next(value);
// Stop polling once every item is reported as embedded.
if (value && value.embedded === value.total) {
this.stopEmbeddingProgressPolling();
}
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to fetch workspace embedding progress',
error
);
}),
onStart(() => this.isEmbeddingProgressLoading$.setValue(true)),
onComplete(() => this.isEmbeddingProgressLoading$.setValue(false))
)
)
);
})
);
// Unsubscribes every effect and signals the poll loop to stop.
override dispose(): void {
this.getEnabled.unsubscribe();
this.getAttachments.unsubscribe();
this.getIgnoredDocs.unsubscribe();
this.stopEmbeddingProgress$.next();
this.getEmbeddingProgress.unsubscribe();
}
}

View File

@@ -0,0 +1,70 @@
import type { WorkspaceService } from '@affine/core/modules/workspace';
import {
catchErrorInto,
effect,
Entity,
fromPromise,
LiveData,
onComplete,
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY } from 'rxjs';
import { exhaustMap, mergeMap } from 'rxjs/operators';
import type { EmbeddingStore } from '../stores/embedding';
import type { IgnoredDoc } from '../types';
import { logger } from '../utils';
/**
 * Entity holding the docs that are ignored by embedding for the current
 * workspace.
 */
export class IgnoredDocs extends Entity {
  /** Last fetched list of ignored docs; starts empty. */
  docs$ = new LiveData<IgnoredDoc[]>([]);

  /** Error sink handed to `catchErrorInto` in {@link getIgnoredDocs}; starts as `null`. */
  error$ = new LiveData<any>(null);

  /** Toggled by the `onStart` / `onComplete` hooks of {@link getIgnoredDocs}; starts as `true`. */
  loading$ = new LiveData(true);

  constructor(
    private readonly workspaceService: WorkspaceService,
    private readonly store: EmbeddingStore
  ) {
    super();
  }

  /**
   * Fetches the ignored docs from the store and publishes them to `docs$`.
   *
   * Invocations go through `exhaustMap`, so a call made while a previous
   * request is still running is presumably dropped rather than queued.
   */
  getIgnoredDocs = effect(
    exhaustMap(() => {
      const request$ = fromPromise(signal =>
        this.store.getIgnoredDocs(this.workspaceService.workspace.id, signal)
      );

      return request$.pipe(
        smartRetry(),
        mergeMap(docs => {
          this.docs$.next(docs);
          // Nothing is emitted downstream; the list is exposed via docs$.
          return EMPTY;
        }),
        catchErrorInto(this.error$, error => {
          logger.error(
            'Failed to fetch workspace doc embedding ignored docs',
            error
          );
        }),
        onStart(() => this.loading$.setValue(true)),
        onComplete(() => this.loading$.setValue(false))
      );
    })
  );

  /**
   * Applies the given additions and removals through the store, then triggers
   * {@link getIgnoredDocs}.
   *
   * @param add - Ids to add to the ignored list.
   * @param remove - Ids to remove from the ignored list.
   * @returns A promise that resolves once the update has finished and the
   * re-fetch has been triggered.
   */
  updateIgnoredDocs = ({
    add,
    remove,
  }: {
    add: string[];
    remove: string[];
  }) => {
    const update = this.store.updateIgnoredDocs(
      this.workspaceService.workspace.id,
      add,
      remove
    );
    return update.then(() => {
      this.getIgnoredDocs();
    });
  };

  /** Unsubscribes the {@link getIgnoredDocs} effect. */
  override dispose(): void {
    this.getIgnoredDocs.unsubscribe();
  }
}

View File

@@ -5,7 +5,10 @@ import {
} from '@affine/core/modules/workspace';
import { type Framework } from '@toeverything/infra';
import { Embedding } from './entities/embedding';
import { AdditionalAttachments } from './entities/additional-attachments';
import { EmbeddingEnabled } from './entities/embedding-enabled';
import { EmbeddingProgress } from './entities/embedding-progress';
import { IgnoredDocs } from './entities/ignored-docs';
import { EmbeddingService } from './services/embedding';
import { EmbeddingStore } from './stores/embedding';
@@ -14,7 +17,10 @@ export function configureIndexerEmbeddingModule(framework: Framework) {
.scope(WorkspaceScope)
.service(EmbeddingService)
.store(EmbeddingStore, [WorkspaceServerService])
.entity(Embedding, [WorkspaceService, EmbeddingStore]);
.entity(EmbeddingEnabled, [WorkspaceService, EmbeddingStore])
.entity(AdditionalAttachments, [WorkspaceService, EmbeddingStore])
.entity(IgnoredDocs, [WorkspaceService, EmbeddingStore])
.entity(EmbeddingProgress, [WorkspaceService, EmbeddingStore]);
}
export { EmbeddingSettings } from './view';

View File

@@ -1,7 +1,13 @@
import { Service } from '@toeverything/infra';
import { Embedding } from '../entities/embedding';
import { AdditionalAttachments } from '../entities/additional-attachments';
import { EmbeddingEnabled } from '../entities/embedding-enabled';
import { EmbeddingProgress } from '../entities/embedding-progress';
import { IgnoredDocs } from '../entities/ignored-docs';
/**
 * Service that creates the workspace embedding entities through
 * `this.framework.createEntity` and exposes them as fields.
 *
 * NOTE(review): this listing shows the `embedding` field next to the four
 * split entities; the hunk header (`-1,7 +1,13`) suggests `embedding` is the
 * removed line — confirm against the committed file.
 */
export class EmbeddingService extends Service {
embedding = this.framework.createEntity(Embedding);
// Enabled flag of the workspace embedding feature.
embeddingEnabled = this.framework.createEntity(EmbeddingEnabled);
// Persisted and locally uploading attachment files.
additionalAttachments = this.framework.createEntity(AdditionalAttachments);
// Docs ignored by embedding.
ignoredDocs = this.framework.createEntity(IgnoredDocs);
// Polled embedding progress.
embeddingProgress = this.framework.createEntity(EmbeddingProgress);
}

View File

@@ -1,3 +1,5 @@
import { DebugLogger } from '@affine/debug';
import type {
AttachmentFile,
ErrorAttachmentFile,
@@ -36,3 +38,5 @@ export function getAttachmentId(attachment: AttachmentFile): string {
}
return attachment.localId;
}
/** Shared debug logger for the embedding module, tagged 'WorkspaceEmbedding'. */
export const logger = new DebugLogger('WorkspaceEmbedding');

View File

@@ -24,22 +24,26 @@ interface EmbeddingSettingsProps {}
export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
const t = useI18n();
const embeddingService = useService(EmbeddingService);
const embeddingEnabled = useLiveData(embeddingService.embedding.enabled$);
const { totalCount } = useLiveData(embeddingService.embedding.attachments$);
const attachments = useLiveData(
embeddingService.embedding.mergedAttachments$
);
const ignoredDocs = useLiveData(embeddingService.embedding.ignoredDocs$);
const embeddingProgress = useLiveData(
embeddingService.embedding.embeddingProgress$
);
const isIgnoredDocsLoading = useLiveData(
embeddingService.embedding.isIgnoredDocsLoading$
);
const workspaceDialogService = useService(WorkspaceDialogService);
const embeddingEnabled = useLiveData(
embeddingService.embeddingEnabled.enabled$
);
const attachments = useLiveData(
embeddingService.additionalAttachments.mergedAttachments$
);
const ignoredDocs = useLiveData(embeddingService.ignoredDocs.docs$);
const embeddingProgress = useLiveData(
embeddingService.embeddingProgress.progress$
);
const { totalCount } = useLiveData(
embeddingService.additionalAttachments.attachments$
);
const isIgnoredDocsLoading = useLiveData(
embeddingService.ignoredDocs.loading$
);
const isEnabledLoading = useLiveData(
embeddingService.embedding.isEnabledLoading$
embeddingService.embeddingEnabled.loading$
);
const handleEmbeddingToggle = useCallback(
@@ -50,7 +54,7 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
option: checked ? 'on' : 'off',
});
embeddingService.embedding.setEnabled(checked).catch(error => {
embeddingService.embeddingEnabled.setEnabled(checked).catch(error => {
const err = UserFriendlyError.fromAny(error);
notify.error({
title:
@@ -61,7 +65,7 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
});
});
},
[embeddingService.embedding, t]
[embeddingService.embeddingEnabled, t]
);
const handleAttachmentUpload = useCallback(
@@ -71,34 +75,36 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
control: 'Select doc',
docType: file.type,
});
embeddingService.embedding.addAttachments([file]);
embeddingService.additionalAttachments.addAttachments([file]);
},
[embeddingService.embedding]
[embeddingService.additionalAttachments]
);
const handleAttachmentsDelete = useCallback(
(fileId: string) => {
embeddingService.embedding.removeAttachment(fileId).catch(error => {
const err = UserFriendlyError.fromAny(error);
notify.error({
title:
t[
'com.affine.settings.workspace.indexer-embedding.embedding.remove-attachment.error'
](),
message: t[`error.${err.name}`](err.data),
embeddingService.additionalAttachments
.removeAttachment(fileId)
.catch(error => {
const err = UserFriendlyError.fromAny(error);
notify.error({
title:
t[
'com.affine.settings.workspace.indexer-embedding.embedding.remove-attachment.error'
](),
message: t[`error.${err.name}`](err.data),
});
});
});
},
[embeddingService.embedding, t]
[embeddingService.additionalAttachments, t]
);
const handleAttachmentsPageChange = useCallback(
(offset: number) => {
embeddingService.embedding.getAttachments({
embeddingService.additionalAttachments.getAttachments({
offset,
});
},
[embeddingService.embedding]
[embeddingService.additionalAttachments]
);
const handleSelectDoc = useCallback(() => {
@@ -122,7 +128,7 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
});
const add = selectedIds.filter(id => !initialIds?.includes(id));
const remove = initialIds?.filter(id => !selectedIds.includes(id));
embeddingService.embedding
embeddingService.ignoredDocs
.updateIgnoredDocs({ add, remove })
.catch(error => {
const err = UserFriendlyError.fromAny(error);
@@ -140,24 +146,29 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
ignoredDocs,
isIgnoredDocsLoading,
workspaceDialogService,
embeddingService.embedding,
embeddingService.ignoredDocs,
t,
]);
useEffect(() => {
embeddingService.embedding.startEmbeddingProgressPolling();
embeddingService.embedding.getEnabled();
embeddingService.embedding.getAttachments({
embeddingService.embeddingProgress.startEmbeddingProgressPolling();
embeddingService.embeddingEnabled.getEnabled();
embeddingService.additionalAttachments.getAttachments({
first: COUNT_PER_PAGE,
after: null,
});
embeddingService.embedding.getIgnoredDocs();
embeddingService.embedding.getEmbeddingProgress();
embeddingService.ignoredDocs.getIgnoredDocs();
embeddingService.embeddingProgress.getEmbeddingProgress();
return () => {
embeddingService.embedding.stopEmbeddingProgressPolling();
embeddingService.embeddingProgress.stopEmbeddingProgressPolling();
};
}, [embeddingService.embedding]);
}, [
embeddingService.embeddingProgress,
embeddingService.embeddingEnabled,
embeddingService.additionalAttachments,
embeddingService.ignoredDocs,
]);
return (
<>