Trait cancel_safe_futures::sink::SinkExt
source · pub trait SinkExt<Item>: Sink<Item> {
// Provided methods
fn reserve(&mut self) -> Reserve<'_, Self, Item> ⓘ
where Self: Unpin { ... }
fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item> ⓘ
where Self: Unpin { ... }
}
Expand description
Extension trait for Sink
that provides alternative adapters.
Provided Methods§
sourcefn reserve(&mut self) -> Reserve<'_, Self, Item> ⓘwhere
Self: Unpin,
fn reserve(&mut self) -> Reserve<'_, Self, Item> ⓘwhere
Self: Unpin,
A future that completes once an item is ready to be sent to this sink.
The future stays pending until Sink::poll_ready
completes, then returns a Permit
which can be used to send an item to a sink.
§Motivation
Consider a select loop that calls the send
adapter,
something common while writing async Rust code:
use futures_util::SinkExt;
use std::time::Duration;
let mut my_sink = /* ... */;
// item must either be an Option<T> or be cloned, otherwise a
// "value moved here, in previous iteration of loop" error occurs.
let mut item = "hello".to_owned();
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
tokio::select! {
res = my_sink.send(item.clone()) => {
res?;
break;
}
_ = interval.tick() => {
continue;
}
}
}
If interval.tick()
occurs before my_sink.send(item.clone())
completes, then it is
impossible to tell if the item was actually sent to the sink or not, since send
combines
Sink::poll_ready
, Sink::start_send
and Sink::poll_flush
.
reserve
separates out Sink::poll_ready
from the latter two steps, so that item
is
only sent after the stream is ready to accept it. In the above case, this might look
something like:
use cancel_safe_futures::SinkExt;
use std::time::Duration;
let mut my_sink = /* ... */;
// item is an Option<T>, and will be set to None once it is sent.
let mut item = Some("hello".to_owned());
let mut interval = tokio::time::interval(Duration::from_secs(10));
while item.is_some() {
tokio::select! {
res = my_sink.reserve() => {
let permit = res?;
permit.send(item.take().unwrap())?.await?;
break;
}
_ = interval.tick() => {
continue;
}
}
}
sourcefn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item> ⓘwhere
Self: Unpin,
fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item> ⓘwhere
Self: Unpin,
A future that completes once the sink is flushed, and an item is ready to be sent to it.
This is similar to reserve
, except it calls
poll_flush
on the sink before calling poll_ready
on it.