Rust future(stream) timeout

x1957

x1957

Posted on June 12, 2019

Rust future(stream) timeout

打开tokio-timer可以看到timout

#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout<T> {
    value: T,
    delay: Delay,
}

很简单的结构,value就是我们要加timeout的future或者stream,delay就是多久之后就timeout(:

看看他实现的Future和Stream trait

impl<T> Future for Timeout<T>
where
    T: Future,
{
    type Item = T::Item;
    type Error = Error<T::Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // First, try polling the future
        match self.value.poll() {
            Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
            Ok(Async::NotReady) => {}
            Err(e) => return Err(Error::inner(e)),
        }

        // Now check the timer
        match self.delay.poll() {
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(_)) => Err(Error::elapsed()),
            Err(e) => Err(Error::timer(e)),
        }
    }
}

impl<T> Stream for Timeout<T>
where
    T: Stream,
{
    type Item = T::Item;
    type Error = Error<T::Error>;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // First, try polling the future
        match self.value.poll() {
            Ok(Async::Ready(v)) => {
                if v.is_some() {
                    self.delay.reset_timeout();
                }
                return Ok(Async::Ready(v));
            }
            Ok(Async::NotReady) => {}
            Err(e) => return Err(Error::inner(e)),
        }

        // Now check the timer
        match self.delay.poll() {
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(_)) => {
                self.delay.reset_timeout();
                Err(Error::elapsed())
            }
            Err(e) => Err(Error::timer(e)),
        }
    }

都是正常去poll value,如果ready了就直接返回,如果没有ready,那么再去poll dealy看看是否timeout,如果没有timeout返回notready,如果timeout了那就返回time elapsed error.

非常简单的逻辑实现了timeout(:

如果我要方便的使用timeout可能需要use tokio::prelude::FutureExt;

fn timeout(self, timeout: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        Timeout::new(self, timeout)
    }

其实自己用Timeout::new也可以,但是xxx.timeout看着舒服一些

💖 💪 🙅 🚩
x1957
x1957

Posted on June 12, 2019

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

Rust future(stream) timeout
rust Rust future(stream) timeout

June 12, 2019