Commit 5fe1f8a2 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat(serve): Enhance multi-node deployment and worker configuration (#457)


Co-authored-by: default avatarhongkuanz <hongkuanz@nvidia.com>
parent 01bad195
......@@ -96,7 +96,7 @@ class ResourceAllocator:
return unassigned[:count]
@inject
def get_worker_env(
def get_resource_envs(
self,
service: Service[Any],
services: dict[str, Any] = Provide[BentoMLContainer.config.services],
......@@ -105,14 +105,14 @@ class ResourceAllocator:
num_gpus = 0
num_workers = 1
worker_env: list[dict[str, str]] = []
resource_envs: list[dict[str, str]] = []
if "gpu" in (config.get("resources") or {}):
num_gpus = config["resources"]["gpu"] # type: ignore
if config.get("workers"):
if (workers := config["workers"]) == "cpu_count":
num_workers = int(self.system_resources["cpu"])
# don't assign gpus to workers
return num_workers, worker_env
return num_workers, resource_envs
else: # workers is a number
num_workers = workers
if num_gpus and DISABLE_GPU_ALLOCATION_ENV not in os.environ:
......@@ -120,7 +120,7 @@ class ResourceAllocator:
# K8s replicas: Assumes DYNAMO_DEPLOYMENT_ENV is set
# each pod in replicaset will have separate GPU with same CUDA_VISIBLE_DEVICES
assigned = self.assign_gpus(num_gpus)
worker_env = [
resource_envs = [
{"CUDA_VISIBLE_DEVICES": ",".join(map(str, assigned))}
for _ in range(num_workers)
]
......@@ -128,7 +128,7 @@ class ResourceAllocator:
# local deployment where we split all available GPUs across workers
for _ in range(num_workers):
assigned = self.assign_gpus(num_gpus)
worker_env.append(
resource_envs.append(
{"CUDA_VISIBLE_DEVICES": ",".join(map(str, assigned))}
)
return num_workers, worker_env
return num_workers, resource_envs
......@@ -26,6 +26,7 @@ import typing as t
from typing import Any
import click
import uvloop
from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.sdk import dynamo_context
......@@ -186,6 +187,7 @@ def main(
logger.error(f"[{run_id}] Error in Dynamo component setup: {str(e)}")
raise
uvloop.install()
asyncio.run(worker())
......
......@@ -127,7 +127,7 @@ def create_dependency_watcher(
) -> tuple[Watcher, CircusSocket, str]:
from bentoml.serving import create_watcher
num_workers, worker_envs = scheduler.get_worker_env(svc)
num_workers, resource_envs = scheduler.get_resource_envs(svc)
uri, socket = _get_server_socket(svc, uds_path, port_stack, backlog)
args = [
"-m",
......@@ -141,8 +141,8 @@ def create_dependency_watcher(
"$(CIRCUS.WID)",
]
if worker_envs:
args.extend(["--worker-env", json.dumps(worker_envs)])
if resource_envs:
args.extend(["--worker-env", json.dumps(resource_envs)])
watcher = create_watcher(
name=f"service_{svc.name}",
......@@ -171,7 +171,7 @@ def create_dynamo_watcher(
uri, socket = _get_server_socket(svc, uds_path, port_stack, backlog)
# Get worker configuration
num_workers, worker_envs = scheduler.get_worker_env(svc)
num_workers, resource_envs = scheduler.get_resource_envs(svc)
# Create Dynamo-specific worker args
args = [
......@@ -184,8 +184,8 @@ def create_dynamo_watcher(
"$(CIRCUS.WID)",
]
if worker_envs:
args.extend(["--worker-env", json.dumps(worker_envs)])
if resource_envs:
args.extend(["--worker-env", json.dumps(resource_envs)])
# Update env to include ServiceConfig and service-specific environment variables
worker_env = env.copy() if env else {}
......@@ -315,7 +315,7 @@ def serve_http(
if service_name and service_name != svc.name:
svc = svc.find_dependent_by_name(service_name)
num_workers, worker_envs = allocator.get_worker_env(svc)
num_workers, resource_envs = allocator.get_resource_envs(svc)
server_on_deployment(svc)
uds_path = tempfile.mkdtemp(prefix="bentoml-uds-")
try:
......@@ -419,8 +419,8 @@ def serve_http(
*timeouts_args,
*timeout_args,
]
if worker_envs:
server_args.extend(["--worker-env", json.dumps(worker_envs)])
if resource_envs:
server_args.extend(["--worker-env", json.dumps(resource_envs)])
if development_mode:
server_args.append("--development-mode")
......@@ -438,13 +438,37 @@ def serve_http(
"--worker-id",
"$(CIRCUS.WID)",
]
# resource_envs is the resource allocation (ie CUDA_VISIBLE_DEVICES) for each worker created by the allocator
# these resource_envs are passed to each individual worker's environment which is set in serve_dynamo
if resource_envs:
args.extend(["--worker-env", json.dumps(resource_envs)])
# env is the base bentoml environment variables. We make a copy and update it to add any service configurations and additional env vars
worker_env = env.copy() if env else {}
# Pass through the main service config
if "DYNAMO_SERVICE_CONFIG" in os.environ:
worker_env["DYNAMO_SERVICE_CONFIG"] = os.environ[
"DYNAMO_SERVICE_CONFIG"
]
# Get service-specific environment variables from DYNAMO_SERVICE_ENVS
if "DYNAMO_SERVICE_ENVS" in os.environ:
try:
service_envs = json.loads(os.environ["DYNAMO_SERVICE_ENVS"])
if svc.name in service_envs:
service_args = service_envs[svc.name].get("ServiceArgs", {})
if "envs" in service_args:
worker_env.update(service_args["envs"])
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse DYNAMO_SERVICE_ENVS: {e}")
watcher = create_watcher(
name=f"dynamo_service_{svc.name}",
args=args,
numprocesses=num_workers,
working_dir=str(bento_path.absolute()),
close_child_stdin=not development_mode,
env=env, # Dependency map will be injected by serve_http
env=worker_env, # Dependency map will be injected by serve_http
)
watchers.append(watcher)
print(f"dynamo_service_{svc.name} entrypoint created")
......@@ -495,7 +519,7 @@ def serve_http(
arbiter.start(
cb=lambda _: logger.info( # type: ignore
(
"Starting Dynamo Service %s (%s/%s) listening on %s://%s:%d (Press CTRL+C to quit)"
"Starting Dynamo Service %s (Press CTRL+C to quit)"
if (
hasattr(svc, "is_dynamo_component")
and svc.is_dynamo_component()
......@@ -503,7 +527,7 @@ def serve_http(
else "Starting %s (Press CTRL+C to quit)"
),
*(
(svc.name, *svc.dynamo_address(), scheme, log_host, port)
(svc.name,)
if (
hasattr(svc, "is_dynamo_component")
and svc.is_dynamo_component()
......
......@@ -150,6 +150,50 @@ curl localhost:8000/v1/chat/completions -H "Content-Type: application/json"
```
### Multinode Examples
#### Single node sized models
You can deploy our example architectures on multiple nodes via NATS/ETCD-based discovery and communication. Here's an example of deploying disaggregated serving on 2 nodes.
##### Disaggregated Deployment with KV Routing
Node 1: Frontend, Processor, Router, 8 Decode
Node 2: 8 Prefill
**Step 1**: Start NATS/ETCD on your head node. Ensure you have the correct firewall rules to allow communication between the nodes as you will need the NATS/ETCD endpoints to be accessible by node 2.
```bash
# node 1
docker compose -f deploy/docker-compose.yml up -d
```
**Step 2**: Create the inference graph for this deployment. The easiest way to do this is to remove the `.link(PrefillWorker)` from the `disagg_router.py` file.
```python
# graphs/disagg_router.py
# imports...
Frontend.link(Processor).link(Router).link(VllmWorker)
```
**Step 3**: Start the frontend, processor, router, and 8 VllmWorkers on node 1.
```bash
# node 1
cd /workspace/examples/llm
dynamo serve graphs.disagg_router:Frontend -f ./configs/disagg_router.yaml --VllmWorker.ServiceArgs.workers=8
```
**Step 4**: Start 8 PrefillWorkers on node 2.
Since we only want to start the `PrefillWorker` on node 2, you can simply run just the PrefillWorker component directly.
```bash
# node 2
export NATS_SERVER='<your-nats-server-address>' # note this should start with nats://...
export ETCD_ENDPOINTS='<your-etcd-endpoints-address>'
cd /workspace/examples/llm
dynamo serve components.prefill_worker:PrefillWorker -f ./configs/disagg_router.yaml --PrefillWorker.ServiceArgs.workers=8
```
You can now use the same curl request from above to interact with your deployment!
### Close deployment
Kill all dynamo processes managed by circusd.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment