DEV Community

x1957

Rust future(stream) timeout

Open the tokio-timer source and you will find Timeout:

#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout<T> {
    value: T,
    delay: Delay,
}

A very simple struct: value is the future or stream we want to put a timeout on, and delay is the timer that fires once the timeout has elapsed (:

Let's look at its implementations of the Future and Stream traits:

impl<T> Future for Timeout<T>
where
    T: Future,
{
    type Item = T::Item;
    type Error = Error<T::Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // First, try polling the future
        match self.value.poll() {
            Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
            Ok(Async::NotReady) => {}
            Err(e) => return Err(Error::inner(e)),
        }

        // Now check the timer
        match self.delay.poll() {
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(_)) => Err(Error::elapsed()),
            Err(e) => Err(Error::timer(e)),
        }
    }
}

impl<T> Stream for Timeout<T>
where
    T: Stream,
{
    type Item = T::Item;
    type Error = Error<T::Error>;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // First, try polling the stream
        match self.value.poll() {
            Ok(Async::Ready(v)) => {
                if v.is_some() {
                    self.delay.reset_timeout();
                }
                return Ok(Async::Ready(v));
            }
            Ok(Async::NotReady) => {}
            Err(e) => return Err(Error::inner(e)),
        }

        // Now check the timer
        match self.delay.poll() {
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(_)) => {
                self.delay.reset_timeout();
                Err(Error::elapsed())
            }
            Err(e) => Err(Error::timer(e)),
        }
    }
}

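Both implementations first poll value as usual and return immediately if it is ready (or if it fails). Only when value is not ready do they poll delay to check for a timeout: if the delay has not fired yet they return NotReady, and if it has fired they return the elapsed error. The Stream version additionally resets the delay each time an item is yielded and after each timeout, so the timeout applies per item rather than to the stream as a whole.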
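The polling order described above can be modeled with the standard library alone. This is only a minimal sketch, not tokio-timer's code: Async and Error are simplified stand-ins for the futures 0.1 types, Countdown is a hypothetical toy future, and a plain Instant deadline replaces Delay (so there is no timer error case and no task wake-up).

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for the futures 0.1 `Async` type.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Simplified stand-in for the timeout error (no timer-failure variant).
#[derive(Debug, PartialEq)]
enum Error<E> {
    Inner(E),
    Elapsed,
}

/// Hypothetical toy future: reports NotReady `polls_left` times, then "done".
struct Countdown {
    polls_left: u32,
}

impl Countdown {
    fn poll(&mut self) -> Result<Async<&'static str>, ()> {
        if self.polls_left == 0 {
            Ok(Async::Ready("done"))
        } else {
            self.polls_left -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Toy wrapper: an `Instant` deadline plays the role of `Delay`.
struct Timeout {
    value: Countdown,
    deadline: Instant,
}

impl Timeout {
    fn new(value: Countdown, timeout: Duration) -> Self {
        Timeout { value, deadline: Instant::now() + timeout }
    }

    fn poll(&mut self) -> Result<Async<&'static str>, Error<()>> {
        // First, poll the wrapped value; a ready value always wins.
        match self.value.poll() {
            Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
            Ok(Async::NotReady) => {}
            Err(e) => return Err(Error::Inner(e)),
        }

        // Only then check whether the deadline has passed.
        if Instant::now() >= self.deadline {
            Err(Error::Elapsed)
        } else {
            Ok(Async::NotReady)
        }
    }
}
```

Because value is polled before the deadline is checked, a value that is already ready is returned even if the deadline has passed; the elapsed error only appears when value reports NotReady.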

That is all the logic it takes to implement a timeout (:

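To use the timeout conveniently, you will probably need to bring the extension trait into scope with use tokio::prelude::FutureExt; which provides this method: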

fn timeout(self, timeout: Duration) -> Timeout<Self>
where
    Self: Sized,
{
    Timeout::new(self, timeout)
}

You can also call Timeout::new yourself, but xxx.timeout(...) reads a bit more nicely.
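The extension-trait pattern behind xxx.timeout(...) can be sketched without tokio. Everything here is an illustrative assumption: this Timeout only stores a Duration instead of a Delay, TimeoutExt is a hypothetical name, and the blanket impl covers every sized type, whereas the real FutureExt is implemented only for futures.

```rust
use std::time::Duration;

/// Toy wrapper that just records the value and the timeout.
struct Timeout<T> {
    value: T,
    timeout: Duration,
}

impl<T> Timeout<T> {
    fn new(value: T, timeout: Duration) -> Self {
        Timeout { value, timeout }
    }
}

/// Hypothetical extension trait: the default method forwards to `Timeout::new`.
trait TimeoutExt: Sized {
    fn timeout(self, timeout: Duration) -> Timeout<Self> {
        Timeout::new(self, timeout)
    }
}

/// Blanket impl so that any sized value gains the `.timeout(..)` method
/// (the real trait is restricted to futures).
impl<T> TimeoutExt for T {}
```

With the trait in scope, "job".timeout(Duration::from_secs(1)) builds the same wrapper as Timeout::new("job", Duration::from_secs(1)); the method form simply chains more naturally at the end of a combinator pipeline.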
