use super::poison::{self, LockResult, TryLockError, TryLockResult};
use futures_core::future::{BoxFuture, LocalBoxFuture};
use std::fmt;
use tokio::sync::MutexGuard;

/// A cancel-safe and panic-safe variant of [`tokio::sync::Mutex`].
///
/// A `RobustMutex` is a wrapper on top of a [`tokio::sync::Mutex`] which adds two further
/// guarantees: *panic safety* and *cancel safety*. Both of these guarantees are implemented to
/// ensure that mutex invariants aren't violated to the greatest extent possible.
///
/// # Motivation
///
/// A mutex is a synchronization structure which allows only one task to access some data at a time.
/// The general idea behind a mutex is that the data it owns has some *invariants*. When a task
/// acquires a lock on the mutex, it enters a *critical section*. Within this critical section, the
/// invariants can temporarily be violated. It is expected that the task will restore those
/// invariants before releasing the lock.
///
/// For example, let's say that we have a mutex which guards two `HashMap`s, with the invariant
/// that the two `HashMap`s always contain the same keys. With a Tokio mutex, you might write
/// something like:
///
/// ```rust
/// use std::collections::HashMap;
/// use tokio::sync::Mutex;
///
/// struct MyStruct {
///     map1: HashMap<String, String>,
///     map2: HashMap<String, u32>,
/// }
///
/// impl MyStruct {
///     fn new() -> Self {
///         Self {
///             map1: HashMap::new(),
///             map2: HashMap::new(),
///         }
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let mutex = Mutex::new(MyStruct::new());
///
///     let mut guard = mutex.lock().await;
///     guard.map1.insert("hello".to_owned(), "world".to_owned());  // (1)
///     // ... some code in between
///     guard.map2.insert("hello".to_owned(), 42);  // (2)
///
///     // (This happens implicitly but is made explicit here.)
///     std::mem::drop(guard);
/// }
/// ```
///
/// At point (1) we've temporarily violated the invariant that `map1` and `map2` contain the same
/// keys. However, at point (2) the invariant is restored.
///
/// * But what if the task panics between (1) and (2)? In that case, the mutex is left in a state
///   where the invariants are violated. This is a problem because this is an inconsistent state --
///   other tasks which acquire the lock can no longer assume that the invariants are upheld.
///
///   This is the problem that *panic safety* solves.
///
/// * In async code, what if there's an await point between (1) and (2), and the future is dropped
///   at that await point? Then, too, the invariants are violated. With synchronous code the only
///   possible interruptions in the middle of a critical section are due to panics, but with async
///   code cancellations are a fact of life.
///
///   This is the problem that *cancel safety* solves.
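///
/// The cancellation case can be reproduced with the standard library alone. The following is a
/// minimal sketch and not part of this crate: `YieldOnce`, `NoopWaker` and `cancel_midway` are
/// hypothetical names, and the lock itself is left out to keep the focus on the guarded data. The
/// future is polled once and then dropped, so `cancel_midway` returns `(1, 0)`: `map1` has a key
/// that `map2` lacks.
///
/// ```rust
/// use std::collections::HashMap;
/// use std::future::Future;
/// use std::pin::{pin, Pin};
/// use std::sync::Arc;
/// use std::task::{Context, Poll, Wake, Waker};
///
/// // Hypothetical future that returns `Pending` once, standing in for any await point.
/// struct YieldOnce(bool);
///
/// impl Future for YieldOnce {
///     type Output = ();
///
///     fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
///         if self.0 {
///             Poll::Ready(())
///         } else {
///             self.0 = true;
///             Poll::Pending
///         }
///     }
/// }
///
/// // Waker that does nothing; enough to poll a future by hand.
/// struct NoopWaker;
///
/// impl Wake for NoopWaker {
///     fn wake(self: Arc<Self>) {}
/// }
///
/// // Runs the update from the example above, but drops the future at the await point.
/// // Returns the number of keys in `map1` and `map2` afterwards.
/// fn cancel_midway() -> (usize, usize) {
///     let mut map1: HashMap<String, String> = HashMap::new();
///     let mut map2: HashMap<String, u32> = HashMap::new();
///     {
///         let mut fut = pin!(async {
///             map1.insert("hello".to_owned(), "world".to_owned()); // (1)
///             YieldOnce(false).await; // await point between (1) and (2)
///             map2.insert("hello".to_owned(), 42); // (2)
///         });
///         let waker = Waker::from(Arc::new(NoopWaker));
///         let mut cx = Context::from_waker(&waker);
///         // The first poll runs (1) and then suspends at the await point.
///         assert!(fut.as_mut().poll(&mut cx).is_pending());
///         // Leaving this block drops `fut` without polling it again: a cancellation.
///     }
///     (map1.len(), map2.len())
/// }
/// ```
///
/// A lock guard held across that await point would be released when the future is dropped, with
/// the data still in this inconsistent state.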
///
/// Both of these problems can also be solved in an ad-hoc manner (for example, by carefully
/// checking for and restoring invariants at the start of each critical section). However, **the
/// goal of this mutex is to provide a systematic, if conservative, solution to these problems.**
///
/// # Panic safety
///
/// Like [`std::sync::Mutex`] but *unlike* [`tokio::sync::Mutex`], this mutex implements a strategy
/// called "poisoning" where a mutex is considered poisoned whenever a task panics within one of the
/// [`ActionPermit`] perform methods. Once a mutex is poisoned, all other tasks are unable to access
/// the data by default.
///
/// This means that the [`lock`](Self::lock) and [`try_lock`](Self::try_lock) methods return a
/// [`Result`] which indicates whether a mutex has been poisoned or not. Most usage of a mutex will
/// simply [`unwrap()`](Result::unwrap) these results, propagating panics among tasks to ensure that
/// a possibly invalid invariant is not witnessed.
///
/// A poisoned mutex, however, does not prevent all access to the underlying data. The
/// [`PoisonError`](crate::sync::PoisonError) type has an
/// [`into_inner`](crate::sync::PoisonError::into_inner) method which will return the guard that
/// would have otherwise been returned on a successful lock. This allows access to the data, despite
/// the lock being poisoned.
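///
/// For comparison, the sketch below shows the same poison-then-recover flow with
/// [`std::sync::Mutex`], whose poisoning strategy this mutex follows. It is an illustration only:
/// `recover_after_poison` is a hypothetical helper, and with a `RobustMutex` the value recovered
/// from the error would be an [`ActionPermit`] rather than a guard.
///
/// ```rust
/// use std::sync::{Arc, Mutex};
/// use std::thread;
///
/// // Hypothetical helper: poisons a `std::sync::Mutex`, then recovers the data anyway.
/// fn recover_after_poison() -> i32 {
///     let mutex = Arc::new(Mutex::new(0));
///     let c_mutex = Arc::clone(&mutex);
///
///     // Panic while holding the guard; this poisons the mutex.
///     let _ = thread::spawn(move || {
///         let _guard = c_mutex.lock().unwrap();
///         panic!("poisoning the mutex");
///     })
///     .join();
///
///     // `lock` now returns an error, but the guard is still reachable through it.
///     let mut guard = match mutex.lock() {
///         Ok(guard) => guard,
///         Err(poisoned) => poisoned.into_inner(),
///     };
///     *guard += 1;
///     *guard
/// }
/// ```
///
/// Code that recovers this way takes on the job of checking and restoring the invariants itself.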
///
/// # Cancel safety
///
/// To guard against async cancellations in the middle of the critical section, the mutex uses a
/// callback approach. This is done by returning [`ActionPermit`] instances which provide access to
/// the guarded data in two ways:
///
/// 1. [`perform()`], which accepts a synchronous closure that cannot have await points within it.
/// 2. [`perform_async_boxed()`] and [`perform_async_boxed_local()`], which accept asynchronous
///    closures. If the future returned by these methods is cancelled in the middle of execution,
///    the mutex is marked as poisoned.
///
/// In general, it is recommended that [`perform()`] be used and mutexes not be held across await
/// points at all, since that can cause performance and correctness issues.
///
/// Not using an RAII guard like [`std::sync::MutexGuard`] does mean that there are patterns that
/// are not possible with this mutex. For example, you cannot perform a pattern where:
///
/// 1. You acquire a lock *L₁*.
/// 2. You acquire a second lock *L₂*.
/// 3. You release *L₁*.
/// 4. You release *L₂*.
///
/// If you really do need to do this or more complicated patterns, [`std::sync::Mutex`] and
/// [`tokio::sync::Mutex`] remain available.
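///
/// For reference, this is roughly what the overlapping pattern above looks like with RAII guards
/// from [`std::sync::Mutex`]. It is a minimal sketch; `hand_over_hand` is a hypothetical helper
/// and not part of this crate.
///
/// ```rust
/// use std::sync::Mutex;
///
/// // Hypothetical helper: moves one unit from `a` to `b`, releasing `a` before `b`.
/// fn hand_over_hand(a: &Mutex<u32>, b: &Mutex<u32>) -> u32 {
///     let mut l1 = a.lock().unwrap(); // 1. acquire L1
///     let mut l2 = b.lock().unwrap(); // 2. acquire L2
///     *l1 -= 1;
///     drop(l1); // 3. release L1 while L2 is still held
///     *l2 += 1;
///     let total = *l2;
///     drop(l2); // 4. release L2
///     total
/// }
/// ```
///
/// With a closure-based permit, the data is only reachable inside the closure, so the first lock
/// cannot be released while work continues under the second.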
///
/// # Examples
///
/// The above example, rewritten to use a `RobustMutex`, would look like:
///
/// ```
/// use cancel_safe_futures::sync::RobustMutex;
/// use std::collections::HashMap;
///
/// struct MyStruct {
///     map1: HashMap<String, String>,
///     map2: HashMap<String, u32>,
/// }
///
/// impl MyStruct {
/// # /*
///     fn new() -> Self { /* ... */ }
/// # */
/// #    fn new() -> Self {
/// #        Self {
/// #            map1: HashMap::new(),
/// #            map2: HashMap::new(),
/// #        }
/// #    }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let mutex = RobustMutex::new(MyStruct::new());
///
///     let permit = mutex.lock().await.unwrap();  // note unwrap() here
///     permit.perform(|data| {
///         data.map1.insert("hello".to_owned(), "world".to_owned());  // (1)
///         // ... some code in between
///         data.map2.insert("hello".to_owned(), 42);  // (2)
///     });
/// }
/// ```
///
/// # Features
///
/// Basic mutex operations are supported. In the future, this will support:
///
/// - An `OwnedActionPermit`, similar to [`tokio::sync::OwnedMutexGuard`].
///
/// Mapped action permits similar to [`tokio::sync::MappedMutexGuard`] will likely not be supported
/// because it's hard to define panic and cancel safety in that scenario.
///
/// # Why "robust"?
///
/// The name is derived from POSIX's [`pthread_mutexattr_getrobust` and
/// `pthread_mutexattr_setrobust`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutexattr_getrobust.html).
/// These functions aim to achieve very similar goals to this mutex, except in slightly different
/// circumstances (*thread* cancellations and terminations rather than *task* cancellations and
/// panics).
///
/// [`perform()`]: ActionPermit::perform
/// [`perform_async_boxed()`]: ActionPermit::perform_async_boxed
/// [`perform_async_boxed_local()`]: ActionPermit::perform_async_boxed_local
pub struct RobustMutex<T: ?Sized> {
    poison: poison::Flag,
    inner: tokio::sync::Mutex<T>,
}

impl<T: ?Sized> RobustMutex<T> {
    /// Creates a new lock in an unlocked state ready for use.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    ///
    /// let lock = RobustMutex::new(5);
    /// ```
    #[track_caller]
    pub fn new(value: T) -> Self
    where
        T: Sized,
    {
        Self {
            inner: tokio::sync::Mutex::new(value),
            poison: poison::Flag::new(),
        }
    }

    /// Creates a new lock in an unlocked state ready for use.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    ///
    /// static LOCK: RobustMutex<i32> = RobustMutex::const_new(5);
    /// ```
    #[cfg(all(feature = "parking_lot", not(test)))]
    #[cfg_attr(doc_cfg, doc(cfg(feature = "parking_lot")))]
    pub const fn const_new(value: T) -> Self
    where
        T: Sized,
    {
        Self {
            inner: tokio::sync::Mutex::const_new(value),
            poison: poison::Flag::new(),
        }
    }

    /// Locks this mutex, causing the current task to yield until the lock has been acquired. When
    /// the lock has been acquired, the function returns an [`ActionPermit`].
    ///
    /// # Errors
    ///
    /// If another user of this mutex panicked while holding the mutex, then this call will return
    /// an error once the mutex is acquired.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute locks in the order they were requested.
    /// Cancelling a call to `lock` makes you lose your place in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mutex = RobustMutex::new(1);
    ///
    ///     let permit = mutex.lock().await.unwrap();
    ///     permit.perform(|n| *n = 2);
    /// }
    /// ```
    pub async fn lock(&self) -> LockResult<ActionPermit<'_, T>> {
        let guard = self.inner.lock().await;
        ActionPermit::new(guard, &self.poison)
    }

    /// Blockingly locks this `Mutex`. When the lock has been acquired, the function returns an
    /// [`ActionPermit`].
    ///
    /// This method is intended for use cases where you need to use this mutex in asynchronous code
    /// as well as in synchronous code.
    ///
    /// # Errors
    ///
    /// If another user of this mutex panicked while holding the mutex, then this call will return
    /// an error once the mutex is acquired.
    ///
    /// # Panics
    ///
    /// This function panics if called within an asynchronous execution context.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    /// use std::sync::Arc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mutex = Arc::new(RobustMutex::new(1));
    ///     let permit = mutex.lock().await.unwrap();
    ///
    ///     let mutex1 = Arc::clone(&mutex);
    ///     let blocking_task = tokio::task::spawn_blocking(move || {
    ///         // This shall block until the `lock` is released.
    ///         let permit = mutex1.blocking_lock().unwrap();
    ///         permit.perform(|n| *n = 2);
    ///     });
    ///
    ///     permit.perform(|n| { assert_eq!(*n, 1) });
    ///
    ///     // Await the completion of the blocking task.
    ///     blocking_task.await.unwrap();
    ///
    ///     // Assert uncontended.
    ///     let permit = mutex.try_lock().unwrap();
    ///     permit.perform(|n| { assert_eq!(*n, 2) });
    /// }
    /// ```
    #[track_caller]
    #[cfg_attr(doc_cfg, doc(alias = "lock_blocking"))]
    pub fn blocking_lock(&self) -> LockResult<ActionPermit<'_, T>> {
        let guard = self.inner.blocking_lock();
        ActionPermit::new(guard, &self.poison)
    }

    /// Attempts to acquire the lock, returning an [`ActionPermit`] if successful.
    ///
    /// # Errors
    ///
    /// Returns [`TryLockError::WouldBlock`] if the lock is currently held somewhere else.
    ///
    /// Returns [`TryLockError::Poisoned`] if the mutex has been poisoned by a panic or a
    /// cancellation in another task.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mutex = RobustMutex::new(1);
    ///
    ///     let permit = mutex.try_lock().unwrap();
    ///     permit.perform(|n| {
    ///         assert_eq!(*n, 1);
    ///     });
    /// }
    /// ```
    pub fn try_lock(&self) -> TryLockResult<ActionPermit<'_, T>> {
        match self.inner.try_lock() {
            Ok(guard) => ActionPermit::new(guard, &self.poison).map_err(TryLockError::Poisoned),
            Err(_) => Err(TryLockError::WouldBlock),
        }
    }

    /// Determines whether the mutex is poisoned.
    ///
    /// This is equivalent to [`Self::is_panic_poisoned`]` || `[`Self::is_cancel_poisoned`].
    ///
    /// If another task is active, the mutex can still become poisoned at any time. You should not
    /// trust a `false` value for program correctness without additional synchronization.
    #[inline]
    pub fn is_poisoned(&self) -> bool {
        self.poison.get_flags() != poison::NO_POISON
    }

    /// Determines whether the mutex is poisoned due to a panic.
    ///
    /// If another task is active, the mutex can still become poisoned at any time. You should not
    /// trust a `false` value for program correctness without additional synchronization.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    /// use std::sync::Arc;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    ///
    /// let mutex = Arc::new(RobustMutex::new(0));
    /// let c_mutex = Arc::clone(&mutex);
    ///
    /// let _ = tokio::task::spawn(async move {
    ///     let permit = c_mutex.lock().await.unwrap();
    ///     permit.perform(|_| {
    ///         panic!(); // the mutex gets poisoned
    ///     });
    /// }).await;
    ///
    /// assert!(mutex.is_panic_poisoned());
    /// # }
    /// ```
    #[inline]
    pub fn is_panic_poisoned(&self) -> bool {
        self.poison.get_flags() & poison::PANIC_POISON != 0
    }

    /// Determines whether this mutex is poisoned due to a cancellation.
    ///
    /// If another task is active, the mutex can still become poisoned at any time. You should not
    /// trust a `false` value for program correctness without additional synchronization.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    /// use futures::FutureExt;
    /// use std::sync::Arc;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    ///
    /// let mutex = Arc::new(RobustMutex::new(0));
    /// let c_mutex = Arc::clone(&mutex);
    ///
    /// tokio::task::spawn(async move {
    ///     let permit = c_mutex.lock().await.unwrap();
    ///     let fut = permit.perform_async_boxed(|n| async move {
    ///         // Sleep for 1 second.
    ///         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    ///         *n = 1;
    ///     }.boxed());
    ///     tokio::select! {
    ///         _ = fut => {
    ///             panic!("this branch should not be encountered");
    ///         }
    ///         _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
    ///             // Exit the task, causing `fut` to be cancelled after 100ms.
    ///         }
    ///     }
    /// }).await.unwrap();
    ///
    /// assert!(mutex.is_cancel_poisoned());
    ///
    /// # }
    /// ```
    #[inline]
    pub fn is_cancel_poisoned(&self) -> bool {
        self.poison.get_flags() & poison::CANCEL_POISON != 0
    }

    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the `Mutex` mutably, no actual locking needs to
    /// take place -- the mutable borrow statically guarantees no locks exist.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    ///
    /// let mut mutex = RobustMutex::new(1);
    ///
    /// let n = mutex.get_mut();
    /// *n = 2;
    /// ```
    #[inline]
    pub fn get_mut(&mut self) -> &mut T {
        self.inner.get_mut()
    }

    /// Consumes the mutex, returning the underlying data.
    ///
    /// # Errors
    ///
    /// If another user of this mutex panicked while holding the mutex, then this call will return
    /// an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mutex = RobustMutex::new(1);
    ///
    ///     let n = mutex.into_inner().unwrap();
    ///     assert_eq!(n, 1);
    /// }
    /// ```
    pub fn into_inner(self) -> LockResult<T>
    where
        T: Sized,
    {
        let data = self.inner.into_inner();
        poison::map_result(self.poison.borrow(), |()| data)
    }
}

impl<T> Default for RobustMutex<T>
where
    T: Default,
{
    #[inline]
    fn default() -> Self {
        Self::new(Default::default())
    }
}

impl<T> From<T> for RobustMutex<T> {
    #[inline]
    fn from(t: T) -> Self {
        Self::new(t)
    }
}

impl<T: ?Sized + fmt::Debug> fmt::Debug for RobustMutex<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut d = f.debug_struct("RobustMutex");
        match self.try_lock() {
            Ok(inner) => d.field("data", &inner.guard),
            Err(_) => d.field("data", &format_args!("<locked>")),
        };
        d.field("poisoned", &self.poison);
        d.finish()
    }
}

/// A token that grants the ability to run one closure against the data guarded by a
/// [`RobustMutex`].
///
/// This is produced by the `lock` family of operations on [`RobustMutex`] and is intended to
/// provide robust cancel safety.
///
/// For more information, see the documentation for [`RobustMutex`].
///
/// # Why is this its own type?
///
/// A question some users might have is: why not combine `lock` and `perform`? Why have this type
/// that sits in the middle?
///
/// The answer is that this structure is necessary to provide cancel safety. Consider what happens
/// with a hypothetical `lock_and_perform` function. Let's say we use it in a `select!` statement
/// thus:
///
/// ```rust,no_run
/// use std::sync::LockResult;
/// use std::time::Duration;
/// use tokio::time::sleep;
///
/// # /*
/// struct MyMutex<T> { /* ... */ }
/// # */
/// # struct MyMutex<T> { _marker: std::marker::PhantomData<T> }
///
/// impl<T> MyMutex<T> {
///     fn new(data: T) -> Self {
///         /* ... */
///         todo!();
///     }
///     async fn lock_and_perform<U>(self, action: impl FnOnce(&mut T) -> U) -> LockResult<U> {
///         /* ... */
/// #       todo!()
///     }
/// }
///
/// // Represents some kind of type that is unique and can't be cloned.
/// struct NonCloneableType(u32);
///
/// #[tokio::main]
/// async fn main() {
///     let mutex = MyMutex::new(1);
///     let data = NonCloneableType(2);
///     let sleep = sleep(Duration::from_secs(1));
///
///     let fut = mutex.lock_and_perform(|n| {
///         *n = data.0;
///     });
///
///     tokio::select! {
///         _ = fut => {
///             /* ... */
///         }
///         _ = sleep => {
///             /* ... */
///         }
///     }
/// }
/// ```
///
/// Then, if `sleep` fires before `fut`, the non-cloneable type is dropped without being used. This
/// leads to cancel unsafety.
///
/// This is very similar to the cancel unsafety that [`futures::SinkExt::send`] has, and that this
/// crate's [`SinkExt::reserve`](crate::SinkExt::reserve) solves.
#[derive(Debug)]
pub struct ActionPermit<'a, T: ?Sized> {
    poison: &'a poison::Flag,
    guard: MutexGuard<'a, T>,
}

impl<'a, T: ?Sized> ActionPermit<'a, T> {
    /// Invariant: the mutex must be locked when this is called. (This is ensured by requiring a
    /// guard).
    #[inline]
    fn new(guard: MutexGuard<'a, T>, poison: &'a poison::Flag) -> LockResult<Self> {
        poison::map_result(poison.borrow(), |()| Self { poison, guard })
    }

    /// Runs a closure with access to the guarded data, consuming the permit in the process and
    /// unlocking the mutex once the closure completes.
    ///
    /// This is a synchronous closure, which means that it cannot have await points within it. This
    /// guarantees cancel safety for this mutex.
    ///
    /// # Notes
    ///
    /// `action` is *not* run inside a synchronous context. This means that operations like
    /// [`tokio::sync::mpsc::Sender::blocking_send`] will panic inside `action`.
    ///
    /// If `action` panics, the mutex is marked poisoned.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mutex = RobustMutex::new(1);
    ///
    ///     let permit = mutex.lock().await.unwrap();
    ///     permit.perform(|n| *n = 2);
    /// }
    /// ```
    pub fn perform<R, F>(mut self, action: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let poison_guard = self.poison.guard_assuming_no_poison();
        let _poisoner = Poisoner {
            poison: self.poison,
            poison_guard,
        };

        action(&mut *self.guard)

        // Note: we're relying on drop order here. `_poisoner` is dropped first, which updates the
        // poison flag; `self` is dropped after it, and dropping `self.guard` unlocks the mutex.
    }

    /// Runs an asynchronous block in the context of the guarded data, consuming the permit in the
    /// process and unlocking the mutex once the block completes.
    ///
    /// In general, holding asynchronous locks across await points can lead to surprising
    /// performance issues. It is strongly recommended that [`perform`](Self::perform) is used, or
    /// that the code is rewritten to use message passing.
    ///
    /// # Notes
    ///
    /// The mutex is marked poisoned if any of the following occur:
    ///
    /// * The future returned by `action` panics.
    /// * The future returned by this async function is cancelled before being driven to completion.
    ///
    /// Due to [limitations in stable
    /// Rust](https://kevincox.ca/2022/04/16/rust-generic-closure-lifetimes), this accepts a dynamic
    /// [`BoxFuture`] rather than a generic future. Once [async
    /// closures](https://rust-lang.github.io/async-fundamentals-initiative/roadmap/async_closures.html)
    /// are stabilized, this will switch to them.
    ///
    /// # Examples
    ///
    /// ```
    /// use cancel_safe_futures::sync::RobustMutex;
    /// use futures::FutureExt;  // for FutureExt::boxed()
    /// use std::time::Duration;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mutex = RobustMutex::new(1);
    ///
    ///     let permit = mutex.lock().await.unwrap();
    ///     permit.perform_async_boxed(|n| {
    ///         async move {
    ///             tokio::time::sleep(Duration::from_millis(100)).await;
    ///             *n = 2;
    ///         }
    ///         .boxed()
    ///     }).await;
    ///
    ///     // Check that the new value of the mutex is 2.
    ///     let permit = mutex.lock().await.unwrap();
    ///     permit.perform(|n| assert_eq!(*n, 2));
    /// }
    /// ```
    pub async fn perform_async_boxed<R, F>(mut self, action: F) -> R
    where
        F: for<'lock> FnOnce(&'lock mut T) -> BoxFuture<'lock, R>,
    {
        let poison_guard = self.poison.guard_assuming_no_poison();
        let mut poisoner = AsyncPoisoner {
            poison: self.poison,
            poison_guard,
            terminated: false,
        };
        // At this point, the future can:
        // * panic, in which case both the panic and (since the future isn't complete) cancel poison
        //   flags are set.
        // * be dropped without being driven to completion, in which case the cancel poison flag is
        //   set.
        let ret = action(&mut *self.guard).await;

        // At this point, the future has completed.
        poisoner.terminated = true;
        ret
    }

    /// Runs a non-`Send` asynchronous block in the context of the guarded data, consuming the
    /// permit in the process and unlocking the mutex once the block completes.
    ///
    /// This is a variant of [`perform_async_boxed`](Self::perform_async_boxed) that allows the
    /// future to be non-`Send`.
    pub async fn perform_async_boxed_local<R, F>(mut self, action: F) -> R
    where
        F: for<'lock> FnOnce(&'lock mut T) -> LocalBoxFuture<'lock, R>,
    {
        let poison_guard = self.poison.guard_assuming_no_poison();
        let mut poisoner = AsyncPoisoner {
            poison: self.poison,
            poison_guard,
            terminated: false,
        };
        // At this point, the future can:
        // * panic, in which case both the panic and (since the future isn't complete) cancel poison
        //   flags are set.
        // * be dropped without being driven to completion, in which case the cancel poison flag is
        //   set.
        let ret = action(&mut *self.guard).await;

        // At this point, the future has completed.
        poisoner.terminated = true;
        ret
    }
}

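/// The general drop-guard technique for detecting a panic can be sketched with the standard
/// library alone. This is an illustration rather than this module's implementation: `PanicGuard`
/// and `run_guarded` are hypothetical names, and a plain `AtomicBool` stands in for
/// `poison::Flag`, whose internals are not shown here.
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// // Hypothetical guard: marks `poisoned` if it is dropped while a panic is unwinding.
/// struct PanicGuard<'a> {
///     poisoned: &'a AtomicBool,
/// }
///
/// impl Drop for PanicGuard<'_> {
///     fn drop(&mut self) {
///         // Destructors still run during unwinding, and `panicking()` is true then.
///         if std::thread::panicking() {
///             self.poisoned.store(true, Ordering::Relaxed);
///         }
///     }
/// }
///
/// // Runs `action`; the guard's destructor runs on both normal return and unwind.
/// fn run_guarded<R>(poisoned: &AtomicBool, action: impl FnOnce() -> R) -> R {
///     let _guard = PanicGuard { poisoned };
///     action()
/// }
/// ```
///
/// In this sketch the flag stays `false` when `action` returns normally and becomes `true` when
/// `action` panics.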
#[clippy::has_significant_drop]
struct Poisoner<'a> {
    poison: &'a poison::Flag,
    poison_guard: poison::Guard,
}

impl<'a> Drop for Poisoner<'a> {
    #[inline]
    fn drop(&mut self) {
        self.poison.done(&self.poison_guard, false);
    }
}

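/// The `terminated` flag follows a common pattern for detecting that a future was dropped before
/// it finished. Below is a minimal sketch of that pattern using only the standard library. It is
/// not this module's implementation: `CancelGuard`, `guarded`, `NoopWaker` and `poll_once` are
/// hypothetical names, and a `Cell<bool>` stands in for the cancel poison flag.
///
/// ```rust
/// use std::cell::Cell;
/// use std::future::Future;
/// use std::pin::pin;
/// use std::sync::Arc;
/// use std::task::{Context, Poll, Wake, Waker};
///
/// // Hypothetical guard: records a cancellation unless `terminated` was set first.
/// struct CancelGuard<'a> {
///     cancelled: &'a Cell<bool>,
///     terminated: bool,
/// }
///
/// impl Drop for CancelGuard<'_> {
///     fn drop(&mut self) {
///         if !self.terminated {
///             self.cancelled.set(true);
///         }
///     }
/// }
///
/// // Awaits `inner` with the guard alive for the duration of the await.
/// async fn guarded<F: Future>(cancelled: &Cell<bool>, inner: F) -> F::Output {
///     let mut guard = CancelGuard { cancelled, terminated: false };
///     let ret = inner.await;
///     // Only reached if `inner` ran to completion.
///     guard.terminated = true;
///     ret
/// }
///
/// // Waker that does nothing; enough to poll a future by hand.
/// struct NoopWaker;
///
/// impl Wake for NoopWaker {
///     fn wake(self: Arc<Self>) {}
/// }
///
/// // Polls `fut` once with a no-op waker, then drops it.
/// fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
///     let mut fut = pin!(fut);
///     let waker = Waker::from(Arc::new(NoopWaker));
///     let mut cx = Context::from_waker(&waker);
///     let result = fut.as_mut().poll(&mut cx);
///     result
/// }
/// ```
///
/// Polling `guarded` around a future that is immediately ready leaves the flag unset, while
/// polling it once around a future that is still pending and then dropping it sets the flag.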
#[clippy::has_significant_drop]
struct AsyncPoisoner<'a> {
    poison: &'a poison::Flag,
    poison_guard: poison::Guard,
    terminated: bool,
}

impl<'a> Drop for AsyncPoisoner<'a> {
    #[inline]
    fn drop(&mut self) {
        self.poison.done(&self.poison_guard, !self.terminated);
    }
}