feat(editor): replace slot with rxjs subject (#10768)

This commit is contained in:
Mirone
2025-03-12 11:29:24 +09:00
committed by GitHub
parent 19f978d9aa
commit cd63e0ed8b
302 changed files with 1405 additions and 1251 deletions

View File

@@ -1,5 +1,5 @@
import { Slot } from '@blocksuite/global/slot';
import type { Logger } from '@blocksuite/global/utils';
import { Subject } from 'rxjs';
import type { Doc } from 'yjs';
import { SharedPriorityTarget } from '../utils/async-queue.js';
@@ -49,7 +49,7 @@ export class DocEngine {
private _status: DocEngineStatus;
readonly onStatusChange = new Slot<DocEngineStatus>();
readonly onStatusChange = new Subject<DocEngineStatus>();
readonly priorityTarget = new SharedPriorityTarget();
@@ -79,7 +79,7 @@ export class DocEngine {
private setStatus(s: DocEngineStatus) {
this.logger.debug(`syne-engine:${this.rootDocId} status change`, s);
this._status = s;
this.onStatusChange.emit(s);
this.onStatusChange.next(s);
}
canGracefulStop() {
@@ -133,10 +133,10 @@ export class DocEngine {
);
cleanUp.push(
state.mainPeer.onStatusChange.on(() => {
state.mainPeer.onStatusChange.subscribe(() => {
if (!signal.aborted)
this.updateSyncingState(state.mainPeer, state.shadowPeers);
}).dispose
}).unsubscribe
);
this.updateSyncingState(state.mainPeer, state.shadowPeers);
@@ -153,10 +153,10 @@ export class DocEngine {
this.logger
);
cleanUp.push(
peer.onStatusChange.on(() => {
peer.onStatusChange.subscribe(() => {
if (!signal.aborted)
this.updateSyncingState(state.mainPeer, state.shadowPeers);
}).dispose
}).unsubscribe
);
return peer;
});
@@ -221,7 +221,7 @@ export class DocEngine {
});
}),
new Promise<void>(resolve => {
this.onStatusChange.on(() => {
this.onStatusChange.subscribe(() => {
if (this.canGracefulStop()) {
resolve();
}
@@ -243,7 +243,7 @@ export class DocEngine {
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
this.onStatusChange.subscribe(status => {
if (isLoadedRootDoc(status)) {
resolve();
}
@@ -267,7 +267,7 @@ export class DocEngine {
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
this.onStatusChange.subscribe(status => {
if (status.step === DocEngineStep.Synced) {
resolve();
}

View File

@@ -1,6 +1,6 @@
import { Slot } from '@blocksuite/global/slot';
import type { Logger } from '@blocksuite/global/utils';
import isEqual from 'lodash-es/isEqual';
import { Subject } from 'rxjs';
import type { Doc } from 'yjs';
import {
applyUpdate,
@@ -110,7 +110,7 @@ export class SyncPeer {
this.updateSyncStatus();
};
readonly onStatusChange = new Slot<DocPeerStatus>();
readonly onStatusChange = new Subject<DocPeerStatus>();
readonly state: {
connectedDocs: Map<string, Doc>;
@@ -142,7 +142,7 @@ export class SyncPeer {
if (!isEqual(s, this._status)) {
this.logger.debug(`doc-peer:${this.name} status change`, s);
this._status = s;
this.onStatusChange.emit(s);
this.onStatusChange.next(s);
}
}
@@ -402,50 +402,50 @@ export class SyncPeer {
}
async waitForLoaded(abort?: AbortSignal) {
if (this.status.step > DocPeerStep.Loaded) {
if (this.status.step >= DocPeerStep.LoadingSubDoc) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step > DocPeerStep.Loaded) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
await Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.subscribe(status => {
if (status.step >= DocPeerStep.LoadingSubDoc) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
throwIfAborted(abort);
}
async waitForSynced(abort?: AbortSignal) {
if (this.status.step >= DocPeerStep.Synced) {
if (this.status.step === DocPeerStep.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step >= DocPeerStep.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
await Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.subscribe(status => {
if (status.step === DocPeerStep.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
throwIfAborted(abort);
}
}