Unverified Commit 6afb4d5b authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: workaround vllm prometheus cleanup error on shutdown (#4323)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent 0e77d344
......@@ -4,6 +4,7 @@
import asyncio
import logging
import os
import tempfile
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Final
......@@ -73,6 +74,7 @@ class BaseWorkerHandler(ABC):
self.kv_publishers: list[ZmqKvEventPublisher] | None = None
self.engine_monitor = VllmEngineMonitor(runtime, engine)
self.image_loader = ImageLoader()
self.temp_dirs: list[tempfile.TemporaryDirectory] = []
@abstractmethod
async def generate(self, request, context) -> AsyncGenerator[dict, None]:
......@@ -115,9 +117,18 @@ class BaseWorkerHandler(ABC):
except Exception as e:
yield {"status": "error", "message": str(e)}
def add_temp_dir(self, temp_dir: tempfile.TemporaryDirectory) -> None:
"""Add a temporary directory to be cleaned up later."""
if temp_dir is not None:
self.temp_dirs.append(temp_dir)
def cleanup(self) -> None:
    """Clean up resources including temporary directories.

    Every directory registered through ``add_temp_dir`` is removed on a
    best-effort basis: a failure on one directory is logged as a warning and
    does not stop the remaining ones from being cleaned up. The tracking list
    is emptied, so calling this method again is a no-op for directories that
    were already processed.

    Subclasses that override this method should call ``super().cleanup()``
    so the tracked directories are still released.
    """
    # Detach the list first so a repeated call never re-processes the same
    # TemporaryDirectory objects.
    pending, self.temp_dirs = self.temp_dirs, []
    for temp_dir in pending:
        try:
            temp_dir.cleanup()
        except Exception as e:
            # Deliberately best-effort: shutdown must not fail because a
            # temp directory could not be removed.
            logger.warning(f"Failed to clean up temp directory: {e}")
async def _extract_multimodal_data(
self, request: Dict[str, Any]
......
......@@ -5,6 +5,7 @@ import asyncio
import logging
import os
import signal
import tempfile
from typing import Optional
import uvloop
......@@ -192,6 +193,16 @@ def setup_kv_event_publisher(
def setup_vllm_engine(config, stat_logger=None):
# Set PROMETHEUS_MULTIPROC_DIR before setup to avoid vLLM v0.11.0 bug
# vllm/v1/metrics/prometheus.py:79 passes TemporaryDirectory object instead of .name
prometheus_temp_dir = None
if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
prometheus_temp_dir = tempfile.TemporaryDirectory(prefix="vllm_prometheus_")
os.environ["PROMETHEUS_MULTIPROC_DIR"] = prometheus_temp_dir.name
logger.debug(
f"Created PROMETHEUS_MULTIPROC_DIR at: {os.environ['PROMETHEUS_MULTIPROC_DIR']}"
)
setup_multiprocess_prometheus()
logger.debug(
f"Prometheus multiproc dir set to: {os.environ.get('PROMETHEUS_MULTIPROC_DIR')}"
......@@ -253,7 +264,7 @@ def setup_vllm_engine(config, stat_logger=None):
else:
logger.info(f"VllmWorker for {config.served_model_name} has been initialized")
return engine_client, vllm_config, default_sampling_params
return engine_client, vllm_config, default_sampling_params, prometheus_temp_dir
async def register_vllm_model(
......@@ -320,11 +331,17 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
generate_endpoint = component.endpoint(config.endpoint)
clear_endpoint = component.endpoint("clear_kv_blocks")
engine_client, vllm_config, default_sampling_params = setup_vllm_engine(config)
(
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
) = setup_vllm_engine(config)
handler = PrefillWorkerHandler(
runtime, component, engine_client, default_sampling_params
)
handler.add_temp_dir(prometheus_temp_dir)
# Check if kv event consolidator is enabled (port was allocated in setup_vllm_engine)
consolidator_enabled = False
......@@ -416,9 +433,12 @@ async def init(runtime: DistributedRuntime, config: Config):
config.engine_args.data_parallel_rank or 0,
metrics_labels=[("model", config.served_model_name or config.model)],
)
engine_client, vllm_config, default_sampling_params = setup_vllm_engine(
config, factory
)
(
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
) = setup_vllm_engine(config, factory)
# TODO Hack to get data, move this to registering in TBD
factory.set_num_gpu_blocks_all(vllm_config.cache_config.num_gpu_blocks)
......@@ -431,6 +451,7 @@ async def init(runtime: DistributedRuntime, config: Config):
engine_client,
default_sampling_params,
)
handler.add_temp_dir(prometheus_temp_dir)
# Check if kv event consolidator is enabled (port was allocated in setup_vllm_engine)
consolidator_enabled = False
......@@ -636,7 +657,12 @@ async def init_multimodal_worker(runtime: DistributedRuntime, config: Config):
generate_endpoint = component.endpoint(config.endpoint)
clear_endpoint = component.endpoint("clear_kv_blocks")
engine_client, vllm_config, default_sampling_params = setup_vllm_engine(config)
(
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
) = setup_vllm_engine(config)
# Set up decode worker client for disaggregated mode
decode_worker_client = None
......@@ -660,6 +686,7 @@ async def init_multimodal_worker(runtime: DistributedRuntime, config: Config):
handler = MultimodalPDWorkerHandler(
runtime, component, engine_client, config, decode_worker_client
)
handler.add_temp_dir(prometheus_temp_dir)
await handler.async_init(runtime)
......
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Usage: ./disagg_same_gpu.sh
# Automatically calculates GPU memory fraction so each worker gets 4GB
# Get total and free GPU memory (in GB, two decimals) via PyTorch.
# The assignment is tested directly so the error branch is tied to the
# python3 exit status rather than to a separate $? lookup.
if ! GPU_MEM_INFO=$(python3 -c "import torch; free, total = torch.cuda.mem_get_info(); print(f'{free/1024**3:.2f} {total/1024**3:.2f}')" 2>/dev/null); then
    echo "Error: Failed to check GPU memory. Is PyTorch with CUDA available?" >&2
    exit 1
fi

FREE_GPU_GB=$(echo $GPU_MEM_INFO | awk '{print $1}')
TOTAL_GPU_GB=$(echo $GPU_MEM_INFO | awk '{print $2}')

# Each worker needs 4GB; the total is derived from the per-worker value so the
# two numbers cannot drift apart.
REQUIRED_GB_PER_WORKER=4
NUM_WORKERS=2
REQUIRED_GB_TOTAL=$((REQUIRED_GB_PER_WORKER * NUM_WORKERS))

# Calculate fraction needed per worker (4GB / total GPU memory).
# Fail loudly if the division cannot be evaluated (e.g. empty or zero total),
# instead of continuing with an empty fraction.
if ! GPU_MEM_FRACTION=$(python3 -c "print(f'{$REQUIRED_GB_PER_WORKER / $TOTAL_GPU_GB:.3f}')" 2>/dev/null); then
    echo "Error: Could not compute GPU memory fraction (total GPU memory reported as '${TOTAL_GPU_GB}')." >&2
    exit 1
fi

# Check if we have enough free memory
if python3 -c "import sys; sys.exit(0 if float('$FREE_GPU_GB') >= $REQUIRED_GB_TOTAL else 1)"; then
    echo "GPU memory check passed: ${FREE_GPU_GB}GB free / ${TOTAL_GPU_GB}GB total (required: ${REQUIRED_GB_TOTAL}GB)"
    echo "Using ${GPU_MEM_FRACTION} memory fraction per worker (${REQUIRED_GB_PER_WORKER}GB each)"
else
    echo "Error: Insufficient GPU memory. Required: ${REQUIRED_GB_TOTAL}GB, Available: ${FREE_GPU_GB}GB" >&2
    echo "Please free up GPU memory before running disaggregated mode on single GPU." >&2
    exit 1
fi
# Setup cleanup trap
# cleanup: stop the background frontend and decode worker (if they were
# started) and wait for them to exit. Safe to call before either PID is set.
cleanup() {
    local pids=()
    if [ -n "${DYNAMO_PID:-}" ]; then
        pids+=("$DYNAMO_PID")
    fi
    if [ -n "${DECODE_PID:-}" ]; then
        pids+=("$DECODE_PID")
    fi

    echo "Cleaning up background processes..."
    if [ "${#pids[@]}" -gt 0 ]; then
        kill "${pids[@]}" 2>/dev/null || true
        wait "${pids[@]}" 2>/dev/null || true
    fi
    echo "Cleanup complete."
}

# Run cleanup exactly once, from the EXIT trap. INT/TERM only need to end the
# script: a signal handler that returns lets bash resume the script after the
# interrupted command, which would start the next worker after its peers had
# already been killed. Exit codes follow the 128+signal convention.
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM
# Start the HTTP ingress (frontend) in the background and remember its PID.
python3 -m dynamo.frontend --http-port=8000 &
DYNAMO_PID=$!

# Flags shared by both vLLM workers.
# --enforce-eager is only here for quick deployment; remove it for production use.
COMMON_WORKER_ARGS=(--model Qwen/Qwen3-0.6B --enforce-eager)

# Decode worker: background process, metrics on port 8081.
DYN_SYSTEM_PORT=8081 CUDA_VISIBLE_DEVICES=0 \
    python3 -m dynamo.vllm "${COMMON_WORKER_ARGS[@]}" \
    --gpu-memory-utilization ${GPU_MEM_FRACTION} &
DECODE_PID=$!

# Give the decode worker a head start before launching the prefill worker, so
# the two do not compete for GPU memory at the same time (which can cause OOM).
# During this window the decode worker has to:
#   1. Load model weights and allocate its memory fraction
#   2. Initialize KV cache
#   3. Register with NATS service discovery so the prefill worker can find it
echo "Waiting for decode worker to initialize..."
sleep 10

# Prefill worker: foreground process, metrics on port 8082.
DYN_SYSTEM_PORT=8082 \
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=0 \
    python3 -m dynamo.vllm "${COMMON_WORKER_ARGS[@]}" \
    --is-prefill-worker \
    --gpu-memory-utilization ${GPU_MEM_FRACTION}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment