/* * Copyright 2024-2025 NVIDIA CORPORATION & AFFILIATES * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ //! Pipeline Error // use async_nats::error::Error as NatsError; pub use anyhow::{anyhow, anyhow as error, bail, ensure, Context, Error, Result}; pub trait PipelineErrorExt { /// Downcast the [`Error`] to a [`PipelineError`] fn try_into_pipeline_error(self) -> Result; /// If the [`Error`] can be downcast to a [`PipelineError`], then the left variant is returned, /// otherwise the right variant is returned. fn either_pipeline_error(self) -> either::Either; } impl PipelineErrorExt for Error { fn try_into_pipeline_error(self) -> Result { self.downcast::() } fn either_pipeline_error(self) -> either::Either { match self.downcast::() { Ok(err) => either::Left(err), Err(err) => either::Right(err), } } } #[derive(Debug, thiserror::Error)] pub enum PipelineError { /// For starter, to remove as code matures. #[error("Generic error: {0}")] Generic(String), /// Edges can only be set once. This error is thrown on subsequent attempts to set an edge.s #[error("Link failed: Edge already set")] EdgeAlreadySet, /// The source node is not connected to an edge. #[error("Disconnected source; no edge on which to send data")] NoEdge, #[error("SegmentSink is not connected to an EgressPort")] NoNetworkEdge, /// In the interim between when a request was made and when the stream was received, the /// requesting task was dropped. This maybe a logic error in the pipeline; and become a /// panic/fatal error in the future. This error is thrown when the `on_data` method of a /// terminating sink either cannot find the `oneshot` channel sender or the corresponding /// receiver was dropped #[error("Unlinked request; initiating request task was dropped or cancelled")] DetatchedStreamReceiver, // In the interim between when a response was made and when the stream was received, the // Sender for the stream was dropped. This maybe a logic error in the pipeline; and become a // panic/fatal error in the future. #[error("Unlinked response; response task was dropped or cancelled")] DetatchedStreamSender, #[error("Serialzation Error: {0}")] SerializationError(String), #[error("Deserialization Error: {0}")] DeserializationError(String), #[error("Failed to issue request to the control plane: {0}")] ControlPlaneRequestError(String), #[error("Failed to establish a streaming connection: {0}")] ConnectionFailed(String), #[error("Generate Error: {0}")] GenerateError(Error), #[error("NATS Request Error: {0}")] NatsRequestError(#[from] NatsError), #[error("NATS Get Stream Error: {0}")] NatsGetStreamError(#[from] NatsError), #[error("NATS Create Stream Error: {0}")] NatsCreateStreamError(#[from] NatsError), #[error("NATS Consumer Error: {0}")] NatsConsumerError(#[from] NatsError), #[error("NATS Batch Error: {0}")] NatsBatchError(#[from] NatsError), #[error("NATS Publish Error: {0}")] NatsPublishError(#[from] NatsError), #[error("NATS Connect Error: {0}")] NatsConnectError(#[from] NatsError), #[error("NATS Subscriber Error: {0}")] NatsSubscriberError(#[from] async_nats::SubscribeError), #[error("Local IP Address Error: {0}")] LocalIpAddressError(#[from] local_ip_address::Error), #[error("Prometheus Error: {0}")] PrometheusError(#[from] prometheus::Error), #[error("Other NATS Error: {0}")] NatsError(#[from] Box), #[error("Two Part Codec Error: {0}")] TwoPartCodec(#[from] TwoPartCodecError), #[error("Serde Json Error: {0}")] SerdeJsonError(#[from] serde_json::Error), #[error("NATS KV Err: {0} for bucket '{1}")] KeyValueError(String, String), } #[derive(Debug, thiserror::Error)] pub enum TwoPartCodecError { #[error("I/O error: {0}")] Io(#[from] std::io::Error), #[error("Message size {0} exceeds the maximum allowed size of {1} bytes")] MessageTooLarge(usize, usize), #[error("Invalid message: {0}")] InvalidMessage(String), #[error("Checksum mismatch")] ChecksumMismatch, }