From 6f9f579e5d937199ab86258766b001968b9ce06b Mon Sep 17 00:00:00 2001 From: forehalo Date: Wed, 21 Aug 2024 05:30:22 +0000 Subject: [PATCH] feat(server): make an abstraction for ydoc storage (#7901) --- .../server/src/core/doc/storage/blob.ts | 16 ++ .../server/src/core/doc/storage/connection.ts | 11 + .../server/src/core/doc/storage/doc.ts | 205 ++++++++++++++++++ .../server/src/core/doc/storage/index.ts | 32 +++ .../server/src/core/doc/storage/lock.ts | 42 ++++ 5 files changed, 306 insertions(+) create mode 100644 packages/backend/server/src/core/doc/storage/blob.ts create mode 100644 packages/backend/server/src/core/doc/storage/connection.ts create mode 100644 packages/backend/server/src/core/doc/storage/doc.ts create mode 100644 packages/backend/server/src/core/doc/storage/index.ts create mode 100644 packages/backend/server/src/core/doc/storage/lock.ts diff --git a/packages/backend/server/src/core/doc/storage/blob.ts b/packages/backend/server/src/core/doc/storage/blob.ts new file mode 100644 index 0000000000..e3c78191b7 --- /dev/null +++ b/packages/backend/server/src/core/doc/storage/blob.ts @@ -0,0 +1,16 @@ +import { Connection } from './connection'; + +export interface BlobStorageOptions {} + +export interface Blob { + key: string; + bin: Uint8Array; + mimeType: string; +} + +export abstract class BlobStorageAdapter extends Connection { + abstract getBlob(spaceId: string, key: string): Promise; + abstract setBlob(spaceId: string, blob: Blob): Promise; + abstract deleteBlob(spaceId: string, key: string): Promise; + abstract listBlobs(spaceId: string): Promise; +} diff --git a/packages/backend/server/src/core/doc/storage/connection.ts b/packages/backend/server/src/core/doc/storage/connection.ts new file mode 100644 index 0000000000..f82a72fbd3 --- /dev/null +++ b/packages/backend/server/src/core/doc/storage/connection.ts @@ -0,0 +1,11 @@ +export class Connection { + protected connected: boolean = false; + connect(): Promise { + this.connected = true; + return Promise.resolve(); + } + disconnect(): Promise { + this.connected = false; + return Promise.resolve(); + } +} diff --git a/packages/backend/server/src/core/doc/storage/doc.ts b/packages/backend/server/src/core/doc/storage/doc.ts new file mode 100644 index 0000000000..1e5dd8cb02 --- /dev/null +++ b/packages/backend/server/src/core/doc/storage/doc.ts @@ -0,0 +1,205 @@ +import { + applyUpdate, + Doc, + encodeStateAsUpdate, + encodeStateVector, + mergeUpdates, + UndoManager, +} from 'yjs'; + +import { CallTimer } from '../../../fundamentals'; +import { Connection } from './connection'; +import { SingletonLocker } from './lock'; + +export interface DocRecord { + spaceId: string; + docId: string; + bin: Uint8Array; + timestamp: number; +} + +export interface DocUpdate { + bin: Uint8Array; + timestamp: number; +} + +export interface HistoryFilter { + before?: number; + limit?: number; +} + +export interface DocStorageOptions { + mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; +} + +export abstract class DocStorageAdapter extends Connection { + private readonly locker = new SingletonLocker(); + + constructor( + protected readonly options: DocStorageOptions = { + mergeUpdates, + } + ) { + super(); + } + + // open apis + isEmptyBin(bin: Uint8Array): boolean { + return ( + bin.length === 0 || + // 0x0 for state vector + (bin.length === 1 && bin[0] === 0) || + // 0x00 for update + (bin.length === 2 && bin[0] === 0 && bin[1] === 0) + ); + } + + async getDoc(spaceId: string, docId: string): Promise { + await using _lock = await this.lockDocForUpdate(spaceId, docId); + + const snapshot = await this.getDocSnapshot(spaceId, docId); + const updates = await this.getDocUpdates(spaceId, docId); + + if (updates.length) { + const { timestamp, bin } = await this.squash( + snapshot ? [snapshot, ...updates] : updates + ); + + const newSnapshot = { + spaceId: spaceId, + docId, + bin, + timestamp, + }; + + const success = await this.setDocSnapshot(newSnapshot); + + // if there is old snapshot, create a new history record + if (success && snapshot) { + await this.createDocHistory(snapshot); + } + + // always mark updates as merged unless throws + await this.markUpdatesMerged(spaceId, docId, updates); + + return newSnapshot; + } + + return snapshot; + } + + abstract pushDocUpdates( + spaceId: string, + docId: string, + updates: Uint8Array[] + ): Promise; + + abstract deleteDoc(spaceId: string, docId: string): Promise; + abstract deleteSpace(spaceId: string): Promise; + async rollbackDoc( + spaceId: string, + docId: string, + timestamp: number + ): Promise { + await using _lock = await this.lockDocForUpdate(spaceId, docId); + const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp); + if (!toSnapshot) { + throw new Error('Can not find the version to rollback to.'); + } + + const fromSnapshot = await this.getDocSnapshot(spaceId, docId); + + if (!fromSnapshot) { + throw new Error('Can not find the current version of the doc.'); + } + + const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin); + await this.pushDocUpdates(spaceId, docId, [change]); + // force create a new history record after rollback + await this.createDocHistory(fromSnapshot, true); + } + + abstract getSpaceDocTimestamps( + spaceId: string, + after?: number + ): Promise | null>; + abstract listDocHistories( + spaceId: string, + docId: string, + query: { skip?: number; limit?: number } + ): Promise; + abstract getDocHistory( + spaceId: string, + docId: string, + timestamp: number + ): Promise; + + // api for internal usage + protected abstract getDocSnapshot( + spaceId: string, + docId: string + ): Promise; + protected abstract setDocSnapshot(snapshot: DocRecord): Promise; + protected abstract getDocUpdates( + spaceId: string, + docId: string + ): Promise; + protected abstract markUpdatesMerged( + spaceId: string, + docId: string, + updates: DocUpdate[] + ): Promise; + + protected abstract createDocHistory( + snapshot: DocRecord, + force?: boolean + ): Promise; + + @CallTimer('doc', 'squash') + protected async squash(updates: DocUpdate[]): Promise { + const merge = this.options?.mergeUpdates ?? mergeUpdates; + const lastUpdate = updates.at(-1); + if (!lastUpdate) { + throw new Error('No updates to be squashed.'); + } + + // fast return + if (updates.length === 1) { + return lastUpdate; + } + + const finalUpdate = await merge(updates.map(u => u.bin)); + + return { + bin: finalUpdate, + timestamp: lastUpdate.timestamp, + }; + } + + protected async lockDocForUpdate( + spaceId: string, + docId: string + ): Promise { + return this.locker.lock(`workspace:${spaceId}:update`, docId); + } + + protected generateChangeUpdate(newerBin: Uint8Array, olderBin: Uint8Array) { + const newerDoc = new Doc(); + applyUpdate(newerDoc, newerBin); + const olderDoc = new Doc(); + applyUpdate(olderDoc, olderBin); + + const newerState = encodeStateVector(newerDoc); + const olderState = encodeStateVector(olderDoc); + + const diff = encodeStateAsUpdate(newerDoc, olderState); + + const undoManager = new UndoManager(Array.from(newerDoc.share.values())); + + applyUpdate(olderDoc, diff); + + undoManager.undo(); + + return encodeStateAsUpdate(olderDoc, newerState); + } +} diff --git a/packages/backend/server/src/core/doc/storage/index.ts b/packages/backend/server/src/core/doc/storage/index.ts new file mode 100644 index 0000000000..a69fc46d92 --- /dev/null +++ b/packages/backend/server/src/core/doc/storage/index.ts @@ -0,0 +1,32 @@ +// TODO(@forehalo): share with frontend +import type { BlobStorageAdapter } from './blob'; +import { Connection } from './connection'; +import type { DocStorageAdapter } from './doc'; + +export class SpaceStorage extends Connection { + constructor( + public readonly doc: DocStorageAdapter, + public readonly blob: BlobStorageAdapter + ) { + super(); + } + + override async connect() { + await this.doc.connect(); + await this.blob.connect(); + } + + override async disconnect() { + await this.doc.disconnect(); + await this.blob.disconnect(); + } +} + +export { BlobStorageAdapter, type BlobStorageOptions } from './blob'; +export { + type DocRecord, + DocStorageAdapter, + type DocStorageOptions, + type DocUpdate, + type HistoryFilter, +} from './doc'; diff --git a/packages/backend/server/src/core/doc/storage/lock.ts b/packages/backend/server/src/core/doc/storage/lock.ts new file mode 100644 index 0000000000..c4fcf45f3e --- /dev/null +++ b/packages/backend/server/src/core/doc/storage/lock.ts @@ -0,0 +1,42 @@ +export interface Locker { + lock(domain: string, resource: string): Promise; +} + +export class SingletonLocker implements Locker { + lockedResource = new Map(); + constructor() {} + + async lock(domain: string, resource: string) { + let lock = this.lockedResource.get(`${domain}:${resource}`); + + if (!lock) { + lock = new Lock(); + } + + await lock.acquire(); + + return lock; + } +} + +export class Lock { + private inner: Promise = Promise.resolve(); + private release: () => void = () => {}; + + async acquire() { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + let release: () => void = null!; + const nextLock = new Promise(resolve => { + release = resolve; + }); + + await this.inner; + this.inner = nextLock; + this.release = release; + } + + [Symbol.asyncDispose]() { + this.release(); + return Promise.resolve(); + } +}