Unverified Commit 7ff10067 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

refactor: break profile_sla into different files; feat: support vllm_v1 (#1588)


Signed-off-by: default avatarHongkuan Zhou <tedzhouhk@gmail.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent d2bec6f8
This diff is collapsed.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import logging
from typing import Literal

from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
# Module-level logger: INFO and above are emitted through a dedicated stream
# handler using a "timestamp - logger name - level - message" layout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
# NOTE(review): the handler is attached at import time, so records may be
# printed twice if the root logger also has a handler - confirm this is intended.
logger.addHandler(console_handler)
class VllmV0ConfigModifier:
    """Rewrite a vllm_v0 Dynamo config for single-engine profiling.

    Also provides accessors for the values the profiler needs from the
    config (model name, port) and from the engine log (KV cache size).
    """

    @classmethod
    def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict:
        """Return a modified copy of ``config`` for profiling ``target``.

        The planner is switched to no-operation, the prefill worker entry (if
        any) is folded into / removed in favour of the decode worker entry,
        and the remaining worker is forced to one worker, PP=1 and local
        prefill.

        Args:
            config: parsed Dynamo config keyed by component name.
            target: which engine to profile, "prefill" or "decode".

        Returns:
            A new config dict; the input ``config`` is left untouched.

        Raises:
            KeyError: if the decode worker entry or its "ServiceArgs" section
                is missing.
        """
        names = WORKER_COMPONENT_NAMES["vllm_v0"]
        # Deep copy: every edit below writes into nested per-component dicts,
        # which a shallow copy would still share with the caller's config.
        config = copy.deepcopy(config)

        # disable planner
        if "Planner" in config:
            config["Planner"]["no-operation"] = True

        if target == "prefill":
            if names.prefill_worker in config:
                # make PrefillWorker into VllmWorker
                del config[names.decode_worker]
                config[names.decode_worker] = config.pop(names.prefill_worker)
            # to profile prefill, we disable prefix caching
            config[names.decode_worker]["enable-prefix-caching"] = False
        elif target == "decode":
            if names.prefill_worker in config:
                del config[names.prefill_worker]
            # to profile decode, we enable prefix caching to pass the prefill stage
            config[names.decode_worker]["enable-prefix-caching"] = True

        worker = config[names.decode_worker]

        # set num workers to 1
        worker["ServiceArgs"]["workers"] = 1

        # set PP to 1
        if worker.get("pipeline-parallel-size", 1) > 1:
            logger.warning("Currently we only support TP, setting PP to 1")
            worker["pipeline-parallel-size"] = 1

        # always local prefill
        worker["remote-prefill"] = False
        worker["conditional-disagg"] = False

        return config

    @classmethod
    def set_config_tp_size(cls, config: dict, tp_size: int):
        """Set the tensor-parallel size and GPU count of the decode worker.

        ``config`` is modified in place and also returned.
        """
        worker = config[WORKER_COMPONENT_NAMES["vllm_v0"].decode_worker]
        worker["tensor-parallel-size"] = tp_size
        worker["ServiceArgs"]["resources"]["gpu"] = tp_size
        return config

    @classmethod
    def get_model_name(cls, config: dict) -> str:
        """Return the served model name, preferring "Common" over "Frontend"."""
        if "Common" in config and "served_model_name" in config["Common"]:
            return config["Common"]["served_model_name"]
        return config["Frontend"]["served_model_name"]

    @classmethod
    def get_port(cls, config: dict) -> int:
        """Return the serving port, preferring "Common" over "Frontend"."""
        if "Common" in config and "port" in config["Common"]:
            return config["Common"]["port"]
        return config["Frontend"]["port"]

    @classmethod
    def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int:
        """Parse the KV cache size (in tokens) from a dynamo log file.

        Looks for the first line of the form
        ``Maximum concurrency for <tokens> tokens per request: <c>x`` and
        returns ``int(tokens * c)``.

        Returns:
            The KV cache size, or 0 if the file cannot be read, no such line
            exists, or the line cannot be parsed.
        """
        separator = " tokens per request: "
        # Bound up front so the error handler can always report it, even when
        # the failure happens before the first line is read.
        line = ""
        try:
            with open(dynamo_log_fn, "r") as f:
                for line in f:
                    if "Maximum concurrency for" not in line:
                        continue
                    payload = line.strip().split("Maximum concurrency for ")[1]
                    fields = payload.split(separator)
                    # tolerate thousands separators such as "32,768"
                    token_count = int(fields[0].replace(",", ""))
                    # drop the trailing "x" of the concurrency factor
                    concurrency = float(fields[1].rstrip("x"))
                    logger.info(
                        f"Found KV cache info: {token_count} x {concurrency} = {int(token_count * concurrency)}"
                    )
                    return int(token_count * concurrency)
        except OSError as e:
            logger.warning(f"Failed to read dynamo log {dynamo_log_fn}. Error: {e}")
        except Exception as e:
            logger.warning(
                f"Failed to parse KV cache size from line: {line}. Error: {e}"
            )
        return 0
class VllmV1ConfigModifier:
    """Rewrite a vllm_v1 Dynamo config for single-engine profiling.

    Also provides accessors for the values the profiler needs from the
    config (model name, port) and from the engine log (KV cache size).
    """

    @classmethod
    def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict:
        """Return a modified copy of ``config`` for profiling ``target``.

        The planner is switched to no-operation, disaggregation is turned off
        on the load balancer, the prefill worker entry (if any) is folded into
        / removed in favour of the decode worker entry, and the remaining
        worker is forced to one worker and PP=1.

        Args:
            config: parsed Dynamo config keyed by component name.
            target: which engine to profile, "prefill" or "decode".

        Returns:
            A new config dict; the input ``config`` is left untouched.

        Raises:
            KeyError: if "SimpleLoadBalancer", the decode worker entry or its
                "ServiceArgs" section is missing.
        """
        names = WORKER_COMPONENT_NAMES["vllm_v1"]
        # Deep copy: every edit below writes into nested per-component dicts,
        # which a shallow copy would still share with the caller's config.
        config = copy.deepcopy(config)

        # disable planner
        if "Planner" in config:
            config["Planner"]["no-operation"] = True

        # turn-off disagg
        config["SimpleLoadBalancer"]["enable_disagg"] = False

        if target == "prefill":
            if names.prefill_worker in config:
                # make VllmPrefillWorker into VllmDecodeWorker
                del config[names.decode_worker]
                config[names.decode_worker] = config.pop(names.prefill_worker)
            # to profile prefill, we disable prefix caching
            config[names.decode_worker]["enable-prefix-caching"] = False
        elif target == "decode":
            if names.prefill_worker in config:
                del config[names.prefill_worker]
            # to profile decode, we enable prefix caching to pass the prefill stage
            config[names.decode_worker]["enable-prefix-caching"] = True

        worker = config[names.decode_worker]

        # set num workers to 1
        worker["ServiceArgs"]["workers"] = 1

        # set PP to 1
        if worker.get("pipeline-parallel-size", 1) > 1:
            logger.warning("Currently we only support TP, setting PP to 1")
            worker["pipeline-parallel-size"] = 1

        return config

    @classmethod
    def set_config_tp_size(cls, config: dict, tp_size: int):
        """Set the tensor-parallel size and GPU count of the decode worker.

        ``config`` is modified in place and also returned.
        """
        worker = config[WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker]
        worker["tensor-parallel-size"] = tp_size
        worker["ServiceArgs"]["resources"]["gpu"] = tp_size
        return config

    @classmethod
    def get_model_name(cls, config: dict) -> str:
        """Return the served model name, preferring "Common" over "Frontend"."""
        if "Common" in config and "served_model_name" in config["Common"]:
            return config["Common"]["served_model_name"]
        return config["Frontend"]["served_model_name"]

    @classmethod
    def get_port(cls, config: dict) -> int:
        """Return the serving port, preferring "Common" over "Frontend"."""
        if "Common" in config and "port" in config["Common"]:
            return config["Common"]["port"]
        return config["Frontend"]["port"]

    @classmethod
    def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int:
        """Parse the KV cache size (in tokens) from a dynamo log file.

        Looks for the first line of the form
        ``Maximum concurrency for <tokens> tokens per request: <c>x`` (the
        token count may contain thousands separators) and returns
        ``int(tokens * c)``.

        Returns:
            The KV cache size, or 0 if the file cannot be read, no such line
            exists, or the line cannot be parsed.
        """
        separator = " tokens per request: "
        # Bound up front so the error handler can always report it, even when
        # the failure happens before the first line is read.
        line = ""
        try:
            with open(dynamo_log_fn, "r") as f:
                for line in f:
                    if "Maximum concurrency for" not in line:
                        continue
                    payload = line.strip().split("Maximum concurrency for ")[1]
                    fields = payload.split(separator)
                    token_count = int(fields[0].replace(",", ""))
                    # drop the trailing "x" of the concurrency factor
                    concurrency = float(fields[1].rstrip("x"))
                    logger.info(
                        f"Found KV cache info: {token_count} x {concurrency} = {int(token_count * concurrency)}"
                    )
                    return int(token_count * concurrency)
        except OSError as e:
            logger.warning(f"Failed to read dynamo log {dynamo_log_fn}. Error: {e}")
        except Exception as e:
            logger.warning(
                f"Failed to parse KV cache size from line: {line}. Error: {e}"
            )
        return 0
# Maps a backend name to the config-modifier class that handles it.
CONFIG_MODIFIERS = {
"vllm_v0": VllmV0ConfigModifier,
"vllm_v1": VllmV1ConfigModifier,
}
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Request counts used for decode profiling: a few small values, then every
# multiple of 50 from 50 through 500.
# NOTE(review): presumably swept as the decode concurrency levels - confirm against the caller.
DECODE_NUM_REQUESTS_RANGE = [1, 5, 10, 25, *range(50, 501, 50)]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import random
import subprocess
# Module-level logger: INFO and above are emitted through a dedicated stream
# handler using a "timestamp - logger name - level - message" layout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
# NOTE(review): the handler is attached at import time, so records may be
# printed twice if the root logger also has a handler - confirm this is intended.
logger.addHandler(console_handler)
def _get_common_genai_perf_cmd(
artifact_dir,
seed=100,
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
port=8000,
):
return [
"genai-perf",
"profile",
"--model",
model,
"--tokenizer",
model,
"--endpoint-type",
"chat",
"--endpoint",
"/v1/chat/completions",
"--streaming",
"--url",
f"http://localhost:{port}",
"--extra-inputs",
"ignore_eos:true",
"--extra-inputs",
'{"nvext":{"ignore_eos":true}}',
"--warmup-request-count",
"3",
"--artifact-dir",
artifact_dir,
"--random-seed",
str(seed),
]
def get_prefill_genai_perf_cmd(
    isl,
    artifact_dir,
    seed=100,
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    osl=5,
    port=8000,
):
    """Build the genai-perf command for a prefill measurement.

    Extends the common command with a fixed input length ``isl``, a fixed
    output length ``osl``, and a single request at concurrency 1.

    Returns:
        The full command as a list of strings.
    """
    input_len = str(isl)
    output_len = str(osl)
    prefill_args = [
        "--synthetic-input-tokens-mean", input_len,
        "--synthetic-input-tokens-stddev", "0",
        "--output-tokens-mean", output_len,
        "--output-tokens-stddev", "0",
        # pin the generated length from both sides
        "--extra-inputs", f"max_tokens:{osl}",
        "--extra-inputs", f"min_tokens:{osl}",
        "--concurrency", "1",
        "--request-count", "1",
    ]
    base_cmd = _get_common_genai_perf_cmd(artifact_dir, seed, model, port)
    return base_cmd + prefill_args
def get_decode_genai_perf_cmd(
    isl,
    osl,
    artifact_dir,
    num_request,
    seed=100,
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    port=8000,
):
    """Build the genai-perf command for a decode measurement.

    Extends the common command with a fixed input length ``isl``, a fixed
    output length ``osl``, and ``num_request`` used for the concurrency, the
    dataset size and the request count alike.

    Returns:
        The full command as a list of strings.
    """
    input_len = str(isl)
    output_len = str(osl)
    request_total = str(num_request)
    decode_args = [
        "--synthetic-input-tokens-mean", input_len,
        "--synthetic-input-tokens-stddev", "0",
        "--output-tokens-mean", output_len,
        "--output-tokens-stddev", "0",
        # pin the generated length from both sides
        "--extra-inputs", f"max_tokens:{osl}",
        "--extra-inputs", f"min_tokens:{osl}",
        "--concurrency", request_total,
        "--num-dataset-entries", request_total,
        "--request-count", request_total,
    ]
    base_cmd = _get_common_genai_perf_cmd(artifact_dir, seed, model, port)
    return base_cmd + decode_args
def get_gap_result(artifact_dir: str) -> dict:
    """Load the genai-perf JSON export found under ``artifact_dir``.

    The directory tree is walked and the first
    ``profile_export_genai_perf.json`` encountered is parsed.

    Raises:
        FileNotFoundError: if no such file exists anywhere under
            ``artifact_dir``.
    """
    export_name = "profile_export_genai_perf.json"
    export_path = next(
        (
            os.path.join(dirpath, export_name)
            for dirpath, _, filenames in os.walk(artifact_dir)
            if export_name in filenames
        ),
        None,
    )
    if export_path is None:
        raise FileNotFoundError(
            f"profile_export_genai_perf.json not found in {artifact_dir}"
        )
    with open(export_path, "r") as export_file:
        return json.load(export_file)
def benchmark_prefill(isl, genai_perf_artifact_dir, model_name, port):
    """Run one genai-perf prefill measurement and return its parsed result.

    Args:
        isl: input sequence length to benchmark.
        genai_perf_artifact_dir: directory genai-perf writes its artifacts to.
        model_name: model passed to genai-perf.
        port: local port of the server under test.

    Returns:
        The parsed genai-perf export, or None if genai-perf exited non-zero.
    """
    logger.info(f"Running genai-perf with isl {isl}")
    cmd = get_prefill_genai_perf_cmd(
        isl, genai_perf_artifact_dir, model=model_name, port=port
    )
    completed = subprocess.run(cmd, capture_output=True, text=True)

    if completed.returncode != 0:
        logger.error(f"Genai-perf failed with error code: {completed.returncode}")
        logger.error(f"stderr: {completed.stderr}")
        return None

    logger.info("Genai-perf profiling completed successfully")
    logger.info(completed.stdout)
    return get_gap_result(genai_perf_artifact_dir)
def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name, port):
    """Run one genai-perf decode measurement and return its parsed result.

    genai-perf is launched twice with the same random seed: a warm-up run
    (artifacts in ``<genai_perf_artifact_dir>_warmup``) followed by the
    measured run (artifacts in ``genai_perf_artifact_dir``).

    Args:
        isl: input sequence length.
        osl: output sequence length.
        num_request: number of concurrent requests to issue.
        genai_perf_artifact_dir: directory for the measured run's artifacts.
        model_name: model passed to genai-perf.
        port: local port of the server under test.

    Returns:
        The parsed genai-perf export of the measured run, or None if that run
        exited non-zero. A failed warm-up is logged but does not abort the
        measurement.
    """
    logger.info(f"Profiling decode with num_request {num_request}...")

    # we use the same random seed for both runs to make sure the prompt is the same
    seed = random.randint(0, 1000000)

    def run_genai_perf(artifact_dir):
        # Both runs differ only in where their artifacts are written.
        cmd = get_decode_genai_perf_cmd(
            isl,
            osl,
            artifact_dir,
            num_request,
            seed=seed,
            model=model_name,
            port=port,
        )
        return subprocess.run(cmd, capture_output=True, text=True)

    # first warm-up the engine by pre-computing all prefill tokens
    warmup = run_genai_perf(f"{genai_perf_artifact_dir}_warmup")
    if warmup.returncode != 0:
        # Previously ignored: without a successful warm-up the measured run
        # may still pay for prefill, so make that visible in the log.
        logger.warning(
            f"Genai-perf warmup failed with error code: {warmup.returncode}; "
            "decode results may include prefill computation"
        )
        logger.warning(f"stderr: {warmup.stderr}")

    # then send out the real requests, hopefully, this will skip all prefill computation
    measured = run_genai_perf(genai_perf_artifact_dir)
    if measured.returncode != 0:
        logger.error(f"Genai-perf failed with error code: {measured.returncode}")
        logger.error(f"stderr: {measured.stderr}")
        return None

    logger.info("Genai-perf profiling completed successfully")
    logger.info(measured.stdout)
    return get_gap_result(genai_perf_artifact_dir)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import cm
from scipy.interpolate import griddata
# Module-level logger: INFO and above are emitted through a dedicated stream
# handler using a "timestamp - logger name - level - message" layout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
# NOTE(review): the handler is attached at import time, so records may be
# printed twice if the root logger also has a handler - confirm this is intended.
logger.addHandler(console_handler)
def plot_prefill_performance(
    prefill_tp_size, prefill_ttft, prefill_thpt_per_gpu, target_ttft, output_dir
):
    """
    Save a scatter plot of prefill throughput per GPU against TTFT.

    Each point is annotated with its TP size and a dashed vertical line marks
    the target TTFT. The image is written to
    ``<output_dir>/prefill_performance.png``.

    Args:
        prefill_tp_size: list of TP sizes
        prefill_ttft: list of time to first token values
        prefill_thpt_per_gpu: list of throughput per GPU values
        target_ttft: target TTFT value for the vertical line
        output_dir: directory to save the plot
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.scatter(prefill_ttft, prefill_thpt_per_gpu, s=100)

    # label every measurement with the TP size it was taken at
    for idx, tp in enumerate(prefill_tp_size):
        point = (prefill_ttft[idx], prefill_thpt_per_gpu[idx])
        ax.annotate(
            f"TP{tp}",
            point,
            xytext=(10, 0),
            textcoords="offset points",
            fontsize=10,
        )

    ax.axvline(
        x=target_ttft, color="r", linestyle="--", label=f"Target TTFT: {target_ttft} ms"
    )
    ax.legend()
    ax.set_title("Prefill Performance")
    ax.set_xlabel("Time to First Token (ms)")
    ax.set_ylabel("Prefill throughput per GPU (tokens/s/GPU)")
    ax.grid(True)

    plot_path = f"{output_dir}/prefill_performance.png"
    fig.savefig(plot_path, dpi=300)
    logger.info(f"Performance plot saved to {plot_path}")
    plt.close(fig)
def plot_decode_performance(decode_results, target_itl, output_dir):
    """
    Save a line plot of decode throughput per GPU against ITL, one line per TP size.

    A dashed vertical line marks the target ITL. The image is written to
    ``<output_dir>/decode_performance.png``.

    Args:
        decode_results: list of tuples (tp_size, itl_list, thpt_per_gpu_list)
        target_itl: target ITL value for the vertical line
        output_dir: directory to save the plot
    """
    fig, ax = plt.subplots(figsize=(10, 6))

    for tp, itl_values, thpt_values in decode_results:
        ax.plot(itl_values, thpt_values, label=f"TP{tp}")

    ax.axvline(
        x=target_itl, color="r", linestyle="--", label=f"Target ITL: {target_itl} ms"
    )
    ax.legend()
    ax.set_title("Decode Performance")
    ax.set_xlabel("Inter Token Latency (ms)")
    ax.set_ylabel("Decode throughput per GPU (tokens/s/GPU)")
    ax.grid(True)

    plot_path = f"{output_dir}/decode_performance.png"
    fig.savefig(plot_path, dpi=300)
    logger.info(f"Performance plot saved to {plot_path}")
    plt.close(fig)
def plot_prefill_interpolation(
    prefill_isl_np, prefill_ttft_np, prefill_thpt_per_gpu_np, work_dir
):
    """
    Fit quadratics to TTFT and throughput as functions of ISL and plot both.

    Two images are written to ``work_dir``:
    ``prefill_ttft_interpolation.png`` and
    ``prefill_throughput_interpolation.png``. Each shows the measured points
    and the fitted curve sampled at 100 ISL values across the measured range.

    Args:
        prefill_isl_np: numpy array of input sequence lengths
        prefill_ttft_np: numpy array of time to first token values
        prefill_thpt_per_gpu_np: numpy array of throughput per GPU values
        work_dir: directory to save plots
    """
    # degree-2 least-squares fits
    ttft_coeffs = np.polyfit(prefill_isl_np, prefill_ttft_np, 2)
    thpt_coeffs = np.polyfit(prefill_isl_np, prefill_thpt_per_gpu_np, 2)

    # dense ISL grid for drawing smooth curves
    isl_grid = np.linspace(min(prefill_isl_np), max(prefill_isl_np), 100)
    ttft_curve = np.poly1d(ttft_coeffs)(isl_grid)
    thpt_curve = np.poly1d(thpt_coeffs)(isl_grid)

    def save_fit_plot(measured, curve, coeffs, line_fmt, title, ylabel, path):
        # One figure: measured scatter plus the fitted quadratic.
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.scatter(prefill_isl_np, measured, s=100, label="Measured data")
        ax.plot(
            isl_grid,
            curve,
            line_fmt,
            label=f"Quadratic fit: {coeffs[0]:.2e}x² + {coeffs[1]:.2e}x + {coeffs[2]:.2e}",
        )
        ax.set_title(title)
        ax.set_xlabel("Input Sequence Length (tokens)")
        ax.set_ylabel(ylabel)
        ax.grid(True)
        ax.legend()
        fig.savefig(path, dpi=300)
        return fig

    # TTFT vs ISL
    ttft_plot_path = f"{work_dir}/prefill_ttft_interpolation.png"
    ttft_fig = save_fit_plot(
        prefill_ttft_np,
        ttft_curve,
        ttft_coeffs,
        "r-",
        "Prefill TTFT vs Input Sequence Length",
        "Time to First Token (ms)",
        ttft_plot_path,
    )
    logger.info(f"TTFT interpolation plot saved to {ttft_plot_path}")
    plt.close(ttft_fig)

    # Throughput vs ISL
    thpt_plot_path = f"{work_dir}/prefill_throughput_interpolation.png"
    thpt_fig = save_fit_plot(
        prefill_thpt_per_gpu_np,
        thpt_curve,
        thpt_coeffs,
        "g-",
        "Prefill Throughput vs Input Sequence Length",
        "Prefill throughput per GPU (tokens/s/GPU)",
        thpt_plot_path,
    )
    logger.info(
        f"Prefill throughput per GPU interpolation plot saved to {thpt_plot_path}"
    )
    plt.close(thpt_fig)
def plot_decode_3d_surface(x_kv_usage, y_context_length, z_itl, tp_size, work_dir):
    """
    Save a 3D surface of ITL over KV usage and context length.

    The scattered samples are interpolated (cubic) onto a 100x100 grid that
    spans the sampled range, and the image is written to
    ``<work_dir>/decode_tp<tp_size>.png``.

    Args:
        x_kv_usage: list of KV usage percentages
        y_context_length: list of context lengths
        z_itl: list of ITL values
        tp_size: TP size for the plot filename
        work_dir: directory to save the plot
    """
    kv_axis = np.linspace(min(x_kv_usage), max(x_kv_usage), 100)
    ctx_axis = np.linspace(min(y_context_length), max(y_context_length), 100)
    kv_grid, ctx_grid = np.meshgrid(kv_axis, ctx_axis)
    itl_grid = griddata(
        (x_kv_usage, y_context_length), z_itl, (kv_grid, ctx_grid), method="cubic"
    )

    fig = plt.figure(figsize=(12, 10))
    ax = fig.add_subplot(111, projection="3d")  # type: ignore

    # Surface with a diverging colour map
    surface = ax.plot_surface(  # type: ignore
        kv_grid,
        ctx_grid,
        itl_grid,
        cmap=cm.coolwarm,  # type: ignore
        linewidth=0.2,
        antialiased=True,
        alpha=0.8,
    )

    # Colour bar
    colorbar = fig.colorbar(surface, ax=ax, shrink=0.5, aspect=5)
    colorbar.set_label("Z Value", fontsize=12)
    colorbar.ax.tick_params(labelsize=10)

    # Axis labels
    ax.set_xlabel("Active KV Percentage", fontsize=12)
    ax.set_ylabel("Decode Context Length", fontsize=12)
    ax.set_zlabel("ITL", fontsize=12)  # type: ignore

    # Viewing angle, grid and tick size
    ax.view_init(elev=30, azim=45)  # type: ignore
    ax.grid(True)
    ax.tick_params(axis="both", which="major", labelsize=10)

    plot_path = f"{work_dir}/decode_tp{tp_size}.png"
    logger.info(f"Saving ITL surface plot to {plot_path}")
    fig.savefig(plot_path, dpi=300, bbox_inches="tight")
    plt.close(fig)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import signal
import subprocess
import time
import pynvml
import requests
# Module-level logger: INFO and above are emitted through a dedicated stream
# handler using a "timestamp - logger name - level - message" layout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
# NOTE(review): the handler is attached at import time, so records may be
# printed twice if the root logger also has a handler - confirm this is intended.
logger.addHandler(console_handler)
def get_dynamo_serve_cmd(config_file_path):
    """Return the ``dynamo serve`` command for the aggregated graph.

    ``config_file_path`` is resolved to an absolute path and passed via ``-f``.
    """
    absolute_config = os.path.abspath(config_file_path)
    serve_cmd = ["dynamo", "serve", "graphs.agg:Frontend"]
    serve_cmd.extend(("-f", absolute_config))
    return serve_cmd
def get_available_gpu_count():
    """Return the number of GPUs reported by NVML, logging each device.

    For every device the name and total/free memory (in MB) are logged.

    Returns:
        The GPU count, or 0 if NVML initialisation or any query fails.
    """
    nvml_initialized = False
    try:
        pynvml.nvmlInit()
        nvml_initialized = True

        gpu_count = pynvml.nvmlDeviceGetCount()
        if gpu_count > 0:
            logger.info(f"Detected {gpu_count} GPUs in the system:")
            for i in range(gpu_count):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                name = pynvml.nvmlDeviceGetName(handle)
                memory = pynvml.nvmlDeviceGetMemoryInfo(handle)
                total_memory_mb = memory.total / (1024 * 1024)
                free_memory_mb = memory.free / (1024 * 1024)
                logger.info(
                    f"  GPU {i}: {name}, Total Memory: {total_memory_mb:.2f} MB, Free Memory: {free_memory_mb:.2f} MB"
                )
        else:
            logger.warning("No GPUs detected with pynvml.")
        return gpu_count
    except pynvml.NVMLError as e:
        logger.error(f"NVML Error: {e}")
        return 0
    except Exception as e:
        logger.error(f"Error detecting GPUs: {e}")
        return 0
    finally:
        # Always pair a successful nvmlInit with nvmlShutdown, including when
        # a device query above raised.
        if nvml_initialized:
            try:
                pynvml.nvmlShutdown()
            except pynvml.NVMLError as e:
                logger.warning(f"NVML shutdown failed: {e}")
def _kill_stray_python_processes():
    """SIGKILL every process whose ``ps -ef`` line mentions "python", except this one."""
    # NOTE(review): this matches ALL python processes visible to `ps -ef`, not
    # only those spawned by the deployment - confirm the profiler always runs
    # in an environment where that is acceptable.
    current_pid = os.getpid()
    try:
        ps_output = subprocess.check_output(["ps", "-ef"], text=True)
    except Exception as e:
        logger.error(f"Error killing Python processes: {e}")
        return

    for line in ps_output.splitlines():
        if "python" not in line.lower():
            continue
        parts = line.split()
        if len(parts) < 2:
            continue
        try:
            pid = int(parts[1])
        except ValueError:
            # header line or unexpected layout
            continue
        if pid == current_pid:  # Exclude current process
            continue
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            # already gone between `ps` and `kill`
            continue
        except OSError as e:
            # e.g. a process owned by another user; keep cleaning up the rest
            logger.warning(f"Could not kill Python process {pid}: {e}")


def shutdown_deployment(dynamo_process):
    """Stop a running dynamo deployment and clean up leftover Python processes.

    Sends SIGINT to the process group of ``dynamo_process``, waits for it to
    finish, force-kills remaining Python processes, then sleeps 5 seconds.

    Args:
        dynamo_process: process object exposing ``pid`` and ``communicate()``
            (presumably a ``subprocess.Popen`` - confirm against the caller).
    """
    try:
        os.killpg(os.getpgid(dynamo_process.pid), signal.SIGINT)
    except ProcessLookupError:
        # the deployment already exited; still reap it and clean up below
        logger.info("Dynamo process group already exited")
    dynamo_process.communicate()

    _kill_stray_python_processes()

    time.sleep(5)
def wait_for_server_ready(model_name: str, port: int, timeout: int = 300):
    """Poll the local chat-completions endpoint until it answers with HTTP 200.

    A one-token chat request is posted every 5 seconds (each attempt has a
    5 second request timeout) until it succeeds or ``timeout`` seconds have
    elapsed.

    Args:
        model_name: model name sent in the probe request.
        port: local port the server listens on.
        timeout: overall time budget in seconds.

    Returns:
        True once the server answered with status 200, False on timeout.
    """
    logger.info("Waiting for the server to be ready...")

    url = f"http://localhost:{port}/v1/chat/completions"
    probe = {
        "model": model_name,
        "messages": [{"role": "user", "content": "Hello"}],
        "max_tokens": 1,
    }
    began = time.time()

    while time.time() - began < timeout:
        try:
            # Send a simple request to check if the server is up
            response = requests.post(url, json=probe, timeout=5)
        except (requests.RequestException, ConnectionError) as e:
            logger.info(f"Server not ready yet: {e}")
            time.sleep(5)
            continue

        if response.status_code == 200:
            logger.info(f"Server is ready after {time.time() - began:.2f} seconds")
            return True

        logger.info(
            f"Server returned status code {response.status_code}, waiting..."
        )
        time.sleep(5)

    return False
......@@ -44,3 +44,19 @@ class SLAPlannerDefaults(BasePlannerDefaults):
itl = 0.05 # in seconds
load_predictor = "arima" # ["constant", "arima", "prophet"]
load_prediction_window_size = 50 # predict load using how many recent load samples
class VllmV0ComponentName:
    """Names of the prefill and decode worker components for the vllm_v0 backend."""

    prefill_worker = "PrefillWorker"
    decode_worker = "VllmWorker"
class VllmV1ComponentName:
    """Names of the prefill and decode worker components for the vllm_v1 backend."""

    prefill_worker = "VllmPrefillWorker"
    decode_worker = "VllmDecodeWorker"
# Maps a backend name to the class holding its worker component names.
WORKER_COMPONENT_NAMES = {
"vllm_v0": VllmV0ComponentName,
"vllm_v1": VllmV1ComponentName,
}
......@@ -32,13 +32,14 @@ logger = logging.getLogger(__name__)
class LocalConnector(PlannerConnector):
def __init__(self, namespace: str, runtime: DistributedRuntime):
def __init__(self, namespace: str, runtime: DistributedRuntime, backend: str):
"""
Initialize LocalConnector and connect to CircusController.
Args:
namespace: The Dynamo namespace
runtime: Optional DistributedRuntime instance
backend: The backend to use ("vllm_v0", "vllm_v1")
"""
self.namespace = namespace
self.runtime = runtime
......
......@@ -27,7 +27,8 @@ We assume there is no piggy-backed prefill requests in the decode engine. Even i
```bash
cd $DYNAMO_HOME/benchmarks/profiler/
python -m utils.profile_sla \
python -m profile_sla \
--backend <vllm_v0/vllm_v1> \
--config <path-to-dynamo-config-file> \
--output-dir <path-to-profile-results-dir> \
--isl <target-isl> \
......
......@@ -29,7 +29,8 @@ Before using the SLA planner, you must profile the performance of the selected m
```bash
cd $DYNAMO_HOME/benchmarks/profiler/
python -m utils.profile_sla \
python -m profile_sla \
--backend <vllm_v0/vllm_v1> \
--config <path-to-dynamo-config-file> \
--output-dir <path-to-profile-results-dir> \
--isl <target-input-sequence-length> \
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment