Unverified Commit e31c8790 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat(sglang): allow for multi worker spin up in slurm scripts (#2328)

parent 11bc8a17
......@@ -3,7 +3,7 @@ SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All
SPDX-License-Identifier: Apache-2.0
-->
# LLM Deployment using SGLang
# Running SGLang with Dynamo
This directory contains an SGLang component for Dynamo and reference implementations for deploying Large Language Models (LLMs) in various configurations using SGLang. SGLang internally uses ZMQ to communicate between the ingress and the engine processes. For Dynamo, we leverage the runtime to communicate directly with the engine processes and handle ingress and pre/post processing on our end.
......@@ -224,7 +224,7 @@ Below we provide a selected list of advanced examples. Please open up an issue i
### Large scale P/D disaggregation with WideEP
- **[Run DeepSeek-R1 on 104+ H100s](docs/dsr1-wideep-h100.md)**
- **[Run DeepSeek-R1 on GB200s](docs/dsr1-wideep-gb200.md)**
- **[Run DeepSeek-R1-FP8 on GB200s](docs/dsr1-wideep-gb200.md)**
### Hierarchical Cache (HiCache)
- **[Enable SGLang Hierarchical Cache (HiCache)](docs/sgl-hicache-example.md)**
......
......@@ -12,8 +12,12 @@
# Constants
PREFILL_NODES={{ prefill_nodes }}
DECODE_NODES={{ decode_nodes }}
PREFILL_WORKERS={{ prefill_workers }}
DECODE_WORKERS={{ decode_workers }}
TOTAL_NODES=$((PREFILL_NODES + DECODE_NODES))
GPUS_PER_NODE={{ gpus_per_node }}
PREFILL_NODES_PER_WORKER=$((PREFILL_NODES / PREFILL_WORKERS))
DECODE_NODES_PER_WORKER=$((DECODE_NODES / DECODE_WORKERS))
LOG_DIR="${SLURM_SUBMIT_DIR}/logs/${SLURM_JOB_ID}/"
SCRIPT_DIR="${SLURM_SUBMIT_DIR}/scripts"
OUTPUT_DIR="${SLURM_SUBMIT_DIR}/outputs"
......@@ -22,7 +26,6 @@ CONFIG_DIR="{{ config_dir }}"
CONTAINER_IMAGE="{{ container_image }}"
NETWORK_INTERFACE="{{ network_interface }}"
GPU_TYPE="{{ gpu_type | default('h100') }}"
USE_SGLANG_COMMANDS="{{ use_sglang_commands | default(false) }}"
{% raw %}
......@@ -39,19 +42,29 @@ for i in "${!nodes[@]}"; do
echo "Node $i: ${nodes[$i]}"
done
PREFILL_HOST_IP=$(srun --nodes=1 --ntasks=1 --nodelist=${nodes[0]} ip route get $(getent ahosts ${nodes[0]} | grep STREAM | head -1 | awk '{print $1}') | awk '{for(i=1;i<=NF;i++) if($i=="src") print $(i+1)}')
if [ -z "$PREFILL_HOST_IP" ]; then
echo "Error: Could not retrieve IP address for prefill host ${nodes[0]} on interface $NETWORK_INTERFACE"
# Get IP address of the master node (first prefill node) for NATS/ETCD
MASTER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=${nodes[0]} ip addr show $NETWORK_INTERFACE | grep 'inet ' | awk '{print $2}' | cut -d'/' -f1)
if [ -z "$MASTER_IP" ]; then
echo "Error: Could not retrieve IP address for master host ${nodes[0]} on interface $NETWORK_INTERFACE"
exit 1
fi
echo "Prefill host IP address: $PREFILL_HOST_IP"
echo "Master IP address: $MASTER_IP"
DECODE_HOST_IP=$(srun --nodes=1 --ntasks=1 --nodelist=${nodes[$PREFILL_NODES]} ip route get $(getent ahosts ${nodes[$PREFILL_NODES]} | grep STREAM | head -1 | awk '{print $1}') | awk '{for(i=1;i<=NF;i++) if($i=="src") print $(i+1)}')
if [ -z "$DECODE_HOST_IP" ]; then
echo "Error: Could not retrieve IP address for decode host ${nodes[$PREFILL_NODES]} on interface $NETWORK_INTERFACE"
exit 1
fi
echo "Decode host IP address: $DECODE_HOST_IP"
# Compute leader nodes for each worker
prefill_leaders=()
for i in $(seq 0 $((PREFILL_WORKERS - 1))); do
leader_idx=$((i * PREFILL_NODES_PER_WORKER))
prefill_leaders[$i]=$leader_idx
done
decode_leaders=()
for i in $(seq 0 $((DECODE_WORKERS - 1))); do
leader_idx=$((PREFILL_NODES + i * DECODE_NODES_PER_WORKER))
decode_leaders[$i]=$leader_idx
done
echo "Prefill worker leaders: ${prefill_leaders[@]}"
echo "Decode worker leaders: ${decode_leaders[@]}"
# Prepare enroot arguments to pass to srun commands
ENROOT_ARGS="\
......@@ -62,31 +75,52 @@ ENROOT_ARGS="\
"
# Build common worker arguments
WORKER_ARGS="--gpu_type ${GPU_TYPE} --gpus_per_node ${GPUS_PER_NODE}"
if [ "$USE_SGLANG_COMMANDS" = "True" ]; then
WORKER_ARGS="${WORKER_ARGS} --use-sglang-commands"
fi
WORKER_ARGS="--gpu_type ${GPU_TYPE} --gpus_per_node ${GPUS_PER_NODE} --master_ip ${MASTER_IP}"
# Launch prefill tasks on the first PREFILL_NODES nodes
for i in $(seq 0 $((PREFILL_NODES - 1))); do
node=${nodes[$i]}
rank=$i
echo "Launching prefill task on node ${i} (rank ${rank}): $node"
# Launch prefill workers
for worker_idx in $(seq 0 $((PREFILL_WORKERS - 1))); do
leader_idx=${prefill_leaders[$worker_idx]}
leader_node=${nodes[$leader_idx]}
cmd="srun $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_prefill.out --error=${LOG_DIR}/${node}_prefill.err python /scripts/worker_setup.py --prefill_host_ip ${PREFILL_HOST_IP} --decode_host_ip ${DECODE_HOST_IP} --rank ${rank} --total_nodes ${PREFILL_NODES} --worker_type prefill --gpu_utilization_log /logs/${node}_prefill_gpu_utilization.log ${WORKER_ARGS}"
# Get leader IP for this worker group
LEADER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$leader_node ip route get $(getent ahosts $leader_node | grep STREAM | head -1 | awk '{print $1}') | awk '{for(i=1;i<=NF;i++) if($i=="src") print $(i+1)}')
echo "Prefill worker $worker_idx leader: $leader_node ($LEADER_IP)"
# Launch all nodes for this worker
for node_idx in $(seq 0 $((PREFILL_NODES_PER_WORKER - 1))); do
global_node_idx=$((leader_idx + node_idx))
node=${nodes[$global_node_idx]}
local_rank=$node_idx
echo "Launching prefill worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node"
cmd="srun $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_prefill_w${worker_idx}.out --error=${LOG_DIR}/${node}_prefill_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${PREFILL_NODES_PER_WORKER} --worker_type prefill --gpu_utilization_log /logs/${node}_prefill_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}"
echo "$cmd"
$cmd &
done
done
# Launch decode tasks on the next DECODE_NODES nodes
for i in $(seq $PREFILL_NODES $((PREFILL_NODES + DECODE_NODES - 1))); do
node=${nodes[$i]}
rank=$((i - PREFILL_NODES))
echo "Launching decode task on node ${i} (rank ${rank}): $node"
# Launch decode workers
for worker_idx in $(seq 0 $((DECODE_WORKERS - 1))); do
leader_idx=${decode_leaders[$worker_idx]}
leader_node=${nodes[$leader_idx]}
# Get leader IP for this worker group
LEADER_IP=$(srun --nodes=1 --ntasks=1 --nodelist=$leader_node ip route get $(getent ahosts $leader_node | grep STREAM | head -1 | awk '{print $1}') | awk '{for(i=1;i<=NF;i++) if($i=="src") print $(i+1)}')
echo "Decode worker $worker_idx leader: $leader_node ($LEADER_IP)"
# Launch all nodes for this worker
for node_idx in $(seq 0 $((DECODE_NODES_PER_WORKER - 1))); do
global_node_idx=$((leader_idx + node_idx))
node=${nodes[$global_node_idx]}
local_rank=$node_idx
echo "Launching decode worker $worker_idx, node $global_node_idx (local_rank $local_rank): $node"
cmd="srun $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_decode.out --error=${LOG_DIR}/${node}_decode.err python /scripts/worker_setup.py --decode_host_ip ${DECODE_HOST_IP} --prefill_host_ip ${PREFILL_HOST_IP} --rank ${rank} --total_nodes ${DECODE_NODES} --worker_type decode --gpu_utilization_log /logs/${node}_decode_gpu_utilization.log ${WORKER_ARGS}"
cmd="srun $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$node --output=${LOG_DIR}/${node}_decode_w${worker_idx}.out --error=${LOG_DIR}/${node}_decode_w${worker_idx}.err python /scripts/worker_setup.py --leader_ip ${LEADER_IP} --worker_idx ${worker_idx} --local_rank ${local_rank} --nodes_per_worker ${DECODE_NODES_PER_WORKER} --worker_type decode --gpu_utilization_log /logs/${node}_decode_w${worker_idx}_gpu_utilization.log ${WORKER_ARGS}"
echo "$cmd"
$cmd &
done
done
echo ""
......
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Function to print usage
print_usage() {
echo "Usage: $0 <mode>"
echo " mode: prefill or decode"
echo ""
echo "Examples:"
echo " $0 prefill"
echo " $0 decode"
exit 1
}
# Check if correct number of arguments provided
if [ $# -ne 1 ]; then
echo "Error: Expected 1 argument, got $#"
print_usage
fi
# Parse arguments
mode=$1
# Validate mode argument
if [ "$mode" != "prefill" ] && [ "$mode" != "decode" ]; then
echo "Error: mode must be 'prefill' or 'decode', got '$mode'"
print_usage
fi
echo "Mode: $mode"
echo "Command: dynamo"
# Check if required environment variables are set
if [ -z "$HOST_IP" ]; then
echo "Error: HOST_IP environment variable is not set"
exit 1
fi
if [ -z "$PORT" ]; then
echo "Error: PORT environment variable is not set"
exit 1
fi
if [ -z "$TOTAL_GPUS" ]; then
echo "Error: TOTAL_GPUS environment variable is not set"
exit 1
fi
if [ -z "$RANK" ]; then
echo "Error: RANK environment variable is not set"
exit 1
fi
if [ -z "$TOTAL_NODES" ]; then
echo "Error: TOTAL_NODES environment variable is not set"
exit 1
fi
# Construct command based on mode
if [ "$mode" = "prefill" ]; then
# GB200 dynamo prefill command
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=2048 \
MC_TE_METRIC=true \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
MC_FORCE_MNNVL=1 \
NCCL_MNNVL_ENABLE=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m dynamo.sglang.worker \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--skip-tokenizer-init \
--trust-remote-code \
--disaggregation-mode prefill \
--dist-init-addr "$HOST_IP:$PORT" \
--disaggregation-bootstrap-port 30001 \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--host 0.0.0.0 \
--decode-log-interval 1 \
--max-running-requests 12288 \
--context-length 9600 \
--disable-radix-cache \
--enable-deepep-moe \
--deepep-mode low_latency \
--ep-dispatch-algorithm dynamic \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--watchdog-timeout 1000000 \
--init-expert-location /configs/prefill_dsr1-0528_in1000out1000_num40000.json \
--disable-cuda-graph \
--chunked-prefill-size 16384 \
--max-total-tokens 65536 \
--deepep-config /configs/deepep_config.json \
--stream-interval 50 \
--log-level debug
elif [ "$mode" = "decode" ]; then
# GB200 dynamo decode command
DYN_SKIP_SGLANG_LOG_FORMATTING=1 \
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=512 \
MC_TE_METRIC=true \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_HACK_SEQ_BOOTSTRAP_ROOM=1 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
NCCL_MNNVL_ENABLE=1 \
MC_FORCE_MNNVL=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m dynamo.sglang.decode_worker \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--skip-tokenizer-init \
--trust-remote-code \
--disaggregation-mode decode \
--dist-init-addr "$HOST_IP:$PORT" \
--disaggregation-bootstrap-port 30001 \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--host 0.0.0.0 \
--decode-log-interval 1 \
--max-running-requests 36864 \
--context-length 9600 \
--disable-radix-cache \
--enable-deepep-moe \
--deepep-mode low_latency \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--cuda-graph-bs 1 2 4 8 16 24 32 40 48 56 64 80 96 112 128 160 192 224 256 320 384 448 512 \
--cuda-graph-max-bs 512 \
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm static \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--watchdog-timeout 1000000 \
--init-expert-location /configs/decode_dsr1-0528_loadgen_in1024out1024_num2000_2p12d.json \
--chunked-prefill-size 36864 \
--stream-interval 50 \
--mem-fraction-static 0.82
fi
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Function to print usage
print_usage() {
echo "Usage: $0 <mode> <cmd>"
echo " mode: prefill or decode"
echo " cmd: dynamo or sglang"
echo ""
echo "Examples:"
echo " $0 prefill dynamo"
echo " $0 decode sglang"
exit 1
}
# Check if correct number of arguments provided
if [ $# -ne 2 ]; then
echo "Error: Expected 2 arguments, got $#"
print_usage
fi
# Parse arguments
mode=$1
cmd=$2
# Validate mode argument
if [ "$mode" != "prefill" ] && [ "$mode" != "decode" ]; then
echo "Error: mode must be 'prefill' or 'decode', got '$mode'"
print_usage
fi
# Validate cmd argument
if [ "$cmd" != "dynamo" ] && [ "$cmd" != "sglang" ]; then
echo "Error: cmd must be 'dynamo' or 'sglang', got '$cmd'"
print_usage
fi
echo "Mode: $mode"
echo "Command: $cmd"
# Check if required environment variables are set
if [ -z "$HOST_IP" ]; then
echo "Error: HOST_IP environment variable is not set"
exit 1
fi
if [ -z "$PORT" ]; then
echo "Error: PORT environment variable is not set"
exit 1
fi
if [ -z "$TOTAL_GPUS" ]; then
echo "Error: TOTAL_GPUS environment variable is not set"
exit 1
fi
if [ -z "$RANK" ]; then
echo "Error: RANK environment variable is not set"
exit 1
fi
if [ -z "$TOTAL_NODES" ]; then
echo "Error: TOTAL_NODES environment variable is not set"
exit 1
fi
# TODO: since the args for sglang and dynamo are the same, we can be a bit cleaner here
# Construct command based on mode and cmd
if [ "$mode" = "prefill" ]; then
if [ "$cmd" = "dynamo" ]; then
# We are not using a init-expert-location file for e2e benchmarking
# We also don't currently have a --deepep-config file for GB200
# Need to increase --context-length to 10k for 8k1k benchmarking
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=2048 \
MC_TE_METRIC=true \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
MC_FORCE_MNNVL=1 \
NCCL_MNNVL_ENABLE=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m dynamo.sglang \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--skip-tokenizer-init \
--trust-remote-code \
--disaggregation-mode prefill \
--dist-init-addr "$HOST_IP:$PORT" \
--disaggregation-bootstrap-port 30001 \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--host 0.0.0.0 \
--decode-log-interval 1 \
--max-running-requests 6144 \
--context-length 2716 \
--disable-radix-cache \
--enable-deepep-moe \
--deepep-mode low_latency \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm static \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--watchdog-timeout 1000000 \
--disable-cuda-graph \
--chunked-prefill-size 16384 \
--max-total-tokens 32768 \
--mem-fraction-static 0.8 \
--log-level debug
elif [ "$cmd" = "sglang" ]; then
# GB200 sglang prefill command
# We are not using a init-expert-location file for e2e benchmarking
# We also don't currently have a --deepep-config file for GB200
# Need to increase --context-length to 10k for 8k1k benchmarking
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=2048 \
MC_TE_METRIC=true \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
NCCL_MNNVL_ENABLE=1 \
MC_FORCE_MNNVL=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m sglang.launch_server \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--trust-remote-code \
--disaggregation-mode prefill \
--dist-init-addr "$HOST_IP:$PORT" \
--disaggregation-bootstrap-port 30001 \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--host 0.0.0.0 \
--decode-log-interval 1 \
--max-running-requests 6144 \
--context-length 2716 \
--disable-radix-cache \
--enable-deepep-moe \
--deepep-mode low_latency \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm static \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--watchdog-timeout 1000000 \
--disable-cuda-graph \
--chunked-prefill-size 16384 \
--max-total-tokens 32768 \
--mem-fraction-static 0.8 \
--log-level debug
fi
elif [ "$mode" = "decode" ]; then
if [ "$cmd" = "dynamo" ]; then
# Need to increase --context-length to 10k for 8k1k benchmarking
# We are not using a init-expert-location file for e2e benchmarking
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=768 \
MC_TE_METRIC=true \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_HACK_SEQ_BOOTSTRAP_ROOM=1 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
NCCL_MNNVL_ENABLE=1 \
MC_FORCE_MNNVL=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m dynamo.sglang \
--served-model-name deepseek-ai/DeepSeek-R1 \
--model-path /model/ \
--skip-tokenizer-init \
--trust-remote-code \
--disaggregation-mode decode \
--dist-init-addr "$HOST_IP:$PORT" \
--disaggregation-bootstrap-port 30001 \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--host 0.0.0.0 \
--decode-log-interval 1 \
--max-running-requests 36864 \
--context-length 2716 \
--disable-radix-cache \
--enable-deepep-moe \
--deepep-mode low_latency \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--cuda-graph-bs 768 \
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm static \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--watchdog-timeout 1000000 \
--chunked-prefill-size 36864 \
--mem-fraction-static 0.82 \
--log-level debug
elif [ "$cmd" = "sglang" ]; then
# GB200 sglang decode command
# Need to increase --context-length to 10k for 8k1k benchmarking
# We are not using a init-expert-location file for e2e benchmarking
SGLANG_DEEPEP_NUM_MAX_DISPATCH_TOKENS_PER_RANK=768 \
MC_TE_METRIC=true \
SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE=100000 \
SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=100000 \
SGLANG_DISAGGREGATION_WAITING_TIMEOUT=100000 \
SGLANG_HACK_SEQ_BOOTSTRAP_ROOM=1 \
SGLANG_MOONCAKE_CUSTOM_MEM_POOL=True \
NCCL_MNNVL_ENABLE=1 \
MC_FORCE_MNNVL=1 \
NCCL_CUMEM_ENABLE=1 \
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER=0 \
SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=1 \
PYTHONUNBUFFERED=1 \
python3 -m sglang.launch_server \
--model-path /model/ \
--trust-remote-code \
--disaggregation-mode decode \
--dist-init-addr "$HOST_IP:$PORT" \
--disaggregation-bootstrap-port 30001 \
--nnodes "$TOTAL_NODES" \
--node-rank "$RANK" \
--tp-size "$TOTAL_GPUS" \
--dp-size "$TOTAL_GPUS" \
--enable-dp-attention \
--host 0.0.0.0 \
--decode-log-interval 1 \
--max-running-requests 36864 \
--context-length 2716 \
--disable-radix-cache \
--enable-deepep-moe \
--deepep-mode low_latency \
--moe-dense-tp-size 1 \
--enable-dp-lm-head \
--cuda-graph-bs 768 \
--disable-shared-experts-fusion \
--ep-num-redundant-experts 32 \
--ep-dispatch-algorithm static \
--eplb-algorithm deepseek \
--attention-backend cutlass_mla \
--watchdog-timeout 1000000 \
--chunked-prefill-size 36864 \
--mem-fraction-static 0.82 \
--log-level debug
fi
fi
......@@ -4,25 +4,23 @@
# Function to print usage
print_usage() {
echo "Usage: $0 <mode> <cmd>"
echo "Usage: $0 <mode>"
echo " mode: prefill or decode"
echo " cmd: dynamo or sglang"
echo ""
echo "Examples:"
echo " $0 prefill dynamo"
echo " $0 decode sglang"
echo " $0 prefill"
echo " $0 decode"
exit 1
}
# Check if correct number of arguments provided
if [ $# -ne 2 ]; then
echo "Error: Expected 2 arguments, got $#"
if [ $# -ne 1 ]; then
echo "Error: Expected 1 argument, got $#"
print_usage
fi
# Parse arguments
mode=$1
cmd=$2
# Validate mode argument
if [ "$mode" != "prefill" ] && [ "$mode" != "decode" ]; then
......@@ -30,14 +28,8 @@ if [ "$mode" != "prefill" ] && [ "$mode" != "decode" ]; then
print_usage
fi
# Validate cmd argument
if [ "$cmd" != "dynamo" ] && [ "$cmd" != "sglang" ]; then
echo "Error: cmd must be 'dynamo' or 'sglang', got '$cmd'"
print_usage
fi
echo "Mode: $mode"
echo "Command: $cmd"
echo "Command: dynamo"
# Check if required environment variables are set
......@@ -66,7 +58,7 @@ if [ -z "$TOTAL_NODES" ]; then
exit 1
fi
# Construct command based on mode and cmd
# Construct command based on mode
if [ "$mode" = "prefill" ]; then
if [ "$cmd" = "dynamo" ]; then
# H100 dynamo prefill command
......@@ -156,7 +148,7 @@ elif [ "$mode" = "decode" ]; then
--deepep-mode low_latency \
--mem-fraction-static 0.835 \
--ep-num-redundant-experts 32 \
--cuda-graph-bs 256
--cuda-graph-bs 128
elif [ "$cmd" = "sglang" ]; then
# H100 sglang decode command
python3 -m sglang.launch_server \
......@@ -182,7 +174,7 @@ elif [ "$mode" = "decode" ]; then
--deepep-mode low_latency \
--mem-fraction-static 0.835 \
--ep-num-redundant-experts 32 \
--cuda-graph-bs 256
--cuda-graph-bs 128
fi
fi
......
......@@ -124,28 +124,34 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--prefill_host_ip",
"--leader_ip",
type=str,
required=True,
help="IP address of the prefill host node",
help="IP address of the leader node for this worker group",
)
parser.add_argument(
"--decode_host_ip",
"--master_ip",
type=str,
required=True,
help="IP address of the decode host node",
help="IP address of the master node (first prefill node) for NATS/ETCD",
)
parser.add_argument(
"--rank",
"--worker_idx",
type=int,
required=True,
help="Rank of the current node (0 for host node)",
help="Index of the worker group (0-based)",
)
parser.add_argument(
"--total_nodes",
"--local_rank",
type=int,
required=True,
help="Total number of nodes in the cluster",
help="Local rank within the worker group (0 for leader)",
)
parser.add_argument(
"--nodes_per_worker",
type=int,
required=True,
help="Number of nodes per worker",
)
parser.add_argument(
"--worker_type",
......@@ -165,16 +171,11 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
default=None,
help="File to log GPU utilization (default: None)",
)
parser.add_argument(
"--use-sglang-commands",
action="store_true",
default=False,
help="Helper to spin up SGLang servers instead of dynamo. This is helpful for benchmarking SGLang as well",
)
parser.add_argument(
"--gpu_type",
type=str,
choices=["h100", "gb200"],
choices=["h100", "gb200-fp8"],
default="h100",
help="Type of GPU to use",
)
......@@ -184,57 +185,52 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
def _validate_args(args: argparse.Namespace) -> None:
"""Validate command line arguments"""
if args.rank < 0:
raise ValueError("Rank must be non-negative")
if args.worker_idx < 0:
raise ValueError("Worker index must be non-negative")
if args.local_rank < 0:
raise ValueError("Local rank must be non-negative")
if args.total_nodes < 1:
raise ValueError("Total nodes must be at least 1")
if args.nodes_per_worker < 1:
raise ValueError("Nodes per worker must be at least 1")
if args.gpus_per_node < 1:
raise ValueError("GPUs per node must be at least 1")
def get_sglang_mini_lb_command_args(prefill_host_ip: str, decode_host_ip: str) -> str:
cmd = (
f"python3 -m sglang.srt.disaggregation.launch_lb "
f"--prefill http://{prefill_host_ip}:30000 "
f"--decode http://{decode_host_ip}:30000 "
"--host 0.0.0.0 "
"--port 8000 "
"--timeout 3600"
if args.local_rank >= args.nodes_per_worker:
raise ValueError(
f"Local rank ({args.local_rank}) must be less than nodes per worker ({args.nodes_per_worker})"
)
return cmd
def setup_env_vars_for_gpu_script(
host_ip: str,
rank: int,
local_rank: int,
total_gpus: int,
total_nodes: int,
port: int = DIST_INIT_PORT,
):
"""Setup environment variables required by GPU scripts (h100.sh, gb200.sh)"""
"""Setup environment variables required by GPU scripts (h100.sh, gb200-fp8.sh, gb200-fp4.sh)"""
os.environ["HOST_IP"] = host_ip
os.environ["PORT"] = str(port)
os.environ["TOTAL_GPUS"] = str(total_gpus)
os.environ["RANK"] = str(rank)
os.environ["RANK"] = str(local_rank)
os.environ["TOTAL_NODES"] = str(total_nodes)
logging.info(f"Set HOST_IP: {host_ip}")
logging.info(f"Set PORT: {port}")
logging.info(f"Set TOTAL_GPUS: {total_gpus}")
logging.info(f"Set RANK: {rank}")
logging.info(f"Set RANK: {local_rank}")
logging.info(f"Set TOTAL_NODES: {total_nodes}")
def get_gpu_command(worker_type: str, use_sglang_commands: bool, gpu_type: str) -> str:
def get_gpu_command(worker_type: str, gpu_type: str) -> str:
"""Generate command to run the appropriate GPU script"""
script_name = f"{gpu_type}.sh"
script_path = Path(__file__).parent / script_name
mode = worker_type # "prefill" or "decode"
cmd = "sglang" if use_sglang_commands else "dynamo"
return f"bash {script_path} {mode} {cmd}"
return f"bash {script_path} {mode}"
def setup_head_prefill_node(prefill_host_ip: str) -> None:
......@@ -261,7 +257,7 @@ def setup_head_prefill_node(prefill_host_ip: str) -> None:
logging.info(f"Starting ingress server on node {prefill_host_ip}")
ingress_process = run_command(
"dynamo run in=http out=dyn --http-port=8000", background=True
"python3 -m dynamo.frontend --http-port=8000", background=True
)
if not ingress_process:
raise RuntimeError("Failed to start ingress")
......@@ -275,69 +271,68 @@ def setup_head_prefill_node(prefill_host_ip: str) -> None:
raise RuntimeError("Failed to start cache flush server")
def setup_prefill_node(
rank: int,
prefill_host_ip: str,
total_nodes: int,
total_gpus: int,
use_sglang_commands: bool,
def setup_prefill_worker(
worker_idx: int,
local_rank: int,
leader_ip: str,
master_ip: str,
nodes_per_worker: int,
gpus_per_node: int,
gpu_type: str,
) -> int:
"""
Setup the prefill node.
Setup the prefill worker.
"""
if not use_sglang_commands:
if rank == 0:
setup_head_prefill_node(prefill_host_ip)
total_gpus = nodes_per_worker * gpus_per_node
# Only the first prefill worker's leader node sets up NATS/ETCD/Frontend
if worker_idx == 0 and local_rank == 0:
setup_head_prefill_node(master_ip)
else:
logging.info(f"Setting up child prefill node: {rank}")
if not wait_for_etcd(f"http://{prefill_host_ip}:{ETCD_CLIENT_PORT}"):
logging.info(
f"Setting up child prefill worker {worker_idx}, local rank {local_rank}"
)
if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"):
raise RuntimeError("Failed to connect to etcd")
else:
logging.info("Using SGLang servers. No need to setup etcd or nats")
# Setup environment variables for GPU script
setup_env_vars_for_gpu_script(prefill_host_ip, rank, total_gpus, total_nodes)
# Setup environment variables for GPU script - use leader_ip as dist-init-addr
setup_env_vars_for_gpu_script(leader_ip, local_rank, total_gpus, nodes_per_worker)
# Use appropriate GPU script instead of generating command directly
cmd_to_run = get_gpu_command("prefill", use_sglang_commands, gpu_type)
cmd_to_run = get_gpu_command("prefill", gpu_type)
return run_command(cmd_to_run)
def setup_decode_node(
rank: int,
decode_host_ip: str,
prefill_host_ip: str,
total_nodes: int,
total_gpus: int,
use_sglang_commands: bool,
def setup_decode_worker(
worker_idx: int,
local_rank: int,
leader_ip: str,
master_ip: str,
nodes_per_worker: int,
gpus_per_node: int,
gpu_type: str,
) -> int:
"""
Setup the decode node.
Setup the decode worker.
"""
logging.info(f"Setting up child decode node: {rank}")
total_gpus = nodes_per_worker * gpus_per_node
if use_sglang_commands:
sgl_mini_lb_cmd = get_sglang_mini_lb_command_args(
prefill_host_ip, decode_host_ip
)
run_command(sgl_mini_lb_cmd, background=True)
else:
if not wait_for_etcd(f"http://{prefill_host_ip}:{ETCD_CLIENT_PORT}"):
logging.info(f"Setting up decode worker {worker_idx}, local rank {local_rank}")
if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"):
raise RuntimeError("Failed to connect to etcd")
# Setup environment variables for GPU script
setup_env_vars_for_gpu_script(decode_host_ip, rank, total_gpus, total_nodes)
# Setup environment variables for GPU script - use leader_ip as dist-init-addr
setup_env_vars_for_gpu_script(leader_ip, local_rank, total_gpus, nodes_per_worker)
# Use appropriate GPU script instead of generating command directly
cmd_to_run = get_gpu_command("decode", use_sglang_commands, gpu_type)
cmd_to_run = get_gpu_command("decode", gpu_type)
return run_command(cmd_to_run)
def setup_env(prefill_host_ip: str):
nats_server = f"nats://{prefill_host_ip}:{NATS_PORT}"
etcd_endpoints = f"http://{prefill_host_ip}:{ETCD_CLIENT_PORT}"
def setup_env(master_ip: str):
nats_server = f"nats://{master_ip}:{NATS_PORT}"
etcd_endpoints = f"http://{master_ip}:{ETCD_CLIENT_PORT}"
os.environ["NATS_SERVER"] = nats_server
os.environ["ETCD_ENDPOINTS"] = etcd_endpoints
......@@ -354,35 +349,38 @@ def main(input_args: list[str] | None = None):
if args.gpu_utilization_log:
log_gpu_utilization(args.gpu_utilization_log)
logging.info(f"{args.worker_type.capitalize()} node setup started")
logging.info(f"{args.worker_type.capitalize()} worker setup started")
logging.info(f"Hostname: {socket.gethostname()}")
logging.info(f"Prefill host IP: {args.prefill_host_ip}")
logging.info(f"Decode host IP: {args.decode_host_ip}")
logging.info(f"Rank: {args.rank}")
logging.info(f"Use SGLang commands: {args.use_sglang_commands}")
setup_env(args.prefill_host_ip)
logging.info(f"Worker type: {args.worker_type}")
logging.info(f"Worker index: {args.worker_idx}")
logging.info(f"Local rank: {args.local_rank}")
logging.info(f"Leader IP: {args.leader_ip}")
logging.info(f"Master IP: {args.master_ip}")
logging.info(f"Nodes per worker: {args.nodes_per_worker}")
setup_env(args.master_ip)
if args.worker_type == "prefill":
setup_prefill_node(
args.rank,
args.prefill_host_ip,
args.total_nodes,
args.total_nodes * args.gpus_per_node,
args.use_sglang_commands,
setup_prefill_worker(
args.worker_idx,
args.local_rank,
args.leader_ip,
args.master_ip,
args.nodes_per_worker,
args.gpus_per_node,
args.gpu_type,
)
else:
setup_decode_node(
args.rank,
args.decode_host_ip,
args.prefill_host_ip,
args.total_nodes,
args.total_nodes * args.gpus_per_node,
args.use_sglang_commands,
setup_decode_worker(
args.worker_idx,
args.local_rank,
args.leader_ip,
args.master_ip,
args.nodes_per_worker,
args.gpus_per_node,
args.gpu_type,
)
logging.info(f"{args.worker_type.capitalize()} node setup complete")
logging.info(f"{args.worker_type.capitalize()} worker setup complete")
if __name__ == "__main__":
......
......@@ -95,20 +95,24 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
"--decode-nodes", type=int, default=2, help="Number of decode nodes"
)
parser.add_argument(
"--gpus-per-node", type=int, default=8, help="Number of GPUs per node"
"--prefill-workers", type=int, default=1, help="Number of prefill workers"
)
parser.add_argument(
"--network-interface", default="eth3", help="Network interface to use"
"--decode-workers", type=int, default=1, help="Number of decode workers"
)
parser.add_argument(
"--gpu-type", choices=["h100", "gb200"], default="h100", help="GPU type to use"
"--gpus-per-node", type=int, default=8, help="Number of GPUs per node"
)
parser.add_argument(
"--use-sglang-commands",
action="store_true",
default=False,
help="Use SGLang commands instead of Dynamo",
"--network-interface", default="eth3", help="Network interface to use"
)
parser.add_argument(
"--gpu-type",
choices=["h100", "gb200-fp8"],
default="h100",
help="GPU type to use",
)
parser.add_argument(
"--partition",
default="batch",
......@@ -121,6 +125,17 @@ def main(input_args: list[str] | None = None):
setup_logging()
args = _parse_command_line_args(input_args)
# Validation
if args.prefill_nodes % args.prefill_workers != 0:
raise ValueError(
f"Prefill nodes ({args.prefill_nodes}) must be divisible by prefill workers ({args.prefill_workers})"
)
if args.decode_nodes % args.decode_workers != 0:
raise ValueError(
f"Decode nodes ({args.decode_nodes}) must be divisible by decode workers ({args.decode_workers})"
)
total_nodes = args.prefill_nodes + args.decode_nodes
template_vars = {
"job_name": args.job_name,
......@@ -129,13 +144,14 @@ def main(input_args: list[str] | None = None):
"time_limit": args.time_limit,
"prefill_nodes": args.prefill_nodes,
"decode_nodes": args.decode_nodes,
"prefill_workers": args.prefill_workers,
"decode_workers": args.decode_workers,
"model_dir": args.model_dir,
"config_dir": args.config_dir,
"container_image": args.container_image,
"gpus_per_node": args.gpus_per_node,
"network_interface": args.network_interface,
"gpu_type": args.gpu_type,
"use_sglang_commands": args.use_sglang_commands,
"partition": args.partition,
}
......
......@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
ARG SGLANG_IMAGE_TAG="v0.5.0rc0-cu126"
ARG SGLANG_IMAGE_TAG="v0.5.0rc2-cu126"
FROM lmsysorg/sglang:${SGLANG_IMAGE_TAG}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment