tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5 blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, TaskMeta};
9
10use crate::runtime::{LocalOptions, LocalRuntime};
11use crate::util::rand::{RngSeed, RngSeedGenerator};
12
13use crate::runtime::blocking::BlockingPool;
14use crate::runtime::scheduler::CurrentThread;
15use std::fmt;
16use std::io;
17use std::thread::ThreadId;
18use std::time::Duration;
19
20/// Builds Tokio Runtime with custom configuration values.
21///
22/// Methods can be chained in order to set the configuration values. The
23/// Runtime is constructed by calling [`build`].
24///
25/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
26/// or [`Builder::new_current_thread`].
27///
28/// See function level documentation for details on the various configuration
29/// settings.
30///
31/// [`build`]: method@Self::build
32/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
33/// [`Builder::new_current_thread`]: method@Self::new_current_thread
34///
35/// # Examples
36///
37/// ```
38/// # #[cfg(not(target_family = "wasm"))]
39/// # {
40/// use tokio::runtime::Builder;
41///
42/// fn main() {
43/// // build runtime
44/// let runtime = Builder::new_multi_thread()
45/// .worker_threads(4)
46/// .thread_name("my-custom-name")
47/// .thread_stack_size(3 * 1024 * 1024)
48/// .build()
49/// .unwrap();
50///
51/// // use runtime ...
52/// }
53/// # }
54/// ```
55pub struct Builder {
56 /// Runtime type
57 kind: Kind,
58
59 /// Name of the runtime.
60 name: Option<String>,
61
62 /// Whether or not to enable the I/O driver
63 enable_io: bool,
64 nevents: usize,
65
66 /// Whether or not to enable the time driver
67 enable_time: bool,
68
69 /// Whether or not the clock should start paused.
70 start_paused: bool,
71
72 /// The number of worker threads, used by Runtime.
73 ///
74 /// Only used when not using the current-thread executor.
75 worker_threads: Option<usize>,
76
77 /// Cap on thread usage.
78 max_blocking_threads: usize,
79
80 /// Name fn used for threads spawned by the runtime.
81 pub(super) thread_name: ThreadNameFn,
82
83 /// Stack size used for threads spawned by the runtime.
84 pub(super) thread_stack_size: Option<usize>,
85
86 /// Callback to run after each thread starts.
87 pub(super) after_start: Option<Callback>,
88
89 /// To run before each worker thread stops
90 pub(super) before_stop: Option<Callback>,
91
92 /// To run before each worker thread is parked.
93 pub(super) before_park: Option<Callback>,
94
95 /// To run after each thread is unparked.
96 pub(super) after_unpark: Option<Callback>,
97
98 /// To run before each task is spawned.
99 pub(super) before_spawn: Option<TaskCallback>,
100
101 /// To run before each poll
102 #[cfg(tokio_unstable)]
103 pub(super) before_poll: Option<TaskCallback>,
104
105 /// To run after each poll
106 #[cfg(tokio_unstable)]
107 pub(super) after_poll: Option<TaskCallback>,
108
109 /// To run after each task is terminated.
110 pub(super) after_termination: Option<TaskCallback>,
111
112 /// Customizable keep alive timeout for `BlockingPool`
113 pub(super) keep_alive: Option<Duration>,
114
115 /// How many ticks before pulling a task from the global/remote queue?
116 ///
117 /// When `None`, the value is unspecified and behavior details are left to
118 /// the scheduler. Each scheduler flavor could choose to either pick its own
119 /// default value or use some other strategy to decide when to poll from the
120 /// global queue. For example, the multi-threaded scheduler uses a
121 /// self-tuning strategy based on mean task poll times.
122 pub(super) global_queue_interval: Option<u32>,
123
124 /// How many ticks before yielding to the driver for timer and I/O events?
125 pub(super) event_interval: u32,
126
127 /// When true, the multi-threade scheduler LIFO slot should not be used.
128 ///
129 /// This option should only be exposed as unstable.
130 pub(super) disable_lifo_slot: bool,
131
132 /// Specify a random number generator seed to provide deterministic results
133 pub(super) seed_generator: RngSeedGenerator,
134
135 /// When true, enables task poll count histogram instrumentation.
136 pub(super) metrics_poll_count_histogram_enable: bool,
137
138 /// Configures the task poll count histogram
139 pub(super) metrics_poll_count_histogram: HistogramBuilder,
140
141 #[cfg(tokio_unstable)]
142 pub(super) unhandled_panic: UnhandledPanic,
143
144 timer_flavor: TimerFlavor,
145}
146
147cfg_unstable! {
148 /// How the runtime should respond to unhandled panics.
149 ///
150 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
151 /// to configure the runtime behavior when a spawned task panics.
152 ///
153 /// See [`Builder::unhandled_panic`] for more details.
154 #[derive(Debug, Clone)]
155 #[non_exhaustive]
156 pub enum UnhandledPanic {
157 /// The runtime should ignore panics on spawned tasks.
158 ///
159 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
160 /// tasks continue running normally.
161 ///
162 /// This is the default behavior.
163 ///
164 /// # Examples
165 ///
166 /// ```
167 /// # #[cfg(not(target_family = "wasm"))]
168 /// # {
169 /// use tokio::runtime::{self, UnhandledPanic};
170 ///
171 /// # pub fn main() {
172 /// let rt = runtime::Builder::new_current_thread()
173 /// .unhandled_panic(UnhandledPanic::Ignore)
174 /// .build()
175 /// .unwrap();
176 ///
177 /// let task1 = rt.spawn(async { panic!("boom"); });
178 /// let task2 = rt.spawn(async {
179 /// // This task completes normally
180 /// "done"
181 /// });
182 ///
183 /// rt.block_on(async {
184 /// // The panic on the first task is forwarded to the `JoinHandle`
185 /// assert!(task1.await.is_err());
186 ///
187 /// // The second task completes normally
188 /// assert!(task2.await.is_ok());
189 /// })
190 /// # }
191 /// # }
192 /// ```
193 ///
194 /// [`JoinHandle`]: struct@crate::task::JoinHandle
195 Ignore,
196
197 /// The runtime should immediately shutdown if a spawned task panics.
198 ///
199 /// The runtime will immediately shutdown even if the panicked task's
200 /// [`JoinHandle`] is still available. All further spawned tasks will be
201 /// immediately dropped and call to [`Runtime::block_on`] will panic.
202 ///
203 /// # Examples
204 ///
205 /// ```should_panic
206 /// use tokio::runtime::{self, UnhandledPanic};
207 ///
208 /// # pub fn main() {
209 /// let rt = runtime::Builder::new_current_thread()
210 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
211 /// .build()
212 /// .unwrap();
213 ///
214 /// rt.spawn(async { panic!("boom"); });
215 /// rt.spawn(async {
216 /// // This task never completes.
217 /// });
218 ///
219 /// rt.block_on(async {
220 /// // Do some work
221 /// # loop { tokio::task::yield_now().await; }
222 /// })
223 /// # }
224 /// ```
225 ///
226 /// [`JoinHandle`]: struct@crate::task::JoinHandle
227 ShutdownRuntime,
228 }
229}
230
/// Shared, thread-safe closure that produces a thread name each time it is
/// called. The trait object is `'static` by default inside an `Arc`.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Sync + Send>;
232
/// Scheduler flavor selected when a `Builder` is created.
///
/// `Debug`, `PartialEq` and `Eq` are derived in addition to `Clone`/`Copy` so
/// the flavor can be logged and compared directly; the enum is fieldless, so
/// these derives cost nothing and do not affect existing `match` sites.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Kind {
    /// Scheduler that runs on the current thread.
    CurrentThread,
    /// Multi-threaded scheduler; only present with the `rt-multi-thread` feature.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
239
240impl Builder {
241 /// Returns a new builder with the current thread scheduler selected.
242 ///
243 /// Configuration methods can be chained on the return value.
244 ///
245 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
246 /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`].
247 ///
248 /// [`LocalSet`]: crate::task::LocalSet
249 /// [`LocalRuntime`]: crate::runtime::LocalRuntime
250 /// [`build_local`]: crate::runtime::Builder::build_local
251 pub fn new_current_thread() -> Builder {
252 #[cfg(loom)]
253 const EVENT_INTERVAL: u32 = 4;
254 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
255 #[cfg(not(loom))]
256 const EVENT_INTERVAL: u32 = 61;
257
258 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
259 }
260
261 /// Returns a new builder with the multi thread scheduler selected.
262 ///
263 /// Configuration methods can be chained on the return value.
264 #[cfg(feature = "rt-multi-thread")]
265 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
266 pub fn new_multi_thread() -> Builder {
267 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
268 Builder::new(Kind::MultiThread, 61)
269 }
270
271 /// Returns a new runtime builder initialized with default configuration
272 /// values.
273 ///
274 /// Configuration methods can be chained on the return value.
275 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
276 Builder {
277 kind,
278
279 // Default runtime name
280 name: None,
281
282 // I/O defaults to "off"
283 enable_io: false,
284 nevents: 1024,
285
286 // Time defaults to "off"
287 enable_time: false,
288
289 // The clock starts not-paused
290 start_paused: false,
291
292 // Read from environment variable first in multi-threaded mode.
293 // Default to lazy auto-detection (one thread per CPU core)
294 worker_threads: None,
295
296 max_blocking_threads: 512,
297
298 // Default thread name
299 thread_name: std::sync::Arc::new(|| "tokio-rt-worker".into()),
300
301 // Do not set a stack size by default
302 thread_stack_size: None,
303
304 // No worker thread callbacks
305 after_start: None,
306 before_stop: None,
307 before_park: None,
308 after_unpark: None,
309
310 before_spawn: None,
311 after_termination: None,
312
313 #[cfg(tokio_unstable)]
314 before_poll: None,
315 #[cfg(tokio_unstable)]
316 after_poll: None,
317
318 keep_alive: None,
319
320 // Defaults for these values depend on the scheduler kind, so we get them
321 // as parameters.
322 global_queue_interval: None,
323 event_interval,
324
325 seed_generator: RngSeedGenerator::new(RngSeed::new()),
326
327 #[cfg(tokio_unstable)]
328 unhandled_panic: UnhandledPanic::Ignore,
329
330 metrics_poll_count_histogram_enable: false,
331
332 metrics_poll_count_histogram: HistogramBuilder::default(),
333
334 disable_lifo_slot: false,
335
336 timer_flavor: TimerFlavor::Traditional,
337 }
338 }
339
340 /// Enables both I/O and time drivers.
341 ///
342 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
343 /// individually. If additional components are added to Tokio in the future,
344 /// `enable_all` will include these future components.
345 ///
346 /// # Examples
347 ///
348 /// ```
349 /// # #[cfg(not(target_family = "wasm"))]
350 /// # {
351 /// use tokio::runtime;
352 ///
353 /// let rt = runtime::Builder::new_multi_thread()
354 /// .enable_all()
355 /// .build()
356 /// .unwrap();
357 /// # }
358 /// ```
359 pub fn enable_all(&mut self) -> &mut Self {
360 #[cfg(any(
361 feature = "net",
362 all(unix, feature = "process"),
363 all(unix, feature = "signal")
364 ))]
365 self.enable_io();
366
367 #[cfg(all(
368 tokio_unstable,
369 feature = "io-uring",
370 feature = "rt",
371 feature = "fs",
372 target_os = "linux",
373 ))]
374 self.enable_io_uring();
375
376 #[cfg(feature = "time")]
377 self.enable_time();
378
379 self
380 }
381
382 /// Enables the alternative timer implementation, which is disabled by default.
383 ///
384 /// The alternative timer implementation is an unstable feature that may
385 /// provide better performance on multi-threaded runtimes with a large number
386 /// of worker threads.
387 ///
388 /// This option only applies to multi-threaded runtimes. Attempting to use
389 /// this option with any other runtime type will have no effect.
390 ///
391 /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
392 ///
393 /// # Examples
394 ///
395 /// ```
396 /// # #[cfg(not(target_family = "wasm"))]
397 /// # {
398 /// use tokio::runtime;
399 ///
400 /// let rt = runtime::Builder::new_multi_thread()
401 /// .enable_alt_timer()
402 /// .build()
403 /// .unwrap();
404 /// # }
405 /// ```
406 #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
407 #[cfg_attr(
408 docsrs,
409 doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
410 )]
411 pub fn enable_alt_timer(&mut self) -> &mut Self {
412 self.enable_time();
413 self.timer_flavor = TimerFlavor::Alternative;
414 self
415 }
416
417 /// Sets the number of worker threads the `Runtime` will use.
418 ///
419 /// This can be any number above 0 though it is advised to keep this value
420 /// on the smaller side.
421 ///
422 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
423 ///
424 /// # Default
425 ///
426 /// The default value is the number of cores available to the system.
427 ///
428 /// When using the `current_thread` runtime this method has no effect.
429 ///
430 /// # Examples
431 ///
432 /// ## Multi threaded runtime with 4 threads
433 ///
434 /// ```
435 /// # #[cfg(not(target_family = "wasm"))]
436 /// # {
437 /// use tokio::runtime;
438 ///
439 /// // This will spawn a work-stealing runtime with 4 worker threads.
440 /// let rt = runtime::Builder::new_multi_thread()
441 /// .worker_threads(4)
442 /// .build()
443 /// .unwrap();
444 ///
445 /// rt.spawn(async move {});
446 /// # }
447 /// ```
448 ///
449 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
450 ///
451 /// ```
452 /// use tokio::runtime;
453 ///
454 /// // Create a runtime that _must_ be driven from a call
455 /// // to `Runtime::block_on`.
456 /// let rt = runtime::Builder::new_current_thread()
457 /// .build()
458 /// .unwrap();
459 ///
460 /// // This will run the runtime and future on the current thread
461 /// rt.block_on(async move {});
462 /// ```
463 ///
464 /// # Panics
465 ///
466 /// This will panic if `val` is not larger than `0`.
467 #[track_caller]
468 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
469 assert!(val > 0, "Worker threads cannot be set to 0");
470 self.worker_threads = Some(val);
471 self
472 }
473
474 /// Specifies the limit for additional threads spawned by the Runtime.
475 ///
476 /// These threads are used for blocking operations like tasks spawned
477 /// through [`spawn_blocking`], this includes but is not limited to:
478 /// - [`fs`] operations
479 /// - dns resolution through [`ToSocketAddrs`]
480 /// - writing to [`Stdout`] or [`Stderr`]
481 /// - reading from [`Stdin`]
482 ///
483 /// Unlike the [`worker_threads`], they are not always active and will exit
484 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
485 ///
486 /// It's recommended to not set this limit too low in order to avoid hanging on operations
487 /// requiring [`spawn_blocking`].
488 ///
489 /// The default value is 512.
490 ///
491 /// # Queue Behavior
492 ///
493 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
494 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
495 /// method has not been reached, a new thread will be spawned. If no idle thread is available
496 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
497 /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
498 /// it could potentially grow unbounded.
499 ///
500 /// # Panics
501 ///
502 /// This will panic if `val` is not larger than `0`.
503 ///
504 /// # Upgrading from 0.x
505 ///
506 /// In old versions `max_threads` limited both blocking and worker threads, but the
507 /// current `max_blocking_threads` does not include async worker threads in the count.
508 ///
509 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
510 /// [`fs`]: mod@crate::fs
511 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
512 /// [`Stdout`]: struct@crate::io::Stdout
513 /// [`Stdin`]: struct@crate::io::Stdin
514 /// [`Stderr`]: struct@crate::io::Stderr
515 /// [`worker_threads`]: Self::worker_threads
516 /// [`thread_keep_alive`]: Self::thread_keep_alive
517 #[track_caller]
518 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
519 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
520 assert!(val > 0, "Max blocking threads cannot be set to 0");
521 self.max_blocking_threads = val;
522 self
523 }
524
525 /// Sets name of threads spawned by the `Runtime`'s thread pool.
526 ///
527 /// The default name is "tokio-rt-worker".
528 ///
529 /// # Examples
530 ///
531 /// ```
532 /// # #[cfg(not(target_family = "wasm"))]
533 /// # {
534 /// # use tokio::runtime;
535 ///
536 /// # pub fn main() {
537 /// let rt = runtime::Builder::new_multi_thread()
538 /// .thread_name("my-pool")
539 /// .build();
540 /// # }
541 /// # }
542 /// ```
543 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
544 let val = val.into();
545 self.thread_name = std::sync::Arc::new(move || val.clone());
546 self
547 }
548
549 /// Sets the name of the runtime.
550 ///
551 /// # Examples
552 ///
553 /// ```
554 /// # #[cfg(not(target_family = "wasm"))]
555 /// # {
556 /// # use tokio::runtime;
557 ///
558 /// # pub fn main() {
559 /// let rt = runtime::Builder::new_multi_thread()
560 /// .name("my-runtime")
561 /// .build();
562 /// # }
563 /// # }
564 /// ```
    ///
    /// # Panics
    ///
    /// This function will panic if the name passed as an argument is empty or
    /// consists only of whitespace.
569 #[track_caller]
570 pub fn name(&mut self, val: impl Into<String>) -> &mut Self {
571 let val = val.into();
572 assert!(!val.trim().is_empty(), "runtime name shouldn't be empty");
573 self.name = Some(val);
574 self
575 }
576
577 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
578 ///
579 /// The default name fn is `|| "tokio-rt-worker".into()`.
580 ///
581 /// # Examples
582 ///
583 /// ```
584 /// # #[cfg(not(target_family = "wasm"))]
585 /// # {
586 /// # use tokio::runtime;
587 /// # use std::sync::atomic::{AtomicUsize, Ordering};
588 /// # pub fn main() {
589 /// let rt = runtime::Builder::new_multi_thread()
590 /// .thread_name_fn(|| {
591 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
592 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
593 /// format!("my-pool-{}", id)
594 /// })
595 /// .build();
596 /// # }
597 /// # }
598 /// ```
599 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
600 where
601 F: Fn() -> String + Send + Sync + 'static,
602 {
603 self.thread_name = std::sync::Arc::new(f);
604 self
605 }
606
607 /// Sets the stack size (in bytes) for worker threads.
608 ///
609 /// The actual stack size may be greater than this value if the platform
610 /// specifies minimal stack size.
611 ///
612 /// The default stack size for spawned threads is 2 MiB, though this
613 /// particular stack size is subject to change in the future.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// # #[cfg(not(target_family = "wasm"))]
619 /// # {
620 /// # use tokio::runtime;
621 ///
622 /// # pub fn main() {
623 /// let rt = runtime::Builder::new_multi_thread()
624 /// .thread_stack_size(32 * 1024)
625 /// .build();
626 /// # }
627 /// # }
628 /// ```
629 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
630 self.thread_stack_size = Some(val);
631 self
632 }
633
634 /// Executes function `f` after each thread is started but before it starts
635 /// doing work.
636 ///
637 /// This is intended for bookkeeping and monitoring use cases.
638 ///
639 /// # Examples
640 ///
641 /// ```
642 /// # #[cfg(not(target_family = "wasm"))]
643 /// # {
644 /// # use tokio::runtime;
645 /// # pub fn main() {
646 /// let runtime = runtime::Builder::new_multi_thread()
647 /// .on_thread_start(|| {
648 /// println!("thread started");
649 /// })
650 /// .build();
651 /// # }
652 /// # }
653 /// ```
654 #[cfg(not(loom))]
655 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
656 where
657 F: Fn() + Send + Sync + 'static,
658 {
659 self.after_start = Some(std::sync::Arc::new(f));
660 self
661 }
662
663 /// Executes function `f` before each thread stops.
664 ///
665 /// This is intended for bookkeeping and monitoring use cases.
666 ///
667 /// # Examples
668 ///
669 /// ```
670 /// # #[cfg(not(target_family = "wasm"))]
    /// # {
672 /// # use tokio::runtime;
673 /// # pub fn main() {
674 /// let runtime = runtime::Builder::new_multi_thread()
675 /// .on_thread_stop(|| {
676 /// println!("thread stopping");
677 /// })
678 /// .build();
679 /// # }
680 /// # }
681 /// ```
682 #[cfg(not(loom))]
683 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
684 where
685 F: Fn() + Send + Sync + 'static,
686 {
687 self.before_stop = Some(std::sync::Arc::new(f));
688 self
689 }
690
691 /// Executes function `f` just before a thread is parked (goes idle).
692 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
693 /// can be called, and may result in this thread being unparked immediately.
694 ///
695 /// This can be used to start work only when the executor is idle, or for bookkeeping
696 /// and monitoring purposes.
697 ///
698 /// Note: There can only be one park callback for a runtime; calling this function
699 /// more than once replaces the last callback defined, rather than adding to it.
700 ///
701 /// # Examples
702 ///
703 /// ## Multithreaded executor
704 /// ```
705 /// # #[cfg(not(target_family = "wasm"))]
706 /// # {
707 /// # use std::sync::Arc;
708 /// # use std::sync::atomic::{AtomicBool, Ordering};
709 /// # use tokio::runtime;
710 /// # use tokio::sync::Barrier;
711 /// # pub fn main() {
712 /// let once = AtomicBool::new(true);
713 /// let barrier = Arc::new(Barrier::new(2));
714 ///
715 /// let runtime = runtime::Builder::new_multi_thread()
716 /// .worker_threads(1)
717 /// .on_thread_park({
718 /// let barrier = barrier.clone();
719 /// move || {
720 /// let barrier = barrier.clone();
721 /// if once.swap(false, Ordering::Relaxed) {
722 /// tokio::spawn(async move { barrier.wait().await; });
723 /// }
724 /// }
725 /// })
726 /// .build()
727 /// .unwrap();
728 ///
729 /// runtime.block_on(async {
730 /// barrier.wait().await;
731 /// })
732 /// # }
733 /// # }
734 /// ```
735 /// ## Current thread executor
736 /// ```
737 /// # use std::sync::Arc;
738 /// # use std::sync::atomic::{AtomicBool, Ordering};
739 /// # use tokio::runtime;
740 /// # use tokio::sync::Barrier;
741 /// # pub fn main() {
742 /// let once = AtomicBool::new(true);
743 /// let barrier = Arc::new(Barrier::new(2));
744 ///
745 /// let runtime = runtime::Builder::new_current_thread()
746 /// .on_thread_park({
747 /// let barrier = barrier.clone();
748 /// move || {
749 /// let barrier = barrier.clone();
750 /// if once.swap(false, Ordering::Relaxed) {
751 /// tokio::spawn(async move { barrier.wait().await; });
752 /// }
753 /// }
754 /// })
755 /// .build()
756 /// .unwrap();
757 ///
758 /// runtime.block_on(async {
759 /// barrier.wait().await;
760 /// })
761 /// # }
762 /// ```
763 #[cfg(not(loom))]
764 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
765 where
766 F: Fn() + Send + Sync + 'static,
767 {
768 self.before_park = Some(std::sync::Arc::new(f));
769 self
770 }
771
772 /// Executes function `f` just after a thread unparks (starts executing tasks).
773 ///
774 /// This is intended for bookkeeping and monitoring use cases; note that work
775 /// in this callback will increase latencies when the application has allowed one or
776 /// more runtime threads to go idle.
777 ///
778 /// Note: There can only be one unpark callback for a runtime; calling this function
779 /// more than once replaces the last callback defined, rather than adding to it.
780 ///
781 /// # Examples
782 ///
783 /// ```
784 /// # #[cfg(not(target_family = "wasm"))]
785 /// # {
786 /// # use tokio::runtime;
787 /// # pub fn main() {
788 /// let runtime = runtime::Builder::new_multi_thread()
789 /// .on_thread_unpark(|| {
790 /// println!("thread unparking");
791 /// })
792 /// .build();
793 ///
794 /// runtime.unwrap().block_on(async {
795 /// tokio::task::yield_now().await;
796 /// println!("Hello from Tokio!");
797 /// })
798 /// # }
799 /// # }
800 /// ```
801 #[cfg(not(loom))]
802 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
803 where
804 F: Fn() + Send + Sync + 'static,
805 {
806 self.after_unpark = Some(std::sync::Arc::new(f));
807 self
808 }
809
810 /// Executes function `f` just before a task is spawned.
811 ///
812 /// `f` is called within the Tokio context, so functions like
813 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
814 /// invoked immediately.
815 ///
816 /// This can be used for bookkeeping or monitoring purposes.
817 ///
818 /// Note: There can only be one spawn callback for a runtime; calling this function more
819 /// than once replaces the last callback defined, rather than adding to it.
820 ///
821 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
822 ///
823 /// **Note**: This is an [unstable API][unstable]. The public API of this type
824 /// may break in 1.x releases. See [the documentation on unstable
825 /// features][unstable] for details.
826 ///
827 /// [unstable]: crate#unstable-features
828 ///
829 /// # Examples
830 ///
831 /// ```
832 /// # use tokio::runtime;
833 /// # pub fn main() {
834 /// let runtime = runtime::Builder::new_current_thread()
835 /// .on_task_spawn(|_| {
836 /// println!("spawning task");
837 /// })
838 /// .build()
839 /// .unwrap();
840 ///
841 /// runtime.block_on(async {
842 /// tokio::task::spawn(std::future::ready(()));
843 ///
844 /// for _ in 0..64 {
845 /// tokio::task::yield_now().await;
846 /// }
847 /// })
848 /// # }
849 /// ```
850 #[cfg(all(not(loom), tokio_unstable))]
851 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
852 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
853 where
854 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
855 {
856 self.before_spawn = Some(std::sync::Arc::new(f));
857 self
858 }
859
860 /// Executes function `f` just before a task is polled
861 ///
862 /// `f` is called within the Tokio context, so functions like
863 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
864 /// invoked immediately.
865 ///
866 /// **Note**: This is an [unstable API][unstable]. The public API of this type
867 /// may break in 1.x releases. See [the documentation on unstable
868 /// features][unstable] for details.
869 ///
870 /// [unstable]: crate#unstable-features
871 ///
872 /// # Examples
873 ///
874 /// ```
875 /// # #[cfg(not(target_family = "wasm"))]
876 /// # {
877 /// # use std::sync::{atomic::AtomicUsize, Arc};
878 /// # use tokio::task::yield_now;
879 /// # pub fn main() {
880 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
881 /// let poll_start = poll_start_counter.clone();
882 /// let rt = tokio::runtime::Builder::new_multi_thread()
883 /// .enable_all()
884 /// .on_before_task_poll(move |meta| {
885 /// println!("task {} is about to be polled", meta.id())
886 /// })
887 /// .build()
888 /// .unwrap();
889 /// let task = rt.spawn(async {
890 /// yield_now().await;
891 /// });
892 /// let _ = rt.block_on(task);
893 ///
894 /// # }
895 /// # }
896 /// ```
897 #[cfg(tokio_unstable)]
898 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
899 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
900 where
901 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
902 {
903 self.before_poll = Some(std::sync::Arc::new(f));
904 self
905 }
906
907 /// Executes function `f` just after a task is polled
908 ///
909 /// `f` is called within the Tokio context, so functions like
910 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
911 /// invoked immediately.
912 ///
913 /// **Note**: This is an [unstable API][unstable]. The public API of this type
914 /// may break in 1.x releases. See [the documentation on unstable
915 /// features][unstable] for details.
916 ///
917 /// [unstable]: crate#unstable-features
918 ///
919 /// # Examples
920 ///
921 /// ```
922 /// # #[cfg(not(target_family = "wasm"))]
923 /// # {
924 /// # use std::sync::{atomic::AtomicUsize, Arc};
925 /// # use tokio::task::yield_now;
926 /// # pub fn main() {
927 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
928 /// let poll_stop = poll_stop_counter.clone();
929 /// let rt = tokio::runtime::Builder::new_multi_thread()
930 /// .enable_all()
931 /// .on_after_task_poll(move |meta| {
932 /// println!("task {} completed polling", meta.id());
933 /// })
934 /// .build()
935 /// .unwrap();
936 /// let task = rt.spawn(async {
937 /// yield_now().await;
938 /// });
939 /// let _ = rt.block_on(task);
940 ///
941 /// # }
942 /// # }
943 /// ```
944 #[cfg(tokio_unstable)]
945 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
946 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
947 where
948 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
949 {
950 self.after_poll = Some(std::sync::Arc::new(f));
951 self
952 }
953
954 /// Executes function `f` just after a task is terminated.
955 ///
956 /// `f` is called within the Tokio context, so functions like
957 /// [`tokio::spawn`](crate::spawn) can be called.
958 ///
959 /// This can be used for bookkeeping or monitoring purposes.
960 ///
961 /// Note: There can only be one task termination callback for a runtime; calling this
962 /// function more than once replaces the last callback defined, rather than adding to it.
963 ///
964 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
965 ///
966 /// **Note**: This is an [unstable API][unstable]. The public API of this type
967 /// may break in 1.x releases. See [the documentation on unstable
968 /// features][unstable] for details.
969 ///
970 /// [unstable]: crate#unstable-features
971 ///
972 /// # Examples
973 ///
974 /// ```
975 /// # use tokio::runtime;
976 /// # pub fn main() {
977 /// let runtime = runtime::Builder::new_current_thread()
978 /// .on_task_terminate(|_| {
979 /// println!("killing task");
980 /// })
981 /// .build()
982 /// .unwrap();
983 ///
984 /// runtime.block_on(async {
985 /// tokio::task::spawn(std::future::ready(()));
986 ///
987 /// for _ in 0..64 {
988 /// tokio::task::yield_now().await;
989 /// }
990 /// })
991 /// # }
992 /// ```
993 #[cfg(all(not(loom), tokio_unstable))]
994 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
995 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
996 where
997 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
998 {
999 self.after_termination = Some(std::sync::Arc::new(f));
1000 self
1001 }
1002
1003 /// Creates the configured `Runtime`.
1004 ///
1005 /// The returned `Runtime` instance is ready to spawn tasks.
1006 ///
1007 /// # Examples
1008 ///
1009 /// ```
1010 /// # #[cfg(not(target_family = "wasm"))]
1011 /// # {
1012 /// use tokio::runtime::Builder;
1013 ///
1014 /// let rt = Builder::new_multi_thread().build().unwrap();
1015 ///
1016 /// rt.block_on(async {
1017 /// println!("Hello from the Tokio runtime");
1018 /// });
1019 /// # }
1020 /// ```
1021 pub fn build(&mut self) -> io::Result<Runtime> {
1022 match &self.kind {
1023 Kind::CurrentThread => self.build_current_thread_runtime(),
1024 #[cfg(feature = "rt-multi-thread")]
1025 Kind::MultiThread => self.build_threaded_runtime(),
1026 }
1027 }
1028
1029 /// Creates the configured [`LocalRuntime`].
1030 ///
1031 /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
1032 ///
1033 /// # Panics
1034 ///
1035 /// This will panic if the runtime is configured with [`new_multi_thread()`].
1036 ///
1037 /// [`new_multi_thread()`]: Builder::new_multi_thread
1038 ///
1039 /// # Examples
1040 ///
1041 /// ```
1042 /// use tokio::runtime::{Builder, LocalOptions};
1043 ///
1044 /// let rt = Builder::new_current_thread()
1045 /// .build_local(LocalOptions::default())
1046 /// .unwrap();
1047 ///
1048 /// rt.spawn_local(async {
1049 /// println!("Hello from the Tokio runtime");
1050 /// });
1051 /// ```
1052 #[allow(unused_variables, unreachable_patterns)]
1053 pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1054 match &self.kind {
1055 Kind::CurrentThread => self.build_current_thread_local_runtime(),
1056 #[cfg(feature = "rt-multi-thread")]
1057 Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1058 }
1059 }
1060
1061 fn get_cfg(&self) -> driver::Cfg {
1062 driver::Cfg {
1063 enable_pause_time: match self.kind {
1064 Kind::CurrentThread => true,
1065 #[cfg(feature = "rt-multi-thread")]
1066 Kind::MultiThread => false,
1067 },
1068 enable_io: self.enable_io,
1069 enable_time: self.enable_time,
1070 start_paused: self.start_paused,
1071 nevents: self.nevents,
1072 timer_flavor: self.timer_flavor,
1073 }
1074 }
1075
1076 /// Sets a custom timeout for a thread in the blocking pool.
1077 ///
1078 /// By default, the timeout for a thread is set to 10 seconds. This can
1079 /// be overridden using `.thread_keep_alive()`.
1080 ///
1081 /// # Example
1082 ///
1083 /// ```
1084 /// # #[cfg(not(target_family = "wasm"))]
1085 /// # {
1086 /// # use tokio::runtime;
1087 /// # use std::time::Duration;
1088 /// # pub fn main() {
1089 /// let rt = runtime::Builder::new_multi_thread()
1090 /// .thread_keep_alive(Duration::from_millis(100))
1091 /// .build();
1092 /// # }
1093 /// # }
1094 /// ```
1095 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1096 self.keep_alive = Some(duration);
1097 self
1098 }
1099
1100 /// Sets the number of scheduler ticks after which the scheduler will poll the global
1101 /// task queue.
1102 ///
1103 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1104 ///
1105 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1106 /// [the module documentation] for the default behavior of the multi-thread scheduler.
1107 ///
1108 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1109 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1110 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1111 /// getting started on new work, especially if tasks frequently yield rather than complete
1112 /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1113 /// tasks from the local queue will be executed only if the global queue is empty.
1114 /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1115 /// tasks quickly complete polling.
1116 ///
1117 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1118 ///
1119 /// # Panics
1120 ///
1121 /// This function will panic if 0 is passed as an argument.
1122 ///
1123 /// # Examples
1124 ///
1125 /// ```
1126 /// # #[cfg(not(target_family = "wasm"))]
1127 /// # {
1128 /// # use tokio::runtime;
1129 /// # pub fn main() {
1130 /// let rt = runtime::Builder::new_multi_thread()
1131 /// .global_queue_interval(31)
1132 /// .build();
1133 /// # }
1134 /// # }
1135 /// ```
1136 #[track_caller]
1137 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1138 assert!(val > 0, "global_queue_interval must be greater than 0");
1139 self.global_queue_interval = Some(val);
1140 self
1141 }
1142
1143 /// Sets the number of scheduler ticks after which the scheduler will poll for
1144 /// external events (timers, I/O, and so on).
1145 ///
1146 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1147 ///
1148 /// By default, the event interval is `61` for all scheduler types.
1149 ///
1150 /// Setting the event interval determines the effective "priority" of delivering
1151 /// these external events (which may wake up additional tasks), compared to
1152 /// executing tasks that are currently ready to run. A smaller value is useful
1153 /// when tasks frequently spend a long time in polling, or infrequently yield,
1154 /// which can result in overly long delays picking up I/O events. Conversely,
1155 /// picking up new events requires extra synchronization and syscall overhead,
1156 /// so if tasks generally complete their polling quickly, a higher event interval
1157 /// will minimize that overhead while still keeping the scheduler responsive to
1158 /// events.
1159 ///
1160 /// # Panics
1161 ///
1162 /// This function will panic if 0 is passed as an argument.
1163 ///
1164 /// # Examples
1165 ///
1166 /// ```
1167 /// # #[cfg(not(target_family = "wasm"))]
1168 /// # {
1169 /// # use tokio::runtime;
1170 /// # pub fn main() {
1171 /// let rt = runtime::Builder::new_multi_thread()
1172 /// .event_interval(31)
1173 /// .build();
1174 /// # }
1175 /// # }
1176 /// ```
1177 #[track_caller]
1178 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1179 assert!(val > 0, "event_interval must be greater than 0");
1180 self.event_interval = val;
1181 self
1182 }
1183
1184 cfg_unstable! {
1185 /// Configure how the runtime responds to an unhandled panic on a
1186 /// spawned task.
1187 ///
1188 /// By default, an unhandled panic (i.e. a panic not caught by
1189 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1190 /// execution. The panic's error value is forwarded to the task's
1191 /// [`JoinHandle`] and all other spawned tasks continue running.
1192 ///
1193 /// The `unhandled_panic` option enables configuring this behavior.
1194 ///
1195 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1196 /// spawned tasks have no impact on the runtime's execution.
1197 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1198 /// shutdown immediately when a spawned task panics even if that
1199 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1200 /// will immediately terminate and further calls to
1201 /// [`Runtime::block_on`] will panic.
1202 ///
1203 /// # Panics
1204 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1205 /// on a runtime other than the current thread runtime.
1206 ///
1207 /// # Unstable
1208 ///
1209 /// This option is currently unstable and its implementation is
1210 /// incomplete. The API may change or be removed in the future. See
1211 /// issue [tokio-rs/tokio#4516] for more details.
1212 ///
1213 /// # Examples
1214 ///
1215 /// The following demonstrates a runtime configured to shutdown on
1216 /// panic. The first spawned task panics and results in the runtime
1217 /// shutting down. The second spawned task never has a chance to
1218 /// execute. The call to `block_on` will panic due to the runtime being
1219 /// forcibly shutdown.
1220 ///
1221 /// ```should_panic
1222 /// use tokio::runtime::{self, UnhandledPanic};
1223 ///
1224 /// # pub fn main() {
1225 /// let rt = runtime::Builder::new_current_thread()
1226 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1227 /// .build()
1228 /// .unwrap();
1229 ///
1230 /// rt.spawn(async { panic!("boom"); });
1231 /// rt.spawn(async {
1232 /// // This task never completes.
1233 /// });
1234 ///
1235 /// rt.block_on(async {
1236 /// // Do some work
1237 /// # loop { tokio::task::yield_now().await; }
1238 /// })
1239 /// # }
1240 /// ```
1241 ///
1242 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1243 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1244 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1245 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1246 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1247 }
1248
1249 self.unhandled_panic = behavior;
1250 self
1251 }
1252
1253 /// Disables the LIFO task scheduler heuristic.
1254 ///
1255 /// The multi-threaded scheduler includes a heuristic for optimizing
1256 /// message-passing patterns. This heuristic results in the **last**
1257 /// scheduled task being polled first.
1258 ///
1259 /// To implement this heuristic, each worker thread has a slot which
1260 /// holds the task that should be polled next. In earlier versions of
1261 /// Tokio, this slot could not be stolen by other worker threads, which
1262 /// can result in lower total throughput when tasks tend to have longer
1263 /// poll times.
1264 ///
1265 /// This configuration option will disable this heuristic resulting in
1266 /// all scheduled tasks being pushed into the worker-local queue. This
1267 /// was intended as a workaround for the LIFO slot not being stealable.
1268 /// As of Tokio 1.51, tasks can be stolen from the LIFO slot. In a
1269 /// future version, this option may be deprecated.
1270 ///
1271 /// # Unstable
1272 ///
1273 /// This configuration option was considered a workaround for the LIFO
1274 /// slot not being stealable. Since this is no longer the case, we will
1275 /// revisit whether or not this option is necessary. See
1276 /// issue [tokio-rs/tokio#4941].
1277 ///
1278 /// # Examples
1279 ///
1280 /// ```
1281 /// # #[cfg(not(target_family = "wasm"))]
1282 /// # {
1283 /// use tokio::runtime;
1284 ///
1285 /// let rt = runtime::Builder::new_multi_thread()
1286 /// .disable_lifo_slot()
1287 /// .build()
1288 /// .unwrap();
1289 /// # }
1290 /// ```
1291 ///
1292 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1293 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1294 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1295 self.disable_lifo_slot = true;
1296 self
1297 }
1298
1299 /// Specifies the random number generation seed to use within all
1300 /// threads associated with the runtime being built.
1301 ///
1302 /// This option is intended to make certain parts of the runtime
1303 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1304 /// [`tokio::select!`] it will ensure that the order that branches are
1305 /// polled is deterministic.
1306 ///
1307 /// In addition to the code specifying `rng_seed` and interacting with
1308 /// the runtime, the internals of Tokio and the Rust compiler may affect
1309 /// the sequences of random numbers. In order to ensure repeatable
1310 /// results, the version of Tokio, the versions of all other
1311 /// dependencies that interact with Tokio, and the Rust compiler version
1312 /// should also all remain constant.
1313 ///
1314 /// # Examples
1315 ///
1316 /// ```
1317 /// # use tokio::runtime::{self, RngSeed};
1318 /// # pub fn main() {
1319 /// let seed = RngSeed::from_bytes(b"place your seed here");
1320 /// let rt = runtime::Builder::new_current_thread()
1321 /// .rng_seed(seed)
1322 /// .build();
1323 /// # }
1324 /// ```
1325 ///
1326 /// [`tokio::select!`]: crate::select
1327 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1328 self.seed_generator = RngSeedGenerator::new(seed);
1329 self
1330 }
1331 }
1332
1333 cfg_unstable_metrics! {
1334 /// Enables tracking the distribution of task poll times.
1335 ///
1336 /// Task poll times are not instrumented by default as doing so requires
1337 /// calling [`Instant::now()`] twice per task poll, which could add
1338 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1339 /// metrics data.
1340 ///
1341 /// The histogram uses fixed bucket sizes. In other words, the histogram
1342 /// buckets are not dynamic based on input values. Use the
1343 /// `metrics_poll_time_histogram` builder methods to configure the
1344 /// histogram details.
1345 ///
1346 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1347 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1348 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1349 /// to select [`LogHistogram`] instead.
1350 ///
1351 /// # Examples
1352 ///
1353 /// ```
1354 /// # #[cfg(not(target_family = "wasm"))]
1355 /// # {
1356 /// use tokio::runtime;
1357 ///
1358 /// let rt = runtime::Builder::new_multi_thread()
1359 /// .enable_metrics_poll_time_histogram()
1360 /// .build()
1361 /// .unwrap();
1362 /// # // Test default values here
1363 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1364 /// # let m = rt.handle().metrics();
1365 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1366 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1367 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1368 /// # }
1369 /// ```
1370 ///
1371 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1372 /// [`Instant::now()`]: std::time::Instant::now
1373 /// [`LogHistogram`]: crate::runtime::LogHistogram
1374 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1375 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1376 self.metrics_poll_count_histogram_enable = true;
1377 self
1378 }
1379
1380 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1381 ///
1382 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1383 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1384 #[doc(hidden)]
1385 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1386 self.enable_metrics_poll_time_histogram()
1387 }
1388
1389 /// Sets the histogram scale for tracking the distribution of task poll
1390 /// times.
1391 ///
1392 /// Tracking the distribution of task poll times can be done using a
1393 /// linear or log scale. When using linear scale, each histogram bucket
1394 /// will represent the same range of poll times. When using log scale,
1395 /// each histogram bucket will cover a range twice as big as the
1396 /// previous bucket.
1397 ///
1398 /// **Default:** linear scale.
1399 ///
1400 /// # Examples
1401 ///
1402 /// ```
1403 /// # #[cfg(not(target_family = "wasm"))]
1404 /// # {
1405 /// use tokio::runtime::{self, HistogramScale};
1406 ///
1407 /// # #[allow(deprecated)]
1408 /// let rt = runtime::Builder::new_multi_thread()
1409 /// .enable_metrics_poll_time_histogram()
1410 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1411 /// .build()
1412 /// .unwrap();
1413 /// # }
1414 /// ```
1415 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1416 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1417 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1418 self
1419 }
1420
1421 /// Configure the histogram for tracking poll times
1422 ///
1423 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1424 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1425 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1426 ///
1427 /// # Examples
1428 /// Configure a [`LogHistogram`] with [default configuration]:
1429 /// ```
1430 /// # #[cfg(not(target_family = "wasm"))]
1431 /// # {
1432 /// use tokio::runtime;
1433 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1434 ///
1435 /// let rt = runtime::Builder::new_multi_thread()
1436 /// .enable_metrics_poll_time_histogram()
1437 /// .metrics_poll_time_histogram_configuration(
1438 /// HistogramConfiguration::log(LogHistogram::default())
1439 /// )
1440 /// .build()
1441 /// .unwrap();
1442 /// # }
1443 /// ```
1444 ///
1445 /// Configure a linear histogram with 100 buckets, each 10μs wide
1446 /// ```
1447 /// # #[cfg(not(target_family = "wasm"))]
1448 /// # {
1449 /// use tokio::runtime;
1450 /// use std::time::Duration;
1451 /// use tokio::runtime::HistogramConfiguration;
1452 ///
1453 /// let rt = runtime::Builder::new_multi_thread()
1454 /// .enable_metrics_poll_time_histogram()
1455 /// .metrics_poll_time_histogram_configuration(
1456 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1457 /// )
1458 /// .build()
1459 /// .unwrap();
1460 /// # }
1461 /// ```
1462 ///
1463 /// Configure a [`LogHistogram`] with the following settings:
1464 /// - Measure times from 100ns to 120s
1465 /// - Max error of 0.1
1466 /// - No more than 1024 buckets
1467 /// ```
1468 /// # #[cfg(not(target_family = "wasm"))]
1469 /// # {
1470 /// use std::time::Duration;
1471 /// use tokio::runtime;
1472 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1473 ///
1474 /// let rt = runtime::Builder::new_multi_thread()
1475 /// .enable_metrics_poll_time_histogram()
1476 /// .metrics_poll_time_histogram_configuration(
1477 /// HistogramConfiguration::log(LogHistogram::builder()
1478 /// .max_value(Duration::from_secs(120))
1479 /// .min_value(Duration::from_nanos(100))
1480 /// .max_error(0.1)
1481 /// .max_buckets(1024)
1482 /// .expect("configuration uses 488 buckets")
1483 /// )
1484 /// )
1485 /// .build()
1486 /// .unwrap();
1487 /// # }
1488 /// ```
1489 ///
1490 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1491 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1492 /// where each bucket is twice the size of the previous bucket.
1493 /// ```rust
1494 /// use std::time::Duration;
1495 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1496 /// let rt = tokio::runtime::Builder::new_current_thread()
1497 /// .enable_all()
1498 /// .enable_metrics_poll_time_histogram()
1499 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1500 /// LogHistogram::builder()
1501 /// .min_value(Duration::from_micros(20))
1502 /// .max_value(Duration::from_millis(4))
1503 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1504 /// .precision_exact(0)
1505 /// .max_buckets(10)
1506 /// .unwrap(),
1507 /// ))
1508 /// .build()
1509 /// .unwrap();
1510 /// ```
1511 ///
1512 /// [`LogHistogram`]: crate::runtime::LogHistogram
1513 /// [default configuration]: crate::runtime::LogHistogramBuilder
1514 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1515 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1516 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1517 self
1518 }
1519
1520 /// Sets the histogram resolution for tracking the distribution of task
1521 /// poll times.
1522 ///
1523 /// The resolution is the histogram's first bucket's range. When using a
1524 /// linear histogram scale, each bucket will cover the same range. When
1525 /// using a log scale, each bucket will cover a range twice as big as
1526 /// the previous bucket. In the log case, the resolution represents the
1527 /// smallest bucket range.
1528 ///
1529 /// Note that, when using log scale, the resolution is rounded up to the
1530 /// nearest power of 2 in nanoseconds.
1531 ///
1532 /// **Default:** 100 microseconds.
1533 ///
1534 /// # Examples
1535 ///
1536 /// ```
1537 /// # #[cfg(not(target_family = "wasm"))]
1538 /// # {
1539 /// use tokio::runtime;
1540 /// use std::time::Duration;
1541 ///
1542 /// # #[allow(deprecated)]
1543 /// let rt = runtime::Builder::new_multi_thread()
1544 /// .enable_metrics_poll_time_histogram()
1545 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1546 /// .build()
1547 /// .unwrap();
1548 /// # }
1549 /// ```
1550 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1551 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1552 assert!(resolution > Duration::from_secs(0));
1553 // Sanity check the argument and also make the cast below safe.
1554 assert!(resolution <= Duration::from_secs(1));
1555
1556 let resolution = resolution.as_nanos() as u64;
1557
1558 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1559 self
1560 }
1561
1562 /// Sets the number of buckets for the histogram tracking the
1563 /// distribution of task poll times.
1564 ///
1565 /// The last bucket tracks all greater values that fall out of other
1566 /// ranges. So, configuring the histogram using a linear scale,
1567 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1568 /// polls that take more than 450ms to complete.
1569 ///
1570 /// **Default:** 10
1571 ///
1572 /// # Examples
1573 ///
1574 /// ```
1575 /// # #[cfg(not(target_family = "wasm"))]
1576 /// # {
1577 /// use tokio::runtime;
1578 ///
1579 /// # #[allow(deprecated)]
1580 /// let rt = runtime::Builder::new_multi_thread()
1581 /// .enable_metrics_poll_time_histogram()
1582 /// .metrics_poll_count_histogram_buckets(15)
1583 /// .build()
1584 /// .unwrap();
1585 /// # }
1586 /// ```
1587 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1588 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1589 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1590 self
1591 }
1592 }
1593
1594 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1595 use crate::runtime::runtime::Scheduler;
1596
1597 let (scheduler, handle, blocking_pool) =
1598 self.build_current_thread_runtime_components(None)?;
1599
1600 Ok(Runtime::from_parts(
1601 Scheduler::CurrentThread(scheduler),
1602 handle,
1603 blocking_pool,
1604 ))
1605 }
1606
1607 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1608 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1609
1610 let tid = std::thread::current().id();
1611
1612 let (scheduler, handle, blocking_pool) =
1613 self.build_current_thread_runtime_components(Some(tid))?;
1614
1615 Ok(LocalRuntime::from_parts(
1616 LocalRuntimeScheduler::CurrentThread(scheduler),
1617 handle,
1618 blocking_pool,
1619 ))
1620 }
1621
1622 fn build_current_thread_runtime_components(
1623 &mut self,
1624 local_tid: Option<ThreadId>,
1625 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1626 use crate::runtime::scheduler;
1627 use crate::runtime::Config;
1628
1629 let mut cfg = self.get_cfg();
1630 cfg.timer_flavor = TimerFlavor::Traditional;
1631 let (driver, driver_handle) = driver::Driver::new(cfg)?;
1632
1633 // Blocking pool
1634 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1635 let blocking_spawner = blocking_pool.spawner().clone();
1636
1637 // Generate a rng seed for this runtime.
1638 let seed_generator_1 = self.seed_generator.next_generator();
1639 let seed_generator_2 = self.seed_generator.next_generator();
1640
1641 // And now put a single-threaded scheduler on top of the timer. When
1642 // there are no futures ready to do something, it'll let the timer or
1643 // the reactor to generate some new stimuli for the futures to continue
1644 // in their life.
1645 let (scheduler, handle) = CurrentThread::new(
1646 driver,
1647 driver_handle,
1648 blocking_spawner,
1649 seed_generator_2,
1650 Config {
1651 before_park: self.before_park.clone(),
1652 after_unpark: self.after_unpark.clone(),
1653 before_spawn: self.before_spawn.clone(),
1654 #[cfg(tokio_unstable)]
1655 before_poll: self.before_poll.clone(),
1656 #[cfg(tokio_unstable)]
1657 after_poll: self.after_poll.clone(),
1658 after_termination: self.after_termination.clone(),
1659 global_queue_interval: self.global_queue_interval,
1660 event_interval: self.event_interval,
1661 #[cfg(tokio_unstable)]
1662 unhandled_panic: self.unhandled_panic.clone(),
1663 disable_lifo_slot: self.disable_lifo_slot,
1664 seed_generator: seed_generator_1,
1665 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1666 },
1667 local_tid,
1668 self.name.clone(),
1669 );
1670
1671 let handle = Handle {
1672 inner: scheduler::Handle::CurrentThread(handle),
1673 };
1674
1675 Ok((scheduler, handle, blocking_pool))
1676 }
1677
1678 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1679 if self.metrics_poll_count_histogram_enable {
1680 Some(self.metrics_poll_count_histogram.clone())
1681 } else {
1682 None
1683 }
1684 }
1685}
1686
1687cfg_io_driver! {
1688 impl Builder {
1689 /// Enables the I/O driver.
1690 ///
1691 /// Doing this enables using net, process, signal, and some I/O types on
1692 /// the runtime.
1693 ///
1694 /// # Examples
1695 ///
1696 /// ```
1697 /// use tokio::runtime;
1698 ///
1699 /// let rt = runtime::Builder::new_multi_thread()
1700 /// .enable_io()
1701 /// .build()
1702 /// .unwrap();
1703 /// ```
1704 pub fn enable_io(&mut self) -> &mut Self {
1705 self.enable_io = true;
1706 self
1707 }
1708
1709 /// Enables the I/O driver and configures the max number of events to be
1710 /// processed per tick.
1711 ///
1712 /// # Examples
1713 ///
1714 /// ```
1715 /// use tokio::runtime;
1716 ///
1717 /// let rt = runtime::Builder::new_current_thread()
1718 /// .enable_io()
1719 /// .max_io_events_per_tick(1024)
1720 /// .build()
1721 /// .unwrap();
1722 /// ```
1723 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1724 self.nevents = capacity;
1725 self
1726 }
1727 }
1728}
1729
1730cfg_time! {
1731 impl Builder {
1732 /// Enables the time driver.
1733 ///
1734 /// Doing this enables using `tokio::time` on the runtime.
1735 ///
1736 /// # Examples
1737 ///
1738 /// ```
1739 /// # #[cfg(not(target_family = "wasm"))]
1740 /// # {
1741 /// use tokio::runtime;
1742 ///
1743 /// let rt = runtime::Builder::new_multi_thread()
1744 /// .enable_time()
1745 /// .build()
1746 /// .unwrap();
1747 /// # }
1748 /// ```
1749 pub fn enable_time(&mut self) -> &mut Self {
1750 self.enable_time = true;
1751 self
1752 }
1753 }
1754}
1755
1756cfg_io_uring! {
1757 impl Builder {
1758 /// Enables the tokio's io_uring driver.
1759 ///
1760 /// Doing this enables using io_uring operations on the runtime.
1761 ///
1762 /// # Examples
1763 ///
1764 /// ```
1765 /// use tokio::runtime;
1766 ///
1767 /// let rt = runtime::Builder::new_multi_thread()
1768 /// .enable_io_uring()
1769 /// .build()
1770 /// .unwrap();
1771 /// ```
1772 #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1773 pub fn enable_io_uring(&mut self) -> &mut Self {
1774 // Currently, the uring flag is equivalent to `enable_io`.
1775 self.enable_io = true;
1776 self
1777 }
1778 }
1779}
1780
1781cfg_test_util! {
1782 impl Builder {
1783 /// Controls if the runtime's clock starts paused or advancing.
1784 ///
1785 /// Pausing time requires the current-thread runtime; construction of
1786 /// the runtime will panic otherwise.
1787 ///
1788 /// # Examples
1789 ///
1790 /// ```
1791 /// use tokio::runtime;
1792 ///
1793 /// let rt = runtime::Builder::new_current_thread()
1794 /// .enable_time()
1795 /// .start_paused(true)
1796 /// .build()
1797 /// .unwrap();
1798 /// ```
1799 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1800 self.start_paused = start_paused;
1801 self
1802 }
1803 }
1804}
1805
cfg_rt_multi_thread! {
    impl Builder {
        /// Builds a `Runtime` backed by the multi-thread scheduler.
        ///
        /// Creates the driver, the blocking pool and the scheduler, launches
        /// the workers, and assembles the resulting `Runtime`. An error from
        /// creating the driver is returned to the caller.
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::loom::sys::num_cpus;
            use crate::runtime::{Config, runtime::Scheduler};
            use crate::runtime::scheduler::{self, MultiThread};

            // Fall back to `num_cpus()` when no worker count was configured.
            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);

            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

            // Create the blocking pool. Its thread cap is the configured
            // blocking-thread limit plus the number of workers.
            // NOTE(review): presumably because the workers themselves run on
            // the blocking pool — confirm against `launch`.
            let blocking_pool =
                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            // Generate a rng seed for this runtime. The first generator goes
            // into the scheduler `Config`; the second is passed to
            // `MultiThread::new` directly.
            let seed_generator_1 = self.seed_generator.next_generator();
            let seed_generator_2 = self.seed_generator.next_generator();

            let (scheduler, handle, launch) = MultiThread::new(
                worker_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    before_spawn: self.before_spawn.clone(),
                    #[cfg(tokio_unstable)]
                    before_poll: self.before_poll.clone(),
                    #[cfg(tokio_unstable)]
                    after_poll: self.after_poll.clone(),
                    after_termination: self.after_termination.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                },
                self.timer_flavor,
                self.name.clone(),
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers. The `_enter` guard stays alive
            // until the end of this function, so `launch` runs with this
            // handle entered.
            // NOTE(review): presumably the workers must be spawned from
            // within the runtime context — confirm.
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }
    }
}
1863
1864impl fmt::Debug for Builder {
1865 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1866 let mut debug = fmt.debug_struct("Builder");
1867
1868 if let Some(name) = &self.name {
1869 debug.field("name", name);
1870 }
1871
1872 debug
1873 .field("worker_threads", &self.worker_threads)
1874 .field("max_blocking_threads", &self.max_blocking_threads)
1875 .field(
1876 "thread_name",
1877 &"<dyn Fn() -> String + Send + Sync + 'static>",
1878 )
1879 .field("thread_stack_size", &self.thread_stack_size)
1880 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1881 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1882 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1883 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."));
1884
1885 if self.name.is_none() {
1886 debug.finish_non_exhaustive()
1887 } else {
1888 debug.finish()
1889 }
1890 }
1891}