From 903e822ad10befec177827f50e00ca0f53cb3e58 Mon Sep 17 00:00:00 2001 From: zy Date: Tue, 15 Aug 2023 09:39:16 +0000 Subject: 一些注释和测试代码 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/echo.rs | 38 ++++++++++++++++++++++++++++++++++++-- src/tcp.rs | 18 +++++++++--------- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/examples/echo.rs b/examples/echo.rs index 3f01cf7..2df9c1d 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -8,11 +8,44 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; fn main() { let ex = Executor::new(); // 执行器实例 - ex.block_on(serve); // 协程最开始的地方 + ex.block_on(test); // 协程最开始的地方 +} + +async fn test(){ + let test1 = serve; + let test2= serve2; + // 并发执行 test1 test2 + let ((),()) = futures::join!(test1(), test2()); +} + +async fn serve2(){ + let mut listener2 = TcpListener::bind("127.0.0.1:30001").unwrap(); // 绑定地址:端口 + while let Some(ret) = listener2.next().await { + if let Ok((mut stream, addr)) = ret { + println!("accept a new connection from {} successfully", addr); + let f = async move { + let mut buf = [0; 4096]; + loop { + match stream.read(&mut buf).await { + Ok(n) => { + if n == 0 || stream.write_all(&buf[..n]).await.is_err() { + // 回写 + return; + } + } + Err(_) => { + return; + } + } + } + }; + Executor::spawn(f); + } + } } async fn serve() { - let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap(); + let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap(); // 绑定地址:端口 while let Some(ret) = listener.next().await { if let Ok((mut stream, addr)) = ret { println!("accept a new connection from {} successfully", addr); @@ -22,6 +55,7 @@ async fn serve() { match stream.read(&mut buf).await { Ok(n) => { if n == 0 || stream.write_all(&buf[..n]).await.is_err() { + // 回写 return; } } diff --git a/src/tcp.rs b/src/tcp.rs index 2063e0a..94b876a 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -15,8 +15,8 @@ use crate::{reactor::get_reactor, reactor::Reactor}; /// TCP 监听器 #[derive(Debug)] pub struct TcpListener { - reactor: Weak>, - listener: StdTcpListener, + reactor: Weak>, // reactor + listener: StdTcpListener, // 标准库的 TcpListener | 包装一层 } impl TcpListener { @@ -42,20 +42,20 @@ impl TcpListener { sk.bind(&addr)?; sk.listen(1024)?; - // 将 fd 添加到反应器中 + // 将 fd 添加到 reactor 中 let reactor = get_reactor(); reactor.borrow_mut().add(sk.as_raw_fd()); println!("tcp bind with fd {}", sk.as_raw_fd()); Ok(Self { - reactor: Rc::downgrade(&reactor), + reactor: Rc::downgrade(&reactor), listener: sk.into(), }) } } - -impl Stream for TcpListener { - type Item = std::io::Result<(TcpStream, SocketAddr)>; +//Stream 流 +impl Stream for TcpListener { //TcpStream 和 TcpListener 在这链接 + type Item = std::io::Result<(TcpStream, SocketAddr)>; // fn poll_next( self: std::pin::Pin<&mut Self>, @@ -63,12 +63,12 @@ impl Stream for TcpListener { ) -> std::task::Poll> { match self.listener.accept() { Ok((stream, addr)) => Poll::Ready(Some(Ok((stream.into(), addr)))), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 继续阻塞 // 修改反应器以注册感兴趣的事件 let reactor = self.reactor.upgrade().unwrap(); reactor .borrow_mut() - .modify_readable(self.listener.as_raw_fd(), cx); + .modify_readable(self.listener.as_raw_fd(), cx); // 可读事件 Poll::Pending } Err(e) => std::task::Poll::Ready(Some(Err(e))), -- cgit v1.2.3