use std::{ cell::RefCell, collections::VecDeque, marker::PhantomData, mem, rc::Rc, task::{RawWaker, RawWakerVTable, Waker, Context}, pin::Pin, }; use futures::{future::LocalBoxFuture, Future, FutureExt}; use crate::reactor::Reactor; scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor); pub struct Executor { local_queue: TaskQueue, pub(crate) reactor: Rc>, /// Make sure the type is `!Send` and `!Sync`. _marker: PhantomData>, } impl Default for Executor { fn default() -> Self { Self::new() } } impl Executor { pub fn new() -> Self { Self { local_queue: TaskQueue::default(), reactor: Rc::new(RefCell::new(Reactor::default())), _marker: PhantomData, } } pub fn spawn(fut: impl Future + 'static) { let t = Rc::new(Task { future: RefCell::new(fut.boxed_local()), }); EX.with(|ex| ex.local_queue.push(t)); } pub fn block_on(&self, f: F) -> O where F: Fn() -> T, T: Future + 'static, { let _waker = waker_fn::waker_fn(|| {}); let cx = &mut Context::from_waker(&_waker); EX.set(self, || { let fut = f(); pin_utils::pin_mut!(fut); loop { // return if the outer future is ready if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { break t; } // consume all tasks while let Some(t) = self.local_queue.pop() { let future = t.future.borrow_mut(); let w = waker(t.clone()); let mut context = Context::from_waker(&w); let _ = Pin::new(future).as_mut().poll(&mut context); } // no task to execute now, it may ready if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { break t; } // block for io self.reactor.borrow_mut().wait(); } }) } } pub struct TaskQueue { queue: RefCell>>, } impl Default for TaskQueue { fn default() -> Self { Self::new() } } impl TaskQueue { pub fn new() -> Self { const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) } pub fn new_with_capacity(capacity: usize) -> Self { Self { queue: RefCell::new(VecDeque::with_capacity(capacity)), } } pub(crate) fn push(&self, runnable: Rc) { println!("add task"); self.queue.borrow_mut().push_back(runnable); } pub(crate) fn pop(&self) -> Option> { println!("remove task"); self.queue.borrow_mut().pop_front() } } pub struct Task { future: RefCell>, } fn waker(wake: Rc) -> Waker { let ptr = Rc::into_raw(wake) as *const (); let vtable = &Helper::VTABLE; unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } } impl Task { fn wake_(self: Rc) { Self::wake_by_ref_(&self) } fn wake_by_ref_(self: &Rc) { EX.with(|ex| ex.local_queue.push(self.clone())); } } struct Helper; impl Helper { const VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, Self::wake, Self::wake_by_ref, Self::drop_waker, ); unsafe fn clone_waker(data: *const ()) -> RawWaker { increase_refcount(data); let vtable = &Self::VTABLE; RawWaker::new(data, vtable) } unsafe fn wake(ptr: *const ()) { let rc = Rc::from_raw(ptr as *const Task); rc.wake_(); } unsafe fn wake_by_ref(ptr: *const ()) { let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); rc.wake_by_ref_(); } unsafe fn drop_waker(ptr: *const ()) { drop(Rc::from_raw(ptr as *const Task)); } } #[allow(clippy::redundant_clone)] // The clone here isn't actually redundant. unsafe fn increase_refcount(data: *const ()) { // Retain Rc, but don't touch refcount by wrapping in ManuallyDrop let rc = mem::ManuallyDrop::new(Rc::::from_raw(data as *const Task)); // Now increase refcount, but don't drop new refcount either let _rc_clone: mem::ManuallyDrop<_> = rc.clone(); }