Unverified Commit 9e6a84af authored by Ryan McCormick's avatar Ryan McCormick Committed by GitHub
Browse files

chore(style): Log with {var} style in lib/runtime (#6757)

parent 72eabeae
......@@ -67,7 +67,7 @@ impl KubeDiscoveryClient {
tokio::spawn(async move {
if let Err(e) = daemon.run(watch_tx).await {
tracing::error!("Discovery daemon failed: {}", e);
tracing::error!("Discovery daemon failed: {e}");
}
});
......
......@@ -86,7 +86,7 @@ impl DiscoveryDaemon {
notify_ep.notify_one();
}
Err(e) => {
tracing::warn!("EndpointSlice reflector error: {}", e);
tracing::warn!("EndpointSlice reflector error: {e}");
notify_ep.notify_one();
}
}
......@@ -124,7 +124,7 @@ impl DiscoveryDaemon {
notify_cr.notify_one();
}
Err(e) => {
tracing::warn!("DynamoWorkerMetadata CR reflector error: {}", e);
tracing::warn!("DynamoWorkerMetadata CR reflector error: {e}");
notify_cr.notify_one();
}
}
......@@ -164,7 +164,7 @@ impl DiscoveryDaemon {
sequence += 1;
}
Err(e) => {
tracing::error!("Failed to aggregate snapshot: {}", e);
tracing::error!("Failed to aggregate snapshot: {e}");
// Continue on errors - don't crash daemon
}
}
......@@ -220,7 +220,7 @@ impl DiscoveryDaemon {
// Deserialize the data field to DiscoveryMetadata
match serde_json::from_value::<DiscoveryMetadata>(arc_cr.spec.data.clone()) {
Ok(metadata) => {
tracing::trace!("Loaded metadata from CR '{}'", cr_name);
tracing::trace!("Loaded metadata from CR '{cr_name}'");
cr_map.insert(cr_name.clone(), (Arc::new(metadata), generation));
}
Err(e) => {
......
......@@ -58,7 +58,7 @@ impl EventTransportKind {
/// Logs a warning if an invalid value is encountered.
pub fn from_env_or_default() -> Self {
Self::from_env().unwrap_or_else(|e| {
tracing::warn!("{}, defaulting to NATS", e);
tracing::warn!("{e}, defaulting to NATS");
Self::Nats
})
}
......
......@@ -145,7 +145,7 @@ impl DistributedRuntime {
(Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
}
DiscoveryBackend::KvStore(kv_selector) => {
tracing::info!("Initializing KV store discovery backend: {}", kv_selector);
tracing::info!("Initializing KV store discovery backend: {kv_selector}");
let runtime_clone = runtime.clone();
let store = match kv_selector {
kv::Selector::Etcd(etcd_config) => {
......@@ -227,7 +227,7 @@ impl DistributedRuntime {
.await
{
Ok((addr, handle)) => {
tracing::info!("System status server started successfully on {}", addr);
tracing::info!("System status server started successfully on {addr}");
// Store system status server information
let system_status_server_info =
......@@ -243,7 +243,7 @@ impl DistributedRuntime {
.expect("System status server info should only be set once");
}
Err(e) => {
tracing::error!("System status server startup failed: {}", e);
tracing::error!("System status server startup failed: {e}");
}
}
} else {
......@@ -274,7 +274,7 @@ impl DistributedRuntime {
config.canary_wait_time_secs,
config.health_check_request_timeout_secs
),
Err(e) => tracing::error!("Health check manager failed to start: {}", e),
Err(e) => tracing::error!("Health check manager failed to start: {e}"),
}
}
......@@ -402,7 +402,7 @@ impl DistributedRuntime {
) -> anyhow::Result<()> {
let Some(nats_client) = self.nats_client.as_ref() else {
// NATS not available - this is expected in approximate mode (--no-kv-events)
tracing::trace!("Skipping NATS publish (NATS not configured): {}", subject);
tracing::trace!("Skipping NATS publish (NATS not configured): {subject}");
return Ok(());
};
Ok(nats_client.client().publish(subject, payload).await?)
......
......@@ -37,7 +37,7 @@ impl EngineRouteRegistry {
pub fn register(&self, route: &str, callback: EngineRouteCallback) {
let mut routes = self.routes.write().unwrap();
routes.insert(route.to_string(), callback);
tracing::debug!("Registered engine route: /engine/{}", route);
tracing::debug!("Registered engine route: /engine/{route}");
}
/// Get callback for a route
......
......@@ -45,7 +45,7 @@ impl LocalEndpointRegistry {
/// * `endpoint_name` - Name of the endpoint (e.g., "load_lora", "generate")
/// * `engine` - The async engine that handles requests for this endpoint
pub fn register(&self, endpoint_name: String, engine: LocalAsyncEngine) {
tracing::debug!("Registering local endpoint: {}", endpoint_name);
tracing::debug!("Registering local endpoint: {endpoint_name}");
self.engines.insert(endpoint_name, engine);
}
......
......@@ -683,7 +683,7 @@ where
// Extract trace_id from span attributes
if let Some(trace_id_input) = visitor.fields.get("trace_id") {
if !is_valid_trace_id(trace_id_input) {
tracing::trace!("trace id '{}' is not valid! Ignoring.", trace_id_input);
tracing::trace!("trace id '{trace_id_input}' is not valid! Ignoring.");
} else {
trace_id = Some(trace_id_input.to_string());
}
......@@ -692,7 +692,7 @@ where
// Extract span_id from span attributes
if let Some(span_id_input) = visitor.fields.get("span_id") {
if !is_valid_span_id(span_id_input) {
tracing::trace!("span id '{}' is not valid! Ignoring.", span_id_input);
tracing::trace!("span id '{span_id_input}' is not valid! Ignoring.");
} else {
span_id = Some(span_id_input.to_string());
}
......@@ -701,7 +701,7 @@ where
// Extract parent_id from span attributes
if let Some(parent_id_input) = visitor.fields.get("parent_id") {
if !is_valid_span_id(parent_id_input) {
tracing::trace!("parent id '{}' is not valid! Ignoring.", parent_id_input);
tracing::trace!("parent id '{parent_id_input}' is not valid! Ignoring.");
} else {
parent_id = Some(parent_id_input.to_string());
}
......
......@@ -733,7 +733,7 @@ impl MetricsRegistry {
for registry in &registries {
for result in registry.execute_update_callbacks() {
if let Err(e) = result {
tracing::error!("Error executing metrics callback: {}", e);
tracing::error!("Error executing metrics callback: {e}");
}
}
}
......@@ -870,7 +870,7 @@ impl MetricsRegistry {
}
}
Err(e) => {
tracing::error!("Error executing exposition text callback: {}", e);
tracing::error!("Error executing exposition text callback: {e}");
}
}
}
......
......@@ -229,7 +229,7 @@ where
.error_type(ErrorType::Disconnected)
.message("Stream ended before generation completed")
.build();
tracing::debug!("{}", err);
tracing::debug!("{err}");
Some(U::from_err(err))
}
});
......
......@@ -380,14 +380,14 @@ impl TcpConnectionPool {
if conn.is_healthy() {
return Ok(conn);
} else {
tracing::debug!("Discarding unhealthy connection for {}", addr);
tracing::debug!("Discarding unhealthy connection for {addr}");
// Connection will be dropped here, cleaning up tasks
}
}
}
// Create new connection with configured channel buffer
tracing::debug!("Creating new TCP connection to {}", addr);
tracing::debug!("Creating new TCP connection to {addr}");
TcpConnection::connect(
addr,
self.config.connect_timeout,
......@@ -417,7 +417,7 @@ impl TcpConnectionPool {
if pool.len() < self.config.pool_size {
pool.push(conn);
} else {
tracing::debug!("Connection pool full for {}, dropping connection", addr);
tracing::debug!("Connection pool full for {addr}, dropping connection");
// Otherwise drop the connection (tasks will be cleaned up)
}
}
......@@ -503,7 +503,7 @@ impl RequestPlaneClient for TcpRequestClient {
payload: Bytes,
mut headers: Headers,
) -> Result<Bytes> {
tracing::debug!("TCP client sending request to address: {}", address);
tracing::debug!("TCP client sending request to address: {address}");
self.stats.requests_sent.fetch_add(1, Ordering::Relaxed);
self.stats
.bytes_sent
......@@ -558,7 +558,7 @@ impl RequestPlaneClient for TcpRequestClient {
}
Err(_) => {
self.stats.errors.fetch_add(1, Ordering::Relaxed);
tracing::warn!("TCP request timeout to {}", addr);
tracing::warn!("TCP request timeout to {addr}");
// Don't return timed-out connection to pool
Err(anyhow::anyhow!(
crate::error::DynamoError::builder()
......
......@@ -94,7 +94,7 @@ impl SharedHttpServer {
.lock()
.set_endpoint_health_status(&endpoint_name, HealthStatus::Ready);
tracing::debug!("Registered endpoint handler for subject: {}", subject_clone);
tracing::debug!("Registered endpoint handler for subject: {subject_clone}");
Ok(())
}
......@@ -170,7 +170,7 @@ impl SharedHttpServer {
tokio::select! {
result = http2_builder.serve_connection(io, hyper_service) => {
if let Err(e) = result {
tracing::debug!("HTTP/2 connection error: {}", e);
tracing::debug!("HTTP/2 connection error: {e}");
}
}
_ = cancel_clone.cancelled() => {
......@@ -180,7 +180,7 @@ impl SharedHttpServer {
});
}
Err(e) => {
tracing::error!("Failed to accept connection: {}", e);
tracing::error!("Failed to accept connection: {e}");
}
}
}
......@@ -213,7 +213,7 @@ async fn handle_shared_request(
let handler = match server.handlers.get(&endpoint_path) {
Some(h) => h.clone(),
None => {
tracing::warn!("No handler found for endpoint: {}", endpoint_path);
tracing::warn!("No handler found for endpoint: {endpoint_path}");
return (StatusCode::NOT_FOUND, "Endpoint not found");
}
};
......
......@@ -252,7 +252,7 @@ where
}
#[cfg(not(debug_assertions))]
{
tracing::error!("Failed to generate response stream: {}", error_string);
tracing::error!("Failed to generate response stream: {error_string}");
}
let _result = publisher.send_prologue(Some(error_string)).await;
......
......@@ -260,18 +260,18 @@ impl SharedTcpServer {
accept_result = listener.accept() => {
match accept_result {
Ok((stream, peer_addr)) => {
tracing::trace!("Accepted TCP connection from {}", peer_addr);
tracing::trace!("Accepted TCP connection from {peer_addr}");
let handlers = self.handlers.clone();
let work_tx = self.work_tx.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_connection(stream, handlers, work_tx).await {
tracing::error!("TCP connection error: {}", e);
tracing::error!("TCP connection error: {e}");
}
});
}
Err(e) => {
tracing::error!("Failed to accept TCP connection: {}", e);
tracing::error!("Failed to accept TCP connection: {e}");
}
}
}
......@@ -409,7 +409,7 @@ impl SharedTcpServer {
break;
}
Err(e) => {
tracing::warn!("Failed to read TCP request: {}", e);
tracing::warn!("Failed to read TCP request: {e}");
// Send error response
let error_response =
TcpResponseMessage::new(Bytes::from(format!("Read error: {}", e)));
......@@ -424,7 +424,7 @@ impl SharedTcpServer {
let endpoint_path = match request_msg.endpoint_path() {
Ok(path) => path,
Err(e) => {
tracing::warn!("Invalid UTF-8 in endpoint path: {}", e);
tracing::warn!("Invalid UTF-8 in endpoint path: {e}");
let error_response =
TcpResponseMessage::new(Bytes::from_static(b"Invalid endpoint path"));
if let Ok(encoded) = error_response.encode() {
......@@ -453,7 +453,7 @@ impl SharedTcpServer {
let handler = match handler {
Some(h) => h,
None => {
tracing::warn!("No handler found for endpoint: {}", endpoint_path);
tracing::warn!("No handler found for endpoint: {endpoint_path}");
// Send error response
let error_response = TcpResponseMessage::new(Bytes::from(format!(
"Unknown endpoint: {}",
......
......@@ -312,7 +312,7 @@ impl NetworkManager {
let server_clone = server.clone();
tokio::spawn(async move {
if let Err(e) = server_clone.start().await {
tracing::error!("HTTP request plane server error: {}", e);
tracing::error!("HTTP request plane server error: {e}");
}
});
......
......@@ -227,7 +227,7 @@ impl ResponseService for TcpStreamServer {
// oneshot channels to pass back the sender and receiver objects
let address = format!("{}:{}", self.local_ip, self.local_port);
tracing::debug!("Registering new TcpStream on {}", address);
tracing::debug!("Registering new TcpStream on {address}");
let send_stream = if options.enable_request_stream {
let sender_subject = uuid::Uuid::new_v4().to_string();
......@@ -341,7 +341,7 @@ async fn tcp_listener(
Ok((stream, _addr)) => (stream, _addr),
Err(e) => {
// the client should retry, so we don't need to abort
tracing::warn!("failed to accept tcp connection: {}", e);
tracing::warn!("failed to accept tcp connection: {e}");
eprintln!("failed to accept tcp connection: {}", e);
continue;
}
......@@ -350,14 +350,14 @@ async fn tcp_listener(
match stream.set_nodelay(true) {
Ok(_) => (),
Err(e) => {
tracing::warn!("failed to set tcp stream to nodelay: {}", e);
tracing::warn!("failed to set tcp stream to nodelay: {e}");
}
}
match stream.set_linger(Some(std::time::Duration::from_secs(0))) {
Ok(_) => (),
Err(e) => {
tracing::warn!("failed to set tcp stream to linger: {}", e);
tracing::warn!("failed to set tcp stream to linger: {e}");
}
}
......@@ -371,7 +371,7 @@ async fn tcp_listener(
match result {
Ok(_) => tracing::trace!("successfully processed tcp connection"),
Err(e) => {
tracing::warn!("failed to handle tcp connection: {}", e);
tracing::warn!("failed to handle tcp connection: {e}");
#[cfg(debug_assertions)]
eprintln!("failed to handle tcp connection: {}", e);
}
......@@ -561,7 +561,7 @@ async fn tcp_listener(
if !data.is_empty()
&& let Err(err) = response_tx.send(data).await {
tracing::debug!("forwarding body/data message to response channel failed: {}", err);
tracing::debug!("forwarding body/data message to response channel failed: {err}");
control_tx.send(ControlMessage::Kill).await.expect("the control channel should not be closed");
break;
};
......@@ -612,10 +612,10 @@ async fn tcp_listener(
let mut inner = socket_tx.into_inner();
if let Err(e) = inner.flush().await {
tracing::debug!("failed to flush socket: {}", e);
tracing::debug!("failed to flush socket: {e}");
}
if let Err(e) = inner.shutdown().await {
tracing::debug!("failed to shutdown socket: {}", e);
tracing::debug!("failed to shutdown socket: {e}");
}
}
}
......
......@@ -230,7 +230,7 @@ impl Runtime {
}
let count = thread_ids.lock().len();
tracing::debug!("Detected {} worker threads in runtime", count);
tracing::debug!("Detected {count} worker threads in runtime");
count
}
......@@ -318,7 +318,7 @@ impl Runtime {
tracing::info!("Phase 2: Waiting for graceful endpoints to complete");
let count = tracker.get_count();
tracing::info!("Active graceful endpoints: {}", count);
tracing::info!("Active graceful endpoints: {count}");
if count != 0 {
tracker.wait_for_completion().await;
......
......@@ -224,7 +224,7 @@ pub async fn spawn_system_status_server(
.layer(TraceLayer::new_for_http().make_span_with(make_request_span));
let address = format!("{}:{}", host, port);
tracing::info!("[spawn_system_status_server] binding to: {}", address);
tracing::info!("[spawn_system_status_server] binding to: {address}");
let listener = match TcpListener::bind(&address).await {
Ok(listener) => {
......@@ -250,7 +250,7 @@ pub async fn spawn_system_status_server(
.with_graceful_shutdown(observer.cancelled_owned())
.await
{
tracing::error!("System status server error: {}", e);
tracing::error!("System status server error: {e}");
}
});
......@@ -297,7 +297,7 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let response = match state.drt().metrics().prometheus_expfmt() {
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to get metrics from registry: {}", e);
tracing::error!("Failed to get metrics from registry: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get metrics".to_string(),
......@@ -334,7 +334,7 @@ async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
(StatusCode::OK, json).into_response()
}
Err(e) => {
tracing::error!("Failed to serialize metadata: {}", e);
tracing::error!("Failed to serialize metadata: {e}");
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to serialize metadata".to_string(),
......@@ -406,7 +406,7 @@ async fn unload_lora_handler(
.strip_prefix('/')
.unwrap_or(&lora_name)
.to_string();
tracing::info!("Unloading LoRA: {}", lora_name);
tracing::info!("Unloading LoRA: {lora_name}");
// Call the unload_lora endpoint for each available backend
match call_lora_endpoint(
......@@ -427,7 +427,7 @@ async fn unload_lora_handler(
);
(StatusCode::INTERNAL_SERVER_ERROR, Json(response))
} else {
tracing::info!("LoRA unloaded successfully: {}", lora_name);
tracing::info!("LoRA unloaded successfully: {lora_name}");
(StatusCode::OK, Json(response))
}
}
......@@ -460,7 +460,7 @@ async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl
(StatusCode::OK, Json(response))
}
Err(e) => {
tracing::error!("Failed to list LoRAs: {}", e);
tracing::error!("Failed to list LoRAs: {e}");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(LoraResponse {
......@@ -487,7 +487,7 @@ async fn call_lora_endpoint(
) -> anyhow::Result<LoraResponse> {
use crate::engine::AsyncEngine;
tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
tracing::debug!("Calling local endpoint: '{endpoint_name}'");
// Get the endpoint from the local registry (in-process call only)
let local_registry = drt.local_endpoint_registry();
......@@ -558,7 +558,7 @@ async fn engine_route_handler(
Path(path): Path<String>,
body: Bytes,
) -> impl IntoResponse {
tracing::trace!("Engine route request to /engine/{}", path);
tracing::trace!("Engine route request to /engine/{path}");
// Parse body as JSON (empty object for GET/empty body)
let body_json: serde_json::Value = if body.is_empty() {
......@@ -567,7 +567,7 @@ async fn engine_route_handler(
match serde_json::from_slice(&body) {
Ok(json) => json,
Err(e) => {
tracing::warn!("Invalid JSON in request body: {}", e);
tracing::warn!("Invalid JSON in request body: {e}");
return (
StatusCode::BAD_REQUEST,
json!({
......@@ -585,7 +585,7 @@ async fn engine_route_handler(
let callback = match state.drt().engine_routes().get(&path) {
Some(cb) => cb,
None => {
tracing::debug!("Route /engine/{} not found", path);
tracing::debug!("Route /engine/{path} not found");
return (
StatusCode::NOT_FOUND,
json!({
......@@ -601,7 +601,7 @@ async fn engine_route_handler(
// Call callback (it's async, so await it)
match callback(body_json).await {
Ok(response) => {
tracing::trace!("Engine route handler succeeded for /engine/{}", path);
tracing::trace!("Engine route handler succeeded for /engine/{path}");
(StatusCode::OK, response.to_string()).into_response()
}
Err(e) => {
......@@ -1032,7 +1032,7 @@ mod integration_tests {
}
}
tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
tracing::info!("Health endpoint test results: {success_count}/200 requests succeeded");
if !failures.is_empty() {
tracing::warn!("Failed requests: {}", failures.len());
}
......
......@@ -444,7 +444,7 @@ impl Client {
.await
{
Ok((_, watch_stream)) => {
tracing::debug!("Watch stream established for prefix '{}'", prefix);
tracing::debug!("Watch stream established for prefix '{prefix}'");
return Ok(watch_stream);
}
Err(err) => {
......@@ -486,7 +486,7 @@ impl Client {
return true; // Exit to reconnect
}
None => {
tracing::warn!("Watch stream unexpectedly closed for prefix '{}'", prefix);
tracing::warn!("Watch stream unexpectedly closed for prefix '{prefix}'");
return true; // Exit to reconnect
}
};
......@@ -495,7 +495,7 @@ impl Client {
*start_revision = match response.header() {
Some(header) => header.revision() + 1,
None => {
tracing::error!("Missing header in watch response for prefix '{}'", prefix);
tracing::error!("Missing header in watch response for prefix '{prefix}'");
return false;
}
};
......@@ -692,14 +692,14 @@ impl KvCache {
WatchEvent::Delete(kv) => {
let key = String::from_utf8_lossy(kv.key()).to_string();
tracing::trace!("KvCache delete: {}", key);
tracing::trace!("KvCache delete: {key}");
let mut cache_write = cache.write().await;
cache_write.remove(&key);
}
}
}
tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
tracing::debug!("KvCache watcher for prefix '{prefix}' stopped");
});
}
......
......@@ -222,7 +222,7 @@ impl DistributedRWLock {
// Execute the atomic transaction
match etcd_client.etcd_client().kv_client().txn(txn).await {
Ok(response) if response.succeeded() => {
tracing::debug!("Acquired read lock for reader {}", reader_id);
tracing::debug!("Acquired read lock for reader {reader_id}");
return Ok(ReadLockGuard {
rwlock: self,
etcd_client,
......
......@@ -115,11 +115,11 @@ impl Server {
// but we also propagate the error to the caller's cancellation token
let watch_task = tokio::spawn(async move {
let result = primary_task.await.inspect_err(|e| {
tracing::error!("zmq server/router task failed: {}", e);
tracing::error!("zmq server/router task failed: {e}");
cancel_token.cancel();
})?;
result.inspect_err(|e| {
tracing::error!("zmq server/router task failed: {}", e);
tracing::error!("zmq server/router task failed: {e}");
cancel_token.cancel();
})
});
......@@ -155,7 +155,7 @@ impl Server {
// let port = addr.as_socket().map(|s| s.port());
// if let Some(port) = port {
// tracing::info!("Server listening on port {}", port);
// tracing::info!("Server listening on port {port}");
// }
loop {
......@@ -168,7 +168,7 @@ impl Server {
frames
},
Some(Err(e)) => {
tracing::warn!("Error receiving message: {}", e);
tracing::warn!("Error receiving message: {e}");
continue;
}
None => break,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment