diff options
Diffstat (limited to 'src/executor.rs')
| -rw-r--r-- | src/executor.rs | 242 |
1 files changed, 160 insertions, 82 deletions
diff --git a/src/executor.rs b/src/executor.rs index 42cd00f..51f58fa 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,24 +1,93 @@ -use std::{ - cell::RefCell, - collections::VecDeque, - marker::PhantomData, - mem, - rc::Rc, - task::{RawWaker, RawWakerVTable, Waker, Context}, pin::Pin, -}; - -use futures::{future::LocalBoxFuture, Future, FutureExt}; +// luwenpeng 2022/11/01 use crate::reactor::Reactor; +use futures::future::LocalBoxFuture; +use futures::Future; +use futures::FutureExt; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::RawWaker; +use std::task::RawWakerVTable; +use std::task::Waker; + +/****************************************************************************** + * thread_local 的静态变量 THREAD_LOCAL_EXECUTOR + ******************************************************************************/ + +scoped_tls::scoped_thread_local!(pub(crate) static THREAD_LOCAL_EXECUTOR: Executor); + +/****************************************************************************** + * Task + ******************************************************************************/ + +pub struct Task { + future: RefCell<LocalBoxFuture<'static, ()>>, +} + +impl Task { + fn enqueue(self: Rc<Self>) { + Self::enqueue_by_ref(&self) + } + + fn enqueue_by_ref(self: &Rc<Self>) { + THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(self.clone())); + } +} + +/****************************************************************************** + * TaskQueue + ******************************************************************************/ -scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor); +pub struct TaskQueue { + queue: RefCell<VecDeque<Rc<Task>>>, +} + +impl Default for TaskQueue { + fn default() -> Self { + Self::new() + } +} + +impl TaskQueue { + pub fn new() -> Self { + const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; + Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) + } + + pub fn new_with_capacity(capacity: usize) -> Self { + Self { + queue: RefCell::new(VecDeque::with_capacity(capacity)), + } + } + + pub(crate) fn push(&self, runnable: Rc<Task>) { + println!("[task_queue] push task"); + + self.queue.borrow_mut().push_back(runnable); + } + + pub(crate) fn pop(&self) -> Option<Rc<Task>> { + println!("[task_queue] pop task"); + + self.queue.borrow_mut().pop_front() + } +} + +/****************************************************************************** + * Executor + ******************************************************************************/ pub struct Executor { - local_queue: TaskQueue, - pub(crate) reactor: Rc<RefCell<Reactor>>, + local_queue: TaskQueue, // 任务队列 + pub(crate) reactor: Rc<RefCell<Reactor>>, // Reactor - /// Make sure the type is `!Send` and `!Sync`. - _marker: PhantomData<Rc<()>>, + // Make sure the type is `!Send` and `!Sync`. + marker: PhantomData<Rc<()>>, } impl Default for Executor { @@ -27,22 +96,22 @@ impl Default for Executor { } } - impl Executor { pub fn new() -> Self { Self { local_queue: TaskQueue::default(), reactor: Rc::new(RefCell::new(Reactor::default())), - - _marker: PhantomData, + marker: PhantomData, } } - pub fn spawn(fut: impl Future<Output = ()> + 'static) { - let t = Rc::new(Task { - future: RefCell::new(fut.boxed_local()), + pub fn spawn(future: impl Future<Output = ()> + 'static) { + println!("[executor] spawn, wrap future to task, and push task to queue"); + + let task = Rc::new(Task { + future: RefCell::new(future.boxed_local()), }); - EX.with(|ex| ex.local_queue.push(t)); + THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(task)); } pub fn block_on<F, T, O>(&self, f: F) -> O @@ -50,89 +119,87 @@ impl Executor { F: Fn() -> T, T: Future<Output = O> + 'static, { + println!("[executor] block_on"); + let _waker = waker_fn::waker_fn(|| {}); - let cx = &mut Context::from_waker(&_waker); + let ctx = &mut Context::from_waker(&_waker); + + THREAD_LOCAL_EXECUTOR.set(self, || { + // 此处的 future 为 async fn tcp_server() + let future = f(); + pin_utils::pin_mut!(future); - EX.set(self, || { - let fut = f(); - pin_utils::pin_mut!(fut); loop { + println!("[executor] -> loop"); + // return if the outer future is ready - if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { - break t; + if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) { + println!("[executor] future poll(1), return ready"); + break task; } + println!("[executor] consume all tasks"); + // consume all tasks - while let Some(t) = self.local_queue.pop() { - let future = t.future.borrow_mut(); - let w = waker(t.clone()); - let mut context = Context::from_waker(&w); + while let Some(task) = self.local_queue.pop() { + println!("[executor] pop task frome queue"); + + let future = task.future.borrow_mut(); + + let waker = wrap_task_to_waker(task.clone()); // 此处将 task 包装成 Waker + let mut context = Context::from_waker(&waker); // 此处将 Waker 包装成 Context + let _ = Pin::new(future).as_mut().poll(&mut context); + println!("[executor] future poll(2), return"); + + // 此处默认执行:[RawWaker] drop_waker() } // no task to execute now, it may ready - if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { - break t; + if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) { + println!("[executor] future poll(3), return ready"); + break task; } // block for io + println!("[executor] reactor->wait()"); self.reactor.borrow_mut().wait(); } }) } } -pub struct TaskQueue { - queue: RefCell<VecDeque<Rc<Task>>>, -} +/****************************************************************************** + * waker + ******************************************************************************/ -impl Default for TaskQueue { - fn default() -> Self { - Self::new() - } +/* +pub struct Context<'a> { + waker: &'a Waker, + _marker: PhantomData<fn(&'a ()) -> &'a ()>, } -impl TaskQueue { - pub fn new() -> Self { - const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; - Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) - } - pub fn new_with_capacity(capacity: usize) -> Self { - Self { - queue: RefCell::new(VecDeque::with_capacity(capacity)), - } - } - - pub(crate) fn push(&self, runnable: Rc<Task>) { - println!("add task"); - self.queue.borrow_mut().push_back(runnable); - } - - pub(crate) fn pop(&self) -> Option<Rc<Task>> { - println!("remove task"); - self.queue.borrow_mut().pop_front() - } +pub struct Waker { + waker: RawWaker, } -pub struct Task { - future: RefCell<LocalBoxFuture<'static, ()>>, +pub struct RawWaker { + data: *const (), + vtable: &'static RawWakerVTable, } +*/ + +fn wrap_task_to_waker(task: Rc<Task>) -> Waker { + println!("[executor] ->wrap_task_to_waker(), wrap task to Waker"); -fn waker(wake: Rc<Task>) -> Waker { - let ptr = Rc::into_raw(wake) as *const (); - let vtable = &Helper::VTABLE; + let ptr = Rc::into_raw(task) as *const (); + let vtable = &Helper::VTABLE; // VTABLE 为 const 定义的 RawWakerVTable unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } } -impl Task { - fn wake_(self: Rc<Self>) { - Self::wake_by_ref_(&self) - } - - fn wake_by_ref_(self: &Rc<Self>) { - EX.with(|ex| ex.local_queue.push(self.clone())); - } -} +/****************************************************************************** + * Helper + ******************************************************************************/ struct Helper; @@ -144,23 +211,34 @@ impl Helper { Self::drop_waker, ); - unsafe fn clone_waker(data: *const ()) -> RawWaker { - increase_refcount(data); + // 将 task 封装成 RawWaker + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + println!("[RawWaker] ->clone_waker(), wrap task to RawWaker"); + + increase_refcount(ptr); let vtable = &Self::VTABLE; - RawWaker::new(data, vtable) + RawWaker::new(ptr, vtable) } + // 将 task 添加到任务队列中 unsafe fn wake(ptr: *const ()) { - let rc = Rc::from_raw(ptr as *const Task); - rc.wake_(); + println!("[RawWaker] ->wake(), add task to queue"); + + let task = Rc::from_raw(ptr as *const Task); + task.enqueue(); } + // 将 task 添加到任务队列中 unsafe fn wake_by_ref(ptr: *const ()) { - let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); - rc.wake_by_ref_(); + println!("[RawWaker] wake_by_ref(), add task to queue"); + + let task = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); + task.enqueue_by_ref(); } unsafe fn drop_waker(ptr: *const ()) { + println!("[RawWaker] ->drop_waker(), drop RawWaker"); + drop(Rc::from_raw(ptr as *const Task)); } } |
