/* * Copyright 2024-2025 NVIDIA CORPORATION & AFFILIATES * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ use super::{ async_trait, private::Token, Arc, Edge, OnceLock, PipelineError, Service, Sink, Source, }; use crate::pipeline::{PipelineIO, ServiceEngine}; mod base; mod pipeline; mod segment; pub(crate) struct SinkEdge { edge: OnceLock>, } pub struct ServiceBackend { engine: ServiceEngine, inner: SinkEdge, } // todo - use a once lock of a TransportEngine pub struct SegmentSink { engine: OnceLock>, inner: SinkEdge, } #[allow(dead_code)] pub struct EgressPort { engine: Service, } // impl SegmentSink { // pub connect(&self) // } // impl EgressPort // where // Req: PipelineIO + Serialize, // Resp: for<'de> Deserialize<'de> + DataType, // { // } // #[async_trait] // impl AsyncEngine, Annotated> for EgressPort // where // Req: PipelineIO + Serialize, // Resp: for<'de> Deserialize<'de> + DataType, // { // async fn generate(&self, request: Context) -> Result { // // when publish our request, we need to publish it with a subject // // we will use a trait in the future // let tx_subject = "tx-model-subject".to_string(); // let rx_subject = "rx-model-subject".to_string(); // // make a response channel // let (bytes_tx, bytes_rx) = tokio::sync::mpsc::channel::>(16); // // register the bytes_tx sender with the response subject // // let bytes_stream = self.response_subscriber.register(rx_subject, bytes_tx); // // ask network impl for a Sender to the cancellation channel // let request = request // .try_map(|req| bincode::serialize(&req)) // .map_err(|e| { // GenerateError(format!( // "Failed to serialize request in egress port: {}", // e.to_string() // )) // })?; // let (data, context) = request.transfer(()); // let stream_ctx = Arc::new(StreamContext::from(context)); // let shutdown_ctx = stream_ctx.clone(); // let (live_tx, live_rx) = tokio::sync::oneshot::channel::<()>(); // let byte_stream = ReceiverStream::new(bytes_rx); // let decoded = byte_stream // // decode the response // .map(move |item| { // bincode::deserialize::>(&item) // .expect("failed to deserialize response") // }) // .scan(Some(live_tx), move |live_tx, item| { // match item { // Annotated::End => { // // this essentially drops the channel // let _ = live_tx.take(); // } // _ => {} // } // futures::future::ready(Some(item)) // }); // return Ok(ResponseStream::new(Box::pin(decoded), stream_ctx)); // } // }