// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! cargo test --test soak integration::main --features integration
//!
//! It sends batches of concurrent requests to the runtime until the configured run duration
//! elapses, printing the elapsed time and the cumulative request count so that throughput can
//! be derived. Each batch completes before the next one starts.
//!
//! It does not record per-request latency; instead, a request fails the test if it takes longer
//! than 5s to obtain a response stream or longer than 30s to receive any single stream item.
//!
//! Without any configuration the test runs for 3s with batches of 100 requests.
//!
//! A reasonable soak test configuration to start off is 1 minute duration with 10000 batch load:
//! export DYN_QUEUED_UP_PROCESSING=true
//! export DYN_SOAK_BATCH_LOAD=10000
//! export DYN_SOAK_RUN_DURATION=60s
//! cargo test --test soak integration::main --features integration -- --nocapture

#[cfg(feature = "integration")]
mod integration {

Neelay Shah's avatar
Neelay Shah committed
18
    pub const DEFAULT_NAMESPACE: &str = "dynamo";
19

Neelay Shah's avatar
Neelay Shah committed
20
    use dynamo_runtime::{
21
        DistributedRuntime, Runtime, Worker,
22
23
        config::environment_names::testing as env_testing,
        logging,
24
        pipeline::{
25
26
            AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
            SingleIn, async_trait, network::Ingress,
27
28
        },
        protocols::annotated::Annotated,
29
        stream,
30
    };
31
32

    use anyhow::{Context, Result};
33
    use futures::StreamExt;
34
35
    use std::{
        sync::Arc,
36
        sync::atomic::{AtomicU64, Ordering},
37
38
        time::Duration,
    };
39
    use tokio::time::Instant;
40
41
42

    #[test]
    fn main() -> Result<()> {
43
        logging::init();
44
45
46
47
48
49
50
51
52
53
54
        let worker = Worker::from_settings()?;
        worker.execute(app)
    }

    async fn app(runtime: Runtime) -> Result<()> {
        let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
        let server = tokio::spawn(backend(distributed.clone()));
        let client = tokio::spawn(client(distributed.clone()));

        client.await??;
        distributed.shutdown();
55
56
57
58
59
60
61
62
        let handler = server.await??;

        // Print final backend counter value
        let final_count = handler.backend_counter.load(Ordering::Relaxed);
        println!(
            "Final RequestHandler backend_counter: {} requests processed",
            final_count
        );
63
64
65
66

        Ok(())
    }

67
68
69
70
    struct RequestHandler {
        backend_counter: AtomicU64,
        queued_up_processing: bool,
    }
71
72

    impl RequestHandler {
73
74
75
76
77
        fn new(queued_up_processing: bool) -> Arc<Self> {
            Arc::new(Self {
                backend_counter: AtomicU64::new(0),
                queued_up_processing,
            })
78
79
80
81
82
83
84
85
        }
    }

    #[async_trait]
    impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
        async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
            let (data, ctx) = input.into_parts();

86
87
88
            // Increment backend counter
            self.backend_counter.fetch_add(1, Ordering::Relaxed);

89
90
91
92
93
            let chars = data
                .chars()
                .map(|c| Annotated::from_data(c.to_string()))
                .collect::<Vec<_>>();

94
95
96
97
98
99
100
101
102
103
104
105
106
107
            if self.queued_up_processing {
                // queued up processing - delayed response to saturate the queue
                let async_stream = async_stream::stream! {
                    for c in chars {
                        yield c;
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                };
                Ok(ResponseStream::new(Box::pin(async_stream), ctx.context()))
            } else {
                // normal processing - immediate response
                let iter_stream = stream::iter(chars);
                Ok(ResponseStream::new(Box::pin(iter_stream), ctx.context()))
            }
108
109
110
        }
    }

111
112
113
    async fn backend(runtime: DistributedRuntime) -> Result<Arc<RequestHandler>> {
        // get the queued up processing setting from env (not delayed)
        let queued_up_processing =
114
            std::env::var(env_testing::DYN_QUEUED_UP_PROCESSING).unwrap_or("false".to_string());
115
116
        let queued_up_processing: bool = queued_up_processing.parse().unwrap_or(false);

117
        // attach an ingress to an engine
118
119
        let handler = RequestHandler::new(queued_up_processing);
        let ingress = Ingress::for_engine(handler.clone())?;
120
121
122

        // // make the ingress discoverable via a component service
        // // we must first create a service, then we can attach one more more endpoints
123
124
125
        let mut component = runtime.namespace(DEFAULT_NAMESPACE)?.component("backend")?;
        component.add_stats_service().await?;
        component
126
127
128
129
            .endpoint("generate")
            .endpoint_builder()
            .handler(ingress)
            .start()
130
131
132
            .await?;

        Ok(handler)
133
134
135
    }

    async fn client(runtime: DistributedRuntime) -> Result<()> {
136
        // get the run duration from env
137
138
        let run_duration =
            std::env::var(env_testing::DYN_SOAK_RUN_DURATION).unwrap_or("3s".to_string());
139
        let run_duration =
140
            humantime::parse_duration(&run_duration).unwrap_or(Duration::from_secs(3));
141

142
143
        let batch_load =
            std::env::var(env_testing::DYN_SOAK_BATCH_LOAD).unwrap_or("100".to_string());
144
        let batch_load: usize = batch_load.parse().unwrap_or(100);
145

146
147
148
149
        let client = runtime
            .namespace(DEFAULT_NAMESPACE)?
            .component("backend")?
            .endpoint("generate")
150
            .client()
151
152
            .await?;

153
        client.wait_for_instances().await?;
154
155
156
157
        let router =
            PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
                .await?;
        let router = Arc::new(router);
158

159
160
161
162
163
164
        let start = Instant::now();
        let mut count = 0;

        loop {
            let mut tasks = Vec::new();
            for _ in 0..batch_load {
165
                let router = router.clone();
166
167
                tasks.push(tokio::spawn(async move {
                    let mut stream = tokio::time::timeout(
168
169
                        Duration::from_secs(5),
                        router.random("hello world".to_string().into()),
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
                    )
                    .await
                    .context("request timed out")??;

                    while let Some(_resp) =
                        tokio::time::timeout(Duration::from_secs(30), stream.next())
                            .await
                            .context("stream timed out")?
                    {}
                    Ok::<(), Error>(())
                }));
            }

            for task in tasks.into_iter() {
                task.await??;
            }

            let elapsed = start.elapsed();
            count += batch_load;
189
190
191
            if count % 1000 == 0 {
                println!("elapsed: {:?}; count: {}", elapsed, count);
            }
192
193
194
195
196

            if elapsed > run_duration {
                println!("done");
                break;
            }
197
198
199
200
201
        }

        Ok(())
    }
}