summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorihc童鞋@提不起劲 <[email protected]>2023-06-09 15:26:22 +0800
committerGitHub <[email protected]>2023-06-09 15:26:22 +0800
commit5e3a1c062b2912bc7fd08be5274388a0b83f1353 (patch)
tree89789bc1c8c9d46196eba778f0cf9ecef2f32fa3
parent3d46a81951b1cdf0fc61bb52204f750bca7ca19a (diff)
feat: support ctrlc (#176)
-rw-r--r--monoio/Cargo.toml4
-rw-r--r--monoio/src/driver/legacy/mod.rs10
-rw-r--r--monoio/src/driver/mod.rs14
-rw-r--r--monoio/src/driver/uring/mod.rs10
-rw-r--r--monoio/src/utils/ctrlc.rs64
-rw-r--r--monoio/src/utils/mod.rs5
-rw-r--r--monoio/tests/ctrlc_legacy.rs14
-rw-r--r--monoio/tests/ctrlc_uring.rs14
8 files changed, 131 insertions, 4 deletions
diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml
index 285c0ef..abc927f 100644
--- a/monoio/Cargo.toml
+++ b/monoio/Cargo.toml
@@ -32,6 +32,7 @@ tokio = { version = "1", default-features = false, optional = true }
tracing = { version = "0.1", default-features = false, features = [
"std",
], optional = true }
+ctrlc = { version = "3", optional = true }
# windows dependencies(will be added when windows support finished)
# [target.'cfg(windows)'.dependencies]
@@ -77,5 +78,8 @@ legacy = ["mio"]
iouring = []
# tokio-compatiable(only have effect when legacy is enabled and iouring is not)
tokio-compat = ["tokio"]
+# signal enables setting ctrl_c handler
+signal = ["ctrlc", "sync"]
+signal-termination = ["signal", "ctrlc/termination"]
# by default both iouring and legacy are enabled
default = ["async-cancel", "bytes", "iouring", "legacy", "macros", "utils"]
diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs
index 3bcbaac..ffcf0f5 100644
--- a/monoio/src/driver/legacy/mod.rs
+++ b/monoio/src/driver/legacy/mod.rs
@@ -279,6 +279,13 @@ impl LegacyInner {
data: Some(data),
})
}
+
+ #[cfg(feature = "sync")]
+ pub(crate) fn unpark(this: &Rc<UnsafeCell<LegacyInner>>) -> waker::UnparkHandle {
+ let inner = unsafe { &*this.get() };
+ let weak = std::sync::Arc::downgrade(&inner.shared_waker);
+ waker::UnparkHandle(weak)
+ }
}
impl Driver for LegacyDriver {
@@ -305,8 +312,7 @@ impl Driver for LegacyDriver {
#[cfg(feature = "sync")]
fn unpark(&self) -> Self::Unpark {
- let weak = unsafe { std::sync::Arc::downgrade(&((*self.inner.get()).shared_waker)) };
- waker::UnparkHandle(weak)
+ LegacyInner::unpark(&self.inner)
}
}
diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs
index 93b2f36..d509450 100644
--- a/monoio/src/driver/mod.rs
+++ b/monoio/src/driver/mod.rs
@@ -203,6 +203,7 @@ impl Inner {
pub(crate) enum UnparkHandle {
#[cfg(all(target_os = "linux", feature = "iouring"))]
Uring(self::uring::UnparkHandle),
+ #[cfg(all(unix, feature = "legacy"))]
Legacy(self::legacy::UnparkHandle),
}
@@ -238,3 +239,16 @@ impl From<self::legacy::UnparkHandle> for UnparkHandle {
Self::Legacy(inner)
}
}
+
+#[cfg(feature = "sync")]
+impl UnparkHandle {
+ #[allow(unused)]
+ pub(crate) fn current() -> Self {
+ CURRENT.with(|inner| match inner {
+ #[cfg(all(target_os = "linux", feature = "iouring"))]
+ Inner::Uring(this) => UringInner::unpark(this).into(),
+ #[cfg(all(unix, feature = "legacy"))]
+ Inner::Legacy(this) => LegacyInner::unpark(this).into(),
+ })
+ }
+}
diff --git a/monoio/src/driver/uring/mod.rs b/monoio/src/driver/uring/mod.rs
index 977a573..a9fba7d 100644
--- a/monoio/src/driver/uring/mod.rs
+++ b/monoio/src/driver/uring/mod.rs
@@ -285,8 +285,7 @@ impl Driver for IoUringDriver {
#[cfg(feature = "sync")]
fn unpark(&self) -> Self::Unpark {
- let weak = unsafe { std::sync::Arc::downgrade(&((*self.inner.get()).shared_waker)) };
- waker::UnparkHandle(weak)
+ UringInner::unpark(&self.inner)
}
}
@@ -426,6 +425,13 @@ impl UringInner {
let _ = inner.uring.submission().push(&cancel);
}
}
+
+ #[cfg(feature = "sync")]
+ pub(crate) fn unpark(this: &Rc<UnsafeCell<UringInner>>) -> waker::UnparkHandle {
+ let inner = unsafe { &*this.get() };
+ let weak = std::sync::Arc::downgrade(&inner.shared_waker);
+ waker::UnparkHandle(weak)
+ }
}
impl AsRawFd for IoUringDriver {
diff --git a/monoio/src/utils/ctrlc.rs b/monoio/src/utils/ctrlc.rs
new file mode 100644
index 0000000..db302ae
--- /dev/null
+++ b/monoio/src/utils/ctrlc.rs
@@ -0,0 +1,64 @@
+//! Forked from https://github.com/kennytm/async-ctrlc/blob/master/src/lib.rs
+
+use std::{
+ future::Future,
+ marker::PhantomData,
+ pin::Pin,
+ ptr::null_mut,
+ sync::atomic::{AtomicBool, AtomicPtr, Ordering},
+ task::{Context, Poll, Waker},
+};
+
+use ctrlc::set_handler;
+pub use ctrlc::Error;
+
+use crate::driver::{unpark::Unpark, UnparkHandle};
+
+static WAKER: AtomicPtr<Waker> = AtomicPtr::new(null_mut());
+static ACTIVE: AtomicBool = AtomicBool::new(false);
+
+/// A future which is fulfilled when the program receives the Ctrl+C signal.
+#[derive(Debug)]
+pub struct CtrlC {
+ // Make it not Send or Sync since the signal handler holds an UnparkHandle
+ // of current thread.
+ // If users want to wake other threads, they should do it with channel manually.
+ _private: PhantomData<*const ()>,
+}
+
+impl Future for CtrlC {
+ type Output = ();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if ACTIVE.swap(false, Ordering::SeqCst) {
+ Poll::Ready(())
+ } else {
+ let new_waker = Box::new(cx.waker().clone());
+ let old_waker_ptr = WAKER.swap(Box::into_raw(new_waker), Ordering::SeqCst);
+ if !old_waker_ptr.is_null() {
+ unsafe { Box::from_raw(old_waker_ptr) };
+ }
+ Poll::Pending
+ }
+ }
+}
+
+impl CtrlC {
+ /// Creates a new `CtrlC` future.
+ ///
+ /// There should be at most one `CtrlC` instance in the whole program. The
+ /// second call to `Ctrl::new()` would return an error.
+ pub fn new() -> Result<Self, Error> {
+ let unpark_handler = UnparkHandle::current();
+ set_handler(move || {
+ ACTIVE.store(true, Ordering::SeqCst);
+ let waker_ptr = WAKER.swap(null_mut(), Ordering::SeqCst);
+ if !waker_ptr.is_null() {
+ unsafe { Box::from_raw(waker_ptr) }.wake();
+ }
+ let _ = unpark_handler.unpark();
+ })?;
+ Ok(CtrlC {
+ _private: PhantomData,
+ })
+ }
+}
diff --git a/monoio/src/utils/mod.rs b/monoio/src/utils/mod.rs
index 3d60049..085bb30 100644
--- a/monoio/src/utils/mod.rs
+++ b/monoio/src/utils/mod.rs
@@ -9,6 +9,11 @@ mod rand;
pub use rand::thread_rng_n;
pub use uring_detect::detect_uring;
+#[cfg(feature = "signal")]
+mod ctrlc;
+#[cfg(feature = "signal")]
+pub use self::ctrlc::{CtrlC, Error as CtrlCError};
+
#[cfg(feature = "utils")]
mod bind_to_cpu_set;
#[cfg(feature = "utils")]
diff --git a/monoio/tests/ctrlc_legacy.rs b/monoio/tests/ctrlc_legacy.rs
new file mode 100644
index 0000000..2a0fd53
--- /dev/null
+++ b/monoio/tests/ctrlc_legacy.rs
@@ -0,0 +1,14 @@
+#[cfg(feature = "signal")]
+#[monoio::test(driver = "legacy")]
+async fn test_ctrlc_legacy() {
+ use libc::{getpid, kill, SIGINT};
+ use monoio::utils::CtrlC;
+
+ let c = CtrlC::new().unwrap();
+ std::thread::spawn(|| unsafe {
+ std::thread::sleep(std::time::Duration::from_millis(500));
+ kill(getpid(), SIGINT);
+ });
+
+ c.await;
+}
diff --git a/monoio/tests/ctrlc_uring.rs b/monoio/tests/ctrlc_uring.rs
new file mode 100644
index 0000000..602b963
--- /dev/null
+++ b/monoio/tests/ctrlc_uring.rs
@@ -0,0 +1,14 @@
+#[cfg(feature = "signal")]
+#[monoio::test(driver = "uring")]
+async fn test_ctrlc_uring() {
+ use libc::{getpid, kill, SIGINT};
+ use monoio::utils::CtrlC;
+
+ let c = CtrlC::new().unwrap();
+ std::thread::spawn(|| unsafe {
+ std::thread::sleep(std::time::Duration::from_millis(500));
+ kill(getpid(), SIGINT);
+ });
+
+ c.await;
+}