feat(server): make an abstraction for ydoc storage (#7901)

This commit is contained in:
forehalo
2024-08-21 05:30:22 +00:00
parent 682a01e441
commit 6f9f579e5d
5 changed files with 306 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
import { Connection } from './connection';
export interface BlobStorageOptions {}
export interface Blob {
key: string;
bin: Uint8Array;
mimeType: string;
}
export abstract class BlobStorageAdapter extends Connection {
abstract getBlob(spaceId: string, key: string): Promise<Blob | null>;
abstract setBlob(spaceId: string, blob: Blob): Promise<string>;
abstract deleteBlob(spaceId: string, key: string): Promise<boolean>;
abstract listBlobs(spaceId: string): Promise<Blob>;
}

View File

@@ -0,0 +1,11 @@
export class Connection {
protected connected: boolean = false;
connect(): Promise<void> {
this.connected = true;
return Promise.resolve();
}
disconnect(): Promise<void> {
this.connected = false;
return Promise.resolve();
}
}

View File

@@ -0,0 +1,205 @@
import {
applyUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVector,
mergeUpdates,
UndoManager,
} from 'yjs';
import { CallTimer } from '../../../fundamentals';
import { Connection } from './connection';
import { SingletonLocker } from './lock';
export interface DocRecord {
spaceId: string;
docId: string;
bin: Uint8Array;
timestamp: number;
}
export interface DocUpdate {
bin: Uint8Array;
timestamp: number;
}
export interface HistoryFilter {
before?: number;
limit?: number;
}
export interface DocStorageOptions {
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}
export abstract class DocStorageAdapter extends Connection {
private readonly locker = new SingletonLocker();
constructor(
protected readonly options: DocStorageOptions = {
mergeUpdates,
}
) {
super();
}
// open apis
isEmptyBin(bin: Uint8Array): boolean {
return (
bin.length === 0 ||
// 0x0 for state vector
(bin.length === 1 && bin[0] === 0) ||
// 0x00 for update
(bin.length === 2 && bin[0] === 0 && bin[1] === 0)
);
}
async getDoc(spaceId: string, docId: string): Promise<DocRecord | null> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);
const snapshot = await this.getDocSnapshot(spaceId, docId);
const updates = await this.getDocUpdates(spaceId, docId);
if (updates.length) {
const { timestamp, bin } = await this.squash(
snapshot ? [snapshot, ...updates] : updates
);
const newSnapshot = {
spaceId: spaceId,
docId,
bin,
timestamp,
};
const success = await this.setDocSnapshot(newSnapshot);
// if there is old snapshot, create a new history record
if (success && snapshot) {
await this.createDocHistory(snapshot);
}
// always mark updates as merged unless throws
await this.markUpdatesMerged(spaceId, docId, updates);
return newSnapshot;
}
return snapshot;
}
abstract pushDocUpdates(
spaceId: string,
docId: string,
updates: Uint8Array[]
): Promise<number>;
abstract deleteDoc(spaceId: string, docId: string): Promise<void>;
abstract deleteSpace(spaceId: string): Promise<void>;
async rollbackDoc(
spaceId: string,
docId: string,
timestamp: number
): Promise<void> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);
const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
if (!toSnapshot) {
throw new Error('Can not find the version to rollback to.');
}
const fromSnapshot = await this.getDocSnapshot(spaceId, docId);
if (!fromSnapshot) {
throw new Error('Can not find the current version of the doc.');
}
const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin);
await this.pushDocUpdates(spaceId, docId, [change]);
// force create a new history record after rollback
await this.createDocHistory(fromSnapshot, true);
}
abstract getSpaceDocTimestamps(
spaceId: string,
after?: number
): Promise<Record<string, number> | null>;
abstract listDocHistories(
spaceId: string,
docId: string,
query: { skip?: number; limit?: number }
): Promise<number[]>;
abstract getDocHistory(
spaceId: string,
docId: string,
timestamp: number
): Promise<DocRecord | null>;
// api for internal usage
protected abstract getDocSnapshot(
spaceId: string,
docId: string
): Promise<DocRecord | null>;
protected abstract setDocSnapshot(snapshot: DocRecord): Promise<boolean>;
protected abstract getDocUpdates(
spaceId: string,
docId: string
): Promise<DocUpdate[]>;
protected abstract markUpdatesMerged(
spaceId: string,
docId: string,
updates: DocUpdate[]
): Promise<number>;
protected abstract createDocHistory(
snapshot: DocRecord,
force?: boolean
): Promise<boolean>;
@CallTimer('doc', 'squash')
protected async squash(updates: DocUpdate[]): Promise<DocUpdate> {
const merge = this.options?.mergeUpdates ?? mergeUpdates;
const lastUpdate = updates.at(-1);
if (!lastUpdate) {
throw new Error('No updates to be squashed.');
}
// fast return
if (updates.length === 1) {
return lastUpdate;
}
const finalUpdate = await merge(updates.map(u => u.bin));
return {
bin: finalUpdate,
timestamp: lastUpdate.timestamp,
};
}
protected async lockDocForUpdate(
spaceId: string,
docId: string
): Promise<AsyncDisposable> {
return this.locker.lock(`workspace:${spaceId}:update`, docId);
}
protected generateChangeUpdate(newerBin: Uint8Array, olderBin: Uint8Array) {
const newerDoc = new Doc();
applyUpdate(newerDoc, newerBin);
const olderDoc = new Doc();
applyUpdate(olderDoc, olderBin);
const newerState = encodeStateVector(newerDoc);
const olderState = encodeStateVector(olderDoc);
const diff = encodeStateAsUpdate(newerDoc, olderState);
const undoManager = new UndoManager(Array.from(newerDoc.share.values()));
applyUpdate(olderDoc, diff);
undoManager.undo();
return encodeStateAsUpdate(olderDoc, newerState);
}
}

View File

@@ -0,0 +1,32 @@
// TODO(@forehalo): share with frontend
import type { BlobStorageAdapter } from './blob';
import { Connection } from './connection';
import type { DocStorageAdapter } from './doc';
export class SpaceStorage extends Connection {
constructor(
public readonly doc: DocStorageAdapter,
public readonly blob: BlobStorageAdapter
) {
super();
}
override async connect() {
await this.doc.connect();
await this.blob.connect();
}
override async disconnect() {
await this.doc.disconnect();
await this.blob.disconnect();
}
}
export { BlobStorageAdapter, type BlobStorageOptions } from './blob';
export {
type DocRecord,
DocStorageAdapter,
type DocStorageOptions,
type DocUpdate,
type HistoryFilter,
} from './doc';

View File

@@ -0,0 +1,42 @@
export interface Locker {
lock(domain: string, resource: string): Promise<Lock>;
}
export class SingletonLocker implements Locker {
lockedResource = new Map<string, Lock>();
constructor() {}
async lock(domain: string, resource: string) {
let lock = this.lockedResource.get(`${domain}:${resource}`);
if (!lock) {
lock = new Lock();
}
await lock.acquire();
return lock;
}
}
export class Lock {
private inner: Promise<void> = Promise.resolve();
private release: () => void = () => {};
async acquire() {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
let release: () => void = null!;
const nextLock = new Promise<void>(resolve => {
release = resolve;
});
await this.inner;
this.inner = nextLock;
this.release = release;
}
[Symbol.asyncDispose]() {
this.release();
return Promise.resolve();
}
}