feat(native): provide FSWatcher

This commit is contained in:
LongYinan
2023-05-10 17:16:48 +08:00
parent ee1e50f391
commit e54a5b6128
23 changed files with 1215 additions and 788 deletions

View File

@@ -1,197 +1 @@
# Created by https://www.toptal.com/developers/gitignore/api/node
# Edit at https://www.toptal.com/developers/gitignore?templates=node
### Node ###
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# TypeScript v1 declaration files
typings/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variables file
.env
.env.test
# parcel-bundler cache (https://parceljs.org/)
.cache
# Next.js build output
.next
# Nuxt.js build / generate output
.nuxt
dist
# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# End of https://www.toptal.com/developers/gitignore/api/node
# Created by https://www.toptal.com/developers/gitignore/api/macos
# Edit at https://www.toptal.com/developers/gitignore?templates=macos
### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### macOS Patch ###
# iCloud generated files
*.icloud
# End of https://www.toptal.com/developers/gitignore/api/macos
# Created by https://www.toptal.com/developers/gitignore/api/windows
# Edit at https://www.toptal.com/developers/gitignore?templates=windows
### Windows ###
# Windows thumbnail cache files
Thumbs.db
Thumbs.db:encryptable
ehthumbs.db
ehthumbs_vista.db
# Dump file
*.stackdump
# Folder config file
[Dd]esktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msix
*.msm
*.msp
# Windows shortcuts
*.lnk
# End of https://www.toptal.com/developers/gitignore/api/windows
#Added by cargo
/target
Cargo.lock
.pnp.*
.yarn/*
!.yarn/patches
!.yarn/plugins
!.yarn/releases
!.yarn/sdks
!.yarn/versions
*.node
*.fixture

View File

@@ -7,23 +7,25 @@ version = "0.0.0"
crate-type = ["cdylib"]
[dependencies]
anyhow = "1"
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
napi = { version = "2.11.1", default-features = false, features = ["napi4", "tokio_rt"] }
napi-derive = "2.11.0"
jwst = { git = "https://github.com/toeverything/OctoBase", rev = "b701935", package = "jwst" }
jwst-storage = { git = "https://github.com/toeverything/OctoBase", rev = "b701935", package = "jwst-storage", features = [ "sqlite"] }
cloud-database = { git = "https://github.com/toeverything/OctoBase", rev = "b701935", package = "cloud-database", features = [ "sqlite"] }
jwst-rpc = { git = "https://github.com/toeverything/OctoBase", rev = "b701935", package = "jwst-rpc" }
lib0 = "0.16.3"
tokio = "1.24.2"
yrs = "0.16.3"
bytes = "1.3.0"
futures = "^0.3.25"
napi = { version = "2", default-features = false, features = [
"napi4",
"tokio_rt",
"serde-json",
"error_anyhow",
] }
napi-derive = "2"
notify = { version = "5", features = ["serde"] }
parking_lot = "0.12"
serde = "1"
serde_json = "1"
tokio = "1"
uuid = { version = "1", default-features = false, features = [
"serde",
"v4",
"fast-rng",
] }
[build-dependencies]
napi-build = "2.0.1"
[patch.crates-io]
rust-embed = { git = "https://github.com/pyrossh/rust-embed", rev = "7c0fc42" }
lib0 = { git = "https://github.com/toeverything/y-crdt", rev = "a3f7263" }
yrs = { git = "https://github.com/toeverything/y-crdt", rev = "a3f7263" }
napi-build = "2"

View File

@@ -0,0 +1,81 @@
import assert, { doesNotThrow } from 'node:assert';
import { promises as fs } from 'node:fs';
import { join } from 'node:path';
import { test } from 'node:test';
import { fileURLToPath } from 'node:url';
import { lastValueFrom, Subject } from 'rxjs';
import { v4 } from 'uuid';
import type { FSWatcher } from '../index';
import { watch } from '../index.js';
test('fs watch', { concurrency: false }, async t => {
let watcher: FSWatcher;
let fixture: string;
t.beforeEach(async () => {
const fixtureName = `fs-${v4()}.fixture`;
fixture = join(fileURLToPath(import.meta.url), '..', fixtureName);
await fs.writeFile(fixture, '\n');
watcher = watch(fixture);
});
t.afterEach(async () => {
watcher.close();
await fs.unlink(fixture).catch(() => false);
});
await t.test('should watch without error', () => {
doesNotThrow(() => {
const subscription = watcher.subscribe(() => {});
subscription.unsubscribe();
});
});
await t.test('should watch file change', () => {
return (async () => {
const defer = new Subject<void>();
const subscription = watcher.subscribe(
event => {
assert.deepEqual(event.paths, [fixture]);
subscription.unsubscribe();
defer.next();
defer.complete();
},
err => {
subscription.unsubscribe();
defer.error(err);
}
);
await fs.appendFile(fixture, 'test');
return lastValueFrom(defer.asObservable());
})();
});
await t.test('should watch file delete', () => {
return (async () => {
const defer = new Subject<void>();
const subscription = watcher.subscribe(
event => {
if (event.type.remove) {
assert.deepEqual(event.paths, [fixture]);
assert.deepEqual(event.type, {
remove: {
kind: 'file',
},
});
}
subscription.unsubscribe();
defer.next();
defer.complete();
},
err => {
subscription.unsubscribe();
defer.error(err);
}
);
await fs.unlink(fixture);
return lastValueFrom(defer.asObservable());
})();
});
});

View File

@@ -1,5 +1,6 @@
extern crate napi_build;
fn main() {
fn main() -> Result<(), std::io::Error> {
napi_build::setup();
Ok(())
}

View File

@@ -3,18 +3,42 @@
/* auto-generated by NAPI-RS */
export class Storage {
constructor(path: string);
error(): string | null;
getBlob(workspaceId: string | undefined | null, id: string): Promise<Buffer>;
connect(workspaceId: string, remote: string): Workspace | null;
sync(workspaceId: string, remote: string): Workspace;
export interface WatchOptions {
recursive?: boolean;
}
export class Workspace {
constructor(id: string);
id(): string;
clientId(): number;
search(query: string): string;
getSearchIndex(): Array<string>;
setSearchIndex(fields: Array<string>): boolean;
/** Watcher kind enumeration */
export const enum WatcherKind {
/** inotify backend (linux) */
Inotify = 'Inotify',
/** FS-Event backend (mac) */
Fsevent = 'Fsevent',
/** KQueue backend (bsd,optionally mac) */
Kqueue = 'Kqueue',
/** Polling based backend (fallback) */
PollWatcher = 'PollWatcher',
/** Windows backend */
ReadDirectoryChangesWatcher = 'ReadDirectoryChangesWatcher',
/** Fake watcher for testing */
NullWatcher = 'NullWatcher',
Unknown = 'Unknown',
}
export function watch(
p: string,
options?: WatchOptions | undefined | null
): FSWatcher;
export class Subscription {
unsubscribe(): void;
}
export type FSWatcher = FsWatcher;
export class FsWatcher {
get kind(): WatcherKind;
toString(): string;
subscribe(
callback: (value: any) => any,
errorCallback?: (
err: Error | null,
value: undefined
) => any | undefined | null
): Subscription;
close(): void;
}

View File

@@ -36,13 +36,13 @@ switch (platform) {
switch (arch) {
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'octobase.android-arm64.node')
join(__dirname, 'affine.android-arm64.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.android-arm64.node');
nativeBinding = require('./affine.android-arm64.node');
} else {
nativeBinding = require('@affine/octobase-node-android-arm64');
nativeBinding = require('@affine/native-android-arm64');
}
} catch (e) {
loadError = e;
@@ -50,13 +50,13 @@ switch (platform) {
break;
case 'arm':
localFileExisted = existsSync(
join(__dirname, 'octobase.android-arm-eabi.node')
join(__dirname, 'affine.android-arm-eabi.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.android-arm-eabi.node');
nativeBinding = require('./affine.android-arm-eabi.node');
} else {
nativeBinding = require('@affine/octobase-node-android-arm-eabi');
nativeBinding = require('@affine/native-android-arm-eabi');
}
} catch (e) {
loadError = e;
@@ -70,13 +70,13 @@ switch (platform) {
switch (arch) {
case 'x64':
localFileExisted = existsSync(
join(__dirname, 'octobase.win32-x64-msvc.node')
join(__dirname, 'affine.win32-x64-msvc.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.win32-x64-msvc.node');
nativeBinding = require('./affine.win32-x64-msvc.node');
} else {
nativeBinding = require('@affine/octobase-node-win32-x64-msvc');
nativeBinding = require('@affine/native-win32-x64-msvc');
}
} catch (e) {
loadError = e;
@@ -84,13 +84,13 @@ switch (platform) {
break;
case 'ia32':
localFileExisted = existsSync(
join(__dirname, 'octobase.win32-ia32-msvc.node')
join(__dirname, 'affine.win32-ia32-msvc.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.win32-ia32-msvc.node');
nativeBinding = require('./affine.win32-ia32-msvc.node');
} else {
nativeBinding = require('@affine/octobase-node-win32-ia32-msvc');
nativeBinding = require('@affine/native-win32-ia32-msvc');
}
} catch (e) {
loadError = e;
@@ -98,13 +98,13 @@ switch (platform) {
break;
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'octobase.win32-arm64-msvc.node')
join(__dirname, 'affine.win32-arm64-msvc.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.win32-arm64-msvc.node');
nativeBinding = require('./affine.win32-arm64-msvc.node');
} else {
nativeBinding = require('@affine/octobase-node-win32-arm64-msvc');
nativeBinding = require('@affine/native-win32-arm64-msvc');
}
} catch (e) {
loadError = e;
@@ -116,26 +116,26 @@ switch (platform) {
break;
case 'darwin':
localFileExisted = existsSync(
join(__dirname, 'octobase.darwin-universal.node')
join(__dirname, 'affine.darwin-universal.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.darwin-universal.node');
nativeBinding = require('./affine.darwin-universal.node');
} else {
nativeBinding = require('@affine/octobase-node-darwin-universal');
nativeBinding = require('@affine/native-darwin-universal');
}
break;
} catch {}
switch (arch) {
case 'x64':
localFileExisted = existsSync(
join(__dirname, 'octobase.darwin-x64.node')
join(__dirname, 'affine.darwin-x64.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.darwin-x64.node');
nativeBinding = require('./affine.darwin-x64.node');
} else {
nativeBinding = require('@affine/octobase-node-darwin-x64');
nativeBinding = require('@affine/native-darwin-x64');
}
} catch (e) {
loadError = e;
@@ -143,13 +143,13 @@ switch (platform) {
break;
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'octobase.darwin-arm64.node')
join(__dirname, 'affine.darwin-arm64.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.darwin-arm64.node');
nativeBinding = require('./affine.darwin-arm64.node');
} else {
nativeBinding = require('@affine/octobase-node-darwin-arm64');
nativeBinding = require('@affine/native-darwin-arm64');
}
} catch (e) {
loadError = e;
@@ -163,12 +163,12 @@ switch (platform) {
if (arch !== 'x64') {
throw new Error(`Unsupported architecture on FreeBSD: ${arch}`);
}
localFileExisted = existsSync(join(__dirname, 'octobase.freebsd-x64.node'));
localFileExisted = existsSync(join(__dirname, 'affine.freebsd-x64.node'));
try {
if (localFileExisted) {
nativeBinding = require('./octobase.freebsd-x64.node');
nativeBinding = require('./affine.freebsd-x64.node');
} else {
nativeBinding = require('@affine/octobase-node-freebsd-x64');
nativeBinding = require('@affine/native-freebsd-x64');
}
} catch (e) {
loadError = e;
@@ -179,26 +179,26 @@ switch (platform) {
case 'x64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'octobase.linux-x64-musl.node')
join(__dirname, 'affine.linux-x64-musl.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.linux-x64-musl.node');
nativeBinding = require('./affine.linux-x64-musl.node');
} else {
nativeBinding = require('@affine/octobase-node-linux-x64-musl');
nativeBinding = require('@affine/native-linux-x64-musl');
}
} catch (e) {
loadError = e;
}
} else {
localFileExisted = existsSync(
join(__dirname, 'octobase.linux-x64-gnu.node')
join(__dirname, 'affine.linux-x64-gnu.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.linux-x64-gnu.node');
nativeBinding = require('./affine.linux-x64-gnu.node');
} else {
nativeBinding = require('@affine/octobase-node-linux-x64-gnu');
nativeBinding = require('@affine/native-linux-x64-gnu');
}
} catch (e) {
loadError = e;
@@ -208,26 +208,26 @@ switch (platform) {
case 'arm64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'octobase.linux-arm64-musl.node')
join(__dirname, 'affine.linux-arm64-musl.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.linux-arm64-musl.node');
nativeBinding = require('./affine.linux-arm64-musl.node');
} else {
nativeBinding = require('@affine/octobase-node-linux-arm64-musl');
nativeBinding = require('@affine/native-linux-arm64-musl');
}
} catch (e) {
loadError = e;
}
} else {
localFileExisted = existsSync(
join(__dirname, 'octobase.linux-arm64-gnu.node')
join(__dirname, 'affine.linux-arm64-gnu.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.linux-arm64-gnu.node');
nativeBinding = require('./affine.linux-arm64-gnu.node');
} else {
nativeBinding = require('@affine/octobase-node-linux-arm64-gnu');
nativeBinding = require('@affine/native-linux-arm64-gnu');
}
} catch (e) {
loadError = e;
@@ -236,13 +236,13 @@ switch (platform) {
break;
case 'arm':
localFileExisted = existsSync(
join(__dirname, 'octobase.linux-arm-gnueabihf.node')
join(__dirname, 'affine.linux-arm-gnueabihf.node')
);
try {
if (localFileExisted) {
nativeBinding = require('./octobase.linux-arm-gnueabihf.node');
nativeBinding = require('./affine.linux-arm-gnueabihf.node');
} else {
nativeBinding = require('@affine/octobase-node-linux-arm-gnueabihf');
nativeBinding = require('@affine/native-linux-arm-gnueabihf');
}
} catch (e) {
loadError = e;
@@ -263,7 +263,9 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`);
}
const { Storage, Workspace } = nativeBinding;
const { WatcherKind, Subscription, watch, FsWatcher } = nativeBinding;
module.exports.Storage = Storage;
module.exports.Workspace = Workspace;
module.exports.WatcherKind = WatcherKind;
module.exports.Subscription = Subscription;
module.exports.watch = watch;
module.exports.FsWatcher = FsWatcher;

View File

@@ -1,20 +1,28 @@
{
"name": "@affine/octobase-node",
"name": "@affine/native",
"private": true,
"main": "index.js",
"types": "index.d.ts",
"napi": {
"name": "octobase",
"name": "affine",
"triples": {
"additional": [
"aarch64-apple-darwin"
"aarch64-apple-darwin",
"aarch64-unknown-linux-gnu",
"aarch64-pc-windows-msvc"
]
}
},
"license": "MIT",
"devDependencies": {
"@napi-rs/cli": "^2.15.2",
"@types/node": "^18.16.7"
"@types/node": "^18.16.7",
"@types/uuid": "^9.0.1",
"cross-env": "^7.0.3",
"rxjs": "^7.8.1",
"ts-node": "^10.9.1",
"typescript": "^5.0.4",
"uuid": "^9.0.0"
},
"engines": {
"node": ">= 10"
@@ -24,6 +32,7 @@
"build": "napi build --platform --release",
"build:debug": "napi build --platform",
"universal": "napi universal",
"test": "cross-env TS_NODE_PROJECT=./tsconfig.json node --test --loader ts-node/esm --experimental-specifier-resolution=node ./__tests__/**/*.mts",
"version": "napi version"
},
"version": "0.6.0-canary.0"

View File

@@ -1,216 +0,0 @@
use super::DynamicValue;
use jwst::{Block as JwstBlock, Workspace};
use lib0::any::Any;
#[napi()]
pub struct Block {
pub workspace: Workspace,
pub block: JwstBlock,
}
#[napi()]
impl Block {
#[napi(constructor)]
pub fn new(workspace: Workspace, block: JwstBlock) -> Self {
Self { workspace, block }
}
#[napi]
pub fn get(&self, key: String) -> Option<DynamicValue> {
self.workspace
.with_trx(|trx| self.block.get(&trx.trx, &key).map(DynamicValue::new))
}
#[napi]
pub fn children(&self) -> Vec<String> {
self.workspace.with_trx(|trx| self.block.children(&trx.trx))
}
#[napi]
pub fn push_children(&self, block: &Block) {
self.workspace
.with_trx(|mut trx| self.block.push_children(&mut trx.trx, &block.block));
}
#[napi]
pub fn insert_children_at(&self, block: &Block, pos: u32) {
self.workspace.with_trx(|mut trx| {
self.block
.insert_children_at(&mut trx.trx, &block.block, pos)
});
}
#[napi]
pub fn insert_children_before(&self, block: &Block, reference: &str) {
self.workspace.with_trx(|mut trx| {
self.block
.insert_children_before(&mut trx.trx, &block.block, reference)
});
}
#[napi]
pub fn insert_children_after(&self, block: &Block, reference: &str) {
self.workspace.with_trx(|mut trx| {
self.block
.insert_children_after(&mut trx.trx, &block.block, reference)
});
}
#[napi]
pub fn remove_children(&self, block: &Block) {
self.workspace
.with_trx(|mut trx| self.block.remove_children(&mut trx.trx, &block.block));
}
#[napi]
pub fn exists_children(&self, block_id: &str) -> i32 {
self.workspace
.with_trx(|trx| self.block.exists_children(&trx.trx, block_id))
.map(|i| i as i32)
.unwrap_or(-1)
}
#[napi]
pub fn parent(&self) -> String {
self.workspace
.with_trx(|trx| self.block.parent(&trx.trx).unwrap())
}
#[napi]
pub fn updated(&self) -> u64 {
self.workspace.with_trx(|trx| self.block.updated(&trx.trx))
}
#[napi]
pub fn id(&self) -> String {
self.block.block_id()
}
#[napi]
pub fn flavor(&self) -> String {
self.workspace.with_trx(|trx| self.block.flavor(&trx.trx))
}
#[napi]
pub fn version(&self) -> String {
self.workspace.with_trx(|trx| {
let [major, minor] = self.block.version(&trx.trx);
format!("{major}.{minor}")
})
}
#[napi]
pub fn created(&self) -> u64 {
self.workspace.with_trx(|trx| self.block.created(&trx.trx))
}
#[napi]
pub fn set_bool(&self, key: String, value: bool) {
self.workspace
.with_trx(|mut trx| self.block.set(&mut trx.trx, &key, value));
}
#[napi]
pub fn set_string(&self, key: String, value: String) {
self.workspace
.with_trx(|mut trx| self.block.set(&mut trx.trx, &key, value));
}
#[napi]
pub fn set_float(&self, key: String, value: f64) {
self.workspace
.with_trx(|mut trx| self.block.set(&mut trx.trx, &key, value));
}
#[napi]
pub fn set_integer(&self, key: String, value: i64) {
self.workspace
.with_trx(|mut trx| self.block.set(&mut trx.trx, &key, value));
}
#[napi]
pub fn set_null(&self, key: String) {
self.workspace
.with_trx(|mut trx| self.block.set(&mut trx.trx, &key, Any::Null));
}
#[napi]
pub fn is_bool(&self, key: String) -> bool {
self.workspace.with_trx(|trx| {
self.block
.get(&trx.trx, &key)
.map(|a| matches!(a, Any::Bool(_)))
.unwrap_or(false)
})
}
#[napi]
pub fn is_string(&self, key: String) -> bool {
self.workspace.with_trx(|trx| {
self.block
.get(&trx.trx, &key)
.map(|a| matches!(a, Any::String(_)))
.unwrap_or(false)
})
}
#[napi]
pub fn is_float(&self, key: String) -> bool {
self.workspace.with_trx(|trx| {
self.block
.get(&trx.trx, &key)
.map(|a| matches!(a, Any::Number(_)))
.unwrap_or(false)
})
}
#[napi]
pub fn is_integer(&self, key: String) -> bool {
self.workspace.with_trx(|trx| {
self.block
.get(&trx.trx, &key)
.map(|a| matches!(a, Any::BigInt(_)))
.unwrap_or(false)
})
}
#[napi]
pub fn get_bool(&self, key: String) -> Option<i64> {
self.workspace.with_trx(|trx| {
self.block.get(&trx.trx, &key).and_then(|a| match a {
Any::Bool(i) => Some(i.into()),
_ => None,
})
})
}
#[napi]
pub fn get_string(&self, key: String) -> Option<String> {
self.workspace.with_trx(|trx| {
self.block.get(&trx.trx, &key).and_then(|a| match a {
Any::String(i) => Some(i.into()),
_ => None,
})
})
}
#[napi]
pub fn get_float(&self, key: String) -> Option<f64> {
self.workspace.with_trx(|trx| {
self.block.get(&trx.trx, &key).and_then(|a| match a {
Any::Number(i) => Some(i),
_ => None,
})
})
}
#[napi]
pub fn get_integer(&self, key: String) -> Option<i64> {
self.workspace.with_trx(|trx| {
self.block.get(&trx.trx, &key).and_then(|a| match a {
Any::BigInt(i) => Some(i),
_ => None,
})
})
}
}

View File

@@ -1,68 +0,0 @@
use lib0::any::Any;
use std::collections::HashMap;
pub type DynamicValueMap = HashMap<String, DynamicValue>;
pub struct DynamicValue {
any: Any,
}
impl DynamicValue {
pub fn new(any: Any) -> Self {
Self { any }
}
pub fn as_bool(&self) -> Option<bool> {
match self.any {
Any::Bool(value) => Some(value),
_ => None,
}
}
pub fn as_number(&self) -> Option<f64> {
match self.any {
Any::Number(value) => Some(value),
_ => None,
}
}
pub fn as_int(&self) -> Option<i64> {
match self.any {
Any::BigInt(value) => Some(value),
_ => None,
}
}
pub fn as_string(&self) -> Option<String> {
match &self.any {
Any::String(value) => Some(value.to_string()),
_ => None,
}
}
pub fn as_buffer(&self) -> Option<Vec<u8>> {
match &self.any {
Any::Buffer(value) => Some(value.to_vec()),
_ => None,
}
}
pub fn as_array(&self) -> Option<Vec<DynamicValue>> {
match &self.any {
Any::Array(value) => Some(value.iter().map(|a| DynamicValue::new(a.clone())).collect()),
_ => None,
}
}
pub fn as_map(&self) -> Option<HashMap<String, DynamicValue>> {
match &self.any {
Any::Map(value) => Some(
value
.iter()
.map(|(key, value)| (key.clone(), DynamicValue::new(value.clone())))
.collect(),
),
_ => None,
}
}
}

189
packages/native/src/fs.rs Normal file
View File

@@ -0,0 +1,189 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use napi::{
bindgen_prelude::{FromNapiValue, ToNapiValue},
threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use parking_lot::Mutex;
#[napi(object)]
#[derive(Default)]
pub struct WatchOptions {
pub recursive: Option<bool>,
}
#[napi(string_enum)]
/// Watcher kind enumeration
pub enum WatcherKind {
/// inotify backend (linux)
Inotify,
/// FS-Event backend (mac)
Fsevent,
/// KQueue backend (bsd,optionally mac)
Kqueue,
/// Polling based backend (fallback)
PollWatcher,
/// Windows backend
ReadDirectoryChangesWatcher,
/// Fake watcher for testing
NullWatcher,
Unknown,
}
impl From<notify::WatcherKind> for WatcherKind {
fn from(value: notify::WatcherKind) -> Self {
match value {
notify::WatcherKind::Inotify => WatcherKind::Inotify,
notify::WatcherKind::Fsevent => WatcherKind::Fsevent,
notify::WatcherKind::Kqueue => WatcherKind::Kqueue,
notify::WatcherKind::PollWatcher => WatcherKind::PollWatcher,
notify::WatcherKind::ReadDirectoryChangesWatcher => WatcherKind::ReadDirectoryChangesWatcher,
notify::WatcherKind::NullWatcher => WatcherKind::NullWatcher,
_ => WatcherKind::Unknown,
}
}
}
#[napi]
pub struct Subscription {
id: uuid::Uuid,
error_uuid: Option<uuid::Uuid>,
event_emitter: Arc<Mutex<EventEmitter>>,
}
#[napi]
impl Subscription {
#[napi]
#[allow(clippy::inherent_to_string)]
pub fn to_string(&self) -> String {
self.id.to_string()
}
#[napi]
pub fn unsubscribe(&mut self) {
let mut event_emitter = self.event_emitter.lock();
event_emitter.listeners.remove(&self.id);
if let Some(error_uuid) = &self.error_uuid {
event_emitter.error_callbacks.remove(error_uuid);
}
}
}
#[napi]
pub fn watch(p: String, options: Option<WatchOptions>) -> Result<FSWatcher, anyhow::Error> {
let event_emitter = Arc::new(Mutex::new(EventEmitter {
listeners: Default::default(),
error_callbacks: Default::default(),
}));
let event_emitter_in_handler = event_emitter.clone();
let mut watcher: RecommendedWatcher =
notify::recommended_watcher(move |res: notify::Result<Event>| {
event_emitter_in_handler.lock().on(res);
})
.map_err(anyhow::Error::from)?;
let options = options.unwrap_or_default();
watcher
.watch(
Path::new(&p),
if options.recursive == Some(false) {
RecursiveMode::NonRecursive
} else {
RecursiveMode::Recursive
},
)
.map_err(anyhow::Error::from)?;
Ok(FSWatcher {
inner: watcher,
event_emitter,
})
}
#[napi]
pub struct FSWatcher {
inner: RecommendedWatcher,
event_emitter: Arc<Mutex<EventEmitter>>,
}
#[napi]
impl FSWatcher {
#[napi(getter)]
pub fn kind(&self) -> WatcherKind {
RecommendedWatcher::kind().into()
}
#[napi]
pub fn to_string(&self) -> napi::Result<String> {
Ok(format!("{:?}", self.inner))
}
#[napi]
pub fn subscribe(
&mut self,
callback: ThreadsafeFunction<serde_json::Value, ErrorStrategy::Fatal>,
error_callback: Option<ThreadsafeFunction<()>>,
) -> Subscription {
let uuid = uuid::Uuid::new_v4();
let mut event_emitter = self.event_emitter.lock();
event_emitter.listeners.insert(uuid, callback);
let mut error_uuid = None;
if let Some(error_callback) = error_callback {
let uuid = uuid::Uuid::new_v4();
event_emitter.error_callbacks.insert(uuid, error_callback);
error_uuid = Some(uuid);
}
drop(event_emitter);
Subscription {
id: uuid,
error_uuid,
event_emitter: self.event_emitter.clone(),
}
}
#[napi]
pub fn close(&mut self) -> napi::Result<()> {
// drop the previous watcher
self.inner = notify::recommended_watcher(|_| {}).map_err(anyhow::Error::from)?;
self.event_emitter.lock().stop();
Ok(())
}
}
#[derive(Clone)]
struct EventEmitter {
listeners: HashMap<uuid::Uuid, ThreadsafeFunction<serde_json::Value, ErrorStrategy::Fatal>>,
error_callbacks: HashMap<uuid::Uuid, ThreadsafeFunction<()>>,
}
impl EventEmitter {
fn on(&self, event: notify::Result<Event>) {
match event {
Ok(e) => match serde_json::value::to_value(e) {
Err(err) => {
let err: napi::Error = anyhow::Error::from(err).into();
for on_error in self.error_callbacks.values() {
on_error.call(Err(err.clone()), ThreadsafeFunctionCallMode::NonBlocking);
}
}
Ok(v) => {
for on_event in self.listeners.values() {
on_event.call(v.clone(), ThreadsafeFunctionCallMode::NonBlocking);
}
}
},
Err(err) => {
let err: napi::Error = anyhow::Error::from(err).into();
for on_error in self.error_callbacks.values() {
on_error.call(Err(err.clone()), ThreadsafeFunctionCallMode::NonBlocking);
}
}
}
}
fn stop(&mut self) {
self.listeners.clear();
self.error_callbacks.clear();
}
}

View File

@@ -1,12 +1 @@
// mod block;
mod dynamic_value;
mod storage;
mod workspace;
// pub use block::Block;
pub use dynamic_value::{DynamicValue, DynamicValueMap};
pub use storage::Storage;
pub use workspace::Workspace;
#[macro_use]
extern crate napi_derive;
pub mod fs;

View File

@@ -1,125 +0,0 @@
use crate::Workspace;
use jwst::{error, info, BlobStorage, DocStorage};
use jwst_rpc::start_client;
use jwst_storage::JwstStorage as AutoStorage;
use std::sync::Arc;
use tokio::{runtime::Runtime, sync::RwLock};
use napi::bindgen_prelude::*;
use napi::{Error, Result, Status};
#[napi]
#[derive(Clone)]
pub struct Storage {
pub(crate) storage: Option<Arc<RwLock<AutoStorage>>>,
pub(crate) error: Option<String>,
}
#[napi]
impl Storage {
#[napi(constructor)]
pub fn new(path: String) -> Self {
let rt = Runtime::new().unwrap();
// FIXME: do not use block_on
match rt.block_on(AutoStorage::new(&format!("sqlite:{path}?mode=rwc"))) {
Ok(pool) => Self {
storage: Some(Arc::new(RwLock::new(pool))),
error: None,
},
Err(e) => Self {
storage: None,
error: Some(e.to_string()),
},
}
}
#[napi]
pub fn error(&self) -> Option<String> {
self.error.clone()
}
#[napi]
pub async fn get_blob(&self, workspace_id: Option<String>, id: String) -> Result<Buffer> {
if let Some(storage) = &self.storage {
let storage_handle = storage.read().await;
let blobs = storage_handle.blobs();
let blob = blobs.get_blob(workspace_id.clone(), id.clone(), None).await.map_err(|e| {
Error::new(
Status::GenericFailure,
format!(
"Failed to get blob file {}/{} from storage, error: {}",
workspace_id.clone().unwrap_or_default().to_string(),
id,
e
),
)
})?;
Ok(blob.into())
} else {
return Err(Error::new(
Status::GenericFailure,
"Storage is not connected",
));
}
}
#[napi]
pub fn connect(&mut self, workspace_id: String, remote: String) -> Option<Workspace> {
match self.sync(workspace_id, remote) {
Ok(workspace) => Some(workspace),
Err(e) => {
error!("Failed to connect to workspace: {}", e);
self.error = Some(e.to_string());
None
}
}
}
#[napi]
pub fn sync(&self, workspace_id: String, remote: String) -> Result<Workspace> {
if let Some(storage) = &self.storage {
let rt = Runtime::new().unwrap();
// FIXME: do not use block_on
let mut workspace = rt
.block_on(async move {
let storage = storage.read().await;
start_client(&storage, workspace_id, remote).await
})
.map_err(|e| Error::new(Status::GenericFailure, e.to_string()))?;
let (sub, workspace) = {
let id = workspace.id();
let storage = self.storage.clone();
let sub = workspace.observe(move |_, e| {
let id = id.clone();
if let Some(storage) = storage.clone() {
let rt = Runtime::new().unwrap();
info!("update: {:?}", &e.update);
if let Err(e) = rt.block_on(async move {
let storage = storage.write().await;
storage.docs().write_update(id, &e.update).await
}) {
error!("Failed to write update to storage: {}", e);
}
}
});
(sub, workspace)
};
Ok(Workspace {
workspace,
_sub: sub,
})
} else {
Err(Error::new(
Status::GenericFailure,
"Storage is not connected",
))
}
}
}

View File

@@ -1,84 +0,0 @@
// use super::Block;
use jwst::Workspace as JwstWorkspace;
use yrs::UpdateSubscription;
#[napi()]
pub struct Workspace {
pub(crate) workspace: JwstWorkspace,
pub(crate) _sub: Option<UpdateSubscription>,
}
#[napi()]
impl Workspace {
#[napi(constructor)]
pub fn new(id: String) -> Self {
Self {
workspace: JwstWorkspace::new(id),
_sub: None,
}
}
#[napi]
pub fn id(&self) -> String {
self.workspace.id()
}
#[napi]
pub fn client_id(&self) -> i64 {
self.workspace.client_id() as i64
}
// #[napi]
// pub fn get(&self, block_id: String) -> Option<Block> {
// let workspace = self.workspace.clone();
// self.workspace.with_trx(|mut trx| {
// let block = trx
// .get_blocks()
// .get(&trx.trx, &block_id)
// .map(|b| Block::new(workspace, b));
// drop(trx);
// block
// })
// }
// #[napi]
// pub fn create(&self, block_id: String, flavor: String) -> Block {
// let workspace = self.workspace.clone();
// self.workspace.with_trx(|mut trx| {
// let block = Block::new(
// workspace,
// trx.get_blocks().create(&mut trx.trx, block_id, flavor),
// );
// drop(trx);
// block
// })
// }
#[napi]
pub fn search(&self, query: String) -> String {
self.workspace.search_result(query)
}
// #[napi]
// pub fn get_blocks_by_flavour(&self, flavour: &str) -> Vec<Block> {
// self.workspace
// .with_trx(|mut trx| trx.get_blocks().get_blocks_by_flavour(&trx.trx, flavour))
// .iter()
// .map(|block| Block {
// workspace: self.workspace.clone(),
// block: block.clone(),
// })
// .collect()
// }
#[napi]
pub fn get_search_index(&self) -> Vec<String> {
self.workspace.metadata().search_index
}
#[napi]
pub fn set_search_index(&self, fields: Vec<String>) -> bool {
self.workspace.set_search_index(fields)
}
}

View File

@@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"noEmit": true,
"outDir": "lib"
},
"include": ["index.d.ts", "__tests__/**/*.mts"],
"ts-node": {
"esm": true,
"experimentalSpecifierResolution": "node"
}
}