1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
//! Alternative extensions for [`Sink`].
use futures_sink::Sink;
mod flush_reserve;
pub use flush_reserve::FlushReserve;
mod permit;
pub use permit::Permit;
mod reserve;
use crate::support::assert_future;
pub use reserve::Reserve;
/// Extension trait for [`Sink`] that provides alternative adapters.
pub trait SinkExt<Item>: Sink<Item> {
/// A future that completes once an item is ready to be sent to this sink.
///
/// The future stays pending until [`Sink::poll_ready`] completes, then returns a [`Permit`]
/// which can be used to send an item to a sink.
///
/// # Motivation
///
/// Consider a select loop that calls the [`send`](futures_util::SinkExt::send) adapter,
/// something common while writing async Rust code:
///
/// ```
/// use futures_util::SinkExt;
/// use std::time::Duration;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), std::convert::Infallible> {
/// # /*
/// let mut my_sink = /* ... */;
/// # */
/// # let mut my_sink = futures_util::sink::drain();
/// // item must either be an Option<T> or be cloned, otherwise a
/// // "value moved here, in previous iteration of loop" error occurs.
/// let mut item = "hello".to_owned();
///
/// let mut interval = tokio::time::interval(Duration::from_secs(10));
/// loop {
/// tokio::select! {
/// res = my_sink.send(item.clone()) => {
/// res?;
/// break;
/// }
/// _ = interval.tick() => {
/// continue;
/// }
/// }
/// }
///
/// # Ok(()) }
/// ```
///
/// If `interval.tick()` occurs before `my_sink.send(item.clone())` completes, then it is
/// impossible to tell if the item was actually sent to the sink or not, since `send` combines
/// [`Sink::poll_ready`], [`Sink::start_send`] and [`Sink::poll_flush`].
///
/// `reserve` separates out [`Sink::poll_ready`] from the latter two steps, so that `item` is
/// only sent after the stream is ready to accept it. In the above case, this might look
/// something like:
///
/// ```
/// use cancel_safe_futures::SinkExt;
/// use std::time::Duration;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), std::convert::Infallible> {
/// # /*
/// let mut my_sink = /* ... */;
/// # */
/// # let mut my_sink = futures_util::sink::drain();
/// // item is an Option<T>, and will be set to None once it is sent.
/// let mut item = Some("hello".to_owned());
///
/// let mut interval = tokio::time::interval(Duration::from_secs(10));
/// while item.is_some() {
/// tokio::select! {
/// res = my_sink.reserve() => {
/// let permit = res?;
/// permit.send(item.take().unwrap())?.await?;
/// break;
/// }
/// _ = interval.tick() => {
/// continue;
/// }
/// }
/// }
///
/// # Ok(()) }
/// ```
fn reserve(&mut self) -> Reserve<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(Reserve::new(self))
}
/// A future that completes once the sink is flushed, and an item is ready to be sent to it.
///
/// This is similar to [`reserve`](SinkExt::reserve), except it calls
/// [`poll_flush`](Sink::poll_flush) on the sink before calling [`poll_ready`](Sink::poll_ready)
/// on it.
fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<Permit<'_, Self, Item>, Self::Error>, _>(FlushReserve::new(self))
}
}
impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}