From 7734429f5a06efa8eb5ed3067aacc75ce7889e44 Mon Sep 17 00:00:00 2001 From: tongzongzhen Date: Tue, 16 Apr 2024 16:36:46 +0800 Subject: fix kafka message job_id error --- src/trace_output.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'src/trace_output.c') diff --git a/src/trace_output.c b/src/trace_output.c index b984390..cf7c2e6 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -26,7 +26,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id); int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs, marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][nr_mbufs], int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]); -static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size); +static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id); static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size); struct dp_trace_output @@ -127,7 +127,8 @@ void * dp_trace_process_thread(void * arg) { char * data; size_t size; - dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size); + job_bitmap_t job_id = index_to_job_id(i); + dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id); kafka_produce(kafka_topic, (void *)data, size); } } @@ -403,7 +404,7 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id) return pthread_mutex_unlock(&dp_trace_output[index].file_mutex); } -static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size) +static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id) { struct dp_trace_buffer_telemetry trace_buff_info; marsio_dp_trace_buffer_info_get(mr_mbuf, &trace_buff_info); @@ -414,8 +415,6 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat gettimeofday(&tv, NULL); uint64_t microseconds = (uint64_t)(tv.tv_sec) * 1000000 + tv.tv_usec; - job_bitmap_t job_id = trace_buff_info.jobs_id; - struct pkt_inner_ip_port pkt_info; marsio_pkt_inner_ip_port_get(mr_mbuf, &pkt_info); @@ -426,10 +425,12 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat mpack_write_cstr(&writer, "timestamp_us"); mpack_write_i64(&writer, microseconds); - char job_id_str[10]; - snprintf(job_id_str, sizeof(job_id_str), "%d", job_id); + uuid_t uuid; + telemetry_job_uuid_get(job_id, uuid); + char uuid_str[37]; + uuid_unparse(uuid, uuid_str); mpack_write_cstr(&writer, "job_id"); - mpack_write_cstr(&writer, job_id_str); + mpack_write_cstr(&writer, uuid_str); mpack_write_cstr(&writer, "sled_ip"); if (global_config_get()->sled_ip != NULL) -- cgit v1.2.3