style: enable rxjs/finnish (#6276)

chore(infra): use finnish notation for observables

do rename
This commit is contained in:
EYHN
2024-03-24 17:04:51 +00:00
parent c6676fd074
commit 2b42a75e5a
104 changed files with 797 additions and 591 deletions

View File

@@ -113,8 +113,8 @@ export async function buildShowcaseWorkspace(
// perhaps put them into middleware?
{
// the "Write, Draw, Plan all at Once." page should be set to edgeless mode
const edgelessPage1 = pageRecordList.records.value.find(
p => p.title.value === 'Write, Draw, Plan all at Once.'
const edgelessPage1 = pageRecordList.records$.value.find(
p => p.title$.value === 'Write, Draw, Plan all at Once.'
);
if (edgelessPage1) {
@@ -122,8 +122,8 @@ export async function buildShowcaseWorkspace(
}
// should jump to "Write, Draw, Plan all at Once." by default
const defaultPage = pageRecordList.records.value.find(p =>
p.title.value.startsWith('Write, Draw, Plan all at Once.')
const defaultPage = pageRecordList.records$.value.find(p =>
p.title$.value.startsWith('Write, Draw, Plan all at Once.')
);
if (defaultPage) {

View File

@@ -6,33 +6,33 @@ import { LiveData, PoisonedError } from '..';
describe('livedata', () => {
test('LiveData', async () => {
const livedata = new LiveData(0);
expect(livedata.value).toBe(0);
livedata.next(1);
expect(livedata.value).toBe(1);
const livedata$ = new LiveData(0);
expect(livedata$.value).toBe(0);
livedata$.next(1);
expect(livedata$.value).toBe(1);
let subscribed = 0;
livedata.subscribe(v => {
livedata$.subscribe(v => {
subscribed = v;
});
livedata.next(2);
expect(livedata.value).toBe(2);
livedata$.next(2);
expect(livedata$.value).toBe(2);
await vitest.waitFor(() => subscribed === 2);
});
test('from', async () => {
{
const livedata = LiveData.from(of(1, 2, 3, 4), 0);
expect(livedata.value).toBe(4);
const livedata$ = LiveData.from(of(1, 2, 3, 4), 0);
expect(livedata$.value).toBe(4);
}
{
let subscriber: Subscriber<number> = null!;
const observable = new Observable<number>(s => {
const observable$ = new Observable<number>(s => {
subscriber = s;
});
const livedata = LiveData.from(observable, 0);
const livedata$ = LiveData.from(observable$, 0);
let value = 0;
livedata.subscribe(v => {
livedata$.subscribe(v => {
value = v;
});
@@ -46,16 +46,16 @@ describe('livedata', () => {
{
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable(subscriber => {
const observable$ = new Observable(subscriber => {
observableSubscribed = true;
subscriber.next(1);
return () => {
observableClosed = true;
};
});
const livedata = LiveData.from(observable, 0);
const livedata$ = LiveData.from(observable$, 0);
expect(observableSubscribed).toBe(false);
const subscription = livedata.subscribe(_ => {});
const subscription = livedata$.subscribe(_ => {});
expect(observableSubscribed).toBe(true);
expect(observableClosed).toBe(false);
subscription.unsubscribe();
@@ -64,17 +64,17 @@ describe('livedata', () => {
{
let subscriber: Subscriber<number> = null!;
const observable = new Observable<number>(s => {
const observable$ = new Observable<number>(s => {
subscriber = s;
});
const livedata = LiveData.from(observable, 0);
const livedata$ = LiveData.from(observable$, 0);
let value1 = 0;
livedata.subscribe(v => {
livedata$.subscribe(v => {
value1 = v;
});
let value2 = 0;
livedata.subscribe(v => {
livedata$.subscribe(v => {
value2 = v;
});
@@ -91,17 +91,17 @@ describe('livedata', () => {
{
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable(subscriber => {
const observable$ = new Observable(subscriber => {
observableSubscribed = true;
subscriber.next(1);
return () => {
observableClosed = true;
};
});
const livedata = LiveData.from(observable, 0);
const livedata$ = LiveData.from(observable$, 0);
expect(observableSubscribed).toBe(false);
const subscription1 = livedata.subscribe(_ => {});
const subscription2 = livedata.subscribe(_ => {});
const subscription1 = livedata$.subscribe(_ => {});
const subscription2 = livedata$.subscribe(_ => {});
expect(observableSubscribed).toBe(true);
expect(observableClosed).toBe(false);
subscription1.unsubscribe();
@@ -112,31 +112,31 @@ describe('livedata', () => {
{
let observerCount = 0;
const observable = new Observable(_ => {
const observable$ = new Observable(_ => {
observerCount++;
});
const livedata = LiveData.from(observable, 0);
livedata.subscribe(_ => {});
livedata.subscribe(_ => {});
const livedata$ = LiveData.from(observable$, 0);
livedata$.subscribe(_ => {});
livedata$.subscribe(_ => {});
expect(observerCount).toBe(1);
}
{
let value = 0;
const observable = new Observable<number>(subscriber => {
const observable$ = new Observable<number>(subscriber => {
subscriber.next(value);
});
const livedata = LiveData.from(observable, 0);
expect(livedata.value).toBe(0);
const livedata$ = LiveData.from(observable$, 0);
expect(livedata$.value).toBe(0);
value = 1;
expect(livedata.value).toBe(1);
expect(livedata$.value).toBe(1);
}
});
test('poisoned', () => {
{
let subscriber: Subscriber<number> = null!;
const livedata = LiveData.from<number>(
const livedata$ = LiveData.from<number>(
new Observable(sub => {
subscriber = sub;
}),
@@ -145,7 +145,7 @@ describe('livedata', () => {
let value: number = 0;
let error: any = null;
livedata.subscribe({
livedata$.subscribe({
next: v => {
value = v;
},
@@ -161,11 +161,11 @@ describe('livedata', () => {
subscriber.error('error');
expect(error).toBeInstanceOf(PoisonedError);
expect(() => livedata.next(3)).toThrowError(PoisonedError);
expect(() => livedata.value).toThrowError(PoisonedError);
expect(() => livedata$.next(3)).toThrowError(PoisonedError);
expect(() => livedata$.value).toThrowError(PoisonedError);
let error2: any = null;
livedata.subscribe({
livedata$.subscribe({
error: e => {
error2 = e;
},
@@ -176,29 +176,29 @@ describe('livedata', () => {
test('map', () => {
{
const livedata = new LiveData(0);
const mapped = livedata.map(v => v + 1);
expect(mapped.value).toBe(1);
livedata.next(1);
expect(mapped.value).toBe(2);
const livedata$ = new LiveData(0);
const mapped$ = livedata$.map(v => v + 1);
expect(mapped$.value).toBe(1);
livedata$.next(1);
expect(mapped$.value).toBe(2);
}
{
const livedata = new LiveData(0);
const mapped = livedata.map(v => v + 1);
const livedata$ = new LiveData(0);
const mapped$ = livedata$.map(v => v + 1);
let value = 0;
mapped.subscribe(v => {
mapped$.subscribe(v => {
value = v;
});
expect(value).toBe(1);
livedata.next(1);
livedata$.next(1);
expect(value).toBe(2);
}
{
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable<number>(subscriber => {
const observable$ = new Observable<number>(subscriber => {
observableSubscribed = true;
subscriber.next(1);
return () => {
@@ -206,11 +206,11 @@ describe('livedata', () => {
};
});
const livedata = LiveData.from(observable, 0);
const mapped = livedata.map(v => v + 1);
const livedata$ = LiveData.from(observable$, 0);
const mapped$ = livedata$.map(v => v + 1);
expect(observableSubscribed).toBe(false);
const subscription = mapped.subscribe(_ => {});
const subscription = mapped$.subscribe(_ => {});
expect(observableSubscribed).toBe(true);
expect(observableClosed).toBe(false);
subscription.unsubscribe();
@@ -219,9 +219,9 @@ describe('livedata', () => {
});
test('interop with rxjs', () => {
const ob = combineLatest([new LiveData(1)]);
const ob$ = combineLatest([new LiveData(1)]);
let value = 0;
ob.subscribe(v => {
ob$.subscribe(v => {
value = v[0];
});
expect(value).toBe(1);
@@ -229,69 +229,69 @@ describe('livedata', () => {
test('flat', () => {
{
const wrapped = new LiveData(new LiveData(0));
const flatten = wrapped.flat();
expect(flatten.value).toBe(0);
const wrapped$ = new LiveData(new LiveData(0));
const flatten$ = wrapped$.flat();
expect(flatten$.value).toBe(0);
wrapped.next(new LiveData(1));
expect(flatten.value).toBe(1);
wrapped$.next(new LiveData(1));
expect(flatten$.value).toBe(1);
wrapped.next(LiveData.from(of(2, 3), 0));
expect(flatten.value).toBe(3);
wrapped$.next(LiveData.from(of(2, 3), 0));
expect(flatten$.value).toBe(3);
}
{
const wrapped = new LiveData(
const wrapped$ = new LiveData(
new LiveData([
new LiveData(new LiveData(1)),
new LiveData(new LiveData(2)),
])
);
const flatten = wrapped.flat();
expect(flatten.value).toStrictEqual([1, 2]);
const flatten$ = wrapped$.flat();
expect(flatten$.value).toStrictEqual([1, 2]);
}
{
const wrapped = new LiveData([new LiveData(0), new LiveData(1)]);
const flatten = wrapped.flat();
const wrapped$ = new LiveData([new LiveData(0), new LiveData(1)]);
const flatten$ = wrapped$.flat();
expect(flatten.value).toEqual([0, 1]);
expect(flatten$.value).toEqual([0, 1]);
const inner = new LiveData(2);
wrapped.next([inner, new LiveData(3)]);
expect(flatten.value).toEqual([2, 3]);
inner.next(4);
expect(flatten.value).toEqual([4, 3]);
const inner$ = new LiveData(2);
wrapped$.next([inner$, new LiveData(3)]);
expect(flatten$.value).toEqual([2, 3]);
inner$.next(4);
expect(flatten$.value).toEqual([4, 3]);
}
});
test('computed', () => {
{
const a = new LiveData(1);
const b = LiveData.computed(get => get(a) + 1);
expect(b.value).toBe(2);
const a$ = new LiveData(1);
const b$ = LiveData.computed(get => get(a$) + 1);
expect(b$.value).toBe(2);
}
{
const a = new LiveData('v1');
const v1 = new LiveData(100);
const v2 = new LiveData(200);
const a$ = new LiveData('v1');
const v1$ = new LiveData(100);
const v2$ = new LiveData(200);
const v = LiveData.computed(get => {
return get(a) === 'v1' ? get(v1) : get(v2);
const v$ = LiveData.computed(get => {
return get(a$) === 'v1' ? get(v1$) : get(v2$);
});
expect(v.value).toBe(100);
expect(v$.value).toBe(100);
a.next('v2');
expect(v.value).toBe(200);
a$.next('v2');
expect(v$.value).toBe(200);
}
{
let watched = false;
let count = 0;
let subscriber: Subscriber<number> = null!;
const a = LiveData.from<number>(
const a$ = LiveData.from<number>(
new Observable(sub => {
count++;
watched = true;
@@ -303,16 +303,16 @@ describe('livedata', () => {
}),
0
);
const b = LiveData.computed(get => get(a) + 1);
const b$ = LiveData.computed(get => get(a$) + 1);
expect(watched).toBe(false);
expect(count).toBe(0);
const subscription = b.subscribe(_ => {});
const subscription = b$.subscribe(_ => {});
expect(watched).toBe(true);
expect(count).toBe(1);
subscriber.next(2);
expect(b.value).toBe(3);
expect(b$.value).toBe(3);
subscription.unsubscribe();
expect(watched).toBe(false);
@@ -320,11 +320,11 @@ describe('livedata', () => {
}
{
let c = null! as LiveData<number>;
const b = LiveData.computed(get => get(c) + 1);
c = LiveData.computed(get => get(b) + 1);
let c$ = null! as LiveData<number>;
const b$ = LiveData.computed(get => get(c$) + 1);
c$ = LiveData.computed(get => get(b$) + 1);
expect(() => b.value).toThrowError(PoisonedError);
expect(() => b$.value).toThrowError(PoisonedError);
}
});
});

View File

@@ -10,11 +10,11 @@ import { LiveData, useLiveData } from '..';
describe('livedata', () => {
test('react', () => {
const livedata = new LiveData(0);
const livedata$ = new LiveData(0);
const Component = () => {
const renderCount = useRef(0);
renderCount.current++;
const value = useLiveData(livedata);
const value = useLiveData(livedata$);
return (
<main>
{renderCount.current}:{value}
@@ -23,7 +23,7 @@ describe('livedata', () => {
};
const { rerender } = render(<Component />);
expect(screen.getByRole('main').innerText).toBe('1:0');
livedata.next(1);
livedata$.next(1);
rerender(<Component />);
expect(screen.getByRole('main').innerText).toBe('3:1');
});
@@ -31,7 +31,7 @@ describe('livedata', () => {
test('lifecycle', async () => {
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable<number>(subscriber => {
const observable$ = new Observable<number>(subscriber => {
observableSubscribed = true;
subscriber.next(1);
console.log(1);
@@ -40,9 +40,9 @@ describe('livedata', () => {
};
});
const livedata = LiveData.from(observable, 0);
const livedata$ = LiveData.from(observable$, 0);
const Component1 = () => {
const value = useLiveData(livedata);
const value = useLiveData(livedata$);
return <main>{value}</main>;
};

View File

@@ -14,6 +14,7 @@ import {
skip,
type Subscription,
switchMap,
type TeardownLogic,
} from 'rxjs';
import { BehaviorSubject, Subject } from 'rxjs';
@@ -78,20 +79,23 @@ const logger = new DebugLogger('livedata');
* @see {@link https://rxjs.dev/api/index/class/BehaviorSubject}
* @see {@link https://developer.android.com/topic/libraries/architecture/livedata}
*/
export class LiveData<T = unknown> implements InteropObservable<T> {
export class LiveData<T = unknown>
extends Observable<T>
implements InteropObservable<T>
{
static from<T>(
upstream:
upstream$:
| Observable<T>
| InteropObservable<T>
| ((stream: Observable<LiveDataOperation>) => Observable<T>),
initialValue: T
): LiveData<T> {
const data = new LiveData(
const data$ = new LiveData(
initialValue,
typeof upstream === 'function'
? upstream
: stream =>
stream.pipe(
typeof upstream$ === 'function'
? upstream$
: stream$ =>
stream$.pipe(
filter(
(op): op is Exclude<LiveDataOperation, 'set'> => op !== 'set'
),
@@ -121,7 +125,7 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
distinctUntilChanged(),
switchMap(op => {
if (op === 'watch') {
return upstream;
return upstream$;
} else {
return EMPTY;
}
@@ -129,7 +133,7 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
)
);
return data;
return data$;
}
private static GLOBAL_COMPUTED_RECURSIVE_COUNT = 0;
@@ -155,11 +159,11 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
new Observable(subscribe => {
const execute = (next: () => void) => {
const subscriptions: Subscription[] = [];
const getfn = <L>(data: LiveData<L>) => {
const getfn = <L>(data$: LiveData<L>) => {
let value = null as L;
let first = true;
subscriptions.push(
data.subscribe({
data$.subscribe({
error(err) {
subscribe.error(err);
},
@@ -212,8 +216,8 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
);
}
private readonly raw: BehaviorSubject<T>;
private readonly ops = new Subject<LiveDataOperation>();
private readonly raw$: BehaviorSubject<T>;
private readonly ops$ = new Subject<LiveDataOperation>();
private readonly upstreamSubscription: Subscription | undefined;
/**
@@ -232,14 +236,15 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
| ((upstream: Observable<LiveDataOperation>) => Observable<T>)
| undefined = undefined
) {
this.raw = new BehaviorSubject(initialValue);
super();
this.raw$ = new BehaviorSubject(initialValue);
if (upstream) {
this.upstreamSubscription = upstream(this.ops).subscribe({
this.upstreamSubscription = upstream(this.ops$).subscribe({
next: v => {
this.raw.next(v);
this.raw$.next(v);
},
complete: () => {
if (!this.raw.closed) {
if (!this.raw$.closed) {
logger.error('livedata upstream unexpected complete');
}
},
@@ -247,7 +252,7 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
logger.error('uncatched error in livedata', err);
this.isPoisoned = true;
this.poisonedError = new PoisonedError(err);
this.raw.error(this.poisonedError);
this.raw$.error(this.poisonedError);
},
});
}
@@ -257,16 +262,16 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops.next('get');
return this.raw.value;
this.ops$.next('get');
return this.raw$.value;
}
setValue(v: T) {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.raw.next(v);
this.ops.next('set');
this.raw$.next(v);
this.ops$.next('set');
}
get value(): T {
@@ -284,66 +289,81 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
this.setValue(v);
}
subscribe(
observer?: Partial<Observer<T>> | ((value: T) => void) | undefined
override subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void)
): Subscription;
override subscribe(
next?: ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription;
override subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
this.ops.next('watch');
const subscription = this.raw.subscribe(observer);
this.ops$.next('watch');
const subscription = this.raw$.subscribe(
observerOrNext as any,
error,
complete
);
subscription.add(() => {
this.ops.next('unwatch');
this.ops$.next('unwatch');
});
return subscription;
}
map<R>(mapper: (v: T) => R) {
const sub = LiveData.from(
map<R>(mapper: (v: T) => R): LiveData<R> {
const sub$ = LiveData.from(
new Observable<R>(subscriber =>
this.subscribe({
next: v => {
subscriber.next(mapper(v));
},
complete: () => {
sub.complete();
sub$.complete();
},
})
),
undefined as R // is safe
);
return sub;
return sub$;
}
// eslint-disable-next-line rxjs/finnish
asObservable(): Observable<T> {
return new Observable<T>(subscriber => {
return this.subscribe(subscriber);
});
}
pipe(): Observable<T>;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
pipe<A, B>(
override pipe(): Observable<T>;
override pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
override pipe<A, B>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>
): Observable<B>;
pipe<A, B, C>(
override pipe<A, B, C>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>
): Observable<C>;
pipe<A, B, C, D>(
override pipe<A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Observable<D>;
pipe<A, B, C, D, E>(
override pipe<A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Observable<E>;
pipe<A, B, C, D, E, F>(
override pipe<A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
@@ -351,23 +371,23 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Observable<F>;
pipe(...args: any[]) {
override pipe(...args: any[]) {
return new Observable(subscriber => {
this.ops.next('watch');
this.ops$.next('watch');
// eslint-disable-next-line prefer-spread
const subscription = this.raw.pipe
.apply(this.raw, args as any)
const subscription = this.raw$.pipe
.apply(this.raw$, args as any)
.subscribe(subscriber);
subscription.add(() => {
this.ops.next('unwatch');
this.ops$.next('unwatch');
});
return subscription;
});
}
complete() {
this.ops.complete();
this.raw.complete();
this.ops$.complete();
this.raw$.complete();
this.upstreamSubscription?.unsubscribe();
}
@@ -411,12 +431,12 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops.next('watch');
const subscription = this.raw
this.ops$.next('watch');
const subscription = this.raw$
.pipe(distinctUntilChanged(), skip(1))
.subscribe(cb);
subscription.add(() => {
this.ops.next('unwatch');
this.ops$.next('unwatch');
});
return () => subscription.unsubscribe();
};
@@ -425,13 +445,17 @@ export class LiveData<T = unknown> implements InteropObservable<T> {
if (this.isPoisoned) {
throw this.poisonedError;
}
this.ops.next('watch');
this.ops$.next('watch');
setImmediate(() => {
this.ops.next('unwatch');
this.ops$.next('unwatch');
});
return this.raw.value;
return this.raw$.value;
};
protected _subscribe(): TeardownLogic {
throw new Error('Method not implemented.');
}
[Symbol.observable || '@@observable']() {
return this;
}

View File

@@ -40,13 +40,13 @@ export function useLiveData<Input extends LiveData<any> | null | undefined>(
/**
* subscribe LiveData and return the value. If the value is nullish, will suspends until the value is not nullish.
*/
export function useEnsureLiveData<T>(liveData: LiveData<T>): NonNullable<T> {
const data = useLiveData(liveData);
export function useEnsureLiveData<T>(liveData$: LiveData<T>): NonNullable<T> {
const data = useLiveData(liveData$);
if (data === null || data === undefined) {
return use(
new Promise((resolve, reject) => {
const subscription = liveData.subscribe({
const subscription = liveData$.subscribe({
next(value) {
if (value === null || value === undefined) {
resolve(value);

View File

@@ -16,7 +16,7 @@ export class PageManager {
) {}
open(pageId: string) {
const pageRecord = this.pageRecordList.record(pageId).value;
const pageRecord = this.pageRecordList.record$(pageId).value;
if (!pageRecord) {
throw new Error('Page record not found');
}

View File

@@ -14,9 +14,9 @@ export class Doc {
return this.record.id;
}
readonly mete = this.record.meta;
readonly mode = this.record.mode;
readonly title = this.record.title;
readonly mete$ = this.record.meta$;
readonly mode$ = this.record.mode$;
readonly title$ = this.record.title$;
setMode(mode: PageMode) {
this.record.setMode(mode);

View File

@@ -11,7 +11,7 @@ export class PageRecordList {
private readonly localState: WorkspaceLocalState
) {}
public readonly records = LiveData.from<PageRecord[]>(
public readonly records$ = LiveData.from<PageRecord[]>(
new Observable<string[]>(subscriber => {
const emit = () => {
subscriber.next(
@@ -35,11 +35,11 @@ export class PageRecordList {
[]
);
public readonly isReady = this.workspace.engine.rootDocState.map(
public readonly isReady$ = this.workspace.engine.rootDocState$.map(
state => !state.syncing
);
public record(id: string) {
return this.records.map(record => record.find(record => record.id === id));
public record$(id: string) {
return this.records$.map(record => record.find(record => record.id === id));
}
}

View File

@@ -13,7 +13,7 @@ export class PageRecord {
private readonly localState: WorkspaceLocalState
) {}
meta = LiveData.from<DocMeta>(
meta$ = LiveData.from<DocMeta>(
new Observable(subscriber => {
const emit = () => {
const meta = this.workspace.docCollection.meta.docMetas.find(
@@ -45,7 +45,7 @@ export class PageRecord {
this.workspace.docCollection.setDocMeta(this.id, meta);
}
mode: LiveData<PageMode> = LiveData.from(
mode$: LiveData<PageMode> = LiveData.from(
this.localState.watch<PageMode>(`page:${this.id}:mode`),
'page'
).map(mode => (mode === 'edgeless' ? 'edgeless' : 'page'));
@@ -55,9 +55,9 @@ export class PageRecord {
}
toggleMode() {
this.setMode(this.mode.value === 'edgeless' ? 'page' : 'edgeless');
return this.mode.value;
this.setMode(this.mode$.value === 'edgeless' ? 'page' : 'edgeless');
return this.mode$.value;
}
title = this.meta.map(meta => meta.title);
title$ = this.meta$.map(meta => meta.title);
}

View File

@@ -40,12 +40,12 @@ export class MemoryMemento implements Memento {
private readonly data = new Map<string, LiveData<any>>();
private getLiveData(key: string): LiveData<any> {
let data = this.data.get(key);
if (!data) {
data = new LiveData<any>(null);
this.data.set(key, data);
let data$ = this.data.get(key);
if (!data$) {
data$ = new LiveData<any>(null);
this.data.set(key, data$);
}
return data;
return data$;
}
get<T>(key: string): T | null {

View File

@@ -14,13 +14,13 @@ describe('Workspace System', () => {
const provider = services.provider();
const workspaceManager = provider.get(WorkspaceManager);
const workspaceListService = provider.get(WorkspaceListService);
expect(workspaceListService.workspaceList.value.length).toBe(0);
expect(workspaceListService.workspaceList$.value.length).toBe(0);
const { workspace } = workspaceManager.open(
await workspaceManager.createWorkspace(WorkspaceFlavour.LOCAL)
);
expect(workspaceListService.workspaceList.value.length).toBe(1);
expect(workspaceListService.workspaceList$.value.length).toBe(1);
const page = workspace.docCollection.createDoc({
id: 'page0',

View File

@@ -32,10 +32,10 @@ export class DocEngine {
storage: DocStorageInner;
engineState = LiveData.computed(get => {
const localState = get(this.localPart.engineState);
engineState$ = LiveData.computed(get => {
const localState = get(this.localPart.engineState$);
if (this.remotePart) {
const remoteState = get(this.remotePart?.engineState);
const remoteState = get(this.remotePart?.engineState$);
return {
total: remoteState.total,
syncing: remoteState.syncing,
@@ -53,12 +53,12 @@ export class DocEngine {
};
});
docState(docId: string) {
const localState = this.localPart.docState(docId);
const remoteState = this.remotePart?.docState(docId);
docState$(docId: string) {
const localState$ = this.localPart.docState$(docId);
const remoteState$ = this.remotePart?.docState$(docId);
return LiveData.computed(get => {
const local = get(localState);
const remote = remoteState ? get(remoteState) : null;
const local = get(localState$);
const remote = remoteState$ ? get(remoteState$) : null;
return {
ready: local.ready,
saving: local.syncing,
@@ -134,7 +134,7 @@ export class DocEngine {
*/
waitForSaved() {
return new Promise<void>(resolve => {
this.engineState
this.engineState$
.pipe(map(state => state.saving === 0))
.subscribe(saved => {
if (saved) {
@@ -150,7 +150,7 @@ export class DocEngine {
*/
waitForSynced() {
return new Promise<void>(resolve => {
this.engineState
this.engineState$
.pipe(map(state => state.syncing === 0 && state.saving === 0))
.subscribe(synced => {
if (synced) {
@@ -175,7 +175,7 @@ export class DocEngine {
*/
waitForReady(docId: string) {
return new Promise<void>(resolve => {
this.docState(docId)
this.docState$(docId)
.pipe(map(state => state.ready))
.subscribe(ready => {
if (ready) {

View File

@@ -48,7 +48,7 @@ export interface LocalDocState {
*/
export class DocEngineLocalPart {
private readonly prioritySettings = new Map<string, number>();
private readonly statusUpdatedSubject = new Subject<string>();
private readonly statusUpdatedSubject$ = new Subject<string>();
private readonly status = {
docs: new Map<string, YDoc>(),
@@ -59,7 +59,7 @@ export class DocEngineLocalPart {
currentJob: null as { docId: string; jobs: Job[] } | null,
};
engineState = LiveData.from<LocalEngineState>(
engineState$ = LiveData.from<LocalEngineState>(
new Observable(subscribe => {
const next = () => {
subscribe.next({
@@ -68,14 +68,14 @@ export class DocEngineLocalPart {
});
};
next();
return this.statusUpdatedSubject.subscribe(() => {
return this.statusUpdatedSubject$.subscribe(() => {
next();
});
}),
{ syncing: 0, total: 0 }
);
docState(docId: string) {
docState$(docId: string) {
return LiveData.from<LocalDocState>(
new Observable(subscribe => {
const next = () => {
@@ -87,7 +87,7 @@ export class DocEngineLocalPart {
});
};
next();
return this.statusUpdatedSubject.subscribe(updatedId => {
return this.statusUpdatedSubject$.subscribe(updatedId => {
if (updatedId === docId) next();
});
}),
@@ -120,7 +120,7 @@ export class DocEngineLocalPart {
}
this.status.currentJob = { docId, jobs };
this.statusUpdatedSubject.next(docId);
this.statusUpdatedSubject$.next(docId);
const { apply, load, save } = groupBy(jobs, job => job.type) as {
[key in Job['type']]?: Job[];
@@ -139,7 +139,7 @@ export class DocEngineLocalPart {
}
this.status.currentJob = null;
this.statusUpdatedSubject.next(docId);
this.statusUpdatedSubject$.next(docId);
}
} finally {
dispose();
@@ -161,7 +161,7 @@ export class DocEngineLocalPart {
});
this.status.docs.set(doc.guid, doc);
this.statusUpdatedSubject.next(doc.guid);
this.statusUpdatedSubject$.next(doc.guid);
},
};
@@ -186,7 +186,7 @@ export class DocEngineLocalPart {
doc.on('update', this.handleDocUpdate);
this.status.connectedDocs.add(job.docId);
this.statusUpdatedSubject.next(job.docId);
this.statusUpdatedSubject$.next(job.docId);
const docData = await this.storage.loadDocFromLocal(job.docId, signal);
@@ -196,7 +196,7 @@ export class DocEngineLocalPart {
this.applyUpdate(job.docId, docData);
this.status.readyDocs.add(job.docId);
this.statusUpdatedSubject.next(job.docId);
this.statusUpdatedSubject$.next(job.docId);
},
save: async (
docId: string,
@@ -292,7 +292,7 @@ export class DocEngineLocalPart {
const existingJobs = this.status.jobMap.get(job.docId) ?? [];
existingJobs.push(job);
this.status.jobMap.set(job.docId, existingJobs);
this.statusUpdatedSubject.next(job.docId);
this.statusUpdatedSubject$.next(job.docId);
}
setPriority(docId: string, priority: number) {

View File

@@ -81,9 +81,9 @@ export class DocEngineRemotePart {
retrying: false,
errorMessage: null,
};
private readonly statusUpdatedSubject = new Subject<string | true>();
private readonly statusUpdatedSubject$ = new Subject<string | true>();
engineState = LiveData.from<RemoteEngineState>(
engineState$ = LiveData.from<RemoteEngineState>(
new Observable(subscribe => {
const next = () => {
if (!this.status.syncing) {
@@ -103,7 +103,7 @@ export class DocEngineRemotePart {
});
};
next();
return this.statusUpdatedSubject.subscribe(() => {
return this.statusUpdatedSubject$.subscribe(() => {
next();
});
}),
@@ -115,7 +115,7 @@ export class DocEngineRemotePart {
}
);
docState(docId: string) {
docState$(docId: string) {
return LiveData.from<RemoteDocState>(
new Observable(subscribe => {
const next = () => {
@@ -126,7 +126,7 @@ export class DocEngineRemotePart {
});
};
next();
return this.statusUpdatedSubject.subscribe(updatedId => {
return this.statusUpdatedSubject$.subscribe(updatedId => {
if (updatedId === true || updatedId === docId) next();
});
}),
@@ -152,7 +152,7 @@ export class DocEngineRemotePart {
}
this.status.connectedDocs.add(docId);
this.statusUpdatedSubject.next(docId);
this.statusUpdatedSubject$.next(docId);
},
push: async (
docId: string,
@@ -321,7 +321,7 @@ export class DocEngineRemotePart {
addDoc: (docId: string) => {
if (!this.status.docs.has(docId)) {
this.status.docs.add(docId);
this.statusUpdatedSubject.next(docId);
this.statusUpdatedSubject$.next(docId);
this.schedule({
type: 'connect',
docId,
@@ -359,7 +359,7 @@ export class DocEngineRemotePart {
logger.error('Remote sync error, retry in 5s', err);
this.status.errorMessage =
err instanceof Error ? err.message : `${err}`;
this.statusUpdatedSubject.next(true);
this.statusUpdatedSubject$.next(true);
} finally {
this.status = {
docs: this.status.docs,
@@ -371,7 +371,7 @@ export class DocEngineRemotePart {
retrying: true,
errorMessage: this.status.errorMessage,
};
this.statusUpdatedSubject.next(true);
this.statusUpdatedSubject$.next(true);
}
await Promise.race([
new Promise<void>(resolve => {
@@ -420,7 +420,7 @@ export class DocEngineRemotePart {
logger.info('Remote sync started');
this.status.syncing = true;
this.statusUpdatedSubject.next(true);
this.statusUpdatedSubject$.next(true);
this.server.onInterrupted(reason => {
abort.abort(reason);
@@ -471,7 +471,7 @@ export class DocEngineRemotePart {
const jobs = this.status.jobMap.get(docId);
if (!jobs || jobs.length === 0) {
this.status.jobMap.delete(docId);
this.statusUpdatedSubject.next(docId);
this.statusUpdatedSubject$.next(docId);
break;
}
@@ -535,7 +535,7 @@ export class DocEngineRemotePart {
const existingJobs = this.status.jobMap.get(job.docId) ?? [];
existingJobs.push(job);
this.status.jobMap.set(job.docId, existingJobs);
this.statusUpdatedSubject.next(job.docId);
this.statusUpdatedSubject$.next(job.docId);
}
setPriority(docId: string, priority: number) {

View File

@@ -52,7 +52,7 @@ export class WorkspaceEngine {
}
canGracefulStop() {
return this.doc.engineState.value.saving === 0;
return this.doc.engineState$.value.saving === 0;
}
async waitForGracefulStop(abort?: AbortSignal) {
@@ -67,9 +67,9 @@ export class WorkspaceEngine {
this.blob.stop();
}
docEngineState = this.doc.engineState;
docEngineState$ = this.doc.engineState$;
rootDocState = this.doc.docState(this.yDoc.guid);
rootDocState$ = this.doc.docState$(this.yDoc.guid);
waitForSynced() {
return this.doc.waitForSynced();

View File

@@ -86,18 +86,18 @@ export class WorkspaceListService {
WorkspaceInformation
>();
status = new LiveData<WorkspaceListStatus>({
status$ = new LiveData<WorkspaceListStatus>({
loading: true,
workspaceList: [],
});
setStatus(status: WorkspaceListStatus) {
this.status.next(status);
this.status$.next(status);
// update cache
writeWorkspaceListCache(this.cache, status.workspaceList);
}
workspaceList = this.status.map(x => x.workspaceList);
workspaceList$ = this.status$.map(x => x.workspaceList);
constructor(
private readonly providers: WorkspaceListProvider[],
@@ -106,8 +106,8 @@ export class WorkspaceListService {
// initialize workspace list from cache
const cached = readWorkspaceListCache(cache);
const workspaceList = cached;
this.status.next({
...this.status.value,
this.status$.next({
...this.status$.value,
workspaceList,
});
@@ -134,7 +134,7 @@ export class WorkspaceListService {
}
const metadata = await provider.create(initial);
// update workspace list
this.setStatus(this.addWorkspace(this.status.value, metadata));
this.setStatus(this.addWorkspace(this.status$.value, metadata));
return metadata;
}
@@ -157,7 +157,7 @@ export class WorkspaceListService {
await provider.delete(workspaceMetadata.id);
// delete workspace from list
this.setStatus(this.deleteWorkspace(this.status.value, workspaceMetadata));
this.setStatus(this.deleteWorkspace(this.status$.value, workspaceMetadata));
}
/**
@@ -201,7 +201,7 @@ export class WorkspaceListService {
added?: WorkspaceMetadata[];
deleted?: WorkspaceMetadata[];
}) {
let status = this.status.value;
let status = this.status$.value;
for (const added of changed.added ?? []) {
status = this.addWorkspace(status, added);
@@ -239,7 +239,7 @@ export class WorkspaceListService {
})
.finally(() => {
this.setStatus({
...this.status.value,
...this.status$.value,
loading: false,
});
});
@@ -250,7 +250,7 @@ export class WorkspaceListService {
this.providers.map(async provider => {
try {
const list = await provider.getList();
const oldList = this.workspaceList.value.filter(
const oldList = this.workspaceList$.value.filter(
w => w.flavour === provider.name
);
this.handleWorkspaceChange({