summaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2023-09-18 14:47:10 +0800
committerluwenpeng <[email protected]>2023-09-18 17:52:39 +0800
commitcb674f9e168b6e709136e17a5bc87d3925c6f479 (patch)
treea735d755c1b653ae59b0b47d05ed720aa573f0b6 /src/main.rs
parent9387c343d38c00efb432cfb419a3c669f4d65b3a (diff)
[refactor] Event manager: Support triggering new events in event handle, Solve the problem of multiple borrowing
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs113
1 files changed, 62 insertions, 51 deletions
diff --git a/src/main.rs b/src/main.rs
index 49a0728..81ef6f4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,8 +1,9 @@
use std::cell::RefCell;
use std::rc::Rc;
-use stellar_rs::event::event::BuiltInEvent;
+use stellar_rs::event::event::Event;
use stellar_rs::event::manager::EventHandle;
use stellar_rs::event::manager::EventManager;
+use stellar_rs::event::manager::EventQueue;
use stellar_rs::packet::capture::PacketCapture;
use stellar_rs::packet::packet::Encapsulation;
use stellar_rs::packet::packet::Packet;
@@ -12,61 +13,55 @@ use stellar_rs::session::session::SessionProto;
use stellar_rs::session::session::SessionState;
use stellar_rs::thread::thread::ThreadContex;
-fn trigger_event_by_packet(
+fn trigger_packet_event(
packet: &Packet,
session: Option<Rc<RefCell<Session>>>,
- event_mgr: Rc<RefCell<EventManager>>,
+ queue: &mut EventQueue,
) {
let num = packet.encapsulation.len();
for i in 0..num {
match packet.encapsulation[i] {
- Encapsulation::L2_ETH(_, _) => {
+ Encapsulation::Eth(_, _) => {
// TODO
}
- Encapsulation::L2_VLAN(_, _) => {
+ Encapsulation::Vlan(_, _) => {
// TODO
}
- Encapsulation::L2_MPLS(_, _) => {
+ Encapsulation::Mpls(_, _) => {
// TODO
}
- Encapsulation::L2_PWETH(_, _) => {
+ Encapsulation::PwEth(_, _) => {
// TODO
}
- Encapsulation::L3_IPV4(_, _) => {
- BuiltInEvent::trigger_ipv4_event(event_mgr.clone(), session.clone());
+ Encapsulation::Ipv4(_, _) => {
+ queue.add(Event::Ipv4Event, session.clone());
}
- Encapsulation::L3_IPV6(_, _) => {
- BuiltInEvent::trigger_ipv6_event(event_mgr.clone(), session.clone());
+ Encapsulation::Ipv6(_, _) => {
+ queue.add(Event::Ipv6Event, session.clone());
}
- Encapsulation::L4_TCP(_, _) => {
- BuiltInEvent::trigger_tcp_event(event_mgr.clone(), session.clone());
+ Encapsulation::Tcp(_, _) => {
+ queue.add(Event::TcpEvent, session.clone());
}
- Encapsulation::L4_UDP(_, _) => {
- BuiltInEvent::trigger_udp_event(event_mgr.clone(), session.clone());
+ Encapsulation::Udp(_, _) => {
+ queue.add(Event::UdpEvent, session.clone());
}
- Encapsulation::L4_ICMP(_, _) => {
+ Encapsulation::Icmp(_, _) => {
// TODO
}
- Encapsulation::L4_ICMPV6(_, _) => {
+ Encapsulation::Icmpv6(_, _) => {
// TODO
}
- Encapsulation::LTUN_GTPV1_C(_, _) => {
+ Encapsulation::Gtpv1(_, _) => {
// TODO
}
- Encapsulation::LTUN_L2TP(_, _) => {
- // TODO
- }
- Encapsulation::UNSUPPORTED(_) => {
+ Encapsulation::L2tp(_, _) => {
// TODO
}
}
}
}
-fn trigger_event_by_session(
- session: Option<Rc<RefCell<Session>>>,
- event_mgr: Rc<RefCell<EventManager>>,
-) {
+fn trigger_session_event(session: Option<Rc<RefCell<Session>>>, queue: &mut EventQueue) {
if session.is_none() {
return;
}
@@ -76,31 +71,47 @@ fn trigger_event_by_session(
match session_state {
SessionState::New => match session_proto {
- SessionProto::TCP => BuiltInEvent::trigger_tcp_opening_event(event_mgr, session),
- SessionProto::UDP => BuiltInEvent::trigger_udp_opening_event(event_mgr, session),
+ SessionProto::TCP => {
+ queue.add(Event::TcpOpeningEvent, session);
+ }
+ SessionProto::UDP => {
+ queue.add(Event::UdpOpeningEvent, session);
+ }
},
SessionState::Active => match session_proto {
- SessionProto::TCP => BuiltInEvent::trigger_tcp_active_event(event_mgr, session),
- SessionProto::UDP => BuiltInEvent::trigger_udp_active_event(event_mgr, session),
+ SessionProto::TCP => {
+ queue.add(Event::TcpActiveEvent, session);
+ }
+ SessionProto::UDP => {
+ queue.add(Event::UdpActiveEvent, session);
+ }
},
SessionState::Inactive => match session_proto {
- SessionProto::TCP => BuiltInEvent::trigger_tcp_closed_event(event_mgr, session),
- SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session),
+ SessionProto::TCP => {
+ queue.add(Event::TcpClosedEvent, session);
+ }
+ SessionProto::UDP => {
+ queue.add(Event::UdpExpireEvent, session);
+ }
},
SessionState::Expired => match session_proto {
- SessionProto::TCP => BuiltInEvent::trigger_tcp_expire_event(event_mgr, session),
- SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session),
+ SessionProto::TCP => {
+ queue.add(Event::TcpExpireEvent, session);
+ }
+ SessionProto::UDP => {
+ queue.add(Event::UdpExpireEvent, session);
+ }
},
}
}
fn handle_one_packet(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
- let event_mgr = ctx.borrow_mut().get_event_mgr();
- let session_mgr = ctx.borrow_mut().get_session_mgr();
+ let event_mgr = ctx.borrow().get_event_mgr();
+ let session_mgr = ctx.borrow().get_session_mgr();
+ let mut queue = EventQueue::new();
let mut packet = Packet::new(data, len);
- // Handle Packet
let result = packet.handle();
match result {
Ok(_left) => {
@@ -116,30 +127,30 @@ fn handle_one_packet(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
}
}
- // Packets have sessions, triggering both packet events and session events
if packet.get_inner_tuple().is_some() {
let flow_id = packet.get_flow_id().unwrap();
let session = session_mgr.borrow_mut().update(flow_id);
- trigger_event_by_packet(&packet, Some(session.clone()), event_mgr.clone());
- trigger_event_by_session(Some(session.clone()), event_mgr.clone());
- // Packets have no sessions, only packet events are triggered
+ trigger_packet_event(&packet, Some(session.clone()), &mut queue);
+ trigger_session_event(Some(session.clone()), &mut queue);
} else {
- trigger_event_by_packet(&packet, None, event_mgr.clone());
+ trigger_packet_event(&packet, None, &mut queue);
}
+ event_mgr.borrow_mut().dispatch(Some(&packet), &mut queue);
- // Handle packet events and session events on the current package
- event_mgr.borrow_mut().dispatch(Some(&packet));
-
- // Handle expire events without packets
let session = session_mgr.borrow_mut().expire_oldest_session();
- trigger_event_by_session(session, event_mgr.clone());
- event_mgr.borrow_mut().dispatch(None);
+ trigger_session_event(session, &mut queue);
+ event_mgr.borrow_mut().dispatch(None, &mut queue);
}
fn main() {
- let thread_ctx = Rc::new(RefCell::new(ThreadContex::new()));
- let mut plugin = ExamplePulgin::new("Example Plugin", thread_ctx.clone());
- plugin.init();
+ let mut event_mgr = EventManager::new();
+ let mut plugin1 = ExamplePulgin::new("Plugin1");
+ let mut plugin2 = ExamplePulgin::new("Plugin2");
+ plugin1.init(&mut event_mgr);
+ plugin2.init(&mut event_mgr);
+
+ let event_mgr = Rc::new(RefCell::new(event_mgr));
+ let thread_ctx = Rc::new(RefCell::new(ThreadContex::new(event_mgr)));
PacketCapture::show_devices();
let mut cap = PacketCapture::new("en0");