Module cancel_safe_futures::coop_cancel
Available on crate feature std only.
A multi-producer, single-consumer channel for cooperative (explicit) cancellation.
This is similar in nature to a tokio::task::AbortHandle, except it uses a cooperative model for cancellation.
§Motivation
Executors like Tokio support forcible cancellation for async tasks via facilities like tokio::task::JoinHandle::abort. However, this causes cancellations at any arbitrary await point. This is often not desirable because it can lead to invariant violations.
For example, consider this code that consists of both the cancel-safe AsyncWriteExt::write_buf and some cancel-unsafe code:
use bytes::Buf;
use std::io::Cursor;
use tokio::{io::AsyncWriteExt, sync::mpsc};

struct DataWriter {
    writer: tokio::fs::File,
    bytes_written_channel: mpsc::Sender<usize>,
}

impl DataWriter {
    async fn write(&mut self, cursor: &mut Cursor<&[u8]>) -> std::io::Result<()> {
        // Cursor<&[u8]> implements the bytes::Buf trait, which is used by `write_buf`.
        while cursor.has_remaining() {
            let bytes_written = self.writer.write_buf(cursor).await?; // (1)
            self.bytes_written_channel.send(bytes_written).await; // (2)
        }
        Ok(())
    }
}
The invariant upheld by DataWriter is that if some bytes are written, the corresponding bytes_written is sent over self.bytes_written_channel. This means that cancelling at await point (1) is okay, but cancelling at await point (2) is not.
If we use tokio::task::JoinHandle::abort to cancel the task, it is possible that the task is cancelled at await point (2), breaking the invariant. Instead, we can use cooperative cancellation with a select! loop.
use bytes::Buf;
use cancel_safe_futures::coop_cancel;
use std::io::Cursor;
use tokio::{io::AsyncWriteExt, sync::mpsc};

struct DataWriter {
    writer: tokio::fs::File,
    bytes_written_channel: mpsc::Sender<usize>,
    cancel_receiver: coop_cancel::Receiver<()>,
}

impl DataWriter {
    async fn write(&mut self, cursor: &mut Cursor<&[u8]>) -> std::io::Result<()> {
        while cursor.has_remaining() {
            tokio::select! {
                res = self.writer.write_buf(cursor) => {
                    let bytes_written = res?;
                    self.bytes_written_channel.send(bytes_written).await;
                }
                Some(()) = self.cancel_receiver.recv() => {
                    // A cancellation notice was sent over the
                    // channel. Cancel here.
                    println!("cancelling!");
                    break;
                }
            }
        }
        Ok(())
    }
}
§Attaching a cancel message
Canceler::cancel can be used to send a message of any type T along with the cancellation event. This message is received via the Some variant of Receiver::recv.
For a given Receiver, only the first message sent via any corresponding Canceler is received. Subsequent calls to Receiver::recv will always return None, regardless of whether further cancellation messages are sent. (This may change in the future if there’s a good use case for it.)
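The first-message-only behaviour described above can be modelled with the standard library alone. The following is a minimal sketch, not this module's implementation: FirstOnlyReceiver and first_only_pair are hypothetical names, a blocking std::sync::mpsc channel stands in for the async channel, and returning None once every sender has been dropped is an assumption of the model.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical stand-in for a receiver that yields only the first message.
struct FirstOnlyReceiver<T> {
    inner: Receiver<T>,
    // Set on the first call to `recv`, whether or not a message was obtained.
    done: bool,
}

impl<T> FirstOnlyReceiver<T> {
    /// Blocks until the first message arrives and returns `Some(message)`.
    /// Every later call returns `None`, even if more messages were sent.
    fn recv(&mut self) -> Option<T> {
        if self.done {
            return None;
        }
        self.done = true;
        // `Err` here means every sender was dropped without sending anything.
        self.inner.recv().ok()
    }
}

/// Creates a cloneable sender (playing the canceler role) and a first-only receiver.
fn first_only_pair<T>() -> (Sender<T>, FirstOnlyReceiver<T>) {
    let (tx, rx) = mpsc::channel();
    (tx, FirstOnlyReceiver { inner: rx, done: false })
}

fn main() {
    let (canceler_a, mut receiver) = first_only_pair::<&'static str>();
    let canceler_b = canceler_a.clone();

    // Two cancelers each attach their own message.
    canceler_a.send("shutdown requested").unwrap();
    canceler_b.send("timeout").unwrap();

    // Only the first message is observed; the second is never delivered.
    assert_eq!(receiver.recv(), Some("shutdown requested"));
    assert_eq!(receiver.recv(), None);
}
```

In this model the second message stays queued and is discarded when the receiver is dropped, which mirrors the documented rule that later cancellation messages are not received.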
§Notes
This module implements “fan-in” cancellation: it supports many cancelers but only one receiver. For “fan-out” cancellation with one sender and many receivers, consider using the drain crate. This module and drain can be combined: create a task that listens to a Receiver, and notify downstream receivers via drain in that task.
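The relay pattern in the note above can be sketched with threads and standard-library channels. This is an illustration under stated assumptions rather than the recommended wiring: relay_cancellation is a hypothetical helper, one std::sync::mpsc channel per worker stands in for the drain crate, and OS threads stand in for async tasks.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Relays the first cancellation reason from the fan-in channel to every
/// downstream worker, and returns the reason each worker observed.
fn relay_cancellation(worker_count: usize, reason: &'static str) -> Vec<&'static str> {
    // Fan-in side: many cloned senders, a single receiver.
    let (cancel_tx, cancel_rx) = mpsc::channel::<&'static str>();

    // Fan-out side: one channel per worker (a stand-in for drain).
    let mut downstream: Vec<Sender<&'static str>> = Vec::new();
    let mut workers = Vec::new();
    for _ in 0..worker_count {
        let (tx, rx) = mpsc::channel::<&'static str>();
        downstream.push(tx);
        // Each worker blocks until the relay notifies it.
        workers.push(thread::spawn(move || rx.recv().unwrap_or("relay dropped")));
    }

    // Relay task: wait for the first cancellation, then notify every worker.
    let relay = thread::spawn(move || {
        if let Ok(first) = cancel_rx.recv() {
            for tx in &downstream {
                let _ = tx.send(first);
            }
        }
    });

    // Any clone of the sender can trigger the cancellation.
    let other_canceler = cancel_tx.clone();
    other_canceler.send(reason).unwrap();
    drop(cancel_tx);

    relay.join().unwrap();
    workers.into_iter().map(|w| w.join().unwrap()).collect()
}

fn main() {
    let seen = relay_cancellation(3, "shutting down");
    assert_eq!(seen, vec!["shutting down"; 3]);
}
```

The relay is the only consumer of the fan-in channel, so the single-receiver restriction is respected while any number of workers still learn about the cancellation.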
Structs§
- A cooperative cancellation sender.
- A cooperative cancellation receiver.
- A future which can be used to optionally block until a Receiver is dropped.
Functions§
- Creates and returns a cooperative cancellation pair.