summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-05-17 04:04:15 +0000
committer童宗振 <[email protected]>2024-05-17 04:04:15 +0000
commit9045da4907f7044ee3a44a0edfcd1a404c6bdc3d (patch)
treee7598c9867018ec3c2d1857537bba55671721b59
parente8f6731f9e8dd0ac168d711829e841f5ad0016fe (diff)
parent38e274f1d0a72a77daeb48de1844ce01ec306a6b (diff)
Merge branch 'not_send_ctrlbuf' into 'master'
Not send ctrlbuf See merge request tsg/dp_telemetry_app!35
-rw-r--r--etc/dp_trace.conf1
-rw-r--r--include/config.h1
-rw-r--r--include/monit.h1
-rw-r--r--src/config.c2
-rw-r--r--src/kafka.c3
-rw-r--r--src/monit.c2
-rw-r--r--src/trace_output.c28
7 files changed, 31 insertions, 7 deletions
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf
index cb79fdd..6dfd0dc 100644
--- a/etc/dp_trace.conf
+++ b/etc/dp_trace.conf
@@ -4,6 +4,7 @@ zlog_config_path=../etc/dp_trace_zlog.conf
dp_trace_dir=./
device_group="device-xxx"
monit_file_path=/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving
+send_ctrlbuf=0
[http_server]
listen_addr=127.0.0.1
diff --git a/include/config.h b/include/config.h
index 794f97f..9f58f75 100644
--- a/include/config.h
+++ b/include/config.h
@@ -18,6 +18,7 @@ struct config
char monit_file_path[PATH_MAX];
cpu_set_t cpu_set_io;
+ unsigned int send_ctrlbuf;
// device Information
char * sled_ip;
diff --git a/include/monit.h b/include/monit.h
index d833832..c2919d6 100644
--- a/include/monit.h
+++ b/include/monit.h
@@ -5,6 +5,7 @@ struct record_saving_stat
{
uint64_t recv_success;
uint64_t init_old_packet_drop;
+ uint64_t ctrlbuf_drop;
uint64_t save_failed_at_job_deleted;
uint64_t save_failed_at_mutex_lock;
diff --git a/src/config.c b/src/config.c
index 376d266..0d7b01b 100644
--- a/src/config.c
+++ b/src/config.c
@@ -76,6 +76,8 @@ void config_load()
sizeof(g_conf->monit_file_path),
"/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving");
+ MESA_load_profile_uint_def(config_path, "global", "send_ctrlbuf", &g_conf->send_ctrlbuf, 0);
+
MESA_load_profile_string_def(config_path, "global", "device_group", g_conf->device_group,
sizeof(g_conf->device_group), "");
diff --git a/src/kafka.c b/src/kafka.c
index 7854cca..d99825f 100644
--- a/src/kafka.c
+++ b/src/kafka.c
@@ -107,10 +107,13 @@ int kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
// Automatically release payload
// Even if the function fails to execute and returns -1, the payload will be released.
int ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, (void *)payload, len, NULL, 0, NULL);
+#if 0
+// Avoid having too many same logs
if (ret != 0)
{
rd_kafka_resp_err_t err = rd_kafka_last_error();
dzlog_error("rd_kafka_topic_new failed:%s", rd_kafka_err2str(err));
}
+#endif
return ret;
} \ No newline at end of file
diff --git a/src/monit.c b/src/monit.c
index 5441c5b..8434564 100644
--- a/src/monit.c
+++ b/src/monit.c
@@ -34,6 +34,7 @@ static void monit_dump()
{
total_stat.recv_success += monit->savint_stats[i].recv_success;
total_stat.init_old_packet_drop += monit->savint_stats[i].init_old_packet_drop;
+ total_stat.ctrlbuf_drop += monit->savint_stats[i].ctrlbuf_drop;
total_stat.save_failed_at_job_deleted += monit->savint_stats[i].save_failed_at_job_deleted;
total_stat.save_failed_at_mutex_lock += monit->savint_stats[i].save_failed_at_mutex_lock;
@@ -53,6 +54,7 @@ static void monit_dump()
struct cJSON * json_root = cJSON_CreateObject();
cJSON_AddNumberToObject(json_root, "recv_success", total_stat.recv_success);
cJSON_AddNumberToObject(json_root, "init_old_packet_drop", total_stat.init_old_packet_drop);
+ cJSON_AddNumberToObject(json_root, "ctrlbuf_drop", total_stat.ctrlbuf_drop);
cJSON_AddNumberToObject(json_root, "save_failed_at_job_deleted", total_stat.save_failed_at_job_deleted);
cJSON_AddNumberToObject(json_root, "save_failed_at_mutex_lock", total_stat.save_failed_at_mutex_lock);
diff --git a/src/trace_output.c b/src/trace_output.c
index 51506ac..fff3072 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -211,9 +211,20 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs
int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX])
{
memset((void *)nr_jobs_mbufs, 0, DP_TRACE_JOB_NUM_MAX * sizeof(int));
+ const struct config * conf = global_config_get();
for (unsigned int i = 0; i < nr_mbufs; i++)
{
+ if (marsio_buff_is_ctrlbuf(mbufs[i]) == 1)
+ {
+ if (conf->send_ctrlbuf == 0)
+ {
+ marsio_dp_trace_mbuf_free(instance, &mbufs[i], 1);
+ saving_stat->ctrlbuf_drop++;
+ continue;
+ }
+ }
+
struct dp_trace_buffer_telemetry info;
marsio_dp_trace_buffer_info_get((struct rte_mbuf *)mbufs[i], &info);
@@ -252,7 +263,7 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
if (dp_trace_file_mutex_lock(job_id) < 0)
{
- saving_stat->save_failed_at_mutex_lock;
+ saving_stat->save_failed_at_mutex_lock += nr_mbufs;
goto end;
}
@@ -702,12 +713,15 @@ void dp_trace_ring_clear()
for (unsigned int i = 0; i < nr_ring; i++)
{
- unsigned int nr_recv = marsio_dp_trace_mbuf_recv_burst(mr_instance, i, rx_buff, TELEMETRY_DIM(rx_buff));
- saving_stat->recv_success += nr_recv;
- saving_stat->init_old_packet_drop += nr_recv;
-
- marsio_dp_trace_mbuf_free(mr_instance, rx_buff, nr_recv);
- ring_clear_cnt += nr_recv;
+ unsigned int nr_recv = 0;
+ do
+ {
+ nr_recv = marsio_dp_trace_mbuf_recv_burst(mr_instance, i, rx_buff, TELEMETRY_DIM(rx_buff));
+ saving_stat->recv_success += nr_recv;
+ saving_stat->init_old_packet_drop += nr_recv;
+ marsio_dp_trace_mbuf_free(mr_instance, rx_buff, nr_recv);
+ ring_clear_cnt += nr_recv;
+ } while (nr_recv != 0);
}
dzlog_info("The program starts and clears %u mbufs", ring_clear_cnt);