Commit 9f0181a8 authored by Biswa Panda, committed by GitHub

feat: add local gpu allocation (#232)


Co-authored-by: Meenakshi Sharma <163925564+nvda-mesharma@users.noreply.github.com>
parent 548578f4
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import os
import warnings
from typing import Any

from _bentoml_sdk import Service
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml._internal.resource import system_resources
from bentoml.exceptions import BentoMLConfigException
from simple_di import Provide, inject

NVIDIA_GPU = "nvidia.com/gpu"
DISABLE_GPU_ALLOCATION_ENV = "DYNAMO_DISABLE_GPU_ALLOCATION"
DYNAMO_DEPLOYMENT_ENV = "DYNAMO_DEPLOYMENT_ENV"


class ResourceAllocator:
    def __init__(self) -> None:
        self.system_resources = system_resources()
        self.remaining_gpus = len(self.system_resources[NVIDIA_GPU])
        self._available_gpus: list[tuple[float, float]] = [
            (1.0, 1.0)  # each item is (remaining, unit)
            for _ in range(self.remaining_gpus)
        ]

    def assign_gpus(self, count: float) -> list[int]:
        if count > self.remaining_gpus:
            warnings.warn(
                f"Requested {count} GPUs, but only {self.remaining_gpus} are remaining. "
                f"Serving may fail due to inadequate GPUs. Set {DISABLE_GPU_ALLOCATION_ENV}=1 "
                "to disable automatic allocation and allocate GPUs manually.",
                ResourceWarning,
                stacklevel=3,
            )
        self.remaining_gpus = int(max(0, self.remaining_gpus - count))
        if count < 1:  # a fractional GPU
            try:
                # prefer a GPU that is already split into fractions of the same size
                gpu = next(
                    i
                    for i, v in enumerate(self._available_gpus)
                    if v[0] > 0 and v[1] == count
                )
            except StopIteration:
                try:
                    # otherwise take the first fully unassigned GPU
                    gpu = next(
                        i for i, v in enumerate(self._available_gpus) if v[0] == 1.0
                    )
                except StopIteration:
                    # over-allocate: append a virtual GPU slot
                    gpu = len(self._available_gpus)
                    self._available_gpus.append((1.0, count))
            remaining, _ = self._available_gpus[gpu]
            if (remaining := remaining - count) < count:
                # not enough left for another fraction of this size; mark it exhausted
                self._available_gpus[gpu] = (0.0, count)
            else:
                self._available_gpus[gpu] = (remaining, count)
            return [gpu]
        else:  # allocate n GPUs, where n is a positive integer
            if int(count) != count:
                raise BentoMLConfigException(
                    "Fractional GPU counts greater than 1 are not supported"
                )
            count = int(count)
            unassigned = [
                gpu
                for gpu, value in enumerate(self._available_gpus)
                if value[0] > 0 and value[1] == 1.0
            ]
            if len(unassigned) < count:
                warnings.warn(
                    f"Not enough GPUs available: {count} requested",
                    ResourceWarning,
                )
                for _ in range(count - len(unassigned)):
                    unassigned.append(len(self._available_gpus))
                    self._available_gpus.append((1.0, 1.0))
            for gpu in unassigned[:count]:
                self._available_gpus[gpu] = (0.0, 1.0)
            return unassigned[:count]

    @inject
    def get_worker_env(
        self,
        service: Service[Any],
        services: dict[str, Any] = Provide[BentoMLContainer.config.services],
    ) -> tuple[int, list[dict[str, str]]]:
        config = services[service.name]
        num_gpus = 0
        num_workers = 1
        worker_env: list[dict[str, str]] = []
        if "gpu" in (config.get("resources") or {}):
            num_gpus = config["resources"]["gpu"]  # type: ignore
        if config.get("workers"):
            if (workers := config["workers"]) == "cpu_count":
                num_workers = int(self.system_resources["cpu"])
                # CPU-count workers are not assigned GPUs
                return num_workers, worker_env
            else:  # workers is a number
                num_workers = workers
        if num_gpus and DISABLE_GPU_ALLOCATION_ENV not in os.environ:
            if os.environ.get(DYNAMO_DEPLOYMENT_ENV):
                # Kubernetes deployment (DYNAMO_DEPLOYMENT_ENV is set): each pod in
                # the ReplicaSet gets its own GPU, so every worker shares the same
                # CUDA_VISIBLE_DEVICES value.
                assigned = self.assign_gpus(num_gpus)
                worker_env = [
                    {"CUDA_VISIBLE_DEVICES": ",".join(map(str, assigned))}
                    for _ in range(num_workers)
                ]
            else:
                # Local deployment: split the available GPUs across workers.
                for _ in range(num_workers):
                    assigned = self.assign_gpus(num_gpus)
                    worker_env.append(
                        {"CUDA_VISIBLE_DEVICES": ",".join(map(str, assigned))}
                    )
        return num_workers, worker_env
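For orientation (not part of the commit): a short usage sketch of how assign_gpus packs fractional requests, assuming a hypothetical host where system_resources() reports 2 GPUs; the import path is a placeholder for wherever the new allocator.py module lands.

# Hedged sketch, not from the commit: behavior implied by the code above.
from allocator import ResourceAllocator  # placeholder import path

alloc = ResourceAllocator()
print(alloc.assign_gpus(0.5))  # [0] - GPU 0 is split; its unit becomes 0.5
print(alloc.assign_gpus(0.5))  # [0] - same fraction size packs onto GPU 0
print(alloc.assign_gpus(1))    # [1] - whole GPU 1 (may also warn: fractional
                               #       bookkeeping rounds remaining_gpus down)
print(alloc.assign_gpus(1))    # [2] - over-allocated virtual slot, with a ResourceWarning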
@@ -36,8 +36,7 @@ from circus.sockets import CircusSocket
 from circus.watcher import Watcher
 from simple_di import Provide, inject

 if t.TYPE_CHECKING:
-    from _bentoml_impl.server.allocator import ResourceAllocator
+    from .allocator import ResourceAllocator

 # Define a Protocol for services to ensure type safety
@@ -271,7 +270,6 @@ def serve_http(
     threaded: bool = False,
 ) -> Server:
     from _bentoml_impl.loader import import_service, normalize_identifier
-    from _bentoml_impl.server.allocator import ResourceAllocator
     from bentoml._internal.log import SERVER_LOGGING_CONFIG
     from bentoml._internal.utils import reserve_free_port
     from bentoml._internal.utils.analytics.usage_stats import track_serve
@@ -285,6 +283,8 @@
     )
     from circus.sockets import CircusSocket

+    from .allocator import ResourceAllocator
+
     bento_id: str = ""
     env = {"PROMETHEUS_MULTIPROC_DIR": ensure_prometheus_dir()}
     if isinstance(bento_identifier, Service):
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess

from components.processor import Processor
from components.routerless.worker import VllmWorkerRouterLess
from components.worker import VllmWorker
from pydantic import BaseModel

from dynamo.sdk import depends, service
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.image import DYNAMO_IMAGE


class FrontendConfig(BaseModel):
    model: str
    endpoint: str
    port: int = 8080


@service(
    resources={"cpu": "10", "memory": "20Gi"},
    workers=1,
    image=DYNAMO_IMAGE,
)
# TODO: this should be called ApiServer
class Frontend:
    worker = depends(VllmWorker)
    worker_routerless = depends(VllmWorkerRouterLess)
    processor = depends(Processor)

    def __init__(self):
        config = ServiceConfig.get_instance()
        frontend_config = FrontendConfig(**config.get("Frontend", {}))

        # deregister any stale entry for this model, then register the endpoint
        subprocess.run(
            ["llmctl", "http", "remove", "chat-models", frontend_config.model]
        )
        subprocess.run(
            [
                "llmctl",
                "http",
                "add",
                "chat-models",
                frontend_config.model,
                frontend_config.endpoint,
            ]
        )
        print("Starting HTTP server")
        subprocess.run(
            ["http", "-p", str(frontend_config.port)], stdout=None, stderr=None
        )
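A hedged note on configuration: FrontendConfig(**config.get("Frontend", {})) means the service is driven by a plain dict under the "Frontend" key, with port optional. The sketch below uses the model name from the config later in this diff; the endpoint string is an assumed example, not from the commit.

# Illustrative only: what a "Frontend" config entry might look like once it
# reaches FrontendConfig.
frontend_config = FrontendConfig(
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    endpoint="dynamo.Processor.chat/completions",  # assumed endpoint format
)
assert frontend_config.port == 8080  # default applies when the key is omitted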
@@ -14,17 +14,28 @@
 # limitations under the License.

 import subprocess
+from pathlib import Path

 from components.processor import Processor
 from components.routerless.worker import VllmWorkerRouterLess
 from components.worker import VllmWorker
 from pydantic import BaseModel

+from dynamo import sdk
 from dynamo.sdk import depends, service
 from dynamo.sdk.lib.config import ServiceConfig
 from dynamo.sdk.lib.image import DYNAMO_IMAGE

+
+def get_http_binary_path():
+    sdk_path = Path(sdk.__file__)
+    binary_path = sdk_path.parent / "cli/bin/http"
+    if not binary_path.exists():
+        return "http"
+    else:
+        return str(binary_path)
+
+
 class FrontendConfig(BaseModel):
     model: str
     endpoint: str
@@ -61,8 +72,9 @@ class Frontend:
         )

         print("Starting HTTP server")
+        http_binary = get_http_binary_path()
         process = subprocess.Popen(
-            ["http", "-p", str(frontend_config.port)], stdout=None, stderr=None
+            [http_binary, "-p", str(frontend_config.port)], stdout=None, stderr=None
         )
         try:
             process.wait()
......
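The get_http_binary_path helper added above prefers the binary bundled with the SDK and falls back to whatever "http" resolves to on PATH. A minimal standalone sketch of that lookup, with a made-up install location:

# Standalone sketch of the resolution in get_http_binary_path; the install
# path below is hypothetical.
from pathlib import Path

sdk_init = Path("/usr/lib/python3.11/site-packages/dynamo/sdk/__init__.py")
bundled = sdk_init.parent / "cli/bin/http"
binary = str(bundled) if bundled.exists() else "http"  # fall back to PATH lookup
print(binary)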
@@ -126,7 +126,6 @@ class Router:
         # pull metrics for each worker
         max_waiting = 0.0
         if metrics:
-            print("[ROUTER] metrics.endpoint ", metrics.endpoints)
             for endpoint in metrics.endpoints:
                 worker_id = endpoint.worker_id
                 worker_metrics[worker_id] = {
......
@@ -27,13 +27,7 @@ from vllm.entrypoints.openai.api_server import (
 from vllm.inputs.data import TokensPrompt
 from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest

-from dynamo.sdk import (
-    async_on_start,
-    dynamo_context,
-    dynamo_endpoint,
-    server_context,
-    service,
-)
+from dynamo.sdk import async_on_start, dynamo_context, dynamo_endpoint, service


 class RequestType(BaseModel):
@@ -52,12 +46,6 @@ class PrefillWorker:
     def __init__(self):
         class_name = self.__class__.__name__
         self.engine_args = parse_vllm_args(class_name, "")
-        gpu_idx = (
-            self.engine_args.cuda_visible_device_offset
-            + server_context.worker_index
-            - 1
-        )
-        os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_idx}"
         self._loaded_metadata = set()
         self.initialized = False
         if self.engine_args.enable_chunked_prefill is not False:
......
@@ -84,7 +84,6 @@ class Processor(ProcessMixIn):
     async def async_init(self):
         runtime = dynamo_context["runtime"]
         comp_ns, comp_name = VllmWorker.dynamo_address()  # type: ignore
-        print(f"[Processor] comp_ns: {comp_ns}, comp_name: {comp_name}")
         self.worker_client = (
             await runtime.namespace(comp_ns)
             .component(comp_name)
......
@@ -14,8 +14,6 @@
 # limitations under the License.

-import os
-
 import msgspec
 from utils.nixl import NixlMetadataStore
 from utils.vllm import parse_vllm_args
@@ -25,13 +23,7 @@ from vllm.entrypoints.openai.api_server import (
 from vllm.inputs.data import TokensPrompt
 from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest

-from dynamo.sdk import (
-    async_on_start,
-    dynamo_context,
-    dynamo_endpoint,
-    server_context,
-    service,
-)
+from dynamo.sdk import async_on_start, dynamo_context, dynamo_endpoint, service


 @service(
@@ -46,12 +38,6 @@ class PrefillWorkerRouterLess:
     def __init__(self):
         class_name = self.__class__.__name__
         self.engine_args = parse_vllm_args(class_name, "")
-        gpu_idx = (
-            self.engine_args.cuda_visible_device_offset
-            + server_context.worker_index
-            - 1
-        )
-        os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_idx}"
         self._loaded_metadata = set()
         self.initialized = False
         if self.engine_args.enable_chunked_prefill is not False:
......
@@ -39,6 +39,4 @@ PrefillWorker:
   model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
   kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
   max-model-len: 16384
-  # dynamo arg for local deployment
-  cuda-visible-device-offset: 1
   max-num-batched-tokens: 16384
@@ -48,4 +48,3 @@ PrefillWorker:
   block-size: 64
   max-model-len: 16384
   max-num-batched-tokens: 16384
-  cuda-visible-device-offset: 1
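With cuda-visible-device-offset gone from the configs, manual device pinning (when still needed) goes through the allocator's escape hatch instead; a hedged sketch:

# Hedged sketch: opt out of automatic allocation and pin a device by hand.
# DYNAMO_DISABLE_GPU_ALLOCATION is the env var checked in get_worker_env above.
import os

os.environ["DYNAMO_DISABLE_GPU_ALLOCATION"] = "1"  # skip automatic assignment
os.environ["CUDA_VISIBLE_DEVICES"] = "1"           # pin this process to GPU 1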
@@ -45,12 +45,6 @@ def parse_vllm_args(service_name, prefix) -> AsyncEngineArgs:
         default=1000,
         help="Maximum length of local prefill",
     )
-    parser.add_argument(
-        "--cuda-visible-device-offset",
-        type=int,
-        default=0,
-        help="Offset of CUDA_VISIBLE_DEVICE",
-    )
     parser = AsyncEngineArgs.add_cli_args(parser)
     args = parser.parse_args(vllm_args)
     engine_args = AsyncEngineArgs.from_cli_args(args)
@@ -58,5 +52,4 @@ def parse_vllm_args(service_name, prefix) -> AsyncEngineArgs:
     engine_args.remote_prefill = args.remote_prefill
     engine_args.conditional_disagg = args.conditional_disagg
     engine_args.max_local_prefill_length = args.max_local_prefill_length
-    engine_args.cuda_visible_device_offset = args.cuda_visible_device_offset
     return engine_args
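With the offset flag removed, a worker no longer computes its own gpu_idx; it simply inherits whatever CUDA_VISIBLE_DEVICES the serving layer's get_worker_env prepared for it. A minimal sketch of consuming that value (the helper name is hypothetical):

# Hypothetical helper: read the per-worker device list that get_worker_env
# injected via the environment (e.g. "0" or "0,1").
import os

def visible_gpus() -> list[int]:
    value = os.environ.get("CUDA_VISIBLE_DEVICES", "")
    return [int(idx) for idx in value.split(",") if idx]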