diff options
| author | luwenpeng <[email protected]> | 2024-04-11 16:30:21 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-04-11 16:33:35 +0800 |
| commit | 84541c40e2e0bad5c0ab847724a0dc68266c598f (patch) | |
| tree | 4b1d46432218e0e2dcdf7362b2c2f35c9452920a | |
| parent | cfbad63021242f1471f491f4cfdbc40fe17d52da (diff) | |
Refactor main loop
| -rw-r--r-- | conf/stellar.toml | 4 | ||||
| -rw-r--r-- | deps/dablooms/test/gtest_dablooms.cpp | 16 | ||||
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/config/config.cpp | 108 | ||||
| -rw-r--r-- | src/config/config.h | 20 | ||||
| -rw-r--r-- | src/plugin/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/plugin/plugin_manager.cpp | 64 | ||||
| -rw-r--r-- | src/plugin/plugin_manager.h | 29 | ||||
| -rw-r--r-- | src/stellar/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/stellar/stellar.cpp | 207 | ||||
| -rw-r--r-- | src/stellar/stellar.h | 2 |
11 files changed, 240 insertions, 216 deletions
diff --git a/conf/stellar.toml b/conf/stellar.toml index 9a23b36..5f7bd1f 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -1,6 +1,6 @@ [device] -device_base = 1 # [0, 31] -device_offset = 2 # [0, 127] +base = 1 # [0, 31] +offset = 2 # [0, 127] [packet_io] mode = dumpfile # dumpfile, marsio diff --git a/deps/dablooms/test/gtest_dablooms.cpp b/deps/dablooms/test/gtest_dablooms.cpp index 7dc9468..93ad757 100644 --- a/deps/dablooms/test/gtest_dablooms.cpp +++ b/deps/dablooms/test/gtest_dablooms.cpp @@ -14,20 +14,6 @@ struct packet_idetify unsigned int ip_dst; } __attribute__((packed, aligned(1))); -struct config -{ - int enable; - - unsigned int capacity; - double error_rate; - int expiry_time; -} config = { - .enable = 1, - .capacity = 1000000, - .error_rate = 0.00001, - .expiry_time = 3, -}; - struct packet_idetify idetify = { .tcp_seq = 2172673142, .tcp_ack = 2198097831, @@ -41,7 +27,7 @@ struct packet_idetify idetify = { TEST(DABLOOMS, TEST) { - struct expiry_dablooms_handle *handle = expiry_dablooms_new(config.capacity, config.error_rate, 1, config.expiry_time); + struct expiry_dablooms_handle *handle = expiry_dablooms_new(1000000, 0.00001, 1, 3); EXPECT_TRUE(handle != nullptr); EXPECT_TRUE(expiry_dablooms_search(handle, (const char *)&idetify, sizeof(idetify), 1) != 1); // no exist diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 99a227a..9d55a75 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,5 +10,6 @@ add_subdirectory(tcp_reassembly) add_subdirectory(duplicated_packet_filter) add_subdirectory(evicted_session_filter) add_subdirectory(session) +add_subdirectory(plugin) add_subdirectory(config) add_subdirectory(stellar)
\ No newline at end of file diff --git a/src/config/config.cpp b/src/config/config.cpp index 0440c4a..d86b7d4 100644 --- a/src/config/config.cpp +++ b/src/config/config.cpp @@ -19,21 +19,21 @@ static int parse_device_section(toml_table_t *root, struct device_options *opts) return -1; } - ptr = toml_raw_in(table, "device_base"); + ptr = toml_raw_in(table, "base"); if (ptr == NULL) { - CONFIG_LOG_ERROR("config file missing device->device_base"); + CONFIG_LOG_ERROR("config file missing device->base"); return -1; } - opts->device_base = atoi(ptr); + opts->base = atoi(ptr); - ptr = toml_raw_in(table, "device_offset"); + ptr = toml_raw_in(table, "offset"); if (ptr == NULL) { - CONFIG_LOG_ERROR("config file missing device->device_offset"); + CONFIG_LOG_ERROR("config file missing device->offset"); return -1; } - opts->device_offset = atoi(ptr); + opts->offset = atoi(ptr); return 0; } @@ -403,7 +403,7 @@ static int parse_session_manager_section(toml_table_t *root, struct session_mana // return 0: success // retuun -1: failed -int parse_config_file(const char *file, struct config *config) +int stellar_config_load(const char *file, struct stellar_config *config) { int ret = -1; char errbuf[200]; @@ -425,22 +425,22 @@ int parse_config_file(const char *file, struct config *config) goto error_out; } - if (parse_device_section(table, &config->device_opts) != 0) + if (parse_device_section(table, &config->dev_opts) != 0) { goto error_out; } - if (parse_packet_io_section(table, &config->packet_io_opts) != 0) + if (parse_packet_io_section(table, &config->io_opts) != 0) { goto error_out; } - if (parse_ip_reassembly_section(table, &config->ip_reassembly_opts) != 0) + if (parse_ip_reassembly_section(table, &config->ip_opts) != 0) { goto error_out; } - if (parse_session_manager_section(table, &config->session_manager_opts) != 0) + if (parse_session_manager_section(table, &config->sess_mgr_opts) != 0) { goto error_out; } @@ -461,78 +461,78 @@ error_out: return ret; } -void print_config_options(struct config *config) +void stellar_config_print(struct stellar_config *config) { if (config == NULL) { return; } - struct device_options *device_opts = &config->device_opts; - struct packet_io_options *packet_io_opts = &config->packet_io_opts; - struct ip_reassembly_options *ip_reassembly_opts = &config->ip_reassembly_opts; - struct session_manager_options *session_manager_opts = &config->session_manager_opts; + struct device_options *dev_opts = &config->dev_opts; + struct packet_io_options *io_opts = &config->io_opts; + struct ip_reassembly_options *ip_opts = &config->ip_opts; + struct session_manager_options *sess_mgr_opts = &config->sess_mgr_opts; // device config - CONFIG_LOG_DEBUG("device->device_base : %d", device_opts->device_base); - CONFIG_LOG_DEBUG("device->device_offset : %d", device_opts->device_offset); + CONFIG_LOG_DEBUG("device->base : %d", dev_opts->base); + CONFIG_LOG_DEBUG("device->offset : %d", dev_opts->offset); // packet io config - CONFIG_LOG_DEBUG("packet_io->mode : %s", packet_io_opts->mode == PACKET_IO_DUMPFILE ? "dumpfile" : "marsio"); - if (packet_io_opts->mode == PACKET_IO_DUMPFILE) + CONFIG_LOG_DEBUG("packet_io->mode : %s", io_opts->mode == PACKET_IO_DUMPFILE ? "dumpfile" : "marsio"); + if (io_opts->mode == PACKET_IO_DUMPFILE) { - CONFIG_LOG_DEBUG("packet_io->dumpfile_dir : %s", packet_io_opts->dumpfile_dir); + CONFIG_LOG_DEBUG("packet_io->dumpfile_dir : %s", io_opts->dumpfile_dir); } else { - CONFIG_LOG_DEBUG("packet_io->app_symbol : %s", packet_io_opts->app_symbol); - CONFIG_LOG_DEBUG("packet_io->dev_symbol : %s", packet_io_opts->dev_symbol); + CONFIG_LOG_DEBUG("packet_io->app_symbol : %s", io_opts->app_symbol); + CONFIG_LOG_DEBUG("packet_io->dev_symbol : %s", io_opts->dev_symbol); } - CONFIG_LOG_DEBUG("packet_io->nr_threads : %d", packet_io_opts->nr_threads); - for (uint8_t i = 0; i < packet_io_opts->nr_threads; i++) + CONFIG_LOG_DEBUG("packet_io->nr_threads : %d", io_opts->nr_threads); + for (uint8_t i = 0; i < io_opts->nr_threads; i++) { - CONFIG_LOG_DEBUG("packet_io->cpu_mask[%03d] : %d", i, packet_io_opts->cpu_mask[i]); + CONFIG_LOG_DEBUG("packet_io->cpu_mask[%03d] : %d", i, io_opts->cpu_mask[i]); } // ip reassemble config - CONFIG_LOG_DEBUG("ip_reassembly->enable : %d", ip_reassembly_opts->enable); - CONFIG_LOG_DEBUG("ip_reassembly->timeout : %d", ip_reassembly_opts->timeout); - CONFIG_LOG_DEBUG("ip_reassembly->bucket_entries : %d", ip_reassembly_opts->bucket_entries); - CONFIG_LOG_DEBUG("ip_reassembly->bucket_num : %d", ip_reassembly_opts->bucket_num); + CONFIG_LOG_DEBUG("ip_reassembly->enable : %d", ip_opts->enable); + CONFIG_LOG_DEBUG("ip_reassembly->timeout : %d", ip_opts->timeout); + CONFIG_LOG_DEBUG("ip_reassembly->bucket_entries : %d", ip_opts->bucket_entries); + CONFIG_LOG_DEBUG("ip_reassembly->bucket_num : %d", ip_opts->bucket_num); // session manager config -> max session number - CONFIG_LOG_DEBUG("session_manager->max_tcp_session_num : %ld", session_manager_opts->max_tcp_session_num); - CONFIG_LOG_DEBUG("session_manager->max_udp_session_num : %ld", session_manager_opts->max_udp_session_num); + CONFIG_LOG_DEBUG("session_manager->max_tcp_session_num : %ld", sess_mgr_opts->max_tcp_session_num); + CONFIG_LOG_DEBUG("session_manager->max_udp_session_num : %ld", sess_mgr_opts->max_udp_session_num); // session manager config -> session overload evict - CONFIG_LOG_DEBUG("session_manager->tcp_overload_evict_old_sess : %d", session_manager_opts->tcp_overload_evict_old_sess); - CONFIG_LOG_DEBUG("session_manager->udp_overload_evict_old_sess : %d", session_manager_opts->udp_overload_evict_old_sess); + CONFIG_LOG_DEBUG("session_manager->tcp_overload_evict_old_sess : %d", sess_mgr_opts->tcp_overload_evict_old_sess); + CONFIG_LOG_DEBUG("session_manager->udp_overload_evict_old_sess : %d", sess_mgr_opts->udp_overload_evict_old_sess); // session manager config -> session timeout - CONFIG_LOG_DEBUG("session_manager->tcp_init_timeout : %ld", session_manager_opts->tcp_init_timeout); - CONFIG_LOG_DEBUG("session_manager->tcp_handshake_timeout : %ld", session_manager_opts->tcp_handshake_timeout); - CONFIG_LOG_DEBUG("session_manager->tcp_data_timeout : %ld", session_manager_opts->tcp_data_timeout); - CONFIG_LOG_DEBUG("session_manager->tcp_half_closed_timeout : %ld", session_manager_opts->tcp_half_closed_timeout); - CONFIG_LOG_DEBUG("session_manager->tcp_time_wait_timeout : %ld", session_manager_opts->tcp_time_wait_timeout); - CONFIG_LOG_DEBUG("session_manager->tcp_discard_timeout : %ld", session_manager_opts->tcp_discard_timeout); - CONFIG_LOG_DEBUG("session_manager->tcp_unverified_rst_timeout : %ld", session_manager_opts->tcp_unverified_rst_timeout); - CONFIG_LOG_DEBUG("session_manager->udp_data_timeout : %ld", session_manager_opts->udp_data_timeout); - CONFIG_LOG_DEBUG("session_manager->udp_discard_timeout : %ld", session_manager_opts->udp_discard_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_init_timeout : %ld", sess_mgr_opts->tcp_init_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_handshake_timeout : %ld", sess_mgr_opts->tcp_handshake_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_data_timeout : %ld", sess_mgr_opts->tcp_data_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_half_closed_timeout : %ld", sess_mgr_opts->tcp_half_closed_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_time_wait_timeout : %ld", sess_mgr_opts->tcp_time_wait_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_discard_timeout : %ld", sess_mgr_opts->tcp_discard_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_unverified_rst_timeout : %ld", sess_mgr_opts->tcp_unverified_rst_timeout); + CONFIG_LOG_DEBUG("session_manager->udp_data_timeout : %ld", sess_mgr_opts->udp_data_timeout); + CONFIG_LOG_DEBUG("session_manager->udp_discard_timeout : %ld", sess_mgr_opts->udp_discard_timeout); // session manager config -> duplicated packet filter - CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_enable : %d", session_manager_opts->duplicated_packet_filter_enable); - CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_capacity : %d", session_manager_opts->duplicated_packet_filter_capacity); - CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_timeout : %d", session_manager_opts->duplicated_packet_filter_timeout); - CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_error_rate : %f", session_manager_opts->duplicated_packet_filter_error_rate); + CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_enable : %d", sess_mgr_opts->duplicated_packet_filter_enable); + CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_capacity : %d", sess_mgr_opts->duplicated_packet_filter_capacity); + CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_timeout : %d", sess_mgr_opts->duplicated_packet_filter_timeout); + CONFIG_LOG_DEBUG("session_manager->duplicated_packet_filter_error_rate : %f", sess_mgr_opts->duplicated_packet_filter_error_rate); // session manager config -> evicted session filter - CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_enable : %d", session_manager_opts->evicted_session_filter_enable); - CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_capacity : %d", session_manager_opts->evicted_session_filter_capacity); - CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_timeout : %d", session_manager_opts->evicted_session_filter_timeout); - CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_error_rate : %f", session_manager_opts->evicted_session_filter_error_rate); + CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_enable : %d", sess_mgr_opts->evicted_session_filter_enable); + CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_capacity : %d", sess_mgr_opts->evicted_session_filter_capacity); + CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_timeout : %d", sess_mgr_opts->evicted_session_filter_timeout); + CONFIG_LOG_DEBUG("session_manager->evicted_session_filter_error_rate : %f", sess_mgr_opts->evicted_session_filter_error_rate); // session manager config -> TCP reassembly - CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_enable : %d", session_manager_opts->tcp_reassembly_enable); - CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_timeout : %d", session_manager_opts->tcp_reassembly_max_timeout); - CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_segments : %d", session_manager_opts->tcp_reassembly_max_segments); + CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_enable : %d", sess_mgr_opts->tcp_reassembly_enable); + CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_timeout : %d", sess_mgr_opts->tcp_reassembly_max_timeout); + CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_segments : %d", sess_mgr_opts->tcp_reassembly_max_segments); } diff --git a/src/config/config.h b/src/config/config.h index 2ef90af..05583f3 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -15,22 +15,20 @@ extern "C" struct device_options { - uint8_t device_base; - uint8_t device_offset; + uint8_t base; + uint8_t offset; }; -struct config +struct stellar_config { - struct device_options device_opts; - struct packet_io_options packet_io_opts; - struct ip_reassembly_options ip_reassembly_opts; - struct session_manager_options session_manager_opts; + struct device_options dev_opts; + struct packet_io_options io_opts; + struct ip_reassembly_options ip_opts; + struct session_manager_options sess_mgr_opts; }; -// return 0: success -// retuun -1: failed -int parse_config_file(const char *file, struct config *config); -void print_config_options(struct config *config); +int stellar_config_load(const char *file, struct stellar_config *config); +void stellar_config_print(struct stellar_config *config); #ifdef __cpluscplus } diff --git a/src/plugin/CMakeLists.txt b/src/plugin/CMakeLists.txt new file mode 100644 index 0000000..5b996a7 --- /dev/null +++ b/src/plugin/CMakeLists.txt @@ -0,0 +1,3 @@ +add_library(plugin_manager plugin_manager.cpp) +target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) +target_link_libraries(plugin_manager session_manager)
\ No newline at end of file diff --git a/src/plugin/plugin_manager.cpp b/src/plugin/plugin_manager.cpp new file mode 100644 index 0000000..6049352 --- /dev/null +++ b/src/plugin/plugin_manager.cpp @@ -0,0 +1,64 @@ +#include "plugin_manager.h" +#include "session_private.h" + +struct plugin_manager +{ + // TODO +}; + +void *plugin_manager_new_ctx(struct session *sess) +{ + // TODO + return NULL; +} + +void plugin_manager_free_ctx(void *ctx) +{ + // TODO +} + +struct plugin_manager *plugin_manager_new(void) +{ + // TODO + return NULL; +} + +void plugin_manager_free(struct plugin_manager *mgr) +{ + // TODO +} + +void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt) +{ + // TODO + + PLUGIN_MANAGER_LOG_DEBUG("=> plugin dispatch session: %u %s\n", session_get_id(sess), session_get_tuple_str(sess)); + session_dump(sess); + + if (session_get_type(sess) == SESSION_TYPE_TCP) + { + // TODO Trigger TCP ALL MSG with (mgr, sess, pkt) + + do + { + struct tcp_segment *seg = session_get_tcp_segment(sess); + if (seg == NULL) + { + break; + } + + // TODO Trigger TCP Stream MSG with (mgr, sess, seg->data, seg->len) + + session_free_tcp_segment(sess, seg); + } while (1); + } + else + { + // TODO Trigger UDP MSG + } +} + +void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt) +{ + // TODO +}
\ No newline at end of file diff --git a/src/plugin/plugin_manager.h b/src/plugin/plugin_manager.h new file mode 100644 index 0000000..6e67347 --- /dev/null +++ b/src/plugin/plugin_manager.h @@ -0,0 +1,29 @@ +#ifndef _PLUGIN_MANAGER_H +#define _PLUGIN_MANAGER_H + +#ifdef __cpluscplus +extern "C" +{ +#endif + +#include "session.h" + +#define PLUGIN_MANAGER_LOG_ERROR(format, ...) LOG_ERROR("plugin manager", format, ##__VA_ARGS__) +#define PLUGIN_MANAGER_LOG_DEBUG(format, ...) LOG_DEBUG("plugin manager", format, ##__VA_ARGS__) + +// per session context +void *plugin_manager_new_ctx(struct session *sess); +void plugin_manager_free_ctx(void *ctx); + +struct plugin_manager; +struct plugin_manager *plugin_manager_new(void); +void plugin_manager_free(struct plugin_manager *mgr); + +void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt); +void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt); + +#ifdef __cpluscplus +} +#endif + +#endif diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt index d146b1c..ac7e19c 100644 --- a/src/stellar/CMakeLists.txt +++ b/src/stellar/CMakeLists.txt @@ -1,4 +1,4 @@ add_executable(stellar stellar.cpp) -target_link_libraries(stellar timestamp id_generator session_manager pthread config packet_io) +target_link_libraries(stellar timestamp session_manager plugin_manager pthread config packet_io) install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program)
\ No newline at end of file diff --git a/src/stellar/stellar.cpp b/src/stellar/stellar.cpp index 68e2aa9..4ff43c0 100644 --- a/src/stellar/stellar.cpp +++ b/src/stellar/stellar.cpp @@ -17,6 +17,7 @@ #include "id_generator.h" #include "ip_reassembly.h" #include "session_manager.h" +#include "plugin_manager.h" #define STELLAR_LOG_STATE(format, ...) LOG_STATE("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__) @@ -25,87 +26,31 @@ #define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED) #define ATOMIC_READ(x) __atomic_load_n(x, __ATOMIC_RELAXED) -struct thread_context +struct thread_ctx { pthread_t tid; - uint16_t index; - uint64_t need_exit; + uint16_t idx; uint64_t is_runing; - struct session_manager *sess_mgr; struct ip_reassembly *ip_mgr; + struct session_manager *sess_mgr; }; -struct stellar_context +struct stellar_runtime { uint64_t need_exit; - struct config config; - struct packet_io *packet_io; - struct thread_context threads_ctx[MAX_THREAD_NUM]; + struct plugin_manager *plug_mgr; + struct thread_ctx threads[MAX_THREAD_NUM]; }; +struct stellar_runtime __runtime; +struct stellar_runtime *runtime = &__runtime; -struct stellar_context stellar_context; -struct stellar_context *stellar_ctx = &stellar_context; - -// config -struct device_options *device_opts = &stellar_context.config.device_opts; -struct packet_io_options *packet_io_opts = &stellar_context.config.packet_io_opts; -struct ip_reassembly_options *ip_reassembly_opts = &stellar_context.config.ip_reassembly_opts; -struct session_manager_options *session_manager_opts = &stellar_context.config.session_manager_opts; +struct stellar_config __config; +struct stellar_config *config = &__config; static const char *log_config_file = "./conf/log.toml"; static const char *stellar_config_file = "./conf/stellar.toml"; -static void hex_dump(const char *payload, uint32_t len) -{ - printf("Payload Length: %d\n", len); - for (uint32_t i = 0; i < len; i++) - { - if (i > 0 && i % 16 == 0) - { - printf("\n"); - } - printf("%02x ", (uint8_t)payload[i]); - } - printf("\n"); -} - -void *plugin_manager_new_ctx() -{ - return NULL; -} - -void plugin_manager_free_ctx(void *ctx) -{ - return; -} - -void plugin_manager_dispatch(void *plugin_mgr, struct session *sess, const struct packet *pkt) -{ - if (sess == NULL) - { - return; - } - - printf("=> plugin dispatch session: %p\n", sess); - session_dump(sess); - - if (session_get_type(sess) == SESSION_TYPE_TCP) - { - do - { - struct tcp_segment *seg = session_get_tcp_segment(sess); - if (seg == NULL) - { - break; - } - hex_dump((const char *)seg->data, seg->len); - session_free_tcp_segment(sess, seg); - } while (1); - } - printf("<= plugin dispatch session\n"); -} - /****************************************************************************** * util ******************************************************************************/ @@ -115,19 +60,19 @@ static void signal_handler(int signo) if (signo == SIGINT) { STELLAR_LOG_STATE("SIGINT received, exit !!!"); - ATOMIC_SET(&stellar_ctx->need_exit, 1); + ATOMIC_SET(&runtime->need_exit, 1); } if (signo == SIGQUIT) { STELLAR_LOG_STATE("SIGQUIT received, exit !!!"); - ATOMIC_SET(&stellar_ctx->need_exit, 1); + ATOMIC_SET(&runtime->need_exit, 1); } if (signo == SIGTERM) { STELLAR_LOG_STATE("SIGTERM received, exit !!!"); - ATOMIC_SET(&stellar_ctx->need_exit, 1); + ATOMIC_SET(&runtime->need_exit, 1); } if (signo == SIGHUP) @@ -184,23 +129,23 @@ static inline void thread_set_name(const char *thd_symbol, uint16_t thd_idx) prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); } -static void *main_loop(void *arg) +static void *work_thread(void *arg) { + void *plugin_ctx; + struct packet *pkt; + struct packet packets[RX_BURST_MAX]; struct session *sess; struct session *evicted_sess; struct session *expired_sess; - struct packet *pkt; - struct packet packets[RX_BURST_MAX]; - struct thread_context *threads_ctx = (struct thread_context *)arg; - struct packet_io *packet_io = stellar_ctx->packet_io; - struct session_manager *sess_mgr = threads_ctx->sess_mgr; - struct ip_reassembly *ip_reass = threads_ctx->ip_mgr; - void *plug_mgr = NULL; - void *plug_mgr_ctx = NULL; + struct thread_ctx *thr_ctx = (struct thread_ctx *)arg; + struct ip_reassembly *ip_reass = thr_ctx->ip_mgr; + struct session_manager *sess_mgr = thr_ctx->sess_mgr; + struct packet_io *packet_io = runtime->packet_io; + struct plugin_manager *plug_mgr = runtime->plug_mgr; int nr_recv; uint64_t now = 0; - uint16_t thr_idx = threads_ctx->index; + uint16_t thr_idx = thr_ctx->idx; if (packet_io_init(packet_io, thr_idx) != 0) { @@ -208,11 +153,11 @@ static void *main_loop(void *arg) return NULL; } - ATOMIC_SET(&threads_ctx->is_runing, 1); + ATOMIC_SET(&thr_ctx->is_runing, 1); thread_set_name("stellar", thr_idx); STELLAR_LOG_STATE("worker thread %d runing", thr_idx); - while (ATOMIC_READ(&threads_ctx->need_exit) == 0) + while (ATOMIC_READ(&runtime->need_exit) == 0) { now = timestamp_get_msec(); // TODO nr_recv = packet_io_ingress(packet_io, thr_idx, packets, RX_BURST_MAX); @@ -225,7 +170,7 @@ static void *main_loop(void *arg) { pkt = &packets[i]; - // call plugin_manager_dispatch_raw_pkt(); + plugin_manager_dispatch_packet(plug_mgr, pkt); if (packet_is_fragment(pkt)) { struct packet *defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now); @@ -235,8 +180,8 @@ static void *main_loop(void *arg) } else { - // call plugin_manager_dispatch_defrag_pkt(); pkt = defraged_pkt; + plugin_manager_dispatch_packet(plug_mgr, pkt); } } @@ -248,8 +193,8 @@ static void *main_loop(void *arg) { goto fast_path; } - plug_mgr_ctx = plugin_manager_new_ctx(); - session_set_user_data(sess, plug_mgr_ctx); + plugin_ctx = plugin_manager_new_ctx(sess); + session_set_user_data(sess, plugin_ctx); } else { @@ -258,7 +203,7 @@ static void *main_loop(void *arg) goto fast_path; } } - plugin_manager_dispatch(plug_mgr, sess, pkt); + plugin_manager_dispatch_session(plug_mgr, sess, pkt); fast_path: execute_packet_action(packet_io, sess, pkt, thr_idx); @@ -271,16 +216,16 @@ static void *main_loop(void *arg) evicted_sess = session_manager_get_evicted_session(sess_mgr); if (evicted_sess) { - plug_mgr_ctx = session_get_user_data(evicted_sess); - plugin_manager_free_ctx(plug_mgr_ctx); + plugin_ctx = session_get_user_data(evicted_sess); + plugin_manager_free_ctx(plugin_ctx); session_manager_free_session(sess_mgr, evicted_sess); } } while ((expired_sess = session_manager_get_expired_session(sess_mgr, now))) { - plug_mgr_ctx = session_get_user_data(expired_sess); - plugin_manager_free_ctx(plug_mgr_ctx); + plugin_ctx = session_get_user_data(expired_sess); + plugin_manager_free_ctx(plugin_ctx); session_manager_free_session(sess_mgr, expired_sess); } ip_reassembly_expire(ip_reass, now); @@ -291,31 +236,28 @@ static void *main_loop(void *arg) // packet_io_yield(); } - ATOMIC_SET(&threads_ctx->is_runing, 0); + ATOMIC_SET(&thr_ctx->is_runing, 0); STELLAR_LOG_STATE("worker thread %d stop", thr_idx); return NULL; } -static int thread_context_init(struct stellar_context *ctx, uint8_t nr_threads) +static int stellar_thread_init(struct stellar_runtime *ctx, uint8_t nr_threads) { uint64_t now = timestamp_get_msec(); for (uint8_t i = 0; i < nr_threads; i++) { - struct thread_context *threads_ctx = &ctx->threads_ctx[i]; - threads_ctx->index = i; - threads_ctx->need_exit = 0; - threads_ctx->is_runing = 0; - - threads_ctx->sess_mgr = session_manager_new(session_manager_opts, now); - if (threads_ctx->sess_mgr == NULL) + struct thread_ctx *thr_ctx = &ctx->threads[i]; + thr_ctx->idx = i; + thr_ctx->is_runing = 0; + thr_ctx->sess_mgr = session_manager_new(&config->sess_mgr_opts, now); + if (thr_ctx->sess_mgr == NULL) { STELLAR_LOG_ERROR("unable to create session manager"); return -1; } - - threads_ctx->ip_mgr = ip_reassembly_new(ip_reassembly_opts); - if (threads_ctx->ip_mgr == NULL) + thr_ctx->ip_mgr = ip_reassembly_new(&config->ip_opts); + if (thr_ctx->ip_mgr == NULL) { STELLAR_LOG_ERROR("unable to create ip reassemble manager"); return -1; @@ -325,26 +267,26 @@ static int thread_context_init(struct stellar_context *ctx, uint8_t nr_threads) return 0; } -static void thread_context_free(struct stellar_context *ctx, uint8_t nr_threads) +static void stellar_thread_clean(struct stellar_runtime *ctx, uint8_t nr_threads) { for (uint8_t i = 0; i < nr_threads; i++) { - struct thread_context *threads_ctx = &ctx->threads_ctx[i]; - if (ATOMIC_READ(&threads_ctx->is_runing) == 0) + struct thread_ctx *thr_ctx = &ctx->threads[i]; + if (ATOMIC_READ(&thr_ctx->is_runing) == 0) { STELLAR_LOG_STATE("wait worker thread %d free context", i); - session_manager_free(threads_ctx->sess_mgr); - ip_reassembly_free(threads_ctx->ip_mgr); + session_manager_free(thr_ctx->sess_mgr); + ip_reassembly_free(thr_ctx->ip_mgr); } } } -static int thread_new(struct thread_context threads_ctx[], uint8_t nr_threads) +static int stellar_thread_run(struct stellar_runtime *ctx, uint8_t nr_threads) { for (uint8_t i = 0; i < nr_threads; i++) { - struct thread_context *ctx = &threads_ctx[i]; - if (pthread_create(&ctx->tid, NULL, main_loop, (void *)ctx) < 0) + struct thread_ctx *thr_ctx = &ctx->threads[i]; + if (pthread_create(&thr_ctx->tid, NULL, work_thread, (void *)thr_ctx) < 0) { STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); return -1; @@ -354,13 +296,12 @@ static int thread_new(struct thread_context threads_ctx[], uint8_t nr_threads) return 0; } -static void thread_free(struct thread_context threads_ctx[], uint8_t nr_threads) +static void stellar_thread_join(struct stellar_runtime *ctx, uint8_t nr_threads) { for (uint8_t i = 0; i < nr_threads; i++) { - struct thread_context *ctx = &threads_ctx[i]; - ATOMIC_SET(&ctx->need_exit, 1); - while (ATOMIC_READ(&ctx->is_runing) == 1) + struct thread_ctx *thr_ctx = &ctx->threads[i]; + while (ATOMIC_READ(&thr_ctx->is_runing) == 1) { STELLAR_LOG_STATE("wait worker thread %d stop", i); sleep(1); @@ -374,7 +315,7 @@ static void thread_free(struct thread_context threads_ctx[], uint8_t nr_threads) int main(int argc, char **argv) { - memset(stellar_ctx, 0, sizeof(struct stellar_context)); + memset(runtime, 0, sizeof(struct stellar_runtime)); timestamp_update(); signal(SIGINT, signal_handler); @@ -387,57 +328,59 @@ int main(int argc, char **argv) STELLAR_LOG_ERROR("unable to init log"); return -1; } - STELLAR_LOG_STATE("Start Stellar (version: %s)\n %s", __stellar_version, logo_str); - if (parse_config_file(stellar_config_file, &stellar_ctx->config) != 0) + if (stellar_config_load(stellar_config_file, config) != 0) { STELLAR_LOG_ERROR("unable to load config file"); return -1; } + stellar_config_print(config); - print_config_options(&stellar_ctx->config); - - if (id_generator_init(device_opts->device_base, device_opts->device_offset) != 0) + if (id_generator_init(config->dev_opts.base, config->dev_opts.offset) != 0) { STELLAR_LOG_ERROR("unable to init id generator"); return -1; } - // TODO load plugin + runtime->plug_mgr = plugin_manager_new(); + if (runtime->plug_mgr == NULL) + { + STELLAR_LOG_ERROR("unable to create plugin manager"); + return -1; + } - uint8_t nr_threads = packet_io_opts->nr_threads; - stellar_ctx->packet_io = packet_io_new(packet_io_opts); - if (stellar_ctx->packet_io == NULL) + uint8_t nr_threads = config->io_opts.nr_threads; + runtime->packet_io = packet_io_new(&config->io_opts); + if (runtime->packet_io == NULL) { STELLAR_LOG_ERROR("unable to create packet io"); goto error_out; } - if (thread_context_init(stellar_ctx, nr_threads) != 0) + if (stellar_thread_init(runtime, nr_threads) != 0) { STELLAR_LOG_ERROR("unable to init thread context"); goto error_out; } - if (thread_new(stellar_ctx->threads_ctx, nr_threads) != 0) + if (stellar_thread_run(runtime, nr_threads) != 0) { STELLAR_LOG_ERROR("unable to create worker thread"); goto error_out; } - while (!ATOMIC_READ(&stellar_ctx->need_exit)) + while (!ATOMIC_READ(&runtime->need_exit)) { timestamp_update(); sleep(1); } error_out: - thread_free(stellar_ctx->threads_ctx, nr_threads); - thread_context_free(stellar_ctx, nr_threads); - packet_io_free(stellar_ctx->packet_io); - - // TODO free plugin + stellar_thread_join(runtime, nr_threads); + stellar_thread_clean(runtime, nr_threads); + packet_io_free(runtime->packet_io); + plugin_manager_free(runtime->plug_mgr); log_free(); diff --git a/src/stellar/stellar.h b/src/stellar/stellar.h index 12af061..204f038 100644 --- a/src/stellar/stellar.h +++ b/src/stellar/stellar.h @@ -9,7 +9,7 @@ extern "C" #include <stdint.h> #define MAX_THREAD_NUM 256 -#define RX_BURST_MAX 64 +#define RX_BURST_MAX 32 #define ATOMIC_INC(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED) #define ATOMIC_DEC(x) __atomic_fetch_sub(x, 1, __ATOMIC_RELAXED) |
