Commit 3c117f20 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.9.2-dev-pd-bwandnmz' into 'v0.9.2-dev'

支持pd分离:P做跨机pp2tp8,D做单机的tp8.

See merge request dcutoolkit/deeplearing/vllm!311
parents 4f9947e6 bdeb3a85
...@@ -90,6 +90,8 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -90,6 +90,8 @@ class P2pNcclConnector(KVConnectorBase_V1):
if role == KVConnectorRole.WORKER else 0 if role == KVConnectorRole.WORKER else 0
self._local_rank = get_world_group().local_rank \ self._local_rank = get_world_group().local_rank \
if role == KVConnectorRole.WORKER else 0 if role == KVConnectorRole.WORKER else 0
self._tp_rank = get_tp_group().rank_in_group \
if role == KVConnectorRole.WORKER else 0
self.p2p_nccl_engine = P2pNcclEngine( self.p2p_nccl_engine = P2pNcclEngine(
local_rank=self._local_rank, local_rank=self._local_rank,
...@@ -105,9 +107,19 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -105,9 +107,19 @@ class P2pNcclConnector(KVConnectorBase_V1):
self.pp_size = self.parallel_config.pipeline_parallel_size self.pp_size = self.parallel_config.pipeline_parallel_size
self.tp_size = self.parallel_config.tensor_parallel_size self.tp_size = self.parallel_config.tensor_parallel_size
self.num_card = self.pp_size * self.tp_size self.num_card = self.pp_size * self.tp_size
self.multiple_machines = 1 if self.num_card > 8 else 0
if self.is_producer and self.multiple_machines == 1: self.remote_tp_size = self.config.get_from_extra_config(
"remote_tp_size", self.tp_size)
self.remote_pp_size = self.config.get_from_extra_config(
"remote_pp_size", self.pp_size)
self.enable_asymmetric_p2p = self.config.get_from_extra_config(
"enable_asymmetric_p2p", False)
self.remote_num_card = self.remote_tp_size * self.remote_pp_size
self.multiple_machines_d = 1 if self.remote_num_card > 8 else 0
self.multiple_machines_p = 1 if self.num_card > 8 else 0
if self.is_producer and self.multiple_machines_p == 1:
self.ip_map = {} self.ip_map = {}
self.duplicate_keys = [] self.duplicate_keys = []
config_file = os.getenv('IP_CONFIG_FILE') config_file = os.getenv('IP_CONFIG_FILE')
...@@ -417,7 +429,7 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -417,7 +429,7 @@ class P2pNcclConnector(KVConnectorBase_V1):
pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size
) % self.parallel_config.pipeline_parallel_size ) % self.parallel_config.pipeline_parallel_size
if (self.multiple_machines): if (self.multiple_machines_p and self.multiple_machines_d):
ip_second = self.get_ip_value(ip) ip_second = self.get_ip_value(ip)
if (self.pp_size == 1): if (self.pp_size == 1):
if self._rank < 8: if self._rank < 8:
...@@ -433,8 +445,16 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -433,8 +445,16 @@ class P2pNcclConnector(KVConnectorBase_V1):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank)) kv_cache, str(ip_second) + ":" + str(port + self._rank))
else: else:
print("Error: only suppprt pp1 pp2 !!!!!!") logger.error("Error: multiple machines only suppprt pp1tp16 and pp2tp8!!!!!!")
elif (self.multiple_machines_p and not self.multiple_machines_d):
if (self.pp_size == 2):
remote_address = ip + ":" + str(port + self._tp_rank)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
else: else:
logger.error("Error: P multiple machines D machine only suppprt P:pp2tp8 D:tp8 !!!!!!")
elif (not self.multiple_machines_p and not self.multiple_machines_d):
if (self.pp_size == 1): if (self.pp_size == 1):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address) kv_cache, remote_address)
...@@ -453,9 +473,13 @@ class P2pNcclConnector(KVConnectorBase_V1): ...@@ -453,9 +473,13 @@ class P2pNcclConnector(KVConnectorBase_V1):
for i in range(8): for i in range(8):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, ip + ":" + str(port + i)) kv_cache, ip + ":" + str(port + i))
elif (self.enable_asymmetric_p2p):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
else: else:
print("Error: only suppprt pp1 pp2 pp8!!!!!!") logger.error("Error: P/D single machine only suppprt multiple tp:: (P: pp2tp4 D:tp8 P:pp8tp1 D:tp8) !!!!!!")
else:
logger.error("Error: not support!!!!!!")
def wait_for_save(self): def wait_for_save(self):
pass pass
# if self.is_producer: # if self.is_producer:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment