"examples/llm/vscode:/vscode.git/clone" did not exist on "538b463025d2f971baa0e5b0328c1bab70599510"
Commit 2ee29443 authored by Ryan McCormick, committed by GitHub
Browse files

refactor: Use library constant for kv-hit-rate subject (#48)

Replaces hard-coded "kv-hit-rate" string in multiple places with KV_HIT_RATE_SUBJECT constant in lib/llm.
parent 44bde250
......@@ -28,6 +28,7 @@
//! - Overlap Blocks: Cumulative count of blocks that were already in the KV cache
use clap::Parser;
use dynemo_llm::kv_router::scheduler::KVHitRateEvent;
use dynemo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynemo_runtime::{
error, logging,
traits::events::{EventPublisher, EventSubscriber},
......@@ -125,7 +126,7 @@ async fn app(runtime: Runtime) -> Result<()> {
metrics_server.lock().await.start(9091);
// Subscribe to KV hit rate events
let kv_hit_rate_subject = "kv-hit-rate";
let kv_hit_rate_subject = KV_HIT_RATE_SUBJECT;
tracing::info!("Subscribing to KV hit rate events on subject: {kv_hit_rate_subject}");
// Clone the metrics server and config for the subscription task
......@@ -164,7 +165,7 @@ async fn app(runtime: Runtime) -> Result<()> {
);
}
Err(e) => {
tracing::warn!("Failed to deserialize KV hit rate event: {:?}", e);
tracing::warn!("Failed to deserialize KV hit rate event: {e}");
}
}
}
......
......@@ -35,6 +35,7 @@ use crate::kv_router::{
// this should be discovered from the backend
/// Subject name for KV events (presumably KV cache events emitted by the
/// backend; the line comment above notes the value is meant to be discovered
/// rather than hard-coded — confirm against the publisher).
pub const KV_EVENT_SUBJECT: &str = "kv_events";
/// Subject name for KV hit rate events.
///
/// Shared by the code that publishes these events and the code that
/// subscribes to them, so both sides agree on a single string instead of
/// each hard-coding `"kv-hit-rate"`.
pub const KV_HIT_RATE_SUBJECT: &str = "kv-hit-rate";
pub struct KvRouter {
// properties of request plane
......
......@@ -22,6 +22,7 @@ use std::cmp::min;
use crate::kv_router::indexer::OverlapScores;
pub use crate::kv_router::protocols::{ForwardPassMetrics, KV_BLOCK_SIZE};
use crate::kv_router::scoring::ProcessedEndpoints;
use crate::kv_router::KV_HIT_RATE_SUBJECT;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KVHitRateEvent {
......@@ -119,9 +120,8 @@ impl KvScheduler {
// Publisher task
tokio::spawn(async move {
let mut event_rx = event_rx;
let subject = "kv-hit-rate";
while let Some(event) = event_rx.recv().await {
if let Err(e) = ns.publish(subject, &event).await {
if let Err(e) = ns.publish(KV_HIT_RATE_SUBJECT, &event).await {
tracing::warn!("Failed to publish KV hit rate event: {:?}", e);
}
}
......
......@@ -154,7 +154,6 @@ impl Component {
format!("{}/{}", self.namespace, self.name)
}
/// Returns the namespace of this component as a string slice.
///
/// The slice borrows from `self`; callers that need an owned value must
/// convert it themselves.
pub fn namespace(&self) -> &str {
&self.namespace
}
......
......@@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::Context;
use async_trait::async_trait;
use futures::stream::StreamExt;
use futures::{Stream, TryStreamExt};
......@@ -70,7 +71,7 @@ impl EventSubscriber for Namespace {
// Transform the subscriber into a stream of deserialized events
let stream = subscriber.map(move |msg| {
serde_json::from_slice::<T>(&msg.payload)
.map_err(|e| anyhow::anyhow!("Failed to deserialize event: {}", e))
.with_context(|| format!("Failed to deserialize event payload: {:?}", msg.payload))
});
Ok(stream)
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment