summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJin Wei Tan <[email protected]>2023-07-10 11:03:16 +0800
committerGitHub <[email protected]>2023-07-10 11:03:16 +0800
commite22989c352ee30560e50924617a776ab271a6282 (patch)
treedf8eb134fef390b9a70904f66dc58405e8569b8a
parentc9e8c8c02e8635f21eb2f712fdf2fc3a456052ae (diff)
feat: implement accept, close, connect, fsync ops on windows (#188)
* feat: implement accept, close, connect, fsync ops on windows * fix errors
-rw-r--r--monoio/src/driver/op.rs10
-rw-r--r--monoio/src/driver/op/accept.rs45
-rw-r--r--monoio/src/driver/op/close.rs29
-rw-r--r--monoio/src/driver/op/connect.rs84
-rw-r--r--monoio/src/driver/op/fsync.rs19
-rw-r--r--monoio/src/driver/util.rs9
6 files changed, 169 insertions, 27 deletions
diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs
index da8967e..bb6577c 100644
--- a/monoio/src/driver/op.rs
+++ b/monoio/src/driver/op.rs
@@ -53,9 +53,9 @@ pub(crate) trait OpAble {
#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry;
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(super::legacy::ready::Direction, usize)>;
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_call(&mut self) -> io::Result<u32>;
}
@@ -116,7 +116,7 @@ impl<T> Op<T> {
where
T: OpAble,
{
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
if is_legacy() {
return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() {
OpCanceller {
@@ -132,7 +132,7 @@ impl<T> Op<T> {
}
OpCanceller {
index: self.index,
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
direction: None,
}
}
@@ -175,7 +175,7 @@ pub(crate) fn is_legacy() -> bool {
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub(crate) struct OpCanceller {
pub(super) index: usize,
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
pub(super) direction: Option<super::legacy::ready::Direction>,
}
diff --git a/monoio/src/driver/op/accept.rs b/monoio/src/driver/op/accept.rs
index f854e38..090d73b 100644
--- a/monoio/src/driver/op/accept.rs
+++ b/monoio/src/driver/op/accept.rs
@@ -5,32 +5,48 @@ use std::{
#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
-#[cfg(all(unix, feature = "legacy"))]
+#[cfg(windows)]
use {
- crate::{driver::legacy::ready::Direction, syscall_u32},
- std::os::unix::prelude::AsRawFd,
+ crate::syscall,
+ std::os::windows::prelude::AsRawSocket,
+ windows_sys::Win32::Networking::WinSock::{
+ accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE,
+ },
};
+#[cfg(all(unix, feature = "legacy"))]
+use {crate::syscall_u32, std::os::unix::prelude::AsRawFd};
use super::{super::shared_fd::SharedFd, Op, OpAble};
+#[cfg(feature = "legacy")]
+use crate::driver::legacy::ready::Direction;
/// Accept
pub(crate) struct Accept {
- #[allow(unused)]
pub(crate) fd: SharedFd,
#[cfg(unix)]
pub(crate) addr: Box<(MaybeUninit<libc::sockaddr_storage>, libc::socklen_t)>,
+ #[cfg(windows)]
+ pub(crate) addr: Box<(MaybeUninit<SOCKADDR_STORAGE>, socklen_t)>,
}
impl Op<Accept> {
- #[cfg(unix)]
/// Accept a connection
pub(crate) fn accept(fd: &SharedFd) -> io::Result<Self> {
+ #[cfg(unix)]
+ let addr = Box::new((
+ MaybeUninit::uninit(),
+ size_of::<libc::sockaddr_storage>() as libc::socklen_t,
+ ));
+
+ #[cfg(windows)]
+ let addr = Box::new((
+ MaybeUninit::uninit(),
+ size_of::<SOCKADDR_STORAGE>() as socklen_t,
+ ));
+
Op::submit_with(Accept {
fd: fd.clone(),
- addr: Box::new((
- MaybeUninit::uninit(),
- size_of::<libc::sockaddr_storage>() as libc::socklen_t,
- )),
+ addr,
})
}
}
@@ -46,11 +62,20 @@ impl OpAble for Accept {
.build()
}
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
self.fd.registered_index().map(|idx| (Direction::Read, idx))
}
+ #[cfg(windows)]
+ fn legacy_call(&mut self) -> io::Result<u32> {
+ let fd = self.fd.as_raw_socket();
+ let addr = self.addr.0.as_mut_ptr() as *mut _;
+ let len = &mut self.addr.1;
+
+ syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET)
+ }
+
#[cfg(all(unix, feature = "legacy"))]
fn legacy_call(&mut self) -> io::Result<u32> {
let fd = self.fd.as_raw_fd();
diff --git a/monoio/src/driver/op/close.rs b/monoio/src/driver/op/close.rs
index f089bea..9048fd1 100644
--- a/monoio/src/driver/op/close.rs
+++ b/monoio/src/driver/op/close.rs
@@ -1,16 +1,26 @@
+use std::io;
#[cfg(unix)]
-use std::{io, os::unix::io::RawFd};
+use std::os::unix::io::RawFd;
#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
+#[cfg(windows)]
+use {
+ crate::syscall, std::os::windows::io::RawSocket,
+ windows_sys::Win32::Networking::WinSock::closesocket,
+};
use super::{Op, OpAble};
+#[cfg(feature = "legacy")]
+use crate::driver::legacy::ready::Direction;
#[cfg(all(unix, feature = "legacy"))]
-use crate::{driver::legacy::ready::Direction, syscall_u32};
+use crate::syscall_u32;
pub(crate) struct Close {
#[cfg(unix)]
fd: RawFd,
+ #[cfg(windows)]
+ fd: RawSocket,
}
impl Op<Close> {
@@ -19,6 +29,11 @@ impl Op<Close> {
pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> {
Op::try_submit_with(Close { fd })
}
+
+ #[cfg(windows)]
+ pub(crate) fn close(fd: RawSocket) -> io::Result<Op<Close>> {
+ Op::try_submit_with(Close { fd })
+ }
}
impl OpAble for Close {
@@ -27,13 +42,17 @@ impl OpAble for Close {
opcode::Close::new(types::Fd(self.fd)).build()
}
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
None
}
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_call(&mut self) -> io::Result<u32> {
- syscall_u32!(close(self.fd))
+ #[cfg(unix)]
+ return syscall_u32!(close(self.fd));
+
+ #[cfg(windows)]
+ return syscall!(closesocket(self.fd), PartialEq::ne, 0);
}
}
diff --git a/monoio/src/driver/op/connect.rs b/monoio/src/driver/op/connect.rs
index a7b6902..c6a8b25 100644
--- a/monoio/src/driver/op/connect.rs
+++ b/monoio/src/driver/op/connect.rs
@@ -2,14 +2,22 @@ use std::{io, net::SocketAddr};
#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
+#[cfg(windows)]
+use windows_sys::Win32::Networking::WinSock::{
+ connect, socklen_t, AF_INET, AF_INET6, IN6_ADDR, IN6_ADDR_0, IN_ADDR, IN_ADDR_0, SOCKADDR_IN,
+ SOCKADDR_IN6, SOCKADDR_IN6_0, SOCKET_ERROR,
+};
use super::{super::shared_fd::SharedFd, Op, OpAble};
-#[cfg(all(unix, feature = "legacy"))]
+#[cfg(feature = "legacy")]
use crate::driver::legacy::ready::Direction;
pub(crate) struct Connect {
pub(crate) fd: SharedFd,
socket_addr: Box<SocketAddrCRepr>,
+ #[cfg(windows)]
+ socket_addr_len: socklen_t,
+ #[cfg(unix)]
socket_addr_len: libc::socklen_t,
#[cfg(any(target_os = "ios", target_os = "macos"))]
tfo: bool,
@@ -44,12 +52,12 @@ impl OpAble for Connect {
.build()
}
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
None
}
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_call(&mut self) -> io::Result<u32> {
// For ios/macos, if tfo is enabled, we will
// call connectx here.
@@ -76,6 +84,7 @@ impl OpAble for Connect {
};
}
+ #[cfg(unix)]
match crate::syscall_u32!(connect(
self.fd.raw_fd(),
self.socket_addr.as_ptr(),
@@ -84,6 +93,20 @@ impl OpAble for Connect {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
}
+
+ #[cfg(windows)]
+ match crate::syscall!(
+ connect(
+ self.fd.raw_socket(),
+ self.socket_addr.as_ptr(),
+ self.socket_addr_len,
+ ),
+ PartialEq::eq,
+ SOCKET_ERROR
+ ) {
+ Err(err) if err.kind() != io::ErrorKind::WouldBlock => Err(err),
+ _ => Ok(self.fd.raw_fd() as u32),
+ }
}
}
@@ -147,8 +170,14 @@ impl OpAble for ConnectUnix {
// Copied from mio.
#[repr(C)]
pub(crate) union SocketAddrCRepr {
+ #[cfg(unix)]
v4: libc::sockaddr_in,
+ #[cfg(unix)]
v6: libc::sockaddr_in6,
+ #[cfg(windows)]
+ v4: SOCKADDR_IN,
+ #[cfg(windows)]
+ v6: SOCKADDR_IN6,
}
impl SocketAddrCRepr {
@@ -157,6 +186,55 @@ impl SocketAddrCRepr {
}
}
+#[cfg(windows)]
+pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) {
+ match addr {
+ SocketAddr::V4(ref addr) => {
+ // `s_addr` is stored as BE on all machine and the array is in BE order.
+ // So the native endian conversion method is used so that it's never swapped.
+ let sin_addr = unsafe {
+ let mut s_un = std::mem::zeroed::<IN_ADDR_0>();
+ s_un.S_addr = u32::from_ne_bytes(addr.ip().octets());
+ IN_ADDR { S_un: s_un }
+ };
+
+ let sockaddr_in = SOCKADDR_IN {
+ sin_family: AF_INET as u16, // 1
+ sin_port: addr.port().to_be(),
+ sin_addr,
+ sin_zero: [0; 8],
+ };
+
+ let sockaddr = SocketAddrCRepr { v4: sockaddr_in };
+ (sockaddr, std::mem::size_of::<SOCKADDR_IN>() as i32)
+ }
+ SocketAddr::V6(ref addr) => {
+ let sin6_addr = unsafe {
+ let mut u = std::mem::zeroed::<IN6_ADDR_0>();
+ u.Byte = addr.ip().octets();
+ IN6_ADDR { u }
+ };
+ let u = unsafe {
+ let mut u = std::mem::zeroed::<SOCKADDR_IN6_0>();
+ u.sin6_scope_id = addr.scope_id();
+ u
+ };
+
+ let sockaddr_in6 = SOCKADDR_IN6 {
+ sin6_family: AF_INET6 as u16, // 23
+ sin6_port: addr.port().to_be(),
+ sin6_addr,
+ sin6_flowinfo: addr.flowinfo(),
+ Anonymous: u,
+ };
+
+ let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 };
+ (sockaddr, std::mem::size_of::<SOCKADDR_IN6>() as i32)
+ }
+ }
+}
+
+#[cfg(unix)]
/// Converts a Rust `SocketAddr` into the system representation.
pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_t) {
match addr {
diff --git a/monoio/src/driver/op/fsync.rs b/monoio/src/driver/op/fsync.rs
index 2665c2c..f23c1de 100644
--- a/monoio/src/driver/op/fsync.rs
+++ b/monoio/src/driver/op/fsync.rs
@@ -2,10 +2,16 @@ use std::io;
#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
+#[cfg(windows)]
+use windows_sys::Win32::Storage::FileSystem::FlushFileBuffers;
use super::{super::shared_fd::SharedFd, Op, OpAble};
+#[cfg(feature = "legacy")]
+use crate::driver::legacy::ready::Direction;
+#[cfg(windows)]
+use crate::syscall;
#[cfg(all(unix, feature = "legacy"))]
-use crate::{driver::legacy::ready::Direction, syscall_u32};
+use crate::syscall_u32;
pub(crate) struct Fsync {
#[allow(unused)]
@@ -42,11 +48,20 @@ impl OpAble for Fsync {
opc.build()
}
- #[cfg(all(unix, feature = "legacy"))]
+ #[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
None
}
+ #[cfg(all(windows, feature = "legacy"))]
+ fn legacy_call(&mut self) -> io::Result<u32> {
+ syscall!(
+ FlushFileBuffers(self.handle.as_raw_handle()),
+ PartialEq::eq,
+ 0
+ )
+ }
+
#[cfg(all(unix, not(target_os = "linux"), feature = "legacy"))]
fn legacy_call(&mut self) -> io::Result<u32> {
syscall_u32!(fsync(self.fd.raw_fd()))
diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs
index 71e3b69..a583400 100644
--- a/monoio/src/driver/util.rs
+++ b/monoio/src/driver/util.rs
@@ -38,8 +38,13 @@ macro_rules! syscall {
#[cfg(windows)]
#[macro_export]
macro_rules! syscall {
- ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
- unimplemented!()
+ ($fn: ident ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{
+ let res = unsafe { $fn($($arg, )*) };
+ if $err_test(&res, &$err_value) {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(res)
+ }
}};
}