refactor: local storage

This commit is contained in:
DarkSky
2022-08-11 01:45:38 +08:00
parent 89191290e4
commit 86090be4a3
12 changed files with 318 additions and 74 deletions

View File

@@ -5,7 +5,7 @@ import { Observable } from 'lib0/observable.js';
const PREFERRED_TRIM_SIZE = 500;
const _stmts = {
create: 'CREATE TABLE updates (key INTEGER PRIMARY KEY AUTOINCREMENT, value BLOB);',
create: 'CREATE TABLE IF NOT EXISTS updates (key INTEGER PRIMARY KEY AUTOINCREMENT, value BLOB);',
selectAll: 'SELECT * FROM updates where key >= $idx',
selectCount: 'SELECT count(*) FROM updates',
insert: 'INSERT INTO updates VALUES (null, $data);',
@@ -59,7 +59,7 @@ export class SQLiteProvider extends Observable<string> {
private _size: number;
private _destroyed: boolean;
private _db: Promise<Database>;
private _saver?: (binary: Uint8Array) => void;
private _saver?: (binary: Uint8Array) => Promise<void> | undefined;
private _destroy: () => void;
constructor(name: string, doc: Y.Doc, origin?: Uint8Array) {
@@ -82,8 +82,9 @@ export class SQLiteProvider extends Observable<string> {
this.whenSynced = this._db.then(async db => {
this.db = db;
const currState = Y.encodeStateAsUpdate(doc);
await this._fetchUpdates();
await this._fetchUpdates(true);
db.exec(_stmts.insert, { $data: currState });
this._storeState();
if (this._destroyed) return this;
this.emit('synced', [this]);
this.synced = true;
@@ -91,21 +92,38 @@ export class SQLiteProvider extends Observable<string> {
});
// Timeout in ms until data is merged and persisted in sqlite.
const storeTimeout = 1000;
const storeTimeout = 500;
let storeTimeoutId: NodeJS.Timer | undefined = undefined;
let lastSize = 0;
const debouncedStoreState = (force = false) => {
// debounce store call
if (storeTimeoutId) clearTimeout(storeTimeoutId);
if (force) {
if (lastSize !== this._size) {
this._storeState();
storeTimeoutId = undefined;
lastSize = this._size;
}
} else {
storeTimeoutId = setTimeout(() => {
this._storeState();
storeTimeoutId = undefined;
}, storeTimeout);
}
};
const storeStateInterval = setInterval(
() => debouncedStoreState(true),
1000
);
const storeUpdate = (update: Uint8Array, origin: any) => {
if (this._saver && this.db && origin !== this) {
this.db.exec(_stmts.insert, { $data: update });
if (++this._size >= PREFERRED_TRIM_SIZE) {
// debounce store call
if (storeTimeoutId) clearTimeout(storeTimeoutId);
storeTimeoutId = setTimeout(() => {
this._storeState();
storeTimeoutId = undefined;
}, storeTimeout);
debouncedStoreState();
}
}
};
@@ -116,34 +134,53 @@ export class SQLiteProvider extends Observable<string> {
this._destroy = () => {
if (storeTimeoutId) clearTimeout(storeTimeoutId);
if (storeStateInterval) clearInterval(storeStateInterval);
this.doc.off('update', storeUpdate);
this.doc.off('destroy', this.destroy);
};
}
registerExporter(saver: (binary: Uint8Array) => void) {
registerExporter(saver: (binary: Uint8Array) => Promise<void> | undefined) {
this._saver = saver;
}
private async _storeState() {
private async _storeState(force?: boolean) {
await this._fetchUpdates();
if (this.db && this._size >= PREFERRED_TRIM_SIZE) {
this.db.exec(_stmts.insert, {
$data: Y.encodeStateAsUpdate(this.doc),
});
if (this.db) {
if (force || this._size >= PREFERRED_TRIM_SIZE) {
this.db.exec(_stmts.insert, {
$data: Y.encodeStateAsUpdate(this.doc),
});
clearUpdates(this.db, this._ref);
clearUpdates(this.db, this._ref);
this._size = countUpdates(this.db);
this._size = countUpdates(this.db);
}
this._saver?.(this.db?.export());
await this._saver?.(this.db?.export());
}
}
private async _fetchUpdates() {
private _waitUpdate(sync = false) {
if (sync) {
return new Promise<void>((resolve, reject) => {
const final = (_: any, origin: any) => {
if (origin === this) {
this.doc.off('update', final);
resolve();
}
};
this.doc.on('update', final);
});
}
return undefined;
}
private async _fetchUpdates(sync = false) {
if (this.db) {
const wait = this._waitUpdate(sync);
const updates = getAllUpdates(this.db, this._ref);
Y.transact(
@@ -160,6 +197,7 @@ export class SQLiteProvider extends Observable<string> {
const lastKey = Math.max(...updates.map(([idx]) => idx));
this._ref = lastKey + 1;
this._size = countUpdates(this.db);
await wait;
}
}