Struct cancel_safe_futures::sync::RobustMutex
source · pub struct RobustMutex<T: ?Sized> { /* private fields */ }
std
only.Expand description
A cancel-safe and panic-safe variant of tokio::sync::Mutex
.
A RobustMutex
is a wrapper on top of a tokio::sync::Mutex
which adds two further
guarantees: panic safety and cancel safety. Both of these guarantees are implemented to
ensure that mutex invariants aren’t violated to the greatest extent possible.
§Motivation
A mutex is a synchronization structure which allows only one task to access some data at a time. The general idea behind a mutex is that the data it owns has some invariants. When a task acquires a lock on the mutex, it enters a critical section. Within this critical section, the invariants can temporarily be violated. It is expected that the task will restore those invariants before releasing the lock.
For example, let’s say that we have a mutex which guards two HashMap
s, with the invariant that
mutex are that the two HashMap
s always contain the same keys. With a Tokio mutex, you might
write something like:
use std::collections::HashMap;
use tokio::sync::Mutex;
struct MyStruct {
map1: HashMap<String, String>,
map2: HashMap<String, u32>,
}
impl MyStruct {
fn new() -> Self {
Self {
map1: HashMap::new(),
map2: HashMap::new(),
}
}
}
#[tokio::main]
async fn main() {
let mutex = Mutex::new(MyStruct::new());
let mut guard = mutex.lock().await;
guard.map1.insert("hello".to_owned(), "world".to_owned()); // (1)
// ... some code in between
guard.map2.insert("hello".to_owned(), 42); // (2)
// (This happens implicitly but is made explicit here.)
std::mem::drop(guard);
}
At point (1) we’ve temporarily violated the invariant that map1
and map2
contain the same
keys. However, at point (2) the invariant is restored.
-
But what if the task panics between (1) and (2)? In that case, the mutex is left in a state where the invariants are violated. This is a problem because this is an inconsistent state – other tasks which acquire the lock can no longer assume that the invariants are upheld.
This is the problem that panic safety solves.
-
In async code, what if there’s an await point between (1) and (2), and the future is dropped at that await point? Then, too, the invariants are violated. With synchronous code the only possible interruptions in the middle of a critical section are due to panics, but with async code cancellations are a fact of life.
This is the problem that cancel safety solves.
Both of these problems can also be solved in an ad-hoc manner (for example, by carefully checking for and restoring invariants at the start of each critical section). However, the goal of this mutex is to provide a systematic, if conservative, solution to these problems.
§Panic safety
Like std::sync::Mutex
but unlike tokio::sync::Mutex
, this mutex implements a strategy
called “poisoning” where a mutex is considered poisoned whenever a task panics within one of the
ActionPermit
perform methods. Once a mutex is poisoned, all other tasks are unable to access
the data by default.
This means that the lock
and try_lock
methods return a
Result
which indicates whether a mutex has been poisoned or not. Most usage of a mutex will
simply unwrap()
these results, propagating panics among tasks to ensure that
a possibly invalid invariant is not witnessed.
A poisoned mutex, however, does not prevent all access to the underlying data. The
PoisonError
type has an
into_inner
method which will return the guard that
would have otherwise been returned on a successful lock. This allows access to the data, despite
the lock being poisoned.
§Cancel safety
To guard against async cancellations in the middle of the critical section, the mutex uses a
callback approach. This is done by returning ActionPermit
instances which provide access to
the guarded data in two ways:
perform()
, which accepts a synchronous closure that cannot have await points within it.perform_async_boxed()
andperform_async_boxed_local()
, which accept asynchronous closures. If the future returned by these methods is cancelled in the middle of execution, the mutex is marked as poisoned.
In general, it is recommended that perform()
be used and mutexes not be held across await
points at all, since that can cause performance and correctness issues.
Not using an RAII guard like std::sync::MutexGuard
does mean that there are patterns that
are not possible with this mutex. For example, you cannot perform a pattern where:
- You acquire a lock L₁.
- You acquire a second lock L₂.
- You release L₁.
- You release L₂.
If you really do need to do this or more complicated patterns, std::sync::Mutex
and
tokio::sync::Mutex
remain available.
§Examples
The above example, rewritten to use a RobustMutex
, would look like:
use cancel_safe_futures::sync::RobustMutex;
use std::collections::HashMap;
struct MyStruct {
map1: HashMap<String, String>,
map2: HashMap<String, u32>,
}
impl MyStruct {
fn new() -> Self { /* ... */ }
}
#[tokio::main]
async fn main() {
let mutex = RobustMutex::new(MyStruct::new());
let mut permit = mutex.lock().await.unwrap(); // note unwrap() here
permit.perform(|data| {
data.map1.insert("hello".to_owned(), "world".to_owned()); // (1)
// ... some code in between
data.map2.insert("hello".to_owned(), 42); // (2)
});
}
§Features
Basic mutex operations are supported. In the future, this will support:
- An
OwnedActionPermit
, similar totokio::sync::OwnedMutexGuard
.
Mapped action permits similar to tokio::sync::MappedMutexGuard
will likely not be supported
because it’s hard to define panic and cancel safety in that scenario.
§Why “robust”?
The name is derived from POSIX’s pthread_mutexattr_getrobust
and
pthread_mutexattr_setrobust
.
These functions aim to achieve very similar goals to this mutex, except in slightly different
circumstances (thread cancellations and terminations rather than task cancellations and
panics).
Implementations§
source§impl<T: ?Sized> RobustMutex<T>
impl<T: ?Sized> RobustMutex<T>
sourcepub fn new(value: T) -> Selfwhere
T: Sized,
pub fn new(value: T) -> Selfwhere
T: Sized,
Creates a new lock in an unlocked state ready for use.
§Examples
use cancel_safe_futures::sync::RobustMutex;
let lock = RobustMutex::new(5);
sourcepub const fn const_new(value: T) -> Selfwhere
T: Sized,
Available on crate feature parking_lot
only.
pub const fn const_new(value: T) -> Selfwhere
T: Sized,
parking_lot
only.Creates a new lock in an unlocked state ready for use.
§Examples
use cancel_safe_futures::sync::RobustMutex;
static LOCK: RobustMutex<i32> = RobustMutex::const_new(5);
sourcepub async fn lock(&self) -> LockResult<ActionPermit<'_, T>>
pub async fn lock(&self) -> LockResult<ActionPermit<'_, T>>
Locks this mutex, causing the current task to yield until the lock has been acquired. When
the lock has been acquired, function returns a ActionPermit
.
§Errors
If another user of this mutex panicked while holding the mutex, then this call will return an error once the mutex is acquired.
§Cancel safety
This method uses a queue to fairly distribute locks in the order they were requested.
Cancelling a call to lock
makes you lose your place in the queue.
§Examples
use cancel_safe_futures::sync::RobustMutex;
#[tokio::main]
async fn main() {
let mutex = RobustMutex::new(1);
let mut permit = mutex.lock().await.unwrap();
permit.perform(|n| *n = 2);
}
sourcepub fn blocking_lock(&self) -> LockResult<ActionPermit<'_, T>>
pub fn blocking_lock(&self) -> LockResult<ActionPermit<'_, T>>
Blockingly locks this Mutex
. When the lock has been acquired, the function returns a
ActionPermit
.
This method is intended for use cases where you need to use this mutex in asynchronous code as well as in synchronous code.
§Errors
If another user of this mutex panicked while holding the mutex, then this call will return an error once the mutex is acquired.
§Panics
This function panics if called within an asynchronous execution context.
§Examples
use cancel_safe_futures::sync::RobustMutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let mutex = Arc::new(RobustMutex::new(1));
let permit = mutex.lock().await.unwrap();
let mutex1 = Arc::clone(&mutex);
let blocking_task = tokio::task::spawn_blocking(move || {
// This shall block until the `lock` is released.
let permit = mutex1.blocking_lock().unwrap();
permit.perform(|n| *n = 2);
});
permit.perform(|n| { assert_eq!(*n, 1) });
// Await the completion of the blocking task.
blocking_task.await.unwrap();
// Assert uncontended.
let permit = mutex.try_lock().unwrap();
permit.perform(|n| { assert_eq!(*n, 2) });
}
sourcepub fn try_lock(&self) -> TryLockResult<ActionPermit<'_, T>>
pub fn try_lock(&self) -> TryLockResult<ActionPermit<'_, T>>
Attempts to acquire the lock, returning an ActionPermit
if successful.
§Errors
Returns TryLockError::WouldBlock
if the lock is currently held somewhere else.
Returns TryLockError::Poisoned
if another thread panicked while holding the lock.
§Examples
use cancel_safe_futures::sync::RobustMutex;
#[tokio::main]
async fn main() {
let mutex = RobustMutex::new(1);
let permit = mutex.try_lock().unwrap();
permit.perform(|n| {
assert_eq!(*n, 1);
});
}
sourcepub fn is_poisoned(&self) -> bool
pub fn is_poisoned(&self) -> bool
Determines whether the mutex is poisoned.
This is equivalent to Self::is_panic_poisoned
||
Self::is_cancel_poisoned
.
If another task is active, the mutex can still become poisoned at any time. You should not
trust a false
value for program correctness without additional synchronization.
sourcepub fn is_panic_poisoned(&self) -> bool
pub fn is_panic_poisoned(&self) -> bool
Determines whether the mutex is poisoned due to a panic.
If another task is active, the mutex can still become poisoned at any time. You should not
trust a false
value for program correctness without additional synchronization.
§Examples
use cancel_safe_futures::sync::RobustMutex;
use std::sync::Arc;
let mutex = Arc::new(RobustMutex::new(0));
let c_mutex = Arc::clone(&mutex);
let _ = tokio::task::spawn(async move {
let permit = c_mutex.lock().await.unwrap();
permit.perform(|_| {
panic!(); // the mutex gets poisoned
});
}).await;
assert!(mutex.is_panic_poisoned());
sourcepub fn is_cancel_poisoned(&self) -> bool
pub fn is_cancel_poisoned(&self) -> bool
Determines whether this mutex is poisoned due to a cancellation.
If another task is active, the mutex can still become poisoned at any time. You should not
trust a false
value for program correctness without additional synchronization.
§Examples
use cancel_safe_futures::sync::RobustMutex;
use futures::FutureExt;
use std::sync::Arc;
let mutex = Arc::new(RobustMutex::new(0));
let c_mutex = Arc::clone(&mutex);
tokio::task::spawn(async move {
let permit = c_mutex.lock().await.unwrap();
let fut = permit.perform_async_boxed(|n| async move {
// Sleep for 1 second.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
*n = 1;
}.boxed());
tokio::select! {
_ = fut => {
panic!("this branch should not be encountered");
}
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
// Exit the task, causing `fut` to be cancelled after 100ms.
}
}
}).await.unwrap();
assert!(mutex.is_cancel_poisoned());
sourcepub fn get_mut(&mut self) -> &mut T
pub fn get_mut(&mut self) -> &mut T
Returns a mutable reference to the underlying data.
Since this call borrows the Mutex
mutably, no actual locking needs to
take place – the mutable borrow statically guarantees no locks exist.
§Examples
use cancel_safe_futures::sync::RobustMutex;
let mut mutex = RobustMutex::new(1);
let n = mutex.get_mut();
*n = 2;
sourcepub fn into_inner(self) -> LockResult<T>where
T: Sized,
pub fn into_inner(self) -> LockResult<T>where
T: Sized,
Consumes the mutex, returning the underlying data.
§Errors
If another user of this mutex panicked while holding the mutex, then this call will return an error.
§Examples
use cancel_safe_futures::sync::RobustMutex;
#[tokio::main]
async fn main() {
let mutex = RobustMutex::new(1);
let n = mutex.into_inner().unwrap();
assert_eq!(n, 1);
}