summaryrefslogtreecommitdiff
path: root/common/src
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2024-07-26 16:50:51 +0800
committerfengweihao <[email protected]>2024-07-26 16:50:51 +0800
commit83f51432b1b86d6d35eb5c3e10e6cbca29a1a45a (patch)
tree923c63905a9746f8979d58de3885a80ef025adae /common/src
parenta59b9390336dafa0ee1f05e74e52411d175b9843 (diff)
TSG-21854 TFE使用fieldstat4序列化Manipulation Policy的metric并输出到kafka
Diffstat (limited to 'common/src')
-rw-r--r--common/src/kafka.cpp13
-rw-r--r--common/src/metrics.cpp290
-rw-r--r--common/src/tfe_fieldstat.cpp268
-rw-r--r--common/src/tfe_packet_io.cpp3
-rw-r--r--common/src/tfe_resource.cpp33
-rw-r--r--common/src/tfe_session_table.cpp4
6 files changed, 287 insertions, 324 deletions
diff --git a/common/src/kafka.cpp b/common/src/kafka.cpp
index a82c641..cab4642 100644
--- a/common/src/kafka.cpp
+++ b/common/src/kafka.cpp
@@ -216,6 +216,19 @@ void kafka_destroy(struct kafka *handle)
}
}
+int kafka_send2(struct kafka *handle, enum topic_idx idx, const char *data, int len)
+{
+ if (handle && handle->pppt[idx] && handle->pppt[idx]->topic)
+ {
+ if(rd_kafka_produce(handle->pppt[idx]->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL) == -1)
+ {
+ TFE_LOG_ERROR(g_default_logger, "KAFKA: failed to produce message with topic[%d], %s", idx, rd_kafka_err2str(rd_kafka_last_error()));
+ return -1;
+ }
+ }
+ return 0;
+}
+
int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int len)
{
if (!handle)
diff --git a/common/src/metrics.cpp b/common/src/metrics.cpp
deleted file mode 100644
index 7b6ed5f..0000000
--- a/common/src/metrics.cpp
+++ /dev/null
@@ -1,290 +0,0 @@
-#include <stdio.h>
-#include <stdint.h>
-#include <errno.h>
-#include <assert.h>
-#include <unistd.h>
-#include <MESA/MESA_prof_load.h>
-#include <fieldstat/fieldstat_easy.h>
-
-#include "uthash.h"
-#include "metrics.h"
-#include "tfe_utils.h"
-#include "tfe_cmsg.h"
-#include "tfe_session_table.h"
-
-#define FIELDSTAT_TAG_INIT(ptr, index, _key, _type, _value) \
- do { \
- ptr[index].key = _key; \
- ptr[index].type = _type; \
- ptr[index].value_longlong = _value; \
- }while(0)
-
-struct config
-{
- uint16_t thr_num;
- int output_fs_interval_ms;
- int output_kafka_interval_ms;
- char data_center[256];
- char device_group[256];
- char device_id[256];
-};
-
-struct metrics
-{
- struct config cfg;
-
- int hit_count_idx;
- int in_bytes_idx;
- int out_bytes_idx;
- int in_pkts_idx;
- int out_pkts_idx;
-
- pthread_t tid;
- int thr_is_runing;
- int thr_need_exit;
- struct kafka *kfk;
- struct fieldstat_easy *fs;
-};
-
-/******************************************************************************
- * Private API
- ******************************************************************************/
-
-static void *fs2kafka_thread_cycle(void *arg)
-{
- struct metrics *handle = (struct metrics *)arg;
- ATOMIC_SET(&handle->thr_is_runing, 1);
-
- char **ptr = NULL;
- size_t len = 0;
- while (!ATOMIC_READ(&handle->thr_need_exit))
- {
- fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len);
- if (ptr)
- {
- for (size_t i = 0; i < len; i++)
- {
- kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i]));
- free(ptr[i]);
- ptr[i] = NULL;
- }
- free(ptr);
- }
-
- usleep(handle->cfg.output_kafka_interval_ms * 1000);
- }
- ATOMIC_SET(&handle->thr_is_runing, 0);
-
- return NULL;
-}
-
-/******************************************************************************
- * Public API
- ******************************************************************************/
-
-struct metrics *metrics_create(const char *profile, struct kafka *kfk)
-{
- struct metrics *handle = (struct metrics *)calloc(1, sizeof(struct metrics));
- if (!handle)
- {
- return NULL;
- }
-
- MESA_load_profile_int_def(profile, "packet_io", "packet_io_threads", (int *)&(handle->cfg.thr_num), 0);
- MESA_load_profile_int_def(profile, "metrics", "output_fs_interval_ms", &(handle->cfg.output_fs_interval_ms), 500);
- MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000);
- MESA_load_profile_string_def(profile, "public", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), "");
- MESA_load_profile_string_def(profile, "public", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), "");
- MESA_load_profile_string_def(profile, "public", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), "");
-
- const struct fieldstat_tag tags[] = {
- {"data_center", TAG_CSTRING, {.value_str = handle->cfg.data_center}},
- {"device_group", TAG_CSTRING, {.value_str = handle->cfg.device_group}},
- {"device_id", TAG_CSTRING, {.value_str = handle->cfg.device_id}},
- };
-
- handle->kfk = kfk;
- handle->fs = fieldstat_easy_new(handle->cfg.thr_num, "proxy_rule_hits", tags, sizeof(tags) / sizeof(tags[0]));
- if (!handle->fs)
- {
- goto error_out;
- }
-
- handle->hit_count_idx = fieldstat_easy_register_counter(handle->fs, "hit_count");
- handle->in_bytes_idx = fieldstat_easy_register_counter(handle->fs, "in_bytes");
- handle->out_bytes_idx = fieldstat_easy_register_counter(handle->fs, "out_bytes");
- handle->in_pkts_idx = fieldstat_easy_register_counter(handle->fs, "in_pkts");
- handle->out_pkts_idx = fieldstat_easy_register_counter(handle->fs, "out_pkts");
-
- if (pthread_create(&handle->tid, NULL, fs2kafka_thread_cycle, (void *)handle) < 0)
- {
- goto error_out;
- }
-
- return handle;
-
-error_out:
- metrics_destory(handle);
- return NULL;
-}
-
-void metrics_destory(struct metrics *handle)
-{
- if (handle)
- {
- ATOMIC_SET(&handle->thr_need_exit, 1);
- while (ATOMIC_READ(&handle->thr_is_runing))
- {
- usleep(1000);
- }
-
- if (handle->kfk)
- {
- kafka_destroy(handle->kfk);
- handle->kfk = NULL;
- }
-
- if (handle->fs)
- {
- fieldstat_easy_free(handle->fs);
- handle->fs = NULL;
- }
-
- free(handle);
- handle = NULL;
- }
-}
-
-void metrics_single_session_output(struct session_node *node, void *ctx)
-{
- int ret = 0;
- int hit_count = 0;
- uint16_t out_size = 0;
- struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx;
- struct metrics *metrics = thread_ctx->ref_acceptor_ctx->metrics;
- struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
- struct tfe_cmsg *cmsg = s_ctx->cmsg;
- int thr_idx = thread_ctx->thread_index;
- if (cmsg == NULL)
- return;
-
- int c2s_dir = s_ctx->c2s_info.is_e2i_dir;
- int c2s_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts;
- int c2s_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes;
- int s2c_dir = s_ctx->s2c_info.is_e2i_dir;
- int s2c_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts;
- int s2c_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes;
- s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx;
- s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx;
-
- if (c2s_rx_pkts == 0 && c2s_rx_bytes == 0 && s2c_rx_pkts == 0 && s2c_rx_bytes == 0)
- return;
-
- int vsys_id = 0;
- ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size);
- if (ret != 0)
- {
- TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret));
- return;
- }
-
- uint64_t rule_id = 0;
- ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &out_size);
- if (ret != 0)
- {
- TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret));
- return;
- }
-
- uint8_t hit_no_intercept = 0;
- ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size);
- if (ret != 0)
- {
- TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret));
- return;
- }
-
- if (s_ctx->metric_hit == 0 && s_ctx->send_log_flag)
- {
- s_ctx->metric_hit = 1;
- hit_count = 1;
- }
-
- int in_pkts = 0;
- int in_bytes = 0;
- int out_pkts = 0;
- int out_bytes = 0;
-
- // incoming : E2I 的流量
- // outgoing : I2E 的流量
- // first_ctr_packet_dir <==> client hello packet dir
- // 1: E2I 0:I2E
- if (c2s_dir == 1)
- {
- in_pkts += c2s_rx_pkts;
- in_bytes += c2s_rx_bytes;
- }
- else
- {
- out_pkts += c2s_rx_pkts;
- out_bytes += c2s_rx_bytes;
- }
-
- if (s2c_dir == 1)
- {
- in_pkts += s2c_rx_pkts;
- in_bytes += s2c_rx_bytes;
- }
- else
- {
- out_pkts += s2c_rx_pkts;
- out_bytes += s2c_rx_bytes;
- }
-
- int nr_tags = 0;
- struct fieldstat_tag tags[5] = {0};
- FIELDSTAT_TAG_INIT(tags, nr_tags, "vsys_id", TAG_INTEGER, vsys_id);
- nr_tags++;
- FIELDSTAT_TAG_INIT(tags, nr_tags, "rule_id", TAG_INTEGER, rule_id);
- nr_tags++;
- uint8_t pinning_status = 0;
- if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0)
- {
- FIELDSTAT_TAG_INIT(tags, nr_tags, "pinning_status", TAG_INTEGER, pinning_status);
- nr_tags++;
- }
- // action : 2 Intercept; 3 No Intercept
- FIELDSTAT_TAG_INIT(tags, nr_tags, "action", TAG_INTEGER, (hit_no_intercept == 1 ? 3 : 2));
- nr_tags++;
-
- if (hit_count > 0)
- fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->hit_count_idx, tags, (size_t)nr_tags, hit_count);
-
- if (in_pkts > 0)
- fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->in_pkts_idx, tags, (size_t)nr_tags, in_pkts);
-
- if (in_bytes > 0)
- fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->in_bytes_idx, tags, (size_t)nr_tags, in_bytes);
-
- if (out_pkts > 0)
- fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->out_pkts_idx, tags, (size_t)nr_tags, out_pkts);
-
- if (out_bytes > 0)
- fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->out_bytes_idx, tags, (size_t)nr_tags, out_bytes);
- return;
-}
-
-void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx)
-{
- if (thread_ctx == NULL)
- return;
-
- struct session_table *session_table = thread_ctx->session_table;
- session_foreach(session_table, metrics_single_session_output, thread_ctx);
- return;
-}
-
-int metrics_get_interval(struct metrics *handle)
-{
- return handle->cfg.output_fs_interval_ms;
-} \ No newline at end of file
diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp
index f83a36f..74c73d7 100644
--- a/common/src/tfe_fieldstat.cpp
+++ b/common/src/tfe_fieldstat.cpp
@@ -1,39 +1,248 @@
#include <stdlib.h>
+#include <unistd.h>
#include <tfe_fieldstat.h>
#include "tfe_stream.h"
#include "tfe_resource.h"
#include "tfe_packet_io.h"
-int tfe_fieldstat_easy_incrby(struct tfe_fieldstat_easy_t *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id)
+int tfe_fieldstat_intercept_incrby(struct fieldstat_easy_intercept *metrics, void *val_data, int thread_index)
{
- return fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, counter_id, tags, (size_t)n_tags, value);
+ int ret = 0;
+ int hit_count = 0;
+ uint16_t out_size = 0;
+
+ struct session_ctx *s_ctx = (struct session_ctx *)val_data;
+ struct tfe_cmsg *cmsg = s_ctx->cmsg;
+ if (cmsg == NULL)
+ {
+ return 0;
+ }
+
+ int c2s_dir = s_ctx->c2s_info.is_e2i_dir;
+ int c2s_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts;
+ int c2s_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes;
+ int s2c_dir = s_ctx->s2c_info.is_e2i_dir;
+ int s2c_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts;
+ int s2c_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes;
+ s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx;
+ s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx;
+
+ if (c2s_rx_pkts == 0 && c2s_rx_bytes == 0 && s2c_rx_pkts == 0 && s2c_rx_bytes == 0)
+ {
+ return 0;
+ }
+
+ int vsys_id = 0;
+ ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size);
+ if (ret != 0)
+ {
+ TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret));
+ return 0;
+ }
+
+ uint64_t rule_id = 0;
+ ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &out_size);
+ if (ret != 0)
+ {
+ TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret));
+ return 0;
+ }
+
+ uint8_t hit_no_intercept = 0;
+ ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size);
+ if (ret != 0)
+ {
+ TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret));
+ return 0;
+ }
+
+ if (s_ctx->metric_hit == 0 && s_ctx->send_log_flag)
+ {
+ s_ctx->metric_hit = 1;
+ hit_count = 1;
+ }
+
+ int in_pkts = 0;
+ int in_bytes = 0;
+ int out_pkts = 0;
+ int out_bytes = 0;
+
+ // incoming : E2I ������
+ // outgoing : I2E ������
+ // first_ctr_packet_dir <==> client hello packet dir
+ // 1: E2I 0:I2E
+ if (c2s_dir == 1)
+ {
+ in_pkts += c2s_rx_pkts;
+ in_bytes += c2s_rx_bytes;
+ }
+ else
+ {
+ out_pkts += c2s_rx_pkts;
+ out_bytes += c2s_rx_bytes;
+ }
+
+ if (s2c_dir == 1)
+ {
+ in_pkts += s2c_rx_pkts;
+ in_bytes += s2c_rx_bytes;
+ }
+ else
+ {
+ out_pkts += s2c_rx_pkts;
+ out_bytes += s2c_rx_bytes;
+ }
+
+ int nr_tags = 0;
+ struct fieldstat_tag tags[5] = {0};
+ FIELDSTAT_TAG_INIT(tags, nr_tags, "vsys_id", TAG_INTEGER, vsys_id);
+ nr_tags++;
+ FIELDSTAT_TAG_INIT(tags, nr_tags, "rule_id", TAG_INTEGER, rule_id);
+ nr_tags++;
+ uint8_t pinning_status = 0;
+ if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0)
+ {
+ FIELDSTAT_TAG_INIT(tags, nr_tags, "pinning_status", TAG_INTEGER, pinning_status);
+ nr_tags++;
+ }
+ // action : 2 Intercept; 3 No Intercept
+ FIELDSTAT_TAG_INIT(tags, nr_tags, "action", TAG_INTEGER, (hit_no_intercept == 1 ? 3 : 2));
+ nr_tags++;
+
+ if (hit_count > 0)
+ fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->hit_count_idx, tags, (size_t)nr_tags, hit_count);
+
+ if (in_pkts > 0)
+ fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->in_pkts_idx, tags, (size_t)nr_tags, in_pkts);
+
+ if (in_bytes > 0)
+ fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->in_bytes_idx, tags, (size_t)nr_tags, in_bytes);
+
+ if (out_pkts > 0)
+ fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->out_pkts_idx, tags, (size_t)nr_tags, out_pkts);
+
+ if (out_bytes > 0)
+ fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->out_bytes_idx, tags, (size_t)nr_tags, out_bytes);
+ return 1;
}
-struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger)
+int tfe_fieldstat_get_output_interval(struct fieldstat_easy_intercept *fieldstat)
+{
+ return fieldstat->output_fs_interval_ms;
+}
+
+int tfe_fieldstat_manipulation_incrby(struct filedstat_easy_manipulation *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id)
+{
+ return fieldstat_easy_counter_incrby(fieldstat->fs, thread_id, counter_id, tags, (size_t)n_tags, value);
+}
+
+static void *tfe_fieldstat_thread_cycle(void *arg)
+{
+ struct tfe_fieldstat_easy_t *tfe_fieldstat4_easy = (struct tfe_fieldstat_easy_t *)arg;
+ ATOMIC_SET(&tfe_fieldstat4_easy->thr_is_runing, 1);
+
+ char **ptr = NULL;
+ size_t len = 0;
+ while (!ATOMIC_READ(&tfe_fieldstat4_easy->thr_need_exit))
+ {
+ if(tfe_fieldstat4_easy->intercept)
+ {
+ fieldstat_easy_output_array_and_reset(tfe_fieldstat4_easy->intercept->fs, &ptr, &len);
+ if (ptr)
+ {
+ for (size_t i = 0; i < len; i++)
+ {
+ kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, ptr[i], strlen(ptr[i]));
+ free(ptr[i]);
+ ptr[i] = NULL;
+ }
+ free(ptr);
+ }
+ }
+
+ if(tfe_fieldstat4_easy->manipulation)
+ {
+ fieldstat_easy_output_array_and_reset(tfe_fieldstat4_easy->manipulation->fs, &ptr, &len);
+ if (ptr)
+ {
+ for (size_t i = 0; i < len; i++)
+ {
+ kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, ptr[i], strlen(ptr[i]));
+ free(ptr[i]);
+ ptr[i] = NULL;
+ }
+ free(ptr);
+ }
+ }
+
+ usleep(tfe_fieldstat4_easy->output_kafka_interval_ms * 1000);
+ }
+ ATOMIC_SET(&tfe_fieldstat4_easy->thr_is_runing, 0);
+
+ return NULL;
+}
+
+struct fieldstat_easy_intercept *tfe_fieldstat_easy_intercept_create(char *app_name, int max_thread, int output_fs_interval_ms, void *local_logger)
+{
+ struct fieldstat_easy_intercept *fieldstat = ALLOC(struct fieldstat_easy_intercept, 1);
+
+ const struct fieldstat_tag tags[] = {
+ {"data_center", TAG_CSTRING, {.value_str = tfe_get_data_center()}},
+ {"device_group", TAG_CSTRING, {.value_str = tfe_get_device_group()}},
+ {"device_id", TAG_CSTRING, {.value_str = tfe_get_device_id()}},
+ };
+
+ fieldstat->fs = fieldstat_easy_new(max_thread, app_name, tags, sizeof(tags) / sizeof(tags[0]));
+ if (!fieldstat->fs)
+ {
+ TFE_LOG_ERROR(local_logger, "fieldstat4 easy intercept instance init failed.");
+ FREE(&fieldstat);
+ return NULL;
+ }
+ fieldstat->output_fs_interval_ms = output_fs_interval_ms;
+ fieldstat->hit_count_idx = fieldstat_easy_register_counter(fieldstat->fs, "hit_count");
+ fieldstat->in_bytes_idx = fieldstat_easy_register_counter(fieldstat->fs, "in_bytes");
+ fieldstat->out_bytes_idx = fieldstat_easy_register_counter(fieldstat->fs, "out_bytes");
+ fieldstat->in_pkts_idx = fieldstat_easy_register_counter(fieldstat->fs, "in_pkts");
+ fieldstat->out_pkts_idx = fieldstat_easy_register_counter(fieldstat->fs, "out_pkts");
+
+ return fieldstat;
+}
+
+struct filedstat_easy_manipulation *tfe_fieldstat_easy_manipulation_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger)
{
const char *counter_field[COLUMN_MAX] = {"hit_count", "in_bytes", "out_bytes", "in_pkts", "out_pkts"};
struct fieldstat_tag metric_tags[TAG_MAX - 1] = {{"vsys_id", TAG_INTEGER, -1}, {"rule_id", TAG_INTEGER, -1}, {"action", TAG_INTEGER, -1}, {"sub_action", TAG_CSTRING, -1}};
- struct tfe_fieldstat_easy_t *fieldstat = ALLOC(struct tfe_fieldstat_easy_t, 1);
-
- fieldstat->fseasy = fieldstat_easy_new(max_thread, app_name, NULL, 0);
- if(!fieldstat->fseasy)
+ struct filedstat_easy_manipulation *fieldstat = ALLOC(struct filedstat_easy_manipulation, 1);
+
+ const struct fieldstat_tag tags[] = {
+ {"data_center", TAG_CSTRING, {.value_str = tfe_get_data_center()}},
+ {"device_group", TAG_CSTRING, {.value_str = tfe_get_device_group()}},
+ {"device_id", TAG_CSTRING, {.value_str = tfe_get_device_id()}},
+ };
+
+ fieldstat->fs = fieldstat_easy_new(max_thread, app_name, tags, sizeof(tags) / sizeof(tags[0]));
+ if(!fieldstat->fs)
{
- TFE_LOG_ERROR(local_logger, "fieldstat4 easy instance init failed.");
+ TFE_LOG_ERROR(local_logger, "fieldstat4 easy manipulation instance init failed.");
FREE(&fieldstat);
return NULL;
}
- fieldstat_easy_enable_auto_output(fieldstat->fseasy, outpath, cycle);
+ if(cycle > 0)
+ {
+ fieldstat_easy_enable_auto_output(fieldstat->fs, outpath, cycle);
+ }
for(int i=0; i<COLUMN_MAX; i++)
{
- fieldstat->counter_array[i]=fieldstat_easy_register_counter(fieldstat->fseasy, counter_field[i]);
+ fieldstat->counter_array[i]=fieldstat_easy_register_counter(fieldstat->fs, counter_field[i]);
if(fieldstat->counter_array[i] < 0)
{
TFE_LOG_ERROR(local_logger, "fieldstat4 easy register counter failed.");
FREE(&fieldstat);
- return NULL;
+ return NULL;
}
}
@@ -47,23 +256,48 @@ struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *out
return fieldstat;
}
+struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(int output_kafka_interval_ms)
+{
+ struct tfe_fieldstat_easy_t *fieldstat = ALLOC(struct tfe_fieldstat_easy_t, 1);
+ fieldstat->output_kafka_interval_ms = output_kafka_interval_ms;
+
+ if (pthread_create(&fieldstat->tid, NULL, tfe_fieldstat_thread_cycle, (void *)fieldstat) < 0)
+ {
+ FREE(&fieldstat);
+ return NULL;
+ }
+
+ return fieldstat;
+}
+
void tfe_fieldstat_easy_destroy(struct tfe_fieldstat_easy_t *fieldstat)
{
if(fieldstat)
{
- if(fieldstat->fseasy)
+ if(fieldstat->manipulation)
{
- fieldstat_easy_free(fieldstat->fseasy);
+ if(fieldstat->manipulation->fs)
+ {
+ fieldstat_easy_free(fieldstat->manipulation->fs);
+ }
+
+ for (int i = 0; i < fieldstat->manipulation->max_thread; i++)
+ {
+ if (fieldstat->manipulation->tags[i])
+ {
+ FREE(&fieldstat->manipulation->tags[i]);
+ }
+ }
+ FREE(&fieldstat->manipulation->tags);
}
- for (int i = 0; i < fieldstat->max_thread; i++)
+ if(fieldstat->intercept)
{
- if (fieldstat->tags[i])
+ if(fieldstat->intercept->fs)
{
- FREE(&fieldstat->tags[i]);
+ fieldstat_easy_free(fieldstat->intercept->fs);
}
}
- FREE(&fieldstat->tags);
FREE(&fieldstat);
}
}
diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp
index 9950347..32e1250 100644
--- a/common/src/tfe_packet_io.cpp
+++ b/common/src/tfe_packet_io.cpp
@@ -35,7 +35,6 @@
#include "dablooms.h"
#include "timestamp.h"
#include "tfe_dp_trace.h"
-#include "metrics.h"
/******************************************************************************
* Struct
@@ -1350,7 +1349,7 @@ static int handle_session_closing(struct metadata *meta, marsio_buff_t *rx_buff,
if (node)
{
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
- metrics_single_session_output(node, thread);
+ tfe_fieldstat_intercept_incrby(thread->ref_acceptor_ctx->metrics, s_ctx, thread->thread_index);
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, meta->session_id, "closing", NULL, NULL);
session_table_delete_by_id(thread->session_table, meta->session_id);
diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp
index e811436..3203775 100644
--- a/common/src/tfe_resource.cpp
+++ b/common/src/tfe_resource.cpp
@@ -11,7 +11,7 @@
#define MAAT_INPUT_FILE 2
static int scan_table_id[__SCAN_COMMON_TABLE_MAX];
-static struct tfe_fieldstat_easy_t *fieldstat_easy = NULL;
+static struct tfe_fieldstat_easy_t *fieldstat4_easy = NULL;
static char *device_tag=NULL;
struct kafka *kafka_handle = NULL;
@@ -65,30 +65,37 @@ struct maat *tfe_get_maat_handle()
struct tfe_fieldstat_easy_t *tfe_get_fieldstat_handle()
{
- return fieldstat_easy;
+ return fieldstat4_easy;
}
-static struct tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger)
+static tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger)
{
int cycle=0;
char app_name[TFE_STRING_MAX]={0};
char outpath[TFE_STRING_MAX]={0};
- struct tfe_fieldstat_easy_t *fieldstat_easy=NULL;
+ int output_fs_interval_ms=0, output_kafka_interval_ms=0;
+ struct tfe_fieldstat_easy_t *fieldstat_easy=NULL;
- MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "proxy_rule_hits");
- MESA_load_profile_int_def(profile, section, "cycle", &cycle, 5);
- MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_fieldstat.json");
+ MESA_load_profile_int_def(profile, section, "output_fs_interval_ms", &output_fs_interval_ms, 500);
+ MESA_load_profile_int_def(profile, section, "output_kafka_interval_ms", &output_kafka_interval_ms, 1000);
- fieldstat_easy = tfe_fieldstat_easy_create(app_name, outpath, cycle, max_thread, logger);
+ fieldstat_easy = tfe_fieldstat_easy_create(output_kafka_interval_ms);
if (fieldstat_easy == NULL)
{
TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric.");
return NULL;
}
- TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name);
- TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle);
- TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath);
+ MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "proxy_rule_hits");
+ MESA_load_profile_int_def(profile, section, "cycle", &cycle, 0);
+ MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_fieldstat.json");
+ fieldstat_easy->manipulation = tfe_fieldstat_easy_manipulation_create(app_name, outpath, cycle, max_thread, logger);
+
+ TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name);
+ TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle);
+ TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath);
+ TFE_LOG_INFO(logger, "tfe output_fs_interval_ms : %d", output_fs_interval_ms);
+ TFE_LOG_INFO(logger, "tfe output_kafka_interval_ms : %d", output_kafka_interval_ms);
return fieldstat_easy;
}
@@ -377,8 +384,8 @@ int tfe_env_init()
return -1;
}
- fieldstat_easy = create_fieldstat4_instance(profile_path, "proxy_hits", thread_num, g_default_logger);
- if(!fieldstat_easy)
+ fieldstat4_easy = create_fieldstat4_instance(profile_path, "proxy_hits", thread_num, g_default_logger);
+ if(!fieldstat4_easy)
{
return -1;
}
diff --git a/common/src/tfe_session_table.cpp b/common/src/tfe_session_table.cpp
index 237537b..d9fac5e 100644
--- a/common/src/tfe_session_table.cpp
+++ b/common/src/tfe_session_table.cpp
@@ -212,7 +212,7 @@ struct session_node *session_table_search_by_addr(struct session_table *table, c
return temp;
}
-void session_foreach(struct session_table *table, void (*func)(struct session_node *, void *), void *ctx)
+void session_foreach(struct session_table *table, struct fieldstat_easy_intercept *metrics, int (*func)(struct fieldstat_easy_intercept *, void *, int), int thread_index)
{
struct session_node *temp = NULL;
struct session_node *node = NULL;
@@ -222,7 +222,7 @@ void session_foreach(struct session_table *table, void (*func)(struct session_no
HASH_ITER(hh1, table->root_by_id, node, temp)
{
- func(node, ctx);
+ func(metrics, node->val_data, thread_index);
}
return;
} \ No newline at end of file