diff options
Diffstat (limited to 'src/reactor.rs')
| -rw-r--r-- | src/reactor.rs | 127 |
1 files changed, 79 insertions, 48 deletions
diff --git a/src/reactor.rs b/src/reactor.rs index 51133ed..ba064ee 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -1,113 +1,144 @@ -use std::{ - cell::RefCell, - os::unix::prelude::{AsRawFd, RawFd}, - rc::Rc, - task::{Context, Waker}, -}; - -use polling::{Event, Poller}; - +// luwenpeng 2022/11/01 + +use polling::Event; +use polling::Poller; +use std::cell::RefCell; +use std::os::unix::prelude::AsRawFd; +use std::os::unix::prelude::RawFd; +use std::rc::Rc; +use std::task::Context; +use std::task::Waker; +use std::thread::sleep; + +/* + * 通过 executor 获取 reactor + * + * 单线程中 + * Rc<T> : (多个所有者可读) + * RefCell<T> : (一个所有者可写) + * Rc<RefCell<T>> : (多个所有者可写) + */ #[inline] pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> { - crate::executor::EX.with(|ex| ex.reactor.clone()) + crate::executor::THREAD_LOCAL_EXECUTOR.with(|executor| executor.reactor.clone()) } +/****************************************************************************** + * Reactor + * + * fun: + * + * reactor.new(); 创建 reactor + * reactor.add(); 将 fd 添加到 reactor 的 epoll 中,但未设置读写事件 + * reactor.del(); 将 fd 从 + * reactor.mod_read(); + * reactor.mod_write(); + * reactor.wait(); + ******************************************************************************/ + #[derive(Debug)] pub struct Reactor { - poller: Poller, - waker_mapping: rustc_hash::FxHashMap<u64, Waker>, - - buffer: Vec<Event>, + poller: Poller, // poll + waker_mapping: rustc_hash::FxHashMap<u64, Waker>, // sockfd 与 waker 的映射关系 + buffer: Vec<Event>, // Event 的就绪队列 } impl Reactor { pub fn new() -> Self { + println!("[reactor] new"); + Self { poller: Poller::new().unwrap(), waker_mapping: Default::default(), - buffer: Vec::with_capacity(2048), } } - // Epoll related pub fn add(&mut self, fd: RawFd) { - println!("[reactor] add fd {}", fd); + println!("[reactor] add fd: {}, event: none", fd); + // 将 fd 设置为 nonblocking let flags = nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap()) .unwrap(); let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK; nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap(); + + // 将 fd 添加到 poller 中 self.poller .add(fd, polling::Event::none(fd as usize)) .unwrap(); } - pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) { - println!("[reactor] modify_readable fd {} token {}", fd, fd * 2); + pub fn del(&mut self, fd: RawFd) { + println!("[reactor] del fd: {}, event: all", fd,); + + // 将 fd 读写事件的 ctx 从 map 中删除 + self.waker_mapping.remove(&(fd as u64 * 2)); + self.waker_mapping.remove(&(fd as u64 * 2 + 1)); + + // 将 fd 从 poller 中移除 + self.poller.delete(fd).unwrap(); + } + + pub fn mod_read(&mut self, fd: RawFd, ctx: &mut Context) { + println!("[reactor] mod fd: {}, event: read", fd,); + + // 将 fd 可读事件的 ctx 存储到 map 中 + self.waker_mapping + .insert(fd as u64 * 2, ctx.waker().clone()); - self.push_completion(fd as u64 * 2, cx); + // fd 注册可读事件 let event = polling::Event::readable(fd as usize); self.poller.modify(fd, event); } - pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) { - println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1); + pub fn mod_write(&mut self, fd: RawFd, ctx: &mut Context) { + println!("[reactor] mod fd: {}, event: write", fd,); - self.push_completion(fd as u64 * 2 + 1, cx); + // 将 fd 可写事件的 ctx 存储到 map 中 + self.waker_mapping + .insert(fd as u64 * 2 + 1, ctx.waker().clone()); + + // fd 注册可写事件 let event = polling::Event::writable(fd as usize); self.poller.modify(fd, event); } pub fn wait(&mut self) { - println!("[reactor] waiting"); + println!("[reactor] waiting ..."); + + // TODO 增加定时器 timeout self.poller.wait(&mut self.buffer, None); - println!("[reactor] wait done"); + println!("[reactor] {} events is ready", self.buffer.len()); for i in 0..self.buffer.len() { let event = self.buffer.swap_remove(0); + + // TODO 是否会出现 readable && writable 的 event + + // 优先处理读事件 if event.readable { if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) { println!( - "[reactor token] fd {} read waker token {} removed and woken", + "[reactor] wake fd: {}, event: read, action: run waker.wake()", event.key, - event.key * 2 ); waker.wake(); } } + if event.writable { if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) { println!( - "[reactor token] fd {} write waker token {} removed and woken", + "[reactor] wake fd: {}, event: write, action: run waker.wake()", event.key, - event.key * 2 + 1 ); waker.wake(); } } } } - - pub fn delete(&mut self, fd: RawFd) { - println!("[reactor] delete fd {}", fd); - - self.waker_mapping.remove(&(fd as u64 * 2)); - self.waker_mapping.remove(&(fd as u64 * 2 + 1)); - println!( - "[reactor token] fd {} wakers token {}, {} removed", - fd, - fd * 2, - fd * 2 + 1 - ); - } - - fn push_completion(&mut self, token: u64, cx: &mut Context) { - println!("[reactor token] token {} waker saved", token); - - self.waker_mapping.insert(token, cx.waker().clone()); - } } impl Default for Reactor { |
