Unverified commit 9fa8125c, authored by Yan Ru Pei and committed by GitHub
Browse files

chore: trtllm use unified frontend (#4097)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 427ca9ab
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
export DYNAMO_HOME=${DYNAMO_HOME:-"/workspace"} export DYNAMO_HOME=${DYNAMO_HOME:-"/workspace"}
export MODEL_PATH=${MODEL_PATH:-"Qwen/Qwen2-VL-7B-Instruct"} export MODEL_PATH=${MODEL_PATH:-"Qwen/Qwen2-VL-7B-Instruct"}
export SERVED_MODEL_NAME=${SERVED_MODEL_NAME:-"Qwen/Qwen2-VL-7B-Instruct"} export SERVED_MODEL_NAME=${SERVED_MODEL_NAME:-"Qwen/Qwen2-VL-7B-Instruct"}
export DISAGGREGATION_STRATEGY=${DISAGGREGATION_STRATEGY:-"decode_first"}
export PREFILL_ENGINE_ARGS=${PREFILL_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/qwen2-vl-7b-instruct/trtllm/prefill.yaml"} export PREFILL_ENGINE_ARGS=${PREFILL_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/qwen2-vl-7b-instruct/trtllm/prefill.yaml"}
export DECODE_ENGINE_ARGS=${DECODE_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/qwen2-vl-7b-instruct/trtllm/decode.yaml"} export DECODE_ENGINE_ARGS=${DECODE_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/qwen2-vl-7b-instruct/trtllm/decode.yaml"}
export ENCODE_ENGINE_ARGS=${ENCODE_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/qwen2-vl-7b-instruct/trtllm/encode.yaml"} export ENCODE_ENGINE_ARGS=${ENCODE_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/qwen2-vl-7b-instruct/trtllm/encode.yaml"}
...@@ -37,7 +36,6 @@ CUDA_VISIBLE_DEVICES=$ENCODE_CUDA_VISIBLE_DEVICES python3 -m dynamo.trtllm \ ...@@ -37,7 +36,6 @@ CUDA_VISIBLE_DEVICES=$ENCODE_CUDA_VISIBLE_DEVICES python3 -m dynamo.trtllm \
--model-path "$MODEL_PATH" \ --model-path "$MODEL_PATH" \
--served-model-name "$SERVED_MODEL_NAME" \ --served-model-name "$SERVED_MODEL_NAME" \
--extra-engine-args "$ENCODE_ENGINE_ARGS" \ --extra-engine-args "$ENCODE_ENGINE_ARGS" \
--disaggregation-strategy "$DISAGGREGATION_STRATEGY" \
--modality "$MODALITY" \ --modality "$MODALITY" \
--allowed-local-media-path "$ALLOWED_LOCAL_MEDIA_PATH" \ --allowed-local-media-path "$ALLOWED_LOCAL_MEDIA_PATH" \
--max-file-size-mb "$MAX_FILE_SIZE_MB" \ --max-file-size-mb "$MAX_FILE_SIZE_MB" \
...@@ -49,7 +47,6 @@ CUDA_VISIBLE_DEVICES=$PREFILL_CUDA_VISIBLE_DEVICES python3 -m dynamo.trtllm \ ...@@ -49,7 +47,6 @@ CUDA_VISIBLE_DEVICES=$PREFILL_CUDA_VISIBLE_DEVICES python3 -m dynamo.trtllm \
--model-path "$MODEL_PATH" \ --model-path "$MODEL_PATH" \
--served-model-name "$SERVED_MODEL_NAME" \ --served-model-name "$SERVED_MODEL_NAME" \
--extra-engine-args "$PREFILL_ENGINE_ARGS" \ --extra-engine-args "$PREFILL_ENGINE_ARGS" \
--disaggregation-strategy "$DISAGGREGATION_STRATEGY" \
--modality "$MODALITY" \ --modality "$MODALITY" \
--disaggregation-mode prefill \ --disaggregation-mode prefill \
--encode-endpoint "$ENCODE_ENDPOINT" & --encode-endpoint "$ENCODE_ENDPOINT" &
...@@ -60,7 +57,6 @@ CUDA_VISIBLE_DEVICES=$DECODE_CUDA_VISIBLE_DEVICES python3 -m dynamo.trtllm \ ...@@ -60,7 +57,6 @@ CUDA_VISIBLE_DEVICES=$DECODE_CUDA_VISIBLE_DEVICES python3 -m dynamo.trtllm \
--model-path "$MODEL_PATH" \ --model-path "$MODEL_PATH" \
--served-model-name "$SERVED_MODEL_NAME" \ --served-model-name "$SERVED_MODEL_NAME" \
--extra-engine-args "$DECODE_ENGINE_ARGS" \ --extra-engine-args "$DECODE_ENGINE_ARGS" \
--disaggregation-strategy "$DISAGGREGATION_STRATEGY" \
--modality "$MODALITY" \ --modality "$MODALITY" \
--disaggregation-mode decode & --disaggregation-mode decode &
DECODE_PID=$! DECODE_PID=$!
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
export DYNAMO_HOME=${DYNAMO_HOME:-"/workspace"} export DYNAMO_HOME=${DYNAMO_HOME:-"/workspace"}
export MODEL_PATH=${MODEL_PATH:-"/model"} export MODEL_PATH=${MODEL_PATH:-"/model"}
export SERVED_MODEL_NAME=${SERVED_MODEL_NAME:-"openai/gpt-oss-120b"} export SERVED_MODEL_NAME=${SERVED_MODEL_NAME:-"openai/gpt-oss-120b"}
export DISAGGREGATION_STRATEGY=${DISAGGREGATION_STRATEGY:-"prefill_first"}
export PREFILL_ENGINE_ARGS=${PREFILL_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/gpt-oss-120b/trtllm/disagg/prefill.yaml"} export PREFILL_ENGINE_ARGS=${PREFILL_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/gpt-oss-120b/trtllm/disagg/prefill.yaml"}
export DECODE_ENGINE_ARGS=${DECODE_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/gpt-oss-120b/trtllm/disagg/decode.yaml"} export DECODE_ENGINE_ARGS=${DECODE_ENGINE_ARGS:-"$DYNAMO_HOME/recipes/gpt-oss-120b/trtllm/disagg/decode.yaml"}
...@@ -26,7 +25,6 @@ CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m dynamo.trtllm \ ...@@ -26,7 +25,6 @@ CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m dynamo.trtllm \
--dyn-reasoning-parser gpt_oss \ --dyn-reasoning-parser gpt_oss \
--dyn-tool-call-parser harmony \ --dyn-tool-call-parser harmony \
--disaggregation-mode prefill \ --disaggregation-mode prefill \
--disaggregation-strategy "$DISAGGREGATION_STRATEGY" \
--max-num-tokens 20000 \ --max-num-tokens 20000 \
--max-batch-size 32 \ --max-batch-size 32 \
--free-gpu-memory-fraction 0.9 \ --free-gpu-memory-fraction 0.9 \
...@@ -41,7 +39,6 @@ CUDA_VISIBLE_DEVICES=4,5,6,7 python3 -m dynamo.trtllm \ ...@@ -41,7 +39,6 @@ CUDA_VISIBLE_DEVICES=4,5,6,7 python3 -m dynamo.trtllm \
--dyn-reasoning-parser gpt_oss \ --dyn-reasoning-parser gpt_oss \
--dyn-tool-call-parser harmony \ --dyn-tool-call-parser harmony \
--disaggregation-mode decode \ --disaggregation-mode decode \
--disaggregation-strategy "$DISAGGREGATION_STRATEGY" \
--max-num-tokens 16384 \ --max-num-tokens 16384 \
--free-gpu-memory-fraction 0.9 \ --free-gpu-memory-fraction 0.9 \
--tensor-parallel-size 4 \ --tensor-parallel-size 4 \
......
...@@ -108,13 +108,13 @@ for ((i=1; i<=50; i++)); do ...@@ -108,13 +108,13 @@ for ((i=1; i<=50; i++)); do
if [[ "$http_code" == "200" ]] && echo "$body" | grep -q '"status":"healthy"' && echo "$body" | grep -q '"endpoints":\[[^]]*"dyn://dynamo.tensorrt_llm.generate"'; then if [[ "$http_code" == "200" ]] && echo "$body" | grep -q '"status":"healthy"' && echo "$body" | grep -q '"endpoints":\[[^]]*"dyn://dynamo.tensorrt_llm.generate"'; then
if [[ "$kind" == *disagg* ]]; then if [[ "$kind" == *disagg* ]]; then
if echo "$body" | grep -q '"tensorrt_llm_next"'; then if echo "$body" | grep -q '"prefill"'; then
echo "Health check succeeded on attempt $i" echo "Health check succeeded on attempt $i"
echo "$body" echo "$body"
failed=false failed=false
break break
else else
echo "Attempt $i: tensorrt_llm_next key not found in etcd." echo "Attempt $i: prefill endpoint not found in health check."
fi fi
else else
echo "Health check succeeded on attempt $i" echo "Health check succeeded on attempt $i"
......
...@@ -23,8 +23,6 @@ NUM_DECODE_NODES=${NUM_DECODE_NODES:-4} ...@@ -23,8 +23,6 @@ NUM_DECODE_NODES=${NUM_DECODE_NODES:-4}
NUM_DECODE_WORKERS=${NUM_DECODE_WORKERS:-1} NUM_DECODE_WORKERS=${NUM_DECODE_WORKERS:-1}
DECODE_ENGINE_CONFIG="${DECODE_ENGINE_CONFIG:-/mnt/recipes/deepseek-r1/trtllm/disagg/wide_ep/wide_ep_decode.yaml}" DECODE_ENGINE_CONFIG="${DECODE_ENGINE_CONFIG:-/mnt/recipes/deepseek-r1/trtllm/disagg/wide_ep/wide_ep_decode.yaml}"
DISAGGREGATION_STRATEGY=${DISAGGREGATION_STRATEGY:-"decode_first"}
# Automate settings of certain variables for convenience, but you are free # Automate settings of certain variables for convenience, but you are free
# to manually set these for more control as well. # to manually set these for more control as well.
ACCOUNT="$(sacctmgr -nP show assoc where user=$(whoami) format=account)" ACCOUNT="$(sacctmgr -nP show assoc where user=$(whoami) format=account)"
...@@ -70,7 +68,7 @@ for ((i=1; i<=${NUM_PREFILL_WORKERS}; i++)); do ...@@ -70,7 +68,7 @@ for ((i=1; i<=${NUM_PREFILL_WORKERS}; i++)); do
--oversubscribe \ --oversubscribe \
--container-image "${IMAGE}" \ --container-image "${IMAGE}" \
--container-mounts "${MOUNTS}" \ --container-mounts "${MOUNTS}" \
--container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,DISAGGREGATION_STRATEGY,ENGINE_CONFIG \ --container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,ENGINE_CONFIG \
--verbose \ --verbose \
--label \ --label \
-A "${ACCOUNT}" \ -A "${ACCOUNT}" \
...@@ -90,7 +88,7 @@ for ((i=1; i<=${NUM_DECODE_WORKERS}; i++)); do ...@@ -90,7 +88,7 @@ for ((i=1; i<=${NUM_DECODE_WORKERS}; i++)); do
--oversubscribe \ --oversubscribe \
--container-image "${IMAGE}" \ --container-image "${IMAGE}" \
--container-mounts "${MOUNTS}" \ --container-mounts "${MOUNTS}" \
--container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,DISAGGREGATION_STRATEGY,ENGINE_CONFIG \ --container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,ENGINE_CONFIG \
--verbose \ --verbose \
--label \ --label \
-A "${ACCOUNT}" \ -A "${ACCOUNT}" \
......
...@@ -27,8 +27,9 @@ if [[ -n ${DISAGGREGATION_MODE} ]]; then ...@@ -27,8 +27,9 @@ if [[ -n ${DISAGGREGATION_MODE} ]]; then
EXTRA_ARGS+="--disaggregation-mode ${DISAGGREGATION_MODE} " EXTRA_ARGS+="--disaggregation-mode ${DISAGGREGATION_MODE} "
fi fi
if [[ -n ${DISAGGREGATION_STRATEGY} ]]; then # Only publish KV events if using KV-aware routing (not needed for round-robin)
EXTRA_ARGS+="--disaggregation-strategy ${DISAGGREGATION_STRATEGY} " if [[ -n ${PUBLISH_KV_EVENTS} ]] && [[ ${PUBLISH_KV_EVENTS} == "true" ]]; then
EXTRA_ARGS+="--publish-events-and-metrics "
fi fi
if [[ -n ${MODALITY} ]]; then if [[ -n ${MODALITY} ]]; then
......
...@@ -251,8 +251,7 @@ spec: ...@@ -251,8 +251,7 @@ spec:
--tensor-parallel-size 1 \ --tensor-parallel-size 1 \
--max-batch-size 1 \ --max-batch-size 1 \
--free-gpu-memory-fraction 0.9 \ --free-gpu-memory-fraction 0.9 \
--disaggregation-mode prefill \ --disaggregation-mode prefill
--disaggregation-strategy prefill_first
command: command:
- /bin/sh - /bin/sh
- -c - -c
...@@ -311,8 +310,7 @@ spec: ...@@ -311,8 +310,7 @@ spec:
--tensor-parallel-size 2 \ --tensor-parallel-size 2 \
--max-batch-size 128 \ --max-batch-size 128 \
--free-gpu-memory-fraction 0.9 \ --free-gpu-memory-fraction 0.9 \
--disaggregation-mode decode \ --disaggregation-mode decode
--disaggregation-strategy prefill_first
command: command:
- /bin/sh - /bin/sh
- -c - -c
......
...@@ -60,19 +60,17 @@ pytest tests/fault_tolerance/cancellation/test_vllm.py::test_request_cancellatio ...@@ -60,19 +60,17 @@ pytest tests/fault_tolerance/cancellation/test_vllm.py::test_request_cancellatio
#### TRT-LLM Cancellation Tests #### TRT-LLM Cancellation Tests
| Test | Mode | Strategy | Cancellation Phase | Request Type | Setup | | Test | Mode | Cancellation Phase | Request Type | Setup |
|------|------|----------|-------------------|--------------|-------| |------|------|--------------------|--------------|-------|
| `test_request_cancellation_trtllm_aggregated` | Aggregated | N/A | During generation | 3 scenarios: completion, chat, streaming chat | 1 worker (prefill_and_decode) | | `test_request_cancellation_trtllm_aggregated` | Aggregated | During generation | 3 scenarios: completion, chat, streaming chat | 1 worker (prefill_and_decode) |
| `test_request_cancellation_trtllm_decode_first_decode_cancel` | Disaggregated | Decode-first | Remote decode | Streaming chat (5 responses read) | Prefill + Decode workers | | `test_request_cancellation_trtllm_disagg_decode_cancel` | Disaggregated | Remote decode | Streaming chat (5 responses read) | Prefill + Decode workers |
| `test_request_cancellation_trtllm_decode_first_remote_prefill_cancel` | Disaggregated | Decode-first | Remote prefill | Completion (long prompt) | Prefill + Decode workers | | `test_request_cancellation_trtllm_disagg_prefill_cancel` | Disaggregated | Remote prefill | Completion (long prompt) | Prefill + Decode workers |
| `test_request_cancellation_trtllm_prefill_first_prefill_cancel` | Disaggregated | Prefill-first | Local prefill | Completion (long prompt) | Decode + Prefill workers |
| `test_request_cancellation_trtllm_prefill_first_remote_decode_cancel` | Disaggregated | Prefill-first | Remote decode | Streaming chat (5 responses read) | Decode + Prefill workers |
**Run examples:** **Run examples:**
```bash ```bash
pytest tests/fault_tolerance/cancellation/test_trtllm.py::test_request_cancellation_trtllm_aggregated -v -s pytest tests/fault_tolerance/cancellation/test_trtllm.py::test_request_cancellation_trtllm_aggregated -v -s
pytest tests/fault_tolerance/cancellation/test_trtllm.py::test_request_cancellation_trtllm_decode_first_decode_cancel -v -s pytest tests/fault_tolerance/cancellation/test_trtllm.py::test_request_cancellation_trtllm_disagg_decode_cancel -v -s
# ... (other tests follow same pattern) pytest tests/fault_tolerance/cancellation/test_trtllm.py::test_request_cancellation_trtllm_disagg_prefill_cancel -v -s
``` ```
#### SGLang Cancellation Tests #### SGLang Cancellation Tests
...@@ -99,5 +97,5 @@ pytest tests/fault_tolerance/cancellation/test_sglang.py::test_request_cancellat ...@@ -99,5 +97,5 @@ pytest tests/fault_tolerance/cancellation/test_sglang.py::test_request_cancellat
**Verification patterns:** **Verification patterns:**
- Aggregated mode: "Aborted Request ID" in worker logs - Aggregated mode: "Aborted Request ID" in worker logs
- Remote prefill: "Aborted Request ID" in prefill, "Aborted Remote Request ID" in decode - Disaggregated - prefill cancellation: "Aborted Request ID" in prefill worker (cancellation during prefill)
- Remote decode: "Aborted Request ID" in decode, "Aborted Remote Request ID" in prefill - Disaggregated - decode cancellation: "Aborted Request ID" in decode worker (cancellation during decode)
...@@ -25,15 +25,17 @@ logger = logging.getLogger(__name__) ...@@ -25,15 +25,17 @@ logger = logging.getLogger(__name__)
class DynamoWorkerProcess(ManagedProcess): class DynamoWorkerProcess(ManagedProcess):
"""Process manager for Dynamo worker with TensorRT-LLM backend""" """Process manager for Dynamo worker with TensorRT-LLM backend"""
def __init__(self, request, mode: str = "prefill_and_decode", strategy: str = ""): def __init__(self, request, mode: str = "prefill_and_decode"):
""" """
Initialize TensorRT-LLM worker process. Initialize TensorRT-LLM worker process.
Args: Args:
request: pytest request object request: pytest request object
mode: One of "prefill_and_decode", "prefill", "decode" mode: One of "prefill_and_decode", "prefill", "decode"
strategy: One of "decode_first", "prefill_first"
""" """
# Prefill workers require migration_limit=0 (no KV cache migration support)
migration_limit = "0" if mode == "prefill" else "3"
command = [ command = [
"python3", "python3",
"-m", "-m",
...@@ -47,7 +49,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -47,7 +49,7 @@ class DynamoWorkerProcess(ManagedProcess):
"--max-seq-len", "--max-seq-len",
"8192", "8192",
"--migration-limit", "--migration-limit",
"3", migration_limit,
] ]
if mode != "prefill_and_decode": if mode != "prefill_and_decode":
with open("test_request_cancellation_trtllm_config.yaml", "w") as f: with open("test_request_cancellation_trtllm_config.yaml", "w") as f:
...@@ -56,8 +58,6 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -56,8 +58,6 @@ class DynamoWorkerProcess(ManagedProcess):
command += [ command += [
"--extra-engine-args", "--extra-engine-args",
"test_request_cancellation_trtllm_config.yaml", "test_request_cancellation_trtllm_config.yaml",
"--disaggregation-strategy",
strategy,
] ]
health_check_urls = [ health_check_urls = [
...@@ -207,15 +207,15 @@ def test_request_cancellation_trtllm_aggregated( ...@@ -207,15 +207,15 @@ def test_request_cancellation_trtllm_aggregated(
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_trtllm_decode_first_decode_cancel( def test_request_cancellation_trtllm_disagg_decode_cancel(
request, runtime_services, predownload_models request, runtime_services, predownload_models
): ):
""" """
End-to-end test for request cancellation during decode phase with decode_first strategy. End-to-end test for request cancellation during decode phase with unified frontend.
This test verifies that when a request is cancelled by the client during the decode phase, This test verifies that when a request is cancelled by the client during the decode phase,
the system properly handles the cancellation and cleans up resources the system properly handles the cancellation and cleans up resources
on the decode worker side in a disaggregated setup using decode_first strategy. on the decode worker side in a disaggregated setup.
""" """
# Step 1: Start the frontend # Step 1: Start the frontend
...@@ -223,15 +223,11 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel( ...@@ -223,15 +223,11 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the prefill worker # Step 2: Start the prefill worker
with DynamoWorkerProcess( with DynamoWorkerProcess(request, mode="prefill") as prefill_worker:
request, mode="prefill", strategy="decode_first"
) as prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# Step 3: Start the decode worker # Step 3: Start the decode worker
with DynamoWorkerProcess( with DynamoWorkerProcess(request, mode="decode") as decode_worker:
request, mode="decode", strategy="decode_first"
) as decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
...@@ -245,17 +241,17 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel( ...@@ -245,17 +241,17 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel(
# Send streaming request (non-blocking) # Send streaming request (non-blocking)
cancellable_req = send_cancellable_request("chat_completion_stream") cancellable_req = send_cancellable_request("chat_completion_stream")
# Poll for "New Request ID" pattern in decode worker # Poll for "Prefill Request ID" pattern in prefill worker (frontend routes here first)
request_id, decode_log_offset = poll_for_pattern( request_id, prefill_log_offset = poll_for_pattern(
process=decode_worker, process=prefill_worker,
pattern="New Request ID: ", pattern="Prefill Request ID: ",
match_type="contains", match_type="contains",
) )
# Verify same request ID reached prefill worker # Verify same request ID reached decode worker (after prefill completes)
_, prefill_log_offset = poll_for_pattern( _, decode_log_offset = poll_for_pattern(
process=prefill_worker, process=decode_worker,
pattern=f"New Request ID: {request_id}", pattern=f"Decode Request ID: {request_id}",
) )
# Read 5 streaming responses (decode phase) # Read 5 streaming responses (decode phase)
...@@ -287,15 +283,15 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel( ...@@ -287,15 +283,15 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel(
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel( def test_request_cancellation_trtllm_disagg_prefill_cancel(
request, runtime_services, predownload_models request, runtime_services, predownload_models
): ):
""" """
End-to-end test for request cancellation during remote prefill phase with decode_first strategy. End-to-end test for request cancellation during prefill phase with unified frontend.
This test verifies that when a request is cancelled by the client during the remote prefill phase, This test verifies that when a request is cancelled by the client during the prefill phase,
the system properly handles the cancellation and cleans up resources the system properly handles the cancellation and cleans up resources on the prefill worker.
on both the decode and prefill workers in a disaggregated setup using decode_first strategy. Since the request is cancelled before prefill completes, the decode worker never receives it.
""" """
# Step 1: Start the frontend # Step 1: Start the frontend
...@@ -303,106 +299,16 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel( ...@@ -303,106 +299,16 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the prefill worker # Step 2: Start the prefill worker
with DynamoWorkerProcess( with DynamoWorkerProcess(request, mode="prefill") as prefill_worker:
request, mode="prefill", strategy="decode_first"
) as prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# Step 3: Start the decode worker # Step 3: Start the decode worker
with DynamoWorkerProcess( with DynamoWorkerProcess(request, mode="decode") as decode_worker:
request, mode="decode", strategy="decode_first"
) as decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
time.sleep(2) time.sleep(2)
# Step 4: Test request cancellation during remote prefill phase
logger.info(
"Testing completion request cancellation during remote prefill phase..."
)
# Send request with long prompt (non-blocking)
cancellable_req = send_cancellable_request(
"completion", use_long_prompt=True
)
# Poll for "New Request ID" pattern in decode worker
request_id, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern="New Request ID: ",
match_type="contains",
)
# Poll for same request ID in prefill worker (remote prefill)
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"New Request ID: {request_id}",
)
# Cancel during prefill phase
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id} during remote prefill")
# Poll for "Aborted Request ID" in prefill worker first (where cancellation happens)
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=prefill_log_offset,
)
# Then poll for "Aborted Remote Request ID" in decode worker
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"Aborted Remote Request ID: {request_id}",
log_offset=decode_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
)
logger.info(
"Completion request cancellation during remote prefill phase detected successfully"
)
@pytest.mark.trtllm_marker
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_trtllm_prefill_first_prefill_cancel(
request, runtime_services, predownload_models
):
"""
End-to-end test for request cancellation during prefill phase with prefill_first strategy.
This test verifies that when a request is cancelled by the client during the prefill phase,
the system properly handles the cancellation and cleans up resources
on the prefill worker side in a disaggregated setup using prefill_first strategy.
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(request) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start the decode worker
with DynamoWorkerProcess(
request, mode="decode", strategy="prefill_first"
) as decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# Step 3: Start the prefill worker
with DynamoWorkerProcess(
request, mode="prefill", strategy="prefill_first"
) as prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
time.sleep(2)
# Step 4: Test request cancellation during prefill phase # Step 4: Test request cancellation during prefill phase
logger.info( logger.info(
"Testing completion request cancellation during prefill phase..." "Testing completion request cancellation during prefill phase..."
...@@ -413,18 +319,18 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel( ...@@ -413,18 +319,18 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel(
"completion", use_long_prompt=True "completion", use_long_prompt=True
) )
# Poll for "New Request ID" pattern in prefill worker # Poll for "Prefill Request ID" pattern in prefill worker (frontend routes here first)
request_id, prefill_log_offset = poll_for_pattern( request_id, prefill_log_offset = poll_for_pattern(
process=prefill_worker, process=prefill_worker,
pattern="New Request ID: ", pattern="Prefill Request ID: ",
match_type="contains", match_type="contains",
) )
# Cancel during prefill phase (before reaching decode worker) # Cancel during prefill phase
cancellable_req.cancel() cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id} during prefill phase") logger.info(f"Cancelled request ID: {request_id} during prefill")
# Poll for "Aborted Request ID" in prefill worker # Poll for "Aborted Request ID" in prefill worker (where cancellation happens)
_, prefill_log_offset = poll_for_pattern( _, prefill_log_offset = poll_for_pattern(
process=prefill_worker, process=prefill_worker,
pattern=f"Aborted Request ID: {request_id}", pattern=f"Aborted Request ID: {request_id}",
...@@ -440,90 +346,3 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel( ...@@ -440,90 +346,3 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel(
logger.info( logger.info(
"Completion request cancellation during prefill phase detected successfully" "Completion request cancellation during prefill phase detected successfully"
) )
@pytest.mark.trtllm_marker
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_trtllm_prefill_first_remote_decode_cancel(
request, runtime_services, predownload_models
):
"""
End-to-end test for request cancellation during remote decode phase with prefill_first strategy.
This test verifies that when a request is cancelled by the client during the remote decode phase,
the system properly handles the cancellation and cleans up resources
on both the prefill and decode workers in a disaggregated setup using prefill_first strategy.
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(request) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start the decode worker
with DynamoWorkerProcess(
request, mode="decode", strategy="prefill_first"
) as decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# Step 3: Start the prefill worker
with DynamoWorkerProcess(
request, mode="prefill", strategy="prefill_first"
) as prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
time.sleep(2)
# Step 4: Test request cancellation during remote decode phase
logger.info(
"Testing chat completion stream request cancellation during remote decode phase..."
)
# Send streaming request (non-blocking)
cancellable_req = send_cancellable_request("chat_completion_stream")
# Poll for "New Request ID" pattern in prefill worker
request_id, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern="New Request ID: ",
match_type="contains",
)
# Poll for same request ID in decode worker (remote decode)
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"New Request ID: {request_id}",
)
# Read 5 streaming responses (remote decode phase)
read_streaming_responses(cancellable_req, expected_count=5)
# Now cancel the request
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id} during remote decode")
# Poll for "Aborted Request ID" in decode worker first (where cancellation happens)
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=decode_log_offset,
)
# Then poll for "Aborted Remote Request ID" in prefill worker
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"Aborted Remote Request ID: {request_id}",
log_offset=prefill_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
)
logger.info(
"Chat completion stream cancellation during remote decode phase detected successfully"
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment