"lib/bindings/python/vscode:/vscode.git/clone" did not exist on "c5d9d267033ddb6ce26be9dc6f6a5b9aa9684c9b"
Unverified Commit 6216ae55 authored by jh-nv's avatar jh-nv Committed by GitHub
Browse files

chore: add mypy typing for profiler (#6863)

parent a96f5d73
......@@ -127,7 +127,7 @@ def _parse_args() -> tuple[DynamoGraphDeploymentRequestSpec, ProfilerOperational
return dgdr, ops
def main():
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
......
......@@ -56,8 +56,8 @@ async def run_interpolation(
isl: int,
osl: int,
sweep_max_context_length: int,
deployment_clients: list,
):
deployment_clients: list[DynamoDeploymentClient],
) -> None:
"""Generate interpolation curves for the planner based on sweep mode.
Takes the output disagg DGD config and uses ``convert_config`` to strip
......
......@@ -255,7 +255,7 @@ def _write_final_output(ops: ProfilerOperationalConfig, final_config: Any) -> bo
async def run_profile(
dgdr: DynamoGraphDeploymentRequestSpec,
ops: ProfilerOperationalConfig | None = None,
):
) -> None:
"""Run the profiling pipeline.
Args:
......
......@@ -75,17 +75,17 @@ def _get_common_aiperf_cmd(
def get_prefill_aiperf_cmd(
isl,
artifact_dir,
seed=100,
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
tokenizer="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
osl=AIPERF_PREFILL_BENCHMARK_OSL,
base_url="http://localhost:8000",
isl: int,
artifact_dir: str,
seed: int = 100,
model: str = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
tokenizer: str = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
osl: int = AIPERF_PREFILL_BENCHMARK_OSL,
base_url: str = "http://localhost:8000",
concurrency: int = 1,
request_count: int = 1,
warmup_request_count: int = AIPERF_WARMUP_REQUEST_PER_DP_RANK,
):
) -> list[str]:
return _get_common_aiperf_cmd(
artifact_dir,
seed,
......@@ -114,16 +114,16 @@ def get_prefill_aiperf_cmd(
def get_decode_aiperf_cmd(
isl,
osl,
artifact_dir,
num_request,
seed=100,
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
tokenizer="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
base_url="http://localhost:8000",
isl: int,
osl: int,
artifact_dir: str,
num_request: int,
seed: int = 100,
model: str = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
tokenizer: str = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
base_url: str = "http://localhost:8000",
warmup_request_count: int = AIPERF_WARMUP_REQUEST_PER_DP_RANK,
):
) -> list[str]:
return _get_common_aiperf_cmd(
artifact_dir,
seed,
......@@ -168,15 +168,15 @@ def get_aiperf_result(artifact_dir: str) -> dict:
def benchmark_prefill(
isl,
aiperf_artifact_dir,
model_name,
tokenizer,
base_url="http://localhost:8000",
isl: int,
aiperf_artifact_dir: str,
model_name: str,
tokenizer: str,
base_url: str = "http://localhost:8000",
concurrency: int = 1,
request_count: int = 1,
warmup_request_count: int = 3,
):
) -> Optional[dict]:
logger.info(f"Running aiperf with isl {isl}")
aiperf_cmd = get_prefill_aiperf_cmd(
isl,
......@@ -243,6 +243,7 @@ def get_prefill_ttft(
request_count=total_concurrency,
warmup_request_count=AIPERF_WARMUP_REQUEST_PER_DP_RANK * attention_dp_size,
)
assert aiperf_result is not None
try:
max_ttft = float(aiperf_result["time_to_first_token"]["max"])
# subtract the decoding time in-between prefill runs
......@@ -266,6 +267,7 @@ def get_prefill_ttft(
tokenizer,
base_url=base_url,
)
assert aiperf_result is not None
try:
return float(aiperf_result["time_to_first_token"]["avg"])
except (KeyError, TypeError, ValueError):
......@@ -311,15 +313,15 @@ def get_decode_itl_and_thpt_per_gpu(
def benchmark_decode(
isl,
osl,
num_request,
aiperf_artifact_dir,
model_name,
tokenizer,
base_url="http://localhost:8000",
isl: int,
osl: int,
num_request: int,
aiperf_artifact_dir: str,
model_name: str,
tokenizer: str,
base_url: str = "http://localhost:8000",
warmup_request_count: int = AIPERF_WARMUP_REQUEST_PER_DP_RANK,
):
) -> Optional[dict]:
logger.info(f"Profiling decode with num_request {num_request}...")
# first warm-up the engine by pre-computing all prefill tokens
......
......@@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import logging
import math
import shlex
from typing import Optional
from typing import Any, Optional
from pydantic import BaseModel
......@@ -59,6 +61,7 @@ class Service(BaseModel):
resources: Optional[ServiceResources] = None
extraPodSpec: Optional[PodSpec] = None
subComponentType: Optional[str] = None
multinode: Optional[MultinodeConfig | dict[str, Any]] = None
model_config = {"extra": "allow"}
......@@ -164,7 +167,7 @@ def sanitize_cli_args(args: list[str]) -> list[str]:
return result
def append_argument(args: list[str], to_append) -> list[str]:
def append_argument(args: list[str], to_append: str | list[str]) -> list[str]:
idx = find_arg_index(args)
if isinstance(to_append, list):
args[idx:idx] = to_append
......@@ -214,7 +217,9 @@ def parse_override_engine_args(args: list[str]) -> tuple[dict, list[str]]:
return override_dict, args
def set_multinode_config(worker_service, gpu_count: int, num_gpus_per_node: int):
def set_multinode_config(
worker_service: Service, gpu_count: int, num_gpus_per_node: int
) -> None:
"""Helper function to set multinode configuration based on GPU count and GPUs per node."""
if gpu_count <= num_gpus_per_node:
# Single node: remove multinode configuration if present
......@@ -284,7 +289,7 @@ def get_worker_service_from_config(
config: Config,
backend: str = "sglang",
sub_component_type: SubComponentType = SubComponentType.DECODE,
):
) -> Service:
"""Helper function to get a worker service from config.
First tries to find service by subComponentType, then falls back to component name.
......@@ -310,8 +315,8 @@ def get_worker_service_from_config(
def setup_worker_service_resources(
worker_service, gpu_count: int, num_gpus_per_node: Optional[int] = None
):
worker_service: Service, gpu_count: int, num_gpus_per_node: Optional[int] = None
) -> None:
"""Helper function to set up worker service resources (requests and limits)."""
# Handle multinode configuration if num_gpus_per_node is provided
if num_gpus_per_node is not None:
......@@ -332,7 +337,9 @@ def setup_worker_service_resources(
else gpu_count
)
def _update_resource_dict(resource_dict: dict[str, str], gpu_value: int):
def _update_resource_dict(
resource_dict: dict[str, str | dict[str, Any]], gpu_value: int
) -> None:
"""Helper function to update gpu and custom rdma/ib fields in a resource dictionary.
Args:
......@@ -354,7 +361,7 @@ def setup_worker_service_resources(
_update_resource_dict(worker_service.resources.requests, gpu_value)
def validate_and_get_worker_args(worker_service, backend):
def validate_and_get_worker_args(worker_service: Service, backend: str) -> list[str]:
"""Helper function to validate worker service and get its arguments.
Args:
......@@ -378,7 +385,7 @@ def validate_and_get_worker_args(worker_service, backend):
return break_arguments(args)
def set_argument_value(args: list, arg_name: str, value: str):
def set_argument_value(args: list[str], arg_name: str, value: str) -> list[str]:
"""Helper function to set an argument value, adding it if not present."""
try:
idx = args.index(arg_name)
......
......@@ -135,6 +135,12 @@ class BaseConfigModifier:
# Subclasses should override, e.g. "vllm" / "sglang" / "trtllm"
BACKEND: str = ""
@classmethod
def load_default_config(cls, mode: str = "disagg") -> dict:
"""Load default DGD config for the given mode. Subclasses must implement."""
raise NotImplementedError("Subclasses must implement load_default_config")
# Worker CLI arg name for model path / name. vLLM uses "--model"; others use "--model-path".
WORKER_MODEL_PATH_ARG: str = "--model-path"
WORKER_SERVED_MODEL_NAME_ARG: str = "--served-model-name"
......
......@@ -176,7 +176,7 @@ class SGLangConfigModifier(BaseConfigModifier):
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
) -> dict:
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="sglang", sub_component_type=component_type
......@@ -213,7 +213,7 @@ class SGLangConfigModifier(BaseConfigModifier):
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
) -> dict:
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="sglang", sub_component_type=component_type
......@@ -254,7 +254,7 @@ class SGLangConfigModifier(BaseConfigModifier):
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
) -> dict:
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="sglang", sub_component_type=component_type
......
......@@ -207,7 +207,7 @@ class TrtllmConfigModifier(BaseConfigModifier):
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
) -> dict:
cfg = Config.model_validate(config)
# Get the worker service using helper function
......@@ -245,7 +245,7 @@ class TrtllmConfigModifier(BaseConfigModifier):
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
) -> dict:
raise NotImplementedError(
"TEP (Tensor Expert Parallelism) is not implemented for TrtLLM backend"
)
......@@ -257,7 +257,7 @@ class TrtllmConfigModifier(BaseConfigModifier):
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
) -> dict:
raise NotImplementedError(
"DEP (Data Expert Parallelism) is not implemented for TrtLLM backend"
)
......
......@@ -168,7 +168,7 @@ class VllmV1ConfigModifier(BaseConfigModifier):
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
) -> dict:
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="vllm", sub_component_type=component_type
......
......@@ -223,7 +223,7 @@ class DynamoGraphDeploymentRequestSpec(BaseModel):
description='Model specifies the model to deploy (e.g., "Qwen/Qwen3-0.6B", "meta-llama/Llama-3-70b"). Can be a HuggingFace ID or a private model name.'
)
backend: BackendType = Field(
default="auto",
default=BackendType.Auto,
description="Backend specifies the inference backend to use for profiling and deployment.",
)
image: Optional[str] = Field(
......@@ -255,7 +255,7 @@ class DynamoGraphDeploymentRequestSpec(BaseModel):
description="Features controls optional Dynamo platform features in the generated deployment.",
)
searchStrategy: SearchStrategy = Field(
default="rapid",
default=SearchStrategy.Rapid,
description='SearchStrategy controls the profiling search depth. "rapid" performs a fast sweep; "thorough" explores more configurations.',
)
autoApply: Optional[bool] = Field(
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Sequence
def compute_pareto(x, y):
def compute_pareto(
x: Sequence[float], y: Sequence[float]
) -> tuple[list[float], list[float], list[int]]:
"""
Compute the pareto front (top-left is better) for the given x and y values.
......
......@@ -15,6 +15,7 @@
import logging
from collections import defaultdict
from typing import Any
import matplotlib.pyplot as plt
import numpy as np
......@@ -35,7 +36,9 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
def plot_prefill_performance(prefill_data, target_ttft, output_dir):
def plot_prefill_performance(
prefill_data: Any, target_ttft: float, output_dir: str
) -> None:
"""
Plot prefill performance as a 2D scatter plot with GPU count and mapping annotations.
......@@ -77,7 +80,9 @@ def plot_prefill_performance(prefill_data, target_ttft, output_dir):
plt.close()
def plot_decode_performance(decode_data, target_itl, output_dir):
def plot_decode_performance(
decode_data: Any, target_itl: float, output_dir: str
) -> None:
"""
Plot decode performance with multiple GPU count lines.
......@@ -134,8 +139,11 @@ def plot_decode_performance(decode_data, target_itl, output_dir):
def plot_prefill_interpolation(
prefill_isl_np, prefill_ttft_np, prefill_thpt_per_gpu_np, work_dir
):
prefill_isl_np: np.ndarray,
prefill_ttft_np: np.ndarray,
prefill_thpt_per_gpu_np: np.ndarray,
work_dir: str,
) -> None:
"""
Plot TTFT and throughput vs ISL with quadratic interpolation.
......@@ -194,8 +202,12 @@ def plot_prefill_interpolation(
def plot_decode_3d_surface(
x_kv_usage, y_context_length, z_itl, z_thpt_per_gpu, work_dir
):
x_kv_usage: list[float],
y_context_length: list[float],
z_itl: list[float],
z_thpt_per_gpu: list[float],
work_dir: str,
) -> None:
"""
Plot 3D surface for decode interpolation with KV usage, context length, and ITL.
......@@ -296,7 +308,9 @@ def plot_decode_3d_surface(
plt.close()
def plot_pd_joint_results(isl, osl, prefill_data, decode_data, output_dir):
def plot_pd_joint_results(
isl: int, osl: int, prefill_data: Any, decode_data: Any, output_dir: str
) -> None:
"""
Plot joint prefill and decode results showing cost per 1000 requests under different SLA.
......
......@@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Callable, Optional, Tuple
from typing import Any, Callable, Optional, Tuple
import numpy as np
......@@ -22,7 +22,9 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
def get_num_request_range(attn_dp_size, engine_max_concurrency, granularity):
def get_num_request_range(
attn_dp_size: int, engine_max_concurrency: int, granularity: int
) -> list[int]:
# for MoE models with attn-dp, we want the num_request to be a multiple of attn_dp_size
# so that we can make sure the request is sent to the same dp rank as the warmup request
# this is guaranteed because the dp scheduler is scheduling round-robin
......@@ -102,16 +104,16 @@ def _profile_decode_helper(
def profile_decode(
work_dir,
model_name,
tokenizer,
url,
num_gpus,
max_kv_tokens,
max_context_length,
interpolation_granularity,
attention_dp_size,
):
work_dir: str,
model_name: str,
tokenizer: str,
url: str,
num_gpus: int,
max_kv_tokens: int,
max_context_length: int,
interpolation_granularity: int,
attention_dp_size: int,
) -> None:
def get_itl_and_thpt_per_gpu(isl, osl, num_request):
ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{isl}_osl{osl}_n{num_request}"
return get_decode_itl_and_thpt_per_gpu(
......@@ -138,15 +140,15 @@ def profile_decode(
def profile_decode_aiconfigurator(
work_dir,
num_gpus,
max_kv_tokens,
max_context_length,
interpolation_granularity,
work_dir: str,
num_gpus: int,
max_kv_tokens: int,
max_context_length: int,
interpolation_granularity: int,
ai_configurator_perf_estimator: AIConfiguratorPerfEstimator,
attention_dp_size,
**model_config_kwargs,
):
attention_dp_size: int,
**model_config_kwargs: Any,
) -> None:
def get_itl_and_thpt_per_gpu(isl, osl, num_request):
perf_dict = ai_configurator_perf_estimator.estimate_perf(
isl,
......
......@@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Callable, Optional
from typing import Any, Callable, Optional
import numpy as np
......@@ -82,15 +82,15 @@ def _profile_prefill_helper(
def profile_prefill(
work_dir,
model_name,
tokenizer,
url,
num_gpus,
max_context_length,
interpolation_granularity,
work_dir: str,
model_name: str,
tokenizer: str,
url: str,
num_gpus: int,
max_context_length: int,
interpolation_granularity: int,
attention_dp_size: int = 1,
):
) -> None:
def get_ttft(isl):
ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{isl}"
return get_prefill_ttft(
......@@ -113,13 +113,13 @@ def profile_prefill(
def profile_prefill_aiconfigurator(
work_dir,
num_gpus,
max_context_length,
interpolation_granularity,
work_dir: str,
num_gpus: int,
max_context_length: int,
interpolation_granularity: int,
ai_configurator_perf_estimator: AIConfiguratorPerfEstimator,
**model_config_kwargs,
):
**model_config_kwargs: Any,
) -> None:
def get_ttft(isl):
perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
isl,
......
......@@ -12,6 +12,7 @@ import logging
import os
import time
from enum import Enum
from typing import Any
import yaml
......@@ -47,7 +48,7 @@ def write_profiler_status(
outputs: Optional dict of output files (for success status)
"""
status_file = os.path.join(output_dir, STATUS_FILE_NAME)
status_data = {
status_data: dict[str, Any] = {
"status": status.value,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
......
......@@ -4,6 +4,7 @@
import json
import logging
import queue
from typing import Any
from dynamo.profiler.webui.utils import (
add_profiling_error,
......@@ -28,7 +29,9 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
def pick_config_with_webui(prefill_data, decode_data, args):
def pick_config_with_webui(
prefill_data: Any, decode_data: Any, args: Any
) -> tuple[int, int]:
"""
Launch WebUI for user to pick configurations.
......
......@@ -9,6 +9,7 @@ import threading
import time
from enum import Enum
from pathlib import Path
from typing import Any, Callable
import gradio as gr
import numpy as np
......@@ -81,10 +82,10 @@ def build_single_service_preview_header_lines(
*,
service_name: str,
engine_type: str,
mapping,
mapping: Any,
ttft_or_itl_ms: float | None,
thpt_per_gpu: float | None,
args,
args: Any,
) -> list[str]:
header_lines = [
"# DynamoGraphDeployment Service Config Preview",
......@@ -112,13 +113,13 @@ def build_two_service_preview_header_lines(
*,
prefill_service_name: str,
decode_service_name: str,
prefill_mapping,
decode_mapping,
prefill_mapping: Any,
decode_mapping: Any,
prefill_ttft_ms: float | None,
prefill_thpt_per_gpu: float | None,
decode_itl_ms: float | None,
decode_thpt_per_gpu: float | None,
args,
args: Any,
) -> list[str]:
header_lines = [
"# DynamoGraphDeployment Services Config Preview",
......@@ -170,11 +171,11 @@ WEB_UI_SELECTION_TIMEOUT = 3600
def generate_config_data(
prefill_data,
decode_data,
args,
prefill_data: Any,
decode_data: Any,
args: Any,
write_to_disk: bool = True,
):
) -> dict:
"""
Generate JSON data file for WebUI from profiling results.
......@@ -228,7 +229,7 @@ def generate_config_data(
return data
def populate_prefill_data(data, prefill_data, args):
def populate_prefill_data(data: dict, prefill_data: Any, args: Any) -> None:
"""Populate prefill chart and table data."""
if not prefill_data.num_gpus:
return
......@@ -289,7 +290,7 @@ def populate_prefill_data(data, prefill_data, args):
data[PlotType.PREFILL]["table"]["data"] = table_data
def populate_decode_data(data, decode_data, args):
def populate_decode_data(data: dict, decode_data: Any, args: Any) -> None:
"""Populate decode chart and table data."""
if not decode_data.num_gpus:
return
......@@ -354,11 +355,11 @@ def populate_decode_data(data, decode_data, args):
def populate_cost_data(
data,
prefill_data,
decode_data,
args,
):
data: dict,
prefill_data: Any,
decode_data: Any,
args: Any,
) -> None:
"""Populate cost chart and table data with pareto-optimal configurations.
Note: This function computes GPU hours (not cost). The frontend handles
......@@ -487,8 +488,11 @@ def populate_cost_data(
def create_selection_handler(
data_dict_ref, selection_queue, prefill_selection, decode_selection
):
data_dict_ref: dict,
selection_queue: queue.Queue,
prefill_selection: dict,
decode_selection: dict,
) -> Callable[[str], str]:
"""Create a selection handler closure for the WebUI.
Args:
......@@ -571,9 +575,9 @@ def create_selection_handler(
def create_gradio_interface(
json_data_str,
handle_selection,
):
json_data_str: str,
handle_selection: Callable[[str], str],
) -> Any:
"""Create the Gradio interface for configuration selection.
Args:
......@@ -665,7 +669,9 @@ def create_gradio_interface(
return demo
def wait_for_selection(demo, selection_queue, port):
def wait_for_selection(
demo: Any, selection_queue: queue.Queue, port: int
) -> tuple[int, int]:
"""Launch the demo and wait for user selection.
Args:
......
......@@ -16,6 +16,7 @@ dependencies = [
"ai-dynamo-runtime==1.0.0",
"transformers>=4.56.0",
"pytest>=8.3.4",
"types-aiofiles>=24.1.0",
"types-psutil>=7.0.0.20250218",
"types-requests>=2.32.4.20260107",
"kubernetes>=32.0.1,<33.0.0",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment