Struct cancel_safe_futures::sync::RobustMutex

pub struct RobustMutex<T: ?Sized> { /* private fields */ }
Available on crate feature std only.

A cancel-safe and panic-safe variant of tokio::sync::Mutex.

A RobustMutex is a wrapper around tokio::sync::Mutex that adds two further guarantees: panic safety and cancel safety. Both guarantees are there to ensure, to the greatest extent possible, that invariants on the guarded data are never observed in a violated state.

§Motivation

A mutex is a synchronization structure which allows only one task to access some data at a time. The general idea behind a mutex is that the data it owns has some invariants. When a task acquires a lock on the mutex, it enters a critical section. Within this critical section, the invariants can temporarily be violated. It is expected that the task will restore those invariants before releasing the lock.

For example, let’s say that we have a mutex which guards two HashMaps, with the invariant that the two HashMaps always contain the same keys. With a Tokio mutex, you might write something like:

use std::collections::HashMap;
use tokio::sync::Mutex;

struct MyStruct {
    map1: HashMap<String, String>,
    map2: HashMap<String, u32>,
}

impl MyStruct {
    fn new() -> Self {
        Self {
            map1: HashMap::new(),
            map2: HashMap::new(),
        }
    }
}

#[tokio::main]
async fn main() {
    let mutex = Mutex::new(MyStruct::new());

    let mut guard = mutex.lock().await;
    guard.map1.insert("hello".to_owned(), "world".to_owned());  // (1)
    // ... some code in between
    guard.map2.insert("hello".to_owned(), 42);  // (2)

    // (This happens implicitly but is made explicit here.)
    std::mem::drop(guard);
}

At point (1) we’ve temporarily violated the invariant that map1 and map2 contain the same keys. However, at point (2) the invariant is restored.

  • But what if the task panics between (1) and (2)? In that case, the mutex is left in a state where the invariants are violated. This is a problem because this is an inconsistent state – other tasks which acquire the lock can no longer assume that the invariants are upheld.

    This is the problem that panic safety solves.

  • In async code, what if there’s an await point between (1) and (2), and the future is dropped at that await point? Then, too, the invariants are violated. With synchronous code the only possible interruptions in the middle of a critical section are due to panics, but with async code cancellations are a fact of life.

    This is the problem that cancel safety solves.

Both of these problems can also be solved in an ad-hoc manner (for example, by carefully checking for and restoring invariants at the start of each critical section). However, the goal of this mutex is to provide a systematic, if conservative, solution to these problems.
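As a rough illustration of the ad-hoc approach mentioned above, the sketch below repairs the "same keys" invariant at the start of a critical section. It uses std::sync::Mutex rather than a Tokio mutex to stay self-contained, and the helpers restore_invariant and insert_pair are hypothetical names introduced only for this example.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

struct MyStruct {
    map1: HashMap<String, String>,
    map2: HashMap<String, u32>,
}

/// Hypothetical helper: drops every key that is not present in both maps,
/// which restores the "same keys" invariant.
fn restore_invariant(data: &mut MyStruct) {
    let MyStruct { map1, map2 } = data;
    map1.retain(|k, _| map2.contains_key(k));
    map2.retain(|k, _| map1.contains_key(k));
}

/// Hypothetical critical section that repairs the data before touching it.
fn insert_pair(mutex: &Mutex<MyStruct>, key: &str, text: &str, number: u32) {
    // Take the guard even if a previous holder panicked (std poisoning).
    let mut guard = mutex.lock().unwrap_or_else(|e| e.into_inner());
    restore_invariant(&mut guard);
    guard.map1.insert(key.to_owned(), text.to_owned());
    guard.map2.insert(key.to_owned(), number);
}
```

Every critical section has to remember to call the repair step, which is why this approach tends not to scale.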

§Panic safety

Like std::sync::Mutex but unlike tokio::sync::Mutex, this mutex implements a strategy called “poisoning” where a mutex is considered poisoned whenever a task panics within one of the ActionPermit perform methods. Once a mutex is poisoned, all other tasks are unable to access the data by default.

This means that the lock and try_lock methods return a Result which indicates whether a mutex has been poisoned or not. Most usage of a mutex will simply unwrap() these results, propagating panics among tasks to ensure that a possibly invalid invariant is not witnessed.

A poisoned mutex, however, does not prevent all access to the underlying data. The PoisonError type has an into_inner method which will return the guard that would have otherwise been returned on a successful lock. This allows access to the data, despite the lock being poisoned.
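Since the poisoning strategy is modelled on std::sync::Mutex, the following sketch shows the same idea using only the standard library: a panic while the guard is held poisons the mutex, and PoisonError::into_inner still hands back the guard. It is an analogy under that assumption, not a demonstration of RobustMutex itself; poison_and_recover is a hypothetical function name.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Poisons a std mutex by panicking while the guard is held, then
/// recovers the data through `PoisonError::into_inner`.
fn poison_and_recover() -> (bool, i32) {
    let mutex = Arc::new(Mutex::new(0_i32));
    let worker = Arc::clone(&mutex);

    // The panic unwinds while the guard is alive, which poisons the mutex.
    let result = thread::spawn(move || {
        let mut guard = worker.lock().unwrap();
        *guard = 1;
        panic!("simulated failure inside the critical section");
    })
    .join();
    assert!(result.is_err());

    let poisoned = mutex.is_poisoned();
    // `lock()` now returns Err, but the guard is still reachable.
    let value = match mutex.lock() {
        Ok(guard) => *guard,
        Err(err) => *err.into_inner(),
    };
    (poisoned, value)
}
```

Code that calls into_inner takes on the responsibility of checking the invariants itself.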

§Cancel safety

To guard against async cancellations in the middle of the critical section, the mutex uses a callback approach. This is done by returning ActionPermit instances which provide access to the guarded data in two ways:

  1. perform(), which accepts a synchronous closure that cannot have await points within it.
  2. perform_async_boxed() and perform_async_boxed_local(), which accept asynchronous closures. If the future returned by these methods is cancelled in the middle of execution, the mutex is marked as poisoned.

In general, it is recommended that perform() be used and mutexes not be held across await points at all, since that can cause performance and correctness issues.
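To make the idea of "poison on cancellation" more concrete, here is a minimal standard-library sketch of one way such a mechanism could work: a drop guard sets a flag unless the critical section ran to completion. This is an assumption made for illustration and not the crate's actual implementation; PoisonOnDrop, YieldOnce, critical_section, NoopWake and poll_then_drop are all hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical guard: sets `flag` when dropped unless `disarm` was called.
struct PoisonOnDrop<'a> {
    flag: &'a AtomicBool,
    armed: bool,
}

impl<'a> PoisonOnDrop<'a> {
    fn new(flag: &'a AtomicBool) -> Self {
        Self { flag, armed: true }
    }

    fn disarm(mut self) {
        self.armed = false;
    }
}

impl Drop for PoisonOnDrop<'_> {
    fn drop(&mut self) {
        if self.armed {
            self.flag.store(true, Ordering::SeqCst);
        }
    }
}

/// A future that returns `Pending` once, standing in for a real await point.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical async critical section protected by `PoisonOnDrop`.
async fn critical_section(flag: &AtomicBool, data: &mut Vec<u32>) {
    let guard = PoisonOnDrop::new(flag);
    data.push(1); // (1) invariant temporarily broken
    YieldOnce(false).await; // the future may be dropped here
    data.push(2); // (2) invariant restored
    guard.disarm();
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the critical section at most `max_polls` times, drops it, and
/// reports whether the poison flag was set.
fn poll_then_drop(max_polls: usize) -> bool {
    let flag = AtomicBool::new(false);
    let mut data = Vec::new();
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(critical_section(&flag, &mut data));
    for _ in 0..max_polls {
        if fut.as_mut().poll(&mut cx).is_ready() {
            break;
        }
    }
    drop(fut); // dropping a pending future is what cancellation means
    flag.load(Ordering::SeqCst)
}
```

With one poll the future is dropped between (1) and (2) and the flag is set; with two polls it completes and the flag stays clear.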

Not using an RAII guard like std::sync::MutexGuard does mean that there are patterns that are not possible with this mutex. For example, you cannot perform a pattern where:

  1. You acquire a lock L₁.
  2. You acquire a second lock L₂.
  3. You release L₁.
  4. You release L₂.

If you really do need to do this or more complicated patterns, std::sync::Mutex and tokio::sync::Mutex remain available.
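For reference, the interleaved pattern described above looks roughly like this with RAII guards from std::sync::Mutex. The function name hand_over_hand is hypothetical; the point is only that the guards are dropped in a different order than a nested closure would allow.

```rust
use std::sync::Mutex;

/// Acquires `first`, then `second`, releases `first`, then releases `second`.
/// Returns the value stored in `second` after adding `first` to it.
fn hand_over_hand(first: &Mutex<u32>, second: &Mutex<u32>) -> u32 {
    let guard1 = first.lock().unwrap(); // 1. acquire L1
    let mut guard2 = second.lock().unwrap(); // 2. acquire L2
    *guard2 += *guard1;
    drop(guard1); // 3. release L1 while L2 is still held
    // L1 is free again even though L2's critical section is still open.
    assert!(first.try_lock().is_ok());
    let total = *guard2;
    drop(guard2); // 4. release L2
    total
}
```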

§Examples

The above example, rewritten to use a RobustMutex, would look like:

use cancel_safe_futures::sync::RobustMutex;
use std::collections::HashMap;

struct MyStruct {
    map1: HashMap<String, String>,
    map2: HashMap<String, u32>,
}

impl MyStruct {
    fn new() -> Self {
        Self {
            map1: HashMap::new(),
            map2: HashMap::new(),
        }
    }
}

#[tokio::main]
async fn main() {
    let mutex = RobustMutex::new(MyStruct::new());

    let mut permit = mutex.lock().await.unwrap();  // note unwrap() here
    permit.perform(|data| {
        data.map1.insert("hello".to_owned(), "world".to_owned());  // (1)
        // ... some code in between
        data.map2.insert("hello".to_owned(), 42);  // (2)
    });
}

§Features

Basic mutex operations are supported. In the future, this will support:

Mapped action permits similar to tokio::sync::MappedMutexGuard will likely not be supported because it’s hard to define panic and cancel safety in that scenario.

§Why “robust”?

The name is derived from POSIX’s pthread_mutexattr_getrobust and pthread_mutexattr_setrobust. These functions aim to achieve very similar goals to this mutex, except in slightly different circumstances (thread cancellations and terminations rather than task cancellations and panics).

Implementations§


impl<T: ?Sized> RobustMutex<T>


pub fn new(value: T) -> Self
where T: Sized,

Creates a new lock in an unlocked state ready for use.

§Examples
use cancel_safe_futures::sync::RobustMutex;

let lock = RobustMutex::new(5);

pub const fn const_new(value: T) -> Self
where T: Sized,

Available on crate feature parking_lot only.

Creates a new lock in an unlocked state ready for use.

§Examples
use cancel_safe_futures::sync::RobustMutex;

static LOCK: RobustMutex<i32> = RobustMutex::const_new(5);

pub async fn lock(&self) -> LockResult<ActionPermit<'_, T>>

Locks this mutex, causing the current task to yield until the lock has been acquired. When the lock has been acquired, this function returns an ActionPermit.

§Errors

If another user of this mutex panicked while holding the mutex, then this call will return an error once the mutex is acquired.

§Cancel safety

This method uses a queue to fairly distribute locks in the order they were requested. Cancelling a call to lock makes you lose your place in the queue.

§Examples
use cancel_safe_futures::sync::RobustMutex;

#[tokio::main]
async fn main() {
    let mutex = RobustMutex::new(1);

    let mut permit = mutex.lock().await.unwrap();
    permit.perform(|n| *n = 2);
}

pub fn blocking_lock(&self) -> LockResult<ActionPermit<'_, T>>

Blockingly locks this mutex. When the lock has been acquired, the function returns an ActionPermit.

This method is intended for use cases where you need to use this mutex in asynchronous code as well as in synchronous code.

§Errors

If another user of this mutex panicked while holding the mutex, then this call will return an error once the mutex is acquired.

§Panics

This function panics if called within an asynchronous execution context.

§Examples
use cancel_safe_futures::sync::RobustMutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let mutex = Arc::new(RobustMutex::new(1));
    let permit = mutex.lock().await.unwrap();

    let mutex1 = Arc::clone(&mutex);
    let blocking_task = tokio::task::spawn_blocking(move || {
        // This shall block until the `lock` is released.
        let permit = mutex1.blocking_lock().unwrap();
        permit.perform(|n| *n = 2);
    });

    permit.perform(|n| { assert_eq!(*n, 1) });

    // Await the completion of the blocking task.
    blocking_task.await.unwrap();

    // Assert uncontended.
    let permit = mutex.try_lock().unwrap();
    permit.perform(|n| { assert_eq!(*n, 2) });
}

pub fn try_lock(&self) -> TryLockResult<ActionPermit<'_, T>>

Attempts to acquire the lock, returning an ActionPermit if successful.

§Errors

Returns TryLockError::WouldBlock if the lock is currently held somewhere else.

Returns TryLockError::Poisoned if another thread panicked while holding the lock.
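The two error cases can be told apart with a match. The sketch below uses std::sync::Mutex and std::sync::TryLockError as a stand-in, on the assumption that the crate's TryLockError is handled the same way, since the variants named above match the standard library's. describe_try_lock is a hypothetical helper.

```rust
use std::sync::{Mutex, TryLockError};

/// Classifies the outcome of a non-blocking lock attempt.
fn describe_try_lock(mutex: &Mutex<u32>) -> &'static str {
    match mutex.try_lock() {
        Ok(_guard) => "acquired",
        // Someone else holds the lock right now; retrying later may succeed.
        Err(TryLockError::WouldBlock) => "would block",
        // A previous holder panicked; the data may be inconsistent.
        Err(TryLockError::Poisoned(_)) => "poisoned",
    }
}
```

WouldBlock is transient, whereas Poisoned persists until the caller decides how to deal with the possibly inconsistent data.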

§Examples
use cancel_safe_futures::sync::RobustMutex;

#[tokio::main]
async fn main() {
    let mutex = RobustMutex::new(1);

    let permit = mutex.try_lock().unwrap();
    permit.perform(|n| {
        assert_eq!(*n, 1);
    });
}

pub fn is_poisoned(&self) -> bool

Determines whether the mutex is poisoned.

This is equivalent to Self::is_panic_poisoned() || Self::is_cancel_poisoned().

If another task is active, the mutex can still become poisoned at any time. You should not trust a false value for program correctness without additional synchronization.


pub fn is_panic_poisoned(&self) -> bool

Determines whether the mutex is poisoned due to a panic.

If another task is active, the mutex can still become poisoned at any time. You should not trust a false value for program correctness without additional synchronization.

§Examples
use cancel_safe_futures::sync::RobustMutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let mutex = Arc::new(RobustMutex::new(0));
    let c_mutex = Arc::clone(&mutex);

    // The spawned task panics inside `perform`, so awaiting its JoinHandle
    // yields an Err, which is deliberately ignored here.
    let _ = tokio::task::spawn(async move {
        let permit = c_mutex.lock().await.unwrap();
        permit.perform(|_| {
            panic!(); // the mutex gets poisoned
        });
    }).await;

    assert!(mutex.is_panic_poisoned());
}

pub fn is_cancel_poisoned(&self) -> bool

Determines whether this mutex is poisoned due to a cancellation.

If another task is active, the mutex can still become poisoned at any time. You should not trust a false value for program correctness without additional synchronization.

§Examples
use cancel_safe_futures::sync::RobustMutex;
use futures::FutureExt;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let mutex = Arc::new(RobustMutex::new(0));
    let c_mutex = Arc::clone(&mutex);

    tokio::task::spawn(async move {
        let permit = c_mutex.lock().await.unwrap();
        let fut = permit.perform_async_boxed(|n| async move {
            // Sleep for 1 second.
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            *n = 1;
        }.boxed());
        tokio::select! {
            _ = fut => {
                panic!("this branch should not be encountered");
            }
            _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
                // Exit the task, causing `fut` to be cancelled after 100ms.
            }
        }
    }).await.unwrap();

    assert!(mutex.is_cancel_poisoned());
}

pub fn get_mut(&mut self) -> &mut T

Returns a mutable reference to the underlying data.

Since this call borrows the Mutex mutably, no actual locking needs to take place – the mutable borrow statically guarantees no locks exist.

§Examples
use cancel_safe_futures::sync::RobustMutex;

let mut mutex = RobustMutex::new(1);

let n = mutex.get_mut();
*n = 2;

pub fn into_inner(self) -> LockResult<T>
where T: Sized,

Consumes the mutex, returning the underlying data.

§Errors

If another user of this mutex panicked while holding the mutex, then this call will return an error.

§Examples
use cancel_safe_futures::sync::RobustMutex;

#[tokio::main]
async fn main() {
    let mutex = RobustMutex::new(1);

    let n = mutex.into_inner().unwrap();
    assert_eq!(n, 1);
}

Trait Implementations§

impl<T: ?Sized + Debug> Debug for RobustMutex<T>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<T> Default for RobustMutex<T>
where T: Default,

fn default() -> Self

Returns the “default value” for a type.

impl<T> From<T> for RobustMutex<T>

fn from(t: T) -> Self

Converts to this type from the input type.

Auto Trait Implementations§

impl<T> !Freeze for RobustMutex<T>

impl<T> !RefUnwindSafe for RobustMutex<T>

impl<T> Send for RobustMutex<T>
where T: Send + ?Sized,

impl<T> Sync for RobustMutex<T>
where T: Send + ?Sized,

impl<T> Unpin for RobustMutex<T>
where T: Unpin + ?Sized,

impl<T> !UnwindSafe for RobustMutex<T>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<!> for T

fn from(t: !) -> T

Converts to this type from the input type.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.