summaryrefslogtreecommitdiff
path: root/src/trace_output.c
diff options
context:
space:
mode:
authortongzongzhen <[email protected]>2024-04-16 16:36:46 +0800
committertongzongzhen <[email protected]>2024-04-16 16:36:46 +0800
commit7734429f5a06efa8eb5ed3067aacc75ce7889e44 (patch)
treed0106802ed44baa6d4de9f647b2cc83a0d7f1207 /src/trace_output.c
parent43acf274892771a84488a6a189fd0d3e3db3619d (diff)
fix kafka message job_id error
Diffstat (limited to 'src/trace_output.c')
-rw-r--r--src/trace_output.c17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/trace_output.c b/src/trace_output.c
index b984390..cf7c2e6 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -26,7 +26,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id);
int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs,
marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][nr_mbufs],
int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]);
-static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size);
+static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id);
static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size);
struct dp_trace_output
@@ -127,7 +127,8 @@ void * dp_trace_process_thread(void * arg)
{
char * data;
size_t size;
- dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size);
+ job_bitmap_t job_id = index_to_job_id(i);
+ dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id);
kafka_produce(kafka_topic, (void *)data, size);
}
}
@@ -403,7 +404,7 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id)
return pthread_mutex_unlock(&dp_trace_output[index].file_mutex);
}
-static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size)
+static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id)
{
struct dp_trace_buffer_telemetry trace_buff_info;
marsio_dp_trace_buffer_info_get(mr_mbuf, &trace_buff_info);
@@ -414,8 +415,6 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
gettimeofday(&tv, NULL);
uint64_t microseconds = (uint64_t)(tv.tv_sec) * 1000000 + tv.tv_usec;
- job_bitmap_t job_id = trace_buff_info.jobs_id;
-
struct pkt_inner_ip_port pkt_info;
marsio_pkt_inner_ip_port_get(mr_mbuf, &pkt_info);
@@ -426,10 +425,12 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
mpack_write_cstr(&writer, "timestamp_us");
mpack_write_i64(&writer, microseconds);
- char job_id_str[10];
- snprintf(job_id_str, sizeof(job_id_str), "%d", job_id);
+ uuid_t uuid;
+ telemetry_job_uuid_get(job_id, uuid);
+ char uuid_str[37];
+ uuid_unparse(uuid, uuid_str);
mpack_write_cstr(&writer, "job_id");
- mpack_write_cstr(&writer, job_id_str);
+ mpack_write_cstr(&writer, uuid_str);
mpack_write_cstr(&writer, "sled_ip");
if (global_config_get()->sled_ip != NULL)