summaryrefslogtreecommitdiff
path: root/infra/monitor
diff options
context:
space:
mode:
authorlijia <[email protected]>2024-10-18 16:47:51 +0800
committerlijia <[email protected]>2024-11-07 18:30:58 +0800
commite734af76d81b07090c618b1c4af3b2fdd6b592f3 (patch)
treec9b894fb0eaa9c56bd5f04bfab5628a97592091d /infra/monitor
parent99a68d5c9efe500ab339165a2af515a0a7355ada (diff)
rebase onto develop-2.0
Diffstat (limited to 'infra/monitor')
-rw-r--r--infra/monitor/CMakeLists.txt22
-rw-r--r--infra/monitor/enforcer/CMakeLists.txt11
-rw-r--r--infra/monitor/enforcer/show_session_enforcer.c818
-rw-r--r--infra/monitor/monitor_cmd_assistant.c537
-rw-r--r--infra/monitor/monitor_cmd_assistant.h60
-rw-r--r--infra/monitor/monitor_private.h331
-rw-r--r--infra/monitor/monitor_ringbuf.c137
-rw-r--r--infra/monitor/monitor_ringbuf.h30
-rw-r--r--infra/monitor/monitor_rpc.c115
-rw-r--r--infra/monitor/monitor_rpc.h10
-rw-r--r--infra/monitor/monitor_server.c591
-rw-r--r--infra/monitor/monitor_spinlock.c72
-rw-r--r--infra/monitor/monitor_spinlock.h8
-rw-r--r--infra/monitor/monitor_stat.c76
-rw-r--r--infra/monitor/monitor_transaction.c83
-rw-r--r--infra/monitor/monitor_utils.c672
-rw-r--r--infra/monitor/monitor_utils.h30
-rw-r--r--infra/monitor/version.map10
18 files changed, 3613 insertions, 0 deletions
diff --git a/infra/monitor/CMakeLists.txt b/infra/monitor/CMakeLists.txt
new file mode 100644
index 0000000..0588f8c
--- /dev/null
+++ b/infra/monitor/CMakeLists.txt
@@ -0,0 +1,22 @@
+# add_subdirectory(enforcer)
+
+add_library(monitor
+ monitor_cmd_assistant.c
+ monitor_transaction.c
+ monitor_server.c
+ monitor_utils.c
+ monitor_stat.c
+ monitor_spinlock.c
+ monitor_ringbuf.c
+ monitor_rpc.c
+)
+
+include_directories(${CMAKE_SOURCE_DIR}/include/)
+include_directories(${CMAKE_SOURCE_DIR}/deps)
+include_directories(${CMAKE_SOURCE_DIR}/infra)
+target_include_directories(monitor PUBLIC ${CMAKE_CURRENT_LIST_DIR})
+
+set_target_properties(monitor PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/infra/monitor/version.map")
+set_target_properties(monitor PROPERTIES PREFIX "")
+target_link_libraries(monitor toml sds linenoise tuple session_manager libevent-static libevent-static cjson-static ringbuf)
+target_link_options(monitor PRIVATE -rdynamic) \ No newline at end of file
diff --git a/infra/monitor/enforcer/CMakeLists.txt b/infra/monitor/enforcer/CMakeLists.txt
new file mode 100644
index 0000000..2691059
--- /dev/null
+++ b/infra/monitor/enforcer/CMakeLists.txt
@@ -0,0 +1,11 @@
+add_library(monitor_enforcer STATIC show_session_enforcer.c )
+include_directories(${CMAKE_SOURCE_DIR}/include/)
+include_directories(${CMAKE_SOURCE_DIR}/deps)
+include_directories(${CMAKE_SOURCE_DIR}/deps/uthash)
+include_directories(${CMAKE_SOURCE_DIR}/deps/timeout)
+include_directories(${CMAKE_SOURCE_DIR}/infra)
+include_directories(${CMAKE_SOURCE_DIR}/infra/tuple/)
+include_directories(${CMAKE_SOURCE_DIR}/infra/packet_manager)
+include_directories(${CMAKE_SOURCE_DIR}/infra/tcp_reassembly)
+
+target_include_directories(monitor_enforcer PUBLIC ${CMAKE_CURRENT_LIST_DIR}) \ No newline at end of file
diff --git a/infra/monitor/enforcer/show_session_enforcer.c b/infra/monitor/enforcer/show_session_enforcer.c
new file mode 100644
index 0000000..f72218a
--- /dev/null
+++ b/infra/monitor/enforcer/show_session_enforcer.c
@@ -0,0 +1,818 @@
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <assert.h>
+#include "stellar/session.h"
+#include "stellar/monitor.h"
+#include "session_manager/session_manager_rte.h"
+#include "monitor/monitor_utils.h"
+#include "monitor/monitor_rpc.h"
+#include "sds/sds.h"
+#include "session_manager/session_internal.h"
+
+// temp add
+extern struct session_manager_rte *session_manager_get_runtime(struct session_manager *sess_mgr, uint16_t thread_id);
+
+#define SHOW_SESSION_BRIEF_LIMIT_DEFAULT 10
+#define SHOW_SESSION_BRIEF_LIMIT_MAX 1000
+#define SHOW_SESSION_BRIEF_SCAN_COUNT_DEFAULT 100
+
+/* show session brief */
+struct stm_show_session_brief_opt
+{
+#define SESSION_SCAN_SNET (1 << 25)
+#define SESSION_SCAN_DNET (1 << 26)
+#define SESSION_SCAN_ID (1 << 27)
+#define SESSION_SCAN_CURSOR (1 << 28)
+#define SESSION_SCAN_COUNT (1 << 29)
+#define SESSION_SCAN_LIMIT (1 << 30)
+ struct session_scan_opts scan_opt;
+ uint32_t limit;
+};
+
+/* show session detail <id> */
+struct stm_show_session_detail_opt
+{
+ uint64_t sess_id;
+ int thread_idx; // todo, not used now
+};
+
+struct show_session_brief_result
+{
+ uint64_t sid;
+ int thread_index;
+ enum session_state state;
+ enum session_type protocol;
+ time_t create_time_in_sec;
+ time_t last_pkt_time_in_sec;
+ uint8_t flow_dir;
+ struct tuple6 addr;
+} __attribute__((packed));
+
+struct show_session_brief_array
+{
+ size_t array_num;
+ struct show_session_brief_result array[0]; /* Continuous memory, array_num * sizeof(struct show_session_brief_result) */
+} __attribute__((packed));
+
+struct show_session_detail_result
+{
+ struct show_session_brief_result sess_brief;
+ unsigned long long sess_stat[MAX_FLOW_TYPE][MAX_STAT];
+ enum session_direction direction; // in or out
+ unsigned char application; // L7 appid
+ // todo, other info
+} __attribute__((packed));
+
+static void stm_session_brief_cli_args_set_default(struct stm_show_session_brief_opt *show_opt)
+{
+ memset(show_opt, 0, sizeof(struct stm_show_session_brief_opt));
+ show_opt->limit = SHOW_SESSION_BRIEF_LIMIT_DEFAULT;
+ show_opt->scan_opt.cursor = 0;
+ show_opt->scan_opt.count = SHOW_SESSION_BRIEF_SCAN_COUNT_DEFAULT;
+}
+
+static uint32_t stm_session_brief_cli_args_get_flags(const char *para_name)
+{
+ if (NULL == para_name)
+ {
+ return 0;
+ }
+
+ if (strncasecmp(para_name, "id", 2) == 0)
+ {
+ return SESSION_SCAN_ID;
+ }
+ else if (strncasecmp(para_name, "cursor", 6) == 0)
+ {
+ return SESSION_SCAN_CURSOR;
+ }
+ else if (strncasecmp(para_name, "count", 5) == 0)
+ {
+ return SESSION_SCAN_COUNT;
+ }
+ else if (strncasecmp(para_name, "state", 5) == 0)
+ {
+ return SESSION_SCAN_STATE;
+ }
+ else if (strncasecmp(para_name, "sip", 3) == 0)
+ {
+ return SESSION_SCAN_SIP;
+ }
+ else if (strncasecmp(para_name, "dip", 3) == 0)
+ {
+ return SESSION_SCAN_DIP;
+ }
+ else if (strncasecmp(para_name, "snet", 4) == 0)
+ {
+ return SESSION_SCAN_SNET;
+ }
+ else if (strncasecmp(para_name, "dnet", 4) == 0)
+ {
+ return SESSION_SCAN_DNET;
+ }
+ else if (strncasecmp(para_name, "sport", 5) == 0)
+ {
+ return SESSION_SCAN_SPORT;
+ }
+ else if (strncasecmp(para_name, "dport", 5) == 0)
+ {
+ return SESSION_SCAN_DPORT;
+ }
+ else if (strncasecmp(para_name, "protocol", 8) == 0)
+ {
+ return SESSION_SCAN_TYPE;
+ }
+ else if (strncasecmp(para_name, "limit", 5) == 0)
+ {
+ return SESSION_SCAN_LIMIT;
+ }
+ else if (strncasecmp(para_name, "create-time-in", strlen("create-time-in")) == 0)
+ {
+ return SESSION_SCAN_CREATE_TIME;
+ }
+ else if (strncasecmp(para_name, "last-pkt-time-in", strlen("last-pkt-time-in")) == 0)
+ {
+ return SESSION_SCAN_LASPKT_TIME;
+ }
+ return 0;
+}
+
+static enum session_state show_session_state_pton(const char *state_str)
+{
+ if (strncasecmp(state_str, "opening", strlen("opening")) == 0)
+ {
+ return SESSION_STATE_OPENING;
+ }
+ else if (strncasecmp(state_str, "active", strlen("active")) == 0)
+ {
+ return SESSION_STATE_ACTIVE;
+ }
+ else if (strncasecmp(state_str, "closing", strlen("closing")) == 0)
+ {
+ return SESSION_STATE_CLOSING;
+ }
+
+ return MAX_STATE;
+}
+
+static uint32_t show_session_ipaddr_ntop(const char *ip_string, struct session_scan_opts *scan_opt, uint32_t flag)
+{
+ uint32_t addr_family;
+ if (SESSION_SCAN_SIP == flag)
+ {
+ addr_family = stm_inet_pton(ip_string, &scan_opt->src_addr[0].v4, &scan_opt->src_addr[0].v6);
+ }
+ else
+ {
+ addr_family = stm_inet_pton(ip_string, &scan_opt->dst_addr[0].v4, &scan_opt->dst_addr[0].v6);
+ }
+
+ if (addr_family == 0)
+ {
+ return 0;
+ }
+ if (AF_INET == addr_family)
+ {
+ if (SESSION_SCAN_SIP == flag)
+ {
+ scan_opt->src_addr[1].v4 = scan_opt->src_addr[0].v4;
+ }
+ else
+ {
+ scan_opt->dst_addr[1].v4 = scan_opt->dst_addr[0].v4;
+ }
+ }
+ else
+ {
+ if (SESSION_SCAN_SIP == flag)
+ {
+ scan_opt->src_addr[1].v6 = scan_opt->src_addr[0].v6;
+ }
+ else
+ {
+ scan_opt->dst_addr[1].v6 = scan_opt->dst_addr[0].v6;
+ }
+ }
+ return addr_family;
+}
+
+static sds show_session_cli_args_sanity_check(const struct stm_show_session_brief_opt *brief_opt)
+{
+ uint32_t flags = brief_opt->scan_opt.flags;
+ sds ss = sdsempty();
+
+ if ((flags & SESSION_SCAN_SIP) && (flags & SESSION_SCAN_SNET))
+ {
+ ss = sdscatprintf(ss, "error: the 'sip' and 'snet' options conflict!");
+ return ss;
+ }
+ if ((flags & SESSION_SCAN_DIP) && (flags & SESSION_SCAN_DNET))
+ {
+ ss = sdscatprintf(ss, "error: the 'dip' and 'dnet' options conflict!");
+ return ss;
+ }
+ return ss;
+}
+
+static int show_session_is_help(int argc, char *argv[])
+{
+ for (int i = 0; i < argc; i++)
+ {
+ if (strncasecmp(argv[i], "help", 4) == 0)
+ {
+ return 1;
+ }
+ if (strncasecmp(argv[i], "--help", 6) == 0)
+ {
+ return 1;
+ }
+ if (strncasecmp(argv[i], "-h", 2) == 0)
+ {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+static struct monitor_reply *show_session_brief_usage(void)
+{
+ sds ss = sdsempty();
+ ss = sdscatprintf(ss, "Usage: show session brief [options]\n");
+ ss = sdscatprintf(ss, "Options:\n");
+ ss = sdscatprintf(ss, " -h, --help help\tdisplay usage information and exit\n");
+ ss = sdscatprintf(ss, " cursor <cursor>\n");
+ ss = sdscatprintf(ss, " count <count>\n");
+ ss = sdscatprintf(ss, " state <opening|active|closing>\n");
+ ss = sdscatprintf(ss, " protocol <tcp|udp>\n");
+ ss = sdscatprintf(ss, " sip <source ip>\n");
+ ss = sdscatprintf(ss, " dip <destination ip>\n");
+ ss = sdscatprintf(ss, " sport <source port>\n");
+ ss = sdscatprintf(ss, " dport <destination port>\n");
+ ss = sdscatprintf(ss, " snet <source network/netmask>\texample 192.168.1.0/24\n");
+ ss = sdscatprintf(ss, " dnet <destination network/netmask>\texample 1234::abcd/48\n");
+ ss = sdscatprintf(ss, " create-time-in <last-N-[seconds|minutes|hours|days]>\texample last-1-hours\n");
+ ss = sdscatprintf(ss, " last-pkt-time-in <last-N-[seconds|minutes|hours|days]>\texample last-7-days\n");
+ ss = sdscatprintf(ss, " limit <limit>\n");
+
+ struct monitor_reply *reply = monitor_reply_new_string("%s", ss);
+ sdsfree(ss);
+ return reply;
+}
+
+static struct monitor_reply *show_session_detail_usage(void)
+{
+ sds ss = sdsempty();
+ ss = sdscatprintf(ss, "Usage: show session detial [options]\n");
+ ss = sdscatprintf(ss, "Options:\n");
+ ss = sdscatprintf(ss, " -h, --help help\tdisplay usage information and exit\n");
+ ss = sdscatprintf(ss, " id <session id>\n");
+ struct monitor_reply *reply = monitor_reply_new_string("%s", ss);
+ sdsfree(ss);
+ return reply;
+}
+
+/*
+ "show session ..." command args parser
+ return:
+ NULL: success, brief_opt is filled
+ not NULL: error message
+ */
+static struct monitor_reply *show_session_brief_cli_args_parse(int argc, char *argv[], struct stm_show_session_brief_opt *brief_opt)
+{
+ uint32_t ipaddr_family = 0, history_ipaddr_family = 0;
+ sds ss = NULL;
+ uint32_t flag;
+ struct monitor_reply *error_reply = NULL;
+ const char *opt_name, *opt_value;
+
+ stm_session_brief_cli_args_set_default(brief_opt);
+ int i = 3; // skip "show session brief"
+ while (i < argc)
+ {
+ ipaddr_family = 0;
+ opt_name = argv[i];
+ flag = stm_session_brief_cli_args_get_flags(opt_name);
+ if (i + 1 >= argc)
+ {
+ error_reply = monitor_reply_new_error("option %s requires an argument\n", opt_name);
+ return error_reply;
+ }
+ opt_value = argv[++i];
+
+ switch (flag)
+ {
+ case SESSION_SCAN_TYPE:
+ {
+ if (strncasecmp(opt_value, "tcp", 3) == 0)
+ {
+ brief_opt->scan_opt.type = SESSION_TYPE_TCP;
+ }
+ else if (strncasecmp(opt_value, "udp", 3) == 0)
+ {
+ brief_opt->scan_opt.type = SESSION_TYPE_UDP;
+ }
+ else
+ {
+ error_reply = monitor_reply_new_error("unsupported protocol type: %s. should be <tcp|udp>\n", opt_value);
+ goto error_exit;
+ }
+ }
+ break;
+ case SESSION_SCAN_STATE:
+ {
+ enum session_state tmp_state = show_session_state_pton(opt_value);
+ if (tmp_state == MAX_STATE)
+ {
+ error_reply = monitor_reply_new_error("unrecognized session state: %s. should be <opening|active|closing>\n", opt_value);
+ goto error_exit;
+ }
+ brief_opt->scan_opt.state = tmp_state;
+ }
+ break;
+ // case SESSION_SCAN_ID:
+ // if (stm_string_isdigit(opt_value) == 0)
+ // {
+ // *error_reply = monitor_reply_new_error("invalid session id: %s. should be integer in range: <1 - UINT64_MAX>\n", opt_value);
+ // goto error_exit;
+ // }
+ // show_opt->detail_opt.sess_id = strtoull(opt_value, NULL, 10);
+ // break;
+ case SESSION_SCAN_SIP:
+ ipaddr_family = show_session_ipaddr_ntop(opt_value, &brief_opt->scan_opt, SESSION_SCAN_SIP);
+ if (ipaddr_family == 0)
+ {
+ error_reply = monitor_reply_new_error("invalid sip address: %s\n", opt_value);
+ goto error_exit;
+ }
+ brief_opt->scan_opt.addr_family = ipaddr_family;
+ break;
+ case SESSION_SCAN_DIP:
+ ipaddr_family = show_session_ipaddr_ntop(opt_value, &brief_opt->scan_opt, SESSION_SCAN_DIP);
+ if (ipaddr_family == 0)
+ {
+ error_reply = monitor_reply_new_error("invalid dip address: %s\n", opt_value);
+ goto error_exit;
+ }
+ brief_opt->scan_opt.addr_family = ipaddr_family;
+ break;
+
+ case SESSION_SCAN_SNET:
+ {
+ uint32_t ipv4addr, ipv4mask;
+ struct in6_addr ipv6addr, ipv6mask;
+ ipaddr_family = stm_ip_cidr_pton(opt_value, &ipv4addr, &ipv4mask, &ipv6addr, &ipv6mask);
+ if (ipaddr_family == 0)
+ {
+ error_reply = monitor_reply_new_error("invalid snet CIDR address: %s\n", opt_value);
+ goto error_exit;
+ }
+ if (AF_INET == ipaddr_family)
+ {
+ uint32_t ipv4_range[2];
+ stm_ipv4_cidr_to_range(ipv4addr, ipv4mask, ipv4_range);
+ brief_opt->scan_opt.src_addr[0].v4.s_addr = ipv4_range[0];
+ brief_opt->scan_opt.src_addr[1].v4.s_addr = ipv4_range[1];
+ }
+ else
+ {
+ struct in6_addr ipv6_range[2];
+ stm_ipv6_cidr_to_range(&ipv6addr, &ipv6mask, ipv6_range);
+ brief_opt->scan_opt.src_addr[0].v6 = ipv6_range[0];
+ brief_opt->scan_opt.src_addr[1].v6 = ipv6_range[1];
+ }
+ brief_opt->scan_opt.addr_family = ipaddr_family;
+ flag = SESSION_SCAN_SIP;
+ }
+ break;
+ case SESSION_SCAN_DNET:
+ {
+ uint32_t ipv4addr, ipv4mask;
+ struct in6_addr ipv6addr, ipv6mask;
+ ipaddr_family = stm_ip_cidr_pton(opt_value, &ipv4addr, &ipv4mask, &ipv6addr, &ipv6mask);
+ if (ipaddr_family == 0)
+ {
+ error_reply = monitor_reply_new_error("invalid dnet CIDR address: %s\n", opt_value);
+ goto error_exit;
+ }
+ if (AF_INET == ipaddr_family)
+ {
+ uint32_t ipv4_range[2];
+ stm_ipv4_cidr_to_range(ipv4addr, ipv4mask, ipv4_range);
+ brief_opt->scan_opt.dst_addr[0].v4.s_addr = ipv4_range[0];
+ brief_opt->scan_opt.dst_addr[1].v4.s_addr = ipv4_range[1];
+ }
+ else
+ {
+ struct in6_addr ipv6_range[2];
+ stm_ipv6_cidr_to_range(&ipv6addr, &ipv6mask, ipv6_range);
+ brief_opt->scan_opt.dst_addr[0].v6 = ipv6_range[0];
+ brief_opt->scan_opt.dst_addr[1].v6 = ipv6_range[1];
+ }
+ brief_opt->scan_opt.addr_family = ipaddr_family;
+ flag = SESSION_SCAN_DIP;
+ }
+ break;
+ case SESSION_SCAN_SPORT:
+ {
+ if (stm_string_isdigit(opt_value) == 0)
+ {
+ error_reply = monitor_reply_new_error("illegal sport: %s. should be integer\n", opt_value);
+ goto error_exit;
+ }
+ int tmp_val = atoi(opt_value);
+ if (tmp_val <= 0 || tmp_val > 65535)
+ {
+ error_reply = monitor_reply_new_error("illegal sport: %s. should be <1-65535>\n", opt_value);
+ goto error_exit;
+ }
+ brief_opt->scan_opt.src_port = htons((unsigned short)tmp_val);
+ }
+ break;
+ case SESSION_SCAN_DPORT:
+ {
+ if (stm_string_isdigit(opt_value) == 0)
+ {
+ error_reply = monitor_reply_new_error("illegal sport: %s. should be integer\n", opt_value);
+ goto error_exit;
+ }
+ int tmp_val = atoi(opt_value);
+ if (tmp_val <= 0 || tmp_val > 65535)
+ {
+ error_reply = monitor_reply_new_error("illegal sport: %s. should be <1-65535>\n", opt_value);
+ goto error_exit;
+ }
+ brief_opt->scan_opt.dst_port = htons((unsigned short)tmp_val);
+ }
+ break;
+
+ case SESSION_SCAN_CURSOR:
+ brief_opt->scan_opt.cursor = strtoul(opt_value, NULL, 10);
+ break;
+ case SESSION_SCAN_COUNT:
+ brief_opt->scan_opt.count = strtoul(opt_value, NULL, 10);
+ if (brief_opt->scan_opt.count == 0)
+ {
+ error_reply = monitor_reply_new_error("illegal count: %s. should be <1 - UINT32_MAX>\n", opt_value);
+ goto error_exit;
+ }
+ break;
+ case SESSION_SCAN_LIMIT:
+ brief_opt->limit = strtoul(opt_value, NULL, 10);
+ if (brief_opt->limit == 0 || brief_opt->limit > SHOW_SESSION_BRIEF_LIMIT_MAX)
+ {
+ error_reply = monitor_reply_new_error("illegal limit: %s. should be <1 - %u>\n", opt_value, SHOW_SESSION_BRIEF_LIMIT_MAX);
+ goto error_exit;
+ }
+ break;
+ case SESSION_SCAN_CREATE_TIME:
+ {
+ time_t tmp_range[2] = {0, 0};
+ if (stm_time_range_pton(opt_value, time(NULL), tmp_range) < 0)
+ {
+ error_reply = monitor_reply_new_error("invalid create-time-in: %s. \r\nsyntax: <last-N-[seconds|minutes|hours|days]>\n", opt_value);
+ goto error_exit;
+ }
+ brief_opt->scan_opt.create_time_ms[0] = tmp_range[0] * 1000; // second to ms
+ brief_opt->scan_opt.create_time_ms[1] = tmp_range[1] * 1000; // second to ms
+ }
+ break;
+ case SESSION_SCAN_LASPKT_TIME:
+ {
+ time_t tmp_range[2] = {0, 0};
+ if (stm_time_range_pton(opt_value, time(NULL), tmp_range) < 0)
+ {
+ error_reply = monitor_reply_new_error("invalid last-pkt-time-in: %s. \r\nsyntax: <last-N-[seconds|minutes|hours|days]>\n", opt_value);
+ goto error_exit;
+ }
+ brief_opt->scan_opt.laspkt_time_ms[0] = tmp_range[0] * 1000; // second to ms
+ brief_opt->scan_opt.laspkt_time_ms[1] = tmp_range[1] * 1000; // second to ms
+ }
+ break;
+ default:
+ error_reply = monitor_reply_new_error("unrecognized params: %s \n", opt_name);
+ return error_reply;
+ }
+
+ if ((history_ipaddr_family != 0) && (ipaddr_family != 0) && (history_ipaddr_family != ipaddr_family))
+ {
+ error_reply = monitor_reply_new_error("contradictory ip version, expression rejects all sessions!\n");
+ goto error_exit;
+ }
+ history_ipaddr_family = ipaddr_family;
+ i++; // to next option
+ brief_opt->scan_opt.flags |= flag;
+ }
+ ss = show_session_cli_args_sanity_check(brief_opt);
+ if (ss && sdslen(ss) > 0)
+ {
+ error_reply = monitor_reply_new_error("%s\n", ss);
+ sdsfree(ss);
+ goto error_exit;
+ }
+ sdsfree(ss);
+ error_reply = NULL;
+ return NULL;
+
+error_exit:
+ return error_reply;
+}
+
+static void get_single_session_brief(struct session *sess, int thread_id, struct show_session_brief_result *brief)
+{
+ brief->thread_index = thread_id;
+ brief->state = session_get_current_state(sess);
+ brief->protocol = session_get_type(sess);
+ session_is_symmetric(sess, &brief->flow_dir);
+ brief->create_time_in_sec = session_get_timestamp(sess, SESSION_TIMESTAMP_START) / 1000; // ms to sec
+ brief->last_pkt_time_in_sec = session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) / 1000; // ms to sec
+ const struct tuple6 *tp6 = session_get_tuple6(sess);
+ memcpy(&brief->addr, tp6, sizeof(struct tuple6));
+}
+
+struct iovec show_session_brief_on_worker_thread_rpc_cb(int thread_idx, struct iovec request, void *args)
+{
+ struct iovec response = {};
+ assert(request.iov_len == sizeof(struct stm_show_session_brief_opt));
+ struct stm_show_session_brief_opt *show_brief_opt = (struct stm_show_session_brief_opt *)request.iov_base;
+
+ uint64_t session_id_array[SHOW_SESSION_BRIEF_LIMIT_MAX];
+ uint64_t session_id_array_count = SHOW_SESSION_BRIEF_LIMIT_MAX;
+ struct session_manager *sess_mgr = stellar_module_get_session_manager((struct stellar_module_manager *)args);
+ struct session_manager_rte *sess_mgr_rt = session_manager_get_runtime(sess_mgr, thread_idx);
+ session_id_array_count = session_manager_rte_scan_session(sess_mgr_rt, &show_brief_opt->scan_opt, session_id_array, show_brief_opt->limit);
+ if (session_id_array_count == 0)
+ {
+ // no session match the filter params, but no error! still need to build a empty reply!
+ // go on !!!
+ response.iov_base = NULL;
+ response.iov_len = 0;
+ return response;
+ }
+
+ struct show_session_brief_result *brief_result_array = (struct show_session_brief_result *)calloc(session_id_array_count, sizeof(struct show_session_brief_result));
+
+ for (uint32_t i = 0; i < session_id_array_count; i++)
+ {
+ struct session *sess = session_manager_rte_lookup_session_by_id(sess_mgr_rt, session_id_array[i]);
+ assert(sess != NULL);
+ get_single_session_brief(sess, thread_idx, &brief_result_array[i]);
+ brief_result_array[i].sid = session_id_array[i];
+ }
+ response.iov_base = brief_result_array;
+ response.iov_len = session_id_array_count;
+ return response;
+}
+
+static sds session_brief_to_readable(const struct show_session_brief_result *sess_brief)
+{
+ char time_buf[64];
+ char addr_buf[256];
+ sds ss = sdsempty();
+ strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_brief->create_time_in_sec));
+ ss = sdscatprintf(ss, "%-6d %-21lu %-5s %-8s %-20s %s", sess_brief->thread_index, sess_brief->sid,
+ (sess_brief->protocol == SESSION_TYPE_TCP) ? "tcp" : "udp",
+ stm_session_state_ntop(sess_brief->state), time_buf,
+ stm_get0_readable_session_addr(&sess_brief->addr, addr_buf, sizeof(addr_buf)));
+ return ss;
+}
+
+static struct monitor_reply *show_session_brief_cmd_cb(struct stellar_monitor *monitor, int argc, char *argv[], void *arg)
+{
+ struct stm_show_session_brief_opt brief_opt = {};
+ if (show_session_is_help(argc, argv))
+ {
+ return show_session_brief_usage();
+ }
+ struct monitor_reply *error_reply = show_session_brief_cli_args_parse(argc, argv, &brief_opt);
+ if (error_reply != NULL)
+ {
+ return error_reply;
+ }
+ struct monitor_reply *cmd_reply;
+ struct stellar_module_manager *mod_mgr = (struct stellar_module_manager *)arg;
+ int thread_num = stellar_module_manager_get_max_thread_num(mod_mgr);
+ struct iovec request;
+ request.iov_base = (void *)&brief_opt;
+ request.iov_len = sizeof(struct stm_show_session_brief_opt);
+
+ struct iovec response[thread_num];
+ memset(response, 0, sizeof(response));
+ size_t tot_sess_id_num = 0;
+ sds ss = sdsempty();
+ ss = sdscatprintf(ss, "%-6s %-21s %-5s %-8s %-20s %s\r\n", "thread", "session-id", "proto", "state", "create-time", "tuple4(sip:sport-dip:dport)");
+ ss = sdscatprintf(ss, "--------------------------------------------------------------------------------------------\r\n");
+
+ for (int i = 0; i < thread_num; i++)
+ {
+ response[i] = monitor_worker_thread_rpc(monitor, i, request, show_session_brief_on_worker_thread_rpc_cb, mod_mgr);
+ if (response[i].iov_base == NULL || response[i].iov_len == 0) // empty result
+ {
+ continue;
+ }
+ struct show_session_brief_result *brief_res_array = (struct show_session_brief_result *)(response[i].iov_base);
+ tot_sess_id_num += response[i].iov_len; // session_id_array_count
+ for (size_t j = 0; j < response[i].iov_len; j++)
+ {
+ ss = sdscatprintf(ss, "%s\n", session_brief_to_readable(&brief_res_array[j]));
+ }
+ if (tot_sess_id_num >= brief_opt.limit)
+ {
+ break;
+ }
+ }
+
+ if (tot_sess_id_num == 0)
+ {
+ cmd_reply = monitor_reply_new_string("No session found");
+ goto empty_result;
+ }
+ cmd_reply = monitor_reply_new_string("%s", ss);
+
+empty_result:
+ sdsfree(ss);
+ for (int i = 0; i < thread_num; i++)
+ {
+ if (response[i].iov_base)
+ {
+ free(response[i].iov_base);
+ }
+ }
+ return cmd_reply;
+}
+
+/*
+ todo: add thread id,
+ fast patch, avoid traversing all worker threads
+ */
+static struct monitor_reply *show_session_detail_cli_args_parse(int argc, char *argv[], struct stm_show_session_detail_opt *detail_opt)
+{
+ if (argc < 4)
+ {
+ return monitor_reply_new_error("missing session id\n");
+ }
+ if (argc > 5)
+ {
+ return monitor_reply_new_error("too many arguments\n");
+ }
+ if (strncasecmp(argv[3], "id", 2) != 0)
+ {
+ return monitor_reply_new_error("missing session id\n");
+ }
+
+ if (stm_string_isdigit(argv[4]) == 0)
+ {
+ return monitor_reply_new_error("invalid session id: %s. should be integer in range: <1 - UINT64_MAX>\n", argv[4]);
+ }
+ detail_opt->sess_id = strtoull(argv[4], NULL, 10);
+ return NULL;
+}
+
+struct iovec show_session_detail_on_worker_thread_rpc_cb(int thread_idx, struct iovec request, void *args)
+{
+ struct iovec response = {};
+ assert(request.iov_len == sizeof(struct stm_show_session_detail_opt));
+ struct stm_show_session_detail_opt *detail_opt = (struct stm_show_session_detail_opt *)request.iov_base;
+
+ struct session_manager *sess_mgr = stellar_module_get_session_manager((struct stellar_module_manager *)args);
+ struct session_manager_rte *sess_mgr_rt = session_manager_get_runtime(sess_mgr, thread_idx);
+
+ struct session *sess = session_manager_rte_lookup_session_by_id(sess_mgr_rt, detail_opt->sess_id);
+ if (NULL == sess)
+ {
+ return response;
+ }
+
+ struct show_session_detail_result *detail_res = (struct show_session_detail_result *)calloc(1, sizeof(struct show_session_detail_result));
+ get_single_session_brief(sess, thread_idx, &detail_res->sess_brief);
+ detail_res->sess_brief.sid = detail_opt->sess_id;
+ detail_res->direction = session_get_direction(sess);
+ // todo, get some exact stat, not all
+ memcpy(detail_res->sess_stat, sess->stats, sizeof(detail_res->sess_stat));
+
+ // todo, get application info
+
+ response.iov_base = detail_res;
+ response.iov_len = sizeof(struct show_session_detail_result);
+ return response;
+}
+
+static sds session_detail_to_readable(const struct show_session_detail_result *sess_detail)
+{
+ char addr_buf[256];
+ sds ss = sdsempty();
+ char time_buf[64];
+#define SHOW_SESSION_DETAIL_NAME_FORMAT "%-30s"
+#define SHOW_SESSION_DETAIL_VALUE_FORMAT "%llu"
+ const char *flow_str[MAX_FLOW_TYPE] = {"C2S flow", "S2C flow"};
+ ss = sdscatprintf(ss, "%-15s: %lu\r\n", "session-id", sess_detail->sess_brief.sid);
+ ss = sdscatprintf(ss, "%-15s: %d\r\n", "thread", sess_detail->sess_brief.thread_index);
+ ss = sdscatprintf(ss, "%-15s: %s\r\n", "state", stm_session_state_ntop(sess_detail->sess_brief.state));
+ ss = sdscatprintf(ss, "%-15s: %s\r\n", "protocol", (sess_detail->sess_brief.protocol == SESSION_TYPE_TCP) ? "tcp" : "udp");
+ strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_detail->sess_brief.create_time_in_sec));
+ ss = sdscatprintf(ss, "%-15s: %s\r\n", "create-time", time_buf);
+ strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_detail->sess_brief.last_pkt_time_in_sec));
+ ss = sdscatprintf(ss, "%-15s: %s\r\n", "last-pkt-time", time_buf);
+ ss = sdscatprintf(ss, "%-15s: %s\r\n", "tuple4", stm_get0_readable_session_addr(&sess_detail->sess_brief.addr, addr_buf, sizeof(addr_buf)));
+ ss = sdscatprintf(ss, "%-15s: %s\r\n", "symmetric", stm_session_flow_dir_ntop(sess_detail->sess_brief.flow_dir));
+ ss = sdscatprintf(ss, "%-15s: %s\r\n", "direction", sess_detail->direction == SESSION_DIRECTION_INBOUND ? "INBOUND" : "OUTBOUND");
+
+ ss = sdscatprintf(ss, "statistics:\r\n");
+ char printf_format[256];
+ snprintf(printf_format, sizeof(printf_format), "\t%s : %s\r\n", SHOW_SESSION_DETAIL_NAME_FORMAT, SHOW_SESSION_DETAIL_VALUE_FORMAT);
+ for (int flow = 0; flow < MAX_FLOW_TYPE; flow++)
+ {
+ ss = sdscatprintf(ss, " %s:\r\n", flow_str[flow]);
+ ss = sdscatprintf(ss, printf_format, "raw_packets_received", sess_detail->sess_stat[flow][STAT_RAW_PACKETS_RECEIVED]);
+ ss = sdscatprintf(ss, printf_format, "raw_bytes_received", sess_detail->sess_stat[flow][STAT_RAW_BYTES_RECEIVED]);
+ ss = sdscatprintf(ss, printf_format, "duplicate_packets_bypass", sess_detail->sess_stat[flow][STAT_DUPLICATE_PACKETS_BYPASS]);
+ ss = sdscatprintf(ss, printf_format, "injected_packets", sess_detail->sess_stat[flow][STAT_INJECTED_PACKETS_SUCCESS]);
+ ss = sdscatprintf(ss, printf_format, "injected_bytes", sess_detail->sess_stat[flow][STAT_INJECTED_BYTES_SUCCESS]);
+ if (SESSION_TYPE_TCP == sess_detail->sess_brief.protocol)
+ {
+ ss = sdscatprintf(ss, printf_format, "tcp_segments_retransmit", sess_detail->sess_stat[flow][STAT_TCP_SEGMENTS_RETRANSMIT]);
+ ss = sdscatprintf(ss, printf_format, "tcp_payloads_retransmit", sess_detail->sess_stat[flow][STAT_TCP_PAYLOADS_RETRANSMIT]);
+ ss = sdscatprintf(ss, printf_format, "tcp_segments_reordered", sess_detail->sess_stat[flow][STAT_TCP_SEGMENTS_REORDERED]);
+ ss = sdscatprintf(ss, printf_format, "tcp_payloads_reordered", sess_detail->sess_stat[flow][STAT_TCP_PAYLOADS_REORDERED]);
+ }
+ }
+
+ // todo:
+ ss = sdscatprintf(ss, "\r\n \033[31mtodo: add security policy rule id, HTTP URL, Server FQDN, etc... \033[0m");
+ return ss;
+}
+
+static struct monitor_reply *show_session_detail_cmd_cb(struct stellar_monitor *stm, int argc, char *argv[], void *arg)
+{
+ struct stm_show_session_detail_opt detail_opt = {};
+ detail_opt.thread_idx = -1;
+
+ if (show_session_is_help(argc, argv))
+ {
+ return show_session_detail_usage();
+ }
+ struct monitor_reply *error_reply = show_session_detail_cli_args_parse(argc, argv, &detail_opt);
+ if (error_reply != NULL)
+ {
+ return error_reply;
+ }
+
+ struct monitor_reply *cmd_reply;
+ struct stellar_module_manager *mod_mgr = (struct stellar_module_manager *)arg;
+ int thread_num = stellar_module_manager_get_max_thread_num(mod_mgr);
+ struct iovec request;
+ request.iov_base = (void *)&detail_opt;
+ request.iov_len = sizeof(struct stm_show_session_detail_opt);
+
+ struct iovec response[thread_num];
+ memset(response, 0, sizeof(response));
+ size_t tot_sess_id_num = 0;
+ sds ss = sdsempty();
+
+ for (int i = 0; i < thread_num; i++)
+ {
+ if (detail_opt.thread_idx != -1 && detail_opt.thread_idx != i)
+ {
+ continue;
+ }
+ response[i] = monitor_worker_thread_rpc(stm, i, request, show_session_detail_on_worker_thread_rpc_cb, mod_mgr);
+ if (response[i].iov_base == NULL || response[i].iov_len == 0) // empty result
+ {
+ continue;
+ }
+ struct show_session_detail_result *detail_res = (struct show_session_detail_result *)(response[i].iov_base);
+ ss = sdscatprintf(ss, "%s\n", session_detail_to_readable(detail_res));
+ tot_sess_id_num++;
+ break;
+ }
+ if (tot_sess_id_num == 0)
+ {
+ cmd_reply = monitor_reply_new_string("No session found by id %lu", detail_opt.sess_id);
+ goto empty_result;
+ }
+ cmd_reply = monitor_reply_new_string("%s", ss);
+
+empty_result:
+ sdsfree(ss);
+ for (int i = 0; i < thread_num; i++)
+ {
+ if (response[i].iov_base)
+ {
+ free(response[i].iov_base);
+ }
+ }
+ return cmd_reply;
+}
+
+int show_session_enforcer_init(struct stellar_module_manager *mod_mgr, struct stellar_monitor *stm)
+{
+ monitor_register_cmd(stm, "show session brief", show_session_brief_cmd_cb, "readonly",
+ "[sip | dip | sport | dport | protocol | state | cursor | count | limit | create-time-in | last-pkt-time-in ]",
+ "Show session brief information", (void *)mod_mgr);
+ monitor_register_cmd(stm, "show session detail", show_session_detail_cmd_cb, "readonly",
+ "id <id>",
+ "Show session verbose information", (void *)mod_mgr);
+ return 0;
+}
diff --git a/infra/monitor/monitor_cmd_assistant.c b/infra/monitor/monitor_cmd_assistant.c
new file mode 100644
index 0000000..a2d02fa
--- /dev/null
+++ b/infra/monitor/monitor_cmd_assistant.c
@@ -0,0 +1,537 @@
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#include "cJSON.h"
+#include "sds/sds.h"
+#include "monitor_private.h"
+#include "monitor_cmd_assistant.h"
+#include "monitor_utils.h"
+#ifdef __cplusplus
+}
+#endif
+
+struct stm_cmd_spec
+{
+ const char *cmd_name;
+ const char *cmd_flags;
+ const char *cmd_hint;
+ int cmd_arity;
+ int cmd_first_key_offset;
+};
+
+struct stm_cmd_assistant
+{
+ cJSON *cjson_root;
+ stm_cmd_assistant_completion_cb *cmd_completion_cb;
+ stm_cmd_assistant_hints_cb *cmd_hints_cb;
+ sds hints_result; // mallo and free for every hits
+};
+static __thread struct stm_cmd_assistant *__g_stm_cli_assistant = NULL;
+
+static cJSON *stm_cli_register_cmd(cJSON *father_next_array, const char *cur_cmd_prefix)
+{
+ cJSON *cur_level_item = NULL;
+
+ /* search current cmd (name is prefix) in father->next array[] */
+ int array_size = cJSON_GetArraySize(father_next_array);
+ for (int i = 0; i < array_size; i++)
+ {
+ cur_level_item = cJSON_GetArrayItem(father_next_array, i);
+ cJSON *cmd_name_item = cJSON_GetObjectItem(cur_level_item, "prefix");
+ if (cmd_name_item != NULL && (0 == strcmp(cur_cmd_prefix, cmd_name_item->valuestring)))
+ {
+ break;
+ }
+ else
+ {
+ cur_level_item = NULL;
+ }
+ }
+ if (NULL == cur_level_item)
+ {
+ /* if not exist, create new cmd (name is prefix) */
+ cur_level_item = cJSON_CreateObject();
+ cJSON_AddItemToObject(cur_level_item, "prefix", cJSON_CreateString(cur_cmd_prefix));
+ cJSON *new_cmd_next_array = cJSON_CreateArray();
+ cJSON_AddItemToObject(cur_level_item, "next", new_cmd_next_array);
+
+ /* insert into father->next array */
+ cJSON_AddItemToArray(father_next_array, cur_level_item);
+ }
+ else
+ {
+ ; // already exist, do nothing
+ }
+ return cur_level_item;
+}
+
+/*
+ search json object by cli_cmd_line, if exsit , return 0, do nothing,
+ the search json object function can be used for register_usage();
+ if not exist, create new json object and insert into father->next array
+*/
+int stm_cmd_assistant_register(struct stm_cmd_assistant *aide, const char *cli_cmd_line)
+{
+ int argc = 0;
+ if (NULL == aide->cjson_root)
+ {
+ aide->cjson_root = cJSON_CreateArray();
+ }
+ sds *array = sdssplitargs(cli_cmd_line, &argc);
+ cJSON *father_candidate_array = aide->cjson_root;
+ cJSON *cur_cmd_obj = NULL;
+ for (int i = 0; i < argc; i++)
+ {
+ cur_cmd_obj = stm_cli_register_cmd(father_candidate_array, array[i]);
+ father_candidate_array = cJSON_GetObjectItem(cur_cmd_obj, "next");
+ }
+ sdsfreesplitres(array, argc);
+ return 0;
+}
+
+static cJSON *stm_cli_search_cmd(cJSON *father_next_array, const char *cur_cmd_prefix)
+{
+ cJSON *cur_level_item = NULL;
+ /* search current cmd (name is prefix) in father->next array[] */
+ int array_size = cJSON_GetArraySize(father_next_array);
+ for (int i = 0; i < array_size; i++)
+ {
+ cur_level_item = cJSON_GetArrayItem(father_next_array, i);
+ cJSON *cmd_name_item = cJSON_GetObjectItem(cur_level_item, "prefix");
+ if (cmd_name_item != NULL && (0 == strcmp(cur_cmd_prefix, cmd_name_item->valuestring)))
+ {
+ break;
+ }
+ else
+ {
+ cur_level_item = NULL;
+ }
+ }
+ return cur_level_item;
+}
+
+cJSON *stm_cmd_assistant_search(struct stm_cmd_assistant *aide, const char *cli_cmd_line)
+{
+ int argc = 0;
+ sds *array = sdssplitargs(cli_cmd_line, &argc);
+ cJSON *father_candidate_array = aide->cjson_root;
+ cJSON *match_item = NULL;
+ for (int i = 0; i < argc; i++)
+ {
+ match_item = stm_cli_search_cmd(father_candidate_array, array[i]);
+ if (NULL == match_item)
+ {
+ return NULL;
+ }
+ father_candidate_array = cJSON_GetObjectItem(match_item, "next");
+ }
+ sdsfreesplitres(array, argc);
+ return match_item;
+}
+
+int stm_cmd_assistant_register_usage(struct stm_cmd_assistant *aide, const char *cli_cmd_line, const char *usage)
+{
+ if (NULL == aide || NULL == cli_cmd_line || NULL == usage)
+ {
+ return -1;
+ }
+ if (strlen(cli_cmd_line) == 0)
+ {
+ cJSON *obj = cJSON_CreateObject();
+ cJSON_AddItemToObjectCS(obj, "usage", cJSON_CreateString(usage));
+ cJSON_AddItemToArray(aide->cjson_root, obj);
+ return 0;
+ }
+
+ cJSON *obj = stm_cmd_assistant_search(aide, cli_cmd_line);
+ if (NULL == obj)
+ {
+ stm_cmd_assistant_register(aide, cli_cmd_line);
+ obj = stm_cmd_assistant_search(aide, cli_cmd_line);
+ if (NULL == obj)
+ {
+ return -1;
+ }
+ }
+ cJSON_AddItemToObject(obj, "usage", cJSON_CreateString(usage));
+ return 0;
+}
+
+char *stm_cmd_assistant_verbose_print(int format)
+{
+ struct stm_cmd_assistant *aide = __g_stm_cli_assistant;
+ if (NULL == aide->cjson_root)
+ {
+ return (char *)strdup("[]");
+ }
+ char *debug_print;
+ if (format == 1)
+ {
+ debug_print = cJSON_Print(aide->cjson_root);
+ }
+ else
+ {
+ debug_print = cJSON_PrintUnformatted(aide->cjson_root);
+ }
+ return debug_print;
+}
+
+char *stm_cmd_assistant_brief_print(void)
+{
+ struct stm_cmd_assistant *aide = __g_stm_cli_assistant;
+ sds s = sdsempty();
+ s = sdscat(s, "Usage:\r\n");
+ int array_size = cJSON_GetArraySize(aide->cjson_root);
+ for (int sz = 0; sz < array_size; sz++)
+ {
+ cJSON *item = cJSON_GetArrayItem(aide->cjson_root, sz);
+ cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix");
+ s = sdscatprintf(s, "\t%s\r\n", cmd_name_item->valuestring);
+ }
+ char *print_str = strdup(s);
+ sdsfree(s);
+ return print_str;
+}
+
+int stm_cmd_assistant_json_load(struct stm_cmd_assistant *aide, const char *json_str)
+{
+ if (NULL == aide || NULL == json_str || strlen(json_str) == 0)
+ {
+ return -1;
+ }
+ cJSON *root = cJSON_Parse(json_str);
+ if (NULL == root)
+ {
+ return -1;
+ }
+ aide->cjson_root = root;
+ return 0;
+}
+
+void stm_cmd_assistant_free(struct stm_cmd_assistant *aide)
+{
+ if (aide)
+ {
+ if (aide->cjson_root)
+ {
+ cJSON_Delete(aide->cjson_root);
+ }
+ if (aide->hints_result)
+ {
+ sdsfree(aide->hints_result);
+ }
+ free(aide);
+ }
+}
+
+struct stm_cmd_assistant *stm_cmd_assistant_new(void)
+{
+ struct stm_cmd_assistant *aide = calloc(1, sizeof(struct stm_cmd_assistant));
+ aide->cjson_root = NULL;
+ aide->hints_result = NULL;
+ __g_stm_cli_assistant = aide;
+ return aide;
+}
+
+struct stm_cmd_assistant *stm_cmd_assistant_get(void)
+{
+ return __g_stm_cli_assistant;
+}
+
+static int stm_cmd_exist(struct stm_cmd_assistant *aide, const char *cmd_name)
+{
+ cJSON *root = aide->cjson_root;
+ if (NULL == root)
+ {
+ return 0;
+ }
+ int array_size = cJSON_GetArraySize(root);
+ for (int sz = 0; sz < array_size; sz++)
+ {
+ cJSON *item = cJSON_GetArrayItem(root, sz);
+ cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix");
+ if (cmd_name_item && cmd_name_item->valuestring)
+ {
+ if (strcasecmp(cmd_name, cmd_name_item->valuestring) == 0)
+ {
+ return 1;
+ }
+ }
+ }
+ return 0;
+}
+
+/*
+ * return value:
+ * 0: success
+ * -1: failed
+ * 1: already exist
+ */
+int stm_cmd_assistant_register_cmd(struct stm_cmd_assistant *aide, const char *cmd_name, void *cmd_cb, void *cmd_arg,
+ const char *flags, const char *hint, const char *desc)
+{
+ if (NULL == aide || NULL == cmd_name || NULL == cmd_cb)
+ {
+ return -1;
+ }
+ if (stm_cmd_exist(aide, cmd_name))
+ {
+ return 1;
+ }
+ if (stm_strncasecmp_exactly(flags, "readonly", 8) != 0 && stm_strncasecmp_exactly(flags, "write", 5) != 0)
+ {
+ return -1;
+ }
+ cJSON *obj = cJSON_CreateObject();
+ cJSON_AddItemToObject(obj, "prefix", cJSON_CreateString(cmd_name));
+ cJSON_AddItemToObject(obj, "flags", cJSON_CreateString(flags));
+ cJSON_AddItemToObject(obj, "hints", cJSON_CreateString(hint));
+ cJSON_AddItemToObject(obj, "desc", cJSON_CreateString(desc));
+
+ char tmp_str[32] = {};
+ snprintf(tmp_str, sizeof(tmp_str), "%p", cmd_cb);
+ cJSON_AddItemToObject(obj, "cmd_cb", cJSON_CreateString(tmp_str));
+
+ snprintf(tmp_str, sizeof(tmp_str), "%p", cmd_arg);
+ cJSON_AddItemToObject(obj, "cmd_arg", cJSON_CreateString(tmp_str));
+
+ if (NULL == aide->cjson_root)
+ {
+ aide->cjson_root = cJSON_CreateArray();
+ }
+ cJSON_AddItemToArray(aide->cjson_root, obj);
+ return 0;
+}
+
+char *stm_cmd_assistant_serialize(struct stm_cmd_assistant *aide)
+{
+ if (NULL == aide)
+ {
+ return NULL;
+ }
+ char *debug_print = cJSON_Print(aide->cjson_root);
+ return debug_print;
+}
+
+int stm_cmd_assistant_dserialize(struct stm_cmd_assistant *aide, const char *json)
+{
+ cJSON *tmp_root = cJSON_Parse(json);
+ if (NULL == tmp_root)
+ {
+ return -1;
+ }
+ aide->cjson_root = tmp_root;
+ return 0;
+}
+
+/*
+ return value:
+ 0: equal exactly, uesed for hints, or get_cmd_cb()
+ -1: cli_array < register_cmd_array //raw command1 and command2 line has same section, and command1 line is shorter than command2, used for tab auto completion
+ 1: cli_array > register_cmd_array //raw command1 and command2 line has same section, and command1 line is longer than command2, used for get_cmd_cb()
+ -2: not match any secions
+*/
+int stm_cmd_assistant_sds_compare(sds *cli_array, int cli_argc, sds *register_cmd_array, int register_argc)
+{
+ if (0 == cli_argc || 0 == register_argc)
+ {
+ return -2;
+ }
+ // compare the first n-1 words, must be exactly match
+ int min_argc = MIN(cli_argc, register_argc);
+ for (int i = 0; i < min_argc - 1; i++)
+ {
+ // previous words must be exactly match, so use strcasecmp()
+ if (strcasecmp(cli_array[i], register_cmd_array[i]) != 0)
+ {
+ return -2;
+ }
+ }
+
+ // compare the last common word use substring match
+ if (strncasecmp(cli_array[min_argc - 1], register_cmd_array[min_argc - 1], strlen(cli_array[min_argc - 1])) == 0)
+ {
+ if (strcasecmp(cli_array[min_argc - 1], register_cmd_array[min_argc - 1]) == 0)
+ {
+ if (cli_argc == register_argc)
+ {
+ return 0;
+ }
+ else if (cli_argc < register_argc)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+ else
+ {
+ // cli command is not complete
+ return -1;
+ }
+ }
+ return -2;
+}
+
+int stm_cmd_assistant_set_completion_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_completion_cb *cb)
+{
+ aide->cmd_completion_cb = cb;
+ return 0;
+}
+
+int stm_cmd_assistant_set_hints_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_hints_cb *cb)
+{
+ aide->cmd_hints_cb = cb;
+ return 0;
+}
+
+static void cjson_traversal_completion(struct stm_cmd_assistant *aide, sds *sds_cli_array, int cli_argc, cJSON *cjson_root, void *arg)
+{
+ int array_size = cJSON_GetArraySize(cjson_root);
+ for (int sz = 0; sz < array_size; sz++)
+ {
+ cJSON *array_item = cJSON_GetArrayItem(cjson_root, sz);
+ cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix");
+ int register_cmd_section_num;
+ sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, &register_cmd_section_num);
+
+ int match = stm_cmd_assistant_sds_compare(sds_cli_array, cli_argc, register_cmd_array, register_cmd_section_num);
+ sdsfreesplitres(register_cmd_array, register_cmd_section_num);
+ if (match == 0 || match == -1)
+ {
+ aide->cmd_completion_cb(arg, prefix_item->valuestring);
+ }
+ }
+ return;
+}
+
+static void stm_cmd_assistant_completion_cb_fun(struct stm_cmd_assistant *aide, const char *buf, void *arg)
+{
+ /* input cmd buf must <= registed command
+ if buf_len < command_len ,auto completion the longest prefix
+ if buf_len == command_len, add a blank space
+ */
+ int argc = 0;
+ sds *cli_cmd_array = sdssplitargs(buf, &argc);
+ cjson_traversal_completion(aide, cli_cmd_array, argc, aide->cjson_root, arg);
+ sdsfreesplitres(cli_cmd_array, argc);
+}
+
+void stm_cmd_assistant_input_line(struct stm_cmd_assistant *aide, const char *line, void *arg)
+{
+ stm_cmd_assistant_completion_cb_fun(aide, line, arg);
+}
+
+static const char *cjson_traversal_hints(sds *sds_cli_array, int cli_argc, cJSON *cjson_root)
+{
+ int array_size = cJSON_GetArraySize(cjson_root);
+ for (int sz = 0; sz < array_size; sz++)
+ {
+ cJSON *array_item = cJSON_GetArrayItem(cjson_root, sz);
+ cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix");
+ int register_cmd_section_num;
+ sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, &register_cmd_section_num);
+
+ int match = stm_cmd_assistant_sds_compare(sds_cli_array, cli_argc, register_cmd_array, register_cmd_section_num);
+ sdsfreesplitres(register_cmd_array, register_cmd_section_num);
+ if (match == 0) // hints must be exactly match
+ {
+ cJSON *hint_item = cJSON_GetObjectItem(array_item, "hints");
+ if (hint_item)
+ {
+ return hint_item->valuestring;
+ }
+ }
+ }
+ return NULL;
+}
+
+const char *stm_cmd_assistant_input_line_for_hints(struct stm_cmd_assistant *aide, const char *line)
+{
+ int argc = 0;
+ sds *cli_cmd_array = sdssplitargs(line, &argc);
+ const char *hints = cjson_traversal_hints(cli_cmd_array, argc, aide->cjson_root);
+ sdsfreesplitres(cli_cmd_array, argc);
+ return hints;
+}
+
+static long long stm_cmd_assistant_get_item(struct stm_cmd_assistant *aide, const char *cmd_line, const char *json_item_name, const char *format)
+{
+ long long item_value = 0;
+ int array_size = cJSON_GetArraySize(aide->cjson_root);
+ int cli_argc;
+ sds *cli_cmd_array = sdssplitargs(cmd_line, &cli_argc);
+ int last_match_cmd_section_num = -1;
+ for (int sz = 0; sz < array_size; sz++)
+ {
+ cJSON *array_item = cJSON_GetArrayItem(aide->cjson_root, sz);
+ cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix");
+ int register_cmd_section_num;
+ sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, &register_cmd_section_num);
+
+ int match = stm_cmd_assistant_sds_compare(cli_cmd_array, cli_argc, register_cmd_array, register_cmd_section_num);
+ sdsfreesplitres(register_cmd_array, register_cmd_section_num);
+ if (match >= 0)
+ {
+ cJSON *hint_item = cJSON_GetObjectItem(array_item, json_item_name);
+ if (hint_item)
+ {
+ /* longest match */
+ if (register_cmd_section_num > last_match_cmd_section_num)
+ {
+ last_match_cmd_section_num = register_cmd_section_num;
+ sscanf(hint_item->valuestring, format, &item_value);
+ }
+ }
+ }
+ }
+ sdsfreesplitres(cli_cmd_array, cli_argc);
+ return item_value;
+}
+
+void *stm_cmd_assistant_get_cb(struct stm_cmd_assistant *aide, const char *cmd_line)
+{
+ return (void *)stm_cmd_assistant_get_item(aide, cmd_line, "cmd_cb", "%p");
+}
+
+void *stm_cmd_assistant_get_user_arg(struct stm_cmd_assistant *aide, const char *cmd_line)
+{
+ return (void *)stm_cmd_assistant_get_item(aide, cmd_line, "cmd_arg", "%p");
+}
+
+int stm_cmd_assistant_get_arity(struct stm_cmd_assistant *aide, const char *cmd_line)
+{
+ return (int)stm_cmd_assistant_get_item(aide, cmd_line, "arity", "%d");
+}
+
+sds stm_cmd_assistant_list_cmd_brief(struct stm_cmd_assistant *aide)
+{
+ sds res = sdsempty();
+ int array_size = cJSON_GetArraySize(aide->cjson_root);
+ for (int sz = 0; sz < array_size; sz++)
+ {
+ cJSON *item = cJSON_GetArrayItem(aide->cjson_root, sz);
+ cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix");
+ cJSON *cmd_desc_item = cJSON_GetObjectItem(item, "desc");
+ res = sdscatprintf(res, "\"%s\", %s \r\n", cmd_name_item->valuestring, cmd_desc_item ? cmd_desc_item->valuestring : "");
+ }
+ return res;
+}
+
+sds stm_cmd_assistant_list_cmd_verbose(struct stm_cmd_assistant *aide)
+{
+ if (NULL == aide->cjson_root)
+ {
+ return sdsempty();
+ }
+ char *json_str = cJSON_PrintUnformatted(aide->cjson_root);
+ sds res = sdsnew(json_str);
+ free(json_str);
+ return res;
+}
diff --git a/infra/monitor/monitor_cmd_assistant.h b/infra/monitor/monitor_cmd_assistant.h
new file mode 100644
index 0000000..02100a7
--- /dev/null
+++ b/infra/monitor/monitor_cmd_assistant.h
@@ -0,0 +1,60 @@
+#pragma once
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#include "cJSON.h"
+#include "sds/sds.h"
+
+#ifndef MIN
+#define MIN(x, y) ((x) < (y) ? (x) : (y))
+#endif
+
+#ifndef MAX
+#define MAX(x, y) ((x) > (y) ? (x) : (y))
+#endif
+ struct stm_cmd_assistant;
+ struct stm_cmd_spec;
+
+ struct stm_cmd_assistant *stm_cmd_assistant_new();
+ void stm_cmd_assistant_free(struct stm_cmd_assistant *aide);
+ int stm_cmd_assistant_json_load(struct stm_cmd_assistant *aide, const char *json_str);
+ char *stm_cmd_assistant_brief_print(void);
+ char *stm_cmd_assistant_verbose_print(int format);
+ int stm_cmd_assistant_is_help(const char *line);
+ cJSON *stm_cmd_assistant_search(struct stm_cmd_assistant *aide, const char *cli_cmd_line);
+ int stm_cmd_assistant_register(struct stm_cmd_assistant *aide, const char *cli_cmd_line);
+ int stm_cmd_assistant_register_usage(struct stm_cmd_assistant *aide, const char *cli_cmd_line, const char *usage);
+
+ // return value shoule be free after uesd.
+ char *stm_cmd_assistant_serialize(struct stm_cmd_assistant *aide);
+ int stm_cmd_assistant_dserialize(struct stm_cmd_assistant *aide, const char *json);
+
+ /*
+ * return value:
+ * 0: success
+ * -1: failed
+ * 1: already exist
+ */
+ int stm_cmd_assistant_register_cmd(struct stm_cmd_assistant *aide, const char *cmd, void *cmd_cb, void *cmd_arg,
+ const char *flags, const char *hint, const char *description);
+
+ int stm_cmd_assistant_sds_compare(sds *cli_array, int cli_argc, sds *register_cmd_array, int register_argc);
+ typedef void(stm_cmd_assistant_completion_cb)(void *arg, const char *candidate_completion);
+ typedef char *(stm_cmd_assistant_hints_cb)(const char *line);
+ int stm_cmd_assistant_set_completion_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_completion_cb *cb);
+ int stm_cmd_assistant_set_hints_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_hints_cb *cb);
+ void stm_cmd_assistant_input_line(struct stm_cmd_assistant *aide, const char *line, void *arg);
+ const char *stm_cmd_assistant_input_line_for_hints(struct stm_cmd_assistant *aide, const char *line);
+ struct stm_cmd_assistant *stm_cmd_assistant_get(void);
+
+ // struct stm_cmd_spec *stm_cmd_assistant_get_cmd(struct stm_cmd_assistant *aide, const char *cmd);
+ void *stm_cmd_assistant_get_cb(struct stm_cmd_assistant *aide, const char *cmd);
+ void *stm_cmd_assistant_get_user_arg(struct stm_cmd_assistant *aide, const char *cmd);
+ int stm_cmd_assistant_get_arity(struct stm_cmd_assistant *aide, const char *cmd);
+ sds stm_cmd_assistant_list_cmd_brief(struct stm_cmd_assistant *aide);
+ sds stm_cmd_assistant_list_cmd_verbose(struct stm_cmd_assistant *aide);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/infra/monitor/monitor_private.h b/infra/monitor/monitor_private.h
new file mode 100644
index 0000000..2e18836
--- /dev/null
+++ b/infra/monitor/monitor_private.h
@@ -0,0 +1,331 @@
+#pragma once
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <pthread.h>
+#include <netinet/in.h>
+#include <pcap/pcap.h>
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#include "stellar/monitor.h"
+#include "sds/sds.h"
+
+#include "stellar/module.h"
+#include "stellar/log.h"
+
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/bufferevent.h>
+#include <event2/buffer.h>
+#include <event2/thread.h>
+#include <event2/http.h>
+#include "monitor_rpc.h"
+ /********************************** limit definition *****************************************/
+
+#ifndef STELLAR_MAX_THREAD_NUM
+#define STELLAR_MAX_THREAD_NUM (256)
+#endif
+#define STM_RINGBUF_SIZE (1024 * 1024) /* per thread */
+#define STM_CONNECTION_IDLE_TIMEOUT 300 /* How many seconds elapsed without input command, connection will closed */
+#define STM_REQUEST_TIMEOUT 5
+
+#define STM_SERVER_LISTEN_IP "127.0.0.1"
+#define STM_SERVER_LISTEN_PORT 80
+#define STM_TZSP_UDP_PORT 37008 /* default port of TZSP protocol: https://en.wikipedia.org/wiki/TZSP# */
+
+#define STM_SESSION_DEFAULT_SEARCH_COUNT 100 /* if no count params, max search session number */
+#define STM_SESSION_DEFAULT_LIMIT_NUM 10 /* if no limit params, max support result session number */
+#define STM_SESSION_MAX_LIMIT_NUM 1000
+
+#define STM_UINT64_READABLE_STRING_MAX_LEN 21 /* MAX value is: 18446744073709551615 */
+#define STM_UINT32_READABLE_STRING_MAX_LEN 11 /* MAX value is: 4294967295 */
+
+#define STM_CONNECTIVITY_DEFALUT_COUNT 5 /* ping default count */
+#define STM_CONNECTIVITY_DEFALUT_SIZE 64 /* ping default bytes */
+#define STM_CONNECTIVITY_MAX_COUNT 100 /* ping max count */
+#define STM_CONNECTIVITY_MAX_SIZE 65535 /* ping max bytes */
+
+ /************************************************************************/
+#define STM_CMD_CALLBACK_THREAD_LOCAL_MAGIC (0x1234ABCD)
+#define STM_RINGBUF_HDR_MAGIC (0x0ABCD12345678)
+
+#define STM_RINGBUF_THREAD_IDX_SERVER 0
+#define STM_RINGBUF_THREAD_IDX_AGENT 1
+
+#define STM_MONITOR_THREAD_ID 0 // There are only two threads, use fix id
+#define STM_WORKER_THREAD_ID 1 // There are only two threads, use fix id
+
+#define STM_LOG_MODULE_NAME "monitor"
+#define STM_STAT_OUTPUT_PATH "log/monitor.fs4"
+#define STM_STAT_OUTPUT_INTERVAL_MS 3000
+#define STM_RESTFUL_VERSION "v1"
+#define STM_RESTFUL_RESOURCE "stellar_monitor"
+#define STM_RESTFUL_URI_CMD_KEY "raw_cmd" // example: http://127.0.0.1:80/v1/stellar_monitor?raw_cmd=show%20session
+#define STM_CLIENT_SERVER_SYNC_CMD "show command verbose"
+
+#define STM_CLI_CMD_HINTS_COLOR 90
+#define STM_CLI_CMD_HINTS_BOLD 0
+
+#ifndef UNUSED
+#define UNUSED __attribute__((unused))
+#endif
+
+#ifdef NDEBUG // release version
+#define STM_DBG_PRINT(fmt, args...)
+#else
+#define STM_DBG_PRINT(fmt, args...) fprintf(stderr, fmt, ##args)
+#endif
+
+#ifndef CALLOC
+#define CALLOC(type, number) ((type *)calloc(sizeof(type), number))
+#endif
+
+#ifndef FREE
+#define FREE(ptr) \
+ { \
+ if (ptr) \
+ { \
+ free((void *)ptr); \
+ ptr = NULL; \
+ } \
+ }
+#endif
+
+#define STM_TIME_START() \
+ struct timespec __start_time, __end_time; \
+ unsigned long long diff; \
+ clock_gettime(CLOCK_MONOTONIC, &__start_time);
+
+#define STM_TIME_DIFF() \
+ { \
+ clock_gettime(CLOCK_MONOTONIC, &__end_time); \
+ if (__start_time.tv_sec == __end_time.tv_sec) \
+ { \
+ diff = (unsigned long long)(__end_time.tv_nsec - __start_time.tv_nsec); \
+ } \
+ diff = ((unsigned long long)__end_time.tv_sec * 1000 * 1000 * 1000 + __end_time.tv_nsec) - ((unsigned long long)__start_time.tv_sec * 1000 * 1000 * 1000 + __start_time.tv_nsec); \
+ }
+
+#ifndef MIN
+#define MIN(x, y) ((x) < (y) ? (x) : (y))
+#endif
+
+#ifndef MAX
+#define MAX(x, y) ((x) > (y) ? (x) : (y))
+#endif
+
+#ifndef TRUE
+#define TRUE 1
+#endif
+
+#ifndef FALSE
+#define FALSE 0
+#endif
+
+#define STM_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
+#define STM_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
+#define STM_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
+#define STM_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
+
+ enum stm_http_response_code
+ {
+ STM_HTTP_200_OK = 200,
+ STM_HTTP_204_NO_CONTENT = 204,
+ STM_HTTP_403_FORBIDDEN = 403,
+ STM_HTTP_408_REQUEST_TIMEOUT = 408,
+ STM_HTTP_413_PAYLOAD_TOO_LARGE = 413,
+ };
+
+ enum stm_stat_type
+ {
+ STM_STAT_CLI_CONNECTION_NEW,
+ STM_STAT_CLI_CONNECTION_CLOSE,
+ STM_STAT_CLI_REQUEST_SUCC,
+ STM_STAT_CLI_RESPONSE_SUCC,
+ STM_STAT_CLI_REQUEST_ERR, // RESTFul Syntax error!
+ STM_STAT_CLI_RESPONSE_ERR, // attention: empty result is not error!
+ STM_STAT_MAX,
+ };
+
+ struct stm_spinlock;
+
+ struct stellar_monitor_config
+ {
+ // int thread_count;
+ size_t ringbuf_size; /* bytes */
+ int connection_idle_timeout;
+ int cli_request_timeout;
+ char *listen_ipaddr;
+ unsigned short listen_port_host_order;
+ unsigned short data_link_bind_port_host_order; // for TZSP protocol
+ int output_interval_ms;
+ char *output_path;
+ };
+
+ struct stm_key_value_tuple
+ {
+ char *key;
+ char *value;
+ };
+
+ struct stm_key_value
+ {
+ int tuple_num;
+ struct stm_key_value_tuple *tuple;
+ };
+
+ typedef struct evhttp_request stm_network_connection;
+
+ struct stm_cmd_transaction
+ {
+ struct stm_cmd_request *cmd_req;
+ struct stm_cmd_reply *cmd_res[STELLAR_MAX_THREAD_NUM]; // multi thread merge to one
+ };
+
+ struct stm_connection_manager
+ {
+ struct timeval link_start_time;
+ struct timeval last_active_time;
+ struct evhttp_connection *conn;
+ char peer_ipaddr[INET6_ADDRSTRLEN];
+ uint16_t peer_port_host_order;
+ struct stm_connection_manager *next, *prev;
+ };
+
+ struct stm_stat_counter
+ {
+ int counter_id;
+ uint64_t count;
+ uint64_t bytes;
+ };
+
+ struct stm_stat
+ {
+ void *fs4_ins;
+ struct stm_stat_counter counters[STM_STAT_MAX];
+ };
+
+ struct monitor_connection
+ {
+ struct evhttp_connection *current_evconn_ref;
+ };
+
+ /* optional API */
+ struct monitor_connection;
+ typedef void(monitor_connection_close_cb)(struct monitor_connection *conn, void *arg);
+ int monitor_register_connection_close_cb(struct stellar_monitor *monitor, monitor_connection_close_cb *cb, void *arg);
+ struct stm_conn_close_cb_manager
+ {
+ monitor_connection_close_cb *cb;
+ void *arg;
+ struct stm_conn_close_cb_manager *next, *prev;
+ };
+
+ struct stm_pktdump_runtime;
+ struct stellar_monitor
+ {
+ struct module_manager *mod_mgr_ref;
+ struct logger *logger_ref;
+ int worker_thread_num;
+ struct stellar_monitor_config *config;
+ struct stm_cmd_assistant *aide; // reference, share with stellar
+ struct stm_connection_manager *connection_mgr; // used to tracking all connections, for cli "who" command
+ struct stm_conn_close_cb_manager *conn_close_mgr;
+ // struct stm_ringbuf_mgr *ringbuf_mgr[STELLAR_MAX_THREAD_NUM];
+ struct event_base *evt_base;
+ // struct event *ev_timeout;
+ struct evhttp *evt_http_server;
+ pthread_t evt_main_loop_tid;
+ struct timeval time_now;
+ struct stm_stat *stat;
+ struct stm_spinlock *lock; // for dynamic register command, conn_close_cb
+ int (*gettime_cb)(struct timeval *tv, struct timezone *tz);
+ struct monitor_connection current_conn;
+ struct stm_pktdump_runtime *packet_dump;
+
+ struct monitor_rpc **rpc_ins_array; // multir threads
+ };
+
+ enum monitor_reply_type
+ {
+ MONITOR_REPLY_INTEGER,
+ MONITOR_REPLY_DOUBLE,
+ MONITOR_REPLY_STRING,
+ MONITOR_REPLY_ERROR,
+ MONITOR_REPLY_STATUS,
+ MONITOR_REPLY_NIL,
+ };
+
+ struct monitor_reply
+ {
+ enum monitor_reply_type type;
+ long long integer; /* The integer when type is SWARMKV_REPLY_INTEGER */
+ double dval; /* The double when type is SWARMKV_REPLY_DOUBLE */
+ int len; /* Length of string */
+ char *str;
+ int http_code;
+ const char *http_reason;
+ };
+
+ struct monitor_cli_args
+ {
+ const char *short_opt;
+ const char *long_opt;
+ int require_arg_value;
+ int value_is_multi_words; // "a b c d e f g"
+ char *value; // should be free after use
+ };
+ /************************************************************************************************************/
+ /* monitor call gettimeofday(2) by default */
+ struct stellar_monitor_config *stellar_monitor_config_new(const char *toml);
+ int stellar_monitor_set_gettime_callback(struct stellar_monitor *stm, int (*gettime_cb)(struct timeval *tv, struct timezone *tz));
+ struct stellar_monitor *stellar_monitor_get(void);
+ struct stm_connection_manager *stm_connection_insert(struct evhttp_connection *evconn);
+ void stm_connection_update(struct stm_connection_manager *conn_mgr, const struct evhttp_connection *evconn);
+ void stm_connection_delete(struct evhttp_connection *evconn);
+ const struct stm_connection_manager *stm_connection_search(const struct stm_connection_manager *conn_mgr_head, const struct evhttp_connection *evconn);
+
+ struct stm_key_value *stm_cmd_key_value_new(void);
+ void stm_cmd_key_value_append(struct stm_key_value **kv, const char *key, const char *value);
+ void stm_cmd_key_value_free(struct stm_key_value *kv);
+
+ /************************************** command manager **********************************************/
+ struct stm_stat *stm_stat_init(struct stellar_monitor *stm);
+ sds stm_config_print(const struct stellar_monitor_config *config);
+ void stm_stat_free(struct stm_stat *stat);
+ void stm_stat_update(struct stm_stat *stat, int thread_idx, enum stm_stat_type type, long long value);
+ long long stm_get_stat_count(struct stm_stat *stat, enum stm_stat_type type);
+ long long stm_get_stat_bytes(struct stm_stat *stat, enum stm_stat_type type);
+ sds monitor_reply_to_string(const struct monitor_reply *reply);
+ void monitor_reply_free(struct monitor_reply *reply);
+ int monitor_util_parse_cmd_args(int argc, const char *argv[], struct monitor_cli_args cli_args[], size_t cli_args_array_size);
+ char *stm_http_url_encode(const char *originalText);
+ struct stm_spinlock *stm_spinlock_new(void);
+ void stm_spinlock_lock(struct stm_spinlock *splock);
+ void stm_spinlock_unlock(struct stm_spinlock *splock);
+ void stm_spinlock_free(struct stm_spinlock *splock);
+ struct stm_pktdump_runtime *stm_packet_dump_new(struct stellar_monitor *stm, const struct stellar_monitor_config *config);
+ void stm_pktdump_enforcer_free(struct stellar_monitor *stm);
+
+ struct monitor_rpc *stm_rpc_new(void);
+ void stm_rpc_free(struct monitor_rpc *rpc_ins);
+ int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins);
+ struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args);
+ void monitor_rpc_free(struct monitor_rpc *rpc_ins);
+ struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr);
+ struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh);
+
+ /* Must be called in 'monitor_cmd_cb' context */
+ struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor);
+ /* Get the remote address and port associated with this connection. */
+ int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, unsigned short *peer_port);
+
+ /* command enforcer */
+ int show_session_enforcer_init(struct module_manager *mod_mgr, struct stellar_monitor *stm);
+#ifdef __cplusplus
+}
+#endif
diff --git a/infra/monitor/monitor_ringbuf.c b/infra/monitor/monitor_ringbuf.c
new file mode 100644
index 0000000..63270ba
--- /dev/null
+++ b/infra/monitor/monitor_ringbuf.c
@@ -0,0 +1,137 @@
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stddef.h>
+#include "monitor_private.h"
+#include "monitor_ringbuf.h"
+
+struct monitor_ringbuf_wrap
+{
+ ringbuf_t *ringbuf;
+ char *ringbuf_data;
+ unsigned long long push_number; // only for statistics
+ unsigned long long push_bytes; // only for statistics
+ unsigned long long pop_number; // only for statistics
+ unsigned long long pop_bytes; // only for statistics
+};
+
+ssize_t stm_ringbuf_stream_start(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t require_size)
+{
+ ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id);
+ ssize_t offset = ringbuf_acquire(rbf->ringbuf, rb_worker, require_size);
+ if (offset < 0)
+ {
+ STM_DBG_PRINT("stm ringbuf stream prealloc buffer(): ringbuf_acquire fail, no valid space!\n");
+ return 0;
+ }
+ return offset;
+}
+
+int stm_ringbuf_stream_append(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t rbf_offset, const void *value, size_t len)
+{
+ (void)thread_id;
+ memcpy(rbf->ringbuf_data + rbf_offset, value, len);
+ rbf->push_number++;
+ rbf->push_bytes += len;
+ return 0;
+}
+
+void stm_ringbuf_stream_finish(int thread_id, struct monitor_ringbuf_wrap *rbf)
+{
+ ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id);
+ ringbuf_produce(rbf->ringbuf, rb_worker);
+}
+
+int stm_ringbuf_easy_push(int thread_id, struct monitor_ringbuf_wrap *rbf, const void *push_value /*must continuous*/, size_t push_len)
+{
+ ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id);
+ ssize_t offset = ringbuf_acquire(rbf->ringbuf, rb_worker, push_len);
+ if (offset < 0)
+ {
+ STM_DBG_PRINT("stm ringbuf easy push(): ringbuf_acquire fail, no valid space!\n");
+ return -1;
+ }
+ memcpy(rbf->ringbuf_data + offset, push_value, push_len);
+
+ ringbuf_produce(rbf->ringbuf, rb_worker);
+ rbf->push_number++;
+ rbf->push_bytes += push_len;
+ // STM_DBG_PRINT("stm ringbuf push() success, len:%llu, number:%llu\n", push_len, rbf->push_number);
+ return 0;
+}
+
+void *stm_ringbuf_pop(struct monitor_ringbuf_wrap *rbf, size_t *pop_len)
+{
+ size_t len = 0, offset = 0;
+ len = ringbuf_consume(rbf->ringbuf, &offset);
+ if (0 == len)
+ {
+ // STM_DBG_PRINT("stm_ringbuf_pop(): not valid data\n");
+ *pop_len = 0;
+ return NULL;
+ }
+ rbf->pop_number++;
+ *pop_len = len;
+ // STM_DBG_PRINT("stm_ringbuf_pop() success, len:%llu, number:%llu\n", len, rbf->pop_number);
+ return rbf->ringbuf_data + offset;
+}
+
+void stm_ringbuf_release(struct monitor_ringbuf_wrap *rbf, int rel_len)
+{
+ ringbuf_release(rbf->ringbuf, rel_len);
+ rbf->pop_bytes += rel_len;
+}
+
+struct monitor_ringbuf_wrap *stm_ringbuf_wrap_new(int thread_tot_num, size_t ringbuf_size)
+{
+ struct monitor_ringbuf_wrap *rbf = (struct monitor_ringbuf_wrap *)calloc(1, sizeof(struct monitor_ringbuf_wrap));
+ size_t ringbuf_obj_size;
+ ringbuf_get_sizes(thread_tot_num, &ringbuf_obj_size, NULL);
+ rbf->ringbuf = (ringbuf_t *)calloc(1, ringbuf_obj_size);
+ rbf->ringbuf_data = (char *)calloc(1, ringbuf_size);
+ ringbuf_setup(rbf->ringbuf, thread_tot_num, ringbuf_size);
+ return rbf;
+}
+
+void stm_ringbuf_wrap_free(struct monitor_ringbuf_wrap *rbf)
+{
+ if (NULL == rbf)
+ {
+ return;
+ }
+ if (rbf->ringbuf)
+ {
+ free(rbf->ringbuf);
+ }
+ if (rbf->ringbuf_data)
+ {
+ free(rbf->ringbuf_data);
+ }
+ free(rbf);
+}
+
+void stm_ringbuf_get_statistics(const struct monitor_ringbuf_wrap *rbf, unsigned long long *push_number, unsigned long long *push_bytes, unsigned long long *pop_number, unsigned long long *pop_bytes)
+{
+ if (NULL == rbf)
+ {
+ return;
+ }
+ if (push_number)
+ {
+ *push_number = rbf->push_number;
+ }
+ if (push_bytes)
+ {
+ *push_bytes = rbf->push_bytes;
+ }
+ if (pop_number)
+ {
+ *pop_number = rbf->pop_number;
+ }
+ if (pop_bytes)
+ {
+ *pop_bytes = rbf->pop_bytes;
+ }
+} \ No newline at end of file
diff --git a/infra/monitor/monitor_ringbuf.h b/infra/monitor/monitor_ringbuf.h
new file mode 100644
index 0000000..294bac5
--- /dev/null
+++ b/infra/monitor/monitor_ringbuf.h
@@ -0,0 +1,30 @@
+#pragma once
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "ringbuf/ringbuf.h"
+
+ /*
+ Message format:
+ || header | payload ....(variable-length) || header | payload ....(variable-length) || ....
+ */
+
+ struct monitor_ringbuf_wrap;
+ struct monitor_ringbuf_wrap *stm_ringbuf_wrap_new(int thread_tot_num, size_t ringbuf_size);
+ int stm_ringbuf_easy_push(int thread_id, struct monitor_ringbuf_wrap *rbf, const void *value, size_t len);
+ void stm_ringbuf_release(struct monitor_ringbuf_wrap *rbf, int rel_len);
+ void *stm_ringbuf_pop(struct monitor_ringbuf_wrap *rbf, size_t *pop_len);
+ void stm_ringbuf_wrap_free(struct monitor_ringbuf_wrap *rbf);
+ ssize_t stm_ringbuf_stream_start(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t require_size);
+ int stm_ringbuf_stream_append(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t rbf_offset, const void *value, size_t len);
+ void stm_ringbuf_stream_finish(int thread_id, struct monitor_ringbuf_wrap *rbf);
+ void stm_ringbuf_get_statistics(const struct monitor_ringbuf_wrap *rbf, unsigned long long *push_number,
+ unsigned long long *push_bytes, unsigned long long *pop_number, unsigned long long *pop_bytes);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/infra/monitor/monitor_rpc.c b/infra/monitor/monitor_rpc.c
new file mode 100644
index 0000000..8e2af67
--- /dev/null
+++ b/infra/monitor/monitor_rpc.c
@@ -0,0 +1,115 @@
+#include <stddef.h>
+#include "stellar/monitor.h"
+#include "monitor_private.h"
+#include "monitor_rpc.h"
+#include "stellar/module.h"
+
+#define RPC_WORKER_THREAD_BUSY 1
+#define RPC_WORKER_THREAD_IDLE 0
+
+struct monitor_rpc_msg_hdr
+{
+ unsigned int type;
+ unsigned int length; // total messaage length, include this header, = payload length + sizeof(struct monitor_rpc_msg_hdr)
+ char value[0]; // variable-length, continuous
+} __attribute__((packed));
+
+enum monitor_rpc_ringbuf_dir
+{ // full duplex, dir: 0: worker thread to monitor thread; 1: monitor thread to worker thread
+ RPC_RINBUG_DIR_W2M = 0,
+ RPC_RINBUG_DIR_M2W = 1,
+ RPC_RINBUG_DIR_MAX = 2,
+};
+
+struct monitor_rpc
+{
+ volatile long atomic_val;
+
+ monitor_rpc_callabck *rpc_cb;
+ void *rpc_args;
+ struct iovec rpc_request;
+ struct iovec rpc_response;
+};
+
+struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args)
+{
+ while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY)
+ {
+ // wait for the last rpc response, not support concurrent rpc yet!
+ usleep(1000);
+ }
+ rpc_ins->rpc_cb = cb;
+ rpc_ins->rpc_args = user_args;
+ rpc_ins->rpc_request = rpc_request;
+ __sync_fetch_and_or(&rpc_ins->atomic_val, 1);
+
+ while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY)
+ {
+ // wait for the rpc response...
+ usleep(1000);
+ }
+ return rpc_ins->rpc_response;
+}
+
+int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins)
+{
+ if (0 == __sync_or_and_fetch(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE))
+ {
+ return 0;
+ }
+ rpc_ins->rpc_response = rpc_ins->rpc_cb(thread_idx, rpc_ins->rpc_request, rpc_ins->rpc_args);
+ __sync_fetch_and_and(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE);
+ return 1;
+}
+
+/*
+ * Communicate between different threads by ringbuf.
+ */
+struct iovec monitor_worker_thread_rpc(struct stellar_monitor *stm, int worker_thread_idx, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args)
+{
+ int worker_thread_num = module_manager_get_max_thread_num(stm->mod_mgr_ref);
+ if (worker_thread_idx >= worker_thread_num)
+ {
+ struct iovec response = {0};
+ return response;
+ }
+ struct monitor_rpc *rpc_ins = stm->rpc_ins_array[worker_thread_idx];
+ return stm_rpc_call(rpc_ins, rpc_request, cb, user_args);
+}
+
+__thread long long rpc_idle_num = 0;
+
+void module_rpc_worker_thread_polling_cb(struct module_manager *mod_mgr, void *polling_arg)
+{
+ struct stellar_monitor *stm = (struct stellar_monitor *)polling_arg;
+ int thread_idx = module_manager_get_thread_id(mod_mgr);
+ struct monitor_rpc *rpc_ins = stm->rpc_ins_array[thread_idx];
+
+ stm_rpc_exec(thread_idx, rpc_ins);
+}
+
+struct monitor_rpc *stm_rpc_new(void)
+{
+ struct monitor_rpc *rpc_ins = (struct monitor_rpc *)calloc(1, sizeof(struct monitor_rpc));
+ return rpc_ins;
+}
+
+void stm_rpc_free(struct monitor_rpc *rpc_ins)
+{
+ if (NULL == rpc_ins)
+ {
+ return;
+ }
+ free(rpc_ins);
+}
+
+struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr)
+{
+ module_manager_polling_subscribe(mod_mgr, module_rpc_worker_thread_polling_cb, (void *)stm);
+ return stm_rpc_new();
+}
+
+void monitor_rpc_free(struct monitor_rpc *rpc_ins)
+{
+ stm_rpc_free(rpc_ins);
+} \ No newline at end of file
diff --git a/infra/monitor/monitor_rpc.h b/infra/monitor/monitor_rpc.h
new file mode 100644
index 0000000..9945c5c
--- /dev/null
+++ b/infra/monitor/monitor_rpc.h
@@ -0,0 +1,10 @@
+#pragma once
+#include <stdint.h>
+#include <stddef.h>
+#include "stellar/monitor.h"
+#include <bits/types/struct_iovec.h>
+
+struct monitor_rpc;
+
+typedef struct iovec(monitor_rpc_callabck)(int worker_thread_idx, struct iovec user_data, void *user_args);
+struct iovec monitor_worker_thread_rpc(struct stellar_monitor *monitor, int worker_thread_idx, struct iovec user_data, monitor_rpc_callabck *cb, void *user_args);
diff --git a/infra/monitor/monitor_server.c b/infra/monitor/monitor_server.c
new file mode 100644
index 0000000..35bcea3
--- /dev/null
+++ b/infra/monitor/monitor_server.c
@@ -0,0 +1,591 @@
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <pthread.h>
+#include <assert.h>
+#include <evhttp.h>
+#include <event2/event.h>
+#include <event2/keyvalq_struct.h>
+#include <sys/queue.h>
+#include "stellar/log.h"
+#include "monitor_private.h"
+#include "monitor_cmd_assistant.h"
+#include "monitor/monitor_utils.h"
+#include "toml/toml.h"
+#include "sds/sds.h"
+#include "uthash/utlist.h"
+
+static __thread struct stellar_monitor *__thread_local_stm;
+static __thread pthread_t __thread_local_tid;
+static __thread pthread_t __stm_libevevt_callback_thread_local_tid;
+static __thread struct logger *__stm_thread_local_logger = NULL;
+
+static void stm_save_thread_local_context(struct stellar_monitor *stm)
+{
+ __thread_local_stm = stm;
+ __thread_local_tid = pthread_self();
+ __stm_thread_local_logger = stm->logger_ref;
+}
+
+static void stm_connection_close_notify(struct stellar_monitor *stm, UNUSED struct evhttp_connection *evconn)
+{
+ struct stm_conn_close_cb_manager *ele, *tmp;
+ DL_FOREACH_SAFE(stm->conn_close_mgr, ele, tmp)
+ {
+ ele->cb(&stm->current_conn, ele->arg);
+ }
+}
+
+static void on_connection_close_cb(UNUSED struct evhttp_connection *ev_conn, UNUSED void *arg)
+{
+ __stm_libevevt_callback_thread_local_tid = pthread_self();
+ struct stellar_monitor *stm = stellar_monitor_get();
+ stm->current_conn.current_evconn_ref = ev_conn;
+ stm_spinlock_lock(stm->lock);
+ stm_connection_delete(ev_conn);
+ stm_connection_close_notify(stm, ev_conn);
+ stm_spinlock_unlock(stm->lock);
+ char *peer_ip_addr;
+ uint16_t peer_port;
+ evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port);
+ STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "cli connection closed, client %s:%u\n", peer_ip_addr, peer_port);
+ stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_CLOSE, 1);
+ stm->current_conn.current_evconn_ref = NULL;
+}
+
+static void stm_command_send_reply_by_cstr(struct evhttp_request *request, int http_status_code, const char *reply, UNUSED void *params)
+{
+ struct evbuffer *buffer = evbuffer_new();
+ evbuffer_add(buffer, reply, strlen(reply));
+ evhttp_send_reply(request, http_status_code, "OK", buffer);
+ evbuffer_free(buffer);
+}
+
+static void stm_command_send_reply(struct evhttp_request *request, struct monitor_reply *reply)
+{
+ struct evbuffer *buffer = evbuffer_new();
+ sds reply_str = monitor_reply_to_string(reply);
+ evbuffer_add(buffer, reply_str, sdslen(reply_str));
+ evhttp_send_reply(request, reply->http_code, reply->http_reason, buffer);
+ evbuffer_free(buffer);
+ sdsfree(reply_str);
+ monitor_reply_free(reply);
+}
+
+static void stm_command_notfound(struct evhttp_request *request, UNUSED void *arg)
+{
+ struct stellar_monitor *stm = stellar_monitor_get();
+ const char *req_str_uri = evhttp_request_get_uri(request);
+ struct evkeyvalq headers = {};
+ evhttp_parse_query(req_str_uri, &headers);
+ const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY);
+
+ stm_command_send_reply(request, monitor_reply_new_error(error_format_unknown_command, raw_cmd_content));
+ STELLAR_LOG_ERROR(stm->logger_ref, STM_LOG_MODULE_NAME, "invlid http uri: %s\r\n", evhttp_request_get_uri(request));
+ evhttp_clear_headers(&headers);
+}
+
+static void stm_exec_command(struct stellar_monitor *stm, struct evhttp_request *request, const char *cmd_line)
+{
+ stm_spinlock_lock(stm->lock);
+ monitor_cmd_cb *cmd_cb = stm_cmd_assistant_get_cb(stm->aide, cmd_line);
+ if (NULL == cmd_cb)
+ {
+ stm_command_notfound(request, NULL);
+ stm_spinlock_unlock(stm->lock);
+ return;
+ }
+ void *cmd_user_arg = stm_cmd_assistant_get_user_arg(stm->aide, cmd_line);
+ int argc;
+ sds *cmd_argv = sdssplitargs(cmd_line, &argc);
+ struct monitor_reply *reply = cmd_cb(stm, argc, cmd_argv, cmd_user_arg);
+
+ stm_command_send_reply(request, reply);
+ sdsfreesplitres(cmd_argv, argc);
+ stm_spinlock_unlock(stm->lock);
+}
+
+static void stm_new_request_cb(struct evhttp_request *request, UNUSED void *privParams)
+{
+ __stm_libevevt_callback_thread_local_tid = pthread_self();
+ struct stellar_monitor *stm = stellar_monitor_get();
+ struct evhttp_connection *ev_conn = evhttp_request_get_connection(request);
+ stm->current_conn.current_evconn_ref = ev_conn;
+ stm_spinlock_lock(stm->lock);
+ stm_connection_insert(ev_conn);
+ stm_spinlock_unlock(stm->lock);
+ evhttp_connection_set_closecb(ev_conn, on_connection_close_cb, request);
+ // evhttp_request_set_error_cb(request, on_request_error_cb);
+
+ const char *req_str_uri = evhttp_request_get_uri(request);
+ char *peer_ip_addr;
+ uint16_t peer_port;
+ evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port);
+
+ STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "new cli request, client:%s:%u, uri: %s\n", peer_ip_addr, peer_port, req_str_uri);
+
+ struct evkeyvalq headers = {};
+ evhttp_parse_query(req_str_uri, &headers);
+ const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY);
+ if (NULL == raw_cmd_content)
+ {
+ stm_command_send_reply_by_cstr(request, HTTP_BADREQUEST, "http uri syntax error\r\n", NULL);
+ evhttp_clear_headers(&headers);
+ return;
+ }
+ stm_exec_command(stm, request, raw_cmd_content);
+ evhttp_clear_headers(&headers);
+}
+
+static int stm_event_http_init(struct stellar_monitor *stm)
+{
+ // Create a new event handler
+ stm->evt_base = event_base_new();
+ // Create a http server using that handler
+ stm->evt_http_server = evhttp_new(stm->evt_base);
+ // Limit serving GET requests
+ evhttp_set_allowed_methods(stm->evt_http_server, EVHTTP_REQ_GET);
+
+ char restful_path[256] = {0}; /* must start with '/' */
+ snprintf(restful_path, sizeof(restful_path), "/%s/%s", STM_RESTFUL_VERSION, STM_RESTFUL_RESOURCE);
+ evhttp_set_cb(stm->evt_http_server, restful_path, stm_new_request_cb, stm->evt_base);
+
+ // Set the callback for anything not recognized
+ evhttp_set_gencb(stm->evt_http_server, stm_command_notfound, NULL);
+ if (evhttp_bind_socket(stm->evt_http_server, stm->config->listen_ipaddr, stm->config->listen_port_host_order) != 0)
+ {
+ STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "Could not bind to %s:%u\r\n", stm->config->listen_ipaddr, stm->config->listen_port_host_order);
+ return -1;
+ }
+ evhttp_set_timeout(stm->evt_http_server, stm->config->connection_idle_timeout);
+ STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "accept http uri path: %s\r\n", restful_path);
+ return 0;
+}
+
+static void *stm_event_main_loop(void *arg)
+{
+ struct stellar_monitor *stm = (struct stellar_monitor *)arg;
+ stm_save_thread_local_context(stm);
+ event_base_dispatch(stm->evt_base);
+ return NULL;
+}
+
+static void stm_event_http_free(struct stellar_monitor *stm)
+{
+ event_base_loopbreak(stm->evt_base);
+ pthread_cancel(stm->evt_main_loop_tid);
+ pthread_join(stm->evt_main_loop_tid, NULL);
+ evhttp_free(stm->evt_http_server);
+ // event_free(stm->ev_timeout);
+ event_base_free(stm->evt_base);
+}
+
+static void stm_server_set_default_cfg(struct stellar_monitor_config *config)
+{
+ config->ringbuf_size = STM_RINGBUF_SIZE;
+ config->connection_idle_timeout = STM_CONNECTION_IDLE_TIMEOUT;
+ config->cli_request_timeout = STM_REQUEST_TIMEOUT;
+ config->listen_ipaddr = "0.0.0.0";
+ config->listen_port_host_order = STM_SERVER_LISTEN_PORT;
+ config->data_link_bind_port_host_order = STM_TZSP_UDP_PORT;
+ config->output_interval_ms = STM_STAT_OUTPUT_INTERVAL_MS;
+}
+
+int stellar_monitor_set_gettime_callback(struct stellar_monitor *stm, int (*gettime_cb)(struct timeval *tv, struct timezone *tz))
+{
+ if (NULL == gettime_cb)
+ {
+ return -1;
+ }
+ stm->gettime_cb = gettime_cb;
+ return 0;
+}
+
+struct stellar_monitor_config *stellar_monitor_config_new(const char *toml_file)
+{
+ struct stellar_monitor_config *config = CALLOC(struct stellar_monitor_config, 1);
+ stm_server_set_default_cfg(config);
+ int64_t int64_val = 0;
+ char errbuf[256];
+ FILE *fp = NULL;
+ toml_table_t *root = NULL;
+ toml_table_t *table = NULL;
+ toml_raw_t ptr = NULL;
+
+ fp = fopen(toml_file, "r");
+ if (fp == NULL)
+ {
+ fprintf(stderr, "config file %s open failed, %s", toml_file, strerror(errno));
+ goto fail_exit;
+ }
+
+ root = toml_parse_file(fp, errbuf, sizeof(errbuf));
+ if (root == NULL)
+ {
+ fprintf(stderr, "config file %s parse failed, %s", toml_file, errbuf);
+ goto fail_exit;
+ }
+
+ table = toml_table_in(root, "monitor");
+ if (table == NULL)
+ {
+ fprintf(stderr, "config file %s missing [monitor]", toml_file);
+ goto fail_exit;
+ }
+
+ /* listen_port */
+ ptr = toml_raw_in(table, "listen_port");
+ if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
+ {
+ if (int64_val < 1 || int64_val > 65535)
+ {
+ fprintf(stderr, "invalid monitor.listen_port %ld\n", int64_val);
+ FREE(config);
+ goto fail_exit;
+ }
+ config->listen_port_host_order = (uint16_t)int64_val;
+ }
+
+ /* data link bind port */
+ ptr = toml_raw_in(table, "data_link_bind_port");
+ if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
+ {
+ if (int64_val < 1 || int64_val > 65535)
+ {
+ fprintf(stderr, "invalid monitor.data_link_bind_port %ld\n", int64_val);
+ FREE(config);
+ goto fail_exit;
+ }
+ config->data_link_bind_port_host_order = (uint16_t)int64_val;
+ }
+
+ /* connection_idle_timeout */
+ ptr = toml_raw_in(table, "connection_idle_timeout");
+ if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
+ {
+ if (int64_val < 1 || int64_val > 3600)
+ {
+ fprintf(stderr, "invalid monitor.connection_idle_timeout %ld, should be [1, 3600]\n", int64_val);
+ FREE(config);
+ goto fail_exit;
+ }
+ config->connection_idle_timeout = (int)int64_val;
+ }
+
+ /* cli_request_timeout */
+ ptr = toml_raw_in(table, "cli_request_timeout");
+ if (ptr != NULL || toml_rtoi(ptr, &int64_val) == 0)
+ {
+ if (int64_val < 1 || int64_val > 360)
+ {
+ fprintf(stderr, "invalid monitor.cli_request_timeout %ld, , should be [1, 360]\n", int64_val);
+ FREE(config);
+ goto fail_exit;
+ }
+ config->cli_request_timeout = (int)int64_val;
+ }
+
+ /* stat */
+ ptr = toml_raw_in(table, "stat_output_path");
+ if (ptr == NULL || toml_rtos(ptr, &config->output_path) != 0)
+ {
+ config->output_path = strdup(STM_STAT_OUTPUT_PATH);
+ }
+
+ ptr = toml_raw_in(table, "stat_output_interval_ms");
+ if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
+ {
+ if (int64_val < 1000 || int64_val > 1000 * 60)
+ {
+ fprintf(stderr, "invalid monitor.stat_output_interval_ms %ld, , should be [1, 600000]\n", int64_val);
+ FREE(config);
+ goto fail_exit;
+ }
+ config->output_interval_ms = (int)int64_val;
+ }
+
+fail_exit:
+ if (root)
+ {
+ toml_free(root);
+ }
+ if (fp)
+ {
+ fclose(fp);
+ }
+ return config;
+}
+
+struct stellar_monitor *stellar_monitor_get(void)
+{
+ if (pthread_self() != __thread_local_tid)
+ {
+ assert(0);
+ // fprintf(stderr, "ERR stellar_monitor_get() failed, caller must in same thread context!\n");
+ return NULL;
+ }
+ return __thread_local_stm;
+}
+
+// support dynamic register command, independent of the order of initialization
+int monitor_register_cmd(struct stellar_monitor *stm, const char *cmd, monitor_cmd_cb *cb, const char *flags,
+ const char *hint, const char *desc, void *arg)
+{
+ stm_spinlock_lock(stm->lock);
+ int ret = stm_cmd_assistant_register_cmd(stm->aide, cmd, cb, arg, flags, hint, desc);
+ stm_spinlock_unlock(stm->lock);
+ return ret;
+}
+
+int monitor_register_connection_close_cb(struct stellar_monitor *stm, monitor_connection_close_cb *cb, void *arg)
+{
+ stm_spinlock_lock(stm->lock);
+ struct stm_conn_close_cb_manager *ele = CALLOC(struct stm_conn_close_cb_manager, 1);
+ ele->cb = cb;
+ ele->arg = arg;
+ DL_APPEND(stm->conn_close_mgr, ele);
+ stm_spinlock_unlock(stm->lock);
+ return 0;
+}
+
+static struct monitor_reply *monitor_cmd_show_brief_cb(struct stellar_monitor *stm)
+{
+ sds cmd_brief = sdsempty();
+ cmd_brief = sdscatfmt(cmd_brief, "%s, %s\r\n", "\"command\"", "description");
+ cmd_brief = sdscatfmt(cmd_brief, "-----------------------------\r\n");
+ sds cmd_brief_cont = stm_cmd_assistant_list_cmd_brief(stm->aide);
+ cmd_brief = sdscatsds(cmd_brief, cmd_brief_cont);
+ struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_brief);
+ sdsfree(cmd_brief);
+ sdsfree(cmd_brief_cont);
+ return reply;
+}
+
+static struct monitor_reply *monitor_cmd_show_verbose_cb(struct stellar_monitor *stm)
+{
+ sds cmd_verbose = stm_cmd_assistant_list_cmd_verbose(stm->aide);
+ struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_verbose);
+ sdsfree(cmd_verbose);
+ return reply;
+}
+
+static struct monitor_reply *monitor_server_builtin_show_command_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg)
+{
+ if (argc != 3)
+ {
+ return monitor_reply_new_error(error_format_wrong_number_of_args, "show command");
+ }
+ if (stm_strncasecmp_exactly(argv[2], "brief", 5) == 0)
+ {
+ return monitor_cmd_show_brief_cb((struct stellar_monitor *)arg);
+ }
+ else if (stm_strncasecmp_exactly(argv[2], "verbose", 7) == 0)
+ {
+ return monitor_cmd_show_verbose_cb((struct stellar_monitor *)arg);
+ }
+ return monitor_reply_new_error(error_format_unknown_arg, argv[2]);
+}
+
+static struct monitor_reply *monitor_server_builtin_ping_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg)
+{
+ if (argc == 1)
+ {
+ return monitor_reply_new_string("pong");
+ }
+ else if (argc == 2)
+ {
+ return monitor_reply_new_string("%s", argv[1]);
+ }
+ return monitor_reply_new_error(error_format_wrong_number_of_args, "ping");
+}
+
+static struct monitor_reply *monitor_server_builtin_who_cb(struct stellar_monitor *stm, int argc UNUSED, char *argv[] UNUSED, UNUSED void *arg)
+{
+ struct stm_connection_manager *conn_mgr = stm->connection_mgr;
+ struct stm_connection_manager *ele, *tmp;
+ sds who = sdsempty();
+ char timestr[64];
+ DL_FOREACH_SAFE(conn_mgr, ele, tmp)
+ {
+ struct timeval tv = ele->link_start_time;
+ struct tm *tm = localtime(&tv.tv_sec);
+ strftime(timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", tm);
+ who = sdscatprintf(who, "%s %s:%u", timestr, ele->peer_ipaddr, ele->peer_port_host_order);
+ if (stm->current_conn.current_evconn_ref == ele->conn)
+ {
+ who = sdscat(who, "\033[1m [current]\033[0m");
+ }
+ who = sdscat(who, "\r\n");
+ }
+ sdsIncrLen(who, -2); // delete last \r\n
+ struct monitor_reply *reply = monitor_reply_new_string("%s", who);
+ sdsfree(who);
+ return reply;
+}
+
+static int stm_builtin_cmd_register(struct stellar_monitor *stm)
+{
+ int ret = 0;
+ ret += monitor_register_cmd(stm, "show command", monitor_server_builtin_show_command_cb, "readonly", "[ brief|verbose ]", "show all registered commands info", (void *)stm);
+ assert(ret == 0);
+ ret += monitor_register_cmd(stm, "who", monitor_server_builtin_who_cb, "readonly", "<cr>", "show who is logged on", (void *)stm);
+ assert(ret == 0);
+ ret += monitor_register_cmd(stm, "ping", monitor_server_builtin_ping_cb, "readonly", "[message]", "ping the server", (void *)stm);
+ assert(ret == 0);
+ return ret;
+}
+
+struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor)
+{
+ if (__stm_libevevt_callback_thread_local_tid != pthread_self())
+ {
+ return NULL;
+ }
+ return &monitor->current_conn;
+}
+
+int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, uint16_t *peer_port)
+{
+ if (NULL == conn || conn->current_evconn_ref == NULL)
+ {
+ if (peer_ip)
+ {
+ *peer_ip = NULL;
+ }
+ if (peer_port)
+ {
+ *peer_port = 0;
+ }
+ return -1;
+ }
+ evhttp_connection_get_peer(conn->current_evconn_ref, peer_ip, peer_port);
+ return 0;
+}
+
+void monitor_free(struct stellar_monitor *stm)
+{
+ STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "free and exit\n");
+ stm_event_http_free(stm);
+ stm_stat_free(stm->stat);
+ stm_cmd_assistant_free(stm->aide);
+ stm_spinlock_free(stm->lock);
+ if (stm->rpc_ins_array)
+ {
+ for (int tid = 0; tid < stm->worker_thread_num; tid++)
+ {
+ monitor_rpc_free(stm->rpc_ins_array[tid]);
+ }
+ free(stm->rpc_ins_array);
+ }
+
+ __thread_local_stm = NULL;
+ FREE(stm->config->output_path);
+ FREE(stm->config);
+ FREE(stm);
+}
+
+struct stellar_monitor *monitor_module_to_monitor(struct module *monitor_module)
+{
+ if (monitor_module == NULL)
+ {
+ return NULL;
+ }
+ return (struct stellar_monitor *)module_get_ctx(monitor_module);
+}
+
+struct stellar_monitor *stellar_module_get_monitor(struct module_manager *mod_mgr)
+{
+ assert(mod_mgr);
+ struct module *monitor_mod = module_manager_get_module(mod_mgr, MONITOR_MODULE_NAME);
+ return monitor_module_to_monitor(monitor_mod);
+}
+
+void monitor_on_exit(struct module_manager *mod_mgr __attribute__((unused)), struct module *mod)
+{
+ if (mod)
+ {
+ struct stellar_monitor *stm = module_get_ctx(mod);
+ monitor_free(stm);
+ module_free(mod);
+ }
+}
+
+struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh)
+{
+ struct stellar_monitor *stm = (struct stellar_monitor *)calloc(1, sizeof(struct stellar_monitor));
+ stm->logger_ref = logh;
+ stm->mod_mgr_ref = mod_mgr;
+
+ struct stellar_monitor_config *config = stellar_monitor_config_new(toml_file);
+ if (NULL == config)
+ {
+ STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "get config failed!\n");
+ goto fail_exit;
+ }
+ stm->config = config;
+
+ stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr);
+
+ stm->lock = stm_spinlock_new();
+ stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr);
+ assert(stm->worker_thread_num > 0);
+ stm->gettime_cb = gettimeofday;
+ stm->aide = stm_cmd_assistant_new();
+ if (stm_event_http_init(stm) < 0)
+ {
+ STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "libevent http server init() failed!\n");
+ goto fail_exit;
+ }
+ stm->stat = stm_stat_init(stm);
+ stm_builtin_cmd_register(stm);
+ stm_save_thread_local_context(stm);
+ pthread_create(&stm->evt_main_loop_tid, NULL, stm_event_main_loop, (void *)stm);
+ sds config_print = stm_config_print(config);
+ STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "config: %s\n", config_print);
+ sdsfree(config_print);
+
+ stm->rpc_ins_array = (struct monitor_rpc **)calloc(stm->worker_thread_num, sizeof(struct monitor_rpc *));
+ for (int tid = 0; tid < stm->worker_thread_num; tid++)
+ {
+ stm->rpc_ins_array[tid] = monitor_rpc_new(stm, mod_mgr);
+ if (stm->rpc_ins_array[tid] == NULL)
+ {
+ STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "rpc init failed\n");
+ goto fail_exit;
+ }
+ }
+
+ stm_save_thread_local_context(stm);
+ return stm;
+
+fail_exit:
+ monitor_free(stm);
+ return NULL;
+}
+
+struct module *monitor_on_init(struct module_manager *mod_mgr)
+{
+ assert(mod_mgr);
+ const char *toml_file = module_manager_get_toml_path(mod_mgr);
+ assert(toml_file);
+ struct logger *logh = module_manager_get_logger(mod_mgr);
+ assert(logh);
+
+ struct stellar_monitor *stm = monitor_new(toml_file, mod_mgr, logh);
+ struct module *stm_mod = module_new(MONITOR_MODULE_NAME, (void *)stm);
+ if (stm_mod == NULL)
+ {
+ STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "moudule new '%s' fail\n", MONITOR_MODULE_NAME);
+ monitor_free(stm);
+ return NULL;
+ }
+
+ // show_session_enforcer_init(mod_mgr, stm);
+ // stm_pktdump_enforcer_init(mod_mgr, stm);
+
+ STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "init success\n");
+ return stm_mod;
+}
diff --git a/infra/monitor/monitor_spinlock.c b/infra/monitor/monitor_spinlock.c
new file mode 100644
index 0000000..9614803
--- /dev/null
+++ b/infra/monitor/monitor_spinlock.c
@@ -0,0 +1,72 @@
+// https://www.cs.utexas.edu/~pingali/CS378/2015sp/lectures/Spinlocks%20and%20Read-Write%20Locks.htm
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <pthread.h>
+
+#if 0 /* use gcc builtin function */
+struct stm_spinlock
+{
+ long value;
+};
+
+struct stm_spinlock *stm_spinlock_new(void)
+{
+ struct stm_spinlock *splock = (struct stm_spinlock *)calloc(1, sizeof(struct stm_spinlock));
+ return splock;
+}
+
+void stm_spinlock_lock(struct stm_spinlock *splock)
+{
+ while (__sync_lock_test_and_set(&splock->value, 1))
+ {
+ }
+}
+
+void stm_spinlock_unlock(struct stm_spinlock *splock)
+{
+ __sync_lock_release(&splock->value);
+}
+
+void stm_spinlock_free(struct stm_spinlock *splock)
+{
+ if (splock)
+ {
+ free(splock);
+ }
+}
+#else /* pthread spin lock */
+struct stm_spinlock
+{
+ pthread_spinlock_t lock_ins;
+};
+
+struct stm_spinlock *stm_spinlock_new(void)
+{
+ struct stm_spinlock *splock = (struct stm_spinlock *)calloc(1, sizeof(struct stm_spinlock));
+ pthread_spin_init(&splock->lock_ins, PTHREAD_PROCESS_PRIVATE);
+ return splock;
+}
+
+void stm_spinlock_lock(struct stm_spinlock *splock)
+{
+ pthread_spin_lock(&splock->lock_ins);
+}
+
+void stm_spinlock_unlock(struct stm_spinlock *splock)
+{
+ pthread_spin_unlock(&splock->lock_ins);
+}
+
+void stm_spinlock_free(struct stm_spinlock *splock)
+{
+ if (splock)
+ {
+ pthread_spin_destroy(&splock->lock_ins);
+ free(splock);
+ }
+}
+#endif \ No newline at end of file
diff --git a/infra/monitor/monitor_spinlock.h b/infra/monitor/monitor_spinlock.h
new file mode 100644
index 0000000..fb5acb2
--- /dev/null
+++ b/infra/monitor/monitor_spinlock.h
@@ -0,0 +1,8 @@
+#pragma once
+
+struct stm_spinlock;
+
+struct stm_spinlock *stm_spinlock_new(void);
+void stm_spinlock_lock(struct stm_spinlock *splock);
+void stm_spinlock_unlock(struct stm_spinlock *splock);
+void stm_spinlock_free(struct stm_spinlock *splock);
diff --git a/infra/monitor/monitor_stat.c b/infra/monitor/monitor_stat.c
new file mode 100644
index 0000000..2216b09
--- /dev/null
+++ b/infra/monitor/monitor_stat.c
@@ -0,0 +1,76 @@
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <arpa/inet.h>
+#include <getopt.h>
+#include <evhttp.h>
+#include "monitor_private.h"
+#include <fieldstat/fieldstat_easy.h>
+
+static const char *stm_stat_field_name[] = {
+ "connection_new",
+ "connection_close",
+ "request_succ",
+ "request_err",
+ "response_succ",
+ "response_err",
+ NULL,
+};
+
+long long stm_get_stat_count(struct stm_stat *stat, enum stm_stat_type type)
+{
+ if ((int)type < 0 || type >= STM_STAT_MAX)
+ {
+ return 0;
+ }
+ return stat->counters[type].count;
+}
+
+long long stm_get_stat_bytes(struct stm_stat *stat, enum stm_stat_type type)
+{
+ if ((int)type < 0 || type >= STM_STAT_MAX)
+ {
+ return 0;
+ }
+ return stat->counters[type].bytes;
+}
+
+void stm_stat_update(struct stm_stat *stat, int thread_idx, enum stm_stat_type type, long long value)
+{
+ if ((int)type < 0 || type >= STM_STAT_MAX)
+ {
+ return;
+ }
+ fieldstat_easy_counter_incrby(stat->fs4_ins, thread_idx, stat->counters[type].counter_id, NULL, 0, value);
+}
+
+struct stm_stat *stm_stat_init(struct stellar_monitor *stm)
+{
+ const struct stellar_monitor_config *config = stm->config;
+ assert(sizeof(stm_stat_field_name) / sizeof(stm_stat_field_name[0]) == STM_STAT_MAX + 1);
+ struct stm_stat *stat = CALLOC(struct stm_stat, 1);
+ /* worker thread count + 1, reserved for libevent callback thread context */
+ stat->fs4_ins = fieldstat_easy_new(stm->worker_thread_num + 1, "monitor", NULL, 0);
+ for (int i = 0; stm_stat_field_name[i] != NULL; i++)
+ {
+ stat->counters[i].counter_id = fieldstat_easy_register_counter(stat->fs4_ins, stm_stat_field_name[i]);
+ }
+ fieldstat_easy_enable_auto_output(stat->fs4_ins, config->output_path, MAX(config->output_interval_ms / 1000, 1));
+ return stat;
+}
+
+void stm_stat_free(struct stm_stat *stat)
+{
+ if (NULL == stat)
+ {
+ return;
+ }
+ if (stat->fs4_ins)
+ {
+ fieldstat_easy_free(stat->fs4_ins);
+ }
+ FREE(stat);
+} \ No newline at end of file
diff --git a/infra/monitor/monitor_transaction.c b/infra/monitor/monitor_transaction.c
new file mode 100644
index 0000000..0f9aba8
--- /dev/null
+++ b/infra/monitor/monitor_transaction.c
@@ -0,0 +1,83 @@
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <pthread.h>
+#include "uthash/utlist.h"
+#include "monitor_private.h"
+
+/******************************************** connection manager *****************************************/
+struct stm_connection_manager *stm_connection_insert(struct evhttp_connection *evconn)
+{
+ struct stellar_monitor *stm = stellar_monitor_get();
+ struct stm_connection_manager *conn_mgr = stm->connection_mgr;
+ struct stm_connection_manager *ele, *tmp;
+ DL_FOREACH_SAFE(conn_mgr, ele, tmp) // check if current connection already exist
+ {
+ if (ele->conn == evconn)
+ {
+ stm->gettime_cb(&ele->last_active_time, NULL);
+ return ele;
+ }
+ }
+
+ stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_NEW, 1);
+ struct stm_connection_manager *new_conn = (struct stm_connection_manager *)calloc(1, sizeof(struct stm_connection_manager));
+ char *tmp_ip_addr;
+ uint16_t tmp_port;
+ evhttp_connection_get_peer(evconn, &tmp_ip_addr, &tmp_port);
+ if (tmp_ip_addr)
+ {
+ strncpy(new_conn->peer_ipaddr, tmp_ip_addr, INET6_ADDRSTRLEN - 1);
+ }
+ new_conn->peer_port_host_order = tmp_port;
+ stm->gettime_cb(&new_conn->link_start_time, NULL);
+ new_conn->last_active_time = new_conn->link_start_time;
+ new_conn->conn = evconn;
+ DL_APPEND(stm->connection_mgr, new_conn);
+ return new_conn;
+}
+
+void stm_connection_delete(struct evhttp_connection *evconn)
+{
+ struct stellar_monitor *stm = stellar_monitor_get();
+ struct stm_connection_manager *conn_mgr = stm->connection_mgr;
+ struct stm_connection_manager *ele, *tmp;
+ DL_FOREACH_SAFE(conn_mgr, ele, tmp)
+ {
+ if (ele->conn == evconn)
+ {
+ DL_DELETE(conn_mgr, ele);
+ FREE(ele);
+ }
+ }
+ stm->connection_mgr = conn_mgr;
+}
+
+void stm_connection_update(struct stm_connection_manager *conn_mgr, const struct evhttp_connection *evconn)
+{
+ struct stellar_monitor *stm = stellar_monitor_get();
+ struct stm_connection_manager *ele, *tmp;
+ DL_FOREACH_SAFE(conn_mgr, ele, tmp)
+ {
+ if (ele->conn == evconn)
+ {
+ stm->gettime_cb(&ele->last_active_time, NULL);
+ }
+ }
+}
+
+const struct stm_connection_manager *stm_connection_search(const struct stm_connection_manager *conn_mgr_head, const struct evhttp_connection *evconn)
+{
+ const struct stm_connection_manager *ele, *tmp;
+ DL_FOREACH_SAFE(conn_mgr_head, ele, tmp)
+ {
+ if (ele->conn == evconn)
+ {
+ return ele;
+ }
+ }
+ return NULL;
+}
diff --git a/infra/monitor/monitor_utils.c b/infra/monitor/monitor_utils.c
new file mode 100644
index 0000000..5280feb
--- /dev/null
+++ b/infra/monitor/monitor_utils.c
@@ -0,0 +1,672 @@
+#include "sds/sds.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <ctype.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include <netinet/in.h>
+#include <threads.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#include "monitor_private.h"
+#include "stellar/session.h"
+#include "tuple/tuple.h"
+
+ struct stm_key_value *stm_cmd_key_value_new(void)
+ {
+ struct stm_key_value *kv = (struct stm_key_value *)calloc(1, sizeof(struct stm_key_value));
+ kv->tuple_num = 0;
+ kv->tuple = NULL;
+ return kv;
+ }
+
+ void stm_cmd_key_value_append(struct stm_key_value **kv, const char *key, const char *value)
+ {
+ if (NULL == *kv)
+ {
+ *kv = stm_cmd_key_value_new();
+ }
+ struct stm_key_value *new_kv = *kv;
+ new_kv->tuple = (struct stm_key_value_tuple *)realloc(new_kv->tuple, (new_kv->tuple_num + 1) * sizeof(struct stm_key_value_tuple));
+ new_kv->tuple[new_kv->tuple_num].key = strdup(key);
+ new_kv->tuple[new_kv->tuple_num].value = strdup(value);
+ new_kv->tuple_num++;
+ *kv = new_kv;
+ }
+
+ void stm_cmd_key_value_free(struct stm_key_value *kv)
+ {
+ if (NULL == kv)
+ {
+ return;
+ }
+ for (int i = 0; i < kv->tuple_num; i++)
+ {
+ FREE(kv->tuple[i].key);
+ FREE(kv->tuple[i].value);
+ }
+ FREE(kv->tuple);
+ FREE(kv);
+ }
+
+ void stm_cmd_request_free(struct stm_cmd_request *req)
+ {
+ FREE(req);
+ }
+
+ int stm_strncasecmp_exactly(const char *s1, const char *s2, size_t n)
+ {
+ if (NULL == s1 || NULL == s2)
+ {
+ return -1;
+ }
+ size_t len1 = strlen(s1);
+ size_t len2 = strlen(s2);
+
+ if (len1 != len2 || len1 != n)
+ {
+ return -1;
+ }
+ return strncasecmp(s1, s2, n);
+ }
+
+ const char *stm_session_state_ntop(enum session_state state)
+ {
+ switch (state)
+ {
+ case SESSION_STATE_OPENING:
+ return "opening";
+ break;
+ case SESSION_STATE_ACTIVE:
+ return "active";
+ break;
+
+ case SESSION_STATE_CLOSING:
+ return "closing";
+ break;
+
+ default:
+ break;
+ }
+ return "unknown";
+ }
+
+ const char *stm_session_flow_dir_ntop(uint32_t dir)
+ {
+ if (SESSION_SEEN_C2S_FLOW == dir)
+ {
+ return "C2S";
+ }
+ else if (SESSION_SEEN_S2C_FLOW == dir)
+ {
+ return "S2C";
+ }
+ else if ((SESSION_SEEN_C2S_FLOW | SESSION_SEEN_S2C_FLOW) == dir)
+ {
+ return "BIDIRECTION";
+ }
+ return "UNKNOWN";
+ }
+
+ int stm_time_range_pton(const char *time_str, time_t now, time_t time_range[2])
+ {
+ const char *delim = "-";
+ char *save_ptr;
+ const char *time_str_min_example = "last-1-days";
+ if (NULL == time_str || NULL == time_range)
+ {
+ return -1;
+ }
+ if (strlen(time_str) < strlen(time_str_min_example))
+ {
+ return -1;
+ }
+ char *local_time_str = strdup(time_str);
+
+ char *fix_prefix = strtok_r(local_time_str, delim, &save_ptr);
+ if (stm_strncasecmp_exactly(fix_prefix, "last", 4) != 0)
+ {
+ free(local_time_str);
+ return -1;
+ }
+
+ char *number_string = strtok_r(NULL, delim, &save_ptr);
+ if (NULL == number_string)
+ {
+ free(local_time_str);
+ return -1;
+ }
+ long number = strtol(number_string, NULL, 10);
+ if (number <= 0 || number > 3600)
+ {
+ free(local_time_str);
+ return -1;
+ }
+
+ char *unit_string = strtok_r(NULL, delim, &save_ptr);
+ if (NULL == unit_string)
+ {
+ free(local_time_str);
+ return -1;
+ }
+ long long multiple_second = 1;
+ if (stm_strncasecmp_exactly(unit_string, "seconds", 7) == 0)
+ {
+ multiple_second = 1;
+ }
+ else if (stm_strncasecmp_exactly(unit_string, "minutes", 7) == 0)
+ {
+ multiple_second = 60;
+ }
+ else if (stm_strncasecmp_exactly(unit_string, "hours", 5) == 0)
+ {
+ multiple_second = 60 * 60;
+ }
+ else if (stm_strncasecmp_exactly(unit_string, "days", 4) == 0)
+ {
+ multiple_second = 60 * 60 * 24;
+ }
+ else
+ {
+ free(local_time_str);
+ return -1;
+ }
+
+ if ((long long)now < number * multiple_second)
+ {
+ time_range[0] = 0;
+ }
+ else
+ {
+ time_range[0] = now - number * multiple_second;
+ }
+ time_range[1] = now;
+ while (strtok_r(NULL, delim, &save_ptr))
+ {
+ }
+ free(local_time_str);
+ return 0;
+ }
+
+ int stm_time_in_range(time_t t, const time_t time_range[2])
+ {
+ if (NULL == time_range)
+ {
+ return 0;
+ }
+ if (t >= time_range[0] && t <= time_range[1])
+ {
+ return 1;
+ }
+ return 0;
+ }
+
+ uint32_t stm_inet_pton(const char *ipstr, void *ipv4_value, void *ipv6_value)
+ {
+ unsigned int tmp_ipv4;
+ struct in6_addr tmp_ipv6;
+ if (NULL == ipstr || NULL == ipv4_value || NULL == ipv6_value)
+ {
+ return 0;
+ }
+ if (inet_pton(AF_INET, ipstr, &tmp_ipv4) == 1)
+ {
+ memcpy(ipv4_value, &tmp_ipv4, sizeof(int));
+ return AF_INET;
+ }
+ if (inet_pton(AF_INET6, ipstr, &tmp_ipv6) == 1)
+ {
+ memcpy(ipv6_value, &tmp_ipv6, sizeof(struct in6_addr));
+ return AF_INET6;
+ }
+ return 0;
+ }
+
+ /**
+ * Calculate a 128-bit mask given a network prefix.
+ */
+ void in6_addr_mask(struct in6_addr *mask, uint8_t bits)
+ {
+ for (uint8_t i = 0; i < sizeof(mask->s6_addr); i++)
+ {
+ mask->s6_addr[i] = bits ? (uint8_t)0xFF << (8 - (bits > 8 ? 8 : bits)) : 0;
+
+ if (bits < 8)
+ bits = 0;
+ else
+ bits -= 8;
+ }
+ }
+
+ /**
+ * Calculate the first address in a network given a mask.
+ */
+ void in6_addr_network(struct in6_addr *network, const struct in6_addr *addr, const struct in6_addr *mask)
+ {
+ for (uint8_t i = 0; i < sizeof(network->s6_addr); i++)
+ {
+ network->s6_addr[i] = addr->s6_addr[i] & mask->s6_addr[i];
+ }
+ }
+
+ /**
+ * Calculate the last address in a network given a mask.
+ */
+ void in6_addr_end(struct in6_addr *end, const struct in6_addr *addr, const struct in6_addr *mask)
+ {
+ for (uint8_t i = 0; i < sizeof(end->s6_addr); i++)
+ {
+ end->s6_addr[i] = (addr->s6_addr[i] & mask->s6_addr[i]) | ~mask->s6_addr[i];
+ }
+ }
+
+ int stm_ipv4_cidr_to_range(uint32_t ipaddr, uint32_t ipmask, uint32_t iprange[2])
+ {
+ uint32_t network_addr = ipaddr & ipmask;
+ uint32_t broadcast_addr = (ipaddr & ipmask) | ~ipmask;
+ iprange[0] = network_addr;
+ iprange[1] = broadcast_addr;
+ return 0;
+ }
+
+ int stm_ipv6_cidr_to_range(const struct in6_addr *ipaddr, const struct in6_addr *ipmask, struct in6_addr iprange[2])
+ {
+ struct in6_addr network_addr = {};
+ struct in6_addr broadcast_addr = {};
+ in6_addr_network(&network_addr, ipaddr, ipmask);
+ in6_addr_end(&broadcast_addr, ipaddr, ipmask);
+ memcpy(&iprange[0], &network_addr, sizeof(struct in6_addr));
+ memcpy(&iprange[1], &broadcast_addr, sizeof(struct in6_addr));
+
+ return 0;
+ }
+
+ uint32_t stm_ip_cidr_pton(const char *ipcidr, void *ipv4_value, void *ipv4_mask, void *ipv6_value, void *ipv6_mask)
+ {
+ int ipver = 0;
+ const char *delim = "/";
+ char *save_ptr;
+
+ if (NULL == ipcidr || NULL == ipv4_value || NULL == ipv4_mask || NULL == ipv6_value || NULL == ipv6_mask)
+ {
+ return 0;
+ }
+
+ if (strchr(ipcidr, '/') == NULL)
+ {
+ return 0;
+ }
+ char *local_ip_cidr = strdup(ipcidr);
+
+ char *ipaddr = strtok_r(local_ip_cidr, delim, &save_ptr);
+ ipver = stm_inet_pton(ipaddr, ipv4_value, ipv6_value);
+ if (ipver != AF_INET && ipver != AF_INET6)
+ {
+ free(local_ip_cidr);
+ return 0;
+ }
+
+ char *cidr = strtok_r(NULL, delim, &save_ptr);
+ if (NULL == cidr)
+ {
+ free(local_ip_cidr);
+ return 0;
+ }
+ int cidr_num = atoi(cidr);
+ if (ipver == AF_INET)
+ {
+ if (cidr_num <= 0 || cidr_num > 32)
+ {
+ free(local_ip_cidr);
+ return 0;
+ }
+ uint32_t mask = 0;
+ for (int i = 0; i < cidr_num; i++)
+ {
+ mask |= (uint32_t)1 << (31 - i);
+ }
+ mask = ntohl(mask);
+ memcpy(ipv4_mask, &mask, sizeof(int));
+ }
+ else if (ipver == AF_INET6)
+ {
+ if (cidr_num <= 0 || cidr_num > 128)
+ {
+ free(local_ip_cidr);
+ return -1;
+ }
+ struct in6_addr mask = {};
+ for (int i = 0; i < cidr_num; i++)
+ {
+ mask.s6_addr[i / 8] |= 1 << (7 - i % 8);
+ }
+ memcpy(ipv6_mask, &mask, sizeof(struct in6_addr));
+ }
+
+ while (strtok_r(NULL, delim, &save_ptr))
+ ;
+
+ free(local_ip_cidr);
+ return ipver;
+ }
+
+ void stm_mem_fill_rand(char *buf, size_t len, unsigned char range_min, unsigned char range_max)
+ {
+ unsigned char *p = (unsigned char *)buf;
+ for (size_t i = 0; i < len; i++)
+ {
+ p[i] = (rand() % range_min + 1) + (range_max - range_min);
+ }
+ }
+
+ int stm_string_isdigit(const char *str)
+ {
+ if (NULL == str || strlen(str) == 0)
+ {
+ return 0;
+ }
+ for (size_t i = 0; i < strlen(str); i++)
+ {
+ if (!isdigit(str[i]))
+ {
+ return 0;
+ }
+ }
+ return 1;
+ }
+
+ /*
+ * input("hello, world!", "hello", "hi", &output) -> output = "hi, world!"
+ * return: the number of sub string have been replaced.
+ */
+ int stm_replace_str(const char *raw, const char *search, const char *replace, char **outline)
+ {
+ const char *p = NULL;
+ int search_len = strlen(search);
+ int replace_len = strlen(replace);
+ int raw_len = strlen(raw);
+
+ p = strstr(raw, search);
+ if (p == NULL)
+ {
+ return 0;
+ }
+ const char *remain_ptr = p + search_len;
+ char *new_line = (char *)calloc(1, replace_len + (raw_len - search_len) + 1);
+ strcpy(new_line, replace);
+ strcpy(new_line + replace_len, remain_ptr);
+
+ *outline = new_line;
+ return 1;
+ }
+
+ int stm_timeout(struct timeval start, struct timeval end, int timeout_sec)
+ {
+ return (end.tv_sec * 1000 * 1000 + end.tv_usec) - (start.tv_sec * 1000 * 1000 + start.tv_usec) >= timeout_sec * 1000 * 1000;
+ }
+
+ struct monitor_reply *monitor_reply_nil(void)
+ {
+ struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
+ reply->type = MONITOR_REPLY_NIL;
+ reply->http_code = HTTP_OK;
+ reply->http_reason = "OK";
+ return reply;
+ }
+
+ /* string should without "\r\n", will add "\r\n" in monitor_reply_to_string() */
+ struct monitor_reply *monitor_reply_new_string(const char *format, ...)
+ {
+ struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
+ reply->type = MONITOR_REPLY_STRING;
+ reply->http_code = HTTP_OK;
+ reply->http_reason = "OK";
+ va_list ap;
+ va_start(ap, format);
+ reply->str = sdscatvprintf(sdsempty(), format, ap);
+ reply->len = strlen(reply->str);
+ va_end(ap);
+ return reply;
+ }
+
+ /* string should without "\r\n", will add "\r\n" in monitor_reply_to_string() */
+ struct monitor_reply *monitor_reply_new_error(const char *format, ...)
+ {
+ struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
+ reply->type = MONITOR_REPLY_ERROR;
+ reply->http_code = HTTP_BADREQUEST;
+ reply->http_reason = "ERROR";
+ va_list ap;
+ va_start(ap, format);
+ reply->str = sdscatvprintf(sdsempty(), format, ap);
+ reply->len = strlen(reply->str);
+ va_end(ap);
+ return reply;
+ }
+
+ struct monitor_reply *monitor_reply_new_status(const char *format, ...)
+ {
+ struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
+ reply->type = MONITOR_REPLY_STATUS;
+ reply->http_code = HTTP_OK;
+ reply->http_reason = "OK";
+ va_list ap;
+ va_start(ap, format);
+ reply->str = sdscatvprintf(sdsempty(), format, ap);
+ reply->len = strlen(reply->str);
+ va_end(ap);
+ return reply;
+ }
+
+ struct monitor_reply *monitor_reply_new_integer(long long integer)
+ {
+ struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
+ reply->type = MONITOR_REPLY_INTEGER;
+ reply->http_code = HTTP_OK;
+ reply->http_reason = "OK";
+ reply->integer = integer;
+ return reply;
+ }
+
+ struct monitor_reply *monitor_reply_new_double(double dval)
+ {
+ struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
+ reply->type = MONITOR_REPLY_DOUBLE;
+ reply->http_code = HTTP_OK;
+ reply->http_reason = "OK";
+ reply->dval = dval;
+ return reply;
+ }
+
+ sds monitor_reply_to_string(const struct monitor_reply *reply)
+ {
+ sds res = sdsempty();
+ switch (reply->type)
+ {
+ case MONITOR_REPLY_INTEGER:
+ res = sdscatprintf(res, "(integer) %lld\r\n", reply->integer);
+ break;
+ case MONITOR_REPLY_DOUBLE:
+ res = sdscatprintf(res, "(double) %f\r\n", reply->dval);
+ break;
+ case MONITOR_REPLY_STRING:
+ case MONITOR_REPLY_STATUS:
+ res = sdscatlen(res, reply->str, reply->len);
+ res = sdscat(res, "\r\n");
+ break;
+ case MONITOR_REPLY_NIL:
+ res = sdscat(res, "(nil)\r\n");
+ break;
+ case MONITOR_REPLY_ERROR:
+ res = sdscatprintf(res, "(error) %s\r\n", reply->str);
+ break;
+ default:
+ break;
+ }
+ return res;
+ }
+
+ void monitor_reply_free(struct monitor_reply *reply)
+ {
+ switch (reply->type)
+ {
+ case MONITOR_REPLY_STRING:
+ case MONITOR_REPLY_ERROR:
+ case MONITOR_REPLY_STATUS:
+ sdsfree(reply->str);
+ reply->str = NULL;
+ break;
+ default:
+ break;
+ }
+ free(reply);
+ }
+
+ static struct monitor_cli_args *monitor_util_search_cmd_args(const char *opt_name, const struct monitor_cli_args cli_args[], size_t cli_args_array_size)
+ {
+ if (NULL == cli_args)
+ {
+ return NULL;
+ }
+ for (size_t i = 0; i < cli_args_array_size; i++)
+ {
+ if (stm_strncasecmp_exactly(opt_name, cli_args[i].short_opt, strlen(cli_args[i].short_opt)) == 0 || stm_strncasecmp_exactly(opt_name, cli_args[i].long_opt, strlen(cli_args[i].long_opt)) == 0)
+ {
+ return (struct monitor_cli_args *)&cli_args[i];
+ }
+ }
+ return NULL;
+ }
+
+ sds monitor_util_copy_arg_value(int argc, const char *argv[], int *copy_args_num)
+ {
+ sds res = sdsempty();
+ int num = 0;
+ for (int i = 0; i < argc; i++)
+ {
+ if ((strncmp(argv[i], "-", 1) == 0) || (strncmp(argv[i], "--", 2) == 0))
+ {
+ break;
+ }
+ else
+ {
+ res = sdscat(res, argv[i]);
+ if ((i + 1 < argc) && (strncmp(argv[i + 1], "-", 1) != 0) && (strncmp(argv[i + 1], "--", 2) != 0)) // not the last one
+ {
+ res = sdscat(res, " ");
+ }
+ num++;
+ }
+ }
+ *copy_args_num = num;
+ return res;
+ }
+
+ int monitor_util_parse_cmd_args(int argc, const char *argv[], struct monitor_cli_args cli_args[], size_t cli_args_array_size)
+ {
+ if (argc <= 0 || NULL == argv || NULL == cli_args)
+ {
+ return -1;
+ }
+ int parse_stage = 0; // 0: arg; 1: value
+ struct monitor_cli_args *one_arg = NULL;
+ for (int i = 1; i < argc;) // skip program self name
+ {
+ if (0 == parse_stage)
+ {
+ one_arg = monitor_util_search_cmd_args(argv[i], cli_args, cli_args_array_size);
+ if (NULL == one_arg)
+ {
+ fprintf(stderr, "unknown option: %s\n", argv[i]);
+ return -1;
+ }
+ if (one_arg->require_arg_value && (i + 1 >= argc))
+ {
+ fprintf(stderr, "option requires value: %s\n", argv[i]);
+ return -1;
+ }
+ parse_stage = 1;
+ i += 1;
+ }
+ else
+ {
+ if (NULL == one_arg)
+ {
+ fprintf(stderr, "unknown option: %s\n", argv[i]);
+ return -1;
+ }
+ int copy_args_num = 0;
+ if (one_arg->value_is_multi_words)
+ {
+ one_arg->value = monitor_util_copy_arg_value(argc - i, &argv[i], &copy_args_num);
+ }
+ else
+ {
+ one_arg->value = sdsnew(argv[i]);
+ copy_args_num = 1;
+ }
+ i += copy_args_num;
+ parse_stage = 0;
+ one_arg = NULL;
+ }
+ }
+ return 0;
+ }
+
+ sds stm_config_print(const struct stellar_monitor_config *config)
+ {
+ sds res = sdsempty();
+ res = sdscatprintf(res, "cli_request_timeout: %ds, ", config->cli_request_timeout);
+ res = sdscatprintf(res, "connection_idle_timeout: %ds, ", config->connection_idle_timeout);
+ res = sdscatprintf(res, "listen_ip_addr: %s, ", config->listen_ipaddr);
+ res = sdscatprintf(res, "listen_port %u, ", config->listen_port_host_order);
+ res = sdscatprintf(res, "data_link_bind_port: %u, ", config->data_link_bind_port_host_order);
+ res = sdscatprintf(res, "stat_output_path: %s, ", config->output_path);
+ res = sdscatprintf(res, "stat_output_interval_ms: %dms ", config->output_interval_ms);
+ return res;
+ }
+
+ char *stm_http_url_encode(const char *originalText)
+ {
+ // allocate memory for the worst possible case (all characters need to be encoded)
+ char *encodedText = (char *)malloc(sizeof(char) * strlen(originalText) * 3 + 1);
+ const char *hex = "0123456789abcdef";
+ int pos = 0;
+ for (size_t i = 0; i < strlen(originalText); i++)
+ {
+ if (('a' <= originalText[i] && originalText[i] <= 'z') || ('A' <= originalText[i] && originalText[i] <= 'Z') || ('0' <= originalText[i] && originalText[i] <= '9'))
+ {
+ encodedText[pos++] = originalText[i];
+ }
+ else if (originalText[i] == ' ')
+ {
+ encodedText[pos++] = '%';
+ encodedText[pos++] = hex[originalText[i] >> 4];
+ encodedText[pos++] = hex[originalText[i] & 15];
+ }
+ /* todo: other characters ? */
+ else
+ {
+ encodedText[pos++] = originalText[i];
+ }
+ }
+ encodedText[pos] = '\0';
+ return encodedText;
+ }
+
+ const char *stm_get0_readable_session_addr(const struct tuple6 *addr, char *addr_buf, size_t max_buf_len)
+ {
+ tuple4_to_str((struct tuple4 *)addr, addr_buf, max_buf_len);
+ return addr_buf;
+ }
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/infra/monitor/monitor_utils.h b/infra/monitor/monitor_utils.h
new file mode 100644
index 0000000..2ccecd2
--- /dev/null
+++ b/infra/monitor/monitor_utils.h
@@ -0,0 +1,30 @@
+#pragma once
+#include <stdint.h>
+#include <stddef.h>
+#include <time.h>
+#include <netinet/in.h>
+#include "stellar/session.h"
+#include "tuple/tuple.h"
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+ int stm_replace_str(const char *raw, const char *search, const char *replace, char **outline);
+ int stm_strncasecmp_exactly(const char *s1, const char *s2, size_t n);
+ struct stm_cmd_reply *stm_build_reply_error(const char *format, ...);
+ struct stm_cmd_reply *stm_cmd_reply_dup(const struct stm_cmd_reply *src);
+ const char *stm_session_flow_dir_ntop(uint32_t dir);
+ int stm_time_range_pton(const char *time_str, time_t now, time_t time_range[2]);
+ int stm_time_in_range(time_t t, const time_t time_range[2]);
+ uint32_t stm_inet_pton(const char *ipstr, void *ipv4_value, void *ipv6_value);
+ uint32_t stm_ip_cidr_pton(const char *ipcidr, void *ipv4_value, void *ipv4_mask, void *ipv6_value, void *ipv6_mask);
+ int stm_ipv4_cidr_to_range(uint32_t ipaddr, uint32_t ipmask, uint32_t iprange[2]);
+ int stm_ipv6_cidr_to_range(const struct in6_addr *ipaddr, const struct in6_addr *ipmask, struct in6_addr iprange[2]);
+ void stm_mem_fill_rand(char *buf, size_t len, unsigned char range_min, unsigned char range_max);
+ int stm_string_isdigit(const char *str);
+ int stm_timeout(struct timeval start, struct timeval end, int timeout_sec);
+ const char *stm_session_state_ntop(enum session_state state);
+ const char *stm_get0_readable_session_addr(const struct tuple6 *addr, char *addr_buf, size_t max_buf_len);
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/infra/monitor/version.map b/infra/monitor/version.map
new file mode 100644
index 0000000..5958f4c
--- /dev/null
+++ b/infra/monitor/version.map
@@ -0,0 +1,10 @@
+VERS_3.0{
+global:
+ extern "C" {
+ stellar_monitor_*;
+ monitor_*;
+ stm_*;
+ stellar_module_get_monitor;
+ };
+ local: *;
+};