diff options
| author | 刘学利 <[email protected]> | 2021-08-17 11:01:34 +0000 |
|---|---|---|
| committer | 刘学利 <[email protected]> | 2021-08-17 11:01:34 +0000 |
| commit | dd00c9effe1054420442a2ab9ca8726ef3d61af3 (patch) | |
| tree | 98dab4cdc5b970c281aae4458c7393607b9e50c5 /src/tsg_send_log.cpp | |
| parent | 145786c2da0939fdf12f85fab44667c8eba54305 (diff) | |
Feature connect kafka with sasl plaintext
Diffstat (limited to 'src/tsg_send_log.cpp')
| -rw-r--r-- | src/tsg_send_log.cpp | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index de11e48..9d7651f 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -1421,6 +1421,8 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", _instance->common_field_file, sizeof(_instance->common_field_file), NULL); MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", _instance->broker_list, sizeof(_instance->broker_list), NULL); + MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), "admin"); + MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), "galaxy2019"); MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000"); MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000"); @@ -1462,7 +1464,11 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", _instance->broker_list, kafka_errstr, sizeof(kafka_errstr)); - + rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.username", _instance->sasl_username, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); + if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) { MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_new is error"); |
