summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-08-04 16:30:13 +0800
committerwanglihui <[email protected]>2021-08-04 16:30:13 +0800
commit7077c91d46109775f3635b9800f08162f27abb6a (patch)
treefc0f6cf86380f9473c40182c6e38651ff0875658
parent896e23db7df59b0ad20407d4facd25274afce6d1 (diff)
修改IP定位库信息,增加异常检测信息。dos-detection-broadcast
-rw-r--r--pom.xml1
-rw-r--r--src/main/java/com/zdjizhi/etl/DosDetection.java23
-rw-r--r--src/main/java/com/zdjizhi/etl/ParseSketchLog.java4
-rw-r--r--src/main/java/com/zdjizhi/sink/OutputStreamSink.java4
-rw-r--r--src/main/java/com/zdjizhi/source/BaselineSource.java7
-rw-r--r--src/main/java/com/zdjizhi/utils/IpUtils.java6
-rw-r--r--src/main/resources/common.properties10
7 files changed, 34 insertions, 21 deletions
diff --git a/pom.xml b/pom.xml
index ea52841..89029f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,6 +141,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
+ <scope>provided</scope>
</dependency>
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index fe30c70..1107377 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -7,6 +7,9 @@ import com.zdjizhi.sink.OutputStreamSink;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.SnowflakeId;
import org.apache.commons.lang.StringUtils;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
@@ -29,6 +32,10 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
private final static int BASELINE_SIZE = 144;
+ private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state",
+ Types.STRING,
+ new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass()));
+
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
@Override
@@ -39,7 +46,7 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
@Override
public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector<DosEventLog> out) throws Exception {
try {
- Map<String, Map<String, List<Integer>>> broadcast = ctx.getBroadcastState(OutputStreamSink.descriptor).get("broadcast-state");
+ Map<String, Map<String, List<Integer>>> broadcast = ctx.getBroadcastState(descriptor).get("broadcast-state");
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
logger.info("当前判断IP:{}, 类型: {}",destinationIp,attackType);
@@ -72,8 +79,12 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
}
@Override
- public void processBroadcastElement(Map<String, Map<String, List<Integer>>> value, Context ctx, Collector<DosEventLog> out) throws Exception {
- ctx.getBroadcastState(OutputStreamSink.descriptor).put("broadcast-state", value);
+ public void processBroadcastElement(Map<String, Map<String, List<Integer>>> value, Context ctx, Collector<DosEventLog> out) {
+ try {
+ ctx.getBroadcastState(descriptor).put("broadcast-state", value);
+ }catch (Exception e){
+ logger.error("更新广播状态失败 {}",e);
+ }
}
public static void main(String[] args) {
@@ -86,8 +97,8 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
// strings.add("153.146.241.196");
// strings.add("132.46.241.21");
// String join = StringUtils.join(strings, ",");
- System.out.println(IpUtils.ipLookup.countryLookup("192.168.50.150"));
-
+// System.out.println(IpUtils.ipLookup.countryLookup("192.168.50.150"));
+ System.out.println(Severity.CRITICAL.severity);
}
private DosEventLog getResult(DosSketchLog value,Severity severity,String percent){
@@ -96,7 +107,7 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time()+CommonConfig.FLINK_WINDOW_MAX_TIME);
dosEventLog.setAttack_type(value.getAttack_type());
- dosEventLog.setSeverity(severity.name());
+ dosEventLog.setSeverity(severity.toString());
dosEventLog.setConditions(getConditions(percent));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
index fe14071..b17a9f5 100644
--- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
+++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
@@ -26,7 +26,7 @@ public class ParseSketchLog {
}
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
- return DosSketchSource.createDosSketchSource().flatMap(new flatSketchLog());
+ return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
}
private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
@@ -35,7 +35,7 @@ public class ParseSketchLog {
.withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
}
- private static class flatSketchLog implements FlatMapFunction<String, DosSketchLog> {
+ private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
@Override
public void flatMap(String s, Collector<DosSketchLog> collector) throws Exception {
try {
diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
index 755d9bf..a8fc74c 100644
--- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
+++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
@@ -34,7 +34,7 @@ public class OutputStreamSink {
public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
- public static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state",
+ private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state",
Types.STRING,
new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass()));
@@ -46,7 +46,7 @@ public class OutputStreamSink {
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
} catch (Exception e) {
- logger.error("");
+ logger.error("任务启动失败 {}",e);
}
}
diff --git a/src/main/java/com/zdjizhi/source/BaselineSource.java b/src/main/java/com/zdjizhi/source/BaselineSource.java
index 9998dc3..a6d0429 100644
--- a/src/main/java/com/zdjizhi/source/BaselineSource.java
+++ b/src/main/java/com/zdjizhi/source/BaselineSource.java
@@ -63,6 +63,7 @@ public class BaselineSource extends RichSourceFunction<Map<String, Map<String,Li
@Override
public void run(SourceContext<Map<String, Map<String,List<Integer>>>> sourceContext) throws Exception {
+ logger.info("开始读取baseline数据");
ResultScanner rs = table.getScanner(scan);
// Map<String, List<Integer>[]> baselineMap = new HashMap<>();
Map<String, Map<String,List<Integer>>> baselineMap = new HashMap<>();
@@ -70,9 +71,9 @@ public class BaselineSource extends RichSourceFunction<Map<String, Map<String,Li
Map<String, List<Integer>> floodTypeMap = new HashMap<>();
String rowkey = Bytes.toString(result.getRow());
ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_num");
- ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "b");
- ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "c");
- ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "d");
+ ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_num");
+ ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_num");
+ ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_num");
floodTypeMap.put("TCP SYN Flood",tcp);
floodTypeMap.put("UDP Flood",udp);
floodTypeMap.put("ICMP Flood",icmp);
diff --git a/src/main/java/com/zdjizhi/utils/IpUtils.java b/src/main/java/com/zdjizhi/utils/IpUtils.java
index 11aea2c..1468c04 100644
--- a/src/main/java/com/zdjizhi/utils/IpUtils.java
+++ b/src/main/java/com/zdjizhi/utils/IpUtils.java
@@ -8,10 +8,10 @@ public class IpUtils {
* IP定位库工具类
*/
public static IpLookup ipLookup = new IpLookup.Builder(false)
- .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb")
- .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
+// .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb")
+// .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_private_v4.mmdb")
- .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
+// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
.build();
public static void main(String[] args) {
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index bfdd73a..f7b48d6 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -5,8 +5,8 @@ kafka.input.parallelism=1
kafka.input.topic.name=DOS-SKETCH-LOG
kafka.input.bootstrap.servers=192.168.44.12:9092
kafka.input.scan.startup.mode=latest-offset
-#kafka.input.group.id=2107291738
-kafka.input.group.id=test
+kafka.input.group.id=2108041426
+#kafka.input.group.id=test
kafka.output.metric.parallelism=1
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
@@ -25,14 +25,14 @@ hbase.baseline.total.num=1000000
flink.first.agg.parallelism=1
flink.second.agg.parallelism=1
flink.watermark.max.orderness=1
-flink.window.max.time=10
+flink.window.max.time=600
source.ip.list.limit=10000
data.center.id.num=15
-ip.mmdb.path=D:\\data\\dat\\
-#ip.mmdb.path=/home/bigdata/topology/dat/
+ip.mmdb.path=D:\\data\\dat_test\\
+#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
baseline.sessions.minor.threshold=0.1
baseline.sessions.warning.threshold=0.5