summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorihc童鞋@提不起劲 <[email protected]>2023-06-22 11:48:01 +0800
committerGitHub <[email protected]>2023-06-22 11:48:01 +0800
commit9c3592cbb15cda6091c98736e3db9dfd6a38c9ac (patch)
treecae58fe73f75837a369c9027f1d6a08b76be1065
parente11113d1abb7cc08ded44b302acc84b129a758bd (diff)
fix: remove split which violate unsafe rule and publish 0.1.5 (#178)
-rw-r--r--examples/proxy.rs8
-rw-r--r--monoio/Cargo.toml2
-rw-r--r--monoio/src/buf/vec_wrapper.rs2
-rw-r--r--monoio/src/io/mod.rs2
-rw-r--r--monoio/src/io/stream/mod.rs2
-rw-r--r--monoio/src/io/stream/stream_ext.rs3
-rw-r--r--monoio/src/io/util/copy.rs5
-rw-r--r--monoio/src/io/util/mod.rs2
-rw-r--r--monoio/src/io/util/split.rs204
-rw-r--r--monoio/src/net/tcp/mod.rs2
-rw-r--r--monoio/src/net/tcp/split.rs27
-rw-r--r--monoio/src/net/unix/mod.rs2
-rw-r--r--monoio/src/net/unix/split.rs26
-rw-r--r--monoio/src/utils/ctrlc.rs2
-rw-r--r--monoio/tests/tcp_echo.rs4
-rw-r--r--monoio/tests/tcp_split.rs4
-rw-r--r--monoio/tests/uds_split.rs6
-rw-r--r--monoio/tests/zero_copy.rs8
18 files changed, 32 insertions, 279 deletions
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 8630329..e8429d1 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -13,12 +13,12 @@ async fn main() {
let listener = TcpListener::bind(LISTEN_ADDRESS)
.unwrap_or_else(|_| panic!("[Server] Unable to bind to {LISTEN_ADDRESS}"));
loop {
- if let Ok((mut in_conn, _addr)) = listener.accept().await {
+ if let Ok((in_conn, _addr)) = listener.accept().await {
let out_conn = TcpStream::connect(TARGET_ADDRESS).await;
- if let Ok(mut out_conn) = out_conn {
+ if let Ok(out_conn) = out_conn {
monoio::spawn(async move {
- let (mut in_r, mut in_w) = in_conn.split();
- let (mut out_r, mut out_w) = out_conn.split();
+ let (mut in_r, mut in_w) = in_conn.into_split();
+ let (mut out_r, mut out_w) = out_conn.into_split();
let _ = monoio::join!(
copy_one_direction(&mut in_r, &mut out_w),
copy_one_direction(&mut out_r, &mut in_w),
diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml
index 2120dbf..44baf0e 100644
--- a/monoio/Cargo.toml
+++ b/monoio/Cargo.toml
@@ -8,7 +8,7 @@ license = "MIT/Apache-2.0"
name = "monoio"
readme = "README.md"
repository = "https://github.com/bytedance/monoio"
-version = "0.1.4"
+version = "0.1.5"
# common dependencies
[dependencies]
diff --git a/monoio/src/buf/vec_wrapper.rs b/monoio/src/buf/vec_wrapper.rs
index 902864a..18e799e 100644
--- a/monoio/src/buf/vec_wrapper.rs
+++ b/monoio/src/buf/vec_wrapper.rs
@@ -113,7 +113,7 @@ impl IoVecMeta {
return;
}
std::cmp::Ordering::Greater => {
- unsafe { iovec.iov_base.add(amt) };
+ let _ = unsafe { iovec.iov_base.add(amt) };
iovec.iov_len -= amt;
self.offset = offset;
return;
diff --git a/monoio/src/io/mod.rs b/monoio/src/io/mod.rs
index 2a6c7ba..0b97aa3 100644
--- a/monoio/src/io/mod.rs
+++ b/monoio/src/io/mod.rs
@@ -29,5 +29,5 @@ pub(crate) use util::operation_canceled;
pub use util::zero_copy;
pub use util::{
copy, BufReader, BufWriter, CancelHandle, Canceller, OwnedReadHalf, OwnedWriteHalf,
- PrefixedReadIo, ReadHalf, Split, Splitable, WriteHalf,
+ PrefixedReadIo, Split, Splitable,
};
diff --git a/monoio/src/io/stream/mod.rs b/monoio/src/io/stream/mod.rs
index 334b836..7dc375b 100644
--- a/monoio/src/io/stream/mod.rs
+++ b/monoio/src/io/stream/mod.rs
@@ -4,7 +4,7 @@ mod iter;
mod stream_ext;
pub use iter::{iter, Iter};
-pub use stream_ext::StreamExt;
+pub use stream_ext::{ForEachFut, StreamExt};
/// A stream of values produced asynchronously in pure async/await.
#[must_use = "streams do nothing unless polled"]
diff --git a/monoio/src/io/stream/stream_ext.rs b/monoio/src/io/stream/stream_ext.rs
index b393f89..5e4f1bd 100644
--- a/monoio/src/io/stream/stream_ext.rs
+++ b/monoio/src/io/stream/stream_ext.rs
@@ -42,7 +42,8 @@ pub trait StreamExt: Stream {
impl<T> StreamExt for T where T: Stream {}
-type ForEachFut<T: Stream, Fut: Future<Output = ()>, F: FnMut(<T as Stream>::Item) -> Fut> =
+/// A ForEachFut is a Future generated by Stream's for_each.
+pub type ForEachFut<T: Stream, Fut: Future<Output = ()>, F: FnMut(<T as Stream>::Item) -> Fut> =
impl Future<Output = ()>;
#[must_use = "streams do nothing unless polled"]
diff --git a/monoio/src/io/util/copy.rs b/monoio/src/io/util/copy.rs
index f48233b..23111ec 100644
--- a/monoio/src/io/util/copy.rs
+++ b/monoio/src/io/util/copy.rs
@@ -4,10 +4,7 @@ use std::io;
use crate::{
io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt},
- net::{
- tcp::{TcpReadHalf, TcpWriteHalf},
- unix::new_pipe,
- },
+ net::unix::new_pipe,
};
const BUF_SIZE: usize = 4 * 1024;
diff --git a/monoio/src/io/util/mod.rs b/monoio/src/io/util/mod.rs
index faf330e..45c3cc3 100644
--- a/monoio/src/io/util/mod.rs
+++ b/monoio/src/io/util/mod.rs
@@ -15,4 +15,4 @@ pub use copy::copy;
#[cfg(all(target_os = "linux", feature = "splice"))]
pub use copy::zero_copy;
pub use prefixed_io::PrefixedReadIo;
-pub use split::{OwnedReadHalf, OwnedWriteHalf, ReadHalf, Split, Splitable, WriteHalf};
+pub use split::{OwnedReadHalf, OwnedWriteHalf, Split, Splitable};
diff --git a/monoio/src/io/util/split.rs b/monoio/src/io/util/split.rs
index b0a6fba..fcf3639 100644
--- a/monoio/src/io/util/split.rs
+++ b/monoio/src/io/util/split.rs
@@ -3,15 +3,11 @@ use std::{
error::Error,
fmt::{self, Debug},
future::Future,
- io,
rc::Rc,
};
use super::CancelHandle;
-use crate::{
- buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut},
- io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent},
-};
+use crate::io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent};
/// Owned Read Half Part
#[derive(Debug)]
@@ -22,12 +18,7 @@ pub struct OwnedReadHalf<T>(pub Rc<UnsafeCell<T>>);
pub struct OwnedWriteHalf<T>(pub Rc<UnsafeCell<T>>)
where
T: AsyncWriteRent;
-/// Borrowed Write Half Part
-#[derive(Debug)]
-pub struct WriteHalf<'cx, T>(pub &'cx T);
-/// Borrowed Read Half Part
-#[derive(Debug)]
-pub struct ReadHalf<'cx, T>(pub &'cx T);
+
/// This is a dummy unsafe trait to inform monoio,
/// the object with has this `Split` trait can be safely split
/// to read/write object in both form of `Owned` or `Borrowed`.
@@ -47,209 +38,22 @@ pub trait Splitable {
/// Owned Write Split
type OwnedWrite;
- /// Borrowed Read Split
- type Read<'cx>
- where
- Self: 'cx;
- /// Borrowed Write Split
- type Write<'cx>
- where
- Self: 'cx;
-
/// Split into owned parts
fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite);
-
- /// Split into borrowed parts
- fn split(&mut self) -> (Self::Read<'_>, Self::Write<'_>);
-}
-
-impl<'t, Inner> AsyncReadRent for ReadHalf<'t, Inner>
-where
- Inner: AsyncReadRent,
-{
- type ReadFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoBufMut + 'a, Inner: 'a;
- type ReadvFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoVecBufMut + 'a, Inner: 'a;
-
- #[inline]
- fn read<T: IoBufMut>(&mut self, buf: T) -> Self::ReadFuture<'_, T>
- where
- T: IoBufMut,
- {
- // Submit the read operation
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.read(buf)
- }
-
- #[inline]
- fn readv<T: IoVecBufMut>(&mut self, buf: T) -> Self::ReadvFuture<'_, T> {
- // Submit the read operation
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.readv(buf)
- }
-}
-
-impl<'t, Inner> CancelableAsyncReadRent for ReadHalf<'t, Inner>
-where
- Inner: CancelableAsyncReadRent,
-{
- type CancelableReadFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoBufMut + 'a, Inner: 'a;
- type CancelableReadvFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoVecBufMut + 'a, Inner: 'a;
-
- #[inline]
- fn cancelable_read<T: IoBufMut>(
- &mut self,
- buf: T,
- c: CancelHandle,
- ) -> Self::CancelableReadFuture<'_, T>
- where
- T: IoBufMut,
- {
- // Submit the read operation
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.cancelable_read(buf, c)
- }
-
- #[inline]
- fn cancelable_readv<T: IoVecBufMut>(
- &mut self,
- buf: T,
- c: CancelHandle,
- ) -> Self::CancelableReadvFuture<'_, T> {
- // Submit the read operation
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.cancelable_readv(buf, c)
- }
-}
-
-impl<'t, Inner> AsyncWriteRent for WriteHalf<'t, Inner>
-where
- Inner: AsyncWriteRent,
-{
- type WriteFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoBuf + 'a, Inner: 'a;
- type WritevFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoVecBuf + 'a, Inner: 'a;
- type FlushFuture<'a> = impl Future<Output = io::Result<()>> + 'a where
- 't: 'a, Inner: 'a;
- type ShutdownFuture<'a> = impl Future<Output = io::Result<()>> + 'a where
- 't: 'a, Inner: 'a;
-
- #[inline]
- fn write<T: IoBuf>(&mut self, buf: T) -> Self::WriteFuture<'_, T>
- where
- T: IoBuf,
- {
- // Submit the write operation
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.write(buf)
- }
-
- #[inline]
- fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> {
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.writev(buf_vec)
- }
-
- #[inline]
- fn flush(&mut self) -> Self::FlushFuture<'_> {
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.flush()
- }
-
- #[inline]
- fn shutdown(&mut self) -> Self::ShutdownFuture<'_> {
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.shutdown()
- }
-}
-
-impl<'t, Inner> CancelableAsyncWriteRent for WriteHalf<'t, Inner>
-where
- Inner: CancelableAsyncWriteRent,
-{
- type CancelableWriteFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoBuf + 'a, Inner: 'a;
- type CancelableWritevFuture<'a, B> = impl Future<Output = crate::BufResult<usize, B>> + 'a where
- 't: 'a, B: IoVecBuf + 'a, Inner: 'a;
- type CancelableFlushFuture<'a> = impl Future<Output = io::Result<()>> + 'a where
- 't: 'a, Inner: 'a;
- type CancelableShutdownFuture<'a> = impl Future<Output = io::Result<()>> + 'a where
- 't: 'a, Inner: 'a;
-
- #[inline]
- fn cancelable_write<T: IoBuf>(
- &mut self,
- buf: T,
- c: CancelHandle,
- ) -> Self::CancelableWriteFuture<'_, T>
- where
- T: IoBuf,
- {
- // Submit the write operation
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.cancelable_write(buf, c)
- }
-
- #[inline]
- fn cancelable_writev<T: IoVecBuf>(
- &mut self,
- buf_vec: T,
- c: CancelHandle,
- ) -> Self::CancelableWritevFuture<'_, T> {
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.cancelable_writev(buf_vec, c)
- }
-
- #[inline]
- fn cancelable_flush(&mut self, c: CancelHandle) -> Self::CancelableFlushFuture<'_> {
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.cancelable_flush(c)
- }
-
- #[inline]
- fn cancelable_shutdown(&mut self, c: CancelHandle) -> Self::CancelableShutdownFuture<'_> {
- #[allow(cast_ref_to_mut)]
- let raw_stream = unsafe { &mut *(self.0 as *const Inner as *mut Inner) };
- raw_stream.cancelable_shutdown(c)
- }
}
impl<T> Splitable for T
where
- T: Split + AsyncReadRent + AsyncWriteRent,
+ T: Split + AsyncWriteRent,
{
- type Read<'cx> = ReadHalf<'cx, T> where Self: 'cx;
-
- type Write<'cx> = WriteHalf<'cx, T> where Self: 'cx;
-
type OwnedRead = OwnedReadHalf<T>;
-
type OwnedWrite = OwnedWriteHalf<T>;
+ #[inline]
fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite) {
let shared = Rc::new(UnsafeCell::new(self));
(OwnedReadHalf(shared.clone()), OwnedWriteHalf(shared))
}
-
- #[inline]
- fn split(&mut self) -> (Self::Read<'_>, Self::Write<'_>) {
- (ReadHalf(&*self), WriteHalf(&*self))
- }
}
impl<Inner> AsyncReadRent for OwnedReadHalf<Inner>
diff --git a/monoio/src/net/tcp/mod.rs b/monoio/src/net/tcp/mod.rs
index 4454053..c092c28 100644
--- a/monoio/src/net/tcp/mod.rs
+++ b/monoio/src/net/tcp/mod.rs
@@ -7,5 +7,5 @@ mod stream;
mod tfo;
pub use listener::TcpListener;
-pub use split::{TcpOwnedReadHalf, TcpOwnedWriteHalf, TcpReadHalf, TcpWriteHalf};
+pub use split::{TcpOwnedReadHalf, TcpOwnedWriteHalf};
pub use stream::{TcpConnectOpts, TcpStream};
diff --git a/monoio/src/net/tcp/split.rs b/monoio/src/net/tcp/split.rs
index 2b9ffc7..c8465f1 100644
--- a/monoio/src/net/tcp/split.rs
+++ b/monoio/src/net/tcp/split.rs
@@ -3,39 +3,14 @@ use std::{io, net::SocketAddr};
use super::TcpStream;
use crate::io::{
as_fd::{AsReadFd, AsWriteFd, SharedFdWrapper},
- OwnedReadHalf, OwnedWriteHalf, ReadHalf, WriteHalf,
+ OwnedReadHalf, OwnedWriteHalf,
};
-/// ReadHalf.
-pub type TcpReadHalf<'a> = ReadHalf<'a, TcpStream>;
-/// WriteHalf
-pub type TcpWriteHalf<'a> = WriteHalf<'a, TcpStream>;
-
-#[allow(cast_ref_to_mut)]
-impl<'t> AsReadFd for TcpReadHalf<'t> {
- #[inline]
- fn as_reader_fd(&mut self) -> &SharedFdWrapper {
- let raw_stream = unsafe { &mut *(self.0 as *const TcpStream as *mut TcpStream) };
- raw_stream.as_reader_fd()
- }
-}
-
-#[allow(cast_ref_to_mut)]
-impl<'t> AsWriteFd for TcpWriteHalf<'t> {
- #[inline]
- fn as_writer_fd(&mut self) -> &SharedFdWrapper {
- let raw_stream = unsafe { &mut *(self.0 as *const TcpStream as *mut TcpStream) };
- raw_stream.as_writer_fd()
- }
-}
-
/// OwnedReadHalf.
pub type TcpOwnedReadHalf = OwnedReadHalf<TcpStream>;
/// OwnedWriteHalf
pub type TcpOwnedWriteHalf = OwnedWriteHalf<TcpStream>;
-// impl Error for ReuniteError{}
-
impl TcpOwnedReadHalf {
/// Returns the remote address that this stream is connected to.
#[inline]
diff --git a/monoio/src/net/unix/mod.rs b/monoio/src/net/unix/mod.rs
index cfbdf40..a55a4f4 100644
--- a/monoio/src/net/unix/mod.rs
+++ b/monoio/src/net/unix/mod.rs
@@ -13,7 +13,7 @@ pub use datagram::UnixDatagram;
pub use listener::UnixListener;
pub use pipe::{new_pipe, Pipe};
pub use socket_addr::SocketAddr;
-pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf, UnixReadHalf, UnixWriteHalf};
+pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf};
pub use stream::UnixStream;
pub(crate) fn path_offset(sockaddr: &libc::sockaddr_un) -> usize {
diff --git a/monoio/src/net/unix/split.rs b/monoio/src/net/unix/split.rs
index 8b37bd7..a5503aa 100644
--- a/monoio/src/net/unix/split.rs
+++ b/monoio/src/net/unix/split.rs
@@ -3,33 +3,9 @@ use std::io;
use super::{SocketAddr, UnixStream};
use crate::io::{
as_fd::{AsReadFd, AsWriteFd, SharedFdWrapper},
- OwnedReadHalf, OwnedWriteHalf, ReadHalf, WriteHalf,
+ OwnedReadHalf, OwnedWriteHalf,
};
-/// ReadHalf.
-pub type UnixReadHalf<'a> = ReadHalf<'a, UnixStream>;
-
-/// WriteHalf.
-pub type UnixWriteHalf<'a> = WriteHalf<'a, UnixStream>;
-
-#[allow(cast_ref_to_mut)]
-impl<'t> AsReadFd for UnixReadHalf<'t> {
- #[inline]
- fn as_reader_fd(&mut self) -> &SharedFdWrapper {
- let raw_stream = unsafe { &mut *(self.0 as *const UnixStream as *mut UnixStream) };
- raw_stream.as_reader_fd()
- }
-}
-
-#[allow(cast_ref_to_mut)]
-impl<'t> AsWriteFd for UnixWriteHalf<'t> {
- #[inline]
- fn as_writer_fd(&mut self) -> &SharedFdWrapper {
- let raw_stream = unsafe { &mut *(self.0 as *const UnixStream as *mut UnixStream) };
- raw_stream.as_writer_fd()
- }
-}
-
/// OwnedReadHalf.
pub type UnixOwnedReadHalf = OwnedReadHalf<UnixStream>;
diff --git a/monoio/src/utils/ctrlc.rs b/monoio/src/utils/ctrlc.rs
index db302ae..f72b7f2 100644
--- a/monoio/src/utils/ctrlc.rs
+++ b/monoio/src/utils/ctrlc.rs
@@ -35,7 +35,7 @@ impl Future for CtrlC {
let new_waker = Box::new(cx.waker().clone());
let old_waker_ptr = WAKER.swap(Box::into_raw(new_waker), Ordering::SeqCst);
if !old_waker_ptr.is_null() {
- unsafe { Box::from_raw(old_waker_ptr) };
+ let _ = unsafe { Box::from_raw(old_waker_ptr) };
}
Poll::Pending
}
diff --git a/monoio/tests/tcp_echo.rs b/monoio/tests/tcp_echo.rs
index ce6ad6d..9f1c22d 100644
--- a/monoio/tests/tcp_echo.rs
+++ b/monoio/tests/tcp_echo.rs
@@ -53,8 +53,8 @@ async fn echo_server() {
assert!(tx.send(()).is_ok());
});
- let (mut stream, _) = srv.accept().await.unwrap();
- let (mut rd, mut wr) = stream.split();
+ let (stream, _) = srv.accept().await.unwrap();
+ let (mut rd, mut wr) = stream.into_split();
let n = io::copy(&mut rd, &mut wr).await.unwrap();
assert_eq!(n, (ITER * (msg.len() + iov_msg.len())) as u64);
diff --git a/monoio/tests/tcp_split.rs b/monoio/tests/tcp_split.rs
index c1877dc..4342b45 100644
--- a/monoio/tests/tcp_split.rs
+++ b/monoio/tests/tcp_split.rs
@@ -24,8 +24,8 @@ async fn split() -> Result<()> {
assert_eq!(&read_buf[..read_len], MSG);
});
- let mut stream = TcpStream::connect(&addr).await?;
- let (mut read_half, mut write_half) = stream.split();
+ let stream = TcpStream::connect(&addr).await?;
+ let (mut read_half, mut write_half) = stream.into_split();
let read_buf = Box::new([0u8; 32]);
let (read_res, buf) = read_half.read(read_buf).await;
diff --git a/monoio/tests/uds_split.rs b/monoio/tests/uds_split.rs
index 5f07ae8..21f8cef 100644
--- a/monoio/tests/uds_split.rs
+++ b/monoio/tests/uds_split.rs
@@ -11,10 +11,10 @@ use monoio::{
/// the connection.
#[monoio::test_all(entries = 1024)]
async fn split() -> std::io::Result<()> {
- let (mut a, mut b) = UnixStream::pair()?;
+ let (a, b) = UnixStream::pair()?;
- let (mut a_read, mut a_write) = a.split();
- let (mut b_read, mut b_write) = b.split();
+ let (mut a_read, mut a_write) = a.into_split();
+ let (mut b_read, mut b_write) = b.into_split();
let (a_response, b_response) = futures::future::try_join(
send_recv_all(&mut a_read, &mut a_write, b"A"),
diff --git a/monoio/tests/zero_copy.rs b/monoio/tests/zero_copy.rs
index a0c14db..6abed19 100644
--- a/monoio/tests/zero_copy.rs
+++ b/monoio/tests/zero_copy.rs
@@ -12,8 +12,8 @@ async fn zero_copy_for_tcp() {
let (mut c_tx, mut c_rx) = local_sync::oneshot::channel::<()>();
let addr = srv.local_addr().unwrap();
monoio::spawn(async move {
- let mut stream = TcpStream::connect(&addr).await.unwrap();
- let (mut rx, mut tx) = stream.split();
+ let stream = TcpStream::connect(&addr).await.unwrap();
+ let (mut rx, mut tx) = stream.into_split();
tx.write_all(MSG).await.0.unwrap();
let buf = Vec::<u8>::with_capacity(MSG.len()).slice_mut(0..MSG.len());
let (res, buf) = rx.read_exact(buf).await;
@@ -46,8 +46,8 @@ async fn zero_copy_for_uds() {
let srv = monoio::net::UnixListener::bind(&sock_path).unwrap();
let (mut c_tx, mut c_rx) = local_sync::oneshot::channel::<()>();
monoio::spawn(async move {
- let mut stream = UnixStream::connect(&sock_path).await.unwrap();
- let (mut rx, mut tx) = stream.split();
+ let stream = UnixStream::connect(&sock_path).await.unwrap();
+ let (mut rx, mut tx) = stream.into_split();
tx.write_all(MSG).await.0.unwrap();
let buf = Vec::<u8>::with_capacity(MSG.len()).slice_mut(0..MSG.len());
let (res, buf) = rx.read_exact(buf).await;