feat(native): move sqlite operation into Rust (#2497)

Co-authored-by: Peng Xiao <pengxiao@outlook.com>
(cherry picked from commit d28c887237)
This commit is contained in:
LongYinan
2023-06-07 14:52:19 +08:00
committed by himself65
parent 057346ba95
commit 8d22316644
36 changed files with 1910 additions and 545 deletions

1
packages/native/.env Normal file
View File

@@ -0,0 +1 @@
DATABASE_URL="sqlite:affine.db"

View File

@@ -1 +1,2 @@
*.fixture
lib

View File

@@ -7,13 +7,15 @@ version = "0.0.0"
crate-type = ["cdylib"]
[dependencies]
affine_schema = { path = "./schema" }
anyhow = "1"
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
chrono = "0.4"
napi = { version = "2", default-features = false, features = [
"napi4",
"napi5",
"tokio_rt",
"serde-json",
"error_anyhow",
"chrono_date",
] }
napi-derive = "2"
notify = { version = "6", features = ["serde"] }
@@ -21,6 +23,13 @@ once_cell = "1"
parking_lot = "0.12"
serde = "1"
serde_json = "1"
sqlx = { version = "0.7.0-alpha.3", default-features = false, features = [
"sqlite",
"runtime-tokio",
"tls-rustls",
"chrono",
"macros",
] }
tokio = { version = "1", features = ["full"] }
uuid = { version = "1", default-features = false, features = [
"serde",
@@ -29,4 +38,16 @@ uuid = { version = "1", default-features = false, features = [
] }
[build-dependencies]
affine_schema = { path = "./schema" }
dotenv = "0.15"
napi-build = "2"
sqlx = { version = "0.7.0-alpha.3", default-features = false, features = [
"sqlite",
"runtime-tokio",
"tls-rustls",
"chrono",
"macros",
"migrate",
"json",
] }
tokio = { version = "1", features = ["full"] }

1
packages/native/affine.db Symbolic link
View File

@@ -0,0 +1 @@
../../affine.db

View File

@@ -1,6 +1,23 @@
extern crate napi_build;
use sqlx::sqlite::SqliteConnectOptions;
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
dotenv::dotenv().ok();
fn main() -> Result<(), std::io::Error> {
napi_build::setup();
let options = SqliteConnectOptions::new()
.filename("../../affine.db")
.journal_mode(sqlx::sqlite::SqliteJournalMode::Off)
.locking_mode(sqlx::sqlite::SqliteLockingMode::Exclusive)
.create_if_missing(true);
let pool = sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.await
.unwrap();
sqlx::query(affine_schema::SCHEMA)
.execute(&pool)
.await
.unwrap();
Ok(())
}

View File

@@ -7,7 +7,7 @@ export interface WatchOptions {
recursive?: boolean;
}
/** Watcher kind enumeration */
export enum WatcherKind {
export const enum WatcherKind {
/** inotify backend (linux) */
Inotify = 'Inotify',
/** FS-Event backend (mac) */
@@ -23,6 +23,16 @@ export enum WatcherKind {
Unknown = 'Unknown',
}
export function moveFile(src: string, dst: string): Promise<void>;
export interface BlobRow {
key: string;
data: Buffer;
timestamp: Date;
}
export interface UpdateRow {
id: number;
timestamp: Date;
data: Buffer;
}
export class Subscription {
toString(): string;
unsubscribe(): void;
@@ -39,3 +49,16 @@ export class FsWatcher {
static unwatch(p: string): void;
static close(): void;
}
/** Node binding over a Rust/sqlx SQLite connection pool (see native sqlite.rs). */
export class SqliteConnection {
/** Creates a lazy pool for `path`; no connection is opened until `connect()`. */
constructor(path: string);
/** Opens the database (creating the file if missing) and applies the schema. */
connect(): Promise<void>;
/** Inserts a blob under `key`, overwriting any existing data for that key. */
addBlob(key: string, blob: Uint8Array): Promise<void>;
/** Fetches one blob row by key; resolves to null when absent (or on DB error — see native note). */
getBlob(key: string): Promise<BlobRow | null>;
/** Deletes the blob stored under `key`; resolves even if the key did not exist. */
deleteBlob(key: string): Promise<void>;
/** Lists all blob keys currently stored. */
getBlobKeys(): Promise<Array<string>>;
/** Returns every row of the `updates` table. */
getUpdates(): Promise<Array<UpdateRow>>;
/** Inserts all given updates in a single transaction (all-or-nothing). */
insertUpdates(updates: Array<Uint8Array>): Promise<void>;
/** Closes the pool; subsequent queries will fail. */
close(): Promise<void>;
/** True once `close()` has completed. */
get isClose(): boolean;
/** Checks that `path` is a database containing both `updates` and `blobs` tables. */
static validate(path: string): Promise<boolean>;
}

View File

@@ -263,9 +263,11 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`);
}
const { WatcherKind, Subscription, FsWatcher, moveFile } = nativeBinding;
const { WatcherKind, Subscription, FsWatcher, moveFile, SqliteConnection } =
nativeBinding;
module.exports.WatcherKind = WatcherKind;
module.exports.Subscription = Subscription;
module.exports.FsWatcher = FsWatcher;
module.exports.moveFile = moveFile;
module.exports.SqliteConnection = SqliteConnection;

View File

@@ -0,0 +1,4 @@
[package]
edition = "2021"
name = "affine_schema"
version = "0.0.0"

View File

@@ -0,0 +1 @@
A temporary crate that shares the SQLite schema between the AFFiNE native crate and its `build.rs` build script.

View File

@@ -0,0 +1,13 @@
// TODO
// dynamic create it from JavaScript side
// and remove this crate then.
//
// Shared DDL applied both by `build.rs` (to prepare the compile-time
// database that sqlx's query! macros verify against) and at runtime by
// `SqliteConnection::connect`. Both statements are idempotent
// (IF NOT EXISTS), so re-applying on every connect is safe.
pub const SCHEMA: &str = r#"CREATE TABLE IF NOT EXISTS "updates" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data BLOB NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS "blobs" (
key TEXT PRIMARY KEY NOT NULL,
data BLOB NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);"#;

View File

@@ -1 +1,2 @@
pub mod fs;
pub mod sqlite;

View File

@@ -0,0 +1,161 @@
use chrono::NaiveDateTime;
use napi::bindgen_prelude::{Buffer, Uint8Array};
use napi_derive::napi;
use sqlx::{
migrate::MigrateDatabase,
sqlite::{Sqlite, SqliteConnectOptions, SqlitePoolOptions},
Pool, Row,
};
/// One row of the `blobs` table, exposed to JS as a plain object
/// (`#[napi(object)]` maps fields by name).
#[napi(object)]
pub struct BlobRow {
// Primary key of the `blobs` table.
pub key: String,
// Raw blob bytes; surfaces as a Node `Buffer`.
pub data: Buffer,
// Row insertion time, filled by SQLite's CURRENT_TIMESTAMP default.
pub timestamp: NaiveDateTime,
}
/// One row of the `updates` table, exposed to JS as a plain object.
#[napi(object)]
pub struct UpdateRow {
// AUTOINCREMENT primary key of the `updates` table.
pub id: i64,
// Row insertion time, filled by SQLite's CURRENT_TIMESTAMP default.
pub timestamp: NaiveDateTime,
// Raw update payload; surfaces as a Node `Buffer`.
pub data: Buffer,
}
/// SQLite handle exported to JS. Wraps an sqlx connection pool plus the
/// database file path (kept so `connect` can create the file on demand).
#[napi]
pub struct SqliteConnection {
// Lazily-connected pool (see `new`); shared across all async methods.
pool: Pool<Sqlite>,
// Filesystem path of the database, used by `connect` for existence checks.
path: String,
}
#[napi]
impl SqliteConnection {
  /// Builds a lazily-connected pool for `path`.
  ///
  /// No I/O happens here: `connect_lazy_with` defers opening the file until
  /// the first query, so construction is cheap and infallible in practice.
  #[napi(constructor)]
  pub fn new(path: String) -> napi::Result<Self> {
    let sqlite_options = SqliteConnectOptions::new()
      .filename(&path)
      .foreign_keys(false)
      .journal_mode(sqlx::sqlite::SqliteJournalMode::Off);
    let pool = SqlitePoolOptions::new()
      .max_connections(4)
      .connect_lazy_with(sqlite_options);
    Ok(Self { pool, path })
  }

  /// Creates the database file if missing and applies the shared schema.
  ///
  /// The schema statements are idempotent (`IF NOT EXISTS`), so calling
  /// `connect` repeatedly is safe.
  #[napi]
  pub async fn connect(&self) -> napi::Result<()> {
    if !Sqlite::database_exists(&self.path).await.unwrap_or(false) {
      Sqlite::create_database(&self.path)
        .await
        .map_err(anyhow::Error::from)?;
    };
    let mut connection = self.pool.acquire().await.map_err(anyhow::Error::from)?;
    sqlx::query(affine_schema::SCHEMA)
      .execute(connection.as_mut())
      .await
      .map_err(anyhow::Error::from)?;
    connection.detach();
    Ok(())
  }

  /// Inserts `blob` under `key`, replacing any existing data (upsert).
  #[napi]
  pub async fn add_blob(&self, key: String, blob: Uint8Array) -> napi::Result<()> {
    let blob = blob.as_ref();
    // `query!` (not `query_as!`): this statement is only executed, it
    // returns no rows to map into a struct.
    sqlx::query!(
      "INSERT INTO blobs (key, data) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET data = excluded.data",
      key,
      blob,
    )
    .execute(&self.pool)
    .await
    .map_err(anyhow::Error::from)?;
    Ok(())
  }

  /// Fetches the blob stored under `key`, or `None` when absent.
  ///
  /// NOTE(review): `.ok()` also maps genuine DB errors (I/O, corruption) to
  /// `None`; callers cannot distinguish "missing" from "failed". Kept to
  /// preserve the exported `Option` interface — confirm this is intended.
  #[napi]
  pub async fn get_blob(&self, key: String) -> Option<BlobRow> {
    sqlx::query_as!(BlobRow, "SELECT * FROM blobs WHERE key = ?", key)
      .fetch_one(&self.pool)
      .await
      .ok()
  }

  /// Deletes the blob stored under `key`; succeeds even if no row matched.
  #[napi]
  pub async fn delete_blob(&self, key: String) -> napi::Result<()> {
    sqlx::query!("DELETE FROM blobs WHERE key = ?", key)
      .execute(&self.pool)
      .await
      .map_err(anyhow::Error::from)?;
    Ok(())
  }

  /// Lists the keys of every stored blob.
  #[napi]
  pub async fn get_blob_keys(&self) -> napi::Result<Vec<String>> {
    let keys = sqlx::query!("SELECT key FROM blobs")
      .fetch_all(&self.pool)
      .await
      .map(|rows| rows.into_iter().map(|row| row.key).collect())
      .map_err(anyhow::Error::from)?;
    Ok(keys)
  }

  /// Returns every row of the `updates` table.
  #[napi]
  pub async fn get_updates(&self) -> napi::Result<Vec<UpdateRow>> {
    let updates = sqlx::query_as!(UpdateRow, "SELECT * FROM updates")
      .fetch_all(&self.pool)
      .await
      .map_err(anyhow::Error::from)?;
    Ok(updates)
  }

  /// Inserts all `updates` atomically: one transaction, rolled back
  /// (implicitly, on drop) if any single insert fails.
  #[napi]
  pub async fn insert_updates(&self, updates: Vec<Uint8Array>) -> napi::Result<()> {
    let mut transaction = self.pool.begin().await.map_err(anyhow::Error::from)?;
    for update in updates.into_iter() {
      let update = update.as_ref();
      // `query!` (not `query_as!`): execute-only statement, no row type needed.
      sqlx::query!("INSERT INTO updates (data) VALUES ($1)", update)
        .execute(&mut *transaction)
        .await
        .map_err(anyhow::Error::from)?;
    }
    transaction.commit().await.map_err(anyhow::Error::from)?;
    Ok(())
  }

  /// Closes the pool; further queries will fail and `is_close` turns true.
  #[napi]
  pub async fn close(&self) {
    self.pool.close().await;
  }

  /// Whether `close()` has completed.
  #[napi(getter)]
  pub fn is_close(&self) -> bool {
    self.pool.is_closed()
  }

  /// Checks that `path` opens as a SQLite database containing both the
  /// `updates` and `blobs` tables. Returns `false` on any failure.
  #[napi]
  pub async fn validate(path: String) -> bool {
    let pool = match SqlitePoolOptions::new()
      .max_connections(1)
      .connect(&path)
      .await
    {
      Ok(pool) => pool,
      Err(_) => return false,
    };
    let valid = match sqlx::query("SELECT name FROM sqlite_master WHERE type='table'")
      .fetch_all(&pool)
      .await
    {
      Ok(rows) => {
        // Table names are unique in sqlite_master, so counting matches
        // against the two expected names is an exact membership test.
        rows
          .iter()
          .filter(|row| {
            let name: String = row.get(0);
            name == "updates" || name == "blobs"
          })
          .count()
          == 2
      }
      Err(_) => false,
    };
    // Fix: the previous version returned without closing, leaking the
    // probe connection on every validation call.
    pool.close().await;
    valid
  }
}