component.rs 16.9 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5

//! The [Component] module defines the top-level API for building distributed applications.
//!
Graham King's avatar
Graham King committed
6
7
8
//! A distributed application consists of a set of [Component] that can host one
//! or more [Endpoint]. Each [Endpoint] is a network-accessible service
//! that can be accessed by other [Component] in the distributed application.
Ryan Olson's avatar
Ryan Olson committed
9
10
11
12
//!
//! A [Component] is made discoverable by registering it with the distributed runtime under
//! a [`Namespace`].
//!
Graham King's avatar
Graham King committed
13
//! A [`Namespace`] is a logical grouping of related [Component]s.
Ryan Olson's avatar
Ryan Olson committed
14
15
16
17
18
19
20
21
22
23
24
25
//!
//! We might extend namespace to include grouping behavior, which would define groups of
//! components that are tightly coupled.
//!
//! A [Component] is the core building block of a distributed application. It is a logical
//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
//! distributed application.
//!
//! A [Component] can present to the distributed application one or more configuration files
//! which define how that component was constructed/configured and what capabilities it can
//! provide.
//!
Graham King's avatar
Graham King committed
26
//! Other [Component] can write to watched locations within a [Component]'s etcd
Ryan Olson's avatar
Ryan Olson committed
27
28
29
30
31
//! path. This allows the [Component] to take dynamic actions depending on the watch
//! triggers.
//!
//! TODO: Top-level Overview of Endpoints/Functions

32
33
use std::fmt;

34
use crate::{
35
36
    config::HealthStatus,
    distributed::RequestPlaneMode,
37
    metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38
    service::ServiceClient,
39
    service::ServiceSet,
40
};
Ryan Olson's avatar
Ryan Olson committed
41

42
use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
Ryan Olson's avatar
Ryan Olson committed
43

44
use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45
use crate::protocols::EndpointId;
Ryan Olson's avatar
Ryan Olson committed
46
47
48
49
use async_nats::{
    rustls::quic,
    service::{Service, ServiceExt},
};
50
use dashmap::DashMap;
Ryan Olson's avatar
Ryan Olson committed
51
52
53
54
use derive_builder::Builder;
use derive_getters::Getters;
use educe::Educe;
use serde::{Deserialize, Serialize};
55
use std::{collections::HashMap, hash::Hash, sync::Arc};
Ryan Olson's avatar
Ryan Olson committed
56
57
58
use validator::{Validate, ValidationError};

mod client;
59
60
#[allow(clippy::module_inception)]
mod component;
Ryan Olson's avatar
Ryan Olson committed
61
mod endpoint;
Ryan Olson's avatar
Ryan Olson committed
62
mod namespace;
Ryan Olson's avatar
Ryan Olson committed
63
mod registry;
64
pub mod service;
Ryan Olson's avatar
Ryan Olson committed
65

66
pub use client::Client;
67
68
pub(crate) use client::RoutingOccupancyState;
pub(crate) use client::get_or_create_routing_occupancy_state;
69
pub use endpoint::build_transport_type;
70

71
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
Ryan Olson's avatar
Ryan Olson committed
72
73
#[serde(rename_all = "snake_case")]
pub enum TransportType {
74
75
76
77
    #[serde(rename = "nats_tcp")]
    Nats(String),
    Http(String),
    Tcp(String),
Ryan Olson's avatar
Ryan Olson committed
78
79
}

80
81
82
83
84
85
86
/// Kind of device optionally associated with an [`Instance`].
///
/// Serialized in snake_case, i.e. `cpu` and `cuda`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum DeviceType {
    /// Serialized as `cpu`.
    Cpu,
    /// Serialized as `cuda`.
    Cuda,
}

87
88
#[derive(Default)]
pub struct RegistryInner {
89
    pub(crate) services: HashMap<String, Service>,
90
91
}

Ryan Olson's avatar
Ryan Olson committed
92
93
#[derive(Clone)]
pub struct Registry {
94
    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
Ryan Olson's avatar
Ryan Olson committed
95
96
}

97
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
98
pub struct Instance {
Ryan Olson's avatar
Ryan Olson committed
99
100
101
    pub component: String,
    pub endpoint: String,
    pub namespace: String,
102
    pub instance_id: u64,
Ryan Olson's avatar
Ryan Olson committed
103
    pub transport: TransportType,
104
105
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub device_type: Option<DeviceType>,
Ryan Olson's avatar
Ryan Olson committed
106
107
}

108
impl Instance {
109
    pub fn id(&self) -> u64 {
110
        self.instance_id
111
    }
112
113
114
115
116
117
118
    pub fn endpoint_id(&self) -> EndpointId {
        EndpointId {
            namespace: self.namespace.clone(),
            component: self.component.clone(),
            name: self.endpoint.clone(),
        }
    }
119
120
}

121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/// Formats an instance as `namespace/component/endpoint/instance_id`.
impl fmt::Display for Instance {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            namespace,
            component,
            endpoint,
            instance_id,
            ..
        } = self;
        f.write_fmt(format_args!(
            "{}/{}/{}/{}",
            namespace, component, endpoint, instance_id
        ))
    }
}

/// Orders instances by their `Display` rendering, compared as plain strings.
///
/// Because the comparison is textual, the numeric `instance_id` is also compared
/// lexicographically. Fields that `Display` does not print do not take part in the
/// ordering.
impl std::cmp::Ord for Instance {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs = self.to_string();
        let rhs = other.to_string();
        lhs.cmp(&rhs)
    }
}

impl PartialOrd for Instance {
    /// Delegates to [`Ord::cmp`]; the ordering is total, so this never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

Ryan Olson's avatar
Ryan Olson committed
145
/// A [Component] a discoverable entity in the distributed runtime.
Graham King's avatar
Graham King committed
146
147
/// You can host [Endpoint] on a [Component] by first creating
/// a [Service] then adding one or more [Endpoint] to the [Service].
Ryan Olson's avatar
Ryan Olson committed
148
149
///
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
150
#[derive(Educe, Builder, Clone, Validate)]
Ryan Olson's avatar
Ryan Olson committed
151
#[educe(Debug)]
152
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
Ryan Olson's avatar
Ryan Olson committed
153
154
155
pub struct Component {
    #[builder(private)]
    #[educe(Debug(ignore))]
156
    drt: Arc<DistributedRuntime>,
Ryan Olson's avatar
Ryan Olson committed
157
158
159

    /// Name of the component
    #[builder(setter(into))]
160
    #[validate(custom(function = "validate_allowed_chars"))]
Ryan Olson's avatar
Ryan Olson committed
161
162
    name: String,

163
164
165
166
    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

Ryan Olson's avatar
Ryan Olson committed
167
168
169
    // todo - restrict the namespace to a-z0-9-_A-Z
    /// Namespace
    #[builder(setter(into))]
170
    namespace: Namespace,
171

172
173
174
    /// This hierarchy's own metrics registry
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
Ryan Olson's avatar
Ryan Olson committed
175
176
}

177
178
179
180
181
182
183
184
185
/// Hashes the fully qualified namespace name followed by the component name, the same
/// two values that `PartialEq` compares.
impl Hash for Component {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let namespace_name = self.namespace.name();
        Hash::hash(&namespace_name, state);
        Hash::hash(&self.name, state);
    }
}

impl PartialEq for Component {
    fn eq(&self, other: &Self) -> bool {
186
        self.namespace.name() == other.namespace.name() && self.name == other.name
187
188
189
190
191
    }
}

// Marker impl: `PartialEq` for `Component` only compares `String` values (namespace name
// and component name), so equality is a full equivalence relation.
impl Eq for Component {}

192
193
impl std::fmt::Display for Component {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194
        write!(f, "{}.{}", self.namespace.name(), self.name)
195
196
197
    }
}

198
199
200
201
202
203
204
205
206
207
208
209
impl DistributedRuntimeProvider for Component {
    /// Borrows the distributed runtime stored in this component.
    fn drt(&self) -> &DistributedRuntime {
        &*self.drt
    }
}

impl RuntimeProvider for Component {
    /// Returns the [`Runtime`] obtained from this component's distributed runtime.
    fn rt(&self) -> &Runtime {
        self.drt.rt()
    }
}

210
impl MetricsHierarchy for Component {
211
212
213
214
    fn basename(&self) -> String {
        self.name.clone()
    }

215
216
217
218
219
220
221
222
223
224
225
226
227
228
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        let mut parents = vec![];

        // Get all ancestors of namespace (DRT, parent namespaces, etc.)
        parents.extend(self.namespace.parent_hierarchies());

        // Add namespace itself
        parents.push(&self.namespace as &dyn MetricsHierarchy);

        parents
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
229
    }
230
231
232
233

    fn connection_id(&self) -> Option<u64> {
        Some(self.drt.connection_id())
    }
234
235
}

Ryan Olson's avatar
Ryan Olson committed
236
impl Component {
237
    pub fn service_name(&self) -> String {
238
        let service_name = format!("{}_{}", self.namespace.name(), self.name);
239
        Slug::slugify(&service_name).to_string()
Ryan Olson's avatar
Ryan Olson committed
240
241
    }

242
    pub fn namespace(&self) -> &Namespace {
243
244
245
        &self.namespace
    }

246
247
    pub fn name(&self) -> &str {
        &self.name
248
249
    }

250
251
252
253
    pub fn labels(&self) -> &[(String, String)] {
        &self.labels
    }

Ryan Olson's avatar
Ryan Olson committed
254
    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
255
        let endpoint = Endpoint {
Ryan Olson's avatar
Ryan Olson committed
256
257
            component: self.clone(),
            name: endpoint.into(),
258
            labels: Vec::new(),
259
            metrics_registry: crate::MetricsRegistry::new(),
260
261
262
263
264
        };
        // Attach endpoint registry so scrapes traverse separate registries (avoids collisions).
        self.get_metrics_registry()
            .add_child_registry(endpoint.get_metrics_registry());
        endpoint
Ryan Olson's avatar
Ryan Olson committed
265
266
    }

267
    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
268
269
270
271
272
        let discovery = self.drt.discovery();

        let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
            namespace: self.namespace.name(),
            component: self.name.clone(),
273
        };
274
275
276
277
278
279
280
281
282
283
284
285

        let discovery_instances = discovery.list(discovery_query).await?;

        // Extract Instance from DiscoveryInstance::Endpoint wrapper
        let mut instances: Vec<Instance> = discovery_instances
            .into_iter()
            .filter_map(|di| match di {
                crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
                _ => None, // Ignore all other variants (ModelCard, etc.)
            })
            .collect();

286
287
        instances.sort();
        Ok(instances)
Ryan Olson's avatar
Ryan Olson committed
288
289
290
291
    }
}

impl ComponentBuilder {
292
    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
Ryan Olson's avatar
Ryan Olson committed
293
294
        Self::default().drt(drt)
    }
295
296
297

    pub fn build(self) -> Result<Component, anyhow::Error> {
        let component = self.build_internal()?;
298
299
300
        // If this component is using NATS, register the NATS service and wait for completion.
        // This prevents a race condition where serve_endpoint() tries to look up the service
        // before it's registered in the component registry.
301
302
        let drt = component.drt();
        if drt.request_plane().is_nats() {
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
            let mut rx = drt.register_nats_service(component.clone());
            // Wait synchronously for the NATS service registration to complete.
            // Uses block_in_place() to safely call blocking_recv() from async contexts.
            // This temporarily moves the current task off the runtime thread to allow
            // blocking without deadlocking the runtime.
            let result = tokio::task::block_in_place(|| rx.blocking_recv());
            match result {
                Some(Ok(())) => {
                    tracing::debug!(
                        component = component.service_name(),
                        "NATS service registration completed"
                    );
                }
                Some(Err(e)) => {
                    return Err(anyhow::anyhow!(
                        "NATS service registration failed for component '{}': {}",
                        component.service_name(),
                        e
                    ));
                }
                None => {
                    return Err(anyhow::anyhow!(
                        "NATS service registration channel closed unexpectedly for component '{}'",
                        component.service_name()
                    ));
                }
            }
330
331
332
        }
        Ok(component)
    }
Ryan Olson's avatar
Ryan Olson committed
333
334
335
336
337
338
339
340
341
}

#[derive(Debug, Clone)]
pub struct Endpoint {
    component: Component,

    // todo - restrict alphabet
    /// Endpoint name
    name: String,
342

343
344
    /// Additional labels for metrics
    labels: Vec<(String, String)>,
345
346
347

    /// This hierarchy's own metrics registry
    metrics_registry: crate::MetricsRegistry,
Ryan Olson's avatar
Ryan Olson committed
348
349
}

350
351
352
353
354
355
356
357
358
/// Hashes the owning component followed by the endpoint name, the same two values that
/// `PartialEq` compares.
impl Hash for Endpoint {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        Hash::hash(&self.component, state);
        Hash::hash(&self.name, state);
    }
}

impl PartialEq for Endpoint {
    fn eq(&self, other: &Self) -> bool {
359
        self.component == other.component && self.name == other.name
360
361
362
363
364
    }
}

// Marker impl: `PartialEq` for `Endpoint` compares the component (itself `Eq`) and the
// endpoint name, so equality is a full equivalence relation.
impl Eq for Endpoint {}

365
366
367
368
369
370
371
372
373
374
375
376
impl DistributedRuntimeProvider for Endpoint {
    /// Returns the distributed runtime of the component hosting this endpoint.
    fn drt(&self) -> &DistributedRuntime {
        self.component.drt()
    }
}

impl RuntimeProvider for Endpoint {
    /// Returns the [`Runtime`] of the component hosting this endpoint.
    fn rt(&self) -> &Runtime {
        self.component.rt()
    }
}

377
impl MetricsHierarchy for Endpoint {
378
379
380
381
    fn basename(&self) -> String {
        self.name.clone()
    }

382
383
384
385
386
387
388
389
390
391
392
393
394
395
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        let mut parents = vec![];

        // Get all ancestors of component (DRT, Namespace, etc.)
        parents.extend(self.component.parent_hierarchies());

        // Add component itself
        parents.push(&self.component as &dyn MetricsHierarchy);

        parents
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
396
    }
397
398
399
400

    fn connection_id(&self) -> Option<u64> {
        Some(self.component.drt().connection_id())
    }
401
402
}

Ryan Olson's avatar
Ryan Olson committed
403
impl Endpoint {
404
405
406
407
408
409
410
411
    pub fn id(&self) -> EndpointId {
        EndpointId {
            namespace: self.component.namespace().name().to_string(),
            component: self.component.name().to_string(),
            name: self.name().to_string(),
        }
    }

Ryan Olson's avatar
Ryan Olson committed
412
413
414
415
    pub fn name(&self) -> &str {
        &self.name
    }

416
417
418
419
    pub fn component(&self) -> &Component {
        &self.component
    }

420
    pub async fn client(&self) -> anyhow::Result<client::Client> {
421
        client::Client::new(self.clone()).await
Ryan Olson's avatar
Ryan Olson committed
422
423
424
425
426
427
428
    }

    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
    }
}

429
#[derive(Builder, Clone, Validate)]
Ryan Olson's avatar
Ryan Olson committed
430
431
432
#[builder(pattern = "owned")]
pub struct Namespace {
    #[builder(private)]
433
    runtime: Arc<DistributedRuntime>,
Ryan Olson's avatar
Ryan Olson committed
434

435
    #[validate(custom(function = "validate_allowed_chars"))]
Ryan Olson's avatar
Ryan Olson committed
436
    name: String,
437

438
439
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,
440
441
442
443

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,
444
445
446
447

    /// This hierarchy's own metrics registry
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
448
449
450
451
452
453
454

    /// Cache for components to avoid duplicate registrations and metrics collisions.
    /// When the same component is requested multiple times, we return the cached instance
    /// to ensure all endpoints share the same Component and MetricsRegistry.
    /// Uses DashMap for lock-free reads and automatic handling of concurrent inserts.
    #[builder(default = "Arc::new(DashMap::new())")]
    component_cache: Arc<DashMap<String, Component>>,
Ryan Olson's avatar
Ryan Olson committed
455
456
}

457
458
459
460
461
462
impl DistributedRuntimeProvider for Namespace {
    /// Borrows the distributed runtime stored in this namespace.
    fn drt(&self) -> &DistributedRuntime {
        &*self.runtime
    }
}

463
464
465
466
impl std::fmt::Debug for Namespace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
467
468
            "Namespace {{ name: {}; parent: {:?} }}",
            self.name, self.parent
469
470
471
472
        )
    }
}

473
474
475
476
477
478
impl RuntimeProvider for Namespace {
    /// Returns the [`Runtime`] obtained from this namespace's distributed runtime.
    fn rt(&self) -> &Runtime {
        self.runtime.rt()
    }
}

479
480
481
482
483
484
/// Displays only this namespace's own name segment; the parent prefix is not included
/// (use `Namespace::name` for the dotted, fully qualified form).
impl std::fmt::Display for Namespace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.name)
    }
}

Ryan Olson's avatar
Ryan Olson committed
485
impl Namespace {
486
    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
487
        let ns = NamespaceBuilder::default()
488
            .runtime(Arc::new(runtime))
Ryan Olson's avatar
Ryan Olson committed
489
            .name(name)
490
491
492
493
494
495
            .build()?;
        // Attach namespace registry so scrapes traverse separate registries (avoids collisions).
        ns.drt()
            .get_metrics_registry()
            .add_child_registry(ns.get_metrics_registry());
        Ok(ns)
Ryan Olson's avatar
Ryan Olson committed
496
497
    }

498
    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
499
500
501
502
    ///
    /// Components are cached by name to ensure that multiple calls with the same name
    /// return the same Component instance. This prevents duplicate metrics registrations
    /// and ensures all endpoints share the same Component's MetricsRegistry.
503
    pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
504
505
506
507
508
509
510
511
512
        let name = name.into();

        // Fast path: Check if component exists in cache
        // DashMap provides lock-free reads via internal sharding
        if let Some(cached) = self.component_cache.get(&name) {
            return Ok(cached.value().clone());
        }

        // Slow path: Create new component
513
        let component = ComponentBuilder::from_runtime(self.runtime.clone())
514
            .name(&name)
515
            .namespace(self.clone())
516
            .build()?;
517

518
519
520
        // Attach component registry so scrapes traverse separate registries (avoids collisions).
        self.get_metrics_registry()
            .add_child_registry(component.get_metrics_registry());
521
522
523
524
525
526

        // Cache the component for future calls
        // DashMap handles race conditions internally - if another thread
        // inserted the same key concurrently, we just use our created component
        self.component_cache.insert(name, component.clone());

527
        Ok(component)
Ryan Olson's avatar
Ryan Olson committed
528
    }
Ryan Olson's avatar
Ryan Olson committed
529

530
    /// Create a [`Namespace`] in the parent namespace
531
    pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
532
        let child = NamespaceBuilder::default()
533
534
535
            .runtime(self.runtime.clone())
            .name(name.into())
            .parent(Some(Arc::new(self.clone())))
536
537
538
539
540
            .build()?;
        // Attach child namespace registry so scrapes traverse separate registries (avoids collisions).
        self.get_metrics_registry()
            .add_child_registry(child.get_metrics_registry());
        Ok(child)
541
542
543
544
545
546
547
    }

    pub fn name(&self) -> String {
        match &self.parent {
            Some(parent) => format!("{}.{}", parent.name(), self.name),
            None => self.name.clone(),
        }
Ryan Olson's avatar
Ryan Olson committed
548
    }
Ryan Olson's avatar
Ryan Olson committed
549
550
551
552
553
554
555
556
557
558
559
560
561
}

/// Custom validator: accepts a non-empty name made up solely of lowercase ASCII letters
/// (`a-z`), ASCII digits (`0-9`), `-` and `_`.
///
/// This accepts exactly the strings matched by the pattern `^[a-z0-9-_]+$`, but checks
/// them with a single pass over the bytes instead of compiling a regular expression on
/// every call (which also removes the `unwrap()` on regex construction).
///
/// # Errors
/// Returns a [`ValidationError`] with code `invalid_characters` when `input` is empty or
/// contains any other character (uppercase letters, `.`, whitespace, non-ASCII, ...).
fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
    // Every byte of a multi-byte UTF-8 sequence is >= 0x80, so non-ASCII text can never
    // pass this per-byte check.
    let is_allowed = |byte: u8| matches!(byte, b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_');

    if !input.is_empty() && input.bytes().all(is_allowed) {
        Ok(())
    } else {
        Err(ValidationError::new("invalid_characters"))
    }
}