1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//! Alternative extensions for [`Sink`].

use futures_sink::Sink;

mod flush_reserve;
pub use flush_reserve::FlushReserve;

mod permit;
pub use permit::Permit;

mod reserve;
use crate::support::assert_future;
pub use reserve::Reserve;

/// Extension trait for [`Sink`] that provides alternative adapters.
pub trait SinkExt<Item>: Sink<Item> {
    /// A future that completes once an item is ready to be sent to this sink.
    ///
    /// The future stays pending until [`Sink::poll_ready`] completes, then returns a [`Permit`]
    /// which can be used to send an item to a sink.
    ///
    /// # Motivation
    ///
    /// Consider a select loop that calls the [`send`](futures_util::SinkExt::send) adapter,
    /// something common while writing async Rust code:
    ///
    /// ```
    /// use futures_util::SinkExt;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() -> Result<(), std::convert::Infallible> {
    /// # /*
    /// let mut my_sink = /* ... */;
    /// # */
    /// # let mut my_sink = futures_util::sink::drain();
    /// // item must either be an Option<T> or be cloned, otherwise a
    /// // "value moved here, in previous iteration of loop" error occurs.
    /// let mut item = "hello".to_owned();
    ///
    /// let mut interval = tokio::time::interval(Duration::from_secs(10));
    /// loop {
    ///     tokio::select! {
    ///         res = my_sink.send(item.clone()) => {
    ///             res?;
    ///             break;
    ///         }
    ///         _ = interval.tick() => {
    ///             continue;
    ///         }
    ///     }
    /// }
    ///
    /// # Ok(()) }
    /// ```
    ///
    /// If `interval.tick()` occurs before `my_sink.send(item.clone())` completes, then it is
    /// impossible to tell if the item was actually sent to the sink or not, since `send` combines
    /// [`Sink::poll_ready`], [`Sink::start_send`] and [`Sink::poll_flush`].
    ///
    /// `reserve` separates out [`Sink::poll_ready`] from the latter two steps, so that `item` is
    /// only sent after the stream is ready to accept it. In the above case, this might look
    /// something like:
    ///
    /// ```
    /// use cancel_safe_futures::SinkExt;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() -> Result<(), std::convert::Infallible> {
    /// # /*
    /// let mut my_sink = /* ... */;
    /// # */
    /// # let mut my_sink = futures_util::sink::drain();
    /// // item is an Option<T>, and will be set to None once it is sent.
    /// let mut item = Some("hello".to_owned());
    ///
    /// let mut interval = tokio::time::interval(Duration::from_secs(10));
    /// while item.is_some() {
    ///     tokio::select! {
    ///         res = my_sink.reserve() => {
    ///             let permit = res?;
    ///             permit.send(item.take().unwrap())?.await?;
    ///             break;
    ///         }
    ///         _ = interval.tick() => {
    ///             continue;
    ///         }
    ///     }
    /// }
    ///
    /// # Ok(()) }
    /// ```
    fn reserve(&mut self) -> Reserve<'_, Self, Item>
    where
        Self: Unpin,
    {
        assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(Reserve::new(self))
    }

    /// A future that completes once the sink is flushed, and an item is ready to be sent to it.
    ///
    /// This is similar to [`reserve`](SinkExt::reserve), except it calls
    /// [`poll_flush`](Sink::poll_flush) on the sink before calling [`poll_ready`](Sink::poll_ready)
    /// on it.
    fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item>
    where
        Self: Unpin,
    {
        assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(FlushReserve::new(self))
    }
}

impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}