"docs/vscode:/vscode.git/clone" did not exist on "24d6ea8afdb13ceee95b36645ba61a641f9a2f7f"
Commit 075e98ff authored by xiabo's avatar xiabo
Browse files

增加pd分离单实例跨机第二个ip通过配置文件获取。配置文件上设置如下:

# 第一个ip为D的第一个节点,第二个ip为D的第二个节点,配置:export IP_CONFIG_FILE=/data/xiabo/w4a8_1/ip_config.txt
192.168.1.1 192.168.1.100
192.168.1.2 192.168.1.101
192.168.1.3 192.168.1.102
10.16.1.75 10.16.1.76
parent cd738b68
...@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Optional ...@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Optional
import regex as re import regex as re
import torch import torch
import os
from vllm import envs from vllm import envs
from vllm.config import VllmConfig from vllm.config import VllmConfig
from vllm.distributed.kv_transfer.kv_connector.v1.base import ( from vllm.distributed.kv_transfer.kv_connector.v1.base import (
...@@ -103,6 +103,36 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -103,6 +103,36 @@ class P2pNcclConnector(KVConnectorBase_V1):
self.total_num_hidden_layers = getattr(self.model_config.hf_text_config, self.total_num_hidden_layers = getattr(self.model_config.hf_text_config,
"num_hidden_layers", 0) "num_hidden_layers", 0)
self.pp_size = self.parallel_config.pipeline_parallel_size self.pp_size = self.parallel_config.pipeline_parallel_size
self.tp_size = self.parallel_config.tensor_parallel_size
self.num_card = self.pp_size * self.tp_size
self.multiple_machines = 1 if self.num_card > 8 else 0
if self.is_producer and self.multiple_machines == 1:
self.ip_map = {}
self.duplicate_keys = []
config_file = os.getenv('IP_CONFIG_FILE')
if not config_file:
print("Warning: Please set the IPVNet FILE environment variable for cross machine recognition of the second IP address")
return
try:
with open(config_file, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
line = line.strip()
if line and not line.startswith('#'):
ips = line.split()
if len(ips) == 2:
first_ip, second_ip = ips
if first_ip not in self.ip_map:
self.ip_map[first_ip] = second_ip
else:
print(f"warning: num {line_num} Incorrect format : {line}")
except Exception as e:
print(f"Error: Exception occurred while reading configuration file - {e}")
def get_ip_value(self, key):
return self.ip_map.get(key)
# ============================== # ==============================
# Worker-side methods # Worker-side methods
...@@ -387,6 +417,23 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -387,6 +417,23 @@ class P2pNcclConnector(KVConnectorBase_V1):
pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size
) % self.parallel_config.pipeline_parallel_size ) % self.parallel_config.pipeline_parallel_size
if (self.multiple_machines):
ip_second = self.get_ip_value(ip)
if (self.pp_size == 1):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank + 8))
elif (self.pp_size == 2):
if (pp_rank == 0):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
else:
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank))
else:
print("Error: only suppprt pp1 pp2 !!!!!!")
else:
if (self.pp_size == 1): if (self.pp_size == 1):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address) kv_cache, remote_address)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment