use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; use stellar_rs::event::event::Event; use stellar_rs::event::manager::EventHandle; use stellar_rs::event::manager::EventManager; use stellar_rs::event::manager::EventQueue; use stellar_rs::get_innermost_special_encapsulation; use stellar_rs::packet::capture::PacketCapture; use stellar_rs::packet::packet::Encapsulation; use stellar_rs::packet::packet::Packet; use stellar_rs::packet::status::PacketStatus; use stellar_rs::plugin::example::ExamplePulgin; use stellar_rs::session::session::Session; use stellar_rs::session::session::SessionProto; use stellar_rs::session::session::SessionState; use stellar_rs::thread::thread::ThreadContext; fn trigger_packet_event( packet: &Packet, session: Option>>, queue: &mut EventQueue, ) { let num = packet.encapsulation.len(); for i in 0..num { match packet.encapsulation[i] { Encapsulation::ETH(_, _) => { // TODO } Encapsulation::VLAN(_, _) => { // TODO } Encapsulation::MPLS(_, _) => { // TODO } Encapsulation::PWETH(_, _) => { // TODO } Encapsulation::IPv4(_, _) => { queue.add(Event::IPv4Event, session.clone()); } Encapsulation::IPv6(_, _) => { queue.add(Event::IPv6Event, session.clone()); } Encapsulation::GREv0(_, _) => { // TODO } Encapsulation::GREv1(_, _) => { // TODO } Encapsulation::TCP(_, _) => { queue.add(Event::TCPEvent, session.clone()); } Encapsulation::UDP(_, _) => { queue.add(Event::UDPEvent, session.clone()); } Encapsulation::ICMP(_, _) => { // TODO } Encapsulation::ICMPv6(_, _) => { // TODO } Encapsulation::GTPv1(_, _) => { // TODO } Encapsulation::L2TP(_, _) => { // TODO } Encapsulation::PPTP(_, _) => { // TODO } Encapsulation::PPP(_, _) => { // TODO } Encapsulation::PPPoE(_, _) => { // TODO } } } } fn trigger_session_event(session: Option>>, queue: &mut EventQueue) { if session.is_none() { return; } let session_state = session.clone().unwrap().borrow_mut().get_session_state(); let session_proto = session.clone().unwrap().borrow_mut().get_session_proto(); match session_state { SessionState::New => match session_proto { SessionProto::TCP => { queue.add(Event::TCPOpeningEvent, session); } SessionProto::UDP => { queue.add(Event::UDPOpeningEvent, session); } }, SessionState::Active => match session_proto { SessionProto::TCP => { queue.add(Event::TCPActiveEvent, session); } SessionProto::UDP => { queue.add(Event::UDPActiveEvent, session); } }, SessionState::Inactive => match session_proto { SessionProto::TCP => { queue.add(Event::TCPClosedEvent, session); } SessionProto::UDP => { queue.add(Event::UDPExpireEvent, session); } }, SessionState::Expired => match session_proto { SessionProto::TCP => { queue.add(Event::TCPExpireEvent, session); } SessionProto::UDP => { queue.add(Event::UDPExpireEvent, session); } }, } } fn handle_one_packet(mut packet: Packet, thread_ctx: &mut ThreadContext) { let event_mgr = &thread_ctx.event_mgr; let session_mgr = &mut thread_ctx.session_mgr; let packet_metrics = &mut thread_ctx.packet_metrics; let mut queue = EventQueue::new(); let result = packet.handle(); match result { Ok(_) => { packet_metrics.add(PacketStatus::Normal); // println!("Ok Packet: {:?}", packet); } Err(e) => { packet_metrics.add(e); println!("Unexpected Packet: {:?} {:?}", e, packet); return; } } match get_innermost_special_encapsulation!(packet, TCP | UDP) { Some(_) => { let flow_id = packet.get_flow_id().unwrap(); let session = session_mgr.update(flow_id); trigger_packet_event(&packet, Some(session.clone()), &mut queue); trigger_session_event(Some(session.clone()), &mut queue); } None => { trigger_packet_event(&packet, None, &mut queue); } } event_mgr.dispatch(Some(&packet), &mut queue); let session = session_mgr.expire_oldest_session(); trigger_session_event(session, &mut queue); event_mgr.dispatch(None, &mut queue); } fn main() { let mut event_mgr = EventManager::new(); let mut plugin1 = ExamplePulgin::new("Plugin1"); let mut plugin2 = ExamplePulgin::new("Plugin2"); plugin1.init(&mut event_mgr); plugin2.init(&mut event_mgr); let event_mgr = Arc::new(event_mgr); let mut thread0_ctx = ThreadContext::new(0, event_mgr.clone()); // let mut thread1_ctx = ThreadContext::new(1, event_mgr.clone()); run_capture_mode("en0", &mut thread0_ctx); } fn run_capture_mode(device: &str, thread_ctx: &mut ThreadContext) { let mut capture = PacketCapture::new(device); loop { match capture.next() { Some(packet) => { handle_one_packet(packet, thread_ctx); } None => { // do something } } } }