use std::{ cell::RefCell, os::unix::prelude::{AsRawFd, RawFd}, rc::Rc, task::{Context, Waker}, }; use polling::{Event, Poller}; #[inline] pub(crate) fn get_reactor() -> Rc> { crate::executor::EX.with(|ex| ex.reactor.clone()) } #[derive(Debug)] pub struct Reactor { poller: Poller, waker_mapping: rustc_hash::FxHashMap, buffer: Vec, } impl Reactor { pub fn new() -> Self { Self { poller: Poller::new().unwrap(), waker_mapping: Default::default(), buffer: Vec::with_capacity(2048), } } // Epoll related pub fn add(&mut self, fd: RawFd) { println!("[reactor] add fd {}", fd); let flags = nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap()) .unwrap(); let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK; nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap(); self.poller .add(fd, polling::Event::none(fd as usize)) .unwrap(); } pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) { println!("[reactor] modify_readable fd {} token {}", fd, fd * 2); self.push_completion(fd as u64 * 2, cx); let event = polling::Event::readable(fd as usize); self.poller.modify(fd, event); } pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) { println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1); self.push_completion(fd as u64 * 2 + 1, cx); let event = polling::Event::writable(fd as usize); self.poller.modify(fd, event); } pub fn wait(&mut self) { println!("[reactor] waiting"); self.poller.wait(&mut self.buffer, None); println!("[reactor] wait done"); for i in 0..self.buffer.len() { let event = self.buffer.swap_remove(0); if event.readable { if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) { println!( "[reactor token] fd {} read waker token {} removed and woken", event.key, event.key * 2 ); waker.wake(); } } if event.writable { if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) { println!( "[reactor token] fd {} write waker token {} removed and woken", event.key, event.key * 2 + 1 ); waker.wake(); } } } } pub fn delete(&mut self, fd: RawFd) { println!("[reactor] delete fd {}", fd); self.waker_mapping.remove(&(fd as u64 * 2)); self.waker_mapping.remove(&(fd as u64 * 2 + 1)); println!( "[reactor token] fd {} wakers token {}, {} removed", fd, fd * 2, fd * 2 + 1 ); } fn push_completion(&mut self, token: u64, cx: &mut Context) { println!("[reactor token] token {} waker saved", token); self.waker_mapping.insert(token, cx.waker().clone()); } } impl Default for Reactor { fn default() -> Self { Self::new() } }