tokio/runtime/local_runtime/runtime.rs
1#![allow(irrefutable_let_patterns)]
2
3use crate::runtime::blocking::BlockingPool;
4use crate::runtime::scheduler::CurrentThread;
5use crate::runtime::{context, Builder, EnterGuard, Handle, BOX_FUTURE_THRESHOLD};
6use crate::task::JoinHandle;
7
8use crate::util::trace::SpawnMeta;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::mem;
12use std::time::Duration;
13
14/// A local Tokio runtime.
15///
16/// This runtime is capable of driving tasks which are not `Send + Sync` without the use of a
17/// `LocalSet`, and thus supports `spawn_local` without the need for a `LocalSet` context.
18///
19/// This runtime cannot be moved between threads or driven from different threads.
20///
21/// This runtime is incompatible with `LocalSet`. You should not attempt to drive a `LocalSet` within a
22/// `LocalRuntime`.
23///
24/// Currently, this runtime supports one flavor, which is internally identical to `current_thread`,
25/// save for the aforementioned differences related to `spawn_local`.
26///
27/// For more general information on how to use runtimes, see the [module] docs.
28///
29/// [runtime]: crate::runtime::Runtime
30/// [module]: crate::runtime
31#[derive(Debug)]
32pub struct LocalRuntime {
33 /// Task scheduler
34 scheduler: LocalRuntimeScheduler,
35
36 /// Handle to runtime, also contains driver handles
37 handle: Handle,
38
39 /// Blocking pool handle, used to signal shutdown
40 blocking_pool: BlockingPool,
41
42 /// Marker used to make this !Send and !Sync.
43 _phantom: PhantomData<*mut u8>,
44}
45
46/// The runtime scheduler is always a `current_thread` scheduler right now.
47#[derive(Debug)]
48pub(crate) enum LocalRuntimeScheduler {
49 /// Execute all tasks on the current-thread.
50 CurrentThread(CurrentThread),
51}
52
53impl LocalRuntime {
54 pub(crate) fn from_parts(
55 scheduler: LocalRuntimeScheduler,
56 handle: Handle,
57 blocking_pool: BlockingPool,
58 ) -> LocalRuntime {
59 LocalRuntime {
60 scheduler,
61 handle,
62 blocking_pool,
63 _phantom: Default::default(),
64 }
65 }
66
67 /// Creates a new local runtime instance with default configuration values.
68 ///
69 /// This results in the scheduler, I/O driver, and time driver being
70 /// initialized.
71 ///
72 /// When a more complex configuration is necessary, the [runtime builder] may be used.
73 ///
74 /// See [module level][mod] documentation for more details.
75 ///
76 /// # Examples
77 ///
78 /// Creating a new `LocalRuntime` with default configuration values.
79 ///
80 /// ```
81 /// use tokio::runtime::LocalRuntime;
82 ///
83 /// let rt = LocalRuntime::new()
84 /// .unwrap();
85 ///
86 /// // Use the runtime...
87 /// ```
88 ///
89 /// [mod]: crate::runtime
90 /// [runtime builder]: crate::runtime::Builder
91 pub fn new() -> std::io::Result<LocalRuntime> {
92 Builder::new_current_thread()
93 .enable_all()
94 .build_local(Default::default())
95 }
96
97 /// Returns a handle to the runtime's spawner.
98 ///
99 /// The returned handle can be used to spawn tasks that run on this runtime, and can
100 /// be cloned to allow moving the `Handle` to other threads.
101 ///
102 /// As the handle can be sent to other threads, it can only be used to spawn tasks that are `Send`.
103 ///
104 /// Calling [`Handle::block_on`] on a handle to a `LocalRuntime` is error-prone.
105 /// Refer to the documentation of [`Handle::block_on`] for more.
106 ///
107 /// # Examples
108 ///
109 /// ```
110 /// use tokio::runtime::LocalRuntime;
111 ///
112 /// let rt = LocalRuntime::new()
113 /// .unwrap();
114 ///
115 /// let handle = rt.handle();
116 ///
117 /// // Use the handle...
118 /// ```
119 pub fn handle(&self) -> &Handle {
120 &self.handle
121 }
122
123 /// Spawns a task on the runtime.
124 ///
125 /// This is analogous to the [`spawn`] method on the standard [`Runtime`], but works even if the task is not thread-safe.
126 ///
127 /// [`spawn`]: crate::runtime::Runtime::spawn
128 /// [`Runtime`]: crate::runtime::Runtime
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// use tokio::runtime::LocalRuntime;
134 ///
135 /// # fn dox() {
136 /// // Create the runtime
137 /// let rt = LocalRuntime::new().unwrap();
138 ///
139 /// // Spawn a future onto the runtime
140 /// rt.spawn_local(async {
141 /// println!("now running on a worker thread");
142 /// });
143 /// # }
144 /// ```
145 #[track_caller]
146 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
147 where
148 F: Future + 'static,
149 F::Output: 'static,
150 {
151 let fut_size = std::mem::size_of::<F>();
152 let meta = SpawnMeta::new_unnamed(fut_size);
153
154 // safety: spawn_local can only be called from `LocalRuntime`, which this is
155 unsafe {
156 if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
157 self.handle.spawn_local_named(Box::pin(future), meta)
158 } else {
159 self.handle.spawn_local_named(future, meta)
160 }
161 }
162 }
163
164 /// Runs the provided function on a thread from a dedicated blocking thread pool.
165 ///
166 /// This function _will_ be run on another thread.
167 ///
168 /// See the [documentation in the non-local runtime][Runtime] for more
169 /// information.
170 ///
171 /// [Runtime]: crate::runtime::Runtime::spawn_blocking
172 ///
173 /// # Examples
174 ///
175 /// ```
176 /// use tokio::runtime::LocalRuntime;
177 ///
178 /// # fn dox() {
179 /// // Create the runtime
180 /// let rt = LocalRuntime::new().unwrap();
181 ///
182 /// // Spawn a blocking function onto the runtime
183 /// rt.spawn_blocking(|| {
184 /// println!("now running on a worker thread");
185 /// });
186 /// # }
187 /// ```
188 #[track_caller]
189 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
190 where
191 F: FnOnce() -> R + Send + 'static,
192 R: Send + 'static,
193 {
194 self.handle.spawn_blocking(func)
195 }
196
197 /// Runs a future to completion on the Tokio runtime. This is the
198 /// runtime's entry point.
199 ///
200 /// See the documentation for [the equivalent method on Runtime][Runtime]
201 /// for more information.
202 ///
203 /// [Runtime]: crate::runtime::Runtime::block_on
204 ///
205 /// # Examples
206 ///
207 /// ```no_run
208 /// use tokio::runtime::LocalRuntime;
209 ///
210 /// // Create the runtime
211 /// let rt = LocalRuntime::new().unwrap();
212 ///
213 /// // Execute the future, blocking the current thread until completion
214 /// rt.block_on(async {
215 /// println!("hello");
216 /// });
217 /// ```
218 #[track_caller]
219 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
220 let fut_size = mem::size_of::<F>();
221 let meta = SpawnMeta::new_unnamed(fut_size);
222
223 if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
224 self.block_on_inner(Box::pin(future), meta)
225 } else {
226 self.block_on_inner(future, meta)
227 }
228 }
229
230 #[track_caller]
231 fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
232 #[cfg(all(
233 tokio_unstable,
234 feature = "taskdump",
235 feature = "rt",
236 target_os = "linux",
237 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
238 ))]
239 let future = crate::runtime::task::trace::Trace::root(future);
240
241 #[cfg(all(tokio_unstable, feature = "tracing"))]
242 let future = crate::util::trace::task(
243 future,
244 "block_on",
245 _meta,
246 crate::runtime::task::Id::next().as_u64(),
247 );
248
249 let _enter = self.enter();
250
251 if let LocalRuntimeScheduler::CurrentThread(exec) = &self.scheduler {
252 exec.block_on(&self.handle.inner, future)
253 } else {
254 unreachable!("LocalRuntime only supports current_thread")
255 }
256 }
257
258 /// Enters the runtime context.
259 ///
260 /// This allows you to construct types that must have an executor
261 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
262 /// also allow you to call methods such as [`tokio::spawn`].
263 ///
264 /// If this is a handle to a [`LocalRuntime`], and this function is being invoked from the same
265 /// thread that the runtime was created on, you will also be able to call
266 /// [`tokio::task::spawn_local`].
267 ///
268 /// [`Sleep`]: struct@crate::time::Sleep
269 /// [`TcpStream`]: struct@crate::net::TcpStream
270 /// [`tokio::spawn`]: fn@crate::spawn
271 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
272 /// [`tokio::task::spawn_local`]: fn@crate::task::spawn_local
273 ///
274 /// # Example
275 ///
276 /// ```
277 /// use tokio::runtime::LocalRuntime;
278 /// use tokio::task::JoinHandle;
279 ///
280 /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
281 /// // Had we not used `rt.enter` below, this would panic.
282 /// tokio::spawn(async move {
283 /// println!("{}", msg);
284 /// })
285 /// }
286 ///
287 /// fn main() {
288 /// let rt = LocalRuntime::new().unwrap();
289 ///
290 /// let s = "Hello World!".to_string();
291 ///
292 /// // By entering the context, we tie `tokio::spawn` to this executor.
293 /// let _guard = rt.enter();
294 /// let handle = function_that_spawns(s);
295 ///
296 /// // Wait for the task before we end the test.
297 /// rt.block_on(handle).unwrap();
298 /// }
299 /// ```
300 pub fn enter(&self) -> EnterGuard<'_> {
301 self.handle.enter()
302 }
303
304 /// Shuts down the runtime, waiting for at most `duration` for all spawned
305 /// work to stop.
306 ///
307 /// Note that `spawn_blocking` tasks, and only `spawn_blocking` tasks, can get left behind if
308 /// the timeout expires.
309 ///
310 /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
311 ///
312 /// # Examples
313 ///
314 /// ```
315 /// # #[cfg(not(target_family = "wasm"))]
316 /// # {
317 /// use tokio::runtime::LocalRuntime;
318 /// use tokio::task;
319 ///
320 /// use std::thread;
321 /// use std::time::Duration;
322 ///
323 /// fn main() {
324 /// let runtime = LocalRuntime::new().unwrap();
325 ///
326 /// runtime.block_on(async move {
327 /// task::spawn_blocking(move || {
328 /// thread::sleep(Duration::from_secs(10_000));
329 /// });
330 /// });
331 ///
332 /// runtime.shutdown_timeout(Duration::from_millis(100));
333 /// }
334 /// # }
335 /// ```
336 pub fn shutdown_timeout(mut self, duration: Duration) {
337 // Wakeup and shutdown all the worker threads
338 self.handle.inner.shutdown();
339 self.blocking_pool.shutdown(Some(duration));
340 }
341
342 /// Shuts down the runtime, without waiting for any spawned work to stop.
343 ///
344 /// This can be useful if you want to drop a runtime from within another runtime.
345 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
346 /// to complete, which would normally not be permitted within an asynchronous context.
347 /// By calling `shutdown_background()`, you can drop the runtime from such a context.
348 ///
349 /// Note however, that because we do not wait for any blocking tasks to complete, this
350 /// may result in a resource leak (in that any blocking tasks are still running until they
351 /// return. No other tasks will leak.
352 ///
353 /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
354 ///
355 /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
356 ///
357 /// ```
358 /// use tokio::runtime::LocalRuntime;
359 ///
360 /// fn main() {
361 /// let runtime = LocalRuntime::new().unwrap();
362 ///
363 /// runtime.block_on(async move {
364 /// let inner_runtime = LocalRuntime::new().unwrap();
365 /// // ...
366 /// inner_runtime.shutdown_background();
367 /// });
368 /// }
369 /// ```
370 pub fn shutdown_background(self) {
371 self.shutdown_timeout(Duration::from_nanos(0));
372 }
373
374 /// Returns a view that lets you get information about how the runtime
375 /// is performing.
376 pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
377 self.handle.metrics()
378 }
379}
380
381impl Drop for LocalRuntime {
382 fn drop(&mut self) {
383 if let LocalRuntimeScheduler::CurrentThread(current_thread) = &mut self.scheduler {
384 // This ensures that tasks spawned on the current-thread
385 // runtime are dropped inside the runtime's context.
386 let _guard = context::try_set_current(&self.handle.inner);
387 current_thread.shutdown(&self.handle.inner);
388 } else {
389 unreachable!("LocalRuntime only supports current-thread")
390 }
391 }
392}
393
394impl std::panic::UnwindSafe for LocalRuntime {}
395
396impl std::panic::RefUnwindSafe for LocalRuntime {}