feat(nbstore): init (#7639)

TODO

- [x] basic
- [x] storages
- [x] producer/consumer
- [x] operation pattern
- [x] events
- [x] worker
- [x] readme
- [x] peer dependencies
This commit is contained in:
forehalo
2024-11-22 03:13:04 +00:00
parent 76eabf644c
commit 4125038ff8
18 changed files with 882 additions and 3 deletions

View File

@@ -0,0 +1,69 @@
# Space Storage
## Usage
### Independent Storage usage
```ts
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';
const storage = new IndexedDBDocStorage({
peer: 'local',
spaceId: 'my-new-workspace',
});
await storage.connect();
storage.connection.onStatusChange((status: ConnectionStatus, error?: Error) => {
ui.show(status, error);
});
// { docId: string, bin: Uint8Array, timestamp: Date, editor?: string } | null
const doc = await storage.getDoc('my-first-doc');
```
### Use All storages together
```ts
import { SpaceStorage } from '@affine/nbstore';
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';
import { SqliteBlobStorage } from '@affine/nbstore/sqlite';
const storage = new SpaceStorage([new IndexedDBDocStorage({}), new SqliteBlobStorage({})]);
await storage.connect();
storage.on('connection', ({ storage, status, error }) => {
ui.show(storage, status, error);
});
await storage.get('doc').pushDocUpdate({ docId: 'my-first-doc', bin: new Uint8Array(), editor: 'me' });
await storage.tryGet('blob')?.get('img');
```
### Put Storage behind Worker
```ts
import { SpaceStorageWorkerClient } from '@affine/nbstore/op';
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';
const client = new SpaceStorageWorkerClient();
client.addStorage(IndexedDBDocStorage, {
// options must be structured-cloneable values
peer: 'local',
spaceType: 'workspace',
spaceId: 'my-new-workspace',
});
await client.connect();
client.ob$('connection', ({ storage, status, error }) => {
ui.show(storage, status, error);
});
await client.call('pushDocUpdate', { docId: 'my-first-doc', bin: new Uint8Array(), editor: 'me' });
// calling an unregistered op leads to an Error
// Error { message: 'Handler for operation [listHistory] is not registered.' }
await client.call('listHistories', { docId: 'my-first-doc' });
```

View File

@@ -0,0 +1,17 @@
{
"name": "@affine/nbstore",
"type": "module",
"version": "0.18.0",
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
}
}

View File

@@ -0,0 +1,132 @@
import EventEmitter2 from 'eventemitter2';
export type ConnectionStatus =
| 'idle'
| 'connecting'
| 'connected'
| 'error'
| 'closed';
/**
 * Base class for a stateful connection to an underlying resource
 * (e.g. an IndexedDB database, a sqlite handle, a websocket).
 *
 * Subclasses implement [doConnect]/[doDisconnect]; this class tracks the
 * connection status, notifies subscribers on status changes and
 * automatically retries connecting after an error (see [autoReconnect]).
 */
export abstract class Connection<T = any> {
  private readonly event = new EventEmitter2();
  private _inner: T | null = null;
  private _status: ConnectionStatus = 'idle';
  protected error?: Error;
  // number of sharers holding this connection; see `share()` in shared-connection
  private refCount = 0;

  constructor() {
    this.autoReconnect();
  }

  /**
   * Identity used to deduplicate shared connections.
   * `undefined` (the default) means this connection is not shareable.
   */
  get shareId(): string | undefined {
    return undefined;
  }

  /** The raw connection object, or `null` when not (yet) connected. */
  get maybeConnection() {
    return this._inner;
  }

  /** The raw connection object; throws when not connected. */
  get inner(): T {
    if (!this._inner) {
      throw new Error(
        `Connection ${this.constructor.name} has not been established.`
      );
    }
    return this._inner;
  }

  protected set inner(inner: T | null) {
    this._inner = inner;
  }

  get status() {
    return this._status;
  }

  protected setStatus(status: ConnectionStatus, error?: Error) {
    // BUGFIX: emit when the status OR the error changed. The previous
    // `&&` suppressed every plain status transition (error is usually
    // `undefined` on both sides of the comparison), so subscribers —
    // including the auto-reconnect handler — never fired.
    const shouldEmit = status !== this._status || error !== this.error;
    this._status = status;
    this.error = error;
    if (shouldEmit) {
      this.emitStatusChanged(status, error);
    }
  }

  /** Open the underlying resource and return it. */
  abstract doConnect(): Promise<T>;
  /** Close the underlying resource. */
  abstract doDisconnect(conn: T): Promise<void>;

  ref() {
    this.refCount++;
  }

  deref() {
    this.refCount = Math.max(0, this.refCount - 1);
  }

  async connect() {
    if (this.status === 'idle' || this.status === 'error') {
      this.setStatus('connecting');
      try {
        this._inner = await this.doConnect();
        this.setStatus('connected');
      } catch (error) {
        // normalize non-Error throwables so subscribers always receive an Error
        this.setStatus(
          'error',
          error instanceof Error ? error : new Error(String(error))
        );
      }
    }
  }

  async disconnect() {
    this.deref();
    if (this.refCount > 0) {
      // other sharers still hold this connection; keep it open
      return;
    }
    // NOTE(review): a connection stuck in 'connecting' is not torn down
    // here; only an established connection is closed.
    if (this.status === 'connected') {
      try {
        if (this._inner) {
          await this.doDisconnect(this._inner);
          this._inner = null;
        }
        this.setStatus('closed');
      } catch (error) {
        this.setStatus(
          'error',
          error instanceof Error ? error : new Error(String(error))
        );
      }
    }
  }

  private autoReconnect() {
    // TODO:
    //   - maximum retry count
    //   - dynamic sleep time (attempt < 3 ? 1s : 1min)?
    // connect() is a no-op unless status is 'idle' or 'error', so this
    // only retries after failures and never reopens a 'closed' connection.
    this.onStatusChanged(() => {
      this.connect().catch(() => {});
    });
  }

  /** Subscribe to status changes; returns an unsubscribe function. */
  onStatusChanged(
    cb: (status: ConnectionStatus, error?: Error) => void
  ): () => void {
    this.event.on('statusChanged', cb);
    return () => {
      this.event.off('statusChanged', cb);
    };
  }

  private readonly emitStatusChanged = (
    status: ConnectionStatus,
    error?: Error
  ) => {
    this.event.emit('statusChanged', status, error);
  };
}
/**
 * A stub connection for storages that have no real resource to open;
 * both connect and disconnect resolve immediately with nothing to do.
 */
export class DummyConnection extends Connection<undefined> {
  async doConnect() {
    return undefined;
  }

  async doDisconnect() {
    // nothing to release
    return undefined;
  }
}

View File

@@ -0,0 +1,2 @@
export * from './connection';
export * from './shared-connection';

View File

@@ -0,0 +1,22 @@
import type { Connection } from './connection';
/** Registry of live shareable connections, keyed by their `shareId`. */
const CONNECTIONS = new Map<string, Connection<any>>();

/**
 * Deduplicate shareable connections.
 *
 * If a connection with the same `shareId` has already been registered,
 * bump its refcount and hand it back; otherwise register the given
 * connection and return it. Throws when the connection declares no
 * `shareId` at all.
 */
export function share<T extends Connection<any>>(conn: T): T {
  const id = conn.shareId;
  if (!id) {
    throw new Error(
      `Connection ${conn.constructor.name} is not shareable.\nIf you want to make it shareable, please override [shareId].`
    );
  }
  const shared = CONNECTIONS.get(id) ?? conn;
  if (!CONNECTIONS.has(id)) {
    CONNECTIONS.set(id, shared);
  }
  shared.ref();
  return shared as T;
}

View File

@@ -0,0 +1,2 @@
export * from './connection';
export * from './storage';

View File

@@ -0,0 +1,29 @@
import { Storage, type StorageOptions } from './storage';
export interface BlobStorageOptions extends StorageOptions {}

/** A stored binary object together with its metadata. */
export interface BlobRecord {
  key: string;
  data: Uint8Array;
  mime: string;
  createdAt: Date;
}

/** Blob metadata as returned by [BlobStorage.list]; the payload itself is omitted. */
export interface ListedBlobRecord {
  key: string;
  mime: string;
  size: number;
  createdAt: Date;
}

/**
 * Abstract key-value storage for binary blobs (images, attachments, ...).
 * Concrete backends (IndexedDB, sqlite, cloud) implement the accessors.
 */
export abstract class BlobStorage<
  Options extends BlobStorageOptions = BlobStorageOptions,
> extends Storage<Options> {
  override readonly storageType = 'blob';

  /** Fetch a blob by key; resolves `null` when absent. */
  abstract get(key: string): Promise<BlobRecord | null>;
  /** Insert or overwrite a blob. */
  abstract set(blob: BlobRecord): Promise<void>;
  // NOTE(review): `permanently` presumably bypasses a soft-delete/trash
  // stage — confirm with concrete implementations.
  abstract delete(key: string, permanently: boolean): Promise<void>;
  // NOTE(review): semantics are implementation-defined; likely purges
  // previously soft-deleted blobs — confirm with implementations.
  abstract release(): Promise<void>;
  /** List metadata of all stored blobs. */
  abstract list(): Promise<ListedBlobRecord[]>;
}

View File

@@ -0,0 +1,258 @@
import EventEmitter2 from 'eventemitter2';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
import type { Lock } from './lock';
import { SingletonLocker } from './lock';
import { Storage, type StorageOptions } from './storage';
/** Identifies a doc together with its last-modified timestamp. */
export interface DocClock {
  docId: string;
  timestamp: Date;
}

/** Map of docId -> last-modified timestamp. */
export type DocClocks = Record<string, Date>;

/** A doc's clock plus its yjs binary and (optionally) who last edited it. */
export interface DocRecord extends DocClock {
  bin: Uint8Array;
  editor?: string;
}

/** Result of a state-vector diff: what the requester is missing, plus our state. */
export interface DocDiff extends DocClock {
  missing: Uint8Array;
  state: Uint8Array;
}

/** An incoming yjs update to be appended to a doc. */
export interface DocUpdate {
  docId: string;
  bin: Uint8Array;
  editor?: string;
}

export interface Editor {
  name: string;
  avatarUrl: string | null;
}

export interface DocStorageOptions extends StorageOptions {
  // custom merge strategy; defaults to yjs `mergeUpdates` (see DocStorage.mergeUpdates)
  mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}
/**
 * Abstract storage for yjs documents.
 *
 * Docs are persisted as a snapshot plus a queue of unmerged updates;
 * reads lazily squash the queue into a new snapshot (see [getDoc]).
 */
export abstract class DocStorage<
  Opts extends DocStorageOptions = DocStorageOptions,
> extends Storage<Opts> {
  private readonly event = new EventEmitter2();
  override readonly storageType = 'doc';
  // serializes the read-merge-write cycle per doc; see [lockDocForUpdate]
  private readonly locker = new SingletonLocker();

  /**
   * Tell a binary is an empty yjs binary or not.
   *
   * NOTE:
   *   `[0, 0]` is the empty yjs update binary
   *   `[0]` is the empty yjs state vector binary
   */
  isEmptyBin(bin: Uint8Array): boolean {
    return (
      bin.length === 0 ||
      // 0x0 for state vector
      (bin.length === 1 && bin[0] === 0) ||
      // 0x00 for update
      (bin.length === 2 && bin[0] === 0 && bin[1] === 0)
    );
  }

  // REGION: open apis by Op system
  /**
   * Get a doc record with the latest binary.
   *
   * If unmerged updates exist they are squashed together with the current
   * snapshot (if any), persisted as the new snapshot, and marked merged.
   */
  async getDoc(docId: string) {
    // hold the per-doc lock for the whole read-merge-write cycle
    await using _lock = await this.lockDocForUpdate(docId);
    const snapshot = await this.getDocSnapshot(docId);
    const updates = await this.getDocUpdates(docId);
    if (updates.length) {
      const { timestamp, bin, editor } = await this.squash(
        snapshot ? [snapshot, ...updates] : updates
      );
      // NOTE(review): `spaceId` is an extra field not declared on
      // [DocRecord]; merged snapshots returned here carry it while plain
      // snapshots (below) do not — confirm callers don't rely on it.
      const newSnapshot = {
        spaceId: this.spaceId,
        docId,
        bin,
        timestamp,
        editor,
      };
      // the boolean result (write rejected as outdated) is deliberately
      // ignored: the freshly merged record is returned either way
      await this.setDocSnapshot(newSnapshot, snapshot);
      // always mark updates as merged unless throws
      await this.markUpdatesMerged(docId, updates);
      return newSnapshot;
    }
    return snapshot;
  }

  /**
   * Get a yjs binary diff with the given state vector.
   *
   * Without a state vector the full doc binary is returned as `missing`.
   * Resolves `null` when the doc does not exist.
   */
  async getDocDiff(docId: string, state?: Uint8Array) {
    const doc = await this.getDoc(docId);
    if (!doc) {
      return null;
    }
    return {
      docId,
      missing: state ? diffUpdate(doc.bin, state) : doc.bin,
      state: encodeStateVectorFromUpdate(doc.bin),
      timestamp: doc.timestamp,
    };
  }

  /**
   * Push updates into storage
   */
  abstract pushDocUpdate(update: DocUpdate): Promise<DocClock>;

  /**
   * Get all docs' timestamp info. Especially useful in the sync process.
   */
  abstract getDocTimestamps(after?: Date): Promise<DocClocks>;

  /**
   * Delete a specific doc's data with all snapshots and updates
   */
  abstract deleteDoc(docId: string): Promise<void>;

  /**
   * Subscribe on doc updates emitted from storage itself.
   *
   * NOTE:
   *
   * There is not always an update emitted from storage itself.
   *
   * For example, in Sqlite storage, the update will only come from user's updating on docs,
   * in other words, the update will never somehow be auto generated in storage internally.
   *
   * But for Cloud storage, there will be updates broadcasted from other clients,
   * so the storage will emit updates to notify the client to integrate them.
   */
  subscribeDocUpdate(callback: (update: DocRecord) => void) {
    this.event.on('update', callback);
    return () => {
      this.event.off('update', callback);
    };
  }
  // ENDREGION

  // REGION: api for internal usage
  // Typed overloads over the internal event emitter; each returns an
  // unsubscribe function.
  protected on(
    event: 'update',
    callback: (update: DocRecord) => void
  ): () => void;
  protected on(
    event: 'snapshot',
    callback: (snapshot: DocRecord, prevSnapshot: DocRecord | null) => void
  ): () => void;
  protected on(event: string, callback: (...args: any[]) => void): () => void {
    this.event.on(event, callback);
    return () => {
      this.event.off(event, callback);
    };
  }

  protected emit(event: 'update', update: DocRecord): void;
  protected emit(
    event: 'snapshot',
    snapshot: DocRecord,
    prevSnapshot: DocRecord | null
  ): void;
  protected emit(event: string, ...args: any[]): void {
    this.event.emit(event, ...args);
  }

  protected off(event: string, callback: (...args: any[]) => void): void {
    this.event.off(event, callback);
  }

  /**
   * Get a doc snapshot from storage
   */
  protected abstract getDocSnapshot(docId: string): Promise<DocRecord | null>;

  /**
   * Set the doc snapshot into storage
   *
   * @safety
   * be careful when implementing this method.
   *
   * It might be called with an outdated snapshot when running in a
   * multi-thread environment.
   *
   * A common solution is to update the snapshot record in DB only when the
   * incoming one's timestamp is newer.
   *
   * @example
   * ```ts
   * await using _lock = await this.lockDocForUpdate(docId);
   * // set snapshot
   *
   * ```
   */
  protected abstract setDocSnapshot(
    snapshot: DocRecord,
    prevSnapshot: DocRecord | null
  ): Promise<boolean>;

  /**
   * Get all updates of a doc that haven't been merged into the snapshot.
   *
   * The updates queue exists for a performance concern:
   * a huge amount of write time is saved by not merging updates into the
   * snapshot immediately. Updates are merged into the snapshot when the
   * latest doc is requested (see [getDoc]).
   */
  protected abstract getDocUpdates(docId: string): Promise<DocRecord[]>;

  /**
   * Mark updates as merged into snapshot.
   */
  protected abstract markUpdatesMerged(
    docId: string,
    updates: DocRecord[]
  ): Promise<number>;

  /**
   * Merge doc updates into a single update.
   *
   * The returned record reuses the LAST update's docId/timestamp/editor;
   * only the binaries are merged.
   */
  protected async squash(updates: DocRecord[]): Promise<DocRecord> {
    const lastUpdate = updates.at(-1);
    if (!lastUpdate) {
      throw new Error('No updates to be squashed.');
    }
    // fast return
    if (updates.length === 1) {
      return lastUpdate;
    }
    const finalUpdate = await this.mergeUpdates(updates.map(u => u.bin));
    return {
      docId: lastUpdate.docId,
      bin: finalUpdate,
      timestamp: lastUpdate.timestamp,
      editor: lastUpdate.editor,
    };
  }

  // merge with the user-provided strategy or fall back to yjs mergeUpdates;
  // empty yjs binaries are filtered out first
  protected mergeUpdates(updates: Uint8Array[]) {
    const merge = this.options?.mergeUpdates ?? mergeUpdates;
    return merge(updates.filter(bin => !this.isEmptyBin(bin)));
  }

  // NOTE(review): the lock domain hardcodes `workspace:` even though
  // spaceType may be 'userspace' — harmless as long as spaceId is unique,
  // but consider `${this.spaceType}:${this.spaceId}`.
  protected async lockDocForUpdate(docId: string): Promise<Lock> {
    return this.locker.lock(`workspace:${this.spaceId}:update`, docId);
  }
}

View File

@@ -0,0 +1,122 @@
import { noop } from 'lodash-es';
import {
applyUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVector,
UndoManager,
} from 'yjs';
import { type DocRecord, DocStorage, type DocStorageOptions } from './doc';
/** Filter for [HistoricalDocStorage.listHistories]. */
export interface HistoryFilter {
  before?: Date;
  // NOTE(review): a `Date`-typed `limit` looks odd for a list filter —
  // possibly meant to be a maximum count (number); confirm with implementations.
  limit?: Date;
}

/** A single history entry: who produced the snapshot and when. */
export interface ListedHistory {
  userId: string | null;
  timestamp: Date;
}
/**
 * A [DocStorage] that additionally keeps a history of snapshots and can
 * roll a doc back to an earlier version.
 */
export abstract class HistoricalDocStorage<
  Options extends DocStorageOptions = DocStorageOptions,
> extends DocStorage<Options> {
  constructor(opts: Options) {
    super(opts);
    // every persisted snapshot also records a history entry;
    // failures are swallowed (history is best-effort)
    this.on('snapshot', snapshot => {
      this.createHistory(snapshot.docId, snapshot).catch(noop);
    });
  }

  /**
   * Persist the snapshot and, on success, emit the 'snapshot' event —
   * which in turn records a history entry (see constructor).
   */
  override async setDocSnapshot(
    snapshot: DocRecord,
    prevSnapshot: DocRecord | null
  ): Promise<boolean> {
    const success = await this.upsertDocSnapshot(snapshot, prevSnapshot);
    if (success) {
      this.emit('snapshot', snapshot, prevSnapshot);
    }
    return success;
  }

  /**
   * Update the doc snapshot in storage or create a new one if not exists.
   *
   * @safety
   * be careful when implementing this method.
   *
   * It might be called with an outdated snapshot when running in a
   * multi-thread environment.
   *
   * A common solution is to update the snapshot record in DB only when the
   * incoming one's timestamp is newer.
   *
   * @example
   * ```ts
   * await using _lock = await this.lockDocForUpdate(docId);
   * // set snapshot
   *
   * ```
   */
  abstract upsertDocSnapshot(
    snapshot: DocRecord,
    prevSnapshot: DocRecord | null
  ): Promise<boolean>;

  /** List history entries of a doc, optionally filtered. */
  abstract listHistories(
    docId: string,
    filter?: HistoryFilter
  ): Promise<ListedHistory[]>;

  /** Fetch the historical doc record at the given timestamp, or `null`. */
  abstract getHistory(
    docId: string,
    timestamp: Date
  ): Promise<DocRecord | null>;

  abstract deleteHistory(docId: string, timestamp: Date): Promise<void>;

  /**
   * Roll a doc back to the version at `timestamp` by pushing a revert
   * update (history is preserved; the rollback itself is a new change).
   *
   * @throws when either the target version or the current doc is missing.
   */
  async rollbackDoc(docId: string, timestamp: Date, editor?: string) {
    const toSnapshot = await this.getHistory(docId, timestamp);
    if (!toSnapshot) {
      throw new Error('Can not find the version to rollback to.');
    }
    const fromSnapshot = await this.getDoc(docId);
    if (!fromSnapshot) {
      throw new Error('Can not find the current version of the doc.');
    }
    const change = this.generateRevertUpdate(fromSnapshot.bin, toSnapshot.bin);
    await this.pushDocUpdate({ docId, bin: change, editor });
    // force create a new history record after rollback
    await this.createHistory(docId, fromSnapshot);
  }

  // history can only be created upon update pushing.
  protected abstract createHistory(
    docId: string,
    snapshot: DocRecord
  ): Promise<void>;

  /**
   * Build a yjs update that, applied to `fromNewerBin`, reverts the doc
   * content back to `toOlderBin`.
   */
  protected generateRevertUpdate(
    fromNewerBin: Uint8Array,
    toOlderBin: Uint8Array
  ): Uint8Array {
    // materialize both versions as live yjs docs
    const newerDoc = new Doc();
    applyUpdate(newerDoc, fromNewerBin);
    const olderDoc = new Doc();
    applyUpdate(olderDoc, toOlderBin);
    const newerState = encodeStateVector(newerDoc);
    const olderState = encodeStateVector(olderDoc);
    // everything the newer doc has that the older one lacks
    const diff = encodeStateAsUpdate(newerDoc, olderState);
    // apply that diff under an UndoManager, then undo it: olderDoc now
    // contains operations that cancel the newer changes
    const undoManager = new UndoManager(Array.from(olderDoc.share.values()));
    applyUpdate(olderDoc, diff);
    undoManager.undo();
    // encode only the cancelling ops, relative to the newer doc's state
    return encodeStateAsUpdate(olderDoc, newerState);
  }
}

View File

@@ -0,0 +1,93 @@
import EventEmitter2 from 'eventemitter2';
import type { ConnectionStatus } from '../connection';
import { type Storage, type StorageType } from '../storage';
/**
 * Aggregates one storage per [StorageType] for a single space and
 * forwards each storage's connection status changes as a unified
 * 'connection' event.
 */
export class SpaceStorage {
  protected readonly storages: Map<StorageType, Storage>;
  private readonly event = new EventEmitter2();
  private readonly disposables: Set<() => void> = new Set();
  // storages whose connection status is already being forwarded, so a
  // repeated connect() (e.g. reconnect) doesn't stack duplicate listeners
  private readonly observed: Set<Storage> = new Set();

  constructor(storages: Storage[] = []) {
    this.storages = new Map(
      storages.map(storage => [storage.storageType, storage])
    );
  }

  /** Get a registered storage, or `undefined` when absent. */
  tryGet(type: StorageType) {
    return this.storages.get(type);
  }

  /**
   * Get a registered storage.
   * @throws when no storage of that type was registered.
   */
  get(type: StorageType) {
    const storage = this.tryGet(type);
    if (!storage) {
      throw new Error(`Storage ${type} not registered.`);
    }
    return storage;
  }

  /**
   * Connect all registered storages in parallel. Individual failures are
   * surfaced through the 'connection' event rather than thrown.
   */
  async connect() {
    await Promise.allSettled(
      Array.from(this.storages.values()).map(async storage => {
        // BUGFIX: attach the status forwarder only once per storage;
        // previously every connect() call added a fresh listener, so a
        // reconnect duplicated 'connection' events and leaked disposables.
        if (!this.observed.has(storage)) {
          this.observed.add(storage);
          this.disposables.add(
            storage.connection.onStatusChanged((status, error) => {
              this.event.emit('connection', {
                storage: storage.storageType,
                status,
                error,
              });
            })
          );
        }
        await storage.connect();
      })
    );
  }

  /** Disconnect all registered storages in parallel. */
  async disconnect() {
    await Promise.allSettled(
      Array.from(this.storages.values()).map(async storage => {
        await storage.disconnect();
      })
    );
  }

  /** Subscribe to connection status changes; returns an unsubscriber. */
  on(
    event: 'connection',
    cb: (payload: {
      storage: StorageType;
      status: ConnectionStatus;
      error?: Error;
    }) => void
  ): () => void {
    this.event.on(event, cb);
    return () => {
      this.event.off(event, cb);
    };
  }

  off(
    event: 'connection',
    cb: (payload: {
      storage: StorageType;
      status: ConnectionStatus;
      error?: Error;
    }) => void
  ): void {
    this.event.off(event, cb);
  }

  /** Disconnect everything and drop all listeners and registrations. */
  async destroy() {
    await this.disconnect();
    this.disposables.forEach(disposable => disposable());
    this.disposables.clear();
    this.observed.clear();
    this.event.removeAllListeners();
    this.storages.clear();
  }
}
export * from './blob';
export * from './doc';
export * from './history';
export * from './storage';
export * from './sync';

View File

@@ -0,0 +1,44 @@
export interface Locker {
  lock(domain: string, resource: string): Promise<Lock>;
}

/**
 * In-process locker handing out one [Lock] per `domain:resource` pair.
 * Lock objects are kept forever once created and reused on later calls.
 */
export class SingletonLocker implements Locker {
  lockedResource = new Map<string, Lock>();

  async lock(domain: string, resource: string) {
    const key = `${domain}:${resource}`;
    const lock = this.lockedResource.get(key) ?? new Lock();
    this.lockedResource.set(key, lock);
    await lock.acquire();
    return lock;
  }
}
/**
 * An asynchronous mutex built on promise chaining; supports
 * `await using lock = ...` via [Symbol.asyncDispose].
 */
export class Lock {
  // tail of the wait queue: resolves when the current holder releases
  private inner: Promise<void> = Promise.resolve();
  // release callback of the CURRENT holder (set once its acquire resolves)
  private release: () => void = () => {};

  /**
   * Wait until the lock is free, then take it.
   *
   * BUGFIX: the queue tail must be swapped synchronously BEFORE awaiting
   * the previous holder. The old code awaited `this.inner` first, so two
   * concurrent acquire() calls both observed the same (resolved) tail and
   * both entered the critical section at once.
   */
  async acquire() {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    let release: () => void = null!;
    const nextLock = new Promise<void>(resolve => {
      release = resolve;
    });
    const prev = this.inner;
    this.inner = nextLock;
    await prev;
    // we are now the holder; releasing unblocks the next waiter
    this.release = release;
  }

  [Symbol.asyncDispose]() {
    this.release();
    return Promise.resolve();
  }
}

View File

@@ -0,0 +1,37 @@
import type { Connection } from '../connection';
export type SpaceType = 'workspace' | 'userspace';
export type StorageType = 'blob' | 'doc' | 'sync';

/** Options common to every storage implementation. */
export interface StorageOptions {
  peer: string;
  type: SpaceType;
  id: string;
}

/**
 * Base class of all space storages: binds a connection to the identity
 * (peer / space type / space id) the storage operates on.
 */
export abstract class Storage<Opts extends StorageOptions = StorageOptions> {
  abstract readonly storageType: StorageType;
  abstract readonly connection: Connection;

  constructor(public readonly options: Opts) {}

  /** The peer this storage belongs to (e.g. 'local' or a server id). */
  get peer() {
    return this.options.peer;
  }

  /** Whether this storage serves a workspace or a userspace. */
  get spaceType() {
    return this.options.type;
  }

  /** Identifier of the space this storage serves. */
  get spaceId() {
    return this.options.id;
  }

  /** Establish the underlying connection. */
  async connect() {
    await this.connection.connect();
  }

  /** Tear down the underlying connection. */
  async disconnect() {
    await this.connection.disconnect();
  }
}

View File

@@ -0,0 +1,16 @@
import type { DocClock, DocClocks } from './doc';
import { Storage, type StorageOptions } from './storage';
export interface SyncStorageOptions extends StorageOptions {}

/**
 * Storage for synchronization bookkeeping: per-peer doc clocks
 * ([DocClocks] maps docId -> timestamp).
 */
export abstract class SyncStorage<
  Opts extends SyncStorageOptions = SyncStorageOptions,
> extends Storage<Opts> {
  override readonly storageType = 'sync';

  // NOTE(review): presumably the clocks of docs last received FROM `peer`,
  // while the "pushed" pair below tracks what was sent TO `peer` —
  // confirm against the sync engine.
  abstract getPeerClocks(peer: string): Promise<DocClocks>;
  abstract setPeerClock(peer: string, clock: DocClock): Promise<void>;
  abstract getPeerPushedClocks(peer: string): Promise<DocClocks>;
  abstract setPeerPushedClock(peer: string, clock: DocClock): Promise<void>;
  /** Drop all stored clocks for all peers. */
  abstract clearClocks(): Promise<void>;
}

View File

@@ -0,0 +1,20 @@
{
"extends": "../../../tsconfig.json",
"include": ["./src"],
"compilerOptions": {
"composite": true,
"noEmit": false,
"outDir": "lib"
},
"references": [
{
"path": "../../frontend/graphql"
},
{
"path": "../../frontend/electron-api"
},
{
"path": "../infra"
}
]
}