summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-09-02 16:03:08 +0800
committerluwenpeng <[email protected]>2024-09-02 16:55:00 +0800
commitbeb7d2f0ca4f62d96cbe2295bb2bc482f0492d02 (patch)
tree4280cb834345111f41c36c2d7ea1a72582738e61
parent9069dceae7b880e12ef4c9427574d5431c3e8107 (diff)
stellar stat parses output related configuration items
-rw-r--r--conf/stellar.toml6
-rw-r--r--infra/core/CMakeLists.txt2
-rw-r--r--infra/core/stellar_config.c109
-rw-r--r--infra/core/stellar_config.h24
-rw-r--r--infra/core/stellar_core.c116
-rw-r--r--infra/core/stellar_stat.c230
-rw-r--r--infra/core/stellar_stat.h23
-rw-r--r--test/decoders/http/test_based_on_stellar/env/stellar.toml6
-rw-r--r--test/packet_inject/conf/stellar.toml6
9 files changed, 228 insertions, 294 deletions
diff --git a/conf/stellar.toml b/conf/stellar.toml
index 986c18b..f5d4040 100644
--- a/conf/stellar.toml
+++ b/conf/stellar.toml
@@ -59,6 +59,6 @@
timeout_ms = 10000 # range: [1, 60000] (ms)
buffered_segments_max = 256 # range: [2, 4096] per flow
-[schedule]
- merge_stat_interval = 500 # range: [1, 60000] (ms)
- output_stat_interval = 2000 # range: [1, 60000] (ms)
+[stat]
+ merge_interval_ms = 500 # range: [0, 60000] (ms)
+ output_interval_ms = 1000 # range: [0, 60000] (ms) \ No newline at end of file
diff --git a/infra/core/CMakeLists.txt b/infra/core/CMakeLists.txt
index ea74acc..89b5b0f 100644
--- a/infra/core/CMakeLists.txt
+++ b/infra/core/CMakeLists.txt
@@ -1,3 +1,3 @@
-add_library(core stellar_config.c stellar_stat.c stellar_core.c)
+add_library(core stellar_stat.c stellar_core.c)
target_link_libraries(core PUBLIC packet_io ip_reassembly plugin_manager)
diff --git a/infra/core/stellar_config.c b/infra/core/stellar_config.c
deleted file mode 100644
index df9cb52..0000000
--- a/infra/core/stellar_config.c
+++ /dev/null
@@ -1,109 +0,0 @@
-#include <errno.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "toml.h"
-#include "log_private.h"
-#include "stellar_config.h"
-
-#define CONFIG_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "config", format, ##__VA_ARGS__)
-#define CONFIG_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "config", format, ##__VA_ARGS__)
-
-// return 0: success
-// retuun -1: failed
-static int parse_schedule_options(toml_table_t *root, struct schedule_options *opts)
-{
- const char *ptr;
- toml_table_t *table;
-
- table = toml_table_in(root, "schedule");
- if (table == NULL)
- {
- CONFIG_LOG_ERROR("config file missing schedule section");
- return -1;
- }
-
- ptr = toml_raw_in(table, "merge_stat_interval");
- if (ptr == NULL)
- {
- CONFIG_LOG_ERROR("config file missing schedule->merge_stat_interval");
- return -1;
- }
- opts->merge_stat_interval = atoll(ptr);
- if (opts->merge_stat_interval < 1 || opts->merge_stat_interval > 60000)
- {
- CONFIG_LOG_ERROR("config file invalid schedule->merge_stat_interval %ld, range [1, 60000]", opts->merge_stat_interval);
- return -1;
- }
-
- ptr = toml_raw_in(table, "output_stat_interval");
- if (ptr == NULL)
- {
- CONFIG_LOG_ERROR("config file missing schedule->output_stat_interval");
- return -1;
- }
- opts->output_stat_interval = atoll(ptr);
- if (opts->output_stat_interval < 1 || opts->output_stat_interval > 60000)
- {
- CONFIG_LOG_ERROR("config file invalid schedule->output_stat_interval %ld, range [1, 60000]", opts->output_stat_interval);
- return -1;
- }
-
- return 0;
-}
-
-// return 0: success
-// retuun -1: failed
-int stellar_config_load(struct stellar_config *config, const char *file)
-{
- int ret = -1;
- char errbuf[200];
- FILE *fp = NULL;
- toml_table_t *table = NULL;
-
- fp = fopen(file, "r");
- if (fp == NULL)
- {
- CONFIG_LOG_ERROR("open config file %s failed, %s", file, strerror(errno));
- goto error_out;
- }
-
- table = toml_parse_file(fp, errbuf, sizeof(errbuf));
- if (table == NULL)
- {
- CONFIG_LOG_ERROR("parse config file %s failed, %s", file, errbuf);
- goto error_out;
- }
-
- if (parse_schedule_options(table, &config->sched_opts) != 0)
- {
- goto error_out;
- }
-
- ret = 0;
-
-error_out:
- if (table)
- {
- toml_free(table);
- }
-
- if (fp)
- {
- fclose(fp);
- }
-
- return ret;
-}
-
-void stellar_config_print(const struct stellar_config *config)
-{
- if (config == NULL)
- {
- return;
- }
-
- // schedule config
- CONFIG_LOG_DEBUG("schedule->merge_stat_interval : %ld", config->sched_opts.merge_stat_interval);
- CONFIG_LOG_DEBUG("schedule->output_stat_interval : %ld", config->sched_opts.output_stat_interval);
-}
diff --git a/infra/core/stellar_config.h b/infra/core/stellar_config.h
deleted file mode 100644
index 8cb5f07..0000000
--- a/infra/core/stellar_config.h
+++ /dev/null
@@ -1,24 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-struct schedule_options
-{
- uint64_t merge_stat_interval; // range: [1, 60000] (ms)
- uint64_t output_stat_interval; // range: [1, 60000] (ms)
-};
-
-struct stellar_config
-{
- struct schedule_options sched_opts;
-};
-
-int stellar_config_load(struct stellar_config *config, const char *file);
-void stellar_config_print(const struct stellar_config *config);
-
-#ifdef __cplusplus
-}
-#endif
diff --git a/infra/core/stellar_core.c b/infra/core/stellar_core.c
index bcd8c96..197225a 100644
--- a/infra/core/stellar_core.c
+++ b/infra/core/stellar_core.c
@@ -13,7 +13,6 @@
#include "log_private.h"
#include "stellar_stat.h"
#include "stellar_core.h"
-#include "stellar_config.h"
#include "packet_private.h"
#include "plugin_manager.h"
#include "session_private.h"
@@ -41,7 +40,6 @@ struct stellar_thread
pthread_t tid;
uint16_t idx;
uint64_t is_runing;
- uint64_t last_merge_thread_stat_timestamp;
struct ip_reassembly *ip_reass;
struct session_manager *sess_mgr;
struct stellar *st;
@@ -50,24 +48,27 @@ struct stellar_thread
struct stellar_runtime
{
uint64_t need_exit;
- uint64_t stat_last_output_ts;
struct logger *logger;
struct stellar_stat *stat;
struct packet_io *packet_io;
struct plugin_manager_schema *plug_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
+};
- // config
+struct stellar_config
+{
uint64_t instance_id;
struct session_manager_config *sess_mgr_cfg;
struct ip_reassembly_config *ip_reass_cfg;
struct packet_io_config *pkt_io_cfg;
+ struct stellar_stat_config *stat_cfg;
};
struct stellar
{
- struct stellar_runtime runtime;
+ char magic[2]; // for check memory corruption
struct stellar_config config;
+ struct stellar_runtime runtime;
};
static __thread uint16_t __current_thread_idx = UINT16_MAX;
@@ -131,17 +132,14 @@ static void *worker_thread(void *arg)
struct session_manager *sess_mgr = thread->sess_mgr;
struct session_manager_stat *sess_stat = session_manager_stat(sess_mgr);
struct stellar *st = thread->st;
- struct stellar_config *config = &st->config;
struct stellar_runtime *runtime = &st->runtime;
struct packet_io *packet_io = runtime->packet_io;
struct plugin_manager_schema *plug_mgr = runtime->plug_mgr;
struct thread_stat thr_stat = {
- .packet_io = packet_io_stat(packet_io, thread->idx),
- .ip_reassembly = ip_reassembly_stat(ip_reass),
- .session_mgr = session_manager_stat(sess_mgr),
+ .pkt_io = packet_io_stat(packet_io, thread->idx),
+ .ip_reass = ip_reassembly_stat(ip_reass),
+ .sess_mgr = session_manager_stat(sess_mgr),
};
-
- uint64_t merge_stat_interval = config->sched_opts.merge_stat_interval;
uint16_t thr_idx = thread->idx;
__current_thread_idx = thr_idx;
@@ -222,10 +220,6 @@ static void *worker_thread(void *arg)
goto fast_path;
}
}
- if (packet_get_session_id(pkt) == 0)
- {
- packet_set_session_id(pkt, session_get_id(sess));
- }
plugin_manager_on_session_input(sess, pkt);
fast_path:
@@ -285,13 +279,7 @@ static void *worker_thread(void *arg)
clean_session(sess_mgr, now_ms);
ip_reassembly_expire(ip_reass, now_ms);
plugin_manager_on_polling(plug_mgr);
-
- // per merge_stat_interval merge thread stat
- if (now_ms - thread->last_merge_thread_stat_timestamp >= merge_stat_interval)
- {
- stellar_stat_merge(runtime->stat, &thr_stat, thread->idx);
- thread->last_merge_thread_stat_timestamp = now_ms;
- }
+ stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms);
if (nr_pkt_received == 0)
{
@@ -307,7 +295,7 @@ static void *worker_thread(void *arg)
usleep(1000); // 1ms
}
- stellar_stat_merge(runtime->stat, &thr_stat, thread->idx);
+ stellar_stat_merge(runtime->stat, &thr_stat, thread->idx, UINT64_MAX);
stellar_stat_print(runtime->stat, &thr_stat, thread->idx);
ATOMIC_SET(&thread->is_runing, 0);
@@ -323,25 +311,24 @@ static void *worker_thread(void *arg)
static int stellar_thread_init(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
+ struct stellar_config *config = &st->config;
uint64_t now_ms = clock_get_real_time_ms();
- for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
+ for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
thread->idx = i;
thread->is_runing = 0;
- thread->last_merge_thread_stat_timestamp = now_ms;
-
- runtime->sess_mgr_cfg->session_id_seed = runtime->instance_id << 8 | i;
- thread->sess_mgr = session_manager_new(runtime->sess_mgr_cfg, now_ms);
+ config->sess_mgr_cfg->session_id_seed = config->instance_id << 8 | i;
+ thread->sess_mgr = session_manager_new(config->sess_mgr_cfg, now_ms);
if (thread->sess_mgr == NULL)
{
CORE_LOG_ERROR("unable to create session manager");
return -1;
}
- thread->ip_reass = ip_reassembly_new(runtime->ip_reass_cfg, now_ms);
+ thread->ip_reass = ip_reassembly_new(config->ip_reass_cfg, now_ms);
if (thread->ip_reass == NULL)
{
CORE_LOG_ERROR("unable to create ip reassemble manager");
@@ -356,9 +343,10 @@ static int stellar_thread_init(struct stellar *st)
static void stellar_thread_clean(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
+ struct stellar_config *config = &st->config;
CORE_LOG_FATAL("cleaning worker thread context ...");
- for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
+ for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
if (ATOMIC_READ(&thread->is_runing) == 0)
@@ -373,8 +361,9 @@ static void stellar_thread_clean(struct stellar *st)
static int stellar_thread_run(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
+ struct stellar_config *config = &st->config;
- for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
+ for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0)
@@ -390,9 +379,10 @@ static int stellar_thread_run(struct stellar *st)
static void stellar_thread_join(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
+ struct stellar_config *config = &st->config;
CORE_LOG_FATAL("waiting worker thread stop ...");
- for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
+ for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
while (ATOMIC_READ(&thread->is_runing) == 1)
@@ -426,7 +416,8 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
{
return NULL;
}
-
+ st->magic[0] = 0x4d;
+ st->magic[1] = 0x5a;
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
@@ -442,42 +433,43 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
CORE_LOG_FATAL("plugin config file : %s", plugin_cfg_file);
CORE_LOG_FATAL("log config file : %s", log_cfg_file);
- if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&runtime->instance_id, 0, 4095) != 0)
+ if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&config->instance_id, 0, 4095) != 0)
{
CORE_LOG_ERROR("unable to load instance id");
goto error_out;
}
- runtime->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file);
- if (runtime->sess_mgr_cfg == NULL)
+ config->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file);
+ if (config->sess_mgr_cfg == NULL)
{
CORE_LOG_ERROR("unable to create session manager config");
goto error_out;
}
- runtime->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file);
- if (runtime->ip_reass_cfg == NULL)
+ config->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file);
+ if (config->ip_reass_cfg == NULL)
{
CORE_LOG_ERROR("unable to create ip reassembly config");
goto error_out;
}
- runtime->pkt_io_cfg = packet_io_config_new(stellar_cfg_file);
- if (runtime->pkt_io_cfg == NULL)
+ config->pkt_io_cfg = packet_io_config_new(stellar_cfg_file);
+ if (config->pkt_io_cfg == NULL)
{
CORE_LOG_ERROR("unable to create packet io config");
goto error_out;
}
-
- packet_io_config_print(runtime->pkt_io_cfg);
- session_manager_config_print(runtime->sess_mgr_cfg);
- ip_reassembly_config_print(runtime->ip_reass_cfg);
- if (stellar_config_load(config, stellar_cfg_file) != 0)
+ config->stat_cfg = stellar_stat_config_new(stellar_cfg_file);
+ if (config->stat_cfg == NULL)
{
- CORE_LOG_ERROR("unable to load config file");
+ CORE_LOG_ERROR("unable to create stellar stat config");
goto error_out;
}
- stellar_config_print(config);
- runtime->stat = stellar_stat_new(runtime->pkt_io_cfg->nr_worker_thread);
+ session_manager_config_print(config->sess_mgr_cfg);
+ ip_reassembly_config_print(config->ip_reass_cfg);
+ packet_io_config_print(config->pkt_io_cfg);
+ stellar_stat_config_print(config->stat_cfg);
+
+ runtime->stat = stellar_stat_new(config->stat_cfg, clock_get_real_time_ms());
if (runtime->stat == NULL)
{
CORE_LOG_ERROR("unable to create stellar stat");
@@ -490,7 +482,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
goto error_out;
}
- runtime->packet_io = packet_io_new(runtime->pkt_io_cfg);
+ runtime->packet_io = packet_io_new(config->pkt_io_cfg);
if (runtime->packet_io == NULL)
{
CORE_LOG_ERROR("unable to create packet io");
@@ -523,7 +515,6 @@ void stellar_run(struct stellar *st)
}
struct stellar_runtime *runtime = &st->runtime;
- struct stellar_config *config = &st->config;
if (stellar_thread_run(st) != 0)
{
@@ -531,14 +522,16 @@ void stellar_run(struct stellar *st)
return;
}
- runtime->stat_last_output_ts = clock_get_real_time_ms();
while (!ATOMIC_READ(&runtime->need_exit))
{
- if (clock_get_real_time_ms() - runtime->stat_last_output_ts >= config->sched_opts.output_stat_interval)
+ if (st->magic[0] != 0x4d || st->magic[1] != 0x5a)
{
- runtime->stat_last_output_ts = clock_get_real_time_ms();
- stellar_stat_output(runtime->stat);
+ CORE_LOG_FATAL("memory corruption detected");
+ ATOMIC_SET(&runtime->need_exit, 1);
+ break;
}
+
+ stellar_stat_output(runtime->stat, clock_get_real_time_ms());
usleep(1000); // 1ms
// only available in pcap mode
@@ -546,13 +539,12 @@ void stellar_run(struct stellar *st)
{
ATOMIC_SET(&runtime->need_exit, 1);
CORE_LOG_FATAL("notify worker thread to exit");
- stellar_stat_output(runtime->stat); // flush stat
break;
}
}
stellar_thread_join(st);
- stellar_stat_output(runtime->stat);
+ stellar_stat_output(runtime->stat, UINT64_MAX);
}
void stellar_free(struct stellar *st)
@@ -560,14 +552,18 @@ void stellar_free(struct stellar *st)
if (st)
{
struct stellar_runtime *runtime = &st->runtime;
+ struct stellar_config *config = &st->config;
stellar_thread_clean(st);
packet_io_free(runtime->packet_io);
plugin_manager_exit(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
- packet_io_config_free(runtime->pkt_io_cfg);
- ip_reassembly_config_free(runtime->ip_reass_cfg);
- session_manager_config_free(runtime->sess_mgr_cfg);
+
+ session_manager_config_free(config->sess_mgr_cfg);
+ ip_reassembly_config_free(config->ip_reass_cfg);
+ packet_io_config_free(config->pkt_io_cfg);
+ stellar_stat_config_free(config->stat_cfg);
+
CORE_LOG_FATAL("stellar exit\n");
log_free(runtime->logger);
@@ -654,7 +650,7 @@ void stellar_send_build_packet(struct stellar *st, struct packet *pkt)
int stellar_get_worker_thread_num(struct stellar *st)
{
- return st->runtime.pkt_io_cfg->nr_worker_thread;
+ return st->config.pkt_io_cfg->nr_worker_thread;
}
struct logger *stellar_get_logger(struct stellar *st)
diff --git a/infra/core/stellar_stat.c b/infra/core/stellar_stat.c
index 6171ecb..408929f 100644
--- a/infra/core/stellar_stat.c
+++ b/infra/core/stellar_stat.c
@@ -237,13 +237,14 @@ const char *name[] = {
struct stellar_stat
{
- uint16_t nr_thread;
- char output_file[2048];
+ struct stellar_stat_config cfg;
struct fieldstat_easy *fs;
+ uint64_t last_merge_stat_ts;
+ uint64_t last_output_stat_ts;
+
int flag[MAX_THREAD_NUM]; // IS_FREE or IS_BUSY
struct thread_stat thr_stat[MAX_THREAD_NUM];
-
uint64_t stat_idx[STAT_TYPE_MAX];
uint64_t stat_val[STAT_TYPE_MAX];
};
@@ -254,177 +255,177 @@ uint64_t get_stat_value_by_idx(const struct thread_stat *thr_stat, size_t idx)
{
// device packet
case STAT_TYPE_PKTS_RX:
- return thr_stat->packet_io->pkts_rx;
+ return thr_stat->pkt_io->pkts_rx;
case STAT_TYPE_BYTES_RX:
- return thr_stat->packet_io->bytes_rx;
+ return thr_stat->pkt_io->bytes_rx;
case STAT_TYPE_PKTS_TX:
- return thr_stat->packet_io->pkts_tx;
+ return thr_stat->pkt_io->pkts_tx;
case STAT_TYPE_BYTES_TX:
- return thr_stat->packet_io->bytes_tx;
+ return thr_stat->pkt_io->bytes_tx;
// keep-alive packet
case STAT_TYPE_KEEP_ALIVE_PKTS:
- return thr_stat->packet_io->keep_alive_pkts;
+ return thr_stat->pkt_io->keep_alive_pkts;
case STAT_TYPE_KEEP_ALIVE_BYTES:
- return thr_stat->packet_io->keep_alive_bytes;
+ return thr_stat->pkt_io->keep_alive_bytes;
// raw packet
case STAT_TYPE_RAW_PKTS_RX:
- return thr_stat->packet_io->raw_pkts_rx;
+ return thr_stat->pkt_io->raw_pkts_rx;
case STAT_TYPE_RAW_BYTES_RX:
- return thr_stat->packet_io->raw_bytes_rx;
+ return thr_stat->pkt_io->raw_bytes_rx;
case STAT_TYPE_RAW_PKTS_TX:
- return thr_stat->packet_io->raw_pkts_tx;
+ return thr_stat->pkt_io->raw_pkts_tx;
case STAT_TYPE_RAW_BYTES_TX:
- return thr_stat->packet_io->raw_bytes_tx;
+ return thr_stat->pkt_io->raw_bytes_tx;
// drop packet
case STAT_TYPE_PKTS_DROPPED:
- return thr_stat->packet_io->pkts_dropped;
+ return thr_stat->pkt_io->pkts_dropped;
case STAT_TYPE_BYTES_DROPPED:
- return thr_stat->packet_io->bytes_dropped;
+ return thr_stat->pkt_io->bytes_dropped;
// inject packet
case STAT_TYPE_PKTS_INJECTED:
- return thr_stat->packet_io->pkts_injected;
+ return thr_stat->pkt_io->pkts_injected;
case STAT_TYPE_BYTES_INJECTED:
- return thr_stat->packet_io->bytes_injected;
+ return thr_stat->pkt_io->bytes_injected;
// ctrl packet
case STAT_TYPE_CTRL_PKTS_RX:
- return thr_stat->packet_io->ctrl_pkts_rx;
+ return thr_stat->pkt_io->ctrl_pkts_rx;
case STAT_TYPE_CTRL_BYTES_RX:
- return thr_stat->packet_io->ctrl_bytes_rx;
+ return thr_stat->pkt_io->ctrl_bytes_rx;
case STAT_TYPE_CTRL_PKTS_TX:
- return thr_stat->packet_io->ctrl_pkts_tx;
+ return thr_stat->pkt_io->ctrl_pkts_tx;
case STAT_TYPE_CTRL_BYTES_TX:
- return thr_stat->packet_io->ctrl_bytes_tx;
+ return thr_stat->pkt_io->ctrl_bytes_tx;
// ipv4 reassembly
case STAT_TYPE_IP4_DEFRAGS_EXPECTED:
- return thr_stat->ip_reassembly->ip4_defrags_expected;
+ return thr_stat->ip_reass->ip4_defrags_expected;
case STAT_TYPE_IP4_DEFRAGS_SUCCEED:
- return thr_stat->ip_reassembly->ip4_defrags_succeed;
+ return thr_stat->ip_reass->ip4_defrags_succeed;
case STAT_TYPE_IP4_DEFRAGS_FAILED_TIMEOUT:
- return thr_stat->ip_reassembly->ip4_defrags_failed_timeout;
+ return thr_stat->ip_reass->ip4_defrags_failed_timeout;
case STAT_TYPE_IP4_DEFRAGS_FAILED_INVALID_LENGTH:
- return thr_stat->ip_reassembly->ip4_defrags_failed_invalid_length;
+ return thr_stat->ip_reass->ip4_defrags_failed_invalid_length;
case STAT_TYPE_IP4_DEFRAGS_FAILED_OVERLAP:
- return thr_stat->ip_reassembly->ip4_defrags_failed_overlap;
+ return thr_stat->ip_reass->ip4_defrags_failed_overlap;
case STAT_TYPE_IP4_DEFRAGS_FAILED_TOO_MANY_FRAG:
- return thr_stat->ip_reassembly->ip4_defrags_failed_too_many_frag;
+ return thr_stat->ip_reass->ip4_defrags_failed_too_many_frag;
case STAT_TYPE_IP4_FRAGS:
- return thr_stat->ip_reassembly->ip4_frags;
+ return thr_stat->ip_reass->ip4_frags;
case STAT_TYPE_IP4_FRAGS_FREED:
- return thr_stat->ip_reassembly->ip4_frags_freed;
+ return thr_stat->ip_reass->ip4_frags_freed;
case STAT_TYPE_IP4_FRAGS_BUFFERED:
- return thr_stat->ip_reassembly->ip4_frags_buffered;
+ return thr_stat->ip_reass->ip4_frags_buffered;
case STAT_TYPE_IP4_FRAGS_BYPASS_NO_BUFFER:
- return thr_stat->ip_reassembly->ip4_frags_bypass_no_buffer;
+ return thr_stat->ip_reass->ip4_frags_bypass_no_buffer;
case STAT_TYPE_IP4_FRAGS_BYPASS_DUP_FIST_FRAG:
- return thr_stat->ip_reassembly->ip4_frags_bypass_dup_fist_frag;
+ return thr_stat->ip_reass->ip4_frags_bypass_dup_fist_frag;
case STAT_TYPE_IP4_FRAGS_BYPASS_DUP_LAST_FRAG:
- return thr_stat->ip_reassembly->ip4_frags_bypass_dup_last_frag;
+ return thr_stat->ip_reass->ip4_frags_bypass_dup_last_frag;
// ipv6 reassembly
case STAT_TYPE_IP6_DEFRAGS_EXPECTED:
- return thr_stat->ip_reassembly->ip6_defrags_expected;
+ return thr_stat->ip_reass->ip6_defrags_expected;
case STAT_TYPE_IP6_DEFRAGS_SUCCEED:
- return thr_stat->ip_reassembly->ip6_defrags_succeed;
+ return thr_stat->ip_reass->ip6_defrags_succeed;
case STAT_TYPE_IP6_DEFRAGS_FAILED_TIMEOUT:
- return thr_stat->ip_reassembly->ip6_defrags_failed_timeout;
+ return thr_stat->ip_reass->ip6_defrags_failed_timeout;
case STAT_TYPE_IP6_DEFRAGS_FAILED_INVALID_LENGTH:
- return thr_stat->ip_reassembly->ip6_defrags_failed_invalid_length;
+ return thr_stat->ip_reass->ip6_defrags_failed_invalid_length;
case STAT_TYPE_IP6_DEFRAGS_FAILED_OVERLAP:
- return thr_stat->ip_reassembly->ip6_defrags_failed_overlap;
+ return thr_stat->ip_reass->ip6_defrags_failed_overlap;
case STAT_TYPE_IP6_DEFRAGS_FAILED_TOO_MANY_FRAG:
- return thr_stat->ip_reassembly->ip6_defrags_failed_too_many_frag;
+ return thr_stat->ip_reass->ip6_defrags_failed_too_many_frag;
case STAT_TYPE_IP6_FRAGS:
- return thr_stat->ip_reassembly->ip6_frags;
+ return thr_stat->ip_reass->ip6_frags;
case STAT_TYPE_IP6_FRAGS_FREED:
- return thr_stat->ip_reassembly->ip6_frags_freed;
+ return thr_stat->ip_reass->ip6_frags_freed;
case STAT_TYPE_IP6_FRAGS_BUFFERED:
- return thr_stat->ip_reassembly->ip6_frags_buffered;
+ return thr_stat->ip_reass->ip6_frags_buffered;
case STAT_TYPE_IP6_FRAGS_BYPASS_NO_BUFFER:
- return thr_stat->ip_reassembly->ip6_frags_bypass_no_buffer;
+ return thr_stat->ip_reass->ip6_frags_bypass_no_buffer;
case STAT_TYPE_IP6_FRAGS_BYPASS_DUP_FIST_FRAG:
- return thr_stat->ip_reassembly->ip6_frags_bypass_dup_fist_frag;
+ return thr_stat->ip_reass->ip6_frags_bypass_dup_fist_frag;
case STAT_TYPE_IP6_FRAGS_BYPASS_DUP_LAST_FRAG:
- return thr_stat->ip_reassembly->ip6_frags_bypass_dup_last_frag;
+ return thr_stat->ip_reass->ip6_frags_bypass_dup_last_frag;
// TCP session
case STAT_TYPE_HISTORY_TCP_SESSIONS:
- return thr_stat->session_mgr->history_tcp_sessions;
+ return thr_stat->sess_mgr->history_tcp_sessions;
case STAT_TYPE_TCP_SESS_USED:
- return thr_stat->session_mgr->tcp_sess_used;
+ return thr_stat->sess_mgr->tcp_sess_used;
case STAT_TYPE_TCP_SESS_OPENING:
- return thr_stat->session_mgr->tcp_sess_opening;
+ return thr_stat->sess_mgr->tcp_sess_opening;
case STAT_TYPE_TCP_SESS_ACTIVE:
- return thr_stat->session_mgr->tcp_sess_active;
+ return thr_stat->sess_mgr->tcp_sess_active;
case STAT_TYPE_TCP_SESS_CLOSING:
- return thr_stat->session_mgr->tcp_sess_closing;
+ return thr_stat->sess_mgr->tcp_sess_closing;
case STAT_TYPE_TCP_SESS_DISCARD:
- return thr_stat->session_mgr->tcp_sess_discard;
+ return thr_stat->sess_mgr->tcp_sess_discard;
case STAT_TYPE_TCP_SESS_CLOSED:
- return thr_stat->session_mgr->tcp_sess_closed;
+ return thr_stat->sess_mgr->tcp_sess_closed;
// UDP session
case STAT_TYPE_HISTORY_UDP_SESSIONS:
- return thr_stat->session_mgr->history_udp_sessions;
+ return thr_stat->sess_mgr->history_udp_sessions;
case STAT_TYPE_UDP_SESS_USED:
- return thr_stat->session_mgr->udp_sess_used;
+ return thr_stat->sess_mgr->udp_sess_used;
case STAT_TYPE_UDP_SESS_OPENING:
- return thr_stat->session_mgr->udp_sess_opening;
+ return thr_stat->sess_mgr->udp_sess_opening;
case STAT_TYPE_UDP_SESS_ACTIVE:
- return thr_stat->session_mgr->udp_sess_active;
+ return thr_stat->sess_mgr->udp_sess_active;
case STAT_TYPE_UDP_SESS_CLOSING:
- return thr_stat->session_mgr->udp_sess_closing;
+ return thr_stat->sess_mgr->udp_sess_closing;
case STAT_TYPE_UDP_SESS_DISCARD:
- return thr_stat->session_mgr->udp_sess_discard;
+ return thr_stat->sess_mgr->udp_sess_discard;
case STAT_TYPE_UDP_SESS_CLOSED:
- return thr_stat->session_mgr->udp_sess_closed;
+ return thr_stat->sess_mgr->udp_sess_closed;
// Evicted session
case STAT_TYPE_TCP_SESS_EVICTED:
- return thr_stat->session_mgr->tcp_sess_evicted;
+ return thr_stat->sess_mgr->tcp_sess_evicted;
case STAT_TYPE_UDP_SESS_EVICTED:
- return thr_stat->session_mgr->udp_sess_evicted;
+ return thr_stat->sess_mgr->udp_sess_evicted;
// Packet
case STAT_TYPE_UDP_PKTS_BYPASS_TABLE_FULL:
- return thr_stat->session_mgr->udp_pkts_bypass_table_full;
+ return thr_stat->sess_mgr->udp_pkts_bypass_table_full;
case STAT_TYPE_TCP_PKTS_BYPASS_TABLE_FULL:
- return thr_stat->session_mgr->tcp_pkts_bypass_table_full;
+ return thr_stat->sess_mgr->tcp_pkts_bypass_table_full;
case STAT_TYPE_TCP_PKTS_BYPASS_SESSION_NOT_FOUND:
- return thr_stat->session_mgr->tcp_pkts_bypass_session_not_found;
+ return thr_stat->sess_mgr->tcp_pkts_bypass_session_not_found;
case STAT_TYPE_TCP_PKTS_BYPASS_DUPLICATED:
- return thr_stat->session_mgr->tcp_pkts_bypass_duplicated;
+ return thr_stat->sess_mgr->tcp_pkts_bypass_duplicated;
case STAT_TYPE_UDP_PKTS_BYPASS_DUPLICATED:
- return thr_stat->session_mgr->udp_pkts_bypass_duplicated;
+ return thr_stat->sess_mgr->udp_pkts_bypass_duplicated;
case STAT_TYPE_UDP_PKTS_BYPASS_SESSION_EVICTED:
- return thr_stat->session_mgr->udp_pkts_bypass_session_evicted;
+ return thr_stat->sess_mgr->udp_pkts_bypass_session_evicted;
// TCP segments
case STAT_TYPE_TCP_SEGS_INPUT:
- return thr_stat->session_mgr->tcp_segs_input;
+ return thr_stat->sess_mgr->tcp_segs_input;
case STAT_TYPE_TCP_SEGS_CONSUMED:
- return thr_stat->session_mgr->tcp_segs_consumed;
+ return thr_stat->sess_mgr->tcp_segs_consumed;
case STAT_TYPE_TCP_SEGS_TIMEOUT:
- return thr_stat->session_mgr->tcp_segs_timeout;
+ return thr_stat->sess_mgr->tcp_segs_timeout;
case STAT_TYPE_TCP_SEGS_RETRANSMITED:
- return thr_stat->session_mgr->tcp_segs_retransmited;
+ return thr_stat->sess_mgr->tcp_segs_retransmited;
case STAT_TYPE_TCP_SEGS_OVERLAPPED:
- return thr_stat->session_mgr->tcp_segs_overlapped;
+ return thr_stat->sess_mgr->tcp_segs_overlapped;
case STAT_TYPE_TCP_SEGS_OMITTED_TOO_MANY:
- return thr_stat->session_mgr->tcp_segs_omitted_too_many;
+ return thr_stat->sess_mgr->tcp_segs_omitted_too_many;
case STAT_TYPE_TCP_SEGS_INORDER:
- return thr_stat->session_mgr->tcp_segs_inorder;
+ return thr_stat->sess_mgr->tcp_segs_inorder;
case STAT_TYPE_TCP_SEGS_REORDERED:
- return thr_stat->session_mgr->tcp_segs_reordered;
+ return thr_stat->sess_mgr->tcp_segs_reordered;
case STAT_TYPE_TCP_SEGS_BUFFERED:
- return thr_stat->session_mgr->tcp_segs_buffered;
+ return thr_stat->sess_mgr->tcp_segs_buffered;
case STAT_TYPE_TCP_SEGS_FREED:
- return thr_stat->session_mgr->tcp_segs_freed;
+ return thr_stat->sess_mgr->tcp_segs_freed;
default:
assert(0);
@@ -432,10 +433,55 @@ uint64_t get_stat_value_by_idx(const struct thread_stat *thr_stat, size_t idx)
}
}
+struct stellar_stat_config *stellar_stat_config_new(const char *toml_file)
+{
+ if (toml_file == NULL)
+ {
+ return NULL;
+ }
+
+ struct stellar_stat_config *cfg = (struct stellar_stat_config *)calloc(1, sizeof(struct stellar_stat_config));
+ if (cfg == NULL)
+ {
+ return NULL;
+ }
+
+ int ret = 0;
+ ret += load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&cfg->nr_worker_thread, 1, MAX_THREAD_NUM);
+ ret += load_and_validate_toml_integer_config(toml_file, "stat.merge_interval_ms", (uint64_t *)&cfg->merge_interval_ms, 0, 60000);
+ ret += load_and_validate_toml_integer_config(toml_file, "stat.output_interval_ms", (uint64_t *)&cfg->output_interval_ms, 0, 60000);
+
+ if (ret != 0)
+ {
+ stellar_stat_config_free(cfg);
+ return NULL;
+ }
+
+ return cfg;
+}
+
+void stellar_stat_config_free(struct stellar_stat_config *cfg)
+{
+ if (cfg)
+ {
+ free(cfg);
+ cfg = NULL;
+ }
+}
+
+void stellar_stat_config_print(const struct stellar_stat_config *cfg)
+{
+ if (cfg)
+ {
+ STAT_LOG_INFO("stat.merge_interval_ms : %lu", cfg->merge_interval_ms);
+ STAT_LOG_INFO("stat.output_interval_ms : %lu", cfg->output_interval_ms);
+ }
+}
+
// python3 -m pip install prettytable
// python3 -m pip install jinja2
// /opt/MESA/bin/fieldstat_exporter.py local -j log/stellar_fs4.json -e -l --clear-screen
-struct stellar_stat *stellar_stat_new(uint16_t nr_thread)
+struct stellar_stat *stellar_stat_new(const struct stellar_stat_config *cfg, uint64_t now_ms)
{
struct stellar_stat *stat = (struct stellar_stat *)calloc(1, sizeof(struct stellar_stat));
if (stat == NULL)
@@ -443,7 +489,7 @@ struct stellar_stat *stellar_stat_new(uint16_t nr_thread)
return NULL;
}
- snprintf(stat->output_file, sizeof(stat->output_file), "./log/stellar_fs4.json");
+ memcpy(&stat->cfg, cfg, sizeof(struct stellar_stat_config));
stat->fs = fieldstat_easy_new(1, "stellar", NULL, 0);
if (stat->fs == NULL)
@@ -452,7 +498,6 @@ struct stellar_stat *stellar_stat_new(uint16_t nr_thread)
goto error_out;
}
- stat->nr_thread = nr_thread;
for (int i = 0; i < MAX_THREAD_NUM; i++)
{
stat->flag[i] = IS_FREE;
@@ -463,6 +508,9 @@ struct stellar_stat *stellar_stat_new(uint16_t nr_thread)
stat->stat_idx[i] = fieldstat_easy_register_counter(stat->fs, name[i]);
}
+ stat->last_merge_stat_ts = now_ms;
+ stat->last_output_stat_ts = now_ms;
+
return stat;
error_out:
@@ -484,9 +532,15 @@ void stellar_stat_free(struct stellar_stat *stat)
}
}
-void stellar_stat_output(struct stellar_stat *stat)
+void stellar_stat_output(struct stellar_stat *stat, uint64_t now_ms)
{
- for (uint16_t i = 0; i < stat->nr_thread; i++)
+ if (now_ms - stat->last_output_stat_ts < stat->cfg.output_interval_ms)
+ {
+ return;
+ }
+ stat->last_output_stat_ts = now_ms;
+
+ for (uint16_t i = 0; i < stat->cfg.nr_worker_thread; i++)
{
if (ATOMIC_READ(&(stat->flag[i])) == IS_BUSY)
{
@@ -510,10 +564,10 @@ void stellar_stat_output(struct stellar_stat *stat)
fieldstat_easy_output(stat->fs, &buff, &len);
if (buff)
{
- FILE *fp = fopen(stat->output_file, "w+");
+ FILE *fp = fopen("./log/stellar_fs4.json", "w+");
if (fp == NULL)
{
- STAT_LOG_ERROR("failed to open file: %s, %s", stat->output_file, strerror(errno));
+ STAT_LOG_ERROR("failed to open file: ./log/stellar_fs4.json, %s", strerror(errno));
}
else
{
@@ -530,12 +584,18 @@ void stellar_stat_output(struct stellar_stat *stat)
}
}
-void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx)
+void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx, uint64_t now_ms)
{
+ if (now_ms - stat->last_merge_stat_ts < stat->cfg.merge_interval_ms)
+ {
+ return;
+ }
+
if (ATOMIC_READ(&(stat->flag[thr_idx])) == IS_FREE)
{
memcpy(&stat->thr_stat[thr_idx], thr_stat, sizeof(struct thread_stat));
ATOMIC_SET(&(stat->flag[thr_idx]), IS_BUSY);
+ stat->last_merge_stat_ts = now_ms;
}
}
diff --git a/infra/core/stellar_stat.h b/infra/core/stellar_stat.h
index fbef78d..3560ce8 100644
--- a/infra/core/stellar_stat.h
+++ b/infra/core/stellar_stat.h
@@ -11,16 +11,27 @@ extern "C"
struct thread_stat
{
- struct packet_io_stat *packet_io;
- struct ip_reassembly_stat *ip_reassembly;
- struct session_manager_stat *session_mgr;
+ struct packet_io_stat *pkt_io;
+ struct ip_reassembly_stat *ip_reass;
+ struct session_manager_stat *sess_mgr;
};
+struct stellar_stat_config
+{
+ uint16_t nr_worker_thread; // range [1, MAX_THREAD_NUM]
+ uint64_t merge_interval_ms; // range: [0, 60000] (ms)
+ uint64_t output_interval_ms; // range: [0, 60000] (ms)
+};
+
+struct stellar_stat_config *stellar_stat_config_new(const char *toml_file);
+void stellar_stat_config_free(struct stellar_stat_config *cfg);
+void stellar_stat_config_print(const struct stellar_stat_config *cfg);
+
struct stellar_stat;
-struct stellar_stat *stellar_stat_new(uint16_t nr_thread);
+struct stellar_stat *stellar_stat_new(const struct stellar_stat_config *cfg, uint64_t now_ms);
void stellar_stat_free(struct stellar_stat *stat);
-void stellar_stat_output(struct stellar_stat *stat);
-void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx);
+void stellar_stat_output(struct stellar_stat *stat, uint64_t now_ms);
+void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx, uint64_t now_ms);
void stellar_stat_print(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx);
#ifdef __cplusplus
diff --git a/test/decoders/http/test_based_on_stellar/env/stellar.toml b/test/decoders/http/test_based_on_stellar/env/stellar.toml
index e43a206..308c884 100644
--- a/test/decoders/http/test_based_on_stellar/env/stellar.toml
+++ b/test/decoders/http/test_based_on_stellar/env/stellar.toml
@@ -59,6 +59,6 @@
timeout_ms = 100 # range: [1, 60000] (ms)
buffered_segments_max = 256 # range: [2, 4096] per flow
-[schedule]
- merge_stat_interval = 50 # range: [1, 60000] (ms)
- output_stat_interval = 200 # range: [1, 60000] (ms)
+[stat]
+ merge_interval_ms = 500 # range: [0, 60000] (ms)
+ output_interval_ms = 1000 # range: [0, 60000] (ms)
diff --git a/test/packet_inject/conf/stellar.toml b/test/packet_inject/conf/stellar.toml
index 50963b6..cb8fe4b 100644
--- a/test/packet_inject/conf/stellar.toml
+++ b/test/packet_inject/conf/stellar.toml
@@ -59,6 +59,6 @@
timeout_ms = 10000 # range: [1, 60000] (ms)
buffered_segments_max = 256 # range: [2, 4096] per flow
-[schedule]
- merge_stat_interval = 500 # range: [1, 60000] (ms)
- output_stat_interval = 2000 # range: [1, 60000] (ms)
+[stat]
+ merge_interval_ms = 500 # range: [0, 60000] (ms)
+ output_interval_ms = 1000 # range: [0, 60000] (ms)