From 9db73f57c452aa05da55211fd30af568a57857fc Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Mon, 31 Oct 2022 17:03:10 +0800 Subject: [delete] example/echo.rs [update] src/lib.rs -- 增加调试日志 [update] src/main.rs -- 增加调试日志 [update] src/tcp.rs i -- 增加调试日志 [update] src/reactor.rs -- 增加调试日志 [update] src/executor.rs -- 增加调试日志 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/echo.rs | 37 --------- src/executor.rs | 242 ++++++++++++++++++++++++++++++++++++------------------- src/lib.rs | 5 +- src/main.rs | 51 ++++++++++++ src/reactor.rs | 127 ++++++++++++++++++----------- src/tcp.rs | 175 +++++++++++++++++++++++++++++++--------- 6 files changed, 429 insertions(+), 208 deletions(-) delete mode 100644 examples/echo.rs create mode 100644 src/main.rs diff --git a/examples/echo.rs b/examples/echo.rs deleted file mode 100644 index 644627a..0000000 --- a/examples/echo.rs +++ /dev/null @@ -1,37 +0,0 @@ -//! Echo example. -//! Use `nc 127.0.0.1 30000` to connect. - -use futures::StreamExt; -use mini_rust_runtime::executor::Executor; -use mini_rust_runtime::tcp::TcpListener; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -fn main() { - let ex = Executor::new(); - ex.block_on(serve); -} - -async fn serve() { - let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap(); - while let Some(ret) = listener.next().await { - if let Ok((mut stream, addr)) = ret { - println!("accept a new connection from {} successfully", addr); - let f = async move { - let mut buf = [0; 4096]; - loop { - match stream.read(&mut buf).await { - Ok(n) => { - if n == 0 || stream.write_all(&buf[..n]).await.is_err() { - return; - } - } - Err(_) => { - return; - } - } - } - }; - Executor::spawn(f); - } - } -} diff --git a/src/executor.rs b/src/executor.rs index 42cd00f..51f58fa 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,24 +1,93 @@ -use std::{ - cell::RefCell, - collections::VecDeque, - marker::PhantomData, - mem, - rc::Rc, - task::{RawWaker, RawWakerVTable, Waker, Context}, pin::Pin, -}; - -use futures::{future::LocalBoxFuture, Future, FutureExt}; +// luwenpeng 2022/11/01 use crate::reactor::Reactor; +use futures::future::LocalBoxFuture; +use futures::Future; +use futures::FutureExt; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::RawWaker; +use std::task::RawWakerVTable; +use std::task::Waker; + +/****************************************************************************** + * thread_local 的静态变量 THREAD_LOCAL_EXECUTOR + ******************************************************************************/ + +scoped_tls::scoped_thread_local!(pub(crate) static THREAD_LOCAL_EXECUTOR: Executor); + +/****************************************************************************** + * Task + ******************************************************************************/ + +pub struct Task { + future: RefCell>, +} + +impl Task { + fn enqueue(self: Rc) { + Self::enqueue_by_ref(&self) + } + + fn enqueue_by_ref(self: &Rc) { + THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(self.clone())); + } +} + +/****************************************************************************** + * TaskQueue + ******************************************************************************/ -scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor); +pub struct TaskQueue { + queue: RefCell>>, +} + +impl Default for TaskQueue { + fn default() -> Self { + Self::new() + } +} + +impl TaskQueue { + pub fn new() -> Self { + const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; + Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) + } + + pub fn new_with_capacity(capacity: usize) -> Self { + Self { + queue: RefCell::new(VecDeque::with_capacity(capacity)), + } + } + + pub(crate) fn push(&self, runnable: Rc) { + println!("[task_queue] push task"); + + self.queue.borrow_mut().push_back(runnable); + } + + pub(crate) fn pop(&self) -> Option> { + println!("[task_queue] pop task"); + + self.queue.borrow_mut().pop_front() + } +} + +/****************************************************************************** + * Executor + ******************************************************************************/ pub struct Executor { - local_queue: TaskQueue, - pub(crate) reactor: Rc>, + local_queue: TaskQueue, // 任务队列 + pub(crate) reactor: Rc>, // Reactor - /// Make sure the type is `!Send` and `!Sync`. - _marker: PhantomData>, + // Make sure the type is `!Send` and `!Sync`. + marker: PhantomData>, } impl Default for Executor { @@ -27,22 +96,22 @@ impl Default for Executor { } } - impl Executor { pub fn new() -> Self { Self { local_queue: TaskQueue::default(), reactor: Rc::new(RefCell::new(Reactor::default())), - - _marker: PhantomData, + marker: PhantomData, } } - pub fn spawn(fut: impl Future + 'static) { - let t = Rc::new(Task { - future: RefCell::new(fut.boxed_local()), + pub fn spawn(future: impl Future + 'static) { + println!("[executor] spawn, wrap future to task, and push task to queue"); + + let task = Rc::new(Task { + future: RefCell::new(future.boxed_local()), }); - EX.with(|ex| ex.local_queue.push(t)); + THREAD_LOCAL_EXECUTOR.with(|executor| executor.local_queue.push(task)); } pub fn block_on(&self, f: F) -> O @@ -50,89 +119,87 @@ impl Executor { F: Fn() -> T, T: Future + 'static, { + println!("[executor] block_on"); + let _waker = waker_fn::waker_fn(|| {}); - let cx = &mut Context::from_waker(&_waker); + let ctx = &mut Context::from_waker(&_waker); + + THREAD_LOCAL_EXECUTOR.set(self, || { + // 此处的 future 为 async fn tcp_server() + let future = f(); + pin_utils::pin_mut!(future); - EX.set(self, || { - let fut = f(); - pin_utils::pin_mut!(fut); loop { + println!("[executor] -> loop"); + // return if the outer future is ready - if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { - break t; + if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) { + println!("[executor] future poll(1), return ready"); + break task; } + println!("[executor] consume all tasks"); + // consume all tasks - while let Some(t) = self.local_queue.pop() { - let future = t.future.borrow_mut(); - let w = waker(t.clone()); - let mut context = Context::from_waker(&w); + while let Some(task) = self.local_queue.pop() { + println!("[executor] pop task frome queue"); + + let future = task.future.borrow_mut(); + + let waker = wrap_task_to_waker(task.clone()); // 此处将 task 包装成 Waker + let mut context = Context::from_waker(&waker); // 此处将 Waker 包装成 Context + let _ = Pin::new(future).as_mut().poll(&mut context); + println!("[executor] future poll(2), return"); + + // 此处默认执行:[RawWaker] drop_waker() } // no task to execute now, it may ready - if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) { - break t; + if let std::task::Poll::Ready(task) = future.as_mut().poll(ctx) { + println!("[executor] future poll(3), return ready"); + break task; } // block for io + println!("[executor] reactor->wait()"); self.reactor.borrow_mut().wait(); } }) } } -pub struct TaskQueue { - queue: RefCell>>, -} +/****************************************************************************** + * waker + ******************************************************************************/ -impl Default for TaskQueue { - fn default() -> Self { - Self::new() - } +/* +pub struct Context<'a> { + waker: &'a Waker, + _marker: PhantomData &'a ()>, } -impl TaskQueue { - pub fn new() -> Self { - const DEFAULT_TASK_QUEUE_SIZE: usize = 4096; - Self::new_with_capacity(DEFAULT_TASK_QUEUE_SIZE) - } - pub fn new_with_capacity(capacity: usize) -> Self { - Self { - queue: RefCell::new(VecDeque::with_capacity(capacity)), - } - } - - pub(crate) fn push(&self, runnable: Rc) { - println!("add task"); - self.queue.borrow_mut().push_back(runnable); - } - - pub(crate) fn pop(&self) -> Option> { - println!("remove task"); - self.queue.borrow_mut().pop_front() - } +pub struct Waker { + waker: RawWaker, } -pub struct Task { - future: RefCell>, +pub struct RawWaker { + data: *const (), + vtable: &'static RawWakerVTable, } +*/ + +fn wrap_task_to_waker(task: Rc) -> Waker { + println!("[executor] ->wrap_task_to_waker(), wrap task to Waker"); -fn waker(wake: Rc) -> Waker { - let ptr = Rc::into_raw(wake) as *const (); - let vtable = &Helper::VTABLE; + let ptr = Rc::into_raw(task) as *const (); + let vtable = &Helper::VTABLE; // VTABLE 为 const 定义的 RawWakerVTable unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } } -impl Task { - fn wake_(self: Rc) { - Self::wake_by_ref_(&self) - } - - fn wake_by_ref_(self: &Rc) { - EX.with(|ex| ex.local_queue.push(self.clone())); - } -} +/****************************************************************************** + * Helper + ******************************************************************************/ struct Helper; @@ -144,23 +211,34 @@ impl Helper { Self::drop_waker, ); - unsafe fn clone_waker(data: *const ()) -> RawWaker { - increase_refcount(data); + // 将 task 封装成 RawWaker + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + println!("[RawWaker] ->clone_waker(), wrap task to RawWaker"); + + increase_refcount(ptr); let vtable = &Self::VTABLE; - RawWaker::new(data, vtable) + RawWaker::new(ptr, vtable) } + // 将 task 添加到任务队列中 unsafe fn wake(ptr: *const ()) { - let rc = Rc::from_raw(ptr as *const Task); - rc.wake_(); + println!("[RawWaker] ->wake(), add task to queue"); + + let task = Rc::from_raw(ptr as *const Task); + task.enqueue(); } + // 将 task 添加到任务队列中 unsafe fn wake_by_ref(ptr: *const ()) { - let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); - rc.wake_by_ref_(); + println!("[RawWaker] wake_by_ref(), add task to queue"); + + let task = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); + task.enqueue_by_ref(); } unsafe fn drop_waker(ptr: *const ()) { + println!("[RawWaker] ->drop_waker(), drop RawWaker"); + drop(Rc::from_raw(ptr as *const Task)); } } diff --git a/src/lib.rs b/src/lib.rs index 6ebc7f8..85658d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ +// luwenpeng 2022/11/01 + #![allow(unused)] pub mod executor; -pub mod tcp; - mod reactor; +pub mod tcp; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ad85a80 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,51 @@ +// luwenpeng 2022/11/01 + +use futures::StreamExt; +use mini_rust_runtime::executor::Executor; +use mini_rust_runtime::tcp::TcpListener; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; + +fn main() { + let excutor = Executor::new(); + + excutor.block_on(tcp_server); +} + +async fn tcp_server() { + let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap(); + println!("[tcp_server] listen on: 127.0.0.1:30000"); + + // NOTE:listener.next() 其实是调用的 poll_next() + while let Some(ret) = listener.next().await { + if let Ok((mut stream, addr)) = ret { + println!("[tcp_server] accept new connection: {}", addr); + + // 此处只是注册,并不是此时执行 + let future = async move { + let mut buf = [0; 4096]; + loop { + match stream.read(&mut buf).await { + Ok(n) => { + println!("[tcp_server] stream {} read: {}", addr, n); + + if n == 0 || stream.write_all(&buf[..n]).await.is_err() { + println!("[tcp_server] stream {} write: {}", addr, n); + return; + } + } + Err(e) => { + println!("[tcp_server] stream {} read: {}", addr, e); + return; + } + } + } + }; + + // 将 future 封装成 task push 到任务队列中 + println!("[tcp_server] -> befor Executor::spawn()"); + Executor::spawn(future); + println!("[tcp_server] -> after Executor::spawn()"); + } + } +} diff --git a/src/reactor.rs b/src/reactor.rs index 51133ed..ba064ee 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -1,113 +1,144 @@ -use std::{ - cell::RefCell, - os::unix::prelude::{AsRawFd, RawFd}, - rc::Rc, - task::{Context, Waker}, -}; - -use polling::{Event, Poller}; - +// luwenpeng 2022/11/01 + +use polling::Event; +use polling::Poller; +use std::cell::RefCell; +use std::os::unix::prelude::AsRawFd; +use std::os::unix::prelude::RawFd; +use std::rc::Rc; +use std::task::Context; +use std::task::Waker; +use std::thread::sleep; + +/* + * 通过 executor 获取 reactor + * + * 单线程中 + * Rc : (多个所有者可读) + * RefCell : (一个所有者可写) + * Rc> : (多个所有者可写) + */ #[inline] pub(crate) fn get_reactor() -> Rc> { - crate::executor::EX.with(|ex| ex.reactor.clone()) + crate::executor::THREAD_LOCAL_EXECUTOR.with(|executor| executor.reactor.clone()) } +/****************************************************************************** + * Reactor + * + * fun: + * + * reactor.new(); 创建 reactor + * reactor.add(); 将 fd 添加到 reactor 的 epoll 中,但未设置读写事件 + * reactor.del(); 将 fd 从 + * reactor.mod_read(); + * reactor.mod_write(); + * reactor.wait(); + ******************************************************************************/ + #[derive(Debug)] pub struct Reactor { - poller: Poller, - waker_mapping: rustc_hash::FxHashMap, - - buffer: Vec, + poller: Poller, // poll + waker_mapping: rustc_hash::FxHashMap, // sockfd 与 waker 的映射关系 + buffer: Vec, // Event 的就绪队列 } impl Reactor { pub fn new() -> Self { + println!("[reactor] new"); + Self { poller: Poller::new().unwrap(), waker_mapping: Default::default(), - buffer: Vec::with_capacity(2048), } } - // Epoll related pub fn add(&mut self, fd: RawFd) { - println!("[reactor] add fd {}", fd); + println!("[reactor] add fd: {}, event: none", fd); + // 将 fd 设置为 nonblocking let flags = nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap()) .unwrap(); let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK; nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap(); + + // 将 fd 添加到 poller 中 self.poller .add(fd, polling::Event::none(fd as usize)) .unwrap(); } - pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) { - println!("[reactor] modify_readable fd {} token {}", fd, fd * 2); + pub fn del(&mut self, fd: RawFd) { + println!("[reactor] del fd: {}, event: all", fd,); + + // 将 fd 读写事件的 ctx 从 map 中删除 + self.waker_mapping.remove(&(fd as u64 * 2)); + self.waker_mapping.remove(&(fd as u64 * 2 + 1)); + + // 将 fd 从 poller 中移除 + self.poller.delete(fd).unwrap(); + } + + pub fn mod_read(&mut self, fd: RawFd, ctx: &mut Context) { + println!("[reactor] mod fd: {}, event: read", fd,); + + // 将 fd 可读事件的 ctx 存储到 map 中 + self.waker_mapping + .insert(fd as u64 * 2, ctx.waker().clone()); - self.push_completion(fd as u64 * 2, cx); + // fd 注册可读事件 let event = polling::Event::readable(fd as usize); self.poller.modify(fd, event); } - pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) { - println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1); + pub fn mod_write(&mut self, fd: RawFd, ctx: &mut Context) { + println!("[reactor] mod fd: {}, event: write", fd,); - self.push_completion(fd as u64 * 2 + 1, cx); + // 将 fd 可写事件的 ctx 存储到 map 中 + self.waker_mapping + .insert(fd as u64 * 2 + 1, ctx.waker().clone()); + + // fd 注册可写事件 let event = polling::Event::writable(fd as usize); self.poller.modify(fd, event); } pub fn wait(&mut self) { - println!("[reactor] waiting"); + println!("[reactor] waiting ..."); + + // TODO 增加定时器 timeout self.poller.wait(&mut self.buffer, None); - println!("[reactor] wait done"); + println!("[reactor] {} events is ready", self.buffer.len()); for i in 0..self.buffer.len() { let event = self.buffer.swap_remove(0); + + // TODO 是否会出现 readable && writable 的 event + + // 优先处理读事件 if event.readable { if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) { println!( - "[reactor token] fd {} read waker token {} removed and woken", + "[reactor] wake fd: {}, event: read, action: run waker.wake()", event.key, - event.key * 2 ); waker.wake(); } } + if event.writable { if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) { println!( - "[reactor token] fd {} write waker token {} removed and woken", + "[reactor] wake fd: {}, event: write, action: run waker.wake()", event.key, - event.key * 2 + 1 ); waker.wake(); } } } } - - pub fn delete(&mut self, fd: RawFd) { - println!("[reactor] delete fd {}", fd); - - self.waker_mapping.remove(&(fd as u64 * 2)); - self.waker_mapping.remove(&(fd as u64 * 2 + 1)); - println!( - "[reactor token] fd {} wakers token {}, {} removed", - fd, - fd * 2, - fd * 2 + 1 - ); - } - - fn push_completion(&mut self, token: u64, cx: &mut Context) { - println!("[reactor token] token {} waker saved", token); - - self.waker_mapping.insert(token, cx.waker().clone()); - } } impl Default for Reactor { diff --git a/src/tcp.rs b/src/tcp.rs index 46ef06a..8d1f3c5 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -1,16 +1,29 @@ -use std::{ - cell::RefCell, - io::{self, Read, Write}, - net::{SocketAddr, TcpListener as StdTcpListener, TcpStream as StdTcpStream, ToSocketAddrs}, - os::unix::prelude::AsRawFd, - rc::{Rc, Weak}, - task::Poll, -}; +// luwenpeng 2022/11/01 +use crate::reactor::get_reactor; +use crate::reactor::Reactor; use futures::Stream; -use socket2::{Domain, Protocol, Socket, Type}; +use socket2::Domain; +use socket2::Protocol; +use socket2::Socket; +use socket2::Type; +use std::cell::RefCell; +use std::io; +use std::io::Read; +use std::io::Write; +use std::net::SocketAddr; +use std::net::TcpListener as StdTcpListener; +use std::net::TcpStream as StdTcpStream; +use std::net::ToSocketAddrs; +use std::os::unix::prelude::AsRawFd; +use std::rc::Rc; +use std::rc::Weak; +use std::task::Poll; +use std::thread::sleep; -use crate::{reactor::get_reactor, reactor::Reactor}; +/****************************************************************************** + * TcpListener + ******************************************************************************/ #[derive(Debug)] pub struct TcpListener { @@ -20,6 +33,7 @@ pub struct TcpListener { impl TcpListener { pub fn bind(addr: A) -> Result { + // 设置 addr let addr = addr .to_socket_addrs()? .next() @@ -30,20 +44,32 @@ impl TcpListener { } else { Domain::IPV4 }; - let sk = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; + let addr = socket2::SockAddr::from(addr); - sk.set_reuse_address(true)?; - sk.bind(&addr)?; - sk.listen(1024)?; - // add fd to reactor + // 创建 listenfd + let listenfd = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; + + // 设置 reuse addr + listenfd.set_reuse_address(true)?; + + // bind + listenfd.bind(&addr)?; + + // listen + listenfd.listen(1024)?; + + // 通过 executor 获取 reactor let reactor = get_reactor(); - reactor.borrow_mut().add(sk.as_raw_fd()); - println!("tcp bind with fd {}", sk.as_raw_fd()); + // 将 listenfd 添加到 reactor 的 epoll 中,此时未设置 readable or writable 事件 + reactor.borrow_mut().add(listenfd.as_raw_fd()); + + println!("[tcp_listener] create listenfd: {}", listenfd.as_raw_fd(),); + Ok(Self { reactor: Rc::downgrade(&reactor), - listener: sk.into(), + listener: listenfd.into(), }) } } @@ -56,20 +82,46 @@ impl Stream for TcpListener { cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { match self.listener.accept() { - Ok((stream, addr)) => Poll::Ready(Some(Ok((stream.into(), addr)))), + // listenfd 上有新的连接到来 + Ok((stream, addr)) => { + println!( + "[tcp_listener] Stream->poll_next(): accept new connection: {}", + addr + ); + + // 将 listenfd 注册到 reactor 的 epoll 中,期待可读事情 + let reactor = self.reactor.upgrade().unwrap(); + reactor.borrow_mut().mod_read(self.listener.as_raw_fd(), cx); + + // NOTE: stream.into() 时会调用 TcpStream.from() 将 streamfd 注册到 reactor 中 + Poll::Ready(Some(Ok((stream.into(), addr)))) + } + + // listenfd 返回 wouldblock 错误 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - // modify reactor to register interest + println!("[tcp_listener] Stream->poll_next(): accept error: would block"); + + // 将 listenfd 注册到 reactor 的 epoll 中,期待可读事情 let reactor = self.reactor.upgrade().unwrap(); - reactor - .borrow_mut() - .modify_readable(self.listener.as_raw_fd(), cx); + reactor.borrow_mut().mod_read(self.listener.as_raw_fd(), cx); + Poll::Pending } - Err(e) => std::task::Poll::Ready(Some(Err(e))), + + // listenfd 返回其他错误 + Err(e) => { + println!("[tcp_listener] Stream->poll_next(): accept error: {}", e); + + Poll::Ready(Some(Err(e))) + } } } } +/****************************************************************************** + * TcpStream + ******************************************************************************/ + #[derive(Debug)] pub struct TcpStream { stream: StdTcpStream, @@ -77,6 +129,11 @@ pub struct TcpStream { impl From for TcpStream { fn from(stream: StdTcpStream) -> Self { + println!( + "[tcp_stream] From->from(): add streamfd {} to reactor", + stream.as_raw_fd() + ); + let reactor = get_reactor(); reactor.borrow_mut().add(stream.as_raw_fd()); Self { stream } @@ -85,9 +142,13 @@ impl From for TcpStream { impl Drop for TcpStream { fn drop(&mut self) { - println!("drop"); + println!( + "[tcp_stream] Drop->drop(): del streamfd {} from reactor", + self.stream.as_raw_fd() + ); + let reactor = get_reactor(); - reactor.borrow_mut().delete(self.stream.as_raw_fd()); + reactor.borrow_mut().del(self.stream.as_raw_fd()); } } @@ -98,27 +159,32 @@ impl tokio::io::AsyncRead for TcpStream { buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { let fd = self.stream.as_raw_fd(); + unsafe { let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); - println!("read for fd {}", fd); match self.stream.read(b) { Ok(n) => { - println!("read for fd {} done, {}", fd, n); + println!("[tcp_stream] AsyncRead->poll_read(): fd {}, len: {}", fd, n); + buf.assume_init(n); buf.advance(n); Poll::Ready(Ok(())) } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - println!("read for fd {} done WouldBlock", fd); - // modify reactor to register interest + println!( + "[tcp_stream] AsyncRead->poll_read(): fd {}, err: would block", + fd + ); + let reactor = get_reactor(); - reactor - .borrow_mut() - .modify_readable(self.stream.as_raw_fd(), cx); + reactor.borrow_mut().mod_read(self.stream.as_raw_fd(), cx); Poll::Pending } + Err(e) => { - println!("read for fd {} done err", fd); + println!("[tcp_stream] AsyncRead->poll_read(): fd {}, err: {}", fd, e); + Poll::Ready(Err(e)) } } @@ -133,15 +199,36 @@ impl tokio::io::AsyncWrite for TcpStream { buf: &[u8], ) -> Poll> { match self.stream.write(buf) { - Ok(n) => Poll::Ready(Ok(n)), + Ok(n) => { + println!( + "[tcp_stream] AsyncWrite->poll_write(): fd {}, len: {}", + self.stream.as_raw_fd(), + n + ); + + Poll::Ready(Ok(n)) + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + println!( + "[tcp_stream] AsyncWrite->poll_write(): fd {}, err: would block", + self.stream.as_raw_fd() + ); + let reactor = get_reactor(); - reactor - .borrow_mut() - .modify_writable(self.stream.as_raw_fd(), cx); + reactor.borrow_mut().mod_write(self.stream.as_raw_fd(), cx); Poll::Pending } - Err(e) => Poll::Ready(Err(e)), + + Err(e) => { + println!( + "[tcp_stream] AsyncWrite->poll_write(): fd {}, err: {}", + self.stream.as_raw_fd(), + e + ); + + Poll::Ready(Err(e)) + } } } @@ -149,6 +236,11 @@ impl tokio::io::AsyncWrite for TcpStream { self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { + println!( + "[tcp_stream] AsyncWrite->poll_flush(): fd {}", + self.stream.as_raw_fd() + ); + Poll::Ready(Ok(())) } @@ -156,6 +248,11 @@ impl tokio::io::AsyncWrite for TcpStream { self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { + println!( + "[tcp_stream] AsyncWrite->poll_shutdown(): fd {}", + self.stream.as_raw_fd() + ); + self.stream.shutdown(std::net::Shutdown::Write)?; Poll::Ready(Ok(())) } -- cgit v1.2.3