diff options
7 files changed, 305 insertions, 9 deletions
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/EtlProcessFunction.java b/dos-detection/src/main/java/com/zdjizhi/dos/EtlProcessFunction.java new file mode 100644 index 0000000..c2e56da --- /dev/null +++ b/dos-detection/src/main/java/com/zdjizhi/dos/EtlProcessFunction.java @@ -0,0 +1,97 @@ +package com.zdjizhi.dos; + +import com.zdjizhi.base.common.DosSketchLog; +import com.zdjizhi.dos.common.CommonConfig; +import com.zdjizhi.dos.common.DosMetricsLog; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; + +public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> { + + private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class); + private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0"; + private static final String EMPTY_SOURCE_IP_IPV6 = "::"; + private static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){}; + + @Override + public void process(Tuple2<String, String> keys, + Context context, Iterable<DosSketchLog> elements, + Collector<DosSketchLog> out) { + DosSketchLog middleResult = getMiddleResult(keys, elements); + try { + if (middleResult != null){ + out.collect(middleResult); + logger.debug("获取中间聚合结果:{}",middleResult.toString()); + context.output(outputTag,TrafficServerIpMetrics.getOutputMetric(middleResult)); + } + }catch (Exception e){ + logger.error("获取中间聚合结果失败,middleResult: {}\n{}",middleResult.toString(),e); + } + } + + private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){ + + DosSketchLog midResuleLog = new DosSketchLog(); + Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements); + try { + if (values != null){ + midResuleLog.setAttack_type(keys.f0); + midResuleLog.setDestination_ip(keys.f1); + midResuleLog.setSketch_start_time(values.f4); + midResuleLog.setSketch_duration(values.f5); + midResuleLog.setSource_ip(values.f3); + midResuleLog.setSketch_sessions(values.f0); + midResuleLog.setSketch_packets(values.f1); + midResuleLog.setSketch_bytes(values.f2); + return midResuleLog; + } + } catch (Exception e){ + logger.error("加载中间结果集失败,keys: {} values: {}\n{}",keys,values,e); + } + return null; + } + + private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){ + long sessions = 0; + long packets = 0 ; + long bytes = 0; + long startTime = System.currentTimeMillis()/1000; + long endTime = System.currentTimeMillis()/1000; + long duration = 0; + HashSet<String> sourceIpSet = new HashSet<>(); + try { + for (DosSketchLog newSketchLog : elements){ + String sourceIp = newSketchLog.getSource_ip(); + if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){ + sessions += newSketchLog.getSketch_sessions(); + packets += newSketchLog.getSketch_packets(); + bytes += newSketchLog.getSketch_bytes(); + startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time(); + endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime; + duration = endTime - startTime == 0 ? 5 : endTime - startTime; + }else { + if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){ + sourceIpSet.add(sourceIp); + } + } + } + String sourceIpList = StringUtils.join(sourceIpSet, ","); + return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME, + bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration); + }catch (Exception e){ + logger.error("聚合中间结果集失败 {}",e); + } + return null; + } + + +} diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/KeysSelector.java b/dos-detection/src/main/java/com/zdjizhi/dos/KeysSelector.java new file mode 100644 index 0000000..fcddd88 --- /dev/null +++ b/dos-detection/src/main/java/com/zdjizhi/dos/KeysSelector.java @@ -0,0 +1,16 @@ +package com.zdjizhi.dos; + +import com.zdjizhi.base.common.DosSketchLog; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +public class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>> { + + @Override + public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){ + return Tuple2.of( + dosSketchLog.getAttack_type(), + dosSketchLog.getDestination_ip()); + } + +} diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/TrafficServerIpMetrics.java b/dos-detection/src/main/java/com/zdjizhi/dos/TrafficServerIpMetrics.java new file mode 100644 index 0000000..a16480e --- /dev/null +++ b/dos-detection/src/main/java/com/zdjizhi/dos/TrafficServerIpMetrics.java @@ -0,0 +1,41 @@ +package com.zdjizhi.dos; + + +import com.zdjizhi.base.common.DosSketchLog; +import com.zdjizhi.dos.common.CommonConfig; +import com.zdjizhi.dos.common.DosMetricsLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +class TrafficServerIpMetrics { + + private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class); + + static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) { + DosMetricsLog dosMetricsLog = new DosMetricsLog(); + dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000)); + dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip()); + dosMetricsLog.setAttack_type(midResuleLog.getAttack_type()); + dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions()); + dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets()); + dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes()); + dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip())); + logger.debug("metric 结果已加载:{}",dosMetricsLog.toString()); + return dosMetricsLog; + } + + private static long timeFloor(long sketchStartTime){ + return sketchStartTime / CommonConfig.FLINK_WINDOW_MAX_TIME * CommonConfig.FLINK_WINDOW_MAX_TIME; + } + + private static int getPartitionNumByIp(String destinationIp){ + return Math.abs(destinationIp.hashCode()) % CommonConfig.DESTINATION_IP_PARTITION_NUM; + } + + public static void main(String[] args) { + System.out.println(getPartitionNumByIp("146.177.223.43")); + System.out.println("146.177.223.43".hashCode()); + } + +} diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/common/DosMetricsLog.java b/dos-detection/src/main/java/com/zdjizhi/dos/common/DosMetricsLog.java new file mode 100644 index 0000000..b66ff9f --- /dev/null +++ b/dos-detection/src/main/java/com/zdjizhi/dos/common/DosMetricsLog.java @@ -0,0 +1,107 @@ +package com.zdjizhi.dos.common; + +import java.io.Serializable; +import java.util.Objects; + +public class DosMetricsLog implements Serializable { + + private long sketch_start_time; + private String attack_type; + private String destination_ip; + private long session_rate; + private long packet_rate; + private long bit_rate; + private int partition_num; + + public int getPartition_num() { + return partition_num; + } + + public void setPartition_num(int partition_num) { + this.partition_num = partition_num; + } + + public long getSketch_start_time() { + return sketch_start_time; + } + + public void setSketch_start_time(long sketch_start_time) { + this.sketch_start_time = sketch_start_time; + } + + public String getAttack_type() { + return attack_type; + } + + public void setAttack_type(String attack_type) { + this.attack_type = attack_type; + } + + public String getDestination_ip() { + return destination_ip; + } + + public void setDestination_ip(String destination_ip) { + this.destination_ip = destination_ip; + } + + public long getSession_rate() { + return session_rate; + } + + public void setSession_rate(long session_rate) { + this.session_rate = session_rate; + } + + public long getPacket_rate() { + return packet_rate; + } + + public void setPacket_rate(long packet_rate) { + this.packet_rate = packet_rate; + } + + public long getBit_rate() { + return bit_rate; + } + + public void setBit_rate(long bit_rate) { + this.bit_rate = bit_rate; + } + + @Override + public String toString() { + return "DosMetricsLog{" + + "sketch_start_time=" + sketch_start_time + + ", attack_type='" + attack_type + '\'' + + ", destination_ip='" + destination_ip + '\'' + + ", session_rate=" + session_rate + + ", packet_rate=" + packet_rate + + ", bit_rate=" + bit_rate + + ", partition_num=" + partition_num + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DosMetricsLog)) { + return false; + } + DosMetricsLog that = (DosMetricsLog) o; + return getSketch_start_time() == that.getSketch_start_time() && + getSession_rate() == that.getSession_rate() && + getPacket_rate() == that.getPacket_rate() && + getBit_rate() == that.getBit_rate() && + getPartition_num() == that.getPartition_num() && + Objects.equals(getAttack_type(), that.getAttack_type()) && + Objects.equals(getDestination_ip(), that.getDestination_ip()); + } + + @Override + public int hashCode() { + return Objects.hash(getSketch_start_time(), getAttack_type(), getDestination_ip(), getSession_rate(), getPacket_rate(), getBit_rate(), getPartition_num()); + } +} diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java new file mode 100644 index 0000000..e9edb1b --- /dev/null +++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/DosMetricsSink.java @@ -0,0 +1,30 @@ +package com.zdjizhi.dos.sink; + +import com.zdjizhi.base.platform.Schedule; +import com.zdjizhi.base.utils.FlinkEnvironmentUtils; +import com.zdjizhi.dos.EtlProcessFunction; +import com.zdjizhi.dos.KeysSelector; +import com.zdjizhi.dos.common.CommonConfig; +import com.zdjizhi.etl.DosDetectionEtl; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.time.Duration; + +public class DosMetricsSink implements Schedule { + + @Override + public void schedule() throws Exception { + DosDetectionEtl.getSketchSource().assignTimestampsAndWatermarks( + FlinkEnvironmentUtils.createWatermarkStrategy( + Duration.ofSeconds(10), + (event, timestamp) -> event.getSketch_start_time() * 1000)) + .keyBy(new KeysSelector()) + .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) + .process(new EtlProcessFunction()) + .print() + .setParallelism(1); + } + + +} diff --git a/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java b/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java index be6ab38..943942f 100644 --- a/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java +++ b/platform-schedule/src/main/java/com/zdjizhi/schedule/Execute.java @@ -13,16 +13,20 @@ public class Execute { } private static void execute() throws Exception { - Properties propService = new Properties(); - propService.load(Execute.class.getClassLoader().getResourceAsStream("business.properties")); - if (!propService.isEmpty()){ - for (Object key : propService.keySet()) { - String className = propService.getProperty(key.toString()); - Class cls = Class.forName(className); - Schedule schedule = (Schedule) cls.newInstance(); - schedule.schedule(); + try { + Properties propService = new Properties(); + propService.load(Execute.class.getClassLoader().getResourceAsStream("business.properties")); + if (!propService.isEmpty()){ + for (Object key : propService.keySet()) { + String className = propService.getProperty(key.toString()); + Class cls = Class.forName(className); + Schedule schedule = (Schedule) cls.newInstance(); + schedule.schedule(); + } + FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME); } - FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME); + }catch (Exception e){ + System.exit(1); } } diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties index 51c892e..0e442fb 100644 --- a/platform-schedule/src/main/resources/business.properties +++ b/platform-schedule/src/main/resources/business.properties @@ -1,2 +1,3 @@ dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink +dos.detection.metric.class=com.zdjizhi.dos.sink.DosMetricsSink dos.sketch.etl.class=com.zdjizhi.etl.DosDetectionEtl
\ No newline at end of file |
