component.rs 21.6 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5

//! The [Component] module defines the top-level API for building distributed applications.
//!
Graham King's avatar
Graham King committed
6
7
8
//! A distributed application consists of a set of [Component] that can host one
//! or more [Endpoint]. Each [Endpoint] is a network-accessible service
//! that can be accessed by other [Component] in the distributed application.
Ryan Olson's avatar
Ryan Olson committed
9
10
11
12
//!
//! A [Component] is made discoverable by registering it with the distributed runtime under
//! a [`Namespace`].
//!
Graham King's avatar
Graham King committed
13
//! A [`Namespace`] is a logical grouping of [Component] that are grouped together.
Ryan Olson's avatar
Ryan Olson committed
14
15
16
17
18
19
20
21
22
23
24
25
//!
//! We might extend namespace to include grouping behavior, which would define groups of
//! components that are tightly coupled.
//!
//! A [Component] is the core building block of a distributed application. It is a logical
//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
//! distributed application.
//!
//! A [Component] can present to the distributed application one or more configuration files
//! which define how that component was constructed/configured and what capabilities it can
//! provide.
//!
Graham King's avatar
Graham King committed
26
//! Other [Component] can write to watching locations within a [Component] etcd
Ryan Olson's avatar
Ryan Olson committed
27
28
29
30
31
//! path. This allows the [Component] to take dynamic actions depending on the watch
//! triggers.
//!
//! TODO: Top-level Overview of Endpoints/Functions

32
33
use std::fmt;

34
use crate::{
35
36
    config::HealthStatus,
    distributed::RequestPlaneMode,
37
    metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38
    service::ServiceClient,
39
    service::ServiceSet,
40
    transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
41
};
Ryan Olson's avatar
Ryan Olson committed
42

Ryan Olson's avatar
Ryan Olson committed
43
use super::{
44
    DistributedRuntime, Runtime,
45
46
47
48
    traits::*,
    transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
    transports::nats::Slug,
    utils::Duration,
Ryan Olson's avatar
Ryan Olson committed
49
};
Ryan Olson's avatar
Ryan Olson committed
50

51
use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
52
use crate::protocols::EndpointId;
53
use crate::service::ComponentNatsServerPrometheusMetrics;
Ryan Olson's avatar
Ryan Olson committed
54
55
56
57
58
59
60
61
use async_nats::{
    rustls::quic,
    service::{Service, ServiceExt},
};
use derive_builder::Builder;
use derive_getters::Getters;
use educe::Educe;
use serde::{Deserialize, Serialize};
62
use service::EndpointStatsHandler;
63
use std::{collections::HashMap, hash::Hash, sync::Arc};
Ryan Olson's avatar
Ryan Olson committed
64
65
66
use validator::{Validate, ValidationError};

mod client;
67
68
#[allow(clippy::module_inception)]
mod component;
Ryan Olson's avatar
Ryan Olson committed
69
mod endpoint;
Ryan Olson's avatar
Ryan Olson committed
70
mod namespace;
Ryan Olson's avatar
Ryan Olson committed
71
mod registry;
72
pub mod service;
Ryan Olson's avatar
Ryan Olson committed
73

74
pub use client::Client;
75

76
/// The root key-value path where each instance registers itself in.
77
/// An instance is namespace+component+endpoint+lease_id and must be unique.
78
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
79

80
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
Ryan Olson's avatar
Ryan Olson committed
81
82
#[serde(rename_all = "snake_case")]
pub enum TransportType {
83
84
85
86
    #[serde(rename = "nats_tcp")]
    Nats(String),
    Http(String),
    Tcp(String),
Ryan Olson's avatar
Ryan Olson committed
87
88
}

89
90
#[derive(Default)]
pub struct RegistryInner {
91
92
93
    pub(crate) services: HashMap<String, Service>,
    pub(crate) stats_handlers:
        HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
94
95
}

Ryan Olson's avatar
Ryan Olson committed
96
97
#[derive(Clone)]
pub struct Registry {
98
    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
Ryan Olson's avatar
Ryan Olson committed
99
100
}

101
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
102
pub struct Instance {
Ryan Olson's avatar
Ryan Olson committed
103
104
105
    pub component: String,
    pub endpoint: String,
    pub namespace: String,
106
    pub instance_id: u64,
Ryan Olson's avatar
Ryan Olson committed
107
108
109
    pub transport: TransportType,
}

110
impl Instance {
111
    pub fn id(&self) -> u64 {
112
        self.instance_id
113
    }
114
115
116
117
118
119
120
    pub fn endpoint_id(&self) -> EndpointId {
        EndpointId {
            namespace: self.namespace.clone(),
            component: self.component.clone(),
            name: self.endpoint.clone(),
        }
    }
121
122
}

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
impl fmt::Display for Instance {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}/{}/{}/{}",
            self.namespace, self.component, self.endpoint, self.instance_id
        )
    }
}

/// Sort by string name
impl std::cmp::Ord for Instance {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.to_string().cmp(&other.to_string())
    }
}

impl PartialOrd for Instance {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Since Ord is fully implemented, the comparison is always total.
        Some(self.cmp(other))
    }
}

Ryan Olson's avatar
Ryan Olson committed
147
/// A [Component] a discoverable entity in the distributed runtime.
Graham King's avatar
Graham King committed
148
149
/// You can host [Endpoint] on a [Component] by first creating
/// a [Service] then adding one or more [Endpoint] to the [Service].
Ryan Olson's avatar
Ryan Olson committed
150
151
///
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
152
#[derive(Educe, Builder, Clone, Validate)]
Ryan Olson's avatar
Ryan Olson committed
153
#[educe(Debug)]
154
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
Ryan Olson's avatar
Ryan Olson committed
155
156
157
pub struct Component {
    #[builder(private)]
    #[educe(Debug(ignore))]
158
    drt: Arc<DistributedRuntime>,
Ryan Olson's avatar
Ryan Olson committed
159
160
161

    /// Name of the component
    #[builder(setter(into))]
162
    #[validate(custom(function = "validate_allowed_chars"))]
Ryan Olson's avatar
Ryan Olson committed
163
164
    name: String,

165
166
167
168
    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

Ryan Olson's avatar
Ryan Olson committed
169
170
171
    // todo - restrict the namespace to a-z0-9-_A-Z
    /// Namespace
    #[builder(setter(into))]
172
    namespace: Namespace,
173

174
175
176
    /// This hierarchy's own metrics registry
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
Ryan Olson's avatar
Ryan Olson committed
177
178
}

179
180
181
182
183
184
185
186
187
impl Hash for Component {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.namespace.name().hash(state);
        self.name.hash(state);
    }
}

impl PartialEq for Component {
    fn eq(&self, other: &Self) -> bool {
188
        self.namespace.name() == other.namespace.name() && self.name == other.name
189
190
191
192
193
    }
}

impl Eq for Component {}

194
195
impl std::fmt::Display for Component {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196
        write!(f, "{}.{}", self.namespace.name(), self.name)
197
198
199
    }
}

200
201
202
203
204
205
206
207
208
209
210
211
impl DistributedRuntimeProvider for Component {
    fn drt(&self) -> &DistributedRuntime {
        &self.drt
    }
}

impl RuntimeProvider for Component {
    fn rt(&self) -> &Runtime {
        self.drt.rt()
    }
}

212
impl MetricsHierarchy for Component {
213
214
215
216
    fn basename(&self) -> String {
        self.name.clone()
    }

217
218
219
220
221
222
223
224
225
226
227
228
229
230
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        let mut parents = vec![];

        // Get all ancestors of namespace (DRT, parent namespaces, etc.)
        parents.extend(self.namespace.parent_hierarchies());

        // Add namespace itself
        parents.push(&self.namespace as &dyn MetricsHierarchy);

        parents
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
231
232
233
    }
}

Ryan Olson's avatar
Ryan Olson committed
234
impl Component {
235
236
    /// The component part of an instance path in key-value store.
    pub fn instance_root(&self) -> String {
237
238
239
        let ns = self.namespace.name();
        let cp = &self.name;
        format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
Ryan Olson's avatar
Ryan Olson committed
240
241
    }

242
    pub fn service_name(&self) -> String {
243
        let service_name = format!("{}_{}", self.namespace.name(), self.name);
244
        Slug::slugify(&service_name).to_string()
Ryan Olson's avatar
Ryan Olson committed
245
246
    }

247
    pub fn path(&self) -> String {
248
        format!("{}/{}", self.namespace.name(), self.name)
249
250
    }

251
252
253
254
255
    pub fn etcd_path(&self) -> EtcdPath {
        EtcdPath::new_component(&self.namespace.name(), &self.name)
            .expect("Component name and namespace should be valid")
    }

256
    pub fn namespace(&self) -> &Namespace {
257
258
259
        &self.namespace
    }

260
261
    pub fn name(&self) -> &str {
        &self.name
262
263
    }

264
265
266
267
    pub fn labels(&self) -> &[(String, String)] {
        &self.labels
    }

Ryan Olson's avatar
Ryan Olson committed
268
269
270
271
    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
        Endpoint {
            component: self.clone(),
            name: endpoint.into(),
272
            labels: Vec::new(),
273
            metrics_registry: crate::MetricsRegistry::new(),
Ryan Olson's avatar
Ryan Olson committed
274
275
276
        }
    }

277
    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
278
279
280
281
282
        let discovery = self.drt.discovery();

        let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
            namespace: self.namespace.name(),
            component: self.name.clone(),
283
        };
284
285
286
287
288
289
290
291
292
293
294
295

        let discovery_instances = discovery.list(discovery_query).await?;

        // Extract Instance from DiscoveryInstance::Endpoint wrapper
        let mut instances: Vec<Instance> = discovery_instances
            .into_iter()
            .filter_map(|di| match di {
                crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
                _ => None, // Ignore all other variants (ModelCard, etc.)
            })
            .collect();

296
297
        instances.sort();
        Ok(instances)
Ryan Olson's avatar
Ryan Olson committed
298
299
    }

300
301
    /// Scrape ServiceSet, which contains NATS stats as well as user defined stats
    /// embedded in data field of ServiceInfo.
302
    async fn scrape_stats(&self, timeout: Duration) -> anyhow::Result<ServiceSet> {
303
        // Debug: scraping stats for component
Ryan Olson's avatar
Ryan Olson committed
304
        let service_name = self.service_name();
305
306
307
308
309
        let Some(service_client) = self
            .drt()
            .nats_client()
            .map(|nc| ServiceClient::new(nc.clone()))
        else {
310
311
            anyhow::bail!("ServiceSet is gathered via NATS, do not call this in non-NATS setups.");
        };
Ryan Olson's avatar
Ryan Olson committed
312
        service_client
313
            .collect_services(&service_name, timeout)
Ryan Olson's avatar
Ryan Olson committed
314
315
316
            .await
    }

317
    /// Add Prometheus metrics for this component's NATS service stats.
318
    ///
319
    /// Starts a background task that periodically requests service statistics from NATS
320
321
322
323
    /// and updates the corresponding Prometheus metrics. The first scrape happens immediately,
    /// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
    /// which should be near or smaller than typical Prometheus scraping intervals to ensure
    /// metrics are fresh when Prometheus collects them.
324
    fn start_scraping_nats_service_component_metrics(&self) -> anyhow::Result<()> {
325
        const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); // Should be <= Prometheus scrape interval
326

327
328
        // If there is another component with the same service name, this will fail.
        let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
329
330
331

        let component_clone = self.clone();

332
333
334
335
        // Start a background task that scrapes stats every 5 seconds
        let m = component_metrics.clone();
        let c = component_clone.clone();

336
337
338
339
340
341
342
343
344
        // Use the DRT's runtime handle to spawn the background task.
        // We cannot use regular `tokio::spawn` here because:
        // 1. This method may be called from contexts without an active Tokio runtime
        //    (e.g., tests that create a DRT in a blocking context)
        // 2. Tests often create a temporary runtime just to build the DRT, then drop it
        // 3. `tokio::spawn` requires being called from within a runtime context
        // By using the DRT's own runtime handle, we ensure the task runs in the
        // correct runtime that will persist for the lifetime of the component.
        c.drt().runtime().secondary().spawn(async move {
345
346
            let timeout = std::time::Duration::from_millis(500);
            let mut interval = tokio::time::interval(MAX_WAIT_MS);
347
348
349
350
351
352
353
354
355
356
357
358
359
360
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                match c.scrape_stats(timeout).await {
                    Ok(service_set) => {
                        m.update_from_service_set(&service_set);
                    }
                    Err(err) => {
                        tracing::error!(
                            "Background scrape failed for {}: {}",
                            c.service_name(),
                            err
                        );
                        m.reset_to_zeros();
361
362
                    }
                }
363

364
365
                interval.tick().await;
            }
366
367
368
369
370
        });

        Ok(())
    }

371
    // Gather NATS metrics
372
    async fn add_stats_service(&mut self) -> anyhow::Result<()> {
373
374
375
376
377
        let service_name = self.service_name();

        // Pre-check to save cost of creating the service, but don't hold the lock
        if self
            .drt
378
            .component_registry()
379
380
381
382
383
384
            .inner
            .lock()
            .await
            .services
            .contains_key(&service_name)
        {
385
386
387
388
            // The NATS service is per component, but it is called from `serve_endpoint`, and there
            // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
            tracing::trace!("Service {service_name} already exists");
            return Ok(());
389
390
391
392
393
394
395
396
397
        }

        let Some(nats_client) = self.drt.nats_client() else {
            anyhow::bail!("Cannot create NATS service without NATS.");
        };
        let description = None;
        let (nats_service, stats_reg) =
            service::build_nats_service(nats_client, self, description).await?;

398
        let mut guard = self.drt.component_registry().inner.lock().await;
399
400
401
402
        if !guard.services.contains_key(&service_name) {
            // Normal case
            guard.services.insert(service_name.clone(), nats_service);
            guard.stats_handlers.insert(service_name.clone(), stats_reg);
403
404
405

            tracing::info!("Added NATS / stats service {service_name}");

406
407
408
409
            drop(guard);
        } else {
            drop(guard);
            let _ = nats_service.stop().await;
410
411
412
            // The NATS service is per component, but it is called from `serve_endpoint`, and there
            // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
            return Ok(());
413
414
        }

415
416
417
418
419
420
        if let Err(err) = self.start_scraping_nats_service_component_metrics() {
            tracing::debug!(
                "Metrics registration failed for '{}': {}",
                self.service_name(),
                err
            );
421
422
        }
        Ok(())
Ryan Olson's avatar
Ryan Olson committed
423
424
425
426
    }
}

impl ComponentBuilder {
427
    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
Ryan Olson's avatar
Ryan Olson committed
428
429
        Self::default().drt(drt)
    }
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444

    pub fn build(self) -> Result<Component, anyhow::Error> {
        let component = self.build_internal()?;
        // If this component is using NATS, gather it's metrics
        if component.drt().request_plane().is_nats() {
            let mut c = component.clone();
            // Start in the background to isolate the async, and because we don't need it yet
            component.drt().runtime().secondary().spawn(async move {
                if let Err(err) = c.add_stats_service().await {
                    tracing::error!(error = %err, component = c.service_name(), "Failed starting stats service");
                }
            });
        }
        Ok(component)
    }
Ryan Olson's avatar
Ryan Olson committed
445
446
447
448
449
450
451
452
453
}

#[derive(Debug, Clone)]
pub struct Endpoint {
    component: Component,

    // todo - restrict alphabet
    /// Endpoint name
    name: String,
454

455
456
    /// Additional labels for metrics
    labels: Vec<(String, String)>,
457
458
459

    /// This hierarchy's own metrics registry
    metrics_registry: crate::MetricsRegistry,
Ryan Olson's avatar
Ryan Olson committed
460
461
}

462
463
464
465
466
467
468
469
470
impl Hash for Endpoint {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.component.hash(state);
        self.name.hash(state);
    }
}

impl PartialEq for Endpoint {
    fn eq(&self, other: &Self) -> bool {
471
        self.component == other.component && self.name == other.name
472
473
474
475
476
    }
}

impl Eq for Endpoint {}

477
478
479
480
481
482
483
484
485
486
487
488
impl DistributedRuntimeProvider for Endpoint {
    fn drt(&self) -> &DistributedRuntime {
        self.component.drt()
    }
}

impl RuntimeProvider for Endpoint {
    fn rt(&self) -> &Runtime {
        self.component.rt()
    }
}

489
impl MetricsHierarchy for Endpoint {
490
491
492
493
    fn basename(&self) -> String {
        self.name.clone()
    }

494
495
496
497
498
499
500
501
502
503
504
505
506
507
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        let mut parents = vec![];

        // Get all ancestors of component (DRT, Namespace, etc.)
        parents.extend(self.component.parent_hierarchies());

        // Add component itself
        parents.push(&self.component as &dyn MetricsHierarchy);

        parents
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
508
509
510
    }
}

Ryan Olson's avatar
Ryan Olson committed
511
impl Endpoint {
512
513
514
515
516
517
518
519
    pub fn id(&self) -> EndpointId {
        EndpointId {
            namespace: self.component.namespace().name().to_string(),
            component: self.component.name().to_string(),
            name: self.name().to_string(),
        }
    }

Ryan Olson's avatar
Ryan Olson committed
520
521
522
523
    pub fn name(&self) -> &str {
        &self.name
    }

524
525
526
527
    pub fn component(&self) -> &Component {
        &self.component
    }

528
    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
529
    pub fn path(&self) -> String {
530
531
532
533
534
535
        format!(
            "{}/{}/{}",
            self.component.path(),
            ENDPOINT_KEYWORD,
            self.name
        )
536
537
    }

538
539
    /// The endpoint part of an instance path in etcd
    pub fn etcd_root(&self) -> String {
540
        let component_path = self.component.instance_root();
541
542
        let endpoint_name = &self.name;
        format!("{component_path}/{endpoint_name}")
Ryan Olson's avatar
Ryan Olson committed
543
544
    }

545
546
547
548
    /// The endpoint as an EtcdPath object
    pub fn etcd_path(&self) -> EtcdPath {
        EtcdPath::new_endpoint(
            &self.component.namespace().name(),
549
            self.component.name(),
550
551
552
553
554
            &self.name,
        )
        .expect("Endpoint name and component name should be valid")
    }

555
    /// The fully path of an instance in etcd
556
    pub fn etcd_path_with_lease_id(&self, lease_id: u64) -> String {
557
558
559
560
        format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id))
    }

    /// Full path of this endpoint with forward slash separators, including lease id
561
    pub fn unique_path(&self, lease_id: u64) -> String {
562
563
564
565
        let ns = self.component.namespace().name();
        let cp = self.component.name();
        let ep = self.name();
        format!("{ns}/{cp}/{ep}/{lease_id:x}")
Ryan Olson's avatar
Ryan Olson committed
566
567
    }

568
569
570
571
572
573
574
575
576
    /// The endpoint as an EtcdPath object with instance ID
    pub fn etcd_path_object_with_lease_id(&self, instance_id: i64) -> EtcdPath {
        EtcdPath::new_endpoint_with_lease(
            &self.component.namespace().name(),
            self.component.name(),
            &self.name,
            instance_id,
        )
        .expect("Endpoint name and component name should be valid")
577
578
    }

579
580
    pub fn name_with_id(&self, instance_id: u64) -> String {
        format!("{}-{:x}", self.name, instance_id)
Ryan Olson's avatar
Ryan Olson committed
581
582
    }

Ryan Olson's avatar
Ryan Olson committed
583
584
585
586
587
    pub fn subject(&self) -> String {
        format!("{}.{}", self.component.service_name(), self.name)
    }

    /// Subject to an instance of the [Endpoint] with a specific lease id
588
    pub fn subject_to(&self, lease_id: u64) -> String {
Ryan Olson's avatar
Ryan Olson committed
589
590
591
592
593
        format!(
            "{}.{}",
            self.component.service_name(),
            self.name_with_id(lease_id)
        )
Ryan Olson's avatar
Ryan Olson committed
594
595
    }

596
    pub async fn client(&self) -> anyhow::Result<client::Client> {
597
        client::Client::new(self.clone()).await
Ryan Olson's avatar
Ryan Olson committed
598
599
600
601
602
603
604
    }

    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
    }
}

605
#[derive(Builder, Clone, Validate)]
Ryan Olson's avatar
Ryan Olson committed
606
607
608
#[builder(pattern = "owned")]
pub struct Namespace {
    #[builder(private)]
609
    runtime: Arc<DistributedRuntime>,
Ryan Olson's avatar
Ryan Olson committed
610

611
    #[validate(custom(function = "validate_allowed_chars"))]
Ryan Olson's avatar
Ryan Olson committed
612
    name: String,
613

614
615
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,
616
617
618
619

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,
620
621
622
623

    /// This hierarchy's own metrics registry
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
Ryan Olson's avatar
Ryan Olson committed
624
625
}

626
627
628
629
630
631
impl DistributedRuntimeProvider for Namespace {
    fn drt(&self) -> &DistributedRuntime {
        &self.runtime
    }
}

632
633
634
635
impl std::fmt::Debug for Namespace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
636
637
            "Namespace {{ name: {}; parent: {:?} }}",
            self.name, self.parent
638
639
640
641
        )
    }
}

642
643
644
645
646
647
impl RuntimeProvider for Namespace {
    fn rt(&self) -> &Runtime {
        self.runtime.rt()
    }
}

648
649
650
651
652
653
impl std::fmt::Display for Namespace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.name)
    }
}

Ryan Olson's avatar
Ryan Olson committed
654
impl Namespace {
655
    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
Ryan Olson's avatar
Ryan Olson committed
656
        Ok(NamespaceBuilder::default()
657
            .runtime(Arc::new(runtime))
Ryan Olson's avatar
Ryan Olson committed
658
659
660
661
            .name(name)
            .build()?)
    }

662
    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
663
    pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
664
        ComponentBuilder::from_runtime(self.runtime.clone())
Ryan Olson's avatar
Ryan Olson committed
665
            .name(name)
666
            .namespace(self.clone())
667
            .build()
Ryan Olson's avatar
Ryan Olson committed
668
    }
Ryan Olson's avatar
Ryan Olson committed
669

670
    /// Create a [`Namespace`] in the parent namespace
671
    pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
672
673
674
675
676
677
678
679
        Ok(NamespaceBuilder::default()
            .runtime(self.runtime.clone())
            .name(name.into())
            .parent(Some(Arc::new(self.clone())))
            .build()?)
    }

    pub fn etcd_path(&self) -> String {
680
        format!("{ETCD_ROOT_PATH}{}", self.name())
681
682
683
684
685
686
687
    }

    pub fn name(&self) -> String {
        match &self.parent {
            Some(parent) => format!("{}.{}", parent.name(), self.name),
            None => self.name.clone(),
        }
Ryan Olson's avatar
Ryan Olson committed
688
    }
Ryan Olson's avatar
Ryan Olson committed
689
690
691
692
693
694
695
696
697
698
699
700
701
}

// Custom validator function
fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
    // Define the allowed character set using a regex
    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();

    if regex.is_match(input) {
        Ok(())
    } else {
        Err(ValidationError::new("invalid_characters"))
    }
}