summaryrefslogtreecommitdiff
path: root/src/executor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/executor.rs')
-rw-r--r--src/executor.rs242
1 files changed, 160 insertions, 82 deletions
diff --git a/src/executor.rs b/src/executor.rs
index 42cd00f..51f58fa 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -1,24 +1,93 @@
-use std::{
- cell::RefCell,
- collections::VecDeque,
- marker::PhantomData,
- mem,
- rc::Rc,
- task::{RawWaker, RawWakerVTable, Waker, Context}, pin::Pin,
-};
-
-use futures::{future::LocalBoxFuture, Future, FutureExt};
+// luwenpeng 2022/11/01
use crate::reactor::Reactor;
+use futures::future::LocalBoxFuture;
+use futures::Future;
+use futures::FutureExt;
+use std::cell::RefCell;
+use std::collections::VecDeque;
+use std::marker::PhantomData;
+use std::mem;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::RawWaker;
+use std::task::RawWakerVTable;
+use std::task::Waker;
+
+/******************************************************************************
+ * thread_local 的静态变量 THREAD_LOCAL_EXECUTOR
+ ******************************************************************************/
+
+scoped_tls::scoped_thread_local!(pub(crate) static THREAD_LOCAL_EXECUTOR: Executor);
+
+/******************************************************************************
+ * Task
+ ******************************************************************************/
+
+pub struct Task {
+ future: RefCell<LocalBoxFuture<'static, ()>>,
+}
+
+impl Task {
+ fn enqueue(self: Rc<Self>) {
+ Self::enqueue_by_ref(&self)
+ }
+
+ fn enqueue_by_ref(self: &Rc<Self>) {
+ THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(self.clone()));
+ }
+}
+
+/******************************************************************************
+ * TaskQueue
+ ******************************************************************************/
-scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor);
+pub struct TaskQueue {
+ queue: RefCell<VecDeque<Rc<Task>>>,
+}
+
+impl Default for TaskQueue {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl TaskQueue {
+ pub fn new() -> Self {
+ const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
+ Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
+ }
+
+ pub fn new_with_capacity(capacity: usize) -> Self {
+ Self {
+ queue: RefCell::new(VecDeque::with_capacity(capacity)),
+ }
+ }
+
+ pub(crate) fn push(&self, runnable: Rc<Task>) {
+ println!("[task_queue] push task");
+
+ self.queue.borrow_mut().push_back(runnable);
+ }
+
+ pub(crate) fn pop(&self) -> Option<Rc<Task>> {
+ println!("[task_queue] pop task");
+
+ self.queue.borrow_mut().pop_front()
+ }
+}
+
+/******************************************************************************
+ * Executor
+ ******************************************************************************/
pub struct Executor {
- local_queue: TaskQueue,
- pub(crate) reactor: Rc<RefCell<Reactor>>,
+ local_queue: TaskQueue, // 任务队列
+ pub(crate) reactor: Rc<RefCell<Reactor>>, // Reactor
- /// Make sure the type is `!Send` and `!Sync`.
- _marker: PhantomData<Rc<()>>,
+ // Make sure the type is `!Send` and `!Sync`.
+ marker: PhantomData<Rc<()>>,
}
impl Default for Executor {
@@ -27,22 +96,22 @@ impl Default for Executor {
}
}
-
impl Executor {
pub fn new() -> Self {
Self {
local_queue: TaskQueue::default(),
reactor: Rc::new(RefCell::new(Reactor::default())),
-
- _marker: PhantomData,
+ marker: PhantomData,
}
}
- pub fn spawn(fut: impl Future<Output = ()> + 'static) {
- let t = Rc::new(Task {
- future: RefCell::new(fut.boxed_local()),
+ pub fn spawn(future: impl Future<Output = ()> + 'static) {
+ println!("[executor] spawn, wrap future to task, and push task to queue");
+
+ let task = Rc::new(Task {
+ future: RefCell::new(future.boxed_local()),
});
- EX.with(|ex| ex.local_queue.push(t));
+ THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(task));
}
pub fn block_on<F, T, O>(&self, f: F) -> O
@@ -50,89 +119,87 @@ impl Executor {
F: Fn() -> T,
T: Future<Output = O> + 'static,
{
+ println!("[executor] block_on");
+
let _waker = waker_fn::waker_fn(|| {});
- let cx = &mut Context::from_waker(&_waker);
+ let ctx = &mut Context::from_waker(&_waker);
+
+ THREAD_LOCAL_EXECUTOR.set(self, || {
+ // 此处的 future 为 async fn tcp_server()
+ let future = f();
+ pin_utils::pin_mut!(future);
- EX.set(self, || {
- let fut = f();
- pin_utils::pin_mut!(fut);
loop {
+ println!("[executor] -> loop");
+
// return if the outer future is ready
- if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
- break t;
+ if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) {
+ println!("[executor] future poll(1), return ready");
+ break task;
}
+ println!("[executor] consume all tasks");
+
// consume all tasks
- while let Some(t) = self.local_queue.pop() {
- let future = t.future.borrow_mut();
- let w = waker(t.clone());
- let mut context = Context::from_waker(&w);
+ while let Some(task) = self.local_queue.pop() {
+ println!("[executor] pop task frome queue");
+
+ let future = task.future.borrow_mut();
+
+ let waker = wrap_task_to_waker(task.clone()); // 此处将 task 包装成 Waker
+ let mut context = Context::from_waker(&waker); // 此处将 Waker 包装成 Context
+
let _ = Pin::new(future).as_mut().poll(&mut context);
+ println!("[executor] future poll(2), return");
+
+ // 此处默认执行:[RawWaker] drop_waker()
}
// no task to execute now, it may ready
- if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
- break t;
+ if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) {
+ println!("[executor] future poll(3), return ready");
+ break task;
}
// block for io
+ println!("[executor] reactor->wait()");
self.reactor.borrow_mut().wait();
}
})
}
}
-pub struct TaskQueue {
- queue: RefCell<VecDeque<Rc<Task>>>,
-}
+/******************************************************************************
+ * waker
+ ******************************************************************************/
-impl Default for TaskQueue {
- fn default() -> Self {
- Self::new()
- }
+/*
+pub struct Context<'a> {
+ waker: &'a Waker,
+ _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}
-impl TaskQueue {
- pub fn new() -> Self {
- const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
- Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
- }
- pub fn new_with_capacity(capacity: usize) -> Self {
- Self {
- queue: RefCell::new(VecDeque::with_capacity(capacity)),
- }
- }
-
- pub(crate) fn push(&self, runnable: Rc<Task>) {
- println!("add task");
- self.queue.borrow_mut().push_back(runnable);
- }
-
- pub(crate) fn pop(&self) -> Option<Rc<Task>> {
- println!("remove task");
- self.queue.borrow_mut().pop_front()
- }
+pub struct Waker {
+ waker: RawWaker,
}
-pub struct Task {
- future: RefCell<LocalBoxFuture<'static, ()>>,
+pub struct RawWaker {
+ data: *const (),
+ vtable: &'static RawWakerVTable,
}
+*/
+
+fn wrap_task_to_waker(task: Rc<Task>) -> Waker {
+ println!("[executor] ->wrap_task_to_waker(), wrap task to Waker");
-fn waker(wake: Rc<Task>) -> Waker {
- let ptr = Rc::into_raw(wake) as *const ();
- let vtable = &Helper::VTABLE;
+ let ptr = Rc::into_raw(task) as *const ();
+ let vtable = &Helper::VTABLE; // VTABLE 为 const 定义的 RawWakerVTable
unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) }
}
-impl Task {
- fn wake_(self: Rc<Self>) {
- Self::wake_by_ref_(&self)
- }
-
- fn wake_by_ref_(self: &Rc<Self>) {
- EX.with(|ex| ex.local_queue.push(self.clone()));
- }
-}
+/******************************************************************************
+ * Helper
+ ******************************************************************************/
struct Helper;
@@ -144,23 +211,34 @@ impl Helper {
Self::drop_waker,
);
- unsafe fn clone_waker(data: *const ()) -> RawWaker {
- increase_refcount(data);
+ // 将 task 封装成 RawWaker
+ unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+ println!("[RawWaker] ->clone_waker(), wrap task to RawWaker");
+
+ increase_refcount(ptr);
let vtable = &Self::VTABLE;
- RawWaker::new(data, vtable)
+ RawWaker::new(ptr, vtable)
}
+ // 将 task 添加到任务队列中
unsafe fn wake(ptr: *const ()) {
- let rc = Rc::from_raw(ptr as *const Task);
- rc.wake_();
+ println!("[RawWaker] ->wake(), add task to queue");
+
+ let task = Rc::from_raw(ptr as *const Task);
+ task.enqueue();
}
+ // 将 task 添加到任务队列中
unsafe fn wake_by_ref(ptr: *const ()) {
- let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
- rc.wake_by_ref_();
+ println!("[RawWaker] wake_by_ref(), add task to queue");
+
+ let task = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
+ task.enqueue_by_ref();
}
unsafe fn drop_waker(ptr: *const ()) {
+ println!("[RawWaker] ->drop_waker(), drop RawWaker");
+
drop(Rc::from_raw(ptr as *const Task));
}
}