// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

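//! Soak test for the Dynamo runtime.
//!
//! Starts a `generate` endpoint on a `backend` component, then repeatedly sends batches of
//! concurrent requests to it until the configured run duration has elapsed. Progress (elapsed
//! time and completed request count) is printed periodically, from which throughput can be
//! derived. Latency is not measured directly: each request only has to obtain a response
//! stream within 5 s and receive every streamed item within 30 s.
//!
//! Configuration (environment variables):
//! - `DYN_SOAK_RUN_DURATION`: how long to keep sending batches, in humantime format
//!   (default `3s`).
//! - `DYN_SOAK_BATCH_LOAD`: number of concurrent requests per batch (default `100`).
//! - `DYN_QUEUED_UP_PROCESSING`: if `true`, the backend pauses between streamed items to
//!   saturate the request queue (default `false`).
//!
//! A reasonable soak test configuration to start off is a 1 minute duration with a batch
//! load of 10000:
//!
//! ```sh
//! export DYN_QUEUED_UP_PROCESSING=true
//! export DYN_SOAK_BATCH_LOAD=10000
//! export DYN_SOAK_RUN_DURATION=60s
//! cargo test --test soak integration::main --features integration -- --nocapture
//! ```
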
#[cfg(feature = "integration")]
mod integration {

    pub const DEFAULT_NAMESPACE: &str = "dynamo";

    use dynamo_runtime::{
        DistributedRuntime, Runtime, Worker, logging,
        pipeline::{
            AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
            SingleIn, async_trait, network::Ingress,
        },
        protocols::annotated::Annotated,
        stream,
    };

    use anyhow::{Context, Result};
    use futures::StreamExt;
    use std::{
        sync::Arc,
        sync::atomic::{AtomicU64, Ordering},
        time::Duration,
    };
    use tokio::time::Instant;

    /// Maximum time to wait for the router to hand back a response stream for one request.
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);

    /// Maximum time to wait for each individual item of a response stream.
    const STREAM_ITEM_TIMEOUT: Duration = Duration::from_secs(30);

    /// Maximum time to wait for a backend instance to become visible to the client, so that a
    /// backend which failed to start makes the test fail instead of hanging forever.
    const INSTANCE_WAIT_TIMEOUT: Duration = Duration::from_secs(60);

    /// Pause after each streamed item when `DYN_QUEUED_UP_PROCESSING` is enabled.
    const QUEUED_UP_ITEM_DELAY: Duration = Duration::from_millis(100);

    /// Router type used by the client: `String` requests, `Annotated<String>` responses.
    type Router = PushRouter<String, Annotated<String>>;

    /// Test entry point: initializes logging and runs [`app`] on a worker built from settings.
    #[test]
    fn main() -> Result<()> {
        logging::init();
        let worker = Worker::from_settings()?;
        worker.execute(app)
    }

    /// Runs the backend and the load-generating client concurrently, shuts the distributed
    /// runtime down once the client has finished, and prints how many requests the backend
    /// handled.
    ///
    /// The runtime is shut down even when the client fails. In that case the server task is
    /// aborted and the client's error is returned.
    async fn app(runtime: Runtime) -> Result<()> {
        let distributed = DistributedRuntime::from_settings(runtime).await?;
        let server_task = tokio::spawn(backend(distributed.clone()));
        let client_task = tokio::spawn(client(distributed.clone()));

        // Fold "task panicked / was cancelled" and "client returned Err" into one Result.
        let client_outcome: Result<()> = client_task
            .await
            .context("client task panicked or was cancelled")
            .and_then(|outcome| outcome);

        // Shut down unconditionally so a failing client does not leave the runtime running.
        // The server task is only joined after this point; presumably the endpoint's
        // `start()` returns once the runtime has been shut down.
        distributed.shutdown();

        if let Err(err) = client_outcome {
            // The backend's result is irrelevant once the client has failed; don't wait on it.
            server_task.abort();
            return Err(err);
        }

        let handler = server_task.await??;

        // Print final backend counter value
        let final_count = handler.backend_counter.load(Ordering::Relaxed);
        println!(
            "Final RequestHandler backend_counter: {} requests processed",
            final_count
        );

        Ok(())
    }

    /// Backend engine that streams its input back one character per response item and
    /// counts how many requests it has received.
    struct RequestHandler {
        /// Number of `generate` calls received so far.
        backend_counter: AtomicU64,
        /// When true, `generate` pauses after every streamed item.
        queued_up_processing: bool,
    }

    impl RequestHandler {
        /// Creates a shared handler with a zeroed request counter.
        fn new(queued_up_processing: bool) -> Arc<Self> {
            Arc::new(Self {
                backend_counter: AtomicU64::new(0),
                queued_up_processing,
            })
        }
    }

    #[async_trait]
    impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
        /// Responds with one `Annotated` item per character of the request string.
        async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
            let (data, ctx) = input.into_parts();

            // Count every request, whichever response mode is used.
            self.backend_counter.fetch_add(1, Ordering::Relaxed);

            let chars: Vec<_> = data
                .chars()
                .map(|c| Annotated::from_data(c.to_string()))
                .collect();

            if self.queued_up_processing {
                // Slow responses: pause after every item so in-flight requests pile up and
                // the request queue gets saturated.
                let delayed = async_stream::stream! {
                    for c in chars {
                        yield c;
                        tokio::time::sleep(QUEUED_UP_ITEM_DELAY).await;
                    }
                };
                Ok(ResponseStream::new(Box::pin(delayed), ctx.context()))
            } else {
                // Normal processing: every item is available immediately.
                Ok(ResponseStream::new(
                    Box::pin(stream::iter(chars)),
                    ctx.context(),
                ))
            }
        }
    }

    /// Registers a [`RequestHandler`] as the `generate` endpoint of the `backend` component
    /// in [`DEFAULT_NAMESPACE`] and serves it.
    ///
    /// Returns the handler after the endpoint's `start()` future completes, so the caller
    /// can read its counters.
    async fn backend(runtime: DistributedRuntime) -> Result<Arc<RequestHandler>> {
        let queued_up_processing = env_or("DYN_QUEUED_UP_PROCESSING", false, |raw| {
            raw.parse::<bool>().ok()
        });

        // attach an ingress to an engine
        let handler = RequestHandler::new(queued_up_processing);
        let ingress = Ingress::for_engine(handler.clone())?;

        // Expose the ingress as the `generate` endpoint of the `backend` component so the
        // client can discover and route to it.
        let mut component = runtime.namespace(DEFAULT_NAMESPACE)?.component("backend")?;
        component.add_stats_service().await?;
        component
            .endpoint("generate")
            .endpoint_builder()
            .handler(ingress)
            .start()
            .await?;

        Ok(handler)
    }

    /// Reads environment variable `key` and converts it with `parse`.
    ///
    /// Returns `default` when the variable is unset or not valid Unicode. It also returns
    /// `default` when `parse` rejects the value, printing a warning in that case so a typo
    /// in the soak configuration is not silently ignored.
    fn env_or<T>(key: &str, default: T, parse: impl FnOnce(&str) -> Option<T>) -> T {
        let raw = match std::env::var(key) {
            Ok(raw) => raw,
            Err(_) => return default,
        };
        match parse(raw.as_str()) {
            Some(value) => value,
            None => {
                eprintln!("warning: ignoring invalid {key}={raw:?}; using the default");
                default
            }
        }
    }

    /// Sends one `"hello world"` request to a random backend instance and drains the
    /// response stream. The response items themselves are discarded.
    ///
    /// # Errors
    /// Fails if no response stream arrives within [`REQUEST_TIMEOUT`] or routing fails. It
    /// also fails if any stream item takes longer than [`STREAM_ITEM_TIMEOUT`].
    async fn issue_request(router: Arc<Router>) -> Result<()> {
        let mut responses = tokio::time::timeout(
            REQUEST_TIMEOUT,
            router.random("hello world".to_string().into()),
        )
        .await
        .context("request timed out")??;

        while tokio::time::timeout(STREAM_ITEM_TIMEOUT, responses.next())
            .await
            .context("stream timed out")?
            .is_some()
        {}
        Ok(())
    }

    /// Load generator: sends batches of concurrent requests to the backend until the
    /// configured run duration has elapsed.
    ///
    /// It prints progress whenever the completed count is a multiple of 1000, and prints a
    /// throughput summary at the end. Any failed request aborts the run with an error.
    async fn client(runtime: DistributedRuntime) -> Result<()> {
        let run_duration = env_or("DYN_SOAK_RUN_DURATION", Duration::from_secs(3), |raw| {
            humantime::parse_duration(raw).ok()
        });
        // Zero is rejected: an empty batch would make the loop spin without sending requests.
        let batch_load: usize = env_or("DYN_SOAK_BATCH_LOAD", 100, |raw| {
            raw.parse::<usize>().ok().filter(|&n| n > 0)
        });

        let client = runtime
            .namespace(DEFAULT_NAMESPACE)?
            .component("backend")?
            .endpoint("generate")
            .client()
            .await?;

        tokio::time::timeout(INSTANCE_WAIT_TIMEOUT, client.wait_for_instances())
            .await
            .context("timed out waiting for a backend instance")??;

        let router = Arc::new(Router::from_client(client, Default::default()).await?);

        let start = Instant::now();
        let mut count: usize = 0;

        let elapsed = loop {
            // Launch the whole batch concurrently, then wait for every request to finish.
            let tasks: Vec<_> = (0..batch_load)
                .map(|_| tokio::spawn(issue_request(Arc::clone(&router))))
                .collect();
            for task in tasks {
                task.await??;
            }

            count += batch_load;
            let elapsed = start.elapsed();
            if count % 1000 == 0 {
                println!("elapsed: {:?}; count: {}", elapsed, count);
            }

            if elapsed > run_duration {
                println!("done");
                break elapsed;
            }
        };

        // `elapsed` is strictly greater than `run_duration` (>= 0), so it is non-zero here.
        println!(
            "throughput: {:.1} requests/s ({} requests in {:?})",
            count as f64 / elapsed.as_secs_f64(),
            count,
            elapsed
        );

        Ok(())
    }
}