summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-07-10 07:08:27 +0000
committer童宗振 <[email protected]>2024-07-10 07:08:27 +0000
commit9c989c18759903f09b155d86311fdecc36b15d42 (patch)
tree87d07e43a24d6ec75d88c82c28ff71bc72e605a2 /src
parente2200bb6c2cd93554b06d0193e4f64cee03376d9 (diff)
(TSG-21729)kafka send egress_action filed
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/kafka.c150
-rw-r--r--src/mocking.c147
-rw-r--r--src/trace_output.c4
4 files changed, 153 insertions, 149 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 79f6315..8a47cd1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -10,7 +10,6 @@ set(DP_TELEMETRY_SRC
${CMAKE_CURRENT_SOURCE_DIR}/kafka.c
${CMAKE_CURRENT_SOURCE_DIR}/maat.c
${CMAKE_CURRENT_SOURCE_DIR}/monit.c
- ${CMAKE_CURRENT_SOURCE_DIR}/mocking.c
${CMAKE_CURRENT_SOURCE_DIR}/http_serv.c
${CMAKE_CURRENT_SOURCE_DIR}/logger.c
${CMAKE_SOURCE_DIR}/support/mpack/mpack.c)
diff --git a/src/kafka.c b/src/kafka.c
index bd426d0..91c7b9c 100644
--- a/src/kafka.c
+++ b/src/kafka.c
@@ -1,6 +1,7 @@
#include "kafka.h"
#include "common.h"
#include <errno.h>
+#include <mpack.h>
rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd,
uint32_t kafka_queue_size)
@@ -121,4 +122,153 @@ int kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
}
#endif
return ret;
+}
+
+int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len)
+{
+ struct measurements
+ {
+ int32_t tv_sec;
+ int32_t tv_nsec;
+ const char * app;
+ uint32_t app_len;
+ const char * comments;
+ uint32_t comments_len;
+ };
+
+ struct dp_trace_message_pack
+ {
+ int64_t microseconds;
+ char job_id_str[MR_SYMBOL_MAX];
+ char sled_ip[INET6_ADDRSTRLEN];
+ char device_group[MR_SYMBOL_MAX];
+ int32_t traffic_link_id;
+ char source_ip[INET6_ADDRSTRLEN];
+ int32_t source_port;
+ char server_ip[INET6_ADDRSTRLEN];
+ int32_t server_port;
+ int32_t egress_action;
+ const char * packet;
+ int32_t packet_length;
+ int32_t measurements_num;
+ struct measurements record[128];
+ };
+
+ struct dp_trace_message_pack packet = {};
+
+ mpack_tree_t tree;
+ mpack_tree_init_data(&tree, payload, len);
+ mpack_tree_parse(&tree);
+
+ mpack_node_t root = mpack_tree_root(&tree);
+ packet.microseconds = mpack_node_i64(mpack_node_map_cstr(root, "timestamp_us"));
+
+ mpack_node_copy_cstr(mpack_node_map_cstr(root, "job_id"), packet.job_id_str, sizeof(packet.job_id_str));
+
+ mpack_node_copy_cstr(mpack_node_map_cstr(root, "sled_ip"), packet.sled_ip, sizeof(packet.sled_ip));
+
+ mpack_node_copy_cstr(mpack_node_map_cstr(root, "device_group"), packet.device_group, sizeof(packet.device_group));
+
+ mpack_node_t traffic_link_id_node = mpack_node_map_cstr(root, "traffic_link_id");
+ if (!mpack_node_is_nil(traffic_link_id_node))
+ {
+ packet.traffic_link_id = mpack_node_i32(traffic_link_id_node);
+ }
+
+ mpack_node_t source_ip_node = mpack_node_map_cstr(root, "source_ip");
+ if (!mpack_node_is_nil(source_ip_node))
+ {
+ mpack_node_copy_cstr(source_ip_node, packet.source_ip, sizeof(packet.source_ip));
+ }
+
+ mpack_node_t source_port_node = mpack_node_map_cstr(root, "source_port");
+ if (!mpack_node_is_nil(source_port_node))
+ {
+ packet.source_port = mpack_node_i32(source_port_node);
+ }
+
+ mpack_node_t destination_ip_node = mpack_node_map_cstr(root, "destination_ip");
+ if (!mpack_node_is_nil(destination_ip_node))
+ {
+ mpack_node_copy_cstr(destination_ip_node, packet.server_ip, sizeof(packet.server_ip));
+ }
+
+ mpack_node_t destination_port_node = mpack_node_map_cstr(root, "destination_port");
+ if (!mpack_node_is_nil(destination_port_node))
+ {
+ packet.server_port = mpack_node_i32(destination_port_node);
+ }
+
+ packet.egress_action = mpack_node_i32(mpack_node_map_cstr(root, "egress_action"));
+
+ packet.packet = mpack_node_bin_data(mpack_node_map_cstr(root, "packet"));
+
+ packet.packet_length = mpack_node_i32(mpack_node_map_cstr(root, "packet_length"));
+
+ mpack_node_t measurements_val = mpack_node_map_cstr(root, "measurements");
+ packet.measurements_num = mpack_node_array_length(mpack_node_map_cstr(root, "measurements"));
+ for (int i = 0; i < packet.measurements_num; i++)
+ {
+ if (i >= 128)
+ {
+ zlog_debug(logger, "too many measurements...");
+ continue;
+ }
+ mpack_node_t measurement = mpack_node_array_at(measurements_val, i);
+ packet.record[i].tv_sec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_sec"));
+ packet.record[i].tv_nsec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_nsec"));
+ packet.record[i].app = mpack_node_str(mpack_node_map_cstr(measurement, "app"));
+ packet.record[i].app_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "app"));
+ packet.record[i].comments = mpack_node_str(mpack_node_map_cstr(measurement, "comments"));
+ packet.record[i].comments_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "comments"));
+ }
+
+ // print
+ zlog_debug(logger, "microseconds %ld", packet.microseconds);
+ zlog_debug(logger, "job_id %s", packet.job_id_str);
+ zlog_debug(logger, "sled_ip %s", packet.sled_ip);
+ zlog_debug(logger, "device_group %s", packet.device_group);
+ zlog_debug(logger, "traffic_link_id %d", packet.traffic_link_id);
+
+ if (strlen(packet.source_ip) == 0)
+ {
+ zlog_debug(logger, "source_ip is empty");
+ }
+ else
+ {
+ zlog_debug(logger, "source_ip %s", packet.source_ip);
+ }
+
+ zlog_debug(logger, "source_port %d", packet.source_port);
+
+ if (strlen(packet.server_ip) == 0)
+ {
+ zlog_debug(logger, "server_ip is empty");
+ }
+ else
+ {
+ zlog_debug(logger, "server_ip %s", packet.server_ip);
+ }
+
+ zlog_debug(logger, "server_port %d", packet.server_port);
+
+ zlog_debug(logger, "egress_action %d", packet.egress_action);
+
+ zlog_debug(logger, "packet_length %d", packet.packet_length);
+
+ if (packet.measurements_num == 0)
+ {
+ zlog_debug(logger, "measurements num is zero");
+ }
+
+ for (int i = 0; i < packet.measurements_num; i++)
+ {
+ zlog_debug(logger, "record %u:", i);
+ zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec);
+ zlog_debug(logger, "tv_nsec %d", packet.record[i].tv_nsec);
+ zlog_debug(logger, "app %.*s", packet.record[i].app_len, packet.record[i].app);
+ zlog_debug(logger, "comments %.*s", packet.record[i].comments_len, packet.record[i].comments);
+ }
+
+ return 0;
} \ No newline at end of file
diff --git a/src/mocking.c b/src/mocking.c
deleted file mode 100644
index 3255261..0000000
--- a/src/mocking.c
+++ /dev/null
@@ -1,147 +0,0 @@
-#include "mocking.h"
-#include "common.h"
-#include <mpack.h>
-
-int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len)
-{
- struct measurements
- {
- int32_t tv_sec;
- int32_t tv_nsec;
- const char * app;
- uint32_t app_len;
- const char * comments;
- uint32_t comments_len;
- };
-
- struct dp_trace_message_pack
- {
- int64_t microseconds;
- char job_id_str[MR_SYMBOL_MAX];
- char sled_ip[INET6_ADDRSTRLEN];
- char device_group[MR_SYMBOL_MAX];
- int32_t traffic_link_id;
- char source_ip[INET6_ADDRSTRLEN];
- int32_t source_port;
- char server_ip[INET6_ADDRSTRLEN];
- int32_t server_port;
- const char * packet;
- int32_t packet_length;
- int32_t measurements_num;
- struct measurements record[128];
- };
-
- struct dp_trace_message_pack packet = {};
-
- mpack_tree_t tree;
- mpack_tree_init_data(&tree, payload, len);
- mpack_tree_parse(&tree);
-
- mpack_node_t root = mpack_tree_root(&tree);
- packet.microseconds = mpack_node_i64(mpack_node_map_cstr(root, "timestamp_us"));
-
- mpack_node_copy_cstr(mpack_node_map_cstr(root, "job_id"), packet.job_id_str, sizeof(packet.job_id_str));
-
- mpack_node_copy_cstr(mpack_node_map_cstr(root, "sled_ip"), packet.sled_ip, sizeof(packet.sled_ip));
-
- mpack_node_copy_cstr(mpack_node_map_cstr(root, "device_group"), packet.device_group, sizeof(packet.device_group));
-
- mpack_node_t traffic_link_id_node = mpack_node_map_cstr(root, "traffic_link_id");
- if (!mpack_node_is_nil(traffic_link_id_node))
- {
- packet.traffic_link_id = mpack_node_i32(traffic_link_id_node);
- }
-
- mpack_node_t source_ip_node = mpack_node_map_cstr(root, "source_ip");
- if (!mpack_node_is_nil(source_ip_node))
- {
- mpack_node_copy_cstr(source_ip_node, packet.source_ip, sizeof(packet.source_ip));
- }
-
- mpack_node_t source_port_node = mpack_node_map_cstr(root, "source_port");
- if (!mpack_node_is_nil(source_port_node))
- {
- packet.source_port = mpack_node_i32(source_port_node);
- }
-
- mpack_node_t destination_ip_node = mpack_node_map_cstr(root, "destination_ip");
- if (!mpack_node_is_nil(destination_ip_node))
- {
- mpack_node_copy_cstr(destination_ip_node, packet.server_ip, sizeof(packet.server_ip));
- }
-
- mpack_node_t destination_port_node = mpack_node_map_cstr(root, "destination_port");
- if (!mpack_node_is_nil(destination_port_node))
- {
- packet.server_port = mpack_node_i32(destination_port_node);
- }
-
- packet.packet = mpack_node_bin_data(mpack_node_map_cstr(root, "packet"));
-
- packet.packet_length = mpack_node_i32(mpack_node_map_cstr(root, "packet_length"));
-
- mpack_node_t measurements_val = mpack_node_map_cstr(root, "measurements");
- packet.measurements_num = mpack_node_array_length(mpack_node_map_cstr(root, "measurements"));
- for (int i = 0; i < packet.measurements_num; i++)
- {
- if (i >= 128)
- {
- zlog_debug(logger, "too many measurements...");
- continue;
- }
- mpack_node_t measurement = mpack_node_array_at(measurements_val, i);
- packet.record[i].tv_sec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_sec"));
- packet.record[i].tv_nsec = mpack_node_i32(mpack_node_map_cstr(measurement, "tv_nsec"));
- packet.record[i].app = mpack_node_str(mpack_node_map_cstr(measurement, "app"));
- packet.record[i].app_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "app"));
- packet.record[i].comments = mpack_node_str(mpack_node_map_cstr(measurement, "comments"));
- packet.record[i].comments_len = mpack_node_strlen(mpack_node_map_cstr(measurement, "comments"));
- }
-
- // print
- zlog_debug(logger, "microseconds %ld", packet.microseconds);
- zlog_debug(logger, "job_id %s", packet.job_id_str);
- zlog_debug(logger, "sled_ip %s", packet.sled_ip);
- zlog_debug(logger, "device_group %s", packet.device_group);
- zlog_debug(logger, "traffic_link_id %d", packet.traffic_link_id);
-
- if (strlen(packet.source_ip) == 0)
- {
- zlog_debug(logger, "source_ip is empty");
- }
- else
- {
- zlog_debug(logger, "source_ip %s", packet.source_ip);
- }
-
- zlog_debug(logger, "source_port %d", packet.source_port);
-
- if (strlen(packet.server_ip) == 0)
- {
- zlog_debug(logger, "server_ip is empty");
- }
- else
- {
- zlog_debug(logger, "server_ip %s", packet.server_ip);
- }
-
- zlog_debug(logger, "server_port %d", packet.server_port);
-
- zlog_debug(logger, "packet_length %d", packet.packet_length);
-
- if (packet.measurements_num == 0)
- {
- zlog_debug(logger, "measurements num is zero");
- }
-
- for (int i = 0; i < packet.measurements_num; i++)
- {
- zlog_debug(logger, "record %u:", i);
- zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec);
- zlog_debug(logger, "tv_nsec %d", packet.record[i].tv_nsec);
- zlog_debug(logger, "app %.*s", packet.record[i].app_len, packet.record[i].app);
- zlog_debug(logger, "comments %.*s", packet.record[i].comments_len, packet.record[i].comments);
- }
-
- return 0;
-} \ No newline at end of file
diff --git a/src/trace_output.c b/src/trace_output.c
index 5fa66c4..b340ec7 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -4,7 +4,6 @@
#include "job_ctx.h"
#include "kafka.h"
#include "logger.h"
-#include "mocking.h"
#include "monit.h"
#include "pcapng.h"
@@ -576,6 +575,9 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
mpack_write_i32(&writer, trace_buff_info.inner_dst_port);
}
+ mpack_write_cstr(&writer, "egress_action");
+ mpack_write_i32(&writer, trace_buff_info.egress_action);
+
unsigned int snaplen = trace_buff_info.snaplen;
snaplen = (snaplen < marsio_buff_datalen(mr_mbuf)) ? snaplen : marsio_buff_datalen(mr_mbuf);