Unverified Commit de5ab2c4 authored by Jacky's avatar Jacky Committed by GitHub
Browse files

docs: Python runtime Request Cancellation examples (#2893)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
parent e42746a1
......@@ -5,8 +5,8 @@ The examples below assume you build the latest image yourself from source. If us
:margin: 0
:padding: 3 4 0 0
.. grid-item-card:: :doc:`Hello World <../examples/runtime/hello_world/README>`
:link: ../examples/runtime/hello_world/README
.. grid-item-card:: :doc:`Hello World <../examples/custom_backend/hello_world/README>`
:link: ../examples/custom_backend/hello_world/README
:link-type: doc
Demonstrates the basic concepts of Dynamo by creating a simple GPU-unaware graph
......
......@@ -45,6 +45,8 @@ The Python `Context` class wraps the Rust `AsyncEngineContext` and exposes the f
- **`stop_generating()`**: Issues a stop generating signal, equivalent to the Rust method
- **`async_killed_or_stopped()`**: An async method that completes when the context becomes either killed or stopped, whichever happens first. This combines the functionality of the Rust `killed()` and `stopped()` async methods using `tokio::select!`.
For a working example of request cancellation, see the [cancellation demo](../../examples/custom_backend/cancellation/README.md).
### Context Usage in Python
The context is available optionally in both incoming and outgoing request scenarios:
......
......@@ -43,7 +43,7 @@
components/backends/sglang/docs/multinode-examples.md
examples/README.md
examples/runtime/hello_world/README.md
examples/custom_backend/hello_world/README.md
architecture/distributed_runtime.md
architecture/dynamo_flow.md
......
# Request Cancellation Demo
This demonstration shows how to implement request cancellation in Dynamo using:
- Client: `context.stop_generating()` to cancel requests
- Middle Server: Forwards requests and passes context through (optional)
- Backend Server: `context.is_stopped()` to check for cancellation
## Architecture
The demo supports two architectures:
**Direct Connection (Default):**
```
Client -> Backend Server
```
**With Middle Server:**
```
Client -> Middle Server -> Backend Server
```
The middle server acts as a proxy that:
1. Receives requests from clients
2. Forwards them to backend servers
3. Passes the original context through for cancellation support
4. Streams responses back to the client
## Usage
### Option 1: Direct Connection (Simple)
1. Start the backend server:
```bash
python3 server.py
```
2. Run the client (connects directly to backend):
```bash
python3 client.py
```
### Option 2: With Middle Server (Proxy)
1. Start the backend server:
```bash
python3 server.py
```
2. Start the middle server:
```bash
python3 middle_server.py
```
3. Run the client (connects through middle server):
```bash
python3 client.py --middle
```
## What happens
### Direct Connection:
1. Backend server generates numbers 0-999 with 0.1 second delays
2. Client receives the first 3 numbers (0, 1, 2) directly from backend
3. Client calls `context.stop_generating()` to cancel
4. Backend server detects cancellation via `context.is_stopped()` and stops
5. Both client and server handle the cancellation gracefully
### With Middle Server:
1. Backend server generates numbers 0-999 with 0.1 second delays
2. Middle server forwards requests and passes context through
3. Client receives the first 3 numbers (0, 1, 2) via the middle server
4. Client calls `context.stop_generating()` to cancel
5. Context cancellation propagates: Client → Middle Server → Backend Server
6. Backend server detects cancellation via `context.is_stopped()` and stops
7. All components handle the cancellation gracefully
## Key Components
- **Backend Server**: Checks `context.is_stopped()` before each yield
- **Middle Server**: Forwards requests and passes context through (when used)
- **Client**: Uses `Context()` object and calls `context.stop_generating()`
- **Graceful shutdown**: All components handle `asyncio.CancelledError`
## Notes
- The client defaults to direct connection for simplicity
- Use `--middle` flag to test the proxy scenario
- Both modes demonstrate the same cancellation behavior
- The middle server shows how to properly forward context in proxy scenarios
For more details on the request cancellation architecture, refer to the [architecture documentation](../../../docs/architecture/request_cancellation.md).
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Simple client demonstration of request cancellation using context.stop_generating()
"""
import asyncio
import sys
from dynamo._core import Context, DistributedRuntime
async def demo_cancellation(client):
"""Perform the generation request with cancellation demonstration"""
# Create context for cancellation control
context = Context()
# Start streaming request
print("Starting streaming request...")
stream = await client.generate("dummy_request", context=context)
iteration_count = 0
async for response in stream:
number = response.data()
print(f"Client: Received {number}")
# Cancel after receiving 3 responses
if iteration_count >= 2:
print("Client: Cancelling after 3 responses...")
context.stop_generating()
break
iteration_count += 1
print("Client: Stream stopped")
async def main():
"""Connect to server and demonstrate cancellation"""
# Parse command line argument
use_middle_server = False # Default to direct connection
if len(sys.argv) > 1:
if sys.argv[1] == "--middle":
use_middle_server = True
else:
print("Usage: python3 client.py [--middle]")
print(" (no flag): Connect directly to backend server (default)")
print(" --middle: Connect through middle server")
return
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, True)
# Connect to middle server or direct server based on argument
if use_middle_server:
endpoint = runtime.namespace("demo").component("middle").endpoint("generate")
print("Client connecting to middle server...")
else:
endpoint = runtime.namespace("demo").component("server").endpoint("generate")
print("Client connecting directly to backend server...")
client = await endpoint.client()
await client.wait_for_instances()
print(
f"Client connected to {'middle server' if use_middle_server else 'backend server'}"
)
# Perform the generation request with cancellation
await demo_cancellation(client)
runtime.shutdown()
if __name__ == "__main__":
asyncio.run(main())
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Middle server demonstration that proxies requests to backend servers
using round_robin() and passes context for cancellation support
"""
import asyncio
from dynamo._core import DistributedRuntime
class MiddleServer:
"""Middle server that forwards requests to backend servers"""
def __init__(self, runtime):
self.runtime = runtime
self.backend_client = None
async def initialize(self):
"""Initialize connection to backend servers"""
# Connect to backend servers
endpoint = (
self.runtime.namespace("demo").component("server").endpoint("generate")
)
self.backend_client = await endpoint.client()
await self.backend_client.wait_for_instances()
print("Middle server: Connected to backend servers")
async def generate(self, request, context):
"""Forward request to backend using round_robin and pass context"""
print("Middle server: Received request, forwarding to backend")
assert self.backend_client is not None, "Did you call initialize()?"
# Forward request to backend using round_robin with the same context
# This passes the cancellation context through to the backend
stream = await self.backend_client.generate(request, context=context)
# Stream responses back to client
try:
async for response in stream:
data = response.data()
print(f"Middle server: Forwarding response {data}")
yield data
except ValueError as e:
if str(e) != "Stream ended before generation completed":
raise
print("Middle server: Backend stream ended early due to cancellation")
async def main():
"""Start the middle server"""
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, True)
# Create middle server handler
handler = MiddleServer(runtime)
await handler.initialize()
# Create middle server component
component = runtime.namespace("demo").component("middle")
await component.create_service()
endpoint = component.endpoint("generate")
print("Middle server started")
print("Forwarding requests to backend servers...")
# Serve the endpoint - this blocks until shutdown
await endpoint.serve_endpoint(handler.generate)
runtime.shutdown()
if __name__ == "__main__":
asyncio.run(main())
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Simple server demonstration of request cancellation using context.is_stopped()
"""
import asyncio
from dynamo._core import DistributedRuntime
class DemoServer:
"""Simple server that generates numbers and respects cancellation"""
async def generate(self, request, context):
"""Generate numbers 0-999, checking for cancellation before each yield"""
for i in range(1000):
print(f"Server: Processing iteration {i}")
# Check if client requested cancellation
if context.is_stopped():
print(f"Server: Cancelled at iteration {i}")
raise asyncio.CancelledError
await asyncio.sleep(0.1) # Simulate some work
print(f"Server: Sending iteration {i}")
yield i
async def main():
"""Start the demo server"""
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, True)
# Create server component
component = runtime.namespace("demo").component("server")
await component.create_service()
endpoint = component.endpoint("generate")
handler = DemoServer()
print("Demo server started")
print("Waiting for client connections...")
# Serve the endpoint - this blocks until shutdown
await endpoint.serve_endpoint(handler.generate)
runtime.shutdown()
if __name__ == "__main__":
asyncio.run(main())
......@@ -68,13 +68,13 @@ The example demonstrates:
First, start the backend service:
```bash
cd examples/runtime/hello_world
cd examples/custom_backend/hello_world
python hello_world.py
```
Second, in a separate terminal, run the client:
```bash
cd examples/runtime/hello_world
cd examples/custom_backend/hello_world
python client.py
```
......@@ -112,7 +112,7 @@ Then deploy to kubernetes using
```bash
export NAMESPACE=<your-namespace>
cd dynamo
kubectl apply -f examples/runtime/hello_world/deploy/hello_world.yaml -n ${NAMESPACE}
kubectl apply -f examples/custom_backend/hello_world/deploy/hello_world.yaml -n ${NAMESPACE}
```
to delete your deployment:
......
......@@ -42,7 +42,7 @@ spec:
extraPodSpec:
mainContainer:
image: my-registry/dynamo:my-tag
workingDir: /workspace/examples/runtime/hello_world/
workingDir: /workspace/examples/custom_backend/hello_world/
command:
- /bin/sh
- -c
......@@ -81,7 +81,7 @@ spec:
extraPodSpec:
mainContainer:
image: my-registry/dynamo:my-tag
workingDir: /workspace/examples/runtime/hello_world/
workingDir: /workspace/examples/custom_backend/hello_world/
command:
- /bin/sh
- -c
......
......@@ -18,7 +18,7 @@ import logging
import uvloop
from dynamo.runtime import Context, DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
logger = logging.getLogger(__name__)
......@@ -26,13 +26,10 @@ configure_dynamo_logging(service_name="backend")
@dynamo_endpoint(str, str)
async def content_generator(request: str, context: Context):
logger.info(f"Received request: {request} with `id={context.id()}`")
async def content_generator(request: str):
logger.info(f"Received request: {request}")
for word in request.split(","):
await asyncio.sleep(1)
if context.is_stopped() or context.is_killed():
print("request got cancelled.")
return
yield f"Hello {word}!"
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Tests for the cancellation example in examples/custom_backend/cancellation
"""
import asyncio
import os
import subprocess
import pytest
pytestmark = pytest.mark.pre_merge
@pytest.fixture(scope="module")
def example_dir():
"""Path to the cancellation example directory"""
# Get the directory of this test file
test_dir = os.path.dirname(os.path.abspath(__file__))
# Navigate to the cancellation example directory relative to this test
return os.path.normpath(
os.path.join(test_dir, "../../../../../examples/custom_backend/cancellation")
)
@pytest.fixture(scope="function")
async def server_process(example_dir):
"""Start the backend server and clean up after test"""
server_proc = subprocess.Popen(
["python3", "-u", "server.py"],
cwd=example_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
# Wait for server to start
await asyncio.sleep(1)
yield server_proc
# Cleanup
server_proc.terminate()
server_proc.wait(timeout=1)
@pytest.fixture(scope="function")
async def middle_server_process(example_dir, server_process):
"""Start the middle server (depends on backend server) and clean up after test"""
middle_proc = subprocess.Popen(
["python3", "-u", "middle_server.py"],
cwd=example_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
# Wait for middle server to start
await asyncio.sleep(1)
yield middle_proc
# Cleanup
middle_proc.terminate()
middle_proc.wait(timeout=1)
def run_client(example_dir, use_middle=False):
"""Run the client and capture its output"""
cmd = ["python3", "client.py"]
if use_middle:
cmd.append("--middle")
client_proc = subprocess.Popen(
cmd,
cwd=example_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
# Wait for client to complete
stdout, _ = client_proc.communicate(timeout=1)
if client_proc.returncode != 0:
pytest.fail(
f"Client failed with return code {client_proc.returncode}. Output: {stdout}"
)
return stdout
def stop_process(process):
"""Stop a running process and capture its output"""
process.terminate()
stdout, _ = process.communicate(timeout=1)
return stdout
@pytest.mark.asyncio
async def test_direct_connection_cancellation(example_dir, server_process):
"""Test cancellation with direct client-server connection"""
# Run the client (direct connection)
client_output = run_client(example_dir, use_middle=False)
# Wait for server to print cancellation message
await asyncio.sleep(1)
# Capture server output
server_output = stop_process(server_process)
# Assert expected messages
assert (
"Client: Cancelling after 3 responses..." in client_output
), f"Client output: {client_output}"
assert (
"Server: Cancelled at iteration" in server_output
), f"Server output: {server_output}"
@pytest.mark.asyncio
async def test_middle_server_cancellation(
example_dir, server_process, middle_server_process
):
"""Test cancellation with middle server proxy"""
# Run the client (through middle server)
client_output = run_client(example_dir, use_middle=True)
# Wait for server to print cancellation message
await asyncio.sleep(1)
# Capture output from all processes
server_output = stop_process(server_process)
middle_output = stop_process(middle_server_process)
# Assert expected messages
assert (
"Client: Cancelling after 3 responses..." in client_output
), f"Client output: {client_output}"
assert (
"Server: Cancelled at iteration" in server_output
), f"Server output: {server_output}"
assert (
"Middle server: Backend stream ended early due to cancellation" in middle_output
), f"Middle server output: {middle_output}"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Tests for the hello_world example in examples/custom_backend/hello_world
"""
import asyncio
import os
import subprocess
import pytest
pytestmark = pytest.mark.pre_merge
@pytest.fixture(scope="module")
def example_dir():
"""Path to the hello_world example directory"""
# Get the directory of this test file
test_dir = os.path.dirname(os.path.abspath(__file__))
# Navigate to the hello_world example directory relative to this test
return os.path.normpath(
os.path.join(test_dir, "../../../../examples/custom_backend/hello_world")
)
@pytest.fixture(scope="module")
async def server_process(example_dir):
"""Start the hello_world server and clean up after test"""
server_proc = subprocess.Popen(
["python3", "hello_world.py"],
cwd=example_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
# Wait for server to start
await asyncio.sleep(1)
yield server_proc
# Cleanup
server_proc.terminate()
server_proc.wait(timeout=1)
async def run_client(example_dir):
"""Run the client for a specified duration and capture its output"""
client_proc = subprocess.Popen(
["python3", "-u", "client.py"],
cwd=example_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
# Let it run for 5 seconds
await asyncio.sleep(5)
# Terminate the client
client_proc.terminate()
stdout, _ = client_proc.communicate(timeout=1)
return stdout
@pytest.mark.asyncio
async def test_hello_world(example_dir, server_process):
"""Test that hello_world starts and its client produces the expected output sequence"""
# Run the client for 5 seconds
client_output = await run_client(example_dir)
# Split output into lines and filter out empty lines
lines = [line.strip() for line in client_output.split("\n") if line.strip()]
# Each client iteration produces 4 lines in about 4 seconds
# The client ran for 5 seconds so the first iteration is expected to be completed
# Assert the first 4 lines are the expected sequence
assert (
len(lines) >= 4
), f"Expected at least 4 lines, got {len(lines)}. Output: {lines}"
expected_lines = ["Hello world!", "Hello sun!", "Hello moon!", "Hello star!"]
for i, expected_line in enumerate(expected_lines):
assert (
lines[i] == expected_line
), f"Line {i+1}: expected '{expected_line}', got '{lines[i]}'. Full output: {lines}"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment