Unverified Commit 5175b884 authored by Nick Hill's avatar Nick Hill Committed by GitHub
Browse files

[BugFix] Remove default multiproc executor `collective_rpc` timeout (#17000)


Signed-off-by: default avatarNick Hill <nhill@redhat.com>
parent 5536b30a
...@@ -38,7 +38,7 @@ logger = init_logger(__name__) ...@@ -38,7 +38,7 @@ logger = init_logger(__name__)
POLLING_TIMEOUT_MS = 5000 POLLING_TIMEOUT_MS = 5000
POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000 POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
EXECUTE_MODEL_TIMEOUT_S = 30 EXECUTE_MODEL_TIMEOUT_S = 40
class MultiprocExecutor(Executor): class MultiprocExecutor(Executor):
...@@ -151,16 +151,16 @@ class MultiprocExecutor(Executor): ...@@ -151,16 +151,16 @@ class MultiprocExecutor(Executor):
def collective_rpc(self, def collective_rpc(self,
method: Union[str, Callable], method: Union[str, Callable],
timeout: Optional[float] = 180.0, timeout: Optional[float] = None,
args: tuple = (), args: tuple = (),
kwargs: Optional[dict] = None, kwargs: Optional[dict] = None,
rank0_reply_only: bool = False) -> list[Any]: rank0_reply_only: bool = False) -> list[Any]:
start_time = time.monotonic()
kwargs = kwargs or {}
if self.is_failed: if self.is_failed:
raise RuntimeError("Executor failed.") raise RuntimeError("Executor failed.")
deadline = None if timeout is None else time.monotonic() + timeout
kwargs = kwargs or {}
# NOTE: If the args are heterogeneous, then we pack them into a list, # NOTE: If the args are heterogeneous, then we pack them into a list,
# and unpack them in the method of every worker, because every worker # and unpack them in the method of every worker, because every worker
# knows their own rank. # knows their own rank.
...@@ -176,8 +176,8 @@ class MultiprocExecutor(Executor): ...@@ -176,8 +176,8 @@ class MultiprocExecutor(Executor):
workers = (self.workers[0], ) if rank0_reply_only else self.workers workers = (self.workers[0], ) if rank0_reply_only else self.workers
responses = [None] * len(workers) responses = [None] * len(workers)
for w in workers: for w in workers:
dequeue_timeout = timeout - (time.monotonic() - start_time dequeue_timeout = None if deadline is None else (
) if timeout is not None else None deadline - time.monotonic())
status, result = w.worker_response_mq.dequeue( status, result = w.worker_response_mq.dequeue(
timeout=dequeue_timeout, cancel=self.shutdown_event) timeout=dequeue_timeout, cancel=self.shutdown_event)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment