Unverified Commit f8219b12 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: agg support + solidify fp8 configs (#3886)

parent 2c910b2e
#!/bin/bash
#SBATCH --job-name={{ job_name }}
#SBATCH --nodes={{ total_nodes }}
#SBATCH --ntasks={{ total_nodes }}
#SBATCH --ntasks-per-node=1
#SBATCH --account={{ account }}
#SBATCH --time={{ time_limit }}
#SBATCH --output=logs/%j_{{ agg_workers }}A_{{ timestamp }}/log.out
#SBATCH --error=logs/%j_{{ agg_workers }}A_{{ timestamp }}/log.err
#SBATCH --partition={{ partition }}
# Constants
# Values written with template placeholders are substituted when this script is
# rendered; the remaining values are derived from them at job run time.
# set -x traces the resolved assignments into the job log; tracing is switched
# off again once the constants are defined.
set -x
AGG_NODES={{ agg_nodes }}
AGG_WORKERS={{ agg_workers }}
TOTAL_NODES={{ total_nodes }}
GPUS_PER_NODE={{ gpus_per_node }}
TOTAL_GPUS=$((AGG_NODES * GPUS_PER_NODE))
# Aggregated mode has no separate prefill pool: every GPU is reported as a
# decode GPU when these values are handed to the profiler script.
PREFILL_GPUS=0
DECODE_GPUS=$TOTAL_GPUS
# Integer division: any remainder is silently dropped, so AGG_NODES is expected
# to be a multiple of AGG_WORKERS.
AGG_NODES_PER_WORKER=$((AGG_NODES / AGG_WORKERS))
# Uses the same directory naming pattern as the SBATCH --output/--error paths above.
LOG_DIR="${SLURM_SUBMIT_DIR}/logs/${SLURM_JOB_ID}_{{ agg_workers }}A_{{ timestamp }}"
SCRIPT_DIR="${SLURM_SUBMIT_DIR}/scripts"
OUTPUT_DIR="${SLURM_SUBMIT_DIR}/outputs"
MODEL_DIR="{{ model_dir }}"
CONFIG_DIR="{{ config_dir }}"
CONTAINER_IMAGE="{{ container_image }}"
NETWORK_INTERFACE="{{ network_interface }}"
GPU_TYPE="{{ gpu_type | default('h100') }}"
set +x
{% raw %}
# Make sure the output and log directories exist before anything writes to them.
mkdir -p "${OUTPUT_DIR}" "${LOG_DIR}"
# Expand the compact Slurm node list into one hostname per array element.
nodes=($(scontrol show hostnames $SLURM_NODELIST))
# Abort early when the allocation does not have the size the template was rendered for.
if [ ${#nodes[@]} -ne $TOTAL_NODES ]; then
  printf '%s\n' "Error: Expected $TOTAL_NODES nodes but got ${#nodes[@]} nodes"
  exit 1
fi
# Log the index -> hostname mapping used by the rest of the script.
for i in "${!nodes[@]}"; do
  printf 'Node %s: %s\n' "$i" "${nodes[$i]}"
done
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# Multiple frontend architecture
# Node 0: nginx + aggregated worker shard
# Node 1: NATS/ETCD + first frontend
# Node 2+: aggregated workers + optional additional frontends
#
# This section resolves the master IP and builds two parallel arrays,
# frontend_hosts and frontend_ips, that list every node expected to run a
# frontend. frontend_ips is later rendered into the nginx configuration.
NGINX_NODE=${nodes[0]}
MASTER_NODE=${nodes[1]}
# Ask the master node for the IPv4 address bound to NETWORK_INTERFACE.
MASTER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$MASTER_NODE ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
if [ -z "$MASTER_IP" ]; then
echo "Error: Could not retrieve IP address for master host $MASTER_NODE on interface $NETWORK_INTERFACE"
exit 1
fi
echo "Master IP address (node 1): $MASTER_IP"
echo "Nginx node (node 0): $NGINX_NODE"
# Generate frontend IP list for nginx config
frontend_hosts=()
frontend_ips=()
# Node 1 always has a frontend (with NATS/ETCD)
frontend_hosts+=("$MASTER_NODE")
frontend_ips+=("$MASTER_IP")
# Add additional frontends based on num_additional_frontends
# (the count is substituted at template-render time on the next line)
{% endraw %}ADDITIONAL_FRONTENDS={{ num_additional_frontends }}{% raw %}
if [ "$ADDITIONAL_FRONTENDS" -gt 0 ]; then
# Calculate which nodes get additional frontends
# We have AGG_NODES aggregated worker nodes, distribute additional frontends across them
nodes_per_frontend=$(( (AGG_NODES - 1 + ADDITIONAL_FRONTENDS - 1) / ADDITIONAL_FRONTENDS )) # ceil division
frontend_node_idx=2 # Start from node 2 (node 1 already has frontend)
for i in $(seq 1 $ADDITIONAL_FRONTENDS); do
# Candidates past the end of the allocation are skipped, so fewer than
# ADDITIONAL_FRONTENDS entries may be added.
if [ $frontend_node_idx -lt $TOTAL_NODES ]; then
node_name=${nodes[$frontend_node_idx]}
# No emptiness check here: an empty node_ip is appended as-is.
node_ip=$(srun --nodes=1 --ntasks=1 --nodelist=$node_name ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
frontend_hosts+=("$node_name")
frontend_ips+=("$node_ip")
echo "Additional frontend $i on node $frontend_node_idx: $node_name ($node_ip)"
frontend_node_idx=$((frontend_node_idx + nodes_per_frontend))
fi
done
fi
echo "Frontend hosts: ${frontend_hosts[@]}"
echo "Frontend IPs: ${frontend_ips[@]}"
# Generate nginx configuration
# Serialize the frontend IPs as a Python list literal (e.g. ['10.0.0.1','10.0.0.2'])
# and hand it to the embedded Python program through the environment.
FRONTEND_LIST=$(printf "'%s'," "${frontend_ips[@]}")
FRONTEND_LIST="[${FRONTEND_LIST%,}]"
export FRONTEND_LIST SCRIPT_DIR LOG_DIR
# Render SCRIPT_DIR/nginx.conf.j2 into LOG_DIR/nginx.conf. The heredoc delimiter
# is quoted so the shell performs no expansion inside the Python source.
python3 - <<'PY'
import ast
import os

from jinja2 import Template

template_path = os.path.join(os.environ['SCRIPT_DIR'], 'nginx.conf.j2')
output_path = os.path.join(os.environ['LOG_DIR'], 'nginx.conf')

with open(template_path, 'r') as f:
    tmpl = Template(f.read())

# literal_eval only accepts Python literals, so a malformed FRONTEND_LIST raises
# an error instead of being executed as arbitrary code (as eval would).
frontend_hosts = ast.literal_eval(os.environ['FRONTEND_LIST'])
config = tmpl.render(frontend_hosts=frontend_hosts)

with open(output_path, 'w') as f:
    f.write(config)
PY
{% endraw %}
{% else %}
{% raw %}
# Traditional architecture: node 0 is the master and hosts everything besides
# the remaining worker shards. Resolve its IPv4 address on NETWORK_INTERFACE.
MASTER_IP=$(
  srun --nodes=1 --ntasks=1 --nodelist=${nodes[0]} ip addr show $NETWORK_INTERFACE \
    | grep 'inet ' \
    | awk '{print $2}' \
    | cut -d'/' -f1
)
# Without a master address no worker can be pointed at it, so stop here.
[ -n "$MASTER_IP" ] || {
  echo "Error: Could not retrieve IP address for master host ${nodes[0]} on interface $NETWORK_INTERFACE"
  exit 1
}
echo "Master IP address: $MASTER_IP"
{% endraw %}
{% endif %}
{% raw %}
# Compute leader nodes for each aggregated worker
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# With multiple frontends: keep offset 0; nginx coexists on node 0
WORKER_NODE_OFFSET=0
{% endraw %}
{% else %}
{% raw %}
# Traditional: workers start from node 0
WORKER_NODE_OFFSET=0
{% endraw %}
{% endif %}
{% raw %}
# Each worker owns a contiguous slice of AGG_NODES_PER_WORKER nodes; record the
# index (into the nodes array) of the first node of every slice.
agg_leaders=()
for ((i = 0; i < AGG_WORKERS; i++)); do
  leader_idx=$((WORKER_NODE_OFFSET + i * AGG_NODES_PER_WORKER))
  agg_leaders+=("$leader_idx")
done
echo "Aggregated worker leaders: ${agg_leaders[@]}"
# Prepare enroot arguments to pass to srun commands
# Built as one string (the backslash-newlines inside the quotes are line
# continuations) and expanded unquoted in every srun invocation below, so it is
# word-split into individual options. The mounts expose the host model, config,
# script, output and log directories inside the container at /model/,
# /configs/, /scripts/, /outputs/ and /logs/.
ENROOT_ARGS="\
--container-image=${CONTAINER_IMAGE} \
--no-container-entrypoint \
--no-container-mount-home \
--container-mounts=${MODEL_DIR}:/model/,${CONFIG_DIR}:/configs/,${SCRIPT_DIR}:/scripts/,${OUTPUT_DIR}:/outputs/,${LOG_DIR}:/logs/ \
"
# Build common worker arguments
{% endraw %}
SCRIPT_VARIANT="{{ script_variant | default('default') }}"
{% raw %}
WORKER_ARGS="--gpu_type ${GPU_TYPE} --script-variant ${SCRIPT_VARIANT} --gpus_per_node ${GPUS_PER_NODE} --master_ip ${MASTER_IP}"
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# Add multiple frontends flag for worker setup
WORKER_ARGS="$WORKER_ARGS --multiple-frontends-enabled"
{% endraw %}
{% endif %}
{% if run_in_ci %}
{% raw %}
# Add CI mode flag for worker setup
WORKER_ARGS="$WORKER_ARGS --run-in-ci"
{% endraw %}
{% endif %}
{% raw %}
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# Launch nginx on node 0
# The nginx.conf rendered into LOG_DIR is visible inside the container as
# /logs/nginx.conf through the container mounts.
echo "Launching nginx on ${NGINX_NODE}"
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$NGINX_NODE --output=${LOG_DIR}/${NGINX_NODE}_nginx.out --error=${LOG_DIR}/${NGINX_NODE}_nginx.err python /scripts/worker_setup.py --worker_type nginx --nginx_config /logs/nginx.conf ${WORKER_ARGS}"
echo "$cmd"
# Intentionally unquoted: the command string is word-split into srun arguments.
$cmd &
# Launch frontend on master node (node 1) - this will also start NATS/ETCD
# stdout and stderr share the <node>_frontend_<idx> prefix so that frontend 0
# is named the same way as the additional frontends.
echo "Launching frontend + NATS/ETCD on master node ${MASTER_NODE}"
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$MASTER_NODE --output=${LOG_DIR}/${MASTER_NODE}_frontend_0.out --error=${LOG_DIR}/${MASTER_NODE}_frontend_0.err python /scripts/worker_setup.py --worker_type frontend --worker_idx 0 ${WORKER_ARGS}"
echo "$cmd"
$cmd &
# Launch additional frontends on designated nodes
# Iterate over the hosts that were already selected when the nginx upstream
# list was built (frontend_hosts[0] is the master node, launched above) instead
# of recomputing the node stride here. This guarantees that the set of launched
# frontends is exactly the set nginx proxies to.
if [ "$ADDITIONAL_FRONTENDS" -gt 0 ]; then
  frontend_idx=1 # Start from 1 since node 1 is frontend 0
  for node in "${frontend_hosts[@]:1}"; do
    echo "Launching additional frontend $frontend_idx on node: $node"
    cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_frontend_${frontend_idx}.out --error=${LOG_DIR}/${node}_frontend_${frontend_idx}.err python /scripts/worker_setup.py --worker_type frontend --worker_idx ${frontend_idx} ${WORKER_ARGS}"
    echo "$cmd"
    # Intentionally unquoted: the command string is word-split into srun arguments.
    $cmd &
    frontend_idx=$((frontend_idx + 1))
  done
fi
{% endraw %}
{% else %}
{% raw %}
# Traditional: first aggregated worker node also runs frontend + NATS/ETCD
# This is handled in setup_aggregated_worker when worker_idx=0 and local_rank=0
{% endraw %}
{% endif %}
{% raw %}
# Launch aggregated workers
# Outer loop: one iteration per worker. Inner loop: one background srun per
# node belonging to that worker, starting at the worker's leader node.
for worker_idx in $(seq 0 $((AGG_WORKERS - 1))); do
leader_idx=${agg_leaders[$worker_idx]}
leader_node=${nodes[$leader_idx]}
# Get leader IP for this worker group
# (not validated: an empty LEADER_IP is passed through to worker_setup.py as-is)
LEADER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$leader_node ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
echo "Aggregated worker $worker_idx leader: $leader_node ($LEADER_IP)"
# Launch all nodes for this worker
for node_idx in $(seq 0 $((AGG_NODES_PER_WORKER - 1))); do
global_node_idx=$((leader_idx + node_idx))
node=${nodes[$global_node_idx]}
# The node's position inside its worker group is passed as --local_rank.
local_rank=$node_idx
echo "Launching aggregated worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node"
{% endraw %}
{% if enable_config_dump %}
{% raw %}
CONFIG_DUMP_ARG="--dump-config-path /logs/${node}_config.json"
{% endraw %}
{% else %}
{% raw %}
CONFIG_DUMP_ARG=""
{% endraw %}
{% endif %}
{% raw %}
# CONFIG_DUMP_ARG is empty unless config dumping was enabled at render time.
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_agg_w${worker_idx}.out --error=${LOG_DIR}/${node}_agg_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${AGG_NODES_PER_WORKER} --worker_type aggregated --gpu_utilization_log /logs/${node}_agg_w${worker_idx}_gpu_utilization.log ${CONFIG_DUMP_ARG} ${WORKER_ARGS}"
echo "$cmd"
# Run in the background so all nodes start concurrently; the string is
# deliberately unquoted so it is word-split into srun arguments.
$cmd &
done
done
echo ""
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
echo "Frontend available at: http://${NGINX_NODE}:8000"
echo "To connect to the nginx node:"
echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${NGINX_NODE} --overlap --pty bash"
echo "To connect to the master node (NATS/ETCD):"
echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${MASTER_NODE} --overlap --pty bash"
{% endraw %}
{% else %}
{% raw %}
echo "To connect to the master node:"
echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --overlap --pty bash"
{% endraw %}
{% endif %}
{% raw %}
echo ""
echo "Make sure to cancel the job at the end:"
echo "scancel $SLURM_JOB_ID"
# Instead of waiting for all tasks to complete, wait for profile.sh to complete and then exit.
{% endraw %}
PROFILER_TYPE={{ profiler_type }}
PROFILER_ARGS="{{ profiler_arg }}"
{% if do_profile %}
{% raw %}
# Run the benchmark on node 0 in the background.
# bench.sh positional arguments: prefill_workers decode_workers prefill_gpus
# decode_gpus, followed by the profiler arguments (ISL, OSL, concurrencies).
# bench.sh derives total_gpus itself, so TOTAL_GPUS must not be passed here or
# every profiler argument would shift by one position.
# Aggregated mode reports 0 prefill workers/GPUs and counts all workers and
# GPUs on the decode side.
srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --output=${LOG_DIR}/profile.out --error=${LOG_DIR}/profile.err --overlap bash /scripts/${PROFILER_TYPE}/bench.sh 0 $AGG_WORKERS $PREFILL_GPUS $DECODE_GPUS ${PROFILER_ARGS} &
{% endraw %}
{% endif %}
{% raw %}
# Block until the first background job finishes and propagate its exit status
# as the exit status of the whole batch script.
# NOTE(review): wait -n returns for whichever background job exits first; that
# is the profiler only if it finishes before any worker/frontend srun does.
wait -n
first_exit_code=$?
echo "Script finished at $(date) with exit code ${first_exit_code}"
exit $first_exit_code
{% endraw %}
......@@ -5,8 +5,8 @@
#SBATCH --ntasks-per-node=1
#SBATCH --account={{ account }}
#SBATCH --time={{ time_limit }}
#SBATCH --output=logs/%j/log.out
#SBATCH --error=logs/%j/log.err
#SBATCH --output=logs/%j_{{ prefill_workers }}P_{{ decode_workers }}D_{{ timestamp }}/log.out
#SBATCH --error=logs/%j_{{ prefill_workers }}P_{{ decode_workers }}D_{{ timestamp }}/log.err
#SBATCH --partition={{ partition }}
# Constants
......@@ -18,9 +18,11 @@ DECODE_WORKERS={{ decode_workers }}
TOTAL_NODES=$((PREFILL_NODES + DECODE_NODES))
GPUS_PER_NODE={{ gpus_per_node }}
TOTAL_GPUS=$((TOTAL_NODES * GPUS_PER_NODE))
PREFILL_GPUS=$((PREFILL_NODES * GPUS_PER_NODE))
DECODE_GPUS=$((DECODE_NODES * GPUS_PER_NODE))
PREFILL_NODES_PER_WORKER=$((PREFILL_NODES / PREFILL_WORKERS))
DECODE_NODES_PER_WORKER=$((DECODE_NODES / DECODE_WORKERS))
LOG_DIR="${SLURM_SUBMIT_DIR}/logs/${SLURM_JOB_ID}"
LOG_DIR="${SLURM_SUBMIT_DIR}/logs/${SLURM_JOB_ID}_{{ prefill_workers }}P_{{ decode_workers }}D_{{ timestamp }}"
SCRIPT_DIR="${SLURM_SUBMIT_DIR}/scripts"
OUTPUT_DIR="${SLURM_SUBMIT_DIR}/outputs"
MODEL_DIR="{{ model_dir }}"
......@@ -49,8 +51,8 @@ done
{% if enable_multiple_frontends %}
{% raw %}
# Multiple frontend architecture
# Node 0: nginx only
# Node 1: NATS/ETCD + first frontend + prefill worker
# Node 0: nginx only + prefill shard
# Node 1: NATS/ETCD + first frontend + prefill shard
# Node 2+: prefill/decode workers + optional additional frontends
NGINX_NODE=${nodes[0]}
......@@ -168,7 +170,10 @@ ENROOT_ARGS="\
"
# Build common worker arguments
WORKER_ARGS="--gpu_type ${GPU_TYPE} --gpus_per_node ${GPUS_PER_NODE} --master_ip ${MASTER_IP}"
{% endraw %}
SCRIPT_VARIANT="{{ script_variant | default('default') }}"
{% raw %}
WORKER_ARGS="--gpu_type ${GPU_TYPE} --script-variant ${SCRIPT_VARIANT} --gpus_per_node ${GPUS_PER_NODE} --master_ip ${MASTER_IP}"
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
......@@ -182,6 +187,12 @@ WORKER_ARGS="$WORKER_ARGS --multiple-frontends-enabled"
WORKER_ARGS="$WORKER_ARGS --use_init_locations"
{% endraw %}
{% endif %}
{% if run_in_ci %}
{% raw %}
# Add CI mode flag for worker setup
WORKER_ARGS="$WORKER_ARGS --run-in-ci"
{% endraw %}
{% endif %}
{% raw %}
{% endraw %}
......@@ -237,8 +248,18 @@ for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do
local_rank=$node_idx
echo "Launching prefill worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node"
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_prefill_w${worker_idx}.out --error=${LOG_DIR}/${node}_prefill_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${PREFILL_NODES_PER_WORKER} --worker_type prefill --gpu_utilization_log /logs/${node}_prefill_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}"
{% endraw %}
{% if enable_config_dump %}
{% raw %}
CONFIG_DUMP_ARG="--dump-config-path /logs/${node}_config.json"
{% endraw %}
{% else %}
{% raw %}
CONFIG_DUMP_ARG=""
{% endraw %}
{% endif %}
{% raw %}
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_prefill_w${worker_idx}.out --error=${LOG_DIR}/${node}_prefill_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${PREFILL_NODES_PER_WORKER} --worker_type prefill --gpu_utilization_log /logs/${node}_prefill_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS} ${CONFIG_DUMP_ARG}"
echo "$cmd"
$cmd &
done
......@@ -260,8 +281,18 @@ for worker_idx in $(seq 0 $((DECODE_WORKERS - 1))); do
local_rank=$node_idx
echo "Launching decode worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node"
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_decode_w${worker_idx}.out --error=${LOG_DIR}/${node}_decode_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${DECODE_NODES_PER_WORKER} --worker_type decode --gpu_utilization_log /logs/${node}_decode_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}"
{% endraw %}
{% if enable_config_dump %}
{% raw %}
CONFIG_DUMP_ARG="--dump-config-path /logs/${node}_config.json"
{% endraw %}
{% else %}
{% raw %}
CONFIG_DUMP_ARG=""
{% endraw %}
{% endif %}
{% raw %}
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_decode_w${worker_idx}.out --error=${LOG_DIR}/${node}_decode_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${DECODE_NODES_PER_WORKER} --worker_type decode --gpu_utilization_log /logs/${node}_decode_w${worker_idx}_gpu_utilization.log ${CONFIG_DUMP_ARG} ${WORKER_ARGS}"
echo "$cmd"
$cmd &
done
......@@ -298,7 +329,7 @@ PROFILER_ARGS="{{ profiler_arg }}"
{% if do_profile %}
{% raw %}
srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --output=${LOG_DIR}/profile.out --error=${LOG_DIR}/profile.err --overlap bash /scripts/${PROFILER_TYPE}/bench.sh $PREFILL_WORKERS $DECODE_WORKERS $TOTAL_GPUS ${PROFILER_ARGS} &
srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --output=${LOG_DIR}/profile.out --error=${LOG_DIR}/profile.err --overlap bash /scripts/${PROFILER_TYPE}/bench.sh $PREFILL_WORKERS $DECODE_WORKERS $PREFILL_GPUS $DECODE_GPUS ${PROFILER_ARGS} &
{% endraw %}
{% endif %}
......
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# ruff: noqa
# pylint: skip-file
import json
import os
import re
### Slurm configs
SLURM_JOB_ID = "slurm id"
### Model Deployment configurations
PREFILL_TP = "Prefill TP"
PREFILL_DP = "Prefill DP"
DECODE_TP = "Decode TP"
DECODE_DP = "Decode DP"
FRONTENDS = "Frontends"
### Profiler configs
PROFILER_TYPE = "Profiler type"
ISL = "ISL"
OSL = "OSL"
REQUEST_RATE = "Request rate"
CONCURRENCIES = "Concurrencies"
OUTPUT_TPS = "Output TPS"
OUTPUT_TPS_PER_USER = "Output TPS/User"
ITL = "Mean ITL (ms)"
TTFT = "Mean TTFT (ms)"
TPOT = "Mean TPOT (ms)"
### FORMAT PRINT ORDERS
KEY_PRINT_ORDER = [
SLURM_JOB_ID,
PREFILL_TP,
PREFILL_DP,
DECODE_TP,
DECODE_DP,
FRONTENDS,
PROFILER_TYPE,
ISL,
OSL,
REQUEST_RATE,
CONCURRENCIES,
OUTPUT_TPS,
OUTPUT_TPS_PER_USER,
ITL,
TTFT,
TPOT,
]
def format_key_order():
    """Print a banner followed by every field name in KEY_PRINT_ORDER, one per line."""
    banner = "================\nThe following log will be reported according to this order:\n----\n"
    listing = "".join(f"{key}\n" for key in KEY_PRINT_ORDER)
    # Strip the final newline; print() adds its own.
    print((banner + listing)[:-1])
def format_print(result):
    """Print one value per line of ``result`` in KEY_PRINT_ORDER order.

    Keys missing from ``result`` are printed as empty lines so that every
    report has the same number of rows.
    """
    rows = [f"{result.get(key, '')}\n" for key in KEY_PRINT_ORDER]
    # Strip the final newline; print() adds its own.
    print(("================\n" + "".join(rows))[:-1])
def analyze_sgl_out(folder):
    """Collect per-run metrics from every JSON file in ``folder``.

    Each file must provide ``max_concurrency``, ``output_throughput``,
    ``mean_itl_ms``, ``mean_ttft_ms`` and ``request_rate``; ``mean_tpot_ms``
    is optional. Returns a dict of parallel lists keyed by the report
    column names, ordered by ascending ``max_concurrency``.
    """
    required = (
        "max_concurrency",
        "output_throughput",
        "mean_itl_ms",
        "mean_ttft_ms",
        "request_rate",
    )
    rows = []
    for name in os.listdir(folder):
        with open(f"{folder}/{name}", "r") as fh:
            payload = json.load(fh)
        row = [payload[field] for field in required]
        if "mean_tpot_ms" in payload:
            row.append(payload["mean_tpot_ms"])
        rows.append(row)

    # Stable sort on concurrency only, matching the order files were listed in
    # for ties.
    rows.sort(key=lambda r: r[0])

    out = {
        REQUEST_RATE: [],
        CONCURRENCIES: [],
        OUTPUT_TPS: [],
        ITL: [],
        TTFT: [],
        TPOT: [],
    }
    for row in rows:
        concurrency, throughput, itl, ttft, rate = row[0:5]
        out[CONCURRENCIES].append(concurrency)
        out[OUTPUT_TPS].append(throughput)
        out[ITL].append(itl)
        out[TTFT].append(ttft)
        out[REQUEST_RATE].append(rate)
        # TPOT is only recorded for runs that reported it.
        if len(row) >= 6:
            out.setdefault(TPOT, []).append(row[5])
    return out
def analyze_gap_out(folder):
    """Collect concurrency and throughput figures from every JSON file in ``folder``.

    Reads the stimulus concurrency and the ``avg`` of both
    ``output_token_throughput_per_user`` and ``output_token_throughput``
    from each file, and returns parallel lists ordered by ascending
    concurrency.
    """
    samples = []
    for name in os.listdir(folder):
        with open(f"{folder}/{name}", "r") as fh:
            payload = json.load(fh)
        concurrency = payload["input_config"]["perf_analyzer"]["stimulus"]["concurrency"]
        per_user = payload["output_token_throughput_per_user"]["avg"]
        total = payload["output_token_throughput"]["avg"]
        samples.append((concurrency, per_user, total))

    # Stable sort on concurrency only.
    samples.sort(key=lambda s: s[0])

    out = {CONCURRENCIES: [], OUTPUT_TPS: [], OUTPUT_TPS_PER_USER: []}
    for concurrency, per_user, total in samples:
        out[CONCURRENCIES].append(concurrency)
        out[OUTPUT_TPS].append(total)
        out[OUTPUT_TPS_PER_USER].append(per_user)
    return out
def analyze(p, gpus_per_node=4):
    """Summarize one job log directory into a flat report dict.

    Args:
        p: Path of the job log directory to scan.
        gpus_per_node: GPUs assumed per node when deriving the TP size from
            the number of log files of the first worker group. Defaults to 4,
            the value that was previously hard-coded.

    Returns:
        A dict keyed by the report column names. Deployment fields (TP/DP,
        frontend count) are derived from ``*_prefill_*``, ``*_decode_*`` and
        ``*_frontend_*`` ``.out`` files; profiler fields come from a
        ``<profiler>_isl_<n>_osl_<n>`` entry, if present. List values are
        joined into comma-separated strings.
    """
    # Raw string with an escaped dot so that only a literal ".out" matches.
    worker_log = re.compile(
        r"([-_A-Za-z0-9]+)_(prefill|decode|nginx|frontend)_([a-zA-Z0-9]+)\.out"
    )
    profiler_dir = re.compile(r"(sglang|vllm|gap)_isl_([0-9]+)_osl_([0-9]+)")

    prefill_nodes = {}
    decode_nodes = {}
    frontends = []
    profile_result = {}

    for file in os.listdir(p):
        worker_match = worker_log.search(file)
        if worker_match is not None:
            _, node_type, number = worker_match.groups()
            if node_type == "prefill":
                prefill_nodes.setdefault(number, []).append(file)
            elif node_type == "decode":
                decode_nodes.setdefault(number, []).append(file)
            elif node_type == "frontend":
                frontends.append(file)

        profiler_match = profiler_dir.match(file)
        if profiler_match:
            profiler, isl, osl = profiler_match.groups()
            if profiler == "gap":
                profile_result = analyze_gap_out(f"{p}/{file}")
            else:
                profile_result = analyze_sgl_out(f"{p}/{file}")
            profile_result[PROFILER_TYPE] = profiler
            profile_result[ISL] = isl
            profile_result[OSL] = osl

    config = {SLURM_JOB_ID: p}
    if prefill_nodes:
        # TP is taken from the first worker group: one log file per node.
        first_group = next(iter(prefill_nodes.values()))
        config[PREFILL_TP] = f"{len(first_group) * gpus_per_node}"
        config[PREFILL_DP] = f"{len(prefill_nodes)}"
    if decode_nodes:
        first_group = next(iter(decode_nodes.values()))
        config[DECODE_TP] = f"{len(first_group) * gpus_per_node}"
        config[DECODE_DP] = f"{len(decode_nodes)}"
    if frontends:
        config[FRONTENDS] = f"{len(frontends)}"

    result = {**config}
    for key, value in profile_result.items():
        if isinstance(value, list):
            result[key] = ", ".join(str(x) for x in value)
        else:
            result[key] = value
    return result
# Candidate job log directories: every directory in the current working
# directory whose name does not contain ".py".
paths = [x for x in os.listdir(".") if ".py" not in x and os.path.isdir(x)]
# Print the column legend once, before any per-job report.
format_key_order()
def extract_job_id(dirname):
    """Return the numeric job ID that prefixes a log directory name.

    The ID is everything before the first underscore, which covers:
    - 12345_3P_1D_20250104_123456 (disaggregated)
    - 12345_4A_20250104_123456 (aggregated)
    - 12345 (legacy format)

    Names whose prefix is not an integer yield -1.
    """
    prefix, _, _ = dirname.partition("_")
    try:
        return int(prefix)
    except ValueError:
        # Directory name doesn't follow the expected format.
        return -1
# Report jobs from the highest job ID to the lowest; directories that produced
# no throughput figures are skipped silently.
for path in sorted(paths, key=extract_job_id, reverse=True):
    result = analyze(path)
    if OUTPUT_TPS in result:
        format_print(result)
......@@ -4,11 +4,13 @@
prefill_workers=$1
decode_workers=$2
total_gpus=$3
prefill_gpus=$3
decode_gpus=$4
total_gpus=$((prefill_gpus+decode_gpus))
chosen_isl=$4
chosen_osl=$5
chosen_concurrencies=$6
chosen_isl=$5
chosen_osl=$6
chosen_concurrencies=$7
echo "Profiling for model with PrefillDP=${prefill_workers}, DecodeDP=${decode_workers}"
......
......@@ -67,10 +67,18 @@ fi
# Construct command based on mode
if [ "$mode" = "prefill" ]; then
set -x
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
# no expert locations collected for fp4 yet
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix=" "; fi
if [[ -n "${DUMP_CONFIG_PATH}" ]]; then command_suffix="${command_suffix} --dump-config-to ${DUMP_CONFIG_PATH}"; fi
# we have to install pre-release cutedsl for an integer overflow fix
python3 -m pip install --no-cache-dir --upgrade --pre nvidia-cutlass-dsl
# set your own cache variables here
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
export SGLANG_DG_CACHE_DIR="/configs/dg-10212025"
export FLASHINFER_WORKSPACE_BASE="/configs/flashinfer-cache"
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
SGLANG_NVFP4_CKPT_FP8_GEMM_IN_ATTN=1 \
......@@ -112,12 +120,12 @@ if [ "$mode" = "prefill" ]; then
--max-prefill-tokens 16384 \
--load-balance-method round_robin \
--quantization modelopt_fp4 \
--enable-ep-moe \
--moe-runner-backend flashinfer_cutlass \
--dist-init-addr "$HOST_IP_MACHINE:$PORT" \
--disaggregation-bootstrap-port 30001 \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--ep-size "$TOTAL_GPUS" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
......@@ -125,12 +133,27 @@ if [ "$mode" = "prefill" ]; then
--stream-interval 50 \
--log-level debug ${command_suffix}
# For now we must keep SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK and cuda-graph-bs at 1024 until
# DeepEP merges in https://github.com/deepseek-ai/DeepEP/pull/440
# the nvidia-cutlass-dsl install fixes https://github.com/flashinfer-ai/flashinfer/issues/1830#issuecomment-3380074018
# which was previously limiting us to DISPATCH_TOKENS and cuda-graph-bs == 384
# For now use 12 nodes for fp4 since flashinfer_cutedsl requires experts per gpu < 8
# We have 288 (256 + 32 redundant) => 288/48 = 6
elif [ "$mode" = "decode" ]; then
set -x
# no expert locations collected for fp4 yet
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix=" "; fi
if [[ -n "${DUMP_CONFIG_PATH}" ]]; then command_suffix="${command_suffix} --dump-config-to ${DUMP_CONFIG_PATH}"; fi
# set your own cache variables here
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
export SGLANG_DG_CACHE_DIR="/configs/dg-10212025"
export FLASHINFER_WORKSPACE_BASE="/configs/flashinfer-cache"
# we have to install pre-release cutedsl for an integer overflow fix
python3 -m pip install --no-cache-dir --upgrade --pre nvidia-cutlass-dsl
SGLANG_NVFP4_CKPT_FP8_GEMM_IN_ATTN=1 \
SGLANG_PER_TOKEN_GROUP_QUANT_8BIT_V2=1 \
......@@ -141,7 +164,7 @@ elif [ "$mode" = "decode" ]; then
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_HACK_SEQ_BOOTSTRAP_ROOM=1 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=1408 \
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=384 \
SGLANG_CUTEDSL_MOE_NVFP4_DISPATCH=1 \
SGLANG_FP4_GEMM_BACKEND=cutlass \
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
......@@ -165,11 +188,10 @@ elif [ "$mode" = "decode" ]; then
--enable-dp-attention \
--chunked-prefill-size 786432 \
--mem-fraction-static 0.83 \
--enable-ep-moe \
--moe-a2a-backend deepep \
--deepep-mode low_latency \
--ep-dispatch-algorithm static \
--cuda-graph-bs 1408 \
--cuda-graph-bs 384 \
--num-reserved-decode-tokens 112 \
--ep-num-redundant-experts 32 \
--eplb-algorithm deepseek \
......@@ -184,6 +206,7 @@ elif [ "$mode" = "decode" ]; then
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--ep-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-single-batch-overlap \
--enable-dp-attention \
......
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Simple agg script (not an optimized config)
# Print the usage text for the aggregated launcher and terminate with status 1.
print_usage() {
  cat <<EOF
Usage: $0

This script runs aggregated mode (single dynamo.sglang instance)
EOF
  exit 1
}
echo "Mode: aggregated"
echo "Command: dynamo"
# Every variable listed here must be set and non-empty; the first one that is
# missing aborts the script with status 1.
for required_var in HOST_IP_MACHINE PORT TOTAL_GPUS RANK TOTAL_NODES; do
  if [ -z "${!required_var}" ]; then
    echo "Error: ${required_var} environment variable is not set"
    exit 1
  fi
done
# Construct command suffix for config dump
# When DUMP_CONFIG_PATH is set, the suffix appends --dump-config-to <path> to
# the launch command; otherwise it stays empty.
command_suffix=""
if [[ -n "${DUMP_CONFIG_PATH}" ]]; then command_suffix="--dump-config-to ${DUMP_CONFIG_PATH}"; fi
# Trace the final command (with all variables resolved) into the log.
set -x
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
export SGLANG_DG_CACHE_DIR="/configs/dg-10212025"
export FLASHINFER_WORKSPACE_BASE="/configs/flashinfer-cache"
# Launch a single aggregated dynamo.sglang instance. The VAR=value prefixes
# apply only to this one command. TOTAL_GPUS is used for both the tensor- and
# data-parallel size. ${command_suffix} is deliberately unquoted so it splits
# into separate arguments (or vanishes when empty).
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
MC_TE_METRIC=true \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
MC_FORCE_MNNVL=1 \
NCCL_MNNVL_ENABLE=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m dynamo.sglang \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--skip-tokenizer-init \
--trust-remote-code \
--dist-init-addr "$HOST_IP_MACHINE:$PORT" \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--host 0.0.0.0 \
--max-running-requests 30000 \
--context-length 2200 \
--disable-radix-cache \
--moe-a2a-backend deepep \
--load-balance-method round_robin \
--deepep-mode normal \
--ep-dispatch-algorithm dynamic \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--eplb-algorithm deepseek \
--attention-backend trtllm_mla \
--kv-cache-dtype fp8_e4m3 \
--watchdog-timeout 1000000 \
--disable-cuda-graph \
--chunked-prefill-size 131072 \
--max-total-tokens 524288 \
--deepep-config /configs/deepep_config.json \
--stream-interval 50 \
--mem-fraction-static 0.75 ${command_suffix}
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Low Latency Config
# Function to print usage
# Print the usage text (accepted modes plus examples) and terminate with status 1.
print_usage() {
  printf '%s\n' \
    "Usage: $0 <mode>" \
    " mode: prefill or decode" \
    "" \
    "Examples:" \
    " $0 prefill" \
    " $0 decode"
  exit 1
}
# Exactly one positional argument (the mode) is accepted.
if (( $# != 1 )); then
  echo "Error: Expected 1 argument, got $#"
  print_usage
fi
# The single argument selects which launch command is built below.
mode=$1
# Reject anything other than the two supported modes.
case "$mode" in
  prefill | decode) ;;
  *)
    echo "Error: mode must be 'prefill' or 'decode', got '$mode'"
    print_usage
    ;;
esac
echo "Mode: $mode"
echo "Command: dynamo"
# Every variable listed here must be set and non-empty; the first one that is
# missing aborts the script with status 1.
for required_var in HOST_IP_MACHINE PORT TOTAL_GPUS RANK TOTAL_NODES USE_INIT_LOCATIONS RUN_IN_CI; do
  if [ -z "${!required_var}" ]; then
    echo "Error: ${required_var} environment variable is not set"
    exit 1
  fi
done
# Construct command based on mode
# Both branches follow the same sequence: optionally install the dynamo wheels
# (CI only), export cache/timeout settings, build an optional argument suffix,
# then launch dynamo.sglang with mode-specific flags.
if [ "$mode" = "prefill" ]; then
set -x
# In CI the dynamo wheels are installed from the mounted /configs directory.
if [[ "${RUN_IN_CI,,}" == "true" ]]; then
python3 -m pip install /configs/ai_dynamo_runtime-0.6.1-cp310-abi3-manylinux_2_28_aarch64.whl
python3 -m pip install /configs/ai_dynamo-0.6.1-py3-none-any.whl
fi
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
export SGLANG_DG_CACHE_DIR="/configs/dg-10212025"
# Optional trailing arguments: a precomputed expert-location file when
# USE_INIT_LOCATIONS is "true" (case-insensitive), and a config dump path when
# DUMP_CONFIG_PATH is set.
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix="--init-expert-location /configs/prefill_dsr1-0528_in1000out1000_num40000.json"; fi
if [[ -n "${DUMP_CONFIG_PATH}" ]]; then command_suffix="${command_suffix} --dump-config-to ${DUMP_CONFIG_PATH}"; fi
# Prefill worker launch. The VAR=value prefixes apply only to this command;
# ${command_suffix} is deliberately unquoted so it splits into arguments.
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
MC_TE_METRIC=true \
SGLANG_ENABLE_FLASHINFER_GEMM=1 \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
MC_FORCE_MNNVL=1 \
NCCL_MNNVL_ENABLE=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGLANG_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m dynamo.sglang \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--trust-remote-code \
--disable-radix-cache \
--moe-dense-tp-size 1 \
--max-running-requests 512 \
--chunked-prefill-size 8192 \
--mem-fraction-static 0.95 \
--cuda-graph-max-bs 128 \
--context-length 2200 \
--kv-cache-dtype fp8_e4m3 \
--quantization fp8 \
--attention-backend trtllm_mla \
--stream-interval 10 \
--max-total-tokens 8192 \
--enable-flashinfer-allreduce-fusion \
--moe-runner-backend flashinfer_trtllm \
--load-balance-method round_robin \
--scheduler-recv-interval 10 \
--enable-symm-mem \
--dist-init-addr "$HOST_IP_MACHINE:$PORT" \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--base-gpu-id 0 \
--disaggregation-mode prefill \
--host 0.0.0.0 \
--tensor-parallel-size "$TOTAL_GPUS" \
--data-parallel-size 1 \
--expert-parallel-size 1 ${command_suffix}
elif [ "$mode" = "decode" ]; then
set -x
# In CI the dynamo wheels are installed from the mounted /configs directory.
if [[ "${RUN_IN_CI,,}" == "true" ]]; then
python3 -m pip install /configs/ai_dynamo_runtime-0.6.1-cp310-abi3-manylinux_2_28_aarch64.whl
python3 -m pip install /configs/ai_dynamo-0.6.1-py3-none-any.whl
fi
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
export SGLANG_DG_CACHE_DIR="/configs/dg-10212025"
# Same optional suffix as the prefill branch, with the decode-specific
# expert-location file.
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix="--init-expert-location /configs/decode_dsr1-0528_loadgen_in1024out1024_num2000_2p12d.json"; fi
if [[ -n "${DUMP_CONFIG_PATH}" ]]; then command_suffix="${command_suffix} --dump-config-to ${DUMP_CONFIG_PATH}"; fi
# Decode worker launch. Compared with the prefill command it additionally sets
# SGLANG_DECODE_BOOTSTRAP_TIMEOUT and SGLANG_HACK_SEQ_BOOTSTRAP_ROOM, uses
# --prefill-round-robin-balance instead of --load-balance-method, and omits
# --max-total-tokens.
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
MC_TE_METRIC=true \
SGLANG_ENABLE_FLASHINFER_GEMM=1 \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_DECODE_BOOTSTRAP_TIMEOUT=1000 \
SGLANG_HACK_SEQ_BOOTSTRAP_ROOM=1 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
MC_FORCE_MNNVL=1 \
NCCL_MNNVL_ENABLE=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGLANG_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m dynamo.sglang \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--trust-remote-code \
--disable-radix-cache \
--moe-dense-tp-size 1 \
--max-running-requests 512 \
--chunked-prefill-size 8192 \
--mem-fraction-static 0.95 \
--cuda-graph-max-bs 128 \
--context-length 2200 \
--kv-cache-dtype fp8_e4m3 \
--quantization fp8 \
--attention-backend trtllm_mla \
--stream-interval 10 \
--enable-flashinfer-allreduce-fusion \
--moe-runner-backend flashinfer_trtllm \
--prefill-round-robin-balance \
--scheduler-recv-interval 10 \
--enable-symm-mem \
--dist-init-addr "$HOST_IP_MACHINE:$PORT" \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--base-gpu-id 0 \
--disaggregation-mode decode \
--host 0.0.0.0 \
--tensor-parallel-size "$TOTAL_GPUS" \
--data-parallel-size 1 \
--expert-parallel-size 1 ${command_suffix}
fi
\ No newline at end of file
......@@ -62,14 +62,24 @@ if [ -z "$USE_INIT_LOCATIONS" ]; then
exit 1
fi
if [ -z "$RUN_IN_CI" ]; then
echo "Error: RUN_IN_CI environment variable is not set"
exit 1
fi
# Construct command based on mode
if [ "$mode" = "prefill" ]; then
set -x
if [[ "${RUN_IN_CI,,}" == "true" ]]; then
python3 -m pip install /configs/ai_dynamo_runtime-0.6.1-cp310-abi3-manylinux_2_28_aarch64.whl
python3 -m pip install /configs/ai_dynamo-0.6.1-py3-none-any.whl
fi
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
export SGL_DG_CACHE_DIR="/configs/dgcache/3p1dcache"
export SGLANG_DG_CACHE_DIR="/configs/dg-10212025"
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix="--init-expert-location /configs/prefill_dsr1-0528_in1000out1000_num40000.json"; fi
if [[ -n "${DUMP_CONFIG_PATH}" ]]; then command_suffix="${command_suffix} --dump-config-to ${DUMP_CONFIG_PATH}"; fi
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
MC_TE_METRIC=true \
......@@ -109,7 +119,8 @@ if [ "$mode" = "prefill" ]; then
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--attention-backend trtllm_mla \
--kv-cache-dtype fp8_e4m3 \
--watchdog-timeout 1000000 \
--disable-cuda-graph \
--chunked-prefill-size 131072 \
......@@ -120,12 +131,16 @@ if [ "$mode" = "prefill" ]; then
elif [ "$mode" = "decode" ]; then
set -x
set -x
if [[ "${RUN_IN_CI,,}" == "true" ]]; then
python3 -m pip install /configs/ai_dynamo_runtime-0.6.1-cp310-abi3-manylinux_2_28_aarch64.whl
python3 -m pip install /configs/ai_dynamo-0.6.1-py3-none-any.whl
fi
export TORCH_DISTRIBUTED_DEFAULT_TIMEOUT=1800
export SGL_DG_CACHE_DIR="/configs/dgcache/3p1dcache"
export SGLANG_DG_CACHE_DIR="/configs/dg-10212025"
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix="--init-expert-location /configs/decode_dsr1-0528_loadgen_in1024out1024_num2000_2p12d.json"; fi
if [[ -n "${DUMP_CONFIG_PATH}" ]]; then command_suffix="${command_suffix} --dump-config-to ${DUMP_CONFIG_PATH}"; fi
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=768 \
......@@ -171,7 +186,8 @@ elif [ "$mode" = "decode" ]; then
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm static \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--attention-backend trtllm_mla \
--kv-cache-dtype fp8_e4m3 \
--watchdog-timeout 1000000 \
--chunked-prefill-size 36864 \
--stream-interval 50 \
......
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Write the command-line synopsis to stdout, then terminate the script with
# exit status 1. Never returns to the caller.
print_usage() {
    printf '%s\n' \
        "Usage: $0 <mode>" \
        "  mode: prefill or decode" \
        "" \
        "Examples:" \
        "  $0 prefill" \
        "  $0 decode"
    exit 1
}
# Validate the command line: exactly one positional argument (the mode) is
# accepted. print_usage prints the synopsis and exits with status 1.
if [ $# -ne 1 ]; then
    echo "Error: Expected 1 argument, got $#"
    print_usage
fi

# Parse arguments
mode=$1
# The launch section further down dispatches on $cmd, but this script never
# assigned it, so neither launcher branch could match and nothing was started.
# Default to the dynamo launcher (what the banner below has always reported)
# while still honouring a value of cmd inherited from the environment.
cmd="${cmd:-dynamo}"

# Validate mode argument
if [ "$mode" != "prefill" ] && [ "$mode" != "decode" ]; then
    echo "Error: mode must be 'prefill' or 'decode', got '$mode'"
    print_usage
fi

echo "Mode: $mode"
echo "Command: $cmd"
# Check if required environment variables are set.
# The names are tested in a fixed order and the script stops with status 1 at
# the first one that is unset or empty. ${!name} is bash indirect expansion:
# it yields the value of the variable whose name is stored in $name.
for required_var in HOST_IP PORT TOTAL_GPUS RANK TOTAL_NODES; do
    if [ -z "${!required_var}" ]; then
        echo "Error: ${required_var} environment variable is not set"
        exit 1
    fi
done
unset required_var
# Construct command based on mode
# Two-level dispatch: $mode selects the prefill or decode role and $cmd selects
# the launcher module. A launcher runs only when $cmd is exactly "dynamo" or
# "sglang"; for any other value (including unset/empty) no branch matches and
# nothing is started.
# All four commands take the rendezvous address from $HOST_IP:$PORT, the node
# count from $TOTAL_NODES, this node's rank from $RANK, and use $TOTAL_GPUS for
# both --tp-size and --dp-size.
if [ "$mode" = "prefill" ]; then
if [ "$cmd" = "dynamo" ]; then
# H100 dynamo prefill command
# Compared with the sglang prefill command below, this one additionally passes
# --skip-tokenizer-init and --disaggregation-bootstrap-port 30001 and does not
# pass --host.
python3 -m dynamo.sglang \
--model-path /model/ \
--served-model-name deepseek-ai/DeepSeek-R1 \
--skip-tokenizer-init \
--disaggregation-mode prefill \
--disaggregation-transfer-backend nixl \
--disaggregation-bootstrap-port 30001 \
--dist-init-addr "$HOST_IP:$PORT" \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--decode-log-interval 1 \
--enable-deepep-moe \
--page-size 1 \
--trust-remote-code \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-radix-cache \
--watchdog-timeout 1000000 \
--enable-two-batch-overlap \
--deepep-mode normal \
--mem-fraction-static 0.85 \
--deepep-config /configs/deepep.json \
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm dynamic \
--eplb-algorithm deepseek
elif [ "$cmd" = "sglang" ]; then
# H100 sglang prefill command
# Launches sglang.launch_server directly and binds with --host 0.0.0.0.
python3 -m sglang.launch_server \
--model-path /model/ \
--served-model-name deepseek-ai/DeepSeek-R1 \
--disaggregation-transfer-backend nixl \
--disaggregation-mode prefill \
--dist-init-addr "$HOST_IP:$PORT" \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--decode-log-interval 1 \
--enable-deepep-moe \
--page-size 1 \
--host 0.0.0.0 \
--trust-remote-code \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-radix-cache \
--watchdog-timeout 1000000 \
--enable-two-batch-overlap \
--deepep-mode normal \
--mem-fraction-static 0.85 \
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm dynamic \
--eplb-algorithm deepseek \
--deepep-config /configs/deepep.json
fi
elif [ "$mode" = "decode" ]; then
# Both decode commands differ from their prefill counterparts in the same way:
# --deepep-mode low_latency instead of normal, --mem-fraction-static 0.835
# instead of 0.85, an added --cuda-graph-bs 128, and no --deepep-config,
# --ep-dispatch-algorithm or --eplb-algorithm.
if [ "$cmd" = "dynamo" ]; then
# H100 dynamo decode command
python3 -m dynamo.sglang \
--model-path /model/ \
--served-model-name deepseek-ai/DeepSeek-R1 \
--skip-tokenizer-init \
--disaggregation-mode decode \
--disaggregation-transfer-backend nixl \
--disaggregation-bootstrap-port 30001 \
--dist-init-addr "$HOST_IP:$PORT" \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--decode-log-interval 1 \
--enable-deepep-moe \
--page-size 1 \
--trust-remote-code \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-radix-cache \
--watchdog-timeout 1000000 \
--enable-two-batch-overlap \
--deepep-mode low_latency \
--mem-fraction-static 0.835 \
--ep-num-redundant-experts 32 \
--cuda-graph-bs 128
elif [ "$cmd" = "sglang" ]; then
# H100 sglang decode command
# NOTE(review): unlike the other three commands, this one passes no
# --served-model-name; confirm that the omission is intentional.
python3 -m sglang.launch_server \
--model-path /model/ \
--disaggregation-transfer-backend nixl \
--disaggregation-mode decode \
--dist-init-addr "$HOST_IP:$PORT" \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--decode-log-interval 1 \
--enable-deepep-moe \
--page-size 1 \
--host 0.0.0.0 \
--trust-remote-code \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-radix-cache \
--watchdog-timeout 1000000 \
--enable-two-batch-overlap \
--deepep-mode low_latency \
--mem-fraction-static 0.835 \
--ep-num-redundant-experts 32 \
--cuda-graph-bs 128
fi
fi
......@@ -4,14 +4,16 @@
n_prefill=$1
n_decode=$2
total_gpus=$3
prefill_gpus=$3
decode_gpus=$4
total_gpus=$((prefill_gpus+decode_gpus))
chosen_isl=$4
chosen_osl=$5
concurrency_list=$6
chosen_isl=$5
chosen_osl=$6
concurrency_list=$7
IFS='x' read -r -a chosen_concurrencies <<< "$concurrency_list"
chosen_req_rate=$7
chosen_req_rate=$8
echo "Config ${chosen_isl}; ${chosen_osl}; ${chosen_concurrencies[@]}; ${chosen_req_rate}"
......
......@@ -380,7 +380,13 @@ async def async_request_dynamo_completions(
if not chunk_bytes:
continue
chunk = chunk_bytes.decode("utf-8").removeprefix("data: ")
chunk = chunk_bytes.decode("utf-8")
# Skip SSE event/comment lines (not data)
if chunk.startswith("event:") or chunk.startswith(":"):
continue
chunk = chunk.removeprefix("data: ")
if chunk != "[DONE]":
data = json.loads(chunk)
......
......@@ -11,17 +11,19 @@ head_port=8000
n_prefill=$1
n_decode=$2
total_gpus=$3
prefill_gpus=$3
decode_gpus=$4
total_gpus=$((prefill_gpus+decode_gpus))
source /scripts/benchmark_utils.sh
work_dir="/scripts/vllm/"
cd $work_dir
chosen_isl=$4
chosen_osl=$5
concurrency_list=$6
chosen_isl=$5
chosen_osl=$6
concurrency_list=$7
IFS='x' read -r -a chosen_concurrencies <<< "$concurrency_list"
chosen_req_rate=$7
chosen_req_rate=$8
echo "Config ${chosen_isl}; ${chosen_osl}; ${chosen_concurrencies[@]}; ${chosen_req_rate}"
......@@ -32,28 +34,35 @@ wait_for_model_report_interval=60 # wait_for_model report interval -> 60s
wait_for_model $head_node $head_port $n_prefill $n_decode $wait_for_model_check_interval $wait_for_model_timeout $wait_for_model_report_interval
set -e
# Warmup the model
# Warmup the model with a sweep of concurrencies
warmup_isl=$chosen_isl
warmup_osl=$chosen_osl
warmup_prompts=10000
warmup_concurrencies=10000
warmup_req_rate=250
set -x
python3 benchmark_serving.py \
warmup_concurrency_list=(1 4 8 32 64 128 256 512 1024 4096)
for warmup_concurrency in "${warmup_concurrency_list[@]}"
do
echo "Warming up model with concurrency $warmup_concurrency"
echo "$(date '+%Y-%m-%d %H:%M:%S')"
num_prompts=$((warmup_concurrency * 5))
set -x
python3 -u benchmark_serving.py \
--model ${model_name} --tokenizer ${model_path} \
--host $head_node --port $head_port \
--backend "dynamo" --endpoint /v1/completions \
--disable-tqdm \
--dataset-name random \
--num-prompts "$warmup_prompts" \
--num-prompts "$num_prompts" \
--random-input-len $warmup_isl \
--random-output-len $warmup_osl \
--random-range-ratio 0.8 \
--ignore-eos \
--request-rate ${warmup_req_rate} \
--percentile-metrics ttft,tpot,itl,e2el \
--max-concurrency "$warmup_concurrencies"
set +x
--max-concurrency "$warmup_concurrency"
set +x
echo "$(date '+%Y-%m-%d %H:%M:%S')"
done
set +e
result_dir="/logs/vllm_isl_${chosen_isl}_osl_${chosen_osl}"
......@@ -64,10 +73,11 @@ for concurrency in "${chosen_concurrencies[@]}"
do
num_prompts=$((concurrency * 5))
echo "Running benchmark with concurrency: $concurrency and num-prompts: $num_prompts, writing to file ${result_dir}"
result_filename="isl_${chosen_isl}_osl_${chosen_osl}_concurrency_${concurrency}_req_rate_${chosen_req_rate}_gpus${total_gpus}.json"
result_filename="isl_${chosen_isl}_osl_${chosen_osl}_concurrency_${concurrency}_req_rate_${chosen_req_rate}_ctx${prefill_gpus}_gen${decode_gpus}.json"
set -x
python3 benchmark_serving.py \
echo "$(date '+%Y-%m-%d %H:%M:%S')"
python3 -u benchmark_serving.py \
--model ${model_name} --tokenizer ${model_path} \
--host $head_node --port $head_port \
--backend "dynamo" --endpoint /v1/completions \
......@@ -84,6 +94,7 @@ do
--save-result --result-dir $result_dir --result-filename $result_filename
set +x
echo "$(date '+%Y-%m-%d %H:%M:%S')"
echo "Completed benchmark with concurrency: $concurrency"
echo "-----------------------------------------"
done
......
......@@ -155,7 +155,7 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
)
parser.add_argument(
"--worker_type",
choices=["decode", "prefill", "frontend", "nginx"],
choices=["decode", "prefill", "frontend", "nginx", "aggregated"],
required=True,
help="Type of worker to run",
)
......@@ -175,9 +175,14 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
parser.add_argument(
"--gpu_type",
type=str,
choices=["gb200-fp8", "gb200-fp4"],
default="gb200-fp8",
help="Type of GPU to use. You can choose between gb200-fp8 and gb200-fp4.",
help="Type of GPU to use (script will be validated at runtime)",
)
parser.add_argument(
"--script-variant",
type=str,
default="default",
help="Script variant to use (e.g., 'default', 'optim', 'decode-optim'). Defaults to 'default'",
)
parser.add_argument(
......@@ -198,6 +203,19 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
help="Whether we add --init-expert-locations to launch commands",
)
parser.add_argument(
"--dump-config-path",
type=str,
default=None,
help="Path to dump config file (e.g., /logs/node_config.json)",
)
parser.add_argument(
"--run-in-ci",
action="store_true",
help="Run in CI mode - use binaries from /configs/ for nats/etcd",
)
return parser.parse_args(args)
......@@ -236,6 +254,8 @@ def setup_env_vars_for_gpu_script(
total_nodes: int,
port: int = DIST_INIT_PORT,
use_init_locations: bool = True,
dump_config_path: str | None = None,
run_in_ci: bool = False,
):
"""Setup environment variables required by GPU scripts (gb200-fp8.sh)"""
os.environ["HOST_IP_MACHINE"] = host_ip
......@@ -244,6 +264,11 @@ def setup_env_vars_for_gpu_script(
os.environ["RANK"] = str(local_rank)
os.environ["TOTAL_NODES"] = str(total_nodes)
os.environ["USE_INIT_LOCATIONS"] = str(use_init_locations)
os.environ["RUN_IN_CI"] = str(run_in_ci)
if dump_config_path:
os.environ["DUMP_CONFIG_PATH"] = dump_config_path
else:
os.environ.pop("DUMP_CONFIG_PATH", None)
logging.info(f"Set HOST_IP: {host_ip}")
logging.info(f"Set PORT: {port}")
......@@ -251,30 +276,65 @@ def setup_env_vars_for_gpu_script(
logging.info(f"Set RANK: {local_rank}")
logging.info(f"Set TOTAL_NODES: {total_nodes}")
logging.info(f"Set USE_INIT_LOCATIONS: {use_init_locations}")
logging.info(f"Set RUN_IN_CI: {run_in_ci}")
if dump_config_path:
logging.info(f"Set DUMP_CONFIG_PATH: {dump_config_path}")
def get_gpu_command(worker_type: str, gpu_type: str) -> str:
"""Generate command to run the appropriate GPU script"""
script_name = f"{gpu_type}.sh"
script_path = Path(__file__).parent / script_name
mode = worker_type # "prefill" or "decode"
def get_gpu_command(
    worker_type: str, gpu_type: str, script_variant: str = "default"
) -> str:
    """Build the shell command line that launches the GPU script for a worker.

    Scripts are resolved relative to this module's directory as
    ``{gpu_type}/{agg,disagg}/{script_variant}.sh``.

    Args:
        worker_type: ``"aggregated"`` selects the ``agg`` script; any other
            value selects the ``disagg`` script and is passed to it as its
            mode argument.
        gpu_type: Name of the GPU-type directory. For aggregated workers every
            ``-prefill`` / ``-decode`` substring is removed from it first.
        script_variant: Script file name without the ``.sh`` extension.

    Returns:
        ``"bash <script>"`` for aggregated workers, otherwise
        ``"bash <script> <worker_type>"``.

    Raises:
        ValueError: If the resolved script file does not exist.
    """
    scripts_root = Path(__file__).parent
    variant_file = f"{script_variant}.sh"

    if worker_type != "aggregated":
        # Disaggregated mode: {gpu_type}/disagg/{script_variant}.sh {prefill|decode}
        disagg_script = scripts_root / gpu_type / "disagg" / variant_file
        if disagg_script.exists():
            return f"bash {disagg_script} {worker_type}"
        raise ValueError(f"Disaggregated GPU script not found: {disagg_script}")

    # Aggregated scripts live under the GPU type with "-prefill" / "-decode"
    # stripped out of its name.
    gpu_family = gpu_type.replace("-prefill", "").replace("-decode", "")
    agg_script = scripts_root / gpu_family / "agg" / variant_file
    if agg_script.exists():
        return f"bash {agg_script}"
    raise ValueError(f"Aggregated GPU script not found: {agg_script}")
def setup_head_prefill_node(prefill_host_ip: str) -> None:
def setup_head_prefill_node(prefill_host_ip: str, run_in_ci: bool = False) -> None:
"""
Setup NATS, etcd, ingress, and http servers on the prefill host node.
"""
if run_in_ci:
logging.info(
f"Starting nats server on node {prefill_host_ip} (CI mode - using /configs/nats-server)"
)
nats_cmd = "/configs/nats-server -js"
else:
logging.info(f"Starting nats server on node {prefill_host_ip}")
nats_cmd = "nats-server -js"
nats_process = run_command("nats-server -js", background=True)
nats_process = run_command(nats_cmd, background=True)
if not nats_process:
raise RuntimeError("Failed to start nats-server")
if run_in_ci:
logging.info(
f"Starting etcd server on node {prefill_host_ip} (CI mode - using /configs/etcd)"
)
etcd_binary = "/configs/etcd"
else:
logging.info(f"Starting etcd server on node {prefill_host_ip}")
etcd_binary = "etcd"
etcd_cmd = (
f"etcd --listen-client-urls {ETCD_LISTEN_ADDR}:{ETCD_CLIENT_PORT} "
f"{etcd_binary} --listen-client-urls {ETCD_LISTEN_ADDR}:{ETCD_CLIENT_PORT} "
f"--advertise-client-urls {ETCD_LISTEN_ADDR}:{ETCD_CLIENT_PORT} "
f"--listen-peer-urls {ETCD_LISTEN_ADDR}:{ETCD_PEER_PORT} "
f"--initial-cluster default=http://{prefill_host_ip}:{ETCD_PEER_PORT}"
......@@ -296,13 +356,15 @@ def setup_nginx_worker(master_ip: str, nginx_config: str) -> int:
return run_command(nginx_cmd)
def setup_frontend_worker(worker_idx: int, master_ip: str) -> int:
def setup_frontend_worker(
worker_idx: int, master_ip: str, run_in_ci: bool = False
) -> int:
"""Setup a frontend worker"""
logging.info(f"Setting up frontend worker {worker_idx}")
# First frontend (worker_idx 0) also sets up NATS/ETCD
if worker_idx == 0:
setup_head_prefill_node(master_ip)
setup_head_prefill_node(master_ip, run_in_ci)
else:
logging.info(f"Setting up additional frontend worker {worker_idx}")
if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"):
......@@ -310,6 +372,8 @@ def setup_frontend_worker(worker_idx: int, master_ip: str) -> int:
# All frontends run the ingress server
frontend_cmd = "python3 -m dynamo.frontend --http-port=8000"
if run_in_ci:
frontend_cmd = "python3 -m pip install /configs/ai_dynamo_runtime-0.6.1-cp310-abi3-manylinux_2_28_aarch64.whl && python3 -m pip install /configs/ai_dynamo-0.6.1-py3-none-any.whl && python3 -m dynamo.frontend --http-port=8000"
return run_command(frontend_cmd)
......@@ -323,6 +387,9 @@ def setup_prefill_worker(
gpu_type: str,
multiple_frontends_enabled: bool = False,
use_init_locations: bool = True,
dump_config_path: str | None = None,
script_variant: str = "default",
run_in_ci: bool = False,
) -> int:
"""
Setup the prefill worker.
......@@ -330,7 +397,7 @@ def setup_prefill_worker(
total_gpus = nodes_per_worker * gpus_per_node
# Only setup infrastructure in traditional mode (not multiple frontends)
if not multiple_frontends_enabled and worker_idx == 0 and local_rank == 0:
setup_head_prefill_node(master_ip)
setup_head_prefill_node(master_ip, run_in_ci)
else:
logging.info(f"Setting up prefill worker {worker_idx}, local rank {local_rank}")
if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"):
......@@ -343,10 +410,12 @@ def setup_prefill_worker(
total_gpus,
nodes_per_worker,
use_init_locations=use_init_locations,
dump_config_path=dump_config_path,
run_in_ci=run_in_ci,
)
# Use appropriate GPU script instead of generating command directly
cmd_to_run = get_gpu_command("prefill", gpu_type)
cmd_to_run = get_gpu_command("prefill", gpu_type, script_variant)
return run_command(cmd_to_run)
......@@ -359,6 +428,9 @@ def setup_decode_worker(
gpus_per_node: int,
gpu_type: str,
use_init_locations: bool = True,
dump_config_path: str | None = None,
script_variant: str = "default",
run_in_ci: bool = False,
) -> int:
"""
Setup the decode worker.
......@@ -376,10 +448,56 @@ def setup_decode_worker(
total_gpus,
nodes_per_worker,
use_init_locations=use_init_locations,
dump_config_path=dump_config_path,
run_in_ci=run_in_ci,
)
# Use appropriate GPU script instead of generating command directly
cmd_to_run = get_gpu_command("decode", gpu_type)
cmd_to_run = get_gpu_command("decode", gpu_type, script_variant)
return run_command(cmd_to_run)
def setup_aggregated_worker(
    worker_idx: int,
    local_rank: int,
    leader_ip: str,
    master_ip: str,
    nodes_per_worker: int,
    gpus_per_node: int,
    gpu_type: str,
    multiple_frontends_enabled: bool = False,
    dump_config_path: str | None = None,
    script_variant: str = "default",
    run_in_ci: bool = False,
) -> int:
    """Bring up one node of an aggregated worker and run its GPU script.

    Rank 0 of worker 0 starts the shared infrastructure through
    ``setup_head_prefill_node`` unless multiple frontends are enabled; every
    other node waits for etcd on ``master_ip`` instead. The environment for the
    GPU script is then exported (with ``leader_ip`` as the host IP and init
    locations disabled) and the aggregated script for ``gpu_type`` /
    ``script_variant`` is executed.

    Returns:
        Whatever ``run_command`` returns for the GPU script command.

    Raises:
        RuntimeError: If etcd cannot be reached on a node that waits for it.
    """
    starts_infrastructure = (
        worker_idx == 0 and local_rank == 0 and not multiple_frontends_enabled
    )

    if starts_infrastructure:
        # Traditional mode: the very first node hosts the shared services.
        setup_head_prefill_node(master_ip, run_in_ci)
    else:
        logging.info(
            f"Setting up aggregated worker {worker_idx}, local rank {local_rank}"
        )
        etcd_url = f"http://{master_ip}:{ETCD_CLIENT_PORT}"
        if not wait_for_etcd(etcd_url):
            raise RuntimeError("Failed to connect to etcd")

    # The worker's leader node is the host IP handed to the GPU script;
    # aggregated mode never uses init locations.
    setup_env_vars_for_gpu_script(
        leader_ip,
        local_rank,
        nodes_per_worker * gpus_per_node,
        nodes_per_worker,
        use_init_locations=False,
        dump_config_path=dump_config_path,
        run_in_ci=run_in_ci,
    )

    launch_cmd = get_gpu_command("aggregated", gpu_type, script_variant)
    return run_command(launch_cmd)
......@@ -410,6 +528,7 @@ def main(input_args: list[str] | None = None):
logging.info(f"Leader IP: {args.leader_ip}")
logging.info(f"Master IP: {args.master_ip}")
logging.info(f"Nodes per worker: {args.nodes_per_worker}")
logging.info(f"Run in CI mode?: {args.run_in_ci}")
logging.info(f"Use init locations?: {args.use_init_locations}")
setup_env(args.master_ip)
......@@ -419,7 +538,7 @@ def main(input_args: list[str] | None = None):
raise ValueError("--nginx_config is required for nginx worker type")
setup_nginx_worker(args.master_ip, args.nginx_config)
elif args.worker_type == "frontend":
setup_frontend_worker(args.worker_idx, args.master_ip)
setup_frontend_worker(args.worker_idx, args.master_ip, args.run_in_ci)
elif args.worker_type == "prefill":
setup_prefill_worker(
args.worker_idx,
......@@ -431,6 +550,9 @@ def main(input_args: list[str] | None = None):
args.gpu_type,
args.multiple_frontends_enabled,
args.use_init_locations,
args.dump_config_path,
args.script_variant,
args.run_in_ci,
)
elif args.worker_type == "decode":
setup_decode_worker(
......@@ -442,6 +564,23 @@ def main(input_args: list[str] | None = None):
args.gpus_per_node,
args.gpu_type,
args.use_init_locations,
args.dump_config_path,
args.script_variant,
args.run_in_ci,
)
elif args.worker_type == "aggregated":
setup_aggregated_worker(
args.worker_idx,
args.local_rank,
args.leader_ip,
args.master_ip,
args.nodes_per_worker,
args.gpus_per_node,
args.gpu_type,
args.multiple_frontends_enabled,
args.dump_config_path,
args.script_variant,
args.run_in_ci,
)
logging.info(f"{args.worker_type.capitalize()} worker setup complete")
......
......@@ -62,6 +62,7 @@ ISL=$6
OSL=$7
CONCURRENCIES=$8
REQUEST_RATE=$9
SCRIPT_VARIANT=${10}
RETRIES=1 # defaults to retry the job 1 time to avoid transient errors
......@@ -74,10 +75,14 @@ if [[ $PREFILL_NODES -eq 6 ]] && [[ $PREFILL_WORKERS -eq 3 ]] && [[ $DECODE_NODE
USE_INIT_LOCATIONS=(--use-init-location)
fi
SCRIPT_VARIANT_ARGS=()
if [[ -n "$SCRIPT_VARIANT" ]]; then
SCRIPT_VARIANT_ARGS=(--script-variant "$SCRIPT_VARIANT")
fi
command=(
python3 submit_job_script.py
--account $SLURM_ACCOUNT --partition $SLURM_PARTITION --time-limit $TIME_LIMIT
--template job_script_template.j2
--model-dir $MODEL_PATH --config-dir $CONFIG_DIR
--container-image $CONTAINER_IMAGE
......@@ -90,6 +95,9 @@ command=(
--profiler "${profiler_args}"
--retries $RETRIES
--run-in-ci
${SCRIPT_VARIANT_ARGS[@]}
)
"${command[@]}"
......@@ -19,24 +19,27 @@ Script to generate SLURM job scripts from Jinja2 templates.
import argparse
import logging
import os
import pathlib
import subprocess
import tempfile
from datetime import datetime
from jinja2 import Template
def print_welcome_message(job_ids: list[str]):
def print_welcome_message(job_ids: list[str], log_dir_name: str):
"""Print a clean welcome message with job information."""
job_id = f"<{', '.join(job_ids)}>"
_ = f"{', '.join(job_ids)}"
print(
f"""
🚀 Welcome! We hope you enjoy your time on our GB200 NVL72.
Your logs for this submitted job will be available in logs/{job_id}
Your logs for this submitted job will be available in logs/{log_dir_name}
You can access them by running:
cd logs/{job_id}
cd logs/{log_dir_name}
You can view all of the prefill/decode worker logs by running:
......@@ -71,7 +74,7 @@ def generate_job_script(template_path, output_path, **kwargs):
with open(output_path, "w") as f:
f.write(rendered_script)
return output_path
return output_path, rendered_script
def submit_job(job_script_path, extra_slurm_args=[]):
......@@ -105,13 +108,36 @@ def submit_job(job_script_path, extra_slurm_args=[]):
raise
def _get_available_gpu_types() -> list[str]:
"""Discover available GPU types by scanning scripts directory structure.
Looks for scripts in: scripts/{gpu_type}/{agg,disagg}/*.sh
"""
script_dir = pathlib.Path(__file__).parent / "scripts"
gpu_types = set()
# Scan for GPU type directories (directories that contain agg/ or disagg/)
for gpu_dir in script_dir.iterdir():
if not gpu_dir.is_dir():
continue
# Check if this directory has agg/ or disagg/ subdirectories
has_agg = (gpu_dir / "agg").is_dir()
has_disagg = (gpu_dir / "disagg").is_dir()
if has_agg or has_disagg:
gpu_types.add(gpu_dir.name)
return sorted(list(gpu_types))
def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Generate and submit SLURM job scripts"
)
parser.add_argument(
"--template", required=True, help="Path to Jinja2 template file"
)
# Get available GPU types dynamically
available_gpu_types = _get_available_gpu_types()
# Template parameters
parser.add_argument("--job-name", default="dynamo_setup", help="SLURM job name")
......@@ -123,16 +149,22 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
"--time-limit", default="04:00:00", help="Time limit (HH:MM:SS)"
)
parser.add_argument(
"--prefill-nodes", type=int, default=2, help="Number of prefill nodes"
"--prefill-nodes", type=int, default=None, help="Number of prefill nodes"
)
parser.add_argument(
"--decode-nodes", type=int, default=2, help="Number of decode nodes"
"--decode-nodes", type=int, default=None, help="Number of decode nodes"
)
parser.add_argument(
"--prefill-workers", type=int, default=1, help="Number of prefill workers"
"--prefill-workers", type=int, default=None, help="Number of prefill workers"
)
parser.add_argument(
"--decode-workers", type=int, default=1, help="Number of decode workers"
"--decode-workers", type=int, default=None, help="Number of decode workers"
)
parser.add_argument(
"--agg-nodes", type=int, default=None, help="Number of aggregated worker nodes"
)
parser.add_argument(
"--agg-workers", type=int, default=None, help="Number of aggregated workers"
)
parser.add_argument(
"--gpus-per-node", type=int, default=8, help="Number of GPUs per node"
......@@ -142,9 +174,15 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
)
parser.add_argument(
"--gpu-type",
choices=["gb200-fp8", "gb200-fp4"],
default="gb200-fp8",
help="GPU type to use. You can choose between gb200-fp8 and gb200-fp4.",
choices=available_gpu_types,
default=available_gpu_types[0] if available_gpu_types else None,
help=f"GPU type to use. Available types: {', '.join(available_gpu_types)}",
)
parser.add_argument(
"--script-variant",
type=str,
default="default",
help="Script variant to use (e.g., 'default', 'optim', 'decode-optim'). Defaults to 'default.sh'",
)
parser.add_argument(
......@@ -191,31 +229,137 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
help="Tries to launch the job multiple times to catch transient errors",
)
parser.add_argument(
"--disable-config-dump",
action="store_false",
dest="enable_config_dump",
default=True,
help="Disable dumping config to file on each node (default: config dump is enabled)",
)
parser.add_argument(
"--run-in-ci",
action="store_true",
help="Run in CI mode - use binaries from /configs/ for nats/etcd and install dynamo wheel",
)
return parser.parse_args(args)
def main(input_args: list[str] | None = None):
setup_logging()
args = _parse_command_line_args(input_args)
def _validate_args(args: argparse.Namespace) -> None:
"""Validate arguments and ensure aggregated and disaggregated args are mutually exclusive."""
has_disagg_args = any(
[
args.prefill_nodes is not None,
args.decode_nodes is not None,
args.prefill_workers is not None,
args.decode_workers is not None,
]
)
has_agg_args = any(
[
args.agg_nodes is not None,
args.agg_workers is not None,
]
)
# Validation
if has_disagg_args and has_agg_args:
raise ValueError(
"Cannot specify both aggregated (--agg-nodes, --agg-workers) and "
"disaggregated (--prefill-nodes, --decode-nodes, --prefill-workers, --decode-workers) arguments"
)
if has_disagg_args:
# Validate disaggregated args
if args.prefill_nodes is None or args.decode_nodes is None:
raise ValueError(
"Disaggregated mode requires both --prefill-nodes and --decode-nodes"
)
if args.prefill_workers is None or args.decode_workers is None:
raise ValueError(
"Disaggregated mode requires both --prefill-workers and --decode-workers"
)
if args.prefill_nodes % args.prefill_workers != 0:
raise ValueError(
f"Prefill nodes ({args.prefill_nodes}) must be divisible by prefill workers ({args.prefill_workers})"
)
if args.decode_nodes % args.decode_workers != 0:
raise ValueError(
f"Decode nodes ({args.decode_nodes}) must be divisible by decode workers ({args.decode_workers})"
)
# Validate GPU script exists for disaggregated mode
script_dir = pathlib.Path(__file__).parent / "scripts"
disagg_dir = script_dir / args.gpu_type / "disagg"
# Use script variant (defaults to "default")
script_name = f"{args.script_variant}.sh"
gpu_script = disagg_dir / script_name
if not gpu_script.exists():
raise ValueError(
f"Disaggregated GPU script not found: {gpu_script}. Available GPU types: {', '.join(_get_available_gpu_types())}"
)
if has_agg_args:
# Validate aggregated args
if args.agg_nodes is None or args.agg_workers is None:
raise ValueError(
"Aggregated mode requires both --agg-nodes and --agg-workers"
)
if args.agg_nodes % args.agg_workers != 0:
raise ValueError(
f"Aggregated nodes ({args.agg_nodes}) must be divisible by aggregated workers ({args.agg_workers})"
)
# Validate aggregated GPU script exists
script_dir = pathlib.Path(__file__).parent / "scripts"
# Remove any -prefill or -decode suffix if present
base_gpu_type = args.gpu_type.replace("-prefill", "").replace("-decode", "")
agg_dir = script_dir / base_gpu_type / "agg"
# Use script variant (defaults to "default")
script_name = f"{args.script_variant}.sh"
agg_gpu_script = agg_dir / script_name
if not agg_gpu_script.exists():
raise ValueError(
f"Aggregated GPU script not found: {agg_gpu_script}. Available GPU types: {', '.join(_get_available_gpu_types())}"
)
if not has_disagg_args and not has_agg_args:
raise ValueError(
"Must specify either aggregated (--agg-nodes, --agg-workers) or "
"disaggregated (--prefill-nodes, --decode-nodes, --prefill-workers, --decode-workers) arguments"
)
def main(input_args: list[str] | None = None):
# Entry point: parse and validate the command-line arguments, pick the
# aggregated or disaggregated .j2 template, render it into a temporary
# sbatch script, submit it (plus optional retry jobs), and save a copy of the
# rendered script in a per-job directory under logs/.
#
# input_args: argument list forwarded to _parse_command_line_args; defaults
# to None.
#
# NOTE(review): this text looks like a rendered diff with the +/- markers and
# indentation stripped. Several statements appear twice (an older form
# followed by its replacement) and two "......@@" markers elide code.
# Confirm against the real file before relying on any duplicated line.
setup_logging()
args = _parse_command_line_args(input_args)
# Validate arguments
_validate_args(args)
# Determine mode and set defaults
# Aggregated mode is selected purely by args.agg_nodes being set; the counts
# belonging to the other mode are zeroed so the template always receives
# integers for every key.
is_aggregated = args.agg_nodes is not None
if is_aggregated:
agg_nodes = args.agg_nodes
agg_workers = args.agg_workers
prefill_nodes = 0
decode_nodes = 0
prefill_workers = 0
decode_workers = 0
total_nodes = agg_nodes
else:
prefill_nodes = args.prefill_nodes
decode_nodes = args.decode_nodes
prefill_workers = args.prefill_workers
decode_workers = args.decode_workers
agg_nodes = 0
agg_workers = 0
total_nodes = prefill_nodes + decode_nodes
# Validation for multiple frontends
if args.enable_multiple_frontends:
if args.num_additional_frontends < 0:
raise ValueError("Number of additional frontends cannot be negative")
# NOTE(review): the next line recomputes total_nodes from the raw
# args.prefill_nodes/args.decode_nodes and would override the mode-aware
# value chosen above (it would not reflect agg_nodes in aggregated mode).
# It is probably a removed pre-change line left over from the diff view —
# confirm.
total_nodes = args.prefill_nodes + args.decode_nodes
# parse profiler configs
profiler_config = {}
if args.profiler:
......@@ -248,21 +392,34 @@ def main(input_args: list[str] | None = None):
else:
assert False, profiler_config["type"]
# Generate timestamp for log directory naming
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Select template based on mode
if is_aggregated:
template_path = "job_script_template_agg.j2"
else:
template_path = "job_script_template_disagg.j2"
# Variables handed to the template renderer.
template_vars = {
"job_name": args.job_name,
"total_nodes": total_nodes,
"account": args.account,
"time_limit": args.time_limit,
# NOTE(review): the four args.* entries below are repeated right after with
# the mode-aware locals. In a dict literal the later duplicate key wins, so
# these look like pre-change diff lines — confirm.
"prefill_nodes": args.prefill_nodes,
"decode_nodes": args.decode_nodes,
"prefill_workers": args.prefill_workers,
"decode_workers": args.decode_workers,
"prefill_nodes": prefill_nodes,
"decode_nodes": decode_nodes,
"prefill_workers": prefill_workers,
"decode_workers": decode_workers,
"agg_nodes": agg_nodes,
"agg_workers": agg_workers,
"is_aggregated": is_aggregated,
"model_dir": args.model_dir,
"config_dir": args.config_dir,
"container_image": args.container_image,
"gpus_per_node": args.gpus_per_node,
"network_interface": args.network_interface,
"gpu_type": args.gpu_type,
"script_variant": args.script_variant,
"partition": args.partition,
"enable_multiple_frontends": args.enable_multiple_frontends,
"num_additional_frontends": args.num_additional_frontends,
......@@ -270,27 +427,80 @@ def main(input_args: list[str] | None = None):
"do_profile": profiler_config["type"] != "manual",
"profiler_type": profiler_config["type"],
"profiler_arg": parsable_config,
"timestamp": timestamp,
"enable_config_dump": args.enable_config_dump,
"run_in_ci": args.run_in_ci,
}
# NOTE(review): the two lines below (context-managed temp file rendered from
# args.template) appear to be the pre-change form of the block that follows
# — confirm.
with tempfile.NamedTemporaryFile(mode="w", suffix=".sh") as temp_file:
generate_job_script(args.template, temp_file.name, **template_vars)
# Create temporary file for sbatch script
# delete=False plus an immediate close() leaves an empty file on disk that is
# addressed by path from here on; it is removed in the finally block below.
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False)
temp_path = temp_file.name
temp_file.close()
try:
# generate_job_script returns a pair; the second element is the rendered
# script text, which is later written to sbatch_script.sh.
_, rendered_script = generate_job_script(
template_path, temp_path, **template_vars
)
submitted_job_ids = []
# NOTE(review): two consecutive submit_job calls follow (temp_file.name, then
# temp_path); the first is probably a pre-change diff line. If both were
# live, the job would be submitted twice — confirm.
job_id = submit_job(temp_file.name, args.extra_slurm_args)
job_id = submit_job(temp_path, args.extra_slurm_args)
submitted_job_ids.append(job_id)
# Create log directory with new naming format IMMEDIATELY after submission
# SLURM will write log.out/log.err to this directory when job starts
# The aggregated name follows the logs/%j_<agg_workers>A_<timestamp> pattern
# used by the #SBATCH --output/--error lines of the aggregated template.
if is_aggregated:
log_dir_name = f"{job_id}_{agg_workers}A_{timestamp}"
else:
log_dir_name = f"{job_id}_{prefill_workers}P_{decode_workers}D_{timestamp}"
# Relative path: "logs" is resolved against the current working directory.
log_dir_path = os.path.join("logs", log_dir_name)
os.makedirs(log_dir_path, exist_ok=True)
# Save rendered sbatch script
sbatch_script_path = os.path.join(log_dir_path, "sbatch_script.sh")
with open(sbatch_script_path, "w") as f:
f.write(rendered_script)
logging.info(f"Saved rendered sbatch script to {sbatch_script_path}")
# retries logic
if args.retries > 0:
# Drop every extra Slurm argument containing the substring "dependency" so
# the retry jobs carry only the dependency built below.
extra_slurm_args_without_dependencies = [
x for x in args.extra_slurm_args if "dependency" not in x
]
for _ in range(args.retries):
# Each retry depends (afternotok) on every job submitted so far, including
# earlier retries.
# NOTE(review): the one-line and multi-line forms of the `dependencies`
# assignment below are equivalent; the first is presumably the pre-change
# line.
dependencies = ",".join([f"afternotok:{job}" for job in submitted_job_ids])
dependencies = ",".join(
[f"afternotok:{job}" for job in submitted_job_ids]
)
slurm_args = extra_slurm_args_without_dependencies + [
f"dependency={dependencies}"
]
# NOTE(review): duplicated submit_job call again (temp_file.name vs
# temp_path); see the note on the first submission.
job_id = submit_job(temp_file.name, slurm_args)
job_id = submit_job(temp_path, slurm_args)
submitted_job_ids.append(job_id)
# NOTE(review): this one-argument print_welcome_message call looks like the
# pre-change form of the two-argument call made after the retry handling —
# confirm.
print_welcome_message(submitted_job_ids)
# Save script for retry job as well
if is_aggregated:
retry_log_dir_name = f"{job_id}_{agg_workers}A_{timestamp}"
else:
retry_log_dir_name = (
f"{job_id}_{prefill_workers}P_{decode_workers}D_{timestamp}"
)
retry_log_dir_path = os.path.join("logs", retry_log_dir_name)
os.makedirs(retry_log_dir_path, exist_ok=True)
retry_sbatch_script_path = os.path.join(
retry_log_dir_path, "sbatch_script.sh"
)
with open(retry_sbatch_script_path, "w") as f:
f.write(rendered_script)
logging.info(
f"Saved rendered sbatch script to {retry_sbatch_script_path}"
)
# log_dir_name is the directory of the FIRST submission; retry directories
# are tracked separately in retry_log_dir_name.
print_welcome_message(submitted_job_ids, log_dir_name)
finally:
# Clean up temporary file
# Best-effort removal: an OSError (e.g. file already gone) is deliberately
# ignored so cleanup never masks an error raised in the try body.
try:
os.unlink(temp_path)
except OSError:
pass
if __name__ == "__main__":
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment