1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
#![cfg_attr(not(feature = "std"), no_std)]
//! Alternative futures adapters that are more cancellation-aware.
//!
//! # What is this crate?
//!
//! This crate solves a few related but distinct problems:
//!
//! ## 1. Cancel-safe futures adapters
//!
//! The [`futures`](https://docs.rs/futures/latest/futures/) library contains many adapters that
//! make writing asynchronous Rust code more pleasant. However, some of those combinators make it
//! hard to write code that can withstand cancellation in the case of timeouts, `select!` branches
//! or similar.
//!
//! For a more detailed explanation, see the documentation for [`SinkExt::reserve`].
//!
//! ### Example
//!
//! Attempt to send an item in a loop with a timeout:
//!
//! ```
//! use cancel_safe_futures::prelude::*;
//! use std::time::Duration;
//!
//! # #[tokio::main(flavor = "current_thread")]
//! # async fn main() -> Result<(), std::convert::Infallible> {
//! # /*
//! let mut my_sink = /* ... */;
//! # */
//! # let mut my_sink = futures_util::sink::drain();
//!
//! // This item is stored here and will be set to None once the loop exits successfully.
//! let mut item = Some("hello".to_owned());
//! let do_flush = false;
//!
//! while item.is_some() {
//! match tokio::time::timeout(Duration::from_secs(10), my_sink.reserve()).await {
//! Ok(Ok(permit)) => {
//! let item = item.take().unwrap();
//! if !do_flush {
//! // permit.feed() feeds the item into the sink without flushing
//! // the sink afterwards. This is a synchronous method.
//! permit.feed(item)?;
//! } else {
//! // Alternatively, permit.send() writes the item into the sink
//! // synchronously, then returns a future which can be awaited to
//! // flush the sink.
//! permit.send(item)?.await?;
//! }
//! }
//! Ok(Err(error)) => return Err(error),
//! Err(timeout_error) => continue,
//! }
//! }
//!
//! # Ok(()) }
//! ```
//!
//! ## 2. `then_try` adapters that don't perform cancellations
//!
//! The futures and tokio libraries come with a number of `try_` adapters and macros, for example
//! [`tokio::try_join!`]. These adapters have the property that if one of the futures under
//! consideration fails, all other futures are cancelled.
//!
//! This is not always desirable and has led to correctness bugs (e.g. [omicron PR
//! 3707](https://github.com/oxidecomputer/omicron/pull/3707)). To address this issue, this crate
//! provides a set of `then_try` adapters and macros that behave like their `try_` counterparts,
//! except that if one or more of the futures errors out, the others will still be run to
//! completion.
//!
//! The `then_try` family includes:
//!
//! * [`join_then_try`]: similar to [`tokio::try_join`].
//! * [`future::join_all_then_try`]: similar to [`futures::future::try_join_all`].
//! * [`TryStreamExt`]: contains alternative extension methods to [`futures::stream::TryStreamExt`],
//! such as `collect_then_try`.
//!
//! ### Example
//!
//! For a detailed example, see the documentation for the [`join_then_try`] macro.
//!
//! ## 3. Cancel-safe mutexes
//!
//! The [`tokio::sync::Mutex`] shipped with Tokio has resulted in many bugs in practice,
//! particularly around cancellations.
//!
//! This crate provides an alternative mutex API, called [`RobustMutex`](sync::RobustMutex), that does not
//! have those pitfalls. For more, see the documentation for [`RobustMutex`](sync::RobustMutex).
//!
//! ## 4. Cooperative cancellation
//!
//! Executors like Tokio support forcible cancellation for async tasks via facilities like
//! [`tokio::task::JoinHandle::abort`]. However, this can cause cancellations at any arbitrary await
//! point. If the future is in the middle of cancel-unsafe code, this can cause invariant violations
//! or other issues.
//!
//! Instead, async cancellation can be done cooperatively: code can check for cancellation
//! explicitly via [`tokio::select!`]. This crate provides the [`coop_cancel`] module that can be
//! used to accomplish that goal.
//!
//! ### Example
//!
//! For a detailed example, see the documentation for [`coop_cancel`].
//!
//! # Notes
//!
//! This library is not complete: adapters and macros are added on an as-needed basis. If you need
//! an adapter that is not yet implemented, please open an issue or a pull request.
//!
//! # Optional features
//!
//! * `macros` (enabled by default): Enables macros.
//! * `std` (enabled by default): Enables items that depend on `std`, including items that depend on
//! `alloc`.
//! * `alloc` (enabled by default): Enables items that depend on `alloc`.
//! * `parking_lot`: Switches to `parking_lot`'s mutexes.
//!
//! No-std users must turn off default features while importing this crate.
#![warn(missing_docs)]
#![cfg_attr(doc_cfg, feature(doc_cfg, doc_auto_cfg))]
#[cfg(feature = "alloc")]
extern crate alloc;
// Includes re-exports used by macros.
//
// This module is not intended to be part of the public API. In general, any
// `doc(hidden)` code is not part of the public and stable API.
#[macro_use]
#[doc(hidden)]
pub mod macros;
#[cfg(feature = "std")]
pub mod coop_cancel;
pub mod future;
pub mod prelude;
pub mod sink;
pub mod stream;
mod support;
pub mod sync;
pub use sink::SinkExt;
pub use stream::TryStreamExt;