diff options
| author | wangmenglan <[email protected]> | 2023-04-28 16:18:32 +0800 |
|---|---|---|
| committer | wangmenglan <[email protected]> | 2023-05-06 17:53:08 +0800 |
| commit | 8de8ec1c5fa26f9e051ddd57f398d688e972c192 (patch) | |
| tree | 280da2158e0d7893f9df5f4b159a69f11087d49f /common/src/tfe_packet_io.cpp | |
| parent | 8a7c196c20dea2005085ded654f207936608c8ba (diff) | |
TSG-14938 TFE支持新控制报文格式; 调整代码结构
Diffstat (limited to 'common/src/tfe_packet_io.cpp')
| -rw-r--r-- | common/src/tfe_packet_io.cpp | 481 |
1 files changed, 426 insertions, 55 deletions
diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index 17ed84b..699d025 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -3,15 +3,20 @@ #include <netinet/udp.h> #include <netinet/tcp.h> #include <netinet/ether.h> +#include <linux/if_tun.h> +#include <sys/eventfd.h> #include <marsio.h> #include <cjson/cJSON.h> #include <MESA/MESA_prof_load.h> #include <tfe_utils.h> -// #include <tfe_proxy.h> #include <proxy.h> -#include "tfe_acceptor_kni.h" +#include <intercept_policy.h> +#include <unistd.h> + +#include <time.h> + #include "tfe_ctrl_packet.h" #include "tfe_raw_packet.h" #include "io_uring.h" @@ -21,25 +26,13 @@ #include "tfe_stream.h" #include "raw_socket.h" #include "packet_construct.h" -#include "tfe_tap_rss.h" -#include <intercept_policy.h> +#include "mpack.h" +#include "tap.h" +#include "bpf_obj.h" +#include "tfe_session_table.h" +#include "tfe_packet_io.h" + -#include <time.h> -/* - * add: vxlan_hdr - * del: marsio_buff_ctrlzone_reset() - * +----+ NF2SF +----+ - * | |--------------------------->| | - * | | | | - * | |-------+ | |-------+ - * | NF | | NF2NF (undo) | SF | | SF2SF (del old vxlan_hdr; add new vxlan_hdr) - * | |<------+ | |<------+ - * | | | | - * | |<---------------------------| | - * | | SF2NF | | - * +---+ del: vxlan_hdr +----+ - * add: session_id + route_ctx + sid - */ /****************************************************************************** * Struct @@ -51,12 +44,41 @@ #define SET_TRAFFIC_IS_DECRYPTED(field) (field | TRAFFIC_IS_DECRYPTED) #define CLEAR_TRAFFIC_IS_DECRYPTED(field) (field & ~TRAFFIC_IS_DECRYPTED) + struct config { int bypass_all_traffic; int rx_burst_max; + + int enable_iouring; + int enable_debuglog; + + int ring_size; + int buff_size; + + int flags; + int sq_thread_idle; + + int bpf_debug_log; + int bpf_hash_mode; + int tap_allow_mutilthread; + char bpf_obj[1024]; + + char src_mac[6]; + char tap_mac[6]; + char tap_c_mac[6]; + char tap_s_mac[6]; + char dev_tap[16]; + char dev_tap_c[16]; + char dev_tap_s[16]; + + int tap_rps_enable; + char tap_rps_mask[TFE_SYMBOL_MAX]; + char app_symbol[256]; char dev_nf_interface[256]; + + struct bpf_obj_ctx *tap_bpf_ctx; }; struct device @@ -141,14 +163,11 @@ extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, s * STATIC ******************************************************************************/ -static void time_echo(struct addr_tuple4 inner_addr) +static void time_echo(uint64_t session_id, char *info) { time_t t; time(&t); - - char *addr_string = addr_tuple4_to_str(&inner_addr); - TFE_LOG_ERROR(g_default_logger, "%s: session:%s, time:%s", LOG_TAG_PKTIO, addr_string, ctime(&t)); - free(addr_string); + TFE_LOG_ERROR(g_default_logger, "%s: session:%lu, time:%s %s", LOG_TAG_PKTIO, session_id, ctime(&t), info); } // return 0 : not keepalive packet @@ -173,6 +192,38 @@ static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff) } } +static int tfe_tap_write_per_thread(int tap_fd, const char *data, int data_len, void *logger) +{ + int ret = write(tap_fd, data, data_len); + if (ret != data_len) + { + TFE_LOG_ERROR(g_default_logger, "%s: need send %dB, only send %dB, aborting: %s", LOG_TAG_PKTIO, data_len, ret, strerror(errno)); + } + + return ret; +} + +static struct session_ctx *session_ctx_new() +{ + struct session_ctx *ctx = (struct session_ctx *)calloc(1, sizeof(struct session_ctx)); + assert(ctx != NULL); + return ctx; +} + +static void session_ctx_free(struct session_ctx *ctx) +{ + if (ctx) + { + if (ctx->cmsg) + { + tfe_cmsg_destroy(ctx->cmsg); + } + + free(ctx); + ctx = 0; + } +} + static void session_value_free_cb(void *ctx) { struct session_ctx *s_ctx = (struct session_ctx *)ctx; @@ -659,10 +710,54 @@ static int tcp_restore_set_from_pkg(struct addr_tuple4 *tuple4, struct tcp_resto // return -1 : error static int packet_io_config(const char *profile, struct config *config) { + int ret = 0; + + MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_all_traffic", (int *)&config->bypass_all_traffic, 0); MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1); MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol)); MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_nf_interface", config->dev_nf_interface, sizeof(config->dev_nf_interface)); + MESA_load_profile_string_def(profile, "PACKET_IO", "tap_name", config->dev_tap, sizeof(config->dev_tap), "tap0"); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_allow_mutilthread", &config->tap_allow_mutilthread); + MESA_load_profile_string_nodef(profile, "PACKET_IO", "bpf_obj", config->bpf_obj, sizeof(config->bpf_obj)); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_debug_log", (int *)&config->bpf_debug_log); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_hash_mode", (int *)&config->bpf_hash_mode); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_rps_enable", &config->tap_rps_enable); + MESA_load_profile_string_nodef(profile, "PACKET_IO", "tap_rps_mask", config->tap_rps_mask, sizeof(config->tap_rps_mask)); + + MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_iouring", &config->enable_iouring); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_debuglog", &config->enable_debuglog); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "ring_size", &config->ring_size); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "buff_size", &config->buff_size); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "flags", &config->flags); + MESA_load_profile_int_nodef(profile, "PACKET_IO", "sq_thread_idle", &config->sq_thread_idle); + + MESA_load_profile_string_def(profile, "traffic_steering", "device_client", config->dev_tap_c, sizeof(config->dev_tap_c), "tap_c"); + MESA_load_profile_string_def(profile, "traffic_steering", "device_server", config->dev_tap_s, sizeof(config->dev_tap_s), "tap_s"); + + char src_mac_addr[18] = {0}; + ret = MESA_load_profile_string_nodef(profile, "PACKET_IO", "src_mac_addr", src_mac_addr, sizeof(src_mac_addr)); + if(ret < 0){ + TFE_LOG_ERROR(g_default_logger, "%s: invalid src_mac_addr: src_mac_addr not set, profile = %s, section = PACKET_IO", LOG_TAG_PKTIO, profile); + return -1; + } + str_to_mac(src_mac_addr, config->src_mac); + ret = get_mac_by_device_name(config->dev_tap, config->tap_mac); + if (ret != 0) { + TFE_LOG_ERROR(g_default_logger, "%s: invalid tap_name: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap); + return -1; + } + ret = get_mac_by_device_name(config->dev_tap_c, config->tap_c_mac); + if (ret != 0) { + TFE_LOG_ERROR(g_default_logger, "%s: invalid device_client: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_c); + return -1; + } + ret = get_mac_by_device_name(config->dev_tap_s, config->tap_s_mac); + if (ret != 0) { + TFE_LOG_ERROR(g_default_logger, "%s: invalid device_server: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_s); + return -1; + } + if (config->rx_burst_max > RX_BURST_MAX) { TFE_LOG_ERROR(g_default_logger, "%s: invalid rx_burst_max, exceeds limit %d", LOG_TAG_PKTIO, RX_BURST_MAX); @@ -685,6 +780,22 @@ static int packet_io_config(const char *profile, struct config *config) TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->rx_burst_max : %d", LOG_TAG_PKTIO, config->rx_burst_max); TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->app_symbol : %s", LOG_TAG_PKTIO, config->app_symbol); TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->dev_nf_interface : %s", LOG_TAG_PKTIO, config->dev_nf_interface); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_name : %s", LOG_TAG_PKTIO, config->tap_rps_mask); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_allow_mutilthread : %d", LOG_TAG_PKTIO, config->tap_allow_mutilthread); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->bpf_obj : %s", LOG_TAG_PKTIO, config->bpf_obj); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->bpf_debug_log : %d", LOG_TAG_PKTIO, config->bpf_debug_log); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->bpf_hash_mode : %d", LOG_TAG_PKTIO, config->bpf_hash_mode); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_rps_enable : %d", LOG_TAG_PKTIO, config->tap_rps_enable); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_rps_mask : %s", LOG_TAG_PKTIO, config->tap_rps_mask); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->enable_iouring : %d", LOG_TAG_PKTIO, config->enable_iouring); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->enable_debuglog : %d", LOG_TAG_PKTIO, config->enable_debuglog); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->ring_size : %d", LOG_TAG_PKTIO, config->ring_size); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->buff_size : %d", LOG_TAG_PKTIO, config->buff_size); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->flags : %d", LOG_TAG_PKTIO, config->flags); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->sq_thread_idle : %d", LOG_TAG_PKTIO, config->sq_thread_idle); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->device_client : %s", LOG_TAG_PKTIO, config->dev_tap_c); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->device_server : %s", LOG_TAG_PKTIO, config->dev_tap_s); + TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->src_mac_addr : %s", LOG_TAG_PKTIO, src_mac_addr); return 0; } @@ -826,28 +937,169 @@ static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *met /* { - "tsync": "1.0", + "tsync": "2.0", "session_id": "123456789", "state": "active", "method": "log_update", "params": { - "sf_profile_ids": [ - 2, - 3, - 4, - 5, - 6, - 7 - ] + "proxy": { + "ssl_intercept_info": { + mpack array + } + } } } */ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) { struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; - struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx; + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; + char *data; + size_t size; + mpack_writer_t writer; + mpack_writer_init_growable(&writer, &data, &size); + + mpack_build_map(&writer); + + mpack_write_cstr(&writer, "tsync"); + mpack_write_cstr(&writer, "2.0"); + + mpack_write_cstr(&writer, "session_id"); + mpack_write_u64(&writer, s_ctx->session_id); + + mpack_write_cstr(&writer, "state"); + mpack_write_cstr(&writer, "active"); + + mpack_write_cstr(&writer, "method"); + mpack_write_cstr(&writer, "log_update"); + + mpack_write_cstr(&writer, "params"); + mpack_build_map(&writer); + + mpack_write_cstr(&writer, "proxy"); + mpack_build_map(&writer); + + mpack_write_cstr(&writer, "ssl_intercept_info"); + mpack_build_array(&writer); // cmsg value begin + + int ret = 0; + uint8_t ssl_intercept_status = 0; + uint16_t length = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (unsigned char *)&ssl_intercept_status, sizeof(ssl_intercept_status), &length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl intercept state from cmsg: %s", strerror(-ret)); + return; + } + + uint64_t ssl_upstream_latency = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_LATENCY, (unsigned char *)&ssl_upstream_latency, sizeof(ssl_upstream_latency), &length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl upstream latency from cmsg: %s", strerror(-ret)); + return; + } + + uint64_t ssl_downstream_latency = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_LATENCY, (unsigned char *)&ssl_downstream_latency, sizeof(ssl_downstream_latency), &length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl downstream latency from cmsg: %s", strerror(-ret)); + return; + } + + char ssl_upstream_version[64] = {0}; + uint16_t ssl_upstream_version_length = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_VERSION, (unsigned char *)ssl_upstream_version, sizeof(ssl_upstream_version), &length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl upstream version from cmsg: %s", strerror(-ret)); + return; + } + + char ssl_downstream_version[64] = {0}; + uint16_t ssl_downstream_version_length = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_VERSION, (unsigned char *)ssl_downstream_version, sizeof(ssl_downstream_version), &ssl_downstream_version_length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl downstream version from cmsg: %s", strerror(-ret)); + return; + } + + uint8_t ssl_pinning_state = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&ssl_pinning_state, sizeof(ssl_pinning_state), &length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl pinning state from cmsg: %s", strerror(-ret)); + return; + } + + uint8_t ssl_cert_verify = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CERT_VERIFY, (unsigned char *)&ssl_cert_verify, sizeof(ssl_cert_verify), &length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl cert verify from cmsg: %s", strerror(-ret)); + return; + } + + char ssl_error[64] = {0}; + uint16_t ssl_error_length = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_ERROR, (unsigned char *)ssl_error, sizeof(ssl_error), &ssl_error_length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl error from cmsg: %s", strerror(-ret)); + return; + } + + char ssl_passthrough_reason[32] = {0}; + uint16_t ssl_passthrough_reason_length = 0; + ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (unsigned char *)ssl_passthrough_reason, sizeof(ssl_passthrough_reason), &ssl_passthrough_reason_length); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl passthrough reason from cmsg: %s", strerror(-ret)); + return; + } + + mpack_write_u8(&writer, ssl_intercept_status); + mpack_write_u64(&writer, ssl_upstream_latency); + mpack_write_u64(&writer, ssl_downstream_latency); + mpack_write_str(&writer, ssl_upstream_version, ssl_upstream_version_length); + mpack_write_str(&writer, ssl_downstream_version, ssl_downstream_version_length); + mpack_write_u8(&writer, ssl_pinning_state); + mpack_write_u8(&writer, ssl_cert_verify); + mpack_write_str(&writer, ssl_error, ssl_error_length); + mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length); + mpack_complete_array(&writer); + mpack_complete_map(&writer); + + mpack_complete_map(&writer); + + mpack_complete_map(&writer); + + // marsio_buff_t *tx_buffs[1]; + // char *raw_packet_header_data = session_ctx->ctrl_meta->raw_data; + // int raw_packet_header_len = session_ctx->ctrl_meta->l7offset; + // marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_index); + // char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + size); + // memcpy(dst, raw_packet_header_data, raw_packet_header_len); + // memcpy(dst + raw_packet_header_len, data, size); + + // struct metadata meta = {0}; + // meta.session_id = session_ctx->session_id; + // meta.l7offset = raw_packet_header_len; + // meta.is_ctrl_pkt = 1; + // meta.sids.num = 1; + // meta.sids.elems[0] = sce_ctx->firewall_sids; + // route_ctx_copy(&meta.route_ctx, &session_ctx->ctrl_meta->route_ctx); + // mbuff_set_metadata(tx_buffs[0], &meta); + // int nsend = marsio_buff_datalen(tx_buffs[0]); + // marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_index, tx_buffs, 1); + +end: + mpack_writer_destroy(&writer); + free(data); return; } @@ -856,12 +1108,10 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { uint8_t *iptmp = NULL; - int ret = 0; int fd_downstream = 0; int fd_upstream = 0; int fd_fake_c = 0; int fd_fake_s = 0; - uint64_t rule_id = 0; uint16_t size = 0; char *addr_str = NULL; @@ -878,7 +1128,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser struct sockaddr_in *in_addr_server = (struct sockaddr_in *)&restore_info.server.addr; struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; - struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx; + struct packet_io *packet_io = thread->ref_io; struct raw_pkt_parser raw_parser; raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8); @@ -941,7 +1191,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser free(addr_str); - fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), thread->ref_tap_config->tap_device, 0x65); + fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), packet_io->config.dev_tap, 0x65); if (fd_upstream < 0) { TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(UPSTREAM)"); @@ -949,7 +1199,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser } // tcp repair S2C - fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), thread->ref_tap_config->tap_device, 0x65); + fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), packet_io->config.dev_tap, 0x65); if (fd_downstream < 0) { TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(DOWNSTREAM)"); @@ -1080,7 +1330,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; - struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx; + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; TFE_LOG_ERROR(g_default_logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); @@ -1098,7 +1348,7 @@ static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; - struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx; + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct global_metrics *g_metrics = thread->ref_metrics; struct metadata meta; @@ -1151,10 +1401,8 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; - struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx; + struct packet_io *packet_io = thread->ref_io; struct global_metrics *g_metrics = thread->ref_metrics; - struct addr_tuple4 inner_addr; - memset(&inner_addr, 0, sizeof(struct addr_tuple4)); int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); @@ -1169,6 +1417,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } + time_echo(meta.session_id, "raw pkg from nf start"); struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id); if (node == NULL) @@ -1186,6 +1435,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx { metadata_deep_copy(s_ctx->raw_meta_e2i, &meta); } + s_ctx->raw_meta_e2i->sids = meta.sids; } else { @@ -1193,14 +1443,15 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx { metadata_deep_copy(s_ctx->raw_meta_i2e, &meta); } + s_ctx->raw_meta_i2e->sids = meta.sids; } if (meta.is_decrypted) { // c2s if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir) { - add_ether_header(raw_data, acceptor_ctx->config->src_mac, acceptor_ctx->config->tap_s_mac); - if (acceptor_ctx->config->enable_iouring) { + add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac); + if (packet_io->config.enable_iouring) { io_uring_submit_write_entry(thread->tap_ctx->io_uring_s, raw_data, raw_len); } else { @@ -1210,8 +1461,8 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx } // s2c else { - add_ether_header(raw_data, acceptor_ctx->config->src_mac, acceptor_ctx->config->tap_c_mac); - if (acceptor_ctx->config->enable_iouring) { + add_ether_header(raw_data, packet_io->config.tap_s_mac, packet_io->config.tap_c_mac); + if (packet_io->config.enable_iouring) { io_uring_submit_write_entry(thread->tap_ctx->io_uring_c, raw_data, raw_len); } else { @@ -1229,8 +1480,8 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx buff_size = raw_len - ((char *)payload - meta->raw_data) + sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct tcphdr); #endif // send to tap0 - add_ether_header(raw_data, acceptor_ctx->config->src_mac, acceptor_ctx->config->tap_mac); - if (acceptor_ctx->config->enable_iouring) { + add_ether_header(raw_data, packet_io->config.src_mac, packet_io->config.tap_mac); + if (packet_io->config.enable_iouring) { io_uring_submit_write_entry(thread->tap_ctx->io_uring_fd, raw_data, raw_len); } else { @@ -1239,6 +1490,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx throughput_metrics_inc(&g_metrics->tap_pkt_tx, 1, raw_len); } marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); + time_echo(meta.session_id, "raw pkg from nf end"); return 0; } @@ -1246,6 +1498,91 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx /****************************************************************************** * EXTERN ******************************************************************************/ +int is_enable_iouring(struct packet_io *handle) +{ + return handle->config.enable_iouring; +} + +void tfe_tap_ctx_destory(struct tap_ctx *handler) +{ + if (handler) { + io_uring_instance_destory(handler->io_uring_fd); + io_uring_instance_destory(handler->io_uring_c); + io_uring_instance_destory(handler->io_uring_s); + if (handler->eventfd > 0) + close(handler->eventfd); + if (handler->eventfd_c > 0) + close(handler->eventfd_c); + if (handler->eventfd_s > 0) + close(handler->eventfd_s); + tap_close(handler->tap_fd); + tap_close(handler->tap_c); + tap_close(handler->tap_s); + + free(handler); + } +} + +struct tap_ctx *tfe_tap_ctx_create(void *ctx) +{ + int ret = 0; + struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)ctx; + struct acceptor_kni_v4 *acceptor_ctx = thread_ctx->ref_acceptor_ctx; + struct packet_io *packet_io = acceptor_ctx->io; + struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx)); + assert(tap_ctx != NULL); + + tap_ctx->tap_fd = tap_open(packet_io->config.dev_tap, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE); + tap_ctx->tap_c = tap_open(packet_io->config.dev_tap_c, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE); + tap_ctx->tap_s = tap_open(packet_io->config.dev_tap_s, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE); + + tap_up_link(packet_io->config.dev_tap); + tap_up_link(packet_io->config.dev_tap_c); + tap_up_link(packet_io->config.dev_tap_s); + + // fcntl(2) EFD_NONBLOCK 不生效 + tap_ctx->eventfd = eventfd(0, EFD_NONBLOCK); + tap_ctx->eventfd_c = eventfd(0, EFD_NONBLOCK); + tap_ctx->eventfd_s = eventfd(0, EFD_NONBLOCK); + + if (packet_io->config.enable_iouring) { + bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_fd); + bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_c); + bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_s); + + tap_ctx->io_uring_fd = io_uring_instance_create(tap_ctx->tap_fd, tap_ctx->eventfd, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog); + if (tap_ctx->io_uring_fd == NULL) + goto error_out; + tap_ctx->io_uring_c = io_uring_instance_create(tap_ctx->tap_c, tap_ctx->eventfd_c, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog); + if (tap_ctx->io_uring_c == NULL) + goto error_out; + tap_ctx->io_uring_s = io_uring_instance_create(tap_ctx->tap_s, tap_ctx->eventfd_s, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog); + if (tap_ctx->io_uring_s == NULL) + goto error_out; + + marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd, thread_ctx->thread_index); + marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_c, thread_ctx->thread_index); + marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_s, thread_ctx->thread_index); + } + + if (packet_io->config.tap_rps_enable) + { + ret = tap_set_rps(packet_io->config.dev_tap, thread_ctx->thread_index, packet_io->config.tap_rps_mask); + if (ret != 0) + goto error_out; + ret = tap_set_rps(packet_io->config.dev_tap_c, thread_ctx->thread_index, packet_io->config.tap_rps_mask); + if (ret != 0) + goto error_out; + ret = tap_set_rps(packet_io->config.dev_tap_s, thread_ctx->thread_index, packet_io->config.tap_rps_mask); + if (ret != 0) + goto error_out; + } + + return tap_ctx; +error_out: + tfe_tap_ctx_destory(tap_ctx); + return NULL; +} int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx) { @@ -1287,6 +1624,12 @@ void packet_io_destory(struct packet_io *handle) handle->instance = NULL; } + if (handle->config.tap_bpf_ctx) + { + bpf_obj_unload(handle->config.tap_bpf_ctx); + handle->config.tap_bpf_ctx = NULL; + } + free(handle); handle = NULL; } @@ -1304,6 +1647,19 @@ struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_ goto error_out; } + if (handle->config.tap_allow_mutilthread) + { + handle->config.tap_bpf_ctx = bpf_obj_load(handle->config.bpf_obj, thread_num, handle->config.bpf_hash_mode, handle->config.bpf_debug_log); + if (handle->config.tap_bpf_ctx == NULL) + { + goto error_out; + } + } + else if (thread_num > 1){ + TFE_LOG_ERROR(g_default_logger, "%s: under tap mode, when disable tap_allow_mutilthread, only support one work thread.", LOG_TAG_PKTIO); + goto error_out; + } + handle->instance = marsio_create(); if (handle->instance == NULL) { @@ -1366,6 +1722,17 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi return 0; } + if (handle->config.bypass_all_traffic == 1) + { + for (int i = 0; i < nr_recv; i++) + { + int raw_len = marsio_buff_datalen(rx_buffs[i]); + } + + marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, rx_buffs, nr_recv); + return nr_recv; + } + for (int j = 0; j < nr_recv; j++) { marsio_buff_t *rx_buff = rx_buffs[j]; @@ -1397,7 +1764,7 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi void handle_decryption_packet_from_tap(const char *data, int len, void *args) { struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args; - struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx; + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; struct addr_tuple4 inner_addr; @@ -1416,6 +1783,7 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; + time_echo(s_ctx->session_id, "decryption pkg from nf start"); marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); @@ -1435,8 +1803,9 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) meta.is_decrypted = SET_TRAFFIC_IS_DECRYPTED(0); meta.is_ctrl_pkt = 0; meta.l7offset = 0; - meta.sids.num = 1; + meta.sids.num = 2; meta.sids.elems[0] = acceptor_ctx->sce_sids; + meta.sids.elems[1] = acceptor_ctx->proxy_sids; if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct addr_tuple4)) == 0) meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir; @@ -1453,6 +1822,7 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) } packet_io_set_metadata(tx_buffs[0], &meta); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1); + time_echo(s_ctx->session_id, "decryption pkg from nf end"); } void handle_raw_packet_from_tap(const char *data, int len, void *args) @@ -1460,7 +1830,7 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) char *src_mac = NULL; char *dst_mac = NULL; struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args; - struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx; + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; struct addr_tuple4 inner_addr; @@ -1479,6 +1849,7 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; + time_echo(s_ctx->session_id, "raw pkg from tap start"); marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); @@ -1519,7 +1890,6 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) } else { - // raw_meta_i2e->raw_data maybe is null sids_copy(&meta.sids, &s_ctx->raw_meta_i2e->sids); route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_i2e->route_ctx); } @@ -1528,4 +1898,5 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) packet_io_set_metadata(tx_buffs[0], &meta); add_ether_header(dst, src_mac, dst_mac); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1); + time_echo(s_ctx->session_id, "raw pkg from tap end"); } |
