use std::{ cell::RefCell, os::unix::prelude::{AsRawFd, RawFd}, rc::Rc, task::{Context, Waker}, }; use polling::{Event, Poller}; #[inline] pub(crate) fn get_reactor() -> Rc> { crate::executor::EX.with(|ex| ex.reactor.clone()) } #[derive(Debug)] pub struct Reactor { poller: Poller, // epoll 的包装 waker_mapping: rustc_hash::FxHashMap, // token -> waker 的映射, slab 更加常见 buffer: Vec, // 事件缓冲区 } impl Reactor { // 初始化 pub fn new() -> Self { Self { poller: Poller::new().unwrap(), waker_mapping: Default::default(), buffer: Vec::with_capacity(2048), // 默认 buffer 长度 2048 } } // Epoll related pub fn add(&mut self, fd: RawFd) { println!("[reactor] add fd {}", fd); let flags = nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap()) .unwrap(); let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK; // 设置为非阻塞 nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap(); self.poller .add(fd, polling::Event::none(fd as usize)) .unwrap(); // 添加到 epoll } // 示例项目是 tcp,仅定义 2 类事件 // 读事件 pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) { println!("[reactor] modify_readable fd {} token {}", fd, fd * 2); self.push_completion(fd as u64 * 2, cx); // token 为 fd * 2 let event = polling::Event::readable(fd as usize); // 可读事件 self.poller.modify(fd, event); // 添加监听 } pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) { println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1); self.push_completion(fd as u64 * 2 + 1, cx); // token 为 fd * 2 + 1 let event = polling::Event::writable(fd as usize); // 可写事件 self.poller.modify(fd, event); // 添加监听 } pub fn wait(&mut self) { println!("[reactor] waiting"); self.poller.wait(&mut self.buffer, None); // epoll_wait println!("[reactor] wait done"); // 已经有事件了 for i in 0..self.buffer.len() { // 遍历事件 buffer let event = self.buffer.swap_remove(0); // 取出事件 if event.readable { // 可读事件 if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) { println!( "[reactor token] fd {} read waker token {} removed and woken", event.key, event.key * 2 ); waker.wake(); // 唤醒 } } if event.writable { // 可写事件 if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) { println!( "[reactor token] fd {} write waker token {} removed and woken", event.key, event.key * 2 + 1 ); waker.wake(); // 唤醒 } } } } pub fn delete(&mut self, fd: RawFd) { println!("[reactor] delete fd {}", fd); // 删除 fd 对应的 waker self.waker_mapping.remove(&(fd as u64 * 2)); self.waker_mapping.remove(&(fd as u64 * 2 + 1)); println!( "[reactor token] fd {} wakers token {}, {} removed", fd, fd * 2, fd * 2 + 1 ); } fn push_completion(&mut self, token: u64, cx: &mut Context) { println!("[reactor token] token {} waker saved", token); self.waker_mapping.insert(token, cx.waker().clone()); } } //Default 是 rust 的内置 trait,用于为类型提供默认值 impl Default for Reactor { fn default() -> Self { Self::new() } }