diff options
Diffstat (limited to 'src/trace_output.c')
| -rw-r--r-- | src/trace_output.c | 83 |
1 files changed, 75 insertions, 8 deletions
diff --git a/src/trace_output.c b/src/trace_output.c index c5a2e5b..38ceec8 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -45,6 +45,8 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id); static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size); int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip_port * info); +static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id, + unsigned int qids[], unsigned int * nr_qids); struct dp_trace_output { @@ -101,7 +103,16 @@ void * dp_trace_process_thread(void * arg) const struct config * conf = global_config_get(); saving_stat = record_saving_stat_point_get(thread_id); - uintptr_t qid = thread_id % DP_TRACE_RING_NUM; + unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io); + unsigned int nr_ring = DP_TRACE_RING_NUM; + + unsigned int qids[nr_ring]; + unsigned int nr_qids = 0; + thread_id_to_ring_id_calculate(nr_thread, nr_ring, thread_id, qids, &nr_qids); + for (unsigned int i = 0; i < nr_qids; i++) + { + dzlog_info("thread %u receive ring %u packet", thread_id, qids[i]); + } marsio_buff_t * rx_buff[BURST_MAX]; marsio_buff_t * tx_buff[BURST_MAX]; @@ -110,15 +121,25 @@ void * dp_trace_process_thread(void * arg) marsio_thread_init(mr_instance); - while (1) + unsigned int no_pkt_recv_cnt = 0; + for (unsigned int qid_index = 0;; qid_index++) { - unsigned int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, TELEMETRY_DIM(rx_buff)); + qid_index = qid_index % nr_qids; + + unsigned int nr_recv = + marsio_dp_trace_mbuf_recv_burst(mr_instance, qids[qid_index], rx_buff, TELEMETRY_DIM(rx_buff)); if (nr_recv == 0) { - // todo: use epoll+eventfd - sleep(1); + no_pkt_recv_cnt++; + if (no_pkt_recv_cnt == nr_qids) + { + no_pkt_recv_cnt = 0; + sleep(1); + } continue; } + + no_pkt_recv_cnt = 0; saving_stat->recv_success += nr_recv; dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs); @@ -154,6 +175,12 @@ void * dp_trace_process_thread(void * arg) saving_stat->save_to_kafka_failed_at_decode_messagepack++; continue; } + + if (conf->kafka_dump_to_log) + { + kafka_dump_to_log(kafka_topic, (void *)data, size); + } + int ret = kafka_produce(kafka_topic, (void *)data, size); if (ret != 0) { @@ -172,7 +199,7 @@ void * dp_trace_process_thread(void * arg) // discarded."); } - marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs); + marsio_dp_trace_mbuf_free(mr_instance, class_mbufs[i], nr_mbufs); } } } @@ -521,7 +548,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat const char * comment = cur + sizeof(struct dp_trace_record_header); const unsigned int comment_len = record_header->recode_len; - if ((record_header->type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0) + if ((record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0) { mpack_start_map(&writer, 4); @@ -572,7 +599,7 @@ static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, u const char * str = cur + sizeof(struct dp_trace_record_header); const unsigned int str_len = record_header->recode_len; - if (record_header->type == DP_TRACE_MEASUREMENT_TYPE_TRACE) + if (record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TRACE) { int n = snprintf(data, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module, record_header->ts.tv_sec, record_header->ts.tv_nsec); @@ -680,4 +707,44 @@ int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip } return 0; +} + +static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id, + unsigned int qids[], unsigned int * nr_qids) +{ + // nr_thread threads; nr_ring rings; The current thread id is thread_id + // Calculate the ring subscript that the current thread needs to process + *nr_qids = 0; + + if (nr_thread >= nr_ring) + { + qids[0] = thread_id % nr_ring; + *nr_qids = 1; + return; + } + + unsigned int numbers[nr_thread]; + int avg = nr_ring / nr_thread; + int remainder = nr_ring % nr_thread; + for (unsigned i = 0; i < nr_thread; i++) + { + numbers[i] = avg; + if (remainder > 0) + { + numbers[i]++; + remainder--; + } + } + + int prefix_index = -1; + for (int i = 0; i < thread_id; i++) + { + prefix_index += numbers[i]; + } + + for (int i = 0; i < numbers[thread_id]; i++) + { + qids[i] = (++prefix_index); + (*nr_qids)++; + } }
\ No newline at end of file |
