diff options
Diffstat (limited to 'monoio/src/runtime.rs')
| -rw-r--r-- | monoio/src/runtime.rs | 76 |
1 files changed, 53 insertions, 23 deletions
diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index bffd1a7..d85f838 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -1,11 +1,15 @@ +// 导入必要的模块 use std::future::Future; +// 根据配置导入必要的模块 #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; #[cfg(all(unix, feature = "legacy"))] use crate::LegacyDriver; + +// 从 crate 中导入必要的模块 use crate::{ driver::Driver, scheduler::{LocalScheduler, TaskQueue}, @@ -17,46 +21,57 @@ use crate::{ time::driver::Handle as TimeHandle, }; +// 为默认上下文声明一个线程本地变量 #[cfg(feature = "sync")] thread_local! { pub(crate) static DEFAULT_CTX: Context = Context { - thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID, - unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), - waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), - tasks: Default::default(), - time_handle: None, - blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic), + thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID, // 线程 ID + unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // unpark 句柄缓存 + waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // waker 发送器缓存 + tasks: Default::default(), // 任务队列 + time_handle: None, // 时间句柄 + blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic), // 阻塞句柄 }; } +// 为当前上下文声明一个作用域线程本地变量 线程局部变量 静态变量 +// CURRENT.with(|ctx| {}); +// 闭包内可以访问当前线程的 Context scoped_thread_local!(pub(crate) static CURRENT: Context); +/// 上下文结构体,包含任务队列、线程 ID、unpark 句柄、waker 发送器缓存、时间句柄和阻塞句柄 pub(crate) struct Context { /// Owned task set and local run queue + /// 拥有的任务集和本地运行队列 pub(crate) tasks: TaskQueue, - /// Thread id(not the kernel thread id but a generated unique number) + /// 线程 ID(不是内核线程 ID,而是生成的唯一数字) pub(crate) thread_id: usize, /// Thread unpark handles + /// 线程 unpark 句柄 #[cfg(feature = "sync")] pub(crate) unpark_cache: std::cell::RefCell<fxhash::FxHashMap<usize, crate::driver::UnparkHandle>>, /// Waker sender cache + /// waker 发送器缓存 #[cfg(feature = "sync")] pub(crate) waker_sender_cache: std::cell::RefCell<fxhash::FxHashMap<usize, flume::Sender<std::task::Waker>>>, /// Time Handle + /// 时间句柄 pub(crate) time_handle: Option<TimeHandle>, /// Blocking Handle + /// 阻塞句柄 #[cfg(feature = "sync")] pub(crate) blocking_handle: crate::blocking::BlockingHandle, } impl Context { + /// 创建一个带有阻塞句柄的新上下文 #[cfg(feature = "sync")] pub(crate) fn new(blocking_handle: crate::blocking::BlockingHandle) -> Self { let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); @@ -71,6 +86,7 @@ impl Context { } } + /// 创建一个不带阻塞句柄的新上下文 #[cfg(not(feature = "sync"))] pub(crate) fn new() -> Self { let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); @@ -82,6 +98,7 @@ impl Context { } } + /// unpark 给定 ID 的线程 #[allow(unused)] #[cfg(feature = "sync")] pub(crate) fn unpark_thread(&self, id: usize) { @@ -93,12 +110,14 @@ impl Context { if let Some(v) = get_unpark_handle(id) { // Write back to local cache + // 写回到本地缓存 let w = v.clone(); self.unpark_cache.borrow_mut().insert(id, w); v.unpark(); } } + /// 发送 waker 到给定 ID 的线程 #[allow(unused)] #[cfg(feature = "sync")] pub(crate) fn send_waker(&self, id: usize, w: std::task::Waker) { @@ -110,6 +129,7 @@ impl Context { if let Some(s) = get_waker_sender(id) { // Write back to local cache + // 写回到本地缓存 let _ = s.send(w); self.waker_sender_cache.borrow_mut().insert(id, s); } @@ -117,29 +137,32 @@ impl Context { } /// Monoio runtime +/// Monoio 运行时结构体,包含上下文和驱动程序 pub struct Runtime<D> { pub(crate) context: Context, pub(crate) driver: D, } impl<D> Runtime<D> { + /// 使用给定的上下文和驱动程序创建一个新的运行时 pub(crate) fn new(context: Context, driver: D) -> Self { Self { context, driver } } - /// Block on + /// 阻塞给定的 future pub fn block_on<F>(&mut self, future: F) -> F::Output where F: Future, D: Driver, { + // 不能在运行时内部启动运行时 assert!( !CURRENT.is_set(), "Can not start a runtime inside a runtime" ); - let waker = dummy_waker(); - let cx = &mut std::task::Context::from_waker(&waker); + let waker = dummy_waker(); // 占位符 waker + let cx = &mut std::task::Context::from_waker(&waker); // 标准库上下文 self.driver.with(|| { CURRENT.set(&self.context, || { @@ -153,11 +176,14 @@ impl<D> Runtime<D> { loop { loop { // Consume all tasks(with max round to prevent io starvation) - let mut max_round = self.context.tasks.len() * 2; + // 消费所有 task (最大轮数防止 io 饥饿) + let mut max_round = self.context.tasks.len() * 2; // Maximum round | Force exit when reaching the maximum round while let Some(t) = self.context.tasks.pop() { - t.run(); + t.run(); // 执行任务 + // 避免无限循环 if max_round == 0 { // maybe there's a looping task + // 或许这里 有一个 死循环 task 将 max_round 消耗到 0 了 break; } else { max_round -= 1; @@ -175,16 +201,18 @@ impl<D> Runtime<D> { if self.context.tasks.is_empty() { // No task to execute, we should wait for io blockingly // Hot path + // 就绪队列为空 + // 经常执行部分 break; } - - // Cold path + // Cold path | 到这一步比较少 + // 待定 let _ = self.driver.submit(); } // Wait and Process CQ(the error is ignored for not debug mode) #[cfg(not(all(debug_assertions, feature = "debug")))] - let _ = self.driver.park(); + let _ = self.driver.park(); // 无限等待并处理返回的事件 ?? #[cfg(all(debug_assertions, feature = "debug"))] if let Err(e) = self.driver.park() { @@ -196,8 +224,8 @@ impl<D> Runtime<D> { } } -/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based -/// runtime. +/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based runtime. +/// Fusion Runtime 是 io_uring driver 或者 legacy driver 的运行时的包装器 | 依照平台自动探测 #[cfg(all(unix, feature = "legacy"))] pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> { /// Uring driver based runtime. @@ -221,7 +249,7 @@ where L: Driver, R: Driver, { - /// Block on + /// Block on | 根据平台自动选择 pub fn block_on<F>(&mut self, future: F) -> F::Output where F: Future, @@ -352,11 +380,13 @@ impl From<Runtime<TimeDriver<IoUringDriver>>> for FusionRuntime<TimeDriver<IoUri } /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. +/// 生成新 async 任务, 返回 JoinHandle 实例 /// /// Spawning a task enables the task to execute concurrently to other tasks. /// There is no guarantee that a spawned task will execute to completion. When a /// runtime is shutdown, all outstanding tasks are dropped, regardless of the /// lifecycle of that task. +/// 生成任务允许任务并发执行,当一个任务关闭后,其他任务一同结束. /// /// /// [`JoinHandle`]: monoio::task::JoinHandle @@ -383,13 +413,13 @@ where T::Output: 'static, { let (task, join) = new_task( - crate::utils::thread_id::get_current_thread_id(), - future, - LocalScheduler, + crate::utils::thread_id::get_current_thread_id(), // 当前线程 id + future, // 任务 + LocalScheduler, // 调度器 ); CURRENT.with(|ctx| { - ctx.tasks.push(task); + ctx.tasks.push(task); // 就绪队列 push }); join } @@ -407,7 +437,7 @@ where ); CURRENT.with(|ctx| { - ctx.tasks.push(task); + ctx.tasks.push(task); // 就绪队列 push }); join } |
