From 3cca879a83ab98e16f3b6bd8497f89cda8a544b7 Mon Sep 17 00:00:00 2001 From: pengx17 Date: Thu, 16 May 2024 06:31:04 +0000 Subject: [PATCH] refactor(electron): use sqlite to store server clock & sync meta (#6957) After this PR, IDB should not be used in desktop any longer. --- packages/common/infra/package.json | 1 + .../impls/engine/doc-sqlite.ts | 148 ++++++++---------- .../electron/src/helper/db/db-adapter.ts | 79 ++++++++++ .../frontend/electron/src/helper/db/index.ts | 48 ++++++ .../src/helper/db/workspace-db-adapter.ts | 2 +- packages/frontend/native/index.d.ts | 10 ++ packages/frontend/native/schema/src/lib.rs | 10 ++ packages/frontend/native/src/sqlite/mod.rs | 108 +++++++++++++ 8 files changed, 322 insertions(+), 84 deletions(-) diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index abf14e5d17..94f102ff5a 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -4,6 +4,7 @@ "private": true, "exports": { "./blocksuite": "./src/blocksuite/index.ts", + "./storage": "./src/storage/index.ts", "./app-config-storage": "./src/app-config-storage.ts", ".": "./src/index.ts" }, diff --git a/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-sqlite.ts b/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-sqlite.ts index a16791f38f..a22ddce918 100644 --- a/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-sqlite.ts +++ b/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-sqlite.ts @@ -1,15 +1,13 @@ import { apis } from '@affine/electron-api'; import type { ByteKV, ByteKVBehavior, DocStorage } from '@toeverything/infra'; import { AsyncLock, MemoryDocEventBus } from '@toeverything/infra'; -import type { DBSchema, IDBPDatabase, IDBPObjectStore } from 'idb'; -import { openDB } from 'idb'; export class SqliteDocStorage implements DocStorage { constructor(private readonly workspaceId: string) {} eventBus = new MemoryDocEventBus(); readonly doc = new Doc(this.workspaceId); - readonly syncMetadata = new KV(`${this.workspaceId}:sync-metadata`); - readonly serverClock = new KV(`${this.workspaceId}:server-clock`); + readonly syncMetadata = new SyncMetadataKV(this.workspaceId); + readonly serverClock = new ServerClockKV(this.workspaceId); } type DocType = DocStorage['doc']; @@ -72,102 +70,86 @@ class Doc implements DocType { } } -interface KvDBSchema extends DBSchema { - kv: { - key: string; - value: { key: string; val: Uint8Array }; - }; -} - -class KV implements ByteKV { - constructor(private readonly dbName: string) {} - - dbPromise: Promise> | null = null; - dbVersion = 1; - - upgradeDB(db: IDBPDatabase) { - db.createObjectStore('kv', { keyPath: 'key' }); +class SyncMetadataKV implements ByteKV { + constructor(private readonly workspaceId: string) {} + transaction(cb: (behavior: ByteKVBehavior) => Promise): Promise { + return cb(this); } - getDb() { - if (this.dbPromise === null) { - this.dbPromise = openDB(this.dbName, this.dbVersion, { - upgrade: db => this.upgradeDB(db), - }); + get(key: string): Uint8Array | null | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); } - return this.dbPromise; + return apis.db.getSyncMetadata(this.workspaceId, key); } - async transaction( - cb: (transaction: ByteKVBehavior) => Promise - ): Promise { - const db = await this.getDb(); - const store = db.transaction('kv', 'readwrite').objectStore('kv'); - - const behavior = new KVBehavior(store); - return await cb(behavior); + set(key: string, data: Uint8Array): void | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); + } + return apis.db.setSyncMetadata(this.workspaceId, key, data); } - async get(key: string): Promise { - const db = await this.getDb(); - const store = db.transaction('kv', 'readonly').objectStore('kv'); - return new KVBehavior(store).get(key); + keys(): string[] | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); + } + return apis.db.getSyncMetadataKeys(this.workspaceId); } - async set(key: string, value: Uint8Array): Promise { - const db = await this.getDb(); - const store = db.transaction('kv', 'readwrite').objectStore('kv'); - return new KVBehavior(store).set(key, value); + + del(key: string): void | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); + } + return apis.db.delSyncMetadata(this.workspaceId, key); } - async keys(): Promise { - const db = await this.getDb(); - const store = db.transaction('kv', 'readwrite').objectStore('kv'); - return new KVBehavior(store).keys(); - } - async clear() { - const db = await this.getDb(); - const store = db.transaction('kv', 'readwrite').objectStore('kv'); - return new KVBehavior(store).clear(); - } - async del(key: string) { - const db = await this.getDb(); - const store = db.transaction('kv', 'readwrite').objectStore('kv'); - return new KVBehavior(store).del(key); + + clear(): void | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); + } + return apis.db.clearSyncMetadata(this.workspaceId); } } -class KVBehavior implements ByteKVBehavior { - constructor( - private readonly store: IDBPObjectStore - ) {} - - async get(key: string): Promise { - const value = await this.store.get(key); - return value?.val ?? null; - } - async set(key: string, value: Uint8Array): Promise { - if (this.store.put === undefined) { - throw new Error('Cannot set in a readonly transaction'); - } - await this.store.put({ - key: key, - val: value, - }); - } - async keys(): Promise { - return await this.store.getAllKeys(); +class ServerClockKV implements ByteKV { + constructor(private readonly workspaceId: string) {} + transaction(cb: (behavior: ByteKVBehavior) => Promise): Promise { + return cb(this); } - async del(key: string) { - if (this.store.delete === undefined) { - throw new Error('Cannot set in a readonly transaction'); + get(key: string): Uint8Array | null | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); } - return await this.store.delete(key); + return apis.db.getServerClock(this.workspaceId, key); } - async clear() { - if (this.store.clear === undefined) { - throw new Error('Cannot set in a readonly transaction'); + set(key: string, data: Uint8Array): void | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); } - return await this.store.clear(); + return apis.db.setServerClock(this.workspaceId, key, data); + } + + keys(): string[] | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); + } + return apis.db.getServerClockKeys(this.workspaceId); + } + + del(key: string): void | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); + } + return apis.db.delServerClock(this.workspaceId, key); + } + + clear(): void | Promise { + if (!apis?.db) { + throw new Error('sqlite datasource is not available'); + } + return apis.db.clearServerClock(this.workspaceId); } } diff --git a/packages/frontend/electron/src/helper/db/db-adapter.ts b/packages/frontend/electron/src/helper/db/db-adapter.ts index 7caeeeae70..9c0c4f2864 100644 --- a/packages/frontend/electron/src/helper/db/db-adapter.ts +++ b/packages/frontend/electron/src/helper/db/db-adapter.ts @@ -1,6 +1,7 @@ import type { InsertRow } from '@affine/native'; import { SqliteConnection, ValidationResult } from '@affine/native'; import { WorkspaceVersion } from '@toeverything/infra/blocksuite'; +import type { ByteKVBehavior } from '@toeverything/infra/storage'; import { logger } from '../logger'; import { applyGuidCompatibilityFix, migrateToLatest } from './migration'; @@ -175,4 +176,82 @@ export class SQLiteAdapter { logger.error('replaceUpdates', error); } } + + serverClock: ByteKVBehavior = { + get: async key => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return null; + } + const blob = await this.db.getServerClock(key); + return blob?.data ?? null; + }, + set: async (key, data) => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return; + } + await this.db.setServerClock(key, data); + }, + keys: async () => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return []; + } + return await this.db.getServerClockKeys(); + }, + del: async key => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return; + } + await this.db.delServerClock(key); + }, + clear: async () => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return; + } + await this.db.clearServerClock(); + }, + }; + + syncMetadata: ByteKVBehavior = { + get: async key => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return null; + } + const blob = await this.db.getSyncMetadata(key); + return blob?.data ?? null; + }, + set: async (key, data) => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return; + } + await this.db.setSyncMetadata(key, data); + }, + keys: async () => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return []; + } + return await this.db.getSyncMetadataKeys(); + }, + del: async key => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return; + } + await this.db.delSyncMetadata(key); + }, + clear: async () => { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return; + } + await this.db.clearSyncMetadata(); + }, + }; } diff --git a/packages/frontend/electron/src/helper/db/index.ts b/packages/frontend/electron/src/helper/db/index.ts index b71fb7302e..f34f08bb89 100644 --- a/packages/frontend/electron/src/helper/db/index.ts +++ b/packages/frontend/electron/src/helper/db/index.ts @@ -40,6 +40,54 @@ export const dbHandlers = { getDefaultStorageLocation: async () => { return await mainRPC.getPath('sessionData'); }, + getServerClock: async (workspaceId: string, key: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.serverClock.get(key); + }, + setServerClock: async ( + workspaceId: string, + key: string, + data: Uint8Array + ) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.serverClock.set(key, data); + }, + getServerClockKeys: async (workspaceId: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.serverClock.keys(); + }, + clearServerClock: async (workspaceId: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.serverClock.clear(); + }, + delServerClock: async (workspaceId: string, key: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.serverClock.del(key); + }, + getSyncMetadata: async (workspaceId: string, key: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.syncMetadata.get(key); + }, + setSyncMetadata: async ( + workspaceId: string, + key: string, + data: Uint8Array + ) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.syncMetadata.set(key, data); + }, + getSyncMetadataKeys: async (workspaceId: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.syncMetadata.keys(); + }, + clearSyncMetadata: async (workspaceId: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.syncMetadata.clear(); + }, + delSyncMetadata: async (workspaceId: string, key: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.adapter.syncMetadata.del(key); + }, }; export const dbEvents = {} satisfies Record; diff --git a/packages/frontend/electron/src/helper/db/workspace-db-adapter.ts b/packages/frontend/electron/src/helper/db/workspace-db-adapter.ts index 9406e2e7e8..9ff214a5c8 100644 --- a/packages/frontend/electron/src/helper/db/workspace-db-adapter.ts +++ b/packages/frontend/electron/src/helper/db/workspace-db-adapter.ts @@ -31,7 +31,7 @@ export class WorkspaceSQLiteDB { this.update$.complete(); } - toDBDocId = (docId: string) => { + private readonly toDBDocId = (docId: string) => { return this.workspaceId === docId ? undefined : docId; }; diff --git a/packages/frontend/native/index.d.ts b/packages/frontend/native/index.d.ts index a82709e90f..d12c969509 100644 --- a/packages/frontend/native/index.d.ts +++ b/packages/frontend/native/index.d.ts @@ -13,6 +13,16 @@ export class SqliteConnection { getAllUpdates(): Promise> insertUpdates(updates: Array): Promise replaceUpdates(docId: string | undefined | null, updates: Array): Promise + getServerClock(key: string): Promise + setServerClock(key: string, data: Uint8Array): Promise + getServerClockKeys(): Promise> + clearServerClock(): Promise + delServerClock(key: string): Promise + getSyncMetadata(key: string): Promise + setSyncMetadata(key: string, data: Uint8Array): Promise + getSyncMetadataKeys(): Promise> + clearSyncMetadata(): Promise + delSyncMetadata(key: string): Promise initVersion(): Promise setVersion(version: number): Promise getMaxVersion(): Promise diff --git a/packages/frontend/native/schema/src/lib.rs b/packages/frontend/native/schema/src/lib.rs index b5709a15d9..d7daa6dda1 100644 --- a/packages/frontend/native/schema/src/lib.rs +++ b/packages/frontend/native/schema/src/lib.rs @@ -15,5 +15,15 @@ CREATE TABLE IF NOT EXISTS "blobs" ( CREATE TABLE IF NOT EXISTS "version_info" ( version NUMBER NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +); +CREATE TABLE IF NOT EXISTS "server_clock" ( + key TEXT PRIMARY KEY NOT NULL, + data BLOB NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +); +CREATE TABLE IF NOT EXISTS "sync_metadata" ( + key TEXT PRIMARY KEY NOT NULL, + data BLOB NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ) "#; diff --git a/packages/frontend/native/src/sqlite/mod.rs b/packages/frontend/native/src/sqlite/mod.rs index e400407f8f..e58ec75375 100644 --- a/packages/frontend/native/src/sqlite/mod.rs +++ b/packages/frontend/native/src/sqlite/mod.rs @@ -252,6 +252,114 @@ impl SqliteConnection { Ok(()) } + #[napi] + pub async fn get_server_clock(&self, key: String) -> Option { + sqlx::query_as!( + BlobRow, + "SELECT key, data, timestamp FROM server_clock WHERE key = ?", + key + ) + .fetch_one(&self.pool) + .await + .ok() + } + + #[napi] + pub async fn set_server_clock(&self, key: String, data: Uint8Array) -> napi::Result<()> { + let data = data.as_ref(); + sqlx::query!( + "INSERT INTO server_clock (key, data) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET data = excluded.data", + key, + data, + ) + .execute(&self.pool) + .await + .map_err(anyhow::Error::from)?; + Ok(()) + } + + #[napi] + pub async fn get_server_clock_keys(&self) -> napi::Result> { + let keys = sqlx::query!("SELECT key FROM server_clock") + .fetch_all(&self.pool) + .await + .map(|rows| rows.into_iter().map(|row| row.key).collect()) + .map_err(anyhow::Error::from)?; + Ok(keys) + } + + #[napi] + pub async fn clear_server_clock(&self) -> napi::Result<()> { + sqlx::query!("DELETE FROM server_clock") + .execute(&self.pool) + .await + .map_err(anyhow::Error::from)?; + Ok(()) + } + + #[napi] + pub async fn del_server_clock(&self, key: String) -> napi::Result<()> { + sqlx::query!("DELETE FROM server_clock WHERE key = ?", key) + .execute(&self.pool) + .await + .map_err(anyhow::Error::from)?; + Ok(()) + } + + #[napi] + pub async fn get_sync_metadata(&self, key: String) -> Option { + sqlx::query_as!( + BlobRow, + "SELECT key, data, timestamp FROM sync_metadata WHERE key = ?", + key + ) + .fetch_one(&self.pool) + .await + .ok() + } + + #[napi] + pub async fn set_sync_metadata(&self, key: String, data: Uint8Array) -> napi::Result<()> { + let data = data.as_ref(); + sqlx::query!( + "INSERT INTO sync_metadata (key, data) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET data = excluded.data", + key, + data, + ) + .execute(&self.pool) + .await + .map_err(anyhow::Error::from)?; + Ok(()) + } + + #[napi] + pub async fn get_sync_metadata_keys(&self) -> napi::Result> { + let keys = sqlx::query!("SELECT key FROM sync_metadata") + .fetch_all(&self.pool) + .await + .map(|rows| rows.into_iter().map(|row| row.key).collect()) + .map_err(anyhow::Error::from)?; + Ok(keys) + } + + #[napi] + pub async fn clear_sync_metadata(&self) -> napi::Result<()> { + sqlx::query!("DELETE FROM sync_metadata") + .execute(&self.pool) + .await + .map_err(anyhow::Error::from)?; + Ok(()) + } + + #[napi] + pub async fn del_sync_metadata(&self, key: String) -> napi::Result<()> { + sqlx::query!("DELETE FROM sync_metadata WHERE key = ?", key) + .execute(&self.pool) + .await + .map_err(anyhow::Error::from)?; + Ok(()) + } + #[napi] pub async fn init_version(&self) -> napi::Result<()> { // create version_info table