diff options
| author | chenzizhan <[email protected]> | 2023-09-06 18:04:17 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2023-09-06 18:04:17 +0800 |
| commit | 17c053f231e41db4e48be8885030b86ce0daee8e (patch) | |
| tree | 7050b2b2605e8df59c89fbe43323109018db996b | |
| parent | 43718ed34cf0c182a0a5a0dca7c53c199f67bf49 (diff) | |
restart with library
| -rw-r--r-- | .vscode/settings.json | 3 | ||||
| -rw-r--r-- | src/session/actions.rs | 90 | ||||
| -rw-r--r-- | src/session/mod.rs | 45 | ||||
| -rw-r--r-- | src/session/my_tcp_reassembly_deprecated.rs (renamed from src/session/tcp_session.rs) | 51 | ||||
| -rw-r--r-- | src/session/tcp_reassembly.rs | 1029 | ||||
| -rw-r--r-- | src/session/tcp_stream.rs | 3 | ||||
| -rw-r--r-- | src/session/timeout.rs | 16 |
7 files changed, 1063 insertions, 174 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json index 352a626..706d289 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,8 @@ { "rust-analyzer.linkedProjects": [ "./Cargo.toml" + ], + "cSpell.ignoreWords": [ + "Hasher" ] }
\ No newline at end of file diff --git a/src/session/actions.rs b/src/session/actions.rs deleted file mode 100644 index 602c9c9..0000000 --- a/src/session/actions.rs +++ /dev/null @@ -1,90 +0,0 @@ -// use std::net::{TcpListener, TcpStream}; -use super::tcp_session; -use enum_map::Enum; -use std::collections::BTreeMap; - -pub enum TcpEvent { - SessionClosing, - // todo -} - -impl Enum for TcpEvent{} - -pub(super) struct ActionManager { - // todo: read toml and config. - event_map: enum_map::EnumMap<TcpEvent, Vec<Box<dyn TcpAction>>>, -} - -impl ActionManager { - fn register(&self, event: TcpEvent, action: Box<dyn TcpAction>) { - self.event_map[event].push(action); - } - fn run_event(&self, event: TcpEvent, session: &tcp_session::Session) { - for action in self.event_map[event].iter() { - action.run(session); - } - } -} - -trait TcpAction{ - fn run(&self, session: &tcp_session::Session); -} - - -/* --------------------------------- SendCB --------------------------------- */ -pub enum SendTo { - SendConsole, -} - -pub(super) struct SendAction -{ - send_to: SendTo, -} - -impl SendAction { - pub fn new(send_to: SendTo) -> Self { - SendAction { - send_to, - } - } -} - -fn Contat_segment(buf: &mut Vec<u8>, segment: &BTreeMap<u32, tcp_session::TcpSegment>) { - for (_, segment) in segment.iter() { - buf.extend_from_slice(&segment.payload); // todo: peer 提供一个dump segment 的方法 - } -} - -impl TcpAction for SendCB { - fn run(&self, session: &tcp_session::Session) { - let mut buf: Vec<u8> = Vec::new(); - Contat_segment(&mut buf, &session.client.map); - - match self.send_to { - SendTo::SendConsole => { - println!("SendCB::run"); - println!("{:?}", buf); - }, - } - } -} - - -// // more call backs here - - - -// #[cfg(test)] -// mod tests { -// use super::*; - -// #[test] -// fn test_callbacks() { -// let action_manager = ActionManager{}; -// let action = action_manager.gen_action(Policy::TcpSendConsole); -// action.run(); - - -// println!("hahahahaczzzzzzx"); -// } -// }
\ No newline at end of file diff --git a/src/session/mod.rs b/src/session/mod.rs deleted file mode 100644 index 148ca75..0000000 --- a/src/session/mod.rs +++ /dev/null @@ -1,45 +0,0 @@ - - -pub mod tcp_session; -pub mod actions; - -// enum tcp_session_state { -// TCP_SESSION_STATE_NONE, -// TCP_SESSION_STATE_CONNECTING, -// TCP_SESSION_STATE_CONNECTED, -// TCP_SESSION_STATE_DISCONNECTED, -// TCP_SESSION_STATE_ERROR, -// } - -// struct Peer_addr { -// (ip, port): (Ipv4Addr, u16), -// } - -// struct tcp_session<T: tcp_session_state> { -// source: Peer_addr, -// destination: Peer_addr, - -// window_size: u16, -// used_window_size: u16, - -// phantom: PhantomData<T>, -// } - -// impl tcp_session<tcp_session_state::TCP_SESSION_STATE_NONE> { -// fn new(source: Peer_addr, destination: Peer_addr) -> Self { -// tcp_session { -// source, -// destination, -// window_size: 0, -// used_window_size: 0, -// phantom: PhantomData, -// } -// } -// } - -// struct session_manager { -// sessions: Vec<tcp_session>, // use tree map -// } - -// // todo: impl hash for session - diff --git a/src/session/tcp_session.rs b/src/session/my_tcp_reassembly_deprecated.rs index b3e84fc..a366142 100644 --- a/src/session/tcp_session.rs +++ b/src/session/my_tcp_reassembly_deprecated.rs @@ -29,6 +29,7 @@ enum TcpSessionErr { WrongFlag(String), NoWindowSpace, WrongSender(PeerRole), + SameSeqNum, FurtherCheckClosing, } @@ -48,7 +49,7 @@ impl TcpSegment { } } -struct Packet { +pub(super) struct Packet { ip_dst: Ipv4Addr, ip_src: Ipv4Addr, tcp_header: TcpHeader, @@ -56,6 +57,14 @@ struct Packet { } impl Packet { + pub fn new(ip_dst: Ipv4Addr, ip_src: Ipv4Addr, tcp_header: TcpHeader, segment: &[u8]) -> Packet { + Packet { + ip_dst: ip_dst, + ip_src: ip_src, + tcp_header: tcp_header, + segment: segment, + } + } fn is_sent_by(&self, peer: &Peer) -> bool { self.ip_src == peer.ip && self.tcp_header.source_port == peer.port } @@ -73,7 +82,7 @@ pub(super) struct Peer { isn: u32, // initial sequence number next_seq: u32, // next sequence number - total_win: u16, // window scale in tcp options + total_win: u16, // window size after scale used_win: u16, // used window size ip: Ipv4Addr, port: u16, @@ -124,12 +133,16 @@ impl Peer { fn as_receiver(&mut self, packet: &Packet) { self.ian = packet.tcp_header.seq_num; } - fn add_segment(&mut self, packet: &Packet) { + fn add_segment(&mut self, packet: &Packet) -> Result<(), TcpSessionErr>{ + if let Some(_) = self.map.get(&packet.tcp_header.seq_num) { + return Err(TcpSessionErr::SameSeqNum); + } self.map.insert(packet.tcp_header.seq_num, TcpSegment::new(packet.tcp_header.seq_num, packet.segment.payload.clone()) ); self.used_win += packet.segment.payload.len() as u16; + Ok(()) } } @@ -163,11 +176,11 @@ enum TransferringState { fn send_for_establised(sender: &mut Peer, packet: &Packet) -> Result<(), TcpSessionErr> { let payload_len = packet.segment.payload.len() as u16; - if sender.used_win + payload_len >= sender.total_win { + if sender.used_win + payload_len > sender.total_win { return Err(TcpSessionErr::NoWindowSpace); } if payload_len > 0 { - sender.add_segment(packet); + sender.add_segment(packet)?; } sender.next_seq += { if packet.tcp_header.flag_fin || packet.tcp_header.flag_rst { @@ -394,6 +407,13 @@ impl Session { if packet.is_sent_by(&self.server) { sender = PeerRole::Server; } + let (send_peer, receive_peer) = { + if sender == PeerRole::Client { + (&mut self.client, &mut self.server) + } else { + (&mut self.server, &mut self.client) + } + }; if let ExpectedSender::Fixed(role) = self.get_expected_sender(packet) { if role != sender { @@ -404,15 +424,8 @@ impl Session { match ret { Err(TcpSessionErr::FurtherCheckClosing) => { println!("Further check closing"); - let side_peer = { - if sender == PeerRole::Client { - &mut self.client - } else { - &mut self.server - } - }; - - if side_peer.state == SessionState::TimeWait { // receiver has been in timewait state, not an error + + if receive_peer.state == SessionState::TimeWait { // receiver has been in timewait state, not an error return None; } else { let e = TcpSessionErr::WrongFlag("A peer in closing state expects ack".to_string()); @@ -429,18 +442,16 @@ impl Session { } self.current_state = next_state; + let (sender_state, receiver_state) = self.current_state.get_state(); + send_peer.state = sender_state; + receive_peer.state = receiver_state; + return event; } } } } -trait VisitSession:TcpCallBackAction { - fn visit_session(&mut self, session: &Session); -} - -impl VisitSession for - #[cfg(test)] mod tests { use super::*; diff --git a/src/session/tcp_reassembly.rs b/src/session/tcp_reassembly.rs new file mode 100644 index 0000000..ff19e40 --- /dev/null +++ b/src/session/tcp_reassembly.rs @@ -0,0 +1,1029 @@ +use libpcap_tools::{Duration, Flow, FlowID}; +use pnet_macros_support::packet::Packet as PnetPacket; +use pnet_packet::tcp::{TcpFlags, TcpPacket}; +use std::cmp::Ordering; +use std::collections::{HashMap, VecDeque}; +use std::fmt; +use std::net::IpAddr; +use std::num::Wrapping; + +const EARLY_DETECT_OVERLAP: bool = false; + +#[derive(Debug, Eq, PartialEq)] +#[allow(dead_code)] +pub enum TcpStatus { + Closed = 0, + Listen, + SynSent, + SynRcv, + Established, + Closing, + CloseWait, + FinWait1, + FinWait2, + LastAck, + TimeWait, +} + +impl Default for TcpStatus { + fn default() -> Self { + TcpStatus::Closed + } +} + +#[derive(Debug)] +pub struct TcpSegment { + pub rel_seq: Wrapping<u32>, + pub rel_ack: Wrapping<u32>, + pub flags: u16, + pub data: Vec<u8>, + pub pcap_index: usize, +} + +impl TcpSegment { + /// Return the offset of the overlapping area if `self` (as left) overlaps on `right` + pub fn overlap_offset(&self, right: &TcpSegment) -> Option<usize> { + let next_seq = self.rel_seq + Wrapping(self.data.len() as u32); + if next_seq > right.rel_seq { + let overlap_offset = (right.rel_seq - self.rel_seq).0 as usize; + Some(overlap_offset) + } else { + None + } + } + + /// Splits the segment into two at the given offset. + /// + /// # Panics + /// + /// Panics if `offset > self.data.len()` + pub fn split_off(&mut self, offset: usize) -> TcpSegment { + debug_assert!(offset < self.data.len()); + let remaining = self.data.split_off(offset); + let rel_seq = self.rel_seq + Wrapping(offset as u32); + TcpSegment { + data: remaining, + rel_seq, + ..*self + } + } +} + +pub struct TcpPeer { + /// Initial Seq number (absolute) + isn: Wrapping<u32>, + /// Initial Ack number (absolute) + ian: Wrapping<u32>, + /// Next Seq number + next_rel_seq: Wrapping<u32>, + /// Last acknowledged number + last_rel_ack: Wrapping<u32>, + /// Connection state + status: TcpStatus, + /// The current list of segments (ordered by rel_seq) + segments: VecDeque<TcpSegment>, + /// DEBUG: host address + addr: IpAddr, + /// DEBUG: port + port: u16, +} + +impl TcpPeer { + fn insert_sorted(&mut self, s: TcpSegment) { + for (n, item) in self.segments.iter().enumerate() { + if item.rel_seq > s.rel_seq { + self.segments.insert(n, s); + return; + } + } + self.segments.push_back(s); + } +} + +pub struct TcpStream { + pub client: TcpPeer, + pub server: TcpPeer, + pub status: TcpStatus, + // XXX timestamp of last seen packet + pub last_seen_ts: Duration, +} + +pub struct TcpStreamReassembly { + pub m: HashMap<FlowID, TcpStream>, + + pub timeout: Duration, +} + +impl Default for TcpStreamReassembly { + fn default() -> Self { + TcpStreamReassembly { + m: HashMap::new(), + timeout: Duration::new(14400, 0), + } + } +} + +#[derive(Debug, Eq, PartialEq)] +pub enum TcpStreamError { + Anomaly, + /// Connection is OK, but sides are inverted + Inverted, + /// Packet received but connection has expired + Expired, + HandshakeFailed, +} + +impl TcpPeer { + pub fn new(addr: &IpAddr, port: u16) -> Self { + TcpPeer { + isn: Wrapping(0), + ian: Wrapping(0), + next_rel_seq: Wrapping(0), + last_rel_ack: Wrapping(0), + status: TcpStatus::Closed, + segments: VecDeque::new(), + addr: *addr, + port, + } + } +} + +impl TcpStream { + pub fn new(flow: &Flow) -> Self { + TcpStream { + client: TcpPeer::new(&flow.five_tuple.src, flow.five_tuple.src_port), + server: TcpPeer::new(&flow.five_tuple.dst, flow.five_tuple.dst_port), + status: TcpStatus::Closed, + last_seen_ts: flow.last_seen, + } + } + + pub fn handle_new_connection<'a>( + &mut self, + tcp: &'a TcpPacket, + to_server: bool, + pcap_index: usize, + ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> { + let seq = Wrapping(tcp.get_sequence()); + let ack = Wrapping(tcp.get_acknowledgement()); + let tcp_flags = tcp.get_flags(); + + let (mut src, mut dst) = if to_server { + (&mut self.client, &mut self.server) + } else { + (&mut self.server, &mut self.client) + }; + + match src.status { + // Client -- SYN --> Server + TcpStatus::Closed => { + if tcp_flags & TcpFlags::RST != 0 { + // TODO check if destination.segments must be removed + // client sent a RST, this is expected + return Ok(None); + } + if tcp_flags & TcpFlags::SYN == 0 { + // not a SYN - usually happens at start of pcap if missed SYN + warn!("First packet of a TCP stream is not a SYN"); + // test is ACK + data, and set established if possible + if tcp_flags & TcpFlags::ACK != 0 { + trace!("Trying to catch connection on the fly"); + src.isn = seq; + src.ian = ack; + src.next_rel_seq = Wrapping(0); + src.status = TcpStatus::Established; + dst.isn = ack; + dst.ian = seq; + dst.status = TcpStatus::Established; + dst.last_rel_ack = Wrapping(0); + self.status = TcpStatus::Established; + // queue segment (even if FIN, to get correct seq numbers) + let segment = TcpSegment { + rel_seq: Wrapping(0), + rel_ack: Wrapping(0), + flags: tcp_flags, + data: tcp.payload().to_vec(), // XXX data cloned here + pcap_index, + }; + queue_segment(src, segment); + + return Ok(None); + } + return Err(TcpStreamError::Anomaly); + } + if tcp_flags & TcpFlags::ACK != 0 { + warn!("First packet is SYN+ACK - missed SYN?"); + dst.isn = ack - Wrapping(1); + dst.status = TcpStatus::SynSent; + dst.next_rel_seq = Wrapping(1); + src.isn = seq; + src.ian = ack; + src.last_rel_ack = Wrapping(1); + src.next_rel_seq = Wrapping(1); + src.status = TcpStatus::Listen; + // swap sides and tell analyzer to do the same for flow + std::mem::swap(&mut self.client, &mut self.server); + return Err(TcpStreamError::Inverted); + } + src.isn = seq; + src.next_rel_seq = Wrapping(1); + dst.ian = seq; + self.status = TcpStatus::SynSent; + src.status = TcpStatus::SynSent; + dst.status = TcpStatus::Listen; + // do we have data ? + if !tcp.payload().is_empty() { + warn!("Data in handshake SYN"); + // conn.next_rel_seq += Wrapping(tcp.payload().len() as u32); + let segment = TcpSegment { + rel_seq: seq - src.isn, + rel_ack: ack - dst.isn, + flags: tcp_flags, + data: tcp.payload().to_vec(), // XXX data cloned here + pcap_index, + }; + queue_segment(src, segment); + } + } + // Server -- SYN+ACK --> Client + TcpStatus::Listen => { + if tcp_flags != (TcpFlags::SYN | TcpFlags::ACK) { + // XXX ? + } + // if we had data in SYN, add its length + let next_rel_seq = if dst.segments.is_empty() { + Wrapping(1) + } else { + Wrapping(1) + Wrapping(dst.segments[0].data.len() as u32) + }; + if ack != dst.isn + next_rel_seq { + warn!("NEW/SYN-ACK: ack number is wrong"); + return Err(TcpStreamError::HandshakeFailed); + } + src.isn = seq; + src.next_rel_seq = Wrapping(1); + dst.ian = seq; + dst.last_rel_ack = Wrapping(1); + + src.status = TcpStatus::SynRcv; + self.status = TcpStatus::SynRcv; + + // do not push data if we had some in SYN, it will be done after handshake succeeds + } + // Client -- ACK --> Server + TcpStatus::SynSent => { + if tcp_flags & TcpFlags::ACK == 0 { + if tcp_flags == TcpFlags::SYN { + // can be a SYN resend + if seq == src.isn && ack.0 == 0 { + trace!("SYN resend - ignoring"); + return Ok(None); + } + // can be a disordered handshake (receive S after SA) + if seq + Wrapping(1) == dst.ian { + trace!("Likely received SA before S - ignoring"); + return Ok(None); + } + } + warn!("Not an ACK"); + } + // TODO check seq, ack + if ack != dst.isn + Wrapping(1) { + warn!("NEW/ACK: ack number is wrong"); + return Err(TcpStreamError::HandshakeFailed); + } + src.status = TcpStatus::Established; + dst.status = TcpStatus::Established; + dst.last_rel_ack = Wrapping(1); + self.status = TcpStatus::Established; + // do we have data ? + if !tcp.payload().is_empty() { + // warn!("Data in handshake ACK"); + let segment = TcpSegment { + rel_seq: seq - src.isn, + rel_ack: ack - dst.isn, + flags: tcp_flags, + data: tcp.payload().to_vec(), // XXX data cloned here + pcap_index, + }; + queue_segment(src, segment); + } + } + TcpStatus::SynRcv => { + // we received something while in SYN_RCV state - we should only have sent ACK + // this could be a SYN+ACK retransmit + if tcp_flags == TcpFlags::SYN | TcpFlags::ACK { + // XXX compare SEQ numbers? + // ignore + return Ok(None); + } + warn!( + "Received unexpected data in SYN_RCV state idx={}", + pcap_index + ); + } + _ => unreachable!(), + } + Ok(None) + } + + pub fn handle_established_connection<'a>( + &mut self, + tcp: &'a TcpPacket, + to_server: bool, + pcap_index: usize, + ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> { + let (origin, destination) = if to_server { + (&mut self.client, &mut self.server) + } else { + (&mut self.server, &mut self.client) + }; + + let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn; + let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn; + let tcp_flags = tcp.get_flags(); + + trace!("EST: payload len={}", tcp.payload().len()); + trace!( + " Tcp rel seq {} ack {} next seq {}", + rel_seq, + rel_ack, + origin.next_rel_seq + ); + + if tcp_flags & TcpFlags::ACK == 0 && tcp.get_acknowledgement() != 0 { + warn!( + "EST/ packet without ACK (broken TCP implementation or attack) idx={}", + pcap_index + ); + // ignore segment + return Ok(None); + } + + let segment = TcpSegment { + rel_seq, + rel_ack, + flags: tcp_flags, + data: tcp.payload().to_vec(), // XXX data cloned here + pcap_index, + }; + queue_segment(origin, segment); + + // trace!("Destination: {:?}", destination); // TODO to remove + + // if there is a ACK, check & send segments on the *other* side + let ret = if tcp_flags & TcpFlags::ACK != 0 { + send_peer_segments(destination, rel_ack) + } else { + None + }; + + trace!( + " PEER EST rel next seq {} last_ack {}", + destination.next_rel_seq, + destination.last_rel_ack, + ); + + Ok(ret) + } + + fn handle_closing_connection( + &mut self, + tcp: &TcpPacket, + to_server: bool, + pcap_index: usize, + ) -> Option<Vec<TcpSegment>> { + let (mut origin, destination) = if to_server { + (&mut self.client, &mut self.server) + } else { + (&mut self.server, &mut self.client) + }; + + let tcp_flags = tcp.get_flags(); + let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn; + let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn; + let has_ack = tcp_flags & TcpFlags::ACK != 0; + let has_fin = tcp_flags & TcpFlags::FIN != 0; + + let ret = if has_ack { + trace!("ACKing segments up to {}", rel_ack); + send_peer_segments(destination, rel_ack) + } else { + if tcp.get_acknowledgement() != 0 { + warn!( + "EST/ packet without ACK (broken TCP implementation or attack) idx={}", + pcap_index + ); + // ignore segment + return None; + } + None + }; + if tcp_flags & TcpFlags::RST != 0 { + // if we get a RST, check the sequence number and remove matching segments + // trace!("RST received. rel_seq: {}", rel_seq); + // trace!( + // "{} remaining (undelivered) segments DESTINATION", + // destination.segments.len() + // ); + // for (n, s) in destination.segments.iter().enumerate() { + // trace!(" s[{}]: rel_seq={} plen={}", n, s.rel_seq, s.data.len()); + // } + // remove queued segments up to rel_seq + destination.segments.retain(|s| s.rel_ack != rel_seq); + trace!( + "RST: {} remaining (undelivered) segments DESTINATION after removal", + destination.segments.len() + ); + origin.status = TcpStatus::Closed; // XXX except if ACK ? + return ret; + } + + // queue segment (even if FIN, to get correct seq numbers) + let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn; + let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn; + let segment = TcpSegment { + rel_seq, + rel_ack, + flags: tcp_flags, + data: tcp.payload().to_vec(), // XXX data cloned here + pcap_index, + }; + queue_segment(origin, segment); + + // if tcp_flags & TcpFlags::FIN != 0 { + // warn!("origin next seq was {}", origin.next_rel_seq.0); + // origin.next_rel_seq += Wrapping(1); + // } + + match origin.status { + TcpStatus::Established => { + // we know there is a FIN (tested in TcpStreamReassembly::update) + origin.status = TcpStatus::FinWait1; + destination.status = TcpStatus::CloseWait; // we are not sure it was received + } + TcpStatus::CloseWait => { + if !has_fin { + // if only an ACK, do nothing and stay in CloseWait status + if has_ack { + // debug!("destination status: {:?}", destination.status); + if destination.status == TcpStatus::FinWait1 { + destination.status = TcpStatus::FinWait2; + } + } else { + warn!("Origin should have sent a FIN and/or ACK"); + } + } else { + origin.status = TcpStatus::LastAck; + // debug!("destination status: {:?}", destination.status); + if has_ack || destination.status == TcpStatus::FinWait2 { + destination.status = TcpStatus::TimeWait; + } else { + destination.status = TcpStatus::Closing; + } + } + } + TcpStatus::TimeWait => { + // only an ACK should be sent (XXX nothing else, maybe PSH) + if has_ack { + // this is the end! + origin.status = TcpStatus::Closed; + destination.status = TcpStatus::Closed; + } + } + _ => { + warn!( + "Unhandled closing transition: origin host {} status {:?}", + origin.addr, origin.status + ); + warn!( + " dest host {} status {:?}", + destination.addr, destination.status + ); + } + } + + trace!( + "TCP connection closing, {} remaining (undelivered) segments", + origin.segments.len() + ); + // DEBUG + for (n, s) in origin.segments.iter().enumerate() { + trace!( + " s[{}]: seq={} len={} idx={}", + n, + s.rel_seq.0, + s.data.len(), + s.pcap_index, + ); + } + + // TODO what now? + + if origin.segments.is_empty() { + return ret; + } + + ret + } + + // force expiration (for ex after timeout) of this stream + fn expire(&mut self) { + self.client.status = TcpStatus::Closed; + self.server.status = TcpStatus::Closed; + } +} // TcpStream + +fn queue_segment(peer: &mut TcpPeer, segment: TcpSegment) { + // only store segments with data, except FIN + if segment.data.is_empty() && segment.flags & TcpFlags::FIN == 0 { + return; + } + // // DEBUG + // for (n, s) in peer.segments.iter().enumerate() { + // debug!( + // " XXX peer s[{}]: rel_seq={} plen={}", + // n, + // s.rel_seq, + // s.data.len() + // ); + // } + // trivial case: list is empty - just push segment + if peer.segments.is_empty() { + trace!("Pushing segment (front)"); + peer.segments.push_front(segment); + return; + } + + if EARLY_DETECT_OVERLAP { + // find last element before candidate and first element after candidate + let mut before = None; + let mut after = None; + // let mut opt_pos = None; + for (_n, s) in peer.segments.iter().enumerate() { + if s.rel_seq < segment.rel_seq { + before = Some(s); + } else { + after = Some(s); + // opt_pos = Some(n); + break; + } + } + // trace!("tcp segment insertion index: {:?}", opt_pos); + // check for left overlap + if let Some(s) = before { + let next_seq = s.rel_seq + Wrapping(s.data.len() as u32); + match segment.rel_seq.cmp(&next_seq) { + Ordering::Equal => { + // XXX do nothing, simply queue segment + // // simple case: merge segment + // trace!( + // "Merging segments (seq {} and {})", + // s.rel_seq, + // segment.rel_seq + // ); + // s.data.extend_from_slice(&segment.data); + // s.rel_ack = segment.rel_ack; + // // XXX pcap_index should be a list (and append to it) + // // TODO check next segment in queue to test if a hole was filled + // return; + } + Ordering::Greater => { + // we have a hole + warn!("Missing segment on left of incoming segment"); + } + Ordering::Less => { + // Left overlap + warn!("Segment with left overlap"); + // let overlap_size = (next_seq - segment.rel_seq).0 as usize; + // debug_assert!(overlap_size <= s.data.len()); + // let overlap_start = s.data.len() - overlap_size; + // let overlap_left = &s.data[overlap_start..]; + // if overlap_left == &segment.data[..overlap_size] { + // info!( + // "TCP Segment with left overlap: area matches idx={}", + // segment.pcap_index + // ); + // trace!("Left overlap: removing {} bytes", overlap_size); + // // remove overlapping area and fix offset + // let new_data = segment.data.split_off(overlap_size); + // segment.data = new_data; + // segment.rel_seq += Wrapping(overlap_size as u32); + // } else { + // warn!( + // "TCP Segment with left overlap: area differs idx={}", + // segment.pcap_index + // ); + // // XXX keep new ? + // } + } + } + } + // check for right overlap + if let Some(s) = after { + let right_next_seq = segment.rel_seq + Wrapping(segment.data.len() as u32); + match right_next_seq.cmp(&s.rel_seq) { + Ordering::Equal => (), + Ordering::Greater => { + // Right overlap + warn!("Segment with right overlap"); + // let overlap_size = (right_next_seq - s.rel_seq).0 as usize; + // debug_assert!(overlap_size <= s.data.len()); + // let overlap_start = segment.data.len() - overlap_size; + // let overlap = &segment.data[overlap_start..]; + // let right_overlap = &s.data[..overlap_size]; + // if overlap == right_overlap { + // info!( + // "TCP Segment with right overlap: area matches idx={}", + // segment.pcap_index + // ); + // trace!("Right overlap: removing {} bytes", overlap_size); + // segment.data.truncate(overlap_start); + // } else { + // warn!( + // "TCP Segment with right overlap: area differs idx={}", + // segment.pcap_index + // ); + // // XXX keep new ? + // } + } + Ordering::Less => { + trace!( + "hole remaining on right of incoming segment idx={}", + segment.pcap_index + ); + } + } + } + // if segment.data.is_empty() && segment.flags & TcpFlags::FIN == 0 { + // trace!("No data after overlap, NOT queuing segment"); + // return; + // } + } + trace!("Adding segment"); + peer.insert_sorted(segment); +} + +fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<TcpSegment>> { + trace!( + "Trying to send segments for {}:{} up to {} (last ack: {})", + peer.addr, + peer.port, + rel_ack, + peer.last_rel_ack + ); + if rel_ack == peer.last_rel_ack { + trace!("re-acking last data, doing nothing"); + return None; + } + if peer.segments.is_empty() { + return None; + } + + // is ACK acceptable? + if rel_ack < peer.last_rel_ack { + warn!("ACK request for already ACKed data (ack < last_ack)"); + return None; + } + + // check consistency of segment ACK numbers + order and/or missing fragments and/or overlap + + let mut acked = Vec::new(); + + while !peer.segments.is_empty() { + let segment = &peer.segments[0]; + trace!( + "segment: rel_seq={} len={}", + segment.rel_seq, + segment.data.len() + ); + trace!( + " origin.next_rel_seq {} ack {}", + peer.next_rel_seq, + rel_ack + ); + // if origin.next_rel_seq > rel_ack { + // warn!("next_seq > ack - partial ACK ?"); + // unreachable!(); // XXX do we care about that case? + // // break; + // } + if rel_ack <= segment.rel_seq { + // if packet is in the past (strictly less), we don't care + break; + } + + // safety: segments is just tested above + let mut segment = peer.segments.pop_front().unwrap(); + + if rel_ack < segment.rel_seq + Wrapping(segment.data.len() as u32) { + // warn!("ACK lower then seq + segment size - SACK?"); + trace!("ACK for part of buffer"); + // split data and insert new dummy segment + trace!("rel_ack {} segment.rel_seq {}", rel_ack, segment.rel_seq); + trace!("segment data len {}", segment.data.len()); + let acked_len = (rel_ack - segment.rel_seq).0 as usize; + let new_segment = segment.split_off(acked_len); + trace!( + "insert new segment from {} len {}", + new_segment.rel_ack, + new_segment.data.len() + ); + peer.insert_sorted(new_segment); + } + + handle_overlap_linux(peer, &mut segment); + adjust_seq_numbers(peer, &segment); + + trace!( + "ACKed: pushing segment: rel_seq={} len={}", + segment.rel_seq, + segment.data.len(), + ); + if !segment.data.is_empty() { + acked.push(segment); + } + } + + if peer.next_rel_seq != rel_ack { + // missed segments, or maybe received FIN ? + warn!( + "TCP ACKed unseen segment next_seq {} != ack {} (Missed segments?)", + peer.next_rel_seq, rel_ack + ); + // TODO notify upper layer for missing data + } + + peer.last_rel_ack = rel_ack; + Some(acked) +} + +const FIRST_WINS: bool = false; + +// implements the "first segment wins" or the "last segment wins" policies +#[allow(dead_code)] +fn handle_overlap_first_last(peer: &mut TcpPeer, segment: &mut TcpSegment) { + // loop while segment has overlap + while let Some(next) = peer.segments.front() { + if let Some(overlap_offset) = segment.overlap_offset(next) { + let next_pcap_index = next.pcap_index; + warn!( + "segments overlaps next candidate (offset={})", + overlap_offset + ); + trace!("segment idx={}", segment.pcap_index); + // split segment at overlapping_offset + let mut segment_right = segment.split_off(overlap_offset); + let overlap_size; + // segment right can be greater, equal or smaller to next + match segment_right.data.len().cmp(&next.data.len()) { + Ordering::Less => { + // right_segment is smaller than next + overlap_size = segment_right.data.len(); + if segment_right.data[..] != next.data[..overlap_size] { + warn!( + "TCP overlapping data differ in packets idx={} and idx={}", + segment_right.pcap_index, next_pcap_index + ); + } + let first = peer.segments.front_mut().unwrap(); + let front_right = first.split_off(overlap_size); + trace!("front_right idx={}", front_right.pcap_index); + trace!("re-inserting remaining data (next)"); + peer.insert_sorted(front_right); + } + Ordering::Equal => { + if segment_right.data[..] != next.data[..] { + warn!( + "TCP overlapping data differ in packets idx={} and idx={}", + segment_right.pcap_index, next_pcap_index + ); + } + } + Ordering::Greater => { + // right_segment is longer than next + overlap_size = next.data.len(); + if segment_right.data[..overlap_size] != next.data[..] { + warn!( + "TCP overlapping data differ in packets idx={} and idx={}", + segment_right.pcap_index, next_pcap_index + ); + } + let rem = segment_right.split_off(overlap_size); + trace!("re-inserting remaining data (first)"); + peer.insert_sorted(rem); + } + } + // which part to keep ? segment_right or next ? + // trace!("FIRST_WINS: {}, l:{} r:{}", FIRST_WINS, segment.pcap_index, next_pcap_index); + // trace!("(before)\n{:?}", peer); + if FIRST_WINS ^ (segment.pcap_index > next_pcap_index) { + trace!("dropping next"); + let _ = peer.segments.pop_front(); + peer.insert_sorted(segment_right); + } else { + trace!("dropping first"); + drop(segment_right); + } + // trace!("(after)\n{:?}", peer); + } else { + break; + } + } +} + +// handle overlapping segments, using a linux-like policy +// Linux favors an original segment, EXCEPT when the subsequent begins before the original, +//or the subsequent segment begins the same and ends after the original segment. +#[allow(dead_code)] +fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) { + // loop while segment has overlap + while let Some(next) = peer.segments.front() { + if let Some(overlap_offset) = segment.overlap_offset(next) { + warn!( + "segment idx={} overlaps next candidate idx={} (at offset={})", + segment.pcap_index, next.pcap_index, overlap_offset + ); + // we will modify the subsequent segment (next) + // safety: element presence was tested in outer loop + let next = peer.segments.pop_front().unwrap(); + + // split next + let overlap_size = segment.data.len() - overlap_offset; + let min_overlap_size = std::cmp::min(overlap_size, next.data.len()); + // compare overlap area + if next.data[..min_overlap_size] + != segment.data[overlap_offset..overlap_offset + min_overlap_size] + { + warn!( + "Overlap area differs! left idx={} right idx={}", + segment.pcap_index, next.pcap_index + ); + } + if overlap_size >= next.data.len() { + // subsequent segment starts after and is smaller, so drop it + drop(next); + continue; + } + // otherwise, split next into left and right, drop left and accept right + let mut left = next; + let right = left.split_off(overlap_size); + // to accept right, merge it into segment + segment.data.extend_from_slice(&right.data); + } else { + // trace!("no overlap, break"); + break; + } + } +} + +fn adjust_seq_numbers(origin: &mut TcpPeer, segment: &TcpSegment) { + if !segment.data.is_empty() { + // adding length is wrong in case of overlap + // origin.next_rel_seq += Wrapping(segment.data.len() as u32); + origin.next_rel_seq = segment.rel_seq + Wrapping(segment.data.len() as u32); + } + + if segment.flags & TcpFlags::FIN != 0 { + // trace!("Segment has FIN"); + origin.next_rel_seq += Wrapping(1); + } +} + +impl TcpStreamReassembly { + pub(crate) fn update( + &mut self, + flow: &Flow, + tcp: &TcpPacket, + to_server: bool, + pcap_index: usize, + ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> { + trace!("5-t: {}", flow.five_tuple); + trace!(" flow id: {:x}", flow.flow_id); + trace!( + " seq: {:x} ack {:x}", + tcp.get_sequence(), + tcp.get_acknowledgement() + ); + + let mut stream = self + .m + .entry(flow.flow_id) + .or_insert_with(|| TcpStream::new(flow)); + trace!("stream state: {:?}", stream.status); + trace!("to_server: {}", to_server); + + // check time delay with previous packet before updating + if stream.last_seen_ts > flow.last_seen { + info!("packet received in past of stream idx={}", pcap_index); + } else if flow.last_seen - stream.last_seen_ts > self.timeout { + warn!("TCP stream received packet after timeout"); + stream.expire(); + return Err(TcpStreamError::Expired); + } + stream.last_seen_ts = flow.last_seen; + + let (origin, _destination) = if to_server { + (&stream.client, &stream.server) + } else { + (&stream.server, &stream.client) + }; + + trace!( + "origin: {}:{} status {:?}", + origin.addr, + origin.port, + origin.status + ); + debug_print_tcp_flags(tcp.get_flags()); + + match origin.status { + TcpStatus::Closed | TcpStatus::Listen | TcpStatus::SynSent | TcpStatus::SynRcv => { + stream.handle_new_connection(tcp, to_server, pcap_index) + } + TcpStatus::Established => { + // check for close request + if tcp.get_flags() & (TcpFlags::FIN | TcpFlags::RST) != 0 { + trace!("Requesting end of connection"); + Ok(stream.handle_closing_connection(tcp, to_server, pcap_index)) + } else { + stream.handle_established_connection(tcp, to_server, pcap_index) + } + } + _ => Ok(stream.handle_closing_connection(tcp, to_server, pcap_index)), + } + } + pub(crate) fn check_expired_connections(&mut self, now: Duration) { + for (flow_id, stream) in self.m.iter_mut() { + if now < stream.last_seen_ts { + warn!( + "stream.last_seen_ts is in the future for flow id {:x}", + flow_id + ); + continue; + } + if now - stream.last_seen_ts > self.timeout { + warn!("TCP stream timeout reached for flow {:x}", flow_id); + stream.expire(); + } + } + } +} + +pub(crate) fn finalize_tcp_streams(analyzer: &mut crate::analyzer::Analyzer) { + warn!("expiring all TCP connections"); + for (flow_id, _stream) in analyzer.tcp_defrag.m.iter() { + // TODO do we have anything to do? + if let Some(flow) = analyzer.flows.get_flow(*flow_id) { + debug!(" flow: {:?}", flow); + } + } + analyzer.tcp_defrag.m.clear(); +} + +fn debug_print_tcp_flags(tcp_flags: u16) { + if log::Level::Debug <= log::STATIC_MAX_LEVEL { + let mut s = String::from("tcp_flags: ["); + if tcp_flags & TcpFlags::SYN != 0 { + s += "S" + } + if tcp_flags & TcpFlags::FIN != 0 { + s += "F" + } + if tcp_flags & TcpFlags::RST != 0 { + s += "R" + } + if tcp_flags & TcpFlags::URG != 0 { + s += "U" + } + if tcp_flags & TcpFlags::PSH != 0 { + s += "P" + } + if tcp_flags & TcpFlags::ACK != 0 { + s += "A" + } + s += "]"; + trace!("{}", s); + } +} + +impl fmt::Debug for TcpPeer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "Peer: {}:{}", self.addr, self.port)?; + writeln!(f, " status: {:?}", self.status)?; + writeln!(f, " isn: 0x{:x} ian: 0x{:x}", self.isn, self.ian)?; + writeln!(f, " next_rel_seq: {}", self.next_rel_seq)?; + writeln!(f, " last_rel_ack: {}", self.last_rel_ack)?; + writeln!(f, " #segments: {}", self.segments.len())?; + for (n, s) in self.segments.iter().enumerate() { + writeln!( + f, + " s[{}]: rel_seq={} len={} idx={}", + n, + s.rel_seq, + s.data.len(), + s.pcap_index, + )?; + } + Ok(()) + } +} diff --git a/src/session/tcp_stream.rs b/src/session/tcp_stream.rs deleted file mode 100644 index 653d3b4..0000000 --- a/src/session/tcp_stream.rs +++ /dev/null @@ -1,3 +0,0 @@ - - -// tcpheader to stream, and many checks diff --git a/src/session/timeout.rs b/src/session/timeout.rs deleted file mode 100644 index 3aa4103..0000000 --- a/src/session/timeout.rs +++ /dev/null @@ -1,16 +0,0 @@ -/* ----------------------------------- 方案1 ---------------------------------- */ -// 比较直接的方法,是搞一个定时器,定期监听,但是这样就要上锁,或者原子操作 -// 有一个比较恶心的问题,如果我要让定时器的线程可以删除session,必须把session move 过去,那么一定得对整个session的句柄上锁。 -// 这个方法有一个最大的优点,比起方法3,它更通用,相当于是给所有定时器提供了一个方法。以后估计会有watchdog 之类的 。 -// https://doc.rust-lang.org/std/sync/atomic/ - -/* ----------------------------------- 方案2 ---------------------------------- */ -// 异步处理,session 开启,一直await。但是这么做,就感觉有点像是每个session都起一个concurrent 的感觉,就是tokio等等那套,起一个端口,要是超时自己关了。 -// 如果用worker job 异步模型组织session的话,这种方法是最好的。 -// https://docs.rs/async-io/latest/async_io/ - -/* ----------------------------------- 方案3 ---------------------------------- */ -// 每次包来的时候,都看一下所有session的情况,如果有的话就删掉。当然得有一些处理,比如有一个granularity,至少多长时间才检查一次,比如一次最多检查多少个session,等等。 -// 如果session数小,且session超时不多的话,这种方法是最好的,完全不考虑线程。 -// 参考reass:timeout.h:set_time() - |
