diff options
| author | ihc童鞋@提不起劲 <[email protected]> | 2023-07-07 17:07:06 +0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-07-07 17:07:06 +0800 |
| commit | c9e8c8c02e8635f21eb2f712fdf2fc3a456052ae (patch) | |
| tree | b6bb3da4952e002f833337cfdcb15581977f08c0 | |
| parent | d3a06896855b5e8130993ac759654bc9cfc9b86f (diff) | |
feat: support UnixSeqpacket and its listener (#187)
| -rw-r--r-- | monoio/src/net/unix/mod.rs | 4 | ||||
| -rw-r--r-- | monoio/src/net/unix/seq_packet/listener.rs | 92 | ||||
| -rw-r--r-- | monoio/src/net/unix/seq_packet/mod.rs | 161 | ||||
| -rw-r--r-- | monoio/src/net/unix/socket_addr.rs | 18 | ||||
| -rw-r--r-- | monoio/tests/unix_seqpacket.rs | 22 |
5 files changed, 296 insertions, 1 deletions
diff --git a/monoio/src/net/unix/mod.rs b/monoio/src/net/unix/mod.rs index a55a4f4..82f2453 100644 --- a/monoio/src/net/unix/mod.rs +++ b/monoio/src/net/unix/mod.rs @@ -9,9 +9,13 @@ mod split; mod stream; mod ucred; +#[cfg(target_os = "linux")] +mod seq_packet; pub use datagram::UnixDatagram; pub use listener::UnixListener; pub use pipe::{new_pipe, Pipe}; +#[cfg(target_os = "linux")] +pub use seq_packet::{UnixSeqpacket, UnixSeqpacketListener}; pub use socket_addr::SocketAddr; pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf}; pub use stream::UnixStream; diff --git a/monoio/src/net/unix/seq_packet/listener.rs b/monoio/src/net/unix/seq_packet/listener.rs new file mode 100644 index 0000000..c40af0d --- /dev/null +++ b/monoio/src/net/unix/seq_packet/listener.rs @@ -0,0 +1,92 @@ +use std::{ + future::Future, + io, + os::fd::{AsRawFd, RawFd}, + path::Path, +}; + +use super::UnixSeqpacket; +use crate::{ + driver::{op::Op, shared_fd::SharedFd}, + io::stream::Stream, + net::{ + new_socket, + unix::{socket_addr::socket_addr, SocketAddr}, + }, +}; + +const DEFAULT_BACKLOG: libc::c_int = 128; + +/// Listner for UnixSeqpacket +pub struct UnixSeqpacketListener { + fd: SharedFd, +} + +impl UnixSeqpacketListener { + /// Creates a new `UnixSeqpacketListener` bound to the specified path with custom backlog + pub fn bind_with_backlog<P: AsRef<Path>>(path: P, backlog: libc::c_int) -> io::Result<Self> { + let (addr, addr_len) = socket_addr(path.as_ref())?; + let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?; + crate::syscall!(bind(socket, &addr as *const _ as *const _, addr_len))?; + crate::syscall!(listen(socket, backlog))?; + Ok(Self { + fd: SharedFd::new(socket)?, + }) + } + + /// Creates a new `UnixSeqpacketListener` bound to the specified path with default backlog(128) + #[inline] + pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> { + Self::bind_with_backlog(path, DEFAULT_BACKLOG) + } + + /// Accept a UnixSeqpacket + pub async fn accept(&self) -> io::Result<(UnixSeqpacket, SocketAddr)> { + let op = Op::accept(&self.fd)?; + + // Await the completion of the event + let completion = op.await; + + // Convert fd + let fd = completion.meta.result?; + + // Construct stream + let stream = UnixSeqpacket::from_shared_fd(SharedFd::new(fd as _)?); + + // Construct SocketAddr + let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) }; + let storage: *mut libc::sockaddr_storage = &mut storage as *mut _; + let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() }; + let raw_addr_len = completion.data.addr.1; + + let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len); + + Ok((stream, addr)) + } +} + +impl AsRawFd for UnixSeqpacketListener { + #[inline] + fn as_raw_fd(&self) -> RawFd { + self.fd.raw_fd() + } +} + +impl std::fmt::Debug for UnixSeqpacketListener { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UnixSeqpacketListener") + .field("fd", &self.fd) + .finish() + } +} + +impl Stream for UnixSeqpacketListener { + type Item = io::Result<(UnixSeqpacket, SocketAddr)>; + + type NextFuture<'a> = impl Future<Output = Option<Self::Item>> + 'a; + + #[inline] + fn next(&mut self) -> Self::NextFuture<'_> { + async move { Some(self.accept().await) } + } +} diff --git a/monoio/src/net/unix/seq_packet/mod.rs b/monoio/src/net/unix/seq_packet/mod.rs new file mode 100644 index 0000000..1c48bbe --- /dev/null +++ b/monoio/src/net/unix/seq_packet/mod.rs @@ -0,0 +1,161 @@ +//! UnixSeqpacket related. +//! Only available on linux. + +use std::{ + io, + os::unix::prelude::{AsRawFd, RawFd}, + path::Path, +}; + +use super::{ + socket_addr::{local_addr, pair, peer_addr, socket_addr}, + SocketAddr, +}; +use crate::{ + buf::{IoBuf, IoBufMut}, + driver::{op::Op, shared_fd::SharedFd}, + net::new_socket, +}; + +mod listener; +pub use listener::UnixSeqpacketListener; + +/// UnixSeqpacket +pub struct UnixSeqpacket { + fd: SharedFd, +} + +impl UnixSeqpacket { + pub(crate) fn from_shared_fd(fd: SharedFd) -> Self { + Self { fd } + } + + /// Creates an unnamed pair of connected sockets. + pub fn pair() -> io::Result<(Self, Self)> { + let (a, b) = pair(libc::SOCK_SEQPACKET)?; + Ok(( + Self::from_shared_fd(SharedFd::new(a)?), + Self::from_shared_fd(SharedFd::new(b)?), + )) + } + + /// Connects the socket to the specified address. + pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> { + let (addr, addr_len) = socket_addr(path.as_ref())?; + Self::inner_connect(addr, addr_len).await + } + + /// Connects the socket to an address. + pub async fn connect_addr(addr: SocketAddr) -> io::Result<Self> { + let (addr, addr_len) = addr.into_parts(); + Self::inner_connect(addr, addr_len).await + } + + #[inline(always)] + async fn inner_connect( + sockaddr: libc::sockaddr_un, + socklen: libc::socklen_t, + ) -> io::Result<Self> { + let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?; + let op = Op::connect_unix(SharedFd::new(socket)?, sockaddr, socklen)?; + let completion = op.await; + completion.meta.result?; + + Ok(Self::from_shared_fd(completion.data.fd)) + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + local_addr(self.as_raw_fd()) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + peer_addr(self.as_raw_fd()) + } + + /// Wait for read readiness. + /// Note: Do not use it before every io. It is different from other runtimes! + /// + /// Everytime call to this method may pay a syscall cost. + /// In uring impl, it will push a PollAdd op; in epoll impl, it will use use + /// inner readiness state; if !relaxed, it will call syscall poll after that. + /// + /// If relaxed, on legacy driver it may return false positive result. + /// If you want to do io by your own, you must maintain io readiness and wait + /// for io ready with relaxed=false. + pub async fn readable(&self, relaxed: bool) -> io::Result<()> { + let op = Op::poll_read(&self.fd, relaxed).unwrap(); + op.wait().await + } + + /// Wait for write readiness. + /// Note: Do not use it before every io. It is different from other runtimes! + /// + /// Everytime call to this method may pay a syscall cost. + /// In uring impl, it will push a PollAdd op; in epoll impl, it will use use + /// inner readiness state; if !relaxed, it will call syscall poll after that. + /// + /// If relaxed, on legacy driver it may return false positive result. + /// If you want to do io by your own, you must maintain io readiness and wait + /// for io ready with relaxed=false. + pub async fn writable(&self, relaxed: bool) -> io::Result<()> { + let op = Op::poll_write(&self.fd, relaxed).unwrap(); + op.wait().await + } + + /// Sends data on the socket to the given address. On success, returns the + /// number of bytes written. + pub async fn send_to<T: IoBuf, P: AsRef<Path>>( + &self, + buf: T, + path: P, + ) -> crate::BufResult<usize, T> { + let addr = match crate::net::unix::socket_addr::socket_addr(path.as_ref()) { + Ok(addr) => addr, + Err(e) => return (Err(e), buf), + }; + let op = Op::send_msg_unix( + self.fd.clone(), + buf, + Some(SocketAddr::from_parts(addr.0, addr.1)), + ) + .unwrap(); + op.wait().await + } + + /// Receives a single datagram message on the socket. On success, returns the number + /// of bytes read and the origin. + pub async fn recv_from<T: IoBufMut>(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> { + let op = Op::recv_msg_unix(self.fd.clone(), buf).unwrap(); + op.wait().await + } + + /// Sends data on the socket to the remote address to which it is connected. + pub async fn send<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> { + let op = Op::send_msg_unix(self.fd.clone(), buf, None).unwrap(); + op.wait().await + } + + /// Receives a single datagram message on the socket from the remote address to + /// which it is connected. On success, returns the number of bytes read. + pub async fn recv<T: IoBufMut>(&self, buf: T) -> crate::BufResult<usize, T> { + let op = Op::recv(self.fd.clone(), buf).unwrap(); + op.read().await + } +} + +impl AsRawFd for UnixSeqpacket { + #[inline] + fn as_raw_fd(&self) -> RawFd { + self.fd.raw_fd() + } +} + +impl std::fmt::Debug for UnixSeqpacket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UnixSeqpacket") + .field("fd", &self.fd) + .finish() + } +} diff --git a/monoio/src/net/unix/socket_addr.rs b/monoio/src/net/unix/socket_addr.rs index 7169552..7cd1513 100644 --- a/monoio/src/net/unix/socket_addr.rs +++ b/monoio/src/net/unix/socket_addr.rs @@ -214,9 +214,25 @@ pub(crate) fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)> where T: FromRawFd, { - #[cfg(not(any(target_os = "ios", target_os = "macos")))] + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd" + ))] let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC; + #[cfg(target_os = "linux")] + let flags = { + if crate::driver::op::is_legacy() { + flags | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK + } else { + flags | libc::SOCK_CLOEXEC + } + }; + let mut fds = [-1; 2]; crate::syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?; let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) }; diff --git a/monoio/tests/unix_seqpacket.rs b/monoio/tests/unix_seqpacket.rs new file mode 100644 index 0000000..65fc21e --- /dev/null +++ b/monoio/tests/unix_seqpacket.rs @@ -0,0 +1,22 @@ +#[cfg(target_os = "linux")] +#[monoio::test_all] +async fn test_seqpacket() -> std::io::Result<()> { + use monoio::net::unix::{UnixSeqpacket, UnixSeqpacketListener}; + + let dir = tempfile::Builder::new() + .prefix("monoio-unix-seqpacket-tests") + .tempdir() + .unwrap(); + let sock_path = dir.path().join("seqpacket.sock"); + + let listener = UnixSeqpacketListener::bind(&sock_path).unwrap(); + monoio::spawn(async move { + let (conn, _addr) = listener.accept().await.unwrap(); + let (res, buf) = conn.recv(vec![0; 100]).await; + assert_eq!(res.unwrap(), 5); + assert_eq!(buf, b"hello"); + }); + let conn = UnixSeqpacket::connect(&sock_path).await.unwrap(); + conn.send(b"hello").await.0.unwrap(); + Ok(()) +} |
