feat(nbstore): improve nbstore (#9512)

This commit is contained in:
EYHN
2025-01-06 09:38:03 +00:00
parent a2563d2180
commit 46c8c4a408
103 changed files with 3337 additions and 3423 deletions

View File

@@ -2,6 +2,7 @@ import type {
AwarenessRecord,
AwarenessStorage,
} from '../../storage/awareness';
import type { PeerStorageOptions } from '../types';
export interface AwarenessSync {
update(record: AwarenessRecord, origin?: string): Promise<void>;
@@ -13,14 +14,13 @@ export interface AwarenessSync {
}
export class AwarenessSyncImpl implements AwarenessSync {
constructor(
readonly local: AwarenessStorage,
readonly remotes: AwarenessStorage[]
) {}
constructor(readonly storages: PeerStorageOptions<AwarenessStorage>) {}
async update(record: AwarenessRecord, origin?: string) {
await Promise.all(
[this.local, ...this.remotes].map(peer => peer.update(record, origin))
[this.storages.local, ...Object.values(this.storages.remotes)].map(peer =>
peer.update(record, origin)
)
);
}
@@ -29,9 +29,10 @@ export class AwarenessSyncImpl implements AwarenessSync {
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => Promise<AwarenessRecord | null>
): () => void {
const unsubscribes = [this.local, ...this.remotes].map(peer =>
peer.subscribeUpdate(id, onUpdate, onCollect)
);
const unsubscribes = [
this.storages.local,
...Object.values(this.storages.remotes),
].map(peer => peer.subscribeUpdate(id, onUpdate, onCollect));
return () => {
unsubscribes.forEach(unsubscribe => unsubscribe());
};

View File

@@ -1,34 +1,48 @@
import EventEmitter2 from 'eventemitter2';
import { difference } from 'lodash-es';
import { BehaviorSubject, type Observable } from 'rxjs';
import type { BlobRecord, BlobStorage } from '../../storage';
import { OverCapacityError } from '../../storage';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
import type { PeerStorageOptions } from '../types';
export interface BlobSyncState {
isStorageOverCapacity: boolean;
}
export interface BlobSync {
readonly state$: Observable<BlobSyncState>;
downloadBlob(
blobId: string,
signal?: AbortSignal
): Promise<BlobRecord | null>;
uploadBlob(blob: BlobRecord, signal?: AbortSignal): Promise<void>;
fullSync(signal?: AbortSignal): Promise<void>;
setMaxBlobSize(size: number): void;
onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void;
}
export class BlobSyncImpl implements BlobSync {
readonly state$ = new BehaviorSubject<BlobSyncState>({
isStorageOverCapacity: false,
});
private abort: AbortController | null = null;
private maxBlobSize: number = 1024 * 1024 * 100; // 100MB
readonly event = new EventEmitter2();
constructor(
readonly local: BlobStorage,
readonly remotes: BlobStorage[]
) {}
constructor(readonly storages: PeerStorageOptions<BlobStorage>) {}
async downloadBlob(blobId: string, signal?: AbortSignal) {
const localBlob = await this.local.get(blobId, signal);
const localBlob = await this.storages.local.get(blobId, signal);
if (localBlob) {
return localBlob;
}
for (const storage of this.remotes) {
for (const storage of Object.values(this.storages.remotes)) {
const data = await storage.get(blobId, signal);
if (data) {
await this.local.set(data, signal);
await this.storages.local.set(data, signal);
return data;
}
}
@@ -36,21 +50,35 @@ export class BlobSyncImpl implements BlobSync {
}
async uploadBlob(blob: BlobRecord, signal?: AbortSignal) {
await this.local.set(blob);
if (blob.data.length > this.maxBlobSize) {
this.event.emit('abort-large-blob', blob.data.length);
console.error('blob over limit, abort set');
}
await this.storages.local.set(blob);
await Promise.allSettled(
this.remotes.map(remote => remote.set(blob, signal))
Object.values(this.storages.remotes).map(async remote => {
try {
return await remote.set(blob, signal);
} catch (err) {
if (err instanceof OverCapacityError) {
this.state$.next({ isStorageOverCapacity: true });
}
throw err;
}
})
);
}
private async sync(signal?: AbortSignal) {
async fullSync(signal?: AbortSignal) {
throwIfAborted(signal);
for (const remote of this.remotes) {
for (const [remotePeer, remote] of Object.entries(this.storages.remotes)) {
let localList: string[] = [];
let remoteList: string[] = [];
try {
localList = (await this.local.list(signal)).map(b => b.key);
localList = (await this.storages.local.list(signal)).map(b => b.key);
throwIfAborted(signal);
remoteList = (await remote.list(signal)).map(b => b.key);
throwIfAborted(signal);
@@ -65,7 +93,7 @@ export class BlobSyncImpl implements BlobSync {
const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.local.get(key, signal);
const data = await this.storages.local.get(key, signal);
throwIfAborted(signal);
if (data) {
await remote.set(data, signal);
@@ -76,7 +104,7 @@ export class BlobSyncImpl implements BlobSync {
throw err;
}
console.error(
`error when sync ${key} from [${this.local.peer}] to [${remote.peer}]`,
`error when sync ${key} from [local] to [${remotePeer}]`,
err
);
}
@@ -89,7 +117,7 @@ export class BlobSyncImpl implements BlobSync {
const data = await remote.get(key, signal);
throwIfAborted(signal);
if (data) {
await this.local.set(data, signal);
await this.storages.local.set(data, signal);
throwIfAborted(signal);
}
} catch (err) {
@@ -97,7 +125,7 @@ export class BlobSyncImpl implements BlobSync {
throw err;
}
console.error(
`error when sync ${key} from [${remote.peer}] to [${this.local.peer}]`,
`error when sync ${key} from [${remotePeer}] to [local]`,
err
);
}
@@ -107,13 +135,13 @@ export class BlobSyncImpl implements BlobSync {
start() {
if (this.abort) {
this.abort.abort();
this.abort.abort(MANUALLY_STOP);
}
const abort = new AbortController();
this.abort = abort;
this.sync(abort.signal).catch(error => {
this.fullSync(abort.signal).catch(error => {
if (error === MANUALLY_STOP) {
return;
}
@@ -130,4 +158,15 @@ export class BlobSyncImpl implements BlobSync {
// TODO: implement
return () => {};
}
setMaxBlobSize(size: number): void {
this.maxBlobSize = size;
}
onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void {
this.event.on('abort-large-blob', cb);
return () => {
this.event.off('abort-large-blob', cb);
};
}
}

View File

@@ -1,17 +1,23 @@
import type { Observable } from 'rxjs';
import { combineLatest, map } from 'rxjs';
import { combineLatest, map, of } from 'rxjs';
import type { DocStorage, SyncStorage } from '../../storage';
import { DummyDocStorage } from '../../storage/dummy/doc';
import { DummySyncStorage } from '../../storage/dummy/sync';
import { MANUALLY_STOP } from '../../utils/throw-if-aborted';
import type { PeerStorageOptions } from '../types';
import { DocSyncPeer } from './peer';
export interface DocSyncState {
total: number;
syncing: number;
synced: boolean;
retrying: boolean;
errorMessage: string | null;
}
export interface DocSyncDocState {
synced: boolean;
syncing: boolean;
retrying: boolean;
errorMessage: string | null;
@@ -24,43 +30,70 @@ export interface DocSync {
}
export class DocSyncImpl implements DocSync {
private readonly peers: DocSyncPeer[] = this.remotes.map(
remote => new DocSyncPeer(this.local, this.sync, remote)
private readonly peers: DocSyncPeer[] = Object.entries(
this.storages.remotes
).map(
([peerId, remote]) =>
new DocSyncPeer(peerId, this.storages.local, this.sync, remote)
);
private abort: AbortController | null = null;
readonly state$: Observable<DocSyncState> = combineLatest(
this.peers.map(peer => peer.peerState$)
).pipe(
map(allPeers => ({
total: allPeers.reduce((acc, peer) => acc + peer.total, 0),
syncing: allPeers.reduce((acc, peer) => acc + peer.syncing, 0),
retrying: allPeers.some(peer => peer.retrying),
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
}))
);
constructor(
readonly local: DocStorage,
readonly sync: SyncStorage,
readonly remotes: DocStorage[]
) {}
docState$(docId: string): Observable<DocSyncDocState> {
return combineLatest(this.peers.map(peer => peer.docState$(docId))).pipe(
get state$() {
return combineLatest(this.peers.map(peer => peer.peerState$)).pipe(
map(allPeers => ({
total: allPeers.reduce((acc, peer) => Math.max(acc, peer.total), 0),
syncing: allPeers.reduce((acc, peer) => Math.max(acc, peer.syncing), 0),
synced: allPeers.every(peer => peer.synced),
retrying: allPeers.some(peer => peer.retrying),
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
retrying: allPeers.some(peer => peer.retrying),
syncing: allPeers.some(peer => peer.syncing),
}))
) as Observable<DocSyncState>;
}
constructor(
readonly storages: PeerStorageOptions<DocStorage>,
readonly sync: SyncStorage
) {}
/**
* for testing
*/
static get dummy() {
return new DocSyncImpl(
{
local: new DummyDocStorage(),
remotes: {},
},
new DummySyncStorage()
);
}
docState$(docId: string): Observable<DocSyncDocState> {
if (this.peers.length === 0) {
return of({
errorMessage: null,
retrying: false,
syncing: false,
synced: true,
});
}
return combineLatest(this.peers.map(peer => peer.docState$(docId))).pipe(
map(allPeers => {
return {
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
retrying: allPeers.some(peer => peer.retrying),
syncing: allPeers.some(peer => peer.syncing),
synced: allPeers.every(peer => peer.synced),
};
})
);
}
start() {
if (this.abort) {
this.abort.abort();
this.abort.abort(MANUALLY_STOP);
}
const abort = new AbortController();
this.abort = abort;

View File

@@ -43,6 +43,7 @@ interface Status {
remoteClocks: ClockMap;
syncing: boolean;
retrying: boolean;
skipped: boolean;
errorMessage: string | null;
}
@@ -50,11 +51,13 @@ interface PeerState {
total: number;
syncing: number;
retrying: boolean;
synced: boolean;
errorMessage: string | null;
}
interface PeerDocState {
syncing: boolean;
synced: boolean;
retrying: boolean;
errorMessage: string | null;
}
@@ -92,10 +95,11 @@ export class DocSyncPeer {
/**
* random unique id for recognize self in "update" event
*/
private readonly uniqueId = `sync:${this.local.universalId}:${this.remote.universalId}:${nanoid()}`;
private readonly uniqueId = `sync:${this.peerId}:${nanoid()}`;
private readonly prioritySettings = new Map<string, number>();
constructor(
readonly peerId: string,
readonly local: DocStorage,
readonly syncMetadata: SyncStorage,
readonly remote: DocStorage,
@@ -110,43 +114,59 @@ export class DocSyncPeer {
remoteClocks: new ClockMap(new Map()),
syncing: false,
retrying: false,
skipped: false,
errorMessage: null,
};
private readonly statusUpdatedSubject$ = new Subject<string | true>();
peerState$ = new Observable<PeerState>(subscribe => {
const next = () => {
if (!this.status.syncing) {
// if syncing = false, jobMap is empty
subscribe.next({
total: this.status.docs.size,
syncing: this.status.docs.size,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
} else {
const syncing = this.status.jobMap.size;
subscribe.next({
total: this.status.docs.size,
syncing: syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
}
};
next();
return this.statusUpdatedSubject$.subscribe(() => {
get peerState$() {
return new Observable<PeerState>(subscribe => {
const next = () => {
if (this.status.skipped) {
subscribe.next({
total: 0,
syncing: 0,
synced: true,
retrying: false,
errorMessage: null,
});
} else if (!this.status.syncing) {
// if syncing = false, jobMap is empty
subscribe.next({
total: this.status.docs.size,
syncing: this.status.docs.size,
synced: false,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
} else {
const syncing = this.status.jobMap.size;
subscribe.next({
total: this.status.docs.size,
syncing: syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
synced: syncing === 0,
});
}
};
next();
return this.statusUpdatedSubject$.subscribe(() => {
next();
});
});
});
}
docState$(docId: string) {
return new Observable<PeerDocState>(subscribe => {
const next = () => {
const syncing =
!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId);
subscribe.next({
syncing:
!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId),
syncing: syncing,
synced: !syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
@@ -161,22 +181,21 @@ export class DocSyncPeer {
private readonly jobs = createJobErrorCatcher({
connect: async (docId: string, signal?: AbortSignal) => {
const pushedClock =
(await this.syncMetadata.getPeerPushedClock(this.remote.peer, docId))
(await this.syncMetadata.getPeerPushedClock(this.peerId, docId))
?.timestamp ?? null;
const clock = await this.local.getDocTimestamp(docId);
throwIfAborted(signal);
if (pushedClock === null || pushedClock !== clock?.timestamp) {
if (
!this.remote.isReadonly &&
(pushedClock === null || pushedClock !== clock?.timestamp)
) {
await this.jobs.pullAndPush(docId, signal);
} else {
// no need to push
const pulled =
(
await this.syncMetadata.getPeerPulledRemoteClock(
this.remote.peer,
docId
)
)?.timestamp ?? null;
(await this.syncMetadata.getPeerPulledRemoteClock(this.peerId, docId))
?.timestamp ?? null;
if (pulled === null || pulled !== this.status.remoteClocks.get(docId)) {
await this.jobs.pull(docId, signal);
}
@@ -214,7 +233,7 @@ export class DocSyncPeer {
});
}
throwIfAborted(signal);
await this.syncMetadata.setPeerPushedClock(this.remote.peer, {
await this.syncMetadata.setPeerPushedClock(this.peerId, {
docId,
timestamp: maxClock,
});
@@ -249,7 +268,7 @@ export class DocSyncPeer {
this.uniqueId
);
throwIfAborted(signal);
await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, {
await this.syncMetadata.setPeerPulledRemoteClock(this.peerId, {
docId,
timestamp: remoteClock,
});
@@ -273,7 +292,7 @@ export class DocSyncPeer {
});
}
throwIfAborted(signal);
await this.syncMetadata.setPeerPushedClock(this.remote.peer, {
await this.syncMetadata.setPeerPushedClock(this.peerId, {
docId,
timestamp: localClock,
});
@@ -294,7 +313,7 @@ export class DocSyncPeer {
remoteClock,
});
}
await this.syncMetadata.setPeerPushedClock(this.remote.peer, {
await this.syncMetadata.setPeerPushedClock(this.peerId, {
docId,
timestamp: localDocRecord.timestamp,
});
@@ -322,7 +341,7 @@ export class DocSyncPeer {
this.uniqueId
);
throwIfAborted(signal);
await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, {
await this.syncMetadata.setPeerPulledRemoteClock(this.peerId, {
docId,
timestamp: remoteClock,
});
@@ -360,7 +379,7 @@ export class DocSyncPeer {
);
throwIfAborted(signal);
await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, {
await this.syncMetadata.setPeerPulledRemoteClock(this.peerId, {
docId,
timestamp: remoteClock,
});
@@ -372,7 +391,7 @@ export class DocSyncPeer {
updateRemoteClock: async (docId: string, remoteClock: Date) => {
const updated = this.status.remoteClocks.setIfBigger(docId, remoteClock);
if (updated) {
await this.syncMetadata.setPeerRemoteClock(this.remote.peer, {
await this.syncMetadata.setPeerRemoteClock(this.peerId, {
docId,
timestamp: remoteClock,
});
@@ -455,6 +474,7 @@ export class DocSyncPeer {
jobMap: new Map(),
remoteClocks: new ClockMap(new Map()),
syncing: false,
skipped: false,
// tell ui to show retrying status
retrying: true,
// error message from last retry
@@ -482,6 +502,17 @@ export class DocSyncPeer {
private async retryLoop(signal?: AbortSignal) {
throwIfAborted(signal);
if (this.local.isReadonly) {
// Local is readonly, skip sync
this.status.skipped = true;
this.statusUpdatedSubject$.next(true);
await new Promise((_, reject) => {
signal?.addEventListener('abort', reason => {
reject(reason);
});
});
return;
}
const abort = new AbortController();
signal?.addEventListener('abort', reason => {
@@ -536,8 +567,8 @@ export class DocSyncPeer {
if (
origin === this.uniqueId ||
origin?.startsWith(
`sync:${this.local.peer}:${this.remote.peer}:`
// skip if local and remote is same
`sync:${this.peerId}:`
// skip if peerId is same
)
) {
return;
@@ -572,7 +603,7 @@ export class DocSyncPeer {
// get cached clocks from metadata
const cachedClocks = await this.syncMetadata.getPeerRemoteClocks(
this.remote.peer
this.peerId
);
throwIfAborted(signal);
for (const [id, v] of Object.entries(cachedClocks)) {

View File

@@ -1,65 +1,63 @@
import { combineLatest, map, type Observable, of } from 'rxjs';
import { map, type Observable } from 'rxjs';
import type {
AwarenessStorage,
BlobStorage,
DocStorage,
SpaceStorage,
} from '../storage';
import type { SpaceStorage } from '../storage';
import { AwarenessSyncImpl } from './awareness';
import { BlobSyncImpl } from './blob';
import { DocSyncImpl, type DocSyncState } from './doc';
import type { PeerStorageOptions } from './types';
export type { BlobSyncState } from './blob';
export type { DocSyncDocState, DocSyncState } from './doc';
export interface SyncState {
doc?: DocSyncState;
}
export class Sync {
readonly doc: DocSyncImpl | null;
readonly blob: BlobSyncImpl | null;
readonly awareness: AwarenessSyncImpl | null;
readonly doc: DocSyncImpl;
readonly blob: BlobSyncImpl;
readonly awareness: AwarenessSyncImpl;
readonly state$: Observable<SyncState>;
constructor(
readonly local: SpaceStorage,
readonly peers: SpaceStorage[]
) {
const doc = local.tryGet('doc');
const blob = local.tryGet('blob');
const sync = local.tryGet('sync');
const awareness = local.tryGet('awareness');
constructor(readonly storages: PeerStorageOptions<SpaceStorage>) {
const doc = storages.local.get('doc');
const blob = storages.local.get('blob');
const sync = storages.local.get('sync');
const awareness = storages.local.get('awareness');
this.doc =
doc && sync
? new DocSyncImpl(
doc,
sync,
peers
.map(peer => peer.tryGet('doc'))
.filter((v): v is DocStorage => !!v)
)
: null;
this.blob = blob
? new BlobSyncImpl(
blob,
peers
.map(peer => peer.tryGet('blob'))
.filter((v): v is BlobStorage => !!v)
)
: null;
this.awareness = awareness
? new AwarenessSyncImpl(
awareness,
peers
.map(peer => peer.tryGet('awareness'))
.filter((v): v is AwarenessStorage => !!v)
)
: null;
this.state$ = combineLatest([this.doc?.state$ ?? of(undefined)]).pipe(
map(([doc]) => ({ doc }))
this.doc = new DocSyncImpl(
{
local: doc,
remotes: Object.fromEntries(
Object.entries(storages.remotes).map(([peerId, remote]) => [
peerId,
remote.get('doc'),
])
),
},
sync
);
this.blob = new BlobSyncImpl({
local: blob,
remotes: Object.fromEntries(
Object.entries(storages.remotes).map(([peerId, remote]) => [
peerId,
remote.get('blob'),
])
),
});
this.awareness = new AwarenessSyncImpl({
local: awareness,
remotes: Object.fromEntries(
Object.entries(storages.remotes).map(([peerId, remote]) => [
peerId,
remote.get('awareness'),
])
),
});
this.state$ = this.doc.state$.pipe(map(doc => ({ doc })));
}
start() {

View File

@@ -0,0 +1,4 @@
export interface PeerStorageOptions<S> {
local: S;
remotes: Record<string, S>;
}