Unverified Commit 777e602b authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

fix: signal handlers to clean up zombie vllm processes (#545)

parent 8edd23dc
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import asyncio import asyncio
import logging import logging
import os import os
import signal
import sys import sys
from pydantic import BaseModel from pydantic import BaseModel
...@@ -74,6 +75,9 @@ class PrefillWorker: ...@@ -74,6 +75,9 @@ class PrefillWorker:
) )
self.engine_args.enable_prefix_caching = False self.engine_args.enable_prefix_caching = False
signal.signal(signal.SIGTERM, self.shutdown_vllm_engine)
signal.signal(signal.SIGINT, self.shutdown_vllm_engine)
@async_on_start @async_on_start
async def async_init(self): async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args( self._engine_context = build_async_engine_client_from_engine_args(
...@@ -100,6 +104,18 @@ class PrefillWorker: ...@@ -100,6 +104,18 @@ class PrefillWorker:
task.add_done_callback(prefill_queue_handler_cb) task.add_done_callback(prefill_queue_handler_cb)
logger.info("PrefillWorker initialized") logger.info("PrefillWorker initialized")
def shutdown_vllm_engine(self, signum, frame):
"""Shutdown the background loop"""
logger.info(f"Received signal {signum}, shutting down")
loop = asyncio.get_event_loop()
try:
self.engine_client.close()
logger.info("PrefillWorker shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
finally:
loop.stop()
async def prefill_queue_handler(self): async def prefill_queue_handler(self):
logger.info("Prefill queue handler entered") logger.info("Prefill queue handler entered")
prefill_queue_nats_server = os.getenv("NATS_SERVER", "nats://localhost:4222") prefill_queue_nats_server = os.getenv("NATS_SERVER", "nats://localhost:4222")
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import asyncio import asyncio
import logging import logging
import os import os
import signal
from components.disagg_router import PyDisaggregatedRouter from components.disagg_router import PyDisaggregatedRouter
from components.prefill_worker import PrefillWorker from components.prefill_worker import PrefillWorker
...@@ -93,6 +94,9 @@ class VllmWorker: ...@@ -93,6 +94,9 @@ class VllmWorker:
logger.info(f"Generate endpoint ID: {VLLM_WORKER_ID}") logger.info(f"Generate endpoint ID: {VLLM_WORKER_ID}")
self.metrics_publisher = KvMetricsPublisher() self.metrics_publisher = KvMetricsPublisher()
signal.signal(signal.SIGTERM, self.shutdown_vllm_engine)
signal.signal(signal.SIGINT, self.shutdown_vllm_engine)
@async_on_start @async_on_start
async def async_init(self): async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args( self._engine_context = build_async_engine_client_from_engine_args(
...@@ -140,6 +144,18 @@ class VllmWorker: ...@@ -140,6 +144,18 @@ class VllmWorker:
self.disaggregated_router = None self.disaggregated_router = None
logger.info("VllmWorker has been initialized") logger.info("VllmWorker has been initialized")
def shutdown_vllm_engine(self, signum, frame):
"""Shutdown the background loop"""
logger.info(f"Received signal {signum}, shutting down")
loop = asyncio.get_event_loop()
try:
self.engine_client.close()
logger.info("VllmWorker shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
finally:
loop.stop()
async def create_metrics_publisher_endpoint(self): async def create_metrics_publisher_endpoint(self):
component = dynamo_context["component"] component = dynamo_context["component"]
await self.metrics_publisher.create_endpoint(component) await self.metrics_publisher.create_endpoint(component)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment