cancel_safe_futures/future/
join_all_then_try.rs

1//! Definition of the `JoinAllThenTry` adapter, waiting for all of a list of
2//! futures to finish with either success or error.
3
4#[cfg(not(futures_no_atomic_cas))]
5use crate::stream::{CollectThenTry, TryStreamExt};
6use crate::support::assert_future;
7use alloc::{boxed::Box, vec::Vec};
8use core::{
9    fmt,
10    future::Future,
11    iter::FromIterator,
12    mem,
13    pin::Pin,
14    task::{Context, Poll},
15};
16use futures_core::future::TryFuture;
17use futures_util::future::{IntoFuture, MaybeDone, TryFutureExt};
18#[cfg(not(futures_no_atomic_cas))]
19use futures_util::stream::FuturesOrdered;
20
// Threshold used by `join_all_then_try` when picking a strategy: an input iterator whose
// `size_hint` upper bound is at most this value is stored in a boxed slice, while anything
// larger (or with no known upper bound) is driven through `FuturesOrdered`.
#[cfg(not(futures_no_atomic_cas))]
pub(crate) const SMALL: usize = 30;
23
24/// Future for the [`join_all_then_try`] function.
25#[must_use = "futures do nothing unless you `.await` or poll them"]
26pub struct JoinAllThenTry<F>
27where
28    F: TryFuture,
29{
30    kind: JoinAllKindThenTry<F>,
31}
32
33enum JoinAllKindThenTry<F>
34where
35    F: TryFuture,
36{
37    Small {
38        elems: Pin<Box<[MaybeDone<IntoFuture<F>>]>>,
39    },
40    #[cfg(not(futures_no_atomic_cas))]
41    Big {
42        // The use of FuturesOrdered here ensures that in case of errors, the first future listed in
43        // the iterator that errors out will be returned.
44        fut: CollectThenTry<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,
45    },
46}
47
48impl<F> fmt::Debug for JoinAllThenTry<F>
49where
50    F: TryFuture + fmt::Debug,
51    F::Ok: fmt::Debug,
52    F::Error: fmt::Debug,
53    F::Output: fmt::Debug,
54{
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        match self.kind {
57            JoinAllKindThenTry::Small { ref elems } => f
58                .debug_struct("JoinAllThenTry")
59                .field("elems", elems)
60                .finish(),
61            #[cfg(not(futures_no_atomic_cas))]
62            JoinAllKindThenTry::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
63        }
64    }
65}
66
67/// Creates a future which represents either a collection of the results of the futures given or an
68/// error.
69///
70/// The returned future will drive execution for all of its underlying futures, collecting the
71/// results into a destination `Vec<T>` in the same order as they were provided.
72///
73/// Unlike [`futures::future::try_join_all`], if any future returns an error then all other futures
74/// will **not** be canceled. Instead, all other futures will be run to completion.
75///
76/// * If all futures complete successfully, then the returned future will succeed with a
77///   `Vec` of all the successful results.
78/// * If one or more futures fail, then the returned future will error out with the error
79///   for the first future listed that failed.
80///
81/// This function is only available when the `std` or `alloc` feature of this library is activated,
82/// and it is activated by default.
83///
84/// # Why use `join_all_then_try`?
85///
86/// See the documentation for [`join_then_try`](crate::join_then_try) for a discussion of why you might
87/// want to use a `then_try` adapter.
88///
89/// # Notes
90///
91/// This adapter does not expose a way to gather and combine all returned errors. Implementing that
92/// is a future goal, but it requires some design work for a generic way to combine errors. To
93/// do that today, use [`futures::future::join_all`] and combine errors at the end.
94///
95/// # See Also
96///
97/// `join_all_then_try` will switch to the more powerful [`FuturesOrdered`] for performance reasons if
98/// the number of futures is large. You may want to look into using it or its counterpart
99/// [`FuturesUnordered`][futures::stream::FuturesUnordered] directly.
100///
101/// Some examples for additional functionality provided by these are:
102///
103///  * Adding new futures to the set even after it has been started.
104///
105///  * Only polling the specific futures that have been woken. In cases where you have a lot of
106///    futures this will result in much more efficient polling.
107///
108/// # Examples
109///
110/// ```
111/// # #[tokio::main(flavor = "current_thread")]
112/// # async fn main() {
113/// use futures_util::future;
114/// use cancel_safe_futures::future::join_all_then_try;
115///
116/// let futures = vec![
117///     future::ok::<u32, u32>(1),
118///     future::ok::<u32, u32>(2),
119///     future::ok::<u32, u32>(3),
120/// ];
121///
122/// assert_eq!(join_all_then_try(futures).await, Ok(vec![1, 2, 3]));
123///
124/// let futures = vec![
125///     future::ok::<u32, u32>(1),
126///     future::err::<u32, u32>(2),
127///     future::ok::<u32, u32>(3),
128/// ];
129///
130/// assert_eq!(join_all_then_try(futures).await, Err(2));
131/// # }
132/// ```
133pub fn join_all_then_try<I>(iter: I) -> JoinAllThenTry<I::Item>
134where
135    I: IntoIterator,
136    I::Item: TryFuture,
137{
138    let iter = iter.into_iter().map(TryFutureExt::into_future);
139
140    #[cfg(futures_no_atomic_cas)]
141    {
142        let kind = JoinAllKindThenTry::Small {
143            elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
144        };
145
146        assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
147            JoinAllThenTry { kind },
148        )
149    }
150
151    #[cfg(not(futures_no_atomic_cas))]
152    {
153        let kind = match iter.size_hint().1 {
154            Some(max) if max <= SMALL => JoinAllKindThenTry::Small {
155                elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
156            },
157            _ => JoinAllKindThenTry::Big {
158                fut: iter.collect::<FuturesOrdered<_>>().collect_then_try(),
159            },
160        };
161
162        assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
163            JoinAllThenTry { kind },
164        )
165    }
166}
167
168impl<F> Future for JoinAllThenTry<F>
169where
170    F: TryFuture,
171{
172    type Output = Result<Vec<F::Ok>, F::Error>;
173
174    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175        enum FinalState {
176            Pending,
177            AllDone,
178        }
179
180        match &mut self.kind {
181            JoinAllKindThenTry::Small { elems } => {
182                let mut state = FinalState::AllDone;
183
184                for elem in iter_pin_mut(elems.as_mut()) {
185                    match elem.poll(cx) {
186                        Poll::Pending => state = FinalState::Pending,
187                        Poll::Ready(()) => {}
188                    }
189                }
190
191                match state {
192                    FinalState::Pending => Poll::Pending,
193                    FinalState::AllDone => {
194                        let mut elems = mem::replace(elems, Box::pin([]));
195                        let results: Result<Vec<_>, _> = iter_pin_mut(elems.as_mut())
196                            .map(|e| e.take_output().unwrap())
197                            .collect();
198                        Poll::Ready(results)
199                    }
200                }
201            }
202            #[cfg(not(futures_no_atomic_cas))]
203            JoinAllKindThenTry::Big { fut } => Pin::new(fut).poll(cx),
204        }
205    }
206}
207
208impl<F> FromIterator<F> for JoinAllThenTry<F>
209where
210    F: TryFuture,
211{
212    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
213        join_all_then_try(iter)
214    }
215}
216
/// Turns a pinned mutable slice into an iterator of pinned mutable references to its elements.
pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
    // SAFETY: the slice is only used to hand out re-pinned references to its elements below;
    // nothing is moved out of it. This is the same reasoning as an ordinary field pin
    // projection, and could only become unsound if `std` decided that `Pin`'s invariants do
    // not have to carry through slices.
    let elems: &mut [T] = unsafe { slice.get_unchecked_mut() };

    // SAFETY: each element belongs to the slice that was pinned on entry, so it is pinned too.
    elems
        .iter_mut()
        .map(|elem| unsafe { Pin::new_unchecked(elem) })
}