mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-17 22:37:04 +08:00
feat(infra): framework
This commit is contained in:
@@ -4,10 +4,43 @@ import { type OperatorFunction, Subject } from 'rxjs';
|
||||
|
||||
const logger = new DebugLogger('effect');
|
||||
|
||||
export interface Effect<T> {
|
||||
(value: T): void;
|
||||
}
|
||||
export type Effect<T> = (T | undefined extends T // hack to detect if T is unknown
|
||||
? () => void
|
||||
: (value: T) => void) & {
|
||||
// unsubscribe effect, all ongoing effects will be cancelled.
|
||||
unsubscribe: () => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* Create an effect.
|
||||
*
|
||||
* `effect( op1, op2, op3, ... )`
|
||||
*
|
||||
* You can think of an effect as a pipeline. When the effect is called, argument will be sent to the pipeline,
|
||||
* and the operators in the pipeline can be triggered.
|
||||
*
|
||||
*
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const loadUser = effect(
|
||||
* switchMap((id: number) =>
|
||||
* from(fetchUser(id)).pipe(
|
||||
* mapInto(user$),
|
||||
* catchErrorInto(error$),
|
||||
* onStart(() => isLoading$.next(true)),
|
||||
* onComplete(() => isLoading$.next(false))
|
||||
* )
|
||||
* )
|
||||
* );
|
||||
*
|
||||
* // emit value to effect
|
||||
* loadUser(1);
|
||||
*
|
||||
* // unsubscribe effect, will stop all ongoing processes
|
||||
* loadUser.unsubscribe();
|
||||
* ```
|
||||
*/
|
||||
export function effect<T, A>(op1: OperatorFunction<T, A>): Effect<T>;
|
||||
export function effect<T, A, B>(
|
||||
op1: OperatorFunction<T, A>,
|
||||
@@ -42,23 +75,47 @@ export function effect<T, A, B, C, D, E, F>(
|
||||
export function effect(...args: any[]) {
|
||||
const subject$ = new Subject<any>();
|
||||
|
||||
const effectLocation = environment.isDebug
|
||||
? `(${new Error().stack?.split('\n')[2].trim()})`
|
||||
: '';
|
||||
|
||||
class EffectError extends Unreachable {
|
||||
constructor(message: string, value?: any) {
|
||||
logger.error(`effect ${effectLocation} ${message}`, value);
|
||||
super(
|
||||
`effect ${effectLocation} ${message}` +
|
||||
` ${value ? (value instanceof Error ? value.stack ?? value.message : value + '') : ''}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// eslint-disable-next-line prefer-spread
|
||||
subject$.pipe.apply(subject$, args as any).subscribe({
|
||||
const subscription = subject$.pipe.apply(subject$, args as any).subscribe({
|
||||
next(value) {
|
||||
logger.error('effect should not emit value', value);
|
||||
throw new Unreachable('effect should not emit value');
|
||||
const error = new EffectError('should not emit value', value);
|
||||
setImmediate(() => {
|
||||
throw error;
|
||||
});
|
||||
},
|
||||
complete() {
|
||||
logger.error('effect unexpected complete');
|
||||
throw new Unreachable('effect unexpected complete');
|
||||
const error = new EffectError('effect unexpected complete');
|
||||
setImmediate(() => {
|
||||
throw error;
|
||||
});
|
||||
},
|
||||
error(error) {
|
||||
logger.error('effect uncatched error', error);
|
||||
throw new Unreachable('effect uncatched error');
|
||||
const effectError = new EffectError('effect uncaught error', error);
|
||||
setImmediate(() => {
|
||||
throw effectError;
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
return ((value: unknown) => {
|
||||
const fn = (value: unknown) => {
|
||||
subject$.next(value);
|
||||
}) as never;
|
||||
};
|
||||
|
||||
fn.unsubscribe = () => subscription.unsubscribe();
|
||||
|
||||
return fn as never;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user