summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzy <[email protected]>2023-10-26 03:21:55 +0000
committerzy <[email protected]>2023-10-26 03:21:55 +0000
commit9ca7caa4bf91f87088d0e4093b267c6a5d818b1b (patch)
tree9d9e814b6313041671301f541a375d6f45a0c452
parentd7fbe33a93e5e744ed325c8442b7972996e3a44f (diff)
runtime
-rw-r--r--src/runtime/executor.rs258
-rw-r--r--src/runtime/mod.rs1
2 files changed, 259 insertions, 0 deletions
diff --git a/src/runtime/executor.rs b/src/runtime/executor.rs
new file mode 100644
index 0000000..2aa7e0c
--- /dev/null
+++ b/src/runtime/executor.rs
@@ -0,0 +1,258 @@
+use crate::{
+ event::manager::{EventManager, EventQueue},
+ session::manager::SessionManager,
+};
+use futures::{future::LocalBoxFuture, Future, FutureExt};
+use std::{
+ cell::RefCell,
+ collections::VecDeque,
+ marker::PhantomData,
+ mem::{self, ManuallyDrop},
+ ops,
+ pin::Pin,
+ ptr::NonNull,
+ rc::Rc,
+ task::{Context, RawWaker, RawWakerVTable, Waker},
+};
+
+// scoped_thread_local 宏创建的 本地线程独享的变量
+// EX Executor 类型实例
+scoped_tls::scoped_thread_local!(pub static EX: Executor);
+
+pub struct Executor {
+ task_queue: TaskQueue, // 任务队列
+
+ // pub(crate) 作用域
+ // Rc 和 RefCell: 多个所有者 且 可变
+ pub event_manager: Rc<RefCell<EventManager>>,
+ pub session_manager: Rc<RefCell<SessionManager>>,
+
+ /// Make sure the type is `!Send` and `!Sync`.
+ /// 将 Executor 当作 Rc<()>, Rc 非线程安全,这样编译器就不会自动实现 Send 和 Sync
+ /// Send 和 Sync 跨线程才有, thread-per-core 不需要
+ _marker: PhantomData<Rc<()>>,
+}
+
+impl Default for Executor {
+ fn default() -> Self {
+ // 构造方法
+ Self::new()
+ }
+}
+
+impl Executor {
+ pub fn new() -> Self {
+ // 构造方法
+ Self {
+ task_queue: TaskQueue::default(),
+ event_manager: Rc::new(RefCell::new(EventManager::new())),
+ session_manager: Rc::new(RefCell::new(SessionManager::new(4096))),
+ _marker: PhantomData, // 自动实现 Pin
+ }
+ }
+ // 创建新 task
+ pub fn spawn(fut: impl Future<Output = ()> + 'static) {
+ let t: Rc<Task> = Rc::new(Task {
+ future: RefCell::new(fut.boxed_local()),
+ }); // 打包到 Rc<Task> 中
+ EX.with(|ex| ex.task_queue.push(t)); // 添加到任务队列
+ }
+
+ pub fn block_on<F, T, O>(&self, f: F) -> O
+ // 真正执行开始的地方
+ where
+ F: Fn() -> T, // 闭包
+ T: Future<Output = O> + 'static, // task ?
+ {
+ let _waker: Waker = waker_fn::waker_fn(|| {}); // 空的 Waker
+ let cx = &mut Context::from_waker(&_waker); // 从 _waker 创建上下文
+
+ EX.set(self, || {
+ // 闭包
+ let fut = f(); // task 本身
+ pin_utils::pin_mut!(fut); // pin_mut! 宏,将 fut 变成 Pin<&mut F> 固定内存地址
+ loop {
+ // 任务队列循环
+ // return if the outer future is ready
+ // 第一次 poll
+ if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
+ break t;
+ }
+
+ // consume all tasks | 在 任务队列中的都是待处理任务
+ while let Some(t) = self.task_queue.pop() {
+ let future = t.future.borrow_mut();
+
+ let waker = event_v_waker::<T>(&t); // Waker 实例
+ let mut context: Context<'_> = Context::from_waker(&waker); // Context 实例
+ let _ = Pin::new(future).as_mut().poll(&mut context); // poll
+ }
+
+ // custom all ready events
+ self.event_manager.borrow_mut().dispatch(); // 事件分发
+ // if task queue is still empty blocking on;
+
+ // if self.task_queue.is_empty() {
+ // // 任务队列为空
+ // // println!("task queue is empty");
+ // // std::thread::sleep(std::time::Duration::from_millis(1000));
+ // // no task to execute now, it may ready | 没有任务可执行, 再试试 fut ?
+ // if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
+ // break t;
+ // }
+ // }
+ }
+ })
+ }
+}
+
+// 任务队列
+pub struct TaskQueue {
+ queue: RefCell<VecDeque<Rc<Task>>>, // 直接的 VecDeque
+}
+
+impl Default for TaskQueue {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl TaskQueue {
+ pub fn new() -> Self {
+ // 默认容量 4096
+ const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
+ Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
+ }
+ pub fn new_with_capacity(capacity: usize) -> Self {
+ // 指定容量
+ Self {
+ queue: RefCell::new(VecDeque::with_capacity(capacity)),
+ }
+ }
+
+ pub(crate) fn push(&self, runnable: Rc<Task>) {
+ // 添加任务
+ println!("add task");
+ self.queue.borrow_mut().push_back(runnable);
+ }
+
+ pub(crate) fn pop(&self) -> Option<Rc<Task>> {
+ // 移除任务
+ // println!("remove task");
+ self.queue.borrow_mut().pop_front()
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ return self.queue.borrow_mut().is_empty();
+ }
+}
+
+// 任务 / 协程 本身
+pub struct Task {
+ // 所有权的缘故 用了 RefCell,不可变引用情况下
+ // 不知道具体的类型,只是实现了 future trait,得用 Box 包装 -> LocalBoxFuture
+ // 整个的生命周期干脆 static 反正协程活的比其他长
+ future: RefCell<LocalBoxFuture<'static, ()>>,
+}
+impl Task {
+ fn wake_(self: Rc<Self>) {
+ // 唤醒
+ Self::wake_by_ref_(&self)
+ }
+
+ fn wake_by_ref_(self: &Rc<Self>) {
+ // 唤醒
+ EX.with(|ex: &Executor| ex.task_queue.push(self.clone())); // 添加到任务队列
+ }
+}
+
+pub(crate) struct WakerRef<'a> {
+ waker: ManuallyDrop<Waker>,
+ _p: PhantomData<&'a Task>,
+}
+
+/// Returns a `WakerRef` which avoids having to pre-emptively increase the
+/// refcount if there is no need to do so.
+pub(super) fn event_v_waker<T>(task: &Task) -> WakerRef<'_>
+where
+ T: Future,
+{
+ // (discussion at rust-lang/rust#66281).
+ // Waker::will_wake(检查传入 waker 是否与自身唤醒同一个 task) 使用 VTABLE 指针作为检查的一部分.
+ // 这意味着当使用当前任务的 waker 时, will_wake 总是返回 false.
+ // 为了解决这个问题, 我们使用一个单独的 vtable. 传递的是一个引用, 而不是一个 waker 类型,间接解决
+ let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T>(task))) };
+
+ WakerRef {
+ waker,
+ _p: PhantomData,
+ }
+}
+
+// EventVWaker - Waker 自动解引用
+impl ops::Deref for WakerRef<'_> {
+ type Target = Waker;
+
+ fn deref(&self) -> &Waker {
+ &self.waker
+ }
+}
+
+// 创建一个 RawWaker
+pub(super) fn raw_waker<T>(task: *const Task) -> RawWaker
+where
+ T: Future,
+{
+ let ptr = task as *const ();
+ // RawWakerVTable 的 函数表
+ let vtable = &RawWakerVTable::new(
+ clone_waker::<T>,
+ wake_by_val::<T>,
+ wake_by_ref::<T>,
+ drop_waker::<T>,
+ );
+ RawWaker::new(ptr, vtable)
+}
+
+// 复制 waker
+unsafe fn clone_waker<T>(ptr: *const ()) -> RawWaker
+where
+ T: Future,
+{
+ increase_refcount(ptr); // 增加计数,虽然下面只是增加了一个引用
+ let task = ptr as *const Task;
+ raw_waker::<T>(task)
+}
+// 释放 waker
+unsafe fn drop_waker<T>(ptr: *const ())
+where
+ T: Future,
+{
+ drop(Rc::from_raw(ptr as *const Task));
+}
+// 值上面 调用 wake
+unsafe fn wake_by_val<T>(ptr: *const ())
+where
+ T: Future,
+{
+ let rc = Rc::from_raw(ptr as *const Task);
+ rc.wake_();
+}
+
+// Wake without consuming the waker
+// 引用上面 调用 wake, 不消耗引用计数
+unsafe fn wake_by_ref<T>(ptr: *const ())
+where
+ T: Future,
+{
+ let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
+ rc.wake_by_ref_();
+}
+
+#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant.
+unsafe fn increase_refcount(ptr: *const ()) {
+ // Retain Rc, but don't touch refcount by wrapping in ManuallyDrop
+ let rc = mem::ManuallyDrop::new(Rc::<Task>::from_raw(ptr as *const Task));
+ // Now increase refcount, but don't drop new refcount either
+ let _rc_clone: mem::ManuallyDrop<_> = rc.clone();
+}
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
new file mode 100644
index 0000000..063039e
--- /dev/null
+++ b/src/runtime/mod.rs
@@ -0,0 +1 @@
+pub mod executor; \ No newline at end of file