Unverified Commit 4d48fe67 authored by Jacky's avatar Jacky Committed by GitHub
Browse files

test: Remove timing dependency on E2E cancellation tests (#3391)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
parent f414bc5a
...@@ -10,9 +10,9 @@ import pytest ...@@ -10,9 +10,9 @@ import pytest
from tests.fault_tolerance.cancellation.utils import ( from tests.fault_tolerance.cancellation.utils import (
DynamoFrontendProcess, DynamoFrontendProcess,
read_log_content, poll_for_pattern,
send_request_and_cancel, read_streaming_responses,
strip_ansi_codes, send_cancellable_request,
) )
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT from tests.utils.engine_process import FRONTEND_PORT
...@@ -126,102 +126,6 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -126,102 +126,6 @@ class DynamoWorkerProcess(ManagedProcess):
return False return False
def verify_request_cancelled(
frontend_process: DynamoFrontendProcess,
worker_process: DynamoWorkerProcess,
remote_worker_process: DynamoWorkerProcess | None = None,
frontend_log_offset: int = 0,
worker_log_offset: int = 0,
remote_worker_log_offset: int = 0,
assert_request_reach_remote_worker: bool = False,
assert_cancel_at_remote_worker: bool = False,
) -> tuple[int, int, int]:
"""Verify the logs contain expected cancellation messages"""
# Check worker log for cancellation pattern
worker_log_content = read_log_content(worker_process._log_path)
new_worker_content = worker_log_content[worker_log_offset:]
# Find the LAST occurrence of "New Request ID: <id>" line (health checks may log earlier ones)
request_id = None
for line in reversed(new_worker_content.split("\n")):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if "New Request ID: " in clean_line:
# Extract ID from the last delimiter occurrence on the line
parts = clean_line.rsplit("New Request ID: ", 1)
if len(parts) > 1:
request_id = parts[-1].strip()
break
if request_id is None:
pytest.fail("Could not find 'New Request ID: <id>' pattern in worker log")
has_worker_cancellation = False
cancellation_pattern = f"Aborted {'Remote ' if assert_cancel_at_remote_worker else ''}Request ID: {request_id}"
for line in new_worker_content.split("\n"):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(cancellation_pattern):
has_worker_cancellation = True
break
if not has_worker_cancellation:
pytest.fail(f"Could not find '{cancellation_pattern}' pattern in worker log")
# Check remote worker log if provided
if remote_worker_process is not None:
remote_worker_log_content = read_log_content(remote_worker_process._log_path)
new_remote_worker_content = remote_worker_log_content[remote_worker_log_offset:]
# Check if the same request ID reached remote worker
if assert_request_reach_remote_worker:
has_reach_remote = False
remote_reach_pattern = f"New Request ID: {request_id}"
for line in new_remote_worker_content.split("\n"):
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(remote_reach_pattern):
has_reach_remote = True
break
if not has_reach_remote:
pytest.fail(
f"Could not find '{remote_reach_pattern}' pattern in remote worker log"
)
# Check if the same request ID was cancelled at remote worker
if assert_cancel_at_remote_worker:
has_remote_cancel = False
remote_cancel_pattern = f"Aborted Request ID: {request_id}"
for line in remote_worker_log_content.split("\n"):
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(remote_cancel_pattern):
has_remote_cancel = True
break
if not has_remote_cancel:
pytest.fail(
f"Could not find '{remote_cancel_pattern}' pattern in remote worker log"
)
# Check frontend log for cancellation issued pattern
frontend_log_content = read_log_content(frontend_process._log_path)
new_frontend_content = frontend_log_content[frontend_log_offset:]
has_kill_message = False
kill_message = "issued control message Kill to sender"
for line in new_frontend_content.split("\n"):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(kill_message):
has_kill_message = True
break
if not has_kill_message:
pytest.fail("Could not find cancellation issued in frontend log")
return (
len(frontend_log_content),
len(worker_log_content),
(0 if remote_worker_process is None else len(remote_worker_log_content)),
)
@pytest.mark.trtllm_marker @pytest.mark.trtllm_marker
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
...@@ -242,16 +146,13 @@ def test_request_cancellation_trtllm_aggregated( ...@@ -242,16 +146,13 @@ def test_request_cancellation_trtllm_aggregated(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start an aggregated worker # Step 2: Start an aggregated worker
logger.info("Starting aggregated worker...") with DynamoWorkerProcess(request, mode="prefill_and_decode") as worker:
worker = DynamoWorkerProcess(request, mode="prefill_and_decode")
with worker:
logger.info(f"Aggregated Worker PID: {worker.get_pid()}") logger.info(f"Aggregated Worker PID: {worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
time.sleep(2) time.sleep(2)
# Step 3: Test request cancellation # Step 3: Test request cancellation with polling approach
frontend_log_offset, worker_log_offset = 0, 0 frontend_log_offset, worker_log_offset = 0, 0
test_scenarios = [ test_scenarios = [
...@@ -263,19 +164,40 @@ def test_request_cancellation_trtllm_aggregated( ...@@ -263,19 +164,40 @@ def test_request_cancellation_trtllm_aggregated(
), ),
] ]
for i, (request_type, description) in enumerate(test_scenarios, 1): for request_type, description in test_scenarios:
logger.info(f"Testing {description.lower()}...") logger.info(f"Testing {description.lower()}...")
send_request_and_cancel(request_type)
logger.info( # Send the request (non-blocking)
"Checking for cancellation messages in worker and frontend logs..." cancellable_req = send_cancellable_request(request_type)
# Poll for "New Request ID" pattern
request_id, worker_log_offset = poll_for_pattern(
process=worker,
pattern="New Request ID: ",
log_offset=worker_log_offset,
match_type="contains",
)
# For streaming, read 5 responses before cancelling
if request_type == "chat_completion_stream":
read_streaming_responses(cancellable_req, expected_count=5)
# Now cancel the request
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id}")
# Poll for "Aborted Request ID" with matching ID
_, worker_log_offset = poll_for_pattern(
process=worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=worker_log_offset,
) )
time.sleep(0.05) # time for cancellation to propagate
frontend_log_offset, worker_log_offset, _ = verify_request_cancelled( # Verify frontend log has kill message
frontend, _, frontend_log_offset = poll_for_pattern(
worker, process=frontend,
frontend_log_offset=frontend_log_offset, pattern="issued control message Kill to sender",
worker_log_offset=worker_log_offset, log_offset=frontend_log_offset,
) )
logger.info(f"{description} detected successfully") logger.info(f"{description} detected successfully")
...@@ -301,42 +223,63 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel( ...@@ -301,42 +223,63 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the prefill worker # Step 2: Start the prefill worker
logger.info("Starting prefill worker...") with DynamoWorkerProcess(
prefill_worker = DynamoWorkerProcess(
request, mode="prefill", strategy="decode_first" request, mode="prefill", strategy="decode_first"
) ) as prefill_worker:
with prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# Step 3: Start the decode worker # Step 3: Start the decode worker
logger.info("Starting decode worker...") with DynamoWorkerProcess(
decode_worker = DynamoWorkerProcess(
request, mode="decode", strategy="decode_first" request, mode="decode", strategy="decode_first"
) ) as decode_worker:
with decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
time.sleep(2) time.sleep(2)
# Step 4: Test request cancellation for completion scenario only # Step 4: Test request cancellation for streaming scenario
logger.info( logger.info(
"Testing completion request cancellation in decode worker (decode phase)..." "Testing chat completion stream request cancellation in decode worker (decode phase)..."
) )
send_request_and_cancel("completion")
logger.info( # Send streaming request (non-blocking)
"Checking for cancellation messages in decode and prefill worker and frontend logs..." cancellable_req = send_cancellable_request("chat_completion_stream")
# Poll for "New Request ID" pattern in decode worker
request_id, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern="New Request ID: ",
match_type="contains",
) )
time.sleep(0.05) # time for cancellation to propagate
verify_request_cancelled( # Verify same request ID reached prefill worker
frontend, _, prefill_log_offset = poll_for_pattern(
decode_worker, process=prefill_worker,
prefill_worker, pattern=f"New Request ID: {request_id}",
assert_request_reach_remote_worker=True, )
assert_cancel_at_remote_worker=False,
# Read 5 streaming responses (decode phase)
read_streaming_responses(cancellable_req, expected_count=5)
# Now cancel the request
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id}")
# Poll for "Aborted Request ID" in decode worker
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=decode_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
)
logger.info(
"Chat completion stream cancellation in decode phase detected successfully"
) )
...@@ -344,11 +287,6 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel( ...@@ -344,11 +287,6 @@ def test_request_cancellation_trtllm_decode_first_decode_cancel(
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.xfail(
reason="Time-sensitive test: Relies on request timeout (0.1s) to cancel during remote prefill phase. "
"May fail if prefill completes too quickly or timeout triggers at a different phase.",
strict=False,
)
def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel( def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel(
request, runtime_services, predownload_models request, runtime_services, predownload_models
): ):
...@@ -365,21 +303,15 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel( ...@@ -365,21 +303,15 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the prefill worker # Step 2: Start the prefill worker
logger.info("Starting prefill worker...") with DynamoWorkerProcess(
prefill_worker = DynamoWorkerProcess(
request, mode="prefill", strategy="decode_first" request, mode="prefill", strategy="decode_first"
) ) as prefill_worker:
with prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# Step 3: Start the decode worker # Step 3: Start the decode worker
logger.info("Starting decode worker...") with DynamoWorkerProcess(
decode_worker = DynamoWorkerProcess(
request, mode="decode", strategy="decode_first" request, mode="decode", strategy="decode_first"
) ) as decode_worker:
with decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
...@@ -389,18 +321,51 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel( ...@@ -389,18 +321,51 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel(
logger.info( logger.info(
"Testing completion request cancellation during remote prefill phase..." "Testing completion request cancellation during remote prefill phase..."
) )
send_request_and_cancel("completion", timeout=0.1, use_long_prompt=True)
logger.info( # Send request with long prompt (non-blocking)
"Checking for cancellation messages in decode and prefill worker and frontend logs..." cancellable_req = send_cancellable_request(
"completion", use_long_prompt=True
)
# Poll for "New Request ID" pattern in decode worker
request_id, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern="New Request ID: ",
match_type="contains",
)
# Poll for same request ID in prefill worker (remote prefill)
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"New Request ID: {request_id}",
)
# Cancel during prefill phase
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id} during remote prefill")
# Poll for "Aborted Request ID" in prefill worker first (where cancellation happens)
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=prefill_log_offset,
)
# Then poll for "Aborted Remote Request ID" in decode worker
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"Aborted Remote Request ID: {request_id}",
log_offset=decode_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
) )
time.sleep(0.05) # time for cancellation to propagate
verify_request_cancelled( logger.info(
frontend, "Completion request cancellation during remote prefill phase detected successfully"
decode_worker,
prefill_worker,
assert_request_reach_remote_worker=True,
assert_cancel_at_remote_worker=True,
) )
...@@ -408,11 +373,6 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel( ...@@ -408,11 +373,6 @@ def test_request_cancellation_trtllm_decode_first_remote_prefill_cancel(
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.xfail(
reason="Time-sensitive test: Relies on request timeout (0.1s) to cancel during prefill phase. "
"May fail if prefill completes too quickly or timeout triggers at a different phase.",
strict=False,
)
def test_request_cancellation_trtllm_prefill_first_prefill_cancel( def test_request_cancellation_trtllm_prefill_first_prefill_cancel(
request, runtime_services, predownload_models request, runtime_services, predownload_models
): ):
...@@ -429,21 +389,15 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel( ...@@ -429,21 +389,15 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the decode worker # Step 2: Start the decode worker
logger.info("Starting decode worker...") with DynamoWorkerProcess(
decode_worker = DynamoWorkerProcess(
request, mode="decode", strategy="prefill_first" request, mode="decode", strategy="prefill_first"
) ) as decode_worker:
with decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# Step 3: Start the prefill worker # Step 3: Start the prefill worker
logger.info("Starting prefill worker...") with DynamoWorkerProcess(
prefill_worker = DynamoWorkerProcess(
request, mode="prefill", strategy="prefill_first" request, mode="prefill", strategy="prefill_first"
) ) as prefill_worker:
with prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
...@@ -453,18 +407,38 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel( ...@@ -453,18 +407,38 @@ def test_request_cancellation_trtllm_prefill_first_prefill_cancel(
logger.info( logger.info(
"Testing completion request cancellation during prefill phase..." "Testing completion request cancellation during prefill phase..."
) )
send_request_and_cancel("completion", timeout=0.1, use_long_prompt=True)
logger.info( # Send request with long prompt (non-blocking)
"Checking for cancellation messages in prefill and decode worker and frontend logs..." cancellable_req = send_cancellable_request(
"completion", use_long_prompt=True
)
# Poll for "New Request ID" pattern in prefill worker
request_id, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern="New Request ID: ",
match_type="contains",
) )
time.sleep(0.05) # time for cancellation to propagate
verify_request_cancelled( # Cancel during prefill phase (before reaching decode worker)
frontend, cancellable_req.cancel()
prefill_worker, logger.info(f"Cancelled request ID: {request_id} during prefill phase")
decode_worker,
assert_request_reach_remote_worker=False, # Poll for "Aborted Request ID" in prefill worker
assert_cancel_at_remote_worker=False, _, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=prefill_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
)
logger.info(
"Completion request cancellation during prefill phase detected successfully"
) )
...@@ -488,21 +462,15 @@ def test_request_cancellation_trtllm_prefill_first_remote_decode_cancel( ...@@ -488,21 +462,15 @@ def test_request_cancellation_trtllm_prefill_first_remote_decode_cancel(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the decode worker # Step 2: Start the decode worker
logger.info("Starting decode worker...") with DynamoWorkerProcess(
decode_worker = DynamoWorkerProcess(
request, mode="decode", strategy="prefill_first" request, mode="decode", strategy="prefill_first"
) ) as decode_worker:
with decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# Step 3: Start the prefill worker # Step 3: Start the prefill worker
logger.info("Starting prefill worker...") with DynamoWorkerProcess(
prefill_worker = DynamoWorkerProcess(
request, mode="prefill", strategy="prefill_first" request, mode="prefill", strategy="prefill_first"
) ) as prefill_worker:
with prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness?
...@@ -510,18 +478,52 @@ def test_request_cancellation_trtllm_prefill_first_remote_decode_cancel( ...@@ -510,18 +478,52 @@ def test_request_cancellation_trtllm_prefill_first_remote_decode_cancel(
# Step 4: Test request cancellation during remote decode phase # Step 4: Test request cancellation during remote decode phase
logger.info( logger.info(
"Testing completion request cancellation during remote decode phase..." "Testing chat completion stream request cancellation during remote decode phase..."
)
# Send streaming request (non-blocking)
cancellable_req = send_cancellable_request("chat_completion_stream")
# Poll for "New Request ID" pattern in prefill worker
request_id, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern="New Request ID: ",
match_type="contains",
)
# Poll for same request ID in decode worker (remote decode)
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"New Request ID: {request_id}",
)
# Read 5 streaming responses (remote decode phase)
read_streaming_responses(cancellable_req, expected_count=5)
# Now cancel the request
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id} during remote decode")
# Poll for "Aborted Request ID" in decode worker first (where cancellation happens)
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=decode_log_offset,
)
# Then poll for "Aborted Remote Request ID" in prefill worker
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"Aborted Remote Request ID: {request_id}",
log_offset=prefill_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
) )
send_request_and_cancel("completion")
logger.info( logger.info(
"Checking for cancellation messages in prefill and decode worker and frontend logs..." "Chat completion stream cancellation during remote decode phase detected successfully"
)
time.sleep(0.05) # time for cancellation to propagate
verify_request_cancelled(
frontend,
prefill_worker,
decode_worker,
assert_request_reach_remote_worker=True,
assert_cancel_at_remote_worker=True,
) )
...@@ -4,15 +4,14 @@ ...@@ -4,15 +4,14 @@
import logging import logging
import os import os
import shutil import shutil
import time
import pytest import pytest
from tests.fault_tolerance.cancellation.utils import ( from tests.fault_tolerance.cancellation.utils import (
DynamoFrontendProcess, DynamoFrontendProcess,
read_log_content, poll_for_pattern,
send_request_and_cancel, read_streaming_responses,
strip_ansi_codes, send_cancellable_request,
) )
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT from tests.utils.engine_process import FRONTEND_PORT
...@@ -112,114 +111,19 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -112,114 +111,19 @@ class DynamoWorkerProcess(ManagedProcess):
return False return False
def verify_request_cancelled(
frontend_process: DynamoFrontendProcess,
worker_process: DynamoWorkerProcess,
prefill_worker_process: DynamoWorkerProcess | None = None,
frontend_log_offset: int = 0,
worker_log_offset: int = 0,
assert_cancel_at_prefill: bool = False,
) -> tuple[int, int]:
"""Verify that the worker and frontend logs contain cancellation messages
Returns:
tuple: (new_worker_log_length, new_frontend_log_length)
"""
# Check worker log for cancellation pattern
worker_log_content = read_log_content(worker_process._log_path)
new_worker_content = worker_log_content[worker_log_offset:]
# Find the LAST occurrence of "New Request ID: <id>" line (health checks may log earlier ones)
request_id = None
for line in reversed(new_worker_content.split("\n")):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if "New Request ID: " in clean_line:
# Extract ID from the last delimiter occurrence on the line
parts = clean_line.rsplit("New Request ID: ", 1)
if len(parts) > 1:
request_id = parts[-1].strip()
break
if request_id is None:
pytest.fail("Could not find 'New Request ID: <id>' pattern in worker log")
# Check if the same request ID was cancelled
has_worker_cancellation = False
cancellation_pattern = (
f"Aborted Remote Prefill Request ID: {request_id}"
if assert_cancel_at_prefill
else f"Aborted Request ID: {request_id}"
)
for line in new_worker_content.split("\n"):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(cancellation_pattern):
has_worker_cancellation = True
break
if not has_worker_cancellation:
pytest.fail(f"Could not find '{cancellation_pattern}' pattern in worker log")
# Check prefill worker log if provided
if prefill_worker_process is not None:
prefill_worker_log_content = read_log_content(prefill_worker_process._log_path)
# Check if the same request ID was remote prefilled
has_remote_prefill = False
remote_prefill_pattern = f"New Prefill Request ID: {request_id}"
for line in prefill_worker_log_content.split("\n"):
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(remote_prefill_pattern):
has_remote_prefill = True
break
if not has_remote_prefill:
pytest.fail(
f"Could not find '{remote_prefill_pattern}' pattern in prefill worker log"
)
# Check for remote prefill cancellation
if assert_cancel_at_prefill:
has_prefill_cancellation = False
prefill_cancellation_pattern = f"Aborted Prefill Request ID: {request_id}"
for line in prefill_worker_log_content.split("\n"):
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(prefill_cancellation_pattern):
has_prefill_cancellation = True
break
if not has_prefill_cancellation:
pytest.fail(
f"Could not find '{prefill_cancellation_pattern}' pattern in prefill worker log"
)
# Check frontend log for cancellation issued pattern
frontend_log_content = read_log_content(frontend_process._log_path)
new_frontend_content = frontend_log_content[frontend_log_offset:]
has_kill_message = False
kill_message = "issued control message Kill to sender"
for line in new_frontend_content.split("\n"):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(kill_message):
has_kill_message = True
break
if not has_kill_message:
pytest.fail("Could not find cancellation issued in frontend log")
return len(frontend_log_content), len(worker_log_content)
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_vllm(request, runtime_services, predownload_models): def test_request_cancellation_vllm_aggregated(
request, runtime_services, predownload_models
):
""" """
End-to-end test for request cancellation functionality. End-to-end test for request cancellation functionality in aggregated mode.
This test verifies that when a request is cancelled by the client, This test verifies that when a request is cancelled by the client,
the system properly handles the cancellation and cleans up resources the system properly handles the cancellation and cleans up resources
on the worker side. Tests three scenarios: on the worker side in aggregated (single worker) mode. Tests three scenarios:
1. Completion request 1. Completion request
2. Chat completion request (non-streaming) 2. Chat completion request (non-streaming)
3. Chat completion request (streaming) 3. Chat completion request (streaming)
...@@ -230,13 +134,10 @@ def test_request_cancellation_vllm(request, runtime_services, predownload_models ...@@ -230,13 +134,10 @@ def test_request_cancellation_vllm(request, runtime_services, predownload_models
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start a single worker # Step 2: Start a single worker
logger.info("Starting worker...") with DynamoWorkerProcess(request) as worker:
worker = DynamoWorkerProcess(request)
with worker:
logger.info(f"Worker PID: {worker.get_pid()}") logger.info(f"Worker PID: {worker.get_pid()}")
# Step 3: Test request cancellation # Step 3: Test request cancellation with polling approach
frontend_log_offset, worker_log_offset = 0, 0 frontend_log_offset, worker_log_offset = 0, 0
test_scenarios = [ test_scenarios = [
...@@ -248,19 +149,40 @@ def test_request_cancellation_vllm(request, runtime_services, predownload_models ...@@ -248,19 +149,40 @@ def test_request_cancellation_vllm(request, runtime_services, predownload_models
), ),
] ]
for i, (request_type, description) in enumerate(test_scenarios, 1): for request_type, description in test_scenarios:
logger.info(f"Testing {description.lower()}...") logger.info(f"Testing {description.lower()}...")
send_request_and_cancel(request_type)
logger.info( # Send the request (non-blocking)
"Checking for cancellation messages in worker and frontend logs..." cancellable_req = send_cancellable_request(request_type)
# Poll for "New Request ID" pattern
request_id, worker_log_offset = poll_for_pattern(
process=worker,
pattern="New Request ID: ",
log_offset=worker_log_offset,
match_type="contains",
) )
time.sleep(0.05) # time for cancellation to propagate
frontend_log_offset, worker_log_offset = verify_request_cancelled( # For streaming, read 5 responses before cancelling
frontend, if request_type == "chat_completion_stream":
worker, read_streaming_responses(cancellable_req, expected_count=5)
frontend_log_offset=frontend_log_offset,
worker_log_offset=worker_log_offset, # Now cancel the request
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id}")
# Poll for "Aborted Request ID" with matching ID
_, worker_log_offset = poll_for_pattern(
process=worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=worker_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
log_offset=frontend_log_offset,
) )
logger.info(f"{description} detected successfully") logger.info(f"{description} detected successfully")
...@@ -270,13 +192,13 @@ def test_request_cancellation_vllm(request, runtime_services, predownload_models ...@@ -270,13 +192,13 @@ def test_request_cancellation_vllm(request, runtime_services, predownload_models
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_vllm_decode( def test_request_cancellation_vllm_decode_cancel(
request, runtime_services, predownload_models request, runtime_services, predownload_models
): ):
""" """
End-to-end test for request cancellation functionality with remote prefill. End-to-end test for request cancellation during decode phase.
This test verifies that when a request is cancelled by the client, This test verifies that when a request is cancelled by the client during the decode phase,
the system properly handles the cancellation and cleans up resources the system properly handles the cancellation and cleans up resources
on the decode worker side in a disaggregated setup. on the decode worker side in a disaggregated setup.
""" """
...@@ -286,51 +208,72 @@ def test_request_cancellation_vllm_decode( ...@@ -286,51 +208,72 @@ def test_request_cancellation_vllm_decode(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the prefill worker # Step 2: Start the prefill worker
logger.info("Starting prefill worker...") with DynamoWorkerProcess(request, is_prefill=True) as prefill_worker:
prefill_worker = DynamoWorkerProcess(request, is_prefill=True)
with prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# Step 3: Start the decode worker # Step 3: Start the decode worker
logger.info("Starting decode worker...") with DynamoWorkerProcess(request, is_prefill=False) as decode_worker:
decode_worker = DynamoWorkerProcess(request, is_prefill=False)
with decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# Step 4: Test request cancellation for completion scenario only # Step 4: Test request cancellation for streaming scenario
logger.info( logger.info(
"Testing completion request cancellation in decode worker..." "Testing chat completion stream request cancellation in decode worker (decode phase)..."
)
# Send streaming request (non-blocking)
cancellable_req = send_cancellable_request("chat_completion_stream")
# Poll for "New Request ID" pattern in decode worker
request_id, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern="New Request ID: ",
match_type="contains",
)
# Verify same request ID reached prefill worker (as "New Prefill Request ID")
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"New Prefill Request ID: {request_id}",
)
# Read 5 streaming responses (decode phase)
read_streaming_responses(cancellable_req, expected_count=5)
# Now cancel the request
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id}")
# Poll for "Aborted Request ID" in decode worker
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"Aborted Request ID: {request_id}",
log_offset=decode_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
) )
send_request_and_cancel("completion")
logger.info( logger.info(
"Checking for cancellation messages in decode and prefill worker and frontend logs..." "Chat completion stream cancellation in decode phase detected successfully"
) )
time.sleep(0.05) # time for cancellation to propagate
verify_request_cancelled(frontend, decode_worker, prefill_worker)
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.xfail( def test_request_cancellation_vllm_remote_prefill_cancel(
reason="Time-sensitive test: Relies on request timeout (0.1s) to cancel during prefill phase. "
"May fail if prefill completes too quickly or timeout triggers at a different phase.",
strict=False,
)
def test_request_cancellation_vllm_prefill(
request, runtime_services, predownload_models request, runtime_services, predownload_models
): ):
""" """
End-to-end test for request cancellation on remote prefill. End-to-end test for request cancellation during remote prefill phase.
This test verifies that when a request is cancelled by the client during the This test verifies that when a request is cancelled by the client during the remote prefill phase,
prefill phase, the system properly handles the cancellation and cleans up the system properly handles the cancellation and cleans up resources
resources on the prefill worker and decode worker sides in a disaggregated on both the decode and prefill workers in a disaggregated setup.
setup.
""" """
# Step 1: Start the frontend # Step 1: Start the frontend
...@@ -338,32 +281,60 @@ def test_request_cancellation_vllm_prefill( ...@@ -338,32 +281,60 @@ def test_request_cancellation_vllm_prefill(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 2: Start the prefill worker # Step 2: Start the prefill worker
logger.info("Starting prefill worker...") with DynamoWorkerProcess(request, is_prefill=True) as prefill_worker:
prefill_worker = DynamoWorkerProcess(request, is_prefill=True)
with prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")
# Step 3: Start the decode worker # Step 3: Start the decode worker
logger.info("Starting decode worker...") with DynamoWorkerProcess(request, is_prefill=False) as decode_worker:
decode_worker = DynamoWorkerProcess(request, is_prefill=False)
with decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")
# Step 4: Test request cancellation for completion scenario only # Step 4: Test request cancellation during remote prefill phase
logger.info( logger.info(
"Testing completion request cancellation in prefill worker..." "Testing completion request cancellation during remote prefill phase..."
) )
send_request_and_cancel("completion", timeout=0.1, use_long_prompt=True)
logger.info( # Send request with long prompt (non-blocking)
"Checking for cancellation messages in decode and prefill worker and frontend logs..." cancellable_req = send_cancellable_request(
"completion", use_long_prompt=True
) )
time.sleep(0.05) # time for cancellation to propagate
verify_request_cancelled( # Poll for "New Request ID" pattern in decode worker
frontend, request_id, decode_log_offset = poll_for_pattern(
decode_worker, process=decode_worker,
prefill_worker, pattern="New Request ID: ",
assert_cancel_at_prefill=True, match_type="contains",
)
# Poll for same request ID in prefill worker (as "New Prefill Request ID")
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"New Prefill Request ID: {request_id}",
)
# Cancel during prefill phase
cancellable_req.cancel()
logger.info(f"Cancelled request ID: {request_id} during remote prefill")
# Poll for "Aborted Prefill Request ID" in prefill worker first (where cancellation happens)
_, prefill_log_offset = poll_for_pattern(
process=prefill_worker,
pattern=f"Aborted Prefill Request ID: {request_id}",
log_offset=prefill_log_offset,
)
# Then poll for "Aborted Remote Prefill Request ID" in decode worker
_, decode_log_offset = poll_for_pattern(
process=decode_worker,
pattern=f"Aborted Remote Prefill Request ID: {request_id}",
log_offset=decode_log_offset,
)
# Verify frontend log has kill message
_, frontend_log_offset = poll_for_pattern(
process=frontend,
pattern="issued control message Kill to sender",
)
logger.info(
"Completion request cancellation during remote prefill phase detected successfully"
) )
...@@ -5,7 +5,10 @@ import logging ...@@ -5,7 +5,10 @@ import logging
import os import os
import re import re
import shutil import shutil
import socket
import threading
import time import time
from typing import Any, Callable, Dict
import pytest import pytest
import requests import requests
...@@ -46,10 +49,134 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -46,10 +49,134 @@ class DynamoFrontendProcess(ManagedProcess):
) )
def send_completion_request( class CancellableRequest:
prompt: str, max_tokens: int, timeout: int | float = 120 """A wrapper for a single request that can be explicitly cancelled.
) -> requests.Response:
"""Send a completion request to the frontend""" Each instance supports only one post() call and should not be reused.
"""
# Class-level tracking for thread-safe socket monitoring
_socket_tracking_lock = threading.Lock()
_socket_trackers: Dict[
Any, Any
] = {} # Maps thread ID to CancellableRequest instance
_original_socket: Callable[..., Any] = socket.socket
@classmethod
def _global_tracked_socket(
cls, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None
):
"""Global socket tracker that routes to the appropriate CancellableRequest instance"""
sock = cls._original_socket(family, type, proto, fileno)
# Find which CancellableRequest should track this socket
thread_id = threading.current_thread().ident
with cls._socket_tracking_lock:
tracker = cls._socket_trackers.get(thread_id)
if tracker:
tracker._active_sockets.append(sock)
return sock
def __init__(self):
self.session = requests.Session()
self.response = None
self.exception = None
self._cancelled = False
self._request_thread = None
self._lock = threading.Lock()
self._active_sockets = []
def post(self, *args, **kwargs):
"""Start a POST request in a separate thread. Can only be called once."""
def make_request():
thread_id = threading.current_thread().ident
# Register this thread's tracker
with self.__class__._socket_tracking_lock:
self.__class__._socket_trackers[thread_id] = self
# Install global monkey-patch if not already installed
if socket.socket != self.__class__._global_tracked_socket:
socket.socket = self.__class__._global_tracked_socket # type: ignore[assignment,misc]
try:
self.response = self.session.post(*args, **kwargs)
except Exception as e:
self.exception = e
finally:
# Unregister this thread's tracker
with self.__class__._socket_tracking_lock:
self.__class__._socket_trackers.pop(thread_id, None)
# Only restore original socket if no other trackers are active
if (
not self.__class__._socket_trackers
and socket.socket == self.__class__._global_tracked_socket
):
socket.socket = self.__class__._original_socket # type: ignore[assignment,misc]
with self._lock:
if self._request_thread is not None:
raise RuntimeError(
"This CancellableRequest instance has already been used. Create a new instance."
)
self._request_thread = threading.Thread(target=make_request)
self._request_thread.start()
def cancel(self):
"""Cancel the request by forcefully closing the underlying TCP socket"""
with self._lock:
if self._cancelled:
return
self._cancelled = True
# Do the cleanup outside the lock to avoid holding it during I/O operations
# Force close all tracked sockets (this is the actual TCP connection)
for sock in self._active_sockets:
# Set socket to non-blocking to avoid hanging
try:
sock.setblocking(0)
except Exception as e:
logger.warning(f"Failed to set socket to non-blocking: {e}")
# Force shutdown both send and receive
try:
sock.shutdown(socket.SHUT_RDWR)
except Exception as e:
logger.warning(f"Failed to shutdown socket: {e}")
# Close the socket
try:
sock.close()
except Exception as e:
logger.warning(f"Failed to close socket: {e}")
self._active_sockets.clear()
# Also close at the requests level for cleanup
if self.response:
self.response.close()
for adapter in self.session.adapters.values():
adapter.close()
self.session.close()
def get_response(self):
"""Get the response or raise exception if there was one"""
if self._cancelled:
raise requests.exceptions.RequestException("Request was cancelled")
if self.exception:
raise self.exception
return self.response
def send_completion_request(prompt: str, max_tokens: int) -> CancellableRequest:
"""Send a completion request to the frontend
Args:
prompt: The prompt for completion
max_tokens: Maximum tokens to generate
Returns:
A CancellableRequest object that can be explicitly cancelled
"""
payload = { payload = {
"model": FAULT_TOLERANCE_MODEL_NAME, "model": FAULT_TOLERANCE_MODEL_NAME,
"prompt": prompt, "prompt": prompt,
...@@ -62,28 +189,29 @@ def send_completion_request( ...@@ -62,28 +189,29 @@ def send_completion_request(
f"Sending completion request with prompt: '{prompt[:50]}...' and max_tokens: {max_tokens}" f"Sending completion request with prompt: '{prompt[:50]}...' and max_tokens: {max_tokens}"
) )
session = requests.Session() # Return a cancellable request object
try: cancellable_req = CancellableRequest()
response = session.post( cancellable_req.post(
f"http://localhost:{FRONTEND_PORT}/v1/completions", f"http://localhost:{FRONTEND_PORT}/v1/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout,
) )
logger.info(f"Received response with status code: {response.status_code}") return cancellable_req
return response
except requests.exceptions.Timeout:
logger.error(f"Request timed out after {timeout} seconds")
raise
except requests.exceptions.RequestException as e:
logger.error(f"Request failed with error: {e}")
raise
def send_chat_completion_request( def send_chat_completion_request(
prompt: str, max_tokens: int, timeout: int | float = 120, stream: bool = False prompt: str, max_tokens: int, stream: bool = False
) -> requests.Response: ) -> CancellableRequest:
"""Send a chat completion request to the frontend""" """Send a chat completion request to the frontend
Args:
prompt: The prompt for chat completion
max_tokens: Maximum tokens to generate
stream: Whether to stream the response
Returns:
A CancellableRequest object that can be explicitly cancelled
"""
payload = { payload = {
"model": FAULT_TOLERANCE_MODEL_NAME, "model": FAULT_TOLERANCE_MODEL_NAME,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
...@@ -97,67 +225,77 @@ def send_chat_completion_request( ...@@ -97,67 +225,77 @@ def send_chat_completion_request(
f"Sending chat completion request (stream={stream}) with prompt: '{prompt[:50]}...' and max_tokens: {max_tokens}" f"Sending chat completion request (stream={stream}) with prompt: '{prompt[:50]}...' and max_tokens: {max_tokens}"
) )
session = requests.Session() # Return a cancellable request object
try: cancellable_req = CancellableRequest()
response = session.post( cancellable_req.post(
f"http://localhost:{FRONTEND_PORT}/v1/chat/completions", f"http://localhost:{FRONTEND_PORT}/v1/chat/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout,
stream=stream, stream=stream,
) )
logger.info(f"Received response with status code: {response.status_code}") return cancellable_req
return response
except requests.exceptions.Timeout:
logger.error(f"Request timed out after {timeout} seconds")
raise
except requests.exceptions.RequestException as e:
logger.error(f"Request failed with error: {e}")
raise
def send_request_and_cancel( def send_cancellable_request(
request_type: str = "completion", request_type: str = "completion",
timeout: int | float = 1,
use_long_prompt: bool = False, use_long_prompt: bool = False,
): ) -> CancellableRequest:
"""Send a request with short timeout to trigger cancellation""" """Send a request that can be manually cancelled.
logger.info(f"Sending {request_type} request to be cancelled...")
Args:
request_type: Type of request - "completion", "chat_completion", or "chat_completion_stream"
use_long_prompt: Whether to use an extremely long prompt
Returns:
A CancellableRequest object that can be explicitly cancelled
"""
prompt = "Tell me a very long and detailed story about the history of artificial intelligence, including all major milestones, researchers, and breakthroughs?" prompt = "Tell me a very long and detailed story about the history of artificial intelligence, including all major milestones, researchers, and breakthroughs?"
if use_long_prompt: if use_long_prompt:
prompt += " Make sure it is" + " long" * 8000 + "!" prompt += " Make sure it is" + " long" * 8000 + "!"
try:
if request_type == "completion": if request_type == "completion":
response = send_completion_request(prompt, 8000, timeout) return send_completion_request(prompt, 8192)
elif request_type == "chat_completion": elif request_type == "chat_completion":
response = send_chat_completion_request(prompt, 8000, timeout, False) return send_chat_completion_request(prompt, 8192, stream=False)
elif request_type == "chat_completion_stream": elif request_type == "chat_completion_stream":
response = send_chat_completion_request(prompt, 8000, timeout, True) return send_chat_completion_request(prompt, 8192, stream=True)
# Read a few responses and then disconnect
if response.status_code == 200:
itr_count, max_itr = 0, 5
try:
for res in response.iter_lines():
logger.info(f"Received response {itr_count + 1}: {res[:50]}...")
itr_count += 1
if itr_count >= max_itr:
break
time.sleep(0.1)
except Exception as e:
pytest.fail(f"Stream reading failed: {e}")
response.close()
raise Exception("Closed response")
else: else:
pytest.fail(f"Unknown request type: {request_type}") raise ValueError(f"Unknown request type: {request_type}")
def read_streaming_responses(
cancellable_req: CancellableRequest,
expected_count: int = 5,
) -> None:
"""Read a specific number of responses from a streaming request.
Args:
cancellable_req: The CancellableRequest object with an active stream
expected_count: Number of responses to read before returning
Raises:
pytest.fail if stream ends before expected_count responses
"""
response = cancellable_req.get_response()
if not response or response.status_code != 200:
pytest.fail( pytest.fail(
f"{request_type} request completed unexpectedly - should have been cancelled" f"Failed to get streaming response: status_code={response.status_code if response else 'None'}"
)
response_count = 0
for line in response.iter_lines():
response_count += 1
logger.info(
f"Received streaming response {response_count}: {line.decode()[:100]}"
)
if response_count >= expected_count:
logger.info(f"Successfully read {response_count} responses")
return
# If we get here, stream ended too early
pytest.fail(
f"Stream ended after only {response_count} lines - expected to read at least {expected_count}"
) )
except Exception as e:
logger.info(f"{request_type} request was cancelled: {e}")
def read_log_content(log_path: str | None) -> str: def read_log_content(log_path: str | None) -> str:
...@@ -176,3 +314,78 @@ def strip_ansi_codes(text: str) -> str: ...@@ -176,3 +314,78 @@ def strip_ansi_codes(text: str) -> str:
"""Remove ANSI color codes from text""" """Remove ANSI color codes from text"""
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
return ansi_escape.sub("", text) return ansi_escape.sub("", text)
def poll_for_pattern(
process: ManagedProcess,
pattern: str,
log_offset: int = 0,
max_wait_ms: int = 500,
poll_interval_ms: int = 5,
match_type: str = "endswith", # "contains" or "endswith"
) -> tuple[str, int]:
"""
Poll process log for a specific pattern.
Args:
process: The process to monitor logs from
pattern: The pattern to search for
log_offset: Offset in the log to start reading from
max_wait_ms: Maximum time to wait for the pattern in milliseconds
poll_interval_ms: Interval between polls in milliseconds
match_type: How to match the pattern - "contains" or "endswith"
Returns:
Tuple of (matched_content, new_log_offset) where matched_content is:
- For "contains": everything after the pattern on the same line
- For "endswith": empty string (since nothing follows)
"""
max_iterations = max_wait_ms // poll_interval_ms
iteration = 0
current_offset = log_offset
logger.info(
f"Starting to poll for '{pattern}' pattern (max {max_iterations} iterations)..."
)
while iteration < max_iterations:
# Read the process log
log_content = read_log_content(process._log_path)
new_content = log_content[current_offset:]
# Look for the pattern
for line in new_content.split("\n"):
clean_line = strip_ansi_codes(line).strip()
matched = False
result = ""
if match_type == "contains" and pattern in clean_line:
# Find the pattern and return everything after it
idx = clean_line.rfind(pattern) # Use rfind to get last occurrence
if idx != -1:
result = clean_line[idx + len(pattern) :].strip()
matched = True
elif match_type == "endswith" and clean_line.endswith(pattern):
# Pattern is at the end, nothing follows
result = ""
matched = True
if matched:
logger.info(f"Found pattern '{pattern}' at iteration {iteration}")
if match_type == "contains":
logger.info(f"Content after pattern: '{result}'")
# Update offset to current position
current_offset = len(log_content)
return result, current_offset
# Update offset for next poll
current_offset = len(log_content)
# Wait before next poll
time.sleep(poll_interval_ms / 1000.0)
iteration += 1
pytest.fail(
f"Failed to find '{pattern}' pattern after {max_iterations} iterations ({max_wait_ms}ms)"
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment