Unverified Commit 2b5655fd authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: mocker <> planner profiler (#4651)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
parent 03070fd5
...@@ -702,7 +702,7 @@ async def run_profile(args): ...@@ -702,7 +702,7 @@ async def run_profile(args):
logger.info("Deployment deleted") logger.info("Deployment deleted")
# generate DGD with planner based on profiling results # generate DGD with planner based on profiling results
config = generate_dgd_config_with_planner( config, mocker_config = generate_dgd_config_with_planner(
config_path=args.config, config_path=args.config,
config_modifier=config_modifier, config_modifier=config_modifier,
output_dir=args.output_dir, output_dir=args.output_dir,
...@@ -720,6 +720,14 @@ async def run_profile(args): ...@@ -720,6 +720,14 @@ async def run_profile(args):
else: else:
yaml.dump(config, f) yaml.dump(config, f)
# save mocker config with planner for testing purposes
logger.debug(f"Mocker config with planner: {mocker_config}")
with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
if isinstance(mocker_config, list):
yaml.dump_all(mocker_config, f)
else:
yaml.dump(mocker_config, f)
except Exception as e: except Exception as e:
logger.error(f"Profile job failed with error: {e}") logger.error(f"Profile job failed with error: {e}")
raise raise
......
...@@ -100,7 +100,7 @@ class DgdPlannerServiceConfig(BaseModel): ...@@ -100,7 +100,7 @@ class DgdPlannerServiceConfig(BaseModel):
mainContainer=Container( mainContainer=Container(
image="my-registry/dynamo-runtime:my-tag", # placeholder image="my-registry/dynamo-runtime:my-tag", # placeholder
workingDir=f"{get_workspace_dir()}/components/src/dynamo/planner", workingDir=f"{get_workspace_dir()}/components/src/dynamo/planner",
command=["python3", "-m", "planner_sla"], command=["python3", "-m", "dynamo.planner.planner_sla"],
args=[], args=[],
) )
) )
......
...@@ -13,19 +13,28 @@ ...@@ -13,19 +13,28 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy
import json import json
from typing import Optional import os
from typing import Any, Optional
import numpy as np import numpy as np
import yaml import yaml
from benchmarks.profiler.utils.config import Config, DgdPlannerServiceConfig from benchmarks.profiler.utils.config import (
Config,
DgdPlannerServiceConfig,
set_argument_value,
)
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import ( from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
ParallelizationMapping, ParallelizationMapping,
) )
from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace
from dynamo.common.utils.paths import get_workspace_dir from dynamo.common.utils.paths import get_workspace_dir
from dynamo.planner.defaults import SubComponentType from dynamo.planner.defaults import MockerComponentName, SubComponentType
# Path to mocker disagg config relative to workspace
MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml"
def generate_dgd_config_with_planner( def generate_dgd_config_with_planner(
...@@ -36,7 +45,7 @@ def generate_dgd_config_with_planner( ...@@ -36,7 +45,7 @@ def generate_dgd_config_with_planner(
best_prefill_mapping: ParallelizationMapping, best_prefill_mapping: ParallelizationMapping,
best_decode_mapping: ParallelizationMapping, best_decode_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8, num_gpus_per_node: int = 8,
): ) -> tuple[list[dict] | dict, list[dict] | dict]:
"""Generate DGD config with planner based on profiling results. """Generate DGD config with planner based on profiling results.
Args: Args:
...@@ -49,8 +58,11 @@ def generate_dgd_config_with_planner( ...@@ -49,8 +58,11 @@ def generate_dgd_config_with_planner(
num_gpus_per_node: Number of GPUs per node (for TEP/DEP models) num_gpus_per_node: Number of GPUs per node (for TEP/DEP models)
Returns: Returns:
list[dict] | dict: If a ConfigMap is generated for planner data, returns a list tuple: (dgd_config, mocker_config) where:
of two YAML documents [ConfigMap, DGD]; otherwise returns a single DGD dict. - dgd_config: list[dict] | dict - If a ConfigMap is generated for planner data,
returns a list of two YAML documents [ConfigMap, DGD]; otherwise returns a single DGD dict.
- mocker_config: list[dict] | dict - Mocker DGD config with planner for testing purposes.
If a ConfigMap is generated, returns [ConfigMap, DGD]; otherwise returns a single DGD dict.
""" """
# Load config from file # Load config from file
...@@ -114,6 +126,7 @@ def generate_dgd_config_with_planner( ...@@ -114,6 +126,7 @@ def generate_dgd_config_with_planner(
planner_config = DgdPlannerServiceConfig() planner_config = DgdPlannerServiceConfig()
frontend_service = config.spec.services["Frontend"] frontend_service = config.spec.services["Frontend"]
planner_config.dynamoNamespace = getattr(frontend_service, "dynamoNamespace", "dynamo") # type: ignore[attr-defined] planner_config.dynamoNamespace = getattr(frontend_service, "dynamoNamespace", "dynamo") # type: ignore[attr-defined]
frontend_image: Optional[str] = None
if frontend_service.extraPodSpec and frontend_service.extraPodSpec.mainContainer: if frontend_service.extraPodSpec and frontend_service.extraPodSpec.mainContainer:
frontend_image = frontend_service.extraPodSpec.mainContainer.image frontend_image = frontend_service.extraPodSpec.mainContainer.image
if frontend_image and planner_config.extraPodSpec.mainContainer: if frontend_image and planner_config.extraPodSpec.mainContainer:
...@@ -239,7 +252,148 @@ def generate_dgd_config_with_planner( ...@@ -239,7 +252,148 @@ def generate_dgd_config_with_planner(
# Finalize DGD services # Finalize DGD services
config_dict["spec"]["services"]["Planner"] = planner_dict config_dict["spec"]["services"]["Planner"] = planner_dict
# Generate mocker config with planner for testing purposes
mocker_config = _generate_mocker_config_with_planner(
args=args,
cm_mount_path=cm_mount_path,
config_map_obj=config_map_obj,
planner_dict=planner_dict,
)
# Return multi-doc YAML (ConfigMap + DGD) when ConfigMap is created; else DGD only
dgd_config: list[dict[str, Any]] | dict[str, Any]
if config_map_obj is not None:
dgd_config = [config_map_obj, config_dict]
else:
dgd_config = config_dict
return dgd_config, mocker_config
def _generate_mocker_config_with_planner(
args,
cm_mount_path: str,
config_map_obj: Optional[dict],
planner_dict: dict,
) -> list[dict] | dict:
"""Generate mocker DGD config with planner for testing purposes.
This loads the mocker disagg.yaml, updates the worker image, mounts the
profiling ConfigMap, sets --planner-profile-data for workers, and adds the planner service.
Args:
args: Parsed arguments namespace from profile_sla
cm_mount_path: Mount path for the ConfigMap containing profiling data
config_map_obj: The ConfigMap object (if created)
planner_dict: The planner service dict to reuse (includes planner image)
Returns:
list[dict] | dict: If a ConfigMap is generated, returns [ConfigMap, DGD];
otherwise returns a single DGD dict.
"""
# Load mocker disagg config
workspace_dir = get_workspace_dir()
mocker_config_path = os.path.join(workspace_dir, MOCKER_DISAGG_CONFIG_PATH)
with open(mocker_config_path, "r") as f:
mocker_config = yaml.safe_load(f)
# Update worker image if provided
if args.dgd_image:
for service_name, service_config in (
mocker_config.get("spec", {}).get("services", {}).items()
):
if service_config.get("extraPodSpec") and service_config[
"extraPodSpec"
].get("mainContainer"):
service_config["extraPodSpec"]["mainContainer"][
"image"
] = args.dgd_image
# Update worker args: --planner-profile-data, --model-path, --model-name
mocker_worker_names = [
MockerComponentName.prefill_worker_k8s_name,
MockerComponentName.decode_worker_k8s_name,
]
for worker_name in mocker_worker_names:
service_config = (
mocker_config.get("spec", {}).get("services", {}).get(worker_name)
)
if service_config:
main_container = service_config.get("extraPodSpec", {}).get(
"mainContainer", {}
)
args_list = main_container.get("args", [])
args_list = set_argument_value(
args_list, "--planner-profile-data", cm_mount_path
)
# Update model path and name if available in args
args_list = set_argument_value(args_list, "--model-path", args.model)
args_list = set_argument_value(args_list, "--model-name", args.model)
main_container["args"] = args_list
# Mount the ConfigMap if it exists
if config_map_obj is not None:
for worker_name in mocker_worker_names:
service_config = (
mocker_config.get("spec", {}).get("services", {}).get(worker_name)
)
if service_config:
extra_pod_spec = service_config.setdefault("extraPodSpec", {})
# Add volume
volumes = extra_pod_spec.setdefault("volumes", [])
volumes.append(
{
"name": "planner-profile-data",
"configMap": {"name": "planner-profile-data"},
}
)
# Add volume mount
main_container = extra_pod_spec.setdefault("mainContainer", {})
volume_mounts = main_container.setdefault("volumeMounts", [])
volume_mounts.append(
{
"name": "planner-profile-data",
"mountPath": cm_mount_path,
"readOnly": True,
}
)
# Add planner service (reuse the same planner config but with mocker backend)
mocker_planner_dict = copy.deepcopy(planner_dict)
# Get the mocker's dynamoNamespace from Frontend service
mocker_namespace = (
mocker_config.get("spec", {})
.get("services", {})
.get("Frontend", {})
.get("dynamoNamespace", "mocker-disagg")
)
# Update planner's dynamoNamespace to match mocker's namespace
mocker_planner_dict["dynamoNamespace"] = mocker_namespace
# Override --backend to mocker and --namespace to match mocker's dynamoNamespace
# Planner args use --key=value format, so we need to find and replace
planner_main_container = mocker_planner_dict.get("extraPodSpec", {}).get(
"mainContainer", {}
)
planner_args = planner_main_container.get("args", [])
updated_planner_args = []
for arg in planner_args:
if arg.startswith("--backend="):
updated_planner_args.append("--backend=mocker")
elif arg.startswith("--namespace="):
updated_planner_args.append(f"--namespace={mocker_namespace}")
else:
updated_planner_args.append(arg)
planner_main_container["args"] = updated_planner_args
mocker_config["spec"]["services"]["Planner"] = mocker_planner_dict
# Return multi-doc YAML (ConfigMap + DGD) when ConfigMap is created; else DGD only # Return multi-doc YAML (ConfigMap + DGD) when ConfigMap is created; else DGD only
if config_map_obj is not None: if config_map_obj is not None:
return [config_map_obj, config_dict] return [config_map_obj, mocker_config]
return config_dict return mocker_config
...@@ -25,6 +25,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume ...@@ -25,6 +25,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume
- `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster - `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster
- `--data-parallel-size`: Number of data parallel workers to simulate (default: 1) - `--data-parallel-size`: Number of data parallel workers to simulate (default: 1)
- `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool - `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool
- `--is-prefill-worker` / `--is-decode-worker`: Whether the worker is a prefill or decode worker for disaggregated deployment. If not specified, mocker will be in aggregated mode.
### Example with individual arguments (vLLM-style): ### Example with individual arguments (vLLM-style):
```bash ```bash
...@@ -65,4 +66,13 @@ python benchmarks/profiler/profile_sla.py \ ...@@ -65,4 +66,13 @@ python benchmarks/profiler/profile_sla.py \
--profile-config your_profile_config.yaml --profile-config your_profile_config.yaml
``` ```
Then use the resulting profile results directory directly with `--planner-profile-data`. Then use the resulting profile results directory directly with `--planner-profile-data`.
\ No newline at end of file
## Deploying Mocker in K8s
We provide the example DGD yaml configurations for aggregated and disaggregated deployment in `examples/backends/mocker/deploy/`. You can deploy the mocker engine in K8s by running:
```bash
kubectl apply -f examples/backends/mocker/deploy/agg.yaml # or, for disaggregated
kubectl apply -f examples/backends/mocker/deploy/disagg.yaml
```
\ No newline at end of file
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
import asyncio import asyncio
import logging import logging
import os import os
import signal
import uvloop import uvloop
...@@ -22,6 +23,17 @@ configure_dynamo_logging() ...@@ -22,6 +23,17 @@ configure_dynamo_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def graceful_shutdown(runtimes: list):
"""
Shutdown dynamo distributed runtime instances.
The endpoints will be immediately invalidated so no new requests will be accepted.
"""
logger.info("Received shutdown signal, shutting down DistributedRuntime instances")
for runtime in runtimes:
runtime.shutdown()
logger.info("DistributedRuntime shutdown complete")
async def worker(): async def worker():
"""Main worker function that launches mocker instances. """Main worker function that launches mocker instances.
...@@ -100,11 +112,20 @@ async def launch_workers(args, extra_engine_args_path): ...@@ -100,11 +112,20 @@ async def launch_workers(args, extra_engine_args_path):
logger.info(f"All {args.num_workers} mocker worker(s) created and running") logger.info(f"All {args.num_workers} mocker worker(s) created and running")
# Set up signal handler for graceful shutdown
def signal_handler():
asyncio.create_task(graceful_shutdown(runtimes))
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, signal_handler)
logger.info("Signal handlers set up for graceful shutdown")
try: try:
# Wait for all futures to complete # Wait for all futures to complete
await asyncio.gather(*futures, return_exceptions=True) await asyncio.gather(*futures, return_exceptions=True)
finally: finally:
# Clean up runtimes # Clean up runtimes (in case they weren't already shut down by signal handler)
logger.info("Shutting down DistributedRuntime instances") logger.info("Shutting down DistributedRuntime instances")
for runtime in runtimes: for runtime in runtimes:
runtime.shutdown() runtime.shutdown()
......
...@@ -130,10 +130,21 @@ class TrtllmComponentName: ...@@ -130,10 +130,21 @@ class TrtllmComponentName:
decode_worker_endpoint = "generate" decode_worker_endpoint = "generate"
class MockerComponentName:
# Mocker backend for testing/simulation purposes
prefill_worker_k8s_name = "prefill"
prefill_worker_component_name = "prefill"
prefill_worker_endpoint = "generate"
decode_worker_k8s_name = "decode"
decode_worker_component_name = "backend"
decode_worker_endpoint = "generate"
WORKER_COMPONENT_NAMES = { WORKER_COMPONENT_NAMES = {
"vllm": VllmComponentName, "vllm": VllmComponentName,
"sglang": SGLangComponentName, "sglang": SGLangComponentName,
"trtllm": TrtllmComponentName, "trtllm": TrtllmComponentName,
"mocker": MockerComponentName,
} }
...@@ -177,6 +188,10 @@ class Service(BaseModel): ...@@ -177,6 +188,10 @@ class Service(BaseModel):
and len(args) > args.index("--served-model-name") + 1 and len(args) > args.index("--served-model-name") + 1
): ):
return args[args.index("--served-model-name") + 1] return args[args.index("--served-model-name") + 1]
if (
"--model-name" in args and len(args) > args.index("--model-name") + 1
): # mocker use --model-name
return args[args.index("--model-name") + 1]
if "--model" in args and len(args) > args.index("--model") + 1: if "--model" in args and len(args) > args.index("--model") + 1:
return args[args.index("--model") + 1] return args[args.index("--model") + 1]
......
...@@ -39,7 +39,7 @@ def create_sla_planner_parser() -> argparse.ArgumentParser: ...@@ -39,7 +39,7 @@ def create_sla_planner_parser() -> argparse.ArgumentParser:
parser.add_argument( parser.add_argument(
"--backend", "--backend",
default=SLAPlannerDefaults.backend, default=SLAPlannerDefaults.backend,
choices=["vllm", "sglang", "trtllm"], choices=["vllm", "sglang", "trtllm", "mocker"],
help="Backend type", help="Backend type",
) )
parser.add_argument( parser.add_argument(
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: mocker-agg
spec:
services:
Frontend:
dynamoNamespace: mocker-agg
componentType: frontend
replicas: 1
extraPodSpec:
mainContainer:
image: my-registry/mocker-runtime:my-tag
decode:
envFromSecret: hf-token-secret
dynamoNamespace: mocker-agg
componentType: worker
subComponentType: decode
replicas: 1
extraPodSpec:
mainContainer:
image: my-registry/mocker-runtime:my-tag
workingDir: /workspace
command:
- python3
- -m
- dynamo.mocker
args:
- --model-path
- nvidia/Llama-3.1-8B-Instruct-FP8
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --speedup-ratio
- "1.0"
- --planner-profile-data
- /workspace/tests/planner/profiling_results/H200_TP1P_TP1D
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: mocker-disagg
spec:
services:
Frontend:
dynamoNamespace: mocker-disagg
componentType: frontend
replicas: 1
extraPodSpec:
mainContainer:
image: my-registry/mocker-runtime:my-tag
prefill:
envFromSecret: hf-token-secret
dynamoNamespace: mocker-disagg
componentType: worker
subComponentType: prefill
replicas: 1
extraPodSpec:
mainContainer:
image: my-registry/mocker-runtime:my-tag
workingDir: /workspace
command:
- python3
- -m
- dynamo.mocker
args:
- --model-path
- nvidia/Llama-3.1-8B-Instruct-FP8
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --speedup-ratio
- "1.0"
- --planner-profile-data
- /workspace/tests/planner/profiling_results/H200_TP1P_TP1D
- --is-prefill-worker
decode:
envFromSecret: hf-token-secret
dynamoNamespace: mocker-disagg
componentType: worker
subComponentType: decode
replicas: 1
extraPodSpec:
mainContainer:
image: my-registry/mocker-runtime:my-tag
workingDir: /workspace
command:
- python3
- -m
- dynamo.mocker
args:
- --model-path
- nvidia/Llama-3.1-8B-Instruct-FP8
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --speedup-ratio
- "1.0"
- --planner-profile-data
- /workspace/tests/planner/profiling_results/H200_TP1P_TP1D
- --is-decode-worker
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment