EYHN
2024-03-22 16:43:26 +00:00
parent 05c44db5a9
commit 34703a3b7d
85 changed files with 3248 additions and 2286 deletions

View File

@@ -4,6 +4,7 @@ export * from './blocksuite';
export * from './command';
export * from './di';
export * from './initialization';
export * from './lifecycle';
export * from './livedata';
export * from './page';
export * from './storage';

View File

@@ -105,7 +105,7 @@ export async function buildShowcaseWorkspace(
const { workspace, release } = workspaceManager.open(meta);
await workspace.engine.sync.waitForLoadedRootDoc();
await workspace.engine.waitForRootDocReady();
const pageRecordList = workspace.services.get(PageRecordList);

View File

@@ -11,7 +11,7 @@ import type {
import { assertExists } from '@blocksuite/global/utils';
import type { DeltaOperation, JobMiddleware } from '@blocksuite/store';
export const replaceIdMiddleware: JobMiddleware = ({ slots, workspace }) => {
export const replaceIdMiddleware: JobMiddleware = ({ slots, collection }) => {
const idMap = new Map<string, string>();
slots.afterImport.on(payload => {
if (
@@ -61,7 +61,7 @@ export const replaceIdMiddleware: JobMiddleware = ({ slots, workspace }) => {
});
slots.beforeImport.on(payload => {
if (payload.type === 'page') {
const newId = workspace.idGenerator('page');
const newId = collection.idGenerator('page');
idMap.set(payload.snapshot.meta.id, newId);
payload.snapshot.meta.id = newId;
return;
@@ -84,7 +84,7 @@ export const replaceIdMiddleware: JobMiddleware = ({ slots, workspace }) => {
if (idMap.has(original)) {
newId = idMap.get(original)!;
} else {
newId = workspace.idGenerator('block');
newId = collection.idGenerator('block');
idMap.set(original, newId);
}
snapshot.id = newId;
@@ -96,7 +96,7 @@ export const replaceIdMiddleware: JobMiddleware = ({ slots, workspace }) => {
if (idMap.has(original)) {
newId = idMap.get(original)!;
} else {
newId = workspace.idGenerator('block');
newId = collection.idGenerator('block');
idMap.set(original, newId);
}
});

View File

@@ -2,11 +2,7 @@ import { isEqual } from 'lodash-es';
import { distinctUntilChanged, map, Observable } from 'rxjs';
import { LiveData } from '../livedata';
import {
SyncEngineStep,
type Workspace,
type WorkspaceLocalState,
} from '../workspace';
import { type Workspace, type WorkspaceLocalState } from '../workspace';
import { PageRecord } from './record';
export class PageRecordList {
@@ -39,22 +35,8 @@ export class PageRecordList {
[]
);
public readonly isReady = LiveData.from<boolean>(
new Observable(subscriber => {
subscriber.next(
this.workspace.engine.status.sync.step === SyncEngineStep.Synced
);
const dispose = this.workspace.engine.onStatusChange.on(() => {
subscriber.next(
this.workspace.engine.status.sync.step === SyncEngineStep.Synced
);
}).dispose;
return () => {
dispose();
};
}),
false
public readonly isReady = this.workspace.engine.rootDocState.map(
state => !state.syncing
);
public record(id: string) {

View File

@@ -1 +1,2 @@
export * from './kv';
export * from './memento';

View File

@@ -0,0 +1,85 @@
import { AsyncLock } from '../utils';
/**
 * A byte-oriented key-value store with transactional access.
 * `transaction` runs `cb` with exclusive access to the underlying store.
 */
export interface ByteKV extends ByteKVBehavior {
  transaction<T>(cb: (transaction: ByteKVBehavior) => Promise<T>): Promise<T>;
}
/** The primitive KV operations; implementations may be sync or async. */
export interface ByteKVBehavior {
  /** Returns the stored bytes for `key`, or null when absent. */
  get(key: string): Promise<Uint8Array | null> | Uint8Array | null;
  set(key: string, value: Uint8Array): Promise<void> | void;
  del(key: string): Promise<void> | void;
  keys(): Promise<string[]> | string[];
  clear(): Promise<void> | void;
}
/**
 * In-memory ByteKV backed by a Map. Every operation funnels through
 * `transaction`, which serializes access with an AsyncLock so concurrent
 * transactions never interleave.
 */
export class MemoryByteKV implements ByteKV {
  readonly lock = new AsyncLock();

  constructor(readonly db = new Map<string, Uint8Array>()) {}

  async transaction<T>(cb: (transaction: ByteKVBehavior) => Promise<T>) {
    // Hold the lock for the whole callback; release on return or throw.
    const guard = await this.lock.acquire();
    try {
      const behavior: ByteKVBehavior = {
        get: async key => this.db.get(key) ?? null,
        set: async (key, value) => {
          this.db.set(key, value);
        },
        keys: async () => [...this.db.keys()],
        del: async key => {
          this.db.delete(key);
        },
        clear: async () => {
          this.db.clear();
        },
      };
      return await cb(behavior);
    } finally {
      guard.release();
    }
  }

  get(key: string) {
    return this.transaction(async tx => await tx.get(key));
  }

  set(key: string, value: Uint8Array) {
    return this.transaction(async tx => await tx.set(key, value));
  }

  keys() {
    return this.transaction(async tx => await tx.keys());
  }

  clear() {
    return this.transaction(async tx => await tx.clear());
  }

  del(key: string) {
    return this.transaction(async tx => await tx.del(key));
  }
}
/**
 * A ByteKV that silently ignores all writes. Reads behave like
 * MemoryByteKV over whatever data was supplied at construction.
 */
export class ReadonlyByteKV extends MemoryByteKV implements ByteKV {
  override transaction<T>(
    cb: (transaction: ByteKVBehavior) => Promise<T>
  ): Promise<T> {
    // Reuse the parent's locking/read behavior, but hand the callback a
    // view whose mutating operations are no-ops.
    const noop = () => Promise.resolve();
    return super.transaction(tx =>
      cb({
        ...tx,
        set: noop,
        del: noop,
        clear: noop,
      })
    );
  }

  override set(_key: string, _value: Uint8Array): Promise<void> {
    return Promise.resolve();
  }

  override del(_key: string): Promise<void> {
    return Promise.resolve();
  }

  override clear(): Promise<void> {
    return Promise.resolve();
  }
}

View File

@@ -10,6 +10,9 @@ export interface Memento {
get<T>(key: string): T | null;
watch<T>(key: string): Observable<T | null>;
set<T>(key: string, value: T | null): void;
del(key: string): void;
clear(): void;
keys(): string[];
}
/**
@@ -54,4 +57,43 @@ export class MemoryMemento implements Memento {
set<T>(key: string, value: T | null): void {
this.getLiveData(key).next(value);
}
keys(): string[] {
return Array.from(this.data.keys());
}
clear(): void {
this.data.clear();
}
del(key: string): void {
this.data.delete(key);
}
}
/**
 * Namespaces a Memento: every key is stored as `prefix + key`, and
 * `keys`/`clear` only see and affect keys under that prefix.
 */
export function wrapMemento(memento: Memento, prefix: string): Memento {
  const prefixed = (key: string) => prefix + key;
  return {
    get<T>(key: string): T | null {
      return memento.get(prefixed(key));
    },
    watch(key: string) {
      return memento.watch(prefixed(key));
    },
    set<T>(key: string, value: T | null): void {
      memento.set(prefixed(key), value);
    },
    keys(): string[] {
      // Report keys with the prefix stripped off.
      const result: string[] = [];
      for (const k of memento.keys()) {
        if (k.startsWith(prefix)) {
          result.push(k.slice(prefix.length));
        }
      }
      return result;
    },
    clear() {
      // Only delete keys inside this namespace.
      for (const k of memento.keys()) {
        if (k.startsWith(prefix)) {
          memento.del(k);
        }
      }
    },
    del(key: string): void {
      memento.del(prefixed(key));
    },
  };
}

View File

@@ -0,0 +1,20 @@
/**
 * A simple FIFO mutex for async code.
 *
 * `acquire()` resolves once every earlier acquirer has released, and
 * returns a handle that must be released exactly once — either via
 * `release()` or via `using` / `Symbol.dispose`.
 */
export class AsyncLock {
  // Resolves when the most recent acquirer releases.
  private _lock = Promise.resolve();

  async acquire() {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    let release: () => void = null!;
    const nextLock = new Promise<void>(resolve => {
      release = resolve;
    });
    // Fix: swap in our lock *synchronously*, before awaiting. The original
    // awaited `this._lock` first and assigned afterwards, so two acquirers
    // arriving in the same tick both awaited the same (resolved) promise
    // and both entered the critical section simultaneously.
    const previous = this._lock;
    this._lock = nextLock;
    await previous;
    return {
      release,
      [Symbol.dispose]: () => {
        release();
      },
    };
  }
}

View File

@@ -1,3 +1,4 @@
export * from './async-lock';
export * from './async-queue';
export * from './merge-updates';
export * from './object-pool';

View File

@@ -0,0 +1,127 @@
# DocEngine
The synchronization algorithm for yjs docs.
```
┌─────────┐ ┌───────────┐ ┌────────┐
│ Storage ◄──┤ DocEngine ├──► Server │
└─────────┘ └───────────┘ └────────┘
```
# Core Components
## DocStorage
```ts
export interface DocStorage {
eventBus: DocEventBus;
doc: ByteKV;
syncMetadata: ByteKV;
serverClock: ByteKV;
}
```
Represents the local storage used. Specific implementations are replaceable, such as `IndexedDBDocStorage` in the `browser` and `SqliteDocStorage` in the `desktop` app.
### DocEventBus
Each `DocStorage` contains a `DocEventBus`, which is used to communicate with other engines that share the same storage.
With `DocEventBus` we can sync updates between engines without connecting to the server.
For example, on the `browser`, we have multiple tabs, all tabs share the same `IndexedDBDocStorage`, so we use `BroadcastChannel` to implement `DocEventBus`, which allows us to broadcast events to all tabs.
On the `desktop` app, if we have multiple Windows sharing the same `SqliteDocStorage`, we must build a mechanism to broadcast events between all Windows (currently not implemented).
## DocServer
```ts
export interface DocServer {
pullDoc(
docId: string,
stateVector: Uint8Array
): Promise<{
data: Uint8Array;
serverClock: number;
stateVector?: Uint8Array;
} | null>;
pushDoc(docId: string, data: Uint8Array): Promise<{ serverClock: number }>;
subscribeAllDocs(cb: (updates: { docId: string; data: Uint8Array; serverClock: number }) => void): Promise<() => void>;
loadServerClock(after: number): Promise<Map<string, number>>;
waitForConnectingServer(signal: AbortSignal): Promise<void>;
disconnectServer(): void;
onInterrupted(cb: (reason: string) => void): void;
}
```
Represents the server we want to synchronize, there is a simulated implementation in `tests/sync.spec.ts`, and the real implementation is in `packages/backend/server`.
### ServerClock
`ServerClock` is a clock value generated each time an update is stored on the server. It is used to determine the order in which updates were stored on the server.
The `DocEngine` decides whether to pull updates from the server based on the `ServerClock`.
A `ServerClock` written later must be **greater** than all previously written ones. So on the client side, we can call `loadServerClock(the largest ServerClock previously received)` to obtain every `ServerClock` that has changed since.
## DocEngine
The `DocEngine` is where all the synchronization logic actually happens.
Due to the complexity of the implementation, we divide it into 2 parts.
## DocEngine - LocalPart
Synchronizing **the `YDoc` instance** and **storage**.
The typical workflow is:
1. load data from storage, apply to `YDoc` instance.
2. track `YDoc` changes
3. write the changes back to storage.
### SeqNum
There is a `SeqNum` on each Doc data in `Storage`. Every time `LocalPart` writes data, `SeqNum` will be +1.
There is also a `PushedSeqNum`, which is used for RemotePart later.
## DocEngine - RemotePart
Synchronizing `Storage` and `Server`.
The typical workflow is:
1. Connect with the server, Load `ServerClocks` for all docs, Start subscribing to server-side updates.
2. Check whether each doc requires `push` and `pull`
3. Execute all push and pull
4. Listen for updates from `LocalPart` and push the updates to the server
5. Listen for server-side updates and write them to storage.
### PushedSeqNum
Each doc records a `PushedSeqNum`, used to determine whether the doc has unpushed updates.
After each `push` is completed, `PushedSeqNum` + 1
If `PushedSeqNum` and `SeqNum` still differ after a push completes (which usually means a previous `push` failed),
then we do a full pull and push, and set `PushedSeqNum` = `SeqNum`.
### PulledServerClock
Each doc also records a `PulledServerClock`, which is compared with the `ServerClock` to determine whether to `pull` the doc.
When the `pull` is completed, set `PulledServerClock` = `ServerClock` returned by the server.
### Retry
The `RemotePart` may fail at any time, and `RemotePart`'s built-in retry mechanism will restart the process in 5 seconds after failure.

View File

@@ -0,0 +1,41 @@
import { describe, expect, test } from 'vitest';
import { PriorityQueue } from '../priority-queue';
// Unit tests for PriorityQueue: pop ordering, tie-breaking by id,
// priority updates on re-push, and explicit removal.
describe('Priority Queue', () => {
  test('priority', () => {
    const queue = new PriorityQueue();
    // Pops in descending priority order; an empty queue pops null.
    queue.push('foo', 1);
    queue.push('bar', 2);
    queue.push('baz', 0);
    expect(queue.pop()).toBe('bar');
    expect(queue.pop()).toBe('foo');
    expect(queue.pop()).toBe('baz');
    expect(queue.pop()).toBe(null);
    queue.push('B', 1);
    queue.push('A', 1);
    // if priority same then follow id binary order
    expect(queue.pop()).toBe('B');
    expect(queue.pop()).toBe('A');
    expect(queue.pop()).toBe(null);
    queue.push('A', 1);
    queue.push('B', 2);
    queue.push('A', 3); // same id but different priority, update the priority
    expect(queue.pop()).toBe('A');
    expect(queue.pop()).toBe('B');
    expect(queue.pop()).toBe(null);
    // remove() deletes a queued id outright.
    queue.push('A', 1);
    queue.push('B', 2);
    queue.remove('B');
    expect(queue.pop()).toBe('A');
    expect(queue.pop()).toBe(null);
  });
});

View File

@@ -0,0 +1,234 @@
import { nanoid } from 'nanoid';
import { describe, expect, test, vitest } from 'vitest';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
import { AsyncLock } from '../../../../utils';
import { DocEngine } from '..';
import type { DocServer } from '../server';
import { MemoryStorage } from '../storage';
import { isEmptyUpdate } from '../utils';
// Minimal in-memory stand-in for the sync server; all clients created via
// `client()` share this one instance's database and subscriber list.
class MiniServer {
  lock = new AsyncLock();
  // docId -> merged update bytes plus that doc's latest server clock.
  db = new Map<string, { data: Uint8Array; clock: number }>();
  // Subscribers registered through subscribeAllDocs, keyed by clientId so a
  // client's own pushes are not echoed back to it.
  listeners = new Set<{
    cb: (updates: {
      docId: string;
      data: Uint8Array;
      serverClock: number;
    }) => void;
    clientId: string;
  }>();
  // Creates a new client connection with a random client id.
  client() {
    return new MiniServerClient(nanoid(), this);
  }
}
// DocServer implementation backed by a shared in-memory MiniServer.
class MiniServerClient implements DocServer {
  constructor(
    private readonly id: string,
    private readonly server: MiniServer
  ) {}
  // Returns the doc's data (diffed against the client's state vector when
  // both sides have data), or null when the doc is unknown.
  async pullDoc(docId: string, stateVector: Uint8Array) {
    using _lock = await this.server.lock.acquire();
    const doc = this.server.db.get(docId);
    if (!doc) {
      return null;
    }
    const data = doc.data;
    return {
      data:
        !isEmptyUpdate(data) && stateVector.length > 0
          ? diffUpdate(data, stateVector)
          : data,
      // NOTE(review): serverClock is hard-coded to 0 here; confirm the
      // engine does not rely on pullDoc's clock in these tests.
      serverClock: 0,
      stateVector: !isEmptyUpdate(data)
        ? encodeStateVectorFromUpdate(data)
        : new Uint8Array(),
    };
  }
  // Merges the incoming update into stored state, bumps the clock, and fans
  // the raw update out to every *other* subscribed client.
  async pushDoc(
    docId: string,
    data: Uint8Array
  ): Promise<{ serverClock: number }> {
    using _lock = await this.server.lock.acquire();
    const doc = this.server.db.get(docId);
    const oldData = doc?.data ?? new Uint8Array();
    const newClock = (doc?.clock ?? 0) + 1;
    this.server.db.set(docId, {
      data: !isEmptyUpdate(data)
        ? !isEmptyUpdate(oldData)
          ? mergeUpdates([oldData, data])
          : data
        : oldData,
      clock: newClock,
    });
    for (const { clientId, cb } of this.server.listeners) {
      if (clientId !== this.id) {
        cb({
          docId,
          data,
          serverClock: newClock,
        });
      }
    }
    return { serverClock: newClock };
  }
  // Returns clocks for every doc changed after the given clock value.
  async loadServerClock(after: number): Promise<Map<string, number>> {
    using _lock = await this.server.lock.acquire();
    const map = new Map<string, number>();
    for (const [docId, { clock }] of this.server.db) {
      if (clock > after) {
        map.set(docId, clock);
      }
    }
    return map;
  }
  async subscribeAllDocs(
    cb: (updates: {
      docId: string;
      data: Uint8Array;
      serverClock: number;
    }) => void
  ): Promise<() => void> {
    const listener = { cb, clientId: this.id };
    this.server.listeners.add(listener);
    return () => {
      this.server.listeners.delete(listener);
    };
  }
  // Connection management is a no-op for the in-memory server.
  async waitForConnectingServer(): Promise<void> {}
  disconnectServer(): void {}
  onInterrupted(_cb: (reason: string) => void): void {}
}
// End-to-end tests of the DocEngine against the in-memory MiniServer.
describe('sync', () => {
  // One client: local edits end up both on the server and in local storage.
  test('basic sync', async () => {
    const storage = new MemoryStorage();
    const server = new MiniServer();
    const engine = new DocEngine(storage, server.client()).start();
    const doc = new YDoc({ guid: 'a' });
    engine.addDoc(doc);
    const map = doc.getMap('aaa');
    map.set('a', 1);
    await engine.waitForSynced();
    expect(server.db.size).toBe(1);
    expect(storage.docDb.keys().length).toBe(1);
  });
  // A second client with fresh storage receives data pushed by the first.
  test('can pull from server', async () => {
    const server = new MiniServer();
    {
      const engine = new DocEngine(
        new MemoryStorage(),
        server.client()
      ).start();
      const doc = new YDoc({ guid: 'a' });
      engine.addDoc(doc);
      const map = doc.getMap('aaa');
      map.set('a', 1);
      await engine.waitForSynced();
      expect(server.db.size).toBe(1);
    }
    {
      const engine = new DocEngine(
        new MemoryStorage(),
        server.client()
      ).start();
      const doc = new YDoc({ guid: 'a' });
      engine.addDoc(doc);
      await engine.waitForSynced();
      expect(doc.getMap('aaa').get('a')).toBe(1);
    }
  });
  // Two concurrent clients with separate storages converge via the server.
  test('2 client', async () => {
    const server = new MiniServer();
    await Promise.all([
      (async () => {
        const engine = new DocEngine(
          new MemoryStorage(),
          server.client()
        ).start();
        const doc = new YDoc({ guid: 'a' });
        engine.addDoc(doc);
        const map = doc.getMap('aaa');
        map.set('a', 1);
        await vitest.waitUntil(() => {
          return map.get('b') === 2;
        });
      })(),
      (async () => {
        const engine = new DocEngine(
          new MemoryStorage(),
          server.client()
        ).start();
        const doc = new YDoc({ guid: 'a' });
        engine.addDoc(doc);
        const map = doc.getMap('aaa');
        map.set('b', 2);
        await vitest.waitUntil(() => {
          return map.get('a') === 1;
        });
      })(),
    ]);
  });
  // Two engines over ONE storage converge through the shared event bus.
  test('2 client share storage and eventBus (simulate different tabs in same browser)', async () => {
    const server = new MiniServer();
    const storage = new MemoryStorage();
    await Promise.all([
      (async () => {
        const engine = new DocEngine(storage, server.client()).start();
        const doc = new YDoc({ guid: 'a' });
        engine.addDoc(doc);
        const map = doc.getMap('aaa');
        map.set('a', 1);
        await vitest.waitUntil(() => map.get('b') === 2);
      })(),
      (async () => {
        const engine = new DocEngine(storage, server.client()).start();
        const doc = new YDoc({ guid: 'a' });
        engine.addDoc(doc);
        const map = doc.getMap('aaa');
        map.set('b', 2);
        await vitest.waitUntil(() => map.get('a') === 1);
      })(),
    ]);
  });
  // Data written to storage before the engine existed is still loaded into
  // the YDoc and pushed up to the server.
  test('legacy data', async () => {
    const server = new MiniServer();
    const storage = new MemoryStorage();
    {
      // write legacy data to storage
      const doc = new YDoc({ guid: 'a' });
      const map = doc.getMap('aaa');
      map.set('a', 1);
      await storage.doc.set('a', encodeStateAsUpdate(doc));
    }
    const engine = new DocEngine(storage, server.client()).start();
    const doc = new YDoc({ guid: 'a' });
    engine.addDoc(doc);
    // should load to ydoc and save to server
    await vitest.waitUntil(
      () => doc.getMap('aaa').get('a') === 1 && server.db.size === 1
    );
  });
});

View File

@@ -0,0 +1,43 @@
import { PriorityQueue } from './priority-queue';
/**
 * A PriorityQueue whose `asyncPop` suspends until an item is available,
 * or rejects when the given AbortSignal fires.
 */
export class AsyncPriorityQueue extends PriorityQueue {
  // Resolver for the shared "something was pushed" promise, set while at
  // least one asyncPop is waiting.
  private _resolveUpdate: (() => void) | null = null;
  private _waitForUpdate: Promise<void> | null = null;

  /**
   * Pops the highest-priority id, waiting for a push if the queue is
   * empty. Rejects with `abort.reason` if the signal is (or becomes)
   * aborted while waiting.
   */
  async asyncPop(abort?: AbortSignal): Promise<string> {
    // Loop instead of recursing so repeated wake-ups don't grow the stack.
    // eslint-disable-next-line no-constant-condition
    while (true) {
      const id = this.pop();
      if (id) {
        return id;
      }
      if (abort?.aborted) {
        throw abort.reason;
      }
      if (!this._waitForUpdate) {
        this._waitForUpdate = new Promise(resolve => {
          this._resolveUpdate = resolve;
        });
      }
      // Race a push against an abort. Fix: the original registered a new
      // abort listener on every call and never removed it, leaking
      // listeners and leaving a potentially-unhandled rejection behind
      // whenever the signal fired after the race was already won.
      let onAbort: (() => void) | null = null;
      const abortPromise = new Promise<never>((_, reject) => {
        if (abort) {
          onAbort = () => reject(abort.reason);
          abort.addEventListener('abort', onAbort);
        }
      });
      // If the push wins, the abort promise is abandoned; pre-attach a
      // no-op handler so a later rejection can never become unhandled.
      abortPromise.catch(() => {});
      try {
        await Promise.race([this._waitForUpdate, abortPromise]);
      } finally {
        if (abort && onAbort) {
          abort.removeEventListener('abort', onAbort);
        }
      }
    }
  }

  override push(id: string, priority: number = 0) {
    super.push(id, priority);
    // Wake every pending asyncPop, if any.
    const resolve = this._resolveUpdate;
    if (resolve) {
      this._resolveUpdate = null;
      this._waitForUpdate = null;
      resolve();
    }
  }
}

View File

@@ -0,0 +1,32 @@
/**
 * A map of id -> clock value that also tracks the maximum clock ever seen.
 * Missing ids read as clock 0.
 */
export class ClockMap {
  // Largest clock across all entries (0 when empty).
  max: number = 0;

  constructor(private readonly map: Map<string, number>) {
    for (const clock of map.values()) {
      this.max = Math.max(this.max, clock);
    }
  }

  /** Clock for `id`, defaulting to 0 for unknown ids. */
  get(id: string): number {
    const clock = this.map.get(id);
    return clock === undefined ? 0 : clock;
  }

  set(id: string, value: number) {
    this.map.set(id, value);
    this.max = Math.max(this.max, value);
  }

  /** Stores `value` only if it is strictly newer than the current clock. */
  setIfBigger(id: string, value: number) {
    if (this.get(id) < value) {
      this.set(id, value);
    }
  }

  clear() {
    this.map.clear();
    this.max = 0;
  }
}

View File

@@ -0,0 +1,55 @@
// Events broadcast on the DocEventBus so engines that share one storage
// (e.g. multiple browser tabs) stay in sync.
export type DocEvent =
  | {
      // A client of this storage committed a local update.
      type: 'ClientUpdateCommitted';
      clientId: string;
      docId: string;
      update: Uint8Array;
      seqNum: number;
    }
  | {
      // An update received from the server was written to storage.
      type: 'ServerUpdateCommitted';
      docId: string;
      update: Uint8Array;
      clientId: string;
    }
  | {
      // Pre-engine ("legacy") data was found and committed; no clientId.
      type: 'LegacyClientUpdateCommitted';
      docId: string;
      update: Uint8Array;
    };
// Pub/sub channel for DocEvents; `on` returns an unsubscribe function.
export interface DocEventBus {
  emit(event: DocEvent): void;
  on(cb: (event: DocEvent) => void): () => void;
}
/**
 * In-process DocEventBus. A listener that throws is logged and does not
 * prevent delivery to the remaining listeners.
 */
export class MemoryDocEventBus implements DocEventBus {
  listeners = new Set<(event: DocEvent) => void>();

  emit(event: DocEvent): void {
    this.listeners.forEach(listener => {
      try {
        listener(event);
      } catch (err) {
        console.error(err);
      }
    });
  }

  on(cb: (event: DocEvent) => void): () => void {
    this.listeners.add(cb);
    // Unsubscribe handle.
    return () => {
      this.listeners.delete(cb);
    };
  }
}
/**
 * Thin DocEventBus facade that forwards every call to an injected
 * behavior (e.g. MemoryDocEventBus or a BroadcastChannel-backed bus).
 */
export class DocEventBusInner implements DocEventBus {
  constructor(private readonly eventBusBehavior: DocEventBus) {}

  emit(event: DocEvent) {
    // Pure delegation; no local listener bookkeeping.
    this.eventBusBehavior.emit(event);
  }

  on(cb: (event: DocEvent) => void) {
    // Returns the behavior's own unsubscribe function.
    return this.eventBusBehavior.on(cb);
  }
}

View File

@@ -0,0 +1,187 @@
import { DebugLogger } from '@affine/debug';
import { nanoid } from 'nanoid';
import { filter, map, take } from 'rxjs';
import type { Doc as YDoc } from 'yjs';

import { createIdentifier } from '../../../di';
import { LiveData } from '../../../livedata';
import { MANUALLY_STOP } from '../../../utils';
import { DocEngineLocalPart } from './local';
import { DocEngineRemotePart } from './remote';
import type { DocServer } from './server';
import { type DocStorage, DocStorageInner } from './storage';
const logger = new DebugLogger('doc-engine');
export type { DocEvent, DocEventBus } from './event';
export { MemoryDocEventBus } from './event';
export type { DocServer } from './server';
export type { DocStorage } from './storage';
export {
MemoryStorage as MemoryDocStorage,
ReadonlyStorage as ReadonlyDocStorage,
} from './storage';
export const DocServerImpl = createIdentifier<DocServer>('DocServer');
export const DocStorageImpl = createIdentifier<DocStorage>('DocStorage');
export class DocEngine {
localPart: DocEngineLocalPart;
remotePart: DocEngineRemotePart | null;
storage: DocStorageInner;
engineState = LiveData.computed(get => {
const localState = get(this.localPart.engineState);
if (this.remotePart) {
const remoteState = get(this.remotePart?.engineState);
return {
total: remoteState.total,
syncing: remoteState.syncing,
saving: localState.syncing,
retrying: remoteState.retrying,
errorMessage: remoteState.errorMessage,
};
}
return {
total: localState.total,
syncing: localState.syncing,
saving: localState.syncing,
retrying: false,
errorMessage: null,
};
});
docState(docId: string) {
const localState = this.localPart.docState(docId);
const remoteState = this.remotePart?.docState(docId);
return LiveData.computed(get => {
const local = get(localState);
const remote = remoteState ? get(remoteState) : null;
return {
ready: local.ready,
saving: local.syncing,
syncing: local.syncing || remote?.syncing,
};
});
}
constructor(
storage: DocStorage,
private readonly server?: DocServer | null
) {
const clientId = nanoid();
this.storage = new DocStorageInner(storage);
this.localPart = new DocEngineLocalPart(clientId, this.storage);
this.remotePart = this.server
? new DocEngineRemotePart(clientId, this.storage, this.server)
: null;
}
abort = new AbortController();
start() {
this.abort.abort(MANUALLY_STOP);
this.abort = new AbortController();
Promise.all([
this.localPart.mainLoop(this.abort.signal),
this.remotePart?.mainLoop(this.abort.signal),
]).catch(err => {
if (err === MANUALLY_STOP) {
return;
}
logger.error('Doc engine error', err);
});
return this;
}
stop() {
this.abort.abort(MANUALLY_STOP);
}
async resetSyncStatus() {
this.stop();
await this.storage.clearSyncMetadata();
await this.storage.clearServerClock();
}
addDoc(doc: YDoc, withSubDocs = true) {
this.localPart.actions.addDoc(doc);
this.remotePart?.actions.addDoc(doc.guid);
if (withSubDocs) {
const subdocs = doc.getSubdocs();
for (const subdoc of subdocs) {
this.addDoc(subdoc, false);
}
doc.on('subdocs', ({ added }: { added: Set<YDoc> }) => {
for (const subdoc of added) {
this.addDoc(subdoc, false);
}
});
}
}
setPriority(docId: string, priority: number) {
this.localPart.setPriority(docId, priority);
this.remotePart?.setPriority(docId, priority);
}
/**
* ## Saved:
* YDoc changes have been saved to storage, and the browser can be safely closed without losing data.
*/
waitForSaved() {
return new Promise<void>(resolve => {
this.engineState
.pipe(map(state => state.saving === 0))
.subscribe(saved => {
if (saved) {
resolve();
}
});
});
}
/**
* ## Synced:
* is fully synchronized with the server
*/
waitForSynced() {
return new Promise<void>(resolve => {
this.engineState
.pipe(map(state => state.syncing === 0 && state.saving === 0))
.subscribe(synced => {
if (synced) {
resolve();
}
});
});
}
/**
* ## Ready:
*
* means that the doc has been loaded and the data can be modified.
* (is not force, you can still modify it if you know you are creating some new data)
*
* this is a temporary solution to deal with the yjs overwrite issue.
*
* if content is loaded from storage
* or if content is pulled from the server, it will be true, otherwise be false.
*
* For example, when opening a doc that is not in storage, ready = false until the content is pulled from the server.
*/
waitForReady(docId: string) {
return new Promise<void>(resolve => {
this.docState(docId)
.pipe(map(state => state.ready))
.subscribe(ready => {
if (ready) {
resolve();
}
});
});
}
}

View File

@@ -0,0 +1,302 @@
import { DebugLogger } from '@affine/debug';
import { Unreachable } from '@affine/env/constant';
import { groupBy } from 'lodash-es';
import { Observable, Subject } from 'rxjs';
import type { Doc as YDoc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, mergeUpdates } from 'yjs';
import { LiveData } from '../../../livedata';
import { throwIfAborted } from '../../../utils';
import { AsyncPriorityQueue } from './async-priority-queue';
import type { DocEvent } from './event';
import type { DocStorageInner } from './storage';
import { isEmptyUpdate } from './utils';
// Unit of work for the local part, always keyed by doc id.
type Job =
  | {
      // Load the doc's content from storage into its YDoc.
      type: 'load';
      docId: string;
    }
  | {
      // Persist a YDoc update into storage.
      type: 'save';
      docId: string;
      update: Uint8Array;
    }
  | {
      // Apply an update (from storage/server/another tab) to the YDoc.
      type: 'apply';
      docId: string;
      update: Uint8Array;
      isInitialize: boolean;
    };
// Origin tag for updates applied by the engine itself, so handleDocUpdate
// can ignore them and avoid save loops.
const DOC_ENGINE_ORIGIN = 'doc-engine';
const logger = new DebugLogger('doc-engine:local');
export interface LocalEngineState {
  total: number;
  syncing: number;
}
export interface LocalDocState {
  ready: boolean;
  syncing: boolean;
}
/**
 * LocalPart of the DocEngine: keeps YDoc instances in sync with local
 * DocStorage.
 *
 * Work is modeled as per-doc jobs (`load` / `save` / `apply`) queued on a
 * priority queue and drained by `mainLoop`. This part is designed to
 * never fail: errors while applying updates are logged and swallowed.
 */
export class DocEngineLocalPart {
  private readonly prioritySettings = new Map<string, number>();
  // Emits the docId whose status changed; drives the LiveData states below.
  private readonly statusUpdatedSubject = new Subject<string>();

  private readonly status = {
    docs: new Map<string, YDoc>(),
    connectedDocs: new Set<string>(),
    readyDocs: new Set<string>(),
    jobDocQueue: new AsyncPriorityQueue(),
    jobMap: new Map<string, Job[]>(),
    currentJob: null as { docId: string; jobs: Job[] } | null,
  };

  engineState = LiveData.from<LocalEngineState>(
    new Observable(subscribe => {
      const next = () => {
        subscribe.next({
          total: this.status.docs.size,
          syncing: this.status.jobMap.size + (this.status.currentJob ? 1 : 0),
        });
      };
      next();
      return this.statusUpdatedSubject.subscribe(() => {
        next();
      });
    }),
    { syncing: 0, total: 0 }
  );

  docState(docId: string) {
    return LiveData.from<LocalDocState>(
      new Observable(subscribe => {
        const next = () => {
          subscribe.next({
            // Set.has already returns a boolean; the original's `?? false`
            // was dead code.
            ready: this.status.readyDocs.has(docId),
            syncing:
              (this.status.jobMap.get(docId)?.length ?? 0) > 0 ||
              this.status.currentJob?.docId === docId,
          });
        };
        next();
        return this.statusUpdatedSubject.subscribe(updatedId => {
          if (updatedId === docId) next();
        });
      }),
      { ready: false, syncing: false }
    );
  }

  constructor(
    private readonly clientId: string,
    private readonly storage: DocStorageInner
  ) {}

  /**
   * Drains the job queue until `signal` aborts. For each doc, pending jobs
   * are grouped and executed in load -> apply -> save order.
   */
  async mainLoop(signal?: AbortSignal) {
    const dispose = this.storage.eventBus.on(event => {
      const handler = this.events[event.type];
      if (handler) {
        handler(event as any);
      }
    });
    try {
      // eslint-disable-next-line no-constant-condition
      while (true) {
        throwIfAborted(signal);
        const docId = await this.status.jobDocQueue.asyncPop(signal);
        const jobs = this.status.jobMap.get(docId);
        this.status.jobMap.delete(docId);
        if (!jobs) {
          continue;
        }
        this.status.currentJob = { docId, jobs };
        this.statusUpdatedSubject.next(docId);
        const { apply, load, save } = groupBy(jobs, job => job.type) as {
          [key in Job['type']]?: Job[];
        };
        // Only the first load matters; later ones are redundant.
        if (load?.length) {
          await this.jobs.load(load[0] as any, signal);
        }
        for (const applyJob of apply ?? []) {
          await this.jobs.apply(applyJob as any, signal);
        }
        // All pending saves for this doc are merged into one commit.
        if (save?.length) {
          await this.jobs.save(docId, save as any, signal);
        }
        this.status.currentJob = null;
        this.statusUpdatedSubject.next(docId);
      }
    } finally {
      dispose();
      // Detach update listeners from every connected doc on shutdown.
      for (const docs of this.status.connectedDocs) {
        const doc = this.status.docs.get(docs);
        if (doc) {
          doc.off('update', this.handleDocUpdate);
        }
      }
    }
  }

  readonly actions = {
    /** Registers a doc and schedules its initial load from storage. */
    addDoc: (doc: YDoc) => {
      this.schedule({
        type: 'load',
        docId: doc.guid,
      });
      this.status.docs.set(doc.guid, doc);
      this.statusUpdatedSubject.next(doc.guid);
    },
  };

  readonly jobs = {
    load: async (job: Job & { type: 'load' }, signal?: AbortSignal) => {
      const doc = this.status.docs.get(job.docId);
      if (!doc) {
        throw new Unreachable('doc not found');
      }
      // If the YDoc already has content (created before the engine
      // attached), persist it first so nothing is lost.
      const existingData = encodeStateAsUpdate(doc);
      if (!isEmptyUpdate(existingData)) {
        this.schedule({
          type: 'save',
          docId: doc.guid,
          update: existingData,
        });
      }
      // mark doc as loaded
      doc.emit('sync', [true]);
      doc.on('update', this.handleDocUpdate);
      this.status.connectedDocs.add(job.docId);
      this.statusUpdatedSubject.next(job.docId);
      const docData = await this.storage.loadDocFromLocal(job.docId, signal);
      if (!docData || isEmptyUpdate(docData)) {
        return;
      }
      this.applyUpdate(job.docId, docData);
      this.status.readyDocs.add(job.docId);
      this.statusUpdatedSubject.next(job.docId);
    },
    save: async (
      docId: string,
      jobs: (Job & { type: 'save' })[],
      signal?: AbortSignal
    ) => {
      if (this.status.connectedDocs.has(docId)) {
        const merged = mergeUpdates(
          jobs.map(j => j.update).filter(update => !isEmptyUpdate(update))
        );
        const newSeqNum = await this.storage.commitDocAsClientUpdate(
          docId,
          merged,
          signal
        );
        // Broadcast so sibling engines sharing this storage can apply it.
        this.storage.eventBus.emit({
          type: 'ClientUpdateCommitted',
          seqNum: newSeqNum,
          docId: docId,
          clientId: this.clientId,
          update: merged,
        });
      }
    },
    apply: async (job: Job & { type: 'apply' }, signal?: AbortSignal) => {
      throwIfAborted(signal);
      if (this.status.connectedDocs.has(job.docId)) {
        this.applyUpdate(job.docId, job.update);
      }
      if (job.isInitialize && !isEmptyUpdate(job.update)) {
        this.status.readyDocs.add(job.docId);
      }
    },
  };

  readonly events: {
    [key in DocEvent['type']]?: (event: DocEvent & { type: key }) => void;
  } = {
    ServerUpdateCommitted: ({ docId, update, clientId }) => {
      this.schedule({
        type: 'apply',
        docId,
        update,
        isInitialize: clientId === this.clientId,
      });
    },
    ClientUpdateCommitted: ({ docId, update, clientId }) => {
      // Our own commits were already applied locally; only apply others'.
      if (clientId !== this.clientId) {
        this.schedule({
          type: 'apply',
          docId,
          update,
          isInitialize: false,
        });
      }
    },
    LegacyClientUpdateCommitted: ({ docId, update }) => {
      this.schedule({
        type: 'save',
        docId,
        update,
      });
    },
  };

  // Bound handler for YDoc 'update' events; schedules a save unless the
  // update originated from this engine applying storage data.
  handleDocUpdate = (update: Uint8Array, origin: any, doc: YDoc) => {
    if (origin === DOC_ENGINE_ORIGIN) {
      return;
    }
    this.schedule({
      type: 'save',
      docId: doc.guid,
      update,
    });
  };

  applyUpdate(docId: string, update: Uint8Array) {
    const doc = this.status.docs.get(docId);
    if (doc && !isEmptyUpdate(update)) {
      try {
        applyUpdate(doc, update, DOC_ENGINE_ORIGIN);
      } catch (err) {
        // Fix: the original body was the bare expression `logger;`, which
        // silently discarded the error instead of logging it.
        logger.error('failed to apply update', docId, err);
      }
    }
  }

  schedule(job: Job) {
    const priority = this.prioritySettings.get(job.docId) ?? 0;
    this.status.jobDocQueue.push(job.docId, priority);
    const existingJobs = this.status.jobMap.get(job.docId) ?? [];
    existingJobs.push(job);
    this.status.jobMap.set(job.docId, existingJobs);
    this.statusUpdatedSubject.next(job.docId);
  }

  setPriority(docId: string, priority: number) {
    this.prioritySettings.set(docId, priority);
    this.status.jobDocQueue.updatePriority(docId, priority);
  }
}

View File

@@ -0,0 +1,69 @@
import { BinarySearchTree } from '@datastructures-js/binary-search-tree';
/**
 * A max-priority queue of string ids backed by a binary search tree.
 * Higher priority pops first; equal priorities pop in descending id
 * (binary string) order. Re-pushing an existing id updates its priority.
 */
export class PriorityQueue {
  tree = new BinarySearchTree<{ id: string; priority: number }>((a, b) => {
    // Order primarily by priority, then break ties by id.
    if (a.priority !== b.priority) {
      return a.priority - b.priority;
    }
    if (a.id === b.id) {
      return 0;
    }
    return a.id > b.id ? 1 : -1;
  });
  // id -> current priority, so existing entries can be located for removal.
  priorityMap = new Map<string, number>();

  push(id: string, priority: number = 0) {
    const previous = this.priorityMap.get(id);
    if (previous === priority) {
      return;
    }
    // Re-pushing with a different priority replaces the old entry.
    if (previous !== undefined) {
      this.remove(id);
    }
    this.tree.insert({ id, priority });
    this.priorityMap.set(id, priority);
  }

  /** Removes and returns the highest-priority id, or null when empty. */
  pop() {
    const node = this.tree.max();
    if (!node) {
      return null;
    }
    this.tree.removeNode(node);
    const { id } = node.getValue();
    this.priorityMap.delete(id);
    return id;
  }

  remove(id: string, priority?: number) {
    const effective = priority ?? this.priorityMap.get(id);
    if (effective === undefined) {
      return false;
    }
    const removed = this.tree.remove({ id, priority: effective });
    if (removed) {
      this.priorityMap.delete(id);
    }
    return removed;
  }

  clear() {
    this.tree.clear();
    this.priorityMap.clear();
  }

  /** Changes the priority only if the id is currently queued. */
  updatePriority(id: string, priority: number) {
    if (this.remove(id)) {
      this.push(id, priority);
    }
  }

  get length() {
    // NOTE(review): in @datastructures-js/binary-search-tree, `count` is a
    // method — verify this shouldn't be `this.tree.count()`.
    return this.tree.count;
  }
}

View File

@@ -0,0 +1,545 @@
import { DebugLogger } from '@affine/debug';
import { remove } from 'lodash-es';
import { Observable, Subject } from 'rxjs';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
import { LiveData } from '../../../livedata';
import { throwIfAborted } from '../../../utils';
import { AsyncPriorityQueue } from './async-priority-queue';
import { ClockMap } from './clock';
import type { DocEvent } from './event';
import type { DocServer } from './server';
import type { DocStorageInner } from './storage';
import { isEmptyUpdate } from './utils';
const logger = new DebugLogger('doc-engine:remote');
/**
 * Unit of work for the remote sync loop. Jobs are queued per-doc via
 * `DocEngineRemotePart.schedule()` and drained one doc at a time in
 * `retryLoop()` in the order: connect, pullAndPush, pull, push, save.
 */
type Job =
  | {
      // Establish initial sync state for a doc (decides pull vs pullAndPush).
      type: 'connect';
      docId: string;
    }
  | {
      // Push a locally committed update (identified by seqNum) to the server.
      type: 'push';
      docId: string;
      update: Uint8Array;
      seqNum: number;
    }
  | {
      // Fetch server updates for a doc and apply them locally.
      type: 'pull';
      docId: string;
    }
  | {
      // Full bidirectional reconciliation: pull server state, then push the
      // local diff. Used on first connect or when seqNums disagree.
      type: 'pullAndPush';
      docId: string;
    }
  | {
      // Persist a server-originated update (received via subscription or as
      // a push acknowledgement) together with its server clock.
      type: 'save';
      docId: string;
      update?: Uint8Array;
      serverClock: number;
    };

/** Mutable state of the remote sync loop; rebuilt on every retry. */
export interface Status {
  // All docs known to the engine.
  docs: Set<string>;
  // Docs that completed their 'connect' job in the current loop run.
  connectedDocs: Set<string>;
  // Per-doc scheduling queue; pop order respects setPriority().
  jobDocQueue: AsyncPriorityQueue;
  // Pending jobs per docId; a doc with no entry is considered synced.
  jobMap: Map<string, Job[]>;
  // Highest known server clock per doc.
  serverClocks: ClockMap;
  // True while retryLoop() is actively running.
  syncing: boolean;
  // True while waiting to restart after a failure.
  retrying: boolean;
  errorMessage: string | null;
}

/** Aggregate engine state exposed to the UI via LiveData. */
export interface RemoteEngineState {
  total: number;
  syncing: number;
  retrying: boolean;
  errorMessage: string | null;
}

/** Per-doc sync state exposed to the UI via LiveData. */
export interface RemoteDocState {
  syncing: boolean;
}
/**
 * Remote half of the doc engine: pushes locally committed updates to a
 * `DocServer` and applies server-side updates back into local storage.
 *
 * All work is expressed as per-doc `Job`s fed through `schedule()`.
 * `retryLoop()` drains the job queue one doc at a time; `mainLoop()`
 * restarts `retryLoop()` with a 5s backoff whenever it throws.
 */
export class DocEngineRemotePart {
  // Per-doc sync priority; kept outside `status` so it survives the
  // queue reset performed between retries.
  private readonly prioritySettings = new Map<string, number>();

  constructor(
    private readonly clientId: string,
    private readonly storage: DocStorageInner,
    private readonly server: DocServer
  ) {}

  private status: Status = {
    docs: new Set<string>(),
    connectedDocs: new Set<string>(),
    jobDocQueue: new AsyncPriorityQueue(),
    jobMap: new Map(),
    serverClocks: new ClockMap(new Map()),
    syncing: false,
    retrying: false,
    errorMessage: null,
  };

  // Emits the affected docId, or `true` when every doc may be affected.
  private readonly statusUpdatedSubject = new Subject<string | true>();

  engineState = LiveData.from<RemoteEngineState>(
    new Observable(subscribe => {
      const next = () => {
        if (!this.status.syncing) {
          // Not syncing (stopped or between retries): jobMap does not
          // reflect reality in this state, so report every doc as pending.
          subscribe.next({
            total: this.status.docs.size,
            syncing: this.status.docs.size,
            retrying: this.status.retrying,
            errorMessage: this.status.errorMessage,
          });
          // Fix: without this early return a second emission below would
          // immediately override the value with jobMap.size (often 0),
          // making a stopped engine look fully synced.
          return;
        }
        const syncing = this.status.jobMap.size;
        subscribe.next({
          total: this.status.docs.size,
          syncing: syncing,
          retrying: this.status.retrying,
          errorMessage: this.status.errorMessage,
        });
      };
      next();
      return this.statusUpdatedSubject.subscribe(() => {
        next();
      });
    }),
    {
      syncing: 0,
      total: 0,
      retrying: false,
      errorMessage: null,
    }
  );

  /** Live per-doc sync state; a doc is syncing until connected and idle. */
  docState(docId: string) {
    return LiveData.from<RemoteDocState>(
      new Observable(subscribe => {
        const next = () => {
          subscribe.next({
            syncing:
              !this.status.connectedDocs.has(docId) ||
              this.status.jobMap.has(docId),
          });
        };
        next();
        return this.statusUpdatedSubject.subscribe(updatedId => {
          if (updatedId === true || updatedId === docId) next();
        });
      }),
      { syncing: false }
    );
  }

  // Job handlers, invoked by retryLoop() after the matching jobs are
  // drained from jobMap. Each handler is responsible for one doc.
  readonly jobs = {
    connect: async (docId: string, signal?: AbortSignal) => {
      const pushedSeqNum = await this.storage.loadDocSeqNumPushed(
        docId,
        signal
      );
      const seqNum = await this.storage.loadDocSeqNum(docId, signal);
      if (pushedSeqNum === null || pushedSeqNum !== seqNum) {
        // Local changes were never (fully) pushed: reconcile both ways.
        await this.jobs.pullAndPush(docId, signal);
      } else {
        const pulled = await this.storage.loadDocServerClockPulled(docId);
        if (pulled === null || pulled !== this.status.serverClocks.get(docId)) {
          // Server moved ahead of what we last pulled.
          await this.jobs.pull(docId, signal);
        }
      }
      this.status.connectedDocs.add(docId);
      this.statusUpdatedSubject.next(docId);
    },
    push: async (
      docId: string,
      jobs: (Job & { type: 'push' })[],
      signal?: AbortSignal
    ) => {
      if (this.status.connectedDocs.has(docId)) {
        const maxSeqNum = Math.max(...jobs.map(j => j.seqNum));
        const pushedSeqNum =
          (await this.storage.loadDocSeqNumPushed(docId, signal)) ?? 0;
        if (maxSeqNum - pushedSeqNum === jobs.length) {
          // Contiguous sequence: safe to merge and push in one request.
          const merged = mergeUpdates(
            jobs.map(j => j.update).filter(update => !isEmptyUpdate(update))
          );
          if (!isEmptyUpdate(merged)) {
            const { serverClock } = await this.server.pushDoc(docId, merged);
            this.schedule({
              type: 'save',
              docId,
              serverClock,
            });
          }
          await this.storage.saveDocPushedSeqNum(
            docId,
            { add: jobs.length },
            signal
          );
        } else {
          // maybe other tab is modifying the doc, do full pull and push for safety
          await this.jobs.pullAndPush(docId, signal);
        }
      }
    },
    pullAndPush: async (docId: string, signal?: AbortSignal) => {
      const seqNum = await this.storage.loadDocSeqNum(docId, signal);
      const data = await this.storage.loadDocFromLocal(docId, signal);
      const stateVector =
        data && !isEmptyUpdate(data)
          ? encodeStateVectorFromUpdate(data)
          : new Uint8Array();
      const serverData = await this.server.pullDoc(docId, stateVector);
      if (serverData) {
        const {
          data: newData,
          stateVector: serverStateVector,
          serverClock,
        } = serverData;
        await this.storage.saveServerClock(
          new Map([[docId, serverClock]]),
          signal
        );
        this.actions.updateServerClock(docId, serverClock);
        await this.storage.commitDocAsServerUpdate(
          docId,
          newData,
          serverClock,
          signal
        );
        this.storage.eventBus.emit({
          type: 'ServerUpdateCommitted',
          docId,
          clientId: this.clientId,
          update: newData,
        });
        // Push only what the server does not already have.
        const diff =
          data && serverStateVector && serverStateVector.length > 0
            ? diffUpdate(data, serverStateVector)
            : data;
        if (diff && !isEmptyUpdate(diff)) {
          const { serverClock } = await this.server.pushDoc(docId, diff);
          this.schedule({
            type: 'save',
            docId,
            serverClock,
          });
        }
        await this.storage.saveDocPushedSeqNum(docId, seqNum, signal);
      } else {
        // Doc unknown to the server: upload the full local state.
        if (data && !isEmptyUpdate(data)) {
          const { serverClock } = await this.server.pushDoc(docId, data);
          await this.storage.saveDocServerClockPulled(
            docId,
            serverClock,
            signal
          );
          await this.storage.saveServerClock(
            new Map([[docId, serverClock]]),
            signal
          );
          this.actions.updateServerClock(docId, serverClock);
        }
        await this.storage.saveDocPushedSeqNum(docId, seqNum, signal);
      }
    },
    pull: async (docId: string, signal?: AbortSignal) => {
      const data = await this.storage.loadDocFromLocal(docId, signal);
      const stateVector =
        data && !isEmptyUpdate(data)
          ? encodeStateVectorFromUpdate(data)
          : new Uint8Array();
      const serverDoc = await this.server.pullDoc(docId, stateVector);
      if (!serverDoc) {
        return;
      }
      const { data: newData, serverClock } = serverDoc;
      await this.storage.commitDocAsServerUpdate(
        docId,
        newData,
        serverClock,
        signal
      );
      this.storage.eventBus.emit({
        type: 'ServerUpdateCommitted',
        docId,
        clientId: this.clientId,
        update: newData,
      });
      await this.storage.saveServerClock(
        new Map([[docId, serverClock]]),
        signal
      );
      this.actions.updateServerClock(docId, serverClock);
    },
    save: async (
      docId: string,
      jobs: (Job & { type: 'save' })[],
      signal?: AbortSignal
    ) => {
      const serverClock = jobs.reduce((a, b) => Math.max(a, b.serverClock), 0);
      await this.storage.saveServerClock(
        new Map([[docId, serverClock]]),
        signal
      );
      this.actions.updateServerClock(docId, serverClock);
      if (this.status.connectedDocs.has(docId)) {
        const data = jobs
          .map(j => j.update)
          .filter((update): update is Uint8Array =>
            update ? !isEmptyUpdate(update) : false
          );
        const update = data.length > 0 ? mergeUpdates(data) : new Uint8Array();
        await this.storage.commitDocAsServerUpdate(
          docId,
          update,
          serverClock,
          signal
        );
        this.storage.eventBus.emit({
          type: 'ServerUpdateCommitted',
          docId,
          clientId: this.clientId,
          update,
        });
      }
    },
  };

  readonly actions = {
    updateServerClock: (docId: string, serverClock: number) => {
      this.status.serverClocks.setIfBigger(docId, serverClock);
    },
    /** Register a doc with the engine and queue its initial connect job. */
    addDoc: (docId: string) => {
      if (!this.status.docs.has(docId)) {
        this.status.docs.add(docId);
        this.statusUpdatedSubject.next(docId);
        this.schedule({
          type: 'connect',
          docId,
        });
      }
    },
  };

  // Reactions to local storage events; wired up in retryLoop().
  readonly events: {
    [key in DocEvent['type']]?: (event: DocEvent & { type: key }) => void;
  } = {
    ClientUpdateCommitted: ({ clientId, docId, seqNum, update }) => {
      if (clientId !== this.clientId) {
        // Another tab's engine owns pushing its own commits.
        return;
      }
      this.schedule({
        type: 'push',
        docId,
        update,
        seqNum,
      });
    },
  };

  /**
   * Runs forever (until aborted), restarting retryLoop() with a 5 second
   * backoff after any failure. Job/connection state is rebuilt on each
   * retry; only the set of known docs and the last error survive.
   */
  async mainLoop(signal?: AbortSignal) {
    // eslint-disable-next-line no-constant-condition
    while (true) {
      try {
        this.status.retrying = false;
        await this.retryLoop(signal);
      } catch (err) {
        if (signal?.aborted) {
          return;
        }
        logger.error('Remote sync error, retry in 5s', err);
        this.status.errorMessage =
          err instanceof Error ? err.message : `${err}`;
        this.statusUpdatedSubject.next(true);
      } finally {
        // Reset all per-run state; keep docs and the error message.
        this.status = {
          docs: this.status.docs,
          connectedDocs: new Set<string>(),
          jobDocQueue: new AsyncPriorityQueue(),
          jobMap: new Map(),
          serverClocks: new ClockMap(new Map()),
          syncing: false,
          retrying: true,
          errorMessage: this.status.errorMessage,
        };
        this.statusUpdatedSubject.next(true);
      }
      await Promise.race([
        new Promise<void>(resolve => {
          setTimeout(resolve, 5 * 1000);
        }),
        new Promise((_, reject) => {
          // exit if manually stopped
          if (signal?.aborted) {
            reject(signal.reason);
          }
          signal?.addEventListener('abort', () => {
            reject(signal.reason);
          });
        }),
      ]);
    }
  }

  /**
   * One full sync session: subscribe to storage/server events, reconcile
   * server clocks, then process jobs doc-by-doc until aborted or a fatal
   * error is thrown (which mainLoop() turns into a retry).
   */
  async retryLoop(signal?: AbortSignal) {
    throwIfAborted(signal);
    const abort = new AbortController();
    const outerSignal = signal;
    outerSignal?.addEventListener('abort', () => {
      // Fix: forward the cancellation reason. The listener argument is an
      // abort Event, not the reason, so it must be read off the signal.
      abort.abort(outerSignal.reason);
    });
    signal = abort.signal;
    const disposes: (() => void)[] = [];
    try {
      disposes.push(
        this.storage.eventBus.on(event => {
          const handler = this.events[event.type];
          handler?.(event as any);
        })
      );
      throwIfAborted(signal);
      for (const doc of this.status.docs) {
        this.schedule({
          type: 'connect',
          docId: doc,
        });
      }
      logger.info('Remote sync started');
      this.status.syncing = true;
      this.statusUpdatedSubject.next(true);
      this.server.onInterrupted(reason => {
        abort.abort(reason);
      });
      await Promise.race([
        this.server.waitForConnectingServer(signal),
        new Promise<void>((_, reject) => {
          setTimeout(() => {
            reject(new Error('Connect to server timeout'));
          }, 1000 * 30);
        }),
        new Promise((_, reject) => {
          // Fix: reject with the signal's reason, not the abort Event.
          signal?.addEventListener('abort', () => {
            reject(signal?.reason);
          });
        }),
      ]);
      throwIfAborted(signal);
      disposes.push(
        await this.server.subscribeAllDocs(({ docId, data, serverClock }) => {
          this.schedule({
            type: 'save',
            docId: docId,
            serverClock,
            update: data,
          });
        })
      );
      // Seed server clocks from cache, then fetch anything newer.
      const cachedClocks = await this.storage.loadServerClock(signal);
      for (const [id, v] of cachedClocks) {
        this.actions.updateServerClock(id, v);
      }
      const maxClockValue = this.status.serverClocks.max;
      const newClocks = await this.server.loadServerClock(maxClockValue);
      for (const [id, v] of newClocks) {
        this.actions.updateServerClock(id, v);
      }
      await this.storage.saveServerClock(newClocks, signal);
      // eslint-disable-next-line no-constant-condition
      while (true) {
        throwIfAborted(signal);
        const docId = await this.status.jobDocQueue.asyncPop(signal);
        // Drain every queued job for this doc, in fixed type order.
        // eslint-disable-next-line no-constant-condition
        while (true) {
          const jobs = this.status.jobMap.get(docId);
          if (!jobs || jobs.length === 0) {
            this.status.jobMap.delete(docId);
            this.statusUpdatedSubject.next(docId);
            break;
          }
          const connect = remove(jobs, j => j.type === 'connect');
          if (connect && connect.length > 0) {
            await this.jobs.connect(docId, signal);
            continue;
          }
          const pullAndPush = remove(jobs, j => j.type === 'pullAndPush');
          if (pullAndPush && pullAndPush.length > 0) {
            await this.jobs.pullAndPush(docId, signal);
            continue;
          }
          const pull = remove(jobs, j => j.type === 'pull');
          if (pull && pull.length > 0) {
            await this.jobs.pull(docId, signal);
            continue;
          }
          const push = remove(jobs, j => j.type === 'push');
          if (push && push.length > 0) {
            await this.jobs.push(
              docId,
              push as (Job & { type: 'push' })[],
              signal
            );
            continue;
          }
          const save = remove(jobs, j => j.type === 'save');
          if (save && save.length > 0) {
            await this.jobs.save(
              docId,
              save as (Job & { type: 'save' })[],
              signal
            );
            continue;
          }
        }
      }
    } finally {
      for (const dispose of disposes) {
        dispose();
      }
      try {
        this.server.disconnectServer();
      } catch (err) {
        logger.error('Error on disconnect server', err);
      }
      this.status.syncing = false;
      logger.info('Remote sync ended');
    }
  }

  /** Queue a job for its doc and wake the scheduler. */
  schedule(job: Job) {
    const priority = this.prioritySettings.get(job.docId) ?? 0;
    this.status.jobDocQueue.push(job.docId, priority);
    const existingJobs = this.status.jobMap.get(job.docId) ?? [];
    existingJobs.push(job);
    this.status.jobMap.set(job.docId, existingJobs);
    this.statusUpdatedSubject.next(job.docId);
  }

  setPriority(docId: string, priority: number) {
    this.prioritySettings.set(docId, priority);
    this.status.jobDocQueue.updatePriority(docId, priority);
  }
}

View File

@@ -0,0 +1,26 @@
/**
 * Transport abstraction the remote doc engine syncs against.
 * Implementations are expected to expose a monotonically increasing
 * per-doc "server clock" so clients can detect missed updates.
 */
export interface DocServer {
  /**
   * Fetch the server-side state of a doc, diffed against the given state
   * vector. Resolves `null` when the server has no copy of the doc.
   */
  pullDoc(
    docId: string,
    stateVector: Uint8Array
  ): Promise<{
    data: Uint8Array;
    serverClock: number;
    stateVector?: Uint8Array;
  } | null>;
  /** Upload an update; resolves with the server clock after applying it. */
  pushDoc(docId: string, data: Uint8Array): Promise<{ serverClock: number }>;
  /** Load server clocks for docs changed after the given clock value. */
  loadServerClock(after: number): Promise<Map<string, number>>;
  /**
   * Subscribe to server-pushed updates for all docs.
   * Resolves with a dispose function that cancels the subscription.
   */
  subscribeAllDocs(
    cb: (updates: {
      docId: string;
      data: Uint8Array;
      serverClock: number;
    }) => void
  ): Promise<() => void>;
  /** Resolves once connected; rejects/aborts via the given signal. */
  waitForConnectingServer(signal: AbortSignal): Promise<void>;
  disconnectServer(): void;
  /** Register a callback invoked when the connection is interrupted. */
  onInterrupted(cb: (reason: string) => void): void;
}

View File

@@ -0,0 +1,364 @@
import {
type ByteKV,
type Memento,
MemoryMemento,
ReadonlyByteKV,
wrapMemento,
} from '../../../storage';
import { AsyncLock, mergeUpdates, throwIfAborted } from '../../../utils';
import type { DocEventBus } from '.';
import { DocEventBusInner, MemoryDocEventBus } from './event';
import { isEmptyUpdate } from './utils';
export interface DocStorage {
eventBus: DocEventBus;
doc: ByteKV;
syncMetadata: ByteKV;
serverClock: ByteKV;
}
/**
 * Builders for the per-doc metadata keys stored in `syncMetadata`.
 * Every key is namespaced by the doc id, e.g. `mydoc:seqNum`.
 */
const Keys = {
  SeqNum(docId: string) {
    return `${docId}:seqNum`;
  },
  SeqNumPushed(docId: string) {
    return `${docId}:seqNumPushed`;
  },
  ServerClockPulled(docId: string) {
    return `${docId}:serverClockPulled`;
  },
  UpdatedTime(docId: string) {
    return `${docId}:updateTime`;
  },
};
/**
 * Codecs for values stored in the metadata/serverClock KV stores.
 * `UInt64` encodes a non-negative integer as 8 big-endian bytes.
 */
const Values = {
  UInt64: {
    parse: (buffer: Uint8Array) => {
      // Fix: honor byteOffset/byteLength. `buffer` may be a subarray view
      // into a larger ArrayBuffer; `new DataView(buffer.buffer)` alone
      // would read from offset 0 of the underlying buffer instead of the
      // view's own window.
      const view = new DataView(
        buffer.buffer,
        buffer.byteOffset,
        buffer.byteLength
      );
      return Number(view.getBigUint64(0, false));
    },
    serialize: (value: number) => {
      const buffer = new ArrayBuffer(8);
      const view = new DataView(buffer);
      // Big-endian so lexicographic byte order matches numeric order.
      view.setBigUint64(0, BigInt(value), false);
      return new Uint8Array(buffer);
    },
  },
};
/**
 * Typed facade over a raw `DocStorage` backend. Wraps the three byte-level
 * KV stores (doc, syncMetadata, serverClock) with doc-engine semantics:
 * sequence numbers, pulled/pushed clocks, and update commits.
 *
 * All numeric values are persisted as big-endian UInt64 (see `Values`);
 * all write paths are monotonic (a stored value is only ever increased).
 */
export class DocStorageInner {
  // NOTE(review): this field initializer reads `this.behavior`, a
  // constructor parameter property. This relies on TS emitting parameter
  // property assignments before field initializers (i.e.
  // `useDefineForClassFields` off / pre-ES2022 targets) — TODO confirm
  // against the project's tsconfig.
  public readonly eventBus = new DocEventBusInner(this.behavior.eventBus);
  constructor(public readonly behavior: DocStorage) {}

  /** Load the cached per-doc server clocks (key = docId). */
  async loadServerClock(signal?: AbortSignal): Promise<Map<string, number>> {
    throwIfAborted(signal);
    const list = await this.behavior.serverClock.keys();
    const map = new Map<string, number>();
    for (const key of list) {
      const docId = key;
      const value = await this.behavior.serverClock.get(key);
      if (value) {
        map.set(docId, Values.UInt64.parse(value));
      }
    }
    return map;
  }

  /** Persist server clocks; each entry is only written if it increased. */
  async saveServerClock(map: Map<string, number>, signal?: AbortSignal) {
    throwIfAborted(signal);
    await this.behavior.serverClock.transaction(async transaction => {
      for (const [docId, value] of map) {
        const key = docId;
        const oldBuffer = await transaction.get(key);
        const old = oldBuffer ? Values.UInt64.parse(oldBuffer) : 0;
        if (old < value) {
          await transaction.set(key, Values.UInt64.serialize(value));
        }
      }
    });
  }

  /** Local commit counter for a doc; 0 when the doc was never committed. */
  async loadDocSeqNum(docId: string, signal?: AbortSignal) {
    throwIfAborted(signal);
    const bytes = await this.behavior.syncMetadata.get(Keys.SeqNum(docId));
    if (bytes === null) {
      return 0;
    }
    return Values.UInt64.parse(bytes);
  }

  /**
   * Update the doc's seqNum and return the stored value.
   * Pass `true` to atomically increment; pass a number to raise the stored
   * value to it (never lowers).
   */
  async saveDocSeqNum(
    docId: string,
    seqNum: number | true,
    signal?: AbortSignal
  ) {
    throwIfAborted(signal);
    return await this.behavior.syncMetadata.transaction(async transaction => {
      const key = Keys.SeqNum(docId);
      const oldBytes = await transaction.get(key);
      const old = oldBytes ? Values.UInt64.parse(oldBytes) : 0;
      if (seqNum === true) {
        await transaction.set(key, Values.UInt64.serialize(old + 1));
        return old + 1;
      }
      if (old < seqNum) {
        await transaction.set(key, Values.UInt64.serialize(seqNum));
        return seqNum;
      }
      return old;
    });
  }

  /** SeqNum up to which local commits were pushed; null if never pushed. */
  async loadDocSeqNumPushed(docId: string, signal?: AbortSignal) {
    throwIfAborted(signal);
    const bytes = await this.behavior.syncMetadata.get(
      Keys.SeqNumPushed(docId)
    );
    if (bytes === null) {
      return null;
    }
    return Values.UInt64.parse(bytes);
  }

  /**
   * Update the pushed seqNum. Pass `{ add: n }` to increment by n, or a
   * number to raise the stored value to it (never lowers).
   */
  async saveDocPushedSeqNum(
    docId: string,
    seqNum: number | { add: number },
    signal?: AbortSignal
  ) {
    throwIfAborted(signal);
    await this.behavior.syncMetadata.transaction(async transaction => {
      const key = Keys.SeqNumPushed(docId);
      const oldBytes = await transaction.get(key);
      const old = oldBytes ? Values.UInt64.parse(oldBytes) : null;
      if (typeof seqNum === 'object') {
        return transaction.set(
          key,
          Values.UInt64.serialize((old ?? 0) + seqNum.add)
        );
      }
      if (old === null || old < seqNum) {
        return transaction.set(key, Values.UInt64.serialize(seqNum));
      }
    });
  }

  /** Server clock at the time of the doc's last pull; null if never pulled. */
  async loadDocServerClockPulled(docId: string, signal?: AbortSignal) {
    throwIfAborted(signal);
    const bytes = await this.behavior.syncMetadata.get(
      Keys.ServerClockPulled(docId)
    );
    if (bytes === null) {
      return null;
    }
    // NOTE(review): `bytes` is non-null here, and a Uint8Array (even an
    // empty one) is truthy, so the `: 0` branch appears unreachable.
    return bytes ? Values.UInt64.parse(bytes) : 0;
  }

  /** Raise (never lower) the doc's last-pulled server clock. */
  async saveDocServerClockPulled(
    docId: string,
    serverClock: number,
    signal?: AbortSignal
  ) {
    throwIfAborted(signal);
    await this.behavior.syncMetadata.transaction(async transaction => {
      const oldBytes = await transaction.get(Keys.ServerClockPulled(docId));
      const old = oldBytes ? Values.UInt64.parse(oldBytes) : null;
      if (old === null || old < serverClock) {
        await transaction.set(
          Keys.ServerClockPulled(docId),
          Values.UInt64.serialize(serverClock)
        );
      }
    });
  }

  /** Raw doc bytes from the local store, or null when absent. */
  async loadDocFromLocal(docId: string, signal?: AbortSignal) {
    throwIfAborted(signal);
    return await this.behavior.doc.get(docId);
  }

  /**
   * Confirm that server updates are applied in the order they occur!!!
   */
  async commitDocAsServerUpdate(
    docId: string,
    update: Uint8Array,
    serverClock: number,
    signal?: AbortSignal
  ) {
    throwIfAborted(signal);
    await this.behavior.doc.transaction(async tx => {
      const data = await tx.get(docId);
      // Merge into the existing doc unless either side is an empty update.
      await tx.set(
        docId,
        data && !isEmptyUpdate(data)
          ? !isEmptyUpdate(update)
            ? mergeUpdates([data, update])
            : data
          : update
      );
    });
    await this.saveDocServerClockPulled(docId, serverClock);
  }

  /**
   * Merge a locally produced update into the doc and bump its seqNum.
   * Returns the new seqNum.
   */
  async commitDocAsClientUpdate(
    docId: string,
    update: Uint8Array,
    signal?: AbortSignal
  ) {
    throwIfAborted(signal);
    await this.behavior.doc.transaction(async tx => {
      const data = await tx.get(docId);
      await tx.set(
        docId,
        data && !isEmptyUpdate(data)
          ? !isEmptyUpdate(update)
            ? mergeUpdates([data, update])
            : data
          : update
      );
    });
    return await this.saveDocSeqNum(docId, true);
  }

  /** Drop all sync bookkeeping (seqNums, pulled clocks). Doc data is kept. */
  clearSyncMetadata() {
    return this.behavior.syncMetadata.clear();
  }

  /** Drop all cached server clocks. Doc data is kept. */
  async clearServerClock() {
    return this.behavior.serverClock.clear();
  }
}
/**
 * A `DocStorage` seeded from an in-memory snapshot of doc bytes.
 * All three KV stores are `ReadonlyByteKV` instances — presumably writes
 * are no-ops (TODO confirm against ReadonlyByteKV's implementation).
 */
export class ReadonlyStorage implements DocStorage {
  constructor(
    private readonly map: {
      [key: string]: Uint8Array;
    }
  ) {}
  eventBus = new MemoryDocEventBus();
  // NOTE(review): this initializer reads `this.map` (a constructor
  // parameter property) — relies on parameter properties being assigned
  // before field initializers; TODO confirm tsconfig target/flags.
  doc = new ReadonlyByteKV(new Map(Object.entries(this.map)));
  serverClock = new ReadonlyByteKV();
  syncMetadata = new ReadonlyByteKV();
}
/**
 * In-memory `DocStorage` backed by a `Memento`, used for tests and
 * ephemeral workspaces. The three stores (`doc`, `syncMetadata`,
 * `serverClock`) are namespaced views over the same memento and share one
 * async lock, so transactions are serialized across all of them — same as
 * the original triplicated implementation.
 */
export class MemoryStorage implements DocStorage {
  constructor(private readonly memo: Memento = new MemoryMemento()) {}
  eventBus = new MemoryDocEventBus();
  lock = new AsyncLock();
  readonly docDb = wrapMemento(this.memo, 'doc:');
  readonly syncMetadataDb = wrapMemento(this.memo, 'syncMetadata:');
  readonly serverClockDb = wrapMemento(this.memo, 'serverClock:');

  /**
   * Build a `ByteKV` facade over one namespaced memento. Every operation
   * funnels through `transaction`, which holds the shared instance lock
   * for its duration. (Replaces three copy-pasted identical literals.)
   */
  private createKV(db: ReturnType<typeof wrapMemento>): ByteKV {
    const kv = {
      transaction: async cb => {
        using _lock = await this.lock.acquire();
        return await cb({
          get: async key => {
            return db.get(key) ?? null;
          },
          set: async (key, value) => {
            db.set(key, value);
          },
          keys: async () => {
            return Array.from(db.keys());
          },
          clear: () => {
            db.clear();
          },
          del: key => {
            db.del(key);
          },
        });
      },
      // Single-op helpers are just one-shot transactions. Closing over
      // `kv` (instead of `this`) keeps them safe to destructure.
      get(key) {
        return kv.transaction(async tx => tx.get(key));
      },
      set(key, value) {
        return kv.transaction(async tx => tx.set(key, value));
      },
      keys() {
        return kv.transaction(async tx => tx.keys());
      },
      clear() {
        return kv.transaction(async tx => tx.clear());
      },
      del(key) {
        return kv.transaction(async tx => tx.del(key));
      },
    } satisfies ByteKV;
    return kv;
  }

  readonly doc = this.createKV(this.docDb);
  readonly syncMetadata = this.createKV(this.syncMetadataDb);
  readonly serverClock = this.createKV(this.serverClockDb);
}

View File

@@ -0,0 +1,6 @@
/**
 * Whether an encoded yjs update carries no operations: either zero bytes,
 * or the canonical two-byte empty update `[0, 0]`.
 */
export function isEmptyUpdate(binary: Uint8Array) {
  if (binary.byteLength === 0) {
    return true;
  }
  return binary.byteLength === 2 && binary[0] === 0 && binary[1] === 0;
}

View File

@@ -1,13 +1,12 @@
import { Slot } from '@blocksuite/global/utils';
import type { Doc as YDoc } from 'yjs';
import { throwIfAborted } from '../../utils/throw-if-aborted';
import type { AwarenessEngine } from './awareness';
import type { BlobEngine, BlobStatus } from './blob';
import type { SyncEngine } from './sync';
import { type SyncEngineStatus } from './sync';
import type { DocEngine } from './doc';
export interface WorkspaceEngineStatus {
sync: SyncEngineStatus;
blob: BlobStatus;
}
@@ -31,51 +30,57 @@ export class WorkspaceEngine {
constructor(
public blob: BlobEngine,
public sync: SyncEngine,
public awareness: AwarenessEngine
public doc: DocEngine,
public awareness: AwarenessEngine,
private readonly yDoc: YDoc
) {
this._status = {
sync: sync.status,
blob: blob.status,
};
sync.onStatusChange.on(status => {
this.status = {
sync: status,
blob: blob.status,
};
});
blob.onStatusChange.on(status => {
this.status = {
sync: sync.status,
blob: status,
};
});
this.doc.addDoc(yDoc);
}
start() {
this.sync.start();
this.doc.start();
this.awareness.connect();
this.blob.start();
}
canGracefulStop() {
return this.sync.canGracefulStop();
return this.doc.engineState.value.saving === 0;
}
async waitForGracefulStop(abort?: AbortSignal) {
await this.sync.waitForGracefulStop(abort);
await this.doc.waitForSaved();
throwIfAborted(abort);
this.forceStop();
}
forceStop() {
this.sync.forceStop();
this.doc.stop();
this.awareness.disconnect();
this.blob.stop();
}
docEngineState = this.doc.engineState;
rootDocState = this.doc.docState(this.yDoc.guid);
waitForSynced() {
return this.doc.waitForSynced();
}
waitForRootDocReady() {
return this.doc.waitForReady(this.yDoc.guid);
}
}
export * from './awareness';
export * from './blob';
export * from './doc';
export * from './error';
export * from './sync';

View File

@@ -1,167 +0,0 @@
import { WorkspaceFlavour } from '@affine/env/workspace';
import { DocCollection } from '@blocksuite/store';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { Doc } from 'yjs';
import { MemoryMemento } from '../../../../storage';
import { globalBlockSuiteSchema } from '../../../global-schema';
import { TestingSyncStorage } from '../../../testing';
import { SyncEngineStep, SyncPeerStep } from '../consts';
import { SyncEngine } from '../engine';
import { createTestStorage } from './test-storage';
beforeEach(() => {
vi.useFakeTimers({ toFake: ['requestIdleCallback'] });
});
const testMeta = {
id: 'test',
flavour: WorkspaceFlavour.LOCAL,
};
describe('SyncEngine', () => {
test('basic - indexeddb', async () => {
const storage = new MemoryMemento();
const storage1 = new MemoryMemento();
const storage2 = new MemoryMemento();
let prev: any;
{
const docCollection = new DocCollection({
id: 'test',
schema: globalBlockSuiteSchema,
});
const syncEngine = new SyncEngine(
docCollection.doc,
new TestingSyncStorage(testMeta, storage),
[
new TestingSyncStorage(testMeta, storage1),
new TestingSyncStorage(testMeta, storage2),
]
);
syncEngine.start();
const page = docCollection.createDoc({
id: 'page0',
});
page.load();
const pageBlockId = page.addBlock(
'affine:page' as keyof BlockSuite.BlockModels,
{
title: new page.Text(''),
}
);
page.addBlock(
'affine:surface' as keyof BlockSuite.BlockModels,
{},
pageBlockId
);
const frameId = page.addBlock(
'affine:note' as keyof BlockSuite.BlockModels,
{},
pageBlockId
);
page.addBlock(
'affine:paragraph' as keyof BlockSuite.BlockModels,
{},
frameId
);
await syncEngine.waitForSynced();
syncEngine.forceStop();
prev = docCollection.doc.toJSON();
}
for (const current of [storage, storage1, storage2]) {
const docCollection = new DocCollection({
id: 'test',
schema: globalBlockSuiteSchema,
});
const syncEngine = new SyncEngine(
docCollection.doc,
new TestingSyncStorage(testMeta, current),
[]
);
syncEngine.start();
await syncEngine.waitForSynced();
expect(docCollection.doc.toJSON()).toEqual({
...prev,
});
syncEngine.forceStop();
}
});
test('status', async () => {
const ydoc = new Doc({ guid: 'test' });
const storage1 = new MemoryMemento();
const storage2 = new MemoryMemento();
const localStorage = createTestStorage(
new TestingSyncStorage(testMeta, storage1)
);
const remoteStorage = createTestStorage(
new TestingSyncStorage(testMeta, storage2)
);
localStorage.pausePull();
localStorage.pausePush();
remoteStorage.pausePull();
remoteStorage.pausePush();
const syncEngine = new SyncEngine(ydoc, localStorage, [remoteStorage]);
expect(syncEngine.status.step).toEqual(SyncEngineStep.Stopped);
syncEngine.start();
await vi.waitFor(() => {
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(
SyncPeerStep.LoadingRootDoc
);
});
localStorage.resumePull();
await vi.waitFor(() => {
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(
SyncPeerStep.LoadingRootDoc
);
});
remoteStorage.resumePull();
await vi.waitFor(() => {
expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
});
ydoc.getArray('test').insert(0, [1, 2, 3]);
await vi.waitFor(() => {
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Syncing);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing);
});
localStorage.resumePush();
await vi.waitFor(() => {
expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing);
});
remoteStorage.resumePush();
await vi.waitFor(() => {
expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced);
expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced);
expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced);
});
});
});

View File

@@ -1,115 +0,0 @@
import { WorkspaceFlavour } from '@affine/env/workspace';
import { DocCollection } from '@blocksuite/store';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { MemoryMemento } from '../../../../storage';
import { globalBlockSuiteSchema } from '../../../global-schema';
import { TestingSyncStorage } from '../../../testing';
import { SyncPeerStep } from '../consts';
import { SyncPeer } from '../peer';
beforeEach(() => {
vi.useFakeTimers({ toFake: ['requestIdleCallback'] });
});
const testMeta = {
id: 'test',
flavour: WorkspaceFlavour.LOCAL,
};
describe('SyncPeer', () => {
test('basic - indexeddb', async () => {
const storage = new MemoryMemento();
let prev: any;
{
const docCollection = new DocCollection({
id: 'test',
schema: globalBlockSuiteSchema,
});
const syncPeer = new SyncPeer(
docCollection.doc,
new TestingSyncStorage(testMeta, storage)
);
await syncPeer.waitForLoaded();
const page = docCollection.createDoc({
id: 'page0',
});
page.load();
const pageBlockId = page.addBlock(
'affine:page' as keyof BlockSuite.BlockModels,
{
title: new page.Text(''),
}
);
page.addBlock(
'affine:surface' as keyof BlockSuite.BlockModels,
{},
pageBlockId
);
const frameId = page.addBlock(
'affine:note' as keyof BlockSuite.BlockModels,
{},
pageBlockId
);
page.addBlock(
'affine:paragraph' as keyof BlockSuite.BlockModels,
{},
frameId
);
await syncPeer.waitForSynced();
syncPeer.stop();
prev = docCollection.doc.toJSON();
}
{
const docCollection = new DocCollection({
id: 'test',
schema: globalBlockSuiteSchema,
});
const syncPeer = new SyncPeer(
docCollection.doc,
new TestingSyncStorage(testMeta, storage)
);
await syncPeer.waitForSynced();
expect(docCollection.doc.toJSON()).toEqual({
...prev,
});
syncPeer.stop();
}
});
test('status', async () => {
const storage = new MemoryMemento();
const docCollection = new DocCollection({
id: 'test',
schema: globalBlockSuiteSchema,
});
const syncPeer = new SyncPeer(
docCollection.doc,
new TestingSyncStorage(testMeta, storage)
);
expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingRootDoc);
await syncPeer.waitForSynced();
expect(syncPeer.status.step).toBe(SyncPeerStep.Synced);
const page = docCollection.createDoc({
id: 'page0',
});
expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingSubDoc);
page.load();
await syncPeer.waitForSynced();
page.addBlock('affine:page' as keyof BlockSuite.BlockModels, {
title: new page.Text(''),
});
expect(syncPeer.status.step).toBe(SyncPeerStep.Syncing);
syncPeer.stop();
});
});

View File

@@ -1,42 +0,0 @@
import type { SyncStorage } from '../storage';
export function createTestStorage(origin: SyncStorage) {
const controler = {
pausedPull: Promise.resolve(),
resumePull: () => {},
pausedPush: Promise.resolve(),
resumePush: () => {},
};
return {
name: `${origin.name}(testing)`,
pull(docId: string, state: Uint8Array) {
return controler.pausedPull.then(() => origin.pull(docId, state));
},
push(docId: string, data: Uint8Array) {
return controler.pausedPush.then(() => origin.push(docId, data));
},
subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
) {
return origin.subscribe(cb, disconnect);
},
pausePull() {
controler.pausedPull = new Promise(resolve => {
controler.resumePull = resolve;
});
},
resumePull() {
controler.resumePull?.();
},
pausePush() {
controler.pausedPush = new Promise(resolve => {
controler.resumePush = resolve;
});
},
resumePush() {
controler.resumePush?.();
},
};
}

View File

@@ -1,23 +0,0 @@
export enum SyncEngineStep {
// error
Rejected = -1,
// in progress
Stopped = 0,
Syncing = 1,
// finished
Synced = 2,
}
export enum SyncPeerStep {
// error
VersionRejected = -1,
// in progress
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Loaded = 4.5,
Syncing = 5,
// finished
Synced = 6,
}

View File

@@ -1,316 +0,0 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import { Observable } from 'rxjs';
import type { Doc } from 'yjs';
import { createIdentifier } from '../../../di';
import { LiveData } from '../../../livedata';
import { SharedPriorityTarget } from '../../../utils/async-queue';
import { MANUALLY_STOP, throwIfAborted } from '../../../utils/throw-if-aborted';
import { SyncEngineStep, SyncPeerStep } from './consts';
import { SyncPeer, type SyncPeerStatus } from './peer';
import { type SyncStorage } from './storage';
export interface SyncEngineStatus {
step: SyncEngineStep;
local: SyncPeerStatus | null;
remotes: (SyncPeerStatus | null)[];
error: string | null;
retrying: boolean;
}
export const LocalSyncStorage =
createIdentifier<SyncStorage>('LocalSyncStorage');
export const RemoteSyncStorage =
createIdentifier<SyncStorage>('RemoteSyncStorage');
/**
* # SyncEngine
*
* ```
* ┌────────────┐
* │ SyncEngine │
* └─────┬──────┘
* │
* ▼
* ┌────────────┐
* │ SyncPeer │
* ┌─────────┤ local ├─────────┐
* │ └─────┬──────┘ │
* │ │ │
* ▼ ▼ ▼
* ┌────────────┐ ┌────────────┐ ┌────────────┐
* │ SyncPeer │ │ SyncPeer │ │ SyncPeer │
* │ Remote │ │ Remote │ │ Remote │
* └────────────┘ └────────────┘ └────────────┘
* ```
*
* Sync engine manage sync peers
*
* Sync steps:
* 1. start local sync
* 2. wait for local sync complete
* 3. start remote sync
* 4. continuously sync local and remote
*/
/**
 * Coordinates one local SyncPeer and N remote SyncPeers for a single root
 * Y.Doc, merging their individual statuses into one SyncEngineStatus.
 *
 * Lifecycle: construct -> start() -> runs until abort ->
 * forceStop() / waitForGracefulStop(). start() may be called again after a
 * stop (it creates a fresh AbortController).
 */
export class SyncEngine {
/** guid of the root Y.Doc this engine syncs. */
get rootDocId() {
return this.rootDoc.guid;
}
logger = new DebugLogger('affine:sync-engine:' + this.rootDocId);
private _status: SyncEngineStatus;
/** Fires on every assignment through the private `status` setter. */
onStatusChange = new Slot<SyncEngineStatus>();
private set status(s: SyncEngineStatus) {
this.logger.debug('status change', s);
this._status = s;
this.onStatusChange.emit(s);
}
// Becomes true once ANY peer (local or remote) reports rootDocLoaded.
// NOTE(review): the Slot listener registered inside the Observable is never
// disposed when the Observable is unsubscribed, so it lives for the
// engine's lifetime — confirm this is acceptable.
isRootDocLoaded = LiveData.from(
new Observable<boolean>(observer => {
observer.next(
[this.status?.local, ...(this.status?.remotes ?? [])].some(
p => p?.rootDocLoaded === true
)
);
this.onStatusChange.on(status => {
observer.next(
[status?.local, ...(status?.remotes ?? [])].some(
p => p?.rootDocLoaded === true
)
);
});
}),
false
);
// Shared across all peers so setPriorityRule() biases every peer's queues.
priorityTarget = new SharedPriorityTarget();
get status() {
return this._status;
}
private abort = new AbortController();
/**
 * @param rootDoc the root Y.Doc (subdocs are discovered by the peers)
 * @param local   local storage; fully loaded before remotes are started
 * @param remotes zero or more remote storages, started after local loads
 */
constructor(
private readonly rootDoc: Doc,
private readonly local: SyncStorage,
private readonly remotes: SyncStorage[]
) {
this._status = {
step: SyncEngineStep.Stopped,
local: null,
remotes: remotes.map(() => null),
error: null,
retrying: false,
};
}
/** Start (or restart) the background sync loop; never throws synchronously. */
start() {
if (this.status.step !== SyncEngineStep.Stopped) {
this.forceStop();
}
this.abort = new AbortController();
this.sync(this.abort.signal).catch(err => {
// should never reach here
this.logger.error(err);
});
}
/** True when the local peer exists and has no pending pushes left. */
canGracefulStop() {
return !!this.status.local && this.status.local.pendingPushUpdates === 0;
}
/**
 * Wait until all local pending pushes are flushed, then force-stop.
 * Rejects with `abort.reason` if the signal fires first.
 * NOTE(review): the onStatusChange listener added here is never disposed.
 */
async waitForGracefulStop(abort?: AbortSignal) {
await Promise.race([
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
new Promise<void>(resolve => {
this.onStatusChange.on(() => {
if (this.canGracefulStop()) {
resolve();
}
});
}),
]);
throwIfAborted(abort);
this.forceStop();
}
/**
 * Abort the sync loop immediately.
 * Note: assigns `_status` directly, so onStatusChange is NOT emitted here.
 */
forceStop() {
this.abort.abort(MANUALLY_STOP);
this._status = {
step: SyncEngineStep.Stopped,
local: null,
remotes: this.remotes.map(() => null),
error: 'Sync progress manually stopped',
retrying: false,
};
}
// main sync process, should never return until abort
async sync(signal: AbortSignal) {
const state: {
localPeer: SyncPeer | null;
remotePeers: (SyncPeer | null)[];
} = {
localPeer: null,
remotePeers: this.remotes.map(() => null),
};
const cleanUp: (() => void)[] = [];
try {
// Step 1: start local sync peer
state.localPeer = new SyncPeer(
this.rootDoc,
this.local,
this.priorityTarget
);
cleanUp.push(
state.localPeer.onStatusChange.on(() => {
if (!signal.aborted)
this.updateSyncingState(state.localPeer, state.remotePeers);
}).dispose
);
this.updateSyncingState(state.localPeer, state.remotePeers);
// Step 2: wait for local sync complete
await state.localPeer.waitForLoaded(signal);
// Step 3: start remote sync peer
state.remotePeers = this.remotes.map(remote => {
const peer = new SyncPeer(this.rootDoc, remote, this.priorityTarget);
cleanUp.push(
peer.onStatusChange.on(() => {
if (!signal.aborted)
this.updateSyncingState(state.localPeer, state.remotePeers);
}).dispose
);
return peer;
});
this.updateSyncingState(state.localPeer, state.remotePeers);
// Step 4: continuously sync local and remote
// wait for abort
await new Promise((_, reject) => {
if (signal.aborted) {
reject(signal.reason);
}
signal.addEventListener('abort', () => {
reject(signal.reason);
});
});
} catch (error) {
// a manual stop (or any abort) is a normal exit, not a failure
if (error === MANUALLY_STOP || signal.aborted) {
return;
}
throw error;
} finally {
// stop peers
state.localPeer?.stop();
for (const remotePeer of state.remotePeers) {
remotePeer?.stop();
}
for (const clean of cleanUp) {
clean();
}
}
}
/** Recompute the aggregated engine status from the current peer statuses. */
updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) {
let step = SyncEngineStep.Synced;
let error = null;
const allPeer = [local, ...remotes];
for (const peer of allPeer) {
if (!peer || peer.status.step !== SyncPeerStep.Synced) {
if (peer && peer.status.step <= 0) {
// step < 0 means reject connection by server with some reason
// so the data may be out of date
step = SyncEngineStep.Rejected;
error = peer.status.lastError;
} else {
step = SyncEngineStep.Syncing;
}
break;
}
}
this.status = {
step,
local: local?.status ?? null,
remotes: remotes.map(peer => peer?.status ?? null),
error,
retrying: allPeer.some(
peer => peer?.status.step === SyncPeerStep.Retrying
),
};
}
/**
 * Resolve once the engine reaches Synced; reject on abort.
 * NOTE(review): the onStatusChange listener added here is never disposed.
 */
async waitForSynced(abort?: AbortSignal) {
if (this.status.step === SyncEngineStep.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step === SyncEngineStep.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
/**
 * Resolve once EVERY peer has progressed past LoadingRootDoc; reject on
 * abort. NOTE(review): listener added here is never disposed either.
 */
async waitForLoadedRootDoc(abort?: AbortSignal) {
function isLoadedRootDoc(status: SyncEngineStatus) {
return ![status.local, ...status.remotes].some(
peer => !peer || peer.step <= SyncPeerStep.LoadingRootDoc
);
}
if (isLoadedRootDoc(this.status)) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (isLoadedRootDoc(status)) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
/** Install (or clear, with null) a rule marking doc ids to sync first. */
setPriorityRule(target: ((id: string) => boolean) | null) {
this.priorityTarget.priorityRule = target;
}
}

View File

@@ -1,20 +0,0 @@
/**
*
* **SyncEngine**
*
* Manages one local storage and multiple remote storages.
*
* Responsible for creating SyncPeers for synchronization, following the local-first strategy.
*
* **SyncPeer**
*
* Responsible for synchronizing a single storage with Y.Doc.
*
* Carries the main synchronization logic.
*
*/
export * from './consts';
export * from './engine';
export * from './peer';
export * from './storage';

View File

@@ -1,464 +0,0 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import { isEqual } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import {
PriorityAsyncQueue,
SharedPriorityTarget,
} from '../../../utils/async-queue';
import { mergeUpdates } from '../../../utils/merge-updates';
import { MANUALLY_STOP, throwIfAborted } from '../../../utils/throw-if-aborted';
import { SyncPeerStep } from './consts';
import type { SyncStorage } from './storage';
/**
 * Status snapshot for a single SyncPeer, emitted via `onStatusChange`.
 */
export interface SyncPeerStatus {
step: SyncPeerStep;
// connected docs plus subdocs still queued for loading
totalDocs: number;
// docs currently connected (root + loaded subdocs)
loadedDocs: number;
// queued pulls, +1 while a subdoc is being loaded
pendingPullUpdates: number;
// queued pushes, +1 while a push is in flight
pendingPushUpdates: number;
lastError: string | null;
// whether the root doc has been pulled from this peer's storage
rootDocLoaded: boolean;
}
/**
* # SyncPeer
* A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs.
*
* ```
* ┌─────┐
* │Start│
* └──┬──┘
* │
* ┌──────┐ ┌─────▼──────┐ ┌────┐
* │listen◄─────┤pull rootdoc│ │peer│
* └──┬───┘ └─────┬──────┘ └──┬─┘
* │ │ onLoad() │
* ┌──▼───┐ ┌─────▼──────┐ ┌────▼────┐
* │listen◄─────┤pull subdocs│ │subscribe│
* └──┬───┘ └─────┬──────┘ └────┬────┘
* │ │ onReady() │
* ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐
* │queue├──────►apply updates◄───────┤queue│
* └─────┘ └─────────────┘ └─────┘
* ```
*
* listen: listen for updates from ydoc, typically from user modifications.
* subscribe: listen for updates from storage, typically from other users.
*
*/
/**
 * Syncs one SyncStorage with one root Y.Doc and its subdocs.
 *
 * One-time use: construction starts the retry loop immediately; after
 * stop() the instance must be discarded (see `stop`).
 */
export class SyncPeer {
private _status: SyncPeerStatus = {
step: SyncPeerStep.LoadingRootDoc,
totalDocs: 1,
loadedDocs: 0,
pendingPullUpdates: 0,
pendingPushUpdates: 0,
lastError: null,
rootDocLoaded: false,
};
onStatusChange = new Slot<SyncPeerStatus>();
readonly abort = new AbortController();
/** Storage name, used for logging and as the Yjs update origin tag. */
get name() {
return this.storage.name;
}
logger = new DebugLogger('affine:sync-peer:' + this.name);
// Construction starts syncing immediately; no separate start() call.
constructor(
private readonly rootDoc: Doc,
private readonly storage: SyncStorage,
private readonly priorityTarget = new SharedPriorityTarget()
) {
this.logger.debug('peer start');
this.syncRetryLoop(this.abort.signal).catch(err => {
// should not reach here
console.error(err);
});
}
// Only emits when the snapshot actually changed (deep equality).
private set status(s: SyncPeerStatus) {
if (!isEqual(s, this._status)) {
this.logger.debug('status change', s);
this._status = s;
this.onStatusChange.emit(s);
}
}
get status() {
return this._status;
}
/**
 * stop sync
 *
 * SyncPeer is one-time use; this peer should be discarded after calling stop().
 */
stop() {
this.logger.debug('peer stop');
this.abort.abort(MANUALLY_STOP);
}
/**
 * auto retry after 5 seconds if sync failed
 */
async syncRetryLoop(abort: AbortSignal) {
while (abort.aborted === false) {
try {
await this.sync(abort);
} catch (err) {
if (err === MANUALLY_STOP || abort.aborted) {
return;
}
this.logger.error('sync error', err);
}
try {
// logged at error level so retries surface in debug output
this.logger.error('retry after 5 seconds');
this.status = {
step: SyncPeerStep.Retrying,
totalDocs: 1,
loadedDocs: 0,
pendingPullUpdates: 0,
pendingPushUpdates: 0,
lastError: 'Retrying sync after 5 seconds',
rootDocLoaded: this.status.rootDocLoaded,
};
await Promise.race([
new Promise<void>(resolve => {
setTimeout(resolve, 5 * 1000);
}),
new Promise((_, reject) => {
// exit if manually stopped
if (abort.aborted) {
reject(abort.reason);
}
abort.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
} catch (err) {
if (err === MANUALLY_STOP || abort.aborted) {
return;
}
// should never reach here
throw err;
}
}
}
// Mutable per-sync-attempt state; reset by initState() on every attempt.
private readonly state: {
connectedDocs: Map<string, Doc>;
pushUpdatesQueue: PriorityAsyncQueue<{
id: string;
data: Uint8Array[];
}>;
pushingUpdate: boolean;
pullUpdatesQueue: PriorityAsyncQueue<{
id: string;
data: Uint8Array;
}>;
subdocLoading: boolean;
subdocsLoadQueue: PriorityAsyncQueue<{ id: string; doc: Doc }>;
} = {
connectedDocs: new Map(),
pushUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
pushingUpdate: false,
pullUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
subdocLoading: false,
subdocsLoadQueue: new PriorityAsyncQueue([], this.priorityTarget),
};
/** Reset all per-attempt state before a (re)sync attempt. */
initState() {
this.state.connectedDocs.clear();
this.state.pushUpdatesQueue.clear();
this.state.pullUpdatesQueue.clear();
this.state.subdocsLoadQueue.clear();
this.state.pushingUpdate = false;
this.state.subdocLoading = false;
}
/**
 * main synchronization logic
 */
async sync(abortOuter: AbortSignal) {
this.initState();
const abortInner = new AbortController();
// NOTE(review): the listener parameter here is the 'abort' Event, not the
// abort reason, so abortInner receives the Event object as its reason.
// Harmless today because callers also check `signal.aborted`, but the
// naming is misleading — consider `abortInner.abort(abortOuter.reason)`.
abortOuter.addEventListener('abort', reason => {
abortInner.abort(reason);
});
let dispose: (() => void) | null = null;
try {
this.reportSyncStatus();
// start listen storage updates
dispose = await this.storage.subscribe(
this.handleStorageUpdates,
reason => {
// abort if storage disconnect, should trigger retry loop
abortInner.abort('subscribe disconnect:' + reason);
}
);
throwIfAborted(abortInner.signal);
// Step 1: load root doc
await this.connectDoc(this.rootDoc, abortInner.signal);
// Step 2: load subdocs
this.state.subdocsLoadQueue.push(
...Array.from(this.rootDoc.getSubdocs()).map(doc => ({
id: doc.guid,
doc,
}))
);
this.reportSyncStatus();
this.rootDoc.on('subdocs', this.handleSubdocsUpdate);
// Finally: start sync
// Three workers run until abort: subdoc loader, puller, pusher.
await Promise.all([
// load subdocs
(async () => {
while (throwIfAborted(abortInner.signal)) {
const subdoc = await this.state.subdocsLoadQueue.next(
abortInner.signal
);
this.state.subdocLoading = true;
this.reportSyncStatus();
await this.connectDoc(subdoc.doc, abortInner.signal);
this.state.subdocLoading = false;
this.reportSyncStatus();
}
})(),
// pull updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { id, data } = await this.state.pullUpdatesQueue.next(
abortInner.signal
);
// don't apply empty data or Uint8Array([0, 0])
if (
!(
data.byteLength === 0 ||
(data.byteLength === 2 && data[0] === 0 && data[1] === 0)
)
) {
const subdoc = this.state.connectedDocs.get(id);
if (subdoc) {
applyUpdate(subdoc, data, this.name);
}
}
this.reportSyncStatus();
}
})(),
// push updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { id, data } = await this.state.pushUpdatesQueue.next(
abortInner.signal
);
this.state.pushingUpdate = true;
this.reportSyncStatus();
const merged = mergeUpdates(data);
// don't push empty data or Uint8Array([0, 0])
if (
!(
merged.byteLength === 0 ||
(merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0)
)
) {
await this.storage.push(id, merged);
}
this.state.pushingUpdate = false;
this.reportSyncStatus();
}
})(),
]);
} finally {
dispose?.();
for (const docs of this.state.connectedDocs.values()) {
this.disconnectDoc(docs);
}
this.rootDoc.off('subdocs', this.handleSubdocsUpdate);
}
}
/**
 * Pull a doc from storage, apply it, enqueue the local diff for pushing,
 * and start listening for further local updates on the doc.
 */
async connectDoc(doc: Doc, abort: AbortSignal) {
const { data: docData, state: inStorageState } =
(await this.storage.pull(doc.guid, encodeStateVector(doc))) ?? {};
throwIfAborted(abort);
if (docData !== undefined && doc.guid === this.rootDoc.guid) {
this.status = {
...this.status,
rootDocLoaded: true,
};
}
if (docData) {
applyUpdate(doc, docData, 'load');
}
// diff root doc and in-storage, save updates to pendingUpdates
this.state.pushUpdatesQueue.push({
id: doc.guid,
data: [encodeStateAsUpdate(doc, inStorageState)],
});
this.state.connectedDocs.set(doc.guid, doc);
// start listen root doc changes
doc.on('update', this.handleYDocUpdates);
// mark rootDoc as loaded
doc.emit('sync', [true]);
this.reportSyncStatus();
}
/** Stop listening to a doc and drop it from the connected set. */
disconnectDoc(doc: Doc) {
doc.off('update', this.handleYDocUpdates);
this.state.connectedDocs.delete(doc.guid);
this.reportSyncStatus();
}
// handle updates from ydoc
handleYDocUpdates = (update: Uint8Array, origin: string, doc: Doc) => {
// don't push updates from storage
if (origin === this.name) {
return;
}
// coalesce: append to an already-queued entry for the same doc if present
const exist = this.state.pushUpdatesQueue.find(({ id }) => id === doc.guid);
if (exist) {
exist.data.push(update);
} else {
this.state.pushUpdatesQueue.push({
id: doc.guid,
data: [update],
});
}
this.reportSyncStatus();
};
// handle subdocs changes, append new subdocs to queue, remove subdocs from queue
handleSubdocsUpdate = ({
added,
removed,
}: {
added: Set<Doc>;
removed: Set<Doc>;
}) => {
for (const subdoc of added) {
this.state.subdocsLoadQueue.push({ id: subdoc.guid, doc: subdoc });
}
for (const subdoc of removed) {
this.disconnectDoc(subdoc);
this.state.subdocsLoadQueue.remove(doc => doc.doc === subdoc);
}
this.reportSyncStatus();
};
// handle updates from storage
handleStorageUpdates = (id: string, data: Uint8Array) => {
this.state.pullUpdatesQueue.push({
id,
data,
});
this.reportSyncStatus();
};
/** Derive the current SyncPeerStep from queue/connection state and emit it. */
reportSyncStatus() {
let step;
let lastError = null;
if (this.storage.errorMessage?.type === 'outdated') {
step = SyncPeerStep.VersionRejected;
lastError = this.storage.errorMessage.message.reason;
} else if (this.state.connectedDocs.size === 0) {
step = SyncPeerStep.LoadingRootDoc;
} else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) {
step = SyncPeerStep.LoadingSubDoc;
} else if (
this.state.pullUpdatesQueue.length ||
this.state.pushUpdatesQueue.length ||
this.state.pushingUpdate
) {
step = SyncPeerStep.Syncing;
} else {
step = SyncPeerStep.Synced;
}
this.status = {
step: step,
totalDocs:
this.state.connectedDocs.size + this.state.subdocsLoadQueue.length,
loadedDocs: this.state.connectedDocs.size,
pendingPullUpdates:
this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0),
pendingPushUpdates:
this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0),
lastError,
rootDocLoaded: this.status.rootDocLoaded,
};
}
/**
 * Resolve once this peer reaches Synced; reject on abort.
 * NOTE(review): the onStatusChange listener added here is never disposed.
 */
async waitForSynced(abort?: AbortSignal) {
if (this.status.step >= SyncPeerStep.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step >= SyncPeerStep.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
/**
 * Resolve once this peer has loaded all docs (step > Loaded); reject on
 * abort. NOTE(review): listener added here is never disposed either.
 */
async waitForLoaded(abort?: AbortSignal) {
if (this.status.step > SyncPeerStep.Loaded) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status.step > SyncPeerStep.Loaded) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
}

View File

@@ -1,57 +0,0 @@
/** Details attached when a server rejects a client for being outdated. */
export type RejectByVersion = {
currVersion: string;
requiredVersion: string;
reason: string;
};
/** Error surfaced by a SyncStorage; currently only version rejection. */
export type SyncErrorMessage = {
type: 'outdated';
message: RejectByVersion;
};
/**
 * Persistence/transport abstraction consumed by SyncPeer. One instance
 * backs one peer — either the local store or a remote server.
 */
export interface SyncStorage {
/**
 * for debug
 */
name: string;
// set by implementations when the server rejected the connection;
// checked by SyncPeer.reportSyncStatus to enter VersionRejected
errorMessage?: SyncErrorMessage;
/**
 * Pull updates for a doc.
 *
 * @param docId guid of the doc
 * @param state Yjs state vector of the caller's doc
 * @returns update data (optionally with the storage-side state vector),
 *          or null when the storage has nothing for this doc
 */
pull(
docId: string,
state: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array } | null>;
/** Persist/send a (merged) update for a doc. */
push(docId: string, data: Uint8Array): Promise<void>;
/**
 * Subscribe to updates from peer
 *
 * @param cb callback to handle updates
 * @param disconnect callback to handle disconnect, reason can be something like 'network-error'
 *
 * @returns unsubscribe function
 */
subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
): Promise<() => void>;
}
/** A no-op SyncStorage: pulls nothing, discards pushes, never notifies. */
export const EmptySyncStorage: SyncStorage = {
  name: 'empty',
  async pull() {
    return null;
  },
  async push() {
    // intentionally discarded
  },
  async subscribe() {
    return () => {};
  },
};
/**
 * A read-only SyncStorage backed by an in-memory id -> update map.
 * Pushes are discarded and no update notifications are ever emitted.
 */
export const ReadonlyMappingSyncStorage = (map: {
  [key: string]: Uint8Array;
}): SyncStorage => {
  return {
    name: 'map',
    async pull(id: string) {
      const update = map[id];
      return update ? { data: update } : null;
    },
    async push() {
      // read-only: writes are discarded
    },
    async subscribe() {
      return () => {};
    },
  };
};

View File

@@ -23,11 +23,11 @@ import {
AwarenessEngine,
AwarenessProvider,
BlobEngine,
DocEngine,
DocServerImpl,
DocStorageImpl,
LocalBlobStorage,
LocalSyncStorage,
RemoteBlobStorage,
RemoteSyncStorage,
SyncEngine,
WorkspaceEngine,
} from './engine';
import { WorkspaceFactory } from './factory';
@@ -63,13 +63,23 @@ export function configureWorkspaceServices(services: ServiceCollection) {
WorkspaceUpgradeController,
ServiceProvider,
])
.add(WorkspaceEngine, [BlobEngine, SyncEngine, AwarenessEngine])
.add(WorkspaceEngine, [
BlobEngine,
DocEngine,
AwarenessEngine,
RootYDocContext,
])
.add(AwarenessEngine, [[AwarenessProvider]])
.add(BlobEngine, [LocalBlobStorage, [RemoteBlobStorage]])
.add(SyncEngine, [RootYDocContext, LocalSyncStorage, [RemoteSyncStorage]])
.addImpl(DocEngine, services => {
return new DocEngine(
services.get(DocStorageImpl),
services.getOptional(DocServerImpl)
);
})
.add(WorkspaceUpgradeController, [
BlockSuiteWorkspaceContext,
SyncEngine,
DocEngine,
WorkspaceMetadataContext,
]);
}

View File

@@ -126,7 +126,7 @@ export class WorkspaceManager {
async transformLocalToCloud(local: Workspace): Promise<WorkspaceMetadata> {
assertEquals(local.flavour, WorkspaceFlavour.LOCAL);
await local.engine.sync.waitForSynced();
await local.engine.waitForSynced();
const newId = await this.list.create(
WorkspaceFlavour.AFFINE_CLOUD,

View File

@@ -6,15 +6,15 @@ import { applyUpdate, encodeStateAsUpdate } from 'yjs';
import { type ServiceCollection } from '../di';
import { GlobalState, type Memento } from '../storage';
import { mergeUpdates } from '../utils/merge-updates';
import { WorkspaceMetadataContext } from './context';
import {
AwarenessProvider,
type BlobStorage,
DocStorageImpl,
LocalBlobStorage,
LocalSyncStorage,
type SyncStorage,
MemoryDocStorage,
} from './engine';
import { MemoryStorage } from './engine/doc/storage';
import type { WorkspaceFactory } from './factory';
import { globalBlockSuiteSchema } from './global-schema';
import type { WorkspaceListProvider } from './list';
@@ -28,6 +28,7 @@ export class TestingLocalWorkspaceListProvider
implements WorkspaceListProvider
{
name = WorkspaceFlavour.LOCAL;
docStorage = new MemoryDocStorage(this.state);
constructor(private readonly state: Memento) {}
@@ -51,7 +52,6 @@ export class TestingLocalWorkspaceListProvider
const meta = { id, flavour: WorkspaceFlavour.LOCAL };
const blobStorage = new TestingBlobStorage(meta, this.state);
const syncStorage = new TestingSyncStorage(meta, this.state);
const docCollection = new DocCollection({
id: id,
@@ -63,9 +63,9 @@ export class TestingLocalWorkspaceListProvider
await initial(docCollection, blobStorage);
// save workspace to storage
await syncStorage.push(id, encodeStateAsUpdate(docCollection.doc));
await this.docStorage.doc.set(id, encodeStateAsUpdate(docCollection.doc));
for (const subdocs of docCollection.doc.getSubdocs()) {
await syncStorage.push(subdocs.guid, encodeStateAsUpdate(subdocs));
await this.docStorage.doc.set(subdocs.guid, encodeStateAsUpdate(subdocs));
}
const list = this.state.get<WorkspaceMetadata[]>(LIST_STORE_KEY) ?? [];
@@ -104,14 +104,7 @@ export class TestingLocalWorkspaceListProvider
}
async getInformation(id: string): Promise<WorkspaceInfo | undefined> {
// get information from root doc
const storage = new TestingSyncStorage(
{
flavour: WorkspaceFlavour.LOCAL,
id,
},
this.state
);
const data = await storage.pull(id, new Uint8Array([]));
const data = await this.docStorage.doc.get(id);
if (!data) {
return;
@@ -122,7 +115,7 @@ export class TestingLocalWorkspaceListProvider
schema: globalBlockSuiteSchema,
});
applyUpdate(bs.doc, data.data);
applyUpdate(bs.doc, data);
return {
name: bs.meta.name,
@@ -143,10 +136,7 @@ export class TestingLocalWorkspaceFactory implements WorkspaceFactory {
WorkspaceMetadataContext,
GlobalState,
])
.addImpl(LocalSyncStorage, TestingSyncStorage, [
WorkspaceMetadataContext,
GlobalState,
])
.addImpl(DocStorageImpl, MemoryStorage, [GlobalState])
.addImpl(AwarenessProvider, TestingAwarenessProvider);
}
@@ -161,38 +151,6 @@ export class TestingLocalWorkspaceFactory implements WorkspaceFactory {
}
}
export class TestingSyncStorage implements SyncStorage {
constructor(
private readonly metadata: WorkspaceMetadata,
private readonly state: Memento
) {}
name: string = 'testing';
async pull(
docId: string,
_: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> {
const key = 'testing-sync/' + this.metadata.id + '/' + docId;
const data = this.state.get<Uint8Array>(key);
if (data) {
return { data };
} else {
return null;
}
}
async push(docId: string, data: Uint8Array): Promise<void> {
const key = 'testing-sync/' + this.metadata.id + '/' + docId;
const oldData = this.state.get<Uint8Array>(key);
const update = mergeUpdates(oldData ? [oldData, data] : [data]);
this.state.set(key, update);
}
async subscribe(
_cb: (docId: string, data: Uint8Array) => void,
_disconnect: (reason: string) => void
): Promise<() => void> {
return () => {};
}
}
export class TestingBlobStorage implements BlobStorage {
name = 'testing';
readonly = false;

View File

@@ -7,7 +7,7 @@ import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { checkWorkspaceCompatibility, MigrationPoint } from '../blocksuite';
import { forceUpgradePages, upgradeV1ToV2 } from '../blocksuite';
import { migrateGuidCompatibility } from '../blocksuite';
import type { SyncEngine } from './engine/sync';
import type { DocEngine } from './engine';
import type { WorkspaceManager } from './manager';
import { type WorkspaceMetadata } from './metadata';
@@ -39,7 +39,7 @@ export class WorkspaceUpgradeController {
constructor(
private readonly docCollection: DocCollection,
private readonly sync: SyncEngine,
private readonly docEngine: DocEngine,
private readonly workspaceMetadata: WorkspaceMetadata
) {
docCollection.doc.on('update', () => {
@@ -69,7 +69,7 @@ export class WorkspaceUpgradeController {
this.status = { ...this.status, upgrading: true };
try {
await this.sync.waitForSynced();
await this.docEngine.waitForSynced();
const step = checkWorkspaceCompatibility(
this.docCollection,
@@ -109,12 +109,12 @@ export class WorkspaceUpgradeController {
migrateGuidCompatibility(clonedDoc);
await forceUpgradePages(clonedDoc, this.docCollection.schema);
applyDoc(this.docCollection.doc, clonedDoc);
await this.sync.waitForSynced();
await this.docEngine.waitForSynced();
return null;
} else if (step === MigrationPoint.BlockVersion) {
await forceUpgradePages(clonedDoc, this.docCollection.schema);
applyDoc(this.docCollection.doc, clonedDoc);
await this.sync.waitForSynced();
await this.docEngine.waitForSynced();
return null;
} else {
throw new Unreachable();

View File

@@ -126,8 +126,7 @@ export class Workspace {
this.services.get(CleanupService).cleanup();
}
// same as `WorkspaceEngine.sync.setPriorityRule`
setPriorityRule(target: ((id: string) => boolean) | null) {
this.engine.sync.setPriorityRule(target);
setPriorityLoad(docId: string, priority: number) {
this.engine.doc.setPriority(docId, priority);
}
}