summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2022-10-31 17:03:10 +0800
committerluwenpeng <[email protected]>2022-11-03 14:30:58 +0800
commit9db73f57c452aa05da55211fd30af568a57857fc (patch)
tree237c6b2adaad64da7733638c7fd2ab0c9d88b7f2
parent54d9885220d2e0cd0167f6cbb10c7b0d9e762df2 (diff)
[delete] example/echo.rsHEADlwp-self-study
[update] src/lib.rs -- 增加调试日志 [update] src/main.rs -- 增加调试日志 [update] src/tcp.rs i -- 增加调试日志 [update] src/reactor.rs -- 增加调试日志 [update] src/executor.rs -- 增加调试日志
-rw-r--r--examples/echo.rs37
-rw-r--r--src/executor.rs242
-rw-r--r--src/lib.rs5
-rw-r--r--src/main.rs51
-rw-r--r--src/reactor.rs127
-rw-r--r--src/tcp.rs175
6 files changed, 429 insertions, 208 deletions
diff --git a/examples/echo.rs b/examples/echo.rs
deleted file mode 100644
index 644627a..0000000
--- a/examples/echo.rs
+++ /dev/null
@@ -1,37 +0,0 @@
-//! Echo example.
-//! Use `nc 127.0.0.1 30000` to connect.
-
-use futures::StreamExt;
-use mini_rust_runtime::executor::Executor;
-use mini_rust_runtime::tcp::TcpListener;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-
-fn main() {
- let ex = Executor::new();
- ex.block_on(serve);
-}
-
-async fn serve() {
- let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap();
- while let Some(ret) = listener.next().await {
- if let Ok((mut stream, addr)) = ret {
- println!("accept a new connection from {} successfully", addr);
- let f = async move {
- let mut buf = [0; 4096];
- loop {
- match stream.read(&mut buf).await {
- Ok(n) => {
- if n == 0 || stream.write_all(&buf[..n]).await.is_err() {
- return;
- }
- }
- Err(_) => {
- return;
- }
- }
- }
- };
- Executor::spawn(f);
- }
- }
-}
diff --git a/src/executor.rs b/src/executor.rs
index 42cd00f..51f58fa 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -1,24 +1,93 @@
-use std::{
- cell::RefCell,
- collections::VecDeque,
- marker::PhantomData,
- mem,
- rc::Rc,
- task::{RawWaker, RawWakerVTable, Waker, Context}, pin::Pin,
-};
-
-use futures::{future::LocalBoxFuture, Future, FutureExt};
+// luwenpeng 2022/11/01
use crate::reactor::Reactor;
+use futures::future::LocalBoxFuture;
+use futures::Future;
+use futures::FutureExt;
+use std::cell::RefCell;
+use std::collections::VecDeque;
+use std::marker::PhantomData;
+use std::mem;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::RawWaker;
+use std::task::RawWakerVTable;
+use std::task::Waker;
+
+/******************************************************************************
+ * thread_local 的静态变量 THREAD_LOCAL_EXECUTOR
+ ******************************************************************************/
+
+scoped_tls::scoped_thread_local!(pub(crate) static THREAD_LOCAL_EXECUTOR: Executor);
+
+/******************************************************************************
+ * Task
+ ******************************************************************************/
+
+pub struct Task {
+ future: RefCell<LocalBoxFuture<'static, ()>>,
+}
+
+impl Task {
+ fn enqueue(self: Rc<Self>) {
+ Self::enqueue_by_ref(&self)
+ }
+
+ fn enqueue_by_ref(self: &Rc<Self>) {
+ THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(self.clone()));
+ }
+}
+
+/******************************************************************************
+ * TaskQueue
+ ******************************************************************************/
-scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor);
+pub struct TaskQueue {
+ queue: RefCell<VecDeque<Rc<Task>>>,
+}
+
+impl Default for TaskQueue {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl TaskQueue {
+ pub fn new() -> Self {
+ const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
+ Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
+ }
+
+ pub fn new_with_capacity(capacity: usize) -> Self {
+ Self {
+ queue: RefCell::new(VecDeque::with_capacity(capacity)),
+ }
+ }
+
+ pub(crate) fn push(&self, runnable: Rc<Task>) {
+ println!("[task_queue] push task");
+
+ self.queue.borrow_mut().push_back(runnable);
+ }
+
+ pub(crate) fn pop(&self) -> Option<Rc<Task>> {
+ println!("[task_queue] pop task");
+
+ self.queue.borrow_mut().pop_front()
+ }
+}
+
+/******************************************************************************
+ * Executor
+ ******************************************************************************/
pub struct Executor {
- local_queue: TaskQueue,
- pub(crate) reactor: Rc<RefCell<Reactor>>,
+ local_queue: TaskQueue, // 任务队列
+ pub(crate) reactor: Rc<RefCell<Reactor>>, // Reactor
- /// Make sure the type is `!Send` and `!Sync`.
- _marker: PhantomData<Rc<()>>,
+ // Make sure the type is `!Send` and `!Sync`.
+ marker: PhantomData<Rc<()>>,
}
impl Default for Executor {
@@ -27,22 +96,22 @@ impl Default for Executor {
}
}
-
impl Executor {
pub fn new() -> Self {
Self {
local_queue: TaskQueue::default(),
reactor: Rc::new(RefCell::new(Reactor::default())),
-
- _marker: PhantomData,
+ marker: PhantomData,
}
}
- pub fn spawn(fut: impl Future<Output = ()> + 'static) {
- let t = Rc::new(Task {
- future: RefCell::new(fut.boxed_local()),
+ pub fn spawn(future: impl Future<Output = ()> + 'static) {
+ println!("[executor] spawn, wrap future to task, and push task to queue");
+
+ let task = Rc::new(Task {
+ future: RefCell::new(future.boxed_local()),
});
- EX.with(|ex| ex.local_queue.push(t));
+ THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(task));
}
pub fn block_on<F, T, O>(&self, f: F) -> O
@@ -50,89 +119,87 @@ impl Executor {
F: Fn() -> T,
T: Future<Output = O> + 'static,
{
+ println!("[executor] block_on");
+
let _waker = waker_fn::waker_fn(|| {});
- let cx = &mut Context::from_waker(&_waker);
+ let ctx = &mut Context::from_waker(&_waker);
+
+ THREAD_LOCAL_EXECUTOR.set(self, || {
+ // 此处的 future 为 async fn tcp_server()
+ let future = f();
+ pin_utils::pin_mut!(future);
- EX.set(self, || {
- let fut = f();
- pin_utils::pin_mut!(fut);
loop {
+ println!("[executor] -> loop");
+
// return if the outer future is ready
- if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
- break t;
+ if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) {
+ println!("[executor] future poll(1), return ready");
+ break task;
}
+ println!("[executor] consume all tasks");
+
// consume all tasks
- while let Some(t) = self.local_queue.pop() {
- let future = t.future.borrow_mut();
- let w = waker(t.clone());
- let mut context = Context::from_waker(&w);
+ while let Some(task) = self.local_queue.pop() {
+ println!("[executor] pop task frome queue");
+
+ let future = task.future.borrow_mut();
+
+ let waker = wrap_task_to_waker(task.clone()); // 此处将 task 包装成 Waker
+ let mut context = Context::from_waker(&waker); // 此处将 Waker 包装成 Context
+
let _ = Pin::new(future).as_mut().poll(&mut context);
+ println!("[executor] future poll(2), return");
+
+ // 此处默认执行:[RawWaker] drop_waker()
}
// no task to execute now, it may ready
- if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
- break t;
+ if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) {
+ println!("[executor] future poll(3), return ready");
+ break task;
}
// block for io
+ println!("[executor] reactor->wait()");
self.reactor.borrow_mut().wait();
}
})
}
}
-pub struct TaskQueue {
- queue: RefCell<VecDeque<Rc<Task>>>,
-}
+/******************************************************************************
+ * waker
+ ******************************************************************************/
-impl Default for TaskQueue {
- fn default() -> Self {
- Self::new()
- }
+/*
+pub struct Context<'a> {
+ waker: &'a Waker,
+ _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}
-impl TaskQueue {
- pub fn new() -> Self {
- const DEFAULT_TASK_QUEUE_SIZE: usize = 4096;
- Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE)
- }
- pub fn new_with_capacity(capacity: usize) -> Self {
- Self {
- queue: RefCell::new(VecDeque::with_capacity(capacity)),
- }
- }
-
- pub(crate) fn push(&self, runnable: Rc<Task>) {
- println!("add task");
- self.queue.borrow_mut().push_back(runnable);
- }
-
- pub(crate) fn pop(&self) -> Option<Rc<Task>> {
- println!("remove task");
- self.queue.borrow_mut().pop_front()
- }
+pub struct Waker {
+ waker: RawWaker,
}
-pub struct Task {
- future: RefCell<LocalBoxFuture<'static, ()>>,
+pub struct RawWaker {
+ data: *const (),
+ vtable: &'static RawWakerVTable,
}
+*/
+
+fn wrap_task_to_waker(task: Rc<Task>) -> Waker {
+ println!("[executor] ->wrap_task_to_waker(), wrap task to Waker");
-fn waker(wake: Rc<Task>) -> Waker {
- let ptr = Rc::into_raw(wake) as *const ();
- let vtable = &Helper::VTABLE;
+ let ptr = Rc::into_raw(task) as *const ();
+ let vtable = &Helper::VTABLE; // VTABLE 为 const 定义的 RawWakerVTable
unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) }
}
-impl Task {
- fn wake_(self: Rc<Self>) {
- Self::wake_by_ref_(&self)
- }
-
- fn wake_by_ref_(self: &Rc<Self>) {
- EX.with(|ex| ex.local_queue.push(self.clone()));
- }
-}
+/******************************************************************************
+ * Helper
+ ******************************************************************************/
struct Helper;
@@ -144,23 +211,34 @@ impl Helper {
Self::drop_waker,
);
- unsafe fn clone_waker(data: *const ()) -> RawWaker {
- increase_refcount(data);
+ // 将 task 封装成 RawWaker
+ unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+ println!("[RawWaker] ->clone_waker(), wrap task to RawWaker");
+
+ increase_refcount(ptr);
let vtable = &Self::VTABLE;
- RawWaker::new(data, vtable)
+ RawWaker::new(ptr, vtable)
}
+ // 将 task 添加到任务队列中
unsafe fn wake(ptr: *const ()) {
- let rc = Rc::from_raw(ptr as *const Task);
- rc.wake_();
+ println!("[RawWaker] ->wake(), add task to queue");
+
+ let task = Rc::from_raw(ptr as *const Task);
+ task.enqueue();
}
+ // 将 task 添加到任务队列中
unsafe fn wake_by_ref(ptr: *const ()) {
- let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
- rc.wake_by_ref_();
+ println!("[RawWaker] wake_by_ref(), add task to queue");
+
+ let task = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
+ task.enqueue_by_ref();
}
unsafe fn drop_waker(ptr: *const ()) {
+ println!("[RawWaker] ->drop_waker(), drop RawWaker");
+
drop(Rc::from_raw(ptr as *const Task));
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 6ebc7f8..85658d8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,7 @@
+// luwenpeng 2022/11/01
+
#![allow(unused)]
pub mod executor;
-pub mod tcp;
-
mod reactor;
+pub mod tcp;
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..ad85a80
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,51 @@
+// luwenpeng 2022/11/01
+
+use futures::StreamExt;
+use mini_rust_runtime::executor::Executor;
+use mini_rust_runtime::tcp::TcpListener;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+
+fn main() {
+ let excutor = Executor::new();
+
+ excutor.block_on(tcp_server);
+}
+
+async fn tcp_server() {
+ let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap();
+ println!("[tcp_server] listen on: 127.0.0.1:30000");
+
+ // NOTE:listener.next() 其实是调用的 poll_next()
+ while let Some(ret) = listener.next().await {
+ if let Ok((mut stream, addr)) = ret {
+ println!("[tcp_server] accept new connection: {}", addr);
+
+ // 此处只是注册,并不是此时执行
+ let future = async move {
+ let mut buf = [0; 4096];
+ loop {
+ match stream.read(&mut buf).await {
+ Ok(n) => {
+ println!("[tcp_server] stream {} read: {}", addr, n);
+
+ if n == 0 || stream.write_all(&buf[..n]).await.is_err() {
+ println!("[tcp_server] stream {} write: {}", addr, n);
+ return;
+ }
+ }
+ Err(e) => {
+ println!("[tcp_server] stream {} read: {}", addr, e);
+ return;
+ }
+ }
+ }
+ };
+
+ // 将 future 封装成 task push 到任务队列中
+ println!("[tcp_server] -> befor Executor::spawn()");
+ Executor::spawn(future);
+ println!("[tcp_server] -> after Executor::spawn()");
+ }
+ }
+}
diff --git a/src/reactor.rs b/src/reactor.rs
index 51133ed..ba064ee 100644
--- a/src/reactor.rs
+++ b/src/reactor.rs
@@ -1,113 +1,144 @@
-use std::{
- cell::RefCell,
- os::unix::prelude::{AsRawFd, RawFd},
- rc::Rc,
- task::{Context, Waker},
-};
-
-use polling::{Event, Poller};
-
+// luwenpeng 2022/11/01
+
+use polling::Event;
+use polling::Poller;
+use std::cell::RefCell;
+use std::os::unix::prelude::AsRawFd;
+use std::os::unix::prelude::RawFd;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Waker;
+use std::thread::sleep;
+
+/*
+ * 通过 executor 获取 reactor
+ *
+ * 单线程中
+ * Rc<T> : (多个所有者可读)
+ * RefCell<T> : (一个所有者可写)
+ * Rc<RefCell<T>> : (多个所有者可写)
+ */
#[inline]
pub(crate) fn get_reactor() -> Rc<RefCell<Reactor>> {
- crate::executor::EX.with(|ex| ex.reactor.clone())
+ crate::executor::THREAD_LOCAL_EXECUTOR.with(|executor| executor.reactor.clone())
}
+/******************************************************************************
+ * Reactor
+ *
+ * fun:
+ *
+ * reactor.new(); 创建 reactor
+ * reactor.add(); 将 fd 添加到 reactor 的 epoll 中,但未设置读写事件
+ * reactor.del(); 将 fd 从
+ * reactor.mod_read();
+ * reactor.mod_write();
+ * reactor.wait();
+ ******************************************************************************/
+
#[derive(Debug)]
pub struct Reactor {
- poller: Poller,
- waker_mapping: rustc_hash::FxHashMap<u64, Waker>,
-
- buffer: Vec<Event>,
+ poller: Poller, // poll
+ waker_mapping: rustc_hash::FxHashMap<u64, Waker>, // sockfd 与 waker 的映射关系
+ buffer: Vec<Event>, // Event 的就绪队列
}
impl Reactor {
pub fn new() -> Self {
+ println!("[reactor] new");
+
Self {
poller: Poller::new().unwrap(),
waker_mapping: Default::default(),
-
buffer: Vec::with_capacity(2048),
}
}
- // Epoll related
pub fn add(&mut self, fd: RawFd) {
- println!("[reactor] add fd {}", fd);
+ println!("[reactor] add fd: {}, event: none", fd);
+ // 将 fd 设置为 nonblocking
let flags =
nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap())
.unwrap();
let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK;
nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap();
+
+ // 将 fd 添加到 poller 中
self.poller
.add(fd, polling::Event::none(fd as usize))
.unwrap();
}
- pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) {
- println!("[reactor] modify_readable fd {} token {}", fd, fd * 2);
+ pub fn del(&mut self, fd: RawFd) {
+ println!("[reactor] del fd: {}, event: all", fd,);
+
+ // 将 fd 读写事件的 ctx 从 map 中删除
+ self.waker_mapping.remove(&(fd as u64 * 2));
+ self.waker_mapping.remove(&(fd as u64 * 2 + 1));
+
+ // 将 fd 从 poller 中移除
+ self.poller.delete(fd).unwrap();
+ }
+
+ pub fn mod_read(&mut self, fd: RawFd, ctx: &mut Context) {
+ println!("[reactor] mod fd: {}, event: read", fd,);
+
+ // 将 fd 可读事件的 ctx 存储到 map 中
+ self.waker_mapping
+ .insert(fd as u64 * 2, ctx.waker().clone());
- self.push_completion(fd as u64 * 2, cx);
+ // fd 注册可读事件
let event = polling::Event::readable(fd as usize);
self.poller.modify(fd, event);
}
- pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) {
- println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1);
+ pub fn mod_write(&mut self, fd: RawFd, ctx: &mut Context) {
+ println!("[reactor] mod fd: {}, event: write", fd,);
- self.push_completion(fd as u64 * 2 + 1, cx);
+ // 将 fd 可写事件的 ctx 存储到 map 中
+ self.waker_mapping
+ .insert(fd as u64 * 2 + 1, ctx.waker().clone());
+
+ // fd 注册可写事件
let event = polling::Event::writable(fd as usize);
self.poller.modify(fd, event);
}
pub fn wait(&mut self) {
- println!("[reactor] waiting");
+ println!("[reactor] waiting ...");
+
+ // TODO 增加定时器 timeout
self.poller.wait(&mut self.buffer, None);
- println!("[reactor] wait done");
+ println!("[reactor] {} events is ready", self.buffer.len());
for i in 0..self.buffer.len() {
let event = self.buffer.swap_remove(0);
+
+ // TODO 是否会出现 readable && writable 的 event
+
+ // 优先处理读事件
if event.readable {
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) {
println!(
- "[reactor token] fd {} read waker token {} removed and woken",
+ "[reactor] wake fd: {}, event: read, action: run waker.wake()",
event.key,
- event.key * 2
);
waker.wake();
}
}
+
if event.writable {
if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) {
println!(
- "[reactor token] fd {} write waker token {} removed and woken",
+ "[reactor] wake fd: {}, event: write, action: run waker.wake()",
event.key,
- event.key * 2 + 1
);
waker.wake();
}
}
}
}
-
- pub fn delete(&mut self, fd: RawFd) {
- println!("[reactor] delete fd {}", fd);
-
- self.waker_mapping.remove(&(fd as u64 * 2));
- self.waker_mapping.remove(&(fd as u64 * 2 + 1));
- println!(
- "[reactor token] fd {} wakers token {}, {} removed",
- fd,
- fd * 2,
- fd * 2 + 1
- );
- }
-
- fn push_completion(&mut self, token: u64, cx: &mut Context) {
- println!("[reactor token] token {} waker saved", token);
-
- self.waker_mapping.insert(token, cx.waker().clone());
- }
}
impl Default for Reactor {
diff --git a/src/tcp.rs b/src/tcp.rs
index 46ef06a..8d1f3c5 100644
--- a/src/tcp.rs
+++ b/src/tcp.rs
@@ -1,16 +1,29 @@
-use std::{
- cell::RefCell,
- io::{self, Read, Write},
- net::{SocketAddr, TcpListener as StdTcpListener, TcpStream as StdTcpStream, ToSocketAddrs},
- os::unix::prelude::AsRawFd,
- rc::{Rc, Weak},
- task::Poll,
-};
+// luwenpeng 2022/11/01
+use crate::reactor::get_reactor;
+use crate::reactor::Reactor;
use futures::Stream;
-use socket2::{Domain, Protocol, Socket, Type};
+use socket2::Domain;
+use socket2::Protocol;
+use socket2::Socket;
+use socket2::Type;
+use std::cell::RefCell;
+use std::io;
+use std::io::Read;
+use std::io::Write;
+use std::net::SocketAddr;
+use std::net::TcpListener as StdTcpListener;
+use std::net::TcpStream as StdTcpStream;
+use std::net::ToSocketAddrs;
+use std::os::unix::prelude::AsRawFd;
+use std::rc::Rc;
+use std::rc::Weak;
+use std::task::Poll;
+use std::thread::sleep;
-use crate::{reactor::get_reactor, reactor::Reactor};
+/******************************************************************************
+ * TcpListener
+ ******************************************************************************/
#[derive(Debug)]
pub struct TcpListener {
@@ -20,6 +33,7 @@ pub struct TcpListener {
impl TcpListener {
pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
+ // 设置 addr
let addr = addr
.to_socket_addrs()?
.next()
@@ -30,20 +44,32 @@ impl TcpListener {
} else {
Domain::IPV4
};
- let sk = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
+
let addr = socket2::SockAddr::from(addr);
- sk.set_reuse_address(true)?;
- sk.bind(&addr)?;
- sk.listen(1024)?;
- // add fd to reactor
+ // 创建 listenfd
+ let listenfd = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
+
+ // 设置 reuse addr
+ listenfd.set_reuse_address(true)?;
+
+ // bind
+ listenfd.bind(&addr)?;
+
+ // listen
+ listenfd.listen(1024)?;
+
+ // 通过 executor 获取 reactor
let reactor = get_reactor();
- reactor.borrow_mut().add(sk.as_raw_fd());
- println!("tcp bind with fd {}", sk.as_raw_fd());
+ // 将 listenfd 添加到 reactor 的 epoll 中,此时未设置 readable or writable 事件
+ reactor.borrow_mut().add(listenfd.as_raw_fd());
+
+ println!("[tcp_listener] create listenfd: {}", listenfd.as_raw_fd(),);
+
Ok(Self {
reactor: Rc::downgrade(&reactor),
- listener: sk.into(),
+ listener: listenfd.into(),
})
}
}
@@ -56,20 +82,46 @@ impl Stream for TcpListener {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.listener.accept() {
- Ok((stream, addr)) => Poll::Ready(Some(Ok((stream.into(), addr)))),
+ // listenfd 上有新的连接到来
+ Ok((stream, addr)) => {
+ println!(
+ "[tcp_listener] Stream->poll_next(): accept new connection: {}",
+ addr
+ );
+
+ // 将 listenfd 注册到 reactor 的 epoll 中,期待可读事情
+ let reactor = self.reactor.upgrade().unwrap();
+ reactor.borrow_mut().mod_read(self.listener.as_raw_fd(), cx);
+
+ // NOTE: stream.into() 时会调用 TcpStream.from() 将 streamfd 注册到 reactor 中
+ Poll::Ready(Some(Ok((stream.into(), addr))))
+ }
+
+ // listenfd 返回 wouldblock 错误
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- // modify reactor to register interest
+ println!("[tcp_listener] Stream->poll_next(): accept error: would block");
+
+ // 将 listenfd 注册到 reactor 的 epoll 中,期待可读事情
let reactor = self.reactor.upgrade().unwrap();
- reactor
- .borrow_mut()
- .modify_readable(self.listener.as_raw_fd(), cx);
+ reactor.borrow_mut().mod_read(self.listener.as_raw_fd(), cx);
+
Poll::Pending
}
- Err(e) => std::task::Poll::Ready(Some(Err(e))),
+
+ // listenfd 返回其他错误
+ Err(e) => {
+ println!("[tcp_listener] Stream->poll_next(): accept error: {}", e);
+
+ Poll::Ready(Some(Err(e)))
+ }
}
}
}
+/******************************************************************************
+ * TcpStream
+ ******************************************************************************/
+
#[derive(Debug)]
pub struct TcpStream {
stream: StdTcpStream,
@@ -77,6 +129,11 @@ pub struct TcpStream {
impl From<StdTcpStream> for TcpStream {
fn from(stream: StdTcpStream) -> Self {
+ println!(
+ "[tcp_stream] From->from(): add streamfd {} to reactor",
+ stream.as_raw_fd()
+ );
+
let reactor = get_reactor();
reactor.borrow_mut().add(stream.as_raw_fd());
Self { stream }
@@ -85,9 +142,13 @@ impl From<StdTcpStream> for TcpStream {
impl Drop for TcpStream {
fn drop(&mut self) {
- println!("drop");
+ println!(
+ "[tcp_stream] Drop->drop(): del streamfd {} from reactor",
+ self.stream.as_raw_fd()
+ );
+
let reactor = get_reactor();
- reactor.borrow_mut().delete(self.stream.as_raw_fd());
+ reactor.borrow_mut().del(self.stream.as_raw_fd());
}
}
@@ -98,27 +159,32 @@ impl tokio::io::AsyncRead for TcpStream {
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let fd = self.stream.as_raw_fd();
+
unsafe {
let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
- println!("read for fd {}", fd);
match self.stream.read(b) {
Ok(n) => {
- println!("read for fd {} done, {}", fd, n);
+ println!("[tcp_stream] AsyncRead->poll_read(): fd {}, len: {}", fd, n);
+
buf.assume_init(n);
buf.advance(n);
Poll::Ready(Ok(()))
}
+
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
- println!("read for fd {} done WouldBlock", fd);
- // modify reactor to register interest
+ println!(
+ "[tcp_stream] AsyncRead->poll_read(): fd {}, err: would block",
+ fd
+ );
+
let reactor = get_reactor();
- reactor
- .borrow_mut()
- .modify_readable(self.stream.as_raw_fd(), cx);
+ reactor.borrow_mut().mod_read(self.stream.as_raw_fd(), cx);
Poll::Pending
}
+
Err(e) => {
- println!("read for fd {} done err", fd);
+ println!("[tcp_stream] AsyncRead->poll_read(): fd {}, err: {}", fd, e);
+
Poll::Ready(Err(e))
}
}
@@ -133,15 +199,36 @@ impl tokio::io::AsyncWrite for TcpStream {
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match self.stream.write(buf) {
- Ok(n) => Poll::Ready(Ok(n)),
+ Ok(n) => {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_write(): fd {}, len: {}",
+ self.stream.as_raw_fd(),
+ n
+ );
+
+ Poll::Ready(Ok(n))
+ }
+
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_write(): fd {}, err: would block",
+ self.stream.as_raw_fd()
+ );
+
let reactor = get_reactor();
- reactor
- .borrow_mut()
- .modify_writable(self.stream.as_raw_fd(), cx);
+ reactor.borrow_mut().mod_write(self.stream.as_raw_fd(), cx);
Poll::Pending
}
- Err(e) => Poll::Ready(Err(e)),
+
+ Err(e) => {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_write(): fd {}, err: {}",
+ self.stream.as_raw_fd(),
+ e
+ );
+
+ Poll::Ready(Err(e))
+ }
}
}
@@ -149,6 +236,11 @@ impl tokio::io::AsyncWrite for TcpStream {
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_flush(): fd {}",
+ self.stream.as_raw_fd()
+ );
+
Poll::Ready(Ok(()))
}
@@ -156,6 +248,11 @@ impl tokio::io::AsyncWrite for TcpStream {
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_shutdown(): fd {}",
+ self.stream.as_raw_fd()
+ );
+
self.stream.shutdown(std::net::Shutdown::Write)?;
Poll::Ready(Ok(()))
}