diff options
| author | ihc童鞋@提不起劲 <[email protected]> | 2023-06-09 15:26:22 +0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-06-09 15:26:22 +0800 |
| commit | 5e3a1c062b2912bc7fd08be5274388a0b83f1353 (patch) | |
| tree | 89789bc1c8c9d46196eba778f0cf9ecef2f32fa3 | |
| parent | 3d46a81951b1cdf0fc61bb52204f750bca7ca19a (diff) | |
feat: support ctrlc (#176)
| -rw-r--r-- | monoio/Cargo.toml | 4 | ||||
| -rw-r--r-- | monoio/src/driver/legacy/mod.rs | 10 | ||||
| -rw-r--r-- | monoio/src/driver/mod.rs | 14 | ||||
| -rw-r--r-- | monoio/src/driver/uring/mod.rs | 10 | ||||
| -rw-r--r-- | monoio/src/utils/ctrlc.rs | 64 | ||||
| -rw-r--r-- | monoio/src/utils/mod.rs | 5 | ||||
| -rw-r--r-- | monoio/tests/ctrlc_legacy.rs | 14 | ||||
| -rw-r--r-- | monoio/tests/ctrlc_uring.rs | 14 |
8 files changed, 131 insertions, 4 deletions
diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 285c0ef..abc927f 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -32,6 +32,7 @@ tokio = { version = "1", default-features = false, optional = true } tracing = { version = "0.1", default-features = false, features = [ "std", ], optional = true } +ctrlc = { version = "3", optional = true } # windows dependencies(will be added when windows support finished) # [target.'cfg(windows)'.dependencies] @@ -77,5 +78,8 @@ legacy = ["mio"] iouring = [] # tokio-compatiable(only have effect when legacy is enabled and iouring is not) tokio-compat = ["tokio"] +# signal enables setting ctrl_c handler +signal = ["ctrlc", "sync"] +signal-termination = ["signal", "ctrlc/termination"] # by default both iouring and legacy are enabled default = ["async-cancel", "bytes", "iouring", "legacy", "macros", "utils"] diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 3bcbaac..ffcf0f5 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -279,6 +279,13 @@ impl LegacyInner { data: Some(data), }) } + + #[cfg(feature = "sync")] + pub(crate) fn unpark(this: &Rc<UnsafeCell<LegacyInner>>) -> waker::UnparkHandle { + let inner = unsafe { &*this.get() }; + let weak = std::sync::Arc::downgrade(&inner.shared_waker); + waker::UnparkHandle(weak) + } } impl Driver for LegacyDriver { @@ -305,8 +312,7 @@ impl Driver for LegacyDriver { #[cfg(feature = "sync")] fn unpark(&self) -> Self::Unpark { - let weak = unsafe { std::sync::Arc::downgrade(&((*self.inner.get()).shared_waker)) }; - waker::UnparkHandle(weak) + LegacyInner::unpark(&self.inner) } } diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index 93b2f36..d509450 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -203,6 +203,7 @@ impl Inner { pub(crate) enum UnparkHandle { #[cfg(all(target_os = "linux", feature = "iouring"))] Uring(self::uring::UnparkHandle), + #[cfg(all(unix, feature = "legacy"))] Legacy(self::legacy::UnparkHandle), } @@ -238,3 +239,16 @@ impl From<self::legacy::UnparkHandle> for UnparkHandle { Self::Legacy(inner) } } + +#[cfg(feature = "sync")] +impl UnparkHandle { + #[allow(unused)] + pub(crate) fn current() -> Self { + CURRENT.with(|inner| match inner { + #[cfg(all(target_os = "linux", feature = "iouring"))] + Inner::Uring(this) => UringInner::unpark(this).into(), + #[cfg(all(unix, feature = "legacy"))] + Inner::Legacy(this) => LegacyInner::unpark(this).into(), + }) + } +} diff --git a/monoio/src/driver/uring/mod.rs b/monoio/src/driver/uring/mod.rs index 977a573..a9fba7d 100644 --- a/monoio/src/driver/uring/mod.rs +++ b/monoio/src/driver/uring/mod.rs @@ -285,8 +285,7 @@ impl Driver for IoUringDriver { #[cfg(feature = "sync")] fn unpark(&self) -> Self::Unpark { - let weak = unsafe { std::sync::Arc::downgrade(&((*self.inner.get()).shared_waker)) }; - waker::UnparkHandle(weak) + UringInner::unpark(&self.inner) } } @@ -426,6 +425,13 @@ impl UringInner { let _ = inner.uring.submission().push(&cancel); } } + + #[cfg(feature = "sync")] + pub(crate) fn unpark(this: &Rc<UnsafeCell<UringInner>>) -> waker::UnparkHandle { + let inner = unsafe { &*this.get() }; + let weak = std::sync::Arc::downgrade(&inner.shared_waker); + waker::UnparkHandle(weak) + } } impl AsRawFd for IoUringDriver { diff --git a/monoio/src/utils/ctrlc.rs b/monoio/src/utils/ctrlc.rs new file mode 100644 index 0000000..db302ae --- /dev/null +++ b/monoio/src/utils/ctrlc.rs @@ -0,0 +1,64 @@ +//! Forked from https://github.com/kennytm/async-ctrlc/blob/master/src/lib.rs + +use std::{ + future::Future, + marker::PhantomData, + pin::Pin, + ptr::null_mut, + sync::atomic::{AtomicBool, AtomicPtr, Ordering}, + task::{Context, Poll, Waker}, +}; + +use ctrlc::set_handler; +pub use ctrlc::Error; + +use crate::driver::{unpark::Unpark, UnparkHandle}; + +static WAKER: AtomicPtr<Waker> = AtomicPtr::new(null_mut()); +static ACTIVE: AtomicBool = AtomicBool::new(false); + +/// A future which is fulfilled when the program receives the Ctrl+C signal. +#[derive(Debug)] +pub struct CtrlC { + // Make it not Send or Sync since the signal handler holds an UnparkHandle + // of current thread. + // If users want to wake other threads, they should do it with channel manually. + _private: PhantomData<*const ()>, +} + +impl Future for CtrlC { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if ACTIVE.swap(false, Ordering::SeqCst) { + Poll::Ready(()) + } else { + let new_waker = Box::new(cx.waker().clone()); + let old_waker_ptr = WAKER.swap(Box::into_raw(new_waker), Ordering::SeqCst); + if !old_waker_ptr.is_null() { + unsafe { Box::from_raw(old_waker_ptr) }; + } + Poll::Pending + } + } +} + +impl CtrlC { + /// Creates a new `CtrlC` future. + /// + /// There should be at most one `CtrlC` instance in the whole program. The + /// second call to `Ctrl::new()` would return an error. + pub fn new() -> Result<Self, Error> { + let unpark_handler = UnparkHandle::current(); + set_handler(move || { + ACTIVE.store(true, Ordering::SeqCst); + let waker_ptr = WAKER.swap(null_mut(), Ordering::SeqCst); + if !waker_ptr.is_null() { + unsafe { Box::from_raw(waker_ptr) }.wake(); + } + let _ = unpark_handler.unpark(); + })?; + Ok(CtrlC { + _private: PhantomData, + }) + } +} diff --git a/monoio/src/utils/mod.rs b/monoio/src/utils/mod.rs index 3d60049..085bb30 100644 --- a/monoio/src/utils/mod.rs +++ b/monoio/src/utils/mod.rs @@ -9,6 +9,11 @@ mod rand; pub use rand::thread_rng_n; pub use uring_detect::detect_uring; +#[cfg(feature = "signal")] +mod ctrlc; +#[cfg(feature = "signal")] +pub use self::ctrlc::{CtrlC, Error as CtrlCError}; + #[cfg(feature = "utils")] mod bind_to_cpu_set; #[cfg(feature = "utils")] diff --git a/monoio/tests/ctrlc_legacy.rs b/monoio/tests/ctrlc_legacy.rs new file mode 100644 index 0000000..2a0fd53 --- /dev/null +++ b/monoio/tests/ctrlc_legacy.rs @@ -0,0 +1,14 @@ +#[cfg(feature = "signal")] +#[monoio::test(driver = "legacy")] +async fn test_ctrlc_legacy() { + use libc::{getpid, kill, SIGINT}; + use monoio::utils::CtrlC; + + let c = CtrlC::new().unwrap(); + std::thread::spawn(|| unsafe { + std::thread::sleep(std::time::Duration::from_millis(500)); + kill(getpid(), SIGINT); + }); + + c.await; +} diff --git a/monoio/tests/ctrlc_uring.rs b/monoio/tests/ctrlc_uring.rs new file mode 100644 index 0000000..602b963 --- /dev/null +++ b/monoio/tests/ctrlc_uring.rs @@ -0,0 +1,14 @@ +#[cfg(feature = "signal")] +#[monoio::test(driver = "uring")] +async fn test_ctrlc_uring() { + use libc::{getpid, kill, SIGINT}; + use monoio::utils::CtrlC; + + let c = CtrlC::new().unwrap(); + std::thread::spawn(|| unsafe { + std::thread::sleep(std::time::Duration::from_millis(500)); + kill(getpid(), SIGINT); + }); + + c.await; +} |
