use futures::StreamExt; use pcap::Packet as PacketPacp; use std::borrow::BorrowMut; use std::cell::RefCell; use std::rc::Rc; use stellar_rs::session::session::Session; use stellar_rs::event::executor::{Executor, EX}; use stellar_rs::packet::capture::{BoxCodec, PacketCapture}; use stellar_rs::packet::packet::Packet; use stellar_rs::plugin::example::ExamplePulgin; fn main() { let ex = Executor::new(); ex.block_on(test1); } // async fn server() { // PacketCapture::show_devices(); // let mut cap = PacketCapture::new("eth0"); // let mut packet_num = 0; // while let Ok(packet) = cap.capture.next_packet() { // packet_num += 1; // println!("\n==================== New Packet ====================\n"); // println!("Packet[{}]->header : {:?}", packet_num, packet.header); // println!("Packet[{}]->data : {:?}", packet_num, packet.data); // let mut packet = Packet::new(&packet.data, packet.header.len); // let result = packet.handle(); // match result { // Ok(session_rc) => { // // need session // print!("session: {:?}", session_rc); // // println!("SUCCESS: {:?}, {:?}", packet, left); // // println!("SUCCESS: {:#?}, {:?}", packet, left); // // dbg!(packet); // // Executor::spawn(ExamplePulgin::new( // // "Example Plugin", // // &packet, // // session_rc.clone(), // // )); // ExamplePulgin::new("Example Plugin", Rc::new(RefCell::new(packet)), session_rc.clone()).await; // // Executor::spawn(ExamplePulgin::new( // // "Example Plugin", // // Rc::new(RefCell::new(packet)), // // session_rc.clone(), // // )); // } // Err(e) => { // // println!("ERROR Data: {:?}", packet); // // println!("ERROR Code: {:?}", e); // println!("ERROR Desc: {}", e); // // ExamplePulgin::new("Example Plugin with out session", &packet, None).await; // } // } // // H // } // } async fn test1() { let cap = PacketCapture::new("eth0"); let mut stream = cap.test(BoxCodec); loop { let temp = stream.next().await.unwrap(); let packet_pacp = temp.unwrap(); // let (packet, session_rc) = packet_pacp2packet(packet_pacp); let mut packet = Packet::new(&packet_pacp, packet_pacp.len().try_into().unwrap()); let result: Result< Option>>, stellar_rs::packet::error::PacketError, > = packet.handle(); // let packet = packet.unwrap(); // 一定有值 } // stream.for_each(|packet_pacp| async { // let (packet, session) = packet_pacp2packet(packet_pacp); // let packet = packet.unwrap(); // 一定有值 // ExamplePulgin::new( // "Example Plugin", // Rc::new(RefCell::new(packet)), // session.clone(), // ) // .await; // }) // let s = stream! { // let cap = PacketCapture::new("eth0"); // // while let Ok(packet_pacp) = cap.borrow_mut().capture.next_packet() { // // println!("\n==================== New Packet ====================\n"); // // println!("Packet->header : {:?}", packet_pacp.header); // // println!("Packet->data : {:?}", packet_pacp.data); // // yield packet_pacp; // // } // }; // loop { // let (packet, session) = test2(cap.clone()).await; // // plug_entry(packet, session); // let a = Executor::spawn(async move { // ExamplePulgin::new( // "Example Plugin", // Rc::new(RefCell::new(packet)), // session.clone(), // ) // .await // }); // } } // async fn test2<'a>(cap: Rc>) -> (Packet<'a>, Option>>) { // // let mut cap = pacp_init("eth0"); // let cap = &(*cap.borrow()); // // 使用 unsafe 强制解引用为 &mut PacketCapture // // let cap_ptr: *mut RefCell = Rc::into_raw(cap); // // let cap_ref: &mut RefCell = unsafe { &mut *cap_ptr }; // // let packet_capture: &mut PacketCapture = cap_ref.get_mut().unwrap(); // let packet_pacp: PacketPacp = capture_packet(cap).unwrap(); // let (packet, session_rc) = packet_pacp2packet(packet_pacp); // let packet = packet.unwrap(); // 一定有值 // return (packet, session_rc); // } // fn pacp_init(device: &str) -> Rc> { // PacketCapture::show_devices(); // let cap = PacketCapture::new(device); // return Rc::new(RefCell::new(cap)); // } // fn capture_packet<'a>(cap: &PacketCapture) -> Option> { // let cap_mut_ref: &mut PacketCapture = unsafe { // // 在 unsafe 块中将不可变引用转换为可变引用 // &mut *(cap as *const PacketCapture as *mut PacketCapture) // }; // while let Ok(packet_pacp) = cap_mut_ref.capture.next_packet() { // println!("\n==================== New Packet ====================\n"); // println!("Packet->header : {:?}", packet_pacp.header); // println!("Packet->data : {:?}", packet_pacp.data); // return Some(packet_pacp); // } // None // } fn packet_pacp2packet<'a>( packet_pacp: PacketPacp<'a>, ) -> (Option>, Option>>) { let mut packet = Packet::new(&packet_pacp.data, packet_pacp.header.len); let result: Result< Option>>, stellar_rs::packet::error::PacketError, > = packet.handle(); match result { Ok(session_rc) => { // need session print!("session: {:?}", session_rc); // return (packet, session_rc.clone()); return (Some(packet), session_rc.clone()); } Err(e) => { // println!("ERROR Data: {:?}", packet); // println!("ERROR Code: {:?}", e); println!("ERROR Desc: {}", e); // ExamplePulgin::new("Example Plugin with out session", &packet, None).await; return (Some(packet), None); } } }