feat: basic db support

This commit is contained in:
DarkSky
2022-08-09 18:58:18 +08:00
parent 73d7d4e1a8
commit 0083b7445a
9 changed files with 445 additions and 24 deletions

View File

@@ -5,7 +5,11 @@
"author": "DarkSky <darksky2048@gmail.com>",
"dependencies": {
"lib0": "^0.2.52",
"sql.js": "^1.7.0",
"yjs": "^13.5.41",
"y-protocols": "^1.0.5"
},
"devDependencies": {
"@types/sql.js": "^1.4.3"
}
}

View File

@@ -1 +1,3 @@
export { IndexedDBProvider } from './indexeddb';
export { WebsocketProvider } from './provider';
export { SQLiteProvider } from './sqlite';

View File

@@ -0,0 +1,185 @@
import * as Y from 'yjs';
import * as idb from 'lib0/indexeddb.js';
import * as mutex from 'lib0/mutex.js';
import { Observable } from 'lib0/observable.js';
const customStoreName = 'custom';
const updatesStoreName = 'updates';
const PREFERRED_TRIM_SIZE = 500;
const fetchUpdates = async (provider: IndexedDBProvider) => {
const [updatesStore] = idb.transact(provider.db as IDBDatabase, [
updatesStoreName,
]); // , 'readonly')
const updates = await idb.getAll(
updatesStore,
idb.createIDBKeyRangeLowerBound(provider._dbref, false)
);
Y.transact(
provider.doc,
() => {
updates.forEach(val => Y.applyUpdate(provider.doc, val));
},
provider,
false
);
const lastKey = await idb.getLastKey(updatesStore);
provider._dbref = lastKey + 1;
const cnt = await idb.count(updatesStore);
provider._dbsize = cnt;
return updatesStore;
};
const storeState = (provider: IndexedDBProvider, forceStore = true) =>
fetchUpdates(provider).then(updatesStore => {
if (forceStore || provider._dbsize >= PREFERRED_TRIM_SIZE) {
idb.addAutoKey(updatesStore, Y.encodeStateAsUpdate(provider.doc))
.then(() =>
idb.del(
updatesStore,
idb.createIDBKeyRangeUpperBound(provider._dbref, true)
)
)
.then(() =>
idb.count(updatesStore).then(cnt => {
provider._dbsize = cnt;
})
);
}
});
export class IndexedDBProvider extends Observable<string> {
doc: Y.Doc;
name: string;
private _mux: mutex.mutex;
_dbref: number;
_dbsize: number;
private _destroyed: boolean;
whenSynced: Promise<IndexedDBProvider>;
db: IDBDatabase | null;
private _db: Promise<IDBDatabase>;
private synced: boolean;
private _storeTimeout: number;
private _storeTimeoutId: NodeJS.Timeout | null;
private _storeUpdate: (update: Uint8Array, origin: any) => void;
constructor(name: string, doc: Y.Doc) {
super();
this.doc = doc;
this.name = name;
this._mux = mutex.createMutex();
this._dbref = 0;
this._dbsize = 0;
this._destroyed = false;
this.db = null;
this.synced = false;
this._db = idb.openDB(name, db =>
idb.createStores(db, [
['updates', { autoIncrement: true }],
['custom'],
])
);
this.whenSynced = this._db.then(async db => {
this.db = db;
const currState = Y.encodeStateAsUpdate(doc);
const updatesStore = await fetchUpdates(this);
await idb.addAutoKey(updatesStore, currState);
if (this._destroyed) return this;
this.emit('synced', [this]);
this.synced = true;
return this;
});
// Timeout in ms untill data is merged and persisted in idb.
this._storeTimeout = 1000;
this._storeTimeoutId = null;
this._storeUpdate = (update: Uint8Array, origin: any) => {
if (this.db && origin !== this) {
const [updatesStore] = idb.transact(
/** @type {IDBDatabase} */ this.db,
[updatesStoreName]
);
idb.addAutoKey(updatesStore, update);
if (++this._dbsize >= PREFERRED_TRIM_SIZE) {
// debounce store call
if (this._storeTimeoutId !== null) {
clearTimeout(this._storeTimeoutId);
}
this._storeTimeoutId = setTimeout(() => {
storeState(this, false);
this._storeTimeoutId = null;
}, this._storeTimeout);
}
}
};
doc.on('update', this._storeUpdate);
this.destroy = this.destroy.bind(this);
doc.on('destroy', this.destroy);
}
override destroy() {
if (this._storeTimeoutId) {
clearTimeout(this._storeTimeoutId);
}
this.doc.off('update', this._storeUpdate);
this.doc.off('destroy', this.destroy);
this._destroyed = true;
return this._db.then(db => {
db.close();
});
}
/**
* Destroys this instance and removes all data from SQLite.
*
* @return {Promise<void>}
*/
async clearData(): Promise<void> {
return this.destroy().then(() => {
idb.deleteDB(this.name);
});
}
/**
* @param {String | number | ArrayBuffer | Date} key
* @return {Promise<String | number | ArrayBuffer | Date | any>}
*/
async get(
key: string | number | ArrayBuffer | Date
): Promise<string | number | ArrayBuffer | Date | any> {
return this._db.then(db => {
const [custom] = idb.transact(db, [customStoreName], 'readonly');
return idb.get(custom, key);
});
}
/**
* @param {String | number | ArrayBuffer | Date} key
* @param {String | number | ArrayBuffer | Date} value
* @return {Promise<String | number | ArrayBuffer | Date>}
*/
async set(
key: string | number | ArrayBuffer | Date,
value: string | number | ArrayBuffer | Date
): Promise<string | number | ArrayBuffer | Date> {
return this._db.then(db => {
const [custom] = idb.transact(db, [customStoreName]);
return idb.put(custom, value, key);
});
}
/**
* @param {String | number | ArrayBuffer | Date} key
* @return {Promise<undefined>}
*/
async del(key: string | number | ArrayBuffer | Date): Promise<undefined> {
return this._db.then(db => {
const [custom] = idb.transact(db, [customStoreName]);
return idb.del(custom, key);
});
}
}

View File

@@ -0,0 +1,166 @@
import * as Y from 'yjs';
import sqlite, { Database, SqlJsStatic } from 'sql.js';
import { Observable } from 'lib0/observable.js';
const PREFERRED_TRIM_SIZE = 500;
const STMTS = {
create: 'CREATE TABLE updates (key INTEGER PRIMARY KEY AUTOINCREMENT, value BLOB);',
selectAll: 'SELECT * FROM updates where key >= $idx',
selectCount: 'SELECT count(*) FROM updates',
insert: 'INSERT INTO updates VALUES (null, $data);',
delete: 'DELETE FROM updates WHERE key < $idx',
drop: 'DROP TABLE updates;',
};
const countUpdates = (db: Database) => {
const [cnt] = db.exec(STMTS.selectCount);
return cnt.values[0]?.[0] as number;
};
const clearUpdates = (db: Database, idx: number) => {
db.exec(STMTS.delete, { $idx: idx });
};
const fetchUpdates = async (provider: SQLiteProvider) => {
const db = provider.db!;
const updates = db
.exec(STMTS.selectAll, { $idx: provider._dbref })
.flatMap(val => val.values as [number, Uint8Array][])
.sort(([a], [b]) => a - b);
Y.transact(
provider.doc,
() => {
updates.forEach(([, update]) =>
Y.applyUpdate(provider.doc, update)
);
},
provider,
false
);
const lastKey = Math.max(...updates.map(([idx]) => idx));
provider._dbref = lastKey + 1;
provider._dbsize = countUpdates(db);
return db;
};
const storeState = async (provider: SQLiteProvider, forceStore = true) => {
const db = await fetchUpdates(provider);
if (forceStore || provider._dbsize >= PREFERRED_TRIM_SIZE) {
db.exec(STMTS.insert, { $data: Y.encodeStateAsUpdate(provider.doc) });
clearUpdates(db, provider._dbref);
provider._dbsize = countUpdates(db);
console.log(db.export());
}
};
let _sqliteInstance: SqlJsStatic | undefined;
let _sqliteProcessing = false;
const sleep = () => new Promise(resolve => setTimeout(resolve, 500));
const initSQLiteInstance = async () => {
while (_sqliteProcessing) {
await sleep();
}
if (_sqliteInstance) return _sqliteInstance;
_sqliteProcessing = true;
_sqliteInstance = await sqlite({
locateFile: () =>
new URL('sql.js/dist/sql-wasm.wasm', import.meta.url).href,
});
_sqliteProcessing = false;
return _sqliteInstance;
};
export class SQLiteProvider extends Observable<string> {
doc: Y.Doc;
name: string;
_dbref: number;
_dbsize: number;
private _destroyed: boolean;
whenSynced: Promise<SQLiteProvider>;
db: Database | null;
private _db: Promise<Database>;
synced: boolean;
_storeTimeout: number;
_storeTimeoutId: NodeJS.Timeout | null;
_storeUpdate: (update: Uint8Array, origin: any) => void;
constructor(dbname: string, doc: Y.Doc) {
super();
this.doc = doc;
this.name = dbname;
this._dbref = 0;
this._dbsize = 0;
this._destroyed = false;
this.db = null;
this.synced = false;
this._db = initSQLiteInstance().then(db => {
const sqlite = new db.Database();
return sqlite.run(STMTS.create);
});
this.whenSynced = this._db.then(async db => {
this.db = db;
const currState = Y.encodeStateAsUpdate(doc);
await fetchUpdates(this);
db.exec(STMTS.insert, { $data: currState });
if (this._destroyed) return this;
this.emit('synced', [this]);
this.synced = true;
return this;
});
// Timeout in ms untill data is merged and persisted in idb.
this._storeTimeout = 1000;
this._storeTimeoutId = null;
this._storeUpdate = (update: Uint8Array, origin: any) => {
if (this.db && origin !== this) {
this.db.exec(STMTS.insert, { $data: update });
if (++this._dbsize >= PREFERRED_TRIM_SIZE) {
// debounce store call
if (this._storeTimeoutId !== null) {
clearTimeout(this._storeTimeoutId);
}
this._storeTimeoutId = setTimeout(() => {
storeState(this, false);
this._storeTimeoutId = null;
}, this._storeTimeout);
}
}
};
doc.on('update', this._storeUpdate);
this.destroy = this.destroy.bind(this);
doc.on('destroy', this.destroy);
}
override destroy(): Promise<void> {
if (this._storeTimeoutId) {
clearTimeout(this._storeTimeoutId);
}
this.doc.off('update', this._storeUpdate);
this.doc.off('destroy', this.destroy);
this._destroyed = true;
return this._db.then(db => {
db.close();
});
}
// Destroys this instance and removes all data from SQLite.
async clearData(): Promise<void> {
return this._db.then(db => {
db.exec(STMTS.drop);
return this.destroy();
});
}
}

View File

@@ -13,8 +13,7 @@
"flexsearch": "^0.7.21",
"lib0": "^0.2.52",
"lru-cache": "^7.13.2",
"ts-debounce": "^4.0.0",
"y-indexeddb": "^9.0.9"
"ts-debounce": "^4.0.0"
},
"dependencies": {
"@types/flexsearch": "^0.7.3",

View File

@@ -7,7 +7,6 @@ import { fromEvent } from 'file-selector';
import LRUCache from 'lru-cache';
import { debounce } from 'ts-debounce';
import { nanoid } from 'nanoid';
import { IndexeddbPersistence } from 'y-indexeddb';
import { Awareness } from 'y-protocols/awareness.js';
import {
Doc,
@@ -19,7 +18,11 @@ import {
snapshot,
} from 'yjs';
import { WebsocketProvider } from '@toeverything/datasource/jwt-rpc';
import {
IndexedDBProvider,
SQLiteProvider,
WebsocketProvider,
} from '@toeverything/datasource/jwt-rpc';
import {
AsyncDatabaseAdapter,
@@ -46,8 +49,9 @@ const logger = getLogger('BlockDB:yjs');
type YjsProviders = {
awareness: Awareness;
idb: IndexeddbPersistence;
binariesIdb: IndexeddbPersistence;
idb: IndexedDBProvider;
binariesIdb: IndexedDBProvider;
fstore?: SQLiteProvider;
ws?: WebsocketProvider;
backend: string;
gatekeeper: GateKeeper;
@@ -117,7 +121,9 @@ async function _initYjsDatabase(
const doc = new Doc({ autoLoad: true, shouldLoad: true });
const idbp = new IndexeddbPersistence(workspace, doc).whenSynced;
const idbp = new IndexedDBProvider(workspace, doc).whenSynced;
const fsp: SQLiteProvider | undefined = undefined; // new SQLiteProvider(workspace, doc).whenSynced;
const wsp = _initWebsocketProvider(
backend,
workspace,
@@ -126,10 +132,10 @@ async function _initYjsDatabase(
params
);
const [idb, [awareness, ws]] = await Promise.all([idbp, wsp]);
const [idb, [awareness, ws], fstore] = await Promise.all([idbp, wsp, fsp]);
const binaries = new Doc({ autoLoad: true, shouldLoad: true });
const binariesIdb = await new IndexeddbPersistence(
const binariesIdb = await new IndexedDBProvider(
`${workspace}_binaries`,
binaries
).whenSynced;
@@ -147,6 +153,7 @@ async function _initYjsDatabase(
awareness,
idb,
binariesIdb,
fstore,
ws,
backend,
gatekeeper,
@@ -374,7 +381,7 @@ export class YjsAdapter implements AsyncDatabaseAdapter<YjsContentOperation> {
};
check();
});
await new IndexeddbPersistence(this._provider.idb.name, doc)
await new IndexedDBProvider(this._provider.idb.name, doc)
.whenSynced;
applyUpdate(doc, new Uint8Array(binary));
await update_check;