summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-26 04:15:25 +0000
committer童宗振 <[email protected]>2024-04-26 04:15:25 +0000
commitd079244d80cd89c85bbfd5dbfd8e390d1171e0f9 (patch)
treecff4882d396f638c17e2006e76c24f5d94de68db
parent491a486f86ec4132f44d3937367b3ab255de8a90 (diff)
parent4bf56d370298bbe5321d67ede799b9ee3495c842 (diff)
Merge branch 'code_modification' into 'master'v0.1.4-20240426
Code modification See merge request tsg/dp_telemetry_app!23
-rw-r--r--CMakeLists.txt2
-rw-r--r--etc/dp_trace.conf1
-rw-r--r--include/config.h1
-rw-r--r--include/mocking.h4
-rw-r--r--src/config.c3
-rw-r--r--src/job_ctx.c1
-rw-r--r--src/main.c10
-rw-r--r--src/mocking.c13
-rw-r--r--src/trace_output.c83
9 files changed, 88 insertions, 30 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 20f01d9..6d02149 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0)
project(dp_trace_telemetry)
-option(ENABLE_DEVELOP_MOCKING "enable develop mocking" TRUE)
+option(ENABLE_DEVELOP_MOCKING "enable develop mocking" OFF)
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
include(Version)
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf
index 0435678..b44ea96 100644
--- a/etc/dp_trace.conf
+++ b/etc/dp_trace.conf
@@ -11,6 +11,7 @@ listen_port=10000
keep_alive_path=/probe
[kafka]
+kafka_dump_to_log=0
borker_list="192.168.44.12:9094"
topic_name="DATAPATH-TELEMETRY-RECORD"
sasl_username=admin
diff --git a/include/config.h b/include/config.h
index d7c69bd..794f97f 100644
--- a/include/config.h
+++ b/include/config.h
@@ -29,6 +29,7 @@ struct config
char keep_alive_path[MR_SYMBOL_MAX];
// kafka
+ unsigned int kafka_dump_to_log;
char topic_name[MR_SYMBOL_MAX];
char broker_list[1024];
char sasl_username[MR_SYMBOL_MAX];
diff --git a/include/mocking.h b/include/mocking.h
index d509081..2121463 100644
--- a/include/mocking.h
+++ b/include/mocking.h
@@ -1,6 +1,4 @@
#pragma once
#include <librdkafka/rdkafka.h>
-rd_kafka_t * __wrap_kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd);
-rd_kafka_topic_t * __wrap_kafka_topic_new(rd_kafka_t * rk, const char * topic, rd_kafka_topic_conf_t * conf);
-int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len); \ No newline at end of file
+int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len); \ No newline at end of file
diff --git a/src/config.c b/src/config.c
index 1593544..88600fc 100644
--- a/src/config.c
+++ b/src/config.c
@@ -93,6 +93,7 @@ void config_load()
printf("SLED_IP environment variable does not exist.\n");
}
+ MESA_load_profile_int_def(config_path, "kafka", "kafka_dump_to_log", &(g_conf->kafka_dump_to_log), 0);
MESA_load_profile_string_def(config_path, "kafka", "borker_list", g_conf->broker_list, sizeof(g_conf->broker_list),
"");
MESA_load_profile_string_def(config_path, "kafka", "topic_name", g_conf->topic_name, sizeof(g_conf->topic_name),
@@ -180,6 +181,8 @@ void dynamic_config_load()
desc_i->pkt_cnt_max = pkt_cnt_max;
desc_i->sampling = (sampling == 0) ? 1 : sampling;
desc_i->snaplen = (snaplen == 0) ? UINT32_MAX : snaplen;
+
+ desc_i->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE;
}
dzlog_info("Loading data path trace configuration file is completed.");
diff --git a/src/job_ctx.c b/src/job_ctx.c
index a0dba57..75175de 100644
--- a/src/job_ctx.c
+++ b/src/job_ctx.c
@@ -114,6 +114,7 @@ void telemetry_job_add_cb(const char * table_name, int table_id, const char * ke
job_desc->pkt_cnt_max = pkt_cnt_max;
job_desc->sampling = sampling;
job_desc->snaplen = snaplen;
+ job_desc->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY;
int index = telemetry_unused_job_index_get();
if (index < 0)
diff --git a/src/main.c b/src/main.c
index 328994a..8374821 100644
--- a/src/main.c
+++ b/src/main.c
@@ -6,6 +6,7 @@
#include "trace_output.h"
#include <getopt.h>
+#include <math.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
@@ -151,12 +152,9 @@ int main(int argc, char * argv[])
signal_event_init();
unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io);
- if (nr_thread < DP_TRACE_RING_NUM)
- {
- dzlog_error("The number of cores must be greater than %u", DP_TRACE_RING_NUM);
- return 0;
- }
- dzlog_info("Thread Count = %d", nr_thread);
+ unsigned int ring_num = DP_TRACE_RING_NUM;
+ dzlog_info("thread count = %u", nr_thread);
+ dzlog_info("ring num = %u", ring_num);
monit_init(nr_thread);
diff --git a/src/mocking.c b/src/mocking.c
index c3b232c..d405489 100644
--- a/src/mocking.c
+++ b/src/mocking.c
@@ -2,17 +2,7 @@
#include "common.h"
#include <mpack.h>
-rd_kafka_t * __wrap_kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd)
-{
- return NULL;
-}
-
-rd_kafka_topic_t * __wrap_kafka_topic_new(rd_kafka_t * rk, const char * topic, rd_kafka_topic_conf_t * conf)
-{
- return NULL;
-}
-
-int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
+int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len)
{
struct measurements
{
@@ -104,6 +94,5 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
dzlog_debug("comments %.*s", packet.record[i].comments_len, packet.record[i].comments);
}
- free(payload);
return 0;
} \ No newline at end of file
diff --git a/src/trace_output.c b/src/trace_output.c
index c5a2e5b..38ceec8 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -45,6 +45,8 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs
static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id);
static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size);
int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip_port * info);
+static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id,
+ unsigned int qids[], unsigned int * nr_qids);
struct dp_trace_output
{
@@ -101,7 +103,16 @@ void * dp_trace_process_thread(void * arg)
const struct config * conf = global_config_get();
saving_stat = record_saving_stat_point_get(thread_id);
- uintptr_t qid = thread_id % DP_TRACE_RING_NUM;
+ unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io);
+ unsigned int nr_ring = DP_TRACE_RING_NUM;
+
+ unsigned int qids[nr_ring];
+ unsigned int nr_qids = 0;
+ thread_id_to_ring_id_calculate(nr_thread, nr_ring, thread_id, qids, &nr_qids);
+ for (unsigned int i = 0; i < nr_qids; i++)
+ {
+ dzlog_info("thread %u receive ring %u packet", thread_id, qids[i]);
+ }
marsio_buff_t * rx_buff[BURST_MAX];
marsio_buff_t * tx_buff[BURST_MAX];
@@ -110,15 +121,25 @@ void * dp_trace_process_thread(void * arg)
marsio_thread_init(mr_instance);
- while (1)
+ unsigned int no_pkt_recv_cnt = 0;
+ for (unsigned int qid_index = 0;; qid_index++)
{
- unsigned int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, TELEMETRY_DIM(rx_buff));
+ qid_index = qid_index % nr_qids;
+
+ unsigned int nr_recv =
+ marsio_dp_trace_mbuf_recv_burst(mr_instance, qids[qid_index], rx_buff, TELEMETRY_DIM(rx_buff));
if (nr_recv == 0)
{
- // todo: use epoll+eventfd
- sleep(1);
+ no_pkt_recv_cnt++;
+ if (no_pkt_recv_cnt == nr_qids)
+ {
+ no_pkt_recv_cnt = 0;
+ sleep(1);
+ }
continue;
}
+
+ no_pkt_recv_cnt = 0;
saving_stat->recv_success += nr_recv;
dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs);
@@ -154,6 +175,12 @@ void * dp_trace_process_thread(void * arg)
saving_stat->save_to_kafka_failed_at_decode_messagepack++;
continue;
}
+
+ if (conf->kafka_dump_to_log)
+ {
+ kafka_dump_to_log(kafka_topic, (void *)data, size);
+ }
+
int ret = kafka_produce(kafka_topic, (void *)data, size);
if (ret != 0)
{
@@ -172,7 +199,7 @@ void * dp_trace_process_thread(void * arg)
// discarded.");
}
- marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs);
+ marsio_dp_trace_mbuf_free(mr_instance, class_mbufs[i], nr_mbufs);
}
}
}
@@ -521,7 +548,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
const char * comment = cur + sizeof(struct dp_trace_record_header);
const unsigned int comment_len = record_header->recode_len;
- if ((record_header->type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0)
+ if ((record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0)
{
mpack_start_map(&writer, 4);
@@ -572,7 +599,7 @@ static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, u
const char * str = cur + sizeof(struct dp_trace_record_header);
const unsigned int str_len = record_header->recode_len;
- if (record_header->type == DP_TRACE_MEASUREMENT_TYPE_TRACE)
+ if (record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TRACE)
{
int n = snprintf(data, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module,
record_header->ts.tv_sec, record_header->ts.tv_nsec);
@@ -680,4 +707,44 @@ int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip
}
return 0;
+}
+
+static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id,
+ unsigned int qids[], unsigned int * nr_qids)
+{
+ // nr_thread threads; nr_ring rings; The current thread id is thread_id
+ // Calculate the ring subscript that the current thread needs to process
+ *nr_qids = 0;
+
+ if (nr_thread >= nr_ring)
+ {
+ qids[0] = thread_id % nr_ring;
+ *nr_qids = 1;
+ return;
+ }
+
+ unsigned int numbers[nr_thread];
+ int avg = nr_ring / nr_thread;
+ int remainder = nr_ring % nr_thread;
+ for (unsigned i = 0; i < nr_thread; i++)
+ {
+ numbers[i] = avg;
+ if (remainder > 0)
+ {
+ numbers[i]++;
+ remainder--;
+ }
+ }
+
+ int prefix_index = -1;
+ for (int i = 0; i < thread_id; i++)
+ {
+ prefix_index += numbers[i];
+ }
+
+ for (int i = 0; i < numbers[thread_id]; i++)
+ {
+ qids[i] = (++prefix_index);
+ (*nr_qids)++;
+ }
} \ No newline at end of file