Commit 09639eec authored by zhuwenwen
Browse files

新增:PD 分离场景下,单实例跨机部署时,第二个节点的 IP 通过配置文件获取。配置文件内容示例如下:

# 每行两个 IP,以空白字符分隔:第一个 IP 为 D 的第一个节点,第二个 IP 为 D 的第二个节点。通过环境变量指定配置文件路径:export IP_CONFIG_FILE=/data/xiabo/w4a8_1/ip_config.txt
192.168.1.1 192.168.1.100
192.168.1.2 192.168.1.101
192.168.1.3 192.168.1.102
10.16.1.75 10.16.1.76
parent 2b4f2bc3
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional from typing import TYPE_CHECKING, Any, Optional
import os
import regex as re import regex as re
import torch import torch
...@@ -92,6 +93,35 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -92,6 +93,35 @@ class P2pNcclConnector(KVConnectorBase_V1):
self.total_num_hidden_layers = getattr(self.model_config.hf_text_config, self.total_num_hidden_layers = getattr(self.model_config.hf_text_config,
"num_hidden_layers", 0) "num_hidden_layers", 0)
self.pp_size = self.parallel_config.pipeline_parallel_size self.pp_size = self.parallel_config.pipeline_parallel_size
self.tp_size = self.parallel_config.tensor_parallel_size
self.num_card = self.pp_size * self.tp_size
self.multiple_machines = 1 if self.num_card > 8 else 0
if self.is_producer and self.multiple_machines == 1:
self.ip_map = {}
self.duplicate_keys = []
config_file = os.getenv('IP_CONFIG_FILE')
if not config_file:
print("Warning: Please set the IPVNet FILE environment variable for cross machine recognition of the second IP address")
return
try:
with open(config_file, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
line = line.strip()
if line and not line.startswith('#'):
ips = line.split()
if len(ips) == 2:
first_ip, second_ip = ips
if first_ip not in self.ip_map:
self.ip_map[first_ip] = second_ip
else:
print(f"warning: num {line_num} Incorrect format : {line}")
except Exception as e:
print(f"Error: Exception occurred while reading configuration file - {e}")
def get_ip_value(self, key):
    """Return the second-node IP mapped to ``key``, or ``None``.

    ``self.ip_map`` is only assigned when the connector is a producer that
    spans multiple machines, so a missing map is treated the same as a
    missing entry instead of raising ``AttributeError``.

    Args:
        key: First-column IP address of a line in the file named by the
            ``IP_CONFIG_FILE`` environment variable.

    Returns:
        The second-column IP of the matching line, or ``None`` when the map
        was never built or holds no entry for ``key``.
    """
    ip_map = getattr(self, "ip_map", None)
    if not ip_map:
        return None
    return ip_map.get(key)
# ============================== # ==============================
# Worker-side methods # Worker-side methods
...@@ -292,26 +322,44 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -292,26 +322,44 @@ class P2pNcclConnector(KVConnectorBase_V1):
kv_cache = extract_kv_from_layer(kv_layer, request.block_ids) kv_cache = extract_kv_from_layer(kv_layer, request.block_ids)
pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size
) % self.parallel_config.pipeline_parallel_size ) % self.parallel_config.pipeline_parallel_size
if (self.pp_size == 1): if (self.multiple_machines):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, ip_second = self.get_ip_value(ip)
kv_cache, remote_address) if (self.pp_size == 1):
elif (self.pp_size == 2): self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
if (pp_rank == 0): kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank + 8))
elif (self.pp_size == 2):
if (pp_rank == 0):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
else:
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank))
else:
print("Error: only suppprt pp1 pp2 !!!!!!")
else:
if (self.pp_size == 1):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address) kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, elif (self.pp_size == 2):
kv_cache, ip + ":" + str(port + self._rank + 4)) if (pp_rank == 0):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + self._rank + 4))
else:
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + self._rank - 4))
elif (self.pp_size == 8):
for i in range(8):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + i))
else: else:
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, print("Error: only suppprt pp1 pp2 pp8!!!!!!")
kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + self._rank - 4))
elif (self.pp_size == 8):
for i in range(8):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + i))
else:
print("Error: only suppprt pp1 pp2 pp8!!!!!!")
def wait_for_save(self): def wait_for_save(self):
pass pass
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment