diff options
| author | zy <[email protected]> | 2023-10-08 17:35:34 +0000 |
|---|---|---|
| committer | zy <[email protected]> | 2023-10-08 17:35:34 +0000 |
| commit | 831eaa467b2a44c4aac503f6ae418bec6f70d95c (patch) | |
| tree | 5a2743d17270ee191441f440e82a491d74edd700 | |
| parent | 5932b2eed8e3700a69253e180487e0be3fc9470e (diff) | |
warp PacketCapture into a async stream.
| -rw-r--r-- | src/packet/capture.rs | 132 |
1 files changed, 127 insertions, 5 deletions
diff --git a/src/packet/capture.rs b/src/packet/capture.rs index 58fe348..7f37b30 100644 --- a/src/packet/capture.rs +++ b/src/packet/capture.rs @@ -1,7 +1,11 @@ use crate::thread::thread::ThreadContex; -use pcap::Capture; +use futures::{Future, Stream}; +use pcap::{Capture, Error, Packet}; use std::cell::RefCell; +use std::marker::PhantomData; +use std::pin::Pin; use std::rc::Rc; +use std::task::Poll; pub struct PacketCapture { pub capture: Capture<pcap::Active>, @@ -19,10 +23,7 @@ impl PacketCapture { PacketCapture { capture } } - pub fn poll_packet( - &mut self, - callback: fn(data: &[u8], len: u32) - ) { + pub fn poll_packet(&mut self, callback: fn(data: &[u8], len: u32)) { let mut packet_num = 0; while let Ok(packet) = self.capture.next_packet() { packet_num += 1; @@ -44,4 +45,125 @@ impl PacketCapture { println!("{:#?}", device); } } + + pub fn test<C: PacketCodec>(self, codec: C) -> PacketStream<C> { + PacketStream::new(self.capture, codec) + } +} + +pub struct PacketStream<C> { + pub capture: Capture<pcap::Active>, + codec: C, +} + +impl<C> PacketStream<C> { + pub fn new(capture: Capture<pcap::Active>, codec: C) -> Self { + PacketStream { capture, codec } + } +} + +pub trait PacketCodec { + type Item; + fn decode(&mut self, packet: Packet) -> Self::Item; +} + +pub struct BoxCodec; +impl PacketCodec for BoxCodec { + type Item = Box<[u8]>; + fn decode(&mut self, packet: Packet) -> Self::Item { + packet.data.into() + } +} + +impl<C> Unpin for PacketStream<C> {} + +impl<C: PacketCodec> Future for PacketStream<C> { + type Output = Result<C::Item, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + let stream = Pin::into_inner(self); + let codec = &mut stream.codec; + + match stream.capture.next_packet() { + Ok(p) => Poll::Ready(Ok(codec.decode(p))), + // Err(e @ Error::TimeoutExpired) => Err(io::Error::new(io::ErrorKind::WouldBlock, e)), + // Err(e) => Ok(Err(e)), + _ => Poll::Pending, + } + } +} + +impl<C: PacketCodec> futures::Stream for PacketStream<C> { + type Item = Result<C::Item, Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Self::Item>> { + let stream = Pin::into_inner(self); + let codec = &mut stream.codec; + + loop { + match stream.capture.next_packet() { + Ok(p) => return Poll::Ready(Some(Ok(codec.decode(p)))), + _ => continue, + } + } + } } + +// pub trait PacketCodec<'a> { +// type Item<'b>: 'a +// where +// Self: 'a, +// Self: 'b; +// fn decode(&'a mut self, packet: Packet<'a>) -> Self::Item<'a>; +// } + +// pub struct PacketStream<'a, C> { +// pub capture: Capture<pcap::Active>, +// codec: C, +// _phantom: PhantomData<&'a ()>, +// } + +// impl<'a, C> PacketStream<'a, C> { +// pub fn new(capture: Capture<pcap::Active>, codec: C) -> Self { +// PacketStream { +// capture, +// codec, +// _phantom: PhantomData, +// } +// } +// } + +// pub struct BoxCodec; +// impl<'a> PacketCodec<'a> for BoxCodec { +// type Item<'b> = Packet<'b> where Self: 'a, Self: 'b; +// fn decode(&'a mut self, packet: Packet<'a>) -> Self::Item<'a> { +// packet.data.into() +// } +// } + +// impl<C> Unpin for PacketStream<'_, C> {} + +// impl<'a, C> futures::Stream for PacketStream<'a, C> +// where +// C: PacketCodec<'a> + 'static, +// { +// type Item = Result<C::Item<'a>, Error>; + +// fn poll_next( +// self: Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> Poll<Option<Self::Item>> { +// let stream = Pin::into_inner(self); +// let codec = &mut stream.codec; + +// loop { +// match stream.capture.next_packet() { +// Ok(p) => return Poll::Ready(Some(Ok(codec.decode(p)))), +// _ => continue, +// } +// } +// } +// } |
