author    wanglihui <[email protected]>  2021-12-28 18:33:40 +0800
committer wanglihui <[email protected]>  2021-12-28 18:33:40 +0800
commit    b31906b71748a1eafdbfe73731ac0119b2cf7660 (patch)
tree      72c94aefc94a0f39fbc1bd79fca1d22a726ef2e1 /platform-base
parent    2cb7dcba4bcd41355b8be53c13a493d8371a64fc (diff)
Extract etl module
Diffstat (limited to 'platform-base')
-rw-r--r--  platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java  76
-rw-r--r--  platform-base/src/main/java/com/zdjizhi/base/source/Source.java        19
-rw-r--r--  platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java      6
3 files changed, 21 insertions, 80 deletions
diff --git a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java b/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java
deleted file mode 100644
index 69dbd45..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.zdjizhi.base.etl;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.zdjizhi.base.common.DosSketchLog;
-import com.zdjizhi.base.source.Source;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class DosDetectionEtl {
-
- private static Logger logger = LoggerFactory.getLogger(DosDetectionEtl.class);
- private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
- private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
- private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
-
-
- public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
- return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
- }
-
- private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
- return Source.createSource().flatMap(new FlatSketchLog());
- }
-
- private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
- return WatermarkStrategy
- .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
- .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
- }
-
- private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
- @Override
- public void flatMap(String s, Collector<DosSketchLog> collector) {
- try {
- if (StringUtil.isNotBlank(s)){
- HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
- long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
- long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
- String attackType = sketchSource.get("attack_type").toString();
- ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
- for (HashMap<String, Object> obj : reportIpList) {
- DosSketchLog dosSketchLog = new DosSketchLog();
- dosSketchLog.setSketch_start_time(sketchStartTime);
- dosSketchLog.setSketch_duration(sketchDuration);
- dosSketchLog.setAttack_type(attackType);
- String sourceIp = obj.get("source_ip").toString();
- String destinationIp = obj.get("destination_ip").toString();
- long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
- long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
- long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
- dosSketchLog.setSource_ip(sourceIp);
- dosSketchLog.setDestination_ip(destinationIp);
- dosSketchLog.setSketch_sessions(sketchSessions);
- dosSketchLog.setSketch_packets(sketchPackets);
- dosSketchLog.setSketch_bytes(sketchBytes);
- collector.collect(dosSketchLog);
- logger.debug("Record parsed successfully: {}",dosSketchLog.toString());
- }
- }
- } catch (Exception e) {
- logger.error("Failed to parse record: {} \n{}",s,e);
- }
- }
- }
-
-}
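Note: the extracted etl module that replaces this file is not included in this commit. Purely as a sketch, assuming the job is recreated in the new module with the same class and field names, keeps the FlatSketchLog function deleted above, and feeds the topic through the parameterized Source.createSource(String topic) introduced below, the wiring might look like this (the class name DosDetectionEtlJob, the package, and the topic parameter are assumptions):

    // Hypothetical sketch only -- not part of this commit.
    package com.zdjizhi.etl;                                   // assumed package of the new etl module

    import com.zdjizhi.base.common.DosSketchLog;
    import com.zdjizhi.base.source.Source;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

    import java.time.Duration;

    public class DosDetectionEtlJob {

        public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(String topic) {
            // The Kafka topic is now supplied by the caller instead of being read from CommonConfig inside Source.
            return Source.createSource(topic)
                    // FlatSketchLog is the JSON-flattening FlatMapFunction deleted above, assumed to move with the job.
                    .flatMap(new FlatSketchLog())
                    // Same 10-second bounded-out-of-orderness watermark as the deleted class.
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                    .withTimestampAssigner((event, ts) -> event.getSketch_start_time() * 1000));
        }
    }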
diff --git a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
index 643909f..9182517 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
@@ -6,13 +6,26 @@ import com.zdjizhi.base.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import java.util.Properties;
+
public class Source {
private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
- public static DataStreamSource<String> createSource(){
- return streamExeEnv.addSource(KafkaUtils.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME))
- .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM);
+ public static DataStreamSource<String> createSource(String topic){
+
+ return createSource(topic,CommonConfig.KAFKA_INPUT_PARALLELISM);
+ }
+
+ public static DataStreamSource<String> createSource(String topic,int parallelism){
+
+ return streamExeEnv.addSource(KafkaUtils.createSource(topic))
+ .setParallelism(parallelism);
+ }
+
+ public static DataStreamSource<String> createSource(String topic, int parallelism, Properties properties){
+ return streamExeEnv.addSource(KafkaUtils.createSource(topic,properties))
+ .setParallelism(parallelism);
}
}
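A minimal usage sketch of the new Source overloads, assuming CommonConfig still exposes the KAFKA_INPUT_TOPIC_NAME and KAFKA_INPUT_PARALLELISM constants referenced in the removed lines; the literal parallelism value is a placeholder:

    // Fragment only: equivalent of the old hard-coded behaviour, now spelled out by the caller.
    // Parallelism falls back to CommonConfig.KAFKA_INPUT_PARALLELISM inside the one-argument overload.
    DataStreamSource<String> defaultSource = Source.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME);

    // Per-job parallelism without touching CommonConfig.
    DataStreamSource<String> tunedSource = Source.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME, 4);

The third overload additionally takes custom consumer Properties; a sketch follows the KafkaUtils hunk below.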
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
index fa4b2f0..b7173ef 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
@@ -43,10 +43,14 @@ public class KafkaUtils {
}
public static FlinkKafkaConsumer<String> createSource(String topic){
+ return createSource(topic,getKafkaSourceProperty());
+ }
+
+ public static FlinkKafkaConsumer<String> createSource(String topic,Properties properties){
return new FlinkKafkaConsumer<String>(
topic,
new SimpleStringSchema(),
- getKafkaSourceProperty());
+ properties);
}
}
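The new Properties overload passes the caller's Properties straight to FlinkKafkaConsumer instead of the defaults from getKafkaSourceProperty(), so the caller has to supply a complete consumer configuration. A rough sketch in which the broker address, consumer group, topic name and parallelism are all placeholders:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka-1:9092");      // placeholder broker address
    props.setProperty("group.id", "dos-detection-replay");       // placeholder consumer group

    // Build the consumer directly so it can be tuned before being attached to the environment.
    FlinkKafkaConsumer<String> consumer = KafkaUtils.createSource("dos_sketch_log", props);  // placeholder topic
    consumer.setStartFromEarliest();                              // e.g. replay the topic from the beginning

    FlinkEnvironmentUtils.streamExeEnv
            .addSource(consumer)
            .setParallelism(2);                                   // placeholder parallelism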