summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--monoio/src/blocking.rs28
-rw-r--r--monoio/src/driver/mod.rs7
-rw-r--r--monoio/src/net/tcp/listener.rs16
-rw-r--r--monoio/src/runtime.rs76
-rw-r--r--monoio/src/scheduler.rs30
-rw-r--r--monoio/src/task/core.rs5
-rw-r--r--monoio/src/task/harness.rs16
-rw-r--r--monoio/src/task/mod.rs45
-rw-r--r--monoio/src/task/raw.rs12
-rw-r--r--monoio/src/task/state.rs35
-rw-r--r--monoio/src/task/waker_fn.rs9
11 files changed, 201 insertions, 78 deletions
diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs
index 7d3469d..c8262c4 100644
--- a/monoio/src/blocking.rs
+++ b/monoio/src/blocking.rs
@@ -1,4 +1,5 @@
//! Blocking tasks related.
+//! 与阻塞任务相关的模块。
use std::{future::Future, task::Poll};
@@ -11,21 +12,24 @@ use crate::{
/// Users may implement a ThreadPool and attach it to runtime.
/// We also provide an implementation based on threadpool crate, you can use DefaultThreadPool.
+/// 用户可以实现一个ThreadPool并将其附加到运行时 或者 我们也提供了默认实现 DefaultThreadPool。
+/// ThreadPool trait 定义了一个 schedule_task 方法,用于在 spawn_blocking 时调用。
pub trait ThreadPool {
- /// Monoio runtime will call `schedule_task` on `spawn_blocking`.
- /// ThreadPool impl must execute it now or later.
fn schedule_task(&self, task: BlockingTask);
}
/// Error on waiting blocking task.
+/// 等待阻塞任务时的错误。
#[derive(Debug, Clone, Copy)]
pub enum JoinError {
/// Task is canceled.
+ /// 任务被取消。
Canceled,
}
/// BlockingTask is contrusted by monoio, ThreadPool impl
/// will execute it with `.run()`.
+/// BlockingTask 由 monoio 构造,ThreadPool impl 将使用 `.run()` 执行它。
pub struct BlockingTask {
task: Option<crate::task::Task<NoopScheduler>>,
blocking_vtable: &'static BlockingTaskVtable,
@@ -57,7 +61,7 @@ impl Drop for BlockingTask {
}
impl BlockingTask {
- /// Run task.
+ /// 运行任务。
#[inline]
pub fn run(mut self) {
let task = self.task.take().unwrap();
@@ -76,11 +80,14 @@ impl BlockingTask {
/// BlockingStrategy can be set if there is no ThreadPool attached.
/// It controls how to handle `spawn_blocking` without thread pool.
+/// BlockingStrategy 可以在没有 ThreadPool 附加时设置。它控制如何处理没有线程池的 `spawn_blocking`。
#[derive(Clone, Copy, Debug)]
pub enum BlockingStrategy {
/// Panic when `spawn_blocking`.
+ /// `spawn_blocking` 时 panic。
Panic,
/// Execute with current thread when `spawn_blocking`.
+ /// `spawn_blocking` 时使用当前线程执行。
ExecuteLocal,
}
@@ -89,6 +96,10 @@ pub enum BlockingStrategy {
/// Users can also set `BlockingStrategy` for a runtime when there is no thread pool.
/// WARNING: DO NOT USE THIS FOR ASYNC TASK! Async tasks will not be executed but only built the
/// future!
+/// `spawn_blocking` 用于执行具有重计算或阻塞 io 的任务(没有
+/// async)。要使用它,用户可以初始化线程池并将其附加到创建的运行时。
+/// 当没有线程池时,用户还可以为运行时设置 `BlockingStrategy`。
+/// 警告:不要将其用于异步任务!异步任务将不会执行,而只会构建 future!
pub fn spawn_blocking<F, R>(func: F) -> JoinHandle<Result<R, JoinError>>
where
F: FnOnce() -> R + Send + 'static,
@@ -110,6 +121,12 @@ where
// 2. set runtime blocking strategy to `BlockingStrategy::ExecuteLocal`
// Note: solution 2 will execute blocking task on current thread and may block other
// tasks This may cause other tasks high latency.
+
+ // 对于用户:如果看到此 panic,则有两个选择:
+ // 1. 附加共享线程池以执行阻塞任务
+ // 2. 将运行时阻塞策略设置为 `BlockingStrategy::ExecuteLocal`
+ // 注意:解决方案 2 将在当前线程上执行阻塞任务,可能会阻塞其他任务,
+ // 这可能会导致其他任务高延迟。
panic!("execute blocking task without thread pool attached")
}
}
@@ -121,13 +138,15 @@ where
/// DefaultThreadPool is a simple wrapped `threadpool::ThreadPool` that implement
/// `monoio::blocking::ThreadPool`. You may use this implementation, or you can use your own thread
/// pool implementation.
+/// DefaultThreadPool 是一个简单的包装了 `threadpool::ThreadPool` 的实现,实现了
+/// `monoio::blocking::ThreadPool`。 您可以使用此实现,也可以使用自己的线程池实现。
#[derive(Clone)]
pub struct DefaultThreadPool {
pool: ThreadPoolImpl,
}
impl DefaultThreadPool {
- /// Create a new DefaultThreadPool.
+ /// 创建一个新的 DefaultThreadPool。
pub fn new(num_threads: usize) -> Self {
let pool = ThreadPoolBuilder::default()
.num_threads(num_threads)
@@ -168,7 +187,6 @@ impl From<BlockingStrategy> for BlockingHandle {
struct BlockingFuture<F>(Option<F>);
impl<T> Unpin for BlockingFuture<T> {}
-
impl<F, R> Future for BlockingFuture<F>
where
F: FnOnce() -> R + Send + 'static,
diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs
index cf44330..b4048d4 100644
--- a/monoio/src/driver/mod.rs
+++ b/monoio/src/driver/mod.rs
@@ -66,22 +66,27 @@ impl unpark::Unpark for std::sync::Arc<dyn unpark::Unpark> {
}
}
-/// Core driver trait.
+/// Core driver trait. 驱动设备??
pub trait Driver {
/// Run with driver TLS.
fn with<R>(&self, f: impl FnOnce() -> R) -> R;
/// Submit ops to kernel and process returned events.
+ /// 提交操作到内核并处理返回的事件
fn submit(&self) -> io::Result<()>;
/// Wait infinitely and process returned events.
+ /// 无限等待并处理返回的事件
fn park(&self) -> io::Result<()>;
/// Wait with timeout and process returned events.
+ /// 带超时等待并处理返回的事件
fn park_timeout(&self, duration: Duration) -> io::Result<()>;
/// The struct to wake thread from another.
+ /// 用于唤醒另一个线程的结构体
#[cfg(feature = "sync")]
type Unpark: unpark::Unpark;
/// Get Unpark.
+ /// 获取 Unpark
#[cfg(feature = "sync")]
fn unpark(&self) -> Self::Unpark;
}
diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs
index 9cc81d1..858bdd7 100644
--- a/monoio/src/net/tcp/listener.rs
+++ b/monoio/src/net/tcp/listener.rs
@@ -43,21 +43,21 @@ impl TcpListener {
let addr = addr
.to_socket_addrs()?
.next()
- .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?;
+ .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; // 转换地址
- let domain = if addr.is_ipv6() {
+ let domain: socket2::Domain = if addr.is_ipv6() {
socket2::Domain::IPV6
} else {
socket2::Domain::IPV4
- };
+ }; // 获取地址类型
let sys_listener =
socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
- #[cfg(all(unix, feature = "legacy"))]
- Self::set_non_blocking(&sys_listener)?;
+ #[cfg(all(unix, feature = "legacy"))] // 兼容模式,使用 epoll 时
+ Self::set_non_blocking(&sys_listener)?; // socket 为非阻塞
let addr = socket2::SockAddr::from(addr);
- #[cfg(unix)]
+ #[cfg(unix)] // unix 平台
if opts.reuse_port {
sys_listener.set_reuse_port(true)?;
}
@@ -70,7 +70,7 @@ impl TcpListener {
if let Some(recv_buf_size) = opts.recv_buf_size {
sys_listener.set_recv_buffer_size(recv_buf_size)?;
}
- if opts.tcp_fast_open {
+ if opts.tcp_fast_open { // tcp fast open
#[cfg(any(target_os = "linux", target_os = "android"))]
super::tfo::set_tcp_fastopen(&sys_listener, opts.backlog)?;
#[cfg(any(target_os = "ios", target_os = "macos"))]
@@ -85,7 +85,7 @@ impl TcpListener {
}
#[cfg(unix)]
- let fd = SharedFd::new(sys_listener.into_raw_fd())?;
+ let fd = SharedFd::new(sys_listener.into_raw_fd())?; // 获取文件描述符
#[cfg(windows)]
let fd = unimplemented!();
diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs
index bffd1a7..d85f838 100644
--- a/monoio/src/runtime.rs
+++ b/monoio/src/runtime.rs
@@ -1,11 +1,15 @@
+// 导入必要的模块
use std::future::Future;
+// 根据配置导入必要的模块
#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
use crate::time::TimeDriver;
#[cfg(all(target_os = "linux", feature = "iouring"))]
use crate::IoUringDriver;
#[cfg(all(unix, feature = "legacy"))]
use crate::LegacyDriver;
+
+// 从 crate 中导入必要的模块
use crate::{
driver::Driver,
scheduler::{LocalScheduler, TaskQueue},
@@ -17,46 +21,57 @@ use crate::{
time::driver::Handle as TimeHandle,
};
+// 为默认上下文声明一个线程本地变量
#[cfg(feature = "sync")]
thread_local! {
pub(crate) static DEFAULT_CTX: Context = Context {
- thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID,
- unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()),
- waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()),
- tasks: Default::default(),
- time_handle: None,
- blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic),
+ thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID, // 线程 ID
+ unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // unpark 句柄缓存
+ waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // waker 发送器缓存
+ tasks: Default::default(), // 任务队列
+ time_handle: None, // 时间句柄
+ blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic), // 阻塞句柄
};
}
+// 为当前上下文声明一个作用域线程本地变量 线程局部变量 静态变量
+// CURRENT.with(|ctx| {});
+// 闭包内可以访问当前线程的 Context
scoped_thread_local!(pub(crate) static CURRENT: Context);
+/// 上下文结构体,包含任务队列、线程 ID、unpark 句柄、waker 发送器缓存、时间句柄和阻塞句柄
pub(crate) struct Context {
/// Owned task set and local run queue
+ /// 拥有的任务集和本地运行队列
pub(crate) tasks: TaskQueue,
-
/// Thread id(not the kernel thread id but a generated unique number)
+ /// 线程 ID(不是内核线程 ID,而是生成的唯一数字)
pub(crate) thread_id: usize,
/// Thread unpark handles
+ /// 线程 unpark 句柄
#[cfg(feature = "sync")]
pub(crate) unpark_cache:
std::cell::RefCell<fxhash::FxHashMap<usize, crate::driver::UnparkHandle>>,
/// Waker sender cache
+ /// waker 发送器缓存
#[cfg(feature = "sync")]
pub(crate) waker_sender_cache:
std::cell::RefCell<fxhash::FxHashMap<usize, flume::Sender<std::task::Waker>>>,
/// Time Handle
+ /// 时间句柄
pub(crate) time_handle: Option<TimeHandle>,
/// Blocking Handle
+ /// 阻塞句柄
#[cfg(feature = "sync")]
pub(crate) blocking_handle: crate::blocking::BlockingHandle,
}
impl Context {
+ /// 创建一个带有阻塞句柄的新上下文
#[cfg(feature = "sync")]
pub(crate) fn new(blocking_handle: crate::blocking::BlockingHandle) -> Self {
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
@@ -71,6 +86,7 @@ impl Context {
}
}
+ /// 创建一个不带阻塞句柄的新上下文
#[cfg(not(feature = "sync"))]
pub(crate) fn new() -> Self {
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
@@ -82,6 +98,7 @@ impl Context {
}
}
+ /// unpark 给定 ID 的线程
#[allow(unused)]
#[cfg(feature = "sync")]
pub(crate) fn unpark_thread(&self, id: usize) {
@@ -93,12 +110,14 @@ impl Context {
if let Some(v) = get_unpark_handle(id) {
// Write back to local cache
+ // 写回到本地缓存
let w = v.clone();
self.unpark_cache.borrow_mut().insert(id, w);
v.unpark();
}
}
+ /// 发送 waker 到给定 ID 的线程
#[allow(unused)]
#[cfg(feature = "sync")]
pub(crate) fn send_waker(&self, id: usize, w: std::task::Waker) {
@@ -110,6 +129,7 @@ impl Context {
if let Some(s) = get_waker_sender(id) {
// Write back to local cache
+ // 写回到本地缓存
let _ = s.send(w);
self.waker_sender_cache.borrow_mut().insert(id, s);
}
@@ -117,29 +137,32 @@ impl Context {
}
/// Monoio runtime
+/// Monoio 运行时结构体,包含上下文和驱动程序
pub struct Runtime<D> {
pub(crate) context: Context,
pub(crate) driver: D,
}
impl<D> Runtime<D> {
+ /// 使用给定的上下文和驱动程序创建一个新的运行时
pub(crate) fn new(context: Context, driver: D) -> Self {
Self { context, driver }
}
- /// Block on
+ /// 阻塞给定的 future
pub fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
D: Driver,
{
+ // 不能在运行时内部启动运行时
assert!(
!CURRENT.is_set(),
"Can not start a runtime inside a runtime"
);
- let waker = dummy_waker();
- let cx = &mut std::task::Context::from_waker(&waker);
+ let waker = dummy_waker(); // 占位符 waker
+ let cx = &mut std::task::Context::from_waker(&waker); // 标准库上下文
self.driver.with(|| {
CURRENT.set(&self.context, || {
@@ -153,11 +176,14 @@ impl<D> Runtime<D> {
loop {
loop {
// Consume all tasks(with max round to prevent io starvation)
- let mut max_round = self.context.tasks.len() * 2;
+ // 消费所有 task (最大轮数防止 io 饥饿)
+ let mut max_round = self.context.tasks.len() * 2; // Maximum round | Force exit when reaching the maximum round
while let Some(t) = self.context.tasks.pop() {
- t.run();
+ t.run(); // 执行任务
+ // 避免无限循环
if max_round == 0 {
// maybe there's a looping task
+ // 或许这里 有一个 死循环 task 将 max_round 消耗到 0 了
break;
} else {
max_round -= 1;
@@ -175,16 +201,18 @@ impl<D> Runtime<D> {
if self.context.tasks.is_empty() {
// No task to execute, we should wait for io blockingly
// Hot path
+ // 就绪队列为空
+ // 经常执行部分
break;
}
-
- // Cold path
+ // Cold path | 到这一步比较少
+ // 待定
let _ = self.driver.submit();
}
// Wait and Process CQ(the error is ignored for not debug mode)
#[cfg(not(all(debug_assertions, feature = "debug")))]
- let _ = self.driver.park();
+ let _ = self.driver.park(); // 无限等待并处理返回的事件 ??
#[cfg(all(debug_assertions, feature = "debug"))]
if let Err(e) = self.driver.park() {
@@ -196,8 +224,8 @@ impl<D> Runtime<D> {
}
}
-/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based
-/// runtime.
+/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based runtime.
+/// Fusion Runtime 是 io_uring driver 或者 legacy driver 的运行时的包装器 | 依照平台自动探测
#[cfg(all(unix, feature = "legacy"))]
pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> {
/// Uring driver based runtime.
@@ -221,7 +249,7 @@ where
L: Driver,
R: Driver,
{
- /// Block on
+ /// Block on | 根据平台自动选择
pub fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
@@ -352,11 +380,13 @@ impl From<Runtime<TimeDriver<IoUringDriver>>> for FusionRuntime<TimeDriver<IoUri
}
/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
+/// 生成新 async 任务, 返回 JoinHandle 实例
///
/// Spawning a task enables the task to execute concurrently to other tasks.
/// There is no guarantee that a spawned task will execute to completion. When a
/// runtime is shutdown, all outstanding tasks are dropped, regardless of the
/// lifecycle of that task.
+/// 生成任务允许任务并发执行,当一个任务关闭后,其他任务一同结束.
///
///
/// [`JoinHandle`]: monoio::task::JoinHandle
@@ -383,13 +413,13 @@ where
T::Output: 'static,
{
let (task, join) = new_task(
- crate::utils::thread_id::get_current_thread_id(),
- future,
- LocalScheduler,
+ crate::utils::thread_id::get_current_thread_id(), // 当前线程 id
+ future, // 任务
+ LocalScheduler, // 调度器
);
CURRENT.with(|ctx| {
- ctx.tasks.push(task);
+ ctx.tasks.push(task); // 就绪队列 push
});
join
}
@@ -407,7 +437,7 @@ where
);
CURRENT.with(|ctx| {
- ctx.tasks.push(task);
+ ctx.tasks.push(task); // 就绪队列 push
});
join
}
diff --git a/monoio/src/scheduler.rs b/monoio/src/scheduler.rs
index 2d4791a..594ec77 100644
--- a/monoio/src/scheduler.rs
+++ b/monoio/src/scheduler.rs
@@ -1,64 +1,72 @@
+// 引入需要的库
use std::{cell::UnsafeCell, collections::VecDeque, marker::PhantomData};
use crate::task::{Schedule, Task};
+// 本地调度器
pub(crate) struct LocalScheduler;
+// 两种调度策略
impl Schedule for LocalScheduler {
+ // 就绪队列
fn schedule(&self, task: Task<Self>) {
crate::runtime::CURRENT.with(|cx| cx.tasks.push(task));
}
-
+ // 就绪队列,立刻执行
fn yield_now(&self, task: Task<Self>) {
crate::runtime::CURRENT.with(|cx| cx.tasks.push_front(task));
}
}
+// 任务队列
pub(crate) struct TaskQueue {
- // Local queue.
+ // Local queue. 就绪队列
queue: UnsafeCell<VecDeque<Task<LocalScheduler>>>,
// Make sure the type is `!Send` and `!Sync`.
- _marker: PhantomData<*const ()>,
+ _marker: PhantomData<*const ()>, // 非跨线程
}
impl Default for TaskQueue {
+ // 默认构造函数
fn default() -> Self {
Self::new()
}
}
impl TaskQueue {
+ // 创建就绪队列实例
pub(crate) fn new() -> Self {
- const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
+ const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; // 默认4096
Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
}
+ // 创建指定容量的就绪队列实例
pub(crate) fn new_with_capacity(capacity: usize) -> Self {
Self {
- queue: UnsafeCell::new(VecDeque::with_capacity(capacity)),
- _marker: PhantomData,
+ queue: UnsafeCell::new(VecDeque::with_capacity(capacity)), // 指定容量
+ _marker: PhantomData, // 内存对齐?
}
}
-
+ // 获取就绪队列长度
pub(crate) fn len(&self) -> usize {
unsafe { (*self.queue.get()).len() }
}
-
+ // 判断就绪队列是否为空
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
-
+ // 将任务加入就绪队列尾部
pub(crate) fn push(&self, runnable: Task<LocalScheduler>) {
unsafe {
(*self.queue.get()).push_back(runnable);
}
}
-
+ // 将任务加入就绪队列头部, 约等于立刻执行
pub(crate) fn push_front(&self, runnable: Task<LocalScheduler>) {
unsafe {
(*self.queue.get()).push_front(runnable);
}
}
-
+ // 从就绪队列头部弹出任务
pub(crate) fn pop(&self) -> Option<Task<LocalScheduler>> {
unsafe { (*self.queue.get()).pop_front() }
}
diff --git a/monoio/src/task/core.rs b/monoio/src/task/core.rs
index b43633f..e92af77 100644
--- a/monoio/src/task/core.rs
+++ b/monoio/src/task/core.rs
@@ -12,7 +12,7 @@ use super::{
Schedule,
};
-#[repr(C)]
+#[repr(C)] // 确保该结构的内存 与 C 语言兼容
pub(crate) struct Cell<T: Future, S> {
pub(crate) header: Header,
pub(crate) core: Core<T, S>,
@@ -40,13 +40,16 @@ pub(crate) struct Header {
/// State
pub(crate) state: State,
/// Table of function pointers for executing actions on the task.
+ /// 函数表
pub(crate) vtable: &'static Vtable,
/// Thread ID(sync: used for wake task on its thread; sync disabled: do checking)
+ /// 线程ID
pub(crate) owner_id: usize,
}
pub(crate) struct Trailer {
/// Consumer task waiting on completion of this task.
+ /// 唤醒的 Waker
pub(crate) waker: UnsafeCell<Option<Waker>>,
}
diff --git a/monoio/src/task/harness.rs b/monoio/src/task/harness.rs
index 309ef6a..a8c19ff 100644
--- a/monoio/src/task/harness.rs
+++ b/monoio/src/task/harness.rs
@@ -55,6 +55,7 @@ where
match self.poll_inner() {
PollFuture::Notified => {
// We should re-schedule the task.
+ // 重新执行任务
self.header().state.ref_inc();
self.core().scheduler.yield_now(self.get_new_task());
}
@@ -65,24 +66,28 @@ where
}
}
- /// Do polland return the status.
+ /// Do poll and return the status.
+ /// poll 并且返回状态
///
/// poll_inner does not take a ref-count. We must make sure the task is
/// alive when call this method
+ /// poll_inner 不会增加引用计数,我们必须确保任务在调用这个方法的时候是活着的
fn poll_inner(&self) -> PollFuture {
// notified -> running
+ // 状态转换 notified -> running | 就绪 到 运行
self.header().state.transition_to_running();
// poll the future
let waker_ref = waker_ref::<T, S>(self.header());
let cx = Context::from_waker(&waker_ref);
- let res = poll_future(&self.core().stage, cx);
-
+ let res = poll_future(&self.core().stage, cx); // poll future
+ // if ready
if res == Poll::Ready(()) {
- return PollFuture::Complete;
+ return PollFuture::Complete; // return complete
}
-
+ // task 没有完成
use super::state::TransitionToIdle;
+ // task 状态到 挂起
match self.header().state.transition_to_idle() {
TransitionToIdle::Ok => PollFuture::Done,
TransitionToIdle::OkNotified => PollFuture::Notified,
@@ -93,6 +98,7 @@ where
trace!("MONOIO DEBUG[Harness]:: dealloc");
// Release the join waker, if there is one.
+ // 释放 join waker, 如果有的话
self.trailer().waker.with_mut(drop);
// Check causality
diff --git a/monoio/src/task/mod.rs b/monoio/src/task/mod.rs
index 6133347..0479a3f 100644
--- a/monoio/src/task/mod.rs
+++ b/monoio/src/task/mod.rs
@@ -42,17 +42,17 @@ impl<S: 'static> Task<S> {
fn header(&self) -> &Header {
self.raw.header()
}
-
+ // 运行 (x.poll)
pub(crate) fn run(self) {
self.raw.poll();
}
- #[cfg(feature = "sync")]
+ #[cfg(feature = "sync")] // 编译时带有 sync 才会编译这一段
pub(crate) unsafe fn finish(&mut self, val_slot: *mut ()) {
self.raw.finish(val_slot);
}
}
-
+// 超出作用域 销毁的 特征
impl<S: 'static> Drop for Task<S> {
fn drop(&mut self) {
// Decrement the ref count
@@ -63,8 +63,9 @@ impl<S: 'static> Drop for Task<S> {
}
}
+// 调度器 特征
pub(crate) trait Schedule: Sized + 'static {
- /// Schedule the task
+ /// Schedule the task | 调度任务
fn schedule(&self, task: Task<Self>);
/// Schedule the task to run in the near future, yielding the thread to
/// other tasks.
@@ -73,6 +74,22 @@ pub(crate) trait Schedule: Sized + 'static {
}
}
+/// 创建一个新的任务,并返回任务句柄和 JoinHandle。
+///
+/// # 参数
+///
+/// - `owner_id`:任务所有者的 ID。
+/// - `task`:要执行的任务。
+/// - `scheduler`:任务调度器。
+///
+/// # 类型参数
+///
+/// - `S`:任务调度器的类型。
+/// - `T`:要执行的任务的类型。
+///
+/// # 返回值
+///
+/// 返回一个元组,包含任务句柄和 JoinHandle。
pub(crate) fn new_task<T, S>(
owner_id: usize,
task: T,
@@ -86,6 +103,26 @@ where
unsafe { new_task_holding(owner_id, task, scheduler) }
}
+/// 创建一个新的任务,并返回任务句柄和 JoinHandle。
+///
+/// # 安全性
+///
+/// 这个函数是 unsafe 的,因为它使用了裸指针。
+///
+/// # 参数
+///
+/// - `owner_id`:任务所有者的 ID。
+/// - `task`:要执行的任务。
+/// - `scheduler`:任务调度器。
+///
+/// # 类型参数
+///
+/// - `S`:任务调度器的类型。
+/// - `T`:要执行的任务的类型。
+///
+/// # 返回值
+///
+/// 返回一个元组,包含任务句柄和 JoinHandle。
pub(crate) unsafe fn new_task_holding<T, S>(
owner_id: usize,
task: T,
diff --git a/monoio/src/task/raw.rs b/monoio/src/task/raw.rs
index ccdc8e2..60780d8 100644
--- a/monoio/src/task/raw.rs
+++ b/monoio/src/task/raw.rs
@@ -7,24 +7,27 @@ use std::{
use crate::task::{Cell, Harness, Header, Schedule};
pub(crate) struct RawTask {
- ptr: NonNull<Header>,
+ ptr: NonNull<Header>, // 非空指向 Header 的裸指针
}
impl Clone for RawTask {
fn clone(&self) -> Self {
- *self
+ *self // 实际上是个裸指针
}
}
impl Copy for RawTask {}
+// 函数表?
pub(crate) struct Vtable {
/// Poll the future
pub(crate) poll: unsafe fn(NonNull<Header>),
/// Deallocate the memory
+ /// 释放内存
pub(crate) dealloc: unsafe fn(NonNull<Header>),
/// Read the task output, if complete
+ /// 读取任务输出,如果完成
pub(crate) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),
/// The join handle has been dropped
@@ -36,6 +39,7 @@ pub(crate) struct Vtable {
}
/// Get the vtable for the requested `T` and `S` generics.
+/// 生成
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll::<T, S>,
@@ -69,7 +73,7 @@ impl RawTask {
/// Safety: mutual exclusion is required to call this function.
pub(crate) fn poll(self) {
- let vtable = self.header().vtable;
+ let vtable: &Vtable = self.header().vtable;
unsafe { (vtable.poll)(self.ptr) }
}
@@ -105,7 +109,7 @@ unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
}
unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) {
- let harness = Harness::<T, S>::from_raw(ptr);
+ let harness: Harness<T, S> = Harness::<T, S>::from_raw(ptr);
harness.dealloc();
}
diff --git a/monoio/src/task/state.rs b/monoio/src/task/state.rs
index 3bba8fb..0c94a7f 100644
--- a/monoio/src/task/state.rs
+++ b/monoio/src/task/state.rs
@@ -6,29 +6,35 @@ use std::{
},
};
+// AtomicUsize 避免使用锁
pub(crate) struct State(AtomicUsize);
/// Current state value
-#[derive(Copy, Clone)]
+/// 真正 state 的值
+#[derive(Copy, Clone)] // 实现了 Copy 和 Clone
pub(crate) struct Snapshot(usize);
type UpdateResult = Result<Snapshot, Snapshot>;
/// The task is currently being run.
+/// 正在运行
const RUNNING: usize = 0b0001;
/// The task is complete.
///
/// Once this bit is set, it is never unset
+/// 任务完成
const COMPLETE: usize = 0b0010;
/// Extracts the task's lifecycle value from the state
const LIFECYCLE_MASK: usize = 0b11;
/// Flag tracking if the task has been pushed into a run queue.
+/// 已在就绪队列
const NOTIFIED: usize = 0b100;
/// The join handle is still around
+/// 连接句柄依然存在
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_INTEREST: usize = 0b1_000;
@@ -40,6 +46,7 @@ const JOIN_WAKER: usize = 0b10_000;
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER;
/// Bits used by the ref count portion of the state.
+///
const REF_COUNT_MASK: usize = !STATE_MASK;
/// Number of positions to shift the ref count
@@ -49,9 +56,9 @@ const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
/// State a task is initialized with
-///
+/// 任务状态初始化
/// A task is initialized with two references:
-///
+/// 两个引用
/// * A reference for Task.
/// * A reference for the JoinHandle.
///
@@ -73,7 +80,7 @@ pub(super) enum TransitionToNotified {
impl State {
pub(crate) fn new() -> Self {
- State(AtomicUsize::new(INITIAL_STATE))
+ State(AtomicUsize::new(INITIAL_STATE))
}
pub(crate) fn load(&self) -> Snapshot {
@@ -86,30 +93,34 @@ impl State {
/// Attempt to transition the lifecycle to `Running`. This sets the
/// notified bit to false so notifications during the poll can be detected.
+ /// 生命周期转换为 运行, 设置 notified 为 false, 以便在 poll 期间检测到通知
pub(super) fn transition_to_running(&self) {
- let mut snapshot = self.load();
- debug_assert!(snapshot.is_notified());
+ let mut snapshot = self.load(); // 获取值
+ debug_assert!(snapshot.is_notified());
debug_assert!(snapshot.is_idle());
- snapshot.set_running();
- snapshot.unset_notified();
+ snapshot.set_running(); // 设置运行
+ snapshot.unset_notified(); // 通知 = false
self.store(snapshot);
}
/// Transitions the task from `Running` -> `Idle`.
+ /// 运行 -> 挂起
pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
let mut snapshot = self.load();
debug_assert!(snapshot.is_running());
- snapshot.unset_running();
+ snapshot.unset_running(); // 运行 = false
+ // 如果 notified 位为 true, 则返回 OkNotified
let action = if snapshot.is_notified() {
- TransitionToIdle::OkNotified
+ TransitionToIdle::OkNotified // 已经通知
} else {
- TransitionToIdle::Ok
+ TransitionToIdle::Ok // 未通知
};
self.store(snapshot);
action
}
/// Transitions the task from `Running` -> `Complete`.
+ /// 运行 --> 完成
pub(super) fn transition_to_complete(&self) -> Snapshot {
const DELTA: usize = RUNNING | COMPLETE;
@@ -288,7 +299,7 @@ impl Snapshot {
pub(super) fn is_running(self) -> bool {
self.0 & RUNNING == RUNNING
}
-
+ // 设置 运行
fn set_running(&mut self) {
self.0 |= RUNNING;
}
diff --git a/monoio/src/task/waker_fn.rs b/monoio/src/task/waker_fn.rs
index 5d7f664..bb59fad 100644
--- a/monoio/src/task/waker_fn.rs
+++ b/monoio/src/task/waker_fn.rs
@@ -5,6 +5,7 @@ use std::cell::Cell;
///
/// This `Waker` is useful for polling a `Future` to check whether it is
/// `Ready`, without doing any additional work.
+/// 占位 Waker
pub(crate) fn dummy_waker() -> Waker {
fn raw_waker() -> RawWaker {
// the pointer is never dereferenced, so null is ok
@@ -27,15 +28,15 @@ pub(crate) fn dummy_waker() -> Waker {
unsafe { Waker::from_raw(raw_waker()) }
}
-#[thread_local]
-static SHOULD_POLL: Cell<bool> = Cell::new(true);
+#[thread_local] // 线程局部变量 bool
+static SHOULD_POLL: Cell<bool> = Cell::new(true); // 是否轮询?
#[inline]
pub(crate) fn should_poll() -> bool {
- SHOULD_POLL.replace(false)
+ SHOULD_POLL.replace(false) // 不轮询
}
#[inline]
pub(crate) fn set_poll() {
- SHOULD_POLL.set(true);
+ SHOULD_POLL.set(true); // 轮询
}