// 导入必要的模块 use std::future::Future; // 根据配置导入必要的模块 #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; #[cfg(all(unix, feature = "legacy"))] use crate::LegacyDriver; // 从 crate 中导入必要的模块 use crate::{ driver::Driver, scheduler::{LocalScheduler, TaskQueue}, task::{ new_task, waker_fn::{dummy_waker, set_poll, should_poll}, JoinHandle, }, time::driver::Handle as TimeHandle, }; // 为默认上下文声明一个线程本地变量 #[cfg(feature = "sync")] thread_local! { pub(crate) static DEFAULT_CTX: Context = Context { thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID, // 线程 ID unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // unpark 句柄缓存 waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // waker 发送器缓存 tasks: Default::default(), // 任务队列 time_handle: None, // 时间句柄 blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic), // 阻塞句柄 }; } // 为当前上下文声明一个作用域线程本地变量 线程局部变量 静态变量 // CURRENT.with(|ctx| {}); // 闭包内可以访问当前线程的 Context scoped_thread_local!(pub(crate) static CURRENT: Context); /// 上下文结构体,包含任务队列、线程 ID、unpark 句柄、waker 发送器缓存、时间句柄和阻塞句柄 pub(crate) struct Context { /// Owned task set and local run queue /// 拥有的任务集和本地运行队列 pub(crate) tasks: TaskQueue, /// Thread id(not the kernel thread id but a generated unique number) /// 线程 ID(不是内核线程 ID,而是生成的唯一数字) pub(crate) thread_id: usize, /// Thread unpark handles /// 线程 unpark 句柄 #[cfg(feature = "sync")] pub(crate) unpark_cache: std::cell::RefCell>, /// Waker sender cache /// waker 发送器缓存 #[cfg(feature = "sync")] pub(crate) waker_sender_cache: std::cell::RefCell>>, /// Time Handle /// 时间句柄 pub(crate) time_handle: Option, /// Blocking Handle /// 阻塞句柄 #[cfg(feature = "sync")] pub(crate) blocking_handle: crate::blocking::BlockingHandle, } impl Context { /// 创建一个带有阻塞句柄的新上下文 #[cfg(feature = "sync")] pub(crate) fn new(blocking_handle: crate::blocking::BlockingHandle) -> Self { let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); Self { thread_id, unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), tasks: TaskQueue::default(), time_handle: None, blocking_handle, } } /// 创建一个不带阻塞句柄的新上下文 #[cfg(not(feature = "sync"))] pub(crate) fn new() -> Self { let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); Self { thread_id, tasks: TaskQueue::default(), time_handle: None, } } /// unpark 给定 ID 的线程 #[allow(unused)] #[cfg(feature = "sync")] pub(crate) fn unpark_thread(&self, id: usize) { use crate::driver::{thread::get_unpark_handle, unpark::Unpark}; if let Some(handle) = self.unpark_cache.borrow().get(&id) { handle.unpark(); return; } if let Some(v) = get_unpark_handle(id) { // Write back to local cache // 写回到本地缓存 let w = v.clone(); self.unpark_cache.borrow_mut().insert(id, w); v.unpark(); } } /// 发送 waker 到给定 ID 的线程 #[allow(unused)] #[cfg(feature = "sync")] pub(crate) fn send_waker(&self, id: usize, w: std::task::Waker) { use crate::driver::thread::get_waker_sender; if let Some(sender) = self.waker_sender_cache.borrow().get(&id) { let _ = sender.send(w); return; } if let Some(s) = get_waker_sender(id) { // Write back to local cache // 写回到本地缓存 let _ = s.send(w); self.waker_sender_cache.borrow_mut().insert(id, s); } } } /// Monoio runtime /// Monoio 运行时结构体,包含上下文和驱动程序 pub struct Runtime { pub(crate) context: Context, pub(crate) driver: D, } impl Runtime { /// 使用给定的上下文和驱动程序创建一个新的运行时 pub(crate) fn new(context: Context, driver: D) -> Self { Self { context, driver } } /// 主循环入口 pub fn block_on(&mut self, future: F) -> F::Output where F: Future, D: Driver, { // 不能在运行时内部启动运行时 assert!( !CURRENT.is_set(), "Can not start a runtime inside a runtime" ); let waker = dummy_waker(); // 占位符 waker let cx = &mut std::task::Context::from_waker(&waker); // 标准库上下文 self.driver.with(|| { CURRENT.set(&self.context, || { // #[cfg(feature = "sync")] // let join = unsafe { spawn_without_static(future) }; // #[cfg(not(feature = "sync"))] // join = future let join = future; let mut join = std::pin::pin!(join); set_poll(); // 设置 SHOULD_POLL true loop { loop { // Consume all tasks(with max round to prevent io starvation) // 消费所有 task (最大轮数防止 io 饥饿) let mut max_round = self.context.tasks.len() * 2; // Maximum round | Force exit when reaching the maximum round while let Some(t) = self.context.tasks.pop() { t.run(); // 执行任务 运行 (x.poll) // 避免无限循环 if max_round == 0 { // maybe there's a looping task // 或许这里 有一个 死循环 task 将 max_round 消耗到 0 了 break; } else { max_round -= 1; } } // Check main future | 这里才第一次运行 join while should_poll() { // check if ready if let std::task::Poll::Ready(t) = join.as_mut().poll(cx) { return t; } } if self.context.tasks.is_empty() { // No task to execute, we should wait for io blockingly // Hot path // 就绪队列为空 // 经常执行部分 break; } // Cold path | 到这一步比较少 let _ = self.driver.submit(); } // Wait and Process CQ(the error is ignored for not debug mode) #[cfg(not(all(debug_assertions, feature = "debug")))] let _ = self.driver.park(); // 无限等待并处理返回的事件 #[cfg(all(debug_assertions, feature = "debug"))] if let Err(e) = self.driver.park() { trace!("park error: {:?}", e); } } }) }) } } /// Fusion Runtime is a wrapper of io_uring driver or legacy driver based runtime. /// Fusion Runtime 是 io_uring driver 或者 legacy driver 的运行时的包装器 | 依照平台自动探测 /// 所以很复杂 #[cfg(all(unix, feature = "legacy"))] pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> { /// Uring driver based runtime. #[cfg(all(target_os = "linux", feature = "iouring"))] Uring(Runtime), /// Legacy driver based runtime. Legacy(Runtime), } /// Fusion Runtime is a wrapper of io_uring driver or legacy driver based /// runtime. #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))] pub enum FusionRuntime { /// Uring driver based runtime. Uring(Runtime), } #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] impl FusionRuntime where L: Driver, R: Driver, { /// Block on | 根据平台自动选择 pub fn block_on(&mut self, future: F) -> F::Output where F: Future, { match self { FusionRuntime::Uring(inner) => { info!("Monoio is running with io_uring driver"); inner.block_on(future) } FusionRuntime::Legacy(inner) => { info!("Monoio is running with legacy driver"); inner.block_on(future) } } } } #[cfg(all( unix, feature = "legacy", not(all(target_os = "linux", feature = "iouring")) ))] impl FusionRuntime where R: Driver, { /// Block on pub fn block_on(&mut self, future: F) -> F::Output where F: Future, { match self { FusionRuntime::Legacy(inner) => inner.block_on(future), } } } #[cfg(all(not(feature = "legacy"), all(target_os = "linux", feature = "iouring")))] impl FusionRuntime where R: Driver, { /// Block on pub fn block_on(&mut self, future: F) -> F::Output where F: Future, { match self { FusionRuntime::Uring(inner) => inner.block_on(future), } } } // L -> Fusion #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] impl From> for FusionRuntime { fn from(r: Runtime) -> Self { Self::Uring(r) } } // TL -> Fusion #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] impl From>> for FusionRuntime, TimeDriver> { fn from(r: Runtime>) -> Self { Self::Uring(r) } } // R -> Fusion #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] impl From> for FusionRuntime { fn from(r: Runtime) -> Self { Self::Legacy(r) } } // TR -> Fusion #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] impl From>> for FusionRuntime, TimeDriver> { fn from(r: Runtime>) -> Self { Self::Legacy(r) } } // R -> Fusion #[cfg(all( unix, feature = "legacy", not(all(target_os = "linux", feature = "iouring")) ))] impl From> for FusionRuntime { fn from(r: Runtime) -> Self { Self::Legacy(r) } } // TR -> Fusion #[cfg(all( unix, feature = "legacy", not(all(target_os = "linux", feature = "iouring")) ))] impl From>> for FusionRuntime> { fn from(r: Runtime>) -> Self { Self::Legacy(r) } } // L -> Fusion #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))] impl From> for FusionRuntime { fn from(r: Runtime) -> Self { Self::Uring(r) } } // TL -> Fusion #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))] impl From>> for FusionRuntime> { fn from(r: Runtime>) -> Self { Self::Uring(r) } } /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. /// 生成新 async 任务, 返回 JoinHandle 实例 /// /// Spawning a task enables the task to execute concurrently to other tasks. /// There is no guarantee that a spawned task will execute to completion. When a /// runtime is shutdown, all outstanding tasks are dropped, regardless of the /// lifecycle of that task. /// 生成的任务将和 其他任务 并发执行(都在就绪队列里) /// 但无法保证 生成的任务 一定执行完毕,runtime 结束时,未完成任务都会被 drop /// /// /// [`JoinHandle`]: monoio::task::JoinHandle /// /// # Examples /// /// In this example, a server is started and `spawn` is used to start a new task /// that processes each received connection. /// /// ```no_run /// #[monoio::main] /// async fn main() { /// let handle = monoio::spawn(async { /// println!("hello from a background task"); /// }); /// /// // Let the task complete /// handle.await; /// } /// ``` pub fn spawn(future: T) -> JoinHandle where T: Future + 'static, T::Output: 'static, { let (task, join) = new_task( crate::utils::thread_id::get_current_thread_id(), // 当前线程 id future, // 任务 LocalScheduler, // 调度器 ); CURRENT.with(|ctx| { ctx.tasks.push(task); // 就绪队列 push }); join } #[cfg(feature = "sync")] unsafe fn spawn_without_static(future: T) -> JoinHandle where T: Future, { use crate::task::new_task_holding; let (task, join) = new_task_holding( crate::utils::thread_id::get_current_thread_id(), future, LocalScheduler, ); CURRENT.with(|ctx| { ctx.tasks.push(task); // 就绪队列 push }); join } #[cfg(test)] mod tests { #[cfg(all(feature = "sync", target_os = "linux", feature = "iouring"))] #[test] fn across_thread() { use futures::channel::oneshot; use crate::driver::IoUringDriver; let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = oneshot::channel::(); std::thread::spawn(move || { let mut rt = crate::RuntimeBuilder::::new() .build() .unwrap(); rt.block_on(async move { let n = rx1.await.expect("unable to receive rx1"); assert!(tx2.send(n).is_ok()); }); }); let mut rt = crate::RuntimeBuilder::::new() .build() .unwrap(); rt.block_on(async move { assert!(tx1.send(24).is_ok()); assert_eq!(rx2.await.expect("unable to receive rx2"), 24); }); } #[cfg(all(target_os = "linux", feature = "iouring"))] #[test] fn timer() { use crate::driver::IoUringDriver; let mut rt = crate::RuntimeBuilder::::new() .enable_timer() .build() .unwrap(); let instant = std::time::Instant::now(); rt.block_on(async { crate::time::sleep(std::time::Duration::from_millis(200)).await; }); let eps = instant.elapsed().subsec_millis(); assert!((eps as i32 - 200).abs() < 50); } }