Unverified Commit 3bc129dc authored by William Arnold's avatar William Arnold Committed by GitHub
Browse files

feat: Add python-configurable engine routes for sglang profiling (#4617)


Signed-off-by: default avatarWilliam Arnold <7565007+Aphoh@users.noreply.github.com>
parent 76f4bc83
...@@ -123,6 +123,23 @@ async def init(runtime: DistributedRuntime, config: Config): ...@@ -123,6 +123,23 @@ async def init(runtime: DistributedRuntime, config: Config):
await _handle_non_leader_node(engine, generate_endpoint) await _handle_non_leader_node(engine, generate_endpoint)
return return
# Register engine routes for profiling.
# Each handler below is registered under a route name and is served as
# /engine/<route_name>; it receives the parsed JSON request body as `body`
# and returns a dict that becomes the JSON response.
async def start_profile_handler(body: dict) -> dict:
"""Handle /engine/start_profile requests"""
# The request body is forwarded unchanged as keyword arguments, so the
# accepted keys are whatever tokenizer_manager.start_profile() accepts.
await engine.tokenizer_manager.start_profile(**body)
return {"status": "ok", "message": "Profiling started"}
async def stop_profile_handler(body: dict) -> dict:
"""Handle /engine/stop_profile requests"""
# `body` is accepted to match the callback signature but is not used.
await engine.tokenizer_manager.stop_profile()
return {"status": "ok", "message": "Profiling stopped"}
runtime.register_engine_route("start_profile", start_profile_handler)
runtime.register_engine_route("stop_profile", stop_profile_handler)
logging.info(
"Registered engine routes: /engine/start_profile, /engine/stop_profile"
)
prefill_client = None prefill_client = None
prefill_router_client = None prefill_router_client = None
if config.serving_mode == DisaggregationMode.DECODE: if config.serving_mode == DisaggregationMode.DECODE:
...@@ -225,6 +242,23 @@ async def init_prefill(runtime: DistributedRuntime, config: Config): ...@@ -225,6 +242,23 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
await _handle_non_leader_node(engine, generate_endpoint) await _handle_non_leader_node(engine, generate_endpoint)
return return
# Register engine routes for profiling.
# Each handler below is registered under a route name and is served as
# /engine/<route_name>; it receives the parsed JSON request body as `body`
# and returns a dict that becomes the JSON response.
async def start_profile_handler(body: dict) -> dict:
"""Handle /engine/start_profile requests"""
# The request body is forwarded unchanged as keyword arguments, so the
# accepted keys are whatever tokenizer_manager.start_profile() accepts.
await engine.tokenizer_manager.start_profile(**body)
return {"status": "ok", "message": "Profiling started"}
async def stop_profile_handler(body: dict) -> dict:
"""Handle /engine/stop_profile requests"""
# `body` is accepted to match the callback signature but is not used.
await engine.tokenizer_manager.stop_profile()
return {"status": "ok", "message": "Profiling stopped"}
runtime.register_engine_route("start_profile", start_profile_handler)
runtime.register_engine_route("stop_profile", stop_profile_handler)
logging.info(
"Registered engine routes: /engine/start_profile, /engine/stop_profile"
)
# Perform dummy warmup for prefill worker to avoid initial TTFT hit # Perform dummy warmup for prefill worker to avoid initial TTFT hit
# Only needed on leader node that handles requests # Only needed on leader node that handles requests
await _warmup_prefill_engine(engine, server_args) await _warmup_prefill_engine(engine, server_args)
......
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
# Profiling SGLang Workers in Dynamo
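Dynamo exposes profiling endpoints for SGLang workers via the system server's `/engine/*` routes. This allows you to start and stop PyTorch profiling on running inference workers without restarting them. The examples below assume the worker's system server is listening on port 9090 (for example, by launching the worker with `DYN_SYSTEM_PORT=9090`).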
These endpoints wrap SGLang's internal `TokenizerManager.start_profile()` and `stop_profile()` methods. See SGLang's documentation for the full list of supported parameters.
## Quick Start
1. **Start profiling:**
```bash
curl -X POST http://localhost:9090/engine/start_profile \
-H "Content-Type: application/json" \
-d '{"output_dir": "/tmp/profiler_output"}'
```
2. **Run some inference requests to generate profiling data**
3. **Stop profiling:**
```bash
curl -X POST http://localhost:9090/engine/stop_profile
```
4. **View the traces:**
The profiler outputs Chrome trace files in the specified `output_dir`. You can view them using:
- Chrome's `chrome://tracing`
- [Perfetto UI](https://ui.perfetto.dev/)
- TensorBoard with the PyTorch Profiler plugin
## Test Script
A test script is provided at [`examples/backends/sglang/test_sglang_profile.py`](../../../examples/backends/sglang/test_sglang_profile.py) that demonstrates the full profiling workflow:
```bash
python examples/backends/sglang/test_sglang_profile.py
```
...@@ -61,6 +61,7 @@ ...@@ -61,6 +61,7 @@
backends/sglang/expert-distribution-eplb.md backends/sglang/expert-distribution-eplb.md
backends/sglang/gpt-oss.md backends/sglang/gpt-oss.md
backends/sglang/multimodal_epd.md backends/sglang/multimodal_epd.md
backends/sglang/profiling.md
backends/sglang/sgl-hicache-example.md backends/sglang/sgl-hicache-example.md
backends/sglang/sglang-disaggregation.md backends/sglang/sglang-disaggregation.md
backends/sglang/prometheus.md backends/sglang/prometheus.md
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Test script for /engine/start_profile and /engine/stop_profile routes.
This script demonstrates the new custom engine route registration feature.
It starts a simple sglang server with dynamo and tests the profiling endpoints.
Usage:
python test_sglang_profile.py
"""
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
import requests
# Configuration
# Model passed to the backend via --model-path and used in completion requests
MODEL = "Qwen/Qwen3-0.6B" # Small model for quick testing
# Host used for every health check and request issued by this script
HOST = "127.0.0.1"
# Frontend HTTP port (passed to dynamo.frontend as --http-port)
PORT = 30000
# Backend system server port (exported to the backend as DYN_SYSTEM_PORT)
SYSTEM_PORT = 9090
# Trace output directory (exported as SGLANG_TORCH_PROFILER_DIR and sent as
# "output_dir" in the start_profile request); wiped at the start of each run
PROFILER_OUTPUT_DIR = "/tmp/dynamo_profiler_test"
def cleanup_output_dir():
    """Reset the profiler output directory to an empty, existing directory."""
    import shutil

    output_dir = Path(PROFILER_OUTPUT_DIR)
    # Drop any traces left behind by a previous run before recreating the dir
    if output_dir.exists():
        shutil.rmtree(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
def start_frontend():
    """Start the Dynamo frontend (HTTP server) and wait until it is healthy.

    Launches ``dynamo.frontend`` with the interpreter running this script and
    polls its ``/health`` endpoint for up to 30 seconds.

    Returns:
        The ``subprocess.Popen`` handle of the running frontend.

    Raises:
        SystemExit: (exit code 1) if the process dies or does not become
            healthy in time. The child is never left running when this
            function does not return normally.
    """
    print("\nStarting Dynamo frontend...")
    print(f" - Frontend HTTP: http://{HOST}:{PORT}")
    cmd = [
        # Use the current interpreter so the child runs in the same environment
        # even when no bare `python` executable is on PATH.
        sys.executable,
        "-m",
        "dynamo.frontend",
        "--http-port",
        str(PORT),
    ]
    print(f"Command: {' '.join(cmd)}")
    print("(Output will appear below)\n")
    process = subprocess.Popen(cmd)
    try:
        # Wait for frontend to be ready. A monotonic clock keeps the timeout
        # correct even if the wall clock is adjusted while we wait.
        max_wait = 30
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            try:
                # Check /health endpoint first
                response = requests.get(f"http://{HOST}:{PORT}/health", timeout=1)
                if response.status_code == 200:
                    print("✓ Frontend is ready!")
                    return process
            except requests.exceptions.RequestException:
                # Not accepting connections yet; keep polling
                pass
            if process.poll() is not None:
                print("✗ Frontend process died!")
                sys.exit(1)
            time.sleep(1)
        print("✗ Frontend failed to start in time!")
        sys.exit(1)
    except BaseException:
        # The caller only receives the handle on success, so any failure here
        # (timeout, Ctrl-C, unexpected error) must stop and reap the child.
        if process.poll() is None:
            process.kill()
            process.wait()
        raise
def start_sglang_backend():
    """Start the sglang backend (inference engine) and wait until it is healthy.

    Launches ``dynamo.sglang`` with the interpreter running this script,
    passing the profiler directory and system server port through the
    environment, then polls the system server ``/health`` endpoint for up to
    two minutes.

    Returns:
        The ``subprocess.Popen`` handle of the running backend.

    Raises:
        SystemExit: (exit code 1) if the process dies or does not become
            healthy in time. The child is never left running when this
            function does not return normally.
    """
    print("\nStarting SGLang backend...")
    print(f" - Model: {MODEL}")
    print(f" - System server: http://{HOST}:{SYSTEM_PORT}")
    # Set environment variables
    env = os.environ.copy()
    env["SGLANG_TORCH_PROFILER_DIR"] = PROFILER_OUTPUT_DIR
    env["DYN_SYSTEM_PORT"] = str(SYSTEM_PORT)
    cmd = [
        # Use the current interpreter so the child runs in the same environment
        # even when no bare `python` executable is on PATH.
        sys.executable,
        "-m",
        "dynamo.sglang",
        "--model-path",
        MODEL,
        "--tp",
        "1",
        "--mem-fraction-static",
        "0.8",
    ]
    print(f"Command: {' '.join(cmd)}")
    print("(Output will appear below)")
    print("\nWaiting for backend to start...\n")
    process = subprocess.Popen(cmd, env=env)
    try:
        # Wait for backend to be ready (check system server health). A
        # monotonic clock keeps the timeout correct across wall-clock changes.
        max_wait = 120  # 2 minutes
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            try:
                # Check system server health endpoint
                response = requests.get(
                    f"http://{HOST}:{SYSTEM_PORT}/health", timeout=1
                )
                if response.status_code == 200:
                    print("✓ Backend is ready!")
                    return process
            except requests.exceptions.RequestException:
                # Not accepting connections yet; keep polling
                pass
            # Check if process has died
            if process.poll() is not None:
                print("✗ Backend process died!")
                sys.exit(1)
            time.sleep(2)
        print("✗ Backend failed to start in time!")
        sys.exit(1)
    except BaseException:
        # The caller only receives the handle on success, so any failure here
        # (timeout, Ctrl-C, unexpected error) must stop and reap the child.
        if process.poll() is None:
            process.kill()
            process.wait()
        raise
def test_profiling_endpoints():
    """Test the /engine/start_profile and /engine/stop_profile endpoints.

    Exercises the backend system server routes end to end: start profiling
    with parameters, send a few completion requests through the frontend,
    stop profiling, start again with an empty body, and check that an
    unregistered route returns 404. Finally reports what was written to the
    profiler output directory.

    Raises:
        AssertionError: if an endpoint returns an unexpected status.
        requests.exceptions.RequestException: if a request fails or times out.
    """
    base_url = f"http://{HOST}:{SYSTEM_PORT}"
    # Every request gets an explicit timeout: `requests` waits forever by
    # default, which would hang the whole script on an unresponsive server.
    timeout = 120
    print("\n" + "=" * 60)
    print("Testing /engine/start_profile and /engine/stop_profile")
    print("=" * 60)
    # Test 1: Start profiling with parameters (no num_steps so we control stop manually)
    print("\n1. Starting profiling with parameters...")
    response = requests.post(
        f"{base_url}/engine/start_profile",
        json={
            "output_dir": PROFILER_OUTPUT_DIR,
            "activities": ["CPU", "GPU"],
            "with_stack": True,
            "record_shapes": True,
        },
        timeout=timeout,
    )
    print(f" Status: {response.status_code}")
    print(f" Response: {response.json()}")
    assert response.status_code == 200, f"Expected 200, got {response.status_code}"
    assert response.json()["status"] == "ok", "Expected status 'ok'"
    # Check available models
    print("\n2. Checking available models...")
    response = requests.get(f"http://{HOST}:{PORT}/v1/models", timeout=timeout)
    if response.status_code == 200:
        models = response.json()
        print(f" Available models: {models}")
    # Make a few inference requests to generate profiling data
    print("\n3. Making inference requests...")
    inference_url = f"http://{HOST}:{PORT}/v1/completions"
    for i in range(3):
        response = requests.post(
            inference_url,
            json={
                "model": MODEL,
                "prompt": f"Hello, this is test request {i+1}. ",
                "max_tokens": 10,
                "temperature": 0.8,
            },
            timeout=timeout,
        )
        print(f" Request {i+1}: {response.status_code}")
        # Inference failures are reported but do not fail the test; only the
        # /engine/* routes are under test here.
        if response.status_code != 200:
            print(f" Response: {response.text[:200]}")
        time.sleep(0.5)
    # Test 2: Stop profiling
    print("\n4. Stopping profiling...")
    response = requests.post(f"{base_url}/engine/stop_profile", timeout=timeout)
    print(f" Status: {response.status_code}")
    print(f" Response: {response.json()}")
    assert response.status_code == 200, f"Expected 200, got {response.status_code}"
    assert response.json()["status"] == "ok", "Expected status 'ok'"
    # Test 3: Test with empty body (GET-like POST)
    print("\n5. Starting profiling with empty body...")
    response = requests.post(f"{base_url}/engine/start_profile", timeout=timeout)
    print(f" Status: {response.status_code}")
    print(f" Response: {response.json()}")
    assert response.status_code == 200, f"Expected 200, got {response.status_code}"
    # Test 4: Test invalid route
    print("\n6. Testing invalid route...")
    response = requests.post(f"{base_url}/engine/nonexistent_route", timeout=timeout)
    print(f" Status: {response.status_code}")
    print(f" Response: {response.json()}")
    assert response.status_code == 404, f"Expected 404, got {response.status_code}"
    # Stop profiling again so the session started in step 5 is not left
    # running. This is best-effort cleanup: report a failure, do not assert.
    response = requests.post(f"{base_url}/engine/stop_profile", timeout=timeout)
    if response.status_code != 200:
        print(f"⚠ Final stop_profile returned {response.status_code}")
    print("\n" + "=" * 60)
    print("✓ All tests passed!")
    print("=" * 60)
    # Check if profiling files were created
    print(f"\nChecking profiler output directory: {PROFILER_OUTPUT_DIR}")
    if os.path.exists(PROFILER_OUTPUT_DIR):
        files = list(Path(PROFILER_OUTPUT_DIR).rglob("*"))
        if files:
            print(f"✓ Found {len(files)} files in output directory")
            for f in files[:5]:  # Show first 5 files
                print(f" - {f}")
        else:
            print("⚠ No files found (profiling may not have run long enough)")
    else:
        print("⚠ Output directory not created")
def _stop_process(process, name):
    """Stop a child process: SIGTERM first, SIGKILL if it does not exit in 10s."""
    if process is None:
        return
    print(f" Stopping {name}...")
    process.send_signal(signal.SIGTERM)
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        print(f" Force killing {name}...")
        process.kill()
        # Reap the killed child so it does not linger as a zombie
        process.wait()


def main():
    """Main test function.

    Starts the frontend and the backend, runs the profiling endpoint tests,
    and always shuts both servers down afterwards (backend first). Exits with
    status 1 if the tests raise an exception.
    """
    frontend_process = None
    backend_process = None
    try:
        # Clean up output directory
        cleanup_output_dir()
        # Start frontend first
        frontend_process = start_frontend()
        # Start backend
        backend_process = start_sglang_backend()
        # Run tests
        print("\n" + "=" * 60)
        print("Both frontend and backend are ready!")
        print("=" * 60)
        time.sleep(2)  # Give everything a moment to fully settle
        test_profiling_endpoints()
        print("\n✓ Test completed successfully!")
    except KeyboardInterrupt:
        print("\n⚠ Interrupted by user")
    except Exception as e:
        print(f"\n✗ Test failed: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
    finally:
        # Cleanup runs on every exit path, including the sys.exit() above
        print("\nShutting down servers...")
        _stop_process(backend_process, "backend")
        _stop_process(frontend_process, "frontend")
        print("✓ Servers stopped")


if __name__ == "__main__":
    main()
...@@ -624,6 +624,84 @@ impl DistributedRuntime { ...@@ -624,6 +624,84 @@ impl DistributedRuntime {
CancellationToken { inner } CancellationToken { inner }
} }
/// Register an async Python callback for /engine/{route_name}
///
/// The callback is wrapped in a Rust closure and stored in the runtime's
/// engine route registry under `route_name`.
///
/// Args:
/// route_name: Route path (e.g., "start_profile" → /engine/start_profile)
/// callback: Async function with signature: async def(body: dict) -> dict
///
/// Raises:
/// An exception if the current task locals (Python event loop) cannot be
/// obtained at registration time.
///
/// Failures while serving a request (converting the body, calling the
/// callback, awaiting the coroutine, converting the result) are not raised
/// here; the wrapped callback returns them as errors to its caller.
///
/// Example:
/// ```python
/// async def start_profile(body: dict) -> dict:
/// await engine.start_profile(**body)
/// return {"status": "ok"}
///
/// runtime.register_engine_route("start_profile", start_profile)
/// ```
#[pyo3(signature = (route_name, callback))]
fn register_engine_route(
&self,
py: Python<'_>,
route_name: String,
callback: PyObject,
) -> PyResult<()> {
// Capture TaskLocals at registration time when Python's event loop is running.
// This is needed because later, when the callback is invoked from an HTTP request,
// we'll be on a Rust thread without a running Python event loop.
let locals =
Arc::new(pyo3_async_runtimes::tokio::get_current_locals(py).map_err(to_pyerr)?);
// Shared via Arc so every invocation can move its own handle into its future
let callback = Arc::new(callback);
// Wrap Python async callback in Rust async closure
let rust_callback: rs::engine_routes::EngineRouteCallback =
Arc::new(move |body: serde_json::Value| {
// Per-call clones: the returned future must own everything it uses
let callback = callback.clone();
let locals = locals.clone();
// Return a boxed future
Box::pin(async move {
// Acquire GIL to call Python callback and convert coroutine to future
let py_future = Python::with_gil(|py| {
// Convert body to Python dict
let py_body = pythonize::pythonize(py, &body).map_err(|e| {
anyhow::anyhow!("Failed to convert request body to Python: {}", e)
})?;
// Call Python async function to get a coroutine
let coroutine = callback.call1(py, (py_body,)).map_err(|e| {
anyhow::anyhow!("Failed to call Python callback: {}", e)
})?;
// Use the TaskLocals captured at registration time
pyo3_async_runtimes::into_future_with_locals(
&locals,
coroutine.into_bound(py),
)
.map_err(|e| {
anyhow::anyhow!("Failed to convert coroutine to future: {}", e)
})
})?;
// Await the Python coroutine (GIL is released during await)
let py_result = py_future
.await
.map_err(|e| anyhow::anyhow!("Python callback failed: {}", e))?;
// Convert result back to serde_json::Value
// (the GIL is re-acquired to read the Python object)
Python::with_gil(|py| {
pythonize::depythonize::<serde_json::Value>(py_result.bind(py))
.map_err(|e| anyhow::anyhow!("Failed to serialize response: {}", e))
})
})
});
self.inner
.engine_routes()
.register(&route_name, rust_callback);
tracing::debug!("Registered engine route: /engine/{}", route_name);
Ok(())
}
// This is used to pass the DistributedRuntime from the dynamo-runtime bindings // This is used to pass the DistributedRuntime from the dynamo-runtime bindings
// to the KVBM bindings, since KVBM cannot directly use the struct from this cdylib. // to the KVBM bindings, since KVBM cannot directly use the struct from this cdylib.
// TODO: Create a separate crate "dynamo-python" so that all binding crates can import // TODO: Create a separate crate "dynamo-python" so that all binding crates can import
......
...@@ -5,6 +5,7 @@ from typing import ( ...@@ -5,6 +5,7 @@ from typing import (
Any, Any,
AsyncGenerator, AsyncGenerator,
AsyncIterator, AsyncIterator,
Awaitable,
Callable, Callable,
Dict, Dict,
List, List,
...@@ -57,6 +58,32 @@ class DistributedRuntime: ...@@ -57,6 +58,32 @@ class DistributedRuntime:
""" """
... ...
def register_engine_route(
self,
route_name: str,
callback: Callable[[dict], Awaitable[dict]],
) -> None:
"""
Register an async callback for /engine/{route_name} on the system status server.
Args:
route_name: The route path (e.g., "start_profile" creates /engine/start_profile)
callback: Async function with signature: async def(body: dict) -> dict
Example:
async def start_profile(body: dict) -> dict:
await engine.start_profile(**body)
return {"status": "ok", "message": "Profiling started"}
runtime.register_engine_route("start_profile", start_profile)
The callback receives the JSON request body as a dict and should return
a dict that will be serialized as the JSON response.
For GET requests or empty bodies, an empty dict {} is passed.
A non-empty body that is not valid JSON is rejected with 400 before the
callback is called. An exception raised by the callback is reported to the
client as a 500 response.
"""
...
class CancellationToken: class CancellationToken:
def cancel(self) -> None: def cancel(self) -> None:
""" """
......
...@@ -74,6 +74,9 @@ pub struct DistributedRuntime { ...@@ -74,6 +74,9 @@ pub struct DistributedRuntime {
// This hierarchy's own metrics registry // This hierarchy's own metrics registry
metrics_registry: MetricsRegistry, metrics_registry: MetricsRegistry,
// Registry for /engine/* route callbacks
engine_routes: crate::engine_routes::EngineRouteRegistry,
} }
impl MetricsHierarchy for DistributedRuntime { impl MetricsHierarchy for DistributedRuntime {
...@@ -197,6 +200,7 @@ impl DistributedRuntime { ...@@ -197,6 +200,7 @@ impl DistributedRuntime {
system_health, system_health,
request_plane, request_plane,
local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(), local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
}; };
// Initialize the uptime gauge in SystemHealth // Initialize the uptime gauge in SystemHealth
...@@ -307,6 +311,11 @@ impl DistributedRuntime { ...@@ -307,6 +311,11 @@ impl DistributedRuntime {
&self.local_endpoint_registry &self.local_endpoint_registry
} }
/// Get the engine route registry for registering custom /engine/* routes
///
/// Returns a reference to the registry held by this runtime; the same
/// registry is used both to register callbacks and to look them up when a
/// request arrives.
pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
&self.engine_routes
}
pub fn connection_id(&self) -> u64 { pub fn connection_id(&self) -> u64 {
self.discovery_client.instance_id() self.discovery_client.instance_id()
} }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
/// Callback type for engine routes (async)
///
/// Takes the request's JSON body and returns a boxed future that resolves to
/// the JSON response, or to an error if the handler failed.
///
/// The callback is `Send + Sync` and its future is `Send`, so one callback can
/// be shared through an `Arc` and invoked from any thread.
pub type EngineRouteCallback = Arc<
dyn Fn(
serde_json::Value,
) -> Pin<Box<dyn Future<Output = anyhow::Result<serde_json::Value>> + Send>>
+ Send
+ Sync,
>;
/// Registry for engine route callbacks
///
/// This registry stores callbacks that handle requests to `/engine/*` routes.
/// Routes are registered from Python via `runtime.register_engine_route()`.
///
/// Cloning is cheap: clones share the same underlying route table, so a route
/// registered through one handle is visible through all of them.
#[derive(Clone, Default)]
pub struct EngineRouteRegistry {
    routes: Arc<RwLock<HashMap<String, EngineRouteCallback>>>,
}

impl EngineRouteRegistry {
    /// Create a new empty registry
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a callback for a route (e.g., "start_profile" for /engine/start_profile)
    ///
    /// Registering a route name that already exists replaces the previous
    /// callback; a warning is logged so the replacement does not go unnoticed.
    pub fn register(&self, route: &str, callback: EngineRouteCallback) {
        // A poisoned lock is recovered rather than propagated as a panic: the
        // table is only mutated by a single `insert`, so it cannot be observed
        // half-updated. The guard is a temporary dropped at the end of this
        // statement, so logging happens outside the lock.
        let previous = self
            .routes
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(route.to_string(), callback);
        if previous.is_some() {
            tracing::warn!("Replaced existing engine route: /engine/{}", route);
        }
        tracing::debug!("Registered engine route: /engine/{}", route);
    }

    /// Get callback for a route
    ///
    /// Returns `None` when no callback has been registered under `route`.
    pub fn get(&self, route: &str) -> Option<EngineRouteCallback> {
        let routes = self
            .routes
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        routes.get(route).cloned()
    }

    /// List all registered routes
    ///
    /// The names are returned in unspecified order.
    pub fn routes(&self) -> Vec<String> {
        let routes = self
            .routes
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        routes.keys().cloned().collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered route is visible through both `get` and `routes`.
    #[tokio::test]
    async fn test_registry_basic() {
        let registry = EngineRouteRegistry::new();

        // Echo handler: wraps whatever it receives
        let echo: EngineRouteCallback =
            Arc::new(|payload| Box::pin(async move { Ok(serde_json::json!({"echo": payload})) }));
        registry.register("test", echo);

        // Only the registered name resolves
        assert!(registry.get("nonexistent").is_none());
        assert!(registry.get("test").is_some());

        // The listing holds exactly that one name
        let names = registry.routes();
        assert_eq!(names, vec!["test".to_string()]);
    }

    /// A callback fetched from the registry can be invoked and awaited.
    #[tokio::test]
    async fn test_callback_execution() {
        let registry = EngineRouteRegistry::new();

        let handler: EngineRouteCallback = Arc::new(|payload| {
            Box::pin(async move {
                let input = match payload.get("input").and_then(|value| value.as_str()) {
                    Some(text) => text,
                    None => "",
                };
                Ok(serde_json::json!({
                    "output": format!("processed: {}", input)
                }))
            })
        });
        registry.register("process", handler);

        // Look the handler up again and run it
        let fetched = registry.get("process").unwrap();
        let request = serde_json::json!({"input": "test"});
        let response = fetched(request).await.unwrap();
        assert_eq!(response["output"], "processed: test");
    }

    /// Clones of a registry share one route table.
    #[tokio::test]
    async fn test_clone_shares_routes() {
        let original = EngineRouteRegistry::new();

        let first: EngineRouteCallback =
            Arc::new(|_| Box::pin(async { Ok(serde_json::json!({"ok": true})) }));
        original.register("test", first);

        let copy = original.clone();

        // A route registered before cloning is visible through both handles
        assert!(original.get("test").is_some());
        assert!(copy.get("test").is_some());

        // A route registered through the clone...
        let second: EngineRouteCallback =
            Arc::new(|_| Box::pin(async { Ok(serde_json::json!({"ok": false})) }));
        copy.register("test2", second);

        // ...shows up on the original too (they share the Arc)
        assert!(original.get("test2").is_some());
    }
}
...@@ -24,6 +24,7 @@ pub mod component; ...@@ -24,6 +24,7 @@ pub mod component;
pub mod compute; pub mod compute;
pub mod discovery; pub mod discovery;
pub mod engine; pub mod engine;
pub mod engine_routes;
pub mod health_check; pub mod health_check;
pub mod local_endpoint_registry; pub mod local_endpoint_registry;
pub mod system_status_server; pub mod system_status_server;
......
...@@ -13,10 +13,11 @@ use crate::metrics::MetricsHierarchy; ...@@ -13,10 +13,11 @@ use crate::metrics::MetricsHierarchy;
use crate::traits::DistributedRuntimeProvider; use crate::traits::DistributedRuntimeProvider;
use axum::{ use axum::{
Router, Router,
body::Bytes,
extract::{Json, Path, State}, extract::{Json, Path, State},
http::StatusCode, http::StatusCode,
response::IntoResponse, response::IntoResponse,
routing::{delete, get, post}, routing::{any, delete, get, post},
}; };
use futures::StreamExt; use futures::StreamExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
...@@ -183,6 +184,13 @@ pub async fn spawn_system_status_server( ...@@ -183,6 +184,13 @@ pub async fn spawn_system_status_server(
let state = Arc::clone(&server_state); let state = Arc::clone(&server_state);
move || metadata_handler(state) move || metadata_handler(state)
}), }),
)
.route(
"/engine/{*path}",
any({
let state = Arc::clone(&server_state);
move |path, body| engine_route_handler(state, path, body)
}),
); );
// Add LoRA routes only if DYN_LORA_ENABLED is set to true // Add LoRA routes only if DYN_LORA_ENABLED is set to true
...@@ -524,6 +532,77 @@ fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse { ...@@ -524,6 +532,77 @@ fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
} }
} }
/// Engine route handler for /engine/* routes
///
/// This handler looks up registered callbacks in the engine routes registry
/// and invokes them with the request body, returning the response as JSON.
#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
async fn engine_route_handler(
state: Arc<SystemStatusState>,
Path(path): Path<String>,
body: Bytes,
) -> impl IntoResponse {
tracing::trace!("Engine route request to /engine/{}", path);
// Parse body as JSON (empty object for GET/empty body)
let body_json: serde_json::Value = if body.is_empty() {
serde_json::json!({})
} else {
match serde_json::from_slice(&body) {
Ok(json) => json,
Err(e) => {
tracing::warn!("Invalid JSON in request body: {}", e);
return (
StatusCode::BAD_REQUEST,
json!({
"error": "Invalid JSON",
"message": format!("{}", e)
})
.to_string(),
)
.into_response();
}
}
};
// Look up callback
let callback = match state.drt().engine_routes().get(&path) {
Some(cb) => cb,
None => {
tracing::debug!("Route /engine/{} not found", path);
return (
StatusCode::NOT_FOUND,
json!({
"error": "Route not found",
"message": format!("Route /engine/{} not found", path)
})
.to_string(),
)
.into_response();
}
};
// Call callback (it's async, so await it)
match callback(body_json).await {
Ok(response) => {
tracing::trace!("Engine route handler succeeded for /engine/{}", path);
(StatusCode::OK, response.to_string()).into_response()
}
Err(e) => {
tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
json!({
"error": "Handler error",
"message": format!("{}", e)
})
.to_string(),
)
.into_response()
}
}
}
// Regular tests: cargo test system_status_server --lib // Regular tests: cargo test system_status_server --lib
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment