1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
use std::{
cell::RefCell,
os::unix::prelude::{AsRawFd, RawFd},
rc::Rc,
task::{Context, Waker},
};
use polling::{Event, Poller};
/// Returns a handle to the reactor of the currently running executor.
///
/// The reactor is looked up through `crate::executor::EX`; the returned `Rc`
/// shares ownership with the executor rather than creating a new reactor.
// NOTE(review): `EX` looks like a (scoped) thread-local — calling this outside
// of an executor context presumably panics; confirm against `executor`.
#[inline]
pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> {
    crate::executor::EX.with(|executor| {
        let shared = &executor.reactor;
        Rc::clone(shared)
    })
}
/// I/O reactor: owns the poller and the wakers of tasks that are waiting for
/// a file descriptor to become readable or writable.
#[derive(Debug)]
pub struct Reactor {
/// Poller that descriptors are registered with and that `wait` blocks on.
poller: Poller,
/// Parked wakers keyed by token. The methods of this type use `fd * 2` as
/// the token of the read waker and `fd * 2 + 1` as the token of the write
/// waker of a descriptor.
waker_mapping: rustc_hash::FxHashMap<u64, Waker>,
/// Event buffer handed to the poller by `wait` and emptied again while the
/// delivered events are dispatched.
buffer: Vec<Event>,
}
impl Reactor {
pub fn new() -> Self {
Self {
poller: Poller::new().unwrap(),
waker_mapping: Default::default(),
buffer: Vec::with_capacity(2048),
}
}
// Epoll related
pub fn add(&mut self, fd: RawFd) {
println!("[reactor] add fd {}", fd);
let flags =
nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap())
.unwrap();
let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK;
nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap();
self.poller
.add(fd, polling::Event::none(fd as usize))
.unwrap();
}
pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) {
println!("[reactor] modify_readable fd {} token {}", fd, fd * 2);
self.push_completion(fd as u64 * 2, cx);
let event = polling::Event::readable(fd as usize);
self.poller.modify(fd, event);
}
pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) {
println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1);
self.push_completion(fd as u64 * 2 + 1, cx);
let event = polling::Event::writable(fd as usize);
self.poller.modify(fd, event);
}
pub fn wait(&mut self) {
println!("[reactor] waiting");
self.poller.wait(&mut self.buffer, None);
println!("[reactor] wait done");
for i in 0..self.buffer.len() {
let event = self.buffer.swap_remove(0);
if event.readable {
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) {
println!(
"[reactor token] fd {} read waker token {} removed and woken",
event.key,
event.key * 2
);
waker.wake();
}
}
if event.writable {
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) {
println!(
"[reactor token] fd {} write waker token {} removed and woken",
event.key,
event.key * 2 + 1
);
waker.wake();
}
}
}
}
pub fn delete(&mut self, fd: RawFd) {
println!("[reactor] delete fd {}", fd);
self.waker_mapping.remove(&(fd as u64 * 2));
self.waker_mapping.remove(&(fd as u64 * 2 + 1));
println!(
"[reactor token] fd {} wakers token {}, {} removed",
fd,
fd * 2,
fd * 2 + 1
);
}
fn push_completion(&mut self, token: u64, cx: &mut Context) {
println!("[reactor token] token {} waker saved", token);
self.waker_mapping.insert(token, cx.waker().clone());
}
}
impl Default for Reactor {
fn default() -> Self {
Self::new()
}
}
|