summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-12-30 14:13:46 +0800
committerwanglihui <[email protected]>2021-12-30 14:13:46 +0800
commit3f98dfac48ee129cab182fc08baa77e531cecc0e (patch)
treea9123b0427f09f47d625af345c4a8941eda72b09
parent5f24f3403436663c857e47880c4b211e720b07b1 (diff)
新增dos检测etl与metrics输出
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/EtlProcessFunction.java97
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/KeysSelector.java16
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/TrafficServerIpMetrics.java41
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/common/DosMetricsLog.java107
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java30
-rw-r--r--platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java22
-rw-r--r--platform-schedule/src/main/resources/business.properties1
7 files changed, 305 insertions, 9 deletions
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/EtlProcessFunction.java b/dos-detection/src/main/java/com/zdjizhi/dos/EtlProcessFunction.java
new file mode 100644
index 0000000..c2e56da
--- /dev/null
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/EtlProcessFunction.java
@@ -0,0 +1,97 @@
+package com.zdjizhi.dos;
+
+import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.dos.common.CommonConfig;
+import com.zdjizhi.dos.common.DosMetricsLog;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+
+public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
+
+ private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
+ private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
+ private static final String EMPTY_SOURCE_IP_IPV6 = "::";
+ private static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
+
+ @Override
+ public void process(Tuple2<String, String> keys,
+ Context context, Iterable<DosSketchLog> elements,
+ Collector<DosSketchLog> out) {
+ DosSketchLog middleResult = getMiddleResult(keys, elements);
+ try {
+ if (middleResult != null){
+ out.collect(middleResult);
+ logger.debug("获取中间聚合结果:{}",middleResult.toString());
+ context.output(outputTag,TrafficServerIpMetrics.getOutputMetric(middleResult));
+ }
+ }catch (Exception e){
+ logger.error("获取中间聚合结果失败,middleResult: {}\n{}",middleResult.toString(),e);
+ }
+ }
+
+ private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
+
+ DosSketchLog midResuleLog = new DosSketchLog();
+ Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
+ try {
+ if (values != null){
+ midResuleLog.setAttack_type(keys.f0);
+ midResuleLog.setDestination_ip(keys.f1);
+ midResuleLog.setSketch_start_time(values.f4);
+ midResuleLog.setSketch_duration(values.f5);
+ midResuleLog.setSource_ip(values.f3);
+ midResuleLog.setSketch_sessions(values.f0);
+ midResuleLog.setSketch_packets(values.f1);
+ midResuleLog.setSketch_bytes(values.f2);
+ return midResuleLog;
+ }
+ } catch (Exception e){
+ logger.error("加载中间结果集失败,keys: {} values: {}\n{}",keys,values,e);
+ }
+ return null;
+ }
+
+ private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
+ long sessions = 0;
+ long packets = 0 ;
+ long bytes = 0;
+ long startTime = System.currentTimeMillis()/1000;
+ long endTime = System.currentTimeMillis()/1000;
+ long duration = 0;
+ HashSet<String> sourceIpSet = new HashSet<>();
+ try {
+ for (DosSketchLog newSketchLog : elements){
+ String sourceIp = newSketchLog.getSource_ip();
+ if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){
+ sessions += newSketchLog.getSketch_sessions();
+ packets += newSketchLog.getSketch_packets();
+ bytes += newSketchLog.getSketch_bytes();
+ startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time();
+ endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
+ duration = endTime - startTime == 0 ? 5 : endTime - startTime;
+ }else {
+ if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){
+ sourceIpSet.add(sourceIp);
+ }
+ }
+ }
+ String sourceIpList = StringUtils.join(sourceIpSet, ",");
+ return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
+ bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
+ }catch (Exception e){
+ logger.error("聚合中间结果集失败 {}",e);
+ }
+ return null;
+ }
+
+
+}
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/KeysSelector.java b/dos-detection/src/main/java/com/zdjizhi/dos/KeysSelector.java
new file mode 100644
index 0000000..fcddd88
--- /dev/null
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/KeysSelector.java
@@ -0,0 +1,16 @@
+package com.zdjizhi.dos;
+
+import com.zdjizhi.base.common.DosSketchLog;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+public class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>> {
+
+ @Override
+ public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
+ return Tuple2.of(
+ dosSketchLog.getAttack_type(),
+ dosSketchLog.getDestination_ip());
+ }
+
+}
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/TrafficServerIpMetrics.java b/dos-detection/src/main/java/com/zdjizhi/dos/TrafficServerIpMetrics.java
new file mode 100644
index 0000000..a16480e
--- /dev/null
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/TrafficServerIpMetrics.java
@@ -0,0 +1,41 @@
+package com.zdjizhi.dos;
+
+
+import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.dos.common.CommonConfig;
+import com.zdjizhi.dos.common.DosMetricsLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class TrafficServerIpMetrics {
+
+ private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class);
+
+ static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
+ DosMetricsLog dosMetricsLog = new DosMetricsLog();
+ dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000));
+ dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
+ dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
+ dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
+ dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
+ dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
+ dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
+ logger.debug("metric 结果已加载:{}",dosMetricsLog.toString());
+ return dosMetricsLog;
+ }
+
+ private static long timeFloor(long sketchStartTime){
+ return sketchStartTime / CommonConfig.FLINK_WINDOW_MAX_TIME * CommonConfig.FLINK_WINDOW_MAX_TIME;
+ }
+
+ private static int getPartitionNumByIp(String destinationIp){
+ return Math.abs(destinationIp.hashCode()) % CommonConfig.DESTINATION_IP_PARTITION_NUM;
+ }
+
+ public static void main(String[] args) {
+ System.out.println(getPartitionNumByIp("146.177.223.43"));
+ System.out.println("146.177.223.43".hashCode());
+ }
+
+}
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/common/DosMetricsLog.java b/dos-detection/src/main/java/com/zdjizhi/dos/common/DosMetricsLog.java
new file mode 100644
index 0000000..b66ff9f
--- /dev/null
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/common/DosMetricsLog.java
@@ -0,0 +1,107 @@
+package com.zdjizhi.dos.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class DosMetricsLog implements Serializable {
+
+ private long sketch_start_time;
+ private String attack_type;
+ private String destination_ip;
+ private long session_rate;
+ private long packet_rate;
+ private long bit_rate;
+ private int partition_num;
+
+ public int getPartition_num() {
+ return partition_num;
+ }
+
+ public void setPartition_num(int partition_num) {
+ this.partition_num = partition_num;
+ }
+
+ public long getSketch_start_time() {
+ return sketch_start_time;
+ }
+
+ public void setSketch_start_time(long sketch_start_time) {
+ this.sketch_start_time = sketch_start_time;
+ }
+
+ public String getAttack_type() {
+ return attack_type;
+ }
+
+ public void setAttack_type(String attack_type) {
+ this.attack_type = attack_type;
+ }
+
+ public String getDestination_ip() {
+ return destination_ip;
+ }
+
+ public void setDestination_ip(String destination_ip) {
+ this.destination_ip = destination_ip;
+ }
+
+ public long getSession_rate() {
+ return session_rate;
+ }
+
+ public void setSession_rate(long session_rate) {
+ this.session_rate = session_rate;
+ }
+
+ public long getPacket_rate() {
+ return packet_rate;
+ }
+
+ public void setPacket_rate(long packet_rate) {
+ this.packet_rate = packet_rate;
+ }
+
+ public long getBit_rate() {
+ return bit_rate;
+ }
+
+ public void setBit_rate(long bit_rate) {
+ this.bit_rate = bit_rate;
+ }
+
+ @Override
+ public String toString() {
+ return "DosMetricsLog{" +
+ "sketch_start_time=" + sketch_start_time +
+ ", attack_type='" + attack_type + '\'' +
+ ", destination_ip='" + destination_ip + '\'' +
+ ", session_rate=" + session_rate +
+ ", packet_rate=" + packet_rate +
+ ", bit_rate=" + bit_rate +
+ ", partition_num=" + partition_num +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DosMetricsLog)) {
+ return false;
+ }
+ DosMetricsLog that = (DosMetricsLog) o;
+ return getSketch_start_time() == that.getSketch_start_time() &&
+ getSession_rate() == that.getSession_rate() &&
+ getPacket_rate() == that.getPacket_rate() &&
+ getBit_rate() == that.getBit_rate() &&
+ getPartition_num() == that.getPartition_num() &&
+ Objects.equals(getAttack_type(), that.getAttack_type()) &&
+ Objects.equals(getDestination_ip(), that.getDestination_ip());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getSketch_start_time(), getAttack_type(), getDestination_ip(), getSession_rate(), getPacket_rate(), getBit_rate(), getPartition_num());
+ }
+}
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java
new file mode 100644
index 0000000..e9edb1b
--- /dev/null
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java
@@ -0,0 +1,30 @@
+package com.zdjizhi.dos.sink;
+
+import com.zdjizhi.base.platform.Schedule;
+import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
+import com.zdjizhi.dos.EtlProcessFunction;
+import com.zdjizhi.dos.KeysSelector;
+import com.zdjizhi.dos.common.CommonConfig;
+import com.zdjizhi.etl.DosDetectionEtl;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.time.Duration;
+
+public class DosMetricsSink implements Schedule {
+
+ @Override
+ public void schedule() throws Exception {
+ DosDetectionEtl.getSketchSource().assignTimestampsAndWatermarks(
+ FlinkEnvironmentUtils.createWatermarkStrategy(
+ Duration.ofSeconds(10),
+ (event, timestamp) -> event.getSketch_start_time() * 1000))
+ .keyBy(new KeysSelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
+ .process(new EtlProcessFunction())
+ .print()
+ .setParallelism(1);
+ }
+
+
+}
diff --git a/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java b/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java
index be6ab38..943942f 100644
--- a/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java
+++ b/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java
@@ -13,16 +13,20 @@ public class Execute {
}
private static void execute() throws Exception {
- Properties propService = new Properties();
- propService.load(Execute.class.getClassLoader().getResourceAsStream("business.properties"));
- if (!propService.isEmpty()){
- for (Object key : propService.keySet()) {
- String className = propService.getProperty(key.toString());
- Class cls = Class.forName(className);
- Schedule schedule = (Schedule) cls.newInstance();
- schedule.schedule();
+ try {
+ Properties propService = new Properties();
+ propService.load(Execute.class.getClassLoader().getResourceAsStream("business.properties"));
+ if (!propService.isEmpty()){
+ for (Object key : propService.keySet()) {
+ String className = propService.getProperty(key.toString());
+ Class cls = Class.forName(className);
+ Schedule schedule = (Schedule) cls.newInstance();
+ schedule.schedule();
+ }
+ FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
}
- FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
+ }catch (Exception e){
+ System.exit(1);
}
}
diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties
index 51c892e..0e442fb 100644
--- a/platform-schedule/src/main/resources/business.properties
+++ b/platform-schedule/src/main/resources/business.properties
@@ -1,2 +1,3 @@
dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink
+dos.detection.metric.class=com.zdjizhi.dos.sink.DosMetricsSink
dos.sketch.etl.class=com.zdjizhi.etl.DosDetectionEtl \ No newline at end of file