diff options
| author | 童宗振 <[email protected]> | 2024-04-26 04:15:25 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-26 04:15:25 +0000 |
| commit | 4bf56d370298bbe5321d67ede799b9ee3495c842 (patch) | |
| tree | cff4882d396f638c17e2006e76c24f5d94de68db /src | |
| parent | 491a486f86ec4132f44d3937367b3ab255de8a90 (diff) | |
Code modification
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.c | 3 | ||||
| -rw-r--r-- | src/job_ctx.c | 1 | ||||
| -rw-r--r-- | src/main.c | 10 | ||||
| -rw-r--r-- | src/mocking.c | 13 | ||||
| -rw-r--r-- | src/trace_output.c | 83 |
5 files changed, 84 insertions, 26 deletions
diff --git a/src/config.c b/src/config.c index 1593544..88600fc 100644 --- a/src/config.c +++ b/src/config.c @@ -93,6 +93,7 @@ void config_load() printf("SLED_IP environment variable does not exist.\n"); } + MESA_load_profile_int_def(config_path, "kafka", "kafka_dump_to_log", &(g_conf->kafka_dump_to_log), 0); MESA_load_profile_string_def(config_path, "kafka", "borker_list", g_conf->broker_list, sizeof(g_conf->broker_list), ""); MESA_load_profile_string_def(config_path, "kafka", "topic_name", g_conf->topic_name, sizeof(g_conf->topic_name), @@ -180,6 +181,8 @@ void dynamic_config_load() desc_i->pkt_cnt_max = pkt_cnt_max; desc_i->sampling = (sampling == 0) ? 1 : sampling; desc_i->snaplen = (snaplen == 0) ? UINT32_MAX : snaplen; + + desc_i->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE; } dzlog_info("Loading data path trace configuration file is completed."); diff --git a/src/job_ctx.c b/src/job_ctx.c index a0dba57..75175de 100644 --- a/src/job_ctx.c +++ b/src/job_ctx.c @@ -114,6 +114,7 @@ void telemetry_job_add_cb(const char * table_name, int table_id, const char * ke job_desc->pkt_cnt_max = pkt_cnt_max; job_desc->sampling = sampling; job_desc->snaplen = snaplen; + job_desc->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY; int index = telemetry_unused_job_index_get(); if (index < 0) @@ -6,6 +6,7 @@ #include "trace_output.h" #include <getopt.h> +#include <math.h> #include <pthread.h> #include <sched.h> #include <signal.h> @@ -151,12 +152,9 @@ int main(int argc, char * argv[]) signal_event_init(); unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io); - if (nr_thread < DP_TRACE_RING_NUM) - { - dzlog_error("The number of cores must be greater than %u", DP_TRACE_RING_NUM); - return 0; - } - dzlog_info("Thread Count = %d", nr_thread); + unsigned int ring_num = DP_TRACE_RING_NUM; + dzlog_info("thread count = %u", nr_thread); + dzlog_info("ring num = %u", ring_num); monit_init(nr_thread); diff --git a/src/mocking.c b/src/mocking.c index c3b232c..d405489 100644 --- a/src/mocking.c +++ b/src/mocking.c @@ -2,17 +2,7 @@ #include "common.h" #include <mpack.h> -rd_kafka_t * __wrap_kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd) -{ - return NULL; -} - -rd_kafka_topic_t * __wrap_kafka_topic_new(rd_kafka_t * rk, const char * topic, rd_kafka_topic_conf_t * conf) -{ - return NULL; -} - -int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len) +int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len) { struct measurements { @@ -104,6 +94,5 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len) dzlog_debug("comments %.*s", packet.record[i].comments_len, packet.record[i].comments); } - free(payload); return 0; }
\ No newline at end of file diff --git a/src/trace_output.c b/src/trace_output.c index c5a2e5b..38ceec8 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -45,6 +45,8 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id); static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size); int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip_port * info); +static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id, + unsigned int qids[], unsigned int * nr_qids); struct dp_trace_output { @@ -101,7 +103,16 @@ void * dp_trace_process_thread(void * arg) const struct config * conf = global_config_get(); saving_stat = record_saving_stat_point_get(thread_id); - uintptr_t qid = thread_id % DP_TRACE_RING_NUM; + unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io); + unsigned int nr_ring = DP_TRACE_RING_NUM; + + unsigned int qids[nr_ring]; + unsigned int nr_qids = 0; + thread_id_to_ring_id_calculate(nr_thread, nr_ring, thread_id, qids, &nr_qids); + for (unsigned int i = 0; i < nr_qids; i++) + { + dzlog_info("thread %u receive ring %u packet", thread_id, qids[i]); + } marsio_buff_t * rx_buff[BURST_MAX]; marsio_buff_t * tx_buff[BURST_MAX]; @@ -110,15 +121,25 @@ void * dp_trace_process_thread(void * arg) marsio_thread_init(mr_instance); - while (1) + unsigned int no_pkt_recv_cnt = 0; + for (unsigned int qid_index = 0;; qid_index++) { - unsigned int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, TELEMETRY_DIM(rx_buff)); + qid_index = qid_index % nr_qids; + + unsigned int nr_recv = + marsio_dp_trace_mbuf_recv_burst(mr_instance, qids[qid_index], rx_buff, TELEMETRY_DIM(rx_buff)); if (nr_recv == 0) { - // todo: use epoll+eventfd - sleep(1); + no_pkt_recv_cnt++; + if (no_pkt_recv_cnt == nr_qids) + { + no_pkt_recv_cnt = 0; + sleep(1); + } continue; } + + no_pkt_recv_cnt = 0; saving_stat->recv_success += nr_recv; dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs); @@ -154,6 +175,12 @@ void * dp_trace_process_thread(void * arg) saving_stat->save_to_kafka_failed_at_decode_messagepack++; continue; } + + if (conf->kafka_dump_to_log) + { + kafka_dump_to_log(kafka_topic, (void *)data, size); + } + int ret = kafka_produce(kafka_topic, (void *)data, size); if (ret != 0) { @@ -172,7 +199,7 @@ void * dp_trace_process_thread(void * arg) // discarded."); } - marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs); + marsio_dp_trace_mbuf_free(mr_instance, class_mbufs[i], nr_mbufs); } } } @@ -521,7 +548,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat const char * comment = cur + sizeof(struct dp_trace_record_header); const unsigned int comment_len = record_header->recode_len; - if ((record_header->type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0) + if ((record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0) { mpack_start_map(&writer, 4); @@ -572,7 +599,7 @@ static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, u const char * str = cur + sizeof(struct dp_trace_record_header); const unsigned int str_len = record_header->recode_len; - if (record_header->type == DP_TRACE_MEASUREMENT_TYPE_TRACE) + if (record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TRACE) { int n = snprintf(data, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module, record_header->ts.tv_sec, record_header->ts.tv_nsec); @@ -680,4 +707,44 @@ int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip } return 0; +} + +static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id, + unsigned int qids[], unsigned int * nr_qids) +{ + // nr_thread threads; nr_ring rings; The current thread id is thread_id + // Calculate the ring subscript that the current thread needs to process + *nr_qids = 0; + + if (nr_thread >= nr_ring) + { + qids[0] = thread_id % nr_ring; + *nr_qids = 1; + return; + } + + unsigned int numbers[nr_thread]; + int avg = nr_ring / nr_thread; + int remainder = nr_ring % nr_thread; + for (unsigned i = 0; i < nr_thread; i++) + { + numbers[i] = avg; + if (remainder > 0) + { + numbers[i]++; + remainder--; + } + } + + int prefix_index = -1; + for (int i = 0; i < thread_id; i++) + { + prefix_index += numbers[i]; + } + + for (int i = 0; i < numbers[thread_id]; i++) + { + qids[i] = (++prefix_index); + (*nr_qids)++; + } }
\ No newline at end of file |
