summaryrefslogtreecommitdiff
path: root/platform
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-07-19 10:02:07 +0800
committerluwenpeng <[email protected]>2024-07-19 15:14:41 +0800
commitc8d40f1347937612143daf6eceac697f57150ad1 (patch)
tree1fcbe7044ca7198c4315550d12261930097d6633 /platform
parent9e63902c0d4b1954d883faed7a0d9f3a030b3ddb (diff)
feature: TSG-21852 service_function_status support fieldstat4
Diffstat (limited to 'platform')
-rw-r--r--platform/include/health_check.h4
-rw-r--r--platform/include/sf_status.h20
-rw-r--r--platform/src/health_check.cpp63
-rw-r--r--platform/src/main.cpp5
-rw-r--r--platform/src/policy.cpp26
-rw-r--r--platform/src/sce.cpp2
-rw-r--r--platform/src/sf_metrics.cpp2
-rw-r--r--platform/src/sf_status.cpp230
-rw-r--r--platform/test/gtest_sf_status.cpp34
-rw-r--r--platform/test/test_resource/sce.conf5
10 files changed, 195 insertions, 196 deletions
diff --git a/platform/include/health_check.h b/platform/include/health_check.h
index 9e4d7cd..ea0f6f9 100644
--- a/platform/include/health_check.h
+++ b/platform/include/health_check.h
@@ -8,7 +8,7 @@ extern "C"
#include "policy.h"
-void health_check_session_init(const char *profile);
+void health_check_session_init(const char *profile, struct kafka *kfk);
// return 0 : success
// return -1 : key exist
@@ -17,7 +17,7 @@ uint64_t health_check_session_add(int profile_id, int vsys_id, const struct heal
// return 0 : success
// return -1 : key not exist
-int health_check_session_del(uint64_t session_id, int profile_id);
+int health_check_session_del(uint64_t session_id, int profile_id, int vsys_id);
// return 1 : active
// return 0 : inactive
diff --git a/platform/include/sf_status.h b/platform/include/sf_status.h
index f5541f1..90f42b8 100644
--- a/platform/include/sf_status.h
+++ b/platform/include/sf_status.h
@@ -6,17 +6,21 @@ extern "C"
{
#endif
+#include "kafka.h"
#include <stdint.h>
-#include "uthash.h"
-struct sf_status *sf_status_create(const char *profile);
-void sf_status_destory(struct sf_status *handle);
-void sf_status_reset(struct sf_status *handle);
+struct sf_status_key
+{
+ uint32_t vsys_id;
+ uint32_t sf_profile_id;
+};
-void sf_status_delete(struct sf_status *handle, int sf_profile_id);
-void sf_status_update(struct sf_status *handle, int sf_vsys_id, int sf_profile_id, int sf_status, int sf_latency);
-void sf_status_send(struct sf_status *handle);
-int sf_status_get_interval(struct sf_status *handle);
+struct sf_status *sf_status_create(const char *profile, struct kafka *kfk);
+void sf_status_destory(struct sf_status *handle);
+void sf_status_delete(struct sf_status *handle, const struct sf_status_key *key);
+void sf_status_update(struct sf_status *handle, const struct sf_status_key *key, int sf_status, int sf_latency);
+void sf_status_output(struct sf_status *handle);
+int sf_status_get_ouput_interval_ms(struct sf_status *handle);
#ifdef __cplusplus
}
diff --git a/platform/src/health_check.cpp b/platform/src/health_check.cpp
index 726e852..abe3730 100644
--- a/platform/src/health_check.cpp
+++ b/platform/src/health_check.cpp
@@ -31,6 +31,7 @@
#define PACKET_SIZE 64
#define HC_DEV_NAME_LEN 16
#define HC_LOCAL_ADDRESS_LEN 64
+#define TIMESPEC_TO_MSEC(ts) ((ts).tv_sec * 1000 + (ts).tv_nsec / 1000000)
struct session_table
{
@@ -74,7 +75,7 @@ static struct session_table_addr g_handle_bfd;
static struct session_table_addr g_handle_none;
static struct sf_status *g_sf_status = NULL;
-int sleep_ms = 300;
+int next_check_wait_ms = 300;
int enable = 1;
int icmp_cycle_time_s = 10;
char path[BFD_PATHLEN];
@@ -335,7 +336,7 @@ static int send_icmp_cycle()
return pid;
}
-void health_check_session_init(const char *profile)
+void health_check_session_init(const char *profile, struct kafka *kfk)
{
char default_gw_mac_str[32] = { 0 };
memset(&g_handle, 0, sizeof(g_handle));
@@ -358,7 +359,7 @@ void health_check_session_init(const char *profile)
return;
}
- g_sf_status = sf_status_create(profile);
+ g_sf_status = sf_status_create(profile, kfk);
if (strlen(gateway_address) > 0) {
health_check_method_table_add(&g_handle_none, gateway_address);
}
@@ -503,7 +504,7 @@ uint64_t health_check_session_add(int profile_id, int vsys_id, const struct heal
// return 0 : success
// return -1 : key not exist
-int health_check_session_del(uint64_t session_id, int profile_id)
+int health_check_session_del(uint64_t session_id, int profile_id, int vsys_id)
{
int ret = 0;
struct session_iterm *tmp = NULL;
@@ -533,7 +534,10 @@ int health_check_session_del(uint64_t session_id, int profile_id)
end:
HASH_DELETE(hh1, g_handle.root_by_id, tmp);
- sf_status_delete(g_sf_status, profile_id);
+ struct sf_status_key key = {0};
+ key.vsys_id = vsys_id;
+ key.sf_profile_id = profile_id;
+ sf_status_delete(g_sf_status, &key);
pthread_rwlock_unlock(&g_handle.rwlock);
free(tmp);
tmp = NULL;
@@ -623,7 +627,7 @@ static int get_mac_by_addr(char *addr, uint8_t *buf)
static void *_health_check_session_foreach(void *arg)
{
int is_active = 0;
- int interval_s = sf_status_get_interval(g_sf_status);
+ int ouput_interval_ms = sf_status_get_ouput_interval_ms(g_sf_status);
struct bfd_vtysh_client client;
struct session_iterm *tmp = NULL;
struct session_iterm *node = NULL;
@@ -632,10 +636,10 @@ static void *_health_check_session_foreach(void *arg)
struct sockaddr_in addr;
struct timespec current_time;
- struct timespec g_status_last_send_time;
+ struct timespec last_output_time;
clock_gettime(CLOCK_MONOTONIC, &current_time);
- clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time);
+ last_output_time = current_time;
health_check_session_init_bfd_client(&client);
bfd_vtysh_connect(&client);
@@ -661,7 +665,10 @@ static void *_health_check_session_foreach(void *arg)
is_active = 0;
}
- sf_status_update(g_sf_status, node->vsys_id, node->profile_id, is_active, 0);
+ struct sf_status_key key = {0};
+ key.vsys_id = node->vsys_id;
+ key.sf_profile_id = node->profile_id;
+ sf_status_update(g_sf_status, &key, is_active, 0);
if (node->is_active != is_active) {
node->is_active = is_active;
if (node->is_active == 1) {
@@ -673,41 +680,31 @@ static void *_health_check_session_foreach(void *arg)
health_check_method_table_set_mac(&g_handle_bfd, node->policy.address, init_mac);
}
}
- if (sleep_ms > node->policy.interval_ms)
- sleep_ms = node->policy.interval_ms;
+ if (next_check_wait_ms > node->policy.interval_ms)
+ next_check_wait_ms = node->policy.interval_ms;
}
pthread_rwlock_unlock(&g_handle.rwlock);
clock_gettime(CLOCK_MONOTONIC, &current_time);
- if (current_time.tv_sec - g_status_last_send_time.tv_sec >= interval_s)
+ int next_output_wait_ms = ouput_interval_ms - (TIMESPEC_TO_MSEC(current_time) - TIMESPEC_TO_MSEC(last_output_time));
+ if (next_output_wait_ms <= 0)
{
- sf_status_send(g_sf_status);
- clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time);
+ next_output_wait_ms = 0;
}
- // interval_s : 1000 ms
- // sleep_ms : 900 ms
- if (interval_s * 1000 > sleep_ms)
+ if (next_output_wait_ms >= next_check_wait_ms)
{
- usleep(sleep_ms * 1000);
+ usleep(next_check_wait_ms * 1000);
}
- // interval_s : 900 ms
- // sleep_ms : 1000 ms
else
{
- int tmp_time = sleep_ms;
- while(tmp_time > interval_s * 1000) {
- usleep(interval_s * 1000 * 1000);
-
- clock_gettime(CLOCK_MONOTONIC, &current_time);
- if (current_time.tv_sec - g_status_last_send_time.tv_sec >= interval_s)
- {
- sf_status_send(g_sf_status);
- clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time);
- }
- tmp_time -= interval_s * 1000;
- }
- usleep(tmp_time * 1000);
+ usleep(next_output_wait_ms * 1000);
+
+ clock_gettime(CLOCK_MONOTONIC, &current_time);
+ sf_status_output(g_sf_status);
+ last_output_time = current_time;
+
+ usleep((next_check_wait_ms - next_output_wait_ms) * 1000);
}
}
bfd_vtysh_close(&client);
diff --git a/platform/src/main.cpp b/platform/src/main.cpp
index 6cf813e..1f58b91 100644
--- a/platform/src/main.cpp
+++ b/platform/src/main.cpp
@@ -9,7 +9,6 @@
#include "log.h"
#include "utils.h"
#include "sf_metrics.h"
-#include "health_check.h"
#include "global_metrics.h"
struct breakpad_instance *g_breakpad = NULL;
@@ -75,7 +74,7 @@ static void *worker_thread_cycle(void *arg)
int n_packet_recved = 0;
char thread_name[16];
uint64_t sf_metrics_last_send_ts = timestamp_get_msec(ts);
- uint64_t sf_metrics_send_interval = sf_metrics_get_interval(sf_metrics) * 1000;
+ uint64_t sf_metrics_send_interval = sf_metrics_get_interval(sf_metrics);
ATOMIC_SET(&thread_ctx->thread_is_runing, 1);
snprintf(thread_name, sizeof(thread_name), "sce:worker-%d", thread_index);
@@ -183,8 +182,6 @@ int main(int argc, char **argv)
g_breakpad = breakpad_init(profile, "system", g_default_logger, __sce_version);
- health_check_session_init(profile);
-
struct sce_ctx *ctx = sce_ctx_create(profile);
if (ctx == NULL)
{
diff --git a/platform/src/policy.cpp b/platform/src/policy.cpp
index bb6cf79..7fc12af 100644
--- a/platform/src/policy.cpp
+++ b/platform/src/policy.cpp
@@ -976,7 +976,7 @@ static void sf_param_free_cb(int table_id, void **ad, long argl, void *argp)
{
if (param->sf_connectivity.method != ENCAPSULATE_METHOD_LAYER2_SWITCH)
{
- health_check_session_del(param->health_check_session_id, param->sf_profile_id);
+ health_check_session_del(param->health_check_session_id, param->sf_profile_id, param->sf_vsys_id);
}
LOG_INFO("%s: Del sf profile: %d", LOG_TAG_POLICY, param->sf_profile_id);
free(param);
@@ -1296,16 +1296,24 @@ const char *action_desc_tostring(enum action_desc action_desc)
switch (action_desc)
{
// success action
- case ACTION_FORWAED_DUE_SELECTED_SF: return "forward";
+ case ACTION_FORWAED_DUE_SELECTED_SF:
+ return "forward";
// failure action
- case ACTION_BYPASS_DUE_FAILURE_ACTION: return "bypass";
- case ACTION_BLOCK_DUE_FAILURE_ACTION: return "block";
- case ACTION_BLOCK_DUE_UNAVAILABLE_ACTION: return "re-dispatch block";
- case ACTION_BYPASS_DUE_UNAVAILABLE_ACTION: return "re-dispatch bypass";
- case ACTION_BYPASS_DUE_HEALTH_SF_LIMIT: return "re-dispatch bypass(health SF limit)";
+ case ACTION_BYPASS_DUE_FAILURE_ACTION:
+ return "bypass";
+ case ACTION_BLOCK_DUE_FAILURE_ACTION:
+ return "block";
+ case ACTION_BLOCK_DUE_UNAVAILABLE_ACTION:
+ return "re-dispatch block";
+ case ACTION_BYPASS_DUE_UNAVAILABLE_ACTION:
+ return "re-dispatch bypass";
+ case ACTION_BYPASS_DUE_HEALTH_SF_LIMIT:
+ return "re-dispatch bypass(health SF limit)";
// default action
- case ACTION_BYPASS_DUE_DEFAULT: return "bypass(default)";
- case ACTION_BYPASS_DUE_INVALID_POLICY: return "bypass(invalid policy)";
+ case ACTION_BYPASS_DUE_DEFAULT:
+ return "bypass(default)";
+ case ACTION_BYPASS_DUE_INVALID_POLICY:
+ return "bypass(invalid policy)";
// unreachable
default:
return "action unknown";
diff --git a/platform/src/sce.cpp b/platform/src/sce.cpp
index fe4ba89..fab0db8 100644
--- a/platform/src/sce.cpp
+++ b/platform/src/sce.cpp
@@ -3,6 +3,7 @@
#include "sce.h"
#include "log.h"
+#include "health_check.h"
char *memdup(const char *src, int len)
{
@@ -91,6 +92,7 @@ struct sce_ctx *sce_ctx_create(const char *profile)
{
goto error_out;
}
+ health_check_session_init(profile, sce_ctx->kfk);
sce_ctx->ts = timestamp_new(sce_ctx->ts_update_interval_ms);
sce_ctx->sf_metrics = sf_metrics_create(profile, sce_ctx->kfk);
diff --git a/platform/src/sf_metrics.cpp b/platform/src/sf_metrics.cpp
index 90d3200..e56d05d 100644
--- a/platform/src/sf_metrics.cpp
+++ b/platform/src/sf_metrics.cpp
@@ -94,7 +94,7 @@ static void *fs2kafka_thread_cycle(void *arg)
free(ptr);
}
- usleep(handle->cfg.output_fs_interval_ms * 1000);
+ usleep(handle->cfg.output_kafka_interval_ms * 1000);
}
ATOMIC_SET(&handle->thr_is_runing, 0);
diff --git a/platform/src/sf_status.cpp b/platform/src/sf_status.cpp
index 6ba84c1..c91cc2f 100644
--- a/platform/src/sf_status.cpp
+++ b/platform/src/sf_status.cpp
@@ -4,217 +4,189 @@
#include <arpa/inet.h>
#include <sys/socket.h>
#include <MESA/MESA_prof_load.h>
+#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
#include "sf_status.h"
+#include "uthash.h"
-#define SCE_SF_STATUS "service_function_status,vsys_id=%d,sf_profile_id=%d sf_status=%d,sf_latency_us=%d"
-
-struct node
+struct metric
{
- int sf_vsys_id;
- int sf_profile_id;
+ struct sf_status_key key;
+
int sf_status;
int sf_latency;
UT_hash_handle hh;
};
-struct sf_status_config
+struct config
{
- int enable;
- int interval_s;
- int telegraf_listen_port;
- char telegraf_bind_address[2048];
+ int output_kafka_interval_ms;
+ char data_center[256];
+ char device_group[256];
+ char device_id[256];
};
struct sf_status
{
- struct sf_status_config config;
- struct sockaddr_in sock_addr;
- int sockfd;
+ struct config cfg;
- struct node *htable;
- uint64_t htable_elem_count;
+ int sf_status_idx;
+ int sf_latency_idx;
+
+ struct kafka *kfk;
+ struct fieldstat_easy *fs;
+ struct metric *htable;
};
-static void sf_status_parse_config(const char *profile, struct sf_status_config *config)
-{
- MESA_load_profile_int_def(profile, "METRICS", "enable", &(config->enable), 1);
- MESA_load_profile_int_def(profile, "METRICS", "interval_s", &(config->interval_s), 1);
- MESA_load_profile_int_def(profile, "METRICS", "telegraf_listen_port", &(config->telegraf_listen_port), 8300);
- MESA_load_profile_string_def(profile, "METRICS", "telegraf_bind_address", config->telegraf_bind_address, sizeof(config->telegraf_bind_address), "127.0.0.1");
-
- LOG_DEBUG("%s: METRICS->enable : %d", LOG_TAG_SFSTATUS, config->enable);
- LOG_DEBUG("%s: METRICS->interval_s : %d", LOG_TAG_SFSTATUS, config->interval_s);
- LOG_DEBUG("%s: METRICS->telegraf_listen_port : %d", LOG_TAG_SFSTATUS, config->telegraf_listen_port);
- LOG_DEBUG("%s: METRICS->telegraf_bind_address : %s", LOG_TAG_SFSTATUS, config->telegraf_bind_address);
-}
+/******************************************************************************
+ * Public API
+ ******************************************************************************/
-void sf_status_destory(struct sf_status *handle)
+struct sf_status *sf_status_create(const char *profile, struct kafka *kfk)
{
- if (handle)
+ struct sf_status *handle = (struct sf_status *)calloc(1, sizeof(struct sf_status));
+ if (!handle)
{
- if (handle->sockfd)
- {
- close(handle->sockfd);
- handle->sockfd = -1;
- }
-
- struct node *temp = NULL;
- struct node *node = NULL;
- HASH_ITER(hh, handle->htable, node, temp)
- {
- HASH_DELETE(hh, handle->htable, node);
- free(node);
- node = NULL;
- }
-
- handle->htable_elem_count = 0;
- free(handle);
- handle = NULL;
+ return NULL;
}
-}
-struct sf_status *sf_status_create(const char *profile)
-{
- struct sf_status *handle = (struct sf_status *)calloc(1, sizeof(struct sf_status));
- assert(handle);
- sf_status_parse_config(profile, &(handle->config));
+ MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000);
+ MESA_load_profile_string_def(profile, "metrics", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), "");
+ MESA_load_profile_string_def(profile, "metrics", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), "");
+ MESA_load_profile_string_def(profile, "metrics", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), "");
- if (handle->config.enable == 0)
- {
- return handle;
- }
+ const struct fieldstat_tag tags[] = {
+ {"data_center", TAG_CSTRING, {.value_str = handle->cfg.data_center}},
+ {"device_group", TAG_CSTRING, {.value_str = handle->cfg.device_group}},
+ {"device_id", TAG_CSTRING, {.value_str = handle->cfg.device_id}},
+ };
- handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- handle->sock_addr.sin_family = AF_INET;
- handle->sock_addr.sin_port = htons(handle->config.telegraf_listen_port);
- handle->sock_addr.sin_addr.s_addr = inet_addr(handle->config.telegraf_bind_address);
- handle->htable_elem_count = 0;
- if (handle->sockfd == -1)
+ handle->kfk = kfk;
+ handle->fs = fieldstat_easy_new(1, "service_function_status", tags, sizeof(tags) / sizeof(tags[0]));
+ if (!handle->fs)
{
- LOG_ERROR("%s: failed to create udp sockfd %s:%d, errno: %d, %s", LOG_TAG_SFSTATUS, handle->config.telegraf_bind_address, handle->config.telegraf_listen_port, errno, strerror(errno));
- sf_status_destory(handle);
- return NULL;
+ goto error_out;
}
+ handle->sf_status_idx = fieldstat_easy_register_counter(handle->fs, "sf_status");
+ handle->sf_latency_idx = fieldstat_easy_register_counter(handle->fs, "sf_latency_us");
+
return handle;
+
+error_out:
+ sf_status_destory(handle);
+ return NULL;
}
-void sf_status_reset(struct sf_status *handle)
+void sf_status_destory(struct sf_status *handle)
{
- if (handle == NULL || handle->config.enable == 0)
+ if (handle)
{
- return;
- }
-
- LOG_DEBUG("%s: reset: elem_num %lu", LOG_TAG_SFSTATUS, handle->htable_elem_count);
+ struct metric *temp = NULL;
+ struct metric *node = NULL;
+ HASH_ITER(hh, handle->htable, node, temp)
+ {
+ HASH_DELETE(hh, handle->htable, node);
+ free(node);
+ node = NULL;
+ }
- struct node *temp = NULL;
- struct node *node = NULL;
- HASH_ITER(hh, handle->htable, node, temp)
- {
- HASH_DELETE(hh, handle->htable, node);
+ if (handle->fs)
+ {
+ fieldstat_easy_free(handle->fs);
+ handle->fs = NULL;
+ }
- free(node);
- node = NULL;
- handle->htable_elem_count--;
+ free(handle);
+ handle = NULL;
}
}
-void sf_status_delete(struct sf_status *handle, int sf_profile_id)
+void sf_status_delete(struct sf_status *handle, const struct sf_status_key *key)
{
- if (handle == NULL || handle->config.enable == 0)
+ if (!handle)
{
return;
}
- struct node *temp = NULL;
- HASH_FIND(hh, handle->htable, &sf_profile_id, sizeof(sf_profile_id), temp);
+ struct metric *temp = NULL;
+ HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp);
if (temp)
{
- handle->htable_elem_count--;
- LOG_DEBUG("%s: delete: sf_profile %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, handle->htable_elem_count);
HASH_DELETE(hh, handle->htable, temp);
free(temp);
temp = NULL;
}
- else
- {
- LOG_DEBUG("%s: delete: sf_profile %d not exists, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, handle->htable_elem_count);
- }
}
-void sf_status_update(struct sf_status *handle, int sf_vsys_id, int sf_profile_id, int sf_status, int sf_latency)
+void sf_status_update(struct sf_status *handle, const struct sf_status_key *key, int sf_status, int sf_latency)
{
- if (handle == NULL || handle->config.enable == 0)
+ if (!handle)
{
return;
}
- struct node *temp = NULL;
- HASH_FIND(hh, handle->htable, &sf_profile_id, sizeof(sf_profile_id), temp);
+ struct metric *temp = NULL;
+ HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp);
if (temp)
{
- if (temp->sf_status != sf_status)
- {
- LOG_DEBUG("%s: update: sf_profile %d status %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, sf_status, handle->htable_elem_count);
- }
- temp->sf_vsys_id = sf_vsys_id;
- temp->sf_profile_id = sf_profile_id;
temp->sf_status = sf_status;
temp->sf_latency = sf_latency;
}
else
{
- handle->htable_elem_count++;
- LOG_DEBUG("%s: insert: sf_profile %d status %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, sf_status, handle->htable_elem_count);
- temp = (struct node *)calloc(1, sizeof(struct node));
- temp->sf_vsys_id = sf_vsys_id;
- temp->sf_profile_id = sf_profile_id;
+ temp = (struct metric *)calloc(1, sizeof(struct metric));
+ temp->key.vsys_id = key->vsys_id;
+ temp->key.sf_profile_id = key->sf_profile_id;
temp->sf_status = sf_status;
temp->sf_latency = sf_latency;
-
- HASH_ADD(hh, handle->htable, sf_profile_id, sizeof(sf_profile_id), temp);
+ HASH_ADD(hh, handle->htable, key, sizeof(struct sf_status_key), temp);
}
}
-void sf_status_send(struct sf_status *handle)
+void sf_status_output(struct sf_status *handle)
{
- char buff[2048];
- int nsend = 0;
- int size = sizeof(buff);
-
- struct node *temp = NULL;
- struct node *node = NULL;
-
- if (handle == NULL || handle->config.enable == 0)
+ if (!handle)
{
return;
}
+ struct metric *temp = NULL;
+ struct metric *node = NULL;
HASH_ITER(hh, handle->htable, node, temp)
{
- memset(buff, 0, size);
- nsend = snprintf(buff, size, SCE_SF_STATUS,
- node->sf_vsys_id,
- node->sf_profile_id,
- node->sf_status,
- node->sf_latency);
- sendto(handle->sockfd, buff, nsend, 0, (struct sockaddr *)&handle->sock_addr, sizeof(handle->sock_addr));
+ const struct fieldstat_tag tags[] = {
+ {"vsys_id", TAG_INTEGER, {.value_longlong = node->key.vsys_id}},
+ {"sf_profile_id", TAG_INTEGER, {.value_longlong = node->key.sf_profile_id}},
+ };
+
+ fieldstat_easy_counter_set(handle->fs, 0, handle->sf_status_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_status);
+ fieldstat_easy_counter_set(handle->fs, 0, handle->sf_latency_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_latency);
+ }
+
+ char **ptr = NULL;
+ size_t len = 0;
+ fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len);
+ if (ptr)
+ {
+ for (size_t i = 0; i < len; i++)
+ {
+ kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i]));
+ free(ptr[i]);
+ ptr[i] = NULL;
+ }
+ free(ptr);
}
}
-int sf_status_get_interval(struct sf_status *handle)
+int sf_status_get_ouput_interval_ms(struct sf_status *handle)
{
- if (handle == NULL)
+ if (!handle)
{
return 0;
}
- else
- {
- return handle->config.interval_s;
- }
+ return handle->cfg.output_kafka_interval_ms;
} \ No newline at end of file
diff --git a/platform/test/gtest_sf_status.cpp b/platform/test/gtest_sf_status.cpp
index d34ee8a..8f1cfa7 100644
--- a/platform/test/gtest_sf_status.cpp
+++ b/platform/test/gtest_sf_status.cpp
@@ -4,12 +4,36 @@
TEST(SF_STATUS, TEST)
{
- struct sf_status *status = sf_status_create("./test_resource/sce.conf");
- EXPECT_TRUE(sf_status_get_interval(status) == 1);
- sf_status_update(status, 11, 1, 0, 0);
- sf_status_update(status, 22, 2, 1, 1);
- sf_status_send(status);
+ struct kafka *kfk = kafka_create("./test_resource/sce.conf");
+ EXPECT_TRUE(kfk != NULL);
+ struct sf_status *status = sf_status_create("./test_resource/sce.conf", kfk);
+ EXPECT_TRUE(status != NULL);
+
+ EXPECT_TRUE(sf_status_get_ouput_interval_ms(status) == 1000);
+
+ struct sf_status_key key1 = {0};
+ key1.vsys_id = 11;
+ key1.sf_profile_id = 12;
+
+ struct sf_status_key key2 = {0};
+ key2.vsys_id = 21;
+ key2.sf_profile_id = 22;
+
+ sf_status_update(status, &key1, 1, 2);
+ sf_status_update(status, &key2, 2, 1);
+ printf("\n========================================\n expect key1 + key2 \n========================================\n");
+ sf_status_output(status);
+
+ sf_status_delete(status, &key1);
+ printf("\n========================================\n expect only key2 \n========================================\n");
+ sf_status_output(status);
+
+ sf_status_delete(status, &key2);
+ printf("\n========================================\n expect no output \n========================================\n");
+ sf_status_output(status);
+
sf_status_destory(status);
+ kafka_destroy(kfk);
}
int main(int argc, char **argv)
diff --git a/platform/test/test_resource/sce.conf b/platform/test/test_resource/sce.conf
index 04360ee..b0e6c93 100644
--- a/platform/test/test_resource/sce.conf
+++ b/platform/test/test_resource/sce.conf
@@ -23,11 +23,6 @@ redis_server=127.0.0.1
redis_port_range=6379
[metrics]
-# Kafka Topic: POLICY-RULE-METRIC
-enable=1
-interval_s=1
-telegraf_bind_address=127.0.0.1
-telegraf_listen_port=8300
output_fs_interval_ms=500
output_kafka_interval_ms=1000
data_center=center-xxg-tsgx