summaryrefslogtreecommitdiff
path: root/src/tcp.rs
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2022-10-31 17:03:10 +0800
committerluwenpeng <[email protected]>2022-11-03 14:30:58 +0800
commit9db73f57c452aa05da55211fd30af568a57857fc (patch)
tree237c6b2adaad64da7733638c7fd2ab0c9d88b7f2 /src/tcp.rs
parent54d9885220d2e0cd0167f6cbb10c7b0d9e762df2 (diff)
[delete] example/echo.rsHEADlwp-self-study
[update] src/lib.rs -- 增加调试日志 [update] src/main.rs -- 增加调试日志 [update] src/tcp.rs i -- 增加调试日志 [update] src/reactor.rs -- 增加调试日志 [update] src/executor.rs -- 增加调试日志
Diffstat (limited to 'src/tcp.rs')
-rw-r--r--src/tcp.rs175
1 files changed, 136 insertions, 39 deletions
diff --git a/src/tcp.rs b/src/tcp.rs
index 46ef06a..8d1f3c5 100644
--- a/src/tcp.rs
+++ b/src/tcp.rs
@@ -1,16 +1,29 @@
-use std::{
- cell::RefCell,
- io::{self, Read, Write},
- net::{SocketAddr, TcpListener as StdTcpListener, TcpStream as StdTcpStream, ToSocketAddrs},
- os::unix::prelude::AsRawFd,
- rc::{Rc, Weak},
- task::Poll,
-};
+// luwenpeng 2022/11/01
+use crate::reactor::get_reactor;
+use crate::reactor::Reactor;
use futures::Stream;
-use socket2::{Domain, Protocol, Socket, Type};
+use socket2::Domain;
+use socket2::Protocol;
+use socket2::Socket;
+use socket2::Type;
+use std::cell::RefCell;
+use std::io;
+use std::io::Read;
+use std::io::Write;
+use std::net::SocketAddr;
+use std::net::TcpListener as StdTcpListener;
+use std::net::TcpStream as StdTcpStream;
+use std::net::ToSocketAddrs;
+use std::os::unix::prelude::AsRawFd;
+use std::rc::Rc;
+use std::rc::Weak;
+use std::task::Poll;
+use std::thread::sleep;
-use crate::{reactor::get_reactor, reactor::Reactor};
+/******************************************************************************
+ * TcpListener
+ ******************************************************************************/
#[derive(Debug)]
pub struct TcpListener {
@@ -20,6 +33,7 @@ pub struct TcpListener {
impl TcpListener {
pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
+ // 设置 addr
let addr = addr
.to_socket_addrs()?
.next()
@@ -30,20 +44,32 @@ impl TcpListener {
} else {
Domain::IPV4
};
- let sk = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
+
let addr = socket2::SockAddr::from(addr);
- sk.set_reuse_address(true)?;
- sk.bind(&addr)?;
- sk.listen(1024)?;
- // add fd to reactor
+ // 创建 listenfd
+ let listenfd = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
+
+ // 设置 reuse addr
+ listenfd.set_reuse_address(true)?;
+
+ // bind
+ listenfd.bind(&addr)?;
+
+ // listen
+ listenfd.listen(1024)?;
+
+ // 通过 executor 获取 reactor
let reactor = get_reactor();
- reactor.borrow_mut().add(sk.as_raw_fd());
- println!("tcp bind with fd {}", sk.as_raw_fd());
+ // 将 listenfd 添加到 reactor 的 epoll 中,此时未设置 readable or writable 事件
+ reactor.borrow_mut().add(listenfd.as_raw_fd());
+
+ println!("[tcp_listener] create listenfd: {}", listenfd.as_raw_fd(),);
+
Ok(Self {
reactor: Rc::downgrade(&reactor),
- listener: sk.into(),
+ listener: listenfd.into(),
})
}
}
@@ -56,20 +82,46 @@ impl Stream for TcpListener {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.listener.accept() {
- Ok((stream, addr)) => Poll::Ready(Some(Ok((stream.into(), addr)))),
+ // listenfd 上有新的连接到来
+ Ok((stream, addr)) => {
+ println!(
+ "[tcp_listener] Stream->poll_next(): accept new connection: {}",
+ addr
+ );
+
+ // 将 listenfd 注册到 reactor 的 epoll 中,期待可读事情
+ let reactor = self.reactor.upgrade().unwrap();
+ reactor.borrow_mut().mod_read(self.listener.as_raw_fd(), cx);
+
+ // NOTE: stream.into() 时会调用 TcpStream.from() 将 streamfd 注册到 reactor 中
+ Poll::Ready(Some(Ok((stream.into(), addr))))
+ }
+
+ // listenfd 返回 wouldblock 错误
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- // modify reactor to register interest
+ println!("[tcp_listener] Stream->poll_next(): accept error: would block");
+
+ // 将 listenfd 注册到 reactor 的 epoll 中,期待可读事情
let reactor = self.reactor.upgrade().unwrap();
- reactor
- .borrow_mut()
- .modify_readable(self.listener.as_raw_fd(), cx);
+ reactor.borrow_mut().mod_read(self.listener.as_raw_fd(), cx);
+
Poll::Pending
}
- Err(e) => std::task::Poll::Ready(Some(Err(e))),
+
+ // listenfd 返回其他错误
+ Err(e) => {
+ println!("[tcp_listener] Stream->poll_next(): accept error: {}", e);
+
+ Poll::Ready(Some(Err(e)))
+ }
}
}
}
+/******************************************************************************
+ * TcpStream
+ ******************************************************************************/
+
#[derive(Debug)]
pub struct TcpStream {
stream: StdTcpStream,
@@ -77,6 +129,11 @@ pub struct TcpStream {
impl From<StdTcpStream> for TcpStream {
fn from(stream: StdTcpStream) -> Self {
+ println!(
+ "[tcp_stream] From->from(): add streamfd {} to reactor",
+ stream.as_raw_fd()
+ );
+
let reactor = get_reactor();
reactor.borrow_mut().add(stream.as_raw_fd());
Self { stream }
@@ -85,9 +142,13 @@ impl From<StdTcpStream> for TcpStream {
impl Drop for TcpStream {
fn drop(&mut self) {
- println!("drop");
+ println!(
+ "[tcp_stream] Drop->drop(): del streamfd {} from reactor",
+ self.stream.as_raw_fd()
+ );
+
let reactor = get_reactor();
- reactor.borrow_mut().delete(self.stream.as_raw_fd());
+ reactor.borrow_mut().del(self.stream.as_raw_fd());
}
}
@@ -98,27 +159,32 @@ impl tokio::io::AsyncRead for TcpStream {
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let fd = self.stream.as_raw_fd();
+
unsafe {
let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
- println!("read for fd {}", fd);
match self.stream.read(b) {
Ok(n) => {
- println!("read for fd {} done, {}", fd, n);
+ println!("[tcp_stream] AsyncRead->poll_read(): fd {}, len: {}", fd, n);
+
buf.assume_init(n);
buf.advance(n);
Poll::Ready(Ok(()))
}
+
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
- println!("read for fd {} done WouldBlock", fd);
- // modify reactor to register interest
+ println!(
+ "[tcp_stream] AsyncRead->poll_read(): fd {}, err: would block",
+ fd
+ );
+
let reactor = get_reactor();
- reactor
- .borrow_mut()
- .modify_readable(self.stream.as_raw_fd(), cx);
+ reactor.borrow_mut().mod_read(self.stream.as_raw_fd(), cx);
Poll::Pending
}
+
Err(e) => {
- println!("read for fd {} done err", fd);
+ println!("[tcp_stream] AsyncRead->poll_read(): fd {}, err: {}", fd, e);
+
Poll::Ready(Err(e))
}
}
@@ -133,15 +199,36 @@ impl tokio::io::AsyncWrite for TcpStream {
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match self.stream.write(buf) {
- Ok(n) => Poll::Ready(Ok(n)),
+ Ok(n) => {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_write(): fd {}, len: {}",
+ self.stream.as_raw_fd(),
+ n
+ );
+
+ Poll::Ready(Ok(n))
+ }
+
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_write(): fd {}, err: would block",
+ self.stream.as_raw_fd()
+ );
+
let reactor = get_reactor();
- reactor
- .borrow_mut()
- .modify_writable(self.stream.as_raw_fd(), cx);
+ reactor.borrow_mut().mod_write(self.stream.as_raw_fd(), cx);
Poll::Pending
}
- Err(e) => Poll::Ready(Err(e)),
+
+ Err(e) => {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_write(): fd {}, err: {}",
+ self.stream.as_raw_fd(),
+ e
+ );
+
+ Poll::Ready(Err(e))
+ }
}
}
@@ -149,6 +236,11 @@ impl tokio::io::AsyncWrite for TcpStream {
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_flush(): fd {}",
+ self.stream.as_raw_fd()
+ );
+
Poll::Ready(Ok(()))
}
@@ -156,6 +248,11 @@ impl tokio::io::AsyncWrite for TcpStream {
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
+ println!(
+ "[tcp_stream] AsyncWrite->poll_shutdown(): fd {}",
+ self.stream.as_raw_fd()
+ );
+
self.stream.shutdown(std::net::Shutdown::Write)?;
Poll::Ready(Ok(()))
}