Unverified Commit 4b3a2c1a authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: sglang multiple frontend slurm deployment scripts (#2623)


Signed-off-by: default avatarishandhanani <ishandhanani@gmail.com>
Signed-off-by: default avatarElnifio <elnifio0519@gmail.com>
Co-authored-by: default avatarYunzhou Liu <46603306+Elnifio@users.noreply.github.com>
parent cfb7aed7
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#SBATCH --partition={{ partition }} #SBATCH --partition={{ partition }}
# Constants # Constants
set -x
PREFILL_NODES={{ prefill_nodes }} PREFILL_NODES={{ prefill_nodes }}
DECODE_NODES={{ decode_nodes }} DECODE_NODES={{ decode_nodes }}
PREFILL_WORKERS={{ prefill_workers }} PREFILL_WORKERS={{ prefill_workers }}
...@@ -18,7 +19,7 @@ TOTAL_NODES=$((PREFILL_NODES + DECODE_NODES)) ...@@ -18,7 +19,7 @@ TOTAL_NODES=$((PREFILL_NODES + DECODE_NODES))
GPUS_PER_NODE={{ gpus_per_node }} GPUS_PER_NODE={{ gpus_per_node }}
PREFILL_NODES_PER_WORKER=$((PREFILL_NODES / PREFILL_WORKERS)) PREFILL_NODES_PER_WORKER=$((PREFILL_NODES / PREFILL_WORKERS))
DECODE_NODES_PER_WORKER=$((DECODE_NODES / DECODE_WORKERS)) DECODE_NODES_PER_WORKER=$((DECODE_NODES / DECODE_WORKERS))
LOG_DIR="${SLURM_SUBMIT_DIR}/logs/${SLURM_JOB_ID}/" LOG_DIR="${SLURM_SUBMIT_DIR}/logs/${SLURM_JOB_ID}"
SCRIPT_DIR="${SLURM_SUBMIT_DIR}/scripts" SCRIPT_DIR="${SLURM_SUBMIT_DIR}/scripts"
OUTPUT_DIR="${SLURM_SUBMIT_DIR}/outputs" OUTPUT_DIR="${SLURM_SUBMIT_DIR}/outputs"
MODEL_DIR="{{ model_dir }}" MODEL_DIR="{{ model_dir }}"
...@@ -26,6 +27,7 @@ CONFIG_DIR="{{ config_dir }}" ...@@ -26,6 +27,7 @@ CONFIG_DIR="{{ config_dir }}"
CONTAINER_IMAGE="{{ container_image }}" CONTAINER_IMAGE="{{ container_image }}"
NETWORK_INTERFACE="{{ network_interface }}" NETWORK_INTERFACE="{{ network_interface }}"
GPU_TYPE="{{ gpu_type | default('h100') }}" GPU_TYPE="{{ gpu_type | default('h100') }}"
set +x
{% raw %} {% raw %}
...@@ -42,24 +44,114 @@ for i in "${!nodes[@]}"; do ...@@ -42,24 +44,114 @@ for i in "${!nodes[@]}"; do
echo "Node $i: ${nodes[$i]}" echo "Node $i: ${nodes[$i]}"
done done
# Get IP address of the master node (first prefill node) for NATS/ETCD {% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# Multiple frontend architecture
# Node 0: nginx only
# Node 1: NATS/ETCD + first frontend + prefill worker
# Node 2+: prefill/decode workers + optional additional frontends
NGINX_NODE=${nodes[0]}
MASTER_NODE=${nodes[1]}
MASTER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$MASTER_NODE ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
if [ -z "$MASTER_IP" ]; then
echo "Error: Could not retrieve IP address for master host $MASTER_NODE on interface $NETWORK_INTERFACE"
exit 1
fi
echo "Master IP address (node 1): $MASTER_IP"
echo "Nginx node (node 0): $NGINX_NODE"
# Generate frontend IP list for nginx config
frontend_hosts=()
frontend_ips=()
# Node 1 always has a frontend (with NATS/ETCD)
frontend_hosts+=("$MASTER_NODE")
frontend_ips+=("$MASTER_IP")
# Add additional frontends based on num_additional_frontends
{% endraw %}ADDITIONAL_FRONTENDS={{ num_additional_frontends }}{% raw %}
if [ "$ADDITIONAL_FRONTENDS" -gt 0 ]; then
# Calculate which nodes get additional frontends
# We have TOTAL_NODES prefill/decode nodes, distribute additional frontends across them
nodes_per_frontend=$(( (TOTAL_NODES - 1 + ADDITIONAL_FRONTENDS - 1) / ADDITIONAL_FRONTENDS )) # ceil division
frontend_node_idx=2 # Start from node 2 (node 1 already has frontend)
for i in $(seq 1 $ADDITIONAL_FRONTENDS); do
if [ $frontend_node_idx -lt $TOTAL_NODES ]; then
node_name=${nodes[$frontend_node_idx]}
node_ip=$(srun --nodes=1 --ntasks=1 --nodelist=$node_name ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
frontend_hosts+=("$node_name")
frontend_ips+=("$node_ip")
echo "Additional frontend $i on node $frontend_node_idx: $node_name ($node_ip)"
frontend_node_idx=$((frontend_node_idx + nodes_per_frontend))
fi
done
fi
echo "Frontend hosts: ${frontend_hosts[@]}"
echo "Frontend IPs: ${frontend_ips[@]}"
# Generate nginx configuration
# The frontend IPs collected in the frontend_ips bash array are serialized as a
# Python list literal (for example ['10.0.0.1','10.0.0.2']) and handed to the
# embedded Python script through the FRONTEND_LIST environment variable.
FRONTEND_LIST=$(printf "'%s'," "${frontend_ips[@]}")
FRONTEND_LIST="[${FRONTEND_LIST%,}]"
export FRONTEND_LIST SCRIPT_DIR LOG_DIR
# Render ${SCRIPT_DIR}/nginx.conf.j2 into ${LOG_DIR}/nginx.conf.
python3 - <<'PY'
import ast
import os
from jinja2 import Template

template_path = os.path.join(os.environ['SCRIPT_DIR'], 'nginx.conf.j2')
output_path = os.path.join(os.environ['LOG_DIR'], 'nginx.conf')
with open(template_path, 'r') as f:
    tmpl = Template(f.read())
# literal_eval only accepts Python literals, so a malformed or hostile
# FRONTEND_LIST value raises an error instead of being executed as code.
frontend_hosts = ast.literal_eval(os.environ['FRONTEND_LIST'])
config = tmpl.render(frontend_hosts=frontend_hosts)
with open(output_path, 'w') as f:
    f.write(config)
PY
{% endraw %}
{% else %}
{% raw %}
# Traditional architecture - first prefill node handles everything
MASTER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=${nodes[0]} ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1) MASTER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=${nodes[0]} ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
if [ -z "$MASTER_IP" ]; then if [ -z "$MASTER_IP" ]; then
echo "Error: Could not retrieve IP address for master host ${nodes[0]} on interface $NETWORK_INTERFACE" echo "Error: Could not retrieve IP address for master host ${nodes[0]} on interface $NETWORK_INTERFACE"
exit 1 exit 1
fi fi
echo "Master IP address: $MASTER_IP" echo "Master IP address: $MASTER_IP"
{% endraw %}
{% endif %}
{% raw %}
# Compute leader nodes for each worker # Compute leader nodes for each worker
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# With multiple frontends: keep offset 0; nginx coexists on node 0
WORKER_NODE_OFFSET=0
{% endraw %}
{% else %}
{% raw %}
# Traditional: workers start from node 0
WORKER_NODE_OFFSET=0
{% endraw %}
{% endif %}
{% raw %}
prefill_leaders=() prefill_leaders=()
for i in $(seq 0 $((PREFILL_WORKERS - 1))); do for i in $(seq 0 $((PREFILL_WORKERS - 1))); do
leader_idx=$((i * PREFILL_NODES_PER_WORKER)) leader_idx=$((WORKER_NODE_OFFSET + i * PREFILL_NODES_PER_WORKER))
prefill_leaders[$i]=$leader_idx prefill_leaders[$i]=$leader_idx
done done
decode_leaders=() decode_leaders=()
for i in $(seq 0 $((DECODE_WORKERS - 1))); do for i in $(seq 0 $((DECODE_WORKERS - 1))); do
leader_idx=$((PREFILL_NODES + i * DECODE_NODES_PER_WORKER)) leader_idx=$((WORKER_NODE_OFFSET + PREFILL_NODES + i * DECODE_NODES_PER_WORKER))
decode_leaders[$i]=$leader_idx decode_leaders[$i]=$leader_idx
done done
...@@ -76,6 +168,57 @@ ENROOT_ARGS="\ ...@@ -76,6 +168,57 @@ ENROOT_ARGS="\
# Build common worker arguments # Build common worker arguments
WORKER_ARGS="--gpu_type ${GPU_TYPE} --gpus_per_node ${GPUS_PER_NODE} --master_ip ${MASTER_IP}" WORKER_ARGS="--gpu_type ${GPU_TYPE} --gpus_per_node ${GPUS_PER_NODE} --master_ip ${MASTER_IP}"
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# Add multiple frontends flag for worker setup
WORKER_ARGS="$WORKER_ARGS --multiple-frontends-enabled"
{% endraw %}
{% endif %}
{% if use_init_location %}
{% raw %}
# Add init expert locations flag for worker setup
WORKER_ARGS="$WORKER_ARGS --use_init_locations"
{% endraw %}
{% endif %}
{% raw %}
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
# Launch nginx on node 0
echo "Launching nginx on ${NGINX_NODE}"
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$NGINX_NODE --output=${LOG_DIR}/${NGINX_NODE}_nginx.out --error=${LOG_DIR}/${NGINX_NODE}_nginx.err python /scripts/worker_setup.py --worker_type nginx --nginx_config /logs/nginx.conf ${WORKER_ARGS}"
echo "$cmd"
$cmd &

# Launch frontend on master node (node 1) - this will also start NATS/ETCD
# stdout and stderr share the same "_frontend_0" stem, matching the naming
# used for the additional frontends below.
echo "Launching frontend + NATS/ETCD on master node ${MASTER_NODE}"
cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$MASTER_NODE --output=${LOG_DIR}/${MASTER_NODE}_frontend_0.out --error=${LOG_DIR}/${MASTER_NODE}_frontend_0.err python /scripts/worker_setup.py --worker_type frontend --worker_idx 0 ${WORKER_ARGS}"
echo "$cmd"
$cmd &

# Launch additional frontends on designated nodes
if [ "$ADDITIONAL_FRONTENDS" -gt 0 ]; then
    frontend_idx=1 # Start from 1 since node 1 is frontend 0
    # This stride must be the same ceil-division that was used when the
    # frontend_hosts/frontend_ips lists for the nginx config were built;
    # otherwise nginx proxies to nodes on which no frontend was started.
    nodes_per_frontend=$(( (TOTAL_NODES - 1 + ADDITIONAL_FRONTENDS - 1) / ADDITIONAL_FRONTENDS ))
    frontend_node_idx=2
    for i in $(seq 1 $ADDITIONAL_FRONTENDS); do
        # Candidates past the last allocated node are silently skipped.
        if [ $frontend_node_idx -lt $TOTAL_NODES ]; then
            node=${nodes[$frontend_node_idx]}
            echo "Launching additional frontend $frontend_idx on node $frontend_node_idx: $node"
            cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_frontend_${frontend_idx}.out --error=${LOG_DIR}/${node}_frontend_${frontend_idx}.err python /scripts/worker_setup.py --worker_type frontend --worker_idx ${frontend_idx} ${WORKER_ARGS}"
            echo "$cmd"
            $cmd &
            frontend_idx=$((frontend_idx + 1))
            frontend_node_idx=$((frontend_node_idx + nodes_per_frontend))
        fi
    done
fi
{% endraw %}
{% endif %}
{% raw %}
# Launch prefill workers # Launch prefill workers
for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do
...@@ -83,7 +226,7 @@ for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do ...@@ -83,7 +226,7 @@ for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do
leader_node=${nodes[$leader_idx]} leader_node=${nodes[$leader_idx]}
# Get leader IP for this worker group # Get leader IP for this worker group
LEADER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$leader_node ip route get $(getent ahosts $leader_node | grep STREAM | head -1 | awk '{print $1}') | awk '{for(i=1;i<=NF;i++) if($i=="src") print $(i+1)}') LEADER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$leader_node ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
echo "Prefill worker $worker_idx leader: $leader_node ($LEADER_IP)" echo "Prefill worker $worker_idx leader: $leader_node ($LEADER_IP)"
# Launch all nodes for this worker # Launch all nodes for this worker
...@@ -94,7 +237,7 @@ for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do ...@@ -94,7 +237,7 @@ for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do
echo "Launching prefill worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node" echo "Launching prefill worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node"
cmd="srun $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_prefill_w${worker_idx}.out --error=${LOG_DIR}/${node}_prefill_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${PREFILL_NODES_PER_WORKER} --worker_type prefill --gpu_utilization_log /logs/${node}_prefill_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}" cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_prefill_w${worker_idx}.out --error=${LOG_DIR}/${node}_prefill_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${PREFILL_NODES_PER_WORKER} --worker_type prefill --gpu_utilization_log /logs/${node}_prefill_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}"
echo "$cmd" echo "$cmd"
$cmd & $cmd &
done done
...@@ -106,7 +249,7 @@ for worker_idx in $(seq 0 $((DECODE_WORKERS - 1))); do ...@@ -106,7 +249,7 @@ for worker_idx in $(seq 0 $((DECODE_WORKERS - 1))); do
leader_node=${nodes[$leader_idx]} leader_node=${nodes[$leader_idx]}
# Get leader IP for this worker group # Get leader IP for this worker group
LEADER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$leader_node ip route get $(getent ahosts $leader_node | grep STREAM | head -1 | awk '{print $1}') | awk '{for(i=1;i<=NF;i++) if($i=="src") print $(i+1)}') LEADER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$leader_node ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
echo "Decode worker $worker_idx leader: $leader_node ($LEADER_IP)" echo "Decode worker $worker_idx leader: $leader_node ($LEADER_IP)"
# Launch all nodes for this worker # Launch all nodes for this worker
...@@ -117,22 +260,50 @@ for worker_idx in $(seq 0 $((DECODE_WORKERS - 1))); do ...@@ -117,22 +260,50 @@ for worker_idx in $(seq 0 $((DECODE_WORKERS - 1))); do
echo "Launching decode worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node" echo "Launching decode worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node"
cmd="srun $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_decode_w${worker_idx}.out --error=${LOG_DIR}/${node}_decode_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${DECODE_NODES_PER_WORKER} --worker_type decode --gpu_utilization_log /logs/${node}_decode_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}" cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_decode_w${worker_idx}.out --error=${LOG_DIR}/${node}_decode_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${DECODE_NODES_PER_WORKER} --worker_type decode --gpu_utilization_log /logs/${node}_decode_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}"
echo "$cmd" echo "$cmd"
$cmd & $cmd &
done done
done done
echo "" echo ""
{% endraw %}
{% if enable_multiple_frontends %}
{% raw %}
echo "Frontend available at: http://${NGINX_NODE}:8000"
echo "To connect to the nginx node:"
echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${NGINX_NODE} --overlap --pty bash"
echo "To connect to the master node (NATS/ETCD):"
echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${MASTER_NODE} --overlap --pty bash"
{% endraw %}
{% else %}
{% raw %}
echo "To connect to the host prefill node:" echo "To connect to the host prefill node:"
echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --overlap --pty bash" echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --overlap --pty bash"
{% endraw %}
{% endif %}
{% raw %}
echo "" echo ""
echo "Make sure to cancel the job at the end:" echo "Make sure to cancel the job at the end:"
echo "scancel $SLURM_JOB_ID" echo "scancel $SLURM_JOB_ID"
# Wait for all tasks to complete # Instead of waiting for all tasks to complete, wait for profile.sh to complete and then exit.
wait
echo "Script finished at $(date)"
{% endraw %} {% endraw %}
PROFILER_TYPE={{ profiler_type }}
PROFILER_ARGS="{{ profiler_arg }}"
{% if do_profile %}
{% raw %}
srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --output=${LOG_DIR}/profile.out --error=${LOG_DIR}/profile.err --overlap bash /scripts/${PROFILER_TYPE}/bench.sh $PREFILL_WORKERS $DECODE_WORKERS ${PROFILER_ARGS} &
{% endraw %}
{% endif %}
{% raw %}
wait -n
first_exit_code=$?
echo "Script finished at $(date) with exit code ${first_exit_code}"
exit $first_exit_code
{% endraw %}
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#######################################
# Poll a server's /health endpoint until it reports "status":"healthy".
# Arguments:
#   $1 - host of the model server
#   $2 - port of the model server
#   $3 - seconds to sleep between polls (default 1)
#   $4 - overall timeout in seconds (default 600)
#   $5 - seconds between progress messages (default 60)
# Outputs:
#   Progress and result messages on stdout.
# Returns:
#   0 once the endpoint is healthy. On timeout the calling shell is
#   terminated with exit status 2 (this is an `exit`, not a `return`).
#######################################
wait_for_model() {
    local model_host=$1
    local model_port=$2
    local poll=${3:-1}
    local timeout=${4:-600}
    local report_every=${5:-60}
    local health_addr="http://${model_host}:${model_port}/health"
    # Declared separately so nothing leaks into the sourcing script.
    local start_ts report_ts time_now curl_result

    echo "Polling ${health_addr} every ${poll} seconds"
    start_ts=$(date +%s)
    report_ts=$start_ts

    while :; do
        # Bound each request so a connection that never answers cannot stall
        # the loop past the overall timeout. A failed request is treated as
        # "not healthy yet" rather than as an error.
        curl_result=$(curl --max-time 30 "${health_addr}" 2>/dev/null) || curl_result=""
        if [[ $curl_result == *'"status":"healthy"'* ]]; then
            echo "Model is alive. Health response: ${curl_result}; "
            return 0
        fi

        time_now=$(date +%s)
        if (( time_now - start_ts >= timeout )); then
            echo "Model did not get healthy in ${timeout} seconds"
            exit 2
        fi
        if (( time_now - report_ts >= report_every )); then
            echo "Waiting for model to come alive. Current result: ${curl_result}"
            report_ts=$time_now
        fi
        sleep "$poll"
    done
}
#######################################
# Send one warm-up load to the service with sglang.bench_serving.
# Arguments:
#   $1 - service host
#   $2 - service port
#   $3 - served model name (passed as --model)
#   $4 - tokenizer/model path (passed as --tokenizer)
#   $5 - config string "ISLxOSLxNUM_PROMPTSxCONCURRENCYxREQUEST_RATE"
# Outputs:
#   The command line on stdout, followed by the benchmark's own output.
# Returns:
#   Exit status of the benchmark command.
#######################################
warmup_model() {
    # Everything is local so the caller's variables of the same name
    # (concurrency, num_prompts, command, ...) are not overwritten.
    local service_host=$1
    local service_port=$2
    local served_model_name=$3
    local model_path=$4
    local config=$5
    local -a config_list=()
    local -a cmd=()

    # Fields of the config string are separated by the letter "x".
    IFS='x' read -r -a config_list <<< "$config"
    local isl=${config_list[0]}
    local osl=${config_list[1]}
    local num_prompts=${config_list[2]}
    local concurrency=${config_list[3]}
    local request_rate=${config_list[4]}

    cmd=(
        python3 -m sglang.bench_serving
        --base-url "http://${service_host}:${service_port}"
        --model "${served_model_name}" --tokenizer "${model_path}"
        --backend sglang-oai
        --dataset-name random --random-input "${isl}" --random-output "${osl}"
        --random-range-ratio 1
        --num-prompts "${num_prompts}" --request-rate "${request_rate}" --max-concurrency "${concurrency}"
    )
    echo "Config ${config}. Running command ${cmd[*]}"
    # Quoted expansion keeps each argument intact even if it contains spaces.
    "${cmd[@]}"
}
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Positional arguments:
#   $1 - number of prefill workers
#   $2 - number of decode workers
#   $3 - input sequence length (ISL)
#   $4 - output sequence length (OSL)
#   $5 - concurrency levels joined by "x", e.g. "1x8x64"
prefill_workers=$1
decode_workers=$2
chosen_isl=$3
chosen_osl=$4
chosen_concurrencies=$5

echo "Profiling for model with PrefillDP=${prefill_workers}, DecodeDP=${decode_workers}"

head_node="localhost"
head_port="8000"

SERVED_MODEL_NAME="deepseek-ai/DeepSeek-R1"
MODEL_PATH=/model/

# One seed is drawn per script run and reused for every concurrency level.
# (A python-generated seed used to be computed here and was immediately
# overwritten by $RANDOM, so only the $RANDOM assignment is kept.)
random_seed=$RANDOM
echo "Chosen random seed ${random_seed}"

# Provides wait_for_model and warmup_model.
source /scripts/benchmark_utils.sh

wait_for_model "$head_node" "$head_port" 5 2400 60

# A failed warm-up aborts the script; later steps manage errexit themselves.
set -e
warmup_model "$head_node" "$head_port" "$SERVED_MODEL_NAME" "$MODEL_PATH" "${chosen_isl}x${chosen_osl}x10000x10000x250"
set +e

# Largest of DP / prefill / decode worker counts; unset values count as 0.
genai_perf_warmup_workers=$(python3 -c "print(max(${DP:-0}, ${prefill_workers:-0}, ${decode_workers:-0}))")

IFS='x' read -r -a concurrency_list <<< "$chosen_concurrencies"

profile_folder="/logs/gap_isl_${chosen_isl}_osl_${chosen_osl}"
mkdir -p "$profile_folder"

# genai-perf artifacts are written to a scratch directory first; only the
# *_genai_perf.json summaries are copied into the profile folder.
tmp_work_dir=$(mktemp -d -t genai-perf-XXXXXXXX)

# Slashes in the model name are replaced so it can be used in a file name.
export_model_name=${SERVED_MODEL_NAME//\//_}

for concurrency in "${concurrency_list[@]}"; do
    export_folder="${tmp_work_dir}/concurrency_${concurrency}"
    mkdir -p "$export_folder"
    export_file="${export_model_name}_generation_${concurrency}.json"

    echo "Run benchmark for concurrency $concurrency; ISL $chosen_isl; OSL $chosen_osl"

    command=(
        genai-perf profile
        -m "${SERVED_MODEL_NAME}"
        --tokenizer "${MODEL_PATH}"
        --endpoint-type chat
        --endpoint /v1/chat/completions
        --url "${head_node}:${head_port}"
        --streaming
        --concurrency "${concurrency}"
        --warmup-request-count $(( 2*genai_perf_warmup_workers ))
        --request-count $(( 5*concurrency ))
        --synthetic-input-tokens-mean "${chosen_isl}" --synthetic-input-tokens-stddev 0
        --output-tokens-mean "${chosen_osl}" --output-tokens-stddev 0
        --extra-inputs "max_tokens:${chosen_osl}" --extra-inputs "min_tokens:${chosen_osl}"
        --artifact-dir "${export_folder}"
        --profile-export-file "${export_file}"
        --random-seed "${random_seed}"
        --tokenizer-trust-remote-code
        --num-dataset-entries 3000
        --
        --max-threads "${concurrency}"
    )

    # Stop the whole sweep if a benchmark run fails.
    set -e
    "${command[@]}"
    set +e

    cp "$export_folder"/*/*_genai_perf.json "$profile_folder"
done
...@@ -57,11 +57,20 @@ if [ -z "$TOTAL_NODES" ]; then ...@@ -57,11 +57,20 @@ if [ -z "$TOTAL_NODES" ]; then
exit 1 exit 1
fi fi
if [ -z "$USE_INIT_LOCATIONS" ]; then
echo "Error: USE_INIT_LOCATIONS environment variable is not set"
exit 1
fi
# Construct command based on mode # Construct command based on mode
if [ "$mode" = "prefill" ]; then if [ "$mode" = "prefill" ]; then
# GB200 dynamo prefill command # GB200 dynamo prefill command
set -x
# SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=2048 \
# Optional trailing arguments for the prefill launch command. Reset first so a
# value inherited from the environment never leaks into the command line
# (the decode branch initialises its suffix the same way).
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix="--init-expert-location /configs/prefill_dsr1-0528_in1000out1000_num40000.json"; fi
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \ DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=2048 \
MC_TE_METRIC=true \ MC_TE_METRIC=true \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \ SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \ SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
...@@ -87,12 +96,12 @@ if [ "$mode" = "prefill" ]; then ...@@ -87,12 +96,12 @@ if [ "$mode" = "prefill" ]; then
--dp-size "$TOTAL_GPUS" \ --dp-size "$TOTAL_GPUS" \
--enable-dp-attention \ --enable-dp-attention \
--host 0.0.0.0 \ --host 0.0.0.0 \
--decode-log-interval 1 \ --decode-log-interval 1000 \
--max-running-requests 12288 \ --max-running-requests 12288 \
--context-length 9600 \ --context-length 9600 \
--disable-radix-cache \ --disable-radix-cache \
--enable-deepep-moe \ --enable-deepep-moe \
--deepep-mode low_latency \ --deepep-mode normal \
--ep-dispatch-algorithm dynamic \ --ep-dispatch-algorithm dynamic \
--moe-dense-tp-size 1 \ --moe-dense-tp-size 1 \
--enable-dp-lm-head \ --enable-dp-lm-head \
...@@ -101,15 +110,18 @@ if [ "$mode" = "prefill" ]; then ...@@ -101,15 +110,18 @@ if [ "$mode" = "prefill" ]; then
--eplb-algorithm deepseek \ --eplb-algorithm deepseek \
--attention-backend cutlass_mla \ --attention-backend cutlass_mla \
--watchdog-timeout 1000000 \ --watchdog-timeout 1000000 \
--init-expert-location /configs/prefill_dsr1-0528_in1000out1000_num40000.json \
--disable-cuda-graph \ --disable-cuda-graph \
--chunked-prefill-size 16384 \ --chunked-prefill-size 131072 \
--max-total-tokens 65536 \ --max-total-tokens 524288 \
--deepep-config /configs/deepep_config.json \ --deepep-config /configs/deepep_config.json \
--stream-interval 50 \ --stream-interval 50 \
--log-level debug --log-level debug ${command_suffix}
elif [ "$mode" = "decode" ]; then elif [ "$mode" = "decode" ]; then
set -x
command_suffix=""
if [[ "${USE_INIT_LOCATIONS,,}" == "true" ]]; then command_suffix="--init-expert-location /configs/decode_dsr1-0528_loadgen_in1024out1024_num2000_2p12d.json"; fi
# GB200 dynamo decode command # GB200 dynamo decode command
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \ DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=512 \ SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=512 \
...@@ -139,7 +151,7 @@ elif [ "$mode" = "decode" ]; then ...@@ -139,7 +151,7 @@ elif [ "$mode" = "decode" ]; then
--dp-size "$TOTAL_GPUS" \ --dp-size "$TOTAL_GPUS" \
--enable-dp-attention \ --enable-dp-attention \
--host 0.0.0.0 \ --host 0.0.0.0 \
--decode-log-interval 1 \ --decode-log-interval 1000 \
--max-running-requests 36864 \ --max-running-requests 36864 \
--context-length 9600 \ --context-length 9600 \
--disable-radix-cache \ --disable-radix-cache \
...@@ -155,8 +167,7 @@ elif [ "$mode" = "decode" ]; then ...@@ -155,8 +167,7 @@ elif [ "$mode" = "decode" ]; then
--eplb-algorithm deepseek \ --eplb-algorithm deepseek \
--attention-backend cutlass_mla \ --attention-backend cutlass_mla \
--watchdog-timeout 1000000 \ --watchdog-timeout 1000000 \
--init-expert-location /configs/decode_dsr1-0528_loadgen_in1024out1024_num2000_2p12d.json \
--chunked-prefill-size 36864 \ --chunked-prefill-size 36864 \
--stream-interval 50 \ --stream-interval 50 \
--mem-fraction-static 0.82 --mem-fraction-static 0.82 ${command_suffix}
fi fi
# Jinja2 template for the nginx configuration that spreads incoming requests
# across the frontends. It is rendered with one variable, `frontend_hosts`.
worker_processes auto;
http {
access_log off;
# Defines the group of servers to which NGINX will proxy requests.
# NGINX will cycle through these servers in a round-robin fashion by default.
# One `server` entry is emitted per item of `frontend_hosts`, each on port 8000.
upstream backend_servers {
{% for frontend_host in frontend_hosts %}
server {{ frontend_host }}:8000;
{% endfor %}
}
# The main server block that listens for incoming traffic.
server {
listen 8000; # Listen on port 8000 for incoming HTTP requests.
location / {
# Pass the request to the upstream group defined above.
proxy_pass http://backend_servers;
# Response buffering is turned off and both proxy timeouts are set to 24h.
# NOTE(review): presumably for long-running streamed responses - confirm.
proxy_buffering off;
proxy_read_timeout 24h;
proxy_send_timeout 24h;
}
}
}
events {
#
# Determines how many clients will be served by each worker process.
# (Max clients = worker_connections * worker_processes)
# Should be equal to `ulimit -n / worker_processes`
#
worker_connections 65535;
#
# Let each process accept multiple connections.
# Accept as many connections as possible, after nginx gets notification
# about a new connection.
# May flood worker_connections, if that option is set too low.
#
multi_accept on;
#
# Preferred connection method for newer linux versions.
# Essential for linux, optimized to serve many clients with each thread.
#
use epoll;
}
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Positional arguments (arguments 1 and 2 are not read by this script):
#   $3 - input sequence length (ISL)
#   $4 - output sequence length (OSL)
#   $5 - concurrency levels joined by "x", e.g. "1x8x64"
#   $6 - request rate
chosen_isl=$3
chosen_osl=$4
concurrency_list=$5
IFS='x' read -r -a chosen_concurrencies <<< "$concurrency_list"
chosen_req_rate=$6

echo "Config ${chosen_isl}; ${chosen_osl}; ${chosen_concurrencies[*]}; ${chosen_req_rate}"

head_node="localhost"
head_port="8000"

SERVED_MODEL_NAME="deepseek-ai/DeepSeek-R1"
MODEL_PATH=/model/

# Provides wait_for_model and warmup_model.
source /scripts/benchmark_utils.sh

wait_for_model "$head_node" "$head_port" 5 2400 60

# Fixed 5-minute pause after the health check succeeds.
# NOTE(review): presumably gives the workers time to settle - confirm.
sleep 300

# A failed warm-up aborts the script; benchmark runs below do not.
set -e
warmup_model "$head_node" "$head_port" "$SERVED_MODEL_NAME" "$MODEL_PATH" "${chosen_isl}x${chosen_osl}x10000x10000x${chosen_req_rate}"
set +e

profile_folder="/logs/sglang_isl_${chosen_isl}_osl_${chosen_osl}"
mkdir -p "$profile_folder"

for max_concurrency in "${chosen_concurrencies[@]}"; do
    # Each level sends five requests per concurrent slot.
    chosen_n_requests=$((5*max_concurrency))
    export_file="${profile_folder}/concurrency_${max_concurrency}_req_rate_${chosen_req_rate}.json"

    command=(
        python3 -m sglang.bench_serving
        --base-url "http://${head_node}:${head_port}"
        --model "${SERVED_MODEL_NAME}" --tokenizer "${MODEL_PATH}"
        --backend sglang-oai
        --dataset-name random --random-input "${chosen_isl}" --random-output "${chosen_osl}"
        --random-range-ratio 1
        --num-prompts "${chosen_n_requests}" --request-rate "${chosen_req_rate}" --max-concurrency "${max_concurrency}"
        --output-file "$export_file"
    )

    echo "Running command ${command[*]}"
    "${command[@]}"
    echo "-----------------------------------------"
done
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
### Benchmark configuration and setup

# Benchmarking script setup - ISL/OSL/concurrencies/request_rate
chosen_isl=1024
chosen_osl=1024
chosen_req_rate=250
chosen_concurrencies=(2 10 20 50 100 200 500 1000 2000 2500 3000 3500 4000 4500 5000 7500 10000 12500 15000 16250 17500 18750 20000)

# Model config setup - frontend URL, model name, and path
head_node="localhost"
head_port="8000"
SERVED_MODEL_NAME="deepseek-ai/DeepSeek-R1"
MODEL_PATH=/model/

# This file contains `wait_for_model` and `warmup_model`
source /scripts/benchmark_utils.sh

### Benchmark runs
# 1. wait for model to come alive - `wait_for_model`
# 2. warms up the model - `warmup_model`
# 3. benchmark model - for concurrency in concurrencies; do <benchmark script>; done

wait_for_model "$head_node" "$head_port" 5 2400 60

# A failed warm-up aborts the script; benchmark runs below do not.
set -e
warmup_model "$head_node" "$head_port" "$SERVED_MODEL_NAME" "$MODEL_PATH" "${chosen_isl}x${chosen_osl}x10000x10000x${chosen_req_rate}"
set +e

for max_concurrency in "${chosen_concurrencies[@]}"; do
    # Each level sends five requests per concurrent slot.
    chosen_n_requests=$((5*max_concurrency))

    command=(
        python3 -m sglang.bench_serving
        --base-url "http://${head_node}:${head_port}"
        --model "${SERVED_MODEL_NAME}" --tokenizer "${MODEL_PATH}"
        --backend sglang-oai
        --dataset-name random --random-input "${chosen_isl}" --random-output "${chosen_osl}"
        --random-range-ratio 1
        --num-prompts "${chosen_n_requests}" --request-rate "${chosen_req_rate}" --max-concurrency "${max_concurrency}"
    )

    echo "Running command ${command[*]}"
    "${command[@]}"
    echo "-----------------------------------------"
done
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Example script adapted from https://github.com/kedarpotdar-nv/bench_serving/tree/dynamo-fix.
model_name="deepseek-ai/DeepSeek-R1"
model_path="/model/"
head_node="localhost"
head_port=8000

# Provides wait_for_model and warmup_model.
source /scripts/benchmark_utils.sh

# benchmark_serving.py is invoked by relative path below, so the script must
# be running from this directory; stop early if it cannot be entered.
work_dir="/scripts/vllm/"
cd "$work_dir" || { echo "Error: cannot cd to ${work_dir}" >&2; exit 1; }

# Positional arguments (arguments 1 and 2 are not read by this script):
#   $3 - input sequence length (ISL)
#   $4 - output sequence length (OSL)
#   $5 - concurrency levels joined by "x", e.g. "1x8x64"
#   $6 - request rate
chosen_isl=$3
chosen_osl=$4
concurrency_list=$5
IFS='x' read -r -a chosen_concurrencies <<< "$concurrency_list"
chosen_req_rate=$6

echo "Config ${chosen_isl}; ${chosen_osl}; ${chosen_concurrencies[*]}; ${chosen_req_rate}"

wait_for_model "$head_node" "$head_port" 5 2400 60

# A failed warm-up aborts the script.
set -e
warmup_model "$head_node" "$head_port" "$model_name" "$model_path" "${chosen_isl}x${chosen_osl}x10000x10000x${chosen_req_rate}"
set +e

result_dir="/logs/vllm_isl_${chosen_isl}_osl_${chosen_osl}"
mkdir -p "$result_dir"

# From here on any failing benchmark run stops the sweep.
set -e
for concurrency in "${chosen_concurrencies[@]}"
do
    # Each level sends five requests per concurrent slot.
    num_prompts=$((concurrency * 5))
    echo "Running benchmark with concurrency: $concurrency and num-prompts: $num_prompts, writing to file ${result_dir}"
    result_filename="isl_${chosen_isl}_osl_${chosen_osl}_concurrency_${concurrency}_req_rate_${chosen_req_rate}.json"

    # Trace only the benchmark invocation itself.
    set -x
    python3 benchmark_serving.py \
        --model "${model_name}" --tokenizer "${model_path}" \
        --host "$head_node" --port "$head_port" \
        --backend "dynamo" --endpoint /v1/chat/completions \
        --disable-tqdm \
        --dataset-name random \
        --num-prompts "$num_prompts" \
        --random-input-len "$chosen_isl" \
        --random-output-len "$chosen_osl" \
        --random-range-ratio 0.8 \
        --ignore-eos \
        --request-rate "${chosen_req_rate}" \
        --percentile-metrics ttft,tpot,itl,e2el \
        --max-concurrency "$concurrency" \
        --save-result --result-dir "$result_dir" --result-filename "$result_filename"
    set +x

    echo "Completed benchmark with concurrency: $concurrency"
    echo "-----------------------------------------"
done
set +e
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# pytest: skip-file
import argparse
import os
from typing import Any, Dict, List
def convert_to_pytorch_benchmark_format(
    args: argparse.Namespace, metrics: Dict[str, List], extra_info: Dict[str, Any]
) -> List:
    """
    Build benchmark records in the layout used by the PyTorch OSS benchmark
    database, one record per metric.
    https://github.com/pytorch/pytorch/wiki/How-to-integrate-with-PyTorch-OSS-benchmark-database

    Args:
        args: Parsed command-line namespace; ``args.model`` becomes the model
            name and ``vars(args)`` is attached to every record.
        metrics: Mapping of metric name to its list of benchmark values.
        extra_info: Extra information attached to each record's metric entry.

    Returns:
        A list with one record per entry of ``metrics``, or an empty list when
        the SAVE_TO_PYTORCH_BENCHMARK_FORMAT environment variable is unset or
        empty.
    """
    # Conversion is opt-in through the environment.
    if not os.environ.get("SAVE_TO_PYTORCH_BENCHMARK_FORMAT", False):
        return []

    return [
        {
            "benchmark": {
                "name": "vLLM benchmark",
                "extra_info": {
                    "args": vars(args),
                },
            },
            "model": {
                "name": args.model,
            },
            "metric": {
                "name": metric_name,
                "benchmark_values": values,
                "extra_info": extra_info,
            },
        }
        for metric_name, values in metrics.items()
    ]
...@@ -126,7 +126,7 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac ...@@ -126,7 +126,7 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
parser.add_argument( parser.add_argument(
"--leader_ip", "--leader_ip",
type=str, type=str,
required=True, required=False,
help="IP address of the leader node for this worker group", help="IP address of the leader node for this worker group",
) )
parser.add_argument( parser.add_argument(
...@@ -138,24 +138,24 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac ...@@ -138,24 +138,24 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
parser.add_argument( parser.add_argument(
"--worker_idx", "--worker_idx",
type=int, type=int,
required=True, required=False,
help="Index of the worker group (0-based)", help="Index of the worker group (0-based)",
) )
parser.add_argument( parser.add_argument(
"--local_rank", "--local_rank",
type=int, type=int,
required=True, required=False,
help="Local rank within the worker group (0 for leader)", help="Local rank within the worker group (0 for leader)",
) )
parser.add_argument( parser.add_argument(
"--nodes_per_worker", "--nodes_per_worker",
type=int, type=int,
required=True, required=False,
help="Number of nodes per worker", help="Number of nodes per worker",
) )
parser.add_argument( parser.add_argument(
"--worker_type", "--worker_type",
choices=["decode", "prefill"], choices=["decode", "prefill", "frontend", "nginx"],
required=True, required=True,
help="Type of worker to run", help="Type of worker to run",
) )
...@@ -180,18 +180,40 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac ...@@ -180,18 +180,40 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
help="Type of GPU to use", help="Type of GPU to use",
) )
parser.add_argument(
"--nginx_config",
type=str,
help="Path to nginx configuration file (required for nginx worker type)",
)
parser.add_argument(
"--multiple-frontends-enabled",
action="store_true",
help="Whether multiple frontend architecture is enabled (affects infrastructure setup)",
)
parser.add_argument(
"--use_init_locations",
action="store_true",
help="Whether we add --init-expert-locations to launch commands",
)
return parser.parse_args(args) return parser.parse_args(args)
def _validate_args(args: argparse.Namespace) -> None: def _validate_args(args: argparse.Namespace) -> None:
"""Validate command line arguments""" """Validate command line arguments"""
if args.worker_idx < 0: if args.worker_type in ["prefill", "decode"]:
raise ValueError("Worker index must be non-negative") if args.worker_idx is None or args.worker_idx < 0:
raise ValueError(
"Worker index must be provided and non-negative for prefill/decode"
)
if args.local_rank < 0: if args.worker_type in ["prefill", "decode"]:
if args.local_rank is None or args.local_rank < 0:
raise ValueError("Local rank must be non-negative") raise ValueError("Local rank must be non-negative")
if args.nodes_per_worker < 1: if args.nodes_per_worker is None or args.nodes_per_worker < 1:
raise ValueError("Nodes per worker must be at least 1") raise ValueError("Nodes per worker must be at least 1")
if args.gpus_per_node < 1: if args.gpus_per_node < 1:
...@@ -202,6 +224,10 @@ def _validate_args(args: argparse.Namespace) -> None: ...@@ -202,6 +224,10 @@ def _validate_args(args: argparse.Namespace) -> None:
f"Local rank ({args.local_rank}) must be less than nodes per worker ({args.nodes_per_worker})" f"Local rank ({args.local_rank}) must be less than nodes per worker ({args.nodes_per_worker})"
) )
# Validate nginx-specific arguments
if args.worker_type == "nginx" and not args.nginx_config:
raise ValueError("--nginx_config is required for nginx worker type")
def setup_env_vars_for_gpu_script( def setup_env_vars_for_gpu_script(
host_ip: str, host_ip: str,
...@@ -209,6 +235,7 @@ def setup_env_vars_for_gpu_script( ...@@ -209,6 +235,7 @@ def setup_env_vars_for_gpu_script(
total_gpus: int, total_gpus: int,
total_nodes: int, total_nodes: int,
port: int = DIST_INIT_PORT, port: int = DIST_INIT_PORT,
use_init_locations: bool = True,
): ):
"""Setup environment variables required by GPU scripts (h100.sh, gb200-fp8.sh, gb200-fp4.sh)""" """Setup environment variables required by GPU scripts (h100.sh, gb200-fp8.sh, gb200-fp4.sh)"""
os.environ["HOST_IP"] = host_ip os.environ["HOST_IP"] = host_ip
...@@ -216,12 +243,14 @@ def setup_env_vars_for_gpu_script( ...@@ -216,12 +243,14 @@ def setup_env_vars_for_gpu_script(
os.environ["TOTAL_GPUS"] = str(total_gpus) os.environ["TOTAL_GPUS"] = str(total_gpus)
os.environ["RANK"] = str(local_rank) os.environ["RANK"] = str(local_rank)
os.environ["TOTAL_NODES"] = str(total_nodes) os.environ["TOTAL_NODES"] = str(total_nodes)
os.environ["USE_INIT_LOCATIONS"] = str(use_init_locations)
logging.info(f"Set HOST_IP: {host_ip}") logging.info(f"Set HOST_IP: {host_ip}")
logging.info(f"Set PORT: {port}") logging.info(f"Set PORT: {port}")
logging.info(f"Set TOTAL_GPUS: {total_gpus}") logging.info(f"Set TOTAL_GPUS: {total_gpus}")
logging.info(f"Set RANK: {local_rank}") logging.info(f"Set RANK: {local_rank}")
logging.info(f"Set TOTAL_NODES: {total_nodes}") logging.info(f"Set TOTAL_NODES: {total_nodes}")
logging.info(f"Set USE_INIT_LOCATIONS: {use_init_locations}")
def get_gpu_command(worker_type: str, gpu_type: str) -> str: def get_gpu_command(worker_type: str, gpu_type: str) -> str:
...@@ -255,20 +284,33 @@ def setup_head_prefill_node(prefill_host_ip: str) -> None: ...@@ -255,20 +284,33 @@ def setup_head_prefill_node(prefill_host_ip: str) -> None:
if not etcd_process: if not etcd_process:
raise RuntimeError("Failed to start etcd") raise RuntimeError("Failed to start etcd")
logging.info(f"Starting ingress server on node {prefill_host_ip}")
ingress_process = run_command(
"python3 -m dynamo.frontend --http-port=8000", background=True
)
if not ingress_process:
raise RuntimeError("Failed to start ingress")
def setup_nginx_worker(master_ip: str, nginx_config: str) -> int:
    """
    Install nginx and start it as the load balancer using the given config.

    Args:
        master_ip: Not used by this function; accepted so the call shape
            matches the other setup_* helpers.
        nginx_config: Path to the nginx configuration file. Must exist.

    Returns:
        Whatever run_command returns for the launch command.

    Raises:
        ValueError: If nginx_config is empty or does not exist on disk.
    """
    import shlex  # local import: only this helper needs shell quoting

    logging.info("Setting up nginx load balancer")

    if not nginx_config or not os.path.exists(nginx_config):
        raise ValueError(f"Nginx config file not found: {nginx_config}")

    # The command is a `&&` chain, so it is presumably run through a shell by
    # run_command (verify there). Quote the path so spaces or shell
    # metacharacters in it cannot split or alter the command. Paths made only
    # of safe characters are left unchanged by shlex.quote.
    quoted_config = shlex.quote(nginx_config)

    # `sleep 86400` follows the nginx start, presumably to keep this step
    # alive after nginx daemonizes (24 hours) - confirm against the job script.
    nginx_cmd = (
        "apt-get update && apt-get install -y nginx "
        f"&& nginx -c {quoted_config} && sleep 86400"
    )
    return run_command(nginx_cmd)
def setup_frontend_worker(worker_idx: int, master_ip: str) -> int:
    """
    Bring up one frontend (ingress) worker.

    Frontend 0 first calls setup_head_prefill_node(master_ip), which sets up
    the shared NATS/ETCD infrastructure. Every other frontend instead waits
    until etcd on master_ip is reachable. In both cases the dynamo frontend
    HTTP server is then launched on port 8000.

    Returns:
        The result of run_command for the frontend launch command.

    Raises:
        RuntimeError: If a non-zero frontend cannot reach etcd.
    """
    logging.info(f"Setting up frontend worker {worker_idx}")

    is_head_frontend = worker_idx == 0
    if is_head_frontend:
        # The head frontend owns the infrastructure bring-up.
        setup_head_prefill_node(master_ip)
    else:
        logging.info(f"Setting up additional frontend worker {worker_idx}")
        etcd_url = f"http://{master_ip}:{ETCD_CLIENT_PORT}"
        etcd_ready = wait_for_etcd(etcd_url)
        if not etcd_ready:
            raise RuntimeError("Failed to connect to etcd")

    # Every frontend, head or not, serves the ingress endpoint.
    return run_command("python3 -m dynamo.frontend --http-port=8000")
def setup_prefill_worker( def setup_prefill_worker(
...@@ -279,24 +321,29 @@ def setup_prefill_worker( ...@@ -279,24 +321,29 @@ def setup_prefill_worker(
nodes_per_worker: int, nodes_per_worker: int,
gpus_per_node: int, gpus_per_node: int,
gpu_type: str, gpu_type: str,
multiple_frontends_enabled: bool = False,
use_init_locations: bool = True,
) -> int: ) -> int:
""" """
Setup the prefill worker. Setup the prefill worker.
""" """
total_gpus = nodes_per_worker * gpus_per_node total_gpus = nodes_per_worker * gpus_per_node
# Only setup infrastructure in traditional mode (not multiple frontends)
# Only the first prefill worker's leader node sets up NATS/ETCD/Frontend if not multiple_frontends_enabled and worker_idx == 0 and local_rank == 0:
if worker_idx == 0 and local_rank == 0:
setup_head_prefill_node(master_ip) setup_head_prefill_node(master_ip)
else: else:
logging.info( logging.info(f"Setting up prefill worker {worker_idx}, local rank {local_rank}")
f"Setting up child prefill worker {worker_idx}, local rank {local_rank}"
)
if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"): if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"):
raise RuntimeError("Failed to connect to etcd") raise RuntimeError("Failed to connect to etcd")
# Setup environment variables for GPU script - use leader_ip as dist-init-addr # Setup environment variables for GPU script - use leader_ip as dist-init-addr
setup_env_vars_for_gpu_script(leader_ip, local_rank, total_gpus, nodes_per_worker) setup_env_vars_for_gpu_script(
leader_ip,
local_rank,
total_gpus,
nodes_per_worker,
use_init_locations=use_init_locations,
)
# Use appropriate GPU script instead of generating command directly # Use appropriate GPU script instead of generating command directly
cmd_to_run = get_gpu_command("prefill", gpu_type) cmd_to_run = get_gpu_command("prefill", gpu_type)
...@@ -311,19 +358,25 @@ def setup_decode_worker( ...@@ -311,19 +358,25 @@ def setup_decode_worker(
nodes_per_worker: int, nodes_per_worker: int,
gpus_per_node: int, gpus_per_node: int,
gpu_type: str, gpu_type: str,
use_init_locations: bool = True,
) -> int: ) -> int:
""" """
Setup the decode worker. Setup the decode worker.
""" """
total_gpus = nodes_per_worker * gpus_per_node total_gpus = nodes_per_worker * gpus_per_node
logging.info(f"Setting up decode worker {worker_idx}, local rank {local_rank}") logging.info(f"Setting up decode worker {worker_idx}, local rank {local_rank}")
if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"): if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"):
raise RuntimeError("Failed to connect to etcd") raise RuntimeError("Failed to connect to etcd")
# Setup environment variables for GPU script - use leader_ip as dist-init-addr # Setup environment variables for GPU script - use leader_ip as dist-init-addr
setup_env_vars_for_gpu_script(leader_ip, local_rank, total_gpus, nodes_per_worker) setup_env_vars_for_gpu_script(
leader_ip,
local_rank,
total_gpus,
nodes_per_worker,
use_init_locations=use_init_locations,
)
# Use appropriate GPU script instead of generating command directly # Use appropriate GPU script instead of generating command directly
cmd_to_run = get_gpu_command("decode", gpu_type) cmd_to_run = get_gpu_command("decode", gpu_type)
...@@ -357,9 +410,17 @@ def main(input_args: list[str] | None = None): ...@@ -357,9 +410,17 @@ def main(input_args: list[str] | None = None):
logging.info(f"Leader IP: {args.leader_ip}") logging.info(f"Leader IP: {args.leader_ip}")
logging.info(f"Master IP: {args.master_ip}") logging.info(f"Master IP: {args.master_ip}")
logging.info(f"Nodes per worker: {args.nodes_per_worker}") logging.info(f"Nodes per worker: {args.nodes_per_worker}")
logging.info(f"Use init locations?: {args.use_init_locations}")
setup_env(args.master_ip) setup_env(args.master_ip)
if args.worker_type == "prefill":
if args.worker_type == "nginx":
if not args.nginx_config:
raise ValueError("--nginx_config is required for nginx worker type")
setup_nginx_worker(args.master_ip, args.nginx_config)
elif args.worker_type == "frontend":
setup_frontend_worker(args.worker_idx, args.master_ip)
elif args.worker_type == "prefill":
setup_prefill_worker( setup_prefill_worker(
args.worker_idx, args.worker_idx,
args.local_rank, args.local_rank,
...@@ -368,8 +429,10 @@ def main(input_args: list[str] | None = None): ...@@ -368,8 +429,10 @@ def main(input_args: list[str] | None = None):
args.nodes_per_worker, args.nodes_per_worker,
args.gpus_per_node, args.gpus_per_node,
args.gpu_type, args.gpu_type,
args.multiple_frontends_enabled,
args.use_init_locations,
) )
else: elif args.worker_type == "decode":
setup_decode_worker( setup_decode_worker(
args.worker_idx, args.worker_idx,
args.local_rank, args.local_rank,
...@@ -378,6 +441,7 @@ def main(input_args: list[str] | None = None): ...@@ -378,6 +441,7 @@ def main(input_args: list[str] | None = None):
args.nodes_per_worker, args.nodes_per_worker,
args.gpus_per_node, args.gpus_per_node,
args.gpu_type, args.gpu_type,
args.use_init_locations,
) )
logging.info(f"{args.worker_type.capitalize()} worker setup complete") logging.info(f"{args.worker_type.capitalize()} worker setup complete")
......
...@@ -25,6 +25,35 @@ import tempfile ...@@ -25,6 +25,35 @@ import tempfile
from jinja2 import Template from jinja2 import Template
def print_welcome_message(job_ids: list[str]):
    """
    Print a welcome banner telling the user where the job logs are.

    Args:
        job_ids: SLURM job IDs that were submitted (the first submission plus
            any retry submissions).

    With exactly one job ID the banner shows the real directory name, so the
    suggested `cd logs/<id>` line can be copied and run as-is. With several
    IDs (or none) the directory is shown as an angle-bracketed list such as
    `<123, 124>`, which is a placeholder rather than a runnable path.
    """
    if len(job_ids) == 1:
        # Bare ID: a literal "<123>" would be parsed by the shell as
        # redirections and does not match the actual log directory name.
        job_id = job_ids[0]
    else:
        job_id = f"<{', '.join(job_ids)}>"
    print(
        f"""
🚀 Welcome! We hope you enjoy your time on our GB200 NVL72.
Your logs for this submitted job will be available in logs/{job_id}
You can access them by running:
cd logs/{job_id}
You can view all of the prefill/decode worker logs by running:
tail -f *_decode_*.err *_prefill_*.err
To kick off the benchmark we suggest opening up a new terminal, SSH-ing
into the login node, and running the srun command that is found at the
bottom of the log.out. You can find it by running:
cat log.out
Enjoy :)
- NVIDIA
"""
    )
def setup_logging(level: int = logging.INFO) -> None: def setup_logging(level: int = logging.INFO) -> None:
logging.basicConfig( logging.basicConfig(
level=level, level=level,
...@@ -45,7 +74,7 @@ def generate_job_script(template_path, output_path, **kwargs): ...@@ -45,7 +74,7 @@ def generate_job_script(template_path, output_path, **kwargs):
return output_path return output_path
def submit_job(job_script_path): def submit_job(job_script_path, extra_slurm_args=[]):
""" """
Submit the job script to SLURM and extract the job ID from the output. Submit the job script to SLURM and extract the job ID from the output.
...@@ -53,9 +82,14 @@ def submit_job(job_script_path): ...@@ -53,9 +82,14 @@ def submit_job(job_script_path):
The job ID of the submitted job. The job ID of the submitted job.
""" """
try: try:
result = subprocess.run( command = (
["sbatch", job_script_path], capture_output=True, text=True, check=True ["sbatch"]
+ ["--" + x for x in extra_slurm_args]
+ [
job_script_path,
]
) )
result = subprocess.run(command, capture_output=True, text=True, check=True)
output_lines = result.stdout.strip().split("\n") output_lines = result.stdout.strip().split("\n")
# sbatch typically outputs: "Submitted batch job JOBID" # sbatch typically outputs: "Submitted batch job JOBID"
...@@ -118,6 +152,45 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac ...@@ -118,6 +152,45 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
default="batch", default="batch",
help="SLURM partition to use", help="SLURM partition to use",
) )
parser.add_argument(
"--enable-multiple-frontends",
action="store_true",
help="Enable multiple frontend architecture with nginx load balancer",
)
parser.add_argument(
"--num-additional-frontends",
type=int,
default=0,
help="Number of additional frontend nodes (beyond the first frontend on node 1)",
)
parser.add_argument(
"--use-init-location",
action="store_true",
help="Whether we use '--init-expert-locations' json files",
)
parser.add_argument(
"--profiler",
type=str,
help="Profiler configurations. Example: "
+ '"type=vllm; isl=8192; osl=1024; concurrencies=16x2048x4096x8192; req-rate=inf"',
)
parser.add_argument(
"--extra-slurm-args",
action="append",
default=[],
help="Extra slurm arguments, remove the '--' prefix. Example: --extra-slurm-args dependency=afterok:<x>",
)
parser.add_argument(
"--retries",
type=int,
default=0,
help="Tries to launch the job multiple times to catch transient errors",
)
return parser.parse_args(args) return parser.parse_args(args)
...@@ -136,7 +209,45 @@ def main(input_args: list[str] | None = None): ...@@ -136,7 +209,45 @@ def main(input_args: list[str] | None = None):
f"Decode nodes ({args.decode_nodes}) must be divisible by decode workers ({args.decode_workers})" f"Decode nodes ({args.decode_nodes}) must be divisible by decode workers ({args.decode_workers})"
) )
# Validation for multiple frontends
if args.enable_multiple_frontends:
if args.num_additional_frontends < 0:
raise ValueError("Number of additional frontends cannot be negative")
total_nodes = args.prefill_nodes + args.decode_nodes total_nodes = args.prefill_nodes + args.decode_nodes
# parse profiler configs
profiler_config = {}
if args.profiler:
for key_val_pair in args.profiler.split("; "):
key, val = key_val_pair.split("=")
profiler_config[key] = val
# validate profiler configs
if profiler_config == {} or profiler_config["type"] == "manual":
parsable_config = ""
profiler_config["type"] = "manual"
elif profiler_config["type"] in ["sglang", "vllm", "gap"]:
parsable_config = ""
need_keys = ["isl", "osl", "concurrencies"]
assert all([key in profiler_config for key in need_keys])
assert profiler_config["isl"].isnumeric()
parsable_config = f"{parsable_config} {profiler_config['isl']}"
assert profiler_config["osl"].isnumeric()
parsable_config = f"{parsable_config} {profiler_config['osl']}"
assert all([x.isnumeric() for x in profiler_config["concurrencies"].split("x")])
parsable_config = f"{parsable_config} {profiler_config['concurrencies']}"
if profiler_config["type"] in ["sglang", "vllm"]:
assert "req-rate" in profiler_config
assert (
profiler_config["req-rate"] == "inf"
or profiler_config["req-rate"].isnumeric()
)
parsable_config = f"{parsable_config} {profiler_config['req-rate']}"
else:
assert False, profiler_config["type"]
template_vars = { template_vars = {
"job_name": args.job_name, "job_name": args.job_name,
"total_nodes": total_nodes, "total_nodes": total_nodes,
...@@ -153,12 +264,33 @@ def main(input_args: list[str] | None = None): ...@@ -153,12 +264,33 @@ def main(input_args: list[str] | None = None):
"network_interface": args.network_interface, "network_interface": args.network_interface,
"gpu_type": args.gpu_type, "gpu_type": args.gpu_type,
"partition": args.partition, "partition": args.partition,
"enable_multiple_frontends": args.enable_multiple_frontends,
"num_additional_frontends": args.num_additional_frontends,
"use_init_location": args.use_init_location,
"do_profile": profiler_config["type"] != "manual",
"profiler_type": profiler_config["type"],
"profiler_arg": parsable_config,
} }
with tempfile.NamedTemporaryFile(mode="w", suffix=".sh") as temp_file: with tempfile.NamedTemporaryFile(mode="w", suffix=".sh") as temp_file:
generate_job_script(args.template, temp_file.name, **template_vars) generate_job_script(args.template, temp_file.name, **template_vars)
job_id = submit_job(temp_file.name)
logging.info(f"Job logs will be available in: logs/{job_id}/") submitted_job_ids = []
job_id = submit_job(temp_file.name, args.extra_slurm_args)
submitted_job_ids.append(job_id)
# retries logic
extra_slurm_args_without_dependencies = [
x for x in args.extra_slurm_args if "dependency" not in x
]
for _ in range(args.retries):
dependencies = ",".join([f"afternotok:{job}" for job in submitted_job_ids])
slurm_args = extra_slurm_args_without_dependencies + [
f"dependency={dependencies}"
]
job_id = submit_job(temp_file.name, slurm_args)
submitted_job_ids.append(job_id)
print_welcome_message(submitted_job_ids)
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -139,6 +139,7 @@ addopts = [ ...@@ -139,6 +139,7 @@ addopts = [
"--ignore-glob=*/llm/tensorrtllm*", "--ignore-glob=*/llm/tensorrtllm*",
"--ignore-glob=docs/*", "--ignore-glob=docs/*",
"--ignore-glob=components/backends/sglang/src/dynamo/sglang/request_handlers/*", "--ignore-glob=components/backends/sglang/src/dynamo/sglang/request_handlers/*",
"--ignore-glob=components/backends/sglang/slurm_jobs/*",
# FIXME: Get relative/generic blob paths to work here # FIXME: Get relative/generic blob paths to work here
] ]
xfail_strict = true xfail_strict = true
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment