# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 """ Load generation script for SLA planner scaling tests. This script uses aiperf to generate load at specific request rates to test the planner's scaling behavior. """ import argparse import asyncio import json import logging import os import signal import tempfile import time from typing import Any, Dict, List, Optional logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) class LoadGenerator: """Generate load using aiperf to test planner scaling.""" def __init__( self, base_url: str = "http://localhost:8000", model: str = "nvidia/Llama-3.1-8B-Instruct-FP8", isl: int = 4000, osl: int = 150, save_results: bool = False, ): self.base_url = base_url self.model = model self.isl = isl self.osl = osl self.save_results = save_results def _calculate_aiperf_params( self, req_per_sec: float, ) -> Dict[str, Any]: """ Calculate aiperf parameters to approximate desired request rate. Args: req_per_sec: Desired requests per second duration_sec: Test duration in seconds estimated_request_duration: Estimated average request duration in seconds Returns: Dictionary with concurrency and request_rate parameters """ concurrency = max(1, int(req_per_sec * 3)) return { "concurrency": concurrency, "request_rate": req_per_sec, } async def generate_load( self, req_per_sec: float, duration_sec: int, artifact_dir: Optional[str] = None ) -> Dict[str, Any]: """ Generate load at specified request rate for given duration. Args: req_per_sec: Target requests per second duration_sec: Duration to generate load (seconds) artifact_dir: Directory to store aiperf artifacts Returns: Dictionary with load test results """ logger.info(f"Generating load: {req_per_sec} req/s for {duration_sec}s") # Calculate aiperf parameters params = self._calculate_aiperf_params(req_per_sec) logger.info(f"Using request_rate={params['request_rate']} req/s") # Create artifact directory if not provided if artifact_dir is None: artifact_dir = tempfile.mkdtemp(prefix="scaling_test_") os.makedirs(artifact_dir, exist_ok=True) # Drive test length by caller-provided duration request_count = max(1, int(params["request_rate"] * duration_sec)) logger.info( f"Adjusted parameters: duration={duration_sec}s, request_count={request_count}" ) # Build aiperf command based on coworker's successful approach cmd = [ "aiperf", "profile", "--model", self.model, "--tokenizer", self.model, "--endpoint-type", "chat", "--url", self.base_url.replace("http://", ""), "--streaming", "--synthetic-input-tokens-mean", str(self.isl), "--output-tokens-mean", str(self.osl), "--request-rate", str(params["request_rate"]), "--request-count", str(request_count), # Use request count to limit test duration "--num-dataset-entries", str( max(20, int(params["request_rate"] * 10)) ), # Generate reasonable dataset size "--artifact-dir", artifact_dir, "-v", ] logger.info(f"Running command: {' '.join(cmd)}") logger.info( f"Expected duration: {duration_sec}s, timeout: {max(duration_sec * 2 + 120, int(duration_sec * 2.5))}s" ) # Run aiperf (async) start_time = time.time() # More generous timeout for high-load tests - allow 2x duration + 2 minutes buffer timeout = max(duration_sec * 2 + 120, int(duration_sec * 2.5)) # Write stdout/stderr to files instead of using PIPE. aiperf may fork # child processes that inherit pipe FDs; if those children outlive aiperf, # communicate() blocks forever waiting for EOF. File-based output avoids # this entirely. We also run aiperf in its own process group so that # os.killpg() can clean up the entire tree on timeout. stdout_path = os.path.join(artifact_dir, "aiperf.stdout.log") stderr_path = os.path.join(artifact_dir, "aiperf.stderr.log") try: with open(stdout_path, "wb") as stdout_f, open( stderr_path, "wb" ) as stderr_f: proc = await asyncio.create_subprocess_exec( *cmd, stdout=stdout_f, stderr=stderr_f, start_new_session=True, ) try: await asyncio.wait_for(proc.wait(), timeout=timeout) except asyncio.TimeoutError: try: os.killpg(proc.pid, signal.SIGKILL) except ProcessLookupError: pass await proc.wait() logger.error("aiperf timed out") raise RuntimeError("Load generation timed out") end_time = time.time() actual_duration = end_time - start_time if proc.returncode == 0: logger.info("Load generation completed successfully") logger.info(f"Actual duration: {actual_duration:.2f}s") results = self._parse_aiperf_results(artifact_dir) results.update( { "requested_req_per_sec": req_per_sec, "actual_duration": actual_duration, "target_duration": duration_sec, "aiperf_params": params, "artifact_dir": artifact_dir, "success": True, } ) return results else: logger.error(f"aiperf failed with return code {proc.returncode}") raise RuntimeError("aiperf failed; see logs in artifact dir") except RuntimeError: raise except Exception as e: logger.error(f"aiperf execution error: {e}") raise def _parse_aiperf_results(self, artifact_dir: str) -> Dict[str, Any]: """Parse aiperf results from artifact directory.""" try: # Look for the profile_export_aiperf.json file json_files = [f for f in os.listdir(artifact_dir) if f.endswith(".json")] if not json_files: logger.warning("No JSON results found in artifact directory") return {} # Main results file results_file = None for json_file in json_files: if "profile_export" in json_file or "aiperf" in json_file: results_file = os.path.join(artifact_dir, json_file) break if not results_file: results_file = os.path.join(artifact_dir, json_files[0]) logger.info(f"Parsing results from: {results_file}") with open(results_file, "r") as f: metrics = json.load(f) results = { "throughput": metrics.get("output_token_throughput", {}).get("avg", 0), "ttft_mean": metrics.get("time_to_first_token", {}).get("avg", 0), "itl_mean": metrics.get("inter_token_latency", {}).get("avg", 0), "end_to_end_latency_mean": metrics.get("request_latency", {}).get( "avg", 0 ), } logger.info(f"Parsed results: {results}") return results except Exception as e: logger.warning(f"Failed to parse aiperf results: {e}") return {} async def run_scaling_test(self) -> Dict[str, Any]: """ Run a graduated scaling test for prefill scaling. Uses a conservative graduated approach: - Phase 1: 8 req/s (baseline, should maintain 1P1D) - Phase 2: 18 req/s (should trigger prefill scaling to 2P1D) Returns: Dictionary with complete test results """ logger.info( "Starting graduated prefill scaling test scenario (targeting 1P1D -> 2P1D)" ) logger.info("Using conservative graduated approach with metric generation") # Graduated test parameters (optimized for prefill scaling) phases: List[Dict[str, Any]] = [ {"rate": 8.0, "duration": 90, "name": "baseline"}, {"rate": 18.0, "duration": 120, "name": "prefill_scaling_trigger"}, ] transition_delay = 30 # Create artifact directory timestamp = int(time.time()) if self.save_results: script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) base_dir = os.path.join( script_dir, "e2e_scaling_results", f"scaling_test_{timestamp}" ) else: base_dir = f"/tmp/scaling_test_{timestamp}" os.makedirs(base_dir, exist_ok=True) logger.info(f"Saving results to: {base_dir}") results = { "test_timestamp": timestamp, "config": { "approach": "graduated_scaling", "phases": phases, "transition_delay": transition_delay, "isl": self.isl, "osl": self.osl, "model": self.model, }, } try: phase_results = {} for i, phase in enumerate(phases): phase_name = f"phase{i+1}_{phase['name']}" logger.info( f"Starting {phase_name}: {phase['rate']} req/s for {phase['duration']}s" ) phase_dir = os.path.join(base_dir, phase_name) phase_result = await self.generate_load( req_per_sec=phase["rate"], duration_sec=phase["duration"], artifact_dir=phase_dir, ) phase_results[phase_name] = phase_result # Add transition delay except after last phase if i < len(phases) - 1: logger.info(f"Transition delay: {transition_delay}s") await asyncio.sleep(transition_delay) results["phase_results"] = phase_results logger.info("Graduated scaling test completed successfully") except Exception as e: logger.error(f"Scaling test failed: {e}") results["error"] = str(e) raise # Save results results_file = os.path.join(base_dir, "scaling_test_results.json") with open(results_file, "w") as f: json.dump(results, f, indent=2) logger.info(f"Test results saved to: {results_file}") return results async def main(): """Main function for scaling test execution.""" parser = argparse.ArgumentParser( description="SLA Planner Graduated Scaling Test - Optimized for 2P1D prefill scaling" ) parser.add_argument( "--base-url", default="http://localhost:8000", help="Service URL (default: http://localhost:8000)", ) parser.add_argument( "--model", default="nvidia/Llama-3.1-8B-Instruct-FP8", help="Model name (default: nvidia/Llama-3.1-8B-Instruct-FP8)", ) parser.add_argument( "--isl", type=int, default=4000, help="Input sequence length - optimized for prefill scaling (default: 4000)", ) parser.add_argument( "--osl", type=int, default=150, help="Output sequence length - optimized for prefill scaling (default: 150)", ) parser.add_argument( "--save-results", action="store_true", help="Save results to tests/planner/e2e_scaling_results instead of /tmp", ) args = parser.parse_args() generator = LoadGenerator( base_url=args.base_url, model=args.model, isl=args.isl, osl=args.osl, save_results=args.save_results, ) print("Starting SLA Planner Graduated Scaling Test...") print(f"Parameters: ISL={args.isl}, OSL={args.osl}") print( "Test phases: 8 -> 15 -> 25 req/s (optimized for 1P1D -> 2P1D prefill scaling)" ) results = await generator.run_scaling_test() print("\n" + "=" * 60) print("SCALING TEST COMPLETED") print("=" * 60) # Print results summary phase_results = results.get("phase_results", {}) for phase_name, phase_data in phase_results.items(): ok = isinstance(phase_data, dict) and phase_data.get( "success", bool(phase_data) ) if ok: duration = phase_data.get("actual_duration") if isinstance(duration, (int, float)): print(f"{phase_name}: {duration:.1f}s duration - SUCCESS") else: print(f"{phase_name}: SUCCESS") else: print(f"{phase_name}: FAILED") print("\nResults saved to scaling test directory") if __name__ == "__main__": asyncio.run(main())