summaryrefslogtreecommitdiff
path: root/src/trace_output.c
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-08 06:32:58 +0000
committer童宗振 <[email protected]>2024-04-08 06:32:58 +0000
commit1f609621536b5023c864abfc0814696377355b43 (patch)
treeaf67f0f38b866230b5c7e0429bb709bd87bd30d8 /src/trace_output.c
parent2f9706ea37e32118843d82e404fdfe26130d3bfd (diff)
Support kafka
Diffstat (limited to 'src/trace_output.c')
-rw-r--r--src/trace_output.c104
1 files changed, 104 insertions, 0 deletions
diff --git a/src/trace_output.c b/src/trace_output.c
index 1902220..5cfa370 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -16,6 +16,8 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id);
void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id);
void dp_trace_file_rollbak(job_bitmap_t job_id);
+static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd);
+
struct dp_trace_output
{
char * file_path;
@@ -29,6 +31,7 @@ struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {};
void dp_trace_output_init()
{
+ // pcapng file path init
char * dp_trace_dir = g_conf->dp_trace_dir;
int ret = mkdir(dp_trace_dir, 0755);
if (ret != 0 && errno != EEXIST)
@@ -57,6 +60,12 @@ void dp_trace_output_init()
exit(EXIT_FAILURE);
}
}
+
+ // kafka init
+#if 0
+ g_conf->kafka_handle = kafka_handle_create(g_conf->broker_list, g_conf->sasl_password, g_conf->sasl_username);
+ g_conf->kafka_topic = rd_kafka_topic_new(g_conf->kafka_handle, g_conf->topic_name, NULL);
+#endif
}
void * dp_trace_process_thread(void * arg)
@@ -98,6 +107,30 @@ void * dp_trace_process_thread(void * arg)
job_bitmap_t job_id = index_to_job_id(i);
cli_job_mbufs_write_process(rx_buff, nr_mbufs, job_id);
}
+
+ for (unsigned int i = 8; i < 16; i++)
+ {
+ unsigned int nr_mbufs = nr_jobs_mbufs[i];
+ if (nr_mbufs == 0)
+ {
+ continue;
+ }
+
+ for (unsigned int j = 0; j < nr_mbufs; j++)
+ {
+ rx_buff[j] = class_mbufs[i * nr_recv + j];
+ }
+ for (unsigned int j = 0; j < nr_mbufs; j++)
+ {
+ char * data;
+ size_t size;
+ marsio_dp_trace_decode_to_message_pack(mr_instance, rx_buff[j], &data, &size);
+#if 0
+ rd_kafka_produce(g_conf->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, (void *)data, size,
+ NULL, 0, NULL);
+#endif
+ }
+ }
}
}
@@ -299,4 +332,75 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id)
{
unsigned int index = job_id_to_index(job_id);
return pthread_mutex_unlock(&dp_trace_output[index].file_mutex);
+}
+
+static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd)
+{
+ int ret;
+ char kafka_errstr[1024] = {0};
+ rd_kafka_t * handle = NULL;
+ rd_kafka_conf_t * rconf = NULL;
+
+ rconf = rd_kafka_conf_new();
+
+ ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ syslog(LOG_ERR, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
+ goto error;
+ }
+ ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ syslog(LOG_ERR, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr);
+ goto error;
+ }
+
+#if 0
+ assert(strlen(sasl_username) > 0);
+ assert(strlen(sasl_passwd) > 0);
+#endif
+
+ rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
+ ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ syslog(LOG_ERR, "Error to set kafka \"sasl.username\", %s.", kafka_errstr);
+ goto error;
+ }
+ ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ syslog(LOG_ERR, "Error to set kafka \"sasl.password\", %s.", kafka_errstr);
+ goto error;
+ }
+
+ // The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
+ handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr));
+ rconf = NULL;
+ if (handle == NULL)
+ {
+ syslog(LOG_ERR, "Error to new kafka, %s.", kafka_errstr);
+ goto error;
+ }
+
+ if (rd_kafka_brokers_add(handle, brokerlist) == 0)
+ {
+ syslog(LOG_ERR, "Error to add kakfa bokers.");
+ goto error;
+ }
+
+ return handle;
+
+error:
+ if (rconf != NULL)
+ {
+ rd_kafka_conf_destroy(rconf);
+ }
+ if (handle != NULL)
+ {
+ rd_kafka_destroy(handle);
+ }
+ return NULL;
} \ No newline at end of file