diff options
| author | luwenpeng <[email protected]> | 2023-09-06 18:30:14 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2023-09-07 14:27:24 +0800 |
| commit | e16b028be675ee35f4d08521accbc52ee6e6b182 (patch) | |
| tree | 03ea80f57d4f2c2b98dd125fa9baa2cfd770dc88 /src/main.rs | |
| parent | 142e30257e2d852b381f1ca47257095b888627cd (diff) | |
[refactor] Decouple packets from sessions/events
Diffstat (limited to 'src/main.rs')
| -rw-r--r-- | src/main.rs | 102 |
1 files changed, 93 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs index a238862..c1286bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,88 @@ use std::cell::RefCell; use std::rc::Rc; +use stellar_rs::event::event::BuiltInEvent; use stellar_rs::event::manager::EventHandle; +use stellar_rs::event::manager::EventManager; use stellar_rs::packet::capture::PacketCapture; use stellar_rs::packet::packet::Packet; +use stellar_rs::packet::packet::PacketEvent; use stellar_rs::plugin::example::ExamplePulgin; +use stellar_rs::session::session::Session; +use stellar_rs::session::session::SessionProto; +use stellar_rs::session::session::SessionState; use stellar_rs::thread::thread::ThreadContex; -fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { +fn trigger_packet_event( + packet: &Packet, + session: Option<Rc<RefCell<Session>>>, + event_mgr: Rc<RefCell<EventManager>>, +) { + for packet_event in &packet.event { + match packet_event { + PacketEvent::L2_EVENT => { + BuiltInEvent::trigger_l2_event(event_mgr.clone(), session.clone()); + } + PacketEvent::L3_EVENT => { + BuiltInEvent::trigger_l3_event(event_mgr.clone(), session.clone()); + } + PacketEvent::IPV4_EVENT => { + BuiltInEvent::trigger_ip4_event(event_mgr.clone(), session.clone()); + } + PacketEvent::IPV6_EVENT => { + BuiltInEvent::trigger_ip6_event(event_mgr.clone(), session.clone()); + } + PacketEvent::L4_EVENT => { + BuiltInEvent::trigger_l4_event(event_mgr.clone(), session.clone()); + } + PacketEvent::TCP_EVENT => { + BuiltInEvent::trigger_tcp_event(event_mgr.clone(), session.clone()); + } + PacketEvent::UDP_EVENT => { + BuiltInEvent::trigger_udp_event(event_mgr.clone(), session.clone()); + } + } + } +} + +fn trigger_session_event( + session: Option<Rc<RefCell<Session>>>, + event_mgr: Rc<RefCell<EventManager>>, +) { + if session.is_none() { + return; + } + + let session_state = session.clone().unwrap().borrow_mut().get_session_state(); + let session_proto = session.clone().unwrap().borrow_mut().get_session_proto(); + + match session_state { + SessionState::New => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_opening_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_opening_event(event_mgr, session), + }, + SessionState::Active => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_active_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_active_event(event_mgr, session), + }, + SessionState::Inactive => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_closed_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session), + }, + SessionState::Expired => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_expire_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session), + }, + } +} + +fn capture_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { let event_mgr = ctx.borrow_mut().get_event_mgr(); let session_mgr = ctx.borrow_mut().get_session_mgr(); - let empty = Packet::new(b"", 0); let mut packet = Packet::new(data, len); - let result = packet.handle(ctx); + + // Handle Packet + let result = packet.handle(); match result { Ok(_left) => { // println!("SUCCESS: {:?}, {:?}", packet, left); @@ -23,14 +93,28 @@ fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { // println!("ERROR Data: {:?}", packet); // println!("ERROR Code: {:?}", e); println!("ERROR Desc: {}", e); + return; } } - // Hanlde Packet Event - event_mgr.borrow_mut().dispatch(&packet); - // Hanlde Expire Event - session_mgr.borrow_mut().expire_sessions(); - event_mgr.borrow_mut().dispatch(&empty); + // Packets have sessions, triggering both packet events and session events + if packet.get_inner_tuple().is_some() { + let flow_id = packet.get_flow_id().unwrap(); + let session = session_mgr.borrow_mut().update(flow_id); + trigger_packet_event(&packet, Some(session.clone()), event_mgr.clone()); + trigger_session_event(Some(session.clone()), event_mgr.clone()); + // Packets have no sessions, only packet events are triggered + } else { + trigger_packet_event(&packet, None, event_mgr.clone()); + } + + // Handle packet events and session events on the current package + event_mgr.borrow_mut().dispatch(Some(&packet)); + + // Handle expire events without packets + let session = session_mgr.borrow_mut().expire_oldest_session(); + trigger_session_event(session, event_mgr.clone()); + event_mgr.borrow_mut().dispatch(None); } fn main() { @@ -40,5 +124,5 @@ fn main() { PacketCapture::show_devices(); let mut cap = PacketCapture::new("en0"); - cap.poll_packet(packet_callback, thread_ctx); + cap.poll_packet(capture_callback, thread_ctx); } |
