Unverified Commit 69dcba7b authored by kYLe's avatar kYLe Committed by GitHub
Browse files

feat: Add Hello World Multinode example (#624)

parent b4f23a13
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Hello World MultiNode Example
## Overview
This example demonstrates how to deploy workers into multinodes and route requests to different workers.
Pipeline Architecture:
```
Users/Clients (HTTP)
┌─────────────────────┐
│ Frontend (node 1) │ HTTP API endpoint (/generate)
└─────────────────────┘
│ dynamo/runtime
┌─────────────────────┐
│ Processor (node 1) │ ─────────────────
└─────────────────────┘ routing │
│ dynamo/runtime │ dynamo/runtime
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ Worker_1 (node 2) │ │ Worker_2 (node 3) │
└─────────────────────┘ └─────────────────────┘
```
## Component Descriptions
### Frontend Service
- Serves as the entry point for external HTTP requests
- Exposes a `/generate` HTTP API endpoint that clients can call
- Processes incoming text and passes it to the Middle service
### Processor Service
- Acts as an intermediary service in the pipeline
- Deployed on the same node as Frontend and receives requests from the Frontend
- Calls multiple workers based on the routing mode, random or round-robin.
### Worker Service
- Functions as the final service in the pipeline
- Deployed on a different node from Frontend and Processor
- Appends "GeneratedBy_HostName" to the text and yields tokens
## Prerequisites
Start required services (etcd and NATS) using [Docker Compose](../../../deploy/docker-compose.yml)
```bash
docker compose -f deploy/docker-compose.yml up -d
```
## Running the Single Worker Example
In this example, we will use two nodes to demo the multinode serving.
- Node 1
- Runs NATS and etcd services
- Deploys Frontend and Processor
- Node 2
- Deploys Worker
1. Set environment variables for NATS and etcd services
```bash
export NATS_SERVER="nats://Node_1_IP_ADDRESS:4222"
export ETCD_ENDPOINTS="http://Node_1_IP_ADDRESS:2379"
```
2. Launch Frontend and Processor services:
```bash
cd dynamo/examples/hello_world/multinode_example
dynamo serve components.graph:Frontend -f configs/one_worker.yaml
```
The `dynamo serve` command deploys the entire service graph, automatically handling the dependencies between Frontend, and Processor services. Since no worker is deployed yet, the service remains idle.
![text](./_img/waiting1worker.png)
3. Go to node 2 and launch Worker service
```bash
export NATS_SERVER="nats://Node_1_IP_ADDRESS:4222"
export ETCD_ENDPOINTS="http://Node_1_IP_ADDRESS:2379"
cd dynamo/examples/hello_world/multinode_example
dynamo serve components.worker:DummyWorker
```
You should see the worker is ready from node 1's terminal.
![text](./_img/1workerready.png)
4. Go back to node 1 and send request to frontend using curl:
```bash
curl -X 'POST' \
'http://localhost:8000/generate' \
-H 'accept: text/event-stream' \
-H 'Content-Type: application/json' \
-d '{
"prompt": "test prompt",
"request_id": "id_number"
}'
```
5. You should be able to see response as below:
`Response: {"worker_output":"test prompt_ProcessedBy_NODE1HOSTNAME_GeneratedBy_NODE2HOSTNAME","request_id":"id_number"}`
Here `NODE1HOSTNAME` is the hostname for node 1, and `NODE2HOSTNAME` is the hostname for node 2.
## Running the Two Workers Example
In this example, we will use three nodes to demo the multinode serving.
- Node 1
- Runs NATS and etcd services
- Deploys Frontend and Processor
- Node 2
- Deploys Worker 1
- Node 3
- Deploys Worker 2
1. Launch Frontend and Processor services using the `multi_worker.yaml` config from node 1. In this config file, we require 2 workers and set the router mode as **round robin**
```bash
dynamo serve components.graph:Frontend -f configs/multi_worker.yaml
```
The service is waiting for 2 workers this time.
2. Go to node 2 and node 3, launch worker service separately
```bash
export NATS_SERVER="nats://Node_1_IP_ADDRESS:4222"
export ETCD_ENDPOINTS="http://Node_1_IP_ADDRESS:2379"
dynamo serve components.worker:DummyWorker
```
You should see the following messages from node 1's terminal window when both workers are deployed
![text](./_img/2workerready.png)
3. Query the frontend using the same query as before, and run it multiple times. You should see following two responses in turn because of round-robin routing mode between 2 workers.
Response from worker 1: `Response: {"worker_output":"test prompt_ProcessedBy_NODE1HOSTNAME_GeneratedBy_NODE2HOSTNAME","request_id":"id_number"}`
Response from worker 2: `Response: {"worker_output":"test prompt_ProcessedBy_NODE1HOSTNAME_GeneratedBy_NODE3HOSTNAME","request_id":"id_number"}`
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from components.processor import Processor
from components.utils import GeneralRequest
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from dynamo.sdk import DYNAMO_IMAGE, depends, dynamo_endpoint, service
logger = logging.getLogger(__name__)
app = FastAPI(title="Hello World!")
@service(
dynamo={
"enabled": True,
"namespace": "dynamo-demo",
},
image=DYNAMO_IMAGE,
app=app,
)
class Frontend:
processor = depends(Processor)
@dynamo_endpoint(is_api=True)
async def generate(self, request: GeneralRequest): # from request body keys
"""Stream results from the pipeline."""
logger.info(f"-Frontend layer received: {request=}")
async def content_generator():
async for response in self.processor.generate(request.model_dump_json()):
yield f"Frontend: {response}"
return StreamingResponse(content_generator())
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from components.frontend import Frontend
from components.processor import Processor
Frontend.link(Processor)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import socket
from typing import Protocol
from components.utils import GeneralRequest, GeneralResponse, check_required_workers
from components.worker import DummyWorker
from dynamo._core import Client
from dynamo.sdk import (
DYNAMO_IMAGE,
async_on_start,
depends,
dynamo_context,
dynamo_endpoint,
service,
)
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.dependency import DynamoClient
logger = logging.getLogger(__name__)
@service(
dynamo={
"enabled": True,
"namespace": "dynamo-demo",
},
image=DYNAMO_IMAGE,
)
class Processor(Protocol):
"""
Pre and Post Processing
"""
worker: DynamoClient = depends(DummyWorker)
router: str
hostname: str
min_workers: int
worker_client: Client
def __init__(self):
config = ServiceConfig.get_instance()
processor_config = config.get("Processor", {})
self.hostname = socket.gethostname()
self.min_workers = processor_config.get("min_worker", 1)
self.router = processor_config.get("router", "round-robin")
@async_on_start
async def async_init(self):
runtime = dynamo_context["runtime"]
comp_ns, comp_name = DummyWorker.dynamo_address() # type: ignore
self.worker_client = (
await runtime.namespace(comp_ns)
.component(comp_name)
.endpoint("generate")
.client()
)
await check_required_workers(
self.worker_client, self.min_workers, tag="processor"
)
logger.info(f"----workers are all ready {self.worker_client.endpoint_ids()}")
async def _generate(
self,
raw_request: GeneralRequest,
):
raw_request.prompt = raw_request.prompt + "_ProcessedBy_" + self.hostname
if self.router == "random":
engine_generator = await self.worker_client.random(
raw_request.model_dump_json()
)
elif self.router == "round-robin":
engine_generator = await self.worker_client.round_robin(
raw_request.model_dump_json()
)
async for resp in engine_generator:
yield GeneralResponse.model_validate_json(resp.data())
@dynamo_endpoint()
async def generate(self, request: GeneralRequest):
"""Forward requests to backend."""
mid_request = request.model_dump_json()
logger.info(f"Received request{mid_request=}")
async for response in self._generate(request):
logger.debug(f"Received response: {response.model_dump_json()}")
yield response.model_dump_json()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
from pydantic import BaseModel
from dynamo._core import Client
logger = logging.getLogger(__name__)
class GeneralRequest(BaseModel):
prompt: str = "user input"
request_id: str = "id_string"
class GeneralResponse(BaseModel):
worker_output: str = "generated output"
request_id: str = "id_string"
async def check_required_workers(
workers_client: Client,
required_workers: int,
on_change=True,
poll_interval=5,
tag="",
):
"""Wait until the minimum number of workers are ready."""
worker_ids = workers_client.endpoint_ids()
num_workers = len(worker_ids)
new_count = -1 # Force to log "waiting for worker" once
while num_workers < required_workers:
if (not on_change) or new_count != num_workers:
num_workers = new_count if new_count >= 0 else num_workers
logger.info(
f" {tag} Waiting for more workers to be ready.\n"
f" Current: {num_workers},"
f" Required: {required_workers}"
)
await asyncio.sleep(poll_interval)
worker_ids = workers_client.endpoint_ids()
new_count = len(worker_ids)
return worker_ids
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import socket
from components.utils import GeneralRequest, GeneralResponse
from dynamo.sdk import DYNAMO_IMAGE, dynamo_endpoint, service
logger = logging.getLogger(__name__)
@service(
dynamo={
"enabled": True,
"namespace": "dynamo-demo",
},
image=DYNAMO_IMAGE,
resources={"cpu": "10", "memory": "20Gi"},
workers=1,
)
class DummyWorker:
def __init__(self):
self.hostname = socket.gethostname()
@dynamo_endpoint()
async def generate(self, request: GeneralRequest):
logger.info(f"{self.hostname}: Worker invoked")
yield GeneralResponse(
request_id=request.request_id,
worker_output=request.prompt + "_GeneratedBy_" + self.hostname,
).model_dump_json()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Processor:
min_worker: 2
router: round-robin
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Processor:
min_worker: 1
router: random
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment