Commit 90227352 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.9.2-dev-pd-w8a8' into 'v0.9.2-dev'

增加pd分离单实例跨机第二个ip通过配置文件获取。配置文件上设置如下:

See merge request dcutoolkit/deeplearing/vllm!234
parents cd738b68 075e98ff
...@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Optional ...@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Optional
import regex as re import regex as re
import torch import torch
import os
from vllm import envs from vllm import envs
from vllm.config import VllmConfig from vllm.config import VllmConfig
from vllm.distributed.kv_transfer.kv_connector.v1.base import ( from vllm.distributed.kv_transfer.kv_connector.v1.base import (
...@@ -103,6 +103,36 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -103,6 +103,36 @@ class P2pNcclConnector(KVConnectorBase_V1):
self.total_num_hidden_layers = getattr(self.model_config.hf_text_config, self.total_num_hidden_layers = getattr(self.model_config.hf_text_config,
"num_hidden_layers", 0) "num_hidden_layers", 0)
self.pp_size = self.parallel_config.pipeline_parallel_size self.pp_size = self.parallel_config.pipeline_parallel_size
self.tp_size = self.parallel_config.tensor_parallel_size
self.num_card = self.pp_size * self.tp_size
self.multiple_machines = 1 if self.num_card > 8 else 0
if self.is_producer and self.multiple_machines == 1:
self.ip_map = {}
self.duplicate_keys = []
config_file = os.getenv('IP_CONFIG_FILE')
if not config_file:
print("Warning: Please set the IPVNet FILE environment variable for cross machine recognition of the second IP address")
return
try:
with open(config_file, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
line = line.strip()
if line and not line.startswith('#'):
ips = line.split()
if len(ips) == 2:
first_ip, second_ip = ips
if first_ip not in self.ip_map:
self.ip_map[first_ip] = second_ip
else:
print(f"warning: num {line_num} Incorrect format : {line}")
except Exception as e:
print(f"Error: Exception occurred while reading configuration file - {e}")
def get_ip_value(self, key):
return self.ip_map.get(key)
# ============================== # ==============================
# Worker-side methods # Worker-side methods
...@@ -387,26 +417,43 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -387,26 +417,43 @@ class P2pNcclConnector(KVConnectorBase_V1):
pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size
) % self.parallel_config.pipeline_parallel_size ) % self.parallel_config.pipeline_parallel_size
if (self.pp_size == 1): if (self.multiple_machines):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, ip_second = self.get_ip_value(ip)
kv_cache, remote_address) if (self.pp_size == 1):
elif (self.pp_size == 2):
if (pp_rank == 0):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address) kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + self._rank + 4)) kv_cache, str(ip_second) + ":" + str(port + self._rank + 8))
elif (self.pp_size == 2):
if (pp_rank == 0):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
else:
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank))
else: else:
print("Error: only suppprt pp1 pp2 !!!!!!")
else:
if (self.pp_size == 1):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address) kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, elif (self.pp_size == 2):
kv_cache, ip + ":" + str(port + self._rank - 4)) if (pp_rank == 0):
elif (self.pp_size == 8): self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
for i in range(8): kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + i)) kv_cache, ip + ":" + str(port + self._rank + 4))
else: else:
print("Error: only suppprt pp1 pp2 pp8!!!!!!") self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + self._rank - 4))
elif (self.pp_size == 8):
for i in range(8):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + i))
else:
print("Error: only suppprt pp1 pp2 pp8!!!!!!")
def wait_for_save(self): def wait_for_save(self):
pass pass
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment