Unverified Commit 6a358f7c authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore(llm): Rename protocols::Endpoint to EndpointId (#2615)

parent 02e59bba
...@@ -13,7 +13,7 @@ use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig; ...@@ -13,7 +13,7 @@ use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig;
use dynamo_llm::local_model::DEFAULT_HTTP_PORT; use dynamo_llm::local_model::DEFAULT_HTTP_PORT;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder}; use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_llm::mocker::protocols::MockEngineArgs; use dynamo_llm::mocker::protocols::MockEngineArgs;
use dynamo_runtime::protocols::Endpoint as EndpointId; use dynamo_runtime::protocols::EndpointId;
use crate::RouterMode; use crate::RouterMode;
...@@ -130,14 +130,7 @@ impl EntrypointArgs { ...@@ -130,14 +130,7 @@ impl EntrypointArgs {
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
extra_engine_args: Option<PathBuf>, extra_engine_args: Option<PathBuf>,
) -> PyResult<Self> { ) -> PyResult<Self> {
let endpoint_id_obj: Option<EndpointId> = match endpoint_id { let endpoint_id_obj: Option<EndpointId> = endpoint_id.as_deref().map(EndpointId::from);
Some(eid) => Some(eid.parse().map_err(|_| {
PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"Invalid endpoint_id format: {eid}"
))
})?),
None => None,
};
if (tls_cert_path.is_some() && tls_key_path.is_none()) if (tls_cert_path.is_some() && tls_key_path.is_none())
|| (tls_cert_path.is_none() && tls_key_path.is_some()) || (tls_cert_path.is_none() && tls_key_path.is_some())
{ {
......
...@@ -21,11 +21,12 @@ use crate::{ ...@@ -21,11 +21,12 @@ use crate::{
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ModelEntry { pub struct ModelEntry {
/// Public name of the model /// Public name of the model
/// This will be used to identify the model in the HTTP service from the value used in an an OpenAI ChatRequest. /// Used to identify the model in the HTTP service from the value used in an OpenAI ChatRequest.
pub name: String, pub name: String,
/// How to address this on the network /// How to address this on the network
pub endpoint: protocols::Endpoint, #[serde(rename = "endpoint")]
pub endpoint_id: protocols::EndpointId,
/// Specifies whether the model is a chat, completions, etc model. /// Specifies whether the model is a chat, completions, etc model.
pub model_type: ModelType, pub model_type: ModelType,
...@@ -45,8 +46,8 @@ impl ModelEntry { ...@@ -45,8 +46,8 @@ impl ModelEntry {
matches!(self.model_type, ModelType::Backend) matches!(self.model_type, ModelType::Backend)
} }
/// Fetch the ModelDeploymentCard from NATS. /// Fetch the ModelDeploymentCard from etcd.
/// This does not touch it's fields so you may need to call move_from_nats on it. /// This does not touch its fields so you may need to call move_from_nats on it.
pub async fn load_mdc( pub async fn load_mdc(
&self, &self,
etcd_client: &etcd::Client, etcd_client: &etcd::Client,
......
...@@ -268,7 +268,7 @@ impl ModelWatcher { ...@@ -268,7 +268,7 @@ impl ModelWatcher {
// Handles a PUT event from etcd, this usually means adding a new model to the list of served // Handles a PUT event from etcd, this usually means adding a new model to the list of served
// models. // models.
async fn handle_put(&self, model_entry: &ModelEntry) -> anyhow::Result<()> { async fn handle_put(&self, model_entry: &ModelEntry) -> anyhow::Result<()> {
let endpoint_id = model_entry.endpoint.clone(); let endpoint_id = &model_entry.endpoint_id;
let component = self let component = self
.drt .drt
.namespace(&endpoint_id.namespace)? .namespace(&endpoint_id.namespace)?
......
...@@ -20,7 +20,7 @@ use dynamo_runtime::engine::AsyncEngineStream; ...@@ -20,7 +20,7 @@ use dynamo_runtime::engine::AsyncEngineStream;
use dynamo_runtime::pipeline::{ use dynamo_runtime::pipeline::{
network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source, network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
}; };
use dynamo_runtime::{protocols::Endpoint as EndpointId, DistributedRuntime}; use dynamo_runtime::{protocols::EndpointId, DistributedRuntime};
use crate::entrypoint::EngineConfig; use crate::entrypoint::EngineConfig;
...@@ -141,7 +141,7 @@ pub async fn run( ...@@ -141,7 +141,7 @@ pub async fn run(
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
mod integration_tests { mod integration_tests {
use super::*; use super::*;
use dynamo_runtime::protocols::Endpoint as EndpointId; use dynamo_runtime::protocols::EndpointId;
async fn create_test_environment() -> anyhow::Result<(DistributedRuntime, EngineConfig)> { async fn create_test_environment() -> anyhow::Result<(DistributedRuntime, EngineConfig)> {
// Create a minimal distributed runtime and engine config for testing // Create a minimal distributed runtime and engine config for testing
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{service_v2, RouteDoc}; use super::{service_v2, RouteDoc};
use axum::{http::Method, response::IntoResponse, routing::post, Json, Router}; use axum::{http::Method, response::IntoResponse, routing::post, Json, Router};
...@@ -88,8 +76,8 @@ async fn clear_kv_blocks_handler( ...@@ -88,8 +76,8 @@ async fn clear_kv_blocks_handler(
// create client for each model entry // create client for each model entry
for entry in &model_entries { for entry in &model_entries {
let namespace = &entry.endpoint.namespace; let namespace = &entry.endpoint_id.namespace;
let component = &entry.endpoint.component; let component = &entry.endpoint_id.component;
let entry_name = entry.name.to_string(); let entry_name = entry.name.to_string();
tracing::debug!("Processing worker group: {}/{}", namespace, component); tracing::debug!("Processing worker group: {}/{}", namespace, component);
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{service_v2, RouteDoc}; use super::{service_v2, RouteDoc};
use axum::{http::Method, http::StatusCode, response::IntoResponse, routing::get, Json, Router}; use axum::{http::Method, http::StatusCode, response::IntoResponse, routing::get, Json, Router};
...@@ -104,7 +77,7 @@ async fn health_handler( ...@@ -104,7 +77,7 @@ async fn health_handler(
} else { } else {
let endpoints: Vec<String> = model_entries let endpoints: Vec<String> = model_entries
.iter() .iter()
.map(|entry| entry.endpoint.as_url()) .map(|entry| entry.endpoint_id.as_url())
.collect(); .collect();
( (
StatusCode::OK, StatusCode::OK,
......
...@@ -6,7 +6,7 @@ use std::path::{Path, PathBuf}; ...@@ -6,7 +6,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use anyhow::Context as _; use anyhow::Context as _;
use dynamo_runtime::protocols::Endpoint as EndpointId; use dynamo_runtime::protocols::EndpointId;
use dynamo_runtime::slug::Slug; use dynamo_runtime::slug::Slug;
use dynamo_runtime::traits::DistributedRuntimeProvider; use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::{ use dynamo_runtime::{
...@@ -402,7 +402,7 @@ impl LocalModel { ...@@ -402,7 +402,7 @@ impl LocalModel {
tracing::debug!("Registering with etcd as {network_name}"); tracing::debug!("Registering with etcd as {network_name}");
let model_registration = ModelEntry { let model_registration = ModelEntry {
name: self.display_name().to_string(), name: self.display_name().to_string(),
endpoint: endpoint.id(), endpoint_id: endpoint.id(),
model_type, model_type,
runtime_config: Some(self.runtime_config.clone()), runtime_config: Some(self.runtime_config.clone()),
}; };
......
...@@ -440,7 +440,7 @@ impl AnnotatedMockEngine { ...@@ -440,7 +440,7 @@ impl AnnotatedMockEngine {
pub fn new( pub fn new(
inner: MockVllmEngine, inner: MockVllmEngine,
distributed_runtime: DistributedRuntime, distributed_runtime: DistributedRuntime,
endpoint: dynamo_runtime::protocols::Endpoint, endpoint_id: dynamo_runtime::protocols::EndpointId,
) -> Self { ) -> Self {
let inner = Arc::new(inner); let inner = Arc::new(inner);
let inner_clone = inner.clone(); let inner_clone = inner.clone();
...@@ -449,13 +449,13 @@ impl AnnotatedMockEngine { ...@@ -449,13 +449,13 @@ impl AnnotatedMockEngine {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
// Try to create component // Try to create component
let Ok(namespace) = distributed_runtime.namespace(&endpoint.namespace) else { let Ok(namespace) = distributed_runtime.namespace(&endpoint_id.namespace) else {
tracing::debug!("Namespace not available yet, retrying..."); tracing::debug!("Namespace not available yet, retrying...");
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
continue; continue;
}; };
let Ok(component) = namespace.component(&endpoint.component) else { let Ok(component) = namespace.component(&endpoint_id.component) else {
tracing::debug!("Component not available yet, retrying..."); tracing::debug!("Component not available yet, retrying...");
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
continue; continue;
...@@ -509,13 +509,13 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -509,13 +509,13 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
/// Create a mocker engine as ExecutionContext /// Create a mocker engine as ExecutionContext
pub async fn make_mocker_engine( pub async fn make_mocker_engine(
distributed_runtime: DistributedRuntime, distributed_runtime: DistributedRuntime,
endpoint: dynamo_runtime::protocols::Endpoint, endpoint_id: dynamo_runtime::protocols::EndpointId,
args: MockEngineArgs, args: MockEngineArgs,
) -> Result<crate::backend::ExecutionContext, Error> { ) -> Result<crate::backend::ExecutionContext, Error> {
// Create the mocker engine // Create the mocker engine
tracing::info!("Creating mocker engine with config: {args:?}"); tracing::info!("Creating mocker engine with config: {args:?}");
let annotated_engine = let annotated_engine =
AnnotatedMockEngine::new(MockVllmEngine::new(args), distributed_runtime, endpoint); AnnotatedMockEngine::new(MockVllmEngine::new(args), distributed_runtime, endpoint_id);
Ok(Arc::new(annotated_engine)) Ok(Arc::new(annotated_engine))
} }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Asynchronous Scheduler for LLM Request Management //! Asynchronous Scheduler for LLM Request Management
//! //!
...@@ -207,7 +195,7 @@ impl SchedulerState { ...@@ -207,7 +195,7 @@ impl SchedulerState {
/// Remove a UUID and its associated Request from collections. /// Remove a UUID and its associated Request from collections.
fn complete(&mut self, uuid: &Uuid) { fn complete(&mut self, uuid: &Uuid) {
tracing::debug!("Request {} will complete", uuid); tracing::trace!("Request {uuid} will complete");
self.decode.remove(uuid); self.decode.remove(uuid);
self.requests.remove(uuid); self.requests.remove(uuid);
self.prefill_costs.remove(uuid); self.prefill_costs.remove(uuid);
......
...@@ -192,10 +192,5 @@ mod tests { ...@@ -192,10 +192,5 @@ mod tests {
assert_eq!(format!("{}", output.err().unwrap()), "Test error"); assert_eq!(format!("{}", output.err().unwrap()), "Test error");
assert!(!output.is_ok()); assert!(!output.is_ok());
assert!(output.is_err()); assert!(output.is_err());
let output = LLMEngineOutput::from_err(anyhow::Error::msg("Test error 2").into());
assert_eq!(format!("{}", output.err().unwrap()), "Test error 2");
assert!(!output.is_ok());
assert!(output.is_err());
} }
} }
...@@ -47,7 +47,7 @@ use super::{ ...@@ -47,7 +47,7 @@ use super::{
}; };
use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler}; use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
use crate::protocols::Endpoint as EndpointId; use crate::protocols::EndpointId;
use crate::service::ComponentNatsServerPrometheusMetrics; use crate::service::ComponentNatsServerPrometheusMetrics;
use async_nats::{ use async_nats::{
rustls::quic, rustls::quic,
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::str::FromStr; use std::str::FromStr;
use crate::pipeline::PipelineError;
pub mod annotated; pub mod annotated;
pub mod maybe_error; pub mod maybe_error;
...@@ -43,22 +29,21 @@ pub struct Component { ...@@ -43,22 +29,21 @@ pub struct Component {
/// Represents an endpoint with a namespace, component, and name. /// Represents an endpoint with a namespace, component, and name.
/// ///
/// An `Endpoint` is defined by a three-part string separated by `/` or a '.': /// An [EndpointId] is defined by a three-part string separated by `/` or a '.':
/// - **namespace** /// - **namespace**
/// - **component** /// - **component**
/// - **name** /// - **name**
/// ///
/// Example format: `"namespace/component/endpoint"` /// Example format: `"namespace/component/endpoint"`
/// ///
/// TODO: There is also an Endpoint in runtime/src/component.rs
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Endpoint { pub struct EndpointId {
pub namespace: String, pub namespace: String,
pub component: String, pub component: String,
pub name: String, pub name: String,
} }
impl PartialEq<Vec<&str>> for Endpoint { impl PartialEq<Vec<&str>> for EndpointId {
fn eq(&self, other: &Vec<&str>) -> bool { fn eq(&self, other: &Vec<&str>) -> bool {
if other.len() != 3 { if other.len() != 3 {
return false; return false;
...@@ -68,15 +53,27 @@ impl PartialEq<Vec<&str>> for Endpoint { ...@@ -68,15 +53,27 @@ impl PartialEq<Vec<&str>> for Endpoint {
} }
} }
impl PartialEq<Endpoint> for Vec<&str> { impl PartialEq<[&str; 3]> for EndpointId {
fn eq(&self, other: &Endpoint) -> bool { fn eq(&self, other: &[&str; 3]) -> bool {
self.namespace == other[0] && self.component == other[1] && self.name == other[2]
}
}
impl PartialEq<EndpointId> for [&str; 3] {
fn eq(&self, other: &EndpointId) -> bool {
other == self
}
}
impl PartialEq<EndpointId> for Vec<&str> {
fn eq(&self, other: &EndpointId) -> bool {
other == self other == self
} }
} }
impl Default for Endpoint { impl Default for EndpointId {
fn default() -> Self { fn default() -> Self {
Endpoint { EndpointId {
namespace: DEFAULT_NAMESPACE.to_string(), namespace: DEFAULT_NAMESPACE.to_string(),
component: DEFAULT_COMPONENT.to_string(), component: DEFAULT_COMPONENT.to_string(),
name: DEFAULT_ENDPOINT.to_string(), name: DEFAULT_ENDPOINT.to_string(),
...@@ -84,8 +81,8 @@ impl Default for Endpoint { ...@@ -84,8 +81,8 @@ impl Default for Endpoint {
} }
} }
impl From<&str> for Endpoint { impl From<&str> for EndpointId {
/// Creates an `Endpoint` from a string. /// Creates an [EndpointId] from a string.
/// ///
/// # Arguments /// # Arguments
/// - `path`: A string in the format `"namespace/component/endpoint"`. /// - `path`: A string in the format `"namespace/component/endpoint"`.
...@@ -95,60 +92,86 @@ impl From<&str> for Endpoint { ...@@ -95,60 +92,86 @@ impl From<&str> for Endpoint {
/// Default values are used for missing parts. /// Default values are used for missing parts.
/// ///
/// # Examples: /// # Examples:
/// - "component" -> ["DEFAULT_NS", "component", "DEFAULT_E"] /// - "component" -> ["DEFAULT_NAMESPACE", "component", "DEFAULT_ENDPOINT"]
/// - "namespace.component" -> ["namespace", "component", "DEFAULT_E"] /// - "namespace.component" -> ["namespace", "component", "DEFAULT_ENDPOINT"]
/// - "namespace.component.endpoint" -> ["namespace", "component", "endpoint"] /// - "namespace.component.endpoint" -> ["namespace", "component", "endpoint"]
/// - "namespace/component" -> ["namespace", "component", "DEFAULT_E"] /// - "namespace/component" -> ["namespace", "component", "DEFAULT_ENDPOINT"]
/// - "namespace.component.endpoint.other.parts" -> ["namespace", "component", "endpoint_other_parts"] /// - "namespace.component.endpoint.other.parts" -> ["namespace", "component", "endpoint_other_parts"]
/// ///
/// # Examples /// # Examples
/// ```ignore /// ```
/// use dynamo_runtime:protocols::Endpoint; /// use dynamo_runtime::protocols::EndpointId;
/// ///
/// let endpoint = Endpoint::from("namespace/component/endpoint"); /// let endpoint = EndpointId::from("namespace/component/endpoint");
/// assert_eq!(endpoint.namespace, "namespace"); /// assert_eq!(endpoint.namespace, "namespace");
/// assert_eq!(endpoint.component, "component"); /// assert_eq!(endpoint.component, "component");
/// assert_eq!(endpoint.name, "endpoint"); /// assert_eq!(endpoint.name, "endpoint");
/// ``` /// ```
fn from(input: &str) -> Self { fn from(s: &str) -> Self {
let mut result = Endpoint::default(); let input = s.strip_prefix(ENDPOINT_SCHEME).unwrap_or(s);
// Split the input string on either '.' or '/' // Split the input string on either '.' or '/'
let elements: Vec<&str> = input let mut parts = input
.trim_matches([' ', '/', '.']) .trim_matches([' ', '/', '.'])
.split(['.', '/']) .split(['.', '/'])
.filter(|x| !x.is_empty()) .filter(|x| !x.is_empty());
.collect();
// Extract the first three potential components.
match elements.len() { let p1 = parts.next();
0 => {} let p2 = parts.next();
1 => { let p3 = parts.next();
result.component = elements[0].to_string();
let namespace;
let component;
let name;
match (p1, p2, p3) {
(None, _, _) => {
// 0 elements: all fields remain empty.
// Should this be an error?
namespace = DEFAULT_NAMESPACE.to_string();
component = DEFAULT_COMPONENT.to_string();
name = DEFAULT_ENDPOINT.to_string();
} }
2 => { (Some(c), None, _) => {
result.namespace = elements[0].to_string(); namespace = DEFAULT_NAMESPACE.to_string();
result.component = elements[1].to_string(); component = c.to_string();
name = DEFAULT_ENDPOINT.to_string();
} }
3 => { (Some(ns), Some(c), None) => {
result.namespace = elements[0].to_string(); // 2 elements: namespace, component
result.component = elements[1].to_string(); namespace = ns.to_string();
result.name = elements[2].to_string(); component = c.to_string();
name = DEFAULT_ENDPOINT.to_string();
} }
x if x > 3 => { (Some(ns), Some(c), Some(ep)) => {
result.namespace = elements[0].to_string(); namespace = ns.to_string();
result.component = elements[1].to_string(); component = c.to_string();
result.name = elements[2..].join("_");
// For the 'name' field, we need to handle 'n' and any remaining parts.
// Instead of collecting into a Vec and then joining, we can build the string directly.
let mut endpoint_buf = String::from(ep); // Start with the third part
for part in parts {
// 'parts' iterator continues from where p3 left off
endpoint_buf.push('_');
endpoint_buf.push_str(part);
}
name = endpoint_buf;
} }
_ => unreachable!(),
} }
result
EndpointId {
namespace,
component,
name,
}
} }
} }
impl FromStr for Endpoint { impl FromStr for EndpointId {
type Err = PipelineError; type Err = core::convert::Infallible;
/// Parses an `Endpoint` from a string using the standard Rust `.parse::<T>()` pattern. /// Parses an `EndpointId` from a string using the standard Rust `.parse::<T>()` pattern.
/// ///
/// This is implemented in terms of [`From<&str>`]. /// This is implemented in terms of [`From<&str>`].
/// ///
...@@ -156,25 +179,24 @@ impl FromStr for Endpoint { ...@@ -156,25 +179,24 @@ impl FromStr for Endpoint {
/// Does not fail /// Does not fail
/// ///
/// # Examples /// # Examples
/// ```ignore /// ```
/// use std::str::FromStr; /// use std::str::FromStr;
/// use dynamo_runtime:protocols::Endpoint; /// use dynamo_runtime::protocols::EndpointId;
/// ///
/// let endpoint: Endpoint = "namespace/component/endpoint".parse().unwrap(); /// let endpoint: EndpointId = "namespace/component/endpoint".parse().unwrap();
/// assert_eq!(endpoint.namespace, "namespace"); /// assert_eq!(endpoint.namespace, "namespace");
/// assert_eq!(endpoint.component, "component"); /// assert_eq!(endpoint.component, "component");
/// assert_eq!(endpoint.name, "endpoint"); /// assert_eq!(endpoint.name, "endpoint");
/// let endpoint: Endpoint = "dyn://namespace/component/endpoint".parse().unwrap(); /// let endpoint: EndpointId = "dyn://namespace/component/endpoint".parse().unwrap();
/// // same as above /// // same as above
/// assert_eq!(endpoint.name, "endpoint"); /// assert_eq!(endpoint.name, "endpoint");
/// ``` /// ```
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
let cleaned = s.strip_prefix(ENDPOINT_SCHEME).unwrap_or(s); Ok(EndpointId::from(s))
Ok(Endpoint::from(cleaned))
} }
} }
impl Endpoint { impl EndpointId {
/// As a String like dyn://dynamo.internal.worker /// As a String like dyn://dynamo.internal.worker
pub fn as_url(&self) -> String { pub fn as_url(&self) -> String {
format!( format!(
...@@ -193,7 +215,7 @@ mod tests { ...@@ -193,7 +215,7 @@ mod tests {
#[test] #[test]
fn test_valid_endpoint_from() { fn test_valid_endpoint_from() {
let input = "namespace1/component1/endpoint1"; let input = "namespace1/component1/endpoint1";
let endpoint = Endpoint::from(input); let endpoint = EndpointId::from(input);
assert_eq!(endpoint.namespace, "namespace1"); assert_eq!(endpoint.namespace, "namespace1");
assert_eq!(endpoint.component, "component1"); assert_eq!(endpoint.component, "component1");
...@@ -203,7 +225,7 @@ mod tests { ...@@ -203,7 +225,7 @@ mod tests {
#[test] #[test]
fn test_valid_endpoint_from_str() { fn test_valid_endpoint_from_str() {
let input = "namespace2/component2/endpoint2"; let input = "namespace2/component2/endpoint2";
let endpoint = Endpoint::from_str(input).unwrap(); let endpoint = EndpointId::from_str(input).unwrap();
assert_eq!(endpoint.namespace, "namespace2"); assert_eq!(endpoint.namespace, "namespace2");
assert_eq!(endpoint.component, "component2"); assert_eq!(endpoint.component, "component2");
...@@ -213,7 +235,7 @@ mod tests { ...@@ -213,7 +235,7 @@ mod tests {
#[test] #[test]
fn test_valid_endpoint_parse() { fn test_valid_endpoint_parse() {
let input = "namespace3/component3/endpoint3"; let input = "namespace3/component3/endpoint3";
let endpoint: Endpoint = input.parse().unwrap(); let endpoint: EndpointId = input.parse().unwrap();
assert_eq!(endpoint.namespace, "namespace3"); assert_eq!(endpoint.namespace, "namespace3");
assert_eq!(endpoint.component, "component3"); assert_eq!(endpoint.component, "component3");
...@@ -222,7 +244,7 @@ mod tests { ...@@ -222,7 +244,7 @@ mod tests {
#[test] #[test]
fn test_endpoint_from() { fn test_endpoint_from() {
let result = Endpoint::from("component"); let result = EndpointId::from("component");
assert_eq!( assert_eq!(
result, result,
vec![DEFAULT_NAMESPACE, "component", DEFAULT_ENDPOINT] vec![DEFAULT_NAMESPACE, "component", DEFAULT_ENDPOINT]
...@@ -231,19 +253,19 @@ mod tests { ...@@ -231,19 +253,19 @@ mod tests {
#[test] #[test]
fn test_namespace_component_endpoint() { fn test_namespace_component_endpoint() {
let result = Endpoint::from("namespace.component.endpoint"); let result = EndpointId::from("namespace.component.endpoint");
assert_eq!(result, vec!["namespace", "component", "endpoint"]); assert_eq!(result, vec!["namespace", "component", "endpoint"]);
} }
#[test] #[test]
fn test_forward_slash_separator() { fn test_forward_slash_separator() {
let result = Endpoint::from("namespace/component"); let result = EndpointId::from("namespace/component");
assert_eq!(result, vec!["namespace", "component", DEFAULT_ENDPOINT]); assert_eq!(result, vec!["namespace", "component", DEFAULT_ENDPOINT]);
} }
#[test] #[test]
fn test_multiple_parts() { fn test_multiple_parts() {
let result = Endpoint::from("namespace.component.endpoint.other.parts"); let result = EndpointId::from("namespace.component.endpoint.other.parts");
assert_eq!( assert_eq!(
result, result,
vec!["namespace", "component", "endpoint_other_parts"] vec!["namespace", "component", "endpoint_other_parts"]
...@@ -253,23 +275,31 @@ mod tests { ...@@ -253,23 +275,31 @@ mod tests {
#[test] #[test]
fn test_mixed_separators() { fn test_mixed_separators() {
// Do it the .into way for variety and documentation // Do it the .into way for variety and documentation
let result: Endpoint = "namespace/component.endpoint".into(); let result: EndpointId = "namespace/component.endpoint".into();
assert_eq!(result, vec!["namespace", "component", "endpoint"]); assert_eq!(result, vec!["namespace", "component", "endpoint"]);
} }
#[test] #[test]
fn test_empty_string() { fn test_empty_string() {
let result = Endpoint::from(""); let result = EndpointId::from("");
assert_eq!( assert_eq!(
result, result,
vec![DEFAULT_NAMESPACE, DEFAULT_COMPONENT, DEFAULT_ENDPOINT] vec![DEFAULT_NAMESPACE, DEFAULT_COMPONENT, DEFAULT_ENDPOINT]
); );
// White space is equivalent to an empty string // White space is equivalent to an empty string
let result = Endpoint::from(" "); let result = EndpointId::from(" ");
assert_eq!( assert_eq!(
result, result,
vec![DEFAULT_NAMESPACE, DEFAULT_COMPONENT, DEFAULT_ENDPOINT] vec![DEFAULT_NAMESPACE, DEFAULT_COMPONENT, DEFAULT_ENDPOINT]
); );
} }
#[test]
fn test_parse_with_scheme_and_url_roundtrip() {
let input = "dyn://ns/cp/ep";
let endpoint: EndpointId = input.parse().unwrap();
assert_eq!(endpoint, vec!["ns", "cp", "ep"]);
assert_eq!(endpoint.as_url(), "dyn://ns.cp.ep");
}
} }
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*; use super::*;
use crate::{error, Result}; use crate::{error, Result};
...@@ -199,17 +187,13 @@ mod tests { ...@@ -199,17 +187,13 @@ mod tests {
let annotated = Annotated::from_data("Test data".to_string()); let annotated = Annotated::from_data("Test data".to_string());
assert!(annotated.err().is_none()); assert!(annotated.err().is_none());
assert!(annotated.is_ok()); assert!(annotated.is_ok());
assert!(!annotated.is_err());
let annotated = Annotated::<String>::from_error("Test error 2".to_string()); let annotated = Annotated::<String>::from_error("Test error 2".to_string());
assert_eq!(format!("{}", annotated.err().unwrap()), "Test error 2"); assert_eq!(format!("{}", annotated.err().unwrap()), "Test error 2");
assert!(!annotated.is_ok());
assert!(annotated.is_err()); assert!(annotated.is_err());
let annotated = let annotated =
Annotated::<String>::from_err(anyhow::Error::msg("Test error 3".to_string()).into()); Annotated::<String>::from_err(anyhow::Error::msg("Test error 3".to_string()).into());
assert_eq!(format!("{}", annotated.err().unwrap()), "Test error 3");
assert!(!annotated.is_ok());
assert!(annotated.is_err()); assert!(annotated.is_err());
} }
} }
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::HashMap, pin::Pin, time::Duration}; use std::{collections::HashMap, pin::Pin, time::Duration};
use crate::{protocols::Endpoint, slug::Slug, transports::nats::Client}; use crate::{protocols::EndpointId, slug::Slug, transports::nats::Client};
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
...@@ -24,7 +12,7 @@ use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome}; ...@@ -24,7 +12,7 @@ use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome};
#[derive(Clone)] #[derive(Clone)]
pub struct NATSStorage { pub struct NATSStorage {
client: Client, client: Client,
endpoint: Endpoint, endpoint: EndpointId,
} }
pub struct NATSBucket { pub struct NATSBucket {
...@@ -58,7 +46,7 @@ impl KeyValueStore for NATSStorage { ...@@ -58,7 +46,7 @@ impl KeyValueStore for NATSStorage {
} }
impl NATSStorage { impl NATSStorage {
pub fn new(client: Client, endpoint: Endpoint) -> Self { pub fn new(client: Client, endpoint: EndpointId) -> Self {
NATSStorage { client, endpoint } NATSStorage { client, endpoint }
} }
...@@ -91,8 +79,10 @@ impl NATSStorage { ...@@ -91,8 +79,10 @@ impl NATSStorage {
}, },
) )
.await; .await;
let nats_store = create_result
.map_err(|err| StorageError::KeyValueError(err.to_string(), bucket_name.clone()))?;
tracing::debug!("Created bucket {bucket_name}"); tracing::debug!("Created bucket {bucket_name}");
create_result.map_err(|err| StorageError::KeyValueError(err.to_string(), bucket_name)) Ok(nats_store)
} }
async fn get_key_value( async fn get_key_value(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment