summaryrefslogtreecommitdiff
path: root/bindings/rs-timeout/src/timeout.rs
diff options
context:
space:
mode:
Diffstat (limited to 'bindings/rs-timeout/src/timeout.rs')
-rw-r--r--bindings/rs-timeout/src/timeout.rs634
1 files changed, 634 insertions, 0 deletions
diff --git a/bindings/rs-timeout/src/timeout.rs b/bindings/rs-timeout/src/timeout.rs
new file mode 100644
index 0000000..d3382d4
--- /dev/null
+++ b/bindings/rs-timeout/src/timeout.rs
@@ -0,0 +1,634 @@
+use std::{panic, ptr::NonNull};
+
+use libc::c_void;
+
+use crate::timeout_bind::*;
+
+/// timeout type:
+/// - INT: Periodic timeout, start time only supports relative time.
+/// - ABS: Executed only once, start time is absolute time.
+/// - Default: Executed only once, start time is relative time
+/// ToType could use as i32
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum ToType {
+ INT = TIMEOUT_INT as isize, // periodic timeout, relative time
+ ABS = TIMEOUT_ABS as isize, // onetime timeout, absolute time
+ Default = TIMEOUT_DEFAULT as isize, // onetime timeout, relative time
+}
+// as i32
+impl From<ToType> for i32 {
+ fn from(timeout_type: ToType) -> Self {
+ timeout_type as i32
+ }
+}
+
+impl ToType {
+ // i32 -> ToType
+ fn new(flag: i32) -> Self {
+ match flag {
+ TIMEOUT_INT => ToType::INT,
+ TIMEOUT_ABS => ToType::ABS,
+ _ => ToType::Default,
+ }
+ }
+}
+
+#[repr(C)]
+#[derive(Debug)]
+pub struct TimeoutCallBack<'a, T> {
+ pub fn_ptr: Option<extern "C" fn(arg: &mut T)>,
+ pub arg: &'a mut T,
+}
+
+/// TimeoutCallBack<T> -> timeout_cb
+impl<T> From<TimeoutCallBack<'_, T>> for timeout_cb {
+ fn from(timeout_cb: TimeoutCallBack<T>) -> Self {
+ timeout_cb {
+ fn_ptr: timeout_cb.fn_ptr.map(|f| unsafe { std::mem::transmute(f) }),
+ arg: timeout_cb.arg as *mut T as *mut c_void,
+ }
+ }
+}
+
+// type TimeoutCallBack = timeout_cb; // callback
+impl<'a, T> TimeoutCallBack<'a, T> {
+ pub fn new(callback: extern "C" fn(arg: &mut T), arg: &'a mut T) -> Self {
+ let fn_ptr = Some(callback);
+ TimeoutCallBack { fn_ptr, arg }
+ }
+}
+
+impl timeout_cb {
+ fn call(&self) -> bool {
+ if let Some(callback) = self.fn_ptr {
+ let result = panic::catch_unwind(|| unsafe {
+ callback(self.arg);
+ });
+ if result.is_ok() {
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
+/// on C side: callback function is hook<F>, arg is closure.
+/// run callback is run hook<F>, make hook like a exec for closure.
+unsafe extern "C" fn hook<F>(arg: *mut c_void)
+where
+ F: FnMut(),
+{
+ let closure = &mut *(arg as *mut F);
+ closure();
+}
+
+/// return callback exec function
+/// without the actual effect, only the compiler associated HOOK and F
+pub(crate) fn get_cb_exec<F>(_closure: &F) -> unsafe extern "C" fn(arg: *mut c_void)
+where
+ F: FnMut(), // closure
+{
+ hook::<F>
+}
+
+type Timeout = timeout;
+
+impl Timeout {
+ pub fn new(flags: ToType) -> Result<Timeout, &'static str> {
+ // timeout instantiated in rust rather than C side
+ let raw = Box::into_raw(Box::new(timeout::default()));
+ // Box::into_raw let instance ownership transfer to C
+ let raw = unsafe { timeout_init(raw, flags as i32) };
+ if raw.is_null() {
+ return Err("Failed to create Timeout");
+ }
+ // Box::from_raw: instance ownership from C side to rust.
+ Ok(*unsafe { Box::from_raw(raw) })
+ }
+ /// transfer *timeout to Timeout
+ /// instance ownership form C side to rust
+ pub(crate) fn transfer(to: *mut timeout) -> Timeout {
+ *unsafe { Box::from_raw(to) }
+ }
+ /// set callback
+ pub fn set_cb<T>(&mut self, cb: TimeoutCallBack<T>) {
+ self.callback = cb.into();
+ }
+
+ /// set callback by timeout_cb
+ /// for test
+ pub(crate) fn set_cb_raw(&mut self, cb: timeout_cb) {
+ self.callback = cb.into();
+ }
+
+ /// set closure
+ pub fn set_cb_closure<F>(&mut self, closure: &mut F)
+ where
+ F: FnMut(),
+ {
+ // let (cb_exec, mut cb) = make_cb!(closure);
+ let cb_exec = get_cb_exec(closure);
+ let exec_ptr = Box::into_raw(Box::new(cb_exec));
+
+ let to_cb = timeout_cb {
+ fn_ptr: Some(unsafe { *exec_ptr }),
+ arg: closure as *mut _ as *mut c_void,
+ };
+ self.callback = to_cb;
+ }
+
+ /// return true if timeout is registered and on timing wheel
+ pub fn is_pending(&self) -> bool {
+ //C code: return to->pending && to->pending != &to->timeouts->expired;
+ if !self.pending.is_null() // pending not null
+ && !self.timeouts.is_null() // timeouts not null
+ && self.pending != (unsafe { &mut (*self.timeouts).expired // pending not expired
+ })
+ {
+ return true;
+ }
+ false
+ }
+
+ /// return true if timeout is registered and on expired queue
+ pub fn is_expired(&self) -> bool {
+ //return to->pending && to->pending == &to->timeouts->expired;
+ if !self.pending.is_null() // pending not null
+ && !self.timeouts.is_null() // timeouts not null
+ && self.pending == (unsafe { &mut (*self.timeouts).expired // pending is expired
+ }) {
+ return true;
+ }
+ false
+ }
+ /// return true if timeout is periodic
+ pub fn is_periodic(&self) -> bool {
+ // if ((to->flags & TIMEOUT_INT) && to->interval > 0)
+ return (self.flags & TIMEOUT_INT != 0) && (self.interval > 0);
+ }
+
+ /// remove timeout from any timing wheel or expired queue (okay if on neither)
+ pub fn delete(&mut self) {
+ unsafe { timeout_del(self) };
+ }
+
+ /// return true if cb is not null and callback has executed
+ pub fn run_cb(&self) -> bool {
+ let cb = self.callback;
+ return cb.call();
+ }
+}
+
+impl Drop for Timeout {
+ fn drop(&mut self) {
+ self.delete(); // delete
+ }
+}
+
+impl Timeout {
+ /// create periodic timeout
+ pub fn new_int() -> Result<Timeout, &'static str> {
+ Timeout::new(ToType::INT)
+ }
+ // create onetime timeout with absolute time
+ pub fn new_abs() -> Result<Timeout, &'static str> {
+ Timeout::new(ToType::ABS)
+ }
+ /// create onetime timeout with relative time
+ pub fn new_default() -> Result<Timeout, &'static str> {
+ Timeout::new(ToType::Default)
+ }
+}
+
+/// delete periodic timeout
+/// No ownership of the Timeout object is passed in
+pub fn delete_to_periodic(to: &Timeout) {
+ if to.is_periodic() {
+ unsafe { timeout_del(to as *const _ as *mut timeout) };
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum TomItType {
+ PENDING = TIMEOUTS_PENDING as isize,
+ EXPIRED = TIMEOUTS_EXPIRED as isize,
+ ALL = TIMEOUTS_ALL as isize,
+ // CLEAR = TIMEOUTS_CLEAR as isize,
+}
+// as i32
+impl From<TomItType> for i32 {
+ fn from(flag: TomItType) -> Self {
+ flag as i32
+ }
+}
+
+impl TomItType {
+ pub fn new(flag: i32) -> Self {
+ match flag {
+ TIMEOUTS_PENDING => TomItType::PENDING,
+ TIMEOUTS_EXPIRED => TomItType::EXPIRED,
+ TIMEOUTS_ALL => TomItType::ALL,
+ // TIMEOUTS_CLEAR => TimeoutSItFlag::CLEAR, // CLEAR means clear all expired timeout on expire queue.
+ // this creates complications in ownership
+ _ => TomItType::ALL,
+ }
+ }
+}
+
+type TomIt = timeouts_it;
+
+impl TomIt {
+ /// flag has 3 value: PENDING, EXPIRED, ALL
+ fn new(flags: TomItType) -> TomIt {
+ let mut instance = timeouts_it::default();
+ TIMEOUTS_IT_INIT(&mut instance, flags as i32);
+ instance
+ }
+}
+
+/// expired timeout return type
+pub enum ToR {
+ OneTime(Timeout), // instance ownership from C side to rust
+ Periodic(&'static Timeout), // instance ownership still on C side
+}
+
+// TimeoutManager
+pub struct TimeoutManager {
+ tos: NonNull<timeouts>,
+}
+
+impl TimeoutManager {
+ /// TIMEOUT_mHZ 1000; TIMEOUT_uHZ 1000000; TIMEOUT_nHZ 1000000000;
+ /// if hz_set = 0, default hz_set = TIMEOUT_mHZ
+ pub fn new(hz_set: timeout_t) -> Result<TimeoutManager, &'static str> {
+ let mut err = 0 as usize;
+ // if hz_set = 0, default set to TIMEOUT_mHZ (timeouts_open)
+ let tos = unsafe { timeouts_open(hz_set, &mut err) };
+ if err != 0 {
+ return Err("Failed to create timeout manager, null");
+ }
+ let tos = NonNull::new(tos).ok_or("Failed to create timeout manager, null")?;
+ Ok(TimeoutManager { tos })
+ }
+
+ // get raw pointer
+ fn get_raw(&self) -> *const timeouts {
+ self.tos.as_ptr()
+ }
+
+ // get raw mut pointer
+ fn get_raw_mut(&self) -> *mut timeouts {
+ self.tos.as_ptr()
+ }
+
+ // close
+ pub fn close(&mut self) {
+ unsafe {
+ timeouts_close(self.get_raw_mut());
+ }
+ }
+
+ fn update_time(&mut self, time: timeout_t, timeout_type: ToType) {
+ match timeout_type {
+ ToType::INT => unsafe { timeouts_step(self.get_raw_mut(), time) },
+ ToType::ABS => unsafe { timeouts_update(self.get_raw_mut(), time) },
+ ToType::Default => unsafe { timeouts_step(self.get_raw_mut(), time) },
+ }
+ }
+ /// update time: relative time
+ pub fn update_time_rel(&mut self, time: timeout_t) {
+ self.update_time(time, ToType::Default);
+ }
+ /// update time: absolute time
+ pub fn update_time_abs(&mut self, current_time: timeout_t) {
+ self.update_time(current_time, ToType::ABS);
+ }
+ /// get tos hz
+ pub fn get_hz(&self) -> timeout_t {
+ unsafe { timeouts_hz(self.get_raw()) }
+ }
+ /// return interval to next required update.
+ ///
+ /// careful for two case:
+ /// - return value could be u64::MAX, it means no timeout on timing wheel
+ /// - return value will always be less than the next most recent timeout expiration time
+ pub fn get_next_wait_time(&self) -> timeout_t {
+ unsafe { timeouts_timeout(self.get_raw_mut()) }
+ }
+ /// return true if any timeouts pending on timing wheel
+ pub fn any_pending(&self) -> bool {
+ unsafe { timeouts_pending(self.get_raw()) }
+ }
+ /// return true if any timeouts on expired queue
+ pub fn any_expired(&self) -> bool {
+ unsafe { timeouts_expired(self.get_raw()) }
+ }
+ // return true if TimeoutManager is effective
+ pub fn check(&self) -> bool {
+ unsafe { timeouts_check(self.get_raw_mut(), stderr) }
+ }
+}
+
+impl TimeoutManager {
+ /// add Timeout to timing wheel
+ /// Timeout type:
+ /// - INT: first expired on now + timeout, then expired at now + timeout + timeout + ...
+ /// even if it's expired,TimeoutManger will not auto renew it. must consume it use expired_timeouts.
+ /// - ABS: expired on timeout, then expired only once.
+ /// - Default: first expired on now + timeout, then expired only once.
+ /// Pass in ownership of the Timeout object
+ pub fn add(&mut self, to: Timeout, timeout: timeout_t) {
+ let to_ptr = Box::into_raw(Box::new(to));
+ unsafe { timeouts_add(self.get_raw_mut(), to_ptr, timeout) };
+ }
+ /// remove Timeout from any timing wheel or expired queue (okay if on neither)
+ /// No ownership of the Timeout object is passed in
+ pub fn delete(&mut self, to: &mut Timeout) {
+ unsafe { timeouts_del(self.get_raw_mut(), to) };
+ }
+
+ /// consume expired timeout
+ /// return next expired timeout until the queue is empty, then returns NULL.
+ /// ToR::OneTime: Ownership of timeout objects moved from C to rust
+ /// all pending/expired flag has cleared.
+ /// ToR::Periodic: Ownership still on C side
+ pub(crate) fn expired_timeout<'a>(&'a mut self) -> Option<ToR> {
+ let to_ptr: *mut timeout = unsafe { timeouts_get(self.get_raw_mut()) };
+ if to_ptr.is_null() {
+ return None;
+ }
+ if unsafe { (*to_ptr).is_periodic() } {
+ return Some(ToR::Periodic(unsafe { &*to_ptr }));
+ }
+ return Some(ToR::OneTime(Timeout::transfer(to_ptr)));
+ }
+ /// return next expired timeout iterator
+ /// ToR::OneTime: Ownership of timeout objects moved from C to rust
+ /// all pending/expired flag has cleared.
+ /// ToR::Periodic: Ownership still on C side
+ pub fn expired_timeouts(&mut self) -> ExpireIter {
+ ExpireIter {
+ timeout_manager: self,
+ }
+ }
+ ///
+ /// return next quote of timeout as timeout_sit requested, or NULL if none
+ /// No ownership of the Timeout object is passed in from C to rust
+ /// No consume any timeout
+ pub(crate) fn next_timeout<'a>(&mut self, tos_it: &mut TomIt) -> Option<&'a Timeout> {
+ let to_ptr: *mut timeout = unsafe { timeouts_next(self.get_raw_mut(), tos_it) };
+ if to_ptr.is_null() {
+ return None;
+ }
+ return Some(unsafe { &*to_ptr });
+ }
+ /// return next timeout quote iterator(as requested by tos_it_type)
+ pub fn next_timeouts(&mut self, tos_it_type: TomItType) -> NextIter {
+ let tos_it = TomIt::new(tos_it_type);
+ NextIter {
+ timeout_manager: self,
+ timeout_sit: tos_it,
+ }
+ }
+}
+
+impl Drop for TimeoutManager {
+ fn drop(&mut self) {
+ self.close();
+ }
+}
+
+impl TimeoutManager {
+ pub fn next_timeouts_pending(&mut self) -> NextIter {
+ self.next_timeouts(TomItType::PENDING)
+ }
+ pub fn next_timeouts_expired(&mut self) -> NextIter {
+ self.next_timeouts(TomItType::EXPIRED)
+ }
+ pub fn next_timeouts_all(&mut self) -> NextIter {
+ self.next_timeouts(TomItType::ALL)
+ }
+}
+
+pub struct ExpireIter<'a> {
+ timeout_manager: &'a mut TimeoutManager,
+}
+
+impl<'a> Iterator for ExpireIter<'a> {
+ type Item = ToR;
+ fn next(&mut self) -> Option<Self::Item> {
+ self.timeout_manager.expired_timeout()
+ }
+}
+
+pub struct NextIter<'a> {
+ timeout_manager: &'a mut TimeoutManager,
+ timeout_sit: TomIt,
+}
+
+impl<'a> Iterator for NextIter<'a> {
+ type Item = &'a Timeout;
+ fn next(&mut self) -> Option<Self::Item> {
+ self.timeout_manager.next_timeout(&mut self.timeout_sit)
+ }
+}
+
+#[cfg(test)]
+#[allow(unused_variables)]
+mod tests {
+ use std::{cell::RefCell, rc::Rc};
+
+ use super::*;
+
+ #[test]
+ fn test_timeout_type() {
+ let int_type = ToType::INT;
+ let abs_type = ToType::ABS;
+ let default_type = ToType::Default;
+
+ assert_eq!(i32::from(int_type), TIMEOUT_INT);
+ assert_eq!(i32::from(abs_type), TIMEOUT_ABS);
+ assert_eq!(i32::from(default_type), TIMEOUT_DEFAULT);
+
+ assert_eq!(ToType::new(TIMEOUT_INT), int_type);
+ assert_eq!(ToType::new(TIMEOUT_ABS), abs_type);
+ assert_eq!(ToType::new(123), default_type);
+ }
+
+ #[test]
+ fn test_timeout() {
+ let to = Timeout::new(ToType::Default).unwrap(); // relative timeout
+ assert!(!to.is_pending());
+ assert!(!to.is_expired());
+
+ let to2 = Timeout::new(ToType::INT).unwrap(); // relative timeout
+ assert!(!to2.is_pending());
+ assert!(!to2.is_expired());
+
+ let mut tos = TimeoutManager::new(TIMEOUT_mHZ).unwrap();
+ tos.update_time_rel(0); // tos.now = 0
+ tos.add(to, 100); // onetime, expired time = tos.now + 100
+ tos.add(to2, 100); // periodic
+
+ let mut tos_it = TomIt::new(TomItType::PENDING);
+ let to = tos.next_timeout(&mut tos_it).unwrap();
+ let to2 = tos.next_timeout(&mut tos_it).unwrap();
+
+ tos.update_time_rel(1); // tos.now = 1
+ assert!(to.is_pending());
+ assert!(!to.is_expired());
+
+ tos.update_time_rel(98); // tos.now = 99
+ assert!(to.is_pending());
+ assert!(!to.is_expired());
+
+ tos.update_time_rel(10); // tos.now = 109
+
+ for to in tos.next_timeouts(TomItType::ALL) {
+ assert!(!to.is_pending());
+ assert!(to.is_expired());
+ }
+
+ for to in tos.expired_timeouts() {
+ match to {
+ ToR::OneTime(temp) => {
+ // all flag has cleared
+ // assert!(to.is_expired());
+ }
+ ToR::Periodic(temp) => {
+ assert!(temp.is_periodic()); // next expired time = 200
+ }
+ };
+ }
+
+ tos.update_time_rel(110); // tos.now = 219
+
+ let mut temp_flag = false;
+
+ for to in tos.expired_timeouts() {
+ match to {
+ ToR::OneTime(temp) => {
+ // all flag has cleared
+ // assert!(to.is_expired());
+ }
+ ToR::Periodic(temp) => {
+ assert!(temp.is_periodic());
+ temp_flag = true;
+ }
+ };
+ }
+
+ assert!(temp_flag);
+ }
+
+ #[test]
+ fn test_timeout_sit_flag_into_i32() {
+ let pending = TomItType::PENDING;
+ let expired = TomItType::EXPIRED;
+ let all = TomItType::ALL;
+
+ assert_eq!(i32::from(pending), TIMEOUTS_PENDING);
+ assert_eq!(i32::from(expired), TIMEOUTS_EXPIRED);
+ assert_eq!(i32::from(all), TIMEOUTS_ALL);
+
+ assert_eq!(TomItType::new(TIMEOUTS_PENDING), pending);
+ assert_eq!(TomItType::new(TIMEOUTS_EXPIRED), expired);
+ assert_eq!(TomItType::new(TIMEOUTS_ALL), all);
+ assert_eq!(TomItType::new(123), all);
+ }
+
+ #[test]
+ fn test_timeout_manger() {
+ let mut tos = TimeoutManager::new(TIMEOUT_mHZ).unwrap();
+ assert_eq!(tos.get_hz(), TIMEOUT_mHZ);
+ assert!(tos.check());
+
+ tos.update_time_abs(0);
+ assert_eq!(tos.get_next_wait_time(), u64::MAX); // no timeout wait, so wait time is u64::MAX
+
+ let timeout = Timeout::new(ToType::Default).unwrap(); // relative timeout
+ tos.add(timeout, 100); // expired time = tos.now + 100
+
+ let mut tos_it = TomIt::new(TomItType::PENDING);
+ let to = tos.next_timeout(&mut tos_it);
+ let to = to.unwrap();
+
+ tos.update_time_abs(30);
+ assert!(tos.any_pending());
+ assert!(!tos.any_expired());
+ assert!(tos.get_next_wait_time() < 70);
+
+ assert!(to.is_pending());
+ assert!(!to.is_expired());
+
+ tos.update_time_abs(100);
+ assert!(!tos.any_pending());
+ assert!(tos.any_expired());
+ assert_eq!(tos.get_next_wait_time(), 0);
+
+ tos.update_time_abs(150);
+
+ assert!(!to.is_pending());
+ assert!(to.is_expired());
+
+ let timeout2 = tos.expired_timeout();
+ assert!(timeout2.is_some());
+ }
+
+ // just for test
+ #[derive(Clone, Debug, PartialEq, Eq)]
+ pub struct Session {
+ pub session_id: String,
+ }
+ impl Drop for Session {
+ fn drop(&mut self) {
+ println!("drop session: {}", self.session_id);
+ }
+ }
+
+ #[test]
+ #[allow(unused_variables)]
+ fn test_cb() {
+ let mut timeout = Timeout::new(ToType::Default).unwrap();
+
+ extern "C" fn rust_callback2(arg: &mut i32) {
+ println!("Callback executed with arg: {}", arg);
+ }
+ let mut binding = 11;
+ let cb = TimeoutCallBack::new(rust_callback2, &mut binding);
+ timeout.set_cb(cb);
+ timeout.run_cb();
+
+ extern "C" fn rust_callback3(arg: &mut Rc<RefCell<Session>>) {
+ let value = arg.borrow();
+ println!("Callback executed with session_id: {}", value.session_id);
+ }
+
+ let session = Session {
+ session_id: "123".to_string(),
+ };
+ let session_ref = Rc::new(RefCell::new(session));
+ let mut binding = session_ref.clone();
+ let cb = TimeoutCallBack::new(rust_callback3, &mut binding);
+ timeout.set_cb(cb);
+ timeout.run_cb();
+ }
+
+ #[test]
+ #[allow(unused_variables)]
+ fn test_cb_closure() {
+ let mut timeout = Timeout::new(ToType::Default).unwrap();
+
+ let mut session = Session {
+ session_id: "123".to_string(),
+ };
+
+ timeout.set_cb_closure(&mut || {
+ println!("Callback executed with session_id: {}", session.session_id);
+ session.session_id = "456".to_string();
+ println!("Callback executed with session_id: {}", session.session_id);
+ });
+
+ timeout.run_cb();
+ }
+}