"vscode:/vscode.git/clone" did not exist on "ee10d7e6ff5875386c7f136ce8b5f525c8fcef48"
Unverified Commit 6aa6ad89 authored by Qi Wang's avatar Qi Wang Committed by GitHub
Browse files

[BugFix] Fix implicit and incorrect assumption on ECConnector is_producer (#34783)


Signed-off-by: default avatarQi Wang <qiwa@nvidia.com>
parent c8c3935b
#!/bin/bash
set -euo pipefail
MODEL="${MODEL:-Qwen/Qwen2.5-VL-3B-Instruct}"
PORT="${PORT:-8000}"
GPU="${GPU:-0}"
NUM_PROMPTS="${NUM_PROMPTS:-200}"
EC_SHARED_STORAGE_PATH="${EC_SHARED_STORAGE_PATH:-/tmp/ec_cache}"
TIMEOUT="${TIMEOUT:-600}"
SERVER_PID=""
cleanup() {
echo "Stopping server..."
if [[ -n "$SERVER_PID" ]] && kill -0 "$SERVER_PID" 2>/dev/null; then
kill "$SERVER_PID" 2>/dev/null || true
wait "$SERVER_PID" 2>/dev/null || true
fi
echo "Done."
}
trap cleanup EXIT INT TERM
wait_for_server() {
local deadline=$((SECONDS + TIMEOUT))
echo "Waiting for server on port $PORT..."
while (( SECONDS < deadline )); do
if curl -sf "http://localhost:${PORT}/v1/models" > /dev/null 2>&1; then
echo "Server ready."
return 0
fi
sleep 2
done
echo "ERROR: Server did not start within ${TIMEOUT}s"
return 1
}
rm -rf "$EC_SHARED_STORAGE_PATH"
mkdir -p "$EC_SHARED_STORAGE_PATH"
###############################################################################
# Start server with ec_both
###############################################################################
CUDA_VISIBLE_DEVICES="$GPU" \
vllm serve "$MODEL" \
--port "$PORT" \
--enforce-eager \
--ec-transfer-config '{
"ec_connector": "ECExampleConnector",
"ec_role": "ec_both",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
}
}' \
"$@" &
SERVER_PID=$!
wait_for_server
###############################################################################
# Benchmark -- dataset contains duplicate images, exercises cache hits
###############################################################################
echo "Running benchmark ($NUM_PROMPTS prompts)..."
vllm bench serve \
--model "$MODEL" \
--backend openai-chat \
--endpoint /v1/chat/completions \
--dataset-name hf \
--dataset-path lmarena-ai/VisionArena-Chat \
--seed 0 \
--num-prompts "$NUM_PROMPTS" \
--port "$PORT"
echo "Benchmark complete."
......@@ -3010,12 +3010,16 @@ def test_ec_connector_with_partial_cache_hit_multi_round(use_kv_connector):
# Encoder cache should contain all mm items from request2
_assert_right_encoder_cache_allocated(scheduler, requests=[request2])
# Should call update_state_after_alloc for hash1_C, ONLY
# hash1_A should not be loaded from connector
# since it's computed in last request & exist in local cache
# Order of getting encoder cache should be: local cache -> connector-> compute
scheduler.ec_connector.update_state_after_alloc.assert_called_with(request2, 0)
scheduler.ec_connector.update_state_after_alloc.assert_called_once()
# update_state_after_alloc is called for all paths:
# index 0 (hash1_C): connector hit → queued for load
# index 1 (hash1_D): cache miss → no-op inside connector
# index 2 (hash1_E): cache miss → no-op inside connector
scheduler.ec_connector.update_state_after_alloc.assert_any_call(request2, 0)
scheduler.ec_connector.update_state_after_alloc.assert_any_call(request2, 1)
scheduler.ec_connector.update_state_after_alloc.assert_any_call(request2, 2)
scheduler.ec_connector.update_state_after_alloc.reset_mock()
......@@ -3087,7 +3091,6 @@ def test_ec_connector_schedule_multiple_requests(cache_exist, use_kv_connector):
# mm_hashes of requests exist in cache after scheduling for all scenario
_assert_right_encoder_cache_allocated(scheduler, requests=requests)
# Should only call update_state_after_alloc when loaded externally
if cache_exist == "connector_only":
scheduler.ec_connector.update_state_after_alloc.assert_called_with(
requests[-1], 0
......@@ -3098,9 +3101,15 @@ def test_ec_connector_schedule_multiple_requests(cache_exist, use_kv_connector):
# Check metadata should contain mm data for all 10 requests
_assert_right_ec_connector_metadata(output, mm_features_list=mm_features_list)
else:
elif cache_exist == "local":
# Local cache hit: items never reach update_state_after_alloc
scheduler.ec_connector.update_state_after_alloc.assert_not_called()
# ECConnector should carry no metadata
_assert_right_ec_connector_metadata(output, mm_features_list=[])
else:
# no_where: called from encoder_inputs_to_schedule but no-op
# inside connector (has_cache_item returns False)
assert cache_exist == "no_where"
scheduler.ec_connector.update_state_after_alloc.assert_called()
_assert_right_ec_connector_metadata(output, mm_features_list=[])
scheduler.ec_connector.update_state_after_alloc.reset_mock()
......@@ -3419,7 +3428,6 @@ def test_priority_scheduling_ec_connector_preemption_and_resumption(
# mm_hash of request_low exists in cache after scheduling for all scenario
_assert_right_encoder_cache_allocated(scheduler, requests=[request_low])
# Should only call update_state_after_alloc when loaded externally
if cache_exist == "connector_only":
scheduler.ec_connector.update_state_after_alloc.assert_called_with(
request_low, 0
......@@ -3427,9 +3435,14 @@ def test_priority_scheduling_ec_connector_preemption_and_resumption(
_assert_right_ec_connector_metadata(
output, mm_features_list=request_low.mm_features
)
else:
elif cache_exist == "local":
scheduler.ec_connector.update_state_after_alloc.assert_not_called()
# ECConnector should carry no metadata
_assert_right_ec_connector_metadata(output, mm_features_list=[])
else:
assert cache_exist == "no_where"
scheduler.ec_connector.update_state_after_alloc.assert_called_with(
request_low, 0
)
_assert_right_ec_connector_metadata(output, mm_features_list=[])
scheduler.ec_connector.update_state_after_alloc.reset_mock()
......
......@@ -233,7 +233,8 @@ class TestStateManagement:
# Initial state should be empty
assert len(connector._mm_datas_need_loads) == 0
# Update state for all 3 items
# Update state for all 3 items (mock cache existence)
with patch.object(connector, "has_cache_item", return_value=True):
for i in range(3):
connector.update_state_after_alloc(mock_request_with_3_mm, index=i)
......@@ -255,7 +256,8 @@ class TestStateManagement:
role=ECConnectorRole.SCHEDULER,
)
# Setup state for all 3 items
# Setup state for all 3 items (mock cache existence)
with patch.object(connector, "has_cache_item", return_value=True):
for i in range(3):
connector.update_state_after_alloc(mock_request_with_3_mm, index=i)
......@@ -298,7 +300,8 @@ class TestStateManagement:
role=ECConnectorRole.SCHEDULER,
)
# Add state
# Add state (mock cache existence)
with patch.object(connector, "has_cache_item", return_value=True):
for i in range(3):
connector.update_state_after_alloc(mock_request_with_3_mm, index=i)
assert len(connector._mm_datas_need_loads) == 3
......@@ -608,16 +611,13 @@ class TestEdgeCases:
with pytest.raises(FileNotFoundError):
connector.start_load_caches(encoder_cache=encoder_cache)
def test_has_caches_empty_request(self, mock_vllm_config_producer):
"""Test has_caches with request that has no MM data."""
def test_has_cache_item_empty_request(self, mock_vllm_config_producer):
"""Test has_cache_item with a nonexistent identifier."""
connector = ECExampleConnector(
vllm_config=mock_vllm_config_producer,
role=ECConnectorRole.SCHEDULER,
)
mock_request = MockRequest("req_empty", [], [])
result = connector.has_cache_item("nonexistent_hash")
result = connector.has_caches(mock_request)
assert len(result) == 0
assert result == []
assert result is False
......@@ -141,8 +141,10 @@ class ECExampleConnector(ECConnectorBase):
Update ECConnector state after encoder cache allocation.
"""
mm_hash = request.mm_features[index].identifier
# Only load cache if it is consumer and cache exists
if not self.is_consumer or not self.has_cache_item(mm_hash):
return
num_encoder_token = request.get_num_encoder_embeds(index)
# Insert mm_hash only if this block has not been recorded yet.
self._mm_datas_need_loads[mm_hash] = num_encoder_token
def build_connector_meta(
......
......@@ -515,6 +515,8 @@ class Scheduler(SchedulerInterface):
# Allocate the encoder cache.
for i in encoder_inputs_to_schedule:
self.encoder_cache_manager.allocate(request, i)
if self.ec_connector is not None:
self.ec_connector.update_state_after_alloc(request, i)
encoder_compute_budget = new_encoder_compute_budget
if external_load_encoder_input:
for i in external_load_encoder_input:
......@@ -803,6 +805,8 @@ class Scheduler(SchedulerInterface):
# Allocate the encoder cache.
for i in encoder_inputs_to_schedule:
self.encoder_cache_manager.allocate(request, i)
if self.ec_connector is not None:
self.ec_connector.update_state_after_alloc(request, i)
encoder_compute_budget = new_encoder_compute_budget
# Allocate for external load encoder cache
if external_load_encoder_input:
......
......@@ -193,9 +193,9 @@ class EngineCore:
logger.debug("Batch queue is enabled with size %d", self.batch_queue_size)
self.batch_queue = deque(maxlen=self.batch_queue_size)
self.is_ec_producer = (
vllm_config.ec_transfer_config is not None
and vllm_config.ec_transfer_config.is_ec_producer
self.is_ec_consumer = (
vllm_config.ec_transfer_config is None
or vllm_config.ec_transfer_config.is_ec_consumer
)
self.is_pooling_model = vllm_config.model_config.runner_type == "pooling"
......@@ -449,7 +449,7 @@ class EngineCore:
exec_future = self.model_executor.execute_model(
scheduler_output, non_block=True
)
if not self.is_ec_producer:
if self.is_ec_consumer:
model_executed = scheduler_output.total_num_scheduled_tokens > 0
if self.is_pooling_model or not model_executed:
......
......@@ -100,7 +100,7 @@ class RayDistributedExecutor(Executor):
self.uses_sampler = self.vllm_config.model_config.runner_type != "pooling" and (
self.vllm_config.ec_transfer_config is None
or not self.vllm_config.ec_transfer_config.is_ec_producer
or self.vllm_config.ec_transfer_config.is_ec_consumer
)
self.scheduler_output: SchedulerOutput | None = None
......
......@@ -3409,7 +3409,7 @@ class GPUModelRunner(
# Update persistent batch states.
self._update_states(scheduler_output)
if has_ec_transfer() and get_ec_transfer().is_producer:
if has_ec_transfer() and not get_ec_transfer().is_consumer:
with self.maybe_get_ec_connector_output(
scheduler_output,
encoder_cache=self.encoder_cache,
......@@ -6182,7 +6182,7 @@ class GPUModelRunner(
KVCacheSpec: A dictionary mapping layer names to their KV cache
format. Layers that do not need KV cache are not included.
"""
if has_ec_transfer() and get_ec_transfer().is_producer:
if has_ec_transfer() and not get_ec_transfer().is_consumer:
return {}
kv_cache_spec: dict[str, KVCacheSpec] = {}
layer_type = cast(type[Any], AttentionLayerBase)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment