diff options
Diffstat (limited to 'common/src/tfe_packet_io.cpp')
| -rw-r--r-- | common/src/tfe_packet_io.cpp | 103 |
1 files changed, 55 insertions, 48 deletions
diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index 40140ac..ef9c8f9 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -20,7 +20,7 @@ #include "tfe_ctrl_packet.h" #include "tfe_raw_packet.h" #include "io_uring.h" -#include "tfe_metrics.h" +#include "tfe_packet_io_fs.h" #include "tfe_cmsg.h" #include "tfe_tcp_restore.h" #include "tfe_stream.h" @@ -162,14 +162,6 @@ extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, s /****************************************************************************** * STATIC ******************************************************************************/ - -static void time_echo(uint64_t session_id, char *info) -{ - time_t t; - time(&t); - TFE_LOG_ERROR(g_default_logger, "%s: session:%lu, time:%s %s", LOG_TAG_PKTIO, session_id, ctime(&t), info); -} - // return 0 : not keepalive packet // return 1 : is keepalive packet static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff) @@ -595,8 +587,8 @@ static int tcp_restore_set_from_cmsg(struct tfe_cmsg *cmsg, struct tcp_restore_i ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (unsigned char *)&wsacle_server, sizeof(uint8_t), &length); if (ret == 0) { - restore_info->client.wscale_perm = true; - restore_info->client.wscale = wsacle_server; + restore_info->server.wscale_perm = true; + restore_info->server.wscale = wsacle_server; } uint8_t sack_client; @@ -889,6 +881,7 @@ static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta) if (meta->is_ctrl_pkt) { + marsio_buff_set_ctrlbuf(tx_buff); if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) != 0) { TFE_LOG_ERROR(g_default_logger, "%s: unable to set l7offset for metadata", LOG_TAG_PKTIO); @@ -952,7 +945,7 @@ static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *met */ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; @@ -979,6 +972,7 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_writer_t writer; mpack_writer_init_growable(&writer, &data, &size); + // root map mpack_build_map(&writer); mpack_write_cstr(&writer, "tsync"); @@ -994,9 +988,11 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_write_cstr(&writer, "log_update"); mpack_write_cstr(&writer, "params"); + // params map mpack_build_map(&writer); mpack_write_cstr(&writer, "proxy"); + // proxy map mpack_build_map(&writer); mpack_write_cstr(&writer, "ssl_intercept_info"); @@ -1031,9 +1027,30 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_write_str(&writer, ssl_error, ssl_error_length); mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length); mpack_complete_array(&writer); + // proxy map end mpack_complete_map(&writer); + // params map end mpack_complete_map(&writer); + // root map end mpack_complete_map(&writer); + + // finish writing + if (mpack_writer_destroy(&writer) != mpack_ok) + { + assert(0); + if (data) + { + free(data); + data = NULL; + } + return; + } + + struct ethhdr *eth_hdr = (struct ethhdr *)s_ctx->ctrl_meta->raw_data; + struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr)); + struct tcphdr *tcp_hdr = (struct tcphdr *)((char *)ip_hdr + sizeof(struct ip)); + // ip_hdr->ip_len = htons(sizeof(struct ip) + (ntohs(tcp_hdr->th_off) * 4) + size); + ip_hdr->ip_len = htons(sizeof(struct ip) + 20 + size); char *raw_packet_header_data = s_ctx->ctrl_meta->raw_data; int raw_packet_header_len = s_ctx->ctrl_meta->l7offset; @@ -1052,7 +1069,6 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) int nsend = marsio_buff_datalen(tx_buffs[0]); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_seq, tx_buffs, 1); - mpack_writer_destroy(&writer); if (data) free(data); return; @@ -1083,7 +1099,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser struct sockaddr_in *in_addr_client = (struct sockaddr_in *)&restore_info.client.addr; struct sockaddr_in *in_addr_server = (struct sockaddr_in *)&restore_info.server.addr; - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread->ref_io; struct raw_pkt_parser raw_parser; @@ -1259,7 +1275,7 @@ end: // return -1 : error static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); if (!node) @@ -1274,7 +1290,7 @@ static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser * // return -1 : error static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); if (node) @@ -1292,14 +1308,14 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser // return -1 : error static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; TFE_LOG_ERROR(g_default_logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { - struct acceptor_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; + struct packet_io_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; __atomic_fetch_add(&thread_ctx->session_table_need_reset, 1, __ATOMIC_RELAXED); } @@ -1310,9 +1326,9 @@ static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser // return -1 : error static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; - struct global_metrics *g_metrics = thread->ref_metrics; + struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct metadata meta; if (packet_io_get_metadata(rx_buff, &meta) == -1) @@ -1340,21 +1356,21 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf switch (ctrl_parser.state) { case SESSION_STATE_OPENING: - __atomic_fetch_add(&g_metrics->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED); // when session opening, firewall not send policy id // return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx); break; case SESSION_STATE_CLOSING: - __atomic_fetch_add(&g_metrics->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED); return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_ACTIVE: - __atomic_fetch_add(&g_metrics->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED); return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_RESETALL: - __atomic_fetch_add(&g_metrics->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED); return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx); default: - __atomic_fetch_add(&g_metrics->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED); break; } @@ -1363,9 +1379,9 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread->ref_io; - struct global_metrics *g_metrics = thread->ref_metrics; + struct packet_io_fs *packet_io_fs = thread->ret_fs_state; int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); @@ -1380,7 +1396,6 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } - time_echo(meta.session_id, "raw pkg from nf start"); struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id); if (node == NULL) @@ -1403,7 +1418,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx else { tfe_tap_write_per_thread(thread->tap_ctx->tap_s, raw_data, raw_len, g_default_logger); } - throughput_metrics_inc(&g_metrics->tap_s_pkt_tx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->tap_s_pkt_tx, 1, raw_len); } // s2c else { @@ -1414,7 +1429,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx else { tfe_tap_write_per_thread(thread->tap_ctx->tap_c, raw_data, raw_len, g_default_logger); } - throughput_metrics_inc(&g_metrics->tap_c_pkt_tx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->tap_c_pkt_tx, 1, raw_len); } } else @@ -1450,7 +1465,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx else { tfe_tap_write_per_thread(thread->tap_ctx->tap_fd, raw_data, raw_len, g_default_logger); } - throughput_metrics_inc(&g_metrics->tap_pkt_tx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->tap_pkt_tx, 1, raw_len); uint8_t flag = tfe_cmsg_get_flag(s_ctx->cmsg); if (flag & TFE_CMSG_FLAG_USER0) { @@ -1459,11 +1474,9 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx } } marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); - time_echo(meta.session_id, "raw pkg from nf end"); return 0; } - /****************************************************************************** * EXTERN ******************************************************************************/ @@ -1495,7 +1508,7 @@ void tfe_tap_ctx_destory(struct tap_ctx *handler) struct tap_ctx *tfe_tap_ctx_create(void *ctx) { int ret = 0; - struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread_ctx->ref_acceptor_ctx; struct packet_io *packet_io = acceptor_ctx->io; struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx)); @@ -1553,7 +1566,7 @@ error_out: return NULL; } -int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx) +int packet_io_thread_init(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx) { if (marsio_thread_init(handle->instance) != 0) { @@ -1564,7 +1577,7 @@ int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx * return 0; } -void packet_io_thread_wait(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx, int timeout_ms) +void packet_io_thread_wait(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, int timeout_ms) { struct mr_vdev *vdevs[] = {handle->dev_nf_interface.mr_dev}; @@ -1679,8 +1692,8 @@ error_out: // return n_packet_recv int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; - struct global_metrics *g_metrics = thread->ref_metrics; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; + struct packet_io_fs *packet_io_fs = thread->ret_fs_state; marsio_buff_t *rx_buffs[RX_BURST_MAX]; @@ -1715,14 +1728,14 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi if (marsio_buff_is_ctrlbuf(rx_buff)) { - throughput_metrics_inc(&g_metrics->ctrl_pkt_rx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->ctrl_pkt_rx, 1, raw_len); // all control packet need bypass handle_control_packet(handle, rx_buff, thread_seq, ctx); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); } else { - throughput_metrics_inc(&g_metrics->raw_pkt_rx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); handle_raw_packet_from_nf(handle, rx_buff, thread_seq, ctx); } } @@ -1732,7 +1745,7 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi void handle_decryption_packet_from_tap(const char *data, int len, void *args) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; @@ -1752,8 +1765,6 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - time_echo(s_ctx->session_id, "decryption pkg from nf start"); - marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ @@ -1791,14 +1802,13 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) } packet_io_set_metadata(tx_buffs[0], &meta); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1); - time_echo(s_ctx->session_id, "decryption pkg from nf end"); } void handle_raw_packet_from_tap(const char *data, int len, void *args) { char *src_mac = NULL; char *dst_mac = NULL; - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; @@ -1818,8 +1828,6 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - time_echo(s_ctx->session_id, "raw pkg from tap start"); - marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ @@ -1867,5 +1875,4 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) packet_io_set_metadata(tx_buffs[0], &meta); add_ether_header(dst, src_mac, dst_mac); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1); - time_echo(s_ctx->session_id, "raw pkg from tap end"); } |
