Unverified Commit 08355da6 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

refactor: move kv store to runtime (#1459)

parent e924a7c7
......@@ -4,11 +4,14 @@
use std::sync::Arc;
use dynamo_runtime::transports::etcd;
use dynamo_runtime::{protocols, slug::Slug};
use dynamo_runtime::{
protocols,
slug::Slug,
storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
};
use serde::{Deserialize, Serialize};
use crate::{
key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
model_card::{self, ModelDeploymentCard},
model_type::ModelType,
};
......
......@@ -18,7 +18,7 @@ pub mod engines;
pub mod gguf;
pub mod http;
pub mod hub;
pub mod key_value_store;
// pub mod key_value_store;
pub mod kv_router;
pub mod local_model;
pub mod mocker;
......
......@@ -5,11 +5,13 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use dynamo_runtime::component::{Component, Endpoint};
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::{
component::{Component, Endpoint},
storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
};
use crate::discovery::ModelEntry;
use crate::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager};
use crate::model_card::{self, ModelDeploymentCard};
use crate::model_type::ModelType;
......
......@@ -21,14 +21,12 @@ use std::time::Duration;
use anyhow::{Context, Result};
use derive_builder::Builder;
use dynamo_runtime::slug::Slug;
use dynamo_runtime::transports::nats;
use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned, transports::nats};
use serde::{Deserialize, Serialize};
use tokenizers::Tokenizer as HfTokenizer;
use url::Url;
use crate::gguf::{Content, ContentConfig, ModelConfigLike};
use crate::key_value_store::Versioned;
use crate::protocols::TokenIdType;
/// If a model deployment card hasn't been refreshed in this much time the worker is likely gone
......
......@@ -29,10 +29,15 @@
//!
//! TODO: Top-level Overview of Endpoints/Functions
use crate::{discovery::Lease, service::ServiceSet};
use crate::{discovery::Lease, service::ServiceSet, transports::etcd::EtcdPath};
use super::{
error, traits::*, transports::nats::Slug, utils::Duration, DistributedRuntime, Result, Runtime,
error,
traits::*,
transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
transports::nats::Slug,
utils::Duration,
DistributedRuntime, Result, Runtime,
};
use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
......@@ -63,6 +68,9 @@ pub use client::{Client, InstanceSource};
/// An instance is namespace+component+endpoint+lease_id and must be unique.
pub const INSTANCE_ROOT_PATH: &str = "instances";
/// The root etcd path where each namespace is registered in etcd.
pub const ETCD_ROOT_PATH: &str = "dynamo://";
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
......@@ -100,17 +108,18 @@ impl Instance {
/// a [Service] then adding one or more [Endpoint] to the [Service].
///
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
#[derive(Educe, Builder, Clone)]
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
#[builder(private)]
#[educe(Debug(ignore))]
drt: DistributedRuntime,
drt: Arc<DistributedRuntime>,
// todo - restrict the namespace to a-z0-9-_A-Z
/// Name of the component
#[builder(setter(into))]
#[validate(custom(function = "validate_allowed_chars"))]
name: String,
// todo - restrict the namespace to a-z0-9-_A-Z
......@@ -176,6 +185,11 @@ impl Component {
format!("{}/{}", self.namespace.name(), self.name)
}
pub fn etcd_path(&self) -> EtcdPath {
EtcdPath::new_component(&self.namespace.name(), &self.name)
.expect("Component name and namespace should be valid")
}
pub fn namespace(&self) -> &Namespace {
&self.namespace
}
......@@ -240,7 +254,7 @@ impl Component {
}
impl ComponentBuilder {
pub fn from_runtime(drt: DistributedRuntime) -> Self {
pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
Self::default().drt(drt)
}
}
......@@ -303,8 +317,14 @@ impl Endpoint {
&self.component
}
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
pub fn path(&self) -> String {
format!("{}/{}", self.component.path(), self.name)
format!(
"{}/{}/{}",
self.component.path(),
ENDPOINT_KEYWORD,
self.name
)
}
/// The endpoint part of an instance path in etcd
......@@ -314,8 +334,18 @@ impl Endpoint {
format!("{component_path}/{endpoint_name}")
}
/// The endpoint as an EtcdPath object
pub fn etcd_path(&self) -> EtcdPath {
EtcdPath::new_endpoint(
&self.component.namespace().name(),
&self.component.name(),
&self.name,
)
.expect("Endpoint name and component name should be valid")
}
/// The fully path of an instance in etcd
pub fn etcd_path(&self, lease_id: i64) -> String {
pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
let endpoint_root = self.etcd_root();
if self.is_static {
endpoint_root
......@@ -324,6 +354,21 @@ impl Endpoint {
}
}
/// The endpoint as an EtcdPath object with lease ID
pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
if self.is_static {
self.etcd_path()
} else {
EtcdPath::new_endpoint_with_lease(
&self.component.namespace().name(),
&self.component.name(),
&self.name,
lease_id,
)
.expect("Endpoint name and component name should be valid")
}
}
pub fn name_with_id(&self, lease_id: i64) -> String {
if self.is_static {
self.name.clone()
......@@ -358,18 +403,19 @@ impl Endpoint {
}
}
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
#[builder(private)]
#[educe(Debug(ignore))]
runtime: DistributedRuntime,
runtime: Arc<DistributedRuntime>,
#[validate()]
#[validate(custom(function = "validate_allowed_chars"))]
name: String,
is_static: bool,
#[builder(default = "None")]
parent: Option<Arc<Namespace>>,
}
impl DistributedRuntimeProvider for Namespace {
......@@ -378,6 +424,16 @@ impl DistributedRuntimeProvider for Namespace {
}
}
impl std::fmt::Debug for Namespace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
self.name, self.is_static, self.parent
)
}
}
impl RuntimeProvider for Namespace {
fn rt(&self) -> &Runtime {
self.runtime.rt()
......@@ -393,7 +449,7 @@ impl std::fmt::Display for Namespace {
impl Namespace {
pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
Ok(NamespaceBuilder::default()
.runtime(runtime)
.runtime(Arc::new(runtime))
.name(name)
.is_static(is_static)
.build()?)
......@@ -408,8 +464,25 @@ impl Namespace {
.build()?)
}
pub fn name(&self) -> &str {
&self.name
/// Create a [`Namespace`] in the parent namespace
pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
Ok(NamespaceBuilder::default()
.runtime(self.runtime.clone())
.name(name.into())
.is_static(self.is_static)
.parent(Some(Arc::new(self.clone())))
.build()?)
}
pub fn etcd_path(&self) -> String {
format!("{}{}", ETCD_ROOT_PATH, self.name())
}
pub fn name(&self) -> String {
match &self.parent {
Some(parent) => format!("{}.{}", parent.name(), self.name),
None => self.name.clone(),
}
}
}
......
......@@ -59,7 +59,10 @@ impl EndpointConfigBuilder {
let lease = lease.or(endpoint.drt().primary_lease());
let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
tracing::debug!("Starting endpoint: {}", endpoint.etcd_path(lease_id));
tracing::debug!(
"Starting endpoint: {}",
endpoint.etcd_path_with_lease_id(lease_id)
);
let service_name = endpoint.component.service_name();
......@@ -124,7 +127,11 @@ impl EndpointConfigBuilder {
if let Some(etcd_client) = &endpoint.component.drt.etcd_client {
if let Err(e) = etcd_client
.kv_create(endpoint.etcd_path(lease_id), info, Some(lease_id))
.kv_create(
endpoint.etcd_path_with_lease_id(lease_id),
info,
Some(lease_id),
)
.await
{
tracing::error!("Failed to register discoverable service: {:?}", e);
......
......@@ -151,6 +151,7 @@ impl DistributedRuntime {
self.nats_client.clone()
}
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
pub fn etcd_client(&self) -> Option<etcd::Client> {
self.etcd_client.clone()
}
......
......@@ -44,6 +44,7 @@ pub mod runnable;
pub mod runtime;
pub mod service;
pub mod slug;
pub mod storage;
pub mod traits;
pub mod transports;
pub mod utils;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub mod key_value_store;
......@@ -22,9 +22,9 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use crate::slug::Slug;
use crate::CancellationToken;
use async_trait::async_trait;
use dynamo_runtime::slug::Slug;
use dynamo_runtime::CancellationToken;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
......@@ -312,7 +312,7 @@ mod tests {
}
fn init() {
dynamo_runtime::logging::init();
crate::logging::init();
}
#[tokio::test]
......
......@@ -17,9 +17,9 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::time::Duration;
use crate::{slug::Slug, transports::etcd::Client};
use async_stream::stream;
use async_trait::async_trait;
use dynamo_runtime::{slug::Slug, transports::etcd::Client};
use etcd_client::{EventType, PutOptions, WatchOptions};
use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome};
......
......@@ -15,8 +15,8 @@
use std::{collections::HashMap, pin::Pin, time::Duration};
use crate::{protocols::Endpoint, slug::Slug, transports::nats::Client};
use async_trait::async_trait;
use dynamo_runtime::{protocols::Endpoint, slug::Slug, transports::nats::Client};
use futures::StreamExt;
use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome};
......
......@@ -32,7 +32,10 @@ pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
use tokio::time::{interval, Duration};
mod lease;
mod path;
use lease::*;
pub use path::*;
//pub use etcd::ConnectOptions as EtcdConnectOptions;
......@@ -130,7 +133,7 @@ impl Client {
}
/// Get a reference to the underlying [`etcd_client::Client`] instance.
pub fn etcd_client(&self) -> &etcd_client::Client {
pub(crate) fn etcd_client(&self) -> &etcd_client::Client {
&self.client
}
......
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Test file for recursive namespace etcd_path functionality
#[allow(unused_imports)]
use dynamo_runtime::{DistributedRuntime, Runtime};
#[cfg(feature = "integration")]
#[test]
fn test_namespace_etcd_path_format() {
// Test that the etcd_path format is correct for the expected use case
// This test verifies the format: dynamo://ns1.ns2.ns3/component/{component.name()}
// Expected format examples:
let single_ns_path = "dynamo://ns1";
let nested_ns_path = "dynamo://ns1.ns2.ns3";
let component_path = "dynamo://ns1.ns2.ns3/_component_/my-component";
// Verify the format matches our requirements
assert!(single_ns_path.starts_with("dynamo://"));
assert!(nested_ns_path.starts_with("dynamo://"));
assert!(nested_ns_path.contains("."));
assert!(component_path.contains("/_component_/"));
// Test the specific format requested in the user query (now with reserved keywords)
let expected_format = "dynamo://ns1.ns2.ns3/_component_/my-component";
assert_eq!(component_path, expected_format);
println!("✅ Namespace etcd_path format verification passed");
println!(" Single namespace: {}", single_ns_path);
println!(" Nested namespace: {}", nested_ns_path);
println!(" Component path: {}", component_path);
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_recursive_namespace_implementation() {
let runtime = Runtime::from_current().unwrap();
let distributed_runtime = DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap();
// Test single namespace
let ns1 = distributed_runtime.namespace("ns1").unwrap();
assert_eq!(ns1.etcd_path(), "dynamo://ns1");
assert_eq!(ns1.name(), "ns1");
// Test nested namespace ns1.ns2
let ns2 = ns1.namespace("ns2").unwrap();
assert_eq!(ns2.etcd_path(), "dynamo://ns1.ns2");
assert_eq!(ns2.name(), "ns1.ns2");
// Test deeply nested namespace ns1.ns2.ns3
let ns3 = ns2.namespace("ns3").unwrap();
assert_eq!(ns3.etcd_path(), "dynamo://ns1.ns2.ns3");
assert_eq!(ns3.name(), "ns1.ns2.ns3");
// Test component in deeply nested namespace
let component = ns3.component("my-component").unwrap();
assert_eq!(
component.etcd_path().to_string(),
"dynamo://ns1.ns2.ns3/_component_/my-component"
);
assert_eq!(component.name(), "my-component");
assert_eq!(component.path(), "ns1.ns2.ns3/my-component");
println!("✅ Actual recursive namespace implementation test passed!");
println!(" Root namespace: {}", ns1.etcd_path());
println!(" Nested namespace: {}", ns2.etcd_path());
println!(" Deep namespace: {}", ns3.etcd_path());
println!(" Component path: {}", component.etcd_path());
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_multiple_branches_recursive_namespaces() {
let runtime = Runtime::from_current().unwrap();
let distributed_runtime = DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap();
// Create root namespace
let root = distributed_runtime.namespace("root").unwrap();
// Create multiple branches
let prod_ns = root.namespace("prod").unwrap();
let staging_ns = root.namespace("staging").unwrap();
// Create services in each branch
let prod_service_ns = prod_ns.namespace("services").unwrap();
let staging_service_ns = staging_ns.namespace("services").unwrap();
// Verify the paths are correct
assert_eq!(prod_service_ns.etcd_path(), "dynamo://root.prod.services");
assert_eq!(
staging_service_ns.etcd_path(),
"dynamo://root.staging.services"
);
// Create components in each branch
let prod_component = prod_service_ns.component("api-gateway").unwrap();
let staging_component = staging_service_ns.component("api-gateway").unwrap();
assert_eq!(
prod_component.etcd_path().to_string(),
"dynamo://root.prod.services/_component_/api-gateway"
);
assert_eq!(
staging_component.etcd_path().to_string(),
"dynamo://root.staging.services/_component_/api-gateway"
);
// Verify they are different
assert_ne!(prod_component.etcd_path(), staging_component.etcd_path());
println!("✅ Multiple branches recursive namespaces test passed!");
println!(" Production: {}", prod_component.etcd_path());
println!(" Staging: {}", staging_component.etcd_path());
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment