Unverified Commit 7cfd966e authored by MatejKosec's avatar MatejKosec Committed by GitHub
Browse files

feat: SLA Planner configuration support for DEP and TEP VLLM (#4783)

Extends the MOE planner profiler to support  TEP (tensor expert parallel) and DEP (data expert parallel) configs with the vllm backend
parent 31189eea
...@@ -284,7 +284,9 @@ async def run_profile(args): ...@@ -284,7 +284,9 @@ async def run_profile(args):
deployment_clients.append(client) # Track for cleanup deployment_clients.append(client) # Track for cleanup
await client.create_deployment(prefill_config_fn) await client.create_deployment(prefill_config_fn)
logger.info("Waiting for deployment to be ready...") logger.info("Waiting for deployment to be ready...")
await client.wait_for_deployment_ready() await client.wait_for_deployment_ready(
timeout=getattr(args, "deployment_timeout", 1800)
)
logger.info("Deployment is ready") logger.info("Deployment is ready")
logger.info("Getting deployment logs...") logger.info("Getting deployment logs...")
...@@ -387,7 +389,9 @@ async def run_profile(args): ...@@ -387,7 +389,9 @@ async def run_profile(args):
deployment_clients.append(client) # Track for cleanup deployment_clients.append(client) # Track for cleanup
await client.create_deployment(decode_config_fn) await client.create_deployment(decode_config_fn)
logger.info("Waiting for deployment to be ready...") logger.info("Waiting for deployment to be ready...")
await client.wait_for_deployment_ready() await client.wait_for_deployment_ready(
timeout=getattr(args, "deployment_timeout", 1800)
)
logger.info("Deployment is ready") logger.info("Deployment is ready")
logger.info("Getting deployment logs...") logger.info("Getting deployment logs...")
...@@ -603,7 +607,9 @@ async def run_profile(args): ...@@ -603,7 +607,9 @@ async def run_profile(args):
await client.create_deployment(prefill_config_fn) await client.create_deployment(prefill_config_fn)
logger.info("Waiting for deployment to be ready...") logger.info("Waiting for deployment to be ready...")
try: try:
await client.wait_for_deployment_ready() await client.wait_for_deployment_ready(
timeout=getattr(args, "deployment_timeout", 1800)
)
logger.info("Deployment is ready") logger.info("Deployment is ready")
skip_profile = False skip_profile = False
......
...@@ -40,6 +40,7 @@ class Container(BaseModel): ...@@ -40,6 +40,7 @@ class Container(BaseModel):
workingDir: Optional[str] = None workingDir: Optional[str] = None
command: Optional[list[str]] = None command: Optional[list[str]] = None
args: Optional[list[str]] = None args: Optional[list[str]] = None
resources: Optional[dict] = None # For RDMA/custom resources
model_config = {"extra": "allow"} model_config = {"extra": "allow"}
......
...@@ -11,6 +11,7 @@ from benchmarks.profiler.utils.config import ( ...@@ -11,6 +11,7 @@ from benchmarks.profiler.utils.config import (
break_arguments, break_arguments,
get_service_name_by_type, get_service_name_by_type,
get_worker_service_from_config, get_worker_service_from_config,
remove_valued_arguments,
set_argument_value, set_argument_value,
setup_worker_service_resources, setup_worker_service_resources,
update_image, update_image,
...@@ -60,15 +61,12 @@ class VllmV1ConfigModifier(BaseConfigModifier): ...@@ -60,15 +61,12 @@ class VllmV1ConfigModifier(BaseConfigModifier):
target: EngineType, target: EngineType,
is_moe_model: bool = False, is_moe_model: bool = False,
) -> dict: ) -> dict:
if is_moe_model:
raise NotImplementedError(
"MoE model support is not implemented for VLLM backend"
)
cfg = Config.model_validate(config) cfg = Config.model_validate(config)
# MoE flags (--enable-expert-parallel) are set in set_config_tep_size/set_config_dep_size
# set metadata name # set metadata name
cfg.metadata.name = "vllm-agg" cfg.metadata.name = "agg"
# disable planner # disable planner
if "Planner" in cfg.spec.services: if "Planner" in cfg.spec.services:
...@@ -172,11 +170,9 @@ class VllmV1ConfigModifier(BaseConfigModifier): ...@@ -172,11 +170,9 @@ class VllmV1ConfigModifier(BaseConfigModifier):
args = validate_and_get_worker_args(worker_service, backend="vllm") args = validate_and_get_worker_args(worker_service, backend="vllm")
args = break_arguments(args) args = break_arguments(args)
try: # Remove --tp alias if present, use --tensor-parallel-size as canonical form
idx = args.index("--tensor-parallel-size") args = remove_valued_arguments(args, "--tp")
args[idx + 1] = str(tp_size) args = set_argument_value(args, "--tensor-parallel-size", str(tp_size))
except ValueError:
args = append_argument(args, ["--tensor-parallel-size", str(tp_size)])
worker_service.extraPodSpec.mainContainer.args = args worker_service.extraPodSpec.mainContainer.args = args
...@@ -190,10 +186,44 @@ class VllmV1ConfigModifier(BaseConfigModifier): ...@@ -190,10 +186,44 @@ class VllmV1ConfigModifier(BaseConfigModifier):
num_gpus_per_node: int, num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE, component_type: SubComponentType = SubComponentType.DECODE,
): ):
raise NotImplementedError( """
"TEP (Tensor Expert Parallelism) is not implemented for VLLM backend" Set Tensor Expert Parallelism (TEP) for vLLM MoE models.
vLLM derives expert parallelism size automatically:
expert_parallel_size = tensor_parallel_size * data_parallel_size
For TEP: TP=tep_size, DP=1 → EP size = tep_size
"""
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="vllm", sub_component_type=component_type
) )
# Set up resources with multinode configuration
setup_worker_service_resources(worker_service, tep_size, num_gpus_per_node)
# Get and validate args
args = validate_and_get_worker_args(worker_service, backend="vllm")
args = break_arguments(args)
# Remove aliases, use canonical forms
args = remove_valued_arguments(args, "--tp")
args = set_argument_value(args, "--tensor-parallel-size", str(tep_size))
args = remove_valued_arguments(args, "--dp")
args = set_argument_value(args, "--data-parallel-size", "1")
# Remove hybrid load balancing flags - not compatible with DP=1
args = remove_valued_arguments(args, "--data-parallel-size-local")
if "--data-parallel-hybrid-lb" in args:
args.remove("--data-parallel-hybrid-lb")
# Enable expert parallel for MoE
if "--enable-expert-parallel" not in args:
args = append_argument(args, "--enable-expert-parallel")
worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump()
@classmethod @classmethod
def set_config_dep_size( def set_config_dep_size(
cls, cls,
...@@ -202,10 +232,52 @@ class VllmV1ConfigModifier(BaseConfigModifier): ...@@ -202,10 +232,52 @@ class VllmV1ConfigModifier(BaseConfigModifier):
num_gpus_per_node: int, num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE, component_type: SubComponentType = SubComponentType.DECODE,
): ):
raise NotImplementedError( """
"DEP (Data Expert Parallelism) is not implemented for VLLM backend" Set Data Expert Parallelism (DEP) for vLLM MoE models.
vLLM derives expert parallelism size automatically:
expert_parallel_size = tensor_parallel_size * data_parallel_size
For DEP: TP=1, DP=dep_size → EP size = dep_size
"""
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="vllm", sub_component_type=component_type
) )
# Set up resources with multinode configuration
setup_worker_service_resources(worker_service, dep_size, num_gpus_per_node)
# Get and validate args
args = validate_and_get_worker_args(worker_service, backend="vllm")
args = break_arguments(args)
# Remove aliases, use canonical forms
args = remove_valued_arguments(args, "--tp")
args = set_argument_value(args, "--tensor-parallel-size", "1")
args = remove_valued_arguments(args, "--dp")
args = set_argument_value(args, "--data-parallel-size", str(dep_size))
# Handle hybrid load balancing for multinode DEP
# If dep_size > num_gpus_per_node, we need multinode and can use hybrid-lb
if dep_size > num_gpus_per_node and "--data-parallel-hybrid-lb" in args:
# Set local DP size to GPUs per node for hybrid load balancing
args = set_argument_value(
args, "--data-parallel-size-local", str(num_gpus_per_node)
)
else:
# Remove hybrid-lb flags if not needed or not multinode
args = remove_valued_arguments(args, "--data-parallel-size-local")
if "--data-parallel-hybrid-lb" in args:
args.remove("--data-parallel-hybrid-lb")
# Enable expert parallel for MoE
if "--enable-expert-parallel" not in args:
args = append_argument(args, "--enable-expert-parallel")
worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump()
@classmethod @classmethod
def get_model_name(cls, config: dict) -> str: def get_model_name(cls, config: dict) -> str:
cfg = Config.model_validate(config) cfg = Config.model_validate(config)
...@@ -273,10 +345,13 @@ class VllmV1ConfigModifier(BaseConfigModifier): ...@@ -273,10 +345,13 @@ class VllmV1ConfigModifier(BaseConfigModifier):
) )
concurrency = float(line.split(" tokens per request: ")[1][:-1]) concurrency = float(line.split(" tokens per request: ")[1][:-1])
# Log shows per-rank KV cache; multiply by attention_dp_size for total
kv_cache_per_rank = int(token_count * concurrency)
total_kv_cache = kv_cache_per_rank * attention_dp_size
logger.info( logger.info(
f"Found KV cache info: {token_count} x {concurrency} = {int(token_count * concurrency)}" f"Found KV cache: {kv_cache_per_rank} per rank x {attention_dp_size} = {total_kv_cache} total"
) )
return int(token_count * concurrency) return total_kv_cache
except Exception as e: except Exception as e:
logger.warning( logger.warning(
f"Failed to parse KV cache size from line: {line}. Error: {e}" f"Failed to parse KV cache size from line: {line}. Error: {e}"
...@@ -295,6 +370,10 @@ class VllmV1ConfigModifier(BaseConfigModifier): ...@@ -295,6 +370,10 @@ class VllmV1ConfigModifier(BaseConfigModifier):
Configure prefill-related limits for aggregated prefill runs. Configure prefill-related limits for aggregated prefill runs.
vLLM uses --max-num-seqs to limit concurrency and vLLM uses --max-num-seqs to limit concurrency and
--max-num-batched-tokens to cap total tokens per step. --max-num-batched-tokens to cap total tokens per step.
In vLLM, --max-num-batched-tokens controls per-GPU buffer allocation
during memory profiling. For DEP (DP > 1), we must use the base token
limit per GPU, not the multiplied total, to avoid OOM during profiling.
""" """
cfg = Config.model_validate(config) cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config( worker_service = get_worker_service_from_config(
...@@ -303,10 +382,22 @@ class VllmV1ConfigModifier(BaseConfigModifier): ...@@ -303,10 +382,22 @@ class VllmV1ConfigModifier(BaseConfigModifier):
args = validate_and_get_worker_args(worker_service, backend="vllm") args = validate_and_get_worker_args(worker_service, backend="vllm")
args = break_arguments(args) args = break_arguments(args)
# Concurrency / batch size # Get DP size from args (check both --dp and --data-parallel-size aliases)
dp_size = 1
for i, arg in enumerate(args):
if arg in ("--dp", "--data-parallel-size") and i + 1 < len(args):
dp_size = int(args[i + 1])
break
# For DEP (DP > 1), compute per-GPU token limit to avoid OOM
per_gpu_max_tokens = (
max_num_tokens // dp_size if dp_size > 1 else max_num_tokens
)
args = set_argument_value(args, "--max-num-seqs", str(max_batch_size)) args = set_argument_value(args, "--max-num-seqs", str(max_batch_size))
# Token cap per step args = set_argument_value(
args = set_argument_value(args, "--max-num-batched-tokens", str(max_num_tokens)) args, "--max-num-batched-tokens", str(per_gpu_max_tokens)
)
worker_service.extraPodSpec.mainContainer.args = args worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump() return cfg.model_dump()
...@@ -209,10 +209,19 @@ def plot_decode_3d_surface( ...@@ -209,10 +209,19 @@ def plot_decode_3d_surface(
xi = np.linspace(min(x_kv_usage), max(x_kv_usage), 100) xi = np.linspace(min(x_kv_usage), max(x_kv_usage), 100)
yi = np.linspace(min(y_context_length), max(y_context_length), 100) yi = np.linspace(min(y_context_length), max(y_context_length), 100)
X, Y = np.meshgrid(xi, yi) X, Y = np.meshgrid(xi, yi)
Z_itl = griddata((x_kv_usage, y_context_length), z_itl, (X, Y), method="cubic")
Z_thpt = griddata( # Try cubic interpolation first, fallback to linear if Qhull error occurs
(x_kv_usage, y_context_length), z_thpt_per_gpu, (X, Y), method="cubic" try:
) Z_itl = griddata((x_kv_usage, y_context_length), z_itl, (X, Y), method="cubic")
Z_thpt = griddata(
(x_kv_usage, y_context_length), z_thpt_per_gpu, (X, Y), method="cubic"
)
except Exception as e:
logger.warning(f"Cubic interpolation failed: {e}. Falling back to linear.")
Z_itl = griddata((x_kv_usage, y_context_length), z_itl, (X, Y), method="linear")
Z_thpt = griddata(
(x_kv_usage, y_context_length), z_thpt_per_gpu, (X, Y), method="linear"
)
# Plot ITL surface # Plot ITL surface
fig = plt.figure(figsize=(12, 10)) fig = plt.figure(figsize=(12, 10))
......
...@@ -84,6 +84,7 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -84,6 +84,7 @@ def create_profiler_parser() -> argparse.Namespace:
serviceName: String (service name, default: "") serviceName: String (service name, default: "")
model: String (served model name) model: String (served model name)
dgdImage: String (container image to use for DGD components (frontend, planner, workers), overrides images in config file) dgdImage: String (container image to use for DGD components (frontend, planner, workers), overrides images in config file)
deploymentTimeout: Int (maximum time to wait for deployment to become ready in seconds, default: 1800)
modelCache: modelCache:
pvcName: String (name of the PVC to mount the model cache, pvcName: String (name of the PVC to mount the model cache,
if not provided, model must be HF name and will download from HF, default: "") if not provided, model must be HF name and will download from HF, default: "")
...@@ -175,6 +176,12 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -175,6 +176,12 @@ def create_profiler_parser() -> argparse.Namespace:
default=_get(deployment_cfg, "dgdImage", "dgd_image", ""), default=_get(deployment_cfg, "dgdImage", "dgd_image", ""),
help="Container image to use for DGD components (frontend, planner, workers). Overrides images in config file.", help="Container image to use for DGD components (frontend, planner, workers). Overrides images in config file.",
) )
parser.add_argument(
"--deployment-timeout",
type=int,
default=_get(deployment_cfg, "deploymentTimeout", "deployment_timeout", 1800),
help="Maximum time to wait for deployment to become ready in seconds (default: 1800)",
)
parser.add_argument( parser.add_argument(
"--namespace", "--namespace",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment