address.rs 12.1 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Internal address builder for constructing WorkerAddress instances.
//!
//! This module provides the builder pattern for creating WorkerAddress instances
//! from transport-specific endpoint data. It is internal to velo-transports.

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;
use velo_common::{WorkerAddress, WorkerAddressError};

/// Builder for constructing WorkerAddress instances.
///
/// This provides a mutable interface for collecting transport endpoints
/// before encoding them into the immutable WorkerAddress format.
#[derive(Debug, Clone, Default)]
pub(crate) struct WorkerAddressBuilder {
    entries: HashMap<String, Bytes>,
}

impl WorkerAddressBuilder {
    /// Create a new empty builder.
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Add a new entry to the map.
    ///
    /// Returns an error if the key already exists.
    pub fn add_entry(
        &mut self,
        key: impl Into<String>,
        value: impl Into<Bytes>,
    ) -> Result<(), WorkerAddressError> {
        let key = key.into();
        if self.entries.contains_key(&key) {
            return Err(WorkerAddressError::KeyExists(key));
        }
        self.entries.insert(key, value.into());
        Ok(())
    }

    /// Check if a key exists in the map.
    #[allow(dead_code)]
    pub fn has_entry(&self, key: &str) -> bool {
        self.entries.contains_key(key)
    }

    /// Get a reference to an entry's value.
    #[allow(dead_code)]
    pub fn get_entry(&self, key: &str) -> Option<&Bytes> {
        self.entries.get(key)
    }

    /// Merge another WorkerAddress into this builder.
    ///
    /// This decodes the other address and attempts to add all its entries to this builder.
    /// If any key from the other address already exists in this builder, returns an error
    /// and leaves the builder unchanged.
    pub fn merge(&mut self, other: &WorkerAddress) -> Result<(), WorkerAddressError> {
        let map = decode_to_map(other.as_bytes())?;

        // First check if any keys would conflict
        for key in map.keys() {
            if self.entries.contains_key(key.as_ref()) {
                return Err(WorkerAddressError::KeyExists(key.to_string()));
            }
        }

        // All keys are unique, now add them
        for (key, value) in map {
            self.entries.insert(key.to_string(), value);
        }

        Ok(())
    }

    /// Build the WorkerAddress from this builder.
    ///
    /// This encodes the map into MessagePack binary format.
    pub fn build(self) -> Result<WorkerAddress, WorkerAddressError> {
        // Convert HashMap<String, Bytes> to HashMap<String, Vec<u8>> for MessagePack
        let serializable: HashMap<String, Vec<u8>> = self
            .entries
            .into_iter()
            .map(|(k, v)| (k, v.to_vec()))
            .collect();

        // Encode to MessagePack
        let encoded = rmp_serde::to_vec(&serializable)?;

        Ok(WorkerAddress::from_encoded(encoded))
    }
}

/// Decode WorkerAddress bytes from MessagePack into a map.
fn decode_to_map(bytes: &[u8]) -> Result<HashMap<Arc<str>, Bytes>, WorkerAddressError> {
    if bytes.is_empty() {
        return Err(WorkerAddressError::InvalidFormat("Empty bytes".to_string()));
    }

    // Decode MessagePack
    let decoded: HashMap<String, Vec<u8>> = rmp_serde::from_slice(bytes)?;

    // Convert to HashMap<Arc<str>, Bytes>
    Ok(decoded
        .into_iter()
        .map(|(k, v)| (Arc::from(k.as_str()), Bytes::from(v)))
        .collect())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_builder_basic() {
        let mut builder = WorkerAddressBuilder::new();

        builder
            .add_entry("endpoint", Bytes::from_static(b"tcp://127.0.0.1:5555"))
            .unwrap();
        builder
            .add_entry("protocol", Bytes::from_static(b"tcp"))
            .unwrap();

        assert!(builder.has_entry("endpoint"));
        assert!(builder.has_entry("protocol"));
        assert!(!builder.has_entry("nonexistent"));

        let address = builder.build().unwrap();
        assert!(!address.as_bytes().is_empty());

        // Verify we can read the entries back
        let entry = address.get_entry("endpoint").unwrap();
        assert_eq!(entry, Some(Bytes::from_static(b"tcp://127.0.0.1:5555")));
    }

    #[test]
    fn test_builder_add_duplicate_key() {
        let mut builder = WorkerAddressBuilder::new();

        builder
            .add_entry("key", Bytes::from_static(b"value1"))
            .unwrap();

        let result = builder.add_entry("key", Bytes::from_static(b"value2"));
        assert!(matches!(result, Err(WorkerAddressError::KeyExists(_))));
    }

    #[test]
    fn test_builder_merge() {
        // Build first address
        let mut builder1 = WorkerAddressBuilder::new();
        builder1
            .add_entry("tcp", Bytes::from_static(b"tcp://127.0.0.1:5555"))
            .unwrap();
        let address1 = builder1.build().unwrap();

        // Build second address
        let mut builder2 = WorkerAddressBuilder::new();
        builder2
            .add_entry("rdma", Bytes::from_static(b"rdma://10.0.0.1:6666"))
            .unwrap();
        let address2 = builder2.build().unwrap();

        // Merge both into a new builder
        let mut builder3 = WorkerAddressBuilder::new();
        builder3.merge(&address1).unwrap();
        builder3.merge(&address2).unwrap();

        let final_address = builder3.build().unwrap();

        // Verify both entries are present
        assert_eq!(
            final_address.get_entry("tcp").unwrap(),
            Some(Bytes::from_static(b"tcp://127.0.0.1:5555"))
        );
        assert_eq!(
            final_address.get_entry("rdma").unwrap(),
            Some(Bytes::from_static(b"rdma://10.0.0.1:6666"))
        );
    }

    #[test]
    fn test_builder_merge_with_conflict() {
        let mut builder1 = WorkerAddressBuilder::new();
        builder1
            .add_entry("tcp", Bytes::from_static(b"tcp://127.0.0.1:5555"))
            .unwrap();
        let address1 = builder1.build().unwrap();

        let mut builder2 = WorkerAddressBuilder::new();
        builder2
            .add_entry("tcp", Bytes::from_static(b"tcp://different:5555"))
            .unwrap();
        let address2 = builder2.build().unwrap();

        // Merge first address
        let mut builder3 = WorkerAddressBuilder::new();
        builder3.merge(&address1).unwrap();

        // Try to merge conflicting address - should fail
        let result = builder3.merge(&address2);
        assert!(matches!(result, Err(WorkerAddressError::KeyExists(_))));

        // Builder should be unchanged
        assert!(builder3.has_entry("tcp"));
        assert_eq!(
            builder3.get_entry("tcp").unwrap(),
            &Bytes::from_static(b"tcp://127.0.0.1:5555")
        );
    }

    #[test]
    fn test_empty_builder() {
        let builder = WorkerAddressBuilder::new();
        let address = builder.build().unwrap();

        // Empty address should still be valid
        let transports = address.available_transports().unwrap();
        assert_eq!(transports.len(), 0);
    }

    // ========================================================================
    // Integration tests: Verify WorkerAddressBuilder (velo-transports) produces
    // addresses that WorkerAddress (velo-common) can correctly decode.
    // These tests ensure the two crates stay in sync on the wire format.
    // ========================================================================

    #[test]
    fn test_builder_address_integration_get_entry() {
        // Build an address with multiple entries
        let mut builder = WorkerAddressBuilder::new();
        builder
            .add_entry("tcp", Bytes::from_static(b"tcp://127.0.0.1:5555"))
            .unwrap();
        builder
            .add_entry("rdma", Bytes::from_static(b"rdma://10.0.0.1:6666"))
            .unwrap();
        builder
            .add_entry("binary_data", Bytes::from_static(&[0x00, 0x01, 0x02, 0xFF]))
            .unwrap();
        let address = builder.build().unwrap();

        // Verify WorkerAddress::get_entry() correctly decodes each entry
        assert_eq!(
            address.get_entry("tcp").unwrap(),
            Some(Bytes::from_static(b"tcp://127.0.0.1:5555"))
        );
        assert_eq!(
            address.get_entry("rdma").unwrap(),
            Some(Bytes::from_static(b"rdma://10.0.0.1:6666"))
        );
        assert_eq!(
            address.get_entry("binary_data").unwrap(),
            Some(Bytes::from_static(&[0x00, 0x01, 0x02, 0xFF]))
        );
        assert_eq!(address.get_entry("nonexistent").unwrap(), None);
    }

    #[test]
    fn test_builder_address_integration_available_transports() {
        let mut builder = WorkerAddressBuilder::new();
        builder
            .add_entry("tcp", Bytes::from_static(b"tcp://127.0.0.1:5555"))
            .unwrap();
        builder
            .add_entry("rdma", Bytes::from_static(b"rdma://10.0.0.1:6666"))
            .unwrap();
        builder
            .add_entry("grpc", Bytes::from_static(b"grpc://localhost:9000"))
            .unwrap();
        let address = builder.build().unwrap();

        // Verify WorkerAddress::available_transports() returns all keys
        let transports = address.available_transports().unwrap();
        assert_eq!(transports.len(), 3);
        assert!(transports.contains(&velo_common::TransportKey::from("tcp")));
        assert!(transports.contains(&velo_common::TransportKey::from("rdma")));
        assert!(transports.contains(&velo_common::TransportKey::from("grpc")));
    }

    #[test]
    fn test_builder_address_integration_checksum_stability() {
        // Build same address twice - checksums should match
        let mut builder1 = WorkerAddressBuilder::new();
        builder1
            .add_entry("key", Bytes::from_static(b"value"))
            .unwrap();
        let address1 = builder1.build().unwrap();

        let mut builder2 = WorkerAddressBuilder::new();
        builder2
            .add_entry("key", Bytes::from_static(b"value"))
            .unwrap();
        let address2 = builder2.build().unwrap();

        // Same content should produce same checksum
        assert_eq!(address1.checksum(), address2.checksum());

        // Different content should produce different checksum
        let mut builder3 = WorkerAddressBuilder::new();
        builder3
            .add_entry("key", Bytes::from_static(b"different"))
            .unwrap();
        let address3 = builder3.build().unwrap();
        assert_ne!(address1.checksum(), address3.checksum());
    }

    #[test]
    fn test_builder_address_integration_bytes_roundtrip() {
        // Build an address
        let mut builder = WorkerAddressBuilder::new();
        builder
            .add_entry("endpoint", Bytes::from_static(b"test://value"))
            .unwrap();
        let address = builder.build().unwrap();

        // Get raw bytes and create new address via from_encoded
        let raw_bytes = address.to_bytes();
        let address2 = WorkerAddress::from_encoded(raw_bytes);

        // Both should be equal and decode the same
        assert_eq!(address, address2);
        assert_eq!(address.checksum(), address2.checksum());
        assert_eq!(
            address.get_entry("endpoint").unwrap(),
            address2.get_entry("endpoint").unwrap()
        );
    }

    #[test]
    fn test_builder_address_integration_serde_roundtrip() {
        // Build an address
        let mut builder = WorkerAddressBuilder::new();
        builder
            .add_entry("tcp", Bytes::from_static(b"tcp://127.0.0.1:5555"))
            .unwrap();
        let address = builder.build().unwrap();

        // Serialize to JSON and back
        let json = serde_json::to_string(&address).unwrap();
        let deserialized: WorkerAddress = serde_json::from_str(&json).unwrap();

        // Should be equal and decode correctly
        assert_eq!(address, deserialized);
        assert_eq!(
            address.get_entry("tcp").unwrap(),
            deserialized.get_entry("tcp").unwrap()
        );
    }
}