Commit 7293a072 authored by 王敏
Browse files

Merge remote-tracking branch 'origin/v0.9.2-dev-ds' into v0.9.2-dev-ds

# Conflicts:
#	vllm/model_executor/models/deepseek_v2.py
parents 98b7432a db2c32b0
......@@ -123,9 +123,9 @@ class Attention(nn.Module):
assert isinstance(quant_method, BaseKVCacheMethod)
# TODO (mgoin): kv cache dtype should be specified in the FP8
# checkpoint config and become the "auto" behavior
if self.kv_cache_dtype == "fp8_e5m2":
raise ValueError("fp8_e5m2 kv-cache is not supported with "
"fp8 checkpoints.")
# if self.kv_cache_dtype == "fp8_e5m2":
# raise ValueError("fp8_e5m2 kv-cache is not supported with "
# "fp8 checkpoints.")
# If quantization is enabled, we make "k_scale" and "v_scale"
# parameters so that it can be loaded from the model checkpoint.
# The k/v_scale will then be converted back to native float32
......
......@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, Optional
import regex as re
import torch
import os
from vllm import envs
from vllm.config import VllmConfig
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
......@@ -103,6 +104,35 @@ class P2pNcclConnector(KVConnectorBase_V1):
self.total_num_hidden_layers = getattr(self.model_config.hf_text_config,
"num_hidden_layers", 0)
self.pp_size = self.parallel_config.pipeline_parallel_size
self.tp_size = self.parallel_config.tensor_parallel_size
self.num_card = self.pp_size * self.tp_size
self.multiple_machines = 1 if self.num_card > 8 else 0
if self.is_producer and self.multiple_machines == 1:
self.ip_map = {}
self.duplicate_keys = []
config_file = os.getenv('IP_CONFIG_FILE')
if not config_file:
print("Warning: Please set the IPVNet FILE environment variable for cross machine recognition of the second IP address")
return
try:
with open(config_file, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
line = line.strip()
if line and not line.startswith('#'):
ips = line.split()
if len(ips) == 2:
first_ip, second_ip = ips
if first_ip not in self.ip_map:
self.ip_map[first_ip] = second_ip
else:
print(f"warning: num {line_num} Incorrect format : {line}")
except Exception as e:
print(f"Error: Exception occurred while reading configuration file - {e}")
def get_ip_value(self, key):
    """Return the second IP address mapped to ``key``, or ``None``.

    ``ip_map`` is only created on a producer running across multiple
    machines, where it is filled from the file named by the
    ``IP_CONFIG_FILE`` environment variable. On any other instance the
    attribute does not exist, so treat that case like an unknown key
    instead of raising ``AttributeError``.

    Args:
        key: First IP address to look up.

    Returns:
        The second IP address paired with ``key`` in the configuration
        file, or ``None`` when no mapping is available.
    """
    ip_map = getattr(self, "ip_map", None)
    if not ip_map:
        return None
    return ip_map.get(key)
# ==============================
# Worker-side methods
......@@ -252,7 +282,13 @@ class P2pNcclConnector(KVConnectorBase_V1):
2, num_pages * page_size, -1)
inject_start_index = 0
for num in range(self.p2p_nccl_engine.tensor_split_num):
req_layer = f"{request.request_id}#{layer_name}"
with self.p2p_nccl_engine.recv_store_cv:
while req_layer not in self.p2p_nccl_engine.recv_split_nums:
self.p2p_nccl_engine.recv_store_cv.wait()
split_num = self.p2p_nccl_engine.recv_split_nums.get(req_layer)
for num in range(split_num):
kv_cache = self.p2p_nccl_engine.recv_tensor(
request.request_id + "#" + layer_name + "#" + str(num))
......@@ -280,6 +316,7 @@ class P2pNcclConnector(KVConnectorBase_V1):
# inject_kv_into_layer(kv_cache_layer, kv_cache,
# request.slot_mapping, request.request_id)
tensor_id = request.request_id + "#" + layer_name + "#" + str(num)
if tensor_id in self.p2p_nccl_engine.recv_store:
tensor = self.p2p_nccl_engine.recv_store.pop(tensor_id, None)
self.p2p_nccl_engine.send_request_id_to_tensor_ids.pop(
......@@ -387,6 +424,24 @@ class P2pNcclConnector(KVConnectorBase_V1):
pp_rank = (self.parallel_config.rank // self.parallel_config.tensor_parallel_size
) % self.parallel_config.pipeline_parallel_size
if (self.multiple_machines):
ip_second = self.get_ip_value(ip)
if (self.pp_size == 1):
if self._rank < 8:
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank + 8))
elif (self.pp_size == 2):
if (pp_rank == 0):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
else:
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, str(ip_second) + ":" + str(port + self._rank))
else:
print("Error: only suppprt pp1 pp2 !!!!!!")
else:
if (self.pp_size == 1):
self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name,
kv_cache, remote_address)
......
......@@ -117,6 +117,7 @@ class P2pNcclEngine:
self.p2p_async_kv_tokens = envs.VLLM_P2P_BUF_TOKENS
self.p2p_async_buf = None
self.tensor_split_num: int = 0
self.recv_split_nums: dict[str, int] = {}
mem_pool_size_gb = self.config.get_from_extra_config(
"mem_pool_size_gb", DEFAULT_MEM_POOL_SIZE_GB)
......@@ -200,7 +201,6 @@ class P2pNcclEngine:
tensor_id: str,
tensor: torch.Tensor,
remote_address: typing.Optional[str] = None,
tbo_evt = None,
) -> bool:
if remote_address is None:
with self.recv_store_cv:
......@@ -251,7 +251,8 @@ class P2pNcclEngine:
if remote_address is None:
with self.recv_store_cv:
self.recv_store[tensor_id] = tensor
self.recv_store_cv.notify()
# self.recv_store_cv.notify()
self.recv_store_cv.notify_all()
return True
else:
if self.send_type == "PUT":
......@@ -260,7 +261,7 @@ class P2pNcclEngine:
with self.send_queue_cv:
kv_layer, slot_mapping = tensor # tesor (kv_layer, slot_mapping)
self.send_queue.append([tensor_id, remote_address, kv_layer, slot_mapping, tbo_evt])
self.send_queue_cv.notify()
self.send_queue_cv.notify_all()
else: # GET
with self.send_store_cv:
tensor_size = tensor.element_size() * tensor.numel()
......@@ -365,7 +366,14 @@ class P2pNcclEngine:
elif data["cmd"] == "PUT":
tensor_id = data["tensor_id"]
if "tensor_split_num" in data:
self.tensor_split_num = data["tensor_split_num"]
# self.tensor_split_num = data["tensor_split_num"]
parts = tensor_id.split('#')
request_id = parts[0]
layer_name = parts[1]
req_layer = f"{request_id}#{layer_name}"
self.recv_split_nums[req_layer] = data["tensor_split_num"]
with self.recv_store_cv:
self.recv_store_cv.notify_all()
try:
with torch.cuda.stream(self.recv_stream):
tensor = torch.empty(data["shape"],
......@@ -397,7 +405,8 @@ class P2pNcclEngine:
with self.recv_store_cv:
self.recv_store[tensor_id] = tensor
self._have_received_tensor_id(tensor_id)
self.recv_store_cv.notify()
#self.recv_store_cv.notify()
self.recv_store_cv.notify_all()
elif data["cmd"] == "GET":
tensor_id = data["tensor_id"]
......@@ -450,7 +459,7 @@ class P2pNcclEngine:
else:
tensor_id, remote_address, tensor = self.send_queue.popleft()
if not self.send_queue:
self.send_queue_cv.notify()
self.send_queue_cv.notify_all()
if (envs.VLLM_ENABLE_TBO or envs.VLLM_P2P_ASYNC) and tbo_evt is not None:
self.send_stream.wait_event(tbo_evt)
self._send_kv_p2p_sync(tensor_id, kv_layer, slot_mapping, remote_address)
......@@ -590,20 +599,30 @@ class P2pNcclEngine:
"""
# Clear the buffer upon request completion.
# for request_id in finished_req_ids:
# for layer_name in forward_context.no_compile_layers:
# tensor_id = request_id + "#" + layer_name
# if tensor_id in self.recv_store:
# with self.recv_store_cv:
# tensor = self.recv_store.pop(tensor_id, None)
# self.send_request_id_to_tensor_ids.pop(
# request_id, None)
# self.recv_request_id_to_tensor_ids.pop(
# request_id, None)
# addr = 0
# if isinstance(tensor, tuple):
# addr, _, _ = tensor
# self.pool.free(addr)
for request_id in finished_req_ids:
for layer_name in forward_context.no_compile_layers:
tensor_id = request_id + "#" + layer_name
if tensor_id in self.recv_store:
ids = self.recv_request_id_to_tensor_ids.pop(request_id, set())
with self.recv_store_cv:
for tensor_id in ids:
tensor = self.recv_store.pop(tensor_id, None)
self.send_request_id_to_tensor_ids.pop(
request_id, None)
self.recv_request_id_to_tensor_ids.pop(
request_id, None)
addr = 0
if isinstance(tensor, tuple):
addr, _, _ = tensor
self.pool.free(addr)
self.send_request_id_to_tensor_ids.pop(request_id, None)
# TODO:Retrieve requests that have already sent the KV cache.
finished_sending: set[str] = set()
......
......@@ -226,21 +226,7 @@ class DeepseekV2MoE(nn.Module):
router_logits, _ = self.gate(hidden_states)
if not self.use_mori_ep:
if envs.envs.VLLM_USE_LIGHTOP_MOE_SUM_MUL_ADD:
if self.enable_expert_parallel:
final_hidden_states = self.experts(
hidden_states=hidden_states,
router_logits=router_logits)
if shared_output is not None:
if hidden_states.dtype != torch.float16:
final_hidden_states = final_hidden_states + shared_output
else:
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states = final_hidden_states + shared_output \
* (1. / self.routed_scaling_factor)
else:
if envs.VLLM_USE_LIGHTOP_MOE_SUM_MUL_ADD:
final_hidden_states = self.experts(
hidden_states=hidden_states,
router_logits=router_logits,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment