summaryrefslogtreecommitdiff
path: root/monoio/src/task
diff options
context:
space:
mode:
Diffstat (limited to 'monoio/src/task')
-rw-r--r--monoio/src/task/core.rs37
-rw-r--r--monoio/src/task/harness.rs45
-rw-r--r--monoio/src/task/join.rs6
-rw-r--r--monoio/src/task/mod.rs27
-rw-r--r--monoio/src/task/raw.rs30
-rw-r--r--monoio/src/task/state.rs45
-rw-r--r--monoio/src/task/utils.rs3
-rw-r--r--monoio/src/task/waker.rs16
-rw-r--r--monoio/src/task/waker_fn.rs9
9 files changed, 141 insertions, 77 deletions
diff --git a/monoio/src/task/core.rs b/monoio/src/task/core.rs
index e92af77..d27cc3b 100644
--- a/monoio/src/task/core.rs
+++ b/monoio/src/task/core.rs
@@ -14,30 +14,36 @@ use super::{
#[repr(C)] // 确保该结构的内存 与 C 语言兼容
pub(crate) struct Cell<T: Future, S> {
- pub(crate) header: Header,
- pub(crate) core: Core<T, S>,
- pub(crate) trailer: Trailer,
+ pub(crate) header: Header, // 头部
+ pub(crate) core: Core<T, S>, // 核心
+ pub(crate) trailer: Trailer, // 尾部
}
pub(crate) struct Core<T: Future, S> {
/// Scheduler used to drive this future
+ /// 调度器 用来驱动 future
pub(crate) scheduler: S,
/// Either the future or the output
+ /// 不是 future 就是 output??
+ /// 任务运行阶段, 任务执行的状态
pub(crate) stage: CoreStage<T>,
}
+// 阶段
pub(crate) struct CoreStage<T: Future> {
stage: UnsafeCell<Stage<T>>,
}
+// 阶段
pub(crate) enum Stage<T: Future> {
- Running(T),
- Finished(T::Output),
- Consumed,
+ Running(T), // 正在运行
+ Finished(T::Output), // 已结束
+ Consumed, // 已消耗?
}
#[repr(C)]
pub(crate) struct Header {
/// State
+ /// 任务状态 RUNNING COMPLETE NOTIFIED JOIN_INTEREST JOIN_WAKER
pub(crate) state: State,
/// Table of function pointers for executing actions on the task.
/// 函数表
@@ -49,13 +55,14 @@ pub(crate) struct Header {
pub(crate) struct Trailer {
/// Consumer task waiting on completion of this task.
- /// 唤醒的 Waker
+ /// 持有 Waker
pub(crate) waker: UnsafeCell<Option<Waker>>,
}
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
+ /// 新的任务单元, 包含 头部, 尾部, 核心
pub(crate) fn new(owner_id: usize, future: T, scheduler: S) -> Box<Cell<T, S>> {
Box::new(Cell {
header: Header {
@@ -105,19 +112,19 @@ impl<T: Future> CoreStage<T> {
}
/// Drop the future
- ///
+ /// 丢弃 future
/// # Safety
///
/// The caller must ensure it is safe to mutate the `stage` field.
pub(crate) fn drop_future_or_output(&self) {
// Safety: the caller ensures mutual exclusion to the field.
unsafe {
- self.set_stage(Stage::Consumed);
+ self.set_stage(Stage::Consumed); // 运行状态设为 已消费
}
}
/// Store the task output
- ///
+ /// 存储任务输出
/// # Safety
///
/// The caller must ensure it is safe to mutate the `stage` field.
@@ -129,7 +136,7 @@ impl<T: Future> CoreStage<T> {
}
/// Take the task output
- ///
+ /// 获取任务输出
/// # Safety
///
/// The caller must ensure it is safe to mutate the `stage` field.
@@ -152,6 +159,7 @@ impl<T: Future> CoreStage<T> {
impl Header {
#[allow(unused)]
+ // 获取当前线程 id
pub(crate) fn get_owner_id(&self) -> usize {
// safety: If there are concurrent writes, then that write has violated
// the safety requirements on `set_owner_id`.
@@ -160,20 +168,23 @@ impl Header {
}
impl Trailer {
+ // 设置 waker
pub(crate) unsafe fn set_waker(&self, waker: Option<Waker>) {
self.waker.with_mut(|ptr| {
*ptr = waker;
});
}
-
+ // 传入的 waker 和 当前waker 实例 self 是否唤醒的是同一个 任务
pub(crate) unsafe fn will_wake(&self, waker: &Waker) -> bool {
self.waker
.with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
}
-
+ // 唤醒 任务
pub(crate) fn wake_join(&self) {
self.waker.with(|ptr| match unsafe { &*ptr } {
+ // 唤醒 join
Some(waker) => waker.wake_by_ref(),
+ // waker 没有值直接 panic
None => panic!("waker missing"),
});
}
diff --git a/monoio/src/task/harness.rs b/monoio/src/task/harness.rs
index a8c19ff..d181714 100644
--- a/monoio/src/task/harness.rs
+++ b/monoio/src/task/harness.rs
@@ -16,6 +16,7 @@ use crate::{
utils::thread_id::{try_get_current_thread_id, DEFAULT_THREAD_ID},
};
+// 非空 Cell 的包装
pub(crate) struct Harness<T: Future, S: 'static> {
cell: NonNull<Cell<T, S>>,
}
@@ -25,12 +26,13 @@ where
T: Future,
S: 'static,
{
+ // 由 Header 指针 -> Cell -> Harness
pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
Harness {
cell: ptr.cast::<Cell<T, S>>(),
}
}
-
+ // 访问各个字段
fn header(&self) -> &Header {
unsafe { &self.cell.as_ref().header }
}
@@ -50,17 +52,20 @@ where
S: Schedule,
{
/// Polls the inner future.
+ /// 真正执行 poll 的地方
pub(super) fn poll(self) {
trace!("MONOIO DEBUG[Harness]:: poll");
+ // 匹配 poll 的结果
match self.poll_inner() {
PollFuture::Notified => {
// We should re-schedule the task.
// 重新执行任务
self.header().state.ref_inc();
+ // 到就绪队列头部,下一个执行
self.core().scheduler.yield_now(self.get_new_task());
}
PollFuture::Complete => {
- self.complete();
+ self.complete(); // 完成任务
}
PollFuture::Done => (),
}
@@ -80,20 +85,22 @@ where
// poll the future
let waker_ref = waker_ref::<T, S>(self.header());
let cx = Context::from_waker(&waker_ref);
- let res = poll_future(&self.core().stage, cx); // poll future
- // if ready
+ let res = poll_future(&self.core().stage, cx); // poll future wow!
+ // if ready
if res == Poll::Ready(()) {
- return PollFuture::Complete; // return complete
+ return PollFuture::Complete; // return complete 任务完成
}
// task 没有完成
use super::state::TransitionToIdle;
// task 状态到 挂起
match self.header().state.transition_to_idle() {
+ // 未被通知
TransitionToIdle::Ok => PollFuture::Done,
+ // 已经被通知了
TransitionToIdle::OkNotified => PollFuture::Notified,
}
}
-
+ // 解除分配
pub(super) fn dealloc(self) {
trace!("MONOIO DEBUG[Harness]:: dealloc");
@@ -102,9 +109,11 @@ where
self.trailer().waker.with_mut(drop);
// Check causality
+ // 任务运行状态 然后干了些啥..
self.core().stage.with_mut(drop);
unsafe {
+ // from_raw 需要操作裸指针,因此是 unsafe 包裹
drop(Box::from_raw(self.cell.as_ptr()));
}
}
@@ -118,10 +127,13 @@ where
}
// ===== join handle =====
+ // 任务的 handle
/// Read the task output into `dst`.
+ /// 读取任务的输出到 dst
pub(super) fn try_read_output(self, dst: &mut Poll<T::Output>, waker: &Waker) {
trace!("MONOIO DEBUG[Harness]:: try_read_output");
+ // 如何能读到任务返回值
if can_read_output(self.header(), self.trailer(), waker) {
*dst = Poll::Ready(self.core().stage.take_output());
}
@@ -158,15 +170,19 @@ where
}
// ===== waker behavior =====
+ // waker 相关的属性
/// This call consumes a ref-count and notifies the task. This will create a
/// new Notified and submit it if necessary.
+ /// 这个调用消耗一个引用计数并且通知任务,如果有必要的话,这将创建一个新的 Notified 并且提交它
///
/// The caller does not need to hold a ref-count besides the one that was
/// passed to this call.
+ /// 调用者不需要持有一个引用计数,除了传递给这个调用的引用计数
pub(super) fn wake_by_val(self) {
trace!("MONOIO DEBUG[Harness]:: wake_by_val");
let owner_id = self.header().owner_id;
+ // 如果是不同的线程
if is_remote_task(owner_id) {
// send to target thread
trace!("MONOIO DEBUG[Harness]:: wake_by_val with another thread id");
@@ -217,9 +233,11 @@ where
/// This call notifies the task. It will not consume any ref-counts, but the
/// caller should hold a ref-count. This will create a new Notified and
/// submit it if necessary.
+ /// 这个调用通知任务, 它不会消耗任何引用计数 ,但是调用者应该持有一个引用计数,如果有必要的话, 这将创建一个新的 Notified 并且提交它
pub(super) fn wake_by_ref(&self) {
trace!("MONOIO DEBUG[Harness]:: wake_by_ref");
let owner_id = self.header().owner_id;
+ // 如果在不同线程
if is_remote_task(owner_id) {
// send to target thread
trace!("MONOIO DEBUG[Harness]:: wake_by_ref with another thread id");
@@ -266,6 +284,7 @@ where
}
}
+ // drop it!
pub(super) fn drop_reference(self) {
trace!("MONOIO DEBUG[Harness]:: drop_reference");
if self.header().state.ref_dec() {
@@ -274,11 +293,14 @@ where
}
// ====== internal ======
+ // 下面的方法全部都是 模块内调用的了
/// Complete the task. This method assumes that the state is RUNNING.
+ /// 完成任务,这个方法假设状态是 RUNNING
fn complete(self) {
// The future has completed and its output has been written to the task
// stage. We transition from running to complete.
+ // 任务完成,并且它的输出已经被写入到 stage 将任务 `Running` -> `Complete`
let snapshot = self.header().state.transition_to_complete();
@@ -299,7 +321,7 @@ where
}
/// Create a new task that holds its own ref-count.
- ///
+ /// 创建一个新的任务,它持有自己的引用计数. 其实还是指向同一个 rawTask
/// # Safety
///
/// Any use of `self` after this call must ensure that a ref-count to the
@@ -312,7 +334,7 @@ where
unsafe { Task::from_raw(self.cell.cast()) }
}
}
-
+// 任务所在 线程是否是当前线程
fn is_remote_task(owner_id: usize) -> bool {
if owner_id == DEFAULT_THREAD_ID {
return true;
@@ -322,7 +344,7 @@ fn is_remote_task(owner_id: usize) -> bool {
None => true,
}
}
-
+// task 是否能读到输出
fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
// Load a snapshot of the current task state
let snapshot = header.state.load();
@@ -409,6 +431,7 @@ enum PollFuture {
/// Poll the future. If the future completes, the output is written to the
/// stage field.
+/// poll future 完成,输出被写入到 stage 字段
fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
// CHIHAI: For efficiency we do not catch.
@@ -429,9 +452,10 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
// mem::forget(guard);
// res
// }));
- let output = core.poll(cx);
+ let output = core.poll(cx); // poll future
// Prepare output for being placed in the core stage.
+ // 预备将 output
let output = match output {
// Ok(Poll::Pending) => return Poll::Pending,
// Ok(Poll::Ready(output)) => Ok(output),
@@ -444,6 +468,7 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
// let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
// core.store_output(output);
// }));
+ //output 放入 core stage
core.store_output(output);
Poll::Ready(())
diff --git a/monoio/src/task/join.rs b/monoio/src/task/join.rs
index 4f11add..f664d7b 100644
--- a/monoio/src/task/join.rs
+++ b/monoio/src/task/join.rs
@@ -9,6 +9,7 @@ use super::raw::RawTask;
/// JoinHandle can be used to wait task finished.
/// Note if you drop it directly, task will not be terminated.
+///用于等待 task 完成, 但是如果直接丢弃它, task 将不会立刻被终止
pub struct JoinHandle<T> {
raw: RawTask,
_p: PhantomData<T>,
@@ -26,6 +27,7 @@ impl<T> JoinHandle<T> {
}
/// Checks if the task associated with this `JoinHandle` has finished.
+ /// 检查与此 `JoinHandle` 关联的 task 是否已完成
pub fn is_finished(&self) -> bool {
let state = self.raw.header().state.load();
state.is_complete()
@@ -42,11 +44,13 @@ impl<T> Future for JoinHandle<T> {
// Try to read the task output. If the task is not yet complete, the
// waker is stored and is notified once the task does complete.
+ // 尝试读取 task 输出, 如果 task 还没有完成, 则存储 waker, 并在 task 完成时通知它
//
// The function must go via the vtable, which requires erasing generic
// types. To do this, the function "return" is placed on the stack
// **before** calling the function and is passed into the function using
// `*mut ()`.
+ // 函数必须通过 vtable, 这需要擦除泛型类型, 为此, 函数 "返回" 被放置在栈上, 在调用函数之前, 并使用 `*mut ()` 将其传递到函数中
//
// Safety:
//
@@ -59,8 +63,10 @@ impl<T> Future for JoinHandle<T> {
}
}
+// drop joinhandle
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
+ // 如果不能快速丢弃, 则调用慢速丢弃
if self.raw.header().state.drop_join_handle_fast().is_ok() {
return;
}
diff --git a/monoio/src/task/mod.rs b/monoio/src/task/mod.rs
index 0479a3f..7a25478 100644
--- a/monoio/src/task/mod.rs
+++ b/monoio/src/task/mod.rs
@@ -27,11 +27,12 @@ use std::{future::Future, marker::PhantomData, ptr::NonNull};
/// An owned handle to the task, tracked by ref count, not sendable
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
- raw: RawTask,
+ raw: RawTask, // 原始 task 定义
_p: PhantomData<S>,
}
impl<S: 'static> Task<S> {
+ // from rawtask 创建 task
unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
Task {
raw: RawTask::from_raw(ptr),
@@ -42,12 +43,12 @@ impl<S: 'static> Task<S> {
fn header(&self) -> &Header {
self.raw.header()
}
- // 运行 (x.poll)
+ // 运行 (x.poll) | 算是 poll 的全局入口
pub(crate) fn run(self) {
self.raw.poll();
}
- #[cfg(feature = "sync")] // 编译时带有 sync 才会编译这一段
+ #[cfg(feature = "sync")]
pub(crate) unsafe fn finish(&mut self, val_slot: *mut ()) {
self.raw.finish(val_slot);
}
@@ -74,12 +75,12 @@ pub(crate) trait Schedule: Sized + 'static {
}
}
-/// 创建一个新的任务,并返回任务句柄和 JoinHandle。
+/// 全局入口, 创建一个新的任务,并返回任务 和 JoinHandle。
///
/// # 参数
///
/// - `owner_id`:任务所有者的 ID。
-/// - `task`:要执行的任务。
+/// - `task`:要执行的 future(传入的原始 future)
/// - `scheduler`:任务调度器。
///
/// # 类型参数
@@ -89,11 +90,11 @@ pub(crate) trait Schedule: Sized + 'static {
///
/// # 返回值
///
-/// 返回一个元组,包含任务句柄和 JoinHandle。
+/// 返回一个元组,包含任务 和 JoinHandle。
pub(crate) fn new_task<T, S>(
- owner_id: usize,
- task: T,
- scheduler: S,
+ owner_id: usize, // 线程id
+ task: T, // 任务
+ scheduler: S, // 调度器
) -> (Task<S>, JoinHandle<T::Output>)
where
S: Schedule,
@@ -103,14 +104,10 @@ where
unsafe { new_task_holding(owner_id, task, scheduler) }
}
-/// 创建一个新的任务,并返回任务句柄和 JoinHandle。
-///
-/// # 安全性
+/// 创建一个新的任务,并返回任务 和 JoinHandle。
///
/// 这个函数是 unsafe 的,因为它使用了裸指针。
///
-/// # 参数
-///
/// - `owner_id`:任务所有者的 ID。
/// - `task`:要执行的任务。
/// - `scheduler`:任务调度器。
@@ -122,7 +119,7 @@ where
///
/// # 返回值
///
-/// 返回一个元组,包含任务句柄和 JoinHandle。
+/// 返回一个元组,包含任务 和 JoinHandle。
pub(crate) unsafe fn new_task_holding<T, S>(
owner_id: usize,
task: T,
diff --git a/monoio/src/task/raw.rs b/monoio/src/task/raw.rs
index 60780d8..6db6ae3 100644
--- a/monoio/src/task/raw.rs
+++ b/monoio/src/task/raw.rs
@@ -18,7 +18,7 @@ impl Clone for RawTask {
impl Copy for RawTask {}
-// 函数表?
+// 函数表
pub(crate) struct Vtable {
/// Poll the future
pub(crate) poll: unsafe fn(NonNull<Header>),
@@ -31,6 +31,7 @@ pub(crate) struct Vtable {
pub(crate) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),
/// The join handle has been dropped
+ /// 缓慢的丢弃 join handle
pub(crate) drop_join_handle_slow: unsafe fn(NonNull<Header>),
/// Set future output
@@ -39,12 +40,12 @@ pub(crate) struct Vtable {
}
/// Get the vtable for the requested `T` and `S` generics.
-/// 生成
+/// 生成 vtable (函数表)
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
- poll: poll::<T, S>,
- dealloc: dealloc::<T, S>,
- try_read_output: try_read_output::<T, S>,
+ poll: poll::<T, S>, // 调用 poll
+ dealloc: dealloc::<T, S>, // 释放资源
+ try_read_output: try_read_output::<T, S>, // 尝试读取输出
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
#[cfg(feature = "sync")]
finish: finish::<T, S>,
@@ -57,26 +58,29 @@ impl RawTask {
T: Future,
S: Schedule,
{
+ // 指向 new Cell 的裸指针
let ptr = Box::into_raw(Cell::new(owner_id, task, scheduler));
+ // 指向 Header 的裸指针 | Header 是 Cell 的第一个字段
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
RawTask { ptr }
}
-
+ // 由 Header 生成 RawTask
pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
RawTask { ptr }
}
-
+ // 获取 header 字段的引用
pub(crate) fn header(&self) -> &Header {
unsafe { self.ptr.as_ref() }
}
/// Safety: mutual exclusion is required to call this function.
+ /// 执行 函数表的 poll 函数
pub(crate) fn poll(self) {
let vtable: &Vtable = self.header().vtable;
unsafe { (vtable.poll)(self.ptr) }
}
-
+ // 执行 函数表的 dealloc 函数 | 释放内存
pub(crate) fn dealloc(self) {
let vtable = self.header().vtable;
unsafe {
@@ -86,11 +90,12 @@ impl RawTask {
/// Safety: `dst` must be a `*mut Poll<super::Result<T::Output>>` where `T`
/// is the future stored by the task.
+ /// 尝试读取任务输出, 必须确保 dst 是一个 `*mut Poll<super::Result<T::Output>>` 类型
pub(crate) unsafe fn try_read_output(self, dst: *mut (), waker: &Waker) {
let vtable = self.header().vtable;
(vtable.try_read_output)(self.ptr, dst, waker);
}
-
+ // 缓慢的丢弃 join handle
pub(crate) fn drop_join_handle_slow(self) {
let vtable = self.header().vtable;
unsafe { (vtable.drop_join_handle_slow)(self.ptr) }
@@ -103,11 +108,12 @@ impl RawTask {
}
}
+// 指向 poll 函数, poll future
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll();
}
-
+// 指向 dealloc 函数,释放内存
unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness: Harness<T, S> = Harness::<T, S>::from_raw(ptr);
harness.dealloc();
@@ -119,7 +125,7 @@ unsafe fn finish<T: Future, S: Schedule>(ptr: NonNull<Header>, val: *mut ()) {
let val = &mut *(val as *mut Option<<T as Future>::Output>);
harness.finish(val.take().unwrap());
}
-
+// 尝试读取输出
unsafe fn try_read_output<T: Future, S: Schedule>(
ptr: NonNull<Header>,
dst: *mut (),
@@ -130,7 +136,7 @@ unsafe fn try_read_output<T: Future, S: Schedule>(
let harness = Harness::<T, S>::from_raw(ptr);
harness.try_read_output(out, waker);
}
-
+// 缓慢丢弃 join handle
unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.drop_join_handle_slow()
diff --git a/monoio/src/task/state.rs b/monoio/src/task/state.rs
index 0c94a7f..c74966c 100644
--- a/monoio/src/task/state.rs
+++ b/monoio/src/task/state.rs
@@ -10,7 +10,7 @@ use std::{
pub(crate) struct State(AtomicUsize);
/// Current state value
-/// 真正 state 的值
+/// 真正 state 的值,有一堆的位操作
#[derive(Copy, Clone)] // 实现了 Copy 和 Clone
pub(crate) struct Snapshot(usize);
@@ -27,6 +27,7 @@ const RUNNING: usize = 0b0001;
const COMPLETE: usize = 0b0010;
/// Extracts the task's lifecycle value from the state
+/// 由 state 获取生命周期
const LIFECYCLE_MASK: usize = 0b11;
/// Flag tracking if the task has been pushed into a run queue.
@@ -39,6 +40,7 @@ const NOTIFIED: usize = 0b100;
const JOIN_INTEREST: usize = 0b1_000;
/// A join handle waker has been set
+/// join handle 已设置
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_WAKER: usize = 0b10_000;
@@ -46,22 +48,23 @@ const JOIN_WAKER: usize = 0b10_000;
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER;
/// Bits used by the ref count portion of the state.
-///
+/// 引用计数部分使用的位
const REF_COUNT_MASK: usize = !STATE_MASK;
/// Number of positions to shift the ref count
+/// 引用计数的位移
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
/// One ref count
+/// 引用计数 的 总数
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
/// State a task is initialized with
-/// 任务状态初始化
/// A task is initialized with two references:
-/// 两个引用
+/// 任务状态初始化 两个引用计数
/// * A reference for Task.
/// * A reference for the JoinHandle.
-///
+/// task 和 joinhandle
/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
/// As the task starts with a `Notified`, `NOTIFIED` is set.
const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED;
@@ -79,14 +82,15 @@ pub(super) enum TransitionToNotified {
}
impl State {
+ // 创建 state
pub(crate) fn new() -> Self {
- State(AtomicUsize::new(INITIAL_STATE))
+ State(AtomicUsize::new(INITIAL_STATE))
}
-
+ // 获取 state 值
pub(crate) fn load(&self) -> Snapshot {
Snapshot(self.0.load(Acquire))
}
-
+ // 设置 state 值
pub(crate) fn store(&self, val: Snapshot) {
self.0.store(val.0, Release);
}
@@ -96,7 +100,7 @@ impl State {
/// 生命周期转换为 运行, 设置 notified 为 false, 以便在 poll 期间检测到通知
pub(super) fn transition_to_running(&self) {
let mut snapshot = self.load(); // 获取值
- debug_assert!(snapshot.is_notified());
+ debug_assert!(snapshot.is_notified());
debug_assert!(snapshot.is_idle());
snapshot.set_running(); // 设置运行
snapshot.unset_notified(); // 通知 = false
@@ -109,7 +113,7 @@ impl State {
let mut snapshot = self.load();
debug_assert!(snapshot.is_running());
snapshot.unset_running(); // 运行 = false
- // 如果 notified 位为 true, 则返回 OkNotified
+ // 如果 notified 位为 true, 则返回 OkNotified
let action = if snapshot.is_notified() {
TransitionToIdle::OkNotified // 已经通知
} else {
@@ -122,7 +126,7 @@ impl State {
/// Transitions the task from `Running` -> `Complete`.
/// 运行 --> 完成
pub(super) fn transition_to_complete(&self) -> Snapshot {
- const DELTA: usize = RUNNING | COMPLETE;
+ const DELTA: usize = RUNNING | COMPLETE; // 两个位掩码的按位或操作的结果
let prev = Snapshot(self.0.fetch_xor(DELTA, AcqRel));
debug_assert!(prev.is_running());
@@ -132,23 +136,28 @@ impl State {
}
/// Transitions the state to `NOTIFIED`.
+ /// 根据 state 值,设置 NOTIFIED 位,并返回 TransitionToNotified 枚举
pub(super) fn transition_to_notified(&self) -> TransitionToNotified {
let mut snapshot = self.load();
+ // running 状态 ->
let action = if snapshot.is_running() {
- snapshot.set_notified();
- TransitionToNotified::DoNothing
+ snapshot.set_notified(); // 设置通知
+ TransitionToNotified::DoNothing // 不做任何事情
} else if snapshot.is_complete() || snapshot.is_notified() {
+ // 已完成 或 已经通知过了 -> 不做任何事情
TransitionToNotified::DoNothing
} else {
+ // 其他情况 -> 设置通知,并 Submit
snapshot.set_notified();
TransitionToNotified::Submit
};
- self.store(snapshot);
+ self.store(snapshot); // 更新 state
action
}
/// Optimistically tries to swap the state assuming the join handle is
/// __immediately__ dropped on spawn
+ /// 异步任务启动后立刻丢弃 join handle ??
pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
if *self.load() == INITIAL_STATE {
self.store(Snapshot((INITIAL_STATE - REF_ONE) & !JOIN_INTEREST));
@@ -160,7 +169,7 @@ impl State {
}
/// Try to unset the JOIN_INTEREST flag.
- ///
+ /// 尝试取消 JOIN_INTEREST 标志
/// Returns `Ok` if the operation happens before the task transitions to a
/// completed state, `Err` otherwise.
pub(super) fn unset_join_interested(&self) -> UpdateResult {
@@ -179,7 +188,7 @@ impl State {
}
/// Set the `JOIN_WAKER` bit.
- ///
+ /// 设置 JOIN_WAKER 位
/// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
/// the task has completed.
pub(super) fn set_join_waker(&self) -> UpdateResult {
@@ -199,7 +208,7 @@ impl State {
}
/// Unsets the `JOIN_WAKER` bit.
- ///
+ /// 取消 JOIN_WAKER 位
/// Returns `Ok` has been unset, `Err` otherwise. This operation fails if
/// the task has completed.
pub(super) fn unset_waker(&self) -> UpdateResult {
@@ -236,6 +245,7 @@ impl State {
}
/// Returns `true` if the task should be released.
+ /// 返回 true 如果任务应该被释放
pub(crate) fn ref_dec(&self) -> bool {
let prev = Snapshot(self.0.fetch_sub(REF_ONE, AcqRel));
debug_assert!(prev.ref_count() >= 1);
@@ -309,6 +319,7 @@ impl Snapshot {
}
/// Returns `true` if the task's future has completed execution.
+ /// 返回 true 如果任务的 future 已经完成
pub(super) fn is_complete(self) -> bool {
self.0 & COMPLETE == COMPLETE
}
diff --git a/monoio/src/task/utils.rs b/monoio/src/task/utils.rs
index 1e74cdd..4a7f016 100644
--- a/monoio/src/task/utils.rs
+++ b/monoio/src/task/utils.rs
@@ -6,10 +6,11 @@ pub(crate) trait UnsafeCellExt<T> {
}
impl<T> UnsafeCellExt<T> for UnsafeCell<T> {
+ // 在闭包 f 内可以访问到 不可变
fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.get())
}
-
+ // 在闭包 f 内可以访问到 可变
fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.get())
}
diff --git a/monoio/src/task/waker.rs b/monoio/src/task/waker.rs
index 59c968b..fc1b074 100644
--- a/monoio/src/task/waker.rs
+++ b/monoio/src/task/waker.rs
@@ -24,11 +24,14 @@ where
// `Waker::will_wake` uses the VTABLE pointer as part of the check. This
// means that `will_wake` will always return false when using the current
// task's waker. (discussion at rust-lang/rust#66281).
- //
+ // Waker::will_wake(检查传入 waker 是否与自身唤醒同一个 task) 使用 VTABLE 指针作为检查的一部分.
+ // 这意味着当使用当前任务的 waker 时, will_wake 总是返回 false.
+ //
// To fix this, we use a single vtable. Since we pass in a reference at this
// point and not an *owned* waker, we must ensure that `drop` is never
// called on this waker instance. This is done by wrapping it with
// `ManuallyDrop` and then never calling drop.
+ // 为了解决这个问题, 我们使用一个单独的 vtable. 传递的是一个引用, 而不是一个 waker 类型,间接解决
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) };
WakerRef {
@@ -36,7 +39,7 @@ where
_p: PhantomData,
}
}
-
+// 一个类型的引用转换成另一个类型, 也就是说可以自动解引用
impl<S> ops::Deref for WakerRef<'_, S> {
type Target = Waker;
@@ -44,7 +47,7 @@ impl<S> ops::Deref for WakerRef<'_, S> {
&self.waker
}
}
-
+// 复制 waker
unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker
where
T: Future,
@@ -55,7 +58,7 @@ where
(*header).state.ref_inc();
raw_waker::<T, S>(header)
}
-
+// 释放 waker
unsafe fn drop_waker<T, S>(ptr: *const ())
where
T: Future,
@@ -65,7 +68,7 @@ where
let harness = Harness::<T, S>::from_raw(ptr);
harness.drop_reference();
}
-
+// 值上面 调用 wake
unsafe fn wake_by_val<T, S>(ptr: *const ())
where
T: Future,
@@ -77,6 +80,7 @@ where
}
// Wake without consuming the waker
+// 引用上面 调用 wake, 不消耗引用计数
unsafe fn wake_by_ref<T, S>(ptr: *const ())
where
T: Future,
@@ -87,12 +91,14 @@ where
harness.wake_by_ref();
}
+// 创建一个 RawWaker
pub(super) fn raw_waker<T, S>(header: *const Header) -> RawWaker
where
T: Future,
S: Schedule,
{
let ptr = header as *const ();
+ // RawWakerVTable 的 函数表
let vtable = &RawWakerVTable::new(
clone_waker::<T, S>,
wake_by_val::<T, S>,
diff --git a/monoio/src/task/waker_fn.rs b/monoio/src/task/waker_fn.rs
index bb59fad..9395ed9 100644
--- a/monoio/src/task/waker_fn.rs
+++ b/monoio/src/task/waker_fn.rs
@@ -2,16 +2,17 @@ use core::task::{RawWaker, RawWakerVTable, Waker};
use std::cell::Cell;
/// Creates a waker that does nothing.
-///
+/// 创建 一个什么都不做的 waker
/// This `Waker` is useful for polling a `Future` to check whether it is
/// `Ready`, without doing any additional work.
-/// 占位 Waker
+/// 这个 waker 用于轮询一个 future, 检查它是否 ready, 而不做任何额外的工作
pub(crate) fn dummy_waker() -> Waker {
fn raw_waker() -> RawWaker {
// the pointer is never dereferenced, so null is ok
+ // 指针永远不会被解引用, 所以 null 是可以的
RawWaker::new(std::ptr::null::<()>(), vtable())
}
-
+ // 一个静态的 RawWakerVTable
fn vtable() -> &'static RawWakerVTable {
&RawWakerVTable::new(
|_| raw_waker(),
@@ -36,7 +37,7 @@ pub(crate) fn should_poll() -> bool {
SHOULD_POLL.replace(false) // 不轮询
}
-#[inline]
+#[inline]
pub(crate) fn set_poll() {
SHOULD_POLL.set(true); // 轮询
}