summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2023-09-01 14:40:25 +0800
committerluwenpeng <[email protected]>2023-09-06 18:13:26 +0800
commit142e30257e2d852b381f1ca47257095b888627cd (patch)
tree9f8515db301611ad2dae41b5437b1fe43308e8d5
parentccf658df8cd062fedb66bb64d3c5a25b2d847f25 (diff)
[refactor] Event registration is moved to Plugin's init function
-rw-r--r--src/event/manager.rs58
-rw-r--r--src/main.rs62
-rw-r--r--src/packet/packet.rs6
-rw-r--r--src/plugin/example.rs109
-rw-r--r--src/session/manager.rs8
-rw-r--r--src/session/session.rs31
6 files changed, 124 insertions, 150 deletions
diff --git a/src/event/manager.rs b/src/event/manager.rs
index 587a00b..64975d6 100644
--- a/src/event/manager.rs
+++ b/src/event/manager.rs
@@ -6,6 +6,7 @@ use std::collections::VecDeque;
use std::rc::Rc;
pub trait EventHandle {
+ fn init(&mut self);
fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>);
}
@@ -83,6 +84,7 @@ impl EventManager {
#[cfg(test)]
mod tests {
+ use crate::event::manager::EventHandle;
use crate::packet::packet::Packet;
use crate::plugin::example::ExamplePulgin;
use crate::thread::thread::ThreadContex;
@@ -214,57 +216,13 @@ mod tests {
];
// Create plugin
- let plugin = ExamplePulgin::new(
- "Example Plugin",
- Rc::new(RefCell::new(String::from("Hello"))),
- );
-
- // Create event manager
let thread_ctx = Rc::new(RefCell::new(ThreadContex::new()));
- let mgr = thread_ctx.borrow_mut().get_event_mgr();
-
- // Register event
- let l2_event = mgr.borrow_mut().event2index("BUILDIN_L2_EVENT");
- let l3_event = mgr.borrow_mut().event2index("BUILDIN_L3_EVENT");
- let ip4_event = mgr.borrow_mut().event2index("BUILDIN_IP4_EVENT");
- let ip6_event = mgr.borrow_mut().event2index("BUILDIN_IP6_EVENT");
- let l4_event = mgr.borrow_mut().event2index("BUILDIN_L4_EVENT");
- let tcp_opening_event = mgr.borrow_mut().event2index("BUILDIN_TCP_OPENING_EVENT");
- let tcp_active_event = mgr.borrow_mut().event2index("BUILDIN_TCP_ACTIVE_EVENT");
- let tcp_closed_event = mgr.borrow_mut().event2index("BUILDIN_TCP_CLOSED_EVENT");
- let udp_event = mgr.borrow_mut().event2index("BUILDIN_UDP_EVENT");
- let l7_event = mgr.borrow_mut().event2index("BUILDIN_L7_EVENT");
- let dns_event = mgr.borrow_mut().event2index("BUILDIN_DNS_EVENT");
- let http_event = mgr.borrow_mut().event2index("BUILDIN_HTTP_EVENT");
-
- mgr.borrow_mut()
- .register(l2_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(l3_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(ip4_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(ip6_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(l4_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(tcp_opening_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(tcp_active_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(tcp_closed_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(udp_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(l7_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(dns_event, Box::new(plugin.clone()));
- mgr.borrow_mut()
- .register(http_event, Box::new(plugin.clone()));
+ let mut plugin = ExamplePulgin::new("Example Plugin", thread_ctx.clone());
+ plugin.init();
// Handle packet
let mut packet = Packet::new(&bytes, bytes.len() as u32);
- let result = packet.handle(thread_ctx);
+ let result = packet.handle(thread_ctx.clone());
match result {
Ok(_v) => {
// println!("SUCCESS: {:?}, {:?}", packet, _v);
@@ -280,7 +238,11 @@ mod tests {
}
// Dispatch event
- mgr.borrow_mut().dispatch(&packet);
+ thread_ctx
+ .borrow_mut()
+ .get_event_mgr()
+ .borrow_mut()
+ .dispatch(&packet);
// assert_eq!(1, 0);
}
diff --git a/src/main.rs b/src/main.rs
index 1bd173b..a238862 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,6 @@
use std::cell::RefCell;
use std::rc::Rc;
-use stellar_rs::event::event::*;
+use stellar_rs::event::manager::EventHandle;
use stellar_rs::packet::capture::PacketCapture;
use stellar_rs::packet::packet::Packet;
use stellar_rs::plugin::example::ExamplePulgin;
@@ -34,65 +34,9 @@ fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
}
fn main() {
- let plugin = ExamplePulgin::new(
- "Example Plugin",
- Rc::new(RefCell::new(String::from("Hello"))),
- );
-
let thread_ctx = Rc::new(RefCell::new(ThreadContex::new()));
- let mgr = thread_ctx.borrow_mut().get_event_mgr();
-
- // let l2_event = mgr.borrow_mut().event2index(BUILDIN_L2_EVENT);
- // mgr.borrow_mut()
- // .register(l2_event, Box::new(plugin.clone()));
-
- // let l3_event = mgr.borrow_mut().event2index(BUILDIN_L3_EVENT);
- // mgr.borrow_mut()
- // .register(l3_event, Box::new(plugin.clone()));
-
- // let ip4_event = mgr.borrow_mut().event2index(BUILDIN_IP4_EVENT);
- // mgr.borrow_mut()
- // .register(ip4_event, Box::new(plugin.clone()));
-
- // let ip6_event = mgr.borrow_mut().event2index(BUILDIN_IP6_EVENT);
- // mgr.borrow_mut()
- // .register(ip6_event, Box::new(plugin.clone()));
-
- // let l4_event = mgr.borrow_mut().event2index(BUILDIN_L4_EVENT);
- // mgr.borrow_mut()
- // .register(l4_event, Box::new(plugin.clone()));
-
- let tcp_opening_event = mgr.borrow_mut().event2index(BUILDIN_TCP_OPENING_EVENT);
- mgr.borrow_mut()
- .register(tcp_opening_event, Box::new(plugin.clone()));
-
- let tcp_active_event = mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT);
- mgr.borrow_mut()
- .register(tcp_active_event, Box::new(plugin.clone()));
-
- let tcp_expire_event = mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT);
- mgr.borrow_mut()
- .register(tcp_expire_event, Box::new(plugin.clone()));
-
- let tcp_closed_event = mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT);
- mgr.borrow_mut()
- .register(tcp_closed_event, Box::new(plugin.clone()));
-
- // let udp_event = mgr.borrow_mut().event2index(BUILDIN_UDP_EVENT);
- // mgr.borrow_mut()
- // .register(udp_event, Box::new(plugin.clone()));
-
- // let l7_event = mgr.borrow_mut().event2index(BUILDIN_L7_EVENT);
- // mgr.borrow_mut()
- // .register(l7_event, Box::new(plugin.clone()));
-
- // let dns_event = mgr.borrow_mut().event2index(BUILDIN_DNS_EVENT);
- // mgr.borrow_mut()
- // .register(dns_event, Box::new(plugin.clone()));
-
- // let http_event = mgr.borrow_mut().event2index(BUILDIN_HTTP_EVENT);
- // mgr.borrow_mut()
- // .register(http_event, Box::new(plugin.clone()));
+ let mut plugin = ExamplePulgin::new("Example Plugin", thread_ctx.clone());
+ plugin.init();
PacketCapture::show_devices();
let mut cap = PacketCapture::new("en0");
diff --git a/src/packet/packet.rs b/src/packet/packet.rs
index 8b2d460..f80f4d6 100644
--- a/src/packet/packet.rs
+++ b/src/packet/packet.rs
@@ -391,7 +391,7 @@ impl Packet<'_> {
}
}
-pub fn trace_id_reversed(trace_id: &String) -> String {
+pub fn reverse_trace_id(trace_id: &String) -> String {
let mut reversed_trace_id = String::new();
let mut trace_id_vec: Vec<&str> = trace_id.split(";").collect();
trace_id_vec.pop();
@@ -648,7 +648,7 @@ fn l7_identify(packet: &Packet) -> L7Protocol {
mod tests {
use super::Encapsulation;
use super::Packet;
- use crate::packet::packet::trace_id_reversed;
+ use crate::packet::packet::reverse_trace_id;
use crate::protocol::ip::IPProtocol;
use crate::protocol::ipv4::IPv4Header;
use crate::protocol::ipv6::IPv6Header;
@@ -1040,6 +1040,6 @@ mod tests {
assert_eq!(packet.get_trace_id(), Some("IP4->IP4;192.168.0.101->121.14.154.93;TCP->TCP;50081->443;IP6->IP6;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string()));
assert_eq!(packet.get_reversed_trace_id() ,Some("IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string()));
- assert_eq!(trace_id_reversed(&packet.get_trace_id().unwrap()), "IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string());
+ assert_eq!(reverse_trace_id(&packet.get_trace_id().unwrap()), "IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string());
}
}
diff --git a/src/plugin/example.rs b/src/plugin/example.rs
index 68fd542..eb38c2c 100644
--- a/src/plugin/example.rs
+++ b/src/plugin/example.rs
@@ -1,50 +1,113 @@
+use crate::event::event::*;
use crate::event::manager::EventHandle;
use crate::packet::packet::Packet;
use crate::session::session::Session;
+use crate::thread::thread::ThreadContex;
use std::cell::RefCell;
use std::rc::Rc;
-#[derive(Debug, Clone)]
+#[derive(Clone)]
pub struct ExamplePulgin {
plugin_name: &'static str,
plugin_ctx: Rc<RefCell<String>>,
- called_times: u64,
+ thread_ctx: Rc<RefCell<ThreadContex>>,
+ tcp_opening_event: usize,
+ tcp_active_event: usize,
+ tcp_expire_event: usize,
+ tcp_closed_event: usize,
+ http_opening_event: usize,
}
impl ExamplePulgin {
- pub fn new(plugin_name: &'static str, plugin_ctx: Rc<RefCell<String>>) -> ExamplePulgin {
+ pub fn new(plugin_name: &'static str, thread_ctx: Rc<RefCell<ThreadContex>>) -> ExamplePulgin {
ExamplePulgin {
plugin_name,
- plugin_ctx,
- called_times: 0,
+ plugin_ctx: Rc::new(RefCell::new(String::new())),
+ thread_ctx: thread_ctx,
+ tcp_opening_event: 0,
+ tcp_active_event: 0,
+ tcp_expire_event: 0,
+ tcp_closed_event: 0,
+ http_opening_event: 0,
}
}
+
+ pub fn get_thread_ctx(&self) -> Rc<RefCell<ThreadContex>> {
+ self.thread_ctx.clone()
+ }
}
impl EventHandle for ExamplePulgin {
- fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>) {
- self.called_times += 1;
- self.plugin_ctx.borrow_mut().clear();
- self.plugin_ctx.borrow_mut().push_str("1");
+ fn init(&mut self) {
+ let thread_ctx = self.get_thread_ctx();
+ let event_mgr = thread_ctx.borrow().get_event_mgr();
+
+ self.tcp_opening_event = event_mgr
+ .borrow_mut()
+ .event2index(BUILDIN_TCP_OPENING_EVENT);
+ event_mgr
+ .borrow_mut()
+ .register(self.tcp_opening_event, Box::new(self.clone()));
+
+ self.tcp_active_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT);
+ event_mgr
+ .borrow_mut()
+ .register(self.tcp_active_event, Box::new(self.clone()));
+
+ self.tcp_expire_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT);
+ event_mgr
+ .borrow_mut()
+ .register(self.tcp_expire_event, Box::new(self.clone()));
- println!("{} handle event : {:?}", self.plugin_name, index);
- println!(
- "{} handle times : {:?}",
- self.plugin_name, self.called_times
- );
- println!(
- "{} handle tuple : {:?}",
- self.plugin_name,
- packet.get_outer_tuple()
- );
+ self.tcp_closed_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT);
+ event_mgr
+ .borrow_mut()
+ .register(self.tcp_closed_event, Box::new(self.clone()));
+
+ self.http_opening_event = event_mgr
+ .borrow_mut()
+ .event2index("USERDEF_HTTP_OPENING_EVENT");
+ event_mgr
+ .borrow_mut()
+ .register(self.http_opening_event, Box::new(self.clone()));
+ }
+ fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>) {
if session.is_none() {
return;
}
+
+ self.plugin_ctx.borrow_mut().clear();
+ self.plugin_ctx.borrow_mut().push_str("1");
+
let session = session.unwrap();
- println!("{} handle session : {:?}", self.plugin_name, session);
- session
- .borrow_mut()
- .set_session_exdata("example pulgin".to_string(), "success".to_string());
+ if index == self.tcp_opening_event {
+ println!(
+ "{} handle BUILDIN_TCP_OPENING_EVENT: {:?}",
+ self.plugin_name, session
+ );
+ } else if index == self.tcp_active_event {
+ println!(
+ "{} handle BUILDIN_TCP_ACTIVE_EVENT: {:?}",
+ self.plugin_name, session
+ );
+ } else if index == self.tcp_expire_event {
+ println!(
+ "{} handle BUILDIN_TCP_EXPIRE_EVENT: {:?}",
+ self.plugin_name, session
+ );
+ } else if index == self.tcp_closed_event {
+ println!(
+ "{} handle BUILDIN_TCP_CLOSED_EVENT: {:?}",
+ self.plugin_name, session
+ );
+ } else if index == self.http_opening_event {
+ println!(
+ "{} handle USERDEF_HTTP_OPENING_EVENT: {:?}",
+ self.plugin_name, session
+ );
+ } else {
+ println!("{} handle UNKNOWN_EVENT: {:?}", self.plugin_name, session);
+ }
}
}
diff --git a/src/session/manager.rs b/src/session/manager.rs
index 2a63bbf..60222d6 100644
--- a/src/session/manager.rs
+++ b/src/session/manager.rs
@@ -1,4 +1,4 @@
-use crate::packet::packet::trace_id_reversed;
+use crate::session::session::reverse_session_id;
use crate::session::session::Session;
use crate::session::session::SessionProto;
use crate::session::session::SessionState;
@@ -33,7 +33,7 @@ impl SessionManager {
return result;
}
- let reversed_id = trace_id_reversed(session_id);
+ let reversed_id = reverse_session_id(session_id);
self.sessions
.get(reversed_id.as_str())
.map(|session| session.clone())
@@ -49,7 +49,7 @@ impl SessionManager {
return result;
}
- let reversed_id = trace_id_reversed(&session_id.to_string());
+ let reversed_id = reverse_session_id(&session_id.to_string());
self.sessions.remove(reversed_id.as_str())
}
@@ -64,7 +64,7 @@ impl SessionManager {
for session_id in expired_sessions {
let option = self.remove_session(&session_id);
if let Some(session) = option {
- print!("Session expired: {}", session_id);
+ println!("Session expired: {}", session_id);
session
.borrow_mut()
.set_session_state(SessionState::Expired);
diff --git a/src/session/session.rs b/src/session/session.rs
index 41bd54a..adbdb64 100644
--- a/src/session/session.rs
+++ b/src/session/session.rs
@@ -1,3 +1,4 @@
+use crate::packet::packet::reverse_trace_id;
use crate::packet::packet::Packet;
use crate::session::manager::SessionManager;
use chrono::Utc;
@@ -99,6 +100,14 @@ impl Session {
}
}
+ pub fn get_session_id(&self) -> String {
+ self.session_id.clone()
+ }
+
+ pub fn get_session_start_ts(&self) -> i64 {
+ self.session_timestamp.ts_start
+ }
+
pub fn set_session_proto(&mut self, proto: SessionProto) {
self.session_proto = proto;
}
@@ -107,10 +116,6 @@ impl Session {
self.session_proto
}
- pub fn get_session_id(&self) -> String {
- self.session_id.clone()
- }
-
pub fn set_session_state(&mut self, state: SessionState) {
self.session_state = state;
}
@@ -132,6 +137,10 @@ impl Session {
self.session_metrics_c2s.recv_bytes += recv_bytes;
}
+ pub fn get_session_c2s_metrics(&self) -> SessionMetrics {
+ self.session_metrics_c2s
+ }
+
pub fn inc_session_s2c_metrics(
&mut self,
send_pkts: u64,
@@ -145,18 +154,10 @@ impl Session {
self.session_metrics_s2c.recv_bytes += recv_bytes;
}
- pub fn get_session_c2s_metrics(&self) -> SessionMetrics {
- self.session_metrics_c2s
- }
-
pub fn get_session_s2c_metrics(&self) -> SessionMetrics {
self.session_metrics_s2c
}
- pub fn get_session_start_ts(&self) -> i64 {
- self.session_timestamp.ts_start
- }
-
pub fn update_session_expire_ts(&mut self) {
self.session_timestamp.ts_expires = Utc::now().timestamp() + MAX_SESSION_EXPIRE_TIME;
}
@@ -199,9 +200,13 @@ impl Session {
}
/******************************************************************************
- * API Packet -> Session
+ * Utils API
******************************************************************************/
+pub fn reverse_session_id(session_id: &String) -> String {
+ reverse_trace_id(session_id)
+}
+
pub fn packet2session(
packet: &Packet,
session_mgr: Rc<RefCell<SessionManager>>,