diff options
| author | ihc童鞋@提不起劲 <[email protected]> | 2023-06-22 11:48:01 +0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-06-22 11:48:01 +0800 |
| commit | 9c3592cbb15cda6091c98736e3db9dfd6a38c9ac (patch) | |
| tree | cae58fe73f75837a369c9027f1d6a08b76be1065 | |
| parent | e11113d1abb7cc08ded44b302acc84b129a758bd (diff) | |
fix: remove split which violate unsafe rule and publish 0.1.5 (#178)
| -rw-r--r-- | examples/proxy.rs | 8 | ||||
| -rw-r--r-- | monoio/Cargo.toml | 2 | ||||
| -rw-r--r-- | monoio/src/buf/vec_wrapper.rs | 2 | ||||
| -rw-r--r-- | monoio/src/io/mod.rs | 2 | ||||
| -rw-r--r-- | monoio/src/io/stream/mod.rs | 2 | ||||
| -rw-r--r-- | monoio/src/io/stream/stream_ext.rs | 3 | ||||
| -rw-r--r-- | monoio/src/io/util/copy.rs | 5 | ||||
| -rw-r--r-- | monoio/src/io/util/mod.rs | 2 | ||||
| -rw-r--r-- | monoio/src/io/util/split.rs | 204 | ||||
| -rw-r--r-- | monoio/src/net/tcp/mod.rs | 2 | ||||
| -rw-r--r-- | monoio/src/net/tcp/split.rs | 27 | ||||
| -rw-r--r-- | monoio/src/net/unix/mod.rs | 2 | ||||
| -rw-r--r-- | monoio/src/net/unix/split.rs | 26 | ||||
| -rw-r--r-- | monoio/src/utils/ctrlc.rs | 2 | ||||
| -rw-r--r-- | monoio/tests/tcp_echo.rs | 4 | ||||
| -rw-r--r-- | monoio/tests/tcp_split.rs | 4 | ||||
| -rw-r--r-- | monoio/tests/uds_split.rs | 6 | ||||
| -rw-r--r-- | monoio/tests/zero_copy.rs | 8 |
18 files changed, 32 insertions, 279 deletions
diff --git a/examples/proxy.rs b/examples/proxy.rs index 8630329..e8429d1 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -13,12 +13,12 @@ async fn main() { let listener = TcpListener::bind(LISTEN_ADDRESS) .unwrap_or_else(|_| panic!("[Server] Unable to bind to {LISTEN_ADDRESS}")); loop { - if let Ok((mut in_conn, _addr)) = listener.accept().await { + if let Ok((in_conn, _addr)) = listener.accept().await { let out_conn = TcpStream::connect(TARGET_ADDRESS).await; - if let Ok(mut out_conn) = out_conn { + if let Ok(out_conn) = out_conn { monoio::spawn(async move { - let (mut in_r, mut in_w) = in_conn.split(); - let (mut out_r, mut out_w) = out_conn.split(); + let (mut in_r, mut in_w) = in_conn.into_split(); + let (mut out_r, mut out_w) = out_conn.into_split(); let _ = monoio::join!( copy_one_direction(&mut in_r, &mut out_w), copy_one_direction(&mut out_r, &mut in_w), diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 2120dbf..44baf0e 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT/Apache-2.0" name = "monoio" readme = "README.md" repository = "https://github.com/bytedance/monoio" -version = "0.1.4" +version = "0.1.5" # common dependencies [dependencies] diff --git a/monoio/src/buf/vec_wrapper.rs b/monoio/src/buf/vec_wrapper.rs index 902864a..18e799e 100644 --- a/monoio/src/buf/vec_wrapper.rs +++ b/monoio/src/buf/vec_wrapper.rs @@ -113,7 +113,7 @@ impl IoVecMeta { return; } std::cmp::Ordering::Greater => { - unsafe { iovec.iov_base.add(amt) }; + let _ = unsafe { iovec.iov_base.add(amt) }; iovec.iov_len -= amt; self.offset = offset; return; diff --git a/monoio/src/io/mod.rs b/monoio/src/io/mod.rs index 2a6c7ba..0b97aa3 100644 --- a/monoio/src/io/mod.rs +++ b/monoio/src/io/mod.rs @@ -29,5 +29,5 @@ pub(crate) use util::operation_canceled; pub use util::zero_copy; pub use util::{ copy, BufReader, BufWriter, CancelHandle, Canceller, OwnedReadHalf, OwnedWriteHalf, - PrefixedReadIo, ReadHalf, Split, Splitable, WriteHalf, + PrefixedReadIo, Split, Splitable, }; diff --git a/monoio/src/io/stream/mod.rs b/monoio/src/io/stream/mod.rs index 334b836..7dc375b 100644 --- a/monoio/src/io/stream/mod.rs +++ b/monoio/src/io/stream/mod.rs @@ -4,7 +4,7 @@ mod iter; mod stream_ext; pub use iter::{iter, Iter}; -pub use stream_ext::StreamExt; +pub use stream_ext::{ForEachFut, StreamExt}; /// A stream of values produced asynchronously in pure async/await. #[must_use = "streams do nothing unless polled"] diff --git a/monoio/src/io/stream/stream_ext.rs b/monoio/src/io/stream/stream_ext.rs index b393f89..5e4f1bd 100644 --- a/monoio/src/io/stream/stream_ext.rs +++ b/monoio/src/io/stream/stream_ext.rs @@ -42,7 +42,8 @@ pub trait StreamExt: Stream { impl<T> StreamExt for T where T: Stream {} -type ForEachFut<T: Stream, Fut: Future<Output = ()>, F: FnMut(<T as Stream>::Item) -> Fut> = +/// A ForEachFut is a Future generated by Stream's for_each. +pub type ForEachFut<T: Stream, Fut: Future<Output = ()>, F: FnMut(<T as Stream>::Item) -> Fut> = impl Future<Output = ()>; #[must_use = "streams do nothing unless polled"] diff --git a/monoio/src/io/util/copy.rs b/monoio/src/io/util/copy.rs index f48233b..23111ec 100644 --- a/monoio/src/io/util/copy.rs +++ b/monoio/src/io/util/copy.rs @@ -4,10 +4,7 @@ use std::io; use crate::{ io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt}, - net::{ - tcp::{TcpReadHalf, TcpWriteHalf}, - unix::new_pipe, - }, + net::unix::new_pipe, }; const BUF_SIZE: usize = 4 * 1024; diff --git a/monoio/src/io/util/mod.rs b/monoio/src/io/util/mod.rs index faf330e..45c3cc3 100644 --- a/monoio/src/io/util/mod.rs +++ b/monoio/src/io/util/mod.rs @@ -15,4 +15,4 @@ pub use copy::copy; #[cfg(all(target_os = "linux", feature = "splice"))] pub use copy::zero_copy; pub use prefixed_io::PrefixedReadIo; -pub use split::{OwnedReadHalf, OwnedWriteHalf, ReadHalf, Split, Splitable, WriteHalf}; +pub use split::{OwnedReadHalf, OwnedWriteHalf, Split, Splitable}; diff --git a/monoio/src/io/util/split.rs b/monoio/src/io/util/split.rs index b0a6fba..fcf3639 100644 --- a/monoio/src/io/util/split.rs +++ b/monoio/src/io/util/split.rs @@ -3,15 +3,11 @@ use std::{ error::Error, fmt::{self, Debug}, future::Future, - io, rc::Rc, }; use super::CancelHandle; -use crate::{ - buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, - io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent}, -}; +use crate::io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent}; /// Owned Read Half Part #[derive(Debug)] @@ -22,12 +18,7 @@ pub struct OwnedReadHalf<T>(pub Rc<UnsafeCell<T>>); pub struct OwnedWriteHalf<T>(pub Rc<UnsafeCell<T>>) where T: AsyncWriteRent; -/// Borrowed Write Half Part -#[derive(Debug)] -pub struct WriteHalf<'cx, T>(pub &'cx T); -/// Borrowed Read Half Part -#[derive(Debug)] -pub struct ReadHalf<'cx, T>(pub &'cx T); + /// This is a dummy unsafe trait to inform monoio, /// the object with has this `Split` trait can be safely split /// to read/write object in both form of `Owned` or `Borrowed`. @@ -47,209 +38,22 @@ pub trait Splitable { /// Owned Write Split type OwnedWrite; - /// Borrowed Read Split - type Read<'cx> - where - Self: 'cx; - /// Borrowed Write Split - type Write<'cx> - where - Self: 'cx; - /// Split into owned parts fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite); - - /// Split into borrowed parts - fn split(&mut self) -> (Self::Read<'_>, Self::Write<'_>); -} - -impl<'t, Inner> AsyncReadRent for ReadHalf<'t, Inner> -where - Inner: AsyncReadRent, -{ - type ReadFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoBufMut + 'a, Inner: 'a; - type ReadvFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoVecBufMut + 'a, Inner: 'a; - - #[inline] - fn read<T: IoBufMut>(&mut self, buf: T) -> Self::ReadFuture<'_, T> - where - T: IoBufMut, - { - // Submit the read operation - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.read(buf) - } - - #[inline] - fn readv<T: IoVecBufMut>(&mut self, buf: T) -> Self::ReadvFuture<'_, T> { - // Submit the read operation - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.readv(buf) - } -} - -impl<'t, Inner> CancelableAsyncReadRent for ReadHalf<'t, Inner> -where - Inner: CancelableAsyncReadRent, -{ - type CancelableReadFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoBufMut + 'a, Inner: 'a; - type CancelableReadvFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoVecBufMut + 'a, Inner: 'a; - - #[inline] - fn cancelable_read<T: IoBufMut>( - &mut self, - buf: T, - c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T> - where - T: IoBufMut, - { - // Submit the read operation - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.cancelable_read(buf, c) - } - - #[inline] - fn cancelable_readv<T: IoVecBufMut>( - &mut self, - buf: T, - c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T> { - // Submit the read operation - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.cancelable_readv(buf, c) - } -} - -impl<'t, Inner> AsyncWriteRent for WriteHalf<'t, Inner> -where - Inner: AsyncWriteRent, -{ - type WriteFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoBuf + 'a, Inner: 'a; - type WritevFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoVecBuf + 'a, Inner: 'a; - type FlushFuture<'a> = impl Future<Output = io::Result<()>> + 'a where - 't: 'a, Inner: 'a; - type ShutdownFuture<'a> = impl Future<Output = io::Result<()>> + 'a where - 't: 'a, Inner: 'a; - - #[inline] - fn write<T: IoBuf>(&mut self, buf: T) -> Self::WriteFuture<'_, T> - where - T: IoBuf, - { - // Submit the write operation - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.write(buf) - } - - #[inline] - fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.writev(buf_vec) - } - - #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.flush() - } - - #[inline] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.shutdown() - } -} - -impl<'t, Inner> CancelableAsyncWriteRent for WriteHalf<'t, Inner> -where - Inner: CancelableAsyncWriteRent, -{ - type CancelableWriteFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoBuf + 'a, Inner: 'a; - type CancelableWritevFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where - 't: 'a, B: IoVecBuf + 'a, Inner: 'a; - type CancelableFlushFuture<'a> = impl Future<Output = io::Result<()>> + 'a where - 't: 'a, Inner: 'a; - type CancelableShutdownFuture<'a> = impl Future<Output = io::Result<()>> + 'a where - 't: 'a, Inner: 'a; - - #[inline] - fn cancelable_write<T: IoBuf>( - &mut self, - buf: T, - c: CancelHandle, - ) -> Self::CancelableWriteFuture<'_, T> - where - T: IoBuf, - { - // Submit the write operation - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.cancelable_write(buf, c) - } - - #[inline] - fn cancelable_writev<T: IoVecBuf>( - &mut self, - buf_vec: T, - c: CancelHandle, - ) -> Self::CancelableWritevFuture<'_, T> { - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.cancelable_writev(buf_vec, c) - } - - #[inline] - fn cancelable_flush(&mut self, c: CancelHandle) -> Self::CancelableFlushFuture<'_> { - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.cancelable_flush(c) - } - - #[inline] - fn cancelable_shutdown(&mut self, c: CancelHandle) -> Self::CancelableShutdownFuture<'_> { - #[allow(cast_ref_to_mut)] - let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) }; - raw_stream.cancelable_shutdown(c) - } } impl<T> Splitable for T where - T: Split + AsyncReadRent + AsyncWriteRent, + T: Split + AsyncWriteRent, { - type Read<'cx> = ReadHalf<'cx, T> where Self: 'cx; - - type Write<'cx> = WriteHalf<'cx, T> where Self: 'cx; - type OwnedRead = OwnedReadHalf<T>; - type OwnedWrite = OwnedWriteHalf<T>; + #[inline] fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite) { let shared = Rc::new(UnsafeCell::new(self)); (OwnedReadHalf(shared.clone()), OwnedWriteHalf(shared)) } - - #[inline] - fn split(&mut self) -> (Self::Read<'_>, Self::Write<'_>) { - (ReadHalf(&*self), WriteHalf(&*self)) - } } impl<Inner> AsyncReadRent for OwnedReadHalf<Inner> diff --git a/monoio/src/net/tcp/mod.rs b/monoio/src/net/tcp/mod.rs index 4454053..c092c28 100644 --- a/monoio/src/net/tcp/mod.rs +++ b/monoio/src/net/tcp/mod.rs @@ -7,5 +7,5 @@ mod stream; mod tfo; pub use listener::TcpListener; -pub use split::{TcpOwnedReadHalf, TcpOwnedWriteHalf, TcpReadHalf, TcpWriteHalf}; +pub use split::{TcpOwnedReadHalf, TcpOwnedWriteHalf}; pub use stream::{TcpConnectOpts, TcpStream}; diff --git a/monoio/src/net/tcp/split.rs b/monoio/src/net/tcp/split.rs index 2b9ffc7..c8465f1 100644 --- a/monoio/src/net/tcp/split.rs +++ b/monoio/src/net/tcp/split.rs @@ -3,39 +3,14 @@ use std::{io, net::SocketAddr}; use super::TcpStream; use crate::io::{ as_fd::{AsReadFd, AsWriteFd, SharedFdWrapper}, - OwnedReadHalf, OwnedWriteHalf, ReadHalf, WriteHalf, + OwnedReadHalf, OwnedWriteHalf, }; -/// ReadHalf. -pub type TcpReadHalf<'a> = ReadHalf<'a, TcpStream>; -/// WriteHalf -pub type TcpWriteHalf<'a> = WriteHalf<'a, TcpStream>; - -#[allow(cast_ref_to_mut)] -impl<'t> AsReadFd for TcpReadHalf<'t> { - #[inline] - fn as_reader_fd(&mut self) -> &SharedFdWrapper { - let raw_stream = unsafe { &mut *(self.0 as *const TcpStream as *mut TcpStream) }; - raw_stream.as_reader_fd() - } -} - -#[allow(cast_ref_to_mut)] -impl<'t> AsWriteFd for TcpWriteHalf<'t> { - #[inline] - fn as_writer_fd(&mut self) -> &SharedFdWrapper { - let raw_stream = unsafe { &mut *(self.0 as *const TcpStream as *mut TcpStream) }; - raw_stream.as_writer_fd() - } -} - /// OwnedReadHalf. pub type TcpOwnedReadHalf = OwnedReadHalf<TcpStream>; /// OwnedWriteHalf pub type TcpOwnedWriteHalf = OwnedWriteHalf<TcpStream>; -// impl Error for ReuniteError{} - impl TcpOwnedReadHalf { /// Returns the remote address that this stream is connected to. #[inline] diff --git a/monoio/src/net/unix/mod.rs b/monoio/src/net/unix/mod.rs index cfbdf40..a55a4f4 100644 --- a/monoio/src/net/unix/mod.rs +++ b/monoio/src/net/unix/mod.rs @@ -13,7 +13,7 @@ pub use datagram::UnixDatagram; pub use listener::UnixListener; pub use pipe::{new_pipe, Pipe}; pub use socket_addr::SocketAddr; -pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf, UnixReadHalf, UnixWriteHalf}; +pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf}; pub use stream::UnixStream; pub(crate) fn path_offset(sockaddr: &libc::sockaddr_un) -> usize { diff --git a/monoio/src/net/unix/split.rs b/monoio/src/net/unix/split.rs index 8b37bd7..a5503aa 100644 --- a/monoio/src/net/unix/split.rs +++ b/monoio/src/net/unix/split.rs @@ -3,33 +3,9 @@ use std::io; use super::{SocketAddr, UnixStream}; use crate::io::{ as_fd::{AsReadFd, AsWriteFd, SharedFdWrapper}, - OwnedReadHalf, OwnedWriteHalf, ReadHalf, WriteHalf, + OwnedReadHalf, OwnedWriteHalf, }; -/// ReadHalf. -pub type UnixReadHalf<'a> = ReadHalf<'a, UnixStream>; - -/// WriteHalf. -pub type UnixWriteHalf<'a> = WriteHalf<'a, UnixStream>; - -#[allow(cast_ref_to_mut)] -impl<'t> AsReadFd for UnixReadHalf<'t> { - #[inline] - fn as_reader_fd(&mut self) -> &SharedFdWrapper { - let raw_stream = unsafe { &mut *(self.0 as *const UnixStream as *mut UnixStream) }; - raw_stream.as_reader_fd() - } -} - -#[allow(cast_ref_to_mut)] -impl<'t> AsWriteFd for UnixWriteHalf<'t> { - #[inline] - fn as_writer_fd(&mut self) -> &SharedFdWrapper { - let raw_stream = unsafe { &mut *(self.0 as *const UnixStream as *mut UnixStream) }; - raw_stream.as_writer_fd() - } -} - /// OwnedReadHalf. pub type UnixOwnedReadHalf = OwnedReadHalf<UnixStream>; diff --git a/monoio/src/utils/ctrlc.rs b/monoio/src/utils/ctrlc.rs index db302ae..f72b7f2 100644 --- a/monoio/src/utils/ctrlc.rs +++ b/monoio/src/utils/ctrlc.rs @@ -35,7 +35,7 @@ impl Future for CtrlC { let new_waker = Box::new(cx.waker().clone()); let old_waker_ptr = WAKER.swap(Box::into_raw(new_waker), Ordering::SeqCst); if !old_waker_ptr.is_null() { - unsafe { Box::from_raw(old_waker_ptr) }; + let _ = unsafe { Box::from_raw(old_waker_ptr) }; } Poll::Pending } diff --git a/monoio/tests/tcp_echo.rs b/monoio/tests/tcp_echo.rs index ce6ad6d..9f1c22d 100644 --- a/monoio/tests/tcp_echo.rs +++ b/monoio/tests/tcp_echo.rs @@ -53,8 +53,8 @@ async fn echo_server() { assert!(tx.send(()).is_ok()); }); - let (mut stream, _) = srv.accept().await.unwrap(); - let (mut rd, mut wr) = stream.split(); + let (stream, _) = srv.accept().await.unwrap(); + let (mut rd, mut wr) = stream.into_split(); let n = io::copy(&mut rd, &mut wr).await.unwrap(); assert_eq!(n, (ITER * (msg.len() + iov_msg.len())) as u64); diff --git a/monoio/tests/tcp_split.rs b/monoio/tests/tcp_split.rs index c1877dc..4342b45 100644 --- a/monoio/tests/tcp_split.rs +++ b/monoio/tests/tcp_split.rs @@ -24,8 +24,8 @@ async fn split() -> Result<()> { assert_eq!(&read_buf[..read_len], MSG); }); - let mut stream = TcpStream::connect(&addr).await?; - let (mut read_half, mut write_half) = stream.split(); + let stream = TcpStream::connect(&addr).await?; + let (mut read_half, mut write_half) = stream.into_split(); let read_buf = Box::new([0u8; 32]); let (read_res, buf) = read_half.read(read_buf).await; diff --git a/monoio/tests/uds_split.rs b/monoio/tests/uds_split.rs index 5f07ae8..21f8cef 100644 --- a/monoio/tests/uds_split.rs +++ b/monoio/tests/uds_split.rs @@ -11,10 +11,10 @@ use monoio::{ /// the connection. #[monoio::test_all(entries = 1024)] async fn split() -> std::io::Result<()> { - let (mut a, mut b) = UnixStream::pair()?; + let (a, b) = UnixStream::pair()?; - let (mut a_read, mut a_write) = a.split(); - let (mut b_read, mut b_write) = b.split(); + let (mut a_read, mut a_write) = a.into_split(); + let (mut b_read, mut b_write) = b.into_split(); let (a_response, b_response) = futures::future::try_join( send_recv_all(&mut a_read, &mut a_write, b"A"), diff --git a/monoio/tests/zero_copy.rs b/monoio/tests/zero_copy.rs index a0c14db..6abed19 100644 --- a/monoio/tests/zero_copy.rs +++ b/monoio/tests/zero_copy.rs @@ -12,8 +12,8 @@ async fn zero_copy_for_tcp() { let (mut c_tx, mut c_rx) = local_sync::oneshot::channel::<()>(); let addr = srv.local_addr().unwrap(); monoio::spawn(async move { - let mut stream = TcpStream::connect(&addr).await.unwrap(); - let (mut rx, mut tx) = stream.split(); + let stream = TcpStream::connect(&addr).await.unwrap(); + let (mut rx, mut tx) = stream.into_split(); tx.write_all(MSG).await.0.unwrap(); let buf = Vec::<u8>::with_capacity(MSG.len()).slice_mut(0..MSG.len()); let (res, buf) = rx.read_exact(buf).await; @@ -46,8 +46,8 @@ async fn zero_copy_for_uds() { let srv = monoio::net::UnixListener::bind(&sock_path).unwrap(); let (mut c_tx, mut c_rx) = local_sync::oneshot::channel::<()>(); monoio::spawn(async move { - let mut stream = UnixStream::connect(&sock_path).await.unwrap(); - let (mut rx, mut tx) = stream.split(); + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (mut rx, mut tx) = stream.into_split(); tx.write_all(MSG).await.0.unwrap(); let buf = Vec::<u8>::with_capacity(MSG.len()).slice_mut(0..MSG.len()); let (res, buf) = rx.read_exact(buf).await; |
