summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorihc童鞋@提不起劲 <[email protected]>2023-04-28 14:23:29 +0800
committerGitHub <[email protected]>2023-04-28 14:23:29 +0800
commitc1b854aed9a81e3dab2bfe3f06674124cceb01ed (patch)
treed46f7bf078f1c53b0af37e13916a8b0e309e8cee
parentef21bca117e057676b435702b463b7a50ed6fb67 (diff)
feat: support TCP_FASTOPEN (#161)
-rw-r--r--monoio/src/driver/op/connect.rs52
-rw-r--r--monoio/src/net/listener_config.rs32
-rw-r--r--monoio/src/net/mod.rs6
-rw-r--r--monoio/src/net/tcp/listener.rs38
-rw-r--r--monoio/src/net/tcp/mod.rs3
-rw-r--r--monoio/src/net/tcp/stream.rs90
-rw-r--r--monoio/src/net/tcp/tfo/linux.rs48
-rw-r--r--monoio/src/net/tcp/tfo/macos.rs30
-rw-r--r--monoio/src/net/tcp/tfo/mod.rs11
-rw-r--r--monoio/src/net/udp.rs2
-rw-r--r--monoio/src/net/unix/listener.rs6
-rw-r--r--monoio/tests/fs_file.rs2
-rw-r--r--monoio/tests/tcp_accept.rs6
-rw-r--r--monoio/tests/tcp_echo.rs24
-rw-r--r--monoio/tests/tcp_into_split.rs2
-rw-r--r--monoio/tests/udp.rs1
-rw-r--r--monoio/tests/uds_split.rs1
-rw-r--r--monoio/tests/uds_stream.rs4
18 files changed, 305 insertions, 53 deletions
diff --git a/monoio/src/driver/op/connect.rs b/monoio/src/driver/op/connect.rs
index a41b741..a7b6902 100644
--- a/monoio/src/driver/op/connect.rs
+++ b/monoio/src/driver/op/connect.rs
@@ -11,22 +11,25 @@ pub(crate) struct Connect {
pub(crate) fd: SharedFd,
socket_addr: Box<SocketAddrCRepr>,
socket_addr_len: libc::socklen_t,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ tfo: bool,
}
impl Op<Connect> {
/// Submit a request to connect.
- pub(crate) fn connect(socket: SharedFd, addr: SocketAddr) -> io::Result<Op<Connect>> {
- #[cfg(unix)]
- {
- let (raw_addr, raw_addr_length) = socket_addr(&addr);
- Op::submit_with(Connect {
- fd: socket,
- socket_addr: Box::new(raw_addr),
- socket_addr_len: raw_addr_length,
- })
- }
- #[cfg(windows)]
- unimplemented!()
+ pub(crate) fn connect(
+ socket: SharedFd,
+ addr: SocketAddr,
+ _tfo: bool,
+ ) -> io::Result<Op<Connect>> {
+ let (raw_addr, raw_addr_length) = socket_addr(&addr);
+ Op::submit_with(Connect {
+ fd: socket,
+ socket_addr: Box::new(raw_addr),
+ socket_addr_len: raw_addr_length,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ tfo: _tfo,
+ })
}
}
@@ -48,6 +51,31 @@ impl OpAble for Connect {
#[cfg(all(unix, feature = "legacy"))]
fn legacy_call(&mut self) -> io::Result<u32> {
+ // For ios/macos, if tfo is enabled, we will
+ // call connectx here.
+ // For linux/android, we have already set socket
+ // via set_tcp_fastopen_connect.
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ if self.tfo {
+ let mut endpoints: libc::sa_endpoints_t = unsafe { std::mem::zeroed() };
+ endpoints.sae_dstaddr = self.socket_addr.as_ptr();
+ endpoints.sae_dstaddrlen = self.socket_addr_len;
+
+ return match crate::syscall_u32!(connectx(
+ self.fd.raw_fd(),
+ &endpoints as *const _,
+ libc::SAE_ASSOCID_ANY,
+ libc::CONNECT_DATA_IDEMPOTENT | libc::CONNECT_RESUME_ON_READ_WRITE,
+ std::ptr::null(),
+ 0,
+ std::ptr::null_mut(),
+ std::ptr::null_mut(),
+ )) {
+ Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
+ _ => Ok(self.fd.raw_fd() as u32),
+ };
+ }
+
match crate::syscall_u32!(connect(
self.fd.raw_fd(),
self.socket_addr.as_ptr(),
diff --git a/monoio/src/net/listener_config.rs b/monoio/src/net/listener_config.rs
index 955c2f2..056850b 100644
--- a/monoio/src/net/listener_config.rs
+++ b/monoio/src/net/listener_config.rs
@@ -1,6 +1,7 @@
-/// Custom listener config
+/// Custom listener options
#[derive(Debug, Clone, Copy)]
-pub struct ListenerConfig {
+#[non_exhaustive]
+pub struct ListenerOpts {
/// Whether to enable reuse_port.
pub reuse_port: bool,
/// Whether to enable reuse_addr.
@@ -11,22 +12,31 @@ pub struct ListenerConfig {
pub send_buf_size: Option<usize>,
/// Recv buffer size or None to use default.
pub recv_buf_size: Option<usize>,
+ /// TCP fast open.
+ pub tcp_fast_open: bool,
}
-impl Default for ListenerConfig {
+impl Default for ListenerOpts {
#[inline]
fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ListenerOpts {
+ /// Create a default ListenerOpts.
+ #[inline]
+ pub const fn new() -> Self {
Self {
reuse_port: true,
reuse_addr: true,
backlog: 1024,
send_buf_size: None,
recv_buf_size: None,
+ tcp_fast_open: false,
}
}
-}
-impl ListenerConfig {
/// Enable SO_REUSEPORT
#[must_use]
#[inline]
@@ -66,4 +76,16 @@ impl ListenerConfig {
self.recv_buf_size = Some(recv_buf_size);
self
}
+
+ /// Specify FastOpen.
+ /// Note: if it is enabled, the connection will be
+ /// established on first peer data sent, which means
+ /// data cannot be sent immediately after connection
+ /// accepted if client does not send something.
+ #[must_use]
+ #[inline]
+ pub fn tcp_fast_open(mut self, fast_open: bool) -> Self {
+ self.tcp_fast_open = fast_open;
+ self
+ }
}
diff --git a/monoio/src/net/mod.rs b/monoio/src/net/mod.rs
index 8825c86..5c26fb2 100644
--- a/monoio/src/net/mod.rs
+++ b/monoio/src/net/mod.rs
@@ -7,8 +7,10 @@ pub mod udp;
#[cfg(unix)]
pub mod unix;
-pub use listener_config::ListenerConfig;
-pub use tcp::{TcpListener, TcpStream};
+pub use listener_config::ListenerOpts;
+#[deprecated(since = "0.2.0", note = "use ListenerOpts")]
+pub use listener_config::ListenerOpts as ListenerConfig;
+pub use tcp::{TcpConnectOpts, TcpListener, TcpStream};
#[cfg(unix)]
pub use unix::{Pipe, UnixDatagram, UnixListener, UnixStream};
diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs
index aa192b6..75f2a34 100644
--- a/monoio/src/net/tcp/listener.rs
+++ b/monoio/src/net/tcp/listener.rs
@@ -15,7 +15,7 @@ use crate::io::CancelHandle;
use crate::{
driver::{op::Op, shared_fd::SharedFd},
io::stream::Stream,
- net::ListenerConfig,
+ net::ListenerOpts,
};
/// TcpListener
@@ -39,10 +39,7 @@ impl TcpListener {
}
/// Bind to address with config
- pub fn bind_with_config<A: ToSocketAddrs>(
- addr: A,
- config: &ListenerConfig,
- ) -> io::Result<Self> {
+ pub fn bind_with_config<A: ToSocketAddrs>(addr: A, opts: &ListenerOpts) -> io::Result<Self> {
let addr = addr
.to_socket_addrs()?
.next()
@@ -61,20 +58,26 @@ impl TcpListener {
let addr = socket2::SockAddr::from(addr);
#[cfg(unix)]
- if config.reuse_port {
+ if opts.reuse_port {
sys_listener.set_reuse_port(true)?;
}
- if config.reuse_addr {
+ if opts.reuse_addr {
sys_listener.set_reuse_address(true)?;
}
- if let Some(send_buf_size) = config.send_buf_size {
+ if let Some(send_buf_size) = opts.send_buf_size {
sys_listener.set_send_buffer_size(send_buf_size)?;
}
- if let Some(recv_buf_size) = config.recv_buf_size {
+ if let Some(recv_buf_size) = opts.recv_buf_size {
sys_listener.set_recv_buffer_size(recv_buf_size)?;
}
+ if opts.tcp_fast_open {
+ #[cfg(any(target_os = "linux", target_os = "android"))]
+ super::tfo::set_tcp_fastopen(&sys_listener, opts.backlog)?;
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ super::tfo::set_tcp_fastopen(&sys_listener)?;
+ }
sys_listener.bind(&addr)?;
- sys_listener.listen(config.backlog)?;
+ sys_listener.listen(opts.backlog)?;
#[cfg(unix)]
let fd = SharedFd::new(sys_listener.into_raw_fd())?;
@@ -87,8 +90,8 @@ impl TcpListener {
/// Bind to address
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
- let cfg = ListenerConfig::default();
- Self::bind_with_config(addr, &cfg)
+ const DEFAULT_CFG: ListenerOpts = ListenerOpts::new();
+ Self::bind_with_config(addr, &DEFAULT_CFG)
}
#[cfg(unix)]
@@ -237,6 +240,17 @@ impl TcpListener {
let op = Op::poll_read(&self.fd, relaxed).unwrap();
op.wait().await
}
+
+ /// Creates new `TcpListener` from a `std::net::TcpListener`.
+ pub fn from_std(stdl: std::net::TcpListener) -> io::Result<Self> {
+ match SharedFd::new(stdl.as_raw_fd()) {
+ Ok(shared) => {
+ stdl.into_raw_fd();
+ Ok(Self::from_shared_fd(shared))
+ }
+ Err(e) => Err(e),
+ }
+ }
}
impl Stream for TcpListener {
diff --git a/monoio/src/net/tcp/mod.rs b/monoio/src/net/tcp/mod.rs
index ab53425..4454053 100644
--- a/monoio/src/net/tcp/mod.rs
+++ b/monoio/src/net/tcp/mod.rs
@@ -4,7 +4,8 @@
mod listener;
mod split;
mod stream;
+mod tfo;
pub use listener::TcpListener;
pub use split::{TcpOwnedReadHalf, TcpOwnedWriteHalf, TcpReadHalf, TcpWriteHalf};
-pub use stream::TcpStream;
+pub use stream::{TcpConnectOpts, TcpStream};
diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs
index 600b04f..52e0746 100644
--- a/monoio/src/net/tcp/stream.rs
+++ b/monoio/src/net/tcp/stream.rs
@@ -20,6 +20,42 @@ use crate::{
},
};
+/// Custom tcp connect options
+#[derive(Debug, Clone, Copy)]
+#[non_exhaustive]
+pub struct TcpConnectOpts {
+ /// TCP fast open.
+ pub tcp_fast_open: bool,
+}
+
+impl Default for TcpConnectOpts {
+ #[inline]
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl TcpConnectOpts {
+ /// Create a default TcpConnectOpts.
+ #[inline]
+ pub const fn new() -> Self {
+ Self {
+ tcp_fast_open: false,
+ }
+ }
+
+ /// Specify FastOpen
+ /// Note: This option only works for linux 4.1+
+ /// and macos/ios 9.0+.
+ /// If it is enabled, the connection will be
+ /// established on the first call to write.
+ #[must_use]
+ #[inline]
+ pub fn tcp_fast_open(mut self, fast_open: bool) -> Self {
+ self.tcp_fast_open = fast_open;
+ self
+ }
+}
/// TcpStream
pub struct TcpStream {
fd: SharedFd,
@@ -57,14 +93,43 @@ impl TcpStream {
}
#[cfg(unix)]
- /// Establishe a connection to the specified `addr`.
+ /// Establish a connection to the specified `addr`.
+ pub async fn connect_addr(addr: SocketAddr) -> io::Result<Self> {
+ const DEFAULT_OPTS: TcpConnectOpts = TcpConnectOpts {
+ tcp_fast_open: false,
+ };
+ Self::connect_addr_with_config(addr, &DEFAULT_OPTS).await
+ }
+
+ #[cfg(windows)]
+ /// Establish a connection to the specified `addr`.
pub async fn connect_addr(addr: SocketAddr) -> io::Result<Self> {
+ unimplemented!()
+ }
+
+ /// Establish a connection to the specified `addr` with given config.
+ pub async fn connect_addr_with_config(
+ addr: SocketAddr,
+ opts: &TcpConnectOpts,
+ ) -> io::Result<Self> {
let domain = match addr {
SocketAddr::V4(_) => libc::AF_INET,
SocketAddr::V6(_) => libc::AF_INET6,
};
let socket = crate::net::new_socket(domain, libc::SOCK_STREAM)?;
- let op = Op::connect(SharedFd::new(socket)?, addr)?;
+ #[allow(unused_mut)]
+ let mut tfo = opts.tcp_fast_open;
+
+ if tfo {
+ #[cfg(any(target_os = "linux", target_os = "android"))]
+ super::tfo::try_set_tcp_fastopen_connect(&socket);
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ // if we cannot set force tcp fastopen, we will not use it.
+ if super::tfo::set_tcp_fastopen_force_enable(&socket).is_err() {
+ tfo = false;
+ }
+ }
+ let op = Op::connect(SharedFd::new(socket)?, addr, tfo)?;
let completion = op.await;
completion.meta.result?;
@@ -72,23 +137,18 @@ impl TcpStream {
// wait write ready on epoll branch
if crate::driver::op::is_legacy() {
stream.writable(true).await?;
- }
- // getsockopt
- let sys_socket = unsafe { std::net::TcpStream::from_raw_fd(stream.fd.raw_fd()) };
- let err = sys_socket.take_error();
- let _ = sys_socket.into_raw_fd();
- if let Some(e) = err? {
- return Err(e);
+
+ // getsockopt libc::SO_ERROR
+ let sys_socket = unsafe { std::net::TcpStream::from_raw_fd(stream.fd.raw_fd()) };
+ let err = sys_socket.take_error();
+ let _ = sys_socket.into_raw_fd();
+ if let Some(e) = err? {
+ return Err(e);
+ }
}
Ok(stream)
}
- #[cfg(windows)]
- /// Establishe a connection to the specified `addr`.
- pub async fn connect_addr(addr: SocketAddr) -> io::Result<Self> {
- unimplemented!()
- }
-
/// Return the local address that this stream is bound to.
#[inline]
pub fn local_addr(&self) -> io::Result<SocketAddr> {
diff --git a/monoio/src/net/tcp/tfo/linux.rs b/monoio/src/net/tcp/tfo/linux.rs
new file mode 100644
index 0000000..9ec162e
--- /dev/null
+++ b/monoio/src/net/tcp/tfo/linux.rs
@@ -0,0 +1,48 @@
+use std::{cell::RefCell, io, os::fd::AsRawFd};
+
+thread_local! {
+ pub(crate) static TFO_CONNECT_AVAILABLE: RefCell<bool> = RefCell::new(true);
+}
+
+/// Call before listen.
+pub(crate) fn set_tcp_fastopen<S: AsRawFd>(fd: &S, fast_open: i32) -> io::Result<()> {
+ crate::syscall!(setsockopt(
+ fd.as_raw_fd(),
+ libc::SOL_TCP,
+ libc::TCP_FASTOPEN,
+ &fast_open as *const _ as *const libc::c_void,
+ std::mem::size_of::<libc::c_int>() as libc::socklen_t
+ ))?;
+ Ok(())
+}
+
+/// Call before connect.
+/// Linux 4.1+ only.
+pub(crate) fn set_tcp_fastopen_connect<S: AsRawFd>(fd: &S) -> io::Result<()> {
+ const ENABLED: libc::c_int = 0x1;
+
+ crate::syscall!(setsockopt(
+ fd.as_raw_fd(),
+ libc::SOL_TCP,
+ libc::TCP_FASTOPEN_CONNECT,
+ &ENABLED as *const _ as *const libc::c_void,
+ std::mem::size_of::<libc::c_int>() as libc::socklen_t
+ ))?;
+ Ok(())
+}
+
+pub(crate) fn try_set_tcp_fastopen_connect<S: AsRawFd>(fd: &S) {
+ if !TFO_CONNECT_AVAILABLE.with(|f| *f.borrow()) {
+ return;
+ }
+ match set_tcp_fastopen_connect(fd) {
+ Ok(_) => (),
+ Err(e) if e.raw_os_error() == Some(libc::ENOPROTOOPT) => {
+ TFO_CONNECT_AVAILABLE.with(|f| *f.borrow_mut() = false);
+ }
+ Err(_e) => {
+ #[cfg(all(debug_assertions, feature = "debug"))]
+ tracing::warn!("set_tcp_fastopen_connect failed: {}", _e);
+ }
+ }
+}
diff --git a/monoio/src/net/tcp/tfo/macos.rs b/monoio/src/net/tcp/tfo/macos.rs
new file mode 100644
index 0000000..3465fe5
--- /dev/null
+++ b/monoio/src/net/tcp/tfo/macos.rs
@@ -0,0 +1,30 @@
+use std::{io, os::fd::AsRawFd};
+
+/// Call before listen.
+pub(crate) fn set_tcp_fastopen<S: AsRawFd>(fd: &S) -> io::Result<()> {
+ const ENABLED: libc::c_int = 0x1;
+ crate::syscall!(setsockopt(
+ fd.as_raw_fd(),
+ libc::IPPROTO_TCP,
+ libc::TCP_FASTOPEN,
+ &ENABLED as *const _ as *const libc::c_void,
+ std::mem::size_of::<libc::c_int>() as libc::socklen_t
+ ))?;
+ Ok(())
+}
+
+/// Force use fastopen.
+/// MacOS only.
+pub(crate) fn set_tcp_fastopen_force_enable<S: AsRawFd>(fd: &S) -> io::Result<()> {
+ const TCP_FASTOPEN_FORCE_ENABLE: libc::c_int = 0x218;
+ const ENABLED: libc::c_int = 0x1;
+
+ crate::syscall!(setsockopt(
+ fd.as_raw_fd(),
+ libc::IPPROTO_TCP,
+ TCP_FASTOPEN_FORCE_ENABLE,
+ &ENABLED as *const _ as *const libc::c_void,
+ std::mem::size_of::<libc::c_int>() as libc::socklen_t
+ ))?;
+ Ok(())
+}
diff --git a/monoio/src/net/tcp/tfo/mod.rs b/monoio/src/net/tcp/tfo/mod.rs
new file mode 100644
index 0000000..1c20734
--- /dev/null
+++ b/monoio/src/net/tcp/tfo/mod.rs
@@ -0,0 +1,11 @@
+//! TCP Fast Open
+
+#[cfg(any(target_os = "ios", target_os = "macos"))]
+mod macos;
+#[cfg(any(target_os = "ios", target_os = "macos"))]
+pub(crate) use macos::{set_tcp_fastopen, set_tcp_fastopen_force_enable};
+
+#[cfg(any(target_os = "linux", target_os = "android"))]
+mod linux;
+#[cfg(any(target_os = "linux", target_os = "android"))]
+pub(crate) use linux::{set_tcp_fastopen, try_set_tcp_fastopen_connect};
diff --git a/monoio/src/net/udp.rs b/monoio/src/net/udp.rs
index 4137219..a6628d0 100644
--- a/monoio/src/net/udp.rs
+++ b/monoio/src/net/udp.rs
@@ -113,7 +113,7 @@ impl UdpSocket {
/// `recv` syscalls to be used to send data and also applies filters to only
/// receive data from the specified address.
pub async fn connect(&self, socket_addr: SocketAddr) -> io::Result<()> {
- let op = Op::connect(self.fd.clone(), socket_addr)?;
+ let op = Op::connect(self.fd.clone(), socket_addr, false)?;
let completion = op.await;
completion.meta.result?;
Ok(())
diff --git a/monoio/src/net/unix/listener.rs b/monoio/src/net/unix/listener.rs
index 039451f..7007257 100644
--- a/monoio/src/net/unix/listener.rs
+++ b/monoio/src/net/unix/listener.rs
@@ -9,7 +9,7 @@ use super::{socket_addr::SocketAddr, UnixStream};
use crate::{
driver::{op::Op, shared_fd::SharedFd},
io::{stream::Stream, CancelHandle},
- net::ListenerConfig,
+ net::ListenerOpts,
};
/// UnixListener
@@ -31,7 +31,7 @@ impl UnixListener {
/// config.
pub fn bind_with_config<P: AsRef<Path>>(
path: P,
- config: &ListenerConfig,
+ config: &ListenerOpts,
) -> io::Result<UnixListener> {
let sys_listener =
socket2::Socket::new(socket2::Domain::UNIX, socket2::Type::STREAM, None)?;
@@ -61,7 +61,7 @@ impl UnixListener {
/// Creates a new `UnixListener` bound to the specified socket with default
/// config.
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
- Self::bind_with_config(path, &ListenerConfig::default())
+ Self::bind_with_config(path, &ListenerOpts::default())
}
/// Accept
diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs
index 4f85fef..24d8f58 100644
--- a/monoio/tests/fs_file.rs
+++ b/monoio/tests/fs_file.rs
@@ -5,8 +5,10 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use monoio::fs::File;
use tempfile::NamedTempFile;
+#[cfg(unix)]
const HELLO: &[u8] = b"hello world...";
+#[cfg(unix)]
async fn read_hello(file: &File) {
let buf = Vec::with_capacity(1024);
let (res, buf) = file.read_at(buf, 0).await;
diff --git a/monoio/tests/tcp_accept.rs b/monoio/tests/tcp_accept.rs
index 29bae8c..7c31f2f 100644
--- a/monoio/tests/tcp_accept.rs
+++ b/monoio/tests/tcp_accept.rs
@@ -1,8 +1,10 @@
+#[cfg(unix)]
use std::net::{IpAddr, SocketAddr};
-use monoio::net::{TcpListener, TcpStream};
#[cfg(unix)]
+use monoio::net::{TcpListener, TcpStream};
+#[cfg(unix)]
macro_rules! test_accept {
($(($ident:ident, $target:expr),)*) => {
$(
@@ -22,8 +24,8 @@ macro_rules! test_accept {
)*
}
}
-#[cfg(unix)]
+#[cfg(unix)]
test_accept! {
(ip_str, "127.0.0.1:0"),
(host_str, "localhost:0"),
diff --git a/monoio/tests/tcp_echo.rs b/monoio/tests/tcp_echo.rs
index 3e20513..618fcc7 100644
--- a/monoio/tests/tcp_echo.rs
+++ b/monoio/tests/tcp_echo.rs
@@ -96,3 +96,27 @@ async fn rw_able() {
assert!(res.is_ok());
assert!(conn.readable(false).await.is_ok());
}
+
+#[cfg(unix)]
+#[monoio::test_all]
+async fn echo_tfo() {
+ use std::net::SocketAddr;
+
+ let bind_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
+ let opts = monoio::net::ListenerOpts::default().tcp_fast_open(true);
+ let listener = TcpListener::bind_with_config(bind_addr, &opts).unwrap();
+ let addr = listener.local_addr().unwrap();
+ let (tx, rx) = local_sync::oneshot::channel();
+ monoio::spawn(async move {
+ let (mut socket, active_addr) = listener.accept().await.unwrap();
+ socket.read_exact(vec![0; 2]).await.0.unwrap();
+ assert!(tx.send(active_addr).is_ok());
+ });
+ let opts = monoio::net::TcpConnectOpts::default().tcp_fast_open(true);
+ let mut active = TcpStream::connect_addr_with_config(addr, &opts)
+ .await
+ .unwrap();
+ active.write_all(b"hi").await.0.unwrap();
+ let active_addr = rx.await.unwrap();
+ assert_eq!(active.local_addr().unwrap(), active_addr);
+}
diff --git a/monoio/tests/tcp_into_split.rs b/monoio/tests/tcp_into_split.rs
index 17b5e2f..1fd4747 100644
--- a/monoio/tests/tcp_into_split.rs
+++ b/monoio/tests/tcp_into_split.rs
@@ -1,8 +1,10 @@
+#[cfg(unix)]
use std::{
io::{Error, ErrorKind, Read, Result, Write},
net, thread,
};
+#[cfg(unix)]
use monoio::{
io::{AsyncReadRent, AsyncWriteRentExt, Splitable},
net::{TcpListener, TcpStream},
diff --git a/monoio/tests/udp.rs b/monoio/tests/udp.rs
index 76cbe95..f3ac7aa 100644
--- a/monoio/tests/udp.rs
+++ b/monoio/tests/udp.rs
@@ -1,3 +1,4 @@
+#[cfg(unix)]
use monoio::net::udp::UdpSocket;
#[cfg(unix)]
diff --git a/monoio/tests/uds_split.rs b/monoio/tests/uds_split.rs
index faf5197..ed3327a 100644
--- a/monoio/tests/uds_split.rs
+++ b/monoio/tests/uds_split.rs
@@ -1,3 +1,4 @@
+#[cfg(unix)]
use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt, Splitable};
#[cfg(unix)]
use monoio::net::UnixStream;
diff --git a/monoio/tests/uds_stream.rs b/monoio/tests/uds_stream.rs
index ae20dae..a277b1c 100644
--- a/monoio/tests/uds_stream.rs
+++ b/monoio/tests/uds_stream.rs
@@ -1,7 +1,10 @@
+#[cfg(unix)]
use futures::future::try_join;
+#[cfg(unix)]
use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt};
#[cfg(unix)]
use monoio::net::{UnixListener, UnixStream};
+
#[cfg(unix)]
#[monoio::test_all]
async fn accept_read_write() -> std::io::Result<()> {
@@ -29,6 +32,7 @@ async fn accept_read_write() -> std::io::Result<()> {
assert_eq!(len, 0);
Ok(())
}
+
#[cfg(unix)]
#[monoio::test_all]
async fn shutdown() -> std::io::Result<()> {