Commit a62a8627 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

refactor: Use tracing crate (#161)

parent e1bd07fe
......@@ -30,7 +30,6 @@ use pythonize::{depythonize, pythonize};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tracing as log;
/// Add bindings from this crate to the provided module
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
......@@ -124,7 +123,7 @@ where
let ctx = context.context();
let id = context.id().to_string();
log::trace!("processing request: {}", id);
tracing::trace!("processing request: {}", id);
// Clone the PyObject to move into the thread
......@@ -147,7 +146,7 @@ where
let request_id = id.clone();
tokio::spawn(async move {
log::debug!(
tracing::debug!(
request_id,
"starting task to process python async generator stream"
);
......@@ -157,7 +156,7 @@ where
while let Some(item) = stream.next().await {
count += 1;
log::trace!(
tracing::trace!(
request_id,
"processing the {}th item from python async generator",
count
......@@ -178,17 +177,17 @@ where
// see: https://github.com/triton-inference-server/triton_distributed/issues/130
ctx.stop_generating();
let msg = format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e);
log::error!(request_id, "{}", msg);
tracing::error!(request_id, "{}", msg);
msg
}
ResponseProcessingError::PythonException(e) => {
let msg = format!("a python exception was caught while processing the async generator: {}", e);
log::warn!(request_id, "{}", msg);
tracing::warn!(request_id, "{}", msg);
msg
}
ResponseProcessingError::OffloadError(e) => {
let msg = format!("critical error: failed to offload the python async generator to a new thread: {}", e);
log::error!(request_id, "{}", msg);
tracing::error!(request_id, "{}", msg);
msg
}
};
......@@ -198,7 +197,7 @@ where
};
if tx.send(response).await.is_err() {
log::trace!(
tracing::trace!(
request_id,
"error forwarding annotated response to channel; channel is closed"
);
......@@ -206,7 +205,7 @@ where
}
if done {
log::debug!(
tracing::debug!(
request_id,
"early termination of python async generator stream task"
);
......@@ -214,7 +213,7 @@ where
}
}
log::debug!(
tracing::debug!(
request_id,
"finished processing python async generator stream"
);
......
......@@ -22,7 +22,6 @@ use pyo3::{exceptions::PyException, prelude::*};
use rs::pipeline::network::Ingress;
use std::{fmt::Display, sync::Arc};
use tokio::sync::Mutex;
use tracing as log;
use triton_distributed::{
self as rs,
......@@ -353,7 +352,7 @@ async fn process_stream(
// Send the PyObject through the channel or log an error
if let Err(e) = tx.send(annotated).await {
log::error!("Failed to send response: {:?}", e);
tracing::error!("Failed to send response: {:?}", e);
}
if is_error {
......
......@@ -43,7 +43,7 @@
use crate::discovery::Lease;
use super::{error, log, transports::nats::Slug, DistributedRuntime, Result};
use super::{error, transports::nats::Slug, DistributedRuntime, Result};
use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
use async_nats::{
......
......@@ -85,20 +85,20 @@ where
// currently this is created once per client, but this object/task should only be instantiated
// once per worker/instance
secondary.spawn(async move {
log::debug!("Starting endpoint watcher for prefix: {}", prefix);
tracing::debug!("Starting endpoint watcher for prefix: {}", prefix);
let mut map = HashMap::new();
loop {
let kv_event = tokio::select! {
_ = watch_tx.closed() => {
log::debug!("all watchers have closed; shutting down endpoint watcher for prefix: {}", prefix);
tracing::debug!("all watchers have closed; shutting down endpoint watcher for prefix: {}", prefix);
break;
}
kv_event = kv_event_rx.recv() => {
match kv_event {
Some(kv_event) => kv_event,
None => {
log::debug!("watch stream has closed; shutting down endpoint watcher for prefix: {}", prefix);
tracing::debug!("watch stream has closed; shutting down endpoint watcher for prefix: {}", prefix);
break;
}
}
......@@ -112,7 +112,7 @@ where
if let (Ok(key), Ok(val)) = (key, val) {
map.insert(key.clone(), val.lease_id);
} else {
log::error!("Unable to parse put endpoint event; shutting down endpoint watcher for prefix: {}", prefix);
tracing::error!("Unable to parse put endpoint event; shutting down endpoint watcher for prefix: {}", prefix);
break;
}
}
......@@ -120,7 +120,7 @@ where
match String::from_utf8(kv.key().to_vec()) {
Ok(key) => { map.remove(&key); }
Err(_) => {
log::error!("Unable to parse delete endpoint event; shutting down endpoint watcher for prefix: {}", prefix);
tracing::error!("Unable to parse delete endpoint event; shutting down endpoint watcher for prefix: {}", prefix);
break;
}
}
......@@ -130,13 +130,13 @@ where
let endpoint_ids: Vec<i64> = map.values().cloned().collect();
if watch_tx.send(endpoint_ids).is_err() {
log::debug!("Unable to send watch updates; shutting down endpoint watcher for prefix: {}", prefix);
tracing::debug!("Unable to send watch updates; shutting down endpoint watcher for prefix: {}", prefix);
break;
}
}
log::debug!("Completed endpoint watcher for prefix: {}", prefix);
tracing::debug!("Completed endpoint watcher for prefix: {}", prefix);
let _ = watch_tx.send(vec![]);
});
......
......@@ -43,7 +43,7 @@ impl EndpointConfigBuilder {
let (endpoint, lease, handler) = self.build_internal()?.dissolve();
let lease = lease.unwrap_or(endpoint.component.drt.primary_lease());
log::debug!(
tracing::debug!(
"Starting endpoint: {}",
endpoint.etcd_path_with_id(lease.id())
);
......@@ -78,7 +78,7 @@ impl EndpointConfigBuilder {
// launch in primary runtime
let task = tokio::spawn(push_endpoint.start(service_endpoint));
// log::debug!(worker_id, "endpoint subject: {}", subject);
// tracing::debug!(worker_id, "endpoint subject: {}", subject);
// make the components service endpoint discovery in etcd
......@@ -104,7 +104,7 @@ impl EndpointConfigBuilder {
)
.await
{
log::error!("Failed to register discoverable service: {:?}", e);
tracing::error!("Failed to register discoverable service: {:?}", e);
cancel_token.cancel();
return Err(error!("Failed to register discoverable service"));
}
......
......@@ -71,7 +71,7 @@ impl ServiceConfigBuilder {
None => builder,
};
log::debug!("Starting service: {}", service_name);
tracing::debug!("Starting service: {}", service_name);
builder
.description(description)
......
......@@ -23,7 +23,6 @@ use std::sync::{Arc, Mutex};
pub use anyhow::{anyhow as error, Context as ErrorContext, Error, Ok as OK, Result};
use async_once_cell::OnceCell;
use tracing as log;
mod config;
pub use config::RuntimeConfig;
......
......@@ -15,7 +15,6 @@
use anyhow::Result;
use async_nats::client::Client;
use tracing as log;
use super::*;
......@@ -131,7 +130,7 @@ where
let ctrl = serde_json::to_vec(&control_message).unwrap();
let data = serde_json::to_vec(&request).unwrap();
log::trace!(
tracing::trace!(
"[req: {}] packaging two-part message; ctrl: {} bytes, data: {} bytes",
id,
ctrl.len(),
......@@ -148,7 +147,7 @@ where
// TRANSPORT ABSTRACT REQUIRED - END HERE
log::trace!("[req: {}] enqueueing two-part message to nats", id);
tracing::trace!("[req: {}] enqueueing two-part message to nats", id);
// we might need to add a timeout on this if there is no subscriber to the subject; however, I think nats
// will handle this for us
......@@ -157,7 +156,7 @@ where
.request(address.to_string(), buffer)
.await?;
log::trace!("[req: {}] awaiting transport handshake", id);
tracing::trace!("[req: {}] awaiting transport handshake", id);
let response_stream = response_stream_provider
.await
.map_err(|_| PipelineError::DetatchedStreamReceiver)?
......
......@@ -18,7 +18,6 @@ use anyhow::Result;
use async_nats::service::endpoint::Endpoint;
use derive_builder::Builder;
use tokio_util::sync::CancellationToken;
use tracing as log;
#[derive(Builder)]
pub struct PushEndpoint {
......@@ -48,9 +47,9 @@ impl PushEndpoint {
// process shutdown
_ = self.cancellation_token.cancelled() => {
// log::trace!(worker_id, "Shutting down service {}", self.endpoint.name);
// tracing::trace!(worker_id, "Shutting down service {}", self.endpoint.name);
if let Err(e) = endpoint.stop().await {
log::warn!("Failed to stop NATS service: {:?}", e);
tracing::warn!("Failed to stop NATS service: {:?}", e);
}
break;
}
......@@ -59,15 +58,15 @@ impl PushEndpoint {
if let Some(req) = req {
let response = "".to_string();
if let Err(e) = req.respond(Ok(response.into())).await {
log::warn!("Failed to respond to request; this may indicate the request has shutdown: {:?}", e);
tracing::warn!("Failed to respond to request; this may indicate the request has shutdown: {:?}", e);
}
let ingress = self.service_handler.clone();
let worker_id = "".to_string();
tokio::spawn(async move {
log::trace!(worker_id, "handling new request");
tracing::trace!(worker_id, "handling new request");
let result = ingress.handle_payload(req.message.payload).await;
log::trace!(worker_id, "request handled: {:?}", result);
tracing::trace!(worker_id, "request handled: {:?}", result);
});
} else {
break;
......
......@@ -18,7 +18,6 @@ use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing as log;
use super::{CallHomeHandshake, ControlMessage, TcpStreamConnectionInfo};
use crate::engine::AsyncEngineContext;
......@@ -161,7 +160,7 @@ impl TcpClient {
tokio::spawn(async move {
while let Some(msg) = bytes_rx.recv().await {
if let Err(e) = framed_writer.send(msg).await {
log::trace!(
tracing::trace!(
"failed to send message to stream; possible disconnect: {:?}",
e
);
......@@ -172,7 +171,7 @@ impl TcpClient {
}
drop(alive_rx);
if let Err(e) = framed_writer.get_mut().shutdown().await {
log::trace!("failed to shutdown writer: {:?}", e);
tracing::trace!("failed to shutdown writer: {:?}", e);
}
});
......
......@@ -25,7 +25,7 @@
//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
//! private; however, for now we are exposing most objects as fully public while the API is maturing.
use super::{error, log, Result, Runtime, RuntimeType};
use super::{error, Result, Runtime, RuntimeType};
use crate::config::{self, RuntimeConfig};
use futures::Future;
......
......@@ -19,7 +19,7 @@
// we will want to associate the components cancellation token with the
// component's "service state"
use crate::{log, transports::nats, Result};
use crate::{transports::nats, Result};
use async_nats::Message;
use async_stream::try_stream;
......@@ -95,7 +95,7 @@ impl ServiceClient {
continue;
}
let service = serde_json::from_slice::<ServiceInfo>(&message.payload)?;
log::trace!("service: {:?}", service);
tracing::trace!("service: {:?}", service);
yield service;
}
}
......@@ -106,7 +106,7 @@ impl ServiceClient {
let (ok, err): (Vec<_>, Vec<_>) = services.into_iter().partition(Result::is_ok);
if !err.is_empty() {
log::error!("failed to collect services: {:?}", err);
tracing::error!("failed to collect services: {:?}", err);
}
Ok(ServiceSet {
......
......@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{error, log, CancellationToken, ErrorContext, Result, Runtime};
use crate::{error, CancellationToken, ErrorContext, Result, Runtime};
use async_nats::jetstream::kv;
use derive_builder::Builder;
......
......@@ -30,9 +30,9 @@ pub async fn create_lease(
tokio::spawn(async move {
match keep_alive(lease_client, id, ttl, child).await {
Ok(_) => log::trace!("keep alive task exited successfully"),
Ok(_) => tracing::trace!("keep alive task exited successfully"),
Err(e) => {
log::info!("keep alive task failed: {:?}", e);
tracing::info!("keep alive task failed: {:?}", e);
token.cancel();
}
}
......@@ -72,7 +72,7 @@ pub async fn keep_alive(
status = heartbeat_receiver.message() => {
if let Some(resp) = status? {
log::trace!(lease_id, "keep alive response received: {:?}", resp);
tracing::trace!(lease_id, "keep alive response received: {:?}", resp);
// update ttl and deadline
ttl = resp.ttl();
......@@ -86,20 +86,20 @@ pub async fn keep_alive(
}
_ = token.cancelled() => {
log::trace!(lease_id, "cancellation token triggered; revoking lease");
tracing::trace!(lease_id, "cancellation token triggered; revoking lease");
let _ = client.revoke(lease_id).await?;
return Ok(());
}
_ = tokio::time::sleep(tokio::time::Duration::from_secs(ttl as u64 / 2)) => {
log::trace!(lease_id, "sending keep alive");
tracing::trace!(lease_id, "sending keep alive");
// if we get a error issuing the heartbeat, set the ttl to 0
// this will allow us to poll the response stream once and the cancellation token once, then
// immediately try to tick the heartbeat
// this will repeat until either the heartbeat is reestablished or the deadline is exceeded
if let Err(e) = heartbeat_sender.keep_alive().await {
log::warn!(lease_id, "keep alive failed: {:?}", e);
tracing::warn!(lease_id, "keep alive failed: {:?}", e);
ttl = 0;
}
}
......
......@@ -760,7 +760,7 @@ mod tests {
// loop {
// let req = tokio::select! {
// _ = cancellation_token.cancelled() => {
// // log::trace!(worker_id, "Shutting down service {}", self.endpoint.name);
// // tracing::trace!(worker_id, "Shutting down service {}", self.endpoint.name);
// return Ok(());
// }
......@@ -773,7 +773,7 @@ mod tests {
// if let Some(req) = req {
// let response = "DONE".to_string();
// if let Err(e) = req.respond(Ok(response.into())).await {
// log::warn!("Failed to respond to the shutdown request: {:?}", e);
// tracing::warn!("Failed to respond to the shutdown request: {:?}", e);
// }
// controller.set_stage(ServiceStage::ShuttingDown);
......
......@@ -32,7 +32,7 @@
//! and release builds. In development, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG] and
//! in release, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE].
use super::{error, log, CancellationToken, Result, Runtime, RuntimeConfig};
use super::{error, CancellationToken, Result, Runtime, RuntimeConfig};
use futures::Future;
use once_cell::sync::OnceCell;
......@@ -151,10 +151,10 @@ impl Worker {
match &result {
Ok(_) => {
log::info!("Application shutdown successfully");
tracing::info!("Application shutdown successfully");
}
Err(e) => {
log::error!("Application shutdown with error: {:?}", e);
tracing::error!("Application shutdown with error: {:?}", e);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment