#include #include #include #include #include #include #include #include "log.h" #include "utils.h" #include "sf_status.h" #include "uthash.h" struct metric { struct sf_status_key key; int sf_status; int sf_latency; UT_hash_handle hh; }; struct config { int output_kafka_interval_ms; char data_center[256]; char device_group[256]; char device_id[256]; }; struct sf_status { struct config cfg; int sf_status_idx; int sf_latency_idx; struct kafka *kfk; struct fieldstat_easy *fs; struct metric *htable; }; /****************************************************************************** * Public API ******************************************************************************/ struct sf_status *sf_status_create(const char *profile, struct kafka *kfk) { struct sf_status *handle = (struct sf_status *)calloc(1, sizeof(struct sf_status)); if (!handle) { return NULL; } MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000); MESA_load_profile_string_def(profile, "metrics", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), ""); MESA_load_profile_string_def(profile, "metrics", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), ""); MESA_load_profile_string_def(profile, "metrics", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), ""); const struct field tags[] = { {"data_center", FIELD_VALUE_CSTRING, {.value_str = handle->cfg.data_center}}, {"device_group", FIELD_VALUE_CSTRING, {.value_str = handle->cfg.device_group}}, {"device_id", FIELD_VALUE_CSTRING, {.value_str = handle->cfg.device_id}}, }; handle->kfk = kfk; handle->fs = fieldstat_easy_new(1, "service_function_status", tags, sizeof(tags) / sizeof(tags[0])); if (!handle->fs) { goto error_out; } handle->sf_status_idx = fieldstat_easy_register_counter(handle->fs, "sf_status"); handle->sf_latency_idx = fieldstat_easy_register_counter(handle->fs, "sf_latency_us"); return handle; error_out: sf_status_destory(handle); return NULL; } void sf_status_destory(struct sf_status *handle) { if (handle) { struct metric *temp = NULL; struct metric *node = NULL; HASH_ITER(hh, handle->htable, node, temp) { HASH_DELETE(hh, handle->htable, node); free(node); node = NULL; } if (handle->fs) { fieldstat_easy_free(handle->fs); handle->fs = NULL; } free(handle); handle = NULL; } } void sf_status_delete(struct sf_status *handle, const struct sf_status_key *key) { if (!handle) { return; } struct metric *temp = NULL; HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp); if (temp) { HASH_DELETE(hh, handle->htable, temp); free(temp); temp = NULL; } } void sf_status_update(struct sf_status *handle, const struct sf_status_key *key, int sf_status, int sf_latency) { if (!handle) { return; } struct metric *temp = NULL; HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp); if (temp) { temp->sf_status = sf_status; temp->sf_latency = sf_latency; } else { temp = (struct metric *)calloc(1, sizeof(struct metric)); temp->key.vsys_id = key->vsys_id; uuid_copy(temp->key.sf_uuid, key->sf_uuid); temp->sf_status = sf_status; temp->sf_latency = sf_latency; HASH_ADD(hh, handle->htable, key, sizeof(struct sf_status_key), temp); } } void sf_status_output(struct sf_status *handle) { if (!handle) { return; } char sf_uuid_str[UUID_STRING_SIZE] = {0}; struct metric *temp = NULL; struct metric *node = NULL; HASH_ITER(hh, handle->htable, node, temp) { uuid_unparse(node->key.sf_uuid, sf_uuid_str); const struct field tags[] = { {"vsys_id", FIELD_VALUE_INTEGER, {.value_longlong = node->key.vsys_id}}, {"sf_profile_uuid", FIELD_VALUE_CSTRING, {.value_str = sf_uuid_str}}, }; fieldstat_easy_counter_set(handle->fs, 0, handle->sf_status_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_status); fieldstat_easy_counter_set(handle->fs, 0, handle->sf_latency_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_latency); } char **ptr = NULL; size_t len = 0; fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len); if (ptr) { for (size_t i = 0; i < len; i++) { kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); free(ptr[i]); ptr[i] = NULL; } free(ptr); } } int sf_status_get_ouput_interval_ms(struct sf_status *handle) { if (!handle) { return 0; } return handle->cfg.output_kafka_interval_ms; }