From e5038c5b0c1d5ee8026290d22734d79818533eef Mon Sep 17 00:00:00 2001 From: zy Date: Wed, 23 Aug 2023 04:59:13 +0000 Subject: 重新 init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 3 + Cargo.toml | 1 - README.md | 8 +- examples/echo.rs | 72 ------------ examples/tcp_async.rs | 72 ++++++++++++ examples/udp_sync.rs | 3 + src/executor.rs | 2 +- src/lib.rs | 3 +- src/net/mod.rs | 2 + src/net/tcp.rs | 173 +++++++++++++++++++++++++++ src/net/udp.rs | 317 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/tcp.rs | 173 --------------------------- src/udp.rs | 317 -------------------------------------------------- 13 files changed, 576 insertions(+), 570 deletions(-) delete mode 100644 examples/echo.rs create mode 100644 examples/tcp_async.rs create mode 100644 src/net/mod.rs create mode 100644 src/net/tcp.rs create mode 100644 src/net/udp.rs delete mode 100644 src/tcp.rs delete mode 100644 src/udp.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 12052b8..f728a93 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,6 +3,9 @@ "**/target": true }, "lldb.verboseLogging": true, + "rust-analyzer.linkedProjects": [ + "./Cargo.toml" + ], // "rust-analyzer.linkedProjects": [ // "./timer_future/Cargo.toml" // ] diff --git a/Cargo.toml b/Cargo.toml index 99ec98a..af45e9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,4 @@ [package] -authors = ["ihciah "] edition = "2021" name = "mini-rust-runtime" version = "0.1.0" diff --git a/README.md b/README.md index 2e775b8..8dd01c5 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Mini Rust Runtime -To show how a runtime works. +实验性质 rust runtime 功能接口不断变更中. -The related post: [Rust Runtime 设计与实现-科普篇](https://www.ihcblog.com/rust-runtime-design-1/) - -Ref: https://github.com/fujita/greeter \ No newline at end of file +fork 自 [mini-rust-runtime](https://github.com/ihciah/mini-rust-runtime) +- [Rust Runtime 设计与实现-科普篇](https://www.ihcblog.com/rust-runtime-design-1/) +- Ref: https://github.com/fujita/greeter diff --git a/examples/echo.rs b/examples/echo.rs deleted file mode 100644 index 11429e2..0000000 --- a/examples/echo.rs +++ /dev/null @@ -1,72 +0,0 @@ -//! Echo example. -//! Use `nc 127.0.0.1 30000` to connect. - -use futures::StreamExt; -use mini_rust_runtime::executor::Executor; -use mini_rust_runtime::tcp::TcpListener; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -fn main() { - let ex = Executor::new(); // 执行器实例 - ex.block_on(test); // 协程最开始的地方 -} - -async fn test(){ - let test1 = serve; - let test2= serve2; - // 并发执行 test1 test2 - let ((),()) = futures::join!(test1(), test2()); -} - -async fn serve2(){ - let mut listener2 = TcpListener::bind("127.0.0.1:30001").unwrap(); // 绑定地址:端口 - while let Some(ret) = listener2.next().await { // 等待下一个 Tcp 连接 - // 如果 tcp 建立连接成功, 取 stream 和 addr - if let Ok((mut stream, addr)) = ret { - println!("accept a new connection from {} successfully", addr); - let f = async move { - let mut buf = [0; 4096]; - loop { - match stream.read(&mut buf).await { - Ok(n) => { - if n == 0 || stream.write_all(&buf[..n]).await.is_err() { - // 回写 - return; - } - } - Err(_) => { - return; - } - } - } - }; - Executor::spawn(f); - } - } -} - -async fn serve() { - let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap(); // 绑定地址:端口 - while let Some(ret) = listener.next().await { - if let Ok((mut stream, addr)) = ret { - println!("accept a new connection from {} successfully", addr); - let f = async move { - let mut buf = [0; 4096]; - loop { - match stream.read(&mut buf).await { - Ok(n) => { - if n == 0 || stream.write_all(&buf[..n]).await.is_err() { - // 回写 - return; - } - } - Err(_) => { - return; - } - } - } - }; - Executor::spawn(f); - } - } -} diff --git a/examples/tcp_async.rs b/examples/tcp_async.rs new file mode 100644 index 0000000..50c5ed6 --- /dev/null +++ b/examples/tcp_async.rs @@ -0,0 +1,72 @@ +//! Echo example. +//! Use `nc 127.0.0.1 30000` to connect. + +use futures::StreamExt; +use mini_rust_runtime::executor::Executor; +use mini_rust_runtime::net::tcp::TcpListener; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +fn main() { + let ex = Executor::new(); // 执行器实例 + ex.block_on(test); // 协程最开始的地方 +} + +async fn test(){ + let test1 = serve; + let test2= serve2; + // 并发执行 test1 test2 + let ((),()) = futures::join!(test1(), test2()); +} + +async fn serve2(){ + let mut listener2 = TcpListener::bind("127.0.0.1:30001").unwrap(); // 绑定地址:端口 + while let Some(ret) = listener2.next().await { // 等待下一个 Tcp 连接 + // 如果 tcp 建立连接成功, 取 stream 和 addr + if let Ok((mut stream, addr)) = ret { + println!("accept a new connection from {} successfully", addr); + let f = async move { + let mut buf = [0; 4096]; + loop { + match stream.read(&mut buf).await { + Ok(n) => { + if n == 0 || stream.write_all(&buf[..n]).await.is_err() { + // 回写 + return; + } + } + Err(_) => { + return; + } + } + } + }; + Executor::spawn(f); + } + } +} + +async fn serve() { + let mut listener = TcpListener::bind("127.0.0.1:30000").unwrap(); // 绑定地址:端口 + while let Some(ret) = listener.next().await { + if let Ok((mut stream, addr)) = ret { + println!("accept a new connection from {} successfully", addr); + let f = async move { + let mut buf = [0; 4096]; + loop { + match stream.read(&mut buf).await { + Ok(n) => { + if n == 0 || stream.write_all(&buf[..n]).await.is_err() { + // 回写 + return; + } + } + Err(_) => { + return; + } + } + } + }; + Executor::spawn(f); + } + } +} diff --git a/examples/udp_sync.rs b/examples/udp_sync.rs index b33ec35..8187ac0 100644 --- a/examples/udp_sync.rs +++ b/examples/udp_sync.rs @@ -1,3 +1,6 @@ +//! Echo example. +//! Use `nc -u 127.0.0.1 30000` to connect. + use std::net::UdpSocket; fn main() -> std::io::Result<()> { diff --git a/src/executor.rs b/src/executor.rs index 9832d88..5d9e201 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -78,7 +78,7 @@ impl Executor { while let Some(t) = self.local_queue.pop() { let future = t.future.borrow_mut(); let w = waker(t.clone()); // Waker 实例 - let mut context = Context::from_waker(&w); // Context 实例 + let mut context: Context<'_> = Context::from_waker(&w); // Context 实例 let _ = Pin::new(future).as_mut().poll(&mut context); // poll } diff --git a/src/lib.rs b/src/lib.rs index d4151d1..48af7b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,6 @@ #![allow(unused)] pub mod executor; -pub mod tcp; -pub mod udp; +pub mod net; mod reactor; diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 0000000..a6f6844 --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,2 @@ +pub mod tcp; +pub mod udp; \ No newline at end of file diff --git a/src/net/tcp.rs b/src/net/tcp.rs new file mode 100644 index 0000000..94b876a --- /dev/null +++ b/src/net/tcp.rs @@ -0,0 +1,173 @@ +use std::{ + cell::RefCell, + io::{self, Read, Write}, + net::{SocketAddr, TcpListener as StdTcpListener, TcpStream as StdTcpStream, ToSocketAddrs}, + os::unix::prelude::AsRawFd, + rc::{Rc, Weak}, + task::Poll, +}; + +use futures::Stream; +use socket2::{Domain, Protocol, Socket, Type}; + +use crate::{reactor::get_reactor, reactor::Reactor}; + +/// TCP 监听器 +#[derive(Debug)] +pub struct TcpListener { + reactor: Weak>, // reactor + listener: StdTcpListener, // 标准库的 TcpListener | 包装一层 +} + +impl TcpListener { + /// 绑定地址并返回一个 `TcpListener` 实例 + pub fn bind(addr: A) -> Result { + // 解析地址 + let addr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; + + // 创建 socket + let domain = if addr.is_ipv6() { + Domain::IPV6 + } else { + Domain::IPV4 + }; + let sk = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; + + // 绑定地址并监听 + let addr = socket2::SockAddr::from(addr); + sk.set_reuse_address(true)?; + sk.bind(&addr)?; + sk.listen(1024)?; + + // 将 fd 添加到 reactor 中 + let reactor = get_reactor(); + reactor.borrow_mut().add(sk.as_raw_fd()); + + println!("tcp bind with fd {}", sk.as_raw_fd()); + Ok(Self { + reactor: Rc::downgrade(&reactor), + listener: sk.into(), + }) + } +} +//Stream 流 +impl Stream for TcpListener { //TcpStream 和 TcpListener 在这链接 + type Item = std::io::Result<(TcpStream, SocketAddr)>; // + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.listener.accept() { + Ok((stream, addr)) => Poll::Ready(Some(Ok((stream.into(), addr)))), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 继续阻塞 + // 修改反应器以注册感兴趣的事件 + let reactor = self.reactor.upgrade().unwrap(); + reactor + .borrow_mut() + .modify_readable(self.listener.as_raw_fd(), cx); // 可读事件 + Poll::Pending + } + Err(e) => std::task::Poll::Ready(Some(Err(e))), + } + } +} + +/// TCP 流 +#[derive(Debug)] +pub struct TcpStream { + stream: StdTcpStream, // 标准库的 TcpStream | 包装一层 +} + +impl From for TcpStream { + // 从标准库的 TcpStream 转换为自定义的 TcpStream + fn from(stream: StdTcpStream) -> Self { + let reactor: Rc> = get_reactor(); // 获取 reactor + reactor.borrow_mut().add(stream.as_raw_fd()); // 将 fd 添加到 reactor + Self { stream } // 返回包装后的 TcpStream + } +} + +impl Drop for TcpStream { + fn drop(&mut self) { + // 可变引用 + println!("drop"); + let reactor = get_reactor(); // 获取 reactor + reactor.borrow_mut().delete(self.stream.as_raw_fd()); // 将 fd 从 reactor 中删除 + } +} + +// 为 TcpStream 实现 tokio::io::AsyncRead +impl tokio::io::AsyncRead for TcpStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let fd = self.stream.as_raw_fd(); // 获取 stream 对应的 fd + unsafe { + // 将 ReadBuf 转换为 [u8] , stream.read 需要 + let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); + println!("read for fd {}", fd); + match self.stream.read(b) { + Ok(n) => { // 读取成功 + println!("read for fd {} done, {}", fd, n); + buf.assume_init(n); // 初始化 n 个字节 + buf.advance(n); // 指针前进 n 个字节 + Poll::Ready(Ok(())) // 返回结果 + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // 读取失败,且错误类型为 WouldBlock + println!("read for fd {} done WouldBlock", fd); + // 修改反应器以注册感兴趣的事件 + let reactor = get_reactor(); // 获取 reactor + reactor + .borrow_mut() + .modify_readable(self.stream.as_raw_fd(), cx); // 注册到 reactor 可读事件 + Poll::Pending // 等待 + } + Err(e) => { + println!("read for fd {} done err", fd); + Poll::Ready(Err(e)) // 结束(错误) + } + } + } + } +} + +impl tokio::io::AsyncWrite for TcpStream { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.stream.write(buf) { + Ok(n) => Poll::Ready(Ok(n)), // 写入成功 + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // 写入失败,且错误类型为 WouldBlock + let reactor: Rc> = get_reactor(); + reactor + .borrow_mut() + .modify_writable(self.stream.as_raw_fd(), cx); // 注册可写事件 + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), // 写入失败,返回结果(错误信息) + } + } + + fn poll_flush( // 或许是仅占位吧. + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( // 关闭时,将 stream 的写入关闭 + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.stream.shutdown(std::net::Shutdown::Write)?; // 关闭写入,出错则传递错误 + Poll::Ready(Ok(())) + } +} diff --git a/src/net/udp.rs b/src/net/udp.rs new file mode 100644 index 0000000..0a6a391 --- /dev/null +++ b/src/net/udp.rs @@ -0,0 +1,317 @@ +use std::cell::RefCell; +use std::io; +use std::net::{SocketAddr, ToSocketAddrs, UdpSocket as StdUdpSocket}; +use std::os::unix::prelude::AsRawFd; +use std::rc::{Rc, Weak}; +use std::task::Poll; + +use futures::{Future, Stream}; +use socket2::{Domain, Protocol, SockAddr, Socket, Type}; + +use crate::reactor::{get_reactor, Reactor}; + +/// UDP 套接字 +#[derive(Debug)] +pub struct UdpSocket { + reactor: Weak>, // reactor + socket: StdUdpSocket, // 标准库的 UdpSocket | 包装一层 +} + +impl UdpSocket { + /// 绑定地址并返回一个 `UdpSocket` 实例 + pub fn bind(addr: A) -> Result { + // 解析地址 + let addr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; + + // 地址解析 + let domain = if addr.is_ipv6() { + Domain::IPV6 + } else { + Domain::IPV4 + }; + let sk = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; + + // 绑定地址 + let addr: SockAddr = SockAddr::from(addr); + sk.bind(&addr)?; + + // 将 fd 添加到 reactor 中 + let reactor = get_reactor(); + reactor.borrow_mut().add(sk.as_raw_fd()); + + println!("udp bind with fd {}", sk.as_raw_fd()); + Ok(Self { + reactor: Rc::downgrade(&reactor), + socket: sk.into(), + }) + } + + pub fn recv_from_async<'a>( + &'a self, + buf: &'a mut [u8], + ) -> impl Future> + 'a { + struct RecvFromFuture<'a> { + socket: &'a StdUdpSocket, + buf: &'a mut [u8], + } + impl<'a> Future for RecvFromFuture<'a> { + type Output = io::Result<(usize, SocketAddr)>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + match self.socket.recv_from(self.buf) { + Ok((n, addr)) => { + println!("recv_from {} {} bytes", addr, n); + Poll::Ready(Ok((n, addr))) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + // 读取失败,且错误类型为 WouldBlock + println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); + // 修改反应器以注册感兴趣的事件 + let reactor = get_reactor(); // 获取 reactor + reactor + .borrow_mut() + .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 + Poll::Pending // 等待 + } + Err(e) => { + println!("read for fd {} done err", self.socket.as_raw_fd()); + Poll::Ready(Err(e)) // 结束(错误) + } + } + } + } + RecvFromFuture { + socket: &self.socket, + buf, + } + } + + pub fn send_to_async<'a>( + &'a self, + buf: &'a [u8], + addr: &'a SocketAddr, + ) -> impl Future> + 'a { + struct SendToFuture<'a> { + socket: &'a StdUdpSocket, + buf: &'a [u8], + addr: &'a SocketAddr, + } + impl<'a> Future for SendToFuture<'a> { + type Output = io::Result; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + match self.socket.send_to(self.buf, self.addr) { + Ok(n) => Poll::Ready(Ok(n)), // 写入成功 + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + // 写入失败,且错误类型为 WouldBlock + let reactor: Rc> = get_reactor(); + reactor + .borrow_mut() + .modify_writable(self.socket.as_raw_fd(), cx); // 注册可写事件 + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), // 写入失败,返回结果(错误信息) + } + } + } + SendToFuture { + socket: &self.socket, + buf, + addr, + } + } +} + +// impl From for UdpSocket { +// fn from(socket: StdUdpSocket) -> Self { +// Self { +// reactor: Weak::new(), +// socket, +// } +// } +// } + +impl Drop for UdpSocket { + fn drop(&mut self) { + println!("drop udp socket"); + if let Some(reactor) = self.reactor.upgrade() { + // 从 reactor 中移除 + reactor.borrow_mut().delete(self.socket.as_raw_fd()); + } + } +} + +// impl Future for UdpSocket { +// type Output = io::Result<(usize, SocketAddr)>; + +// fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { +// let mut buf = [0; 1024]; +// match self.socket.recv_from(&mut buf) { +// Ok((n, addr)) => { +// println!("recv_from {} {} bytes", addr, n); +// Poll::Ready(Ok((n, addr))) +// } +// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { +// // 读取失败,且错误类型为 WouldBlock +// println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); +// // 修改反应器以注册感兴趣的事件 +// let reactor = get_reactor(); // 获取 reactor +// reactor +// .borrow_mut() +// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 +// Poll::Pending // 等待 +// } +// Err(e) => { +// println!("read for fd {} done err", self.socket.as_raw_fd()); +// Poll::Ready(Err(e)) // 结束(错误) +// } +// } +// } +// } + +// impl Stream for UdpSocket { +// type Item = io::Result<(usize, SocketAddr)>; + +// fn poll_next( +// self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> std::task::Poll> { +// match self.socket.recv_from(&mut self.buf) { +// Ok((n, addr)) => { +// println!("recv_from {} {} bytes", addr, n); +// Poll::Ready(Some(Ok((n, addr)))) +// } +// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { +// // 读取失败,且错误类型为 WouldBlock +// println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); +// // 修改反应器以注册感兴趣的事件 +// let reactor = get_reactor(); // 获取 reactor +// reactor +// .borrow_mut() +// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 +// Poll::Pending // 等待 +// } +// Err(e) => { +// println!("read for fd {} done err", self.socket.as_raw_fd()); +// Poll::Ready(Some(Err(e))) // 结束(错误) +// } +// } +// } +// } + +// impl UdpSocket{ +// pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { +// let mut buf = tokio::io::ReadBuf::new(buf); +// self.read(&mut buf).await?; +// Ok((buf.filled().len(), buf.filled().len())) +// } +// } + +// impl Future for UdpSocket { +// type Output = io::Result<(Vec, SocketAddr)>; + +// fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { +// let mut buf = vec![0u8; 1024]; +// match self.socket.recv_from(&mut buf) { +// Ok((n, addr)) => { +// println!("recv_from {} {} bytes", addr, n); +// Poll::Ready(Ok((buf, addr))) +// } +// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { +// // 读取失败,且错误类型为 WouldBlock +// println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); +// // 修改反应器以注册感兴趣的事件 +// let reactor = get_reactor(); // 获取 reactor +// reactor +// .borrow_mut() +// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 +// Poll::Pending // 等待 +// } +// Err(e) => { +// println!("read for fd {} done err", self.socket.as_raw_fd()); +// Poll::Ready(Err(e)) // 结束(错误) +// } +// } +// } +// } + +// // 为 UdpSocket 实现 tokio::io::AsyncWrite +// impl tokio::io::AsyncWrite for UdpSocket { +// fn poll_write( +// mut self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// buf: &[u8], +// ) -> Poll> { +// match self.send_to(buf, &"127.0.0.1:8080".parse().unwrap()) { +// Ok(n) => Poll::Ready(Ok(n)), // 写入成功 +// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 写入失败,且错误类型为 WouldBlock +// let reactor: Rc> = get_reactor(); +// reactor +// .borrow_mut() +// .modify_writable(self.socket.as_raw_fd(), cx); // 注册可写事件 +// Poll::Pending +// } +// Err(e) => Poll::Ready(Err(e)), // 写入失败,返回结果(错误信息) +// } +// } + +// fn poll_flush( +// self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> Poll> { +// Poll::Ready(Ok(())) +// } + +// fn poll_shutdown( +// self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> Poll> { +// Poll::Ready(Ok(())) +// } +// } + +// 为 UdpSocket 实现 tokio::io::AsyncRead +// impl tokio::io::AsyncRead for UdpSocket { +// fn poll_read( +// mut self: std::pin::Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// buf: &mut tokio::io::ReadBuf<'_>, +// ) -> Poll> { +// let fd: i32 = self.socket.as_raw_fd(); // 获取 socket 对应的 fd +// unsafe { +// // 将 ReadBuf 转换为 [u8] , socket.recv_from 需要 +// let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); +// println!("read for fd {}", fd); +// match self.socket.recv_from(b) { +// Ok((size, addr)) => { +// // 读取成功 +// println!("read for fd {} done, {}", fd, size); +// Poll::Ready(Ok((size, addr))) +// } +// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { +// // 读取失败,且错误类型为 WouldBlock +// println!("read for fd {} done WouldBlock", fd); +// // 修改反应器以注册感兴趣的事件 +// let reactor = get_reactor(); // 获取 reactor +// reactor +// .borrow_mut() +// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 +// Poll::Pending // 等待 +// } +// Err(e) => { +// println!("read for fd {} done err", fd); +// Poll::Ready(Err(e)) // 结束(错误) +// } +// } +// } +// } +// } diff --git a/src/tcp.rs b/src/tcp.rs deleted file mode 100644 index 94b876a..0000000 --- a/src/tcp.rs +++ /dev/null @@ -1,173 +0,0 @@ -use std::{ - cell::RefCell, - io::{self, Read, Write}, - net::{SocketAddr, TcpListener as StdTcpListener, TcpStream as StdTcpStream, ToSocketAddrs}, - os::unix::prelude::AsRawFd, - rc::{Rc, Weak}, - task::Poll, -}; - -use futures::Stream; -use socket2::{Domain, Protocol, Socket, Type}; - -use crate::{reactor::get_reactor, reactor::Reactor}; - -/// TCP 监听器 -#[derive(Debug)] -pub struct TcpListener { - reactor: Weak>, // reactor - listener: StdTcpListener, // 标准库的 TcpListener | 包装一层 -} - -impl TcpListener { - /// 绑定地址并返回一个 `TcpListener` 实例 - pub fn bind(addr: A) -> Result { - // 解析地址 - let addr = addr - .to_socket_addrs()? - .next() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; - - // 创建 socket - let domain = if addr.is_ipv6() { - Domain::IPV6 - } else { - Domain::IPV4 - }; - let sk = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; - - // 绑定地址并监听 - let addr = socket2::SockAddr::from(addr); - sk.set_reuse_address(true)?; - sk.bind(&addr)?; - sk.listen(1024)?; - - // 将 fd 添加到 reactor 中 - let reactor = get_reactor(); - reactor.borrow_mut().add(sk.as_raw_fd()); - - println!("tcp bind with fd {}", sk.as_raw_fd()); - Ok(Self { - reactor: Rc::downgrade(&reactor), - listener: sk.into(), - }) - } -} -//Stream 流 -impl Stream for TcpListener { //TcpStream 和 TcpListener 在这链接 - type Item = std::io::Result<(TcpStream, SocketAddr)>; // - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match self.listener.accept() { - Ok((stream, addr)) => Poll::Ready(Some(Ok((stream.into(), addr)))), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 继续阻塞 - // 修改反应器以注册感兴趣的事件 - let reactor = self.reactor.upgrade().unwrap(); - reactor - .borrow_mut() - .modify_readable(self.listener.as_raw_fd(), cx); // 可读事件 - Poll::Pending - } - Err(e) => std::task::Poll::Ready(Some(Err(e))), - } - } -} - -/// TCP 流 -#[derive(Debug)] -pub struct TcpStream { - stream: StdTcpStream, // 标准库的 TcpStream | 包装一层 -} - -impl From for TcpStream { - // 从标准库的 TcpStream 转换为自定义的 TcpStream - fn from(stream: StdTcpStream) -> Self { - let reactor: Rc> = get_reactor(); // 获取 reactor - reactor.borrow_mut().add(stream.as_raw_fd()); // 将 fd 添加到 reactor - Self { stream } // 返回包装后的 TcpStream - } -} - -impl Drop for TcpStream { - fn drop(&mut self) { - // 可变引用 - println!("drop"); - let reactor = get_reactor(); // 获取 reactor - reactor.borrow_mut().delete(self.stream.as_raw_fd()); // 将 fd 从 reactor 中删除 - } -} - -// 为 TcpStream 实现 tokio::io::AsyncRead -impl tokio::io::AsyncRead for TcpStream { - fn poll_read( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - let fd = self.stream.as_raw_fd(); // 获取 stream 对应的 fd - unsafe { - // 将 ReadBuf 转换为 [u8] , stream.read 需要 - let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); - println!("read for fd {}", fd); - match self.stream.read(b) { - Ok(n) => { // 读取成功 - println!("read for fd {} done, {}", fd, n); - buf.assume_init(n); // 初始化 n 个字节 - buf.advance(n); // 指针前进 n 个字节 - Poll::Ready(Ok(())) // 返回结果 - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // 读取失败,且错误类型为 WouldBlock - println!("read for fd {} done WouldBlock", fd); - // 修改反应器以注册感兴趣的事件 - let reactor = get_reactor(); // 获取 reactor - reactor - .borrow_mut() - .modify_readable(self.stream.as_raw_fd(), cx); // 注册到 reactor 可读事件 - Poll::Pending // 等待 - } - Err(e) => { - println!("read for fd {} done err", fd); - Poll::Ready(Err(e)) // 结束(错误) - } - } - } - } -} - -impl tokio::io::AsyncWrite for TcpStream { - fn poll_write( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - match self.stream.write(buf) { - Ok(n) => Poll::Ready(Ok(n)), // 写入成功 - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // 写入失败,且错误类型为 WouldBlock - let reactor: Rc> = get_reactor(); - reactor - .borrow_mut() - .modify_writable(self.stream.as_raw_fd(), cx); // 注册可写事件 - Poll::Pending - } - Err(e) => Poll::Ready(Err(e)), // 写入失败,返回结果(错误信息) - } - } - - fn poll_flush( // 或许是仅占位吧. - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( // 关闭时,将 stream 的写入关闭 - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.stream.shutdown(std::net::Shutdown::Write)?; // 关闭写入,出错则传递错误 - Poll::Ready(Ok(())) - } -} diff --git a/src/udp.rs b/src/udp.rs deleted file mode 100644 index 0a6a391..0000000 --- a/src/udp.rs +++ /dev/null @@ -1,317 +0,0 @@ -use std::cell::RefCell; -use std::io; -use std::net::{SocketAddr, ToSocketAddrs, UdpSocket as StdUdpSocket}; -use std::os::unix::prelude::AsRawFd; -use std::rc::{Rc, Weak}; -use std::task::Poll; - -use futures::{Future, Stream}; -use socket2::{Domain, Protocol, SockAddr, Socket, Type}; - -use crate::reactor::{get_reactor, Reactor}; - -/// UDP 套接字 -#[derive(Debug)] -pub struct UdpSocket { - reactor: Weak>, // reactor - socket: StdUdpSocket, // 标准库的 UdpSocket | 包装一层 -} - -impl UdpSocket { - /// 绑定地址并返回一个 `UdpSocket` 实例 - pub fn bind(addr: A) -> Result { - // 解析地址 - let addr = addr - .to_socket_addrs()? - .next() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; - - // 地址解析 - let domain = if addr.is_ipv6() { - Domain::IPV6 - } else { - Domain::IPV4 - }; - let sk = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; - - // 绑定地址 - let addr: SockAddr = SockAddr::from(addr); - sk.bind(&addr)?; - - // 将 fd 添加到 reactor 中 - let reactor = get_reactor(); - reactor.borrow_mut().add(sk.as_raw_fd()); - - println!("udp bind with fd {}", sk.as_raw_fd()); - Ok(Self { - reactor: Rc::downgrade(&reactor), - socket: sk.into(), - }) - } - - pub fn recv_from_async<'a>( - &'a self, - buf: &'a mut [u8], - ) -> impl Future> + 'a { - struct RecvFromFuture<'a> { - socket: &'a StdUdpSocket, - buf: &'a mut [u8], - } - impl<'a> Future for RecvFromFuture<'a> { - type Output = io::Result<(usize, SocketAddr)>; - - fn poll( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll { - match self.socket.recv_from(self.buf) { - Ok((n, addr)) => { - println!("recv_from {} {} bytes", addr, n); - Poll::Ready(Ok((n, addr))) - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - // 读取失败,且错误类型为 WouldBlock - println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); - // 修改反应器以注册感兴趣的事件 - let reactor = get_reactor(); // 获取 reactor - reactor - .borrow_mut() - .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 - Poll::Pending // 等待 - } - Err(e) => { - println!("read for fd {} done err", self.socket.as_raw_fd()); - Poll::Ready(Err(e)) // 结束(错误) - } - } - } - } - RecvFromFuture { - socket: &self.socket, - buf, - } - } - - pub fn send_to_async<'a>( - &'a self, - buf: &'a [u8], - addr: &'a SocketAddr, - ) -> impl Future> + 'a { - struct SendToFuture<'a> { - socket: &'a StdUdpSocket, - buf: &'a [u8], - addr: &'a SocketAddr, - } - impl<'a> Future for SendToFuture<'a> { - type Output = io::Result; - - fn poll( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll { - match self.socket.send_to(self.buf, self.addr) { - Ok(n) => Poll::Ready(Ok(n)), // 写入成功 - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - // 写入失败,且错误类型为 WouldBlock - let reactor: Rc> = get_reactor(); - reactor - .borrow_mut() - .modify_writable(self.socket.as_raw_fd(), cx); // 注册可写事件 - Poll::Pending - } - Err(e) => Poll::Ready(Err(e)), // 写入失败,返回结果(错误信息) - } - } - } - SendToFuture { - socket: &self.socket, - buf, - addr, - } - } -} - -// impl From for UdpSocket { -// fn from(socket: StdUdpSocket) -> Self { -// Self { -// reactor: Weak::new(), -// socket, -// } -// } -// } - -impl Drop for UdpSocket { - fn drop(&mut self) { - println!("drop udp socket"); - if let Some(reactor) = self.reactor.upgrade() { - // 从 reactor 中移除 - reactor.borrow_mut().delete(self.socket.as_raw_fd()); - } - } -} - -// impl Future for UdpSocket { -// type Output = io::Result<(usize, SocketAddr)>; - -// fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { -// let mut buf = [0; 1024]; -// match self.socket.recv_from(&mut buf) { -// Ok((n, addr)) => { -// println!("recv_from {} {} bytes", addr, n); -// Poll::Ready(Ok((n, addr))) -// } -// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { -// // 读取失败,且错误类型为 WouldBlock -// println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); -// // 修改反应器以注册感兴趣的事件 -// let reactor = get_reactor(); // 获取 reactor -// reactor -// .borrow_mut() -// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 -// Poll::Pending // 等待 -// } -// Err(e) => { -// println!("read for fd {} done err", self.socket.as_raw_fd()); -// Poll::Ready(Err(e)) // 结束(错误) -// } -// } -// } -// } - -// impl Stream for UdpSocket { -// type Item = io::Result<(usize, SocketAddr)>; - -// fn poll_next( -// self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> std::task::Poll> { -// match self.socket.recv_from(&mut self.buf) { -// Ok((n, addr)) => { -// println!("recv_from {} {} bytes", addr, n); -// Poll::Ready(Some(Ok((n, addr)))) -// } -// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { -// // 读取失败,且错误类型为 WouldBlock -// println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); -// // 修改反应器以注册感兴趣的事件 -// let reactor = get_reactor(); // 获取 reactor -// reactor -// .borrow_mut() -// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 -// Poll::Pending // 等待 -// } -// Err(e) => { -// println!("read for fd {} done err", self.socket.as_raw_fd()); -// Poll::Ready(Some(Err(e))) // 结束(错误) -// } -// } -// } -// } - -// impl UdpSocket{ -// pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { -// let mut buf = tokio::io::ReadBuf::new(buf); -// self.read(&mut buf).await?; -// Ok((buf.filled().len(), buf.filled().len())) -// } -// } - -// impl Future for UdpSocket { -// type Output = io::Result<(Vec, SocketAddr)>; - -// fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { -// let mut buf = vec![0u8; 1024]; -// match self.socket.recv_from(&mut buf) { -// Ok((n, addr)) => { -// println!("recv_from {} {} bytes", addr, n); -// Poll::Ready(Ok((buf, addr))) -// } -// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { -// // 读取失败,且错误类型为 WouldBlock -// println!("read for fd {} done WouldBlock", self.socket.as_raw_fd()); -// // 修改反应器以注册感兴趣的事件 -// let reactor = get_reactor(); // 获取 reactor -// reactor -// .borrow_mut() -// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 -// Poll::Pending // 等待 -// } -// Err(e) => { -// println!("read for fd {} done err", self.socket.as_raw_fd()); -// Poll::Ready(Err(e)) // 结束(错误) -// } -// } -// } -// } - -// // 为 UdpSocket 实现 tokio::io::AsyncWrite -// impl tokio::io::AsyncWrite for UdpSocket { -// fn poll_write( -// mut self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// buf: &[u8], -// ) -> Poll> { -// match self.send_to(buf, &"127.0.0.1:8080".parse().unwrap()) { -// Ok(n) => Poll::Ready(Ok(n)), // 写入成功 -// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // 写入失败,且错误类型为 WouldBlock -// let reactor: Rc> = get_reactor(); -// reactor -// .borrow_mut() -// .modify_writable(self.socket.as_raw_fd(), cx); // 注册可写事件 -// Poll::Pending -// } -// Err(e) => Poll::Ready(Err(e)), // 写入失败,返回结果(错误信息) -// } -// } - -// fn poll_flush( -// self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> Poll> { -// Poll::Ready(Ok(())) -// } - -// fn poll_shutdown( -// self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> Poll> { -// Poll::Ready(Ok(())) -// } -// } - -// 为 UdpSocket 实现 tokio::io::AsyncRead -// impl tokio::io::AsyncRead for UdpSocket { -// fn poll_read( -// mut self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// buf: &mut tokio::io::ReadBuf<'_>, -// ) -> Poll> { -// let fd: i32 = self.socket.as_raw_fd(); // 获取 socket 对应的 fd -// unsafe { -// // 将 ReadBuf 转换为 [u8] , socket.recv_from 需要 -// let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); -// println!("read for fd {}", fd); -// match self.socket.recv_from(b) { -// Ok((size, addr)) => { -// // 读取成功 -// println!("read for fd {} done, {}", fd, size); -// Poll::Ready(Ok((size, addr))) -// } -// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { -// // 读取失败,且错误类型为 WouldBlock -// println!("read for fd {} done WouldBlock", fd); -// // 修改反应器以注册感兴趣的事件 -// let reactor = get_reactor(); // 获取 reactor -// reactor -// .borrow_mut() -// .modify_readable(self.socket.as_raw_fd(), cx); // 注册到 reactor 可读事件 -// Poll::Pending // 等待 -// } -// Err(e) => { -// println!("read for fd {} done err", fd); -// Poll::Ready(Err(e)) // 结束(错误) -// } -// } -// } -// } -// } -- cgit v1.2.3