Unverified Commit 6b625a88 authored by Travis Johnson's avatar Travis Johnson Committed by GitHub
Browse files

[Bugfix] Quickfix followups to busy loop removal in #28053 (#36068)


Signed-off-by: default avatarTravis Johnson <tsjohnso@us.ibm.com>
Signed-off-by: default avatarNick Hill <nickhill123@gmail.com>
Co-authored-by: default avatarNick Hill <nickhill123@gmail.com>
parent 54756b61
...@@ -157,10 +157,13 @@ class MultiprocExecutor(Executor): ...@@ -157,10 +157,13 @@ class MultiprocExecutor(Executor):
global_start_rank = ( global_start_rank = (
self.local_world_size * self.parallel_config.node_rank_within_dp self.local_world_size * self.parallel_config.node_rank_within_dp
) )
# Keep track of socket file descriptors that are inherited by the # When using fork, keep track of socket file descriptors that are
# worker when using fork, so that we can close them in subsequent # inherited by the worker, so that we can close them in subsequent
# workers # workers
inherited_fds: list[int] = [] inherited_fds: list[int] | None = (
[] if context.get_start_method() == "fork" else None
)
for local_rank in range(self.local_world_size): for local_rank in range(self.local_world_size):
global_rank = global_start_rank + local_rank global_rank = global_start_rank + local_rank
is_driver_worker = self._is_driver_worker(global_rank) is_driver_worker = self._is_driver_worker(global_rank)
...@@ -175,13 +178,9 @@ class MultiprocExecutor(Executor): ...@@ -175,13 +178,9 @@ class MultiprocExecutor(Executor):
inherited_fds=inherited_fds, inherited_fds=inherited_fds,
) )
unready_workers.append(unready_worker_handle) unready_workers.append(unready_worker_handle)
if context.get_start_method() == "fork": if inherited_fds is not None:
inherited_fds.extend( inherited_fds.append(unready_worker_handle.death_writer.fileno())
[ inherited_fds.append(unready_worker_handle.ready_pipe.fileno())
unready_worker_handle.death_writer.fileno(),
unready_worker_handle.ready_pipe.fileno(),
]
)
# Workers must be created before wait_for_ready to avoid # Workers must be created before wait_for_ready to avoid
# deadlock, since worker.init_device() does a device sync. # deadlock, since worker.init_device() does a device sync.
...@@ -453,10 +452,11 @@ class MultiprocExecutor(Executor): ...@@ -453,10 +452,11 @@ class MultiprocExecutor(Executor):
w.worker_response_mq.shutdown() w.worker_response_mq.shutdown()
w.worker_response_mq = None w.worker_response_mq = None
if self.rpc_broadcast_mq is not None: if rpc_broadcast_mq := getattr(self, "rpc_broadcast_mq", None):
self.rpc_broadcast_mq.shutdown() rpc_broadcast_mq.shutdown()
self.rpc_broadcast_mq = None self.rpc_broadcast_mq = None
for mq in self.response_mqs: if response_mqs := getattr(self, "response_mqs", None):
for mq in response_mqs:
mq.shutdown() mq.shutdown()
self.response_mqs = [] self.response_mqs = []
...@@ -634,13 +634,16 @@ class WorkerProc: ...@@ -634,13 +634,16 @@ class WorkerProc:
input_shm_handle, # Receive SchedulerOutput input_shm_handle, # Receive SchedulerOutput
shared_worker_lock: LockType, shared_worker_lock: LockType,
is_driver_worker: bool, is_driver_worker: bool,
inherited_fds: list[int], inherited_fds: list[int] | None = None,
) -> UnreadyWorkerProcHandle: ) -> UnreadyWorkerProcHandle:
context = get_mp_context() context = get_mp_context()
# Ready pipe to communicate readiness from child to parent # Ready pipe to communicate readiness from child to parent
ready_reader, ready_writer = context.Pipe(duplex=False) ready_reader, ready_writer = context.Pipe(duplex=False)
# Death pipe to let child detect parent process exit # Death pipe to let child detect parent process exit
death_reader, death_writer = context.Pipe(duplex=False) death_reader, death_writer = context.Pipe(duplex=False)
if inherited_fds is not None:
inherited_fds = inherited_fds.copy()
inherited_fds.extend((ready_reader.fileno(), death_writer.fileno()))
process_kwargs = { process_kwargs = {
"vllm_config": vllm_config, "vllm_config": vllm_config,
"local_rank": local_rank, "local_rank": local_rank,
...@@ -652,8 +655,7 @@ class WorkerProc: ...@@ -652,8 +655,7 @@ class WorkerProc:
"shared_worker_lock": shared_worker_lock, "shared_worker_lock": shared_worker_lock,
"is_driver_worker": is_driver_worker, "is_driver_worker": is_driver_worker,
# Have the worker close parent end of this worker's pipes too # Have the worker close parent end of this worker's pipes too
"inherited_fds": inherited_fds "inherited_fds": inherited_fds if inherited_fds is not None else [],
+ [ready_reader.fileno(), death_writer.fileno()],
} }
# Run EngineCore busy loop in background process. # Run EngineCore busy loop in background process.
proc = context.Process( proc = context.Process(
...@@ -697,9 +699,8 @@ class WorkerProc: ...@@ -697,9 +699,8 @@ class WorkerProc:
unready_proc_handles: list[UnreadyWorkerProcHandle], unready_proc_handles: list[UnreadyWorkerProcHandle],
) -> list[WorkerProcHandle]: ) -> list[WorkerProcHandle]:
e = Exception( e = Exception(
"WorkerProc initialization failed due to " "WorkerProc initialization failed due to an exception in a "
"an exception in a background process. " "background process. See stack trace for root cause."
"See stack trace for root cause."
) )
pipes = {handle.ready_pipe: handle for handle in unready_proc_handles} pipes = {handle.ready_pipe: handle for handle in unready_proc_handles}
...@@ -802,7 +803,7 @@ class WorkerProc: ...@@ -802,7 +803,7 @@ class WorkerProc:
try: try:
os.close(fd) os.close(fd)
except Exception as e: except Exception as e:
logger.warning("Exception closing inherited connection: %s", e) logger.warning("Error closing inherited connection: %s: %s", type(e), e)
try: try:
# Initialize tracer # Initialize tracer
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment