cancel_safe_futures/future/join_all_then_try.rs
1//! Definition of the `JoinAllThenTry` adapter, waiting for all of a list of
2//! futures to finish with either success or error.
3
4#[cfg(not(futures_no_atomic_cas))]
5use crate::stream::{CollectThenTry, TryStreamExt};
6use crate::support::assert_future;
7use alloc::{boxed::Box, vec::Vec};
8use core::{
9 fmt,
10 future::Future,
11 iter::FromIterator,
12 mem,
13 pin::Pin,
14 task::{Context, Poll},
15};
16use futures_core::future::TryFuture;
17use futures_util::future::{IntoFuture, MaybeDone, TryFutureExt};
18#[cfg(not(futures_no_atomic_cas))]
19use futures_util::stream::FuturesOrdered;
20
/// Largest upper-bound size hint for which [`join_all_then_try`] stores its futures in a pinned
/// boxed slice. Iterators whose upper bound is larger, or unknown, are driven through
/// `FuturesOrdered` instead.
#[cfg(not(futures_no_atomic_cas))]
pub(crate) const SMALL: usize = 30;
23
24/// Future for the [`join_all_then_try`] function.
25#[must_use = "futures do nothing unless you `.await` or poll them"]
26pub struct JoinAllThenTry<F>
27where
28 F: TryFuture,
29{
30 kind: JoinAllKindThenTry<F>,
31}
32
33enum JoinAllKindThenTry<F>
34where
35 F: TryFuture,
36{
37 Small {
38 elems: Pin<Box<[MaybeDone<IntoFuture<F>>]>>,
39 },
40 #[cfg(not(futures_no_atomic_cas))]
41 Big {
42 // The use of FuturesOrdered here ensures that in case of errors, the first future listed in
43 // the iterator that errors out will be returned.
44 fut: CollectThenTry<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,
45 },
46}
47
48impl<F> fmt::Debug for JoinAllThenTry<F>
49where
50 F: TryFuture + fmt::Debug,
51 F::Ok: fmt::Debug,
52 F::Error: fmt::Debug,
53 F::Output: fmt::Debug,
54{
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self.kind {
57 JoinAllKindThenTry::Small { ref elems } => f
58 .debug_struct("JoinAllThenTry")
59 .field("elems", elems)
60 .finish(),
61 #[cfg(not(futures_no_atomic_cas))]
62 JoinAllKindThenTry::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
63 }
64 }
65}
66
67/// Creates a future which represents either a collection of the results of the futures given or an
68/// error.
69///
70/// The returned future will drive execution for all of its underlying futures, collecting the
71/// results into a destination `Vec<T>` in the same order as they were provided.
72///
73/// Unlike [`futures::future::try_join_all`], if any future returns an error then all other futures
74/// will **not** be canceled. Instead, all other futures will be run to completion.
75///
76/// * If all futures complete successfully, then the returned future will succeed with a
77/// `Vec` of all the successful results.
78/// * If one or more futures fail, then the returned future will error out with the error
79/// for the first future listed that failed.
80///
81/// This function is only available when the `std` or `alloc` feature of this library is activated,
82/// and it is activated by default.
83///
84/// # Why use `join_all_then_try`?
85///
86/// See the documentation for [`join_then_try`](crate::join_then_try) for a discussion of why you might
87/// want to use a `then_try` adapter.
88///
89/// # Notes
90///
91/// This adapter does not expose a way to gather and combine all returned errors. Implementing that
92/// is a future goal, but it requires some design work for a generic way to combine errors. To
93/// do that today, use [`futures::future::join_all`] and combine errors at the end.
94///
95/// # See Also
96///
97/// `join_all_then_try` will switch to the more powerful [`FuturesOrdered`] for performance reasons if
98/// the number of futures is large. You may want to look into using it or its counterpart
99/// [`FuturesUnordered`][futures::stream::FuturesUnordered] directly.
100///
101/// Some examples for additional functionality provided by these are:
102///
103/// * Adding new futures to the set even after it has been started.
104///
105/// * Only polling the specific futures that have been woken. In cases where you have a lot of
106/// futures this will result in much more efficient polling.
107///
108/// # Examples
109///
110/// ```
111/// # #[tokio::main(flavor = "current_thread")]
112/// # async fn main() {
113/// use futures_util::future;
114/// use cancel_safe_futures::future::join_all_then_try;
115///
116/// let futures = vec![
117/// future::ok::<u32, u32>(1),
118/// future::ok::<u32, u32>(2),
119/// future::ok::<u32, u32>(3),
120/// ];
121///
122/// assert_eq!(join_all_then_try(futures).await, Ok(vec![1, 2, 3]));
123///
124/// let futures = vec![
125/// future::ok::<u32, u32>(1),
126/// future::err::<u32, u32>(2),
127/// future::ok::<u32, u32>(3),
128/// ];
129///
130/// assert_eq!(join_all_then_try(futures).await, Err(2));
131/// # }
132/// ```
133pub fn join_all_then_try<I>(iter: I) -> JoinAllThenTry<I::Item>
134where
135 I: IntoIterator,
136 I::Item: TryFuture,
137{
138 let iter = iter.into_iter().map(TryFutureExt::into_future);
139
140 #[cfg(futures_no_atomic_cas)]
141 {
142 let kind = JoinAllKindThenTry::Small {
143 elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
144 };
145
146 assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
147 JoinAllThenTry { kind },
148 )
149 }
150
151 #[cfg(not(futures_no_atomic_cas))]
152 {
153 let kind = match iter.size_hint().1 {
154 Some(max) if max <= SMALL => JoinAllKindThenTry::Small {
155 elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
156 },
157 _ => JoinAllKindThenTry::Big {
158 fut: iter.collect::<FuturesOrdered<_>>().collect_then_try(),
159 },
160 };
161
162 assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
163 JoinAllThenTry { kind },
164 )
165 }
166}
167
168impl<F> Future for JoinAllThenTry<F>
169where
170 F: TryFuture,
171{
172 type Output = Result<Vec<F::Ok>, F::Error>;
173
174 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175 enum FinalState {
176 Pending,
177 AllDone,
178 }
179
180 match &mut self.kind {
181 JoinAllKindThenTry::Small { elems } => {
182 let mut state = FinalState::AllDone;
183
184 for elem in iter_pin_mut(elems.as_mut()) {
185 match elem.poll(cx) {
186 Poll::Pending => state = FinalState::Pending,
187 Poll::Ready(()) => {}
188 }
189 }
190
191 match state {
192 FinalState::Pending => Poll::Pending,
193 FinalState::AllDone => {
194 let mut elems = mem::replace(elems, Box::pin([]));
195 let results: Result<Vec<_>, _> = iter_pin_mut(elems.as_mut())
196 .map(|e| e.take_output().unwrap())
197 .collect();
198 Poll::Ready(results)
199 }
200 }
201 }
202 #[cfg(not(futures_no_atomic_cas))]
203 JoinAllKindThenTry::Big { fut } => Pin::new(fut).poll(cx),
204 }
205 }
206}
207
208impl<F> FromIterator<F> for JoinAllThenTry<F>
209where
210 F: TryFuture,
211{
212 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
213 join_all_then_try(iter)
214 }
215}
216
/// Turns a pinned mutable slice into an iterator over pinned mutable references to its elements.
pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
    // SAFETY: the unpinned slice is only used to hand out references that are pinned again
    // immediately below. This has the same safety as a normal field pin projection; `std` _could_
    // make it unsound only if it decided that Pin's invariants are not required to transmit
    // through slices.
    let elems: &mut [T] = unsafe { slice.get_unchecked_mut() };

    elems.iter_mut().map(|elem| {
        // SAFETY: `elem` comes from a slice that was pinned on entry, per the reasoning above.
        unsafe { Pin::new_unchecked(elem) }
    })
}