cancel_safe_futures/sink/
mod.rs

1//! Alternative extensions for [`Sink`].
2
3use futures_sink::Sink;
4
5mod flush_reserve;
6pub use flush_reserve::FlushReserve;
7
8mod permit;
9pub use permit::Permit;
10
11mod reserve;
12use crate::support::assert_future;
13pub use reserve::Reserve;
14
15/// Extension trait for [`Sink`] that provides alternative adapters.
16pub trait SinkExt<Item>: Sink<Item> {
17    /// A future that completes once an item is ready to be sent to this sink.
18    ///
19    /// The future stays pending until [`Sink::poll_ready`] completes, then returns a [`Permit`]
20    /// which can be used to send an item to a sink.
21    ///
22    /// # Motivation
23    ///
24    /// Consider a select loop that calls the [`send`](futures_util::SinkExt::send) adapter,
25    /// something common while writing async Rust code:
26    ///
27    /// ```
28    /// use futures_util::SinkExt;
29    /// use std::time::Duration;
30    ///
31    /// # #[tokio::main(flavor = "current_thread")]
32    /// # async fn main() -> Result<(), std::convert::Infallible> {
33    /// # /*
34    /// let mut my_sink = /* ... */;
35    /// # */
36    /// # let mut my_sink = futures_util::sink::drain();
37    /// // item must either be an Option<T> or be cloned, otherwise a
38    /// // "value moved here, in previous iteration of loop" error occurs.
39    /// let mut item = "hello".to_owned();
40    ///
41    /// let mut interval = tokio::time::interval(Duration::from_secs(10));
42    /// loop {
43    ///     tokio::select! {
44    ///         res = my_sink.send(item.clone()) => {
45    ///             res?;
46    ///             break;
47    ///         }
48    ///         _ = interval.tick() => {
49    ///             continue;
50    ///         }
51    ///     }
52    /// }
53    ///
54    /// # Ok(()) }
55    /// ```
56    ///
57    /// If `interval.tick()` occurs before `my_sink.send(item.clone())` completes, then it is
58    /// impossible to tell if the item was actually sent to the sink or not, since `send` combines
59    /// [`Sink::poll_ready`], [`Sink::start_send`] and [`Sink::poll_flush`].
60    ///
61    /// `reserve` separates out [`Sink::poll_ready`] from the latter two steps, so that `item` is
62    /// only sent after the stream is ready to accept it. In the above case, this might look
63    /// something like:
64    ///
65    /// ```
66    /// use cancel_safe_futures::SinkExt;
67    /// use std::time::Duration;
68    ///
69    /// # #[tokio::main(flavor = "current_thread")]
70    /// # async fn main() -> Result<(), std::convert::Infallible> {
71    /// # /*
72    /// let mut my_sink = /* ... */;
73    /// # */
74    /// # let mut my_sink = futures_util::sink::drain();
75    /// // item is an Option<T>, and will be set to None once it is sent.
76    /// let mut item = Some("hello".to_owned());
77    ///
78    /// let mut interval = tokio::time::interval(Duration::from_secs(10));
79    /// while item.is_some() {
80    ///     tokio::select! {
81    ///         res = my_sink.reserve() => {
82    ///             let permit = res?;
83    ///             permit.send(item.take().unwrap())?.await?;
84    ///             break;
85    ///         }
86    ///         _ = interval.tick() => {
87    ///             continue;
88    ///         }
89    ///     }
90    /// }
91    ///
92    /// # Ok(()) }
93    /// ```
94    fn reserve(&mut self) -> Reserve<'_, Self, Item>
95    where
96        Self: Unpin,
97    {
98        assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(Reserve::new(self))
99    }
100
101    /// A future that completes once the sink is flushed, and an item is ready to be sent to it.
102    ///
103    /// This is similar to [`reserve`](SinkExt::reserve), except it calls
104    /// [`poll_flush`](Sink::poll_flush) on the sink before calling [`poll_ready`](Sink::poll_ready)
105    /// on it.
106    fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item>
107    where
108        Self: Unpin,
109    {
110        assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(FlushReserve::new(self))
111    }
112}
113
114impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}