summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2022-05-17 13:54:43 +0800
committerwanglihui <[email protected]>2022-05-17 13:54:43 +0800
commit1fcdb79739c396727207becb24358221edcc94f7 (patch)
treed1673922d26573aa8baae7aab144b2abdd8dd04d
parentdb17064f73bde69f9f56f5db96ebc5dcd7de2ced (diff)
DoS事件日志对Conditions基于速率检测属性值修正tsg-22.03
-rw-r--r--src/main/java/com/zdjizhi/etl/DosDetection.java2
-rw-r--r--src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java29
-rw-r--r--src/main/java/com/zdjizhi/utils/KafkaUtils.java4
-rw-r--r--src/main/resources/common.properties14
4 files changed, 40 insertions, 9 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index 1f5568d..af5bafb 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -193,7 +193,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
switch (type) {
case STATIC_CONDITION_TYPE:
return new StrBuilder()
- .append(tag).append(" > ")
+ .append("Rate > ")
.append(base).append(" ")
.append(tag).append("/s")
.toString();
diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
index e34ce28..cd628c5 100644
--- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
+++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
@@ -1,6 +1,8 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -12,6 +14,33 @@ public class FlinkEnvironmentUtils {
static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
+
+ /*
+ // 每 1000ms 开始一次 checkpoint
+ streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000);
+
+ // 设置模式为精确一次 (这是默认值)
+ streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+ // 确认 checkpoints 之间的时间会进行 500 ms
+ streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
+
+ // Checkpoint 必须在一分钟内完成,否则就会被抛弃
+ streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
+
+ // 允许两个连续的 checkpoint 错误
+ streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+
+ // 同一时间只允许一个 checkpoint 进行
+ streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
+ streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+ // 开启实验性的 unaligned checkpoints
+ streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints();
+ */
}
}
diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
index 6e6167a..b0312a5 100644
--- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java
+++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
@@ -22,12 +22,14 @@ public class KafkaUtils {
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
- return new FlinkKafkaProducer<String>(
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
new SimpleStringSchema(),
getKafkaSinkProperty(),
Optional.empty()
);
+ kafkaProducer.setLogFailuresOnly(true);
+ return kafkaProducer;
}
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 1331475..92d0520 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
-kafka.input.group.id=2112080949
+kafka.input.group.id=dos-detection-job-220516-1
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
@@ -37,8 +37,8 @@ kafka.output.bootstrap.servers=192.168.44.12:9094
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址
-hbase.zookeeper.quorum=192.168.44.12:2181
-#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+#hbase.zookeeper.quorum=192.168.44.12:2181
+hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
#hbase客户端处理时间
hbase.client.operation.timeout=30000
@@ -74,12 +74,12 @@ destination.ip.partition.num=10000
data.center.id.num=15
#IP mmdb库路径
-ip.mmdb.path=D:\\data\\dat\\
+ip.mmdb.path=D:\\data\\dat\\bak\\
#ip.mmdb.path=/home/bigdata/topology/dat/
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
#静态敏感阈值,速率小于此值不报警
-static.sensitivity.threshold=500
+static.sensitivity.threshold=10
#基线敏感阈值
baseline.sensitivity.threshold=0.2
@@ -92,8 +92,8 @@ baseline.sessions.severe.threshold=5
baseline.sessions.critical.threshold=8
#bifang服务访问地址
-#bifang.server.uri=http://192.168.44.72:80
-bifang.server.uri=http://192.168.44.3:80
+bifang.server.uri=http://192.168.44.72:80
+#bifang.server.uri=http://192.168.44.3:80
#访问bifang只读权限token,bifang内置,无需修改
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867