#define _GNU_SOURCE // for CPU_SET()/CPU_ZERO()
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <linux/netfilter.h> // for NF_ACCEPT
#include <stdint.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <sys/prctl.h>

#include "io_uring.h"
#include "tfe_packet_io_fs.h"
#include "tfe_tcp_restore.h"
#include "acceptor_kni_v4.h"
#include "tap.h"
#include "tfe_packet_io.h"
#include "tfe_session_table.h"
#include "tfe_fieldstat.h"
#include "dablooms.h"
#include "timestamp.h"

void *g_packet_io_logger = NULL;

// Non-blocking read from a tap fd; EWOULDBLOCK/EAGAIN simply mean no data yet.
static int tap_read(int tap_fd, char *buff, int buff_size, void *logger)
{
    int ret = read(tap_fd, buff, buff_size);
    if (ret < 0) {
        if (errno != EWOULDBLOCK && errno != EAGAIN) {
            TFE_LOG_ERROR(logger, "%s: unable to read data from tapfd %d, aborting: %s",
                          LOG_TAG_PKTIO, tap_fd, strerror(errno));
        }
    }
    return ret;
}

void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
{
    if (ctx) {
        packet_io_destory(ctx->io);
        packet_io_fs_destory(ctx->packet_io_fs);
        free(ctx);
    }
    return;
}

struct fieldstat_easy_intercept *packet_io_fieldstat_easy_create(const char *profile, void *logger)
{
    int packet_io_threads = 0;
    int output_fs_interval_ms = 0;
    char app_name[TFE_STRING_MAX] = {0};
    struct fieldstat_easy_intercept *intercept = NULL;

    MESA_load_profile_int_def(profile, "packet_io", "packet_io_threads", &packet_io_threads, 0);
    MESA_load_profile_int_def(profile, "proxy_hits", "output_fs_interval_ms", &output_fs_interval_ms, 500);
    MESA_load_profile_string_def(profile, "proxy_hits", "app_name", app_name, sizeof(app_name), "proxy_rule_hits");

    intercept = tfe_fieldstat_easy_intercept_create(app_name, packet_io_threads, output_fs_interval_ms, logger);
    return intercept;
}

struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger)
{
    struct acceptor_kni_v4 *ctx = ALLOC(struct acceptor_kni_v4, 1);

    MESA_load_profile_int_def(profile, "PACKET_IO", "dup_packet_filter_enable",
                              (int *)&(ctx->dup_packet_filter_enable), 1);
    MESA_load_profile_int_def(profile, "PACKET_IO", "dup_packet_filter_capacity",
                              (int *)&(ctx->dup_packet_filter_capacity), 1000000);
    MESA_load_profile_int_def(profile, "PACKET_IO", "dup_packet_filter_timeout",
                              (int *)&(ctx->dup_packet_filter_timeout), 10);
    // MESA_load_profile does not support double, so the error rate is hard-coded.
    ctx->dup_packet_filter_error_rate = 0.00001;
    MESA_load_profile_int_def(profile, "PACKET_IO", "firewall_sids", (int *)&(ctx->firewall_sids), 1000);
    MESA_load_profile_int_def(profile, "PACKET_IO", "proxy_sids", (int *)&(ctx->proxy_sids), 1001);
    MESA_load_profile_int_def(profile, "PACKET_IO", "service_chaining_sids", (int *)&(ctx->sce_sids), 1002);
    MESA_load_profile_int_def(profile, "PACKET_IO", "packet_io_debug", (int *)&(ctx->debug), 0);
    MESA_load_profile_int_def(profile, "PACKET_IO", "packet_io_threads", (int *)&(ctx->nr_worker_threads), 8);
    MESA_load_profile_uint_range(profile, "PACKET_IO", "packet_io_cpu_affinity_mask",
                                 TFE_THREAD_MAX, (unsigned int *)ctx->cpu_affinity_mask);

    ctx->nr_worker_threads = MIN(ctx->nr_worker_threads, TFE_THREAD_MAX);

    // Build the CPU set used to pin the worker threads.
    CPU_ZERO(&ctx->coremask);
    for (int i = 0; i < ctx->nr_worker_threads; i++) {
        int cpu_id = ctx->cpu_affinity_mask[i];
        CPU_SET(cpu_id, &ctx->coremask);
    }

    ctx->io = packet_io_create(profile, ctx->nr_worker_threads, &ctx->coremask, logger);
    if (ctx->io == NULL) {
        goto error_out;
    }

    ctx->packet_io_fs = packet_io_fs_create(profile);
    if (ctx->packet_io_fs == NULL) {
        goto error_out;
    }

    ctx->metrics = packet_io_fieldstat_easy_create(profile, logger);
    tfe_get_fieldstat_handle()->intercept = ctx->metrics;
    return ctx;

error_out:
    acceptor_kni_v4_destroy(ctx);
    return NULL;
}
// Push per-session counters from this thread's session table into fieldstat.
void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx)
{
    if (thread_ctx == NULL)
        return;
    int thread_index = thread_ctx->thread_index;
    struct session_table *session_table = thread_ctx->session_table;
    struct fieldstat_easy_intercept *metrics = thread_ctx->ref_acceptor_ctx->metrics;
    session_foreach(session_table, metrics, tfe_fieldstat_intercept_incrby, thread_index);
    return;
}

static void *worker_thread_cycle(void *arg)
{
    struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)arg;
    struct packet_io *handle = thread_ctx->ref_io;
    void *logger = thread_ctx->logger;
#define MAX_REBUFF_SIZE 2048
    char buffer[MAX_REBUFF_SIZE];
    int pkg_len = 0;
    char thread_name[16];
    int n_pkt_recv = 0;
    int thread_index = thread_ctx->thread_index;
    int using_iouring_mode = is_enable_iouring(handle);
    int fd_on_tap_0 = thread_ctx->tap_ctx->tap_fd;
    int fd_on_tap_c = thread_ctx->tap_ctx->tap_c;
    int fd_on_tap_s = thread_ctx->tap_ctx->tap_s;
    struct io_uring_instance *io_uring_on_tap_0 = thread_ctx->tap_ctx->io_uring_fd;
    struct io_uring_instance *io_uring_on_tap_c = thread_ctx->tap_ctx->io_uring_c;
    struct io_uring_instance *io_uring_on_tap_s = thread_ctx->tap_ctx->io_uring_s;
    struct timespec current_time;
    clock_gettime(CLOCK_MONOTONIC, &current_time);
    int timeout_ms = 0;
    uint64_t current_timestamp = (uint64_t)current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000;
    uint64_t metrics_last_send_ms = current_timestamp;
    uint64_t metrics_output_interval_ms = tfe_fieldstat_get_output_interval(thread_ctx->ref_acceptor_ctx->metrics);

    snprintf(thread_name, sizeof(thread_name), "pkt:worker-%d", thread_index);
    prctl(PR_SET_NAME, (unsigned long)thread_name, 0, 0, 0);

    // Wait until the main thread has finished global initialization.
    while (!worker_thread_ready) {
        sleep(1);
    }

    if (packet_io_thread_init(handle, thread_ctx, logger) != 0) {
        goto error_out;
    }

    if (using_iouring_mode) {
        // In io_uring mode, reads complete via callbacks instead of direct read().
        io_uring_set_read_cb(io_uring_on_tap_0, handle_raw_packet_from_tap, thread_ctx);
        io_uring_set_read_cb(io_uring_on_tap_c, handle_decryption_packet_from_tap, thread_ctx);
        io_uring_set_read_cb(io_uring_on_tap_s, handle_decryption_packet_from_tap, thread_ctx);
    }

    TFE_LOG_INFO(logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_index);

    while (1) {
        n_pkt_recv = packet_io_polling_nf_interface(handle, thread_index, thread_ctx);
        if (using_iouring_mode) {
            n_pkt_recv += io_uring_polling(io_uring_on_tap_0);
            n_pkt_recv += io_uring_polling(io_uring_on_tap_c);
            n_pkt_recv += io_uring_polling(io_uring_on_tap_s);
        } else {
            // Fallback path: non-blocking read() directly on the tap fds.
            if ((pkg_len = tap_read(fd_on_tap_0, buffer, MAX_REBUFF_SIZE, logger)) > 0) {
                n_pkt_recv++;
                handle_raw_packet_from_tap(buffer, pkg_len, thread_ctx);
            }
            if ((pkg_len = tap_read(fd_on_tap_c, buffer, MAX_REBUFF_SIZE, logger)) > 0) {
                n_pkt_recv++;
                handle_decryption_packet_from_tap(buffer, pkg_len, thread_ctx);
            }
            if ((pkg_len = tap_read(fd_on_tap_s, buffer, MAX_REBUFF_SIZE, logger)) > 0) {
                n_pkt_recv++;
                handle_decryption_packet_from_tap(buffer, pkg_len, thread_ctx);
            }
        }

        clock_gettime(CLOCK_MONOTONIC, &current_time);
        current_timestamp = (uint64_t)current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000;

        if (n_pkt_recv == 0) {
            // Idle: block until the next metrics flush is due.
            timeout_ms = metrics_last_send_ms + metrics_output_interval_ms - current_timestamp;
            if (timeout_ms <= 0) {
                timeout_ms = 0;
            }
            packet_io_thread_wait(handle, thread_ctx, timeout_ms);
        }

        if (ATOMIC_READ(&thread_ctx->session_table_need_reset) > 0) {
            session_table_reset(thread_ctx->session_table);
            ATOMIC_ZERO(&thread_ctx->session_table_need_reset);
        }

        if (current_timestamp - metrics_last_send_ms >= metrics_output_interval_ms) {
            metrics_all_session_output(thread_ctx);
            metrics_last_send_ms = current_timestamp;
        }
    }

error_out:
    TFE_LOG_ERROR(logger, "%s: worker thread %d exiting", "LOG_TAG_KNI", thread_index);
    return (void *)NULL;
}
"%s: worker thread %d exiting", "LOG_TAG_KNI", thread_index); return (void *)NULL; } struct acceptor_kni_v4 *acceptor_kni_v4_create(struct tfe_proxy *proxy, const char *profile) { void *packet_io_logger = NULL; packet_io_logger = (void *)MESA_create_runtime_log_handle("packet_io", RLOG_LV_DEBUG); assert(packet_io_logger != NULL); g_packet_io_logger = packet_io_logger; struct acceptor_kni_v4 *acceptor_ctx = acceptor_ctx_create(profile, packet_io_logger); if (acceptor_ctx == NULL) return NULL; acceptor_ctx->ref_proxy = proxy; for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { acceptor_ctx->work_threads[i].tid = 0; acceptor_ctx->work_threads[i].thread_index = i; acceptor_ctx->work_threads[i].ref_io = acceptor_ctx->io; acceptor_ctx->work_threads[i].ref_acceptor_ctx = acceptor_ctx; acceptor_ctx->work_threads[i].tap_ctx = tfe_tap_ctx_create(&acceptor_ctx->work_threads[i]); if (acceptor_ctx->work_threads[i].tap_ctx == NULL) goto error_out; acceptor_ctx->work_threads[i].session_table = session_table_create(); acceptor_ctx->work_threads[i].ref_proxy = proxy; acceptor_ctx->work_threads[i].ret_fs_state = acceptor_ctx->packet_io_fs; acceptor_ctx->work_threads[i].logger = packet_io_logger; acceptor_ctx->work_threads[i].session_table_need_reset = 0; if (acceptor_ctx->dup_packet_filter_enable) { acceptor_ctx->work_threads[i].dup_packet_filter = expiry_dablooms_init(acceptor_ctx->dup_packet_filter_capacity, acceptor_ctx->dup_packet_filter_error_rate, timestamp_get_sec(), acceptor_ctx->dup_packet_filter_timeout); } } for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { struct packet_io_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; if (pthread_create(&thread_ctx->tid, NULL, worker_thread_cycle, (void *)thread_ctx) < 0) { goto error_out; } } return acceptor_ctx; error_out: for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { tfe_tap_ctx_destory(acceptor_ctx->work_threads[i].tap_ctx); session_table_destory(acceptor_ctx->work_threads[i].session_table); } acceptor_kni_v4_destroy(acceptor_ctx); return NULL; }