summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorihc童鞋@提不起劲 <[email protected]>2023-05-12 11:19:44 +0800
committerGitHub <[email protected]>2023-05-12 11:19:44 +0800
commitd0ffe8bfbffe075f3f104be383fe67efe38e1bb3 (patch)
tree5d428374b12fff196540a4cbd1f00707a38a079c
parent6158266597187fb82f320bc74d29502b4eacbe28 (diff)
fix: fix TCP_FASTOPEN on macos (#165)
-rw-r--r--examples/Cargo.toml4
-rw-r--r--examples/echo_tfo.rs27
-rw-r--r--monoio/src/driver/legacy/mod.rs2
-rw-r--r--monoio/src/driver/legacy/scheduled_io.rs5
-rw-r--r--monoio/src/net/tcp/listener.rs7
-rw-r--r--monoio/src/net/tcp/stream.rs17
6 files changed, 60 insertions, 2 deletions
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 5841371..ee9c6db 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -59,6 +59,10 @@ name = "echo"
path = "echo.rs"
[[example]]
+name = "echo-tfo"
+path = "echo_tfo.rs"
+
+[[example]]
name = "join"
path = "join.rs"
diff --git a/examples/echo_tfo.rs b/examples/echo_tfo.rs
new file mode 100644
index 0000000..8d9f341
--- /dev/null
+++ b/examples/echo_tfo.rs
@@ -0,0 +1,27 @@
+use std::net::SocketAddr;
+
+use monoio::{
+ io::{AsyncReadRentExt, AsyncWriteRentExt},
+ net::{TcpListener, TcpStream},
+};
+
+#[monoio::main]
+async fn main() {
+ let bind_addr = "127.0.0.1:11990".parse::<SocketAddr>().unwrap();
+ let opts = monoio::net::ListenerOpts::default().tcp_fast_open(true);
+ let listener = TcpListener::bind_with_config(bind_addr, &opts).unwrap();
+ let addr = listener.local_addr().unwrap();
+ let (tx, rx) = local_sync::oneshot::channel();
+ monoio::spawn(async move {
+ let (mut socket, active_addr) = listener.accept().await.unwrap();
+ socket.read_exact(vec![0; 2]).await.0.unwrap();
+ assert!(tx.send(active_addr).is_ok());
+ });
+ let opts = monoio::net::TcpConnectOpts::default().tcp_fast_open(true);
+ let mut active = TcpStream::connect_addr_with_config(addr, &opts)
+ .await
+ .unwrap();
+ active.write_all(b"hi").await.0.unwrap();
+ let active_addr = rx.await.unwrap();
+ assert_eq!(active.local_addr().unwrap(), active_addr);
+}
diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs
index 98dd3a1..3bcbaac 100644
--- a/monoio/src/driver/legacy/mod.rs
+++ b/monoio/src/driver/legacy/mod.rs
@@ -24,7 +24,7 @@ mod waker;
pub(crate) use waker::UnparkHandle;
pub(crate) struct LegacyInner {
- io_dispatch: Slab<ScheduledIo>,
+ pub(crate) io_dispatch: Slab<ScheduledIo>,
events: Option<mio::Events>,
poll: mio::Poll,
diff --git a/monoio/src/driver/legacy/scheduled_io.rs b/monoio/src/driver/legacy/scheduled_io.rs
index 16a91dc..ce6ab94 100644
--- a/monoio/src/driver/legacy/scheduled_io.rs
+++ b/monoio/src/driver/legacy/scheduled_io.rs
@@ -22,6 +22,11 @@ impl Default for ScheduledIo {
}
impl ScheduledIo {
+ #[allow(unused)]
+ pub(crate) fn set_writable(&mut self) {
+ self.readiness |= Ready::WRITABLE;
+ }
+
pub(crate) fn set_readiness(&mut self, f: impl Fn(Ready) -> Ready) {
self.readiness = f(self.readiness);
}
diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs
index 75f2a34..476e679 100644
--- a/monoio/src/net/tcp/listener.rs
+++ b/monoio/src/net/tcp/listener.rs
@@ -74,11 +74,16 @@ impl TcpListener {
#[cfg(any(target_os = "linux", target_os = "android"))]
super::tfo::set_tcp_fastopen(&sys_listener, opts.backlog)?;
#[cfg(any(target_os = "ios", target_os = "macos"))]
- super::tfo::set_tcp_fastopen(&sys_listener)?;
+ let _ = super::tfo::set_tcp_fastopen_force_enable(&sys_listener);
}
sys_listener.bind(&addr)?;
sys_listener.listen(opts.backlog)?;
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ if opts.tcp_fast_open {
+ super::tfo::set_tcp_fastopen(&sys_listener)?;
+ }
+
#[cfg(unix)]
let fd = SharedFd::new(sys_listener.into_raw_fd())?;
diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs
index 52e0746..359fc83 100644
--- a/monoio/src/net/tcp/stream.rs
+++ b/monoio/src/net/tcp/stream.rs
@@ -136,6 +136,23 @@ impl TcpStream {
let stream = TcpStream::from_shared_fd(completion.data.fd);
// wait write ready on epoll branch
if crate::driver::op::is_legacy() {
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ if !tfo {
+ stream.writable(true).await?;
+ } else {
+ // set writable as init state
+ crate::driver::CURRENT.with(|inner| match inner {
+ crate::driver::Inner::Legacy(inner) => {
+ let idx = stream.fd.registered_index().unwrap();
+ if let Some(mut readiness) =
+ unsafe { &mut *inner.get() }.io_dispatch.get(idx)
+ {
+ readiness.set_writable();
+ }
+ }
+ })
+ }
+ #[cfg(not(any(target_os = "ios", target_os = "macos")))]
stream.writable(true).await?;
// getsockopt libc::SO_ERROR