author    chenzizhan <[email protected]>  2023-09-06 18:04:17 +0800
committer chenzizhan <[email protected]>  2023-09-06 18:04:17 +0800
commit    17c053f231e41db4e48be8885030b86ce0daee8e (patch)
tree      7050b2b2605e8df59c89fbe43323109018db996b
parent    43718ed34cf0c182a0a5a0dca7c53c199f67bf49 (diff)
restart with library
-rw-r--r--  .vscode/settings.json                           3
-rw-r--r--  src/session/actions.rs                         90
-rw-r--r--  src/session/mod.rs                             45
-rw-r--r--  src/session/my_tcp_reassembly_deprecated.rs (renamed from src/session/tcp_session.rs)  51
-rw-r--r--  src/session/tcp_reassembly.rs                1029
-rw-r--r--  src/session/tcp_stream.rs                       3
-rw-r--r--  src/session/timeout.rs                         16
7 files changed, 1063 insertions(+), 174 deletions(-)
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 352a626..706d289 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,5 +1,8 @@
 {
     "rust-analyzer.linkedProjects": [
         "./Cargo.toml"
-    ]
+    ],
+    "cSpell.ignoreWords": [
+        "Hasher"
+    ]
 }
\ No newline at end of file
diff --git a/src/session/actions.rs b/src/session/actions.rs
deleted file mode 100644
index 602c9c9..0000000
--- a/src/session/actions.rs
+++ /dev/null
@@ -1,90 +0,0 @@
-// use std::net::{TcpListener, TcpStream};
-use super::tcp_session;
-use enum_map::Enum;
-use std::collections::BTreeMap;
-
-pub enum TcpEvent {
- SessionClosing,
- // todo
-}
-
-impl Enum for TcpEvent{}
-
-pub(super) struct ActionManager {
- // todo: read toml and config.
- event_map: enum_map::EnumMap<TcpEvent, Vec<Box<dyn TcpAction>>>,
-}
-
-impl ActionManager {
- fn register(&self, event: TcpEvent, action: Box<dyn TcpAction>) {
- self.event_map[event].push(action);
- }
- fn run_event(&self, event: TcpEvent, session: &tcp_session::Session) {
- for action in self.event_map[event].iter() {
- action.run(session);
- }
- }
-}
-
-trait TcpAction{
- fn run(&self, session: &tcp_session::Session);
-}
-
-
-/* --------------------------------- SendCB --------------------------------- */
-pub enum SendTo {
- SendConsole,
-}
-
-pub(super) struct SendAction
-{
- send_to: SendTo,
-}
-
-impl SendAction {
- pub fn new(send_to: SendTo) -> Self {
- SendAction {
- send_to,
- }
- }
-}
-
-fn Contat_segment(buf: &mut Vec<u8>, segment: &BTreeMap<u32, tcp_session::TcpSegment>) {
- for (_, segment) in segment.iter() {
- buf.extend_from_slice(&segment.payload); // todo: have Peer provide a method to dump its segments
- }
-}
-
-impl TcpAction for SendCB {
- fn run(&self, session: &tcp_session::Session) {
- let mut buf: Vec<u8> = Vec::new();
- Contat_segment(&mut buf, &session.client.map);
-
- match self.send_to {
- SendTo::SendConsole => {
- println!("SendCB::run");
- println!("{:?}", buf);
- },
- }
- }
-}
-
-
-// // more call backs here
-
-
-
-// #[cfg(test)]
-// mod tests {
-// use super::*;
-
-// #[test]
-// fn test_callbacks() {
-// let action_manager = ActionManager{};
-// let action = action_manager.gen_action(Policy::TcpSendConsole);
-// action.run();
-
-
-// println!("hahahahaczzzzzzx");
-// }
-// }
\ No newline at end of file
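
The deleted actions.rs above does not compile as written: impl Enum for TcpEvent {} supplies none of the trait's required items, register takes &self yet pushes into the map, and the TcpAction impl targets a SendCB type while the struct is named SendAction. A minimal sketch of the event-to-action registry it was aiming for, using a plain HashMap and a placeholder Session type (both assumptions for illustration, not part of this commit):

// Sketch of the registry the deleted actions.rs was aiming for.
// `Session` is a hypothetical stand-in for tcp_session::Session.
use std::collections::HashMap;

pub struct Session;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TcpEvent {
    SessionClosing,
}

pub trait TcpAction {
    fn run(&self, session: &Session);
}

#[derive(Default)]
pub struct ActionManager {
    event_map: HashMap<TcpEvent, Vec<Box<dyn TcpAction>>>,
}

impl ActionManager {
    // Registration needs `&mut self`; the original `&self` version could not
    // have pushed into the map.
    pub fn register(&mut self, event: TcpEvent, action: Box<dyn TcpAction>) {
        self.event_map.entry(event).or_default().push(action);
    }

    pub fn run_event(&self, event: TcpEvent, session: &Session) {
        if let Some(actions) = self.event_map.get(&event) {
            for action in actions {
                action.run(session);
            }
        }
    }
}

pub struct SendConsoleAction;

impl TcpAction for SendConsoleAction {
    fn run(&self, _session: &Session) {
        println!("session closing");
    }
}
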
diff --git a/src/session/mod.rs b/src/session/mod.rs
deleted file mode 100644
index 148ca75..0000000
--- a/src/session/mod.rs
+++ /dev/null
@@ -1,45 +0,0 @@
-
-
-pub mod tcp_session;
-pub mod actions;
-
-// enum tcp_session_state {
-// TCP_SESSION_STATE_NONE,
-// TCP_SESSION_STATE_CONNECTING,
-// TCP_SESSION_STATE_CONNECTED,
-// TCP_SESSION_STATE_DISCONNECTED,
-// TCP_SESSION_STATE_ERROR,
-// }
-
-// struct Peer_addr {
-// (ip, port): (Ipv4Addr, u16),
-// }
-
-// struct tcp_session<T: tcp_session_state> {
-// source: Peer_addr,
-// destination: Peer_addr,
-
-// window_size: u16,
-// used_window_size: u16,
-
-// phantom: PhantomData<T>,
-// }
-
-// impl tcp_session<tcp_session_state::TCP_SESSION_STATE_NONE> {
-// fn new(source: Peer_addr, destination: Peer_addr) -> Self {
-// tcp_session {
-// source,
-// destination,
-// window_size: 0,
-// used_window_size: 0,
-// phantom: PhantomData,
-// }
-// }
-// }
-
-// struct session_manager {
-// sessions: Vec<tcp_session>, // use tree map
-// }
-
-// // todo: impl hash for session
-
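
The commented-out design above tries to parameterize tcp_session by an enum variant (tcp_session<tcp_session_state::TCP_SESSION_STATE_NONE>), which Rust does not allow; the usual typestate encoding uses zero-sized marker types behind PhantomData, so that invalid transitions simply have no method. A minimal sketch under that assumption (all names invented for illustration):

// Typestate sketch: connection states as zero-sized marker types.
use std::marker::PhantomData;
use std::net::Ipv4Addr;

pub struct Closed;
pub struct Connected;

pub struct TcpSession<State> {
    source: (Ipv4Addr, u16),
    destination: (Ipv4Addr, u16),
    window_size: u16,
    _state: PhantomData<State>,
}

impl TcpSession<Closed> {
    pub fn new(source: (Ipv4Addr, u16), destination: (Ipv4Addr, u16)) -> Self {
        TcpSession { source, destination, window_size: 0, _state: PhantomData }
    }

    // Consuming self moves the session into the Connected state; a Closed
    // session exposes no send/receive methods to misuse.
    pub fn connect(self, window_size: u16) -> TcpSession<Connected> {
        TcpSession {
            source: self.source,
            destination: self.destination,
            window_size,
            _state: PhantomData,
        }
    }
}
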
diff --git a/src/session/tcp_session.rs b/src/session/my_tcp_reassembly_deprecated.rs
index b3e84fc..a366142 100644
--- a/src/session/tcp_session.rs
+++ b/src/session/my_tcp_reassembly_deprecated.rs
@@ -29,6 +29,7 @@ enum TcpSessionErr {
WrongFlag(String),
NoWindowSpace,
WrongSender(PeerRole),
+ SameSeqNum,
FurtherCheckClosing,
}
@@ -48,7 +49,7 @@ impl TcpSegment {
}
}
-struct Packet {
+pub(super) struct Packet {
ip_dst: Ipv4Addr,
ip_src: Ipv4Addr,
tcp_header: TcpHeader,
@@ -56,6 +57,14 @@ struct Packet {
}
impl Packet {
+ pub fn new(ip_dst: Ipv4Addr, ip_src: Ipv4Addr, tcp_header: TcpHeader, segment: &[u8]) -> Packet {
+ Packet {
+ ip_dst: ip_dst,
+ ip_src: ip_src,
+ tcp_header: tcp_header,
+ segment: segment,
+ }
+ }
fn is_sent_by(&self, peer: &Peer) -> bool {
self.ip_src == peer.ip && self.tcp_header.source_port == peer.port
}
@@ -73,7 +82,7 @@ pub(super) struct Peer {
isn: u32, // initial sequence number
next_seq: u32, // next sequence number
- total_win: u16, // window scale in tcp options
+ total_win: u16, // window size after scale
used_win: u16, // used window size
ip: Ipv4Addr,
port: u16,
@@ -124,12 +133,16 @@ impl Peer {
fn as_receiver(&mut self, packet: &Packet) {
self.ian = packet.tcp_header.seq_num;
}
- fn add_segment(&mut self, packet: &Packet) {
+ fn add_segment(&mut self, packet: &Packet) -> Result<(), TcpSessionErr>{
+ if let Some(_) = self.map.get(&packet.tcp_header.seq_num) {
+ return Err(TcpSessionErr::SameSeqNum);
+ }
self.map.insert(packet.tcp_header.seq_num,
TcpSegment::new(packet.tcp_header.seq_num, packet.segment.payload.clone())
);
self.used_win += packet.segment.payload.len() as u16;
+ Ok(())
}
}
@@ -163,11 +176,11 @@ enum TransferringState {
fn send_for_establised(sender: &mut Peer, packet: &Packet) -> Result<(), TcpSessionErr> {
let payload_len = packet.segment.payload.len() as u16;
- if sender.used_win + payload_len >= sender.total_win {
+ if sender.used_win + payload_len > sender.total_win {
return Err(TcpSessionErr::NoWindowSpace);
}
if payload_len > 0 {
- sender.add_segment(packet);
+ sender.add_segment(packet)?;
}
sender.next_seq += {
if packet.tcp_header.flag_fin || packet.tcp_header.flag_rst {
@@ -394,6 +407,13 @@ impl Session {
if packet.is_sent_by(&self.server) {
sender = PeerRole::Server;
}
+ let (send_peer, receive_peer) = {
+ if sender == PeerRole::Client {
+ (&mut self.client, &mut self.server)
+ } else {
+ (&mut self.server, &mut self.client)
+ }
+ };
if let ExpectedSender::Fixed(role) = self.get_expected_sender(packet) {
if role != sender {
@@ -404,15 +424,8 @@ impl Session {
match ret {
Err(TcpSessionErr::FurtherCheckClosing) => {
println!("Further check closing");
- let side_peer = {
- if sender == PeerRole::Client {
- &mut self.client
- } else {
- &mut self.server
- }
- };
-
- if side_peer.state == SessionState::TimeWait { // receiver has been in timewait state, not an error
+
+ if receive_peer.state == SessionState::TimeWait { // receiver has been in timewait state, not an error
return None;
} else {
let e = TcpSessionErr::WrongFlag("A peer in closing state expects ack".to_string());
@@ -429,18 +442,16 @@ impl Session {
}
self.current_state = next_state;
+ let (sender_state, receiver_state) = self.current_state.get_state();
+ send_peer.state = sender_state;
+ receive_peer.state = receiver_state;
+
return event;
}
}
}
}
-trait VisitSession:TcpCallBackAction {
- fn visit_session(&mut self, session: &Session);
-}
-
-impl VisitSession for
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/session/tcp_reassembly.rs b/src/session/tcp_reassembly.rs
new file mode 100644
index 0000000..ff19e40
--- /dev/null
+++ b/src/session/tcp_reassembly.rs
@@ -0,0 +1,1029 @@
+use libpcap_tools::{Duration, Flow, FlowID};
+use pnet_macros_support::packet::Packet as PnetPacket;
+use pnet_packet::tcp::{TcpFlags, TcpPacket};
+use std::cmp::Ordering;
+use std::collections::{HashMap, VecDeque};
+use std::fmt;
+use std::net::IpAddr;
+use std::num::Wrapping;
+
+const EARLY_DETECT_OVERLAP: bool = false;
+
+#[derive(Debug, Eq, PartialEq)]
+#[allow(dead_code)]
+pub enum TcpStatus {
+ Closed = 0,
+ Listen,
+ SynSent,
+ SynRcv,
+ Established,
+ Closing,
+ CloseWait,
+ FinWait1,
+ FinWait2,
+ LastAck,
+ TimeWait,
+}
+
+impl Default for TcpStatus {
+ fn default() -> Self {
+ TcpStatus::Closed
+ }
+}
+
+#[derive(Debug)]
+pub struct TcpSegment {
+ pub rel_seq: Wrapping<u32>,
+ pub rel_ack: Wrapping<u32>,
+ pub flags: u16,
+ pub data: Vec<u8>,
+ pub pcap_index: usize,
+}
+
+impl TcpSegment {
+ /// Return the offset of the overlapping area if `self` (as left) overlaps on `right`
+ pub fn overlap_offset(&self, right: &TcpSegment) -> Option<usize> {
+ let next_seq = self.rel_seq + Wrapping(self.data.len() as u32);
+ if next_seq > right.rel_seq {
+ let overlap_offset = (right.rel_seq - self.rel_seq).0 as usize;
+ Some(overlap_offset)
+ } else {
+ None
+ }
+ }
+
+ /// Splits the segment into two at the given offset.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `offset > self.data.len()`
+ pub fn split_off(&mut self, offset: usize) -> TcpSegment {
+ debug_assert!(offset < self.data.len());
+ let remaining = self.data.split_off(offset);
+ let rel_seq = self.rel_seq + Wrapping(offset as u32);
+ TcpSegment {
+ data: remaining,
+ rel_seq,
+ ..*self
+ }
+ }
+}
+
+pub struct TcpPeer {
+ /// Initial Seq number (absolute)
+ isn: Wrapping<u32>,
+ /// Initial Ack number (absolute)
+ ian: Wrapping<u32>,
+ /// Next Seq number
+ next_rel_seq: Wrapping<u32>,
+ /// Last acknowledged number
+ last_rel_ack: Wrapping<u32>,
+ /// Connection state
+ status: TcpStatus,
+ /// The current list of segments (ordered by rel_seq)
+ segments: VecDeque<TcpSegment>,
+ /// DEBUG: host address
+ addr: IpAddr,
+ /// DEBUG: port
+ port: u16,
+}
+
+impl TcpPeer {
+ fn insert_sorted(&mut self, s: TcpSegment) {
+ for (n, item) in self.segments.iter().enumerate() {
+ if item.rel_seq > s.rel_seq {
+ self.segments.insert(n, s);
+ return;
+ }
+ }
+ self.segments.push_back(s);
+ }
+}
+
+pub struct TcpStream {
+ pub client: TcpPeer,
+ pub server: TcpPeer,
+ pub status: TcpStatus,
+ // XXX timestamp of last seen packet
+ pub last_seen_ts: Duration,
+}
+
+pub struct TcpStreamReassembly {
+ pub m: HashMap<FlowID, TcpStream>,
+
+ pub timeout: Duration,
+}
+
+impl Default for TcpStreamReassembly {
+ fn default() -> Self {
+ TcpStreamReassembly {
+ m: HashMap::new(),
+ timeout: Duration::new(14400, 0),
+ }
+ }
+}
+
+#[derive(Debug, Eq, PartialEq)]
+pub enum TcpStreamError {
+ Anomaly,
+ /// Connection is OK, but sides are inverted
+ Inverted,
+ /// Packet received but connection has expired
+ Expired,
+ HandshakeFailed,
+}
+
+impl TcpPeer {
+ pub fn new(addr: &IpAddr, port: u16) -> Self {
+ TcpPeer {
+ isn: Wrapping(0),
+ ian: Wrapping(0),
+ next_rel_seq: Wrapping(0),
+ last_rel_ack: Wrapping(0),
+ status: TcpStatus::Closed,
+ segments: VecDeque::new(),
+ addr: *addr,
+ port,
+ }
+ }
+}
+
+impl TcpStream {
+ pub fn new(flow: &Flow) -> Self {
+ TcpStream {
+ client: TcpPeer::new(&flow.five_tuple.src, flow.five_tuple.src_port),
+ server: TcpPeer::new(&flow.five_tuple.dst, flow.five_tuple.dst_port),
+ status: TcpStatus::Closed,
+ last_seen_ts: flow.last_seen,
+ }
+ }
+
+ pub fn handle_new_connection<'a>(
+ &mut self,
+ tcp: &'a TcpPacket,
+ to_server: bool,
+ pcap_index: usize,
+ ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
+ let seq = Wrapping(tcp.get_sequence());
+ let ack = Wrapping(tcp.get_acknowledgement());
+ let tcp_flags = tcp.get_flags();
+
+ let (mut src, mut dst) = if to_server {
+ (&mut self.client, &mut self.server)
+ } else {
+ (&mut self.server, &mut self.client)
+ };
+
+ match src.status {
+ // Client -- SYN --> Server
+ TcpStatus::Closed => {
+ if tcp_flags & TcpFlags::RST != 0 {
+ // TODO check if destination.segments must be removed
+ // client sent a RST, this is expected
+ return Ok(None);
+ }
+ if tcp_flags & TcpFlags::SYN == 0 {
+ // not a SYN - usually happens at start of pcap if missed SYN
+ warn!("First packet of a TCP stream is not a SYN");
+ // test is ACK + data, and set established if possible
+ if tcp_flags & TcpFlags::ACK != 0 {
+ trace!("Trying to catch connection on the fly");
+ src.isn = seq;
+ src.ian = ack;
+ src.next_rel_seq = Wrapping(0);
+ src.status = TcpStatus::Established;
+ dst.isn = ack;
+ dst.ian = seq;
+ dst.status = TcpStatus::Established;
+ dst.last_rel_ack = Wrapping(0);
+ self.status = TcpStatus::Established;
+ // queue segment (even if FIN, to get correct seq numbers)
+ let segment = TcpSegment {
+ rel_seq: Wrapping(0),
+ rel_ack: Wrapping(0),
+ flags: tcp_flags,
+ data: tcp.payload().to_vec(), // XXX data cloned here
+ pcap_index,
+ };
+ queue_segment(src, segment);
+
+ return Ok(None);
+ }
+ return Err(TcpStreamError::Anomaly);
+ }
+ if tcp_flags & TcpFlags::ACK != 0 {
+ warn!("First packet is SYN+ACK - missed SYN?");
+ dst.isn = ack - Wrapping(1);
+ dst.status = TcpStatus::SynSent;
+ dst.next_rel_seq = Wrapping(1);
+ src.isn = seq;
+ src.ian = ack;
+ src.last_rel_ack = Wrapping(1);
+ src.next_rel_seq = Wrapping(1);
+ src.status = TcpStatus::Listen;
+ // swap sides and tell analyzer to do the same for flow
+ std::mem::swap(&mut self.client, &mut self.server);
+ return Err(TcpStreamError::Inverted);
+ }
+ src.isn = seq;
+ src.next_rel_seq = Wrapping(1);
+ dst.ian = seq;
+ self.status = TcpStatus::SynSent;
+ src.status = TcpStatus::SynSent;
+ dst.status = TcpStatus::Listen;
+ // do we have data ?
+ if !tcp.payload().is_empty() {
+ warn!("Data in handshake SYN");
+ // conn.next_rel_seq += Wrapping(tcp.payload().len() as u32);
+ let segment = TcpSegment {
+ rel_seq: seq - src.isn,
+ rel_ack: ack - dst.isn,
+ flags: tcp_flags,
+ data: tcp.payload().to_vec(), // XXX data cloned here
+ pcap_index,
+ };
+ queue_segment(src, segment);
+ }
+ }
+ // Server -- SYN+ACK --> Client
+ TcpStatus::Listen => {
+ if tcp_flags != (TcpFlags::SYN | TcpFlags::ACK) {
+ // XXX ?
+ }
+ // if we had data in SYN, add its length
+ let next_rel_seq = if dst.segments.is_empty() {
+ Wrapping(1)
+ } else {
+ Wrapping(1) + Wrapping(dst.segments[0].data.len() as u32)
+ };
+ if ack != dst.isn + next_rel_seq {
+ warn!("NEW/SYN-ACK: ack number is wrong");
+ return Err(TcpStreamError::HandshakeFailed);
+ }
+ src.isn = seq;
+ src.next_rel_seq = Wrapping(1);
+ dst.ian = seq;
+ dst.last_rel_ack = Wrapping(1);
+
+ src.status = TcpStatus::SynRcv;
+ self.status = TcpStatus::SynRcv;
+
+ // do not push data if we had some in SYN, it will be done after handshake succeeds
+ }
+ // Client -- ACK --> Server
+ TcpStatus::SynSent => {
+ if tcp_flags & TcpFlags::ACK == 0 {
+ if tcp_flags == TcpFlags::SYN {
+ // can be a SYN resend
+ if seq == src.isn && ack.0 == 0 {
+ trace!("SYN resend - ignoring");
+ return Ok(None);
+ }
+ // can be a disordered handshake (receive S after SA)
+ if seq + Wrapping(1) == dst.ian {
+ trace!("Likely received SA before S - ignoring");
+ return Ok(None);
+ }
+ }
+ warn!("Not an ACK");
+ }
+ // TODO check seq, ack
+ if ack != dst.isn + Wrapping(1) {
+ warn!("NEW/ACK: ack number is wrong");
+ return Err(TcpStreamError::HandshakeFailed);
+ }
+ src.status = TcpStatus::Established;
+ dst.status = TcpStatus::Established;
+ dst.last_rel_ack = Wrapping(1);
+ self.status = TcpStatus::Established;
+ // do we have data ?
+ if !tcp.payload().is_empty() {
+ // warn!("Data in handshake ACK");
+ let segment = TcpSegment {
+ rel_seq: seq - src.isn,
+ rel_ack: ack - dst.isn,
+ flags: tcp_flags,
+ data: tcp.payload().to_vec(), // XXX data cloned here
+ pcap_index,
+ };
+ queue_segment(src, segment);
+ }
+ }
+ TcpStatus::SynRcv => {
+ // we received something while in SYN_RCV state - we should only have sent ACK
+ // this could be a SYN+ACK retransmit
+ if tcp_flags == TcpFlags::SYN | TcpFlags::ACK {
+ // XXX compare SEQ numbers?
+ // ignore
+ return Ok(None);
+ }
+ warn!(
+ "Received unexpected data in SYN_RCV state idx={}",
+ pcap_index
+ );
+ }
+ _ => unreachable!(),
+ }
+ Ok(None)
+ }
+
+ pub fn handle_established_connection<'a>(
+ &mut self,
+ tcp: &'a TcpPacket,
+ to_server: bool,
+ pcap_index: usize,
+ ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
+ let (origin, destination) = if to_server {
+ (&mut self.client, &mut self.server)
+ } else {
+ (&mut self.server, &mut self.client)
+ };
+
+ let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn;
+ let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn;
+ let tcp_flags = tcp.get_flags();
+
+ trace!("EST: payload len={}", tcp.payload().len());
+ trace!(
+ " Tcp rel seq {} ack {} next seq {}",
+ rel_seq,
+ rel_ack,
+ origin.next_rel_seq
+ );
+
+ if tcp_flags & TcpFlags::ACK == 0 && tcp.get_acknowledgement() != 0 {
+ warn!(
+ "EST/ packet without ACK (broken TCP implementation or attack) idx={}",
+ pcap_index
+ );
+ // ignore segment
+ return Ok(None);
+ }
+
+ let segment = TcpSegment {
+ rel_seq,
+ rel_ack,
+ flags: tcp_flags,
+ data: tcp.payload().to_vec(), // XXX data cloned here
+ pcap_index,
+ };
+ queue_segment(origin, segment);
+
+ // trace!("Destination: {:?}", destination); // TODO to remove
+
+ // if there is a ACK, check & send segments on the *other* side
+ let ret = if tcp_flags & TcpFlags::ACK != 0 {
+ send_peer_segments(destination, rel_ack)
+ } else {
+ None
+ };
+
+ trace!(
+ " PEER EST rel next seq {} last_ack {}",
+ destination.next_rel_seq,
+ destination.last_rel_ack,
+ );
+
+ Ok(ret)
+ }
+
+ fn handle_closing_connection(
+ &mut self,
+ tcp: &TcpPacket,
+ to_server: bool,
+ pcap_index: usize,
+ ) -> Option<Vec<TcpSegment>> {
+ let (mut origin, destination) = if to_server {
+ (&mut self.client, &mut self.server)
+ } else {
+ (&mut self.server, &mut self.client)
+ };
+
+ let tcp_flags = tcp.get_flags();
+ let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn;
+ let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn;
+ let has_ack = tcp_flags & TcpFlags::ACK != 0;
+ let has_fin = tcp_flags & TcpFlags::FIN != 0;
+
+ let ret = if has_ack {
+ trace!("ACKing segments up to {}", rel_ack);
+ send_peer_segments(destination, rel_ack)
+ } else {
+ if tcp.get_acknowledgement() != 0 {
+ warn!(
+ "EST/ packet without ACK (broken TCP implementation or attack) idx={}",
+ pcap_index
+ );
+ // ignore segment
+ return None;
+ }
+ None
+ };
+ if tcp_flags & TcpFlags::RST != 0 {
+ // if we get a RST, check the sequence number and remove matching segments
+ // trace!("RST received. rel_seq: {}", rel_seq);
+ // trace!(
+ // "{} remaining (undelivered) segments DESTINATION",
+ // destination.segments.len()
+ // );
+ // for (n, s) in destination.segments.iter().enumerate() {
+ // trace!(" s[{}]: rel_seq={} plen={}", n, s.rel_seq, s.data.len());
+ // }
+ // remove queued segments up to rel_seq
+ destination.segments.retain(|s| s.rel_ack != rel_seq);
+ trace!(
+ "RST: {} remaining (undelivered) segments DESTINATION after removal",
+ destination.segments.len()
+ );
+ origin.status = TcpStatus::Closed; // XXX except if ACK ?
+ return ret;
+ }
+
+ // queue segment (even if FIN, to get correct seq numbers)
+ let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn;
+ let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn;
+ let segment = TcpSegment {
+ rel_seq,
+ rel_ack,
+ flags: tcp_flags,
+ data: tcp.payload().to_vec(), // XXX data cloned here
+ pcap_index,
+ };
+ queue_segment(origin, segment);
+
+ // if tcp_flags & TcpFlags::FIN != 0 {
+ // warn!("origin next seq was {}", origin.next_rel_seq.0);
+ // origin.next_rel_seq += Wrapping(1);
+ // }
+
+ match origin.status {
+ TcpStatus::Established => {
+ // we know there is a FIN (tested in TcpStreamReassembly::update)
+ origin.status = TcpStatus::FinWait1;
+ destination.status = TcpStatus::CloseWait; // we are not sure it was received
+ }
+ TcpStatus::CloseWait => {
+ if !has_fin {
+ // if only an ACK, do nothing and stay in CloseWait status
+ if has_ack {
+ // debug!("destination status: {:?}", destination.status);
+ if destination.status == TcpStatus::FinWait1 {
+ destination.status = TcpStatus::FinWait2;
+ }
+ } else {
+ warn!("Origin should have sent a FIN and/or ACK");
+ }
+ } else {
+ origin.status = TcpStatus::LastAck;
+ // debug!("destination status: {:?}", destination.status);
+ if has_ack || destination.status == TcpStatus::FinWait2 {
+ destination.status = TcpStatus::TimeWait;
+ } else {
+ destination.status = TcpStatus::Closing;
+ }
+ }
+ }
+ TcpStatus::TimeWait => {
+ // only an ACK should be sent (XXX nothing else, maybe PSH)
+ if has_ack {
+ // this is the end!
+ origin.status = TcpStatus::Closed;
+ destination.status = TcpStatus::Closed;
+ }
+ }
+ _ => {
+ warn!(
+ "Unhandled closing transition: origin host {} status {:?}",
+ origin.addr, origin.status
+ );
+ warn!(
+ " dest host {} status {:?}",
+ destination.addr, destination.status
+ );
+ }
+ }
+
+ trace!(
+ "TCP connection closing, {} remaining (undelivered) segments",
+ origin.segments.len()
+ );
+ // DEBUG
+ for (n, s) in origin.segments.iter().enumerate() {
+ trace!(
+ " s[{}]: seq={} len={} idx={}",
+ n,
+ s.rel_seq.0,
+ s.data.len(),
+ s.pcap_index,
+ );
+ }
+
+ // TODO what now?
+
+ if origin.segments.is_empty() {
+ return ret;
+ }
+
+ ret
+ }
+
+ // force expiration (for ex after timeout) of this stream
+ fn expire(&mut self) {
+ self.client.status = TcpStatus::Closed;
+ self.server.status = TcpStatus::Closed;
+ }
+} // TcpStream
+
+fn queue_segment(peer: &mut TcpPeer, segment: TcpSegment) {
+ // only store segments with data, except FIN
+ if segment.data.is_empty() && segment.flags & TcpFlags::FIN == 0 {
+ return;
+ }
+ // // DEBUG
+ // for (n, s) in peer.segments.iter().enumerate() {
+ // debug!(
+ // " XXX peer s[{}]: rel_seq={} plen={}",
+ // n,
+ // s.rel_seq,
+ // s.data.len()
+ // );
+ // }
+ // trivial case: list is empty - just push segment
+ if peer.segments.is_empty() {
+ trace!("Pushing segment (front)");
+ peer.segments.push_front(segment);
+ return;
+ }
+
+ if EARLY_DETECT_OVERLAP {
+ // find last element before candidate and first element after candidate
+ let mut before = None;
+ let mut after = None;
+ // let mut opt_pos = None;
+ for (_n, s) in peer.segments.iter().enumerate() {
+ if s.rel_seq < segment.rel_seq {
+ before = Some(s);
+ } else {
+ after = Some(s);
+ // opt_pos = Some(n);
+ break;
+ }
+ }
+ // trace!("tcp segment insertion index: {:?}", opt_pos);
+ // check for left overlap
+ if let Some(s) = before {
+ let next_seq = s.rel_seq + Wrapping(s.data.len() as u32);
+ match segment.rel_seq.cmp(&next_seq) {
+ Ordering::Equal => {
+ // XXX do nothing, simply queue segment
+ // // simple case: merge segment
+ // trace!(
+ // "Merging segments (seq {} and {})",
+ // s.rel_seq,
+ // segment.rel_seq
+ // );
+ // s.data.extend_from_slice(&segment.data);
+ // s.rel_ack = segment.rel_ack;
+ // // XXX pcap_index should be a list (and append to it)
+ // // TODO check next segment in queue to test if a hole was filled
+ // return;
+ }
+ Ordering::Greater => {
+ // we have a hole
+ warn!("Missing segment on left of incoming segment");
+ }
+ Ordering::Less => {
+ // Left overlap
+ warn!("Segment with left overlap");
+ // let overlap_size = (next_seq - segment.rel_seq).0 as usize;
+ // debug_assert!(overlap_size <= s.data.len());
+ // let overlap_start = s.data.len() - overlap_size;
+ // let overlap_left = &s.data[overlap_start..];
+ // if overlap_left == &segment.data[..overlap_size] {
+ // info!(
+ // "TCP Segment with left overlap: area matches idx={}",
+ // segment.pcap_index
+ // );
+ // trace!("Left overlap: removing {} bytes", overlap_size);
+ // // remove overlapping area and fix offset
+ // let new_data = segment.data.split_off(overlap_size);
+ // segment.data = new_data;
+ // segment.rel_seq += Wrapping(overlap_size as u32);
+ // } else {
+ // warn!(
+ // "TCP Segment with left overlap: area differs idx={}",
+ // segment.pcap_index
+ // );
+ // // XXX keep new ?
+ // }
+ }
+ }
+ }
+ // check for right overlap
+ if let Some(s) = after {
+ let right_next_seq = segment.rel_seq + Wrapping(segment.data.len() as u32);
+ match right_next_seq.cmp(&s.rel_seq) {
+ Ordering::Equal => (),
+ Ordering::Greater => {
+ // Right overlap
+ warn!("Segment with right overlap");
+ // let overlap_size = (right_next_seq - s.rel_seq).0 as usize;
+ // debug_assert!(overlap_size <= s.data.len());
+ // let overlap_start = segment.data.len() - overlap_size;
+ // let overlap = &segment.data[overlap_start..];
+ // let right_overlap = &s.data[..overlap_size];
+ // if overlap == right_overlap {
+ // info!(
+ // "TCP Segment with right overlap: area matches idx={}",
+ // segment.pcap_index
+ // );
+ // trace!("Right overlap: removing {} bytes", overlap_size);
+ // segment.data.truncate(overlap_start);
+ // } else {
+ // warn!(
+ // "TCP Segment with right overlap: area differs idx={}",
+ // segment.pcap_index
+ // );
+ // // XXX keep new ?
+ // }
+ }
+ Ordering::Less => {
+ trace!(
+ "hole remaining on right of incoming segment idx={}",
+ segment.pcap_index
+ );
+ }
+ }
+ }
+ // if segment.data.is_empty() && segment.flags & TcpFlags::FIN == 0 {
+ // trace!("No data after overlap, NOT queuing segment");
+ // return;
+ // }
+ }
+ trace!("Adding segment");
+ peer.insert_sorted(segment);
+}
+
+fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<TcpSegment>> {
+ trace!(
+ "Trying to send segments for {}:{} up to {} (last ack: {})",
+ peer.addr,
+ peer.port,
+ rel_ack,
+ peer.last_rel_ack
+ );
+ if rel_ack == peer.last_rel_ack {
+ trace!("re-acking last data, doing nothing");
+ return None;
+ }
+ if peer.segments.is_empty() {
+ return None;
+ }
+
+ // is ACK acceptable?
+ if rel_ack < peer.last_rel_ack {
+ warn!("ACK request for already ACKed data (ack < last_ack)");
+ return None;
+ }
+
+ // check consistency of segment ACK numbers + order and/or missing fragments and/or overlap
+
+ let mut acked = Vec::new();
+
+ while !peer.segments.is_empty() {
+ let segment = &peer.segments[0];
+ trace!(
+ "segment: rel_seq={} len={}",
+ segment.rel_seq,
+ segment.data.len()
+ );
+ trace!(
+ " origin.next_rel_seq {} ack {}",
+ peer.next_rel_seq,
+ rel_ack
+ );
+ // if origin.next_rel_seq > rel_ack {
+ // warn!("next_seq > ack - partial ACK ?");
+ // unreachable!(); // XXX do we care about that case?
+ // // break;
+ // }
+ if rel_ack <= segment.rel_seq {
+ // if packet is in the past (strictly less), we don't care
+ break;
+ }
+
+ // safety: segments is just tested above
+ let mut segment = peer.segments.pop_front().unwrap();
+
+ if rel_ack < segment.rel_seq + Wrapping(segment.data.len() as u32) {
+ // warn!("ACK lower then seq + segment size - SACK?");
+ trace!("ACK for part of buffer");
+ // split data and insert new dummy segment
+ trace!("rel_ack {} segment.rel_seq {}", rel_ack, segment.rel_seq);
+ trace!("segment data len {}", segment.data.len());
+ let acked_len = (rel_ack - segment.rel_seq).0 as usize;
+ let new_segment = segment.split_off(acked_len);
+ trace!(
+ "insert new segment from {} len {}",
+ new_segment.rel_ack,
+ new_segment.data.len()
+ );
+ peer.insert_sorted(new_segment);
+ }
+
+ handle_overlap_linux(peer, &mut segment);
+ adjust_seq_numbers(peer, &segment);
+
+ trace!(
+ "ACKed: pushing segment: rel_seq={} len={}",
+ segment.rel_seq,
+ segment.data.len(),
+ );
+ if !segment.data.is_empty() {
+ acked.push(segment);
+ }
+ }
+
+ if peer.next_rel_seq != rel_ack {
+ // missed segments, or maybe received FIN ?
+ warn!(
+ "TCP ACKed unseen segment next_seq {} != ack {} (Missed segments?)",
+ peer.next_rel_seq, rel_ack
+ );
+ // TODO notify upper layer for missing data
+ }
+
+ peer.last_rel_ack = rel_ack;
+ Some(acked)
+}
+
+const FIRST_WINS: bool = false;
+
+// implements the "first segment wins" or the "last segment wins" policies
+#[allow(dead_code)]
+fn handle_overlap_first_last(peer: &mut TcpPeer, segment: &mut TcpSegment) {
+ // loop while segment has overlap
+ while let Some(next) = peer.segments.front() {
+ if let Some(overlap_offset) = segment.overlap_offset(next) {
+ let next_pcap_index = next.pcap_index;
+ warn!(
+ "segments overlaps next candidate (offset={})",
+ overlap_offset
+ );
+ trace!("segment idx={}", segment.pcap_index);
+ // split segment at overlapping_offset
+ let mut segment_right = segment.split_off(overlap_offset);
+ let overlap_size;
+ // segment right can be greater, equal or smaller to next
+ match segment_right.data.len().cmp(&next.data.len()) {
+ Ordering::Less => {
+ // right_segment is smaller than next
+ overlap_size = segment_right.data.len();
+ if segment_right.data[..] != next.data[..overlap_size] {
+ warn!(
+ "TCP overlapping data differ in packets idx={} and idx={}",
+ segment_right.pcap_index, next_pcap_index
+ );
+ }
+ let first = peer.segments.front_mut().unwrap();
+ let front_right = first.split_off(overlap_size);
+ trace!("front_right idx={}", front_right.pcap_index);
+ trace!("re-inserting remaining data (next)");
+ peer.insert_sorted(front_right);
+ }
+ Ordering::Equal => {
+ if segment_right.data[..] != next.data[..] {
+ warn!(
+ "TCP overlapping data differ in packets idx={} and idx={}",
+ segment_right.pcap_index, next_pcap_index
+ );
+ }
+ }
+ Ordering::Greater => {
+ // right_segment is longer than next
+ overlap_size = next.data.len();
+ if segment_right.data[..overlap_size] != next.data[..] {
+ warn!(
+ "TCP overlapping data differ in packets idx={} and idx={}",
+ segment_right.pcap_index, next_pcap_index
+ );
+ }
+ let rem = segment_right.split_off(overlap_size);
+ trace!("re-inserting remaining data (first)");
+ peer.insert_sorted(rem);
+ }
+ }
+ // which part to keep ? segment_right or next ?
+ // trace!("FIRST_WINS: {}, l:{} r:{}", FIRST_WINS, segment.pcap_index, next_pcap_index);
+ // trace!("(before)\n{:?}", peer);
+ if FIRST_WINS ^ (segment.pcap_index > next_pcap_index) {
+ trace!("dropping next");
+ let _ = peer.segments.pop_front();
+ peer.insert_sorted(segment_right);
+ } else {
+ trace!("dropping first");
+ drop(segment_right);
+ }
+ // trace!("(after)\n{:?}", peer);
+ } else {
+ break;
+ }
+ }
+}
+
+// handle overlapping segments, using a linux-like policy
+// Linux favors an original segment, EXCEPT when the subsequent begins before the original,
+// or the subsequent segment begins the same and ends after the original segment.
+#[allow(dead_code)]
+fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) {
+ // loop while segment has overlap
+ while let Some(next) = peer.segments.front() {
+ if let Some(overlap_offset) = segment.overlap_offset(next) {
+ warn!(
+ "segment idx={} overlaps next candidate idx={} (at offset={})",
+ segment.pcap_index, next.pcap_index, overlap_offset
+ );
+ // we will modify the subsequent segment (next)
+ // safety: element presence was tested in outer loop
+ let next = peer.segments.pop_front().unwrap();
+
+ // split next
+ let overlap_size = segment.data.len() - overlap_offset;
+ let min_overlap_size = std::cmp::min(overlap_size, next.data.len());
+ // compare overlap area
+ if next.data[..min_overlap_size]
+ != segment.data[overlap_offset..overlap_offset + min_overlap_size]
+ {
+ warn!(
+ "Overlap area differs! left idx={} right idx={}",
+ segment.pcap_index, next.pcap_index
+ );
+ }
+ if overlap_size >= next.data.len() {
+ // subsequent segment starts after and is smaller, so drop it
+ drop(next);
+ continue;
+ }
+ // otherwise, split next into left and right, drop left and accept right
+ let mut left = next;
+ let right = left.split_off(overlap_size);
+ // to accept right, merge it into segment
+ segment.data.extend_from_slice(&right.data);
+ } else {
+ // trace!("no overlap, break");
+ break;
+ }
+ }
+}
+
+fn adjust_seq_numbers(origin: &mut TcpPeer, segment: &TcpSegment) {
+ if !segment.data.is_empty() {
+ // adding length is wrong in case of overlap
+ // origin.next_rel_seq += Wrapping(segment.data.len() as u32);
+ origin.next_rel_seq = segment.rel_seq + Wrapping(segment.data.len() as u32);
+ }
+
+ if segment.flags & TcpFlags::FIN != 0 {
+ // trace!("Segment has FIN");
+ origin.next_rel_seq += Wrapping(1);
+ }
+}
+
+impl TcpStreamReassembly {
+ pub(crate) fn update(
+ &mut self,
+ flow: &Flow,
+ tcp: &TcpPacket,
+ to_server: bool,
+ pcap_index: usize,
+ ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
+ trace!("5-t: {}", flow.five_tuple);
+ trace!(" flow id: {:x}", flow.flow_id);
+ trace!(
+ " seq: {:x} ack {:x}",
+ tcp.get_sequence(),
+ tcp.get_acknowledgement()
+ );
+
+ let mut stream = self
+ .m
+ .entry(flow.flow_id)
+ .or_insert_with(|| TcpStream::new(flow));
+ trace!("stream state: {:?}", stream.status);
+ trace!("to_server: {}", to_server);
+
+ // check time delay with previous packet before updating
+ if stream.last_seen_ts > flow.last_seen {
+ info!("packet received in past of stream idx={}", pcap_index);
+ } else if flow.last_seen - stream.last_seen_ts > self.timeout {
+ warn!("TCP stream received packet after timeout");
+ stream.expire();
+ return Err(TcpStreamError::Expired);
+ }
+ stream.last_seen_ts = flow.last_seen;
+
+ let (origin, _destination) = if to_server {
+ (&stream.client, &stream.server)
+ } else {
+ (&stream.server, &stream.client)
+ };
+
+ trace!(
+ "origin: {}:{} status {:?}",
+ origin.addr,
+ origin.port,
+ origin.status
+ );
+ debug_print_tcp_flags(tcp.get_flags());
+
+ match origin.status {
+ TcpStatus::Closed | TcpStatus::Listen | TcpStatus::SynSent | TcpStatus::SynRcv => {
+ stream.handle_new_connection(tcp, to_server, pcap_index)
+ }
+ TcpStatus::Established => {
+ // check for close request
+ if tcp.get_flags() & (TcpFlags::FIN | TcpFlags::RST) != 0 {
+ trace!("Requesting end of connection");
+ Ok(stream.handle_closing_connection(tcp, to_server, pcap_index))
+ } else {
+ stream.handle_established_connection(tcp, to_server, pcap_index)
+ }
+ }
+ _ => Ok(stream.handle_closing_connection(tcp, to_server, pcap_index)),
+ }
+ }
+ pub(crate) fn check_expired_connections(&mut self, now: Duration) {
+ for (flow_id, stream) in self.m.iter_mut() {
+ if now < stream.last_seen_ts {
+ warn!(
+ "stream.last_seen_ts is in the future for flow id {:x}",
+ flow_id
+ );
+ continue;
+ }
+ if now - stream.last_seen_ts > self.timeout {
+ warn!("TCP stream timeout reached for flow {:x}", flow_id);
+ stream.expire();
+ }
+ }
+ }
+}
+
+pub(crate) fn finalize_tcp_streams(analyzer: &mut crate::analyzer::Analyzer) {
+ warn!("expiring all TCP connections");
+ for (flow_id, _stream) in analyzer.tcp_defrag.m.iter() {
+ // TODO do we have anything to do?
+ if let Some(flow) = analyzer.flows.get_flow(*flow_id) {
+ debug!(" flow: {:?}", flow);
+ }
+ }
+ analyzer.tcp_defrag.m.clear();
+}
+
+fn debug_print_tcp_flags(tcp_flags: u16) {
+ if log::Level::Debug <= log::STATIC_MAX_LEVEL {
+ let mut s = String::from("tcp_flags: [");
+ if tcp_flags & TcpFlags::SYN != 0 {
+ s += "S"
+ }
+ if tcp_flags & TcpFlags::FIN != 0 {
+ s += "F"
+ }
+ if tcp_flags & TcpFlags::RST != 0 {
+ s += "R"
+ }
+ if tcp_flags & TcpFlags::URG != 0 {
+ s += "U"
+ }
+ if tcp_flags & TcpFlags::PSH != 0 {
+ s += "P"
+ }
+ if tcp_flags & TcpFlags::ACK != 0 {
+ s += "A"
+ }
+ s += "]";
+ trace!("{}", s);
+ }
+}
+
+impl fmt::Debug for TcpPeer {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ writeln!(f, "Peer: {}:{}", self.addr, self.port)?;
+ writeln!(f, " status: {:?}", self.status)?;
+ writeln!(f, " isn: 0x{:x} ian: 0x{:x}", self.isn, self.ian)?;
+ writeln!(f, " next_rel_seq: {}", self.next_rel_seq)?;
+ writeln!(f, " last_rel_ack: {}", self.last_rel_ack)?;
+ writeln!(f, " #segments: {}", self.segments.len())?;
+ for (n, s) in self.segments.iter().enumerate() {
+ writeln!(
+ f,
+ " s[{}]: rel_seq={} len={} idx={}",
+ n,
+ s.rel_seq,
+ s.data.len(),
+ s.pcap_index,
+ )?;
+ }
+ Ok(())
+ }
+}
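
The commit message ("restart with library") and the libpcap_tools / pnet_packet imports suggest tcp_reassembly.rs is taken from an existing reassembly library rather than written from scratch. A hedged sketch of how code elsewhere in this crate might drive it, assuming the surrounding analyzer already supplies the Flow, the parsed TcpPacket, and the packet direction:

// Usage sketch only: the analyzer glue shown here is assumed, not part of this commit.
fn on_tcp_packet(
    reassembly: &mut TcpStreamReassembly,
    flow: &libpcap_tools::Flow,
    tcp: &pnet_packet::tcp::TcpPacket,
    to_server: bool,
    pcap_index: usize,
) {
    match reassembly.update(flow, tcp, to_server, pcap_index) {
        // ACKed data is delivered in order, one batch of segments per ACK
        Ok(Some(segments)) => {
            for s in segments {
                println!("idx={} rel_seq={} {} bytes", s.pcap_index, s.rel_seq, s.data.len());
            }
        }
        // nothing acknowledged by this packet
        Ok(None) => {}
        // sides were swapped inside the stream; the caller should swap its
        // own notion of client/server for this flow as well
        Err(TcpStreamError::Inverted) => {}
        Err(e) => eprintln!("reassembly error on packet {}: {:?}", pcap_index, e),
    }
}
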
diff --git a/src/session/tcp_stream.rs b/src/session/tcp_stream.rs
deleted file mode 100644
index 653d3b4..0000000
--- a/src/session/tcp_stream.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-
-
-// tcpheader to stream, and many checks
diff --git a/src/session/timeout.rs b/src/session/timeout.rs
deleted file mode 100644
index 3aa4103..0000000
--- a/src/session/timeout.rs
+++ /dev/null
@@ -1,16 +0,0 @@
-/* --------------------------------- Option 1 --------------------------------- */
-// The most direct approach is to run a timer that checks periodically, but that requires locking or atomic operations.
-// There is an ugly problem: to let the timer thread delete sessions, the sessions must be moved to it, so the whole session handle has to be locked.
-// Its biggest advantage over option 3 is generality: it provides one mechanism for every kind of timer, and something like a watchdog will probably be needed later.
-// https://doc.rust-lang.org/std/sync/atomic/
-
-/* --------------------------------- Option 2 --------------------------------- */
-// Asynchronous handling: a session starts and then awaits. This amounts to spawning a concurrent task per session, the tokio style: open a port, and the session closes itself on timeout.
-// If sessions are organized with an asynchronous worker/job model, this is the best approach.
-// https://docs.rs/async-io/latest/async_io/
-
-/* --------------------------------- Option 3 --------------------------------- */
-// Whenever a packet arrives, check all sessions and delete any that have expired. Some tuning is needed, e.g. a check granularity (a minimum interval between checks) and a cap on how many sessions are checked per pass.
-// If the number of sessions is small and few of them time out, this is the best approach: no extra threads at all.
-// See reass: timeout.h: set_time()
-
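
Of the three options discussed above, the newly added tcp_reassembly.rs effectively takes option 3: update() checks the per-stream timeout when a packet arrives, and check_expired_connections(now) sweeps the whole table, with no dedicated timer thread. A minimal standalone sketch of that pattern (Session and SessionTable are hypothetical illustration types, not taken from this commit):

// Option 3 sketch: timeout checks driven by packet arrival, no timer thread.
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct Session {
    last_seen: Instant,
}

struct SessionTable {
    sessions: HashMap<u64, Session>,
    timeout: Duration,
    last_sweep: Instant,
    granularity: Duration, // minimum interval between two sweeps
}

impl SessionTable {
    // Called for every incoming packet; expired sessions are dropped at most
    // once per granularity interval.
    fn on_packet(&mut self, flow_id: u64, now: Instant) {
        if let Some(s) = self.sessions.get_mut(&flow_id) {
            s.last_seen = now;
        }
        if now.duration_since(self.last_sweep) >= self.granularity {
            let timeout = self.timeout;
            self.sessions
                .retain(|_, s| now.duration_since(s.last_seen) <= timeout);
            self.last_sweep = now;
        }
    }
}
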