From 142e30257e2d852b381f1ca47257095b888627cd Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Fri, 1 Sep 2023 14:40:25 +0800 Subject: [refactor] Event registration is moved to Plugin's init function --- src/event/manager.rs | 58 +++++--------------------- src/main.rs | 62 ++-------------------------- src/packet/packet.rs | 6 +-- src/plugin/example.rs | 109 ++++++++++++++++++++++++++++++++++++++----------- src/session/manager.rs | 8 ++-- src/session/session.rs | 31 ++++++++------ 6 files changed, 124 insertions(+), 150 deletions(-) diff --git a/src/event/manager.rs b/src/event/manager.rs index 587a00b..64975d6 100644 --- a/src/event/manager.rs +++ b/src/event/manager.rs @@ -6,6 +6,7 @@ use std::collections::VecDeque; use std::rc::Rc; pub trait EventHandle { + fn init(&mut self); fn handle(&mut self, index: usize, packet: &Packet, session: Option>>); } @@ -83,6 +84,7 @@ impl EventManager { #[cfg(test)] mod tests { + use crate::event::manager::EventHandle; use crate::packet::packet::Packet; use crate::plugin::example::ExamplePulgin; use crate::thread::thread::ThreadContex; @@ -214,57 +216,13 @@ mod tests { ]; // Create plugin - let plugin = ExamplePulgin::new( - "Example Plugin", - Rc::new(RefCell::new(String::from("Hello"))), - ); - - // Create event manager let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); - let mgr = thread_ctx.borrow_mut().get_event_mgr(); - - // Register event - let l2_event = mgr.borrow_mut().event2index("BUILDIN_L2_EVENT"); - let l3_event = mgr.borrow_mut().event2index("BUILDIN_L3_EVENT"); - let ip4_event = mgr.borrow_mut().event2index("BUILDIN_IP4_EVENT"); - let ip6_event = mgr.borrow_mut().event2index("BUILDIN_IP6_EVENT"); - let l4_event = mgr.borrow_mut().event2index("BUILDIN_L4_EVENT"); - let tcp_opening_event = mgr.borrow_mut().event2index("BUILDIN_TCP_OPENING_EVENT"); - let tcp_active_event = mgr.borrow_mut().event2index("BUILDIN_TCP_ACTIVE_EVENT"); - let tcp_closed_event = mgr.borrow_mut().event2index("BUILDIN_TCP_CLOSED_EVENT"); - let udp_event = mgr.borrow_mut().event2index("BUILDIN_UDP_EVENT"); - let l7_event = mgr.borrow_mut().event2index("BUILDIN_L7_EVENT"); - let dns_event = mgr.borrow_mut().event2index("BUILDIN_DNS_EVENT"); - let http_event = mgr.borrow_mut().event2index("BUILDIN_HTTP_EVENT"); - - mgr.borrow_mut() - .register(l2_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(l3_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(ip4_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(ip6_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(l4_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(tcp_opening_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(tcp_active_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(tcp_closed_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(udp_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(l7_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(dns_event, Box::new(plugin.clone())); - mgr.borrow_mut() - .register(http_event, Box::new(plugin.clone())); + let mut plugin = ExamplePulgin::new("Example Plugin", thread_ctx.clone()); + plugin.init(); // Handle packet let mut packet = Packet::new(&bytes, bytes.len() as u32); - let result = packet.handle(thread_ctx); + let result = packet.handle(thread_ctx.clone()); match result { Ok(_v) => { // println!("SUCCESS: {:?}, {:?}", packet, _v); @@ -280,7 +238,11 @@ mod tests { } // Dispatch event - mgr.borrow_mut().dispatch(&packet); + thread_ctx + .borrow_mut() + .get_event_mgr() + .borrow_mut() + .dispatch(&packet); // assert_eq!(1, 0); } diff --git a/src/main.rs b/src/main.rs index 1bd173b..a238862 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; use std::rc::Rc; -use stellar_rs::event::event::*; +use stellar_rs::event::manager::EventHandle; use stellar_rs::packet::capture::PacketCapture; use stellar_rs::packet::packet::Packet; use stellar_rs::plugin::example::ExamplePulgin; @@ -34,65 +34,9 @@ fn packet_callback(data: &[u8], len: u32, ctx: Rc>) { } fn main() { - let plugin = ExamplePulgin::new( - "Example Plugin", - Rc::new(RefCell::new(String::from("Hello"))), - ); - let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); - let mgr = thread_ctx.borrow_mut().get_event_mgr(); - - // let l2_event = mgr.borrow_mut().event2index(BUILDIN_L2_EVENT); - // mgr.borrow_mut() - // .register(l2_event, Box::new(plugin.clone())); - - // let l3_event = mgr.borrow_mut().event2index(BUILDIN_L3_EVENT); - // mgr.borrow_mut() - // .register(l3_event, Box::new(plugin.clone())); - - // let ip4_event = mgr.borrow_mut().event2index(BUILDIN_IP4_EVENT); - // mgr.borrow_mut() - // .register(ip4_event, Box::new(plugin.clone())); - - // let ip6_event = mgr.borrow_mut().event2index(BUILDIN_IP6_EVENT); - // mgr.borrow_mut() - // .register(ip6_event, Box::new(plugin.clone())); - - // let l4_event = mgr.borrow_mut().event2index(BUILDIN_L4_EVENT); - // mgr.borrow_mut() - // .register(l4_event, Box::new(plugin.clone())); - - let tcp_opening_event = mgr.borrow_mut().event2index(BUILDIN_TCP_OPENING_EVENT); - mgr.borrow_mut() - .register(tcp_opening_event, Box::new(plugin.clone())); - - let tcp_active_event = mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT); - mgr.borrow_mut() - .register(tcp_active_event, Box::new(plugin.clone())); - - let tcp_expire_event = mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT); - mgr.borrow_mut() - .register(tcp_expire_event, Box::new(plugin.clone())); - - let tcp_closed_event = mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT); - mgr.borrow_mut() - .register(tcp_closed_event, Box::new(plugin.clone())); - - // let udp_event = mgr.borrow_mut().event2index(BUILDIN_UDP_EVENT); - // mgr.borrow_mut() - // .register(udp_event, Box::new(plugin.clone())); - - // let l7_event = mgr.borrow_mut().event2index(BUILDIN_L7_EVENT); - // mgr.borrow_mut() - // .register(l7_event, Box::new(plugin.clone())); - - // let dns_event = mgr.borrow_mut().event2index(BUILDIN_DNS_EVENT); - // mgr.borrow_mut() - // .register(dns_event, Box::new(plugin.clone())); - - // let http_event = mgr.borrow_mut().event2index(BUILDIN_HTTP_EVENT); - // mgr.borrow_mut() - // .register(http_event, Box::new(plugin.clone())); + let mut plugin = ExamplePulgin::new("Example Plugin", thread_ctx.clone()); + plugin.init(); PacketCapture::show_devices(); let mut cap = PacketCapture::new("en0"); diff --git a/src/packet/packet.rs b/src/packet/packet.rs index 8b2d460..f80f4d6 100644 --- a/src/packet/packet.rs +++ b/src/packet/packet.rs @@ -391,7 +391,7 @@ impl Packet<'_> { } } -pub fn trace_id_reversed(trace_id: &String) -> String { +pub fn reverse_trace_id(trace_id: &String) -> String { let mut reversed_trace_id = String::new(); let mut trace_id_vec: Vec<&str> = trace_id.split(";").collect(); trace_id_vec.pop(); @@ -648,7 +648,7 @@ fn l7_identify(packet: &Packet) -> L7Protocol { mod tests { use super::Encapsulation; use super::Packet; - use crate::packet::packet::trace_id_reversed; + use crate::packet::packet::reverse_trace_id; use crate::protocol::ip::IPProtocol; use crate::protocol::ipv4::IPv4Header; use crate::protocol::ipv6::IPv6Header; @@ -1040,6 +1040,6 @@ mod tests { assert_eq!(packet.get_trace_id(), Some("IP4->IP4;192.168.0.101->121.14.154.93;TCP->TCP;50081->443;IP6->IP6;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string())); assert_eq!(packet.get_reversed_trace_id() ,Some("IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string())); - assert_eq!(trace_id_reversed(&packet.get_trace_id().unwrap()), "IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string()); + assert_eq!(reverse_trace_id(&packet.get_trace_id().unwrap()), "IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string()); } } diff --git a/src/plugin/example.rs b/src/plugin/example.rs index 68fd542..eb38c2c 100644 --- a/src/plugin/example.rs +++ b/src/plugin/example.rs @@ -1,50 +1,113 @@ +use crate::event::event::*; use crate::event::manager::EventHandle; use crate::packet::packet::Packet; use crate::session::session::Session; +use crate::thread::thread::ThreadContex; use std::cell::RefCell; use std::rc::Rc; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ExamplePulgin { plugin_name: &'static str, plugin_ctx: Rc>, - called_times: u64, + thread_ctx: Rc>, + tcp_opening_event: usize, + tcp_active_event: usize, + tcp_expire_event: usize, + tcp_closed_event: usize, + http_opening_event: usize, } impl ExamplePulgin { - pub fn new(plugin_name: &'static str, plugin_ctx: Rc>) -> ExamplePulgin { + pub fn new(plugin_name: &'static str, thread_ctx: Rc>) -> ExamplePulgin { ExamplePulgin { plugin_name, - plugin_ctx, - called_times: 0, + plugin_ctx: Rc::new(RefCell::new(String::new())), + thread_ctx: thread_ctx, + tcp_opening_event: 0, + tcp_active_event: 0, + tcp_expire_event: 0, + tcp_closed_event: 0, + http_opening_event: 0, } } + + pub fn get_thread_ctx(&self) -> Rc> { + self.thread_ctx.clone() + } } impl EventHandle for ExamplePulgin { - fn handle(&mut self, index: usize, packet: &Packet, session: Option>>) { - self.called_times += 1; - self.plugin_ctx.borrow_mut().clear(); - self.plugin_ctx.borrow_mut().push_str("1"); + fn init(&mut self) { + let thread_ctx = self.get_thread_ctx(); + let event_mgr = thread_ctx.borrow().get_event_mgr(); + + self.tcp_opening_event = event_mgr + .borrow_mut() + .event2index(BUILDIN_TCP_OPENING_EVENT); + event_mgr + .borrow_mut() + .register(self.tcp_opening_event, Box::new(self.clone())); + + self.tcp_active_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT); + event_mgr + .borrow_mut() + .register(self.tcp_active_event, Box::new(self.clone())); + + self.tcp_expire_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT); + event_mgr + .borrow_mut() + .register(self.tcp_expire_event, Box::new(self.clone())); - println!("{} handle event : {:?}", self.plugin_name, index); - println!( - "{} handle times : {:?}", - self.plugin_name, self.called_times - ); - println!( - "{} handle tuple : {:?}", - self.plugin_name, - packet.get_outer_tuple() - ); + self.tcp_closed_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT); + event_mgr + .borrow_mut() + .register(self.tcp_closed_event, Box::new(self.clone())); + + self.http_opening_event = event_mgr + .borrow_mut() + .event2index("USERDEF_HTTP_OPENING_EVENT"); + event_mgr + .borrow_mut() + .register(self.http_opening_event, Box::new(self.clone())); + } + fn handle(&mut self, index: usize, packet: &Packet, session: Option>>) { if session.is_none() { return; } + + self.plugin_ctx.borrow_mut().clear(); + self.plugin_ctx.borrow_mut().push_str("1"); + let session = session.unwrap(); - println!("{} handle session : {:?}", self.plugin_name, session); - session - .borrow_mut() - .set_session_exdata("example pulgin".to_string(), "success".to_string()); + if index == self.tcp_opening_event { + println!( + "{} handle BUILDIN_TCP_OPENING_EVENT: {:?}", + self.plugin_name, session + ); + } else if index == self.tcp_active_event { + println!( + "{} handle BUILDIN_TCP_ACTIVE_EVENT: {:?}", + self.plugin_name, session + ); + } else if index == self.tcp_expire_event { + println!( + "{} handle BUILDIN_TCP_EXPIRE_EVENT: {:?}", + self.plugin_name, session + ); + } else if index == self.tcp_closed_event { + println!( + "{} handle BUILDIN_TCP_CLOSED_EVENT: {:?}", + self.plugin_name, session + ); + } else if index == self.http_opening_event { + println!( + "{} handle USERDEF_HTTP_OPENING_EVENT: {:?}", + self.plugin_name, session + ); + } else { + println!("{} handle UNKNOWN_EVENT: {:?}", self.plugin_name, session); + } } } diff --git a/src/session/manager.rs b/src/session/manager.rs index 2a63bbf..60222d6 100644 --- a/src/session/manager.rs +++ b/src/session/manager.rs @@ -1,4 +1,4 @@ -use crate::packet::packet::trace_id_reversed; +use crate::session::session::reverse_session_id; use crate::session::session::Session; use crate::session::session::SessionProto; use crate::session::session::SessionState; @@ -33,7 +33,7 @@ impl SessionManager { return result; } - let reversed_id = trace_id_reversed(session_id); + let reversed_id = reverse_session_id(session_id); self.sessions .get(reversed_id.as_str()) .map(|session| session.clone()) @@ -49,7 +49,7 @@ impl SessionManager { return result; } - let reversed_id = trace_id_reversed(&session_id.to_string()); + let reversed_id = reverse_session_id(&session_id.to_string()); self.sessions.remove(reversed_id.as_str()) } @@ -64,7 +64,7 @@ impl SessionManager { for session_id in expired_sessions { let option = self.remove_session(&session_id); if let Some(session) = option { - print!("Session expired: {}", session_id); + println!("Session expired: {}", session_id); session .borrow_mut() .set_session_state(SessionState::Expired); diff --git a/src/session/session.rs b/src/session/session.rs index 41bd54a..adbdb64 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -1,3 +1,4 @@ +use crate::packet::packet::reverse_trace_id; use crate::packet::packet::Packet; use crate::session::manager::SessionManager; use chrono::Utc; @@ -99,6 +100,14 @@ impl Session { } } + pub fn get_session_id(&self) -> String { + self.session_id.clone() + } + + pub fn get_session_start_ts(&self) -> i64 { + self.session_timestamp.ts_start + } + pub fn set_session_proto(&mut self, proto: SessionProto) { self.session_proto = proto; } @@ -107,10 +116,6 @@ impl Session { self.session_proto } - pub fn get_session_id(&self) -> String { - self.session_id.clone() - } - pub fn set_session_state(&mut self, state: SessionState) { self.session_state = state; } @@ -132,6 +137,10 @@ impl Session { self.session_metrics_c2s.recv_bytes += recv_bytes; } + pub fn get_session_c2s_metrics(&self) -> SessionMetrics { + self.session_metrics_c2s + } + pub fn inc_session_s2c_metrics( &mut self, send_pkts: u64, @@ -145,18 +154,10 @@ impl Session { self.session_metrics_s2c.recv_bytes += recv_bytes; } - pub fn get_session_c2s_metrics(&self) -> SessionMetrics { - self.session_metrics_c2s - } - pub fn get_session_s2c_metrics(&self) -> SessionMetrics { self.session_metrics_s2c } - pub fn get_session_start_ts(&self) -> i64 { - self.session_timestamp.ts_start - } - pub fn update_session_expire_ts(&mut self) { self.session_timestamp.ts_expires = Utc::now().timestamp() + MAX_SESSION_EXPIRE_TIME; } @@ -199,9 +200,13 @@ impl Session { } /****************************************************************************** - * API Packet -> Session + * Utils API ******************************************************************************/ +pub fn reverse_session_id(session_id: &String) -> String { + reverse_trace_id(session_id) +} + pub fn packet2session( packet: &Packet, session_mgr: Rc>, -- cgit v1.2.3