Skip to main content

cancel_safe_futures/stream/
collect_then_try.rs

1use core::{mem, pin::Pin};
2use futures_core::{
3    future::{FusedFuture, Future},
4    ready,
5    stream::{FusedStream, TryStream},
6    task::{Context, Poll},
7};
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// Future for the [`collect_then_try`](super::TryStreamExt::collect_then_try) method.
12    #[derive(Debug)]
13    #[must_use = "futures do nothing unless you `.await` or poll them"]
14    pub struct CollectThenTry<St: TryStream, C> {
15        #[pin]
16        stream: St,
17        items: C,
18        first_error: Option<St::Error>,
19    }
20}
21
22impl<St: TryStream, C: Default> CollectThenTry<St, C> {
23    pub(super) fn new(s: St) -> Self {
24        Self {
25            stream: s,
26            items: Default::default(),
27            first_error: None,
28        }
29    }
30}
31
32impl<St: TryStream, C> FusedFuture for CollectThenTry<St, C>
33where
34    St: TryStream + FusedStream,
35    C: Default + Extend<St::Ok>,
36{
37    fn is_terminated(&self) -> bool {
38        self.stream.is_terminated()
39    }
40}
41
42impl<St, C> Future for CollectThenTry<St, C>
43where
44    St: TryStream,
45    C: Default + Extend<St::Ok>,
46{
47    type Output = Result<C, St::Error>;
48
49    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50        let mut this = self.project();
51        Poll::Ready(loop {
52            match ready!(this.stream.as_mut().try_poll_next(cx)) {
53                Some(Ok(x)) => this.items.extend(Some(x)),
54                Some(Err(e)) => {
55                    if this.first_error.is_none() {
56                        *this.first_error = Some(e);
57                    }
58                }
59                None => {
60                    if let Some(e) = this.first_error.take() {
61                        break Err(e);
62                    }
63                    break Ok(mem::take(this.items));
64                }
65            }
66        })
67    }
68}