Unverified Commit fe718fd2 authored by hhzhang16's avatar hhzhang16 Committed by GitHub
Browse files

feat: deploy SLA profiler to k8s (#2030)


Co-authored-by: default avatarhongkuan <hongkuanz@nvidia.com>
Co-authored-by: default avatarmohammedabdulwahhab <furkhan324@berkeley.edu>
Co-authored-by: default avatarHongkuan Zhou <tedzhouhk@gmail.com>
parent ba3ac235
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: profile-sla-binding
namespace: ${NAMESPACE}
subjects:
- kind: ServiceAccount
name: profile-sla-sa
namespace: ${NAMESPACE}
roleRef:
kind: Role
name: profile-sla-role
apiGroup: rbac.authorization.k8s.io
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: batch/v1
kind: Job
metadata:
name: profile-sla
namespace: ${NAMESPACE}
spec:
template:
spec:
serviceAccountName: profile-sla-sa
containers:
- name: profile-sla
image: ${DOCKER_IMAGE}
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
env:
- name: HUGGING_FACE_HUB_TOKEN
valueFrom:
secretKeyRef:
name: hf-token-secret
key: HF_TOKEN
- name: NATS_SERVER
value: nats://${NAMESPACE}-nats:4222
- name: ETCD_ENDPOINTS
value: ${NAMESPACE}-etcd:2379
command: ["python", "/workspace/benchmarks/profiler/profile_sla.py"]
args:
- --config
- ${DGD_CONFIG_FILE}
- --output-dir
- /workspace/profiling_results
- --namespace
- ${NAMESPACE}
volumeMounts:
- name: output-volume
mountPath: /workspace/profiling_results
restartPolicy: Never
volumes:
- name: output-volume
persistentVolumeClaim:
claimName: profiling-pvc
backoffLimit: 0
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: profile-sla-role
namespace: ${NAMESPACE}
rules:
# DynamoGraphDeployment custom resources - needed for create/get/delete operations
- apiGroups: ["nvidia.com"]
resources: ["dynamographdeployments"]
verbs: ["get", "create", "delete"]
# Pods - needed for listing pods by label selector and getting logs
- apiGroups: [""]
resources: ["pods"]
verbs: ["list"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get"]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: v1
kind: ServiceAccount
metadata:
name: profile-sla-sa
namespace: ${NAMESPACE}
imagePullSecrets:
- name: nvcr-imagepullsecret
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: profiling-pvc
namespace: ${NAMESPACE}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 50Gi
...@@ -14,15 +14,19 @@ ...@@ -14,15 +14,19 @@
# limitations under the License. # limitations under the License.
import argparse import argparse
import asyncio
import logging import logging
import math import math
import os import os
import subprocess
import numpy as np import numpy as np
import yaml import yaml
from utils.config import CONFIG_MODIFIERS from utils.config import CONFIG_MODIFIERS
from utils.defaults import DECODE_NUM_REQUESTS_RANGE from utils.defaults import DECODE_NUM_REQUESTS_RANGE
from utils.dynamo_deployment import (
DynamoDeploymentClient,
cleanup_remaining_deployments,
)
from utils.genai_perf import benchmark_decode, benchmark_prefill from utils.genai_perf import benchmark_decode, benchmark_prefill
from utils.plot import ( from utils.plot import (
plot_decode_3d_surface, plot_decode_3d_surface,
...@@ -30,11 +34,11 @@ from utils.plot import ( ...@@ -30,11 +34,11 @@ from utils.plot import (
plot_prefill_interpolation, plot_prefill_interpolation,
plot_prefill_performance, plot_prefill_performance,
) )
from utils.utils import ( from utils.profile_cache import (
get_available_gpu_count, check_decode_results_exist,
get_dynamo_serve_cmd, check_prefill_results_exist,
shutdown_deployment, load_existing_decode_results,
wait_for_server_ready, load_existing_prefill_results,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -47,63 +51,12 @@ formatter = logging.Formatter( ...@@ -47,63 +51,12 @@ formatter = logging.Formatter(
console_handler.setFormatter(formatter) console_handler.setFormatter(formatter)
logger.addHandler(console_handler) logger.addHandler(console_handler)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--backend",
type=str,
default="vllm_v0",
choices=["vllm_v0", "vllm_v1"],
help="backend type (currently only vllm is supported)",
)
parser.add_argument(
"--config", type=str, required=True, help="Path to the dynamo config file"
)
parser.add_argument(
"--example-dir",
type=str,
default=None,
help="path to the example directory, if not provided, will try to infer from config file location",
)
parser.add_argument(
"--output-dir",
type=str,
default="profiling_results",
help="Path to the output results directory",
)
parser.add_argument(
"--isl", type=int, default=3000, help="target input sequence length"
)
parser.add_argument(
"--osl", type=int, default=500, help="target output sequence length"
)
parser.add_argument(
"--ttft", type=int, default=50, help="target Time To First Token in ms"
)
parser.add_argument(
"--itl", type=int, default=10, help="target Inter Token Latency in ms"
)
# below are arguments used for interpolating TTFT and ITL under different ISL/OSL
parser.add_argument(
"--max-context-length",
type=int,
default=16384,
help="maximum context length supported by the served model",
)
parser.add_argument(
"--prefill-interpolation-granularity",
type=int,
default=16,
help="how many samples to benchmark to interpolate TTFT under different ISL",
)
parser.add_argument(
"--decode-interpolation-granularity",
type=int,
default=6,
help="how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length",
)
args = parser.parse_args()
async def run_profile(args):
# List to track all created deployment clients for cleanup in case of failure
deployment_clients = []
try:
config_modifier = CONFIG_MODIFIERS[args.backend] config_modifier = CONFIG_MODIFIERS[args.backend]
if args.example_dir is None: if args.example_dir is None:
...@@ -121,16 +74,28 @@ if __name__ == "__main__": ...@@ -121,16 +74,28 @@ if __name__ == "__main__":
with open(args.config, "r") as f: with open(args.config, "r") as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
# Get the number of available GPUs profile_tp_size = [
available_gpus = get_available_gpu_count() 2**i
for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
profile_tp_size = [2**i for i in range(int(math.log2(available_gpus)) + 1)] if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
]
logger.info(f"Profiling TP sizes: {profile_tp_size}") logger.info(f"Profiling TP sizes: {profile_tp_size}")
os.makedirs(args.output_dir, exist_ok=True) os.makedirs(args.output_dir, exist_ok=True)
model_name = config_modifier.get_model_name(config) model_name = config_modifier.get_model_name(config)
port = config_modifier.get_port(config)
# Log skip behavior
if args.force_rerun:
logger.info(
"Force rerun enabled - will re-run all tests even if results exist"
)
elif args.skip_existing_results:
logger.info(
"Skip existing results enabled - will skip TP sizes with existing results"
)
else:
logger.info("Skip existing results disabled - will re-run all tests")
# first profile prefill # first profile prefill
prefill_tp_size = [] prefill_tp_size = []
...@@ -138,8 +103,29 @@ if __name__ == "__main__": ...@@ -138,8 +103,29 @@ if __name__ == "__main__":
prefill_thpt_per_gpu = [] prefill_thpt_per_gpu = []
logger.info("Profiling prefill...") logger.info("Profiling prefill...")
prefill_config = config_modifier.convert_config(config, "prefill") prefill_config = config_modifier.convert_config(config, "prefill")
frontend_port = config_modifier.get_port(config)
for tp_size in profile_tp_size: for tp_size in profile_tp_size:
logger.info(f"Profiling prefill with TP size {tp_size}...") logger.info(f"Profiling prefill with TP size {tp_size}...")
# Check if results already exist for this TP size
if (
args.skip_existing_results
and not args.force_rerun
and check_prefill_results_exist(args.output_dir, tp_size, args.isl)
):
logger.info(f"Skipping prefill TP{tp_size} - results already exist")
ttft, thpt_per_gpu = load_existing_prefill_results(
args.output_dir, tp_size, args.isl
)
if ttft is not None and thpt_per_gpu is not None:
prefill_tp_size.append(tp_size)
prefill_ttft.append(ttft)
prefill_thpt_per_gpu.append(thpt_per_gpu)
logger.info(
f"Loaded existing prefill results: TP{tp_size} TTFT={ttft:.2f}ms, throughput={thpt_per_gpu:.2f} tokens/s/GPU"
)
continue
prefill_config = config_modifier.set_config_tp_size(prefill_config, tp_size) prefill_config = config_modifier.set_config_tp_size(prefill_config, tp_size)
logger.info(f"Dynamo config: {prefill_config}") logger.info(f"Dynamo config: {prefill_config}")
...@@ -147,31 +133,34 @@ if __name__ == "__main__": ...@@ -147,31 +133,34 @@ if __name__ == "__main__":
os.makedirs(work_dir, exist_ok=True) os.makedirs(work_dir, exist_ok=True)
prefill_config_fn = f"{work_dir}/config.yaml" prefill_config_fn = f"{work_dir}/config.yaml"
dynamo_log_fn = f"{work_dir}/dynamo.log"
with open(prefill_config_fn, "w") as f: with open(prefill_config_fn, "w") as f:
yaml.dump(prefill_config, f) yaml.dump(prefill_config, f)
# Start the dynamo serve process client = DynamoDeploymentClient(
logger.info(f"Starting dynamo serve with TP size {tp_size}...") namespace=args.namespace,
dynamo_serve_cmd = get_dynamo_serve_cmd(prefill_config_fn) base_log_dir=work_dir,
with open(dynamo_log_fn, "w") as dynamo_log_f: model_name=model_name,
dynamo_process = subprocess.Popen( service_name=args.service_name,
dynamo_serve_cmd, frontend_port=frontend_port,
stdout=dynamo_log_f, )
stderr=subprocess.STDOUT, logger.info(f"Created client with service_name: {client.service_name}")
text=True, deployment_clients.append(client) # Track for cleanup
cwd=args.example_dir, await client.create_deployment(prefill_config_fn)
preexec_fn=os.setsid, # Use process group for clean termination logger.info("Waiting for deployment to be ready...")
await client.wait_for_deployment_ready()
logger.info("Deployment is ready")
logger.info("Getting deployment logs...")
await client.get_deployment_logs()
logger.info(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
) )
if not wait_for_server_ready(model_name, port):
logger.error(f"Server did not become ready, skip profiling tp={tp_size}")
break
# run genai-perf # run genai-perf
base_url = client.get_service_url()
genai_perf_artifact_dir = f"{work_dir}/gap_isl{args.isl}" genai_perf_artifact_dir = f"{work_dir}/gap_isl{args.isl}"
gap_result = benchmark_prefill( gap_result = benchmark_prefill(
args.isl, genai_perf_artifact_dir, model_name, port args.isl, genai_perf_artifact_dir, model_name, base_url=base_url
) )
if gap_result is not None: if gap_result is not None:
ttft = gap_result["time_to_first_token"]["avg"] ttft = gap_result["time_to_first_token"]["avg"]
...@@ -179,7 +168,10 @@ if __name__ == "__main__": ...@@ -179,7 +168,10 @@ if __name__ == "__main__":
prefill_ttft.append(ttft) prefill_ttft.append(ttft)
prefill_thpt_per_gpu.append(args.isl / ttft / tp_size * 1000) prefill_thpt_per_gpu.append(args.isl / ttft / tp_size * 1000)
shutdown_deployment(dynamo_process) print("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
print("Deployment deleted")
# Plot the results as a 2D scatter plot # Plot the results as a 2D scatter plot
if prefill_tp_size and prefill_ttft and prefill_thpt_per_gpu: if prefill_tp_size and prefill_ttft and prefill_thpt_per_gpu:
...@@ -202,6 +194,45 @@ if __name__ == "__main__": ...@@ -202,6 +194,45 @@ if __name__ == "__main__":
decode_config = config_modifier.convert_config(config, "decode") decode_config = config_modifier.convert_config(config, "decode")
for tp_size in profile_tp_size: for tp_size in profile_tp_size:
logger.info(f"Profiling decode with TP size {tp_size}...") logger.info(f"Profiling decode with TP size {tp_size}...")
# Check if results already exist for this TP size
if (
args.skip_existing_results
and not args.force_rerun
and check_decode_results_exist(
args.output_dir, tp_size, args.isl, args.osl
)
):
logger.info(f"Skipping decode TP{tp_size} - results already exist")
existing_results = load_existing_decode_results(
args.output_dir, tp_size, args.isl, args.osl
)
if existing_results:
# Add existing results to our arrays
engine_decode_itl = []
engine_decode_thpt_per_gpu = []
for itl, thpt_per_gpu, concurrency in existing_results:
decode_tp_size.append(tp_size)
decode_itl.append(itl)
decode_thpt_per_gpu.append(thpt_per_gpu)
decode_concurrency.append(concurrency)
# We need to get kv_cache_size from existing logs or estimate it
estimated_kv_cache = max(
100000, concurrency * (args.isl + args.osl) * 2
) # Conservative estimate
decode_kv_cache_size.append(estimated_kv_cache)
engine_decode_itl.append(itl)
engine_decode_thpt_per_gpu.append(thpt_per_gpu)
# Store results for plotting
decode_results.append(
(tp_size, engine_decode_itl, engine_decode_thpt_per_gpu)
)
logger.info(
f"Loaded {len(existing_results)} existing decode results for TP{tp_size}"
)
continue
decode_config = config_modifier.set_config_tp_size(decode_config, tp_size) decode_config = config_modifier.set_config_tp_size(decode_config, tp_size)
logger.info(f"Dynamo config: {decode_config}") logger.info(f"Dynamo config: {decode_config}")
...@@ -209,28 +240,31 @@ if __name__ == "__main__": ...@@ -209,28 +240,31 @@ if __name__ == "__main__":
os.makedirs(work_dir, exist_ok=True) os.makedirs(work_dir, exist_ok=True)
decode_config_fn = f"{work_dir}/config.yaml" decode_config_fn = f"{work_dir}/config.yaml"
dynamo_log_fn = f"{work_dir}/dynamo.log"
with open(decode_config_fn, "w") as f: with open(decode_config_fn, "w") as f:
yaml.dump(decode_config, f) yaml.dump(decode_config, f)
# Start the dynamo serve process client = DynamoDeploymentClient(
logger.info(f"Starting dynamo serve with TP size {tp_size}...") namespace=args.namespace,
dynamo_serve_cmd = get_dynamo_serve_cmd(decode_config_fn) base_log_dir=work_dir,
with open(dynamo_log_fn, "w") as dynamo_log_f: model_name=model_name,
dynamo_process = subprocess.Popen( service_name=args.service_name,
dynamo_serve_cmd, frontend_port=frontend_port,
stdout=dynamo_log_f, )
stderr=subprocess.STDOUT, deployment_clients.append(client) # Track for cleanup
text=True, await client.create_deployment(decode_config_fn)
cwd=args.example_dir, logger.info("Waiting for deployment to be ready...")
preexec_fn=os.setsid, # Use process group for clean termination await client.wait_for_deployment_ready()
logger.info("Deployment is ready")
logger.info("Getting deployment logs...")
await client.get_deployment_logs()
logger.info(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
) )
if not wait_for_server_ready(model_name, port): max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
logger.error(f"Server did not become ready, skip profiling tp={tp_size}") f"{work_dir}/vllm-v1-agg/vllmdecodeworker/0.log"
break )
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(dynamo_log_fn)
max_concurrency = max_kv_tokens // (args.isl + args.osl) max_concurrency = max_kv_tokens // (args.isl + args.osl)
sweep_num_request = [ sweep_num_request = [
num for num in DECODE_NUM_REQUESTS_RANGE if num < max_concurrency num for num in DECODE_NUM_REQUESTS_RANGE if num < max_concurrency
...@@ -241,6 +275,7 @@ if __name__ == "__main__": ...@@ -241,6 +275,7 @@ if __name__ == "__main__":
engine_decode_itl = [] engine_decode_itl = []
engine_decode_thpt_per_gpu = [] engine_decode_thpt_per_gpu = []
base_url = client.get_service_url()
for num_request in sweep_num_request: for num_request in sweep_num_request:
genai_perf_artifact_dir = f"{work_dir}/gap_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}" genai_perf_artifact_dir = f"{work_dir}/gap_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
gap_result = benchmark_decode( gap_result = benchmark_decode(
...@@ -249,11 +284,13 @@ if __name__ == "__main__": ...@@ -249,11 +284,13 @@ if __name__ == "__main__":
num_request, num_request,
genai_perf_artifact_dir, genai_perf_artifact_dir,
model_name, model_name,
port, base_url=base_url,
) )
if gap_result is not None: if gap_result is not None:
itl = gap_result["inter_token_latency"]["avg"] itl = gap_result["inter_token_latency"]["avg"]
thpt_per_gpu = gap_result["output_token_throughput"]["avg"] / tp_size thpt_per_gpu = (
gap_result["output_token_throughput"]["avg"] / tp_size
)
engine_decode_itl.append(itl) engine_decode_itl.append(itl)
engine_decode_thpt_per_gpu.append(thpt_per_gpu) engine_decode_thpt_per_gpu.append(thpt_per_gpu)
decode_tp_size.append(tp_size) decode_tp_size.append(tp_size)
...@@ -262,10 +299,15 @@ if __name__ == "__main__": ...@@ -262,10 +299,15 @@ if __name__ == "__main__":
decode_concurrency.append(num_request) decode_concurrency.append(num_request)
decode_kv_cache_size.append(max_kv_tokens) decode_kv_cache_size.append(max_kv_tokens)
shutdown_deployment(dynamo_process) print("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
print("Deployment deleted")
# Store partial results for plotting later # Store partial results for plotting later
decode_results.append((tp_size, engine_decode_itl, engine_decode_thpt_per_gpu)) decode_results.append(
(tp_size, engine_decode_itl, engine_decode_thpt_per_gpu)
)
# Plot all decode results after profiling is complete # Plot all decode results after profiling is complete
if decode_results: if decode_results:
...@@ -279,7 +321,9 @@ if __name__ == "__main__": ...@@ -279,7 +321,9 @@ if __name__ == "__main__":
) )
selected_prefill_idx = int(np.argmin(np.array(prefill_ttft))) selected_prefill_idx = int(np.argmin(np.array(prefill_ttft)))
else: else:
valid_indices = [i for i, ttft in enumerate(prefill_ttft) if ttft <= args.ttft] valid_indices = [
i for i, ttft in enumerate(prefill_ttft) if ttft <= args.ttft
]
# Among valid TP sizes, select the one with highest throughput per GPU # Among valid TP sizes, select the one with highest throughput per GPU
valid_thpts = [prefill_thpt_per_gpu[i] for i in valid_indices] valid_thpts = [prefill_thpt_per_gpu[i] for i in valid_indices]
max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))] max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
...@@ -336,34 +380,46 @@ if __name__ == "__main__": ...@@ -336,34 +380,46 @@ if __name__ == "__main__":
f"Profiling prefill under best TP {best_prefill_tp} with different ISL..." f"Profiling prefill under best TP {best_prefill_tp} with different ISL..."
) )
prefill_config = config_modifier.convert_config(config, "prefill") prefill_config = config_modifier.convert_config(config, "prefill")
prefill_config = config_modifier.set_config_tp_size(prefill_config, tp_size) prefill_config = config_modifier.set_config_tp_size(
prefill_config, best_prefill_tp
)
logger.info(f"Dynamo config: {prefill_config}") logger.info(f"Dynamo config: {prefill_config}")
work_dir = f"{args.output_dir}/selected_prefill_interpolation" work_dir = f"{args.output_dir}/selected_prefill_interpolation"
os.makedirs(work_dir, exist_ok=True) os.makedirs(work_dir, exist_ok=True)
prefill_config_fn = f"{work_dir}/config.yaml" prefill_config_fn = f"{work_dir}/config.yaml"
dynamo_log_fn = f"{work_dir}/dynamo.log"
with open(prefill_config_fn, "w") as f: with open(prefill_config_fn, "w") as f:
yaml.dump(prefill_config, f) yaml.dump(prefill_config, f)
# Start the dynamo serve process client = DynamoDeploymentClient(
logger.info(f"Starting dynamo serve with TP size {tp_size}...") namespace=args.namespace,
dynamo_serve_cmd = get_dynamo_serve_cmd(prefill_config_fn) base_log_dir=work_dir,
with open(dynamo_log_fn, "w") as dynamo_log_f: model_name=model_name,
dynamo_process = subprocess.Popen( service_name=args.service_name,
dynamo_serve_cmd, frontend_port=frontend_port,
stdout=dynamo_log_f, )
stderr=subprocess.STDOUT, deployment_clients.append(client) # Track for cleanup
text=True, await client.create_deployment(prefill_config_fn)
cwd=args.example_dir, logger.info("Waiting for deployment to be ready...")
preexec_fn=os.setsid, # Use process group for clean termination try:
) await client.wait_for_deployment_ready()
logger.info("Deployment is ready")
if not wait_for_server_ready(model_name, port): skip_profile = False
logger.error(f"Server did not become ready, skip profiling tp={tp_size}") except TimeoutError:
else: logger.error(
"Deployment failed to become ready within timeout, skipping profiling"
)
skip_profile = True
if not skip_profile:
logger.info("Getting deployment logs...")
await client.get_deployment_logs()
logger.info(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
)
base_url = client.get_service_url()
for isl in range( for isl in range(
100, 100,
args.max_context_length, args.max_context_length,
...@@ -372,7 +428,7 @@ if __name__ == "__main__": ...@@ -372,7 +428,7 @@ if __name__ == "__main__":
# run genai-perf # run genai-perf
genai_perf_artifact_dir = f"{work_dir}/gap_isl{isl}" genai_perf_artifact_dir = f"{work_dir}/gap_isl{isl}"
gap_result = benchmark_prefill( gap_result = benchmark_prefill(
isl, genai_perf_artifact_dir, model_name, port isl, genai_perf_artifact_dir, model_name, base_url=base_url
) )
if gap_result is not None: if gap_result is not None:
ttft = gap_result["time_to_first_token"]["avg"] ttft = gap_result["time_to_first_token"]["avg"]
...@@ -380,7 +436,10 @@ if __name__ == "__main__": ...@@ -380,7 +436,10 @@ if __name__ == "__main__":
prefill_ttft.append(ttft) prefill_ttft.append(ttft)
prefill_thpt_per_gpu.append(isl / ttft / best_prefill_tp * 1000) prefill_thpt_per_gpu.append(isl / ttft / best_prefill_tp * 1000)
shutdown_deployment(dynamo_process) print("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
print("Deployment deleted")
# Interpolate prefill_ttft vs prefill_isl with quadratic function (y=ax^2+bx+c) # Interpolate prefill_ttft vs prefill_isl with quadratic function (y=ax^2+bx+c)
if len(prefill_isl) > 2: if len(prefill_isl) > 2:
...@@ -415,36 +474,42 @@ if __name__ == "__main__": ...@@ -415,36 +474,42 @@ if __name__ == "__main__":
z_thpt_per_gpu = [] z_thpt_per_gpu = []
best_decode_tp = decode_tp_size[selected_decode_idx] best_decode_tp = decode_tp_size[selected_decode_idx]
logger.info(f"Profiling decode with TP size {best_decode_tp}...") logger.info(f"Profiling decode with TP size {best_decode_tp}...")
decode_config = config_modifier.set_config_tp_size(decode_config, best_decode_tp) decode_config = config_modifier.set_config_tp_size(
decode_config, best_decode_tp
)
logger.info(f"Dynamo config: {decode_config}") logger.info(f"Dynamo config: {decode_config}")
work_dir = f"{args.output_dir}/selected_decode_interpolation" work_dir = f"{args.output_dir}/selected_decode_interpolation"
os.makedirs(work_dir, exist_ok=True) os.makedirs(work_dir, exist_ok=True)
decode_config_fn = f"{work_dir}/config.yaml" decode_config_fn = f"{work_dir}/config.yaml"
dynamo_log_fn = f"{work_dir}/dynamo.log"
with open(decode_config_fn, "w") as f: with open(decode_config_fn, "w") as f:
yaml.dump(decode_config, f) yaml.dump(decode_config, f)
# Start the dynamo serve process client = DynamoDeploymentClient(
logger.info(f"Starting dynamo serve with TP size {tp_size}...") namespace=args.namespace,
dynamo_serve_cmd = get_dynamo_serve_cmd(decode_config_fn) base_log_dir=work_dir,
with open(dynamo_log_fn, "w") as dynamo_log_f: service_name=args.service_name,
dynamo_process = subprocess.Popen( frontend_port=frontend_port,
dynamo_serve_cmd, )
stdout=dynamo_log_f, deployment_clients.append(client) # Track for cleanup
stderr=subprocess.STDOUT, await client.create_deployment(decode_config_fn)
text=True, logger.info("Waiting for deployment to be ready...")
cwd=args.example_dir, await client.wait_for_deployment_ready()
preexec_fn=os.setsid, # Use process group for clean termination logger.info("Deployment is ready")
)
logger.info("Getting deployment logs...")
if not wait_for_server_ready(model_name, port): await client.get_deployment_logs()
logger.error(f"Server did not become ready, skip profiling tp={tp_size}") logger.info(
else: f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(dynamo_log_fn) )
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/vllm-v1-agg/vllmdecodeworker/0.log"
)
osl = 500 # not too large to reduce ITL variance, not too small to have stable measurement osl = 500 # not too large to reduce ITL variance, not too small to have stable measurement
base_url = client.get_service_url()
for isl in range( for isl in range(
100, 100,
args.max_context_length - osl, args.max_context_length - osl,
...@@ -463,7 +528,12 @@ if __name__ == "__main__": ...@@ -463,7 +528,12 @@ if __name__ == "__main__":
f"{work_dir}/gap_isl{isl}_osl{osl}_n{num_request}" f"{work_dir}/gap_isl{isl}_osl{osl}_n{num_request}"
) )
gap_result = benchmark_decode( gap_result = benchmark_decode(
isl, osl, num_request, genai_perf_artifact_dir, model_name, port isl,
osl,
num_request,
genai_perf_artifact_dir,
model_name,
base_url=base_url,
) )
if gap_result is not None: if gap_result is not None:
itl = gap_result["inter_token_latency"]["avg"] itl = gap_result["inter_token_latency"]["avg"]
...@@ -471,10 +541,13 @@ if __name__ == "__main__": ...@@ -471,10 +541,13 @@ if __name__ == "__main__":
y_context_length.append(isl + osl / 2) y_context_length.append(isl + osl / 2)
z_itl.append(itl) z_itl.append(itl)
z_thpt_per_gpu.append( z_thpt_per_gpu.append(
gap_result["output_token_throughput"]["avg"] / tp_size gap_result["output_token_throughput"]["avg"] / best_decode_tp
) )
shutdown_deployment(dynamo_process) print("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
print("Deployment deleted")
# Save the data points to a .npz file # Save the data points to a .npz file
save_path = f"{work_dir}/raw_data.npz" save_path = f"{work_dir}/raw_data.npz"
...@@ -492,3 +565,111 @@ if __name__ == "__main__": ...@@ -492,3 +565,111 @@ if __name__ == "__main__":
plot_decode_3d_surface( plot_decode_3d_surface(
x_kv_usage, y_context_length, z_itl, best_decode_tp, work_dir x_kv_usage, y_context_length, z_itl, best_decode_tp, work_dir
) )
except Exception as e:
logger.error(f"Profile job failed with error: {e}")
raise
finally:
# Always clean up any remaining deployments, even if the job failed
logger.info("Performing final cleanup of any remaining deployments...")
await cleanup_remaining_deployments(deployment_clients, args.namespace)
logger.info("Final cleanup completed.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Profile the TTFT and ITL of the Prefill and Decode engine with different parallelization mapping. When profiling prefill we mock/fix decode,when profiling decode we mock/fix prefill."
)
parser.add_argument(
"--namespace",
type=str,
default="dynamo-sla-profiler",
help="Kubernetes namespace to deploy the DynamoGraphDeployment",
)
parser.add_argument(
"--backend",
type=str,
default="vllm_v1",
choices=["vllm_v1"],
help="backend type, currently support [vllm_v1]",
)
parser.add_argument(
"--config",
type=str,
required=True,
help="Path to the DynamoGraphDeployment config file",
)
parser.add_argument(
"--example-dir",
type=str,
default=None,
help="path to the example directory, if not provided, will try to infer from config file location",
)
parser.add_argument(
"--output-dir",
type=str,
default="profiling_results",
help="Path to the output results directory",
)
parser.add_argument(
"--min-num-gpus-per-engine",
type=int,
default=1,
help="minimum number of GPUs per engine",
)
parser.add_argument(
"--max-num-gpus-per-engine",
type=int,
default=8,
help="maximum number of GPUs per engine",
)
parser.add_argument(
"--skip-existing-results",
action="store_true",
help="Skip TP sizes that already have results in the output directory",
)
parser.add_argument(
"--force-rerun",
action="store_true",
help="Force re-running all tests even if results already exist (overrides --skip-existing-results)",
)
parser.add_argument(
"--isl", type=int, default=3000, help="target input sequence length"
)
parser.add_argument(
"--osl", type=int, default=500, help="target output sequence length"
)
parser.add_argument(
"--ttft", type=int, default=50, help="target Time To First Token in ms"
)
parser.add_argument(
"--itl", type=int, default=10, help="target Inter Token Latency in ms"
)
# below are arguments used for interpolating TTFT and ITL under different ISL/OSL
parser.add_argument(
"--max-context-length",
type=int,
default=16384,
help="maximum context length supported by the served model",
)
parser.add_argument(
"--prefill-interpolation-granularity",
type=int,
default=16,
help="how many samples to benchmark to interpolate TTFT under different ISL",
)
parser.add_argument(
"--decode-interpolation-granularity",
type=int,
default=6,
help="how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length",
)
parser.add_argument(
"--service-name",
type=str,
default="",
help="Service name for port forwarding (default: {deployment_name}-frontend)",
)
args = parser.parse_args()
asyncio.run(run_profile(args))
...@@ -14,8 +14,11 @@ ...@@ -14,8 +14,11 @@
# limitations under the License. # limitations under the License.
import logging import logging
from copy import deepcopy
from typing import Literal from typing import Literal
from utils.defaults import DEFAULT_MODEL_NAME, DYNAMO_RUN_DEFAULT_PORT
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -29,192 +32,184 @@ console_handler.setFormatter(formatter) ...@@ -29,192 +32,184 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler) logger.addHandler(console_handler)
class VllmV0ConfigModifier: def break_arguments(args: list[str]) -> list[str]:
@classmethod ans = []
def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict: if isinstance(args, str):
config = config.copy() ans = args.split(" ")
else:
# disable planner for arg in args:
if "Planner" in config: ans.extend(arg.split(" "))
config["Planner"]["no-operation"] = True return ans
if target == "prefill":
if WORKER_COMPONENT_NAMES["vllm_v0"].prefill_worker in config:
# make PrefillWorker into VllmWorker
del config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker]
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker] = config[
WORKER_COMPONENT_NAMES["vllm_v0"].prefill_worker
]
del config[WORKER_COMPONENT_NAMES["vllm_v0"].prefill_worker]
# to profile prefill, we disable prefix caching def join_arguments(args: list[str]) -> list[str]:
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker][ return [" ".join(args)]
"enable-prefix-caching"
] = False
elif target == "decode":
if WORKER_COMPONENT_NAMES["vllm_v0"].prefill_worker in config:
del config[WORKER_COMPONENT_NAMES["vllm_v0"].prefill_worker]
# to profile prefill, we enable prefix caching to pass the prefill stage
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker][
"enable-prefix-caching"
] = True
# set num workers to 1 def append_argument(args: list[str], to_append) -> list[str]:
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker]["ServiceArgs"][ idx = find_arg_index(args)
"workers" if isinstance(to_append, list):
] = 1 args[idx:idx] = to_append
else:
# set PP to 1 args.insert(idx, to_append)
if ( return args
"pipeline-parallel-size"
in config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker]
and config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker][
"pipeline-parallel-size"
]
> 1
):
logger.warning("Currently we only support TP, setting PP to 1")
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker][
"pipeline-parallel-size"
] = 1
# always local prefill
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker][
"remote-prefill"
] = False
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker][
"conditional-disagg"
] = False
return config
@classmethod def find_arg_index(args: list[str]) -> int:
def set_config_tp_size(cls, config: dict, tp_size: int): # find the correct index to insert an argument
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker][ idx = len(args)
"tensor-parallel-size"
] = tp_size
config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker]["ServiceArgs"][
"resources"
]["gpu"] = tp_size
return config
@classmethod try:
def get_model_name(cls, config: dict) -> str: new_idx = args.index("|")
if "Common" in config and "served_model_name" in config["Common"]: idx = min(idx, new_idx)
return config["Common"]["served_model_name"] except ValueError:
else: pass
return config["Frontend"]["served_model_name"]
@classmethod
def get_port(cls, config: dict) -> int:
if "Common" in config and "port" in config["Common"]:
return config["Common"]["port"]
else:
return config["Frontend"]["port"]
@classmethod
def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int:
try: try:
with open(dynamo_log_fn, "r") as f: new_idx = args.index("2>&1")
for line in f: idx = min(idx, new_idx)
if "Maximum concurrency for" in line: except ValueError:
line = line.strip().split("Maximum concurrency for ")[1] pass
token_count = int(line.split(" tokens per request: ")[0])
concurrency = float(line.split(" tokens per request: ")[1][:-1])
logger.info( return idx
f"Found KV cache info: {token_count} x {concurrency} = {int(token_count * concurrency)}"
)
return int(token_count * concurrency)
except Exception as e:
logger.warning(
f"Failed to parse KV cache size from line: {line}. Error: {e}"
)
return 0
class VllmV1ConfigModifier: class VllmV1ConfigModifier:
@classmethod @classmethod
def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict: def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict:
config = config.copy() config = deepcopy(config)
# disable planner # set metadata name
if "Planner" in config: config["metadata"]["name"] = "vllm-v1-agg"
config["Planner"]["no-operation"] = True
# turn-off disagg # disable planner
config["SimpleLoadBalancer"]["enable_disagg"] = False if "Planner" in config["spec"]["services"]:
del config["spec"]["services"]["Planner"]
if target == "prefill": if target == "prefill":
if WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker in config: # convert prefill worker into decode worker
# make VllmPrefillWorker into VllmDecodeWorker config["spec"]["services"][
del config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker] WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker
config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker] = config[ ] = config["spec"]["services"][
WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker
]
del config["spec"]["services"][
WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker
] ]
del config[WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker]
# to profile prefill, we disable prefix caching args = config["spec"]["services"][
config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker
"enable-prefix-caching" ]["extraPodSpec"]["mainContainer"]["args"]
] = False
args = break_arguments(args)
# remove --is-prefill-worker flag
args.remove("--is-prefill-worker")
# disable prefix caching
if "--enable-prefix-caching" in args:
args.remove("--enable-prefix-caching")
if "--no-enable-prefix-caching" not in args:
args = append_argument(args, "--no-enable-prefix-caching")
config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][
"extraPodSpec"
]["mainContainer"]["args"] = join_arguments(args)
elif target == "decode": elif target == "decode":
if WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker in config: # delete prefill worker
del config[WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker] del config["spec"]["services"][
WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker
]
# to profile prefill, we enable prefix caching to pass the prefill stage args = config["spec"]["services"][
config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker
"enable-prefix-caching" ]["extraPodSpec"]["mainContainer"]["args"]
] = True
args = break_arguments(args)
# enable prefix caching
if "--enable-prefix-caching" not in args:
args = append_argument(args, "--enable-prefix-caching")
if "--no-enable-prefix-caching" in args:
args.remove("--no-enable-prefix-caching")
config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][
"extraPodSpec"
]["mainContainer"]["args"] = join_arguments(args)
# set num workers to 1 # set num workers to 1
config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker]["ServiceArgs"][ decode_worker_config = config["spec"]["services"][
"workers" WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker
] = 1
# set PP to 1
if (
"pipeline-parallel-size"
in config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker]
and config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][
"pipeline-parallel-size"
] ]
> 1 decode_worker_config["replicas"] = 1
):
logger.warning("Currently we only support TP, setting PP to 1")
config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][
"pipeline-parallel-size"
] = 1
return config return config
@classmethod @classmethod
def set_config_tp_size(cls, config: dict, tp_size: int): def set_config_tp_size(cls, config: dict, tp_size: int):
config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ config = deepcopy(config)
"tensor-parallel-size"
] = tp_size config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][
config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker]["ServiceArgs"][
"resources" "resources"
]["gpu"] = tp_size ]["requests"]["gpu"] = str(tp_size)
config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][
"resources"
]["limits"]["gpu"] = str(tp_size)
args = config["spec"]["services"][
WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker
]["extraPodSpec"]["mainContainer"]["args"]
args = break_arguments(args)
try:
idx = args.index("--tensor-parallel-size")
args[idx + 1] = str(tp_size)
except ValueError:
args = append_argument(args, ["--tensor-parallel-size", str(tp_size)])
config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][
"extraPodSpec"
]["mainContainer"]["args"] = join_arguments(args)
return config return config
@classmethod @classmethod
def get_model_name(cls, config: dict) -> str: def get_model_name(cls, config: dict) -> str:
if "Common" in config and "served_model_name" in config["Common"]: worker_name = WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker
return config["Common"]["served_model_name"] args = config["spec"]["services"][worker_name]["extraPodSpec"]["mainContainer"][
else: "args"
return config["Frontend"]["served_model_name"] ]
args = break_arguments(args)
for i, arg in enumerate(args):
if arg == "--model" and i + 1 < len(args):
return args[i + 1]
logger.warning(
f"Model name not found in configuration args, using default model name: {DEFAULT_MODEL_NAME}"
)
return DEFAULT_MODEL_NAME
@classmethod @classmethod
def get_port(cls, config: dict) -> int: def get_port(cls, config: dict) -> int:
if "Common" in config and "port" in config["Common"]: args = config["spec"]["services"]["Frontend"]["extraPodSpec"]["mainContainer"][
return config["Common"]["port"] "args"
else: ]
return config["Frontend"]["port"] args = break_arguments(args)
try:
idx = args.index("--http-port")
return int(args[idx + 1])
except ValueError:
logger.warning(
f"Port not found in configuration args, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
)
return DYNAMO_RUN_DEFAULT_PORT
@classmethod @classmethod
def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int: def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int:
# TODO
try: try:
with open(dynamo_log_fn, "r") as f: with open(dynamo_log_fn, "r") as f:
for line in f: for line in f:
...@@ -237,6 +232,5 @@ class VllmV1ConfigModifier: ...@@ -237,6 +232,5 @@ class VllmV1ConfigModifier:
CONFIG_MODIFIERS = { CONFIG_MODIFIERS = {
"vllm_v0": VllmV0ConfigModifier,
"vllm_v1": VllmV1ConfigModifier, "vllm_v1": VllmV1ConfigModifier,
} }
...@@ -29,3 +29,6 @@ DECODE_NUM_REQUESTS_RANGE = [ ...@@ -29,3 +29,6 @@ DECODE_NUM_REQUESTS_RANGE = [
450, 450,
500, 500,
] ]
DEFAULT_MODEL_NAME = "Qwen/Qwen3-0.6B"
DYNAMO_RUN_DEFAULT_PORT = 8000
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
import time
from pathlib import Path
from typing import Optional, Union
import aiofiles
import httpx # added for HTTP requests
import kubernetes_asyncio as kubernetes
import yaml
from kubernetes_asyncio import client, config
# Example chat completion request for testing deployments
EXAMPLE_CHAT_REQUEST = {
"model": "Qwen/Qwen3-0.6B",
"messages": [
{
"role": "user",
"content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden.",
}
],
"stream": False,
"max_tokens": 30,
}
class DynamoDeploymentClient:
def __init__(
self,
namespace: str,
model_name: str = "Qwen/Qwen3-0.6B",
deployment_name: str = "vllm-v1-agg",
frontend_port: int = 8000,
base_log_dir: Optional[str] = None,
service_name: Optional[str] = None,
):
"""
Initialize the client with the namespace and deployment name.
Args:
namespace: The Kubernetes namespace
deployment_name: Name of the deployment, defaults to vllm-v1-agg
base_log_dir: Base directory for storing logs, defaults to ./logs if not specified
service_name: Service name for connecting to the service, defaults to {deployment_name}-frontend
"""
self.namespace = namespace
self.deployment_name = deployment_name
self.model_name = model_name
self.service_name = service_name or f"{deployment_name}-frontend"
self.components: list[str] = [] # Will store component names from CR
self.deployment_spec: Optional[
dict
] = None # Will store the full deployment spec
self.base_log_dir = Path(base_log_dir) if base_log_dir else Path("logs")
self.frontend_port = frontend_port
def _init_kubernetes(self):
"""Initialize kubernetes client"""
try:
# Try in-cluster config first (for pods with service accounts)
config.load_incluster_config()
except Exception:
# Fallback to kube config file (for local development)
config.load_kube_config()
self.k8s_client = client.ApiClient()
self.custom_api = client.CustomObjectsApi(self.k8s_client)
self.core_api = client.CoreV1Api(self.k8s_client)
def get_service_url(self) -> str:
"""
Get the service URL using Kubernetes service DNS.
"""
service_url = f"http://{self.service_name}.{self.namespace}.svc.cluster.local:{self.frontend_port}"
print(f"Using service URL: {service_url}")
return service_url
async def create_deployment(self, deployment: Union[dict, str]):
"""
Create a DynamoGraphDeployment from either a dict or yaml file path.
Args:
deployment: Either a dict containing the deployment spec or a path to a yaml file
"""
self._init_kubernetes()
if isinstance(deployment, str):
# Load from yaml file
async with aiofiles.open(deployment, "r") as f:
content = await f.read()
self.deployment_spec = yaml.safe_load(content)
else:
self.deployment_spec = deployment
# Extract component names
self.components = [
svc.lower() for svc in self.deployment_spec["spec"]["services"].keys()
]
# Ensure name and namespace are set correctly
self.deployment_spec["metadata"]["name"] = self.deployment_name
self.deployment_spec["metadata"]["namespace"] = self.namespace
try:
await self.custom_api.create_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=self.namespace,
plural="dynamographdeployments",
body=self.deployment_spec,
)
print(f"Successfully created deployment {self.deployment_name}")
except kubernetes.client.rest.ApiException as e:
if e.status == 409: # Already exists
print(f"Deployment {self.deployment_name} already exists")
else:
print(f"Failed to create deployment {self.deployment_name}: {e}")
raise
async def wait_for_deployment_ready(self, timeout: int = 1800):
"""
Wait for the custom resource to be ready.
Args:
timeout: Maximum time to wait in seconds, default to 30 mins (image pulling can take a while)
"""
start_time = time.time()
# TODO: A little brittle, also should output intermediate status every so often.
while (time.time() - start_time) < timeout:
try:
status = await self.custom_api.get_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=self.namespace,
plural="dynamographdeployments",
name=self.deployment_name,
)
# Check both conditions:
# 1. Ready condition is True
# 2. State is successful
status_obj = status.get("status", {})
conditions = status_obj.get("conditions", [])
current_state = status_obj.get("state", "unknown")
print(f"Current deployment state: {current_state}")
print(f"Current conditions: {conditions}")
print(f"Elapsed time: {time.time() - start_time:.1f}s / {timeout}s")
ready_condition = False
for condition in conditions:
if (
condition.get("type") == "Ready"
and condition.get("status") == "True"
):
ready_condition = True
break
state_successful = status_obj.get("state") == "successful"
if ready_condition and state_successful:
print(
"Deployment is ready: Ready condition is True and state is successful"
)
return True
else:
print(
f"Deployment not ready yet - Ready condition: {ready_condition}, State successful: {state_successful}"
)
except kubernetes.client.rest.ApiException as e:
print(f"API Exception while checking deployment status: {e}")
print(f"Status code: {e.status}, Reason: {e.reason}")
except Exception as e:
print(f"Unexpected exception while checking deployment status: {e}")
await asyncio.sleep(20)
raise TimeoutError("Deployment failed to become ready within timeout")
async def check_chat_completion(self):
"""
Test the deployment with a chat completion request using httpx.
"""
EXAMPLE_CHAT_REQUEST["model"] = self.model_name
base_url = self.get_service_url()
url = f"{base_url}/v1/chat/completions"
async with httpx.AsyncClient() as client:
response = await client.post(url, json=EXAMPLE_CHAT_REQUEST)
response.raise_for_status()
return response.text
async def get_deployment_logs(self):
"""
Get logs from all pods in the deployment, organized by component.
"""
# Create logs directory
base_dir = self.base_log_dir / self.deployment_name
base_dir.mkdir(parents=True, exist_ok=True)
for component in self.components:
component_dir = base_dir / component
component_dir.mkdir(exist_ok=True)
# List pods for this component using the selector label
# nvidia.com/selector: deployment-name-component
label_selector = (
f"nvidia.com/selector={self.deployment_name}-{component.lower()}"
)
pods = await self.core_api.list_namespaced_pod(
namespace=self.namespace, label_selector=label_selector
)
# Get logs for each pod
for i, pod in enumerate(pods.items):
try:
logs = await self.core_api.read_namespaced_pod_log(
name=pod.metadata.name, namespace=self.namespace
)
async with aiofiles.open(component_dir / f"{i}.log", "w") as f:
await f.write(logs)
except kubernetes.client.rest.ApiException as e:
print(f"Error getting logs for pod {pod.metadata.name}: {e}")
async def delete_deployment(self):
"""
Delete the DynamoGraphDeployment CR.
"""
try:
await self.custom_api.delete_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=self.namespace,
plural="dynamographdeployments",
name=self.deployment_name,
)
except kubernetes.client.rest.ApiException as e:
if e.status != 404: # Ignore if already deleted
raise
async def cleanup_remaining_deployments(deployment_clients, namespace):
"""Clean up any remaining tracked deployments, handling errors gracefully."""
import logging
logger = logging.getLogger(__name__)
if not deployment_clients:
logger.info("No deployments to clean up")
return
logger.info(f"Cleaning up {len(deployment_clients)} remaining deployments...")
for deployment_client in deployment_clients:
try:
logger.info(
f"Attempting to delete deployment {deployment_client.deployment_name}..."
)
await deployment_client.delete_deployment()
logger.info(
f"Successfully deleted deployment {deployment_client.deployment_name}"
)
except Exception as e:
# If deployment doesn't exist (404), that's fine - it was already cleaned up
if "404" in str(e) or "not found" in str(e).lower():
logger.info(
f"Deployment {deployment_client.deployment_name} was already deleted"
)
else:
logger.error(
f"Failed to delete deployment {deployment_client.deployment_name}: {e}"
)
async def main():
parser = argparse.ArgumentParser(
description="Deploy and manage DynamoGraphDeployment CRDs"
)
parser.add_argument(
"--namespace",
"-n",
required=True,
help="Kubernetes namespace to deploy to (default: default)",
)
parser.add_argument(
"--yaml-file",
"-f",
required=True,
help="Path to the DynamoGraphDeployment YAML file",
)
parser.add_argument(
"--log-dir",
"-l",
default="/tmp/dynamo_logs",
help="Base directory for logs (default: /tmp/dynamo_logs)",
)
parser.add_argument(
"--service-name",
"-s",
help="Service name for connecting to the service (default: {deployment_name}-frontend)",
)
args = parser.parse_args()
# Example usage with parsed arguments
client = DynamoDeploymentClient(
namespace=args.namespace,
base_log_dir=args.log_dir,
service_name=args.service_name,
)
try:
# Create deployment from yaml file
await client.create_deployment(args.yaml_file)
# Wait for deployment to be ready
print("Waiting for deployment to be ready...")
await client.wait_for_deployment_ready()
print("Deployment is ready!")
# Test chat completion
print("Testing chat completion...")
response = await client.check_chat_completion()
print(f"Chat completion response: {response}")
# Get logs
print("Getting deployment logs...")
await client.get_deployment_logs()
print(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}!"
)
finally:
# Cleanup
print("Cleaning up deployment...")
await client.delete_deployment()
print("Deployment deleted!")
# run with:
# uv run benchmarks/profiler/utils/dynamo_deployment.py -n mo-dyn-cloud -f ./examples/vllm/deploy/agg.yaml -l ./client_logs
if __name__ == "__main__":
asyncio.run(main())
...@@ -34,7 +34,7 @@ def _get_common_genai_perf_cmd( ...@@ -34,7 +34,7 @@ def _get_common_genai_perf_cmd(
artifact_dir, artifact_dir,
seed=100, seed=100,
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B", model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
port=8000, base_url="http://localhost:8000",
): ):
return [ return [
"genai-perf", "genai-perf",
...@@ -49,7 +49,7 @@ def _get_common_genai_perf_cmd( ...@@ -49,7 +49,7 @@ def _get_common_genai_perf_cmd(
"/v1/chat/completions", "/v1/chat/completions",
"--streaming", "--streaming",
"--url", "--url",
f"http://localhost:{port}", base_url,
"--extra-inputs", "--extra-inputs",
"ignore_eos:true", "ignore_eos:true",
"--extra-inputs", "--extra-inputs",
...@@ -69,13 +69,13 @@ def get_prefill_genai_perf_cmd( ...@@ -69,13 +69,13 @@ def get_prefill_genai_perf_cmd(
seed=100, seed=100,
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B", model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
osl=5, osl=5,
port=8000, base_url="http://localhost:8000",
): ):
return _get_common_genai_perf_cmd( return _get_common_genai_perf_cmd(
artifact_dir, artifact_dir,
seed, seed,
model, model,
port, base_url,
) + [ ) + [
"--synthetic-input-tokens-mean", "--synthetic-input-tokens-mean",
str(isl), str(isl),
...@@ -103,13 +103,13 @@ def get_decode_genai_perf_cmd( ...@@ -103,13 +103,13 @@ def get_decode_genai_perf_cmd(
num_request, num_request,
seed=100, seed=100,
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B", model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
port=8000, base_url="http://localhost:8000",
): ):
return _get_common_genai_perf_cmd( return _get_common_genai_perf_cmd(
artifact_dir, artifact_dir,
seed, seed,
model, model,
port, base_url,
) + [ ) + [
"--synthetic-input-tokens-mean", "--synthetic-input-tokens-mean",
str(isl), str(isl),
...@@ -146,11 +146,15 @@ def get_gap_result(artifact_dir: str) -> dict: ...@@ -146,11 +146,15 @@ def get_gap_result(artifact_dir: str) -> dict:
return json.load(f) return json.load(f)
def benchmark_prefill(isl, genai_perf_artifact_dir, model_name, port): def benchmark_prefill(
isl, genai_perf_artifact_dir, model_name, base_url="http://localhost:8000"
):
logger.info(f"Running genai-perf with isl {isl}") logger.info(f"Running genai-perf with isl {isl}")
genai_perf_cmd = get_prefill_genai_perf_cmd( genai_perf_cmd = get_prefill_genai_perf_cmd(
isl, genai_perf_artifact_dir, model=model_name, port=port isl, genai_perf_artifact_dir, model=model_name, base_url=base_url
) )
print(f"genai-perf cmd: {genai_perf_cmd}")
# import pdb; pdb.set_trace()
gap_process = subprocess.Popen( gap_process = subprocess.Popen(
genai_perf_cmd, genai_perf_cmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
...@@ -169,12 +173,20 @@ def benchmark_prefill(isl, genai_perf_artifact_dir, model_name, port): ...@@ -169,12 +173,20 @@ def benchmark_prefill(isl, genai_perf_artifact_dir, model_name, port):
return None return None
def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name, port): def benchmark_decode(
isl,
osl,
num_request,
genai_perf_artifact_dir,
model_name,
base_url="http://localhost:8000",
):
logger.info(f"Profiling decode with num_request {num_request}...") logger.info(f"Profiling decode with num_request {num_request}...")
# first warm-up the engine by pre-computing all prefill tokens # first warm-up the engine by pre-computing all prefill tokens
# we use the same random seed to make sure the prompt is the same # we use the same random seed to make sure the prompt is the same
seed = random.randint(0, 1000000) seed = random.randint(0, 1000000)
genai_perf_cmd = get_decode_genai_perf_cmd( genai_perf_cmd = get_decode_genai_perf_cmd(
isl, isl,
osl, osl,
...@@ -182,7 +194,7 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name, ...@@ -182,7 +194,7 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,
num_request, num_request,
seed=seed, seed=seed,
model=model_name, model=model_name,
port=port, base_url=base_url,
) )
gap_process = subprocess.Popen( gap_process = subprocess.Popen(
genai_perf_cmd, genai_perf_cmd,
...@@ -199,7 +211,7 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name, ...@@ -199,7 +211,7 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,
num_request, num_request,
seed=seed, seed=seed,
model=model_name, model=model_name,
port=port, base_url=base_url,
) )
gap_process = subprocess.Popen( gap_process = subprocess.Popen(
genai_perf_cmd, genai_perf_cmd,
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import json
import logging
import os
import re
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
def check_prefill_results_exist(output_dir: str, tp_size: int, isl: int) -> bool:
"""Check if prefill results already exist for a given TP size."""
work_dir = f"{output_dir}/prefill_tp{tp_size}"
result_file = f"{work_dir}/gap_isl{isl}/*/profile_export_genai_perf.json"
# Check if the work directory exists
if not os.path.exists(work_dir):
return False
# Look for the genai-perf result file
result_files = glob.glob(result_file)
if not result_files:
return False
# Verify the result file has valid data
try:
with open(result_files[0], "r") as f:
data = json.load(f)
# Check if it has the required metrics
if "time_to_first_token" in data and "avg" in data["time_to_first_token"]:
logger.info(
f"Found existing prefill results for TP{tp_size} at {result_files[0]}"
)
return True
except (json.JSONDecodeError, KeyError, FileNotFoundError):
pass
return False
def check_decode_results_exist(
output_dir: str, tp_size: int, isl: int, osl: int
) -> bool:
"""Check if decode results already exist for a given TP size."""
work_dir = f"{output_dir}/decode_tp{tp_size}"
# Check if the work directory exists
if not os.path.exists(work_dir):
return False
# Look for at least one decode result file
result_pattern = (
f"{work_dir}/gap_request*_isl{isl}_osl{osl}_n*/*/profile_export_genai_perf.json"
)
result_files = glob.glob(result_pattern)
if not result_files:
return False
# Verify at least one result file has valid data
try:
with open(result_files[0], "r") as f:
data = json.load(f)
# Check if it has the required metrics
if "inter_token_latency" in data and "avg" in data["inter_token_latency"]:
logger.info(
f"Found existing decode results for TP{tp_size} at {result_files[0]} (and {len(result_files)-1} others)"
)
return True
except (json.JSONDecodeError, KeyError, FileNotFoundError):
pass
return False
def load_existing_prefill_results(
output_dir: str, tp_size: int, isl: int
) -> Tuple[Optional[float], Optional[float]]:
"""Load existing prefill results from disk."""
work_dir = f"{output_dir}/prefill_tp{tp_size}"
result_file = f"{work_dir}/gap_isl{isl}/*/profile_export_genai_perf.json"
result_files = glob.glob(result_file)
if result_files:
try:
with open(result_files[0], "r") as f:
data = json.load(f)
ttft = data["time_to_first_token"]["avg"]
thpt_per_gpu = isl / ttft / tp_size * 1000
return ttft, thpt_per_gpu
except (json.JSONDecodeError, KeyError, FileNotFoundError):
pass
return None, None
def load_existing_decode_results(
output_dir: str, tp_size: int, isl: int, osl: int
) -> List[Tuple[float, float, int]]:
"""Load existing decode results from disk."""
work_dir = f"{output_dir}/decode_tp{tp_size}"
result_pattern = (
f"{work_dir}/gap_request*_isl{isl}_osl{osl}_n*/*/profile_export_genai_perf.json"
)
result_files = glob.glob(result_pattern)
decode_results = []
for result_file in result_files:
try:
with open(result_file, "r") as f:
data = json.load(f)
itl = data["inter_token_latency"]["avg"]
thpt_per_gpu = data["output_token_throughput"]["avg"] / tp_size
# Extract concurrency from filename
match = re.search(r"gap_request(\d+)_", result_file)
if match:
concurrency = int(match.group(1))
decode_results.append((itl, thpt_per_gpu, concurrency))
except (json.JSONDecodeError, KeyError, FileNotFoundError):
continue
return decode_results
...@@ -138,6 +138,17 @@ cd ~/dynamo/components/backends/vllm/deploy ...@@ -138,6 +138,17 @@ cd ~/dynamo/components/backends/vllm/deploy
kubectl apply -f disagg.yaml kubectl apply -f disagg.yaml
``` ```
To change `DYN_LOG` level, edit the yaml file by adding
```yaml
...
spec:
envs:
- name: DYN_LOG
value: "debug" # or other log levels
...
```
### Testing the Deployment ### Testing the Deployment
Send a test request to verify your deployment: Send a test request to verify your deployment:
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
apiVersion: nvidia.com/v1alpha1 apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment kind: DynamoGraphDeployment
metadata: metadata:
name: vllm-v1-agg name: vllm-agg
spec: spec:
services: services:
Frontend: Frontend:
...@@ -26,7 +26,7 @@ spec: ...@@ -26,7 +26,7 @@ spec:
periodSeconds: 60 periodSeconds: 60
timeoutSeconds: 30 timeoutSeconds: 30
failureThreshold: 10 failureThreshold: 10
dynamoNamespace: vllm-v1-agg dynamoNamespace: vllm-agg
componentType: main componentType: main
replicas: 1 replicas: 1
resources: resources:
...@@ -38,7 +38,7 @@ spec: ...@@ -38,7 +38,7 @@ spec:
memory: "2Gi" memory: "2Gi"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
args: args:
- "python3 -m dynamo.frontend --http-port 8000" - "python3 -m dynamo.frontend --http-port 8000"
...@@ -63,7 +63,7 @@ spec: ...@@ -63,7 +63,7 @@ spec:
periodSeconds: 60 periodSeconds: 60
timeoutSeconds: 30 timeoutSeconds: 30
failureThreshold: 10 failureThreshold: 10
dynamoNamespace: vllm-v1-agg dynamoNamespace: vllm-agg
componentType: worker componentType: worker
replicas: 1 replicas: 1
resources: resources:
...@@ -77,7 +77,7 @@ spec: ...@@ -77,7 +77,7 @@ spec:
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - /bin/sh
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
apiVersion: nvidia.com/v1alpha1 apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment kind: DynamoGraphDeployment
metadata: metadata:
name: vllm-v1-agg-router name: vllm-agg-router
spec: spec:
services: services:
Frontend: Frontend:
...@@ -26,7 +26,7 @@ spec: ...@@ -26,7 +26,7 @@ spec:
periodSeconds: 60 periodSeconds: 60
timeoutSeconds: 30 timeoutSeconds: 30
failureThreshold: 10 failureThreshold: 10
dynamoNamespace: vllm-v1-agg-router dynamoNamespace: vllm-agg-router
componentType: main componentType: main
replicas: 1 replicas: 1
resources: resources:
...@@ -38,7 +38,7 @@ spec: ...@@ -38,7 +38,7 @@ spec:
memory: "2Gi" memory: "2Gi"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
args: args:
- "python3 -m dynamo.frontend --http-port 8000 --router-mode kv" - "python3 -m dynamo.frontend --http-port 8000 --router-mode kv"
...@@ -63,7 +63,7 @@ spec: ...@@ -63,7 +63,7 @@ spec:
periodSeconds: 60 periodSeconds: 60
timeoutSeconds: 30 timeoutSeconds: 30
failureThreshold: 10 failureThreshold: 10
dynamoNamespace: vllm-v1-agg-router dynamoNamespace: vllm-agg-router
componentType: worker componentType: worker
replicas: 2 replicas: 2
resources: resources:
...@@ -77,7 +77,7 @@ spec: ...@@ -77,7 +77,7 @@ spec:
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - /bin/sh
......
...@@ -4,11 +4,11 @@ ...@@ -4,11 +4,11 @@
apiVersion: nvidia.com/v1alpha1 apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment kind: DynamoGraphDeployment
metadata: metadata:
name: vllm-v1-disagg name: vllm-disagg
spec: spec:
services: services:
Frontend: Frontend:
dynamoNamespace: vllm-v1-disagg dynamoNamespace: vllm-disagg
componentType: main componentType: main
replicas: 1 replicas: 1
livenessProbe: livenessProbe:
...@@ -31,19 +31,19 @@ spec: ...@@ -31,19 +31,19 @@ spec:
failureThreshold: 10 failureThreshold: 10
resources: resources:
requests: requests:
cpu: "1" cpu: "32"
memory: "2Gi" memory: "10Gi"
limits: limits:
cpu: "1" cpu: "32"
memory: "2Gi" memory: "10Gi"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
args: args:
- "python3 -m dynamo.frontend --http-port 8000" - "python3 -m dynamo.frontend --http-port 8000"
VllmDecodeWorker: VllmDecodeWorker:
dynamoNamespace: vllm-v1-disagg dynamoNamespace: vllm-disagg
envFromSecret: hf-token-secret envFromSecret: hf-token-secret
componentType: worker componentType: worker
replicas: 1 replicas: 1
...@@ -68,16 +68,16 @@ spec: ...@@ -68,16 +68,16 @@ spec:
failureThreshold: 10 failureThreshold: 10
resources: resources:
requests: requests:
cpu: "10" cpu: "32"
memory: "20Gi" memory: "40Gi"
gpu: "1" gpu: "1"
limits: limits:
cpu: "10" cpu: "32"
memory: "20Gi" memory: "40Gi"
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - /bin/sh
...@@ -85,7 +85,7 @@ spec: ...@@ -85,7 +85,7 @@ spec:
args: args:
- "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager 2>&1 | tee /tmp/vllm.log" - "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager 2>&1 | tee /tmp/vllm.log"
VllmPrefillWorker: VllmPrefillWorker:
dynamoNamespace: vllm-v1-disagg dynamoNamespace: vllm-disagg
envFromSecret: hf-token-secret envFromSecret: hf-token-secret
componentType: worker componentType: worker
replicas: 1 replicas: 1
...@@ -110,16 +110,16 @@ spec: ...@@ -110,16 +110,16 @@ spec:
failureThreshold: 10 failureThreshold: 10
resources: resources:
requests: requests:
cpu: "10" cpu: "32"
memory: "20Gi" memory: "40Gi"
gpu: "1" gpu: "1"
limits: limits:
cpu: "10" cpu: "32"
memory: "20Gi" memory: "40Gi"
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - /bin/sh
......
...@@ -4,11 +4,11 @@ ...@@ -4,11 +4,11 @@
apiVersion: nvidia.com/v1alpha1 apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment kind: DynamoGraphDeployment
metadata: metadata:
name: vllm-v1-disagg-planner name: vllm-disagg-planner
spec: spec:
services: services:
Frontend: Frontend:
dynamoNamespace: vllm-v1-disagg-planner dynamoNamespace: vllm-disagg-planner
componentType: main componentType: main
replicas: 1 replicas: 1
livenessProbe: livenessProbe:
...@@ -38,12 +38,12 @@ spec: ...@@ -38,12 +38,12 @@ spec:
memory: "2Gi" memory: "2Gi"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
args: args:
- "python3 -m dynamo.frontend --http-port 8000" - "python3 -m dynamo.frontend --http-port 8000"
VllmDecodeWorker: VllmDecodeWorker:
dynamoNamespace: vllm-v1-disagg-planner dynamoNamespace: vllm-disagg-planner
envFromSecret: hf-token-secret envFromSecret: hf-token-secret
componentType: worker componentType: worker
replicas: 1 replicas: 1
...@@ -77,12 +77,12 @@ spec: ...@@ -77,12 +77,12 @@ spec:
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
args: args:
- "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager 2>&1 | tee /tmp/vllm.log" - "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager 2>&1 | tee /tmp/vllm.log"
VllmPrefillWorker: VllmPrefillWorker:
dynamoNamespace: vllm-v1-disagg-planner dynamoNamespace: vllm-disagg-planner
envFromSecret: hf-token-secret envFromSecret: hf-token-secret
componentType: worker componentType: worker
replicas: 1 replicas: 1
...@@ -116,7 +116,7 @@ spec: ...@@ -116,7 +116,7 @@ spec:
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - /bin/sh
......
...@@ -38,7 +38,7 @@ spec: ...@@ -38,7 +38,7 @@ spec:
memory: "2Gi" memory: "2Gi"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
args: args:
- "python3 -m dynamo.frontend --http-port 8000 --router-mode kv" - "python3 -m dynamo.frontend --http-port 8000 --router-mode kv"
...@@ -77,7 +77,7 @@ spec: ...@@ -77,7 +77,7 @@ spec:
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - /bin/sh
...@@ -119,7 +119,7 @@ spec: ...@@ -119,7 +119,7 @@ spec:
gpu: "1" gpu: "1"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm_v1-runtime:dep-216.4 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - /bin/sh
......
...@@ -34,7 +34,7 @@ PYTHONPATH=/workspace/examples/llm python components/planner.py --namespace <nam ...@@ -34,7 +34,7 @@ PYTHONPATH=/workspace/examples/llm python components/planner.py --namespace <nam
## Local Backend (LocalPlanner) ## Local Backend (LocalPlanner)
The LocalPlanner is built on top of circus, which is what we use to manage component subprocesses when running dynamo serve. LocalPlanner allows the planner component to scale workers up and down based on system metrics. The LocalPlanner is built on top of circus, which is what we use to manage component subprocesses when running with the frontend and workers. LocalPlanner allows the planner component to scale workers up and down based on system metrics.
**Current limitations** **Current limitations**
1. Single node only 1. Single node only
...@@ -78,7 +78,7 @@ The planner architecture is designed to be simple and extensible: ...@@ -78,7 +78,7 @@ The planner architecture is designed to be simple and extensible:
### Statefile ### Statefile
The statefile maintains the current state of all running workers and is used by the LocalPlanner to track and modify the deployment. It's stored at `~/.dynamo/state/{namespace}.json` (or in the directory specified by `DYN_LOCAL_STATE_DIR`). The statefile is automatically created when you run dynamo serve and is cleaned up when the arbiter terminates. Each worker is identified as `{namespace}_{component_name}` with an optional numeric suffix for additional instances. The statefile maintains the current state of all running workers and is used by the LocalPlanner to track and modify the deployment. It's stored at `~/.dynamo/state/{namespace}.json` (or in the directory specified by `DYN_LOCAL_STATE_DIR`). The statefile is automatically created when you run the frontend with workers and is cleaned up when the arbiter terminates. Each worker is identified as `{namespace}_{component_name}` with an optional numeric suffix for additional instances.
#### Example: Adding and Removing Workers #### Example: Adding and Removing Workers
......
...@@ -77,3 +77,22 @@ class KubernetesConnector(PlannerConnector): ...@@ -77,3 +77,22 @@ class KubernetesConnector(PlannerConnector):
def _get_graph_deployment_name(self, deployment: dict) -> str: def _get_graph_deployment_name(self, deployment: dict) -> str:
"""Get the name of the graph deployment""" """Get the name of the graph deployment"""
return deployment["metadata"]["name"] return deployment["metadata"]["name"]
if __name__ == "__main__":
import argparse
import asyncio
parser = argparse.ArgumentParser()
parser.add_argument("--namespace", type=str, default="dynamo")
parser.add_argument("--action", type=str, choices=["add", "remove"])
parser.add_argument("--component", type=str, default="planner")
parser.add_argument("--blocking", action="store_true")
args = parser.parse_args()
connector = KubernetesConnector(args.namespace)
if args.action == "add":
task = connector.add_component(args.component, args.blocking)
elif args.action == "remove":
task = connector.remove_component(args.component, args.blocking)
asyncio.run(task)
...@@ -14,13 +14,16 @@ ...@@ -14,13 +14,16 @@
# limitations under the License. # limitations under the License.
accelerate==1.6.0 accelerate==1.6.0
aiofiles
av==15.0.0 av==15.0.0
fastapi==0.115.6 fastapi==0.115.6
ftfy ftfy
genai-perf==0.0.15 genai-perf==0.0.15
grpcio-tools==1.66.0 grpcio-tools==1.66.0
httpx httpx
kr8s
kubernetes==32.0.1 kubernetes==32.0.1
kubernetes_asyncio
matplotlib matplotlib
msgspec msgspec
mypy mypy
...@@ -43,5 +46,6 @@ sentencepiece ...@@ -43,5 +46,6 @@ sentencepiece
tensorboard==2.19.0 tensorboard==2.19.0
tensorboardX==2.6.2.2 tensorboardX==2.6.2.2
transformers transformers
types-aiofiles
types-PyYAML types-PyYAML
uvicorn uvicorn
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment