chore: merge blocksuite source code (#9213)

This commit is contained in:
Mirone
2024-12-20 15:38:06 +08:00
committed by GitHub
parent 2c9ef916f4
commit 30200ff86d
2031 changed files with 238888 additions and 229 deletions

View File

@@ -0,0 +1,7 @@
# `@blocksuite/sync`
BlockSuite data synchronization engine.
## Documentation
Check out [blocksuite.io](https://blocksuite.io/) for comprehensive documentation.

View File

@@ -0,0 +1,32 @@
{
"name": "@blocksuite/sync",
"description": "BlockSuite data synchronization engine abstraction and implementation.",
"type": "module",
"scripts": {
"build": "tsc",
"test:unit": "nx vite:test --run",
"test": "yarn test:unit"
},
"sideEffects": false,
"keywords": [],
"author": "toeverything",
"license": "MIT",
"dependencies": {
"@blocksuite/global": "workspace:*",
"idb": "^8.0.0",
"idb-keyval": "^6.2.1",
"y-protocols": "^1.0.6"
},
"peerDependencies": {
"yjs": "*"
},
"exports": {
".": "./src/index.ts"
},
"files": [
"src",
"dist",
"!src/__tests__",
"!dist/__tests__"
]
}

View File

@@ -0,0 +1,51 @@
import { NoopLogger } from '@blocksuite/global/utils';
import { beforeEach, describe, expect, it } from 'vitest';
import { BlobEngine } from '../blob/engine.js';
import { MemoryBlobSource } from '../blob/impl/index.js';
describe('BlobEngine with MemoryBlobSource', () => {
let mainSource: MemoryBlobSource;
let shadowSource: MemoryBlobSource;
let engine: BlobEngine;
beforeEach(() => {
mainSource = new MemoryBlobSource();
shadowSource = new MemoryBlobSource();
engine = new BlobEngine(mainSource, [shadowSource], new NoopLogger());
});
it('should set and get blobs', async () => {
const blob = new Blob(['test'], { type: 'text/plain' });
const key = await engine.set(blob);
const retrievedBlob = await engine.get(key);
expect(retrievedBlob).not.toBeNull();
expect(await retrievedBlob?.text()).toBe('test');
});
it('should sync blobs between main and shadow sources', async () => {
const blob = new Blob(['test'], { type: 'text/plain' });
const key = await engine.set(blob);
await engine.sync();
const retrievedBlob = await shadowSource.get(key);
expect(retrievedBlob).not.toBeNull();
expect(await retrievedBlob?.text()).toBe('test');
});
it('should list all blobs', async () => {
const blob1 = new Blob(['test1'], { type: 'text/plain' });
const blob2 = new Blob(['test2'], { type: 'text/plain' });
await engine.set(blob1);
await engine.set(blob2);
const blobList = await engine.list();
expect(blobList.length).toBe(2);
});
it('should not delete blobs (unsupported feature)', async () => {
const blob = new Blob(['test'], { type: 'text/plain' });
const key = await engine.set(blob);
await engine.delete(key);
const retrievedBlob = await engine.get(key);
expect(retrievedBlob).not.toBeNull();
});
});

View File

@@ -0,0 +1,18 @@
import type { Awareness } from 'y-protocols/awareness';
import type { AwarenessSource } from './source.js';
export class AwarenessEngine {
constructor(
readonly awareness: Awareness,
readonly sources: AwarenessSource[]
) {}
connect() {
this.sources.forEach(source => source.connect(this.awareness));
}
disconnect() {
this.sources.forEach(source => source.disconnect());
}
}

View File

@@ -0,0 +1,73 @@
import type { Awareness } from 'y-protocols/awareness';
import {
applyAwarenessUpdate,
encodeAwarenessUpdate,
} from 'y-protocols/awareness';
import type { AwarenessSource } from '../source.js';
type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>;
type ChannelMessage =
| { type: 'connect' }
| { type: 'update'; update: Uint8Array };
export class BroadcastChannelAwarenessSource implements AwarenessSource {
awareness: Awareness | null = null;
channel: BroadcastChannel | null = null;
handleAwarenessUpdate = (changes: AwarenessChanges, origin: unknown) => {
if (origin === 'remote') {
return;
}
const changedClients = Object.values(changes).reduce((res, cur) =>
res.concat(cur)
);
const update = encodeAwarenessUpdate(this.awareness!, changedClients);
this.channel?.postMessage({
type: 'update',
update: update,
} satisfies ChannelMessage);
};
constructor(readonly channelName: string) {}
connect(awareness: Awareness): void {
this.channel = new BroadcastChannel(this.channelName);
this.channel.postMessage({
type: 'connect',
} satisfies ChannelMessage);
this.awareness = awareness;
awareness.on('update', this.handleAwarenessUpdate);
this.channel.addEventListener(
'message',
(event: MessageEvent<ChannelMessage>) => {
this.handleChannelMessage(event);
}
);
}
disconnect(): void {
this.awareness?.off('update', this.handleAwarenessUpdate);
this.channel?.close();
this.channel = null;
}
handleChannelMessage(event: MessageEvent<ChannelMessage>) {
if (event.data.type === 'update') {
const update = event.data.update;
applyAwarenessUpdate(this.awareness!, update, 'remote');
}
if (event.data.type === 'connect') {
this.channel?.postMessage({
type: 'update',
update: encodeAwarenessUpdate(this.awareness!, [
this.awareness!.clientID,
]),
} satisfies ChannelMessage);
}
}
}
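
Taken together, the engine and source above are enough to share presence across tabs. A minimal usage sketch, assuming a browser context with `BroadcastChannel`; the channel name and user field are illustrative:

```ts
import { Awareness } from 'y-protocols/awareness';
import { Doc } from 'yjs';
import {
  AwarenessEngine,
  BroadcastChannelAwarenessSource,
} from '@blocksuite/sync';

const awareness = new Awareness(new Doc());
const engine = new AwarenessEngine(awareness, [
  new BroadcastChannelAwarenessSource('workspace:awareness'),
]);

// start broadcasting local presence and applying remote presence
engine.connect();
awareness.setLocalStateField('user', { name: 'Alice' });

// tear down when the editor unmounts
engine.disconnect();
```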

View File

@@ -0,0 +1 @@
export * from './broadcast.js';

View File

@@ -0,0 +1,3 @@
export * from './engine.js';
export * from './impl/index.js';
export * from './source.js';

View File

@@ -0,0 +1,6 @@
import type { Awareness } from 'y-protocols/awareness';
export interface AwarenessSource {
connect(awareness: Awareness): void;
disconnect(): void;
}

View File

@@ -0,0 +1,197 @@
import { type Logger, sha } from '@blocksuite/global/utils';
import type { BlobSource } from './source.js';
export interface BlobStatus {
isStorageOverCapacity: boolean;
}
/**
* # BlobEngine
*
* Syncs blobs between storages in the background.
*
* All operations go to the main source first, then fall back to the shadows.
*/
export class BlobEngine {
private _abort: AbortController | null = null;
get sources() {
return [this.main, ...this.shadows];
}
constructor(
readonly main: BlobSource,
readonly shadows: BlobSource[],
readonly logger: Logger
) {}
async delete(_key: string) {
this.logger.error(
'You are trying to delete a blob. We do not support this feature yet. We need to wait until we implement the indexer, which will inform us which doc is using a particular blob so that we can safely delete it.'
);
}
async get(key: string) {
this.logger.debug('get blob', key);
for (const source of this.sources) {
const data = await source.get(key);
if (data) {
return data;
}
}
return null;
}
async list() {
const blobIdSet = new Set<string>();
for (const source of this.sources) {
const blobs = await source.list();
for (const blob of blobs) {
blobIdSet.add(blob);
}
}
return Array.from(blobIdSet);
}
async set(value: Blob): Promise<string>;
async set(key: string, value: Blob): Promise<string>;
async set(valueOrKey: string | Blob, _value?: Blob) {
if (this.main.readonly) {
throw new Error('main peer is readonly');
}
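// content-addressed: when no key is given, it is derived from the SHA digest of the blob bytes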
const key =
typeof valueOrKey === 'string'
? valueOrKey
: await sha(await valueOrKey.arrayBuffer());
const value = typeof valueOrKey === 'string' ? _value : valueOrKey;
if (!value) {
throw new Error('value is empty');
}
// await upload to the main peer
await this.main.set(key, value);
// uploads to other peers in the background
Promise.allSettled(
this.shadows
.filter(r => !r.readonly)
.map(peer =>
peer.set(key, value).catch(err => {
this.logger.error('Error when uploading to peer', err);
})
)
)
.then(result => {
if (result.some(({ status }) => status === 'rejected')) {
this.logger.error(
`blob ${key} update finish, but some peers failed to update`
);
} else {
this.logger.debug(`blob ${key} update finish`);
}
})
.catch(() => {
// Promise.allSettled never rejects
});
return key;
}
start() {
if (this._abort) {
return;
}
this._abort = new AbortController();
const abortSignal = this._abort.signal;
const sync = () => {
if (abortSignal.aborted) {
return;
}
this.sync()
.catch(error => {
this.logger.error('sync blob error', error);
})
.finally(() => {
// sync every 1 minute
setTimeout(sync, 60000);
});
};
sync();
}
stop() {
this._abort?.abort();
this._abort = null;
}
async sync() {
if (this.main.readonly) {
return;
}
this.logger.debug('start syncing blob...');
for (const shadow of this.shadows) {
let mainList: string[] = [];
let shadowList: string[] = [];
if (!shadow.readonly) {
try {
mainList = await this.main.list();
shadowList = await shadow.list();
} catch (err) {
this.logger.error(`error when sync`, err);
continue;
}
const needUpload = mainList.filter(key => !shadowList.includes(key));
for (const key of needUpload) {
try {
const data = await this.main.get(key);
if (data) {
await shadow.set(key, data);
} else {
this.logger.error(
'data not found when trying upload from main to shadow'
);
}
} catch (err) {
this.logger.error(
`error when sync ${key} from [${this.main.name}] to [${shadow.name}]`,
err
);
}
}
}
const needDownload = shadowList.filter(key => !mainList.includes(key));
for (const key of needDownload) {
try {
const data = await shadow.get(key);
if (data) {
await this.main.set(key, data);
} else {
this.logger.error(
'data not found when trying download from shadow to main'
);
}
} catch (err) {
this.logger.error(
`error when sync ${key} from [${shadow.name}] to [${this.main.name}]`,
err
);
}
}
}
this.logger.debug('finish syncing blob');
}
}
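
A minimal usage sketch for the engine above, combining the IndexedDB and memory sources added later in this commit; the database name is illustrative:

```ts
import { NoopLogger } from '@blocksuite/global/utils';
import {
  BlobEngine,
  IndexedDBBlobSource,
  MemoryBlobSource,
} from '@blocksuite/sync';

const engine = new BlobEngine(
  new IndexedDBBlobSource('workspace'), // main: durable
  [new MemoryBlobSource()], // shadow: filled in by background sync
  new NoopLogger()
);

// run the one-minute background sync loop
engine.start();

// keys are content hashes, so identical blobs deduplicate
const key = await engine.set(new Blob(['hello'], { type: 'text/plain' }));
console.log(await (await engine.get(key))?.text()); // 'hello'

engine.stop();
```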

View File

@@ -0,0 +1,2 @@
export * from './indexeddb.js';
export * from './memory.js';

View File

@@ -0,0 +1,39 @@
import { createStore, del, get, keys, set } from 'idb-keyval';
import type { BlobSource } from '../source.js';
export class IndexedDBBlobSource implements BlobSource {
readonly mimeTypeStore = createStore(`${this.name}_blob_mime`, 'blob_mime');
readonly = false;
readonly store = createStore(`${this.name}_blob`, 'blob');
constructor(readonly name: string) {}
async delete(key: string) {
await del(key, this.store);
await del(key, this.mimeTypeStore);
}
async get(key: string) {
const res = await get<ArrayBuffer>(key, this.store);
if (res) {
return new Blob([res], {
type: await get(key, this.mimeTypeStore),
});
}
return null;
}
async list() {
const list = await keys<string>(this.store);
return list;
}
async set(key: string, value: Blob) {
await set(key, await value.arrayBuffer(), this.store);
await set(key, value.type, this.mimeTypeStore);
return key;
}
}

View File

@@ -0,0 +1,27 @@
import type { BlobSource } from '../source.js';
export class MemoryBlobSource implements BlobSource {
readonly map = new Map<string, Blob>();
name = 'memory';
readonly = false;
delete(key: string) {
this.map.delete(key);
return Promise.resolve();
}
get(key: string) {
return Promise.resolve(this.map.get(key) ?? null);
}
list() {
return Promise.resolve(Array.from(this.map.keys()));
}
set(key: string, value: Blob) {
this.map.set(key, value);
return Promise.resolve(key);
}
}

View File

@@ -0,0 +1,3 @@
export * from './engine.js';
export * from './impl/index.js';
export * from './source.js';

View File

@@ -0,0 +1,8 @@
export interface BlobSource {
name: string;
readonly: boolean;
get: (key: string) => Promise<Blob | null>;
set: (key: string, value: Blob) => Promise<string>;
delete: (key: string) => Promise<void>;
list: () => Promise<string[]>;
}
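
The interface is small enough that custom backends are straightforward. A hypothetical sketch of an HTTP-backed source; the `/api/blobs` endpoints are invented for illustration and are not part of BlockSuite:

```ts
import type { BlobSource } from '@blocksuite/sync';

class HttpBlobSource implements BlobSource {
  name = 'http';
  readonly = false;

  async get(key: string) {
    const res = await fetch(`/api/blobs/${key}`);
    return res.ok ? res.blob() : null;
  }

  async set(key: string, value: Blob) {
    await fetch(`/api/blobs/${key}`, { method: 'PUT', body: value });
    return key;
  }

  async delete(key: string) {
    await fetch(`/api/blobs/${key}`, { method: 'DELETE' });
  }

  async list() {
    const res = await fetch('/api/blobs');
    return (await res.json()) as string[];
  }
}
```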

View File

@@ -0,0 +1,15 @@
export enum DocEngineStep {
Stopped = 0,
Synced = 2,
Syncing = 1,
}
export enum DocPeerStep {
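// Loaded = 4.5 is a threshold rather than a real state: SyncPeer.waitForLoaded resolves once the step rises above it (i.e. Syncing or Synced)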
Loaded = 4.5,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Retrying = 1,
Stopped = 0,
Synced = 6,
Syncing = 5,
}

View File

@@ -0,0 +1,286 @@
import { type Logger, Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { SharedPriorityTarget } from '../utils/async-queue.js';
import { MANUALLY_STOP, throwIfAborted } from '../utils/throw-if-aborted.js';
import { DocEngineStep, DocPeerStep } from './consts.js';
import { type DocPeerStatus, SyncPeer } from './peer.js';
import type { DocSource } from './source.js';
export interface DocEngineStatus {
step: DocEngineStep;
main: DocPeerStatus | null;
shadows: (DocPeerStatus | null)[];
retrying: boolean;
}
/**
* # DocEngine
*
* ```
* ┌────────────┐
* │ DocEngine │
* └─────┬──────┘
* │
* ▼
* ┌────────────┐
* │ DocPeer │
* ┌─────────┤ main ├─────────┐
* │ └─────┬──────┘ │
* │ │ │
* ▼ ▼ ▼
* ┌────────────┐ ┌────────────┐ ┌────────────┐
* │ DocPeer │ │ DocPeer │ │ DocPeer │
* │ shadow │ │ shadow │ │ shadow │
* └────────────┘ └────────────┘ └────────────┘
* ```
*
* The doc engine manages the doc peers.
*
* Sync steps:
* 1. start main sync
* 2. wait for main sync complete
* 3. start shadow sync
* 4. continuously sync main and shadows
*/
export class DocEngine {
private _abort = new AbortController();
private _status: DocEngineStatus;
readonly onStatusChange = new Slot<DocEngineStatus>();
readonly priorityTarget = new SharedPriorityTarget();
get rootDocId() {
return this.rootDoc.guid;
}
get status() {
return this._status;
}
constructor(
readonly rootDoc: Doc,
readonly main: DocSource,
readonly shadows: DocSource[],
readonly logger: Logger
) {
this._status = {
step: DocEngineStep.Stopped,
main: null,
shadows: shadows.map(() => null),
retrying: false,
};
this.logger.debug(`sync-engine:${this.rootDocId} status init`, this.status);
}
private setStatus(s: DocEngineStatus) {
this.logger.debug(`sync-engine:${this.rootDocId} status change`, s);
this._status = s;
this.onStatusChange.emit(s);
}
canGracefulStop() {
return !!this.status.main && this.status.main.pendingPushUpdates === 0;
}
forceStop() {
this._abort.abort(MANUALLY_STOP);
this.setStatus({
step: DocEngineStep.Stopped,
main: null,
shadows: this.shadows.map(() => null),
retrying: false,
});
}
setPriorityRule(target: ((id: string) => boolean) | null) {
this.priorityTarget.priorityRule = target;
}
start() {
if (this.status.step !== DocEngineStep.Stopped) {
this.forceStop();
}
this._abort = new AbortController();
this.sync(this._abort.signal).catch(err => {
// should never reach here
this.logger.error(`sync-engine:${this.rootDocId}`, err);
});
}
// main sync process, should never return until abort
async sync(signal: AbortSignal) {
const state: {
mainPeer: SyncPeer | null;
shadowPeers: (SyncPeer | null)[];
} = {
mainPeer: null,
shadowPeers: this.shadows.map(() => null),
};
const cleanUp: (() => void)[] = [];
try {
// Step 1: start main sync peer
state.mainPeer = new SyncPeer(
this.rootDoc,
this.main,
this.priorityTarget,
this.logger
);
cleanUp.push(
state.mainPeer.onStatusChange.on(() => {
if (!signal.aborted)
this.updateSyncingState(state.mainPeer, state.shadowPeers);
}).dispose
);
this.updateSyncingState(state.mainPeer, state.shadowPeers);
// Step 2: wait for main sync complete
await state.mainPeer.waitForLoaded(signal);
// Step 3: start shadow sync peer
state.shadowPeers = this.shadows.map(shadow => {
const peer = new SyncPeer(
this.rootDoc,
shadow,
this.priorityTarget,
this.logger
);
cleanUp.push(
peer.onStatusChange.on(() => {
if (!signal.aborted)
this.updateSyncingState(state.mainPeer, state.shadowPeers);
}).dispose
);
return peer;
});
this.updateSyncingState(state.mainPeer, state.shadowPeers);
// Step 4: continuously sync main and shadow
// wait for abort
await new Promise((_, reject) => {
if (signal.aborted) {
reject(signal.reason);
}
signal.addEventListener('abort', () => {
reject(signal.reason);
});
});
} catch (error) {
if (error === MANUALLY_STOP || signal.aborted) {
return;
}
throw error;
} finally {
// stop peers
state.mainPeer?.stop();
for (const shadowPeer of state.shadowPeers) {
shadowPeer?.stop();
}
for (const clean of cleanUp) {
clean();
}
}
}
updateSyncingState(local: SyncPeer | null, shadows: (SyncPeer | null)[]) {
let step = DocEngineStep.Synced;
const allPeer = [local, ...shadows];
for (const peer of allPeer) {
if (!peer || peer.status.step !== DocPeerStep.Synced) {
step = DocEngineStep.Syncing;
break;
}
}
this.setStatus({
step,
main: local?.status ?? null,
shadows: shadows.map(peer => peer?.status ?? null),
retrying: allPeer.some(
peer => peer?.status.step === DocPeerStep.Retrying
),
});
}
async waitForGracefulStop(abort?: AbortSignal) {
await Promise.race([
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
new Promise<void>(resolve => {
this.onStatusChange.on(() => {
if (this.canGracefulStop()) {
resolve();
}
});
}),
]);
throwIfAborted(abort);
this.forceStop();
}
async waitForLoadedRootDoc(abort?: AbortSignal) {
function isLoadedRootDoc(status: DocEngineStatus) {
return ![status.main, ...status.shadows].some(
peer => !peer || peer.step <= DocPeerStep.LoadingRootDoc
);
}
if (isLoadedRootDoc(this.status)) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (isLoadedRootDoc(status)) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
async waitForSynced(abort?: AbortSignal) {
if (this.status.step === DocEngineStep.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step === DocEngineStep.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
}
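
A minimal wiring sketch for the engine above, using the IndexedDB main source and BroadcastChannel shadow added later in this commit; the guid is illustrative:

```ts
import { NoopLogger } from '@blocksuite/global/utils';
import { Doc } from 'yjs';
import {
  BroadcastChannelDocSource,
  DocEngine,
  IndexedDBDocSource,
} from '@blocksuite/sync';

const rootDoc = new Doc({ guid: 'workspace' });
const engine = new DocEngine(
  rootDoc,
  new IndexedDBDocSource(), // main: durable local persistence
  [new BroadcastChannelDocSource()], // shadow: live cross-tab updates
  new NoopLogger()
);

engine.start();
// wait until every peer has at least pulled the root doc
await engine.waitForLoadedRootDoc();

// optionally steer sync bandwidth toward the doc the user is viewing
engine.setPriorityRule(id => id === rootDoc.guid);
```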

View File

@@ -0,0 +1,91 @@
import { assertExists } from '@blocksuite/global/utils';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
import type { DocSource } from '../source.js';
type ChannelMessage =
| {
type: 'init';
}
| {
type: 'update';
docId: string;
data: Uint8Array;
};
export class BroadcastChannelDocSource implements DocSource {
private _onMessage = (event: MessageEvent<ChannelMessage>) => {
if (event.data.type === 'init') {
for (const [docId, data] of this.docMap) {
this.channel.postMessage({
type: 'update',
docId,
data,
} satisfies ChannelMessage);
}
return;
}
const { docId, data } = event.data;
const update = this.docMap.get(docId);
if (update) {
this.docMap.set(docId, mergeUpdates([update, data]));
} else {
this.docMap.set(docId, data);
}
};
channel = new BroadcastChannel(this.channelName);
docMap = new Map<string, Uint8Array>();
name = 'broadcast-channel';
constructor(readonly channelName: string = 'blocksuite:doc') {
this.channel.addEventListener('message', this._onMessage);
this.channel.postMessage({
type: 'init',
});
}
pull(docId: string, state: Uint8Array) {
const update = this.docMap.get(docId);
if (!update) return null;
const diff = state.length ? diffUpdate(update, state) : update;
return { data: diff, state: encodeStateVectorFromUpdate(update) };
}
push(docId: string, data: Uint8Array) {
const update = this.docMap.get(docId);
if (update) {
this.docMap.set(docId, mergeUpdates([update, data]));
} else {
this.docMap.set(docId, data);
}
assertExists(this.docMap.get(docId));
this.channel.postMessage({
type: 'update',
docId,
data: this.docMap.get(docId)!,
} satisfies ChannelMessage);
}
subscribe(cb: (docId: string, data: Uint8Array) => void) {
const abortController = new AbortController();
this.channel.addEventListener(
'message',
(event: MessageEvent<ChannelMessage>) => {
if (event.data.type !== 'update') return;
const { docId, data } = event.data;
cb(docId, data);
},
{ signal: abortController.signal }
);
return () => {
abortController.abort();
};
}
}
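
Because every instance with the same channel name shares updates, two sources in one page can stand in for two tabs. A small sketch of the subscribe contract; the channel and doc names are illustrative:

```ts
import { Doc, encodeStateAsUpdate } from 'yjs';
import { BroadcastChannelDocSource } from '@blocksuite/sync';

const a = new BroadcastChannelDocSource('demo');
const b = new BroadcastChannelDocSource('demo');

// b hears every update a pushes on the shared channel
const unsubscribe = b.subscribe((docId, data) => {
  console.log(`doc ${docId}: received ${data.byteLength} bytes`);
});

const doc = new Doc({ guid: 'doc-1' });
doc.getText('t').insert(0, 'hello');
a.push(doc.guid, encodeStateAsUpdate(doc));

// BroadcastChannel delivery is asynchronous; call unsubscribe() on teardown
```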

View File

@@ -0,0 +1,3 @@
export * from './broadcast.js';
export * from './indexeddb.js';
export * from './noop.js';

View File

@@ -0,0 +1,116 @@
import { type DBSchema, type IDBPDatabase, openDB } from 'idb';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
import type { DocSource } from '../source.js';
export const dbVersion = 1;
export const DEFAULT_DB_NAME = 'blocksuite-local';
type UpdateMessage = {
timestamp: number;
update: Uint8Array;
};
type DocCollectionPersist = {
id: string;
updates: UpdateMessage[];
};
interface BlockSuiteBinaryDB extends DBSchema {
collection: {
key: string;
value: DocCollectionPersist;
};
}
export function upgradeDB(db: IDBPDatabase<BlockSuiteBinaryDB>) {
db.createObjectStore('collection', { keyPath: 'id' });
}
type ChannelMessage = {
type: 'db-updated';
payload: { docId: string; update: Uint8Array };
};
export class IndexedDBDocSource implements DocSource {
// IndexedDB can be shared between tabs, so we use a BroadcastChannel to notify the other tabs
channel = new BroadcastChannel('indexeddb:' + this.dbName);
dbPromise: Promise<IDBPDatabase<BlockSuiteBinaryDB>> | null = null;
mergeCount = 1;
name = 'indexeddb';
constructor(readonly dbName: string = DEFAULT_DB_NAME) {}
getDb() {
if (this.dbPromise === null) {
this.dbPromise = openDB<BlockSuiteBinaryDB>(this.dbName, dbVersion, {
upgrade: upgradeDB,
});
}
return this.dbPromise;
}
async pull(
docId: string,
state: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> {
const db = await this.getDb();
const store = db
.transaction('collection', 'readonly')
.objectStore('collection');
const data = await store.get(docId);
if (!data) {
return null;
}
const { updates } = data;
const update = mergeUpdates(updates.map(({ update }) => update));
const diff = state.length ? diffUpdate(update, state) : update;
return { data: diff, state: encodeStateVectorFromUpdate(update) };
}
async push(docId: string, data: Uint8Array): Promise<void> {
const db = await this.getDb();
const store = db
.transaction('collection', 'readwrite')
.objectStore('collection');
const { updates } = (await store.get(docId)) ?? { updates: [] };
let rows: UpdateMessage[] = [
...updates,
{ timestamp: Date.now(), update: data },
];
if (this.mergeCount && rows.length >= this.mergeCount) {
const merged = mergeUpdates(rows.map(({ update }) => update));
rows = [{ timestamp: Date.now(), update: merged }];
}
await store.put({
id: docId,
updates: rows,
});
this.channel.postMessage({
type: 'db-updated',
payload: { docId, update: data },
} satisfies ChannelMessage);
}
subscribe(cb: (docId: string, data: Uint8Array) => void) {
function onMessage(event: MessageEvent<ChannelMessage>) {
const { type, payload } = event.data;
if (type === 'db-updated') {
const { docId, update } = payload;
cb(docId, update);
}
}
this.channel.addEventListener('message', onMessage);
return () => {
this.channel.removeEventListener('message', onMessage);
};
}
}
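
A round-trip sketch showing how the pull/push contract above lines up with yjs state vectors; the doc id is illustrative:

```ts
import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import { IndexedDBDocSource } from '@blocksuite/sync';

const source = new IndexedDBDocSource();

const doc = new Doc({ guid: 'doc-1' });
doc.getText('t').insert(0, 'hello');
// persist the full local state as one update row
await source.push(doc.guid, encodeStateAsUpdate(doc));

// later, or in another tab: pull only what the local replica is missing
const replica = new Doc({ guid: 'doc-1' });
const res = await source.pull(replica.guid, encodeStateVector(replica));
if (res) {
  applyUpdate(replica, res.data);
  console.log(replica.getText('t').toString()); // 'hello'
}
```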

View File

@@ -0,0 +1,18 @@
import type { DocSource } from '../source.js';
export class NoopDocSource implements DocSource {
name = 'noop';
pull(_docId: string, _data: Uint8Array) {
return null;
}
push(_docId: string, _data: Uint8Array) {}
subscribe(
_cb: (docId: string, data: Uint8Array) => void,
_disconnect: (reason: string) => void
) {
return () => {};
}
}

View File

@@ -0,0 +1,21 @@
/**
*
* **DocEngine**
*
* Manages one main Y.Doc and multiple shadow Y.Docs.
*
* Responsible for creating DocPeers for synchronization, following the main-first strategy.
*
* **DocPeer**
*
* Responsible for synchronizing a single data source with a Y.Doc.
*
* Carries the main synchronization logic.
*
*/
export * from './consts.js';
export * from './engine.js';
export * from './impl/index.js';
export * from './peer.js';
export * from './source.js';

View File

@@ -0,0 +1,449 @@
import { isEqual, type Logger, Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import {
applyUpdate,
encodeStateAsUpdate,
encodeStateVector,
mergeUpdates,
} from 'yjs';
import {
PriorityAsyncQueue,
SharedPriorityTarget,
} from '../utils/async-queue.js';
import { MANUALLY_STOP, throwIfAborted } from '../utils/throw-if-aborted.js';
import { DocPeerStep } from './consts.js';
import type { DocSource } from './source.js';
export interface DocPeerStatus {
step: DocPeerStep;
totalDocs: number;
loadedDocs: number;
pendingPullUpdates: number;
pendingPushUpdates: number;
}
/**
* # DocPeer
* A DocPeer is responsible for syncing one Storage with one Y.Doc and its subdocs.
*
* ```
* ┌─────┐
* │Start│
* └──┬──┘
* │
* ┌──────┐ ┌─────▼──────┐ ┌────┐
* │listen◄─────┤pull rootdoc│ │peer│
* └──┬───┘ └─────┬──────┘ └──┬─┘
* │ │ onLoad() │
* ┌──▼───┐ ┌─────▼──────┐ ┌────▼────┐
* │listen◄─────┤pull subdocs│ │subscribe│
* └──┬───┘ └─────┬──────┘ └────┬────┘
* │ │ onReady() │
* ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐
* │queue├──────►apply updates◄───────┤queue│
* └─────┘ └─────────────┘ └─────┘
* ```
*
* listen: listen for updates from ydoc, typically from user modifications.
* subscribe: listen for updates from storage, typically from other users.
*
*/
export class SyncPeer {
private _status: DocPeerStatus = {
step: DocPeerStep.LoadingRootDoc,
totalDocs: 1,
loadedDocs: 0,
pendingPullUpdates: 0,
pendingPushUpdates: 0,
};
readonly abort = new AbortController();
// handle updates from storage
handleStorageUpdates = (id: string, data: Uint8Array) => {
this.state.pullUpdatesQueue.push({
id,
data,
});
this.updateSyncStatus();
};
// handle subdocs changes, append new subdocs to queue, remove subdocs from queue
handleSubdocsUpdate = ({
added,
removed,
}: {
added: Set<Doc>;
removed: Set<Doc>;
}) => {
for (const subdoc of added) {
this.state.subdocsLoadQueue.push({ id: subdoc.guid, doc: subdoc });
}
for (const subdoc of removed) {
this.disconnectDoc(subdoc);
this.state.subdocsLoadQueue.remove(doc => doc.doc === subdoc);
}
this.updateSyncStatus();
};
// handle updates from ydoc
handleYDocUpdates = (update: Uint8Array, origin: string, doc: Doc) => {
// don't push updates from storage
if (origin === this.name) {
return;
}
const exist = this.state.pushUpdatesQueue.find(({ id }) => id === doc.guid);
if (exist) {
exist.data.push(update);
} else {
this.state.pushUpdatesQueue.push({
id: doc.guid,
data: [update],
});
}
this.updateSyncStatus();
};
readonly onStatusChange = new Slot<DocPeerStatus>();
readonly state: {
connectedDocs: Map<string, Doc>;
pushUpdatesQueue: PriorityAsyncQueue<{
id: string;
data: Uint8Array[];
}>;
pushingUpdate: boolean;
pullUpdatesQueue: PriorityAsyncQueue<{
id: string;
data: Uint8Array;
}>;
subdocLoading: boolean;
subdocsLoadQueue: PriorityAsyncQueue<{ id: string; doc: Doc }>;
} = {
connectedDocs: new Map(),
pushUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
pushingUpdate: false,
pullUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
subdocLoading: false,
subdocsLoadQueue: new PriorityAsyncQueue([], this.priorityTarget),
};
get name() {
return this.source.name;
}
private set status(s: DocPeerStatus) {
if (!isEqual(s, this._status)) {
this.logger.debug(`doc-peer:${this.name} status change`, s);
this._status = s;
this.onStatusChange.emit(s);
}
}
get status() {
return this._status;
}
constructor(
readonly rootDoc: Doc,
readonly source: DocSource,
readonly priorityTarget = new SharedPriorityTarget(),
readonly logger: Logger
) {
this.logger.debug(`doc-peer:${this.name} start`);
this.syncRetryLoop(this.abort.signal).catch(err => {
// should not reach here
console.error(err);
});
}
async connectDoc(doc: Doc, abort: AbortSignal) {
const { data: docData, state: inStorageState } =
(await this.source.pull(doc.guid, encodeStateVector(doc))) ?? {};
throwIfAborted(abort);
if (docData && docData.length > 0) {
applyUpdate(doc, docData, 'load');
}
// diff the doc against the in-storage state and queue the difference for pushing
this.state.pushUpdatesQueue.push({
id: doc.guid,
data: [encodeStateAsUpdate(doc, inStorageState)],
});
this.state.connectedDocs.set(doc.guid, doc);
// start listen root doc changes
doc.on('update', this.handleYDocUpdates);
// mark the doc as loaded
doc.emit('sync', [true, doc]);
this.updateSyncStatus();
}
disconnectDoc(doc: Doc) {
doc.off('update', this.handleYDocUpdates);
this.state.connectedDocs.delete(doc.guid);
this.updateSyncStatus();
}
initState() {
this.state.connectedDocs.clear();
this.state.pushUpdatesQueue.clear();
this.state.pullUpdatesQueue.clear();
this.state.subdocsLoadQueue.clear();
this.state.pushingUpdate = false;
this.state.subdocLoading = false;
}
/**
* stop sync
*
* DocPeer is single-use; this peer should be discarded after calling stop().
*/
stop() {
this.logger.debug(`doc-peer:${this.name} stop`);
this.abort.abort(MANUALLY_STOP);
}
/**
* main synchronization logic
*/
async sync(abortOuter: AbortSignal) {
this.initState();
const abortInner = new AbortController();
abortOuter.addEventListener('abort', reason => {
abortInner.abort(reason);
});
let dispose: (() => void) | null = null;
try {
this.updateSyncStatus();
// start listen storage updates
dispose = await this.source.subscribe(
this.handleStorageUpdates,
reason => {
// abort if storage disconnect, should trigger retry loop
abortInner.abort('subscribe disconnect:' + reason);
}
);
throwIfAborted(abortInner.signal);
// Step 1: load root doc
await this.connectDoc(this.rootDoc, abortInner.signal);
// Step 2: load subdocs
this.state.subdocsLoadQueue.push(
...Array.from(this.rootDoc.getSubdocs()).map(doc => ({
id: doc.guid,
doc,
}))
);
this.updateSyncStatus();
this.rootDoc.on('subdocs', this.handleSubdocsUpdate);
// Finally: start sync
await Promise.all([
// load subdocs
(async () => {
while (throwIfAborted(abortInner.signal)) {
const subdoc = await this.state.subdocsLoadQueue.next(
abortInner.signal
);
this.state.subdocLoading = true;
this.updateSyncStatus();
await this.connectDoc(subdoc.doc, abortInner.signal);
this.state.subdocLoading = false;
this.updateSyncStatus();
}
})(),
// pull updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { id, data } = await this.state.pullUpdatesQueue.next(
abortInner.signal
);
// don't apply empty data or Uint8Array([0, 0])
if (
!(
data.byteLength === 0 ||
(data.byteLength === 2 && data[0] === 0 && data[1] === 0)
)
) {
const subdoc = this.state.connectedDocs.get(id);
if (subdoc) {
applyUpdate(subdoc, data, this.name);
}
}
this.updateSyncStatus();
}
})(),
// push updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { id, data } = await this.state.pushUpdatesQueue.next(
abortInner.signal
);
this.state.pushingUpdate = true;
this.updateSyncStatus();
const merged = mergeUpdates(data);
// don't push empty data or Uint8Array([0, 0])
if (
!(
merged.byteLength === 0 ||
(merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0)
)
) {
await this.source.push(id, merged);
}
this.state.pushingUpdate = false;
this.updateSyncStatus();
}
})(),
]);
} finally {
dispose?.();
for (const doc of this.state.connectedDocs.values()) {
this.disconnectDoc(doc);
}
this.rootDoc.off('subdocs', this.handleSubdocsUpdate);
}
}
/**
* auto retry after 5 seconds if sync failed
*/
async syncRetryLoop(abort: AbortSignal) {
while (abort.aborted === false) {
try {
await this.sync(abort);
} catch (err) {
if (err === MANUALLY_STOP || abort.aborted) {
return;
}
this.logger.error(`doc-peer:${this.name} sync error`, err);
}
try {
this.logger.error(`doc-peer:${this.name} retry after 5 seconds`);
this.status = {
step: DocPeerStep.Retrying,
totalDocs: 1,
loadedDocs: 0,
pendingPullUpdates: 0,
pendingPushUpdates: 0,
};
await Promise.race([
new Promise<void>(resolve => {
setTimeout(resolve, 5 * 1000);
}),
new Promise((_, reject) => {
// exit if manually stopped
if (abort.aborted) {
reject(abort.reason);
}
abort.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
} catch (err) {
if (err === MANUALLY_STOP || abort.aborted) {
return;
}
// should never reach here
throw err;
}
}
}
updateSyncStatus() {
let step;
if (this.state.connectedDocs.size === 0) {
step = DocPeerStep.LoadingRootDoc;
} else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) {
step = DocPeerStep.LoadingSubDoc;
} else if (
this.state.pullUpdatesQueue.length ||
this.state.pushUpdatesQueue.length ||
this.state.pushingUpdate
) {
step = DocPeerStep.Syncing;
} else {
step = DocPeerStep.Synced;
}
this.status = {
step: step,
totalDocs:
this.state.connectedDocs.size + this.state.subdocsLoadQueue.length,
loadedDocs: this.state.connectedDocs.size,
pendingPullUpdates:
this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0),
pendingPushUpdates:
this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0),
};
}
async waitForLoaded(abort?: AbortSignal) {
if (this.status.step > DocPeerStep.Loaded) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step > DocPeerStep.Loaded) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
async waitForSynced(abort?: AbortSignal) {
if (this.status.step >= DocPeerStep.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step >= DocPeerStep.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
}

View File

@@ -0,0 +1,28 @@
export interface DocSource {
/**
* for debug
*/
name: string;
pull(
docId: string,
state: Uint8Array
):
| Promise<{ data: Uint8Array; state?: Uint8Array } | null>
| { data: Uint8Array; state?: Uint8Array }
| null;
push(docId: string, data: Uint8Array): Promise<void> | void;
/**
* Subscribe to updates from peer
*
* @param cb callback to handle updates
* @param disconnect callback to handle disconnect, reason can be something like 'network-error'
*
* @returns unsubscribe function
*/
subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
): Promise<() => void> | (() => void);
}
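
As with BlobSource, the contract is compact enough to implement from scratch. A minimal in-memory sketch, for illustration only (not part of this commit):

```ts
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
import type { DocSource } from '@blocksuite/sync';

class MemoryDocSource implements DocSource {
  name = 'memory';
  private docs = new Map<string, Uint8Array>();
  private listeners = new Set<(docId: string, data: Uint8Array) => void>();

  pull(docId: string, state: Uint8Array) {
    const update = this.docs.get(docId);
    if (!update) return null;
    // return only what the caller is missing, plus our current state vector
    const data = state.length ? diffUpdate(update, state) : update;
    return { data, state: encodeStateVectorFromUpdate(update) };
  }

  push(docId: string, data: Uint8Array) {
    const prev = this.docs.get(docId);
    this.docs.set(docId, prev ? mergeUpdates([prev, data]) : data);
    this.listeners.forEach(cb => cb(docId, data));
  }

  subscribe(cb: (docId: string, data: Uint8Array) => void) {
    this.listeners.add(cb);
    return () => void this.listeners.delete(cb);
  }
}
```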

View File

@@ -0,0 +1,3 @@
export * from './awareness/index.js';
export * from './blob/index.js';
export * from './doc/index.js';

View File

@@ -0,0 +1,45 @@
import { describe, expect, test, vi } from 'vitest';
import { AsyncQueue } from '../async-queue.js';
describe('async-queue', () => {
test('push & pop', async () => {
const queue = new AsyncQueue();
queue.push(1, 2, 3);
expect(queue.length).toBe(3);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);
expect(await queue.next()).toBe(3);
expect(queue.length).toBe(0);
});
test('await', async () => {
const queue = new AsyncQueue<number>();
queue.push(1, 2);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);
let v = -1;
// setup 2 pop tasks
void queue.next().then(next => {
v = next;
});
void queue.next().then(next => {
v = next;
});
// Wait for 100ms
await new Promise(resolve => setTimeout(resolve, 100));
// v should not be changed
expect(v).toBe(-1);
// push 3, should trigger the first pop task
queue.push(3);
await vi.waitFor(() => expect(v).toBe(3));
// push 4, should trigger the second pop task
queue.push(4);
await vi.waitFor(() => expect(v).toBe(4));
});
});

View File

@@ -0,0 +1,13 @@
import { describe, expect, test } from 'vitest';
import { throwIfAborted } from '../throw-if-aborted.js';
describe('throw-if-aborted', () => {
test('basic', () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;
expect(throwIfAborted(abortSignal)).toBe(true);
abortController.abort('TEST_ABORT');
expect(() => throwIfAborted(abortSignal)).toThrowError('TEST_ABORT');
});
});

View File

@@ -0,0 +1,102 @@
export class AsyncQueue<T> {
private _queue: T[];
private _resolveUpdate: (() => void) | null = null;
private _waitForUpdate: Promise<void> | null = null;
get length() {
return this._queue.length;
}
constructor(init: T[] = []) {
this._queue = init;
}
clear() {
this._queue = [];
}
find(predicate: (update: T) => boolean) {
return this._queue.find(predicate);
}
async next(
abort?: AbortSignal,
dequeue: (arr: T[]) => T | undefined = a => a.shift()
): Promise<T> {
const update = dequeue(this._queue);
if (update) {
return update;
} else {
if (!this._waitForUpdate) {
this._waitForUpdate = new Promise(resolve => {
this._resolveUpdate = resolve;
});
}
await Promise.race([
this._waitForUpdate,
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
return this.next(abort, dequeue);
}
}
push(...updates: T[]) {
this._queue.push(...updates);
if (this._resolveUpdate) {
const resolve = this._resolveUpdate;
this._resolveUpdate = null;
this._waitForUpdate = null;
resolve();
}
}
remove(predicate: (update: T) => boolean) {
const index = this._queue.findIndex(predicate);
if (index !== -1) {
this._queue.splice(index, 1);
}
}
}
export class PriorityAsyncQueue<
T extends { id: string },
> extends AsyncQueue<T> {
constructor(
init: T[] = [],
readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget()
) {
super(init);
}
override next(abort?: AbortSignal | undefined): Promise<T> {
return super.next(abort, arr => {
if (this.priorityTarget.priorityRule !== null) {
const index = arr.findIndex(update =>
this.priorityTarget.priorityRule?.(update.id)
);
if (index !== -1) {
return arr.splice(index, 1)[0];
}
}
return arr.shift();
});
}
}
/**
* Shared priority target can be shared by multiple queues.
*/
export class SharedPriorityTarget {
priorityRule: ((id: string) => boolean) | null = null;
}
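
A short sketch of how a shared priority rule steers dequeue order; these utilities are package-internal, so the relative import mirrors the tests above:

```ts
import { PriorityAsyncQueue, SharedPriorityTarget } from '../async-queue.js';

const target = new SharedPriorityTarget();
const queue = new PriorityAsyncQueue(
  [{ id: 'a' }, { id: 'b' }, { id: 'c' }],
  target
);

// e.g. prioritize the doc the user currently has open
target.priorityRule = id => id === 'c';

console.log((await queue.next()).id); // 'c' jumps the queue
console.log((await queue.next()).id); // 'a' (FIFO for the rest)
```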

View File

@@ -0,0 +1,9 @@
// AbortSignal.throwIfAborted is not available in abortcontroller-polyfill, so we provide our own helper
export function throwIfAborted(abort?: AbortSignal) {
if (abort?.aborted) {
throw new Error(abort.reason);
}
return true;
}
export const MANUALLY_STOP = 'manually-stop';

View File

@@ -0,0 +1,14 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "./src/",
"outDir": "./dist/",
"noEmit": false
},
"include": ["./src", "index.d.ts"],
"references": [
{
"path": "../global"
}
]
}

View File

@@ -0,0 +1,23 @@
import { defineConfig } from 'vitest/config';
export default defineConfig({
test: {
include: ['src/__tests__/**/*.unit.spec.ts'],
testTimeout: 500,
coverage: {
provider: 'istanbul', // or 'c8'
reporter: ['lcov'],
reportsDirectory: '../../../.coverage/sync',
},
/**
* Custom handler for console.log in tests.
*
* Return `false` to ignore the log.
*/
onConsoleLog(log, type) {
console.warn(`Unexpected ${type} log`, log);
throw new Error(log);
},
restoreMocks: true,
},
});