Unverified commit ccede2b2, authored by Russell Bryant and committed by GitHub
Browse files

[Core] cleanup zmq ipc sockets on exit (#11115)


Signed-off-by: Russell Bryant <rbryant@redhat.com>
parent 24a36d6d
import asyncio import asyncio
import atexit
import importlib import importlib
import inspect import inspect
import multiprocessing import multiprocessing
...@@ -196,6 +197,14 @@ async def build_async_engine_client_from_engine_args( ...@@ -196,6 +197,14 @@ async def build_async_engine_client_from_engine_args(
assert engine_pid is not None, "Engine process failed to start." assert engine_pid is not None, "Engine process failed to start."
logger.info("Started engine process with PID %d", engine_pid) logger.info("Started engine process with PID %d", engine_pid)
def _cleanup_ipc_path():
socket_path = ipc_path.replace("ipc://", "")
if os.path.exists(socket_path):
os.remove(socket_path)
# Ensure we clean up the local IPC socket file on exit.
atexit.register(_cleanup_ipc_path)
# Build RPCClient, which conforms to EngineClient Protocol. # Build RPCClient, which conforms to EngineClient Protocol.
engine_config = engine_args.create_engine_config() engine_config = engine_args.create_engine_config()
build_client = partial(MQLLMEngineClient, ipc_path, engine_config, build_client = partial(MQLLMEngineClient, ipc_path, engine_config,
......
...@@ -4,6 +4,7 @@ import queue ...@@ -4,6 +4,7 @@ import queue
import signal import signal
import threading import threading
import time import time
from dataclasses import dataclass
from multiprocessing.process import BaseProcess from multiprocessing.process import BaseProcess
from typing import List, Tuple, Type, Union from typing import List, Tuple, Type, Union
...@@ -129,6 +130,14 @@ class EngineCore: ...@@ -129,6 +130,14 @@ class EngineCore:
self.model_executor.profile(is_start) self.model_executor.profile(is_start)
@dataclass
class EngineCoreProcHandle:
    """Handle for an EngineCore background process and its ZMQ paths.

    Groups the process object with the ready/input/output paths it was
    started with, so the owner can shut the process down and remove the
    corresponding IPC socket files afterwards.
    """

    # The spawned EngineCore process.
    proc: BaseProcess
    # ZMQ path used while waiting for the process to start up.
    ready_path: str
    # ZMQ paths handed to the process for its input and output sockets.
    input_path: str
    output_path: str
class EngineCoreProc(EngineCore): class EngineCoreProc(EngineCore):
"""ZMQ-wrapper for running EngineCore in background process.""" """ZMQ-wrapper for running EngineCore in background process."""
...@@ -200,7 +209,7 @@ class EngineCoreProc(EngineCore): ...@@ -200,7 +209,7 @@ class EngineCoreProc(EngineCore):
input_path: str, input_path: str,
output_path: str, output_path: str,
ready_path: str, ready_path: str,
) -> BaseProcess: ) -> EngineCoreProcHandle:
# The current process might have CUDA context, # The current process might have CUDA context,
# so we need to spawn a new process. # so we need to spawn a new process.
# NOTE(rob): this is a problem for using EngineCoreProc w/ # NOTE(rob): this is a problem for using EngineCoreProc w/
...@@ -222,7 +231,10 @@ class EngineCoreProc(EngineCore): ...@@ -222,7 +231,10 @@ class EngineCoreProc(EngineCore):
# Wait for startup # Wait for startup
EngineCoreProc.wait_for_startup(proc, ready_path) EngineCoreProc.wait_for_startup(proc, ready_path)
return proc return EngineCoreProcHandle(proc=proc,
ready_path=ready_path,
input_path=input_path,
output_path=output_path)
@staticmethod @staticmethod
def run_engine_core(*args, **kwargs): def run_engine_core(*args, **kwargs):
......
import atexit import atexit
import os
from typing import List, Union from typing import List, Union
import msgspec import msgspec
...@@ -148,7 +149,7 @@ class MPClient(EngineCoreClient): ...@@ -148,7 +149,7 @@ class MPClient(EngineCoreClient):
self.input_socket.bind(input_path) self.input_socket.bind(input_path)
# Start EngineCore in background process. # Start EngineCore in background process.
self.proc = EngineCoreProc.make_engine_core_process( self.proc_handle = EngineCoreProc.make_engine_core_process(
*args, *args,
input_path=input_path, input_path=input_path,
output_path=output_path, output_path=output_path,
...@@ -161,13 +162,24 @@ class MPClient(EngineCoreClient): ...@@ -161,13 +162,24 @@ class MPClient(EngineCoreClient):
# Shut down the zmq context. # Shut down the zmq context.
self.ctx.destroy(linger=0) self.ctx.destroy(linger=0)
# Shutdown the process if needed. if hasattr(self, "proc_handle"):
if hasattr(self, "proc") and self.proc.is_alive(): # Shutdown the process if needed.
self.proc.terminate() if self.proc_handle.proc.is_alive():
self.proc.join(5) self.proc_handle.proc.terminate()
self.proc_handle.proc.join(5)
if self.proc.is_alive():
kill_process_tree(self.proc.pid) if self.proc_handle.proc.is_alive():
kill_process_tree(self.proc_handle.proc.pid)
# Remove zmq ipc socket files
ipc_sockets = [
self.proc_handle.ready_path, self.proc_handle.output_path,
self.proc_handle.input_path
]
for ipc_socket in ipc_sockets:
socket_file = ipc_socket.replace("ipc://", "")
if os.path.exists(socket_file):
os.remove(socket_file)
def __del__(self): def __del__(self):
self.shutdown() self.shutdown()
......
...@@ -172,16 +172,23 @@ class MultiprocExecutor: ...@@ -172,16 +172,23 @@ class MultiprocExecutor:
# Send SIGTERM if still running # Send SIGTERM if still running
active_procs = [w.proc for w in self.workers if w.proc.is_alive()] active_procs = [w.proc for w in self.workers if w.proc.is_alive()]
self.workers = None
for p in active_procs: for p in active_procs:
p.terminate() p.terminate()
if wait_for_termination(active_procs, 4): if not wait_for_termination(active_procs, 4):
return # Send SIGKILL if still running
active_procs = [p for p in active_procs if p.is_alive()]
for p in active_procs:
p.kill()
# Send SIGKILL if still running self._cleanup_sockets()
active_procs = [p for p in active_procs if p.is_alive()] self.workers = None
for p in active_procs:
p.kill() def _cleanup_sockets(self):
for w in self.workers:
# Remove the zmq ipc socket file
socket_path = w.ready_path.replace("ipc://", "")
if os.path.exists(socket_path):
os.remove(socket_path)
def shutdown(self): def shutdown(self):
"""Properly shut down the executor and its workers""" """Properly shut down the executor and its workers"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment