diff options
| author | Jin Wei Tan <[email protected]> | 2023-07-10 11:03:16 +0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-07-10 11:03:16 +0800 |
| commit | e22989c352ee30560e50924617a776ab271a6282 (patch) | |
| tree | df8eb134fef390b9a70904f66dc58405e8569b8a | |
| parent | c9e8c8c02e8635f21eb2f712fdf2fc3a456052ae (diff) | |
feat: implement accept, close, connect, fsync ops on windows (#188)
* feat: implement accept, close, connect, fsync ops on windows
* fix errors
| -rw-r--r-- | monoio/src/driver/op.rs | 10 | ||||
| -rw-r--r-- | monoio/src/driver/op/accept.rs | 45 | ||||
| -rw-r--r-- | monoio/src/driver/op/close.rs | 29 | ||||
| -rw-r--r-- | monoio/src/driver/op/connect.rs | 84 | ||||
| -rw-r--r-- | monoio/src/driver/op/fsync.rs | 19 | ||||
| -rw-r--r-- | monoio/src/driver/util.rs | 9 |
6 files changed, 169 insertions, 27 deletions
diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index da8967e..bb6577c 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -53,9 +53,9 @@ pub(crate) trait OpAble { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_interest(&self) -> Option<(super::legacy::ready::Direction, usize)>; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_call(&mut self) -> io::Result<u32>; } @@ -116,7 +116,7 @@ impl<T> Op<T> { where T: OpAble, { - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] if is_legacy() { return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() { OpCanceller { @@ -132,7 +132,7 @@ impl<T> Op<T> { } OpCanceller { index: self.index, - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] direction: None, } } @@ -175,7 +175,7 @@ pub(crate) fn is_legacy() -> bool { #[derive(Debug, Eq, PartialEq, Clone, Hash)] pub(crate) struct OpCanceller { pub(super) index: usize, - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] pub(super) direction: Option<super::legacy::ready::Direction>, } diff --git a/monoio/src/driver/op/accept.rs b/monoio/src/driver/op/accept.rs index f854e38..090d73b 100644 --- a/monoio/src/driver/op/accept.rs +++ b/monoio/src/driver/op/accept.rs @@ -5,32 +5,48 @@ use std::{ #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(windows)] use { - crate::{driver::legacy::ready::Direction, syscall_u32}, - std::os::unix::prelude::AsRawFd, + crate::syscall, + std::os::windows::prelude::AsRawSocket, + windows_sys::Win32::Networking::WinSock::{ + accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE, + }, }; +#[cfg(all(unix, feature = "legacy"))] +use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; +#[cfg(feature = "legacy")] +use crate::driver::legacy::ready::Direction; /// Accept pub(crate) struct Accept { - #[allow(unused)] pub(crate) fd: SharedFd, #[cfg(unix)] pub(crate) addr: Box<(MaybeUninit<libc::sockaddr_storage>, libc::socklen_t)>, + #[cfg(windows)] + pub(crate) addr: Box<(MaybeUninit<SOCKADDR_STORAGE>, socklen_t)>, } impl Op<Accept> { - #[cfg(unix)] /// Accept a connection pub(crate) fn accept(fd: &SharedFd) -> io::Result<Self> { + #[cfg(unix)] + let addr = Box::new(( + MaybeUninit::uninit(), + size_of::<libc::sockaddr_storage>() as libc::socklen_t, + )); + + #[cfg(windows)] + let addr = Box::new(( + MaybeUninit::uninit(), + size_of::<SOCKADDR_STORAGE>() as socklen_t, + )); + Op::submit_with(Accept { fd: fd.clone(), - addr: Box::new(( - MaybeUninit::uninit(), - size_of::<libc::sockaddr_storage>() as libc::socklen_t, - )), + addr, }) } } @@ -46,11 +62,20 @@ impl OpAble for Accept { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } + #[cfg(windows)] + fn legacy_call(&mut self) -> io::Result<u32> { + let fd = self.fd.as_raw_socket(); + let addr = self.addr.0.as_mut_ptr() as *mut _; + let len = &mut self.addr.1; + + syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET) + } + #[cfg(all(unix, feature = "legacy"))] fn legacy_call(&mut self) -> io::Result<u32> { let fd = self.fd.as_raw_fd(); diff --git a/monoio/src/driver/op/close.rs b/monoio/src/driver/op/close.rs index f089bea..9048fd1 100644 --- a/monoio/src/driver/op/close.rs +++ b/monoio/src/driver/op/close.rs @@ -1,16 +1,26 @@ +use std::io; #[cfg(unix)] -use std::{io, os::unix::io::RawFd}; +use std::os::unix::io::RawFd; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; +#[cfg(windows)] +use { + crate::syscall, std::os::windows::io::RawSocket, + windows_sys::Win32::Networking::WinSock::closesocket, +}; use super::{Op, OpAble}; +#[cfg(feature = "legacy")] +use crate::driver::legacy::ready::Direction; #[cfg(all(unix, feature = "legacy"))] -use crate::{driver::legacy::ready::Direction, syscall_u32}; +use crate::syscall_u32; pub(crate) struct Close { #[cfg(unix)] fd: RawFd, + #[cfg(windows)] + fd: RawSocket, } impl Op<Close> { @@ -19,6 +29,11 @@ impl Op<Close> { pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> { Op::try_submit_with(Close { fd }) } + + #[cfg(windows)] + pub(crate) fn close(fd: RawSocket) -> io::Result<Op<Close>> { + Op::try_submit_with(Close { fd }) + } } impl OpAble for Close { @@ -27,13 +42,17 @@ impl OpAble for Close { opcode::Close::new(types::Fd(self.fd)).build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_interest(&self) -> Option<(Direction, usize)> { None } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_call(&mut self) -> io::Result<u32> { - syscall_u32!(close(self.fd)) + #[cfg(unix)] + return syscall_u32!(close(self.fd)); + + #[cfg(windows)] + return syscall!(closesocket(self.fd), PartialEq::ne, 0); } } diff --git a/monoio/src/driver/op/connect.rs b/monoio/src/driver/op/connect.rs index a7b6902..c6a8b25 100644 --- a/monoio/src/driver/op/connect.rs +++ b/monoio/src/driver/op/connect.rs @@ -2,14 +2,22 @@ use std::{io, net::SocketAddr}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; +#[cfg(windows)] +use windows_sys::Win32::Networking::WinSock::{ + connect, socklen_t, AF_INET, AF_INET6, IN6_ADDR, IN6_ADDR_0, IN_ADDR, IN_ADDR_0, SOCKADDR_IN, + SOCKADDR_IN6, SOCKADDR_IN6_0, SOCKET_ERROR, +}; use super::{super::shared_fd::SharedFd, Op, OpAble}; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] use crate::driver::legacy::ready::Direction; pub(crate) struct Connect { pub(crate) fd: SharedFd, socket_addr: Box<SocketAddrCRepr>, + #[cfg(windows)] + socket_addr_len: socklen_t, + #[cfg(unix)] socket_addr_len: libc::socklen_t, #[cfg(any(target_os = "ios", target_os = "macos"))] tfo: bool, @@ -44,12 +52,12 @@ impl OpAble for Connect { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_interest(&self) -> Option<(Direction, usize)> { None } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_call(&mut self) -> io::Result<u32> { // For ios/macos, if tfo is enabled, we will // call connectx here. @@ -76,6 +84,7 @@ impl OpAble for Connect { }; } + #[cfg(unix)] match crate::syscall_u32!(connect( self.fd.raw_fd(), self.socket_addr.as_ptr(), @@ -84,6 +93,20 @@ impl OpAble for Connect { Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), _ => Ok(self.fd.raw_fd() as u32), } + + #[cfg(windows)] + match crate::syscall!( + connect( + self.fd.raw_socket(), + self.socket_addr.as_ptr(), + self.socket_addr_len, + ), + PartialEq::eq, + SOCKET_ERROR + ) { + Err(err) if err.kind() != io::ErrorKind::WouldBlock => Err(err), + _ => Ok(self.fd.raw_fd() as u32), + } } } @@ -147,8 +170,14 @@ impl OpAble for ConnectUnix { // Copied from mio. #[repr(C)] pub(crate) union SocketAddrCRepr { + #[cfg(unix)] v4: libc::sockaddr_in, + #[cfg(unix)] v6: libc::sockaddr_in6, + #[cfg(windows)] + v4: SOCKADDR_IN, + #[cfg(windows)] + v6: SOCKADDR_IN6, } impl SocketAddrCRepr { @@ -157,6 +186,55 @@ impl SocketAddrCRepr { } } +#[cfg(windows)] +pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) { + match addr { + SocketAddr::V4(ref addr) => { + // `s_addr` is stored as BE on all machine and the array is in BE order. + // So the native endian conversion method is used so that it's never swapped. + let sin_addr = unsafe { + let mut s_un = std::mem::zeroed::<IN_ADDR_0>(); + s_un.S_addr = u32::from_ne_bytes(addr.ip().octets()); + IN_ADDR { S_un: s_un } + }; + + let sockaddr_in = SOCKADDR_IN { + sin_family: AF_INET as u16, // 1 + sin_port: addr.port().to_be(), + sin_addr, + sin_zero: [0; 8], + }; + + let sockaddr = SocketAddrCRepr { v4: sockaddr_in }; + (sockaddr, std::mem::size_of::<SOCKADDR_IN>() as i32) + } + SocketAddr::V6(ref addr) => { + let sin6_addr = unsafe { + let mut u = std::mem::zeroed::<IN6_ADDR_0>(); + u.Byte = addr.ip().octets(); + IN6_ADDR { u } + }; + let u = unsafe { + let mut u = std::mem::zeroed::<SOCKADDR_IN6_0>(); + u.sin6_scope_id = addr.scope_id(); + u + }; + + let sockaddr_in6 = SOCKADDR_IN6 { + sin6_family: AF_INET6 as u16, // 23 + sin6_port: addr.port().to_be(), + sin6_addr, + sin6_flowinfo: addr.flowinfo(), + Anonymous: u, + }; + + let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 }; + (sockaddr, std::mem::size_of::<SOCKADDR_IN6>() as i32) + } + } +} + +#[cfg(unix)] /// Converts a Rust `SocketAddr` into the system representation. pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_t) { match addr { diff --git a/monoio/src/driver/op/fsync.rs b/monoio/src/driver/op/fsync.rs index 2665c2c..f23c1de 100644 --- a/monoio/src/driver/op/fsync.rs +++ b/monoio/src/driver/op/fsync.rs @@ -2,10 +2,16 @@ use std::io; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; +#[cfg(windows)] +use windows_sys::Win32::Storage::FileSystem::FlushFileBuffers; use super::{super::shared_fd::SharedFd, Op, OpAble}; +#[cfg(feature = "legacy")] +use crate::driver::legacy::ready::Direction; +#[cfg(windows)] +use crate::syscall; #[cfg(all(unix, feature = "legacy"))] -use crate::{driver::legacy::ready::Direction, syscall_u32}; +use crate::syscall_u32; pub(crate) struct Fsync { #[allow(unused)] @@ -42,11 +48,20 @@ impl OpAble for Fsync { opc.build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn legacy_interest(&self) -> Option<(Direction, usize)> { None } + #[cfg(all(windows, feature = "legacy"))] + fn legacy_call(&mut self) -> io::Result<u32> { + syscall!( + FlushFileBuffers(self.handle.as_raw_handle()), + PartialEq::eq, + 0 + ) + } + #[cfg(all(unix, not(target_os = "linux"), feature = "legacy"))] fn legacy_call(&mut self) -> io::Result<u32> { syscall_u32!(fsync(self.fd.raw_fd())) diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs index 71e3b69..a583400 100644 --- a/monoio/src/driver/util.rs +++ b/monoio/src/driver/util.rs @@ -38,8 +38,13 @@ macro_rules! syscall { #[cfg(windows)] #[macro_export] macro_rules! syscall { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ - unimplemented!() + ($fn: ident ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{ + let res = unsafe { $fn($($arg, )*) }; + if $err_test(&res, &$err_value) { + Err(io::Error::last_os_error()) + } else { + Ok(res) + } }}; } |
