summaryrefslogtreecommitdiff
path: root/src/reactor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/reactor.rs')
-rw-r--r--src/reactor.rs117
1 files changed, 117 insertions, 0 deletions
diff --git a/src/reactor.rs b/src/reactor.rs
new file mode 100644
index 0000000..51133ed
--- /dev/null
+++ b/src/reactor.rs
@@ -0,0 +1,117 @@
+use std::{
+ cell::RefCell,
+ os::unix::prelude::{AsRawFd, RawFd},
+ rc::Rc,
+ task::{Context, Waker},
+};
+
+use polling::{Event, Poller};
+
+#[inline]
+pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> {
+ crate::executor::EX.with(|ex| ex.reactor.clone())
+}
+
+#[derive(Debug)]
+pub struct Reactor {
+ poller: Poller,
+ waker_mapping: rustc_hash::FxHashMap<u64, Waker>,
+
+ buffer: Vec<Event>,
+}
+
+impl Reactor {
+ pub fn new() -> Self {
+ Self {
+ poller: Poller::new().unwrap(),
+ waker_mapping: Default::default(),
+
+ buffer: Vec::with_capacity(2048),
+ }
+ }
+
+ // Epoll related
+ pub fn add(&mut self, fd: RawFd) {
+ println!("[reactor] add fd {}", fd);
+
+ let flags =
+ nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap())
+ .unwrap();
+ let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK;
+ nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap();
+ self.poller
+ .add(fd, polling::Event::none(fd as usize))
+ .unwrap();
+ }
+
+ pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) {
+ println!("[reactor] modify_readable fd {} token {}", fd, fd * 2);
+
+ self.push_completion(fd as u64 * 2, cx);
+ let event = polling::Event::readable(fd as usize);
+ self.poller.modify(fd, event);
+ }
+
+ pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) {
+ println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1);
+
+ self.push_completion(fd as u64 * 2 + 1, cx);
+ let event = polling::Event::writable(fd as usize);
+ self.poller.modify(fd, event);
+ }
+
+ pub fn wait(&mut self) {
+ println!("[reactor] waiting");
+ self.poller.wait(&mut self.buffer, None);
+ println!("[reactor] wait done");
+
+ for i in 0..self.buffer.len() {
+ let event = self.buffer.swap_remove(0);
+ if event.readable {
+ if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) {
+ println!(
+ "[reactor token] fd {} read waker token {} removed and woken",
+ event.key,
+ event.key * 2
+ );
+ waker.wake();
+ }
+ }
+ if event.writable {
+ if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) {
+ println!(
+ "[reactor token] fd {} write waker token {} removed and woken",
+ event.key,
+ event.key * 2 + 1
+ );
+ waker.wake();
+ }
+ }
+ }
+ }
+
+ pub fn delete(&mut self, fd: RawFd) {
+ println!("[reactor] delete fd {}", fd);
+
+ self.waker_mapping.remove(&(fd as u64 * 2));
+ self.waker_mapping.remove(&(fd as u64 * 2 + 1));
+ println!(
+ "[reactor token] fd {} wakers token {}, {} removed",
+ fd,
+ fd * 2,
+ fd * 2 + 1
+ );
+ }
+
+ fn push_completion(&mut self, token: u64, cx: &mut Context) {
+ println!("[reactor token] token {} waker saved", token);
+
+ self.waker_mapping.insert(token, cx.waker().clone());
+ }
+}
+
+impl Default for Reactor {
+ fn default() -> Self {
+ Self::new()
+ }
+}