diff options
| author | zy <[email protected]> | 2023-08-31 09:51:56 +0000 |
|---|---|---|
| committer | zy <[email protected]> | 2023-08-31 09:51:56 +0000 |
| commit | 7e524e6ba28376e9f3cb9fe550d9d5c03ff6129a (patch) | |
| tree | 033aa03c892f205afec9fd77ac474edef2d280bd | |
| parent | 15685d1d01246ec3ca058498f675eaef4e28af35 (diff) | |
task drive 相关梳理
| -rw-r--r-- | monoio/src/blocking.rs | 1 | ||||
| -rw-r--r-- | monoio/src/builder.rs | 2 | ||||
| -rw-r--r-- | monoio/src/driver/mod.rs | 6 | ||||
| -rw-r--r-- | monoio/src/driver/op.rs | 22 | ||||
| -rw-r--r-- | monoio/src/runtime.rs | 17 | ||||
| -rw-r--r-- | monoio/src/task/core.rs | 37 | ||||
| -rw-r--r-- | monoio/src/task/harness.rs | 45 | ||||
| -rw-r--r-- | monoio/src/task/join.rs | 6 | ||||
| -rw-r--r-- | monoio/src/task/mod.rs | 27 | ||||
| -rw-r--r-- | monoio/src/task/raw.rs | 30 | ||||
| -rw-r--r-- | monoio/src/task/state.rs | 45 | ||||
| -rw-r--r-- | monoio/src/task/utils.rs | 3 | ||||
| -rw-r--r-- | monoio/src/task/waker.rs | 16 | ||||
| -rw-r--r-- | monoio/src/task/waker_fn.rs | 9 | ||||
| -rw-r--r-- | monoio/src/utils/slab.rs | 10 |
15 files changed, 180 insertions, 96 deletions
diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index c8262c4..144a088 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -161,6 +161,7 @@ impl ThreadPool for DefaultThreadPool { } } +// 完全公平调度 pub(crate) struct NoopScheduler; impl crate::task::Schedule for NoopScheduler { diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index 81a3f6b..735e40a 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -110,7 +110,7 @@ impl Buildable for LegacyDriver { #[cfg(all(target_os = "linux", feature = "iouring"))] impl Buildable for IoUringDriver { fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<IoUringDriver>> { - let thread_id = gen_id(); + let thread_id = gen_id(); // 线程 id | 与内核无关,仅仅是个唯一标识 #[cfg(feature = "sync")] let blocking_handle = this.blocking_handle; diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index b4048d4..7306dd7 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -91,6 +91,7 @@ pub trait Driver { fn unpark(&self) -> Self::Unpark; } +// 线程内独享的全局变量 scoped_thread_local!(pub(crate) static CURRENT: Inner); pub(crate) enum Inner { @@ -101,6 +102,7 @@ pub(crate) enum Inner { } impl Inner { + // 提交操作 到 内核 fn submit_with<T: OpAble>(&self, data: T) -> io::Result<Op<T>> { match self { #[cfg(windows)] @@ -120,6 +122,8 @@ impl Inner { } #[allow(unused)] + // 执行 poll 返回状态(Poll:Pending or Poll:Ready<>) + // 根据 io 状态选择,返回不同的状态 fn poll_op<T: OpAble>( &self, data: &mut T, @@ -144,6 +148,7 @@ impl Inner { } #[allow(unused)] + // 丢弃 op fn drop_op<T: 'static>(&self, index: usize, data: &mut Option<T>) { match self { #[cfg(windows)] @@ -186,6 +191,7 @@ impl Inner { } #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] + // 是否是兼容模式 fn is_legacy(&self) -> bool { matches!(self, Inner::Legacy(..)) } diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index bb6577c..a3b32dd 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -35,6 +35,7 @@ pub(crate) struct Op<T: 'static> { } /// Operation completion. Returns stored state with the result of the operation. +/// 返回存储的状态与操作的结果。 #[derive(Debug)] pub(crate) struct Completion<T> { pub(crate) data: T, @@ -42,9 +43,10 @@ pub(crate) struct Completion<T> { } /// Operation completion meta info. +/// 操作完成元信息。 #[derive(Debug)] pub(crate) struct CompletionMeta { - pub(crate) result: io::Result<u32>, + pub(crate) result: io::Result<u32>, // Result of the operation #[allow(unused)] pub(crate) flags: u32, } @@ -61,6 +63,7 @@ pub(crate) trait OpAble { /// If legacy is enabled and iouring is not, we can expose io interface in a poll-like way. /// This can provide better compatibility for crates programmed in poll-like way. +/// 如果启用了 legacy 并且没有启用 iouring,我们可以以类似于 poll 的方式暴露 io 接口。 #[cfg(all(unix, feature = "legacy"))] pub(crate) trait PollLegacy { fn poll_legacy(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<CompletionMeta>; @@ -92,6 +95,8 @@ impl<T> Op<T> { /// /// `state` is stored during the operation tracking any state submitted to /// the kernel. + /// 将操作提交给 uring + /// `state` 在操作期间存储,跟踪提交给内核的任何状态。 pub(super) fn submit_with(data: T) -> io::Result<Op<T>> where T: OpAble, @@ -100,6 +105,7 @@ impl<T> Op<T> { } /// Try submitting an operation to uring + /// 尝试将操作提交给 uring #[allow(unused)] pub(super) fn try_submit_with(data: T) -> io::Result<Op<T>> where @@ -111,12 +117,14 @@ impl<T> Op<T> { Err(io::ErrorKind::Other.into()) } } - + /// Cancel an operation + /// 取消一个操作 pub(crate) fn op_canceller(&self) -> OpCanceller where T: OpAble, { - #[cfg(feature = "legacy")] + #[cfg(feature = "legacy")] // 兼容模式 + // 如果是兼容模式 if is_legacy() { return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() { OpCanceller { @@ -137,16 +145,17 @@ impl<T> Op<T> { } } } - +/// 重点: 这里实现了 Future impl<T> Future for Op<T> where - T: Unpin + OpAble + 'static, + T: Unpin + OpAble + 'static, // T 要实现 OpAble { type Output = Completion<T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = &mut *self; let data_mut = me.data.as_mut().expect("unexpected operation state"); + // 调用 driver 执行 io let meta = ready!(me.driver.poll_op::<T>(data_mut, me.index, cx)); me.index = usize::MAX; @@ -167,7 +176,8 @@ pub(crate) fn is_legacy() -> bool { true } -#[cfg(target_os = "linux")] +#[cfg(target_os = "linux")] +// 是否是兼容模式 pub(crate) fn is_legacy() -> bool { super::CURRENT.with(|inner| inner.is_legacy()) } diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index d85f838..d5c0cfa 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -149,7 +149,7 @@ impl<D> Runtime<D> { Self { context, driver } } - /// 阻塞给定的 future + /// 主循环入口 pub fn block_on<F>(&mut self, future: F) -> F::Output where F: Future, @@ -166,20 +166,20 @@ impl<D> Runtime<D> { self.driver.with(|| { CURRENT.set(&self.context, || { - #[cfg(feature = "sync")] - let join = unsafe { spawn_without_static(future) }; - #[cfg(not(feature = "sync"))] + // #[cfg(feature = "sync")] + // let join = unsafe { spawn_without_static(future) }; + // #[cfg(not(feature = "sync"))] // join = future let join = future; let mut join = std::pin::pin!(join); - set_poll(); + set_poll(); // 设置 SHOULD_POLL true loop { loop { // Consume all tasks(with max round to prevent io starvation) // 消费所有 task (最大轮数防止 io 饥饿) let mut max_round = self.context.tasks.len() * 2; // Maximum round | Force exit when reaching the maximum round while let Some(t) = self.context.tasks.pop() { - t.run(); // 执行任务 + t.run(); // 执行任务 运行 (x.poll) // 避免无限循环 if max_round == 0 { // maybe there's a looping task @@ -190,7 +190,7 @@ impl<D> Runtime<D> { } } - // Check main future + // Check main future | 这里才第一次运行 join while should_poll() { // check if ready if let std::task::Poll::Ready(t) = join.as_mut().poll(cx) { @@ -206,13 +206,12 @@ impl<D> Runtime<D> { break; } // Cold path | 到这一步比较少 - // 待定 let _ = self.driver.submit(); } // Wait and Process CQ(the error is ignored for not debug mode) #[cfg(not(all(debug_assertions, feature = "debug")))] - let _ = self.driver.park(); // 无限等待并处理返回的事件 ?? + let _ = self.driver.park(); // 无限等待并处理返回的事件 #[cfg(all(debug_assertions, feature = "debug"))] if let Err(e) = self.driver.park() { diff --git a/monoio/src/task/core.rs b/monoio/src/task/core.rs index e92af77..d27cc3b 100644 --- a/monoio/src/task/core.rs +++ b/monoio/src/task/core.rs @@ -14,30 +14,36 @@ use super::{ #[repr(C)] // 确保该结构的内存 与 C 语言兼容 pub(crate) struct Cell<T: Future, S> { - pub(crate) header: Header, - pub(crate) core: Core<T, S>, - pub(crate) trailer: Trailer, + pub(crate) header: Header, // 头部 + pub(crate) core: Core<T, S>, // 核心 + pub(crate) trailer: Trailer, // 尾部 } pub(crate) struct Core<T: Future, S> { /// Scheduler used to drive this future + /// 调度器 用来驱动 future pub(crate) scheduler: S, /// Either the future or the output + /// 不是 future 就是 output?? + /// 任务运行阶段, 任务执行的状态 pub(crate) stage: CoreStage<T>, } +// 阶段 pub(crate) struct CoreStage<T: Future> { stage: UnsafeCell<Stage<T>>, } +// 阶段 pub(crate) enum Stage<T: Future> { - Running(T), - Finished(T::Output), - Consumed, + Running(T), // 正在运行 + Finished(T::Output), // 已结束 + Consumed, // 已消耗? } #[repr(C)] pub(crate) struct Header { /// State + /// 任务状态 RUNNING COMPLETE NOTIFIED JOIN_INTEREST JOIN_WAKER pub(crate) state: State, /// Table of function pointers for executing actions on the task. /// 函数表 @@ -49,13 +55,14 @@ pub(crate) struct Header { pub(crate) struct Trailer { /// Consumer task waiting on completion of this task. - /// 唤醒的 Waker + /// 持有 Waker pub(crate) waker: UnsafeCell<Option<Waker>>, } impl<T: Future, S: Schedule> Cell<T, S> { /// Allocates a new task cell, containing the header, trailer, and core /// structures. + /// 新的任务单元, 包含 头部, 尾部, 核心 pub(crate) fn new(owner_id: usize, future: T, scheduler: S) -> Box<Cell<T, S>> { Box::new(Cell { header: Header { @@ -105,19 +112,19 @@ impl<T: Future> CoreStage<T> { } /// Drop the future - /// + /// 丢弃 future /// # Safety /// /// The caller must ensure it is safe to mutate the `stage` field. pub(crate) fn drop_future_or_output(&self) { // Safety: the caller ensures mutual exclusion to the field. unsafe { - self.set_stage(Stage::Consumed); + self.set_stage(Stage::Consumed); // 运行状态设为 已消费 } } /// Store the task output - /// + /// 存储任务输出 /// # Safety /// /// The caller must ensure it is safe to mutate the `stage` field. @@ -129,7 +136,7 @@ impl<T: Future> CoreStage<T> { } /// Take the task output - /// + /// 获取任务输出 /// # Safety /// /// The caller must ensure it is safe to mutate the `stage` field. @@ -152,6 +159,7 @@ impl<T: Future> CoreStage<T> { impl Header { #[allow(unused)] + // 获取当前线程 id pub(crate) fn get_owner_id(&self) -> usize { // safety: If there are concurrent writes, then that write has violated // the safety requirements on `set_owner_id`. @@ -160,20 +168,23 @@ impl Header { } impl Trailer { + // 设置 waker pub(crate) unsafe fn set_waker(&self, waker: Option<Waker>) { self.waker.with_mut(|ptr| { *ptr = waker; }); } - + // 传入的 waker 和 当前waker 实例 self 是否唤醒的是同一个 任务 pub(crate) unsafe fn will_wake(&self, waker: &Waker) -> bool { self.waker .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker)) } - + // 唤醒 任务 pub(crate) fn wake_join(&self) { self.waker.with(|ptr| match unsafe { &*ptr } { + // 唤醒 join Some(waker) => waker.wake_by_ref(), + // waker 没有值直接 panic None => panic!("waker missing"), }); } diff --git a/monoio/src/task/harness.rs b/monoio/src/task/harness.rs index a8c19ff..d181714 100644 --- a/monoio/src/task/harness.rs +++ b/monoio/src/task/harness.rs @@ -16,6 +16,7 @@ use crate::{ utils::thread_id::{try_get_current_thread_id, DEFAULT_THREAD_ID}, }; +// 非空 Cell 的包装 pub(crate) struct Harness<T: Future, S: 'static> { cell: NonNull<Cell<T, S>>, } @@ -25,12 +26,13 @@ where T: Future, S: 'static, { + // 由 Header 指针 -> Cell -> Harness pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> { Harness { cell: ptr.cast::<Cell<T, S>>(), } } - + // 访问各个字段 fn header(&self) -> &Header { unsafe { &self.cell.as_ref().header } } @@ -50,17 +52,20 @@ where S: Schedule, { /// Polls the inner future. + /// 真正执行 poll 的地方 pub(super) fn poll(self) { trace!("MONOIO DEBUG[Harness]:: poll"); + // 匹配 poll 的结果 match self.poll_inner() { PollFuture::Notified => { // We should re-schedule the task. // 重新执行任务 self.header().state.ref_inc(); + // 到就绪队列头部,下一个执行 self.core().scheduler.yield_now(self.get_new_task()); } PollFuture::Complete => { - self.complete(); + self.complete(); // 完成任务 } PollFuture::Done => (), } @@ -80,20 +85,22 @@ where // poll the future let waker_ref = waker_ref::<T, S>(self.header()); let cx = Context::from_waker(&waker_ref); - let res = poll_future(&self.core().stage, cx); // poll future - // if ready + let res = poll_future(&self.core().stage, cx); // poll future wow! + // if ready if res == Poll::Ready(()) { - return PollFuture::Complete; // return complete + return PollFuture::Complete; // return complete 任务完成 } // task 没有完成 use super::state::TransitionToIdle; // task 状态到 挂起 match self.header().state.transition_to_idle() { + // 未被通知 TransitionToIdle::Ok => PollFuture::Done, + // 已经被通知了 TransitionToIdle::OkNotified => PollFuture::Notified, } } - + // 解除分配 pub(super) fn dealloc(self) { trace!("MONOIO DEBUG[Harness]:: dealloc"); @@ -102,9 +109,11 @@ where self.trailer().waker.with_mut(drop); // Check causality + // 任务运行状态 然后干了些啥.. self.core().stage.with_mut(drop); unsafe { + // from_raw 需要操作裸指针,因此是 unsafe 包裹 drop(Box::from_raw(self.cell.as_ptr())); } } @@ -118,10 +127,13 @@ where } // ===== join handle ===== + // 任务的 handle /// Read the task output into `dst`. + /// 读取任务的输出到 dst pub(super) fn try_read_output(self, dst: &mut Poll<T::Output>, waker: &Waker) { trace!("MONOIO DEBUG[Harness]:: try_read_output"); + // 如何能读到任务返回值 if can_read_output(self.header(), self.trailer(), waker) { *dst = Poll::Ready(self.core().stage.take_output()); } @@ -158,15 +170,19 @@ where } // ===== waker behavior ===== + // waker 相关的属性 /// This call consumes a ref-count and notifies the task. This will create a /// new Notified and submit it if necessary. + /// 这个调用消耗一个引用计数并且通知任务,如果有必要的话,这将创建一个新的 Notified 并且提交它 /// /// The caller does not need to hold a ref-count besides the one that was /// passed to this call. + /// 调用者不需要持有一个引用计数,除了传递给这个调用的引用计数 pub(super) fn wake_by_val(self) { trace!("MONOIO DEBUG[Harness]:: wake_by_val"); let owner_id = self.header().owner_id; + // 如果是不同的线程 if is_remote_task(owner_id) { // send to target thread trace!("MONOIO DEBUG[Harness]:: wake_by_val with another thread id"); @@ -217,9 +233,11 @@ where /// This call notifies the task. It will not consume any ref-counts, but the /// caller should hold a ref-count. This will create a new Notified and /// submit it if necessary. + /// 这个调用通知任务, 它不会消耗任何引用计数 ,但是调用者应该持有一个引用计数,如果有必要的话, 这将创建一个新的 Notified 并且提交它 pub(super) fn wake_by_ref(&self) { trace!("MONOIO DEBUG[Harness]:: wake_by_ref"); let owner_id = self.header().owner_id; + // 如果在不同线程 if is_remote_task(owner_id) { // send to target thread trace!("MONOIO DEBUG[Harness]:: wake_by_ref with another thread id"); @@ -266,6 +284,7 @@ where } } + // drop it! pub(super) fn drop_reference(self) { trace!("MONOIO DEBUG[Harness]:: drop_reference"); if self.header().state.ref_dec() { @@ -274,11 +293,14 @@ where } // ====== internal ====== + // 下面的方法全部都是 模块内调用的了 /// Complete the task. This method assumes that the state is RUNNING. + /// 完成任务,这个方法假设状态是 RUNNING fn complete(self) { // The future has completed and its output has been written to the task // stage. We transition from running to complete. + // 任务完成,并且它的输出已经被写入到 stage 将任务 `Running` -> `Complete` let snapshot = self.header().state.transition_to_complete(); @@ -299,7 +321,7 @@ where } /// Create a new task that holds its own ref-count. - /// + /// 创建一个新的任务,它持有自己的引用计数. 其实还是指向同一个 rawTask /// # Safety /// /// Any use of `self` after this call must ensure that a ref-count to the @@ -312,7 +334,7 @@ where unsafe { Task::from_raw(self.cell.cast()) } } } - +// 任务所在 线程是否是当前线程 fn is_remote_task(owner_id: usize) -> bool { if owner_id == DEFAULT_THREAD_ID { return true; @@ -322,7 +344,7 @@ fn is_remote_task(owner_id: usize) -> bool { None => true, } } - +// task 是否能读到输出 fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool { // Load a snapshot of the current task state let snapshot = header.state.load(); @@ -409,6 +431,7 @@ enum PollFuture { /// Poll the future. If the future completes, the output is written to the /// stage field. +/// poll future 完成,输出被写入到 stage 字段 fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { // CHIHAI: For efficiency we do not catch. @@ -429,9 +452,10 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { // mem::forget(guard); // res // })); - let output = core.poll(cx); + let output = core.poll(cx); // poll future // Prepare output for being placed in the core stage. + // 预备将 output let output = match output { // Ok(Poll::Pending) => return Poll::Pending, // Ok(Poll::Ready(output)) => Ok(output), @@ -444,6 +468,7 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { // let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { // core.store_output(output); // })); + //output 放入 core stage core.store_output(output); Poll::Ready(()) diff --git a/monoio/src/task/join.rs b/monoio/src/task/join.rs index 4f11add..f664d7b 100644 --- a/monoio/src/task/join.rs +++ b/monoio/src/task/join.rs @@ -9,6 +9,7 @@ use super::raw::RawTask; /// JoinHandle can be used to wait task finished. /// Note if you drop it directly, task will not be terminated. +///用于等待 task 完成, 但是如果直接丢弃它, task 将不会立刻被终止 pub struct JoinHandle<T> { raw: RawTask, _p: PhantomData<T>, @@ -26,6 +27,7 @@ impl<T> JoinHandle<T> { } /// Checks if the task associated with this `JoinHandle` has finished. + /// 检查与此 `JoinHandle` 关联的 task 是否已完成 pub fn is_finished(&self) -> bool { let state = self.raw.header().state.load(); state.is_complete() @@ -42,11 +44,13 @@ impl<T> Future for JoinHandle<T> { // Try to read the task output. If the task is not yet complete, the // waker is stored and is notified once the task does complete. + // 尝试读取 task 输出, 如果 task 还没有完成, 则存储 waker, 并在 task 完成时通知它 // // The function must go via the vtable, which requires erasing generic // types. To do this, the function "return" is placed on the stack // **before** calling the function and is passed into the function using // `*mut ()`. + // 函数必须通过 vtable, 这需要擦除泛型类型, 为此, 函数 "返回" 被放置在栈上, 在调用函数之前, 并使用 `*mut ()` 将其传递到函数中 // // Safety: // @@ -59,8 +63,10 @@ impl<T> Future for JoinHandle<T> { } } +// drop joinhandle impl<T> Drop for JoinHandle<T> { fn drop(&mut self) { + // 如果不能快速丢弃, 则调用慢速丢弃 if self.raw.header().state.drop_join_handle_fast().is_ok() { return; } diff --git a/monoio/src/task/mod.rs b/monoio/src/task/mod.rs index 0479a3f..7a25478 100644 --- a/monoio/src/task/mod.rs +++ b/monoio/src/task/mod.rs @@ -27,11 +27,12 @@ use std::{future::Future, marker::PhantomData, ptr::NonNull}; /// An owned handle to the task, tracked by ref count, not sendable #[repr(transparent)] pub(crate) struct Task<S: 'static> { - raw: RawTask, + raw: RawTask, // 原始 task 定义 _p: PhantomData<S>, } impl<S: 'static> Task<S> { + // from rawtask 创建 task unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> { Task { raw: RawTask::from_raw(ptr), @@ -42,12 +43,12 @@ impl<S: 'static> Task<S> { fn header(&self) -> &Header { self.raw.header() } - // 运行 (x.poll) + // 运行 (x.poll) | 算是 poll 的全局入口 pub(crate) fn run(self) { self.raw.poll(); } - #[cfg(feature = "sync")] // 编译时带有 sync 才会编译这一段 + #[cfg(feature = "sync")] pub(crate) unsafe fn finish(&mut self, val_slot: *mut ()) { self.raw.finish(val_slot); } @@ -74,12 +75,12 @@ pub(crate) trait Schedule: Sized + 'static { } } -/// 创建一个新的任务,并返回任务句柄和 JoinHandle。 +/// 全局入口, 创建一个新的任务,并返回任务 和 JoinHandle。 /// /// # 参数 /// /// - `owner_id`:任务所有者的 ID。 -/// - `task`:要执行的任务。 +/// - `task`:要执行的 future(传入的原始 future) /// - `scheduler`:任务调度器。 /// /// # 类型参数 @@ -89,11 +90,11 @@ pub(crate) trait Schedule: Sized + 'static { /// /// # 返回值 /// -/// 返回一个元组,包含任务句柄和 JoinHandle。 +/// 返回一个元组,包含任务 和 JoinHandle。 pub(crate) fn new_task<T, S>( - owner_id: usize, - task: T, - scheduler: S, + owner_id: usize, // 线程id + task: T, // 任务 + scheduler: S, // 调度器 ) -> (Task<S>, JoinHandle<T::Output>) where S: Schedule, @@ -103,14 +104,10 @@ where unsafe { new_task_holding(owner_id, task, scheduler) } } -/// 创建一个新的任务,并返回任务句柄和 JoinHandle。 -/// -/// # 安全性 +/// 创建一个新的任务,并返回任务 和 JoinHandle。 /// /// 这个函数是 unsafe 的,因为它使用了裸指针。 /// -/// # 参数 -/// /// - `owner_id`:任务所有者的 ID。 /// - `task`:要执行的任务。 /// - `scheduler`:任务调度器。 @@ -122,7 +119,7 @@ where /// /// # 返回值 /// -/// 返回一个元组,包含任务句柄和 JoinHandle。 +/// 返回一个元组,包含任务 和 JoinHandle。 pub(crate) unsafe fn new_task_holding<T, S>( owner_id: usize, task: T, diff --git a/monoio/src/task/raw.rs b/monoio/src/task/raw.rs index 60780d8..6db6ae3 100644 --- a/monoio/src/task/raw.rs +++ b/monoio/src/task/raw.rs @@ -18,7 +18,7 @@ impl Clone for RawTask { impl Copy for RawTask {} -// 函数表? +// 函数表 pub(crate) struct Vtable { /// Poll the future pub(crate) poll: unsafe fn(NonNull<Header>), @@ -31,6 +31,7 @@ pub(crate) struct Vtable { pub(crate) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker), /// The join handle has been dropped + /// 缓慢的丢弃 join handle pub(crate) drop_join_handle_slow: unsafe fn(NonNull<Header>), /// Set future output @@ -39,12 +40,12 @@ pub(crate) struct Vtable { } /// Get the vtable for the requested `T` and `S` generics. -/// 生成 +/// 生成 vtable (函数表) pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable { &Vtable { - poll: poll::<T, S>, - dealloc: dealloc::<T, S>, - try_read_output: try_read_output::<T, S>, + poll: poll::<T, S>, // 调用 poll + dealloc: dealloc::<T, S>, // 释放资源 + try_read_output: try_read_output::<T, S>, // 尝试读取输出 drop_join_handle_slow: drop_join_handle_slow::<T, S>, #[cfg(feature = "sync")] finish: finish::<T, S>, @@ -57,26 +58,29 @@ impl RawTask { T: Future, S: Schedule, { + // 指向 new Cell 的裸指针 let ptr = Box::into_raw(Cell::new(owner_id, task, scheduler)); + // 指向 Header 的裸指针 | Header 是 Cell 的第一个字段 let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) }; RawTask { ptr } } - + // 由 Header 生成 RawTask pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask { RawTask { ptr } } - + // 获取 header 字段的引用 pub(crate) fn header(&self) -> &Header { unsafe { self.ptr.as_ref() } } /// Safety: mutual exclusion is required to call this function. + /// 执行 函数表的 poll 函数 pub(crate) fn poll(self) { let vtable: &Vtable = self.header().vtable; unsafe { (vtable.poll)(self.ptr) } } - + // 执行 函数表的 dealloc 函数 | 释放内存 pub(crate) fn dealloc(self) { let vtable = self.header().vtable; unsafe { @@ -86,11 +90,12 @@ impl RawTask { /// Safety: `dst` must be a `*mut Poll<super::Result<T::Output>>` where `T` /// is the future stored by the task. + /// 尝试读取任务输出, 必须确保 dst 是一个 `*mut Poll<super::Result<T::Output>>` 类型 pub(crate) unsafe fn try_read_output(self, dst: *mut (), waker: &Waker) { let vtable = self.header().vtable; (vtable.try_read_output)(self.ptr, dst, waker); } - + // 缓慢的丢弃 join handle pub(crate) fn drop_join_handle_slow(self) { let vtable = self.header().vtable; unsafe { (vtable.drop_join_handle_slow)(self.ptr) } @@ -103,11 +108,12 @@ impl RawTask { } } +// 指向 poll 函数, poll future unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) { let harness = Harness::<T, S>::from_raw(ptr); harness.poll(); } - +// 指向 dealloc 函数,释放内存 unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) { let harness: Harness<T, S> = Harness::<T, S>::from_raw(ptr); harness.dealloc(); @@ -119,7 +125,7 @@ unsafe fn finish<T: Future, S: Schedule>(ptr: NonNull<Header>, val: *mut ()) { let val = &mut *(val as *mut Option<<T as Future>::Output>); harness.finish(val.take().unwrap()); } - +// 尝试读取输出 unsafe fn try_read_output<T: Future, S: Schedule>( ptr: NonNull<Header>, dst: *mut (), @@ -130,7 +136,7 @@ unsafe fn try_read_output<T: Future, S: Schedule>( let harness = Harness::<T, S>::from_raw(ptr); harness.try_read_output(out, waker); } - +// 缓慢丢弃 join handle unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) { let harness = Harness::<T, S>::from_raw(ptr); harness.drop_join_handle_slow() diff --git a/monoio/src/task/state.rs b/monoio/src/task/state.rs index 0c94a7f..c74966c 100644 --- a/monoio/src/task/state.rs +++ b/monoio/src/task/state.rs @@ -10,7 +10,7 @@ use std::{ pub(crate) struct State(AtomicUsize); /// Current state value -/// 真正 state 的值 +/// 真正 state 的值,有一堆的位操作 #[derive(Copy, Clone)] // 实现了 Copy 和 Clone pub(crate) struct Snapshot(usize); @@ -27,6 +27,7 @@ const RUNNING: usize = 0b0001; const COMPLETE: usize = 0b0010; /// Extracts the task's lifecycle value from the state +/// 由 state 获取生命周期 const LIFECYCLE_MASK: usize = 0b11; /// Flag tracking if the task has been pushed into a run queue. @@ -39,6 +40,7 @@ const NOTIFIED: usize = 0b100; const JOIN_INTEREST: usize = 0b1_000; /// A join handle waker has been set +/// join handle 已设置 #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 const JOIN_WAKER: usize = 0b10_000; @@ -46,22 +48,23 @@ const JOIN_WAKER: usize = 0b10_000; const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER; /// Bits used by the ref count portion of the state. -/// +/// 引用计数部分使用的位 const REF_COUNT_MASK: usize = !STATE_MASK; /// Number of positions to shift the ref count +/// 引用计数的位移 const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; /// One ref count +/// 引用计数 的 总数 const REF_ONE: usize = 1 << REF_COUNT_SHIFT; /// State a task is initialized with -/// 任务状态初始化 /// A task is initialized with two references: -/// 两个引用 +/// 任务状态初始化 两个引用计数 /// * A reference for Task. /// * A reference for the JoinHandle. -/// +/// task 和 joinhandle /// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set. /// As the task starts with a `Notified`, `NOTIFIED` is set. const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED; @@ -79,14 +82,15 @@ pub(super) enum TransitionToNotified { } impl State { + // 创建 state pub(crate) fn new() -> Self { - State(AtomicUsize::new(INITIAL_STATE)) + State(AtomicUsize::new(INITIAL_STATE)) } - + // 获取 state 值 pub(crate) fn load(&self) -> Snapshot { Snapshot(self.0.load(Acquire)) } - + // 设置 state 值 pub(crate) fn store(&self, val: Snapshot) { self.0.store(val.0, Release); } @@ -96,7 +100,7 @@ impl State { /// 生命周期转换为 运行, 设置 notified 为 false, 以便在 poll 期间检测到通知 pub(super) fn transition_to_running(&self) { let mut snapshot = self.load(); // 获取值 - debug_assert!(snapshot.is_notified()); + debug_assert!(snapshot.is_notified()); debug_assert!(snapshot.is_idle()); snapshot.set_running(); // 设置运行 snapshot.unset_notified(); // 通知 = false @@ -109,7 +113,7 @@ impl State { let mut snapshot = self.load(); debug_assert!(snapshot.is_running()); snapshot.unset_running(); // 运行 = false - // 如果 notified 位为 true, 则返回 OkNotified + // 如果 notified 位为 true, 则返回 OkNotified let action = if snapshot.is_notified() { TransitionToIdle::OkNotified // 已经通知 } else { @@ -122,7 +126,7 @@ impl State { /// Transitions the task from `Running` -> `Complete`. /// 运行 --> 完成 pub(super) fn transition_to_complete(&self) -> Snapshot { - const DELTA: usize = RUNNING | COMPLETE; + const DELTA: usize = RUNNING | COMPLETE; // 两个位掩码的按位或操作的结果 let prev = Snapshot(self.0.fetch_xor(DELTA, AcqRel)); debug_assert!(prev.is_running()); @@ -132,23 +136,28 @@ impl State { } /// Transitions the state to `NOTIFIED`. + /// 根据 state 值,设置 NOTIFIED 位,并返回 TransitionToNotified 枚举 pub(super) fn transition_to_notified(&self) -> TransitionToNotified { let mut snapshot = self.load(); + // running 状态 -> let action = if snapshot.is_running() { - snapshot.set_notified(); - TransitionToNotified::DoNothing + snapshot.set_notified(); // 设置通知 + TransitionToNotified::DoNothing // 不做任何事情 } else if snapshot.is_complete() || snapshot.is_notified() { + // 已完成 或 已经通知过了 -> 不做任何事情 TransitionToNotified::DoNothing } else { + // 其他情况 -> 设置通知,并 Submit snapshot.set_notified(); TransitionToNotified::Submit }; - self.store(snapshot); + self.store(snapshot); // 更新 state action } /// Optimistically tries to swap the state assuming the join handle is /// __immediately__ dropped on spawn + /// 异步任务启动后立刻丢弃 join handle ?? pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> { if *self.load() == INITIAL_STATE { self.store(Snapshot((INITIAL_STATE - REF_ONE) & !JOIN_INTEREST)); @@ -160,7 +169,7 @@ impl State { } /// Try to unset the JOIN_INTEREST flag. - /// + /// 尝试取消 JOIN_INTEREST 标志 /// Returns `Ok` if the operation happens before the task transitions to a /// completed state, `Err` otherwise. pub(super) fn unset_join_interested(&self) -> UpdateResult { @@ -179,7 +188,7 @@ impl State { } /// Set the `JOIN_WAKER` bit. - /// + /// 设置 JOIN_WAKER 位 /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if /// the task has completed. pub(super) fn set_join_waker(&self) -> UpdateResult { @@ -199,7 +208,7 @@ impl State { } /// Unsets the `JOIN_WAKER` bit. - /// + /// 取消 JOIN_WAKER 位 /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if /// the task has completed. pub(super) fn unset_waker(&self) -> UpdateResult { @@ -236,6 +245,7 @@ impl State { } /// Returns `true` if the task should be released. + /// 返回 true 如果任务应该被释放 pub(crate) fn ref_dec(&self) -> bool { let prev = Snapshot(self.0.fetch_sub(REF_ONE, AcqRel)); debug_assert!(prev.ref_count() >= 1); @@ -309,6 +319,7 @@ impl Snapshot { } /// Returns `true` if the task's future has completed execution. + /// 返回 true 如果任务的 future 已经完成 pub(super) fn is_complete(self) -> bool { self.0 & COMPLETE == COMPLETE } diff --git a/monoio/src/task/utils.rs b/monoio/src/task/utils.rs index 1e74cdd..4a7f016 100644 --- a/monoio/src/task/utils.rs +++ b/monoio/src/task/utils.rs @@ -6,10 +6,11 @@ pub(crate) trait UnsafeCellExt<T> { } impl<T> UnsafeCellExt<T> for UnsafeCell<T> { + // 在闭包 f 内可以访问到 不可变 fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R { f(self.get()) } - + // 在闭包 f 内可以访问到 可变 fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R { f(self.get()) } diff --git a/monoio/src/task/waker.rs b/monoio/src/task/waker.rs index 59c968b..fc1b074 100644 --- a/monoio/src/task/waker.rs +++ b/monoio/src/task/waker.rs @@ -24,11 +24,14 @@ where // `Waker::will_wake` uses the VTABLE pointer as part of the check. This // means that `will_wake` will always return false when using the current // task's waker. (discussion at rust-lang/rust#66281). - // + // Waker::will_wake(检查传入 waker 是否与自身唤醒同一个 task) 使用 VTABLE 指针作为检查的一部分. + // 这意味着当使用当前任务的 waker 时, will_wake 总是返回 false. + // // To fix this, we use a single vtable. Since we pass in a reference at this // point and not an *owned* waker, we must ensure that `drop` is never // called on this waker instance. This is done by wrapping it with // `ManuallyDrop` and then never calling drop. + // 为了解决这个问题, 我们使用一个单独的 vtable. 传递的是一个引用, 而不是一个 waker 类型,间接解决 let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) }; WakerRef { @@ -36,7 +39,7 @@ where _p: PhantomData, } } - +// 一个类型的引用转换成另一个类型, 也就是说可以自动解引用 impl<S> ops::Deref for WakerRef<'_, S> { type Target = Waker; @@ -44,7 +47,7 @@ impl<S> ops::Deref for WakerRef<'_, S> { &self.waker } } - +// 复制 waker unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker where T: Future, @@ -55,7 +58,7 @@ where (*header).state.ref_inc(); raw_waker::<T, S>(header) } - +// 释放 waker unsafe fn drop_waker<T, S>(ptr: *const ()) where T: Future, @@ -65,7 +68,7 @@ where let harness = Harness::<T, S>::from_raw(ptr); harness.drop_reference(); } - +// 值上面 调用 wake unsafe fn wake_by_val<T, S>(ptr: *const ()) where T: Future, @@ -77,6 +80,7 @@ where } // Wake without consuming the waker +// 引用上面 调用 wake, 不消耗引用计数 unsafe fn wake_by_ref<T, S>(ptr: *const ()) where T: Future, @@ -87,12 +91,14 @@ where harness.wake_by_ref(); } +// 创建一个 RawWaker pub(super) fn raw_waker<T, S>(header: *const Header) -> RawWaker where T: Future, S: Schedule, { let ptr = header as *const (); + // RawWakerVTable 的 函数表 let vtable = &RawWakerVTable::new( clone_waker::<T, S>, wake_by_val::<T, S>, diff --git a/monoio/src/task/waker_fn.rs b/monoio/src/task/waker_fn.rs index bb59fad..9395ed9 100644 --- a/monoio/src/task/waker_fn.rs +++ b/monoio/src/task/waker_fn.rs @@ -2,16 +2,17 @@ use core::task::{RawWaker, RawWakerVTable, Waker}; use std::cell::Cell; /// Creates a waker that does nothing. -/// +/// 创建 一个什么都不做的 waker /// This `Waker` is useful for polling a `Future` to check whether it is /// `Ready`, without doing any additional work. -/// 占位 Waker +/// 这个 waker 用于轮询一个 future, 检查它是否 ready, 而不做任何额外的工作 pub(crate) fn dummy_waker() -> Waker { fn raw_waker() -> RawWaker { // the pointer is never dereferenced, so null is ok + // 指针永远不会被解引用, 所以 null 是可以的 RawWaker::new(std::ptr::null::<()>(), vtable()) } - + // 一个静态的 RawWakerVTable fn vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( |_| raw_waker(), @@ -36,7 +37,7 @@ pub(crate) fn should_poll() -> bool { SHOULD_POLL.replace(false) // 不轮询 } -#[inline] +#[inline] pub(crate) fn set_poll() { SHOULD_POLL.set(true); // 轮询 } diff --git a/monoio/src/utils/slab.rs b/monoio/src/utils/slab.rs index 5846113..057ff05 100644 --- a/monoio/src/utils/slab.rs +++ b/monoio/src/utils/slab.rs @@ -10,16 +10,19 @@ use std::{ #[derive(Default)] pub(crate) struct Slab<T> { // pages of continued memory + // 连续内存页 pages: [Option<Page<T>>; NUM_PAGES], // cached write page id + // 缓存写入页的 id w_page_id: usize, // current generation + // ?? generation: u32, } -const NUM_PAGES: usize = 26; -const PAGE_INITIAL_SIZE: usize = 64; -const COMPACT_INTERVAL: u32 = 2048; +const NUM_PAGES: usize = 26; // 页面数量 +const PAGE_INITIAL_SIZE: usize = 64; // 初始页面大小 +const COMPACT_INTERVAL: u32 = 2048; // 压缩间隔 impl<T> Slab<T> { /// Create a new slab. @@ -35,6 +38,7 @@ impl<T> Slab<T> { } /// Get slab len. + /// slab 长度 #[allow(unused)] pub(crate) fn len(&self) -> usize { self.pages.iter().fold(0, |acc, page| match page { |
