summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorneetdai <[email protected]>2023-07-02 17:20:46 +0800
committerGitHub <[email protected]>2023-07-02 17:20:46 +0800
commitaba979c378d72380a0deb59c7b7454d2dfc0bbfa (patch)
tree806f4adddf9f58b0d33b36a41a45233b7af06ce5
parent9c3592cbb15cda6091c98736e3db9dfd6a38c9ac (diff)
feat: impl async_buf_read_ext (#181)
* feat: impl async_buf_read_ext * style: fix style warning
-rw-r--r--monoio/src/io/async_buf_read_ext.rs129
-rw-r--r--monoio/src/io/mod.rs2
-rw-r--r--monoio/src/lib.rs1
3 files changed, 132 insertions, 0 deletions
diff --git a/monoio/src/io/async_buf_read_ext.rs b/monoio/src/io/async_buf_read_ext.rs
new file mode 100644
index 0000000..20d9b91
--- /dev/null
+++ b/monoio/src/io/async_buf_read_ext.rs
@@ -0,0 +1,129 @@
+use core::slice::memchr::memchr;
+use std::{
+ future::Future,
+ io::{Error, ErrorKind, Result},
+ ops::Drop,
+ str::from_utf8,
+};
+
+use crate::io::AsyncBufRead;
+
+struct Guard<'a> {
+ buf: &'a mut Vec<u8>,
+ len: usize,
+}
+
+impl<'a> Drop for Guard<'a> {
+ fn drop(&mut self) {
+ unsafe {
+ self.buf.set_len(self.len);
+ }
+ }
+}
+
+async fn read_until<A>(r: &mut A, delim: u8, buf: &mut Vec<u8>) -> Result<usize>
+where
+ A: AsyncBufRead + ?Sized,
+{
+ let mut read = 0;
+ loop {
+ let (done, used) = {
+ let available = match r.fill_buf().await {
+ Ok(n) => n,
+ Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
+ Err(e) => return Err(e),
+ };
+
+ match memchr(delim, available) {
+ Some(i) => {
+ buf.extend_from_slice(&available[..=i]);
+ (true, i + 1)
+ }
+ None => {
+ buf.extend_from_slice(available);
+ (false, available.len())
+ }
+ }
+ };
+ r.consume(used);
+ read += used;
+ if done || used == 0 {
+ return Ok(read);
+ }
+ }
+}
+
+/// AsyncBufReadExt
+pub trait AsyncBufReadExt {
+ /// The future of read until Result<usize>
+ type ReadUntilFuture<'a>: Future<Output = Result<usize>>
+ where
+ Self: 'a;
+
+ /// This function will read bytes from the underlying stream until the delimiter or EOF is
+ /// found. Once found, all bytes up to, and including, the delimiter (if found) will be appended
+ /// to buf.
+ ///
+ /// If successful, this function will return the total number of bytes read.
+ ///
+ /// # Errors
+ /// This function will ignore all instances of ErrorKind::Interrupted and will otherwise return
+ /// any errors returned by fill_buf.
+ fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> Self::ReadUntilFuture<'a>;
+
+ /// The future of read line Result<usize>
+ type ReadLineFuture<'a>: Future<Output = Result<usize>>
+ where
+ Self: 'a;
+
+ /// This function will read bytes from the underlying stream until the newline delimiter (the
+ /// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if
+ /// found) will be appended to buf.
+ ///
+ /// If successful, this function will return the total number of bytes read.
+ ///
+ /// If this function returns Ok(0), the stream has reached EOF.
+ ///
+ /// # Errors
+ /// This function has the same error semantics as read_until and will also return an error if
+ /// the read bytes are not valid UTF-8. If an I/O error is encountered then buf may contain some
+ /// bytes already read in the event that all data read so far was valid UTF-8.
+ fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Self::ReadLineFuture<'a>;
+}
+
+impl<A> AsyncBufReadExt for A
+where
+ A: AsyncBufRead + ?Sized,
+{
+ type ReadUntilFuture<'a> = impl Future<Output = Result<usize>> + 'a where Self: 'a;
+
+ fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> Self::ReadUntilFuture<'a> {
+ read_until(self, byte, buf)
+ }
+
+ type ReadLineFuture<'a> = impl Future<Output = Result<usize>> + 'a where Self: 'a;
+
+ fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Self::ReadLineFuture<'a> {
+ async {
+ unsafe {
+ let mut g = Guard {
+ len: buf.len(),
+ buf: buf.as_mut_vec(),
+ };
+
+ let ret = read_until(self, b'\n', g.buf).await;
+ if from_utf8(&g.buf[g.len..]).is_err() {
+ ret.and_then(|_| {
+ Err(Error::new(
+ ErrorKind::InvalidData,
+ "stream did not contain valid UTF-8",
+ ))
+ })
+ } else {
+ g.len = g.buf.len();
+ ret
+ }
+ }
+ }
+ }
+}
diff --git a/monoio/src/io/mod.rs b/monoio/src/io/mod.rs
index 0b97aa3..fd9dcca 100644
--- a/monoio/src/io/mod.rs
+++ b/monoio/src/io/mod.rs
@@ -1,6 +1,7 @@
//! IO traits
mod async_buf_read;
+mod async_buf_read_ext;
mod async_read_rent;
mod async_read_rent_ext;
mod async_rent_cancelable;
@@ -16,6 +17,7 @@ pub mod as_fd;
pub mod splice;
pub use async_buf_read::AsyncBufRead;
+pub use async_buf_read_ext::AsyncBufReadExt;
pub use async_read_rent::{AsyncReadRent, AsyncReadRentAt};
pub use async_read_rent_ext::AsyncReadRentExt;
pub use async_rent_cancelable::{CancelableAsyncReadRent, CancelableAsyncWriteRent};
diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs
index f116421..9984ee9 100644
--- a/monoio/src/lib.rs
+++ b/monoio/src/lib.rs
@@ -19,6 +19,7 @@
#![feature(unboxed_closures)]
#![feature(once_cell)]
#![feature(lazy_cell)]
+#![feature(slice_internals)]
#[macro_use]
pub mod macros;