Skip to main content

tokio/runtime/
builder.rs

1use crate::runtime::handle::Handle;
2use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
3use crate::util::rand::{RngSeed, RngSeedGenerator};
4
5use std::fmt;
6use std::io;
7use std::time::Duration;
8
9/// Builds Tokio Runtime with custom configuration values.
10///
11/// Methods can be chained in order to set the configuration values. The
12/// Runtime is constructed by calling [`build`].
13///
14/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
15/// or [`Builder::new_current_thread`].
16///
17/// See function level documentation for details on the various configuration
18/// settings.
19///
20/// [`build`]: method@Self::build
21/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
22/// [`Builder::new_current_thread`]: method@Self::new_current_thread
23///
24/// # Examples
25///
26/// ```
27/// use tokio::runtime::Builder;
28///
29/// fn main() {
30///     // build runtime
31///     let runtime = Builder::new_multi_thread()
32///         .worker_threads(4)
33///         .thread_name("my-custom-name")
34///         .thread_stack_size(3 * 1024 * 1024)
35///         .build()
36///         .unwrap();
37///
38///     // use runtime ...
39/// }
40/// ```
41pub struct Builder {
42    /// Runtime type
43    kind: Kind,
44
45    /// Whether or not to enable the I/O driver
46    enable_io: bool,
47    nevents: usize,
48
49    /// Whether or not to enable the time driver
50    enable_time: bool,
51
52    /// Whether or not the clock should start paused.
53    start_paused: bool,
54
55    /// The number of worker threads, used by Runtime.
56    ///
57    /// Only used when not using the current-thread executor.
58    worker_threads: Option<usize>,
59
60    /// Cap on thread usage.
61    max_blocking_threads: usize,
62
63    /// Name fn used for threads spawned by the runtime.
64    pub(super) thread_name: ThreadNameFn,
65
66    /// Stack size used for threads spawned by the runtime.
67    pub(super) thread_stack_size: Option<usize>,
68
69    /// Callback to run after each thread starts.
70    pub(super) after_start: Option<Callback>,
71
72    /// To run before each worker thread stops
73    pub(super) before_stop: Option<Callback>,
74
75    /// To run before each worker thread is parked.
76    pub(super) before_park: Option<Callback>,
77
78    /// To run after each thread is unparked.
79    pub(super) after_unpark: Option<Callback>,
80
81    /// Customizable keep alive timeout for `BlockingPool`
82    pub(super) keep_alive: Option<Duration>,
83
84    /// How many ticks before pulling a task from the global/remote queue?
85    ///
86    /// When `None`, the value is unspecified and behavior details are left to
87    /// the scheduler. Each scheduler flavor could choose to either pick its own
88    /// default value or use some other strategy to decide when to poll from the
89    /// global queue. For example, the multi-threaded scheduler uses a
90    /// self-tuning strategy based on mean task poll times.
91    pub(super) global_queue_interval: Option<u32>,
92
93    /// How many ticks before yielding to the driver for timer and I/O events?
94    pub(super) event_interval: u32,
95
96    pub(super) local_queue_capacity: usize,
97
98    /// When true, the multi-threade scheduler LIFO slot should not be used.
99    ///
100    /// This option should only be exposed as unstable.
101    pub(super) disable_lifo_slot: bool,
102
103    /// Specify a random number generator seed to provide deterministic results
104    pub(super) seed_generator: RngSeedGenerator,
105
106    /// When true, enables task poll count histogram instrumentation.
107    pub(super) metrics_poll_count_histogram_enable: bool,
108
109    /// Configures the task poll count histogram
110    pub(super) metrics_poll_count_histogram: HistogramBuilder,
111
112    #[cfg(tokio_unstable)]
113    pub(super) unhandled_panic: UnhandledPanic,
114}
115
116cfg_unstable! {
117    /// How the runtime should respond to unhandled panics.
118    ///
119    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
120    /// to configure the runtime behavior when a spawned task panics.
121    ///
122    /// See [`Builder::unhandled_panic`] for more details.
123    #[derive(Debug, Clone)]
124    #[non_exhaustive]
125    pub enum UnhandledPanic {
126        /// The runtime should ignore panics on spawned tasks.
127        ///
128        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
129        /// tasks continue running normally.
130        ///
131        /// This is the default behavior.
132        ///
133        /// # Examples
134        ///
135        /// ```
136        /// use tokio::runtime::{self, UnhandledPanic};
137        ///
138        /// # pub fn main() {
139        /// let rt = runtime::Builder::new_current_thread()
140        ///     .unhandled_panic(UnhandledPanic::Ignore)
141        ///     .build()
142        ///     .unwrap();
143        ///
144        /// let task1 = rt.spawn(async { panic!("boom"); });
145        /// let task2 = rt.spawn(async {
146        ///     // This task completes normally
147        ///     "done"
148        /// });
149        ///
150        /// rt.block_on(async {
151        ///     // The panic on the first task is forwarded to the `JoinHandle`
152        ///     assert!(task1.await.is_err());
153        ///
154        ///     // The second task completes normally
155        ///     assert!(task2.await.is_ok());
156        /// })
157        /// # }
158        /// ```
159        ///
160        /// [`JoinHandle`]: struct@crate::task::JoinHandle
161        Ignore,
162
163        /// The runtime should immediately shutdown if a spawned task panics.
164        ///
165        /// The runtime will immediately shutdown even if the panicked task's
166        /// [`JoinHandle`] is still available. All further spawned tasks will be
167        /// immediately dropped and call to [`Runtime::block_on`] will panic.
168        ///
169        /// # Examples
170        ///
171        /// ```should_panic
172        /// use tokio::runtime::{self, UnhandledPanic};
173        ///
174        /// # pub fn main() {
175        /// let rt = runtime::Builder::new_current_thread()
176        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
177        ///     .build()
178        ///     .unwrap();
179        ///
180        /// rt.spawn(async { panic!("boom"); });
181        /// rt.spawn(async {
182        ///     // This task never completes.
183        /// });
184        ///
185        /// rt.block_on(async {
186        ///     // Do some work
187        /// # loop { tokio::task::yield_now().await; }
188        /// })
189        /// # }
190        /// ```
191        ///
192        /// [`JoinHandle`]: struct@crate::task::JoinHandle
193        ShutdownRuntime,
194    }
195}
196
197pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
198
199#[derive(Clone, Copy)]
200pub(crate) enum Kind {
201    CurrentThread,
202    #[cfg(feature = "rt-multi-thread")]
203    MultiThread,
204    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
205    MultiThreadAlt,
206}
207
208impl Builder {
209    /// Returns a new builder with the current thread scheduler selected.
210    ///
211    /// Configuration methods can be chained on the return value.
212    ///
213    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
214    /// [`LocalSet`].
215    ///
216    /// [`LocalSet`]: crate::task::LocalSet
217    pub fn new_current_thread() -> Builder {
218        #[cfg(loom)]
219        const EVENT_INTERVAL: u32 = 4;
220        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
221        #[cfg(not(loom))]
222        const EVENT_INTERVAL: u32 = 61;
223
224        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
225    }
226
227    /// Returns a new builder with the multi thread scheduler selected.
228    ///
229    /// Configuration methods can be chained on the return value.
230    #[cfg(feature = "rt-multi-thread")]
231    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
232    pub fn new_multi_thread() -> Builder {
233        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
234        Builder::new(Kind::MultiThread, 61)
235    }
236
237    cfg_unstable! {
238        /// Returns a new builder with the alternate multi thread scheduler
239        /// selected.
240        ///
241        /// The alternate multi threaded scheduler is an in-progress
242        /// candidate to replace the existing multi threaded scheduler. It
243        /// currently does not scale as well to 16+ processors.
244        ///
245        /// This runtime flavor is currently **not considered production
246        /// ready**.
247        ///
248        /// Configuration methods can be chained on the return value.
249        #[cfg(feature = "rt-multi-thread")]
250        #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
251        pub fn new_multi_thread_alt() -> Builder {
252            // The number `61` is fairly arbitrary. I believe this value was copied from golang.
253            Builder::new(Kind::MultiThreadAlt, 61)
254        }
255    }
256
257    /// Returns a new runtime builder initialized with default configuration
258    /// values.
259    ///
260    /// Configuration methods can be chained on the return value.
261    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
262        Builder {
263            kind,
264
265            // I/O defaults to "off"
266            enable_io: false,
267            nevents: 1024,
268
269            // Time defaults to "off"
270            enable_time: false,
271
272            // The clock starts not-paused
273            start_paused: false,
274
275            // Read from environment variable first in multi-threaded mode.
276            // Default to lazy auto-detection (one thread per CPU core)
277            worker_threads: None,
278
279            max_blocking_threads: 512,
280
281            // Default thread name
282            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
283
284            // Do not set a stack size by default
285            thread_stack_size: None,
286
287            // No worker thread callbacks
288            after_start: None,
289            before_stop: None,
290            before_park: None,
291            after_unpark: None,
292
293            keep_alive: None,
294
295            // Defaults for these values depend on the scheduler kind, so we get them
296            // as parameters.
297            global_queue_interval: None,
298            event_interval,
299
300            #[cfg(not(loom))]
301            local_queue_capacity: 256,
302
303            #[cfg(loom)]
304            local_queue_capacity: 4,
305
306            seed_generator: RngSeedGenerator::new(RngSeed::new()),
307
308            #[cfg(tokio_unstable)]
309            unhandled_panic: UnhandledPanic::Ignore,
310
311            metrics_poll_count_histogram_enable: false,
312
313            metrics_poll_count_histogram: HistogramBuilder::default(),
314
315            disable_lifo_slot: false,
316        }
317    }
318
319    /// Enables both I/O and time drivers.
320    ///
321    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
322    /// individually. If additional components are added to Tokio in the future,
323    /// `enable_all` will include these future components.
324    ///
325    /// # Examples
326    ///
327    /// ```
328    /// use tokio::runtime;
329    ///
330    /// let rt = runtime::Builder::new_multi_thread()
331    ///     .enable_all()
332    ///     .build()
333    ///     .unwrap();
334    /// ```
335    pub fn enable_all(&mut self) -> &mut Self {
336        #[cfg(any(
337            feature = "net",
338            all(unix, feature = "process"),
339            all(unix, feature = "signal")
340        ))]
341        self.enable_io();
342        #[cfg(feature = "time")]
343        self.enable_time();
344
345        self
346    }
347
348    /// Sets the number of worker threads the `Runtime` will use.
349    ///
350    /// This can be any number above 0 though it is advised to keep this value
351    /// on the smaller side.
352    ///
353    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
354    ///
355    /// # Default
356    ///
357    /// The default value is the number of cores available to the system.
358    ///
359    /// When using the `current_thread` runtime this method has no effect.
360    ///
361    /// # Examples
362    ///
363    /// ## Multi threaded runtime with 4 threads
364    ///
365    /// ```
366    /// use tokio::runtime;
367    ///
368    /// // This will spawn a work-stealing runtime with 4 worker threads.
369    /// let rt = runtime::Builder::new_multi_thread()
370    ///     .worker_threads(4)
371    ///     .build()
372    ///     .unwrap();
373    ///
374    /// rt.spawn(async move {});
375    /// ```
376    ///
377    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
378    ///
379    /// ```
380    /// use tokio::runtime;
381    ///
382    /// // Create a runtime that _must_ be driven from a call
383    /// // to `Runtime::block_on`.
384    /// let rt = runtime::Builder::new_current_thread()
385    ///     .build()
386    ///     .unwrap();
387    ///
388    /// // This will run the runtime and future on the current thread
389    /// rt.block_on(async move {});
390    /// ```
391    ///
392    /// # Panics
393    ///
394    /// This will panic if `val` is not larger than `0`.
395    #[track_caller]
396    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
397        assert!(val > 0, "Worker threads cannot be set to 0");
398        self.worker_threads = Some(val);
399        self
400    }
401
402    /// Specifies the limit for additional threads spawned by the Runtime.
403    ///
404    /// These threads are used for blocking operations like tasks spawned
405    /// through [`spawn_blocking`], this includes but is not limited to:
406    /// - [`fs`] operations
407    /// - dns resolution through [`ToSocketAddrs`]
408    /// - writing to [`Stdout`] or [`Stderr`]
409    /// - reading from [`Stdin`]
410    ///
411    /// Unlike the [`worker_threads`], they are not always active and will exit
412    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
413    ///
414    /// It's recommended to not set this limit too low in order to avoid hanging on operations
415    /// requiring [`spawn_blocking`].
416    ///
417    /// The default value is 512.
418    ///
419    /// # Panics
420    ///
421    /// This will panic if `val` is not larger than `0`.
422    ///
423    /// # Upgrading from 0.x
424    ///
425    /// In old versions `max_threads` limited both blocking and worker threads, but the
426    /// current `max_blocking_threads` does not include async worker threads in the count.
427    ///
428    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
429    /// [`fs`]: mod@crate::fs
430    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
431    /// [`Stdout`]: struct@crate::io::Stdout
432    /// [`Stdin`]: struct@crate::io::Stdin
433    /// [`Stderr`]: struct@crate::io::Stderr
434    /// [`worker_threads`]: Self::worker_threads
435    /// [`thread_keep_alive`]: Self::thread_keep_alive
436    #[track_caller]
437    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
438    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
439        assert!(val > 0, "Max blocking threads cannot be set to 0");
440        self.max_blocking_threads = val;
441        self
442    }
443
444    /// Sets name of threads spawned by the `Runtime`'s thread pool.
445    ///
446    /// The default name is "tokio-runtime-worker".
447    ///
448    /// # Examples
449    ///
450    /// ```
451    /// # use tokio::runtime;
452    ///
453    /// # pub fn main() {
454    /// let rt = runtime::Builder::new_multi_thread()
455    ///     .thread_name("my-pool")
456    ///     .build();
457    /// # }
458    /// ```
459    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
460        let val = val.into();
461        self.thread_name = std::sync::Arc::new(move || val.clone());
462        self
463    }
464
465    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
466    ///
467    /// The default name fn is `|| "tokio-runtime-worker".into()`.
468    ///
469    /// # Examples
470    ///
471    /// ```
472    /// # use tokio::runtime;
473    /// # use std::sync::atomic::{AtomicUsize, Ordering};
474    /// # pub fn main() {
475    /// let rt = runtime::Builder::new_multi_thread()
476    ///     .thread_name_fn(|| {
477    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
478    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
479    ///        format!("my-pool-{}", id)
480    ///     })
481    ///     .build();
482    /// # }
483    /// ```
484    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
485    where
486        F: Fn() -> String + Send + Sync + 'static,
487    {
488        self.thread_name = std::sync::Arc::new(f);
489        self
490    }
491
492    /// Sets the stack size (in bytes) for worker threads.
493    ///
494    /// The actual stack size may be greater than this value if the platform
495    /// specifies minimal stack size.
496    ///
497    /// The default stack size for spawned threads is 2 MiB, though this
498    /// particular stack size is subject to change in the future.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// # use tokio::runtime;
504    ///
505    /// # pub fn main() {
506    /// let rt = runtime::Builder::new_multi_thread()
507    ///     .thread_stack_size(32 * 1024)
508    ///     .build();
509    /// # }
510    /// ```
511    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
512        self.thread_stack_size = Some(val);
513        self
514    }
515
516    /// Executes function `f` after each thread is started but before it starts
517    /// doing work.
518    ///
519    /// This is intended for bookkeeping and monitoring use cases.
520    ///
521    /// # Examples
522    ///
523    /// ```
524    /// # use tokio::runtime;
525    /// # pub fn main() {
526    /// let runtime = runtime::Builder::new_multi_thread()
527    ///     .on_thread_start(|| {
528    ///         println!("thread started");
529    ///     })
530    ///     .build();
531    /// # }
532    /// ```
533    #[cfg(not(loom))]
534    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
535    where
536        F: Fn() + Send + Sync + 'static,
537    {
538        self.after_start = Some(std::sync::Arc::new(f));
539        self
540    }
541
542    /// Executes function `f` before each thread stops.
543    ///
544    /// This is intended for bookkeeping and monitoring use cases.
545    ///
546    /// # Examples
547    ///
548    /// ```
549    /// # use tokio::runtime;
550    /// # pub fn main() {
551    /// let runtime = runtime::Builder::new_multi_thread()
552    ///     .on_thread_stop(|| {
553    ///         println!("thread stopping");
554    ///     })
555    ///     .build();
556    /// # }
557    /// ```
558    #[cfg(not(loom))]
559    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
560    where
561        F: Fn() + Send + Sync + 'static,
562    {
563        self.before_stop = Some(std::sync::Arc::new(f));
564        self
565    }
566
567    /// Executes function `f` just before a thread is parked (goes idle).
568    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
569    /// can be called, and may result in this thread being unparked immediately.
570    ///
571    /// This can be used to start work only when the executor is idle, or for bookkeeping
572    /// and monitoring purposes.
573    ///
574    /// Note: There can only be one park callback for a runtime; calling this function
575    /// more than once replaces the last callback defined, rather than adding to it.
576    ///
577    /// # Examples
578    ///
579    /// ## Multithreaded executor
580    /// ```
581    /// # use std::sync::Arc;
582    /// # use std::sync::atomic::{AtomicBool, Ordering};
583    /// # use tokio::runtime;
584    /// # use tokio::sync::Barrier;
585    /// # pub fn main() {
586    /// let once = AtomicBool::new(true);
587    /// let barrier = Arc::new(Barrier::new(2));
588    ///
589    /// let runtime = runtime::Builder::new_multi_thread()
590    ///     .worker_threads(1)
591    ///     .on_thread_park({
592    ///         let barrier = barrier.clone();
593    ///         move || {
594    ///             let barrier = barrier.clone();
595    ///             if once.swap(false, Ordering::Relaxed) {
596    ///                 tokio::spawn(async move { barrier.wait().await; });
597    ///            }
598    ///         }
599    ///     })
600    ///     .build()
601    ///     .unwrap();
602    ///
603    /// runtime.block_on(async {
604    ///    barrier.wait().await;
605    /// })
606    /// # }
607    /// ```
608    /// ## Current thread executor
609    /// ```
610    /// # use std::sync::Arc;
611    /// # use std::sync::atomic::{AtomicBool, Ordering};
612    /// # use tokio::runtime;
613    /// # use tokio::sync::Barrier;
614    /// # pub fn main() {
615    /// let once = AtomicBool::new(true);
616    /// let barrier = Arc::new(Barrier::new(2));
617    ///
618    /// let runtime = runtime::Builder::new_current_thread()
619    ///     .on_thread_park({
620    ///         let barrier = barrier.clone();
621    ///         move || {
622    ///             let barrier = barrier.clone();
623    ///             if once.swap(false, Ordering::Relaxed) {
624    ///                 tokio::spawn(async move { barrier.wait().await; });
625    ///            }
626    ///         }
627    ///     })
628    ///     .build()
629    ///     .unwrap();
630    ///
631    /// runtime.block_on(async {
632    ///    barrier.wait().await;
633    /// })
634    /// # }
635    /// ```
636    #[cfg(not(loom))]
637    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
638    where
639        F: Fn() + Send + Sync + 'static,
640    {
641        self.before_park = Some(std::sync::Arc::new(f));
642        self
643    }
644
645    /// Executes function `f` just after a thread unparks (starts executing tasks).
646    ///
647    /// This is intended for bookkeeping and monitoring use cases; note that work
648    /// in this callback will increase latencies when the application has allowed one or
649    /// more runtime threads to go idle.
650    ///
651    /// Note: There can only be one unpark callback for a runtime; calling this function
652    /// more than once replaces the last callback defined, rather than adding to it.
653    ///
654    /// # Examples
655    ///
656    /// ```
657    /// # use tokio::runtime;
658    /// # pub fn main() {
659    /// let runtime = runtime::Builder::new_multi_thread()
660    ///     .on_thread_unpark(|| {
661    ///         println!("thread unparking");
662    ///     })
663    ///     .build();
664    ///
665    /// runtime.unwrap().block_on(async {
666    ///    tokio::task::yield_now().await;
667    ///    println!("Hello from Tokio!");
668    /// })
669    /// # }
670    /// ```
671    #[cfg(not(loom))]
672    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
673    where
674        F: Fn() + Send + Sync + 'static,
675    {
676        self.after_unpark = Some(std::sync::Arc::new(f));
677        self
678    }
679
680    /// Creates the configured `Runtime`.
681    ///
682    /// The returned `Runtime` instance is ready to spawn tasks.
683    ///
684    /// # Examples
685    ///
686    /// ```
687    /// use tokio::runtime::Builder;
688    ///
689    /// let rt  = Builder::new_multi_thread().build().unwrap();
690    ///
691    /// rt.block_on(async {
692    ///     println!("Hello from the Tokio runtime");
693    /// });
694    /// ```
695    pub fn build(&mut self) -> io::Result<Runtime> {
696        match &self.kind {
697            Kind::CurrentThread => self.build_current_thread_runtime(),
698            #[cfg(feature = "rt-multi-thread")]
699            Kind::MultiThread => self.build_threaded_runtime(),
700            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
701            Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
702        }
703    }
704
705    fn get_cfg(&self, workers: usize) -> driver::Cfg {
706        driver::Cfg {
707            enable_pause_time: match self.kind {
708                Kind::CurrentThread => true,
709                #[cfg(feature = "rt-multi-thread")]
710                Kind::MultiThread => false,
711                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
712                Kind::MultiThreadAlt => false,
713            },
714            enable_io: self.enable_io,
715            enable_time: self.enable_time,
716            start_paused: self.start_paused,
717            nevents: self.nevents,
718            workers,
719        }
720    }
721
722    /// Sets a custom timeout for a thread in the blocking pool.
723    ///
724    /// By default, the timeout for a thread is set to 10 seconds. This can
725    /// be overridden using `.thread_keep_alive()`.
726    ///
727    /// # Example
728    ///
729    /// ```
730    /// # use tokio::runtime;
731    /// # use std::time::Duration;
732    /// # pub fn main() {
733    /// let rt = runtime::Builder::new_multi_thread()
734    ///     .thread_keep_alive(Duration::from_millis(100))
735    ///     .build();
736    /// # }
737    /// ```
738    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
739        self.keep_alive = Some(duration);
740        self
741    }
742
743    /// Sets the number of scheduler ticks after which the scheduler will poll the global
744    /// task queue.
745    ///
746    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
747    ///
748    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
749    /// [the module documentation] for the default behavior of the multi-thread scheduler.
750    ///
751    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
752    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
753    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
754    /// getting started on new work, especially if tasks frequently yield rather than complete
755    /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
756    /// is a good choice when most tasks quickly complete polling.
757    ///
758    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
759    ///
760    /// # Panics
761    ///
762    /// This function will panic if 0 is passed as an argument.
763    ///
764    /// # Examples
765    ///
766    /// ```
767    /// # use tokio::runtime;
768    /// # pub fn main() {
769    /// let rt = runtime::Builder::new_multi_thread()
770    ///     .global_queue_interval(31)
771    ///     .build();
772    /// # }
773    /// ```
774    #[track_caller]
775    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
776        assert!(val > 0, "global_queue_interval must be greater than 0");
777        self.global_queue_interval = Some(val);
778        self
779    }
780
781    /// Sets the number of scheduler ticks after which the scheduler will poll for
782    /// external events (timers, I/O, and so on).
783    ///
784    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
785    ///
786    /// By default, the event interval is `61` for all scheduler types.
787    ///
788    /// Setting the event interval determines the effective "priority" of delivering
789    /// these external events (which may wake up additional tasks), compared to
790    /// executing tasks that are currently ready to run. A smaller value is useful
791    /// when tasks frequently spend a long time in polling, or frequently yield,
792    /// which can result in overly long delays picking up I/O events. Conversely,
793    /// picking up new events requires extra synchronization and syscall overhead,
794    /// so if tasks generally complete their polling quickly, a higher event interval
795    /// will minimize that overhead while still keeping the scheduler responsive to
796    /// events.
797    ///
798    /// # Examples
799    ///
800    /// ```
801    /// # use tokio::runtime;
802    /// # pub fn main() {
803    /// let rt = runtime::Builder::new_multi_thread()
804    ///     .event_interval(31)
805    ///     .build();
806    /// # }
807    /// ```
808    pub fn event_interval(&mut self, val: u32) -> &mut Self {
809        self.event_interval = val;
810        self
811    }
812
813    cfg_unstable! {
814        /// Configure how the runtime responds to an unhandled panic on a
815        /// spawned task.
816        ///
817        /// By default, an unhandled panic (i.e. a panic not caught by
818        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
819        /// execution. The panic is error value is forwarded to the task's
820        /// [`JoinHandle`] and all other spawned tasks continue running.
821        ///
822        /// The `unhandled_panic` option enables configuring this behavior.
823        ///
824        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
825        ///   spawned tasks have no impact on the runtime's execution.
826        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
827        ///   shutdown immediately when a spawned task panics even if that
828        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
829        ///   will immediately terminate and further calls to
830        ///   [`Runtime::block_on`] will panic.
831        ///
832        /// # Panics
833        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
834        /// on a runtime other than the current thread runtime.
835        ///
836        /// # Unstable
837        ///
838        /// This option is currently unstable and its implementation is
839        /// incomplete. The API may change or be removed in the future. See
840        /// issue [tokio-rs/tokio#4516] for more details.
841        ///
842        /// # Examples
843        ///
844        /// The following demonstrates a runtime configured to shutdown on
845        /// panic. The first spawned task panics and results in the runtime
846        /// shutting down. The second spawned task never has a chance to
847        /// execute. The call to `block_on` will panic due to the runtime being
848        /// forcibly shutdown.
849        ///
850        /// ```should_panic
851        /// use tokio::runtime::{self, UnhandledPanic};
852        ///
853        /// # pub fn main() {
854        /// let rt = runtime::Builder::new_current_thread()
855        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
856        ///     .build()
857        ///     .unwrap();
858        ///
859        /// rt.spawn(async { panic!("boom"); });
860        /// rt.spawn(async {
861        ///     // This task never completes.
862        /// });
863        ///
864        /// rt.block_on(async {
865        ///     // Do some work
866        /// # loop { tokio::task::yield_now().await; }
867        /// })
868        /// # }
869        /// ```
870        ///
871        /// [`JoinHandle`]: struct@crate::task::JoinHandle
872        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
873        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
874            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
875                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
876            }
877
878            self.unhandled_panic = behavior;
879            self
880        }
881
882        /// Disables the LIFO task scheduler heuristic.
883        ///
884        /// The multi-threaded scheduler includes a heuristic for optimizing
885        /// message-passing patterns. This heuristic results in the **last**
886        /// scheduled task being polled first.
887        ///
888        /// To implement this heuristic, each worker thread has a slot which
889        /// holds the task that should be polled next. However, this slot cannot
890        /// be stolen by other worker threads, which can result in lower total
891        /// throughput when tasks tend to have longer poll times.
892        ///
893        /// This configuration option will disable this heuristic resulting in
894        /// all scheduled tasks being pushed into the worker-local queue, which
895        /// is stealable.
896        ///
897        /// Consider trying this option when the task "scheduled" time is high
898        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
899        /// collect this data.
900        ///
901        /// # Unstable
902        ///
903        /// This configuration option is considered a workaround for the LIFO
904        /// slot not being stealable. When the slot becomes stealable, we will
905        /// revisit whether or not this option is necessary. See
906        /// issue [tokio-rs/tokio#4941].
907        ///
908        /// # Examples
909        ///
910        /// ```
911        /// use tokio::runtime;
912        ///
913        /// let rt = runtime::Builder::new_multi_thread()
914        ///     .disable_lifo_slot()
915        ///     .build()
916        ///     .unwrap();
917        /// ```
918        ///
919        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
920        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
921        pub fn disable_lifo_slot(&mut self) -> &mut Self {
922            self.disable_lifo_slot = true;
923            self
924        }
925
926        /// Specifies the random number generation seed to use within all
927        /// threads associated with the runtime being built.
928        ///
929        /// This option is intended to make certain parts of the runtime
930        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
931        /// [`tokio::select!`] it will ensure that the order that branches are
932        /// polled is deterministic.
933        ///
934        /// In addition to the code specifying `rng_seed` and interacting with
935        /// the runtime, the internals of Tokio and the Rust compiler may affect
936        /// the sequences of random numbers. In order to ensure repeatable
937        /// results, the version of Tokio, the versions of all other
938        /// dependencies that interact with Tokio, and the Rust compiler version
939        /// should also all remain constant.
940        ///
941        /// # Examples
942        ///
943        /// ```
944        /// # use tokio::runtime::{self, RngSeed};
945        /// # pub fn main() {
946        /// let seed = RngSeed::from_bytes(b"place your seed here");
947        /// let rt = runtime::Builder::new_current_thread()
948        ///     .rng_seed(seed)
949        ///     .build();
950        /// # }
951        /// ```
952        ///
953        /// [`tokio::select!`]: crate::select
954        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
955            self.seed_generator = RngSeedGenerator::new(seed);
956            self
957        }
958    }
959
960    cfg_unstable_metrics! {
961        /// Enables tracking the distribution of task poll times.
962        ///
963        /// Task poll times are not instrumented by default as doing so requires
964        /// calling [`Instant::now()`] twice per task poll, which could add
965        /// measurable overhead. Use the [`Handle::metrics()`] to access the
966        /// metrics data.
967        ///
968        /// The histogram uses fixed bucket sizes. In other words, the histogram
969        /// buckets are not dynamic based on input values. Use the
970        /// `metrics_poll_count_histogram_` builder methods to configure the
971        /// histogram details.
972        ///
973        /// # Examples
974        ///
975        /// ```
976        /// use tokio::runtime;
977        ///
978        /// let rt = runtime::Builder::new_multi_thread()
979        ///     .enable_metrics_poll_count_histogram()
980        ///     .build()
981        ///     .unwrap();
982        /// # // Test default values here
983        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
984        /// # let m = rt.handle().metrics();
985        /// # assert_eq!(m.poll_count_histogram_num_buckets(), 10);
986        /// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100));
987        /// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200));
988        /// ```
989        ///
990        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
991        /// [`Instant::now()`]: std::time::Instant::now
992        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
993            self.metrics_poll_count_histogram_enable = true;
994            self
995        }
996
997        /// Sets the histogram scale for tracking the distribution of task poll
998        /// times.
999        ///
1000        /// Tracking the distribution of task poll times can be done using a
1001        /// linear or log scale. When using linear scale, each histogram bucket
1002        /// will represent the same range of poll times. When using log scale,
1003        /// each histogram bucket will cover a range twice as big as the
1004        /// previous bucket.
1005        ///
1006        /// **Default:** linear scale.
1007        ///
1008        /// # Examples
1009        ///
1010        /// ```
1011        /// use tokio::runtime::{self, HistogramScale};
1012        ///
1013        /// let rt = runtime::Builder::new_multi_thread()
1014        ///     .enable_metrics_poll_count_histogram()
1015        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1016        ///     .build()
1017        ///     .unwrap();
1018        /// ```
1019        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1020            self.metrics_poll_count_histogram.scale = histogram_scale;
1021            self
1022        }
1023
1024        /// Sets the histogram resolution for tracking the distribution of task
1025        /// poll times.
1026        ///
1027        /// The resolution is the histogram's first bucket's range. When using a
1028        /// linear histogram scale, each bucket will cover the same range. When
1029        /// using a log scale, each bucket will cover a range twice as big as
1030        /// the previous bucket. In the log case, the resolution represents the
1031        /// smallest bucket range.
1032        ///
1033        /// Note that, when using log scale, the resolution is rounded up to the
1034        /// nearest power of 2 in nanoseconds.
1035        ///
1036        /// **Default:** 100 microseconds.
1037        ///
1038        /// # Examples
1039        ///
1040        /// ```
1041        /// use tokio::runtime;
1042        /// use std::time::Duration;
1043        ///
1044        /// let rt = runtime::Builder::new_multi_thread()
1045        ///     .enable_metrics_poll_count_histogram()
1046        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1047        ///     .build()
1048        ///     .unwrap();
1049        /// ```
1050        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1051            assert!(resolution > Duration::from_secs(0));
1052            // Sanity check the argument and also make the cast below safe.
1053            assert!(resolution <= Duration::from_secs(1));
1054
1055            let resolution = resolution.as_nanos() as u64;
1056            self.metrics_poll_count_histogram.resolution = resolution;
1057            self
1058        }
1059
1060        /// Sets the number of buckets for the histogram tracking the
1061        /// distribution of task poll times.
1062        ///
1063        /// The last bucket tracks all greater values that fall out of other
1064        /// ranges. So, configuring the histogram using a linear scale,
1065        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1066        /// polls that take more than 450ms to complete.
1067        ///
1068        /// **Default:** 10
1069        ///
1070        /// # Examples
1071        ///
1072        /// ```
1073        /// use tokio::runtime;
1074        ///
1075        /// let rt = runtime::Builder::new_multi_thread()
1076        ///     .enable_metrics_poll_count_histogram()
1077        ///     .metrics_poll_count_histogram_buckets(15)
1078        ///     .build()
1079        ///     .unwrap();
1080        /// ```
1081        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1082            self.metrics_poll_count_histogram.num_buckets = buckets;
1083            self
1084        }
1085    }
1086
1087    cfg_loom! {
1088        pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1089            assert!(value.is_power_of_two());
1090            self.local_queue_capacity = value;
1091            self
1092        }
1093    }
1094
1095    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1096        use crate::runtime::scheduler::{self, CurrentThread};
1097        use crate::runtime::{runtime::Scheduler, Config};
1098
1099        let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;
1100
1101        // Blocking pool
1102        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1103        let blocking_spawner = blocking_pool.spawner().clone();
1104
1105        // Generate a rng seed for this runtime.
1106        let seed_generator_1 = self.seed_generator.next_generator();
1107        let seed_generator_2 = self.seed_generator.next_generator();
1108
1109        // And now put a single-threaded scheduler on top of the timer. When
1110        // there are no futures ready to do something, it'll let the timer or
1111        // the reactor to generate some new stimuli for the futures to continue
1112        // in their life.
1113        let (scheduler, handle) = CurrentThread::new(
1114            driver,
1115            driver_handle,
1116            blocking_spawner,
1117            seed_generator_2,
1118            Config {
1119                before_park: self.before_park.clone(),
1120                after_unpark: self.after_unpark.clone(),
1121                global_queue_interval: self.global_queue_interval,
1122                event_interval: self.event_interval,
1123                local_queue_capacity: self.local_queue_capacity,
1124                #[cfg(tokio_unstable)]
1125                unhandled_panic: self.unhandled_panic.clone(),
1126                disable_lifo_slot: self.disable_lifo_slot,
1127                seed_generator: seed_generator_1,
1128                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1129            },
1130        );
1131
1132        let handle = Handle {
1133            inner: scheduler::Handle::CurrentThread(handle),
1134        };
1135
1136        Ok(Runtime::from_parts(
1137            Scheduler::CurrentThread(scheduler),
1138            handle,
1139            blocking_pool,
1140        ))
1141    }
1142
1143    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1144        if self.metrics_poll_count_histogram_enable {
1145            Some(self.metrics_poll_count_histogram.clone())
1146        } else {
1147            None
1148        }
1149    }
1150}
1151
1152cfg_io_driver! {
1153    impl Builder {
1154        /// Enables the I/O driver.
1155        ///
1156        /// Doing this enables using net, process, signal, and some I/O types on
1157        /// the runtime.
1158        ///
1159        /// # Examples
1160        ///
1161        /// ```
1162        /// use tokio::runtime;
1163        ///
1164        /// let rt = runtime::Builder::new_multi_thread()
1165        ///     .enable_io()
1166        ///     .build()
1167        ///     .unwrap();
1168        /// ```
1169        pub fn enable_io(&mut self) -> &mut Self {
1170            self.enable_io = true;
1171            self
1172        }
1173
1174        /// Enables the I/O driver and configures the max number of events to be
1175        /// processed per tick.
1176        ///
1177        /// # Examples
1178        ///
1179        /// ```
1180        /// use tokio::runtime;
1181        ///
1182        /// let rt = runtime::Builder::new_current_thread()
1183        ///     .enable_io()
1184        ///     .max_io_events_per_tick(1024)
1185        ///     .build()
1186        ///     .unwrap();
1187        /// ```
1188        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1189            self.nevents = capacity;
1190            self
1191        }
1192    }
1193}
1194
1195cfg_time! {
1196    impl Builder {
1197        /// Enables the time driver.
1198        ///
1199        /// Doing this enables using `tokio::time` on the runtime.
1200        ///
1201        /// # Examples
1202        ///
1203        /// ```
1204        /// use tokio::runtime;
1205        ///
1206        /// let rt = runtime::Builder::new_multi_thread()
1207        ///     .enable_time()
1208        ///     .build()
1209        ///     .unwrap();
1210        /// ```
1211        pub fn enable_time(&mut self) -> &mut Self {
1212            self.enable_time = true;
1213            self
1214        }
1215    }
1216}
1217
1218cfg_test_util! {
1219    impl Builder {
1220        /// Controls if the runtime's clock starts paused or advancing.
1221        ///
1222        /// Pausing time requires the current-thread runtime; construction of
1223        /// the runtime will panic otherwise.
1224        ///
1225        /// # Examples
1226        ///
1227        /// ```
1228        /// use tokio::runtime;
1229        ///
1230        /// let rt = runtime::Builder::new_current_thread()
1231        ///     .enable_time()
1232        ///     .start_paused(true)
1233        ///     .build()
1234        ///     .unwrap();
1235        /// ```
1236        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1237            self.start_paused = start_paused;
1238            self
1239        }
1240    }
1241}
1242
1243cfg_rt_multi_thread! {
1244    impl Builder {
1245        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1246            use crate::loom::sys::num_cpus;
1247            use crate::runtime::{Config, runtime::Scheduler};
1248            use crate::runtime::scheduler::{self, MultiThread};
1249
1250            let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1251
1252            let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
1253
1254            // Create the blocking pool
1255            let blocking_pool =
1256                blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1257            let blocking_spawner = blocking_pool.spawner().clone();
1258
1259            // Generate a rng seed for this runtime.
1260            let seed_generator_1 = self.seed_generator.next_generator();
1261            let seed_generator_2 = self.seed_generator.next_generator();
1262
1263            let (scheduler, handle, launch) = MultiThread::new(
1264                core_threads,
1265                driver,
1266                driver_handle,
1267                blocking_spawner,
1268                seed_generator_2,
1269                Config {
1270                    before_park: self.before_park.clone(),
1271                    after_unpark: self.after_unpark.clone(),
1272                    global_queue_interval: self.global_queue_interval,
1273                    event_interval: self.event_interval,
1274                    local_queue_capacity: self.local_queue_capacity,
1275                    #[cfg(tokio_unstable)]
1276                    unhandled_panic: self.unhandled_panic.clone(),
1277                    disable_lifo_slot: self.disable_lifo_slot,
1278                    seed_generator: seed_generator_1,
1279                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1280                },
1281            );
1282
1283            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1284
1285            // Spawn the thread pool workers
1286            let _enter = handle.enter();
1287            launch.launch();
1288
1289            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1290        }
1291
1292        cfg_unstable! {
1293            fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
1294                use crate::loom::sys::num_cpus;
1295                use crate::runtime::{Config, runtime::Scheduler};
1296                use crate::runtime::scheduler::MultiThreadAlt;
1297
1298                let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1299                let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
1300
1301                // Create the blocking pool
1302                let blocking_pool =
1303                    blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1304                let blocking_spawner = blocking_pool.spawner().clone();
1305
1306                // Generate a rng seed for this runtime.
1307                let seed_generator_1 = self.seed_generator.next_generator();
1308                let seed_generator_2 = self.seed_generator.next_generator();
1309
1310                let (scheduler, handle) = MultiThreadAlt::new(
1311                    core_threads,
1312                    driver,
1313                    driver_handle,
1314                    blocking_spawner,
1315                    seed_generator_2,
1316                    Config {
1317                        before_park: self.before_park.clone(),
1318                        after_unpark: self.after_unpark.clone(),
1319                        global_queue_interval: self.global_queue_interval,
1320                        event_interval: self.event_interval,
1321                        local_queue_capacity: self.local_queue_capacity,
1322                        #[cfg(tokio_unstable)]
1323                        unhandled_panic: self.unhandled_panic.clone(),
1324                        disable_lifo_slot: self.disable_lifo_slot,
1325                        seed_generator: seed_generator_1,
1326                        metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1327                    },
1328                );
1329
1330                Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
1331            }
1332        }
1333    }
1334}
1335
1336impl fmt::Debug for Builder {
1337    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1338        fmt.debug_struct("Builder")
1339            .field("worker_threads", &self.worker_threads)
1340            .field("max_blocking_threads", &self.max_blocking_threads)
1341            .field(
1342                "thread_name",
1343                &"<dyn Fn() -> String + Send + Sync + 'static>",
1344            )
1345            .field("thread_stack_size", &self.thread_stack_size)
1346            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1347            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1348            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1349            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1350            .finish()
1351    }
1352}