"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "0b544e6476eb88c3c011522c1feea6f16edf7022"
Unverified Commit 1e584823 authored by dtc's avatar dtc Committed by GitHub
Browse files

[Bugfix] Strengthen the check of X-data-parallel-rank in Hybrid LB mode (#32314)


Signed-off-by: default avatarTianchen Ding <dtcccc@linux.alibaba.com>
parent 4c1c501a
...@@ -143,6 +143,7 @@ def test_mp_client_uses_env_timeout(monkeypatch: pytest.MonkeyPatch): ...@@ -143,6 +143,7 @@ def test_mp_client_uses_env_timeout(monkeypatch: pytest.MonkeyPatch):
data_parallel_rank_local=None, data_parallel_rank_local=None,
data_parallel_hybrid_lb=False, data_parallel_hybrid_lb=False,
data_parallel_external_lb=False, data_parallel_external_lb=False,
local_engines_only=False,
) )
vllm_config = SimpleNamespace(parallel_config=parallel_config) vllm_config = SimpleNamespace(parallel_config=parallel_config)
......
...@@ -361,6 +361,14 @@ class ParallelConfig: ...@@ -361,6 +361,14 @@ class ParallelConfig:
def num_ubatches(self) -> int: def num_ubatches(self) -> int:
return 2 if self.enable_dbo else self.ubatch_size return 2 if self.enable_dbo else self.ubatch_size
@property
def local_engines_only(self) -> bool:
"""
Client manages local+remote EngineCores in pure internal LB case.
Client manages local EngineCores in hybrid and external LB case.
"""
return self.data_parallel_external_lb or self.data_parallel_hybrid_lb
def get_next_dp_init_port(self) -> int: def get_next_dp_init_port(self) -> int:
""" """
We might need to initialize process groups in multiple We might need to initialize process groups in multiple
......
...@@ -190,9 +190,7 @@ def run_multi_api_server(args: argparse.Namespace): ...@@ -190,9 +190,7 @@ def run_multi_api_server(args: argparse.Namespace):
parallel_config = vllm_config.parallel_config parallel_config = vllm_config.parallel_config
dp_rank = parallel_config.data_parallel_rank dp_rank = parallel_config.data_parallel_rank
external_dp_lb = parallel_config.data_parallel_external_lb assert parallel_config.local_engines_only or dp_rank == 0
hybrid_dp_lb = parallel_config.data_parallel_hybrid_lb
assert external_dp_lb or hybrid_dp_lb or dp_rank == 0
api_server_manager: APIServerProcessManager | None = None api_server_manager: APIServerProcessManager | None = None
...@@ -218,7 +216,7 @@ def run_multi_api_server(args: argparse.Namespace): ...@@ -218,7 +216,7 @@ def run_multi_api_server(args: argparse.Namespace):
# (after the launcher context manager exits), # (after the launcher context manager exits),
# since we get the front-end stats update address from the coordinator # since we get the front-end stats update address from the coordinator
# via the handshake with the local engine. # via the handshake with the local engine.
if dp_rank == 0 or not (external_dp_lb or hybrid_dp_lb): if dp_rank == 0 or not parallel_config.local_engines_only:
# Start API servers using the manager. # Start API servers using the manager.
api_server_manager = APIServerProcessManager(**api_server_manager_kwargs) api_server_manager = APIServerProcessManager(**api_server_manager_kwargs)
......
...@@ -62,12 +62,10 @@ class DPCoordinator: ...@@ -62,12 +62,10 @@ class DPCoordinator:
assert dp_size > 1, "Coordinator only used for data parallel" assert dp_size > 1, "Coordinator only used for data parallel"
host = parallel_config.data_parallel_master_ip host = parallel_config.data_parallel_master_ip
external_lb = parallel_config.data_parallel_external_lb
hybrid_lb = parallel_config.data_parallel_hybrid_lb
# Assume coordinator is colocated with front-end procs when not in # Assume coordinator is colocated with front-end procs when not in
# either external or hybrid DP LB mode. # either external or hybrid DP LB mode.
local_only = not (external_lb or hybrid_lb) local_only = not parallel_config.local_engines_only
front_publish_address = get_engine_client_zmq_addr( front_publish_address = get_engine_client_zmq_addr(
local_only=local_only, host=host local_only=local_only, host=host
) )
......
...@@ -507,12 +507,7 @@ class MPClient(EngineCoreClient): ...@@ -507,12 +507,7 @@ class MPClient(EngineCoreClient):
offline_mode = parallel_config.data_parallel_rank_local is not None offline_mode = parallel_config.data_parallel_rank_local is not None
# Client manages local+remote EngineCores in pure internal LB case. # Client manages local+remote EngineCores in pure internal LB case.
# Client manages local EngineCores in hybrid and external LB case. # Client manages local EngineCores in hybrid and external LB case.
local_engines_only = ( num_ranks = dp_local_size if parallel_config.local_engines_only else dp_size
parallel_config.data_parallel_hybrid_lb
or parallel_config.data_parallel_external_lb
)
num_ranks = dp_local_size if local_engines_only else dp_size
self.engine_ranks_managed = ( self.engine_ranks_managed = (
[dp_rank] if offline_mode else list(range(dp_rank, dp_rank + num_ranks)) [dp_rank] if offline_mode else list(range(dp_rank, dp_rank + num_ranks))
) )
......
...@@ -458,13 +458,14 @@ class InputProcessor: ...@@ -458,13 +458,14 @@ class InputProcessor:
self._validate_lora(lora_request) self._validate_lora(lora_request)
self._validate_params(params) self._validate_params(params)
data_parallel_size = self.vllm_config.parallel_config.data_parallel_size parallel_config = self.vllm_config.parallel_config
if data_parallel_rank is not None and not ( dp_size = parallel_config.data_parallel_size
0 <= data_parallel_rank < data_parallel_size dp_local_size = parallel_config.data_parallel_size_local
): num_ranks = dp_local_size if parallel_config.local_engines_only else dp_size
if data_parallel_rank is not None and not (0 <= data_parallel_rank < num_ranks):
raise ValueError( raise ValueError(
f"data_parallel_rank {data_parallel_rank} " f"data_parallel_rank {data_parallel_rank} "
f"is out of range [0, {data_parallel_size})." f"is out of range [0, {num_ranks})."
) )
if arrival_time is None: if arrival_time is None:
......
...@@ -787,10 +787,7 @@ def launch_core_engines( ...@@ -787,10 +787,7 @@ def launch_core_engines(
local_start_index = parallel_config.data_parallel_rank_local local_start_index = parallel_config.data_parallel_rank_local
dp_rank = parallel_config.data_parallel_rank dp_rank = parallel_config.data_parallel_rank
host = parallel_config.data_parallel_master_ip host = parallel_config.data_parallel_master_ip
local_engines_only = ( local_engines_only = parallel_config.local_engines_only
parallel_config.data_parallel_hybrid_lb
or parallel_config.data_parallel_external_lb
)
# In offline mode there is an LLM instance per DP rank and # In offline mode there is an LLM instance per DP rank and
# one core engine per LLM, see # one core engine per LLM, see
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment