Commit 1ce7ba03 authored by Ryan McCormick's avatar Ryan McCormick Committed by GitHub
Browse files

feat: Enhance mock worker with mock KvHitRate events (#50)

parent 9f53922a
......@@ -2199,6 +2199,7 @@ dependencies = [
name = "metrics"
version = "0.1.0"
dependencies = [
"async-nats",
"axum 0.6.20",
"clap",
"dynemo-llm",
......
......@@ -28,6 +28,7 @@ dynemo-llm = { path = "../../lib/llm" }
# workspace - todo
# crates.io
async-nats = { version = "0.38", features = ["service"] }
clap = { version = "4.5", features = ["derive", "env"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
......
......@@ -2,11 +2,14 @@
## Quickstart
To start `metrics`, simply point it at the namespace/component/endpoint trio that
you're interested in observing metrics from. This will scrape statistics from
the services associated with that endpoint, do some postprocessing on them,
and then publish an event with the postprocessed data.
To start the `metrics` component, simply point it at the `namespace/component/endpoint` trio that
you're interested in observing metrics from.
This will:
1. Scrape statistics from the services associated with that `endpoint`, do some postprocessing, and aggregate them.
2. Listen for `KvHitRateEvent`s on `namespace/kv-hit-rate`, and aggregate them.
For example:
```bash
# For more details, try DYN_LOG=debug
DYN_LOG=info cargo run --bin metrics -- --namespace dynemo --component backend --endpoint generate
......@@ -16,24 +19,16 @@ DYN_LOG=info cargo run --bin metrics -- --namespace dynemo --component backend -
# ...
```
With no matching endpoints running, you should see warnings in the logs:
With no matching endpoints running to collect stats from, you should see warnings in the logs:
```bash
2025-02-26T18:45:06.474161Z WARN metrics: No endpoints found matching subject dynemo_backend_720278f8.generate
```
To see metrics published to a matching endpoint, you can use the
[mock_worker example](src/bin/mock_worker.rs) in this directory to launch
1 or more workers that publish LLM Metrics:
```bash
# Can run multiple workers in separate shells
cargo run --bin mock_worker
```
After a matching endpoint gets started, you should see the warnings go away
since the endpoint will automatically get discovered.
After a matching endpoint gets started, you should see the warnings stop
when the endpoint gets automatically discovered.
When stats are found from target endpoints, the metrics component will
aggregate and publish metrics as both events and as updates to a prometheus server:
aggregate them and publish them to a Prometheus server running on `localhost:9091/metrics` by default:
```
2025-02-28T04:05:58.077901Z INFO metrics: Aggregated metrics: ProcessedEndpoints { endpoints: [Endpoint { name: "worker-7587884888253033398", subject: "dynemo_backend_720278f8.generate-694d951a80e06bb6", data: ForwardPassMetrics { request_active_slots: 58, request_total_slots: 100, kv_active_blocks: 77, kv_total_blocks: 100 } }, Endpoint { name: "worker-7587884888253033401", subject: "dynemo_backend_720278f8.generate-694d951a80e06bb9", data: ForwardPassMetrics { request_active_slots: 71, request_total_slots: 100, kv_active_blocks: 29, kv_total_blocks: 100 } }], worker_ids: [7587884888253033398, 7587884888253033401], load_avg: 53.0, load_std: 24.0 }
```
......@@ -51,3 +46,20 @@ curl localhost:9091/metrics
# llm_kv_blocks_total{component="backend",endpoint="generate",worker_id="7587884888253033398"} 100
# llm_kv_blocks_total{component="backend",endpoint="generate",worker_id="7587884888253033401"} 100
```
## Mock Worker
For convenience and debugging, there is a mock worker that registers a mock `StatsHandler`
with the `endpoint` and publishes mock `KvHitRateEvent`s on `namespace/kv-hit-rate`.
```bash
# Can run multiple workers in separate shells to see aggregation as well.
DYN_LOG=info cargo run --bin mock_worker
```
**NOTE**: When using the mock worker, the data from the stats handler and the
events will be random and shouldn't be expected to correlate with each other.
## Real Worker
See the KV Routing example in `examples/python_rs/llm/vllm`.
......@@ -13,18 +13,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use dynemo_llm::kv_router::protocols::ForwardPassMetrics;
use async_nats::service::endpoint::Stats;
use dynemo_llm::kv_router::{
protocols::ForwardPassMetrics, scheduler::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
};
use dynemo_runtime::{
component::Namespace,
logging,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
},
protocols::annotated::Annotated,
stream, DistributedRuntime, Result, Runtime, Worker,
stream,
traits::events::EventPublisher,
DistributedRuntime, Result, Runtime, Worker,
};
use rand::Rng;
use std::sync::Arc;
use tokio::time::{interval, Duration};
fn main() -> Result<()> {
logging::init();
......@@ -37,16 +44,16 @@ async fn app(runtime: Runtime) -> Result<()> {
backend(distributed).await
}
struct RequestHandler {}
struct MockRequestHandler {}
impl RequestHandler {
impl MockRequestHandler {
fn new() -> Arc<Self> {
Arc::new(Self {})
}
}
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MockRequestHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
let (data, ctx) = input.into_parts();
......@@ -61,15 +68,75 @@ impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for Reques
}
}
/// Publishes a mock `KVHitRateEvent` on `KV_HIT_RATE_SUBJECT` once per second, forever.
///
/// This function never returns; callers are expected to run it inside a
/// spawned task. A single random worker ID (1..=1000) is chosen when the
/// function starts and is reused for every event it publishes.
///
/// Publish failures are logged as warnings and do not stop the loop.
async fn mock_event_publisher(namespace: Namespace) {
    // NOTE: These events are just for testing, and shouldn't be interpreted
    // in correlation with the stats handler's data:
    // 1. The worker ID associated with the events here won't match the
    //    worker ID of the endpoint's service stats handler.
    // 2. These events aren't coming through the KV Router, so the metrics won't
    //    be reflective of the KV Router's performance.
    // 3. The data in these events isn't in sync with the stats handler's
    //    ForwardPassMetrics data, so they may not correlate well.
    let worker_id = rand::thread_rng().gen_range(1..=1000);

    let mut interval = interval(Duration::from_secs(1));
    loop {
        interval.tick().await;

        // Fetch a fresh thread_rng handle for each draw instead of keeping one
        // in a local: no RNG handle is then alive across the `.await` points.
        let isl_blocks = rand::thread_rng().gen_range(0..=100);
        // The overlap can never exceed the number of input blocks.
        let overlap_blocks = rand::thread_rng().gen_range(0..=isl_blocks);

        let event = KVHitRateEvent {
            worker_id,
            isl_blocks,
            overlap_blocks,
        };

        if let Err(e) = namespace.publish(KV_HIT_RATE_SUBJECT, &event).await {
            tracing::warn!("Failed to publish KV hit rate event: {e}");
        } else {
            // `isl_blocks` may legitimately be 0; report a 0% hit rate in that
            // case rather than logging the NaN produced by 0.0 / 0.0.
            let hit_rate = if isl_blocks == 0 {
                0.0
            } else {
                (overlap_blocks as f64 / isl_blocks as f64) * 100.0
            };
            tracing::info!(
                "Published KV hit rate event: worker_id={worker_id}, isl_blocks={isl_blocks}, overlap_blocks={overlap_blocks}, hit_rate={hit_rate:.2}%"
            );
        }
    }
}
/// Stats handler for the mock worker.
///
/// Prints the incoming service stats, builds a `ForwardPassMetrics` with fixed
/// totals of 100 and randomly chosen active counts in `0..=total`, prints it,
/// and returns it serialized as JSON.
fn mock_stats_handler(_stats: Stats) -> serde_json::Value {
    println!("stats in: {:?}", _stats);

    // Capacities are fixed; only the "active" values vary between calls.
    let (request_total_slots, kv_total_blocks) = (100, 100);

    let mut rng = rand::thread_rng();
    let request_active_slots = rng.gen_range(0..=request_total_slots);
    let kv_active_blocks = rng.gen_range(0..=kv_total_blocks);

    let metrics = ForwardPassMetrics {
        request_active_slots,
        request_total_slots,
        kv_active_blocks,
        kv_total_blocks,
    };
    println!("stats out: {:?}", metrics);

    serde_json::to_value(metrics).unwrap()
}
async fn backend(runtime: DistributedRuntime) -> Result<()> {
let namespace = runtime.namespace("dynemo")?;
// Spawn background task for publishing KV hit rate events
let namespace_clone = namespace.clone();
tokio::spawn(async move {
mock_event_publisher(namespace_clone).await;
});
// attach an ingress to an engine
let ingress = Ingress::for_engine(RequestHandler::new())?;
let ingress = Ingress::for_engine(MockRequestHandler::new())?;
// make the ingress discoverable via a component service
// we must first create a service, then we can attach one or more endpoints
runtime
.namespace("dynemo")?
namespace
.component("backend")?
.service_builder()
.create()
......@@ -77,21 +144,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
.endpoint("generate")
.endpoint_builder()
// Dummy stats handler to demonstrate how to attach a custom stats handler
.stats_handler(|_stats| {
println!("stats in: {:?}", _stats);
let request_total_slots = 100;
let request_active_slots = rand::thread_rng().gen_range(0..request_total_slots);
let kv_total_blocks = 100;
let kv_active_blocks = rand::thread_rng().gen_range(0..kv_total_blocks);
let stats = ForwardPassMetrics {
request_active_slots,
request_total_slots,
kv_active_blocks,
kv_total_blocks,
};
println!("stats out: {:?}", stats);
serde_json::to_value(stats).unwrap()
})
.stats_handler(mock_stats_handler)
.handler(ingress)
.start()
.await
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment