diff options
Diffstat (limited to 'monoio/src/driver/op/recv.rs')
| -rw-r--r-- | monoio/src/driver/op/recv.rs | 80 |
1 files changed, 79 insertions, 1 deletions
diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index 3f8885f..ce6ecc8 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -13,7 +13,7 @@ use { }; use super::{super::shared_fd::SharedFd, Op, OpAble}; -use crate::{buf::IoBufMut, BufResult}; +use crate::{buf::IoBufMut, net::unix::SocketAddr as UnixSocketAddr, BufResult}; pub(crate) struct Recv<T> { /// Holds a strong ref to the FD, preventing the file from being closed @@ -181,3 +181,81 @@ impl<T: IoBufMut> OpAble for RecvMsg<T> { syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) } } + +pub(crate) struct RecvMsgUnix<T> { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + #[allow(unused)] + fd: SharedFd, + + /// Reference to the in-flight buffer. + pub(crate) buf: T, + pub(crate) info: Box<( + MaybeUninit<libc::sockaddr_storage>, + [libc::iovec; 1], + libc::msghdr, + )>, +} + +impl<T: IoBufMut> Op<RecvMsgUnix<T>> { + pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result<Self> { + let iovec = [libc::iovec { + iov_base: buf.write_ptr() as *mut _, + iov_len: buf.bytes_total(), + }]; + let mut info: Box<( + MaybeUninit<libc::sockaddr_storage>, + [libc::iovec; 1], + libc::msghdr, + )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() })); + + info.2.msg_iov = info.1.as_mut_ptr(); + info.2.msg_iovlen = 1; + info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; + info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; + + Op::submit_with(RecvMsgUnix { fd, buf, info }) + } + + pub(crate) async fn wait(self) -> BufResult<(usize, UnixSocketAddr), T> { + let complete = self.await; + let res = complete.meta.result.map(|v| v as _); + let mut buf = complete.data.buf; + + let res = res.map(|n| { + let storage = unsafe { complete.data.info.0.assume_init() }; + let name_len = complete.data.info.2.msg_namelen; + + let addr = unsafe { + let addr: &libc::sockaddr_un = transmute(&storage); + UnixSocketAddr::from_parts(*addr, name_len) + }; + + // Safety: the kernel wrote `n` bytes to the buffer. + unsafe { + buf.set_init(n); + } + + (n, addr) + }); + (res, buf) + } +} + +impl<T: IoBufMut> OpAble for RecvMsgUnix<T> { + #[cfg(all(target_os = "linux", feature = "iouring"))] + fn uring_op(&mut self) -> io_uring::squeue::Entry { + opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _).build() + } + + #[cfg(all(unix, feature = "legacy"))] + fn legacy_interest(&self) -> Option<(Direction, usize)> { + self.fd.registered_index().map(|idx| (Direction::Read, idx)) + } + + #[cfg(all(unix, feature = "legacy"))] + fn legacy_call(&mut self) -> io::Result<u32> { + let fd = self.fd.as_raw_fd(); + syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) + } +} |
