Trait cancel_safe_futures::sink::SinkExt

source ·
pub trait SinkExt<Item>: Sink<Item> {
    // Provided methods
    fn reserve(&mut self) -> Reserve<'_, Self, Item> 
       where Self: Unpin { ... }
    fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item> 
       where Self: Unpin { ... }
}
Expand description

Extension trait for Sink that provides alternative adapters.

Provided Methods§

source

fn reserve(&mut self) -> Reserve<'_, Self, Item>
where Self: Unpin,

A future that completes once an item is ready to be sent to this sink.

The future stays pending until Sink::poll_ready completes, then returns a Permit which can be used to send an item to a sink.

§Motivation

Consider a select loop that calls the send adapter, something common while writing async Rust code:

use futures_util::SinkExt;
use std::time::Duration;

let mut my_sink = /* ... */;
// item must either be an Option<T> or be cloned, otherwise a
// "value moved here, in previous iteration of loop" error occurs.
let mut item = "hello".to_owned();

let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
    tokio::select! {
        res = my_sink.send(item.clone()) => {
            res?;
            break;
        }
        _ = interval.tick() => {
            continue;
        }
    }
}

If interval.tick() occurs before my_sink.send(item.clone()) completes, then it is impossible to tell if the item was actually sent to the sink or not, since send combines Sink::poll_ready, Sink::start_send and Sink::poll_flush.

reserve separates out Sink::poll_ready from the latter two steps, so that item is only sent after the stream is ready to accept it. In the above case, this might look something like:

use cancel_safe_futures::SinkExt;
use std::time::Duration;

let mut my_sink = /* ... */;
// item is an Option<T>, and will be set to None once it is sent.
let mut item = Some("hello".to_owned());

let mut interval = tokio::time::interval(Duration::from_secs(10));
while item.is_some() {
    tokio::select! {
        res = my_sink.reserve() => {
            let permit = res?;
            permit.send(item.take().unwrap())?.await?;
            break;
        }
        _ = interval.tick() => {
            continue;
        }
    }
}
source

fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item>
where Self: Unpin,

A future that completes once the sink is flushed, and an item is ready to be sent to it.

This is similar to reserve, except it calls poll_flush on the sink before calling poll_ready on it.

Object Safety§

This trait is not object safe.

Implementors§

source§

impl<T, Item> SinkExt<Item> for T
where T: Sink<Item> + ?Sized,