diff options
| author | lijia <[email protected]> | 2024-10-18 16:47:51 +0800 |
|---|---|---|
| committer | lijia <[email protected]> | 2024-11-07 18:30:58 +0800 |
| commit | e734af76d81b07090c618b1c4af3b2fdd6b592f3 (patch) | |
| tree | c9b894fb0eaa9c56bd5f04bfab5628a97592091d /infra | |
| parent | 99a68d5c9efe500ab339165a2af515a0a7355ada (diff) | |
rebase onto develop-2.0
Diffstat (limited to 'infra')
23 files changed, 3629 insertions, 3 deletions
diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt index e2e45ee..fc0ad21 100644 --- a/infra/CMakeLists.txt +++ b/infra/CMakeLists.txt @@ -1,5 +1,5 @@ -set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager) -set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml) +set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor) +set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml ringbuf) set(DECODERS lpi_plus) set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS}) set(LIBS fieldstat4) diff --git a/infra/monitor/CMakeLists.txt b/infra/monitor/CMakeLists.txt new file mode 100644 index 0000000..0588f8c --- /dev/null +++ b/infra/monitor/CMakeLists.txt @@ -0,0 +1,22 @@ +# add_subdirectory(enforcer) + +add_library(monitor + monitor_cmd_assistant.c + monitor_transaction.c + monitor_server.c + monitor_utils.c + monitor_stat.c + monitor_spinlock.c + monitor_ringbuf.c + monitor_rpc.c +) + +include_directories(${CMAKE_SOURCE_DIR}/include/) +include_directories(${CMAKE_SOURCE_DIR}/deps) +include_directories(${CMAKE_SOURCE_DIR}/infra) +target_include_directories(monitor PUBLIC ${CMAKE_CURRENT_LIST_DIR}) + +set_target_properties(monitor PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/infra/monitor/version.map") +set_target_properties(monitor PROPERTIES PREFIX "") +target_link_libraries(monitor toml sds linenoise tuple session_manager libevent-static libevent-static cjson-static ringbuf) +target_link_options(monitor PRIVATE -rdynamic)
\ No newline at end of file diff --git a/infra/monitor/enforcer/CMakeLists.txt b/infra/monitor/enforcer/CMakeLists.txt new file mode 100644 index 0000000..2691059 --- /dev/null +++ b/infra/monitor/enforcer/CMakeLists.txt @@ -0,0 +1,11 @@ +add_library(monitor_enforcer STATIC show_session_enforcer.c ) +include_directories(${CMAKE_SOURCE_DIR}/include/) +include_directories(${CMAKE_SOURCE_DIR}/deps) +include_directories(${CMAKE_SOURCE_DIR}/deps/uthash) +include_directories(${CMAKE_SOURCE_DIR}/deps/timeout) +include_directories(${CMAKE_SOURCE_DIR}/infra) +include_directories(${CMAKE_SOURCE_DIR}/infra/tuple/) +include_directories(${CMAKE_SOURCE_DIR}/infra/packet_manager) +include_directories(${CMAKE_SOURCE_DIR}/infra/tcp_reassembly) + +target_include_directories(monitor_enforcer PUBLIC ${CMAKE_CURRENT_LIST_DIR})
\ No newline at end of file diff --git a/infra/monitor/enforcer/show_session_enforcer.c b/infra/monitor/enforcer/show_session_enforcer.c new file mode 100644 index 0000000..f72218a --- /dev/null +++ b/infra/monitor/enforcer/show_session_enforcer.c @@ -0,0 +1,818 @@ +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <stdint.h> +#include <stddef.h> +#include <assert.h> +#include "stellar/session.h" +#include "stellar/monitor.h" +#include "session_manager/session_manager_rte.h" +#include "monitor/monitor_utils.h" +#include "monitor/monitor_rpc.h" +#include "sds/sds.h" +#include "session_manager/session_internal.h" + +// temp add +extern struct session_manager_rte *session_manager_get_runtime(struct session_manager *sess_mgr, uint16_t thread_id); + +#define SHOW_SESSION_BRIEF_LIMIT_DEFAULT 10 +#define SHOW_SESSION_BRIEF_LIMIT_MAX 1000 +#define SHOW_SESSION_BRIEF_SCAN_COUNT_DEFAULT 100 + +/* show session brief */ +struct stm_show_session_brief_opt +{ +#define SESSION_SCAN_SNET (1 << 25) +#define SESSION_SCAN_DNET (1 << 26) +#define SESSION_SCAN_ID (1 << 27) +#define SESSION_SCAN_CURSOR (1 << 28) +#define SESSION_SCAN_COUNT (1 << 29) +#define SESSION_SCAN_LIMIT (1 << 30) + struct session_scan_opts scan_opt; + uint32_t limit; +}; + +/* show session detail <id> */ +struct stm_show_session_detail_opt +{ + uint64_t sess_id; + int thread_idx; // todo, not used now +}; + +struct show_session_brief_result +{ + uint64_t sid; + int thread_index; + enum session_state state; + enum session_type protocol; + time_t create_time_in_sec; + time_t last_pkt_time_in_sec; + uint8_t flow_dir; + struct tuple6 addr; +} __attribute__((packed)); + +struct show_session_brief_array +{ + size_t array_num; + struct show_session_brief_result array[0]; /* Continuous memory, array_num * sizeof(struct show_session_brief_result) */ +} __attribute__((packed)); + +struct show_session_detail_result +{ + struct show_session_brief_result sess_brief; + unsigned long long sess_stat[MAX_FLOW_TYPE][MAX_STAT]; + enum session_direction direction; // in or out + unsigned char application; // L7 appid + // todo, other info +} __attribute__((packed)); + +static void stm_session_brief_cli_args_set_default(struct stm_show_session_brief_opt *show_opt) +{ + memset(show_opt, 0, sizeof(struct stm_show_session_brief_opt)); + show_opt->limit = SHOW_SESSION_BRIEF_LIMIT_DEFAULT; + show_opt->scan_opt.cursor = 0; + show_opt->scan_opt.count = SHOW_SESSION_BRIEF_SCAN_COUNT_DEFAULT; +} + +static uint32_t stm_session_brief_cli_args_get_flags(const char *para_name) +{ + if (NULL == para_name) + { + return 0; + } + + if (strncasecmp(para_name, "id", 2) == 0) + { + return SESSION_SCAN_ID; + } + else if (strncasecmp(para_name, "cursor", 6) == 0) + { + return SESSION_SCAN_CURSOR; + } + else if (strncasecmp(para_name, "count", 5) == 0) + { + return SESSION_SCAN_COUNT; + } + else if (strncasecmp(para_name, "state", 5) == 0) + { + return SESSION_SCAN_STATE; + } + else if (strncasecmp(para_name, "sip", 3) == 0) + { + return SESSION_SCAN_SIP; + } + else if (strncasecmp(para_name, "dip", 3) == 0) + { + return SESSION_SCAN_DIP; + } + else if (strncasecmp(para_name, "snet", 4) == 0) + { + return SESSION_SCAN_SNET; + } + else if (strncasecmp(para_name, "dnet", 4) == 0) + { + return SESSION_SCAN_DNET; + } + else if (strncasecmp(para_name, "sport", 5) == 0) + { + return SESSION_SCAN_SPORT; + } + else if (strncasecmp(para_name, "dport", 5) == 0) + { + return SESSION_SCAN_DPORT; + } + else if (strncasecmp(para_name, "protocol", 8) == 0) + { + return SESSION_SCAN_TYPE; + } + else if (strncasecmp(para_name, "limit", 5) == 0) + { + return SESSION_SCAN_LIMIT; + } + else if (strncasecmp(para_name, "create-time-in", strlen("create-time-in")) == 0) + { + return SESSION_SCAN_CREATE_TIME; + } + else if (strncasecmp(para_name, "last-pkt-time-in", strlen("last-pkt-time-in")) == 0) + { + return SESSION_SCAN_LASPKT_TIME; + } + return 0; +} + +static enum session_state show_session_state_pton(const char *state_str) +{ + if (strncasecmp(state_str, "opening", strlen("opening")) == 0) + { + return SESSION_STATE_OPENING; + } + else if (strncasecmp(state_str, "active", strlen("active")) == 0) + { + return SESSION_STATE_ACTIVE; + } + else if (strncasecmp(state_str, "closing", strlen("closing")) == 0) + { + return SESSION_STATE_CLOSING; + } + + return MAX_STATE; +} + +static uint32_t show_session_ipaddr_ntop(const char *ip_string, struct session_scan_opts *scan_opt, uint32_t flag) +{ + uint32_t addr_family; + if (SESSION_SCAN_SIP == flag) + { + addr_family = stm_inet_pton(ip_string, &scan_opt->src_addr[0].v4, &scan_opt->src_addr[0].v6); + } + else + { + addr_family = stm_inet_pton(ip_string, &scan_opt->dst_addr[0].v4, &scan_opt->dst_addr[0].v6); + } + + if (addr_family == 0) + { + return 0; + } + if (AF_INET == addr_family) + { + if (SESSION_SCAN_SIP == flag) + { + scan_opt->src_addr[1].v4 = scan_opt->src_addr[0].v4; + } + else + { + scan_opt->dst_addr[1].v4 = scan_opt->dst_addr[0].v4; + } + } + else + { + if (SESSION_SCAN_SIP == flag) + { + scan_opt->src_addr[1].v6 = scan_opt->src_addr[0].v6; + } + else + { + scan_opt->dst_addr[1].v6 = scan_opt->dst_addr[0].v6; + } + } + return addr_family; +} + +static sds show_session_cli_args_sanity_check(const struct stm_show_session_brief_opt *brief_opt) +{ + uint32_t flags = brief_opt->scan_opt.flags; + sds ss = sdsempty(); + + if ((flags & SESSION_SCAN_SIP) && (flags & SESSION_SCAN_SNET)) + { + ss = sdscatprintf(ss, "error: the 'sip' and 'snet' options conflict!"); + return ss; + } + if ((flags & SESSION_SCAN_DIP) && (flags & SESSION_SCAN_DNET)) + { + ss = sdscatprintf(ss, "error: the 'dip' and 'dnet' options conflict!"); + return ss; + } + return ss; +} + +static int show_session_is_help(int argc, char *argv[]) +{ + for (int i = 0; i < argc; i++) + { + if (strncasecmp(argv[i], "help", 4) == 0) + { + return 1; + } + if (strncasecmp(argv[i], "--help", 6) == 0) + { + return 1; + } + if (strncasecmp(argv[i], "-h", 2) == 0) + { + return 1; + } + } + return 0; +} + +static struct monitor_reply *show_session_brief_usage(void) +{ + sds ss = sdsempty(); + ss = sdscatprintf(ss, "Usage: show session brief [options]\n"); + ss = sdscatprintf(ss, "Options:\n"); + ss = sdscatprintf(ss, " -h, --help help\tdisplay usage information and exit\n"); + ss = sdscatprintf(ss, " cursor <cursor>\n"); + ss = sdscatprintf(ss, " count <count>\n"); + ss = sdscatprintf(ss, " state <opening|active|closing>\n"); + ss = sdscatprintf(ss, " protocol <tcp|udp>\n"); + ss = sdscatprintf(ss, " sip <source ip>\n"); + ss = sdscatprintf(ss, " dip <destination ip>\n"); + ss = sdscatprintf(ss, " sport <source port>\n"); + ss = sdscatprintf(ss, " dport <destination port>\n"); + ss = sdscatprintf(ss, " snet <source network/netmask>\texample 192.168.1.0/24\n"); + ss = sdscatprintf(ss, " dnet <destination network/netmask>\texample 1234::abcd/48\n"); + ss = sdscatprintf(ss, " create-time-in <last-N-[seconds|minutes|hours|days]>\texample last-1-hours\n"); + ss = sdscatprintf(ss, " last-pkt-time-in <last-N-[seconds|minutes|hours|days]>\texample last-7-days\n"); + ss = sdscatprintf(ss, " limit <limit>\n"); + + struct monitor_reply *reply = monitor_reply_new_string("%s", ss); + sdsfree(ss); + return reply; +} + +static struct monitor_reply *show_session_detail_usage(void) +{ + sds ss = sdsempty(); + ss = sdscatprintf(ss, "Usage: show session detial [options]\n"); + ss = sdscatprintf(ss, "Options:\n"); + ss = sdscatprintf(ss, " -h, --help help\tdisplay usage information and exit\n"); + ss = sdscatprintf(ss, " id <session id>\n"); + struct monitor_reply *reply = monitor_reply_new_string("%s", ss); + sdsfree(ss); + return reply; +} + +/* + "show session ..." command args parser + return: + NULL: success, brief_opt is filled + not NULL: error message + */ +static struct monitor_reply *show_session_brief_cli_args_parse(int argc, char *argv[], struct stm_show_session_brief_opt *brief_opt) +{ + uint32_t ipaddr_family = 0, history_ipaddr_family = 0; + sds ss = NULL; + uint32_t flag; + struct monitor_reply *error_reply = NULL; + const char *opt_name, *opt_value; + + stm_session_brief_cli_args_set_default(brief_opt); + int i = 3; // skip "show session brief" + while (i < argc) + { + ipaddr_family = 0; + opt_name = argv[i]; + flag = stm_session_brief_cli_args_get_flags(opt_name); + if (i + 1 >= argc) + { + error_reply = monitor_reply_new_error("option %s requires an argument\n", opt_name); + return error_reply; + } + opt_value = argv[++i]; + + switch (flag) + { + case SESSION_SCAN_TYPE: + { + if (strncasecmp(opt_value, "tcp", 3) == 0) + { + brief_opt->scan_opt.type = SESSION_TYPE_TCP; + } + else if (strncasecmp(opt_value, "udp", 3) == 0) + { + brief_opt->scan_opt.type = SESSION_TYPE_UDP; + } + else + { + error_reply = monitor_reply_new_error("unsupported protocol type: %s. should be <tcp|udp>\n", opt_value); + goto error_exit; + } + } + break; + case SESSION_SCAN_STATE: + { + enum session_state tmp_state = show_session_state_pton(opt_value); + if (tmp_state == MAX_STATE) + { + error_reply = monitor_reply_new_error("unrecognized session state: %s. should be <opening|active|closing>\n", opt_value); + goto error_exit; + } + brief_opt->scan_opt.state = tmp_state; + } + break; + // case SESSION_SCAN_ID: + // if (stm_string_isdigit(opt_value) == 0) + // { + // *error_reply = monitor_reply_new_error("invalid session id: %s. should be integer in range: <1 - UINT64_MAX>\n", opt_value); + // goto error_exit; + // } + // show_opt->detail_opt.sess_id = strtoull(opt_value, NULL, 10); + // break; + case SESSION_SCAN_SIP: + ipaddr_family = show_session_ipaddr_ntop(opt_value, &brief_opt->scan_opt, SESSION_SCAN_SIP); + if (ipaddr_family == 0) + { + error_reply = monitor_reply_new_error("invalid sip address: %s\n", opt_value); + goto error_exit; + } + brief_opt->scan_opt.addr_family = ipaddr_family; + break; + case SESSION_SCAN_DIP: + ipaddr_family = show_session_ipaddr_ntop(opt_value, &brief_opt->scan_opt, SESSION_SCAN_DIP); + if (ipaddr_family == 0) + { + error_reply = monitor_reply_new_error("invalid dip address: %s\n", opt_value); + goto error_exit; + } + brief_opt->scan_opt.addr_family = ipaddr_family; + break; + + case SESSION_SCAN_SNET: + { + uint32_t ipv4addr, ipv4mask; + struct in6_addr ipv6addr, ipv6mask; + ipaddr_family = stm_ip_cidr_pton(opt_value, &ipv4addr, &ipv4mask, &ipv6addr, &ipv6mask); + if (ipaddr_family == 0) + { + error_reply = monitor_reply_new_error("invalid snet CIDR address: %s\n", opt_value); + goto error_exit; + } + if (AF_INET == ipaddr_family) + { + uint32_t ipv4_range[2]; + stm_ipv4_cidr_to_range(ipv4addr, ipv4mask, ipv4_range); + brief_opt->scan_opt.src_addr[0].v4.s_addr = ipv4_range[0]; + brief_opt->scan_opt.src_addr[1].v4.s_addr = ipv4_range[1]; + } + else + { + struct in6_addr ipv6_range[2]; + stm_ipv6_cidr_to_range(&ipv6addr, &ipv6mask, ipv6_range); + brief_opt->scan_opt.src_addr[0].v6 = ipv6_range[0]; + brief_opt->scan_opt.src_addr[1].v6 = ipv6_range[1]; + } + brief_opt->scan_opt.addr_family = ipaddr_family; + flag = SESSION_SCAN_SIP; + } + break; + case SESSION_SCAN_DNET: + { + uint32_t ipv4addr, ipv4mask; + struct in6_addr ipv6addr, ipv6mask; + ipaddr_family = stm_ip_cidr_pton(opt_value, &ipv4addr, &ipv4mask, &ipv6addr, &ipv6mask); + if (ipaddr_family == 0) + { + error_reply = monitor_reply_new_error("invalid dnet CIDR address: %s\n", opt_value); + goto error_exit; + } + if (AF_INET == ipaddr_family) + { + uint32_t ipv4_range[2]; + stm_ipv4_cidr_to_range(ipv4addr, ipv4mask, ipv4_range); + brief_opt->scan_opt.dst_addr[0].v4.s_addr = ipv4_range[0]; + brief_opt->scan_opt.dst_addr[1].v4.s_addr = ipv4_range[1]; + } + else + { + struct in6_addr ipv6_range[2]; + stm_ipv6_cidr_to_range(&ipv6addr, &ipv6mask, ipv6_range); + brief_opt->scan_opt.dst_addr[0].v6 = ipv6_range[0]; + brief_opt->scan_opt.dst_addr[1].v6 = ipv6_range[1]; + } + brief_opt->scan_opt.addr_family = ipaddr_family; + flag = SESSION_SCAN_DIP; + } + break; + case SESSION_SCAN_SPORT: + { + if (stm_string_isdigit(opt_value) == 0) + { + error_reply = monitor_reply_new_error("illegal sport: %s. should be integer\n", opt_value); + goto error_exit; + } + int tmp_val = atoi(opt_value); + if (tmp_val <= 0 || tmp_val > 65535) + { + error_reply = monitor_reply_new_error("illegal sport: %s. should be <1-65535>\n", opt_value); + goto error_exit; + } + brief_opt->scan_opt.src_port = htons((unsigned short)tmp_val); + } + break; + case SESSION_SCAN_DPORT: + { + if (stm_string_isdigit(opt_value) == 0) + { + error_reply = monitor_reply_new_error("illegal sport: %s. should be integer\n", opt_value); + goto error_exit; + } + int tmp_val = atoi(opt_value); + if (tmp_val <= 0 || tmp_val > 65535) + { + error_reply = monitor_reply_new_error("illegal sport: %s. should be <1-65535>\n", opt_value); + goto error_exit; + } + brief_opt->scan_opt.dst_port = htons((unsigned short)tmp_val); + } + break; + + case SESSION_SCAN_CURSOR: + brief_opt->scan_opt.cursor = strtoul(opt_value, NULL, 10); + break; + case SESSION_SCAN_COUNT: + brief_opt->scan_opt.count = strtoul(opt_value, NULL, 10); + if (brief_opt->scan_opt.count == 0) + { + error_reply = monitor_reply_new_error("illegal count: %s. should be <1 - UINT32_MAX>\n", opt_value); + goto error_exit; + } + break; + case SESSION_SCAN_LIMIT: + brief_opt->limit = strtoul(opt_value, NULL, 10); + if (brief_opt->limit == 0 || brief_opt->limit > SHOW_SESSION_BRIEF_LIMIT_MAX) + { + error_reply = monitor_reply_new_error("illegal limit: %s. should be <1 - %u>\n", opt_value, SHOW_SESSION_BRIEF_LIMIT_MAX); + goto error_exit; + } + break; + case SESSION_SCAN_CREATE_TIME: + { + time_t tmp_range[2] = {0, 0}; + if (stm_time_range_pton(opt_value, time(NULL), tmp_range) < 0) + { + error_reply = monitor_reply_new_error("invalid create-time-in: %s. \r\nsyntax: <last-N-[seconds|minutes|hours|days]>\n", opt_value); + goto error_exit; + } + brief_opt->scan_opt.create_time_ms[0] = tmp_range[0] * 1000; // second to ms + brief_opt->scan_opt.create_time_ms[1] = tmp_range[1] * 1000; // second to ms + } + break; + case SESSION_SCAN_LASPKT_TIME: + { + time_t tmp_range[2] = {0, 0}; + if (stm_time_range_pton(opt_value, time(NULL), tmp_range) < 0) + { + error_reply = monitor_reply_new_error("invalid last-pkt-time-in: %s. \r\nsyntax: <last-N-[seconds|minutes|hours|days]>\n", opt_value); + goto error_exit; + } + brief_opt->scan_opt.laspkt_time_ms[0] = tmp_range[0] * 1000; // second to ms + brief_opt->scan_opt.laspkt_time_ms[1] = tmp_range[1] * 1000; // second to ms + } + break; + default: + error_reply = monitor_reply_new_error("unrecognized params: %s \n", opt_name); + return error_reply; + } + + if ((history_ipaddr_family != 0) && (ipaddr_family != 0) && (history_ipaddr_family != ipaddr_family)) + { + error_reply = monitor_reply_new_error("contradictory ip version, expression rejects all sessions!\n"); + goto error_exit; + } + history_ipaddr_family = ipaddr_family; + i++; // to next option + brief_opt->scan_opt.flags |= flag; + } + ss = show_session_cli_args_sanity_check(brief_opt); + if (ss && sdslen(ss) > 0) + { + error_reply = monitor_reply_new_error("%s\n", ss); + sdsfree(ss); + goto error_exit; + } + sdsfree(ss); + error_reply = NULL; + return NULL; + +error_exit: + return error_reply; +} + +static void get_single_session_brief(struct session *sess, int thread_id, struct show_session_brief_result *brief) +{ + brief->thread_index = thread_id; + brief->state = session_get_current_state(sess); + brief->protocol = session_get_type(sess); + session_is_symmetric(sess, &brief->flow_dir); + brief->create_time_in_sec = session_get_timestamp(sess, SESSION_TIMESTAMP_START) / 1000; // ms to sec + brief->last_pkt_time_in_sec = session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) / 1000; // ms to sec + const struct tuple6 *tp6 = session_get_tuple6(sess); + memcpy(&brief->addr, tp6, sizeof(struct tuple6)); +} + +struct iovec show_session_brief_on_worker_thread_rpc_cb(int thread_idx, struct iovec request, void *args) +{ + struct iovec response = {}; + assert(request.iov_len == sizeof(struct stm_show_session_brief_opt)); + struct stm_show_session_brief_opt *show_brief_opt = (struct stm_show_session_brief_opt *)request.iov_base; + + uint64_t session_id_array[SHOW_SESSION_BRIEF_LIMIT_MAX]; + uint64_t session_id_array_count = SHOW_SESSION_BRIEF_LIMIT_MAX; + struct session_manager *sess_mgr = stellar_module_get_session_manager((struct stellar_module_manager *)args); + struct session_manager_rte *sess_mgr_rt = session_manager_get_runtime(sess_mgr, thread_idx); + session_id_array_count = session_manager_rte_scan_session(sess_mgr_rt, &show_brief_opt->scan_opt, session_id_array, show_brief_opt->limit); + if (session_id_array_count == 0) + { + // no session match the filter params, but no error! still need to build a empty reply! + // go on !!! + response.iov_base = NULL; + response.iov_len = 0; + return response; + } + + struct show_session_brief_result *brief_result_array = (struct show_session_brief_result *)calloc(session_id_array_count, sizeof(struct show_session_brief_result)); + + for (uint32_t i = 0; i < session_id_array_count; i++) + { + struct session *sess = session_manager_rte_lookup_session_by_id(sess_mgr_rt, session_id_array[i]); + assert(sess != NULL); + get_single_session_brief(sess, thread_idx, &brief_result_array[i]); + brief_result_array[i].sid = session_id_array[i]; + } + response.iov_base = brief_result_array; + response.iov_len = session_id_array_count; + return response; +} + +static sds session_brief_to_readable(const struct show_session_brief_result *sess_brief) +{ + char time_buf[64]; + char addr_buf[256]; + sds ss = sdsempty(); + strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_brief->create_time_in_sec)); + ss = sdscatprintf(ss, "%-6d %-21lu %-5s %-8s %-20s %s", sess_brief->thread_index, sess_brief->sid, + (sess_brief->protocol == SESSION_TYPE_TCP) ? "tcp" : "udp", + stm_session_state_ntop(sess_brief->state), time_buf, + stm_get0_readable_session_addr(&sess_brief->addr, addr_buf, sizeof(addr_buf))); + return ss; +} + +static struct monitor_reply *show_session_brief_cmd_cb(struct stellar_monitor *monitor, int argc, char *argv[], void *arg) +{ + struct stm_show_session_brief_opt brief_opt = {}; + if (show_session_is_help(argc, argv)) + { + return show_session_brief_usage(); + } + struct monitor_reply *error_reply = show_session_brief_cli_args_parse(argc, argv, &brief_opt); + if (error_reply != NULL) + { + return error_reply; + } + struct monitor_reply *cmd_reply; + struct stellar_module_manager *mod_mgr = (struct stellar_module_manager *)arg; + int thread_num = stellar_module_manager_get_max_thread_num(mod_mgr); + struct iovec request; + request.iov_base = (void *)&brief_opt; + request.iov_len = sizeof(struct stm_show_session_brief_opt); + + struct iovec response[thread_num]; + memset(response, 0, sizeof(response)); + size_t tot_sess_id_num = 0; + sds ss = sdsempty(); + ss = sdscatprintf(ss, "%-6s %-21s %-5s %-8s %-20s %s\r\n", "thread", "session-id", "proto", "state", "create-time", "tuple4(sip:sport-dip:dport)"); + ss = sdscatprintf(ss, "--------------------------------------------------------------------------------------------\r\n"); + + for (int i = 0; i < thread_num; i++) + { + response[i] = monitor_worker_thread_rpc(monitor, i, request, show_session_brief_on_worker_thread_rpc_cb, mod_mgr); + if (response[i].iov_base == NULL || response[i].iov_len == 0) // empty result + { + continue; + } + struct show_session_brief_result *brief_res_array = (struct show_session_brief_result *)(response[i].iov_base); + tot_sess_id_num += response[i].iov_len; // session_id_array_count + for (size_t j = 0; j < response[i].iov_len; j++) + { + ss = sdscatprintf(ss, "%s\n", session_brief_to_readable(&brief_res_array[j])); + } + if (tot_sess_id_num >= brief_opt.limit) + { + break; + } + } + + if (tot_sess_id_num == 0) + { + cmd_reply = monitor_reply_new_string("No session found"); + goto empty_result; + } + cmd_reply = monitor_reply_new_string("%s", ss); + +empty_result: + sdsfree(ss); + for (int i = 0; i < thread_num; i++) + { + if (response[i].iov_base) + { + free(response[i].iov_base); + } + } + return cmd_reply; +} + +/* + todo: add thread id, + fast patch, avoid traversing all worker threads + */ +static struct monitor_reply *show_session_detail_cli_args_parse(int argc, char *argv[], struct stm_show_session_detail_opt *detail_opt) +{ + if (argc < 4) + { + return monitor_reply_new_error("missing session id\n"); + } + if (argc > 5) + { + return monitor_reply_new_error("too many arguments\n"); + } + if (strncasecmp(argv[3], "id", 2) != 0) + { + return monitor_reply_new_error("missing session id\n"); + } + + if (stm_string_isdigit(argv[4]) == 0) + { + return monitor_reply_new_error("invalid session id: %s. should be integer in range: <1 - UINT64_MAX>\n", argv[4]); + } + detail_opt->sess_id = strtoull(argv[4], NULL, 10); + return NULL; +} + +struct iovec show_session_detail_on_worker_thread_rpc_cb(int thread_idx, struct iovec request, void *args) +{ + struct iovec response = {}; + assert(request.iov_len == sizeof(struct stm_show_session_detail_opt)); + struct stm_show_session_detail_opt *detail_opt = (struct stm_show_session_detail_opt *)request.iov_base; + + struct session_manager *sess_mgr = stellar_module_get_session_manager((struct stellar_module_manager *)args); + struct session_manager_rte *sess_mgr_rt = session_manager_get_runtime(sess_mgr, thread_idx); + + struct session *sess = session_manager_rte_lookup_session_by_id(sess_mgr_rt, detail_opt->sess_id); + if (NULL == sess) + { + return response; + } + + struct show_session_detail_result *detail_res = (struct show_session_detail_result *)calloc(1, sizeof(struct show_session_detail_result)); + get_single_session_brief(sess, thread_idx, &detail_res->sess_brief); + detail_res->sess_brief.sid = detail_opt->sess_id; + detail_res->direction = session_get_direction(sess); + // todo, get some exact stat, not all + memcpy(detail_res->sess_stat, sess->stats, sizeof(detail_res->sess_stat)); + + // todo, get application info + + response.iov_base = detail_res; + response.iov_len = sizeof(struct show_session_detail_result); + return response; +} + +static sds session_detail_to_readable(const struct show_session_detail_result *sess_detail) +{ + char addr_buf[256]; + sds ss = sdsempty(); + char time_buf[64]; +#define SHOW_SESSION_DETAIL_NAME_FORMAT "%-30s" +#define SHOW_SESSION_DETAIL_VALUE_FORMAT "%llu" + const char *flow_str[MAX_FLOW_TYPE] = {"C2S flow", "S2C flow"}; + ss = sdscatprintf(ss, "%-15s: %lu\r\n", "session-id", sess_detail->sess_brief.sid); + ss = sdscatprintf(ss, "%-15s: %d\r\n", "thread", sess_detail->sess_brief.thread_index); + ss = sdscatprintf(ss, "%-15s: %s\r\n", "state", stm_session_state_ntop(sess_detail->sess_brief.state)); + ss = sdscatprintf(ss, "%-15s: %s\r\n", "protocol", (sess_detail->sess_brief.protocol == SESSION_TYPE_TCP) ? "tcp" : "udp"); + strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_detail->sess_brief.create_time_in_sec)); + ss = sdscatprintf(ss, "%-15s: %s\r\n", "create-time", time_buf); + strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_detail->sess_brief.last_pkt_time_in_sec)); + ss = sdscatprintf(ss, "%-15s: %s\r\n", "last-pkt-time", time_buf); + ss = sdscatprintf(ss, "%-15s: %s\r\n", "tuple4", stm_get0_readable_session_addr(&sess_detail->sess_brief.addr, addr_buf, sizeof(addr_buf))); + ss = sdscatprintf(ss, "%-15s: %s\r\n", "symmetric", stm_session_flow_dir_ntop(sess_detail->sess_brief.flow_dir)); + ss = sdscatprintf(ss, "%-15s: %s\r\n", "direction", sess_detail->direction == SESSION_DIRECTION_INBOUND ? "INBOUND" : "OUTBOUND"); + + ss = sdscatprintf(ss, "statistics:\r\n"); + char printf_format[256]; + snprintf(printf_format, sizeof(printf_format), "\t%s : %s\r\n", SHOW_SESSION_DETAIL_NAME_FORMAT, SHOW_SESSION_DETAIL_VALUE_FORMAT); + for (int flow = 0; flow < MAX_FLOW_TYPE; flow++) + { + ss = sdscatprintf(ss, " %s:\r\n", flow_str[flow]); + ss = sdscatprintf(ss, printf_format, "raw_packets_received", sess_detail->sess_stat[flow][STAT_RAW_PACKETS_RECEIVED]); + ss = sdscatprintf(ss, printf_format, "raw_bytes_received", sess_detail->sess_stat[flow][STAT_RAW_BYTES_RECEIVED]); + ss = sdscatprintf(ss, printf_format, "duplicate_packets_bypass", sess_detail->sess_stat[flow][STAT_DUPLICATE_PACKETS_BYPASS]); + ss = sdscatprintf(ss, printf_format, "injected_packets", sess_detail->sess_stat[flow][STAT_INJECTED_PACKETS_SUCCESS]); + ss = sdscatprintf(ss, printf_format, "injected_bytes", sess_detail->sess_stat[flow][STAT_INJECTED_BYTES_SUCCESS]); + if (SESSION_TYPE_TCP == sess_detail->sess_brief.protocol) + { + ss = sdscatprintf(ss, printf_format, "tcp_segments_retransmit", sess_detail->sess_stat[flow][STAT_TCP_SEGMENTS_RETRANSMIT]); + ss = sdscatprintf(ss, printf_format, "tcp_payloads_retransmit", sess_detail->sess_stat[flow][STAT_TCP_PAYLOADS_RETRANSMIT]); + ss = sdscatprintf(ss, printf_format, "tcp_segments_reordered", sess_detail->sess_stat[flow][STAT_TCP_SEGMENTS_REORDERED]); + ss = sdscatprintf(ss, printf_format, "tcp_payloads_reordered", sess_detail->sess_stat[flow][STAT_TCP_PAYLOADS_REORDERED]); + } + } + + // todo: + ss = sdscatprintf(ss, "\r\n \033[31mtodo: add security policy rule id, HTTP URL, Server FQDN, etc... \033[0m"); + return ss; +} + +static struct monitor_reply *show_session_detail_cmd_cb(struct stellar_monitor *stm, int argc, char *argv[], void *arg) +{ + struct stm_show_session_detail_opt detail_opt = {}; + detail_opt.thread_idx = -1; + + if (show_session_is_help(argc, argv)) + { + return show_session_detail_usage(); + } + struct monitor_reply *error_reply = show_session_detail_cli_args_parse(argc, argv, &detail_opt); + if (error_reply != NULL) + { + return error_reply; + } + + struct monitor_reply *cmd_reply; + struct stellar_module_manager *mod_mgr = (struct stellar_module_manager *)arg; + int thread_num = stellar_module_manager_get_max_thread_num(mod_mgr); + struct iovec request; + request.iov_base = (void *)&detail_opt; + request.iov_len = sizeof(struct stm_show_session_detail_opt); + + struct iovec response[thread_num]; + memset(response, 0, sizeof(response)); + size_t tot_sess_id_num = 0; + sds ss = sdsempty(); + + for (int i = 0; i < thread_num; i++) + { + if (detail_opt.thread_idx != -1 && detail_opt.thread_idx != i) + { + continue; + } + response[i] = monitor_worker_thread_rpc(stm, i, request, show_session_detail_on_worker_thread_rpc_cb, mod_mgr); + if (response[i].iov_base == NULL || response[i].iov_len == 0) // empty result + { + continue; + } + struct show_session_detail_result *detail_res = (struct show_session_detail_result *)(response[i].iov_base); + ss = sdscatprintf(ss, "%s\n", session_detail_to_readable(detail_res)); + tot_sess_id_num++; + break; + } + if (tot_sess_id_num == 0) + { + cmd_reply = monitor_reply_new_string("No session found by id %lu", detail_opt.sess_id); + goto empty_result; + } + cmd_reply = monitor_reply_new_string("%s", ss); + +empty_result: + sdsfree(ss); + for (int i = 0; i < thread_num; i++) + { + if (response[i].iov_base) + { + free(response[i].iov_base); + } + } + return cmd_reply; +} + +int show_session_enforcer_init(struct stellar_module_manager *mod_mgr, struct stellar_monitor *stm) +{ + monitor_register_cmd(stm, "show session brief", show_session_brief_cmd_cb, "readonly", + "[sip | dip | sport | dport | protocol | state | cursor | count | limit | create-time-in | last-pkt-time-in ]", + "Show session brief information", (void *)mod_mgr); + monitor_register_cmd(stm, "show session detail", show_session_detail_cmd_cb, "readonly", + "id <id>", + "Show session verbose information", (void *)mod_mgr); + return 0; +} diff --git a/infra/monitor/monitor_cmd_assistant.c b/infra/monitor/monitor_cmd_assistant.c new file mode 100644 index 0000000..a2d02fa --- /dev/null +++ b/infra/monitor/monitor_cmd_assistant.c @@ -0,0 +1,537 @@ +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#ifdef __cplusplus +extern "C" +{ +#endif +#include "cJSON.h" +#include "sds/sds.h" +#include "monitor_private.h" +#include "monitor_cmd_assistant.h" +#include "monitor_utils.h" +#ifdef __cplusplus +} +#endif + +struct stm_cmd_spec +{ + const char *cmd_name; + const char *cmd_flags; + const char *cmd_hint; + int cmd_arity; + int cmd_first_key_offset; +}; + +struct stm_cmd_assistant +{ + cJSON *cjson_root; + stm_cmd_assistant_completion_cb *cmd_completion_cb; + stm_cmd_assistant_hints_cb *cmd_hints_cb; + sds hints_result; // mallo and free for every hits +}; +static __thread struct stm_cmd_assistant *__g_stm_cli_assistant = NULL; + +static cJSON *stm_cli_register_cmd(cJSON *father_next_array, const char *cur_cmd_prefix) +{ + cJSON *cur_level_item = NULL; + + /* search current cmd (name is prefix) in father->next array[] */ + int array_size = cJSON_GetArraySize(father_next_array); + for (int i = 0; i < array_size; i++) + { + cur_level_item = cJSON_GetArrayItem(father_next_array, i); + cJSON *cmd_name_item = cJSON_GetObjectItem(cur_level_item, "prefix"); + if (cmd_name_item != NULL && (0 == strcmp(cur_cmd_prefix, cmd_name_item->valuestring))) + { + break; + } + else + { + cur_level_item = NULL; + } + } + if (NULL == cur_level_item) + { + /* if not exist, create new cmd (name is prefix) */ + cur_level_item = cJSON_CreateObject(); + cJSON_AddItemToObject(cur_level_item, "prefix", cJSON_CreateString(cur_cmd_prefix)); + cJSON *new_cmd_next_array = cJSON_CreateArray(); + cJSON_AddItemToObject(cur_level_item, "next", new_cmd_next_array); + + /* insert into father->next array */ + cJSON_AddItemToArray(father_next_array, cur_level_item); + } + else + { + ; // already exist, do nothing + } + return cur_level_item; +} + +/* + search json object by cli_cmd_line, if exsit , return 0, do nothing, + the search json object function can be used for register_usage(); + if not exist, create new json object and insert into father->next array +*/ +int stm_cmd_assistant_register(struct stm_cmd_assistant *aide, const char *cli_cmd_line) +{ + int argc = 0; + if (NULL == aide->cjson_root) + { + aide->cjson_root = cJSON_CreateArray(); + } + sds *array = sdssplitargs(cli_cmd_line, &argc); + cJSON *father_candidate_array = aide->cjson_root; + cJSON *cur_cmd_obj = NULL; + for (int i = 0; i < argc; i++) + { + cur_cmd_obj = stm_cli_register_cmd(father_candidate_array, array[i]); + father_candidate_array = cJSON_GetObjectItem(cur_cmd_obj, "next"); + } + sdsfreesplitres(array, argc); + return 0; +} + +static cJSON *stm_cli_search_cmd(cJSON *father_next_array, const char *cur_cmd_prefix) +{ + cJSON *cur_level_item = NULL; + /* search current cmd (name is prefix) in father->next array[] */ + int array_size = cJSON_GetArraySize(father_next_array); + for (int i = 0; i < array_size; i++) + { + cur_level_item = cJSON_GetArrayItem(father_next_array, i); + cJSON *cmd_name_item = cJSON_GetObjectItem(cur_level_item, "prefix"); + if (cmd_name_item != NULL && (0 == strcmp(cur_cmd_prefix, cmd_name_item->valuestring))) + { + break; + } + else + { + cur_level_item = NULL; + } + } + return cur_level_item; +} + +cJSON *stm_cmd_assistant_search(struct stm_cmd_assistant *aide, const char *cli_cmd_line) +{ + int argc = 0; + sds *array = sdssplitargs(cli_cmd_line, &argc); + cJSON *father_candidate_array = aide->cjson_root; + cJSON *match_item = NULL; + for (int i = 0; i < argc; i++) + { + match_item = stm_cli_search_cmd(father_candidate_array, array[i]); + if (NULL == match_item) + { + return NULL; + } + father_candidate_array = cJSON_GetObjectItem(match_item, "next"); + } + sdsfreesplitres(array, argc); + return match_item; +} + +int stm_cmd_assistant_register_usage(struct stm_cmd_assistant *aide, const char *cli_cmd_line, const char *usage) +{ + if (NULL == aide || NULL == cli_cmd_line || NULL == usage) + { + return -1; + } + if (strlen(cli_cmd_line) == 0) + { + cJSON *obj = cJSON_CreateObject(); + cJSON_AddItemToObjectCS(obj, "usage", cJSON_CreateString(usage)); + cJSON_AddItemToArray(aide->cjson_root, obj); + return 0; + } + + cJSON *obj = stm_cmd_assistant_search(aide, cli_cmd_line); + if (NULL == obj) + { + stm_cmd_assistant_register(aide, cli_cmd_line); + obj = stm_cmd_assistant_search(aide, cli_cmd_line); + if (NULL == obj) + { + return -1; + } + } + cJSON_AddItemToObject(obj, "usage", cJSON_CreateString(usage)); + return 0; +} + +char *stm_cmd_assistant_verbose_print(int format) +{ + struct stm_cmd_assistant *aide = __g_stm_cli_assistant; + if (NULL == aide->cjson_root) + { + return (char *)strdup("[]"); + } + char *debug_print; + if (format == 1) + { + debug_print = cJSON_Print(aide->cjson_root); + } + else + { + debug_print = cJSON_PrintUnformatted(aide->cjson_root); + } + return debug_print; +} + +char *stm_cmd_assistant_brief_print(void) +{ + struct stm_cmd_assistant *aide = __g_stm_cli_assistant; + sds s = sdsempty(); + s = sdscat(s, "Usage:\r\n"); + int array_size = cJSON_GetArraySize(aide->cjson_root); + for (int sz = 0; sz < array_size; sz++) + { + cJSON *item = cJSON_GetArrayItem(aide->cjson_root, sz); + cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix"); + s = sdscatprintf(s, "\t%s\r\n", cmd_name_item->valuestring); + } + char *print_str = strdup(s); + sdsfree(s); + return print_str; +} + +int stm_cmd_assistant_json_load(struct stm_cmd_assistant *aide, const char *json_str) +{ + if (NULL == aide || NULL == json_str || strlen(json_str) == 0) + { + return -1; + } + cJSON *root = cJSON_Parse(json_str); + if (NULL == root) + { + return -1; + } + aide->cjson_root = root; + return 0; +} + +void stm_cmd_assistant_free(struct stm_cmd_assistant *aide) +{ + if (aide) + { + if (aide->cjson_root) + { + cJSON_Delete(aide->cjson_root); + } + if (aide->hints_result) + { + sdsfree(aide->hints_result); + } + free(aide); + } +} + +struct stm_cmd_assistant *stm_cmd_assistant_new(void) +{ + struct stm_cmd_assistant *aide = calloc(1, sizeof(struct stm_cmd_assistant)); + aide->cjson_root = NULL; + aide->hints_result = NULL; + __g_stm_cli_assistant = aide; + return aide; +} + +struct stm_cmd_assistant *stm_cmd_assistant_get(void) +{ + return __g_stm_cli_assistant; +} + +static int stm_cmd_exist(struct stm_cmd_assistant *aide, const char *cmd_name) +{ + cJSON *root = aide->cjson_root; + if (NULL == root) + { + return 0; + } + int array_size = cJSON_GetArraySize(root); + for (int sz = 0; sz < array_size; sz++) + { + cJSON *item = cJSON_GetArrayItem(root, sz); + cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix"); + if (cmd_name_item && cmd_name_item->valuestring) + { + if (strcasecmp(cmd_name, cmd_name_item->valuestring) == 0) + { + return 1; + } + } + } + return 0; +} + +/* + * return value: + * 0: success + * -1: failed + * 1: already exist + */ +int stm_cmd_assistant_register_cmd(struct stm_cmd_assistant *aide, const char *cmd_name, void *cmd_cb, void *cmd_arg, + const char *flags, const char *hint, const char *desc) +{ + if (NULL == aide || NULL == cmd_name || NULL == cmd_cb) + { + return -1; + } + if (stm_cmd_exist(aide, cmd_name)) + { + return 1; + } + if (stm_strncasecmp_exactly(flags, "readonly", 8) != 0 && stm_strncasecmp_exactly(flags, "write", 5) != 0) + { + return -1; + } + cJSON *obj = cJSON_CreateObject(); + cJSON_AddItemToObject(obj, "prefix", cJSON_CreateString(cmd_name)); + cJSON_AddItemToObject(obj, "flags", cJSON_CreateString(flags)); + cJSON_AddItemToObject(obj, "hints", cJSON_CreateString(hint)); + cJSON_AddItemToObject(obj, "desc", cJSON_CreateString(desc)); + + char tmp_str[32] = {}; + snprintf(tmp_str, sizeof(tmp_str), "%p", cmd_cb); + cJSON_AddItemToObject(obj, "cmd_cb", cJSON_CreateString(tmp_str)); + + snprintf(tmp_str, sizeof(tmp_str), "%p", cmd_arg); + cJSON_AddItemToObject(obj, "cmd_arg", cJSON_CreateString(tmp_str)); + + if (NULL == aide->cjson_root) + { + aide->cjson_root = cJSON_CreateArray(); + } + cJSON_AddItemToArray(aide->cjson_root, obj); + return 0; +} + +char *stm_cmd_assistant_serialize(struct stm_cmd_assistant *aide) +{ + if (NULL == aide) + { + return NULL; + } + char *debug_print = cJSON_Print(aide->cjson_root); + return debug_print; +} + +int stm_cmd_assistant_dserialize(struct stm_cmd_assistant *aide, const char *json) +{ + cJSON *tmp_root = cJSON_Parse(json); + if (NULL == tmp_root) + { + return -1; + } + aide->cjson_root = tmp_root; + return 0; +} + +/* + return value: + 0: equal exactly, uesed for hints, or get_cmd_cb() + -1: cli_array < register_cmd_array //raw command1 and command2 line has same section, and command1 line is shorter than command2, used for tab auto completion + 1: cli_array > register_cmd_array //raw command1 and command2 line has same section, and command1 line is longer than command2, used for get_cmd_cb() + -2: not match any secions +*/ +int stm_cmd_assistant_sds_compare(sds *cli_array, int cli_argc, sds *register_cmd_array, int register_argc) +{ + if (0 == cli_argc || 0 == register_argc) + { + return -2; + } + // compare the first n-1 words, must be exactly match + int min_argc = MIN(cli_argc, register_argc); + for (int i = 0; i < min_argc - 1; i++) + { + // previous words must be exactly match, so use strcasecmp() + if (strcasecmp(cli_array[i], register_cmd_array[i]) != 0) + { + return -2; + } + } + + // compare the last common word use substring match + if (strncasecmp(cli_array[min_argc - 1], register_cmd_array[min_argc - 1], strlen(cli_array[min_argc - 1])) == 0) + { + if (strcasecmp(cli_array[min_argc - 1], register_cmd_array[min_argc - 1]) == 0) + { + if (cli_argc == register_argc) + { + return 0; + } + else if (cli_argc < register_argc) + { + return -1; + } + else + { + return 1; + } + } + else + { + // cli command is not complete + return -1; + } + } + return -2; +} + +int stm_cmd_assistant_set_completion_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_completion_cb *cb) +{ + aide->cmd_completion_cb = cb; + return 0; +} + +int stm_cmd_assistant_set_hints_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_hints_cb *cb) +{ + aide->cmd_hints_cb = cb; + return 0; +} + +static void cjson_traversal_completion(struct stm_cmd_assistant *aide, sds *sds_cli_array, int cli_argc, cJSON *cjson_root, void *arg) +{ + int array_size = cJSON_GetArraySize(cjson_root); + for (int sz = 0; sz < array_size; sz++) + { + cJSON *array_item = cJSON_GetArrayItem(cjson_root, sz); + cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix"); + int register_cmd_section_num; + sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, ®ister_cmd_section_num); + + int match = stm_cmd_assistant_sds_compare(sds_cli_array, cli_argc, register_cmd_array, register_cmd_section_num); + sdsfreesplitres(register_cmd_array, register_cmd_section_num); + if (match == 0 || match == -1) + { + aide->cmd_completion_cb(arg, prefix_item->valuestring); + } + } + return; +} + +static void stm_cmd_assistant_completion_cb_fun(struct stm_cmd_assistant *aide, const char *buf, void *arg) +{ + /* input cmd buf must <= registed command + if buf_len < command_len ,auto completion the longest prefix + if buf_len == command_len, add a blank space + */ + int argc = 0; + sds *cli_cmd_array = sdssplitargs(buf, &argc); + cjson_traversal_completion(aide, cli_cmd_array, argc, aide->cjson_root, arg); + sdsfreesplitres(cli_cmd_array, argc); +} + +void stm_cmd_assistant_input_line(struct stm_cmd_assistant *aide, const char *line, void *arg) +{ + stm_cmd_assistant_completion_cb_fun(aide, line, arg); +} + +static const char *cjson_traversal_hints(sds *sds_cli_array, int cli_argc, cJSON *cjson_root) +{ + int array_size = cJSON_GetArraySize(cjson_root); + for (int sz = 0; sz < array_size; sz++) + { + cJSON *array_item = cJSON_GetArrayItem(cjson_root, sz); + cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix"); + int register_cmd_section_num; + sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, ®ister_cmd_section_num); + + int match = stm_cmd_assistant_sds_compare(sds_cli_array, cli_argc, register_cmd_array, register_cmd_section_num); + sdsfreesplitres(register_cmd_array, register_cmd_section_num); + if (match == 0) // hints must be exactly match + { + cJSON *hint_item = cJSON_GetObjectItem(array_item, "hints"); + if (hint_item) + { + return hint_item->valuestring; + } + } + } + return NULL; +} + +const char *stm_cmd_assistant_input_line_for_hints(struct stm_cmd_assistant *aide, const char *line) +{ + int argc = 0; + sds *cli_cmd_array = sdssplitargs(line, &argc); + const char *hints = cjson_traversal_hints(cli_cmd_array, argc, aide->cjson_root); + sdsfreesplitres(cli_cmd_array, argc); + return hints; +} + +static long long stm_cmd_assistant_get_item(struct stm_cmd_assistant *aide, const char *cmd_line, const char *json_item_name, const char *format) +{ + long long item_value = 0; + int array_size = cJSON_GetArraySize(aide->cjson_root); + int cli_argc; + sds *cli_cmd_array = sdssplitargs(cmd_line, &cli_argc); + int last_match_cmd_section_num = -1; + for (int sz = 0; sz < array_size; sz++) + { + cJSON *array_item = cJSON_GetArrayItem(aide->cjson_root, sz); + cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix"); + int register_cmd_section_num; + sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, ®ister_cmd_section_num); + + int match = stm_cmd_assistant_sds_compare(cli_cmd_array, cli_argc, register_cmd_array, register_cmd_section_num); + sdsfreesplitres(register_cmd_array, register_cmd_section_num); + if (match >= 0) + { + cJSON *hint_item = cJSON_GetObjectItem(array_item, json_item_name); + if (hint_item) + { + /* longest match */ + if (register_cmd_section_num > last_match_cmd_section_num) + { + last_match_cmd_section_num = register_cmd_section_num; + sscanf(hint_item->valuestring, format, &item_value); + } + } + } + } + sdsfreesplitres(cli_cmd_array, cli_argc); + return item_value; +} + +void *stm_cmd_assistant_get_cb(struct stm_cmd_assistant *aide, const char *cmd_line) +{ + return (void *)stm_cmd_assistant_get_item(aide, cmd_line, "cmd_cb", "%p"); +} + +void *stm_cmd_assistant_get_user_arg(struct stm_cmd_assistant *aide, const char *cmd_line) +{ + return (void *)stm_cmd_assistant_get_item(aide, cmd_line, "cmd_arg", "%p"); +} + +int stm_cmd_assistant_get_arity(struct stm_cmd_assistant *aide, const char *cmd_line) +{ + return (int)stm_cmd_assistant_get_item(aide, cmd_line, "arity", "%d"); +} + +sds stm_cmd_assistant_list_cmd_brief(struct stm_cmd_assistant *aide) +{ + sds res = sdsempty(); + int array_size = cJSON_GetArraySize(aide->cjson_root); + for (int sz = 0; sz < array_size; sz++) + { + cJSON *item = cJSON_GetArrayItem(aide->cjson_root, sz); + cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix"); + cJSON *cmd_desc_item = cJSON_GetObjectItem(item, "desc"); + res = sdscatprintf(res, "\"%s\", %s \r\n", cmd_name_item->valuestring, cmd_desc_item ? cmd_desc_item->valuestring : ""); + } + return res; +} + +sds stm_cmd_assistant_list_cmd_verbose(struct stm_cmd_assistant *aide) +{ + if (NULL == aide->cjson_root) + { + return sdsempty(); + } + char *json_str = cJSON_PrintUnformatted(aide->cjson_root); + sds res = sdsnew(json_str); + free(json_str); + return res; +} diff --git a/infra/monitor/monitor_cmd_assistant.h b/infra/monitor/monitor_cmd_assistant.h new file mode 100644 index 0000000..02100a7 --- /dev/null +++ b/infra/monitor/monitor_cmd_assistant.h @@ -0,0 +1,60 @@ +#pragma once +#ifdef __cplusplus +extern "C" +{ +#endif +#include "cJSON.h" +#include "sds/sds.h" + +#ifndef MIN +#define MIN(x, y) ((x) < (y) ? (x) : (y)) +#endif + +#ifndef MAX +#define MAX(x, y) ((x) > (y) ? (x) : (y)) +#endif + struct stm_cmd_assistant; + struct stm_cmd_spec; + + struct stm_cmd_assistant *stm_cmd_assistant_new(); + void stm_cmd_assistant_free(struct stm_cmd_assistant *aide); + int stm_cmd_assistant_json_load(struct stm_cmd_assistant *aide, const char *json_str); + char *stm_cmd_assistant_brief_print(void); + char *stm_cmd_assistant_verbose_print(int format); + int stm_cmd_assistant_is_help(const char *line); + cJSON *stm_cmd_assistant_search(struct stm_cmd_assistant *aide, const char *cli_cmd_line); + int stm_cmd_assistant_register(struct stm_cmd_assistant *aide, const char *cli_cmd_line); + int stm_cmd_assistant_register_usage(struct stm_cmd_assistant *aide, const char *cli_cmd_line, const char *usage); + + // return value shoule be free after uesd. + char *stm_cmd_assistant_serialize(struct stm_cmd_assistant *aide); + int stm_cmd_assistant_dserialize(struct stm_cmd_assistant *aide, const char *json); + + /* + * return value: + * 0: success + * -1: failed + * 1: already exist + */ + int stm_cmd_assistant_register_cmd(struct stm_cmd_assistant *aide, const char *cmd, void *cmd_cb, void *cmd_arg, + const char *flags, const char *hint, const char *description); + + int stm_cmd_assistant_sds_compare(sds *cli_array, int cli_argc, sds *register_cmd_array, int register_argc); + typedef void(stm_cmd_assistant_completion_cb)(void *arg, const char *candidate_completion); + typedef char *(stm_cmd_assistant_hints_cb)(const char *line); + int stm_cmd_assistant_set_completion_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_completion_cb *cb); + int stm_cmd_assistant_set_hints_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_hints_cb *cb); + void stm_cmd_assistant_input_line(struct stm_cmd_assistant *aide, const char *line, void *arg); + const char *stm_cmd_assistant_input_line_for_hints(struct stm_cmd_assistant *aide, const char *line); + struct stm_cmd_assistant *stm_cmd_assistant_get(void); + + // struct stm_cmd_spec *stm_cmd_assistant_get_cmd(struct stm_cmd_assistant *aide, const char *cmd); + void *stm_cmd_assistant_get_cb(struct stm_cmd_assistant *aide, const char *cmd); + void *stm_cmd_assistant_get_user_arg(struct stm_cmd_assistant *aide, const char *cmd); + int stm_cmd_assistant_get_arity(struct stm_cmd_assistant *aide, const char *cmd); + sds stm_cmd_assistant_list_cmd_brief(struct stm_cmd_assistant *aide); + sds stm_cmd_assistant_list_cmd_verbose(struct stm_cmd_assistant *aide); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/infra/monitor/monitor_private.h b/infra/monitor/monitor_private.h new file mode 100644 index 0000000..2e18836 --- /dev/null +++ b/infra/monitor/monitor_private.h @@ -0,0 +1,331 @@ +#pragma once +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stddef.h> +#include <pthread.h> +#include <netinet/in.h> +#include <pcap/pcap.h> +#ifdef __cplusplus +extern "C" +{ +#endif +#include "stellar/monitor.h" +#include "sds/sds.h" + +#include "stellar/module.h" +#include "stellar/log.h" + +#include <event2/event.h> +#include <event2/listener.h> +#include <event2/bufferevent.h> +#include <event2/buffer.h> +#include <event2/thread.h> +#include <event2/http.h> +#include "monitor_rpc.h" + /********************************** limit definition *****************************************/ + +#ifndef STELLAR_MAX_THREAD_NUM +#define STELLAR_MAX_THREAD_NUM (256) +#endif +#define STM_RINGBUF_SIZE (1024 * 1024) /* per thread */ +#define STM_CONNECTION_IDLE_TIMEOUT 300 /* How many seconds elapsed without input command, connection will closed */ +#define STM_REQUEST_TIMEOUT 5 + +#define STM_SERVER_LISTEN_IP "127.0.0.1" +#define STM_SERVER_LISTEN_PORT 80 +#define STM_TZSP_UDP_PORT 37008 /* default port of TZSP protocol: https://en.wikipedia.org/wiki/TZSP# */ + +#define STM_SESSION_DEFAULT_SEARCH_COUNT 100 /* if no count params, max search session number */ +#define STM_SESSION_DEFAULT_LIMIT_NUM 10 /* if no limit params, max support result session number */ +#define STM_SESSION_MAX_LIMIT_NUM 1000 + +#define STM_UINT64_READABLE_STRING_MAX_LEN 21 /* MAX value is: 18446744073709551615 */ +#define STM_UINT32_READABLE_STRING_MAX_LEN 11 /* MAX value is: 4294967295 */ + +#define STM_CONNECTIVITY_DEFALUT_COUNT 5 /* ping default count */ +#define STM_CONNECTIVITY_DEFALUT_SIZE 64 /* ping default bytes */ +#define STM_CONNECTIVITY_MAX_COUNT 100 /* ping max count */ +#define STM_CONNECTIVITY_MAX_SIZE 65535 /* ping max bytes */ + + /************************************************************************/ +#define STM_CMD_CALLBACK_THREAD_LOCAL_MAGIC (0x1234ABCD) +#define STM_RINGBUF_HDR_MAGIC (0x0ABCD12345678) + +#define STM_RINGBUF_THREAD_IDX_SERVER 0 +#define STM_RINGBUF_THREAD_IDX_AGENT 1 + +#define STM_MONITOR_THREAD_ID 0 // There are only two threads, use fix id +#define STM_WORKER_THREAD_ID 1 // There are only two threads, use fix id + +#define STM_LOG_MODULE_NAME "monitor" +#define STM_STAT_OUTPUT_PATH "log/monitor.fs4" +#define STM_STAT_OUTPUT_INTERVAL_MS 3000 +#define STM_RESTFUL_VERSION "v1" +#define STM_RESTFUL_RESOURCE "stellar_monitor" +#define STM_RESTFUL_URI_CMD_KEY "raw_cmd" // example: http://127.0.0.1:80/v1/stellar_monitor?raw_cmd=show%20session +#define STM_CLIENT_SERVER_SYNC_CMD "show command verbose" + +#define STM_CLI_CMD_HINTS_COLOR 90 +#define STM_CLI_CMD_HINTS_BOLD 0 + +#ifndef UNUSED +#define UNUSED __attribute__((unused)) +#endif + +#ifdef NDEBUG // release version +#define STM_DBG_PRINT(fmt, args...) +#else +#define STM_DBG_PRINT(fmt, args...) fprintf(stderr, fmt, ##args) +#endif + +#ifndef CALLOC +#define CALLOC(type, number) ((type *)calloc(sizeof(type), number)) +#endif + +#ifndef FREE +#define FREE(ptr) \ + { \ + if (ptr) \ + { \ + free((void *)ptr); \ + ptr = NULL; \ + } \ + } +#endif + +#define STM_TIME_START() \ + struct timespec __start_time, __end_time; \ + unsigned long long diff; \ + clock_gettime(CLOCK_MONOTONIC, &__start_time); + +#define STM_TIME_DIFF() \ + { \ + clock_gettime(CLOCK_MONOTONIC, &__end_time); \ + if (__start_time.tv_sec == __end_time.tv_sec) \ + { \ + diff = (unsigned long long)(__end_time.tv_nsec - __start_time.tv_nsec); \ + } \ + diff = ((unsigned long long)__end_time.tv_sec * 1000 * 1000 * 1000 + __end_time.tv_nsec) - ((unsigned long long)__start_time.tv_sec * 1000 * 1000 * 1000 + __start_time.tv_nsec); \ + } + +#ifndef MIN +#define MIN(x, y) ((x) < (y) ? (x) : (y)) +#endif + +#ifndef MAX +#define MAX(x, y) ((x) > (y) ? (x) : (y)) +#endif + +#ifndef TRUE +#define TRUE 1 +#endif + +#ifndef FALSE +#define FALSE 0 +#endif + +#define STM_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__) +#define STM_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__) +#define STM_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__) +#define STM_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__) + + enum stm_http_response_code + { + STM_HTTP_200_OK = 200, + STM_HTTP_204_NO_CONTENT = 204, + STM_HTTP_403_FORBIDDEN = 403, + STM_HTTP_408_REQUEST_TIMEOUT = 408, + STM_HTTP_413_PAYLOAD_TOO_LARGE = 413, + }; + + enum stm_stat_type + { + STM_STAT_CLI_CONNECTION_NEW, + STM_STAT_CLI_CONNECTION_CLOSE, + STM_STAT_CLI_REQUEST_SUCC, + STM_STAT_CLI_RESPONSE_SUCC, + STM_STAT_CLI_REQUEST_ERR, // RESTFul Syntax error! + STM_STAT_CLI_RESPONSE_ERR, // attention: empty result is not error! + STM_STAT_MAX, + }; + + struct stm_spinlock; + + struct stellar_monitor_config + { + // int thread_count; + size_t ringbuf_size; /* bytes */ + int connection_idle_timeout; + int cli_request_timeout; + char *listen_ipaddr; + unsigned short listen_port_host_order; + unsigned short data_link_bind_port_host_order; // for TZSP protocol + int output_interval_ms; + char *output_path; + }; + + struct stm_key_value_tuple + { + char *key; + char *value; + }; + + struct stm_key_value + { + int tuple_num; + struct stm_key_value_tuple *tuple; + }; + + typedef struct evhttp_request stm_network_connection; + + struct stm_cmd_transaction + { + struct stm_cmd_request *cmd_req; + struct stm_cmd_reply *cmd_res[STELLAR_MAX_THREAD_NUM]; // multi thread merge to one + }; + + struct stm_connection_manager + { + struct timeval link_start_time; + struct timeval last_active_time; + struct evhttp_connection *conn; + char peer_ipaddr[INET6_ADDRSTRLEN]; + uint16_t peer_port_host_order; + struct stm_connection_manager *next, *prev; + }; + + struct stm_stat_counter + { + int counter_id; + uint64_t count; + uint64_t bytes; + }; + + struct stm_stat + { + void *fs4_ins; + struct stm_stat_counter counters[STM_STAT_MAX]; + }; + + struct monitor_connection + { + struct evhttp_connection *current_evconn_ref; + }; + + /* optional API */ + struct monitor_connection; + typedef void(monitor_connection_close_cb)(struct monitor_connection *conn, void *arg); + int monitor_register_connection_close_cb(struct stellar_monitor *monitor, monitor_connection_close_cb *cb, void *arg); + struct stm_conn_close_cb_manager + { + monitor_connection_close_cb *cb; + void *arg; + struct stm_conn_close_cb_manager *next, *prev; + }; + + struct stm_pktdump_runtime; + struct stellar_monitor + { + struct module_manager *mod_mgr_ref; + struct logger *logger_ref; + int worker_thread_num; + struct stellar_monitor_config *config; + struct stm_cmd_assistant *aide; // reference, share with stellar + struct stm_connection_manager *connection_mgr; // used to tracking all connections, for cli "who" command + struct stm_conn_close_cb_manager *conn_close_mgr; + // struct stm_ringbuf_mgr *ringbuf_mgr[STELLAR_MAX_THREAD_NUM]; + struct event_base *evt_base; + // struct event *ev_timeout; + struct evhttp *evt_http_server; + pthread_t evt_main_loop_tid; + struct timeval time_now; + struct stm_stat *stat; + struct stm_spinlock *lock; // for dynamic register command, conn_close_cb + int (*gettime_cb)(struct timeval *tv, struct timezone *tz); + struct monitor_connection current_conn; + struct stm_pktdump_runtime *packet_dump; + + struct monitor_rpc **rpc_ins_array; // multir threads + }; + + enum monitor_reply_type + { + MONITOR_REPLY_INTEGER, + MONITOR_REPLY_DOUBLE, + MONITOR_REPLY_STRING, + MONITOR_REPLY_ERROR, + MONITOR_REPLY_STATUS, + MONITOR_REPLY_NIL, + }; + + struct monitor_reply + { + enum monitor_reply_type type; + long long integer; /* The integer when type is SWARMKV_REPLY_INTEGER */ + double dval; /* The double when type is SWARMKV_REPLY_DOUBLE */ + int len; /* Length of string */ + char *str; + int http_code; + const char *http_reason; + }; + + struct monitor_cli_args + { + const char *short_opt; + const char *long_opt; + int require_arg_value; + int value_is_multi_words; // "a b c d e f g" + char *value; // should be free after use + }; + /************************************************************************************************************/ + /* monitor call gettimeofday(2) by default */ + struct stellar_monitor_config *stellar_monitor_config_new(const char *toml); + int stellar_monitor_set_gettime_callback(struct stellar_monitor *stm, int (*gettime_cb)(struct timeval *tv, struct timezone *tz)); + struct stellar_monitor *stellar_monitor_get(void); + struct stm_connection_manager *stm_connection_insert(struct evhttp_connection *evconn); + void stm_connection_update(struct stm_connection_manager *conn_mgr, const struct evhttp_connection *evconn); + void stm_connection_delete(struct evhttp_connection *evconn); + const struct stm_connection_manager *stm_connection_search(const struct stm_connection_manager *conn_mgr_head, const struct evhttp_connection *evconn); + + struct stm_key_value *stm_cmd_key_value_new(void); + void stm_cmd_key_value_append(struct stm_key_value **kv, const char *key, const char *value); + void stm_cmd_key_value_free(struct stm_key_value *kv); + + /************************************** command manager **********************************************/ + struct stm_stat *stm_stat_init(struct stellar_monitor *stm); + sds stm_config_print(const struct stellar_monitor_config *config); + void stm_stat_free(struct stm_stat *stat); + void stm_stat_update(struct stm_stat *stat, int thread_idx, enum stm_stat_type type, long long value); + long long stm_get_stat_count(struct stm_stat *stat, enum stm_stat_type type); + long long stm_get_stat_bytes(struct stm_stat *stat, enum stm_stat_type type); + sds monitor_reply_to_string(const struct monitor_reply *reply); + void monitor_reply_free(struct monitor_reply *reply); + int monitor_util_parse_cmd_args(int argc, const char *argv[], struct monitor_cli_args cli_args[], size_t cli_args_array_size); + char *stm_http_url_encode(const char *originalText); + struct stm_spinlock *stm_spinlock_new(void); + void stm_spinlock_lock(struct stm_spinlock *splock); + void stm_spinlock_unlock(struct stm_spinlock *splock); + void stm_spinlock_free(struct stm_spinlock *splock); + struct stm_pktdump_runtime *stm_packet_dump_new(struct stellar_monitor *stm, const struct stellar_monitor_config *config); + void stm_pktdump_enforcer_free(struct stellar_monitor *stm); + + struct monitor_rpc *stm_rpc_new(void); + void stm_rpc_free(struct monitor_rpc *rpc_ins); + int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins); + struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args); + void monitor_rpc_free(struct monitor_rpc *rpc_ins); + struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr); + struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh); + + /* Must be called in 'monitor_cmd_cb' context */ + struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor); + /* Get the remote address and port associated with this connection. */ + int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, unsigned short *peer_port); + + /* command enforcer */ + int show_session_enforcer_init(struct module_manager *mod_mgr, struct stellar_monitor *stm); +#ifdef __cplusplus +} +#endif diff --git a/infra/monitor/monitor_ringbuf.c b/infra/monitor/monitor_ringbuf.c new file mode 100644 index 0000000..63270ba --- /dev/null +++ b/infra/monitor/monitor_ringbuf.c @@ -0,0 +1,137 @@ +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stddef.h> +#include "monitor_private.h" +#include "monitor_ringbuf.h" + +struct monitor_ringbuf_wrap +{ + ringbuf_t *ringbuf; + char *ringbuf_data; + unsigned long long push_number; // only for statistics + unsigned long long push_bytes; // only for statistics + unsigned long long pop_number; // only for statistics + unsigned long long pop_bytes; // only for statistics +}; + +ssize_t stm_ringbuf_stream_start(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t require_size) +{ + ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id); + ssize_t offset = ringbuf_acquire(rbf->ringbuf, rb_worker, require_size); + if (offset < 0) + { + STM_DBG_PRINT("stm ringbuf stream prealloc buffer(): ringbuf_acquire fail, no valid space!\n"); + return 0; + } + return offset; +} + +int stm_ringbuf_stream_append(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t rbf_offset, const void *value, size_t len) +{ + (void)thread_id; + memcpy(rbf->ringbuf_data + rbf_offset, value, len); + rbf->push_number++; + rbf->push_bytes += len; + return 0; +} + +void stm_ringbuf_stream_finish(int thread_id, struct monitor_ringbuf_wrap *rbf) +{ + ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id); + ringbuf_produce(rbf->ringbuf, rb_worker); +} + +int stm_ringbuf_easy_push(int thread_id, struct monitor_ringbuf_wrap *rbf, const void *push_value /*must continuous*/, size_t push_len) +{ + ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id); + ssize_t offset = ringbuf_acquire(rbf->ringbuf, rb_worker, push_len); + if (offset < 0) + { + STM_DBG_PRINT("stm ringbuf easy push(): ringbuf_acquire fail, no valid space!\n"); + return -1; + } + memcpy(rbf->ringbuf_data + offset, push_value, push_len); + + ringbuf_produce(rbf->ringbuf, rb_worker); + rbf->push_number++; + rbf->push_bytes += push_len; + // STM_DBG_PRINT("stm ringbuf push() success, len:%llu, number:%llu\n", push_len, rbf->push_number); + return 0; +} + +void *stm_ringbuf_pop(struct monitor_ringbuf_wrap *rbf, size_t *pop_len) +{ + size_t len = 0, offset = 0; + len = ringbuf_consume(rbf->ringbuf, &offset); + if (0 == len) + { + // STM_DBG_PRINT("stm_ringbuf_pop(): not valid data\n"); + *pop_len = 0; + return NULL; + } + rbf->pop_number++; + *pop_len = len; + // STM_DBG_PRINT("stm_ringbuf_pop() success, len:%llu, number:%llu\n", len, rbf->pop_number); + return rbf->ringbuf_data + offset; +} + +void stm_ringbuf_release(struct monitor_ringbuf_wrap *rbf, int rel_len) +{ + ringbuf_release(rbf->ringbuf, rel_len); + rbf->pop_bytes += rel_len; +} + +struct monitor_ringbuf_wrap *stm_ringbuf_wrap_new(int thread_tot_num, size_t ringbuf_size) +{ + struct monitor_ringbuf_wrap *rbf = (struct monitor_ringbuf_wrap *)calloc(1, sizeof(struct monitor_ringbuf_wrap)); + size_t ringbuf_obj_size; + ringbuf_get_sizes(thread_tot_num, &ringbuf_obj_size, NULL); + rbf->ringbuf = (ringbuf_t *)calloc(1, ringbuf_obj_size); + rbf->ringbuf_data = (char *)calloc(1, ringbuf_size); + ringbuf_setup(rbf->ringbuf, thread_tot_num, ringbuf_size); + return rbf; +} + +void stm_ringbuf_wrap_free(struct monitor_ringbuf_wrap *rbf) +{ + if (NULL == rbf) + { + return; + } + if (rbf->ringbuf) + { + free(rbf->ringbuf); + } + if (rbf->ringbuf_data) + { + free(rbf->ringbuf_data); + } + free(rbf); +} + +void stm_ringbuf_get_statistics(const struct monitor_ringbuf_wrap *rbf, unsigned long long *push_number, unsigned long long *push_bytes, unsigned long long *pop_number, unsigned long long *pop_bytes) +{ + if (NULL == rbf) + { + return; + } + if (push_number) + { + *push_number = rbf->push_number; + } + if (push_bytes) + { + *push_bytes = rbf->push_bytes; + } + if (pop_number) + { + *pop_number = rbf->pop_number; + } + if (pop_bytes) + { + *pop_bytes = rbf->pop_bytes; + } +}
\ No newline at end of file diff --git a/infra/monitor/monitor_ringbuf.h b/infra/monitor/monitor_ringbuf.h new file mode 100644 index 0000000..294bac5 --- /dev/null +++ b/infra/monitor/monitor_ringbuf.h @@ -0,0 +1,30 @@ +#pragma once +#include <stddef.h> + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "ringbuf/ringbuf.h" + + /* + Message format: + || header | payload ....(variable-length) || header | payload ....(variable-length) || .... + */ + + struct monitor_ringbuf_wrap; + struct monitor_ringbuf_wrap *stm_ringbuf_wrap_new(int thread_tot_num, size_t ringbuf_size); + int stm_ringbuf_easy_push(int thread_id, struct monitor_ringbuf_wrap *rbf, const void *value, size_t len); + void stm_ringbuf_release(struct monitor_ringbuf_wrap *rbf, int rel_len); + void *stm_ringbuf_pop(struct monitor_ringbuf_wrap *rbf, size_t *pop_len); + void stm_ringbuf_wrap_free(struct monitor_ringbuf_wrap *rbf); + ssize_t stm_ringbuf_stream_start(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t require_size); + int stm_ringbuf_stream_append(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t rbf_offset, const void *value, size_t len); + void stm_ringbuf_stream_finish(int thread_id, struct monitor_ringbuf_wrap *rbf); + void stm_ringbuf_get_statistics(const struct monitor_ringbuf_wrap *rbf, unsigned long long *push_number, + unsigned long long *push_bytes, unsigned long long *pop_number, unsigned long long *pop_bytes); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/infra/monitor/monitor_rpc.c b/infra/monitor/monitor_rpc.c new file mode 100644 index 0000000..8e2af67 --- /dev/null +++ b/infra/monitor/monitor_rpc.c @@ -0,0 +1,115 @@ +#include <stddef.h> +#include "stellar/monitor.h" +#include "monitor_private.h" +#include "monitor_rpc.h" +#include "stellar/module.h" + +#define RPC_WORKER_THREAD_BUSY 1 +#define RPC_WORKER_THREAD_IDLE 0 + +struct monitor_rpc_msg_hdr +{ + unsigned int type; + unsigned int length; // total messaage length, include this header, = payload length + sizeof(struct monitor_rpc_msg_hdr) + char value[0]; // variable-length, continuous +} __attribute__((packed)); + +enum monitor_rpc_ringbuf_dir +{ // full duplex, dir: 0: worker thread to monitor thread; 1: monitor thread to worker thread + RPC_RINBUG_DIR_W2M = 0, + RPC_RINBUG_DIR_M2W = 1, + RPC_RINBUG_DIR_MAX = 2, +}; + +struct monitor_rpc +{ + volatile long atomic_val; + + monitor_rpc_callabck *rpc_cb; + void *rpc_args; + struct iovec rpc_request; + struct iovec rpc_response; +}; + +struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args) +{ + while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY) + { + // wait for the last rpc response, not support concurrent rpc yet! + usleep(1000); + } + rpc_ins->rpc_cb = cb; + rpc_ins->rpc_args = user_args; + rpc_ins->rpc_request = rpc_request; + __sync_fetch_and_or(&rpc_ins->atomic_val, 1); + + while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY) + { + // wait for the rpc response... + usleep(1000); + } + return rpc_ins->rpc_response; +} + +int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins) +{ + if (0 == __sync_or_and_fetch(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE)) + { + return 0; + } + rpc_ins->rpc_response = rpc_ins->rpc_cb(thread_idx, rpc_ins->rpc_request, rpc_ins->rpc_args); + __sync_fetch_and_and(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE); + return 1; +} + +/* + * Communicate between different threads by ringbuf. + */ +struct iovec monitor_worker_thread_rpc(struct stellar_monitor *stm, int worker_thread_idx, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args) +{ + int worker_thread_num = module_manager_get_max_thread_num(stm->mod_mgr_ref); + if (worker_thread_idx >= worker_thread_num) + { + struct iovec response = {0}; + return response; + } + struct monitor_rpc *rpc_ins = stm->rpc_ins_array[worker_thread_idx]; + return stm_rpc_call(rpc_ins, rpc_request, cb, user_args); +} + +__thread long long rpc_idle_num = 0; + +void module_rpc_worker_thread_polling_cb(struct module_manager *mod_mgr, void *polling_arg) +{ + struct stellar_monitor *stm = (struct stellar_monitor *)polling_arg; + int thread_idx = module_manager_get_thread_id(mod_mgr); + struct monitor_rpc *rpc_ins = stm->rpc_ins_array[thread_idx]; + + stm_rpc_exec(thread_idx, rpc_ins); +} + +struct monitor_rpc *stm_rpc_new(void) +{ + struct monitor_rpc *rpc_ins = (struct monitor_rpc *)calloc(1, sizeof(struct monitor_rpc)); + return rpc_ins; +} + +void stm_rpc_free(struct monitor_rpc *rpc_ins) +{ + if (NULL == rpc_ins) + { + return; + } + free(rpc_ins); +} + +struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr) +{ + module_manager_polling_subscribe(mod_mgr, module_rpc_worker_thread_polling_cb, (void *)stm); + return stm_rpc_new(); +} + +void monitor_rpc_free(struct monitor_rpc *rpc_ins) +{ + stm_rpc_free(rpc_ins); +}
\ No newline at end of file diff --git a/infra/monitor/monitor_rpc.h b/infra/monitor/monitor_rpc.h new file mode 100644 index 0000000..9945c5c --- /dev/null +++ b/infra/monitor/monitor_rpc.h @@ -0,0 +1,10 @@ +#pragma once +#include <stdint.h> +#include <stddef.h> +#include "stellar/monitor.h" +#include <bits/types/struct_iovec.h> + +struct monitor_rpc; + +typedef struct iovec(monitor_rpc_callabck)(int worker_thread_idx, struct iovec user_data, void *user_args); +struct iovec monitor_worker_thread_rpc(struct stellar_monitor *monitor, int worker_thread_idx, struct iovec user_data, monitor_rpc_callabck *cb, void *user_args); diff --git a/infra/monitor/monitor_server.c b/infra/monitor/monitor_server.c new file mode 100644 index 0000000..35bcea3 --- /dev/null +++ b/infra/monitor/monitor_server.c @@ -0,0 +1,591 @@ +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stddef.h> +#include <pthread.h> +#include <assert.h> +#include <evhttp.h> +#include <event2/event.h> +#include <event2/keyvalq_struct.h> +#include <sys/queue.h> +#include "stellar/log.h" +#include "monitor_private.h" +#include "monitor_cmd_assistant.h" +#include "monitor/monitor_utils.h" +#include "toml/toml.h" +#include "sds/sds.h" +#include "uthash/utlist.h" + +static __thread struct stellar_monitor *__thread_local_stm; +static __thread pthread_t __thread_local_tid; +static __thread pthread_t __stm_libevevt_callback_thread_local_tid; +static __thread struct logger *__stm_thread_local_logger = NULL; + +static void stm_save_thread_local_context(struct stellar_monitor *stm) +{ + __thread_local_stm = stm; + __thread_local_tid = pthread_self(); + __stm_thread_local_logger = stm->logger_ref; +} + +static void stm_connection_close_notify(struct stellar_monitor *stm, UNUSED struct evhttp_connection *evconn) +{ + struct stm_conn_close_cb_manager *ele, *tmp; + DL_FOREACH_SAFE(stm->conn_close_mgr, ele, tmp) + { + ele->cb(&stm->current_conn, ele->arg); + } +} + +static void on_connection_close_cb(UNUSED struct evhttp_connection *ev_conn, UNUSED void *arg) +{ + __stm_libevevt_callback_thread_local_tid = pthread_self(); + struct stellar_monitor *stm = stellar_monitor_get(); + stm->current_conn.current_evconn_ref = ev_conn; + stm_spinlock_lock(stm->lock); + stm_connection_delete(ev_conn); + stm_connection_close_notify(stm, ev_conn); + stm_spinlock_unlock(stm->lock); + char *peer_ip_addr; + uint16_t peer_port; + evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port); + STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "cli connection closed, client %s:%u\n", peer_ip_addr, peer_port); + stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_CLOSE, 1); + stm->current_conn.current_evconn_ref = NULL; +} + +static void stm_command_send_reply_by_cstr(struct evhttp_request *request, int http_status_code, const char *reply, UNUSED void *params) +{ + struct evbuffer *buffer = evbuffer_new(); + evbuffer_add(buffer, reply, strlen(reply)); + evhttp_send_reply(request, http_status_code, "OK", buffer); + evbuffer_free(buffer); +} + +static void stm_command_send_reply(struct evhttp_request *request, struct monitor_reply *reply) +{ + struct evbuffer *buffer = evbuffer_new(); + sds reply_str = monitor_reply_to_string(reply); + evbuffer_add(buffer, reply_str, sdslen(reply_str)); + evhttp_send_reply(request, reply->http_code, reply->http_reason, buffer); + evbuffer_free(buffer); + sdsfree(reply_str); + monitor_reply_free(reply); +} + +static void stm_command_notfound(struct evhttp_request *request, UNUSED void *arg) +{ + struct stellar_monitor *stm = stellar_monitor_get(); + const char *req_str_uri = evhttp_request_get_uri(request); + struct evkeyvalq headers = {}; + evhttp_parse_query(req_str_uri, &headers); + const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY); + + stm_command_send_reply(request, monitor_reply_new_error(error_format_unknown_command, raw_cmd_content)); + STELLAR_LOG_ERROR(stm->logger_ref, STM_LOG_MODULE_NAME, "invlid http uri: %s\r\n", evhttp_request_get_uri(request)); + evhttp_clear_headers(&headers); +} + +static void stm_exec_command(struct stellar_monitor *stm, struct evhttp_request *request, const char *cmd_line) +{ + stm_spinlock_lock(stm->lock); + monitor_cmd_cb *cmd_cb = stm_cmd_assistant_get_cb(stm->aide, cmd_line); + if (NULL == cmd_cb) + { + stm_command_notfound(request, NULL); + stm_spinlock_unlock(stm->lock); + return; + } + void *cmd_user_arg = stm_cmd_assistant_get_user_arg(stm->aide, cmd_line); + int argc; + sds *cmd_argv = sdssplitargs(cmd_line, &argc); + struct monitor_reply *reply = cmd_cb(stm, argc, cmd_argv, cmd_user_arg); + + stm_command_send_reply(request, reply); + sdsfreesplitres(cmd_argv, argc); + stm_spinlock_unlock(stm->lock); +} + +static void stm_new_request_cb(struct evhttp_request *request, UNUSED void *privParams) +{ + __stm_libevevt_callback_thread_local_tid = pthread_self(); + struct stellar_monitor *stm = stellar_monitor_get(); + struct evhttp_connection *ev_conn = evhttp_request_get_connection(request); + stm->current_conn.current_evconn_ref = ev_conn; + stm_spinlock_lock(stm->lock); + stm_connection_insert(ev_conn); + stm_spinlock_unlock(stm->lock); + evhttp_connection_set_closecb(ev_conn, on_connection_close_cb, request); + // evhttp_request_set_error_cb(request, on_request_error_cb); + + const char *req_str_uri = evhttp_request_get_uri(request); + char *peer_ip_addr; + uint16_t peer_port; + evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port); + + STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "new cli request, client:%s:%u, uri: %s\n", peer_ip_addr, peer_port, req_str_uri); + + struct evkeyvalq headers = {}; + evhttp_parse_query(req_str_uri, &headers); + const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY); + if (NULL == raw_cmd_content) + { + stm_command_send_reply_by_cstr(request, HTTP_BADREQUEST, "http uri syntax error\r\n", NULL); + evhttp_clear_headers(&headers); + return; + } + stm_exec_command(stm, request, raw_cmd_content); + evhttp_clear_headers(&headers); +} + +static int stm_event_http_init(struct stellar_monitor *stm) +{ + // Create a new event handler + stm->evt_base = event_base_new(); + // Create a http server using that handler + stm->evt_http_server = evhttp_new(stm->evt_base); + // Limit serving GET requests + evhttp_set_allowed_methods(stm->evt_http_server, EVHTTP_REQ_GET); + + char restful_path[256] = {0}; /* must start with '/' */ + snprintf(restful_path, sizeof(restful_path), "/%s/%s", STM_RESTFUL_VERSION, STM_RESTFUL_RESOURCE); + evhttp_set_cb(stm->evt_http_server, restful_path, stm_new_request_cb, stm->evt_base); + + // Set the callback for anything not recognized + evhttp_set_gencb(stm->evt_http_server, stm_command_notfound, NULL); + if (evhttp_bind_socket(stm->evt_http_server, stm->config->listen_ipaddr, stm->config->listen_port_host_order) != 0) + { + STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "Could not bind to %s:%u\r\n", stm->config->listen_ipaddr, stm->config->listen_port_host_order); + return -1; + } + evhttp_set_timeout(stm->evt_http_server, stm->config->connection_idle_timeout); + STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "accept http uri path: %s\r\n", restful_path); + return 0; +} + +static void *stm_event_main_loop(void *arg) +{ + struct stellar_monitor *stm = (struct stellar_monitor *)arg; + stm_save_thread_local_context(stm); + event_base_dispatch(stm->evt_base); + return NULL; +} + +static void stm_event_http_free(struct stellar_monitor *stm) +{ + event_base_loopbreak(stm->evt_base); + pthread_cancel(stm->evt_main_loop_tid); + pthread_join(stm->evt_main_loop_tid, NULL); + evhttp_free(stm->evt_http_server); + // event_free(stm->ev_timeout); + event_base_free(stm->evt_base); +} + +static void stm_server_set_default_cfg(struct stellar_monitor_config *config) +{ + config->ringbuf_size = STM_RINGBUF_SIZE; + config->connection_idle_timeout = STM_CONNECTION_IDLE_TIMEOUT; + config->cli_request_timeout = STM_REQUEST_TIMEOUT; + config->listen_ipaddr = "0.0.0.0"; + config->listen_port_host_order = STM_SERVER_LISTEN_PORT; + config->data_link_bind_port_host_order = STM_TZSP_UDP_PORT; + config->output_interval_ms = STM_STAT_OUTPUT_INTERVAL_MS; +} + +int stellar_monitor_set_gettime_callback(struct stellar_monitor *stm, int (*gettime_cb)(struct timeval *tv, struct timezone *tz)) +{ + if (NULL == gettime_cb) + { + return -1; + } + stm->gettime_cb = gettime_cb; + return 0; +} + +struct stellar_monitor_config *stellar_monitor_config_new(const char *toml_file) +{ + struct stellar_monitor_config *config = CALLOC(struct stellar_monitor_config, 1); + stm_server_set_default_cfg(config); + int64_t int64_val = 0; + char errbuf[256]; + FILE *fp = NULL; + toml_table_t *root = NULL; + toml_table_t *table = NULL; + toml_raw_t ptr = NULL; + + fp = fopen(toml_file, "r"); + if (fp == NULL) + { + fprintf(stderr, "config file %s open failed, %s", toml_file, strerror(errno)); + goto fail_exit; + } + + root = toml_parse_file(fp, errbuf, sizeof(errbuf)); + if (root == NULL) + { + fprintf(stderr, "config file %s parse failed, %s", toml_file, errbuf); + goto fail_exit; + } + + table = toml_table_in(root, "monitor"); + if (table == NULL) + { + fprintf(stderr, "config file %s missing [monitor]", toml_file); + goto fail_exit; + } + + /* listen_port */ + ptr = toml_raw_in(table, "listen_port"); + if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) + { + if (int64_val < 1 || int64_val > 65535) + { + fprintf(stderr, "invalid monitor.listen_port %ld\n", int64_val); + FREE(config); + goto fail_exit; + } + config->listen_port_host_order = (uint16_t)int64_val; + } + + /* data link bind port */ + ptr = toml_raw_in(table, "data_link_bind_port"); + if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) + { + if (int64_val < 1 || int64_val > 65535) + { + fprintf(stderr, "invalid monitor.data_link_bind_port %ld\n", int64_val); + FREE(config); + goto fail_exit; + } + config->data_link_bind_port_host_order = (uint16_t)int64_val; + } + + /* connection_idle_timeout */ + ptr = toml_raw_in(table, "connection_idle_timeout"); + if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) + { + if (int64_val < 1 || int64_val > 3600) + { + fprintf(stderr, "invalid monitor.connection_idle_timeout %ld, should be [1, 3600]\n", int64_val); + FREE(config); + goto fail_exit; + } + config->connection_idle_timeout = (int)int64_val; + } + + /* cli_request_timeout */ + ptr = toml_raw_in(table, "cli_request_timeout"); + if (ptr != NULL || toml_rtoi(ptr, &int64_val) == 0) + { + if (int64_val < 1 || int64_val > 360) + { + fprintf(stderr, "invalid monitor.cli_request_timeout %ld, , should be [1, 360]\n", int64_val); + FREE(config); + goto fail_exit; + } + config->cli_request_timeout = (int)int64_val; + } + + /* stat */ + ptr = toml_raw_in(table, "stat_output_path"); + if (ptr == NULL || toml_rtos(ptr, &config->output_path) != 0) + { + config->output_path = strdup(STM_STAT_OUTPUT_PATH); + } + + ptr = toml_raw_in(table, "stat_output_interval_ms"); + if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) + { + if (int64_val < 1000 || int64_val > 1000 * 60) + { + fprintf(stderr, "invalid monitor.stat_output_interval_ms %ld, , should be [1, 600000]\n", int64_val); + FREE(config); + goto fail_exit; + } + config->output_interval_ms = (int)int64_val; + } + +fail_exit: + if (root) + { + toml_free(root); + } + if (fp) + { + fclose(fp); + } + return config; +} + +struct stellar_monitor *stellar_monitor_get(void) +{ + if (pthread_self() != __thread_local_tid) + { + assert(0); + // fprintf(stderr, "ERR stellar_monitor_get() failed, caller must in same thread context!\n"); + return NULL; + } + return __thread_local_stm; +} + +// support dynamic register command, independent of the order of initialization +int monitor_register_cmd(struct stellar_monitor *stm, const char *cmd, monitor_cmd_cb *cb, const char *flags, + const char *hint, const char *desc, void *arg) +{ + stm_spinlock_lock(stm->lock); + int ret = stm_cmd_assistant_register_cmd(stm->aide, cmd, cb, arg, flags, hint, desc); + stm_spinlock_unlock(stm->lock); + return ret; +} + +int monitor_register_connection_close_cb(struct stellar_monitor *stm, monitor_connection_close_cb *cb, void *arg) +{ + stm_spinlock_lock(stm->lock); + struct stm_conn_close_cb_manager *ele = CALLOC(struct stm_conn_close_cb_manager, 1); + ele->cb = cb; + ele->arg = arg; + DL_APPEND(stm->conn_close_mgr, ele); + stm_spinlock_unlock(stm->lock); + return 0; +} + +static struct monitor_reply *monitor_cmd_show_brief_cb(struct stellar_monitor *stm) +{ + sds cmd_brief = sdsempty(); + cmd_brief = sdscatfmt(cmd_brief, "%s, %s\r\n", "\"command\"", "description"); + cmd_brief = sdscatfmt(cmd_brief, "-----------------------------\r\n"); + sds cmd_brief_cont = stm_cmd_assistant_list_cmd_brief(stm->aide); + cmd_brief = sdscatsds(cmd_brief, cmd_brief_cont); + struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_brief); + sdsfree(cmd_brief); + sdsfree(cmd_brief_cont); + return reply; +} + +static struct monitor_reply *monitor_cmd_show_verbose_cb(struct stellar_monitor *stm) +{ + sds cmd_verbose = stm_cmd_assistant_list_cmd_verbose(stm->aide); + struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_verbose); + sdsfree(cmd_verbose); + return reply; +} + +static struct monitor_reply *monitor_server_builtin_show_command_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg) +{ + if (argc != 3) + { + return monitor_reply_new_error(error_format_wrong_number_of_args, "show command"); + } + if (stm_strncasecmp_exactly(argv[2], "brief", 5) == 0) + { + return monitor_cmd_show_brief_cb((struct stellar_monitor *)arg); + } + else if (stm_strncasecmp_exactly(argv[2], "verbose", 7) == 0) + { + return monitor_cmd_show_verbose_cb((struct stellar_monitor *)arg); + } + return monitor_reply_new_error(error_format_unknown_arg, argv[2]); +} + +static struct monitor_reply *monitor_server_builtin_ping_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg) +{ + if (argc == 1) + { + return monitor_reply_new_string("pong"); + } + else if (argc == 2) + { + return monitor_reply_new_string("%s", argv[1]); + } + return monitor_reply_new_error(error_format_wrong_number_of_args, "ping"); +} + +static struct monitor_reply *monitor_server_builtin_who_cb(struct stellar_monitor *stm, int argc UNUSED, char *argv[] UNUSED, UNUSED void *arg) +{ + struct stm_connection_manager *conn_mgr = stm->connection_mgr; + struct stm_connection_manager *ele, *tmp; + sds who = sdsempty(); + char timestr[64]; + DL_FOREACH_SAFE(conn_mgr, ele, tmp) + { + struct timeval tv = ele->link_start_time; + struct tm *tm = localtime(&tv.tv_sec); + strftime(timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", tm); + who = sdscatprintf(who, "%s %s:%u", timestr, ele->peer_ipaddr, ele->peer_port_host_order); + if (stm->current_conn.current_evconn_ref == ele->conn) + { + who = sdscat(who, "\033[1m [current]\033[0m"); + } + who = sdscat(who, "\r\n"); + } + sdsIncrLen(who, -2); // delete last \r\n + struct monitor_reply *reply = monitor_reply_new_string("%s", who); + sdsfree(who); + return reply; +} + +static int stm_builtin_cmd_register(struct stellar_monitor *stm) +{ + int ret = 0; + ret += monitor_register_cmd(stm, "show command", monitor_server_builtin_show_command_cb, "readonly", "[ brief|verbose ]", "show all registered commands info", (void *)stm); + assert(ret == 0); + ret += monitor_register_cmd(stm, "who", monitor_server_builtin_who_cb, "readonly", "<cr>", "show who is logged on", (void *)stm); + assert(ret == 0); + ret += monitor_register_cmd(stm, "ping", monitor_server_builtin_ping_cb, "readonly", "[message]", "ping the server", (void *)stm); + assert(ret == 0); + return ret; +} + +struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor) +{ + if (__stm_libevevt_callback_thread_local_tid != pthread_self()) + { + return NULL; + } + return &monitor->current_conn; +} + +int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, uint16_t *peer_port) +{ + if (NULL == conn || conn->current_evconn_ref == NULL) + { + if (peer_ip) + { + *peer_ip = NULL; + } + if (peer_port) + { + *peer_port = 0; + } + return -1; + } + evhttp_connection_get_peer(conn->current_evconn_ref, peer_ip, peer_port); + return 0; +} + +void monitor_free(struct stellar_monitor *stm) +{ + STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "free and exit\n"); + stm_event_http_free(stm); + stm_stat_free(stm->stat); + stm_cmd_assistant_free(stm->aide); + stm_spinlock_free(stm->lock); + if (stm->rpc_ins_array) + { + for (int tid = 0; tid < stm->worker_thread_num; tid++) + { + monitor_rpc_free(stm->rpc_ins_array[tid]); + } + free(stm->rpc_ins_array); + } + + __thread_local_stm = NULL; + FREE(stm->config->output_path); + FREE(stm->config); + FREE(stm); +} + +struct stellar_monitor *monitor_module_to_monitor(struct module *monitor_module) +{ + if (monitor_module == NULL) + { + return NULL; + } + return (struct stellar_monitor *)module_get_ctx(monitor_module); +} + +struct stellar_monitor *stellar_module_get_monitor(struct module_manager *mod_mgr) +{ + assert(mod_mgr); + struct module *monitor_mod = module_manager_get_module(mod_mgr, MONITOR_MODULE_NAME); + return monitor_module_to_monitor(monitor_mod); +} + +void monitor_on_exit(struct module_manager *mod_mgr __attribute__((unused)), struct module *mod) +{ + if (mod) + { + struct stellar_monitor *stm = module_get_ctx(mod); + monitor_free(stm); + module_free(mod); + } +} + +struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh) +{ + struct stellar_monitor *stm = (struct stellar_monitor *)calloc(1, sizeof(struct stellar_monitor)); + stm->logger_ref = logh; + stm->mod_mgr_ref = mod_mgr; + + struct stellar_monitor_config *config = stellar_monitor_config_new(toml_file); + if (NULL == config) + { + STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "get config failed!\n"); + goto fail_exit; + } + stm->config = config; + + stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr); + + stm->lock = stm_spinlock_new(); + stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr); + assert(stm->worker_thread_num > 0); + stm->gettime_cb = gettimeofday; + stm->aide = stm_cmd_assistant_new(); + if (stm_event_http_init(stm) < 0) + { + STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "libevent http server init() failed!\n"); + goto fail_exit; + } + stm->stat = stm_stat_init(stm); + stm_builtin_cmd_register(stm); + stm_save_thread_local_context(stm); + pthread_create(&stm->evt_main_loop_tid, NULL, stm_event_main_loop, (void *)stm); + sds config_print = stm_config_print(config); + STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "config: %s\n", config_print); + sdsfree(config_print); + + stm->rpc_ins_array = (struct monitor_rpc **)calloc(stm->worker_thread_num, sizeof(struct monitor_rpc *)); + for (int tid = 0; tid < stm->worker_thread_num; tid++) + { + stm->rpc_ins_array[tid] = monitor_rpc_new(stm, mod_mgr); + if (stm->rpc_ins_array[tid] == NULL) + { + STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "rpc init failed\n"); + goto fail_exit; + } + } + + stm_save_thread_local_context(stm); + return stm; + +fail_exit: + monitor_free(stm); + return NULL; +} + +struct module *monitor_on_init(struct module_manager *mod_mgr) +{ + assert(mod_mgr); + const char *toml_file = module_manager_get_toml_path(mod_mgr); + assert(toml_file); + struct logger *logh = module_manager_get_logger(mod_mgr); + assert(logh); + + struct stellar_monitor *stm = monitor_new(toml_file, mod_mgr, logh); + struct module *stm_mod = module_new(MONITOR_MODULE_NAME, (void *)stm); + if (stm_mod == NULL) + { + STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "moudule new '%s' fail\n", MONITOR_MODULE_NAME); + monitor_free(stm); + return NULL; + } + + // show_session_enforcer_init(mod_mgr, stm); + // stm_pktdump_enforcer_init(mod_mgr, stm); + + STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "init success\n"); + return stm_mod; +} diff --git a/infra/monitor/monitor_spinlock.c b/infra/monitor/monitor_spinlock.c new file mode 100644 index 0000000..9614803 --- /dev/null +++ b/infra/monitor/monitor_spinlock.c @@ -0,0 +1,72 @@ +// https://www.cs.utexas.edu/~pingali/CS378/2015sp/lectures/Spinlocks%20and%20Read-Write%20Locks.htm +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stddef.h> +#include <pthread.h> + +#if 0 /* use gcc builtin function */ +struct stm_spinlock +{ + long value; +}; + +struct stm_spinlock *stm_spinlock_new(void) +{ + struct stm_spinlock *splock = (struct stm_spinlock *)calloc(1, sizeof(struct stm_spinlock)); + return splock; +} + +void stm_spinlock_lock(struct stm_spinlock *splock) +{ + while (__sync_lock_test_and_set(&splock->value, 1)) + { + } +} + +void stm_spinlock_unlock(struct stm_spinlock *splock) +{ + __sync_lock_release(&splock->value); +} + +void stm_spinlock_free(struct stm_spinlock *splock) +{ + if (splock) + { + free(splock); + } +} +#else /* pthread spin lock */ +struct stm_spinlock +{ + pthread_spinlock_t lock_ins; +}; + +struct stm_spinlock *stm_spinlock_new(void) +{ + struct stm_spinlock *splock = (struct stm_spinlock *)calloc(1, sizeof(struct stm_spinlock)); + pthread_spin_init(&splock->lock_ins, PTHREAD_PROCESS_PRIVATE); + return splock; +} + +void stm_spinlock_lock(struct stm_spinlock *splock) +{ + pthread_spin_lock(&splock->lock_ins); +} + +void stm_spinlock_unlock(struct stm_spinlock *splock) +{ + pthread_spin_unlock(&splock->lock_ins); +} + +void stm_spinlock_free(struct stm_spinlock *splock) +{ + if (splock) + { + pthread_spin_destroy(&splock->lock_ins); + free(splock); + } +} +#endif
\ No newline at end of file diff --git a/infra/monitor/monitor_spinlock.h b/infra/monitor/monitor_spinlock.h new file mode 100644 index 0000000..fb5acb2 --- /dev/null +++ b/infra/monitor/monitor_spinlock.h @@ -0,0 +1,8 @@ +#pragma once + +struct stm_spinlock; + +struct stm_spinlock *stm_spinlock_new(void); +void stm_spinlock_lock(struct stm_spinlock *splock); +void stm_spinlock_unlock(struct stm_spinlock *splock); +void stm_spinlock_free(struct stm_spinlock *splock); diff --git a/infra/monitor/monitor_stat.c b/infra/monitor/monitor_stat.c new file mode 100644 index 0000000..2216b09 --- /dev/null +++ b/infra/monitor/monitor_stat.c @@ -0,0 +1,76 @@ +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stddef.h> +#include <arpa/inet.h> +#include <getopt.h> +#include <evhttp.h> +#include "monitor_private.h" +#include <fieldstat/fieldstat_easy.h> + +static const char *stm_stat_field_name[] = { + "connection_new", + "connection_close", + "request_succ", + "request_err", + "response_succ", + "response_err", + NULL, +}; + +long long stm_get_stat_count(struct stm_stat *stat, enum stm_stat_type type) +{ + if ((int)type < 0 || type >= STM_STAT_MAX) + { + return 0; + } + return stat->counters[type].count; +} + +long long stm_get_stat_bytes(struct stm_stat *stat, enum stm_stat_type type) +{ + if ((int)type < 0 || type >= STM_STAT_MAX) + { + return 0; + } + return stat->counters[type].bytes; +} + +void stm_stat_update(struct stm_stat *stat, int thread_idx, enum stm_stat_type type, long long value) +{ + if ((int)type < 0 || type >= STM_STAT_MAX) + { + return; + } + fieldstat_easy_counter_incrby(stat->fs4_ins, thread_idx, stat->counters[type].counter_id, NULL, 0, value); +} + +struct stm_stat *stm_stat_init(struct stellar_monitor *stm) +{ + const struct stellar_monitor_config *config = stm->config; + assert(sizeof(stm_stat_field_name) / sizeof(stm_stat_field_name[0]) == STM_STAT_MAX + 1); + struct stm_stat *stat = CALLOC(struct stm_stat, 1); + /* worker thread count + 1, reserved for libevent callback thread context */ + stat->fs4_ins = fieldstat_easy_new(stm->worker_thread_num + 1, "monitor", NULL, 0); + for (int i = 0; stm_stat_field_name[i] != NULL; i++) + { + stat->counters[i].counter_id = fieldstat_easy_register_counter(stat->fs4_ins, stm_stat_field_name[i]); + } + fieldstat_easy_enable_auto_output(stat->fs4_ins, config->output_path, MAX(config->output_interval_ms / 1000, 1)); + return stat; +} + +void stm_stat_free(struct stm_stat *stat) +{ + if (NULL == stat) + { + return; + } + if (stat->fs4_ins) + { + fieldstat_easy_free(stat->fs4_ins); + } + FREE(stat); +}
\ No newline at end of file diff --git a/infra/monitor/monitor_transaction.c b/infra/monitor/monitor_transaction.c new file mode 100644 index 0000000..0f9aba8 --- /dev/null +++ b/infra/monitor/monitor_transaction.c @@ -0,0 +1,83 @@ +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stddef.h> +#include <pthread.h> +#include "uthash/utlist.h" +#include "monitor_private.h" + +/******************************************** connection manager *****************************************/ +struct stm_connection_manager *stm_connection_insert(struct evhttp_connection *evconn) +{ + struct stellar_monitor *stm = stellar_monitor_get(); + struct stm_connection_manager *conn_mgr = stm->connection_mgr; + struct stm_connection_manager *ele, *tmp; + DL_FOREACH_SAFE(conn_mgr, ele, tmp) // check if current connection already exist + { + if (ele->conn == evconn) + { + stm->gettime_cb(&ele->last_active_time, NULL); + return ele; + } + } + + stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_NEW, 1); + struct stm_connection_manager *new_conn = (struct stm_connection_manager *)calloc(1, sizeof(struct stm_connection_manager)); + char *tmp_ip_addr; + uint16_t tmp_port; + evhttp_connection_get_peer(evconn, &tmp_ip_addr, &tmp_port); + if (tmp_ip_addr) + { + strncpy(new_conn->peer_ipaddr, tmp_ip_addr, INET6_ADDRSTRLEN - 1); + } + new_conn->peer_port_host_order = tmp_port; + stm->gettime_cb(&new_conn->link_start_time, NULL); + new_conn->last_active_time = new_conn->link_start_time; + new_conn->conn = evconn; + DL_APPEND(stm->connection_mgr, new_conn); + return new_conn; +} + +void stm_connection_delete(struct evhttp_connection *evconn) +{ + struct stellar_monitor *stm = stellar_monitor_get(); + struct stm_connection_manager *conn_mgr = stm->connection_mgr; + struct stm_connection_manager *ele, *tmp; + DL_FOREACH_SAFE(conn_mgr, ele, tmp) + { + if (ele->conn == evconn) + { + DL_DELETE(conn_mgr, ele); + FREE(ele); + } + } + stm->connection_mgr = conn_mgr; +} + +void stm_connection_update(struct stm_connection_manager *conn_mgr, const struct evhttp_connection *evconn) +{ + struct stellar_monitor *stm = stellar_monitor_get(); + struct stm_connection_manager *ele, *tmp; + DL_FOREACH_SAFE(conn_mgr, ele, tmp) + { + if (ele->conn == evconn) + { + stm->gettime_cb(&ele->last_active_time, NULL); + } + } +} + +const struct stm_connection_manager *stm_connection_search(const struct stm_connection_manager *conn_mgr_head, const struct evhttp_connection *evconn) +{ + const struct stm_connection_manager *ele, *tmp; + DL_FOREACH_SAFE(conn_mgr_head, ele, tmp) + { + if (ele->conn == evconn) + { + return ele; + } + } + return NULL; +} diff --git a/infra/monitor/monitor_utils.c b/infra/monitor/monitor_utils.c new file mode 100644 index 0000000..5280feb --- /dev/null +++ b/infra/monitor/monitor_utils.c @@ -0,0 +1,672 @@ +#include "sds/sds.h" +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <ctype.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <netinet/udp.h> +#include <netinet/ip.h> +#include <netinet/ip6.h> +#include <netinet/in.h> +#include <threads.h> + +#ifdef __cplusplus +extern "C" +{ +#endif +#include "monitor_private.h" +#include "stellar/session.h" +#include "tuple/tuple.h" + + struct stm_key_value *stm_cmd_key_value_new(void) + { + struct stm_key_value *kv = (struct stm_key_value *)calloc(1, sizeof(struct stm_key_value)); + kv->tuple_num = 0; + kv->tuple = NULL; + return kv; + } + + void stm_cmd_key_value_append(struct stm_key_value **kv, const char *key, const char *value) + { + if (NULL == *kv) + { + *kv = stm_cmd_key_value_new(); + } + struct stm_key_value *new_kv = *kv; + new_kv->tuple = (struct stm_key_value_tuple *)realloc(new_kv->tuple, (new_kv->tuple_num + 1) * sizeof(struct stm_key_value_tuple)); + new_kv->tuple[new_kv->tuple_num].key = strdup(key); + new_kv->tuple[new_kv->tuple_num].value = strdup(value); + new_kv->tuple_num++; + *kv = new_kv; + } + + void stm_cmd_key_value_free(struct stm_key_value *kv) + { + if (NULL == kv) + { + return; + } + for (int i = 0; i < kv->tuple_num; i++) + { + FREE(kv->tuple[i].key); + FREE(kv->tuple[i].value); + } + FREE(kv->tuple); + FREE(kv); + } + + void stm_cmd_request_free(struct stm_cmd_request *req) + { + FREE(req); + } + + int stm_strncasecmp_exactly(const char *s1, const char *s2, size_t n) + { + if (NULL == s1 || NULL == s2) + { + return -1; + } + size_t len1 = strlen(s1); + size_t len2 = strlen(s2); + + if (len1 != len2 || len1 != n) + { + return -1; + } + return strncasecmp(s1, s2, n); + } + + const char *stm_session_state_ntop(enum session_state state) + { + switch (state) + { + case SESSION_STATE_OPENING: + return "opening"; + break; + case SESSION_STATE_ACTIVE: + return "active"; + break; + + case SESSION_STATE_CLOSING: + return "closing"; + break; + + default: + break; + } + return "unknown"; + } + + const char *stm_session_flow_dir_ntop(uint32_t dir) + { + if (SESSION_SEEN_C2S_FLOW == dir) + { + return "C2S"; + } + else if (SESSION_SEEN_S2C_FLOW == dir) + { + return "S2C"; + } + else if ((SESSION_SEEN_C2S_FLOW | SESSION_SEEN_S2C_FLOW) == dir) + { + return "BIDIRECTION"; + } + return "UNKNOWN"; + } + + int stm_time_range_pton(const char *time_str, time_t now, time_t time_range[2]) + { + const char *delim = "-"; + char *save_ptr; + const char *time_str_min_example = "last-1-days"; + if (NULL == time_str || NULL == time_range) + { + return -1; + } + if (strlen(time_str) < strlen(time_str_min_example)) + { + return -1; + } + char *local_time_str = strdup(time_str); + + char *fix_prefix = strtok_r(local_time_str, delim, &save_ptr); + if (stm_strncasecmp_exactly(fix_prefix, "last", 4) != 0) + { + free(local_time_str); + return -1; + } + + char *number_string = strtok_r(NULL, delim, &save_ptr); + if (NULL == number_string) + { + free(local_time_str); + return -1; + } + long number = strtol(number_string, NULL, 10); + if (number <= 0 || number > 3600) + { + free(local_time_str); + return -1; + } + + char *unit_string = strtok_r(NULL, delim, &save_ptr); + if (NULL == unit_string) + { + free(local_time_str); + return -1; + } + long long multiple_second = 1; + if (stm_strncasecmp_exactly(unit_string, "seconds", 7) == 0) + { + multiple_second = 1; + } + else if (stm_strncasecmp_exactly(unit_string, "minutes", 7) == 0) + { + multiple_second = 60; + } + else if (stm_strncasecmp_exactly(unit_string, "hours", 5) == 0) + { + multiple_second = 60 * 60; + } + else if (stm_strncasecmp_exactly(unit_string, "days", 4) == 0) + { + multiple_second = 60 * 60 * 24; + } + else + { + free(local_time_str); + return -1; + } + + if ((long long)now < number * multiple_second) + { + time_range[0] = 0; + } + else + { + time_range[0] = now - number * multiple_second; + } + time_range[1] = now; + while (strtok_r(NULL, delim, &save_ptr)) + { + } + free(local_time_str); + return 0; + } + + int stm_time_in_range(time_t t, const time_t time_range[2]) + { + if (NULL == time_range) + { + return 0; + } + if (t >= time_range[0] && t <= time_range[1]) + { + return 1; + } + return 0; + } + + uint32_t stm_inet_pton(const char *ipstr, void *ipv4_value, void *ipv6_value) + { + unsigned int tmp_ipv4; + struct in6_addr tmp_ipv6; + if (NULL == ipstr || NULL == ipv4_value || NULL == ipv6_value) + { + return 0; + } + if (inet_pton(AF_INET, ipstr, &tmp_ipv4) == 1) + { + memcpy(ipv4_value, &tmp_ipv4, sizeof(int)); + return AF_INET; + } + if (inet_pton(AF_INET6, ipstr, &tmp_ipv6) == 1) + { + memcpy(ipv6_value, &tmp_ipv6, sizeof(struct in6_addr)); + return AF_INET6; + } + return 0; + } + + /** + * Calculate a 128-bit mask given a network prefix. + */ + void in6_addr_mask(struct in6_addr *mask, uint8_t bits) + { + for (uint8_t i = 0; i < sizeof(mask->s6_addr); i++) + { + mask->s6_addr[i] = bits ? (uint8_t)0xFF << (8 - (bits > 8 ? 8 : bits)) : 0; + + if (bits < 8) + bits = 0; + else + bits -= 8; + } + } + + /** + * Calculate the first address in a network given a mask. + */ + void in6_addr_network(struct in6_addr *network, const struct in6_addr *addr, const struct in6_addr *mask) + { + for (uint8_t i = 0; i < sizeof(network->s6_addr); i++) + { + network->s6_addr[i] = addr->s6_addr[i] & mask->s6_addr[i]; + } + } + + /** + * Calculate the last address in a network given a mask. + */ + void in6_addr_end(struct in6_addr *end, const struct in6_addr *addr, const struct in6_addr *mask) + { + for (uint8_t i = 0; i < sizeof(end->s6_addr); i++) + { + end->s6_addr[i] = (addr->s6_addr[i] & mask->s6_addr[i]) | ~mask->s6_addr[i]; + } + } + + int stm_ipv4_cidr_to_range(uint32_t ipaddr, uint32_t ipmask, uint32_t iprange[2]) + { + uint32_t network_addr = ipaddr & ipmask; + uint32_t broadcast_addr = (ipaddr & ipmask) | ~ipmask; + iprange[0] = network_addr; + iprange[1] = broadcast_addr; + return 0; + } + + int stm_ipv6_cidr_to_range(const struct in6_addr *ipaddr, const struct in6_addr *ipmask, struct in6_addr iprange[2]) + { + struct in6_addr network_addr = {}; + struct in6_addr broadcast_addr = {}; + in6_addr_network(&network_addr, ipaddr, ipmask); + in6_addr_end(&broadcast_addr, ipaddr, ipmask); + memcpy(&iprange[0], &network_addr, sizeof(struct in6_addr)); + memcpy(&iprange[1], &broadcast_addr, sizeof(struct in6_addr)); + + return 0; + } + + uint32_t stm_ip_cidr_pton(const char *ipcidr, void *ipv4_value, void *ipv4_mask, void *ipv6_value, void *ipv6_mask) + { + int ipver = 0; + const char *delim = "/"; + char *save_ptr; + + if (NULL == ipcidr || NULL == ipv4_value || NULL == ipv4_mask || NULL == ipv6_value || NULL == ipv6_mask) + { + return 0; + } + + if (strchr(ipcidr, '/') == NULL) + { + return 0; + } + char *local_ip_cidr = strdup(ipcidr); + + char *ipaddr = strtok_r(local_ip_cidr, delim, &save_ptr); + ipver = stm_inet_pton(ipaddr, ipv4_value, ipv6_value); + if (ipver != AF_INET && ipver != AF_INET6) + { + free(local_ip_cidr); + return 0; + } + + char *cidr = strtok_r(NULL, delim, &save_ptr); + if (NULL == cidr) + { + free(local_ip_cidr); + return 0; + } + int cidr_num = atoi(cidr); + if (ipver == AF_INET) + { + if (cidr_num <= 0 || cidr_num > 32) + { + free(local_ip_cidr); + return 0; + } + uint32_t mask = 0; + for (int i = 0; i < cidr_num; i++) + { + mask |= (uint32_t)1 << (31 - i); + } + mask = ntohl(mask); + memcpy(ipv4_mask, &mask, sizeof(int)); + } + else if (ipver == AF_INET6) + { + if (cidr_num <= 0 || cidr_num > 128) + { + free(local_ip_cidr); + return -1; + } + struct in6_addr mask = {}; + for (int i = 0; i < cidr_num; i++) + { + mask.s6_addr[i / 8] |= 1 << (7 - i % 8); + } + memcpy(ipv6_mask, &mask, sizeof(struct in6_addr)); + } + + while (strtok_r(NULL, delim, &save_ptr)) + ; + + free(local_ip_cidr); + return ipver; + } + + void stm_mem_fill_rand(char *buf, size_t len, unsigned char range_min, unsigned char range_max) + { + unsigned char *p = (unsigned char *)buf; + for (size_t i = 0; i < len; i++) + { + p[i] = (rand() % range_min + 1) + (range_max - range_min); + } + } + + int stm_string_isdigit(const char *str) + { + if (NULL == str || strlen(str) == 0) + { + return 0; + } + for (size_t i = 0; i < strlen(str); i++) + { + if (!isdigit(str[i])) + { + return 0; + } + } + return 1; + } + + /* + * input("hello, world!", "hello", "hi", &output) -> output = "hi, world!" + * return: the number of sub string have been replaced. + */ + int stm_replace_str(const char *raw, const char *search, const char *replace, char **outline) + { + const char *p = NULL; + int search_len = strlen(search); + int replace_len = strlen(replace); + int raw_len = strlen(raw); + + p = strstr(raw, search); + if (p == NULL) + { + return 0; + } + const char *remain_ptr = p + search_len; + char *new_line = (char *)calloc(1, replace_len + (raw_len - search_len) + 1); + strcpy(new_line, replace); + strcpy(new_line + replace_len, remain_ptr); + + *outline = new_line; + return 1; + } + + int stm_timeout(struct timeval start, struct timeval end, int timeout_sec) + { + return (end.tv_sec * 1000 * 1000 + end.tv_usec) - (start.tv_sec * 1000 * 1000 + start.tv_usec) >= timeout_sec * 1000 * 1000; + } + + struct monitor_reply *monitor_reply_nil(void) + { + struct monitor_reply *reply = CALLOC(struct monitor_reply, 1); + reply->type = MONITOR_REPLY_NIL; + reply->http_code = HTTP_OK; + reply->http_reason = "OK"; + return reply; + } + + /* string should without "\r\n", will add "\r\n" in monitor_reply_to_string() */ + struct monitor_reply *monitor_reply_new_string(const char *format, ...) + { + struct monitor_reply *reply = CALLOC(struct monitor_reply, 1); + reply->type = MONITOR_REPLY_STRING; + reply->http_code = HTTP_OK; + reply->http_reason = "OK"; + va_list ap; + va_start(ap, format); + reply->str = sdscatvprintf(sdsempty(), format, ap); + reply->len = strlen(reply->str); + va_end(ap); + return reply; + } + + /* string should without "\r\n", will add "\r\n" in monitor_reply_to_string() */ + struct monitor_reply *monitor_reply_new_error(const char *format, ...) + { + struct monitor_reply *reply = CALLOC(struct monitor_reply, 1); + reply->type = MONITOR_REPLY_ERROR; + reply->http_code = HTTP_BADREQUEST; + reply->http_reason = "ERROR"; + va_list ap; + va_start(ap, format); + reply->str = sdscatvprintf(sdsempty(), format, ap); + reply->len = strlen(reply->str); + va_end(ap); + return reply; + } + + struct monitor_reply *monitor_reply_new_status(const char *format, ...) + { + struct monitor_reply *reply = CALLOC(struct monitor_reply, 1); + reply->type = MONITOR_REPLY_STATUS; + reply->http_code = HTTP_OK; + reply->http_reason = "OK"; + va_list ap; + va_start(ap, format); + reply->str = sdscatvprintf(sdsempty(), format, ap); + reply->len = strlen(reply->str); + va_end(ap); + return reply; + } + + struct monitor_reply *monitor_reply_new_integer(long long integer) + { + struct monitor_reply *reply = CALLOC(struct monitor_reply, 1); + reply->type = MONITOR_REPLY_INTEGER; + reply->http_code = HTTP_OK; + reply->http_reason = "OK"; + reply->integer = integer; + return reply; + } + + struct monitor_reply *monitor_reply_new_double(double dval) + { + struct monitor_reply *reply = CALLOC(struct monitor_reply, 1); + reply->type = MONITOR_REPLY_DOUBLE; + reply->http_code = HTTP_OK; + reply->http_reason = "OK"; + reply->dval = dval; + return reply; + } + + sds monitor_reply_to_string(const struct monitor_reply *reply) + { + sds res = sdsempty(); + switch (reply->type) + { + case MONITOR_REPLY_INTEGER: + res = sdscatprintf(res, "(integer) %lld\r\n", reply->integer); + break; + case MONITOR_REPLY_DOUBLE: + res = sdscatprintf(res, "(double) %f\r\n", reply->dval); + break; + case MONITOR_REPLY_STRING: + case MONITOR_REPLY_STATUS: + res = sdscatlen(res, reply->str, reply->len); + res = sdscat(res, "\r\n"); + break; + case MONITOR_REPLY_NIL: + res = sdscat(res, "(nil)\r\n"); + break; + case MONITOR_REPLY_ERROR: + res = sdscatprintf(res, "(error) %s\r\n", reply->str); + break; + default: + break; + } + return res; + } + + void monitor_reply_free(struct monitor_reply *reply) + { + switch (reply->type) + { + case MONITOR_REPLY_STRING: + case MONITOR_REPLY_ERROR: + case MONITOR_REPLY_STATUS: + sdsfree(reply->str); + reply->str = NULL; + break; + default: + break; + } + free(reply); + } + + static struct monitor_cli_args *monitor_util_search_cmd_args(const char *opt_name, const struct monitor_cli_args cli_args[], size_t cli_args_array_size) + { + if (NULL == cli_args) + { + return NULL; + } + for (size_t i = 0; i < cli_args_array_size; i++) + { + if (stm_strncasecmp_exactly(opt_name, cli_args[i].short_opt, strlen(cli_args[i].short_opt)) == 0 || stm_strncasecmp_exactly(opt_name, cli_args[i].long_opt, strlen(cli_args[i].long_opt)) == 0) + { + return (struct monitor_cli_args *)&cli_args[i]; + } + } + return NULL; + } + + sds monitor_util_copy_arg_value(int argc, const char *argv[], int *copy_args_num) + { + sds res = sdsempty(); + int num = 0; + for (int i = 0; i < argc; i++) + { + if ((strncmp(argv[i], "-", 1) == 0) || (strncmp(argv[i], "--", 2) == 0)) + { + break; + } + else + { + res = sdscat(res, argv[i]); + if ((i + 1 < argc) && (strncmp(argv[i + 1], "-", 1) != 0) && (strncmp(argv[i + 1], "--", 2) != 0)) // not the last one + { + res = sdscat(res, " "); + } + num++; + } + } + *copy_args_num = num; + return res; + } + + int monitor_util_parse_cmd_args(int argc, const char *argv[], struct monitor_cli_args cli_args[], size_t cli_args_array_size) + { + if (argc <= 0 || NULL == argv || NULL == cli_args) + { + return -1; + } + int parse_stage = 0; // 0: arg; 1: value + struct monitor_cli_args *one_arg = NULL; + for (int i = 1; i < argc;) // skip program self name + { + if (0 == parse_stage) + { + one_arg = monitor_util_search_cmd_args(argv[i], cli_args, cli_args_array_size); + if (NULL == one_arg) + { + fprintf(stderr, "unknown option: %s\n", argv[i]); + return -1; + } + if (one_arg->require_arg_value && (i + 1 >= argc)) + { + fprintf(stderr, "option requires value: %s\n", argv[i]); + return -1; + } + parse_stage = 1; + i += 1; + } + else + { + if (NULL == one_arg) + { + fprintf(stderr, "unknown option: %s\n", argv[i]); + return -1; + } + int copy_args_num = 0; + if (one_arg->value_is_multi_words) + { + one_arg->value = monitor_util_copy_arg_value(argc - i, &argv[i], ©_args_num); + } + else + { + one_arg->value = sdsnew(argv[i]); + copy_args_num = 1; + } + i += copy_args_num; + parse_stage = 0; + one_arg = NULL; + } + } + return 0; + } + + sds stm_config_print(const struct stellar_monitor_config *config) + { + sds res = sdsempty(); + res = sdscatprintf(res, "cli_request_timeout: %ds, ", config->cli_request_timeout); + res = sdscatprintf(res, "connection_idle_timeout: %ds, ", config->connection_idle_timeout); + res = sdscatprintf(res, "listen_ip_addr: %s, ", config->listen_ipaddr); + res = sdscatprintf(res, "listen_port %u, ", config->listen_port_host_order); + res = sdscatprintf(res, "data_link_bind_port: %u, ", config->data_link_bind_port_host_order); + res = sdscatprintf(res, "stat_output_path: %s, ", config->output_path); + res = sdscatprintf(res, "stat_output_interval_ms: %dms ", config->output_interval_ms); + return res; + } + + char *stm_http_url_encode(const char *originalText) + { + // allocate memory for the worst possible case (all characters need to be encoded) + char *encodedText = (char *)malloc(sizeof(char) * strlen(originalText) * 3 + 1); + const char *hex = "0123456789abcdef"; + int pos = 0; + for (size_t i = 0; i < strlen(originalText); i++) + { + if (('a' <= originalText[i] && originalText[i] <= 'z') || ('A' <= originalText[i] && originalText[i] <= 'Z') || ('0' <= originalText[i] && originalText[i] <= '9')) + { + encodedText[pos++] = originalText[i]; + } + else if (originalText[i] == ' ') + { + encodedText[pos++] = '%'; + encodedText[pos++] = hex[originalText[i] >> 4]; + encodedText[pos++] = hex[originalText[i] & 15]; + } + /* todo: other characters ? */ + else + { + encodedText[pos++] = originalText[i]; + } + } + encodedText[pos] = '\0'; + return encodedText; + } + + const char *stm_get0_readable_session_addr(const struct tuple6 *addr, char *addr_buf, size_t max_buf_len) + { + tuple4_to_str((struct tuple4 *)addr, addr_buf, max_buf_len); + return addr_buf; + } + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/infra/monitor/monitor_utils.h b/infra/monitor/monitor_utils.h new file mode 100644 index 0000000..2ccecd2 --- /dev/null +++ b/infra/monitor/monitor_utils.h @@ -0,0 +1,30 @@ +#pragma once +#include <stdint.h> +#include <stddef.h> +#include <time.h> +#include <netinet/in.h> +#include "stellar/session.h" +#include "tuple/tuple.h" +#ifdef __cplusplus +extern "C" +{ +#endif + int stm_replace_str(const char *raw, const char *search, const char *replace, char **outline); + int stm_strncasecmp_exactly(const char *s1, const char *s2, size_t n); + struct stm_cmd_reply *stm_build_reply_error(const char *format, ...); + struct stm_cmd_reply *stm_cmd_reply_dup(const struct stm_cmd_reply *src); + const char *stm_session_flow_dir_ntop(uint32_t dir); + int stm_time_range_pton(const char *time_str, time_t now, time_t time_range[2]); + int stm_time_in_range(time_t t, const time_t time_range[2]); + uint32_t stm_inet_pton(const char *ipstr, void *ipv4_value, void *ipv6_value); + uint32_t stm_ip_cidr_pton(const char *ipcidr, void *ipv4_value, void *ipv4_mask, void *ipv6_value, void *ipv6_mask); + int stm_ipv4_cidr_to_range(uint32_t ipaddr, uint32_t ipmask, uint32_t iprange[2]); + int stm_ipv6_cidr_to_range(const struct in6_addr *ipaddr, const struct in6_addr *ipmask, struct in6_addr iprange[2]); + void stm_mem_fill_rand(char *buf, size_t len, unsigned char range_min, unsigned char range_max); + int stm_string_isdigit(const char *str); + int stm_timeout(struct timeval start, struct timeval end, int timeout_sec); + const char *stm_session_state_ntop(enum session_state state); + const char *stm_get0_readable_session_addr(const struct tuple6 *addr, char *addr_buf, size_t max_buf_len); +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/infra/monitor/version.map b/infra/monitor/version.map new file mode 100644 index 0000000..5958f4c --- /dev/null +++ b/infra/monitor/version.map @@ -0,0 +1,10 @@ +VERS_3.0{ +global: + extern "C" { + stellar_monitor_*; + monitor_*; + stm_*; + stellar_module_get_monitor; + }; + local: *; +}; diff --git a/infra/packet_manager/packet_internal.h b/infra/packet_manager/packet_internal.h index 1dbfab2..d9b3fe9 100644 --- a/infra/packet_manager/packet_internal.h +++ b/infra/packet_manager/packet_internal.h @@ -8,7 +8,7 @@ extern "C" #include <stdbool.h> #include <sys/queue.h> -#include "tuple.h" +#include "tuple/tuple.h" #include "stellar/packet.h" #define PACKET_MAX_LAYERS 32 diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 21fb779..3691805 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -593,5 +593,15 @@ void session_manager_on_thread_exit(struct module_manager *mod_mgr, int thread_i { struct session_manager *sess_mgr = module_get_ctx(mod); assert(sess_mgr); + assert(thread_id < (int)sess_mgr->cfg->thread_num); + session_manager_clean(sess_mgr, thread_id); +} + +// temp add for show session command +struct session_manager_rte *session_manager_get_runtime(struct session_manager *sess_mgr, uint16_t thread_id) +{ + assert(sess_mgr); + assert(thread_id < sess_mgr->cfg->thread_num); + return sess_mgr->rte[thread_id]; }
\ No newline at end of file diff --git a/infra/session_manager/session_manager_rte.h b/infra/session_manager/session_manager_rte.h index de8c09c..f04dceb 100644 --- a/infra/session_manager/session_manager_rte.h +++ b/infra/session_manager/session_manager_rte.h @@ -7,6 +7,7 @@ extern "C" #include "tuple.h" #include "stellar/session.h" +#include "session_manager_cfg.h" enum session_scan_flags { diff --git a/infra/version.map b/infra/version.map index c66ef63..616f18b 100644 --- a/infra/version.map +++ b/infra/version.map @@ -74,5 +74,7 @@ global: lpi_plus_exit; lpi_plus_appid_subscribe; + monitor_on_init; + monitor_on_exit; local: *; }; |
