tokio/runtime/local_runtime/
runtime.rs

1#![allow(irrefutable_let_patterns)]
2
3use crate::runtime::blocking::BlockingPool;
4use crate::runtime::scheduler::CurrentThread;
5use crate::runtime::{context, Builder, EnterGuard, Handle, BOX_FUTURE_THRESHOLD};
6use crate::task::JoinHandle;
7
8use crate::util::trace::SpawnMeta;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::mem;
12use std::time::Duration;
13
14/// A local Tokio runtime.
15///
16/// This runtime is capable of driving tasks which are not `Send + Sync` without the use of a
17/// `LocalSet`, and thus supports `spawn_local` without the need for a `LocalSet` context.
18///
19/// This runtime cannot be moved between threads or driven from different threads.
20///
21/// This runtime is incompatible with `LocalSet`. You should not attempt to drive a `LocalSet` within a
22/// `LocalRuntime`.
23///
24/// Currently, this runtime supports one flavor, which is internally identical to `current_thread`,
25/// save for the aforementioned differences related to `spawn_local`.
26///
27/// For more general information on how to use runtimes, see the [module] docs.
28///
29/// [runtime]: crate::runtime::Runtime
30/// [module]: crate::runtime
31#[derive(Debug)]
32pub struct LocalRuntime {
33    /// Task scheduler
34    scheduler: LocalRuntimeScheduler,
35
36    /// Handle to runtime, also contains driver handles
37    handle: Handle,
38
39    /// Blocking pool handle, used to signal shutdown
40    blocking_pool: BlockingPool,
41
42    /// Marker used to make this !Send and !Sync.
43    _phantom: PhantomData<*mut u8>,
44}
45
46/// The runtime scheduler is always a `current_thread` scheduler right now.
47#[derive(Debug)]
48pub(crate) enum LocalRuntimeScheduler {
49    /// Execute all tasks on the current-thread.
50    CurrentThread(CurrentThread),
51}
52
53impl LocalRuntime {
54    pub(crate) fn from_parts(
55        scheduler: LocalRuntimeScheduler,
56        handle: Handle,
57        blocking_pool: BlockingPool,
58    ) -> LocalRuntime {
59        LocalRuntime {
60            scheduler,
61            handle,
62            blocking_pool,
63            _phantom: Default::default(),
64        }
65    }
66
67    /// Creates a new local runtime instance with default configuration values.
68    ///
69    /// This results in the scheduler, I/O driver, and time driver being
70    /// initialized.
71    ///
72    /// When a more complex configuration is necessary, the [runtime builder] may be used.
73    ///
74    /// See [module level][mod] documentation for more details.
75    ///
76    /// # Examples
77    ///
78    /// Creating a new `LocalRuntime` with default configuration values.
79    ///
80    /// ```
81    /// use tokio::runtime::LocalRuntime;
82    ///
83    /// let rt = LocalRuntime::new()
84    ///     .unwrap();
85    ///
86    /// // Use the runtime...
87    /// ```
88    ///
89    /// [mod]: crate::runtime
90    /// [runtime builder]: crate::runtime::Builder
91    pub fn new() -> std::io::Result<LocalRuntime> {
92        Builder::new_current_thread()
93            .enable_all()
94            .build_local(Default::default())
95    }
96
97    /// Returns a handle to the runtime's spawner.
98    ///
99    /// The returned handle can be used to spawn tasks that run on this runtime, and can
100    /// be cloned to allow moving the `Handle` to other threads.
101    ///
102    /// As the handle can be sent to other threads, it can only be used to spawn tasks that are `Send`.
103    ///
104    /// Calling [`Handle::block_on`] on a handle to a `LocalRuntime` is error-prone.
105    /// Refer to the documentation of [`Handle::block_on`] for more.
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// use tokio::runtime::LocalRuntime;
111    ///
112    /// let rt = LocalRuntime::new()
113    ///     .unwrap();
114    ///
115    /// let handle = rt.handle();
116    ///
117    /// // Use the handle...
118    /// ```
119    pub fn handle(&self) -> &Handle {
120        &self.handle
121    }
122
123    /// Spawns a task on the runtime.
124    ///
125    /// This is analogous to the [`spawn`] method on the standard [`Runtime`], but works even if the task is not thread-safe.
126    ///
127    /// [`spawn`]: crate::runtime::Runtime::spawn
128    /// [`Runtime`]: crate::runtime::Runtime
129    ///
130    /// # Examples
131    ///
132    /// ```
133    /// use tokio::runtime::LocalRuntime;
134    ///
135    /// # fn dox() {
136    /// // Create the runtime
137    /// let rt = LocalRuntime::new().unwrap();
138    ///
139    /// // Spawn a future onto the runtime
140    /// rt.spawn_local(async {
141    ///     println!("now running on a worker thread");
142    /// });
143    /// # }
144    /// ```
145    #[track_caller]
146    pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
147    where
148        F: Future + 'static,
149        F::Output: 'static,
150    {
151        let fut_size = std::mem::size_of::<F>();
152        let meta = SpawnMeta::new_unnamed(fut_size);
153
154        // safety: spawn_local can only be called from `LocalRuntime`, which this is
155        unsafe {
156            if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
157                self.handle.spawn_local_named(Box::pin(future), meta)
158            } else {
159                self.handle.spawn_local_named(future, meta)
160            }
161        }
162    }
163
164    /// Runs the provided function on a thread from a dedicated blocking thread pool.
165    ///
166    /// This function _will_ be run on another thread.
167    ///
168    /// See the [documentation in the non-local runtime][Runtime] for more
169    /// information.
170    ///
171    /// [Runtime]: crate::runtime::Runtime::spawn_blocking
172    ///
173    /// # Examples
174    ///
175    /// ```
176    /// use tokio::runtime::LocalRuntime;
177    ///
178    /// # fn dox() {
179    /// // Create the runtime
180    /// let rt = LocalRuntime::new().unwrap();
181    ///
182    /// // Spawn a blocking function onto the runtime
183    /// rt.spawn_blocking(|| {
184    ///     println!("now running on a worker thread");
185    /// });
186    /// # }
187    /// ```
188    #[track_caller]
189    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
190    where
191        F: FnOnce() -> R + Send + 'static,
192        R: Send + 'static,
193    {
194        self.handle.spawn_blocking(func)
195    }
196
197    /// Runs a future to completion on the Tokio runtime. This is the
198    /// runtime's entry point.
199    ///
200    /// See the documentation for [the equivalent method on Runtime][Runtime]
201    /// for more information.
202    ///
203    /// [Runtime]: crate::runtime::Runtime::block_on
204    ///
205    /// # Examples
206    ///
207    /// ```no_run
208    /// use tokio::runtime::LocalRuntime;
209    ///
210    /// // Create the runtime
211    /// let rt  = LocalRuntime::new().unwrap();
212    ///
213    /// // Execute the future, blocking the current thread until completion
214    /// rt.block_on(async {
215    ///     println!("hello");
216    /// });
217    /// ```
218    #[track_caller]
219    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
220        let fut_size = mem::size_of::<F>();
221        let meta = SpawnMeta::new_unnamed(fut_size);
222
223        if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
224            self.block_on_inner(Box::pin(future), meta)
225        } else {
226            self.block_on_inner(future, meta)
227        }
228    }
229
230    #[track_caller]
231    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
232        #[cfg(all(
233            tokio_unstable,
234            feature = "taskdump",
235            feature = "rt",
236            target_os = "linux",
237            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
238        ))]
239        let future = crate::runtime::task::trace::Trace::root(future);
240
241        #[cfg(all(tokio_unstable, feature = "tracing"))]
242        let future = crate::util::trace::task(
243            future,
244            "block_on",
245            _meta,
246            crate::runtime::task::Id::next().as_u64(),
247        );
248
249        let _enter = self.enter();
250
251        if let LocalRuntimeScheduler::CurrentThread(exec) = &self.scheduler {
252            exec.block_on(&self.handle.inner, future)
253        } else {
254            unreachable!("LocalRuntime only supports current_thread")
255        }
256    }
257
258    /// Enters the runtime context.
259    ///
260    /// This allows you to construct types that must have an executor
261    /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
262    /// also allow you to call methods such as [`tokio::spawn`].
263    ///
264    /// If this is a handle to a [`LocalRuntime`], and this function is being invoked from the same
265    /// thread that the runtime was created on, you will also be able to call
266    /// [`tokio::task::spawn_local`].
267    ///
268    /// [`Sleep`]: struct@crate::time::Sleep
269    /// [`TcpStream`]: struct@crate::net::TcpStream
270    /// [`tokio::spawn`]: fn@crate::spawn
271    /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
272    /// [`tokio::task::spawn_local`]: fn@crate::task::spawn_local
273    ///
274    /// # Example
275    ///
276    /// ```
277    /// use tokio::runtime::LocalRuntime;
278    /// use tokio::task::JoinHandle;
279    ///
280    /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
281    ///     // Had we not used `rt.enter` below, this would panic.
282    ///     tokio::spawn(async move {
283    ///         println!("{}", msg);
284    ///     })
285    /// }
286    ///
287    /// fn main() {
288    ///     let rt = LocalRuntime::new().unwrap();
289    ///
290    ///     let s = "Hello World!".to_string();
291    ///
292    ///     // By entering the context, we tie `tokio::spawn` to this executor.
293    ///     let _guard = rt.enter();
294    ///     let handle = function_that_spawns(s);
295    ///
296    ///     // Wait for the task before we end the test.
297    ///     rt.block_on(handle).unwrap();
298    /// }
299    /// ```
300    pub fn enter(&self) -> EnterGuard<'_> {
301        self.handle.enter()
302    }
303
304    /// Shuts down the runtime, waiting for at most `duration` for all spawned
305    /// work to stop.
306    ///
307    /// Note that `spawn_blocking` tasks, and only `spawn_blocking` tasks, can get left behind if
308    /// the timeout expires.
309    ///
310    /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
311    ///
312    /// # Examples
313    ///
314    /// ```
315    /// # #[cfg(not(target_family = "wasm"))]
316    /// # {
317    /// use tokio::runtime::LocalRuntime;
318    /// use tokio::task;
319    ///
320    /// use std::thread;
321    /// use std::time::Duration;
322    ///
323    /// fn main() {
324    ///    let runtime = LocalRuntime::new().unwrap();
325    ///
326    ///    runtime.block_on(async move {
327    ///        task::spawn_blocking(move || {
328    ///            thread::sleep(Duration::from_secs(10_000));
329    ///        });
330    ///    });
331    ///
332    ///    runtime.shutdown_timeout(Duration::from_millis(100));
333    /// }
334    /// # }
335    /// ```
336    pub fn shutdown_timeout(mut self, duration: Duration) {
337        // Wakeup and shutdown all the worker threads
338        self.handle.inner.shutdown();
339        self.blocking_pool.shutdown(Some(duration));
340    }
341
342    /// Shuts down the runtime, without waiting for any spawned work to stop.
343    ///
344    /// This can be useful if you want to drop a runtime from within another runtime.
345    /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
346    /// to complete, which would normally not be permitted within an asynchronous context.
347    /// By calling `shutdown_background()`, you can drop the runtime from such a context.
348    ///
349    /// Note however, that because we do not wait for any blocking tasks to complete, this
350    /// may result in a resource leak (in that any blocking tasks are still running until they
351    /// return. No other tasks will leak.
352    ///
353    /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
354    ///
355    /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
356    ///
357    /// ```
358    /// use tokio::runtime::LocalRuntime;
359    ///
360    /// fn main() {
361    ///    let runtime = LocalRuntime::new().unwrap();
362    ///
363    ///    runtime.block_on(async move {
364    ///        let inner_runtime = LocalRuntime::new().unwrap();
365    ///        // ...
366    ///        inner_runtime.shutdown_background();
367    ///    });
368    /// }
369    /// ```
370    pub fn shutdown_background(self) {
371        self.shutdown_timeout(Duration::from_nanos(0));
372    }
373
374    /// Returns a view that lets you get information about how the runtime
375    /// is performing.
376    pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
377        self.handle.metrics()
378    }
379}
380
381impl Drop for LocalRuntime {
382    fn drop(&mut self) {
383        if let LocalRuntimeScheduler::CurrentThread(current_thread) = &mut self.scheduler {
384            // This ensures that tasks spawned on the current-thread
385            // runtime are dropped inside the runtime's context.
386            let _guard = context::try_set_current(&self.handle.inner);
387            current_thread.shutdown(&self.handle.inner);
388        } else {
389            unreachable!("LocalRuntime only supports current-thread")
390        }
391    }
392}
393
394impl std::panic::UnwindSafe for LocalRuntime {}
395
396impl std::panic::RefUnwindSafe for LocalRuntime {}