Unverified Commit 2d39ded6 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: unified logging, added informative warnings for KV router example (#912)

parent 942a0fb9
...@@ -23,7 +23,6 @@ from typing import AsyncIterator ...@@ -23,7 +23,6 @@ from typing import AsyncIterator
from components.worker import VllmWorker from components.worker import VllmWorker
from utils.logging import check_required_workers from utils.logging import check_required_workers
from utils.protocol import Tokens from utils.protocol import Tokens
from vllm.logger import logger as vllm_logger
from dynamo.llm import AggregatedMetrics, KvIndexer, KvMetricsAggregator, OverlapScores from dynamo.llm import AggregatedMetrics, KvIndexer, KvMetricsAggregator, OverlapScores
from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service
...@@ -83,7 +82,7 @@ class Router: ...@@ -83,7 +82,7 @@ class Router:
worker = depends(VllmWorker) worker = depends(VllmWorker)
def __init__(self): def __init__(self):
vllm_logger.info("Initializing Custom Router") logger.info("Initializing Custom Router")
self.args = parse_args(self.__class__.__name__, "") self.args = parse_args(self.__class__.__name__, "")
self.default_metrics = { self.default_metrics = {
...@@ -141,6 +140,8 @@ class Router: ...@@ -141,6 +140,8 @@ class Router:
worker_scores[worker_id] = ( worker_scores[worker_id] = (
score * self.indexer.block_size() / token_length score * self.indexer.block_size() / token_length
) )
else:
logger.warning("Cannot get KV scores")
worker_metrics = {} worker_metrics = {}
max_waiting = 0.0 max_waiting = 0.0
...@@ -154,6 +155,8 @@ class Router: ...@@ -154,6 +155,8 @@ class Router:
max_waiting = max( max_waiting = max(
max_waiting, worker_metrics[worker_id]["num_requests_waiting"] max_waiting, worker_metrics[worker_id]["num_requests_waiting"]
) )
else:
logger.warning("Cannot get metrics")
# Get all worker IDs from the client. This is needed because scores / metrics may not have values for all workers # Get all worker IDs from the client. This is needed because scores / metrics may not have values for all workers
# and we want all workers to be considered in the logit calculation # and we want all workers to be considered in the logit calculation
...@@ -175,7 +178,7 @@ class Router: ...@@ -175,7 +178,7 @@ class Router:
# Have 1 metric that weights towards cache hit # Have 1 metric that weights towards cache hit
# 2 metrics that penalize overloaded worker and queuing # 2 metrics that penalize overloaded worker and queuing
worker_logits[worker_id] = 2 * score - gpu_cache_usage - normalized_waiting worker_logits[worker_id] = 2 * score - gpu_cache_usage - normalized_waiting
vllm_logger.info( logger.info(
f"Formula for {worker_id}: {worker_logits[worker_id]:.3f} = 2.0 * {score:.3f} - {gpu_cache_usage:.3f} - {normalized_waiting:.3f}" f"Formula for {worker_id}: {worker_logits[worker_id]:.3f} = 2.0 * {score:.3f} - {gpu_cache_usage:.3f} - {normalized_waiting:.3f}"
) )
...@@ -204,7 +207,7 @@ class Router: ...@@ -204,7 +207,7 @@ class Router:
# Log to vllm_logger # Log to vllm_logger
for message in log_messages: for message in log_messages:
vllm_logger.info(message) logger.info(message)
return best_worker_id, worker_scores.get(best_worker_id, 0.0) return best_worker_id, worker_scores.get(best_worker_id, 0.0)
...@@ -217,14 +220,14 @@ class Router: ...@@ -217,14 +220,14 @@ class Router:
) )
except Exception as e: except Exception as e:
scores = {} scores = {}
vllm_logger.exception(f"Error finding matches: {e}") logger.exception(f"Error finding matches: {e}")
metrics = await self.metrics_aggregator.get_metrics() metrics = await self.metrics_aggregator.get_metrics()
worker_id, prefix_hit_rate = self._cost_function( worker_id, prefix_hit_rate = self._cost_function(
scores, metrics, len(request.tokens) scores, metrics, len(request.tokens)
) )
vllm_logger.info( logger.info(
f"Scheduling to worker_id: {worker_id} with estimated prefix hit rate: {prefix_hit_rate}" f"Scheduling to worker_id: {worker_id} with estimated prefix hit rate: {prefix_hit_rate}"
) )
yield f"{worker_id}_{prefix_hit_rate}" yield f"{worker_id}_{prefix_hit_rate}"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment