summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzy <[email protected]>2023-08-15 09:39:16 +0000
committerzy <[email protected]>2023-08-15 09:39:16 +0000
commit903e822ad10befec177827f50e00ca0f53cb3e58 (patch)
tree1d7c36effb487c2fd5d13ec490ab6301914dbed6
parentc88946873cec734a7b81366e87bc204e191ef38e (diff)
一些注释和测试代码comment
-rw-r--r--examples/echo.rs38
-rw-r--r--src/tcp.rs18
2 files changed, 45 insertions, 11 deletions
diff --git a/examples/echo.rs b/examples/echo.rs
index 3f01cf7..2df9c1d 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -8,11 +8,44 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
fn main() {
let ex = Executor::new(); // 执行器实例
- ex.block_on(serve); // 协程最开始的地方
+ ex.block_on(test); // 协程最开始的地方
+}
+
+async fn test(){
+ let test1 = serve;
+ let test2= serve2;
+ // 并发执行 test1 test2
+ let ((),()) = futures::join!(test1(), test2());
+}
+
+async fn serve2(){
+ let mut listener2 = TcpListener::bind("127.0.0.1:30001").unwrap(); // 绑定地址:端口
+ while let Some(ret) = listener2.next().await {
+ if let Ok((mut stream, addr)) = ret {
+ println!("accept a new connection from {} successfully", addr);
+ let f = async move {
+ let mut buf = [0; 4096];
+ loop {
+ match stream.read(&mut buf).await {
+ Ok(n) => {
+ if n == 0 || stream.write_all(&buf[..n]).await.is_err() {
+ // 回写
+ return;
+ }
+ }
+ Err(_) => {
+ return;
+ }
+ }
+ }
+ };
+ Executor::spawn(f);
+ }
+ }
}
async fn serve() {
- let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap();
+ let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap(); // 绑定地址:端口
while let Some(ret) = listener.next().await {
if let Ok((mut stream, addr)) = ret {
println!("accept a new connection from {} successfully", addr);
@@ -22,6 +55,7 @@ async fn serve() {
match stream.read(&mut buf).await {
Ok(n) => {
if n == 0 || stream.write_all(&buf[..n]).await.is_err() {
+ // 回写
return;
}
}
diff --git a/src/tcp.rs b/src/tcp.rs
index 2063e0a..94b876a 100644
--- a/src/tcp.rs
+++ b/src/tcp.rs
@@ -15,8 +15,8 @@ use crate::{reactor::get_reactor, reactor::Reactor};
/// TCP 监听器
#[derive(Debug)]
pub struct TcpListener {
- reactor: Weak<RefCell<Reactor>>,
- listener: StdTcpListener,
+ reactor: Weak<RefCell<Reactor>>, // reactor
+ listener: StdTcpListener, // 标准库的 TcpListener | 包装一层
}
impl TcpListener {
@@ -42,20 +42,20 @@ impl TcpListener {
sk.bind(&addr)?;
sk.listen(1024)?;
- // 将 fd 添加到反应器中
+ // 将 fd 添加到 reactor 中
let reactor = get_reactor();
reactor.borrow_mut().add(sk.as_raw_fd());
println!("tcp bind with fd {}", sk.as_raw_fd());
Ok(Self {
- reactor: Rc::downgrade(&reactor),
+ reactor: Rc::downgrade(&reactor),
listener: sk.into(),
})
}
}
-
-impl Stream for TcpListener {
- type Item = std::io::Result<(TcpStream, SocketAddr)>;
+//Stream 流
+impl Stream for TcpListener { //TcpStream 和 TcpListener 在这链接
+ type Item = std::io::Result<(TcpStream, SocketAddr)>; //
fn poll_next(
self: std::pin::Pin<&mut Self>,
@@ -63,12 +63,12 @@ impl Stream for TcpListener {
) -> std::task::Poll<Option<Self::Item>> {
match self.listener.accept() {
Ok((stream, addr)) => Poll::Ready(Some(Ok((stream.into(), addr)))),
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 继续阻塞
// 修改反应器以注册感兴趣的事件
let reactor = self.reactor.upgrade().unwrap();
reactor
.borrow_mut()
- .modify_readable(self.listener.as_raw_fd(), cx);
+ .modify_readable(self.listener.as_raw_fd(), cx); // 可读事件
Poll::Pending
}
Err(e) => std::task::Poll::Ready(Some(Err(e))),