Unverified Commit 74a56676 authored by Paul Li's avatar Paul Li Committed by GitHub
Browse files

test: create a helper function to send request directly to mocker instances (#3269)


Signed-off-by: default avatarPaul Li <zhixiong2008@gmail.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent 16409eef
...@@ -277,6 +277,97 @@ async def send_inflight_requests(urls: list, payload: dict, num_requests: int): ...@@ -277,6 +277,97 @@ async def send_inflight_requests(urls: list, payload: dict, num_requests: int):
logger.info(f"All {num_requests} requests completed successfully") logger.info(f"All {num_requests} requests completed successfully")
async def send_request_via_python_kv_router(
kv_python_router: KvPushRouter,
token_ids: list,
initial_wait: float,
max_retries: int,
stop_conditions: Optional[dict] = None,
sampling_options: Optional[dict] = None,
output_options: Optional[dict] = None,
router_config_override: Optional[dict] = None,
worker_id: Optional[
int
] = None, # If None, Router will select the best available worker
):
"""Send a request to the specified mocker instance.
Returns True if mockers respond, otherwise raises or returns False.
"""
wait_time = initial_wait
log_message = (
f"the mocker with worker_id={worker_id}"
if worker_id is not None
else "the best available mocker"
)
# Retry loop sending reuqest to mocker worker with exponential backoff
for attempt in range(max_retries + 1):
try:
logger.info(f"Sending request to {log_message} (attempt {attempt + 1})")
stream = await kv_python_router.generate(
token_ids=token_ids,
model=MODEL_NAME,
stop_conditions=stop_conditions,
sampling_options=sampling_options,
output_options=output_options,
router_config_override=router_config_override,
# worker_id=worker_id,
)
if stream is not None:
logger.info(f"Request succeeded on attempt {attempt + 1}")
break
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
await asyncio.sleep(wait_time)
wait_time *= 2
else:
raise RuntimeError(
f"Failed to connect to mockers after {max_retries + 1} attempts: {e}"
)
# Collect tokens from the SSE stream
generated_tokens = []
async for response in stream:
if isinstance(response, dict):
# Check if response has token_ids
if "token_ids" in response:
tokens = response["token_ids"]
if isinstance(tokens, list):
generated_tokens.extend(tokens)
logger.debug(f"Received {len(tokens)} tokens: {tokens}")
# Check for finish reason
if "finish_reason" in response:
logger.info(f"Stream finished with reason: {response['finish_reason']}")
# Verify if expected number of tokens are generated if max_tokens specified and ignore_eos is True
logger.info(f"Total generated tokens: {len(generated_tokens)}")
if (
stop_conditions
and "max_tokens" in stop_conditions
and "ignore_eos" in stop_conditions
and stop_conditions["ignore_eos"]
):
max_tokens = int(stop_conditions["max_tokens"])
assert len(generated_tokens) == max_tokens, (
f"Expected exactly {max_tokens} tokens but got {len(generated_tokens)}. "
f"Tokens: {generated_tokens}"
)
logger.info(
f"Successfully verified {max_tokens} tokens generated as expected via KvPushRouter with ignore_eos=True"
)
return True
return False
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME) @pytest.mark.model(MODEL_NAME)
def test_mocker_kv_router(request, runtime_services, predownload_tokenizers): def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
...@@ -288,7 +379,7 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers): ...@@ -288,7 +379,7 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting mocker KV router test") logger.info("Starting mocker KV router test")
# Create mocker args dictionary # Create mocker args dictiona: FixtureRequestry: tuple[NatsServer, EtcdServer]: NoneType
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
try: try:
...@@ -340,7 +431,7 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers) ...@@ -340,7 +431,7 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers)
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting mocker two KV router test") logger.info("Starting mocker two KV router test")
# Create mocker args dictionary # Create mocker args dictionary: FixtureRequest: tuple[NatsServer, EtcdServer]: NoneType
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
kv_routers = [] kv_routers = []
...@@ -404,7 +495,6 @@ def test_mocker_kv_router_overload_503( ...@@ -404,7 +495,6 @@ def test_mocker_kv_router_overload_503(
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting mocker KV router overload test for 503 status") logger.info("Starting mocker KV router overload test for 503 status")
# Create mocker args dictionary with limited resources # Create mocker args dictionary with limited resources
mocker_args = { mocker_args = {
"speedup_ratio": 10, "speedup_ratio": 10,
...@@ -584,201 +674,118 @@ def test_kv_push_router_bindings(request, runtime_services, predownload_tokenize ...@@ -584,201 +674,118 @@ def test_kv_push_router_bindings(request, runtime_services, predownload_tokenize
logger.info(f"All mockers using endpoint: {mockers.endpoint}") logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__() mockers.__enter__()
# Wait for mockers to be ready by sending a dummy request with retry # Get runtime and create endpoint
async def wait_for_mockers_ready(): runtime = get_runtime()
"""Send a dummy request to ensure mockers are ready""" # Use the namespace from the mockers
runtime = get_runtime() namespace = runtime.namespace(mockers.namespace)
# Use the namespace from the mockers component = namespace.component("mocker")
namespace = runtime.namespace(mockers.namespace) endpoint = component.endpoint("generate")
component = namespace.component("mocker")
endpoint = component.endpoint("generate") # Create KvRouterConfig with default settings
kv_router_config = KvRouterConfig()
kv_router_config = KvRouterConfig()
kv_push_router = KvPushRouter( # Create KvPushRouter Python object
endpoint=endpoint, kv_push_router = KvPushRouter(
block_size=BLOCK_SIZE, endpoint=endpoint,
kv_router_config=kv_router_config, block_size=BLOCK_SIZE,
) kv_router_config=kv_router_config,
)
# Dummy request with minimal tokens
dummy_token_ids = [1, 2, 3] # Just a few tokens for testing
max_retries = 8
wait_time = 1
for attempt in range(max_retries + 1):
try:
logger.info(
f"Sending dummy request to check mocker readiness (attempt {attempt + 1})"
)
stream = await kv_push_router.generate(
token_ids=dummy_token_ids,
model=MODEL_NAME,
stop_conditions={"max_tokens": 1}, # Generate just 1 token
sampling_options={"temperature": 0.7},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
)
# Consume the stream to verify it works
token_count = 0
async for response in stream:
if isinstance(response, dict) and "token_ids" in response:
token_count += len(response["token_ids"])
logger.info(
f"Mockers are ready! Dummy request succeeded on attempt {attempt + 1}"
)
return True
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
await asyncio.sleep(wait_time)
wait_time *= 2 # Exponential backoff
else:
raise RuntimeError(
f"Failed to connect to mockers after {max_retries + 1} attempts"
)
return False
# Wait for mockers to be ready
asyncio.run(wait_for_mockers_ready())
# Run the async test
async def test_kv_push_router():
# Get runtime and create endpoint
runtime = get_runtime()
# Use the namespace from the mockers
namespace = runtime.namespace(mockers.namespace)
component = namespace.component("mocker")
endpoint = component.endpoint("generate")
# Create KvRouterConfig with default settings logger.info("Created KvPushRouter Python object")
kv_router_config = KvRouterConfig()
# Create KvPushRouter Python object # Initialize and check the readiness of the mockers by sending dummy request
kv_push_router = KvPushRouter( asyncio.run(
endpoint=endpoint, send_request_via_python_kv_router(
block_size=BLOCK_SIZE, kv_python_router=kv_push_router,
kv_router_config=kv_router_config, token_ids=[1, 2, 3],
initial_wait=1.0,
max_retries=8,
stop_conditions={"max_tokens": 1}, # Generate just 1 token
sampling_options={"temperature": 0.7},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
) )
)
logger.info("Created KvPushRouter Python object") # Generate random token IDs (100 to 200 tokens)
num_input_tokens = random.randint(100, 200)
# Generate random token IDs (100 to 200 tokens) token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]
num_input_tokens = random.randint(100, 200)
token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]
logger.info(f"Generated {num_input_tokens} random token IDs")
# Set up generation parameters
stop_conditions = {
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 20, # Generate exactly 20 tokens
}
sampling_options = {"temperature": 0.7, "top_p": 0.9}
output_options = {"include_input_tokens": False, "return_full_text": False} # Set up override parameters
router_config_override = {
"overlap_score_weight": 0.5, # Override the default weight
"router_temperature": 0.5, # Override the default temperature
}
# Test with router config overrides logger.info(f"Generated {num_input_tokens} random token IDs")
router_config_override = {
"overlap_score_weight": 0.5, # Override the default weight
"router_temperature": 0.5, # Override the default temperature
}
# Call generate method # Test with full overrides
logger.info( logger.info(
"Calling generate method on KvPushRouter with router config overrides" f"Testing with full router config overrides: {router_config_override}"
) )
logger.info(f"Router config overrides: {router_config_override}") asyncio.run(
stream = await kv_push_router.generate( send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=token_ids, token_ids=token_ids,
model=MODEL_NAME, initial_wait=1.0,
stop_conditions=stop_conditions, max_retries=8,
sampling_options=sampling_options, stop_conditions={
output_options=output_options, "ignore_eos": True, # Don't stop on EOS token
"max_tokens": 20, # Generate exactly 20 tokens
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
router_config_override=router_config_override, router_config_override=router_config_override,
) )
)
# Collect tokens from the SSE stream # Test without overrides
generated_tokens = [] logger.info("Testing without router config overrides")
async for response in stream: asyncio.run(
if isinstance(response, dict): send_request_via_python_kv_router(
# Check if response has token_ids kv_python_router=kv_push_router,
if "token_ids" in response: token_ids=token_ids[:50], # Use fewer tokens for second test,
tokens = response["token_ids"] initial_wait=1.0,
if isinstance(tokens, list): max_retries=8,
generated_tokens.extend(tokens) stop_conditions={
logger.debug(f"Received {len(tokens)} tokens: {tokens}") "ignore_eos": True, # Don't stop on EOS token
"max_tokens": 10, # Generate exactly 10 tokens for the second test
# Check for finish reason },
if "finish_reason" in response: sampling_options={"temperature": 0.7, "top_p": 0.9},
logger.info( output_options={
f"Stream finished with reason: {response['finish_reason']}" "include_input_tokens": False,
) "return_full_text": False,
},
# Verify we got exactly 20 tokens
logger.info(f"Total generated tokens: {len(generated_tokens)}")
assert len(generated_tokens) == 20, (
f"Expected exactly 20 tokens but got {len(generated_tokens)}. "
f"Tokens: {generated_tokens}"
)
logger.info(
"Successfully verified 20 tokens generated via KvPushRouter with overrides"
)
# Test again without overrides
logger.info("Testing again without router config overrides")
stream = await kv_push_router.generate(
token_ids=token_ids[:50], # Use fewer tokens for second test
model=MODEL_NAME,
stop_conditions={"max_tokens": 10},
sampling_options=sampling_options,
output_options=output_options,
# No router_config_override this time # No router_config_override this time
) )
)
generated_tokens_no_override = [] # Test with partial override (only temperature)
async for response in stream: partial_override = {"router_temperature": 0.1}
if isinstance(response, dict) and "token_ids" in response: logger.info(f"Testing with partial router config overrides: {partial_override}")
generated_tokens_no_override.extend(response["token_ids"]) asyncio.run(
send_request_via_python_kv_router(
assert ( kv_python_router=kv_push_router,
len(generated_tokens_no_override) == 10 token_ids=token_ids[:30], # Use fewer tokens for third test,
), f"Expected 10 tokens but got {len(generated_tokens_no_override)}" initial_wait=1.0,
logger.info("Successfully verified generation without overrides") max_retries=8,
stop_conditions={
# Test with partial override (only temperature) "ignore_eos": True, # Don't stop on EOS token
logger.info( "max_tokens": 5, # Generate exactly 5 tokens for the third test
"Testing with partial router config override (temperature only)" },
) sampling_options={"temperature": 0.7, "top_p": 0.9},
partial_override = {"router_temperature": 0.1} output_options={
stream = await kv_push_router.generate( "include_input_tokens": False,
token_ids=token_ids[:30], # Use even fewer tokens "return_full_text": False,
model=MODEL_NAME, },
stop_conditions={"max_tokens": 5},
sampling_options=sampling_options,
output_options=output_options,
router_config_override=partial_override, router_config_override=partial_override,
) )
)
generated_tokens_partial = []
async for response in stream:
if isinstance(response, dict) and "token_ids" in response:
generated_tokens_partial.extend(response["token_ids"])
assert (
len(generated_tokens_partial) == 5
), f"Expected 5 tokens but got {len(generated_tokens_partial)}"
logger.info("Successfully verified generation with partial override")
# Run the async test
asyncio.run(test_kv_push_router())
logger.info("KvPushRouter bindings test completed successfully") logger.info("KvPushRouter bindings test completed successfully")
...@@ -799,7 +806,7 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers): ...@@ -799,7 +806,7 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting indexers sync test") logger.info("Starting indexers sync test")
# Create mocker args dictionary # Create mocker args dicti: FixtureRequestonary: tuple[NatsServer, EtcdServer]: NoneType
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
try: try:
...@@ -1028,7 +1035,7 @@ def test_query_instance_id_returns_worker_and_tokens( ...@@ -1028,7 +1035,7 @@ def test_query_instance_id_returns_worker_and_tokens(
1. NOT route the request to a worker immediately 1. NOT route the request to a worker immediately
2. Return worker_instance_id as an SSE event 2. Return worker_instance_id as an SSE event
3. Return token_data as an SSE event containing the request tokens 3. Return token_data as an SSE event containing the request tokens
4. Terminate the stream with [DONE] 4. Term: FixtureRequestinate the stream w: tuple[NatsServer, EtcdServer]ith [DONE]: NoneType
This tests the specific code block: This tests the specific code block:
if query_instance_id { if query_instance_id {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment