summaryrefslogtreecommitdiff
path: root/monoio/src/runtime.rs
diff options
context:
space:
mode:
Diffstat (limited to 'monoio/src/runtime.rs')
-rw-r--r--monoio/src/runtime.rs76
1 files changed, 53 insertions, 23 deletions
diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs
index bffd1a7..d85f838 100644
--- a/monoio/src/runtime.rs
+++ b/monoio/src/runtime.rs
@@ -1,11 +1,15 @@
+// 导入必要的模块
use std::future::Future;
+// 根据配置导入必要的模块
#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
use crate::time::TimeDriver;
#[cfg(all(target_os = "linux", feature = "iouring"))]
use crate::IoUringDriver;
#[cfg(all(unix, feature = "legacy"))]
use crate::LegacyDriver;
+
+// 从 crate 中导入必要的模块
use crate::{
driver::Driver,
scheduler::{LocalScheduler, TaskQueue},
@@ -17,46 +21,57 @@ use crate::{
time::driver::Handle as TimeHandle,
};
+// 为默认上下文声明一个线程本地变量
#[cfg(feature = "sync")]
thread_local! {
pub(crate) static DEFAULT_CTX: Context = Context {
- thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID,
- unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()),
- waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()),
- tasks: Default::default(),
- time_handle: None,
- blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic),
+ thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID, // 线程 ID
+ unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // unpark 句柄缓存
+ waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // waker 发送器缓存
+ tasks: Default::default(), // 任务队列
+ time_handle: None, // 时间句柄
+ blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic), // 阻塞句柄
};
}
+// 为当前上下文声明一个作用域线程本地变量 线程局部变量 静态变量
+// CURRENT.with(|ctx| {});
+// 闭包内可以访问当前线程的 Context
scoped_thread_local!(pub(crate) static CURRENT: Context);
+/// 上下文结构体,包含任务队列、线程 ID、unpark 句柄、waker 发送器缓存、时间句柄和阻塞句柄
pub(crate) struct Context {
/// Owned task set and local run queue
+ /// 拥有的任务集和本地运行队列
pub(crate) tasks: TaskQueue,
-
/// Thread id(not the kernel thread id but a generated unique number)
+ /// 线程 ID(不是内核线程 ID,而是生成的唯一数字)
pub(crate) thread_id: usize,
/// Thread unpark handles
+ /// 线程 unpark 句柄
#[cfg(feature = "sync")]
pub(crate) unpark_cache:
std::cell::RefCell<fxhash::FxHashMap<usize, crate::driver::UnparkHandle>>,
/// Waker sender cache
+ /// waker 发送器缓存
#[cfg(feature = "sync")]
pub(crate) waker_sender_cache:
std::cell::RefCell<fxhash::FxHashMap<usize, flume::Sender<std::task::Waker>>>,
/// Time Handle
+ /// 时间句柄
pub(crate) time_handle: Option<TimeHandle>,
/// Blocking Handle
+ /// 阻塞句柄
#[cfg(feature = "sync")]
pub(crate) blocking_handle: crate::blocking::BlockingHandle,
}
impl Context {
+ /// 创建一个带有阻塞句柄的新上下文
#[cfg(feature = "sync")]
pub(crate) fn new(blocking_handle: crate::blocking::BlockingHandle) -> Self {
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
@@ -71,6 +86,7 @@ impl Context {
}
}
+ /// 创建一个不带阻塞句柄的新上下文
#[cfg(not(feature = "sync"))]
pub(crate) fn new() -> Self {
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
@@ -82,6 +98,7 @@ impl Context {
}
}
+ /// unpark 给定 ID 的线程
#[allow(unused)]
#[cfg(feature = "sync")]
pub(crate) fn unpark_thread(&self, id: usize) {
@@ -93,12 +110,14 @@ impl Context {
if let Some(v) = get_unpark_handle(id) {
// Write back to local cache
+ // 写回到本地缓存
let w = v.clone();
self.unpark_cache.borrow_mut().insert(id, w);
v.unpark();
}
}
+ /// 发送 waker 到给定 ID 的线程
#[allow(unused)]
#[cfg(feature = "sync")]
pub(crate) fn send_waker(&self, id: usize, w: std::task::Waker) {
@@ -110,6 +129,7 @@ impl Context {
if let Some(s) = get_waker_sender(id) {
// Write back to local cache
+ // 写回到本地缓存
let _ = s.send(w);
self.waker_sender_cache.borrow_mut().insert(id, s);
}
@@ -117,29 +137,32 @@ impl Context {
}
/// Monoio runtime
+/// Monoio 运行时结构体,包含上下文和驱动程序
pub struct Runtime<D> {
pub(crate) context: Context,
pub(crate) driver: D,
}
impl<D> Runtime<D> {
+ /// 使用给定的上下文和驱动程序创建一个新的运行时
pub(crate) fn new(context: Context, driver: D) -> Self {
Self { context, driver }
}
- /// Block on
+ /// 阻塞给定的 future
pub fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
D: Driver,
{
+ // 不能在运行时内部启动运行时
assert!(
!CURRENT.is_set(),
"Can not start a runtime inside a runtime"
);
- let waker = dummy_waker();
- let cx = &mut std::task::Context::from_waker(&waker);
+ let waker = dummy_waker(); // 占位符 waker
+ let cx = &mut std::task::Context::from_waker(&waker); // 标准库上下文
self.driver.with(|| {
CURRENT.set(&self.context, || {
@@ -153,11 +176,14 @@ impl<D> Runtime<D> {
loop {
loop {
// Consume all tasks(with max round to prevent io starvation)
- let mut max_round = self.context.tasks.len() * 2;
+ // 消费所有 task (最大轮数防止 io 饥饿)
+ let mut max_round = self.context.tasks.len() * 2; // Maximum round | Force exit when reaching the maximum round
while let Some(t) = self.context.tasks.pop() {
- t.run();
+ t.run(); // 执行任务
+ // 避免无限循环
if max_round == 0 {
// maybe there's a looping task
+ // 或许这里 有一个 死循环 task 将 max_round 消耗到 0 了
break;
} else {
max_round -= 1;
@@ -175,16 +201,18 @@ impl<D> Runtime<D> {
if self.context.tasks.is_empty() {
// No task to execute, we should wait for io blockingly
// Hot path
+ // 就绪队列为空
+ // 经常执行部分
break;
}
-
- // Cold path
+ // Cold path | 到这一步比较少
+ // 待定
let _ = self.driver.submit();
}
// Wait and Process CQ(the error is ignored for not debug mode)
#[cfg(not(all(debug_assertions, feature = "debug")))]
- let _ = self.driver.park();
+ let _ = self.driver.park(); // 无限等待并处理返回的事件 ??
#[cfg(all(debug_assertions, feature = "debug"))]
if let Err(e) = self.driver.park() {
@@ -196,8 +224,8 @@ impl<D> Runtime<D> {
}
}
-/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based
-/// runtime.
+/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based runtime.
+/// Fusion Runtime 是 io_uring driver 或者 legacy driver 的运行时的包装器 | 依照平台自动探测
#[cfg(all(unix, feature = "legacy"))]
pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> {
/// Uring driver based runtime.
@@ -221,7 +249,7 @@ where
L: Driver,
R: Driver,
{
- /// Block on
+ /// Block on | 根据平台自动选择
pub fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
@@ -352,11 +380,13 @@ impl From<Runtime<TimeDriver<IoUringDriver>>> for FusionRuntime<TimeDriver<IoUri
}
/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
+/// 生成新 async 任务, 返回 JoinHandle 实例
///
/// Spawning a task enables the task to execute concurrently to other tasks.
/// There is no guarantee that a spawned task will execute to completion. When a
/// runtime is shutdown, all outstanding tasks are dropped, regardless of the
/// lifecycle of that task.
+/// 生成任务允许任务并发执行,当一个任务关闭后,其他任务一同结束.
///
///
/// [`JoinHandle`]: monoio::task::JoinHandle
@@ -383,13 +413,13 @@ where
T::Output: 'static,
{
let (task, join) = new_task(
- crate::utils::thread_id::get_current_thread_id(),
- future,
- LocalScheduler,
+ crate::utils::thread_id::get_current_thread_id(), // 当前线程 id
+ future, // 任务
+ LocalScheduler, // 调度器
);
CURRENT.with(|ctx| {
- ctx.tasks.push(task);
+ ctx.tasks.push(task); // 就绪队列 push
});
join
}
@@ -407,7 +437,7 @@ where
);
CURRENT.with(|ctx| {
- ctx.tasks.push(task);
+ ctx.tasks.push(task); // 就绪队列 push
});
join
}