Unverified Commit cbae7af5 authored by Nick Hill, committed by GitHub
Browse files

[V1][BugFix] Fix engine core client shutdown hangs (#13298)



Even though ZMQ context.destroy() is meant to close open sockets before terminating the context, it appears to be necessary to do this explicitly or else it can hang in the context.term() method.

Close ZMQ sockets explicitly before terminating the context, make shutdown of client resources more robust, and shut down the engine core process prior to terminating the ZMQ context.
Signed-off-by: Nick Hill <nhill@redhat.com>
parent eb24dc4a
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
import asyncio import asyncio
import time import time
import uuid import uuid
from contextlib import ExitStack
from typing import Dict, List, Optional from typing import Dict, List, Optional
import pytest import pytest
...@@ -178,7 +177,7 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool): ...@@ -178,7 +177,7 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool):
@pytest.mark.asyncio(loop_scope="function") @pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_asyncio(monkeypatch): async def test_engine_core_client_asyncio(monkeypatch):
with monkeypatch.context() as m, ExitStack() as after: with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "1") m.setenv("VLLM_USE_V1", "1")
# Monkey-patch core engine utility function to test. # Monkey-patch core engine utility function to test.
...@@ -195,7 +194,6 @@ async def test_engine_core_client_asyncio(monkeypatch): ...@@ -195,7 +194,6 @@ async def test_engine_core_client_asyncio(monkeypatch):
executor_class=executor_class, executor_class=executor_class,
log_stats=True, log_stats=True,
) )
after.callback(client.shutdown)
MAX_TOKENS = 20 MAX_TOKENS = 20
params = SamplingParams(max_tokens=MAX_TOKENS) params = SamplingParams(max_tokens=MAX_TOKENS)
......
...@@ -8,6 +8,7 @@ import uuid ...@@ -8,6 +8,7 @@ import uuid
import weakref import weakref
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from concurrent.futures import Future from concurrent.futures import Future
from dataclasses import dataclass
from threading import Thread from threading import Thread
from typing import Any, Dict, List, Optional, Type, Union from typing import Any, Dict, List, Optional, Type, Union
...@@ -169,6 +170,31 @@ class InprocClient(EngineCoreClient): ...@@ -169,6 +170,31 @@ class InprocClient(EngineCoreClient):
self.engine_core.add_lora(lora_request) self.engine_core.add_lora(lora_request)
@dataclass
class BackgroundResources:
    """Callable holder for background resources, used as a finalizer.

    Kept separate from the client so that registering it as a finalizer
    does not create a circular reference back to the client object.
    Every field defaults to ``None`` and can be filled in as the
    corresponding resource is created; fields that are still ``None``
    are skipped during cleanup.
    """

    # ZMQ context owning the sockets below.
    ctx: Optional[Union[zmq.Context, zmq.asyncio.Context]] = None
    # ZMQ sockets to be closed before the context is destroyed.
    output_socket: Optional[Union[zmq.Socket, zmq.asyncio.Socket]] = None
    input_socket: Optional[Union[zmq.Socket, zmq.asyncio.Socket]] = None
    # Handle of the background process, shut down before the ZMQ teardown.
    proc_handle: Optional[BackgroundProcHandle] = None

    def __call__(self):
        """Clean up background resources.

        Order: background process, then both sockets, then the context.
        The ZMQ teardown runs even when shutting down the process raises,
        so a failing process shutdown does not leak the sockets/context;
        the exception still propagates to the caller afterwards.
        """
        try:
            if self.proc_handle is not None:
                self.proc_handle.shutdown()
        finally:
            # ZMQ context termination can hang if the sockets
            # aren't explicitly closed first.
            if self.output_socket is not None:
                self.output_socket.close(linger=0)
            if self.input_socket is not None:
                self.input_socket.close(linger=0)
            if self.ctx is not None:
                self.ctx.destroy(linger=0)
class MPClient(EngineCoreClient): class MPClient(EngineCoreClient):
""" """
MPClient: base client for multi-proc EngineCore. MPClient: base client for multi-proc EngineCore.
...@@ -212,21 +238,22 @@ class MPClient(EngineCoreClient): ...@@ -212,21 +238,22 @@ class MPClient(EngineCoreClient):
zmq.asyncio.Context() # type: ignore[attr-defined] zmq.asyncio.Context() # type: ignore[attr-defined]
if asyncio_mode else zmq.Context()) # type: ignore[attr-defined] if asyncio_mode else zmq.Context()) # type: ignore[attr-defined]
# Note(rob): shutdown function cannot be a bound method, # This will ensure resources created so far are closed
# else the gc cannot collect the object. # when the client is garbage collected, even if an
self._finalizer = weakref.finalize(self, lambda x: x.destroy(linger=0), # exception is raised mid-construction.
self.ctx) resources = BackgroundResources(ctx=self.ctx)
self._finalizer = weakref.finalize(self, resources)
# Paths and sockets for IPC. # Paths and sockets for IPC.
output_path = get_open_zmq_ipc_path() output_path = get_open_zmq_ipc_path()
input_path = get_open_zmq_ipc_path() input_path = get_open_zmq_ipc_path()
self.output_socket = make_zmq_socket(self.ctx, output_path, resources.output_socket = make_zmq_socket(self.ctx, output_path,
zmq.constants.PULL) zmq.constants.PULL)
self.input_socket = make_zmq_socket(self.ctx, input_path, resources.input_socket = make_zmq_socket(self.ctx, input_path,
zmq.constants.PUSH) zmq.constants.PUSH)
# Start EngineCore in background process. # Start EngineCore in background process.
self.proc_handle = BackgroundProcHandle( resources.proc_handle = BackgroundProcHandle(
input_path=input_path, input_path=input_path,
output_path=output_path, output_path=output_path,
process_name="EngineCore", process_name="EngineCore",
...@@ -237,13 +264,11 @@ class MPClient(EngineCoreClient): ...@@ -237,13 +264,11 @@ class MPClient(EngineCoreClient):
"log_stats": log_stats, "log_stats": log_stats,
}) })
self.output_socket = resources.output_socket
self.input_socket = resources.input_socket
self.utility_results: Dict[int, AnyFuture] = {} self.utility_results: Dict[int, AnyFuture] = {}
def shutdown(self): def shutdown(self):
"""Clean up background resources."""
if hasattr(self, "proc_handle"):
self.proc_handle.shutdown()
self._finalizer() self._finalizer()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment