summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2023-09-07 17:04:16 +0800
committerchenzizhan <[email protected]>2023-09-07 17:04:16 +0800
commit138fdfbed1b16dcd44b2d3e2ee5b19b103915bf0 (patch)
treec802dd72dcecc5cf675d40c7adda78615bf5e79c
parentdf7ba937f53b93275a705952f44a6f87a23bcb77 (diff)
wip
-rw-r--r--src/main.rs26
-rw-r--r--src/session/duration.rs61
-rw-r--r--src/session/mod.rs3
-rw-r--r--src/session/tcp_reassembly.rs475
4 files changed, 306 insertions, 259 deletions
diff --git a/src/main.rs b/src/main.rs
index 2eee4d3..e7a11a9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,29 +1,3 @@
-struct Boo {
- a: int32,
-}
-
-struct Foo {
- a: int32,
-}
-
-fn boo_cb(boo: &Boo) {
- println!("boo_cb");
-}
-
-fn foo_cb(foo: &Foo, a: int32) {
- println!("foo_cb");
- println!("{}", a);
-}
-
-enum Event {
- CallBoo,
- CallFoo,
-}
-
-struct EventCb {
- event: Event,
-}
-
fn main() {
println!("Hello, world!");
}
diff --git a/src/session/duration.rs b/src/session/duration.rs
new file mode 100644
index 0000000..67fc245
--- /dev/null
+++ b/src/session/duration.rs
@@ -0,0 +1,61 @@
+use std::ops::{Add,Sub};
+
+/// Reimplementation of std::time::Duration, but panic-free
+/// and partial, only to match our needs:
+/// - use micros instead of nanos, avoid casts
+/// - expose fields
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Debug)]
+pub struct Duration {
+ pub secs: u32,
+ pub micros: u32,
+}
+
+pub const MICROS_PER_SEC: u32 = 1_000_000;
+
+impl Duration {
+ /// Build Duration from secs and micros
+ pub fn new(secs:u32, micros:u32) -> Duration {
+ Duration{ secs, micros }
+ }
+ /// Test if Duration object is null
+ #[inline]
+ pub fn is_null(self) -> bool {
+ self.secs == 0 && self.micros == 0
+ }
+}
+
+impl Add for Duration {
+ type Output = Duration;
+
+ #[allow(clippy::suspicious_arithmetic_impl)]
+ fn add(self, other: Duration) -> Self::Output {
+ let secs = self.secs.wrapping_add(other.secs);
+ let micros = self.micros.wrapping_add(other.micros);
+ let (secs,micros) = if micros > MICROS_PER_SEC {
+ (secs + (micros / MICROS_PER_SEC), micros % MICROS_PER_SEC)
+ } else {
+ (secs,micros)
+ };
+
+ Duration{ secs, micros }
+ }
+}
+
+impl Sub for Duration {
+ type Output = Duration;
+
+ #[allow(clippy::suspicious_arithmetic_impl)]
+ fn sub(self, other: Duration) -> Self::Output {
+ let secs = self.secs.wrapping_sub(other.secs);
+ let (secs,micros) = if self.micros >= other.micros {
+ (secs,self.micros - other.micros)
+ } else {
+ let diff = other.micros.wrapping_sub(self.micros);
+ let secs_less = diff / MICROS_PER_SEC;
+ let micros = MICROS_PER_SEC - diff;
+ (secs.wrapping_sub(1 + secs_less),micros)
+ };
+
+ Duration{ secs, micros }
+ }
+}
diff --git a/src/session/mod.rs b/src/session/mod.rs
index d3a09dc..f91fd0a 100644
--- a/src/session/mod.rs
+++ b/src/session/mod.rs
@@ -1 +1,2 @@
-pub mod tcp_reassembly; \ No newline at end of file
+pub mod tcp_reassembly;
+pub(super) mod duration; \ No newline at end of file
diff --git a/src/session/tcp_reassembly.rs b/src/session/tcp_reassembly.rs
index 98d0164..4b74d21 100644
--- a/src/session/tcp_reassembly.rs
+++ b/src/session/tcp_reassembly.rs
@@ -1,14 +1,34 @@
-use pnet_macros_support::packet::Packet as PnetPacket;
-use std::cmp::Ordering;
-use std::collections::{HashMap, VecDeque};
+use std::collections::VecDeque;
use std::fmt;
-use std::net::IpAddr;
+use std::net::{IpAddr, Ipv4Addr};
use std::num::Wrapping;
-use pnet_packet::tcp::TcpFlags;
+use super::duration::Duration;
use crate::protocol::ipv4::IPv4Header;
-use crate::protocol::tcp::TcpHeader;
+use crate::protocol::tcp::{TcpHeader, TcpOption};
use crate::packet::packet::Encapsulation;
+use crate::packet::packet::Packet as RawPacket;
+const DEFAULT_TIMEOUT: Duration = Duration{secs:7200, micros:0}; // 120 min timeout, currently do not support option 28 parse
+// todo: const TCP_OPTION_TIMESTAMPS: u8 = 28; https://datatracker.ietf.org/doc/rfc5482/
+
+#[derive(Debug, Eq, PartialEq)]
+pub enum TcpStreamError {
+ Anomaly,
+ /// Connection is OK, but sides are inverted
+ Inverted,
+ /// Packet received but connection has expired
+ Expired,
+ HandshakeFailed,
+}
+
+pub enum TcpConnectionErr {
+ PacketNotV4Tcp,
+ WrongFlags, // todo返回当前的会话状态,以及期望的flag
+ // todo: TCP header to flag:u16
+ ExpiredSession,
+ /// the seq number is wrong in the 2nd or 3rd handshake
+ HandshakeFailed,
+}
#[derive(Debug, Eq, PartialEq)]
#[allow(dead_code)]
@@ -32,6 +52,16 @@ impl Default for TcpStatus {
}
}
+enum TcpFlags {
+ FIN = 0x01,
+ SYN = 0x02,
+ RST = 0x04,
+ PSH = 0x08,
+ ACK = 0x10,
+ URG = 0x20,
+}
+
+#[derive(Debug)]
struct TcpPacket {
payload : Vec<u8>,
ipv4_header : IPv4Header,
@@ -59,19 +89,32 @@ impl TcpPacket {
fn payload(&self) -> &[u8] {
self.payload.as_slice()
}
+ fn get_timestamp(&self) -> Duration { // todo: 感觉这个duration没什么用,改成u32 吧,单位是秒
+ let mut timeVal:u32 = 0;
+ if let Some(options) = &self.tcp_header.options {
+ for option in options {
+ if let TcpOption::TIMESTAMPS{length, ts_value, ts_reply} = option {
+ timeVal = *ts_value;
+ }
+ }
+ }
+ Duration::new(timeVal, 0)
+ }
}
#[derive(Debug)]
pub struct TcpSegment {
pub rel_seq: Wrapping<u32>,
pub rel_ack: Wrapping<u32>,
- pub flags: u16,
- pub data: Vec<u8>,
+ pub payload: Vec<u8>,
+
+ // packet: TcpPacket,
ipv4_header: IPv4Header,
tcp_header: TcpHeader,
+ // todo: 替换成packet
}
-fn encapsulation_convert_to_my_packet(encapsulation: Vec<Encapsulation<'a>>) -> Option<TcpPacket> {
+fn encapsulation_convert_to_my_packet(encapsulation: & Vec<Encapsulation<'_>>) -> Result<TcpPacket, TcpConnectionErr> {
let mut payload = Vec::new();
let mut ipv4_header = Option::None;
let mut tcp_header = Option::None;
@@ -88,20 +131,20 @@ fn encapsulation_convert_to_my_packet(encapsulation: Vec<Encapsulation<'a>>) ->
}
}
if ipv4_header.is_none() || tcp_header.is_none() {
- return None;
+ return Err(TcpConnectionErr::PacketNotV4Tcp);
}
- Some(TcpPacket {
+ Ok(TcpPacket {
payload,
- ipv4_header: ipv4_header.unwrap(),
- tcp_header: tcp_header.unwrap(),
+ ipv4_header: ipv4_header.unwrap().clone(),
+ tcp_header: tcp_header.unwrap().clone(),
})
}
impl TcpSegment {
/// Return the offset of the overlapping area if `self` (as left) overlaps on `right`
pub fn overlap_offset(&self, right: &TcpSegment) -> Option<usize> {
- let next_seq = self.rel_seq + Wrapping(self.data.len() as u32);
+ let next_seq = self.rel_seq + Wrapping(self.payload.len() as u32);
if next_seq > right.rel_seq {
let overlap_offset = (right.rel_seq - self.rel_seq).0 as usize;
Some(overlap_offset)
@@ -114,15 +157,17 @@ impl TcpSegment {
///
/// # Panics
///
- /// Panics if `offset > self.data.len()`
+ /// Panics if `offset > self.payload.len()`
pub fn split_off(&mut self, offset: usize) -> TcpSegment {
- debug_assert!(offset < self.data.len());
- let remaining = self.data.split_off(offset);
+ assert!(offset < self.payload.len());
+ let remaining = self.payload.split_off(offset);
let rel_seq = self.rel_seq + Wrapping(offset as u32);
TcpSegment {
- data: remaining,
+ payload: remaining,
rel_seq,
- ..*self
+ rel_ack: self.rel_ack,
+ ipv4_header: self.ipv4_header.clone(),
+ tcp_header: self.tcp_header.clone(),
}
}
}
@@ -140,9 +185,9 @@ pub struct TcpPeer {
status: TcpStatus,
/// The current list of segments (ordered by rel_seq)
segments: VecDeque<TcpSegment>,
- /// DEBUG: host address
- addr: IpAddr,
- /// DEBUG: port
+ /// println: host address
+ addr: Ipv4Addr,
+ /// println: port
port: u16,
}
@@ -158,41 +203,23 @@ impl TcpPeer {
}
}
-pub struct TcpStream {
+struct TcpStream {
pub client: TcpPeer,
pub server: TcpPeer,
pub status: TcpStatus,
- // XXX timestamp of last seen packet
- pub last_seen_ts: Duration,
+ // from packet.option or timeval passed by api argument. Used to check timeout.
+ // the free of session is NOT decided by this value. The api user should decide it.
+ pub last_seen_ts: Duration,
}
-pub struct TcpStreamReassembly {
- pub m: HashMap<FlowID, TcpStream>,
+pub struct TcpConnection {
+ stream: TcpStream,
pub timeout: Duration,
}
-impl Default for TcpStreamReassembly {
- fn default() -> Self {
- TcpStreamReassembly {
- m: HashMap::new(),
- timeout: Duration::new(14400, 0),
- }
- }
-}
-
-#[derive(Debug, Eq, PartialEq)]
-pub enum TcpStreamError {
- Anomaly,
- /// Connection is OK, but sides are inverted
- Inverted,
- /// Packet received but connection has expired
- Expired,
- HandshakeFailed,
-}
-
impl TcpPeer {
- pub fn new(addr: &IpAddr, port: u16) -> Self {
+ pub fn new(addr: &Ipv4Addr, port: u16) -> Self {
TcpPeer {
isn: Wrapping(0),
ian: Wrapping(0),
@@ -206,21 +233,25 @@ impl TcpPeer {
}
}
+fn tmp_take_ownership(t: TcpHeader) {
+
+}
+
impl TcpStream {
- pub fn new(flow: &Flow) -> Self {
+ pub fn new(packet: &TcpPacket) -> Self {
TcpStream {
- client: TcpPeer::new(&flow.five_tuple.src, flow.five_tuple.src_port),
- server: TcpPeer::new(&flow.five_tuple.dst, flow.five_tuple.dst_port),
+ client: TcpPeer::new(&packet.ipv4_header.source_address, packet.tcp_header.source_port),
+ server: TcpPeer::new(&packet.ipv4_header.dest_address, packet.tcp_header.dest_port),
status: TcpStatus::Closed,
- last_seen_ts: flow.last_seen,
+ last_seen_ts: packet.get_timestamp(),
}
}
- pub fn handle_new_connection<'a>(
+ fn handle_new_connection(
&mut self,
- tcp: &'a TcpPacket,
+ tcp: TcpPacket,
to_server: bool,
- ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
+ ) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> {
let seq = Wrapping(tcp.get_sequence());
let ack = Wrapping(tcp.get_acknowledgement());
@@ -238,12 +269,12 @@ impl TcpStream {
// client sent a RST, this is expected
return Ok(None);
}
- if tcp.has_flag(TcpFlags::SYN) == 0 {
+ if !tcp.has_flag(TcpFlags::SYN) {
// not a SYN - usually happens at start of pcap if missed SYN
- warn!("First packet of a TCP stream is not a SYN");
+ println!("First packet of a TCP stream is not a SYN");
// test is ACK + data, and set established if possible
- if tcp.has_flag(TcpFlags::ACK) != 0 {
- trace!("Trying to catch connection on the fly");
+ if tcp.has_flag(TcpFlags::ACK) {
+ println!("the session has closed, but receive an established packet.");
src.isn = seq;
src.ian = ack;
src.next_rel_seq = Wrapping(0);
@@ -257,27 +288,19 @@ impl TcpStream {
let segment = TcpSegment {
rel_seq: Wrapping(0),
rel_ack: Wrapping(0),
- data: tcp.payload().to_vec(), // XXX data cloned here
+ payload: tcp.payload().to_vec(),
+ tcp_header: tcp.tcp_header,
+ ipv4_header: tcp.ipv4_header,
};
queue_segment(src, segment);
return Ok(None);
}
- return Err(TcpStreamError::Anomaly);
+ return Err(TcpConnectionErr::WrongFlags);
}
- if tcp.has_flag(TcpFlags::ACK) != 0 {
- warn!("First packet is SYN+ACK - missed SYN?");
- dst.isn = ack - Wrapping(1);
- dst.status = TcpStatus::SynSent;
- dst.next_rel_seq = Wrapping(1);
- src.isn = seq;
- src.ian = ack;
- src.last_rel_ack = Wrapping(1);
- src.next_rel_seq = Wrapping(1);
- src.status = TcpStatus::Listen;
- // swap sides and tell analyzer to do the same for flow
- std::mem::swap(&mut self.client, &mut self.server);
- return Err(TcpStreamError::Inverted);
+ if tcp.has_flag(TcpFlags::ACK) {
+ println!("First packet is SYN+ACK");
+ return Err(TcpConnectionErr::WrongFlags);
}
src.isn = seq;
src.next_rel_seq = Wrapping(1);
@@ -285,34 +308,35 @@ impl TcpStream {
self.status = TcpStatus::SynSent;
src.status = TcpStatus::SynSent;
dst.status = TcpStatus::Listen;
- // do we have data ?
+
if !tcp.payload().is_empty() {
- warn!("Data in handshake SYN");
+ println!("Data in handshake SYN");
// conn.next_rel_seq += Wrapping(tcp.payload().len() as u32);
let segment = TcpSegment {
rel_seq: seq - src.isn,
rel_ack: ack - dst.isn,
- flags: tcp_flags,
- data: tcp.payload().to_vec(), // XXX data cloned here
+ payload: tcp.payload().to_vec(),
+ tcp_header: tcp.tcp_header,
+ ipv4_header: tcp.ipv4_header,
};
queue_segment(src, segment);
}
}
// Server -- SYN+ACK --> Client
TcpStatus::Listen => {
- if tcp.has_flag(TcpFlags::SYN) || tcp.has_flag(TcpFlags::ACK) == 0 {
- warn!("Not a SYN or ACK");
- return Err(TcpStreamError::Anomaly);
+ if !tcp.has_flag(TcpFlags::SYN) && !tcp.has_flag(TcpFlags::ACK) {
+ println!("Not a SYN + ACK");
+ return Err(TcpConnectionErr::WrongFlags);
}
// if we had data in SYN, add its length
let next_rel_seq = if dst.segments.is_empty() {
Wrapping(1)
} else {
- Wrapping(1) + Wrapping(dst.segments[0].data.len() as u32)
+ Wrapping(1) + Wrapping(dst.segments[0].payload.len() as u32)
};
if ack != dst.isn + next_rel_seq {
- warn!("NEW/SYN-ACK: ack number is wrong");
- return Err(TcpStreamError::HandshakeFailed);
+ println!("NEW/SYN-ACK: ack number is wrong");
+ return Err(TcpConnectionErr::HandshakeFailed);
}
src.isn = seq;
src.next_rel_seq = Wrapping(1);
@@ -326,25 +350,25 @@ impl TcpStream {
}
// Client -- ACK --> Server
TcpStatus::SynSent => {
- if tcp.has_flag(TcpFlags::ACK) == 0 {
+ if !tcp.has_flag(TcpFlags::ACK) {
if tcp.has_flag(TcpFlags::SYN) {
// can be a SYN resend
if seq == src.isn && ack.0 == 0 {
- trace!("SYN resend - ignoring");
+ println!("SYN resend - ignoring");
return Ok(None);
}
// can be a disordered handshake (receive S after SA)
if seq + Wrapping(1) == dst.ian {
- trace!("Likely received SA before S - ignoring");
+ println!("Likely received SA before S - ignoring");
return Ok(None);
}
}
- warn!("Not an ACK");
+ println!("Not an ACK");
}
- // TODO check seq, ack
+
if ack != dst.isn + Wrapping(1) {
- warn!("NEW/ACK: ack number is wrong");
- return Err(TcpStreamError::HandshakeFailed);
+ println!("NEW/ACK: ack number is wrong");
+ return Err(TcpConnectionErr::HandshakeFailed);
}
src.status = TcpStatus::Established;
dst.status = TcpStatus::Established;
@@ -352,12 +376,13 @@ impl TcpStream {
self.status = TcpStatus::Established;
// do we have data ?
if !tcp.payload().is_empty() {
- // warn!("Data in handshake ACK");
+ // println!("Data in handshake ACK");
let segment = TcpSegment {
rel_seq: seq - src.isn,
rel_ack: ack - dst.isn,
- flags: tcp_flags,
- data: tcp.payload().to_vec(), // XXX data cloned here
+ payload: tcp.payload().to_vec(), // XXX data cloned here
+ tcp_header: tcp.tcp_header,
+ ipv4_header: tcp.ipv4_header,
};
queue_segment(src, segment);
}
@@ -370,18 +395,19 @@ impl TcpStream {
// ignore
return Ok(None);
}
- warn!("Received unexpected data in SYN_RCV state");
+ println!("Received unexpected data in SYN_RCV state");
+ return Err(TcpConnectionErr::WrongFlags);
}
_ => unreachable!(),
}
Ok(None)
}
- pub fn handle_established_connection<'a>(
+ pub fn handle_established_connection(
&mut self,
- tcp: &'a TcpPacket,
+ tcp: TcpPacket,
to_server: bool,
- ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
+ ) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> {
let (origin, destination) = if to_server {
(&mut self.client, &mut self.server)
} else {
@@ -390,40 +416,42 @@ impl TcpStream {
let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn;
let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn;
+ let has_ack = tcp.has_flag(TcpFlags::ACK); // get it before borrowing tcp
- trace!("EST: payload len={}", tcp.payload().len());
- trace!(
+ println!("EST: payload len={}", tcp.payload().len());
+ println!(
" Tcp rel seq {} ack {} next seq {}",
rel_seq,
rel_ack,
origin.next_rel_seq
);
- if tcp.has_flag(TcpFlags::ACK) == 0 && tcp.get_acknowledgement() != 0 {
- warn!(
- "EST/ packet without ACK (broken TCP implementation or attack)",
+ if !tcp.has_flag(TcpFlags::ACK) && tcp.get_acknowledgement() != 0 {
+ println!(
+ "Established state packet without ACK (broken TCP implementation or attack)",
);
// ignore segment
- return Ok(None);
+ return Err(TcpConnectionErr::WrongFlags);
}
let segment = TcpSegment {
rel_seq,
rel_ack,
- data: tcp.payload().to_vec(), // XXX data cloned here
+ payload: tcp.payload().to_vec(), // XXX data cloned here
+ tcp_header: tcp.tcp_header,
+ ipv4_header: tcp.ipv4_header,
};
queue_segment(origin, segment);
- // trace!("Destination: {:?}", destination); // TODO to remove
// if there is a ACK, check & send segments on the *other* side
- let ret = if tcp.has_flag(TcpFlags::ACK) != 0 {
+ let ret = if has_ack {
send_peer_segments(destination, rel_ack)
} else {
None
};
- trace!(
+ println!(
" PEER EST rel next seq {} last_ack {}",
destination.next_rel_seq,
destination.last_rel_ack,
@@ -434,9 +462,9 @@ impl TcpStream {
fn handle_closing_connection(
&mut self,
- tcp: &TcpPacket,
+ tcp: TcpPacket,
to_server: bool,
- ) -> Option<Vec<TcpSegment>> {
+ ) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> {
let (mut origin, destination) = if to_server {
(&mut self.client, &mut self.server)
} else {
@@ -449,36 +477,36 @@ impl TcpStream {
let has_fin = tcp.has_flag(TcpFlags::FIN);
let ret = if has_ack {
- trace!("ACKing segments up to {}", rel_ack);
+ println!("ACKing segments up to {}", rel_ack);
send_peer_segments(destination, rel_ack)
} else {
if tcp.get_acknowledgement() != 0 {
- warn!(
+ println!(
"EST/ packet without ACK (broken TCP implementation or attack)",
);
// ignore segment
- return None;
+ return Err(TcpConnectionErr::WrongFlags);
}
None
};
if tcp.has_flag(TcpFlags::RST) {
// if we get a RST, check the sequence number and remove matching segments
- // trace!("RST received. rel_seq: {}", rel_seq);
- // trace!(
+ // println!("RST received. rel_seq: {}", rel_seq);
+ // println!(
// "{} remaining (undelivered) segments DESTINATION",
// destination.segments.len()
// );
// for (n, s) in destination.segments.iter().enumerate() {
- // trace!(" s[{}]: rel_seq={} plen={}", n, s.rel_seq, s.data.len());
+ // println!(" s[{}]: rel_seq={} plen={}", n, s.rel_seq, s.payload.len());
// }
// remove queued segments up to rel_seq
destination.segments.retain(|s| s.rel_ack != rel_seq);
- trace!(
+ println!(
"RST: {} remaining (undelivered) segments DESTINATION after removal",
destination.segments.len()
);
origin.status = TcpStatus::Closed; // XXX except if ACK ?
- return ret;
+ return Ok(ret);
}
// queue segment (even if FIN, to get correct seq numbers)
@@ -487,18 +515,20 @@ impl TcpStream {
let segment = TcpSegment {
rel_seq,
rel_ack,
- data: tcp.payload().to_vec(), // XXX data cloned here
+ payload: tcp.payload().to_vec(), // XXX data cloned here
+ tcp_header: tcp.tcp_header,
+ ipv4_header: tcp.ipv4_header,
};
queue_segment(origin, segment);
// if tcp_flags & TcpFlags::FIN != 0 {
- // warn!("origin next seq was {}", origin.next_rel_seq.0);
+ // println!("origin next seq was {}", origin.next_rel_seq.0);
// origin.next_rel_seq += Wrapping(1);
// }
match origin.status {
TcpStatus::Established => {
- // we know there is a FIN (tested in TcpStreamReassembly::update)
+ // we know there is a FIN (tested in TcpConnection::update)
origin.status = TcpStatus::FinWait1;
destination.status = TcpStatus::CloseWait; // we are not sure it was received
}
@@ -506,16 +536,16 @@ impl TcpStream {
if !has_fin {
// if only an ACK, do nothing and stay in CloseWait status
if has_ack {
- // debug!("destination status: {:?}", destination.status);
+ // println!("destination status: {:?}", destination.status);
if destination.status == TcpStatus::FinWait1 {
destination.status = TcpStatus::FinWait2;
}
} else {
- warn!("Origin should have sent a FIN and/or ACK");
+ println!("Origin should have sent a FIN and/or ACK");
}
} else {
origin.status = TcpStatus::LastAck;
- // debug!("destination status: {:?}", destination.status);
+ // println!("destination status: {:?}", destination.status);
if has_ack || destination.status == TcpStatus::FinWait2 {
destination.status = TcpStatus::TimeWait;
} else {
@@ -532,38 +562,32 @@ impl TcpStream {
}
}
_ => {
- warn!(
+ println!(
"Unhandled closing transition: origin host {} status {:?}",
origin.addr, origin.status
);
- warn!(
+ println!(
" dest host {} status {:?}",
destination.addr, destination.status
);
}
}
- trace!(
+ println!(
"TCP connection closing, {} remaining (undelivered) segments",
origin.segments.len()
);
- // DEBUG
+ // println
for (n, s) in origin.segments.iter().enumerate() {
- trace!(
+ println!(
" s[{}]: seq={} len={}",
n,
s.rel_seq.0,
- s.data.len(),
+ s.payload.len(),
);
}
- // TODO what now?
-
- if origin.segments.is_empty() {
- return ret;
- }
-
- ret
+ Ok(ret)
}
// force expiration (for ex after timeout) of this stream
@@ -575,31 +599,33 @@ impl TcpStream {
fn queue_segment(peer: &mut TcpPeer, segment: TcpSegment) {
// only store segments with data, except FIN
- if segment.data.is_empty() && segment.flags & TcpFlags::FIN == 0 {
+ if segment.payload.is_empty() && !segment.tcp_header.flag_fin {
return;
}
- // // DEBUG
+ // // println
// for (n, s) in peer.segments.iter().enumerate() {
- // debug!(
+ // println!(
// " XXX peer s[{}]: rel_seq={} plen={}",
// n,
// s.rel_seq,
- // s.data.len()
+ // s.payload.len()
// );
// }
- // trivial case: list is empty - just push segment
+
+ //todo: 老代码有一个 EARLY_DETECT_OVERLAP 不知道干嘛的
+
if peer.segments.is_empty() {
- trace!("Pushing segment (front)");
+ println!("Pushing segment (front)");
peer.segments.push_front(segment);
return;
}
- trace!("Adding segment");
+ println!("Adding segment");
peer.insert_sorted(segment);
}
fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<TcpSegment>> {
- trace!(
+ println!(
"Trying to send segments for {}:{} up to {} (last ack: {})",
peer.addr,
peer.port,
@@ -607,7 +633,7 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<
peer.last_rel_ack
);
if rel_ack == peer.last_rel_ack {
- trace!("re-acking last data, doing nothing");
+ println!("re-acking last data, doing nothing");
return None;
}
if peer.segments.is_empty() {
@@ -616,7 +642,7 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<
// is ACK acceptable?
if rel_ack < peer.last_rel_ack {
- warn!("ACK request for already ACKed data (ack < last_ack)");
+ println!("ACK request for already ACKed data (ack < last_ack)");
return None;
}
@@ -626,18 +652,19 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<
while !peer.segments.is_empty() {
let segment = &peer.segments[0];
- trace!(
+ println!(
"segment: rel_seq={} len={}",
segment.rel_seq,
- segment.data.len()
+ segment.payload.len()
);
- trace!(
+ println!(
" origin.next_rel_seq {} ack {}",
peer.next_rel_seq,
rel_ack
);
+ // todo:
// if origin.next_rel_seq > rel_ack {
- // warn!("next_seq > ack - partial ACK ?");
+ // println!("next_seq > ack - partial ACK ?");
// unreachable!(); // XXX do we care about that case?
// // break;
// }
@@ -649,18 +676,18 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<
// safety: segments is just tested above
let mut segment = peer.segments.pop_front().unwrap();
- if rel_ack < segment.rel_seq + Wrapping(segment.data.len() as u32) {
- // warn!("ACK lower then seq + segment size - SACK?");
- trace!("ACK for part of buffer");
+ if rel_ack < segment.rel_seq + Wrapping(segment.payload.len() as u32) {
+ // println!("ACK lower then seq + segment size - SACK?");
+ println!("ACK for part of buffer");
// split data and insert new dummy segment
- trace!("rel_ack {} segment.rel_seq {}", rel_ack, segment.rel_seq);
- trace!("segment data len {}", segment.data.len());
+ println!("rel_ack {} segment.rel_seq {}", rel_ack, segment.rel_seq);
+ println!("segment data len {}", segment.payload.len());
let acked_len = (rel_ack - segment.rel_seq).0 as usize;
let new_segment = segment.split_off(acked_len);
- trace!(
+ println!(
"insert new segment from {} len {}",
new_segment.rel_ack,
- new_segment.data.len()
+ new_segment.payload.len()
);
peer.insert_sorted(new_segment);
}
@@ -668,19 +695,19 @@ fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<
handle_overlap_linux(peer, &mut segment);
adjust_seq_numbers(peer, &segment);
- trace!(
+ println!(
"ACKed: pushing segment: rel_seq={} len={}",
segment.rel_seq,
- segment.data.len(),
+ segment.payload.len(),
);
- if !segment.data.is_empty() {
+ if !segment.payload.is_empty() {
acked.push(segment);
}
}
if peer.next_rel_seq != rel_ack {
// missed segments, or maybe received FIN ?
- warn!(
+ println!(
"TCP ACKed unseen segment next_seq {} != ack {} (Missed segments?)",
peer.next_rel_seq, rel_ack
);
@@ -701,7 +728,7 @@ fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) {
// loop while segment has overlap
while let Some(next) = peer.segments.front() {
if let Some(overlap_offset) = segment.overlap_offset(next) {
- warn!(
+ println!(
"overlaps next candidate (at offset={})",
overlap_offset
);
@@ -710,15 +737,15 @@ fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) {
let next = peer.segments.pop_front().unwrap();
// split next
- let overlap_size = segment.data.len() - overlap_offset;
- let min_overlap_size = std::cmp::min(overlap_size, next.data.len());
+ let overlap_size = segment.payload.len() - overlap_offset;
+ let min_overlap_size = std::cmp::min(overlap_size, next.payload.len());
// compare overlap area
- if next.data[..min_overlap_size]
- != segment.data[overlap_offset..overlap_offset + min_overlap_size]
+ if next.payload[..min_overlap_size]
+ != segment.payload[overlap_offset..overlap_offset + min_overlap_size]
{
- warn!("Overlap area differs!");
+ println!("Overlap area differs!");
}
- if overlap_size >= next.data.len() {
+ if overlap_size >= next.payload.len() {
// subsequent segment starts after and is smaller, so drop it
drop(next);
continue;
@@ -727,67 +754,76 @@ fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) {
let mut left = next;
let right = left.split_off(overlap_size);
// to accept right, merge it into segment
- segment.data.extend_from_slice(&right.data);
+ segment.payload.extend_from_slice(&right.payload);
} else {
- // trace!("no overlap, break");
+ // println!("no overlap, break");
break;
}
}
}
fn adjust_seq_numbers(origin: &mut TcpPeer, segment: &TcpSegment) {
- if !segment.data.is_empty() {
+ if !segment.payload.is_empty() {
// adding length is wrong in case of overlap
- // origin.next_rel_seq += Wrapping(segment.data.len() as u32);
- origin.next_rel_seq = segment.rel_seq + Wrapping(segment.data.len() as u32);
+ // origin.next_rel_seq += Wrapping(segment.payload.len() as u32);
+ origin.next_rel_seq = segment.rel_seq + Wrapping(segment.payload.len() as u32);
}
- if segment.flags & TcpFlags::FIN != 0 {
- // trace!("Segment has FIN");
+ if segment.tcp_header.flag_fin {
+ // println!("Segment has FIN");
origin.next_rel_seq += Wrapping(1);
}
}
-impl TcpStreamReassembly {
- pub(crate) fn update(
- &mut self,
- flow: &Flow,
- tcp: &TcpPacket,
- to_server: bool,
- ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
- trace!("5-t: {}", flow.five_tuple);
- trace!(" flow id: {:x}", flow.flow_id);
- trace!(
- " seq: {:x} ack {:x}",
- tcp.get_sequence(),
- tcp.get_acknowledgement()
- );
+impl TcpConnection {
+ pub(crate) fn try_new(packet: &RawPacket) -> Result<Self, TcpConnectionErr> {
+ let simple_packet = encapsulation_convert_to_my_packet(&packet.encapsulation)?;
+
+ let mut stream = TcpStream::new(&simple_packet);
+ let mut connection = TcpConnection {
+ stream,
+ timeout: DEFAULT_TIMEOUT,
+ };
+
+ if !simple_packet.has_flag(TcpFlags::SYN) {
+ return Err(TcpConnectionErr::WrongFlags);
+ }
- let mut stream = self
- .m
- .entry(flow.flow_id)
- .or_insert_with(|| TcpStream::new(flow));
- trace!("stream state: {:?}", stream.status);
- trace!("to_server: {}", to_server);
+ connection._update(simple_packet)?;
+ Ok(connection)
+ }
+
+ pub(crate) fn update(&mut self, packet: &RawPacket) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> {
+ let simple_packet = encapsulation_convert_to_my_packet(&packet.encapsulation)?;
+ self._update(simple_packet)
+ }
+
+ fn _update(&mut self, tcp: TcpPacket) -> Result<Option<Vec<TcpSegment>>, TcpConnectionErr> {
+ let mut stream = &mut self.stream;
+ println!("stream state: {:?}", stream.status);
// check time delay with previous packet before updating
- if stream.last_seen_ts > flow.last_seen {
- info!("packet received in past");
- } else if flow.last_seen - stream.last_seen_ts > self.timeout {
- warn!("TCP stream received packet after timeout");
+ let packet_ts = tcp.get_timestamp();
+ if stream.last_seen_ts > packet_ts {
+ println!("packet received in past");
+ } else if packet_ts - stream.last_seen_ts > self.timeout {
+ println!("TCP stream received packet after timeout");
stream.expire();
- return Err(TcpStreamError::Expired);
+ return Err(TcpConnectionErr::ExpiredSession);
}
- stream.last_seen_ts = flow.last_seen;
+ stream.last_seen_ts = packet_ts;
+ // get origin and destination
+ let to_server = tcp.ipv4_header.source_address == stream.server.addr &&
+ tcp.tcp_header.source_port == stream.server.port;
+ println!("to_server: {}", to_server);
let (origin, _destination) = if to_server {
(&stream.client, &stream.server)
} else {
(&stream.server, &stream.client)
};
- println!(
- "origin: {}:{} status {:?}",
+ println!("origin: {}:{} status {:?}",
origin.addr,
origin.port,
origin.status
@@ -800,41 +836,16 @@ impl TcpStreamReassembly {
TcpStatus::Established => {
// check for close request
if tcp.has_flag(TcpFlags::FIN) || tcp.has_flag(TcpFlags::RST) {
- trace!("Requesting end of connection");
- Ok(stream.handle_closing_connection(tcp, to_server))
+ println!("Requesting end of connection");
+ stream.handle_closing_connection(tcp, to_server)
} else {
stream.handle_established_connection(tcp, to_server)
}
}
- _ => Ok(stream.handle_closing_connection(tcp, to_server)),
+ _ => stream.handle_closing_connection(tcp, to_server),
}
}
- pub(crate) fn check_expired_connections(&mut self, now: Duration) {
- for (flow_id, stream) in self.m.iter_mut() {
- if now < stream.last_seen_ts {
- warn!(
- "stream.last_seen_ts is in the future for flow id {:x}",
- flow_id
- );
- continue;
- }
- if now - stream.last_seen_ts > self.timeout {
- warn!("TCP stream timeout reached for flow {:x}", flow_id);
- stream.expire();
- }
- }
- }
-}
-pub(crate) fn finalize_tcp_streams(analyzer: &mut crate::analyzer::Analyzer) {
- warn!("expiring all TCP connections");
- for (flow_id, _stream) in analyzer.tcp_defrag.m.iter() {
- // TODO do we have anything to do?
- if let Some(flow) = analyzer.flows.get_flow(*flow_id) {
- debug!(" flow: {:?}", flow);
- }
- }
- analyzer.tcp_defrag.m.clear();
}
impl fmt::Debug for TcpPeer {
@@ -851,7 +862,7 @@ impl fmt::Debug for TcpPeer {
" s[{}]: rel_seq={} len={}",
n,
s.rel_seq,
- s.data.len(),
+ s.payload.len(),
)?;
}
Ok(())