summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2022-06-20 18:15:24 +0800
committerwanglihui <[email protected]>2022-06-20 18:15:24 +0800
commitcead1d4d990b25557aca0f91c6e508282f1d4322 (patch)
tree260db767cb468f746632947962e39673c98cc3f5
parent3d974217d9a6f91ef954fd73b245f9c63b6c018b (diff)
parent2d98c3b6e688c31aff7612e5d7e632763932d4b5 (diff)
Merge branch 'tsg-22.06' of git.mesalab.cn:bigdata/tsg/flink-dos-detection
-rw-r--r--src/main/java/com/zdjizhi/common/CommonConfig.java2
-rw-r--r--src/main/java/com/zdjizhi/common/DosVsysId.java22
-rw-r--r--src/main/java/com/zdjizhi/etl/DosDetection.java2
-rw-r--r--src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java76
-rw-r--r--src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java29
-rw-r--r--src/main/java/com/zdjizhi/utils/KafkaUtils.java4
-rw-r--r--src/main/resources/common.properties14
7 files changed, 131 insertions, 18 deletions
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index c47361b..dd6a6f8 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -64,6 +64,8 @@ public class CommonConfig {
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
+ public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path");
+
public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection");
public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route");
public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout");
diff --git a/src/main/java/com/zdjizhi/common/DosVsysId.java b/src/main/java/com/zdjizhi/common/DosVsysId.java
new file mode 100644
index 0000000..27c0eaf
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DosVsysId.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.common;
+
+import java.util.Objects;
+
+public class DosVsysId {
+ private int vsysId;
+
+ public int getVsysId() {
+ return vsysId;
+ }
+
+ public void setVsysId(int vsysId) {
+ this.vsysId = vsysId;
+ }
+
+ @Override
+ public String toString() {
+ return "DosVsysId{" +
+ "vsysId=" + vsysId +
+ '}';
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index 1f5568d..af5bafb 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -193,7 +193,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
switch (type) {
case STATIC_CONDITION_TYPE:
return new StrBuilder()
- .append(tag).append(" > ")
+ .append("Rate > ")
.append(base).append(" ")
.append(tag).append("/s")
.toString();
diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
index 77ff8e9..cfaea86 100644
--- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
+++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
@@ -3,6 +3,7 @@ package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosDetectionThreshold;
+import com.zdjizhi.common.DosVsysId;
import com.zdjizhi.utils.HttpClientUtils;
import com.zdjizhi.utils.JsonMapper;
import inet.ipaddr.IPAddress;
@@ -29,6 +30,7 @@ public class ParseStaticThreshold {
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
+ private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
static {
//加载加密登录密码
@@ -99,19 +101,18 @@ public class ParseStaticThreshold {
}
/**
- * 获取静态阈值配置列表
+ * 获取vsysId配置列表
*
- * @return thresholds
+ * @return vsysIdList
*/
- private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
- ArrayList<DosDetectionThreshold> thresholds = null;
+ private static ArrayList<DosVsysId> getVsysId() {
+ ArrayList<DosVsysId> vsysIdList = null;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
- parms.put("orderBy", "profileId asc");
- parms.put("isValid", 1);
- HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
+ parms.put("orderBy", "vsysId desc");
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
String token = CommonConfig.BIFANG_SERVER_TOKEN;
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
@@ -125,10 +126,10 @@ public class ParseStaticThreshold {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
- thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
- logger.info("获取到静态阈值配置{}条", thresholds.size());
+ vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
+ logger.info("获取到vsysId{}条", vsysIdList.size());
} else {
- logger.warn("静态阈值配置为空");
+ logger.warn("vsysIdList为空");
}
} else {
logger.error(msg);
@@ -136,8 +137,59 @@ public class ParseStaticThreshold {
}
}
} catch (Exception e) {
+ logger.error("获取vsysId失败,请检查bifang服务或登录配置信息 ", e);
+ }
+ return vsysIdList;
+ }
+
+ /**
+ * 根据vsysId获取静态阈值配置列表
+ *
+ * @return thresholds
+ */
+ private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
+ ArrayList<DosDetectionThreshold> thresholds = null;
+// ArrayList<DosVsysId> vsysId = getVsysId();
+ try {
+// if (vsysId != null){
+// for (DosVsysId dosVsysId : vsysId) {
+ URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
+ HashMap<String, Object> parms = new HashMap<>();
+ parms.put("pageSize", -1);
+ parms.put("orderBy", "profileId asc");
+ parms.put("isValid", 1);
+// parms.put("vsysId", dosVsysId.getVsysId());
+ parms.put("vsysId", 1);
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
+ String token = CommonConfig.BIFANG_SERVER_TOKEN;
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
+ BasicHeader authorization = new BasicHeader("Authorization", token);
+ BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
+ String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+ HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+ boolean success = (boolean) resposeMap.get("success");
+ String msg = resposeMap.get("msg").toString();
+ if (success) {
+ HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+ Object list = data.get("list");
+ if (list != null) {
+ thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
+ logger.info("获取到静态阈值配置{}条", thresholds.size());
+ } else {
+ logger.warn("静态阈值配置为空");
+ }
+ } else {
+ logger.error(msg);
+ }
+ }
+ }
+// }
+// }
+ } catch (Exception e) {
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
}
+
return thresholds;
}
@@ -196,7 +248,6 @@ public class ParseStaticThreshold {
}
public static void main(String[] args) {
-
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
dosDetectionThreshold.forEach(System.out::println);
@@ -214,7 +265,8 @@ public class ParseStaticThreshold {
}
System.out.println("------------------------");
}
-
+// String s = loginBifangServer();
+// System.out.println(s);
}
diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
index e34ce28..cd628c5 100644
--- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
+++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
@@ -1,6 +1,8 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -12,6 +14,33 @@ public class FlinkEnvironmentUtils {
static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
+
+ /*
+ // 每 1000ms 开始一次 checkpoint
+ streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000);
+
+ // 设置模式为精确一次 (这是默认值)
+ streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+ // 确认 checkpoints 之间的时间会进行 500 ms
+ streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
+
+ // Checkpoint 必须在一分钟内完成,否则就会被抛弃
+ streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
+
+ // 允许两个连续的 checkpoint 错误
+ streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+
+ // 同一时间只允许一个 checkpoint 进行
+ streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
+ streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+ // 开启实验性的 unaligned checkpoints
+ streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints();
+ */
}
}
diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
index 6e6167a..b0312a5 100644
--- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java
+++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
@@ -22,12 +22,14 @@ public class KafkaUtils {
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
- return new FlinkKafkaProducer<String>(
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
new SimpleStringSchema(),
getKafkaSinkProperty(),
Optional.empty()
);
+ kafkaProducer.setLogFailuresOnly(true);
+ return kafkaProducer;
}
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 16d0fec..7237611 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -15,23 +15,25 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
-kafka.input.group.id=2203241552
+kafka.input.group.id=2112080949
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
kafka.output.metric.parallelism=1
#发送kafka metrics topic名
-kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
+#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
+kafka.output.metric.topic.name=test
#发送kafka event并行度大小
kafka.output.event.parallelism=1
#发送kafka event topic名
-kafka.output.event.topic.name=DOS-EVENT
+#kafka.output.event.topic.name=DOS-EVENT
+kafka.output.event.topic.name=storm-dos-test
#kafka输出地址
-kafka.output.bootstrap.servers=192.168.40.223:9094
+kafka.output.bootstrap.servers=192.168.44.12:9094
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址
@@ -102,6 +104,9 @@ bifang.server.encryptpwd.path=/v1/user/encryptpwd
#登录bifang服务路径信息
bifang.server.login.path=/v1/user/login
+#获取vaysId路径信息
+bifang.server.policy.vaysid.path=/v1/system/vsys/
+
#获取静态阈值路径信息
bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
@@ -130,6 +135,7 @@ baseline.threshold.schedule.days=1
#kafka用户认证配置参数
sasl.jaas.config.user=admin
#sasl.jaas.config.password=galaxy2019
+#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#是否开启kafka用户认证配置,1:是;0:否