diff options
| author | wanglihui <[email protected]> | 2021-08-04 16:30:13 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-08-04 16:30:13 +0800 |
| commit | 7077c91d46109775f3635b9800f08162f27abb6a (patch) | |
| tree | fc0f6cf86380f9473c40182c6e38651ff0875658 | |
| parent | 896e23db7df59b0ad20407d4facd25274afce6d1 (diff) | |
修改IP定位库信息,增加异常检测信息。dos-detection-broadcast
| -rw-r--r-- | pom.xml | 1 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/DosDetection.java | 23 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/ParseSketchLog.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/sink/OutputStreamSink.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/source/BaselineSource.java | 7 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/IpUtils.java | 6 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 10 |
7 files changed, 34 insertions, 21 deletions
@@ -141,6 +141,7 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> + <scope>provided</scope> </dependency> diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index fe30c70..1107377 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -7,6 +7,9 @@ import com.zdjizhi.sink.OutputStreamSink; import com.zdjizhi.utils.IpUtils; import com.zdjizhi.utils.SnowflakeId; import org.apache.commons.lang.StringUtils; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; @@ -29,6 +32,10 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str private final static int BASELINE_SIZE = 144; + private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state", + Types.STRING, + new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass())); + private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); @Override @@ -39,7 +46,7 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str @Override public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector<DosEventLog> out) throws Exception { try { - Map<String, Map<String, List<Integer>>> broadcast = ctx.getBroadcastState(OutputStreamSink.descriptor).get("broadcast-state"); + Map<String, Map<String, List<Integer>>> broadcast = ctx.getBroadcastState(descriptor).get("broadcast-state"); String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); logger.info("当前判断IP:{}, 类型: {}",destinationIp,attackType); @@ -72,8 +79,12 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str } @Override - public void processBroadcastElement(Map<String, Map<String, List<Integer>>> value, Context ctx, Collector<DosEventLog> out) throws Exception { - ctx.getBroadcastState(OutputStreamSink.descriptor).put("broadcast-state", value); + public void processBroadcastElement(Map<String, Map<String, List<Integer>>> value, Context ctx, Collector<DosEventLog> out) { + try { + ctx.getBroadcastState(descriptor).put("broadcast-state", value); + }catch (Exception e){ + logger.error("更新广播状态失败 {}",e); + } } public static void main(String[] args) { @@ -86,8 +97,8 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str // strings.add("153.146.241.196"); // strings.add("132.46.241.21"); // String join = StringUtils.join(strings, ","); - System.out.println(IpUtils.ipLookup.countryLookup("192.168.50.150")); - +// System.out.println(IpUtils.ipLookup.countryLookup("192.168.50.150")); + System.out.println(Severity.CRITICAL.severity); } private DosEventLog getResult(DosSketchLog value,Severity severity,String percent){ @@ -96,7 +107,7 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str dosEventLog.setStart_time(value.getSketch_start_time()); dosEventLog.setEnd_time(value.getSketch_start_time()+CommonConfig.FLINK_WINDOW_MAX_TIME); dosEventLog.setAttack_type(value.getAttack_type()); - dosEventLog.setSeverity(severity.name()); + dosEventLog.setSeverity(severity.toString()); dosEventLog.setConditions(getConditions(percent)); dosEventLog.setDestination_ip(value.getDestination_ip()); dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip())); diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java index fe14071..b17a9f5 100644 --- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java +++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java @@ -26,7 +26,7 @@ public class ParseSketchLog { } private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){ - return DosSketchSource.createDosSketchSource().flatMap(new flatSketchLog()); + return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog()); } private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){ @@ -35,7 +35,7 @@ public class ParseSketchLog { .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000); } - private static class flatSketchLog implements FlatMapFunction<String, DosSketchLog> { + private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> { @Override public void flatMap(String s, Collector<DosSketchLog> collector) throws Exception { try { diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java index 755d9bf..a8fc74c 100644 --- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java +++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java @@ -34,7 +34,7 @@ public class OutputStreamSink { public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){}; - public static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state", + private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state", Types.STRING, new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass())); @@ -46,7 +46,7 @@ public class OutputStreamSink { TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream); FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME); } catch (Exception e) { - logger.error(""); + logger.error("任务启动失败 {}",e); } } diff --git a/src/main/java/com/zdjizhi/source/BaselineSource.java b/src/main/java/com/zdjizhi/source/BaselineSource.java index 9998dc3..a6d0429 100644 --- a/src/main/java/com/zdjizhi/source/BaselineSource.java +++ b/src/main/java/com/zdjizhi/source/BaselineSource.java @@ -63,6 +63,7 @@ public class BaselineSource extends RichSourceFunction<Map<String, Map<String,Li @Override public void run(SourceContext<Map<String, Map<String,List<Integer>>>> sourceContext) throws Exception { + logger.info("开始读取baseline数据"); ResultScanner rs = table.getScanner(scan); // Map<String, List<Integer>[]> baselineMap = new HashMap<>(); Map<String, Map<String,List<Integer>>> baselineMap = new HashMap<>(); @@ -70,9 +71,9 @@ public class BaselineSource extends RichSourceFunction<Map<String, Map<String,Li Map<String, List<Integer>> floodTypeMap = new HashMap<>(); String rowkey = Bytes.toString(result.getRow()); ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_num"); - ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "b"); - ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "c"); - ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "d"); + ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_num"); + ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_num"); + ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_num"); floodTypeMap.put("TCP SYN Flood",tcp); floodTypeMap.put("UDP Flood",udp); floodTypeMap.put("ICMP Flood",icmp); diff --git a/src/main/java/com/zdjizhi/utils/IpUtils.java b/src/main/java/com/zdjizhi/utils/IpUtils.java index 11aea2c..1468c04 100644 --- a/src/main/java/com/zdjizhi/utils/IpUtils.java +++ b/src/main/java/com/zdjizhi/utils/IpUtils.java @@ -8,10 +8,10 @@ public class IpUtils { * IP定位库工具类 */ public static IpLookup ipLookup = new IpLookup.Builder(false) - .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb") - .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb") +// .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb") +// .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb") .loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_private_v4.mmdb") - .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb") +// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb") .build(); public static void main(String[] args) { diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index bfdd73a..f7b48d6 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -5,8 +5,8 @@ kafka.input.parallelism=1 kafka.input.topic.name=DOS-SKETCH-LOG kafka.input.bootstrap.servers=192.168.44.12:9092 kafka.input.scan.startup.mode=latest-offset -#kafka.input.group.id=2107291738 -kafka.input.group.id=test +kafka.input.group.id=2108041426 +#kafka.input.group.id=test kafka.output.metric.parallelism=1 kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG @@ -25,14 +25,14 @@ hbase.baseline.total.num=1000000 flink.first.agg.parallelism=1 flink.second.agg.parallelism=1 flink.watermark.max.orderness=1 -flink.window.max.time=10 +flink.window.max.time=600 source.ip.list.limit=10000 data.center.id.num=15 -ip.mmdb.path=D:\\data\\dat\\ -#ip.mmdb.path=/home/bigdata/topology/dat/ +ip.mmdb.path=D:\\data\\dat_test\\ +#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ baseline.sessions.minor.threshold=0.1 baseline.sessions.warning.threshold=0.5 |
