feat(infra): computed livedata (#6091)

This commit is contained in:
EYHN
2024-03-13 06:26:05 +00:00
parent cacc2d311e
commit fd9084ea6a
2 changed files with 223 additions and 2 deletions

View File

@@ -2,7 +2,7 @@ import type { Subscriber } from 'rxjs';
import { combineLatest, Observable, of } from 'rxjs';
import { describe, expect, test, vitest } from 'vitest';
import { LiveData } from '..';
import { LiveData, PoisonedError } from '..';
describe('livedata', () => {
test('LiveData', async () => {
@@ -133,6 +133,47 @@ describe('livedata', () => {
}
});
test('poisoned', () => {
{
let subscriber: Subscriber<number> = null!;
const livedata = LiveData.from<number>(
new Observable(sub => {
subscriber = sub;
}),
1
);
let value: number = 0;
let error: any = null;
livedata.subscribe({
next: v => {
value = v;
},
error: e => {
error = e;
},
});
expect(value).toBe(1);
subscriber.next(2);
expect(value).toBe(2);
expect(error).toBe(null);
subscriber.error('error');
expect(error).toBeInstanceOf(PoisonedError);
expect(() => livedata.next(3)).toThrowError(PoisonedError);
expect(() => livedata.value).toThrowError(PoisonedError);
let error2: any = null;
livedata.subscribe({
error: e => {
error2 = e;
},
});
expect(error2).toBeInstanceOf(PoisonedError);
}
});
test('map', () => {
{
const livedata = new LiveData(0);
@@ -223,4 +264,67 @@ describe('livedata', () => {
expect(flatten.value).toEqual([4, 3]);
}
});
test('computed', () => {
{
const a = new LiveData(1);
const b = LiveData.computed(get => get(a) + 1);
expect(b.value).toBe(2);
}
{
const a = new LiveData('v1');
const v1 = new LiveData(100);
const v2 = new LiveData(200);
const v = LiveData.computed(get => {
return get(a) === 'v1' ? get(v1) : get(v2);
});
expect(v.value).toBe(100);
a.next('v2');
expect(v.value).toBe(200);
}
{
let watched = false;
let count = 0;
let subscriber: Subscriber<number> = null!;
const a = LiveData.from<number>(
new Observable(sub => {
count++;
watched = true;
subscriber = sub;
sub.next(1);
return () => {
watched = false;
};
}),
0
);
const b = LiveData.computed(get => get(a) + 1);
expect(watched).toBe(false);
expect(count).toBe(0);
const subscription = b.subscribe(_ => {});
expect(watched).toBe(true);
expect(count).toBe(1);
subscriber.next(2);
expect(b.value).toBe(3);
subscription.unsubscribe();
expect(watched).toBe(false);
expect(count).toBe(1);
}
{
let c = null! as LiveData<number>;
const b = LiveData.computed(get => get(c) + 1);
c = LiveData.computed(get => get(b) + 1);
expect(() => b.value).toThrowError(PoisonedError);
}
});
});

View File

@@ -132,10 +132,100 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
return data;
}
private static GLOBAL_COMPUTED_RECURSIVE_COUNT = 0;
/**
* @example
* ```ts
* const a = new LiveData('v1');
* const v1 = new LiveData(100);
* const v2 = new LiveData(200);
*
* const v = LiveData.computed(get => {
* return get(a) === 'v1' ? get(v1) : get(v2);
* });
*
* expect(v.value).toBe(100);
* ```
*/
static computed<T>(
compute: (get: <L>(data: LiveData<L>) => L) => T
): LiveData<T> {
return LiveData.from(
new Observable(subscribe => {
const execute = (next: () => void) => {
const subscriptions: Subscription[] = [];
const getfn = <L>(data: LiveData<L>) => {
let value = null as L;
let first = true;
subscriptions.push(
data.subscribe({
error(err) {
subscribe.error(err);
},
next(v) {
value = v;
if (!first) {
next();
}
first = false;
},
})
);
return value;
};
LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT++;
try {
if (LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT > 10) {
subscribe.error(new Error('computed recursive limit exceeded'));
} else {
subscribe.next(compute(getfn));
}
} catch (err) {
subscribe.error(err);
} finally {
LiveData.GLOBAL_COMPUTED_RECURSIVE_COUNT--;
}
return () => {
subscriptions.forEach(s => s.unsubscribe());
};
};
let prev = () => {};
const looper = () => {
const dispose = execute(looper);
prev();
prev = dispose;
};
looper();
return () => {
prev();
};
}),
null as any
);
}
private readonly raw: BehaviorSubject<T>;
private readonly ops = new Subject<LiveDataOperation>();
private readonly upstreamSubscription: Subscription | undefined;
/**
* When the upstream Observable of livedata throws an error, livedata will enter poisoned state. This is an
* unrecoverable abnormal state. Any operation on livedata will throw a PoisonedError.
*
* Since the development specification for livedata is not to throw any error, entering the poisoned state usually
* means a programming error.
*/
private isPoisoned = false;
private poisonedError: PoisonedError | null = null;
constructor(
initialValue: T,
upstream:
@@ -155,17 +245,26 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
},
error: err => {
logger.error('uncatched error in livedata', err);
this.isPoisoned = true;
this.poisonedError = new PoisonedError(err);
this.raw.error(this.poisonedError);
},
});
}
}
getValue(): T {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops.next('get');
return this.raw.value;
}
setValue(v: T) {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.raw.next(v);
this.ops.next('set');
}
@@ -175,10 +274,13 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
}
set value(v: T) {
this.setValue(v);
this.next(v);
}
next(v: T) {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.setValue(v);
}
@@ -306,6 +408,9 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
}
reactSubscribe = (cb: () => void) => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops.next('watch');
const subscription = this.raw
.pipe(distinctUntilChanged(), skip(1))
@@ -317,6 +422,9 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
};
reactGetSnapshot = () => {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops.next('watch');
setImmediate(() => {
this.ops.next('unwatch');
@@ -343,3 +451,12 @@ export type Unwrap<T> =
: T;
export type Flat<T> = T extends LiveData<infer P> ? LiveData<Unwrap<P>> : T;
export class PoisonedError extends Error {
constructor(originalError: any) {
super(
'The livedata is poisoned, original error: ' +
(originalError instanceof Error ? originalError.stack : originalError)
);
}
}