tokio/runtime/
builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5    blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, TaskMeta};
9
10use crate::runtime::{LocalOptions, LocalRuntime};
11use crate::util::rand::{RngSeed, RngSeedGenerator};
12
13use crate::runtime::blocking::BlockingPool;
14use crate::runtime::scheduler::CurrentThread;
15use std::fmt;
16use std::io;
17use std::thread::ThreadId;
18use std::time::Duration;
19
20/// Builds Tokio Runtime with custom configuration values.
21///
22/// Methods can be chained in order to set the configuration values. The
23/// Runtime is constructed by calling [`build`].
24///
25/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
26/// or [`Builder::new_current_thread`].
27///
28/// See function level documentation for details on the various configuration
29/// settings.
30///
31/// [`build`]: method@Self::build
32/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
33/// [`Builder::new_current_thread`]: method@Self::new_current_thread
34///
35/// # Examples
36///
37/// ```
38/// # #[cfg(not(target_family = "wasm"))]
39/// # {
40/// use tokio::runtime::Builder;
41///
42/// fn main() {
43///     // build runtime
44///     let runtime = Builder::new_multi_thread()
45///         .worker_threads(4)
46///         .thread_name("my-custom-name")
47///         .thread_stack_size(3 * 1024 * 1024)
48///         .build()
49///         .unwrap();
50///
51///     // use runtime ...
52/// }
53/// # }
54/// ```
pub struct Builder {
    /// Which scheduler flavor (current-thread or multi-thread) to build.
    kind: Kind,

    /// Name of the runtime.
    name: Option<String>,

    /// Whether or not to enable the I/O driver
    enable_io: bool,
    // Capacity hint for the I/O driver's event buffer — presumably passed to
    // the poller; confirm against the driver construction site.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Cap on thread usage.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops.
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// To run before each task is spawned.
    pub(super) before_spawn: Option<TaskCallback>,

    /// To run before each poll.
    #[cfg(tokio_unstable)]
    pub(super) before_poll: Option<TaskCallback>,

    /// To run after each poll.
    #[cfg(tokio_unstable)]
    pub(super) after_poll: Option<TaskCallback>,

    /// To run after each task is terminated.
    pub(super) after_termination: Option<TaskCallback>,

    /// Customizable keep alive timeout for `BlockingPool`.
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results.
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram.
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime responds to a panic in a spawned task (unstable).
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,

    /// Which timer implementation the time driver should use; defaults to
    /// `TimerFlavor::Traditional`, switched by `enable_alt_timer`.
    timer_flavor: TimerFlavor,
}
146
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shutdown if a spawned task panics.
        ///
        /// The runtime will immediately shutdown even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// # loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
230
231pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
232
/// Scheduler flavor selected for the runtime being built.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Single-threaded scheduler driven from `Runtime::block_on`.
    CurrentThread,
    /// Multi-threaded scheduler with a pool of worker threads.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
239
240impl Builder {
241    /// Returns a new builder with the current thread scheduler selected.
242    ///
243    /// Configuration methods can be chained on the return value.
244    ///
245    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
246    /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`].
247    ///
248    /// [`LocalSet`]: crate::task::LocalSet
249    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
250    /// [`build_local`]: crate::runtime::Builder::build_local
251    pub fn new_current_thread() -> Builder {
252        #[cfg(loom)]
253        const EVENT_INTERVAL: u32 = 4;
254        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
255        #[cfg(not(loom))]
256        const EVENT_INTERVAL: u32 = 61;
257
258        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
259    }
260
261    /// Returns a new builder with the multi thread scheduler selected.
262    ///
263    /// Configuration methods can be chained on the return value.
264    #[cfg(feature = "rt-multi-thread")]
265    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
266    pub fn new_multi_thread() -> Builder {
267        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
268        Builder::new(Kind::MultiThread, 61)
269    }
270
271    /// Returns a new runtime builder initialized with default configuration
272    /// values.
273    ///
274    /// Configuration methods can be chained on the return value.
275    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
276        Builder {
277            kind,
278
279            // Default runtime name
280            name: None,
281
282            // I/O defaults to "off"
283            enable_io: false,
284            nevents: 1024,
285
286            // Time defaults to "off"
287            enable_time: false,
288
289            // The clock starts not-paused
290            start_paused: false,
291
292            // Read from environment variable first in multi-threaded mode.
293            // Default to lazy auto-detection (one thread per CPU core)
294            worker_threads: None,
295
296            max_blocking_threads: 512,
297
298            // Default thread name
299            thread_name: std::sync::Arc::new(|| "tokio-rt-worker".into()),
300
301            // Do not set a stack size by default
302            thread_stack_size: None,
303
304            // No worker thread callbacks
305            after_start: None,
306            before_stop: None,
307            before_park: None,
308            after_unpark: None,
309
310            before_spawn: None,
311            after_termination: None,
312
313            #[cfg(tokio_unstable)]
314            before_poll: None,
315            #[cfg(tokio_unstable)]
316            after_poll: None,
317
318            keep_alive: None,
319
320            // Defaults for these values depend on the scheduler kind, so we get them
321            // as parameters.
322            global_queue_interval: None,
323            event_interval,
324
325            seed_generator: RngSeedGenerator::new(RngSeed::new()),
326
327            #[cfg(tokio_unstable)]
328            unhandled_panic: UnhandledPanic::Ignore,
329
330            metrics_poll_count_histogram_enable: false,
331
332            metrics_poll_count_histogram: HistogramBuilder::default(),
333
334            disable_lifo_slot: false,
335
336            timer_flavor: TimerFlavor::Traditional,
337        }
338    }
339
    /// Enables both I/O and time drivers.
    ///
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
    /// individually. If additional components are added to Tokio in the future,
    /// `enable_all` will include these future components.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// # }
    /// ```
    pub fn enable_all(&mut self) -> &mut Self {
        // `enable_io` is only compiled in when a feature that needs the I/O
        // driver is enabled, so the call must be gated the same way.
        #[cfg(any(
            feature = "net",
            all(unix, feature = "process"),
            all(unix, feature = "signal")
        ))]
        self.enable_io();

        // io_uring support is Linux-only and currently unstable.
        #[cfg(all(
            tokio_unstable,
            feature = "io-uring",
            feature = "rt",
            feature = "fs",
            target_os = "linux",
        ))]
        self.enable_io_uring();

        #[cfg(feature = "time")]
        self.enable_time();

        self
    }
381
382    /// Enables the alternative timer implementation, which is disabled by default.
383    ///
384    /// The alternative timer implementation is an unstable feature that may
385    /// provide better performance on multi-threaded runtimes with a large number
386    /// of worker threads.
387    ///
388    /// This option only applies to multi-threaded runtimes. Attempting to use
389    /// this option with any other runtime type will have no effect.
390    ///
391    /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
392    ///
393    /// # Examples
394    ///
395    /// ```
396    /// # #[cfg(not(target_family = "wasm"))]
397    /// # {
398    /// use tokio::runtime;
399    ///
400    /// let rt = runtime::Builder::new_multi_thread()
401    ///   .enable_alt_timer()
402    ///   .build()
403    ///   .unwrap();
404    /// # }
405    /// ```
406    #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
407    #[cfg_attr(
408        docsrs,
409        doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
410    )]
411    pub fn enable_alt_timer(&mut self) -> &mut Self {
412        self.enable_time();
413        self.timer_flavor = TimerFlavor::Alternative;
414        self
415    }
416
417    /// Sets the number of worker threads the `Runtime` will use.
418    ///
419    /// This can be any number above 0 though it is advised to keep this value
420    /// on the smaller side.
421    ///
422    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
423    ///
424    /// # Default
425    ///
426    /// The default value is the number of cores available to the system.
427    ///
428    /// When using the `current_thread` runtime this method has no effect.
429    ///
430    /// # Examples
431    ///
432    /// ## Multi threaded runtime with 4 threads
433    ///
434    /// ```
435    /// # #[cfg(not(target_family = "wasm"))]
436    /// # {
437    /// use tokio::runtime;
438    ///
439    /// // This will spawn a work-stealing runtime with 4 worker threads.
440    /// let rt = runtime::Builder::new_multi_thread()
441    ///     .worker_threads(4)
442    ///     .build()
443    ///     .unwrap();
444    ///
445    /// rt.spawn(async move {});
446    /// # }
447    /// ```
448    ///
449    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
450    ///
451    /// ```
452    /// use tokio::runtime;
453    ///
454    /// // Create a runtime that _must_ be driven from a call
455    /// // to `Runtime::block_on`.
456    /// let rt = runtime::Builder::new_current_thread()
457    ///     .build()
458    ///     .unwrap();
459    ///
460    /// // This will run the runtime and future on the current thread
461    /// rt.block_on(async move {});
462    /// ```
463    ///
464    /// # Panics
465    ///
466    /// This will panic if `val` is not larger than `0`.
467    #[track_caller]
468    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
469        assert!(val > 0, "Worker threads cannot be set to 0");
470        self.worker_threads = Some(val);
471        self
472    }
473
474    /// Specifies the limit for additional threads spawned by the Runtime.
475    ///
476    /// These threads are used for blocking operations like tasks spawned
477    /// through [`spawn_blocking`], this includes but is not limited to:
478    /// - [`fs`] operations
479    /// - dns resolution through [`ToSocketAddrs`]
480    /// - writing to [`Stdout`] or [`Stderr`]
481    /// - reading from [`Stdin`]
482    ///
483    /// Unlike the [`worker_threads`], they are not always active and will exit
484    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
485    ///
486    /// It's recommended to not set this limit too low in order to avoid hanging on operations
487    /// requiring [`spawn_blocking`].
488    ///
489    /// The default value is 512.
490    ///
491    /// # Queue Behavior
492    ///
493    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
494    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
495    /// method has not been reached, a new thread will be spawned. If no idle thread is available
496    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
497    /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
498    /// it could potentially grow unbounded.
499    ///
500    /// # Panics
501    ///
502    /// This will panic if `val` is not larger than `0`.
503    ///
504    /// # Upgrading from 0.x
505    ///
506    /// In old versions `max_threads` limited both blocking and worker threads, but the
507    /// current `max_blocking_threads` does not include async worker threads in the count.
508    ///
509    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
510    /// [`fs`]: mod@crate::fs
511    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
512    /// [`Stdout`]: struct@crate::io::Stdout
513    /// [`Stdin`]: struct@crate::io::Stdin
514    /// [`Stderr`]: struct@crate::io::Stderr
515    /// [`worker_threads`]: Self::worker_threads
516    /// [`thread_keep_alive`]: Self::thread_keep_alive
517    #[track_caller]
518    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
519    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
520        assert!(val > 0, "Max blocking threads cannot be set to 0");
521        self.max_blocking_threads = val;
522        self
523    }
524
525    /// Sets name of threads spawned by the `Runtime`'s thread pool.
526    ///
527    /// The default name is "tokio-rt-worker".
528    ///
529    /// # Examples
530    ///
531    /// ```
532    /// # #[cfg(not(target_family = "wasm"))]
533    /// # {
534    /// # use tokio::runtime;
535    ///
536    /// # pub fn main() {
537    /// let rt = runtime::Builder::new_multi_thread()
538    ///     .thread_name("my-pool")
539    ///     .build();
540    /// # }
541    /// # }
542    /// ```
543    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
544        let val = val.into();
545        self.thread_name = std::sync::Arc::new(move || val.clone());
546        self
547    }
548
549    /// Sets the name of the runtime.
550    ///
551    /// # Examples
552    ///
553    /// ```
554    /// # #[cfg(not(target_family = "wasm"))]
555    /// # {
556    /// # use tokio::runtime;
557    ///
558    /// # pub fn main() {
559    /// let rt = runtime::Builder::new_multi_thread()
560    ///     .name("my-runtime")
561    ///     .build();
562    /// # }
563    /// # }
564    /// ```
565    /// # Panics
566    ///
567    /// This function will panic if an empty value is passed as an argument.
568    ///
569    #[track_caller]
570    pub fn name(&mut self, val: impl Into<String>) -> &mut Self {
571        let val = val.into();
572        assert!(!val.trim().is_empty(), "runtime name shouldn't be empty");
573        self.name = Some(val);
574        self
575    }
576
577    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
578    ///
579    /// The default name fn is `|| "tokio-rt-worker".into()`.
580    ///
581    /// # Examples
582    ///
583    /// ```
584    /// # #[cfg(not(target_family = "wasm"))]
585    /// # {
586    /// # use tokio::runtime;
587    /// # use std::sync::atomic::{AtomicUsize, Ordering};
588    /// # pub fn main() {
589    /// let rt = runtime::Builder::new_multi_thread()
590    ///     .thread_name_fn(|| {
591    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
592    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
593    ///        format!("my-pool-{}", id)
594    ///     })
595    ///     .build();
596    /// # }
597    /// # }
598    /// ```
599    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
600    where
601        F: Fn() -> String + Send + Sync + 'static,
602    {
603        self.thread_name = std::sync::Arc::new(f);
604        self
605    }
606
607    /// Sets the stack size (in bytes) for worker threads.
608    ///
609    /// The actual stack size may be greater than this value if the platform
610    /// specifies minimal stack size.
611    ///
612    /// The default stack size for spawned threads is 2 MiB, though this
613    /// particular stack size is subject to change in the future.
614    ///
615    /// # Examples
616    ///
617    /// ```
618    /// # #[cfg(not(target_family = "wasm"))]
619    /// # {
620    /// # use tokio::runtime;
621    ///
622    /// # pub fn main() {
623    /// let rt = runtime::Builder::new_multi_thread()
624    ///     .thread_stack_size(32 * 1024)
625    ///     .build();
626    /// # }
627    /// # }
628    /// ```
629    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
630        self.thread_stack_size = Some(val);
631        self
632    }
633
634    /// Executes function `f` after each thread is started but before it starts
635    /// doing work.
636    ///
637    /// This is intended for bookkeeping and monitoring use cases.
638    ///
639    /// # Examples
640    ///
641    /// ```
642    /// # #[cfg(not(target_family = "wasm"))]
643    /// # {
644    /// # use tokio::runtime;
645    /// # pub fn main() {
646    /// let runtime = runtime::Builder::new_multi_thread()
647    ///     .on_thread_start(|| {
648    ///         println!("thread started");
649    ///     })
650    ///     .build();
651    /// # }
652    /// # }
653    /// ```
654    #[cfg(not(loom))]
655    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
656    where
657        F: Fn() + Send + Sync + 'static,
658    {
659        self.after_start = Some(std::sync::Arc::new(f));
660        self
661    }
662
663    /// Executes function `f` before each thread stops.
664    ///
665    /// This is intended for bookkeeping and monitoring use cases.
666    ///
667    /// # Examples
668    ///
669    /// ```
670    /// # #[cfg(not(target_family = "wasm"))]
671    /// {
672    /// # use tokio::runtime;
673    /// # pub fn main() {
674    /// let runtime = runtime::Builder::new_multi_thread()
675    ///     .on_thread_stop(|| {
676    ///         println!("thread stopping");
677    ///     })
678    ///     .build();
679    /// # }
680    /// # }
681    /// ```
682    #[cfg(not(loom))]
683    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
684    where
685        F: Fn() + Send + Sync + 'static,
686    {
687        self.before_stop = Some(std::sync::Arc::new(f));
688        self
689    }
690
691    /// Executes function `f` just before a thread is parked (goes idle).
692    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
693    /// can be called, and may result in this thread being unparked immediately.
694    ///
695    /// This can be used to start work only when the executor is idle, or for bookkeeping
696    /// and monitoring purposes.
697    ///
698    /// Note: There can only be one park callback for a runtime; calling this function
699    /// more than once replaces the last callback defined, rather than adding to it.
700    ///
701    /// # Examples
702    ///
703    /// ## Multithreaded executor
704    /// ```
705    /// # #[cfg(not(target_family = "wasm"))]
706    /// # {
707    /// # use std::sync::Arc;
708    /// # use std::sync::atomic::{AtomicBool, Ordering};
709    /// # use tokio::runtime;
710    /// # use tokio::sync::Barrier;
711    /// # pub fn main() {
712    /// let once = AtomicBool::new(true);
713    /// let barrier = Arc::new(Barrier::new(2));
714    ///
715    /// let runtime = runtime::Builder::new_multi_thread()
716    ///     .worker_threads(1)
717    ///     .on_thread_park({
718    ///         let barrier = barrier.clone();
719    ///         move || {
720    ///             let barrier = barrier.clone();
721    ///             if once.swap(false, Ordering::Relaxed) {
722    ///                 tokio::spawn(async move { barrier.wait().await; });
723    ///            }
724    ///         }
725    ///     })
726    ///     .build()
727    ///     .unwrap();
728    ///
729    /// runtime.block_on(async {
730    ///    barrier.wait().await;
731    /// })
732    /// # }
733    /// # }
734    /// ```
735    /// ## Current thread executor
736    /// ```
737    /// # use std::sync::Arc;
738    /// # use std::sync::atomic::{AtomicBool, Ordering};
739    /// # use tokio::runtime;
740    /// # use tokio::sync::Barrier;
741    /// # pub fn main() {
742    /// let once = AtomicBool::new(true);
743    /// let barrier = Arc::new(Barrier::new(2));
744    ///
745    /// let runtime = runtime::Builder::new_current_thread()
746    ///     .on_thread_park({
747    ///         let barrier = barrier.clone();
748    ///         move || {
749    ///             let barrier = barrier.clone();
750    ///             if once.swap(false, Ordering::Relaxed) {
751    ///                 tokio::spawn(async move { barrier.wait().await; });
752    ///            }
753    ///         }
754    ///     })
755    ///     .build()
756    ///     .unwrap();
757    ///
758    /// runtime.block_on(async {
759    ///    barrier.wait().await;
760    /// })
761    /// # }
762    /// ```
763    #[cfg(not(loom))]
764    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
765    where
766        F: Fn() + Send + Sync + 'static,
767    {
768        self.before_park = Some(std::sync::Arc::new(f));
769        self
770    }
771
772    /// Executes function `f` just after a thread unparks (starts executing tasks).
773    ///
774    /// This is intended for bookkeeping and monitoring use cases; note that work
775    /// in this callback will increase latencies when the application has allowed one or
776    /// more runtime threads to go idle.
777    ///
778    /// Note: There can only be one unpark callback for a runtime; calling this function
779    /// more than once replaces the last callback defined, rather than adding to it.
780    ///
781    /// # Examples
782    ///
783    /// ```
784    /// # #[cfg(not(target_family = "wasm"))]
785    /// # {
786    /// # use tokio::runtime;
787    /// # pub fn main() {
788    /// let runtime = runtime::Builder::new_multi_thread()
789    ///     .on_thread_unpark(|| {
790    ///         println!("thread unparking");
791    ///     })
792    ///     .build();
793    ///
794    /// runtime.unwrap().block_on(async {
795    ///    tokio::task::yield_now().await;
796    ///    println!("Hello from Tokio!");
797    /// })
798    /// # }
799    /// # }
800    /// ```
801    #[cfg(not(loom))]
802    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
803    where
804        F: Fn() + Send + Sync + 'static,
805    {
806        self.after_unpark = Some(std::sync::Arc::new(f));
807        self
808    }
809
810    /// Executes function `f` just before a task is spawned.
811    ///
812    /// `f` is called within the Tokio context, so functions like
813    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
814    /// invoked immediately.
815    ///
816    /// This can be used for bookkeeping or monitoring purposes.
817    ///
818    /// Note: There can only be one spawn callback for a runtime; calling this function more
819    /// than once replaces the last callback defined, rather than adding to it.
820    ///
821    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
822    ///
823    /// **Note**: This is an [unstable API][unstable]. The public API of this type
824    /// may break in 1.x releases. See [the documentation on unstable
825    /// features][unstable] for details.
826    ///
827    /// [unstable]: crate#unstable-features
828    ///
829    /// # Examples
830    ///
831    /// ```
832    /// # use tokio::runtime;
833    /// # pub fn main() {
834    /// let runtime = runtime::Builder::new_current_thread()
835    ///     .on_task_spawn(|_| {
836    ///         println!("spawning task");
837    ///     })
838    ///     .build()
839    ///     .unwrap();
840    ///
841    /// runtime.block_on(async {
842    ///     tokio::task::spawn(std::future::ready(()));
843    ///
844    ///     for _ in 0..64 {
845    ///         tokio::task::yield_now().await;
846    ///     }
847    /// })
848    /// # }
849    /// ```
850    #[cfg(all(not(loom), tokio_unstable))]
851    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
852    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
853    where
854        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
855    {
856        self.before_spawn = Some(std::sync::Arc::new(f));
857        self
858    }
859
860    /// Executes function `f` just before a task is polled
861    ///
862    /// `f` is called within the Tokio context, so functions like
863    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
864    /// invoked immediately.
865    ///
866    /// **Note**: This is an [unstable API][unstable]. The public API of this type
867    /// may break in 1.x releases. See [the documentation on unstable
868    /// features][unstable] for details.
869    ///
870    /// [unstable]: crate#unstable-features
871    ///
872    /// # Examples
873    ///
874    /// ```
875    /// # #[cfg(not(target_family = "wasm"))]
876    /// # {
877    /// # use std::sync::{atomic::AtomicUsize, Arc};
878    /// # use tokio::task::yield_now;
879    /// # pub fn main() {
880    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
881    /// let poll_start = poll_start_counter.clone();
882    /// let rt = tokio::runtime::Builder::new_multi_thread()
883    ///     .enable_all()
884    ///     .on_before_task_poll(move |meta| {
885    ///         println!("task {} is about to be polled", meta.id())
886    ///     })
887    ///     .build()
888    ///     .unwrap();
889    /// let task = rt.spawn(async {
890    ///     yield_now().await;
891    /// });
892    /// let _ = rt.block_on(task);
893    ///
894    /// # }
895    /// # }
896    /// ```
897    #[cfg(tokio_unstable)]
898    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
899    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
900    where
901        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
902    {
903        self.before_poll = Some(std::sync::Arc::new(f));
904        self
905    }
906
907    /// Executes function `f` just after a task is polled
908    ///
909    /// `f` is called within the Tokio context, so functions like
910    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
911    /// invoked immediately.
912    ///
913    /// **Note**: This is an [unstable API][unstable]. The public API of this type
914    /// may break in 1.x releases. See [the documentation on unstable
915    /// features][unstable] for details.
916    ///
917    /// [unstable]: crate#unstable-features
918    ///
919    /// # Examples
920    ///
921    /// ```
922    /// # #[cfg(not(target_family = "wasm"))]
923    /// # {
924    /// # use std::sync::{atomic::AtomicUsize, Arc};
925    /// # use tokio::task::yield_now;
926    /// # pub fn main() {
927    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
928    /// let poll_stop = poll_stop_counter.clone();
929    /// let rt = tokio::runtime::Builder::new_multi_thread()
930    ///     .enable_all()
931    ///     .on_after_task_poll(move |meta| {
932    ///         println!("task {} completed polling", meta.id());
933    ///     })
934    ///     .build()
935    ///     .unwrap();
936    /// let task = rt.spawn(async {
937    ///     yield_now().await;
938    /// });
939    /// let _ = rt.block_on(task);
940    ///
941    /// # }
942    /// # }
943    /// ```
944    #[cfg(tokio_unstable)]
945    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
946    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
947    where
948        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
949    {
950        self.after_poll = Some(std::sync::Arc::new(f));
951        self
952    }
953
954    /// Executes function `f` just after a task is terminated.
955    ///
956    /// `f` is called within the Tokio context, so functions like
957    /// [`tokio::spawn`](crate::spawn) can be called.
958    ///
959    /// This can be used for bookkeeping or monitoring purposes.
960    ///
961    /// Note: There can only be one task termination callback for a runtime; calling this
962    /// function more than once replaces the last callback defined, rather than adding to it.
963    ///
964    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
965    ///
966    /// **Note**: This is an [unstable API][unstable]. The public API of this type
967    /// may break in 1.x releases. See [the documentation on unstable
968    /// features][unstable] for details.
969    ///
970    /// [unstable]: crate#unstable-features
971    ///
972    /// # Examples
973    ///
974    /// ```
975    /// # use tokio::runtime;
976    /// # pub fn main() {
977    /// let runtime = runtime::Builder::new_current_thread()
978    ///     .on_task_terminate(|_| {
979    ///         println!("killing task");
980    ///     })
981    ///     .build()
982    ///     .unwrap();
983    ///
984    /// runtime.block_on(async {
985    ///     tokio::task::spawn(std::future::ready(()));
986    ///
987    ///     for _ in 0..64 {
988    ///         tokio::task::yield_now().await;
989    ///     }
990    /// })
991    /// # }
992    /// ```
993    #[cfg(all(not(loom), tokio_unstable))]
994    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
995    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
996    where
997        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
998    {
999        self.after_termination = Some(std::sync::Arc::new(f));
1000        self
1001    }
1002
1003    /// Creates the configured `Runtime`.
1004    ///
1005    /// The returned `Runtime` instance is ready to spawn tasks.
1006    ///
1007    /// # Examples
1008    ///
1009    /// ```
1010    /// # #[cfg(not(target_family = "wasm"))]
1011    /// # {
1012    /// use tokio::runtime::Builder;
1013    ///
    /// let rt = Builder::new_multi_thread().build().unwrap();
1015    ///
1016    /// rt.block_on(async {
1017    ///     println!("Hello from the Tokio runtime");
1018    /// });
1019    /// # }
1020    /// ```
    pub fn build(&mut self) -> io::Result<Runtime> {
        // Dispatch on the scheduler flavor selected when the builder was
        // created (`new_current_thread` vs `new_multi_thread`).
        match &self.kind {
            Kind::CurrentThread => self.build_current_thread_runtime(),
            // This variant only exists when the `rt-multi-thread` feature is on.
            #[cfg(feature = "rt-multi-thread")]
            Kind::MultiThread => self.build_threaded_runtime(),
        }
    }
1028
1029    /// Creates the configured [`LocalRuntime`].
1030    ///
1031    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
1032    ///
1033    /// # Panics
1034    ///
1035    /// This will panic if the runtime is configured with [`new_multi_thread()`].
1036    ///
1037    /// [`new_multi_thread()`]: Builder::new_multi_thread
1038    ///
1039    /// # Examples
1040    ///
1041    /// ```
1042    /// use tokio::runtime::{Builder, LocalOptions};
1043    ///
1044    /// let rt = Builder::new_current_thread()
1045    ///     .build_local(LocalOptions::default())
1046    ///     .unwrap();
1047    ///
1048    /// rt.spawn_local(async {
1049    ///     println!("Hello from the Tokio runtime");
1050    /// });
1051    /// ```
    #[allow(unused_variables, unreachable_patterns)]
    pub fn build_local(&mut self, options: LocalOptions) -> &mut Self {
        match &self.kind {
            Kind::CurrentThread => self.build_current_thread_local_runtime(),
            // A `LocalRuntime` is inherently single-threaded, so a
            // multi-thread builder cannot produce one (documented panic above).
            #[cfg(feature = "rt-multi-thread")]
            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
        }
    }
1060
1061    fn get_cfg(&self) -> driver::Cfg {
1062        driver::Cfg {
1063            enable_pause_time: match self.kind {
1064                Kind::CurrentThread => true,
1065                #[cfg(feature = "rt-multi-thread")]
1066                Kind::MultiThread => false,
1067            },
1068            enable_io: self.enable_io,
1069            enable_time: self.enable_time,
1070            start_paused: self.start_paused,
1071            nevents: self.nevents,
1072            timer_flavor: self.timer_flavor,
1073        }
1074    }
1075
1076    /// Sets a custom timeout for a thread in the blocking pool.
1077    ///
1078    /// By default, the timeout for a thread is set to 10 seconds. This can
1079    /// be overridden using `.thread_keep_alive()`.
1080    ///
1081    /// # Example
1082    ///
1083    /// ```
1084    /// # #[cfg(not(target_family = "wasm"))]
1085    /// # {
1086    /// # use tokio::runtime;
1087    /// # use std::time::Duration;
1088    /// # pub fn main() {
1089    /// let rt = runtime::Builder::new_multi_thread()
1090    ///     .thread_keep_alive(Duration::from_millis(100))
1091    ///     .build();
1092    /// # }
1093    /// # }
1094    /// ```
1095    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1096        self.keep_alive = Some(duration);
1097        self
1098    }
1099
1100    /// Sets the number of scheduler ticks after which the scheduler will poll the global
1101    /// task queue.
1102    ///
1103    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1104    ///
1105    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1106    /// [the module documentation] for the default behavior of the multi-thread scheduler.
1107    ///
1108    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1109    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1110    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1111    /// getting started on new work, especially if tasks frequently yield rather than complete
1112    /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1113    /// tasks from the local queue will be executed only if the global queue is empty.
1114    /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1115    /// tasks quickly complete polling.
1116    ///
1117    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1118    ///
1119    /// # Panics
1120    ///
1121    /// This function will panic if 0 is passed as an argument.
1122    ///
1123    /// # Examples
1124    ///
1125    /// ```
1126    /// # #[cfg(not(target_family = "wasm"))]
1127    /// # {
1128    /// # use tokio::runtime;
1129    /// # pub fn main() {
1130    /// let rt = runtime::Builder::new_multi_thread()
1131    ///     .global_queue_interval(31)
1132    ///     .build();
1133    /// # }
1134    /// # }
1135    /// ```
1136    #[track_caller]
1137    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1138        assert!(val > 0, "global_queue_interval must be greater than 0");
1139        self.global_queue_interval = Some(val);
1140        self
1141    }
1142
1143    /// Sets the number of scheduler ticks after which the scheduler will poll for
1144    /// external events (timers, I/O, and so on).
1145    ///
1146    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1147    ///
1148    /// By default, the event interval is `61` for all scheduler types.
1149    ///
1150    /// Setting the event interval determines the effective "priority" of delivering
1151    /// these external events (which may wake up additional tasks), compared to
1152    /// executing tasks that are currently ready to run. A smaller value is useful
1153    /// when tasks frequently spend a long time in polling, or infrequently yield,
1154    /// which can result in overly long delays picking up I/O events. Conversely,
1155    /// picking up new events requires extra synchronization and syscall overhead,
1156    /// so if tasks generally complete their polling quickly, a higher event interval
1157    /// will minimize that overhead while still keeping the scheduler responsive to
1158    /// events.
1159    ///
1160    /// # Panics
1161    ///
1162    /// This function will panic if 0 is passed as an argument.
1163    ///
1164    /// # Examples
1165    ///
1166    /// ```
1167    /// # #[cfg(not(target_family = "wasm"))]
1168    /// # {
1169    /// # use tokio::runtime;
1170    /// # pub fn main() {
1171    /// let rt = runtime::Builder::new_multi_thread()
1172    ///     .event_interval(31)
1173    ///     .build();
1174    /// # }
1175    /// # }
1176    /// ```
1177    #[track_caller]
1178    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1179        assert!(val > 0, "event_interval must be greater than 0");
1180        self.event_interval = val;
1181        self
1182    }
1183
1184    cfg_unstable! {
1185        /// Configure how the runtime responds to an unhandled panic on a
1186        /// spawned task.
1187        ///
1188        /// By default, an unhandled panic (i.e. a panic not caught by
1189        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1190        /// execution. The panic's error value is forwarded to the task's
1191        /// [`JoinHandle`] and all other spawned tasks continue running.
1192        ///
1193        /// The `unhandled_panic` option enables configuring this behavior.
1194        ///
1195        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1196        ///   spawned tasks have no impact on the runtime's execution.
1197        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1198        ///   shutdown immediately when a spawned task panics even if that
1199        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1200        ///   will immediately terminate and further calls to
1201        ///   [`Runtime::block_on`] will panic.
1202        ///
1203        /// # Panics
1204        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1205        /// on a runtime other than the current thread runtime.
1206        ///
1207        /// # Unstable
1208        ///
1209        /// This option is currently unstable and its implementation is
1210        /// incomplete. The API may change or be removed in the future. See
1211        /// issue [tokio-rs/tokio#4516] for more details.
1212        ///
1213        /// # Examples
1214        ///
1215        /// The following demonstrates a runtime configured to shutdown on
1216        /// panic. The first spawned task panics and results in the runtime
1217        /// shutting down. The second spawned task never has a chance to
1218        /// execute. The call to `block_on` will panic due to the runtime being
1219        /// forcibly shutdown.
1220        ///
1221        /// ```should_panic
1222        /// use tokio::runtime::{self, UnhandledPanic};
1223        ///
1224        /// # pub fn main() {
1225        /// let rt = runtime::Builder::new_current_thread()
1226        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1227        ///     .build()
1228        ///     .unwrap();
1229        ///
1230        /// rt.spawn(async { panic!("boom"); });
1231        /// rt.spawn(async {
1232        ///     // This task never completes.
1233        /// });
1234        ///
1235        /// rt.block_on(async {
1236        ///     // Do some work
1237        /// # loop { tokio::task::yield_now().await; }
1238        /// })
1239        /// # }
1240        /// ```
1241        ///
1242        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1243        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1244        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1245            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1246                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1247            }
1248
1249            self.unhandled_panic = behavior;
1250            self
1251        }
1252
1253        /// Disables the LIFO task scheduler heuristic.
1254        ///
1255        /// The multi-threaded scheduler includes a heuristic for optimizing
1256        /// message-passing patterns. This heuristic results in the **last**
1257        /// scheduled task being polled first.
1258        ///
1259        /// To implement this heuristic, each worker thread has a slot which
1260        /// holds the task that should be polled next. In earlier versions of
1261        /// Tokio, this slot could not be stolen by other worker threads, which
1262        /// can result in lower total throughput when tasks tend to have longer
1263        /// poll times.
1264        ///
1265        /// This configuration option will disable this heuristic resulting in
1266        /// all scheduled tasks being pushed into the worker-local queue. This
1267        /// was intended as a workaround for the LIFO slot not being stealable.
1268        /// As of Tokio 1.51, tasks can be stolen from the LIFO slot. In a
1269        /// future version, this option may be deprecated.
1270        ///
1271        /// # Unstable
1272        ///
1273        /// This configuration option was considered a workaround for the LIFO
1274        /// slot not being stealable. Since this is no longer the case, we will
1275        /// revisit whether or not this option is necessary. See
1276        /// issue [tokio-rs/tokio#4941].
1277        ///
1278        /// # Examples
1279        ///
1280        /// ```
1281        /// # #[cfg(not(target_family = "wasm"))]
1282        /// # {
1283        /// use tokio::runtime;
1284        ///
1285        /// let rt = runtime::Builder::new_multi_thread()
1286        ///     .disable_lifo_slot()
1287        ///     .build()
1288        ///     .unwrap();
1289        /// # }
1290        /// ```
1291        ///
1292        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1293        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
            // Plain flag; it is forwarded into `Config` when the runtime is built.
            self.disable_lifo_slot = true;
            self
        }
1298
1299        /// Specifies the random number generation seed to use within all
1300        /// threads associated with the runtime being built.
1301        ///
1302        /// This option is intended to make certain parts of the runtime
1303        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1304        /// [`tokio::select!`] it will ensure that the order that branches are
1305        /// polled is deterministic.
1306        ///
1307        /// In addition to the code specifying `rng_seed` and interacting with
1308        /// the runtime, the internals of Tokio and the Rust compiler may affect
1309        /// the sequences of random numbers. In order to ensure repeatable
1310        /// results, the version of Tokio, the versions of all other
1311        /// dependencies that interact with Tokio, and the Rust compiler version
1312        /// should also all remain constant.
1313        ///
1314        /// # Examples
1315        ///
1316        /// ```
1317        /// # use tokio::runtime::{self, RngSeed};
1318        /// # pub fn main() {
1319        /// let seed = RngSeed::from_bytes(b"place your seed here");
1320        /// let rt = runtime::Builder::new_current_thread()
1321        ///     .rng_seed(seed)
1322        ///     .build();
1323        /// # }
1324        /// ```
1325        ///
1326        /// [`tokio::select!`]: crate::select
1327        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1328            self.seed_generator = RngSeedGenerator::new(seed);
1329            self
1330        }
1331    }
1332
1333    cfg_unstable_metrics! {
1334        /// Enables tracking the distribution of task poll times.
1335        ///
1336        /// Task poll times are not instrumented by default as doing so requires
1337        /// calling [`Instant::now()`] twice per task poll, which could add
1338        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1339        /// metrics data.
1340        ///
1341        /// The histogram uses fixed bucket sizes. In other words, the histogram
1342        /// buckets are not dynamic based on input values. Use the
1343        /// `metrics_poll_time_histogram` builder methods to configure the
1344        /// histogram details.
1345        ///
1346        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1347        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1348        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1349        /// to select [`LogHistogram`] instead.
1350        ///
1351        /// # Examples
1352        ///
1353        /// ```
1354        /// # #[cfg(not(target_family = "wasm"))]
1355        /// # {
1356        /// use tokio::runtime;
1357        ///
1358        /// let rt = runtime::Builder::new_multi_thread()
1359        ///     .enable_metrics_poll_time_histogram()
1360        ///     .build()
1361        ///     .unwrap();
1362        /// # // Test default values here
1363        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1364        /// # let m = rt.handle().metrics();
1365        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1366        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1367        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1368        /// # }
1369        /// ```
1370        ///
1371        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1372        /// [`Instant::now()`]: std::time::Instant::now
1373        /// [`LogHistogram`]: crate::runtime::LogHistogram
1374        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
            // Flag consulted by `metrics_poll_count_histogram_builder()` at
            // build time; the histogram shape is configured separately.
            self.metrics_poll_count_histogram_enable = true;
            self
        }
1379
1380        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1381        ///
1382        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
        #[doc(hidden)]
        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
            // Deprecated alias: simply forwards to the renamed method.
            self.enable_metrics_poll_time_histogram()
        }
1388
1389        /// Sets the histogram scale for tracking the distribution of task poll
1390        /// times.
1391        ///
1392        /// Tracking the distribution of task poll times can be done using a
1393        /// linear or log scale. When using linear scale, each histogram bucket
1394        /// will represent the same range of poll times. When using log scale,
1395        /// each histogram bucket will cover a range twice as big as the
1396        /// previous bucket.
1397        ///
1398        /// **Default:** linear scale.
1399        ///
1400        /// # Examples
1401        ///
1402        /// ```
1403        /// # #[cfg(not(target_family = "wasm"))]
1404        /// # {
1405        /// use tokio::runtime::{self, HistogramScale};
1406        ///
1407        /// # #[allow(deprecated)]
1408        /// let rt = runtime::Builder::new_multi_thread()
1409        ///     .enable_metrics_poll_time_histogram()
1410        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1411        ///     .build()
1412        ///     .unwrap();
1413        /// # }
1414        /// ```
1415        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1416        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1417            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1418            self
1419        }
1420
1421        /// Configure the histogram for tracking poll times
1422        ///
1423        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1424        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1425        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1426        ///
1427        /// # Examples
1428        /// Configure a [`LogHistogram`] with [default configuration]:
1429        /// ```
1430        /// # #[cfg(not(target_family = "wasm"))]
1431        /// # {
1432        /// use tokio::runtime;
1433        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1434        ///
1435        /// let rt = runtime::Builder::new_multi_thread()
1436        ///     .enable_metrics_poll_time_histogram()
1437        ///     .metrics_poll_time_histogram_configuration(
1438        ///         HistogramConfiguration::log(LogHistogram::default())
1439        ///     )
1440        ///     .build()
1441        ///     .unwrap();
1442        /// # }
1443        /// ```
1444        ///
1445        /// Configure a linear histogram with 100 buckets, each 10μs wide
1446        /// ```
1447        /// # #[cfg(not(target_family = "wasm"))]
1448        /// # {
1449        /// use tokio::runtime;
1450        /// use std::time::Duration;
1451        /// use tokio::runtime::HistogramConfiguration;
1452        ///
1453        /// let rt = runtime::Builder::new_multi_thread()
1454        ///     .enable_metrics_poll_time_histogram()
1455        ///     .metrics_poll_time_histogram_configuration(
1456        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1457        ///     )
1458        ///     .build()
1459        ///     .unwrap();
1460        /// # }
1461        /// ```
1462        ///
1463        /// Configure a [`LogHistogram`] with the following settings:
1464        /// - Measure times from 100ns to 120s
1465        /// - Max error of 0.1
1466        /// - No more than 1024 buckets
1467        /// ```
1468        /// # #[cfg(not(target_family = "wasm"))]
1469        /// # {
1470        /// use std::time::Duration;
1471        /// use tokio::runtime;
1472        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1473        ///
1474        /// let rt = runtime::Builder::new_multi_thread()
1475        ///     .enable_metrics_poll_time_histogram()
1476        ///     .metrics_poll_time_histogram_configuration(
1477        ///         HistogramConfiguration::log(LogHistogram::builder()
1478        ///             .max_value(Duration::from_secs(120))
1479        ///             .min_value(Duration::from_nanos(100))
1480        ///             .max_error(0.1)
1481        ///             .max_buckets(1024)
1482        ///             .expect("configuration uses 488 buckets")
1483        ///         )
1484        ///     )
1485        ///     .build()
1486        ///     .unwrap();
1487        /// # }
1488        /// ```
1489        ///
1490        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1491        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1492        /// where each bucket is twice the size of the previous bucket.
1493        /// ```rust
1494        /// use std::time::Duration;
1495        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1496        /// let rt = tokio::runtime::Builder::new_current_thread()
1497        ///     .enable_all()
1498        ///     .enable_metrics_poll_time_histogram()
1499        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1500        ///         LogHistogram::builder()
1501        ///             .min_value(Duration::from_micros(20))
1502        ///             .max_value(Duration::from_millis(4))
1503        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1504        ///             .precision_exact(0)
1505        ///             .max_buckets(10)
1506        ///             .unwrap(),
1507        ///     ))
1508        ///     .build()
1509        ///     .unwrap();
1510        /// ```
1511        ///
1512        /// [`LogHistogram`]: crate::runtime::LogHistogram
1513        /// [default configuration]: crate::runtime::LogHistogramBuilder
1514        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1515        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1516            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1517            self
1518        }
1519
1520        /// Sets the histogram resolution for tracking the distribution of task
1521        /// poll times.
1522        ///
1523        /// The resolution is the histogram's first bucket's range. When using a
1524        /// linear histogram scale, each bucket will cover the same range. When
1525        /// using a log scale, each bucket will cover a range twice as big as
1526        /// the previous bucket. In the log case, the resolution represents the
1527        /// smallest bucket range.
1528        ///
1529        /// Note that, when using log scale, the resolution is rounded up to the
1530        /// nearest power of 2 in nanoseconds.
1531        ///
1532        /// **Default:** 100 microseconds.
1533        ///
1534        /// # Examples
1535        ///
1536        /// ```
1537        /// # #[cfg(not(target_family = "wasm"))]
1538        /// # {
1539        /// use tokio::runtime;
1540        /// use std::time::Duration;
1541        ///
1542        /// # #[allow(deprecated)]
1543        /// let rt = runtime::Builder::new_multi_thread()
1544        ///     .enable_metrics_poll_time_histogram()
1545        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1546        ///     .build()
1547        ///     .unwrap();
1548        /// # }
1549        /// ```
1550        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1551        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1552            assert!(resolution > Duration::from_secs(0));
1553            // Sanity check the argument and also make the cast below safe.
1554            assert!(resolution <= Duration::from_secs(1));
1555
1556            let resolution = resolution.as_nanos() as u64;
1557
1558            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1559            self
1560        }
1561
1562        /// Sets the number of buckets for the histogram tracking the
1563        /// distribution of task poll times.
1564        ///
1565        /// The last bucket tracks all greater values that fall out of other
1566        /// ranges. So, configuring the histogram using a linear scale,
1567        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1568        /// polls that take more than 450ms to complete.
1569        ///
1570        /// **Default:** 10
1571        ///
1572        /// # Examples
1573        ///
1574        /// ```
1575        /// # #[cfg(not(target_family = "wasm"))]
1576        /// # {
1577        /// use tokio::runtime;
1578        ///
1579        /// # #[allow(deprecated)]
1580        /// let rt = runtime::Builder::new_multi_thread()
1581        ///     .enable_metrics_poll_time_histogram()
1582        ///     .metrics_poll_count_histogram_buckets(15)
1583        ///     .build()
1584        ///     .unwrap();
1585        /// # }
1586        /// ```
1587        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1588        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1589            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1590            self
1591        }
1592    }
1593
1594    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1595        use crate::runtime::runtime::Scheduler;
1596
1597        let (scheduler, handle, blocking_pool) =
1598            self.build_current_thread_runtime_components(None)?;
1599
1600        Ok(Runtime::from_parts(
1601            Scheduler::CurrentThread(scheduler),
1602            handle,
1603            blocking_pool,
1604        ))
1605    }
1606
1607    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1608        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1609
1610        let tid = std::thread::current().id();
1611
1612        let (scheduler, handle, blocking_pool) =
1613            self.build_current_thread_runtime_components(Some(tid))?;
1614
1615        Ok(LocalRuntime::from_parts(
1616            LocalRuntimeScheduler::CurrentThread(scheduler),
1617            handle,
1618            blocking_pool,
1619        ))
1620    }
1621
    /// Shared construction path for the current-thread scheduler, used by both
    /// `Runtime` and `LocalRuntime` builds.
    ///
    /// `local_tid` is `Some` when building a `LocalRuntime` pinned to a
    /// specific thread, and `None` for an ordinary current-thread `Runtime`.
    fn build_current_thread_runtime_components(
        &mut self,
        local_tid: Option<ThreadId>,
    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
        use crate::runtime::scheduler;
        use crate::runtime::Config;

        // The current-thread scheduler always uses the traditional timer
        // flavor, overriding whatever the builder was configured with.
        let mut cfg = self.get_cfg();
        cfg.timer_flavor = TimerFlavor::Traditional;
        let (driver, driver_handle) = driver::Driver::new(cfg)?;

        // Blocking pool
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        // Generate a rng seed for this runtime.
        let seed_generator_1 = self.seed_generator.next_generator();
        let seed_generator_2 = self.seed_generator.next_generator();

        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to do something, it'll let the timer or
        // the reactor to generate some new stimuli for the futures to continue
        // in their life.
        let (scheduler, handle) = CurrentThread::new(
            driver,
            driver_handle,
            blocking_spawner,
            seed_generator_2,
            Config {
                before_park: self.before_park.clone(),
                after_unpark: self.after_unpark.clone(),
                before_spawn: self.before_spawn.clone(),
                #[cfg(tokio_unstable)]
                before_poll: self.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll: self.after_poll.clone(),
                after_termination: self.after_termination.clone(),
                global_queue_interval: self.global_queue_interval,
                event_interval: self.event_interval,
                #[cfg(tokio_unstable)]
                unhandled_panic: self.unhandled_panic.clone(),
                disable_lifo_slot: self.disable_lifo_slot,
                seed_generator: seed_generator_1,
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
            },
            local_tid,
            self.name.clone(),
        );

        // Wrap the scheduler handle in the public `Handle` type.
        let handle = Handle {
            inner: scheduler::Handle::CurrentThread(handle),
        };

        Ok((scheduler, handle, blocking_pool))
    }
1677
1678    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1679        if self.metrics_poll_count_histogram_enable {
1680            Some(self.metrics_poll_count_histogram.clone())
1681        } else {
1682            None
1683        }
1684    }
1685}
1686
1687cfg_io_driver! {
1688    impl Builder {
1689        /// Enables the I/O driver.
1690        ///
1691        /// Doing this enables using net, process, signal, and some I/O types on
1692        /// the runtime.
1693        ///
1694        /// # Examples
1695        ///
1696        /// ```
1697        /// use tokio::runtime;
1698        ///
1699        /// let rt = runtime::Builder::new_multi_thread()
1700        ///     .enable_io()
1701        ///     .build()
1702        ///     .unwrap();
1703        /// ```
1704        pub fn enable_io(&mut self) -> &mut Self {
1705            self.enable_io = true;
1706            self
1707        }
1708
1709        /// Enables the I/O driver and configures the max number of events to be
1710        /// processed per tick.
1711        ///
1712        /// # Examples
1713        ///
1714        /// ```
1715        /// use tokio::runtime;
1716        ///
1717        /// let rt = runtime::Builder::new_current_thread()
1718        ///     .enable_io()
1719        ///     .max_io_events_per_tick(1024)
1720        ///     .build()
1721        ///     .unwrap();
1722        /// ```
1723        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1724            self.nevents = capacity;
1725            self
1726        }
1727    }
1728}
1729
1730cfg_time! {
1731    impl Builder {
1732        /// Enables the time driver.
1733        ///
1734        /// Doing this enables using `tokio::time` on the runtime.
1735        ///
1736        /// # Examples
1737        ///
1738        /// ```
1739        /// # #[cfg(not(target_family = "wasm"))]
1740        /// # {
1741        /// use tokio::runtime;
1742        ///
1743        /// let rt = runtime::Builder::new_multi_thread()
1744        ///     .enable_time()
1745        ///     .build()
1746        ///     .unwrap();
1747        /// # }
1748        /// ```
1749        pub fn enable_time(&mut self) -> &mut Self {
1750            self.enable_time = true;
1751            self
1752        }
1753    }
1754}
1755
1756cfg_io_uring! {
1757    impl Builder {
1758        /// Enables the tokio's io_uring driver.
1759        ///
1760        /// Doing this enables using io_uring operations on the runtime.
1761        ///
1762        /// # Examples
1763        ///
1764        /// ```
1765        /// use tokio::runtime;
1766        ///
1767        /// let rt = runtime::Builder::new_multi_thread()
1768        ///     .enable_io_uring()
1769        ///     .build()
1770        ///     .unwrap();
1771        /// ```
1772        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1773        pub fn enable_io_uring(&mut self) -> &mut Self {
1774            // Currently, the uring flag is equivalent to `enable_io`.
1775            self.enable_io = true;
1776            self
1777        }
1778    }
1779}
1780
1781cfg_test_util! {
1782    impl Builder {
1783        /// Controls if the runtime's clock starts paused or advancing.
1784        ///
1785        /// Pausing time requires the current-thread runtime; construction of
1786        /// the runtime will panic otherwise.
1787        ///
1788        /// # Examples
1789        ///
1790        /// ```
1791        /// use tokio::runtime;
1792        ///
1793        /// let rt = runtime::Builder::new_current_thread()
1794        ///     .enable_time()
1795        ///     .start_paused(true)
1796        ///     .build()
1797        ///     .unwrap();
1798        /// ```
1799        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1800            self.start_paused = start_paused;
1801            self
1802        }
1803    }
1804}
1805
1806cfg_rt_multi_thread! {
1807    impl Builder {
1808        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1809            use crate::loom::sys::num_cpus;
1810            use crate::runtime::{Config, runtime::Scheduler};
1811            use crate::runtime::scheduler::{self, MultiThread};
1812
1813            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1814
1815            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1816
1817            // Create the blocking pool
1818            let blocking_pool =
1819                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1820            let blocking_spawner = blocking_pool.spawner().clone();
1821
1822            // Generate a rng seed for this runtime.
1823            let seed_generator_1 = self.seed_generator.next_generator();
1824            let seed_generator_2 = self.seed_generator.next_generator();
1825
1826            let (scheduler, handle, launch) = MultiThread::new(
1827                worker_threads,
1828                driver,
1829                driver_handle,
1830                blocking_spawner,
1831                seed_generator_2,
1832                Config {
1833                    before_park: self.before_park.clone(),
1834                    after_unpark: self.after_unpark.clone(),
1835                    before_spawn: self.before_spawn.clone(),
1836                    #[cfg(tokio_unstable)]
1837                    before_poll: self.before_poll.clone(),
1838                    #[cfg(tokio_unstable)]
1839                    after_poll: self.after_poll.clone(),
1840                    after_termination: self.after_termination.clone(),
1841                    global_queue_interval: self.global_queue_interval,
1842                    event_interval: self.event_interval,
1843                    #[cfg(tokio_unstable)]
1844                    unhandled_panic: self.unhandled_panic.clone(),
1845                    disable_lifo_slot: self.disable_lifo_slot,
1846                    seed_generator: seed_generator_1,
1847                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1848                },
1849                self.timer_flavor,
1850                self.name.clone(),
1851            );
1852
1853            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1854
1855            // Spawn the thread pool workers
1856            let _enter = handle.enter();
1857            launch.launch();
1858
1859            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1860        }
1861    }
1862}
1863
1864impl fmt::Debug for Builder {
1865    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1866        let mut debug = fmt.debug_struct("Builder");
1867
1868        if let Some(name) = &self.name {
1869            debug.field("name", name);
1870        }
1871
1872        debug
1873            .field("worker_threads", &self.worker_threads)
1874            .field("max_blocking_threads", &self.max_blocking_threads)
1875            .field(
1876                "thread_name",
1877                &"<dyn Fn() -> String + Send + Sync + 'static>",
1878            )
1879            .field("thread_stack_size", &self.thread_stack_size)
1880            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1881            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1882            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1883            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."));
1884
1885        if self.name.is_none() {
1886            debug.finish_non_exhaustive()
1887        } else {
1888            debug.finish()
1889        }
1890    }
1891}