cancel_safe_futures/stream/
collect_then_try.rs1use core::{mem, pin::Pin};
2use futures_core::{
3 future::{FusedFuture, Future},
4 ready,
5 stream::{FusedStream, TryStream},
6 task::{Context, Poll},
7};
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[derive(Debug)]
13 #[must_use = "futures do nothing unless you `.await` or poll them"]
14 pub struct CollectThenTry<St: TryStream, C> {
15 #[pin]
16 stream: St,
17 items: C,
18 first_error: Option<St::Error>,
19 }
20}
21
22impl<St: TryStream, C: Default> CollectThenTry<St, C> {
23 pub(super) fn new(s: St) -> Self {
24 Self {
25 stream: s,
26 items: Default::default(),
27 first_error: None,
28 }
29 }
30}
31
32impl<St: TryStream, C> FusedFuture for CollectThenTry<St, C>
33where
34 St: TryStream + FusedStream,
35 C: Default + Extend<St::Ok>,
36{
37 fn is_terminated(&self) -> bool {
38 self.stream.is_terminated()
39 }
40}
41
42impl<St, C> Future for CollectThenTry<St, C>
43where
44 St: TryStream,
45 C: Default + Extend<St::Ok>,
46{
47 type Output = Result<C, St::Error>;
48
49 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50 let mut this = self.project();
51 Poll::Ready(loop {
52 match ready!(this.stream.as_mut().try_poll_next(cx)) {
53 Some(Ok(x)) => this.items.extend(Some(x)),
54 Some(Err(e)) => {
55 if this.first_error.is_none() {
56 *this.first_error = Some(e);
57 }
58 }
59 None => {
60 if let Some(e) = this.first_error.take() {
61 break Err(e);
62 }
63 break Ok(mem::take(this.items));
64 }
65 }
66 })
67 }
68}