tokio/runtime/builder.rs
1use crate::runtime::handle::Handle;
2use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
3use crate::util::rand::{RngSeed, RngSeedGenerator};
4
5use std::fmt;
6use std::io;
7use std::time::Duration;
8
/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
/// or [`Builder::new_current_thread`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: method@Self::build
/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
/// [`Builder::new_current_thread`]: method@Self::new_current_thread
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// fn main() {
///     // build runtime
///     let runtime = Builder::new_multi_thread()
///         .worker_threads(4)
///         .thread_name("my-custom-name")
///         .thread_stack_size(3 * 1024 * 1024)
///         .build()
///         .unwrap();
///
///     // use runtime ...
/// }
/// ```
pub struct Builder {
    /// Runtime type
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,
    /// Event capacity forwarded to the I/O driver as `driver::Cfg::nevents`
    /// (see `get_cfg`); defaults to 1024.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Cap on thread usage.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// Customizable keep alive timeout for `BlockingPool`
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// Capacity of the scheduler's local task queue (256, or 4 under loom —
    /// see the defaults set in `Builder::new`).
    pub(super) local_queue_capacity: usize,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime reacts to a panic on a spawned task (unstable option).
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,
}
115
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shutdown if a spawned task panics.
        ///
        /// The runtime will immediately shutdown even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// #   loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
196
/// Shared closure that produces a name for each thread the runtime spawns.
/// Stored behind an `Arc` so the blocking pool and workers can share it.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
198
/// Which scheduler flavor the builder will construct (see `Builder::build`).
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Single-threaded scheduler driven from `Runtime::block_on`.
    CurrentThread,
    /// Work-stealing multi-threaded scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
    /// In-progress alternate multi-threaded scheduler (unstable, not
    /// production ready — see `Builder::new_multi_thread_alt`).
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt,
}
207
208impl Builder {
209 /// Returns a new builder with the current thread scheduler selected.
210 ///
211 /// Configuration methods can be chained on the return value.
212 ///
213 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
214 /// [`LocalSet`].
215 ///
216 /// [`LocalSet`]: crate::task::LocalSet
217 pub fn new_current_thread() -> Builder {
218 #[cfg(loom)]
219 const EVENT_INTERVAL: u32 = 4;
220 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
221 #[cfg(not(loom))]
222 const EVENT_INTERVAL: u32 = 61;
223
224 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
225 }
226
227 /// Returns a new builder with the multi thread scheduler selected.
228 ///
229 /// Configuration methods can be chained on the return value.
230 #[cfg(feature = "rt-multi-thread")]
231 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
232 pub fn new_multi_thread() -> Builder {
233 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
234 Builder::new(Kind::MultiThread, 61)
235 }
236
237 cfg_unstable! {
        /// Returns a new builder with the alternate multi thread scheduler
        /// selected.
        ///
        /// The alternate multi threaded scheduler is an in-progress
        /// candidate to replace the existing multi threaded scheduler. It
        /// currently does not scale as well to 16+ processors.
        ///
        /// This runtime flavor is currently **not considered production
        /// ready**.
        ///
        /// Configuration methods can be chained on the return value.
        #[cfg(feature = "rt-multi-thread")]
        #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
        pub fn new_multi_thread_alt() -> Builder {
            // The number `61` is fairly arbitrary. I believe this value was copied from golang.
            Builder::new(Kind::MultiThreadAlt, 61)
        }
255 }
256
    /// Returns a new runtime builder initialized with default configuration
    /// values.
    ///
    /// Configuration methods can be chained on the return value.
    ///
    /// `event_interval` differs per scheduler kind, so callers
    /// (`new_current_thread`, `new_multi_thread`, ...) pass it in.
    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
        Builder {
            kind,

            // I/O defaults to "off"
            enable_io: false,
            nevents: 1024,

            // Time defaults to "off"
            enable_time: false,

            // The clock starts not-paused
            start_paused: false,

            // Read from environment variable first in multi-threaded mode.
            // Default to lazy auto-detection (one thread per CPU core)
            worker_threads: None,

            max_blocking_threads: 512,

            // Default thread name
            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),

            // Do not set a stack size by default
            thread_stack_size: None,

            // No worker thread callbacks
            after_start: None,
            before_stop: None,
            before_park: None,
            after_unpark: None,

            // `None` keeps the blocking pool's default timeout
            // (see `thread_keep_alive`).
            keep_alive: None,

            // Defaults for these values depend on the scheduler kind, so we get them
            // as parameters.
            global_queue_interval: None,
            event_interval,

            // Use a small local queue under loom to keep models tractable.
            #[cfg(not(loom))]
            local_queue_capacity: 256,

            #[cfg(loom)]
            local_queue_capacity: 4,

            // Fresh seed by default; callers can override via `rng_seed`.
            seed_generator: RngSeedGenerator::new(RngSeed::new()),

            #[cfg(tokio_unstable)]
            unhandled_panic: UnhandledPanic::Ignore,

            // Poll-time histogram metrics are opt-in.
            metrics_poll_count_histogram_enable: false,

            metrics_poll_count_histogram: HistogramBuilder::default(),

            disable_lifo_slot: false,
        }
    }
318
    /// Enables both I/O and time drivers.
    ///
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
    /// individually. If additional components are added to Tokio in the future,
    /// `enable_all` will include these future components.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn enable_all(&mut self) -> &mut Self {
        // Only enable the I/O driver when a feature that needs it is active.
        #[cfg(any(
            feature = "net",
            all(unix, feature = "process"),
            all(unix, feature = "signal")
        ))]
        self.enable_io();
        // Likewise, the time driver requires the "time" feature.
        #[cfg(feature = "time")]
        self.enable_time();

        self
    }
347
348 /// Sets the number of worker threads the `Runtime` will use.
349 ///
350 /// This can be any number above 0 though it is advised to keep this value
351 /// on the smaller side.
352 ///
353 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
354 ///
355 /// # Default
356 ///
357 /// The default value is the number of cores available to the system.
358 ///
359 /// When using the `current_thread` runtime this method has no effect.
360 ///
361 /// # Examples
362 ///
363 /// ## Multi threaded runtime with 4 threads
364 ///
365 /// ```
366 /// use tokio::runtime;
367 ///
368 /// // This will spawn a work-stealing runtime with 4 worker threads.
369 /// let rt = runtime::Builder::new_multi_thread()
370 /// .worker_threads(4)
371 /// .build()
372 /// .unwrap();
373 ///
374 /// rt.spawn(async move {});
375 /// ```
376 ///
377 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
378 ///
379 /// ```
380 /// use tokio::runtime;
381 ///
382 /// // Create a runtime that _must_ be driven from a call
383 /// // to `Runtime::block_on`.
384 /// let rt = runtime::Builder::new_current_thread()
385 /// .build()
386 /// .unwrap();
387 ///
388 /// // This will run the runtime and future on the current thread
389 /// rt.block_on(async move {});
390 /// ```
391 ///
392 /// # Panics
393 ///
394 /// This will panic if `val` is not larger than `0`.
395 #[track_caller]
396 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
397 assert!(val > 0, "Worker threads cannot be set to 0");
398 self.worker_threads = Some(val);
399 self
400 }
401
402 /// Specifies the limit for additional threads spawned by the Runtime.
403 ///
404 /// These threads are used for blocking operations like tasks spawned
405 /// through [`spawn_blocking`], this includes but is not limited to:
406 /// - [`fs`] operations
407 /// - dns resolution through [`ToSocketAddrs`]
408 /// - writing to [`Stdout`] or [`Stderr`]
409 /// - reading from [`Stdin`]
410 ///
411 /// Unlike the [`worker_threads`], they are not always active and will exit
412 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
413 ///
414 /// It's recommended to not set this limit too low in order to avoid hanging on operations
415 /// requiring [`spawn_blocking`].
416 ///
417 /// The default value is 512.
418 ///
419 /// # Panics
420 ///
421 /// This will panic if `val` is not larger than `0`.
422 ///
423 /// # Upgrading from 0.x
424 ///
425 /// In old versions `max_threads` limited both blocking and worker threads, but the
426 /// current `max_blocking_threads` does not include async worker threads in the count.
427 ///
428 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
429 /// [`fs`]: mod@crate::fs
430 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
431 /// [`Stdout`]: struct@crate::io::Stdout
432 /// [`Stdin`]: struct@crate::io::Stdin
433 /// [`Stderr`]: struct@crate::io::Stderr
434 /// [`worker_threads`]: Self::worker_threads
435 /// [`thread_keep_alive`]: Self::thread_keep_alive
436 #[track_caller]
437 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
438 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
439 assert!(val > 0, "Max blocking threads cannot be set to 0");
440 self.max_blocking_threads = val;
441 self
442 }
443
444 /// Sets name of threads spawned by the `Runtime`'s thread pool.
445 ///
446 /// The default name is "tokio-runtime-worker".
447 ///
448 /// # Examples
449 ///
450 /// ```
451 /// # use tokio::runtime;
452 ///
453 /// # pub fn main() {
454 /// let rt = runtime::Builder::new_multi_thread()
455 /// .thread_name("my-pool")
456 /// .build();
457 /// # }
458 /// ```
459 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
460 let val = val.into();
461 self.thread_name = std::sync::Arc::new(move || val.clone());
462 self
463 }
464
465 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
466 ///
467 /// The default name fn is `|| "tokio-runtime-worker".into()`.
468 ///
469 /// # Examples
470 ///
471 /// ```
472 /// # use tokio::runtime;
473 /// # use std::sync::atomic::{AtomicUsize, Ordering};
474 /// # pub fn main() {
475 /// let rt = runtime::Builder::new_multi_thread()
476 /// .thread_name_fn(|| {
477 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
478 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
479 /// format!("my-pool-{}", id)
480 /// })
481 /// .build();
482 /// # }
483 /// ```
484 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
485 where
486 F: Fn() -> String + Send + Sync + 'static,
487 {
488 self.thread_name = std::sync::Arc::new(f);
489 self
490 }
491
    /// Sets the stack size (in bytes) for worker threads.
    ///
    /// The actual stack size may be greater than this value if the platform
    /// specifies minimal stack size.
    ///
    /// The default stack size for spawned threads is 2 MiB, though this
    /// particular stack size is subject to change in the future.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .thread_stack_size(32 * 1024)
    ///     .build();
    /// # }
    /// ```
    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
        self.thread_stack_size = Some(val);
        self
    }
515
516 /// Executes function `f` after each thread is started but before it starts
517 /// doing work.
518 ///
519 /// This is intended for bookkeeping and monitoring use cases.
520 ///
521 /// # Examples
522 ///
523 /// ```
524 /// # use tokio::runtime;
525 /// # pub fn main() {
526 /// let runtime = runtime::Builder::new_multi_thread()
527 /// .on_thread_start(|| {
528 /// println!("thread started");
529 /// })
530 /// .build();
531 /// # }
532 /// ```
533 #[cfg(not(loom))]
534 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
535 where
536 F: Fn() + Send + Sync + 'static,
537 {
538 self.after_start = Some(std::sync::Arc::new(f));
539 self
540 }
541
542 /// Executes function `f` before each thread stops.
543 ///
544 /// This is intended for bookkeeping and monitoring use cases.
545 ///
546 /// # Examples
547 ///
548 /// ```
549 /// # use tokio::runtime;
550 /// # pub fn main() {
551 /// let runtime = runtime::Builder::new_multi_thread()
552 /// .on_thread_stop(|| {
553 /// println!("thread stopping");
554 /// })
555 /// .build();
556 /// # }
557 /// ```
558 #[cfg(not(loom))]
559 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
560 where
561 F: Fn() + Send + Sync + 'static,
562 {
563 self.before_stop = Some(std::sync::Arc::new(f));
564 self
565 }
566
567 /// Executes function `f` just before a thread is parked (goes idle).
568 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
569 /// can be called, and may result in this thread being unparked immediately.
570 ///
571 /// This can be used to start work only when the executor is idle, or for bookkeeping
572 /// and monitoring purposes.
573 ///
574 /// Note: There can only be one park callback for a runtime; calling this function
575 /// more than once replaces the last callback defined, rather than adding to it.
576 ///
577 /// # Examples
578 ///
579 /// ## Multithreaded executor
580 /// ```
581 /// # use std::sync::Arc;
582 /// # use std::sync::atomic::{AtomicBool, Ordering};
583 /// # use tokio::runtime;
584 /// # use tokio::sync::Barrier;
585 /// # pub fn main() {
586 /// let once = AtomicBool::new(true);
587 /// let barrier = Arc::new(Barrier::new(2));
588 ///
589 /// let runtime = runtime::Builder::new_multi_thread()
590 /// .worker_threads(1)
591 /// .on_thread_park({
592 /// let barrier = barrier.clone();
593 /// move || {
594 /// let barrier = barrier.clone();
595 /// if once.swap(false, Ordering::Relaxed) {
596 /// tokio::spawn(async move { barrier.wait().await; });
597 /// }
598 /// }
599 /// })
600 /// .build()
601 /// .unwrap();
602 ///
603 /// runtime.block_on(async {
604 /// barrier.wait().await;
605 /// })
606 /// # }
607 /// ```
608 /// ## Current thread executor
609 /// ```
610 /// # use std::sync::Arc;
611 /// # use std::sync::atomic::{AtomicBool, Ordering};
612 /// # use tokio::runtime;
613 /// # use tokio::sync::Barrier;
614 /// # pub fn main() {
615 /// let once = AtomicBool::new(true);
616 /// let barrier = Arc::new(Barrier::new(2));
617 ///
618 /// let runtime = runtime::Builder::new_current_thread()
619 /// .on_thread_park({
620 /// let barrier = barrier.clone();
621 /// move || {
622 /// let barrier = barrier.clone();
623 /// if once.swap(false, Ordering::Relaxed) {
624 /// tokio::spawn(async move { barrier.wait().await; });
625 /// }
626 /// }
627 /// })
628 /// .build()
629 /// .unwrap();
630 ///
631 /// runtime.block_on(async {
632 /// barrier.wait().await;
633 /// })
634 /// # }
635 /// ```
636 #[cfg(not(loom))]
637 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
638 where
639 F: Fn() + Send + Sync + 'static,
640 {
641 self.before_park = Some(std::sync::Arc::new(f));
642 self
643 }
644
645 /// Executes function `f` just after a thread unparks (starts executing tasks).
646 ///
647 /// This is intended for bookkeeping and monitoring use cases; note that work
648 /// in this callback will increase latencies when the application has allowed one or
649 /// more runtime threads to go idle.
650 ///
651 /// Note: There can only be one unpark callback for a runtime; calling this function
652 /// more than once replaces the last callback defined, rather than adding to it.
653 ///
654 /// # Examples
655 ///
656 /// ```
657 /// # use tokio::runtime;
658 /// # pub fn main() {
659 /// let runtime = runtime::Builder::new_multi_thread()
660 /// .on_thread_unpark(|| {
661 /// println!("thread unparking");
662 /// })
663 /// .build();
664 ///
665 /// runtime.unwrap().block_on(async {
666 /// tokio::task::yield_now().await;
667 /// println!("Hello from Tokio!");
668 /// })
669 /// # }
670 /// ```
671 #[cfg(not(loom))]
672 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
673 where
674 F: Fn() + Send + Sync + 'static,
675 {
676 self.after_unpark = Some(std::sync::Arc::new(f));
677 self
678 }
679
680 /// Creates the configured `Runtime`.
681 ///
682 /// The returned `Runtime` instance is ready to spawn tasks.
683 ///
684 /// # Examples
685 ///
686 /// ```
687 /// use tokio::runtime::Builder;
688 ///
689 /// let rt = Builder::new_multi_thread().build().unwrap();
690 ///
691 /// rt.block_on(async {
692 /// println!("Hello from the Tokio runtime");
693 /// });
694 /// ```
695 pub fn build(&mut self) -> io::Result<Runtime> {
696 match &self.kind {
697 Kind::CurrentThread => self.build_current_thread_runtime(),
698 #[cfg(feature = "rt-multi-thread")]
699 Kind::MultiThread => self.build_threaded_runtime(),
700 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
701 Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
702 }
703 }
704
    /// Assembles the driver configuration from the builder's current settings.
    ///
    /// `workers` is the worker-thread count chosen by the caller and is
    /// forwarded to the driver unchanged.
    fn get_cfg(&self, workers: usize) -> driver::Cfg {
        driver::Cfg {
            // Pausing the clock is only supported by the current-thread runtime.
            enable_pause_time: match self.kind {
                Kind::CurrentThread => true,
                #[cfg(feature = "rt-multi-thread")]
                Kind::MultiThread => false,
                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
                Kind::MultiThreadAlt => false,
            },
            enable_io: self.enable_io,
            enable_time: self.enable_time,
            start_paused: self.start_paused,
            nevents: self.nevents,
            workers,
        }
    }
721
    /// Sets a custom timeout for a thread in the blocking pool.
    ///
    /// By default, the timeout for a thread is set to 10 seconds. This can
    /// be overridden using `.thread_keep_alive()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use tokio::runtime;
    /// # use std::time::Duration;
    /// # pub fn main() {
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .thread_keep_alive(Duration::from_millis(100))
    ///     .build();
    /// # }
    /// ```
    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
        self.keep_alive = Some(duration);
        self
    }
742
743 /// Sets the number of scheduler ticks after which the scheduler will poll the global
744 /// task queue.
745 ///
746 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
747 ///
748 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
749 /// [the module documentation] for the default behavior of the multi-thread scheduler.
750 ///
751 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
752 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
753 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
754 /// getting started on new work, especially if tasks frequently yield rather than complete
755 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
756 /// is a good choice when most tasks quickly complete polling.
757 ///
758 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
759 ///
760 /// # Panics
761 ///
762 /// This function will panic if 0 is passed as an argument.
763 ///
764 /// # Examples
765 ///
766 /// ```
767 /// # use tokio::runtime;
768 /// # pub fn main() {
769 /// let rt = runtime::Builder::new_multi_thread()
770 /// .global_queue_interval(31)
771 /// .build();
772 /// # }
773 /// ```
774 #[track_caller]
775 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
776 assert!(val > 0, "global_queue_interval must be greater than 0");
777 self.global_queue_interval = Some(val);
778 self
779 }
780
    /// Sets the number of scheduler ticks after which the scheduler will poll for
    /// external events (timers, I/O, and so on).
    ///
    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
    ///
    /// By default, the event interval is `61` for all scheduler types.
    ///
    /// Setting the event interval determines the effective "priority" of delivering
    /// these external events (which may wake up additional tasks), compared to
    /// executing tasks that are currently ready to run. A smaller value is useful
    /// when tasks frequently spend a long time in polling, or frequently yield,
    /// which can result in overly long delays picking up I/O events. Conversely,
    /// picking up new events requires extra synchronization and syscall overhead,
    /// so if tasks generally complete their polling quickly, a higher event interval
    /// will minimize that overhead while still keeping the scheduler responsive to
    /// events.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    /// # pub fn main() {
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .event_interval(31)
    ///     .build();
    /// # }
    /// ```
    pub fn event_interval(&mut self, val: u32) -> &mut Self {
        self.event_interval = val;
        self
    }
812
813 cfg_unstable! {
        /// Configure how the runtime responds to an unhandled panic on a
        /// spawned task.
        ///
        /// By default, an unhandled panic (i.e. a panic not caught by
        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
        /// execution. The panic error value is forwarded to the task's
        /// [`JoinHandle`] and all other spawned tasks continue running.
        ///
        /// The `unhandled_panic` option enables configuring this behavior.
        ///
        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
        ///   spawned tasks have no impact on the runtime's execution.
        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
        ///   shutdown immediately when a spawned task panics even if that
        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
        ///   will immediately terminate and further calls to
        ///   [`Runtime::block_on`] will panic.
        ///
        /// # Panics
        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
        /// on a runtime other than the current thread runtime.
        ///
        /// # Unstable
        ///
        /// This option is currently unstable and its implementation is
        /// incomplete. The API may change or be removed in the future. See
        /// issue [tokio-rs/tokio#4516] for more details.
        ///
        /// # Examples
        ///
        /// The following demonstrates a runtime configured to shutdown on
        /// panic. The first spawned task panics and results in the runtime
        /// shutting down. The second spawned task never has a chance to
        /// execute. The call to `block_on` will panic due to the runtime being
        /// forcibly shutdown.
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// #   loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
            // `ShutdownRuntime` is only supported by the current-thread
            // scheduler, so reject it eagerly for the other flavors.
            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
            }

            self.unhandled_panic = behavior;
            self
        }
881
        /// Disables the LIFO task scheduler heuristic.
        ///
        /// The multi-threaded scheduler includes a heuristic for optimizing
        /// message-passing patterns. This heuristic results in the **last**
        /// scheduled task being polled first.
        ///
        /// To implement this heuristic, each worker thread has a slot which
        /// holds the task that should be polled next. However, this slot cannot
        /// be stolen by other worker threads, which can result in lower total
        /// throughput when tasks tend to have longer poll times.
        ///
        /// This configuration option will disable this heuristic resulting in
        /// all scheduled tasks being pushed into the worker-local queue, which
        /// is stealable.
        ///
        /// Consider trying this option when the task "scheduled" time is high
        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
        /// collect this data.
        ///
        /// # Unstable
        ///
        /// This configuration option is considered a workaround for the LIFO
        /// slot not being stealable. When the slot becomes stealable, we will
        /// revisit whether or not this option is necessary. See
        /// issue [tokio-rs/tokio#4941].
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .disable_lifo_slot()
        ///     .build()
        ///     .unwrap();
        /// ```
        ///
        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
            self.disable_lifo_slot = true;
            self
        }
925
926 /// Specifies the random number generation seed to use within all
927 /// threads associated with the runtime being built.
928 ///
929 /// This option is intended to make certain parts of the runtime
930 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
931 /// [`tokio::select!`] it will ensure that the order that branches are
932 /// polled is deterministic.
933 ///
934 /// In addition to the code specifying `rng_seed` and interacting with
935 /// the runtime, the internals of Tokio and the Rust compiler may affect
936 /// the sequences of random numbers. In order to ensure repeatable
937 /// results, the version of Tokio, the versions of all other
938 /// dependencies that interact with Tokio, and the Rust compiler version
939 /// should also all remain constant.
940 ///
941 /// # Examples
942 ///
943 /// ```
944 /// # use tokio::runtime::{self, RngSeed};
945 /// # pub fn main() {
946 /// let seed = RngSeed::from_bytes(b"place your seed here");
947 /// let rt = runtime::Builder::new_current_thread()
948 /// .rng_seed(seed)
949 /// .build();
950 /// # }
951 /// ```
952 ///
953 /// [`tokio::select!`]: crate::select
954 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
955 self.seed_generator = RngSeedGenerator::new(seed);
956 self
957 }
958 }
959
960 cfg_unstable_metrics! {
961 /// Enables tracking the distribution of task poll times.
962 ///
963 /// Task poll times are not instrumented by default as doing so requires
964 /// calling [`Instant::now()`] twice per task poll, which could add
965 /// measurable overhead. Use the [`Handle::metrics()`] to access the
966 /// metrics data.
967 ///
968 /// The histogram uses fixed bucket sizes. In other words, the histogram
969 /// buckets are not dynamic based on input values. Use the
970 /// `metrics_poll_count_histogram_` builder methods to configure the
971 /// histogram details.
972 ///
973 /// # Examples
974 ///
975 /// ```
976 /// use tokio::runtime;
977 ///
978 /// let rt = runtime::Builder::new_multi_thread()
979 /// .enable_metrics_poll_count_histogram()
980 /// .build()
981 /// .unwrap();
982 /// # // Test default values here
983 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
984 /// # let m = rt.handle().metrics();
985 /// # assert_eq!(m.poll_count_histogram_num_buckets(), 10);
986 /// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100));
987 /// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200));
988 /// ```
989 ///
990 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
991 /// [`Instant::now()`]: std::time::Instant::now
992 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
993 self.metrics_poll_count_histogram_enable = true;
994 self
995 }
996
997 /// Sets the histogram scale for tracking the distribution of task poll
998 /// times.
999 ///
1000 /// Tracking the distribution of task poll times can be done using a
1001 /// linear or log scale. When using linear scale, each histogram bucket
1002 /// will represent the same range of poll times. When using log scale,
1003 /// each histogram bucket will cover a range twice as big as the
1004 /// previous bucket.
1005 ///
1006 /// **Default:** linear scale.
1007 ///
1008 /// # Examples
1009 ///
1010 /// ```
1011 /// use tokio::runtime::{self, HistogramScale};
1012 ///
1013 /// let rt = runtime::Builder::new_multi_thread()
1014 /// .enable_metrics_poll_count_histogram()
1015 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1016 /// .build()
1017 /// .unwrap();
1018 /// ```
1019 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1020 self.metrics_poll_count_histogram.scale = histogram_scale;
1021 self
1022 }
1023
1024 /// Sets the histogram resolution for tracking the distribution of task
1025 /// poll times.
1026 ///
1027 /// The resolution is the histogram's first bucket's range. When using a
1028 /// linear histogram scale, each bucket will cover the same range. When
1029 /// using a log scale, each bucket will cover a range twice as big as
1030 /// the previous bucket. In the log case, the resolution represents the
1031 /// smallest bucket range.
1032 ///
1033 /// Note that, when using log scale, the resolution is rounded up to the
1034 /// nearest power of 2 in nanoseconds.
1035 ///
1036 /// **Default:** 100 microseconds.
1037 ///
1038 /// # Examples
1039 ///
1040 /// ```
1041 /// use tokio::runtime;
1042 /// use std::time::Duration;
1043 ///
1044 /// let rt = runtime::Builder::new_multi_thread()
1045 /// .enable_metrics_poll_count_histogram()
1046 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1047 /// .build()
1048 /// .unwrap();
1049 /// ```
1050 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1051 assert!(resolution > Duration::from_secs(0));
1052 // Sanity check the argument and also make the cast below safe.
1053 assert!(resolution <= Duration::from_secs(1));
1054
1055 let resolution = resolution.as_nanos() as u64;
1056 self.metrics_poll_count_histogram.resolution = resolution;
1057 self
1058 }
1059
1060 /// Sets the number of buckets for the histogram tracking the
1061 /// distribution of task poll times.
1062 ///
1063 /// The last bucket tracks all greater values that fall out of other
1064 /// ranges. So, configuring the histogram using a linear scale,
1065 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1066 /// polls that take more than 450ms to complete.
1067 ///
1068 /// **Default:** 10
1069 ///
1070 /// # Examples
1071 ///
1072 /// ```
1073 /// use tokio::runtime;
1074 ///
1075 /// let rt = runtime::Builder::new_multi_thread()
1076 /// .enable_metrics_poll_count_histogram()
1077 /// .metrics_poll_count_histogram_buckets(15)
1078 /// .build()
1079 /// .unwrap();
1080 /// ```
1081 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1082 self.metrics_poll_count_histogram.num_buckets = buckets;
1083 self
1084 }
1085 }
1086
1087 cfg_loom! {
1088 pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1089 assert!(value.is_power_of_two());
1090 self.local_queue_capacity = value;
1091 self
1092 }
1093 }
1094
    /// Builds a current-thread (single-threaded) runtime from the
    /// configuration stored on `self`.
    ///
    /// Construction order matters: the driver is created first, then the
    /// blocking pool, and the scheduler last so that it can be handed the
    /// driver and the pool's spawner.
    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
        use crate::runtime::scheduler::{self, CurrentThread};
        use crate::runtime::{runtime::Scheduler, Config};

        // `1`: presumably the worker-thread count — mirrors
        // `get_cfg(core_threads)` in the multi-thread builder. TODO(review) confirm.
        let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;

        // Blocking pool
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        // Generate a rng seed for this runtime. Two independent generators
        // are derived: `_1` is stored in the runtime `Config` below, `_2` is
        // handed directly to the scheduler.
        let seed_generator_1 = self.seed_generator.next_generator();
        let seed_generator_2 = self.seed_generator.next_generator();

        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to do something, it'll let the timer or
        // the reactor to generate some new stimuli for the futures to continue
        // in their life.
        let (scheduler, handle) = CurrentThread::new(
            driver,
            driver_handle,
            blocking_spawner,
            seed_generator_2,
            Config {
                before_park: self.before_park.clone(),
                after_unpark: self.after_unpark.clone(),
                global_queue_interval: self.global_queue_interval,
                event_interval: self.event_interval,
                local_queue_capacity: self.local_queue_capacity,
                // Field only exists when compiled with `tokio_unstable`.
                #[cfg(tokio_unstable)]
                unhandled_panic: self.unhandled_panic.clone(),
                disable_lifo_slot: self.disable_lifo_slot,
                seed_generator: seed_generator_1,
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
            },
        );

        // Wrap the scheduler-specific handle in the public `Handle` type.
        let handle = Handle {
            inner: scheduler::Handle::CurrentThread(handle),
        };

        Ok(Runtime::from_parts(
            Scheduler::CurrentThread(scheduler),
            handle,
            blocking_pool,
        ))
    }
1142
1143 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1144 if self.metrics_poll_count_histogram_enable {
1145 Some(self.metrics_poll_count_histogram.clone())
1146 } else {
1147 None
1148 }
1149 }
1150}
1151
1152cfg_io_driver! {
1153 impl Builder {
1154 /// Enables the I/O driver.
1155 ///
1156 /// Doing this enables using net, process, signal, and some I/O types on
1157 /// the runtime.
1158 ///
1159 /// # Examples
1160 ///
1161 /// ```
1162 /// use tokio::runtime;
1163 ///
1164 /// let rt = runtime::Builder::new_multi_thread()
1165 /// .enable_io()
1166 /// .build()
1167 /// .unwrap();
1168 /// ```
1169 pub fn enable_io(&mut self) -> &mut Self {
1170 self.enable_io = true;
1171 self
1172 }
1173
1174 /// Enables the I/O driver and configures the max number of events to be
1175 /// processed per tick.
1176 ///
1177 /// # Examples
1178 ///
1179 /// ```
1180 /// use tokio::runtime;
1181 ///
1182 /// let rt = runtime::Builder::new_current_thread()
1183 /// .enable_io()
1184 /// .max_io_events_per_tick(1024)
1185 /// .build()
1186 /// .unwrap();
1187 /// ```
1188 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1189 self.nevents = capacity;
1190 self
1191 }
1192 }
1193}
1194
1195cfg_time! {
1196 impl Builder {
1197 /// Enables the time driver.
1198 ///
1199 /// Doing this enables using `tokio::time` on the runtime.
1200 ///
1201 /// # Examples
1202 ///
1203 /// ```
1204 /// use tokio::runtime;
1205 ///
1206 /// let rt = runtime::Builder::new_multi_thread()
1207 /// .enable_time()
1208 /// .build()
1209 /// .unwrap();
1210 /// ```
1211 pub fn enable_time(&mut self) -> &mut Self {
1212 self.enable_time = true;
1213 self
1214 }
1215 }
1216}
1217
1218cfg_test_util! {
1219 impl Builder {
1220 /// Controls if the runtime's clock starts paused or advancing.
1221 ///
1222 /// Pausing time requires the current-thread runtime; construction of
1223 /// the runtime will panic otherwise.
1224 ///
1225 /// # Examples
1226 ///
1227 /// ```
1228 /// use tokio::runtime;
1229 ///
1230 /// let rt = runtime::Builder::new_current_thread()
1231 /// .enable_time()
1232 /// .start_paused(true)
1233 /// .build()
1234 /// .unwrap();
1235 /// ```
1236 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1237 self.start_paused = start_paused;
1238 self
1239 }
1240 }
1241}
1242
cfg_rt_multi_thread! {
    impl Builder {
        /// Builds a multi-threaded runtime from the configuration stored on
        /// `self`: driver first, then the blocking pool, then the scheduler,
        /// and finally launches the worker threads.
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::loom::sys::num_cpus;
            use crate::runtime::{Config, runtime::Scheduler};
            use crate::runtime::scheduler::{self, MultiThread};

            // Default to one worker per CPU when not configured explicitly.
            let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

            let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;

            // Create the blocking pool. Sized as blocking cap plus the worker
            // count. NOTE(review): presumably workers can also occupy pool
            // slots (e.g. via `block_in_place`) — confirm.
            let blocking_pool =
                blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            // Generate a rng seed for this runtime. Two independent
            // generators: `_1` is stored in the `Config` below, `_2` goes
            // directly to the scheduler.
            let seed_generator_1 = self.seed_generator.next_generator();
            let seed_generator_2 = self.seed_generator.next_generator();

            let (scheduler, handle, launch) = MultiThread::new(
                core_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    local_queue_capacity: self.local_queue_capacity,
                    // Field only exists when compiled with `tokio_unstable`.
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                },
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers while the runtime context is
            // entered (the `_enter` guard stays alive across `launch`).
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }

        cfg_unstable! {
            /// Builds the alternate multi-threaded runtime. Mirrors
            /// `build_threaded_runtime` but uses the `MultiThreadAlt`
            /// scheduler and has no separate worker-launch step here.
            fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
                use crate::loom::sys::num_cpus;
                use crate::runtime::{Config, runtime::Scheduler};
                use crate::runtime::scheduler::MultiThreadAlt;

                // Default to one worker per CPU when not configured explicitly.
                let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
                let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;

                // Create the blocking pool (blocking cap plus worker count,
                // matching `build_threaded_runtime`).
                let blocking_pool =
                    blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
                let blocking_spawner = blocking_pool.spawner().clone();

                // Generate a rng seed for this runtime. `_1` goes into the
                // `Config`, `_2` to the scheduler.
                let seed_generator_1 = self.seed_generator.next_generator();
                let seed_generator_2 = self.seed_generator.next_generator();

                let (scheduler, handle) = MultiThreadAlt::new(
                    core_threads,
                    driver,
                    driver_handle,
                    blocking_spawner,
                    seed_generator_2,
                    Config {
                        before_park: self.before_park.clone(),
                        after_unpark: self.after_unpark.clone(),
                        global_queue_interval: self.global_queue_interval,
                        event_interval: self.event_interval,
                        local_queue_capacity: self.local_queue_capacity,
                        // Field only exists when compiled with `tokio_unstable`.
                        #[cfg(tokio_unstable)]
                        unhandled_panic: self.unhandled_panic.clone(),
                        disable_lifo_slot: self.disable_lifo_slot,
                        seed_generator: seed_generator_1,
                        metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                    },
                );

                Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
            }
        }
    }
}
1335
1336impl fmt::Debug for Builder {
1337 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1338 fmt.debug_struct("Builder")
1339 .field("worker_threads", &self.worker_threads)
1340 .field("max_blocking_threads", &self.max_blocking_threads)
1341 .field(
1342 "thread_name",
1343 &"<dyn Fn() -> String + Send + Sync + 'static>",
1344 )
1345 .field("thread_stack_size", &self.thread_stack_size)
1346 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1347 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1348 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1349 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1350 .finish()
1351 }
1352}