summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorihc童鞋@提不起劲 <[email protected]>2023-07-07 17:07:06 +0800
committerGitHub <[email protected]>2023-07-07 17:07:06 +0800
commitc9e8c8c02e8635f21eb2f712fdf2fc3a456052ae (patch)
treeb6bb3da4952e002f833337cfdcb15581977f08c0
parentd3a06896855b5e8130993ac759654bc9cfc9b86f (diff)
feat: support UnixSeqpacket and its listener (#187)
-rw-r--r--monoio/src/net/unix/mod.rs4
-rw-r--r--monoio/src/net/unix/seq_packet/listener.rs92
-rw-r--r--monoio/src/net/unix/seq_packet/mod.rs161
-rw-r--r--monoio/src/net/unix/socket_addr.rs18
-rw-r--r--monoio/tests/unix_seqpacket.rs22
5 files changed, 296 insertions, 1 deletions
diff --git a/monoio/src/net/unix/mod.rs b/monoio/src/net/unix/mod.rs
index a55a4f4..82f2453 100644
--- a/monoio/src/net/unix/mod.rs
+++ b/monoio/src/net/unix/mod.rs
@@ -9,9 +9,13 @@ mod split;
mod stream;
mod ucred;
+#[cfg(target_os = "linux")]
+mod seq_packet;
pub use datagram::UnixDatagram;
pub use listener::UnixListener;
pub use pipe::{new_pipe, Pipe};
+#[cfg(target_os = "linux")]
+pub use seq_packet::{UnixSeqpacket, UnixSeqpacketListener};
pub use socket_addr::SocketAddr;
pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf};
pub use stream::UnixStream;
diff --git a/monoio/src/net/unix/seq_packet/listener.rs b/monoio/src/net/unix/seq_packet/listener.rs
new file mode 100644
index 0000000..c40af0d
--- /dev/null
+++ b/monoio/src/net/unix/seq_packet/listener.rs
@@ -0,0 +1,92 @@
+use std::{
+ future::Future,
+ io,
+ os::fd::{AsRawFd, RawFd},
+ path::Path,
+};
+
+use super::UnixSeqpacket;
+use crate::{
+ driver::{op::Op, shared_fd::SharedFd},
+ io::stream::Stream,
+ net::{
+ new_socket,
+ unix::{socket_addr::socket_addr, SocketAddr},
+ },
+};
+
+const DEFAULT_BACKLOG: libc::c_int = 128;
+
+/// Listner for UnixSeqpacket
+pub struct UnixSeqpacketListener {
+ fd: SharedFd,
+}
+
+impl UnixSeqpacketListener {
+ /// Creates a new `UnixSeqpacketListener` bound to the specified path with custom backlog
+ pub fn bind_with_backlog<P: AsRef<Path>>(path: P, backlog: libc::c_int) -> io::Result<Self> {
+ let (addr, addr_len) = socket_addr(path.as_ref())?;
+ let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?;
+ crate::syscall!(bind(socket, &addr as *const _ as *const _, addr_len))?;
+ crate::syscall!(listen(socket, backlog))?;
+ Ok(Self {
+ fd: SharedFd::new(socket)?,
+ })
+ }
+
+ /// Creates a new `UnixSeqpacketListener` bound to the specified path with default backlog(128)
+ #[inline]
+ pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
+ Self::bind_with_backlog(path, DEFAULT_BACKLOG)
+ }
+
+ /// Accept a UnixSeqpacket
+ pub async fn accept(&self) -> io::Result<(UnixSeqpacket, SocketAddr)> {
+ let op = Op::accept(&self.fd)?;
+
+ // Await the completion of the event
+ let completion = op.await;
+
+ // Convert fd
+ let fd = completion.meta.result?;
+
+ // Construct stream
+ let stream = UnixSeqpacket::from_shared_fd(SharedFd::new(fd as _)?);
+
+ // Construct SocketAddr
+ let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) };
+ let storage: *mut libc::sockaddr_storage = &mut storage as *mut _;
+ let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() };
+ let raw_addr_len = completion.data.addr.1;
+
+ let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len);
+
+ Ok((stream, addr))
+ }
+}
+
+impl AsRawFd for UnixSeqpacketListener {
+ #[inline]
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd.raw_fd()
+ }
+}
+
+impl std::fmt::Debug for UnixSeqpacketListener {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("UnixSeqpacketListener")
+ .field("fd", &self.fd)
+ .finish()
+ }
+}
+
+impl Stream for UnixSeqpacketListener {
+ type Item = io::Result<(UnixSeqpacket, SocketAddr)>;
+
+ type NextFuture<'a> = impl Future<Output = Option<Self::Item>> + 'a;
+
+ #[inline]
+ fn next(&mut self) -> Self::NextFuture<'_> {
+ async move { Some(self.accept().await) }
+ }
+}
diff --git a/monoio/src/net/unix/seq_packet/mod.rs b/monoio/src/net/unix/seq_packet/mod.rs
new file mode 100644
index 0000000..1c48bbe
--- /dev/null
+++ b/monoio/src/net/unix/seq_packet/mod.rs
@@ -0,0 +1,161 @@
+//! UnixSeqpacket related.
+//! Only available on linux.
+
+use std::{
+ io,
+ os::unix::prelude::{AsRawFd, RawFd},
+ path::Path,
+};
+
+use super::{
+ socket_addr::{local_addr, pair, peer_addr, socket_addr},
+ SocketAddr,
+};
+use crate::{
+ buf::{IoBuf, IoBufMut},
+ driver::{op::Op, shared_fd::SharedFd},
+ net::new_socket,
+};
+
+mod listener;
+pub use listener::UnixSeqpacketListener;
+
+/// UnixSeqpacket
+pub struct UnixSeqpacket {
+ fd: SharedFd,
+}
+
+impl UnixSeqpacket {
+ pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
+ Self { fd }
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ pub fn pair() -> io::Result<(Self, Self)> {
+ let (a, b) = pair(libc::SOCK_SEQPACKET)?;
+ Ok((
+ Self::from_shared_fd(SharedFd::new(a)?),
+ Self::from_shared_fd(SharedFd::new(b)?),
+ ))
+ }
+
+ /// Connects the socket to the specified address.
+ pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
+ let (addr, addr_len) = socket_addr(path.as_ref())?;
+ Self::inner_connect(addr, addr_len).await
+ }
+
+ /// Connects the socket to an address.
+ pub async fn connect_addr(addr: SocketAddr) -> io::Result<Self> {
+ let (addr, addr_len) = addr.into_parts();
+ Self::inner_connect(addr, addr_len).await
+ }
+
+ #[inline(always)]
+ async fn inner_connect(
+ sockaddr: libc::sockaddr_un,
+ socklen: libc::socklen_t,
+ ) -> io::Result<Self> {
+ let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?;
+ let op = Op::connect_unix(SharedFd::new(socket)?, sockaddr, socklen)?;
+ let completion = op.await;
+ completion.meta.result?;
+
+ Ok(Self::from_shared_fd(completion.data.fd))
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ local_addr(self.as_raw_fd())
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ peer_addr(self.as_raw_fd())
+ }
+
+ /// Wait for read readiness.
+ /// Note: Do not use it before every io. It is different from other runtimes!
+ ///
+ /// Everytime call to this method may pay a syscall cost.
+ /// In uring impl, it will push a PollAdd op; in epoll impl, it will use use
+ /// inner readiness state; if !relaxed, it will call syscall poll after that.
+ ///
+ /// If relaxed, on legacy driver it may return false positive result.
+ /// If you want to do io by your own, you must maintain io readiness and wait
+ /// for io ready with relaxed=false.
+ pub async fn readable(&self, relaxed: bool) -> io::Result<()> {
+ let op = Op::poll_read(&self.fd, relaxed).unwrap();
+ op.wait().await
+ }
+
+ /// Wait for write readiness.
+ /// Note: Do not use it before every io. It is different from other runtimes!
+ ///
+ /// Everytime call to this method may pay a syscall cost.
+ /// In uring impl, it will push a PollAdd op; in epoll impl, it will use use
+ /// inner readiness state; if !relaxed, it will call syscall poll after that.
+ ///
+ /// If relaxed, on legacy driver it may return false positive result.
+ /// If you want to do io by your own, you must maintain io readiness and wait
+ /// for io ready with relaxed=false.
+ pub async fn writable(&self, relaxed: bool) -> io::Result<()> {
+ let op = Op::poll_write(&self.fd, relaxed).unwrap();
+ op.wait().await
+ }
+
+ /// Sends data on the socket to the given address. On success, returns the
+ /// number of bytes written.
+ pub async fn send_to<T: IoBuf, P: AsRef<Path>>(
+ &self,
+ buf: T,
+ path: P,
+ ) -> crate::BufResult<usize, T> {
+ let addr = match crate::net::unix::socket_addr::socket_addr(path.as_ref()) {
+ Ok(addr) => addr,
+ Err(e) => return (Err(e), buf),
+ };
+ let op = Op::send_msg_unix(
+ self.fd.clone(),
+ buf,
+ Some(SocketAddr::from_parts(addr.0, addr.1)),
+ )
+ .unwrap();
+ op.wait().await
+ }
+
+ /// Receives a single datagram message on the socket. On success, returns the number
+ /// of bytes read and the origin.
+ pub async fn recv_from<T: IoBufMut>(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> {
+ let op = Op::recv_msg_unix(self.fd.clone(), buf).unwrap();
+ op.wait().await
+ }
+
+ /// Sends data on the socket to the remote address to which it is connected.
+ pub async fn send<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
+ let op = Op::send_msg_unix(self.fd.clone(), buf, None).unwrap();
+ op.wait().await
+ }
+
+ /// Receives a single datagram message on the socket from the remote address to
+ /// which it is connected. On success, returns the number of bytes read.
+ pub async fn recv<T: IoBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
+ let op = Op::recv(self.fd.clone(), buf).unwrap();
+ op.read().await
+ }
+}
+
+impl AsRawFd for UnixSeqpacket {
+ #[inline]
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd.raw_fd()
+ }
+}
+
+impl std::fmt::Debug for UnixSeqpacket {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("UnixSeqpacket")
+ .field("fd", &self.fd)
+ .finish()
+ }
+}
diff --git a/monoio/src/net/unix/socket_addr.rs b/monoio/src/net/unix/socket_addr.rs
index 7169552..7cd1513 100644
--- a/monoio/src/net/unix/socket_addr.rs
+++ b/monoio/src/net/unix/socket_addr.rs
@@ -214,9 +214,25 @@ pub(crate) fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)>
where
T: FromRawFd,
{
- #[cfg(not(any(target_os = "ios", target_os = "macos")))]
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "illumos",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
+ #[cfg(target_os = "linux")]
+ let flags = {
+ if crate::driver::op::is_legacy() {
+ flags | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK
+ } else {
+ flags | libc::SOCK_CLOEXEC
+ }
+ };
+
let mut fds = [-1; 2];
crate::syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?;
let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) };
diff --git a/monoio/tests/unix_seqpacket.rs b/monoio/tests/unix_seqpacket.rs
new file mode 100644
index 0000000..65fc21e
--- /dev/null
+++ b/monoio/tests/unix_seqpacket.rs
@@ -0,0 +1,22 @@
+#[cfg(target_os = "linux")]
+#[monoio::test_all]
+async fn test_seqpacket() -> std::io::Result<()> {
+ use monoio::net::unix::{UnixSeqpacket, UnixSeqpacketListener};
+
+ let dir = tempfile::Builder::new()
+ .prefix("monoio-unix-seqpacket-tests")
+ .tempdir()
+ .unwrap();
+ let sock_path = dir.path().join("seqpacket.sock");
+
+ let listener = UnixSeqpacketListener::bind(&sock_path).unwrap();
+ monoio::spawn(async move {
+ let (conn, _addr) = listener.accept().await.unwrap();
+ let (res, buf) = conn.recv(vec![0; 100]).await;
+ assert_eq!(res.unwrap(), 5);
+ assert_eq!(buf, b"hello");
+ });
+ let conn = UnixSeqpacket::connect(&sock_path).await.unwrap();
+ conn.send(b"hello").await.0.unwrap();
+ Ok(())
+}