summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--etc/dp_trace.conf4
-rw-r--r--src/job_ctx.c105
-rw-r--r--src/job_ctx.h16
-rw-r--r--src/kafka.c43
-rw-r--r--src/maat.c105
-rw-r--r--src/trace_output.c17
6 files changed, 162 insertions, 128 deletions
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf
index dabec81..226012d 100644
--- a/etc/dp_trace.conf
+++ b/etc/dp_trace.conf
@@ -7,8 +7,8 @@ device_group=
[kafka]
borker_list="192.168.44.12"
topic_name="datapath-telemetry-record-test"
-sasl_username="admin"
-sasl_password="galaxy2019"
+sasl_username=
+sasl_password=
[maat]
# 0:json 1:redis
diff --git a/src/job_ctx.c b/src/job_ctx.c
index 99ded9e..ae0fa16 100644
--- a/src/job_ctx.c
+++ b/src/job_ctx.c
@@ -2,6 +2,7 @@
#include "common.h"
#include "trace_output.h"
+#include <MESA/maat.h>
#include <stdlib.h>
#include <string.h>
@@ -12,6 +13,9 @@ struct dp_trace_job_occupy
};
static struct dp_trace_job_occupy dp_trace_job_occupy[DP_TRACE_JOB_NUM_MAX] = {};
+static struct dp_trace_telemetry_desc telemetry_descs[DP_TRACE_JOB_NUM_MAX] = {};
+
+static int telemetry_unused_job_index_get();
void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role)
{
@@ -55,6 +59,81 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8
}
}
+void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad,
+ long argl, void * argp)
+{
+ int ret = 0;
+ size_t offset = 0;
+ size_t len = 0;
+ char * json_str = NULL;
+ cJSON * json = NULL;
+
+ ret = maat_helper_read_column(table_line, 2, &offset, &len);
+ if (ret < 0)
+ {
+ dzlog_error("fail to get data path rule in maat.");
+ goto out;
+ }
+
+ json_str = calloc(sizeof(char), len + 1);
+ memcpy(json_str, table_line + offset, len);
+ json = cJSON_Parse(json_str);
+ if (json == NULL)
+ {
+ dzlog_error("Invalid decryption parameter: %s", table_line);
+ goto out;
+ }
+
+ struct dp_trace_telemetry_desc telemetry_desc = {};
+ struct dp_trace_job_desc * job_desc = &telemetry_desc.job_desc;
+
+ cJSON * uuid_obj = cJSON_GetObjectItem(json, "job_id");
+ DP_TRACE_VERIFY(cJSON_IsString(uuid_obj), "uuid is not string");
+ ret = uuid_parse(uuid_obj->valuestring, telemetry_desc.uuid);
+ if (ret != 0)
+ {
+ dzlog_error("uuid parsing failed: %s", uuid_obj->valuestring);
+ goto out;
+ }
+
+ cJSON * bpf_obj = cJSON_GetObjectItem(json, "bpf_expr");
+ DP_TRACE_VERIFY(cJSON_IsString(bpf_obj), "bpf expr is not string");
+ snprintf(job_desc->bpf_expr, sizeof(job_desc->bpf_expr), "%s", bpf_obj->valuestring);
+
+ int index = telemetry_unused_job_index_get();
+ if (index < 0)
+ {
+ dzlog_warn("no enough job for bpf_expr:", job_desc->bpf_expr);
+ goto out;
+ }
+
+ job_desc->rule_index = index;
+ job_desc->sampling = 1;
+ job_desc->enable = true;
+
+ memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc));
+
+ job_rule_apply(&telemetry_descs[index].job_desc, 1, DP_TELEMETRY_ROLE);
+
+ *ad = &telemetry_descs[index];
+
+out:
+ if (json_str)
+ cJSON_Delete(json);
+ if (json_str)
+ free(json_str);
+ return;
+}
+
+void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp)
+{
+ struct dp_trace_telemetry_desc * telemetry_desc = *ad;
+ struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc;
+ job_desc->enable = false;
+ job_rule_apply(job_desc, 1, DP_TELEMETRY_ROLE);
+ return;
+}
+
int is_job_id_used(job_bitmap_t job_id)
{
unsigned int index = job_id_to_index(job_id);
@@ -80,4 +159,30 @@ job_bitmap_t index_to_job_id(unsigned int index)
{
assert(index < DP_TRACE_JOB_NUM_MAX);
return 1 << index;
+}
+
+void telemetry_job_uuid_get(job_bitmap_t job_id, unsigned char * uu)
+{
+ unsigned int index = job_id_to_index(job_id);
+ uuid_copy(uu, telemetry_descs[index].uuid);
+}
+
+static int telemetry_unused_job_index_get()
+{
+#define TELEMETRY_INDEX_START 8
+#define TELEMETRY_INDEX_END DP_TRACE_JOB_NUM_MAX
+
+ int ret = -1;
+ for (unsigned int i = TELEMETRY_INDEX_START; i < TELEMETRY_INDEX_END; i++)
+ {
+ job_bitmap_t job_id = index_to_job_id(i);
+ if (is_job_id_used(job_id))
+ {
+ continue;
+ }
+ ret = i;
+ break;
+ }
+
+ return ret;
} \ No newline at end of file
diff --git a/src/job_ctx.h b/src/job_ctx.h
index c1732fd..76a11fb 100644
--- a/src/job_ctx.h
+++ b/src/job_ctx.h
@@ -1,8 +1,22 @@
#pragma once
#include "marsio.h"
+#include <cjson/cJSON.h>
+#include <uuid/uuid.h>
+
+struct dp_trace_telemetry_desc
+{
+ uuid_t uuid;
+ char job_name[128];
+ struct dp_trace_job_desc job_desc;
+};
void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role);
int job_id_to_index(job_bitmap_t job_id);
job_bitmap_t index_to_job_id(unsigned int index);
int is_job_id_used(job_bitmap_t job_id);
-uint8_t job_id_role_get(job_bitmap_t job_id); \ No newline at end of file
+uint8_t job_id_role_get(job_bitmap_t job_id);
+
+void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad,
+ long argl, void * argp);
+void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp);
+void telemetry_job_uuid_get(job_bitmap_t job_id, unsigned char * uu); \ No newline at end of file
diff --git a/src/kafka.c b/src/kafka.c
index 60146f7..69d8dc7 100644
--- a/src/kafka.c
+++ b/src/kafka.c
@@ -22,23 +22,42 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user
dzlog_error("Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr);
goto error;
}
-
- assert(strlen(sasl_username) > 0);
- assert(strlen(sasl_passwd) > 0);
-
- rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
- ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
+ ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
- dzlog_error("Error to set kafka \"sasl.username\", %s.", kafka_errstr);
+ dzlog_error("Error to set kafka \"security.protocol\", %s.", kafka_errstr);
goto error;
}
- ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
- if (ret != RD_KAFKA_CONF_OK)
+
+ if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
{
- dzlog_error("Error to set kafka \"sasl.password\", %s.", kafka_errstr);
- goto error;
+
+ ret = rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ dzlog_error("Error to set kafka \"security.protocol\", %s.", kafka_errstr);
+ goto error;
+ }
+
+ ret = rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ dzlog_error("Error to set kafka \"sasl.mechanisms\", %s.", kafka_errstr);
+ goto error;
+ }
+
+ ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ dzlog_error("Error to set kafka \"sasl.username\", %s.", kafka_errstr);
+ goto error;
+ }
+ ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ dzlog_error("Error to set kafka \"sasl.password\", %s.", kafka_errstr);
+ goto error;
+ }
}
// The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
diff --git a/src/maat.c b/src/maat.c
index 836b8ae..4e2cbbe 100644
--- a/src/maat.c
+++ b/src/maat.c
@@ -4,36 +4,6 @@
#include <MESA/maat.h>
#include <cjson/cJSON.h>
-#include <uuid/uuid.h>
-
-struct dp_trace_telemetry_desc
-{
- uuid_t uuid;
- char job_name[128];
- struct dp_trace_job_desc job_desc;
-};
-
-static struct dp_trace_telemetry_desc telemetry_descs[DP_TRACE_JOB_NUM_MAX];
-
-static int dp_trace_telemetry_unused_job_index_get()
-{
-#define TELEMETRY_INDEX_START 8
-#define TELEMETRY_INDEX_END DP_TRACE_JOB_NUM_MAX
-
- int ret = -1;
- for (unsigned int i = TELEMETRY_INDEX_START; i < TELEMETRY_INDEX_END; i++)
- {
- job_bitmap_t job_id = index_to_job_id(i);
- if (is_job_id_used(job_id))
- {
- continue;
- }
- ret = i;
- break;
- }
-
- return ret;
-}
static struct maat * dp_trace_maat_instance_create()
{
@@ -99,81 +69,6 @@ error_out:
return NULL;
}
-static void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line,
- void ** ad, long argl, void * argp)
-{
- int ret = 0;
- size_t offset = 0;
- size_t len = 0;
- char * json_str = NULL;
- cJSON * json = NULL;
-
- ret = maat_helper_read_column(table_line, 2, &offset, &len);
- if (ret < 0)
- {
- dzlog_error("fail to get data path rule in maat.");
- goto out;
- }
-
- json_str = calloc(sizeof(char), len + 1);
- memcpy(json_str, table_line + offset, len);
- json = cJSON_Parse(json_str);
- if (json == NULL)
- {
- dzlog_error("Invalid decryption parameter: %s", table_line);
- goto out;
- }
-
- struct dp_trace_telemetry_desc telemetry_desc;
- struct dp_trace_job_desc * job_desc = &telemetry_desc.job_desc;
-
- cJSON * uuid_obj = cJSON_GetObjectItem(json, "job_id");
- DP_TRACE_VERIFY(cJSON_IsString(uuid_obj), "uuid is not string");
- ret = uuid_parse(uuid_obj->valuestring, telemetry_desc.uuid);
- if (ret != 0)
- {
- dzlog_error("uuid parsing failed: %s", uuid_obj->valuestring);
- goto out;
- }
-
- cJSON * bpf_obj = cJSON_GetObjectItem(json, "bpf_expr");
- DP_TRACE_VERIFY(cJSON_IsString(bpf_obj), "bpf expr is not string");
- snprintf(job_desc->bpf_expr, sizeof(job_desc->bpf_expr), "%s", bpf_obj->valuestring);
-
- int index = dp_trace_telemetry_unused_job_index_get();
- if (index < 0)
- {
- dzlog_warn("no enough job for bpf_expr:", job_desc->bpf_expr);
- goto out;
- }
-
- job_desc->rule_index = index;
- job_desc->sampling = 1;
- job_desc->enable = true;
-
- memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc));
-
- job_rule_apply(&telemetry_descs[index].job_desc, 1, DP_TELEMETRY_ROLE);
-
- *ad = &telemetry_descs[index];
-
-out:
- if (json_str)
- cJSON_Delete(json);
- if (json_str)
- free(json_str);
- return;
-}
-
-static void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp)
-{
- struct dp_trace_telemetry_desc * telemetry_desc = *ad;
- struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc;
- job_desc->enable = false;
- job_rule_apply(job_desc, 1, DP_TELEMETRY_ROLE);
- return;
-}
-
void dp_trace_maat_init()
{
dzlog_info("data path trace maat init start...");
diff --git a/src/trace_output.c b/src/trace_output.c
index b984390..cf7c2e6 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -26,7 +26,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id);
int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs,
marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][nr_mbufs],
int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]);
-static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size);
+static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id);
static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size);
struct dp_trace_output
@@ -127,7 +127,8 @@ void * dp_trace_process_thread(void * arg)
{
char * data;
size_t size;
- dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size);
+ job_bitmap_t job_id = index_to_job_id(i);
+ dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id);
kafka_produce(kafka_topic, (void *)data, size);
}
}
@@ -403,7 +404,7 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id)
return pthread_mutex_unlock(&dp_trace_output[index].file_mutex);
}
-static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size)
+static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size, job_bitmap_t job_id)
{
struct dp_trace_buffer_telemetry trace_buff_info;
marsio_dp_trace_buffer_info_get(mr_mbuf, &trace_buff_info);
@@ -414,8 +415,6 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
gettimeofday(&tv, NULL);
uint64_t microseconds = (uint64_t)(tv.tv_sec) * 1000000 + tv.tv_usec;
- job_bitmap_t job_id = trace_buff_info.jobs_id;
-
struct pkt_inner_ip_port pkt_info;
marsio_pkt_inner_ip_port_get(mr_mbuf, &pkt_info);
@@ -426,10 +425,12 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
mpack_write_cstr(&writer, "timestamp_us");
mpack_write_i64(&writer, microseconds);
- char job_id_str[10];
- snprintf(job_id_str, sizeof(job_id_str), "%d", job_id);
+ uuid_t uuid;
+ telemetry_job_uuid_get(job_id, uuid);
+ char uuid_str[37];
+ uuid_unparse(uuid, uuid_str);
mpack_write_cstr(&writer, "job_id");
- mpack_write_cstr(&writer, job_id_str);
+ mpack_write_cstr(&writer, uuid_str);
mpack_write_cstr(&writer, "sled_ip");
if (global_config_get()->sled_ip != NULL)