summaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2023-09-06 18:30:14 +0800
committerluwenpeng <[email protected]>2023-09-07 14:27:24 +0800
commite16b028be675ee35f4d08521accbc52ee6e6b182 (patch)
tree03ea80f57d4f2c2b98dd125fa9baa2cfd770dc88 /src/main.rs
parent142e30257e2d852b381f1ca47257095b888627cd (diff)
[refactor] Decouple packets from sessions/events
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs102
1 files changed, 93 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs
index a238862..c1286bd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,18 +1,88 @@
use std::cell::RefCell;
use std::rc::Rc;
+use stellar_rs::event::event::BuiltInEvent;
use stellar_rs::event::manager::EventHandle;
+use stellar_rs::event::manager::EventManager;
use stellar_rs::packet::capture::PacketCapture;
use stellar_rs::packet::packet::Packet;
+use stellar_rs::packet::packet::PacketEvent;
use stellar_rs::plugin::example::ExamplePulgin;
+use stellar_rs::session::session::Session;
+use stellar_rs::session::session::SessionProto;
+use stellar_rs::session::session::SessionState;
use stellar_rs::thread::thread::ThreadContex;
-fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
+fn trigger_packet_event(
+ packet: &Packet,
+ session: Option<Rc<RefCell<Session>>>,
+ event_mgr: Rc<RefCell<EventManager>>,
+) {
+ for packet_event in &packet.event {
+ match packet_event {
+ PacketEvent::L2_EVENT => {
+ BuiltInEvent::trigger_l2_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::L3_EVENT => {
+ BuiltInEvent::trigger_l3_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::IPV4_EVENT => {
+ BuiltInEvent::trigger_ip4_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::IPV6_EVENT => {
+ BuiltInEvent::trigger_ip6_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::L4_EVENT => {
+ BuiltInEvent::trigger_l4_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::TCP_EVENT => {
+ BuiltInEvent::trigger_tcp_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::UDP_EVENT => {
+ BuiltInEvent::trigger_udp_event(event_mgr.clone(), session.clone());
+ }
+ }
+ }
+}
+
+fn trigger_session_event(
+ session: Option<Rc<RefCell<Session>>>,
+ event_mgr: Rc<RefCell<EventManager>>,
+) {
+ if session.is_none() {
+ return;
+ }
+
+ let session_state = session.clone().unwrap().borrow_mut().get_session_state();
+ let session_proto = session.clone().unwrap().borrow_mut().get_session_proto();
+
+ match session_state {
+ SessionState::New => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_opening_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_opening_event(event_mgr, session),
+ },
+ SessionState::Active => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_active_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_active_event(event_mgr, session),
+ },
+ SessionState::Inactive => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_closed_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session),
+ },
+ SessionState::Expired => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_expire_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session),
+ },
+ }
+}
+
+fn capture_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
let event_mgr = ctx.borrow_mut().get_event_mgr();
let session_mgr = ctx.borrow_mut().get_session_mgr();
- let empty = Packet::new(b"", 0);
let mut packet = Packet::new(data, len);
- let result = packet.handle(ctx);
+
+ // Handle Packet
+ let result = packet.handle();
match result {
Ok(_left) => {
// println!("SUCCESS: {:?}, {:?}", packet, left);
@@ -23,14 +93,28 @@ fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
// println!("ERROR Data: {:?}", packet);
// println!("ERROR Code: {:?}", e);
println!("ERROR Desc: {}", e);
+ return;
}
}
- // Hanlde Packet Event
- event_mgr.borrow_mut().dispatch(&packet);
- // Hanlde Expire Event
- session_mgr.borrow_mut().expire_sessions();
- event_mgr.borrow_mut().dispatch(&empty);
+ // Packets have sessions, triggering both packet events and session events
+ if packet.get_inner_tuple().is_some() {
+ let flow_id = packet.get_flow_id().unwrap();
+ let session = session_mgr.borrow_mut().update(flow_id);
+ trigger_packet_event(&packet, Some(session.clone()), event_mgr.clone());
+ trigger_session_event(Some(session.clone()), event_mgr.clone());
+ // Packets have no sessions, only packet events are triggered
+ } else {
+ trigger_packet_event(&packet, None, event_mgr.clone());
+ }
+
+ // Handle packet events and session events on the current package
+ event_mgr.borrow_mut().dispatch(Some(&packet));
+
+ // Handle expire events without packets
+ let session = session_mgr.borrow_mut().expire_oldest_session();
+ trigger_session_event(session, event_mgr.clone());
+ event_mgr.borrow_mut().dispatch(None);
}
fn main() {
@@ -40,5 +124,5 @@ fn main() {
PacketCapture::show_devices();
let mut cap = PacketCapture::new("en0");
- cap.poll_packet(packet_callback, thread_ctx);
+ cap.poll_packet(capture_callback, thread_ctx);
}