diff options
| author | qidaijie <[email protected]> | 2022-03-16 17:07:39 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-03-16 17:07:39 +0800 |
| commit | 32cdd71e71884c1fe3a1d8750be19560cba806f3 (patch) | |
| tree | e08d53072d47de799b0fbb33e7d427f32589d1c8 | |
| parent | ff5039441309bd145bdd811f52c19fe034afae7e (diff) | |
新增kafka用户名密码加密 TSG-8835
| -rw-r--r-- | pom.xml | 9 | ||||
| -rw-r--r-- | properties/default_config.properties | 9 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 4 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/StreamAggregateConfig.java | 17 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/CertUtils.java | 8 |
5 files changed, 32 insertions, 15 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-olap-analysis-schema</artifactId> - <version>220309-balance</version> + <version>220316-encryption</version> <name>log-olap-analysis-schema</name> <url>http://www.example.com</url> @@ -226,6 +226,13 @@ <scope>test</scope> </dependency> + <!-- https://mvnrepository.com/artifact/org.jasypt/jasypt --> + <dependency> + <groupId>org.jasypt</groupId> + <artifactId>jasypt</artifactId> + <version>1.9.3</version> + </dependency> + </dependencies> </project> diff --git a/properties/default_config.properties b/properties/default_config.properties index 4961a5d..9c083ad 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -28,12 +28,11 @@ buffer.memory=134217728 #10M max.request.size=10485760 #====================kafka default====================# +#kafka SASL验证用户名-加密 +kafka.user=nsyGpHKGFA4KW0zro9MDdw== -#kafka SASL验证用户名 -kafka.user=admin - -#kafka SASL及SSL验证密码 -kafka.pin=galaxy2019 +#kafka SASL及SSL验证密码-加密 +kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ #====================Topology Default====================# #两个输出之间的最大时间(单位milliseconds) diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 9231c1b..c933cba 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,10 +1,10 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +source.kafka.servers=192.168.44.12:9094 #管理输出kafka地址 -sink.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +sink.kafka.servers=192.168.44.12:9094 #--------------------------------HTTP------------------------------# #kafka 证书地址 diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java index f6100f3..14d79c6 100644 --- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java +++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java @@ -2,12 +2,19 @@ package com.zdjizhi.common; import com.zdjizhi.utils.system.StreamAggregateConfigurations; +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; /** * @author Administrator */ public class StreamAggregateConfig { + private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + + static { + encryptor.setPassword("galaxy"); + } + public static final String FORMAT_SPLITTER = ","; public static final String PROTOCOL_SPLITTER = "\\."; @@ -26,15 +33,19 @@ public class StreamAggregateConfig { public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism"); public static final Integer RANDOM_RANGE_NUM = StreamAggregateConfigurations.getIntProperty(1, "random.range.num"); + /** + * Kafka common + */ + public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(StreamAggregateConfigurations.getStringProperty(1, "kafka.user")); + public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(StreamAggregateConfigurations.getStringProperty(1, "kafka.pin")); + /** - * kafka source + * kafka sink config */ public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers"); public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack"); - public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user"); - public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin"); public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries"); public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms"); public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms"); diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java index cbc92f4..c85686b 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -33,15 +33,15 @@ class CertUtils { properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.mechanism", "PLAIN"); properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";"); + + StreamAggregateConfig.KAFKA_SASL_JAAS_USER + " password=" + StreamAggregateConfig.KAFKA_SASL_JAAS_PIN + ";"); } else if (servers.contains(SSL_PORT)) { properties.put("security.protocol", "SSL"); properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN); properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN); - properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN); + properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN); } } |
