From d0ffe8bfbffe075f3f104be383fe67efe38e1bb3 Mon Sep 17 00:00:00 2001 From: "ihc童鞋@提不起劲" Date: Fri, 12 May 2023 11:19:44 +0800 Subject: fix: fix TCP_FASTOPEN on macos (#165) --- examples/Cargo.toml | 4 ++++ examples/echo_tfo.rs | 27 +++++++++++++++++++++++++++ monoio/src/driver/legacy/mod.rs | 2 +- monoio/src/driver/legacy/scheduled_io.rs | 5 +++++ monoio/src/net/tcp/listener.rs | 7 ++++++- monoio/src/net/tcp/stream.rs | 17 +++++++++++++++++ 6 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 examples/echo_tfo.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 5841371..ee9c6db 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -58,6 +58,10 @@ path = "uds.rs" name = "echo" path = "echo.rs" +[[example]] +name = "echo-tfo" +path = "echo_tfo.rs" + [[example]] name = "join" path = "join.rs" diff --git a/examples/echo_tfo.rs b/examples/echo_tfo.rs new file mode 100644 index 0000000..8d9f341 --- /dev/null +++ b/examples/echo_tfo.rs @@ -0,0 +1,27 @@ +use std::net::SocketAddr; + +use monoio::{ + io::{AsyncReadRentExt, AsyncWriteRentExt}, + net::{TcpListener, TcpStream}, +}; + +#[monoio::main] +async fn main() { + let bind_addr = "127.0.0.1:11990".parse::().unwrap(); + let opts = monoio::net::ListenerOpts::default().tcp_fast_open(true); + let listener = TcpListener::bind_with_config(bind_addr, &opts).unwrap(); + let addr = listener.local_addr().unwrap(); + let (tx, rx) = local_sync::oneshot::channel(); + monoio::spawn(async move { + let (mut socket, active_addr) = listener.accept().await.unwrap(); + socket.read_exact(vec![0; 2]).await.0.unwrap(); + assert!(tx.send(active_addr).is_ok()); + }); + let opts = monoio::net::TcpConnectOpts::default().tcp_fast_open(true); + let mut active = TcpStream::connect_addr_with_config(addr, &opts) + .await + .unwrap(); + active.write_all(b"hi").await.0.unwrap(); + let active_addr = rx.await.unwrap(); + assert_eq!(active.local_addr().unwrap(), active_addr); +} diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 98dd3a1..3bcbaac 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -24,7 +24,7 @@ mod waker; pub(crate) use waker::UnparkHandle; pub(crate) struct LegacyInner { - io_dispatch: Slab, + pub(crate) io_dispatch: Slab, events: Option, poll: mio::Poll, diff --git a/monoio/src/driver/legacy/scheduled_io.rs b/monoio/src/driver/legacy/scheduled_io.rs index 16a91dc..ce6ab94 100644 --- a/monoio/src/driver/legacy/scheduled_io.rs +++ b/monoio/src/driver/legacy/scheduled_io.rs @@ -22,6 +22,11 @@ impl Default for ScheduledIo { } impl ScheduledIo { + #[allow(unused)] + pub(crate) fn set_writable(&mut self) { + self.readiness |= Ready::WRITABLE; + } + pub(crate) fn set_readiness(&mut self, f: impl Fn(Ready) -> Ready) { self.readiness = f(self.readiness); } diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 75f2a34..476e679 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -74,11 +74,16 @@ impl TcpListener { #[cfg(any(target_os = "linux", target_os = "android"))] super::tfo::set_tcp_fastopen(&sys_listener, opts.backlog)?; #[cfg(any(target_os = "ios", target_os = "macos"))] - super::tfo::set_tcp_fastopen(&sys_listener)?; + let _ = super::tfo::set_tcp_fastopen_force_enable(&sys_listener); } sys_listener.bind(&addr)?; sys_listener.listen(opts.backlog)?; + #[cfg(any(target_os = "ios", target_os = "macos"))] + if opts.tcp_fast_open { + super::tfo::set_tcp_fastopen(&sys_listener)?; + } + #[cfg(unix)] let fd = SharedFd::new(sys_listener.into_raw_fd())?; diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index 52e0746..359fc83 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -136,6 +136,23 @@ impl TcpStream { let stream = TcpStream::from_shared_fd(completion.data.fd); // wait write ready on epoll branch if crate::driver::op::is_legacy() { + #[cfg(any(target_os = "ios", target_os = "macos"))] + if !tfo { + stream.writable(true).await?; + } else { + // set writable as init state + crate::driver::CURRENT.with(|inner| match inner { + crate::driver::Inner::Legacy(inner) => { + let idx = stream.fd.registered_index().unwrap(); + if let Some(mut readiness) = + unsafe { &mut *inner.get() }.io_dispatch.get(idx) + { + readiness.set_writable(); + } + } + }) + } + #[cfg(not(any(target_os = "ios", target_os = "macos")))] stream.writable(true).await?; // getsockopt libc::SO_ERROR -- cgit v1.2.3