diff options
| author | 童宗振 <[email protected]> | 2024-04-16 09:12:45 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-16 09:12:45 +0000 |
| commit | 1f413abebc4c2b8b7b6457a717a9d9d7c6b74ae7 (patch) | |
| tree | d0106802ed44baa6d4de9f647b2cc83a0d7f1207 | |
| parent | 43acf274892771a84488a6a189fd0d3e3db3619d (diff) | |
| parent | 7734429f5a06efa8eb5ed3067aacc75ce7889e44 (diff) | |
Merge branch 'fix_kafka_error' into 'master'
fix kafka message job_id error
See merge request tsg/dp_telemetry_app!12
| -rw-r--r-- | etc/dp_trace.conf | 4 | ||||
| -rw-r--r-- | src/job_ctx.c | 105 | ||||
| -rw-r--r-- | src/job_ctx.h | 16 | ||||
| -rw-r--r-- | src/kafka.c | 43 | ||||
| -rw-r--r-- | src/maat.c | 105 | ||||
| -rw-r--r-- | src/trace_output.c | 17 |
6 files changed, 162 insertions, 128 deletions
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf index dabec81..226012d 100644 --- a/etc/dp_trace.conf +++ b/etc/dp_trace.conf @@ -7,8 +7,8 @@ device_group= [kafka] borker_list="192.168.44.12" topic_name="datapath-telemetry-record-test" -sasl_username="admin" -sasl_password="galaxy2019" +sasl_username= +sasl_password= [maat] # 0:json 1:redis diff --git a/src/job_ctx.c b/src/job_ctx.c index 99ded9e..ae0fa16 100644 --- a/src/job_ctx.c +++ b/src/job_ctx.c @@ -2,6 +2,7 @@ #include "common.h" #include "trace_output.h" +#include <MESA/maat.h> #include <stdlib.h> #include <string.h> @@ -12,6 +13,9 @@ struct dp_trace_job_occupy }; static struct dp_trace_job_occupy dp_trace_job_occupy[DP_TRACE_JOB_NUM_MAX] = {}; +static struct dp_trace_telemetry_desc telemetry_descs[DP_TRACE_JOB_NUM_MAX] = {}; + +static int telemetry_unused_job_index_get(); void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role) { @@ -55,6 +59,81 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8 } } +void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad, + long argl, void * argp) +{ + int ret = 0; + size_t offset = 0; + size_t len = 0; + char * json_str = NULL; + cJSON * json = NULL; + + ret = maat_helper_read_column(table_line, 2, &offset, &len); + if (ret < 0) + { + dzlog_error("fail to get data path rule in maat."); + goto out; + } + + json_str = calloc(sizeof(char), len + 1); + memcpy(json_str, table_line + offset, len); + json = cJSON_Parse(json_str); + if (json == NULL) + { + dzlog_error("Invalid decryption parameter: %s", table_line); + goto out; + } + + struct dp_trace_telemetry_desc telemetry_desc = {}; + struct dp_trace_job_desc * job_desc = &telemetry_desc.job_desc; + + cJSON * uuid_obj = cJSON_GetObjectItem(json, "job_id"); + DP_TRACE_VERIFY(cJSON_IsString(uuid_obj), "uuid is not string"); + ret = uuid_parse(uuid_obj->valuestring, telemetry_desc.uuid); + if (ret != 0) + { + dzlog_error("uuid parsing failed: %s", uuid_obj->valuestring); + goto out; + } + + cJSON * bpf_obj = cJSON_GetObjectItem(json, "bpf_expr"); + DP_TRACE_VERIFY(cJSON_IsString(bpf_obj), "bpf expr is not string"); + snprintf(job_desc->bpf_expr, sizeof(job_desc->bpf_expr), "%s", bpf_obj->valuestring); + + int index = telemetry_unused_job_index_get(); + if (index < 0) + { + dzlog_warn("no enough job for bpf_expr:", job_desc->bpf_expr); + goto out; + } + + job_desc->rule_index = index; + job_desc->sampling = 1; + job_desc->enable = true; + + memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc)); + + job_rule_apply(&telemetry_descs[index].job_desc, 1, DP_TELEMETRY_ROLE); + + *ad = &telemetry_descs[index]; + +out: + if (json_str) + cJSON_Delete(json); + if (json_str) + free(json_str); + return; +} + +void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp) +{ + struct dp_trace_telemetry_desc * telemetry_desc = *ad; + struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc; + job_desc->enable = false; + job_rule_apply(job_desc, 1, DP_TELEMETRY_ROLE); + return; +} + int is_job_id_used(job_bitmap_t job_id) { unsigned int index = job_id_to_index(job_id); @@ -80,4 +159,30 @@ job_bitmap_t index_to_job_id(unsigned int index) { assert(index < DP_TRACE_JOB_NUM_MAX); return 1 << index; +} + +void telemetry_job_uuid_get(job_bitmap_t job_id, unsigned char * uu) +{ + unsigned int index = job_id_to_index(job_id); + uuid_copy(uu, telemetry_descs[index].uuid); +} + +static int telemetry_unused_job_index_get() +{ +#define TELEMETRY_INDEX_START 8 +#define TELEMETRY_INDEX_END DP_TRACE_JOB_NUM_MAX + + int ret = -1; + for (unsigned int i = TELEMETRY_INDEX_START; i < TELEMETRY_INDEX_END; i++) + { + job_bitmap_t job_id = index_to_job_id(i); + if (is_job_id_used(job_id)) + { + continue; + } + ret = i; + break; + } + + return ret; }
\ No newline at end of file diff --git a/src/job_ctx.h b/src/job_ctx.h index c1732fd..76a11fb 100644 --- a/src/job_ctx.h +++ b/src/job_ctx.h @@ -1,8 +1,22 @@ #pragma once #include "marsio.h" +#include <cjson/cJSON.h> +#include <uuid/uuid.h> + +struct dp_trace_telemetry_desc +{ + uuid_t uuid; + char job_name[128]; + struct dp_trace_job_desc job_desc; +}; void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role); int job_id_to_index(job_bitmap_t job_id); job_bitmap_t index_to_job_id(unsigned int index); int is_job_id_used(job_bitmap_t job_id); -uint8_t job_id_role_get(job_bitmap_t job_id);
\ No newline at end of file +uint8_t job_id_role_get(job_bitmap_t job_id); + +void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad, + long argl, void * argp); +void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp); +void telemetry_job_uuid_get(job_bitmap_t job_id, unsigned char * uu);
\ No newline at end of file diff --git a/src/kafka.c b/src/kafka.c index 60146f7..69d8dc7 100644 --- a/src/kafka.c +++ b/src/kafka.c @@ -22,23 +22,42 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user dzlog_error("Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr); goto error; } - - assert(strlen(sasl_username) > 0); - assert(strlen(sasl_passwd) > 0); - - rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); - ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr)); + ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { - dzlog_error("Error to set kafka \"sasl.username\", %s.", kafka_errstr); + dzlog_error("Error to set kafka \"security.protocol\", %s.", kafka_errstr); goto error; } - ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); - if (ret != RD_KAFKA_CONF_OK) + + if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0) { - dzlog_error("Error to set kafka \"sasl.password\", %s.", kafka_errstr); - goto error; + + ret = rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + dzlog_error("Error to set kafka \"security.protocol\", %s.", kafka_errstr); + goto error; + } + + ret = rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + dzlog_error("Error to set kafka \"sasl.mechanisms\", %s.", kafka_errstr); + goto error; + } + + ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + dzlog_error("Error to set kafka \"sasl.username\", %s.", kafka_errstr); + goto error; + } + ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + dzlog_error("Error to set kafka \"sasl.password\", %s.", kafka_errstr); + goto error; + } } // The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. @@ -4,36 +4,6 @@ #include <MESA/maat.h> #include <cjson/cJSON.h> -#include <uuid/uuid.h> - -struct dp_trace_telemetry_desc -{ - uuid_t uuid; - char job_name[128]; - struct dp_trace_job_desc job_desc; -}; - -static struct dp_trace_telemetry_desc telemetry_descs[DP_TRACE_JOB_NUM_MAX]; - -static int dp_trace_telemetry_unused_job_index_get() -{ -#define TELEMETRY_INDEX_START 8 -#define TELEMETRY_INDEX_END DP_TRACE_JOB_NUM_MAX - - int ret = -1; - for (unsigned int i = TELEMETRY_INDEX_START; i < TELEMETRY_INDEX_END; i++) - { - job_bitmap_t job_id = index_to_job_id(i); - if (is_job_id_used(job_id)) - { - continue; - } - ret = i; - break; - } - - return ret; -} static struct maat * dp_trace_maat_instance_create() { @@ -99,81 +69,6 @@ error_out: return NULL; } -static void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, - void ** ad, long argl, void * argp) -{ - int ret = 0; - size_t offset = 0; - size_t len = 0; - char * json_str = NULL; - cJSON * json = NULL; - - ret = maat_helper_read_column(table_line, 2, &offset, &len); - if (ret < 0) - { - dzlog_error("fail to get data path rule in maat."); - goto out; - } - - json_str = calloc(sizeof(char), len + 1); - memcpy(json_str, table_line + offset, len); - json = cJSON_Parse(json_str); - if (json == NULL) - { - dzlog_error("Invalid decryption parameter: %s", table_line); - goto out; - } - - struct dp_trace_telemetry_desc telemetry_desc; - struct dp_trace_job_desc * job_desc = &telemetry_desc.job_desc; - - cJSON * uuid_obj = cJSON_GetObjectItem(json, "job_id"); - DP_TRACE_VERIFY(cJSON_IsString(uuid_obj), "uuid is not string"); - ret = uuid_parse(uuid_obj->valuestring, telemetry_desc.uuid); - if (ret != 0) - { - dzlog_error("uuid parsing failed: %s", uuid_obj->valuestring); - goto out; - } - - cJSON * bpf_obj = cJSON_GetObjectItem(json, "bpf_expr"); - DP_TRACE_VERIFY(cJSON_IsString(bpf_obj), "bpf expr is not string"); - snprintf(job_desc->bpf_expr, sizeof(job_desc->bpf_expr), "%s", bpf_obj->valuestring); - - int index = dp_trace_telemetry_unused_job_index_get(); - if (index < 0) - { - dzlog_warn("no enough job for bpf_expr:", job_desc->bpf_expr); - goto out; - } - - job_desc->rule_index = index; - job_desc->sampling = 1; - job_desc->enable = true; - - memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc)); - - job_rule_apply(&telemetry_descs[index].job_desc, 1, DP_TELEMETRY_ROLE); - - *ad = &telemetry_descs[index]; - -out: - if (json_str) - cJSON_Delete(json); - if (json_str) - free(json_str); - return; -} - -static void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp) -{ - struct dp_trace_telemetry_desc * telemetry_desc = *ad; - struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc; - job_desc->enable = false; - job_rule_apply(job_desc, 1, DP_TELEMETRY_ROLE); - return; -} - void dp_trace_maat_init() { dzlog_info("data path trace maat init start..."); diff --git a/src/trace_output.c b/src/trace_output.c index b984390..cf7c2e6 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -26,7 +26,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id); int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs, marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][nr_mbufs], int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]); -static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size); +static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id); static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size); struct dp_trace_output @@ -127,7 +127,8 @@ void * dp_trace_process_thread(void * arg) { char * data; size_t size; - dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size); + job_bitmap_t job_id = index_to_job_id(i); + dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id); kafka_produce(kafka_topic, (void *)data, size); } } @@ -403,7 +404,7 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id) return pthread_mutex_unlock(&dp_trace_output[index].file_mutex); } -static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size) +static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id) { struct dp_trace_buffer_telemetry trace_buff_info; marsio_dp_trace_buffer_info_get(mr_mbuf, &trace_buff_info); @@ -414,8 +415,6 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat gettimeofday(&tv, NULL); uint64_t microseconds = (uint64_t)(tv.tv_sec) * 1000000 + tv.tv_usec; - job_bitmap_t job_id = trace_buff_info.jobs_id; - struct pkt_inner_ip_port pkt_info; marsio_pkt_inner_ip_port_get(mr_mbuf, &pkt_info); @@ -426,10 +425,12 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat mpack_write_cstr(&writer, "timestamp_us"); mpack_write_i64(&writer, microseconds); - char job_id_str[10]; - snprintf(job_id_str, sizeof(job_id_str), "%d", job_id); + uuid_t uuid; + telemetry_job_uuid_get(job_id, uuid); + char uuid_str[37]; + uuid_unparse(uuid, uuid_str); mpack_write_cstr(&writer, "job_id"); - mpack_write_cstr(&writer, job_id_str); + mpack_write_cstr(&writer, uuid_str); mpack_write_cstr(&writer, "sled_ip"); if (global_config_get()->sled_ip != NULL) |
