Module cancel_safe_futures::coop_cancel

source ·
Available on crate feature std only.
Expand description

A multi-producer, single-consumer channel for cooperative (explicit) cancellation.

This is similar in nature to a tokio::task::AbortHandle, except it uses a cooperative model for cancellation.

§Motivation

Executors like Tokio support forcible cancellation for async tasks via facilities like tokio::task::JoinHandle::abort. However, this causes cancellations at any arbitrary await point. This is often not desirable because it can lead to invariant violations.

For example, consider this code that consists of both the cancel-safe AsyncWriteExt::write_buf and some cancel-unsafe code:

use bytes::Buf;
use std::io::Cursor;
use tokio::{io::AsyncWriteExt, sync::mpsc};

struct DataWriter {
    writer: tokio::fs::File,
    bytes_written_channel: mpsc::Sender<usize>,
}

impl DataWriter {
    async fn write(&mut self, cursor: &mut Cursor<&[u8]>) -> std::io::Result<()> {
        // Cursor<&[u8]> implements the bytes::Buf trait, which is used by `write_buf`.
        while cursor.has_remaining() {
            let bytes_written = self.writer.write_buf(cursor).await?; // (1)
            self.bytes_written_channel.send(bytes_written).await; // (2)
        }

        Ok(())
    }
}

The invariant upheld by DataWriter is that if some bytes are written, the corresponding bytes_written is sent over self.bytes_written_channel. This means that cancelling at await point (1) is okay, but cancelling at await point (2) is not.

If we use tokio::task::JoinHandle::abort to cancel the task, it is possible that the task is cancelled at await point (2), breaking the invariant. Instead, we can use cooperative cancellation with a select! loop.

use bytes::Buf;
use cancel_safe_futures::coop_cancel;
use std::io::Cursor;
use tokio::{io::AsyncWriteExt, sync::mpsc};

struct DataWriter {
    writer: tokio::fs::File,
    bytes_written_channel: mpsc::Sender<usize>,
    cancel_receiver: coop_cancel::Receiver<()>,
}

impl DataWriter {
    async fn write(&mut self, cursor: &mut Cursor<&[u8]>) -> std::io::Result<()> {
        while cursor.has_remaining() {
            tokio::select! {
                res = self.writer.write_buf(cursor) => {
                    let bytes_written = res?;
                    self.bytes_written_channel.send(bytes_written).await;
                }
                Some(()) = self.cancel_receiver.recv() => {
                    // A cancellation notice was sent over the
                    // channel. Cancel here.
                    println!("cancelling!");
                    break;
                }
            }
        }

        Ok(())
    }
}

§Attaching a cancel message

Canceler::cancel can be used to send a message of any type T along with the cancellation event. This message is received via the Some variant of Receiver::recv.

For a given Receiver, only the first message sent via any corresponding Canceler is received. Subsequent calls to Receiver::recv will always return None, no matter whether further cancellation messages are sent. (This can change in the future if there’s a good use case for it.)

§Notes

This module implements “fan-in” cancellation – it supports many cancelers but only one receiver. For “fan-out” cancellation with one sender and many receivers, consider using the drain crate. This module and drain can be combined: create a task that listens to a Receiver, and notify downstream receivers via drain in that task.

Structs§

  • A cooperative cancellation sender.
  • A cooperative cancellation receiver.
  • A future which can be used to optionally block until a Receiver is dropped.

Functions§

  • Creates and returns a cooperative cancellation pair.