summaryrefslogtreecommitdiff
path: root/src/executor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/executor.rs')
-rw-r--r--src/executor.rs174
1 files changed, 174 insertions, 0 deletions
diff --git a/src/executor.rs b/src/executor.rs
new file mode 100644
index 0000000..42cd00f
--- /dev/null
+++ b/src/executor.rs
@@ -0,0 +1,174 @@
+use std::{
+ cell::RefCell,
+ collections::VecDeque,
+ marker::PhantomData,
+ mem,
+ rc::Rc,
+ task::{RawWaker, RawWakerVTable, Waker, Context}, pin::Pin,
+};
+
+use futures::{future::LocalBoxFuture, Future, FutureExt};
+
+use crate::reactor::Reactor;
+
+scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor);
+
+pub struct Executor {
+ local_queue: TaskQueue,
+ pub(crate) reactor: Rc<RefCell<Reactor>>,
+
+ /// Make sure the type is `!Send` and `!Sync`.
+ _marker: PhantomData<Rc<()>>,
+}
+
+impl Default for Executor {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+
+impl Executor {
+ pub fn new() -> Self {
+ Self {
+ local_queue: TaskQueue::default(),
+ reactor: Rc::new(RefCell::new(Reactor::default())),
+
+ _marker: PhantomData,
+ }
+ }
+
+ pub fn spawn(fut: impl Future<Output = ()> + 'static) {
+ let t = Rc::new(Task {
+ future: RefCell::new(fut.boxed_local()),
+ });
+ EX.with(|ex| ex.local_queue.push(t));
+ }
+
+ pub fn block_on<F, T, O>(&self, f: F) -> O
+ where
+ F: Fn() -> T,
+ T: Future<Output = O> + 'static,
+ {
+ let _waker = waker_fn::waker_fn(|| {});
+ let cx = &mut Context::from_waker(&_waker);
+
+ EX.set(self, || {
+ let fut = f();
+ pin_utils::pin_mut!(fut);
+ loop {
+ // return if the outer future is ready
+ if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
+ break t;
+ }
+
+ // consume all tasks
+ while let Some(t) = self.local_queue.pop() {
+ let future = t.future.borrow_mut();
+ let w = waker(t.clone());
+ let mut context = Context::from_waker(&w);
+ let _ = Pin::new(future).as_mut().poll(&mut context);
+ }
+
+ // no task to execute now, it may ready
+ if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
+ break t;
+ }
+
+ // block for io
+ self.reactor.borrow_mut().wait();
+ }
+ })
+ }
+}
+
+pub struct TaskQueue {
+ queue: RefCell<VecDeque<Rc<Task>>>,
+}
+
+impl Default for TaskQueue {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl TaskQueue {
+ pub fn new() -> Self {
+ const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
+ Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
+ }
+ pub fn new_with_capacity(capacity: usize) -> Self {
+ Self {
+ queue: RefCell::new(VecDeque::with_capacity(capacity)),
+ }
+ }
+
+ pub(crate) fn push(&self, runnable: Rc<Task>) {
+ println!("add task");
+ self.queue.borrow_mut().push_back(runnable);
+ }
+
+ pub(crate) fn pop(&self) -> Option<Rc<Task>> {
+ println!("remove task");
+ self.queue.borrow_mut().pop_front()
+ }
+}
+
+pub struct Task {
+ future: RefCell<LocalBoxFuture<'static, ()>>,
+}
+
+fn waker(wake: Rc<Task>) -> Waker {
+ let ptr = Rc::into_raw(wake) as *const ();
+ let vtable = &Helper::VTABLE;
+ unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) }
+}
+
+impl Task {
+ fn wake_(self: Rc<Self>) {
+ Self::wake_by_ref_(&self)
+ }
+
+ fn wake_by_ref_(self: &Rc<Self>) {
+ EX.with(|ex| ex.local_queue.push(self.clone()));
+ }
+}
+
+struct Helper;
+
+impl Helper {
+ const VTABLE: RawWakerVTable = RawWakerVTable::new(
+ Self::clone_waker,
+ Self::wake,
+ Self::wake_by_ref,
+ Self::drop_waker,
+ );
+
+ unsafe fn clone_waker(data: *const ()) -> RawWaker {
+ increase_refcount(data);
+ let vtable = &Self::VTABLE;
+ RawWaker::new(data, vtable)
+ }
+
+ unsafe fn wake(ptr: *const ()) {
+ let rc = Rc::from_raw(ptr as *const Task);
+ rc.wake_();
+ }
+
+ unsafe fn wake_by_ref(ptr: *const ()) {
+ let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
+ rc.wake_by_ref_();
+ }
+
+ unsafe fn drop_waker(ptr: *const ()) {
+ drop(Rc::from_raw(ptr as *const Task));
+ }
+}
+
+#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant.
+unsafe fn increase_refcount(data: *const ()) {
+ // Retain Rc, but don't touch refcount by wrapping in ManuallyDrop
+ let rc = mem::ManuallyDrop::new(Rc::<Task>::from_raw(data as *const Task));
+ // Now increase refcount, but don't drop new refcount either
+ let _rc_clone: mem::ManuallyDrop<_> = rc.clone();
+}