summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2022-03-16 17:07:39 +0800
committerqidaijie <[email protected]>2022-03-16 17:07:39 +0800
commit32cdd71e71884c1fe3a1d8750be19560cba806f3 (patch)
treee08d53072d47de799b0fbb33e7d427f32589d1c8
parentff5039441309bd145bdd811f52c19fe034afae7e (diff)
新增kafka用户名密码加密 TSG-8835
-rw-r--r--pom.xml9
-rw-r--r--properties/default_config.properties9
-rw-r--r--properties/service_flow_config.properties4
-rw-r--r--src/main/java/com/zdjizhi/common/StreamAggregateConfig.java17
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/CertUtils.java8
5 files changed, 32 insertions, 15 deletions
diff --git a/pom.xml b/pom.xml
index 02126d6..3cae362 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-olap-analysis-schema</artifactId>
- <version>220309-balance</version>
+ <version>220316-encryption</version>
<name>log-olap-analysis-schema</name>
<url>http://www.example.com</url>
@@ -226,6 +226,13 @@
<scope>test</scope>
</dependency>
+ <!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
+ <dependency>
+ <groupId>org.jasypt</groupId>
+ <artifactId>jasypt</artifactId>
+ <version>1.9.3</version>
+ </dependency>
+
</dependencies>
</project>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 4961a5d..9c083ad 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -28,12 +28,11 @@ buffer.memory=134217728
#10M
max.request.size=10485760
#====================kafka default====================#
+#kafka SASL验证用户名-加密
+kafka.user=nsyGpHKGFA4KW0zro9MDdw==
-#kafka SASL验证用户名
-kafka.user=admin
-
-#kafka SASL及SSL验证密码
-kafka.pin=galaxy2019
+#kafka SASL及SSL验证密码-加密
+kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#====================Topology Default====================#
#两个输出之间的最大时间(单位milliseconds)
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 9231c1b..c933cba 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,10 +1,10 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+source.kafka.servers=192.168.44.12:9094
#管理输出kafka地址
-sink.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+sink.kafka.servers=192.168.44.12:9094
#--------------------------------HTTP------------------------------#
#kafka 证书地址
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
index f6100f3..14d79c6 100644
--- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -2,12 +2,19 @@ package com.zdjizhi.common;
import com.zdjizhi.utils.system.StreamAggregateConfigurations;
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author Administrator
*/
public class StreamAggregateConfig {
+ private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
+
+ static {
+ encryptor.setPassword("galaxy");
+ }
+
public static final String FORMAT_SPLITTER = ",";
public static final String PROTOCOL_SPLITTER = "\\.";
@@ -26,15 +33,19 @@ public class StreamAggregateConfig {
public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
public static final Integer RANDOM_RANGE_NUM = StreamAggregateConfigurations.getIntProperty(1, "random.range.num");
+ /**
+ * Kafka common
+ */
+ public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(StreamAggregateConfigurations.getStringProperty(1, "kafka.user"));
+ public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(StreamAggregateConfigurations.getStringProperty(1, "kafka.pin"));
+
/**
- * kafka source
+ * kafka sink config
*/
public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
- public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
- public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries");
public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms");
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
index cbc92f4..c85686b 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -33,15 +33,15 @@ class CertUtils {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";");
+ + StreamAggregateConfig.KAFKA_SASL_JAAS_USER + " password=" + StreamAggregateConfig.KAFKA_SASL_JAAS_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN);
+ properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN);
- properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN);
}
}