mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-12 12:28:42 +00:00
feat(core): pdf preview (#8569)
Co-authored-by: forehalo <forehalo@gmail.com>
This commit is contained in:
@@ -39,7 +39,7 @@ consumer.register('subscribeStatus', (id: number) => {
|
||||
|
||||
// subscribe
|
||||
const client: OpClient<Ops>;
|
||||
client.subscribe('subscribeStatus', 123, {
|
||||
client.ob$('subscribeStatus', 123).subscribe({
|
||||
next: status => {
|
||||
ui.setServerStatus(status);
|
||||
},
|
||||
|
||||
@@ -116,7 +116,7 @@ describe('op client', () => {
|
||||
|
||||
// @ts-expect-error internal api
|
||||
const subscriptions = ctx.producer.obs;
|
||||
ctx.producer.subscribe('sub', new Uint8Array([1, 2, 3]), ob);
|
||||
ctx.producer.ob$('sub', new Uint8Array([1, 2, 3])).subscribe(ob);
|
||||
|
||||
expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(`
|
||||
{
|
||||
@@ -160,7 +160,7 @@ describe('op client', () => {
|
||||
error: vi.fn(),
|
||||
complete: vi.fn(),
|
||||
};
|
||||
ctx.producer.subscribe('sub', new Uint8Array([1, 2, 3]), ob);
|
||||
ctx.producer.ob$('sub', new Uint8Array([1, 2, 3])).subscribe(ob);
|
||||
|
||||
expect(subscriptions.has('sub:2')).toBe(true);
|
||||
|
||||
@@ -179,29 +179,23 @@ describe('op client', () => {
|
||||
|
||||
it('should transfer transferables with subscribe op', async ctx => {
|
||||
const data = new Uint8Array([1, 2, 3]);
|
||||
const unsubscribe = ctx.producer.subscribe(
|
||||
'bin',
|
||||
transfer(data, [data.buffer]),
|
||||
{
|
||||
const sub = ctx.producer
|
||||
.ob$('bin', transfer(data, [data.buffer]))
|
||||
.subscribe({
|
||||
next: vi.fn(),
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
expect(data.byteLength).toBe(0);
|
||||
|
||||
unsubscribe();
|
||||
sub.unsubscribe();
|
||||
});
|
||||
|
||||
it('should unsubscribe subscription op', ctx => {
|
||||
const unsubscribe = ctx.producer.subscribe(
|
||||
'sub',
|
||||
new Uint8Array([1, 2, 3]),
|
||||
{
|
||||
next: vi.fn(),
|
||||
}
|
||||
);
|
||||
const sub = ctx.producer.ob$('sub', new Uint8Array([1, 2, 3])).subscribe({
|
||||
next: vi.fn(),
|
||||
});
|
||||
|
||||
unsubscribe();
|
||||
sub.unsubscribe();
|
||||
|
||||
expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(`
|
||||
[
|
||||
|
||||
@@ -22,7 +22,7 @@ interface PendingCall extends PromiseWithResolvers<any> {
|
||||
timeout: number | NodeJS.Timeout;
|
||||
}
|
||||
|
||||
interface OpClientOptions {
|
||||
export interface OpClientOptions {
|
||||
timeout?: number;
|
||||
}
|
||||
|
||||
@@ -155,15 +155,11 @@ export class OpClient<Ops extends OpSchema> extends AutoMessageHandler {
|
||||
return promise;
|
||||
}
|
||||
|
||||
subscribe<Op extends OpNames<Ops>, Out extends OpOutput<Ops, Op>>(
|
||||
ob$<Op extends OpNames<Ops>, Out extends OpOutput<Ops, Op>>(
|
||||
op: Op,
|
||||
...args: [
|
||||
...OpInput<Ops, Op>,
|
||||
Partial<Observer<Out>> | ((value: Out) => void),
|
||||
]
|
||||
): () => void {
|
||||
...args: OpInput<Ops, Op>
|
||||
): Observable<Out> {
|
||||
const payload = args[0];
|
||||
const observer = args[1] as Partial<Observer<Out>> | ((value: Out) => void);
|
||||
|
||||
const msg = {
|
||||
type: 'subscribe',
|
||||
@@ -172,24 +168,23 @@ export class OpClient<Ops extends OpSchema> extends AutoMessageHandler {
|
||||
payload,
|
||||
} satisfies SubscribeMessage;
|
||||
|
||||
const sub = new Observable<Out>(ob => {
|
||||
const sub$ = new Observable<Out>(ob => {
|
||||
this.obs.set(msg.id, ob);
|
||||
}).subscribe(observer);
|
||||
|
||||
sub.add(() => {
|
||||
this.obs.delete(msg.id);
|
||||
this.port.postMessage({
|
||||
type: 'unsubscribe',
|
||||
id: msg.id,
|
||||
} satisfies UnsubscribeMessage);
|
||||
return () => {
|
||||
ob.complete();
|
||||
this.obs.delete(msg.id);
|
||||
this.port.postMessage({
|
||||
type: 'unsubscribe',
|
||||
id: msg.id,
|
||||
} satisfies UnsubscribeMessage);
|
||||
};
|
||||
});
|
||||
|
||||
const transferables = fetchTransferables(payload);
|
||||
this.port.postMessage(msg, { transfer: transferables });
|
||||
|
||||
return () => {
|
||||
sub.unsubscribe();
|
||||
};
|
||||
return sub$;
|
||||
}
|
||||
|
||||
destroy() {
|
||||
|
||||
@@ -1,14 +1,5 @@
|
||||
import EventEmitter2 from 'eventemitter2';
|
||||
import {
|
||||
defer,
|
||||
from,
|
||||
fromEvent,
|
||||
Observable,
|
||||
of,
|
||||
share,
|
||||
take,
|
||||
takeUntil,
|
||||
} from 'rxjs';
|
||||
import { defer, from, fromEvent, Observable, of, take, takeUntil } from 'rxjs';
|
||||
|
||||
import {
|
||||
AutoMessageHandler,
|
||||
@@ -172,7 +163,7 @@ export class OpConsumer<Ops extends OpSchema> extends AutoMessageHandler {
|
||||
ob$ = of(ret$);
|
||||
}
|
||||
|
||||
return ob$.pipe(share(), takeUntil(fromEvent(signal, 'abort')));
|
||||
return ob$.pipe(takeUntil(fromEvent(signal, 'abort')));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -95,6 +95,7 @@ export type MessageCommunicapable = Pick<
|
||||
> & {
|
||||
start?(): void;
|
||||
close?(): void;
|
||||
terminate?(): void; // For Worker
|
||||
};
|
||||
|
||||
export function ignoreUnknownEvent(handler: (data: Messages) => void) {
|
||||
@@ -130,6 +131,7 @@ export function fetchTransferables(data: any): Transferable[] | undefined {
|
||||
}
|
||||
|
||||
export abstract class AutoMessageHandler {
|
||||
private listening = false;
|
||||
protected abstract handlers: Partial<MessageHandlers>;
|
||||
|
||||
constructor(protected readonly port: MessageCommunicapable) {}
|
||||
@@ -144,12 +146,21 @@ export abstract class AutoMessageHandler {
|
||||
});
|
||||
|
||||
listen() {
|
||||
if (this.listening) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.port.addEventListener('message', this.handleMessage);
|
||||
this.port.addEventListener('messageerror', console.error);
|
||||
this.port.start?.();
|
||||
this.listening = true;
|
||||
}
|
||||
|
||||
close() {
|
||||
this.port.close?.();
|
||||
this.port.terminate?.(); // For Worker
|
||||
this.port.removeEventListener('message', this.handleMessage);
|
||||
this.port.removeEventListener('messageerror', console.error);
|
||||
this.listening = false;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user