use bytes::BytesMut; //use rust_unix_server::codec_eth::EthCodec; //use rust_unix_server::codec_ipv4::IPv4Codec; //use rust_unix_server::codec_tcp::TcpCodec; use rust_unix_server::codec_pkt::PktCodec; use std::io; use tokio::net::UnixListener; use tokio_stream::StreamExt; use tokio_util::codec::Decoder; use tokio_util::codec::Encoder; #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)] pub struct MyCodec(C); impl MyCodec { pub fn new(c: C) -> Self { MyCodec(c) } } impl Decoder for MyCodec where C: Decoder, std::io::Error: From<::Error>, { type Item = C::Item; type Error = io::Error; fn decode(&mut self, data: &mut BytesMut) -> Result, io::Error> { if !data.is_empty() { println!("MyCodec -> decode(), handle data len: {}", data.len()); let decoded = self.0.decode(data)?; match decoded { None => { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, format!("Frame of length {} is too large.", 111), )); } Some(inner) => Ok(Some(inner)), } } else { Ok(None) } } } impl Encoder for MyCodec where C: Encoder, std::io::Error: From, { type Error = io::Error; fn encode(&mut self, _item: T, _buf: &mut BytesMut) -> Result<(), io::Error> { // TODO Ok(()) } } // nc -U /Users/lwp/Downloads/temp.sock #[tokio::main] async fn main() -> Result<(), Box> { let listener = UnixListener::bind("/Users/lwp/Downloads/temp.sock")?; loop { let (socket, _) = listener.accept().await?; tokio::spawn(async move { let mut framed = tokio_util::codec::Framed::new( socket, // MyCodec::new(EthCodec::new(IPv4Codec::new(TcpCodec::new()))), PktCodec::new(), ); while let Some(message) = framed.next().await { match message { Ok(bytes) => println!("bytes: {:?}", bytes), Err(err) => println!("Socket closed with error: {:?}", err), } } println!("Client closed the connection"); }); } }