mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-12 12:28:42 +00:00
feat(nbstore): add doc sync state (#9131)
This commit is contained in:
@@ -1,12 +1,13 @@
|
||||
import EventEmitter2 from 'eventemitter2';
|
||||
|
||||
import type { ConnectionStatus } from '../connection';
|
||||
import type { AwarenessStorage } from './awareness';
|
||||
import type { BlobStorage } from './blob';
|
||||
import type { DocStorage } from './doc';
|
||||
import type { Storage, StorageType } from './storage';
|
||||
import type { SyncStorage } from './sync';
|
||||
|
||||
type Storages = DocStorage | BlobStorage | SyncStorage;
|
||||
type Storages = DocStorage | BlobStorage | SyncStorage | AwarenessStorage;
|
||||
|
||||
export class SpaceStorage {
|
||||
protected readonly storages: Map<StorageType, Storage> = new Map();
|
||||
|
||||
@@ -1,16 +1,55 @@
|
||||
import type { Observable } from 'rxjs';
|
||||
import { combineLatest, map } from 'rxjs';
|
||||
|
||||
import type { DocStorage, SyncStorage } from '../../storage';
|
||||
import { DocSyncPeer } from './peer';
|
||||
|
||||
export interface DocSyncState {
|
||||
total: number;
|
||||
syncing: number;
|
||||
retrying: boolean;
|
||||
errorMessage: string | null;
|
||||
}
|
||||
|
||||
export interface DocSyncDocState {
|
||||
syncing: boolean;
|
||||
retrying: boolean;
|
||||
errorMessage: string | null;
|
||||
}
|
||||
|
||||
export class DocSync {
|
||||
private readonly peers: DocSyncPeer[];
|
||||
private readonly peers: DocSyncPeer[] = this.remotes.map(
|
||||
remote => new DocSyncPeer(this.local, this.sync, remote)
|
||||
);
|
||||
private abort: AbortController | null = null;
|
||||
|
||||
readonly state$: Observable<DocSyncState> = combineLatest(
|
||||
this.peers.map(peer => peer.peerState$)
|
||||
).pipe(
|
||||
map(allPeers => ({
|
||||
total: allPeers.reduce((acc, peer) => acc + peer.total, 0),
|
||||
syncing: allPeers.reduce((acc, peer) => acc + peer.syncing, 0),
|
||||
retrying: allPeers.some(peer => peer.retrying),
|
||||
errorMessage:
|
||||
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
|
||||
}))
|
||||
);
|
||||
|
||||
constructor(
|
||||
readonly local: DocStorage,
|
||||
readonly sync: SyncStorage,
|
||||
readonly remotes: DocStorage[]
|
||||
) {
|
||||
this.peers = remotes.map(remote => new DocSyncPeer(local, sync, remote));
|
||||
) {}
|
||||
|
||||
docState$(docId: string): Observable<DocSyncDocState> {
|
||||
return combineLatest(this.peers.map(peer => peer.docState$(docId))).pipe(
|
||||
map(allPeers => ({
|
||||
errorMessage:
|
||||
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
|
||||
retrying: allPeers.some(peer => peer.retrying),
|
||||
syncing: allPeers.some(peer => peer.syncing),
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
start() {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { remove } from 'lodash-es';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { Subject } from 'rxjs';
|
||||
import { Observable, Subject } from 'rxjs';
|
||||
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
|
||||
|
||||
import type { DocStorage, SyncStorage } from '../../storage';
|
||||
@@ -46,6 +46,19 @@ interface Status {
|
||||
errorMessage: string | null;
|
||||
}
|
||||
|
||||
interface PeerState {
|
||||
total: number;
|
||||
syncing: number;
|
||||
retrying: boolean;
|
||||
errorMessage: string | null;
|
||||
}
|
||||
|
||||
interface PeerDocState {
|
||||
syncing: boolean;
|
||||
retrying: boolean;
|
||||
errorMessage: string | null;
|
||||
}
|
||||
|
||||
interface DocSyncPeerOptions {
|
||||
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
|
||||
}
|
||||
@@ -101,6 +114,50 @@ export class DocSyncPeer {
|
||||
};
|
||||
private readonly statusUpdatedSubject$ = new Subject<string | true>();
|
||||
|
||||
peerState$ = new Observable<PeerState>(subscribe => {
|
||||
const next = () => {
|
||||
if (!this.status.syncing) {
|
||||
// if syncing = false, jobMap is empty
|
||||
subscribe.next({
|
||||
total: this.status.docs.size,
|
||||
syncing: this.status.docs.size,
|
||||
retrying: this.status.retrying,
|
||||
errorMessage: this.status.errorMessage,
|
||||
});
|
||||
} else {
|
||||
const syncing = this.status.jobMap.size;
|
||||
subscribe.next({
|
||||
total: this.status.docs.size,
|
||||
syncing: syncing,
|
||||
retrying: this.status.retrying,
|
||||
errorMessage: this.status.errorMessage,
|
||||
});
|
||||
}
|
||||
};
|
||||
next();
|
||||
return this.statusUpdatedSubject$.subscribe(() => {
|
||||
next();
|
||||
});
|
||||
});
|
||||
|
||||
docState$(docId: string) {
|
||||
return new Observable<PeerDocState>(subscribe => {
|
||||
const next = () => {
|
||||
subscribe.next({
|
||||
syncing:
|
||||
!this.status.connectedDocs.has(docId) ||
|
||||
this.status.jobMap.has(docId),
|
||||
retrying: this.status.retrying,
|
||||
errorMessage: this.status.errorMessage,
|
||||
});
|
||||
};
|
||||
next();
|
||||
return this.statusUpdatedSubject$.subscribe(updatedId => {
|
||||
if (updatedId === true || updatedId === docId) next();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private readonly jobs = createJobErrorCatcher({
|
||||
connect: async (docId: string, signal?: AbortSignal) => {
|
||||
const pushedClock =
|
||||
|
||||
@@ -1,10 +1,21 @@
|
||||
import { combineLatest, map, type Observable, of } from 'rxjs';
|
||||
|
||||
import type { BlobStorage, DocStorage, SpaceStorage } from '../storage';
|
||||
import type { AwarenessStorage } from '../storage/awareness';
|
||||
import { AwarenessSync } from './awareness';
|
||||
import { BlobSync } from './blob';
|
||||
import { DocSync } from './doc';
|
||||
import { DocSync, type DocSyncState } from './doc';
|
||||
|
||||
export interface SyncState {
|
||||
doc?: DocSyncState;
|
||||
}
|
||||
|
||||
export class Sync {
|
||||
private readonly doc: DocSync | null;
|
||||
private readonly blob: BlobSync | null;
|
||||
readonly doc: DocSync | null;
|
||||
readonly blob: BlobSync | null;
|
||||
readonly awareness: AwarenessSync | null;
|
||||
|
||||
readonly state$: Observable<SyncState>;
|
||||
|
||||
constructor(
|
||||
readonly local: SpaceStorage,
|
||||
@@ -13,6 +24,7 @@ export class Sync {
|
||||
const doc = local.tryGet('doc');
|
||||
const blob = local.tryGet('blob');
|
||||
const sync = local.tryGet('sync');
|
||||
const awareness = local.tryGet('awareness');
|
||||
|
||||
this.doc =
|
||||
doc && sync
|
||||
@@ -32,6 +44,18 @@ export class Sync {
|
||||
.filter((v): v is BlobStorage => !!v)
|
||||
)
|
||||
: null;
|
||||
this.awareness = awareness
|
||||
? new AwarenessSync(
|
||||
awareness,
|
||||
peers
|
||||
.map(peer => peer.tryGet('awareness'))
|
||||
.filter((v): v is AwarenessStorage => !!v)
|
||||
)
|
||||
: null;
|
||||
|
||||
this.state$ = combineLatest([this.doc?.state$ ?? of(undefined)]).pipe(
|
||||
map(([doc]) => ({ doc }))
|
||||
);
|
||||
}
|
||||
|
||||
start() {
|
||||
|
||||
Reference in New Issue
Block a user