Unverified Commit b401a1d6 authored by jh-nv's avatar jh-nv Committed by GitHub
Browse files

feat: propagate trace context to vLLM engine (#4918)


Signed-off-by: default avatarJie Hao <jihao@nvidia.com>
parent 676ce434
...@@ -2852,6 +2852,7 @@ dependencies = [ ...@@ -2852,6 +2852,7 @@ dependencies = [
"dashmap 6.1.0", "dashmap 6.1.0",
"derive-getters", "derive-getters",
"derive_builder", "derive_builder",
"dynamo-config",
"educe", "educe",
"either", "either",
"env_logger", "env_logger",
......
...@@ -16,6 +16,7 @@ from vllm.outputs import RequestOutput ...@@ -16,6 +16,7 @@ from vllm.outputs import RequestOutput
from vllm.sampling_params import SamplingParams, StructuredOutputsParams from vllm.sampling_params import SamplingParams, StructuredOutputsParams
from vllm.v1.engine.exceptions import EngineDeadError from vllm.v1.engine.exceptions import EngineDeadError
from dynamo._core import Context
from dynamo.common.utils.input_params import InputParamManager from dynamo.common.utils.input_params import InputParamManager
from dynamo.llm import ( from dynamo.llm import (
ModelInput, ModelInput,
...@@ -739,6 +740,20 @@ class BaseWorkerHandler(ABC): ...@@ -739,6 +740,20 @@ class BaseWorkerHandler(ABC):
return log_probs if log_probs else None, top_logprobs if top_logprobs else None return log_probs if log_probs else None, top_logprobs if top_logprobs else None
def _build_trace_headers(self, context: Context) -> dict[str, str] | None:
"""
Build trace headers from context for propagation to vLLM engine.
"""
trace_id = context.trace_id
span_id = context.span_id
if not trace_id or not span_id:
return None
# W3C Trace Context format: {version}-{trace_id}-{parent_id}-{trace_flags}
# version: 00, trace_flags: 01 (sampled)
# TODO: properly propagate the trace-flags from current span.
return {"traceparent": f"00-{trace_id}-{span_id}-01"}
async def generate_tokens( async def generate_tokens(
self, self,
prompt, prompt,
...@@ -746,6 +761,7 @@ class BaseWorkerHandler(ABC): ...@@ -746,6 +761,7 @@ class BaseWorkerHandler(ABC):
request_id, request_id,
data_parallel_rank=None, data_parallel_rank=None,
lora_request=None, lora_request=None,
trace_headers=None,
): ):
try: try:
# Log LoRA usage for this generation (debug level to avoid log spam) # Log LoRA usage for this generation (debug level to avoid log spam)
...@@ -764,6 +780,7 @@ class BaseWorkerHandler(ABC): ...@@ -764,6 +780,7 @@ class BaseWorkerHandler(ABC):
request_id, request_id,
lora_request=lora_request, lora_request=lora_request,
data_parallel_rank=data_parallel_rank, data_parallel_rank=data_parallel_rank,
trace_headers=trace_headers,
) )
num_output_tokens_so_far = 0 num_output_tokens_so_far = 0
...@@ -923,6 +940,8 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -923,6 +940,8 @@ class DecodeWorkerHandler(BaseWorkerHandler):
dp_rank = request.get("dp_rank", None) dp_rank = request.get("dp_rank", None)
trace_headers = self._build_trace_headers(context)
async with self._abort_monitor(context, request_id): async with self._abort_monitor(context, request_id):
try: try:
async for tok in self.generate_tokens( async for tok in self.generate_tokens(
...@@ -931,6 +950,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -931,6 +950,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
request_id, request_id,
data_parallel_rank=dp_rank, data_parallel_rank=dp_rank,
lora_request=lora_request, lora_request=lora_request,
trace_headers=trace_headers,
): ):
if prefill_result is not None and "completion_usage" in tok: if prefill_result is not None and "completion_usage" in tok:
tok["completion_usage"][ tok["completion_usage"][
...@@ -965,6 +985,8 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -965,6 +985,8 @@ class DecodeWorkerHandler(BaseWorkerHandler):
openai_request_id = request.get("id") or request.get("request_id", request_id) openai_request_id = request.get("id") or request.get("request_id", request_id)
previous_text = "" previous_text = ""
trace_headers = self._build_trace_headers(context)
async with self._abort_monitor(context, request_id): async with self._abort_monitor(context, request_id):
try: try:
gen = self.engine_client.generate( gen = self.engine_client.generate(
...@@ -972,6 +994,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -972,6 +994,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
sampling_params, sampling_params,
request_id, request_id,
data_parallel_rank=dp_rank, data_parallel_rank=dp_rank,
trace_headers=trace_headers,
) )
async for res in gen: async for res in gen:
...@@ -1114,6 +1137,8 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -1114,6 +1137,8 @@ class PrefillWorkerHandler(BaseWorkerHandler):
dp_rank = request.get("dp_rank", None) dp_rank = request.get("dp_rank", None)
trace_headers = self._build_trace_headers(context)
async with self._abort_monitor(context, request_id, is_prefill=True): async with self._abort_monitor(context, request_id, is_prefill=True):
try: try:
gen = self.engine_client.generate( gen = self.engine_client.generate(
...@@ -1122,6 +1147,7 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -1122,6 +1147,7 @@ class PrefillWorkerHandler(BaseWorkerHandler):
request_id, request_id,
data_parallel_rank=dp_rank, data_parallel_rank=dp_rank,
lora_request=lora_request, lora_request=lora_request,
trace_headers=trace_headers,
) )
except EngineDeadError as e: except EngineDeadError as e:
logger.error(f"vLLM EngineDeadError: {e}") logger.error(f"vLLM EngineDeadError: {e}")
......
...@@ -52,7 +52,8 @@ python -m dynamo.frontend --router-mode kv & ...@@ -52,7 +52,8 @@ python -m dynamo.frontend --router-mode kv &
# Start a single vLLM worker (aggregated prefill and decode) # Start a single vLLM worker (aggregated prefill and decode)
export OTEL_SERVICE_NAME=dynamo-worker-vllm export OTEL_SERVICE_NAME=dynamo-worker-vllm
python -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager & python -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager \
--otlp-traces-endpoint="$OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" &
wait wait
``` ```
...@@ -89,13 +90,17 @@ python -m dynamo.frontend --router-mode kv & ...@@ -89,13 +90,17 @@ python -m dynamo.frontend --router-mode kv &
# Run decode worker, make sure to wait for start up # Run decode worker, make sure to wait for start up
export OTEL_SERVICE_NAME=dynamo-worker-decode export OTEL_SERVICE_NAME=dynamo-worker-decode
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager & CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
--model Qwen/Qwen3-0.6B \
--enforce-eager \
--otlp-traces-endpoint="$OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" &
# Run prefill worker, make sure to wait for start up # Run prefill worker, make sure to wait for start up
export OTEL_SERVICE_NAME=dynamo-worker-prefill export OTEL_SERVICE_NAME=dynamo-worker-prefill
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
--model Qwen/Qwen3-0.6B \ --model Qwen/Qwen3-0.6B \
--enforce-eager \ --enforce-eager \
--otlp-traces-endpoint="$OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" \
--is-prefill-worker & --is-prefill-worker &
``` ```
......
...@@ -34,6 +34,7 @@ chrono = { workspace = true } ...@@ -34,6 +34,7 @@ chrono = { workspace = true }
dashmap = { workspace = true } dashmap = { workspace = true }
derive_builder = { workspace = true } derive_builder = { workspace = true }
derive-getters = { workspace = true } derive-getters = { workspace = true }
dynamo-config = { workspace = true }
either = { workspace = true } either = { workspace = true }
etcd-client = { workspace = true } etcd-client = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
......
...@@ -89,6 +89,8 @@ use tracing_subscriber::util::SubscriberInitExt; ...@@ -89,6 +89,8 @@ use tracing_subscriber::util::SubscriberInitExt;
use crate::config::environment_names::logging as env_logging; use crate::config::environment_names::logging as env_logging;
use dynamo_config::env_is_truthy;
/// Default log level /// Default log level
const DEFAULT_FILTER_LEVEL: &str = "info"; const DEFAULT_FILTER_LEVEL: &str = "info";
...@@ -130,11 +132,9 @@ impl Default for LoggingConfig { ...@@ -130,11 +132,9 @@ impl Default for LoggingConfig {
} }
} }
/// Check if OTLP trace exporting is enabled (set OTEL_EXPORT_ENABLED to "1" to enable) /// Check if OTLP trace exporting is enabled (accepts: "1", "true", "on", "yes" - case insensitive)
fn otlp_exporter_enabled() -> bool { fn otlp_exporter_enabled() -> bool {
std::env::var(env_logging::otlp::OTEL_EXPORT_ENABLED) env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
.map(|v| v == "1")
.unwrap_or(false)
} }
/// Get the service name from environment or use default /// Get the service name from environment or use default
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment