diff options
| author | wanglihui <[email protected]> | 2021-12-28 18:33:40 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-12-28 18:33:40 +0800 |
| commit | b31906b71748a1eafdbfe73731ac0119b2cf7660 (patch) | |
| tree | 72c94aefc94a0f39fbc1bd79fca1d22a726ef2e1 /platform-base | |
| parent | 2cb7dcba4bcd41355b8be53c13a493d8371a64fc (diff) | |
抽取etl module
Diffstat (limited to 'platform-base')
3 files changed, 21 insertions, 80 deletions
diff --git a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java b/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java deleted file mode 100644 index 69dbd45..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.zdjizhi.base.etl; - -import com.fasterxml.jackson.databind.JavaType; -import com.zdjizhi.base.common.DosSketchLog; -import com.zdjizhi.base.source.Source; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; - -public class DosDetectionEtl { - - private static Logger logger = LoggerFactory.getLogger(DosDetectionEtl.class); - private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); - private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class); - private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class); - - - public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){ - return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy()); - } - - private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){ - return Source.createSource().flatMap(new FlatSketchLog()); - } - - private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){ - return WatermarkStrategy - .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(10)) - .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000); - } - - private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> { - @Override - public void flatMap(String s, Collector<DosSketchLog> collector) { - try { - if (StringUtil.isNotBlank(s)){ - HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType); - long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); - long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); - String attackType = sketchSource.get("attack_type").toString(); - ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType); - for (HashMap<String, Object> obj : reportIpList) { - DosSketchLog dosSketchLog = new DosSketchLog(); - dosSketchLog.setSketch_start_time(sketchStartTime); - dosSketchLog.setSketch_duration(sketchDuration); - dosSketchLog.setAttack_type(attackType); - String sourceIp = obj.get("source_ip").toString(); - String destinationIp = obj.get("destination_ip").toString(); - long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString()); - long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString()); - long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString()); - dosSketchLog.setSource_ip(sourceIp); - dosSketchLog.setDestination_ip(destinationIp); - dosSketchLog.setSketch_sessions(sketchSessions); - dosSketchLog.setSketch_packets(sketchPackets); - dosSketchLog.setSketch_bytes(sketchBytes); - collector.collect(dosSketchLog); - logger.debug("数据解析成功:{}",dosSketchLog.toString()); - } - } - } catch (Exception e) { - logger.error("数据解析错误:{} \n{}",s,e); - } - } - } - -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java index 643909f..9182517 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java +++ b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java @@ -6,13 +6,26 @@ import com.zdjizhi.base.utils.KafkaUtils; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import java.util.Properties; + public class Source { private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv; - public static DataStreamSource<String> createSource(){ + public static DataStreamSource<String> createSource(String topic){ + + return createSource(topic,CommonConfig.KAFKA_INPUT_PARALLELISM); + } + + public static DataStreamSource<String> createSource(String topic,int parallelism){ + + return streamExeEnv.addSource(KafkaUtils.createSource(topic)) + .setParallelism(parallelism); + } + + public static DataStreamSource<String> createSource(String topic, int parallelism, Properties properties){ - return streamExeEnv.addSource(KafkaUtils.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME)) - .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM); + return streamExeEnv.addSource(KafkaUtils.createSource(topic,properties)) + .setParallelism(parallelism); } } diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java index fa4b2f0..b7173ef 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java +++ b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java @@ -43,10 +43,14 @@ public class KafkaUtils { } public static FlinkKafkaConsumer<String> createSource(String topic){ + return createSource(topic,getKafkaSourceProperty()); + } + + public static FlinkKafkaConsumer<String> createSource(String topic,Properties properties){ return new FlinkKafkaConsumer<String>( topic, new SimpleStringSchema(), - getKafkaSourceProperty()); + properties); } } |
