diff options
| author | wanglihui <[email protected]> | 2021-09-06 16:19:33 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-09-06 16:19:33 +0800 |
| commit | b4237bb4a9f7d318b432e6b5686b1a4329cf4acd (patch) | |
| tree | eac68d2ef88d422de812f0eef3dbee1417de1665 | |
| parent | c5943298bd618c4be7e4e4ee0e2b160873c62e17 (diff) | |
新增kafka sasl认证机制
| -rw-r--r-- | pom.xml | 64 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/CommonConfig.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/source/DosSketchSource.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java | 13 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/KafkaUtils.java | 10 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 20 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/common/UdtfTest.java | 17 |
7 files changed, 45 insertions, 85 deletions
@@ -116,44 +116,7 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-connector-kafka_2.11</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka_2.11</artifactId> - <version>${flink.version}</version> - <!--<scope>provided</scope>--> - </dependency> - - <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>1.0.0</version> - </dependency> - - <!--Flink modules--> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java</artifactId> - <version>${flink.version}</version> - <!--<scope>provided</scope>--> - </dependency> - - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_2.11</artifactId> - <version>${flink.version}</version> - <!--<scope>provided</scope>--> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_2.11</artifactId> + <artifactId>flink-connector-kafka_2.12</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> @@ -161,7 +124,7 @@ <!-- CLI dependencies --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.11</artifactId> + <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> @@ -181,6 +144,23 @@ </exclusions> </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>2.2.3</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> @@ -204,12 +184,6 @@ </dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-hbase-2.2_2.11</artifactId> - <version>${flink.version}</version> - </dependency> - - <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.2</version> diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 2da067b..69d6859 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -61,4 +61,7 @@ public class CommonConfig { public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes"); public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days"); + public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user"); + public static final String SASL_JAAS_CONFIG_PASSWORD = CommonConfigurations.getStringProperty("sasl.jaas.config.password"); + } diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java index fc86c87..af7f6ed 100644 --- a/src/main/java/com/zdjizhi/source/DosSketchSource.java +++ b/src/main/java/com/zdjizhi/source/DosSketchSource.java @@ -20,6 +20,9 @@ public class DosSketchSource { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS); properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID); + properties.setProperty("security.protocol", "SASL_PLAINTEXT"); + properties.setProperty("sasl.mechanism", "PLAIN"); + properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); return streamExeEnv.addSource(new FlinkKafkaConsumer<String>( CommonConfig.KAFKA_INPUT_TOPIC_NAME, diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java index 6e8e02c..e34ce28 100644 --- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java +++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java @@ -2,8 +2,6 @@ package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; /** @@ -12,17 +10,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class FlinkEnvironmentUtils { public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - - public static StreamTableEnvironment getStreamTableEnv() { + static { streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); - - EnvironmentSettings settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() - .inStreamingMode() - .build(); - - return StreamTableEnvironment.create(streamExeEnv, settings); } - } diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java index 954a406..0f32683 100644 --- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java +++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java @@ -9,10 +9,14 @@ import java.util.Properties; public class KafkaUtils { private static Properties getKafkaSinkProperty(){ - Properties propertiesproducer = new Properties(); - propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); + properties.setProperty("security.protocol", "SASL_PLAINTEXT"); + properties.setProperty("sasl.mechanism", "PLAIN"); + properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); - return propertiesproducer; + + return properties; } public static FlinkKafkaProducer<String> getKafkaSink(String topic){ diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 82cea3e..9b19d77 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -8,11 +8,11 @@ stream.execution.job.name=DOS-DETECTION-APPLICATION kafka.input.parallelism=1 #输入kafka topic名 -kafka.input.topic.name=DOS-SKETCH-LOG +kafka.input.topic.name=DOS-SKETCH-RECORD #输入kafka地址 -#kafka.input.bootstrap.servers=192.168.44.12:9092 -kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 +kafka.input.bootstrap.servers=192.168.44.12:9092 +#kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 #读取kafka group id kafka.input.group.id=2108231709 @@ -22,15 +22,15 @@ kafka.input.group.id=2108231709 kafka.output.metric.parallelism=1 #发送kafka metrics topic名 -#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG -kafka.output.metric.topic.name=test +kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS +#kafka.output.metric.topic.name=test #发送kafka event并行度大小 kafka.output.event.parallelism=1 #发送kafka event topic名 -#kafka.output.event.topic.name=DOS-EVENT-LOG -kafka.output.event.topic.name=test +kafka.output.event.topic.name=DOS-EVENT +#kafka.output.event.topic.name=test #kafka输出地址 kafka.output.bootstrap.servers=192.168.44.12:9092 @@ -118,4 +118,8 @@ http.pool.response.timeout=60000 static.threshold.schedule.minutes=10 #获取baseline周期,默认7天 -baseline.threshold.schedule.days=7
\ No newline at end of file +baseline.threshold.schedule.days=7 + +#kafka用户认证配置参数 +sasl.jaas.config.user=admin +sasl.jaas.config.password=galaxy2019
\ No newline at end of file diff --git a/src/test/java/com/zdjizhi/common/UdtfTest.java b/src/test/java/com/zdjizhi/common/UdtfTest.java deleted file mode 100644 index 479febe..0000000 --- a/src/test/java/com/zdjizhi/common/UdtfTest.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.zdjizhi.common; - -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.types.Row; - -public class UdtfTest extends TableFunction<Row> { - - public void eval(Row[] rows) { - for (Row row : rows) { - collect(row); - } - } - - public static void main(String[] args) { - - } -} |
