summaryrefslogtreecommitdiff
path: root/monoio/src/driver/op/recv.rs
diff options
context:
space:
mode:
Diffstat (limited to 'monoio/src/driver/op/recv.rs')
-rw-r--r--monoio/src/driver/op/recv.rs80
1 files changed, 79 insertions, 1 deletions
diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs
index 3f8885f..ce6ecc8 100644
--- a/monoio/src/driver/op/recv.rs
+++ b/monoio/src/driver/op/recv.rs
@@ -13,7 +13,7 @@ use {
};
use super::{super::shared_fd::SharedFd, Op, OpAble};
-use crate::{buf::IoBufMut, BufResult};
+use crate::{buf::IoBufMut, net::unix::SocketAddr as UnixSocketAddr, BufResult};
pub(crate) struct Recv<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
@@ -181,3 +181,81 @@ impl<T: IoBufMut> OpAble for RecvMsg<T> {
syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0))
}
}
+
+pub(crate) struct RecvMsgUnix<T> {
+ /// Holds a strong ref to the FD, preventing the file from being closed
+ /// while the operation is in-flight.
+ #[allow(unused)]
+ fd: SharedFd,
+
+ /// Reference to the in-flight buffer.
+ pub(crate) buf: T,
+ pub(crate) info: Box<(
+ MaybeUninit<libc::sockaddr_storage>,
+ [libc::iovec; 1],
+ libc::msghdr,
+ )>,
+}
+
+impl<T: IoBufMut> Op<RecvMsgUnix<T>> {
+ pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result<Self> {
+ let iovec = [libc::iovec {
+ iov_base: buf.write_ptr() as *mut _,
+ iov_len: buf.bytes_total(),
+ }];
+ let mut info: Box<(
+ MaybeUninit<libc::sockaddr_storage>,
+ [libc::iovec; 1],
+ libc::msghdr,
+ )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() }));
+
+ info.2.msg_iov = info.1.as_mut_ptr();
+ info.2.msg_iovlen = 1;
+ info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void;
+ info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
+
+ Op::submit_with(RecvMsgUnix { fd, buf, info })
+ }
+
+ pub(crate) async fn wait(self) -> BufResult<(usize, UnixSocketAddr), T> {
+ let complete = self.await;
+ let res = complete.meta.result.map(|v| v as _);
+ let mut buf = complete.data.buf;
+
+ let res = res.map(|n| {
+ let storage = unsafe { complete.data.info.0.assume_init() };
+ let name_len = complete.data.info.2.msg_namelen;
+
+ let addr = unsafe {
+ let addr: &libc::sockaddr_un = transmute(&storage);
+ UnixSocketAddr::from_parts(*addr, name_len)
+ };
+
+ // Safety: the kernel wrote `n` bytes to the buffer.
+ unsafe {
+ buf.set_init(n);
+ }
+
+ (n, addr)
+ });
+ (res, buf)
+ }
+}
+
+impl<T: IoBufMut> OpAble for RecvMsgUnix<T> {
+ #[cfg(all(target_os = "linux", feature = "iouring"))]
+ fn uring_op(&mut self) -> io_uring::squeue::Entry {
+ opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _).build()
+ }
+
+ #[cfg(all(unix, feature = "legacy"))]
+ fn legacy_interest(&self) -> Option<(Direction, usize)> {
+ self.fd.registered_index().map(|idx| (Direction::Read, idx))
+ }
+
+ #[cfg(all(unix, feature = "legacy"))]
+ fn legacy_call(&mut self) -> io::Result<u32> {
+ let fd = self.fd.as_raw_fd();
+ syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0))
+ }
+}