tokio/task/local.rs
1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4use crate::runtime;
5use crate::runtime::task::{
6 self, JoinHandle, LocalOwnedTasks, SpawnLocation, Task, TaskHarnessScheduleHooks,
7};
8use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
9use crate::sync::AtomicWaker;
10use crate::util::trace::SpawnMeta;
11use crate::util::RcCell;
12
13use std::cell::Cell;
14use std::collections::VecDeque;
15use std::fmt;
16use std::future::Future;
17use std::marker::PhantomData;
18use std::mem;
19use std::pin::Pin;
20use std::rc::Rc;
21use std::task::Poll;
22
23use pin_project_lite::pin_project;
24
25cfg_rt! {
26 /// A set of tasks which are executed on the same thread.
27 ///
28 /// In some cases, it is necessary to run one or more futures that do not
29 /// implement [`Send`] and thus are unsafe to send between threads. In these
30 /// cases, a [local task set] may be used to schedule one or more `!Send`
31 /// futures to run together on the same thread.
32 ///
33 /// For example, the following code will not compile:
34 ///
35 /// ```rust,compile_fail
36 /// use std::rc::Rc;
37 ///
38 /// #[tokio::main]
39 /// async fn main() {
40 /// // `Rc` does not implement `Send`, and thus may not be sent between
41 /// // threads safely.
42 /// let nonsend_data = Rc::new("my nonsend data...");
43 ///
44 /// let nonsend_data = nonsend_data.clone();
45 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
46 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
47 /// // will not compile.
48 /// tokio::spawn(async move {
49 /// println!("{}", nonsend_data);
50 /// // ...
51 /// }).await.unwrap();
52 /// }
53 /// ```
54 ///
55 /// # Use with `run_until`
56 ///
57 /// To spawn `!Send` futures, we can use a local task set to schedule them
58 /// on the thread calling [`Runtime::block_on`]. When running inside of the
59 /// local task set, we can use [`task::spawn_local`], which can spawn
60 /// `!Send` futures. For example:
61 ///
62 /// ```rust
63 /// use std::rc::Rc;
64 /// use tokio::task;
65 ///
66 /// # #[tokio::main(flavor = "current_thread")]
67 /// # async fn main() {
68 /// let nonsend_data = Rc::new("my nonsend data...");
69 ///
70 /// // Construct a local task set that can run `!Send` futures.
71 /// let local = task::LocalSet::new();
72 ///
73 /// // Run the local task set.
74 /// local.run_until(async move {
75 /// let nonsend_data = nonsend_data.clone();
76 /// // `spawn_local` ensures that the future is spawned on the local
77 /// // task set.
78 /// task::spawn_local(async move {
79 /// println!("{}", nonsend_data);
80 /// // ...
81 /// }).await.unwrap();
82 /// }).await;
83 /// # }
84 /// ```
85 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
86 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
87 /// cannot be used inside a task spawned with `tokio::spawn`.
88 ///
89 /// ## Awaiting a `LocalSet`
90 ///
91 /// Additionally, a `LocalSet` itself implements `Future`, completing when
92 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
93 /// several futures on a `LocalSet` and drive the whole set until they
94 /// complete. For example,
95 ///
96 /// ```rust
97 /// use tokio::{task, time};
98 /// use std::rc::Rc;
99 ///
100 /// # #[tokio::main(flavor = "current_thread")]
101 /// # async fn main() {
102 /// let nonsend_data = Rc::new("world");
103 /// let local = task::LocalSet::new();
104 ///
105 /// let nonsend_data2 = nonsend_data.clone();
106 /// local.spawn_local(async move {
107 /// // ...
108 /// println!("hello {}", nonsend_data2)
109 /// });
110 ///
111 /// local.spawn_local(async move {
112 /// time::sleep(time::Duration::from_millis(100)).await;
113 /// println!("goodbye {}", nonsend_data)
114 /// });
115 ///
116 /// // ...
117 ///
118 /// local.await;
119 /// # }
120 /// ```
121 /// **Note:** Awaiting a `LocalSet` can only be done inside
122 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
123 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
124 /// `tokio::spawn`.
125 ///
126 /// ## Use inside `tokio::spawn`
127 ///
128 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
129 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
130 /// something else. The solution is to create the `LocalSet` somewhere else,
131 /// and communicate with it using an [`mpsc`] channel.
132 ///
133 /// The following example puts the `LocalSet` inside a new thread.
134 /// ```
135 /// # #[cfg(not(target_family = "wasm"))]
136 /// # {
137 /// use tokio::runtime::Builder;
138 /// use tokio::sync::{mpsc, oneshot};
139 /// use tokio::task::LocalSet;
140 ///
141 /// // This struct describes the task you want to spawn. Here we include
142 /// // some simple examples. The oneshot channel allows sending a response
143 /// // to the spawner.
144 /// #[derive(Debug)]
145 /// enum Task {
146 /// PrintNumber(u32),
147 /// AddOne(u32, oneshot::Sender<u32>),
148 /// }
149 ///
150 /// #[derive(Clone)]
151 /// struct LocalSpawner {
152 /// send: mpsc::UnboundedSender<Task>,
153 /// }
154 ///
155 /// impl LocalSpawner {
156 /// pub fn new() -> Self {
157 /// let (send, mut recv) = mpsc::unbounded_channel();
158 ///
159 /// let rt = Builder::new_current_thread()
160 /// .enable_all()
161 /// .build()
162 /// .unwrap();
163 ///
164 /// std::thread::spawn(move || {
165 /// let local = LocalSet::new();
166 ///
167 /// local.spawn_local(async move {
168 /// while let Some(new_task) = recv.recv().await {
169 /// tokio::task::spawn_local(run_task(new_task));
170 /// }
171 /// // If the while loop returns, then all the LocalSpawner
172 /// // objects have been dropped.
173 /// });
174 ///
175 /// // This will return once all senders are dropped and all
176 /// // spawned tasks have returned.
177 /// rt.block_on(local);
178 /// });
179 ///
180 /// Self {
181 /// send,
182 /// }
183 /// }
184 ///
185 /// pub fn spawn(&self, task: Task) {
186 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
187 /// }
188 /// }
189 ///
190 /// // This task may do !Send stuff. We use printing a number as an example,
191 /// // but it could be anything.
192 /// //
193 /// // The Task struct is an enum to support spawning many different kinds
194 /// // of operations.
195 /// async fn run_task(task: Task) {
196 /// match task {
197 /// Task::PrintNumber(n) => {
198 /// println!("{}", n);
199 /// },
200 /// Task::AddOne(n, response) => {
201 /// // We ignore failures to send the response.
202 /// let _ = response.send(n + 1);
203 /// },
204 /// }
205 /// }
206 ///
207 /// #[tokio::main]
208 /// async fn main() {
209 /// let spawner = LocalSpawner::new();
210 ///
211 /// let (send, response) = oneshot::channel();
212 /// spawner.spawn(Task::AddOne(10, send));
213 /// let eleven = response.await.unwrap();
214 /// assert_eq!(eleven, 11);
215 /// }
216 /// # }
217 /// ```
218 ///
219 /// [`Send`]: trait@std::marker::Send
220 /// [local task set]: struct@LocalSet
221 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
222 /// [`task::spawn_local`]: fn@spawn_local
223 /// [`mpsc`]: mod@crate::sync::mpsc
    pub struct LocalSet {
        /// Current scheduler tick.
        ///
        /// Advanced (with wrap-around) every time the scheduler looks for its
        /// next task. Its value decides whether the remote queue or the local
        /// queue is consulted first on that lookup.
        tick: Cell<u8>,

        /// State available from thread-local.
        ///
        /// A clone of this `Rc` is placed in the `CURRENT` thread-local while
        /// the set is entered or being polled.
        context: Rc<Context>,

        /// This type should not be Send.
        ///
        /// Raw pointers are neither `Send` nor `Sync`, so this marker opts the
        /// type out of both auto traits without storing any data.
        _not_send: PhantomData<*const ()>,
    }
234}
235
/// State available from the thread-local.
///
/// An `Rc<Context>` is stored in `CURRENT` while a `LocalSet` is entered or
/// polled; the free function `spawn_local` reads it from there to find the
/// set it should spawn onto.
struct Context {
    /// State shared between threads.
    shared: Arc<Shared>,

    /// True if a task panicked without being handled and the local set is
    /// configured to shutdown on unhandled panic.
    ///
    /// `LocalSet::tick` asserts that this is `false` before polling each task.
    unhandled_panic: Cell<bool>,
}
245
/// `LocalSet` state shared between threads.
struct Shared {
    /// # Safety
    ///
    /// This field must *only* be accessed from the thread that owns the
    /// `LocalSet` (i.e., `Thread::current().id() == owner`).
    local_state: LocalState,

    /// Remote run queue.
    ///
    /// Protected by a mutex. The `Option` is taken (left as `None`) when the
    /// `LocalSet` is dropped, so that no notifications can be pushed to it
    /// afterwards.
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

    /// Wake the `LocalSet` task.
    ///
    /// The waker of whatever is polling the `LocalSet` (or a `RunUntil`
    /// future) is registered here on every poll, and it is woken when a task
    /// is spawned through `LocalSet::spawn_local`.
    waker: AtomicWaker,

    /// How to respond to unhandled task panics.
    #[cfg(tokio_unstable)]
    pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
264
/// Tracks the `LocalSet` state that must only be accessed from the thread that
/// created the `LocalSet`.
struct LocalState {
    /// The `ThreadId` of the thread that owns the `LocalSet`.
    ///
    /// Captured once, when the `LocalSet` is constructed.
    owner: ThreadId,

    /// Local run queue.
    ///
    /// Held in an `UnsafeCell` rather than behind a lock; callers are
    /// responsible for only touching it from the owner thread.
    local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: LocalOwnedTasks<Arc<Shared>>,
}
277
pin_project! {
    // Future built by `LocalSet::run_until`: polls `future` inside the
    // context of `local_set` and resolves with that future's output.
    #[derive(Debug)]
    struct RunUntil<'a, F> {
        // The set whose context is entered for the duration of each poll.
        local_set: &'a LocalSet,
        // The user's future; `#[pin]` makes the projection hand it out as
        // `Pin<&mut F>`.
        #[pin]
        future: F,
    }
}
286
// Per-thread record of the `LocalSet` context that is currently entered or
// being polled on this thread.
tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
    ctx: RcCell::new(),
    wake_on_schedule: Cell::new(false),
} });

/// Contents of the `CURRENT` thread-local.
struct LocalData {
    /// Context of the `LocalSet` that is active on this thread, if any.
    ctx: RcCell<Context>,
    /// Set to `true` by `LocalSet::enter` and to `false` by
    /// `LocalData::enter`; in both cases the previous value is put back when
    /// the corresponding guard is dropped.
    wake_on_schedule: Cell<bool>,
}
296
297impl LocalData {
298 /// Should be called except when we call `LocalSet::enter`.
299 /// Especially when we poll a `LocalSet`.
300 #[must_use = "dropping this guard will reset the entered state"]
301 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
302 let ctx = self.ctx.replace(Some(ctx));
303 let wake_on_schedule = self.wake_on_schedule.replace(false);
304 LocalDataEnterGuard {
305 local_data_ref: self,
306 ctx,
307 wake_on_schedule,
308 }
309 }
310}
311
/// A guard for `LocalData::enter()`
///
/// Holds the state that was active before the call so that it can be
/// reinstated on drop.
struct LocalDataEnterGuard<'a> {
    /// The thread-local data to restore into.
    local_data_ref: &'a LocalData,
    /// Context that was installed before `enter` replaced it.
    ctx: Option<Rc<Context>>,
    /// Value of the `wake_on_schedule` flag before `enter` cleared it.
    wake_on_schedule: bool,
}
318
319impl<'a> Drop for LocalDataEnterGuard<'a> {
320 fn drop(&mut self) {
321 self.local_data_ref.ctx.set(self.ctx.take());
322 self.local_data_ref
323 .wake_on_schedule
324 .set(self.wake_on_schedule)
325 }
326}
327
328cfg_rt! {
329 /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
330 ///
331 /// This is possible when either using one of these types explicitly, or by
332 /// opting to use the `"local"` runtime flavor in `tokio::main`:
333 ///
334 /// ```ignore
335 /// #[tokio::main(flavor = "local")]
336 /// ```
337 ///
338 /// The spawned future will run on the same thread that called `spawn_local`.
339 ///
340 /// The provided future will start running in the background immediately
341 /// when `spawn_local` is called, even if you don't await the returned
342 /// `JoinHandle`.
343 ///
344 /// # Panics
345 ///
346 /// This function panics if called outside of a [`LocalSet`] or [`LocalRuntime`].
347 ///
348 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
349 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
350 /// `spawn_local` if you want to stay within the `LocalSet`.
351 ///
352 /// # Examples
353 ///
354 /// With `LocalSet`:
355 ///
356 /// ```rust
357 /// use std::rc::Rc;
358 /// use tokio::task;
359 ///
360 /// # #[tokio::main(flavor = "current_thread")]
361 /// # async fn main() {
362 /// let nonsend_data = Rc::new("my nonsend data...");
363 ///
364 /// let local = task::LocalSet::new();
365 ///
366 /// // Run the local task set.
367 /// local.run_until(async move {
368 /// let nonsend_data = nonsend_data.clone();
369 /// task::spawn_local(async move {
370 /// println!("{}", nonsend_data);
371 /// // ...
372 /// }).await.unwrap();
373 /// }).await;
374 /// # }
375 /// ```
376 /// With local runtime flavor.
377 ///
378 /// ```rust
379 /// #[tokio::main(flavor = "local")]
380 /// async fn main() {
381 /// let join = tokio::task::spawn_local(async {
382 /// println!("my nonsend data...")
383 /// });
384 ///
385 /// join.await.unwrap()
386 /// }
387 ///
388 /// ```
389 ///
390 /// [`LocalSet`]: struct@crate::task::LocalSet
391 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
392 /// [`tokio::spawn`]: fn@crate::task::spawn
393 /// [unstable]: ../../tokio/index.html#unstable-features
394 #[track_caller]
395 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
396 where
397 F: Future + 'static,
398 F::Output: 'static,
399 {
400 let fut_size = std::mem::size_of::<F>();
401 if fut_size > BOX_FUTURE_THRESHOLD {
402 spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
403 } else {
404 spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
405 }
406 }
407
408
409 #[track_caller]
410 pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
411 where F: Future + 'static,
412 F::Output: 'static
413 {
414 use crate::runtime::{context, task};
415
416 let mut future = Some(future);
417
418 let res = context::with_current(|handle| {
419 Some(if handle.is_local() {
420 if !handle.can_spawn_local_on_local_runtime() {
421 return None;
422 }
423
424 let future = future.take().unwrap();
425
426 #[cfg(all(
427 tokio_unstable,
428 feature = "taskdump",
429 feature = "rt",
430 target_os = "linux",
431 any(
432 target_arch = "aarch64",
433 target_arch = "x86",
434 target_arch = "x86_64"
435 )
436 ))]
437 let future = task::trace::Trace::root(future);
438 let id = task::Id::next();
439 let task = crate::util::trace::task(future, "task", meta, id.as_u64());
440
441 // safety: we have verified that this is a `LocalRuntime` owned by the current thread
442 unsafe { handle.spawn_local(task, id, meta.spawned_at) }
443 } else {
444 match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
445 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
446 Some(cx) => cx.spawn(future.take().unwrap(), meta)
447 }
448 })
449 });
450
451 match res {
452 Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
453 Ok(Some(join_handle)) => join_handle,
454 Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
455 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
456 Some(cx) => cx.spawn(future.unwrap(), meta)
457 }
458 }
459 }
460}
461
/// Initial queue capacity.
///
/// Used to pre-size both the local and the remote run queue of a new
/// `LocalSet`.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often the remote queue is checked before the local queue.
///
/// The remote queue is tried first whenever the scheduler tick is a multiple
/// of this value; on every other tick the local queue is tried first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
470
/// Context guard for `LocalSet`
///
/// Returned by `LocalSet::enter`. Dropping it puts back the thread-local
/// state that was in place before the set was entered.
pub struct LocalEnterGuard {
    /// The context that was active before entering; reinstated on drop.
    ctx: Option<Rc<Context>>,

    /// Distinguishes whether the context was entered or being polled.
    /// When we enter it, the value `wake_on_schedule` is set. In this case
    /// `spawn_local` refers to the context, whereas it is not being polled now.
    ///
    /// This field stores the flag's *previous* value, reinstated on drop.
    wake_on_schedule: bool,
}
480
481impl Drop for LocalEnterGuard {
482 fn drop(&mut self) {
483 CURRENT.with(
484 |LocalData {
485 ctx,
486 wake_on_schedule,
487 }| {
488 ctx.set(self.ctx.take());
489 wake_on_schedule.set(self.wake_on_schedule);
490 },
491 );
492 }
493}
494
495impl fmt::Debug for LocalEnterGuard {
496 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
497 f.debug_struct("LocalEnterGuard").finish()
498 }
499}
500
501impl LocalSet {
502 /// Returns a new local task set.
503 pub fn new() -> LocalSet {
504 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
505
506 LocalSet {
507 tick: Cell::new(0),
508 context: Rc::new(Context {
509 shared: Arc::new(Shared {
510 local_state: LocalState {
511 owner,
512 owned: LocalOwnedTasks::new(),
513 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
514 },
515 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
516 waker: AtomicWaker::new(),
517 #[cfg(tokio_unstable)]
518 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
519 }),
520 unhandled_panic: Cell::new(false),
521 }),
522 _not_send: PhantomData,
523 }
524 }
525
526 /// Enters the context of this `LocalSet`.
527 ///
528 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
529 /// context you are inside.
530 ///
531 /// [`spawn_local`]: fn@crate::task::spawn_local
532 pub fn enter(&self) -> LocalEnterGuard {
533 CURRENT.with(
534 |LocalData {
535 ctx,
536 wake_on_schedule,
537 ..
538 }| {
539 let ctx = ctx.replace(Some(self.context.clone()));
540 let wake_on_schedule = wake_on_schedule.replace(true);
541 LocalEnterGuard {
542 ctx,
543 wake_on_schedule,
544 }
545 },
546 )
547 }
548
549 /// Spawns a `!Send` task onto the local task set.
550 ///
551 /// This task is guaranteed to be run on the current thread.
552 ///
553 /// Unlike the free function [`spawn_local`], this method may be used to
554 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
555 /// future will start running once the `LocalSet` is next started, even if
556 /// you don't await the returned `JoinHandle`.
557 ///
558 /// # Examples
559 ///
560 /// ```rust
561 /// use tokio::task;
562 ///
563 /// # #[tokio::main(flavor = "current_thread")]
564 /// # async fn main() {
565 /// let local = task::LocalSet::new();
566 ///
567 /// // Spawn a future on the local set. This future will be run when
568 /// // we call `run_until` to drive the task set.
569 /// local.spawn_local(async {
570 /// // ...
571 /// });
572 ///
573 /// // Run the local task set.
574 /// local.run_until(async move {
575 /// // ...
576 /// }).await;
577 ///
    /// // When `run_until` finishes, we can spawn _more_ futures, which will
    /// // run in subsequent calls to `run_until`.
580 /// local.spawn_local(async {
581 /// // ...
582 /// });
583 ///
584 /// local.run_until(async move {
585 /// // ...
586 /// }).await;
587 /// # }
588 /// ```
589 /// [`spawn_local`]: fn@spawn_local
590 #[track_caller]
591 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
592 where
593 F: Future + 'static,
594 F::Output: 'static,
595 {
596 let fut_size = mem::size_of::<F>();
597 if fut_size > BOX_FUTURE_THRESHOLD {
598 self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
599 } else {
600 self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
601 }
602 }
603
604 /// Runs a future to completion on the provided runtime, driving any local
605 /// futures spawned on this task set on the current thread.
606 ///
607 /// This runs the given future on the runtime, blocking until it is
608 /// complete, and yielding its resolved result. Any tasks or timers which
609 /// the future spawns internally will be executed on the runtime. The future
610 /// may also call [`spawn_local`] to `spawn_local` additional local futures on the
611 /// current thread.
612 ///
613 /// This method should not be called from an asynchronous context.
614 ///
615 /// # Panics
616 ///
617 /// This function panics if the executor is at capacity, if the provided
618 /// future panics, or if called within an asynchronous execution context.
619 ///
620 /// # Notes
621 ///
622 /// Since this function internally calls [`Runtime::block_on`], and drives
623 /// futures in the local task set inside that call to `block_on`, the local
624 /// futures may not use [in-place blocking]. If a blocking call needs to be
625 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
626 ///
627 /// For example, this will panic:
628 /// ```should_panic,ignore-wasm
629 /// use tokio::runtime::Runtime;
630 /// use tokio::task;
631 ///
632 /// let rt = Runtime::new().unwrap();
633 /// let local = task::LocalSet::new();
634 /// local.block_on(&rt, async {
635 /// let join = task::spawn_local(async {
636 /// let blocking_result = task::block_in_place(|| {
637 /// // ...
638 /// });
639 /// // ...
640 /// });
641 /// join.await.unwrap();
642 /// })
643 /// ```
644 /// This, however, will not panic:
645 /// ```
646 /// # #[cfg(not(target_family = "wasm"))]
647 /// # {
648 /// use tokio::runtime::Runtime;
649 /// use tokio::task;
650 ///
651 /// let rt = Runtime::new().unwrap();
652 /// let local = task::LocalSet::new();
653 /// local.block_on(&rt, async {
654 /// let join = task::spawn_local(async {
655 /// let blocking_result = task::spawn_blocking(|| {
656 /// // ...
657 /// }).await;
658 /// // ...
659 /// });
660 /// join.await.unwrap();
661 /// })
662 /// # }
663 /// ```
664 ///
665 /// [`spawn_local`]: fn@spawn_local
666 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
667 /// [in-place blocking]: fn@crate::task::block_in_place
668 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
669 #[track_caller]
670 #[cfg(feature = "rt")]
671 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
672 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
673 where
674 F: Future,
675 {
676 rt.block_on(self.run_until(future))
677 }
678
679 /// Runs a future to completion on the local set, returning its output.
680 ///
681 /// This returns a future that runs the given future with a local set,
682 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
683 /// Any local futures spawned on the local set will be driven in the
684 /// background until the future passed to `run_until` completes. When the future
685 /// passed to `run_until` finishes, any local futures which have not completed
686 /// will remain on the local set, and will be driven on subsequent calls to
687 /// `run_until` or when [awaiting the local set] itself.
688 ///
689 /// # Cancel safety
690 ///
691 /// This method is cancel safe when `future` is cancel safe.
692 ///
693 /// # Examples
694 ///
695 /// ```rust
696 /// use tokio::task;
697 ///
698 /// # #[tokio::main(flavor = "current_thread")]
699 /// # async fn main() {
700 /// task::LocalSet::new().run_until(async {
701 /// task::spawn_local(async move {
702 /// // ...
703 /// }).await.unwrap();
704 /// // ...
705 /// }).await;
706 /// # }
707 /// ```
708 ///
709 /// [`spawn_local`]: fn@spawn_local
710 /// [awaiting the local set]: #awaiting-a-localset
711 pub async fn run_until<F>(&self, future: F) -> F::Output
712 where
713 F: Future,
714 {
715 let run_until = RunUntil {
716 future,
717 local_set: self,
718 };
719 run_until.await
720 }
721
    /// Spawns `future` onto this set using the supplied spawn metadata.
    ///
    /// Forwards directly to `spawn_named_inner`.
    #[track_caller]
    pub(in crate::task) fn spawn_named<F>(
        &self,
        future: F,
        meta: SpawnMeta<'_>,
    ) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.spawn_named_inner(future, meta)
    }
734
735 #[track_caller]
736 fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
737 where
738 F: Future + 'static,
739 F::Output: 'static,
740 {
741 let handle = self.context.spawn(future, meta);
742
743 // Because a task was spawned from *outside* the `LocalSet`, wake the
744 // `LocalSet` future to execute the new task, if it hasn't been woken.
745 //
746 // Spawning via the free fn `spawn` does not require this, as it can
747 // only be called from *within* a future executing on the `LocalSet` —
748 // in that case, the `LocalSet` must already be awake.
749 self.context.shared.waker.wake();
750 handle
751 }
752
753 /// Ticks the scheduler, returning whether the local future needs to be
754 /// notified again.
755 fn tick(&self) -> bool {
756 for _ in 0..MAX_TASKS_PER_TICK {
757 // Make sure we didn't hit an unhandled panic
758 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
759
760 match self.next_task() {
761 // Run the task
762 //
763 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
764 // used. We are responsible for maintaining the invariant that
765 // `run_unchecked` is only called on threads that spawned the
766 // task initially. Because `LocalSet` itself is `!Send`, and
767 // `spawn_local` spawns into the `LocalSet` on the current
768 // thread, the invariant is maintained.
769 Some(task) => crate::task::coop::budget(|| task.run()),
770 // We have fully drained the queue of notified tasks, so the
771 // local future doesn't need to be notified again — it can wait
772 // until something else wakes a task in the local set.
773 None => return false,
774 }
775 }
776
777 true
778 }
779
780 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
781 let tick = self.tick.get();
782 self.tick.set(tick.wrapping_add(1));
783
784 let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
785 self.context
786 .shared
787 .queue
788 .lock()
789 .as_mut()
790 .and_then(|queue| queue.pop_front())
791 .or_else(|| self.pop_local())
792 } else {
793 self.pop_local().or_else(|| {
794 self.context
795 .shared
796 .queue
797 .lock()
798 .as_mut()
799 .and_then(VecDeque::pop_front)
800 })
801 };
802
803 task.map(|task| unsafe {
804 // Safety: because the `LocalSet` itself is `!Send`, we know we are
805 // on the same thread if we have access to the `LocalSet`, and can
806 // therefore access the local run queue.
807 self.context.shared.local_state.assert_owner(task)
808 })
809 }
810
    /// Pops the next notified task from the local run queue, if there is one.
    fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
        unsafe {
            // Safety: because the `LocalSet` itself is `!Send`, we know we are
            // on the same thread if we have access to the `LocalSet`, and can
            // therefore access the local run queue.
            self.context.shared.local_state.task_pop_front()
        }
    }
819
820 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
821 CURRENT.with(|local_data| {
822 let _guard = local_data.enter(self.context.clone());
823 f()
824 })
825 }
826
827 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
828 /// fails.
829 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
830 let mut f = Some(f);
831
832 let res = CURRENT.try_with(|local_data| {
833 let _guard = local_data.enter(self.context.clone());
834 (f.take().unwrap())()
835 });
836
837 match res {
838 Ok(res) => res,
839 Err(_access_error) => (f.take().unwrap())(),
840 }
841 }
842
843 /// Returns the [`Id`] of the current [`LocalSet`] runtime.
844 ///
845 /// # Examples
846 ///
847 /// ```rust
848 /// use tokio::task;
849 ///
850 /// # #[tokio::main(flavor = "current_thread")]
851 /// # async fn main() {
852 /// let local_set = task::LocalSet::new();
853 /// println!("Local set id: {}", local_set.id());
854 /// # }
855 /// ```
856 ///
857 /// [`Id`]: struct@crate::runtime::Id
858 pub fn id(&self) -> runtime::Id {
859 runtime::Id::new(self.context.shared.local_state.owned.id)
860 }
861}
862
863cfg_unstable! {
864 impl LocalSet {
865 /// Configure how the `LocalSet` responds to an unhandled panic on a
866 /// spawned task.
867 ///
        /// By default, an unhandled panic (i.e. a panic not caught by
        /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
        /// execution. The panic's error value is forwarded to the task's
        /// [`JoinHandle`] and all other spawned tasks continue running.
872 ///
873 /// The `unhandled_panic` option enables configuring this behavior.
874 ///
875 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
876 /// spawned tasks have no impact on the `LocalSet`'s execution.
877 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
878 /// shutdown immediately when a spawned task panics even if that
879 /// task's `JoinHandle` has not been dropped. All other spawned tasks
880 /// will immediately terminate and further calls to
881 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
882 ///
883 /// # Panics
884 ///
885 /// This method panics if called after the `LocalSet` has started
886 /// running.
887 ///
888 /// # Unstable
889 ///
890 /// This option is currently unstable and its implementation is
891 /// incomplete. The API may change or be removed in the future. See
892 /// tokio-rs/tokio#4516 for more details.
893 ///
894 /// # Examples
895 ///
896 /// The following demonstrates a `LocalSet` configured to shutdown on
897 /// panic. The first spawned task panics and results in the `LocalSet`
898 /// shutting down. The second spawned task never has a chance to
899 /// execute. The call to `run_until` will panic due to the runtime being
900 /// forcibly shutdown.
901 ///
902 /// ```should_panic
903 /// use tokio::runtime::UnhandledPanic;
904 ///
905 /// # #[tokio::main(flavor = "current_thread")]
906 /// # async fn main() {
907 /// tokio::task::LocalSet::new()
908 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
909 /// .run_until(async {
910 /// tokio::task::spawn_local(async { panic!("boom"); });
911 /// tokio::task::spawn_local(async {
912 /// // This task never completes
913 /// });
914 ///
915 /// // Do some work, but `run_until` will panic before it completes
916 /// # loop { tokio::task::yield_now().await; }
917 /// })
918 /// .await;
919 /// # }
920 /// ```
921 ///
922 /// [`JoinHandle`]: struct@crate::task::JoinHandle
923 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
924 // TODO: This should be set as a builder
925 Rc::get_mut(&mut self.context)
926 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
927 .expect("Unhandled Panic behavior modified after starting LocalSet")
928 .unhandled_panic = behavior;
929 self
930 }
931 }
932}
933
934impl fmt::Debug for LocalSet {
935 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
936 fmt.debug_struct("LocalSet").finish()
937 }
938}
939
940impl Future for LocalSet {
941 type Output = ();
942
943 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
944 let _no_blocking = crate::runtime::context::disallow_block_in_place();
945
946 // Register the waker before starting to work
947 self.context.shared.waker.register_by_ref(cx.waker());
948
949 if self.with(|| self.tick()) {
950 // If `tick` returns true, we need to notify the local future again:
951 // there are still tasks remaining in the run queue.
952 cx.waker().wake_by_ref();
953 Poll::Pending
954
955 // Safety: called from the thread that owns `LocalSet`. Because
956 // `LocalSet` is `!Send`, this is safe.
957 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
958 // If the scheduler has no remaining futures, we're done!
959 Poll::Ready(())
960 } else {
961 // There are still futures in the local set, but we've polled all the
962 // futures in the run queue. Therefore, we can just return Pending
963 // since the remaining futures will be woken from somewhere else.
964 Poll::Pending
965 }
966 }
967}
968
969impl Default for LocalSet {
970 fn default() -> LocalSet {
971 LocalSet::new()
972 }
973}
974
impl Drop for LocalSet {
    /// Shuts down every task owned by the set and empties both run queues.
    fn drop(&mut self) {
        // Run the teardown with this set installed in the thread-local when
        // the thread-local is still accessible, and without it otherwise.
        self.with_if_possible(|| {
            let _no_blocking = crate::runtime::context::disallow_block_in_place();

            // Shut down all tasks in the LocalOwnedTasks and close it to
            // prevent new tasks from ever being added.
            unsafe {
                // Safety: called from the thread that owns `LocalSet`
                self.context.shared.local_state.close_and_shutdown_all();
            }

            // We already called shutdown on all tasks above, so the
            // notifications left in the run queues only need to be dropped;
            // there is no need to call shutdown on them again.

            // Safety: note that this *intentionally* bypasses the unsafe
            // `Shared::local_queue()` method. This is in order to avoid the
            // debug assertion that we are on the thread that owns the
            // `LocalSet`, because on some systems (e.g. at least some macOS
            // versions), attempting to get the current thread ID can panic due
            // to the thread's local data that stores the thread ID being
            // dropped *before* the `LocalSet`.
            //
            // Despite avoiding the assertion here, it is safe for us to access
            // the local queue in `Drop`, because the `LocalSet` itself is
            // `!Send`, so we can reasonably guarantee that it will not be
            // `Drop`ped from another thread.
            let local_queue = unsafe {
                // Safety: called from the thread that owns `LocalSet`
                self.context.shared.local_state.take_local_queue()
            };
            for task in local_queue {
                drop(task);
            }

            // Take the queue from the Shared object to prevent pushing
            // notifications to it in the future.
            let queue = self.context.shared.queue.lock().take().unwrap();
            for task in queue {
                drop(task);
            }

            // Every task must be gone from the owned collection by now.
            // Safety: called from the thread that owns `LocalSet`
            assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
        });
    }
}
1022
1023// === impl Context ===
1024
1025impl Context {
1026 #[track_caller]
1027 fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
1028 where
1029 F: Future + 'static,
1030 F::Output: 'static,
1031 {
1032 let id = crate::runtime::task::Id::next();
1033 let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1034
1035 // Safety: called from the thread that owns the `LocalSet`
1036 let (handle, notified) = {
1037 self.shared.local_state.assert_called_from_owner_thread();
1038 self.shared.local_state.owned.bind(
1039 future,
1040 self.shared.clone(),
1041 id,
1042 SpawnLocation::capture(),
1043 )
1044 };
1045
1046 if let Some(notified) = notified {
1047 self.shared.schedule(notified);
1048 }
1049
1050 handle
1051 }
1052}
1053
// === impl RunUntil ===
1055
1056impl<T: Future> Future for RunUntil<'_, T> {
1057 type Output = T::Output;
1058
1059 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1060 let me = self.project();
1061
1062 me.local_set.with(|| {
1063 me.local_set
1064 .context
1065 .shared
1066 .waker
1067 .register_by_ref(cx.waker());
1068
1069 let _no_blocking = crate::runtime::context::disallow_block_in_place();
1070 let f = me.future;
1071
1072 if let Poll::Ready(output) = f.poll(cx) {
1073 return Poll::Ready(output);
1074 }
1075
1076 if me.local_set.tick() {
1077 // If `tick` returns `true`, we need to notify the local future again:
1078 // there are still tasks remaining in the run queue.
1079 cx.waker().wake_by_ref();
1080 }
1081
1082 Poll::Pending
1083 })
1084 }
1085}
1086
impl Shared {
    /// Schedule the provided task on the scheduler.
    ///
    /// The task takes one of three routes, tried in order:
    ///
    /// 1. This thread's `CURRENT` context is this `LocalSet` and
    ///    `wake_on_schedule` is unset: push to the local queue, no wake-up.
    /// 2. The calling thread's ID equals the set's owner: push to the local
    ///    queue and wake the `LocalSet`.
    /// 3. Otherwise: push to the mutex-protected remote queue and wake the
    ///    `LocalSet`, or do nothing if that queue has already been taken.
    fn schedule(&self, task: task::Notified<Arc<Self>>) {
        CURRENT.with(|localdata| {
            match localdata.ctx.get() {
                // The thread-local context points at this very `LocalSet` and
                // `wake_on_schedule` is not set, so the task is only pushed to
                // the local queue and the waker is left alone. Per the
                // original note this is the "currently being polled" case;
                // `wake_on_schedule` is managed outside this block (set to
                // `true` on `enter`, in which case a wake-up is still needed
                // and this arm is skipped).
                Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
                    // Safety: if the current `LocalSet` context points to this
                    // `LocalSet`, then we are on the thread that owns it.
                    cx.shared.local_state.task_push_back(task);
                },

                // We are on the thread that owns the `LocalSet`, so we can
                // wake to the local queue. A failed thread-ID lookup becomes
                // `None` and never matches the owner.
                _ if context::thread_id().ok() == Some(self.local_state.owner) => {
                    unsafe {
                        // Safety: we just checked that the thread ID matches
                        // the localset's owner, so this is safe.
                        self.local_state.task_push_back(task);
                    }
                    // We still have to wake the `LocalSet`, because it isn't
                    // currently being polled.
                    self.waker.wake();
                }

                // We are *not* on the thread that owns the `LocalSet`, so we
                // have to wake to the remote queue.
                _ => {
                    // First, check whether the queue is still there (`None`
                    // means it was taken, which `LocalSet`'s `Drop` does).
                    // Push to it if so; if not, do nothing and let the task
                    // be dropped.
                    let mut lock = self.queue.lock();

                    if let Some(queue) = lock.as_mut() {
                        queue.push_back(task);
                        // Release the lock before waking.
                        drop(lock);
                        self.waker.wake();
                    }
                }
            }
        });
    }

    /// Returns `true` if `self` and `other` are the same `Shared` instance
    /// (address comparison, not value comparison).
    fn ptr_eq(&self, other: &Shared) -> bool {
        std::ptr::eq(self, other)
    }
}
1136
// SAFETY: this is sound because (and only because) the local run queue is
// never touched from any thread other than the one that owns the `LocalSet`.
// The code in this file commits to upholding that rule.
unsafe impl Sync for Shared {}
1140
1141impl task::Schedule for Arc<Shared> {
1142 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1143 // Safety, this is always called from the thread that owns `LocalSet`
1144 unsafe { self.local_state.task_remove(task) }
1145 }
1146
1147 fn schedule(&self, task: task::Notified<Self>) {
1148 Shared::schedule(self, task);
1149 }
1150
1151 // localset does not currently support task hooks
1152 fn hooks(&self) -> TaskHarnessScheduleHooks {
1153 TaskHarnessScheduleHooks {
1154 task_terminate_callback: None,
1155 }
1156 }
1157
1158 cfg_unstable! {
1159 fn unhandled_panic(&self) {
1160 use crate::runtime::UnhandledPanic;
1161
1162 match self.unhandled_panic {
1163 UnhandledPanic::Ignore => {
1164 // Do nothing
1165 }
1166 UnhandledPanic::ShutdownRuntime => {
1167 // This hook is only called from within the runtime, so
1168 // `CURRENT` should match with `&self`, i.e. there is no
1169 // opportunity for a nested scheduler to be called.
1170 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1171 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1172 cx.unhandled_panic.set(true);
1173 // Safety: this is always called from the thread that owns `LocalSet`
1174 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1175 }
1176 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1177 })
1178 }
1179 }
1180 }
1181 }
1182}
1183
1184impl LocalState {
1185 /// # Safety
1186 ///
1187 /// This method must only be called from the thread who
1188 /// has the same [`ThreadId`] as [`Self::owner`].
1189 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1190 // The caller ensures it is called from the same thread that owns
1191 // the LocalSet.
1192 self.assert_called_from_owner_thread();
1193
1194 self.local_queue
1195 .with_mut(|ptr| unsafe { (*ptr).pop_front() })
1196 }
1197
1198 /// # Safety
1199 ///
1200 /// This method must only be called from the thread who
1201 /// has the same [`ThreadId`] as [`Self::owner`].
1202 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1203 // The caller ensures it is called from the same thread that owns
1204 // the LocalSet.
1205 self.assert_called_from_owner_thread();
1206
1207 self.local_queue
1208 .with_mut(|ptr| unsafe { (*ptr).push_back(task) });
1209 }
1210
1211 /// # Safety
1212 ///
1213 /// This method must only be called from the thread who
1214 /// has the same [`ThreadId`] as [`Self::owner`].
1215 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1216 // The caller ensures it is called from the same thread that owns
1217 // the LocalSet.
1218 self.assert_called_from_owner_thread();
1219
1220 self.local_queue
1221 .with_mut(|ptr| std::mem::take(unsafe { &mut (*ptr) }))
1222 }
1223
1224 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1225 // The caller ensures it is called from the same thread that owns
1226 // the LocalSet.
1227 self.assert_called_from_owner_thread();
1228
1229 self.owned.remove(task)
1230 }
1231
1232 /// Returns true if the `LocalSet` does not have any spawned tasks
1233 unsafe fn owned_is_empty(&self) -> bool {
1234 // The caller ensures it is called from the same thread that owns
1235 // the LocalSet.
1236 self.assert_called_from_owner_thread();
1237
1238 self.owned.is_empty()
1239 }
1240
1241 unsafe fn assert_owner(
1242 &self,
1243 task: task::Notified<Arc<Shared>>,
1244 ) -> task::LocalNotified<Arc<Shared>> {
1245 // The caller ensures it is called from the same thread that owns
1246 // the LocalSet.
1247 self.assert_called_from_owner_thread();
1248
1249 self.owned.assert_owner(task)
1250 }
1251
1252 unsafe fn close_and_shutdown_all(&self) {
1253 // The caller ensures it is called from the same thread that owns
1254 // the LocalSet.
1255 self.assert_called_from_owner_thread();
1256
1257 self.owned.close_and_shutdown_all();
1258 }
1259
1260 #[track_caller]
1261 fn assert_called_from_owner_thread(&self) {
1262 // FreeBSD has some weirdness around thread-local destruction.
1263 // TODO: remove this hack when thread id is cleaned up
1264 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1265 debug_assert!(
1266 // if we couldn't get the thread ID because we're dropping the local
1267 // data, skip the assertion --- the `Drop` impl is not going to be
1268 // called from another thread, because `LocalSet` is `!Send`
1269 context::thread_id()
1270 .map(|id| id == self.owner)
1271 .unwrap_or(true),
1272 "`LocalSet`'s local run queue must not be accessed by another thread!"
1273 );
1274 }
1275}
1276
// SAFETY: `LocalState` has to be `Send` because it is stored inside `Shared`.
// It is up to every caller to ensure they are on the thread that owns the
// `LocalSet` before touching it.
unsafe impl Send for LocalState {}
1280
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    // Smoke test: a `LocalSet` driven by a current-thread runtime can spawn a
    // local task and await it.
    //
    // `tests/task_local_set.rs` has an equivalent test, but this copy is a lib
    // test and therefore also runs under Miri, which lets it catch stacked
    // borrows violations in the `LocalSet` implementation.
    #[test]
    fn local_current_thread_scheduler() {
        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            LocalSet::new()
                .run_until(async {
                    spawn_local(async {}).await.unwrap();
                })
                .await;
        })
    }

    // Checks that a task woken from the same thread, while its `LocalSet` is
    // not being polled, ends up in the set's local queue rather than its
    // remote queue.
    //
    // This has to live in `local.rs` as a lib test instead of under `tests/`,
    // because it inspects the local set's internal state.
    #[test]
    fn wakes_to_local_queue() {
        use crate::sync::Notify;

        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            let local = LocalSet::new();
            let notify = Arc::new(Notify::new());

            let waiter = notify.clone();
            let task = local.spawn_local(async move {
                waiter.notified().await;
            });
            let mut run_until = Box::pin(local.run_until(async move {
                task.await.unwrap();
            }));

            // Poll the `run_until` future exactly once, ignoring its result.
            std::future::poll_fn(|cx| {
                let _ = run_until.as_mut().poll(cx);
                Poll::Ready(())
            })
            .await;

            notify.notify_one();

            // SAFETY: `local` was created earlier in this same future and has
            // not been moved to another thread.
            let woken = unsafe { local.context.shared.local_state.task_pop_front() };
            // TODO(eliza): it would be nice to be able to assert that this is
            // the local task.
            assert!(
                woken.is_some(),
                "task should have been notified to the LocalSet's local queue"
            );
        })
    }
}