diff options
| author | 童宗振 <[email protected]> | 2024-04-11 10:19:06 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-11 10:19:06 +0000 |
| commit | 6b01bae42db789e5373bb9fab41365619f03517c (patch) | |
| tree | 396e5dd7f96df14d8b3faf20dea659420ae3957a /src/trace_output.c | |
| parent | de1bd5cdd8b6f7ecf3e3c512bed48b9154141e70 (diff) | |
Adjust code
Diffstat (limited to 'src/trace_output.c')
| -rw-r--r-- | src/trace_output.c | 94 |
1 files changed, 47 insertions, 47 deletions
diff --git a/src/trace_output.c b/src/trace_output.c index e3adfdc..79e9d14 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -15,6 +15,8 @@ #define BURST_MAX 64 extern struct mr_instance * mr_instance; +static rd_kafka_t * kafka_handle = NULL; +static rd_kafka_topic_t * kafka_topic = NULL; int dp_trace_file_mutex_lock(job_bitmap_t job_id); int dp_trace_file_mutex_unlock(job_bitmap_t job_id); @@ -22,7 +24,8 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id); void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id); void dp_trace_file_rollbak(job_bitmap_t job_id); int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs, - marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]); + marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][nr_mbufs], + int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]); static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size); static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size); @@ -35,12 +38,14 @@ struct dp_trace_output pthread_mutex_t file_mutex; }; -struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {}; +static struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {}; void dp_trace_output_init() { + const struct config * conf = global_config_get(); + // pcapng file path init - char * dp_trace_dir = g_conf->dp_trace_dir; + const char * dp_trace_dir = conf->dp_trace_dir; int ret = mkdir(dp_trace_dir, 0755); if (ret != 0 && errno != EEXIST) { @@ -69,25 +74,26 @@ void dp_trace_output_init() } } - g_conf->kafka_handle = kafka_handle_create(g_conf->broker_list, g_conf->sasl_password, g_conf->sasl_username); - g_conf->kafka_topic = kafka_topic_new(g_conf->kafka_handle, g_conf->topic_name, NULL); + kafka_handle = kafka_handle_create(conf->broker_list, conf->sasl_password, conf->sasl_username); + kafka_topic = kafka_topic_new(kafka_handle, conf->topic_name, NULL); } void * dp_trace_process_thread(void * arg) { + const struct config * conf = global_config_get(); + uintptr_t qid = (uintptr_t)arg; marsio_buff_t * rx_buff[BURST_MAX]; - unsigned int nr_burst = TELEMETRY_DIM(rx_buff); - unsigned int nr_class_mbufs = nr_burst * DP_TRACE_JOB_NUM_MAX; - marsio_buff_t * class_mbufs[nr_class_mbufs]; + marsio_buff_t * tx_buff[BURST_MAX]; + marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][BURST_MAX]; int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]; marsio_thread_init(mr_instance); while (1) { - int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, nr_burst); + unsigned int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, TELEMETRY_DIM(rx_buff)); if (nr_recv == 0) { // todo: use epoll+eventfd @@ -95,9 +101,9 @@ void * dp_trace_process_thread(void * arg) continue; } - dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_class_mbufs, nr_jobs_mbufs); + dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs); - for (unsigned int i = 0; i < 8; i++) + for (unsigned int i = 0; i < DP_TRACE_JOB_NUM_MAX; i++) { unsigned int nr_mbufs = nr_jobs_mbufs[i]; if (nr_mbufs == 0) @@ -105,46 +111,40 @@ void * dp_trace_process_thread(void * arg) continue; } - for (unsigned int j = 0; j < nr_mbufs; j++) + uint8_t role = job_id_role_get(index_to_job_id(i)); + if (role == DP_TRACE_ROLE) { - rx_buff[j] = class_mbufs[i * nr_recv + j]; + for (unsigned int j = 0; j < nr_mbufs; j++) + { + tx_buff[j] = class_mbufs[i][j]; + } + job_bitmap_t job_id = index_to_job_id(i); + cli_job_mbufs_write_process(tx_buff, nr_mbufs, job_id); } - job_bitmap_t job_id = index_to_job_id(i); - cli_job_mbufs_write_process(rx_buff, nr_mbufs, job_id); - } - - for (unsigned int i = 8; i < 16; i++) - { - unsigned int nr_mbufs = nr_jobs_mbufs[i]; - if (nr_mbufs == 0) + else if (role == DP_TELEMETRY_ROLE) { - continue; + for (unsigned int j = 0; j < nr_mbufs; j++) + { + char * data; + size_t size; + dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size); + kafka_produce(kafka_topic, (void *)data, size); + } } - - for (unsigned int j = 0; j < nr_mbufs; j++) - { - rx_buff[j] = class_mbufs[i * nr_recv + j]; - } - for (unsigned int j = 0; j < nr_mbufs; j++) + else { - char * data; - size_t size; - dp_trace_decode_to_message_pack(rx_buff[j], &data, &size); - kafka_produce(g_conf->kafka_topic, (void *)data, size); + syslog(LOG_INFO, + "The job has been deleted. The trace content corresponding to the job has been discarded."); + marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs); } } } } int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs, - marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]) + marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][BURST_MAX], + int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]) { - assert(nr_class_mbufs >= DP_TRACE_JOB_NUM_MAX * nr_mbufs); - if (unlikely(nr_class_mbufs < DP_TRACE_JOB_NUM_MAX * nr_mbufs)) - { - return -1; - } - memset((void *)nr_jobs_mbufs, 0, DP_TRACE_JOB_NUM_MAX * sizeof(int)); for (unsigned int i = 0; i < nr_mbufs; i++) @@ -163,7 +163,7 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs for (unsigned int j = 0; j < DP_TRACE_JOB_NUM_MAX; j++) { - uint16_t job_id_mask = 1 << j; + uint16_t job_id_mask = index_to_job_id(j); uint16_t job_id = jobs_id & job_id_mask; if (job_id == 0) { @@ -171,7 +171,7 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs } unsigned int nr_job = nr_jobs_mbufs[j]; - class_mbufs[j * nr_mbufs + nr_job] = mbufs[i]; + class_mbufs[j][nr_job] = mbufs[i]; nr_jobs_mbufs[j]++; } } @@ -261,7 +261,7 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id) dp_trace_file_mutex_lock(job_id); - unsigned int max_size = g_conf->dp_trace_file_max_size_in_KB / 2; + unsigned int max_size = global_config_get()->dp_trace_file_max_size_in_KB / 2; if (max_size == 0) { @@ -342,8 +342,8 @@ void dp_trace_pcapng_merger(job_bitmap_t job_id) } char command[2 * PATH_MAX]; - snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1", g_conf->dp_trace_merge_timeout, - file_path, file_middle_path, file_bak_path); + snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1", + global_config_get()->dp_trace_merge_timeout, file_path, file_middle_path, file_bak_path); syslog(LOG_INFO, "merge trace file: %s", command); FILE * fp; @@ -433,9 +433,9 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat mpack_write_cstr(&writer, job_id_str); mpack_write_cstr(&writer, "sled_ip"); - if (g_conf->sled_ip != NULL) + if (global_config_get()->sled_ip != NULL) { - mpack_write_cstr(&writer, g_conf->sled_ip); + mpack_write_cstr(&writer, global_config_get()->sled_ip); } else { @@ -443,7 +443,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat } mpack_write_cstr(&writer, "device_group"); - mpack_write_cstr(&writer, g_conf->device_group); + mpack_write_cstr(&writer, global_config_get()->device_group); mpack_write_cstr(&writer, "source_ip"); mpack_write_cstr(&writer, pkt_info.src_addr_str); |
