diff options
| author | 童宗振 <[email protected]> | 2024-07-10 07:08:27 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-07-10 07:08:27 +0000 |
| commit | 9c989c18759903f09b155d86311fdecc36b15d42 (patch) | |
| tree | 87d07e43a24d6ec75d88c82c28ff71bc72e605a2 /src | |
| parent | e2200bb6c2cd93554b06d0193e4f64cee03376d9 (diff) | |
(TSG-21729)kafka send egress_action filed
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/kafka.c | 150 | ||||
| -rw-r--r-- | src/mocking.c | 147 | ||||
| -rw-r--r-- | src/trace_output.c | 4 |
4 files changed, 153 insertions, 149 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 79f6315..8a47cd1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,7 +10,6 @@ set(DP_TELEMETRY_SRC ${CMAKE_CURRENT_SOURCE_DIR}/kafka.c ${CMAKE_CURRENT_SOURCE_DIR}/maat.c ${CMAKE_CURRENT_SOURCE_DIR}/monit.c - ${CMAKE_CURRENT_SOURCE_DIR}/mocking.c ${CMAKE_CURRENT_SOURCE_DIR}/http_serv.c ${CMAKE_CURRENT_SOURCE_DIR}/logger.c ${CMAKE_SOURCE_DIR}/support/mpack/mpack.c) diff --git a/src/kafka.c b/src/kafka.c index bd426d0..91c7b9c 100644 --- a/src/kafka.c +++ b/src/kafka.c @@ -1,6 +1,7 @@ #include "kafka.h" #include "common.h" #include <errno.h> +#include <mpack.h> rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd, uint32_t kafka_queue_size) @@ -121,4 +122,153 @@ int kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len) } #endif return ret; +} + +int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len) +{ + struct measurements + { + int32_t tv_sec; + int32_t tv_nsec; + const char * app; + uint32_t app_len; + const char * comments; + uint32_t comments_len; + }; + + struct dp_trace_message_pack + { + int64_t microseconds; + char job_id_str[MR_SYMBOL_MAX]; + char sled_ip[INET6_ADDRSTRLEN]; + char device_group[MR_SYMBOL_MAX]; + int32_t traffic_link_id; + char source_ip[INET6_ADDRSTRLEN]; + int32_t source_port; + char server_ip[INET6_ADDRSTRLEN]; + int32_t server_port; + int32_t egress_action; + const char * packet; + int32_t packet_length; + int32_t measurements_num; + struct measurements record[128]; + }; + + struct dp_trace_message_pack packet = {}; + + mpack_tree_t tree; + mpack_tree_init_data(&tree, payload, len); + mpack_tree_parse(&tree); + + mpack_node_t root = mpack_tree_root(&tree); + packet.microseconds = mpack_node_i64(mpack_node_map_cstr(root, "timestamp_us")); + + mpack_node_copy_cstr(mpack_node_map_cstr(root, "job_id"), packet.job_id_str, sizeof(packet.job_id_str)); + + mpack_node_copy_cstr(mpack_node_map_cstr(root, "sled_ip"), packet.sled_ip, sizeof(packet.sled_ip)); + + mpack_node_copy_cstr(mpack_node_map_cstr(root, "device_group"), packet.device_group, sizeof(packet.device_group)); + + mpack_node_t traffic_link_id_node = mpack_node_map_cstr(root, "traffic_link_id"); + if (!mpack_node_is_nil(traffic_link_id_node)) + { + packet.traffic_link_id = mpack_node_i32(traffic_link_id_node); + } + + mpack_node_t source_ip_node = mpack_node_map_cstr(root, "source_ip"); + if (!mpack_node_is_nil(source_ip_node)) + { + mpack_node_copy_cstr(source_ip_node, packet.source_ip, sizeof(packet.source_ip)); + } + + mpack_node_t source_port_node = mpack_node_map_cstr(root, "source_port"); + if (!mpack_node_is_nil(source_port_node)) + { + packet.source_port = mpack_node_i32(source_port_node); + } + + mpack_node_t destination_ip_node = mpack_node_map_cstr(root, "destination_ip"); + if (!mpack_node_is_nil(destination_ip_node)) + { + mpack_node_copy_cstr(destination_ip_node, packet.server_ip, sizeof(packet.server_ip)); + } + + mpack_node_t destination_port_node = mpack_node_map_cstr(root, "destination_port"); + if (!mpack_node_is_nil(destination_port_node)) + { + packet.server_port = mpack_node_i32(destination_port_node); + } + + packet.egress_action = mpack_node_i32(mpack_node_map_cstr(root, "egress_action")); + + packet.packet = mpack_node_bin_data(mpack_node_map_cstr(root, "packet")); + + packet.packet_length = mpack_node_i32(mpack_node_map_cstr(root, "packet_length")); + + mpack_node_t measurements_val = mpack_node_map_cstr(root, "measurements"); + packet.measurements_num = mpack_node_array_length(mpack_node_map_cstr(root, "measurements")); + for (int i = 0; i < packet.measurements_num; i++) + { + if (i >= 128) + { + zlog_debug(logger, "too many measurements..."); + continue; + } + mpack_node_t measurement = mpack_node_array_at(measurements_val, i); + packet.record[i].tv_sec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_sec")); + packet.record[i].tv_nsec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_nsec")); + packet.record[i].app = mpack_node_str(mpack_node_map_cstr(measurement, "app")); + packet.record[i].app_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "app")); + packet.record[i].comments = mpack_node_str(mpack_node_map_cstr(measurement, "comments")); + packet.record[i].comments_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "comments")); + } + + // print + zlog_debug(logger, "microseconds %ld", packet.microseconds); + zlog_debug(logger, "job_id %s", packet.job_id_str); + zlog_debug(logger, "sled_ip %s", packet.sled_ip); + zlog_debug(logger, "device_group %s", packet.device_group); + zlog_debug(logger, "traffic_link_id %d", packet.traffic_link_id); + + if (strlen(packet.source_ip) == 0) + { + zlog_debug(logger, "source_ip is empty"); + } + else + { + zlog_debug(logger, "source_ip %s", packet.source_ip); + } + + zlog_debug(logger, "source_port %d", packet.source_port); + + if (strlen(packet.server_ip) == 0) + { + zlog_debug(logger, "server_ip is empty"); + } + else + { + zlog_debug(logger, "server_ip %s", packet.server_ip); + } + + zlog_debug(logger, "server_port %d", packet.server_port); + + zlog_debug(logger, "egress_action %d", packet.egress_action); + + zlog_debug(logger, "packet_length %d", packet.packet_length); + + if (packet.measurements_num == 0) + { + zlog_debug(logger, "measurements num is zero"); + } + + for (int i = 0; i < packet.measurements_num; i++) + { + zlog_debug(logger, "record %u:", i); + zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec); + zlog_debug(logger, "tv_nsec %d", packet.record[i].tv_nsec); + zlog_debug(logger, "app %.*s", packet.record[i].app_len, packet.record[i].app); + zlog_debug(logger, "comments %.*s", packet.record[i].comments_len, packet.record[i].comments); + } + + return 0; }
\ No newline at end of file diff --git a/src/mocking.c b/src/mocking.c deleted file mode 100644 index 3255261..0000000 --- a/src/mocking.c +++ /dev/null @@ -1,147 +0,0 @@ -#include "mocking.h" -#include "common.h" -#include <mpack.h> - -int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len) -{ - struct measurements - { - int32_t tv_sec; - int32_t tv_nsec; - const char * app; - uint32_t app_len; - const char * comments; - uint32_t comments_len; - }; - - struct dp_trace_message_pack - { - int64_t microseconds; - char job_id_str[MR_SYMBOL_MAX]; - char sled_ip[INET6_ADDRSTRLEN]; - char device_group[MR_SYMBOL_MAX]; - int32_t traffic_link_id; - char source_ip[INET6_ADDRSTRLEN]; - int32_t source_port; - char server_ip[INET6_ADDRSTRLEN]; - int32_t server_port; - const char * packet; - int32_t packet_length; - int32_t measurements_num; - struct measurements record[128]; - }; - - struct dp_trace_message_pack packet = {}; - - mpack_tree_t tree; - mpack_tree_init_data(&tree, payload, len); - mpack_tree_parse(&tree); - - mpack_node_t root = mpack_tree_root(&tree); - packet.microseconds = mpack_node_i64(mpack_node_map_cstr(root, "timestamp_us")); - - mpack_node_copy_cstr(mpack_node_map_cstr(root, "job_id"), packet.job_id_str, sizeof(packet.job_id_str)); - - mpack_node_copy_cstr(mpack_node_map_cstr(root, "sled_ip"), packet.sled_ip, sizeof(packet.sled_ip)); - - mpack_node_copy_cstr(mpack_node_map_cstr(root, "device_group"), packet.device_group, sizeof(packet.device_group)); - - mpack_node_t traffic_link_id_node = mpack_node_map_cstr(root, "traffic_link_id"); - if (!mpack_node_is_nil(traffic_link_id_node)) - { - packet.traffic_link_id = mpack_node_i32(traffic_link_id_node); - } - - mpack_node_t source_ip_node = mpack_node_map_cstr(root, "source_ip"); - if (!mpack_node_is_nil(source_ip_node)) - { - mpack_node_copy_cstr(source_ip_node, packet.source_ip, sizeof(packet.source_ip)); - } - - mpack_node_t source_port_node = mpack_node_map_cstr(root, "source_port"); - if (!mpack_node_is_nil(source_port_node)) - { - packet.source_port = mpack_node_i32(source_port_node); - } - - mpack_node_t destination_ip_node = mpack_node_map_cstr(root, "destination_ip"); - if (!mpack_node_is_nil(destination_ip_node)) - { - mpack_node_copy_cstr(destination_ip_node, packet.server_ip, sizeof(packet.server_ip)); - } - - mpack_node_t destination_port_node = mpack_node_map_cstr(root, "destination_port"); - if (!mpack_node_is_nil(destination_port_node)) - { - packet.server_port = mpack_node_i32(destination_port_node); - } - - packet.packet = mpack_node_bin_data(mpack_node_map_cstr(root, "packet")); - - packet.packet_length = mpack_node_i32(mpack_node_map_cstr(root, "packet_length")); - - mpack_node_t measurements_val = mpack_node_map_cstr(root, "measurements"); - packet.measurements_num = mpack_node_array_length(mpack_node_map_cstr(root, "measurements")); - for (int i = 0; i < packet.measurements_num; i++) - { - if (i >= 128) - { - zlog_debug(logger, "too many measurements..."); - continue; - } - mpack_node_t measurement = mpack_node_array_at(measurements_val, i); - packet.record[i].tv_sec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_sec")); - packet.record[i].tv_nsec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_nsec")); - packet.record[i].app = mpack_node_str(mpack_node_map_cstr(measurement, "app")); - packet.record[i].app_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "app")); - packet.record[i].comments = mpack_node_str(mpack_node_map_cstr(measurement, "comments")); - packet.record[i].comments_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "comments")); - } - - // print - zlog_debug(logger, "microseconds %ld", packet.microseconds); - zlog_debug(logger, "job_id %s", packet.job_id_str); - zlog_debug(logger, "sled_ip %s", packet.sled_ip); - zlog_debug(logger, "device_group %s", packet.device_group); - zlog_debug(logger, "traffic_link_id %d", packet.traffic_link_id); - - if (strlen(packet.source_ip) == 0) - { - zlog_debug(logger, "source_ip is empty"); - } - else - { - zlog_debug(logger, "source_ip %s", packet.source_ip); - } - - zlog_debug(logger, "source_port %d", packet.source_port); - - if (strlen(packet.server_ip) == 0) - { - zlog_debug(logger, "server_ip is empty"); - } - else - { - zlog_debug(logger, "server_ip %s", packet.server_ip); - } - - zlog_debug(logger, "server_port %d", packet.server_port); - - zlog_debug(logger, "packet_length %d", packet.packet_length); - - if (packet.measurements_num == 0) - { - zlog_debug(logger, "measurements num is zero"); - } - - for (int i = 0; i < packet.measurements_num; i++) - { - zlog_debug(logger, "record %u:", i); - zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec); - zlog_debug(logger, "tv_nsec %d", packet.record[i].tv_nsec); - zlog_debug(logger, "app %.*s", packet.record[i].app_len, packet.record[i].app); - zlog_debug(logger, "comments %.*s", packet.record[i].comments_len, packet.record[i].comments); - } - - return 0; -}
\ No newline at end of file diff --git a/src/trace_output.c b/src/trace_output.c index 5fa66c4..b340ec7 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -4,7 +4,6 @@ #include "job_ctx.h" #include "kafka.h" #include "logger.h" -#include "mocking.h" #include "monit.h" #include "pcapng.h" @@ -576,6 +575,9 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat mpack_write_i32(&writer, trace_buff_info.inner_dst_port); } + mpack_write_cstr(&writer, "egress_action"); + mpack_write_i32(&writer, trace_buff_info.egress_action); + unsigned int snaplen = trace_buff_info.snaplen; snaplen = (snaplen < marsio_buff_datalen(mr_mbuf)) ? snaplen : marsio_buff_datalen(mr_mbuf); |
