//! Blocking tasks related. //! 与阻塞任务相关的模块。 use std::{future::Future, task::Poll}; use threadpool::{Builder as ThreadPoolBuilder, ThreadPool as ThreadPoolImpl}; use crate::{ task::{new_task, JoinHandle}, utils::thread_id::DEFAULT_THREAD_ID, }; /// Users may implement a ThreadPool and attach it to runtime. /// We also provide an implementation based on threadpool crate, you can use DefaultThreadPool. /// 用户可以实现一个ThreadPool并将其附加到运行时 或者 我们也提供了默认实现 DefaultThreadPool。 /// ThreadPool trait 定义了一个 schedule_task 方法,用于在 spawn_blocking 时调用。 pub trait ThreadPool { fn schedule_task(&self, task: BlockingTask); } /// Error on waiting blocking task. /// 等待阻塞任务时的错误。 #[derive(Debug, Clone, Copy)] pub enum JoinError { /// Task is canceled. /// 任务被取消。 Canceled, } /// BlockingTask is contrusted by monoio, ThreadPool impl /// will execute it with `.run()`. /// BlockingTask 由 monoio 构造,ThreadPool impl 将使用 `.run()` 执行它。 pub struct BlockingTask { task: Option>, blocking_vtable: &'static BlockingTaskVtable, } unsafe impl Send for BlockingTask {} struct BlockingTaskVtable { pub(crate) drop: unsafe fn(&mut crate::task::Task), } fn blocking_vtable() -> &'static BlockingTaskVtable { &BlockingTaskVtable { drop: blocking_task_drop::, } } fn blocking_task_drop(task: &mut crate::task::Task) { let mut opt: Option> = Some(Err(JoinError::Canceled)); unsafe { task.finish((&mut opt) as *mut _ as *mut ()) }; } impl Drop for BlockingTask { fn drop(&mut self) { if let Some(task) = self.task.as_mut() { unsafe { (self.blocking_vtable.drop)(task) }; } } } impl BlockingTask { /// 运行任务。 #[inline] pub fn run(mut self) { let task = self.task.take().unwrap(); task.run(); // // if we are within a runtime, just run it. // if crate::runtime::CURRENT.is_set() { // task.run(); // return; // } // // if we are on a standalone thread, we will use thread local ctx as Context. // crate::runtime::DEFAULT_CTX.with(|ctx| { // crate::runtime::CURRENT.set(ctx, || task.run()); // }); } } /// BlockingStrategy can be set if there is no ThreadPool attached. /// It controls how to handle `spawn_blocking` without thread pool. /// BlockingStrategy 可以在没有 ThreadPool 附加时设置。它控制如何处理没有线程池的 `spawn_blocking`。 #[derive(Clone, Copy, Debug)] pub enum BlockingStrategy { /// Panic when `spawn_blocking`. /// `spawn_blocking` 时 panic。 Panic, /// Execute with current thread when `spawn_blocking`. /// `spawn_blocking` 时使用当前线程执行。 ExecuteLocal, } /// `spawn_blocking` is used for executing a task(without async) with heavy computation or blocking /// io. To used it, users may initialize a thread pool and attach it on creating runtime. /// Users can also set `BlockingStrategy` for a runtime when there is no thread pool. /// WARNING: DO NOT USE THIS FOR ASYNC TASK! Async tasks will not be executed but only built the /// future! /// `spawn_blocking` 用于执行具有重计算或阻塞 io 的任务(没有 /// async)。要使用它,用户可以初始化线程池并将其附加到创建的运行时。 /// 当没有线程池时,用户还可以为运行时设置 `BlockingStrategy`。 /// 警告:不要将其用于异步任务!异步任务将不会执行,而只会构建 future! pub fn spawn_blocking(func: F) -> JoinHandle> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { let fut = BlockingFuture(Some(func)); let (task, join) = new_task(DEFAULT_THREAD_ID, fut, NoopScheduler); crate::runtime::CURRENT.with(|inner| { let handle = &inner.blocking_handle; match handle { BlockingHandle::Attached(shared) => shared.schedule_task(BlockingTask { task: Some(task), blocking_vtable: blocking_vtable::(), }), BlockingHandle::Empty(BlockingStrategy::ExecuteLocal) => task.run(), BlockingHandle::Empty(BlockingStrategy::Panic) => { // For users: if you see this panic, you have 2 choices: // 1. attach a shared thread pool to execute blocking tasks // 2. set runtime blocking strategy to `BlockingStrategy::ExecuteLocal` // Note: solution 2 will execute blocking task on current thread and may block other // tasks This may cause other tasks high latency. // 对于用户:如果看到此 panic,则有两个选择: // 1. 附加共享线程池以执行阻塞任务 // 2. 将运行时阻塞策略设置为 `BlockingStrategy::ExecuteLocal` // 注意:解决方案 2 将在当前线程上执行阻塞任务,可能会阻塞其他任务, // 这可能会导致其他任务高延迟。 panic!("execute blocking task without thread pool attached") } } }); join } /// DefaultThreadPool is a simple wrapped `threadpool::ThreadPool` that implement /// `monoio::blocking::ThreadPool`. You may use this implementation, or you can use your own thread /// pool implementation. /// DefaultThreadPool 是一个简单的包装了 `threadpool::ThreadPool` 的实现,实现了 /// `monoio::blocking::ThreadPool`。 您可以使用此实现,也可以使用自己的线程池实现。 #[derive(Clone)] pub struct DefaultThreadPool { pool: ThreadPoolImpl, } impl DefaultThreadPool { /// 创建一个新的 DefaultThreadPool。 pub fn new(num_threads: usize) -> Self { let pool = ThreadPoolBuilder::default() .num_threads(num_threads) .build(); Self { pool } } } impl ThreadPool for DefaultThreadPool { fn schedule_task(&self, task: BlockingTask) { self.pool.execute(move || task.run()); } } // 完全公平调度 pub(crate) struct NoopScheduler; impl crate::task::Schedule for NoopScheduler { fn schedule(&self, _task: crate::task::Task) { unreachable!() } fn yield_now(&self, _task: crate::task::Task) { unreachable!() } } pub(crate) enum BlockingHandle { Attached(Box), Empty(BlockingStrategy), } impl From for BlockingHandle { fn from(value: BlockingStrategy) -> Self { Self::Empty(value) } } struct BlockingFuture(Option); impl Unpin for BlockingFuture {} impl Future for BlockingFuture where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { type Output = Result; fn poll( mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll { let me = &mut *self; let func = me.0.take().expect("blocking task ran twice."); Poll::Ready(Ok(func())) } } #[cfg(test)] mod tests { use super::DefaultThreadPool; /// NaiveThreadPool always create a new thread on executing tasks. struct NaiveThreadPool; impl super::ThreadPool for NaiveThreadPool { fn schedule_task(&self, task: super::BlockingTask) { std::thread::spawn(move || { task.run(); }); } } /// FakeThreadPool always drop tasks. struct FakeThreadPool; impl super::ThreadPool for FakeThreadPool { fn schedule_task(&self, _task: super::BlockingTask) {} } #[test] fn hello_blocking() { let shared_pool = Box::new(NaiveThreadPool); let mut rt = crate::RuntimeBuilder::::new() .attach_thread_pool(shared_pool) .enable_timer() .build() .unwrap(); rt.block_on(async { let begin = std::time::Instant::now(); let join = crate::spawn_blocking(|| { // Simulate a heavy computation. std::thread::sleep(std::time::Duration::from_millis(400)); "hello spawn_blocking!".to_string() }); let sleep_async = crate::time::sleep(std::time::Duration::from_millis(400)); let (result, _) = crate::join!(join, sleep_async); let eps = begin.elapsed(); assert!(eps < std::time::Duration::from_millis(800)); assert!(eps >= std::time::Duration::from_millis(400)); assert_eq!(result.unwrap(), "hello spawn_blocking!"); }); } #[test] #[should_panic] fn blocking_panic() { let mut rt = crate::RuntimeBuilder::::new() .with_blocking_strategy(crate::blocking::BlockingStrategy::Panic) .enable_timer() .build() .unwrap(); rt.block_on(async { let join = crate::spawn_blocking(|| 1); let _ = join.await; }); } #[test] fn blocking_current() { let mut rt = crate::RuntimeBuilder::::new() .with_blocking_strategy(crate::blocking::BlockingStrategy::ExecuteLocal) .enable_timer() .build() .unwrap(); rt.block_on(async { let begin = std::time::Instant::now(); let join = crate::spawn_blocking(|| { // Simulate a heavy computation. std::thread::sleep(std::time::Duration::from_millis(100)); "hello spawn_blocking!".to_string() }); let sleep_async = crate::time::sleep(std::time::Duration::from_millis(100)); let (result, _) = crate::join!(join, sleep_async); let eps = begin.elapsed(); assert!(eps > std::time::Duration::from_millis(200)); assert_eq!(result.unwrap(), "hello spawn_blocking!"); }); } #[test] fn drop_task() { let shared_pool = Box::new(FakeThreadPool); let mut rt = crate::RuntimeBuilder::::new() .attach_thread_pool(shared_pool) .enable_timer() .build() .unwrap(); rt.block_on(async { let ret = crate::spawn_blocking(|| 1).await; assert!(matches!(ret, Err(super::JoinError::Canceled))); }); } #[test] fn default_pool() { let shared_pool = Box::new(DefaultThreadPool::new(3)); let mut rt = crate::RuntimeBuilder::::new() .attach_thread_pool(shared_pool) .enable_timer() .build() .unwrap(); rt.block_on(async { let begin = std::time::Instant::now(); let join1 = crate::spawn_blocking(|| { // Simulate a heavy computation. std::thread::sleep(std::time::Duration::from_millis(150)); "hello spawn_blocking1!".to_string() }); let join2 = crate::spawn_blocking(|| { // Simulate a heavy computation. std::thread::sleep(std::time::Duration::from_millis(150)); "hello spawn_blocking2!".to_string() }); let join3 = crate::spawn_blocking(|| { // Simulate a heavy computation. std::thread::sleep(std::time::Duration::from_millis(150)); "hello spawn_blocking3!".to_string() }); let join4 = crate::spawn_blocking(|| { // Simulate a heavy computation. std::thread::sleep(std::time::Duration::from_millis(150)); "hello spawn_blocking4!".to_string() }); let sleep_async = crate::time::sleep(std::time::Duration::from_millis(150)); let (result1, result2, result3, result4, _) = crate::join!(join1, join2, join3, join4, sleep_async); let eps = begin.elapsed(); assert!(eps < std::time::Duration::from_millis(590)); assert!(eps >= std::time::Duration::from_millis(150)); assert_eq!(result1.unwrap(), "hello spawn_blocking1!"); assert_eq!(result2.unwrap(), "hello spawn_blocking2!"); assert_eq!(result3.unwrap(), "hello spawn_blocking3!"); assert_eq!(result4.unwrap(), "hello spawn_blocking4!"); }); } }