refactor(core): workspace mutation effect (#12488)

### TL;DR

* refactor: workspace embedding mutation effect
* tests: error display for workspace embedding
This commit is contained in:
yoyoyohamapi
2025-05-26 07:17:37 +00:00
parent da22391910
commit c06c72e108
6 changed files with 231 additions and 172 deletions

View File

@@ -11,15 +11,8 @@ import {
onStart,
smartRetry,
} from '@toeverything/infra';
import { EMPTY, interval, of, Subject } from 'rxjs';
import {
concatMap,
exhaustMap,
mergeMap,
switchMap,
takeUntil,
tap,
} from 'rxjs/operators';
import { EMPTY, interval, Subject } from 'rxjs';
import { exhaustMap, mergeMap, switchMap, takeUntil } from 'rxjs/operators';
import { COUNT_PER_PAGE } from '../constants';
import type { EmbeddingStore } from '../stores/embedding';
@@ -110,31 +103,13 @@ export class Embedding extends Entity {
})
);
setEnabled = effect(
exhaustMap((enabled: boolean) => {
return fromPromise(signal =>
this.store.updateEnabled(
this.workspaceService.workspace.id,
enabled,
signal
)
).pipe(
smartRetry(),
concatMap(() => {
this.getEnabled();
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to update workspace doc embedding enabled',
error
);
}),
onStart(() => this.isEnabledLoading$.setValue(true)),
onComplete(() => this.isEnabledLoading$.setValue(false))
);
})
);
setEnabled = (enabled: boolean) => {
return this.store
.updateEnabled(this.workspaceService.workspace.id, enabled)
.then(() => {
this.getEnabled();
});
};
getIgnoredDocs = effect(
exhaustMap(() => {
@@ -158,30 +133,19 @@ export class Embedding extends Entity {
})
);
updateIgnoredDocs = effect(
exhaustMap(({ add, remove }: { add: string[]; remove: string[] }) => {
return fromPromise(signal =>
this.store.updateIgnoredDocs(
this.workspaceService.workspace.id,
add,
remove,
signal
)
).pipe(
smartRetry(),
concatMap(() => {
this.getIgnoredDocs();
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to update workspace doc embedding ignored docs',
error
);
})
);
})
);
updateIgnoredDocs = ({
add,
remove,
}: {
add: string[];
remove: string[];
}) => {
return this.store
.updateIgnoredDocs(this.workspaceService.workspace.id, add, remove)
.then(() => {
this.getIgnoredDocs();
});
};
getAttachments = effect(
exhaustMap((pagination: PaginationInput) => {
@@ -219,95 +183,60 @@ export class Embedding extends Entity {
})
);
addAttachments = effect(
// Support parallel upload
mergeMap((files: File[]) => {
const generateLocalId = () =>
Math.random().toString(36).slice(2) + Date.now();
const localAttachments: LocalAttachmentFile[] = files.map(file => ({
localId: generateLocalId(),
fileName: file.name,
mimeType: file.type,
size: file.size,
createdAt: file.lastModified,
status: 'uploading',
}));
addAttachments = (files: File[]) => {
const generateLocalId = () =>
Math.random().toString(36).slice(2) + Date.now();
const localAttachments: LocalAttachmentFile[] = files.map(file => ({
localId: generateLocalId(),
fileName: file.name,
mimeType: file.type,
size: file.size,
createdAt: file.lastModified,
status: 'uploading',
}));
return of({ files, localAttachments }).pipe(
// Refresh uploading attachments immediately
tap(({ localAttachments }) => {
this.uploadingAttachments$.next([
...localAttachments,
...this.uploadingAttachments$.value,
]);
}),
// Uploading embedding files
switchMap(({ files }) => {
return fromPromise(signal =>
this.store.addEmbeddingFiles(
this.workspaceService.workspace.id,
files,
signal
)
);
}),
// Refresh uploading attachments
tap(() => {
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.filter(
att => !localAttachments.some(l => l.localId === att.localId)
)
);
this.getAttachments({ first: COUNT_PER_PAGE, after: null });
}),
catchErrorInto(this.error$, error => {
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.map(att =>
localAttachments.some(l => l.localId === att.localId)
? { ...att, status: 'error', errorMessage: String(error) }
: att
)
);
logger.error(
'Failed to add workspace doc embedding attachments',
error
);
})
);
})
);
this.uploadingAttachments$.next([
...localAttachments,
...this.uploadingAttachments$.value,
]);
removeAttachment = effect(
exhaustMap((id: string) => {
const localIndex = this.uploadingAttachments$.value.findIndex(
att => att.localId === id
);
if (localIndex !== -1) {
this.store
.addEmbeddingFiles(this.workspaceService.workspace.id, files)
.then(() => {
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.filter(att => att.localId !== id)
this.uploadingAttachments$.value.filter(
att => !localAttachments.some(l => l.localId === att.localId)
)
);
return EMPTY;
}
return fromPromise(signal =>
this.store.removeEmbeddingFile(
this.workspaceService.workspace.id,
id,
signal
)
).pipe(
concatMap(() => {
this.getAttachments({ first: COUNT_PER_PAGE, after: null });
return EMPTY;
}),
catchErrorInto(this.error$, error => {
logger.error(
'Failed to remove workspace doc embedding attachment',
error
);
})
this.getAttachments({ first: COUNT_PER_PAGE, after: null });
})
.catch(error => {
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.map(att =>
localAttachments.some(l => l.localId === att.localId)
? { ...att, status: 'error', errorMessage: String(error) }
: att
)
);
});
};
removeAttachment = (id: string) => {
const localIndex = this.uploadingAttachments$.value.findIndex(
att => att.localId === id
);
if (localIndex !== -1) {
this.uploadingAttachments$.next(
this.uploadingAttachments$.value.filter(att => att.localId !== id)
);
})
);
return Promise.resolve();
}
return this.store
.removeEmbeddingFile(this.workspaceService.workspace.id, id)
.then(() => {
this.getAttachments({ first: COUNT_PER_PAGE, after: null });
});
};
startEmbeddingProgressPolling() {
this.stopEmbeddingProgressPolling();
@@ -355,10 +284,6 @@ export class Embedding extends Entity {
this.getEnabled.unsubscribe();
this.getAttachments.unsubscribe();
this.getIgnoredDocs.unsubscribe();
this.updateIgnoredDocs.unsubscribe();
this.addAttachments.unsubscribe();
this.removeAttachment.unsubscribe();
this.setEnabled.unsubscribe();
this.stopEmbeddingProgress$.next();
this.getEmbeddingProgress.unsubscribe();
}

View File

@@ -1,4 +1,4 @@
import { Button, Switch } from '@affine/component';
import { Button, notify, Switch } from '@affine/component';
import {
SettingHeader,
SettingRow,
@@ -6,6 +6,7 @@ import {
} from '@affine/component/setting-components';
import { Upload } from '@affine/core/components/pure/file-upload';
import { WorkspaceDialogService } from '@affine/core/modules/dialogs';
import { UserFriendlyError } from '@affine/error';
import { useI18n } from '@affine/i18n';
import track from '@affine/track';
import { useLiveData, useService } from '@toeverything/infra';
@@ -48,9 +49,19 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
control: 'Workspace embedding',
option: checked ? 'on' : 'off',
});
embeddingService.embedding.setEnabled(checked);
embeddingService.embedding.setEnabled(checked).catch(error => {
const err = UserFriendlyError.fromAny(error);
notify.error({
title:
t[
'com.affine.settings.workspace.indexer-embedding.embedding.switch.error'
](),
message: t[`error.${err.name}`](err.data),
});
});
},
[embeddingService.embedding]
[embeddingService.embedding, t]
);
const handleAttachmentUpload = useCallback(
@@ -67,9 +78,18 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
const handleAttachmentsDelete = useCallback(
(fileId: string) => {
embeddingService.embedding.removeAttachment(fileId);
embeddingService.embedding.removeAttachment(fileId).catch(error => {
const err = UserFriendlyError.fromAny(error);
notify.error({
title:
t[
'com.affine.settings.workspace.indexer-embedding.embedding.remove-attachment.error'
](),
message: t[`error.${err.name}`](err.data),
});
});
},
[embeddingService.embedding]
[embeddingService.embedding, t]
);
const handleAttachmentsPageChange = useCallback(
@@ -102,7 +122,18 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
});
const add = selectedIds.filter(id => !initialIds?.includes(id));
const remove = initialIds?.filter(id => !selectedIds.includes(id));
embeddingService.embedding.updateIgnoredDocs({ add, remove });
embeddingService.embedding
.updateIgnoredDocs({ add, remove })
.catch(error => {
const err = UserFriendlyError.fromAny(error);
notify.error({
title:
t[
'com.affine.settings.workspace.indexer-embedding.embedding.update-ignored-docs.error'
](),
message: t[`error.${err.name}`](err.data),
});
});
}
);
}, [
@@ -110,6 +141,7 @@ export const EmbeddingSettings: React.FC<EmbeddingSettingsProps> = () => {
isIgnoredDocsLoading,
workspaceDialogService,
embeddingService.embedding,
t,
]);
useEffect(() => {