1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use super::CollectThenTry;
use crate::support::assert_future;
use futures_core::TryStream;

// Blanket implementation: every `TryStream` (sized or unsized) picks up the `TryStreamExt`
// adapters automatically.
impl<S> TryStreamExt for S where S: TryStream + ?Sized {}

/// Alternative adapters for `Result`-returning streams
pub trait TryStreamExt: TryStream {
    /// Attempts to run this stream to completion, executing the provided asynchronous closure for
    /// each element on the stream concurrently as elements become available. Runs the stream to
    /// completion, then exits with:
    ///
    /// - `Ok(())` if all elements were processed successfully.
    /// - `Err(error)` if an error occurred while processing an element. The first error encountered
    ///   is cached and returned.
    ///
    /// This is similar to
    /// [`try_for_each_concurrent`](futures::stream::TryStreamExt::try_for_each_concurrent),
    /// but will continue running the stream to completion even if an error is encountered.
    ///
    /// This method is only available when the `std` or `alloc` feature of this library is
    /// activated, and it is activated by default.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::stream::TryStreamExt;
    /// use tokio::sync::oneshot;
    /// use futures_util::{stream, FutureExt, StreamExt};
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let (tx1, rx1) = oneshot::channel();
    /// let (tx2, rx2) = oneshot::channel();
    /// let (tx3, rx3) = oneshot::channel();
    ///
    /// let stream = stream::iter(vec![rx1, rx2, rx3]);
    /// let fut = stream.map(Ok).for_each_concurrent_then_try(
    ///     /* limit */ 2,
    ///     |rx| async move {
    ///         let res: Result<(), oneshot::error::RecvError> = rx.await;
    ///         res
    ///     }
    /// );
    ///
    /// tx1.send(()).unwrap();
    /// // Drop the second sender so that `rx2` resolves to `Canceled`.
    /// drop(tx2);
    ///
    /// // Unlike `try_for_each_concurrent`, tx3 also needs to be resolved
    /// // before the future will finish execution. This causes `now_or_never` to
    /// // return None.
    /// let mut fut = std::pin::pin!(fut);
    /// assert_eq!(fut.as_mut().now_or_never(), None);
    ///
    /// tx3.send(()).unwrap();
    ///
    /// // The final result is an error because the second future
    /// // resulted in an error.
    /// fut.await.unwrap_err();
    /// # }
    /// ```
    #[cfg(not(futures_no_atomic_cas))]
    #[cfg(feature = "alloc")]
    fn for_each_concurrent_then_try<Fut, F>(
        self,
        limit: impl Into<Option<usize>>,
        f: F,
    ) -> super::ForEachConcurrentThenTry<Self, Fut, F>
    where
        F: FnMut(Self::Ok) -> Fut,
        Fut: core::future::Future<Output = Result<(), Self::Error>>,
        Self: Sized,
    {
        assert_future::<Result<(), Self::Error>, _>(super::ForEachConcurrentThenTry::new(
            self,
            limit.into(),
            f,
        ))
    }

    /// Attempt to transform a stream into a collection, returning a future representing the result
    /// of that computation.
    ///
    /// This adapter will collect all successful results of this stream and collect them into the
    /// specified collection type. Unlike
    /// [`try_collect`](futures::stream::TryStreamExt::try_collect), if an error happens then the
    /// stream will still be run to completion.
    ///
    /// If more than one error is produced, this adapter will return the first error encountered.
    ///
    /// The returned future will be resolved when the stream terminates.
    ///
    /// # Notes
    ///
    /// This adapter does not expose a way to gather and combine all returned errors. Implementing that
    /// is a future goal, but it requires some design work for a generic way to combine errors. To
    /// do that today, use [`futures::StreamExt::collect`] and combine errors at the end.
    ///
    /// # Examples
    ///
    /// This example uses the [`async-stream`](https://docs.rs/async-stream) crate to create a
    /// stream with interspersed `Ok` and `Err` values.
    ///
    /// ```
    /// use cancel_safe_futures::stream::TryStreamExt;
    /// use std::sync::atomic::{AtomicBool, Ordering};
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let end_of_stream = AtomicBool::new(false);
    /// let end_ref = &end_of_stream;
    ///
    /// // This stream generates interspersed Ok and Err values.
    /// let stream = async_stream::stream! {
    ///     for i in 1..=4 {
    ///         yield Ok(i);
    ///     }
    ///     yield Err(5);
    ///     for i in 6..=9 {
    ///         yield Ok(i);
    ///     }
    ///     yield Err(10);
    ///
    ///     end_ref.store(true, Ordering::SeqCst);
    /// };
    ///
    /// let output: Result<Vec<i32>, i32> = stream.collect_then_try().await;
    ///
    /// // The first error encountered is returned.
    /// assert_eq!(output, Err(5));
    ///
    /// // The stream is still run to completion even though it errored out in the middle.
    /// assert!(end_of_stream.load(Ordering::SeqCst));
    /// # }
    /// ```
    fn collect_then_try<C: Default + Extend<Self::Ok>>(self) -> CollectThenTry<Self, C>
    where
        Self: Sized,
    {
        assert_future::<Result<C, Self::Error>, _>(CollectThenTry::new(self))
    }
}