diff --git a/packages/common/infra/src/livedata/__tests__/livedata.spec.ts b/packages/common/infra/src/livedata/__tests__/livedata.spec.ts index 4eccf553de..dec3f98972 100644 --- a/packages/common/infra/src/livedata/__tests__/livedata.spec.ts +++ b/packages/common/infra/src/livedata/__tests__/livedata.spec.ts @@ -2,7 +2,7 @@ import type { Subscriber } from 'rxjs'; import { combineLatest, Observable, of } from 'rxjs'; import { describe, expect, test, vitest } from 'vitest'; -import { LiveData } from '..'; +import { LiveData, PoisonedError } from '..'; describe('livedata', () => { test('LiveData', async () => { @@ -133,6 +133,47 @@ describe('livedata', () => { } }); + test('poisoned', () => { + { + let subscriber: Subscriber = null!; + const livedata = LiveData.from( + new Observable(sub => { + subscriber = sub; + }), + 1 + ); + + let value: number = 0; + let error: any = null; + livedata.subscribe({ + next: v => { + value = v; + }, + error: e => { + error = e; + }, + }); + expect(value).toBe(1); + subscriber.next(2); + expect(value).toBe(2); + + expect(error).toBe(null); + subscriber.error('error'); + expect(error).toBeInstanceOf(PoisonedError); + + expect(() => livedata.next(3)).toThrowError(PoisonedError); + expect(() => livedata.value).toThrowError(PoisonedError); + + let error2: any = null; + livedata.subscribe({ + error: e => { + error2 = e; + }, + }); + expect(error2).toBeInstanceOf(PoisonedError); + } + }); + test('map', () => { { const livedata = new LiveData(0); @@ -223,4 +264,67 @@ describe('livedata', () => { expect(flatten.value).toEqual([4, 3]); } }); + + test('computed', () => { + { + const a = new LiveData(1); + const b = LiveData.computed(get => get(a) + 1); + expect(b.value).toBe(2); + } + + { + const a = new LiveData('v1'); + const v1 = new LiveData(100); + const v2 = new LiveData(200); + + const v = LiveData.computed(get => { + return get(a) === 'v1' ? get(v1) : get(v2); + }); + + expect(v.value).toBe(100); + + a.next('v2'); + expect(v.value).toBe(200); + } + + { + let watched = false; + let count = 0; + let subscriber: Subscriber = null!; + const a = LiveData.from( + new Observable(sub => { + count++; + watched = true; + subscriber = sub; + sub.next(1); + return () => { + watched = false; + }; + }), + 0 + ); + const b = LiveData.computed(get => get(a) + 1); + + expect(watched).toBe(false); + expect(count).toBe(0); + + const subscription = b.subscribe(_ => {}); + expect(watched).toBe(true); + expect(count).toBe(1); + subscriber.next(2); + expect(b.value).toBe(3); + + subscription.unsubscribe(); + expect(watched).toBe(false); + expect(count).toBe(1); + } + + { + let c = null! as LiveData; + const b = LiveData.computed(get => get(c) + 1); + c = LiveData.computed(get => get(b) + 1); + + expect(() => b.value).toThrowError(PoisonedError); + } + }); }); diff --git a/packages/common/infra/src/livedata/index.ts b/packages/common/infra/src/livedata/index.ts index 53d737d235..06524dc4cf 100644 --- a/packages/common/infra/src/livedata/index.ts +++ b/packages/common/infra/src/livedata/index.ts @@ -132,10 +132,100 @@ export class LiveData implements InteropObservable { return data; } + private static GLOBAL_COMPUTED_RECURSIVE_COUNT = 0; + + /** + * @example + * ```ts + * const a = new LiveData('v1'); + * const v1 = new LiveData(100); + * const v2 = new LiveData(200); + * + * const v = LiveData.computed(get => { + * return get(a) === 'v1' ? get(v1) : get(v2); + * }); + * + * expect(v.value).toBe(100); + * ``` + */ + static computed( + compute: (get: (data: LiveData) => L) => T + ): LiveData { + return LiveData.from( + new Observable(subscribe => { + const execute = (next: () => void) => { + const subscriptions: Subscription[] = []; + const getfn = (data: LiveData) => { + let value = null as L; + let first = true; + subscriptions.push( + data.subscribe({ + error(err) { + subscribe.error(err); + }, + next(v) { + value = v; + if (!first) { + next(); + } + first = false; + }, + }) + ); + return value; + }; + + LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT++; + + try { + if (LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT > 10) { + subscribe.error(new Error('computed recursive limit exceeded')); + } else { + subscribe.next(compute(getfn)); + } + } catch (err) { + subscribe.error(err); + } finally { + LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT--; + } + + return () => { + subscriptions.forEach(s => s.unsubscribe()); + }; + }; + + let prev = () => {}; + + const looper = () => { + const dispose = execute(looper); + prev(); + prev = dispose; + }; + + looper(); + + return () => { + prev(); + }; + }), + null as any + ); + } + private readonly raw: BehaviorSubject; private readonly ops = new Subject(); private readonly upstreamSubscription: Subscription | undefined; + /** + * When the upstream Observable of livedata throws an error, livedata will enter poisoned state. This is an + * unrecoverable abnormal state. Any operation on livedata will throw a PoisonedError. + * + * Since the development specification for livedata is not to throw any error, entering the poisoned state usually + * means a programming error. + */ + private isPoisoned = false; + private poisonedError: PoisonedError | null = null; + constructor( initialValue: T, upstream: @@ -155,17 +245,26 @@ export class LiveData implements InteropObservable { }, error: err => { logger.error('uncatched error in livedata', err); + this.isPoisoned = true; + this.poisonedError = new PoisonedError(err); + this.raw.error(this.poisonedError); }, }); } } getValue(): T { + if (this.isPoisoned) { + throw this.poisonedError; + } this.ops.next('get'); return this.raw.value; } setValue(v: T) { + if (this.isPoisoned) { + throw this.poisonedError; + } this.raw.next(v); this.ops.next('set'); } @@ -175,10 +274,13 @@ export class LiveData implements InteropObservable { } set value(v: T) { - this.setValue(v); + this.next(v); } next(v: T) { + if (this.isPoisoned) { + throw this.poisonedError; + } this.setValue(v); } @@ -306,6 +408,9 @@ export class LiveData implements InteropObservable { } reactSubscribe = (cb: () => void) => { + if (this.isPoisoned) { + throw this.poisonedError; + } this.ops.next('watch'); const subscription = this.raw .pipe(distinctUntilChanged(), skip(1)) @@ -317,6 +422,9 @@ export class LiveData implements InteropObservable { }; reactGetSnapshot = () => { + if (this.isPoisoned) { + throw this.poisonedError; + } this.ops.next('watch'); setImmediate(() => { this.ops.next('unwatch'); @@ -343,3 +451,12 @@ export type Unwrap = : T; export type Flat = T extends LiveData ? LiveData> : T; + +export class PoisonedError extends Error { + constructor(originalError: any) { + super( + 'The livedata is poisoned, original error: ' + + (originalError instanceof Error ? originalError.stack : originalError) + ); + } +}