summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java4
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java4
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java43
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/DosSketchEtl.java92
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java53
-rw-r--r--platform-schedule/src/main/resources/business.properties2
6 files changed, 97 insertions, 101 deletions
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java
index e9edb1b..0ddf8be 100644
--- a/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java
@@ -5,7 +5,7 @@ import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
import com.zdjizhi.dos.EtlProcessFunction;
import com.zdjizhi.dos.KeysSelector;
import com.zdjizhi.dos.common.CommonConfig;
-import com.zdjizhi.etl.DosDetectionEtl;
+import com.zdjizhi.etl.DosSketchEtl;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -15,7 +15,7 @@ public class DosMetricsSink implements Schedule {
@Override
public void schedule() throws Exception {
- DosDetectionEtl.getSketchSource().assignTimestampsAndWatermarks(
+ DosSketchEtl.getSketchSource().assignTimestampsAndWatermarks(
FlinkEnvironmentUtils.createWatermarkStrategy(
Duration.ofSeconds(10),
(event, timestamp) -> event.getSketch_start_time() * 1000))
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
index 5e24b98..b688463 100644
--- a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
@@ -2,7 +2,7 @@ package com.zdjizhi.dos.sink;
import com.zdjizhi.base.platform.Schedule;
import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
-import com.zdjizhi.etl.DosDetectionEtl;
+import com.zdjizhi.etl.DosSketchEtl;
import java.time.Duration;
@@ -13,7 +13,7 @@ public class OutputStreamSink implements Schedule {
@Override
public void schedule() throws Exception {
- DosDetectionEtl.getSketchSource().assignTimestampsAndWatermarks(
+ DosSketchEtl.getSketchSource().assignTimestampsAndWatermarks(
FlinkEnvironmentUtils.createWatermarkStrategy(
Duration.ofSeconds(10),
(event, timestamp) -> event.getSketch_start_time() * 1000))
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
deleted file mode 100644
index 8253c92..0000000
--- a/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.zdjizhi.etl;
-
-import com.zdjizhi.base.common.CommonConfig;
-import com.zdjizhi.base.common.DosSketchLog;
-import com.zdjizhi.base.common.SourceConfig;
-import com.zdjizhi.base.platform.Schedule;
-import com.zdjizhi.base.source.Source;
-import com.zdjizhi.base.utils.KafkaUtils;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
-import java.util.Properties;
-
-public class DosDetectionEtl implements Schedule {
-
- public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
- return flatSketchSource();
- }
-
- private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
- SourceConfig sourceConfig = new SourceConfig(
- CommonConfig.KAFKA_INPUT_TOPIC_NAME,
- CommonConfig.KAFKA_INPUT_PARALLELISM,
- getKafkaSourceProperty());
- return Source.getSingleSource(sourceConfig).flatMap(new FlatSketchLog());
- }
-
- @Override
- public void schedule() throws Exception {
- getSketchSource().print();
- }
-
- private static Properties getKafkaSourceProperty() {
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
- properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
- if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
- KafkaUtils.updateSasl(properties);
- }
- return properties;
- }
-
-
-}
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/DosSketchEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/DosSketchEtl.java
new file mode 100644
index 0000000..1460017
--- /dev/null
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/DosSketchEtl.java
@@ -0,0 +1,92 @@
+package com.zdjizhi.etl;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.zdjizhi.base.common.CommonConfig;
+import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.base.common.SourceConfig;
+import com.zdjizhi.base.platform.Schedule;
+import com.zdjizhi.base.source.Source;
+import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Properties;
+
+public class DosSketchEtl implements Schedule {
+
+ public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
+ return flatSketchSource();
+ }
+
+ private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
+ SourceConfig sourceConfig = new SourceConfig(
+ CommonConfig.KAFKA_INPUT_TOPIC_NAME,
+ CommonConfig.KAFKA_INPUT_PARALLELISM,
+ getKafkaSourceProperty());
+ return Source.getSingleSource(sourceConfig).flatMap(new FlatSketchLog());
+ }
+
+ @Override
+ public void schedule() throws Exception {
+ getSketchSource().print();
+ }
+
+ private static Properties getKafkaSourceProperty() {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
+ properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
+ if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
+ KafkaUtils.updateSasl(properties);
+ }
+ return properties;
+ }
+
+ static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
+ private static Logger logger = LoggerFactory.getLogger(DosSketchEtl.class);
+ private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+ private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+ private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+
+ @Override
+ public void flatMap(String s, Collector<DosSketchLog> collector) {
+ try {
+ if (StringUtil.isNotBlank(s)){
+ HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
+ long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
+ long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
+ String attackType = sketchSource.get("attack_type").toString();
+ ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
+ for (HashMap<String, Object> obj : reportIpList) {
+ DosSketchLog dosSketchLog = new DosSketchLog();
+ dosSketchLog.setSketch_start_time(sketchStartTime);
+ dosSketchLog.setSketch_duration(sketchDuration);
+ dosSketchLog.setAttack_type(attackType);
+ String sourceIp = obj.get("source_ip").toString();
+ String destinationIp = obj.get("destination_ip").toString();
+ long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
+ long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
+ long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
+ dosSketchLog.setSource_ip(sourceIp);
+ dosSketchLog.setDestination_ip(destinationIp);
+ dosSketchLog.setSketch_sessions(sketchSessions);
+ dosSketchLog.setSketch_packets(sketchPackets);
+ dosSketchLog.setSketch_bytes(sketchBytes);
+ collector.collect(dosSketchLog);
+ logger.debug("数据解析成功:{}",dosSketchLog.toString());
+ }
+ }
+ } catch (Exception e) {
+ logger.error("数据解析错误:{} \n{}",s,e);
+ }
+ }
+ }
+
+
+}
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java b/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java
deleted file mode 100644
index 9420df4..0000000
--- a/platform-etl/src/main/java/com/zdjizhi/etl/FlatSketchLog.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.zdjizhi.etl;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.zdjizhi.base.common.DosSketchLog;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
- private static Logger logger = LoggerFactory.getLogger(DosDetectionEtl.class);
- private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
- private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
- private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
-
- @Override
- public void flatMap(String s, Collector<DosSketchLog> collector) {
- try {
- if (StringUtil.isNotBlank(s)){
- HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
- long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
- long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
- String attackType = sketchSource.get("attack_type").toString();
- ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
- for (HashMap<String, Object> obj : reportIpList) {
- DosSketchLog dosSketchLog = new DosSketchLog();
- dosSketchLog.setSketch_start_time(sketchStartTime);
- dosSketchLog.setSketch_duration(sketchDuration);
- dosSketchLog.setAttack_type(attackType);
- String sourceIp = obj.get("source_ip").toString();
- String destinationIp = obj.get("destination_ip").toString();
- long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
- long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
- long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
- dosSketchLog.setSource_ip(sourceIp);
- dosSketchLog.setDestination_ip(destinationIp);
- dosSketchLog.setSketch_sessions(sketchSessions);
- dosSketchLog.setSketch_packets(sketchPackets);
- dosSketchLog.setSketch_bytes(sketchBytes);
- collector.collect(dosSketchLog);
- logger.debug("数据解析成功:{}",dosSketchLog.toString());
- }
- }
- } catch (Exception e) {
- logger.error("数据解析错误:{} \n{}",s,e);
- }
- }
-}
diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties
index 0e442fb..7a52c77 100644
--- a/platform-schedule/src/main/resources/business.properties
+++ b/platform-schedule/src/main/resources/business.properties
@@ -1,3 +1,3 @@
dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink
dos.detection.metric.class=com.zdjizhi.dos.sink.DosMetricsSink
-dos.sketch.etl.class=com.zdjizhi.etl.DosDetectionEtl \ No newline at end of file
+dos.sketch.etl.class=com.zdjizhi.etl.DosSketchEtl \ No newline at end of file