use std::{ cell::RefCell, collections::VecDeque, marker::PhantomData, mem, pin::Pin, rc::Rc, task::{Context, RawWaker, RawWakerVTable, Waker}, }; use futures::{future::LocalBoxFuture, Future, FutureExt}; use crate::reactor::Reactor; // scoped_thread_local 宏创建的 本地线程独享的变量 // EX Executor 类型实例 scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor); pub struct Executor { local_queue: TaskQueue, // 任务队列 // pub(crate) 作用域 // Rc 和 RefCell: 多个所有者 且 可变 pub(crate) reactor: Rc>, /// Make sure the type is `!Send` and `!Sync`. /// 将 Executor 当作 Rc<()>, Rc 非线程安全,这样编译器就不会自动实现 Send 和 Sync /// Send 和 Sync 跨线程才有, thread-per-core 不需要 _marker: PhantomData>, } impl Default for Executor { fn default() -> Self { // 构造方法 Self::new() } } impl Executor { pub fn new() -> Self { // 构造方法 Self { local_queue: TaskQueue::default(), reactor: Rc::new(RefCell::new(Reactor::default())), _marker: PhantomData, // 自动实现 Pin } } // 创建新 task pub fn spawn(fut: impl Future + 'static) { let t: Rc = Rc::new(Task { future: RefCell::new(fut.boxed_local()), }); // 打包到 Rc 中 EX.with(|ex| ex.local_queue.push(t)); // 添加到任务队列 } pub fn block_on(&self, f: F) -> O // 真正执行开始的地方 where F: Fn() -> T, // 闭包 T: Future + 'static, // task ? { let _waker: Waker = waker_fn::waker_fn(|| {}); // 空的 Waker let cx = &mut Context::from_waker(&_waker); // 从 _waker 创建上下文 EX.set(self, || { // 闭包 let fut = f(); // task 本身 pin_utils::pin_mut!(fut); // pin_mut! 宏,将 fut 变成 Pin<&mut F> 固定内存地址 loop { // 任务队列循环 // return if the outer future is ready // 第一次 poll if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { break t; } // consume all tasks | 在 任务队列中的都是待处理任务 while let Some(t) = self.local_queue.pop() { let future = t.future.borrow_mut(); let w = waker(t.clone()); // Waker 实例 let mut context: Context<'_> = Context::from_waker(&w); // Context 实例 let _ = Pin::new(future).as_mut().poll(&mut context); // poll } // no task to execute now, it may ready | 没有任务可执行, 再试试 fut ? if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { break t; } // block for io self.reactor.borrow_mut().wait(); } }) } } // 任务队列 pub struct TaskQueue { queue: RefCell>>, // 直接的 VecDeque } impl Default for TaskQueue { fn default() -> Self { Self::new() } } impl TaskQueue { pub fn new() -> Self { // 默认容量 4096 const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) } pub fn new_with_capacity(capacity: usize) -> Self { // 指定容量 Self { queue: RefCell::new(VecDeque::with_capacity(capacity)), } } pub(crate) fn push(&self, runnable: Rc) { // 添加任务 println!("add task"); self.queue.borrow_mut().push_back(runnable); } pub(crate) fn pop(&self) -> Option> { // 移除任务 println!("remove task"); self.queue.borrow_mut().pop_front() } } // 任务 / 协程 本身 pub struct Task { // 所有权的缘故 用了 RefCell,不可变引用情况下 // 不知道具体的类型,只是实现了 future trait,得用 Box 包装 -> LocalBoxFuture // 整个的生命周期干脆 static 反正协程活的比其他长 future: RefCell>, } fn waker(wake: Rc) -> Waker { let ptr = Rc::into_raw(wake) as *const (); let vtable = &Helper::VTABLE; unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } } impl Task { fn wake_(self: Rc) { // 唤醒 Self::wake_by_ref_(&self) } fn wake_by_ref_(self: &Rc) { // 唤醒 EX.with(|ex: &Executor| ex.local_queue.push(self.clone())); // 添加到任务队列 } } struct Helper; impl Helper { // RawWakerVTable 原始唤醒器的虚函数表 const VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, Self::wake, Self::wake_by_ref, Self::drop_waker, ); // unsafe 要非常小心 unsafe fn clone_waker(data: *const ()) -> RawWaker { // 克隆 waker 指针 increase_refcount(data); // 增加计数,虽然下面只是增加了一个引用 let vtable = &Self::VTABLE; // VTABLE 的引用 RawWaker::new(data, vtable) // 原始唤醒器 的 实例 } unsafe fn wake(ptr: *const ()) { let rc = Rc::from_raw(ptr as *const Task); rc.wake_(); } unsafe fn wake_by_ref(ptr: *const ()) { let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); rc.wake_by_ref_(); } unsafe fn drop_waker(ptr: *const ()) { drop(Rc::from_raw(ptr as *const Task)); } } #[allow(clippy::redundant_clone)] // The clone here isn't actually redundant. unsafe fn increase_refcount(data: *const ()) { // Retain Rc, but don't touch refcount by wrapping in ManuallyDrop let rc = mem::ManuallyDrop::new(Rc::::from_raw(data as *const Task)); // Now increase refcount, but don't drop new refcount either let _rc_clone: mem::ManuallyDrop<_> = rc.clone(); }