diff options
| author | ihciah <[email protected]> | 2021-11-28 01:46:23 +0800 |
|---|---|---|
| committer | ihciah <[email protected]> | 2021-11-29 16:05:45 +0800 |
| commit | 54d9885220d2e0cd0167f6cbb10c7b0d9e762df2 (patch) | |
| tree | 392992a5ee3d531018bf55c12305e0ebc2ce984a /src/executor.rs | |
init
Diffstat (limited to 'src/executor.rs')
| -rw-r--r-- | src/executor.rs | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/src/executor.rs b/src/executor.rs new file mode 100644 index 0000000..42cd00f --- /dev/null +++ b/src/executor.rs @@ -0,0 +1,174 @@ +use std::{ + cell::RefCell, + collections::VecDeque, + marker::PhantomData, + mem, + rc::Rc, + task::{RawWaker, RawWakerVTable, Waker, Context}, pin::Pin, +}; + +use futures::{future::LocalBoxFuture, Future, FutureExt}; + +use crate::reactor::Reactor; + +scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor); + +pub struct Executor { + local_queue: TaskQueue, + pub(crate) reactor: Rc<RefCell<Reactor>>, + + /// Make sure the type is `!Send` and `!Sync`. + _marker: PhantomData<Rc<()>>, +} + +impl Default for Executor { + fn default() -> Self { + Self::new() + } +} + + +impl Executor { + pub fn new() -> Self { + Self { + local_queue: TaskQueue::default(), + reactor: Rc::new(RefCell::new(Reactor::default())), + + _marker: PhantomData, + } + } + + pub fn spawn(fut: impl Future<Output = ()> + 'static) { + let t = Rc::new(Task { + future: RefCell::new(fut.boxed_local()), + }); + EX.with(|ex| ex.local_queue.push(t)); + } + + pub fn block_on<F, T, O>(&self, f: F) -> O + where + F: Fn() -> T, + T: Future<Output = O> + 'static, + { + let _waker = waker_fn::waker_fn(|| {}); + let cx = &mut Context::from_waker(&_waker); + + EX.set(self, || { + let fut = f(); + pin_utils::pin_mut!(fut); + loop { + // return if the outer future is ready + if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { + break t; + } + + // consume all tasks + while let Some(t) = self.local_queue.pop() { + let future = t.future.borrow_mut(); + let w = waker(t.clone()); + let mut context = Context::from_waker(&w); + let _ = Pin::new(future).as_mut().poll(&mut context); + } + + // no task to execute now, it may ready + if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { + break t; + } + + // block for io + self.reactor.borrow_mut().wait(); + } + }) + } +} + +pub struct TaskQueue { + queue: RefCell<VecDeque<Rc<Task>>>, +} + +impl Default for TaskQueue { + fn default() -> Self { + Self::new() + } +} + +impl TaskQueue { + pub fn new() -> Self { + const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; + Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) + } + pub fn new_with_capacity(capacity: usize) -> Self { + Self { + queue: RefCell::new(VecDeque::with_capacity(capacity)), + } + } + + pub(crate) fn push(&self, runnable: Rc<Task>) { + println!("add task"); + self.queue.borrow_mut().push_back(runnable); + } + + pub(crate) fn pop(&self) -> Option<Rc<Task>> { + println!("remove task"); + self.queue.borrow_mut().pop_front() + } +} + +pub struct Task { + future: RefCell<LocalBoxFuture<'static, ()>>, +} + +fn waker(wake: Rc<Task>) -> Waker { + let ptr = Rc::into_raw(wake) as *const (); + let vtable = &Helper::VTABLE; + unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } +} + +impl Task { + fn wake_(self: Rc<Self>) { + Self::wake_by_ref_(&self) + } + + fn wake_by_ref_(self: &Rc<Self>) { + EX.with(|ex| ex.local_queue.push(self.clone())); + } +} + +struct Helper; + +impl Helper { + const VTABLE: RawWakerVTable = RawWakerVTable::new( + Self::clone_waker, + Self::wake, + Self::wake_by_ref, + Self::drop_waker, + ); + + unsafe fn clone_waker(data: *const ()) -> RawWaker { + increase_refcount(data); + let vtable = &Self::VTABLE; + RawWaker::new(data, vtable) + } + + unsafe fn wake(ptr: *const ()) { + let rc = Rc::from_raw(ptr as *const Task); + rc.wake_(); + } + + unsafe fn wake_by_ref(ptr: *const ()) { + let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); + rc.wake_by_ref_(); + } + + unsafe fn drop_waker(ptr: *const ()) { + drop(Rc::from_raw(ptr as *const Task)); + } +} + +#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant. +unsafe fn increase_refcount(data: *const ()) { + // Retain Rc, but don't touch refcount by wrapping in ManuallyDrop + let rc = mem::ManuallyDrop::new(Rc::<Task>::from_raw(data as *const Task)); + // Now increase refcount, but don't drop new refcount either + let _rc_clone: mem::ManuallyDrop<_> = rc.clone(); +} |
