1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
use std::{
cell::RefCell,
os::unix::prelude::{AsRawFd, RawFd},
rc::Rc,
task::{Context, Waker},
};
use polling::{Event, Poller};
#[inline] // 内联
pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> {
// create: 当前根模块
// executor: 模块名
// EX 静态变量
// reactor 本身是 Rc<RefCell<Reactor>>, 所以 clone 的是指向 实例 的引用
crate::executor::EX.with(|ex| ex.reactor.clone()) //
}
#[derive(Debug)]
pub struct Reactor {
poller: Poller, // epoll 的包装
waker_mapping: rustc_hash::FxHashMap<u64, Waker>, // token(io事件) -> waker 的映射, slab 更加常见
buffer: Vec<Event>, // IO 事件缓冲区
}
impl Reactor {
// 初始化
pub fn new() -> Self {
Self {
poller: Poller::new().unwrap(),
waker_mapping: Default::default(),
buffer: Vec::with_capacity(2048), // 默认 buffer 长度 2048
}
}
// Epoll related
pub fn add(&mut self, fd: RawFd) {
println!("[reactor] add fd {}", fd);
let flags =
nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap())
.unwrap();
let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK; // 设置为非阻塞
nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap();
self.poller
.add(fd, polling::Event::none(fd as usize))
.unwrap(); // 添加到 epoll
}
// 示例项目是 tcp,仅定义 2 类事件
// 读事件
pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) {
println!("[reactor] modify_readable fd {} token {}", fd, fd * 2);
self.push_completion(fd as u64 * 2, cx); // token 为 fd * 2
let event = polling::Event::readable(fd as usize); // 可读事件
self.poller.modify(fd, event); // 添加监听
}
pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) {
println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1);
self.push_completion(fd as u64 * 2 + 1, cx); // token 为 fd * 2 + 1
let event = polling::Event::writable(fd as usize); // 可写事件
self.poller.modify(fd, event); // 添加监听
}
pub fn wait(&mut self) { // 等价于 epoll_wait 阻塞
println!("[reactor] waiting");
self.poller.wait(&mut self.buffer, None); // epoll_wait
println!("[reactor] wait done"); // 已经有事件了
for i in 0..self.buffer.len() { // 遍历事件 buffer
let event = self.buffer.swap_remove(0); // 队列删除并返回
if event.readable { // 可读事件
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) {
println!(
"[reactor token] fd {} read waker token {} removed and woken",
event.key,
event.key * 2
);
waker.wake(); // 唤醒
}
}
if event.writable { // 可写事件
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) {
println!(
"[reactor token] fd {} write waker token {} removed and woken",
event.key,
event.key * 2 + 1
);
waker.wake(); // 唤醒
}
}
}
}
pub fn delete(&mut self, fd: RawFd) {
println!("[reactor] delete fd {}", fd);
// 删除 fd 对应的 waker
self.waker_mapping.remove(&(fd as u64 * 2));
self.waker_mapping.remove(&(fd as u64 * 2 + 1));
println!(
"[reactor token] fd {} wakers token {}, {} removed",
fd,
fd * 2,
fd * 2 + 1
);
}
fn push_completion(&mut self, token: u64, cx: &mut Context) {
println!("[reactor token] token {} waker saved", token);
self.waker_mapping.insert(token, cx.waker().clone());
}
}
//Default 是 rust 的内置 trait,用于为类型提供默认值
impl Default for Reactor {
fn default() -> Self {
Self::new()
}
}
|