Unverified Commit 1b290ace authored by Dominik Schwabe's avatar Dominik Schwabe Committed by GitHub
Browse files

Run default _AsyncLLMEngine._run_workers_async in threadpool (#1628)

parent 0d578228
...@@ -206,18 +206,17 @@ class _AsyncLLMEngine(LLMEngine): ...@@ -206,18 +206,17 @@ class _AsyncLLMEngine(LLMEngine):
**kwargs, **kwargs,
) -> Any: ) -> Any:
"""Runs the given method on all workers.""" """Runs the given method on all workers."""
all_outputs = [] coros = []
for worker in self.workers: for worker in self.workers:
if self.parallel_config.worker_use_ray: if self.parallel_config.worker_use_ray:
executor = partial(worker.execute_method.remote, method) coros.append(
worker.execute_method.remote(method, *args, **kwargs))
else: else:
executor = getattr(worker, method) executor = getattr(worker, method)
coros.append(asyncio.get_event_loop().run_in_executor(
None, partial(executor, *args, **kwargs)))
output = executor(*args, **kwargs) all_outputs = await asyncio.gather(*coros)
all_outputs.append(output)
if self.parallel_config.worker_use_ray:
all_outputs = await asyncio.gather(*all_outputs)
if get_all_outputs: if get_all_outputs:
return all_outputs return all_outputs
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment