soak.rs 7.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16
17
18
19
20
21
22
23
24
25
26
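//! Soak test for the runtime's request/response pipeline.
//!
//! Run with: `cargo test --test soak integration::main --features integration`
//!
//! The test starts a local `backend/generate` endpoint, then repeatedly sends batches
//! of concurrent requests to it until the configured run duration has elapsed,
//! printing the elapsed time and completed-request count so throughput can be derived.
//!
//! Per-request latency is not reported. Instead, a request fails the run if it takes
//! more than 5 s to start streaming, or more than 30 s between response items.
//!
//! Configuration (environment variables):
//! - `DYN_SOAK_RUN_DURATION`: how long to keep sending batches (humantime syntax, default `3s`).
//! - `DYN_SOAK_BATCH_LOAD`: number of concurrent requests per batch (default `100`).
//! - `DYN_QUEUED_UP_PROCESSING`: set to `true` to delay each response item and saturate the queue (default `false`).
//!
//! A reasonable starting configuration is a 1-minute run with a batch load of 10000:
//!
//! ```text
//! export DYN_QUEUED_UP_PROCESSING=true
//! export DYN_SOAK_BATCH_LOAD=10000
//! export DYN_SOAK_RUN_DURATION=60s
//! cargo test --test soak integration::main --features integration -- --nocapture
//! ```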
27
28
29
#[cfg(feature = "integration")]
mod integration {

    /// Namespace under which the soak-test backend component is registered and looked up.
    pub const DEFAULT_NAMESPACE: &str = "dynamo";

    use dynamo_runtime::{
        DistributedRuntime, ErrorContext, Result, Runtime, Worker, logging,
        pipeline::{
            AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
            SingleIn, async_trait, network::Ingress,
        },
        protocols::annotated::Annotated,
        stream,
    };
    use futures::StreamExt;
    use std::{
        str::FromStr,
        sync::Arc,
        sync::atomic::{AtomicU64, Ordering},
        time::Duration,
    };
    use tokio::time::Instant;

    /// Run duration used when `DYN_SOAK_RUN_DURATION` is unset or unparsable.
    const DEFAULT_RUN_DURATION: Duration = Duration::from_secs(3);
    /// Requests per batch used when `DYN_SOAK_BATCH_LOAD` is unset, unparsable or zero.
    const DEFAULT_BATCH_LOAD: usize = 100;
    /// The client prints progress each time the completed-request total crosses a
    /// multiple of this value.
    const PROGRESS_INTERVAL: usize = 1000;
    /// Maximum time to wait for the router to hand back a response stream.
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
    /// Maximum time to wait for each individual item of a response stream.
    const STREAM_ITEM_TIMEOUT: Duration = Duration::from_secs(30);
    /// Pause after each response item when queued-up processing is enabled.
    const QUEUED_RESPONSE_DELAY: Duration = Duration::from_millis(100);

    /// Reads environment variable `key` and parses it as `T`.
    ///
    /// Falls back to `default` when the variable is unset, not valid Unicode,
    /// or fails to parse.
    fn env_or<T: FromStr>(key: &str, default: T) -> T {
        std::env::var(key)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(default)
    }

    /// Test entry point: initializes logging and runs [`app`] on a worker
    /// built from settings.
    #[test]
    fn main() -> Result<()> {
        logging::init();
        let worker = Worker::from_settings()?;
        worker.execute(app)
    }

    /// Runs the backend and the load-generating client against a shared
    /// distributed runtime.
    ///
    /// Once the client finishes, the runtime is shut down and the number of
    /// requests the backend processed is printed.
    async fn app(runtime: Runtime) -> Result<()> {
        let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
        let server = tokio::spawn(backend(distributed.clone()));
        let client = tokio::spawn(client(distributed.clone()));

        // Capture the client outcome before propagating it so that
        // `shutdown()` is issued even when the client task fails or panics.
        let client_result = client.await;
        distributed.shutdown();
        client_result??;

        let handler = server.await??;

        // Print final backend counter value
        let final_count = handler.backend_counter.load(Ordering::Relaxed);
        println!(
            "Final RequestHandler backend_counter: {} requests processed",
            final_count
        );

        Ok(())
    }

    /// Engine that streams its input string back one character per response item.
    struct RequestHandler {
        /// Number of `generate` calls this engine has received.
        backend_counter: AtomicU64,
        /// When true, each response item is followed by [`QUEUED_RESPONSE_DELAY`].
        queued_up_processing: bool,
    }

    impl RequestHandler {
        /// Creates a handler with a zeroed request counter, wrapped in an `Arc`
        /// so it can be shared between the ingress and the caller.
        fn new(queued_up_processing: bool) -> Arc<Self> {
            Arc::new(Self {
                backend_counter: AtomicU64::new(0),
                queued_up_processing,
            })
        }
    }

    #[async_trait]
    impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
        /// Streams the input back as one `Annotated` item per character.
        ///
        /// In queued-up mode each item is followed by a sleep so that requests
        /// stay in flight longer and back up the queue.
        async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
            let (data, ctx) = input.into_parts();

            // Count every request that reaches the engine, regardless of mode.
            self.backend_counter.fetch_add(1, Ordering::Relaxed);

            let chars = data
                .chars()
                .map(|c| Annotated::from_data(c.to_string()))
                .collect::<Vec<_>>();

            if self.queued_up_processing {
                // queued up processing - delayed response to saturate the queue
                let async_stream = async_stream::stream! {
                    for c in chars {
                        yield c;
                        tokio::time::sleep(QUEUED_RESPONSE_DELAY).await;
                    }
                };
                Ok(ResponseStream::new(Box::pin(async_stream), ctx.context()))
            } else {
                // normal processing - immediate response
                let iter_stream = stream::iter(chars);
                Ok(ResponseStream::new(Box::pin(iter_stream), ctx.context()))
            }
        }
    }

    /// Registers a [`RequestHandler`] as the `backend/generate` endpoint and
    /// serves it.
    ///
    /// Returns the handler once the endpoint's `start()` future completes, so
    /// the caller can inspect the request counter.
    async fn backend(runtime: DistributedRuntime) -> Result<Arc<RequestHandler>> {
        // Delayed ("queued up") responses are opt-in: only the exact value
        // `true` enables them; unset or any other value leaves them off.
        let queued_up_processing: bool = env_or("DYN_QUEUED_UP_PROCESSING", false);

        // attach an ingress to an engine
        let handler = RequestHandler::new(queued_up_processing);
        let ingress = Ingress::for_engine(handler.clone())?;

        // make the ingress discoverable via a component service;
        // we must first create a service, then we can attach one or more endpoints
        runtime
            .namespace(DEFAULT_NAMESPACE)?
            .component("backend")?
            .service_builder()
            .create()
            .await?
            .endpoint("generate")
            .endpoint_builder()
            .handler(ingress)
            .start()
            .await?;

        Ok(handler)
    }

    /// Sends batches of concurrent requests to `backend/generate` until the
    /// configured run duration has elapsed.
    ///
    /// Every request must produce a stream within [`REQUEST_TIMEOUT`], and each
    /// stream item must arrive within [`STREAM_ITEM_TIMEOUT`]; any timeout or
    /// request error fails the whole run. Progress and final throughput are
    /// printed to stdout.
    async fn client(runtime: DistributedRuntime) -> Result<()> {
        // get the run duration from env (humantime syntax, e.g. "60s", "5m")
        let run_duration = std::env::var("DYN_SOAK_RUN_DURATION")
            .ok()
            .and_then(|raw| humantime::parse_duration(&raw).ok())
            .unwrap_or(DEFAULT_RUN_DURATION);

        // A batch size of zero would spin without ever sending a request, so
        // treat it the same as an unset or invalid value.
        let batch_load = match env_or("DYN_SOAK_BATCH_LOAD", DEFAULT_BATCH_LOAD) {
            0 => DEFAULT_BATCH_LOAD,
            n => n,
        };

        let client = runtime
            .namespace(DEFAULT_NAMESPACE)?
            .component("backend")?
            .endpoint("generate")
            .client()
            .await?;

        client.wait_for_instances().await?;

        let router =
            PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
                .await?;
        let router = Arc::new(router);

        let start = Instant::now();
        let mut count: usize = 0;

        let elapsed = loop {
            let mut tasks = Vec::with_capacity(batch_load);
            for _ in 0..batch_load {
                let router = router.clone();
                tasks.push(tokio::spawn(async move {
                    let mut stream = tokio::time::timeout(
                        REQUEST_TIMEOUT,
                        router.random("hello world".to_string().into()),
                    )
                    .await
                    .context("request timed out")??;

                    // Drain the response; only completion and timing matter here.
                    while let Some(_resp) =
                        tokio::time::timeout(STREAM_ITEM_TIMEOUT, stream.next())
                            .await
                            .context("stream timed out")?
                    {}
                    Ok::<(), Error>(())
                }));
            }

            for task in tasks {
                task.await??;
            }

            let previous = count;
            count += batch_load;
            let elapsed = start.elapsed();

            // Print whenever the running total crosses a PROGRESS_INTERVAL
            // boundary, so progress is reported even when batch_load does not
            // divide PROGRESS_INTERVAL evenly.
            if count / PROGRESS_INTERVAL > previous / PROGRESS_INTERVAL {
                println!("elapsed: {:?}; count: {}", elapsed, count);
            }

            if elapsed > run_duration {
                println!("done");
                break elapsed;
            }
        };

        // `elapsed > run_duration >= 0` on exit, so the division is well-defined.
        println!(
            "throughput: {:.1} requests/s ({} requests in {:?})",
            count as f64 / elapsed.as_secs_f64(),
            count,
            elapsed
        );

        Ok(())
    }
}