Unverified Commit acf8aeb7 authored by Ning Xie's avatar Ning Xie Committed by GitHub
Browse files

[Misc] normalize multiprocessing Queue usage (#22371)


Signed-off-by: default avatarAndy Xie <andy.xning@gmail.com>
parent 7e3a8dc9
...@@ -118,8 +118,17 @@ def test_sharded_state_loader(enable_lora, tp_size, num_gpus_available, ...@@ -118,8 +118,17 @@ def test_sharded_state_loader(enable_lora, tp_size, num_gpus_available,
tensor_parallel_size=tp_size, tensor_parallel_size=tp_size,
)) ))
p.start() p.start()
p.join() # Call queue.get() before p.join() to prevent deadlock:
# If p.join() is called before queue.get() and the queue is full,
# the child process may block while writing to the queue and never
# terminate, causing the parent to wait indefinitely on p.join().
# See: https://github.com/vllm-project/vllm/pull/22371#discussion_r2257773814
out_before = queue.get() out_before = queue.get()
p.join()
queue.close()
queue.join_thread()
queue = ctx.Queue()
p = ctx.Process(target=_run_generate, p = ctx.Process(target=_run_generate,
args=(output_dir, queue), args=(output_dir, queue),
...@@ -131,7 +140,14 @@ def test_sharded_state_loader(enable_lora, tp_size, num_gpus_available, ...@@ -131,7 +140,14 @@ def test_sharded_state_loader(enable_lora, tp_size, num_gpus_available,
load_format="sharded_state", load_format="sharded_state",
)) ))
p.start() p.start()
p.join() # Call queue.get() before p.join() to prevent deadlock:
# If p.join() is called before queue.get() and the queue is full,
# the child process may block while writing to the queue and never
# terminate, causing the parent to wait indefinitely on p.join().
# See: https://github.com/vllm-project/vllm/pull/22371#discussion_r2257773814
out_after = queue.get() out_after = queue.get()
p.join()
queue.close()
queue.join_thread()
assert out_before == out_after assert out_before == out_after
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment