From 09ec8c3e235ab84198d58dbd792f93f6d0b23bd4 Mon Sep 17 00:00:00 2001 From: chenzizhan Date: Mon, 4 Sep 2023 16:41:05 +0800 Subject: TransferringState done --- src/session/callbacks.rs | 1 - src/session/tcp_session.rs | 601 +++++++++++++++++++-------------------------- 2 files changed, 248 insertions(+), 354 deletions(-) diff --git a/src/session/callbacks.rs b/src/session/callbacks.rs index 2be75b4..101753e 100644 --- a/src/session/callbacks.rs +++ b/src/session/callbacks.rs @@ -25,7 +25,6 @@ impl ActionManager { pub(super) trait TcpCallBackAction{ fn run(&self); - fn read_peer(&mut self, peer: &tcp_session::Peer); } diff --git a/src/session/tcp_session.rs b/src/session/tcp_session.rs index 3aecef1..5106508 100644 --- a/src/session/tcp_session.rs +++ b/src/session/tcp_session.rs @@ -9,42 +9,22 @@ use crate::protocol::tcp::{TcpHeader, TcpOption}; use super::callbacks::{self, Policy}; -#[derive(Debug)] -struct Listen; -#[derive(Debug)] -struct SynSent; -#[derive(Debug)] -struct SynReceived; -#[derive(Debug)] -struct Established; -#[derive(Debug)] -struct FinWait1; -#[derive(Debug)] -struct FinWait2; -#[derive(Debug)] -struct CloseWait; -#[derive(Debug)] -struct Closing; -#[derive(Debug)] -struct LastAck; -#[derive(Debug)] -struct TimeWait; -#[derive(Debug)] -struct Closed; - -trait SessionState: Debug{} -impl SessionState for Listen{} -impl SessionState for SynSent{} -impl SessionState for SynReceived{} -impl SessionState for Established{} -impl SessionState for FinWait1{} -impl SessionState for FinWait2{} -impl SessionState for CloseWait{} -impl SessionState for Closing{} -impl SessionState for LastAck{} -impl SessionState for TimeWait{} -impl SessionState for Closed{} +#[derive(Debug, PartialEq, Eq)] +enum SessionState { + Listen, + SynSent, + SynReceived, + Established, + FinWait1, + FinWait2, + CloseWait, + Closing, + LastAck, + TimeWait, + Closed, +} +#[derive(Debug)] enum TcpSessionErr { WrongFlag(String), NoWindowSpace, @@ -75,13 +55,20 @@ struct Packet { segment: TcpSegment, } +impl Packet { + fn is_sent_by(&self, peer: &Peer) -> bool { + self.ip_src == peer.ip && self.tcp_header.source_port == peer.port + } +} + +#[derive(Debug, PartialEq, Eq)] enum PeerRole { Client, Server, } pub(super) struct Peer { - state: Box, + state: SessionState, ian: u32, // initial ack number isn: u32, // initial sequence number next_seq: u32, // next sequence number @@ -107,7 +94,7 @@ impl Debug for Peer { impl Peer { fn new() -> Peer { Peer { - state: Box::new(Closed), + state: SessionState::Closed, ian: 0, isn: 0, next_seq: 0, @@ -146,30 +133,6 @@ impl Peer { } } -#[derive(Debug, Clone)] -enum ClosingBy { - Nil, - Client, - Server, - Closed, -} - -#[derive(Debug, PartialEq, Eq)] -struct TransferringState{ - state_sender: SSender, - state_receiver: SReceiver, -} - -impl TransferringState { - fn new(state_sender: SSender, state_receiver: SReceiver) -> Self { - TransferringState { - state_sender, - state_receiver, - } - } -} - - // wave hand: // Peer side // Established send fin -> FinWait1* Established receive fin -> CloseWait @@ -182,367 +145,304 @@ impl TransferringState Closing // Closing send ack -> Closing Closing send ack -> closing // Closing receive ack -> timewait Closing receive ack -> timewait -enum TransferringStateWrapper { - Closed(TransferringState), // initial and end - SynSent(TransferringState), // the client start a connection - SynReceived(TransferringState), // server ack the syn packet - Established(TransferringState), // 3 way handshake done - FinWait1(TransferringState), // one of(either client or server) peer sends fin|rst and turn to FinWait1 - FinWait2(TransferringState), // receive ack - LastAck(TransferringState), // the side peer send fin on receiving fin that initiated by the other peer - TimeWait(TransferringState), // 4-way handshake done - Closing1(TransferringState), // one of Established state send syn and turn to FinWait1, while its peer, which has already sent syn and were being in FinWait1 state, receive syn and turn to Closing state - Closing2(TransferringState), // both peers enter the temporary Closing state - BothTimeWait1(TransferringState), // closing receive ack and turn to timewait - BothTimeWait2(TransferringState), // closing receive ack and turn to timewait -} +#[derive(Debug, PartialEq, Eq)] +enum TransferringState { + Closed, // initial and end + SynSent, // the client start a connection + SynReceived, // server ack the syn packet + Established, // 3 way handshake done + FinWait1, // one of(either client or server) peer sends fin|rst and turn to FinWait1 + FinWait2, // receive ack + LastAck, // the side peer send fin on receiving fin that initiated by the other peer + TimeWait, // 4-way handshake done + Closing1, // one of Established state send syn and turn to FinWait1, while its peer, which has already sent syn and were being in FinWait1 state, receive syn and turn to Closing state + Closing2, // both peers enter the temporary Closing state + BothTimeWait1, // closing receive ack and turn to timewait + BothTimeWait2, // closing receive ack and turn to timewait +} + +fn send_for_establised(sender: &mut Peer, packet: &Packet) -> Result<(), TcpSessionErr> { + let payload_len = packet.segment.payload.len() as u16; + if sender.used_win + payload_len >= sender.total_win { + return Err(TcpSessionErr::NoWindowSpace); + } + if payload_len > 0 { + sender.add_segment(packet); + } + sender.next_seq += { + if packet.tcp_header.flag_fin || packet.tcp_header.flag_rst { + 1 + } else { + if packet.tcp_header.seq_num < sender.next_seq { // segments out of order + 0 + } else { + packet.segment.payload.len() as u32 + } + } + }; -struct TransferringStateTransition { - from: TransferringStateWrapper, - to: TransferringStateWrapper, - ret_err: Option, + Ok(()) } -impl TransferringStateWrapper { - fn try_step(&self, packet: Packet) -> TransferringStateTransition { - let mut ret = TransferringStateTransition { - from: *self, - to: *self, - ret_err: None, - }; +impl TransferringState { + fn get_state(&self) -> (SessionState, SessionState) { + match *self { + TransferringState::Closed => (SessionState::Closed, SessionState::Closed), + TransferringState::SynSent => (SessionState::SynSent, SessionState::Listen), + TransferringState::SynReceived => (SessionState::SynReceived, SessionState::SynSent), + TransferringState::Established => (SessionState::Established, SessionState::Established), + TransferringState::FinWait1 => (SessionState::FinWait1, SessionState::CloseWait), + TransferringState::FinWait2 => (SessionState::FinWait2, SessionState::CloseWait), + TransferringState::LastAck => (SessionState::LastAck, SessionState::TimeWait), + TransferringState::TimeWait => (SessionState::TimeWait, SessionState::Closed), + TransferringState::Closing1 => (SessionState::FinWait1, SessionState::Closing), + TransferringState::Closing2 => (SessionState::Closing, SessionState::Closing), + TransferringState::BothTimeWait1 => (SessionState::Closing, SessionState::TimeWait), + TransferringState::BothTimeWait2 => (SessionState::TimeWait, SessionState::TimeWait), + } + } + + fn try_step(&self, packet: &Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { match *self { /* --------------------------- 3 way handshake --------------------------- */ - TransferringStateWrapper::Closed(state) => { + TransferringState::Closed => { if !packet.tcp_header.flag_syn { - ret.ret_err = Some(TcpSessionErr::WrongFlag("Closed server expect syn packet".to_string())); - } else { - ret.to = TransferringStateWrapper::SynSent(TransferringState{ - state_sender: SynSent, - state_receiver: Listen, - }) + return Err(TcpSessionErr::WrongFlag("Closed server expect syn packet".to_string())); } + client.as_sender(packet); + server.as_receiver(packet); - return ret; + return Ok(( + TransferringState::SynSent, + None)); }, - TransferringStateWrapper::SynSent(state) => { + TransferringState::SynSent => { if !packet.tcp_header.flag_syn || !packet.tcp_header.flag_ack { - ret.ret_err = Some(TcpSessionErr::WrongFlag("Client expects syn + ack packet to establish connection".to_string())); - } else { - ret.to = TransferringStateWrapper::SynReceived(TransferringState{ - state_sender: SynReceived, - state_receiver: SynSent, - }) + return Err(TcpSessionErr::WrongFlag("Client expects syn + ack packet to establish connection".to_string())); } - return ret; + client.as_receiver(packet); + server.as_sender(packet); + return Ok(( + TransferringState::SynReceived, + None) + ); }, - TransferringStateWrapper::SynReceived(state) => { + TransferringState::SynReceived => { if !packet.tcp_header.flag_ack { - ret.ret_err = Some(TcpSessionErr::WrongFlag("Server expects ack packet to establish connection".to_string())); - } else { - ret.to = TransferringStateWrapper::Established(TransferringState{ - state_sender: Established, - state_receiver: Established, - }) + return Err(TcpSessionErr::WrongFlag("Server expects ack packet to establish connection".to_string())); } - return ret; + return Ok(( + TransferringState::Established, + None) + ); }, + /* --------------------------- established & 4 way handshake --------------------------- */ - TransferringStateWrapper::Established(state) => { + TransferringState::Established => { if !packet.tcp_header.flag_ack && !packet.tcp_header.flag_fin && !packet.tcp_header.flag_rst { - ret.ret_err = Some(TcpSessionErr::WrongFlag("During transferring, every packet must carry ack, fin or rst flag.".to_string())); - return ret; + return Err(TcpSessionErr::WrongFlag("During transferring, every packet must carry ack, fin or rst flag.".to_string())); + } + if packet.is_sent_by(client) { + send_for_establised(client, packet)?; + } else { + send_for_establised(server, packet)?; } + if packet.tcp_header.flag_fin || packet.tcp_header.flag_rst { - ret.to = TransferringStateWrapper::FinWait1(TransferringState{ - state_sender: FinWait1, - state_receiver: CloseWait, - }); + return Ok(( + TransferringState::FinWait1, + None) + ); + } else { + return Ok(( + TransferringState::Established, + None) + ); } - return ret; }, - TransferringStateWrapper::FinWait1(state) => { + TransferringState::FinWait1 => { if !packet.tcp_header.flag_ack || !packet.tcp_header.flag_fin { - ret.ret_err = Some(TcpSessionErr::WrongFlag("During FinWait1, wrong packet flag.".to_string())); - return ret; + return Err(TcpSessionErr::WrongFlag("During FinWait1, wrong packet flag.".to_string())); } if packet.tcp_header.flag_ack { - ret.to = TransferringStateWrapper::FinWait2(TransferringState{ - state_sender: FinWait2, - state_receiver: CloseWait, - }); + return Ok ((TransferringState::FinWait2, None)); } else { - ret.to = TransferringStateWrapper::Closing1(TransferringState{ - state_sender: FinWait1, - state_receiver: Closing, - }); + return Ok((TransferringState::Closing1, None)); } - return ret; } - TransferringStateWrapper::FinWait2(state) => { + TransferringState::FinWait2 => { if !packet.tcp_header.flag_fin { - ret.ret_err = Some(TcpSessionErr::WrongFlag("During FinWait2, wrong packet flag, expecting fin".to_string())); - } else { - ret.to = TransferringStateWrapper::TimeWait(TransferringState{ - state_sender: TimeWait, - state_receiver: Closed, - }); + return Err(TcpSessionErr::WrongFlag("During FinWait2, wrong packet flag, expecting fin".to_string())); } - return ret; + return Ok((TransferringState::LastAck, None)); } - TransferringStateWrapper::LastAck(state) => { + TransferringState::LastAck => { if !packet.tcp_header.flag_ack { - ret.ret_err = Some(TcpSessionErr::WrongFlag("During LastAck, wrong packet flag, expecting ack".to_string())); - } else { - ret.to = TransferringStateWrapper::TimeWait(TransferringState{ - state_sender: TimeWait, - state_receiver: Closed, - }); + return Err(TcpSessionErr::WrongFlag("During LastAck, wrong packet flag, expecting ack".to_string())); } - return ret; + return Ok((TransferringState::TimeWait, None)); } - TransferringStateWrapper::TimeWait(state) => { - return ret; + TransferringState::TimeWait => { + return Ok((TransferringState::TimeWait, None)); // do nothing. Just throw away the redundant packets when in TimeWait state } + /* --------------------------------- closing -------------------------------- */ - TransferringStateWrapper::Closing1(state) => { + TransferringState::Closing1 => { if !packet.tcp_header.flag_syn { - ret.ret_err = Some(TcpSessionErr::WrongFlag("During Closing1, wrong packet flag, expecting syn".to_string())); - } else { - ret.to = TransferringStateWrapper::Closing2(TransferringState{ - state_sender: Closing, - state_receiver: Closing, - }); + return Err(TcpSessionErr::WrongFlag("During Closing1, wrong packet flag, expecting syn".to_string())); } - return ret; + return Ok(( + TransferringState::Closing2, + None) + ); } - TransferringStateWrapper::Closing2(state) => { + TransferringState::Closing2 => { if !packet.tcp_header.flag_ack { - ret.ret_err = Some(TcpSessionErr::WrongFlag("During Closing2, wrong packet flag, expecting ack".to_string())); - } else { - ret.to = TransferringStateWrapper::BothTimeWait1(TransferringState{ - state_sender: Closing, - state_receiver: TimeWait, - }); + return Err(TcpSessionErr::WrongFlag("A peer in closing state expects ack".to_string())); } - return ret; + return Ok(( + TransferringState::BothTimeWait1, + None) + ); } - TransferringStateWrapper::BothTimeWait1(state) => { - // todo: 如果是重复发包,也就是主动关闭的那方收到的ack,就直接return, 这个检查放到状态机外面的前面 + TransferringState::BothTimeWait1 => { if !packet.tcp_header.flag_ack { - ret.ret_err = Some(TcpSessionErr::FurtherCheckClosing); - } else { - ret.to = TransferringStateWrapper::BothTimeWait2(TransferringState{ - state_sender: TimeWait, - state_receiver: TimeWait, - }); + return Err(TcpSessionErr::FurtherCheckClosing); } - return ret; + return Ok(( + TransferringState::BothTimeWait2, + None) + ); } - TransferringStateWrapper::BothTimeWait2(_) => { - return ret; + TransferringState::BothTimeWait2 => { + return Ok(( + TransferringState::BothTimeWait2, + None) + ); // do nothing.Just throw away the redundant packets when in TimeWait state } } } } -impl TransferringStateTransition { - fn run(&self, packet: &Packet, client:&mut Peer, server:&mut Peer) -> Result<(TransferringStateWrapper, Option), TcpSessionErr> { - match self.from { - - } - if let Some(err) = &self.ret_err { - match err { - TcpSessionErr::FurtherCheckClosing => { - (sender, receiver) = find_role(); - if receiver. - }, - _ => { - return Err(TcpSessionErr::WrongFlag("During transferring, wrong packet flag.".to_string())); - } - } - } - } +enum ExpectedSender { + Fixed(PeerRole), + Either, + FindByPacket, } -fn session_add_packet() { - let next = session.wrapper.try_step(packet); - let sender = { - // 比较port和ip,判断是client还是server - } - if next is FinWait1 { - session.closing_by = sender_is; - } - TransferringState.into_state(packet, client, server); - if next is err: FurtherCheckClosing { // 当前的状态应该是BothTimeWait1, - assert(currentState == BothTimeWait1); - if closing_by_closingMode == sender_is { - return; - } else { - // 重复发包,直接return +impl PeerRole { + fn get_peer(&self) -> PeerRole { + match self { + PeerRole::Client => PeerRole::Server, + PeerRole::Server => PeerRole::Client, } - return; } } -trait IntoState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr>; -} - pub struct Session { client: Peer, server: Peer, -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: &Packet, client:&mut Peer, server:&mut Peer) -> Result, TcpSessionErr> { - client.state = Box::new(SynSent); - server.state = Box::new(Listen); - client.as_sender(packet); - server.as_receiver(packet); - Ok(None) - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - client.state = Box::new(SynSent); - server.state = Box::new(SynReceived); - client.as_receiver(packet); - server.as_sender(packet); - Ok((TransferringState{ - state_sender: SynReceived, - state_receiver: SynSent, - }, None)) - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - client.state = Box::new(Established); - server.state = Box::new(Established); - Ok((TransferringState{ - state_sender: Established, - state_receiver: Established, - }, None)) - } -} - -fn sender_operation_for_establised(sender: &mut Peer, packet: &Packet) -> Result<(), TcpSessionErr> { - let payload_len = packet.segment.payload.len() as u16; - if sender.used_win + payload_len >= sender.total_win { - return Err(TcpSessionErr::NoWindowSpace); - } - if payload_len > 0 { - sender.add_segment(packet); - } - sender.next_seq += { - if packet.tcp_header.flag_fin || packet.tcp_header.flag_rst { - 1 - } else { - if packet.tcp_header.seq_num < sender.next_seq { // segments out of order - 0 - } else { - packet.segment.payload.len() as u32 + current_state: TransferringState, + closing_by: Option, +} + +impl Session { + fn get_expected_sender(&self, packet: &Packet) -> ExpectedSender { + match self.current_state { + TransferringState::Closed => { + ExpectedSender::Fixed(PeerRole::Client) + }, + TransferringState::SynSent => { + ExpectedSender::Fixed(PeerRole::Server) + }, + TransferringState::SynReceived => { + ExpectedSender::Fixed(PeerRole::Client) + }, + TransferringState::Established => { + ExpectedSender::FindByPacket + }, + TransferringState::FinWait1 => { + ExpectedSender::Fixed(self.closing_by.unwrap().get_peer()) + }, + TransferringState::FinWait2 => { + ExpectedSender::Fixed(self.closing_by.unwrap().get_peer()) + }, + TransferringState::LastAck => { + ExpectedSender::Fixed(self.closing_by.unwrap()) + } + TransferringState::TimeWait => { + ExpectedSender::Either + } + TransferringState::Closing1 => { + ExpectedSender::Fixed(self.closing_by.unwrap().get_peer()) + } + TransferringState::Closing2 => { + ExpectedSender::FindByPacket + } + TransferringState::BothTimeWait1 => { + ExpectedSender::Fixed(self.closing_by.unwrap().get_peer()) + } + TransferringState::BothTimeWait2 => { + ExpectedSender::Either } } - }; - if (packet.tcp_header.flag_fin || packet.tcp_header.flag_rst) && sender.next_seq == sender.ian { - sender.state = Box::new(FinWait1); } - Ok(()) -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - if packet.tcp_header.source_port == client.port && packet.ip_src == client.ip { - sender_operation_for_establised(client, packet)?; - server.state = Box::new(CloseWait); - } else { - sender_operation_for_establised(server, packet)?; - client.state = Box::new(CloseWait); + fn add_packet(&mut self, packet: &Packet) { + let mut sender = PeerRole::Client; + if packet.is_sent_by(&self.server) { + sender = PeerRole::Server; } - if packet.tcp_header.flag_fin || packet.tcp_header.flag_rst { - self.closing_by = { - if packet.tcp_header.source_port == client.port && packet.ip_src == client.ip { - ClosingBy::Client + if let ExpectedSender::Fixed(role) = self.get_expected_sender(packet) { + if role != sender { + return; + } + } + let ret = self.current_state.try_step(packet, &mut self.client, &mut self.server); + match ret { + Err(TcpSessionErr::FurtherCheckClosing) => { + println!("Further check closing"); + let side_peer = { + if sender == PeerRole::Client { + &mut self.client + } else { + &mut self.server + } + }; + + if side_peer.state == SessionState::TimeWait { // receiver has been in timewait state, not an error + return; } else { - ClosingBy::Server + let e = TcpSessionErr::WrongFlag("A peer in closing state expects ack".to_string()); + println!("Error: {:?}", e); + } + }, + Err(e) => { + println!("Error: {:?}", e); + }, + Ok((next_state, policy)) => { + if next_state == TransferringState::FinWait1 ||next_state == TransferringState::Closing1 { + self.closing_by = Some(sender); } - }; - } - Ok((TransferringState{ - closing_by: self.closing_by.clone(), - p: PhantomData, - q: PhantomData, - }, None)) - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - - } -} - -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - - } -} -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - + self.current_state = next_state; + if let Some(policy) = policy { + // if policy is inner policy::update sender, then update closing by + policy_handle.get_action(policy).run(); + } + } + } } } -impl IntoState for TransferringState { - fn into_state(&mut self, packet: & Packet, client: &mut Peer, server: &mut Peer) -> Result<(TransferringState, Option), TcpSessionErr> { - // (sender, receiver) = match self.closing_by { - // ClosingBy::Client: { - // (self.server, self.client) - // }, - // ClosingBy::Server: { - // (self.client, self.server) - // }, - // _: { - // return Err(TcpSessionErr::WrongFlag("During closing, wrong packet.".to_string())); - // } - // } - return Err(TcpSessionErr::WrongFlag("try1.".to_string())); - } +trait VisitSession:TcpCallBackAction { + fn visit_session(&mut self, session: &Session); } - - +impl VisitSession for #[cfg(test)] mod tests { @@ -586,8 +486,3 @@ mod tests { } } -// impl Session { -// fn add_stream(&mut self, packet: TcpHeader) -> Result<(), Error> { -// Ok(()) -// } -// } -- cgit v1.2.3