author     wanglihui <[email protected]>    2021-12-29 16:29:37 +0800
committer  wanglihui <[email protected]>    2021-12-29 16:29:37 +0800
commit     5f24f3403436663c857e47880c4b211e720b07b1 (patch)
tree       4a875c7007979f87864c91e9473ce70579749b7c
parent     b31906b71748a1eafdbfe73731ac0119b2cf7660 (diff)
Fix bug where failure to obtain the singleton data source prevented the stream from being split
-rw-r--r--  dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java          9
-rw-r--r--  platform-base/src/main/java/com/zdjizhi/base/common/SourceConfig.java           42
-rw-r--r--  platform-base/src/main/java/com/zdjizhi/base/source/Source.java                 36
-rw-r--r--  platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java   11
-rw-r--r--  platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java              23
-rw-r--r--  platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java                 78
-rw-r--r--  platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java                   53
-rw-r--r--  platform-schedule/src/main/resources/business.properties                        3
8 files changed, 167 insertions, 88 deletions
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
index d9d13b3..5e24b98 100644
--- a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
@@ -1,8 +1,11 @@
package com.zdjizhi.dos.sink;
import com.zdjizhi.base.platform.Schedule;
+import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
import com.zdjizhi.etl.DosDetectionEtl;
+import java.time.Duration;
+
/**
* @author 94976
*/
@@ -10,6 +13,10 @@ public class OutputStreamSink implements Schedule {
@Override
public void schedule() throws Exception {
- DosDetectionEtl.getSketchSource().print();
+ DosDetectionEtl.getSketchSource().assignTimestampsAndWatermarks(
+ FlinkEnvironmentUtils.createWatermarkStrategy(
+ Duration.ofSeconds(10),
+ (event, timestamp) -> event.getSketch_start_time() * 1000))
+ .print();
}
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/SourceConfig.java b/platform-base/src/main/java/com/zdjizhi/base/common/SourceConfig.java
new file mode 100644
index 0000000..fdf54c4
--- /dev/null
+++ b/platform-base/src/main/java/com/zdjizhi/base/common/SourceConfig.java
@@ -0,0 +1,42 @@
+package com.zdjizhi.base.common;
+
+import java.util.Properties;
+
+public class SourceConfig {
+
+ private String topic;
+ private int parallelism;
+ private Properties properties;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ private SourceConfig(){}
+
+ public SourceConfig(String topic, int parallelism, Properties properties) {
+ this.topic = topic;
+ this.parallelism = parallelism;
+ this.properties = properties;
+ }
+}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
index 9182517..a658927 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
@@ -1,31 +1,43 @@
package com.zdjizhi.base.source;
-import com.zdjizhi.base.common.CommonConfig;
+import com.zdjizhi.base.common.SourceConfig;
import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
import com.zdjizhi.base.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import java.util.Properties;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class Source {
private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
+ private static Map<String, DataStreamSource<String>> sourceHashMap = new ConcurrentHashMap<>();
- public static DataStreamSource<String> createSource(String topic){
-
- return createSource(topic,CommonConfig.KAFKA_INPUT_PARALLELISM);
+ public static DataStreamSource<String> getSingleSource(SourceConfig sourceConfig) {
+ return createSingleSource(sourceConfig);
}
- public static DataStreamSource<String> createSource(String topic,int parallelism){
-
- return streamExeEnv.addSource(KafkaUtils.createSource(topic))
- .setParallelism(parallelism);
+ private static DataStreamSource<String> createSource(SourceConfig sourceConfig) {
+ return streamExeEnv.addSource(KafkaUtils.createSource(sourceConfig.getTopic(), sourceConfig.getProperties()))
+ .setParallelism(sourceConfig.getParallelism());
}
- public static DataStreamSource<String> createSource(String topic, int parallelism, Properties properties){
+ private synchronized static DataStreamSource<String> createSingleSource(SourceConfig sourceConfig) {
+ if (checkConfigNotNull(sourceConfig)) {
+ if (sourceHashMap.containsKey(sourceConfig.getTopic())) {
+ return sourceHashMap.get(sourceConfig.getTopic());
+ }else {
+ DataStreamSource<String> streamSource = createSource(sourceConfig);
+ sourceHashMap.put(sourceConfig.getTopic(),streamSource);
+ return streamSource;
+ }
+ } else {
+ throw new IllegalArgumentException("SourceConfig is empty");
+ }
+ }
- return streamExeEnv.addSource(KafkaUtils.createSource(topic,properties))
- .setParallelism(parallelism);
+ private static boolean checkConfigNotNull(SourceConfig sourceConfig) {
+ return sourceConfig != null && sourceConfig.getTopic() != null && sourceConfig.getProperties() != null && !sourceConfig.getProperties().isEmpty();
}
}
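
The getSingleSource/createSingleSource pair above caches one DataStreamSource per topic in a ConcurrentHashMap, so repeated lookups for the same topic reuse a single Kafka source operator instead of registering a new one. A minimal usage sketch, assuming this commit's SourceConfig and Source classes are on the classpath; the topic, servers and group id below are placeholders:

import com.zdjizhi.base.common.SourceConfig;
import com.zdjizhi.base.source.Source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import java.util.Properties;

public class SharedSourceSketch {
    public static void main(String[] args) {
        // Placeholder consumer properties; the real pipeline builds these from CommonConfig.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");

        SourceConfig config = new SourceConfig("example-topic", 1, props);

        // Both calls resolve through the ConcurrentHashMap cache, so only one Kafka
        // source is added to the job graph and both pipelines branch off the same stream.
        DataStreamSource<String> first = Source.getSingleSource(config);
        DataStreamSource<String> second = Source.getSingleSource(config);
        // first and second refer to the same cached DataStreamSource instance.
    }
}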
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
index d15d81e..4344613 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
@@ -1,8 +1,12 @@
package com.zdjizhi.base.utils;
import com.zdjizhi.base.common.CommonConfig;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import java.time.Duration;
+
/**
* @author wlh
@@ -14,4 +18,11 @@ public class FlinkEnvironmentUtils {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
}
+ public static <T> WatermarkStrategy<T> createWatermarkStrategy(Duration maxOutOfOrderness, SerializableTimestampAssigner<T> timestampAssigner){
+ return WatermarkStrategy
+ .<T>forBoundedOutOfOrderness(maxOutOfOrderness)
+ .withTimestampAssigner(timestampAssigner);
+ }
+
+
}
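
createWatermarkStrategy is a thin wrapper around Flink's bounded-out-of-orderness strategy plus a timestamp assigner. A short sketch under assumed names (ExampleEvent and its epoch-seconds getter are illustrative; the real pipeline uses DosSketchLog.getSketch_start_time()):

import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;

public class WatermarkSketch {
    // Illustrative event type standing in for DosSketchLog.
    static class ExampleEvent {
        long eventTimeSeconds;
        long getEventTimeSeconds() { return eventTimeSeconds; }
    }

    public static void main(String[] args) {
        // Tolerate up to 10 seconds of out-of-order events and convert the
        // epoch-seconds field into the millisecond timestamps Flink expects.
        WatermarkStrategy<ExampleEvent> strategy =
                FlinkEnvironmentUtils.createWatermarkStrategy(
                        Duration.ofSeconds(10),
                        (event, recordTimestamp) -> event.getEventTimeSeconds() * 1000);
        // The strategy is then passed to DataStream#assignTimestampsAndWatermarks,
        // exactly as OutputStreamSink does with the sketch stream above.
    }
}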
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
index b7173ef..9fbe1be 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
@@ -17,20 +17,11 @@ public class KafkaUtils {
return properties;
}
- private static Properties getKafkaSourceProperty() {
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
- properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
- updateSasl(properties);
- return properties;
- }
+ public static void updateSasl(Properties properties) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + CommonConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + CommonConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
- private static void updateSasl(Properties properties) {
- if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1) {
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + CommonConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + CommonConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
- }
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
@@ -42,11 +33,7 @@ public class KafkaUtils {
);
}
- public static FlinkKafkaConsumer<String> createSource(String topic){
- return createSource(topic,getKafkaSourceProperty());
- }
-
- public static FlinkKafkaConsumer<String> createSource(String topic,Properties properties){
+ public static FlinkKafkaConsumer<String> createSource(String topic, Properties properties) {
return new FlinkKafkaConsumer<String>(
topic,
new SimpleStringSchema(),
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
index 5c5524b..8253c92 100644
--- a/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
@@ -1,77 +1,43 @@
package com.zdjizhi.etl;
-import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.base.common.CommonConfig;
import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.base.common.SourceConfig;
+import com.zdjizhi.base.platform.Schedule;
import com.zdjizhi.base.source.Source;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FlatMapFunction;
+import com.zdjizhi.base.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class DosDetectionEtl {
-
- private static Logger logger = LoggerFactory.getLogger(DosDetectionEtl.class);
- private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
- private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
- private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+import java.util.Properties;
+public class DosDetectionEtl implements Schedule {
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
- return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
+ return flatSketchSource();
}
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
- return Source.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME).flatMap(new FlatSketchLog());
+ SourceConfig sourceConfig = new SourceConfig(
+ CommonConfig.KAFKA_INPUT_TOPIC_NAME,
+ CommonConfig.KAFKA_INPUT_PARALLELISM,
+ getKafkaSourceProperty());
+ return Source.getSingleSource(sourceConfig).flatMap(new FlatSketchLog());
}
- private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
- return WatermarkStrategy
- .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
- .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
+ @Override
+ public void schedule() throws Exception {
+ getSketchSource().print();
}
- private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
- @Override
- public void flatMap(String s, Collector<DosSketchLog> collector) {
- try {
- if (StringUtil.isNotBlank(s)){
- HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
- long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
- long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
- String attackType = sketchSource.get("attack_type").toString();
- ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
- for (HashMap<String, Object> obj : reportIpList) {
- DosSketchLog dosSketchLog = new DosSketchLog();
- dosSketchLog.setSketch_start_time(sketchStartTime);
- dosSketchLog.setSketch_duration(sketchDuration);
- dosSketchLog.setAttack_type(attackType);
- String sourceIp = obj.get("source_ip").toString();
- String destinationIp = obj.get("destination_ip").toString();
- long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
- long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
- long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
- dosSketchLog.setSource_ip(sourceIp);
- dosSketchLog.setDestination_ip(destinationIp);
- dosSketchLog.setSketch_sessions(sketchSessions);
- dosSketchLog.setSketch_packets(sketchPackets);
- dosSketchLog.setSketch_bytes(sketchBytes);
- collector.collect(dosSketchLog);
- logger.debug("Data parsed successfully: {}", dosSketchLog.toString());
- }
- }
- } catch (Exception e) {
- logger.error("Data parse error: {} \n{}", s, e);
- }
+ private static Properties getKafkaSourceProperty() {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
+ properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
+ if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
+ KafkaUtils.updateSasl(properties);
}
+ return properties;
}
+
}
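
getSketchSource now returns the flat-mapped stream without a watermark strategy, leaving event-time handling to each consumer. A hedged sketch of a second consumer (class name and processing are hypothetical) showing how another job can branch off the same cached source that OutputStreamSink uses:

import com.zdjizhi.base.platform.Schedule;
import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
import com.zdjizhi.etl.DosDetectionEtl;
import java.time.Duration;

// Hypothetical second consumer: it calls the same getSketchSource() as
// OutputStreamSink, and because Source caches one Kafka source per topic,
// both jobs split a single DosSketchLog stream rather than consuming twice.
public class SketchStatisticsSink implements Schedule {
    @Override
    public void schedule() throws Exception {
        DosDetectionEtl.getSketchSource()
                .assignTimestampsAndWatermarks(
                        FlinkEnvironmentUtils.createWatermarkStrategy(
                                Duration.ofSeconds(10),
                                (event, timestamp) -> event.getSketch_start_time() * 1000))
                // Placeholder processing; a real job would key, window and sink here.
                .print();
    }
}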
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java b/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java
new file mode 100644
index 0000000..9420df4
--- /dev/null
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java
@@ -0,0 +1,53 @@
+package com.zdjizhi.etl;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
+ private static Logger logger = LoggerFactory.getLogger(FlatSketchLog.class);
+ private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+ private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+ private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+
+ @Override
+ public void flatMap(String s, Collector<DosSketchLog> collector) {
+ try {
+ if (StringUtil.isNotBlank(s)){
+ HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
+ long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
+ long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
+ String attackType = sketchSource.get("attack_type").toString();
+ ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
+ for (HashMap<String, Object> obj : reportIpList) {
+ DosSketchLog dosSketchLog = new DosSketchLog();
+ dosSketchLog.setSketch_start_time(sketchStartTime);
+ dosSketchLog.setSketch_duration(sketchDuration);
+ dosSketchLog.setAttack_type(attackType);
+ String sourceIp = obj.get("source_ip").toString();
+ String destinationIp = obj.get("destination_ip").toString();
+ long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
+ long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
+ long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
+ dosSketchLog.setSource_ip(sourceIp);
+ dosSketchLog.setDestination_ip(destinationIp);
+ dosSketchLog.setSketch_sessions(sketchSessions);
+ dosSketchLog.setSketch_packets(sketchPackets);
+ dosSketchLog.setSketch_bytes(sketchBytes);
+ collector.collect(dosSketchLog);
+ logger.debug("Data parsed successfully: {}", dosSketchLog.toString());
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Data parse error: {} \n{}", s, e);
+ }
+ }
+}
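
FlatSketchLog emits one DosSketchLog per entry of report_ip_list. The field names in the snippet below are taken from the code above, while every value is invented purely to illustrate the input shape the flatMap expects:

// Hypothetical input record (all values are made up for illustration).
String example = "{"
        + "\"sketch_start_time\": 1640764800,"
        + "\"sketch_duration\": 60,"
        + "\"attack_type\": \"syn_flood\","
        + "\"report_ip_list\": [{"
        + "  \"source_ip\": \"10.0.0.1\", \"destination_ip\": \"10.0.0.2\","
        + "  \"sketch_sessions\": 120, \"sketch_packets\": 3000, \"sketch_bytes\": 450000"
        + "}]}";
// new FlatSketchLog().flatMap(example, collector) would then emit one
// DosSketchLog per element of report_ip_list, carrying these fields.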
diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties
index c69e1d4..51c892e 100644
--- a/platform-schedule/src/main/resources/business.properties
+++ b/platform-schedule/src/main/resources/business.properties
@@ -1 +1,2 @@
-dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink
\ No newline at end of file
+dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink
+dos.sketch.etl.class=com.zdjizhi.etl.DosDetectionEtl
\ No newline at end of file