diff options
| author | luwenpeng <[email protected]> | 2021-03-08 17:33:17 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2021-04-21 13:26:07 +0800 |
| commit | 1fe60d2428f231eb3bd68b0ecfb479ab25689405 (patch) | |
| tree | 1f61a044cf24ffcf3524259ba087033c7c981e23 | |
| parent | 1c37ae746df4e8cb4cb242af5a9a2ce4a95a0d21 (diff) | |
废除 tfe-kmod, tfe 直接与 kni 通信
* 新增 enable_kni_v3=1 配置项
* develop_build_release 分支关闭 ASAN 检测
* 修正根据 CMSG 恢复 TCP 链接时没有正确填写 TCP 时间戳启用选项的问题
| -rw-r--r-- | .gitlab-ci.yml | 2 | ||||
| -rw-r--r-- | ci/travis.sh | 1 | ||||
| -rw-r--r-- | cmake/FindNFNETLINK.cmake | 39 | ||||
| -rw-r--r-- | common/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | common/include/tfe_cmsg.h | 5 | ||||
| -rw-r--r-- | common/include/tfe_pkt_util.h | 59 | ||||
| -rw-r--r-- | common/include/tfe_tcp_restore.h | 47 | ||||
| -rw-r--r-- | common/include/tfe_utils.h | 6 | ||||
| -rw-r--r-- | common/src/tfe_pkt_util.cpp | 245 | ||||
| -rw-r--r-- | common/src/tfe_tcp_restore.cpp | 261 | ||||
| -rw-r--r-- | common/src/tfe_utils.cpp | 11 | ||||
| -rw-r--r-- | conf/tfe/tfe.conf | 8 | ||||
| -rw-r--r-- | platform/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | platform/include/internal/acceptor_kni_v3.h | 7 | ||||
| -rw-r--r-- | platform/include/internal/proxy.h | 2 | ||||
| -rw-r--r-- | platform/src/acceptor_kni_v3.cpp | 675 | ||||
| -rw-r--r-- | platform/src/proxy.cpp | 11 | ||||
| -rw-r--r-- | platform/src/tcp_stream.cpp | 4 | ||||
| -rw-r--r-- | script/service/tfe-env.service | 4 | ||||
| -rw-r--r-- | vendor/CMakeLists.txt | 17 | ||||
| -rw-r--r-- | vendor/libnetfilter_queue-1.0.5.tar.bz2 | bin | 0 -> 313856 bytes |
21 files changed, 1387 insertions, 25 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index cf66410..0b17169 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -65,7 +65,7 @@ develop_build_release: variables: TESTING_VERSION_BUILD: 1 UPLOAD_SYMBOL_FILES: 1 - ASAN_OPTION: ADDRESS + # ASAN_OPTION: ADDRESS BUILD_TYPE: RelWithDebInfo PACKAGE: 1 PULP3_REPO_NAME: tfe-testing-x86_64.el7 diff --git a/ci/travis.sh b/ci/travis.sh index 0a961f1..19470d9 100644 --- a/ci/travis.sh +++ b/ci/travis.sh @@ -37,6 +37,7 @@ yum install -y mrzcpd numactl-devel zlib-devel librdkafka-devel systemd-devel yum install -y libcjson-devel libmaatframe-devel libMESA_field_stat2-devel libMESA_handle_logger-devel yum install -y libMESA_htable-devel libMESA_prof_load-devel librulescan-devel libwiredcfg-devel libWiredLB-devel sapp-devel libbreakpad_mini-devel yum install -y libasan +yum install -y libmnl-devel libnfnetlink-devel if [ $ASAN_OPTION ];then source /opt/rh/devtoolset-7/enable diff --git a/cmake/FindNFNETLINK.cmake b/cmake/FindNFNETLINK.cmake new file mode 100644 index 0000000..e0e7742 --- /dev/null +++ b/cmake/FindNFNETLINK.cmake @@ -0,0 +1,39 @@ +# - Find nfnetlink +# Find the nfnetlink library +# +# This module defines the following variables: +# NFNETLINK_FOUND - True if library and include directory are found +# If set to TRUE, the following are also defined: +# NFNETLINK_INCLUDE_DIRS - The directory where to find the header file +# NFNETLINK_LIBRARIES - Where to find the library file +# +# For convenience, these variables are also set. They have the same values +# as the variables above. The user can thus choose his/her preferred way +# to write them. 
+# NFNETLINK_LIBRARY
+# NFNETLINK_INCLUDE_DIR
+#
+# This file is in the public domain
+
+# Prefer pkg-config: on success pkg_check_modules() itself defines
+# NFNETLINK_FOUND, NFNETLINK_LIBRARIES and NFNETLINK_INCLUDE_DIRS.
+find_package(PkgConfig QUIET)
+if(PKG_CONFIG_FOUND)
+  pkg_check_modules(NFNETLINK libnfnetlink)
+endif()
+
+if(NOT NFNETLINK_FOUND)
+  # Fallback without pkg-config: look the header and the library up by hand.
+  # The public header is included as <libnfnetlink/libnfnetlink.h>.
+  find_path(NFNETLINK_INCLUDE_DIRS NAMES libnfnetlink/libnfnetlink.h
+    DOC "The nfnetlink include directory")
+
+  # find_library() adds the platform prefix/suffix ("lib", ".so") on its own,
+  # so the bare library name has to be given here.
+  find_library(NFNETLINK_LIBRARIES NAMES nfnetlink
+    DOC "The nfnetlink library")
+
+  # Use some standard module to handle the QUIETLY and REQUIRED arguments, and
+  # set NFNETLINK_FOUND to TRUE if these two variables are set.
+  include(FindPackageHandleStandardArgs)
+  find_package_handle_standard_args(NFNETLINK REQUIRED_VARS NFNETLINK_LIBRARIES NFNETLINK_INCLUDE_DIRS)
+endif()
+
+# Publish the documented singular aliases whichever lookup path succeeded.
+if(NFNETLINK_FOUND)
+  set(NFNETLINK_LIBRARY ${NFNETLINK_LIBRARIES})
+  set(NFNETLINK_INCLUDE_DIR ${NFNETLINK_INCLUDE_DIRS})
+endif()
+
+mark_as_advanced(NFNETLINK_INCLUDE_DIRS NFNETLINK_LIBRARIES)
\ No newline at end of file diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 69c6ce7..becc6a2 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,4 +1,4 @@ - add_library(common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp src/tfe_kafka_logger.cpp src/tfe_resource.cpp src/tfe_scan.cpp) + add_library(common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp src/tfe_kafka_logger.cpp src/tfe_resource.cpp src/tfe_scan.cpp src/tfe_pkt_util.cpp src/tfe_tcp_restore.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(common PUBLIC libevent-static libevent-static-openssl libevent-static-pthreads) target_link_libraries(common PUBLIC MESA_handle_logger cjson) diff --git a/common/include/tfe_cmsg.h b/common/include/tfe_cmsg.h index df46091..5867cbd 100644 --- a/common/include/tfe_cmsg.h +++ b/common/include/tfe_cmsg.h @@ -25,6 +25,11 @@ enum tfe_cmsg_tlv_type TFE_CMSG_TCP_RESTORE_TS_CLIENT = 0x8, TFE_CMSG_TCP_RESTORE_TS_SERVER = 0x9, TFE_CMSG_TCP_RESTORE_PROTOCOL = 0xa, + TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT = 0xb, + TFE_CMSG_TCP_RESTORE_WINDOW_SERVER = 0xc, + TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR = 0xd, + TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL = 0xe, + TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL = 0xf, TFE_CMSG_POLICY_ID = 0x10, TFE_CMSG_STREAM_TRACE_ID = 0x11, diff --git a/common/include/tfe_pkt_util.h b/common/include/tfe_pkt_util.h new file mode 100644 index 0000000..bd6dc2e --- /dev/null +++ b/common/include/tfe_pkt_util.h @@ -0,0 +1,59 @@
+#ifndef _TFE_PKT_UTIL_H
+#define _TFE_PKT_UTIL_H
+
+/* Make the header self-contained: fixed-width integers, size_t,
+ * in_addr_t and struct in6_addr are all used in the declarations below. */
+#include <stddef.h>
+#include <stdint.h>
+#include <netinet/in.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+/* IP version of a parsed packet. */
+enum addr_type_t {
+    ADDR_TYPE_IPV4 = 1,
+    ADDR_TYPE_IPV6 = 2,
+};
+
+/*
+ * Result of parsing a raw IP packet that carries TCP.
+ * All pointers point into the packet buffer handed to the parser; nothing is copied.
+ */
+struct pkt_info {
+    enum addr_type_t addr_type;     /* which member of `iphdr` is valid */
+
+    union {
+        struct iphdr *v4;
+        struct ip6_hdr *v6;
+    } iphdr;
+    uint16_t iphdr_len;             /* bytes from the start of the packet to the TCP header */
+    uint16_t ip_totlen;             /* total packet length taken from the IP header */
+
+    struct tcphdr *tcphdr;
+    uint16_t tcphdr_len;            /* TCP data offset in bytes (doff * 4) */
+
+    char *data;                     /* first byte after the TCP header */
+    uint16_t data_len;              /* ip_totlen - iphdr_len - tcphdr_len */
+
+    int parse_failed;               /* 0 on success, 1 when no TCP header could be located */
+};
+
+/* Parse an IPv4/TCP packet; always reports success (parse_failed is set to 0). */
+void tfe_pkt_parse_ipv4_header(const void *a_packet, struct pkt_info *pktinfo);
+/* Parse an IPv6/TCP packet; check pktinfo->parse_failed for the outcome. */
+void tfe_pkt_parse_ipv6_header(const void *a_packet, struct pkt_info *pktinfo);
+
+/* One's-complement checksum over `hdr_len` bytes of an IP header. */
+uint16_t tfe_pkt_checksum_ip(const void *buf, size_t hdr_len);
+/* TCP checksum over `len` bytes at `buf`, including the IPv4 pseudo-header. */
+uint16_t tfe_pkt_checksum_tcp_v4(const void *buf, size_t len, in_addr_t src_addr, in_addr_t dest_addr);
+/* TCP checksum over `len` bytes at `buf`, including the IPv6 pseudo-header. */
+uint16_t tfe_pkt_checksum_tcp_v6(const void *buf, size_t len, struct in6_addr src_addr, struct in6_addr dest_addr);
+
+/*
+ * Purpose: look up the TCP option of kind `option` in the TCP segment `data`.
+ *
+ * Inputs:
+ * 1. `data` points at the TCP header; the options start right after the
+ *    fixed header and occupy `opts_total_len` bytes in total.
+ * 2. `out_opt_buff` is an output buffer of at most `out_opt_buff_size` bytes.
+ *
+ * Return value:
+ * 1. 1 if the option is found; its value is copied into `out_opt_buff` and
+ *    the number of copied bytes is stored in `*out_opt_len`.
+ * 2. 0 if the option is not found (or is malformed / does not fit the buffer).
+ */
+int tfe_pkt_find_tcp_option(uint8_t option, char *data, unsigned int opts_total_len, uint8_t *out_opt_len, char *out_opt_buff, unsigned int out_opt_buff_size);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file diff --git a/common/include/tfe_tcp_restore.h b/common/include/tfe_tcp_restore.h new file mode 100644 index 0000000..535fdb1 --- /dev/null +++ b/common/include/tfe_tcp_restore.h @@ -0,0 +1,47 @@
+#ifndef _TFE_TCP_RESTORE_H
+#define _TFE_TCP_RESTORE_H
+
+/* Make the header self-contained: bool, fixed-width integers and
+ * struct sockaddr_storage are used in the declarations below. */
+#include <stdbool.h>
+#include <stdint.h>
+#include <sys/socket.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+/* Direction of the packet the restore information was taken from. */
+enum tcp_restore_pkt_dir
+{
+    PKT_DIR_NOT_SET = 0x0,
+    PKT_DIR_C2S = 0x1,
+    PKT_DIR_S2C = 0x2
+};
+
+/* TCP state of one side of the connection that is being restored. */
+struct tcp_restore_endpoint
+{
+    struct sockaddr_storage addr;   /* AF_INET or AF_INET6 address and port */
+    uint32_t seq;
+    uint32_t ack;
+    uint32_t ts_val;                /* TCP timestamp value */
+    uint16_t mss;
+    uint16_t window;
+    uint8_t wscale;                 /* window scale shift, meaningful when wscale_perm is set */
+    bool wscale_perm;
+    bool timestamp_perm;
+    bool sack_perm;
+};
+
+/* Everything needed to restore both sides of one TCP connection. */
+struct tcp_restore_info
+{
+    enum tcp_restore_pkt_dir cur_dir;
+    struct tcp_restore_endpoint client;
+    struct tcp_restore_endpoint server;
+
+    char cmsg[2048];                /* raw copy of the control message */
+    unsigned int cmsg_len;          /* number of valid bytes in cmsg */
+};
+
+/* Write a one-line description of `info` to the debug log. */
+void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info);
+
+/*
+ * Create a socket in TCP_REPAIR mode, bind it to endpoint->addr and connect it
+ * to peer->addr using the sequence numbers, options and windows recorded in
+ * both endpoints. Returns the socket fd, or -1 on failure.
+ */
+int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const struct tcp_restore_endpoint *peer);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index 937b0d3..1b9af35 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -8,6 +8,7 @@ #include <MESA/MESA_htable.h> #include <time.h> #include <dirent.h> //scan_dir +#include <stdbool.h> #define TFE_STRING_MAX 2048 #define TFE_PATH_MAX 256 @@ -103,9 +104,6 @@ do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); abo #define MIN(a, b) (((a) < (b)) ? (a) : (b)) #endif - -int addr_sock_to_layer(struct sockaddr * sock_addr, int sockaddrlen, struct layer_addr * layer_addr); -int addr_layer_to_sock(struct layer_addr * layer_addr, struct sockaddr * sock_addr); char* tfe_strdup(const char* s); char *tfe_thread_safe_ctime(const time_t *tp, char *buf, int len); @@ -172,6 +170,4 @@ int tfe_scandir(const char *dir, struct dirent ***namelist, char *tfe_read_file(const char *filename, size_t *filelen); const char * tfe_version(); -int __wrapper_MESA_htable_set_opt(MESA_htable_handle table, enum MESA_htable_opt opt_type, unsigned value); -int __wrapper_MESA_htable_set_opt(MESA_htable_handle table, enum MESA_htable_opt opt_type, void * val, size_t len); int tfe_decode_base64url(u_char *dst, u_char *src);
\ No newline at end of file diff --git a/common/src/tfe_pkt_util.cpp b/common/src/tfe_pkt_util.cpp new file mode 100644 index 0000000..052f03a --- /dev/null +++ b/common/src/tfe_pkt_util.cpp @@ -0,0 +1,245 @@
+#include <string.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include <linux/tcp.h>
+
+#include <tfe_utils.h>
+#include <tfe_pkt_util.h>
+
+/*
+ * Sum `len` bytes at `_buf` as 16-bit words for an Internet checksum.
+ * The accumulator is folded whenever its top bit is set so it cannot overflow.
+ * An odd trailing byte is added as if it were followed by a zero pad byte.
+ */
+static uint32_t pkt_checksum_sum_words(const void *_buf, size_t len)
+{
+    const uint16_t *buf = (const uint16_t *)_buf;
+    uint32_t sum = 0;
+
+    while (len > 1)
+    {
+        sum += *buf++;
+        if (sum & 0x80000000)
+        {
+            sum = (sum & 0xFFFF) + (sum >> 16);
+        }
+        len -= 2;
+    }
+
+    if (len & 1)
+    {
+        // Copy the last byte into the first byte of a zeroed word, so the
+        // padding is placed correctly regardless of host byte order.
+        uint16_t last_word = 0;
+        memcpy(&last_word, buf, 1);
+        sum += last_word;
+    }
+
+    return sum;
+}
+
+/* Fold the carries of `sum` into 16 bits and return its one's complement. */
+static uint16_t pkt_checksum_fold(uint32_t sum)
+{
+    while (sum >> 16)
+    {
+        sum = (sum & 0xFFFF) + (sum >> 16);
+    }
+
+    return ((uint16_t)(~sum));
+}
+
+/*
+ * Parse an IPv4 packet carrying TCP and fill `pktinfo` with pointers into it.
+ * Always reports success (parse_failed = 0); the header fields are trusted.
+ */
+void tfe_pkt_parse_ipv4_header(const void *a_packet, struct pkt_info *pktinfo)
+{
+    unsigned int headers_len;
+
+    pktinfo->addr_type = ADDR_TYPE_IPV4;
+
+    pktinfo->iphdr.v4 = (struct iphdr *)a_packet;
+    pktinfo->iphdr_len = pktinfo->iphdr.v4->ihl * 4;
+    pktinfo->ip_totlen = ntohs(pktinfo->iphdr.v4->tot_len);
+
+    pktinfo->tcphdr = (struct tcphdr *)((char *)pktinfo->iphdr.v4 + pktinfo->iphdr_len);
+    pktinfo->tcphdr_len = pktinfo->tcphdr->doff * 4;
+
+    pktinfo->data = (char *)pktinfo->tcphdr + pktinfo->tcphdr_len;
+
+    // Clamp instead of letting the 16-bit subtraction wrap around when the
+    // headers claim to be longer than the whole packet.
+    headers_len = (unsigned int)pktinfo->iphdr_len + pktinfo->tcphdr_len;
+    pktinfo->data_len = (pktinfo->ip_totlen > headers_len) ? (pktinfo->ip_totlen - headers_len) : 0;
+
+    pktinfo->parse_failed = 0;
+}
+
+/*
+ * Parse an IPv6 packet: walk the extension header chain until the TCP header
+ * is found and fill `pktinfo`. On return pktinfo->parse_failed is 0 on
+ * success and 1 when no TCP header is reachable or a header would extend
+ * past the length announced in the IPv6 header.
+ */
+void tfe_pkt_parse_ipv6_header(const void *a_packet, struct pkt_info *pktinfo)
+{
+    // Unsigned bytes: the extension length must never be read as negative.
+    const uint8_t *cursor = (const uint8_t *)a_packet + sizeof(struct ip6_hdr);
+    unsigned int offset = sizeof(struct ip6_hdr);
+    unsigned int total_len;
+    unsigned int skip_len;
+    uint8_t next_hdr_type;
+
+    pktinfo->addr_type = ADDR_TYPE_IPV6;
+    pktinfo->iphdr.v6 = (struct ip6_hdr *)a_packet;
+
+    // Keep the full-width length for bounds checks; the struct field is 16 bit.
+    total_len = ntohs(pktinfo->iphdr.v6->ip6_ctlun.ip6_un1.ip6_un1_plen) + sizeof(struct ip6_hdr);
+    pktinfo->ip_totlen = (uint16_t)total_len;
+
+    next_hdr_type = pktinfo->iphdr.v6->ip6_ctlun.ip6_un1.ip6_un1_nxt;
+    while (1)
+    {
+        switch (next_hdr_type)
+        {
+        case IPPROTO_TCP:
+            // The fixed TCP header must fit before any of its fields is read.
+            if (offset > 0xFFFF || offset + sizeof(struct tcphdr) > total_len)
+            {
+                pktinfo->parse_failed = 1;
+                return;
+            }
+
+            pktinfo->iphdr_len = offset;
+            pktinfo->tcphdr = (struct tcphdr *)cursor;
+            pktinfo->tcphdr_len = pktinfo->tcphdr->doff * 4;
+
+            if (pktinfo->tcphdr_len < sizeof(struct tcphdr) || offset + pktinfo->tcphdr_len > total_len)
+            {
+                pktinfo->parse_failed = 1;
+                return;
+            }
+
+            pktinfo->data = (char *)pktinfo->tcphdr + pktinfo->tcphdr_len;
+            pktinfo->data_len = total_len - offset - pktinfo->tcphdr_len;
+
+            pktinfo->parse_failed = 0;
+            return;
+        case IPPROTO_HOPOPTS: /* fall through */
+        case IPPROTO_ROUTING: /* fall through */
+        case IPPROTO_AH:      /* fall through */
+        case IPPROTO_DSTOPTS:
+            // Every extension header is at least 8 bytes long.
+            if (offset + 8 > total_len)
+            {
+                pktinfo->parse_failed = 1;
+                return;
+            }
+
+            if (next_hdr_type == IPPROTO_AH)
+            {
+                // AH counts its length in 4-byte units, minus 2.
+                skip_len = ((unsigned int)cursor[1] + 2) * 4;
+            }
+            else
+            {
+                // The others count in 8-byte units, not including the first 8 bytes.
+                skip_len = ((unsigned int)cursor[1] + 1) * 8;
+            }
+
+            next_hdr_type = cursor[0];
+            cursor += skip_len;
+            offset += skip_len;
+            break;
+        case IPPROTO_NONE: /* fall through */
+        default:
+            pktinfo->parse_failed = 1;
+            return;
+        }
+    }
+}
+
+/*
+ * One's-complement checksum over the first `hdr_len` bytes of `buf`,
+ * summed as 16-bit words; an odd trailing byte is ignored.
+ */
+uint16_t tfe_pkt_checksum_ip(const void *buf, size_t hdr_len)
+{
+    unsigned long sum = 0;
+    const uint16_t *ip1;
+    ip1 = (const uint16_t *)buf;
+    while (hdr_len > 1)
+    {
+        sum += *ip1++;
+        if (sum & 0x80000000)
+        {
+            sum = (sum & 0xFFFF) + (sum >> 16);
+        }
+        hdr_len -= 2;
+    }
+    while (sum >> 16)
+    {
+        sum = (sum & 0xFFFF) + (sum >> 16);
+    }
+    return (~sum);
+}
+
+/*
+ * TCP checksum for IPv4: `len` bytes at `_buf` plus the pseudo-header built
+ * from `src_addr`, `dest_addr`, the TCP protocol number and `len`.
+ */
+uint16_t tfe_pkt_checksum_tcp_v4(const void *_buf, size_t len, in_addr_t src_addr, in_addr_t dest_addr)
+{
+    const uint16_t *ip_src = (const uint16_t *)&src_addr;
+    const uint16_t *ip_dst = (const uint16_t *)&dest_addr;
+    uint32_t sum = pkt_checksum_sum_words(_buf, len);
+
+    // Add the pseudo-header
+    sum += ip_src[0];
+    sum += ip_src[1];
+    sum += ip_dst[0];
+    sum += ip_dst[1];
+    sum += htons(IPPROTO_TCP);
+    sum += htons(len);
+
+    return pkt_checksum_fold(sum);
+}
+
+/*
+ * TCP checksum for IPv6: `len` bytes at `_buf` plus the pseudo-header built
+ * from `src_addr`, `dest_addr`, the TCP protocol number and `len`.
+ */
+uint16_t tfe_pkt_checksum_tcp_v6(const void *_buf, size_t len, struct in6_addr src_addr, struct in6_addr dest_addr)
+{
+    const uint16_t *ip_src = (const uint16_t *)&src_addr;
+    const uint16_t *ip_dst = (const uint16_t *)&dest_addr;
+    uint32_t sum = pkt_checksum_sum_words(_buf, len);
+    int i = 0;
+
+    // Add the pseudo-header: both 128-bit addresses as eight words each
+    for (i = 0; i < 8; i++)
+    {
+        sum += ip_src[i];
+        sum += ip_dst[i];
+    }
+    sum += htons(IPPROTO_TCP);
+    sum += htons(len);
+
+    return pkt_checksum_fold(sum);
+}
+
+/*
+ * Purpose: look up the TCP option of kind `option` in the TCP segment `data`.
+ *
+ * Inputs:
+ * 1. `data` points at the TCP header; the options start right after the
+ *    fixed header and occupy `opts_total_len` bytes in total.
+ * 2. `out_opt_buff` is an output buffer of at most `out_opt_buff_size` bytes.
+ *
+ * Return value:
+ * 1. 1 if the option is found; its value is copied into `out_opt_buff` and
+ *    the number of copied bytes is stored in `*out_opt_len`.
+ * 2. 0 if the option is not found, is malformed or truncated, or its value
+ *    does not fit into `out_opt_buff`.
+ */
+int tfe_pkt_find_tcp_option(uint8_t option, char *data, unsigned int opts_total_len, uint8_t *out_opt_len, char *out_opt_buff, unsigned int out_opt_buff_size)
+{
+    const uint8_t *op;
+    unsigned int i;
+
+    if (!opts_total_len)
+        return 0;
+
+    op = (const uint8_t *)(data + sizeof(struct tcphdr));
+    for (i = 0; i < opts_total_len;)
+    {
+        const uint8_t kind = op[i];
+        uint8_t __optlen;
+        uint8_t __valuelen;
+
+        // Kinds 0 (end of list) and 1 (no-op) are a single byte without length.
+        if (kind < 2)
+        {
+            if (kind == option)
+            {
+                *out_opt_len = 0;
+                return 1;
+            }
+
+            i++;
+            continue;
+        }
+
+        // Every other kind is followed by a length byte; never read it from
+        // beyond the options area.
+        if (i + 1 >= opts_total_len)
+        {
+            TFE_LOG_ERROR(g_default_logger, "failed at parse tcp options, tcp option is truncated");
+            return 0;
+        }
+
+        __optlen = op[i + 1];
+        if (kind == option)
+        {
+            if (__optlen <= 2)
+            {
+                TFE_LOG_ERROR(g_default_logger, "failed at parse tcp options, tcp option length must be larger than 2, but the value is %u", __optlen);
+                return 0;
+            }
+
+            // The whole option (kind + length + value) must lie inside the options area.
+            if (__optlen > opts_total_len - i)
+            {
+                TFE_LOG_ERROR(g_default_logger, "failed at parse tcp options, tcp option length exceeds the options area");
+                return 0;
+            }
+
+            __valuelen = __optlen - 2;
+            if (__valuelen > out_opt_buff_size)
+            {
+                TFE_LOG_ERROR(g_default_logger, "failed at parse tcp options, tcp option length is larger than input buffer");
+                return 0;
+            }
+
+            *out_opt_len = __valuelen;
+            memcpy(out_opt_buff, &op[i + 2], __valuelen);
+            return 1;
+        }
+
+        // Skip the option; a zero length byte still advances by one so the loop terminates.
+        i += (__optlen != 0) ? __optlen : 1;
+    }
+
+    return 0;
+}
\ No newline at end of file diff --git a/common/src/tfe_tcp_restore.cpp b/common/src/tfe_tcp_restore.cpp new file mode 100644 index 0000000..08221b3 --- /dev/null +++ b/common/src/tfe_tcp_restore.cpp @@ -0,0 +1,261 @@ +#include <stdbool.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> +#include <assert.h> +#include <arpa/inet.h> +#include <linux/tcp.h> + +#include <tfe_utils.h> +#include <tfe_tcp_restore.h> + +static unsigned int fd_so_mask = 0x65; + +void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info) +{ + char str_client_addr[64] = { 0 }; + char str_server_addr[64] = { 0 }; + + const struct tcp_restore_endpoint *client = &info->client; + const struct tcp_restore_endpoint *server = &info->server; + + assert(client->addr.ss_family == server->addr.ss_family); + + if (client->addr.ss_family == AF_INET) + { + struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr; + struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr; + uint16_t port_client = ntohs(sk_client->sin_port); + uint16_t port_server = ntohs(sk_server->sin_port); + + inet_ntop(AF_INET, &sk_client->sin_addr, str_client_addr, sizeof(str_client_addr)); + inet_ntop(AF_INET, &sk_server->sin_addr, str_server_addr, sizeof(str_client_addr)); + + TFE_LOG_DEBUG(g_default_logger, "tcp_restore_info %p: cur_dir=%u, %s:%hu->%s:%hu, seq=%u, ack=%u, " + "client={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }, " + "server={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }", + info, info->cur_dir, str_client_addr, port_client, str_server_addr, port_server, info->client.seq, info->client.ack, + client->mss, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), + server->mss, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 
1 : 0)); + } + else if (client->addr.ss_family == AF_INET6) + { + struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr; + struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr; + uint16_t port_client = ntohs(sk_client->sin6_port); + uint16_t port_server = ntohs(sk_server->sin6_port); + + inet_ntop(AF_INET6, &sk_client->sin6_addr, str_client_addr, sizeof(str_client_addr)); + inet_ntop(AF_INET6, &sk_server->sin6_addr, str_server_addr, sizeof(str_client_addr)); + + TFE_LOG_DEBUG(g_default_logger, "tcp_restore_info %p: cur_dir=%u, %s:%hu->%s:%hu, seq=%u, ack=%u, " + "client={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }, " + "server={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }", + info, info->cur_dir, str_client_addr, port_client, str_server_addr, port_server, info->client.seq, info->client.ack, + client->mss, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), + server->mss, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 
1 : 0)); + } +} + +int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const struct tcp_restore_endpoint *peer) +{ + int result = 0; + int sockopt = 0; + int sockfd = 0; + + unsigned int nr_tcp_repair_opts = 0; + struct tcp_repair_opt tcp_repair_opts[8]; + struct tcp_repair_window tcp_repair_window = { 0 }; + + if (endpoint->addr.ss_family == AF_INET) + { + sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP); + } + else if (endpoint->addr.ss_family == AF_INET6) + { + sockfd = socket(AF_INET6, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP); + } + else + { + errno = EINVAL; + TFE_LOG_ERROR(g_default_logger, "failed at tcp_restore_fd_create(), %d: %s", errno, strerror(errno)); + goto errout; + } + + if (sockfd < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at socket(), %d: %s", errno, strerror(errno)); + goto errout; + } + + // Setup TCP REPAIR Status + sockopt = 1; + result = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(SO_REUSEADDR), %d: %s", errno, strerror(errno)); + goto errout; + } + + sockopt = 1; + result = setsockopt(sockfd, SOL_IP, IP_TRANSPARENT, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(IP_TRANSPARENT), %d: %s", errno, strerror(errno)); + goto errout; + } + + sockopt = 1; + result = setsockopt(sockfd, IPPROTO_TCP, TCP_REPAIR, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_REPAIR), %d: %s", errno, strerror(errno)); + goto errout; + } + + sockopt = fd_so_mask; + result = setsockopt(sockfd, SOL_SOCKET, SO_MARK, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(SO_MARK), %d: %s", errno, strerror(errno)); + goto errout; + } + + // Setup SEQ/ACK and TCP options + sockopt = TCP_SEND_QUEUE; + result = 
setsockopt(sockfd, IPPROTO_TCP, TCP_REPAIR_QUEUE, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_REPAIR_QUEUE), %d: %s", errno, strerror(errno)); + goto errout; + } + + sockopt = endpoint->seq; + result = setsockopt(sockfd, IPPROTO_TCP, TCP_QUEUE_SEQ, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_QUEUE_SEQ), %d: %s", errno, strerror(errno)); + goto errout; + } + + sockopt = TCP_RECV_QUEUE; + result = setsockopt(sockfd, IPPROTO_TCP, TCP_REPAIR_QUEUE, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_REPAIR_QUEUE), %d: %s", errno, strerror(errno)); + goto errout; + } + + sockopt = endpoint->ack; + result = setsockopt(sockfd, IPPROTO_TCP, TCP_QUEUE_SEQ, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_QUEUE_SEQ), %d: %s", errno, strerror(errno)); + goto errout; + } + +#ifndef TCPOPT_MAXSEG +#define TCPOPT_MAXSEG 2 +#endif + +#ifndef TCPOPT_WINDOW +#define TCPOPT_WINDOW 3 +#endif + +#ifndef TCPOPT_SACK_PERMITTED +#define TCPOPT_SACK_PERMITTED 4 +#endif + +#ifndef TCPOPT_TIMESTAMP +#define TCPOPT_TIMESTAMP 8 +#endif + + tcp_repair_opts[nr_tcp_repair_opts].opt_code = TCPOPT_MAXSEG; + tcp_repair_opts[nr_tcp_repair_opts].opt_val = MIN(endpoint->mss, peer->mss); + nr_tcp_repair_opts++; + + if (endpoint->sack_perm && peer->sack_perm) + { + tcp_repair_opts[nr_tcp_repair_opts].opt_code = TCPOPT_SACK_PERMITTED; + tcp_repair_opts[nr_tcp_repair_opts].opt_val = 0; + nr_tcp_repair_opts++; + } + + if (endpoint->wscale_perm && peer->wscale_perm) + { + tcp_repair_opts[nr_tcp_repair_opts].opt_code = TCPOPT_WINDOW; + tcp_repair_opts[nr_tcp_repair_opts].opt_val = (endpoint->wscale << 16) | peer->wscale; + nr_tcp_repair_opts++; + } + + if (endpoint->timestamp_perm && peer->timestamp_perm) + { + 
tcp_repair_opts[nr_tcp_repair_opts].opt_code = TCPOPT_TIMESTAMP; + tcp_repair_opts[nr_tcp_repair_opts].opt_val = 0; + nr_tcp_repair_opts++; + } + + // Bind address and connect to peer endpoint + result = bind(sockfd, (struct sockaddr *)&endpoint->addr, sizeof(endpoint->addr)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at bind(), %d: %s", errno, strerror(errno)); + goto errout; + } + + result = connect(sockfd, (struct sockaddr *)&peer->addr, sizeof(peer->addr)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at connect(), %d: %s", errno, strerror(errno)); + goto errout; + } + + result = setsockopt(sockfd, IPPROTO_TCP, TCP_REPAIR_OPTIONS, (char *)tcp_repair_opts, nr_tcp_repair_opts * sizeof(struct tcp_repair_opt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_REPAIR_OPTIONS), %d: %s", errno, strerror(errno)); + goto errout; + } + + if (endpoint->timestamp_perm && peer->timestamp_perm) + { + result = setsockopt(sockfd, IPPROTO_TCP, TCP_TIMESTAMP, &(endpoint->ts_val), sizeof(endpoint->ts_val)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_TIMESTAMP), %d: %s", errno, strerror(errno)); + goto errout; + } + } + + // Perpare Window Setup + tcp_repair_window.snd_wl1 = peer->seq; + tcp_repair_window.snd_wnd = peer->window; + tcp_repair_window.max_window = peer->window; + tcp_repair_window.rcv_wnd = endpoint->window; + tcp_repair_window.rcv_wup = endpoint->ack; + + result = setsockopt(sockfd, IPPROTO_TCP, TCP_REPAIR_WINDOW, (char *)&tcp_repair_window, sizeof(tcp_repair_window)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_REPAIR_WINDOW), %d: %s", errno, strerror(errno)); + goto errout; + } + + sockopt = 0; + result = setsockopt(sockfd, IPPROTO_TCP, TCP_REPAIR, (char *)&sockopt, sizeof(sockopt)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(TCP_REPAIR), %d: %s", errno, strerror(errno)); + goto 
errout; + } + + return sockfd; + +errout: + if (sockfd > 0) + { + close(sockfd); + } + + return -1; +} diff --git a/common/src/tfe_utils.cpp b/common/src/tfe_utils.cpp index 8f2fd38..5f66d30 100644 --- a/common/src/tfe_utils.cpp +++ b/common/src/tfe_utils.cpp @@ -7,17 +7,6 @@ #include <time.h> #include <assert.h> -int addr_sock_to_layer(struct sockaddr * sock_addr, int sockaddrlen, struct layer_addr * layer_addr) -{ - return 0; -} - -int addr_layer_to_sock(struct layer_addr * layer_addr, struct sockaddr * sock_addr) -{ - int sockaddrlen=-1; - return sockaddrlen; -} - //functioned as strdup, for dictator compatible. char* tfe_strdup(const char* s) { diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index bb2c35f..123117c 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -2,6 +2,7 @@ nr_worker_threads=8 enable_kni_v1=0 enable_kni_v2=1 +enable_kni_v3=0 # Only when (disable_coredump == 1 || (enable_breakpad == 1 && enable_breakpad_upload == 1)) is satisfied, the core will not be generated locally disable_coredump=0 @@ -19,6 +20,13 @@ cpu_affinity_mask=1-9 # LEAST_CONN = 0; ROUND_ROBIN = 1 load_balance=1 +# for enable kni v3 +[nfq] +queue_id=1 +queue_maxlen=655350 +queue_rcvbufsiz=983025000 +queue_no_enobufs=1 + [kni] # kni v1 #uxdomain=/var/run/.tfe_kni_acceptor_handler diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index cd9f472..71912ea 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -1,6 +1,7 @@ find_package(SYSTEMD REQUIRED) +find_package(NFNETLINK REQUIRED) -add_executable(tfe src/acceptor_kni_v1.cpp src/acceptor_kni_v2.cpp src/ssl_stream.cpp src/key_keeper.cpp src/ssl_fetch_cert.cpp +add_executable(tfe src/acceptor_kni_v1.cpp src/acceptor_kni_v2.cpp src/acceptor_kni_v3.cpp src/ssl_stream.cpp src/key_keeper.cpp src/ssl_fetch_cert.cpp src/ssl_sess_cache.cpp src/ssl_sess_ticket.cpp src/ssl_service_cache.cpp src/ssl_trusted_cert_storage.cpp src/ev_root_ca_metadata.cpp src/ssl_utils.cpp src/tcp_stream.cpp src/main.cpp 
src/proxy.cpp src/sender_scm.cpp src/watchdog_kni.cpp src/ssl_ja3.cpp) @@ -10,7 +11,8 @@ target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/interna target_include_directories(tfe PRIVATE ${SYSTEMD_INCLUDE_DIRS}) target_link_libraries(tfe common tango-cache-client) -target_link_libraries(tfe pthread dl +target_link_libraries(tfe pthread dl nfnetlink + libnetfilter_queue-static openssl-ssl-static openssl-crypto-static libevent-static diff --git a/platform/include/internal/acceptor_kni_v3.h b/platform/include/internal/acceptor_kni_v3.h new file mode 100644 index 0000000..cc915e0 --- /dev/null +++ b/platform/include/internal/acceptor_kni_v3.h @@ -0,0 +1,7 @@ +#pragma once + +struct tfe_proxy; +struct acceptor_kni_v3; + +struct acceptor_kni_v3 *acceptor_kni_v3_create(struct tfe_proxy *proxy, const char *profile, void *logger); +void acceptor_kni_v3_destroy(struct acceptor_kni_v3 *ctx); diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index e0978fc..5b91e94 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -111,9 +111,11 @@ struct tfe_proxy unsigned int en_kni_v1_acceptor; unsigned int en_kni_v2_acceptor; + unsigned int en_kni_v3_acceptor; struct acceptor_kni_v1 * kni_v1_acceptor; struct acceptor_kni_v2 * kni_v2_acceptor; + struct acceptor_kni_v3 * kni_v3_acceptor; struct sender_scm * scm_sender; struct watchdog_kni * watchdog_kni; diff --git a/platform/src/acceptor_kni_v3.cpp b/platform/src/acceptor_kni_v3.cpp new file mode 100644 index 0000000..87f1244 --- /dev/null +++ b/platform/src/acceptor_kni_v3.cpp @@ -0,0 +1,675 @@ +#include <sys/prctl.h> +#include <unistd.h> +#include <netinet/ip.h> +#include <netinet/ip6.h> +#include <linux/tcp.h> +#include <linux/netfilter.h> // for NF_ACCEPT +#include <libnetfilter_queue/libnetfilter_queue.h> + +#include <tfe_cmsg.h> +#include <proxy.h> +#include <tfe_pkt_util.h> +#include <tfe_tcp_restore.h> +#include <MESA/MESA_prof_load.h> + 
+#define TCP_RESTORE_TCPOPT_KIND 88 + +struct acceptor_kni_v3 +{ + struct tfe_proxy *proxy; + const char *profile; + + struct nfq_handle *h; + struct nfq_q_handle *qh; + int fd_nfq_socket; + struct event_base *ev_base; + struct event *ev_nfq_socket; + struct timespec start; + struct timespec end; + pthread_t thread; + + unsigned int queue_id; + unsigned int queue_maxlen; + unsigned int queue_rcvbufsiz; + unsigned int queue_no_enobufs; +}; + +#define TCP_RESTORE_TCPOPT_KIND 88 + +struct tcp_restore_info_tlv +{ + uint16_t type; + uint16_t length; + + union { + uint8_t value_as_uint8[0]; + uint16_t value_as_uint16[0]; + uint32_t value_as_uint32[0]; + unsigned char value_as_string[0]; + }; + +} __attribute__((packed)); + +struct tcp_restore_info_header +{ + uint8_t __magic__[2]; /* Must be 0x4d, 0x5a */ + uint16_t nr_tlvs; + struct tcp_restore_info_tlv tlvs[0]; +} __attribute__((packed)); + +static int tcp_restore_info_parse_from_cmsg(const char *data, unsigned int datalen, struct tcp_restore_info *out) +{ + unsigned int tlv_iter; + unsigned int nr_tlvs; + struct tcp_restore_info_header *header = (struct tcp_restore_info_header *)data; + + if (header->__magic__[0] != 0x4d || header->__magic__[1] != 0x5a) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options from cmsg, wrong magic"); + goto invalid_format; + } + + nr_tlvs = ntohs(header->nr_tlvs); + if (nr_tlvs >= 256) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options from cmsg, numbers of tlvs is larger than 256"); + goto invalid_format; + } + + if (datalen < sizeof(struct tcp_restore_info_header)) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options from cmsg, length is shorter than tlv header"); + goto invalid_format; + } + + memcpy(out->cmsg, data, datalen); + out->cmsg_len = datalen; + + datalen -= sizeof(struct tcp_restore_info_header); + data += sizeof(struct tcp_restore_info_header); + + for (tlv_iter = 0; tlv_iter < nr_tlvs; tlv_iter++) + { + struct 
tcp_restore_info_tlv *tlv = (struct tcp_restore_info_tlv *)data; + uint16_t tlv_type = ntohs(tlv->type); + uint16_t tlv_length = ntohs(tlv->length); + + unsigned int __length = tlv_length; + if (datalen < __length) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options from cmsg, left space is smaller than tlv's length, " + "datalen is %u, tlv's length is %u", datalen, __length); + goto invalid_format; + } + + if (tlv_length < sizeof(uint16_t) * 2) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options from cmsg, invalid tlv length, should larger than sizeof(type) + sizeof(length)"); + goto invalid_format; + } + + tlv_length -= sizeof(uint16_t) * 2; + +#define __CHECK_TLV_LENGTH(x) \ + do \ + { \ + if (x != tlv_length) \ + { \ + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options from cmsg, invalid tlv length, should be %u, actually is %u", \ + (unsigned int)x, (unsigned int)tlv_length); \ + goto invalid_format; \ + } \ + } while (0) + + switch (tlv_type) + { + case TFE_CMSG_TCP_RESTORE_SEQ: + __CHECK_TLV_LENGTH(sizeof(uint32_t)); + out->client.seq = ntohl(tlv->value_as_uint32[0]); + out->server.ack = ntohl(tlv->value_as_uint32[0]); + break; + + case TFE_CMSG_TCP_RESTORE_ACK: + __CHECK_TLV_LENGTH(sizeof(uint32_t)); + out->client.ack = ntohl(tlv->value_as_uint32[0]); + out->server.seq = ntohl(tlv->value_as_uint32[0]); + break; + + case TFE_CMSG_TCP_RESTORE_TS_CLIENT: + __CHECK_TLV_LENGTH(sizeof(uint8_t)); + out->client.timestamp_perm = !!(tlv->value_as_uint8[0]); + break; + + case TFE_CMSG_TCP_RESTORE_TS_SERVER: + __CHECK_TLV_LENGTH(sizeof(uint8_t)); + out->server.timestamp_perm = !!(tlv->value_as_uint8[0]); + break; + + case TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL: + __CHECK_TLV_LENGTH(sizeof(uint32_t)); + out->client.ts_val = ntohl(tlv->value_as_uint32[0]); + break; + + case TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL: + __CHECK_TLV_LENGTH(sizeof(uint32_t)); + out->server.ts_val = ntohl(tlv->value_as_uint32[0]); + break; + + case 
TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT: + __CHECK_TLV_LENGTH(sizeof(uint8_t)); + out->client.wscale_perm = true; + out->client.wscale = tlv->value_as_uint8[0]; + break; + + case TFE_CMSG_TCP_RESTORE_WSACLE_SERVER: + __CHECK_TLV_LENGTH(sizeof(uint8_t)); + out->server.wscale_perm = true; + out->server.wscale = tlv->value_as_uint8[0]; + break; + + case TFE_CMSG_TCP_RESTORE_SACK_CLIENT: + __CHECK_TLV_LENGTH(sizeof(uint8_t)); + out->client.sack_perm = true; + break; + + case TFE_CMSG_TCP_RESTORE_SACK_SERVER: + __CHECK_TLV_LENGTH(sizeof(uint8_t)); + out->server.sack_perm = true; + break; + + case TFE_CMSG_TCP_RESTORE_MSS_CLIENT: + __CHECK_TLV_LENGTH(sizeof(uint16_t)); + out->client.mss = ntohs(tlv->value_as_uint16[0]); + break; + + case TFE_CMSG_TCP_RESTORE_MSS_SERVER: + __CHECK_TLV_LENGTH(sizeof(uint16_t)); + out->server.mss = ntohs(tlv->value_as_uint16[0]); + break; + + case TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT: + __CHECK_TLV_LENGTH(sizeof(uint16_t)); + out->client.window = ntohs(tlv->value_as_uint16[0]); + break; + + case TFE_CMSG_TCP_RESTORE_WINDOW_SERVER: + __CHECK_TLV_LENGTH(sizeof(uint16_t)); + out->server.window = ntohs(tlv->value_as_uint16[0]); + break; + + case TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR: + __CHECK_TLV_LENGTH(sizeof(uint8_t)); + out->cur_dir = (enum tcp_restore_pkt_dir)(tlv->value_as_uint8[0]); + + default: + break; + } + + data += __length; + datalen -= __length; + } + + return 0; + +invalid_format: + return -EINVAL; +} + +static void tcp_restore_info_parse_from_pkt(struct pkt_info *pktinfo, struct tcp_restore_info *out) +{ + if (pktinfo->addr_type == ADDR_TYPE_IPV4) + { + struct iphdr *iphdr = pktinfo->iphdr.v4; + struct tcphdr *tcphdr = pktinfo->tcphdr; + + struct sockaddr_in *in_addr_client; + struct sockaddr_in *in_addr_server; + + if (out->cur_dir == PKT_DIR_NOT_SET || out->cur_dir == PKT_DIR_C2S) + { + in_addr_client = (struct sockaddr_in *)&out->client.addr; + in_addr_server = (struct sockaddr_in *)&out->server.addr; + } + else + { + 
in_addr_client = (struct sockaddr_in *)&out->server.addr; + in_addr_server = (struct sockaddr_in *)&out->client.addr; + } + + in_addr_client->sin_family = AF_INET; + in_addr_client->sin_addr.s_addr = iphdr->saddr; + in_addr_client->sin_port = tcphdr->source; + + in_addr_server->sin_family = AF_INET; + in_addr_server->sin_addr.s_addr = iphdr->daddr; + in_addr_server->sin_port = tcphdr->dest; + } + + if (pktinfo->addr_type == ADDR_TYPE_IPV6) + { + struct ip6_hdr *ipv6hdr = (struct ip6_hdr *)(pktinfo->iphdr.v6); + struct tcphdr *tcphdr = pktinfo->tcphdr; + + struct sockaddr_in6 *in6_addr_client; + struct sockaddr_in6 *in6_addr_server; + + if (out->cur_dir == PKT_DIR_NOT_SET || out->cur_dir == PKT_DIR_C2S) + { + in6_addr_client = (struct sockaddr_in6 *)&out->client.addr; + in6_addr_server = (struct sockaddr_in6 *)&out->server.addr; + } + else + { + in6_addr_client = (struct sockaddr_in6 *)&out->server.addr; + in6_addr_server = (struct sockaddr_in6 *)&out->client.addr; + } + + in6_addr_client->sin6_family = AF_INET6; + in6_addr_client->sin6_addr = ipv6hdr->ip6_src; + in6_addr_client->sin6_port = tcphdr->source; + + in6_addr_server->sin6_family = AF_INET6; + in6_addr_server->sin6_addr = ipv6hdr->ip6_dst; + in6_addr_server->sin6_port = tcphdr->dest; + } +} + +/* + * nfmsg : message objetc that contains the packet + * nfad : Netlink packet data handle + */ +static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, struct nfq_data *nfa, void *data) +{ + int id = 0; + int ret = 0; + int fd_downstream = 0; + int fd_upstream = 0; + int hit_tcpopt = 0; + uint16_t cmsg_offset = 0; + uint8_t restore_opt_len = 0; + int raw_payload_len = 0; + unsigned int cmsg_payload_len = 0; + char *cmsg_payload = NULL; + uint64_t jiffies_us = 0; + unsigned char *raw_payload = NULL; + struct iphdr *iphdr = NULL; + struct tfe_cmsg *cmsg = NULL; + struct pkt_info pktinfo; + struct tcp_restore_info restore_info; + struct acceptor_kni_v3 *__ctx = (struct acceptor_kni_v3 *)data; + 
clock_gettime(CLOCK_MONOTONIC, &(__ctx->start)); + memset(&pktinfo, 0, sizeof(pktinfo)); + memset(&restore_info, 0, sizeof(restore_info)); + + struct nfqnl_msg_packet_hdr *ph = nfq_get_msg_packet_hdr(nfa); + if (ph == NULL) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_get_msg_packet_hdr(), result is NULL"); + goto end; + } + id = ntohl(ph->packet_id); + + raw_payload_len = nfq_get_payload(nfa, &raw_payload); + if ((unsigned int)raw_payload_len <= (MIN(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr))) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_get_payload(), paylod len is %d", raw_payload_len); + goto end; + } + + iphdr = (struct iphdr *)raw_payload; + if (iphdr->version == 4) + { + if (iphdr->protocol == IPPROTO_TCP) + { + tfe_pkt_parse_ipv4_header(raw_payload, &pktinfo); + } + else + { + TFE_LOG_ERROR(g_default_logger, "failed at parse IPv4 header, sub protocol not tcp"); + goto end; + } + } + else + { + tfe_pkt_parse_ipv6_header(raw_payload, &pktinfo); + if (pktinfo.parse_failed) + { + TFE_LOG_ERROR(g_default_logger, "failed at parse IPv6 header, sub protocol not tcp"); + goto end; + } + } + + if (pktinfo.ip_totlen > raw_payload_len) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser IP header, invalid ip header totlen"); + goto end; + } + + // check if there is a tcp options + if (pktinfo.tcphdr_len <= sizeof(struct tcphdr)) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP header, tcp header len too small"); + goto end; + } + + // Parse tcp options + hit_tcpopt = tfe_pkt_find_tcp_option(TCP_RESTORE_TCPOPT_KIND, (char *)pktinfo.tcphdr, pktinfo.tcphdr_len - sizeof(struct tcphdr), + &restore_opt_len, (char *)&cmsg_offset, sizeof(cmsg_offset)); + if (!hit_tcpopt || restore_opt_len != 2) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options, tcp option hit:%d, opt len:%d", hit_tcpopt, restore_opt_len); + goto end; + } + + cmsg_offset = ntohs(cmsg_offset); + cmsg_payload = (char *)(pktinfo.data + 
cmsg_offset); + cmsg_payload_len = pktinfo.data_len - cmsg_offset; + + // 从 cmsg 中解析信息存储到 restore_info 中 + ret = tcp_restore_info_parse_from_cmsg(cmsg_payload, cmsg_payload_len, &restore_info); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at parser TCP options from cmsg"); + goto end; + } + + tcp_restore_info_parse_from_pkt(&pktinfo, &restore_info); + + // Remove cmsg from payload + pktinfo.ip_totlen = pktinfo.ip_totlen - cmsg_payload_len; + if (pktinfo.addr_type == ADDR_TYPE_IPV4) + { + pktinfo.iphdr.v4->tot_len = htons(pktinfo.ip_totlen); + pktinfo.iphdr.v4->check = 0; + pktinfo.iphdr.v4->check = tfe_pkt_checksum_ip((void*)pktinfo.iphdr.v4, pktinfo.iphdr_len); + pktinfo.tcphdr->check = 0; + pktinfo.tcphdr->check = tfe_pkt_checksum_tcp_v4((void*)pktinfo.tcphdr, pktinfo.ip_totlen - pktinfo.iphdr_len, pktinfo.iphdr.v4->saddr, pktinfo.iphdr.v4->daddr); + } + if (pktinfo.addr_type == ADDR_TYPE_IPV6) + { + pktinfo.iphdr.v6->ip6_ctlun.ip6_un1.ip6_un1_plen = 0; + pktinfo.iphdr.v6->ip6_ctlun.ip6_un1.ip6_un1_plen = htons(pktinfo.ip_totlen - sizeof(struct ip6_hdr)); + // IPv6 header no checksum + pktinfo.tcphdr->check = 0; + pktinfo.tcphdr->check = tfe_pkt_checksum_tcp_v6((void*)pktinfo.tcphdr, pktinfo.ip_totlen - pktinfo.iphdr_len, pktinfo.iphdr.v6->ip6_src, pktinfo.iphdr.v6->ip6_dst); + } + + tfe_tcp_restore_info_dump(&restore_info); + + // tcp repair C2S + fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server)); + if (fd_upstream < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at tcp_restore_fd_create(UPSTREAM)"); + goto end; + } + + // tcp repair S2C + fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client)); + if (fd_downstream < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at tcp_restore_fd_create(DOWNSTREAM)"); + goto end; + } + + if (tfe_cmsg_deserialize((const unsigned char *)restore_info.cmsg, restore_info.cmsg_len, &cmsg) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at 
tfe_cmsg_deserialize()"); + goto end; + } + + if (tfe_proxy_fds_accept(__ctx->proxy, fd_downstream, fd_upstream, cmsg) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at tfe_proxy_fds_accept()"); + goto end; + } + TFE_PROXY_STAT_INCREASE(STAT_FD_OPEN_BY_KNI_ACCEPT, 2); + + clock_gettime(CLOCK_MONOTONIC, &(__ctx->end)); + jiffies_us = (__ctx->end.tv_sec - __ctx->start.tv_sec) * 1000 * 1000 + (__ctx->end.tv_nsec - __ctx->start.tv_nsec) / 1000; + TFE_LOG_DEBUG(g_default_logger, "nfqueue tcp_restore=%p time=%ldus hw_protocol=0x%04x hook=%u id=%010u protocol=%s total_len=%d inject_len=%d iphdr_len=%d tcphdr_len=%d data_len=%d", + &restore_info, jiffies_us, ntohs(ph->hw_protocol), ph->hook, id, (pktinfo.addr_type == ADDR_TYPE_IPV4 ? "IPv4" : "IPv6"), + raw_payload_len, pktinfo.ip_totlen, pktinfo.iphdr_len, pktinfo.tcphdr_len, pktinfo.data_len); + /* + * NF_DROP : discarded the packet + * NF_ACCEPT : the packet passes, continue iterations + * NF_QUEUE : inject the packet into a different queue (the target queue number is in the high 16 bits of the verdict) + * NF_REPEAT : iterate the same cycle once more + * NF_STOP : accept, but don't continue iterations + */ + // nfq_set_verdict() + // nfq_set_verdict2() + // nfq_set_verdict_batch() + // nfq_set_verdict_batch2() + // nfq_set_verdict_mark() + + return nfq_set_verdict(qh, id, NF_ACCEPT, pktinfo.ip_totlen, raw_payload); + +end: + if (fd_upstream > 0) + { + TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, 1); + close(fd_upstream); + } + if (fd_downstream > 0) + { + TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, 1); + close(fd_downstream); + } + return nfq_set_verdict(qh, id, NF_ACCEPT, 0, NULL); +} + +void acceptor_kni_v3_event(evutil_socket_t fd, short what, void *user) +{ + struct acceptor_kni_v3 *__ctx = (struct acceptor_kni_v3 *) user; + assert(__ctx != NULL && __ctx->thread == pthread_self()); + assert(what & EV_READ); + + char buf[4096] __attribute__ ((aligned)); + int rv; + + rv = recv(fd, buf, 
sizeof(buf), 0); + if (rv >= 0) + { + TFE_LOG_DEBUG(g_default_logger, "nfqueue acceptor thread recv %d bytes form nfqueue fd %d", rv, fd); + nfq_handle_packet(__ctx->h, buf, rv); + return; + } + else + { + /* if your application is too slow to digest the packets that + * are sent from kernel-space, the socket buffer that we use + * to enqueue packets may fill up returning ENOBUFS. Depending + * on your application, this error may be ignored. Please, see + * the doxygen documentation of this library on how to improve + * this situation. + */ + if (errno == ENOBUFS) + { + TFE_LOG_ERROR(g_default_logger, "nfqueue losing packets!"); + } + + TFE_LOG_ERROR(g_default_logger, "failed at recv() data from nfqueue, %d: %s", errno, strerror(errno)); + } +} + +void *acceptor_kni_v3_event_thread_entry(void *args) +{ + struct acceptor_kni_v3 *__ctx = (struct acceptor_kni_v3 *)args; + assert(__ctx != NULL && __ctx->thread == pthread_self()); + + char thread_name[16] = { 0 }; + snprintf(thread_name, sizeof(thread_name), "tfe:acceptor-v3"); + prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); + + char affinity[32] = {0}; + if (__ctx->proxy->enable_cpu_affinity) + { + tfe_thread_set_affinity(__ctx->proxy->cpu_affinity_mask[0]); + snprintf(affinity, sizeof(affinity), "affinity cpu%d", __ctx->proxy->cpu_affinity_mask[0]); + } + + TFE_LOG_INFO(g_default_logger, "nfq acceptor thread %s is running.", __ctx->proxy->enable_cpu_affinity ? 
affinity : ""); + event_base_dispatch(__ctx->ev_base); + + DIE("nfq acceptor thread is exited, abort."); +} + +void acceptor_kni_v3_destroy(struct acceptor_kni_v3 *ctx) +{ + if (ctx != NULL && ctx->qh != NULL) + { + nfq_destroy_queue(ctx->qh); + ctx->qh = NULL; + } + if (ctx != NULL && ctx->h != NULL) + { + nfq_close(ctx->h); + ctx->h = NULL; + } + + if (ctx != NULL && ctx->ev_base != NULL) + { + event_base_free(ctx->ev_base); + ctx->ev_base = NULL; + } + + if (ctx != NULL) + { + free(ctx); + ctx = NULL; + } +} + +struct acceptor_kni_v3 *acceptor_kni_v3_create(struct tfe_proxy *proxy, const char *profile, void *logger) +{ + struct acceptor_kni_v3 *__ctx = ALLOC(struct acceptor_kni_v3, 1); + + int ret = 0; + __ctx->proxy = proxy; + __ctx->profile = profile; + + MESA_load_profile_uint_def(profile, "nfq", "queue_id", &(__ctx->queue_id), 1); + MESA_load_profile_uint_def(profile, "nfq", "queue_maxlen", &(__ctx->queue_maxlen), 65535); + MESA_load_profile_uint_def(profile, "nfq", "queue_rcvbufsiz", &(__ctx->queue_rcvbufsiz), 98302500); + MESA_load_profile_uint_def(profile, "nfq", "queue_no_enobufs", &(__ctx->queue_no_enobufs), 1); + + __ctx->h = nfq_open(); + if (!__ctx->h) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_open(), %d: %s", errno, strerror(errno)); + errno = 0; + goto __errout; + } + + if (nfq_unbind_pf(__ctx->h, AF_INET) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_unbind_pf(AF_INET), %d: %s", errno, strerror(errno)); + errno = 0; + goto __errout; + } + + if (nfq_unbind_pf(__ctx->h, AF_INET6) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_unbind_pf(AF_INET6), %d: %s", errno, strerror(errno)); + errno = 0; + goto __errout; + } + + if (nfq_bind_pf(__ctx->h, AF_INET) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_bind_pf(AF_INET), %d: %s", errno, strerror(errno)); + errno = 0; + goto __errout; + } + + if (nfq_bind_pf(__ctx->h, AF_INET6) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_bind_pf(AF_INET6), %d: %s", 
errno, strerror(errno)); + errno = 0; + goto __errout; + } + + __ctx->qh = nfq_create_queue(__ctx->h, __ctx->queue_id, &payload_handler_cb, __ctx); + if (!__ctx->qh) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_create_queue(), %d: %s", errno, strerror(errno)); + errno = 0; + goto __errout; + } + + /* + * NFQNL_COPY_NONE - noop, do not use it + * NFQNL_COPY_META - copy only packet metadata + * NFQNL_COPY_PACKET - copy entire packet + */ + if (nfq_set_mode(__ctx->qh, NFQNL_COPY_PACKET, 0xffff) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_set_mode(NFQNL_COPY_PACKET), %d: %s", errno, strerror(errno)); + errno = 0; + goto __errout; + } + + if (nfq_set_queue_maxlen(__ctx->qh, __ctx->queue_maxlen) < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at nfq_set_queue_maxlen(%d), %d: %s", __ctx->queue_maxlen, errno, strerror(errno)); + errno = 0; + goto __errout; + } + + nfnl_rcvbufsiz(nfq_nfnlh(__ctx->h), __ctx->queue_rcvbufsiz); + + __ctx->fd_nfq_socket = nfq_fd(__ctx->h); + + /* + * set NETLINK_NO_ENOBUFS socket option to avoid receiving ENOBUFS errors (requires Linux kernel >= 2.6.30). 
+ * Don't send error about no buffer space available but drop the packets instead + */ + if (__ctx->queue_no_enobufs) + { + if (setsockopt(__ctx->fd_nfq_socket, SOL_NETLINK, NETLINK_NO_ENOBUFS, &__ctx->queue_no_enobufs, sizeof(__ctx->queue_no_enobufs)) == -1) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(NETLINK_NO_ENOBUFS) for nfq fd, %d: %s", errno, strerror(errno)); + errno = 0; + goto __errout; + } + } + + evutil_make_socket_nonblocking(__ctx->fd_nfq_socket); + + __ctx->ev_base = event_base_new(); + if (unlikely(__ctx->ev_base == NULL)) + { + TFE_LOG_ERROR(g_default_logger, "failed at event_base_new()"); + goto __errout; + } + + __ctx->ev_nfq_socket = event_new(__ctx->ev_base, __ctx->fd_nfq_socket, EV_READ | EV_PERSIST, acceptor_kni_v3_event, __ctx); + if (unlikely(__ctx->ev_nfq_socket == NULL)) + { + TFE_LOG_ERROR(g_default_logger, "failed at setup READ event for nfqueue socket"); + goto __errout; + } + + ret = event_add(__ctx->ev_nfq_socket, NULL); + if (unlikely(ret < 0)) + { + TFE_LOG_ERROR(g_default_logger, "failed at adding nfqueue socket event to evbase"); + goto __errout; + } + + ret = pthread_create(&__ctx->thread, NULL, acceptor_kni_v3_event_thread_entry, (void *) __ctx); + if (unlikely(ret < 0)) + { + TFE_LOG_ERROR(g_default_logger, "failed at creating event thread: %s", strerror(errno)); + errno = 0; + goto __errout; + } + + TFE_LOG_INFO(g_default_logger, "KNIv3 acceptor init successfully"); + return __ctx; + +__errout: + acceptor_kni_v3_destroy(__ctx); + return NULL; +} diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 0da98c0..23c4cf3 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -45,6 +45,7 @@ #include <tcp_stream.h> #include <acceptor_kni_v1.h> #include <acceptor_kni_v2.h> +#include <acceptor_kni_v3.h> #include <watchdog_kni.h> #include <key_keeper.h> @@ -499,6 +500,10 @@ void tfe_proxy_acceptor_init(struct tfe_proxy * proxy, const char * profile) { MESA_load_profile_uint_def(profile, 
"system", "enable_kni_v1", &proxy->en_kni_v1_acceptor, 0); MESA_load_profile_uint_def(profile, "system", "enable_kni_v2", &proxy->en_kni_v2_acceptor, 1); + MESA_load_profile_uint_def(profile, "system", "enable_kni_v3", &proxy->en_kni_v3_acceptor, 0); + + int ret = proxy->en_kni_v1_acceptor + proxy->en_kni_v2_acceptor + proxy->en_kni_v3_acceptor; + CHECK_OR_EXIT((ret == 1), "Invalid KNI acceptor. Exit."); if (proxy->en_kni_v1_acceptor) { @@ -512,6 +517,12 @@ void tfe_proxy_acceptor_init(struct tfe_proxy * proxy, const char * profile) CHECK_OR_EXIT(g_default_proxy->kni_v2_acceptor, "Failed at init KNIv2 acceptor. Exit. "); } + if (proxy->en_kni_v3_acceptor) + { + g_default_proxy->kni_v3_acceptor = acceptor_kni_v3_create(g_default_proxy, profile, g_default_logger); + CHECK_OR_EXIT(g_default_proxy->kni_v3_acceptor, "Failed at init KNIv3 acceptor. Exit. "); + } + return; } diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 7c7054c..92bae92 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -1470,8 +1470,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst _stream->head.addr = tfe_stream_addr_create_by_fd(fd_downstream, CONN_DIR_DOWNSTREAM); if (unlikely(_stream->head.addr == NULL)) { - TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.", - fd_downstream, fd_upstream); goto __errout; + TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, %s, terminate fds.", + fd_downstream, fd_upstream, strerror(errno)); goto __errout; } _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); diff --git a/script/service/tfe-env.service b/script/service/tfe-env.service index 3ae24b8..5bf650f 100644 --- a/script/service/tfe-env.service +++ b/script/service/tfe-env.service @@ -13,7 +13,7 @@ RemainAfterExit=yes ExecStart=/bin/true ExecStop=/bin/true -ExecStartPost=/usr/sbin/modprobe tfe-kmod +# 
ExecStartPost=/usr/sbin/modprobe tfe-kmod ExecStartPost=/usr/sbin/ip link set ${TFE_DEVICE_DATA_INCOMING} address ${TFE_LOCAL_MAC_DATA_INCOMING} ExecStartPost=/usr/sbin/ip link set ${TFE_DEVICE_DATA_INCOMING} up ExecStartPost=/usr/sbin/ip addr flush dev ${TFE_DEVICE_DATA_INCOMING} @@ -44,7 +44,7 @@ ExecStopPost=/usr/sbin/ip -6 route del default via fd00::01 ExecStopPost=/usr/sbin/ip -6 route del local default dev lo table 102 ExecStopPost=/usr/sbin/ip addr del fd00::02/64 dev ${TFE_DEVICE_DATA_INCOMING} ExecStopPost=/usr/sbin/ip link set ${TFE_DEVICE_DATA_INCOMING} down -ExecStopPost=/usr/sbin/modprobe -r tfe-kmod +# ExecStopPost=/usr/sbin/modprobe -r tfe-kmod [Install] RequiredBy=tfe.service diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index 350c382..a52f9c4 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -335,6 +335,21 @@ set_property(TARGET brotlienc-static PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/l set_property(TARGET brotlienc-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) target_link_libraries(brotlienc-static INTERFACE brotlicommon-static) +### libnetfilter_queue +ExternalProject_Add(libnetfilter_queue PREFIX libnetfilter_queue + URL ${CMAKE_CURRENT_SOURCE_DIR}/libnetfilter_queue-1.0.5.tar.bz2 + URL_MD5 ce807654358481efaa826fec33c89b6a + CONFIGURE_COMMAND ./configure --prefix=<INSTALL_DIR> --enable-static=yes --enable-shared=no + BUILD_IN_SOURCE 1) + +ExternalProject_Get_Property(libnetfilter_queue INSTALL_DIR) +file(MAKE_DIRECTORY ${INSTALL_DIR}/include) + +add_library(libnetfilter_queue-static STATIC IMPORTED GLOBAL) +add_dependencies(libnetfilter_queue-static libnetfilter_queue) +set_property(TARGET libnetfilter_queue-static PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libnetfilter_queue.a) +set_property(TARGET libnetfilter_queue-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) + ### gperftools #ExternalProject_Add(gperftools # PREFIX gperftools @@ -350,4 +365,4 @@ 
target_link_libraries(brotlienc-static INTERFACE brotlicommon-static) #add_library(gperftools-static STATIC IMPORTED GLOBAL) #add_dependencies(gperftools-static gperftools) #set_property(TARGET gperftools-static PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libtcmalloc.a) -#set_property(TARGET gperftools-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include)
\ No newline at end of file +#set_property(TARGET gperftools-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) diff --git a/vendor/libnetfilter_queue-1.0.5.tar.bz2 b/vendor/libnetfilter_queue-1.0.5.tar.bz2 Binary files differnew file mode 100644 index 0000000..a3cd668 --- /dev/null +++ b/vendor/libnetfilter_queue-1.0.5.tar.bz2 |
