Unverified Commit a13c4cb6 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

feat: add Rayon compute pool for CPU-intensive operations (#2969)


Signed-off-by: default avatarRyan Olson <rolson@nvidia.com>
parent 7ebbd001
This diff is collapsed.
...@@ -17,6 +17,7 @@ default = [] ...@@ -17,6 +17,7 @@ default = []
integration = [] integration = []
testing-etcd = [] # Tests that require an active ETCD server testing-etcd = [] # Tests that require an active ETCD server
tokio-console = ["dep:console-subscriber", "tokio/tracing"] tokio-console = ["dep:console-subscriber", "tokio/tracing"]
compute-validation = [] # Enable validation and timing for compute macros
[dependencies] [dependencies]
# Use workspace dependencies where available # Use workspace dependencies where available
...@@ -63,11 +64,14 @@ nid = { version = "3.0.0", features = ["serde"] } ...@@ -63,11 +64,14 @@ nid = { version = "3.0.0", features = ["serde"] }
nix = { version = "0.29", features = ["signal"] } nix = { version = "0.29", features = ["signal"] }
nuid = { version = "0.5" } nuid = { version = "0.5" }
once_cell = { version = "1" } once_cell = { version = "1" }
rayon = { version = "1.10" }
regex = { version = "1" } regex = { version = "1" }
socket2 = { version = "0.5.8" } socket2 = { version = "0.5.8" }
tokio-rayon = { version = "2.1" }
[dev-dependencies] [dev-dependencies]
assert_matches = { version = "1.5.0" } assert_matches = { version = "1.5.0" }
criterion = { version = "0.5", features = ["async_tokio"] }
env_logger = { version = "0.11" } env_logger = { version = "0.11" }
reqwest = { workspace = true } reqwest = { workspace = true }
rstest = { version = "0.23.0" } rstest = { version = "0.23.0" }
...@@ -75,3 +79,7 @@ temp-env = { version = "0.3.6" , features=["async_closure"] } ...@@ -75,3 +79,7 @@ temp-env = { version = "0.3.6" , features=["async_closure"] }
stdio-override = {version= "0.2.0"} stdio-override = {version= "0.2.0"}
jsonschema = {version = "0.17"} jsonschema = {version = "0.17"}
tempfile = { workspace = true } tempfile = { workspace = true }
[[bench]]
name = "compute_pool_overhead"
harness = false
\ No newline at end of file
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
use dynamo_runtime::compute::ComputePool;
use std::sync::Arc;
/// CPU-bound workload used by the benchmarks: adds up every prime in `2..=n`.
///
/// Returns `0` when `n < 2` because the range is then empty.
fn compute_primes_sum(n: u64) -> u64 {
    (2..=n).filter(|&candidate| is_prime(candidate)).sum()
}
/// Trial-division primality test.
///
/// After ruling out multiples of 2 and 3, only divisors of the form
/// `6k - 1` and `6k + 1` up to `sqrt(n)` are tried.
fn is_prime(n: u64) -> bool {
    match n {
        0 | 1 => false,
        2 | 3 => true,
        _ if n.is_multiple_of(2) || n.is_multiple_of(3) => false,
        _ => {
            let limit = (n as f64).sqrt() as u64;
            !(5..=limit)
                .step_by(6)
                .any(|k| n.is_multiple_of(k) || n.is_multiple_of(k + 2))
        }
    }
}
/// Criterion group `compute_overhead`.
///
/// For each `n` in `test_sizes` the same `compute_primes_sum(n)` call is timed
/// three ways:
/// * `tokio_direct`   - run inline inside an async block on the 4-worker runtime,
/// * `rayon_offload`  - submitted through `ComputePool::execute` from the 1-worker runtime,
/// * `spawn_blocking` - submitted through `tokio::task::spawn_blocking` on the 4-worker runtime.
///
/// Panics (via `unwrap`) if a runtime or the compute pool cannot be built, or if
/// an offloaded task fails.
fn bench_compute_overhead(c: &mut Criterion) {
// Test 3 representative sizes: small, medium, large
let test_sizes = [10, 1_000, 100_000];
let mut group = c.benchmark_group("compute_overhead");
group.sample_size(10); // Reduce sample size for longer benchmarks
// Setup runtimes
// NOTE(review): both runtimes cap the blocking pool at a single thread
// (`max_blocking_threads(1)`), so every `spawn_blocking` call below shares
// that one thread - confirm this is the intended comparison.
let tokio_4thread = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
let tokio_1thread = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
// Setup compute pool: 4 threads, 2 MiB stacks, no pinning
let compute_config = dynamo_runtime::compute::ComputeConfig {
num_threads: Some(4),
stack_size: Some(2 * 1024 * 1024),
thread_prefix: "bench".to_string(),
pin_threads: false,
};
// Wrapped in an Arc so each benchmark iteration can take its own handle.
let compute_pool = Arc::new(ComputePool::new(compute_config).unwrap());
for n in test_sizes {
// Benchmark 1: Direct execution on Tokio (4 threads)
group.bench_with_input(BenchmarkId::new("tokio_direct", n), &n, |b, &n| {
b.to_async(&tokio_4thread)
.iter(|| async move { black_box(compute_primes_sum(black_box(n))) });
});
// Benchmark 2: Rayon offload (1 Tokio thread + 4 Rayon threads)
let pool = compute_pool.clone();
group.bench_with_input(BenchmarkId::new("rayon_offload", n), &n, |b, &n| {
b.to_async(&tokio_1thread).iter(|| {
// Fresh Arc handle per iteration so the async block can own it.
let pool = pool.clone();
async move {
pool.execute(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
}
});
});
// Benchmark 3: spawn_blocking (4 Tokio threads)
group.bench_with_input(BenchmarkId::new("spawn_blocking", n), &n, |b, &n| {
b.to_async(&tokio_4thread).iter(|| async move {
tokio::task::spawn_blocking(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
});
});
}
group.finish();
}
/// Criterion group `parallel_tasks`.
///
/// For each `batch_size` in `[10, 100]`, one iteration spawns `batch_size`
/// Tokio tasks that each compute `compute_primes_sum(10_000)` and then awaits
/// them all in spawn order. Three variants are compared: computing inline in
/// the spawned task, offloading through `ComputePool::execute`, and offloading
/// through `tokio::task::spawn_blocking`.
///
/// Panics (via `unwrap`) if setup fails or any spawned task fails.
fn bench_parallel_tasks(c: &mut Criterion) {
let mut group = c.benchmark_group("parallel_tasks");
group.sample_size(10); // Small sample size: every iteration runs a whole batch
// NOTE(review): `max_blocking_threads(1)` means the `spawn_blocking_parallel`
// variant funnels the whole batch through one blocking thread while the
// Rayon variant has 4 threads - confirm this asymmetry is intended.
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
let compute_config = dynamo_runtime::compute::ComputeConfig {
num_threads: Some(4),
stack_size: Some(2 * 1024 * 1024),
thread_prefix: "bench".to_string(),
pin_threads: false,
};
let compute_pool = Arc::new(ComputePool::new(compute_config).unwrap());
// Test with different batch sizes
for batch_size in [10, 100] {
let n = 10_000; // Fixed compute size
// Direct parallel execution on Tokio threads
group.bench_with_input(
BenchmarkId::new("tokio_direct_parallel", batch_size),
&batch_size,
|b, &batch_size| {
b.to_async(&tokio_runtime).iter(|| async move {
// Collect first so every task is spawned before any is awaited.
let tasks = (0..batch_size)
.map(|_| tokio::spawn(async move { compute_primes_sum(n) }))
.collect::<Vec<_>>();
for task in tasks {
black_box(task.await.unwrap());
}
});
},
);
// Parallel execution with Rayon
let pool = compute_pool.clone();
group.bench_with_input(
BenchmarkId::new("rayon_parallel", batch_size),
&batch_size,
|b, &batch_size| {
b.to_async(&tokio_runtime).iter(|| {
// One Arc handle per iteration, then one more per spawned task.
let pool = pool.clone();
async move {
let tasks = (0..batch_size)
.map(|_| {
let pool = pool.clone();
tokio::spawn(async move {
pool.execute(move || compute_primes_sum(n)).await.unwrap()
})
})
.collect::<Vec<_>>();
for task in tasks {
black_box(task.await.unwrap());
}
}
});
},
);
// Parallel execution with spawn_blocking
group.bench_with_input(
BenchmarkId::new("spawn_blocking_parallel", batch_size),
&batch_size,
|b, &batch_size| {
b.to_async(&tokio_runtime).iter(|| async move {
let tasks = (0..batch_size)
.map(|_| {
tokio::spawn(async move {
tokio::task::spawn_blocking(move || compute_primes_sum(n))
.await
.unwrap()
})
})
.collect::<Vec<_>>();
for task in tasks {
black_box(task.await.unwrap());
}
});
},
);
}
group.finish();
}
/// Criterion group `block_in_place_overhead`.
///
/// On a single 4-worker Tokio runtime, times `compute_primes_sum(n)` for each
/// `n` in `test_sizes` four ways: inline (`direct`), inside
/// `tokio::task::block_in_place`, via `tokio::task::spawn_blocking`, and via
/// `ComputePool::execute` (`rayon_offload`).
///
/// Panics (via `unwrap`) if setup fails or an offloaded task fails.
fn bench_block_in_place_overhead(c: &mut Criterion) {
// Compare offload mechanisms across small, medium and large task sizes
let test_sizes = [10, 1_000, 100_000];
let mut group = c.benchmark_group("block_in_place_overhead");
group.sample_size(10);
// Setup 4-thread runtime for testing
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
// Setup compute pool for comparison
let compute_config = dynamo_runtime::compute::ComputeConfig {
num_threads: Some(4),
stack_size: Some(2 * 1024 * 1024),
thread_prefix: "bench".to_string(),
pin_threads: false,
};
let compute_pool = Arc::new(ComputePool::new(compute_config).unwrap());
for n in test_sizes {
// Benchmark 1: Direct execution (baseline)
group.bench_with_input(BenchmarkId::new("direct", n), &n, |b, &n| {
b.to_async(&tokio_runtime)
.iter(|| async move { black_box(compute_primes_sum(black_box(n))) });
});
// Benchmark 2: block_in_place (no semaphore)
group.bench_with_input(BenchmarkId::new("block_in_place", n), &n, |b, &n| {
b.to_async(&tokio_runtime).iter(|| async move {
tokio::task::block_in_place(|| black_box(compute_primes_sum(black_box(n))))
});
});
// Benchmark 3: spawn_blocking
group.bench_with_input(BenchmarkId::new("spawn_blocking", n), &n, |b, &n| {
b.to_async(&tokio_runtime).iter(|| async move {
tokio::task::spawn_blocking(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
});
});
// Benchmark 4: Rayon offload
let pool = compute_pool.clone();
group.bench_with_input(BenchmarkId::new("rayon_offload", n), &n, |b, &n| {
b.to_async(&tokio_runtime).iter(|| {
// Fresh Arc handle per iteration so the async block can own it.
let pool = pool.clone();
async move {
pool.execute(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
}
});
});
}
group.finish();
}
// Bundle the three benchmark functions into the `benches` group and hand that
// group to Criterion's entry-point macro.
criterion_group!(
benches,
bench_compute_overhead,
bench_parallel_tasks,
bench_block_in_place_overhead
);
criterion_main!(benches);
# Rayon-Tokio Integration Strategy
## Overview
This document describes the integration strategy for combining Tokio's asynchronous runtime with Rayon's data-parallel compute capabilities in the Dynamo runtime. The core philosophy is simple:
- **Tokio** handles I/O-bound operations, waiting, and coordination
- **Rayon** handles CPU-bound operations and data parallelism
- Multiple async tasks can concurrently submit different types of work to the shared Rayon thread pool
## Architecture
```text
+---------------------------------------------------------------+
| Tokio Runtime |
| +-----------+ +-----------+ +-----------+ |
| | Async | | Async | | Async | |
| | Task 1 | | Task 2 | | Task 3 | |
| | | | | | | |
| | Receives | | Processes | | Handles | |
| | requests | | streams | | batches | |
| +-----+-----+ +-----+-----+ +-----+-----+ |
| | | | |
| +----------------+----------------+ |
| | |
| tokio_rayon::spawn |
| | |
+-------------------------+------------------------------------+
|
v
+---------------------------------------------------------------+
| Rayon Thread Pool |
| |
| +----------------------------------------------------------+ |
| | Work-Stealing Thread Pool (N threads) | |
| | | |
| | +---------+ +-----------+ +------------------+ | |
| | | scope() | | par_iter()| | join() | | |
| | | tasks | | chunks | | computations | | |
| | +---------+ +-----------+ +------------------+ | |
| | | |
| | All patterns share the same thread pool | |
| +----------------------------------------------------------+ |
+---------------------------------------------------------------+
```
## When to Use Tokio vs Rayon
### Use Tokio (async/await) when
- **Waiting for I/O**: Network requests, file I/O, database queries
- **Coordinating tasks**: Channels, synchronization, signaling
- **Stream processing**: Items arrive over time with delays
- **Resource pooling**: Connection pools, async locks
- **Service orchestration**: Managing component lifecycles
### Use Rayon (compute pool) when
- **Batch processing**: You have all data ready for parallel processing
- **CPU-intensive work**: Computation takes >1ms per item
- **Data transformation**: Tokenization, serialization, compression
- **Parallel algorithms**: Matrix operations, sorting, searching
- **Map-reduce patterns**: Aggregations over large datasets
### Decision Thresholds
- Use Rayon when processing **≥10 items** in parallel
- Move CPU work that takes **>1ms** off the Tokio worker threads (via the compute pool or `spawn_blocking`)
- Keep Tokio for operations with **>100μs waits** between items
- Use Rayon when you can **saturate multiple CPU cores**
### Overhead Considerations
Based on benchmarks, the async bridge between Tokio and Rayon has:
- **~25μs overhead** for small tasks (due to channel communication)
- **~4% overhead** for tasks taking >2ms
- **Negligible overhead** for tasks taking >10ms
For minimal overhead when using Rayon from async context:
- **Small tasks (<100μs)**: Run directly on Tokio
- **Medium tasks (100μs-1ms)**: Use `spawn_blocking` + `pool.execute_sync()`
- **Large tasks (>1ms)**: Use `pool.execute()` for convenience
## Concurrent Usage Patterns
The key insight is that multiple async tasks can concurrently use the same Rayon thread pool with different parallelization patterns. Rayon's work-stealing scheduler efficiently distributes work regardless of the pattern used.
### Pattern 1: Concurrent Scope and ParIter
```rust,ignore
use std::sync::Arc;
use dynamo_runtime::compute::ComputePool;
async fn concurrent_compute_tasks(pool: Arc<ComputePool>) {
// Task 1: Using scope for dynamic task spawning
let task1 = tokio::spawn({
let pool = pool.clone();
async move {
pool.execute_scoped(|scope| {
// Dynamically spawn tasks based on runtime conditions
for i in 0..num_tasks {
scope.spawn(move |_| {
expensive_computation(i)
});
}
}).await
}
});
// Task 2: Using parallel iterators for batch processing
let task2 = tokio::spawn({
let pool = pool.clone();
async move {
pool.install(|| {
// Process data in parallel chunks
data.par_chunks(100)
.map(|chunk| transform_chunk(chunk))
.collect::<Vec<_>>()
}).await
}
});
// Task 3: Using join for binary parallelism
let task3 = tokio::spawn({
let pool = pool.clone();
async move {
pool.join(
|| compute_left_branch(),
|| compute_right_branch(),
).await
}
});
// All three tasks run concurrently, sharing the Rayon thread pool
let (r1, r2, r3) = tokio::join!(task1, task2, task3);
}
```
### Pattern 2: Stream Processing with Batch Compute
```rust,no_run
# use futures::StreamExt;
# use rayon::prelude::*;
# use std::sync::Arc;
# struct Data;
# fn process_item(_: &Data) -> i32 { 0 }
# async fn send_results(_: Vec<i32>) {}
# use dynamo_runtime::compute::ComputePool;
# use futures::stream::Stream;
/// Example: Process async stream with CPU-intensive batch operations
async fn stream_with_compute(
pool: Arc<ComputePool>,
stream: impl Stream<Item = Vec<Data>>,
) {
// Use for_each_concurrent for proper stream consumption
stream.for_each_concurrent(4, |batch| {
let pool = pool.clone();
async move {
// Process batch using parallel iterators
let result = pool.install(move || {
batch.par_iter()
.map(|item| process_item(item))
.collect::<Vec<_>>()
}).await.unwrap();
// Async I/O to send results
send_results(result).await;
}
}).await;
}
```
### Pattern 3: Mixed Workload Service
```rust,ignore
/// Real-world example: LLM service with mixed workloads
struct LLMService {
runtime: Arc<Runtime>,
tokenizer: Arc<Tokenizer>,
}
impl LLMService {
async fn run(&self) {
let pool = self.runtime.compute_pool()
.expect("Compute pool required");
// Tokenization service - uses parallel iterators
let tokenization_task = {
let pool = pool.clone();
let tokenizer = self.tokenizer.clone();
tokio::spawn(async move {
loop {
// Async I/O: receive batch from network
let texts = receive_tokenization_batch().await;
// CPU-bound: parallel tokenization
let tokens = pool.install(move || {
texts.par_iter()
.map(|text| tokenizer.encode(text))
.collect::<Vec<_>>()
}).await.unwrap();
// Async I/O: send results
send_tokens(tokens).await;
}
})
};
// Embedding service - uses scope for multi-stage computation
let embedding_task = {
let pool = pool.clone();
tokio::spawn(async move {
loop {
// Async I/O: receive request
let request = receive_embedding_request().await;
// CPU-bound: multi-stage parallel computation
let embeddings = pool.execute_scoped(|scope| {
let mut text_emb = None;
let mut context_emb = None;
scope.spawn(|_| {
text_emb = Some(compute_text_embedding(&request.text));
});
scope.spawn(|_| {
context_emb = Some(compute_context_embedding(&request.context));
});
// Scope waits for both to complete
combine_embeddings(text_emb.unwrap(), context_emb.unwrap())
}).await.unwrap();
// Async I/O: send results
send_embeddings(embeddings).await;
}
})
};
// Batch inference service - uses nested parallelism
let inference_task = {
let pool = pool.clone();
tokio::spawn(async move {
loop {
let batch = receive_inference_batch().await;
let results = pool.execute_scoped(|scope| {
let mut results = Vec::with_capacity(batch.len());
// Spawn a task for each item
for item in batch {
scope.spawn(move |s2| {
// Within each task, use parallel iterators
let preprocessed = item.data
.par_chunks(10)
.map(|chunk| preprocess(chunk))
.collect::<Vec<_>>();
// Can spawn more tasks within nested scope
let mut stages = vec![];
for p in preprocessed {
s2.spawn(move |_| {
stages.push(run_inference(p));
});
}
results.push(merge_stages(stages));
});
}
results
}).await.unwrap();
send_inference_results(results).await;
}
})
};
// All services run concurrently, sharing the compute pool
tokio::join!(tokenization_task, embedding_task, inference_task);
}
}
```
## How It Works: Thread Pool Sharing
Rayon's work-stealing scheduler ensures efficient resource utilization even when different async tasks submit different types of work:
1. **Work Queues**: Each Rayon thread has a local deque (double-ended queue)
2. **Local Execution**: Threads prefer executing their own tasks (LIFO for cache locality)
3. **Work Stealing**: Idle threads steal tasks from busy threads (FIFO from the other end)
4. **No Interference**: Different parallelization patterns (scope, par_iter) coexist peacefully
This means:
- A `scope` task spawning many small tasks works alongside `par_chunks` processing large batches
- The thread pool automatically balances load between different types of work
- No manual coordination needed between different async tasks using the pool
## Performance Considerations
### Thread Pool Sizing
```toml
[runtime]
# Tokio threads: optimize for concurrent async tasks
num_worker_threads = 8 # Usually number of cores
# Rayon threads: optimize for CPU saturation
compute_threads = 4 # Often cores/2 to avoid oversubscription
```
### Avoiding Oversubscription
Total threads = Tokio workers + Rayon threads + System threads
**Recommendation**: Keep total ≤ 1.5 × physical cores
### Monitoring Pool Utilization
```rust,ignore
// Check pool metrics
let metrics = pool.metrics();
println!("Active tasks: {}", metrics.tasks_active());
println!("Average duration: {:.2}ms", metrics.avg_task_duration_us() / 1000.0);
println!("Slow tasks (>100ms): {}", metrics.slow_tasks());
// Adjust pool size if consistently over/under utilized
if metrics.tasks_active() > pool.num_threads() * 2 {
// Consider increasing compute_threads
}
```
## Common Patterns and Best Practices
### DO: Batch Collection Before Processing
```rust,ignore
// ✅ Good: Collect async items, then process in parallel
let items = stream.take(100).collect::<Vec<_>>().await;
let processed = pool.install(|| {
items.par_iter().map(|item| process(item)).collect()
}).await?;
```
### DON'T: Mix Async and Compute in Tight Loops
```rust,ignore
// ❌ Bad: Alternating between async and compute
for item in items {
let data = fetch_data(item).await; // Async
let result = pool.execute(|| compute(data)).await?; // Compute
store_result(result).await; // Async
}
// ✅ Good: Batch operations
let all_data = futures::future::join_all(
items.iter().map(|item| fetch_data(item))
).await;
let all_results = pool.install(|| {
all_data.par_iter().map(|data| compute(data)).collect()
}).await?;
futures::future::join_all(
all_results.iter().map(|result| store_result(result))
).await;
```
### DO: Use Scope for Dynamic Parallelism
```rust,ignore
// ✅ Good: When you don't know the parallelism level upfront
pool.execute_scoped(|scope| {
while let Some(work) = find_more_work() {
scope.spawn(move |_| {
process_work(work);
});
}
}).await?;
```
### DO: Use ParIter for Data Parallelism
```rust,ignore
// ✅ Good: When processing collections
pool.install(|| {
data.par_chunks(optimal_chunk_size())
.map(|chunk| process_chunk(chunk))
.reduce(|| initial_value(), |a, b| combine(a, b))
}).await?;
```
## Troubleshooting
### Issue: High Latency Despite Low CPU Usage
**Cause**: Too few Rayon threads for the workload
**Solution**: Increase `compute_threads` configuration
### Issue: System Feels Sluggish
**Cause**: Thread oversubscription
**Solution**: Reduce total thread count (Tokio + Rayon)
### Issue: Uneven Work Distribution
**Cause**: Poor chunk size selection
**Solution**: Use smaller chunks or dynamic scheduling with `scope`
### Issue: Deadlock or Hanging
**Cause**: Nested `install()` calls or blocking in Rayon threads
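**Solution**: Use `execute()` instead of `install()` for simple tasks, and keep blocking calls (locks, I/O, waiting on async work) out of closures that run on Rayon threads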
## Configuration Examples
### High-Throughput Service
```toml
# Many concurrent requests, moderate compute per request
[runtime]
num_worker_threads = 16
compute_threads = 8
compute_stack_size = "4MB"
```
### Batch Processing System
```toml
# Few concurrent tasks, heavy compute per batch
[runtime]
num_worker_threads = 4
compute_threads = 12
compute_stack_size = "8MB"
```
### Mixed Workload
```toml
# Balance between async I/O and compute
[runtime]
num_worker_threads = 8
compute_threads = 6
compute_stack_size = "2MB"
```
## Summary
The Rayon-Tokio integration provides a powerful model for handling mixed workloads:
1. **Tokio** manages async I/O and coordination
2. **Rayon** provides a shared compute thread pool
3. Multiple async tasks can concurrently use different Rayon patterns
4. Work-stealing ensures efficient resource utilization
5. Clear separation between I/O-bound and CPU-bound work
This architecture enables building high-performance services that efficiently handle both network I/O and CPU-intensive computations without manual thread management or complex synchronization.
\ No newline at end of file
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use dynamo_runtime::compute::{ComputeConfig, ComputePool};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::time::sleep;
/// Deliberately expensive workload for the demo: the sum of all primes in `2..=n`.
///
/// An empty range (`n < 2`) yields `0`.
fn compute_primes_sum(n: u64) -> u64 {
    (2..=n).fold(0u64, |total, candidate| {
        if is_prime(candidate) {
            total + candidate
        } else {
            total
        }
    })
}
/// Returns `true` when `n` is prime.
///
/// Handles `n <= 3` and multiples of 2 or 3 up front, then checks the
/// candidates `k` and `k + 2` for `k = 5, 11, 17, ...` up to `sqrt(n)`.
fn is_prime(n: u64) -> bool {
    if n < 2 {
        return false;
    }
    if n < 4 {
        return true;
    }
    if n.is_multiple_of(2) || n.is_multiple_of(3) {
        return false;
    }

    let limit = (n as f64).sqrt() as u64;
    (5..=limit)
        .step_by(6)
        .all(|k| !n.is_multiple_of(k) && !n.is_multiple_of(k + 2))
}
/// Simulated async task that does both I/O and compute on the async worker.
///
/// Sleeps for `io_delay`, runs `compute_primes_sum(n)` inline (which blocks
/// the runtime thread polling this task), sleeps for `io_delay` again, and
/// returns `(id, elapsed)` where `elapsed` covers the whole sequence.
async fn async_task_inline(id: usize, n: u64, io_delay: Duration) -> (usize, Duration) {
    let start = Instant::now();

    // Simulate async I/O operation
    sleep(io_delay).await;

    // CPU-intensive work (blocks the async runtime!).
    // The sum itself is never used, so an optimized build would be free to
    // drop the computation; `black_box` keeps the work the demo is meant to
    // exhibit from being elided.
    std::hint::black_box(compute_primes_sum(std::hint::black_box(n)));

    // More async I/O
    sleep(io_delay).await;

    (id, start.elapsed())
}
/// Same I/O → compute → I/O sequence as the inline task, except the compute
/// step is submitted to `pool` via `ComputePool::execute` and awaited.
///
/// Returns `(id, elapsed)`; panics if the offloaded job reports an error.
async fn async_task_rayon(
    id: usize,
    n: u64,
    io_delay: Duration,
    pool: Arc<ComputePool>,
) -> (usize, Duration) {
    let began = Instant::now();

    // First simulated I/O wait.
    sleep(io_delay).await;

    // Hand the CPU-heavy part to the compute pool and wait for its result.
    let offloaded = pool.execute(move || compute_primes_sum(n));
    let _sum = offloaded.await.unwrap();

    // Second simulated I/O wait.
    sleep(io_delay).await;

    let elapsed = began.elapsed();
    (id, elapsed)
}
/// I/O → compute → I/O sequence where the compute step runs through
/// `tokio::task::spawn_blocking`.
///
/// Returns `(id, elapsed)`; panics if the blocking task fails to join.
async fn async_task_spawn_blocking(id: usize, n: u64, io_delay: Duration) -> (usize, Duration) {
    let began = Instant::now();

    // First simulated I/O wait.
    sleep(io_delay).await;

    // Run the CPU-heavy part on Tokio's blocking pool.
    let blocking_job = tokio::task::spawn_blocking(move || compute_primes_sum(n));
    let _sum = blocking_job.await.unwrap();

    // Second simulated I/O wait.
    sleep(io_delay).await;

    let elapsed = began.elapsed();
    (id, elapsed)
}
/// Spawns `num_tasks` Tokio tasks that each run one simulated request and
/// collects their latencies.
///
/// `mode` selects the task flavour: `"inline"`, `"rayon"` (requires `pool` to
/// be `Some`) or `"spawn_blocking"`; any other value panics inside the spawned
/// task. A `.` is printed for every tenth completed task.
///
/// Returns the wall-clock time for the whole batch together with the
/// per-task latencies in spawn order.
async fn run_throughput_test(
    name: &str,
    num_tasks: usize,
    n: u64,
    io_delay: Duration,
    pool: Option<Arc<ComputePool>>,
    mode: &str,
) -> (Duration, Vec<Duration>) {
    println!("\n Running: {} (n={}, tasks={})", name, n, num_tasks);

    let finished = Arc::new(AtomicU64::new(0));
    let wall_clock = Instant::now();

    // Spawn everything up front so the tasks run concurrently.
    let mut handles = Vec::with_capacity(num_tasks);
    for id in 0..num_tasks {
        let pool = pool.clone();
        let finished = Arc::clone(&finished);
        let mode = mode.to_string();

        handles.push(tokio::spawn(async move {
            let outcome = match mode.as_str() {
                "inline" => async_task_inline(id, n, io_delay).await,
                "rayon" => async_task_rayon(id, n, io_delay, pool.unwrap()).await,
                "spawn_blocking" => async_task_spawn_blocking(id, n, io_delay).await,
                _ => panic!("Unknown mode"),
            };

            // Progress indicator: one dot per ten completions.
            let done = finished.fetch_add(1, Ordering::Relaxed) + 1;
            if done.is_multiple_of(10) {
                use std::io::{self, Write};
                print!(".");
                io::stdout().flush().unwrap();
            }

            outcome
        }));
    }

    // Join in spawn order, keeping only the latency of each task.
    let mut latencies = Vec::new();
    for handle in handles {
        let (_, latency) = handle.await.unwrap();
        latencies.push(latency);
    }

    let total_time = wall_clock.elapsed();
    println!(" Done in {:.2}s", total_time.as_secs_f64());

    (total_time, latencies)
}
/// Sorts `latencies` in place and returns the `(p50, p95, p99)` samples.
///
/// The percentile for `p` is the element at index `len * p / 100` of the
/// sorted slice (`len / 2` for the median), which is always in bounds for a
/// non-empty slice.
///
/// An empty slice has no samples to report, so `Duration::ZERO` is returned
/// for all three values instead of panicking on an out-of-bounds index.
fn calculate_percentiles(latencies: &mut [Duration]) -> (Duration, Duration, Duration) {
    if latencies.is_empty() {
        return (Duration::ZERO, Duration::ZERO, Duration::ZERO);
    }

    // Equal durations are indistinguishable, so stability is not needed.
    latencies.sort_unstable();

    let len = latencies.len();
    let p50 = latencies[len / 2];
    let p95 = latencies[len * 95 / 100];
    let p99 = latencies[len * 99 / 100];
    (p50, p95, p99)
}
/// Prints total time, throughput and p50/p95/p99 latency for one test run.
///
/// `latencies` is sorted in place as a side effect of computing the
/// percentiles. `_name` is accepted but not printed.
fn print_results(_name: &str, total: Duration, latencies: &mut [Duration]) {
    let to_millis = |d: Duration| d.as_secs_f64() * 1000.0;

    let (p50, p95, p99) = calculate_percentiles(latencies);
    let total_secs = total.as_secs_f64();
    let throughput = latencies.len() as f64 / total_secs;

    println!(" Total time: {:.2}s", total_secs);
    println!(" Throughput: {:.1} tasks/s", throughput);
    println!(" Latency p50: {:.2}ms", to_millis(p50));
    println!(" Latency p95: {:.2}ms", to_millis(p95));
    println!(" Latency p99: {:.2}ms", to_millis(p99));
}
/// Entry point of the throughput demo.
///
/// Builds a 4-thread compute pool, then for each compute size `n` in
/// `[10, 1_000, 100_000]` runs 100 concurrent simulated requests (10ms sleep,
/// compute, 10ms sleep) in three modes - inline, Rayon offload and
/// `spawn_blocking` - printing per-mode statistics and the total-time ratios
/// against the inline run. Finishes by printing the pool metrics and a fixed
/// conclusion text.
///
/// Returns an error if the compute pool cannot be created.
#[tokio::main]
async fn main() -> Result<()> {
println!("Async Throughput Demonstration");
println!("==================================\n");
println!("This demo shows how compute-intensive work affects async task throughput.\n");
// Create compute pool directly
let compute_config = ComputeConfig {
num_threads: Some(4),
stack_size: Some(2 * 1024 * 1024),
thread_prefix: "demo".to_string(),
pin_threads: false,
};
let pool = Arc::new(ComputePool::new(compute_config)?);
println!("Configuration:");
println!(" Rayon threads: {}", pool.num_threads());
// Test parameters
let num_tasks = 100;
let io_delay = Duration::from_millis(10);
println!("\nTest: {} concurrent async tasks", num_tasks);
println!("Each task: 10ms I/O → compute → 10ms I/O");
println!("Expected minimum time: ~20ms (if no blocking)");
// Test with different compute loads
for n in [10, 1_000, 100_000] {
println!("\n{:=<60}", "");
println!("Compute load: n={} (prime sum)", n);
println!("{:=<60}", "");
// Measure compute time alone (single run on the current thread)
// NOTE(review): the result is discarded, so an optimized build may shorten
// or drop this computation - consider `std::hint::black_box` here.
let compute_start = Instant::now();
let _ = compute_primes_sum(n);
let compute_time = compute_start.elapsed();
println!(
"Pure compute time: {:.2}ms",
compute_time.as_secs_f64() * 1000.0
);
// Test 1: Inline execution (blocks async runtime)
let (total1, mut latencies1) = run_throughput_test(
"Inline (blocks runtime)",
num_tasks,
n,
io_delay,
None,
"inline",
)
.await;
print_results("Inline", total1, &mut latencies1);
// Test 2: Rayon offload
let (total2, mut latencies2) = run_throughput_test(
"Rayon offload",
num_tasks,
n,
io_delay,
Some(pool.clone()),
"rayon",
)
.await;
print_results("Rayon", total2, &mut latencies2);
// Test 3: spawn_blocking
let (total3, mut latencies3) = run_throughput_test(
"spawn_blocking",
num_tasks,
n,
io_delay,
None,
"spawn_blocking",
)
.await;
print_results("spawn_blocking", total3, &mut latencies3);
// Analysis: ratios of total wall-clock time, inline run as the numerator
println!("\n Impact Analysis:");
let speedup_rayon = total1.as_secs_f64() / total2.as_secs_f64();
let speedup_spawn = total1.as_secs_f64() / total3.as_secs_f64();
println!(
" Rayon vs Inline: {:.2}x throughput",
speedup_rayon
);
println!(
" spawn_blocking vs Inline: {:.2}x throughput",
speedup_spawn
);
// Only comment on blocking impact once the compute step exceeds 1ms
if compute_time.as_millis() > 1 {
let blocking_factor = compute_time.as_secs_f64() / io_delay.as_secs_f64();
println!(
"\n Compute time ({:.1}ms) is {:.1}x the I/O time",
compute_time.as_secs_f64() * 1000.0,
blocking_factor
);
println!(" This severely impacts async concurrency when run inline!");
}
}
// Show pool metrics
println!("\n Compute Pool Metrics:");
println!("========================");
println!("{}", pool.metrics());
// The conclusion below is fixed text; it is not derived from the measurements.
println!("\n Conclusion:");
println!("==============");
println!("• Small compute (n=10): Overhead may not justify offloading");
println!("• Medium compute (n=1000): Offloading preserves async throughput");
println!("• Large compute (n=100000): Offloading is essential for responsiveness");
println!("\nKey insight: Even small amounts of blocking compute can destroy");
println!("async throughput when you have many concurrent tasks!");
Ok(())
}
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use dynamo_runtime::compute::{ComputeConfig, ComputePool};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Workload whose cost grows with `n`: totals every prime in `2..=n`.
///
/// Non-primes contribute `0`, so `n < 2` gives `0`.
fn compute_primes_sum(n: u64) -> u64 {
    (2..=n)
        .map(|candidate| if is_prime(candidate) { candidate } else { 0 })
        .sum()
}
/// Primality check by trial division.
///
/// Values below 2 are not prime; 2 and 3 are. Everything else is tested
/// against 2, 3, and then the pairs `(k, k + 2)` for `k = 5, 11, 17, ...`
/// while `k` does not exceed `sqrt(n)`.
fn is_prime(n: u64) -> bool {
    if n < 2 {
        return false;
    }
    if n < 4 {
        return true;
    }
    if n.is_multiple_of(2) {
        return false;
    }
    if n.is_multiple_of(3) {
        return false;
    }

    let limit = (n as f64).sqrt() as u64;
    let mut k = 5u64;
    while k <= limit {
        if n.is_multiple_of(k) || n.is_multiple_of(k + 2) {
            return false;
        }
        k += 6;
    }
    true
}
/// Times one inline call to `compute_primes_sum(n)` on the calling task.
///
/// This is the baseline the offloaded variants are compared against, so the
/// work must really happen: the input and the discarded result are passed
/// through `std::hint::black_box` to stop an optimized build from eliding a
/// computation whose value is never used.
async fn measure_direct(n: u64) -> Duration {
    let start = Instant::now();
    std::hint::black_box(compute_primes_sum(std::hint::black_box(n)));
    start.elapsed()
}
/// Times one `compute_primes_sum(n)` call submitted through
/// `ComputePool::execute`, including the wait for its result.
///
/// Panics if the pool reports an error for the job.
async fn measure_rayon(pool: &ComputePool, n: u64) -> Duration {
    let timer = Instant::now();
    let job = pool.execute(move || compute_primes_sum(n));
    let _sum = job.await.unwrap();
    timer.elapsed()
}
/// Times one `compute_primes_sum(n)` call run through
/// `tokio::task::spawn_blocking`, including the wait for the join handle.
///
/// Panics if the blocking task fails to join.
async fn measure_spawn_blocking(n: u64) -> Duration {
    let timer = Instant::now();
    let job = tokio::task::spawn_blocking(move || compute_primes_sum(n));
    let _sum = job.await.unwrap();
    timer.elapsed()
}
/// Renders a duration using the largest unit in which it is at least 1:
/// seconds, milliseconds or microseconds with two decimals, otherwise whole
/// nanoseconds.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs_f64();

    if d >= Duration::from_secs(1) {
        return format!("{:.2}s", secs);
    }
    if d >= Duration::from_millis(1) {
        return format!("{:.2}ms", secs * 1000.0);
    }
    if d >= Duration::from_micros(1) {
        return format!("{:.2}μs", secs * 1_000_000.0);
    }
    format!("{}ns", d.as_nanos())
}
/// Prints the results-table header: a 120-character `=` rule, the seven
/// right-aligned column titles, and a 120-character `-` rule.
fn print_table_header() {
    let (size_col, direct_col, rayon_col, spawn_col) = ("n", "Direct", "Rayon", "spawn_blocking");
    let (rayon_ratio_col, spawn_ratio_col, winner_col) = ("Rayon Ratio", "Spawn Ratio", "Winner");

    println!("\n{:=<120}", "");
    println!(
        "{:>10} | {:>15} | {:>15} | {:>15} | {:>12} | {:>12} | {:>20}",
        size_col, direct_col, rayon_col, spawn_col, rayon_ratio_col, spawn_ratio_col, winner_col
    );
    println!("{:-<120}", "");
}
/// Prints one table row for problem size `n`.
///
/// The two ratio columns are each offload time divided by `direct`. The
/// winner column names an offload strategy only when its ratio is below 1.0
/// and strictly smaller than the other strategy's ratio; otherwise it reads
/// "Direct". When `highlight_crossover` is set the row is wrapped in
/// `>>> ... <<<`.
fn print_row(
    n: u64,
    direct: Duration,
    rayon: Duration,
    spawn_blocking: Duration,
    highlight_crossover: bool,
) {
    let baseline = direct.as_secs_f64();
    let rayon_ratio = rayon.as_secs_f64() / baseline;
    let spawn_ratio = spawn_blocking.as_secs_f64() / baseline;

    let rayon_wins = rayon_ratio < 1.0 && rayon_ratio < spawn_ratio;
    let spawn_wins = spawn_ratio < 1.0 && spawn_ratio < rayon_ratio;
    let winner = match (rayon_wins, spawn_wins) {
        (true, _) => "Rayon ✓",
        (false, true) => "spawn_blocking",
        (false, false) => "Direct",
    };

    let row = format!(
        "{:>10} | {:>15} | {:>15} | {:>15} | {:>12.2}x | {:>12.2}x | {:>20}",
        n,
        format_duration(direct),
        format_duration(rayon),
        format_duration(spawn_blocking),
        rayon_ratio,
        spawn_ratio,
        winner
    );

    if !highlight_crossover {
        println!("{}", row);
        return;
    }
    println!(">>> {} <<<", row);
}
/// Entry point of the overhead demonstration.
///
/// Builds a four-thread `ComputePool`, warms up the three execution paths
/// (direct, Rayon offload, `spawn_blocking`), then sweeps the workload size
/// `n` from 10 upward (stopping once it exceeds 1,000,000). For every size the
/// best of three runs per path is printed as a table row, followed by a short
/// analysis and the pool metrics.
///
/// # Errors
///
/// Returns an error if the compute pool cannot be created.
#[tokio::main]
async fn main() -> Result<()> {
    println!("🔬 Compute Pool Overhead Demonstration");
    println!("=====================================\n");
    // Create compute pool directly
    let compute_config = ComputeConfig {
        num_threads: Some(4),
        stack_size: Some(2 * 1024 * 1024),
        thread_prefix: "demo".to_string(),
        pin_threads: false,
    };
    let pool = Arc::new(ComputePool::new(compute_config)?);
    println!("Configuration:");
    println!(" Rayon threads: {}", pool.num_threads());
    println!();
    // Warm up all execution paths
    println!("Warming up...");
    for _ in 0..5 {
        let _ = measure_direct(100).await;
        let _ = measure_rayon(&pool, 100).await;
        let _ = measure_spawn_blocking(100).await;
    }
    print_table_header();
    // Dynamic scanning with exponential growth
    let mut n = 10u64;
    let mut results = Vec::new();
    let mut found_crossover = false;
    let mut last_rayon_ratio = f64::MAX;
    while n <= 1_000_000 {
        // Measure each approach multiple times and take the minimum
        let mut direct_times = Vec::with_capacity(3);
        for _ in 0..3 {
            direct_times.push(measure_direct(n).await);
        }
        let direct = direct_times
            .into_iter()
            .min()
            .expect("three direct samples were just collected");
        let mut rayon_times = Vec::with_capacity(3);
        for _ in 0..3 {
            rayon_times.push(measure_rayon(&pool, n).await);
        }
        let rayon = rayon_times
            .into_iter()
            .min()
            .expect("three rayon samples were just collected");
        let mut spawn_times = Vec::with_capacity(3);
        for _ in 0..3 {
            spawn_times.push(measure_spawn_blocking(n).await);
        }
        let spawn_blocking = spawn_times
            .into_iter()
            .min()
            .expect("three spawn_blocking samples were just collected");
        let rayon_ratio = rayon.as_secs_f64() / direct.as_secs_f64();
        // Detect crossover point: the first row where Rayon beats direct
        // execution after a row where it did not.
        let is_crossover = !found_crossover && rayon_ratio < 1.0 && last_rayon_ratio >= 1.0;
        if is_crossover {
            found_crossover = true;
        }
        print_row(n, direct, rayon, spawn_blocking, is_crossover);
        results.push((n, direct, rayon, spawn_blocking));
        last_rayon_ratio = rayon_ratio;
        // Adaptive step size
        if n < 100 {
            n = (n as f64 * 2.0) as u64;
        } else if n < 10_000 {
            n = (n as f64 * 3.16) as u64; // ~10x every 2 steps
        } else {
            n *= 10;
        }
    }
    println!("{:=<120}", "");
    // Analysis
    println!("\n Analysis:");
    println!("============\n");
    if found_crossover {
        let crossover_point = results
            .iter()
            .find(|(_, d, r, _)| r.as_secs_f64() < d.as_secs_f64())
            .map(|(n, _, _, _)| *n);
        if let Some(n) = crossover_point {
            println!("✓ Rayon becomes beneficial at n ≈ {}", n);
            println!(
                " Below n={}: Overhead dominates, direct execution is faster",
                n
            );
            println!(
                " Above n={}: Compute dominates, Rayon offload is faster",
                n
            );
        }
    } else {
        println!("✗ No crossover found in tested range");
        println!(" Direct execution was always faster (overhead too high)");
    }
    // Find where spawn_blocking becomes beneficial
    let spawn_crossover = results
        .iter()
        .find(|(_, d, _, s)| s.as_secs_f64() < d.as_secs_f64())
        .map(|(n, _, _, _)| *n);
    if let Some(n) = spawn_crossover {
        println!("\n✓ spawn_blocking becomes beneficial at n ≈ {}", n);
    }
    // Render `candidate - baseline` with an explicit sign. Feeding a negative
    // difference to `Duration::from_secs_f64` panics, which would happen
    // whenever an offloaded run is faster than the direct run at the smallest n.
    let signed_delta = |candidate: Duration, baseline: Duration| -> String {
        if candidate >= baseline {
            format!("+{}", format_duration(candidate - baseline))
        } else {
            format!("-{}", format_duration(baseline - candidate))
        }
    };
    // Show overhead at minimum
    if let Some((n, direct, rayon, spawn)) = results.first() {
        println!("\nOverhead at n={}:", n);
        println!(" Rayon: {}", signed_delta(*rayon, *direct));
        println!(" spawn_blocking: {}", signed_delta(*spawn, *direct));
    }
    // Show benefit at maximum
    if let Some((n, direct, rayon, spawn)) = results.last() {
        let rayon_speedup = direct.as_secs_f64() / rayon.as_secs_f64();
        let spawn_speedup = direct.as_secs_f64() / spawn.as_secs_f64();
        println!("\nSpeedup at n={}:", n);
        println!(" Rayon: {:.2}x faster", rayon_speedup);
        println!(" spawn_blocking: {:.2}x faster", spawn_speedup);
    }
    // Print pool metrics
    println!("\n Compute Pool Metrics:");
    println!("========================");
    println!("{}", pool.metrics());
    Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Example demonstrating the use of ComputePool for CPU-intensive operations
//!
//! This example shows various patterns for using Rayon with Tokio:
//! - Fork-join with scope
//! - Parallel batch processing
//! - Dynamic task spawning
//! - Integration with async services
use anyhow::Result;
use dynamo_runtime::{
Worker,
compute::{ComputePool, ComputePoolExt},
};
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Simulate expensive CPU-bound computation.
///
/// Returns the sum of every prime `p` with `2 <= p < input`, using the
/// deliberately naive `is_prime` check so the call burns CPU time.
fn expensive_computation(input: u64) -> u64 {
    (2..input).filter(|&candidate| is_prime(candidate)).sum()
}
/// Trial-division primality test.
///
/// Returns `false` for 0 and 1; otherwise checks every divisor from 2 up to
/// and including the truncated square root of `n`.
fn is_prime(n: u64) -> bool {
    if n < 2 {
        return false;
    }
    // Exclusive upper bound: floor(sqrt(n)) + 1.
    let limit = (n as f64).sqrt() as u64 + 1;
    !(2..limit).any(|divisor| n.is_multiple_of(divisor))
}
/// Example 1: Simple fork-join pattern
///
/// Hands two independent computations to `pool.join` and prints both results
/// together with the elapsed wall-clock time.
async fn example_fork_join(pool: &ComputePool) -> Result<()> {
    println!("\n=== Example 1: Fork-Join Pattern ===");
    let started = Instant::now();
    // The two halves of the fork; neither captures any state.
    let left = || expensive_computation(10000);
    let right = || expensive_computation(20000);
    let (first_sum, second_sum) = pool.join(left, right).await?;
    println!("Fork-join results: {} and {}", first_sum, second_sum);
    println!("Time: {:?}", started.elapsed());
    Ok(())
}
/// Example 2: Scope-based parallel execution
async fn example_scope(pool: &ComputePool) -> Result<()> {
println!("\n=== Example 2: Scope-based Execution ===");
let data = [1000, 2000, 3000, 4000, 5000];
let start = Instant::now();
let results = pool
.execute_scoped(move |scope| {
let results = Arc::new(Mutex::new(vec![0u64; data.len()]));
for (i, &value) in data.iter().enumerate() {
let results = results.clone();
scope.spawn(move |_| {
let result = expensive_computation(value);
let mut r = results.lock().unwrap();
r[i] = result;
});
}
Arc::try_unwrap(results).unwrap().into_inner().unwrap()
})
.await?;
println!("Scope results: {:?}", results);
println!("Time: {:?}", start.elapsed());
Ok(())
}
/// Example 3: Parallel map using extension trait
///
/// Maps `expensive_computation` over ten inputs (1000, 2000, ..., 10000) via
/// `parallel_map`, then repeats the same work sequentially on the calling
/// thread so the two timings can be compared.
async fn example_parallel_map(pool: &ComputePool) -> Result<()> {
    println!("\n=== Example 3: Parallel Map ===");
    let items: Vec<u64> = (1000..=10_000).step_by(1000).collect();
    let parallel_timer = Instant::now();
    // `items` is needed again below, so the pool receives its own copy.
    let mapped = pool
        .parallel_map(items.clone(), expensive_computation)
        .await?;
    println!("Parallel map processed {} items", mapped.len());
    println!("Time: {:?}", parallel_timer.elapsed());
    // Compare with sequential processing
    let sequential_timer = Instant::now();
    let _sequential: Vec<_> = items.iter().copied().map(expensive_computation).collect();
    println!("Sequential time: {:?}", sequential_timer.elapsed());
    Ok(())
}
/// Example 4: Simulating tokenization workload
async fn example_tokenization(pool: &ComputePool) -> Result<()> {
println!("\n=== Example 4: Batch Tokenization Simulation ===");
// Simulate batch of texts to tokenize
let texts: Vec<String> = (0..100)
.map(|i| {
format!(
"This is sample text number {} that needs to be tokenized",
i
)
})
.collect();
let start = Instant::now();
let texts_len = texts.len();
// Process in parallel using scope
let token_counts = pool
.execute_scoped(move |scope| {
let counts = Arc::new(Mutex::new(vec![0usize; texts_len]));
for (i, text) in texts.iter().enumerate() {
let text = text.clone();
let counts = counts.clone();
scope.spawn(move |_| {
// Simulate tokenization by counting words
let count = text.split_whitespace().count();
// Simulate more work
std::thread::sleep(std::time::Duration::from_micros(100));
let mut c = counts.lock().unwrap();
c[i] = count;
});
}
Arc::try_unwrap(counts).unwrap().into_inner().unwrap()
})
.await?;
let total_tokens: usize = token_counts.iter().sum();
println!(
"Tokenized {} texts, total tokens: {}",
texts_len, total_tokens
);
println!("Time: {:?}", start.elapsed());
Ok(())
}
/// Example 5: Hierarchical computation
async fn example_hierarchical(pool: &ComputePool) -> Result<()> {
println!("\n=== Example 5: Hierarchical Computation ===");
let start = Instant::now();
let result = pool
.execute_scoped(move |scope| {
let phase1_results = Arc::new(Mutex::new(vec![0u64; 4]));
// First level: compute initial values
for i in 0..4 {
let phase1_results = phase1_results.clone();
scope.spawn(move |s2| {
let intermediate = expensive_computation((i + 1) as u64 * 1000);
// Second level: further process each result
let phase2_results = Arc::new(Mutex::new(vec![0u64; 2]));
for j in 0..2 {
let value = intermediate + (j as u64 * 100);
let phase2_results = phase2_results.clone();
s2.spawn(move |_| {
let result = expensive_computation(value);
let mut r = phase2_results.lock().unwrap();
r[j] = result;
});
}
let sum: u64 = phase2_results.lock().unwrap().iter().sum();
let mut p1 = phase1_results.lock().unwrap();
p1[i] = sum;
});
}
phase1_results.lock().unwrap().iter().sum::<u64>()
})
.await?;
println!("Hierarchical computation result: {}", result);
println!("Time: {:?}", start.elapsed());
Ok(())
}
/// Runs the five compute-pool examples in order and prints the pool metrics.
///
/// # Errors
///
/// Fails if the worker cannot be created from settings, if the runtime has no
/// compute pool, or if any example returns an error.
#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    let log_builder = tracing_subscriber::fmt().with_max_level(tracing::Level::INFO);
    log_builder.init();
    // Create worker and runtime
    let worker = Worker::from_settings()?;
    let runtime = worker.runtime().clone();
    // Get compute pool
    let Some(shared_pool) = runtime.compute_pool() else {
        return Err(anyhow::anyhow!("Compute pool not initialized"));
    };
    let pool = shared_pool.clone();
    let thread_count = pool.num_threads();
    println!("Compute pool initialized with {} threads", thread_count);
    // Run examples
    example_fork_join(&pool).await?;
    example_scope(&pool).await?;
    example_parallel_map(&pool).await?;
    example_tokenization(&pool).await?;
    example_hierarchical(&pool).await?;
    // Print metrics
    println!("\n=== Compute Pool Metrics ===");
    println!("{}", pool.metrics());
    Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use dynamo_runtime::compute::{ComputeConfig, ComputePool};
use dynamo_runtime::{compute_large, compute_medium, compute_small};
use std::sync::Arc;
/// Smoke test for the three compute macros.
///
/// Creates a four-thread pool and runs one trivial workload through each of
/// `compute_small!`, `compute_medium!` and `compute_large!`, printing every
/// result.
///
/// # Errors
///
/// Fails if the pool cannot be created or a macro invocation reports an error.
#[tokio::main]
async fn main() -> Result<()> {
    println!("Testing compute macros...\n");
    // Create compute pool
    let pool = Arc::new(ComputePool::new(ComputeConfig {
        num_threads: Some(4),
        stack_size: Some(2 * 1024 * 1024),
        thread_prefix: "test".to_string(),
        pin_threads: false,
    })?);
    // Test small macro (direct execution)
    println!("Testing compute_small!...");
    let small = compute_small!(2 + 2);
    println!(" Result: {}", small);
    // Test medium macro (block_in_place with fallback)
    println!("\nTesting compute_medium!...");
    let medium = compute_medium!(pool, { (0..1000u64).fold(0u64, |acc, i| acc + i) });
    println!(" Result: {}", medium);
    // Test large macro (always Rayon)
    println!("\nTesting compute_large!...");
    let large = compute_large!(pool, { (0..1_000_000u64).fold(0u64, |acc, i| acc + i) });
    println!(" Result: {}", large);
    println!("\n All macros working!");
    Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Example showing how to integrate ComputePool with tokenization workloads
//!
//! This demonstrates the pattern that could be used in lib/llm/src/preprocessor.rs
//! to leverage the compute pool for batch tokenization operations.
use anyhow::Result;
use dynamo_runtime::{Worker, compute::ComputePool};
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Mock tokenizer for demonstration
struct MockTokenizer;

impl MockTokenizer {
    /// Turns `text` into one pseudo-token per whitespace-separated word.
    ///
    /// Each token is a wrapping base-31 polynomial hash of the word's bytes
    /// plus the word's position in the text.
    fn encode(&self, text: &str) -> Vec<u32> {
        text.split_whitespace()
            .enumerate()
            .map(|(position, word)| {
                // Simulate expensive computation
                let mut hash = 0u32;
                for byte in word.bytes() {
                    hash = hash.wrapping_mul(31).wrapping_add(byte as u32);
                }
                hash.wrapping_add(position as u32)
            })
            .collect()
    }

    /// Renders every token as `token_<t % 1000>`, separated by single spaces.
    fn decode(&self, tokens: &[u32]) -> String {
        let mut rendered = String::new();
        for (index, token) in tokens.iter().enumerate() {
            if index > 0 {
                rendered.push(' ');
            }
            rendered.push_str("token_");
            rendered.push_str(&(token % 1000).to_string());
        }
        rendered
    }
}
/// Pattern 1: Direct replacement for par_iter in preprocessor
///
/// This shows how the existing code in lib/llm/src/preprocessor.rs:330
/// could be enhanced with explicit compute pool control
async fn tokenize_batch_with_pool(
pool: &ComputePool,
tokenizer: Arc<MockTokenizer>,
texts: Vec<String>,
) -> Result<Vec<Vec<u32>>> {
println!(
"\n=== Tokenizing {} texts with compute pool ===",
texts.len()
);
let start = Instant::now();
// Option 1: Using scope for fine control
let token_batches = pool
.execute_scoped(move |scope| {
let results = Arc::new(Mutex::new(vec![Vec::new(); texts.len()]));
for (i, text) in texts.iter().enumerate() {
let tokenizer = tokenizer.clone();
let text = text.clone();
let results = results.clone();
scope.spawn(move |_| {
let tokens = tokenizer.encode(&text);
let mut r = results.lock().unwrap();
r[i] = tokens;
});
}
Arc::try_unwrap(results).unwrap().into_inner().unwrap()
})
.await?;
let total_tokens: usize = token_batches.iter().map(|v| v.len()).sum();
println!(
"Tokenized in {:?}, total tokens: {}",
start.elapsed(),
total_tokens
);
Ok(token_batches)
}
/// Pattern 2: Using rayon's par_iter within the compute pool
///
/// This maintains compatibility with existing code patterns: the whole
/// `par_iter` pipeline is handed to `pool.install` as a single closure and the
/// encoded batches come back in input order.
async fn tokenize_batch_par_iter(
    pool: &ComputePool,
    tokenizer: Arc<MockTokenizer>,
    texts: Vec<String>,
) -> Result<Vec<Vec<u32>>> {
    use rayon::prelude::*;
    println!("\n=== Tokenizing with par_iter in compute pool ===");
    let timer = Instant::now();
    // This is how the existing preprocessor code could work
    let encode_all = move || -> Vec<Vec<u32>> {
        texts
            .par_iter()
            .map(|text| tokenizer.encode(text))
            .collect()
    };
    let token_batches: Vec<Vec<u32>> = pool.install(encode_all).await?;
    let total_tokens: usize = token_batches.iter().map(Vec::len).sum();
    println!(
        "Tokenized in {:?}, total tokens: {}",
        timer.elapsed(),
        total_tokens
    );
    Ok(token_batches)
}
/// Pattern 3: Mixed async/sync processing
///
/// This shows how to handle a stream of requests where each request
/// contains a batch that needs parallel processing. Three hard-coded
/// requests are tokenized one after another with a 10ms async sleep between
/// them.
async fn process_request_stream(pool: &ComputePool, tokenizer: Arc<MockTokenizer>) -> Result<()> {
    println!("\n=== Processing request stream ===");
    // Simulate incoming requests
    let raw_requests: [&[&str]; 3] = [
        &["Request 1 text 1", "Request 1 text 2"],
        &["Request 2 text 1", "Request 2 text 2", "Request 2 text 3"],
        &["Request 3 text 1"],
    ];
    let requests: Vec<Vec<String>> = raw_requests
        .iter()
        .map(|batch| batch.iter().map(|text| text.to_string()).collect())
        .collect();
    for (index, batch) in requests.into_iter().enumerate() {
        let request_no = index + 1;
        println!("Processing request {}", request_no);
        // Each request gets processed in parallel
        let tokens = tokenize_batch_with_pool(pool, tokenizer.clone(), batch).await?;
        // Simulate async I/O between requests
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        println!(
            "Request {} completed with {} token batches",
            request_no,
            tokens.len()
        );
    }
    Ok(())
}
/// Pattern 4: Encode/Decode pipeline
///
/// Shows how to chain multiple compute operations
async fn encode_decode_pipeline(
pool: &ComputePool,
tokenizer: Arc<MockTokenizer>,
texts: Vec<String>,
) -> Result<Vec<String>> {
println!("\n=== Encode/Decode Pipeline ===");
let start = Instant::now();
// Step 1: Encode all texts in parallel
let tokenizer_clone = tokenizer.clone();
let encoded = pool
.execute_scoped(move |scope| {
let results = Arc::new(Mutex::new(vec![Vec::new(); texts.len()]));
for (i, text) in texts.iter().enumerate() {
let tokenizer = tokenizer_clone.clone();
let text = text.clone();
let results = results.clone();
scope.spawn(move |_| {
let tokens = tokenizer.encode(&text);
let mut r = results.lock().unwrap();
r[i] = tokens;
});
}
Arc::try_unwrap(results).unwrap().into_inner().unwrap()
})
.await?;
println!("Encoding complete in {:?}", start.elapsed());
// Step 2: Decode all token sequences in parallel
let decoded_start = Instant::now();
let decoded = pool
.execute_scoped(move |scope| {
let results = Arc::new(Mutex::new(vec![String::new(); encoded.len()]));
for (i, tokens) in encoded.iter().enumerate() {
let tokenizer = tokenizer.clone();
let tokens = tokens.clone();
let results = results.clone();
scope.spawn(move |_| {
let text = tokenizer.decode(&tokens);
let mut r = results.lock().unwrap();
r[i] = text;
});
}
Arc::try_unwrap(results).unwrap().into_inner().unwrap()
})
.await?;
println!("Decoding complete in {:?}", decoded_start.elapsed());
println!("Total pipeline time: {:?}", start.elapsed());
Ok(decoded)
}
/// Program entry point.
///
/// Configures the compute pool through the environment while the process is
/// still single-threaded, then builds the multi-threaded Tokio runtime and
/// drives `run` on it.
///
/// # Errors
///
/// Fails if the Tokio runtime cannot be built or if `run` returns an error.
fn main() -> Result<()> {
    // Set compute pool configuration via environment
    // SAFETY: this is the first statement of `main`; no runtime or worker
    // thread has been started yet, so nothing can read or write the process
    // environment concurrently.
    unsafe {
        std::env::set_var("DYN_COMPUTE_THREADS", "4");
    }
    // Equivalent to what `#[tokio::main]` sets up, but created only after the
    // environment has been modified.
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(run())
}

/// Runs all four tokenization patterns against the runtime's compute pool and
/// prints a summary plus the pool metrics.
///
/// # Errors
///
/// Fails if the worker cannot be created from settings, if the runtime has no
/// compute pool, or if any pattern returns an error.
async fn run() -> Result<()> {
    // Initialize logging
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .init();
    // Create worker and runtime
    let worker = Worker::from_settings()?;
    let runtime = worker.runtime().clone();
    // Get compute pool
    let pool = runtime
        .compute_pool()
        .ok_or_else(|| anyhow::anyhow!("Compute pool not initialized"))?
        .clone();
    println!(
        "Compute pool initialized with {} threads",
        pool.num_threads()
    );
    // Create mock tokenizer
    let tokenizer = Arc::new(MockTokenizer);
    // Generate test data
    let texts: Vec<String> = (0..50)
        .map(|i| {
            format!(
                "This is sample text number {} with some words to tokenize. \
                 The quick brown fox jumps over the lazy dog.",
                i
            )
        })
        .collect();
    // Run examples
    let _ = tokenize_batch_with_pool(&pool, tokenizer.clone(), texts.clone()).await?;
    let _ = tokenize_batch_par_iter(&pool, tokenizer.clone(), texts.clone()).await?;
    process_request_stream(&pool, tokenizer.clone()).await?;
    let decoded = encode_decode_pipeline(&pool, tokenizer.clone(), texts.clone()).await?;
    println!("\n=== Results ===");
    println!("Processed {} texts", texts.len());
    // `first()` instead of `[0]` so an empty result cannot panic.
    if let Some(first) = decoded.first() {
        println!("First decoded text: {}", first);
    }
    // Print metrics
    let metrics = pool.metrics();
    println!("\n=== Compute Pool Metrics ===");
    println!("{}", metrics);
    Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Zero-overhead macros for compute task execution with optional validation
//!
//! These macros provide size-aware execution strategies:
//! - `compute_small!`: Direct inline execution for tasks <100μs
//! - `compute_medium!`: Semaphore-guarded block_in_place for tasks 100μs-1ms
//! - `compute_large!`: Rayon offload for tasks >1ms
//!
//! When the `compute-validation` feature is enabled, these macros will
//! time execution and emit warnings if tasks are misclassified.
/// Run a small compute task (<100μs) inline on the current thread.
///
/// The expression is evaluated in place and its value becomes the value of
/// the macro invocation; nothing is offloaded. With the `compute-validation`
/// feature enabled, the elapsed time is additionally passed to
/// `validate_small` so misclassified tasks can be reported.
///
/// # Example
/// ```
/// # use dynamo_runtime::compute_small;
/// let result = compute_small!(2 + 2);
/// assert_eq!(result, 4);
/// ```
#[macro_export]
macro_rules! compute_small {
    ($expr:expr) => {{
        #[cfg(feature = "compute-validation")]
        let _timer = std::time::Instant::now();
        // Evaluated directly; there is no scheduling step.
        let value = $expr;
        #[cfg(feature = "compute-validation")]
        $crate::compute::validation::validate_small(_timer.elapsed());
        value
    }};
}
/// Execute a medium compute task (100μs-1ms) with intelligent scheduling.
///
/// Two forms are accepted:
/// - `compute_medium!(expr)`: tries to take a block_in_place permit from the
/// thread-local context and runs `expr` under `tokio::task::block_in_place`.
/// If no permit can be taken it offloads to the thread-local pool, and if
/// there is no thread-local pool either it logs a warning and evaluates
/// `expr` inline.
/// - `compute_medium!(pool, expr)`: tries the thread-local permit first and
/// otherwise offloads to `pool`.
///
/// The expansion already contains `.await?`, so the macro must be invoked in
/// an async context whose return type accepts `?`. The invocation evaluates
/// directly to the task's value; do not append `.await` to it.
///
/// NOTE(review): the `#[cfg(feature = "compute-validation")]` attributes are
/// emitted into the caller's code, so presumably they are resolved against the
/// invoking crate's features rather than this crate's — confirm.
///
/// # Example
/// ```ignore
/// # use dynamo_runtime::{compute_medium, compute::ComputePool};
/// # async fn example(pool: &ComputePool) -> anyhow::Result<()> {
/// // With thread-local context (on worker thread)
/// let result = compute_medium!({
/// (0..1000).map(|i| i * 2).sum::<i32>()
/// });
///
/// // Or with explicit pool (fallback)
/// let result = compute_medium!(pool, {
/// (0..1000).map(|i| i * 2).sum::<i32>()
/// });
/// # Ok(())
/// # }
/// ```
#[macro_export]
macro_rules! compute_medium {
// Thread-local version (no pool parameter)
($expr:expr) => {{
#[cfg(feature = "compute-validation")]
let _start = std::time::Instant::now();
let result = async {
// Try thread-local context first
if let Ok(_permit) = $crate::compute::thread_local::try_acquire_block_permit() {
// Got permit - use block_in_place
Ok(tokio::task::block_in_place(|| {
let r = $expr;
drop(_permit); // Release ASAP
r
}))
} else if let Some(pool) = $crate::compute::thread_local::get_pool() {
// No permit but have pool - offload
pool.execute(|| $expr).await
} else {
// No context available - fall back to inline execution
// This may block the async runtime but ensures the macro always works
tracing::warn!("compute_medium: No thread-local context, executing inline (may block async runtime)");
Ok($expr)
}
}
// Propagates an offload error to the caller's function via `?`
.await?;
#[cfg(feature = "compute-validation")]
$crate::compute::validation::validate_medium(_start.elapsed());
result
}};
// Explicit pool version (fallback)
($pool:expr, $expr:expr) => {{
#[cfg(feature = "compute-validation")]
let _start = std::time::Instant::now();
let result = async {
// Try thread-local permits first, fall back to pool
if let Ok(_permit) = $crate::compute::thread_local::try_acquire_block_permit() {
// Got permit - use block_in_place
Ok(tokio::task::block_in_place(|| {
let r = $expr;
drop(_permit); // Release ASAP
r
}))
} else {
// No permit available - offload to provided pool
$pool.execute(|| $expr).await
}
}
// Propagates an offload error to the caller's function via `?`
.await?;
#[cfg(feature = "compute-validation")]
$crate::compute::validation::validate_medium(_start.elapsed());
result
}};
}
/// Execute a large compute task (>1ms) on the Rayon thread pool.
///
/// This macro always offloads to Rayon as the overhead is negligible
/// compared to the computation time.
///
/// Two forms are accepted:
/// - `compute_large!(expr)`: offloads to the thread-local pool; if the current
/// thread has no pool it logs a warning and evaluates `expr` inline.
/// - `compute_large!(pool, expr)`: offloads to `pool`.
///
/// The expansion already contains `.await?`, so the macro must be invoked in
/// an async context whose return type accepts `?`. The invocation evaluates
/// directly to the task's value; do not append `.await` to it.
///
/// # Example
/// ```ignore
/// # use dynamo_runtime::{compute_large, compute::ComputePool};
/// # async fn example(pool: &ComputePool) -> anyhow::Result<()> {
/// // With thread-local context
/// let result = compute_large!({
/// expensive_matrix_multiplication()
/// });
///
/// // Or with explicit pool
/// let result = compute_large!(pool, {
/// expensive_matrix_multiplication()
/// });
/// # Ok(())
/// # }
/// ```
#[macro_export]
macro_rules! compute_large {
// Thread-local version
($expr:expr) => {{
#[cfg(feature = "compute-validation")]
let _start = std::time::Instant::now();
let result = async {
if let Some(pool) = $crate::compute::thread_local::get_pool() {
pool.execute(|| $expr).await
} else {
// No pool available - fall back to inline execution
// Warning: Large tasks inline will severely block the async runtime
tracing::warn!("compute_large: No thread-local context, executing inline (will block async runtime!)");
Ok($expr)
}
}
// Propagates an offload error to the caller's function via `?`
.await?;
#[cfg(feature = "compute-validation")]
$crate::compute::validation::validate_large(_start.elapsed());
result
}};
// Explicit pool version
($pool:expr, $expr:expr) => {{
#[cfg(feature = "compute-validation")]
let _start = std::time::Instant::now();
// Offload unconditionally; an error from the pool is propagated via `?`
let result = $pool.execute(|| $expr).await?;
#[cfg(feature = "compute-validation")]
$crate::compute::validation::validate_large(_start.elapsed());
result
}};
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Thread-local storage for compute resources
//!
//! This module provides thread-local access to compute resources (Rayon pool and semaphore)
//! for Tokio worker threads. This eliminates the need to pass Runtime or ComputePool
//! references through async function calls.
use super::ComputePool;
use std::cell::RefCell;
use std::sync::Arc;
use tokio::sync::Semaphore;
thread_local! {
/// Thread-local compute context available on Tokio worker threads.
///
/// Starts out as `None` on every thread and is filled in by
/// `initialize_context`; all other functions in this module only read it.
static COMPUTE_CONTEXT: RefCell<Option<ComputeContext>> = const { RefCell::new(None) };
}
/// Compute resources available to a Tokio worker thread
///
/// Both fields are `Arc`s, so cloning a context only bumps reference counts.
#[derive(Clone)]
pub struct ComputeContext {
/// The Rayon compute pool
pub pool: Arc<ComputePool>,
/// Semaphore for block_in_place permits; `try_acquire_block_permit` takes
/// its permits from here
pub block_in_place_permits: Arc<Semaphore>,
}
/// Install the compute context for the calling thread.
///
/// Stores `pool` and `permits` in this thread's `COMPUTE_CONTEXT`, replacing
/// any context set earlier on the same thread.
///
/// This should be called from the Tokio runtime's `on_thread_start` callback
pub fn initialize_context(pool: Arc<ComputePool>, permits: Arc<Semaphore>) {
    let context = ComputeContext {
        pool,
        block_in_place_permits: permits,
    };
    COMPUTE_CONTEXT.with(|slot| {
        let mut current = slot.borrow_mut();
        *current = Some(context);
    });
}
/// Run `f` against the calling thread's compute context.
///
/// Returns `Some` with the closure's result when a context has been
/// initialized on this thread, and `None` otherwise (for example on a
/// non-worker thread). The context stays borrowed while `f` runs.
pub fn with_context<F, R>(f: F) -> Option<R>
where
    F: FnOnce(&ComputeContext) -> R,
{
    COMPUTE_CONTEXT.with(|slot| {
        let current = slot.borrow();
        current.as_ref().map(f)
    })
}
/// Try to acquire a block_in_place permit from thread-local context
///
/// Returns Ok(permit) if successful, Err if no context or no permits available
pub fn try_acquire_block_permit() -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
with_context(|ctx| {
ctx.block_in_place_permits
.clone()
.try_acquire_owned()
.map_err(|_| "No permits available")
})
.ok_or("No compute context on this thread")?
}
/// Fetch a handle to the compute pool from the thread-local context.
///
/// Returns a new `Arc` pointing at the pool, or `None` when the calling
/// thread has no context (for example a non-worker thread).
pub fn get_pool() -> Option<Arc<ComputePool>> {
    with_context(|context| Arc::clone(&context.pool))
}
/// Check if the current thread has compute context initialized
///
/// Returns true if the thread-local context is initialized with a compute pool
/// and semaphore permits, meaning the compute macros will offload work.
/// Returns false if macros would fall back to inline execution.
pub fn has_compute_context() -> bool {
with_context(|_| ()).is_some()
}
/// Require that the calling thread has an initialized compute context.
///
/// Use this to ensure compute macros will offload work rather than run inline.
///
/// # Panics
///
/// Panics if the thread-local context is not initialized.
pub fn assert_compute_context() {
    assert!(
        has_compute_context(),
        "Thread-local compute context not initialized! \
         Compute macros will fall back to inline execution. \
         Call Runtime::initialize_thread_local() on worker threads."
    );
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Without a prior `initialize_context` call on the current thread, every
    /// accessor must report that no context is present.
    #[test]
    fn test_uninitialized_context() {
        let pool = get_pool();
        let permit = try_acquire_block_permit();
        assert!(pool.is_none());
        assert!(permit.is_err());
        assert!(!has_compute_context());
    }

    /// The assertion helper must panic with its documented message when the
    /// context is missing.
    #[test]
    #[should_panic(expected = "Thread-local compute context not initialized")]
    fn test_assert_compute_context_panics() {
        assert_compute_context();
    }
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment