feat(storage): binding jwst storage to node (#2808)
@@ -1,5 +1,5 @@
-mutation createWorkspace {
-  createWorkspace {
+mutation createWorkspace($init: Upload!) {
+  createWorkspace(init: $init) {
     id
     public
     createdAt
@@ -11,10 +11,10 @@ export const createWorkspaceMutation = {
   id: 'createWorkspaceMutation' as const,
   operationName: 'createWorkspace',
   definitionName: 'createWorkspace',
-  containsFile: false,
+  containsFile: true,
   query: `
-mutation createWorkspace {
-  createWorkspace {
+mutation createWorkspace($init: Upload!) {
+  createWorkspace(init: $init) {
     id
     public
     createdAt
@@ -46,7 +46,9 @@ export interface UpdateWorkspaceInput {
   public: InputMaybe<Scalars['Boolean']['input']>;
 }

-export type CreateWorkspaceMutationVariables = Exact<{ [key: string]: never }>;
+export type CreateWorkspaceMutationVariables = Exact<{
+  init: Scalars['Upload']['input'];
+}>;

 export type CreateWorkspaceMutation = {
   __typename?: 'Mutation';
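For context, a minimal client-side sketch, not part of the commit, of how the updated mutation could be invoked now that containsFile is true and init is an Upload variable. It assumes the init variable travels via the GraphQL multipart request convention; the endpoint, the transport, and the way the initial Yjs update is produced are all assumptions.

// Sketch only (not from the commit): sending createWorkspace($init: Upload!)
// as a GraphQL multipart request. Endpoint and initial contents are assumed.
import * as Y from 'yjs';

const createWorkspaceQuery = `
  mutation createWorkspace($init: Upload!) {
    createWorkspace(init: $init) {
      id
      public
      createdAt
    }
  }`;

async function createWorkspace(endpoint: string): Promise<{ id: string }> {
  // Seed the new workspace doc with an initial Yjs update, as the mutation now requires.
  const doc = new Y.Doc();
  doc.getText('content').insert(0, 'hello world');
  const init = Y.encodeStateAsUpdate(doc);

  // Multipart convention: operations + map + the file itself.
  const form = new FormData();
  form.append(
    'operations',
    JSON.stringify({ query: createWorkspaceQuery, variables: { init: null } })
  );
  form.append('map', JSON.stringify({ '0': ['variables.init'] }));
  form.append('0', new Blob([init]), 'init.bin');

  const res = await fetch(endpoint, { method: 'POST', body: form });
  const { data } = await res.json();
  return data.createWorkspace;
}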
packages/storage/Cargo.lock (generated, new file, 4428 lines): diff suppressed because it is too large.
packages/storage/Cargo.toml (new file, 27 lines)
@@ -0,0 +1,27 @@
[package]
name = "affine_storage"
version = "1.0.0"
edition = "2021"

# used to avoid sys dep conflict sqlx -> libsqlite-sys
[workspace]

[lib]
crate-type = ["cdylib"]

[dependencies]
jwst = { git = "https://github.com/toeverything/OctoBase.git", branch = "master" }
jwst-storage = { git = "https://github.com/toeverything/OctoBase.git", branch = "master" }
napi = { version = "2", default-features = false, features = [
  "napi5",
  "async",
] }
napi-derive = { version = "2", features = ["type-def"] }
yrs = { version = "0.16.5" }

[build-dependencies]
napi-build = "2"

[patch.crates-io]
lib0 = { git = "https://github.com/toeverything/y-crdt", rev = "a700f09" }
yrs = { git = "https://github.com/toeverything/y-crdt", rev = "a700f09" }
packages/storage/__tests__/storage.spec.js (new file, 131 lines)
@@ -0,0 +1,131 @@
import assert from 'node:assert';
import { beforeEach, describe, test } from 'node:test';

import { encoding } from 'lib0';
import * as Y from 'yjs';

import { Storage } from '../index.js';

// update binary produced by doc.getText('content').insert(0, 'hello world')
// prettier-ignore
let init = Buffer.from([1,1,160,238,169,240,10,0,4,1,7,99,111,110,116,101,110,116,11,104,101,108,108,111,32,119,111,114,108,100,0])

describe('Test jwst storage binding', () => {
  /** @type { Storage } */
  let storage;
  beforeEach(async () => {
    storage = await Storage.connect('sqlite::memory:', true);
  });

  test('should be able to create workspace', async () => {
    const workspace = await storage.createWorkspace('test-workspace', init);

    assert(workspace.id === 'test-workspace');
    assert.deepEqual(init, await storage.load(workspace.doc.guid));
  });

  test('should not create workspace with same id', async () => {
    await storage.createWorkspace('test-workspace', init);
    await assert.rejects(
      storage.createWorkspace('test-workspace', init),
      /Workspace [\w-]+ already exists/
    );
  });

  test('should be able to delete workspace', async () => {
    const workspace = await storage.createWorkspace('test-workspace', init);

    await storage.deleteWorkspace(workspace.id);

    await assert.rejects(
      storage.load(workspace.doc.guid),
      /Doc [\w-]+ not exists/
    );
  });

  test('should be able to sync update', async () => {
    const workspace = await storage.createWorkspace('test-workspace', init);

    const update = await storage.load(workspace.doc.guid);
    assert(update !== null);

    const doc = new Y.Doc();
    Y.applyUpdate(doc, update);

    let text = doc.getText('content');
    assert.equal(text.toJSON(), 'hello world');

    const updates = [];
    doc.on('update', async (/** @type { Uint8Array } */ update) => {
      updates.push(Buffer.from(update));
    });

    text.insert(5, ' my');
    text.insert(14, '!');

    for (const update of updates) {
      await storage.sync(workspace.id, workspace.doc.guid, update);
    }

    const update2 = await storage.load(workspace.doc.guid);
    const doc2 = new Y.Doc();
    Y.applyUpdate(doc2, update2);

    text = doc2.getText('content');
    assert.equal(text.toJSON(), 'hello my world!');
  });

  test('should be able to sync update with guid encoded', async () => {
    const workspace = await storage.createWorkspace('test-workspace', init);

    const update = await storage.load(workspace.doc.guid);
    assert(update !== null);

    const doc = new Y.Doc();
    Y.applyUpdate(doc, update);

    let text = doc.getText('content');
    assert.equal(text.toJSON(), 'hello world');

    const updates = [];
    doc.on('update', async (/** @type { Uint8Array } */ update) => {
      // prefix each update with the doc guid written as a lib0 var-string
      const prefix = encoding.encode(encoder => {
        encoding.writeVarString(encoder, workspace.doc.guid);
      });

      updates.push(Buffer.concat([prefix, update]));
    });

    text.insert(5, ' my');
    text.insert(14, '!');

    for (const update of updates) {
      await storage.syncWithGuid(workspace.id, update);
    }

    const update2 = await storage.load(workspace.doc.guid);
    const doc2 = new Y.Doc();
    Y.applyUpdate(doc2, update2);

    text = doc2.getText('content');
    assert.equal(text.toJSON(), 'hello my world!');
  });

  test('should be able to store blob', async () => {
    let workspace = await storage.createWorkspace('test-workspace', init);
    const blobId = await storage.uploadBlob(workspace.id, Buffer.from([1]));

    assert(blobId !== null);

    let blob = await storage.blob(workspace.id, blobId);

    assert.deepEqual(blob.data, Buffer.from([1]));
    assert.strictEqual(blob.size, 1);
    assert.equal(blob.contentType, 'application/octet-stream');

    await storage.uploadBlob(workspace.id, Buffer.from([1, 2, 3, 4, 5]));

    const spaceTaken = await storage.blobsSize(workspace.id);

    assert.equal(spaceTaken, 6);
  });
});
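The last two sync tests pin down the wire format expected by syncWithGuid: each update is prefixed with the doc guid written as a lib0 var-string. A small helper along these lines, our own sketch rather than part of the package, could factor that framing out of callers:

import { encoding } from 'lib0';

// Hypothetical helper (the name is ours) mirroring the framing used in the
// test above: a lib0 var-string guid prefix followed by the raw Yjs update,
// which is the shape storage.syncWithGuid() accepts.
export function encodeUpdateWithGuid(guid: string, update: Uint8Array): Buffer {
  const prefix = encoding.encode(encoder => {
    encoding.writeVarString(encoder, guid);
  });
  return Buffer.concat([prefix, Buffer.from(update)]);
}

A relay receiving raw Yjs updates could then call storage.syncWithGuid(workspaceId, encodeUpdateWithGuid(guid, update)) instead of rebuilding the prefix at every call site.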
packages/storage/build.rs (new file, 3 lines)
@@ -0,0 +1,3 @@
fn main() {
  napi_build::setup();
}
packages/storage/index.d.ts (vendored, new file, 52 lines)
@@ -0,0 +1,52 @@
/* auto-generated by NAPI-RS */
/* eslint-disable */

export class Doc {
  get guid(): string;
}

export class Storage {
  /** Create a storage instance and establish a connection to the persistent store. */
  static connect(
    database: string,
    debugOnlyAutoMigrate?: boolean | undefined | null
  ): Promise<Storage>;
  /** Get a workspace by id */
  getWorkspace(workspaceId: string): Promise<Workspace | null>;
  /** Create a new workspace with an init update. */
  createWorkspace(workspaceId: string, init: Buffer): Promise<Workspace>;
  /** Delete a workspace. */
  deleteWorkspace(workspaceId: string): Promise<void>;
  /** Sync doc updates. */
  sync(workspaceId: string, guid: string, update: Buffer): Promise<void>;
  /** Sync doc update with doc guid encoded. */
  syncWithGuid(workspaceId: string, update: Buffer): Promise<void>;
  /** Load doc as update buffer. */
  load(guid: string): Promise<Buffer | null>;
  /** Fetch a workspace blob. */
  blob(workspaceId: string, name: string): Promise<Blob | null>;
  /** Upload a blob into workspace storage. */
  uploadBlob(workspaceId: string, blob: Buffer): Promise<string>;
  /** Workspace size taken by blobs. */
  blobsSize(workspaceId: string): Promise<number>;
}

export class Workspace {
  get doc(): Doc;
  isEmpty(): boolean;
  get id(): string;
  get clientId(): string;
  search(query: string): Array<SearchResult>;
}

export interface Blob {
  contentType: string;
  lastModified: string;
  size: number;
  data: Buffer;
}

export interface SearchResult {
  blockId: string;
  score: number;
}
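To make the generated surface above concrete, here is a hedged usage sketch touching the main methods declared in index.d.ts. The database URL, workspace id, and blob payload are placeholders, and error handling is kept minimal:

import * as Y from 'yjs';
import { Storage } from '@affine/storage';

async function demo() {
  // 'sqlite::memory:' mirrors the test suite; a real deployment would pass
  // its own database URL here (placeholder assumption).
  const storage = await Storage.connect('sqlite::memory:', true);

  // Create a workspace seeded with an initial Yjs update.
  const doc = new Y.Doc();
  doc.getText('content').insert(0, 'hello world');
  const workspace = await storage.createWorkspace(
    'demo-workspace',
    Buffer.from(Y.encodeStateAsUpdate(doc))
  );

  // Round-trip the doc through the storage layer.
  const update = await storage.load(workspace.doc.guid);
  if (update) {
    const loaded = new Y.Doc();
    Y.applyUpdate(loaded, update);
    console.log(loaded.getText('content').toJSON()); // 'hello world'
  }

  // Upload a blob, fetch it back, and check the space taken by the workspace.
  const blobId = await storage.uploadBlob(workspace.id, Buffer.from([1, 2, 3]));
  const blob = await storage.blob(workspace.id, blobId);
  console.log(blob?.contentType, await storage.blobsSize(workspace.id));
}

demo().catch(console.error);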
packages/storage/index.js (new file, 10 lines)
@@ -0,0 +1,10 @@
import { createRequire } from 'node:module';

const require = createRequire(import.meta.url);

/** @type {import('.')} */
const binding = require('./storage.node');

export const Storage = binding.Storage;
export const Workspace = binding.Workspace;
export const Document = binding.Doc;
packages/storage/package.json (new file, 43 lines)
@@ -0,0 +1,43 @@
{
  "name": "@affine/storage",
  "version": "1.0.0",
  "engines": {
    "node": ">= 10.16.0 < 11 || >= 11.8.0"
  },
  "type": "module",
  "main": "./index.js",
  "module": "./index.js",
  "types": "index.d.ts",
  "exports": {
    ".": {
      "require": "./storage.node",
      "import": "./index.js",
      "types": "./index.d.ts"
    }
  },
  "napi": {
    "binaryName": "storage",
    "targets": [
      "aarch64-apple-darwin",
      "aarch64-unknown-linux-gnu",
      "aarch64-pc-windows-msvc",
      "x86_64-apple-darwin",
      "x86_64-pc-windows-msvc",
      "x86_64-unknown-linux-gnu",
      "universal-apple-darwin"
    ]
  },
  "scripts": {
    "test": "node --test ./__tests__/**/*.spec.js",
    "build": "napi build --release --strip",
    "build:debug": "napi build",
    "prepublishOnly": "napi prepublish -t npm",
    "artifacts": "napi artifacts",
    "version": "napi version"
  },
  "devDependencies": {
    "@napi-rs/cli": "^3.0.0-alpha.4",
    "lib0": "^0.2.78",
    "yjs": "^13.6.6"
  }
}
packages/storage/src/lib.rs (new file, 284 lines)
@@ -0,0 +1,284 @@
#![deny(clippy::all)]

use std::{
  collections::HashMap,
  fmt::{Debug, Display},
  path::PathBuf,
};

use jwst::{BlobStorage, SearchResult as JwstSearchResult, Workspace as JwstWorkspace, DocStorage};
use jwst_storage::{JwstStorage, JwstStorageError};
use yrs::{Doc as YDoc, ReadTxn, StateVector, Transact};

use napi::{bindgen_prelude::*, Error, Result, Status};

#[macro_use]
extern crate napi_derive;

fn map_err_inner<T, E: Display + Debug>(v: std::result::Result<T, E>, status: Status) -> Result<T> {
  match v {
    Ok(val) => Ok(val),
    Err(e) => {
      dbg!(&e);
      Err(Error::new(status, e.to_string()))
    }
  }
}

macro_rules! map_err {
  ($val: expr) => {
    map_err_inner($val, Status::GenericFailure)
  };
  ($val: expr, $status: ident) => {
    map_err_inner($val, $status)
  };
}

macro_rules! napi_wrap {
  ($( ($name: ident, $target: ident) ),*) => {
    $(
      #[napi]
      pub struct $name($target);

      impl std::ops::Deref for $name {
        type Target = $target;

        fn deref(&self) -> &Self::Target {
          &self.0
        }
      }

      impl From<$target> for $name {
        fn from(t: $target) -> Self {
          Self(t)
        }
      }
    )*
  };
}

napi_wrap!(
  (Storage, JwstStorage),
  (Workspace, JwstWorkspace),
  (Doc, YDoc)
);

fn to_update_v1(doc: &YDoc) -> Result<Buffer> {
  let trx = doc.transact();

  map_err!(trx.encode_state_as_update_v1(&StateVector::default())).map(|update| update.into())
}

#[napi(object)]
pub struct Blob {
  pub content_type: String,
  pub last_modified: String,
  pub size: i64,
  pub data: Buffer,
}

#[napi(object)]
pub struct SearchResult {
  pub block_id: String,
  pub score: f64,
}

impl From<JwstSearchResult> for SearchResult {
  fn from(r: JwstSearchResult) -> Self {
    Self {
      block_id: r.block_id,
      score: r.score as f64,
    }
  }
}

#[napi]
impl Storage {
  /// Create a storage instance and establish a connection to the persistent store.
  #[napi]
  pub async fn connect(database: String, debug_only_auto_migrate: Option<bool>) -> Result<Storage> {
    let inner = match if cfg!(debug_assertions) && debug_only_auto_migrate.unwrap_or(false) {
      JwstStorage::new_with_migration(&database).await
    } else {
      JwstStorage::new(&database).await
    } {
      Ok(storage) => storage,
      Err(JwstStorageError::Db(e)) => {
        return Err(Error::new(
          Status::GenericFailure,
          format!("failed to connect to database: {}", e),
        ))
      }
      Err(e) => return Err(Error::new(Status::GenericFailure, e.to_string())),
    };

    Ok(inner.into())
  }

  /// Get a workspace by id
  #[napi]
  pub async fn get_workspace(&self, workspace_id: String) -> Result<Option<Workspace>> {
    match self.0.get_workspace(workspace_id).await {
      Ok(w) => Ok(Some(w.into())),
      Err(JwstStorageError::WorkspaceNotFound(_)) => Ok(None),
      Err(e) => Err(Error::new(Status::GenericFailure, e.to_string())),
    }
  }

  /// Create a new workspace with an init update.
  #[napi]
  pub async fn create_workspace(&self, workspace_id: String, init: Buffer) -> Result<Workspace> {
    if map_err!(self.0.docs().detect_workspace(&workspace_id).await)? {
      return Err(Error::new(
        Status::GenericFailure,
        format!("Workspace {} already exists", workspace_id),
      ));
    }

    let workspace = map_err!(self.0.create_workspace(workspace_id).await)?;

    let init = init.as_ref();
    let guid = workspace.doc_guid().to_string();
    map_err!(self.docs().update_doc(workspace.id(), guid, init).await)?;

    Ok(workspace.into())
  }

  /// Delete a workspace.
  #[napi]
  pub async fn delete_workspace(&self, workspace_id: String) -> Result<()> {
    map_err!(self.docs().delete_workspace(&workspace_id).await)?;
    map_err!(self.blobs().delete_workspace(workspace_id).await)
  }

  /// Sync doc updates.
  #[napi]
  pub async fn sync(&self, workspace_id: String, guid: String, update: Buffer) -> Result<()> {
    let update = update.as_ref();
    map_err!(self.docs().update_doc(workspace_id, guid, update).await)
  }

  /// Sync doc update with doc guid encoded.
  #[napi]
  pub async fn sync_with_guid(&self, workspace_id: String, update: Buffer) -> Result<()> {
    let update = update.as_ref();
    map_err!(self.docs().update_doc_with_guid(workspace_id, update).await)
  }

  /// Load doc as update buffer.
  #[napi]
  pub async fn load(&self, guid: String) -> Result<Option<Buffer>> {
    self.ensure_exists(&guid).await?;

    if let Some(doc) = map_err!(self.docs().get_doc(guid).await)? {
      Ok(Some(to_update_v1(&doc)?))
    } else {
      Ok(None)
    }
  }

  /// Fetch a workspace blob.
  #[napi]
  pub async fn blob(&self, workspace_id: String, name: String) -> Result<Option<Blob>> {
    let (id, params) = {
      let path = PathBuf::from(name.clone());
      let ext = path
        .extension()
        .and_then(|s| s.to_str().map(|s| s.to_string()));
      let id = path
        .file_stem()
        .and_then(|s| s.to_str().map(|s| s.to_string()))
        .unwrap_or(name);

      (id, ext.map(|ext| HashMap::from([("format".into(), ext)])))
    };

    let Ok(meta) = self.blobs().get_metadata(Some(workspace_id.clone()), id.clone(), params.clone()).await else {
      return Ok(None);
    };

    let Ok(file) = self.blobs().get_blob(Some(workspace_id), id, params).await else {
      return Ok(None);
    };

    Ok(Some(Blob {
      content_type: meta.content_type,
      last_modified: format!("{:?}", meta.last_modified),
      size: meta.size,
      data: file.into(),
    }))
  }

  /// Upload a blob into workspace storage.
  #[napi]
  pub async fn upload_blob(&self, workspace_id: String, blob: Buffer) -> Result<String> {
    // TODO: can optimize, avoid copy
    let blob = blob.as_ref().to_vec();
    map_err!(self.blobs().put_blob(Some(workspace_id), blob).await)
  }

  /// Workspace size taken by blobs.
  #[napi]
  pub async fn blobs_size(&self, workspace_id: String) -> Result<i64> {
    map_err!(self.blobs().get_blobs_size(workspace_id).await)
  }

  async fn ensure_exists(&self, guid: &str) -> Result<()> {
    if map_err!(self.docs().detect_doc(guid).await)? {
      Ok(())
    } else {
      Err(Error::new(
        Status::GenericFailure,
        format!("Doc {} not exists", guid),
      ))
    }
  }
}

#[napi]
impl Workspace {
  #[napi(getter)]
  pub fn doc(&self) -> Doc {
    self.0.doc().into()
  }

  #[napi]
  #[inline]
  pub fn is_empty(&self) -> bool {
    self.0.is_empty()
  }

  #[napi(getter)]
  #[inline]
  pub fn id(&self) -> String {
    self.0.id()
  }

  #[napi(getter)]
  #[inline]
  pub fn client_id(&self) -> String {
    self.0.client_id().to_string()
  }

  #[napi]
  pub fn search(&self, query: String) -> Result<Vec<SearchResult>> {
    // TODO: search in all subdocs
    let result = map_err!(self.0.search(&query))?;

    Ok(
      result
        .into_inner()
        .into_iter()
        .map(Into::into)
        .collect::<Vec<_>>(),
    )
  }
}

#[napi]
impl Doc {
  #[napi(getter)]
  pub fn guid(&self) -> String {
    self.0.guid().to_string()
  }
}
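One detail of Storage::blob worth noting from the Rust above: the name argument is parsed as a path, so callers may pass either a bare blob id or an id with an extension, in which case the extension is split off and forwarded to the blob store as a "format" parameter. A brief illustrative sketch follows; the blob id is a placeholder, and how the underlying jwst blob store interprets "format" is not shown in this diff.

import { Storage } from '@affine/storage';

async function fetchBlobVariants(storage: Storage, workspaceId: string, blobId: string) {
  // Bare id: file_stem equals the id and there is no extension, so no
  // "format" parameter is forwarded.
  const raw = await storage.blob(workspaceId, blobId);

  // Id plus an extension: lib.rs splits "png" off and passes it to the
  // blob store as params = { format: "png" }.
  const asPng = await storage.blob(workspaceId, `${blobId}.png`);

  return { raw, asPng };
}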
packages/storage/tsconfig.json (new file, 9 lines)
@@ -0,0 +1,9 @@
{
  "extends": "../../tsconfig.json",
  "compilerOptions": {
    "noEmit": false,
    "outDir": "lib",
    "composite": true
  },
  "include": ["index.d.ts"]
}