Unverified commit 1543914c, authored by Robert Shaw and committed by GitHub
Browse files

[V1] Improve TP>1 Error Handling + Stack Trace (#11721)


Co-authored-by: Tyler Michael Smith <tyler@neuralmagic.com>
parent 61fed92c
import asyncio import asyncio
import os import os
import signal
from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union
from vllm.config import ModelConfig, VllmConfig from vllm.config import ModelConfig, VllmConfig
...@@ -42,21 +41,6 @@ class AsyncLLM(EngineClient): ...@@ -42,21 +41,6 @@ class AsyncLLM(EngineClient):
start_engine_loop: bool = True, start_engine_loop: bool = True,
) -> None: ) -> None:
# The child processes will send SIGQUIT when unrecoverable
# errors happen. We kill the process tree here so that the
# stack trace is very evident.
# TODO: rather than killing the main process, we should
# figure out how to raise an AsyncEngineDeadError and
# handle at the API server level so we can return a better
# error code to the clients calling VLLM.
def sigquit_handler(signum, frame):
logger.fatal(
"AsyncLLM got SIGQUIT from worker processes, shutting "
"down. See stack trace above for root cause issue.")
kill_process_tree(os.getpid())
signal.signal(signal.SIGQUIT, sigquit_handler)
assert start_engine_loop assert start_engine_loop
self.log_requests = log_requests self.log_requests = log_requests
......
...@@ -198,7 +198,7 @@ class EngineCoreProc(EngineCore): ...@@ -198,7 +198,7 @@ class EngineCoreProc(EngineCore):
except Exception: except Exception:
traceback = get_exception_traceback() traceback = get_exception_traceback()
logger.error("EngineCore hit an exception: %s", traceback) logger.error("EngineCore hit an exception: %s", traceback)
parent_process.send_signal(signal.SIGQUIT) parent_process.send_signal(signal.SIGUSR1)
finally: finally:
if engine_core is not None: if engine_core is not None:
......
import os
import signal
import weakref import weakref
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Type from typing import List, Type
...@@ -8,7 +10,8 @@ import zmq.asyncio ...@@ -8,7 +10,8 @@ import zmq.asyncio
from vllm.config import VllmConfig from vllm.config import VllmConfig
from vllm.logger import init_logger from vllm.logger import init_logger
from vllm.utils import get_open_zmq_ipc_path, make_zmq_socket from vllm.utils import (get_open_zmq_ipc_path, kill_process_tree,
make_zmq_socket)
from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs, from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
EngineCoreProfile, EngineCoreRequest, EngineCoreProfile, EngineCoreRequest,
EngineCoreRequestType, EngineCoreRequestUnion) EngineCoreRequestType, EngineCoreRequestUnion)
...@@ -134,6 +137,20 @@ class MPClient(EngineCoreClient): ...@@ -134,6 +137,20 @@ class MPClient(EngineCoreClient):
executor_class: Type[Executor], executor_class: Type[Executor],
log_stats: bool = False, log_stats: bool = False,
): ):
# The child processes will send SIGUSR1 when unrecoverable
# errors happen. We kill the process tree here so that the
# stack trace is very evident.
# TODO(rob): rather than killing the main process, we should
# figure out how to raise an AsyncEngineDeadError and
# handle at the API server level so we can return a better
# error code to the clients calling VLLM.
def sigusr1_handler(signum, frame):
logger.fatal("Got fatal signal from worker processes, shutting "
"down. See stack trace above for root cause issue.")
kill_process_tree(os.getpid())
signal.signal(signal.SIGUSR1, sigusr1_handler)
# Serialization setup. # Serialization setup.
self.encoder = PickleEncoder() self.encoder = PickleEncoder()
self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs)
......
...@@ -9,6 +9,7 @@ from enum import Enum, auto ...@@ -9,6 +9,7 @@ from enum import Enum, auto
from multiprocessing.process import BaseProcess from multiprocessing.process import BaseProcess
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
import psutil
import zmq import zmq
from vllm.config import VllmConfig from vllm.config import VllmConfig
...@@ -38,6 +39,19 @@ class MultiprocExecutor(Executor): ...@@ -38,6 +39,19 @@ class MultiprocExecutor(Executor):
# and ensure workers will be terminated. # and ensure workers will be terminated.
self._finalizer = weakref.finalize(self, self.shutdown) self._finalizer = weakref.finalize(self, self.shutdown)
# The child processes will send SIGUSR1 when unrecoverable
# errors happen.
def sigusr1_handler(signum, frame):
logger.fatal(
"MulitprocExecutor got fatal signal from worker processes, "
"shutting down. See stack trace above for root cause issue.")
# Propagate error up to parent process.
parent_process = psutil.Process().parent()
parent_process.send_signal(signal.SIGUSR1)
self.shutdown()
signal.signal(signal.SIGUSR1, sigusr1_handler)
self.vllm_config = vllm_config self.vllm_config = vllm_config
self.parallel_config = vllm_config.parallel_config self.parallel_config = vllm_config.parallel_config
...@@ -335,8 +349,11 @@ class WorkerProc: ...@@ -335,8 +349,11 @@ class WorkerProc:
except SystemExit: except SystemExit:
logger.debug("Worker interrupted.") logger.debug("Worker interrupted.")
except BaseException as e: except Exception:
logger.exception(e) # worker_busy_loop sends exceptions to Executor
# for shutdown, but if there is an error in startup or an
# error with IPC itself, we need to alert the parent.
psutil.Process().parent().send_signal(signal.SIGUSR1)
raise raise
finally: finally:
...@@ -377,9 +394,10 @@ class WorkerProc: ...@@ -377,9 +394,10 @@ class WorkerProc:
try: try:
output = getattr(self.worker, method)(*args, **kwargs) output = getattr(self.worker, method)(*args, **kwargs)
except BaseException as e: except Exception as e:
self.worker_response_mq.enqueue( self.worker_response_mq.enqueue(
(WorkerProc.ResponseStatus.FAILURE, e)) (WorkerProc.ResponseStatus.FAILURE, e))
logger.exception("WorkerProc hit an exception: %s", exc_info=e)
continue continue
self.worker_response_mq.enqueue( self.worker_response_mq.enqueue(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment