From 8d3461884980748969b7491e4cd658f8d8105fe6 Mon Sep 17 00:00:00 2001 From: DarkSky Date: Wed, 10 Aug 2022 00:49:29 +0800 Subject: [PATCH] feat: binary export --- .vscode/settings.json | 1 + .../db-service/src/services/database/index.ts | 7 +- libs/datasource/jwt-rpc/src/sqlite.ts | 160 ++++++++++-------- libs/datasource/jwt/src/adapter/yjs/index.ts | 26 ++- pnpm-lock.yaml | 47 +---- 5 files changed, 119 insertions(+), 122 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index c3d35c66ff..d6087c2b04 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,6 +4,7 @@ "editor.formatOnSaveMode": "file", "prettier.prettierPath": "./node_modules/prettier", "cSpell.words": [ + "AUTOINCREMENT", "Backlinks", "blockdb", "booktitle", diff --git a/libs/datasource/db-service/src/services/database/index.ts b/libs/datasource/db-service/src/services/database/index.ts index de83b24db0..439e6ca90e 100644 --- a/libs/datasource/db-service/src/services/database/index.ts +++ b/libs/datasource/db-service/src/services/database/index.ts @@ -78,7 +78,10 @@ export class Database { } async getDatabase(workspace: string, options?: BlockInitOptions) { - const db = await _getBlockDatabase(workspace, options); + const db = await _getBlockDatabase(workspace, { + ...this.#options, + ...options, + }); return db; } @@ -87,7 +90,7 @@ export class Database { name: string, listener: (connectivity: Connectivity) => void ) { - const db = await _getBlockDatabase(workspace); + const db = await _getBlockDatabase(workspace, this.#options); return db.addConnectivityListener(name, state => { const connectivity = state.get(name); if (connectivity) listener(connectivity); diff --git a/libs/datasource/jwt-rpc/src/sqlite.ts b/libs/datasource/jwt-rpc/src/sqlite.ts index 103078d05f..36e374efef 100644 --- a/libs/datasource/jwt-rpc/src/sqlite.ts +++ b/libs/datasource/jwt-rpc/src/sqlite.ts @@ -4,7 +4,7 @@ import { Observable } from 'lib0/observable.js'; const PREFERRED_TRIM_SIZE = 500; -const STMTS = { +const _stmts = { create: 'CREATE TABLE updates (key INTEGER PRIMARY KEY AUTOINCREMENT, value BLOB);', selectAll: 'SELECT * FROM updates where key >= $idx', selectCount: 'SELECT count(*) FROM updates', @@ -14,48 +14,19 @@ const STMTS = { }; const countUpdates = (db: Database) => { - const [cnt] = db.exec(STMTS.selectCount); + const [cnt] = db.exec(_stmts.selectCount); return cnt.values[0]?.[0] as number; }; const clearUpdates = (db: Database, idx: number) => { - db.exec(STMTS.delete, { $idx: idx }); + db.exec(_stmts.delete, { $idx: idx }); }; -const fetchUpdates = async (provider: SQLiteProvider) => { - const db = provider.db!; - const updates = db - .exec(STMTS.selectAll, { $idx: provider._dbref }) +const getAllUpdates = (db: Database, idx: number) => { + return db + .exec(_stmts.selectAll, { $idx: idx }) .flatMap(val => val.values as [number, Uint8Array][]) .sort(([a], [b]) => a - b); - Y.transact( - provider.doc, - () => { - updates.forEach(([, update]) => - Y.applyUpdate(provider.doc, update) - ); - }, - provider, - false - ); - - const lastKey = Math.max(...updates.map(([idx]) => idx)); - provider._dbref = lastKey + 1; - provider._dbsize = countUpdates(db); - return db; -}; - -const storeState = async (provider: SQLiteProvider, forceStore = true) => { - const db = await fetchUpdates(provider); - - if (forceStore || provider._dbsize >= PREFERRED_TRIM_SIZE) { - db.exec(STMTS.insert, { $data: Y.encodeStateAsUpdate(provider.doc) }); - - clearUpdates(db, provider._dbref); - - provider._dbsize = countUpdates(db); - console.log(db.export()); - } }; let _sqliteInstance: SqlJsStatic | undefined; @@ -79,77 +50,120 @@ const initSQLiteInstance = async () => { export class SQLiteProvider extends Observable { doc: Y.Doc; name: string; - _dbref: number; - _dbsize: number; - private _destroyed: boolean; - whenSynced: Promise; db: Database | null; - private _db: Promise; + whenSynced: Promise; synced: boolean; - _storeTimeout: number; - _storeTimeoutId: NodeJS.Timeout | null; - _storeUpdate: (update: Uint8Array, origin: any) => void; - constructor(dbname: string, doc: Y.Doc) { + private _ref: number; + private _size: number; + private _destroyed: boolean; + private _db: Promise; + private _saver?: (binary: Uint8Array) => void; + private _destroy: () => void; + + constructor(name: string, doc: Y.Doc, origin?: Uint8Array) { super(); this.doc = doc; - this.name = dbname; + this.name = name; - this._dbref = 0; - this._dbsize = 0; + this._ref = 0; + this._size = 0; this._destroyed = false; this.db = null; this.synced = false; this._db = initSQLiteInstance().then(db => { - const sqlite = new db.Database(); - return sqlite.run(STMTS.create); + const sqlite = new db.Database(origin); + return sqlite.run(_stmts.create); }); this.whenSynced = this._db.then(async db => { this.db = db; const currState = Y.encodeStateAsUpdate(doc); - await fetchUpdates(this); - db.exec(STMTS.insert, { $data: currState }); + await this._fetchUpdates(); + db.exec(_stmts.insert, { $data: currState }); if (this._destroyed) return this; this.emit('synced', [this]); this.synced = true; return this; }); - // Timeout in ms untill data is merged and persisted in idb. - this._storeTimeout = 1000; + // Timeout in ms until data is merged and persisted in sqlite. + const storeTimeout = 1000; + let storeTimeoutId: NodeJS.Timer | undefined = undefined; - this._storeTimeoutId = null; + const storeUpdate = (update: Uint8Array, origin: any) => { + if (this._saver && this.db && origin !== this) { + this.db.exec(_stmts.insert, { $data: update }); - this._storeUpdate = (update: Uint8Array, origin: any) => { - if (this.db && origin !== this) { - this.db.exec(STMTS.insert, { $data: update }); - - if (++this._dbsize >= PREFERRED_TRIM_SIZE) { + if (++this._size >= PREFERRED_TRIM_SIZE) { // debounce store call - if (this._storeTimeoutId !== null) { - clearTimeout(this._storeTimeoutId); - } - this._storeTimeoutId = setTimeout(() => { - storeState(this, false); - this._storeTimeoutId = null; - }, this._storeTimeout); + if (storeTimeoutId) clearTimeout(storeTimeoutId); + + storeTimeoutId = setTimeout(() => { + this._storeState(); + storeTimeoutId = undefined; + }, storeTimeout); } } }; - doc.on('update', this._storeUpdate); + + doc.on('update', storeUpdate); this.destroy = this.destroy.bind(this); doc.on('destroy', this.destroy); + + this._destroy = () => { + if (storeTimeoutId) clearTimeout(storeTimeoutId); + + this.doc.off('update', storeUpdate); + this.doc.off('destroy', this.destroy); + }; + } + + registerExporter(saver: (binary: Uint8Array) => void) { + this._saver = saver; + } + + private async _storeState() { + await this._fetchUpdates(); + + if (this.db && this._size >= PREFERRED_TRIM_SIZE) { + this.db.exec(_stmts.insert, { + $data: Y.encodeStateAsUpdate(this.doc), + }); + + clearUpdates(this.db, this._ref); + + this._size = countUpdates(this.db); + + this._saver?.(this.db?.export()); + } + } + + private async _fetchUpdates() { + if (this.db) { + const updates = getAllUpdates(this.db, this._ref); + + Y.transact( + this.doc, + () => { + updates.forEach(([, update]) => + Y.applyUpdate(this.doc, update) + ); + }, + this, + false + ); + + const lastKey = Math.max(...updates.map(([idx]) => idx)); + this._ref = lastKey + 1; + this._size = countUpdates(this.db); + } } override destroy(): Promise { - if (this._storeTimeoutId) { - clearTimeout(this._storeTimeoutId); - } - this.doc.off('update', this._storeUpdate); - this.doc.off('destroy', this.destroy); + this._destroy(); this._destroyed = true; return this._db.then(db => { db.close(); @@ -159,7 +173,7 @@ export class SQLiteProvider extends Observable { // Destroys this instance and removes all data from SQLite. async clearData(): Promise { return this._db.then(db => { - db.exec(STMTS.drop); + db.exec(_stmts.drop); return this.destroy(); }); } diff --git a/libs/datasource/jwt/src/adapter/yjs/index.ts b/libs/datasource/jwt/src/adapter/yjs/index.ts index 9f2a530ed5..d815fe4738 100644 --- a/libs/datasource/jwt/src/adapter/yjs/index.ts +++ b/libs/datasource/jwt/src/adapter/yjs/index.ts @@ -102,6 +102,8 @@ async function _initYjsDatabase( params: YjsInitOptions['params']; userId: string; token?: string; + importData?: Uint8Array; + exportData?: (binary: Uint8Array) => void; } ): Promise { if (_asyncInitLoading.has(workspace)) { @@ -122,7 +124,9 @@ async function _initYjsDatabase( const doc = new Doc({ autoLoad: true, shouldLoad: true }); const idbp = new IndexedDBProvider(workspace, doc).whenSynced; - const fsp: SQLiteProvider | undefined = undefined; // new SQLiteProvider(workspace, doc).whenSynced; + + const fs = new SQLiteProvider(workspace, doc, options.importData); + if (options.exportData) fs.registerExporter(options.exportData); const wsp = _initWebsocketProvider( backend, @@ -132,7 +136,11 @@ async function _initYjsDatabase( params ); - const [idb, [awareness, ws], fstore] = await Promise.all([idbp, wsp, fsp]); + const [idb, [awareness, ws], fstore] = await Promise.all([ + idbp, + wsp, + fs.whenSynced, + ]); const binaries = new Doc({ autoLoad: true, shouldLoad: true }); const binariesIdb = await new IndexedDBProvider( @@ -166,6 +174,7 @@ async function _initYjsDatabase( awareness, idb, binariesIdb, + fstore, ws, backend, gatekeeper, @@ -182,6 +191,8 @@ export type YjsInitOptions = { params?: Record; userId?: string; token?: string; + importData?: Uint8Array; + exportData?: (binary: Uint8Array) => void; }; export class YjsAdapter implements AsyncDatabaseAdapter { @@ -206,11 +217,20 @@ export class YjsAdapter implements AsyncDatabaseAdapter { workspace: string, options: YjsInitOptions ): Promise { - const { backend, params = {}, userId = 'default', token } = options; + const { + backend, + params = {}, + userId = 'default', + token, + importData, + exportData, + } = options; const providers = await _initYjsDatabase(backend, workspace, { params, userId, token, + importData, + exportData, }); return new YjsAdapter(providers); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a632fb99bd..dacfe065aa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -194,7 +194,7 @@ importers: yjs: ^13.5.41 dependencies: authing-js-sdk: 4.23.35 - firebase-admin: 11.0.1 + firebase-admin: 11.0.1_@firebase+app-types@0.7.0 lib0: 0.2.52 lru-cache: 7.13.2 nanoid: 4.0.0 @@ -571,9 +571,6 @@ importers: dependencies: ffc-js-client-side-sdk: 1.1.5 - libs/datasource/jwst/pkg: - specifiers: {} - libs/datasource/jwt: specifiers: '@types/debug': ^4.1.7 @@ -3294,15 +3291,6 @@ packages: - utf-8-validate dev: true - /@firebase/auth-interop-types/0.1.6_@firebase+util@1.6.3: - resolution: {integrity: sha512-etIi92fW3CctsmR9e3sYM3Uqnoq861M0Id9mdOPF6PWIg38BXL5k4upCNBggGUpLIS0H1grMOvy/wn1xymwe2g==} - peerDependencies: - '@firebase/app-types': 0.x - '@firebase/util': 1.x - dependencies: - '@firebase/util': 1.6.3 - dev: false - /@firebase/auth-interop-types/0.1.6_pbfwexsq7uf6mrzcwnikj3g37m: resolution: {integrity: sha512-etIi92fW3CctsmR9e3sYM3Uqnoq861M0Id9mdOPF6PWIg38BXL5k4upCNBggGUpLIS0H1grMOvy/wn1xymwe2g==} peerDependencies: @@ -3311,7 +3299,6 @@ packages: dependencies: '@firebase/app-types': 0.7.0 '@firebase/util': 1.6.3 - dev: true /@firebase/auth-types/0.11.0_pbfwexsq7uf6mrzcwnikj3g37m: resolution: {integrity: sha512-q7Bt6cx+ySj9elQHTsKulwk3+qDezhzRBFC9zlQ1BjgMueUOnGMcvqmU0zuKlQ4RhLSH7MNAdBV2znVaoN3Vxw==} @@ -3347,19 +3334,6 @@ packages: '@firebase/util': 1.6.3 tslib: 2.4.0 - /@firebase/database-compat/0.2.4: - resolution: {integrity: sha512-VtsGixO5mTjNMJn6PwxAJEAR70fj+3blCXIdQKel3q+eYGZAfdqxox1+tzZDnf9NWBJpaOgAHPk3JVDxEo9NFQ==} - dependencies: - '@firebase/component': 0.5.17 - '@firebase/database': 0.13.4 - '@firebase/database-types': 0.9.12 - '@firebase/logger': 0.3.3 - '@firebase/util': 1.6.3 - tslib: 2.4.0 - transitivePeerDependencies: - - '@firebase/app-types' - dev: false - /@firebase/database-compat/0.2.4_@firebase+app-types@0.7.0: resolution: {integrity: sha512-VtsGixO5mTjNMJn6PwxAJEAR70fj+3blCXIdQKel3q+eYGZAfdqxox1+tzZDnf9NWBJpaOgAHPk3JVDxEo9NFQ==} dependencies: @@ -3371,7 +3345,6 @@ packages: tslib: 2.4.0 transitivePeerDependencies: - '@firebase/app-types' - dev: true /@firebase/database-types/0.9.10: resolution: {integrity: sha512-2ji6nXRRsY+7hgU6zRhUtK0RmSjVWM71taI7Flgaw+BnopCo/lDF5HSwxp8z7LtiHlvQqeRA3Ozqx5VhlAbiKg==} @@ -3386,19 +3359,6 @@ packages: '@firebase/app-types': 0.7.0 '@firebase/util': 1.6.3 - /@firebase/database/0.13.4: - resolution: {integrity: sha512-NW7bOoiaC4sJCj6DY/m9xHoFNa0CK32YPMCh6FiMweLCDQbOZM8Ql/Kn6yyuxCb7K7ypz9eSbRlCWQJsJRQjhg==} - dependencies: - '@firebase/auth-interop-types': 0.1.6_@firebase+util@1.6.3 - '@firebase/component': 0.5.17 - '@firebase/logger': 0.3.3 - '@firebase/util': 1.6.3 - faye-websocket: 0.11.4 - tslib: 2.4.0 - transitivePeerDependencies: - - '@firebase/app-types' - dev: false - /@firebase/database/0.13.4_@firebase+app-types@0.7.0: resolution: {integrity: sha512-NW7bOoiaC4sJCj6DY/m9xHoFNa0CK32YPMCh6FiMweLCDQbOZM8Ql/Kn6yyuxCb7K7ypz9eSbRlCWQJsJRQjhg==} dependencies: @@ -3410,7 +3370,6 @@ packages: tslib: 2.4.0 transitivePeerDependencies: - '@firebase/app-types' - dev: true /@firebase/firestore-compat/0.1.23_53yvy43rwpg2c45kgeszsxtrca: resolution: {integrity: sha512-QfcuyMAavp//fQnjSfCEpnbWi7spIdKaXys1kOLu7395fLr+U6ykmto1HUMCSz8Yus9cEr/03Ujdi2SUl2GUAA==} @@ -10904,12 +10863,12 @@ packages: semver-regex: 2.0.0 dev: true - /firebase-admin/11.0.1: + /firebase-admin/11.0.1_@firebase+app-types@0.7.0: resolution: {integrity: sha512-rL3wlZbi2Kb/KJgcmj1YHlD4ZhfmhfgRO2YJialxAllm0tj1IQea878hHuBLGmv4DpbW9t9nLvX9kddNR2Y65Q==} engines: {node: '>=14'} dependencies: '@fastify/busboy': 1.1.0 - '@firebase/database-compat': 0.2.4 + '@firebase/database-compat': 0.2.4_@firebase+app-types@0.7.0 '@firebase/database-types': 0.9.10 '@types/node': 18.0.1 jsonwebtoken: 8.5.1