feat(core): user data db (#7930)

This commit is contained in:
EYHN
2024-09-11 07:55:37 +00:00
parent 498a69af53
commit d93c3b3719
36 changed files with 1220 additions and 281 deletions

View File

@@ -0,0 +1,84 @@
import type { GlobalState } from '@toeverything/infra';
import { Service } from '@toeverything/infra';
import { map, type Observable, switchMap } from 'rxjs';
import type { UserDBService } from '../../userspace';
import type { EditorSettingProvider } from '../provider/editor-setting-provider';
export class CurrentUserDBEditorSettingProvider
extends Service
implements EditorSettingProvider
{
currentUserDB$ = this.userDBService.currentUserDB.db$;
fallback = new GlobalStateEditorSettingProvider(this.globalState);
constructor(
public readonly userDBService: UserDBService,
public readonly globalState: GlobalState
) {
super();
}
set(key: string, value: string): void {
if (this.currentUserDB$.value) {
this.currentUserDB$.value?.editorSetting.create({
key,
value,
});
} else {
this.fallback.set(key, value);
}
}
get(key: string): string | undefined {
if (this.currentUserDB$.value) {
return this.currentUserDB$.value?.editorSetting.get(key)?.value;
} else {
return this.fallback.get(key);
}
}
watchAll(): Observable<Record<string, string>> {
return this.currentUserDB$.pipe(
switchMap(db => {
if (db) {
return db.editorSetting.find$().pipe(
map(settings => {
return settings.reduce(
(acc, setting) => {
acc[setting.key] = setting.value;
return acc;
},
{} as Record<string, string>
);
})
);
} else {
return this.fallback.watchAll();
}
})
);
}
}
const storageKey = 'editor-setting';
class GlobalStateEditorSettingProvider implements EditorSettingProvider {
constructor(public readonly globalState: GlobalState) {}
set(key: string, value: string): void {
const all = this.globalState.get<Record<string, string>>(storageKey) ?? {};
const after = {
...all,
[key]: value,
};
this.globalState.set(storageKey, after);
}
get(key: string): string | undefined {
return this.globalState.get<Record<string, string>>(storageKey)?.[key];
}
watchAll(): Observable<Record<string, string>> {
return this.globalState
.watch<Record<string, string>>(storageKey)
.pipe(map(all => all ?? {}));
}
}

View File

@@ -1,7 +1,8 @@
import { type Framework, GlobalState } from '@toeverything/infra';
import { UserDBService } from '../userspace';
import { EditorSetting } from './entities/editor-setting';
import { GlobalStateEditorSettingProvider } from './impls/global-state';
import { CurrentUserDBEditorSettingProvider } from './impls/user-db';
import { EditorSettingProvider } from './provider/editor-setting-provider';
import { EditorSettingService } from './services/editor-setting';
export type { FontFamily } from './schema';
@@ -12,7 +13,8 @@ export function configureEditorSettingModule(framework: Framework) {
framework
.service(EditorSettingService)
.entity(EditorSetting, [EditorSettingProvider])
.impl(EditorSettingProvider, GlobalStateEditorSettingProvider, [
.impl(EditorSettingProvider, CurrentUserDBEditorSettingProvider, [
UserDBService,
GlobalState,
]);
}

View File

@@ -24,6 +24,7 @@ import { configureSystemFontFamilyModule } from './system-font-family';
import { configureTagModule } from './tag';
import { configureTelemetryModule } from './telemetry';
import { configureThemeEditorModule } from './theme-editor';
import { configureUserspaceModule } from './userspace';
export function configureCommonModules(framework: Framework) {
configureInfraModules(framework);
@@ -51,4 +52,5 @@ export function configureCommonModules(framework: Framework) {
configureEditorSettingModule(framework);
configureImportTemplateModule(framework);
configureCreateWorkspaceModule(framework);
configureUserspaceModule(framework);
}

View File

@@ -0,0 +1,34 @@
import { Entity, LiveData } from '@toeverything/infra';
import { finalize, of, switchMap } from 'rxjs';
import type { AuthService } from '../../cloud';
import type { UserspaceService } from '../services/userspace';
export class CurrentUserDB extends Entity {
constructor(
private readonly userDBService: UserspaceService,
private readonly authService: AuthService
) {
super();
}
db$ = LiveData.from(
this.authService.session.account$
.selector(a => a?.id)
.pipe(
switchMap(userId => {
if (userId) {
const ref = this.userDBService.openDB(userId);
return of(ref.obj).pipe(
finalize(() => {
ref.release();
})
);
} else {
return of(null);
}
})
),
null
);
}

View File

@@ -0,0 +1,34 @@
import { DocEngine, Entity } from '@toeverything/infra';
import type { WebSocketService } from '../../cloud';
import { UserDBDocServer } from '../impls/user-db-doc-server';
import type { UserspaceStorageProvider } from '../provider/storage';
export class UserDBEngine extends Entity<{
userId: string;
}> {
private readonly userId = this.props.userId;
private readonly socket = this.websocketService.newSocket();
readonly docEngine = new DocEngine(
this.userspaceStorageProvider.getDocStorage('affine-cloud:' + this.userId),
new UserDBDocServer(this.userId, this.socket)
);
canGracefulStop() {
// TODO(@eyhn): Implement this
return true;
}
constructor(
private readonly userspaceStorageProvider: UserspaceStorageProvider,
private readonly websocketService: WebSocketService
) {
super();
this.docEngine.start();
}
override dispose() {
this.docEngine.stop();
this.socket.close();
}
}

View File

@@ -0,0 +1,35 @@
import type {
Table as OrmTable,
TableSchemaBuilder,
} from '@toeverything/infra';
import { Entity } from '@toeverything/infra';
import type { UserDBEngine } from './user-db-engine';
export class UserDBTable<Schema extends TableSchemaBuilder> extends Entity<{
table: OrmTable<Schema>;
storageDocId: string;
engine: UserDBEngine;
}> {
readonly table = this.props.table;
readonly docEngine = this.props.engine.docEngine;
isSyncing$ = this.docEngine
.docState$(this.props.storageDocId)
.map(docState => docState.syncing);
isLoading$ = this.docEngine
.docState$(this.props.storageDocId)
.map(docState => docState.loading);
create: typeof this.table.create = this.table.create.bind(this.table);
update: typeof this.table.update = this.table.update.bind(this.table);
get: typeof this.table.get = this.table.get.bind(this.table);
// eslint-disable-next-line rxjs/finnish
get$: typeof this.table.get$ = this.table.get$.bind(this.table);
find: typeof this.table.find = this.table.find.bind(this.table);
// eslint-disable-next-line rxjs/finnish
find$: typeof this.table.find$ = this.table.find$.bind(this.table);
keys: typeof this.table.keys = this.table.keys.bind(this.table);
delete: typeof this.table.delete = this.table.delete.bind(this.table);
}

View File

@@ -0,0 +1,46 @@
import { createORMClient, Entity, YjsDBAdapter } from '@toeverything/infra';
import { Doc as YDoc } from 'yjs';
import { USER_DB_SCHEMA } from '../schema';
import { UserDBEngine } from './user-db-engine';
import { UserDBTable } from './user-db-table';
const UserDBClient = createORMClient(USER_DB_SCHEMA);
export class UserDB extends Entity<{
userId: string;
}> {
readonly engine = this.framework.createEntity(UserDBEngine, {
userId: this.props.userId,
});
readonly db = new UserDBClient(
new YjsDBAdapter(USER_DB_SCHEMA, {
getDoc: guid => {
const ydoc = new YDoc({
guid,
});
this.engine.docEngine.addDoc(ydoc, false);
this.engine.docEngine.setPriority(ydoc.guid, 50);
return ydoc;
},
})
);
constructor() {
super();
Object.entries(USER_DB_SCHEMA).forEach(([tableName]) => {
const table = this.framework.createEntity(UserDBTable, {
table: this.db[tableName as keyof typeof USER_DB_SCHEMA],
storageDocId: tableName,
engine: this.engine,
});
Object.defineProperty(this, tableName, {
get: () => table,
});
});
}
}
export type UserDBWithTables = UserDB & {
[K in keyof USER_DB_SCHEMA]: UserDBTable<USER_DB_SCHEMA[K]>;
};

View File

@@ -0,0 +1,266 @@
import type {
ByteKV,
ByteKVBehavior,
DocEvent,
DocEventBus,
DocStorage,
} from '@toeverything/infra';
import type { DBSchema, IDBPDatabase, IDBPObjectStore } from 'idb';
import { openDB } from 'idb';
import { mergeUpdates } from 'yjs';
class BroadcastChannelDocEventBus implements DocEventBus {
senderChannel = new BroadcastChannel('user-db:' + this.userId);
constructor(private readonly userId: string) {}
emit(event: DocEvent): void {
this.senderChannel.postMessage(event);
}
on(cb: (event: DocEvent) => void): () => void {
const listener = (event: MessageEvent<DocEvent>) => {
cb(event.data);
};
const channel = new BroadcastChannel('user-db:' + this.userId);
channel.addEventListener('message', listener);
return () => {
channel.removeEventListener('message', listener);
channel.close();
};
}
}
function isEmptyUpdate(binary: Uint8Array) {
return (
binary.byteLength === 0 ||
(binary.byteLength === 2 && binary[0] === 0 && binary[1] === 0)
);
}
export class IndexedDBUserspaceDocStorage implements DocStorage {
constructor(private readonly userId: string) {}
eventBus = new BroadcastChannelDocEventBus(this.userId);
readonly doc = new Doc(this.userId);
readonly syncMetadata = new KV(`affine-cloud:${this.userId}:sync-metadata`);
readonly serverClock = new KV(`affine-cloud:${this.userId}:server-clock`);
}
interface DocDBSchema extends DBSchema {
userspace: {
key: string;
value: {
id: string;
updates: {
timestamp: number;
update: Uint8Array;
}[];
};
};
}
type DocType = DocStorage['doc'];
class Doc implements DocType {
dbName = 'affine-cloud:' + this.userId + ':doc';
dbPromise: Promise<IDBPDatabase<DocDBSchema>> | null = null;
dbVersion = 1;
constructor(private readonly userId: string) {}
upgradeDB(db: IDBPDatabase<DocDBSchema>) {
db.createObjectStore('userspace', { keyPath: 'id' });
}
getDb() {
if (this.dbPromise === null) {
this.dbPromise = openDB<DocDBSchema>(this.dbName, this.dbVersion, {
upgrade: db => this.upgradeDB(db),
});
}
return this.dbPromise;
}
async get(docId: string): Promise<Uint8Array | null> {
const db = await this.getDb();
const store = db
.transaction('userspace', 'readonly')
.objectStore('userspace');
const data = await store.get(docId);
if (!data) {
return null;
}
const updates = data.updates
.map(({ update }) => update)
.filter(update => !isEmptyUpdate(update));
const update = updates.length > 0 ? mergeUpdates(updates) : null;
return update;
}
async set(docId: string, data: Uint8Array) {
const db = await this.getDb();
const store = db
.transaction('userspace', 'readwrite')
.objectStore('userspace');
const rows = [{ timestamp: Date.now(), update: data }];
await store.put({
id: docId,
updates: rows,
});
}
async keys() {
const db = await this.getDb();
const store = db
.transaction('userspace', 'readonly')
.objectStore('userspace');
return store.getAllKeys();
}
clear(): void | Promise<void> {
return;
}
del(_key: string): void | Promise<void> {
return;
}
async transaction<T>(
cb: (transaction: ByteKVBehavior) => Promise<T>
): Promise<T> {
const db = await this.getDb();
const store = db
.transaction('userspace', 'readwrite')
.objectStore('userspace');
return await cb({
async get(docId) {
const data = await store.get(docId);
if (!data) {
return null;
}
const { updates } = data;
const update = mergeUpdates(updates.map(({ update }) => update));
return update;
},
keys() {
return store.getAllKeys();
},
async set(docId, data) {
const rows = [{ timestamp: Date.now(), update: data }];
await store.put({
id: docId,
updates: rows,
});
},
async clear() {
return await store.clear();
},
async del(key) {
return store.delete(key);
},
});
}
}
interface KvDBSchema extends DBSchema {
kv: {
key: string;
value: { key: string; val: Uint8Array };
};
}
class KV implements ByteKV {
constructor(private readonly dbName: string) {}
dbPromise: Promise<IDBPDatabase<KvDBSchema>> | null = null;
dbVersion = 1;
upgradeDB(db: IDBPDatabase<KvDBSchema>) {
db.createObjectStore('kv', { keyPath: 'key' });
}
getDb() {
if (this.dbPromise === null) {
this.dbPromise = openDB<KvDBSchema>(this.dbName, this.dbVersion, {
upgrade: db => this.upgradeDB(db),
});
}
return this.dbPromise;
}
async transaction<T>(
cb: (transaction: ByteKVBehavior) => Promise<T>
): Promise<T> {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
const behavior = new KVBehavior(store);
return await cb(behavior);
}
async get(key: string): Promise<Uint8Array | null> {
const db = await this.getDb();
const store = db.transaction('kv', 'readonly').objectStore('kv');
return new KVBehavior(store).get(key);
}
async set(key: string, value: Uint8Array): Promise<void> {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).set(key, value);
}
async keys(): Promise<string[]> {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).keys();
}
async clear() {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).clear();
}
async del(key: string) {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).del(key);
}
}
class KVBehavior implements ByteKVBehavior {
constructor(
private readonly store: IDBPObjectStore<KvDBSchema, ['kv'], 'kv', any>
) {}
async get(key: string): Promise<Uint8Array | null> {
const value = await this.store.get(key);
return value?.val ?? null;
}
async set(key: string, value: Uint8Array): Promise<void> {
if (this.store.put === undefined) {
throw new Error('Cannot set in a readonly transaction');
}
await this.store.put({
key: key,
val: value,
});
}
async keys(): Promise<string[]> {
return await this.store.getAllKeys();
}
async del(key: string) {
if (this.store.delete === undefined) {
throw new Error('Cannot set in a readonly transaction');
}
return await this.store.delete(key);
}
async clear() {
if (this.store.clear === undefined) {
throw new Error('Cannot set in a readonly transaction');
}
return await this.store.clear();
}
}

View File

@@ -0,0 +1,185 @@
import { apis } from '@affine/electron-api';
import type {
ByteKV,
ByteKVBehavior,
DocEvent,
DocEventBus,
DocStorage,
} from '@toeverything/infra';
import { AsyncLock } from '@toeverything/infra';
class BroadcastChannelDocEventBus implements DocEventBus {
senderChannel = new BroadcastChannel('user-db:' + this.userId);
constructor(private readonly userId: string) {}
emit(event: DocEvent): void {
this.senderChannel.postMessage(event);
}
on(cb: (event: DocEvent) => void): () => void {
const listener = (event: MessageEvent<DocEvent>) => {
cb(event.data);
};
const channel = new BroadcastChannel('user-db:' + this.userId);
channel.addEventListener('message', listener);
return () => {
channel.removeEventListener('message', listener);
channel.close();
};
}
}
export class SqliteUserspaceDocStorage implements DocStorage {
constructor(private readonly userId: string) {}
eventBus = new BroadcastChannelDocEventBus(this.userId);
readonly doc = new Doc(this.userId);
readonly syncMetadata = new SyncMetadataKV(this.userId);
readonly serverClock = new ServerClockKV(this.userId);
}
type DocType = DocStorage['doc'];
class Doc implements DocType {
lock = new AsyncLock();
constructor(private readonly userId: string) {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
}
async transaction<T>(
cb: (transaction: ByteKVBehavior) => Promise<T>
): Promise<T> {
using _lock = await this.lock.acquire();
return await cb(this);
}
keys(): string[] | Promise<string[]> {
return [];
}
async get(docId: string) {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
const update = await apis.db.getDocAsUpdates(
'userspace',
this.userId,
docId
);
if (update) {
if (
update.byteLength === 0 ||
(update.byteLength === 2 && update[0] === 0 && update[1] === 0)
) {
return null;
}
return update;
}
return null;
}
async set(docId: string, data: Uint8Array) {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
await apis.db.applyDocUpdate('userspace', this.userId, data, docId);
}
clear(): void | Promise<void> {
return;
}
async del(docId: string) {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
await apis.db.deleteDoc('userspace', this.userId, docId);
}
}
class SyncMetadataKV implements ByteKV {
constructor(private readonly userId: string) {}
transaction<T>(cb: (behavior: ByteKVBehavior) => Promise<T>): Promise<T> {
return cb(this);
}
get(key: string): Uint8Array | null | Promise<Uint8Array | null> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getSyncMetadata('userspace', this.userId, key);
}
set(key: string, data: Uint8Array): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.setSyncMetadata('userspace', this.userId, key, data);
}
keys(): string[] | Promise<string[]> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getSyncMetadataKeys('userspace', this.userId);
}
del(key: string): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.delSyncMetadata('userspace', this.userId, key);
}
clear(): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.clearSyncMetadata('userspace', this.userId);
}
}
class ServerClockKV implements ByteKV {
constructor(private readonly userId: string) {}
transaction<T>(cb: (behavior: ByteKVBehavior) => Promise<T>): Promise<T> {
return cb(this);
}
get(key: string): Uint8Array | null | Promise<Uint8Array | null> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getServerClock('userspace', this.userId, key);
}
set(key: string, data: Uint8Array): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.setServerClock('userspace', this.userId, key, data);
}
keys(): string[] | Promise<string[]> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getServerClockKeys('userspace', this.userId);
}
del(key: string): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.delServerClock('userspace', this.userId, key);
}
clear(): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.clearServerClock('userspace', this.userId);
}
}

View File

@@ -0,0 +1,195 @@
import { DebugLogger } from '@affine/debug';
import {
ErrorNames,
UserFriendlyError,
type UserFriendlyErrorResponse,
} from '@affine/graphql';
import { type DocServer, throwIfAborted } from '@toeverything/infra';
import type { Socket } from 'socket.io-client';
import {
base64ToUint8Array,
uint8ArrayToBase64,
} from '../../workspace-engine/utils/base64';
type WebsocketResponse<T> = { error: UserFriendlyErrorResponse } | { data: T };
const logger = new DebugLogger('affine-cloud-doc-engine-server');
export class UserDBDocServer implements DocServer {
interruptCb: ((reason: string) => void) | null = null;
SEND_TIMEOUT = 30000;
constructor(
private readonly userId: string,
private readonly socket: Socket
) {}
private async clientHandShake() {
await this.socket.emitWithAck('space:join', {
spaceType: 'userspace',
spaceId: this.userId,
clientVersion: runtimeConfig.appVersion,
});
}
async pullDoc(docId: string, state: Uint8Array) {
// for testing
await (window as any)._TEST_SIMULATE_SYNC_LAG;
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
const response: WebsocketResponse<{
missing: string;
state: string;
timestamp: number;
}> = await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('space:load-doc', {
spaceType: 'userspace',
spaceId: this.userId,
docId: docId,
stateVector,
});
if ('error' in response) {
const error = new UserFriendlyError(response.error);
if (error.name === ErrorNames.DOC_NOT_FOUND) {
return null;
} else {
throw error;
}
} else {
return {
data: base64ToUint8Array(response.data.missing),
stateVector: response.data.state
? base64ToUint8Array(response.data.state)
: undefined,
serverClock: response.data.timestamp,
};
}
}
async pushDoc(docId: string, data: Uint8Array) {
const payload = await uint8ArrayToBase64(data);
const response: WebsocketResponse<{ timestamp: number }> = await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('space:push-doc-updates', {
spaceType: 'userspace',
spaceId: this.userId,
docId: docId,
updates: [payload],
});
if ('error' in response) {
logger.error('client-update-v2 error', {
userId: this.userId,
guid: docId,
response,
});
throw new UserFriendlyError(response.error);
}
return { serverClock: response.data.timestamp };
}
async loadServerClock(after: number): Promise<Map<string, number>> {
const response: WebsocketResponse<Record<string, number>> =
await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('space:load-doc-timestamps', {
spaceType: 'userspace',
spaceId: this.userId,
timestamp: after,
});
if ('error' in response) {
logger.error('client-pre-sync error', {
workspaceId: this.userId,
response,
});
throw new UserFriendlyError(response.error);
}
return new Map(Object.entries(response.data));
}
async subscribeAllDocs(
cb: (updates: {
docId: string;
data: Uint8Array;
serverClock: number;
}) => void
): Promise<() => void> {
const handleUpdate = async (message: {
spaceType: string;
spaceId: string;
docId: string;
updates: string[];
timestamp: number;
}) => {
if (
message.spaceType === 'userspace' &&
message.spaceId === this.userId
) {
message.updates.forEach(update => {
cb({
docId: message.docId,
data: base64ToUint8Array(update),
serverClock: message.timestamp,
});
});
}
};
this.socket.on('space:broadcast-doc-updates', handleUpdate);
return () => {
this.socket.off('space:broadcast-doc-updates', handleUpdate);
};
}
async waitForConnectingServer(signal: AbortSignal): Promise<void> {
this.socket.on('server-version-rejected', this.handleVersionRejected);
this.socket.on('disconnect', this.handleDisconnect);
throwIfAborted(signal);
if (this.socket.connected) {
await this.clientHandShake();
} else {
this.socket.connect();
await new Promise<void>((resolve, reject) => {
this.socket.on('connect', () => {
resolve();
});
signal.addEventListener('abort', () => {
reject('aborted');
});
});
throwIfAborted(signal);
await this.clientHandShake();
}
}
disconnectServer(): void {
if (!this.socket) {
return;
}
this.socket.emit('space:leave', {
spaceType: 'userspace',
spaceId: this.userId,
});
this.socket.off('server-version-rejected', this.handleVersionRejected);
this.socket.off('disconnect', this.handleDisconnect);
this.socket.disconnect();
}
onInterrupted = (cb: (reason: string) => void) => {
this.interruptCb = cb;
};
handleInterrupted = (reason: string) => {
this.interruptCb?.(reason);
};
handleDisconnect = (reason: Socket.DisconnectReason) => {
this.interruptCb?.(reason);
};
handleVersionRejected = () => {
this.interruptCb?.('Client version rejected');
};
}

View File

@@ -0,0 +1,40 @@
export { UserspaceService as UserDBService } from './services/userspace';
import type { Framework } from '@toeverything/infra';
import { AuthService, WebSocketService } from '../cloud';
import { CurrentUserDB } from './entities/current-user-db';
import { UserDB } from './entities/user-db';
import { UserDBEngine } from './entities/user-db-engine';
import { UserDBTable } from './entities/user-db-table';
import { IndexedDBUserspaceDocStorage } from './impls/indexeddb-storage';
import { SqliteUserspaceDocStorage } from './impls/sqlite-storage';
import { UserspaceStorageProvider } from './provider/storage';
import { UserspaceService } from './services/userspace';
export function configureUserspaceModule(framework: Framework) {
framework
.service(UserspaceService)
.entity(CurrentUserDB, [UserspaceService, AuthService])
.entity(UserDB)
.entity(UserDBTable)
.entity(UserDBEngine, [UserspaceStorageProvider, WebSocketService]);
}
export function configureIndexedDBUserspaceStorageProvider(
framework: Framework
) {
framework.impl(UserspaceStorageProvider, {
getDocStorage(userId: string) {
return new IndexedDBUserspaceDocStorage(userId);
},
});
}
export function configureSqliteUserspaceStorageProvider(framework: Framework) {
framework.impl(UserspaceStorageProvider, {
getDocStorage(userId: string) {
return new SqliteUserspaceDocStorage(userId);
},
});
}

View File

@@ -0,0 +1,8 @@
import { createIdentifier, type DocStorage } from '@toeverything/infra';
export interface UserspaceStorageProvider {
getDocStorage(userId: string): DocStorage;
}
export const UserspaceStorageProvider =
createIdentifier<UserspaceStorageProvider>('UserspaceStorageProvider');

View File

@@ -0,0 +1,9 @@
import { type DBSchemaBuilder, f } from '@toeverything/infra';
export const USER_DB_SCHEMA = {
editorSetting: {
key: f.string().primaryKey(),
value: f.string(),
},
} as const satisfies DBSchemaBuilder;
export type USER_DB_SCHEMA = typeof USER_DB_SCHEMA;

View File

@@ -0,0 +1,35 @@
import { ObjectPool, Service } from '@toeverything/infra';
import { CurrentUserDB } from '../entities/current-user-db';
import { UserDB, type UserDBWithTables } from '../entities/user-db';
export class UserspaceService extends Service {
pool = new ObjectPool<string, UserDBWithTables>({
onDelete(obj) {
obj.dispose();
},
onDangling(obj) {
return obj.engine.canGracefulStop();
},
});
private _currentUserDB: CurrentUserDB | null = null;
get currentUserDB() {
if (!this._currentUserDB) {
this._currentUserDB = this.framework.createEntity(CurrentUserDB);
}
return this._currentUserDB;
}
openDB(userId: string) {
const exists = this.pool.get(userId);
if (exists) {
return exists;
}
const db = this.framework.createEntity(UserDB, {
userId,
}) as UserDBWithTables;
return this.pool.put(userId, db);
}
}

View File

@@ -10,7 +10,7 @@ export class SqliteBlobStorage implements BlobStorage {
readonly = false;
async get(key: string) {
assertExists(apis);
const buffer = await apis.db.getBlob(this.workspaceId, key);
const buffer = await apis.db.getBlob('workspace', this.workspaceId, key);
if (buffer) {
return bufferToBlob(buffer);
}
@@ -19,6 +19,7 @@ export class SqliteBlobStorage implements BlobStorage {
async set(key: string, value: Blob) {
assertExists(apis);
await apis.db.addBlob(
'workspace',
this.workspaceId,
key,
new Uint8Array(await value.arrayBuffer())
@@ -27,10 +28,10 @@ export class SqliteBlobStorage implements BlobStorage {
}
delete(key: string) {
assertExists(apis);
return apis.db.deleteBlob(this.workspaceId, key);
return apis.db.deleteBlob('workspace', this.workspaceId, key);
}
list() {
assertExists(apis);
return apis.db.getBlobKeys(this.workspaceId);
return apis.db.getBlobKeys('workspace', this.workspaceId);
}
}

View File

@@ -37,7 +37,11 @@ class Doc implements DocType {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
const update = await apis.db.getDocAsUpdates(this.workspaceId, docId);
const update = await apis.db.getDocAsUpdates(
'workspace',
this.workspaceId,
docId
);
if (update) {
if (
@@ -57,7 +61,7 @@ class Doc implements DocType {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
await apis.db.applyDocUpdate(this.workspaceId, data, docId);
await apis.db.applyDocUpdate('workspace', this.workspaceId, data, docId);
}
clear(): void | Promise<void> {
@@ -68,7 +72,7 @@ class Doc implements DocType {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
await apis.db.deleteDoc(this.workspaceId, docId);
await apis.db.deleteDoc('workspace', this.workspaceId, docId);
}
}
@@ -82,35 +86,35 @@ class SyncMetadataKV implements ByteKV {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getSyncMetadata(this.workspaceId, key);
return apis.db.getSyncMetadata('workspace', this.workspaceId, key);
}
set(key: string, data: Uint8Array): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.setSyncMetadata(this.workspaceId, key, data);
return apis.db.setSyncMetadata('workspace', this.workspaceId, key, data);
}
keys(): string[] | Promise<string[]> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getSyncMetadataKeys(this.workspaceId);
return apis.db.getSyncMetadataKeys('workspace', this.workspaceId);
}
del(key: string): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.delSyncMetadata(this.workspaceId, key);
return apis.db.delSyncMetadata('workspace', this.workspaceId, key);
}
clear(): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.clearSyncMetadata(this.workspaceId);
return apis.db.clearSyncMetadata('workspace', this.workspaceId);
}
}
@@ -124,34 +128,34 @@ class ServerClockKV implements ByteKV {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getServerClock(this.workspaceId, key);
return apis.db.getServerClock('workspace', this.workspaceId, key);
}
set(key: string, data: Uint8Array): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.setServerClock(this.workspaceId, key, data);
return apis.db.setServerClock('workspace', this.workspaceId, key, data);
}
keys(): string[] | Promise<string[]> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getServerClockKeys(this.workspaceId);
return apis.db.getServerClockKeys('workspace', this.workspaceId);
}
del(key: string): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.delServerClock(this.workspaceId, key);
return apis.db.delServerClock('workspace', this.workspaceId, key);
}
clear(): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.clearServerClock(this.workspaceId);
return apis.db.clearServerClock('workspace', this.workspaceId);
}
}