From 8c24f2b9067ff90b071e6ac4c581c6406136185a Mon Sep 17 00:00:00 2001 From: forehalo Date: Fri, 13 Dec 2024 06:13:05 +0000 Subject: [PATCH] feat(nbstore): add sqlite implementation (#8811) --- .cargo/config.toml | 4 + .github/workflows/build-test.yml | 9 +- .github/workflows/release-desktop.yml | 4 +- Cargo.lock | 193 ++------ Cargo.toml | 4 +- packages/common/infra/package.json | 24 +- packages/common/native/Cargo.toml | 5 + .../native/benches/hashcash.rs | 2 +- packages/common/nbstore/package.json | 6 +- .../common/nbstore/src/impls/sqlite/blob.ts | 33 ++ .../common/nbstore/src/impls/sqlite/db.ts | 83 ++++ .../common/nbstore/src/impls/sqlite/doc.ts | 54 +++ .../common/nbstore/src/impls/sqlite/index.ts | 3 + .../common/nbstore/src/impls/sqlite/sync.ts | 53 +++ .../nbstore/src/impls/sqlite/v1/blob.ts | 62 +++ .../common/nbstore/src/impls/sqlite/v1/doc.ts | 67 +++ .../nbstore/src/impls/sqlite/v1/index.ts | 2 + packages/common/nbstore/src/op/index.ts | 2 + .../__snapshots__/storage.spec.ts.snap | 41 ++ .../src/storage/__tests__/storage.spec.ts | 36 ++ packages/common/nbstore/src/storage/index.ts | 20 +- .../common/nbstore/src/storage/storage.ts | 75 +++ packages/frontend/apps/electron/package.json | 1 + .../apps/electron/src/helper/db/types.ts | 1 - .../apps/electron/src/helper/dialog/dialog.ts | 97 ++-- .../apps/electron/src/helper/dialog/index.ts | 4 +- .../apps/electron/src/helper/exposed.ts | 13 +- .../apps/electron/src/helper/nbstore/blob.ts | 33 ++ .../apps/electron/src/helper/nbstore/db.ts | 40 ++ .../apps/electron/src/helper/nbstore/doc.ts | 83 ++++ .../electron/src/helper/nbstore/handlers.ts | 143 ++++++ .../apps/electron/src/helper/nbstore/index.ts | 4 + .../electron/src/helper/nbstore/storage.ts | 127 +++++ .../apps/electron/src/helper/nbstore/sync.ts | 70 +++ .../helper/{db => nbstore/v1}/db-adapter.ts | 2 +- .../helper/{db => nbstore/v1}/ensure-db.ts | 5 +- .../src/helper/{db => nbstore/v1}/index.ts | 7 +- .../helper/{db => nbstore/v1}/merge-update.ts | 0 .../v1}/workspace-db-adapter.ts | 6 +- .../electron/src/helper/workspace/handlers.ts | 8 +- .../electron/src/helper/workspace/meta.ts | 38 +- .../apps/electron/test/db/ensure-db.spec.ts | 6 +- .../test/db/workspace-db-adapter.spec.ts | 4 +- packages/frontend/native/.env | 1 - packages/frontend/native/Cargo.toml | 25 +- .../frontend/native/__tests__/db.spec.mts | 3 +- packages/frontend/native/build.rs | 29 -- packages/frontend/native/event.d.ts | 43 -- packages/frontend/native/index.d.ts | 77 ++- packages/frontend/native/index.js | 6 +- packages/frontend/native/nbstore/Cargo.toml | 23 + packages/frontend/native/nbstore/build.rs | 33 ++ packages/frontend/native/nbstore/src/blob.rs | 198 ++++++++ packages/frontend/native/nbstore/src/doc.rs | 449 ++++++++++++++++++ packages/frontend/native/nbstore/src/lib.rs | 311 ++++++++++++ .../frontend/native/nbstore/src/storage.rs | 129 +++++ packages/frontend/native/nbstore/src/sync.rs | 289 +++++++++++ packages/frontend/native/package.json | 4 +- packages/frontend/native/schema/Cargo.toml | 3 + packages/frontend/native/schema/src/lib.rs | 115 ++++- packages/frontend/native/schema/src/v1.rs | 26 + packages/frontend/native/sqlite_v1/Cargo.toml | 23 + packages/frontend/native/sqlite_v1/build.rs | 35 ++ .../sqlite/mod.rs => sqlite_v1/src/lib.rs} | 2 +- packages/frontend/native/src/lib.rs | 4 +- yarn.lock | 27 +- 66 files changed, 2932 insertions(+), 397 deletions(-) rename packages/{frontend => common}/native/benches/hashcash.rs (95%) create mode 100644 
packages/common/nbstore/src/impls/sqlite/blob.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/db.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/doc.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/index.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/sync.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/v1/blob.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/v1/doc.ts create mode 100644 packages/common/nbstore/src/impls/sqlite/v1/index.ts create mode 100644 packages/common/nbstore/src/storage/__tests__/__snapshots__/storage.spec.ts.snap create mode 100644 packages/common/nbstore/src/storage/__tests__/storage.spec.ts delete mode 100644 packages/frontend/apps/electron/src/helper/db/types.ts create mode 100644 packages/frontend/apps/electron/src/helper/nbstore/blob.ts create mode 100644 packages/frontend/apps/electron/src/helper/nbstore/db.ts create mode 100644 packages/frontend/apps/electron/src/helper/nbstore/doc.ts create mode 100644 packages/frontend/apps/electron/src/helper/nbstore/handlers.ts create mode 100644 packages/frontend/apps/electron/src/helper/nbstore/index.ts create mode 100644 packages/frontend/apps/electron/src/helper/nbstore/storage.ts create mode 100644 packages/frontend/apps/electron/src/helper/nbstore/sync.ts rename packages/frontend/apps/electron/src/helper/{db => nbstore/v1}/db-adapter.ts (99%) rename packages/frontend/apps/electron/src/helper/{db => nbstore/v1}/ensure-db.ts (92%) rename packages/frontend/apps/electron/src/helper/{db => nbstore/v1}/index.ts (96%) rename packages/frontend/apps/electron/src/helper/{db => nbstore/v1}/merge-update.ts (100%) rename packages/frontend/apps/electron/src/helper/{db => nbstore/v1}/workspace-db-adapter.ts (96%) delete mode 100644 packages/frontend/native/.env delete mode 100644 packages/frontend/native/event.d.ts create mode 100644 packages/frontend/native/nbstore/Cargo.toml create mode 100644 packages/frontend/native/nbstore/build.rs create mode 100644 packages/frontend/native/nbstore/src/blob.rs create mode 100644 packages/frontend/native/nbstore/src/doc.rs create mode 100644 packages/frontend/native/nbstore/src/lib.rs create mode 100644 packages/frontend/native/nbstore/src/storage.rs create mode 100644 packages/frontend/native/nbstore/src/sync.rs create mode 100644 packages/frontend/native/schema/src/v1.rs create mode 100644 packages/frontend/native/sqlite_v1/Cargo.toml create mode 100644 packages/frontend/native/sqlite_v1/build.rs rename packages/frontend/native/{src/sqlite/mod.rs => sqlite_v1/src/lib.rs} (99%) diff --git a/.cargo/config.toml b/.cargo/config.toml index b9fe725d03..84683d84e3 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -2,3 +2,7 @@ rustflags = ["-C", "target-feature=+crt-static"] [target.aarch64-pc-windows-msvc] rustflags = ["-C", "target-feature=+crt-static"] +[target.'cfg(target_os = "linux")'] +rustflags = ["-C", "link-args=-Wl,--warn-unresolved-symbols"] +[target.'cfg(target_os = "macos")'] +rustflags = ["-C", "link-args=-all_load"] diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 4c999c55b0..1b33441921 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -354,11 +354,10 @@ jobs: name: affine fail_ci_if_error: false - server-native-test: - name: Run server native tests + rust-test: + name: Run native tests runs-on: ubuntu-latest env: - RUSTFLAGS: -D warnings CARGO_TERM_COLOR: always steps: - uses: actions/checkout@v4 @@ -656,7 
+655,7 @@ jobs: uses: ./.github/actions/setup-node timeout-minutes: 10 with: - extra-flags: workspaces focus @affine/electron @affine/monorepo @affine-test/affine-desktop + extra-flags: workspaces focus @affine/electron @affine/monorepo @affine-test/affine-desktop @affine/nbstore @toeverything/infra playwright-install: true hard-link-nm: false enableScripts: false @@ -756,7 +755,7 @@ jobs: - build-server-native - build-electron-renderer - server-test - - server-native-test + - rust-test - copilot-api-test - copilot-e2e-test - server-e2e-test diff --git a/.github/workflows/release-desktop.yml b/.github/workflows/release-desktop.yml index d9533732c5..c62cb8678e 100644 --- a/.github/workflows/release-desktop.yml +++ b/.github/workflows/release-desktop.yml @@ -108,7 +108,7 @@ jobs: timeout-minutes: 10 uses: ./.github/actions/setup-node with: - extra-flags: workspaces focus @affine/electron @affine/monorepo + extra-flags: workspaces focus @affine/electron @affine/monorepo @affine/nbstore @toeverything/infra hard-link-nm: false nmHoistingLimits: workspaces enableScripts: false @@ -225,7 +225,7 @@ jobs: timeout-minutes: 10 uses: ./.github/actions/setup-node with: - extra-flags: workspaces focus @affine/electron @affine/monorepo + extra-flags: workspaces focus @affine/electron @affine/monorepo @affine/nbstore @toeverything/infra hard-link-nm: false nmHoistingLimits: workspaces - name: Build AFFiNE native diff --git a/Cargo.lock b/Cargo.lock index 730489ad32..1a3df94c2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,6 +22,7 @@ name = "affine_common" version = "0.1.0" dependencies = [ "chrono", + "criterion2", "rand", "rayon", "sha3", @@ -40,30 +41,37 @@ name = "affine_native" version = "0.0.0" dependencies = [ "affine_common", - "affine_schema", - "anyhow", - "chrono", - "criterion2", - "dotenv", + "affine_nbstore", + "affine_sqlite_v1", "napi", "napi-build", "napi-derive", - "notify", "once_cell", - "parking_lot", - "rand", - "rayon", - "serde", - "serde_json", - "sha3", "sqlx", "tokio", - "uuid", +] + +[[package]] +name = "affine_nbstore" +version = "0.0.0" +dependencies = [ + "affine_schema", + "anyhow", + "chrono", + "dotenvy", + "napi", + "napi-build", + "napi-derive", + "sqlx", + "tokio", ] [[package]] name = "affine_schema" version = "0.0.0" +dependencies = [ + "sqlx", +] [[package]] name = "affine_server_native" @@ -85,6 +93,21 @@ dependencies = [ "y-octo", ] +[[package]] +name = "affine_sqlite_v1" +version = "0.0.0" +dependencies = [ + "affine_schema", + "anyhow", + "chrono", + "dotenvy", + "napi", + "napi-build", + "napi-derive", + "sqlx", + "tokio", +] + [[package]] name = "ahash" version = "0.8.11" @@ -320,12 +343,6 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.6.0" @@ -721,12 +738,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dotenv" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" - [[package]] name = "dotenvy" version = "0.15.7" @@ -803,18 +814,6 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7ef3d5e8ae27277c8285ac43ed153158178ef0f79567f32024ca8140a0c7cd8" 
-[[package]] -name = "filetime" -version = "0.2.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" -dependencies = [ - "cfg-if", - "libc", - "libredox", - "windows-sys 0.59.0", -] - [[package]] name = "flume" version = "0.11.1" @@ -844,15 +843,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "fsevent-sys" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" -dependencies = [ - "libc", -] - [[package]] name = "funty" version = "2.0.0" @@ -1234,35 +1224,6 @@ dependencies = [ "hashbrown 0.15.2", ] -[[package]] -name = "inotify" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc" -dependencies = [ - "bitflags 1.3.2", - "inotify-sys", - "libc", -] - -[[package]] -name = "inotify-sys" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" -dependencies = [ - "libc", -] - -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1294,26 +1255,6 @@ dependencies = [ "cpufeatures", ] -[[package]] -name = "kqueue" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" -dependencies = [ - "kqueue-sys", - "libc", -] - -[[package]] -name = "kqueue-sys" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" -dependencies = [ - "bitflags 1.3.2", - "libc", -] - [[package]] name = "lasso" version = "0.7.3" @@ -1365,17 +1306,6 @@ dependencies = [ "libc", ] -[[package]] -name = "libredox" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" -dependencies = [ - "bitflags 2.6.0", - "libc", - "redox_syscall", -] - [[package]] name = "libsqlite3-sys" version = "0.30.1" @@ -1502,7 +1432,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "log", "wasi", "windows-sys 0.52.0", ] @@ -1523,7 +1452,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4929caab512f6e9650b53d27b4076f3e0524a1369e5d4ab25965fcc60b31cad" dependencies = [ "anyhow", - "bitflags 2.6.0", + "bitflags", "chrono", "ctor", "napi-build", @@ -1583,35 +1512,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "notify" -version = "7.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c533b4c39709f9ba5005d8002048266593c1cfaf3c5f0739d5b8ab0c6c504009" -dependencies = [ - "bitflags 2.6.0", - "filetime", - "fsevent-sys", - "inotify", - "kqueue", - "libc", - "log", - "mio", - "notify-types", - "walkdir", - "windows-sys 0.52.0", -] - -[[package]] -name = "notify-types" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"7393c226621f817964ffb3dc5704f9509e107a8b024b489cc2c1b217378785df" -dependencies = [ - "instant", - "serde", -] - [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1900,7 +1800,7 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ - "bitflags 2.6.0", + "bitflags", ] [[package]] @@ -2000,7 +1900,7 @@ version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ - "bitflags 2.6.0", + "bitflags", "errno", "libc", "linux-raw-sys", @@ -2400,7 +2300,7 @@ checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" dependencies = [ "atoi", "base64 0.22.1", - "bitflags 2.6.0", + "bitflags", "byteorder", "bytes", "chrono", @@ -2443,7 +2343,7 @@ checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" dependencies = [ "atoi", "base64 0.22.1", - "bitflags 2.6.0", + "bitflags", "byteorder", "chrono", "crc", @@ -2981,17 +2881,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" -[[package]] -name = "uuid" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" -dependencies = [ - "getrandom", - "rand", - "serde", -] - [[package]] name = "v_htmlescape" version = "0.15.8" diff --git a/Cargo.toml b/Cargo.toml index 1d53c6d655..1823271767 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,8 @@ members = [ "./packages/backend/native", "./packages/common/native", "./packages/frontend/native", + "./packages/frontend/native/sqlite_v1", + "./packages/frontend/native/nbstore", "./packages/frontend/native/schema", "./packages/frontend/mobile-native", ] @@ -13,7 +15,7 @@ affine_common = { path = "./packages/common/native" } criterion2 = { version = "2", default-features = false } anyhow = "1" chrono = "0.4" -dotenv = "0.15" +dotenvy = "0.15" file-format = { version = "0.26", features = ["reader"] } mimalloc = "0.1" napi = { version = "3.0.0-alpha.12", features = ["async", "chrono_date", "error_anyhow", "napi9", "serde"] } diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index 8b7d066119..78ed4cd337 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -34,36 +34,18 @@ "devDependencies": { "@affine-test/fixtures": "workspace:*", "@affine/templates": "workspace:*", + "@emotion/react": "^11.14.0", "@swc/core": "^1.0.0", "@testing-library/dom": "^10.0.0", "@testing-library/react": "^16.1.0", + "@types/react": "^19.0.1", "fake-indexeddb": "^6.0.0", - "react": "^19.0.0", "rxjs": "^7.8.1", "vitest": "2.1.8" }, "peerDependencies": { - "@affine/templates": "*", - "@swc/core": "^1.0.0", - "@testing-library/dom": ">=7.0.0", "electron": "*", - "react": "^19.0.0", - "react-dom": "^19.0.0", - "yjs": "^13" - }, - "peerDependenciesMeta": { - "@affine/templates": { - "optional": true - }, - "electron": { - "optional": true - }, - "react": { - "optional": true - }, - "yjs": { - "optional": true - } + "react-dom": "^19.0.0" }, "version": "0.18.0" } diff --git a/packages/common/native/Cargo.toml b/packages/common/native/Cargo.toml index 88b585d2c2..7693d7e486 100644 --- a/packages/common/native/Cargo.toml +++ b/packages/common/native/Cargo.toml @@ -10,3 
+10,8 @@ sha3 = { workspace = true }
 
 [dev-dependencies]
 rayon = { workspace = true }
+criterion2 = { workspace = true }
+
+[[bench]]
+name = "hashcash"
+harness = false
diff --git a/packages/frontend/native/benches/hashcash.rs b/packages/common/native/benches/hashcash.rs
similarity index 95%
rename from packages/frontend/native/benches/hashcash.rs
rename to packages/common/native/benches/hashcash.rs
index 0fd4ded6d2..974cf4f829 100644
--- a/packages/frontend/native/benches/hashcash.rs
+++ b/packages/common/native/benches/hashcash.rs
@@ -2,7 +2,7 @@ use std::hint::black_box;
 
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
 
-use affine_native::hashcash::Stamp;
+use affine_common::hashcash::Stamp;
 
 fn bench_hashcash(c: &mut Criterion) {
   let mut group = c.benchmark_group("hashcash");
diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json
index ebabff02ad..87f84edba0 100644
--- a/packages/common/nbstore/package.json
+++ b/packages/common/nbstore/package.json
@@ -9,7 +9,9 @@
     "./op": "./src/op/index.ts",
     "./idb": "./src/impls/idb/index.ts",
     "./idb/v1": "./src/impls/idb/v1/index.ts",
-    "./cloud": "./src/impls/cloud/index.ts"
+    "./cloud": "./src/impls/cloud/index.ts",
+    "./sqlite": "./src/impls/sqlite/index.ts",
+    "./sqlite/v1": "./src/impls/sqlite/v1/index.ts"
   },
   "dependencies": {
     "@datastructures-js/binary-search-tree": "^5.3.2",
@@ -21,6 +23,7 @@
     "yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
   },
   "devDependencies": {
+    "@affine/electron-api": "workspace:*",
     "@affine/graphql": "workspace:*",
     "fake-indexeddb": "^6.0.0",
     "idb": "^8.0.0",
@@ -28,6 +31,7 @@
     "vitest": "2.1.8"
   },
   "peerDependencies": {
+    "@affine/electron-api": "workspace:*",
     "@affine/graphql": "workspace:*",
     "idb": "^8.0.0",
     "socket.io-client": "^4.7.5"
diff --git a/packages/common/nbstore/src/impls/sqlite/blob.ts b/packages/common/nbstore/src/impls/sqlite/blob.ts
new file mode 100644
index 0000000000..803f433fa5
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/blob.ts
@@ -0,0 +1,33 @@
+import { share } from '../../connection';
+import { type BlobRecord, BlobStorage } from '../../storage';
+import { NativeDBConnection } from './db';
+
+export class SqliteBlobStorage extends BlobStorage {
+  override connection = share(
+    new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
+  );
+
+  get db() {
+    return this.connection.apis;
+  }
+
+  override async get(key: string) {
+    return this.db.getBlob(key);
+  }
+
+  override async set(blob: BlobRecord) {
+    await this.db.setBlob(blob);
+  }
+
+  override async delete(key: string, permanently: boolean) {
+    await this.db.deleteBlob(key, permanently);
+  }
+
+  override async release() {
+    await this.db.releaseBlobs();
+  }
+
+  override async list() {
+    return this.db.listBlobs();
+  }
+}
diff --git a/packages/common/nbstore/src/impls/sqlite/db.ts b/packages/common/nbstore/src/impls/sqlite/db.ts
new file mode 100644
index 0000000000..f1f4d4fc15
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/db.ts
@@ -0,0 +1,83 @@
+import { apis, events } from '@affine/electron-api';
+
+import { Connection, type ConnectionStatus } from '../../connection';
+import { type SpaceType, universalId } from '../../storage';
+
+type NativeDBApis = NonNullable<typeof apis>['nbstore'] extends infer APIs
+  ? {
+      [K in keyof APIs]: APIs[K] extends (...args: any[]) => any
+        ? Parameters<APIs[K]> extends [string, ...infer Rest]
+          ? (...args: Rest) => ReturnType<APIs[K]>
+          : never
+        : never;
+    }
+  : never;
+
+export class NativeDBConnection extends Connection {
+  readonly apis: NativeDBApis;
+
+  constructor(
+    private readonly peer: string,
+    private readonly type: SpaceType,
+    private readonly id: string
+  ) {
+    super();
+    if (!apis) {
+      throw new Error('Not in electron context.');
+    }
+
+    this.apis = this.bindApis(apis.nbstore);
+    this.listenToConnectionEvents();
+  }
+
+  override get shareId(): string {
+    return `sqlite:${this.peer}:${this.type}:${this.id}`;
+  }
+
+  bindApis(originalApis: NonNullable<typeof apis>['nbstore']): NativeDBApis {
+    const id = universalId({
+      peer: this.peer,
+      type: this.type,
+      id: this.id,
+    });
+    return new Proxy(originalApis, {
+      get: (target, key: keyof NativeDBApis) => {
+        const v = target[key];
+        if (typeof v !== 'function') {
+          return v;
+        }
+
+        return async (...args: any[]) => {
+          return v.call(
+            originalApis,
+            id,
+            // @ts-expect-error I don't know why it complains ts(2556)
+            ...args
+          );
+        };
+      },
+    }) as unknown as NativeDBApis;
+  }
+
+  override async doConnect() {
+    await this.apis.connect();
+  }
+
+  override async doDisconnect() {
+    await this.apis.close();
+  }
+
+  private listenToConnectionEvents() {
+    events?.nbstore.onConnectionStatusChanged(
+      ({ peer, spaceType, spaceId, status, error }) => {
+        if (
+          peer === this.peer &&
+          spaceType === this.type &&
+          spaceId === this.id
+        ) {
+          this.setStatus(status as ConnectionStatus, error);
+        }
+      }
+    );
+  }
+}
diff --git a/packages/common/nbstore/src/impls/sqlite/doc.ts b/packages/common/nbstore/src/impls/sqlite/doc.ts
new file mode 100644
index 0000000000..3147130e63
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/doc.ts
@@ -0,0 +1,54 @@
+import { share } from '../../connection';
+import { type DocClock, DocStorage, type DocUpdate } from '../../storage';
+import { NativeDBConnection } from './db';
+
+export class SqliteDocStorage extends DocStorage {
+  override connection = share(
+    new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
+  );
+
+  get db() {
+    return this.connection.apis;
+  }
+
+  override async getDoc(docId: string) {
+    return this.db.getDoc(docId);
+  }
+
+  override async pushDocUpdate(update: DocUpdate) {
+    return this.db.pushDocUpdate(update);
+  }
+
+  override async deleteDoc(docId: string) {
+    return this.db.deleteDoc(docId);
+  }
+
+  override async getDocTimestamps(after?: Date) {
+    return this.db.getDocTimestamps(after ? new Date(after) : undefined);
+  }
+
+  override getDocTimestamp(docId: string): Promise<DocClock | null> {
+    return this.db.getDocTimestamp(docId);
+  }
+
+  protected override async getDocSnapshot() {
+    // handled in db
+    // see electron/src/helper/nbstore/doc.ts
+    return null;
+  }
+
+  protected override async setDocSnapshot(): Promise<boolean> {
+    // handled in db
+    return true;
+  }
+
+  protected override async getDocUpdates() {
+    // handled in db
+    return [];
+  }
+
+  protected override markUpdatesMerged() {
+    // handled in db
+    return Promise.resolve(0);
+  }
+}
diff --git a/packages/common/nbstore/src/impls/sqlite/index.ts b/packages/common/nbstore/src/impls/sqlite/index.ts
new file mode 100644
index 0000000000..debe733b43
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/index.ts
@@ -0,0 +1,3 @@
+export * from './blob';
+export * from './doc';
+export * from './sync';
diff --git a/packages/common/nbstore/src/impls/sqlite/sync.ts b/packages/common/nbstore/src/impls/sqlite/sync.ts
new file mode 100644
index 0000000000..26da3f6377
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/sync.ts
@@ -0,0 +1,53 @@
+import { share } from '../../connection';
+import { type DocClock, SyncStorage } from '../../storage';
+import { NativeDBConnection } from './db';
+
+export class SqliteSyncStorage extends SyncStorage {
+  override connection = share(
+    new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
+  );
+
+  get db() {
+    return this.connection.apis;
+  }
+
+  override async getPeerRemoteClocks(peer: string) {
+    return this.db.getPeerRemoteClocks(peer);
+  }
+
+  override async getPeerRemoteClock(peer: string, docId: string) {
+    return this.db.getPeerRemoteClock(peer, docId);
+  }
+
+  override async setPeerRemoteClock(peer: string, clock: DocClock) {
+    await this.db.setPeerRemoteClock(peer, clock);
+  }
+
+  override async getPeerPulledRemoteClocks(peer: string) {
+    return this.db.getPeerPulledRemoteClocks(peer);
+  }
+
+  override async getPeerPulledRemoteClock(peer: string, docId: string) {
+    return this.db.getPeerPulledRemoteClock(peer, docId);
+  }
+
+  override async setPeerPulledRemoteClock(peer: string, clock: DocClock) {
+    await this.db.setPeerPulledRemoteClock(peer, clock);
+  }
+
+  override async getPeerPushedClocks(peer: string) {
+    return this.db.getPeerPushedClocks(peer);
+  }
+
+  override async getPeerPushedClock(peer: string, docId: string) {
+    return this.db.getPeerPushedClock(peer, docId);
+  }
+
+  override async setPeerPushedClock(peer: string, clock: DocClock) {
+    await this.db.setPeerPushedClock(peer, clock);
+  }
+
+  override async clearClocks() {
+    await this.db.clearClocks();
+  }
+}
diff --git a/packages/common/nbstore/src/impls/sqlite/v1/blob.ts b/packages/common/nbstore/src/impls/sqlite/v1/blob.ts
new file mode 100644
index 0000000000..d01ab58198
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/v1/blob.ts
@@ -0,0 +1,62 @@
+import { apis } from '@affine/electron-api';
+
+import { DummyConnection, share } from '../../../connection';
+import { BlobStorage } from '../../../storage';
+
+/**
+ * @deprecated readonly
+ */
+export class SqliteV1BlobStorage extends BlobStorage {
+  override connection = share(new DummyConnection());
+
+  get db() {
+    if (!apis) {
+      throw new Error('Not in electron context.');
+    }
+
+    return apis.db;
+  }
+
+  override async get(key: string) {
+    const data: Uint8Array | null = await this.db.getBlob(
+      this.spaceType,
+      this.spaceId,
+      key
+    );
+
+    if (!data) {
+      return null;
+    }
+
+    return {
+      key,
+      data,
+      mime: '',
+      createdAt: new Date(),
+    };
+  }
+
+  override async delete(key: string, permanently: boolean) {
+    if (permanently) {
+      await this.db.deleteBlob(this.spaceType, this.spaceId, key);
+    }
+  }
+
+  override async list() {
+    const keys = await this.db.getBlobKeys(this.spaceType, this.spaceId);
+
+    return keys.map(key => ({
+      key,
+      mime: '',
+      size: 0,
+      createdAt: new Date(),
+    }));
+  }
+
+  override async set() {
+    // no more writes
+  }
+  override async release() {
+    // no more writes
+  }
+}
diff --git a/packages/common/nbstore/src/impls/sqlite/v1/doc.ts b/packages/common/nbstore/src/impls/sqlite/v1/doc.ts
new file mode 100644
index 0000000000..085a76ce41
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/v1/doc.ts
@@ -0,0 +1,67 @@
+import { apis } from '@affine/electron-api';
+
+import { DummyConnection, share } from '../../../connection';
+import { type DocRecord, DocStorage, type DocUpdate } from '../../../storage';
+
+/**
+ * @deprecated readonly
+ */
+export class SqliteV1DocStorage extends DocStorage {
+  override connection = share(new DummyConnection());
+
+  get db() {
+    if (!apis) {
+      throw new Error('Not in electron context.');
+    }
+
+    return apis.db;
+  }
+
+  override async pushDocUpdate(update: DocUpdate) {
+    // no more writes
+
+    return { docId: update.docId, timestamp: new Date() };
+  }
+
+  override async getDoc(docId: string) {
+    const bin = await this.db.getDocAsUpdates(
+      this.spaceType,
+      this.spaceId,
+      docId
+    );
+
+    return {
+      docId,
+      bin,
+      timestamp: new Date(),
+    };
+  }
+
+  override async deleteDoc(docId: string) {
+    await this.db.deleteDoc(this.spaceType, this.spaceId, docId);
+  }
+
+  protected override async getDocSnapshot() {
+    return null;
+  }
+
+  override async getDocTimestamps() {
+    return {};
+  }
+
+  override async getDocTimestamp() {
+    return null;
+  }
+
+  protected override async setDocSnapshot(): Promise<boolean> {
+    return false;
+  }
+
+  protected override async getDocUpdates(): Promise<DocRecord[]> {
+    return [];
+  }
+
+  protected override async markUpdatesMerged(): Promise<number> {
+    return 0;
+  }
+}
diff --git a/packages/common/nbstore/src/impls/sqlite/v1/index.ts b/packages/common/nbstore/src/impls/sqlite/v1/index.ts
new file mode 100644
index 0000000000..d476ae6eb9
--- /dev/null
+++ b/packages/common/nbstore/src/impls/sqlite/v1/index.ts
@@ -0,0 +1,2 @@
+export * from './blob';
+export * from './doc';
diff --git a/packages/common/nbstore/src/op/index.ts b/packages/common/nbstore/src/op/index.ts
index bd2cc616b9..a6b6ffd1fe 100644
--- a/packages/common/nbstore/src/op/index.ts
+++ b/packages/common/nbstore/src/op/index.ts
@@ -3,6 +3,8 @@ import { OpClient } from '@toeverything/infra/op';
 import type { Storage } from '../storage';
 import type { SpaceStorageOps } from './ops';
 
+export { SpaceStorageConsumer } from './consumer';
+
 export class SpaceStorageClient extends OpClient<SpaceStorageOps> {
   /**
    * Adding a storage implementation to the backend.
diff --git a/packages/common/nbstore/src/storage/__tests__/__snapshots__/storage.spec.ts.snap b/packages/common/nbstore/src/storage/__tests__/__snapshots__/storage.spec.ts.snap
new file mode 100644
index 0000000000..fb408a77cc
--- /dev/null
+++ b/packages/common/nbstore/src/storage/__tests__/__snapshots__/storage.spec.ts.snap
@@ -0,0 +1,41 @@
+// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
+
+exports[`parseUniversalId > should parse universal id > @peer(@name);@type(userspace);@id(@id); 1`] = `
+{
+  "id": "@id",
+  "peer": "@name",
+  "type": "userspace",
+}
+`;
+
+exports[`parseUniversalId > should parse universal id > @peer(@peer(name);@type(userspace);@id(@id); 1`] = `
+{
+  "id": "@id",
+  "peer": "@peer(name",
+  "type": "userspace",
+}
+`;
+
+exports[`parseUniversalId > should parse universal id > @peer(123);@type(userspace);@id(456); 1`] = `
+{
+  "id": "456",
+  "peer": "123",
+  "type": "userspace",
+}
+`;
+
+exports[`parseUniversalId > should parse universal id > @peer(123);@type(workspace);@id(456); 1`] = `
+{
+  "id": "456",
+  "peer": "123",
+  "type": "workspace",
+}
+`;
+
+exports[`parseUniversalId > should parse universal id > @peer(https://app.affine.pro);@type(userspace);@id(hello:world); 1`] = `
+{
+  "id": "hello:world",
+  "peer": "https://app.affine.pro",
+  "type": "userspace",
+}
+`;
diff --git a/packages/common/nbstore/src/storage/__tests__/storage.spec.ts b/packages/common/nbstore/src/storage/__tests__/storage.spec.ts
new file mode 100644
index 0000000000..8787fcaaa5
--- /dev/null
+++ b/packages/common/nbstore/src/storage/__tests__/storage.spec.ts
@@ -0,0 +1,36 @@
+import { describe, expect, it } from 'vitest';
+
+import { parseUniversalId, universalId } from '../storage';
+
+describe('parseUniversalId', () => {
+  it('should generate universal id', () => {
+    expect(universalId({ peer: '123', type: 'workspace', id: '456' })).toEqual(
+      '@peer(123);@type(workspace);@id(456);'
+    );
+  });
+
+  it('should parse universal id', () => {
+    const testcases = [
+      '@peer(123);@type(userspace);@id(456);',
+      '@peer(123);@type(workspace);@id(456);',
+      '@peer(https://app.affine.pro);@type(userspace);@id(hello:world);',
+      '@peer(@name);@type(userspace);@id(@id);',
+      '@peer(@peer(name);@type(userspace);@id(@id);',
+    ];
+
+    testcases.forEach(id => {
+      expect(parseUniversalId(id)).toMatchSnapshot(id);
+    });
+  });
+
+  it('should throw invalid universal id', () => {
+    const testcases = [
+      '@peer(123);@type(anyspace);@id(456);', // invalid space type
+      '@peer(@peer(name););@type(userspace);@id(@id);', // invalid peer
+    ];
+
+    testcases.forEach(id => {
+      expect(() => parseUniversalId(id)).toThrow();
+    });
+  });
+});
diff --git a/packages/common/nbstore/src/storage/index.ts b/packages/common/nbstore/src/storage/index.ts
index 33d13fcd28..d9e89e81fb 100644
--- a/packages/common/nbstore/src/storage/index.ts
+++ b/packages/common/nbstore/src/storage/index.ts
@@ -6,6 +6,8 @@ import type { BlobStorage } from './blob';
 import type { DocStorage } from './doc';
 import type { SyncStorage } from './sync';
 
+type Storages = DocStorage | BlobStorage | SyncStorage;
+
 export class SpaceStorage {
   protected readonly storages: Map<StorageType, Storage> = new Map();
   private readonly event = new EventEmitter2();
@@ -17,24 +19,20 @@
     );
   }
 
-  tryGet(type: 'blob'): BlobStorage | undefined;
-  tryGet(type: 'sync'): SyncStorage | undefined;
-  tryGet(type: 'doc'): DocStorage | undefined;
-  tryGet(type: StorageType) {
-    return this.storages.get(type);
+  tryGet<T extends StorageType>(
+    type: T
+  ): Extract<Storages, { storageType: T }> | undefined {
+    return this.storages.get(type) as Extract<Storages, { storageType: T }>;
   }
 
-  get(type: 'blob'): BlobStorage;
-  get(type: 'sync'): SyncStorage;
-  get(type: 'doc'): DocStorage;
-  get(type: StorageType) {
-    const storage = this.storages.get(type);
+  get<T extends StorageType>(type: T): Extract<Storages, { storageType: T }> {
+    const storage = this.tryGet(type);
 
     if (!storage) {
       throw new Error(`Storage ${type} not registered.`);
     }
 
-    return storage;
+    return storage as Extract<Storages, { storageType: T }>;
   }
 
   async connect() {
diff --git a/packages/common/nbstore/src/storage/storage.ts b/packages/common/nbstore/src/storage/storage.ts
index bbf6182d58..32e8a762ad 100644
--- a/packages/common/nbstore/src/storage/storage.ts
+++ b/packages/common/nbstore/src/storage/storage.ts
@@ -9,6 +9,77 @@ export interface StorageOptions {
   id: string;
 }
 
+export function universalId({ peer, type, id }: StorageOptions) {
+  return `@peer(${peer});@type(${type});@id(${id});`;
+}
+
+export function isValidUniversalId(opts: Record<string, string>): boolean {
+  const requiredKeys: Array<keyof StorageOptions> = [
+    'peer',
+    'type',
+    'id',
+  ] as const;
+
+  for (const key of requiredKeys) {
+    if (!opts[key]) {
+      return false;
+    }
+  }
+
+  return opts.type === 'userspace' || opts.type === 'workspace';
+}
+
+export function parseUniversalId(id: string) {
+  const result: Record<string, string> = {};
+  let key = '';
+  let value = '';
+  let isInValue = false;
+
+  let i = -1;
+
+  while (++i < id.length) {
+    const ch = id[i];
+    const nextCh = id[i + 1];
+
+    // inside a value string, we only care whether ch and the next char are
+    // ')' and ';', which together end the current id part
+    if (isInValue) {
+      if (ch === ')' && nextCh === ';') {
+        result[key] = value;
+        key = '';
+        value = '';
+        isInValue = false;
+        i++;
+        continue;
+      }
+
+      value += ch;
+      continue;
+    }
+
+    if (ch === '@') {
+      const keyEnd = id.indexOf('(', i);
+      // we found '@' but no '(' ahead, or '(' immediately follows '@': invalid id
+      if (keyEnd === -1 || keyEnd === i + 1) {
+        break;
+      }
+
+      key = id.slice(i + 1, keyEnd);
+      i = keyEnd;
+      isInValue = true;
+    } else {
+      break;
+    }
+  }
+
+  if (!isValidUniversalId(result)) {
+    throw new Error(
+      `Invalid universal storage id: ${id}. It should be in format of @peer(\${peer});@type(\${type});@id(\${id});`
+    );
+  }
+
+  return result as any;
+}
+
 export abstract class Storage<Opts extends StorageOptions = StorageOptions> {
   abstract readonly storageType: StorageType;
   abstract readonly connection: Connection;
@@ -25,6 +96,10 @@
     return this.options.id;
   }
 
+  get universalId() {
+    return universalId(this.options);
+  }
+
   constructor(public readonly options: Opts) {}
 
   async connect() {
diff --git a/packages/frontend/apps/electron/package.json b/packages/frontend/apps/electron/package.json
index 62c749dd8f..0c6ac58b31 100644
--- a/packages/frontend/apps/electron/package.json
+++ b/packages/frontend/apps/electron/package.json
@@ -28,6 +28,7 @@
     "@affine/core": "workspace:*",
     "@affine/i18n": "workspace:*",
     "@affine/native": "workspace:*",
+    "@affine/nbstore": "workspace:*",
     "@blocksuite/affine": "0.18.7",
     "@electron-forge/cli": "^7.3.0",
     "@electron-forge/core": "^7.3.0",
diff --git a/packages/frontend/apps/electron/src/helper/db/types.ts b/packages/frontend/apps/electron/src/helper/db/types.ts
deleted file mode 100644
index 842b92bc10..0000000000
--- a/packages/frontend/apps/electron/src/helper/db/types.ts
+++ /dev/null
@@ -1 +0,0 @@
-export type SpaceType = 'userspace' | 'workspace';
diff --git a/packages/frontend/apps/electron/src/helper/dialog/dialog.ts b/packages/frontend/apps/electron/src/helper/dialog/dialog.ts
index ff8e385e20..db790aa18a 100644
--- a/packages/frontend/apps/electron/src/helper/dialog/dialog.ts
+++ b/packages/frontend/apps/electron/src/helper/dialog/dialog.ts
@@ -2,9 +2,9 @@ import { ValidationResult } from '@affine/native';
 import fs from 'fs-extra';
 import { nanoid } from 'nanoid';
 
-import { ensureSQLiteDB } from '../db/ensure-db';
 import { logger } from '../logger';
 import { mainRPC } from '../main-rpc';
+import { ensureSQLiteDB } from '../nbstore/v1';
 import { storeWorkspaceMeta } from '../workspace';
 import { getWorkspaceDBPath, getWorkspacesBasePath } from '../workspace/meta';
 
@@ -69,14 +69,20 @@ function getDefaultDBFileName(name: string, id: string) {
  *
  * It will just copy the file to the given path
  */
-export async function saveDBFileAs(
-  workspaceId: string
-): Promise<SaveDBFileResult> {
+export async function saveDBFileAs(id: string): Promise<SaveDBFileResult> {
   try {
-    const db = await ensureSQLiteDB('workspace', workspaceId);
-    await db.checkpoint(); // make sure all changes (WAL) are written to db
-    const fakedResult = getFakedResult();
+    // TODO(@forehalo): use `nbstore` when it is ready
+    // const storage = await ensureStorage(id);
+    const storage = await ensureSQLiteDB('workspace', id);
+    await storage.checkpoint(); // make sure all changes (WAL) are written to db
+    const fakedResult = getFakedResult();
+    const dbPath = storage.path;
+    if (!dbPath) {
+      return {
+        error: 'DB_FILE_PATH_INVALID',
+      };
+    }
 
     const ret =
       fakedResult ??
       (await mainRPC.showSaveDialog({
@@ -91,8 +97,8 @@
           },
         ],
         defaultPath: getDefaultDBFileName(
-          await db.getWorkspaceName(),
-          workspaceId
+          (await storage.getWorkspaceName()) ?? 'db',
+          id
        ),
        message: 'Save Workspace as a SQLite Database file',
      }));
@@ -103,7 +109,7 @@
      };
    }
 
-    await fs.copyFile(db.path, filePath);
+    await fs.copyFile(dbPath, filePath);
     logger.log('saved', filePath);
     if (!fakedResult) {
       mainRPC.showItemInFolder(filePath).catch(err => {
@@ -188,28 +194,35 @@
       return { error: 'DB_FILE_PATH_INVALID' };
     }
 
-    const { SqliteConnection } = await import('@affine/native');
-
-    const validationResult = await SqliteConnection.validate(originalPath);
-
-    if (validationResult !== ValidationResult.Valid) {
-      return { error: 'DB_FILE_INVALID' }; // invalid db file
-    }
-
-    // copy the db file to a new workspace id
     const workspaceId = nanoid(10);
-    const internalFilePath = await getWorkspaceDBPath('workspace', workspaceId);
+    return loadV1DBFile(originalPath, workspaceId);
 
-    await fs.ensureDir(await getWorkspacesBasePath());
-    await fs.copy(originalPath, internalFilePath);
-    logger.info(`loadDBFile, copy: ${originalPath} -> ${internalFilePath}`);
+    // TODO(forehalo): use `nbstore` when it is ready
+    // let storage = new DocStorage(originalPath);
 
-    await storeWorkspaceMeta(workspaceId, {
-      id: workspaceId,
-      mainDBPath: internalFilePath,
-    });
+    // // if imported db is not a valid v2 db, we will treat it as a v1 db
+    // if (!(await storage.validate())) {
+    //   return loadV1DBFile(originalPath, workspaceId);
+    // }
 
-    return { workspaceId };
+    // // v2 import logic
+    // const internalFilePath = await getSpaceDBPath(
+    //   'local',
+    //   'workspace',
+    //   workspaceId
+    // );
+    // await fs.ensureDir(await getWorkspacesBasePath());
+    // await fs.copy(originalPath, internalFilePath);
+    // logger.info(`loadDBFile, copy: ${originalPath} -> ${internalFilePath}`);

+    // storage = new DocStorage(internalFilePath);
+    // await storage.connect();
+    // await storage.setSpaceId(workspaceId);
+    // await storage.close();
+
+    // return {
+    //   workspaceId,
+    // };
   } catch (err) {
     logger.error('loadDBFile', err);
     return {
@@ -217,3 +230,31 @@
     };
   }
 }
+
+async function loadV1DBFile(
+  originalPath: string,
+  workspaceId: string
+): Promise<LoadDBFileResult> {
+  const { SqliteConnection } = await import('@affine/native');
+
+  const validationResult = await SqliteConnection.validate(originalPath);
+
+  if (validationResult !== ValidationResult.Valid) {
+    return { error: 'DB_FILE_INVALID' }; // invalid db file
+  }
+
+  const internalFilePath = await getWorkspaceDBPath('workspace', workspaceId);
+
+  await fs.ensureDir(await getWorkspacesBasePath());
+  await fs.copy(originalPath, internalFilePath);
+  logger.info(`loadDBFile, copy: ${originalPath} -> ${internalFilePath}`);
+
+  await storeWorkspaceMeta(workspaceId, {
+    id: workspaceId,
+    mainDBPath: internalFilePath,
+  });
+
+  return {
+    workspaceId,
+  };
+}
diff --git a/packages/frontend/apps/electron/src/helper/dialog/index.ts b/packages/frontend/apps/electron/src/helper/dialog/index.ts
index 9c773544b6..8516cb3d17 100644
--- a/packages/frontend/apps/electron/src/helper/dialog/index.ts
+++ b/packages/frontend/apps/electron/src/helper/dialog/index.ts
@@ -9,8 +9,8 @@
   loadDBFile: async () => {
     return loadDBFile();
   },
-  saveDBFileAs: async (workspaceId: string) => {
-    return saveDBFileAs(workspaceId);
+  saveDBFileAs: async (id: string) => {
+    return saveDBFileAs(id);
   },
   selectDBFileLocation: async () => {
     return selectDBFileLocation();
diff --git a/packages/frontend/apps/electron/src/helper/exposed.ts b/packages/frontend/apps/electron/src/helper/exposed.ts
index e3adb3d465..effa818c29 100644
--- a/packages/frontend/apps/electron/src/helper/exposed.ts
+++ b/packages/frontend/apps/electron/src/helper/exposed.ts
@@ -1,17 +1,24 @@
-import { dbEvents, dbHandlers } from './db';
 import { dialogHandlers } from './dialog';
+import {
+  dbEventsV1,
+  dbHandlersV1,
+  nbstoreEvents,
+  nbstoreHandlers,
+} from './nbstore';
 import { provideExposed } from './provide';
 import { workspaceEvents, workspaceHandlers } from './workspace';
 
 export const handlers = {
-  db: dbHandlers,
+  db: dbHandlersV1,
+  nbstore: nbstoreHandlers,
   workspace: workspaceHandlers,
   dialog: dialogHandlers,
 };
 
 export const events = {
-  db: dbEvents,
+  db: dbEventsV1,
   workspace: workspaceEvents,
+  nbstore: nbstoreEvents,
 };
 
 const getExposedMeta = () => {
diff --git a/packages/frontend/apps/electron/src/helper/nbstore/blob.ts b/packages/frontend/apps/electron/src/helper/nbstore/blob.ts
new file mode 100644
index 0000000000..6e41097b45
--- /dev/null
+++ b/packages/frontend/apps/electron/src/helper/nbstore/blob.ts
@@ -0,0 +1,33 @@
+import { type BlobRecord, BlobStorage, share } from '@affine/nbstore';
+
+import { NativeDBConnection } from './db';
+
+export class SqliteBlobStorage extends BlobStorage {
+  override connection = share(
+    new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
+  );
+
+  get db() {
+    return this.connection.inner;
+  }
+
+  override async get(key: string) {
+    return this.db.getBlob(key);
+  }
+
+  override async set(blob: BlobRecord) {
+    await this.db.setBlob(blob);
+  }
+
+  override async delete(key: string, permanently: boolean) {
+    await this.db.deleteBlob(key, permanently);
+  }
+
+  override async release() {
+    await this.db.releaseBlobs();
+  }
+
+  override async list() {
+    return this.db.listBlobs();
+  }
+}
diff --git a/packages/frontend/apps/electron/src/helper/nbstore/db.ts b/packages/frontend/apps/electron/src/helper/nbstore/db.ts
new file mode 100644
index 0000000000..5fd20214f7
--- /dev/null
+++ b/packages/frontend/apps/electron/src/helper/nbstore/db.ts
@@ -0,0 +1,40 @@
+import path from 'node:path';
+
+import { DocStorage as NativeDocStorage } from '@affine/native';
+import { Connection, type SpaceType } from '@affine/nbstore';
+import fs from 'fs-extra';
+
+import { logger } from '../logger';
+import { getSpaceDBPath } from '../workspace/meta';
+
+export class NativeDBConnection extends Connection<NativeDocStorage> {
+  constructor(
+    private readonly peer: string,
+    private readonly type: SpaceType,
+    private readonly id: string
+  ) {
+    super();
+  }
+
+  async getDBPath() {
+    return await getSpaceDBPath(this.peer, this.type, this.id);
+  }
+
+  override get shareId(): string {
+    return `sqlite:${this.peer}:${this.type}:${this.id}`;
+  }
+
+  override async doConnect() {
+    const dbPath = await this.getDBPath();
+    await fs.ensureDir(path.dirname(dbPath));
+    const conn = new NativeDocStorage(dbPath);
+    await conn.connect();
+    logger.info('[nbstore] connection established', this.shareId);
+    return conn;
+  }
+
+  override async doDisconnect(conn: NativeDocStorage) {
+    await conn.close();
+    logger.info('[nbstore] connection closed', this.shareId);
+  }
+}
diff --git a/packages/frontend/apps/electron/src/helper/nbstore/doc.ts b/packages/frontend/apps/electron/src/helper/nbstore/doc.ts
new file mode 100644
index 0000000000..016ef9efd6
--- /dev/null
+++ b/packages/frontend/apps/electron/src/helper/nbstore/doc.ts
@@ -0,0 +1,83 @@
+import {
+  type DocClocks,
+  type DocRecord,
+  DocStorage,
+  type DocUpdate,
+  share,
+} from '@affine/nbstore';
+
+import { NativeDBConnection } from './db';
+
+export class SqliteDocStorage extends DocStorage {
+  override connection = share(
+    new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
+  );
+
+  get db() {
+    return this.connection.inner;
+  }
+
+  override async pushDocUpdate(update: DocUpdate) {
+    const timestamp = await this.db.pushUpdate(update.docId, update.bin);
+
+    return { docId: update.docId, timestamp };
+  }
+
+  override async deleteDoc(docId: string) {
+    await this.db.deleteDoc(docId);
+  }
+
+  override async getDocTimestamps(after?: Date) {
+    const clocks = await this.db.getDocClocks(after);
+
+    return clocks.reduce((ret, cur) => {
+      ret[cur.docId] = cur.timestamp;
+      return ret;
+    }, {} as DocClocks);
+  }
+
+  override async getDocTimestamp(docId: string) {
+    return this.db.getDocClock(docId);
+  }
+
+  protected override async getDocSnapshot(docId: string) {
+    const snapshot = await this.db.getDocSnapshot(docId);
+
+    if (!snapshot) {
+      return null;
+    }
+
+    return {
+      docId,
+      bin: snapshot.data,
+      timestamp: snapshot.timestamp,
+    };
+  }
+
+  protected override async setDocSnapshot(
+    snapshot: DocRecord
+  ): Promise<boolean> {
+    return this.db.setDocSnapshot({
+      docId: snapshot.docId,
+      data: Buffer.from(snapshot.bin),
+      timestamp: new Date(snapshot.timestamp),
+    });
+  }
+
+  protected override async getDocUpdates(docId: string) {
+    return this.db.getDocUpdates(docId).then(updates =>
+      updates.map(update => ({
+        docId,
+        bin: update.data,
+        timestamp: update.createdAt,
+      }))
+    );
+  }
+
+  protected override markUpdatesMerged(docId: string, updates: DocRecord[]) {
+    return this.db.markUpdatesMerged(
+      docId,
+      updates.map(update => update.timestamp)
+    );
+  }
+}
diff --git a/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts
new file mode 100644
index 0000000000..f05432b6b7
--- /dev/null
+++ b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts
@@ -0,0 +1,143 @@
+import {
+  type BlobRecord,
+  type DocClock,
+  type DocUpdate,
+} from '@affine/nbstore';
+
+import type { MainEventRegister } from '../type';
+import {
+  type ConnectionStatus,
+  ensureStorage,
+  getStorage,
+  onConnectionChanged,
+} from './storage';
+
+export const nbstoreHandlers = {
+  connect: async (id: string) => {
+    await ensureStorage(id);
+  },
+
+  close: async (id: string) => {
+    const store = getStorage(id);
+
+    if (store) {
+      await store.disconnect();
+      // The store may be shared with other tabs, so we don't delete it from cache
+      // the underlying connection will handle the close correctly
+      // STORE_CACHE.delete(`${spaceType}:${spaceId}`);
+    }
+  },
+
+  pushDocUpdate: async (id: string, update: DocUpdate) => {
+    const store = await ensureStorage(id);
+    return store.get('doc').pushDocUpdate(update);
+  },
+
+  getDoc: async (id: string, docId: string) => {
+    const store = await ensureStorage(id);
+    return store.get('doc').getDoc(docId);
+  },
+
+  deleteDoc: async (id: string, docId: string) => {
+    const store = await ensureStorage(id);
+    return store.get('doc').deleteDoc(docId);
+  },
+
+  getDocTimestamps: async (id: string, after?: Date) => {
+    const store = await ensureStorage(id);
+    return store.get('doc').getDocTimestamps(after);
+  },
+
+  getDocTimestamp: async (id: string, docId: string) => {
+    const store = await ensureStorage(id);
+    return store.get('doc').getDocTimestamp(docId);
+  },
+
+  setBlob: async (id: string, blob: BlobRecord) => {
+    const store = await ensureStorage(id);
+    return store.get('blob').set(blob);
+  },
+
+  getBlob: async (id: string, key: string) => {
+    const store = await ensureStorage(id);
+    return store.get('blob').get(key);
+  },
+
+  deleteBlob: async (id: string, key: string, permanently: boolean) => {
+    const store = await ensureStorage(id);
+    return store.get('blob').delete(key, permanently);
+  },
+
+  listBlobs: async (id: string) => {
+    const store = await ensureStorage(id);
+    return store.get('blob').list();
+  },
+
+  releaseBlobs: async (id: string) => {
+    const store = await ensureStorage(id);
+    return store.get('blob').release();
+  },
+
+  getPeerRemoteClocks: async (id: string, peer: string) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').getPeerRemoteClocks(peer);
+  },
+
+  getPeerRemoteClock: async (id: string, peer: string, docId: string) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').getPeerRemoteClock(peer, docId);
+  },
+
+  setPeerRemoteClock: async (id: string, peer: string, clock: DocClock) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').setPeerRemoteClock(peer, clock);
+  },
+
+  getPeerPulledRemoteClocks: async (id: string, peer: string) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').getPeerPulledRemoteClocks(peer);
+  },
+
+  getPeerPulledRemoteClock: async (id: string, peer: string, docId: string) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').getPeerPulledRemoteClock(peer, docId);
+  },
+
+  setPeerPulledRemoteClock: async (
+    id: string,
+    peer: string,
+    clock: DocClock
+  ) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').setPeerPulledRemoteClock(peer, clock);
+  },
+
+  getPeerPushedClocks: async (id: string, peer: string) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').getPeerPushedClocks(peer);
+  },
+
+  getPeerPushedClock: async (id: string, peer: string, docId: string) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').getPeerPushedClock(peer, docId);
+  },
+
+  setPeerPushedClock: async (id: string, peer: string, clock: DocClock) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').setPeerPushedClock(peer, clock);
+  },
+
+  clearClocks: async (id: string) => {
+    const store = await ensureStorage(id);
+    return store.get('sync').clearClocks();
+  },
+};
+
+export const nbstoreEvents = {
+  onConnectionStatusChanged: (fn: (payload: ConnectionStatus) => void) => {
+    const sub = onConnectionChanged(fn);
+    return () => {
+      sub.unsubscribe();
+    };
+  },
+} satisfies Record<string, MainEventRegister>;
diff --git a/packages/frontend/apps/electron/src/helper/nbstore/index.ts b/packages/frontend/apps/electron/src/helper/nbstore/index.ts
new file mode 100644
index 0000000000..1bdd0b736d
--- /dev/null
+++ b/packages/frontend/apps/electron/src/helper/nbstore/index.ts
@@ -0,0 +1,4 @@
+export { nbstoreEvents, nbstoreHandlers } from './handlers';
+export * from './storage';
+export { dbEvents as dbEventsV1, dbHandlers as dbHandlersV1 } from './v1';
+export { universalId } from '@affine/nbstore';
diff --git a/packages/frontend/apps/electron/src/helper/nbstore/storage.ts b/packages/frontend/apps/electron/src/helper/nbstore/storage.ts
new file mode 100644
index 0000000000..dbec1d1098
--- /dev/null
+++ b/packages/frontend/apps/electron/src/helper/nbstore/storage.ts
@@ -0,0 +1,127 @@
+import {
+  parseUniversalId,
+  SpaceStorage,
+  type SpaceType,
+  type StorageType,
+} from '@affine/nbstore';
+import { Subject } from 'rxjs';
+import { applyUpdate, Doc as YDoc } from 'yjs';
+
+import { logger } from '../logger';
+import { SqliteBlobStorage } from './blob';
+import { NativeDBConnection } from './db';
+import { SqliteDocStorage } from './doc';
+import { SqliteSyncStorage } from './sync';
+
+export class SqliteSpaceStorage extends SpaceStorage {
+  get connection() {
+    const docStore = this.get('doc');
+
+    if (!docStore) {
+      throw new Error('doc store not found');
+    }
+
+    const connection = docStore.connection;
+
+    if (!(connection instanceof NativeDBConnection)) {
+      throw new Error('doc store connection is not a Sqlite connection');
+    }
+
+    return connection;
+  }
+
+  async getDBPath() {
+    return this.connection.getDBPath();
+  }
+
+  async getWorkspaceName() {
+    const docStore = this.tryGet('doc');
+
+    if (!docStore) {
+      return null;
+    }
+
+    const doc = await docStore.getDoc(docStore.spaceId);
+    if (!doc) {
+      return null;
+    }
+
+    const ydoc = new YDoc();
+    applyUpdate(ydoc, doc.bin);
+    return ydoc.getMap('meta').get('name') as string;
+  }
+
+  async checkpoint() {
+    await this.connection.inner.checkpoint();
+  }
+}
+
+const STORE_CACHE = new Map<string, SqliteSpaceStorage>();
+export interface ConnectionStatus {
+  peer: string;
+  spaceType: SpaceType;
+  spaceId: string;
+  storage: StorageType;
+  status: string;
+  error?: Error;
+}
+const CONNECTION$ = new Subject<ConnectionStatus>();
+
+process.on('beforeExit', () => {
+  CONNECTION$.complete();
+  STORE_CACHE.forEach(store => {
+    store.destroy().catch(err => {
+      logger.error('[nbstore] destroy store failed', err);
+    });
+  });
+});
+
+export function onConnectionChanged(fn: (payload: ConnectionStatus) => void) {
+  return CONNECTION$.subscribe({ next: fn });
+}
+
+export function getStorage(universalId: string) {
+  return STORE_CACHE.get(universalId);
+}
+
+export async function ensureStorage(universalId: string) {
+  const { peer, type, id } = parseUniversalId(universalId);
+  let store = STORE_CACHE.get(universalId);
+
+  if (!store) {
+    const opts = {
+      peer,
+      type,
+      id,
+    };
+
+    store = new SqliteSpaceStorage([
+      new SqliteDocStorage(opts),
+      new SqliteBlobStorage(opts),
+      new SqliteSyncStorage(opts),
+    ]);
+
+    store.on('connection', ({ storage, status, error }) => {
+      CONNECTION$.next({
+        peer,
+        spaceType: type,
+        spaceId: id,
+        storage,
+        status,
+        error,
+      });
+      logger.info(
+        `[nbstore] status changed: ${status}, spaceType: ${type}, spaceId: ${id}, storage: ${storage}`
+      );
+      if (error) {
+        logger.error(`[nbstore] connection error: ${error}`);
+      }
+    });
+
+    await store.connect();
+
+    STORE_CACHE.set(universalId, store);
+  }
+
+  return store;
+}
diff --git a/packages/frontend/apps/electron/src/helper/nbstore/sync.ts b/packages/frontend/apps/electron/src/helper/nbstore/sync.ts
new file mode 100644
index 0000000000..2ffcf259e9
--- /dev/null
+++ b/packages/frontend/apps/electron/src/helper/nbstore/sync.ts
@@ -0,0 +1,70 @@
+import {
+  type DocClock,
+  type DocClocks,
+  share,
+  SyncStorage,
+} from '@affine/nbstore';
+
+import { NativeDBConnection } from './db';
+
+export class SqliteSyncStorage extends SyncStorage {
+  override connection = share(
+    new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
+  );
+
+  get db() {
+    return this.connection.inner;
+  }
+
+  override async getPeerRemoteClocks(peer: string) {
+    const records = await this.db.getPeerRemoteClocks(peer);
+    return records.reduce((clocks, { docId, timestamp }) => {
+      clocks[docId] = timestamp;
+      return clocks;
+    }, {} as DocClocks);
+  }
+
+  override async getPeerRemoteClock(peer: string, docId: string) {
+    return this.db.getPeerRemoteClock(peer, docId);
+  }
+
+  override async
setPeerRemoteClock(peer: string, clock: DocClock) { + await this.db.setPeerRemoteClock(peer, clock.docId, clock.timestamp); + } + + override async getPeerPulledRemoteClock(peer: string, docId: string) { + return this.db.getPeerPulledRemoteClock(peer, docId); + } + + override async getPeerPulledRemoteClocks(peer: string) { + const records = await this.db.getPeerPulledRemoteClocks(peer); + return records.reduce((clocks, { docId, timestamp }) => { + clocks[docId] = timestamp; + return clocks; + }, {} as DocClocks); + } + + override async setPeerPulledRemoteClock(peer: string, clock: DocClock) { + await this.db.setPeerPulledRemoteClock(peer, clock.docId, clock.timestamp); + } + + override async getPeerPushedClocks(peer: string) { + const records = await this.db.getPeerPushedClocks(peer); + return records.reduce((clocks, { docId, timestamp }) => { + clocks[docId] = timestamp; + return clocks; + }, {} as DocClocks); + } + + override async getPeerPushedClock(peer: string, docId: string) { + return this.db.getPeerPushedClock(peer, docId); + } + + override async setPeerPushedClock(peer: string, clock: DocClock) { + await this.db.setPeerPushedClock(peer, clock.docId, clock.timestamp); + } + + override async clearClocks() { + await this.db.clearClocks(); + } +} diff --git a/packages/frontend/apps/electron/src/helper/db/db-adapter.ts b/packages/frontend/apps/electron/src/helper/nbstore/v1/db-adapter.ts similarity index 99% rename from packages/frontend/apps/electron/src/helper/db/db-adapter.ts rename to packages/frontend/apps/electron/src/helper/nbstore/v1/db-adapter.ts index d61ea530e5..caee79a426 100644 --- a/packages/frontend/apps/electron/src/helper/db/db-adapter.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/v1/db-adapter.ts @@ -2,7 +2,7 @@ import type { InsertRow } from '@affine/native'; import { SqliteConnection } from '@affine/native'; import type { ByteKVBehavior } from '@toeverything/infra/storage'; -import { logger } from '../logger'; +import { logger } from '../../logger'; /** * A base class for SQLite DB adapter that provides basic methods around updates & blobs diff --git a/packages/frontend/apps/electron/src/helper/db/ensure-db.ts b/packages/frontend/apps/electron/src/helper/nbstore/v1/ensure-db.ts similarity index 92% rename from packages/frontend/apps/electron/src/helper/db/ensure-db.ts rename to packages/frontend/apps/electron/src/helper/nbstore/v1/ensure-db.ts index 82488d06ec..e7fcd0e2fe 100644 --- a/packages/frontend/apps/electron/src/helper/db/ensure-db.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/v1/ensure-db.ts @@ -1,5 +1,6 @@ -import { logger } from '../logger'; -import type { SpaceType } from './types'; +import type { SpaceType } from '@affine/nbstore'; + +import { logger } from '../../logger'; import type { WorkspaceSQLiteDB } from './workspace-db-adapter'; import { openWorkspaceDatabase } from './workspace-db-adapter'; diff --git a/packages/frontend/apps/electron/src/helper/db/index.ts b/packages/frontend/apps/electron/src/helper/nbstore/v1/index.ts similarity index 96% rename from packages/frontend/apps/electron/src/helper/db/index.ts rename to packages/frontend/apps/electron/src/helper/nbstore/v1/index.ts index f59931375a..7b06c65a9a 100644 --- a/packages/frontend/apps/electron/src/helper/db/index.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/v1/index.ts @@ -1,7 +1,8 @@ -import { mainRPC } from '../main-rpc'; -import type { MainEventRegister } from '../type'; +import type { SpaceType } from '@affine/nbstore'; + +import { mainRPC } 
from '../../main-rpc'; +import type { MainEventRegister } from '../../type'; import { ensureSQLiteDB } from './ensure-db'; -import type { SpaceType } from './types'; export * from './ensure-db'; diff --git a/packages/frontend/apps/electron/src/helper/db/merge-update.ts b/packages/frontend/apps/electron/src/helper/nbstore/v1/merge-update.ts similarity index 100% rename from packages/frontend/apps/electron/src/helper/db/merge-update.ts rename to packages/frontend/apps/electron/src/helper/nbstore/v1/merge-update.ts diff --git a/packages/frontend/apps/electron/src/helper/db/workspace-db-adapter.ts b/packages/frontend/apps/electron/src/helper/nbstore/v1/workspace-db-adapter.ts similarity index 96% rename from packages/frontend/apps/electron/src/helper/db/workspace-db-adapter.ts rename to packages/frontend/apps/electron/src/helper/nbstore/v1/workspace-db-adapter.ts index 5767a902d2..820b628cfd 100644 --- a/packages/frontend/apps/electron/src/helper/db/workspace-db-adapter.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/v1/workspace-db-adapter.ts @@ -1,12 +1,12 @@ +import type { SpaceType } from '@affine/nbstore'; import { AsyncLock } from '@toeverything/infra/utils'; import { Subject } from 'rxjs'; import { applyUpdate, Doc as YDoc } from 'yjs'; -import { logger } from '../logger'; -import { getWorkspaceMeta } from '../workspace/meta'; +import { logger } from '../../logger'; +import { getWorkspaceMeta } from '../../workspace/meta'; import { SQLiteAdapter } from './db-adapter'; import { mergeUpdate } from './merge-update'; -import type { SpaceType } from './types'; const TRIM_SIZE = 1; diff --git a/packages/frontend/apps/electron/src/helper/workspace/handlers.ts b/packages/frontend/apps/electron/src/helper/workspace/handlers.ts index 641ee9d3d3..988afa34e7 100644 --- a/packages/frontend/apps/electron/src/helper/workspace/handlers.ts +++ b/packages/frontend/apps/electron/src/helper/workspace/handlers.ts @@ -2,17 +2,17 @@ import path from 'node:path'; import fs from 'fs-extra'; -import { ensureSQLiteDB } from '../db/ensure-db'; import { logger } from '../logger'; +import { ensureSQLiteDB } from '../nbstore/v1/ensure-db'; import type { WorkspaceMeta } from '../type'; import { getDeletedWorkspacesBasePath, - getWorkspaceBasePath, + getWorkspaceBasePathV1, getWorkspaceMeta, } from './meta'; export async function deleteWorkspace(id: string) { - const basePath = await getWorkspaceBasePath('workspace', id); + const basePath = await getWorkspaceBasePathV1('workspace', id); const movedPath = path.join(await getDeletedWorkspacesBasePath(), `${id}`); try { const db = await ensureSQLiteDB('workspace', id); @@ -30,7 +30,7 @@ export async function storeWorkspaceMeta( meta: Partial<WorkspaceMeta> ) { try { - const basePath = await getWorkspaceBasePath('workspace', workspaceId); + const basePath = await getWorkspaceBasePathV1('workspace', workspaceId); await fs.ensureDir(basePath); const metaPath = path.join(basePath, 'meta.json'); const currentMeta = await getWorkspaceMeta('workspace', workspaceId); diff --git a/packages/frontend/apps/electron/src/helper/workspace/meta.ts b/packages/frontend/apps/electron/src/helper/workspace/meta.ts index 9433be082a..9026097145 100644 --- a/packages/frontend/apps/electron/src/helper/workspace/meta.ts +++ b/packages/frontend/apps/electron/src/helper/workspace/meta.ts @@ -1,9 +1,9 @@ import path from 'node:path'; +import type { SpaceType } from '@affine/nbstore'; import fs from 'fs-extra'; import { isWindows } from '../../shared/utils'; -import type { SpaceType } from '../db/types';
import { logger } from '../logger'; import { mainRPC } from '../main-rpc'; import type { WorkspaceMeta } from '../type'; @@ -22,7 +22,7 @@ export async function getWorkspacesBasePath() { return path.join(await getAppDataPath(), 'workspaces'); } -export async function getWorkspaceBasePath( +export async function getWorkspaceBasePathV1( spaceType: SpaceType, workspaceId: string ) { @@ -33,6 +33,34 @@ export async function getWorkspaceBasePath( ); } +export async function getSpaceBasePath(spaceType: SpaceType) { + return path.join( + await getAppDataPath(), + spaceType === 'userspace' ? 'userspaces' : 'workspaces' + ); +} + +export function escapeFilename(name: string) { + // replace all special characters with '_' and replace repeated '_' with a single '_' and remove trailing '_' + return name + .replaceAll(/[\\/!@#$%^&*()+~`"':;,?<>|]/g, '_') + .replaceAll(/_+/g, '_') + .replace(/_+$/, ''); +} + +export async function getSpaceDBPath( + peer: string, + spaceType: SpaceType, + id: string +) { + return path.join( + await getSpaceBasePath(spaceType), + escapeFilename(peer), + id, + 'storage.db' + ); +} + export async function getDeletedWorkspacesBasePath() { return path.join(await getAppDataPath(), 'deleted-workspaces'); } @@ -42,7 +70,7 @@ export async function getWorkspaceDBPath( workspaceId: string ) { return path.join( - await getWorkspaceBasePath(spaceType, workspaceId), + await getWorkspaceBasePathV1(spaceType, workspaceId), 'storage.db' ); } @@ -52,7 +80,7 @@ export async function getWorkspaceMetaPath( workspaceId: string ) { return path.join( - await getWorkspaceBasePath(spaceType, workspaceId), + await getWorkspaceBasePathV1(spaceType, workspaceId), 'meta.json' ); } @@ -66,7 +94,7 @@ export async function getWorkspaceMeta( spaceType: SpaceType, workspaceId: string ): Promise<WorkspaceMeta> { try { - const basePath = await getWorkspaceBasePath(spaceType, workspaceId); + const basePath = await getWorkspaceBasePathV1(spaceType, workspaceId); const metaPath = await getWorkspaceMetaPath(spaceType, workspaceId); if ( !(await fs diff --git a/packages/frontend/apps/electron/test/db/ensure-db.spec.ts b/packages/frontend/apps/electron/test/db/ensure-db.spec.ts index 191ecd9646..680b3780fb 100644 --- a/packages/frontend/apps/electron/test/db/ensure-db.spec.ts +++ b/packages/frontend/apps/electron/test/db/ensure-db.spec.ts @@ -54,7 +54,7 @@ afterAll(() => { test('can get a valid WorkspaceSQLiteDB', async () => { const { ensureSQLiteDB } = await import( - '@affine/electron/helper/db/ensure-db' + '@affine/electron/helper/nbstore/v1/ensure-db' ); const workspaceId = v4(); const db0 = await ensureSQLiteDB('workspace', workspaceId); @@ -71,7 +71,7 @@ test('can get a valid WorkspaceSQLiteDB', async () => { test('db should be destroyed when app quits', async () => { const { ensureSQLiteDB } = await import( - '@affine/electron/helper/db/ensure-db' + '@affine/electron/helper/nbstore/v1/ensure-db' ); const workspaceId = v4(); const db0 = await ensureSQLiteDB('workspace', workspaceId); @@ -91,7 +91,7 @@ test('db should be destroyed when app quits', async () => { test('db should be removed in db$Map after destroyed', async () => { const { ensureSQLiteDB, db$Map } = await import( - '@affine/electron/helper/db/ensure-db' + '@affine/electron/helper/nbstore/v1/ensure-db' ); const workspaceId = v4(); const db = await ensureSQLiteDB('workspace', workspaceId); diff --git a/packages/frontend/apps/electron/test/db/workspace-db-adapter.spec.ts b/packages/frontend/apps/electron/test/db/workspace-db-adapter.spec.ts index f06f0fa3ad..29e69d96ff 100644 ---
a/packages/frontend/apps/electron/test/db/workspace-db-adapter.spec.ts +++ b/packages/frontend/apps/electron/test/db/workspace-db-adapter.spec.ts @@ -26,7 +26,7 @@ afterAll(() => { test('can create new db file if not exists', async () => { const { openWorkspaceDatabase } = await import( - '@affine/electron/helper/db/workspace-db-adapter' + '@affine/electron/helper/nbstore/v1/workspace-db-adapter' ); const workspaceId = v4(); const db = await openWorkspaceDatabase('workspace', workspaceId); @@ -41,7 +41,7 @@ test('can create new db file if not exists', async () => { test('on destroy, check if resources have been released', async () => { const { openWorkspaceDatabase } = await import( - '@affine/electron/helper/db/workspace-db-adapter' + '@affine/electron/helper/nbstore/v1/workspace-db-adapter' ); const workspaceId = v4(); const db = await openWorkspaceDatabase('workspace', workspaceId); diff --git a/packages/frontend/native/.env b/packages/frontend/native/.env deleted file mode 100644 index 0170a2f443..0000000000 --- a/packages/frontend/native/.env +++ /dev/null @@ -1 +0,0 @@ -DATABASE_URL="sqlite:affine.db" diff --git a/packages/frontend/native/Cargo.toml b/packages/frontend/native/Cargo.toml index 0c71b42593..c30c76c881 100644 --- a/packages/frontend/native/Cargo.toml +++ b/packages/frontend/native/Cargo.toml @@ -6,38 +6,17 @@ version = "0.0.0" [lib] crate-type = ["rlib", "cdylib"] -[features] -noop = ["napi/noop", "napi-derive/noop"] - -[[bench]] -name = "hashcash" -harness = false - [dependencies] affine_common = { workspace = true } -affine_schema = { path = "./schema" } -anyhow = { workspace = true } -chrono = { workspace = true } -criterion2 = { workspace = true } +affine_sqlite_v1 = { path = "./sqlite_v1" } +affine_nbstore = { path = "./nbstore" } napi = { workspace = true } napi-derive = { workspace = true } -notify = { workspace = true, features = ["serde"] } once_cell = { workspace = true } -parking_lot = { workspace = true } -rand = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -sha3 = { workspace = true } sqlx = { workspace = true, default-features = false, features = ["chrono", "macros", "migrate", "runtime-tokio", "sqlite", "tls-rustls"] } tokio = { workspace = true, features = ["full"] } -uuid = { workspace = true, features = ["fast-rng", "serde", "v4"] } - -[dev-dependencies] -rayon = { workspace = true } [build-dependencies] -affine_schema = { path = "./schema" } -dotenv = { workspace = true } napi-build = { workspace = true } sqlx = { workspace = true, default-features = false, features = ["chrono", "json", "macros", "migrate", "runtime-tokio", "sqlite", "tls-rustls"] } tokio = { workspace = true, features = ["full"] } diff --git a/packages/frontend/native/__tests__/db.spec.mts b/packages/frontend/native/__tests__/db.spec.mts index 99cbc82cf3..b762f25855 100644 --- a/packages/frontend/native/__tests__/db.spec.mts +++ b/packages/frontend/native/__tests__/db.spec.mts @@ -1,6 +1,7 @@ -import test from 'ava'; import { fileURLToPath } from 'node:url'; +import test from 'ava'; + import { SqliteConnection, ValidationResult } from '../index'; test('db validate', async t => { diff --git a/packages/frontend/native/build.rs b/packages/frontend/native/build.rs index a5d6b2087f..f4cc262090 100644 --- a/packages/frontend/native/build.rs +++ b/packages/frontend/native/build.rs @@ -1,34 +1,5 @@ -use sqlx::sqlite::SqliteConnectOptions; -use std::fs; - #[tokio::main] async fn main() -> Result<(), std::io::Error> { - dotenv::dotenv().ok(); - - // 
always start with a fresh database to have - // latest db schema - let db_path = "../../../affine.db"; - - // check if db exists and then remove file - if fs::metadata(db_path).is_ok() { - fs::remove_file(db_path)?; - } - - #[cfg(not(feature = "noop"))] napi_build::setup(); - let options = SqliteConnectOptions::new() - .filename(db_path) - .journal_mode(sqlx::sqlite::SqliteJournalMode::Off) - .locking_mode(sqlx::sqlite::SqliteLockingMode::Exclusive) - .create_if_missing(true); - let pool = sqlx::sqlite::SqlitePoolOptions::new() - .max_connections(1) - .connect_with(options) - .await - .unwrap(); - sqlx::query(affine_schema::SCHEMA) - .execute(&pool) - .await - .unwrap(); Ok(()) } diff --git a/packages/frontend/native/event.d.ts b/packages/frontend/native/event.d.ts deleted file mode 100644 index b7ba335486..0000000000 --- a/packages/frontend/native/event.d.ts +++ /dev/null @@ -1,43 +0,0 @@ -export interface NotifyEvent { - type: EventKind; - paths: string[]; -} - -export type EventKind = - | 'any' - | 'other' - | { - remove: { - kind: 'any' | 'file' | 'folder' | 'other'; - }; - } - | { - create: { - kind: 'any' | 'file' | 'folder' | 'other'; - }; - } - | { - modify: - | { - kind: 'any' | 'other'; - } - | { - kind: 'data'; - mode: 'any' | 'size' | 'content' | 'other'; - } - | { - kind: 'metadata'; - mode: - | 'any' - | 'access-time' - | 'write-time' - | 'permissions' - | 'ownership' - | 'extended' - | 'other'; - } - | { - kind: 'rename'; - mode: 'any' | 'to' | 'from' | 'both' | 'other'; - }; - }; diff --git a/packages/frontend/native/index.d.ts b/packages/frontend/native/index.d.ts index c3611af9bf..dc657ae023 100644 --- a/packages/frontend/native/index.d.ts +++ b/packages/frontend/native/index.d.ts @@ -1,5 +1,43 @@ /* auto-generated by NAPI-RS */ /* eslint-disable */ +export declare class DocStorage { + constructor(path: string) + /** Initialize the database and run migrations. */ + connect(): Promise<void> + close(): Promise<void> + get isClosed(): Promise<boolean> + /** + * Flush the WAL file to the database file.
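+ * (Added note, mirroring the native implementation in nbstore/src/storage.rs: this runs PRAGMA wal_checkpoint(FULL) on the underlying pool.)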
+ * See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B + */ + checkpoint(): Promise<void> + validate(): Promise<boolean> + setSpaceId(spaceId: string): Promise<void> + pushUpdate(docId: string, update: Uint8Array): Promise<Date> + getDocSnapshot(docId: string): Promise<DocRecord | null> + setDocSnapshot(snapshot: DocRecord): Promise<boolean> + getDocUpdates(docId: string): Promise<Array<DocUpdate>> + markUpdatesMerged(docId: string, updates: Array<Date>): Promise<number> + deleteDoc(docId: string): Promise<void> + getDocClocks(after?: Date | undefined | null): Promise<Array<DocClock>> + getDocClock(docId: string): Promise<DocClock | null> + getBlob(key: string): Promise<Blob | null> + setBlob(blob: SetBlob): Promise<void> + deleteBlob(key: string, permanently: boolean): Promise<void> + releaseBlobs(): Promise<void> + listBlobs(): Promise<Array<ListedBlob>> + getPeerRemoteClocks(peer: string): Promise<Array<DocClock>> + getPeerRemoteClock(peer: string, docId: string): Promise<DocClock> + setPeerRemoteClock(peer: string, docId: string, clock: Date): Promise<void> + getPeerPulledRemoteClocks(peer: string): Promise<Array<DocClock>> + getPeerPulledRemoteClock(peer: string, docId: string): Promise<DocClock> + setPeerPulledRemoteClock(peer: string, docId: string, clock: Date): Promise<void> + getPeerPushedClocks(peer: string): Promise<Array<DocClock>> + getPeerPushedClock(peer: string, docId: string): Promise<DocClock> + setPeerPushedClock(peer: string, docId: string, clock: Date): Promise<void> + clearClocks(): Promise<void> +} + export declare class SqliteConnection { constructor(path: string) connect(): Promise<void> @@ -37,19 +75,57 @@ export declare class SqliteConnection { checkpoint(): Promise<void> } +export interface Blob { + key: string + data: Uint8Array + mime: string + size: number + createdAt: Date +} + export interface BlobRow { key: string data: Buffer timestamp: Date } +export interface DocClock { + docId: string + timestamp: Date +} + +export interface DocRecord { + docId: string + data: Uint8Array + timestamp: Date +} + +export interface DocUpdate { + docId: string + createdAt: Date + data: Uint8Array +} + export interface InsertRow { docId?: string data: Uint8Array } +export interface ListedBlob { + key: string + size: number + mime: string + createdAt: Date +} + export declare function mintChallengeResponse(resource: string, bits?: number | undefined | null): Promise<string> +export interface SetBlob { + key: string + data: Uint8Array + mime: string +} + export interface UpdateRow { id: number timestamp: Date @@ -66,4 +142,3 @@ } export declare enum ValidationResult { } export declare function verifyChallengeResponse(response: string, bits: number, resource: string): Promise<boolean> - diff --git a/packages/frontend/native/index.js b/packages/frontend/native/index.js index c6c15fe67f..378709a9ee 100644 --- a/packages/frontend/native/index.js +++ b/packages/frontend/native/index.js @@ -1,9 +1,12 @@ // prettier-ignore /* eslint-disable */ +// @ts-nocheck /* auto-generated by NAPI-RS */ -const { readFileSync } = require('fs') +const { createRequire } = require('node:module') +require = createRequire(__filename) +const { readFileSync } = require('node:fs') let nativeBinding = null const loadErrors = [] @@ -361,6 +364,7 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } +module.exports.DocStorage = nativeBinding.DocStorage module.exports.SqliteConnection = nativeBinding.SqliteConnection module.exports.mintChallengeResponse = nativeBinding.mintChallengeResponse module.exports.ValidationResult = nativeBinding.ValidationResult diff --git a/packages/frontend/native/nbstore/Cargo.toml b/packages/frontend/native/nbstore/Cargo.toml new file mode 100644 index 0000000000..99293cada0 --- /dev/null +++
b/packages/frontend/native/nbstore/Cargo.toml @@ -0,0 +1,23 @@ +[package] +edition = "2021" +name = "affine_nbstore" +version = "0.0.0" + +[lib] +crate-type = ["rlib", "cdylib"] + +[dependencies] +affine_schema = { path = "../schema" } +anyhow = { workspace = true } +chrono = { workspace = true } +napi = { workspace = true } +napi-derive = { workspace = true } +sqlx = { workspace = true, default-features = false, features = ["chrono", "macros", "migrate", "runtime-tokio", "sqlite", "tls-rustls"] } +tokio = { workspace = true, features = ["full"] } + +[build-dependencies] +affine_schema = { path = "../schema" } +dotenvy = { workspace = true } +napi-build = { workspace = true } +sqlx = { workspace = true, default-features = false, features = ["chrono", "json", "macros", "migrate", "runtime-tokio", "sqlite", "tls-rustls"] } +tokio = { workspace = true, features = ["full"] } \ No newline at end of file diff --git a/packages/frontend/native/nbstore/build.rs b/packages/frontend/native/nbstore/build.rs new file mode 100644 index 0000000000..f6fa8cd5e0 --- /dev/null +++ b/packages/frontend/native/nbstore/build.rs @@ -0,0 +1,33 @@ +use affine_schema::get_migrator; +use sqlx::sqlite::SqliteConnectOptions; +use std::fs; + +#[tokio::main] +async fn main() -> Result<(), std::io::Error> { + napi_build::setup(); + + // always start with a fresh database to have latest db schema + let cwd = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let db_path = format!("{cwd}/affine.db"); + + if fs::metadata(&db_path).is_ok() { + fs::remove_file(&db_path)?; + } + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .journal_mode(sqlx::sqlite::SqliteJournalMode::Off) + .locking_mode(sqlx::sqlite::SqliteLockingMode::Exclusive) + .create_if_missing(true); + let pool = sqlx::sqlite::SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await + .unwrap(); + + get_migrator().run(&pool).await.unwrap(); + + println!("cargo::rustc-env=DATABASE_URL=sqlite://{db_path}"); + + Ok(()) +} diff --git a/packages/frontend/native/nbstore/src/blob.rs b/packages/frontend/native/nbstore/src/blob.rs new file mode 100644 index 0000000000..05e00d26e4 --- /dev/null +++ b/packages/frontend/native/nbstore/src/blob.rs @@ -0,0 +1,198 @@ +use super::{storage::SqliteDocStorage, Blob, ListedBlob, SetBlob}; + +type Result<T> = std::result::Result<T, sqlx::Error>; + +impl SqliteDocStorage { + pub async fn get_blob(&self, key: String) -> Result<Option<Blob>> { + sqlx::query_as!( + Blob, + "SELECT key, data, size, mime, created_at FROM blobs WHERE key = ?
AND deleted_at IS NULL", + key + ) + .fetch_optional(&self.pool) + .await + } + + pub async fn set_blob(&self, blob: SetBlob) -> Result<()> { + sqlx::query( + r#" + INSERT INTO blobs (key, data, mime, size) + VALUES ($1, $2, $3, $4) + ON CONFLICT(key) + DO UPDATE SET data=$2, mime=$3, size=$4, deleted_at=NULL;"#, + ) + .bind(blob.key) + .bind(blob.data.as_ref()) + .bind(blob.mime) + .bind(blob.data.len() as i64) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn delete_blob(&self, key: String, permanently: bool) -> Result<()> { + if permanently { + sqlx::query("DELETE FROM blobs WHERE key = ?") + .bind(&key) + .execute(&self.pool) + .await?; + } else { + sqlx::query("UPDATE blobs SET deleted_at = CURRENT_TIMESTAMP WHERE key = ?") + .bind(&key) + .execute(&self.pool) + .await?; + } + + Ok(()) + } + + pub async fn release_blobs(&self) -> Result<()> { + sqlx::query("DELETE FROM blobs WHERE deleted_at IS NOT NULL;") + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn list_blobs(&self) -> Result<Vec<ListedBlob>> { + sqlx::query_as!( + ListedBlob, + "SELECT key, size, mime, created_at FROM blobs WHERE deleted_at IS NULL ORDER BY created_at DESC;" + ) + .fetch_all(&self.pool) + .await + } +} + +#[cfg(test)] +mod tests { + use napi::bindgen_prelude::Uint8Array; + use sqlx::Row; + + use super::*; + + async fn get_storage() -> SqliteDocStorage { + let storage = SqliteDocStorage::new(":memory:".to_string()); + storage.connect().await.unwrap(); + + storage + } + + #[tokio::test] + async fn delete_blob() { + let storage = get_storage().await; + + for i in 1..5u32 { + storage + .set_blob(SetBlob { + key: format!("test_{}", i), + data: Uint8Array::from(vec![0, 0]), + mime: "text/plain".to_string(), + }) + .await + .unwrap(); + } + + let result = storage.get_blob("test_1".to_string()).await.unwrap(); + + assert!(result.is_some()); + + storage + .delete_blob("test_1".to_string(), false) + .await + .unwrap(); + + let result = storage.get_blob("test_1".to_string()).await.unwrap(); + assert!(result.is_none()); + + storage + .delete_blob("test_2".to_string(), true) + .await + .unwrap(); + + let result = storage.get_blob("test_2".to_string()).await.unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn list_blobs() { + let storage = get_storage().await; + + let blobs = storage.list_blobs().await.unwrap(); + + assert_eq!(blobs.len(), 0); + + for i in 1..5u32 { + storage + .set_blob(SetBlob { + key: format!("test_{}", i), + data: Uint8Array::from(vec![0, 0]), + mime: "text/plain".to_string(), + }) + .await + .unwrap(); + } + + let blobs = storage.list_blobs().await.unwrap(); + + assert_eq!(blobs.len(), 4); + assert_eq!( + blobs.iter().map(|b| b.key.as_str()).collect::<Vec<&str>>(), + vec!["test_1", "test_2", "test_3", "test_4"] + ); + + storage + .delete_blob("test_2".to_string(), false) + .await + .unwrap(); + + storage + .delete_blob("test_3".to_string(), true) + .await + .unwrap(); + + let query = sqlx::query("SELECT COUNT(*) as len FROM blobs;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(query.get::<i64, _>("len"), 3); + + let blobs = storage.list_blobs().await.unwrap(); + assert_eq!(blobs.len(), 2); + assert_eq!( + blobs.iter().map(|b| b.key.as_str()).collect::<Vec<&str>>(), + vec!["test_1", "test_4"] + ); + } + + #[tokio::test] + async fn release_blobs() { + let storage = get_storage().await; + + for i in 1..5u32 { + storage + .set_blob(SetBlob { + key: format!("test_{}", i), + data: Uint8Array::from(vec![0, 0]), + mime: "text/plain".to_string(), + }) + .await + .unwrap(); + } + +
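// soft-delete one blob, then purge: release_blobs() drops only rows whose deleted_at is set +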
storage + .delete_blob("test_2".to_string(), false) + .await + .unwrap(); + storage.release_blobs().await.unwrap(); + + let query = sqlx::query("SELECT COUNT(*) as len FROM blobs;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(query.get::<i64, _>("len"), 3); + } +} diff --git a/packages/frontend/native/nbstore/src/doc.rs b/packages/frontend/native/nbstore/src/doc.rs new file mode 100644 index 0000000000..d44b972d0f --- /dev/null +++ b/packages/frontend/native/nbstore/src/doc.rs @@ -0,0 +1,449 @@ +use chrono::NaiveDateTime; +use sqlx::{QueryBuilder, Row}; + +use super::storage::{Result, SqliteDocStorage}; +use super::{DocClock, DocRecord, DocUpdate}; + +struct Meta { + space_id: String, +} + +impl SqliteDocStorage { + pub async fn set_space_id(&self, space_id: String) -> Result<()> { + // ensure only one record exists in table + let result = sqlx::query_as!(Meta, "SELECT * FROM meta;") + .fetch_optional(&self.pool) + .await?; + + match result { + Some(meta) => { + if meta.space_id != space_id { + sqlx::query("UPDATE meta SET space_id = $1;") + .bind(&space_id) + .execute(&self.pool) + .await?; + + sqlx::query("UPDATE updates SET doc_id = $1 WHERE doc_id = $2;") + .bind(&space_id) + .bind(&meta.space_id) + .execute(&self.pool) + .await?; + + sqlx::query("UPDATE snapshots SET doc_id = $1 WHERE doc_id = $2;") + .bind(&space_id) + .bind(&meta.space_id) + .execute(&self.pool) + .await?; + + sqlx::query("UPDATE clocks SET doc_id = $1 WHERE doc_id = $2;") + .bind(&space_id) + .bind(&meta.space_id) + .execute(&self.pool) + .await?; + + sqlx::query("UPDATE peer_clocks SET doc_id = $1 WHERE doc_id = $2;") + .bind(&space_id) + .bind(&meta.space_id) + .execute(&self.pool) + .await?; + } + } + None => { + sqlx::query("INSERT INTO meta (space_id) VALUES ($1);") + .bind(&space_id) + .execute(&self.pool) + .await?; + } + } + + Ok(()) + } + + pub async fn push_update<Update: AsRef<[u8]>>( + &self, + doc_id: String, + update: Update, + ) -> Result<NaiveDateTime> { + let timestamp = chrono::Utc::now().naive_utc(); + let mut tx = self.pool.begin().await?; + + sqlx::query(r#"INSERT INTO updates (doc_id, data, created_at) VALUES ($1, $2, $3);"#) + .bind(&doc_id) + .bind(update.as_ref()) + .bind(timestamp) + .execute(&mut *tx) + .await?; + + sqlx::query( + r#" + INSERT INTO clocks (doc_id, timestamp) VALUES ($1, $2) + ON CONFLICT(doc_id) + DO UPDATE SET timestamp=$2;"#, + ) + .bind(&doc_id) + .bind(timestamp) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(timestamp) + } + + pub async fn get_doc_snapshot(&self, doc_id: String) -> Result<Option<DocRecord>> { + sqlx::query_as!( + DocRecord, + "SELECT doc_id, data, updated_at as timestamp FROM snapshots WHERE doc_id = ?", + doc_id + ) + .fetch_optional(&self.pool) + .await + } + + pub async fn set_doc_snapshot(&self, snapshot: DocRecord) -> Result<bool> { + let result = sqlx::query( + r#" + INSERT INTO snapshots (doc_id, data, updated_at) + VALUES ($1, $2, $3) + ON CONFLICT(doc_id) + DO UPDATE SET data=$2, updated_at=$3 + WHERE updated_at <= $3;"#, + ) + .bind(snapshot.doc_id) + .bind(snapshot.data.as_ref()) + .bind(snapshot.timestamp) + .execute(&self.pool) + .await?; + + Ok(result.rows_affected() == 1) + } + + pub async fn get_doc_updates(&self, doc_id: String) -> Result<Vec<DocUpdate>> { + sqlx::query_as!( + DocUpdate, + "SELECT doc_id, created_at, data FROM updates WHERE doc_id = ?", + doc_id + ) + .fetch_all(&self.pool) + .await + } + + pub async fn mark_updates_merged( + &self, + doc_id: String, + updates: Vec<NaiveDateTime>, + ) -> Result<u32> { + let mut qb = QueryBuilder::new("DELETE FROM updates"); + + qb.push(" WHERE
doc_id = "); + qb.push_bind(doc_id); + qb.push(" AND created_at IN ("); + let mut separated = qb.separated(", "); + updates.iter().for_each(|update| { + separated.push_bind(update); + }); + qb.push(");"); + + let query = qb.build(); + + let result = query.execute(&self.pool).await?; + + Ok(result.rows_affected() as u32) + } + + pub async fn delete_doc(&self, doc_id: String) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("DELETE FROM updates WHERE doc_id = ?;") + .bind(&doc_id) + .execute(&mut *tx) + .await?; + + sqlx::query("DELETE FROM snapshots WHERE doc_id = ?;") + .bind(&doc_id) + .execute(&mut *tx) + .await?; + + sqlx::query("DELETE FROM clocks WHERE doc_id = ?;") + .bind(&doc_id) + .execute(&mut *tx) + .await?; + + tx.commit().await + } + + pub async fn get_doc_clocks(&self, after: Option) -> Result> { + let query = if let Some(after) = after { + sqlx::query("SELECT doc_id, timestamp FROM clocks WHERE timestamp > $1").bind(after) + } else { + sqlx::query("SELECT doc_id, timestamp FROM clocks") + }; + + let clocks = query.fetch_all(&self.pool).await?; + + Ok( + clocks + .iter() + .map(|row| DocClock { + doc_id: row.get("doc_id"), + timestamp: row.get("timestamp"), + }) + .collect(), + ) + } + + pub async fn get_doc_clock(&self, doc_id: String) -> Result> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, timestamp FROM clocks WHERE doc_id = ?", + doc_id + ) + .fetch_optional(&self.pool) + .await + } +} + +#[cfg(test)] +mod tests { + use chrono::{DateTime, Utc}; + use napi::bindgen_prelude::Uint8Array; + + use super::*; + + async fn get_storage() -> SqliteDocStorage { + let storage = SqliteDocStorage::new(":memory:".to_string()); + storage.connect().await.unwrap(); + + storage + } + + #[tokio::test] + async fn set_space_id() { + let storage = get_storage().await; + + storage.set_space_id("test".to_string()).await.unwrap(); + + let result = sqlx::query!("SELECT space_id FROM meta;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(result.space_id, "test"); + + storage.set_space_id("test2".to_string()).await.unwrap(); + + let result = sqlx::query!("SELECT space_id FROM meta;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(result.space_id, "test2"); + } + + #[tokio::test] + async fn set_space_id_with_existing_doc() { + let storage = get_storage().await; + + storage.set_space_id("test".to_string()).await.unwrap(); + storage + .push_update("test".to_string(), vec![0, 0]) + .await + .unwrap(); + storage + .set_doc_snapshot(DocRecord { + doc_id: "test".to_string(), + data: Uint8Array::from(vec![0, 0]), + timestamp: Utc::now().naive_utc(), + }) + .await + .unwrap(); + + storage + .set_peer_pulled_remote_clock( + "remote".to_string(), + "test".to_string(), + Utc::now().naive_utc(), + ) + .await + .unwrap(); + + storage.set_space_id("new_id".to_string()).await.unwrap(); + + let result = sqlx::query!("SELECT space_id FROM meta;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(result.space_id, "new_id"); + + let clocks = storage.get_doc_clocks(None).await.unwrap(); + + assert_eq!(clocks[0].doc_id, "new_id"); + + let clocks = storage + .get_peer_pulled_remote_clock("remote".to_string(), "new_id".to_string()) + .await + .unwrap(); + + assert_eq!(clocks.doc_id, "new_id"); + + let updates = storage.get_doc_updates("new_id".to_string()).await.unwrap(); + + assert_eq!(updates.len(), 1); + + let snapshot = storage + .get_doc_snapshot("new_id".to_string()) + .await + .unwrap(); + + assert!(snapshot.is_some()); + } + + 
#[tokio::test] + async fn push_updates() { + let storage = get_storage().await; + + let updates = vec![vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]]; + + for update in updates.iter() { + storage + .push_update("test".to_string(), update) + .await + .unwrap(); + } + + let result = storage.get_doc_updates("test".to_string()).await.unwrap(); + + assert_eq!(result.len(), 4); + assert_eq!( + result.iter().map(|u| u.data.as_ref()).collect::<Vec<_>>(), + updates + ); + } + + #[tokio::test] + async fn get_doc_snapshot() { + let storage = get_storage().await; + + let none = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(none.is_none()); + + let snapshot = DocRecord { + doc_id: "test".to_string(), + data: Uint8Array::from(vec![0, 0]), + timestamp: Utc::now().naive_utc(), + }; + + storage.set_doc_snapshot(snapshot).await.unwrap(); + + let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(result.is_some()); + assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); + } + + #[tokio::test] + async fn set_doc_snapshot() { + let storage = get_storage().await; + + let snapshot = DocRecord { + doc_id: "test".to_string(), + data: Uint8Array::from(vec![0, 0]), + timestamp: Utc::now().naive_utc(), + }; + + storage.set_doc_snapshot(snapshot).await.unwrap(); + + let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(result.is_some()); + assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); + + let snapshot = DocRecord { + doc_id: "test".to_string(), + data: Uint8Array::from(vec![0, 1]), + timestamp: DateTime::from_timestamp_millis(Utc::now().timestamp_millis() - 1000) + .unwrap() + .naive_utc(), + }; + + // can't update because its timestamp is older + storage.set_doc_snapshot(snapshot).await.unwrap(); + + let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(result.is_some()); + assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); + } + + #[tokio::test] + async fn get_doc_clocks() { + let storage = get_storage().await; + + let clocks = storage.get_doc_clocks(None).await.unwrap(); + + assert_eq!(clocks.len(), 0); + + for i in 1..5u32 { + storage + .push_update(format!("test_{i}"), vec![0, 0]) + .await + .unwrap(); + } + + let clocks = storage.get_doc_clocks(None).await.unwrap(); + + assert_eq!(clocks.len(), 4); + assert_eq!( + clocks.iter().map(|c| c.doc_id.as_str()).collect::<Vec<&str>>(), + vec!["test_1", "test_2", "test_3", "test_4"] + ); + + let clocks = storage + .get_doc_clocks(Some(Utc::now().naive_utc())) + .await + .unwrap(); + + assert_eq!(clocks.len(), 0); + + let clock = storage.get_doc_clock("test_1".to_string()).await.unwrap(); + + assert!(clock.is_some()); + assert_eq!(clock.unwrap().doc_id, "test_1"); + } + + #[tokio::test] + async fn mark_updates_merged() { + let storage = get_storage().await; + + let updates = [vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]]; + + for update in updates.iter() { + storage + .push_update("test".to_string(), update) + .await + .unwrap(); + } + + let updates = storage.get_doc_updates("test".to_string()).await.unwrap(); + + let result = storage + .mark_updates_merged( + "test".to_string(), + updates + .iter() + .skip(1) + .map(|u| u.created_at) + .collect::<Vec<_>>(), + ) + .await + .unwrap(); + + assert_eq!(result, 3); + + let updates = storage.get_doc_updates("test".to_string()).await.unwrap(); + + assert_eq!(updates.len(), 1); + } +} diff --git a/packages/frontend/native/nbstore/src/lib.rs b/packages/frontend/native/nbstore/src/lib.rs new file mode 100644 index
0000000000..4b6e088b3a --- /dev/null +++ b/packages/frontend/native/nbstore/src/lib.rs @@ -0,0 +1,311 @@ +mod blob; +mod doc; +mod storage; +mod sync; + +use chrono::NaiveDateTime; +use napi::bindgen_prelude::*; +use napi_derive::napi; + +fn map_err(err: sqlx::Error) -> napi::Error { + napi::Error::from(anyhow::Error::from(err)) +} + +#[napi(object)] +pub struct DocUpdate { + pub doc_id: String, + pub created_at: NaiveDateTime, + pub data: Uint8Array, +} + +#[napi(object)] +pub struct DocRecord { + pub doc_id: String, + pub data: Uint8Array, + pub timestamp: NaiveDateTime, +} + +#[derive(Debug)] +#[napi(object)] +pub struct DocClock { + pub doc_id: String, + pub timestamp: NaiveDateTime, +} + +#[napi(object)] +pub struct SetBlob { + pub key: String, + pub data: Uint8Array, + pub mime: String, +} + +#[napi(object)] +pub struct Blob { + pub key: String, + pub data: Uint8Array, + pub mime: String, + pub size: i64, + pub created_at: NaiveDateTime, +} + +#[napi(object)] +pub struct ListedBlob { + pub key: String, + pub size: i64, + pub mime: String, + pub created_at: NaiveDateTime, +} + +#[napi] +pub struct DocStorage { + storage: storage::SqliteDocStorage, +} + +#[napi] +impl DocStorage { + #[napi(constructor, async_runtime)] + pub fn new(path: String) -> napi::Result<Self> { + Ok(Self { + storage: storage::SqliteDocStorage::new(path), + }) + } + + #[napi] + /// Initialize the database and run migrations. + pub async fn connect(&self) -> napi::Result<()> { + self.storage.connect().await.map_err(map_err) + } + + #[napi] + pub async fn close(&self) -> napi::Result<()> { + self.storage.close().await; + + Ok(()) + } + + #[napi(getter)] + pub async fn is_closed(&self) -> napi::Result<bool> { + Ok(self.storage.is_closed()) + } + + /** + * Flush the WAL file to the database file.
+ * See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B + */ + #[napi] + pub async fn checkpoint(&self) -> napi::Result<()> { + self.storage.checkpoint().await.map_err(map_err) + } + + #[napi] + pub async fn validate(&self) -> napi::Result<bool> { + self.storage.validate().await.map_err(map_err) + } + + #[napi] + pub async fn set_space_id(&self, space_id: String) -> napi::Result<()> { + self.storage.set_space_id(space_id).await.map_err(map_err) + } + + #[napi] + pub async fn push_update( + &self, + doc_id: String, + update: Uint8Array, + ) -> napi::Result<NaiveDateTime> { + self + .storage + .push_update(doc_id, update) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_doc_snapshot(&self, doc_id: String) -> napi::Result<Option<DocRecord>> { + self.storage.get_doc_snapshot(doc_id).await.map_err(map_err) + } + + #[napi] + pub async fn set_doc_snapshot(&self, snapshot: DocRecord) -> napi::Result<bool> { + self + .storage + .set_doc_snapshot(snapshot) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_doc_updates(&self, doc_id: String) -> napi::Result<Vec<DocUpdate>> { + self.storage.get_doc_updates(doc_id).await.map_err(map_err) + } + + #[napi] + pub async fn mark_updates_merged( + &self, + doc_id: String, + updates: Vec<NaiveDateTime>, + ) -> napi::Result<u32> { + self + .storage + .mark_updates_merged(doc_id, updates) + .await + .map_err(map_err) + } + + #[napi] + pub async fn delete_doc(&self, doc_id: String) -> napi::Result<()> { + self.storage.delete_doc(doc_id).await.map_err(map_err) + } + + #[napi] + pub async fn get_doc_clocks(&self, after: Option<NaiveDateTime>) -> napi::Result<Vec<DocClock>> { + self.storage.get_doc_clocks(after).await.map_err(map_err) + } + + #[napi] + pub async fn get_doc_clock(&self, doc_id: String) -> napi::Result<Option<DocClock>> { + self.storage.get_doc_clock(doc_id).await.map_err(map_err) + } + + #[napi] + pub async fn get_blob(&self, key: String) -> napi::Result<Option<Blob>> { + self.storage.get_blob(key).await.map_err(map_err) + } + + #[napi] + pub async fn set_blob(&self, blob: SetBlob) -> napi::Result<()> { + self.storage.set_blob(blob).await.map_err(map_err) + } + + #[napi] + pub async fn delete_blob(&self, key: String, permanently: bool) -> napi::Result<()> { + self + .storage + .delete_blob(key, permanently) + .await + .map_err(map_err) + } + + #[napi] + pub async fn release_blobs(&self) -> napi::Result<()> { + self.storage.release_blobs().await.map_err(map_err) + } + + #[napi] + pub async fn list_blobs(&self) -> napi::Result<Vec<ListedBlob>> { + self.storage.list_blobs().await.map_err(map_err) + } + + #[napi] + pub async fn get_peer_remote_clocks(&self, peer: String) -> napi::Result<Vec<DocClock>> { + self + .storage + .get_peer_remote_clocks(peer) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_peer_remote_clock( + &self, + peer: String, + doc_id: String, + ) -> napi::Result<DocClock> { + self + .storage + .get_peer_remote_clock(peer, doc_id) + .await + .map_err(map_err) + } + + #[napi] + pub async fn set_peer_remote_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> napi::Result<()> { + self + .storage + .set_peer_remote_clock(peer, doc_id, clock) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_peer_pulled_remote_clocks(&self, peer: String) -> napi::Result<Vec<DocClock>> { + self + .storage + .get_peer_pulled_remote_clocks(peer) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_peer_pulled_remote_clock( + &self, + peer: String, + doc_id: String, + ) -> napi::Result<DocClock> { + self + .storage + .get_peer_pulled_remote_clock(peer, doc_id) + .await + .map_err(map_err) + } + + #[napi] + pub async
fn set_peer_pulled_remote_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> napi::Result<()> { + self + .storage + .set_peer_pulled_remote_clock(peer, doc_id, clock) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_peer_pushed_clocks(&self, peer: String) -> napi::Result<Vec<DocClock>> { + self + .storage + .get_peer_pushed_clocks(peer) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_peer_pushed_clock( + &self, + peer: String, + doc_id: String, + ) -> napi::Result<DocClock> { + self + .storage + .get_peer_pushed_clock(peer, doc_id) + .await + .map_err(map_err) + } + + #[napi] + pub async fn set_peer_pushed_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> napi::Result<()> { + self + .storage + .set_peer_pushed_clock(peer, doc_id, clock) + .await + .map_err(map_err) + } + + #[napi] + pub async fn clear_clocks(&self) -> napi::Result<()> { + self.storage.clear_clocks().await.map_err(map_err) + } +} diff --git a/packages/frontend/native/nbstore/src/storage.rs b/packages/frontend/native/nbstore/src/storage.rs new file mode 100644 index 0000000000..0f47075d65 --- /dev/null +++ b/packages/frontend/native/nbstore/src/storage.rs @@ -0,0 +1,129 @@ +use affine_schema::get_migrator; +use sqlx::{ + migrate::MigrateDatabase, + sqlite::{Sqlite, SqliteConnectOptions, SqlitePoolOptions}, + Pool, Row, +}; + +pub type Result<T> = std::result::Result<T, sqlx::Error>; + +pub struct SqliteDocStorage { + pub pool: Pool<Sqlite>, + path: String, +} + +impl SqliteDocStorage { + pub fn new(path: String) -> Self { + let sqlite_options = SqliteConnectOptions::new() + .filename(&path) + .foreign_keys(false) + .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); + + let mut pool_options = SqlitePoolOptions::new(); + + if cfg!(test) && path == ":memory:" { + pool_options = pool_options + .min_connections(1) + .max_connections(1) + .idle_timeout(None) + .max_lifetime(None); + } else { + pool_options = pool_options.max_connections(4); + } + + Self { + pool: pool_options.connect_lazy_with(sqlite_options), + path, + } + } + + pub async fn validate(&self) -> Result<bool> { + let record = sqlx::query("SELECT * FROM _sqlx_migrations ORDER BY installed_on ASC LIMIT 1;") + .fetch_optional(&self.pool) + .await; + + match record { + Ok(Some(row)) => { + let name: &str = row.try_get("description")?; + Ok(name == "init_v2") + } + _ => return Ok(false), + } + } + + pub async fn connect(&self) -> Result<()> { + if !Sqlite::database_exists(&self.path).await.unwrap_or(false) { + Sqlite::create_database(&self.path).await?; + }; + + self.migrate().await?; + + Ok(()) + } + + async fn migrate(&self) -> Result<()> { + let migrator = get_migrator(); + migrator.run(&self.pool).await?; + + Ok(()) + } + + pub async fn close(&self) { + self.pool.close().await + } + + pub fn is_closed(&self) -> bool { + self.pool.is_closed() + } + + /// + /// Flush the WAL file to the database file.
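+  /// Only meaningful in WAL journal mode, which `SqliteDocStorage::new` enables.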
+ /// See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B + /// + pub async fn checkpoint(&self) -> Result<()> { + sqlx::query("PRAGMA wal_checkpoint(FULL);") + .execute(&self.pool) + .await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + async fn get_storage() -> SqliteDocStorage { + let storage = SqliteDocStorage::new(":memory:".to_string()); + storage.connect().await.unwrap(); + + storage + } + + #[tokio::test] + async fn init_tables() { + let storage = get_storage().await; + + sqlx::query("INSERT INTO meta (space_id) VALUES ($1);") + .bind("test") + .execute(&storage.pool) + .await + .unwrap(); + + let record = sqlx::query!("SELECT space_id FROM meta;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(record.space_id, "test"); + } + + #[tokio::test] + async fn validate_db() { + let storage = get_storage().await; + assert!(storage.validate().await.unwrap()); + + let storage = SqliteDocStorage::new(":memory:".to_string()); + assert!(!storage.validate().await.unwrap()); + } +} diff --git a/packages/frontend/native/nbstore/src/sync.rs b/packages/frontend/native/nbstore/src/sync.rs new file mode 100644 index 0000000000..37eabc5abe --- /dev/null +++ b/packages/frontend/native/nbstore/src/sync.rs @@ -0,0 +1,289 @@ +use chrono::NaiveDateTime; + +use super::storage::{Result, SqliteDocStorage}; +use super::DocClock; + +impl SqliteDocStorage { + pub async fn get_peer_remote_clocks(&self, peer: String) -> Result<Vec<DocClock>> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, remote_clock as timestamp FROM peer_clocks WHERE peer = ?", + peer + ) + .fetch_all(&self.pool) + .await + } + + pub async fn get_peer_remote_clock(&self, peer: String, doc_id: String) -> Result<DocClock> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, remote_clock as timestamp FROM peer_clocks WHERE peer = ? AND doc_id = ?", + peer, + doc_id + ) + .fetch_one(&self.pool) + .await + } + + pub async fn set_peer_remote_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> Result<()> { + sqlx::query( + r#" + INSERT INTO peer_clocks (peer, doc_id, remote_clock) + VALUES ($1, $2, $3) + ON CONFLICT(peer, doc_id) + DO UPDATE SET remote_clock=$3 WHERE remote_clock < $3;"#, + ) + .bind(peer) + .bind(doc_id) + .bind(clock) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn get_peer_pulled_remote_clocks(&self, peer: String) -> Result<Vec<DocClock>> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, pulled_remote_clock as timestamp FROM peer_clocks WHERE peer = ?", + peer + ) + .fetch_all(&self.pool) + .await + } + + pub async fn get_peer_pulled_remote_clock( + &self, + peer: String, + doc_id: String, + ) -> Result<DocClock> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, pulled_remote_clock as timestamp FROM peer_clocks WHERE peer = ?
AND doc_id = ?", + peer, + doc_id + ) + .fetch_one(&self.pool) + .await + } + + pub async fn set_peer_pulled_remote_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> Result<()> { + sqlx::query( + r#" + INSERT INTO peer_clocks (peer, doc_id, pulled_remote_clock) + VALUES ($1, $2, $3) + ON CONFLICT(peer, doc_id) + DO UPDATE SET pulled_remote_clock=$3 WHERE pulled_remote_clock < $3;"#, + ) + .bind(peer) + .bind(doc_id) + .bind(clock) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn get_peer_pushed_clocks(&self, peer: String) -> Result<Vec<DocClock>> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, pushed_clock as timestamp FROM peer_clocks WHERE peer = ?", + peer + ) + .fetch_all(&self.pool) + .await + } + + pub async fn get_peer_pushed_clock(&self, peer: String, doc_id: String) -> Result<DocClock> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, pushed_clock as timestamp FROM peer_clocks WHERE peer = ? AND doc_id = ?", + peer, + doc_id + ) + .fetch_one(&self.pool) + .await + } + + pub async fn set_peer_pushed_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> Result<()> { + sqlx::query( + r#" + INSERT INTO peer_clocks (peer, doc_id, pushed_clock) + VALUES ($1, $2, $3) + ON CONFLICT(peer, doc_id) + DO UPDATE SET pushed_clock=$3 WHERE pushed_clock < $3;"#, + ) + .bind(peer) + .bind(doc_id) + .bind(clock) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn clear_clocks(&self) -> Result<()> { + sqlx::query("DELETE FROM peer_clocks;") + .execute(&self.pool) + .await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use chrono::{DateTime, Utc}; + use sqlx::Row; + + use super::*; + + async fn get_storage() -> SqliteDocStorage { + let storage = SqliteDocStorage::new(":memory:".to_string()); + storage.connect().await.unwrap(); + + storage + } + + #[tokio::test] + async fn set_peer_clock() { + let storage = get_storage().await; + let peer = String::from("peer1"); + + let clocks = storage.get_peer_remote_clocks(peer.clone()).await.unwrap(); + + assert!(clocks.is_empty()); + + let clock = Utc::now().naive_utc(); + storage + .set_peer_remote_clock(peer.clone(), "doc1".to_string(), clock) + .await + .unwrap(); + + let clocks = storage.get_peer_remote_clocks(peer.clone()).await.unwrap(); + + assert_eq!(clocks.len(), 1); + assert_eq!(clocks.first().unwrap().doc_id, "doc1"); + assert_eq!(clocks.first().unwrap().timestamp, clock); + } + + #[tokio::test] + async fn set_peer_pushed_clock() { + let storage = get_storage().await; + let peer = String::from("peer1"); + + let clocks = storage.get_peer_pushed_clocks(peer.clone()).await.unwrap(); + + assert!(clocks.is_empty()); + + let clock = Utc::now().naive_utc(); + storage + .set_peer_pushed_clock(peer.clone(), "doc1".to_string(), clock) + .await + .unwrap(); + + let clocks = storage.get_peer_pushed_clocks(peer.clone()).await.unwrap(); + + assert_eq!(clocks.len(), 1); + assert_eq!(clocks.first().unwrap().doc_id, "doc1"); + assert_eq!(clocks.first().unwrap().timestamp, clock); + } + + #[tokio::test] + async fn default_clocks() { + let storage = get_storage().await; + let peer = String::from("peer1"); + + storage + .set_peer_remote_clock(peer.clone(), "doc1".to_string(), Utc::now().naive_utc()) + .await + .unwrap(); + storage + .set_peer_pushed_clock(peer.clone(), "doc2".to_string(), Utc::now().naive_utc()) + .await + .unwrap(); + storage + .set_peer_pulled_remote_clock(peer.clone(), "doc3".to_string(), Utc::now().naive_utc()) + .await + .unwrap(); + + let record = sqlx::query("SELECT * FROM
peer_clocks WHERE peer = ? AND doc_id = ?") + .bind(peer.clone()) + .bind("doc1") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!( + record.get::<NaiveDateTime, _>("pushed_clock"), + DateTime::from_timestamp(0, 0).unwrap().naive_utc() + ); + + let record = sqlx::query("SELECT * FROM peer_clocks WHERE peer = ? AND doc_id = ?") + .bind(peer.clone()) + .bind("doc2") + .fetch_one(&storage.pool) + .await + .unwrap(); + assert_eq!( + record.get::<NaiveDateTime, _>("remote_clock"), + DateTime::from_timestamp(0, 0).unwrap().naive_utc() + ); + + let record = sqlx::query("SELECT * FROM peer_clocks WHERE peer = ? AND doc_id = ?") + .bind(peer.clone()) + .bind("doc3") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!( + record.get::<NaiveDateTime, _>("remote_clock"), + DateTime::from_timestamp(0, 0).unwrap().naive_utc() + ); + } + + #[tokio::test] + async fn clear_clocks() { + let storage = get_storage().await; + let peer = String::from("peer1"); + + storage + .set_peer_remote_clock(peer.clone(), "doc1".to_string(), Utc::now().naive_utc()) + .await + .unwrap(); + storage + .set_peer_pushed_clock(peer.clone(), "doc2".to_string(), Utc::now().naive_utc()) + .await + .unwrap(); + + let clocks = storage.get_peer_remote_clocks(peer.clone()).await.unwrap(); + assert_eq!(clocks.len(), 2); + + let clocks = storage.get_peer_pushed_clocks(peer.clone()).await.unwrap(); + assert_eq!(clocks.len(), 2); + + storage.clear_clocks().await.unwrap(); + + let clocks = storage.get_peer_remote_clocks(peer.clone()).await.unwrap(); + assert!(clocks.is_empty()); + + let clocks = storage.get_peer_pushed_clocks(peer.clone()).await.unwrap(); + assert!(clocks.is_empty()); + } +} diff --git a/packages/frontend/native/package.json b/packages/frontend/native/package.json index 88b0ab6910..ff42721bae 100644 --- a/packages/frontend/native/package.json +++ b/packages/frontend/native/package.json @@ -13,9 +13,7 @@ "aarch64-unknown-linux-gnu", "aarch64-pc-windows-msvc" ], - "ts": { - "constEnum": false - } + "constEnum": false }, "license": "MIT", "ava": { diff --git a/packages/frontend/native/schema/Cargo.toml b/packages/frontend/native/schema/Cargo.toml index 6a51d5789b..79e53049df 100644 --- a/packages/frontend/native/schema/Cargo.toml +++ b/packages/frontend/native/schema/Cargo.toml @@ -2,3 +2,6 @@ edition = "2021" name = "affine_schema" version = "0.0.0" + +[dependencies] +sqlx = { workspace = true, default-features = false, features = ["migrate"] } \ No newline at end of file diff --git a/packages/frontend/native/schema/src/lib.rs b/packages/frontend/native/schema/src/lib.rs index d7daa6dda1..3aa0dba713 100644 --- a/packages/frontend/native/schema/src/lib.rs +++ b/packages/frontend/native/schema/src/lib.rs @@ -1,29 +1,96 @@ -// TODO -// dynamic create it from JavaScript side -// and remove this crate then.
-pub const SCHEMA: &str = r#"CREATE TABLE IF NOT EXISTS "updates" ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - data BLOB NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, - doc_id TEXT +use std::borrow::Cow; + +use sqlx::migrate::{Migration, MigrationType, Migrator}; + +pub mod v1; + +type SimpleMigration = ( + /* name */ &'static str, + /* up */ &'static str, + /* down */ Option<&'static str>, ); -CREATE TABLE IF NOT EXISTS "blobs" ( - key TEXT PRIMARY KEY NOT NULL, - data BLOB NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL + +// ORDER MATTERS +const MIGRATIONS: &[SimpleMigration] = &[ + // v2 db init + ( + "init_v2", + r#" +CREATE TABLE "meta" ( + space_id VARCHAR PRIMARY KEY NOT NULL ); -CREATE TABLE IF NOT EXISTS "version_info" ( - version NUMBER NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL -); -CREATE TABLE IF NOT EXISTS "server_clock" ( - key TEXT PRIMARY KEY NOT NULL, + +CREATE TABLE "snapshots" ( + doc_id VARCHAR PRIMARY KEY NOT NULL, data BLOB NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL ); -CREATE TABLE IF NOT EXISTS "sync_metadata" ( - key TEXT PRIMARY KEY NOT NULL, + +CREATE TABLE "updates" ( + doc_id VARCHAR NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, data BLOB NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL -) -"#; + PRIMARY KEY (doc_id, created_at) +); + +CREATE TABLE "clocks" ( + doc_id VARCHAR PRIMARY KEY NOT NULL, + timestamp TIMESTAMP NOT NULL +); + +CREATE TABLE "blobs" ( + key VARCHAR PRIMARY KEY NOT NULL, + data BLOB NOT NULL, + mime VARCHAR NOT NULL, + size INTEGER NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + deleted_at TIMESTAMP +); + +CREATE TABLE "peer_clocks" ( + peer VARCHAR NOT NULL, + doc_id VARCHAR NOT NULL, + remote_clock TIMESTAMP NOT NULL DEFAULT 0, + pulled_remote_clock TIMESTAMP NOT NULL DEFAULT 0, + pushed_clock TIMESTAMP NOT NULL DEFAULT 0, + PRIMARY KEY (peer, doc_id) +); +CREATE INDEX peer_clocks_doc_id ON peer_clocks (doc_id); + "#, + None, + ), +]; + +pub fn get_migrator() -> Migrator { + let mut migrations = vec![]; + + MIGRATIONS.iter().for_each(|&(name, up, down)| { + migrations.push(Migration::new( + migrations.len() as i64 + 1, + Cow::from(name), + if down.is_some() { + MigrationType::ReversibleUp + } else { + MigrationType::Simple + }, + Cow::from(up), + false, + )); + + if let Some(down) = down { + migrations.push(Migration::new( + migrations.len() as i64 + 1, + Cow::from(name), + MigrationType::ReversibleDown, + Cow::from(down), + false, + )); + } + }); + + Migrator { + migrations: Cow::Owned(migrations), + ..Migrator::DEFAULT + } +} diff --git a/packages/frontend/native/schema/src/v1.rs b/packages/frontend/native/schema/src/v1.rs new file mode 100644 index 0000000000..fe198e86fa --- /dev/null +++ b/packages/frontend/native/schema/src/v1.rs @@ -0,0 +1,26 @@ +pub const SCHEMA: &str = r#"CREATE TABLE IF NOT EXISTS "updates" ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + data BLOB NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + doc_id TEXT +); +CREATE TABLE IF NOT EXISTS "blobs" ( + key TEXT PRIMARY KEY NOT NULL, + data BLOB NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +); +CREATE TABLE IF NOT EXISTS "version_info" ( + version NUMBER NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +); +CREATE TABLE IF NOT EXISTS "server_clock" ( + key TEXT PRIMARY 
KEY NOT NULL, + data BLOB NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +); +CREATE TABLE IF NOT EXISTS "sync_metadata" ( + key TEXT PRIMARY KEY NOT NULL, + data BLOB NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +) +"#; diff --git a/packages/frontend/native/sqlite_v1/Cargo.toml b/packages/frontend/native/sqlite_v1/Cargo.toml new file mode 100644 index 0000000000..3dde2e558c --- /dev/null +++ b/packages/frontend/native/sqlite_v1/Cargo.toml @@ -0,0 +1,23 @@ +[package] +edition = "2021" +name = "affine_sqlite_v1" +version = "0.0.0" + +[lib] +crate-type = ["rlib", "cdylib"] + +[dependencies] +affine_schema = { path = "../schema" } +anyhow = { workspace = true } +chrono = { workspace = true } +napi = { workspace = true } +napi-derive = { workspace = true } +sqlx = { workspace = true, default-features = false, features = ["chrono", "macros", "migrate", "runtime-tokio", "sqlite", "tls-rustls"] } +tokio = { workspace = true, features = ["full"] } + +[build-dependencies] +affine_schema = { path = "../schema" } +dotenvy = { workspace = true } +napi-build = { workspace = true } +sqlx = { workspace = true, default-features = false, features = ["chrono", "json", "macros", "migrate", "runtime-tokio", "sqlite", "tls-rustls"] } +tokio = { workspace = true, features = ["full"] } diff --git a/packages/frontend/native/sqlite_v1/build.rs b/packages/frontend/native/sqlite_v1/build.rs new file mode 100644 index 0000000000..fb2ed2b8ce --- /dev/null +++ b/packages/frontend/native/sqlite_v1/build.rs @@ -0,0 +1,35 @@ +use sqlx::sqlite::SqliteConnectOptions; +use std::fs; + +#[tokio::main] +async fn main() -> Result<(), std::io::Error> { + napi_build::setup(); + + // always start with a fresh database to have latest db schema + let cwd = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let db_path = format!("{cwd}/affine.db"); + + if fs::metadata(&db_path).is_ok() { + fs::remove_file(&db_path)?; + } + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .journal_mode(sqlx::sqlite::SqliteJournalMode::Off) + .locking_mode(sqlx::sqlite::SqliteLockingMode::Exclusive) + .create_if_missing(true); + let pool = sqlx::sqlite::SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await + .unwrap(); + + sqlx::query(affine_schema::v1::SCHEMA) + .execute(&pool) + .await + .unwrap(); + + println!("cargo::rustc-env=DATABASE_URL=sqlite://{db_path}"); + + Ok(()) +} diff --git a/packages/frontend/native/src/sqlite/mod.rs b/packages/frontend/native/sqlite_v1/src/lib.rs similarity index 99% rename from packages/frontend/native/src/sqlite/mod.rs rename to packages/frontend/native/sqlite_v1/src/lib.rs index 17293d7ba9..e822c4b919 100644 --- a/packages/frontend/native/src/sqlite/mod.rs +++ b/packages/frontend/native/sqlite_v1/src/lib.rs @@ -68,7 +68,7 @@ impl SqliteConnection { .map_err(anyhow::Error::from)?; }; let mut connection = self.pool.acquire().await.map_err(anyhow::Error::from)?; - sqlx::query(affine_schema::SCHEMA) + sqlx::query(affine_schema::v1::SCHEMA) .execute(connection.as_mut()) .await .map_err(anyhow::Error::from)?; diff --git a/packages/frontend/native/src/lib.rs b/packages/frontend/native/src/lib.rs index fa8de0f14a..f9b3fdb30e 100644 --- a/packages/frontend/native/src/lib.rs +++ b/packages/frontend/native/src/lib.rs @@ -1,2 +1,4 @@ pub mod hashcash; -pub mod sqlite; + +pub use affine_nbstore::*; +pub use affine_sqlite_v1::*; diff --git a/yarn.lock b/yarn.lock index e1a3745d4f..3ce605281b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -516,6 +516,7 
@@ __metadata: "@affine/core": "workspace:*" "@affine/i18n": "workspace:*" "@affine/native": "workspace:*" + "@affine/nbstore": "workspace:*" "@blocksuite/affine": "npm:0.18.7" "@electron-forge/cli": "npm:^7.3.0" "@electron-forge/core": "npm:^7.3.0" @@ -724,10 +725,11 @@ __metadata: languageName: unknown linkType: soft -"@affine/nbstore@workspace:packages/common/nbstore": +"@affine/nbstore@workspace:*, @affine/nbstore@workspace:packages/common/nbstore": version: 0.0.0-use.local resolution: "@affine/nbstore@workspace:packages/common/nbstore" dependencies: + "@affine/electron-api": "workspace:*" "@affine/graphql": "workspace:*" "@datastructures-js/binary-search-tree": "npm:^5.3.2" "@toeverything/infra": "workspace:*" @@ -741,6 +743,7 @@ __metadata: vitest: "npm:2.1.8" yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" peerDependencies: + "@affine/electron-api": "workspace:*" "@affine/graphql": "workspace:*" idb: ^8.0.0 socket.io-client: ^4.7.5 @@ -4057,7 +4060,7 @@ __metadata: languageName: node linkType: hard -"@emotion/react@npm:^11.11.4": +"@emotion/react@npm:^11.11.4, @emotion/react@npm:^11.14.0": version: 11.14.0 resolution: "@emotion/react@npm:11.14.0" dependencies: @@ -12937,9 +12940,11 @@ __metadata: "@affine/templates": "workspace:*" "@blocksuite/affine": "npm:0.18.7" "@datastructures-js/binary-search-tree": "npm:^5.3.2" + "@emotion/react": "npm:^11.14.0" "@swc/core": "npm:^1.0.0" "@testing-library/dom": "npm:^10.0.0" "@testing-library/react": "npm:^16.1.0" + "@types/react": "npm:^19.0.1" eventemitter2: "npm:^6.4.9" fake-indexeddb: "npm:^6.0.0" foxact: "npm:^0.2.33" @@ -12951,28 +12956,14 @@ __metadata: jotai-effect: "npm:^1.0.0" lodash-es: "npm:^4.17.21" nanoid: "npm:^5.0.7" - react: "npm:^19.0.0" + react: "npm:19.0.0" rxjs: "npm:^7.8.1" vitest: "npm:2.1.8" yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" zod: "npm:^3.22.4" peerDependencies: - "@affine/templates": "*" - "@swc/core": ^1.0.0 - "@testing-library/dom": ">=7.0.0" electron: "*" - react: ^19.0.0 react-dom: ^19.0.0 - yjs: ^13 - peerDependenciesMeta: - "@affine/templates": - optional: true - electron: - optional: true - react: - optional: true - yjs: - optional: true languageName: unknown linkType: soft @@ -13864,7 +13855,7 @@ __metadata: languageName: node linkType: hard -"@types/react@npm:^19.0.0": +"@types/react@npm:^19.0.0, @types/react@npm:^19.0.1": version: 19.0.1 resolution: "@types/react@npm:19.0.1" dependencies: