1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
use std::{
cell::RefCell,
collections::VecDeque,
marker::PhantomData,
mem,
pin::Pin,
rc::Rc,
task::{Context, RawWaker, RawWakerVTable, Waker},
};
use futures::{future::LocalBoxFuture, Future, FutureExt};
use crate::reactor::Reactor;
// scoped_thread_local宏创建的 本地线程独享的变量
// EX Executor 类型实例
scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor);
pub struct Executor {
local_queue: TaskQueue, // 任务队列
// 带一个 pub(crate) 限制
pub(crate) reactor: Rc<RefCell<Reactor>>,
/// Make sure the type is `!Send` and `!Sync`.
/// 不会占用任何内存,但类型是 Rc<()>,编译时就确保不会实现 Send 和 Sync
_marker: PhantomData<Rc<()>>,
}
impl Default for Executor {
fn default() -> Self {
// 构造方法
Self::new()
}
}
impl Executor {
pub fn new() -> Self {
// 构造方法
Self {
local_queue: TaskQueue::default(),
reactor: Rc::new(RefCell::new(Reactor::default())),
_marker: PhantomData,
}
}
// 创建新 task
pub fn spawn(fut: impl Future<Output = ()> + 'static) {
let t: Rc<Task> = Rc::new(Task {
future: RefCell::new(fut.boxed_local()),
}); // 打包到 Rc<Task> 中
EX.with(|ex| ex.local_queue.push(t)); // 添加到任务队列
}
pub fn block_on<F, T, O>(&self, f: F) -> O
// 真正执行开始的地方
where
F: Fn() -> T, // 闭包
T: Future<Output = O> + 'static, // task ?
{
let _waker: Waker = waker_fn::waker_fn(|| {}); // 空的 Waker
let cx = &mut Context::from_waker(&_waker); // 从 _waker 创建上下文
EX.set(self, || {
// 闭包
let fut = f(); // task 本身
pin_utils::pin_mut!(fut); // pin_mut! 宏,将 fut 变成 Pin<&mut F> 固定内存地址
loop {
// 任务队列循环
// return if the outer future is ready
// 第一次 poll
if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
break t;
}
// consume all tasks | 在 任务队列中的都是待处理任务
while let Some(t) = self.local_queue.pop() {
let future = t.future.borrow_mut();
let w = waker(t.clone()); // Waker 实例
let mut context = Context::from_waker(&w); // Context 实例
let _ = Pin::new(future).as_mut().poll(&mut context); // poll
}
// no task to execute now, it may ready | 没有任务可执行, 再试试 fut ?
if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
break t;
}
// block for io
self.reactor.borrow_mut().wait();
}
})
}
}
// 任务队列
pub struct TaskQueue {
queue: RefCell<VecDeque<Rc<Task>>>, // 直接的 VecDeque
}
impl Default for TaskQueue {
fn default() -> Self {
Self::new()
}
}
impl TaskQueue {
pub fn new() -> Self {
// 默认容量 4096
const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
}
pub fn new_with_capacity(capacity: usize) -> Self {
// 指定容量
Self {
queue: RefCell::new(VecDeque::with_capacity(capacity)),
}
}
pub(crate) fn push(&self, runnable: Rc<Task>) {
// 添加任务
println!("add task");
self.queue.borrow_mut().push_back(runnable);
}
pub(crate) fn pop(&self) -> Option<Rc<Task>> {
// 移除任务
println!("remove task");
self.queue.borrow_mut().pop_front()
}
}
// 任务 / 协程 本身
pub struct Task {
// 所有权的缘故 用了 RefCell,不可变引用情况下
// 不知道具体的类型,只是实现了 future trait,得用 Box 包装 -> LocalBoxFuture
// 整个的生命周期干脆 static 反正协程活的比其他长
future: RefCell<LocalBoxFuture<'static, ()>>,
}
fn waker(wake: Rc<Task>) -> Waker {
let ptr = Rc::into_raw(wake) as *const ();
let vtable = &Helper::VTABLE;
unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) }
}
impl Task {
fn wake_(self: Rc<Self>) {
// 唤醒
Self::wake_by_ref_(&self)
}
fn wake_by_ref_(self: &Rc<Self>) {
// 唤醒
EX.with(|ex: &Executor| ex.local_queue.push(self.clone())); // 添加到任务队列
}
}
struct Helper;
impl Helper {
// RawWakerVTable 原始唤醒器的虚函数表
const VTABLE: RawWakerVTable = RawWakerVTable::new(
Self::clone_waker,
Self::wake,
Self::wake_by_ref,
Self::drop_waker,
);
// unsafe 要非常小心
unsafe fn clone_waker(data: *const ()) -> RawWaker {
// 克隆 waker 指针
increase_refcount(data); // 增加计数,虽然下面只是增加了一个引用
let vtable = &Self::VTABLE; // VTABLE 的引用
RawWaker::new(data, vtable) // 原始唤醒器 的 实例
}
unsafe fn wake(ptr: *const ()) {
let rc = Rc::from_raw(ptr as *const Task);
rc.wake_();
}
unsafe fn wake_by_ref(ptr: *const ()) {
let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
rc.wake_by_ref_();
}
unsafe fn drop_waker(ptr: *const ()) {
drop(Rc::from_raw(ptr as *const Task));
}
}
#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant.
unsafe fn increase_refcount(data: *const ()) {
// Retain Rc, but don't touch refcount by wrapping in ManuallyDrop
let rc = mem::ManuallyDrop::new(Rc::<Task>::from_raw(data as *const Task));
// Now increase refcount, but don't drop new refcount either
let _rc_clone: mem::ManuallyDrop<_> = rc.clone();
}
|