Unverified Commit fcb91e4b authored by Graham King's avatar Graham King Committed by GitHub
Browse files

refactor(storage): Remove the stuttering from key_value_store. (#4604)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 7a384793
......@@ -6,7 +6,7 @@ use dynamo_llm::entrypoint::EngineConfig;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::distributed::{DistributedConfig, RequestPlaneMode};
use dynamo_runtime::storage::key_value_store::KeyValueStoreSelect;
use dynamo_runtime::storage::kv;
use dynamo_runtime::transports::nats;
use dynamo_runtime::{DistributedRuntime, Runtime};
......@@ -82,7 +82,7 @@ pub async fn run(
DistributedConfig::process_local()
} else {
// Normal case
let selected_store: KeyValueStoreSelect = flags.store_kv.parse()?;
let selected_store: kv::Selector = flags.store_kv.parse()?;
let request_plane: RequestPlaneMode = flags.request_plane.parse()?;
DistributedConfig {
store_backend: selected_store,
......
......@@ -3,7 +3,7 @@
use dynamo_llm::local_model::LocalModel;
use dynamo_runtime::distributed::{DistributedConfig, RequestPlaneMode};
use dynamo_runtime::storage::key_value_store::KeyValueStoreSelect;
use dynamo_runtime::storage::kv;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use pyo3::IntoPyObjectExt;
......@@ -455,7 +455,7 @@ enum ModelInput {
impl DistributedRuntime {
#[new]
fn new(event_loop: PyObject, store_kv: String, request_plane: String) -> PyResult<Self> {
let selected_kv_store: KeyValueStoreSelect = store_kv.parse().map_err(to_pyerr)?;
let selected_kv_store: kv::Selector = store_kv.parse().map_err(to_pyerr)?;
let request_plane: RequestPlaneMode = request_plane.parse().map_err(to_pyerr)?;
// Try to get existing runtime first, create new Worker only if needed
......
......@@ -21,7 +21,7 @@ use derive_builder::Builder;
use dynamo_runtime::discovery::{Discovery, KVStoreDiscovery};
use dynamo_runtime::logging::make_request_span;
use dynamo_runtime::metrics::prometheus_names::name_prefix;
use dynamo_runtime::storage::key_value_store::KeyValueStoreManager;
use dynamo_runtime::storage::kv;
use std::net::SocketAddr;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
......@@ -31,7 +31,7 @@ use tower_http::trace::TraceLayer;
pub struct State {
metrics: Arc<Metrics>,
manager: Arc<ModelManager>,
store: KeyValueStoreManager,
store: kv::Manager,
discovery_client: Arc<dyn Discovery>,
flags: StateFlags,
}
......@@ -73,7 +73,7 @@ impl StateFlags {
}
impl State {
pub fn new(manager: Arc<ModelManager>, store: KeyValueStoreManager) -> Self {
pub fn new(manager: Arc<ModelManager>, store: kv::Manager) -> Self {
// Initialize discovery backed by KV store
// Create a cancellation token for the discovery's watch streams
let discovery_client = {
......@@ -108,7 +108,7 @@ impl State {
self.manager.clone()
}
pub fn store(&self) -> &KeyValueStoreManager {
pub fn store(&self) -> &kv::Manager {
&self.store
}
......@@ -178,7 +178,7 @@ pub struct HttpServiceConfig {
request_template: Option<RequestTemplate>,
#[builder(default)]
store: KeyValueStoreManager,
store: kv::Manager,
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
#[builder(default = "None")]
......
......@@ -21,7 +21,7 @@ use crate::local_model::runtime_config::ModelRuntimeConfig;
use crate::model_type::{ModelInput, ModelType};
use anyhow::{Context, Result};
use derive_builder::Builder;
use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned};
use dynamo_runtime::{slug::Slug, storage::kv};
use serde::{Deserialize, Serialize};
use tokenizers::Tokenizer as HfTokenizer;
......@@ -543,7 +543,7 @@ impl PartialEq for ModelDeploymentCard {
}
/// A ModelDeploymentCard is published a single time per instance and never updated.
impl Versioned for ModelDeploymentCard {
impl kv::Versioned for ModelDeploymentCard {
fn revision(&self) -> u64 {
0
}
......
......@@ -17,7 +17,6 @@ use crate::{
AddressedPushRouter, AddressedRequest, AsyncEngine, Data, ManyOut, PushRouter, RouterMode,
SingleIn,
},
storage::key_value_store::{KeyValueStoreManager, WatchEvent},
traits::DistributedRuntimeProvider,
transports::etcd::Client as EtcdClient,
};
......
......@@ -15,7 +15,6 @@ use crate::{
distributed::RequestPlaneMode,
pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
protocols::EndpointId,
storage::key_value_store,
traits::DistributedRuntimeProvider,
transports::nats,
};
......
......@@ -12,19 +12,19 @@ use tokio_util::sync::CancellationToken;
use super::{
Discovery, DiscoveryEvent, DiscoveryInstance, DiscoveryQuery, DiscoverySpec, DiscoveryStream,
};
use crate::storage::key_value_store::{KeyValueStoreManager, WatchEvent};
use crate::storage::kv;
const INSTANCES_BUCKET: &str = "v1/instances";
const MODELS_BUCKET: &str = "v1/mdc";
/// Discovery implementation backed by a KeyValueStore
/// Discovery implementation backed by a kv::Store
pub struct KVStoreDiscovery {
store: Arc<KeyValueStoreManager>,
store: Arc<kv::Manager>,
cancel_token: CancellationToken,
}
impl KVStoreDiscovery {
pub fn new(store: KeyValueStoreManager, cancel_token: CancellationToken) -> Self {
pub fn new(store: kv::Manager, cancel_token: CancellationToken) -> Self {
Self {
store: Arc::new(store),
cancel_token,
......@@ -184,7 +184,7 @@ impl Discovery for KVStoreDiscovery {
key_path
);
let bucket = self.store.get_or_create_bucket(bucket_name, None).await?;
let key = crate::storage::key_value_store::Key::new(key_path.clone());
let key = kv::Key::new(key_path.clone());
tracing::debug!(
"KVStoreDiscovery::register: Inserting into bucket={}, key={}",
......@@ -251,7 +251,7 @@ impl Discovery for KVStoreDiscovery {
return Ok(());
};
let key = crate::storage::key_value_store::Key::new(key_path.clone());
let key = kv::Key::new(key_path.clone());
// Delete the entry from the bucket
bucket.delete(&key).await?;
......@@ -313,7 +313,7 @@ impl Discovery for KVStoreDiscovery {
// Use the provided cancellation token, or fall back to the default token
let cancel_token = cancel_token.unwrap_or_else(|| self.cancel_token.clone());
// Use the KeyValueStoreManager's watch mechanism
// Use the kv::Manager's watch mechanism
let (_, mut rx) = self.store.clone().watch(
bucket_name,
None, // No TTL
......@@ -324,7 +324,7 @@ impl Discovery for KVStoreDiscovery {
let stream = async_stream::stream! {
while let Some(event) = rx.recv().await {
let discovery_event = match event {
WatchEvent::Put(kv) => {
kv::WatchEvent::Put(kv) => {
// Check if this key matches our prefix
if !Self::matches_prefix(kv.key_str(), &prefix, bucket_name) {
continue;
......@@ -344,7 +344,7 @@ impl Discovery for KVStoreDiscovery {
}
}
}
WatchEvent::Delete(kv) => {
kv::WatchEvent::Delete(kv) => {
let key_str = kv.as_ref();
// Check if this key matches our prefix
if !Self::matches_prefix(key_str, &prefix, bucket_name) {
......@@ -398,7 +398,7 @@ mod tests {
#[tokio::test]
async fn test_kv_store_discovery_register_endpoint() {
let store = KeyValueStoreManager::memory();
let store = kv::Manager::memory();
let cancel_token = CancellationToken::new();
let client = KVStoreDiscovery::new(store, cancel_token);
......@@ -423,7 +423,7 @@ mod tests {
#[tokio::test]
async fn test_kv_store_discovery_list() {
let store = KeyValueStoreManager::memory();
let store = kv::Manager::memory();
let cancel_token = CancellationToken::new();
let client = KVStoreDiscovery::new(store, cancel_token);
......@@ -478,7 +478,7 @@ mod tests {
#[tokio::test]
async fn test_kv_store_discovery_watch() {
let store = KeyValueStoreManager::memory();
let store = kv::Manager::memory();
let cancel_token = CancellationToken::new();
let client = Arc::new(KVStoreDiscovery::new(store, cancel_token.clone()));
......
......@@ -5,10 +5,7 @@ use crate::component::{Component, Instance};
use crate::pipeline::PipelineError;
use crate::pipeline::network::manager::NetworkManager;
use crate::service::{ComponentNatsServerPrometheusMetrics, ServiceClient, ServiceSet};
use crate::storage::key_value_store::{
EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
MemoryStore,
};
use crate::storage::kv::{self, Store as _};
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{
component::{self, ComponentBuilder, Endpoint, Namespace},
......@@ -48,7 +45,7 @@ pub struct DistributedRuntime {
runtime: Runtime,
nats_client: Option<transports::nats::Client>,
store: KeyValueStoreManager,
store: kv::Manager,
network_manager: Arc<NetworkManager>,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
......@@ -104,15 +101,15 @@ impl DistributedRuntime {
let runtime_clone = runtime.clone();
let store = match selected_kv_store {
KeyValueStoreSelect::Etcd(etcd_config) => {
kv::Selector::Etcd(etcd_config) => {
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
// The returned error doesn't show because of a dropped runtime error, so
// log it first.
tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
KeyValueStoreManager::etcd(etcd_client)
kv::Manager::etcd(etcd_client)
}
KeyValueStoreSelect::File(root) => KeyValueStoreManager::file(root),
KeyValueStoreSelect::Memory => KeyValueStoreManager::memory(),
kv::Selector::File(root) => kv::Manager::file(root),
kv::Selector::Memory => kv::Manager::memory(),
};
let nats_client = match nats_config {
......@@ -377,7 +374,7 @@ impl DistributedRuntime {
/// An interface to store things outside of the process. Usually backed by something like etcd.
/// Currently does key-value, but will grow to include whatever we need to store.
pub fn store(&self) -> &KeyValueStoreManager {
pub fn store(&self) -> &kv::Manager {
&self.store
}
......@@ -546,7 +543,7 @@ async fn nats_metrics_worker(
#[derive(Dissolve)]
pub struct DistributedConfig {
pub store_backend: KeyValueStoreSelect,
pub store_backend: kv::Selector,
pub nats_config: Option<nats::ClientOptions>,
pub request_plane: RequestPlaneMode,
}
......@@ -555,7 +552,7 @@ impl DistributedConfig {
pub fn from_settings() -> DistributedConfig {
let request_plane = RequestPlaneMode::from_env();
DistributedConfig {
store_backend: KeyValueStoreSelect::Etcd(Box::default()),
store_backend: kv::Selector::Etcd(Box::default()),
nats_config: if request_plane.is_nats() {
Some(nats::ClientOptions::default())
} else {
......@@ -572,7 +569,7 @@ impl DistributedConfig {
};
let request_plane = RequestPlaneMode::from_env();
DistributedConfig {
store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
store_backend: kv::Selector::Etcd(Box::new(etcd_config)),
nats_config: if request_plane.is_nats() {
Some(nats::ClientOptions::default())
} else {
......@@ -586,7 +583,7 @@ impl DistributedConfig {
/// same process.
pub fn process_local() -> DistributedConfig {
DistributedConfig {
store_backend: KeyValueStoreSelect::Memory,
store_backend: kv::Selector::Memory,
nats_config: None,
// This won't be used in process local, so we likely need a "none" option to
// communicate that and avoid opening the ports.
......@@ -666,11 +663,11 @@ pub mod distributed_test_utils {
/// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
#[cfg(feature = "integration")]
pub async fn create_test_drt_async() -> super::DistributedRuntime {
use crate::{storage::key_value_store::KeyValueStoreSelect, transports::nats};
use crate::{storage::kv, transports::nats};
let rt = crate::Runtime::from_current().unwrap();
let config = super::DistributedConfig {
store_backend: KeyValueStoreSelect::Memory,
store_backend: kv::Selector::Memory,
nats_config: Some(nats::ClientOptions::default()),
request_plane: crate::distributed::RequestPlaneMode::default(),
};
......
......@@ -53,10 +53,6 @@ pub use system_health::{HealthCheckTarget, SystemHealth};
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;
use crate::{
metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore,
};
use component::Endpoint;
use utils::GracefulShutdownTracker;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub mod key_value_store;
pub mod kv;
......@@ -112,8 +112,8 @@ pub enum WatchEvent {
}
#[async_trait]
pub trait KeyValueStore: Send + Sync {
type Bucket: KeyValueBucket + Send + Sync + 'static;
pub trait Store: Send + Sync {
type Bucket: Bucket + Send + Sync + 'static;
async fn get_or_create_bucket(
&self,
......@@ -130,7 +130,7 @@ pub trait KeyValueStore: Send + Sync {
}
#[derive(Clone, Debug, Default)]
pub enum KeyValueStoreSelect {
pub enum Selector {
// Box it because it is significantly bigger than the other variants
Etcd(Box<etcd_transport::ClientOptions>),
File(PathBuf),
......@@ -139,23 +139,23 @@ pub enum KeyValueStoreSelect {
// Nats not listed because likely we want to remove that impl. It is not currently used and not well tested.
}
impl fmt::Display for KeyValueStoreSelect {
impl fmt::Display for Selector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
KeyValueStoreSelect::Etcd(opts) => {
Selector::Etcd(opts) => {
let urls = opts.etcd_url.join(",");
write!(f, "Etcd({urls})")
}
KeyValueStoreSelect::File(path) => write!(f, "File({})", path.display()),
KeyValueStoreSelect::Memory => write!(f, "Memory"),
Selector::File(path) => write!(f, "File({})", path.display()),
Selector::Memory => write!(f, "Memory"),
}
}
}
impl FromStr for KeyValueStoreSelect {
impl FromStr for Selector {
type Err = anyhow::Error;
fn from_str(s: &str) -> anyhow::Result<KeyValueStoreSelect> {
fn from_str(s: &str) -> anyhow::Result<Selector> {
match s {
"etcd" => Ok(Self::Etcd(Box::default())),
"file" => {
......@@ -170,16 +170,16 @@ impl FromStr for KeyValueStoreSelect {
}
}
impl TryFrom<String> for KeyValueStoreSelect {
impl TryFrom<String> for Selector {
type Error = anyhow::Error;
fn try_from(s: String) -> anyhow::Result<KeyValueStoreSelect> {
fn try_from(s: String) -> anyhow::Result<Selector> {
s.parse()
}
}
#[allow(clippy::large_enum_variant)]
pub enum KeyValueStoreEnum {
enum KeyValueStoreEnum {
Memory(MemoryStore),
Nats(NATSStore),
Etcd(EtcdStore),
......@@ -192,7 +192,7 @@ impl KeyValueStoreEnum {
bucket_name: &str,
// auto-delete items older than this
ttl: Option<Duration>,
) -> Result<Box<dyn KeyValueBucket>, StoreError> {
) -> Result<Box<dyn Bucket>, StoreError> {
use KeyValueStoreEnum::*;
Ok(match self {
Memory(x) => Box::new(x.get_or_create_bucket(bucket_name, ttl).await?),
......@@ -202,28 +202,25 @@ impl KeyValueStoreEnum {
})
}
async fn get_bucket(
&self,
bucket_name: &str,
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError> {
async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Box<dyn Bucket>>, StoreError> {
use KeyValueStoreEnum::*;
let maybe_bucket: Option<Box<dyn KeyValueBucket>> = match self {
let maybe_bucket: Option<Box<dyn Bucket>> = match self {
Memory(x) => x
.get_bucket(bucket_name)
.await?
.map(|b| Box::new(b) as Box<dyn KeyValueBucket>),
.map(|b| Box::new(b) as Box<dyn Bucket>),
Nats(x) => x
.get_bucket(bucket_name)
.await?
.map(|b| Box::new(b) as Box<dyn KeyValueBucket>),
.map(|b| Box::new(b) as Box<dyn Bucket>),
Etcd(x) => x
.get_bucket(bucket_name)
.await?
.map(|b| Box::new(b) as Box<dyn KeyValueBucket>),
.map(|b| Box::new(b) as Box<dyn Bucket>),
File(x) => x
.get_bucket(bucket_name)
.await?
.map(|b| Box::new(b) as Box<dyn KeyValueBucket>),
.map(|b| Box::new(b) as Box<dyn Bucket>),
};
Ok(maybe_bucket)
}
......@@ -250,15 +247,15 @@ impl KeyValueStoreEnum {
}
#[derive(Clone)]
pub struct KeyValueStoreManager(pub Arc<KeyValueStoreEnum>);
pub struct Manager(Arc<KeyValueStoreEnum>);
impl Default for KeyValueStoreManager {
impl Default for Manager {
fn default() -> Self {
KeyValueStoreManager::memory()
Manager::memory()
}
}
impl KeyValueStoreManager {
impl Manager {
/// In-memory KeyValueStoreManager for testing
pub fn memory() -> Self {
Self::new(KeyValueStoreEnum::Memory(MemoryStore::new()))
......@@ -272,8 +269,8 @@ impl KeyValueStoreManager {
Self::new(KeyValueStoreEnum::File(FileStore::new(root)))
}
fn new(s: KeyValueStoreEnum) -> KeyValueStoreManager {
KeyValueStoreManager(Arc::new(s))
fn new(s: KeyValueStoreEnum) -> Manager {
Manager(Arc::new(s))
}
pub async fn get_or_create_bucket(
......@@ -281,14 +278,14 @@ impl KeyValueStoreManager {
bucket_name: &str,
// auto-delete items older than this
ttl: Option<Duration>,
) -> Result<Box<dyn KeyValueBucket>, StoreError> {
) -> Result<Box<dyn Bucket>, StoreError> {
self.0.get_or_create_bucket(bucket_name, ttl).await
}
pub async fn get_bucket(
&self,
bucket_name: &str,
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError> {
) -> Result<Option<Box<dyn Bucket>>, StoreError> {
self.0.get_bucket(bucket_name).await
}
......@@ -397,7 +394,7 @@ impl KeyValueStoreManager {
/// An online storage for key-value config values.
#[async_trait]
pub trait KeyValueBucket: Send + Sync {
pub trait Bucket: Send + Sync {
/// A bucket is a collection of key/value pairs.
/// Insert a value into a bucket, if it doesn't exist already
/// The Key should be the name of the item, not including the bucket name.
......
......@@ -5,15 +5,12 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::time::Duration;
use crate::{
storage::key_value_store::{Key, KeyValue, WatchEvent},
transports::etcd,
};
use crate::transports::etcd;
use async_stream::stream;
use async_trait::async_trait;
use etcd_client::{Compare, CompareOp, EventType, PutOptions, Txn, TxnOp, WatchOptions};
use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome};
use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
#[derive(Clone)]
pub struct EtcdStore {
......@@ -27,7 +24,7 @@ impl EtcdStore {
}
#[async_trait]
impl KeyValueStore for EtcdStore {
impl Store for EtcdStore {
type Bucket = EtcdBucket;
/// A "bucket" in etcd is a path prefix
......@@ -66,7 +63,7 @@ pub struct EtcdBucket {
}
#[async_trait]
impl KeyValueBucket for EtcdBucket {
impl Bucket for EtcdBucket {
async fn insert(
&self,
key: &Key,
......
......@@ -21,9 +21,7 @@ use futures::StreamExt;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
use parking_lot::Mutex;
use crate::storage::key_value_store::KeyValue;
use super::{Key, KeyValueBucket, KeyValueStore, StoreError, StoreOutcome, WatchEvent};
use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
/// How long until a key expires. We keep the keys alive by touching the files.
/// 10s is the same as our etcd lease expiry.
......@@ -100,7 +98,7 @@ impl FileStore {
}
#[async_trait]
impl KeyValueStore for FileStore {
impl Store for FileStore {
type Bucket = Directory;
/// A "bucket" is a directory
......@@ -278,7 +276,7 @@ impl fmt::Display for Directory {
}
#[async_trait]
impl KeyValueBucket for Directory {
impl Bucket for Directory {
/// Write a file to the directory
async fn insert(
&self,
......@@ -471,9 +469,7 @@ fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
mod tests {
use std::collections::HashSet;
use crate::storage::key_value_store::{
FileStore, Key, KeyValueBucket as _, KeyValueStore as _,
};
use crate::storage::kv::{Bucket as _, FileStore, Key, Store as _};
#[tokio::test]
async fn test_entries_full_path() {
......
......@@ -11,9 +11,7 @@ use async_trait::async_trait;
use rand::Rng as _;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use crate::storage::key_value_store::{Key, KeyValue, WatchEvent};
use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome};
use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
#[derive(Clone, Debug)]
enum MemoryEvent {
......@@ -71,7 +69,7 @@ impl MemoryStore {
}
#[async_trait]
impl KeyValueStore for MemoryStore {
impl Store for MemoryStore {
type Bucket = MemoryBucketRef;
async fn get_or_create_bucket(
......@@ -112,7 +110,7 @@ impl KeyValueStore for MemoryStore {
}
#[async_trait]
impl KeyValueBucket for MemoryBucketRef {
impl Bucket for MemoryBucketRef {
async fn insert(
&self,
key: &Key,
......@@ -233,12 +231,9 @@ impl KeyValueBucket for MemoryBucketRef {
#[cfg(test)]
mod tests {
use crate::storage::kv::{Bucket as _, Key, MemoryStore, Store as _};
use std::collections::HashSet;
use crate::storage::key_value_store::{
Key, KeyValueBucket as _, KeyValueStore as _, MemoryStore,
};
#[tokio::test]
async fn test_entries_full_path() {
let m = MemoryStore::new();
......
......@@ -3,17 +3,12 @@
use std::{collections::HashMap, pin::Pin, time::Duration};
use crate::{
protocols::EndpointId,
slug::Slug,
storage::key_value_store::{Key, KeyValue, WatchEvent},
transports::nats::Client,
};
use crate::{protocols::EndpointId, slug::Slug, storage::kv, transports::nats::Client};
use async_nats::jetstream::kv::Operation;
use async_trait::async_trait;
use futures::StreamExt;
use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome};
use super::{Bucket, Store, StoreError, StoreOutcome};
#[derive(Clone)]
pub struct NATSStore {
......@@ -26,7 +21,7 @@ pub struct NATSBucket {
}
#[async_trait]
impl KeyValueStore for NATSStore {
impl Store for NATSStore {
type Bucket = NATSBucket;
async fn get_or_create_bucket(
......@@ -120,10 +115,10 @@ impl NATSStore {
}
#[async_trait]
impl KeyValueBucket for NATSBucket {
impl Bucket for NATSBucket {
async fn insert(
&self,
key: &Key,
key: &kv::Key,
value: bytes::Bytes,
revision: u64,
) -> Result<StoreOutcome, StoreError> {
......@@ -134,14 +129,14 @@ impl KeyValueBucket for NATSBucket {
}
}
async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
async fn get(&self, key: &kv::Key) -> Result<Option<bytes::Bytes>, StoreError> {
self.nats_store
.get(key)
.await
.map_err(|e| StoreError::NATSError(e.to_string()))
}
async fn delete(&self, key: &Key) -> Result<(), StoreError> {
async fn delete(&self, key: &kv::Key) -> Result<(), StoreError> {
self.nats_store
.delete(key)
.await
......@@ -150,7 +145,8 @@ impl KeyValueBucket for NATSBucket {
async fn watch(
&self,
) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
) -> Result<Pin<Box<dyn futures::Stream<Item = kv::WatchEvent> + Send + 'life0>>, StoreError>
{
let watch_stream = self
.nats_store
.watch_all()
......@@ -165,15 +161,15 @@ impl KeyValueBucket for NATSBucket {
>| async move {
match maybe_entry {
Ok(entry) => {
let key = Key::new(entry.key);
let key = kv::Key::new(entry.key);
Some(match entry.operation {
Operation::Put => {
let item = KeyValue::new(key, entry.value);
WatchEvent::Put(item)
let item = kv::KeyValue::new(key, entry.value);
kv::WatchEvent::Put(item)
}
Operation::Delete => WatchEvent::Delete(key),
Operation::Delete => kv::WatchEvent::Delete(key),
// TODO: What is Purge? Not urgent, NATS impl not used
Operation::Purge => WatchEvent::Delete(key),
Operation::Purge => kv::WatchEvent::Delete(key),
})
}
Err(e) => {
......@@ -186,7 +182,7 @@ impl KeyValueBucket for NATSBucket {
))
}
async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
async fn entries(&self) -> Result<HashMap<kv::Key, bytes::Bytes>, StoreError> {
let mut key_stream = self
.nats_store
.keys()
......@@ -195,7 +191,7 @@ impl KeyValueBucket for NATSBucket {
let mut out = HashMap::new();
while let Some(Ok(key)) = key_stream.next().await {
if let Ok(Some(entry)) = self.nats_store.entry(&key).await {
out.insert(Key::new(key), entry.value);
out.insert(kv::Key::new(key), entry.value);
}
}
Ok(out)
......@@ -203,7 +199,7 @@ impl KeyValueBucket for NATSBucket {
}
impl NATSBucket {
async fn create(&self, key: &Key, value: bytes::Bytes) -> Result<StoreOutcome, StoreError> {
async fn create(&self, key: &kv::Key, value: bytes::Bytes) -> Result<StoreOutcome, StoreError> {
match self.nats_store.create(&key, value).await {
Ok(revision) => Ok(StoreOutcome::Created(revision)),
Err(err) if err.kind() == async_nats::jetstream::kv::CreateErrorKind::AlreadyExists => {
......@@ -226,7 +222,7 @@ impl NATSBucket {
async fn update(
&self,
key: &Key,
key: &kv::Key,
value: bytes::Bytes,
revision: u64,
) -> Result<StoreOutcome, StoreError> {
......@@ -246,7 +242,7 @@ impl NATSBucket {
/// and try the update again.
async fn resync_update(
&self,
key: &Key,
key: &kv::Key,
value: bytes::Bytes,
) -> Result<StoreOutcome, StoreError> {
match self.nats_store.entry(key).await {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment