diff --git a/packages/common/infra/src/livedata/effect/__tests__/effect.spec.ts b/packages/common/infra/src/livedata/effect/__tests__/effect.spec.ts new file mode 100644 index 0000000000..969510baa1 --- /dev/null +++ b/packages/common/infra/src/livedata/effect/__tests__/effect.spec.ts @@ -0,0 +1,118 @@ +import type { Subscriber } from 'rxjs'; +import { from, Observable, switchMap } from 'rxjs'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; + +import { + catchErrorInto, + effect, + LiveData, + mapInto, + onComplete, + onStart, +} from '../../'; + +describe('example', () => { + type User = { + id: number; + name: string; + }; + + const fetchUser = vi.fn<[number], Promise>(); + + const user$ = new LiveData(null); + const isLoading$ = new LiveData(false); + const error$ = new LiveData(null); + + const loadUser = effect( + switchMap((id: number) => + from(fetchUser(id)).pipe( + mapInto(user$), + catchErrorInto(error$), + onStart(() => isLoading$.next(true)), + onComplete(() => isLoading$.next(false)) + ) + ) + ); + + beforeEach(() => { + fetchUser.mockClear(); + + user$.next(null); + isLoading$.next(false); + error$.next(null); + }); + + test('basic', async () => { + fetchUser.mockImplementation(async id => ({ id, name: 'John' })); + loadUser(1); + await vi.waitFor(() => + expect(user$.value).toStrictEqual({ id: 1, name: 'John' }) + ); + expect(fetchUser).toHaveBeenCalledOnce(); + expect(fetchUser).toHaveBeenCalledWith(1); + }); + + test('error', async () => { + fetchUser.mockRejectedValue(new Error('some error')); + loadUser(1); + await vi.waitFor(() => expect(error$.value).toBeInstanceOf(Error)); + }); + + test('isLoading', async () => { + let resolveFn: (value: User) => void = null!; + fetchUser.mockReturnValue( + new Promise(resolve => { + resolveFn = resolve; + }) + ); + loadUser(1); + await vi.waitFor(() => expect(isLoading$.value).toBe(true)); + expect(fetchUser).toHaveBeenCalledOnce(); + resolveFn({ id: 1, name: 'John' }); + await vi.waitFor(() => expect(isLoading$.value).toBe(false)); + }); + + test('switchMap', async () => { + let fetch1: Subscriber = null!; + let fetch1Canceled = false; + fetchUser.mockReturnValue( + new Observable(subscriber => { + fetch1 = subscriber; + return () => { + fetch1Canceled = true; + }; + }) as any + ); + + loadUser(1); + await vi.waitFor(() => expect(fetch1).toBeTruthy()); + expect(isLoading$.value).toBe(true); + + // start fetch2, should cancel fetch1 + let fetch2: Subscriber = null!; + fetchUser.mockReturnValue( + new Observable(subscriber => { + fetch2 = subscriber; + }) as any + ); + loadUser(2); + await vi.waitFor(() => expect(fetch1Canceled).toBe(true)); + expect(isLoading$.value).toBe(true); + + // fetch1 fail, should not affect fetch2 + fetch1.error(new Error('some error')); + expect(error$.value).toBe(null); + + // make fetch2 complete + fetch2.next({ id: 2, name: 'John' }); + fetch2.complete(); + + // should update user$ to fetch2 result + await vi.waitFor(() => + expect(user$.value).toStrictEqual({ id: 2, name: 'John' }) + ); + // should not have error + expect(error$.value).toBe(null); + expect(isLoading$.value).toBe(false); + }); +}); diff --git a/packages/common/infra/src/livedata/effect/index.ts b/packages/common/infra/src/livedata/effect/index.ts new file mode 100644 index 0000000000..b28561815e --- /dev/null +++ b/packages/common/infra/src/livedata/effect/index.ts @@ -0,0 +1,64 @@ +import { DebugLogger } from '@affine/debug'; +import { Unreachable } from '@affine/env/constant'; +import { type OperatorFunction, Subject } from 'rxjs'; + +const logger = new DebugLogger('effect'); + +export interface Effect { + (value: T): void; +} + +export function effect(op1: OperatorFunction): Effect; +export function effect( + op1: OperatorFunction, + op2: OperatorFunction +): Effect; +export function effect( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction +): Effect; +export function effect( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction +): Effect; +export function effect( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction, + op5: OperatorFunction +): Effect; +export function effect( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction, + op5: OperatorFunction, + op6: OperatorFunction +): Effect; +export function effect(...args: any[]) { + const subject$ = new Subject(); + + // eslint-disable-next-line prefer-spread + subject$.pipe.apply(subject$, args as any).subscribe({ + next(value) { + logger.error('effect should not emit value', value); + throw new Unreachable('effect should not emit value'); + }, + complete() { + logger.error('effect unexpected complete'); + throw new Unreachable('effect unexpected complete'); + }, + error(error) { + logger.error('effect uncatched error', error); + throw new Unreachable('effect uncatched error'); + }, + }); + + return ((value: unknown) => { + subject$.next(value); + }) as never; +} diff --git a/packages/common/infra/src/livedata/index.ts b/packages/common/infra/src/livedata/index.ts index 1e031469a8..089038bccd 100644 --- a/packages/common/infra/src/livedata/index.ts +++ b/packages/common/infra/src/livedata/index.ts @@ -1,489 +1,4 @@ -import { DebugLogger } from '@affine/debug'; -import type { - InteropObservable, - Observer, - OperatorFunction, - Subscription, - TeardownLogic, -} from 'rxjs'; -import { - BehaviorSubject, - combineLatest, - distinctUntilChanged, - EMPTY, - filter, - map, - Observable, - of, - scan, - skip, - Subject, - switchMap, -} from 'rxjs'; - -export * from './react'; - -const logger = new DebugLogger('livedata'); - -/** - * LiveData is a reactive data type. - * - * ## basic usage - * - * @example - * ```ts - * const livedata = new LiveData(0); // create livedata with initial value - * - * livedata.next(1); // update value - * - * console.log(livedata.value); // get current value - * - * livedata.subscribe(v => { // subscribe to value changes - * console.log(v); // 1 - * }); - * ``` - * - * ## observable - * - * LiveData is a rxjs observable, you can use rxjs operators. - * - * @example - * ```ts - * new LiveData(0).pipe( - * map(v => v + 1), - * filter(v => v > 1), - * ... - * ) - * ``` - * - * NOTICE: different from normal observable, LiveData will always emit the latest value when you subscribe to it. - * - * ## from observable - * - * LiveData can be created from observable or from other livedata. - * - * @example - * ```ts - * const A = LiveData.from( - * of(1, 2, 3, 4), // from observable - * 0 // initial value - * ); - * - * const B = LiveData.from( - * A.pipe(map(v => 'from a ' + v)), // from other livedata - * '' // initial value - * ); - * ``` - * - * ## Why is it called LiveData - * - * This API is very similar to LiveData in Android, as both are based on Observable, so I named it LiveData. - * - * @see {@link https://rxjs.dev/api/index/class/BehaviorSubject} - * @see {@link https://developer.android.com/topic/libraries/architecture/livedata} - */ -export class LiveData - extends Observable - implements InteropObservable -{ - static from( - upstream$: - | Observable - | InteropObservable - | ((stream: Observable) => Observable), - initialValue: T - ): LiveData { - const data$ = new LiveData( - initialValue, - typeof upstream$ === 'function' - ? upstream$ - : stream$ => - stream$.pipe( - filter( - (op): op is Exclude => op !== 'set' - ), - switchMap(v => { - if (v === 'get') { - return of('watch' as const, 'unwatch' as const); - } else { - return of(v); - } - }), - scan((acc, op) => { - if (op === 'watch') { - return acc + 1; - } else if (op === 'unwatch') { - return acc - 1; - } else { - return acc; - } - }, 0), - map(count => { - if (count > 0) { - return 'watch'; - } else { - return 'unwatch'; - } - }), - distinctUntilChanged(), - switchMap(op => { - if (op === 'watch') { - return upstream$; - } else { - return EMPTY; - } - }) - ) - ); - - return data$; - } - - private static GLOBAL_COMPUTED_RECURSIVE_COUNT = 0; - - /** - * @example - * ```ts - * const a = new LiveData('v1'); - * const v1 = new LiveData(100); - * const v2 = new LiveData(200); - * - * const v = LiveData.computed(get => { - * return get(a) === 'v1' ? get(v1) : get(v2); - * }); - * - * expect(v.value).toBe(100); - * ``` - */ - static computed( - compute: (get: (data: LiveData) => L) => T - ): LiveData { - return LiveData.from( - new Observable(subscribe => { - const execute = (next: () => void) => { - const subscriptions: Subscription[] = []; - const getfn = (data$: LiveData) => { - let value = null as L; - let first = true; - subscriptions.push( - data$.subscribe({ - error(err) { - subscribe.error(err); - }, - next(v) { - value = v; - if (!first) { - next(); - } - first = false; - }, - }) - ); - return value; - }; - - LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT++; - - try { - if (LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT > 10) { - subscribe.error(new Error('computed recursive limit exceeded')); - } else { - subscribe.next(compute(getfn)); - } - } catch (err) { - subscribe.error(err); - } finally { - LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT--; - } - - return () => { - subscriptions.forEach(s => s.unsubscribe()); - }; - }; - - let prev = () => {}; - - const looper = () => { - const dispose = execute(looper); - prev(); - prev = dispose; - }; - - looper(); - - return () => { - prev(); - }; - }), - null as any - ); - } - - private readonly raw$: BehaviorSubject; - private readonly ops$ = new Subject(); - private readonly upstreamSubscription: Subscription | undefined; - - /** - * When the upstream Observable of livedata throws an error, livedata will enter poisoned state. This is an - * unrecoverable abnormal state. Any operation on livedata will throw a PoisonedError. - * - * Since the development specification for livedata is not to throw any error, entering the poisoned state usually - * means a programming error. - */ - private isPoisoned = false; - private poisonedError: PoisonedError | null = null; - - constructor( - initialValue: T, - upstream: - | ((upstream: Observable) => Observable) - | undefined = undefined - ) { - super(); - this.raw$ = new BehaviorSubject(initialValue); - if (upstream) { - this.upstreamSubscription = upstream(this.ops$).subscribe({ - next: v => { - this.raw$.next(v); - }, - complete: () => { - if (!this.raw$.closed) { - logger.error('livedata upstream unexpected complete'); - } - }, - error: err => { - logger.error('uncatched error in livedata', err); - this.isPoisoned = true; - this.poisonedError = new PoisonedError(err); - this.raw$.error(this.poisonedError); - }, - }); - } - } - - getValue(): T { - if (this.isPoisoned) { - throw this.poisonedError; - } - this.ops$.next('get'); - return this.raw$.value; - } - - setValue(v: T) { - if (this.isPoisoned) { - throw this.poisonedError; - } - this.raw$.next(v); - this.ops$.next('set'); - } - - get value(): T { - return this.getValue(); - } - - set value(v: T) { - this.next(v); - } - - next(v: T) { - if (this.isPoisoned) { - throw this.poisonedError; - } - this.setValue(v); - } - - override subscribe( - observerOrNext?: Partial> | ((value: T) => void) - ): Subscription; - override subscribe( - next?: ((value: T) => void) | null, - error?: ((error: any) => void) | null, - complete?: (() => void) | null - ): Subscription; - override subscribe( - observerOrNext?: Partial> | ((value: T) => void) | null, - error?: ((error: any) => void) | null, - complete?: (() => void) | null - ): Subscription { - this.ops$.next('watch'); - const subscription = this.raw$.subscribe( - observerOrNext as any, - error, - complete - ); - subscription.add(() => { - this.ops$.next('unwatch'); - }); - return subscription; - } - - map(mapper: (v: T) => R): LiveData { - const sub$ = LiveData.from( - new Observable(subscriber => - this.subscribe({ - next: v => { - subscriber.next(mapper(v)); - }, - complete: () => { - sub$.complete(); - }, - }) - ), - undefined as R // is safe - ); - - return sub$; - } - - // eslint-disable-next-line rxjs/finnish - asObservable(): Observable { - return new Observable(subscriber => { - return this.subscribe(subscriber); - }); - } - - override pipe(): Observable; - override pipe(op1: OperatorFunction): Observable; - override pipe( - op1: OperatorFunction, - op2: OperatorFunction - ): Observable; - override pipe( - op1: OperatorFunction, - op2: OperatorFunction, - op3: OperatorFunction - ): Observable; - override pipe( - op1: OperatorFunction, - op2: OperatorFunction, - op3: OperatorFunction, - op4: OperatorFunction - ): Observable; - override pipe( - op1: OperatorFunction, - op2: OperatorFunction, - op3: OperatorFunction, - op4: OperatorFunction, - op5: OperatorFunction - ): Observable; - override pipe( - op1: OperatorFunction, - op2: OperatorFunction, - op3: OperatorFunction, - op4: OperatorFunction, - op5: OperatorFunction, - op6: OperatorFunction - ): Observable; - override pipe(...args: any[]) { - return new Observable(subscriber => { - this.ops$.next('watch'); - // eslint-disable-next-line prefer-spread - const subscription = this.raw$.pipe - .apply(this.raw$, args as any) - .subscribe(subscriber); - subscription.add(() => { - this.ops$.next('unwatch'); - }); - return subscription; - }); - } - - complete() { - this.ops$.complete(); - this.raw$.complete(); - this.upstreamSubscription?.unsubscribe(); - } - - /** - * flatten the livedata - * - * ``` - * new LiveData(new LiveData(0)).flat() // LiveData - * ``` - * - * ``` - * new LiveData([new LiveData(0)]).flat() // LiveData - * ``` - */ - flat(): Flat { - return LiveData.from( - this.pipe( - switchMap(v => { - if (v instanceof LiveData) { - return (v as LiveData).flat(); - } else if (Array.isArray(v)) { - return combineLatest( - v.map(v => { - if (v instanceof LiveData) { - return v.flat(); - } else { - return of(v); - } - }) - ); - } else { - return of(v); - } - }) - ), - null as any - ) as any; - } - - reactSubscribe = (cb: () => void) => { - if (this.isPoisoned) { - throw this.poisonedError; - } - this.ops$.next('watch'); - const subscription = this.raw$ - .pipe(distinctUntilChanged(), skip(1)) - .subscribe(cb); - subscription.add(() => { - this.ops$.next('unwatch'); - }); - return () => subscription.unsubscribe(); - }; - - reactGetSnapshot = () => { - if (this.isPoisoned) { - throw this.poisonedError; - } - this.ops$.next('watch'); - setImmediate(() => { - this.ops$.next('unwatch'); - }); - return this.raw$.value; - }; - - protected _subscribe(): TeardownLogic { - throw new Error('Method not implemented.'); - } - - [Symbol.observable || '@@observable']() { - return this; - } - - [Symbol.observable]() { - return this; - } -} - -export type LiveDataOperation = 'set' | 'get' | 'watch' | 'unwatch'; - -export type Unwrap = - T extends LiveData - ? Unwrap - : T extends LiveData[] - ? Unwrap[] - : T; - -export type Flat = T extends LiveData ? LiveData> : T; - -export class PoisonedError extends Error { - constructor(originalError: any) { - super( - 'The livedata is poisoned, original error: ' + - (originalError instanceof Error ? originalError.stack : originalError) - ); - } -} +export { type Effect, effect } from './effect'; +export { LiveData, PoisonedError } from './livedata'; +export { catchErrorInto, mapInto, onComplete, onStart } from './ops'; +export { useEnsureLiveData, useLiveData } from './react'; diff --git a/packages/common/infra/src/livedata/livedata.ts b/packages/common/infra/src/livedata/livedata.ts new file mode 100644 index 0000000000..5a6539d96a --- /dev/null +++ b/packages/common/infra/src/livedata/livedata.ts @@ -0,0 +1,487 @@ +import { DebugLogger } from '@affine/debug'; +import type { + InteropObservable, + Observer, + OperatorFunction, + Subscription, + TeardownLogic, +} from 'rxjs'; +import { + BehaviorSubject, + combineLatest, + distinctUntilChanged, + EMPTY, + filter, + map, + Observable, + of, + scan, + skip, + Subject, + switchMap, +} from 'rxjs'; + +const logger = new DebugLogger('livedata'); + +/** + * LiveData is a reactive data type. + * + * ## basic usage + * + * @example + * ```ts + * const livedata = new LiveData(0); // create livedata with initial value + * + * livedata.next(1); // update value + * + * console.log(livedata.value); // get current value + * + * livedata.subscribe(v => { // subscribe to value changes + * console.log(v); // 1 + * }); + * ``` + * + * ## observable + * + * LiveData is a rxjs observable, you can use rxjs operators. + * + * @example + * ```ts + * new LiveData(0).pipe( + * map(v => v + 1), + * filter(v => v > 1), + * ... + * ) + * ``` + * + * NOTICE: different from normal observable, LiveData will always emit the latest value when you subscribe to it. + * + * ## from observable + * + * LiveData can be created from observable or from other livedata. + * + * @example + * ```ts + * const A = LiveData.from( + * of(1, 2, 3, 4), // from observable + * 0 // initial value + * ); + * + * const B = LiveData.from( + * A.pipe(map(v => 'from a ' + v)), // from other livedata + * '' // initial value + * ); + * ``` + * + * ## Why is it called LiveData + * + * This API is very similar to LiveData in Android, as both are based on Observable, so I named it LiveData. + * + * @see {@link https://rxjs.dev/api/index/class/BehaviorSubject} + * @see {@link https://developer.android.com/topic/libraries/architecture/livedata} + */ +export class LiveData + extends Observable + implements InteropObservable +{ + static from( + upstream$: + | Observable + | InteropObservable + | ((stream: Observable) => Observable), + initialValue: T + ): LiveData { + const data$ = new LiveData( + initialValue, + typeof upstream$ === 'function' + ? upstream$ + : stream$ => + stream$.pipe( + filter( + (op): op is Exclude => op !== 'set' + ), + switchMap(v => { + if (v === 'get') { + return of('watch' as const, 'unwatch' as const); + } else { + return of(v); + } + }), + scan((acc, op) => { + if (op === 'watch') { + return acc + 1; + } else if (op === 'unwatch') { + return acc - 1; + } else { + return acc; + } + }, 0), + map(count => { + if (count > 0) { + return 'watch'; + } else { + return 'unwatch'; + } + }), + distinctUntilChanged(), + switchMap(op => { + if (op === 'watch') { + return upstream$; + } else { + return EMPTY; + } + }) + ) + ); + + return data$; + } + + private static GLOBAL_COMPUTED_RECURSIVE_COUNT = 0; + + /** + * @example + * ```ts + * const a = new LiveData('v1'); + * const v1 = new LiveData(100); + * const v2 = new LiveData(200); + * + * const v = LiveData.computed(get => { + * return get(a) === 'v1' ? get(v1) : get(v2); + * }); + * + * expect(v.value).toBe(100); + * ``` + */ + static computed( + compute: (get: (data: LiveData) => L) => T + ): LiveData { + return LiveData.from( + new Observable(subscribe => { + const execute = (next: () => void) => { + const subscriptions: Subscription[] = []; + const getfn = (data$: LiveData) => { + let value = null as L; + let first = true; + subscriptions.push( + data$.subscribe({ + error(err) { + subscribe.error(err); + }, + next(v) { + value = v; + if (!first) { + next(); + } + first = false; + }, + }) + ); + return value; + }; + + LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT++; + + try { + if (LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT > 10) { + subscribe.error(new Error('computed recursive limit exceeded')); + } else { + subscribe.next(compute(getfn)); + } + } catch (err) { + subscribe.error(err); + } finally { + LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT--; + } + + return () => { + subscriptions.forEach(s => s.unsubscribe()); + }; + }; + + let prev = () => {}; + + const looper = () => { + const dispose = execute(looper); + prev(); + prev = dispose; + }; + + looper(); + + return () => { + prev(); + }; + }), + null as any + ); + } + + private readonly raw$: BehaviorSubject; + private readonly ops$ = new Subject(); + private readonly upstreamSubscription: Subscription | undefined; + + /** + * When the upstream Observable of livedata throws an error, livedata will enter poisoned state. This is an + * unrecoverable abnormal state. Any operation on livedata will throw a PoisonedError. + * + * Since the development specification for livedata is not to throw any error, entering the poisoned state usually + * means a programming error. + */ + private isPoisoned = false; + private poisonedError: PoisonedError | null = null; + + constructor( + initialValue: T, + upstream: + | ((upstream: Observable) => Observable) + | undefined = undefined + ) { + super(); + this.raw$ = new BehaviorSubject(initialValue); + if (upstream) { + this.upstreamSubscription = upstream(this.ops$).subscribe({ + next: v => { + this.raw$.next(v); + }, + complete: () => { + if (!this.raw$.closed) { + logger.error('livedata upstream unexpected complete'); + } + }, + error: err => { + logger.error('uncatched error in livedata', err); + this.isPoisoned = true; + this.poisonedError = new PoisonedError(err); + this.raw$.error(this.poisonedError); + }, + }); + } + } + + getValue = (): T => { + if (this.isPoisoned) { + throw this.poisonedError; + } + this.ops$.next('get'); + return this.raw$.value; + }; + + setValue = (v: T) => { + if (this.isPoisoned) { + throw this.poisonedError; + } + this.raw$.next(v); + this.ops$.next('set'); + }; + + get value(): T { + return this.getValue(); + } + + set value(v: T) { + this.next(v); + } + + next = (v: T) => { + if (this.isPoisoned) { + throw this.poisonedError; + } + return this.setValue(v); + }; + + override subscribe( + observerOrNext?: Partial> | ((value: T) => void) + ): Subscription; + override subscribe( + next?: ((value: T) => void) | null, + error?: ((error: any) => void) | null, + complete?: (() => void) | null + ): Subscription; + override subscribe( + observerOrNext?: Partial> | ((value: T) => void) | null, + error?: ((error: any) => void) | null, + complete?: (() => void) | null + ): Subscription { + this.ops$.next('watch'); + const subscription = this.raw$.subscribe( + observerOrNext as any, + error, + complete + ); + subscription.add(() => { + this.ops$.next('unwatch'); + }); + return subscription; + } + + map(mapper: (v: T) => R): LiveData { + const sub$ = LiveData.from( + new Observable(subscriber => + this.subscribe({ + next: v => { + subscriber.next(mapper(v)); + }, + complete: () => { + sub$.complete(); + }, + }) + ), + undefined as R // is safe + ); + + return sub$; + } + + // eslint-disable-next-line rxjs/finnish + asObservable(): Observable { + return new Observable(subscriber => { + return this.subscribe(subscriber); + }); + } + + override pipe(): Observable; + override pipe(op1: OperatorFunction): Observable; + override pipe( + op1: OperatorFunction, + op2: OperatorFunction + ): Observable; + override pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction + ): Observable; + override pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction + ): Observable; + override pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction, + op5: OperatorFunction + ): Observable; + override pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction, + op5: OperatorFunction, + op6: OperatorFunction + ): Observable; + override pipe(...args: any[]) { + return new Observable(subscriber => { + this.ops$.next('watch'); + // eslint-disable-next-line prefer-spread + const subscription = this.raw$.pipe + .apply(this.raw$, args as any) + .subscribe(subscriber); + subscription.add(() => { + this.ops$.next('unwatch'); + }); + return subscription; + }); + } + + complete() { + this.ops$.complete(); + this.raw$.complete(); + this.upstreamSubscription?.unsubscribe(); + } + + /** + * flatten the livedata + * + * ``` + * new LiveData(new LiveData(0)).flat() // LiveData + * ``` + * + * ``` + * new LiveData([new LiveData(0)]).flat() // LiveData + * ``` + */ + flat(): Flat { + return LiveData.from( + this.pipe( + switchMap(v => { + if (v instanceof LiveData) { + return (v as LiveData).flat(); + } else if (Array.isArray(v)) { + return combineLatest( + v.map(v => { + if (v instanceof LiveData) { + return v.flat(); + } else { + return of(v); + } + }) + ); + } else { + return of(v); + } + }) + ), + null as any + ) as any; + } + + reactSubscribe = (cb: () => void) => { + if (this.isPoisoned) { + throw this.poisonedError; + } + this.ops$.next('watch'); + const subscription = this.raw$ + .pipe(distinctUntilChanged(), skip(1)) + .subscribe(cb); + subscription.add(() => { + this.ops$.next('unwatch'); + }); + return () => subscription.unsubscribe(); + }; + + reactGetSnapshot = () => { + if (this.isPoisoned) { + throw this.poisonedError; + } + this.ops$.next('watch'); + setImmediate(() => { + this.ops$.next('unwatch'); + }); + return this.raw$.value; + }; + + protected _subscribe(): TeardownLogic { + throw new Error('Method not implemented.'); + } + + [Symbol.observable || '@@observable']() { + return this; + } + + [Symbol.observable]() { + return this; + } +} + +export type LiveDataOperation = 'set' | 'get' | 'watch' | 'unwatch'; + +export type Unwrap = + T extends LiveData + ? Unwrap + : T extends LiveData[] + ? Unwrap[] + : T; + +export type Flat = T extends LiveData ? LiveData> : T; + +export class PoisonedError extends Error { + constructor(originalError: any) { + super( + 'The livedata is poisoned, original error: ' + + (originalError instanceof Error ? originalError.stack : originalError) + ); + } +} diff --git a/packages/common/infra/src/livedata/ops.ts b/packages/common/infra/src/livedata/ops.ts new file mode 100644 index 0000000000..7e848b955e --- /dev/null +++ b/packages/common/infra/src/livedata/ops.ts @@ -0,0 +1,54 @@ +import { + catchError, + EMPTY, + mergeMap, + Observable, + type OperatorFunction, + pipe, +} from 'rxjs'; + +import type { LiveData } from './livedata'; + +export function mapInto(l$: LiveData) { + return pipe( + mergeMap((value: T) => { + l$.next(value); + return EMPTY; + }) + ); +} + +export function catchErrorInto(l$: LiveData) { + return pipe( + catchError((error: any) => { + l$.next(error); + return EMPTY; + }) + ); +} + +export function onStart(cb: () => void): OperatorFunction { + return observable$ => + new Observable(subscribe => { + cb(); + return observable$.subscribe(subscribe); + }); +} + +export function onComplete(cb: () => void): OperatorFunction { + return observable$ => + new Observable(subscribe => { + return observable$.subscribe({ + complete() { + cb(); + subscribe.complete(); + }, + error(err) { + subscribe.error(err); + }, + next(value) { + subscribe.next(value); + }, + }); + }); +} diff --git a/packages/common/infra/src/livedata/react.ts b/packages/common/infra/src/livedata/react.ts index 153f384704..e0690af62d 100644 --- a/packages/common/infra/src/livedata/react.ts +++ b/packages/common/infra/src/livedata/react.ts @@ -1,7 +1,7 @@ import { use } from 'foxact/use'; import { useSyncExternalStore } from 'react'; -import type { LiveData } from './index'; +import type { LiveData } from './livedata'; function noopSubscribe() { return () => {};