cancel_safe_futures/sync/mutex.rs
1use super::poison::{self, LockResult, TryLockError, TryLockResult};
2use futures_core::future::{BoxFuture, LocalBoxFuture};
3use std::fmt;
4use tokio::sync::MutexGuard;
5
6/// A cancel-safe and panic-safe variant of [`tokio::sync::Mutex`].
7///
8/// A `RobustMutex` is a wrapper on top of a [`tokio::sync::Mutex`] which adds two further
9/// guarantees: *panic safety* and *cancel safety*. Both of these guarantees are implemented to
10/// ensure that mutex invariants aren't violated to the greatest extent possible.
11///
12/// # Motivation
13///
14/// A mutex is a synchronization structure which allows only one task to access some data at a time.
15/// The general idea behind a mutex is that the data it owns has some *invariants*. When a task
16/// acquires a lock on the mutex, it enters a *critical section*. Within this critical section, the
17/// invariants can temporarily be violated. It is expected that the task will restore those
18/// invariants before releasing the lock.
19///
20/// For example, let's say that we have a mutex which guards two `HashMap`s, with the invariant that
21/// mutex are that the two `HashMap`s always contain the same keys. With a Tokio mutex, you might
22/// write something like:
23///
24/// ```rust
25/// use std::collections::HashMap;
26/// use tokio::sync::Mutex;
27///
28/// struct MyStruct {
29/// map1: HashMap<String, String>,
30/// map2: HashMap<String, u32>,
31/// }
32///
33/// impl MyStruct {
34/// fn new() -> Self {
35/// Self {
36/// map1: HashMap::new(),
37/// map2: HashMap::new(),
38/// }
39/// }
40/// }
41///
42/// #[tokio::main]
43/// async fn main() {
44/// let mutex = Mutex::new(MyStruct::new());
45///
46/// let mut guard = mutex.lock().await;
47/// guard.map1.insert("hello".to_owned(), "world".to_owned()); // (1)
48/// // ... some code in between
49/// guard.map2.insert("hello".to_owned(), 42); // (2)
50///
51/// // (This happens implicitly but is made explicit here.)
52/// std::mem::drop(guard);
53/// }
54/// ```
55///
56/// At point (1) we've temporarily violated the invariant that `map1` and `map2` contain the same
57/// keys. However, at point (2) the invariant is restored.
58///
59/// * But what if the task panics between (1) and (2)? In that case, the mutex is left in a state
60/// where the invariants are violated. This is a problem because this is an inconsistent state --
61/// other tasks which acquire the lock can no longer assume that the invariants are upheld.
62///
63/// This is the problem that *panic safety* solves.
64///
65/// * In async code, what if there's an await point between (1) and (2), and the future is dropped
66/// at that await point? Then, too, the invariants are violated. With synchronous code the only
67/// possible interruptions in the middle of a critical section are due to panics, but with async
68/// code cancellations are a fact of life.
69///
70/// This is the problem that *cancel safety* solves.
71///
72/// Both of these problems can also be solved in an ad-hoc manner (for example, by carefully
73/// checking for and restoring invariants at the start of each critical section). However, **the
74/// goal of this mutex is to provide a systematic, if conservative, solution to these problems.**
75///
76/// # Panic safety
77///
78/// Like [`std::sync::Mutex`] but *unlike* [`tokio::sync::Mutex`], this mutex implements a strategy
79/// called "poisoning" where a mutex is considered poisoned whenever a task panics within one of the
80/// [`ActionPermit`] perform methods. Once a mutex is poisoned, all other tasks are unable to access
81/// the data by default.
82///
83/// This means that the [`lock`](Self::lock) and [`try_lock`](Self::try_lock) methods return a
84/// [`Result`] which indicates whether a mutex has been poisoned or not. Most usage of a mutex will
85/// simply [`unwrap()`](Result::unwrap) these results, propagating panics among tasks to ensure that
86/// a possibly invalid invariant is not witnessed.
87///
88/// A poisoned mutex, however, does not prevent all access to the underlying data. The
89/// [`PoisonError`](crate::sync::PoisonError) type has an
90/// [`into_inner`](crate::sync::PoisonError::into_inner) method which will return the guard that
91/// would have otherwise been returned on a successful lock. This allows access to the data, despite
92/// the lock being poisoned.
93///
94/// # Cancel safety
95///
96/// To guard against async cancellations in the middle of the critical section, the mutex uses a
97/// callback approach. This is done by returning [`ActionPermit`] instances which provide access to
98/// the guarded data in two ways:
99///
100/// 1. [`perform()`], which accepts a synchronous closure that cannot have await points within it.
101/// 2. [`perform_async_boxed()`] and [`perform_async_boxed_local()`], which accept asynchronous
102/// closures. If the future returned by these methods is cancelled in the middle of execution,
103/// the mutex is marked as poisoned.
104///
105/// In general, it is recommended that [`perform()`] be used and mutexes not be held across await
106/// points at all, since that can cause performance and correctness issues.
107///
108/// Not using an RAII guard like [`std::sync::MutexGuard`] does mean that there are patterns that
109/// are not possible with this mutex. For example, you cannot perform a pattern where:
110///
111/// 1. You acquire a lock *L₁*.
112/// 2. You acquire a second lock *L₂*.
113/// 3. You release *L₁*.
114/// 4. You release *L₂*.
115///
116/// If you really do need to do this or more complicated patterns, [`std::sync::Mutex`] and
117/// [`tokio::sync::Mutex`] remain available.
118///
119/// # Examples
120///
121/// The above example, rewritten to use a `RobustMutex`, would look like:
122///
123/// ```
124/// use cancel_safe_futures::sync::RobustMutex;
125/// use std::collections::HashMap;
126///
127/// struct MyStruct {
128/// map1: HashMap<String, String>,
129/// map2: HashMap<String, u32>,
130/// }
131///
132/// impl MyStruct {
133/// # /*
134/// fn new() -> Self { /* ... */ }
135/// # */
136/// # fn new() -> Self {
137/// # Self {
138/// # map1: HashMap::new(),
139/// # map2: HashMap::new(),
140/// # }
141/// # }
142/// }
143///
144/// #[tokio::main]
145/// async fn main() {
146/// let mutex = RobustMutex::new(MyStruct::new());
147///
148/// let mut permit = mutex.lock().await.unwrap(); // note unwrap() here
149/// permit.perform(|data| {
150/// data.map1.insert("hello".to_owned(), "world".to_owned()); // (1)
151/// // ... some code in between
152/// data.map2.insert("hello".to_owned(), 42); // (2)
153/// });
154/// }
155/// ```
156///
157/// # Features
158///
159/// Basic mutex operations are supported. In the future, this will support:
160///
161/// - An `OwnedActionPermit`, similar to [`tokio::sync::OwnedMutexGuard`].
162///
163/// Mapped action permits similar to [`tokio::sync::MappedMutexGuard`] will likely not be supported
164/// because it's hard to define panic and cancel safety in that scenario.
165///
166/// # Why "robust"?
167///
168/// The name is derived from POSIX's [`pthread_mutexattr_getrobust` and
169/// `pthread_mutexattr_setrobust`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutexattr_getrobust.html).
170/// These functions aim to achieve very similar goals to this mutex, except in slightly different
171/// circumstances (*thread* cancellations and terminations rather than *task* cancellations and
172/// panics).
173///
174/// [`perform()`]: ActionPermit::perform
175/// [`perform_async_boxed()`]: ActionPermit::perform_async_boxed
176/// [`perform_async_boxed_local()`]: ActionPermit::perform_async_boxed_local
177pub struct RobustMutex<T: ?Sized> {
178 poison: poison::Flag,
179 inner: tokio::sync::Mutex<T>,
180}
181
182impl<T: ?Sized> RobustMutex<T> {
183 /// Creates a new lock in an unlocked state ready for use.
184 ///
185 /// # Examples
186 ///
187 /// ```
188 /// use cancel_safe_futures::sync::RobustMutex;
189 ///
190 /// let lock = RobustMutex::new(5);
191 /// ```
192 #[track_caller]
193 pub fn new(value: T) -> Self
194 where
195 T: Sized,
196 {
197 Self {
198 inner: tokio::sync::Mutex::new(value),
199 poison: poison::Flag::new(),
200 }
201 }
202
203 /// Creates a new lock in an unlocked state ready for use.
204 ///
205 /// # Examples
206 ///
207 /// ```
208 /// use cancel_safe_futures::sync::RobustMutex;
209 ///
210 /// static LOCK: RobustMutex<i32> = RobustMutex::const_new(5);
211 /// ```
212 #[cfg(all(feature = "parking_lot", not(test)))]
213 #[cfg_attr(doc_cfg, doc(cfg(feature = "parking_lot")))]
214 pub const fn const_new(value: T) -> Self
215 where
216 T: Sized,
217 {
218 Self {
219 inner: tokio::sync::Mutex::const_new(value),
220 poison: poison::Flag::new(),
221 }
222 }
223
224 /// Locks this mutex, causing the current task to yield until the lock has been acquired. When
225 /// the lock has been acquired, function returns a [`ActionPermit`].
226 ///
227 /// # Errors
228 ///
229 /// If another user of this mutex panicked while holding the mutex, then this call will return
230 /// an error once the mutex is acquired.
231 ///
232 /// # Cancel safety
233 ///
234 /// This method uses a queue to fairly distribute locks in the order they were requested.
235 /// Cancelling a call to `lock` makes you lose your place in the queue.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use cancel_safe_futures::sync::RobustMutex;
241 ///
242 /// #[tokio::main]
243 /// async fn main() {
244 /// let mutex = RobustMutex::new(1);
245 ///
246 /// let mut permit = mutex.lock().await.unwrap();
247 /// permit.perform(|n| *n = 2);
248 /// }
249 /// ```
250 pub async fn lock(&self) -> LockResult<ActionPermit<'_, T>> {
251 let guard = self.inner.lock().await;
252 ActionPermit::new(guard, &self.poison)
253 }
254
255 /// Blockingly locks this `Mutex`. When the lock has been acquired, the function returns a
256 /// [`ActionPermit`].
257 ///
258 /// This method is intended for use cases where you need to use this mutex in asynchronous code
259 /// as well as in synchronous code.
260 ///
261 /// # Errors
262 ///
263 /// If another user of this mutex panicked while holding the mutex, then this call will return
264 /// an error once the mutex is acquired.
265 ///
266 /// # Panics
267 ///
268 /// This function panics if called within an asynchronous execution context.
269 ///
270 /// # Examples
271 ///
272 /// ```
273 /// use cancel_safe_futures::sync::RobustMutex;
274 /// use std::sync::Arc;
275 ///
276 /// #[tokio::main]
277 /// async fn main() {
278 /// let mutex = Arc::new(RobustMutex::new(1));
279 /// let permit = mutex.lock().await.unwrap();
280 ///
281 /// let mutex1 = Arc::clone(&mutex);
282 /// let blocking_task = tokio::task::spawn_blocking(move || {
283 /// // This shall block until the `lock` is released.
284 /// let permit = mutex1.blocking_lock().unwrap();
285 /// permit.perform(|n| *n = 2);
286 /// });
287 ///
288 /// permit.perform(|n| { assert_eq!(*n, 1) });
289 ///
290 /// // Await the completion of the blocking task.
291 /// blocking_task.await.unwrap();
292 ///
293 /// // Assert uncontended.
294 /// let permit = mutex.try_lock().unwrap();
295 /// permit.perform(|n| { assert_eq!(*n, 2) });
296 /// }
297 /// ```
298 #[track_caller]
299 #[cfg_attr(doc_cfg, doc(alias = "lock_blocking"))]
300 pub fn blocking_lock(&self) -> LockResult<ActionPermit<'_, T>> {
301 let guard = self.inner.blocking_lock();
302 ActionPermit::new(guard, &self.poison)
303 }
304
305 /// Attempts to acquire the lock, returning an [`ActionPermit`] if successful.
306 ///
307 /// # Errors
308 ///
309 /// Returns [`TryLockError::WouldBlock`] if the lock is currently held somewhere else.
310 ///
311 /// Returns [`TryLockError::Poisoned`] if another thread panicked while holding the lock.
312 ///
313 /// # Examples
314 ///
315 /// ```
316 /// use cancel_safe_futures::sync::RobustMutex;
317 ///
318 /// #[tokio::main]
319 /// async fn main() {
320 /// let mutex = RobustMutex::new(1);
321 ///
322 /// let permit = mutex.try_lock().unwrap();
323 /// permit.perform(|n| {
324 /// assert_eq!(*n, 1);
325 /// });
326 /// }
327 /// ```
328 pub fn try_lock(&self) -> TryLockResult<ActionPermit<'_, T>> {
329 match self.inner.try_lock() {
330 Ok(guard) => ActionPermit::new(guard, &self.poison).map_err(TryLockError::Poisoned),
331 Err(_) => Err(TryLockError::WouldBlock),
332 }
333 }
334
335 /// Determines whether the mutex is poisoned.
336 ///
337 /// This is equivalent to [`Self::is_panic_poisoned`]` || `[`Self::is_cancel_poisoned`].
338 ///
339 /// If another task is active, the mutex can still become poisoned at any time. You should not
340 /// trust a `false` value for program correctness without additional synchronization.
341 #[inline]
342 pub fn is_poisoned(&self) -> bool {
343 self.poison.get_flags() != poison::NO_POISON
344 }
345
346 /// Determines whether the mutex is poisoned due to a panic.
347 ///
348 /// If another task is active, the mutex can still become poisoned at any time. You should not
349 /// trust a `false` value for program correctness without additional synchronization.
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// use cancel_safe_futures::sync::RobustMutex;
355 /// use std::sync::Arc;
356 ///
357 /// # #[tokio::main]
358 /// # async fn main() {
359 ///
360 /// let mutex = Arc::new(RobustMutex::new(0));
361 /// let c_mutex = Arc::clone(&mutex);
362 ///
363 /// let _ = tokio::task::spawn(async move {
364 /// let permit = c_mutex.lock().await.unwrap();
365 /// permit.perform(|_| {
366 /// panic!(); // the mutex gets poisoned
367 /// });
368 /// }).await;
369 ///
370 /// assert!(mutex.is_panic_poisoned());
371 /// # }
372 /// ```
373 #[inline]
374 pub fn is_panic_poisoned(&self) -> bool {
375 self.poison.get_flags() & poison::PANIC_POISON != 0
376 }
377
378 /// Determines whether this mutex is poisoned due to a cancellation.
379 ///
380 /// If another task is active, the mutex can still become poisoned at any time. You should not
381 /// trust a `false` value for program correctness without additional synchronization.
382 ///
383 /// # Examples
384 ///
385 /// ```
386 /// use cancel_safe_futures::sync::RobustMutex;
387 /// use futures::FutureExt;
388 /// use std::sync::Arc;
389 ///
390 /// # #[tokio::main]
391 /// # async fn main() {
392 ///
393 /// let mutex = Arc::new(RobustMutex::new(0));
394 /// let c_mutex = Arc::clone(&mutex);
395 ///
396 /// tokio::task::spawn(async move {
397 /// let permit = c_mutex.lock().await.unwrap();
398 /// let fut = permit.perform_async_boxed(|n| async move {
399 /// // Sleep for 1 second.
400 /// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
401 /// *n = 1;
402 /// }.boxed());
403 /// tokio::select! {
404 /// _ = fut => {
405 /// panic!("this branch should not be encountered");
406 /// }
407 /// _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
408 /// // Exit the task, causing `fut` to be cancelled after 100ms.
409 /// }
410 /// }
411 /// }).await.unwrap();
412 ///
413 /// assert!(mutex.is_cancel_poisoned());
414 ///
415 /// # }
416 /// ```
417 #[inline]
418 pub fn is_cancel_poisoned(&self) -> bool {
419 self.poison.get_flags() & poison::CANCEL_POISON != 0
420 }
421
422 /// Returns a mutable reference to the underlying data.
423 ///
424 /// Since this call borrows the `Mutex` mutably, no actual locking needs to
425 /// take place -- the mutable borrow statically guarantees no locks exist.
426 ///
427 /// # Examples
428 ///
429 /// ```
430 /// use cancel_safe_futures::sync::RobustMutex;
431 ///
432 /// let mut mutex = RobustMutex::new(1);
433 ///
434 /// let n = mutex.get_mut();
435 /// *n = 2;
436 /// ```
437 #[inline]
438 pub fn get_mut(&mut self) -> &mut T {
439 self.inner.get_mut()
440 }
441
442 /// Consumes the mutex, returning the underlying data.
443 ///
444 /// # Errors
445 ///
446 /// If another user of this mutex panicked while holding the mutex, then this call will return
447 /// an error.
448 ///
449 /// # Examples
450 ///
451 /// ```
452 /// use cancel_safe_futures::sync::RobustMutex;
453 ///
454 /// #[tokio::main]
455 /// async fn main() {
456 /// let mutex = RobustMutex::new(1);
457 ///
458 /// let n = mutex.into_inner().unwrap();
459 /// assert_eq!(n, 1);
460 /// }
461 /// ```
462 pub fn into_inner(self) -> LockResult<T>
463 where
464 T: Sized,
465 {
466 let data = self.inner.into_inner();
467 poison::map_result(self.poison.borrow(), |()| data)
468 }
469}
470
471impl<T> Default for RobustMutex<T>
472where
473 T: Default,
474{
475 #[inline]
476 fn default() -> Self {
477 Self::new(Default::default())
478 }
479}
480
481impl<T> From<T> for RobustMutex<T> {
482 #[inline]
483 fn from(t: T) -> Self {
484 Self::new(t)
485 }
486}
487
488impl<T: ?Sized + fmt::Debug> fmt::Debug for RobustMutex<T> {
489 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
490 let mut d = f.debug_struct("RobustMutex");
491 match self.try_lock() {
492 Ok(inner) => d.field("data", &inner.guard),
493 Err(_) => d.field("data", &format_args!("<locked>")),
494 };
495 d.field("poisoned", &self.poison);
496 d.finish()
497 }
498}
499
500/// A token that grants the ability to run one closure against the data guarded by a
501/// [`RobustMutex`].
502///
503/// This is produced by the `lock` family of operations on [`RobustMutex`] and is intended to
504/// provide robust cancel safety.
505///
506/// For more information, see the documentation for [`RobustMutex`].
507///
508/// # Why is this its own type?
509///
510/// A question some users might have is: why not combine `lock` and `perform`? Why have this type
511/// that sits in the middle?
512///
513/// The answer is that this structure is necessary to provide cancel safety. Consider what happens
514/// with a hypothetical `lock_and_perform` function. Let's say we use it in a `select!` statement
515/// thus:
516///
517/// ```rust,no_run
518/// use std::sync::LockResult;
519/// use std::time::Duration;
520/// use tokio::time::sleep;
521///
522/// # /*
523/// struct MyMutex<T> { /* ... */ }
524/// # */
525/// # struct MyMutex<T> { _marker: std::marker::PhantomData<T> }
526///
527/// impl<T> MyMutex<T> {
528/// fn new(data: T) -> Self {
529/// /* ... */
530/// todo!();
531/// }
532/// async fn lock_and_perform<U>(self, action: impl FnOnce(&mut T) -> U) -> LockResult<U> {
533/// /* ... */
534/// # todo!()
535/// }
536/// }
537///
538/// // Represents some kind of type that is unique and can't be cloned.
539/// struct NonCloneableType(u32);
540///
541/// #[tokio::main]
542/// async fn main() {
543/// let mutex = MyMutex::new(1);
544/// let data = NonCloneableType(2);
545/// let sleep = sleep(Duration::from_secs(1));
546///
547/// let fut = mutex.lock_and_perform(|n| {
548/// *n = data.0;
549/// });
550///
551/// tokio::select! {
552/// _ = fut => {
553/// /* ... */
554/// }
555/// _ = sleep => {
556/// /* ... */
557/// }
558/// }
559/// }
560/// ```
561///
562/// Then, if `sleep` fires before `fut`, the non-cloneable type is dropped without being used. This
563/// leads to cancel unsafety.
564///
565/// This is very similar to the cancel unsafety that [`futures::SinkExt::send`] has, and that this
566/// crate's [`SinkExt::reserve`](crate::SinkExt::reserve) solves.
567#[derive(Debug)]
568pub struct ActionPermit<'a, T: ?Sized> {
569 poison: &'a poison::Flag,
570 guard: MutexGuard<'a, T>,
571}
572
573impl<'a, T: ?Sized> ActionPermit<'a, T> {
574 /// Invariant: the mutex must be locked when this is called. (This is ensured by requiring a
575 /// guard).
576 #[inline]
577 fn new(guard: MutexGuard<'a, T>, poison: &'a poison::Flag) -> LockResult<Self> {
578 poison::map_result(poison.borrow(), |()| Self { poison, guard })
579 }
580
581 /// Runs a closure with access to the guarded data, consuming the permit in the process and
582 /// unlocking the mutex once the closure completes.
583 ///
584 /// This is a synchronous closure, which means that it cannot have await points within it. This
585 /// guarantees cancel safety for this mutex.
586 ///
587 /// # Notes
588 ///
589 /// `action` is *not* run inside a synchronous context. This means that operations like
590 /// [`tokio::sync::mpsc::Sender::blocking_send`] will panic inside `action`.
591 ///
592 /// If `action` panics, the mutex is marked poisoned.
593 ///
594 /// # Examples
595 ///
596 /// ```
597 /// use cancel_safe_futures::sync::RobustMutex;
598 ///
599 /// #[tokio::main]
600 /// async fn main() {
601 /// let mutex = RobustMutex::new(1);
602 ///
603 /// let permit = mutex.lock().await.unwrap();
604 /// permit.perform(|n| *n = 2);
605 /// }
606 /// ```
607 pub fn perform<R, F>(mut self, action: F) -> R
608 where
609 F: FnOnce(&mut T) -> R,
610 {
611 let poison_guard = self.poison.guard_assuming_no_poison();
612 let _poisoner = Poisoner {
613 poison: self.poison,
614 poison_guard,
615 };
616
617 action(&mut *self.guard)
618
619 // Note: we're relying on the Drop impl for `_poisoner` to unlock the mutex.
620 }
621
622 /// Runs an asynchronous block in the context of the guarded data, consuming the permit in the
623 /// process and unlocking the mutex once the block completes.
624 ///
625 /// In general, holding asynchronous locks across await points can lead to surprising
626 /// performance issues. It is strongly recommended that [`perform`](Self::perform) is used, or
627 /// that the code is rewritten to use message passing.
628 ///
629 /// # Notes
630 ///
631 /// The mutex is marked poisoned if any of the following occur:
632 ///
633 /// * The future returned by `action` panics.
634 /// * The future returned by this async function is cancelled before being driven to completion.
635 ///
636 /// Due to [limitations in stable
637 /// Rust](https://kevincox.ca/2022/04/16/rust-generic-closure-lifetimes), this accepts a dynamic
638 /// [`BoxFuture`] rather than a generic future. Once [async
639 /// closures](https://rust-lang.github.io/async-fundamentals-initiative/roadmap/async_closures.html)
640 /// are stabilized, this will switch to them.
641 ///
642 /// # Examples
643 ///
644 /// ```
645 /// use cancel_safe_futures::sync::RobustMutex;
646 /// use futures::FutureExt; // for FutureExt::boxed()
647 /// use std::time::Duration;
648 ///
649 /// #[tokio::main]
650 /// async fn main() {
651 /// let mutex = RobustMutex::new(1);
652 ///
653 /// let permit = mutex.lock().await.unwrap();
654 /// permit.perform_async_boxed(|n| {
655 /// async move {
656 /// tokio::time::sleep(
657 /// std::time::Duration::from_millis(100),
658 /// ).await;
659 /// *n = 2;
660 /// }
661 /// .boxed()
662 /// }).await;
663 ///
664 /// // Check that the new value of the mutex is 2.
665 /// let permit = mutex.lock().await.unwrap();
666 /// permit.perform(|n| assert_eq!(*n, 2));
667 /// }
668 /// ```
669 pub async fn perform_async_boxed<R, F>(mut self, action: F) -> R
670 where
671 F: for<'lock> FnOnce(&'lock mut T) -> BoxFuture<'lock, R>,
672 {
673 let poison_guard = self.poison.guard_assuming_no_poison();
674 let mut poisoner = AsyncPoisoner {
675 poison: self.poison,
676 poison_guard,
677 terminated: false,
678 };
679 // At this point, the future can:
680 // * panic, in which case both the panic and (since the future isn't complete) cancel poison
681 // flags are set.
682 // * be dropped without being driven to completion, in which case the cancel poison flag is
683 // set.
684 let ret = action(&mut *self.guard).await;
685
686 // At this point, the future has completed.
687 poisoner.terminated = true;
688 ret
689 }
690
691 /// Runs a non-`Send` asynchronous block in the context of the guarded data, consuming the
692 /// permit in the process and unlocking the mutex once the block completes.
693 ///
694 /// This is a variant of [`perform_async_boxed`](Self::perform_async_boxed) that allows the
695 /// future to be non-`Send`.
696 pub async fn perform_async_boxed_local<R, F>(mut self, action: F) -> R
697 where
698 F: for<'lock> FnOnce(&'lock mut T) -> LocalBoxFuture<'lock, R>,
699 {
700 let poison_guard = self.poison.guard_assuming_no_poison();
701 let mut poisoner = AsyncPoisoner {
702 poison: self.poison,
703 poison_guard,
704 terminated: false,
705 };
706 // At this point, the future can:
707 // * panic, in which case both the panic and (since the future isn't complete) cancel poison
708 // flags are set.
709 // * be dropped without being driven to completion, in which case the cancel poison flag is
710 // set.
711 let ret = action(&mut *self.guard).await;
712
713 // At this point, the future has completed.
714 poisoner.terminated = true;
715 ret
716 }
717}
718
719#[clippy::has_significant_drop]
720struct Poisoner<'a> {
721 poison: &'a poison::Flag,
722 poison_guard: poison::Guard,
723}
724
725impl<'a> Drop for Poisoner<'a> {
726 #[inline]
727 fn drop(&mut self) {
728 self.poison.done(&self.poison_guard, false);
729 }
730}
731
732#[clippy::has_significant_drop]
733struct AsyncPoisoner<'a> {
734 poison: &'a poison::Flag,
735 poison_guard: poison::Guard,
736 terminated: bool,
737}
738
739impl<'a> Drop for AsyncPoisoner<'a> {
740 #[inline]
741 fn drop(&mut self) {
742 self.poison.done(&self.poison_guard, !self.terminated);
743 }
744}