Commit afbf92fc authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: cli args to override service configs and small misc cleanups (#166)

parent 7f136e29
# Dynamo SDK
Dynamo is a Python-based SDK for building and deploying distributed inference applications. Dynamo leverages concepts from open source projects like [BentoML](https://github.com/bentoml/bentoml) to provide a developer-friendly experience that takes you from local development to Kubernetes deployment.
## Installation
```bash
pip install ai-dynamo-sdk
```
## Quickstart
Let's build a simple distributed pipeline with 3 components: `Frontend`, `Middle` and `Backend`. The structure of the pipeline looks like this:
```
Users/Clients (HTTP)
┌─────────────┐
│ Frontend │ HTTP API endpoint (/generate)
└─────────────┘
┌─────────────┐
│ Middle │
└─────────────┘
┌─────────────┐
│ Backend │
└─────────────┘
```
The code for the pipeline looks like this:
```python
# filename: pipeline.py
from dynamo.sdk import service, dynamo_endpoint, depends, api
from pydantic import BaseModel


class RequestType(BaseModel):
    # Payload for the generate endpoints.
    text: str


# NOTE: components are defined bottom-up (Backend, then Middle, then
# Frontend) so that each class already exists when depends() refers to it;
# defining Frontend first would raise NameError on `depends(Middle)`.
@service(
    resources={"cpu": "1"},
    dynamo={"enabled": True, "namespace": "inference"},
)
class Backend:
    @dynamo_endpoint()
    async def generate(self, req: RequestType):
        """Append '-back' to the text and stream the resulting tokens."""
        text = f"{req.text}-back"
        for token in text.split():
            yield f"Backend: {token}"


@service(
    resources={"cpu": "1"},
    dynamo={"enabled": True, "namespace": "inference"},
)
class Middle:
    backend = depends(Backend)

    @dynamo_endpoint()
    async def generate(self, req: RequestType):
        """Append '-mid' to the text and stream the resulting tokens."""
        text = f"{req.text}-mid"
        for token in text.split():
            yield f"Mid: {token}"


@service(resources={"cpu": "1"})
class Frontend:
    middle = depends(Middle)

    @api
    async def generate(self, text: str):
        """HTTP entrypoint: forward the request to Middle and stream replies."""
        request = RequestType(text=text)
        async for response in self.middle.generate(request.model_dump_json()):
            yield f"Frontend: {response}"
```
You can run this pipeline locally by spinning up ETCD and NATS and then running the pipeline:
```bash
# Spin up ETCD and NATS
docker compose -f deploy/docker-compose.yml up -d
```
then
```bash
# Run the pipeline
dynamo serve pipeline:Frontend
```
Once it's up and running, you can make a request to the pipeline using
```bash
curl -X POST http://localhost:3000/generate \
-H "Content-Type: application/json" \
-d '{"text": "federer"}'
```
You should see the following output:
```bash
federer-mid-back
```
...@@ -66,14 +66,16 @@ def deprecated_option(*param_decls: str, **attrs: t.Any): ...@@ -66,14 +66,16 @@ def deprecated_option(*param_decls: str, **attrs: t.Any):
def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, t.Any]: def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, t.Any]:
"""Parse a single CLI argument into service name, key, and value.""" """Parse a single CLI argument into service name, key, and value."""
service, key = arg_name.split(".", 1) parts = arg_name.split(".")
service = parts[0]
# Handle nested keys (e.g., ServiceArgs.workers or ServiceArgs.envs.CUDA_VISIBLE_DEVICES)
nested_keys = parts[1:]
# Parse value based on type # Parse value based on type
try: try:
# Try as JSON for complex types
value = json.loads(arg_value) value = json.loads(arg_value)
except json.JSONDecodeError: except json.JSONDecodeError:
# Handle basic types
if arg_value.isdigit(): if arg_value.isdigit():
value = int(arg_value) value = int(arg_value)
elif arg_value.replace(".", "", 1).isdigit() and arg_value.count(".") <= 1: elif arg_value.replace(".", "", 1).isdigit() and arg_value.count(".") <= 1:
...@@ -83,7 +85,12 @@ def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, t.Any]: ...@@ -83,7 +85,12 @@ def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, t.Any]:
else: else:
value = arg_value value = arg_value
return service, key, value # Build nested dict structure
result = value
for key in reversed(nested_keys[1:]):
result = {key: result}
return service, nested_keys[0], result
def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]: def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]:
...@@ -91,13 +98,30 @@ def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]: ...@@ -91,13 +98,30 @@ def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]:
dict dict
) )
def deep_update(d: dict, key: str, value: t.Any):
    """Recursively merge ``value`` into ``d[key]`` in place.

    Used to apply nested CLI overrides such as
    ``--Worker.ServiceArgs.envs.CUDA_VISIBLE_DEVICES="0,1"``, which
    _parse_service_arg parses into::

        ("Worker", "ServiceArgs", {"envs": {"CUDA_VISIBLE_DEVICES": "0,1"}})

    deep_update then merges that nested dict into the service's config so
    sibling keys (e.g. an existing ``ServiceArgs.workers``) are preserved
    instead of being clobbered by a plain ``d[key] = value`` assignment.

    Args:
        d: Dictionary to update in place.
        key: Key in ``d`` to update.
        value: New value. If both it and ``d[key]`` are dicts, they are
            merged recursively; otherwise ``d[key]`` is replaced.
    """
    if isinstance(value, dict) and key in d and isinstance(d[key], dict):
        # Both sides are dicts: merge entry-by-entry so existing keys survive.
        for k, v in value.items():
            deep_update(d[key], k, v)
    else:
        # New key, or a non-dict on either side: plain overwrite/insert.
        d[key] = value
index = 0 index = 0
while index < len(args): while index < len(args):
next_arg = args[index] next_arg = args[index]
arg_name = ""
arg_value = ""
if not (next_arg.startswith("--") or "." not in next_arg): if not (next_arg.startswith("--") or "." not in next_arg):
continue continue
try: try:
...@@ -115,9 +139,8 @@ def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]: ...@@ -115,9 +139,8 @@ def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]:
if arg_value.startswith("-"): if arg_value.startswith("-"):
raise ValueError("Service arg value can not start with -") raise ValueError("Service arg value can not start with -")
arg_name = arg_name[2:] arg_name = arg_name[2:]
parsed = _parse_service_arg(arg_name, arg_value) service, key, value = _parse_service_arg(arg_name, arg_value)
service, key, value = parsed deep_update(service_configs[service], key, value)
service_configs[service][key] = value
except Exception: except Exception:
raise ValueError(f"Error parsing service arg: {args[index]}") raise ValueError(f"Error parsing service arg: {args[index]}")
...@@ -280,11 +303,18 @@ def build_serve_command() -> click.Group: ...@@ -280,11 +303,18 @@ def build_serve_command() -> click.Group:
show_default=True, show_default=True,
hidden=True, hidden=True,
) )
@click.option(
"--dry-run",
is_flag=True,
help="Print the final service configuration and exit without starting the server",
default=False,
)
@click.pass_context @click.pass_context
@env_manager @env_manager
def serve( def serve(
ctx: click.Context, ctx: click.Context,
bento: str, bento: str,
dry_run: bool,
development: bool, development: bool,
port: int, port: int,
host: str, host: str,
...@@ -360,8 +390,6 @@ def build_serve_command() -> click.Group: ...@@ -360,8 +390,6 @@ def build_serve_command() -> click.Group:
service_configs[service] = {} service_configs[service] = {}
service_configs[service][key] = value service_configs[service][key] = value
# Process Overrides
# Process service-specific options # Process service-specific options
cmdline_overrides: t.Dict[str, t.Any] = _parse_service_args(ctx.args) cmdline_overrides: t.Dict[str, t.Any] = _parse_service_args(ctx.args)
for service, configs in cmdline_overrides.items(): for service, configs in cmdline_overrides.items():
...@@ -370,6 +398,13 @@ def build_serve_command() -> click.Group: ...@@ -370,6 +398,13 @@ def build_serve_command() -> click.Group:
service_configs[service] = {} service_configs[service] = {}
service_configs[service][key] = value service_configs[service][key] = value
if dry_run:
rich.print("[bold]Service Configuration:[/bold]")
rich.print(json.dumps(service_configs, indent=2))
rich.print("\n[bold]Environment Variable that would be set:[/bold]")
rich.print(f"DYNAMO_SERVICE_CONFIG={json.dumps(service_configs)}")
sys.exit(0)
# Set environment variable with service configuration # Set environment variable with service configuration
if service_configs: if service_configs:
os.environ["DYNAMO_SERVICE_CONFIG"] = json.dumps(service_configs) os.environ["DYNAMO_SERVICE_CONFIG"] = json.dumps(service_configs)
......
...@@ -13,22 +13,35 @@ ...@@ -13,22 +13,35 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import pytest

from dynamo.sdk.lib.service import LinkedServices

pytestmark = pytest.mark.pre_merge


def test_remove_backend2():
    """Linking a chain and pruning should drop all non-linked dependency edges."""
    from dynamo.sdk.tests.pipeline import Backend, Backend2, Frontend, Middle

    # Before linking: Frontend and Middle each declare two dependencies,
    # Backend declares none.
    assert set(Frontend.dependencies.keys()) == {"backend", "middle"}
    assert Frontend.dependencies["backend"].on == Backend
    assert Frontend.dependencies["middle"].on == Middle

    assert set(Middle.dependencies.keys()) == {"backend", "backend2"}
    assert Middle.dependencies["backend"].on == Backend
    assert Middle.dependencies["backend2"].on == Backend2

    assert Backend.dependencies == {}

    # Link the Frontend -> Middle -> Backend chain, then prune edges that
    # are not part of any link.
    Frontend.link(Middle).link(Backend)
    LinkedServices.remove_unused_edges()

    # After pruning only the linked chain remains: Frontend's direct backend
    # edge and Middle's backend2 edge are gone.
    assert set(Frontend.dependencies.keys()) == {"middle"}
    assert Frontend.dependencies["middle"].on == Middle

    assert set(Middle.dependencies.keys()) == {"backend"}
    assert Middle.dependencies["backend"].on == Backend

    assert Backend.dependencies == {}
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import subprocess import subprocess
from components.processor import Processor from components.processor import Processor
...@@ -45,7 +44,6 @@ class Frontend: ...@@ -45,7 +44,6 @@ class Frontend:
config = ServiceConfig.get_instance() config = ServiceConfig.get_instance()
frontend_config = FrontendConfig(**config.get("Frontend", {})) frontend_config = FrontendConfig(**config.get("Frontend", {}))
os.environ["TRT_LOG"] = "DEBUG"
subprocess.run( subprocess.run(
["llmctl", "http", "remove", "chat-models", frontend_config.model] ["llmctl", "http", "remove", "chat-models", frontend_config.model]
) )
......
...@@ -20,4 +20,7 @@ Frontend: ...@@ -20,4 +20,7 @@ Frontend:
VllmWorkerRouterLess: VllmWorkerRouterLess:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
enforce-eager: true enforce-eager: true
\ No newline at end of file max-model-len: 16384
max-num-batched-tokens: 16384
block-size: 64
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment