Unverified Commit e63d728f authored by ryan-lempka's avatar ryan-lempka Committed by GitHub
Browse files

fix: use random port assignment for http tests (#2472)

parent 844f8819
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
/// Get a random available port for testing (prefer to hardcoding port numbers to avoid collisions)
pub async fn get_random_port() -> u16 {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind ephemeral port");
let port = listener
.local_addr()
.expect("failed to read local_addr")
.port();
drop(listener);
port
}
...@@ -46,11 +46,14 @@ use dynamo_runtime::{ ...@@ -46,11 +46,14 @@ use dynamo_runtime::{
use futures::StreamExt; use futures::StreamExt;
use prometheus::{proto::MetricType, Registry}; use prometheus::{proto::MetricType, Registry};
use reqwest::StatusCode; use reqwest::StatusCode;
use rstest::*;
use std::{io::Cursor, sync::Arc}; use std::{io::Cursor, sync::Arc};
use tokio::time::timeout; use tokio::time::timeout;
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
#[path = "common/ports.rs"]
mod ports;
use ports::get_random_port;
struct CounterEngine {} struct CounterEngine {}
// Add a new long-running test engine // Add a new long-running test engine
...@@ -270,8 +273,9 @@ fn inc_counter( ...@@ -270,8 +273,9 @@ fn inc_counter(
#[allow(deprecated)] #[allow(deprecated)]
#[tokio::test] #[tokio::test]
async fn test_http_service() { async fn test_http_service() {
let port = get_random_port().await;
let service = HttpService::builder() let service = HttpService::builder()
.port(8989) .port(port)
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
.build() .build()
...@@ -335,7 +339,7 @@ async fn test_http_service() { ...@@ -335,7 +339,7 @@ async fn test_http_service() {
request.max_tokens = Some(3000); request.max_tokens = Some(3000);
let response = client let response = client
.post("http://localhost:8989/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -415,7 +419,7 @@ async fn test_http_service() { ...@@ -415,7 +419,7 @@ async fn test_http_service() {
request.max_tokens = Some(0); request.max_tokens = Some(0);
let future = client let future = client
.post("http://localhost:8989/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.json(&request) .json(&request)
.send(); .send();
...@@ -440,7 +444,7 @@ async fn test_http_service() { ...@@ -440,7 +444,7 @@ async fn test_http_service() {
request.stream = Some(true); request.stream = Some(true);
let response = client let response = client
.post("http://localhost:8989/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -461,7 +465,7 @@ async fn test_http_service() { ...@@ -461,7 +465,7 @@ async fn test_http_service() {
request.stream = Some(false); request.stream = Some(false);
let response = client let response = client
.post("http://localhost:8989/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -486,7 +490,7 @@ async fn test_http_service() { ...@@ -486,7 +490,7 @@ async fn test_http_service() {
.unwrap(); .unwrap();
let response = client let response = client
.post("http://localhost:8989/v1/completions") .post(format!("http://localhost:{}/v1/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -507,7 +511,7 @@ async fn test_http_service() { ...@@ -507,7 +511,7 @@ async fn test_http_service() {
request.stream = Some(true); request.stream = Some(true);
let response = client let response = client
.post("http://localhost:8989/v1/completions") .post(format!("http://localhost:{}/v1/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -529,7 +533,7 @@ async fn test_http_service() { ...@@ -529,7 +533,7 @@ async fn test_http_service() {
request.stream = Some(false); request.stream = Some(false);
let response = client let response = client
.post("http://localhost:8989/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -544,7 +548,7 @@ async fn test_http_service() { ...@@ -544,7 +548,7 @@ async fn test_http_service() {
// =========== Query /metrics endpoint =========== // =========== Query /metrics endpoint ===========
let response = client let response = client
.get("http://localhost:8989/metrics") .get(format!("http://localhost:{}/metrics", port))
.send() .send()
.await .await
.unwrap(); .unwrap();
...@@ -573,10 +577,8 @@ async fn wait_for_service_ready(port: u16) { ...@@ -573,10 +577,8 @@ async fn wait_for_service_ready(port: u16) {
} }
} }
#[fixture] async fn service_with_engines() -> (HttpService, Arc<CounterEngine>, Arc<AlwaysFailEngine>, u16) {
fn service_with_engines( let port = get_random_port().await;
#[default(8990)] port: u16,
) -> (HttpService, Arc<CounterEngine>, Arc<AlwaysFailEngine>) {
let service = HttpService::builder() let service = HttpService::builder()
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
...@@ -598,11 +600,10 @@ fn service_with_engines( ...@@ -598,11 +600,10 @@ fn service_with_engines(
.add_completions_model("bar", failure.clone()) .add_completions_model("bar", failure.clone())
.unwrap(); .unwrap();
(service, counter, failure) (service, counter, failure, port)
} }
#[fixture] fn pure_openai_client(port: u16) -> PureOpenAIClient {
fn pure_openai_client(#[default(8990)] port: u16) -> PureOpenAIClient {
let config = HttpClientConfig { let config = HttpClientConfig {
openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)), openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)),
verbose: false, verbose: false,
...@@ -610,8 +611,7 @@ fn pure_openai_client(#[default(8990)] port: u16) -> PureOpenAIClient { ...@@ -610,8 +611,7 @@ fn pure_openai_client(#[default(8990)] port: u16) -> PureOpenAIClient {
PureOpenAIClient::new(config) PureOpenAIClient::new(config)
} }
#[fixture] fn nv_custom_client(port: u16) -> NvCustomClient {
fn nv_custom_client(#[default(8991)] port: u16) -> NvCustomClient {
let config = HttpClientConfig { let config = HttpClientConfig {
openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)), openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)),
verbose: false, verbose: false,
...@@ -619,8 +619,7 @@ fn nv_custom_client(#[default(8991)] port: u16) -> NvCustomClient { ...@@ -619,8 +619,7 @@ fn nv_custom_client(#[default(8991)] port: u16) -> NvCustomClient {
NvCustomClient::new(config) NvCustomClient::new(config)
} }
#[fixture] fn generic_byot_client(port: u16) -> GenericBYOTClient {
fn generic_byot_client(#[default(8992)] port: u16) -> GenericBYOTClient {
let config = HttpClientConfig { let config = HttpClientConfig {
openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)), openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)),
verbose: false, verbose: false,
...@@ -628,13 +627,11 @@ fn generic_byot_client(#[default(8992)] port: u16) -> GenericBYOTClient { ...@@ -628,13 +627,11 @@ fn generic_byot_client(#[default(8992)] port: u16) -> GenericBYOTClient {
GenericBYOTClient::new(config) GenericBYOTClient::new(config)
} }
#[rstest]
#[tokio::test] #[tokio::test]
async fn test_pure_openai_client( async fn test_pure_openai_client() {
#[with(8990)] service_with_engines: (HttpService, Arc<CounterEngine>, Arc<AlwaysFailEngine>), let (service, _counter, _failure, port) = service_with_engines().await;
#[with(8990)] pure_openai_client: PureOpenAIClient, let pure_openai_client = pure_openai_client(port);
) {
let (service, _counter, _failure) = service_with_engines;
let token = CancellationToken::new(); let token = CancellationToken::new();
let cancel_token = token.clone(); let cancel_token = token.clone();
...@@ -642,7 +639,7 @@ async fn test_pure_openai_client( ...@@ -642,7 +639,7 @@ async fn test_pure_openai_client(
let task = tokio::spawn(async move { service.run(token).await }); let task = tokio::spawn(async move { service.run(token).await });
// Wait for service to be ready // Wait for service to be ready
wait_for_service_ready(8990).await; wait_for_service_ready(port).await;
// Test successful streaming request // Test successful streaming request
let request = async_openai::types::CreateChatCompletionRequestArgs::default() let request = async_openai::types::CreateChatCompletionRequestArgs::default()
...@@ -739,13 +736,11 @@ async fn test_pure_openai_client( ...@@ -739,13 +736,11 @@ async fn test_pure_openai_client(
task.await.unwrap().unwrap(); task.await.unwrap().unwrap();
} }
#[rstest]
#[tokio::test] #[tokio::test]
async fn test_nv_custom_client( async fn test_nv_custom_client() {
#[with(8991)] service_with_engines: (HttpService, Arc<CounterEngine>, Arc<AlwaysFailEngine>), let (service, _counter, _failure, port) = service_with_engines().await;
#[with(8991)] nv_custom_client: NvCustomClient, let nv_custom_client = nv_custom_client(port);
) {
let (service, _counter, _failure) = service_with_engines;
let token = CancellationToken::new(); let token = CancellationToken::new();
let cancel_token = token.clone(); let cancel_token = token.clone();
...@@ -753,7 +748,7 @@ async fn test_nv_custom_client( ...@@ -753,7 +748,7 @@ async fn test_nv_custom_client(
let task = tokio::spawn(async move { service.run(token).await }); let task = tokio::spawn(async move { service.run(token).await });
// Wait for service to be ready // Wait for service to be ready
wait_for_service_ready(8991).await; wait_for_service_ready(port).await;
// Test successful streaming request // Test successful streaming request
let inner_request = async_openai::types::CreateChatCompletionRequestArgs::default() let inner_request = async_openai::types::CreateChatCompletionRequestArgs::default()
...@@ -868,13 +863,11 @@ async fn test_nv_custom_client( ...@@ -868,13 +863,11 @@ async fn test_nv_custom_client(
task.await.unwrap().unwrap(); task.await.unwrap().unwrap();
} }
#[rstest]
#[tokio::test] #[tokio::test]
async fn test_generic_byot_client( async fn test_generic_byot_client() {
#[with(8992)] service_with_engines: (HttpService, Arc<CounterEngine>, Arc<AlwaysFailEngine>), let (service, _counter, _failure, port) = service_with_engines().await;
#[with(8992)] generic_byot_client: GenericBYOTClient, let generic_byot_client = generic_byot_client(port);
) {
let (service, _counter, _failure) = service_with_engines;
let token = CancellationToken::new(); let token = CancellationToken::new();
let cancel_token = token.clone(); let cancel_token = token.clone();
...@@ -882,7 +875,7 @@ async fn test_generic_byot_client( ...@@ -882,7 +875,7 @@ async fn test_generic_byot_client(
let task = tokio::spawn(async move { service.run(token).await }); let task = tokio::spawn(async move { service.run(token).await });
// Wait for service to be ready // Wait for service to be ready
wait_for_service_ready(8992).await; wait_for_service_ready(port).await;
// Test successful streaming request // Test successful streaming request
let request = serde_json::json!({ let request = serde_json::json!({
...@@ -965,13 +958,13 @@ async fn test_generic_byot_client( ...@@ -965,13 +958,13 @@ async fn test_generic_byot_client(
task.await.unwrap().unwrap(); task.await.unwrap().unwrap();
} }
#[rstest]
#[tokio::test] #[tokio::test]
async fn test_client_disconnect_cancellation_unary() { async fn test_client_disconnect_cancellation_unary() {
let port = get_random_port().await;
let service = HttpService::builder() let service = HttpService::builder()
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
.port(8993) .port(port)
.build() .build()
.unwrap(); .unwrap();
let state = service.state_clone(); let state = service.state_clone();
...@@ -984,7 +977,7 @@ async fn test_client_disconnect_cancellation_unary() { ...@@ -984,7 +977,7 @@ async fn test_client_disconnect_cancellation_unary() {
let task = tokio::spawn(async move { service.run(token).await }); let task = tokio::spawn(async move { service.run(token).await });
// Wait for service to be ready // Wait for service to be ready
wait_for_service_ready(8993).await; wait_for_service_ready(port).await;
// Create a long-running engine (10 seconds) // Create a long-running engine (10 seconds)
let long_running_engine = Arc::new(LongRunningEngine::new(10_000)); let long_running_engine = Arc::new(LongRunningEngine::new(10_000));
...@@ -1015,7 +1008,7 @@ async fn test_client_disconnect_cancellation_unary() { ...@@ -1015,7 +1008,7 @@ async fn test_client_disconnect_cancellation_unary() {
let request_future = async { let request_future = async {
client client
.post("http://localhost:8993/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -1054,15 +1047,15 @@ async fn test_client_disconnect_cancellation_unary() { ...@@ -1054,15 +1047,15 @@ async fn test_client_disconnect_cancellation_unary() {
task.await.unwrap().unwrap(); task.await.unwrap().unwrap();
} }
#[rstest]
#[tokio::test] #[tokio::test]
async fn test_client_disconnect_cancellation_streaming() { async fn test_client_disconnect_cancellation_streaming() {
dynamo_runtime::logging::init(); dynamo_runtime::logging::init();
let port = get_random_port().await;
let service = HttpService::builder() let service = HttpService::builder()
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
.port(8994) .port(port)
.build() .build()
.unwrap(); .unwrap();
let state = service.state_clone(); let state = service.state_clone();
...@@ -1075,7 +1068,7 @@ async fn test_client_disconnect_cancellation_streaming() { ...@@ -1075,7 +1068,7 @@ async fn test_client_disconnect_cancellation_streaming() {
let task = tokio::spawn(async move { service.run(token).await }); let task = tokio::spawn(async move { service.run(token).await });
// Wait for service to be ready // Wait for service to be ready
wait_for_service_ready(8994).await; wait_for_service_ready(port).await;
// Create a long-running engine (10 seconds) // Create a long-running engine (10 seconds)
let long_running_engine = Arc::new(LongRunningEngine::new(10_000)); let long_running_engine = Arc::new(LongRunningEngine::new(10_000));
...@@ -1106,7 +1099,7 @@ async fn test_client_disconnect_cancellation_streaming() { ...@@ -1106,7 +1099,7 @@ async fn test_client_disconnect_cancellation_streaming() {
let request_future = async { let request_future = async {
let response = client let response = client
.post("http://localhost:8994/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.json(&request) .json(&request)
.send() .send()
.await .await
...@@ -1151,16 +1144,16 @@ async fn test_client_disconnect_cancellation_streaming() { ...@@ -1151,16 +1144,16 @@ async fn test_client_disconnect_cancellation_streaming() {
task.await.unwrap().unwrap(); task.await.unwrap().unwrap();
} }
#[rstest]
#[tokio::test] #[tokio::test]
async fn test_request_id_annotation() { async fn test_request_id_annotation() {
// TODO(ryan): make better fixtures, this is too much to test sometime so simple // TODO(ryan): make better fixtures, this is too much to test sometime so simple
dynamo_runtime::logging::init(); dynamo_runtime::logging::init();
let port = get_random_port().await;
let service = HttpService::builder() let service = HttpService::builder()
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
.port(8995) .port(port)
.build() .build()
.unwrap(); .unwrap();
let state = service.state_clone(); let state = service.state_clone();
...@@ -1173,7 +1166,7 @@ async fn test_request_id_annotation() { ...@@ -1173,7 +1166,7 @@ async fn test_request_id_annotation() {
let task = tokio::spawn(async move { service.run(token).await }); let task = tokio::spawn(async move { service.run(token).await });
// Wait for service to be ready // Wait for service to be ready
wait_for_service_ready(8995).await; wait_for_service_ready(port).await;
// Add a counter engine for this test // Add a counter engine for this test
let counter_engine = Arc::new(CounterEngine {}); let counter_engine = Arc::new(CounterEngine {});
...@@ -1205,7 +1198,7 @@ async fn test_request_id_annotation() { ...@@ -1205,7 +1198,7 @@ async fn test_request_id_annotation() {
// Make the streaming request with custom header // Make the streaming request with custom header
let response = client let response = client
.post("http://localhost:8995/v1/chat/completions") .post(format!("http://localhost:{}/v1/chat/completions", port))
.header("x-dynamo-request-id", request_uuid.to_string()) .header("x-dynamo-request-id", request_uuid.to_string())
.json(&request_json) .json(&request_json)
.send() .send()
......
...@@ -7,15 +7,20 @@ use dynamo_runtime::CancellationToken; ...@@ -7,15 +7,20 @@ use dynamo_runtime::CancellationToken;
use serial_test::serial; use serial_test::serial;
use std::{env, time::Duration}; use std::{env, time::Duration};
#[path = "common/ports.rs"]
mod ports;
use ports::get_random_port;
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn metrics_prefix_default_then_env_override() { async fn metrics_prefix_default_then_env_override() {
// Case 1: default prefix // Case 1: default prefix
env::remove_var(metrics::METRICS_PREFIX_ENV); env::remove_var(metrics::METRICS_PREFIX_ENV);
let svc1 = HttpService::builder().port(9101).build().unwrap(); let p1 = get_random_port().await;
let svc1 = HttpService::builder().port(p1).build().unwrap();
let token1 = CancellationToken::new(); let token1 = CancellationToken::new();
let _h1 = svc1.spawn(token1.clone()).await; let h1 = svc1.spawn(token1.clone()).await;
wait_for_metrics_ready(9101).await; wait_for_metrics_ready(p1).await;
// Populate labeled metrics // Populate labeled metrics
let s1 = svc1.state_clone(); let s1 = svc1.state_clone();
...@@ -26,7 +31,7 @@ async fn metrics_prefix_default_then_env_override() { ...@@ -26,7 +31,7 @@ async fn metrics_prefix_default_then_env_override() {
false, false,
); );
} }
let body1 = reqwest::get("http://localhost:9101/metrics") let body1 = reqwest::get(format!("http://localhost:{}/metrics", p1))
.await .await
.unwrap() .unwrap()
.text() .text()
...@@ -34,13 +39,15 @@ async fn metrics_prefix_default_then_env_override() { ...@@ -34,13 +39,15 @@ async fn metrics_prefix_default_then_env_override() {
.unwrap(); .unwrap();
assert!(body1.contains("dynamo_frontend_requests_total")); assert!(body1.contains("dynamo_frontend_requests_total"));
token1.cancel(); token1.cancel();
let _ = h1.await; // ensure port is released
// Case 2: env override to prefix // Case 2: env override to prefix
env::set_var(metrics::METRICS_PREFIX_ENV, "custom_prefix"); env::set_var(metrics::METRICS_PREFIX_ENV, "custom_prefix");
let svc2 = HttpService::builder().port(9102).build().unwrap(); let p2 = get_random_port().await;
let svc2 = HttpService::builder().port(p2).build().unwrap();
let token2 = CancellationToken::new(); let token2 = CancellationToken::new();
let _h2 = svc2.spawn(token2.clone()).await; let h2 = svc2.spawn(token2.clone()).await;
wait_for_metrics_ready(9102).await; wait_for_metrics_ready(p2).await;
// Populate labeled metrics // Populate labeled metrics
let s2 = svc2.state_clone(); let s2 = svc2.state_clone();
...@@ -50,7 +57,7 @@ async fn metrics_prefix_default_then_env_override() { ...@@ -50,7 +57,7 @@ async fn metrics_prefix_default_then_env_override() {
.create_inflight_guard("test-model", Endpoint::ChatCompletions, true); .create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
} }
// Single fetch and assert // Single fetch and assert
let body2 = reqwest::get("http://localhost:9102/metrics") let body2 = reqwest::get(format!("http://localhost:{}/metrics", p2))
.await .await
.unwrap() .unwrap()
.text() .text()
...@@ -59,13 +66,15 @@ async fn metrics_prefix_default_then_env_override() { ...@@ -59,13 +66,15 @@ async fn metrics_prefix_default_then_env_override() {
assert!(body2.contains("custom_prefix_requests_total")); assert!(body2.contains("custom_prefix_requests_total"));
assert!(!body2.contains("dynamo_frontend_requests_total")); assert!(!body2.contains("dynamo_frontend_requests_total"));
token2.cancel(); token2.cancel();
let _ = h2.await;
// Case 3: invalid env prefix is sanitized // Case 3: invalid env prefix is sanitized
env::set_var(metrics::METRICS_PREFIX_ENV, "nv-llm/http service"); env::set_var(metrics::METRICS_PREFIX_ENV, "nv-llm/http service");
let svc3 = HttpService::builder().port(9103).build().unwrap(); let p3 = get_random_port().await;
let svc3 = HttpService::builder().port(p3).build().unwrap();
let token3 = CancellationToken::new(); let token3 = CancellationToken::new();
let _h3 = svc3.spawn(token3.clone()).await; let h3 = svc3.spawn(token3.clone()).await;
wait_for_metrics_ready(9103).await; wait_for_metrics_ready(p3).await;
let s3 = svc3.state_clone(); let s3 = svc3.state_clone();
{ {
...@@ -73,7 +82,7 @@ async fn metrics_prefix_default_then_env_override() { ...@@ -73,7 +82,7 @@ async fn metrics_prefix_default_then_env_override() {
s3.metrics_clone() s3.metrics_clone()
.create_inflight_guard("test-model", Endpoint::ChatCompletions, true); .create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
} }
let body3 = reqwest::get("http://localhost:9103/metrics") let body3 = reqwest::get(format!("http://localhost:{}/metrics", p3))
.await .await
.unwrap() .unwrap()
.text() .text()
...@@ -82,6 +91,7 @@ async fn metrics_prefix_default_then_env_override() { ...@@ -82,6 +91,7 @@ async fn metrics_prefix_default_then_env_override() {
assert!(body3.contains("nv_llm_http_service_requests_total")); assert!(body3.contains("nv_llm_http_service_requests_total"));
assert!(!body3.contains("dynamo_frontend_requests_total")); assert!(!body3.contains("dynamo_frontend_requests_total"));
token3.cancel(); token3.cancel();
let _ = h3.await;
// Cleanup env to avoid leaking state // Cleanup env to avoid leaking state
env::remove_var(metrics::METRICS_PREFIX_ENV); env::remove_var(metrics::METRICS_PREFIX_ENV);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment