summaryrefslogtreecommitdiff
path: root/src/trace_output.c
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-26 04:15:25 +0000
committer童宗振 <[email protected]>2024-04-26 04:15:25 +0000
commit4bf56d370298bbe5321d67ede799b9ee3495c842 (patch)
treecff4882d396f638c17e2006e76c24f5d94de68db /src/trace_output.c
parent491a486f86ec4132f44d3937367b3ab255de8a90 (diff)
Code modification
Diffstat (limited to 'src/trace_output.c')
-rw-r--r--src/trace_output.c83
1 files changed, 75 insertions, 8 deletions
diff --git a/src/trace_output.c b/src/trace_output.c
index c5a2e5b..38ceec8 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -45,6 +45,8 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs
static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id);
static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size);
int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip_port * info);
+static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id,
+ unsigned int qids[], unsigned int * nr_qids);
struct dp_trace_output
{
@@ -101,7 +103,16 @@ void * dp_trace_process_thread(void * arg)
const struct config * conf = global_config_get();
saving_stat = record_saving_stat_point_get(thread_id);
- uintptr_t qid = thread_id % DP_TRACE_RING_NUM;
+ unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io);
+ unsigned int nr_ring = DP_TRACE_RING_NUM;
+
+ unsigned int qids[nr_ring];
+ unsigned int nr_qids = 0;
+ thread_id_to_ring_id_calculate(nr_thread, nr_ring, thread_id, qids, &nr_qids);
+ for (unsigned int i = 0; i < nr_qids; i++)
+ {
+ dzlog_info("thread %u receive ring %u packet", thread_id, qids[i]);
+ }
marsio_buff_t * rx_buff[BURST_MAX];
marsio_buff_t * tx_buff[BURST_MAX];
@@ -110,15 +121,25 @@ void * dp_trace_process_thread(void * arg)
marsio_thread_init(mr_instance);
- while (1)
+ unsigned int no_pkt_recv_cnt = 0;
+ for (unsigned int qid_index = 0;; qid_index++)
{
- unsigned int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, TELEMETRY_DIM(rx_buff));
+ qid_index = qid_index % nr_qids;
+
+ unsigned int nr_recv =
+ marsio_dp_trace_mbuf_recv_burst(mr_instance, qids[qid_index], rx_buff, TELEMETRY_DIM(rx_buff));
if (nr_recv == 0)
{
- // todo: use epoll+eventfd
- sleep(1);
+ no_pkt_recv_cnt++;
+ if (no_pkt_recv_cnt == nr_qids)
+ {
+ no_pkt_recv_cnt = 0;
+ sleep(1);
+ }
continue;
}
+
+ no_pkt_recv_cnt = 0;
saving_stat->recv_success += nr_recv;
dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs);
@@ -154,6 +175,12 @@ void * dp_trace_process_thread(void * arg)
saving_stat->save_to_kafka_failed_at_decode_messagepack++;
continue;
}
+
+ if (conf->kafka_dump_to_log)
+ {
+ kafka_dump_to_log(kafka_topic, (void *)data, size);
+ }
+
int ret = kafka_produce(kafka_topic, (void *)data, size);
if (ret != 0)
{
@@ -172,7 +199,7 @@ void * dp_trace_process_thread(void * arg)
// discarded.");
}
- marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs);
+ marsio_dp_trace_mbuf_free(mr_instance, class_mbufs[i], nr_mbufs);
}
}
}
@@ -521,7 +548,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
const char * comment = cur + sizeof(struct dp_trace_record_header);
const unsigned int comment_len = record_header->recode_len;
- if ((record_header->type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0)
+ if ((record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0)
{
mpack_start_map(&writer, 4);
@@ -572,7 +599,7 @@ static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, u
const char * str = cur + sizeof(struct dp_trace_record_header);
const unsigned int str_len = record_header->recode_len;
- if (record_header->type == DP_TRACE_MEASUREMENT_TYPE_TRACE)
+ if (record_header->measurement_type == DP_TRACE_MEASUREMENT_TYPE_TRACE)
{
int n = snprintf(data, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module,
record_header->ts.tv_sec, record_header->ts.tv_nsec);
@@ -680,4 +707,44 @@ int marsio_pkt_inner_ip_port_get(const marsio_buff_t * mbuf, struct pkt_inner_ip
}
return 0;
+}
+
+static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int nr_ring, unsigned int thread_id,
+ unsigned int qids[], unsigned int * nr_qids)
+{
+ // nr_thread threads; nr_ring rings; The current thread id is thread_id
+ // Calculate the ring subscript that the current thread needs to process
+ *nr_qids = 0;
+
+ if (nr_thread >= nr_ring)
+ {
+ qids[0] = thread_id % nr_ring;
+ *nr_qids = 1;
+ return;
+ }
+
+ unsigned int numbers[nr_thread];
+ int avg = nr_ring / nr_thread;
+ int remainder = nr_ring % nr_thread;
+ for (unsigned i = 0; i < nr_thread; i++)
+ {
+ numbers[i] = avg;
+ if (remainder > 0)
+ {
+ numbers[i]++;
+ remainder--;
+ }
+ }
+
+ int prefix_index = -1;
+ for (int i = 0; i < thread_id; i++)
+ {
+ prefix_index += numbers[i];
+ }
+
+ for (int i = 0; i < numbers[thread_id]; i++)
+ {
+ qids[i] = (++prefix_index);
+ (*nr_qids)++;
+ }
} \ No newline at end of file