Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
70c232f8
Unverified
Commit
70c232f8
authored
Jul 08, 2024
by
youkaichao
Committed by
GitHub
Jul 08, 2024
Browse files
[core][distributed] fix ray worker rank assignment (#6235)
parent
a3c9435d
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
28 additions
and
6 deletions
+28
-6
vllm/executor/ray_gpu_executor.py
vllm/executor/ray_gpu_executor.py
+28
-6
No files found.
vllm/executor/ray_gpu_executor.py
View file @
70c232f8
...
@@ -134,11 +134,32 @@ class RayGPUExecutor(DistributedGPUExecutor):
...
@@ -134,11 +134,32 @@ class RayGPUExecutor(DistributedGPUExecutor):
worker_node_and_gpu_ids
=
self
.
_run_workers
(
"get_node_and_gpu_ids"
,
worker_node_and_gpu_ids
=
self
.
_run_workers
(
"get_node_and_gpu_ids"
,
use_dummy_driver
=
True
)
use_dummy_driver
=
True
)
node_workers
=
defaultdict
(
list
)
# the order in `worker_node_and_gpu_ids` does not necessarily match
node_gpus
=
defaultdict
(
list
)
# the machine boundaries. We need to make sure that workers in the
# same node are assigned consecutive ranks.
for
i
,
(
node_id
,
gpu_ids
)
in
enumerate
(
worker_node_and_gpu_ids
):
# examples:
node_workers
[
node_id
].
append
(
i
)
# [('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [0]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [0]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [1]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [2]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [3]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [1]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [2]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [3])] # noqa
# initialize worker ranks with -1 (unassigned)
worker_ranks
=
[
-
1
for
x
in
worker_node_and_gpu_ids
]
current_rank
=
0
while
-
1
in
worker_ranks
:
# whenever we find an unassigned worker, find the node
index
=
worker_ranks
.
index
(
-
1
)
current_node_id
=
worker_node_and_gpu_ids
[
index
][
0
]
# assign ranks to all workers in the same node
for
i
,
(
node_id
,
_
)
in
enumerate
(
worker_node_and_gpu_ids
):
if
node_id
==
current_node_id
:
worker_ranks
[
i
]
=
current_rank
current_rank
+=
1
# with the above example, worker_ranks will be [0, 4, 5, 6, 7, 1, 2, 3]
node_workers
=
defaultdict
(
list
)
# node id -> list of worker ranks
node_gpus
=
defaultdict
(
list
)
# node id -> list of gpu ids
for
worker_rank
,
(
node_id
,
gpu_ids
)
in
zip
(
worker_ranks
,
worker_node_and_gpu_ids
):
node_workers
[
node_id
].
append
(
worker_rank
)
# `gpu_ids` can be a list of strings or integers.
# `gpu_ids` can be a list of strings or integers.
# convert them to integers for consistency.
# convert them to integers for consistency.
# NOTE: gpu_ids can be larger than 9 (e.g. 16 GPUs),
# NOTE: gpu_ids can be larger than 9 (e.g. 16 GPUs),
...
@@ -184,7 +205,8 @@ class RayGPUExecutor(DistributedGPUExecutor):
...
@@ -184,7 +205,8 @@ class RayGPUExecutor(DistributedGPUExecutor):
local_rank
=
node_workers
[
node_id
].
index
(
rank
),
local_rank
=
node_workers
[
node_id
].
index
(
rank
),
rank
=
rank
,
rank
=
rank
,
distributed_init_method
=
distributed_init_method
,
distributed_init_method
=
distributed_init_method
,
)
for
rank
,
(
node_id
,
_
)
in
enumerate
(
worker_node_and_gpu_ids
)
)
for
rank
,
(
node_id
,
_
)
in
zip
(
worker_ranks
,
worker_node_and_gpu_ids
)
]
]
self
.
_run_workers
(
"init_worker"
,
all_kwargs
=
init_worker_all_kwargs
)
self
.
_run_workers
(
"init_worker"
,
all_kwargs
=
init_worker_all_kwargs
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment