summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzy <[email protected]>2023-10-08 17:35:34 +0000
committerzy <[email protected]>2023-10-08 17:35:34 +0000
commit831eaa467b2a44c4aac503f6ae418bec6f70d95c (patch)
tree5a2743d17270ee191441f440e82a491d74edd700
parent5932b2eed8e3700a69253e180487e0be3fc9470e (diff)
warp PacketCapture into a async stream.
-rw-r--r--src/packet/capture.rs132
1 files changed, 127 insertions, 5 deletions
diff --git a/src/packet/capture.rs b/src/packet/capture.rs
index 58fe348..7f37b30 100644
--- a/src/packet/capture.rs
+++ b/src/packet/capture.rs
@@ -1,7 +1,11 @@
use crate::thread::thread::ThreadContex;
-use pcap::Capture;
+use futures::{Future, Stream};
+use pcap::{Capture, Error, Packet};
use std::cell::RefCell;
+use std::marker::PhantomData;
+use std::pin::Pin;
use std::rc::Rc;
+use std::task::Poll;
pub struct PacketCapture {
pub capture: Capture<pcap::Active>,
@@ -19,10 +23,7 @@ impl PacketCapture {
PacketCapture { capture }
}
- pub fn poll_packet(
- &mut self,
- callback: fn(data: &[u8], len: u32)
- ) {
+ pub fn poll_packet(&mut self, callback: fn(data: &[u8], len: u32)) {
let mut packet_num = 0;
while let Ok(packet) = self.capture.next_packet() {
packet_num += 1;
@@ -44,4 +45,125 @@ impl PacketCapture {
println!("{:#?}", device);
}
}
+
+ pub fn test<C: PacketCodec>(self, codec: C) -> PacketStream<C> {
+ PacketStream::new(self.capture, codec)
+ }
+}
+
+pub struct PacketStream<C> {
+ pub capture: Capture<pcap::Active>,
+ codec: C,
+}
+
+impl<C> PacketStream<C> {
+ pub fn new(capture: Capture<pcap::Active>, codec: C) -> Self {
+ PacketStream { capture, codec }
+ }
+}
+
+pub trait PacketCodec {
+ type Item;
+ fn decode(&mut self, packet: Packet) -> Self::Item;
+}
+
+pub struct BoxCodec;
+impl PacketCodec for BoxCodec {
+ type Item = Box<[u8]>;
+ fn decode(&mut self, packet: Packet) -> Self::Item {
+ packet.data.into()
+ }
+}
+
+impl<C> Unpin for PacketStream<C> {}
+
+impl<C: PacketCodec> Future for PacketStream<C> {
+ type Output = Result<C::Item, Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ let stream = Pin::into_inner(self);
+ let codec = &mut stream.codec;
+
+ match stream.capture.next_packet() {
+ Ok(p) => Poll::Ready(Ok(codec.decode(p))),
+ // Err(e @ Error::TimeoutExpired) => Err(io::Error::new(io::ErrorKind::WouldBlock, e)),
+ // Err(e) => Ok(Err(e)),
+ _ => Poll::Pending,
+ }
+ }
+}
+
+impl<C: PacketCodec> futures::Stream for PacketStream<C> {
+ type Item = Result<C::Item, Error>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let stream = Pin::into_inner(self);
+ let codec = &mut stream.codec;
+
+ loop {
+ match stream.capture.next_packet() {
+ Ok(p) => return Poll::Ready(Some(Ok(codec.decode(p)))),
+ _ => continue,
+ }
+ }
+ }
}
+
+// pub trait PacketCodec<'a> {
+// type Item<'b>: 'a
+// where
+// Self: 'a,
+// Self: 'b;
+// fn decode(&'a mut self, packet: Packet<'a>) -> Self::Item<'a>;
+// }
+
+// pub struct PacketStream<'a, C> {
+// pub capture: Capture<pcap::Active>,
+// codec: C,
+// _phantom: PhantomData<&'a ()>,
+// }
+
+// impl<'a, C> PacketStream<'a, C> {
+// pub fn new(capture: Capture<pcap::Active>, codec: C) -> Self {
+// PacketStream {
+// capture,
+// codec,
+// _phantom: PhantomData,
+// }
+// }
+// }
+
+// pub struct BoxCodec;
+// impl<'a> PacketCodec<'a> for BoxCodec {
+// type Item<'b> = Packet<'b> where Self: 'a, Self: 'b;
+// fn decode(&'a mut self, packet: Packet<'a>) -> Self::Item<'a> {
+// packet.data.into()
+// }
+// }
+
+// impl<C> Unpin for PacketStream<'_, C> {}
+
+// impl<'a, C> futures::Stream for PacketStream<'a, C>
+// where
+// C: PacketCodec<'a> + 'static,
+// {
+// type Item = Result<C::Item<'a>, Error>;
+
+// fn poll_next(
+// self: Pin<&mut Self>,
+// cx: &mut std::task::Context<'_>,
+// ) -> Poll<Option<Self::Item>> {
+// let stream = Pin::into_inner(self);
+// let codec = &mut stream.codec;
+
+// loop {
+// match stream.capture.next_packet() {
+// Ok(p) => return Poll::Ready(Some(Ok(codec.decode(p)))),
+// _ => continue,
+// }
+// }
+// }
+// }