diff options
| -rw-r--r-- | monoio/src/blocking.rs | 28 | ||||
| -rw-r--r-- | monoio/src/driver/mod.rs | 7 | ||||
| -rw-r--r-- | monoio/src/net/tcp/listener.rs | 16 | ||||
| -rw-r--r-- | monoio/src/runtime.rs | 76 | ||||
| -rw-r--r-- | monoio/src/scheduler.rs | 30 | ||||
| -rw-r--r-- | monoio/src/task/core.rs | 5 | ||||
| -rw-r--r-- | monoio/src/task/harness.rs | 16 | ||||
| -rw-r--r-- | monoio/src/task/mod.rs | 45 | ||||
| -rw-r--r-- | monoio/src/task/raw.rs | 12 | ||||
| -rw-r--r-- | monoio/src/task/state.rs | 35 | ||||
| -rw-r--r-- | monoio/src/task/waker_fn.rs | 9 |
11 files changed, 201 insertions, 78 deletions
diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index 7d3469d..c8262c4 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -1,4 +1,5 @@ //! Blocking tasks related. +//! 与阻塞任务相关的模块。 use std::{future::Future, task::Poll}; @@ -11,21 +12,24 @@ use crate::{ /// Users may implement a ThreadPool and attach it to runtime. /// We also provide an implementation based on threadpool crate, you can use DefaultThreadPool. +/// 用户可以实现一个ThreadPool并将其附加到运行时 或者 我们也提供了默认实现 DefaultThreadPool。 +/// ThreadPool trait 定义了一个 schedule_task 方法,用于在 spawn_blocking 时调用。 pub trait ThreadPool { - /// Monoio runtime will call `schedule_task` on `spawn_blocking`. - /// ThreadPool impl must execute it now or later. fn schedule_task(&self, task: BlockingTask); } /// Error on waiting blocking task. +/// 等待阻塞任务时的错误。 #[derive(Debug, Clone, Copy)] pub enum JoinError { /// Task is canceled. + /// 任务被取消。 Canceled, } /// BlockingTask is contrusted by monoio, ThreadPool impl /// will execute it with `.run()`. +/// BlockingTask 由 monoio 构造,ThreadPool impl 将使用 `.run()` 执行它。 pub struct BlockingTask { task: Option<crate::task::Task<NoopScheduler>>, blocking_vtable: &'static BlockingTaskVtable, @@ -57,7 +61,7 @@ impl Drop for BlockingTask { } impl BlockingTask { - /// Run task. + /// 运行任务。 #[inline] pub fn run(mut self) { let task = self.task.take().unwrap(); @@ -76,11 +80,14 @@ impl BlockingTask { /// BlockingStrategy can be set if there is no ThreadPool attached. /// It controls how to handle `spawn_blocking` without thread pool. +/// BlockingStrategy 可以在没有 ThreadPool 附加时设置。它控制如何处理没有线程池的 `spawn_blocking`。 #[derive(Clone, Copy, Debug)] pub enum BlockingStrategy { /// Panic when `spawn_blocking`. + /// `spawn_blocking` 时 panic。 Panic, /// Execute with current thread when `spawn_blocking`. + /// `spawn_blocking` 时使用当前线程执行。 ExecuteLocal, } @@ -89,6 +96,10 @@ pub enum BlockingStrategy { /// Users can also set `BlockingStrategy` for a runtime when there is no thread pool. /// WARNING: DO NOT USE THIS FOR ASYNC TASK! Async tasks will not be executed but only built the /// future! +/// `spawn_blocking` 用于执行具有重计算或阻塞 io 的任务(没有 +/// async)。要使用它,用户可以初始化线程池并将其附加到创建的运行时。 +/// 当没有线程池时,用户还可以为运行时设置 `BlockingStrategy`。 +/// 警告:不要将其用于异步任务!异步任务将不会执行,而只会构建 future! pub fn spawn_blocking<F, R>(func: F) -> JoinHandle<Result<R, JoinError>> where F: FnOnce() -> R + Send + 'static, @@ -110,6 +121,12 @@ where // 2. set runtime blocking strategy to `BlockingStrategy::ExecuteLocal` // Note: solution 2 will execute blocking task on current thread and may block other // tasks This may cause other tasks high latency. + + // 对于用户:如果看到此 panic,则有两个选择: + // 1. 附加共享线程池以执行阻塞任务 + // 2. 将运行时阻塞策略设置为 `BlockingStrategy::ExecuteLocal` + // 注意:解决方案 2 将在当前线程上执行阻塞任务,可能会阻塞其他任务, + // 这可能会导致其他任务高延迟。 panic!("execute blocking task without thread pool attached") } } @@ -121,13 +138,15 @@ where /// DefaultThreadPool is a simple wrapped `threadpool::ThreadPool` that implement /// `monoio::blocking::ThreadPool`. You may use this implementation, or you can use your own thread /// pool implementation. +/// DefaultThreadPool 是一个简单的包装了 `threadpool::ThreadPool` 的实现,实现了 +/// `monoio::blocking::ThreadPool`。 您可以使用此实现,也可以使用自己的线程池实现。 #[derive(Clone)] pub struct DefaultThreadPool { pool: ThreadPoolImpl, } impl DefaultThreadPool { - /// Create a new DefaultThreadPool. + /// 创建一个新的 DefaultThreadPool。 pub fn new(num_threads: usize) -> Self { let pool = ThreadPoolBuilder::default() .num_threads(num_threads) @@ -168,7 +187,6 @@ impl From<BlockingStrategy> for BlockingHandle { struct BlockingFuture<F>(Option<F>); impl<T> Unpin for BlockingFuture<T> {} - impl<F, R> Future for BlockingFuture<F> where F: FnOnce() -> R + Send + 'static, diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index cf44330..b4048d4 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -66,22 +66,27 @@ impl unpark::Unpark for std::sync::Arc<dyn unpark::Unpark> { } } -/// Core driver trait. +/// Core driver trait. 驱动设备?? pub trait Driver { /// Run with driver TLS. fn with<R>(&self, f: impl FnOnce() -> R) -> R; /// Submit ops to kernel and process returned events. + /// 提交操作到内核并处理返回的事件 fn submit(&self) -> io::Result<()>; /// Wait infinitely and process returned events. + /// 无限等待并处理返回的事件 fn park(&self) -> io::Result<()>; /// Wait with timeout and process returned events. + /// 带超时等待并处理返回的事件 fn park_timeout(&self, duration: Duration) -> io::Result<()>; /// The struct to wake thread from another. + /// 用于唤醒另一个线程的结构体 #[cfg(feature = "sync")] type Unpark: unpark::Unpark; /// Get Unpark. + /// 获取 Unpark #[cfg(feature = "sync")] fn unpark(&self) -> Self::Unpark; } diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 9cc81d1..858bdd7 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -43,21 +43,21 @@ impl TcpListener { let addr = addr .to_socket_addrs()? .next() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; // 转换地址 - let domain = if addr.is_ipv6() { + let domain: socket2::Domain = if addr.is_ipv6() { socket2::Domain::IPV6 } else { socket2::Domain::IPV4 - }; + }; // 获取地址类型 let sys_listener = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; - #[cfg(all(unix, feature = "legacy"))] - Self::set_non_blocking(&sys_listener)?; + #[cfg(all(unix, feature = "legacy"))] // 兼容模式,使用 epoll 时 + Self::set_non_blocking(&sys_listener)?; // socket 为非阻塞 let addr = socket2::SockAddr::from(addr); - #[cfg(unix)] + #[cfg(unix)] // unix 平台 if opts.reuse_port { sys_listener.set_reuse_port(true)?; } @@ -70,7 +70,7 @@ impl TcpListener { if let Some(recv_buf_size) = opts.recv_buf_size { sys_listener.set_recv_buffer_size(recv_buf_size)?; } - if opts.tcp_fast_open { + if opts.tcp_fast_open { // tcp fast open #[cfg(any(target_os = "linux", target_os = "android"))] super::tfo::set_tcp_fastopen(&sys_listener, opts.backlog)?; #[cfg(any(target_os = "ios", target_os = "macos"))] @@ -85,7 +85,7 @@ impl TcpListener { } #[cfg(unix)] - let fd = SharedFd::new(sys_listener.into_raw_fd())?; + let fd = SharedFd::new(sys_listener.into_raw_fd())?; // 获取文件描述符 #[cfg(windows)] let fd = unimplemented!(); diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index bffd1a7..d85f838 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -1,11 +1,15 @@ +// 导入必要的模块 use std::future::Future; +// 根据配置导入必要的模块 #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; #[cfg(all(unix, feature = "legacy"))] use crate::LegacyDriver; + +// 从 crate 中导入必要的模块 use crate::{ driver::Driver, scheduler::{LocalScheduler, TaskQueue}, @@ -17,46 +21,57 @@ use crate::{ time::driver::Handle as TimeHandle, }; +// 为默认上下文声明一个线程本地变量 #[cfg(feature = "sync")] thread_local! { pub(crate) static DEFAULT_CTX: Context = Context { - thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID, - unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), - waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), - tasks: Default::default(), - time_handle: None, - blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic), + thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID, // 线程 ID + unpark_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // unpark 句柄缓存 + waker_sender_cache: std::cell::RefCell::new(fxhash::FxHashMap::default()), // waker 发送器缓存 + tasks: Default::default(), // 任务队列 + time_handle: None, // 时间句柄 + blocking_handle: crate::blocking::BlockingHandle::Empty(crate::blocking::BlockingStrategy::Panic), // 阻塞句柄 }; } +// 为当前上下文声明一个作用域线程本地变量 线程局部变量 静态变量 +// CURRENT.with(|ctx| {}); +// 闭包内可以访问当前线程的 Context scoped_thread_local!(pub(crate) static CURRENT: Context); +/// 上下文结构体,包含任务队列、线程 ID、unpark 句柄、waker 发送器缓存、时间句柄和阻塞句柄 pub(crate) struct Context { /// Owned task set and local run queue + /// 拥有的任务集和本地运行队列 pub(crate) tasks: TaskQueue, - /// Thread id(not the kernel thread id but a generated unique number) + /// 线程 ID(不是内核线程 ID,而是生成的唯一数字) pub(crate) thread_id: usize, /// Thread unpark handles + /// 线程 unpark 句柄 #[cfg(feature = "sync")] pub(crate) unpark_cache: std::cell::RefCell<fxhash::FxHashMap<usize, crate::driver::UnparkHandle>>, /// Waker sender cache + /// waker 发送器缓存 #[cfg(feature = "sync")] pub(crate) waker_sender_cache: std::cell::RefCell<fxhash::FxHashMap<usize, flume::Sender<std::task::Waker>>>, /// Time Handle + /// 时间句柄 pub(crate) time_handle: Option<TimeHandle>, /// Blocking Handle + /// 阻塞句柄 #[cfg(feature = "sync")] pub(crate) blocking_handle: crate::blocking::BlockingHandle, } impl Context { + /// 创建一个带有阻塞句柄的新上下文 #[cfg(feature = "sync")] pub(crate) fn new(blocking_handle: crate::blocking::BlockingHandle) -> Self { let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); @@ -71,6 +86,7 @@ impl Context { } } + /// 创建一个不带阻塞句柄的新上下文 #[cfg(not(feature = "sync"))] pub(crate) fn new() -> Self { let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); @@ -82,6 +98,7 @@ impl Context { } } + /// unpark 给定 ID 的线程 #[allow(unused)] #[cfg(feature = "sync")] pub(crate) fn unpark_thread(&self, id: usize) { @@ -93,12 +110,14 @@ impl Context { if let Some(v) = get_unpark_handle(id) { // Write back to local cache + // 写回到本地缓存 let w = v.clone(); self.unpark_cache.borrow_mut().insert(id, w); v.unpark(); } } + /// 发送 waker 到给定 ID 的线程 #[allow(unused)] #[cfg(feature = "sync")] pub(crate) fn send_waker(&self, id: usize, w: std::task::Waker) { @@ -110,6 +129,7 @@ impl Context { if let Some(s) = get_waker_sender(id) { // Write back to local cache + // 写回到本地缓存 let _ = s.send(w); self.waker_sender_cache.borrow_mut().insert(id, s); } @@ -117,29 +137,32 @@ impl Context { } /// Monoio runtime +/// Monoio 运行时结构体,包含上下文和驱动程序 pub struct Runtime<D> { pub(crate) context: Context, pub(crate) driver: D, } impl<D> Runtime<D> { + /// 使用给定的上下文和驱动程序创建一个新的运行时 pub(crate) fn new(context: Context, driver: D) -> Self { Self { context, driver } } - /// Block on + /// 阻塞给定的 future pub fn block_on<F>(&mut self, future: F) -> F::Output where F: Future, D: Driver, { + // 不能在运行时内部启动运行时 assert!( !CURRENT.is_set(), "Can not start a runtime inside a runtime" ); - let waker = dummy_waker(); - let cx = &mut std::task::Context::from_waker(&waker); + let waker = dummy_waker(); // 占位符 waker + let cx = &mut std::task::Context::from_waker(&waker); // 标准库上下文 self.driver.with(|| { CURRENT.set(&self.context, || { @@ -153,11 +176,14 @@ impl<D> Runtime<D> { loop { loop { // Consume all tasks(with max round to prevent io starvation) - let mut max_round = self.context.tasks.len() * 2; + // 消费所有 task (最大轮数防止 io 饥饿) + let mut max_round = self.context.tasks.len() * 2; // Maximum round | Force exit when reaching the maximum round while let Some(t) = self.context.tasks.pop() { - t.run(); + t.run(); // 执行任务 + // 避免无限循环 if max_round == 0 { // maybe there's a looping task + // 或许这里 有一个 死循环 task 将 max_round 消耗到 0 了 break; } else { max_round -= 1; @@ -175,16 +201,18 @@ impl<D> Runtime<D> { if self.context.tasks.is_empty() { // No task to execute, we should wait for io blockingly // Hot path + // 就绪队列为空 + // 经常执行部分 break; } - - // Cold path + // Cold path | 到这一步比较少 + // 待定 let _ = self.driver.submit(); } // Wait and Process CQ(the error is ignored for not debug mode) #[cfg(not(all(debug_assertions, feature = "debug")))] - let _ = self.driver.park(); + let _ = self.driver.park(); // 无限等待并处理返回的事件 ?? #[cfg(all(debug_assertions, feature = "debug"))] if let Err(e) = self.driver.park() { @@ -196,8 +224,8 @@ impl<D> Runtime<D> { } } -/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based -/// runtime. +/// Fusion Runtime is a wrapper of io_uring driver or legacy driver based runtime. +/// Fusion Runtime 是 io_uring driver 或者 legacy driver 的运行时的包装器 | 依照平台自动探测 #[cfg(all(unix, feature = "legacy"))] pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> { /// Uring driver based runtime. @@ -221,7 +249,7 @@ where L: Driver, R: Driver, { - /// Block on + /// Block on | 根据平台自动选择 pub fn block_on<F>(&mut self, future: F) -> F::Output where F: Future, @@ -352,11 +380,13 @@ impl From<Runtime<TimeDriver<IoUringDriver>>> for FusionRuntime<TimeDriver<IoUri } /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. +/// 生成新 async 任务, 返回 JoinHandle 实例 /// /// Spawning a task enables the task to execute concurrently to other tasks. /// There is no guarantee that a spawned task will execute to completion. When a /// runtime is shutdown, all outstanding tasks are dropped, regardless of the /// lifecycle of that task. +/// 生成任务允许任务并发执行,当一个任务关闭后,其他任务一同结束. /// /// /// [`JoinHandle`]: monoio::task::JoinHandle @@ -383,13 +413,13 @@ where T::Output: 'static, { let (task, join) = new_task( - crate::utils::thread_id::get_current_thread_id(), - future, - LocalScheduler, + crate::utils::thread_id::get_current_thread_id(), // 当前线程 id + future, // 任务 + LocalScheduler, // 调度器 ); CURRENT.with(|ctx| { - ctx.tasks.push(task); + ctx.tasks.push(task); // 就绪队列 push }); join } @@ -407,7 +437,7 @@ where ); CURRENT.with(|ctx| { - ctx.tasks.push(task); + ctx.tasks.push(task); // 就绪队列 push }); join } diff --git a/monoio/src/scheduler.rs b/monoio/src/scheduler.rs index 2d4791a..594ec77 100644 --- a/monoio/src/scheduler.rs +++ b/monoio/src/scheduler.rs @@ -1,64 +1,72 @@ +// 引入需要的库 use std::{cell::UnsafeCell, collections::VecDeque, marker::PhantomData}; use crate::task::{Schedule, Task}; +// 本地调度器 pub(crate) struct LocalScheduler; +// 两种调度策略 impl Schedule for LocalScheduler { + // 就绪队列 fn schedule(&self, task: Task<Self>) { crate::runtime::CURRENT.with(|cx| cx.tasks.push(task)); } - + // 就绪队列,立刻执行 fn yield_now(&self, task: Task<Self>) { crate::runtime::CURRENT.with(|cx| cx.tasks.push_front(task)); } } +// 任务队列 pub(crate) struct TaskQueue { - // Local queue. + // Local queue. 就绪队列 queue: UnsafeCell<VecDeque<Task<LocalScheduler>>>, // Make sure the type is `!Send` and `!Sync`. - _marker: PhantomData<*const ()>, + _marker: PhantomData<*const ()>, // 非跨线程 } impl Default for TaskQueue { + // 默认构造函数 fn default() -> Self { Self::new() } } impl TaskQueue { + // 创建就绪队列实例 pub(crate) fn new() -> Self { - const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; + const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; // 默认4096 Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) } + // 创建指定容量的就绪队列实例 pub(crate) fn new_with_capacity(capacity: usize) -> Self { Self { - queue: UnsafeCell::new(VecDeque::with_capacity(capacity)), - _marker: PhantomData, + queue: UnsafeCell::new(VecDeque::with_capacity(capacity)), // 指定容量 + _marker: PhantomData, // 内存对齐? } } - + // 获取就绪队列长度 pub(crate) fn len(&self) -> usize { unsafe { (*self.queue.get()).len() } } - + // 判断就绪队列是否为空 pub(crate) fn is_empty(&self) -> bool { self.len() == 0 } - + // 将任务加入就绪队列尾部 pub(crate) fn push(&self, runnable: Task<LocalScheduler>) { unsafe { (*self.queue.get()).push_back(runnable); } } - + // 将任务加入就绪队列头部, 约等于立刻执行 pub(crate) fn push_front(&self, runnable: Task<LocalScheduler>) { unsafe { (*self.queue.get()).push_front(runnable); } } - + // 从就绪队列头部弹出任务 pub(crate) fn pop(&self) -> Option<Task<LocalScheduler>> { unsafe { (*self.queue.get()).pop_front() } } diff --git a/monoio/src/task/core.rs b/monoio/src/task/core.rs index b43633f..e92af77 100644 --- a/monoio/src/task/core.rs +++ b/monoio/src/task/core.rs @@ -12,7 +12,7 @@ use super::{ Schedule, }; -#[repr(C)] +#[repr(C)] // 确保该结构的内存 与 C 语言兼容 pub(crate) struct Cell<T: Future, S> { pub(crate) header: Header, pub(crate) core: Core<T, S>, @@ -40,13 +40,16 @@ pub(crate) struct Header { /// State pub(crate) state: State, /// Table of function pointers for executing actions on the task. + /// 函数表 pub(crate) vtable: &'static Vtable, /// Thread ID(sync: used for wake task on its thread; sync disabled: do checking) + /// 线程ID pub(crate) owner_id: usize, } pub(crate) struct Trailer { /// Consumer task waiting on completion of this task. + /// 唤醒的 Waker pub(crate) waker: UnsafeCell<Option<Waker>>, } diff --git a/monoio/src/task/harness.rs b/monoio/src/task/harness.rs index 309ef6a..a8c19ff 100644 --- a/monoio/src/task/harness.rs +++ b/monoio/src/task/harness.rs @@ -55,6 +55,7 @@ where match self.poll_inner() { PollFuture::Notified => { // We should re-schedule the task. + // 重新执行任务 self.header().state.ref_inc(); self.core().scheduler.yield_now(self.get_new_task()); } @@ -65,24 +66,28 @@ where } } - /// Do polland return the status. + /// Do poll and return the status. + /// poll 并且返回状态 /// /// poll_inner does not take a ref-count. We must make sure the task is /// alive when call this method + /// poll_inner 不会增加引用计数,我们必须确保任务在调用这个方法的时候是活着的 fn poll_inner(&self) -> PollFuture { // notified -> running + // 状态转换 notified -> running | 就绪 到 运行 self.header().state.transition_to_running(); // poll the future let waker_ref = waker_ref::<T, S>(self.header()); let cx = Context::from_waker(&waker_ref); - let res = poll_future(&self.core().stage, cx); - + let res = poll_future(&self.core().stage, cx); // poll future + // if ready if res == Poll::Ready(()) { - return PollFuture::Complete; + return PollFuture::Complete; // return complete } - + // task 没有完成 use super::state::TransitionToIdle; + // task 状态到 挂起 match self.header().state.transition_to_idle() { TransitionToIdle::Ok => PollFuture::Done, TransitionToIdle::OkNotified => PollFuture::Notified, @@ -93,6 +98,7 @@ where trace!("MONOIO DEBUG[Harness]:: dealloc"); // Release the join waker, if there is one. + // 释放 join waker, 如果有的话 self.trailer().waker.with_mut(drop); // Check causality diff --git a/monoio/src/task/mod.rs b/monoio/src/task/mod.rs index 6133347..0479a3f 100644 --- a/monoio/src/task/mod.rs +++ b/monoio/src/task/mod.rs @@ -42,17 +42,17 @@ impl<S: 'static> Task<S> { fn header(&self) -> &Header { self.raw.header() } - + // 运行 (x.poll) pub(crate) fn run(self) { self.raw.poll(); } - #[cfg(feature = "sync")] + #[cfg(feature = "sync")] // 编译时带有 sync 才会编译这一段 pub(crate) unsafe fn finish(&mut self, val_slot: *mut ()) { self.raw.finish(val_slot); } } - +// 超出作用域 销毁的 特征 impl<S: 'static> Drop for Task<S> { fn drop(&mut self) { // Decrement the ref count @@ -63,8 +63,9 @@ impl<S: 'static> Drop for Task<S> { } } +// 调度器 特征 pub(crate) trait Schedule: Sized + 'static { - /// Schedule the task + /// Schedule the task | 调度任务 fn schedule(&self, task: Task<Self>); /// Schedule the task to run in the near future, yielding the thread to /// other tasks. @@ -73,6 +74,22 @@ pub(crate) trait Schedule: Sized + 'static { } } +/// 创建一个新的任务,并返回任务句柄和 JoinHandle。 +/// +/// # 参数 +/// +/// - `owner_id`:任务所有者的 ID。 +/// - `task`:要执行的任务。 +/// - `scheduler`:任务调度器。 +/// +/// # 类型参数 +/// +/// - `S`:任务调度器的类型。 +/// - `T`:要执行的任务的类型。 +/// +/// # 返回值 +/// +/// 返回一个元组,包含任务句柄和 JoinHandle。 pub(crate) fn new_task<T, S>( owner_id: usize, task: T, @@ -86,6 +103,26 @@ where unsafe { new_task_holding(owner_id, task, scheduler) } } +/// 创建一个新的任务,并返回任务句柄和 JoinHandle。 +/// +/// # 安全性 +/// +/// 这个函数是 unsafe 的,因为它使用了裸指针。 +/// +/// # 参数 +/// +/// - `owner_id`:任务所有者的 ID。 +/// - `task`:要执行的任务。 +/// - `scheduler`:任务调度器。 +/// +/// # 类型参数 +/// +/// - `S`:任务调度器的类型。 +/// - `T`:要执行的任务的类型。 +/// +/// # 返回值 +/// +/// 返回一个元组,包含任务句柄和 JoinHandle。 pub(crate) unsafe fn new_task_holding<T, S>( owner_id: usize, task: T, diff --git a/monoio/src/task/raw.rs b/monoio/src/task/raw.rs index ccdc8e2..60780d8 100644 --- a/monoio/src/task/raw.rs +++ b/monoio/src/task/raw.rs @@ -7,24 +7,27 @@ use std::{ use crate::task::{Cell, Harness, Header, Schedule}; pub(crate) struct RawTask { - ptr: NonNull<Header>, + ptr: NonNull<Header>, // 非空指向 Header 的裸指针 } impl Clone for RawTask { fn clone(&self) -> Self { - *self + *self // 实际上是个裸指针 } } impl Copy for RawTask {} +// 函数表? pub(crate) struct Vtable { /// Poll the future pub(crate) poll: unsafe fn(NonNull<Header>), /// Deallocate the memory + /// 释放内存 pub(crate) dealloc: unsafe fn(NonNull<Header>), /// Read the task output, if complete + /// 读取任务输出,如果完成 pub(crate) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker), /// The join handle has been dropped @@ -36,6 +39,7 @@ pub(crate) struct Vtable { } /// Get the vtable for the requested `T` and `S` generics. +/// 生成 pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable { &Vtable { poll: poll::<T, S>, @@ -69,7 +73,7 @@ impl RawTask { /// Safety: mutual exclusion is required to call this function. pub(crate) fn poll(self) { - let vtable = self.header().vtable; + let vtable: &Vtable = self.header().vtable; unsafe { (vtable.poll)(self.ptr) } } @@ -105,7 +109,7 @@ unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) { } unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) { - let harness = Harness::<T, S>::from_raw(ptr); + let harness: Harness<T, S> = Harness::<T, S>::from_raw(ptr); harness.dealloc(); } diff --git a/monoio/src/task/state.rs b/monoio/src/task/state.rs index 3bba8fb..0c94a7f 100644 --- a/monoio/src/task/state.rs +++ b/monoio/src/task/state.rs @@ -6,29 +6,35 @@ use std::{ }, }; +// AtomicUsize 避免使用锁 pub(crate) struct State(AtomicUsize); /// Current state value -#[derive(Copy, Clone)] +/// 真正 state 的值 +#[derive(Copy, Clone)] // 实现了 Copy 和 Clone pub(crate) struct Snapshot(usize); type UpdateResult = Result<Snapshot, Snapshot>; /// The task is currently being run. +/// 正在运行 const RUNNING: usize = 0b0001; /// The task is complete. /// /// Once this bit is set, it is never unset +/// 任务完成 const COMPLETE: usize = 0b0010; /// Extracts the task's lifecycle value from the state const LIFECYCLE_MASK: usize = 0b11; /// Flag tracking if the task has been pushed into a run queue. +/// 已在就绪队列 const NOTIFIED: usize = 0b100; /// The join handle is still around +/// 连接句柄依然存在 #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 const JOIN_INTEREST: usize = 0b1_000; @@ -40,6 +46,7 @@ const JOIN_WAKER: usize = 0b10_000; const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER; /// Bits used by the ref count portion of the state. +/// const REF_COUNT_MASK: usize = !STATE_MASK; /// Number of positions to shift the ref count @@ -49,9 +56,9 @@ const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; const REF_ONE: usize = 1 << REF_COUNT_SHIFT; /// State a task is initialized with -/// +/// 任务状态初始化 /// A task is initialized with two references: -/// +/// 两个引用 /// * A reference for Task. /// * A reference for the JoinHandle. /// @@ -73,7 +80,7 @@ pub(super) enum TransitionToNotified { impl State { pub(crate) fn new() -> Self { - State(AtomicUsize::new(INITIAL_STATE)) + State(AtomicUsize::new(INITIAL_STATE)) } pub(crate) fn load(&self) -> Snapshot { @@ -86,30 +93,34 @@ impl State { /// Attempt to transition the lifecycle to `Running`. This sets the /// notified bit to false so notifications during the poll can be detected. + /// 生命周期转换为 运行, 设置 notified 为 false, 以便在 poll 期间检测到通知 pub(super) fn transition_to_running(&self) { - let mut snapshot = self.load(); - debug_assert!(snapshot.is_notified()); + let mut snapshot = self.load(); // 获取值 + debug_assert!(snapshot.is_notified()); debug_assert!(snapshot.is_idle()); - snapshot.set_running(); - snapshot.unset_notified(); + snapshot.set_running(); // 设置运行 + snapshot.unset_notified(); // 通知 = false self.store(snapshot); } /// Transitions the task from `Running` -> `Idle`. + /// 运行 -> 挂起 pub(super) fn transition_to_idle(&self) -> TransitionToIdle { let mut snapshot = self.load(); debug_assert!(snapshot.is_running()); - snapshot.unset_running(); + snapshot.unset_running(); // 运行 = false + // 如果 notified 位为 true, 则返回 OkNotified let action = if snapshot.is_notified() { - TransitionToIdle::OkNotified + TransitionToIdle::OkNotified // 已经通知 } else { - TransitionToIdle::Ok + TransitionToIdle::Ok // 未通知 }; self.store(snapshot); action } /// Transitions the task from `Running` -> `Complete`. + /// 运行 --> 完成 pub(super) fn transition_to_complete(&self) -> Snapshot { const DELTA: usize = RUNNING | COMPLETE; @@ -288,7 +299,7 @@ impl Snapshot { pub(super) fn is_running(self) -> bool { self.0 & RUNNING == RUNNING } - + // 设置 运行 fn set_running(&mut self) { self.0 |= RUNNING; } diff --git a/monoio/src/task/waker_fn.rs b/monoio/src/task/waker_fn.rs index 5d7f664..bb59fad 100644 --- a/monoio/src/task/waker_fn.rs +++ b/monoio/src/task/waker_fn.rs @@ -5,6 +5,7 @@ use std::cell::Cell; /// /// This `Waker` is useful for polling a `Future` to check whether it is /// `Ready`, without doing any additional work. +/// 占位 Waker pub(crate) fn dummy_waker() -> Waker { fn raw_waker() -> RawWaker { // the pointer is never dereferenced, so null is ok @@ -27,15 +28,15 @@ pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(raw_waker()) } } -#[thread_local] -static SHOULD_POLL: Cell<bool> = Cell::new(true); +#[thread_local] // 线程局部变量 bool +static SHOULD_POLL: Cell<bool> = Cell::new(true); // 是否轮询? #[inline] pub(crate) fn should_poll() -> bool { - SHOULD_POLL.replace(false) + SHOULD_POLL.replace(false) // 不轮询 } #[inline] pub(crate) fn set_poll() { - SHOULD_POLL.set(true); + SHOULD_POLL.set(true); // 轮询 } |
