Unverified Commit fc161924 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: unified tracing across dynamo + sglang (#4248)

parent 8559c2bd
...@@ -103,11 +103,8 @@ async def init(runtime: DistributedRuntime, config: Config): ...@@ -103,11 +103,8 @@ async def init(runtime: DistributedRuntime, config: Config):
server_args, dynamo_args = config.server_args, config.dynamo_args server_args, dynamo_args = config.server_args, config.dynamo_args
# Prevent SGLang from blocking on non-leader nodes # Prevent SGLang from blocking on non-leader nodes
# We can switch this to 0 and leverage our own metrics
# after https://github.com/sgl-project/sglang/pull/13686
# is merged in
if server_args.node_rank >= 1: if server_args.node_rank >= 1:
os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "1" os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"
engine = sgl.Engine(server_args=server_args) engine = sgl.Engine(server_args=server_args)
...@@ -222,11 +219,8 @@ async def init_prefill(runtime: DistributedRuntime, config: Config): ...@@ -222,11 +219,8 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
server_args, dynamo_args = config.server_args, config.dynamo_args server_args, dynamo_args = config.server_args, config.dynamo_args
# Prevent SGLang from blocking on non-leader nodes # Prevent SGLang from blocking on non-leader nodes
# We can switch this to 0 and leverage our own metrics
# after https://github.com/sgl-project/sglang/pull/13686
# is merged in
if server_args.node_rank >= 1: if server_args.node_rank >= 1:
os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "1" os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"
engine = sgl.Engine(server_args=server_args) engine = sgl.Engine(server_args=server_args)
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import asyncio import asyncio
import base64
import json
import logging import logging
import random import random
import socket import socket
...@@ -10,6 +12,7 @@ from contextlib import asynccontextmanager ...@@ -10,6 +12,7 @@ from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional, Tuple from typing import Any, AsyncGenerator, Dict, Optional, Tuple
import sglang as sgl import sglang as sgl
from sglang.srt.tracing import trace as sglang_trace
from sglang.srt.utils import get_local_ip_auto from sglang.srt.utils import get_local_ip_auto
from dynamo._core import Client, Component, Context from dynamo._core import Client, Component, Context
...@@ -49,6 +52,7 @@ class BaseWorkerHandler(ABC): ...@@ -49,6 +52,7 @@ class BaseWorkerHandler(ABC):
self.prefill_client = prefill_client self.prefill_client = prefill_client
self.serving_mode = config.serving_mode self.serving_mode = config.serving_mode
self.skip_tokenizer_init = config.server_args.skip_tokenizer_init self.skip_tokenizer_init = config.server_args.skip_tokenizer_init
self.enable_trace = config.server_args.enable_trace
@abstractmethod @abstractmethod
async def generate(self, request: Dict[str, Any], context: Context): async def generate(self, request: Dict[str, Any], context: Context):
...@@ -117,6 +121,39 @@ class BaseWorkerHandler(ABC): ...@@ -117,6 +121,39 @@ class BaseWorkerHandler(ABC):
return bootstrap_host, bootstrap_port return bootstrap_host, bootstrap_port
def _propagate_trace_context_to_sglang(
self, context: Context, bootstrap_room: int = 0
):
"""Propagate Dynamo's trace context to SGLang for distributed tracing. SGLang expects a certain
format derived by loooking at https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/tracing/trace.py
in the to_dict() method.
Args:
context: Dynamo Context object containing trace information.
bootstrap_room: Bootstrap room ID (0 for aggregated, actual room for disaggregated).
"""
trace_id = context.trace_id
span_id = context.span_id
if not trace_id or not span_id:
return
# Build trace context for SGLang
trace_context = {
str(bootstrap_room): {
"root_span": {"traceparent": f"00-{trace_id}-{span_id}-01"},
"prev_span": {
"span_id": int(span_id, 16),
"trace_id": int(trace_id, 16),
},
}
}
# Encode and propagate
base64_context = base64.b64encode(
json.dumps(trace_context, ensure_ascii=False).encode("utf-8")
).decode("utf-8")
sglang_trace.trace_set_remote_propagate_context(base64_context)
async def _handle_cancellation( async def _handle_cancellation(
self, request_id_future: asyncio.Future, context: Context self, request_id_future: asyncio.Future, context: Context
): ):
......
...@@ -112,6 +112,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -112,6 +112,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
RuntimeError: If no bootstrap info received from prefill worker. RuntimeError: If no bootstrap info received from prefill worker.
""" """
logging.debug(f"New Request ID: {context.id()}") logging.debug(f"New Request ID: {context.id()}")
trace_id = context.trace_id
sampling_params = self._build_sampling_params(request) sampling_params = self._build_sampling_params(request)
input_param = self._get_input_param(request) input_param = self._get_input_param(request)
...@@ -154,6 +155,11 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -154,6 +155,11 @@ class DecodeWorkerHandler(BaseWorkerHandler):
if not bootstrap_info: if not bootstrap_info:
raise RuntimeError("No bootstrap info received from prefill worker") raise RuntimeError("No bootstrap info received from prefill worker")
if self.enable_trace:
self._propagate_trace_context_to_sglang(
context, bootstrap_info["bootstrap_room"]
)
decode = await self.engine.async_generate( decode = await self.engine.async_generate(
**input_param, **input_param,
sampling_params=sampling_params, sampling_params=sampling_params,
...@@ -161,6 +167,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -161,6 +167,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
bootstrap_host=bootstrap_info["bootstrap_host"], bootstrap_host=bootstrap_info["bootstrap_host"],
bootstrap_port=bootstrap_info["bootstrap_port"], bootstrap_port=bootstrap_info["bootstrap_port"],
bootstrap_room=bootstrap_info["bootstrap_room"], bootstrap_room=bootstrap_info["bootstrap_room"],
rid=trace_id,
) )
if self.skip_tokenizer_init: if self.skip_tokenizer_init:
...@@ -170,10 +177,14 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -170,10 +177,14 @@ class DecodeWorkerHandler(BaseWorkerHandler):
async for out in self._process_text_stream(decode, context): async for out in self._process_text_stream(decode, context):
yield out yield out
else: else:
if self.enable_trace:
self._propagate_trace_context_to_sglang(context)
agg = await self.engine.async_generate( agg = await self.engine.async_generate(
**input_param, **input_param,
sampling_params=sampling_params, sampling_params=sampling_params,
stream=True, stream=True,
rid=trace_id,
) )
if self.skip_tokenizer_init: if self.skip_tokenizer_init:
async for out in self._process_token_stream(agg, context): async for out in self._process_token_stream(agg, context):
......
...@@ -64,6 +64,7 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -64,6 +64,7 @@ class PrefillWorkerHandler(BaseWorkerHandler):
Bootstrap info dict with host, port, and room for decode worker connection. Bootstrap info dict with host, port, and room for decode worker connection.
""" """
logging.debug(f"New Request ID: {context.id()}") logging.debug(f"New Request ID: {context.id()}")
trace_id = context.trace_id
bootstrap_room = self._generate_bootstrap_room() bootstrap_room = self._generate_bootstrap_room()
bootstrap_info = { bootstrap_info = {
...@@ -76,6 +77,10 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -76,6 +77,10 @@ class PrefillWorkerHandler(BaseWorkerHandler):
input_param = self._get_input_param(request["request"]) input_param = self._get_input_param(request["request"])
# Propagate trace context to SGLang
if self.enable_trace:
self._propagate_trace_context_to_sglang(context, bootstrap_room)
results = await self.engine.async_generate( results = await self.engine.async_generate(
**input_param, **input_param,
sampling_params=request["sampling_params"], sampling_params=request["sampling_params"],
...@@ -83,6 +88,7 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -83,6 +88,7 @@ class PrefillWorkerHandler(BaseWorkerHandler):
bootstrap_host=self.bootstrap_host, bootstrap_host=self.bootstrap_host,
bootstrap_port=self.bootstrap_port, bootstrap_port=self.bootstrap_port,
bootstrap_room=bootstrap_room, bootstrap_room=bootstrap_room,
rid=trace_id,
) )
task = asyncio.create_task(self._consume_results(results, context)) task = asyncio.create_task(self._consume_results(results, context))
......
...@@ -66,7 +66,7 @@ ARG DEEPEP_COMMIT=9af0e0d0e74f3577af1979c9b9e1ac2cad0104ee ...@@ -66,7 +66,7 @@ ARG DEEPEP_COMMIT=9af0e0d0e74f3577af1979c9b9e1ac2cad0104ee
ARG DEEPEP_GB_COMMIT=1b14ad661c7640137fcfe93cccb2694ede1220b0 ARG DEEPEP_GB_COMMIT=1b14ad661c7640137fcfe93cccb2694ede1220b0
ARG CMAKE_BUILD_PARALLEL_LEVEL=2 ARG CMAKE_BUILD_PARALLEL_LEVEL=2
ARG SGL_KERNEL_VERSION=0.3.16.post5 ARG SGL_KERNEL_VERSION=0.3.16.post5
ARG SGLANG_COMMIT=0.5.4.post3 ARG SGLANG_COMMIT=0.5.6
ARG GDRCOPY_COMMIT=v2.4.4 ARG GDRCOPY_COMMIT=v2.4.4
ARG NVSHMEM_VERSION=3.3.9 ARG NVSHMEM_VERSION=3.3.9
ARG GRACE_BLACKWELL=false ARG GRACE_BLACKWELL=false
......
...@@ -9,7 +9,7 @@ distributor: ...@@ -9,7 +9,7 @@ distributor:
otlp: otlp:
protocols: protocols:
grpc: grpc:
endpoint: 0.0.0.0:4317 endpoint: 0.0.0.0:4317 # Receives from OTEL collector
http: http:
endpoint: 0.0.0.0:4318 endpoint: 0.0.0.0:4318
......
...@@ -46,10 +46,12 @@ while [[ $# -gt 0 ]]; do ...@@ -46,10 +46,12 @@ while [[ $# -gt 0 ]]; do
done done
# Enable tracing if requested # Enable tracing if requested
TRACE_ARGS=()
if [ "$ENABLE_OTEL" = true ]; then if [ "$ENABLE_OTEL" = true ]; then
export DYN_LOGGING_JSONL=true export DYN_LOGGING_JSONL=true
export OTEL_EXPORT_ENABLED=1 export OTEL_EXPORT_ENABLED=1
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317} export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317}
TRACE_ARGS+=(--enable-trace --otlp-traces-endpoint localhost:4317)
fi fi
# run ingress # run ingress
...@@ -59,7 +61,7 @@ python3 -m dynamo.frontend & ...@@ -59,7 +61,7 @@ python3 -m dynamo.frontend &
DYNAMO_PID=$! DYNAMO_PID=$!
# run worker with metrics enabled # run worker with metrics enabled
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \ OTEL_SERVICE_NAME=dynamo-worker DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
python3 -m dynamo.sglang \ python3 -m dynamo.sglang \
--model-path "$MODEL" \ --model-path "$MODEL" \
--served-model-name "$MODEL" \ --served-model-name "$MODEL" \
...@@ -68,4 +70,5 @@ python3 -m dynamo.sglang \ ...@@ -68,4 +70,5 @@ python3 -m dynamo.sglang \
--trust-remote-code \ --trust-remote-code \
--skip-tokenizer-init \ --skip-tokenizer-init \
--enable-metrics \ --enable-metrics \
"${TRACE_ARGS[@]}" \
"${EXTRA_ARGS[@]}" "${EXTRA_ARGS[@]}"
...@@ -37,10 +37,12 @@ while [[ $# -gt 0 ]]; do ...@@ -37,10 +37,12 @@ while [[ $# -gt 0 ]]; do
done done
# Enable tracing if requested # Enable tracing if requested
TRACE_ARGS=()
if [ "$ENABLE_OTEL" = true ]; then if [ "$ENABLE_OTEL" = true ]; then
export DYN_LOGGING_JSONL=true export DYN_LOGGING_JSONL=true
export OTEL_EXPORT_ENABLED=1 export OTEL_EXPORT_ENABLED=1
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317} export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317}
TRACE_ARGS+=(--enable-trace --otlp-traces-endpoint localhost:4317)
fi fi
# run ingress # run ingress
...@@ -59,4 +61,5 @@ python3 -m dynamo.sglang \ ...@@ -59,4 +61,5 @@ python3 -m dynamo.sglang \
--tp 1 \ --tp 1 \
--trust-remote-code \ --trust-remote-code \
--use-sglang-tokenizer \ --use-sglang-tokenizer \
--enable-metrics --enable-metrics \
"${TRACE_ARGS[@]}"
...@@ -37,10 +37,12 @@ while [[ $# -gt 0 ]]; do ...@@ -37,10 +37,12 @@ while [[ $# -gt 0 ]]; do
done done
# Enable tracing if requested # Enable tracing if requested
TRACE_ARGS=()
if [ "$ENABLE_OTEL" = true ]; then if [ "$ENABLE_OTEL" = true ]; then
export DYN_LOGGING_JSONL=true export DYN_LOGGING_JSONL=true
export OTEL_EXPORT_ENABLED=1 export OTEL_EXPORT_ENABLED=1
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317} export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317}
TRACE_ARGS+=(--enable-trace --otlp-traces-endpoint localhost:4317)
fi fi
# run ingress # run ingress
...@@ -58,7 +60,8 @@ python3 -m dynamo.sglang \ ...@@ -58,7 +60,8 @@ python3 -m dynamo.sglang \
--tp 1 \ --tp 1 \
--trust-remote-code \ --trust-remote-code \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}' \ --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}' \
--enable-metrics & --enable-metrics \
"${TRACE_ARGS[@]}" &
WORKER_PID=$! WORKER_PID=$!
OTEL_SERVICE_NAME=dynamo-worker-2 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_WORKER2:-8082} \ OTEL_SERVICE_NAME=dynamo-worker-2 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_WORKER2:-8082} \
...@@ -69,4 +72,5 @@ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \ ...@@ -69,4 +72,5 @@ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \
--tp 1 \ --tp 1 \
--trust-remote-code \ --trust-remote-code \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5558"}' \ --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5558"}' \
--enable-metrics --enable-metrics \
"${TRACE_ARGS[@]}"
...@@ -37,10 +37,12 @@ while [[ $# -gt 0 ]]; do ...@@ -37,10 +37,12 @@ while [[ $# -gt 0 ]]; do
done done
# Enable tracing if requested # Enable tracing if requested
TRACE_ARGS=()
if [ "$ENABLE_OTEL" = true ]; then if [ "$ENABLE_OTEL" = true ]; then
export DYN_LOGGING_JSONL=true export DYN_LOGGING_JSONL=true
export OTEL_EXPORT_ENABLED=1 export OTEL_EXPORT_ENABLED=1
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317} export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317}
TRACE_ARGS+=(--enable-trace --otlp-traces-endpoint localhost:4317)
fi fi
# run ingress # run ingress
...@@ -65,7 +67,8 @@ python3 -m dynamo.sglang \ ...@@ -65,7 +67,8 @@ python3 -m dynamo.sglang \
--host 0.0.0.0 \ --host 0.0.0.0 \
--port 40000 \ --port 40000 \
--disaggregation-transfer-backend nixl \ --disaggregation-transfer-backend nixl \
--enable-metrics --log-level debug & --enable-metrics \
"${TRACE_ARGS[@]}" &
PREFILL_PID=$! PREFILL_PID=$!
# run decode worker # run decode worker
...@@ -81,4 +84,5 @@ CUDA_VISIBLE_DEVICES=2,3 python3 -m dynamo.sglang \ ...@@ -81,4 +84,5 @@ CUDA_VISIBLE_DEVICES=2,3 python3 -m dynamo.sglang \
--disaggregation-bootstrap-port 12345 \ --disaggregation-bootstrap-port 12345 \
--host 0.0.0.0 \ --host 0.0.0.0 \
--disaggregation-transfer-backend nixl \ --disaggregation-transfer-backend nixl \
--enable-metrics --log-level debug --enable-metrics \
"${TRACE_ARGS[@]}"
...@@ -38,10 +38,12 @@ while [[ $# -gt 0 ]]; do ...@@ -38,10 +38,12 @@ while [[ $# -gt 0 ]]; do
done done
# Enable tracing if requested # Enable tracing if requested
TRACE_ARGS=()
if [ "$ENABLE_OTEL" = true ]; then if [ "$ENABLE_OTEL" = true ]; then
export DYN_LOGGING_JSONL=true export DYN_LOGGING_JSONL=true
export OTEL_EXPORT_ENABLED=1 export OTEL_EXPORT_ENABLED=1
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317} export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-http://localhost:4317}
TRACE_ARGS+=(--enable-trace --otlp-traces-endpoint localhost:4317)
fi fi
# run ingress # run ingress
...@@ -74,7 +76,8 @@ python3 -m dynamo.sglang \ ...@@ -74,7 +76,8 @@ python3 -m dynamo.sglang \
--host 0.0.0.0 \ --host 0.0.0.0 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}' \ --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}' \
--disaggregation-transfer-backend nixl \ --disaggregation-transfer-backend nixl \
--enable-metrics & --enable-metrics \
"${TRACE_ARGS[@]}" &
PREFILL_PID=$! PREFILL_PID=$!
# run prefill worker # run prefill worker
...@@ -89,7 +92,8 @@ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \ ...@@ -89,7 +92,8 @@ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \
--host 0.0.0.0 \ --host 0.0.0.0 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5558"}' \ --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5558"}' \
--disaggregation-transfer-backend nixl \ --disaggregation-transfer-backend nixl \
--enable-metrics & --enable-metrics \
"${TRACE_ARGS[@]}" &
PREFILL_PID=$! PREFILL_PID=$!
# run decode worker # run decode worker
...@@ -104,7 +108,8 @@ CUDA_VISIBLE_DEVICES=3 python3 -m dynamo.sglang \ ...@@ -104,7 +108,8 @@ CUDA_VISIBLE_DEVICES=3 python3 -m dynamo.sglang \
--host 0.0.0.0 \ --host 0.0.0.0 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5560"}' \ --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5560"}' \
--disaggregation-transfer-backend nixl \ --disaggregation-transfer-backend nixl \
--enable-metrics & --enable-metrics \
"${TRACE_ARGS[@]}" &
PREFILL_PID=$! PREFILL_PID=$!
# run decode worker # run decode worker
...@@ -119,4 +124,5 @@ CUDA_VISIBLE_DEVICES=2 python3 -m dynamo.sglang \ ...@@ -119,4 +124,5 @@ CUDA_VISIBLE_DEVICES=2 python3 -m dynamo.sglang \
--host 0.0.0.0 \ --host 0.0.0.0 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5559"}' \ --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5559"}' \
--disaggregation-transfer-backend nixl \ --disaggregation-transfer-backend nixl \
--enable-metrics --enable-metrics \
"${TRACE_ARGS[@]}"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment