diff options
Diffstat (limited to 'src/reactor.rs')
| -rw-r--r-- | src/reactor.rs | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/src/reactor.rs b/src/reactor.rs new file mode 100644 index 0000000..51133ed --- /dev/null +++ b/src/reactor.rs @@ -0,0 +1,117 @@ +use std::{ + cell::RefCell, + os::unix::prelude::{AsRawFd, RawFd}, + rc::Rc, + task::{Context, Waker}, +}; + +use polling::{Event, Poller}; + +#[inline] +pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> { + crate::executor::EX.with(|ex| ex.reactor.clone()) +} + +#[derive(Debug)] +pub struct Reactor { + poller: Poller, + waker_mapping: rustc_hash::FxHashMap<u64, Waker>, + + buffer: Vec<Event>, +} + +impl Reactor { + pub fn new() -> Self { + Self { + poller: Poller::new().unwrap(), + waker_mapping: Default::default(), + + buffer: Vec::with_capacity(2048), + } + } + + // Epoll related + pub fn add(&mut self, fd: RawFd) { + println!("[reactor] add fd {}", fd); + + let flags = + nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap()) + .unwrap(); + let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK; + nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap(); + self.poller + .add(fd, polling::Event::none(fd as usize)) + .unwrap(); + } + + pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) { + println!("[reactor] modify_readable fd {} token {}", fd, fd * 2); + + self.push_completion(fd as u64 * 2, cx); + let event = polling::Event::readable(fd as usize); + self.poller.modify(fd, event); + } + + pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) { + println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1); + + self.push_completion(fd as u64 * 2 + 1, cx); + let event = polling::Event::writable(fd as usize); + self.poller.modify(fd, event); + } + + pub fn wait(&mut self) { + println!("[reactor] waiting"); + self.poller.wait(&mut self.buffer, None); + println!("[reactor] wait done"); + + for i in 0..self.buffer.len() { + let event = self.buffer.swap_remove(0); + if event.readable { + if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) { + println!( + "[reactor token] fd {} read waker token {} removed and woken", + event.key, + event.key * 2 + ); + waker.wake(); + } + } + if event.writable { + if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) { + println!( + "[reactor token] fd {} write waker token {} removed and woken", + event.key, + event.key * 2 + 1 + ); + waker.wake(); + } + } + } + } + + pub fn delete(&mut self, fd: RawFd) { + println!("[reactor] delete fd {}", fd); + + self.waker_mapping.remove(&(fd as u64 * 2)); + self.waker_mapping.remove(&(fd as u64 * 2 + 1)); + println!( + "[reactor token] fd {} wakers token {}, {} removed", + fd, + fd * 2, + fd * 2 + 1 + ); + } + + fn push_completion(&mut self, token: u64, cx: &mut Context) { + println!("[reactor token] token {} waker saved", token); + + self.waker_mapping.insert(token, cx.waker().clone()); + } +} + +impl Default for Reactor { + fn default() -> Self { + Self::new() + } +} |
