diff options
| author | zy <[email protected]> | 2023-10-26 03:21:55 +0000 |
|---|---|---|
| committer | zy <[email protected]> | 2023-10-26 03:21:55 +0000 |
| commit | 9ca7caa4bf91f87088d0e4093b267c6a5d818b1b (patch) | |
| tree | 9d9e814b6313041671301f541a375d6f45a0c452 | |
| parent | d7fbe33a93e5e744ed325c8442b7972996e3a44f (diff) | |
runtime
| -rw-r--r-- | src/runtime/executor.rs | 258 | ||||
| -rw-r--r-- | src/runtime/mod.rs | 1 |
2 files changed, 259 insertions, 0 deletions
diff --git a/src/runtime/executor.rs b/src/runtime/executor.rs new file mode 100644 index 0000000..2aa7e0c --- /dev/null +++ b/src/runtime/executor.rs @@ -0,0 +1,258 @@ +use crate::{ + event::manager::{EventManager, EventQueue}, + session::manager::SessionManager, +}; +use futures::{future::LocalBoxFuture, Future, FutureExt}; +use std::{ + cell::RefCell, + collections::VecDeque, + marker::PhantomData, + mem::{self, ManuallyDrop}, + ops, + pin::Pin, + ptr::NonNull, + rc::Rc, + task::{Context, RawWaker, RawWakerVTable, Waker}, +}; + +// scoped_thread_local 宏创建的 本地线程独享的变量 +// EX Executor 类型实例 +scoped_tls::scoped_thread_local!(pub static EX: Executor); + +pub struct Executor { + task_queue: TaskQueue, // 任务队列 + + // pub(crate) 作用域 + // Rc 和 RefCell: 多个所有者 且 可变 + pub event_manager: Rc<RefCell<EventManager>>, + pub session_manager: Rc<RefCell<SessionManager>>, + + /// Make sure the type is `!Send` and `!Sync`. + /// 将 Executor 当作 Rc<()>, Rc 非线程安全,这样编译器就不会自动实现 Send 和 Sync + /// Send 和 Sync 跨线程才有, thread-per-core 不需要 + _marker: PhantomData<Rc<()>>, +} + +impl Default for Executor { + fn default() -> Self { + // 构造方法 + Self::new() + } +} + +impl Executor { + pub fn new() -> Self { + // 构造方法 + Self { + task_queue: TaskQueue::default(), + event_manager: Rc::new(RefCell::new(EventManager::new())), + session_manager: Rc::new(RefCell::new(SessionManager::new(4096))), + _marker: PhantomData, // 自动实现 Pin + } + } + // 创建新 task + pub fn spawn(fut: impl Future<Output = ()> + 'static) { + let t: Rc<Task> = Rc::new(Task { + future: RefCell::new(fut.boxed_local()), + }); // 打包到 Rc<Task> 中 + EX.with(|ex| ex.task_queue.push(t)); // 添加到任务队列 + } + + pub fn block_on<F, T, O>(&self, f: F) -> O + // 真正执行开始的地方 + where + F: Fn() -> T, // 闭包 + T: Future<Output = O> + 'static, // task ? + { + let _waker: Waker = waker_fn::waker_fn(|| {}); // 空的 Waker + let cx = &mut Context::from_waker(&_waker); // 从 _waker 创建上下文 + + EX.set(self, || { + // 闭包 + let fut = f(); // task 本身 + pin_utils::pin_mut!(fut); // pin_mut! 宏,将 fut 变成 Pin<&mut F> 固定内存地址 + loop { + // 任务队列循环 + // return if the outer future is ready + // 第一次 poll + if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { + break t; + } + + // consume all tasks | 在 任务队列中的都是待处理任务 + while let Some(t) = self.task_queue.pop() { + let future = t.future.borrow_mut(); + + let waker = event_v_waker::<T>(&t); // Waker 实例 + let mut context: Context<'_> = Context::from_waker(&waker); // Context 实例 + let _ = Pin::new(future).as_mut().poll(&mut context); // poll + } + + // custom all ready events + self.event_manager.borrow_mut().dispatch(); // 事件分发 + // if task queue is still empty blocking on; + + // if self.task_queue.is_empty() { + // // 任务队列为空 + // // println!("task queue is empty"); + // // std::thread::sleep(std::time::Duration::from_millis(1000)); + // // no task to execute now, it may ready | 没有任务可执行, 再试试 fut ? + // if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { + // break t; + // } + // } + } + }) + } +} + +// 任务队列 +pub struct TaskQueue { + queue: RefCell<VecDeque<Rc<Task>>>, // 直接的 VecDeque +} + +impl Default for TaskQueue { + fn default() -> Self { + Self::new() + } +} + +impl TaskQueue { + pub fn new() -> Self { + // 默认容量 4096 + const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; + Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) + } + pub fn new_with_capacity(capacity: usize) -> Self { + // 指定容量 + Self { + queue: RefCell::new(VecDeque::with_capacity(capacity)), + } + } + + pub(crate) fn push(&self, runnable: Rc<Task>) { + // 添加任务 + println!("add task"); + self.queue.borrow_mut().push_back(runnable); + } + + pub(crate) fn pop(&self) -> Option<Rc<Task>> { + // 移除任务 + // println!("remove task"); + self.queue.borrow_mut().pop_front() + } + + pub(crate) fn is_empty(&self) -> bool { + return self.queue.borrow_mut().is_empty(); + } +} + +// 任务 / 协程 本身 +pub struct Task { + // 所有权的缘故 用了 RefCell,不可变引用情况下 + // 不知道具体的类型,只是实现了 future trait,得用 Box 包装 -> LocalBoxFuture + // 整个的生命周期干脆 static 反正协程活的比其他长 + future: RefCell<LocalBoxFuture<'static, ()>>, +} +impl Task { + fn wake_(self: Rc<Self>) { + // 唤醒 + Self::wake_by_ref_(&self) + } + + fn wake_by_ref_(self: &Rc<Self>) { + // 唤醒 + EX.with(|ex: &Executor| ex.task_queue.push(self.clone())); // 添加到任务队列 + } +} + +pub(crate) struct WakerRef<'a> { + waker: ManuallyDrop<Waker>, + _p: PhantomData<&'a Task>, +} + +/// Returns a `WakerRef` which avoids having to pre-emptively increase the +/// refcount if there is no need to do so. +pub(super) fn event_v_waker<T>(task: &Task) -> WakerRef<'_> +where + T: Future, +{ + // (discussion at rust-lang/rust#66281). + // Waker::will_wake(检查传入 waker 是否与自身唤醒同一个 task) 使用 VTABLE 指针作为检查的一部分. + // 这意味着当使用当前任务的 waker 时, will_wake 总是返回 false. + // 为了解决这个问题, 我们使用一个单独的 vtable. 传递的是一个引用, 而不是一个 waker 类型,间接解决 + let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T>(task))) }; + + WakerRef { + waker, + _p: PhantomData, + } +} + +// EventVWaker - Waker 自动解引用 +impl ops::Deref for WakerRef<'_> { + type Target = Waker; + + fn deref(&self) -> &Waker { + &self.waker + } +} + +// 创建一个 RawWaker +pub(super) fn raw_waker<T>(task: *const Task) -> RawWaker +where + T: Future, +{ + let ptr = task as *const (); + // RawWakerVTable 的 函数表 + let vtable = &RawWakerVTable::new( + clone_waker::<T>, + wake_by_val::<T>, + wake_by_ref::<T>, + drop_waker::<T>, + ); + RawWaker::new(ptr, vtable) +} + +// 复制 waker +unsafe fn clone_waker<T>(ptr: *const ()) -> RawWaker +where + T: Future, +{ + increase_refcount(ptr); // 增加计数,虽然下面只是增加了一个引用 + let task = ptr as *const Task; + raw_waker::<T>(task) +} +// 释放 waker +unsafe fn drop_waker<T>(ptr: *const ()) +where + T: Future, +{ + drop(Rc::from_raw(ptr as *const Task)); +} +// 值上面 调用 wake +unsafe fn wake_by_val<T>(ptr: *const ()) +where + T: Future, +{ + let rc = Rc::from_raw(ptr as *const Task); + rc.wake_(); +} + +// Wake without consuming the waker +// 引用上面 调用 wake, 不消耗引用计数 +unsafe fn wake_by_ref<T>(ptr: *const ()) +where + T: Future, +{ + let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); + rc.wake_by_ref_(); +} + +#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant. +unsafe fn increase_refcount(ptr: *const ()) { + // Retain Rc, but don't touch refcount by wrapping in ManuallyDrop + let rc = mem::ManuallyDrop::new(Rc::<Task>::from_raw(ptr as *const Task)); + // Now increase refcount, but don't drop new refcount either + let _rc_clone: mem::ManuallyDrop<_> = rc.clone(); +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs new file mode 100644 index 0000000..063039e --- /dev/null +++ b/src/runtime/mod.rs @@ -0,0 +1 @@ +pub mod executor;
\ No newline at end of file |
