feat(infra): livedata effect (#6281)

This commit is contained in:
EYHN
2024-03-25 06:09:45 +00:00
parent f2adbdaba4
commit a8cd1579f5
6 changed files with 728 additions and 490 deletions

View File

@@ -0,0 +1,118 @@
import type { Subscriber } from 'rxjs';
import { from, Observable, switchMap } from 'rxjs';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import {
catchErrorInto,
effect,
LiveData,
mapInto,
onComplete,
onStart,
} from '../../';
describe('example', () => {
type User = {
id: number;
name: string;
};
const fetchUser = vi.fn<[number], Promise<User>>();
const user$ = new LiveData<User | null>(null);
const isLoading$ = new LiveData<boolean>(false);
const error$ = new LiveData<Error | null>(null);
const loadUser = effect(
switchMap((id: number) =>
from(fetchUser(id)).pipe(
mapInto(user$),
catchErrorInto(error$),
onStart(() => isLoading$.next(true)),
onComplete(() => isLoading$.next(false))
)
)
);
beforeEach(() => {
fetchUser.mockClear();
user$.next(null);
isLoading$.next(false);
error$.next(null);
});
test('basic', async () => {
fetchUser.mockImplementation(async id => ({ id, name: 'John' }));
loadUser(1);
await vi.waitFor(() =>
expect(user$.value).toStrictEqual({ id: 1, name: 'John' })
);
expect(fetchUser).toHaveBeenCalledOnce();
expect(fetchUser).toHaveBeenCalledWith(1);
});
test('error', async () => {
fetchUser.mockRejectedValue(new Error('some error'));
loadUser(1);
await vi.waitFor(() => expect(error$.value).toBeInstanceOf(Error));
});
test('isLoading', async () => {
let resolveFn: (value: User) => void = null!;
fetchUser.mockReturnValue(
new Promise(resolve => {
resolveFn = resolve;
})
);
loadUser(1);
await vi.waitFor(() => expect(isLoading$.value).toBe(true));
expect(fetchUser).toHaveBeenCalledOnce();
resolveFn({ id: 1, name: 'John' });
await vi.waitFor(() => expect(isLoading$.value).toBe(false));
});
test('switchMap', async () => {
let fetch1: Subscriber<User> = null!;
let fetch1Canceled = false;
fetchUser.mockReturnValue(
new Observable<User>(subscriber => {
fetch1 = subscriber;
return () => {
fetch1Canceled = true;
};
}) as any
);
loadUser(1);
await vi.waitFor(() => expect(fetch1).toBeTruthy());
expect(isLoading$.value).toBe(true);
// start fetch2, should cancel fetch1
let fetch2: Subscriber<User> = null!;
fetchUser.mockReturnValue(
new Observable<User>(subscriber => {
fetch2 = subscriber;
}) as any
);
loadUser(2);
await vi.waitFor(() => expect(fetch1Canceled).toBe(true));
expect(isLoading$.value).toBe(true);
// fetch1 fail, should not affect fetch2
fetch1.error(new Error('some error'));
expect(error$.value).toBe(null);
// make fetch2 complete
fetch2.next({ id: 2, name: 'John' });
fetch2.complete();
// should update user$ to fetch2 result
await vi.waitFor(() =>
expect(user$.value).toStrictEqual({ id: 2, name: 'John' })
);
// should not have error
expect(error$.value).toBe(null);
expect(isLoading$.value).toBe(false);
});
});

View File

@@ -0,0 +1,64 @@
import { DebugLogger } from '@affine/debug';
import { Unreachable } from '@affine/env/constant';
import { type OperatorFunction, Subject } from 'rxjs';
const logger = new DebugLogger('effect');
export interface Effect<T> {
(value: T): void;
}
export function effect<T, A>(op1: OperatorFunction<T, A>): Effect<T>;
export function effect<T, A, B>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>
): Effect<T>;
export function effect<T, A, B, C>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>
): Effect<T>;
export function effect<T, A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Effect<T>;
export function effect<T, A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Effect<T>;
export function effect<T, A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Effect<T>;
export function effect(...args: any[]) {
const subject$ = new Subject<any>();
// eslint-disable-next-line prefer-spread
subject$.pipe.apply(subject$, args as any).subscribe({
next(value) {
logger.error('effect should not emit value', value);
throw new Unreachable('effect should not emit value');
},
complete() {
logger.error('effect unexpected complete');
throw new Unreachable('effect unexpected complete');
},
error(error) {
logger.error('effect uncatched error', error);
throw new Unreachable('effect uncatched error');
},
});
return ((value: unknown) => {
subject$.next(value);
}) as never;
}

View File

@@ -1,489 +1,4 @@
import { DebugLogger } from '@affine/debug';
import type {
InteropObservable,
Observer,
OperatorFunction,
Subscription,
TeardownLogic,
} from 'rxjs';
import {
BehaviorSubject,
combineLatest,
distinctUntilChanged,
EMPTY,
filter,
map,
Observable,
of,
scan,
skip,
Subject,
switchMap,
} from 'rxjs';
export * from './react';
const logger = new DebugLogger('livedata');
/**
* LiveData is a reactive data type.
*
* ## basic usage
*
* @example
* ```ts
* const livedata = new LiveData(0); // create livedata with initial value
*
* livedata.next(1); // update value
*
* console.log(livedata.value); // get current value
*
* livedata.subscribe(v => { // subscribe to value changes
* console.log(v); // 1
* });
* ```
*
* ## observable
*
* LiveData is a rxjs observable, you can use rxjs operators.
*
* @example
* ```ts
* new LiveData(0).pipe(
* map(v => v + 1),
* filter(v => v > 1),
* ...
* )
* ```
*
* NOTICE: different from normal observable, LiveData will always emit the latest value when you subscribe to it.
*
* ## from observable
*
* LiveData can be created from observable or from other livedata.
*
* @example
* ```ts
* const A = LiveData.from(
* of(1, 2, 3, 4), // from observable
* 0 // initial value
* );
*
* const B = LiveData.from(
* A.pipe(map(v => 'from a ' + v)), // from other livedata
* '' // initial value
* );
* ```
*
* ## Why is it called LiveData
*
* This API is very similar to LiveData in Android, as both are based on Observable, so I named it LiveData.
*
* @see {@link https://rxjs.dev/api/index/class/BehaviorSubject}
* @see {@link https://developer.android.com/topic/libraries/architecture/livedata}
*/
export class LiveData<T = unknown>
extends Observable<T>
implements InteropObservable<T>
{
static from<T>(
upstream$:
| Observable<T>
| InteropObservable<T>
| ((stream: Observable<LiveDataOperation>) => Observable<T>),
initialValue: T
): LiveData<T> {
const data$ = new LiveData(
initialValue,
typeof upstream$ === 'function'
? upstream$
: stream$ =>
stream$.pipe(
filter(
(op): op is Exclude<LiveDataOperation, 'set'> => op !== 'set'
),
switchMap(v => {
if (v === 'get') {
return of('watch' as const, 'unwatch' as const);
} else {
return of(v);
}
}),
scan((acc, op) => {
if (op === 'watch') {
return acc + 1;
} else if (op === 'unwatch') {
return acc - 1;
} else {
return acc;
}
}, 0),
map(count => {
if (count > 0) {
return 'watch';
} else {
return 'unwatch';
}
}),
distinctUntilChanged(),
switchMap(op => {
if (op === 'watch') {
return upstream$;
} else {
return EMPTY;
}
})
)
);
return data$;
}
private static GLOBAL_COMPUTED_RECURSIVE_COUNT = 0;
/**
* @example
* ```ts
* const a = new LiveData('v1');
* const v1 = new LiveData(100);
* const v2 = new LiveData(200);
*
* const v = LiveData.computed(get => {
* return get(a) === 'v1' ? get(v1) : get(v2);
* });
*
* expect(v.value).toBe(100);
* ```
*/
static computed<T>(
compute: (get: <L>(data: LiveData<L>) => L) => T
): LiveData<T> {
return LiveData.from(
new Observable(subscribe => {
const execute = (next: () => void) => {
const subscriptions: Subscription[] = [];
const getfn = <L>(data$: LiveData<L>) => {
let value = null as L;
let first = true;
subscriptions.push(
data$.subscribe({
error(err) {
subscribe.error(err);
},
next(v) {
value = v;
if (!first) {
next();
}
first = false;
},
})
);
return value;
};
LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT++;
try {
if (LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT > 10) {
subscribe.error(new Error('computed recursive limit exceeded'));
} else {
subscribe.next(compute(getfn));
}
} catch (err) {
subscribe.error(err);
} finally {
LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT--;
}
return () => {
subscriptions.forEach(s => s.unsubscribe());
};
};
let prev = () => {};
const looper = () => {
const dispose = execute(looper);
prev();
prev = dispose;
};
looper();
return () => {
prev();
};
}),
null as any
);
}
private readonly raw$: BehaviorSubject<T>;
private readonly ops$ = new Subject<LiveDataOperation>();
private readonly upstreamSubscription: Subscription | undefined;
/**
* When the upstream Observable of livedata throws an error, livedata will enter poisoned state. This is an
* unrecoverable abnormal state. Any operation on livedata will throw a PoisonedError.
*
* Since the development specification for livedata is not to throw any error, entering the poisoned state usually
* means a programming error.
*/
private isPoisoned = false;
private poisonedError: PoisonedError | null = null;
constructor(
initialValue: T,
upstream:
| ((upstream: Observable<LiveDataOperation>) => Observable<T>)
| undefined = undefined
) {
super();
this.raw$ = new BehaviorSubject(initialValue);
if (upstream) {
this.upstreamSubscription = upstream(this.ops$).subscribe({
next: v => {
this.raw$.next(v);
},
complete: () => {
if (!this.raw$.closed) {
logger.error('livedata upstream unexpected complete');
}
},
error: err => {
logger.error('uncatched error in livedata', err);
this.isPoisoned = true;
this.poisonedError = new PoisonedError(err);
this.raw$.error(this.poisonedError);
},
});
}
}
getValue(): T {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops$.next('get');
return this.raw$.value;
}
setValue(v: T) {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.raw$.next(v);
this.ops$.next('set');
}
get value(): T {
return this.getValue();
}
set value(v: T) {
this.next(v);
}
next(v: T) {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.setValue(v);
}
override subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void)
): Subscription;
override subscribe(
next?: ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription;
override subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
this.ops$.next('watch');
const subscription = this.raw$.subscribe(
observerOrNext as any,
error,
complete
);
subscription.add(() => {
this.ops$.next('unwatch');
});
return subscription;
}
map<R>(mapper: (v: T) => R): LiveData<R> {
const sub$ = LiveData.from(
new Observable<R>(subscriber =>
this.subscribe({
next: v => {
subscriber.next(mapper(v));
},
complete: () => {
sub$.complete();
},
})
),
undefined as R // is safe
);
return sub$;
}
// eslint-disable-next-line rxjs/finnish
asObservable(): Observable<T> {
return new Observable<T>(subscriber => {
return this.subscribe(subscriber);
});
}
override pipe(): Observable<T>;
override pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
override pipe<A, B>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>
): Observable<B>;
override pipe<A, B, C>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>
): Observable<C>;
override pipe<A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Observable<D>;
override pipe<A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Observable<E>;
override pipe<A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Observable<F>;
override pipe(...args: any[]) {
return new Observable(subscriber => {
this.ops$.next('watch');
// eslint-disable-next-line prefer-spread
const subscription = this.raw$.pipe
.apply(this.raw$, args as any)
.subscribe(subscriber);
subscription.add(() => {
this.ops$.next('unwatch');
});
return subscription;
});
}
complete() {
this.ops$.complete();
this.raw$.complete();
this.upstreamSubscription?.unsubscribe();
}
/**
* flatten the livedata
*
* ```
* new LiveData(new LiveData(0)).flat() // LiveData<number>
* ```
*
* ```
* new LiveData([new LiveData(0)]).flat() // LiveData<number[]>
* ```
*/
flat(): Flat<this> {
return LiveData.from(
this.pipe(
switchMap(v => {
if (v instanceof LiveData) {
return (v as LiveData<any>).flat();
} else if (Array.isArray(v)) {
return combineLatest(
v.map(v => {
if (v instanceof LiveData) {
return v.flat();
} else {
return of(v);
}
})
);
} else {
return of(v);
}
})
),
null as any
) as any;
}
reactSubscribe = (cb: () => void) => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops$.next('watch');
const subscription = this.raw$
.pipe(distinctUntilChanged(), skip(1))
.subscribe(cb);
subscription.add(() => {
this.ops$.next('unwatch');
});
return () => subscription.unsubscribe();
};
reactGetSnapshot = () => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops$.next('watch');
setImmediate(() => {
this.ops$.next('unwatch');
});
return this.raw$.value;
};
protected _subscribe(): TeardownLogic {
throw new Error('Method not implemented.');
}
[Symbol.observable || '@@observable']() {
return this;
}
[Symbol.observable]() {
return this;
}
}
export type LiveDataOperation = 'set' | 'get' | 'watch' | 'unwatch';
export type Unwrap<T> =
T extends LiveData<infer Z>
? Unwrap<Z>
: T extends LiveData<infer A>[]
? Unwrap<A>[]
: T;
export type Flat<T> = T extends LiveData<infer P> ? LiveData<Unwrap<P>> : T;
export class PoisonedError extends Error {
constructor(originalError: any) {
super(
'The livedata is poisoned, original error: ' +
(originalError instanceof Error ? originalError.stack : originalError)
);
}
}
export { type Effect, effect } from './effect';
export { LiveData, PoisonedError } from './livedata';
export { catchErrorInto, mapInto, onComplete, onStart } from './ops';
export { useEnsureLiveData, useLiveData } from './react';

View File

@@ -0,0 +1,487 @@
import { DebugLogger } from '@affine/debug';
import type {
InteropObservable,
Observer,
OperatorFunction,
Subscription,
TeardownLogic,
} from 'rxjs';
import {
BehaviorSubject,
combineLatest,
distinctUntilChanged,
EMPTY,
filter,
map,
Observable,
of,
scan,
skip,
Subject,
switchMap,
} from 'rxjs';
const logger = new DebugLogger('livedata');
/**
* LiveData is a reactive data type.
*
* ## basic usage
*
* @example
* ```ts
* const livedata = new LiveData(0); // create livedata with initial value
*
* livedata.next(1); // update value
*
* console.log(livedata.value); // get current value
*
* livedata.subscribe(v => { // subscribe to value changes
* console.log(v); // 1
* });
* ```
*
* ## observable
*
* LiveData is a rxjs observable, you can use rxjs operators.
*
* @example
* ```ts
* new LiveData(0).pipe(
* map(v => v + 1),
* filter(v => v > 1),
* ...
* )
* ```
*
* NOTICE: different from normal observable, LiveData will always emit the latest value when you subscribe to it.
*
* ## from observable
*
* LiveData can be created from observable or from other livedata.
*
* @example
* ```ts
* const A = LiveData.from(
* of(1, 2, 3, 4), // from observable
* 0 // initial value
* );
*
* const B = LiveData.from(
* A.pipe(map(v => 'from a ' + v)), // from other livedata
* '' // initial value
* );
* ```
*
* ## Why is it called LiveData
*
* This API is very similar to LiveData in Android, as both are based on Observable, so I named it LiveData.
*
* @see {@link https://rxjs.dev/api/index/class/BehaviorSubject}
* @see {@link https://developer.android.com/topic/libraries/architecture/livedata}
*/
export class LiveData<T = unknown>
extends Observable<T>
implements InteropObservable<T>
{
static from<T>(
upstream$:
| Observable<T>
| InteropObservable<T>
| ((stream: Observable<LiveDataOperation>) => Observable<T>),
initialValue: T
): LiveData<T> {
const data$ = new LiveData(
initialValue,
typeof upstream$ === 'function'
? upstream$
: stream$ =>
stream$.pipe(
filter(
(op): op is Exclude<LiveDataOperation, 'set'> => op !== 'set'
),
switchMap(v => {
if (v === 'get') {
return of('watch' as const, 'unwatch' as const);
} else {
return of(v);
}
}),
scan((acc, op) => {
if (op === 'watch') {
return acc + 1;
} else if (op === 'unwatch') {
return acc - 1;
} else {
return acc;
}
}, 0),
map(count => {
if (count > 0) {
return 'watch';
} else {
return 'unwatch';
}
}),
distinctUntilChanged(),
switchMap(op => {
if (op === 'watch') {
return upstream$;
} else {
return EMPTY;
}
})
)
);
return data$;
}
private static GLOBAL_COMPUTED_RECURSIVE_COUNT = 0;
/**
* @example
* ```ts
* const a = new LiveData('v1');
* const v1 = new LiveData(100);
* const v2 = new LiveData(200);
*
* const v = LiveData.computed(get => {
* return get(a) === 'v1' ? get(v1) : get(v2);
* });
*
* expect(v.value).toBe(100);
* ```
*/
static computed<T>(
compute: (get: <L>(data: LiveData<L>) => L) => T
): LiveData<T> {
return LiveData.from(
new Observable(subscribe => {
const execute = (next: () => void) => {
const subscriptions: Subscription[] = [];
const getfn = <L>(data$: LiveData<L>) => {
let value = null as L;
let first = true;
subscriptions.push(
data$.subscribe({
error(err) {
subscribe.error(err);
},
next(v) {
value = v;
if (!first) {
next();
}
first = false;
},
})
);
return value;
};
LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT++;
try {
if (LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT > 10) {
subscribe.error(new Error('computed recursive limit exceeded'));
} else {
subscribe.next(compute(getfn));
}
} catch (err) {
subscribe.error(err);
} finally {
LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT--;
}
return () => {
subscriptions.forEach(s => s.unsubscribe());
};
};
let prev = () => {};
const looper = () => {
const dispose = execute(looper);
prev();
prev = dispose;
};
looper();
return () => {
prev();
};
}),
null as any
);
}
private readonly raw$: BehaviorSubject<T>;
private readonly ops$ = new Subject<LiveDataOperation>();
private readonly upstreamSubscription: Subscription | undefined;
/**
* When the upstream Observable of livedata throws an error, livedata will enter poisoned state. This is an
* unrecoverable abnormal state. Any operation on livedata will throw a PoisonedError.
*
* Since the development specification for livedata is not to throw any error, entering the poisoned state usually
* means a programming error.
*/
private isPoisoned = false;
private poisonedError: PoisonedError | null = null;
constructor(
initialValue: T,
upstream:
| ((upstream: Observable<LiveDataOperation>) => Observable<T>)
| undefined = undefined
) {
super();
this.raw$ = new BehaviorSubject(initialValue);
if (upstream) {
this.upstreamSubscription = upstream(this.ops$).subscribe({
next: v => {
this.raw$.next(v);
},
complete: () => {
if (!this.raw$.closed) {
logger.error('livedata upstream unexpected complete');
}
},
error: err => {
logger.error('uncatched error in livedata', err);
this.isPoisoned = true;
this.poisonedError = new PoisonedError(err);
this.raw$.error(this.poisonedError);
},
});
}
}
getValue = (): T => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops$.next('get');
return this.raw$.value;
};
setValue = (v: T) => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.raw$.next(v);
this.ops$.next('set');
};
get value(): T {
return this.getValue();
}
set value(v: T) {
this.next(v);
}
next = (v: T) => {
if (this.isPoisoned) {
throw this.poisonedError;
}
return this.setValue(v);
};
override subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void)
): Subscription;
override subscribe(
next?: ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription;
override subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
this.ops$.next('watch');
const subscription = this.raw$.subscribe(
observerOrNext as any,
error,
complete
);
subscription.add(() => {
this.ops$.next('unwatch');
});
return subscription;
}
map<R>(mapper: (v: T) => R): LiveData<R> {
const sub$ = LiveData.from(
new Observable<R>(subscriber =>
this.subscribe({
next: v => {
subscriber.next(mapper(v));
},
complete: () => {
sub$.complete();
},
})
),
undefined as R // is safe
);
return sub$;
}
// eslint-disable-next-line rxjs/finnish
asObservable(): Observable<T> {
return new Observable<T>(subscriber => {
return this.subscribe(subscriber);
});
}
override pipe(): Observable<T>;
override pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
override pipe<A, B>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>
): Observable<B>;
override pipe<A, B, C>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>
): Observable<C>;
override pipe<A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Observable<D>;
override pipe<A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Observable<E>;
override pipe<A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Observable<F>;
override pipe(...args: any[]) {
return new Observable(subscriber => {
this.ops$.next('watch');
// eslint-disable-next-line prefer-spread
const subscription = this.raw$.pipe
.apply(this.raw$, args as any)
.subscribe(subscriber);
subscription.add(() => {
this.ops$.next('unwatch');
});
return subscription;
});
}
complete() {
this.ops$.complete();
this.raw$.complete();
this.upstreamSubscription?.unsubscribe();
}
/**
* flatten the livedata
*
* ```
* new LiveData(new LiveData(0)).flat() // LiveData<number>
* ```
*
* ```
* new LiveData([new LiveData(0)]).flat() // LiveData<number[]>
* ```
*/
flat(): Flat<this> {
return LiveData.from(
this.pipe(
switchMap(v => {
if (v instanceof LiveData) {
return (v as LiveData<any>).flat();
} else if (Array.isArray(v)) {
return combineLatest(
v.map(v => {
if (v instanceof LiveData) {
return v.flat();
} else {
return of(v);
}
})
);
} else {
return of(v);
}
})
),
null as any
) as any;
}
reactSubscribe = (cb: () => void) => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops$.next('watch');
const subscription = this.raw$
.pipe(distinctUntilChanged(), skip(1))
.subscribe(cb);
subscription.add(() => {
this.ops$.next('unwatch');
});
return () => subscription.unsubscribe();
};
reactGetSnapshot = () => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops$.next('watch');
setImmediate(() => {
this.ops$.next('unwatch');
});
return this.raw$.value;
};
protected _subscribe(): TeardownLogic {
throw new Error('Method not implemented.');
}
[Symbol.observable || '@@observable']() {
return this;
}
[Symbol.observable]() {
return this;
}
}
export type LiveDataOperation = 'set' | 'get' | 'watch' | 'unwatch';
export type Unwrap<T> =
T extends LiveData<infer Z>
? Unwrap<Z>
: T extends LiveData<infer A>[]
? Unwrap<A>[]
: T;
export type Flat<T> = T extends LiveData<infer P> ? LiveData<Unwrap<P>> : T;
export class PoisonedError extends Error {
constructor(originalError: any) {
super(
'The livedata is poisoned, original error: ' +
(originalError instanceof Error ? originalError.stack : originalError)
);
}
}

View File

@@ -0,0 +1,54 @@
import {
catchError,
EMPTY,
mergeMap,
Observable,
type OperatorFunction,
pipe,
} from 'rxjs';
import type { LiveData } from './livedata';
export function mapInto<T>(l$: LiveData<T>) {
return pipe(
mergeMap((value: T) => {
l$.next(value);
return EMPTY;
})
);
}
export function catchErrorInto(l$: LiveData<any>) {
return pipe(
catchError((error: any) => {
l$.next(error);
return EMPTY;
})
);
}
export function onStart<T>(cb: () => void): OperatorFunction<T, T> {
return observable$ =>
new Observable(subscribe => {
cb();
return observable$.subscribe(subscribe);
});
}
export function onComplete<T>(cb: () => void): OperatorFunction<T, T> {
return observable$ =>
new Observable(subscribe => {
return observable$.subscribe({
complete() {
cb();
subscribe.complete();
},
error(err) {
subscribe.error(err);
},
next(value) {
subscribe.next(value);
},
});
});
}

View File

@@ -1,7 +1,7 @@
import { use } from 'foxact/use';
import { useSyncExternalStore } from 'react';
import type { LiveData } from './index';
import type { LiveData } from './livedata';
function noopSubscribe() {
return () => {};