sinks.rs 3.74 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/*
 * Copyright 2024-2025 NVIDIA CORPORATION & AFFILIATES
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

use super::{
    async_trait, private::Token, Arc, Edge, OnceLock, PipelineError, Service, Sink, Source,
};
use crate::pipeline::{PipelineIO, ServiceEngine};

mod base;
mod pipeline;
mod segment;

pub(crate) struct SinkEdge<Resp: PipelineIO> {
    edge: OnceLock<Edge<Resp>>,
}

pub struct ServiceBackend<Req: PipelineIO, Resp: PipelineIO> {
    engine: ServiceEngine<Req, Resp>,
    inner: SinkEdge<Resp>,
}

// todo - use a once lock of a TransportEngine
pub struct SegmentSink<Req: PipelineIO, Resp: PipelineIO> {
    engine: OnceLock<ServiceEngine<Req, Resp>>,
    inner: SinkEdge<Resp>,
}

#[allow(dead_code)]
pub struct EgressPort<Req: PipelineIO, Resp: PipelineIO> {
    engine: Service<Req, Resp>,
}

// impl<Resp: PipelineIO> SegmentSink<Req, Resp> {
//     pub connect(&self)
// }

// impl<Req, Resp> EgressPort<Req, Resp>
// where
//     Req: PipelineIO + Serialize,
//     Resp: for<'de> Deserialize<'de> + DataType,
// {
// }

// #[async_trait]
// impl<Req, Resp> AsyncEngine<Context<Req>, Annotated<Resp>> for EgressPort<Req, Resp>
// where
//     Req: PipelineIO + Serialize,
//     Resp: for<'de> Deserialize<'de> + DataType,
// {
//     async fn generate(&self, request: Context<Req>) -> Result<Resp, GenerateError> {
//         // when publish our request, we need to publish it with a subject
//         // we will use a trait in the future
//         let tx_subject = "tx-model-subject".to_string();

//         let rx_subject = "rx-model-subject".to_string();

//         // make a response channel
//         let (bytes_tx, bytes_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(16);

//         // register the bytes_tx sender with the response subject
//         // let bytes_stream = self.response_subscriber.register(rx_subject, bytes_tx);

//         // ask network impl for a Sender to the cancellation channel

//         let request = request
//             .try_map(|req| bincode::serialize(&req))
//             .map_err(|e| {
//                 GenerateError(format!(
//                     "Failed to serialize request in egress port: {}",
//                     e.to_string()
//                 ))
//             })?;

//         let (data, context) = request.transfer(());

//         let stream_ctx = Arc::new(StreamContext::from(context));

//         let shutdown_ctx = stream_ctx.clone();

//         let (live_tx, live_rx) = tokio::sync::oneshot::channel::<()>();

//         let byte_stream = ReceiverStream::new(bytes_rx);

//         let decoded = byte_stream
//             // decode the response
//             .map(move |item| {
//                 bincode::deserialize::<Annotated<Resp>>(&item)
//                     .expect("failed to deserialize response")
//             })
//             .scan(Some(live_tx), move |live_tx, item| {
//                 match item {
//                     Annotated::End => {
//                         // this essentially drops the channel
//                         let _ = live_tx.take();
//                     }
//                     _ => {}
//                 }
//                 futures::future::ready(Some(item))
//             });

//         return Ok(ResponseStream::new(Box::pin(decoded), stream_ctx));
//     }
// }