summaryrefslogtreecommitdiff
path: root/src/reactor.rs
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2022-10-31 17:03:10 +0800
committerluwenpeng <[email protected]>2022-11-03 14:30:58 +0800
commit9db73f57c452aa05da55211fd30af568a57857fc (patch)
tree237c6b2adaad64da7733638c7fd2ab0c9d88b7f2 /src/reactor.rs
parent54d9885220d2e0cd0167f6cbb10c7b0d9e762df2 (diff)
[delete] example/echo.rsHEADlwp-self-study
[update] src/lib.rs -- 增加调试日志 [update] src/main.rs -- 增加调试日志 [update] src/tcp.rs i -- 增加调试日志 [update] src/reactor.rs -- 增加调试日志 [update] src/executor.rs -- 增加调试日志
Diffstat (limited to 'src/reactor.rs')
-rw-r--r--src/reactor.rs127
1 files changed, 79 insertions, 48 deletions
diff --git a/src/reactor.rs b/src/reactor.rs
index 51133ed..ba064ee 100644
--- a/src/reactor.rs
+++ b/src/reactor.rs
@@ -1,113 +1,144 @@
-use std::{
- cell::RefCell,
- os::unix::prelude::{AsRawFd, RawFd},
- rc::Rc,
- task::{Context, Waker},
-};
-
-use polling::{Event, Poller};
-
+// luwenpeng 2022/11/01
+
+use polling::Event;
+use polling::Poller;
+use std::cell::RefCell;
+use std::os::unix::prelude::AsRawFd;
+use std::os::unix::prelude::RawFd;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Waker;
+use std::thread::sleep;
+
+/*
+ * 通过 executor 获取 reactor
+ *
+ * 单线程中
+ * Rc<T> : (多个所有者可读)
+ * RefCell<T> : (一个所有者可写)
+ * Rc<RefCell<T>> : (多个所有者可写)
+ */
#[inline]
pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> {
- crate::executor::EX.with(|ex| ex.reactor.clone())
+ crate::executor::THREAD_LOCAL_EXECUTOR.with(|executor| executor.reactor.clone())
}
+/******************************************************************************
+ * Reactor
+ *
+ * fun:
+ *
+ * reactor.new(); 创建 reactor
+ * reactor.add(); 将 fd 添加到 reactor 的 epoll 中,但未设置读写事件
+ * reactor.del(); 将 fd 从
+ * reactor.mod_read();
+ * reactor.mod_write();
+ * reactor.wait();
+ ******************************************************************************/
+
#[derive(Debug)]
pub struct Reactor {
- poller: Poller,
- waker_mapping: rustc_hash::FxHashMap<u64, Waker>,
-
- buffer: Vec<Event>,
+ poller: Poller, // poll
+ waker_mapping: rustc_hash::FxHashMap<u64, Waker>, // sockfd 与 waker 的映射关系
+ buffer: Vec<Event>, // Event 的就绪队列
}
impl Reactor {
pub fn new() -> Self {
+ println!("[reactor] new");
+
Self {
poller: Poller::new().unwrap(),
waker_mapping: Default::default(),
-
buffer: Vec::with_capacity(2048),
}
}
- // Epoll related
pub fn add(&mut self, fd: RawFd) {
- println!("[reactor] add fd {}", fd);
+ println!("[reactor] add fd: {}, event: none", fd);
+ // 将 fd 设置为 nonblocking
let flags =
nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap())
.unwrap();
let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK;
nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap();
+
+ // 将 fd 添加到 poller 中
self.poller
.add(fd, polling::Event::none(fd as usize))
.unwrap();
}
- pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) {
- println!("[reactor] modify_readable fd {} token {}", fd, fd * 2);
+ pub fn del(&mut self, fd: RawFd) {
+ println!("[reactor] del fd: {}, event: all", fd,);
+
+ // 将 fd 读写事件的 ctx 从 map 中删除
+ self.waker_mapping.remove(&(fd as u64 * 2));
+ self.waker_mapping.remove(&(fd as u64 * 2 + 1));
+
+ // 将 fd 从 poller 中移除
+ self.poller.delete(fd).unwrap();
+ }
+
+ pub fn mod_read(&mut self, fd: RawFd, ctx: &mut Context) {
+ println!("[reactor] mod fd: {}, event: read", fd,);
+
+ // 将 fd 可读事件的 ctx 存储到 map 中
+ self.waker_mapping
+ .insert(fd as u64 * 2, ctx.waker().clone());
- self.push_completion(fd as u64 * 2, cx);
+ // fd 注册可读事件
let event = polling::Event::readable(fd as usize);
self.poller.modify(fd, event);
}
- pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) {
- println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1);
+ pub fn mod_write(&mut self, fd: RawFd, ctx: &mut Context) {
+ println!("[reactor] mod fd: {}, event: write", fd,);
- self.push_completion(fd as u64 * 2 + 1, cx);
+ // 将 fd 可写事件的 ctx 存储到 map 中
+ self.waker_mapping
+ .insert(fd as u64 * 2 + 1, ctx.waker().clone());
+
+ // fd 注册可写事件
let event = polling::Event::writable(fd as usize);
self.poller.modify(fd, event);
}
pub fn wait(&mut self) {
- println!("[reactor] waiting");
+ println!("[reactor] waiting ...");
+
+ // TODO 增加定时器 timeout
self.poller.wait(&mut self.buffer, None);
- println!("[reactor] wait done");
+ println!("[reactor] {} events is ready", self.buffer.len());
for i in 0..self.buffer.len() {
let event = self.buffer.swap_remove(0);
+
+ // TODO 是否会出现 readable && writable 的 event
+
+ // 优先处理读事件
if event.readable {
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) {
println!(
- "[reactor token] fd {} read waker token {} removed and woken",
+ "[reactor] wake fd: {}, event: read, action: run waker.wake()",
event.key,
- event.key * 2
);
waker.wake();
}
}
+
if event.writable {
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) {
println!(
- "[reactor token] fd {} write waker token {} removed and woken",
+ "[reactor] wake fd: {}, event: write, action: run waker.wake()",
event.key,
- event.key * 2 + 1
);
waker.wake();
}
}
}
}
-
- pub fn delete(&mut self, fd: RawFd) {
- println!("[reactor] delete fd {}", fd);
-
- self.waker_mapping.remove(&(fd as u64 * 2));
- self.waker_mapping.remove(&(fd as u64 * 2 + 1));
- println!(
- "[reactor token] fd {} wakers token {}, {} removed",
- fd,
- fd * 2,
- fd * 2 + 1
- );
- }
-
- fn push_completion(&mut self, token: u64, cx: &mut Context) {
- println!("[reactor token] token {} waker saved", token);
-
- self.waker_mapping.insert(token, cx.waker().clone());
- }
}
impl Default for Reactor {