Unverified Commit d319abf3 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: upload/download rust structs directly through NATs object store (#2540)

parent 8380f1bd
......@@ -550,6 +550,15 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.69.5"
......@@ -2029,6 +2038,7 @@ dependencies = [
"async-trait",
"async_zmq",
"axum 0.8.4",
"bincode",
"blake3",
"bytes",
"chrono",
......
......@@ -446,6 +446,15 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.69.5"
......@@ -1245,6 +1254,7 @@ dependencies = [
"derive_builder",
"dialoguer",
"dynamo-async-openai",
"dynamo-parsers",
"dynamo-runtime",
"either",
"erased-serde",
......@@ -1292,6 +1302,19 @@ dependencies = [
"zeromq",
]
[[package]]
name = "dynamo-parsers"
version = "0.4.1"
dependencies = [
"anyhow",
"dynamo-async-openai",
"regex",
"serde",
"serde_json",
"tracing",
"uuid",
]
[[package]]
name = "dynamo-py3"
version = "0.4.1"
......@@ -1335,6 +1358,7 @@ dependencies = [
"async-trait",
"async_zmq",
"axum",
"bincode",
"blake3",
"bytes",
"chrono",
......
......@@ -53,6 +53,7 @@ xxhash-rust = { workspace = true }
arc-swap = { version = "1" }
async-once-cell = { version = "0.5.4" }
bincode = { version = "1" }
console-subscriber = { version = "0.4", optional = true }
educe = { version = "0.6.0" }
figment = { version = "0.10.19", features = ["env", "json", "toml", "test"] }
......
......@@ -36,6 +36,8 @@ use bytes::Bytes;
use derive_builder::Builder;
use futures::{StreamExt, TryStreamExt};
use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Opts, Registry};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use tokio::fs::File as TokioFile;
......@@ -125,20 +127,29 @@ impl Client {
Ok(subscription)
}
/// Upload file to NATS at this URL
pub async fn object_store_upload(&self, filepath: &Path, nats_url: Url) -> anyhow::Result<()> {
let mut disk_file = TokioFile::open(filepath).await?;
let (bucket_name, key) = url_to_bucket_and_key(&nats_url)?;
/// Helper method to get or optionally create an object store bucket
///
/// # Arguments
/// * `bucket_name` - The name of the bucket to retrieve
/// * `create_if_not_found` - If true, creates the bucket when it doesn't exist
///
/// # Returns
/// The object store bucket or an error
async fn get_or_create_bucket(
&self,
bucket_name: &str,
create_if_not_found: bool,
) -> anyhow::Result<jetstream::object_store::ObjectStore> {
let context = self.jetstream();
let bucket = match context.get_object_store(&bucket_name).await {
Ok(bucket) => bucket,
match context.get_object_store(bucket_name).await {
Ok(bucket) => Ok(bucket),
Err(err) if err.to_string().contains("stream not found") => {
// err.source() is GetStreamError, which has a kind() which
// is GetStreamErrorKind::JetStream which wraps a jetstream::Error
// which has code 404. Phew. So yeah check the string for now.
if create_if_not_found {
tracing::debug!("Creating NATS bucket {bucket_name}");
context
.create_object_store(jetstream::object_store::Config {
......@@ -146,12 +157,25 @@ impl Client {
..Default::default()
})
.await
.map_err(|e| anyhow::anyhow!("Failed creating bucket / object store: {e}"))?
.map_err(|e| anyhow::anyhow!("Failed creating bucket / object store: {e}"))
} else {
anyhow::bail!(
"NATS get_object_store bucket does not exist: {bucket_name}. {err}."
);
}
}
Err(err) => {
anyhow::bail!("NATS get_object_store error: {err}");
}
};
}
}
/// Upload file to NATS at this URL
pub async fn object_store_upload(&self, filepath: &Path, nats_url: Url) -> anyhow::Result<()> {
let mut disk_file = TokioFile::open(filepath).await?;
let (bucket_name, key) = url_to_bucket_and_key(&nats_url)?;
let bucket = self.get_or_create_bucket(&bucket_name, true).await?;
let key_meta = async_nats::jetstream::object_store::ObjectMetadata {
name: key.to_string(),
......@@ -173,20 +197,7 @@ impl Client {
let mut disk_file = TokioFile::create(filepath).await?;
let (bucket_name, key) = url_to_bucket_and_key(&nats_url)?;
let context = self.jetstream();
let bucket = match context.get_object_store(&bucket_name).await {
Ok(bucket) => bucket,
Err(err) if err.to_string().contains("stream not found") => {
// err.source() is GetStreamError, which has a kind() which
// is GetStreamErrorKind::JetStream which wraps a jetstream::Error
// which has code 404. Phew. So yeah check the string for now.
anyhow::bail!("NATS get_object_store bucket does not exist: {bucket_name}. {err}.");
}
Err(err) => {
anyhow::bail!("NATS get_object_store error: {err}");
}
};
let bucket = self.get_or_create_bucket(&bucket_name, false).await?;
let mut obj_reader = bucket.get(&key).await.map_err(|e| {
anyhow::anyhow!(
......@@ -210,6 +221,59 @@ impl Client {
Err(err) => Err(anyhow::anyhow!("NATS get_object_store error: {err}")),
}
}
/// Upload a serializable struct to NATS object store using bincode
pub async fn object_store_upload_data<T>(&self, data: &T, nats_url: Url) -> anyhow::Result<()>
where
T: Serialize,
{
// Serialize the data using bincode (more efficient binary format)
let binary_data = bincode::serialize(data)
.map_err(|e| anyhow::anyhow!("Failed to serialize data with bincode: {e}"))?;
let (bucket_name, key) = url_to_bucket_and_key(&nats_url)?;
let bucket = self.get_or_create_bucket(&bucket_name, true).await?;
let key_meta = async_nats::jetstream::object_store::ObjectMetadata {
name: key.to_string(),
..Default::default()
};
// Upload the serialized bytes
let mut cursor = std::io::Cursor::new(binary_data);
bucket.put(key_meta, &mut cursor).await.map_err(|e| {
anyhow::anyhow!("Failed uploading to bucket / object store {bucket_name}/{key}: {e}")
})?;
Ok(())
}
/// Download and deserialize a struct from NATS object store using bincode
pub async fn object_store_download_data<T>(&self, nats_url: Url) -> anyhow::Result<T>
where
T: DeserializeOwned,
{
let (bucket_name, key) = url_to_bucket_and_key(&nats_url)?;
let bucket = self.get_or_create_bucket(&bucket_name, false).await?;
let mut obj_reader = bucket.get(&key).await.map_err(|e| {
anyhow::anyhow!(
"Failed downloading from bucket / object store {bucket_name}/{key}: {e}"
)
})?;
// Read all bytes into memory
let mut buffer = Vec::new();
tokio::io::copy(&mut obj_reader, &mut buffer)
.await
.map_err(|e| anyhow::anyhow!("Failed reading object data: {e}"))?;
// Deserialize from bincode
let data = bincode::deserialize(&buffer)
.map_err(|e| anyhow::anyhow!("Failed to deserialize data with bincode: {e}"))?;
Ok(data)
}
}
/// NATS client options
......@@ -615,6 +679,14 @@ mod tests {
use super::*;
use figment::Jail;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestData {
id: u32,
name: String,
values: Vec<f64>,
}
#[test]
fn test_client_options_builder() {
......@@ -660,4 +732,52 @@ mod tests {
Ok(())
});
}
// Integration test for object store data operations using bincode
#[tokio::test]
#[ignore] // Requires NATS server to be running
async fn test_object_store_data_operations() {
// Create test data
let test_data = TestData {
id: 42,
name: "test_item".to_string(),
values: vec![1.0, 2.5, 3.7, 4.2],
};
// Set up client
let client_options = ClientOptions::builder()
.server("nats://localhost:4222")
.build()
.expect("Failed to build client options");
let client = client_options
.connect()
.await
.expect("Failed to connect to NATS");
// Test URL (using .bin extension to indicate binary format)
let url =
Url::parse("nats://localhost/test-bucket/test-data.bin").expect("Failed to parse URL");
// Upload the data
client
.object_store_upload_data(&test_data, url.clone())
.await
.expect("Failed to upload data");
// Download the data
let downloaded_data: TestData = client
.object_store_download_data(url.clone())
.await
.expect("Failed to download data");
// Verify the data matches
assert_eq!(test_data, downloaded_data);
// Clean up
client
.object_store_delete_bucket("test-bucket")
.await
.expect("Failed to delete bucket");
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment