Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
f05f7629
Unverified
Commit
f05f7629
authored
Nov 21, 2025
by
Graham King
Committed by
GitHub
Nov 21, 2025
Browse files
chore: Make nats_client private at crate level, various tidy up (#4513)
Signed-off-by:
Graham King
<
grahamk@nvidia.com
>
parent
27904535
Changes
22
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
73 additions
and
1137 deletions
+73
-1137
lib/llm/src/audit/bus.rs
lib/llm/src/audit/bus.rs
+5
-5
lib/llm/src/audit/config.rs
lib/llm/src/audit/config.rs
+4
-4
lib/llm/src/audit/sink.rs
lib/llm/src/audit/sink.rs
+14
-16
lib/llm/src/block_manager/controller.rs
lib/llm/src/block_manager/controller.rs
+1
-3
lib/llm/src/entrypoint/input.rs
lib/llm/src/entrypoint/input.rs
+1
-1
lib/llm/src/entrypoint/input/endpoint.rs
lib/llm/src/entrypoint/input/endpoint.rs
+1
-6
lib/llm/src/kv_router/sequence.rs
lib/llm/src/kv_router/sequence.rs
+2
-4
lib/llm/src/mocker/engine.rs
lib/llm/src/mocker/engine.rs
+1
-2
lib/llm/src/preprocessor.rs
lib/llm/src/preprocessor.rs
+1
-1
lib/llm/tests/audit_nats_integration.rs
lib/llm/tests/audit_nats_integration.rs
+2
-16
lib/llm/tests/block_manager.rs
lib/llm/tests/block_manager.rs
+0
-882
lib/runtime/examples/hello_world/src/bin/server.rs
lib/runtime/examples/hello_world/src/bin/server.rs
+1
-4
lib/runtime/examples/service_metrics/src/bin/service_client.rs
...untime/examples/service_metrics/src/bin/service_client.rs
+1
-8
lib/runtime/examples/service_metrics/src/bin/service_server.rs
...untime/examples/service_metrics/src/bin/service_server.rs
+1
-2
lib/runtime/examples/system_metrics/src/bin/system_client.rs
lib/runtime/examples/system_metrics/src/bin/system_client.rs
+1
-4
lib/runtime/examples/system_metrics/src/lib.rs
lib/runtime/examples/system_metrics/src/lib.rs
+1
-2
lib/runtime/src/component.rs
lib/runtime/src/component.rs
+27
-121
lib/runtime/src/component/endpoint.rs
lib/runtime/src/component/endpoint.rs
+4
-14
lib/runtime/src/distributed.rs
lib/runtime/src/distributed.rs
+3
-36
lib/runtime/src/metrics.rs
lib/runtime/src/metrics.rs
+2
-6
No files found.
lib/llm/src/audit/bus.rs
View file @
f05f7629
...
...
@@ -2,22 +2,22 @@
// SPDX-License-Identifier: Apache-2.0
use
super
::
handle
::
AuditRecord
;
use
std
::
sync
::
{
Arc
,
OnceLock
}
;
use
std
::
sync
::
OnceLock
;
use
tokio
::
sync
::
broadcast
;
static
BUS
:
OnceLock
<
broadcast
::
Sender
<
Arc
<
AuditRecord
>>
>
=
OnceLock
::
new
();
static
BUS
:
OnceLock
<
broadcast
::
Sender
<
AuditRecord
>>
=
OnceLock
::
new
();
pub
fn
init
(
capacity
:
usize
)
{
let
(
tx
,
_
rx
)
=
broadcast
::
channel
::
<
Arc
<
AuditRecord
>
>
(
capacity
);
let
(
tx
,
_
rx
)
=
broadcast
::
channel
::
<
AuditRecord
>
(
capacity
);
let
_
=
BUS
.set
(
tx
);
}
pub
fn
subscribe
()
->
broadcast
::
Receiver
<
Arc
<
AuditRecord
>
>
{
pub
fn
subscribe
()
->
broadcast
::
Receiver
<
AuditRecord
>
{
BUS
.get
()
.expect
(
"audit bus not initialized"
)
.subscribe
()
}
pub
fn
publish
(
rec
:
AuditRecord
)
{
if
let
Some
(
tx
)
=
BUS
.get
()
{
let
_
=
tx
.send
(
Arc
::
new
(
rec
)
)
;
let
_
=
tx
.send
(
rec
);
}
}
lib/llm/src/audit/config.rs
View file @
f05f7629
...
...
@@ -10,11 +10,11 @@ pub struct AuditPolicy {
static
POLICY
:
OnceLock
<
AuditPolicy
>
=
OnceLock
::
new
();
/// Audit is enabled if we have at least one sink
pub
fn
init_from_env
()
->
AuditPolicy
{
let
enabled
=
std
::
env
::
var
(
"DYN_AUDIT_ENABLED"
)
.map
(|
v
|
v
==
"1"
||
v
.eq_ignore_ascii_case
(
"true"
))
.unwrap_or
(
false
);
AuditPolicy
{
enabled
}
AuditPolicy
{
enabled
:
std
::
env
::
var
(
"DYN_AUDIT_SINKS"
)
.is_ok
(),
}
}
pub
fn
policy
()
->
AuditPolicy
{
...
...
lib/llm/src/audit/sink.rs
View file @
f05f7629
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use
anyhow
::
Context
as
_
;
use
async_nats
::
jetstream
;
use
async_trait
::
async_trait
;
use
dynamo_runtime
::
transports
::
nats
;
use
std
::
sync
::
Arc
;
use
tokio
::
sync
::
broadcast
;
...
...
@@ -36,7 +38,7 @@ pub struct NatsSink {
}
impl
NatsSink
{
pub
fn
new
(
nats_client
:
&
dynamo_runtime
::
transports
::
nats
::
Client
)
->
Self
{
pub
fn
new
(
nats_client
:
dynamo_runtime
::
transports
::
nats
::
Client
)
->
Self
{
let
subject
=
std
::
env
::
var
(
"DYN_AUDIT_NATS_SUBJECT"
)
.unwrap_or_else
(|
_
|
"dynamo.audit.v1"
.to_string
());
Self
{
...
...
@@ -64,37 +66,32 @@ impl AuditSink for NatsSink {
}
}
fn
parse_sinks_from_env
(
nats_client
:
Option
<&
dynamo_runtime
::
transports
::
nats
::
Client
>
,
)
->
Vec
<
Arc
<
dyn
AuditSink
>>
{
async
fn
parse_sinks_from_env
()
->
anyhow
::
Result
<
Vec
<
Arc
<
dyn
AuditSink
>>>
{
let
cfg
=
std
::
env
::
var
(
"DYN_AUDIT_SINKS"
)
.unwrap_or_else
(|
_
|
"stderr"
.into
());
let
mut
out
:
Vec
<
Arc
<
dyn
AuditSink
>>
=
Vec
::
new
();
for
name
in
cfg
.split
(
','
)
.map
(|
s
|
s
.trim
()
.to_lowercase
())
{
match
name
.as_str
()
{
"stderr"
|
""
=>
out
.push
(
Arc
::
new
(
StderrSink
)),
"nats"
=>
{
if
let
Some
(
client
)
=
nats_client
{
out
.push
(
Arc
::
new
(
NatsSink
::
new
(
client
)));
}
else
{
tracing
::
warn!
(
"NATS sink requested but no DistributedRuntime NATS client available; skipping"
);
}
let
nats_client
=
nats
::
ClientOptions
::
default
()
.connect
()
.await
.context
(
"Attempting to connect NATS sink from env var DYN_AUDIT_SINKS"
)
?
;
out
.push
(
Arc
::
new
(
NatsSink
::
new
(
nats_client
)));
}
// "pg" => out.push(Arc::new(PostgresSink::from_env())),
other
=>
tracing
::
warn!
(
%
other
,
"audit: unknown sink ignored"
),
}
}
out
Ok
(
out
)
}
/// spawn one worker per sink; each subscribes to the bus (off hot path)
pub
fn
spawn_workers_from_env
(
drt
:
&
dynamo_runtime
::
DistributedRuntime
)
{
let
nats_client
=
drt
.nats_client
();
let
sinks
=
parse_sinks_from_env
(
nats_client
);
pub
async
fn
spawn_workers_from_env
()
->
anyhow
::
Result
<
()
>
{
let
sinks
=
parse_sinks_from_env
()
.await
?
;
for
sink
in
sinks
{
let
name
=
sink
.name
();
let
mut
rx
:
broadcast
::
Receiver
<
Arc
<
AuditRecord
>
>
=
bus
::
subscribe
();
let
mut
rx
:
broadcast
::
Receiver
<
AuditRecord
>
=
bus
::
subscribe
();
tokio
::
spawn
(
async
move
{
loop
{
match
rx
.recv
()
.await
{
...
...
@@ -110,4 +107,5 @@ pub fn spawn_workers_from_env(drt: &dynamo_runtime::DistributedRuntime) {
});
}
tracing
::
info!
(
"Audit sinks ready."
);
Ok
(())
}
lib/llm/src/block_manager/controller.rs
View file @
f05f7629
...
...
@@ -39,10 +39,8 @@ pub struct Controller<Locality: LocalityProvider, Metadata: BlockMetadata> {
impl
<
Locality
:
LocalityProvider
,
Metadata
:
BlockMetadata
>
Controller
<
Locality
,
Metadata
>
{
pub
async
fn
new
(
block_manager
:
KvBlockManager
<
Locality
,
Metadata
>
,
mut
component
:
dynamo_runtime
::
component
::
Component
,
component
:
dynamo_runtime
::
component
::
Component
,
)
->
anyhow
::
Result
<
Self
>
{
component
.add_stats_service
()
.await
?
;
let
handler
=
ControllerHandler
::
new
(
block_manager
.clone
());
let
engine
=
Ingress
::
for_engine
(
handler
.clone
())
?
;
...
...
lib/llm/src/entrypoint/input.rs
View file @
f05f7629
...
...
@@ -117,7 +117,7 @@ pub async fn run_input(
.and_then
(|
v
|
v
.parse
()
.ok
())
.unwrap_or
(
1024
);
crate
::
audit
::
bus
::
init
(
cap
);
crate
::
audit
::
sink
::
spawn_workers_from_env
(
&
drt
)
;
crate
::
audit
::
sink
::
spawn_workers_from_env
(
)
.await
?
;
tracing
::
info!
(
cap
,
"Audit initialized"
);
}
...
...
lib/llm/src/entrypoint/input/endpoint.rs
View file @
f05f7629
...
...
@@ -32,14 +32,9 @@ pub async fn run(
let
cancel_token
=
distributed_runtime
.primary_token
()
.clone
();
let
endpoint_id
:
EndpointId
=
path
.parse
()
?
;
let
mut
component
=
distributed_runtime
let
component
=
distributed_runtime
.namespace
(
&
endpoint_id
.namespace
)
?
.component
(
&
endpoint_id
.component
)
?
;
// We can only make the NATS service if we have NATS
if
distributed_runtime
.nats_client
()
.is_some
()
{
component
.add_stats_service
()
.await
?
;
}
let
endpoint
=
component
.endpoint
(
&
endpoint_id
.name
);
let
rt_fut
:
Pin
<
Box
<
dyn
Future
<
Output
=
_
>
+
Send
+
'static
>>
=
match
engine_config
{
...
...
lib/llm/src/kv_router/sequence.rs
View file @
f05f7629
...
...
@@ -1002,8 +1002,7 @@ mod tests {
// Create namespace and shared component for both seq_managers
let
namespace
=
distributed
.namespace
(
"test_cross_instance_sync"
)
?
;
let
mut
component
=
namespace
.component
(
"sequences"
)
?
;
component
.add_stats_service
()
.await
?
;
let
component
=
namespace
.component
(
"sequences"
)
?
;
// Create multi-worker sequence managers with:
// - Worker 0 with dp_size=2 (dp_ranks 0 and 1)
...
...
@@ -1168,8 +1167,7 @@ mod tests {
// Create namespace and shared component for both seq_managers
let
namespace
=
distributed
.namespace
(
"test_no_token_seq_sync"
)
?
;
let
mut
component
=
namespace
.component
(
"sequences"
)
?
;
component
.add_stats_service
()
.await
?
;
let
component
=
namespace
.component
(
"sequences"
)
?
;
// Create multi-worker sequence managers with ALL workers [0, 1, 2]
// Both use the same component to ensure event synchronization works
...
...
lib/llm/src/mocker/engine.rs
View file @
f05f7629
...
...
@@ -463,8 +463,7 @@ mod integration_tests {
tracing
::
info!
(
"✓ Runtime and distributed runtime created"
);
// Create component for MockVllmEngine (needed for publishers)
let
mut
test_component
=
distributed
.namespace
(
"test"
)
?
.component
(
MOCKER_COMPONENT
)
?
;
test_component
.add_stats_service
()
.await
?
;
let
test_component
=
distributed
.namespace
(
"test"
)
?
.component
(
MOCKER_COMPONENT
)
?
;
tracing
::
info!
(
"✓ Test component created"
);
// Create MockVllmEngine WITH component (enables publishers)
...
...
lib/llm/src/preprocessor.rs
View file @
f05f7629
...
...
@@ -856,7 +856,7 @@ impl
let
request_id
=
context
.id
()
.to_string
();
let
original_stream_flag
=
request
.inner.stream
.unwrap_or
(
false
);
// Build audit handle (None if DYN_AUDIT_
ENABLED=0
)
// Build audit handle (None if
no
DYN_AUDIT_
SINKS
)
let
mut
audit_handle
=
crate
::
audit
::
handle
::
create_handle
(
&
request
,
&
request_id
);
if
let
Some
(
ref
mut
h
)
=
audit_handle
{
...
...
lib/llm/tests/audit_nats_integration.rs
View file @
f05f7629
...
...
@@ -27,7 +27,6 @@ mod tests {
use
dynamo_llm
::
protocols
::
openai
::
chat_completions
::{
NvCreateChatCompletionRequest
,
NvCreateChatCompletionResponse
,
};
use
dynamo_runtime
::
Runtime
;
use
dynamo_runtime
::
transports
::
nats
;
use
futures
::
StreamExt
;
use
serde_json
::
Value
;
...
...
@@ -48,15 +47,6 @@ mod tests {
.expect
(
"Failed to connect to NATS server"
)
}
/// Helper to create a test DistributedRuntime with NATS
async
fn
create_test_drt
()
->
dynamo_runtime
::
DistributedRuntime
{
let
rt
=
Runtime
::
from_current
()
.unwrap
();
let
config
=
dynamo_runtime
::
distributed
::
DistributedConfig
::
from_settings
();
dynamo_runtime
::
DistributedRuntime
::
new
(
rt
,
config
)
.await
.expect
(
"Failed to create DistributedRuntime"
)
}
/// Helper to create a minimal test request
fn
create_test_request
(
model
:
&
str
,
store
:
bool
)
->
NvCreateChatCompletionRequest
{
let
json
=
serde_json
::
json!
({
...
...
@@ -155,7 +145,6 @@ mod tests {
// Core test: audit records are published to NATS with correct structure
async_with_vars
(
[
(
"DYN_AUDIT_ENABLED"
,
Some
(
"1"
)),
(
"DYN_AUDIT_SINKS"
,
Some
(
"nats"
)),
(
"DYN_AUDIT_NATS_SUBJECT"
,
Some
(
TEST_SUBJECT
)),
],
...
...
@@ -166,8 +155,7 @@ mod tests {
setup_test_stream
(
&
client
,
&
stream_name
,
TEST_SUBJECT
)
.await
;
bus
::
init
(
100
);
let
drt
=
create_test_drt
()
.await
;
sink
::
spawn_workers_from_env
(
&
drt
);
sink
::
spawn_workers_from_env
()
.await
.unwrap
();
time
::
sleep
(
Duration
::
from_millis
(
100
))
.await
;
// Emit audit record
...
...
@@ -212,7 +200,6 @@ mod tests {
async_with_vars
(
[
(
"DYN_AUDIT_ENABLED"
,
Some
(
"1"
)),
(
"DYN_AUDIT_SINKS"
,
Some
(
"nats"
)),
(
"DYN_AUDIT_NATS_SUBJECT"
,
Some
(
TEST_SUBJECT
)),
],
...
...
@@ -223,8 +210,7 @@ mod tests {
setup_test_stream
(
&
client
,
&
stream_name
,
TEST_SUBJECT
)
.await
;
bus
::
init
(
100
);
let
drt
=
create_test_drt
()
.await
;
sink
::
spawn_workers_from_env
(
&
drt
);
sink
::
spawn_workers_from_env
()
.await
.unwrap
();
time
::
sleep
(
Duration
::
from_millis
(
100
))
.await
;
// Request with store=true (should be audited)
...
...
lib/llm/tests/block_manager.rs
deleted
100644 → 0
View file @
27904535
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Block Manager Dynamo Integration Tests
//!
//! This module contains both the integration components in the `llm_kvbm` module
//! and the tests for the `llm_kvbm` module.
//!
//! The intent is to move [llm_kvbm] to a separate crate in the future.
#[cfg(feature
=
"block-manager"
)]
pub
mod
llm_kvbm
{
// alias for the kvbm module to make the refactor to standalone crate easier
use
dynamo_llm
::
block_manager
as
kvbm
;
// kvbm specific imports
use
kvbm
::{
block
::
registry
::
RegistrationHandle
,
events
::
*
};
// std imports
use
async_trait
::
async_trait
;
use
serde
::
Serialize
;
use
std
::
collections
::
VecDeque
;
use
std
::
sync
::
Arc
;
use
tokio
::
time
::
Duration
;
use
anyhow
::
Result
;
use
derive_builder
::
Builder
;
use
derive_getters
::
Dissolve
;
use
dynamo_llm
::
kv_router
::{
indexer
::
RouterEvent
,
protocols
::{
ExternalSequenceBlockHash
,
KvCacheEvent
,
KvCacheEventData
,
KvCacheRemoveData
,
KvCacheStoreData
,
KvCacheStoredBlockData
,
LocalBlockHash
,
},
};
use
dynamo_llm
::
tokens
::{
BlockHash
,
SequenceHash
};
use
dynamo_runtime
::
DistributedRuntime
;
use
dynamo_runtime
::
component
::
Namespace
;
use
dynamo_runtime
::
prelude
::
DistributedRuntimeProvider
;
use
dynamo_runtime
::
traits
::
events
::
EventPublisher
;
use
kvbm
::
events
::
EventManager
;
use
tokio
::
sync
::
mpsc
;
pub
use
tokio_util
::
sync
::
CancellationToken
;
pub
const
KV_EVENT_SUBJECT
:
&
str
=
"kv_events"
;
/// Message consumed by the batching publisher task (`start_batching_publisher`).
///
/// Both variants carry a [`RouterEvent`]; the variant decides how eagerly the
/// pending batch is flushed.
#[derive(Debug, Clone)]
pub enum PublisherEvent {
    /// Buffered; the batch is published once it holds `max_batch_size` events
    /// or on the next deadline tick.
    Store(RouterEvent),
    /// Appended to the buffer, after which the whole buffer is published immediately.
    Remove(RouterEvent),
}
// TODO[oandreeva] The potential issue with the start_batching_publisher
// same as with the worker task, it's potentially a slow subscriber.
// Needs to be perf tested and improved.
pub
async
fn
start_batching_publisher
(
component
:
Arc
<
KVBMDynamoRuntimeComponent
>
,
mut
rx
:
mpsc
::
UnboundedReceiver
<
PublisherEvent
>
,
max_batch_size
:
usize
,
deadline
:
Duration
,
)
{
let
mut
buffer
:
VecDeque
<
RouterEvent
>
=
VecDeque
::
new
();
let
mut
interval
=
tokio
::
time
::
interval
(
deadline
);
interval
.set_missed_tick_behavior
(
tokio
::
time
::
MissedTickBehavior
::
Skip
);
loop
{
tokio
::
select!
{
_
=
interval
.tick
()
=>
{
if
!
buffer
.is_empty
()
{
let
events
:
Vec
<
RouterEvent
>
=
buffer
.drain
(
..
)
.collect
();
if
let
Err
(
e
)
=
component
.publish
(
KV_EVENT_SUBJECT
,
&
events
)
.await
{
tracing
::
warn!
(
"Failed to publish events: {:?}"
,
e
);
}
}
}
maybe_evt
=
rx
.recv
()
=>
{
match
maybe_evt
{
Some
(
PublisherEvent
::
Store
(
data
))
=>
{
buffer
.push_back
(
data
);
if
buffer
.len
()
>=
max_batch_size
{
let
events
:
Vec
<
RouterEvent
>
=
buffer
.drain
(
..
)
.collect
();
if
let
Err
(
e
)
=
component
.publish
(
KV_EVENT_SUBJECT
,
&
events
)
.await
{
tracing
::
warn!
(
"Failed to publish events: {:?}"
,
e
);
}
}
}
Some
(
PublisherEvent
::
Remove
(
data
))
=>
{
buffer
.push_back
(
data
);
let
events
:
Vec
<
RouterEvent
>
=
buffer
.drain
(
..
)
.collect
();
if
let
Err
(
e
)
=
component
.publish
(
KV_EVENT_SUBJECT
,
&
events
)
.await
{
tracing
::
warn!
(
"Failed to publish events: {:?}"
,
e
);
}
}
None
=>
{
if
!
buffer
.is_empty
()
{
let
events
:
Vec
<
RouterEvent
>
=
buffer
.drain
(
..
)
.collect
();
if
let
Err
(
e
)
=
component
.publish
(
KV_EVENT_SUBJECT
,
&
events
)
.await
{
tracing
::
warn!
(
"Failed to publish events: {:?}"
,
e
);
}
}
break
;
}
}
}
}
}
}
/// Pairs a [`DistributedRuntime`] with a [`Namespace`] and the sending half of
/// the batching-publisher channel.
#[derive(Builder, Clone)]
#[builder(pattern = "owned")]
pub struct KVBMDynamoRuntimeComponent {
    // Runtime handed out by the `DistributedRuntimeProvider` impl; its NATS
    // client is what `publish_bytes` uses.
    #[builder(private)]
    drt: DistributedRuntime,

    // Component name, exposed through `name()`.
    #[builder(setter(into))]
    name: String,

    // Namespace whose name forms the publish subject prefix (see `subject()`).
    #[builder(setter(into))]
    namespace: Namespace,

    /// Buffer State
    #[builder(private)]
    batch_tx: mpsc::UnboundedSender<PublisherEvent>,
}
impl KVBMDynamoRuntimeComponent {
    /// Builds the component and spawns `start_batching_publisher` for it on
    /// `drt.runtime().secondary()`.
    ///
    /// `deadline` and `max_batch_size` are forwarded unchanged to the
    /// batching task; the returned `Arc` shares ownership with that task.
    pub fn new(
        drt: DistributedRuntime,
        name: String,
        namespace: Namespace,
        deadline: Duration,
        max_batch_size: usize,
    ) -> Arc<Self> {
        let (batch_tx, batch_rx) = mpsc::unbounded_channel();
        let this = Arc::new(Self {
            drt,
            name,
            namespace,
            batch_tx,
        });

        let publisher_handle = Arc::clone(&this);
        this.drt()
            .runtime()
            .secondary()
            .spawn(start_batching_publisher(
                publisher_handle,
                batch_rx,
                max_batch_size,
                deadline,
            ));

        this
    }

    /// Namespace this component publishes under.
    pub fn namespace(&self) -> &Namespace {
        &self.namespace
    }

    /// Component name given at construction.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Test-only: a clone of the sender feeding the batching publisher.
    #[cfg(test)]
    pub fn batch_tx(&self) -> mpsc::UnboundedSender<PublisherEvent> {
        self.batch_tx.clone()
    }
}
// Exposes the wrapped runtime so code written against
// `DistributedRuntimeProvider` can reach it via `drt()`.
impl DistributedRuntimeProvider for KVBMDynamoRuntimeComponent {
    fn drt(&self) -> &DistributedRuntime {
        &self.drt
    }
}
#[async_trait]
impl EventPublisher for KVBMDynamoRuntimeComponent {
    /// Subject prefix for everything this component publishes:
    /// `namespace.<namespace name>`.
    fn subject(&self) -> String {
        let namespace_name = self.namespace.name();
        format!("namespace.{}", namespace_name)
    }

    /// Serializes `event` as JSON and publishes it under `<subject>.<event_name>`.
    ///
    /// Fails if serialization fails or if `publish_bytes` fails.
    async fn publish(
        &self,
        event_name: impl AsRef<str> + Send + Sync,
        event: &(impl Serialize + Send + Sync),
    ) -> Result<()> {
        let payload = serde_json::to_vec(event)?;
        self.publish_bytes(event_name, payload).await
    }

    /// Publishes raw `bytes` to `<subject>.<event_name>` through the runtime's
    /// NATS client.
    ///
    /// Returns an error when the runtime has no NATS client or when the
    /// underlying publish call fails.
    async fn publish_bytes(
        &self,
        event_name: impl AsRef<str> + Send + Sync,
        bytes: Vec<u8>,
    ) -> Result<()> {
        let full_subject = [self.subject().as_str(), event_name.as_ref()].join(".");

        match self.drt().nats_client() {
            Some(nats_client) => {
                nats_client
                    .client()
                    .publish(full_subject, bytes.into())
                    .await?;
                Ok(())
            }
            None => anyhow::bail!("KVBMDynamoRuntimeComponent EventPublisher requires NATS"),
        }
    }
}
/// Translate the Dynamo [`DistributedRuntime`] to the [`kvbm::config::KvManagerRuntimeConfig`]
///
/// The derive-generated build step is private and renamed to `build_internal`;
/// callers go through the hand-written `DynamoKvbmRuntimeConfigBuilder::build`,
/// which returns the kvbm config rather than this struct.
#[derive(Clone, Builder, Dissolve)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct DynamoKvbmRuntimeConfig {
    // Source of the worker id and the cancellation token in `build`.
    pub runtime: DistributedRuntime,
    // Passed through unchanged to the kvbm runtime config.
    pub nixl: kvbm::config::NixlOptions,
}
impl DynamoKvbmRuntimeConfig {
    /// Returns a default-initialized builder; set `runtime` and `nixl`, then call `build()`.
    pub fn builder() -> DynamoKvbmRuntimeConfigBuilder {
        DynamoKvbmRuntimeConfigBuilder::default()
    }
}
impl DynamoKvbmRuntimeConfigBuilder {
    /// Finishes the builder and converts the result into a
    /// [`kvbm::config::KvManagerRuntimeConfig`].
    ///
    /// The worker id is the runtime's `connection_id()`, the cancellation
    /// token is a child of the runtime's primary token, and `nixl` is passed
    /// through. Fails if either builder's build step fails.
    pub fn build(self) -> Result<kvbm::config::KvManagerRuntimeConfig> {
        let parts = self.build_internal()?;
        let (runtime, nixl) = parts.dissolve();

        let worker_id = runtime.connection_id();
        let cancel_token = runtime.primary_token().child_token();

        let config = kvbm::config::KvManagerRuntimeConfig::builder()
            .worker_id(worker_id)
            .cancellation_token(cancel_token)
            .nixl(nixl)
            .build()?;
        Ok(config)
    }
}
/// Internal message sent from [`DynamoEventManager`] to `worker_task`.
pub enum Event {
    /// A batch of newly registered blocks.
    RegisterMultiple {
        // One `(sequence_hash, block_hash, parent_sequence_hash)` tuple per block.
        blocks: Vec<(SequenceHash, BlockHash, Option<SequenceHash>)>,
        // Identifier of the worker that produced the event.
        worker_identifier: u64,
    },
    /// A single block registration was released.
    Release {
        sequence_hash: SequenceHash,
        // Identifier of the worker that produced the event.
        worker_identifier: u64,
    },
}
/// Implementation of the [`kvbm::events::EventManager`] for the Dynamo Runtime Event Plane + the
/// Dynamo LLM KV router message protocol.
#[derive(Clone)]
pub struct DynamoEventManager {
    // Sending half of the channel drained by `worker_task`.
    tx: mpsc::UnboundedSender<Event>,
    // Stamped onto every `Event`; taken from the runtime's `connection_id()` in `new`.
    worker_identifier: u64,
}
impl DynamoEventManager {
    /// Creates the manager and spawns `worker_task` on
    /// `component.drt().runtime().secondary()` to drain its events.
    ///
    /// The runtime's `connection_id()` becomes the worker identifier attached
    /// to every event this manager emits.
    pub fn new(component: Arc<KVBMDynamoRuntimeComponent>) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        // Read the id before `component` is moved into the worker task.
        let worker_identifier = component.drt().connection_id();

        component
            .drt()
            .runtime()
            .secondary()
            .spawn(worker_task(component, rx));

        Self {
            tx,
            worker_identifier,
        }
    }

    /// Wraps a new handle to this manager in a [`Publisher`].
    pub fn publisher(self: &Arc<Self>) -> Publisher {
        let manager: Arc<Self> = Arc::clone(self);
        Publisher::new(manager)
    }
}
// TODO[oandreeva] The worker task is potentially a slow subscriber.
// Needs to be perf tested and improved.

/// Receives [`Event`]s, translates them into KV-router [`RouterEvent`]s and
/// forwards them to the component's batching publisher.
///
/// - `Event::RegisterMultiple` becomes a `KvCacheEventData::Stored` event whose
///   `parent_hash` is the parent of the first block, sent as `PublisherEvent::Store`.
/// - `Event::Release` becomes a `KvCacheEventData::Removed` event with a single
///   hash, sent as `PublisherEvent::Remove`.
///
/// Event ids start at 0 and increase by one per received event; `dp_rank` is
/// always 0. A failed send to the batch channel is logged and the event is
/// dropped. The task returns once `rx` is closed.
pub async fn worker_task(
    component: Arc<KVBMDynamoRuntimeComponent>,
    mut rx: mpsc::UnboundedReceiver<Event>,
) {
    let mut next_event_id: u64 = 0;

    while let Some(event) = rx.recv().await {
        let event_id = next_event_id;
        next_event_id += 1;

        let outgoing = match event {
            Event::RegisterMultiple {
                blocks,
                worker_identifier,
            } => {
                // Only the first block's parent describes where the batch attaches.
                let parent_hash = blocks
                    .first()
                    .and_then(|(_, _, parent)| *parent)
                    .map(ExternalSequenceBlockHash);

                let store_data = KvCacheStoreData {
                    blocks: blocks
                        .iter()
                        .map(|(sequence_hash, block_hash, _parent)| KvCacheStoredBlockData {
                            block_hash: ExternalSequenceBlockHash(*sequence_hash),
                            tokens_hash: LocalBlockHash(*block_hash),
                        })
                        .collect(),
                    parent_hash,
                };

                let cache_event = KvCacheEvent {
                    data: KvCacheEventData::Stored(store_data),
                    event_id,
                    dp_rank: 0,
                };
                PublisherEvent::Store(RouterEvent::new(worker_identifier, cache_event))
            }
            Event::Release {
                sequence_hash,
                worker_identifier,
            } => {
                let cache_event = KvCacheEvent {
                    data: KvCacheEventData::Removed(KvCacheRemoveData {
                        block_hashes: vec![ExternalSequenceBlockHash(sequence_hash)],
                    }),
                    event_id,
                    dp_rank: 0,
                };
                PublisherEvent::Remove(RouterEvent::new(worker_identifier, cache_event))
            }
        };

        if let Err(e) = component.batch_tx.send(outgoing) {
            tracing::warn!("Failed to send event to batch channel: {:?}", e);
        }
    }

    tracing::info!("Receiver is closed, stopping KVBM Dynamo Event Manager");
}
// Empty impl: nothing is defined here. The publish/release behaviour lives in
// the `kvbm::events::EventPublisher` and `kvbm::events::EventReleaseManager`
// impls for this type.
// NOTE(review): presumably `EventManager` just combines those two traits —
// confirm against `kvbm::events`.
impl EventManager for DynamoEventManager {}
impl kvbm::events::EventPublisher for DynamoEventManager {
    /// Queues one `Event::RegisterMultiple` covering all `handles`.
    ///
    /// An empty `handles` list is ignored. A send failure (worker task gone)
    /// is deliberately ignored as well.
    fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
        if handles.is_empty() {
            return;
        }

        let blocks = handles
            .iter()
            .map(|handle| {
                (
                    handle.sequence_hash(),
                    handle.block_hash(),
                    handle.parent_sequence_hash(),
                )
            })
            .collect();

        let registration = Event::RegisterMultiple {
            blocks,
            worker_identifier: self.worker_identifier,
        };
        let _ = self.tx.send(registration);
    }
}
impl kvbm::events::EventReleaseManager for DynamoEventManager {
    /// Queues an `Event::Release` for the handle's sequence hash.
    ///
    /// A send failure (worker task gone) is deliberately ignored.
    fn block_release(&self, registration_handle: &RegistrationHandle) {
        let release = Event::Release {
            sequence_hash: registration_handle.sequence_hash(),
            worker_identifier: self.worker_identifier,
        };
        let _ = self.tx.send(release);
    }
}
}
#[cfg(all(test,
feature
=
"testing-full"
))]
mod
tests
{
#[allow(unused_imports)]
use
super
::
llm_kvbm
::
*
;
use
futures
::
stream
::
StreamExt
;
use
std
::
sync
::
Arc
;
use
tokio
::
time
::
Duration
;
use
dynamo_llm
::
block_manager
as
kvbm
;
use
dynamo_llm
::
tokens
::{
TokenBlockSequence
,
Tokens
};
use
dynamo_runtime
::{
DistributedRuntime
,
Runtime
,
traits
::
events
::{
EventPublisher
,
EventSubscriber
},
};
use
kvbm
::{
KvBlockManagerConfig
,
KvManagerLayoutConfig
,
KvManagerModelConfig
,
NixlOptions
,
ReferenceBlockManager
,
block
::
BlockState
,
block
::
GlobalRegistry
,
block
::
registry
::
BlockRegistry
,
block
::
state
::
CompleteState
,
events
::
EventManager
,
storage
::{
DeviceAllocator
,
DiskAllocator
,
PinnedAllocator
},
};
use
dynamo_llm
::
kv_router
::{
indexer
::
RouterEvent
,
protocols
::{
ExternalSequenceBlockHash
,
KvCacheEvent
,
KvCacheEventData
,
KvCacheRemoveData
,
KvCacheStoreData
,
KvCacheStoredBlockData
,
LocalBlockHash
,
},
};
/// Builds the fixture sequence used by these tests: tokens 1..=10, block size
/// 4 and salt 1337. Asserts that this gives two complete blocks with known
/// sequence hashes plus a partial block holding `[9, 10]`.
fn create_sequence() -> TokenBlockSequence {
    let tokens = Tokens::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    // NOTE: 1337 was the original seed, so we are temporarily using that here to prove the logic has not changed
    let sequence = TokenBlockSequence::new(tokens, 4, Some(1337_u64));

    let blocks = sequence.blocks();
    assert_eq!(blocks.len(), 2);
    assert_eq!(sequence.current_block().len(), 2);

    let first = &blocks[0];
    assert_eq!(first.tokens(), &vec![1, 2, 3, 4]);
    assert_eq!(first.sequence_hash(), 14643705804678351452);

    let second = &blocks[1];
    assert_eq!(second.tokens(), &vec![5, 6, 7, 8]);
    assert_eq!(second.sequence_hash(), 4945711292740353085);

    assert_eq!(sequence.current_block().tokens(), &vec![9, 10]);

    sequence
}
/// Builds a [`ReferenceBlockManager`] wired to a [`DynamoEventManager`] in the
/// `test` namespace.
///
/// Layouts: 16 disk blocks, 16 pinned-host blocks and 8 blocks on device 0,
/// with NIXL enabled. Every step panics on failure.
async fn create_dynamo_block_manager() -> ReferenceBlockManager {
    let rt = Runtime::from_current().unwrap();
    let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
    let nixl = NixlOptions::Enabled;

    let ns = dtr.namespace("test".to_string()).unwrap();
    let kvbm_component = KVBMDynamoRuntimeComponent::new(
        dtr.clone(),
        "kvbm_component".to_string(),
        ns.clone(),
        Duration::from_secs(10),
        1, /*max_batch_size*/
    );
    let manager = Arc::new(DynamoEventManager::new(kvbm_component.clone()));

    let runtime_config = DynamoKvbmRuntimeConfig::builder()
        .runtime(dtr.clone())
        .nixl(nixl)
        .build()
        .unwrap();

    let model_config = KvManagerModelConfig::builder()
        .num_layers(3)
        .outer_dim(2)
        .page_size(4)
        .inner_dim(16)
        .build()
        .unwrap();

    let disk_layout = KvManagerLayoutConfig::builder()
        .num_blocks(16)
        .allocator(DiskAllocator)
        .build()
        .unwrap();

    let host_layout = KvManagerLayoutConfig::builder()
        .num_blocks(16)
        .allocator(PinnedAllocator::default())
        .build()
        .unwrap();

    let device_layout = KvManagerLayoutConfig::builder()
        .num_blocks(8)
        .allocator(DeviceAllocator::new(0).unwrap())
        .build()
        .unwrap();

    let config = KvBlockManagerConfig::builder()
        .runtime(runtime_config)
        .model(model_config)
        .disk_layout(disk_layout)
        .host_layout(host_layout)
        .device_layout(device_layout)
        .event_manager(Some(manager))
        .build()
        .unwrap();

    ReferenceBlockManager::new(config).await.unwrap()
}
/// Creates a [`KVBMDynamoRuntimeComponent`] in a randomly named namespace
/// (`test_namespace_<u32>`) with the given batching parameters.
///
/// Returns the component together with the [`Runtime`] so the caller can shut
/// the runtime down when the test ends.
async fn setup_kvbm_component(
    deadline: Duration,
    max_batch_size: usize,
) -> (Arc<KVBMDynamoRuntimeComponent>, Runtime) {
    let rt = Runtime::from_current().unwrap();
    let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();

    // Random suffix for the namespace name.
    let suffix = rand::random::<u32>();
    let ns = dtr.namespace(format!("test_namespace_{}", suffix)).unwrap();

    let component = KVBMDynamoRuntimeComponent::new(
        dtr.clone(),
        "kvbm_component".to_string(),
        ns.clone(),
        deadline,
        max_batch_size,
    );

    (component, rt)
}
// There are issues with running the following 2 tests concurrently with others
// With --test-threads=1 flag, they pass.
//
// Smoke test: passes if `create_dynamo_block_manager` completes without
// panicking inside a tokio test runtime. Ignored by default (see note above).
#[tokio::test]
#[ignore]
async fn test_dynamo_block_manager_async() {
    dynamo_runtime::logging::init();
    let _block_manager = create_dynamo_block_manager().await;
}
// Smoke test: same construction as `test_dynamo_block_manager_async`, but
// driven from a hand-built current-thread tokio runtime. Ignored by default.
#[test]
#[ignore]
fn test_create_dynamo_block_manager() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Drive the async constructor to completion on that runtime.
    let _block_manager = runtime.block_on(create_dynamo_block_manager());
}
// Publishes one JSON-encoded string through the component and checks that a
// namespace subscriber on the same channel receives exactly that payload.
//
// Publish failures, a closed subscriber stream and a missing message all fail
// the test; previously these paths were only logged or skipped, so the test
// could pass without asserting anything.
#[tokio::test]
async fn test_kvbm_component_publish() {
    dynamo_runtime::logging::init();
    let rt = Runtime::from_current().unwrap();
    let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();

    let namespace_name = "test_kvbm_component".to_string();
    let ns = dtr.namespace(namespace_name).unwrap();
    let kvbm_component = KVBMDynamoRuntimeComponent::new(
        dtr.clone(),
        "kvbm_component".to_string(),
        ns.clone(),
        Duration::from_secs(10),
        1, /*max_batch_size*/
    );

    // Create a subscriber
    let mut subscriber = ns.subscribe("testing_channel".to_string()).await.unwrap();

    kvbm_component
        .publish("testing_channel".to_string(), &"test_message".to_string())
        .await
        .expect("Failed to publish test message");

    // Receive the message; bound the wait so a lost message fails instead of hanging.
    let msg = tokio::time::timeout(Duration::from_secs(5), subscriber.next())
        .await
        .expect("Timed out waiting for published message")
        .expect("Subscriber stream closed without receiving message");

    let received = String::from_utf8(msg.payload.to_vec()).unwrap();
    assert_eq!(received, "\"test_message\"");

    rt.shutdown();
}
// With `max_batch_size = 2`, two `Store` events sent back to back must arrive
// as one published batch, and nothing further may arrive after the sender is
// dropped.
#[tokio::test]
async fn test_dynamo_component_batching_publisher_max_batch_size() {
    // Single-block `Stored` event whose worker id, event id and hashes all equal `id`.
    let stored_event = |id| {
        RouterEvent::new(
            id,
            KvCacheEvent {
                event_id: id,
                data: KvCacheEventData::Stored(KvCacheStoreData {
                    blocks: vec![KvCacheStoredBlockData {
                        block_hash: ExternalSequenceBlockHash(id),
                        tokens_hash: LocalBlockHash(id),
                    }],
                    parent_hash: None,
                }),
                dp_rank: 0,
            },
        )
    };

    let (kvbm_component, rt) = setup_kvbm_component(Duration::from_millis(100), 2).await;

    // Create a subscriber
    let mut subscriber = kvbm_component
        .namespace()
        .subscribe(KV_EVENT_SUBJECT.to_string())
        .await
        .unwrap();

    let tx = kvbm_component.batch_tx();

    // Send two store events - should trigger batch due to max_batch_size
    let first = stored_event(1);
    let second = stored_event(2);
    for event in [first, second] {
        tx.send(PublisherEvent::Store(event)).unwrap();
    }

    // Should receive one batch with both events
    let msg = subscriber.next().await.unwrap();
    let batch: Vec<RouterEvent> = serde_json::from_slice(&msg.payload).unwrap();
    assert_eq!(batch.len(), 2, "Should receive both events in one batch");

    drop(tx); // Close the channel

    // No more events should be received
    let extra = tokio::time::timeout(Duration::from_millis(200), subscriber.next()).await;
    assert!(
        extra.is_err(),
        "Should not receive any more events after channel closure"
    );

    rt.shutdown();
}
// A single `Store` event stays below `max_batch_size = 2`, so it must be
// published by the 100 ms deadline tick, and nothing further may arrive after
// the sender is dropped.
#[tokio::test]
async fn test_dynamo_component_batching_publisher_deadline() {
    let (kvbm_component, rt) = setup_kvbm_component(Duration::from_millis(100), 2).await;

    let mut subscriber = kvbm_component
        .namespace()
        .subscribe(KV_EVENT_SUBJECT.to_string())
        .await
        .unwrap();

    let tx = kvbm_component.batch_tx();

    let stored_block = KvCacheStoredBlockData {
        block_hash: ExternalSequenceBlockHash(3),
        tokens_hash: LocalBlockHash(3),
    };
    let cache_event = KvCacheEvent {
        event_id: 3,
        data: KvCacheEventData::Stored(KvCacheStoreData {
            blocks: vec![stored_block],
            parent_hash: None,
        }),
        dp_rank: 0,
    };
    let event3 = RouterEvent::new(3, cache_event);
    tx.send(PublisherEvent::Store(event3)).unwrap();

    // Wait for deadline to trigger
    tokio::time::sleep(Duration::from_millis(150)).await;

    // Should receive the event after deadline
    let msg = subscriber.next().await.unwrap();
    let batch: Vec<RouterEvent> = serde_json::from_slice(&msg.payload).unwrap();
    assert_eq!(batch.len(), 1, "Should receive single event after deadline");

    drop(tx);

    // No more events should be received
    let extra = tokio::time::timeout(Duration::from_millis(200), subscriber.next()).await;
    assert!(
        extra.is_err(),
        "Should not receive any more events after channel closure"
    );

    rt.shutdown();
}
// A `Remove` event must be published immediately, without waiting for the
// batch size or the deadline, and nothing further may arrive after the sender
// is dropped.
#[tokio::test]
async fn test_dynamo_component_batching_publisher_remove_event() {
    let (kvbm_component, rt) = setup_kvbm_component(Duration::from_millis(100), 2).await;

    // Create a subscriber
    let mut subscriber = kvbm_component
        .namespace()
        .subscribe(KV_EVENT_SUBJECT.to_string())
        .await
        .unwrap();

    let tx = kvbm_component.batch_tx();

    // Test 3: Immediate flush for Remove event
    let removal = KvCacheEvent {
        event_id: 4,
        data: KvCacheEventData::Removed(KvCacheRemoveData {
            block_hashes: vec![ExternalSequenceBlockHash(4)],
        }),
        dp_rank: 0,
    };
    let event4 = RouterEvent::new(4, removal);
    tx.send(PublisherEvent::Remove(event4)).unwrap();

    // Should receive remove event immediately
    let msg = subscriber.next().await.unwrap();
    let batch: Vec<RouterEvent> = serde_json::from_slice(&msg.payload).unwrap();
    assert_eq!(batch.len(), 1, "Should receive remove event immediately");

    drop(tx); // Close the channel

    // No more events should be received
    let extra = tokio::time::timeout(Duration::from_millis(200), subscriber.next()).await;
    assert!(
        extra.is_err(),
        "Should not receive any more events after channel closure"
    );

    rt.shutdown();
}
/// Exercises the event manager through a `BlockRegistry`: registering a block
/// publishes nothing, dropping the publish handle publishes one event, dropping
/// the block state publishes another, and nothing follows after that.
#[tokio::test]
async fn test_dynamo_event_manager() {
    dynamo_runtime::logging::init();
    let sequence = create_sequence();

    let (kvbm_component, rt) = setup_kvbm_component(Duration::from_secs(10), 1).await;

    let mut subscriber = kvbm_component
        .namespace()
        .subscribe(KV_EVENT_SUBJECT.to_string())
        .await
        .unwrap();

    let manager = Arc::new(DynamoEventManager::new(kvbm_component));
    let event_manager: Arc<dyn EventManager> = manager;

    let global_registry = GlobalRegistry::default();
    let mut block_registry = BlockRegistry::new(
        event_manager,
        global_registry.clone(),
        rt.primary().clone(),
    );

    // Register a block
    // Create CompleteState from TokenBlock
    let complete_state = CompleteState::new(sequence.blocks()[0].clone());
    let mut block_state = BlockState::Complete(complete_state);
    let publish_handle = block_registry.register_block(&mut block_state).unwrap();

    // No event should have been triggered yet
    let timeout = tokio::time::timeout(
        std::time::Duration::from_millis(100),
        subscriber.next(),
    )
    .await;
    assert!(
        timeout.is_err(),
        "Unexpected event triggered before dropping publish_handles"
    );

    // Dropping the publish handle should trigger a `Store` event
    drop(publish_handle);

    let timeout =
        tokio::time::timeout(std::time::Duration::from_secs(5), subscriber.next()).await;
    match timeout {
        Ok(Some(msg)) => {
            // Only checks that the payload is valid UTF-8; the content is not inspected.
            let _received = String::from_utf8(msg.payload.to_vec())
                .expect("Failed to decode message payload");
        }
        Ok(None) => {
            panic!("Subscriber channel closed without receiving event");
        }
        Err(_) => {
            // This wait is for the Store event, so the message names it.
            panic!("Timeout waiting for store event");
        }
    }

    // Dropping the block state should trigger a `Remove` event
    drop(block_state);

    let timeout =
        tokio::time::timeout(std::time::Duration::from_secs(5), subscriber.next()).await;
    match timeout {
        Ok(Some(msg)) => {
            let _received = String::from_utf8(msg.payload.to_vec())
                .expect("Failed to decode message payload");
        }
        Ok(None) => {
            panic!("Subscriber channel closed without receiving event");
        }
        Err(_) => {
            panic!("Timeout waiting for remove event");
        }
    }

    // Nothing else should be published once both expected events were seen.
    let timeout = tokio::time::timeout(
        std::time::Duration::from_millis(100),
        subscriber.next(),
    )
    .await;
    assert!(
        timeout.is_err(),
        "Unexpected event received after the expected events"
    );

    rt.shutdown();
}
/// Registers two blocks, hands both publish handles to a publisher, and checks
/// the events observed on the subscriber: nothing before `publish()`, one
/// message after it, two more after the publisher is dropped, then silence.
#[tokio::test]
async fn test_dynamo_event_manager_multiple_handles() {
    dynamo_runtime::logging::init();
    let sequence = create_sequence();

    let (kvbm_component, rt) = setup_kvbm_component(Duration::from_secs(10), 1).await;

    let mut subscriber = kvbm_component
        .namespace()
        .subscribe(KV_EVENT_SUBJECT.to_string())
        .await
        .unwrap();

    let manager = Arc::new(DynamoEventManager::new(kvbm_component));

    // Take the publisher before `manager` is moved into the trait object below.
    let mut publisher = manager.publisher();

    let event_manager: Arc<dyn EventManager> = manager;

    let global_registry = GlobalRegistry::default();
    let mut block_registry = BlockRegistry::new(
        event_manager,
        global_registry.clone(),
        rt.primary().clone(),
    );

    // Register first block
    let complete_state_1 = CompleteState::new(sequence.blocks()[0].clone());
    let mut block_state_1 = BlockState::Complete(complete_state_1);
    let publish_handle_1 = block_registry
        .register_block(&mut block_state_1)
        .unwrap();

    // Register second block
    let complete_state_2 = CompleteState::new(sequence.blocks()[1].clone());
    let mut block_state_2 = BlockState::Complete(complete_state_2);
    let publish_handle_2 = block_registry
        .register_block(&mut block_state_2)
        .unwrap();

    drop(block_state_2);
    drop(block_state_1);

    // No event should have been triggered yet
    let timeout = tokio::time::timeout(
        std::time::Duration::from_millis(100),
        subscriber.next(),
    )
    .await;
    assert!(
        timeout.is_err(),
        "Unexpected event triggered before dropping publish_handles"
    );

    publisher.take_handle(publish_handle_1.unwrap());
    publisher.take_handle(publish_handle_2.unwrap());
    publisher.publish();

    let timeout =
        tokio::time::timeout(std::time::Duration::from_secs(5), subscriber.next()).await;
    match timeout {
        Ok(Some(msg)) => {
            // Only checks that the payload is valid UTF-8; the content is not inspected.
            let _received = String::from_utf8(msg.payload.to_vec())
                .expect("Failed to decode message payload");
        }
        Ok(None) => {
            panic!("Subscriber channel closed without receiving event");
        }
        Err(_) => {
            // This wait follows `publish()`, so the message no longer says "remove".
            panic!("Timeout waiting for event after publish()");
        }
    }

    drop(publisher);

    // NOTE(review): presumably one Remove event per registered block (two
    // handles, so two events) — confirm against the publisher implementation.
    let expected_events = 2;
    let mut event_count = 0;

    let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
        while let Some(msg) = subscriber.next().await {
            let _received = String::from_utf8(msg.payload.to_vec())
                .expect("Failed to decode message payload");
            event_count += 1;
            if event_count == expected_events {
                break;
            }
        }
    })
    .await;

    assert!(timeout.is_ok(), "Test timed out while waiting for events");

    assert_eq!(
        event_count, expected_events,
        "Expected {} events to be triggered",
        expected_events
    );

    // Nothing else should be published once the expected events were seen.
    let timeout = tokio::time::timeout(
        std::time::Duration::from_millis(1000),
        subscriber.next(),
    )
    .await;
    assert!(
        timeout.is_err(),
        "Unexpected event received after the expected events"
    );

    rt.shutdown();
}
}
lib/runtime/examples/hello_world/src/bin/server.rs
View file @
f05f7629
...
...
@@ -55,10 +55,7 @@ async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> {
// attach an ingress to an engine
let
ingress
=
Ingress
::
for_engine
(
RequestHandler
::
new
())
?
;
// // make the ingress discoverable via a component service
// // we must first create a service, then we can attach one or more endpoints
let
mut
component
=
runtime
.namespace
(
DEFAULT_NAMESPACE
)
?
.component
(
"backend"
)
?
;
component
.add_stats_service
()
.await
?
;
let
component
=
runtime
.namespace
(
DEFAULT_NAMESPACE
)
?
.component
(
"backend"
)
?
;
component
.endpoint
(
"generate"
)
.endpoint_builder
()
...
...
lib/runtime/examples/service_metrics/src/bin/service_client.rs
View file @
f05f7629
...
...
@@ -6,7 +6,7 @@ use service_metrics::DEFAULT_NAMESPACE;
use
dynamo_runtime
::{
DistributedRuntime
,
Runtime
,
Worker
,
logging
,
pipeline
::
PushRouter
,
protocols
::
annotated
::
Annotated
,
utils
::
Duration
,
protocols
::
annotated
::
Annotated
,
};
fn
main
()
->
anyhow
::
Result
<
()
>
{
...
...
@@ -33,13 +33,6 @@ async fn app(runtime: Runtime) -> anyhow::Result<()> {
println!
(
"{:?}"
,
resp
);
}
// This is just an illustration to invoke the server's stats_registry(<action>), where
// the action currently increments the `service_requests_total` metric. You can validate
// the result by running `curl http://localhost:8000/metrics`
let
service_set
=
component
.scrape_stats
(
Duration
::
from_millis
(
100
))
.await
?
;
println!
(
"{:?}"
,
service_set
);
runtime
.shutdown
();
Ok
(())
}
lib/runtime/examples/service_metrics/src/bin/service_server.rs
View file @
f05f7629
...
...
@@ -59,8 +59,7 @@ async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> {
// make the ingress discoverable via a component service
// we must first create a service, then we can attach one or more endpoints
let
mut
component
=
runtime
.namespace
(
DEFAULT_NAMESPACE
)
?
.component
(
"backend"
)
?
;
component
.add_stats_service
()
.await
?
;
let
component
=
runtime
.namespace
(
DEFAULT_NAMESPACE
)
?
.component
(
"backend"
)
?
;
component
.endpoint
(
"generate"
)
.endpoint_builder
()
...
...
lib/runtime/examples/system_metrics/src/bin/system_client.rs
View file @
f05f7629
...
...
@@ -6,7 +6,7 @@ use system_metrics::{DEFAULT_COMPONENT, DEFAULT_ENDPOINT, DEFAULT_NAMESPACE};
use
dynamo_runtime
::{
DistributedRuntime
,
Runtime
,
Worker
,
logging
,
pipeline
::
PushRouter
,
protocols
::
annotated
::
Annotated
,
utils
::
Duration
,
protocols
::
annotated
::
Annotated
,
};
fn
main
()
->
anyhow
::
Result
<
()
>
{
...
...
@@ -33,9 +33,6 @@ async fn app(runtime: Runtime) -> anyhow::Result<()> {
println!
(
"{:?}"
,
resp
);
}
let
service_set
=
component
.scrape_stats
(
Duration
::
from_millis
(
100
))
.await
?
;
println!
(
"{:?}"
,
service_set
);
runtime
.shutdown
();
Ok
(())
...
...
lib/runtime/examples/system_metrics/src/lib.rs
View file @
f05f7629
...
...
@@ -91,10 +91,9 @@ impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for Reques
pub
async
fn
backend
(
drt
:
DistributedRuntime
,
endpoint_name
:
Option
<&
str
>
)
->
anyhow
::
Result
<
()
>
{
let
endpoint_name
=
endpoint_name
.unwrap_or
(
DEFAULT_ENDPOINT
);
let
mut
component
=
drt
let
component
=
drt
.namespace
(
DEFAULT_NAMESPACE
)
?
.component
(
DEFAULT_COMPONENT
)
?
;
component
.add_stats_service
()
.await
?
;
let
endpoint
=
component
.endpoint
(
endpoint_name
);
// Create custom metrics for system stats
...
...
lib/runtime/src/component.rs
View file @
f05f7629
...
...
@@ -35,6 +35,7 @@ use crate::{
config
::
HealthStatus
,
distributed
::
RequestPlaneMode
,
metrics
::{
MetricsHierarchy
,
MetricsRegistry
,
prometheus_names
},
service
::
ServiceClient
,
service
::
ServiceSet
,
transports
::
etcd
::{
ETCD_ROOT_PATH
,
EtcdPath
},
};
...
...
@@ -150,13 +151,12 @@ impl PartialOrd for Instance {
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
#[derive(Educe,
Builder,
Clone,
Validate)]
#[educe(Debug)]
#[builder(pattern
=
"owned"
)]
#[builder(pattern
=
"owned"
,
build_fn(private,
name
=
"build_internal"
)
)]
pub
struct
Component
{
#[builder(private)]
#[educe(Debug(ignore))]
drt
:
Arc
<
DistributedRuntime
>
,
// todo - restrict the namespace to a-z0-9-_A-Z
/// Name of the component
#[builder(setter(into))]
#[validate(custom(function
=
"validate_allowed_chars"
))]
...
...
@@ -299,10 +299,14 @@ impl Component {
/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
/// embedded in data field of ServiceInfo.
pub
async
fn
scrape_stats
(
&
self
,
timeout
:
Duration
)
->
anyhow
::
Result
<
ServiceSet
>
{
async
fn
scrape_stats
(
&
self
,
timeout
:
Duration
)
->
anyhow
::
Result
<
ServiceSet
>
{
// Debug: scraping stats for component
let
service_name
=
self
.service_name
();
let
Some
(
service_client
)
=
self
.drt
()
.service_client
()
else
{
let
Some
(
service_client
)
=
self
.drt
()
.nats_client
()
.map
(|
nc
|
ServiceClient
::
new
(
nc
.clone
()))
else
{
anyhow
::
bail!
(
"ServiceSet is gathered via NATS, do not call this in non-NATS setups."
);
};
service_client
...
...
@@ -317,7 +321,7 @@ impl Component {
/// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
/// which should be near or smaller than typical Prometheus scraping intervals to ensure
/// metrics are fresh when Prometheus collects them.
pub
fn
start_scraping_nats_service_component_metrics
(
&
self
)
->
anyhow
::
Result
<
()
>
{
fn
start_scraping_nats_service_component_metrics
(
&
self
)
->
anyhow
::
Result
<
()
>
{
const
MAX_WAIT_MS
:
std
::
time
::
Duration
=
std
::
time
::
Duration
::
from_millis
(
9800
);
// Should be <= Prometheus scrape interval
// If there is another component with the same service name, this will fail.
...
...
@@ -364,18 +368,8 @@ impl Component {
Ok
(())
}
/// TODO: not implemented yet.
///
/// Intended behavior: scrape the stats for all available services and return a
/// stream of `ServiceInfo` objects. That stream should be consumed under a
/// [`tokio::time::timeout_at`] because each service will only respond once, but
/// there is no way to know when all services have responded.
///
/// # Panics
///
/// Always panics via `unimplemented!("collect_stats")`. The current signature
/// returns `anyhow::Result<()>`, not a stream.
pub
async
fn
stats_stream
(
&
self
)
->
anyhow
::
Result
<
()
>
{
unimplemented!
(
"collect_stats"
)
}
// Gather NATS metrics
pub
async
fn
add_stats_service
(
&
mut
self
)
->
anyhow
::
Result
<
()
>
{
async
fn
add_stats_service
(
&
mut
self
)
->
anyhow
::
Result
<
()
>
{
let
service_name
=
self
.service_name
();
// Pre-check to save cost of creating the service, but don't hold the lock
...
...
@@ -433,6 +427,21 @@ impl ComponentBuilder {
/// Creates a builder with default settings and the given distributed runtime
/// already set on it.
pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
    let builder = Self::default();
    builder.drt(drt)
}
pub
fn
build
(
self
)
->
Result
<
Component
,
anyhow
::
Error
>
{
let
component
=
self
.build_internal
()
?
;
// If this component is using NATS, gather it's metrics
if
component
.drt
()
.request_plane
()
.is_nats
()
{
let
mut
c
=
component
.clone
();
// Start in the background to isolate the async, and because we don't need it yet
component
.drt
()
.runtime
()
.secondary
()
.spawn
(
async
move
{
if
let
Err
(
err
)
=
c
.add_stats_service
()
.await
{
tracing
::
error!
(
error
=
%
err
,
component
=
c
.service_name
(),
"Failed starting stats service"
);
}
});
}
Ok
(
component
)
}
}
#[derive(Debug,
Clone)]
...
...
@@ -652,10 +661,10 @@ impl Namespace {
/// Create a [`Component`] in the namespace whose endpoints can be discovered with etcd
pub
fn
component
(
&
self
,
name
:
impl
Into
<
String
>
)
->
anyhow
::
Result
<
Component
>
{
Ok
(
ComponentBuilder
::
from_runtime
(
self
.runtime
.clone
())
ComponentBuilder
::
from_runtime
(
self
.runtime
.clone
())
.name
(
name
)
.namespace
(
self
.clone
())
.build
()
?
)
.build
()
}
/// Create a [`Namespace`] in the parent namespace
...
...
@@ -690,106 +699,3 @@ fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
Err
(
ValidationError
::
new
(
"invalid_characters"
))
}
}
// TODO - enable restrictions to the character sets allowed for namespaces,
// components, and endpoints.
//
// Put Validate traits on the struct and use the `validate_allowed_chars` method
// to validate the fields.
// #[cfg(test)]
// mod tests {
// use super::*;
// use validator::Validate;
// #[test]
// fn test_valid_names() {
// // Valid strings
// let valid_inputs = vec![
// "abc", // Lowercase letters
// "abc123", // Letters and numbers
// "a-b-c", // Letters with hyphens
// "a_b_c", // Letters with underscores
// "a-b_c-123", // Mixed valid characters
// "a", // Single character
// "a_b", // Short valid pattern
// "123456", // Only numbers
// "a---b_c123", // Repeated hyphens/underscores
// ];
// for input in valid_inputs {
// let result = validate_allowed_chars(input);
// assert!(result.is_ok(), "Expected '{}' to be valid", input);
// }
// }
// #[test]
// fn test_invalid_names() {
// // Invalid strings
// let invalid_inputs = vec![
// "abc!", // Invalid character `!`
// "abc@", // Invalid character `@`
// "123$", // Invalid character `$`
// "foo.bar", // Invalid character `.`
// "foo/bar", // Invalid character `/`
// "foo\\bar", // Invalid character `\`
// "abc#", // Invalid character `#`
// "abc def", // Spaces are not allowed
// "foo,", // Invalid character `,`
// "", // Empty string
// ];
// for input in invalid_inputs {
// let result = validate_allowed_chars(input);
// assert!(result.is_err(), "Expected '{}' to be invalid", input);
// }
// }
// // #[test]
// // fn test_struct_validation_valid() {
// // // Struct with valid data
// // let valid_data = InputData {
// // name: "valid-name_123".to_string(),
// // };
// // assert!(valid_data.validate().is_ok());
// // }
// // #[test]
// // fn test_struct_validation_invalid() {
// // // Struct with invalid data
// // let invalid_data = InputData {
// // name: "invalid!name".to_string(),
// // };
// // let result = invalid_data.validate();
// // assert!(result.is_err());
// // if let Err(errors) = result {
// // let error_map = errors.field_errors();
// // assert!(error_map.contains_key("name"));
// // let name_errors = &error_map["name"];
// // assert_eq!(name_errors[0].code, "invalid_characters");
// // }
// // }
// #[test]
// fn test_edge_cases() {
// // Edge cases
// let edge_inputs = vec![
// ("-", true), // Single hyphen
// ("_", true), // Single underscore
// ("a-", true), // Letter with hyphen
// ("-", false), // Repeated hyphens
// ("-a", false), // Hyphen at the beginning
// ("a-", false), // Hyphen at the end
// ];
// for (input, expected_validity) in edge_inputs {
// let result = validate_allowed_chars(input);
// if expected_validity {
// assert!(result.is_ok(), "Expected '{}' to be valid", input);
// } else {
// assert!(result.is_err(), "Expected '{}' to be invalid", input);
// }
// }
// }
// }
lib/runtime/src/component/endpoint.rs
View file @
f05f7629
...
...
@@ -64,7 +64,7 @@ impl EndpointConfigBuilder {
pub
async
fn
start
(
self
)
->
Result
<
()
>
{
let
(
mut
endpoint
,
endpoint
,
handler
,
stats_handler
,
metrics_labels
,
...
...
@@ -86,19 +86,6 @@ impl EndpointConfigBuilder {
// Add metrics to the handler. The endpoint provides additional information to the handler.
handler
.add_metrics
(
&
endpoint
,
metrics_labels
.as_deref
())
?
;
// Determine request plane mode
let
request_plane_mode
=
endpoint
.drt
()
.request_plane
();
if
request_plane_mode
.is_nats
()
{
// We only need the service if we want NATS metrics.
// TODO: This is called for every endpoint of a component. Ideally we only call it once
// on the component.
endpoint
.component
.add_stats_service
()
.await
?
;
}
tracing
::
info!
(
"Endpoint starting with request plane mode: {:?}"
,
request_plane_mode
);
// Insert the stats handler. depends on NATS.
if
let
Some
(
stats_handler
)
=
stats_handler
{
let
registry
=
endpoint
.drt
()
.component_registry
()
.inner
.lock
()
.await
;
...
...
@@ -123,6 +110,9 @@ impl EndpointConfigBuilder {
let
system_health
=
endpoint
.drt
()
.system_health
();
let
subject
=
endpoint
.subject_to
(
connection_id
);
let
request_plane_mode
=
endpoint
.drt
()
.request_plane
();
tracing
::
info!
(
"Endpoint starting with request plane mode: {request_plane_mode}"
,);
// Register health check target in SystemHealth if provided
if
let
Some
(
health_check_payload
)
=
&
health_check_payload
{
// Build transport based on request plane mode
...
...
lib/runtime/src/distributed.rs
View file @
f05f7629
...
...
@@ -13,7 +13,6 @@ use crate::{
discovery
::
Discovery
,
metrics
::
PrometheusUpdateCallback
,
metrics
::{
MetricsHierarchy
,
MetricsRegistry
},
service
::
ServiceClient
,
transports
::{
etcd
,
nats
,
tcp
},
};
use
crate
::{
discovery
,
system_status_server
,
transports
};
...
...
@@ -136,8 +135,6 @@ impl DistributedRuntime {
live_endpoint_path
,
)));
let
nats_client_for_metrics
=
nats_client
.clone
();
// Initialize discovery client based on backend configuration
let
discovery_backend
=
std
::
env
::
var
(
"DYN_DISCOVERY_BACKEND"
)
.unwrap_or_else
(|
_
|
"kv_store"
.to_string
());
...
...
@@ -171,6 +168,8 @@ impl DistributedRuntime {
}
};
let
nats_client_for_metrics
=
nats_client
.clone
();
let
distributed_runtime
=
Self
{
runtime
,
store
,
...
...
@@ -325,10 +324,6 @@ impl DistributedRuntime {
self
.discovery_client
.clone
()
}
/// Returns a [`ServiceClient`] wrapping a clone of the runtime's NATS client, or
/// `None` when `nats_client()` yields no client.
pub(crate) fn service_client(&self) -> Option<ServiceClient> {
    let nats = self.nats_client()?;
    Some(ServiceClient::new(nats.clone()))
}
pub
async
fn
tcp_server
(
&
self
)
->
Result
<
Arc
<
tcp
::
server
::
TcpStreamServer
>>
{
Ok
(
self
.tcp_server
...
...
@@ -380,35 +375,7 @@ impl DistributedRuntime {
manager
.server
()
.await
}
/// DEPRECATED: Use network_manager().server() instead
///
/// Kept only for backward compatibility. It resolves the request plane server
/// (propagating any error from that lookup) and then always returns an error
/// pointing callers at `request_plane_server()`.
#[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
pub async fn http_server(
    &self,
) -> Result<Arc<crate::pipeline::network::ingress::http_endpoint::SharedHttpServer>> {
    // The lookup is still performed so its failures surface first; no downcast
    // to the HTTP server type is attempted.
    self.request_plane_server().await?;

    Err(anyhow::anyhow!(
        "http_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
    ))
}
/// DEPRECATED: Use network_manager().server() instead
///
/// Kept only for backward compatibility. It resolves the request plane server
/// (propagating any error from that lookup) and then always returns an error
/// pointing callers at `request_plane_server()`.
#[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
pub async fn shared_tcp_server(
    &self,
) -> Result<Arc<crate::pipeline::network::ingress::shared_tcp_endpoint::SharedTcpServer>> {
    // The lookup is still performed so its failures surface first; no downcast
    // to the TCP server type is attempted.
    self.request_plane_server().await?;

    Err(anyhow::anyhow!(
        "shared_tcp_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
    ))
}
pub
fn
nats_client
(
&
self
)
->
Option
<&
nats
::
Client
>
{
pub
(
crate
)
fn
nats_client
(
&
self
)
->
Option
<&
nats
::
Client
>
{
self
.nats_client
.as_ref
()
}
...
...
lib/runtime/src/metrics.rs
View file @
f05f7629
...
...
@@ -1553,10 +1553,7 @@ mod test_metricsregistry_nats {
// Create a namespace and component from the DRT
let
namespace
=
drt
.namespace
(
"ns789"
)
.unwrap
();
let
mut
component
=
namespace
.component
(
"comp789"
)
.unwrap
();
// Create a service to trigger metrics callback registration
component
.add_stats_service
()
.await
.unwrap
();
let
component
=
namespace
.component
(
"comp789"
)
.unwrap
();
// Get component output which should include NATS client metrics
// Additional checks for NATS client metrics (without checking specific values)
...
...
@@ -1662,11 +1659,10 @@ mod test_metricsregistry_nats {
let
runtime
=
Runtime
::
from_current
()
?
;
let
drt
=
DistributedRuntime
::
from_settings
(
runtime
.clone
())
.await
?
;
let
namespace
=
drt
.namespace
(
"ns123"
)
.unwrap
();
let
mut
component
=
namespace
.component
(
"comp123"
)
.unwrap
();
let
component
=
namespace
.component
(
"comp123"
)
.unwrap
();
let
ingress
=
Ingress
::
for_engine
(
MessageHandler
::
new
())
.unwrap
();
let
_
backend_handle
=
tokio
::
spawn
(
async
move
{
component
.add_stats_service
()
.await
.unwrap
();
let
endpoint
=
component
.endpoint
(
"echo"
)
.endpoint_builder
()
...
...
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment