Trait cancel_safe_futures::sink::SinkExt
source · pub trait SinkExt<Item>: Sink<Item> {
// Provided methods
fn reserve(&mut self) -> Reserve<'_, Self, Item> ⓘ
where Self: Unpin { ... }
fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item> ⓘ
where Self: Unpin { ... }
}Expand description
Extension trait for Sink that provides alternative adapters.
Provided Methods§
sourcefn reserve(&mut self) -> Reserve<'_, Self, Item> ⓘwhere
Self: Unpin,
fn reserve(&mut self) -> Reserve<'_, Self, Item> ⓘwhere
Self: Unpin,
A future that completes once an item is ready to be sent to this sink.
The future stays pending until Sink::poll_ready completes, then returns a Permit
which can be used to send an item to a sink.
§Motivation
Consider a select loop that calls the send adapter,
something common while writing async Rust code:
use futures_util::SinkExt;
use std::time::Duration;
let mut my_sink = /* ... */;
// item must either be an Option<T> or be cloned, otherwise a
// "value moved here, in previous iteration of loop" error occurs.
let mut item = "hello".to_owned();
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
tokio::select! {
res = my_sink.send(item.clone()) => {
res?;
break;
}
_ = interval.tick() => {
continue;
}
}
}
If interval.tick() occurs before my_sink.send(item.clone()) completes, then it is
impossible to tell if the item was actually sent to the sink or not, since send combines
Sink::poll_ready, Sink::start_send and Sink::poll_flush.
reserve separates out Sink::poll_ready from the latter two steps, so that item is
only sent after the stream is ready to accept it. In the above case, this might look
something like:
use cancel_safe_futures::SinkExt;
use std::time::Duration;
let mut my_sink = /* ... */;
// item is an Option<T>, and will be set to None once it is sent.
let mut item = Some("hello".to_owned());
let mut interval = tokio::time::interval(Duration::from_secs(10));
while item.is_some() {
tokio::select! {
res = my_sink.reserve() => {
let permit = res?;
permit.send(item.take().unwrap())?.await?;
break;
}
_ = interval.tick() => {
continue;
}
}
}
sourcefn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item> ⓘwhere
Self: Unpin,
fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item> ⓘwhere
Self: Unpin,
A future that completes once the sink is flushed, and an item is ready to be sent to it.
This is similar to reserve, except it calls
poll_flush on the sink before calling poll_ready
on it.