feat: local provider

This commit is contained in:
DarkSky
2023-01-02 18:55:53 +08:00
committed by DarkSky
parent b01703b836
commit 7fea77b64f
14 changed files with 464 additions and 142 deletions

View File

@@ -2,32 +2,52 @@ import assert from 'assert';
import { BlockSchema } from '@blocksuite/blocks/models';
import { Workspace } from '@blocksuite/store';
import { getLogger } from './index.js';
import { AffineProvider, BaseProvider } from './provider/index.js';
import { MemoryProvider } from './provider/index.js';
import { LocalProvider } from './provider/index.js';
import { getKVConfigure } from './store.js';
export class DataCenter {
private readonly _providers = new Map<string, typeof BaseProvider>();
private readonly _workspaces = new Map<string, Promise<BaseProvider>>();
private readonly _config;
private readonly _logger;
static async init(): Promise<DataCenter> {
const dc = new DataCenter();
dc.addProvider(AffineProvider);
dc.addProvider(MemoryProvider);
dc.addProvider(LocalProvider);
return dc;
}
private constructor() {
this._config = getKVConfigure('sys');
this._logger = getLogger('dc');
this._logger.enabled = true;
}
addProvider(provider: typeof BaseProvider) {
private addProvider(provider: typeof BaseProvider) {
this._providers.set(provider.id, provider);
}
private async _initWithProvider(id: string, providerId: string) {
private async _getProvider(id: string, providerId: string): Promise<string> {
const providerKey = `workspace:${id}:provider`;
if (this._providers.has(providerId)) {
await this._config.set(providerKey, providerId);
return providerId;
} else {
const providerValue = await this._config.get(providerKey);
if (providerValue) return providerValue;
}
throw Error(`Provider ${providerId} not found`);
}
private async _initWorkspace(id: string, pid: string): Promise<BaseProvider> {
this._logger(`Init workspace ${id} with ${pid}`);
const providerId = await this._getProvider(id, pid);
// init workspace & register block schema
const workspace = new Workspace({ room: id }).register(BlockSchema);
@@ -35,38 +55,22 @@ export class DataCenter {
assert(Provider);
const provider = new Provider();
console.log(`Loading workspace ${id} with provider ${Provider.id}`);
await provider.init(getKVConfigure(id), workspace);
await provider.init({
config: getKVConfigure(id),
logger: this._logger.extend(`${Provider.id}:${id}`),
workspace,
});
await provider.initData();
console.log(`Workspace ${id} loaded`);
this._logger(`Workspace ${id} loaded`);
return provider;
}
private async _initWorkspace(
id: string,
providerId: string
): Promise<BaseProvider> {
const providerKey = `workspace:${id}:provider`;
const providerValue = await this._config.get(providerKey);
if (this._providers.has(providerValue || providerId)) {
if (!providerValue) {
await this._config.set(providerKey, providerId);
}
return this._initWithProvider(id, await this._config.get(providerKey));
} else {
throw Error(`provider ${providerId} not found`);
}
}
async initWorkspace(
id: string,
provider = 'memory'
provider = 'local'
): Promise<Workspace | null> {
if (id) {
console.log('initWorkspace', id);
if (!this._workspaces.has(id)) {
this._workspaces.set(id, this._initWorkspace(id, provider));
}
@@ -81,4 +85,11 @@ export class DataCenter {
const config = getKVConfigure(workspace);
return config.set(key, value);
}
async getWorkspaceList() {
const keys = await this._config.keys();
return keys
.filter(k => k.startsWith('workspace:'))
.map(k => k.split(':')[1]);
}
}

View File

@@ -1,3 +1,4 @@
import debug from 'debug';
import { DataCenter } from './datacenter.js';
const _initializeDataCenter = () => {
@@ -13,3 +14,9 @@ const _initializeDataCenter = () => {
};
export const getDataCenter = _initializeDataCenter();
export function getLogger(namespace: string) {
const logger = debug(namespace);
logger.log = console.log.bind(console);
return logger;
}

View File

@@ -1 +1,79 @@
export { AffineProvider } from './provider.js';
import assert from 'assert';
import { applyUpdate } from 'yjs';
import type { InitialParams } from '../index.js';
import { LocalProvider } from '../local/index.js';
import { downloadWorkspace } from './apis.js';
import { token, Callback } from './token.js';
export class AffineProvider extends LocalProvider {
static id = 'affine';
private _onTokenRefresh?: Callback = undefined;
constructor() {
super();
}
async init(params: InitialParams) {
super.init(params);
this._onTokenRefresh = () => {
if (token.refresh) {
this._config.set('token', token.refresh);
}
};
assert(this._onTokenRefresh);
token.onChange(this._onTokenRefresh);
// initial login token
if (token.isExpired) {
try {
const refreshToken = await this._config.get('token');
await token.refreshToken(refreshToken);
if (token.refresh) {
this._config.set('token', token.refresh);
}
assert(token.isLogin);
} catch (_) {
this._logger('Authorization failed, fallback to local mode');
}
} else {
this._config.set('token', token.refresh);
}
}
async destroy() {
if (this._onTokenRefresh) {
token.offChange(this._onTokenRefresh);
}
}
async initData() {
await super.initData();
const workspace = this._workspace;
const doc = workspace.doc;
this._logger(`Login: ${token.isLogin}`);
if (workspace.room && token.isLogin) {
try {
const updates = await downloadWorkspace(workspace.room);
if (updates) {
applyUpdate(doc, new Uint8Array(updates));
}
} catch (e) {
this._logger('Failed to init cloud workspace', e);
}
}
// if after update, the space:meta is empty
// then we need to get map with doc
// just a workaround for yjs
doc.getMap('space:meta');
}
}

View File

@@ -1,74 +0,0 @@
import assert from 'assert';
import { Workspace } from '@blocksuite/store';
import { BaseProvider, ConfigStore } from '../index.js';
import { downloadWorkspace } from './apis.js';
import { token, Callback } from './token.js';
export class AffineProvider extends BaseProvider {
static id = 'affine';
private _onTokenRefresh?: Callback = undefined;
constructor() {
super();
}
async init(config: ConfigStore, workspace: Workspace) {
super.init(config, workspace);
this._onTokenRefresh = () => {
if (token.refresh) {
this._config.set('token', token.refresh);
}
};
assert(this._onTokenRefresh);
token.onChange(this._onTokenRefresh);
// initial login token
if (token.isExpired) {
try {
const refreshToken = await this._config.get('token');
await token.refreshToken(refreshToken);
if (token.refresh) {
this._config.set('token', token.refresh);
}
assert(token.isLogin);
} catch (_) {
console.warn('authorization failed, fallback to local mode');
}
} else {
this._config.set('token', token.refresh);
}
}
async destroy() {
if (this._onTokenRefresh) {
token.offChange(this._onTokenRefresh);
}
}
async initData() {
const workspace = this._workspace;
const doc = workspace.doc;
console.log(workspace.room, token.isLogin);
if (workspace.room && token.isLogin) {
try {
const updates = await downloadWorkspace(workspace.room);
if (updates) {
Workspace.Y.applyUpdate(doc, new Uint8Array(updates));
}
} catch (e) {
console.warn('Failed to init cloud workspace', e);
}
}
// if after update, the space:meta is empty
// then we need to get map with doc
// just a workaround for yjs
doc.getMap('space:meta');
}
}

View File

@@ -1,3 +1,4 @@
import { getLogger } from '../../index.js';
import { bareClient } from './request.js';
export interface AccessTokenMessage {
@@ -27,6 +28,7 @@ const login = (params: LoginParams): Promise<LoginResponse> =>
bareClient.post('api/user/token', { json: params }).json();
class Token {
private readonly _logger;
private _accessToken!: string;
private _refreshToken!: string;
@@ -34,17 +36,22 @@ class Token {
private _padding?: Promise<LoginResponse>;
constructor() {
this._logger = getLogger('token');
this._logger.enabled = true;
this._setToken(); // fill with default value
}
private _setToken(login?: LoginResponse) {
console.log('set login', login);
this._accessToken = login?.token || '';
this._refreshToken = login?.refresh || '';
this._user = Token.parse(this._accessToken);
if (login) {
this._logger('set login', login);
this.triggerChange(this._user);
} else {
this._logger('empty login');
}
}

View File

@@ -1,19 +1,23 @@
import { Workspace } from '@blocksuite/store';
/* eslint-disable @typescript-eslint/no-unused-vars */
import type { Workspace } from '@blocksuite/store';
import type { ConfigStore } from '../store.js';
import type { Logger, InitialParams, ConfigStore } from './index';
export class BaseProvider {
static id = 'memory';
static id = 'base';
protected _config!: ConfigStore;
protected _logger!: Logger;
protected _workspace!: Workspace;
constructor() {
// Nothing to do here
}
async init(config: ConfigStore, workspace: Workspace) {
this._config = config;
this._workspace = workspace;
async init(params: InitialParams) {
this._config = params.config;
this._logger = params.logger;
this._workspace = params.workspace;
this._logger.enabled = true;
}
async destroy() {
@@ -24,6 +28,16 @@ export class BaseProvider {
throw Error('Not implemented: initData');
}
// should return a blob url
async getBlob(_id: string): Promise<string | null> {
throw Error('Not implemented: getBlob');
}
// should return a blob unique id
async setBlob(_blob: Blob): Promise<string> {
throw Error('Not implemented: setBlob');
}
get workspace() {
return this._workspace;
}

View File

@@ -1,5 +1,17 @@
export type { ConfigStore } from '../store';
import type { Workspace } from '@blocksuite/store';
export { BaseProvider } from './base.js';
import type { getLogger } from '../index';
import type { ConfigStore } from '../store';
export type Logger = ReturnType<typeof getLogger>;
export type InitialParams = {
config: ConfigStore;
logger: Logger;
workspace: Workspace;
};
export type { ConfigStore, Workspace };
export type { BaseProvider } from './base.js';
export { AffineProvider } from './affine/index.js';
export { MemoryProvider } from './memory.js';
export { LocalProvider } from './local/index.js';

View File

@@ -0,0 +1,50 @@
import type { BlobStorage } from '@blocksuite/store';
import assert from 'assert';
import type { InitialParams } from '../index.js';
import { BaseProvider } from '../base.js';
import { IndexedDBProvider } from './indexeddb.js';
export class LocalProvider extends BaseProvider {
static id = 'local';
private _blobs!: BlobStorage;
private _idb?: IndexedDBProvider = undefined;
constructor() {
super();
}
async init(params: InitialParams) {
super.init(params);
const blobs = await this._workspace.blobs;
assert(blobs);
this._blobs = blobs;
}
async initData() {
assert(this._workspace.room);
this._logger('Loading local data');
this._idb = new IndexedDBProvider(
this._workspace.room,
this._workspace.doc
);
await this._idb.whenSynced;
this._logger('Local data loaded');
}
async destroy(): Promise<void> {
super.destroy();
if (this._idb) {
await this._idb.destroy();
}
}
async getBlob(id: string): Promise<string | null> {
return this._blobs.get(id);
}
async setBlob(blob: Blob): Promise<string> {
return this._blobs.set(blob);
}
}

View File

@@ -0,0 +1,199 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as idb from 'lib0/indexeddb.js';
import { Observable } from 'lib0/observable.js';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, transact } from 'yjs';
const customStoreName = 'custom';
const updatesStoreName = 'updates';
const PREFERRED_TRIM_SIZE = 500;
const fetchUpdates = async (provider: IndexedDBProvider) => {
const [updatesStore] = idb.transact(provider.db as IDBDatabase, [
updatesStoreName,
]); // , 'readonly')
if (updatesStore) {
const updates = await idb.getAll(
updatesStore,
idb.createIDBKeyRangeLowerBound(provider._dbref, false)
);
transact(
provider.doc,
() => {
updates.forEach(val => applyUpdate(provider.doc, val));
},
provider,
false
);
const lastKey = await idb.getLastKey(updatesStore);
provider._dbref = lastKey + 1;
const cnt = await idb.count(updatesStore);
provider._dbsize = cnt;
}
return updatesStore;
};
const storeState = (provider: IndexedDBProvider, forceStore = true) =>
fetchUpdates(provider).then(updatesStore => {
if (
updatesStore &&
(forceStore || provider._dbsize >= PREFERRED_TRIM_SIZE)
) {
idb
.addAutoKey(updatesStore, encodeStateAsUpdate(provider.doc))
.then(() =>
idb.del(
updatesStore,
idb.createIDBKeyRangeUpperBound(provider._dbref, true)
)
)
.then(() =>
idb.count(updatesStore).then(cnt => {
provider._dbsize = cnt;
})
);
}
});
export class IndexedDBProvider extends Observable<string> {
doc: Doc;
name: string;
_dbref: number;
_dbsize: number;
private _destroyed: boolean;
whenSynced: Promise<IndexedDBProvider>;
db: IDBDatabase | null;
private _db: Promise<IDBDatabase>;
private _storeTimeout: number;
private _storeTimeoutId: NodeJS.Timeout | null;
private _storeUpdate: (update: Uint8Array, origin: any) => void;
constructor(name: string, doc: Doc) {
super();
this.doc = doc;
this.name = name;
this._dbref = 0;
this._dbsize = 0;
this._destroyed = false;
this.db = null;
this._db = idb.openDB(name, db =>
idb.createStores(db, [['updates', { autoIncrement: true }], ['custom']])
);
this.whenSynced = this._db.then(async db => {
this.db = db;
const currState = encodeStateAsUpdate(doc);
const updatesStore = await fetchUpdates(this);
if (updatesStore) {
await idb.addAutoKey(updatesStore, currState);
}
if (this._destroyed) {
return this;
}
this.emit('synced', [this]);
return this;
});
// Timeout in ms untill data is merged and persisted in idb.
this._storeTimeout = 1000;
this._storeTimeoutId = null;
this._storeUpdate = (update: Uint8Array, origin: any) => {
if (this.db && origin !== this) {
const [updatesStore] = idb.transact(
/** @type {IDBDatabase} */ this.db,
[updatesStoreName]
);
if (updatesStore) {
idb.addAutoKey(updatesStore, update);
}
if (++this._dbsize >= PREFERRED_TRIM_SIZE) {
// debounce store call
if (this._storeTimeoutId !== null) {
clearTimeout(this._storeTimeoutId);
}
this._storeTimeoutId = setTimeout(() => {
storeState(this, false);
this._storeTimeoutId = null;
}, this._storeTimeout);
}
}
};
doc.on('update', this._storeUpdate);
this.destroy = this.destroy.bind(this);
doc.on('destroy', this.destroy);
}
override destroy() {
if (this._storeTimeoutId) {
clearTimeout(this._storeTimeoutId);
}
this.doc.off('update', this._storeUpdate);
this.doc.off('destroy', this.destroy);
this._destroyed = true;
return this._db.then(db => {
db.close();
});
}
/**
* Destroys this instance and removes all data from indexeddb.
*
* @return {Promise<void>}
*/
async clearData(): Promise<void> {
return this.destroy().then(() => {
idb.deleteDB(this.name);
});
}
/**
* @param {String | number | ArrayBuffer | Date} key
* @return {Promise<String | number | ArrayBuffer | Date | any>}
*/
async get(
key: string | number | ArrayBuffer | Date
): Promise<string | number | ArrayBuffer | Date | any> {
return this._db.then(db => {
const [custom] = idb.transact(db, [customStoreName], 'readonly');
if (custom) {
return idb.get(custom, key);
}
return undefined;
});
}
/**
* @param {String | number | ArrayBuffer | Date} key
* @param {String | number | ArrayBuffer | Date} value
* @return {Promise<String | number | ArrayBuffer | Date>}
*/
async set(
key: string | number | ArrayBuffer | Date,
value: string | number | ArrayBuffer | Date
): Promise<string | number | ArrayBuffer | Date> {
return this._db.then(db => {
const [custom] = idb.transact(db, [customStoreName]);
if (custom) {
return idb.put(custom, value, key);
}
return undefined;
});
}
/**
* @param {String | number | ArrayBuffer | Date} key
* @return {Promise<undefined>}
*/
async del(key: string | number | ArrayBuffer | Date): Promise<undefined> {
return this._db.then(db => {
const [custom] = idb.transact(db, [customStoreName]);
if (custom) {
return idb.del(custom, key);
}
return undefined;
});
}
}

View File

@@ -1,11 +0,0 @@
import { BaseProvider } from './base.js';
export class MemoryProvider extends BaseProvider {
constructor() {
super();
}
async initData() {
console.log('Skip data reload in memory provider');
}
}