Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
51c1b9f1
Unverified
Commit
51c1b9f1
authored
Nov 25, 2025
by
Graham King
Committed by
GitHub
Nov 25, 2025
Browse files
chore(runtime): Tidy up component paths (#4560)
Signed-off-by:
Graham King
<
grahamk@nvidia.com
>
parent
4babb33c
Changes
11
Show whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
88 additions
and
849 deletions
+88
-849
lib/llm/src/discovery/model_manager.rs
lib/llm/src/discovery/model_manager.rs
+16
-9
lib/llm/src/kv_router/subscriber.rs
lib/llm/src/kv_router/subscriber.rs
+4
-3
lib/runtime/src/component.rs
lib/runtime/src/component.rs
+1
-100
lib/runtime/src/component/client.rs
lib/runtime/src/component/client.rs
+5
-16
lib/runtime/src/component/endpoint.rs
lib/runtime/src/component/endpoint.rs
+36
-34
lib/runtime/src/pipeline/network/egress/push_router.rs
lib/runtime/src/pipeline/network/egress/push_router.rs
+6
-6
lib/runtime/src/protocols.rs
lib/runtime/src/protocols.rs
+10
-2
lib/runtime/src/transports/etcd.rs
lib/runtime/src/transports/etcd.rs
+0
-2
lib/runtime/src/transports/etcd/path.rs
lib/runtime/src/transports/etcd/path.rs
+0
-542
lib/runtime/src/transports/nats.rs
lib/runtime/src/transports/nats.rs
+10
-0
lib/runtime/tests/namespace_etcd_path.rs
lib/runtime/tests/namespace_etcd_path.rs
+0
-135
No files found.
lib/llm/src/discovery/model_manager.rs
View file @
51c1b9f1
...
@@ -9,8 +9,8 @@ use std::{
...
@@ -9,8 +9,8 @@ use std::{
use
parking_lot
::{
Mutex
,
RwLock
};
use
parking_lot
::{
Mutex
,
RwLock
};
use
tokio
::
sync
::
oneshot
;
use
tokio
::
sync
::
oneshot
;
use
dynamo_runtime
::
prelude
::
DistributedRuntimeProvider
;
use
dynamo_runtime
::{
component
::
Endpoint
,
storage
::
key_value_store
::
Key
};
use
dynamo_runtime
::{
component
::
Endpoint
,
storage
::
key_value_store
::
Key
};
use
dynamo_runtime
::{
prelude
::
DistributedRuntimeProvider
,
protocols
::
EndpointId
};
use
crate
::{
use
crate
::{
discovery
::
KV_ROUTERS_ROOT_PATH
,
discovery
::
KV_ROUTERS_ROOT_PATH
,
...
@@ -56,7 +56,7 @@ pub struct ModelManager {
...
@@ -56,7 +56,7 @@ pub struct ModelManager {
// These are Mutex because we read and write rarely and equally
// These are Mutex because we read and write rarely and equally
cards
:
Mutex
<
HashMap
<
String
,
ModelDeploymentCard
>>
,
cards
:
Mutex
<
HashMap
<
String
,
ModelDeploymentCard
>>
,
kv_choosers
:
Mutex
<
HashMap
<
String
,
Arc
<
KvRouter
>>>
,
// Key: component service_name
kv_choosers
:
Mutex
<
HashMap
<
EndpointId
,
Arc
<
KvRouter
>>>
,
prefill_router_activators
:
Mutex
<
HashMap
<
String
,
PrefillActivationState
>>
,
prefill_router_activators
:
Mutex
<
HashMap
<
String
,
PrefillActivationState
>>
,
}
}
...
@@ -293,13 +293,13 @@ impl ModelManager {
...
@@ -293,13 +293,13 @@ impl ModelManager {
kv_cache_block_size
:
u32
,
kv_cache_block_size
:
u32
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
)
->
anyhow
::
Result
<
Arc
<
KvRouter
>>
{
)
->
anyhow
::
Result
<
Arc
<
KvRouter
>>
{
let
endpoint_
path
=
endpoint
.
path
();
let
endpoint_
id
=
endpoint
.
id
();
if
let
Some
(
kv_chooser
)
=
self
.get_kv_chooser
(
&
endpoint_
path
)
{
if
let
Some
(
kv_chooser
)
=
self
.get_kv_chooser
(
&
endpoint_
id
)
{
// Check if the existing router has a different block size
// Check if the existing router has a different block size
if
kv_chooser
.block_size
()
!=
kv_cache_block_size
{
if
kv_chooser
.block_size
()
!=
kv_cache_block_size
{
tracing
::
warn!
(
tracing
::
warn!
(
endpoint
=
%
endpoint_
path
,
endpoint
=
%
endpoint_
id
,
existing_block_size
=
%
kv_chooser
.block_size
(),
existing_block_size
=
%
kv_chooser
.block_size
(),
requested_block_size
=
%
kv_cache_block_size
,
requested_block_size
=
%
kv_cache_block_size
,
"KV Router block size mismatch! Endpoint is requesting a different kv_cache_block_size than the existing router.
\
"KV Router block size mismatch! Endpoint is requesting a different kv_cache_block_size than the existing router.
\
...
@@ -315,7 +315,14 @@ impl ModelManager {
...
@@ -315,7 +315,14 @@ impl ModelManager {
.get_or_create_bucket
(
KV_ROUTERS_ROOT_PATH
,
None
)
.get_or_create_bucket
(
KV_ROUTERS_ROOT_PATH
,
None
)
.await
?
;
.await
?
;
let
router_uuid
=
uuid
::
Uuid
::
new_v4
();
let
router_uuid
=
uuid
::
Uuid
::
new_v4
();
let
router_key
=
Key
::
new
(
format!
(
"{}/{router_uuid}"
,
endpoint
.path
()));
// In lib/llm/src/kv_router/subscriber.rs we filter on component.service_name() so this
// must have that prefix.
let
router_key
=
Key
::
new
(
format!
(
"{}/{}/{}"
,
endpoint
.component
()
.service_name
(),
endpoint
.name
(),
router_uuid
,
));
let
json_router_config
=
serde_json
::
to_vec_pretty
(
&
kv_router_config
.unwrap_or_default
())
?
;
let
json_router_config
=
serde_json
::
to_vec_pretty
(
&
kv_router_config
.unwrap_or_default
())
?
;
router_bucket
router_bucket
.insert
(
&
router_key
,
json_router_config
.into
(),
0
)
.insert
(
&
router_key
,
json_router_config
.into
(),
0
)
...
@@ -334,12 +341,12 @@ impl ModelManager {
...
@@ -334,12 +341,12 @@ impl ModelManager {
let
new_kv_chooser
=
Arc
::
new
(
chooser
);
let
new_kv_chooser
=
Arc
::
new
(
chooser
);
self
.kv_choosers
self
.kv_choosers
.lock
()
.lock
()
.insert
(
endpoint_
path
,
new_kv_chooser
.clone
());
.insert
(
endpoint_
id
,
new_kv_chooser
.clone
());
Ok
(
new_kv_chooser
)
Ok
(
new_kv_chooser
)
}
}
fn
get_kv_chooser
(
&
self
,
service_name
:
&
str
)
->
Option
<
Arc
<
KvRouter
>>
{
fn
get_kv_chooser
(
&
self
,
id
:
&
EndpointId
)
->
Option
<
Arc
<
KvRouter
>>
{
self
.kv_choosers
.lock
()
.get
(
service_name
)
.cloned
()
self
.kv_choosers
.lock
()
.get
(
id
)
.cloned
()
}
}
/// Register a prefill router for a decode model. Returns a receiver that will be
/// Register a prefill router for a decode model. Returns a receiver that will be
...
...
lib/llm/src/kv_router/subscriber.rs
View file @
51c1b9f1
...
@@ -420,10 +420,11 @@ pub async fn start_kv_router_background(
...
@@ -420,10 +420,11 @@ pub async fn start_kv_router_background(
tracing
::
info!
(
"Detected router replica deletion: {key}"
);
tracing
::
info!
(
"Detected router replica deletion: {key}"
);
// Only process deletions for routers on the same component
// Only process deletions for routers on the same component
if
!
key
.contains
(
component
.path
()
.as_str
())
{
// Must match model_manager.rs kv_chooser_for
if
!
key
.contains
(
&
component
.service_name
())
{
tracing
::
trace!
(
tracing
::
trace!
(
"Skipping router deletion from different component (key: {key}, subscriber component: {})"
,
"Skipping router deletion from different component (key: {key}, subscriber component: {})"
,
component
.
path
()
component
.
service_name
()
);
);
continue
;
continue
;
}
}
...
@@ -480,7 +481,7 @@ async fn cleanup_orphaned_consumers(
...
@@ -480,7 +481,7 @@ async fn cleanup_orphaned_consumers(
};
};
// Filter to only routers for this component
// Filter to only routers for this component
let
component_path
=
component
.
path
();
let
component_path
=
component
.
service_name
();
let
active_uuids
:
HashSet
<
String
>
=
entries
let
active_uuids
:
HashSet
<
String
>
=
entries
.iter
()
.iter
()
.filter_map
(|(
key
,
_
)|
{
.filter_map
(|(
key
,
_
)|
{
...
...
lib/runtime/src/component.rs
View file @
51c1b9f1
...
@@ -37,16 +37,9 @@ use crate::{
...
@@ -37,16 +37,9 @@ use crate::{
metrics
::{
MetricsHierarchy
,
MetricsRegistry
,
prometheus_names
},
metrics
::{
MetricsHierarchy
,
MetricsRegistry
,
prometheus_names
},
service
::
ServiceClient
,
service
::
ServiceClient
,
service
::
ServiceSet
,
service
::
ServiceSet
,
transports
::
etcd
::{
ETCD_ROOT_PATH
,
EtcdPath
},
};
};
use
super
::{
use
super
::{
DistributedRuntime
,
Runtime
,
traits
::
*
,
transports
::
nats
::
Slug
,
utils
::
Duration
};
DistributedRuntime
,
Runtime
,
traits
::
*
,
transports
::
etcd
::{
COMPONENT_KEYWORD
,
ENDPOINT_KEYWORD
},
transports
::
nats
::
Slug
,
utils
::
Duration
,
};
use
crate
::
pipeline
::
network
::{
PushWorkHandler
,
ingress
::
push_endpoint
::
PushEndpoint
};
use
crate
::
pipeline
::
network
::{
PushWorkHandler
,
ingress
::
push_endpoint
::
PushEndpoint
};
use
crate
::
protocols
::
EndpointId
;
use
crate
::
protocols
::
EndpointId
;
...
@@ -73,10 +66,6 @@ pub mod service;
...
@@ -73,10 +66,6 @@ pub mod service;
pub
use
client
::
Client
;
pub
use
client
::
Client
;
/// The root key-value path where each instance registers itself in.
/// An instance is namespace+component+endpoint+lease_id and must be unique.
pub
const
INSTANCE_ROOT_PATH
:
&
str
=
"v1/instances"
;
#[derive(Debug,
Clone,
Serialize,
Deserialize,
Eq,
PartialEq,
Hash)]
#[derive(Debug,
Clone,
Serialize,
Deserialize,
Eq,
PartialEq,
Hash)]
#[serde(rename_all
=
"snake_case"
)]
#[serde(rename_all
=
"snake_case"
)]
pub
enum
TransportType
{
pub
enum
TransportType
{
...
@@ -232,27 +221,11 @@ impl MetricsHierarchy for Component {
...
@@ -232,27 +221,11 @@ impl MetricsHierarchy for Component {
}
}
impl
Component
{
impl
Component
{
/// The component part of an instance path in key-value store.
pub
fn
instance_root
(
&
self
)
->
String
{
let
ns
=
self
.namespace
.name
();
let
cp
=
&
self
.name
;
format!
(
"{INSTANCE_ROOT_PATH}/{ns}/{cp}"
)
}
pub
fn
service_name
(
&
self
)
->
String
{
pub
fn
service_name
(
&
self
)
->
String
{
let
service_name
=
format!
(
"{}_{}"
,
self
.namespace
.name
(),
self
.name
);
let
service_name
=
format!
(
"{}_{}"
,
self
.namespace
.name
(),
self
.name
);
Slug
::
slugify
(
&
service_name
)
.to_string
()
Slug
::
slugify
(
&
service_name
)
.to_string
()
}
}
pub
fn
path
(
&
self
)
->
String
{
format!
(
"{}/{}"
,
self
.namespace
.name
(),
self
.name
)
}
pub
fn
etcd_path
(
&
self
)
->
EtcdPath
{
EtcdPath
::
new_component
(
&
self
.namespace
.name
(),
&
self
.name
)
.expect
(
"Component name and namespace should be valid"
)
}
pub
fn
namespace
(
&
self
)
->
&
Namespace
{
pub
fn
namespace
(
&
self
)
->
&
Namespace
{
&
self
.namespace
&
self
.namespace
}
}
...
@@ -525,74 +498,6 @@ impl Endpoint {
...
@@ -525,74 +498,6 @@ impl Endpoint {
&
self
.component
&
self
.component
}
}
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
pub
fn
path
(
&
self
)
->
String
{
format!
(
"{}/{}/{}"
,
self
.component
.path
(),
ENDPOINT_KEYWORD
,
self
.name
)
}
/// The endpoint part of an instance path in etcd
pub
fn
etcd_root
(
&
self
)
->
String
{
let
component_path
=
self
.component
.instance_root
();
let
endpoint_name
=
&
self
.name
;
format!
(
"{component_path}/{endpoint_name}"
)
}
/// The endpoint as an EtcdPath object
pub
fn
etcd_path
(
&
self
)
->
EtcdPath
{
EtcdPath
::
new_endpoint
(
&
self
.component
.namespace
()
.name
(),
self
.component
.name
(),
&
self
.name
,
)
.expect
(
"Endpoint name and component name should be valid"
)
}
/// The fully path of an instance in etcd
pub
fn
etcd_path_with_lease_id
(
&
self
,
lease_id
:
u64
)
->
String
{
format!
(
"{INSTANCE_ROOT_PATH}/{}"
,
self
.unique_path
(
lease_id
))
}
/// Full path of this endpoint with forward slash separators, including lease id
pub
fn
unique_path
(
&
self
,
lease_id
:
u64
)
->
String
{
let
ns
=
self
.component
.namespace
()
.name
();
let
cp
=
self
.component
.name
();
let
ep
=
self
.name
();
format!
(
"{ns}/{cp}/{ep}/{lease_id:x}"
)
}
/// The endpoint as an EtcdPath object with instance ID
pub
fn
etcd_path_object_with_lease_id
(
&
self
,
instance_id
:
i64
)
->
EtcdPath
{
EtcdPath
::
new_endpoint_with_lease
(
&
self
.component
.namespace
()
.name
(),
self
.component
.name
(),
&
self
.name
,
instance_id
,
)
.expect
(
"Endpoint name and component name should be valid"
)
}
pub
fn
name_with_id
(
&
self
,
instance_id
:
u64
)
->
String
{
format!
(
"{}-{:x}"
,
self
.name
,
instance_id
)
}
pub
fn
subject
(
&
self
)
->
String
{
format!
(
"{}.{}"
,
self
.component
.service_name
(),
self
.name
)
}
/// Subject to an instance of the [Endpoint] with a specific lease id
pub
fn
subject_to
(
&
self
,
lease_id
:
u64
)
->
String
{
format!
(
"{}.{}"
,
self
.component
.service_name
(),
self
.name_with_id
(
lease_id
)
)
}
pub
async
fn
client
(
&
self
)
->
anyhow
::
Result
<
client
::
Client
>
{
pub
async
fn
client
(
&
self
)
->
anyhow
::
Result
<
client
::
Client
>
{
client
::
Client
::
new
(
self
.clone
())
.await
client
::
Client
::
new
(
self
.clone
())
.await
}
}
...
@@ -676,10 +581,6 @@ impl Namespace {
...
@@ -676,10 +581,6 @@ impl Namespace {
.build
()
?
)
.build
()
?
)
}
}
pub
fn
etcd_path
(
&
self
)
->
String
{
format!
(
"{ETCD_ROOT_PATH}{}"
,
self
.name
())
}
pub
fn
name
(
&
self
)
->
String
{
pub
fn
name
(
&
self
)
->
String
{
match
&
self
.parent
{
match
&
self
.parent
{
Some
(
parent
)
=>
format!
(
"{}.{}"
,
parent
.name
(),
self
.name
),
Some
(
parent
)
=>
format!
(
"{}.{}"
,
parent
.name
(),
self
.name
),
...
...
lib/runtime/src/component/client.rs
View file @
51c1b9f1
...
@@ -43,7 +43,7 @@ impl Client {
...
@@ -43,7 +43,7 @@ impl Client {
pub
(
crate
)
async
fn
new
(
endpoint
:
Endpoint
)
->
Result
<
Self
>
{
pub
(
crate
)
async
fn
new
(
endpoint
:
Endpoint
)
->
Result
<
Self
>
{
tracing
::
trace!
(
tracing
::
trace!
(
"Client::new_dynamic: Creating dynamic client for endpoint: {}"
,
"Client::new_dynamic: Creating dynamic client for endpoint: {}"
,
endpoint
.
path
()
endpoint
.
id
()
);
);
let
instance_source
=
Self
::
get_or_create_dynamic_instance_source
(
&
endpoint
)
.await
?
;
let
instance_source
=
Self
::
get_or_create_dynamic_instance_source
(
&
endpoint
)
.await
?
;
...
@@ -60,15 +60,6 @@ impl Client {
...
@@ -60,15 +60,6 @@ impl Client {
Ok
(
client
)
Ok
(
client
)
}
}
pub
fn
path
(
&
self
)
->
String
{
self
.endpoint
.path
()
}
/// The root etcd path we watch in etcd to discover new instances to route to.
pub
fn
etcd_root
(
&
self
)
->
String
{
self
.endpoint
.etcd_root
()
}
/// Instances available from watching key-value store
/// Instances available from watching key-value store
pub
fn
instances
(
&
self
)
->
Vec
<
Instance
>
{
pub
fn
instances
(
&
self
)
->
Vec
<
Instance
>
{
self
.instance_source
.borrow
()
.clone
()
self
.instance_source
.borrow
()
.clone
()
...
@@ -95,7 +86,7 @@ impl Client {
...
@@ -95,7 +86,7 @@ impl Client {
pub
async
fn
wait_for_instances
(
&
self
)
->
Result
<
Vec
<
Instance
>>
{
pub
async
fn
wait_for_instances
(
&
self
)
->
Result
<
Vec
<
Instance
>>
{
tracing
::
trace!
(
tracing
::
trace!
(
"wait_for_instances: Starting wait for endpoint: {}"
,
"wait_for_instances: Starting wait for endpoint: {}"
,
self
.endpoint
.
path
()
self
.endpoint
.
id
()
);
);
let
mut
rx
=
self
.instance_source
.as_ref
()
.clone
();
let
mut
rx
=
self
.instance_source
.as_ref
()
.clone
();
// wait for there to be 1 or more endpoints
// wait for there to be 1 or more endpoints
...
@@ -108,7 +99,7 @@ impl Client {
...
@@ -108,7 +99,7 @@ impl Client {
tracing
::
info!
(
tracing
::
info!
(
"wait_for_instances: Found {} instance(s) for endpoint: {}"
,
"wait_for_instances: Found {} instance(s) for endpoint: {}"
,
instances
.len
(),
instances
.len
(),
self
.endpoint
.
path
()
self
.endpoint
.
id
()
);
);
break
;
break
;
}
}
...
@@ -145,7 +136,7 @@ impl Client {
...
@@ -145,7 +136,7 @@ impl Client {
fn
monitor_instance_source
(
&
self
)
{
fn
monitor_instance_source
(
&
self
)
{
let
cancel_token
=
self
.endpoint
.drt
()
.primary_token
();
let
cancel_token
=
self
.endpoint
.drt
()
.primary_token
();
let
client
=
self
.clone
();
let
client
=
self
.clone
();
let
endpoint_
path
=
self
.endpoint
.
path
();
let
endpoint_
id
=
self
.endpoint
.
id
();
tokio
::
task
::
spawn
(
async
move
{
tokio
::
task
::
spawn
(
async
move
{
let
mut
rx
=
client
.instance_source
.as_ref
()
.clone
();
let
mut
rx
=
client
.instance_source
.as_ref
()
.clone
();
while
!
cancel_token
.is_cancelled
()
{
while
!
cancel_token
.is_cancelled
()
{
...
@@ -164,9 +155,7 @@ impl Client {
...
@@ -164,9 +155,7 @@ impl Client {
if
let
Err
(
err
)
=
rx
.changed
()
.await
{
if
let
Err
(
err
)
=
rx
.changed
()
.await
{
tracing
::
error!
(
tracing
::
error!
(
"monitor_instance_source: The Sender is dropped: {}, endpoint={}"
,
"monitor_instance_source: The Sender is dropped: {err}, endpoint={endpoint_id}"
,
err
,
endpoint_path
);
);
cancel_token
.cancel
();
cancel_token
.cancel
();
}
}
...
...
lib/runtime/src/component/endpoint.rs
View file @
51c1b9f1
...
@@ -14,8 +14,10 @@ use crate::{
...
@@ -14,8 +14,10 @@ use crate::{
component
::{
Endpoint
,
Instance
,
TransportType
,
service
::
EndpointStatsHandler
},
component
::{
Endpoint
,
Instance
,
TransportType
,
service
::
EndpointStatsHandler
},
distributed
::
RequestPlaneMode
,
distributed
::
RequestPlaneMode
,
pipeline
::
network
::{
PushWorkHandler
,
ingress
::
push_endpoint
::
PushEndpoint
},
pipeline
::
network
::{
PushWorkHandler
,
ingress
::
push_endpoint
::
PushEndpoint
},
protocols
::
EndpointId
,
storage
::
key_value_store
,
storage
::
key_value_store
,
traits
::
DistributedRuntimeProvider
,
traits
::
DistributedRuntimeProvider
,
transports
::
nats
,
};
};
#[derive(Educe,
Builder,
Dissolve)]
#[derive(Educe,
Builder,
Dissolve)]
...
@@ -72,11 +74,9 @@ impl EndpointConfigBuilder {
...
@@ -72,11 +74,9 @@ impl EndpointConfigBuilder {
health_check_payload
,
health_check_payload
,
)
=
self
.build_internal
()
?
.dissolve
();
)
=
self
.build_internal
()
?
.dissolve
();
let
connection_id
=
endpoint
.drt
()
.connection_id
();
let
connection_id
=
endpoint
.drt
()
.connection_id
();
let
endpoint_id
=
endpoint
.id
();
tracing
::
debug!
(
tracing
::
debug!
(
"Starting endpoint: {endpoint_id}"
);
"Starting endpoint: {}"
,
endpoint
.etcd_path_with_lease_id
(
connection_id
)
);
let
service_name
=
endpoint
.component
.service_name
();
let
service_name
=
endpoint
.component
.service_name
();
...
@@ -94,21 +94,22 @@ impl EndpointConfigBuilder {
...
@@ -94,21 +94,22 @@ impl EndpointConfigBuilder {
.get
(
&
service_name
)
.get
(
&
service_name
)
.cloned
()
.cloned
()
.expect
(
"no stats handler registry; this is unexpected"
);
.expect
(
"no stats handler registry; this is unexpected"
);
handler_map
// There is something wrong with the stats handler map I think.
.lock
()
// Here the connection_id is included, but in component/service.rs add_stats_service it uses service_name,
.insert
(
endpoint
.subject_to
(
connection_id
),
stats_handler
);
// no connection id so it's per-endpoint not per-instance. Doesn't match.
// To not block current refactor I am keeping previous behavior, but I think needs
// investigation.
handler_map
.lock
()
.insert
(
nats
::
instance_subject
(
&
endpoint_id
,
connection_id
),
stats_handler
,
);
}
}
// This creates a child token of the runtime's endpoint_shutdown_token. That token is
// This creates a child token of the runtime's endpoint_shutdown_token. That token is
// cancelled first as part of graceful shutdown. See Runtime::shutdown.
// cancelled first as part of graceful shutdown. See Runtime::shutdown.
let
endpoint_shutdown_token
=
endpoint
.drt
()
.child_token
();
let
endpoint_shutdown_token
=
endpoint
.drt
()
.child_token
();
// Extract all values needed from endpoint before any spawns
let
namespace_name
=
endpoint
.component.namespace.name
.clone
();
let
component_name
=
endpoint
.component.name
.clone
();
let
endpoint_name
=
endpoint
.name
.clone
();
let
system_health
=
endpoint
.drt
()
.system_health
();
let
system_health
=
endpoint
.drt
()
.system_health
();
let
subject
=
endpoint
.subject_to
(
connection_id
);
let
request_plane_mode
=
endpoint
.drt
()
.request_plane
();
let
request_plane_mode
=
endpoint
.drt
()
.request_plane
();
tracing
::
info!
(
"Endpoint starting with request plane mode: {request_plane_mode}"
,);
tracing
::
info!
(
"Endpoint starting with request plane mode: {request_plane_mode}"
,);
...
@@ -116,23 +117,23 @@ impl EndpointConfigBuilder {
...
@@ -116,23 +117,23 @@ impl EndpointConfigBuilder {
// Register health check target in SystemHealth if provided
// Register health check target in SystemHealth if provided
if
let
Some
(
health_check_payload
)
=
&
health_check_payload
{
if
let
Some
(
health_check_payload
)
=
&
health_check_payload
{
// Build transport based on request plane mode
// Build transport based on request plane mode
let
transport
=
build_transport_type
(
request_plane_mode
,
&
endpoint_
name
,
&
subject
);
let
transport
=
build_transport_type
(
request_plane_mode
,
&
endpoint_
id
,
connection_id
);
let
instance
=
Instance
{
let
instance
=
Instance
{
component
:
component
_name
.clone
(),
component
:
endpoint_id
.
component
.clone
(),
endpoint
:
endpoint_name
.clone
(),
endpoint
:
endpoint_
id
.
name
.clone
(),
namespace
:
namespace
_name
.clone
(),
namespace
:
endpoint_id
.
namespace
.clone
(),
instance_id
:
connection_id
,
instance_id
:
connection_id
,
transport
,
transport
,
};
};
tracing
::
debug!
(
endpoint_name
=
%
endpoint
_
name
,
"Registering endpoint health check target"
);
tracing
::
debug!
(
endpoint_name
=
%
endpoint
.
name
,
"Registering endpoint health check target"
);
let
guard
=
system_health
.lock
();
let
guard
=
system_health
.lock
();
guard
.register_health_check_target
(
guard
.register_health_check_target
(
&
endpoint
_
name
,
&
endpoint
.
name
,
instance
,
instance
,
health_check_payload
.clone
(),
health_check_payload
.clone
(),
);
);
if
let
Some
(
notifier
)
=
guard
.get_endpoint_health_check_notifier
(
&
endpoint
_
name
)
{
if
let
Some
(
notifier
)
=
guard
.get_endpoint_health_check_notifier
(
&
endpoint
.
name
)
{
handler
.set_endpoint_health_check_notifier
(
notifier
)
?
;
handler
.set_endpoint_health_check_notifier
(
notifier
)
?
;
}
}
}
}
...
@@ -157,9 +158,9 @@ impl EndpointConfigBuilder {
...
@@ -157,9 +158,9 @@ impl EndpointConfigBuilder {
};
};
// Create clones for the async closure
// Create clones for the async closure
let
namespace_name_for_task
=
namespace
_name
.clone
();
let
namespace_name_for_task
=
endpoint_id
.
namespace
.clone
();
let
component_name_for_task
=
component
_name
.clone
();
let
component_name_for_task
=
endpoint_id
.
component
.clone
();
let
endpoint_name_for_task
=
endpoint_name
.clone
();
let
endpoint_name_for_task
=
endpoint_
id
.
name
.clone
();
// Get the unified request plane server (works for all transport types)
// Get the unified request plane server (works for all transport types)
let
server
=
endpoint
.drt
()
.request_plane_server
()
.await
?
;
let
server
=
endpoint
.drt
()
.request_plane_server
()
.await
?
;
...
@@ -222,19 +223,18 @@ impl EndpointConfigBuilder {
...
@@ -222,19 +223,18 @@ impl EndpointConfigBuilder {
let
discovery
=
endpoint
.drt
()
.discovery
();
let
discovery
=
endpoint
.drt
()
.discovery
();
// Build transport for discovery service based on request plane mode
// Build transport for discovery service based on request plane mode
let
transport
=
build_transport_type
(
request_plane_mode
,
&
endpoint_
name
,
&
subject
);
let
transport
=
build_transport_type
(
request_plane_mode
,
&
endpoint_
id
,
connection_id
);
let
discovery_spec
=
crate
::
discovery
::
DiscoverySpec
::
Endpoint
{
let
discovery_spec
=
crate
::
discovery
::
DiscoverySpec
::
Endpoint
{
namespace
:
namespace
_name
.clone
(),
namespace
:
endpoint_id
.
namespace
.clone
(),
component
:
component
_name
.clone
(),
component
:
endpoint_id
.
component
.clone
(),
endpoint
:
endpoint_name
.clone
(),
endpoint
:
endpoint_
id
.
name
.clone
(),
transport
,
transport
,
};
};
if
let
Err
(
e
)
=
discovery
.register
(
discovery_spec
)
.await
{
if
let
Err
(
e
)
=
discovery
.register
(
discovery_spec
)
.await
{
tracing
::
error!
(
tracing
::
error!
(
component_name
,
%
endpoint_id
,
endpoint_name
,
error
=
%
e
,
error
=
%
e
,
"Unable to register service for discovery"
"Unable to register service for discovery"
);
);
...
@@ -259,8 +259,8 @@ impl EndpointConfigBuilder {
...
@@ -259,8 +259,8 @@ impl EndpointConfigBuilder {
/// - NATS: Uses subject-based addressing (unique per endpoint)
/// - NATS: Uses subject-based addressing (unique per endpoint)
fn
build_transport_type
(
fn
build_transport_type
(
mode
:
RequestPlaneMode
,
mode
:
RequestPlaneMode
,
endpoint_
name
:
&
str
,
endpoint_
id
:
&
EndpointId
,
subject
:
&
str
,
connection_id
:
u64
,
)
->
TransportType
{
)
->
TransportType
{
match
mode
{
match
mode
{
RequestPlaneMode
::
Http
=>
{
RequestPlaneMode
::
Http
=>
{
...
@@ -273,8 +273,8 @@ fn build_transport_type(
...
@@ -273,8 +273,8 @@ fn build_transport_type(
std
::
env
::
var
(
"DYN_HTTP_RPC_ROOT_PATH"
)
.unwrap_or_else
(|
_
|
"/v1/rpc"
.to_string
());
std
::
env
::
var
(
"DYN_HTTP_RPC_ROOT_PATH"
)
.unwrap_or_else
(|
_
|
"/v1/rpc"
.to_string
());
let
http_endpoint
=
format!
(
let
http_endpoint
=
format!
(
"http://{
}:{}{
}/{}"
,
"http://{
http_host}:{http_port}{rpc_root
}/{}"
,
http_host
,
http_port
,
rpc_root
,
endpoint_name
endpoint_
id
.
name
);
);
TransportType
::
Http
(
http_endpoint
)
TransportType
::
Http
(
http_endpoint
)
...
@@ -288,10 +288,12 @@ fn build_transport_type(
...
@@ -288,10 +288,12 @@ fn build_transport_type(
// Include endpoint name for proper TCP routing
// Include endpoint name for proper TCP routing
// TCP client parses this format and adds x-endpoint-path header for server-side routing
// TCP client parses this format and adds x-endpoint-path header for server-side routing
let
tcp_endpoint
=
format!
(
"{}:{}/{}"
,
tcp_host
,
tcp_port
,
endpoint_name
);
let
tcp_endpoint
=
format!
(
"{}:{}/{}"
,
tcp_host
,
tcp_port
,
endpoint_
id
.
name
);
TransportType
::
Tcp
(
tcp_endpoint
)
TransportType
::
Tcp
(
tcp_endpoint
)
}
}
RequestPlaneMode
::
Nats
=>
TransportType
::
Nats
(
subject
.to_string
()),
RequestPlaneMode
::
Nats
=>
{
TransportType
::
Nats
(
nats
::
instance_subject
(
endpoint_id
,
connection_id
))
}
}
}
}
}
lib/runtime/src/pipeline/network/egress/push_router.rs
View file @
51c1b9f1
...
@@ -147,8 +147,8 @@ where
...
@@ -147,8 +147,8 @@ where
let
count
=
instance_ids
.len
();
let
count
=
instance_ids
.len
();
if
count
==
0
{
if
count
==
0
{
return
Err
(
anyhow
::
anyhow!
(
return
Err
(
anyhow
::
anyhow!
(
"no instances found for endpoint {
:?
}"
,
"no instances found for endpoint {}"
,
self
.client.endpoint
.
etcd_root
()
self
.client.endpoint
.
id
()
));
));
}
}
instance_ids
[
counter
%
count
]
instance_ids
[
counter
%
count
]
...
@@ -166,8 +166,8 @@ where
...
@@ -166,8 +166,8 @@ where
let
count
=
instance_ids
.len
();
let
count
=
instance_ids
.len
();
if
count
==
0
{
if
count
==
0
{
return
Err
(
anyhow
::
anyhow!
(
return
Err
(
anyhow
::
anyhow!
(
"no instances found for endpoint {
:?
}"
,
"no instances found for endpoint {}"
,
self
.client.endpoint
.
etcd_root
()
self
.client.endpoint
.
id
()
));
));
}
}
let
counter
=
rand
::
rng
()
.random
::
<
u64
>
()
as
usize
;
let
counter
=
rand
::
rng
()
.random
::
<
u64
>
()
as
usize
;
...
@@ -189,8 +189,8 @@ where
...
@@ -189,8 +189,8 @@ where
if
!
found
{
if
!
found
{
return
Err
(
anyhow
::
anyhow!
(
return
Err
(
anyhow
::
anyhow!
(
"instance_id={instance_id} not found for endpoint {
:?
}"
,
"instance_id={instance_id} not found for endpoint {}"
,
self
.client.endpoint
.
etcd_root
()
self
.client.endpoint
.
id
()
));
));
}
}
...
...
lib/runtime/src/protocols.rs
View file @
51c1b9f1
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: Apache-2.0
use
s
erde
::{
Deserialize
,
Serialize
}
;
use
s
td
::
fmt
;
use
std
::
str
::
FromStr
;
use
std
::
str
::
FromStr
;
use
serde
::{
Deserialize
,
Serialize
};
pub
mod
annotated
;
pub
mod
annotated
;
pub
mod
maybe_error
;
pub
mod
maybe_error
;
...
@@ -36,13 +38,19 @@ pub struct Component {
...
@@ -36,13 +38,19 @@ pub struct Component {
///
///
/// Example format: `"namespace/component/endpoint"`
/// Example format: `"namespace/component/endpoint"`
///
///
#[derive(Debug,
Clone,
Serialize,
Deserialize,
Eq,
PartialEq)]
#[derive(Debug,
Clone,
Serialize,
Deserialize,
Eq,
PartialEq
,
Hash
)]
pub
struct
EndpointId
{
pub
struct
EndpointId
{
pub
namespace
:
String
,
pub
namespace
:
String
,
pub
component
:
String
,
pub
component
:
String
,
pub
name
:
String
,
pub
name
:
String
,
}
}
impl
fmt
::
Display
for
EndpointId
{
fn
fmt
(
&
self
,
f
:
&
mut
fmt
::
Formatter
<
'_
>
)
->
fmt
::
Result
{
write!
(
f
,
"{}/{}/{}"
,
self
.namespace
,
self
.component
,
self
.name
)
}
}
impl
PartialEq
<
Vec
<&
str
>>
for
EndpointId
{
impl
PartialEq
<
Vec
<&
str
>>
for
EndpointId
{
fn
eq
(
&
self
,
other
:
&
Vec
<&
str
>
)
->
bool
{
fn
eq
(
&
self
,
other
:
&
Vec
<&
str
>
)
->
bool
{
if
other
.len
()
!=
3
{
if
other
.len
()
!=
3
{
...
...
lib/runtime/src/transports/etcd.rs
View file @
51c1b9f1
...
@@ -25,12 +25,10 @@ use tokio_util::sync::CancellationToken;
...
@@ -25,12 +25,10 @@ use tokio_util::sync::CancellationToken;
mod
connector
;
mod
connector
;
mod
lease
;
mod
lease
;
mod
lock
;
mod
lock
;
mod
path
;
use
connector
::
Connector
;
use
connector
::
Connector
;
use
lease
::
*
;
use
lease
::
*
;
pub
use
lock
::
*
;
pub
use
lock
::
*
;
pub
use
path
::
*
;
use
super
::
utils
::
build_in_runtime
;
use
super
::
utils
::
build_in_runtime
;
use
crate
::
config
::
environment_names
::
etcd
as
env_etcd
;
use
crate
::
config
::
environment_names
::
etcd
as
env_etcd
;
...
...
lib/runtime/src/transports/etcd/path.rs
deleted
100644 → 0
View file @
4babb33c
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! EtcdPath - Parsing and validation for hierarchical etcd paths
use
once_cell
::
sync
::
Lazy
;
use
std
::
str
::
FromStr
;
use
validator
::
ValidationError
;
/// The root etcd path prefix
pub
const
ETCD_ROOT_PATH
:
&
str
=
"v1/dynamo/"
;
/// Reserved keyword for component paths (with underscores to prevent user conflicts)
pub
const
COMPONENT_KEYWORD
:
&
str
=
"_component_"
;
/// Reserved keyword for endpoint paths (with underscores to prevent user conflicts)
pub
const
ENDPOINT_KEYWORD
:
&
str
=
"_endpoint_"
;
static
ALLOWED_CHARS_REGEX
:
Lazy
<
regex
::
Regex
>
=
Lazy
::
new
(||
regex
::
Regex
::
new
(
r"^[a-z0-9-_]+$"
)
.unwrap
());
// TODO(ryan): this was an initial implementation that inspired the DEP; we'll keep it asis for now
// and update this impl with respect to the DEP.
//
// Notes:
// - follow up on this comment: https://github.com/ai-dynamo/dynamo/pull/1459#discussion_r2140616397
// - we will be decoupling the "identifer" from the "extra path" bits as two separate objects
// - this issue above is a problem, but will be solved by the DEP
/// Represents a parsed etcd path with hierarchical namespaces, components, endpoints, and extra paths
#[derive(Debug,
Clone,
PartialEq,
Eq)]
pub
struct
EtcdPath
{
/// The hierarchical namespace (e.g., "ns1.ns2.ns3")
pub
namespace
:
String
,
/// Optional component name
pub
component
:
Option
<
String
>
,
/// Optional endpoint name (requires component to be present)
pub
endpoint
:
Option
<
String
>
,
/// Optional lease ID (only valid with endpoint, in hexadecimal format)
pub
lease_id
:
Option
<
i64
>
,
/// Optional additional path segments beyond the standard structure
pub
extra_path
:
Option
<
Vec
<
String
>>
,
}
/// Errors that can occur during etcd path parsing
#[derive(Debug,
thiserror::Error)]
pub
enum
EtcdPathError
{
#[error(
"Path must start with '{}'"
,
ETCD_ROOT_PATH)]
InvalidPrefix
,
#[error(
"Invalid namespace: {0}"
)]
InvalidNamespace
(
String
),
#[error(
"Invalid component name: {0}"
)]
InvalidComponent
(
String
),
#[error(
"Invalid endpoint name: {0}"
)]
InvalidEndpoint
(
String
),
#[error(
"Invalid extra path segment: {0}"
)]
InvalidExtraPath
(
String
),
#[error(
"Endpoint requires component to be present"
)]
EndpointWithoutComponent
,
#[error(
"Expected '{}' keyword after namespace"
,
COMPONENT_KEYWORD)]
ExpectedComponentKeyword
,
#[error(
"Expected '{}' keyword after component"
,
ENDPOINT_KEYWORD)]
ExpectedEndpointKeyword
,
#[error(
"Reserved keyword '{0}' cannot be used in extra path"
)]
ReservedKeyword
(
String
),
#[error(
"Empty namespace not allowed"
)]
EmptyNamespace
,
#[error(
"Empty component name not allowed"
)]
EmptyComponent
,
#[error(
"Empty endpoint name not allowed"
)]
EmptyEndpoint
,
}
impl
EtcdPath
{
/// Create a new EtcdPath with just a namespace
pub
fn
new_namespace
(
namespace
:
&
str
)
->
Result
<
Self
,
EtcdPathError
>
{
validate_namespace
(
namespace
)
?
;
Ok
(
Self
{
namespace
:
namespace
.to_string
(),
component
:
None
,
endpoint
:
None
,
lease_id
:
None
,
extra_path
:
None
,
})
}
/// Create a new EtcdPath with namespace and component
pub
fn
new_component
(
namespace
:
&
str
,
component
:
&
str
)
->
Result
<
Self
,
EtcdPathError
>
{
validate_namespace
(
namespace
)
?
;
validate_component
(
component
)
?
;
Ok
(
Self
{
namespace
:
namespace
.to_string
(),
component
:
Some
(
component
.to_string
()),
endpoint
:
None
,
lease_id
:
None
,
extra_path
:
None
,
})
}
/// Create a new EtcdPath with namespace, component, and endpoint
pub
fn
new_endpoint
(
namespace
:
&
str
,
component
:
&
str
,
endpoint
:
&
str
,
)
->
Result
<
Self
,
EtcdPathError
>
{
validate_namespace
(
namespace
)
?
;
validate_component
(
component
)
?
;
validate_endpoint
(
endpoint
)
?
;
Ok
(
Self
{
namespace
:
namespace
.to_string
(),
component
:
Some
(
component
.to_string
()),
endpoint
:
Some
(
endpoint
.to_string
()),
lease_id
:
None
,
extra_path
:
None
,
})
}
/// Create a new EtcdPath for an endpoint with lease ID
pub
fn
new_endpoint_with_lease
(
namespace
:
&
str
,
component
:
&
str
,
endpoint
:
&
str
,
lease_id
:
i64
,
)
->
Result
<
Self
,
EtcdPathError
>
{
validate_namespace
(
namespace
)
?
;
validate_component
(
component
)
?
;
validate_endpoint
(
endpoint
)
?
;
Ok
(
Self
{
namespace
:
namespace
.to_string
(),
component
:
Some
(
component
.to_string
()),
endpoint
:
Some
(
endpoint
.to_string
()),
lease_id
:
Some
(
lease_id
),
extra_path
:
None
,
})
}
/// Add extra path segments to this EtcdPath
pub
fn
with_extra_path
(
mut
self
,
extra_path
:
Vec
<
String
>
)
->
Result
<
Self
,
EtcdPathError
>
{
for
segment
in
&
extra_path
{
validate_extra_path_segment
(
segment
)
?
;
}
self
.extra_path
=
if
extra_path
.is_empty
()
{
None
}
else
{
Some
(
extra_path
)
};
self
.lease_id
=
None
;
Ok
(
self
)
}
/// Internal method to convert the EtcdPath back to a string representation
fn
_
to_string
(
&
self
)
->
String
{
let
mut
path
=
format!
(
"{}{}"
,
ETCD_ROOT_PATH
,
self
.namespace
);
if
let
Some
(
ref
component
)
=
self
.component
{
path
.push
(
'/'
);
path
.push_str
(
COMPONENT_KEYWORD
);
path
.push
(
'/'
);
path
.push_str
(
component
);
if
let
Some
(
ref
endpoint
)
=
self
.endpoint
{
path
.push
(
'/'
);
path
.push_str
(
ENDPOINT_KEYWORD
);
path
.push
(
'/'
);
path
.push_str
(
endpoint
);
// Add lease ID if present
if
let
Some
(
lease_id
)
=
self
.lease_id
{
path
.push
(
':'
);
path
.push_str
(
&
format!
(
"{:x}"
,
lease_id
));
}
}
}
if
let
Some
(
ref
extra_path
)
=
self
.extra_path
{
for
segment
in
extra_path
{
path
.push
(
'/'
);
path
.push_str
(
segment
);
}
}
path
}
/// Parse an etcd path string into its components
pub
fn
parse
(
input
:
&
str
)
->
Result
<
Self
,
EtcdPathError
>
{
// Check for required prefix
if
!
input
.starts_with
(
ETCD_ROOT_PATH
)
{
return
Err
(
EtcdPathError
::
InvalidPrefix
);
}
// Remove the prefix and split into segments
let
path_without_prefix
=
&
input
[
ETCD_ROOT_PATH
.len
()
..
];
let
segments
:
Vec
<&
str
>
=
path_without_prefix
.split
(
'/'
)
.collect
();
if
segments
.is_empty
()
||
segments
[
0
]
.is_empty
()
{
return
Err
(
EtcdPathError
::
EmptyNamespace
);
}
// First segment is always the namespace
let
namespace
=
segments
[
0
]
.to_string
();
validate_namespace
(
&
namespace
)
?
;
let
mut
etcd_path
=
Self
{
namespace
,
component
:
None
,
endpoint
:
None
,
lease_id
:
None
,
extra_path
:
None
,
};
// Parse remaining segments
let
mut
i
=
1
;
while
i
<
segments
.len
()
{
match
segments
[
i
]
{
COMPONENT_KEYWORD
=>
{
if
i
+
1
>=
segments
.len
()
{
return
Err
(
EtcdPathError
::
EmptyComponent
);
}
let
component_name
=
segments
[
i
+
1
]
.to_string
();
validate_component
(
&
component_name
)
?
;
etcd_path
.component
=
Some
(
component_name
);
i
+=
2
;
}
ENDPOINT_KEYWORD
=>
{
if
etcd_path
.component
.is_none
()
{
return
Err
(
EtcdPathError
::
EndpointWithoutComponent
);
}
if
i
+
1
>=
segments
.len
()
{
return
Err
(
EtcdPathError
::
EmptyEndpoint
);
}
let
endpoint_segment
=
segments
[
i
+
1
];
// Check if endpoint has a lease ID suffix (:lease_id)
if
let
Some
(
colon_pos
)
=
endpoint_segment
.find
(
':'
)
{
let
endpoint_name
=
endpoint_segment
[
..
colon_pos
]
.to_string
();
let
lease_id_str
=
&
endpoint_segment
[
colon_pos
+
1
..
];
validate_endpoint
(
&
endpoint_name
)
?
;
// Parse lease ID as hexadecimal
let
lease_id
=
i64
::
from_str_radix
(
lease_id_str
,
16
)
.map_err
(|
_
|
{
EtcdPathError
::
InvalidEndpoint
(
format!
(
"Invalid lease ID format: {}"
,
lease_id_str
))
})
?
;
etcd_path
.endpoint
=
Some
(
endpoint_name
);
etcd_path
.lease_id
=
Some
(
lease_id
);
}
else
{
let
endpoint_name
=
endpoint_segment
.to_string
();
validate_endpoint
(
&
endpoint_name
)
?
;
etcd_path
.endpoint
=
Some
(
endpoint_name
);
}
i
+=
2
;
}
_
=>
{
// This is an extra path segment
let
mut
extra_path
=
Vec
::
new
();
while
i
<
segments
.len
()
{
validate_extra_path_segment
(
segments
[
i
])
?
;
extra_path
.push
(
segments
[
i
]
.to_string
());
i
+=
1
;
}
etcd_path
.extra_path
=
if
extra_path
.is_empty
()
{
None
}
else
{
Some
(
extra_path
)
};
break
;
}
}
}
Ok
(
etcd_path
)
}
}
impl
FromStr
for
EtcdPath
{
type
Err
=
EtcdPathError
;
fn
from_str
(
s
:
&
str
)
->
Result
<
Self
,
Self
::
Err
>
{
Self
::
parse
(
s
)
}
}
impl
EtcdPath
{
/// Try to create an EtcdPath from a String
pub
fn
from_string
(
s
:
String
)
->
Result
<
Self
,
EtcdPathError
>
{
Self
::
parse
(
&
s
)
}
}
impl
std
::
fmt
::
Display
for
EtcdPath
{
fn
fmt
(
&
self
,
f
:
&
mut
std
::
fmt
::
Formatter
<
'_
>
)
->
std
::
fmt
::
Result
{
write!
(
f
,
"{}"
,
self
._to_string
())
}
}
/// Validate namespace using the existing validation function
fn
validate_namespace
(
namespace
:
&
str
)
->
Result
<
(),
EtcdPathError
>
{
if
namespace
.is_empty
()
{
return
Err
(
EtcdPathError
::
EmptyNamespace
);
}
// Split by dots and validate each part
for
part
in
namespace
.split
(
'.'
)
{
if
part
.is_empty
()
{
return
Err
(
EtcdPathError
::
InvalidNamespace
(
format!
(
"Empty namespace segment in '{}'"
,
namespace
)));
}
validate_allowed_chars
(
part
)
.map_err
(|
_
|
{
EtcdPathError
::
InvalidNamespace
(
format!
(
"Invalid characters in '{}'"
,
part
))
})
?
;
}
Ok
(())
}
/// Validate component name
fn
validate_component
(
component
:
&
str
)
->
Result
<
(),
EtcdPathError
>
{
if
component
.is_empty
()
{
return
Err
(
EtcdPathError
::
EmptyComponent
);
}
validate_allowed_chars
(
component
)
.map_err
(|
_
|
EtcdPathError
::
InvalidComponent
(
component
.to_string
()))
}
/// Validate endpoint name
fn
validate_endpoint
(
endpoint
:
&
str
)
->
Result
<
(),
EtcdPathError
>
{
if
endpoint
.is_empty
()
{
return
Err
(
EtcdPathError
::
EmptyEndpoint
);
}
validate_allowed_chars
(
endpoint
)
.map_err
(|
_
|
EtcdPathError
::
InvalidEndpoint
(
endpoint
.to_string
()))
}
/// Validate extra path segment
fn
validate_extra_path_segment
(
segment
:
&
str
)
->
Result
<
(),
EtcdPathError
>
{
if
segment
.is_empty
()
{
return
Err
(
EtcdPathError
::
InvalidExtraPath
(
"Empty path segment"
.to_string
(),
));
}
// Check for reserved keywords
if
segment
==
COMPONENT_KEYWORD
{
return
Err
(
EtcdPathError
::
ReservedKeyword
(
segment
.to_string
()));
}
if
segment
==
ENDPOINT_KEYWORD
{
return
Err
(
EtcdPathError
::
ReservedKeyword
(
segment
.to_string
()));
}
validate_allowed_chars
(
segment
)
.map_err
(|
_
|
EtcdPathError
::
InvalidExtraPath
(
segment
.to_string
()))
}
/// Custom validator function (same as in component.rs)
fn
validate_allowed_chars
(
input
:
&
str
)
->
Result
<
(),
ValidationError
>
{
if
ALLOWED_CHARS_REGEX
.is_match
(
input
)
{
Ok
(())
}
else
{
Err
(
ValidationError
::
new
(
"invalid_characters"
))
}
}
#[cfg(test)]
mod
tests
{
use
super
::
*
;
#[test]
fn
test_namespace_and_component
()
{
let
s
=
format!
(
"{ETCD_ROOT_PATH}ns1.ns2/_component_/my-component"
);
let
path
=
EtcdPath
::
parse
(
&
s
)
.unwrap
();
assert_eq!
(
path
.namespace
,
"ns1.ns2"
);
assert_eq!
(
path
.component
,
Some
(
"my-component"
.to_string
()));
assert_eq!
(
path
.endpoint
,
None
);
assert_eq!
(
path
.extra_path
,
None
);
assert_eq!
(
path
.to_string
(),
s
);
}
#[test]
fn
test_full_path_with_endpoint
()
{
let
s
=
format!
(
"{ETCD_ROOT_PATH}ns1.ns2.ns3/_component_/component-name/_endpoint_/endpoint-name"
);
let
path
=
EtcdPath
::
parse
(
&
s
)
.unwrap
();
assert_eq!
(
path
.namespace
,
"ns1.ns2.ns3"
);
assert_eq!
(
path
.component
,
Some
(
"component-name"
.to_string
()));
assert_eq!
(
path
.endpoint
,
Some
(
"endpoint-name"
.to_string
()));
assert_eq!
(
path
.extra_path
,
None
);
assert_eq!
(
path
.to_string
(),
s
);
}
#[test]
fn
test_invalid_prefix
()
{
let
result
=
EtcdPath
::
parse
(
"invalid://ns1"
);
assert
!
(
matches!
(
result
,
Err
(
EtcdPathError
::
InvalidPrefix
)));
}
#[test]
fn
test_invalid_characters
()
{
let
result
=
EtcdPath
::
parse
(
&
format!
(
"{ETCD_ROOT_PATH}ns1!/_component_/comp1"
));
assert
!
(
matches!
(
result
,
Err
(
EtcdPathError
::
InvalidNamespace
(
_
))));
}
#[test]
fn
test_constructor_methods
()
{
let
path
=
EtcdPath
::
new_namespace
(
"ns1.ns2.ns3"
)
.unwrap
();
assert_eq!
(
path
.to_string
(),
format!
(
"{ETCD_ROOT_PATH}ns1.ns2.ns3"
));
let
path
=
EtcdPath
::
new_component
(
"ns1.ns2"
,
"comp1"
)
.unwrap
();
assert_eq!
(
path
.to_string
(),
format!
(
"{ETCD_ROOT_PATH}ns1.ns2/_component_/comp1"
)
);
let
path
=
EtcdPath
::
new_endpoint
(
"ns1"
,
"comp1"
,
"ep1"
)
.unwrap
();
assert_eq!
(
path
.to_string
(),
format!
(
"{ETCD_ROOT_PATH}ns1/_component_/comp1/_endpoint_/ep1"
)
);
}
#[test]
fn
test_with_extra_path_method
()
{
let
path
=
EtcdPath
::
new_component
(
"ns1"
,
"comp1"
)
.unwrap
()
.with_extra_path
(
vec!
[
"path1"
.to_string
(),
"path2"
.to_string
()])
.unwrap
();
assert_eq!
(
path
.to_string
(),
format!
(
"{ETCD_ROOT_PATH}ns1/_component_/comp1/path1/path2"
)
);
}
#[test]
fn
test_endpoint_with_lease_id
()
{
// Test creating endpoint with lease ID
let
path
=
EtcdPath
::
new_endpoint_with_lease
(
"ns1"
,
"comp1"
,
"ep1"
,
0xabc123
)
.unwrap
();
assert_eq!
(
path
.namespace
,
"ns1"
);
assert_eq!
(
path
.component
,
Some
(
"comp1"
.to_string
()));
assert_eq!
(
path
.endpoint
,
Some
(
"ep1"
.to_string
()));
assert_eq!
(
path
.lease_id
,
Some
(
0xabc123
));
assert_eq!
(
path
.to_string
(),
format!
(
"{ETCD_ROOT_PATH}ns1/_component_/comp1/_endpoint_/ep1:abc123"
)
);
}
#[test]
fn
test_parse_endpoint_with_lease_id
()
{
// Test parsing endpoint with lease ID
let
path
=
EtcdPath
::
parse
(
&
format!
(
"{ETCD_ROOT_PATH}ns1/_component_/comp1/_endpoint_/ep1:abc123"
))
.unwrap
();
assert_eq!
(
path
.namespace
,
"ns1"
);
assert_eq!
(
path
.component
,
Some
(
"comp1"
.to_string
()));
assert_eq!
(
path
.endpoint
,
Some
(
"ep1"
.to_string
()));
assert_eq!
(
path
.lease_id
,
Some
(
0xabc123
));
assert_eq!
(
path
.extra_path
,
None
);
}
#[test]
fn
test_parse_endpoint_without_lease_id
()
{
// Test that endpoints without lease ID still work
let
path
=
EtcdPath
::
parse
(
&
format!
(
"{ETCD_ROOT_PATH}ns1/_component_/comp1/_endpoint_/ep1"
))
.unwrap
();
assert_eq!
(
path
.namespace
,
"ns1"
);
assert_eq!
(
path
.component
,
Some
(
"comp1"
.to_string
()));
assert_eq!
(
path
.endpoint
,
Some
(
"ep1"
.to_string
()));
assert_eq!
(
path
.lease_id
,
None
);
assert_eq!
(
path
.extra_path
,
None
);
}
#[test]
fn
test_invalid_lease_id_format
()
{
// Test invalid lease ID format
let
result
=
EtcdPath
::
parse
(
&
format!
(
"{ETCD_ROOT_PATH}ns1/_component_/comp1/_endpoint_/ep1:invalid"
));
assert
!
(
matches!
(
result
,
Err
(
EtcdPathError
::
InvalidEndpoint
(
_
))));
}
#[test]
fn
test_lease_id_round_trip
()
{
// Test round-trip: create -> to_string -> parse -> verify
let
original_path
=
EtcdPath
::
new_endpoint_with_lease
(
"production"
,
"api-gateway"
,
"http"
,
0xdeadbeef
)
.unwrap
();
// Convert to string
let
path_string
=
original_path
.to_string
();
assert_eq!
(
path_string
,
format!
(
"{ETCD_ROOT_PATH}production/_component_/api-gateway/_endpoint_/http:deadbeef"
)
);
// Parse back from string
let
parsed_path
=
EtcdPath
::
parse
(
&
path_string
)
.unwrap
();
// Verify all fields match
assert_eq!
(
parsed_path
.namespace
,
"production"
);
assert_eq!
(
parsed_path
.component
,
Some
(
"api-gateway"
.to_string
()));
assert_eq!
(
parsed_path
.endpoint
,
Some
(
"http"
.to_string
()));
assert_eq!
(
parsed_path
.lease_id
,
Some
(
0xdeadbeef
));
assert_eq!
(
parsed_path
.extra_path
,
None
);
// Verify the parsed path equals the original
assert_eq!
(
parsed_path
,
original_path
);
}
#[test]
fn
test_lease_id_edge_cases
()
{
// Test with lease ID 0
let
path
=
EtcdPath
::
new_endpoint_with_lease
(
"ns"
,
"comp"
,
"ep"
,
0
)
.unwrap
();
assert_eq!
(
path
.to_string
(),
format!
(
"{ETCD_ROOT_PATH}ns/_component_/comp/_endpoint_/ep:0"
)
);
// Test with maximum i64 value
let
path
=
EtcdPath
::
new_endpoint_with_lease
(
"ns"
,
"comp"
,
"ep"
,
i64
::
MAX
)
.unwrap
();
assert_eq!
(
path
.to_string
(),
format!
(
"{ETCD_ROOT_PATH}ns/_component_/comp/_endpoint_/ep:7fffffffffffffff"
)
);
// Test parsing maximum value
let
parsed
=
EtcdPath
::
parse
(
&
format!
(
"{ETCD_ROOT_PATH}ns/_component_/comp/_endpoint_/ep:7fffffffffffffff"
))
.unwrap
();
assert_eq!
(
parsed
.lease_id
,
Some
(
i64
::
MAX
));
}
}
lib/runtime/src/transports/nats.rs
View file @
51c1b9f1
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
//!
//!
//! Note: `NATS_AUTH_USERNAME` and `NATS_AUTH_PASSWORD` must be used together.
//! Note: `NATS_AUTH_USERNAME` and `NATS_AUTH_PASSWORD` must be used together.
use
crate
::
metrics
::
MetricsHierarchy
;
use
crate
::
metrics
::
MetricsHierarchy
;
use
crate
::
protocols
::
EndpointId
;
use
crate
::
traits
::
events
::
EventPublisher
;
use
crate
::
traits
::
events
::
EventPublisher
;
use
anyhow
::
Result
;
use
anyhow
::
Result
;
...
@@ -990,6 +991,15 @@ impl DRTNatsClientPrometheusMetrics {
...
@@ -990,6 +991,15 @@ impl DRTNatsClientPrometheusMetrics {
}
}
}
}
/// The NATS subject / inbox to talk to an instance on.
/// TODO: Do we need to sanitize the names?
pub
(
crate
)
fn
instance_subject
(
endpoint_id
:
&
EndpointId
,
instance_id
:
u64
)
->
String
{
format!
(
"{}_{}.{}-{:x}"
,
endpoint_id
.namespace
,
endpoint_id
.component
,
endpoint_id
.name
,
instance_id
,
)
}
#[cfg(test)]
#[cfg(test)]
mod
tests
{
mod
tests
{
...
...
lib/runtime/tests/namespace_etcd_path.rs
deleted
100644 → 0
View file @
4babb33c
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Test file for recursive namespace etcd_path functionality
#[allow(unused_imports)]
use
dynamo_runtime
::{
DistributedRuntime
,
Runtime
};
#[cfg(feature
=
"integration"
)]
#[test]
fn
test_namespace_etcd_path_format
()
{
// Test that the etcd_path format is correct for the expected use case
// This test verifies the format: dynamo://ns1.ns2.ns3/component/{component.name()}
// Expected format examples:
let
single_ns_path
=
"dynamo://ns1"
;
let
nested_ns_path
=
"dynamo://ns1.ns2.ns3"
;
let
component_path
=
"dynamo://ns1.ns2.ns3/_component_/my-component"
;
// Verify the format matches our requirements
assert
!
(
single_ns_path
.starts_with
(
"dynamo://"
));
assert
!
(
nested_ns_path
.starts_with
(
"dynamo://"
));
assert
!
(
nested_ns_path
.contains
(
"."
));
assert
!
(
component_path
.contains
(
"/_component_/"
));
// Test the specific format requested in the user query (now with reserved keywords)
let
expected_format
=
"dynamo://ns1.ns2.ns3/_component_/my-component"
;
assert_eq!
(
component_path
,
expected_format
);
println!
(
"✅ Namespace etcd_path format verification passed"
);
println!
(
" Single namespace: {}"
,
single_ns_path
);
println!
(
" Nested namespace: {}"
,
nested_ns_path
);
println!
(
" Component path: {}"
,
component_path
);
}
#[cfg(feature
=
"integration"
)]
#[tokio::test]
async
fn
test_recursive_namespace_implementation
()
{
use
dynamo_runtime
::{
distributed
::
DistributedConfig
,
storage
::
key_value_store
::
KeyValueStoreSelect
,
transports
::
nats
,
};
let
runtime
=
Runtime
::
from_current
()
.unwrap
();
let
config
=
DistributedConfig
{
store_backend
:
KeyValueStoreSelect
::
Memory
,
nats_config
:
Some
(
nats
::
ClientOptions
::
default
()),
request_plane
:
dynamo_runtime
::
distributed
::
RequestPlaneMode
::
default
(),
};
let
distributed_runtime
=
DistributedRuntime
::
new
(
runtime
,
config
)
.await
.unwrap
();
// Test single namespace
let
ns1
=
distributed_runtime
.namespace
(
"ns1"
)
.unwrap
();
assert_eq!
(
ns1
.etcd_path
(),
"dynamo://ns1"
);
assert_eq!
(
ns1
.name
(),
"ns1"
);
// Test nested namespace ns1.ns2
let
ns2
=
ns1
.namespace
(
"ns2"
)
.unwrap
();
assert_eq!
(
ns2
.etcd_path
(),
"dynamo://ns1.ns2"
);
assert_eq!
(
ns2
.name
(),
"ns1.ns2"
);
// Test deeply nested namespace ns1.ns2.ns3
let
ns3
=
ns2
.namespace
(
"ns3"
)
.unwrap
();
assert_eq!
(
ns3
.etcd_path
(),
"dynamo://ns1.ns2.ns3"
);
assert_eq!
(
ns3
.name
(),
"ns1.ns2.ns3"
);
// Test component in deeply nested namespace
let
component
=
ns3
.component
(
"my-component"
)
.unwrap
();
assert_eq!
(
component
.etcd_path
()
.to_string
(),
"dynamo://ns1.ns2.ns3/_component_/my-component"
);
assert_eq!
(
component
.name
(),
"my-component"
);
assert_eq!
(
component
.path
(),
"ns1.ns2.ns3/my-component"
);
println!
(
"✅ Actual recursive namespace implementation test passed!"
);
println!
(
" Root namespace: {}"
,
ns1
.etcd_path
());
println!
(
" Nested namespace: {}"
,
ns2
.etcd_path
());
println!
(
" Deep namespace: {}"
,
ns3
.etcd_path
());
println!
(
" Component path: {}"
,
component
.etcd_path
());
}
#[cfg(feature
=
"integration"
)]
#[tokio::test]
async
fn
test_multiple_branches_recursive_namespaces
()
{
use
dynamo_runtime
::{
distributed
::
DistributedConfig
,
storage
::
key_value_store
::
KeyValueStoreSelect
,
transports
::
nats
,
};
let
runtime
=
Runtime
::
from_current
()
.unwrap
();
let
config
=
DistributedConfig
{
store_backend
:
KeyValueStoreSelect
::
Memory
,
nats_config
:
Some
(
nats
::
ClientOptions
::
default
()),
request_plane
:
dynamo_runtime
::
distributed
::
RequestPlaneMode
::
default
(),
};
let
distributed_runtime
=
DistributedRuntime
::
new
(
runtime
,
config
)
.await
.unwrap
();
// Create root namespace
let
root
=
distributed_runtime
.namespace
(
"root"
)
.unwrap
();
// Create multiple branches
let
prod_ns
=
root
.namespace
(
"prod"
)
.unwrap
();
let
staging_ns
=
root
.namespace
(
"staging"
)
.unwrap
();
// Create services in each branch
let
prod_service_ns
=
prod_ns
.namespace
(
"services"
)
.unwrap
();
let
staging_service_ns
=
staging_ns
.namespace
(
"services"
)
.unwrap
();
// Verify the paths are correct
assert_eq!
(
prod_service_ns
.etcd_path
(),
"dynamo://root.prod.services"
);
assert_eq!
(
staging_service_ns
.etcd_path
(),
"dynamo://root.staging.services"
);
// Create components in each branch
let
prod_component
=
prod_service_ns
.component
(
"api-gateway"
)
.unwrap
();
let
staging_component
=
staging_service_ns
.component
(
"api-gateway"
)
.unwrap
();
assert_eq!
(
prod_component
.etcd_path
()
.to_string
(),
"dynamo://root.prod.services/_component_/api-gateway"
);
assert_eq!
(
staging_component
.etcd_path
()
.to_string
(),
"dynamo://root.staging.services/_component_/api-gateway"
);
// Verify they are different
assert_ne!
(
prod_component
.etcd_path
(),
staging_component
.etcd_path
());
println!
(
"✅ Multiple branches recursive namespaces test passed!"
);
println!
(
" Production: {}"
,
prod_component
.etcd_path
());
println!
(
" Staging: {}"
,
staging_component
.etcd_path
());
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment