Commit 20254503 authored by 王敏
Browse files

Merge remote-tracking branch 'origin/v0.15.1-dev' into v0.15.1-dev-pcp

parents 1e9ff2e7 3842b316
...@@ -19,7 +19,7 @@ grpcio-tools>=1.76.0 ...@@ -19,7 +19,7 @@ grpcio-tools>=1.76.0
numa numa
# pytrie # pytrie
cmake==3.29 cmake==3.29.2
quart quart
fastrlock==0.8.3 fastrlock==0.8.3
# cupy==12.3.0 # cupy==12.3.0
......
...@@ -383,6 +383,24 @@ def fused_add_rms_norm_opt_fake( ...@@ -383,6 +383,24 @@ def fused_add_rms_norm_opt_fake(
) -> None: ) -> None:
return None return None
def silu_and_mul_opt_lightop(input: torch.Tensor) -> torch.Tensor:
    """Call the ``silu_and_mul_opt_lightop`` custom op through ``torch.ops.vllm``.

    This is a thin Python entry point: it forwards ``input`` unchanged to
    ``torch.ops.vllm.silu_and_mul_opt_lightop`` and returns that op's result.
    The op must already be registered when this function is called
    (NOTE(review): presumably by the ``direct_register_custom_op`` call with
    ``op_name="silu_and_mul_opt_lightop"`` in this module -- confirm the
    registration lands in the ``vllm`` namespace).

    Args:
        input: Tensor handed to the custom op.

    Returns:
        The tensor produced by the custom op.
    """
    return torch.ops.vllm.silu_and_mul_opt_lightop(input)
def silu_and_mul_opt_lightop_impl(input: torch.Tensor) -> torch.Tensor:
    """Allocate the output tensor and run ``op.silu_and_mul_opt`` on ``input``.

    The output has the same leading dimensions, dtype and device as
    ``input``; its last dimension is ``input.shape[-1] // 2``.

    Args:
        input: Source tensor. Its last dimension is halved (floor division)
            to size the output.

    Returns:
        The freshly allocated output tensor after it has been passed to
        ``op.silu_and_mul_opt``.
    """
    d = input.shape[-1] // 2
    output_shape = input.shape[:-1] + (d,)
    # Uninitialised buffer; presumably filled in place by the kernel call
    # below -- confirm against the ``op`` module, which is defined elsewhere.
    out = torch.empty(output_shape, dtype=input.dtype, device=input.device)
    op.silu_and_mul_opt(out, input)
    return out
def silu_and_mul_opt_lightop_fake(input: torch.Tensor) -> torch.Tensor:
    """Return an uninitialised tensor with the output shape of the real op.

    No computation is performed. The result only carries the metadata the
    real implementation would produce: the same leading dimensions, dtype
    and device as ``input``, with the last dimension halved by floor
    division.

    Args:
        input: Tensor whose shape, dtype and device determine the result.

    Returns:
        A new, uninitialised tensor of shape
        ``input.shape[:-1] + (input.shape[-1] // 2,)``.
    """
    d = input.shape[-1] // 2
    output_shape = input.shape[:-1] + (d,)
    # new_empty inherits dtype and device from ``input``.
    return input.new_empty(output_shape)
def fused_qk_norm_rope( def fused_qk_norm_rope(
qkv: torch.Tensor, qkv: torch.Tensor,
num_heads_q: int, num_heads_q: int,
...@@ -3631,6 +3649,13 @@ direct_register_custom_op( ...@@ -3631,6 +3649,13 @@ direct_register_custom_op(
fake_impl=fused_add_rms_norm_opt_fake, fake_impl=fused_add_rms_norm_opt_fake,
) )
# Register the lightop SiLU-and-mul kernel as a custom op. The real
# implementation returns a newly allocated tensor rather than writing into
# any of its arguments, hence the empty ``mutates_args``; the fake
# implementation only reproduces the output shape/dtype/device.
direct_register_custom_op(
    op_name="silu_and_mul_opt_lightop",
    op_func=silu_and_mul_opt_lightop_impl,
    fake_impl=silu_and_mul_opt_lightop_fake,
    mutates_args=[],
)
""" """
qwen3-vl-8b中LLM的修改 rms+mrope dim==1 2026/03/18 qwen3-vl-8b中LLM的修改 rms+mrope dim==1 2026/03/18
""" """
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import importlib import importlib
from vllm import envs
from collections.abc import Callable from collections.abc import Callable
from typing import TYPE_CHECKING, Optional, cast from typing import TYPE_CHECKING, Optional, cast
...@@ -45,6 +46,7 @@ class KVConnectorFactory: ...@@ -45,6 +46,7 @@ class KVConnectorFactory:
config: "VllmConfig", config: "VllmConfig",
role: KVConnectorRole, role: KVConnectorRole,
kv_cache_config: Optional["KVCacheConfig"] = None, kv_cache_config: Optional["KVCacheConfig"] = None,
dp_rank: int = -1,
) -> KVConnectorBase: ) -> KVConnectorBase:
kv_transfer_config = config.kv_transfer_config kv_transfer_config = config.kv_transfer_config
if kv_transfer_config is None: if kv_transfer_config is None:
...@@ -77,6 +79,8 @@ class KVConnectorFactory: ...@@ -77,6 +79,8 @@ class KVConnectorFactory:
if compat_sig: if compat_sig:
# Old signature: __init__(self, vllm_config, role) # Old signature: __init__(self, vllm_config, role)
return connector_cls(config, role) return connector_cls(config, role)
elif envs.VLLM_USE_DP_CONNECTOR:
return connector_cls(config, role, kv_cache_config, dp_rank)
else: else:
# New signature: __init__(self, vllm_config, role, kv_cache_config) # New signature: __init__(self, vllm_config, role, kv_cache_config)
return connector_cls(config, role, kv_cache_config) return connector_cls(config, role, kv_cache_config)
...@@ -160,6 +164,11 @@ KVConnectorFactory.register_connector( ...@@ -160,6 +164,11 @@ KVConnectorFactory.register_connector(
"vllm.distributed.kv_transfer.kv_connector.v1.du.du_swift_connector", "vllm.distributed.kv_transfer.kv_connector.v1.du.du_swift_connector",
"DuSwiftConnector") "DuSwiftConnector")
KVConnectorFactory.register_connector(
"DuSwiftConnectorDp",
"vllm.distributed.kv_transfer.kv_connector.v1.du.du_swift_connector_dp",
"DuSwiftConnectorDp")
KVConnectorFactory.register_connector( KVConnectorFactory.register_connector(
"LMCacheConnectorV1", "LMCacheConnectorV1",
"vllm.distributed.kv_transfer.kv_connector.v1.lmcache_connector", "vllm.distributed.kv_transfer.kv_connector.v1.lmcache_connector",
......
...@@ -1843,6 +1843,9 @@ environment_variables: dict[str, Callable[[], Any]] = { ...@@ -1843,6 +1843,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
# vllm will use rmsquant fused op # vllm will use rmsquant fused op
"USE_FUSED_RMS_QUANT": "USE_FUSED_RMS_QUANT":
lambda: bool(int(os.getenv("USE_FUSED_RMS_QUANT", "0"))), lambda: bool(int(os.getenv("USE_FUSED_RMS_QUANT", "0"))),
# vllm will use the data-parallel (DP) KV connector
"VLLM_USE_DP_CONNECTOR":
lambda: bool(int(os.getenv("VLLM_USE_DP_CONNECTOR", "0"))),
# vllm pd separation will be used async # vllm pd separation will be used async
"VLLM_P2P_ASYNC": "VLLM_P2P_ASYNC":
lambda: bool(int(os.getenv("VLLM_P2P_ASYNC", "0"))), lambda: bool(int(os.getenv("VLLM_P2P_ASYNC", "0"))),
......
...@@ -150,12 +150,14 @@ class SiluAndMul(CustomOp): ...@@ -150,12 +150,14 @@ class SiluAndMul(CustomOp):
return F.silu(x[..., :d]) * x[..., d:] return F.silu(x[..., :d]) * x[..., d:]
def forward_cuda(self, x: torch.Tensor) -> torch.Tensor: def forward_cuda(self, x: torch.Tensor) -> torch.Tensor:
if envs.VLLM_USE_OPT_OP:
from vllm import _custom_ops as ops
return ops.silu_and_mul_opt_lightop(x)
else:
d = x.shape[-1] // 2 d = x.shape[-1] // 2
output_shape = x.shape[:-1] + (d,) output_shape = x.shape[:-1] + (d,)
out = torch.empty(output_shape, dtype=x.dtype, device=x.device) out = torch.empty(output_shape, dtype=x.dtype, device=x.device)
if envs.VLLM_USE_OPT_OP:
self.op_opt(out, x)
else:
self.op(out, x) self.op(out, x)
return out return out
......
...@@ -121,7 +121,7 @@ class Scheduler(SchedulerInterface): ...@@ -121,7 +121,7 @@ class Scheduler(SchedulerInterface):
config=self.vllm_config, config=self.vllm_config,
role=KVConnectorRole.SCHEDULER, role=KVConnectorRole.SCHEDULER,
kv_cache_config=self.kv_cache_config, kv_cache_config=self.kv_cache_config,
) dp_rank=self.parallel_config.data_parallel_rank)
if self.log_stats: if self.log_stats:
self.connector_prefix_cache_stats = PrefixCacheStats() self.connector_prefix_cache_stats = PrefixCacheStats()
kv_load_failure_policy = ( kv_load_failure_policy = (
...@@ -556,6 +556,12 @@ class Scheduler(SchedulerInterface): ...@@ -556,6 +556,12 @@ class Scheduler(SchedulerInterface):
+ len(scheduled_running_reqs) >= max_batch_running): + len(scheduled_running_reqs) >= max_batch_running):
break break
request = self.waiting.peek_request() request = self.waiting.peek_request()
if self.connector and not self.connector.is_producer and \
request.request_id not in self.finished_recving_kv_req_ids and \
envs.VLLM_USE_DP_CONNECTOR:
self.waiting.pop_request()
skipped_waiting_requests.prepend_request(request)
continue
# KVTransfer: skip request if still waiting for remote kvs. # KVTransfer: skip request if still waiting for remote kvs.
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
is_ready = self._update_waiting_for_remote_kv(request) is_ready = self._update_waiting_for_remote_kv(request)
......
...@@ -66,6 +66,7 @@ from vllm.v1.serial_utils import MsgpackDecoder, MsgpackEncoder ...@@ -66,6 +66,7 @@ from vllm.v1.serial_utils import MsgpackDecoder, MsgpackEncoder
from vllm.v1.structured_output import StructuredOutputManager from vllm.v1.structured_output import StructuredOutputManager
from vllm.v1.utils import compute_iteration_details from vllm.v1.utils import compute_iteration_details
from vllm.version import __version__ as VLLM_VERSION from vllm.version import __version__ as VLLM_VERSION
from vllm import envs
logger = init_logger(__name__) logger = init_logger(__name__)
...@@ -1155,6 +1156,11 @@ class EngineCoreProc(EngineCore): ...@@ -1155,6 +1156,11 @@ class EngineCoreProc(EngineCore):
# Push to input queue for core busy loop. # Push to input queue for core busy loop.
self.input_queue.put_nowait((request_type, request)) self.input_queue.put_nowait((request_type, request))
if isinstance(request, tuple) and self.scheduler.connector is not None \
and envs.VLLM_USE_DP_CONNECTOR:
req, _ = request
if request_type == EngineCoreRequestType.ADD:
self.scheduler.connector.register_req(req.request_id)
def process_output_sockets( def process_output_sockets(
self, self,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment