Unverified Commit 2bfdbf2a authored by Tyler Michael Smith's avatar Tyler Michael Smith Committed by GitHub
Browse files

[V1][Core] Use weakref.finalize instead of atexit (#11242)


Signed-off-by: default avatarTyler Michael Smith <tyler@neuralmagic.com>
parent e88db68c
import atexit
import os import os
import weakref
from typing import List, Optional from typing import List, Optional
import msgspec import msgspec
...@@ -165,15 +165,9 @@ class MPClient(EngineCoreClient): ...@@ -165,15 +165,9 @@ class MPClient(EngineCoreClient):
ready_path=ready_path, # type: ignore[misc] ready_path=ready_path, # type: ignore[misc]
**kwargs, **kwargs,
) )
atexit.register(self.shutdown) self._finalizer = weakref.finalize(self, self.shutdown)
def shutdown(self): def shutdown(self):
# During final garbage collection in process shutdown, atexit may be
# None.
if atexit:
# in case shutdown gets called via __del__ first
atexit.unregister(self.shutdown)
# Shut down the zmq context. # Shut down the zmq context.
self.ctx.destroy(linger=0) self.ctx.destroy(linger=0)
...@@ -197,9 +191,6 @@ class MPClient(EngineCoreClient): ...@@ -197,9 +191,6 @@ class MPClient(EngineCoreClient):
os.remove(socket_file) os.remove(socket_file)
self.proc_handle = None self.proc_handle = None
def __del__(self):
self.shutdown()
class SyncMPClient(MPClient): class SyncMPClient(MPClient):
"""Synchronous client for multi-proc EngineCore.""" """Synchronous client for multi-proc EngineCore."""
......
import atexit
import os import os
import pickle import pickle
import signal import signal
import sys import sys
import time import time
import weakref
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
from multiprocessing.process import BaseProcess from multiprocessing.process import BaseProcess
...@@ -37,7 +37,7 @@ class MultiprocExecutor(Executor): ...@@ -37,7 +37,7 @@ class MultiprocExecutor(Executor):
def __init__(self, vllm_config: VllmConfig) -> None: def __init__(self, vllm_config: VllmConfig) -> None:
# Call self.shutdown at exit to clean up # Call self.shutdown at exit to clean up
# and ensure workers will be terminated. # and ensure workers will be terminated.
atexit.register(self.shutdown) self._finalizer = weakref.finalize(self, self.shutdown)
self.vllm_config = vllm_config self.vllm_config = vllm_config
self.parallel_config = vllm_config.parallel_config self.parallel_config = vllm_config.parallel_config
...@@ -195,14 +195,10 @@ class MultiprocExecutor(Executor): ...@@ -195,14 +195,10 @@ class MultiprocExecutor(Executor):
os.remove(socket_path) os.remove(socket_path)
def shutdown(self): def shutdown(self):
if atexit:
# in case shutdown was called explicitly, we don't need to call it
# again
atexit.unregister(self.shutdown)
"""Properly shut down the executor and its workers""" """Properly shut down the executor and its workers"""
if getattr(self, 'shutting_down', False): if getattr(self, 'shutting_down', False):
self.shutting_down = True self.shutting_down = True
for w in self.workers: #TODO: not sure if needed for w in self.workers:
w.worker_response_mq = None w.worker_response_mq = None
self._ensure_worker_termination() self._ensure_worker_termination()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment