summaryrefslogtreecommitdiff
path: root/src/main.rs
blob: 9df3beb2ad88c27137579a9a2017a196694369dd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use bytes::BytesMut;
//use rust_unix_server::codec_eth::EthCodec;
//use rust_unix_server::codec_ipv4::IPv4Codec;
//use rust_unix_server::codec_tcp::TcpCodec;
use rust_unix_server::codec_pkt::PktCodec;
use std::io;
use tokio::net::UnixListener;
use tokio_stream::StreamExt;
use tokio_util::codec::Decoder;
use tokio_util::codec::Encoder;

/// Newtype wrapper around an inner codec `C`.
///
/// The `Decoder` and `Encoder` implementations in this file are written
/// against this wrapper and report their errors as `std::io::Error`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MyCodec<C>(C);

impl<C> MyCodec<C> {
    /// Wraps the codec `c` in a new `MyCodec`.
    pub fn new(c: C) -> Self {
        Self(c)
    }
}

/// Decoding is delegated to the wrapped codec; its error type is converted
/// into `std::io::Error`.
impl<C> Decoder for MyCodec<C>
where
    C: Decoder,
    std::io::Error: From<<C as Decoder>::Error>,
{
    type Item = C::Item;
    type Error = io::Error;

    /// Tries to decode one frame from `data` using the inner codec.
    ///
    /// Returns `Ok(Some(item))` when the inner codec produced a frame and
    /// `Ok(None)` when `data` is empty or the inner codec needs more bytes.
    ///
    /// # Errors
    ///
    /// Returns the inner codec's error, converted into `io::Error`.
    fn decode(&mut self, data: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
        // Nothing buffered yet: do not bother the inner codec.
        if data.is_empty() {
            return Ok(None);
        }

        println!("MyCodec -> decode(), handle data len: {}", data.len());

        // `Ok(None)` from the inner codec means "frame incomplete, read more",
        // not a failure. It must be passed through unchanged, otherwise a frame
        // that arrives split across several reads would terminate the stream.
        Ok(self.0.decode(data)?)
    }
}

impl<C, T> Encoder<T> for MyCodec<C>
where
    C: Encoder<T>,
    std::io::Error: From<C::Error>,
{
    type Error = io::Error;

    fn encode(&mut self, _item: T, _buf: &mut BytesMut) -> Result<(), io::Error> {
        // TODO
        Ok(())
    }
}

/// Accepts connections on a Unix domain socket and prints every frame that
/// `PktCodec` decodes from each client.
///
/// Manual test: `nc -U /Users/lwp/Downloads/temp.sock`
///
/// # Errors
///
/// Returns an error if a stale socket file cannot be removed, if binding the
/// listener fails, or if accepting a connection fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    const SOCKET_PATH: &str = "/Users/lwp/Downloads/temp.sock";

    // A socket file left over from a previous run makes `bind` fail with
    // "address in use". Remove it first; a missing file is the normal case.
    match std::fs::remove_file(SOCKET_PATH) {
        Ok(()) => {}
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
        Err(err) => return Err(err.into()),
    }

    let listener = UnixListener::bind(SOCKET_PATH)?;
    loop {
        let (socket, _) = listener.accept().await?;

        // Each client gets its own task so a slow client does not block accepts.
        tokio::spawn(async move {
            let mut framed = tokio_util::codec::Framed::new(
                socket,
               // MyCodec::new(EthCodec::new(IPv4Codec::new(TcpCodec::new()))),
               PktCodec::new(),
            );
            while let Some(message) = framed.next().await {
                match message {
                    Ok(bytes) => println!("bytes: {:?}", bytes),
                    Err(err) => println!("Socket closed with error: {:?}", err),
                }
            }
            println!("Client closed the connection");
        });
    }
}