author    wanglihui <[email protected]>  2021-12-29 16:29:37 +0800
committer wanglihui <[email protected]>  2021-12-29 16:29:37 +0800
commit    5f24f3403436663c857e47880c4b211e720b07b1
tree      4a875c7007979f87864c91e9473ce70579749b7c
parent    b31906b71748a1eafdbfe73731ac0119b2cf7660
Fix a bug where failure to obtain the singleton data source made it impossible to split the stream
8 files changed, 167 insertions, 88 deletions
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
index d9d13b3..5e24b98 100644
--- a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
@@ -1,8 +1,11 @@
 package com.zdjizhi.dos.sink;
 
 import com.zdjizhi.base.platform.Schedule;
+import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
 import com.zdjizhi.etl.DosDetectionEtl;
 
+import java.time.Duration;
+
 /**
  * @author 94976
  */
@@ -10,6 +13,10 @@ public class OutputStreamSink implements Schedule {
 
     @Override
     public void schedule() throws Exception {
-        DosDetectionEtl.getSketchSource().print();
+        DosDetectionEtl.getSketchSource().assignTimestampsAndWatermarks(
+                FlinkEnvironmentUtils.createWatermarkStrategy(
+                        Duration.ofSeconds(10),
+                        (event, timestamp) -> event.getSketch_start_time() * 1000))
+                .print();
     }
 }
diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/SourceConfig.java b/platform-base/src/main/java/com/zdjizhi/base/common/SourceConfig.java
new file mode 100644
index 0000000..fdf54c4
--- /dev/null
+++ b/platform-base/src/main/java/com/zdjizhi/base/common/SourceConfig.java
@@ -0,0 +1,42 @@
+package com.zdjizhi.base.common;
+
+import java.util.Properties;
+
+public class SourceConfig {
+
+    private String topic;
+    private int parallelism;
+    private Properties properties;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getParallelism() {
+        return parallelism;
+    }
+
+    public void setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    private SourceConfig(){}
+
+    public SourceConfig(String topic, int parallelism, Properties properties) {
+        this.topic = topic;
+        this.parallelism = parallelism;
+        this.properties = properties;
+    }
+}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
index 9182517..a658927 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
@@ -1,31 +1,43 @@
 package com.zdjizhi.base.source;
 
-import com.zdjizhi.base.common.CommonConfig;
+import com.zdjizhi.base.common.SourceConfig;
 import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
 import com.zdjizhi.base.utils.KafkaUtils;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-import java.util.Properties;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class Source {
 
     private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
+    private static Map<String, DataStreamSource<String>> sourceHashMap = new ConcurrentHashMap<>();
 
-    public static DataStreamSource<String> createSource(String topic){
-
-        return createSource(topic,CommonConfig.KAFKA_INPUT_PARALLELISM);
+    public static DataStreamSource<String> getSingleSource(SourceConfig sourceConfig) {
+        return createSingleSource(sourceConfig);
     }
 
-    public static DataStreamSource<String> createSource(String topic,int parallelism){
-
-        return streamExeEnv.addSource(KafkaUtils.createSource(topic))
-                .setParallelism(parallelism);
+    private static DataStreamSource<String> createSource(SourceConfig sourceConfig) {
+        return streamExeEnv.addSource(KafkaUtils.createSource(sourceConfig.getTopic(), sourceConfig.getProperties()))
+                .setParallelism(sourceConfig.getParallelism());
     }
 
-    public static DataStreamSource<String> createSource(String topic, int parallelism, Properties properties){
+    private synchronized static DataStreamSource<String> createSingleSource(SourceConfig sourceConfig) {
+        if (checkConfigNotNull(sourceConfig)) {
+            if (sourceHashMap.containsKey(sourceConfig.getTopic())) {
+                return sourceHashMap.get(sourceConfig.getTopic());
+            } else {
+                DataStreamSource<String> streamSource = createSource(sourceConfig);
+                sourceHashMap.put(sourceConfig.getTopic(), streamSource);
+                return streamSource;
+            }
+        } else {
+            throw new IllegalArgumentException("SourceConfig is empty");
+        }
+    }
 
-        return streamExeEnv.addSource(KafkaUtils.createSource(topic,properties))
-                .setParallelism(parallelism);
+    private static boolean checkConfigNotNull(SourceConfig sourceConfig) {
+        return sourceConfig != null && sourceConfig.getTopic() != null && !sourceConfig.getProperties().isEmpty();
     }
 }
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
index d15d81e..4344613 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
@@ -1,8 +1,12 @@
 package com.zdjizhi.base.utils;
 
 import com.zdjizhi.base.common.CommonConfig;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import java.time.Duration;
+
 
 /**
  * @author wlh
@@ -14,4 +18,11 @@ public class FlinkEnvironmentUtils {
         streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
     }
 
+    public static <T> WatermarkStrategy<T> createWatermarkStrategy(Duration maxOutOfOrderness, SerializableTimestampAssigner<T> timestampAssigner) {
+        return WatermarkStrategy
+                .<T>forBoundedOutOfOrderness(maxOutOfOrderness)
+                .withTimestampAssigner(timestampAssigner);
+    }
+
+
 }
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
index b7173ef..9fbe1be 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
@@ -17,20 +17,11 @@ public class KafkaUtils {
         return properties;
     }
 
-    private static Properties getKafkaSourceProperty() {
-        Properties properties = new Properties();
-        properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
-        properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
-        updateSasl(properties);
-        return properties;
-    }
+    public static void updateSasl(Properties properties) {
+        properties.put("security.protocol", "SASL_PLAINTEXT");
+        properties.put("sasl.mechanism", "PLAIN");
+        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + CommonConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + CommonConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
 
-    private static void updateSasl(Properties properties) {
-        if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1) {
-            properties.put("security.protocol", "SASL_PLAINTEXT");
-            properties.put("sasl.mechanism", "PLAIN");
-            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + CommonConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + CommonConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
-        }
     }
 
     public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
@@ -42,11 +33,7 @@ public class KafkaUtils {
         );
     }
 
-    public static FlinkKafkaConsumer<String> createSource(String topic){
-        return createSource(topic,getKafkaSourceProperty());
-    }
-
-    public static FlinkKafkaConsumer<String> createSource(String topic,Properties properties){
+    public static FlinkKafkaConsumer<String> createSource(String topic, Properties properties) {
         return new FlinkKafkaConsumer<String>(
                 topic,
                 new SimpleStringSchema(),
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
index 5c5524b..8253c92 100644
--- a/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
@@ -1,77 +1,43 @@
 package com.zdjizhi.etl;
 
-import com.fasterxml.jackson.databind.JavaType;
 import com.zdjizhi.base.common.CommonConfig;
 import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.base.common.SourceConfig;
+import com.zdjizhi.base.platform.Schedule;
 import com.zdjizhi.base.source.Source;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FlatMapFunction;
+import com.zdjizhi.base.utils.KafkaUtils;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class DosDetectionEtl {
-
-    private static Logger logger = LoggerFactory.getLogger(DosDetectionEtl.class);
-    private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
-    private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
-    private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+import java.util.Properties;
 
+public class DosDetectionEtl implements Schedule {
 
     public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
-        return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
+        return flatSketchSource();
     }
 
     private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
-        return Source.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME).flatMap(new FlatSketchLog());
+        SourceConfig sourceConfig = new SourceConfig(
+                CommonConfig.KAFKA_INPUT_TOPIC_NAME,
+                CommonConfig.KAFKA_INPUT_PARALLELISM,
+                getKafkaSourceProperty());
+        return Source.getSingleSource(sourceConfig).flatMap(new FlatSketchLog());
     }
 
-    private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
-        return WatermarkStrategy
-                .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
-                .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
+    @Override
+    public void schedule() throws Exception {
+        getSketchSource().print();
     }
 
-    private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
-        @Override
-        public void flatMap(String s, Collector<DosSketchLog> collector) {
-            try {
-                if (StringUtil.isNotBlank(s)){
-                    HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
-                    long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
-                    long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
-                    String attackType = sketchSource.get("attack_type").toString();
-                    ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
-                    for (HashMap<String, Object> obj : reportIpList) {
-                        DosSketchLog dosSketchLog = new DosSketchLog();
-                        dosSketchLog.setSketch_start_time(sketchStartTime);
-                        dosSketchLog.setSketch_duration(sketchDuration);
-                        dosSketchLog.setAttack_type(attackType);
-                        String sourceIp = obj.get("source_ip").toString();
-                        String destinationIp = obj.get("destination_ip").toString();
-                        long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
-                        long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
-                        long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
-                        dosSketchLog.setSource_ip(sourceIp);
-                        dosSketchLog.setDestination_ip(destinationIp);
-                        dosSketchLog.setSketch_sessions(sketchSessions);
-                        dosSketchLog.setSketch_packets(sketchPackets);
-                        dosSketchLog.setSketch_bytes(sketchBytes);
-                        collector.collect(dosSketchLog);
-                        logger.debug("Data parsed successfully: {}", dosSketchLog.toString());
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("Failed to parse data: {} \n{}", s, e);
-            }
+    private static Properties getKafkaSourceProperty() {
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
+        properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
+        if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1) {
+            KafkaUtils.updateSasl(properties);
         }
+        return properties;
     }
+
 }
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java b/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java
new file mode 100644
index 0000000..9420df4
--- /dev/null
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java
@@ -0,0 +1,53 @@
+package com.zdjizhi.etl;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
+    private static Logger logger = LoggerFactory.getLogger(FlatSketchLog.class);
+    private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+    private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+    private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+
+    @Override
+    public void flatMap(String s, Collector<DosSketchLog> collector) {
+        try {
+            if (StringUtil.isNotBlank(s)){
+                HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
+                long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
+                long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
+                String attackType = sketchSource.get("attack_type").toString();
+                ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
+                for (HashMap<String, Object> obj : reportIpList) {
+                    DosSketchLog dosSketchLog = new DosSketchLog();
+                    dosSketchLog.setSketch_start_time(sketchStartTime);
+                    dosSketchLog.setSketch_duration(sketchDuration);
+                    dosSketchLog.setAttack_type(attackType);
+                    String sourceIp = obj.get("source_ip").toString();
+                    String destinationIp = obj.get("destination_ip").toString();
+                    long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
+                    long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
+                    long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
+                    dosSketchLog.setSource_ip(sourceIp);
+                    dosSketchLog.setDestination_ip(destinationIp);
+                    dosSketchLog.setSketch_sessions(sketchSessions);
+                    dosSketchLog.setSketch_packets(sketchPackets);
+                    dosSketchLog.setSketch_bytes(sketchBytes);
+                    collector.collect(dosSketchLog);
+                    logger.debug("Data parsed successfully: {}", dosSketchLog.toString());
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Failed to parse data: {} \n{}", s, e);
+        }
+    }
+}
diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties
index c69e1d4..51c892e 100644
--- a/platform-schedule/src/main/resources/business.properties
+++ b/platform-schedule/src/main/resources/business.properties
@@ -1 +1,2 @@
-dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink
\ No newline at end of file
+dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink
+dos.sketch.etl.class=com.zdjizhi.etl.DosDetectionEtl
\ No newline at end of file
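
For reference, a minimal sketch of how a job obtains the shared Kafka source through the new API introduced by this commit. It assumes the classes above are on the classpath; the broker address, group id, topic, and parallelism below are placeholder values, not the repository's real configuration.

```java
import com.zdjizhi.base.common.SourceConfig;
import com.zdjizhi.base.source.Source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

import java.util.Properties;

public class SingleSourceSketch {

    public static void main(String[] args) {
        // Placeholder connection settings; real jobs read these from CommonConfig.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");

        // Topic and parallelism are likewise placeholders.
        SourceConfig config = new SourceConfig("demo-topic", 1, props);

        // getSingleSource() delegates to the synchronized createSingleSource(),
        // which caches one DataStreamSource per topic in a ConcurrentHashMap.
        // A second call for the same topic returns the cached source instead of
        // registering another Kafka consumer on the execution environment.
        DataStreamSource<String> first = Source.getSingleSource(config);
        DataStreamSource<String> second = Source.getSingleSource(config);
        System.out.println(first == second); // true: the second lookup hit the cache
    }
}
```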

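The watermark logic removed from DosDetectionEtl survives as the generic helper in FlinkEnvironmentUtils. Below is a minimal sketch of the strategy that helper builds, using a hypothetical stand-in event type (SketchEvent), since DosSketchLog's definition is not part of this commit; the `* 1000` reflects that sketch_start_time is carried in epoch seconds while Flink expects millisecond timestamps.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;

public class WatermarkSketch {

    // Hypothetical stand-in for DosSketchLog; only the timestamp getter matters here.
    static class SketchEvent {
        private final long sketchStartTime; // epoch seconds
        SketchEvent(long sketchStartTime) { this.sketchStartTime = sketchStartTime; }
        long getSketchStartTime() { return sketchStartTime; }
    }

    public static void main(String[] args) {
        // Equivalent to FlinkEnvironmentUtils.createWatermarkStrategy(
        //         Duration.ofSeconds(10), (event, ts) -> event.getSketchStartTime() * 1000):
        // tolerate events up to 10 seconds out of order, and read event time from
        // the record itself, converting epoch seconds to milliseconds.
        WatermarkStrategy<SketchEvent> strategy = WatermarkStrategy
                .<SketchEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, recordTs) -> event.getSketchStartTime() * 1000);
    }
}
```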