feat!: affine cloud support (#3813)

Co-authored-by: Hongtao Lye <codert.sn@gmail.com>
Co-authored-by: liuyi <forehalo@gmail.com>
Co-authored-by: LongYinan <lynweklm@gmail.com>
Co-authored-by: X1a0t <405028157@qq.com>
Co-authored-by: JimmFly <yangjinfei001@gmail.com>
Co-authored-by: Peng Xiao <pengxiao@outlook.com>
Co-authored-by: xiaodong zuo <53252747+zuoxiaodong0815@users.noreply.github.com>
Co-authored-by: DarkSky <25152247+darkskygit@users.noreply.github.com>
Co-authored-by: Qi <474021214@qq.com>
Co-authored-by: danielchim <kahungchim@gmail.com>
This commit is contained in:
Alex Yang
2023-08-29 05:07:05 -05:00
committed by GitHub
parent d0145c6f38
commit 2f6c4e3696
414 changed files with 19469 additions and 7591 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -3,15 +3,13 @@ name = "affine_storage"
version = "1.0.0"
edition = "2021"
# used to avoid sys dep conflict sqlx -> libsqlite-sys
[workspace]
[lib]
crate-type = ["cdylib"]
[dependencies]
jwst = { git = "https://github.com/toeverything/OctoBase.git", branch = "master" }
jwst-storage = { git = "https://github.com/toeverything/OctoBase.git", branch = "master" }
jwst = { git = "https://github.com/toeverything/OctoBase.git" }
jwst-codec = { git = "https://github.com/toeverything/OctoBase.git" }
jwst-storage = { git = "https://github.com/toeverything/OctoBase.git" }
napi = { version = "2", default-features = false, features = [
"napi5",
"async",
@@ -21,7 +19,3 @@ yrs = { version = "0.16.5" }
[build-dependencies]
napi-build = "2"
[patch.crates-io]
lib0 = { git = "https://github.com/toeverything/y-crdt", rev = "a700f09" }
yrs = { git = "https://github.com/toeverything/y-crdt", rev = "a700f09" }

View File

@@ -142,13 +142,16 @@ describe('Test jwst storage binding', () => {
});
test('should be able to store blob', async () => {
let workspace = await storage.createWorkspace('test-workspace', init);
let workspace = await storage.createWorkspace('test-workspace');
await storage.sync(workspace.id, workspace.doc.guid, init);
const blobId = await storage.uploadBlob(workspace.id, Buffer.from([1]));
assert(blobId !== null);
let blob = await storage.blob(workspace.id, blobId);
let list = await storage.listBlobs(workspace.id);
assert.deepEqual(list, [blobId]);
let blob = await storage.getBlob(workspace.id, blobId);
assert.deepEqual(blob.data, Buffer.from([1]));
assert.strictEqual(blob.size, 1);
assert.equal(blob.contentType, 'application/octet-stream');

View File

@@ -1,44 +1,24 @@
/* auto-generated by NAPI-RS */
/* eslint-disable */
export class Doc {
get guid(): string;
}
export class Storage {
/** Create a storage instance and establish connection to persist store. */
static connect(
database: string,
debugOnlyAutoMigrate?: boolean | undefined | null
): Promise<Storage>;
/** Get a workspace by id */
getWorkspace(workspaceId: string): Promise<Workspace | null>;
/** Create a new workspace with a init update. */
createWorkspace(workspaceId: string, init: Buffer): Promise<Workspace>;
/** Delete a workspace. */
deleteWorkspace(workspaceId: string): Promise<void>;
/** Sync doc updates. */
sync(workspaceId: string, guid: string, update: Buffer): Promise<void>;
/** Sync doc update with doc guid encoded. */
syncWithGuid(workspaceId: string, update: Buffer): Promise<void>;
/** Load doc as update buffer. */
load(guid: string): Promise<Buffer | null>;
/** List all blobs in a workspace. */
listBlobs(workspaceId?: string | undefined | null): Promise<Array<string>>;
/** Fetch a workspace blob. */
blob(workspaceId: string, name: string): Promise<Blob | null>;
getBlob(workspaceId: string, name: string): Promise<Blob | null>;
/** Upload a blob into workspace storage. */
uploadBlob(workspaceId: string, blob: Buffer): Promise<string>;
/** Delete a blob from workspace storage. */
deleteBlob(workspaceId: string, hash: string): Promise<boolean>;
/** Workspace size taken by blobs. */
blobsSize(workspaceId: string): Promise<number>;
}
export class Workspace {
get doc(): Doc;
isEmpty(): boolean;
get id(): string;
get clientId(): string;
search(query: string): Array<SearchResult>;
}
export interface Blob {
contentType: string;
lastModified: string;
@@ -46,7 +26,5 @@ export interface Blob {
data: Buffer;
}
export interface SearchResult {
blockId: string;
score: number;
}
/** Merge updates in form like `Y.applyUpdate(doc, update)` way and return the result binary. */
export function mergeUpdatesInApplyWay(updates: Array<Buffer>): Buffer;

View File

@@ -6,5 +6,4 @@ const require = createRequire(import.meta.url);
const binding = require('./storage.node');
export const Storage = binding.Storage;
export const Workspace = binding.Workspace;
export const Document = binding.Doc;
export const mergeUpdatesInApplyWay = binding.mergeUpdatesInApplyWay;

View File

@@ -17,6 +17,9 @@
},
{
"runtime": "node -v"
},
{
"runtime": "clang --version"
}
],
"outputs": ["{projectRoot}/*.node", "{workspaceRoot}/*.node"]

View File

@@ -6,9 +6,9 @@ use std::{
path::PathBuf,
};
use jwst::{BlobStorage, SearchResult as JwstSearchResult, Workspace as JwstWorkspace, DocStorage};
use jwst_storage::{JwstStorage, JwstStorageError};
use yrs::{Doc as YDoc, ReadTxn, StateVector, Transact};
use jwst::BlobStorage;
use jwst_codec::Doc;
use jwst_storage::{BlobStorageType, JwstStorage, JwstStorageError};
use napi::{bindgen_prelude::*, Error, Result, Status};
@@ -57,17 +57,7 @@ macro_rules! napi_wrap {
};
}
napi_wrap!(
(Storage, JwstStorage),
(Workspace, JwstWorkspace),
(Doc, YDoc)
);
fn to_update_v1(doc: &YDoc) -> Result<Buffer> {
let trx = doc.transact();
map_err!(trx.encode_state_as_update_v1(&StateVector::default())).map(|update| update.into())
}
napi_wrap!((Storage, JwstStorage));
#[napi(object)]
pub struct Blob {
@@ -77,37 +67,22 @@ pub struct Blob {
pub data: Buffer,
}
#[napi(object)]
pub struct SearchResult {
pub block_id: String,
pub score: f64,
}
impl From<JwstSearchResult> for SearchResult {
fn from(r: JwstSearchResult) -> Self {
Self {
block_id: r.block_id,
score: r.score as f64,
}
}
}
#[napi]
impl Storage {
/// Create a storage instance and establish connection to persist store.
#[napi]
pub async fn connect(database: String, debug_only_auto_migrate: Option<bool>) -> Result<Storage> {
let inner = match if cfg!(debug_assertions) && debug_only_auto_migrate.unwrap_or(false) {
JwstStorage::new_with_migration(&database).await
JwstStorage::new_with_migration(&database, BlobStorageType::DB).await
} else {
JwstStorage::new(&database).await
JwstStorage::new(&database, BlobStorageType::DB).await
} {
Ok(storage) => storage,
Err(JwstStorageError::Db(e)) => {
return Err(Error::new(
Status::GenericFailure,
format!("failed to connect to database: {}", e),
))
));
}
Err(e) => return Err(Error::new(Status::GenericFailure, e.to_string())),
};
@@ -115,71 +90,15 @@ impl Storage {
Ok(inner.into())
}
/// Get a workspace by id
/// List all blobs in a workspace.
#[napi]
pub async fn get_workspace(&self, workspace_id: String) -> Result<Option<Workspace>> {
match self.0.get_workspace(workspace_id).await {
Ok(w) => Ok(Some(w.into())),
Err(JwstStorageError::WorkspaceNotFound(_)) => Ok(None),
Err(e) => Err(Error::new(Status::GenericFailure, e.to_string())),
}
}
/// Create a new workspace with a init update.
#[napi]
pub async fn create_workspace(&self, workspace_id: String, init: Buffer) -> Result<Workspace> {
if map_err!(self.0.docs().detect_workspace(&workspace_id).await)? {
return Err(Error::new(
Status::GenericFailure,
format!("Workspace {} already exists", workspace_id),
));
}
let workspace = map_err!(self.0.create_workspace(workspace_id).await)?;
let init = init.as_ref();
let guid = workspace.doc_guid().to_string();
map_err!(self.docs().update_doc(workspace.id(), guid, init).await)?;
Ok(workspace.into())
}
/// Delete a workspace.
#[napi]
pub async fn delete_workspace(&self, workspace_id: String) -> Result<()> {
map_err!(self.docs().delete_workspace(&workspace_id).await)?;
map_err!(self.blobs().delete_workspace(workspace_id).await)
}
/// Sync doc updates.
#[napi]
pub async fn sync(&self, workspace_id: String, guid: String, update: Buffer) -> Result<()> {
let update = update.as_ref();
map_err!(self.docs().update_doc(workspace_id, guid, update).await)
}
/// Sync doc update with doc guid encoded.
#[napi]
pub async fn sync_with_guid(&self, workspace_id: String, update: Buffer) -> Result<()> {
let update = update.as_ref();
map_err!(self.docs().update_doc_with_guid(workspace_id, update).await)
}
/// Load doc as update buffer.
#[napi]
pub async fn load(&self, guid: String) -> Result<Option<Buffer>> {
self.ensure_exists(&guid).await?;
if let Some(doc) = map_err!(self.docs().get_doc(guid).await)? {
Ok(Some(to_update_v1(&doc)?))
} else {
Ok(None)
}
pub async fn list_blobs(&self, workspace_id: Option<String>) -> Result<Vec<String>> {
map_err!(self.blobs().list_blobs(workspace_id).await)
}
/// Fetch a workspace blob.
#[napi]
pub async fn blob(&self, workspace_id: String, name: String) -> Result<Option<Blob>> {
pub async fn get_blob(&self, workspace_id: String, name: String) -> Result<Option<Blob>> {
let (id, params) = {
let path = PathBuf::from(name.clone());
let ext = path
@@ -217,68 +136,28 @@ impl Storage {
map_err!(self.blobs().put_blob(Some(workspace_id), blob).await)
}
/// Delete a blob from workspace storage.
#[napi]
pub async fn delete_blob(&self, workspace_id: String, hash: String) -> Result<bool> {
map_err!(self.blobs().delete_blob(Some(workspace_id), hash).await)
}
/// Workspace size taken by blobs.
#[napi]
pub async fn blobs_size(&self, workspace_id: String) -> Result<i64> {
map_err!(self.blobs().get_blobs_size(workspace_id).await)
}
async fn ensure_exists(&self, guid: &str) -> Result<()> {
if map_err!(self.docs().detect_doc(guid).await)? {
Ok(())
} else {
Err(Error::new(
Status::GenericFailure,
format!("Doc {} not exists", guid),
))
}
}
}
#[napi]
impl Workspace {
#[napi(getter)]
pub fn doc(&self) -> Doc {
self.0.doc().into()
/// Merge updates in form like `Y.applyUpdate(doc, update)` way and return the result binary.
#[napi(catch_unwind)]
pub fn merge_updates_in_apply_way(updates: Vec<Buffer>) -> Result<Buffer> {
let mut doc = Doc::default();
for update in updates {
map_err!(doc.apply_update_from_binary(update.as_ref().to_vec()))?;
}
#[napi]
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
let buf = map_err!(doc.encode_update_v1())?;
#[napi(getter)]
#[inline]
pub fn id(&self) -> String {
self.0.id()
}
#[napi(getter)]
#[inline]
pub fn client_id(&self) -> String {
self.0.client_id().to_string()
}
#[napi]
pub fn search(&self, query: String) -> Result<Vec<SearchResult>> {
// TODO: search in all subdocs
let result = map_err!(self.0.search(&query))?;
Ok(
result
.into_inner()
.into_iter()
.map(Into::into)
.collect::<Vec<_>>(),
)
}
}
#[napi]
impl Doc {
#[napi(getter)]
pub fn guid(&self) -> String {
self.0.guid().to_string()
}
Ok(buf.into())
}