diff options
| author | chenzizhan <[email protected]> | 2023-09-07 17:04:16 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2023-09-07 17:04:16 +0800 |
| commit | 138fdfbed1b16dcd44b2d3e2ee5b19b103915bf0 (patch) | |
| tree | c802dd72dcecc5cf675d40c7adda78615bf5e79c | |
| parent | df7ba937f53b93275a705952f44a6f87a23bcb77 (diff) | |
wip
| -rw-r--r-- | src/main.rs | 26 | ||||
| -rw-r--r-- | src/session/duration.rs | 61 | ||||
| -rw-r--r-- | src/session/mod.rs | 3 | ||||
| -rw-r--r-- | src/session/tcp_reassembly.rs | 475 |
4 files changed, 306 insertions, 259 deletions
diff --git a/src/main.rs b/src/main.rs index 2eee4d3..e7a11a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,29 +1,3 @@ -struct Boo { - a: int32, -} - -struct Foo { - a: int32, -} - -fn boo_cb(boo: &Boo) { - println!("boo_cb"); -} - -fn foo_cb(foo: &Foo, a: int32) { - println!("foo_cb"); - println!("{}", a); -} - -enum Event { - CallBoo, - CallFoo, -} - -struct EventCb { - event: Event, -} - fn main() { println!("Hello, world!"); } diff --git a/src/session/duration.rs b/src/session/duration.rs new file mode 100644 index 0000000..67fc245 --- /dev/null +++ b/src/session/duration.rs @@ -0,0 +1,61 @@ +use std::ops::{Add,Sub}; + +/// Reimplementation of std::time::Duration, but panic-free +/// and partial, only to match our needs: +/// - use micros instead of nanos, avoid casts +/// - expose fields +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Debug)] +pub struct Duration { + pub secs: u32, + pub micros: u32, +} + +pub const MICROS_PER_SEC: u32 = 1_000_000; + +impl Duration { + /// Build Duration from secs and micros + pub fn new(secs:u32, micros:u32) -> Duration { + Duration{ secs, micros } + } + /// Test if Duration object is null + #[inline] + pub fn is_null(self) -> bool { + self.secs == 0 && self.micros == 0 + } +} + +impl Add for Duration { + type Output = Duration; + + #[allow(clippy::suspicious_arithmetic_impl)] + fn add(self, other: Duration) -> Self::Output { + let secs = self.secs.wrapping_add(other.secs); + let micros = self.micros.wrapping_add(other.micros); + let (secs,micros) = if micros > MICROS_PER_SEC { + (secs + (micros / MICROS_PER_SEC), micros % MICROS_PER_SEC) + } else { + (secs,micros) + }; + + Duration{ secs, micros } + } +} + +impl Sub for Duration { + type Output = Duration; + + #[allow(clippy::suspicious_arithmetic_impl)] + fn sub(self, other: Duration) -> Self::Output { + let secs = self.secs.wrapping_sub(other.secs); + let (secs,micros) = if self.micros >= other.micros { + (secs,self.micros - other.micros) + } else { + let diff = other.micros.wrapping_sub(self.micros); + let secs_less = diff / MICROS_PER_SEC; + let micros = MICROS_PER_SEC - diff; + (secs.wrapping_sub(1 + secs_less),micros) + }; + + Duration{ secs, micros } + } +} diff --git a/src/session/mod.rs b/src/session/mod.rs index d3a09dc..f91fd0a 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -1 +1,2 @@ -pub mod tcp_reassembly;
\ No newline at end of file +pub mod tcp_reassembly; +pub(super) mod duration;
\ No newline at end of file diff --git a/src/session/tcp_reassembly.rs b/src/session/tcp_reassembly.rs index 98d0164..4b74d21 100644 --- a/src/session/tcp_reassembly.rs +++ b/src/session/tcp_reassembly.rs @@ -1,14 +1,34 @@ -use pnet_macros_support::packet::Packet as PnetPacket; -use std::cmp::Ordering; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::fmt; -use std::net::IpAddr; +use std::net::{IpAddr, Ipv4Addr}; use std::num::Wrapping; -use pnet_packet::tcp::TcpFlags; +use super::duration::Duration; use crate::protocol::ipv4::IPv4Header; -use crate::protocol::tcp::TcpHeader; +use crate::protocol::tcp::{TcpHeader, TcpOption}; use crate::packet::packet::Encapsulation; +use crate::packet::packet::Packet as RawPacket; +const DEFAULT_TIMEOUT: Duration = Duration{secs:7200, micros:0}; // 120 min timeout, currently do not support option 28 parse +// todo: const TCP_OPTION_TIMESTAMPS: u8 = 28; https://datatracker.ietf.org/doc/rfc5482/ + +#[derive(Debug, Eq, PartialEq)] +pub enum TcpStreamError { + Anomaly, + /// Connection is OK, but sides are inverted + Inverted, + /// Packet received but connection has expired + Expired, + HandshakeFailed, +} + +pub enum TcpConnectionErr { + PacketNotV4Tcp, + WrongFlags, // todo返回当前的会话状态,以及期望的flag + // todo: TCP header to flag:u16 + ExpiredSession, + /// the seq number is wrong in the 2nd or 3rd handshake + HandshakeFailed, +} #[derive(Debug, Eq, PartialEq)] #[allow(dead_code)] @@ -32,6 +52,16 @@ impl Default for TcpStatus { } } +enum TcpFlags { + FIN = 0x01, + SYN = 0x02, + RST = 0x04, + PSH = 0x08, + ACK = 0x10, + URG = 0x20, +} + +#[derive(Debug)] struct TcpPacket { payload : Vec<u8>, ipv4_header : IPv4Header, @@ -59,19 +89,32 @@ impl TcpPacket { fn payload(&self) -> &[u8] { self.payload.as_slice() } + fn get_timestamp(&self) -> Duration { // todo: 感觉这个duration没什么用,改成u32 吧,单位是秒 + let mut timeVal:u32 = 0; + if let Some(options) = &self.tcp_header.options { + for option in options { + if let TcpOption::TIMESTAMPS{length, ts_value, ts_reply} = option { + timeVal = *ts_value; + } + } + } + Duration::new(timeVal, 0) + } } #[derive(Debug)] pub struct TcpSegment { pub rel_seq: Wrapping<u32>, pub rel_ack: Wrapping<u32>, - pub flags: u16, - pub data: Vec<u8>, + pub payload: Vec<u8>, + + // packet: TcpPacket, ipv4_header: IPv4Header, tcp_header: TcpHeader, + // todo: 替换成packet } -fn encapsulation_convert_to_my_packet(encapsulation: Vec<Encapsulation<'a>>) -> Option<TcpPacket> { +fn encapsulation_convert_to_my_packet(encapsulation: & Vec<Encapsulation<'_>>) -> Result<TcpPacket, TcpConnectionErr> { let mut payload = Vec::new(); let mut ipv4_header = Option::None; let mut tcp_header = Option::None; @@ -88,20 +131,20 @@ fn encapsulation_convert_to_my_packet(encapsulation: Vec<Encapsulation<'a>>) -> } } if ipv4_header.is_none() || tcp_header.is_none() { - return None; + return Err(TcpConnectionErr::PacketNotV4Tcp); } - Some(TcpPacket { + Ok(TcpPacket { payload, - ipv4_header: ipv4_header.unwrap(), - tcp_header: tcp_header.unwrap(), + ipv4_header: ipv4_header.unwrap().clone(), + tcp_header: tcp_header.unwrap().clone(), }) } impl TcpSegment { /// Return the offset of the overlapping area if `self` (as left) overlaps on `right` pub fn overlap_offset(&self, right: &TcpSegment) -> Option<usize> { - let next_seq = self.rel_seq + Wrapping(self.data.len() as u32); + let next_seq = self.rel_seq + Wrapping(self.payload.len() as u32); if next_seq > right.rel_seq { let overlap_offset = (right.rel_seq - self.rel_seq).0 as usize; Some(overlap_offset) @@ -114,15 +157,17 @@ impl TcpSegment { /// /// # Panics /// - /// Panics if `offset > self.data.len()` + /// Panics if `offset > self.payload.len()` pub fn split_off(&mut self, offset: usize) -> TcpSegment { - debug_assert!(offset < self.data.len()); - let remaining = self.data.split_off(offset); + assert!(offset < self.payload.len()); + let remaining = self.payload.split_off(offset); let rel_seq = self.rel_seq + Wrapping(offset as u32); TcpSegment { - data: remaining, + payload: remaining, rel_seq, - ..*self + rel_ack: self.rel_ack, + ipv4_header: self.ipv4_header.clone(), + tcp_header: self.tcp_header.clone(), } } } @@ -140,9 +185,9 @@ pub struct TcpPeer { status: TcpStatus, /// The current list of segments (ordered by rel_seq) segments: VecDeque<TcpSegment>, - /// DEBUG: host address - addr: IpAddr, - /// DEBUG: port + /// println: host address + addr: Ipv4Addr, + /// println: port port: u16, } @@ -158,41 +203,23 @@ impl TcpPeer { } } -pub struct TcpStream { +struct TcpStream { pub client: TcpPeer, pub server: TcpPeer, pub status: TcpStatus, - // XXX timestamp of last seen packet - pub last_seen_ts: Duration, + // from packet.option or timeval passed by api argument. Used to check timeout. + // the free of session is NOT decided by this value. The api user should decide it. + pub last_seen_ts: Duration, } -pub struct TcpStreamReassembly { - pub m: HashMap<FlowID, TcpStream>, +pub struct TcpConnection { + stream: TcpStream, pub timeout: Duration, } -impl Default for TcpStreamReassembly { - fn default() -> Self { - TcpStreamReassembly { - m: HashMap::new(), - timeout: Duration::new(14400, 0), - } - } -} - -#[derive(Debug, Eq, PartialEq)] -pub enum TcpStreamError { - Anomaly, - /// Connection is OK, but sides are inverted - Inverted, - /// Packet received but connection has expired - Expired, - HandshakeFailed, -} - impl TcpPeer { - pub fn new(addr: &IpAddr, port: u16) -> Self { + pub fn new(addr: &Ipv4Addr, port: u16) -> Self { TcpPeer { isn: Wrapping(0), ian: Wrapping(0), @@ -206,21 +233,25 @@ impl TcpPeer { } } +fn tmp_take_ownership(t: TcpHeader) { + +} + impl TcpStream { - pub fn new(flow: &Flow) -> Self { + pub fn new(packet: &TcpPacket) -> Self { TcpStream { - client: TcpPeer::new(&flow.five_tuple.src, flow.five_tuple.src_port), - server: TcpPeer::new(&flow.five_tuple.dst, flow.five_tuple.dst_port), + client: TcpPeer::new(&packet.ipv4_header.source_address, packet.tcp_header.source_port), + server: TcpPeer::new(&packet.ipv4_header.dest_address, packet.tcp_header.dest_port), status: TcpStatus::Closed, - last_seen_ts: flow.last_seen, + last_seen_ts: packet.get_timestamp(), } } - pub fn handle_new_connection<'a>( + fn handle_new_connection( &mut self, - tcp: &'a TcpPacket, + tcp: TcpPacket, to_server: bool, - ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> { + ) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> { let seq = Wrapping(tcp.get_sequence()); let ack = Wrapping(tcp.get_acknowledgement()); @@ -238,12 +269,12 @@ impl TcpStream { // client sent a RST, this is expected return Ok(None); } - if tcp.has_flag(TcpFlags::SYN) == 0 { + if !tcp.has_flag(TcpFlags::SYN) { // not a SYN - usually happens at start of pcap if missed SYN - warn!("First packet of a TCP stream is not a SYN"); + println!("First packet of a TCP stream is not a SYN"); // test is ACK + data, and set established if possible - if tcp.has_flag(TcpFlags::ACK) != 0 { - trace!("Trying to catch connection on the fly"); + if tcp.has_flag(TcpFlags::ACK) { + println!("the session has closed, but receive an established packet."); src.isn = seq; src.ian = ack; src.next_rel_seq = Wrapping(0); @@ -257,27 +288,19 @@ impl TcpStream { let segment = TcpSegment { rel_seq: Wrapping(0), rel_ack: Wrapping(0), - data: tcp.payload().to_vec(), // XXX data cloned here + payload: tcp.payload().to_vec(), + tcp_header: tcp.tcp_header, + ipv4_header: tcp.ipv4_header, }; queue_segment(src, segment); return Ok(None); } - return Err(TcpStreamError::Anomaly); + return Err(TcpConnectionErr::WrongFlags); } - if tcp.has_flag(TcpFlags::ACK) != 0 { - warn!("First packet is SYN+ACK - missed SYN?"); - dst.isn = ack - Wrapping(1); - dst.status = TcpStatus::SynSent; - dst.next_rel_seq = Wrapping(1); - src.isn = seq; - src.ian = ack; - src.last_rel_ack = Wrapping(1); - src.next_rel_seq = Wrapping(1); - src.status = TcpStatus::Listen; - // swap sides and tell analyzer to do the same for flow - std::mem::swap(&mut self.client, &mut self.server); - return Err(TcpStreamError::Inverted); + if tcp.has_flag(TcpFlags::ACK) { + println!("First packet is SYN+ACK"); + return Err(TcpConnectionErr::WrongFlags); } src.isn = seq; src.next_rel_seq = Wrapping(1); @@ -285,34 +308,35 @@ impl TcpStream { self.status = TcpStatus::SynSent; src.status = TcpStatus::SynSent; dst.status = TcpStatus::Listen; - // do we have data ? + if !tcp.payload().is_empty() { - warn!("Data in handshake SYN"); + println!("Data in handshake SYN"); // conn.next_rel_seq += Wrapping(tcp.payload().len() as u32); let segment = TcpSegment { rel_seq: seq - src.isn, rel_ack: ack - dst.isn, - flags: tcp_flags, - data: tcp.payload().to_vec(), // XXX data cloned here + payload: tcp.payload().to_vec(), + tcp_header: tcp.tcp_header, + ipv4_header: tcp.ipv4_header, }; queue_segment(src, segment); } } // Server -- SYN+ACK --> Client TcpStatus::Listen => { - if tcp.has_flag(TcpFlags::SYN) || tcp.has_flag(TcpFlags::ACK) == 0 { - warn!("Not a SYN or ACK"); - return Err(TcpStreamError::Anomaly); + if !tcp.has_flag(TcpFlags::SYN) && !tcp.has_flag(TcpFlags::ACK) { + println!("Not a SYN + ACK"); + return Err(TcpConnectionErr::WrongFlags); } // if we had data in SYN, add its length let next_rel_seq = if dst.segments.is_empty() { Wrapping(1) } else { - Wrapping(1) + Wrapping(dst.segments[0].data.len() as u32) + Wrapping(1) + Wrapping(dst.segments[0].payload.len() as u32) }; if ack != dst.isn + next_rel_seq { - warn!("NEW/SYN-ACK: ack number is wrong"); - return Err(TcpStreamError::HandshakeFailed); + println!("NEW/SYN-ACK: ack number is wrong"); + return Err(TcpConnectionErr::HandshakeFailed); } src.isn = seq; src.next_rel_seq = Wrapping(1); @@ -326,25 +350,25 @@ impl TcpStream { } // Client -- ACK --> Server TcpStatus::SynSent => { - if tcp.has_flag(TcpFlags::ACK) == 0 { + if !tcp.has_flag(TcpFlags::ACK) { if tcp.has_flag(TcpFlags::SYN) { // can be a SYN resend if seq == src.isn && ack.0 == 0 { - trace!("SYN resend - ignoring"); + println!("SYN resend - ignoring"); return Ok(None); } // can be a disordered handshake (receive S after SA) if seq + Wrapping(1) == dst.ian { - trace!("Likely received SA before S - ignoring"); + println!("Likely received SA before S - ignoring"); return Ok(None); } } - warn!("Not an ACK"); + println!("Not an ACK"); } - // TODO check seq, ack + if ack != dst.isn + Wrapping(1) { - warn!("NEW/ACK: ack number is wrong"); - return Err(TcpStreamError::HandshakeFailed); + println!("NEW/ACK: ack number is wrong"); + return Err(TcpConnectionErr::HandshakeFailed); } src.status = TcpStatus::Established; dst.status = TcpStatus::Established; @@ -352,12 +376,13 @@ impl TcpStream { self.status = TcpStatus::Established; // do we have data ? if !tcp.payload().is_empty() { - // warn!("Data in handshake ACK"); + // println!("Data in handshake ACK"); let segment = TcpSegment { rel_seq: seq - src.isn, rel_ack: ack - dst.isn, - flags: tcp_flags, - data: tcp.payload().to_vec(), // XXX data cloned here + payload: tcp.payload().to_vec(), // XXX data cloned here + tcp_header: tcp.tcp_header, + ipv4_header: tcp.ipv4_header, }; queue_segment(src, segment); } @@ -370,18 +395,19 @@ impl TcpStream { // ignore return Ok(None); } - warn!("Received unexpected data in SYN_RCV state"); + println!("Received unexpected data in SYN_RCV state"); + return Err(TcpConnectionErr::WrongFlags); } _ => unreachable!(), } Ok(None) } - pub fn handle_established_connection<'a>( + pub fn handle_established_connection( &mut self, - tcp: &'a TcpPacket, + tcp: TcpPacket, to_server: bool, - ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> { + ) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> { let (origin, destination) = if to_server { (&mut self.client, &mut self.server) } else { @@ -390,40 +416,42 @@ impl TcpStream { let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn; let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn; + let has_ack = tcp.has_flag(TcpFlags::ACK); // get it before borrowing tcp - trace!("EST: payload len={}", tcp.payload().len()); - trace!( + println!("EST: payload len={}", tcp.payload().len()); + println!( " Tcp rel seq {} ack {} next seq {}", rel_seq, rel_ack, origin.next_rel_seq ); - if tcp.has_flag(TcpFlags::ACK) == 0 && tcp.get_acknowledgement() != 0 { - warn!( - "EST/ packet without ACK (broken TCP implementation or attack)", + if !tcp.has_flag(TcpFlags::ACK) && tcp.get_acknowledgement() != 0 { + println!( + "Established state packet without ACK (broken TCP implementation or attack)", ); // ignore segment - return Ok(None); + return Err(TcpConnectionErr::WrongFlags); } let segment = TcpSegment { rel_seq, rel_ack, - data: tcp.payload().to_vec(), // XXX data cloned here + payload: tcp.payload().to_vec(), // XXX data cloned here + tcp_header: tcp.tcp_header, + ipv4_header: tcp.ipv4_header, }; queue_segment(origin, segment); - // trace!("Destination: {:?}", destination); // TODO to remove // if there is a ACK, check & send segments on the *other* side - let ret = if tcp.has_flag(TcpFlags::ACK) != 0 { + let ret = if has_ack { send_peer_segments(destination, rel_ack) } else { None }; - trace!( + println!( " PEER EST rel next seq {} last_ack {}", destination.next_rel_seq, destination.last_rel_ack, @@ -434,9 +462,9 @@ impl TcpStream { fn handle_closing_connection( &mut self, - tcp: &TcpPacket, + tcp: TcpPacket, to_server: bool, - ) -> Option<Vec<TcpSegment>> { + ) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> { let (mut origin, destination) = if to_server { (&mut self.client, &mut self.server) } else { @@ -449,36 +477,36 @@ impl TcpStream { let has_fin = tcp.has_flag(TcpFlags::FIN); let ret = if has_ack { - trace!("ACKing segments up to {}", rel_ack); + println!("ACKing segments up to {}", rel_ack); send_peer_segments(destination, rel_ack) } else { if tcp.get_acknowledgement() != 0 { - warn!( + println!( "EST/ packet without ACK (broken TCP implementation or attack)", ); // ignore segment - return None; + return Err(TcpConnectionErr::WrongFlags); } None }; if tcp.has_flag(TcpFlags::RST) { // if we get a RST, check the sequence number and remove matching segments - // trace!("RST received. rel_seq: {}", rel_seq); - // trace!( + // println!("RST received. rel_seq: {}", rel_seq); + // println!( // "{} remaining (undelivered) segments DESTINATION", // destination.segments.len() // ); // for (n, s) in destination.segments.iter().enumerate() { - // trace!(" s[{}]: rel_seq={} plen={}", n, s.rel_seq, s.data.len()); + // println!(" s[{}]: rel_seq={} plen={}", n, s.rel_seq, s.payload.len()); // } // remove queued segments up to rel_seq destination.segments.retain(|s| s.rel_ack != rel_seq); - trace!( + println!( "RST: {} remaining (undelivered) segments DESTINATION after removal", destination.segments.len() ); origin.status = TcpStatus::Closed; // XXX except if ACK ? - return ret; + return Ok(ret); } // queue segment (even if FIN, to get correct seq numbers) @@ -487,18 +515,20 @@ impl TcpStream { let segment = TcpSegment { rel_seq, rel_ack, - data: tcp.payload().to_vec(), // XXX data cloned here + payload: tcp.payload().to_vec(), // XXX data cloned here + tcp_header: tcp.tcp_header, + ipv4_header: tcp.ipv4_header, }; queue_segment(origin, segment); // if tcp_flags & TcpFlags::FIN != 0 { - // warn!("origin next seq was {}", origin.next_rel_seq.0); + // println!("origin next seq was {}", origin.next_rel_seq.0); // origin.next_rel_seq += Wrapping(1); // } match origin.status { TcpStatus::Established => { - // we know there is a FIN (tested in TcpStreamReassembly::update) + // we know there is a FIN (tested in TcpConnection::update) origin.status = TcpStatus::FinWait1; destination.status = TcpStatus::CloseWait; // we are not sure it was received } @@ -506,16 +536,16 @@ impl TcpStream { if !has_fin { // if only an ACK, do nothing and stay in CloseWait status if has_ack { - // debug!("destination status: {:?}", destination.status); + // println!("destination status: {:?}", destination.status); if destination.status == TcpStatus::FinWait1 { destination.status = TcpStatus::FinWait2; } } else { - warn!("Origin should have sent a FIN and/or ACK"); + println!("Origin should have sent a FIN and/or ACK"); } } else { origin.status = TcpStatus::LastAck; - // debug!("destination status: {:?}", destination.status); + // println!("destination status: {:?}", destination.status); if has_ack || destination.status == TcpStatus::FinWait2 { destination.status = TcpStatus::TimeWait; } else { @@ -532,38 +562,32 @@ impl TcpStream { } } _ => { - warn!( + println!( "Unhandled closing transition: origin host {} status {:?}", origin.addr, origin.status ); - warn!( + println!( " dest host {} status {:?}", destination.addr, destination.status ); } } - trace!( + println!( "TCP connection closing, {} remaining (undelivered) segments", origin.segments.len() ); - // DEBUG + // println for (n, s) in origin.segments.iter().enumerate() { - trace!( + println!( " s[{}]: seq={} len={}", n, s.rel_seq.0, - s.data.len(), + s.payload.len(), ); } - // TODO what now? - - if origin.segments.is_empty() { - return ret; - } - - ret + Ok(ret) } // force expiration (for ex after timeout) of this stream @@ -575,31 +599,33 @@ impl TcpStream { fn queue_segment(peer: &mut TcpPeer, segment: TcpSegment) { // only store segments with data, except FIN - if segment.data.is_empty() && segment.flags & TcpFlags::FIN == 0 { + if segment.payload.is_empty() && !segment.tcp_header.flag_fin { return; } - // // DEBUG + // // println // for (n, s) in peer.segments.iter().enumerate() { - // debug!( + // println!( // " XXX peer s[{}]: rel_seq={} plen={}", // n, // s.rel_seq, - // s.data.len() + // s.payload.len() // ); // } - // trivial case: list is empty - just push segment + + //todo: 老代码有一个 EARLY_DETECT_OVERLAP 不知道干嘛的 + if peer.segments.is_empty() { - trace!("Pushing segment (front)"); + println!("Pushing segment (front)"); peer.segments.push_front(segment); return; } - trace!("Adding segment"); + println!("Adding segment"); peer.insert_sorted(segment); } fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<TcpSegment>> { - trace!( + println!( "Trying to send segments for {}:{} up to {} (last ack: {})", peer.addr, peer.port, @@ -607,7 +633,7 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec< peer.last_rel_ack ); if rel_ack == peer.last_rel_ack { - trace!("re-acking last data, doing nothing"); + println!("re-acking last data, doing nothing"); return None; } if peer.segments.is_empty() { @@ -616,7 +642,7 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec< // is ACK acceptable? if rel_ack < peer.last_rel_ack { - warn!("ACK request for already ACKed data (ack < last_ack)"); + println!("ACK request for already ACKed data (ack < last_ack)"); return None; } @@ -626,18 +652,19 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec< while !peer.segments.is_empty() { let segment = &peer.segments[0]; - trace!( + println!( "segment: rel_seq={} len={}", segment.rel_seq, - segment.data.len() + segment.payload.len() ); - trace!( + println!( " origin.next_rel_seq {} ack {}", peer.next_rel_seq, rel_ack ); + // todo: // if origin.next_rel_seq > rel_ack { - // warn!("next_seq > ack - partial ACK ?"); + // println!("next_seq > ack - partial ACK ?"); // unreachable!(); // XXX do we care about that case? // // break; // } @@ -649,18 +676,18 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec< // safety: segments is just tested above let mut segment = peer.segments.pop_front().unwrap(); - if rel_ack < segment.rel_seq + Wrapping(segment.data.len() as u32) { - // warn!("ACK lower then seq + segment size - SACK?"); - trace!("ACK for part of buffer"); + if rel_ack < segment.rel_seq + Wrapping(segment.payload.len() as u32) { + // println!("ACK lower then seq + segment size - SACK?"); + println!("ACK for part of buffer"); // split data and insert new dummy segment - trace!("rel_ack {} segment.rel_seq {}", rel_ack, segment.rel_seq); - trace!("segment data len {}", segment.data.len()); + println!("rel_ack {} segment.rel_seq {}", rel_ack, segment.rel_seq); + println!("segment data len {}", segment.payload.len()); let acked_len = (rel_ack - segment.rel_seq).0 as usize; let new_segment = segment.split_off(acked_len); - trace!( + println!( "insert new segment from {} len {}", new_segment.rel_ack, - new_segment.data.len() + new_segment.payload.len() ); peer.insert_sorted(new_segment); } @@ -668,19 +695,19 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec< handle_overlap_linux(peer, &mut segment); adjust_seq_numbers(peer, &segment); - trace!( + println!( "ACKed: pushing segment: rel_seq={} len={}", segment.rel_seq, - segment.data.len(), + segment.payload.len(), ); - if !segment.data.is_empty() { + if !segment.payload.is_empty() { acked.push(segment); } } if peer.next_rel_seq != rel_ack { // missed segments, or maybe received FIN ? - warn!( + println!( "TCP ACKed unseen segment next_seq {} != ack {} (Missed segments?)", peer.next_rel_seq, rel_ack ); @@ -701,7 +728,7 @@ fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) { // loop while segment has overlap while let Some(next) = peer.segments.front() { if let Some(overlap_offset) = segment.overlap_offset(next) { - warn!( + println!( "overlaps next candidate (at offset={})", overlap_offset ); @@ -710,15 +737,15 @@ fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) { let next = peer.segments.pop_front().unwrap(); // split next - let overlap_size = segment.data.len() - overlap_offset; - let min_overlap_size = std::cmp::min(overlap_size, next.data.len()); + let overlap_size = segment.payload.len() - overlap_offset; + let min_overlap_size = std::cmp::min(overlap_size, next.payload.len()); // compare overlap area - if next.data[..min_overlap_size] - != segment.data[overlap_offset..overlap_offset + min_overlap_size] + if next.payload[..min_overlap_size] + != segment.payload[overlap_offset..overlap_offset + min_overlap_size] { - warn!("Overlap area differs!"); + println!("Overlap area differs!"); } - if overlap_size >= next.data.len() { + if overlap_size >= next.payload.len() { // subsequent segment starts after and is smaller, so drop it drop(next); continue; @@ -727,67 +754,76 @@ fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) { let mut left = next; let right = left.split_off(overlap_size); // to accept right, merge it into segment - segment.data.extend_from_slice(&right.data); + segment.payload.extend_from_slice(&right.payload); } else { - // trace!("no overlap, break"); + // println!("no overlap, break"); break; } } } fn adjust_seq_numbers(origin: &mut TcpPeer, segment: &TcpSegment) { - if !segment.data.is_empty() { + if !segment.payload.is_empty() { // adding length is wrong in case of overlap - // origin.next_rel_seq += Wrapping(segment.data.len() as u32); - origin.next_rel_seq = segment.rel_seq + Wrapping(segment.data.len() as u32); + // origin.next_rel_seq += Wrapping(segment.payload.len() as u32); + origin.next_rel_seq = segment.rel_seq + Wrapping(segment.payload.len() as u32); } - if segment.flags & TcpFlags::FIN != 0 { - // trace!("Segment has FIN"); + if segment.tcp_header.flag_fin { + // println!("Segment has FIN"); origin.next_rel_seq += Wrapping(1); } } -impl TcpStreamReassembly { - pub(crate) fn update( - &mut self, - flow: &Flow, - tcp: &TcpPacket, - to_server: bool, - ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> { - trace!("5-t: {}", flow.five_tuple); - trace!(" flow id: {:x}", flow.flow_id); - trace!( - " seq: {:x} ack {:x}", - tcp.get_sequence(), - tcp.get_acknowledgement() - ); +impl TcpConnection { + pub(crate) fn try_new(packet: &RawPacket) -> Result<Self, TcpConnectionErr> { + let simple_packet = encapsulation_convert_to_my_packet(&packet.encapsulation)?; + + let mut stream = TcpStream::new(&simple_packet); + let mut connection = TcpConnection { + stream, + timeout: DEFAULT_TIMEOUT, + }; + + if !simple_packet.has_flag(TcpFlags::SYN) { + return Err(TcpConnectionErr::WrongFlags); + } - let mut stream = self - .m - .entry(flow.flow_id) - .or_insert_with(|| TcpStream::new(flow)); - trace!("stream state: {:?}", stream.status); - trace!("to_server: {}", to_server); + connection._update(simple_packet)?; + Ok(connection) + } + + pub(crate) fn update(&mut self, packet: &RawPacket) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> { + let simple_packet = encapsulation_convert_to_my_packet(&packet.encapsulation)?; + self._update(simple_packet) + } + + fn _update(&mut self, tcp: TcpPacket) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> { + let mut stream = &mut self.stream; + println!("stream state: {:?}", stream.status); // check time delay with previous packet before updating - if stream.last_seen_ts > flow.last_seen { - info!("packet received in past"); - } else if flow.last_seen - stream.last_seen_ts > self.timeout { - warn!("TCP stream received packet after timeout"); + let packet_ts = tcp.get_timestamp(); + if stream.last_seen_ts > packet_ts { + println!("packet received in past"); + } else if packet_ts - stream.last_seen_ts > self.timeout { + println!("TCP stream received packet after timeout"); stream.expire(); - return Err(TcpStreamError::Expired); + return Err(TcpConnectionErr::ExpiredSession); } - stream.last_seen_ts = flow.last_seen; + stream.last_seen_ts = packet_ts; + // get origin and destination + let to_server = tcp.ipv4_header.source_address == stream.server.addr && + tcp.tcp_header.source_port == stream.server.port; + println!("to_server: {}", to_server); let (origin, _destination) = if to_server { (&stream.client, &stream.server) } else { (&stream.server, &stream.client) }; - println!( - "origin: {}:{} status {:?}", + println!("origin: {}:{} status {:?}", origin.addr, origin.port, origin.status @@ -800,41 +836,16 @@ impl TcpStreamReassembly { TcpStatus::Established => { // check for close request if tcp.has_flag(TcpFlags::FIN) || tcp.has_flag(TcpFlags::RST) { - trace!("Requesting end of connection"); - Ok(stream.handle_closing_connection(tcp, to_server)) + println!("Requesting end of connection"); + stream.handle_closing_connection(tcp, to_server) } else { stream.handle_established_connection(tcp, to_server) } } - _ => Ok(stream.handle_closing_connection(tcp, to_server)), + _ => stream.handle_closing_connection(tcp, to_server), } } - pub(crate) fn check_expired_connections(&mut self, now: Duration) { - for (flow_id, stream) in self.m.iter_mut() { - if now < stream.last_seen_ts { - warn!( - "stream.last_seen_ts is in the future for flow id {:x}", - flow_id - ); - continue; - } - if now - stream.last_seen_ts > self.timeout { - warn!("TCP stream timeout reached for flow {:x}", flow_id); - stream.expire(); - } - } - } -} -pub(crate) fn finalize_tcp_streams(analyzer: &mut crate::analyzer::Analyzer) { - warn!("expiring all TCP connections"); - for (flow_id, _stream) in analyzer.tcp_defrag.m.iter() { - // TODO do we have anything to do? - if let Some(flow) = analyzer.flows.get_flow(*flow_id) { - debug!(" flow: {:?}", flow); - } - } - analyzer.tcp_defrag.m.clear(); } impl fmt::Debug for TcpPeer { @@ -851,7 +862,7 @@ impl fmt::Debug for TcpPeer { " s[{}]: rel_seq={} len={}", n, s.rel_seq, - s.data.len(), + s.payload.len(), )?; } Ok(()) |
