cancel_safe_futures/sink/mod.rs
1//! Alternative extensions for [`Sink`].
2
3use futures_sink::Sink;
4
5mod flush_reserve;
6pub use flush_reserve::FlushReserve;
7
8mod permit;
9pub use permit::Permit;
10
11mod reserve;
12use crate::support::assert_future;
13pub use reserve::Reserve;
14
15/// Extension trait for [`Sink`] that provides alternative adapters.
16pub trait SinkExt<Item>: Sink<Item> {
17 /// A future that completes once an item is ready to be sent to this sink.
18 ///
19 /// The future stays pending until [`Sink::poll_ready`] completes, then returns a [`Permit`]
20 /// which can be used to send an item to a sink.
21 ///
22 /// # Motivation
23 ///
24 /// Consider a select loop that calls the [`send`](futures_util::SinkExt::send) adapter,
25 /// something common while writing async Rust code:
26 ///
27 /// ```
28 /// use futures_util::SinkExt;
29 /// use std::time::Duration;
30 ///
31 /// # #[tokio::main(flavor = "current_thread")]
32 /// # async fn main() -> Result<(), std::convert::Infallible> {
33 /// # /*
34 /// let mut my_sink = /* ... */;
35 /// # */
36 /// # let mut my_sink = futures_util::sink::drain();
37 /// // item must either be an Option<T> or be cloned, otherwise a
38 /// // "value moved here, in previous iteration of loop" error occurs.
39 /// let mut item = "hello".to_owned();
40 ///
41 /// let mut interval = tokio::time::interval(Duration::from_secs(10));
42 /// loop {
43 /// tokio::select! {
44 /// res = my_sink.send(item.clone()) => {
45 /// res?;
46 /// break;
47 /// }
48 /// _ = interval.tick() => {
49 /// continue;
50 /// }
51 /// }
52 /// }
53 ///
54 /// # Ok(()) }
55 /// ```
56 ///
57 /// If `interval.tick()` occurs before `my_sink.send(item.clone())` completes, then it is
58 /// impossible to tell if the item was actually sent to the sink or not, since `send` combines
59 /// [`Sink::poll_ready`], [`Sink::start_send`] and [`Sink::poll_flush`].
60 ///
61 /// `reserve` separates out [`Sink::poll_ready`] from the latter two steps, so that `item` is
62 /// only sent after the stream is ready to accept it. In the above case, this might look
63 /// something like:
64 ///
65 /// ```
66 /// use cancel_safe_futures::SinkExt;
67 /// use std::time::Duration;
68 ///
69 /// # #[tokio::main(flavor = "current_thread")]
70 /// # async fn main() -> Result<(), std::convert::Infallible> {
71 /// # /*
72 /// let mut my_sink = /* ... */;
73 /// # */
74 /// # let mut my_sink = futures_util::sink::drain();
75 /// // item is an Option<T>, and will be set to None once it is sent.
76 /// let mut item = Some("hello".to_owned());
77 ///
78 /// let mut interval = tokio::time::interval(Duration::from_secs(10));
79 /// while item.is_some() {
80 /// tokio::select! {
81 /// res = my_sink.reserve() => {
82 /// let permit = res?;
83 /// permit.send(item.take().unwrap())?.await?;
84 /// break;
85 /// }
86 /// _ = interval.tick() => {
87 /// continue;
88 /// }
89 /// }
90 /// }
91 ///
92 /// # Ok(()) }
93 /// ```
94 fn reserve(&mut self) -> Reserve<'_, Self, Item>
95 where
96 Self: Unpin,
97 {
98 assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(Reserve::new(self))
99 }
100
101 /// A future that completes once the sink is flushed, and an item is ready to be sent to it.
102 ///
103 /// This is similar to [`reserve`](SinkExt::reserve), except it calls
104 /// [`poll_flush`](Sink::poll_flush) on the sink before calling [`poll_ready`](Sink::poll_ready)
105 /// on it.
106 fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item>
107 where
108 Self: Unpin,
109 {
110 assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(FlushReserve::new(self))
111 }
112}
113
114impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}