diff options
| author | 童宗振 <[email protected]> | 2024-04-08 06:32:58 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-08 06:32:58 +0000 |
| commit | 1f609621536b5023c864abfc0814696377355b43 (patch) | |
| tree | af67f0f38b866230b5c7e0429bb709bd87bd30d8 /src | |
| parent | 2f9706ea37e32118843d82e404fdfe26130d3bfd (diff) | |
Support kafka
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/config.c | 9 | ||||
| -rw-r--r-- | src/config.h | 11 | ||||
| -rw-r--r-- | src/trace_output.c | 104 |
4 files changed, 125 insertions, 1 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ec0db9e..ccb93af 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}) add_executable(${PROJECT_NAME} main.c config.c job_ctx.c trace_output.c) -target_link_libraries(${PROJECT_NAME} libmarsio maatframe MESA_prof_load libevent-static uuid pthread)
\ No newline at end of file +target_link_libraries(${PROJECT_NAME} libmarsio maatframe MESA_prof_load libevent-static rdkafka uuid pthread)
\ No newline at end of file diff --git a/src/config.c b/src/config.c index e35221e..2de6c45 100644 --- a/src/config.c +++ b/src/config.c @@ -51,6 +51,15 @@ void config_load() exit(EXIT_FAILURE); } g_conf->log_level = loglevel; + + MESA_load_profile_string_def(g_conf->config_path, "kafka", "borker_list", g_conf->broker_list, + sizeof(g_conf->broker_list), ""); + MESA_load_profile_string_def(g_conf->config_path, "kafka", "topic_name", g_conf->topic_name, + sizeof(g_conf->topic_name), ""); + MESA_load_profile_string_def(g_conf->config_path, "kafka", "sasl_username", g_conf->sasl_username, + sizeof(g_conf->sasl_username), ""); + MESA_load_profile_string_def(g_conf->config_path, "kafka", "sasl_password", g_conf->sasl_password, + sizeof(g_conf->sasl_password), ""); } void dynamic_config_load() diff --git a/src/config.h b/src/config.h index eafd533..22707d8 100644 --- a/src/config.h +++ b/src/config.h @@ -2,6 +2,7 @@ #include "common.h" #include <event.h> +#include <librdkafka/rdkafka.h> #include <limits.h> #include <sched.h> @@ -15,9 +16,19 @@ struct config cpu_set_t cpu_set_io; uint8_t log_level; + // signal event struct event_base * evbase; struct event * evsignal[8]; + // kafka + char topic_name[MR_SYMBOL_MAX]; + char broker_list[1024]; + char sasl_username[MR_SYMBOL_MAX]; + char sasl_password[MR_SYMBOL_MAX]; + rd_kafka_t * kafka_handle; + rd_kafka_topic_t * kafka_topic; + + // dp trace char dp_trace_dir[PATH_MAX]; unsigned int dp_trace_file_max_size_in_KB; unsigned int dp_trace_merge_timeout; diff --git a/src/trace_output.c b/src/trace_output.c index 1902220..5cfa370 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -16,6 +16,8 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id); void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id); void dp_trace_file_rollbak(job_bitmap_t job_id); +static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd); + struct dp_trace_output { char * file_path; @@ -29,6 +31,7 @@ struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {}; void dp_trace_output_init() { + // pcapng file path init char * dp_trace_dir = g_conf->dp_trace_dir; int ret = mkdir(dp_trace_dir, 0755); if (ret != 0 && errno != EEXIST) @@ -57,6 +60,12 @@ void dp_trace_output_init() exit(EXIT_FAILURE); } } + + // kafka init +#if 0 + g_conf->kafka_handle = kafka_handle_create(g_conf->broker_list, g_conf->sasl_password, g_conf->sasl_username); + g_conf->kafka_topic = rd_kafka_topic_new(g_conf->kafka_handle, g_conf->topic_name, NULL); +#endif } void * dp_trace_process_thread(void * arg) @@ -98,6 +107,30 @@ void * dp_trace_process_thread(void * arg) job_bitmap_t job_id = index_to_job_id(i); cli_job_mbufs_write_process(rx_buff, nr_mbufs, job_id); } + + for (unsigned int i = 8; i < 16; i++) + { + unsigned int nr_mbufs = nr_jobs_mbufs[i]; + if (nr_mbufs == 0) + { + continue; + } + + for (unsigned int j = 0; j < nr_mbufs; j++) + { + rx_buff[j] = class_mbufs[i * nr_recv + j]; + } + for (unsigned int j = 0; j < nr_mbufs; j++) + { + char * data; + size_t size; + marsio_dp_trace_decode_to_message_pack(mr_instance, rx_buff[j], &data, &size); +#if 0 + rd_kafka_produce(g_conf->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, (void *)data, size, + NULL, 0, NULL); +#endif + } + } } } @@ -299,4 +332,75 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id) { unsigned int index = job_id_to_index(job_id); return pthread_mutex_unlock(&dp_trace_output[index].file_mutex); +} + +static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd) +{ + int ret; + char kafka_errstr[1024] = {0}; + rd_kafka_t * handle = NULL; + rd_kafka_conf_t * rconf = NULL; + + rconf = rd_kafka_conf_new(); + + ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + syslog(LOG_ERR, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr); + goto error; + } + ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + syslog(LOG_ERR, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr); + goto error; + } + +#if 0 + assert(strlen(sasl_username) > 0); + assert(strlen(sasl_passwd) > 0); +#endif + + rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); + ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + syslog(LOG_ERR, "Error to set kafka \"sasl.username\", %s.", kafka_errstr); + goto error; + } + ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + syslog(LOG_ERR, "Error to set kafka \"sasl.password\", %s.", kafka_errstr); + goto error; + } + + // The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. + handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr)); + rconf = NULL; + if (handle == NULL) + { + syslog(LOG_ERR, "Error to new kafka, %s.", kafka_errstr); + goto error; + } + + if (rd_kafka_brokers_add(handle, brokerlist) == 0) + { + syslog(LOG_ERR, "Error to add kakfa bokers."); + goto error; + } + + return handle; + +error: + if (rconf != NULL) + { + rd_kafka_conf_destroy(rconf); + } + if (handle != NULL) + { + rd_kafka_destroy(handle); + } + return NULL; }
\ No newline at end of file |
