summaryrefslogtreecommitdiff
path: root/src/reactor.rs
blob: 0e4f2618360c31b1aac66e47b8c3950baa72f637 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::{
    cell::RefCell,
    os::unix::prelude::{AsRawFd, RawFd},
    rc::Rc,
    task::{Context, Waker},
};

use polling::{Event, Poller};

#[inline] // 内联
pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> {
    // create: 当前根模块
    // executor: 模块名
    // EX 静态变量
    // reactor 本身是 Rc<RefCell<Reactor>>, 所以 clone 的是指向 实例 的引用
    crate::executor::EX.with(|ex| ex.reactor.clone()) // 
}

#[derive(Debug)]
pub struct Reactor {
    poller: Poller,                                   // epoll 的包装
    waker_mapping: rustc_hash::FxHashMap<u64, Waker>, // token(io事件) -> waker 的映射, slab 更加常见

    buffer: Vec<Event>, // IO 事件缓冲区
}

impl Reactor {
    // 初始化
    pub fn new() -> Self {
        Self {
            poller: Poller::new().unwrap(),
            waker_mapping: Default::default(),

            buffer: Vec::with_capacity(2048), // 默认 buffer 长度 2048
        }
    }

    // Epoll related
    pub fn add(&mut self, fd: RawFd) {
        println!("[reactor] add fd {}", fd);

        let flags =
            nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap())
                .unwrap();
        let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK; // 设置为非阻塞
        nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap();
        self.poller
            .add(fd, polling::Event::none(fd as usize))
            .unwrap(); // 添加到 epoll
    }

    // 示例项目是 tcp,仅定义 2 类事件
    // 读事件
    pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) {
        println!("[reactor] modify_readable fd {} token {}", fd, fd * 2);

        self.push_completion(fd as u64 * 2, cx); // token 为 fd * 2
        let event = polling::Event::readable(fd as usize); // 可读事件
        self.poller.modify(fd, event); // 添加监听 
    }

    pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) {
        println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1);

        self.push_completion(fd as u64 * 2 + 1, cx); // token 为 fd * 2 + 1
        let event = polling::Event::writable(fd as usize); // 可写事件
        self.poller.modify(fd, event); // 添加监听
    }

    pub fn wait(&mut self) { // 等价于 epoll_wait 阻塞
        println!("[reactor] waiting");
        self.poller.wait(&mut self.buffer, None); // epoll_wait
        println!("[reactor] wait done"); // 已经有事件了

        for i in 0..self.buffer.len() { // 遍历事件 buffer
            let event = self.buffer.swap_remove(0); // 队列删除并返回 
            if event.readable { // 可读事件
                if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) {
                    println!(
                        "[reactor token] fd {} read waker token {} removed and woken",
                        event.key,
                        event.key * 2
                    );
                    waker.wake(); // 唤醒
                }
            }
            if event.writable { // 可写事件
                if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) {
                    println!(
                        "[reactor token] fd {} write waker token {} removed and woken",
                        event.key,
                        event.key * 2 + 1
                    );
                    waker.wake(); // 唤醒
                }
            }
        }
    }

    pub fn delete(&mut self, fd: RawFd) {
        println!("[reactor] delete fd {}", fd);
        // 删除 fd 对应的 waker
        self.waker_mapping.remove(&(fd as u64 * 2)); 
        self.waker_mapping.remove(&(fd as u64 * 2 + 1));
        println!(
            "[reactor token] fd {} wakers token {}, {} removed",
            fd,
            fd * 2,
            fd * 2 + 1
        );
    }

    fn push_completion(&mut self, token: u64, cx: &mut Context) {
        println!("[reactor token] token {} waker saved", token);

        self.waker_mapping.insert(token, cx.waker().clone());
    }
}
//Default 是 rust 的内置 trait,用于为类型提供默认值
impl Default for Reactor {
    fn default() -> Self {
        Self::new()
    }
}