Commit 00bc41d8 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

refactor: adding tests and simplifying DeadlineStream (#243)

-   Minor update to DeadlineStream
-   Adding tests
parent 26f6008a
......@@ -24,15 +24,15 @@ use tokio::time::{self, sleep_until, Duration, Instant, Sleep};
pub struct DeadlineStream<S> {
stream: S,
sleep: Pin<Box<Sleep>>,
deadline: Instant,
}
impl<S: Stream + Unpin> Stream for DeadlineStream<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First, check if our sleep future has completed
if Pin::new(&mut self.sleep).poll(cx).is_ready() {
// Check if we've passed the deadline
if Instant::now() >= self.deadline {
// The deadline expired; end the stream now
return Poll::Ready(None);
}
......@@ -43,8 +43,33 @@ impl<S: Stream + Unpin> Stream for DeadlineStream<S> {
}
pub fn until_deadline<S: Stream + Unpin>(stream: S, deadline: Instant) -> DeadlineStream<S> {
DeadlineStream {
stream,
sleep: Box::pin(sleep_until(deadline)),
DeadlineStream { stream, deadline }
}
#[cfg(test)]
mod tests {
use futures::stream::{self, Stream, StreamExt};
use tokio::pin;
use super::*;
#[tokio::test]
async fn test_until_deadline() {
let stream = stream::iter(vec![100, 100, 200]);
let stream = stream.then(|x| {
let sleep = time::sleep(Duration::from_millis(x));
async move {
sleep.await;
x
}
});
let deadline = Instant::now() + Duration::from_millis(300);
let mut result = Vec::new();
pin!(stream);
let mut stream = until_deadline(stream, deadline);
while let Some(x) = stream.next().await {
result.push(x);
}
assert_eq!(result, vec![100, 100]);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment