Unverified Commit 7d5d6f8c authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
parent 0715d469
@@ -76,6 +76,7 @@ __pycache__/
 *.py[cod]
 *$py.class
 *.so
+*.egg-info
 ### Helm ###
 *.tgz
...
@@ -15,7 +15,9 @@
 [workspace]
 members = [
-    "components/*",
+    "components/http",
+    "components/metrics",
+    "components/router",
     "launch/*",
     "lib/llm",
     "lib/runtime",
...
<!--
SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Planner
The planner monitors the state of the system and adjusts the number of workers to keep it running efficiently. It can dynamically scale prefill/decode workers up and down based on a variety of KV metrics. You can find documentation and benchmarking examples in the [planner docs](../../docs/planner.md).
## Usage
After you've deployed a dynamo graph, you can start the planner with the following command:
```bash
python components/planner.py --namespace <namespace>
```
## Backends
1. `local` - uses circus to start/stop worker subprocesses
2. `kubernetes` - uses the kubernetes API to adjust replicas of each component's resource definition. This is a work in progress and not currently available
## Local Backend (LocalPlanner)
The LocalPlanner is built on top of circus, which is what we use to manage component subprocesses when running dynamo serve. LocalPlanner allows the planner component to scale workers up and down based on system metrics.
**Current limitations**
1. Single node only
2. Each worker must use only a single GPU
3. Your initial deployment must use replicas=1 for both prefill and decode workers
We are working on addressing these as fast as possible.
### Under the Hood
Circus has a concept of an arbiter and a watcher:
- **Arbiter**: The supervisor process that manages all watchers
- **Watcher**: A managed process definition that holds the environment variables, command, name, and other information needed to run a component
When a service is started, each worker process is spun up as a watcher. For example, when starting a VllmWorker, a watcher is created that looks like:
```json
{
"dynamo_VllmWorker": {
"watcher_name": "dynamo_VllmWorker",
"cmd": "/opt/dynamo/venv/bin/python3 -m dynamo.sdk.cli.serve_dynamo graphs.agg_router:Frontend --service-name VllmWorker --worker-id $(CIRCUS.WID) --worker-env [{\"CUDA_VISIBLE_DEVICES\": \"0\"}]",
"resources": {
"allocated_gpus": [
0
]
},
"lease": 7587886183172559418
}
}
```
The arbiter exposes an endpoint allowing messages to add/remove/change watchers. The LocalPlanner leverages this functionality to dynamically adjust worker counts.
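As a rough illustration, circus's control protocol is JSON-based, and `CircusClient.send_message` wraps payloads of approximately the shape below. This is a sketch: the `cmd` string is abbreviated, and the exact property schema is defined by circus, not by this document.

```python
import json

def make_circus_message(command: str, **properties) -> str:
    """Build a JSON control message of the rough shape circus's arbiter
    endpoint accepts; CircusClient.send_message assembles these internally."""
    return json.dumps({"command": command, "properties": properties})

# Adding a watcher (the cmd here is illustrative, not a real command line):
add_msg = make_circus_message(
    "add",
    name="dynamo_VllmWorker_1",
    cmd="python3 -m dynamo.sdk.cli.serve_dynamo ...",
    start=True,
)

# Removing a watcher:
rm_msg = make_circus_message("rm", name="dynamo_VllmWorker_1")

print(json.loads(rm_msg)["command"])  # rm
```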
### Implementation
The planner architecture is designed to be simple and extensible:
- An abstract class supports basic add/remove component operations
- This is implemented in `local_connector.py`
- Circus interaction logic is in `circusd.py`, which reads the statefile, connects to the endpoint, and provides add/remove functionality
- Planner starts an instance of `LocalConnector` and uses it to modify the deployment topology
### Statefile
The statefile maintains the current state of all running workers and is used by the LocalPlanner to track and modify the deployment. It's stored at `~/.dynamo/state/{namespace}.json` (or in the directory specified by `DYN_LOCAL_STATE_DIR`). The statefile is automatically created when you run dynamo serve and is cleaned up when the arbiter terminates. Each worker is identified as `{namespace}_{component_name}` with an optional numeric suffix for additional instances.
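The path resolution can be sketched as follows. This is a minimal illustration against a temporary directory; the sample statefile contents are abbreviated and only meant to show the `circus_endpoint` and `components` keys described above.

```python
import json
import os
import tempfile
from pathlib import Path

def statefile_path(namespace: str) -> Path:
    """Resolve the statefile location: DYN_LOCAL_STATE_DIR overrides
    the default ~/.dynamo/state directory."""
    state_dir = Path(
        os.environ.get("DYN_LOCAL_STATE_DIR", Path.home() / ".dynamo" / "state")
    )
    return state_dir / f"{namespace}.json"

# Demo against a temporary directory so nothing real is touched.
with tempfile.TemporaryDirectory() as tmp:
    os.environ["DYN_LOCAL_STATE_DIR"] = tmp
    path = statefile_path("dynamo")
    path.write_text(json.dumps({
        "circus_endpoint": "tcp://127.0.0.1:54927",
        "components": {"dynamo_VllmWorker": {"resources": {"allocated_gpus": [0]}}},
    }))
    state = json.loads(path.read_text())
    print(state["circus_endpoint"])  # tcp://127.0.0.1:54927
```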
#### Example: Adding and Removing Workers
Starting with a single decode worker:
```json
{
"dynamo_VllmWorker": {..., "resources":{...}}
}
```
After adding a worker:
```json
{
"dynamo_VllmWorker": {..., "resources":{...}},
"dynamo_VllmWorker_1": {..., "resources":{...}}
}
```
After removing a worker (removes the highest suffix):
```json
{
"dynamo_VllmWorker": {..., "resources":{...}}
}
```
If scaled to zero, the initial entry is kept without resources to maintain configuration information:
```json
{
"dynamo_VllmWorker": {...}
}
```
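The naming rules above can be sketched as two pure functions. This is a simplified rendition of the suffix logic in `local_connector.py`, with the statefile entries abbreviated to empty dicts:

```python
def next_watcher_name(components: dict, namespace: str, component: str) -> str:
    """Name for a newly added worker: base name plus (max existing suffix + 1)."""
    prefix = f"{namespace}_{component}_"
    suffixes = [int(n[len(prefix):]) for n in components if n.startswith(prefix)]
    return f"{prefix}{max(suffixes, default=0) + 1}"

def removal_target(components: dict, namespace: str, component: str) -> str:
    """Worker to remove: the highest-suffixed watcher, falling back to the
    base (unsuffixed) entry when no numbered watchers exist."""
    prefix = f"{namespace}_{component}_"
    suffixed = [n for n in components if n.startswith(prefix)]
    if suffixed:
        return max(suffixed, key=lambda n: int(n[len(prefix):]))
    return f"{namespace}_{component}"

state = {"dynamo_VllmWorker": {}, "dynamo_VllmWorker_1": {}}
print(next_watcher_name(state, "dynamo", "VllmWorker"))  # dynamo_VllmWorker_2
print(removal_target(state, "dynamo", "VllmWorker"))     # dynamo_VllmWorker_1
```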
### Looking forward
- Support for a multinode LocalPlanner
- Storing the statefile (and initial configurations) in ETCD using the new `EtcdKvCache`.
### Testing
For manual testing, you can use the controller_test.py file to add/remove components after you've run a serve command with `--enable-local-planner`.
## Kubernetes Backend
[Coming soon]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Planner connector implementations
from dynamo.planner.circusd import CircusController
from dynamo.planner.local_connector import LocalConnector
from dynamo.planner.planner_connector import PlannerConnector

__all__ = ["CircusController", "LocalConnector", "PlannerConnector"]
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from circus.client import CircusClient
from circus.exc import CallError
from dynamo.sdk.lib.logging import configure_server_logging
configure_server_logging()
logger = logging.getLogger(__name__)
class CircusController:
"""A circus client implementation for Dynamo"""
def __init__(self, endpoint: str):
"""Initialize connection to arbiter.
Args:
endpoint: The circus endpoint (e.g., tcp://127.0.0.1:54927)
"""
self.endpoint = endpoint
self.client = CircusClient(endpoint=endpoint, timeout=15.0)
@classmethod
def from_state_file(cls, namespace: str) -> "CircusController":
"""
Create a CircusController from a Dynamo state file.
Args:
namespace: The Dynamo namespace
Returns:
CircusController instance
Raises:
FileNotFoundError: If state file doesn't exist
ValueError: If no endpoint found in state file
"""
state_file = (
Path(
os.environ.get("DYN_LOCAL_STATE_DIR", Path.home() / ".dynamo" / "state")
)
/ f"{namespace}.json"
)
if not state_file.exists():
raise FileNotFoundError(f"State file not found: {state_file}")
with open(state_file, "r") as f:
state = json.load(f)
endpoint = state.get("circus_endpoint")
if not endpoint:
raise ValueError(f"No endpoint found in state file: {state_file}")
return cls(endpoint)
async def add_watcher(
self,
name: str,
cmd: str,
env: Optional[Dict[str, str]] = None,
max_retries: int = 3,
base_delay: float = 2.0,
**options: Any,
) -> bool:
"""
Add a new watcher to circus
Args:
name: Name of the watcher
cmd: Command to run
env: Environment variables
max_retries: Maximum number of retry attempts
base_delay: Base delay for exponential backoff
**options: Additional watcher options
Returns:
True if successful, False otherwise
"""
watcher_options: dict[str, Any] = {
"copy_env": True,
"stop_children": True,
"graceful_timeout": 86400,
"respawn": False,
}
if env:
watcher_options["env"] = env
watcher_options.update(options)
for attempt in range(max_retries):
try:
if attempt > 0:
delay = base_delay * (2**attempt)
logger.info(
f"Retrying add_watcher for {name} (attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
response = self.client.send_message(
"add",
name=name,
cmd=cmd,
args=[],
options=watcher_options,
start=True,
)
if response.get("status") == "ok":
logger.info(
f"Successfully added watcher {name} on attempt {attempt + 1}"
)
return True
logger.error(
f"Failed to add watcher {name}: {response.get('reason', 'unknown error')}"
)
return False
except Exception as e:
if "arbiter is already running" in str(e):
if attempt == max_retries - 1:
logger.error(
f"Failed to add watcher {name} after {max_retries} attempts: arbiter busy"
)
return False
logger.warning(
f"Arbiter busy with manage_watchers command, will retry adding watcher {name}"
)
continue
if attempt == max_retries - 1:
logger.error(
f"Failed to add watcher {name} after {max_retries} attempts: {e}"
)
return False
logger.warning(f"Error adding watcher {name}: {e}")
return False
async def remove_watcher(
self,
name: str,
nostop: bool = False,
waiting: bool = True,
max_retries: int = 3,
retry_delay: float = 2.0,
timeout: int = 600, # 10 minutes
) -> bool:
"""
Terminate processes and remove a watcher
Args:
name: The name of the watcher to remove
nostop: If True, remove the watcher without stopping its processes
waiting: Whether to wait for completion
max_retries: Maximum number of retry attempts
retry_delay: Delay between retries in seconds
Returns:
True if successful, False otherwise
"""
exited = await self._wait_for_process_graceful_exit(name, timeout)
if not exited:
logger.error(
f"Process for {name} did not exit gracefully. Proceeding with forced removal."
)
logger.info(f"Removing watcher {name}")
for attempt in range(max_retries):
try:
if attempt > 0:
delay = retry_delay * (2**attempt)
logger.info(
f"Retrying remove_watcher for {name} (attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
response = self.client.send_message(
"rm",
name=name,
nostop=nostop,
waiting=waiting,
)
if response.get("status") == "ok":
logger.info(
f"Successfully removed watcher {name} on attempt {attempt + 1}"
)
break
logger.error(f"Failed to remove watcher {name}: {response}")
return False
except Exception as e:
if "arbiter is already running" in str(e):
if attempt == max_retries - 1:
logger.error(
f"Failed to remove watcher {name} after {max_retries} attempts: arbiter busy"
)
return False
logger.warning(
f"Arbiter busy with manage_watchers command, will retry removing watcher {name}"
)
continue
if attempt == max_retries - 1:
logger.error(
f"Failed to remove watcher {name} after {max_retries} attempts: {e}"
)
return False
# Verify the watcher is actually gone
removed = await self._verify_watcher_removal(name)
if not removed:
logger.error(f"Watcher {name} still exists after {max_retries} attempts")
return False
return True
async def _wait_for_process_graceful_exit(
self, name: str, timeout: int = 600
) -> bool:
"""
Wait for a watcher's process to exit gracefully. This is usually called after
we've revoked the lease which triggers a graceful exit.
Args:
name: The name of the watcher
timeout: The timeout for the wait
Returns:
True if the process exited gracefully, False otherwise
"""
start_time = asyncio.get_event_loop().time()
while True:
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed > timeout:
logger.warning(
f"Timeout ({timeout}s) reached waiting for {name} to exit gracefully. "
f"Proceeding with forced removal."
)
return False
num_processes = await self._get_watcher_processes(name)
if num_processes is None:
logger.error(f"Failed to get process count for {name}")
return False
if num_processes == 0:
logger.info(f"Processes for {name} have exited gracefully")
return True
logger.info(
f"Currently {num_processes} process(es) alive, waiting for graceful exit "
f"({int(elapsed)}s/{timeout}s elapsed)"
)
await asyncio.sleep(1)
async def _verify_watcher_removal(
self, name: str, max_attempts: int = 10, delay: float = 1.0
) -> bool:
"""
Verify that a watcher has been removed. This is usually called after a forced removal.
Args:
name: The name of the watcher
max_attempts: The maximum number of attempts to verify the watcher removal
delay: The delay between attempts in seconds
Returns:
True if the watcher has been removed, False otherwise
"""
for attempt in range(max_attempts):
watchers = await self._list_watchers()
if watchers is None:
logger.error("Failed to list watchers")
return False
if name not in watchers:
logger.info(f"Verified watcher {name} has been removed")
return True
logger.info(
f"Waiting for watcher {name} to be fully removed (attempt {attempt + 1}/{max_attempts})"
)
await asyncio.sleep(delay)
logger.error(
f"Watcher {name} still exists after {max_attempts} verification attempts"
)
return False
async def _get_watcher_processes(self, name: str) -> Optional[int]:
"""
Get number of processes for a watcher.
Args:
name: The name of the watcher
Returns:
Number of processes for the watcher. Returns None if the operation fails.
"""
try:
response = self.client.send_message("numprocesses", name=name)
return int(response.get("numprocesses", 0))
except (CallError, Exception) as e:
logger.error(f"Failed to get process count for {name}: {e}")
return None
async def _list_watchers(self) -> Optional[List[str]]:
"""
List all watchers managed by circus.
Returns:
List of watcher names. Returns None if the list operation fails.
"""
try:
response = self.client.send_message("list")
return response.get("watchers", [])
except (CallError, Exception) as e:
logger.error(f"Failed to list watchers: {e}")
return None
def close(self) -> None:
"""Close the connection to the arbiter."""
if hasattr(self, "client"):
self.client.stop()
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List
import filelock
from dynamo.planner.circusd import CircusController
from dynamo.planner.planner_connector import PlannerConnector
from dynamo.runtime import DistributedRuntime
from dynamo.sdk.lib.logging import configure_server_logging
configure_server_logging()
logger = logging.getLogger(__name__)
class LocalConnector(PlannerConnector):
def __init__(self, namespace: str, runtime: DistributedRuntime):
"""
Initialize LocalConnector and connect to CircusController.
Args:
namespace: The Dynamo namespace
runtime: DistributedRuntime instance
"""
self.namespace = namespace
self.runtime = runtime
self.state_file = (
Path(os.environ.get("DYN_LOCAL_STATE_DIR", Path.home() / ".dynamo" / "state"))
/ f"{namespace}.json"
)
self.circus = CircusController.from_state_file(namespace)
self._lockfile = self.state_file.with_suffix(".lock")
self._file_lock = filelock.FileLock(self._lockfile)
self.worker_client: Any | None = None
self.prefill_client: Any | None = None
self.etcd_client: Any | None = None
async def _load_state(self) -> Dict[str, Any]:
"""Load state from state file.
Returns:
State dictionary
"""
if not self.state_file.exists():
raise FileNotFoundError(f"State file not found: {self.state_file}")
with self._file_lock:
with open(self.state_file, "r") as f:
return json.load(f)
async def _save_state(self, state: Dict[str, Any]) -> bool:
"""Save state to state file.
Args:
state: State dictionary to save
Returns:
True if successful
"""
try:
with self._file_lock:
with open(self.state_file, "w") as f:
json.dump(state, f, indent=2)
return True
except Exception as e:
logger.error(f"Failed to save state: {e}")
return False
async def _get_available_gpus(self) -> List[str]:
"""Get list of unallocated GPU IDs.
Returns:
List of available GPU IDs
"""
state = await self._load_state()
system_resources = state.get("environment", {}).get("SYSTEM_RESOURCES", {})
all_gpus = set(str(gpu) for gpu in system_resources.get("gpu_info", []))
allocated_gpus: set[str] = set()
for component_info in state.get("components", {}).values():
resources = component_info.get("resources", {})
gpu_list = resources.get("allocated_gpus", [])
allocated_gpus.update(str(gpu) for gpu in gpu_list)
logger.info(f"Allocated GPUs: {allocated_gpus}")
available = sorted(list(all_gpus - allocated_gpus))
logger.info(f"Available GPUs: {available}")
return available
async def add_component(self, component_name: str, blocking: bool = True) -> bool:
"""
Add a component. The steps are as follows:
1. Load state
2. Find max suffix to create a unique watcher name
3. Build environment and command for the watcher
4. Optionally block until the component is running
Args:
component_name: Name of the component
Returns:
True if successful
"""
state = await self._load_state()
# Find max suffix
max_suffix = 0
for watcher_name in state["components"].keys():
if watcher_name.startswith(f"{self.namespace}_{component_name}_"):
suffix = int(
watcher_name.replace(f"{self.namespace}_{component_name}_", "")
)
max_suffix = max(max_suffix, suffix)
watcher_name = f"{self.namespace}_{component_name}_{max_suffix + 1}"
if component_name not in [
c.replace(f"{self.namespace}_", "") for c in state["components"]
]:
raise ValueError(
f"Component {component_name} not found in state configuration"
)
# Get base command and config
component_info = state["components"][f"{self.namespace}_{component_name}"]
base_cmd = component_info["cmd"].split("--worker-env")[0].strip()
service_config = state["environment"].get("DYNAMO_SERVICE_CONFIG")
# Build environment
watcher_env = os.environ.copy()
if component_name in ["VllmWorker", "PrefillWorker"]:
available_gpus = await self._get_available_gpus()
if not available_gpus:
raise ValueError("No GPUs available for allocation")
gpu_id = available_gpus[0]
watcher_env["CUDA_VISIBLE_DEVICES"] = gpu_id
watcher_env["DYNAMO_SERVICE_CONFIG"] = service_config
# Build worker env list and command
worker_env_list = [watcher_env]
worker_env_arg = json.dumps(worker_env_list)
# We add a custom component name to ensure that the lease is attached to this specific watcher
full_cmd = f"{base_cmd} --worker-env '{worker_env_arg}' --custom-component-name '{watcher_name}'"
pre_add_endpoint_ids = await self._get_endpoint_ids(component_name)
logger.info(f"Pre-add endpoint IDs: {pre_add_endpoint_ids}")
logger.info(f"Adding watcher {watcher_name}")
success = await self.circus.add_watcher(
name=watcher_name, cmd=full_cmd, env=watcher_env, singleton=True
)
if success:
resources = {}
if component_name in ["VllmWorker", "PrefillWorker"]:
resources["allocated_gpus"] = [gpu_id]
state["components"][watcher_name] = {
"watcher_name": watcher_name,
"cmd": full_cmd,
"resources": resources,
}
await self._save_state(state)
logger.info(
f"Successfully created {watcher_name}. Waiting for worker to start..."
)
if blocking:
required_endpoint_ids = pre_add_endpoint_ids + 1
while True:
current_endpoint_ids = await self._get_endpoint_ids(component_name)
if current_endpoint_ids == required_endpoint_ids:
break
logger.info(
f"Waiting for {component_name} to start. Current endpoint IDs: {current_endpoint_ids}, Required endpoint IDs: {required_endpoint_ids}"
)
await asyncio.sleep(5)
return success
async def remove_component(
self, component_name: str, blocking: bool = True
) -> bool:
"""
Remove a component. The initial components are not numbered, so we simply remove their resources
and lease but keep the entry in order to reuse the cmd. This allows us to re-add the component
without having to re-specify the cmd. For components that were added later, we remove their
entry entirely.
Args:
component_name: Name of the component
Returns:
True if successful
"""
logger.info(f"Attempting to remove component {component_name}")
state = await self._load_state()
matching_components = {}
base_name = f"{self.namespace}_{component_name}"
base_name_with_underscore = f"{base_name}_"
for watcher_name in state["components"].keys():
if watcher_name == base_name:
matching_components[0] = watcher_name
elif watcher_name.startswith(base_name_with_underscore):
suffix = int(watcher_name.replace(base_name_with_underscore, ""))
matching_components[suffix] = watcher_name
if not matching_components:
logger.error(f"No matching components found for {component_name}")
return False
highest_suffix = max(matching_components.keys())
target_watcher = matching_components[highest_suffix]
logger.info(f"Removing watcher {target_watcher}")
pre_remove_endpoint_ids = await self._get_endpoint_ids(component_name)
if component_name == "VllmWorker" or component_name == "PrefillWorker":
lease_id = state["components"][target_watcher]["lease"]
await self._revoke_lease(lease_id)
# Poll endpoint to ensure that worker has shut down gracefully and then remove the watcher
if blocking:
required_endpoint_ids = pre_remove_endpoint_ids - 1
while True:
current_endpoint_ids = await self._get_endpoint_ids(component_name)
if current_endpoint_ids == required_endpoint_ids:
break
logger.info(
f"Waiting for {component_name} to shutdown. Current endpoint IDs: {current_endpoint_ids}, Required endpoint IDs: {required_endpoint_ids}"
)
await asyncio.sleep(5)
success = await self.circus.remove_watcher(name=target_watcher)
logger.info(
f"Circus remove_watcher for {target_watcher} {'succeeded' if success else 'failed'}"
)
if success:
if highest_suffix > 0: # Numbered watcher - remove entire entry
if target_watcher in state["components"]:
del state["components"][target_watcher]
else: # Base watcher - just clear resources and lease
if target_watcher in state["components"]:
state["components"][target_watcher]["resources"] = {}
state["components"][target_watcher]["lease"] = None
await self._save_state(state)
return success
async def _get_endpoint_ids(self, component_name: str) -> int:
"""
Get the endpoint IDs for a component.
Args:
component_name: Name of the component
Returns:
Number of endpoint IDs for a component
"""
if component_name == "VllmWorker":
if self.worker_client is None:
self.worker_client = (
await self.runtime.namespace(self.namespace)
.component(component_name)
.endpoint("generate")
.client()
)
worker_ids = self.worker_client.endpoint_ids()
return len(worker_ids)
elif component_name == "PrefillWorker":
if self.prefill_client is None:
self.prefill_client = (
await self.runtime.namespace(self.namespace)
.component(component_name)
.endpoint("mock")
.client()
)
prefill_ids = self.prefill_client.endpoint_ids()
return len(prefill_ids)
else:
raise ValueError(f"Component {component_name} not supported")
async def _revoke_lease(self, lease_id: int) -> bool:
"""
Wrapper function around the etcd client to revoke a lease
Args:
lease_id: Lease ID to revoke
Returns:
True if successful
"""
if self.etcd_client is None:
self.etcd_client = self.runtime.etcd_client() # type: ignore
try:
await self.etcd_client.revoke_lease(lease_id)
logger.info(f"Revoked lease {lease_id}")
return True
except Exception as e:
logger.error(f"Failed to revoke lease {lease_id}: {e}")
return False
def __del__(self):
"""Cleanup circus controller connection on deletion."""
if hasattr(self, "circus"):
self.circus.close()
from abc import ABC, abstractmethod
# TODO: add ability to scale component to X replicas
class PlannerConnector(ABC):
@abstractmethod
async def add_component(self, component_name):
"""Add a component to the planner"""
pass
@abstractmethod
async def remove_component(self, component_name):
"""Remove a component from the planner"""
pass
import asyncio
import sys
from typing import Literal
import pytest
from dynamo.planner import LocalConnector
from dynamo.runtime import DistributedRuntime, dynamo_worker
pytestmark = pytest.mark.skip("This is not a test file")
ComponentType = Literal["VllmWorker", "PrefillWorker"]
VALID_COMPONENTS = ["VllmWorker", "PrefillWorker"]
async def test_state_management(connector: LocalConnector) -> bool:
"""Test state file operations."""
print("\n=== Testing State Management ===")
try:
# Test load state
state = await connector._load_state()
print("✓ Load state successful")
# Test save state (with a copy)
success = await connector._save_state(state)
print(
f"{'✓' if success else '✗'} Save state {'successful' if success else 'failed'}"
)
return True
except Exception as e:
print(f"✗ State management test failed: {e}")
return False
async def test_add_component(
connector: LocalConnector, component: ComponentType
) -> bool:
"""Test adding a component."""
print(f"\n=== Testing Add Component: {component} ===")
try:
success = await connector.add_component(component)
print(
f"{'✓' if success else '✗'} Add {component} {'successful' if success else 'failed'}"
)
return success
except Exception as e:
print(f"✗ Add {component} test failed: {e}")
return False
async def test_remove_component(
connector: LocalConnector, component: ComponentType
) -> bool:
"""Test removing a component."""
print(f"\n=== Testing Remove Component: {component} ===")
try:
state = await connector._load_state()
base_name = f"{connector.namespace}_{component}_"
# Find all components with numbered suffixes
matching_components = []
for watcher_name in state["components"].keys():
if watcher_name.startswith(base_name):
try:
suffix = int(watcher_name.replace(base_name, ""))
matching_components.append((suffix, watcher_name))
except ValueError:
continue
if not matching_components:
base_component = f"{connector.namespace}_{component}"
if base_component in state["components"]:
success = await connector.remove_component(component)
print(
f"{'✓' if success else '✗'} Remove {component} {'successful' if success else 'failed'}"
)
return success
else:
print(f"✗ No {component} components found to remove")
return False
# Remember which watcher we're removing
highest_suffix = max(suffix for suffix, _ in matching_components)
target_component = f"{base_name}{highest_suffix}"
success = await connector.remove_component(component)
# New verification logic that handles both numbered and base watchers
if success:
new_state = await connector._load_state()
# For numbered watchers (with suffix > 0)
if highest_suffix > 0:
# Success if the component is completely removed
if target_component not in new_state["components"]:
print(f"✓ Successfully removed {target_component}")
return True
else:
print(f"✗ Failed to remove {target_component} from state")
return False
# For base watchers (no suffix)
else:
base_component = f"{connector.namespace}_{component}"
if base_component in new_state["components"]:
resources = new_state["components"][base_component].get(
"resources", {}
)
if not resources.get("allocated_gpus"):
print(f"✓ Successfully cleared resources for {base_component}")
return True
else:
print(f"✗ Failed to clear resources for {base_component}")
return False
# If we get here, neither condition was met
print(f"✗ Unexpected state after removing {component}")
return False
print(f"✗ Failed to remove {component}")
return False
except Exception as e:
print(f"✗ Remove {component} test failed: {e}")
return False
@dynamo_worker()
async def main(runtime: DistributedRuntime):
connector = LocalConnector("dynamo", runtime)
await connector.add_component("PrefillWorker")
await connector.add_component("VllmWorker")
await connector.remove_component("VllmWorker")
await connector.remove_component("PrefillWorker")
if __name__ == "__main__":
sys.exit(asyncio.run(main()))
#!/bin/bash
# This is a simple load test script for the planner component. To validate:
# 1. Run 1P1D (the default disagg router setup)
# 2. Start the planner with: python components/planner.py --namespace dynamo --decode-kv-scale-up-threshold 0.2 --decode-kv-scale-down-threshold 0.1 --adjustment-interval 10
# 3. Run ./load_test.sh 100
# Expected behavior is a scale up to 1P2D and then back down to 1P1D
# Check if the number of executions is provided
if [ $# -ne 1 ]; then
echo "Usage: $0 <number_of_executions>"
exit 1
fi
# Store the number of executions
executions=$1
echo "Starting $executions non-blocking executions..."
# Launch the command x times in the background
for (( i=1; i<=$executions; i++ )); do
# isl around 2000
curl localhost:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"messages": [
{
"role": "user",
"content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. 
Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. 
You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. 
You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. 
You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden. In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden."
}
],
"stream": true,
"max_tokens": 500
}' > /dev/null 2>&1 &
done
echo "All $executions executions have been launched!"
\ No newline at end of file
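Note that the script above exits as soon as the background curls are launched, which is what the planner test wants: a burst of concurrent load. If you instead need the shell to block until every request has finished (e.g. to time the whole burst), a `wait` after the loop does it. A trimmed-down sketch, assuming the same endpoint and model as above:

```shell
#!/bin/bash
# Hypothetical variant of load_test.sh that blocks until all requests finish.
# Endpoint and model name are assumed from the script above.
executions=${1:-10}
for (( i=1; i<=executions; i++ )); do
    curl -s -m 30 localhost:8000/v1/chat/completions \
        -H "Content-Type: application/json" \
        -d '{"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
             "messages": [{"role": "user", "content": "ping"}],
             "stream": true, "max_tokens": 500}' > /dev/null 2>&1 &
done
wait  # block until every background curl has exited
echo "All $executions requests completed."
```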
...@@ -30,6 +30,8 @@ pydantic==2.7.1 ...@@ -30,6 +30,8 @@ pydantic==2.7.1
pyright pyright
PyYAML PyYAML
sentencepiece sentencepiece
tensorboard==2.19.0
tensorboardX==2.6.2.2
transformers transformers
tritonclient==2.53.0 tritonclient==2.53.0
types-PyYAML types-PyYAML
......
...@@ -3270,7 +3270,7 @@ index 54f7b8fb6..0559f9db2 100644 ...@@ -3270,7 +3270,7 @@ index 54f7b8fb6..0559f9db2 100644
def _abort_and_cache_schedule( def _abort_and_cache_schedule(
diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py
index cafd8150b..ab1c11329 100644 index cafd8150b..6a5e45b4e 100644
--- a/vllm/engine/multiprocessing/__init__.py --- a/vllm/engine/multiprocessing/__init__.py
+++ b/vllm/engine/multiprocessing/__init__.py +++ b/vllm/engine/multiprocessing/__init__.py
@@ -1,4 +1,17 @@ @@ -1,4 +1,17 @@
...@@ -3335,16 +3335,42 @@ index cafd8150b..ab1c11329 100644 ...@@ -3335,16 +3335,42 @@ index cafd8150b..ab1c11329 100644
@dataclass @dataclass
class RPCError: class RPCError:
@@ -116,7 +135,7 @@ class RPCStartupRequest(Enum): @@ -113,9 +132,21 @@ class RPCStartupRequest(Enum):
IS_SERVER_READY = 1
+@dataclass
+class RPCHasUnfinishedRequestsRequest:
+ request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
+
+
@dataclass @dataclass
class RPCStartupResponse: class RPCStartupResponse:
tracing_enabled: bool tracing_enabled: bool
-
+ nixl_metadata: Optional[bytes] = None + nixl_metadata: Optional[bytes] = None
+
+
+@dataclass
+class RPCHasUnfinishedRequestsResponse:
+ has_unfinished_requests: bool
+ request_id: str
class RPCUProfileRequest(Enum): class RPCUProfileRequest(Enum):
START_PROFILE = 1 @@ -165,10 +196,10 @@ class RPCAdapterLoadedResponse:
@@ -181,3 +200,13 @@ def ENGINE_DEAD_ERROR( RPC_REQUEST_T = Union[RPCProcessRequest, RPCAbortRequest, RPCStartupRequest,
RPCUProfileRequest, RPCLoadAdapterRequest,
RPCResetPrefixCacheRequest, RPCSleepRequest,
- RPCWakeUpRequest, RPCIsSleepingRequest]
+ RPCWakeUpRequest, RPCIsSleepingRequest, RPCHasUnfinishedRequestsRequest]
REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCAdapterLoadedResponse,
- RPCIsSleepingResponse, RPCError]
+ RPCIsSleepingResponse, RPCError, RPCHasUnfinishedRequestsResponse]
def ENGINE_DEAD_ERROR(
@@ -181,3 +212,13 @@ def ENGINE_DEAD_ERROR(
return MQEngineDeadError( return MQEngineDeadError(
"Engine loop is not running. Inspect the stacktrace to " "Engine loop is not running. Inspect the stacktrace to "
f"find the original error: {repr(error)}.") f"find the original error: {repr(error)}.")
...@@ -3359,7 +3385,7 @@ index cafd8150b..ab1c11329 100644 ...@@ -3359,7 +3385,7 @@ index cafd8150b..ab1c11329 100644
+ gpu_cache_usage_perc: float + gpu_cache_usage_perc: float
+ gpu_prefix_cache_hit_rate: float + gpu_prefix_cache_hit_rate: float
diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py
index f058b1329..fa99e635c 100644 index f058b1329..2fdb5b8bf 100644
--- a/vllm/engine/multiprocessing/client.py --- a/vllm/engine/multiprocessing/client.py
+++ b/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py
@@ -1,4 +1,17 @@ @@ -1,4 +1,17 @@
...@@ -3408,16 +3434,19 @@ index f058b1329..fa99e635c 100644 ...@@ -3408,16 +3434,19 @@ index f058b1329..fa99e635c 100644
RPCAdapterLoadedResponse, RPCError, RPCAdapterLoadedResponse, RPCError,
RPCIsSleepingRequest, RPCIsSleepingRequest,
RPCIsSleepingResponse, RPCIsSleepingResponse,
@@ -34,7 +51,7 @@ from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT, @@ -33,8 +50,9 @@ from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT,
RPCProcessRequest,
RPCResetPrefixCacheRequest, RPCResetPrefixCacheRequest,
RPCSleepRequest, RPCStartupRequest, RPCSleepRequest, RPCStartupRequest,
RPCStartupResponse, - RPCStartupResponse,
- RPCUProfileRequest, RPCWakeUpRequest) - RPCUProfileRequest, RPCWakeUpRequest)
+ RPCStartupResponse, RPCHasUnfinishedRequestsRequest,
+ RPCHasUnfinishedRequestsResponse,
+ RPCUProfileRequest, KvMetrics, RPCWakeUpRequest) + RPCUProfileRequest, KvMetrics, RPCWakeUpRequest)
from vllm.engine.protocol import EngineClient from vllm.engine.protocol import EngineClient
# yapf: enable # yapf: enable
from vllm.envs import VLLM_RPC_TIMEOUT from vllm.envs import VLLM_RPC_TIMEOUT
@@ -48,6 +65,8 @@ from vllm.prompt_adapter.request import PromptAdapterRequest @@ -48,6 +66,8 @@ from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import SamplingParams from vllm.sampling_params import SamplingParams
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
from vllm.utils import Device, deprecate_kwargs from vllm.utils import Device, deprecate_kwargs
...@@ -3426,7 +3455,7 @@ index f058b1329..fa99e635c 100644 ...@@ -3426,7 +3455,7 @@ index f058b1329..fa99e635c 100644
logger = init_logger(__name__) logger = init_logger(__name__)
@@ -93,6 +112,7 @@ class MQLLMEngineClient(EngineClient): @@ -93,6 +113,7 @@ class MQLLMEngineClient(EngineClient):
self._errored_with: Optional[BaseException] = None self._errored_with: Optional[BaseException] = None
# Get the configs. # Get the configs.
...@@ -3434,7 +3463,7 @@ index f058b1329..fa99e635c 100644 ...@@ -3434,7 +3463,7 @@ index f058b1329..fa99e635c 100644
self.model_config = engine_config.model_config self.model_config = engine_config.model_config
self.decoding_config = engine_config.decoding_config self.decoding_config = engine_config.decoding_config
@@ -117,6 +137,10 @@ class MQLLMEngineClient(EngineClient): @@ -117,6 +138,10 @@ class MQLLMEngineClient(EngineClient):
self.heartbeat_socket: Socket = self.context.socket(zmq.constants.PULL) self.heartbeat_socket: Socket = self.context.socket(zmq.constants.PULL)
self.heartbeat_socket.connect(f"{ipc_path}{IPC_HEALTH_EXT}") self.heartbeat_socket.connect(f"{ipc_path}{IPC_HEALTH_EXT}")
...@@ -3445,7 +3474,7 @@ index f058b1329..fa99e635c 100644 ...@@ -3445,7 +3474,7 @@ index f058b1329..fa99e635c 100644
# IPC path for the data socket. # IPC path for the data socket.
self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}" self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}"
@@ -131,8 +155,27 @@ class MQLLMEngineClient(EngineClient): @@ -131,8 +156,27 @@ class MQLLMEngineClient(EngineClient):
# Loop to check health of the LLMEngine periodically. # Loop to check health of the LLMEngine periodically.
# Started after the MQLLMEngine is ready. # Started after the MQLLMEngine is ready.
self.health_loop: Optional[asyncio.Task] = None self.health_loop: Optional[asyncio.Task] = None
...@@ -3473,7 +3502,7 @@ index f058b1329..fa99e635c 100644 ...@@ -3473,7 +3502,7 @@ index f058b1329..fa99e635c 100644
@staticmethod @staticmethod
def is_unsupported_config(vllm_config: VllmConfig): def is_unsupported_config(vllm_config: VllmConfig):
# Pipeline parallel not yet supported # Pipeline parallel not yet supported
@@ -182,6 +225,61 @@ class MQLLMEngineClient(EngineClient): @@ -182,6 +226,61 @@ class MQLLMEngineClient(EngineClient):
except Exception as e: except Exception as e:
self._set_errored(e) self._set_errored(e)
...@@ -3535,7 +3564,25 @@ index f058b1329..fa99e635c 100644 ...@@ -3535,7 +3564,25 @@ index f058b1329..fa99e635c 100644
async def run_output_handler_loop(self): async def run_output_handler_loop(self):
"""Get RequestOutputs from Engine and stream to Request Queues""" """Get RequestOutputs from Engine and stream to Request Queues"""
@@ -283,12 +381,26 @@ class MQLLMEngineClient(EngineClient): @@ -250,7 +349,7 @@ class MQLLMEngineClient(EngineClient):
# Put each output into the appropriate queue.
elif isinstance(
request_outputs,
- (RPCAdapterLoadedResponse, RPCIsSleepingResponse)):
+ (RPCAdapterLoadedResponse, RPCIsSleepingResponse, RPCHasUnfinishedRequestsResponse)):
self._add_output(request_outputs)
else:
for request_output in request_outputs:
@@ -261,7 +360,7 @@ class MQLLMEngineClient(EngineClient):
def _add_output(self, request_output: Union[RequestOutput,
RPCAdapterLoadedResponse,
- RPCIsSleepingResponse]):
+ RPCIsSleepingResponse, RPCHasUnfinishedRequestsResponse]):
queue = self.output_queues.get(request_output.request_id)
if queue is not None:
queue.put_nowait(request_output)
@@ -283,12 +382,25 @@ class MQLLMEngineClient(EngineClient):
# Wait until server is ready. # Wait until server is ready.
response = await self._wait_for_server_rpc(socket) response = await self._wait_for_server_rpc(socket)
...@@ -3558,7 +3605,6 @@ index f058b1329..fa99e635c 100644 ...@@ -3558,7 +3605,6 @@ index f058b1329..fa99e635c 100644
+ if self.metrics_loop is None: + if self.metrics_loop is None:
+ self.metrics_loop = asyncio.create_task( + self.metrics_loop = asyncio.create_task(
+ self.run_metrics_loop(timeout=VLLM_RPC_TIMEOUT)) + self.run_metrics_loop(timeout=VLLM_RPC_TIMEOUT))
+
def close(self): def close(self):
"""Destroy the ZeroMQ Context.""" """Destroy the ZeroMQ Context."""
...@@ -3634,15 +3680,31 @@ index f058b1329..fa99e635c 100644 ...@@ -3634,15 +3680,31 @@ index f058b1329..fa99e635c 100644
await self.input_socket.send_multipart(parts, copy=False) await self.input_socket.send_multipart(parts, copy=False)
# 4) Stream the RequestOutputs from the output queue. Note # 4) Stream the RequestOutputs from the output queue. Note
@@ -740,3 +866,6 @@ class MQLLMEngineClient(EngineClient): @@ -740,3 +866,22 @@ class MQLLMEngineClient(EngineClient):
# Raise on error, otherwise happily return None # Raise on error, otherwise happily return None
if isinstance(request_output, BaseException): if isinstance(request_output, BaseException):
raise request_output raise request_output
+ +
+ def set_metrics_publisher(self, metrics_publisher): + def set_metrics_publisher(self, metrics_publisher):
+ self.metrics_publisher = metrics_publisher + self.metrics_publisher = metrics_publisher
+
+ async def has_unfinished_requests(self) -> bool:
+ logger.info("Checking if there are unfinished requests")
+ if "has_unfinished_requests" not in self.output_queues:
+ logger.info("Creating has unfinished requests queue")
+
+ request = RPCHasUnfinishedRequestsRequest()
+ queue: asyncio.Queue[Union[BaseException, RPCHasUnfinishedRequestsResponse]] = asyncio.Queue()
+ self.output_queues[request.request_id] = queue
+ request_bytes = pickle.dumps(request)
+ await self.input_socket.send_multipart((request_bytes, ), copy=False)
+ response = await queue.get()
+ self.output_queues.pop(request.request_id)
+ if isinstance(response, BaseException):
+ raise response
+ return response.has_unfinished_requests
diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py
index 6ed5ae0a9..af4bd942b 100644 index 6ed5ae0a9..3a320c42c 100644
--- a/vllm/engine/multiprocessing/engine.py --- a/vllm/engine/multiprocessing/engine.py
+++ b/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py
@@ -1,13 +1,27 @@ @@ -1,13 +1,27 @@
...@@ -3688,11 +3750,13 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3688,11 +3750,13 @@ index 6ed5ae0a9..af4bd942b 100644
RPCAdapterLoadedResponse, RPCError, RPCAdapterLoadedResponse, RPCError,
RPCIsSleepingRequest, RPCIsSleepingRequest,
RPCIsSleepingResponse, RPCIsSleepingResponse,
@@ -25,13 +41,19 @@ from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT, @@ -25,13 +41,21 @@ from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT,
RPCResetPrefixCacheRequest, RPCResetPrefixCacheRequest,
RPCSleepRequest, RPCStartupRequest, RPCSleepRequest, RPCStartupRequest,
RPCStartupResponse, RPCStartupResponse,
- RPCUProfileRequest, RPCWakeUpRequest) - RPCUProfileRequest, RPCWakeUpRequest)
+ RPCHasUnfinishedRequestsRequest,
+ RPCHasUnfinishedRequestsResponse,
+ RPCUProfileRequest, RPCWakeUpRequest, KvMetrics, + RPCUProfileRequest, RPCWakeUpRequest, KvMetrics,
+ IPC_REMOTE_NIXL_METADATA_EXT) + IPC_REMOTE_NIXL_METADATA_EXT)
# yapf: enable # yapf: enable
...@@ -3709,7 +3773,7 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3709,7 +3773,7 @@ index 6ed5ae0a9..af4bd942b 100644
from vllm.worker.model_runner_base import InputProcessingError from vllm.worker.model_runner_base import InputProcessingError
logger = init_logger(__name__) logger = init_logger(__name__)
@@ -39,6 +61,77 @@ logger = init_logger(__name__) @@ -39,6 +63,77 @@ logger = init_logger(__name__)
POLLING_TIMEOUT_MS = 10000 POLLING_TIMEOUT_MS = 10000
HEALTHY_RESPONSE = (pickle.dumps(VLLM_RPC_SUCCESS_STR), ) HEALTHY_RESPONSE = (pickle.dumps(VLLM_RPC_SUCCESS_STR), )
...@@ -3787,7 +3851,7 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3787,7 +3851,7 @@ index 6ed5ae0a9..af4bd942b 100644
class MQLLMEngine: class MQLLMEngine:
"""A multiprocessing wrapper for :class:`LLMEngine`. """A multiprocessing wrapper for :class:`LLMEngine`.
@@ -101,12 +194,37 @@ class MQLLMEngine: @@ -101,12 +196,37 @@ class MQLLMEngine:
self.heartbeat_socket = self.ctx.socket(zmq.constants.PUSH) self.heartbeat_socket = self.ctx.socket(zmq.constants.PUSH)
self.heartbeat_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}") self.heartbeat_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}")
...@@ -3825,7 +3889,7 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3825,7 +3889,7 @@ index 6ed5ae0a9..af4bd942b 100644
@property @property
def dead_error(self) -> BaseException: def dead_error(self) -> BaseException:
if self._errored_with is not None: if self._errored_with is not None:
@@ -192,8 +310,17 @@ class MQLLMEngine: @@ -192,8 +312,17 @@ class MQLLMEngine:
# Handle the query from the Client. # Handle the query from the Client.
if request == RPCStartupRequest.IS_SERVER_READY: if request == RPCStartupRequest.IS_SERVER_READY:
tracing_enabled = self.engine.is_tracing_enabled() tracing_enabled = self.engine.is_tracing_enabled()
...@@ -3845,7 +3909,7 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3845,7 +3909,7 @@ index 6ed5ae0a9..af4bd942b 100644
except Exception as e: except Exception as e:
response = e response = e
@@ -206,6 +333,7 @@ class MQLLMEngine: @@ -206,6 +335,7 @@ class MQLLMEngine:
while True: while True:
if not self.engine.has_unfinished_requests(): if not self.engine.has_unfinished_requests():
...@@ -3853,7 +3917,7 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3853,7 +3917,7 @@ index 6ed5ae0a9..af4bd942b 100644
# Poll until there is work to do. # Poll until there is work to do.
while self.input_socket.poll(timeout=POLLING_TIMEOUT_MS) == 0: while self.input_socket.poll(timeout=POLLING_TIMEOUT_MS) == 0:
# When there's no work, check on engine health and send # When there's no work, check on engine health and send
@@ -249,6 +377,13 @@ class MQLLMEngine: @@ -249,6 +379,13 @@ class MQLLMEngine:
def handle_new_input(self): def handle_new_input(self):
"""Handle new input from the socket""" """Handle new input from the socket"""
try: try:
...@@ -3867,7 +3931,16 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3867,7 +3931,16 @@ index 6ed5ae0a9..af4bd942b 100644
while self.input_socket.poll(timeout=0) != 0: while self.input_socket.poll(timeout=0) != 0:
frames = self.input_socket.recv_multipart(copy=False) frames = self.input_socket.recv_multipart(copy=False)
request = pickle.loads(frames[0].buffer) request = pickle.loads(frames[0].buffer)
@@ -297,6 +432,11 @@ class MQLLMEngine: @@ -277,6 +414,8 @@ class MQLLMEngine:
self.wake_up(request.tags)
elif isinstance(request, RPCIsSleepingRequest):
self._handle_is_sleeping_request(request)
+ elif isinstance(request, RPCHasUnfinishedRequestsRequest):
+ self._handle_has_unfinished_requests_request(request)
else:
raise ValueError("Unknown RPCRequest Type: "
f"{type(request)}")
@@ -297,6 +436,11 @@ class MQLLMEngine:
self._send_outputs(rpc_err) self._send_outputs(rpc_err)
try: try:
...@@ -3879,7 +3952,7 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3879,7 +3952,7 @@ index 6ed5ae0a9..af4bd942b 100644
self.engine.add_request( self.engine.add_request(
request_id=request_id, request_id=request_id,
prompt=request.prompt, prompt=request.prompt,
@@ -304,7 +444,9 @@ class MQLLMEngine: @@ -304,7 +448,9 @@ class MQLLMEngine:
lora_request=request.lora_request, lora_request=request.lora_request,
trace_headers=request.trace_headers, trace_headers=request.trace_headers,
prompt_adapter_request=request.prompt_adapter_request, prompt_adapter_request=request.prompt_adapter_request,
...@@ -3890,6 +3963,17 @@ index 6ed5ae0a9..af4bd942b 100644 ...@@ -3890,6 +3963,17 @@ index 6ed5ae0a9..af4bd942b 100644
if self.log_requests: if self.log_requests:
logger.info("Added request %s.", request.request_id) logger.info("Added request %s.", request.request_id)
@@ -348,6 +494,10 @@ class MQLLMEngine:
self._send_outputs(
RPCIsSleepingResponse(request_id=request.request_id,
is_sleeping=is_sleeping))
+
+ def _handle_has_unfinished_requests_request(self, request: RPCHasUnfinishedRequestsRequest):
+ response = RPCHasUnfinishedRequestsResponse(request_id=request.request_id, has_unfinished_requests=self.engine.has_unfinished_requests())
+ self._send_outputs(response)
def _health_check(self):
# Send unhealthy if engine has already errored
diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index dd0b67df4..f436b0752 100644 index dd0b67df4..f436b0752 100644
--- a/vllm/entrypoints/openai/serving_chat.py --- a/vllm/entrypoints/openai/serving_chat.py
......
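The `RPCHasUnfinishedRequestsRequest`/`RPCHasUnfinishedRequestsResponse` round-trip added by the patch above follows a common correlation pattern: the client registers a per-request `asyncio.Queue` keyed by a fresh uuid before sending, and the output handler loop routes each response back by its `request_id`. A self-contained sketch of just that pattern (class and function names here are illustrative, not the vLLM ones):

```python
# Toy model of the request/response correlation used by
# MQLLMEngineClient.has_unfinished_requests in the patch above.
import asyncio
import uuid
from dataclasses import dataclass, field


@dataclass
class HasUnfinishedRequest:
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class HasUnfinishedResponse:
    request_id: str
    has_unfinished_requests: bool


class ToyClient:
    def __init__(self) -> None:
        self.output_queues: dict[str, asyncio.Queue] = {}

    async def has_unfinished_requests(self, send) -> bool:
        req = HasUnfinishedRequest()
        queue: asyncio.Queue = asyncio.Queue()
        self.output_queues[req.request_id] = queue  # register before sending
        await send(req)                             # stand-in for the ZMQ input socket
        resp = await queue.get()                    # wait for the routed response
        self.output_queues.pop(req.request_id)
        if isinstance(resp, BaseException):
            raise resp
        return resp.has_unfinished_requests

    def route(self, resp: HasUnfinishedResponse) -> None:
        # what the output handler loop does: dispatch by request_id
        self.output_queues[resp.request_id].put_nowait(resp)


async def demo() -> bool:
    client = ToyClient()

    async def fake_engine(req: HasUnfinishedRequest) -> None:
        # the engine side answers immediately in this toy setup
        client.route(HasUnfinishedResponse(req.request_id, False))

    return await client.has_unfinished_requests(fake_engine)


print(asyncio.run(demo()))  # → False
```

Registering the queue before sending avoids a race in which the response arrives before there is anywhere to route it.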
...@@ -25,7 +25,12 @@ from _bentoml_sdk import Service ...@@ -25,7 +25,12 @@ from _bentoml_sdk import Service
from simple_di import inject from simple_di import inject
# Import our own resource module # Import our own resource module
from dynamo.sdk.lib.resource import NVIDIA_GPU, GPUManager, system_resources from dynamo.sdk.lib.resource import (
NVIDIA_GPU,
GPUManager,
ResourceError,
system_resources,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -33,6 +38,8 @@ logger = logging.getLogger(__name__) ...@@ -33,6 +38,8 @@ logger = logging.getLogger(__name__)
DYN_DISABLE_AUTO_GPU_ALLOCATION = "DYN_DISABLE_AUTO_GPU_ALLOCATION" DYN_DISABLE_AUTO_GPU_ALLOCATION = "DYN_DISABLE_AUTO_GPU_ALLOCATION"
DYN_DEPLOYMENT_ENV = "DYN_DEPLOYMENT_ENV" DYN_DEPLOYMENT_ENV = "DYN_DEPLOYMENT_ENV"
logger = logging.getLogger(__name__)
def format_memory_gb(memory_bytes: float) -> str: def format_memory_gb(memory_bytes: float) -> str:
"""Convert memory from bytes to formatted GB string. """Convert memory from bytes to formatted GB string.
...@@ -56,8 +63,12 @@ class ResourceAllocator: ...@@ -56,8 +63,12 @@ class ResourceAllocator:
(1.0, 1.0) # each item is (remaining, unit) (1.0, 1.0) # each item is (remaining, unit)
for _ in range(self.remaining_gpus) for _ in range(self.remaining_gpus)
] ]
self._service_gpu_allocations: dict[str, list[int]] = {}
logger.debug(
f"ResourceAllocator initialized with {self.remaining_gpus} GPUs available"
)
def assign_gpus(self, count: float) -> list[int]: def assign_gpus(self, count: float, service_name: str = "") -> list[int]:
""" """
Assign GPUs for use. Assign GPUs for use.
...@@ -67,8 +78,72 @@ class ResourceAllocator: ...@@ -67,8 +78,72 @@ class ResourceAllocator:
Returns: Returns:
List of GPU indices that were assigned List of GPU indices that were assigned
""" """
# Use our GPU manager's assign_gpus method if count > self.remaining_gpus:
return self.gpu_manager.assign_gpus(count) logger.warning(
f"Requested {count} GPUs, but only {self.remaining_gpus} are remaining. "
f"Serving may fail due to inadequate GPUs. Set {DYN_DISABLE_AUTO_GPU_ALLOCATION}=1 "
"to disable automatic allocation and allocate GPUs manually."
)
self.remaining_gpus = int(max(0, self.remaining_gpus - count))
assigned = [] # Will store assigned GPU indices
if count < 1: # a fractional GPU
try:
# try to find the GPU used with the same fragment
gpu = next(
i
for i, v in enumerate(self._available_gpus)
if v[0] > 0 and v[1] == count
)
except StopIteration:
try:
gpu = next(
i for i, v in enumerate(self._available_gpus) if v[0] == 1.0
)
except StopIteration:
gpu = len(self._available_gpus)
self._available_gpus.append((1.0, count))
remaining, _ = self._available_gpus[gpu]
if (remaining := remaining - count) < count:
# can't assign to the next one, mark it as zero.
self._available_gpus[gpu] = (0.0, count)
else:
self._available_gpus[gpu] = (remaining, count)
assigned = [gpu]
else: # allocate n GPUs, n is a positive integer
if int(count) != count:
raise ResourceError("Fractional GPU counts larger than 1 are not supported")
count = int(count)
unassigned = [
gpu
for gpu, value in enumerate(self._available_gpus)
if value[0] > 0 and value[1] == 1.0
]
if len(unassigned) < count:
logger.warning(f"Not enough GPUs available to assign; {count} requested")
for _ in range(count - len(unassigned)):
unassigned.append(len(self._available_gpus))
self._available_gpus.append((1.0, 1.0))
for gpu in unassigned[:count]:
self._available_gpus[gpu] = (0.0, 1.0)
assigned = unassigned[:count]
# Store the allocation if service_name is provided
if service_name and assigned:
if service_name in self._service_gpu_allocations:
self._service_gpu_allocations[service_name].extend(assigned)
logger.debug(
f"Additional GPUs {assigned} allocated to service '{service_name}', "
f"total GPUs: {self._service_gpu_allocations[service_name]}"
)
else:
self._service_gpu_allocations[service_name] = assigned
logger.debug(f"GPUs {assigned} allocated to service '{service_name}'")
elif assigned:
logger.debug(f"GPUs {assigned} allocated without service name tracking")
return assigned
def get_gpu_stats(self) -> list[dict[str, Any]]: def get_gpu_stats(self) -> list[dict[str, Any]]:
"""Get detailed statistics for all GPUs.""" """Get detailed statistics for all GPUs."""
...@@ -127,7 +202,7 @@ class ResourceAllocator: ...@@ -127,7 +202,7 @@ class ResourceAllocator:
            logger.info("K8s deployment detected")
            # K8s replicas: assumes DYNAMO_DEPLOYMENT_ENV is set;
            # each pod in the replicaset will have a separate GPU with the same CUDA_VISIBLE_DEVICES
            assigned = self.assign_gpus(num_gpus, service.name)
            logger.info(f"Assigned GPUs for K8s: {assigned}")
            # Generate environment variables for each worker
...@@ -135,11 +210,15 @@ class ResourceAllocator:
                env_vars = {"CUDA_VISIBLE_DEVICES": ",".join(map(str, assigned))}
                resource_envs.append(env_vars)
        else:
            logger.info(
                f"Local deployment detected. Allocating GPUs for {num_workers} workers of '{service.name}'"
            )
            # Local deployment where we split all available GPUs across workers
            for worker_id in range(num_workers):
                assigned = self.assign_gpus(num_gpus, service.name)
                logger.debug(
                    f"Worker {worker_id} of '{service.name}' assigned GPUs: {assigned}"
                )
                # Generate environment variables for this worker
                env_vars = {"CUDA_VISIBLE_DEVICES": ",".join(map(str, assigned))}
...
...@@ -81,7 +81,7 @@ def serve(
        False,
        help="Print the final service configuration and exit without starting the server",
    ),
    enable_local_planner: bool = typer.Option(
        False,
        help="Save a snapshot of your service state to a file that allows planner to edit your deployment configuration",
    ),
...@@ -162,5 +162,5 @@ def serve(
        # port=port,
        dependency_map=runner_map_dict,
        service_name=service_name,
        enable_local_planner=enable_local_planner,
    )
...@@ -35,6 +35,7 @@ from fastapi.responses import StreamingResponse
from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.sdk import dynamo_context
from dynamo.sdk.cli.utils import append_dynamo_state
from dynamo.sdk.lib.service import LinkedServices

logger = logging.getLogger(__name__)
...@@ -122,13 +123,24 @@ def setup_signal_handlers():
    default=None,
    help="If set, start the server as a bare worker with the given worker ID. Otherwise start a standalone server with a supervisor process.",
)
@click.option(
    "--custom-component-name",
    required=False,
    type=click.STRING,
    default=None,
    help="If set, use this custom component name instead of the default service name",
)
def main(
    bento_identifier: str,
    service_name: str,
    runner_map: str | None,
    worker_env: str | None,
    worker_id: int | None,
    custom_component_name: str | None,
) -> None:
    """Start a worker for the given service - either Dynamo or regular service"""
    # Hack to stop BentoML from respawning the workers after their leases are revoked
    os.environ["BENTOML_CONTAINERIZED"] = "true"
    from _bentoml_impl.loader import import_service
    from bentoml._internal.container import BentoMLContainer
...@@ -258,7 +270,19 @@ def main(
            logger.info(f"Serving {service.name} with primary lease")
        else:
            logger.info(f"Serving {service.name} with lease: {lease.id()}")

        # Map the lease to its component in the local state file
        if custom_component_name:
            watcher_name = custom_component_name
        else:
            watcher_name = f"{namespace}_{component_name}"
        append_dynamo_state(namespace, watcher_name, {"lease": lease.id()})
        logger.info(
            f"Appended lease {lease.id()}/{lease.id():x} to {watcher_name}"
        )
        result = await endpoints[0].serve_endpoint(twm[0], lease)
        if class_instance.__class__.__name__ == "PrefillWorker":
            await asyncio.wait_for(class_instance.task, timeout=None)
    except GracefulExit:
        logger.info(f"[{run_id}] Gracefully shutting down {service.name}")
...
...@@ -34,7 +34,7 @@ from simple_di import inject
from dynamo.sdk.cli.circus import CircusRunner
from .allocator import NVIDIA_GPU, ResourceAllocator
from .circus import _get_server_socket
from .utils import (
    DYN_LOCAL_STATE_DIR,
...@@ -129,7 +129,7 @@ def serve_dynamo_graph(
    working_dir: str | None = None,
    dependency_map: dict[str, str] | None = None,
    service_name: str = "",
    enable_local_planner: bool = False,
) -> CircusRunner:
    from dynamo.sdk.cli.circus import create_arbiter, create_circus_watcher
    from dynamo.sdk.lib.loader import find_and_load_service
...@@ -264,7 +264,7 @@ def serve_dynamo_graph(
    arbiter = create_arbiter(**arbiter_kwargs)
    arbiter.exit_stack.callback(shutil.rmtree, uds_path, ignore_errors=True)
    if enable_local_planner:
        arbiter.exit_stack.callback(
            shutil.rmtree,
            os.environ.get(
...@@ -272,24 +272,80 @@ def serve_dynamo_graph(
            ),
            ignore_errors=True,
        )
    logger.warning(f"arbiter: {arbiter.endpoint}")

    # save deployment state for planner
    if not namespace:
        raise ValueError("No namespace found for service")
    # Track GPU allocation for each component
    component_resources = {}
    logger.info(f"Building component resources for {len(watchers)} watchers")
    for watcher in watchers:
        component_name = watcher.name
        logger.info(f"Processing watcher: {component_name}")
        # Extract worker info including GPU allocation
        worker_gpu_info: dict[str, Any] = {}
        # Extract the service name from the watcher name
        service_name = ""
        if component_name.startswith(f"{namespace}_"):
            service_name = component_name.replace(f"{namespace}_", "", 1)
        # Get the GPU allocation from the ResourceAllocator
        if (
            not worker_gpu_info
            and hasattr(allocator, "_service_gpu_allocations")
            and service_name
        ):
            gpu_allocations = getattr(allocator, "_service_gpu_allocations", {})
            if service_name in gpu_allocations:
                logger.info(
                    f"Found GPU allocation for {service_name} in ResourceAllocator: {gpu_allocations[service_name]}"
                )
                worker_gpu_info["allocated_gpus"] = gpu_allocations[service_name]
        # Store the final worker GPU info
        component_resources[component_name] = worker_gpu_info
        logger.info(f"Final GPU info for {component_name}: {worker_gpu_info}")
    logger.info(f"Completed component resources: {component_resources}")

    # Now create the components dict with resources included
    components_dict = {
        watcher.name: {
            "watcher_name": watcher.name,
            "cmd": watcher.cmd
            + " -m "
            + " ".join(watcher.args[1:])  # WAR: circus combines "python -m" into one word
            if hasattr(watcher, "args")
            else watcher.cmd,
            "resources": component_resources.get(watcher.name, {}),
        }
        for watcher in watchers
    }
    save_dynamo_state(
        namespace,
        arbiter.endpoint,
        components=components_dict,
        environment={
            "DYNAMO_SERVICE_CONFIG": os.environ["DYNAMO_SERVICE_CONFIG"],
            "SYSTEM_RESOURCES": {
                "total_gpus": len(allocator.system_resources[NVIDIA_GPU]),
                "gpu_info": [
                    str(gpu) for gpu in allocator.system_resources[NVIDIA_GPU]
                ],
            },
        },
    )
    arbiter.start(
        cb=lambda _: logger.info(  # type: ignore
            (
...
...@@ -180,6 +180,35 @@ def save_dynamo_state(
    logger.warning(f"Saved state to {state_file}")
def append_dynamo_state(namespace: str, component_name: str, data: dict) -> None:
    """Append additional data to an existing component's state"""
    state_dir = os.environ.get(
        DYN_LOCAL_STATE_DIR, os.path.expanduser("~/.dynamo/state")
    )
    state_file = os.path.join(state_dir, f"{namespace}.json")
    if not os.path.exists(state_file):
        logger.warning(
            f"Skipping append to state file {state_file} because it doesn't exist"
        )
        return
    with open(state_file, "r") as f:
        state = json.load(f)
    if "components" not in state:
        state["components"] = {}
    if component_name not in state["components"]:
        state["components"][component_name] = {}
    state["components"][component_name].update(data)
    logger.warning(f"Appending {data} to {component_name} in {state_file}")
    with open(state_file, "w") as f:
        json.dump(state, f)
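The function above is a plain read-modify-write over a per-namespace JSON file. The following self-contained sketch demonstrates the same cycle against a temporary file (the `append_state` helper and file names here are illustrative, not part of the SDK):

```python
import json
import os
import tempfile

# Create a minimal state file like the one save_dynamo_state would produce
state_dir = tempfile.mkdtemp()
state_file = os.path.join(state_dir, "dynamo.json")
with open(state_file, "w") as f:
    json.dump(
        {"components": {"dynamo_VllmWorker": {"watcher_name": "dynamo_VllmWorker"}}}, f
    )


def append_state(path: str, component: str, data: dict) -> None:
    """Read the state file, merge data into one component, write it back."""
    with open(path) as f:
        state = json.load(f)
    state.setdefault("components", {}).setdefault(component, {}).update(data)
    with open(path, "w") as f:
        json.dump(state, f)


# Attach a lease ID to the component, as the worker does after serve_endpoint
append_state(state_file, "dynamo_VllmWorker", {"lease": 42})

with open(state_file) as f:
    print(json.load(f)["components"]["dynamo_VllmWorker"])
# {'watcher_name': 'dynamo_VllmWorker', 'lease': 42}
```

Because every worker rewrites the whole file, concurrent appends can race; the real implementation tolerates this for local deployments where writes are rare.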
def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, Any]:
    """Parse a single CLI argument into service name, key, and value."""
...
...@@ -306,91 +306,6 @@ class GPUManager:
            logger.warning(f"Error getting GPU processes for GPU {index}: {e}")
            return []
    def assign_gpus(self, count: float) -> list[int]:
        """
        Assign GPUs for use. It can handle fractional GPU requests.

        Args:
            count: Number of GPUs to assign (can be fractional)

        Returns:
            List of GPU indices that were assigned
        """
        available_gpus = self.get_available_gpus()
        if count > len(available_gpus):
            logger.warning(
                f"Requested {count} GPUs, but only {len(available_gpus)} are available. "
                "Service may fail due to inadequate GPU resources."
            )
        # Handle fractional GPU allocation
        if count < 1:
            # Try to find a GPU with the same fraction size
            try:
                # Find a GPU where we've already used the same fraction size
                gpu_idx, used_fraction = next(
                    (idx, used)
                    for idx, used, frac_size in self._gpu_fractions
                    if frac_size == count and used < 1.0
                )
                # Update the usage for this GPU
                for i, (idx, used, frac_size) in enumerate(self._gpu_fractions):
                    if idx == gpu_idx and frac_size == count:
                        new_used = min(used + count, 1.0)  # Cap at 1.0
                        self._gpu_fractions[i] = (idx, new_used, frac_size)
                        break
                return [gpu_idx]
            except StopIteration:
                # No existing fraction of this size, find a free GPU
                if available_gpus:
                    gpu_idx = available_gpus[0]
                    self._gpu_fractions.append((gpu_idx, count, count))
                    return [gpu_idx]
                # No available GPUs; fall back to GPU 0 if any GPUs exist at all
                if self.gpus:
                    logger.warning("No available GPUs, using GPU 0 by default")
                    self._gpu_fractions.append((0, count, count))
                    return [0]
                logger.error("No GPUs available for allocation")
                return []
        # Integer GPU allocation
        if int(count) != count:
            raise ResourceError(
                "Fractional GPU count greater than 1 is not supported"
            )
        count_int = int(count)
        assigned_gpus = available_gpus[:count_int]
        # Mark these GPUs as fully used
        for gpu_idx in assigned_gpus:
            # Check if this GPU is already in _gpu_fractions
            if not any(idx == gpu_idx for idx, _, _ in self._gpu_fractions):
                self._gpu_fractions.append((gpu_idx, 1.0, 1.0))
            else:
                # Update the existing entry
                for i, (idx, _, frac_size) in enumerate(self._gpu_fractions):
                    if idx == gpu_idx:
                        self._gpu_fractions[i] = (idx, 1.0, frac_size)
            # Mark this GPU as unavailable for future requests
            for gpu in self.gpus:
                if gpu.index == gpu_idx:
                    gpu.available = False
        return assigned_gpus
    def get_best_gpu_for_memory(self, required_memory: int) -> int:
        """
        Return the index of the GPU with the most available memory that meets the requirement.
...@@ -469,10 +384,10 @@ class GPUManager:
def system_resources() -> dict[str, t.Any]:
    """
    Get available GPU resources.

    Returns:
        Dictionary of resources with the key 'nvidia.com/gpu'
    """
    resources = {}
...
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Planner Benchmark Example
This guide shows an example of benchmarking `LocalPlanner` performance with synthetic data. In this example, we focus on 8x H100 SXM GPUs and the `deepseek-ai/DeepSeek-R1-Distill-Llama-8B` model with TP1 prefill and decode engines.
## Synthetic Data Generation
We first generate synthetic data with a request rate that varies from 5 to 20 requests/s using the provided `sin_synth.py` script.
```bash
python sin_synth.py \
--time-duration 600 \
--request-rate-min 5 \
--request-rate-max 20 \
--request-rate-period 150 \
--isl1 3000 \
--osl1 150 \
--isl2 3000 \
--osl2 150
```
This will generate a [mooncake-style trace](https://github.com/kvcache-ai/Mooncake) with
* duration = 600 seconds
* isl/osl = 3000/150
* request rate varying sinusoidally from 5 to 20 requests/s with a period of 150 seconds

For other models and GPU SKUs, adjust the request rate range accordingly to match the load.
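The sinusoidal load pattern the trace follows can be written down directly. This is a sketch of the rate function implied by the parameters above, not the actual `sin_synth.py` implementation:

```python
import math


def request_rate(
    t: float,
    rate_min: float = 5.0,
    rate_max: float = 20.0,
    period: float = 150.0,
) -> float:
    """Request rate (req/s) at time t seconds, oscillating sinusoidally
    between rate_min and rate_max with the given period."""
    mid = (rate_min + rate_max) / 2.0
    amp = (rate_max - rate_min) / 2.0
    return mid + amp * math.sin(2.0 * math.pi * t / period)


# Starts at the midpoint, peaks a quarter-period in, and returns to the
# midpoint every half period
print(request_rate(0.0))   # 12.5
print(request_rate(37.5))  # 20.0
print(request_rate(75.0))  # ≈ 12.5
```

A trace generator would integrate this rate over the 600-second duration to decide how many requests to emit in each time bucket.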
## Run the Benchmark
To measure the performance of dynamo with the planner, we start from a 1p1d deployment and set the planner to make adjustments every 10 seconds:
```bash
cd examples/llm
dynamo serve graphs.disagg:Frontend -f <path to disagg_1p1d.yml in this folder> --enable-local-planner
# in terminal 2
python components/planner.py \
--metric-pulling-interval 1 \
--adjustment-interval 10 \
--prefill-queue-scale-down-threshold 0.2 \
--prefill-queue-scale-up-threshold 10 \
--decode-kv-scale-down-threshold 0.3 \
--decode-kv-scale-up-threshold 0.6 \
--log-dir log/planner
# in terminal 3
genai-perf profile \
--tokenizer deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
-m deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--service-kind openai \
--endpoint-type chat \
--url http://localhost:8000 \
--streaming \
--input-file payload:sin_b512_t600_rr5.0-20.0-150.0_io3000150-3000150-0.2-0.8-10.jsonl
```
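The four threshold flags form a simple hysteresis rule: prefill queue size gates prefill workers, and average KV-cache load gates decode workers. The following is a hypothetical sketch of how such thresholds could drive scale decisions, not the actual planner logic:

```python
def plan_adjustment(
    prefill_queue_size: float,
    avg_kv_load: float,
    scale_down_q: float = 0.2,   # --prefill-queue-scale-down-threshold
    scale_up_q: float = 10.0,    # --prefill-queue-scale-up-threshold
    scale_down_kv: float = 0.3,  # --decode-kv-scale-down-threshold
    scale_up_kv: float = 0.6,    # --decode-kv-scale-up-threshold
) -> dict:
    """Return a worker-count delta per pool based on the current metrics.

    The gap between the up and down thresholds keeps the planner from
    oscillating when a metric hovers near a single cutoff.
    """
    decision = {"prefill": 0, "decode": 0}
    if prefill_queue_size > scale_up_q:
        decision["prefill"] = +1
    elif prefill_queue_size < scale_down_q:
        decision["prefill"] = -1
    if avg_kv_load > scale_up_kv:
        decision["decode"] = +1
    elif avg_kv_load < scale_down_kv:
        decision["decode"] = -1
    return decision


# Deep prefill queue but idle decode KV cache: add a prefill worker,
# drop a decode worker
print(plan_adjustment(prefill_queue_size=12.0, avg_kv_load=0.25))
# {'prefill': 1, 'decode': -1}
```

In the real deployment, the planner pulls these metrics every `--metric-pulling-interval` seconds and applies a decision every `--adjustment-interval` seconds.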
To view the performance metrics and planner decisions, launch tensorboard with
```bash
tensorboard --logdir log
```
and open `http://localhost:6006` in your browser. The following metrics are available:
* `average_kv_load`: the average KV load in decode workers
* `prefill_queue_size`: the size of the prefill queue
* `num_queued_request`: the number of requests queued in decode workers
* `num_prefill_workers`: the number of prefill workers
* `num_decode_workers`: the number of decode workers
* `num_gpu`: the total number of GPUs used
The benchmark results are printed in terminal 3, which runs the `genai-perf` command.

In this example, we use a fixed 2p2d engine as the baseline. The planner provides a `--no-operation` flag to watch and log the metrics without making any adjustments:
```bash
# in terminal 1
dynamo serve --enable-local-planner graphs.disagg:Frontend -f disagg_2p2d.yml
# in terminal 2 (optional)
python components/planner.py --no-operation --log-dir log/2p2d
# in terminal 3
genai-perf profile --tokenizer deepseek-ai/DeepSeek-R1-Distill-Llama-8B -m deepseek-ai/DeepSeek-R1-Distill-Llama-8B --service-kind openai --endpoint-type chat --url http://localhost:8000 --streaming --input-file payload:sin_b512_t600_rr5.0-20.0-150.0_io3000150-3000150-0.2-0.8-10.jsonl
```
## Results
The two figures below show the performance comparison between the planner and the baseline 2p2d deployment. The planner achieves a 1.5x speedup while using 7.4% less GPU resources.
![Planner Performance Comparison](./images/planner_perf.png)
![Planner Tensorboard](./images/planner_tensorboard.png)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.Processor.chat/completions
port: 8000
Processor:
router: kv-load
common-configs: [model, block-size]
VllmWorker:
remote-prefill: true
conditional-disagg: false
ServiceArgs:
workers: 1
resources:
gpu: 1
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
ServiceArgs:
workers: 1
resources:
gpu: 1
common-configs: [model, block-size, max-model-len, kv-transfer-config]