# Active Message Handling System

This module provides an async future-based active message handling system with proper error handling, response notifications, and channel-based communication.

## Key Features

- **Async Future-Based**: Each handler is an `Arc`-shared factory that creates a new future per message; the future can capture resources and runs asynchronously
- **Concurrency Control**: A configurable concurrency limit enforced with a semaphore
- **Response Notifications**: Optional response notifications in the form `{prefix}:ok` or `{prefix}:err(<message>)`
- **Channel-Based Communication**: Messages and responses travel over channels, decoupling the communication layer from handler execution
- **Error Handling**: Handler failures and unknown message types are logged and reported as error notifications
- **Resource Capture**: Handlers can capture shared resources (for example `Arc<Mutex<T>>`) and use them safely across tasks

## Architecture

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Communication  │───▶│ ActiveMessage    │───▶│   Handler       │
│     Layer       │    │    Manager       │    │   Futures       │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         ▲                        │                       │
         │                        ▼                       ▼
         │              ┌──────────────────┐    ┌─────────────────┐
         └──────────────│   Response       │◀───│   Async Task    │
                        │  Notifications   │    │    Pool         │
                        └──────────────────┘    └─────────────────┘
```

## Usage

### 1. Initialize the System

```rust
use dynamo_llm::block_manager::distributed::worker::*;

// Create a worker and initialize active message manager
let mut worker = KvBlockManagerWorker::new(config)?;
worker.init_active_message_manager(4)?; // 4 concurrent handlers

// Create handlers
let handlers = create_example_handlers();
worker.register_handlers(handlers)?;

// Get communication channels
let message_sender = worker.get_message_sender()?;
let mut response_receiver = worker.get_response_receiver()?; // `mut`: `recv()` takes `&mut self`
```

### 2. Create Custom Handlers

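```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex; // assumed async mutex: `lock().await` below requires one

#[derive(Clone)]
struct MyHandler {
    name: String,
    shared_resource: Arc<Mutex<SomeResource>>,
}

impl MyHandler {
    fn new(name: String, shared_resource: Arc<Mutex<SomeResource>>) -> Self {
        Self { name, shared_resource }
    }

    async fn handle_message(&self, data: Vec<u8>) -> Result<()> {
        // Process the message asynchronously
        let processed_data = self.process_data(data).await?;

        // Update the shared resource; the guard is released when it goes out of scope
        let mut resource = self.shared_resource.lock().await;
        resource.update(processed_data)?;

        Ok(())
    }
}

// Register the handler under the message type it serves
let handler = MyHandler::new("my_handler".to_string(), shared_resource);
let mut handlers = HashMap::new();
handlers.insert("my_message_type".to_string(), create_handler!(handler));
```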
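The expansion of `create_handler!` is not shown here. As a std-only sketch of the same idea, assuming a handler can be modeled as a shared synchronous closure, a registry maps message types to closures that each capture their own clone of an `Arc<Mutex<_>>`. `HandlerFn`, `make_counting_handler`, `build_registry`, and the `Vec<usize>` resource are hypothetical; the real handlers are async.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical synchronous stand-in for an async handler:
/// takes the payload and returns `Err(message)` on failure.
type HandlerFn = Arc<dyn Fn(Vec<u8>) -> Result<(), String> + Send + Sync>;

/// Builds a handler that records each payload length into a shared log.
fn make_counting_handler(log: Arc<Mutex<Vec<usize>>>) -> HandlerFn {
    Arc::new(move |data: Vec<u8>| {
        if data.is_empty() {
            return Err("empty payload".to_string());
        }
        // The closure owns its own clone of the Arc, so it can run on any thread.
        log.lock().map_err(|e| e.to_string())?.push(data.len());
        Ok(())
    })
}

/// Registers the handler under the message type it serves.
fn build_registry(log: Arc<Mutex<Vec<usize>>>) -> HashMap<String, HandlerFn> {
    let mut handlers: HashMap<String, HandlerFn> = HashMap::new();
    handlers.insert("my_message_type".to_string(), make_counting_handler(log));
    handlers
}
```

Because the registry stores `Arc`s, the manager can clone a handler cheaply for every incoming message while all clones keep pointing at the same shared resource.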

### 3. Send Messages

```rust
// Message with response notification
let message = IncomingActiveMessage {
    message_type: "my_message_type".to_string(),
    message_data: b"Hello, World!".to_vec(),
    response_notification: Some("request_123".to_string()),
};

message_sender.send(message)?;
```

### 4. Handle Responses

```rust
use tracing::{info, warn}; // assumed: logging macros from the `tracing` crate

// Spawn a task to consume response notifications
tokio::spawn(async move {
    while let Some(response) = response_receiver.recv().await {
        if response.is_success {
            info!("✅ Success: {}", response.notification);
            // e.g. response.notification == "request_123:ok"
        } else {
            warn!("❌ Error: {}", response.notification);
            // e.g. response.notification == "request_123:err(Error message)"
        }
    }
});
```
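A consumer that needs to correlate a response with its original request can split the notification back into its prefix and outcome. The following is a minimal sketch; `Outcome` and `parse_notification` are hypothetical helpers, not part of the module, and the sketch assumes a prefix never contains the substring `":err("`.

```rust
/// Outcome decoded from a response notification string.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ok,
    Err(String),
}

/// Splits `"{prefix}:ok"` or `"{prefix}:err({message})"` into the prefix and outcome.
/// Returns `None` for strings that match neither shape.
/// Assumes the prefix itself never contains `":err("`.
fn parse_notification(notification: &str) -> Option<(&str, Outcome)> {
    if let Some(prefix) = notification.strip_suffix(":ok") {
        return Some((prefix, Outcome::Ok));
    }
    // Error form: strip the trailing ')' and split at the first ":err(".
    let body = notification.strip_suffix(')')?;
    let idx = body.find(":err(")?;
    let prefix = &body[..idx];
    let message = &body[idx + ":err(".len()..];
    Some((prefix, Outcome::Err(message.to_string())))
}
```

Splitting at the *first* `":err("` means an error message that itself contains colons or parentheses is still recovered intact.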

## Message Flow

1. **Incoming Message**: Communication layer receives bytes and optional response notification prefix
2. **Channel Send**: Message is sent through the channel to the active message manager
3. **Handler Lookup**: Manager finds the appropriate handler for the message type
4. **Future Creation**: Handler factory creates an async future with captured resources
5. **Async Execution**: Future is spawned in a task with concurrency control
6. **Response Generation**: On completion, response notification is generated (if requested)
7. **Response Send**: Response is sent back through the response channel
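The seven steps can be traced in a std-only sketch. It swaps tokio tasks for OS threads and uses `std::sync::mpsc` channels so it runs without extra crates. `Handler`, `ActiveMessageResponse`, and `spawn_manager` are hypothetical names. The `IncomingActiveMessage` fields mirror the Usage section with assumed types, and concurrency limiting is left out to keep the flow visible.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical handler type: payload in, `Err(message)` on failure.
type Handler = Arc<dyn Fn(Vec<u8>) -> Result<(), String> + Send + Sync>;

/// Mirrors the fields used in the Usage section (types assumed).
struct IncomingActiveMessage {
    message_type: String,
    message_data: Vec<u8>,
    response_notification: Option<String>,
}

/// Assumed response shape, based on the fields read in "Handle Responses".
struct ActiveMessageResponse {
    notification: String,
    is_success: bool,
}

/// Steps 2-7: receive, look up, execute, and optionally notify.
fn spawn_manager(
    handlers: HashMap<String, Handler>,
    messages: Receiver<IncomingActiveMessage>,
    responses: Sender<ActiveMessageResponse>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let mut tasks = Vec::new();
        // Step 2: the loop ends once every message sender is dropped.
        for msg in messages {
            // Step 3: handler lookup; a missing handler becomes an error below.
            let handler = handlers.get(&msg.message_type).cloned();
            let responses = responses.clone();
            // Steps 4-5: run the handler on its own thread (stand-in for an async task).
            tasks.push(thread::spawn(move || {
                let result = match handler {
                    Some(h) => h(msg.message_data),
                    None => Err(format!("unknown message type: {}", msg.message_type)),
                };
                // Steps 6-7: notify only when the sender supplied a prefix.
                if let Some(prefix) = msg.response_notification {
                    let (notification, is_success) = match result {
                        Ok(()) => (format!("{prefix}:ok"), true),
                        Err(e) => (format!("{prefix}:err({e})"), false),
                    };
                    let _ = responses.send(ActiveMessageResponse { notification, is_success });
                }
            }));
        }
        for t in tasks {
            let _ = t.join();
        }
    })
}
```

Messages sent without a `response_notification` still run their handler; they simply produce nothing on the response channel.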

## Response Notification Format

- **Success**: `{prefix}:ok`
- **Error**: `{prefix}:err({error_message})`

Example:
- Request with notification prefix: `"user_request_456"`
- Success response: `"user_request_456:ok"`
- Error response: `"user_request_456:err(Invalid data format)"`
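The format can be expressed as a small helper. This is a minimal sketch; `format_notification` and `maybe_notify` are hypothetical names, and the sketch passes error messages through unescaped, so a message containing `)` appears verbatim.

```rust
use std::fmt::Display;

/// Builds `{prefix}:ok` or `{prefix}:err({error_message})` from a handler result.
fn format_notification<E: Display>(prefix: &str, result: &Result<(), E>) -> String {
    match result {
        Ok(()) => format!("{prefix}:ok"),
        Err(e) => format!("{prefix}:err({e})"),
    }
}

/// No notification is produced when the sender did not supply a prefix.
fn maybe_notify<E: Display>(prefix: Option<&str>, result: &Result<(), E>) -> Option<String> {
    prefix.map(|p| format_notification(p, result))
}
```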

## Error Handling

The system provides multiple levels of error handling:

1. **Handler Errors**: Caught and converted to error response notifications
2. **Unknown Message Types**: Generate error responses for unregistered message types
3. **Channel Errors**: Logged and handled gracefully
4. **Concurrency Limits**: Managed with semaphores to prevent resource exhaustion
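The manager's semaphore type is not shown here; in async Rust this role is typically filled by `tokio::sync::Semaphore`. To illustrate semaphore-based throttling with std only, the following sketch hand-rolls a counting semaphore from `Mutex` and `Condvar` and uses threads as stand-ins for handler tasks. `Semaphore` and `run_with_limit` are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (std has none); stands in for an async semaphore.
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `jobs` simulated handlers with at most `limit` in flight and
/// returns the highest concurrency actually observed.
fn run_with_limit(jobs: usize, limit: usize) -> usize {
    let sem = Arc::new(Semaphore::new(limit));
    let active = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let workers: Vec<_> = (0..jobs)
        .map(|_| {
            let (sem, active, peak) = (Arc::clone(&sem), Arc::clone(&active), Arc::clone(&peak));
            thread::spawn(move || {
                sem.acquire();
                let now = active.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // simulated handler work
                active.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

However many jobs are queued, the observed peak never exceeds `limit`; excess work waits for a permit instead of running.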

## Testing

Run the active message tests:

```bash
cargo test test_active_message_flow
cargo test test_resource_capturing_handler
cargo test test_communication_integration
cargo test test_concurrency_performance
```

## Performance Characteristics

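- **Concurrency**: The number of handlers running at once is capped by the configurable limit
- **Memory**: Message payloads are moved through channels rather than copied
- **Latency**: Dispatch consists of a channel send and a handler lookup; handler work runs asynchronously in its own task
- **Throughput**: Once the concurrency limit is reached, further handlers wait for a semaphore permit, bounding resource usage under load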

## Best Practices

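1. **Handler Design**: Keep handlers lightweight and avoid blocking calls inside them, since they run as async tasks
2. **Resource Management**: Share state through `Arc<Mutex<T>>`; prefer an async mutex (such as `tokio::sync::Mutex`) when the guard is held across `.await`
3. **Error Handling**: Return descriptive errors from handlers; the manager converts them into `:err(...)` notifications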
4. **Concurrency**: Set appropriate concurrency limits based on workload
5. **Monitoring**: Use the response notifications for monitoring and debugging