summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-12-27 18:01:21 +0800
committerwanglihui <[email protected]>2021-12-27 18:01:21 +0800
commit15f9cb1e2f78d5ea9803eac8b5d9b8c40e8d5a26 (patch)
tree6be8ccf4b827cc2f57c25dcd22597ea4581c7743
first commit
-rw-r--r--.gitignore43
-rw-r--r--base-platform/pom.xml14
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/common/CommonConfig.java47
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/common/DosSketchLog.java140
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java76
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/platform/Schedule.java7
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/sink/Sink.java4
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/source/Source.java18
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/utils/CommonConfigurations.java38
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java17
-rw-r--r--base-platform/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java52
-rw-r--r--base-platform/src/main/resources/common.properties33
-rw-r--r--dos-detection/pom.xml22
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/common/CommonConfig.java77
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java15
-rw-r--r--dos-detection/src/main/resources/dos-detection.properties105
-rw-r--r--pom.xml90
-rw-r--r--schedule-platform/pom.xml23
-rw-r--r--schedule-platform/src/main/java/com/zdjizhi/schedule/Execute.java29
-rw-r--r--schedule-platform/src/main/resources/business.properties1
20 files changed, 851 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..249425e
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,43 @@
+# Created by .ignore support plugin (hsz.mobi)
+*.class
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+*.zip
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+
+target/
+!.mvn/wrapper/maven-wrapper.jar
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+nbproject/private/
+builds/
+nbbuild/
+dist/
+nbdist/
+.nb-gradle/
+
+log/
+logs/ \ No newline at end of file
diff --git a/base-platform/pom.xml b/base-platform/pom.xml
new file mode 100644
index 0000000..7cd098d
--- /dev/null
+++ b/base-platform/pom.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-stream-schedule-platform</artifactId>
+ <groupId>com.zdjizhi</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>base-platform</artifactId>
+
+</project> \ No newline at end of file
diff --git a/base-platform/src/main/java/com/zdjizhi/base/common/CommonConfig.java b/base-platform/src/main/java/com/zdjizhi/base/common/CommonConfig.java
new file mode 100644
index 0000000..8bbb749
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/common/CommonConfig.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.base.common;
+
+import com.zdjizhi.base.utils.CommonConfigurations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * @author wlh
+ * @date 2021/1/6
+ */
+public class CommonConfig {
+
+ private static CommonConfigurations configurations;
+ private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class);
+
+ static {
+ Properties propService;
+ try {
+ propService = new Properties();
+ propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
+ configurations = new CommonConfigurations(propService);
+ } catch (Exception e) {
+ logger.error("加载common.properties配置文件失败");
+ System.exit(1);
+ }
+ }
+
+ public static final int STREAM_EXECUTION_ENVIRONMENT_PARALLELISM = configurations.getIntProperty("stream.execution.environment.parallelism");
+ public static final String STREAM_EXECUTION_JOB_NAME = configurations.getStringProperty("stream.execution.job.name");
+
+ public static final int KAFKA_INPUT_PARALLELISM = configurations.getIntProperty("kafka.input.parallelism");
+ public static final String KAFKA_INPUT_TOPIC_NAME = configurations.getStringProperty("kafka.input.topic.name");
+ public static final String KAFKA_INPUT_BOOTSTRAP_SERVERS = configurations.getStringProperty("kafka.input.bootstrap.servers");
+ public static final String KAFKA_GROUP_ID = configurations.getStringProperty("kafka.input.group.id");
+
+ public static final int KAFKA_OUTPUT_PARALLELISM = configurations.getIntProperty("kafka.output.parallelism");
+ public static final String KAFKA_OUTPUT_TOPIC_NAME = configurations.getStringProperty("kafka.output.topic.name");
+ public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = configurations.getStringProperty("kafka.output.bootstrap.servers");
+
+ public static final String SASL_JAAS_CONFIG_USER = configurations.getStringProperty("sasl.jaas.config.user");
+ public static final String SASL_JAAS_CONFIG_PASSWORD = configurations.getStringProperty("sasl.jaas.config.password");
+
+ public static final int SASL_JAAS_CONFIG_FLAG = configurations.getIntProperty("sasl.jaas.config.flag");
+
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/common/DosSketchLog.java b/base-platform/src/main/java/com/zdjizhi/base/common/DosSketchLog.java
new file mode 100644
index 0000000..6c023da
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/common/DosSketchLog.java
@@ -0,0 +1,140 @@
+package com.zdjizhi.base.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class DosSketchLog implements Serializable {
+
+ private String common_sled_ip;
+ private String common_data_center;
+ private long sketch_start_time;
+ private long sketch_duration;
+ private String attack_type;
+ private String source_ip;
+ private String destination_ip;
+ private long sketch_sessions;
+ private long sketch_packets;
+ private long sketch_bytes;
+
+ @Override
+ public String toString() {
+ return "DosSketchLog{" +
+ "common_sled_ip='" + common_sled_ip + '\'' +
+ ", common_data_center='" + common_data_center + '\'' +
+ ", sketch_start_time=" + sketch_start_time +
+ ", sketch_duration=" + sketch_duration +
+ ", attack_type='" + attack_type + '\'' +
+ ", source_ip='" + source_ip + '\'' +
+ ", destination_ip='" + destination_ip + '\'' +
+ ", sketch_sessions=" + sketch_sessions +
+ ", sketch_packets=" + sketch_packets +
+ ", sketch_bytes=" + sketch_bytes +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DosSketchLog)) {
+ return false;
+ }
+ DosSketchLog sketchLog = (DosSketchLog) o;
+ return getSketch_start_time() == sketchLog.getSketch_start_time() &&
+ getSketch_duration() == sketchLog.getSketch_duration() &&
+ getSketch_sessions() == sketchLog.getSketch_sessions() &&
+ getSketch_packets() == sketchLog.getSketch_packets() &&
+ getSketch_bytes() == sketchLog.getSketch_bytes() &&
+ Objects.equals(getCommon_sled_ip(), sketchLog.getCommon_sled_ip()) &&
+ Objects.equals(getCommon_data_center(), sketchLog.getCommon_data_center()) &&
+ Objects.equals(getAttack_type(), sketchLog.getAttack_type()) &&
+ Objects.equals(getSource_ip(), sketchLog.getSource_ip()) &&
+ Objects.equals(getDestination_ip(), sketchLog.getDestination_ip());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getCommon_sled_ip(), getCommon_data_center(), getSketch_start_time(), getSketch_duration(), getAttack_type(), getSource_ip(), getDestination_ip(), getSketch_sessions(), getSketch_packets(), getSketch_bytes());
+ }
+
+ public String getCommon_sled_ip() {
+ return common_sled_ip;
+ }
+
+ public void setCommon_sled_ip(String common_sled_ip) {
+ this.common_sled_ip = common_sled_ip;
+ }
+
+ public String getCommon_data_center() {
+ return common_data_center;
+ }
+
+ public void setCommon_data_center(String common_data_center) {
+ this.common_data_center = common_data_center;
+ }
+
+ public long getSketch_start_time() {
+ return sketch_start_time;
+ }
+
+ public void setSketch_start_time(long sketch_start_time) {
+ this.sketch_start_time = sketch_start_time;
+ }
+
+ public long getSketch_duration() {
+ return sketch_duration;
+ }
+
+ public void setSketch_duration(long sketch_duration) {
+ this.sketch_duration = sketch_duration;
+ }
+
+ public String getAttack_type() {
+ return attack_type;
+ }
+
+ public void setAttack_type(String attack_type) {
+ this.attack_type = attack_type;
+ }
+
+ public String getSource_ip() {
+ return source_ip;
+ }
+
+ public void setSource_ip(String source_ip) {
+ this.source_ip = source_ip;
+ }
+
+ public String getDestination_ip() {
+ return destination_ip;
+ }
+
+ public void setDestination_ip(String destination_ip) {
+ this.destination_ip = destination_ip;
+ }
+
+ public long getSketch_sessions() {
+ return sketch_sessions;
+ }
+
+ public void setSketch_sessions(long sketch_sessions) {
+ this.sketch_sessions = sketch_sessions;
+ }
+
+ public long getSketch_packets() {
+ return sketch_packets;
+ }
+
+ public void setSketch_packets(long sketch_packets) {
+ this.sketch_packets = sketch_packets;
+ }
+
+ public long getSketch_bytes() {
+ return sketch_bytes;
+ }
+
+ public void setSketch_bytes(long sketch_bytes) {
+ this.sketch_bytes = sketch_bytes;
+ }
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java b/base-platform/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java
new file mode 100644
index 0000000..69dbd45
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java
@@ -0,0 +1,76 @@
+package com.zdjizhi.base.etl;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.zdjizhi.base.common.DosSketchLog;
+import com.zdjizhi.base.source.Source;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class DosDetectionEtl {
+
+ private static Logger logger = LoggerFactory.getLogger(DosDetectionEtl.class);
+ private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+ private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+ private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+
+
+ public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
+ return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
+ }
+
+ private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
+ return Source.createSource().flatMap(new FlatSketchLog());
+ }
+
+ private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
+ return WatermarkStrategy
+ .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
+ .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
+ }
+
+ private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
+ @Override
+ public void flatMap(String s, Collector<DosSketchLog> collector) {
+ try {
+ if (StringUtil.isNotBlank(s)){
+ HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
+ long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
+ long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
+ String attackType = sketchSource.get("attack_type").toString();
+ ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
+ for (HashMap<String, Object> obj : reportIpList) {
+ DosSketchLog dosSketchLog = new DosSketchLog();
+ dosSketchLog.setSketch_start_time(sketchStartTime);
+ dosSketchLog.setSketch_duration(sketchDuration);
+ dosSketchLog.setAttack_type(attackType);
+ String sourceIp = obj.get("source_ip").toString();
+ String destinationIp = obj.get("destination_ip").toString();
+ long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
+ long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
+ long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
+ dosSketchLog.setSource_ip(sourceIp);
+ dosSketchLog.setDestination_ip(destinationIp);
+ dosSketchLog.setSketch_sessions(sketchSessions);
+ dosSketchLog.setSketch_packets(sketchPackets);
+ dosSketchLog.setSketch_bytes(sketchBytes);
+ collector.collect(dosSketchLog);
+ logger.debug("数据解析成功:{}",dosSketchLog.toString());
+ }
+ }
+ } catch (Exception e) {
+ logger.error("数据解析错误:{} \n{}",s,e);
+ }
+ }
+ }
+
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/platform/Schedule.java b/base-platform/src/main/java/com/zdjizhi/base/platform/Schedule.java
new file mode 100644
index 0000000..d1567be
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/platform/Schedule.java
@@ -0,0 +1,7 @@
+package com.zdjizhi.base.platform;
+
+public interface Schedule {
+
+ void schedule() throws Exception;
+
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/sink/Sink.java b/base-platform/src/main/java/com/zdjizhi/base/sink/Sink.java
new file mode 100644
index 0000000..0aaaaf8
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/sink/Sink.java
@@ -0,0 +1,4 @@
+package com.zdjizhi.base.sink;
+
+public class Sink {
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/source/Source.java b/base-platform/src/main/java/com/zdjizhi/base/source/Source.java
new file mode 100644
index 0000000..643909f
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/source/Source.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.base.source;
+
+import com.zdjizhi.base.common.CommonConfig;
+import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
+import com.zdjizhi.base.utils.KafkaUtils;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class Source {
+ private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
+
+ public static DataStreamSource<String> createSource(){
+
+ return streamExeEnv.addSource(KafkaUtils.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME))
+ .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM);
+ }
+
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/utils/CommonConfigurations.java b/base-platform/src/main/java/com/zdjizhi/base/utils/CommonConfigurations.java
new file mode 100644
index 0000000..583dbdd
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/utils/CommonConfigurations.java
@@ -0,0 +1,38 @@
+package com.zdjizhi.base.utils;
+
+import java.util.Properties;
+
+public final class CommonConfigurations {
+
+ private Properties propService;
+
+ public CommonConfigurations(Properties propService){
+ this.propService = propService;
+ }
+
+
+ public String getStringProperty(String key) {
+ return propService.getProperty(key);
+ }
+
+ public Integer getIntProperty(String key) {
+
+ return Integer.parseInt(propService.getProperty(key));
+
+ }
+
+ public Double getDoubleProperty(String key) {
+
+ return Double.parseDouble(propService.getProperty(key));
+
+ }
+
+ public Long getLongProperty(String key) {
+ return Long.parseLong(propService.getProperty(key));
+
+ }
+
+ public Boolean getBooleanProperty(Integer type, String key) {
+ return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ }
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java b/base-platform/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
new file mode 100644
index 0000000..d15d81e
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.base.utils;
+
+import com.zdjizhi.base.common.CommonConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+
+/**
+ * @author wlh
+ */
+public class FlinkEnvironmentUtils {
+ public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ static {
+ streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
+ }
+
+}
diff --git a/base-platform/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/base-platform/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
new file mode 100644
index 0000000..fa4b2f0
--- /dev/null
+++ b/base-platform/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
@@ -0,0 +1,52 @@
+package com.zdjizhi.base.utils;
+
+import com.zdjizhi.base.common.CommonConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import java.util.Optional;
+import java.util.Properties;
+
+public class KafkaUtils {
+
+ private static Properties getKafkaSinkProperty() {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
+ updateSasl(properties);
+ return properties;
+ }
+
+ private static Properties getKafkaSourceProperty() {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
+ properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
+ updateSasl(properties);
+ return properties;
+ }
+
+ private static void updateSasl(Properties properties) {
+ if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + CommonConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + CommonConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
+ }
+ }
+
+ public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
+ return new FlinkKafkaProducer<String>(
+ topic,
+ new SimpleStringSchema(),
+ getKafkaSinkProperty(),
+ Optional.empty()
+ );
+ }
+
+ public static FlinkKafkaConsumer<String> createSource(String topic){
+ return new FlinkKafkaConsumer<String>(
+ topic,
+ new SimpleStringSchema(),
+ getKafkaSourceProperty());
+ }
+
+}
diff --git a/base-platform/src/main/resources/common.properties b/base-platform/src/main/resources/common.properties
new file mode 100644
index 0000000..4e52f6c
--- /dev/null
+++ b/base-platform/src/main/resources/common.properties
@@ -0,0 +1,33 @@
+#flink运行环境并行度,其优先级低于算子并行度,如果未设置算子并行度,则使用该数值
+stream.execution.environment.parallelism=1
+
+#flink任务名,一般不变
+stream.execution.job.name=FLINK-STREAM-SCHEDULE-PLATFORM
+
+#输入kafka并行度大小
+kafka.input.parallelism=1
+
+#输入kafka topic名
+kafka.input.topic.name=DOS-SKETCH-RECORD
+
+#输入kafka地址
+kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+
+#读取kafka group id
+kafka.input.group.id=2112080949
+
+#发送kafka metrics并行度大小
+kafka.output.parallelism=1
+
+#发送kafka metrics topic名
+kafka.output.topic.name=test
+
+#kafka输出地址
+kafka.output.bootstrap.servers=192.168.44.12:9094
+
+#kafka用户认证配置参数
+sasl.jaas.config.user=admin
+sasl.jaas.config.password=galaxy2019
+
+#是否开启kafka用户认证配置,1:是;0:否
+sasl.jaas.config.flag=1 \ No newline at end of file
diff --git a/dos-detection/pom.xml b/dos-detection/pom.xml
new file mode 100644
index 0000000..740d0aa
--- /dev/null
+++ b/dos-detection/pom.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-stream-schedule-platform</artifactId>
+ <groupId>com.zdjizhi</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dos-detection</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>base-platform</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/common/CommonConfig.java b/dos-detection/src/main/java/com/zdjizhi/dos/common/CommonConfig.java
new file mode 100644
index 0000000..b8bc0f2
--- /dev/null
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/common/CommonConfig.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.dos.common;
+import com.zdjizhi.base.utils.CommonConfigurations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * @author wlh
+ * @date 2021/1/6
+ */
+public class CommonConfig {
+
+ private static CommonConfigurations configurations;
+ private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class);
+
+ static {
+ Properties propService;
+ try {
+ propService = new Properties();
+ propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("dos-detection.properties"));
+ configurations = new CommonConfigurations(propService);
+ } catch (Exception e) {
+ logger.error("加载dos-detection.properties配置文件失败");
+ System.exit(1);
+ }
+ }
+
+ public static final int KAFKA_OUTPUT_METRIC_PARALLELISM = configurations.getIntProperty("kafka.output.metric.parallelism");
+ public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = configurations.getStringProperty("kafka.output.metric.topic.name");
+ public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = configurations.getIntProperty("kafka.output.event.parallelism");
+ public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = configurations.getStringProperty("kafka.output.event.topic.name");
+
+ public static final String HBASE_ZOOKEEPER_QUORUM = configurations.getStringProperty("hbase.zookeeper.quorum");
+ public static final int HBASE_CLIENT_OPERATION_TIMEOUT = configurations.getIntProperty("hbase.client.operation.timeout");
+ public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = configurations.getIntProperty("hbase.client.scanner.timeout.period");
+
+ public static final String HBASE_BASELINE_TABLE_NAME = configurations.getStringProperty("hbase.baseline.table.name");
+ public static final int HBASE_BASELINE_TOTAL_NUM = configurations.getIntProperty("hbase.baseline.total.num");
+ public static final int HBASE_BASELINE_TTL = configurations.getIntProperty("hbase.baseline.ttl");
+
+ public static final int FLINK_FIRST_AGG_PARALLELISM = configurations.getIntProperty("flink.first.agg.parallelism");
+ public static final int FLINK_DETECTION_MAP_PARALLELISM = configurations.getIntProperty("flink.detection.map.parallelism");
+ public static final int FLINK_WATERMARK_MAX_ORDERNESS = configurations.getIntProperty("flink.watermark.max.orderness");
+ public static final int FLINK_WINDOW_MAX_TIME = configurations.getIntProperty("flink.window.max.time");
+
+ public static final int SOURCE_IP_LIST_LIMIT = configurations.getIntProperty("source.ip.list.limit");
+ public static final int DESTINATION_IP_PARTITION_NUM = configurations.getIntProperty("destination.ip.partition.num");
+ public static final int DATA_CENTER_ID_NUM = configurations.getIntProperty("data.center.id.num");
+
+ public static final String IP_MMDB_PATH = configurations.getStringProperty("ip.mmdb.path");
+
+ public static final int STATIC_SENSITIVITY_THRESHOLD = configurations.getIntProperty("static.sensitivity.threshold");
+ public static final double BASELINE_SENSITIVITY_THRESHOLD = configurations.getDoubleProperty("baseline.sensitivity.threshold");
+
+ public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.minor.threshold");
+ public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.warning.threshold");
+ public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.major.threshold");
+ public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.severe.threshold");
+ public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.critical.threshold");
+
+ public static final String BIFANG_SERVER_URI = configurations.getStringProperty("bifang.server.uri");
+ public static final String BIFANG_SERVER_TOKEN = configurations.getStringProperty("bifang.server.token");
+ public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = configurations.getStringProperty("bifang.server.encryptpwd.path");
+ public static final String BIFANG_SERVER_LOGIN_PATH = configurations.getStringProperty("bifang.server.login.path");
+ public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = configurations.getStringProperty("bifang.server.policy.threshold.path");
+
+ public static final int HTTP_POOL_MAX_CONNECTION = configurations.getIntProperty("http.pool.max.connection");
+ public static final int HTTP_POOL_MAX_PER_ROUTE = configurations.getIntProperty("http.pool.max.per.route");
+ public static final int HTTP_POOL_REQUEST_TIMEOUT = configurations.getIntProperty("http.pool.request.timeout");
+ public static final int HTTP_POOL_CONNECT_TIMEOUT = configurations.getIntProperty("http.pool.connect.timeout");
+ public static final int HTTP_POOL_RESPONSE_TIMEOUT = configurations.getIntProperty("http.pool.response.timeout");
+
+ public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = configurations.getIntProperty("static.threshold.schedule.minutes");
+ public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = configurations.getIntProperty("baseline.threshold.schedule.days");
+
+}
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
new file mode 100644
index 0000000..ec80146
--- /dev/null
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
@@ -0,0 +1,15 @@
+package com.zdjizhi.dos.sink;
+
+import com.zdjizhi.base.etl.DosDetectionEtl;
+import com.zdjizhi.base.platform.Schedule;
+
+/**
+ * @author 94976
+ */
+public class OutputStreamSink implements Schedule {
+
+ @Override
+ public void schedule() throws Exception {
+ DosDetectionEtl.getSketchSource().print();
+ }
+}
diff --git a/dos-detection/src/main/resources/dos-detection.properties b/dos-detection/src/main/resources/dos-detection.properties
new file mode 100644
index 0000000..2af2a12
--- /dev/null
+++ b/dos-detection/src/main/resources/dos-detection.properties
@@ -0,0 +1,105 @@
+#发送kafka metrics并行度大小
+kafka.output.metric.parallelism=1
+
+#发送kafka metrics topic名
+#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
+kafka.output.metric.topic.name=test
+
+#发送kafka event并行度大小
+kafka.output.event.parallelism=1
+
+#发送kafka event topic名
+kafka.output.event.topic.name=dos-test
+
+#zookeeper地址
+hbase.zookeeper.quorum=192.168.44.12:2181
+#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+
+#hbase客户端处理时间
+hbase.client.operation.timeout=30000
+hbase.client.scanner.timeout.period=30000
+
+##hbase baseline表名
+hbase.baseline.table.name=dos:ddos_traffic_baselines
+
+#读取baseline限制
+hbase.baseline.total.num=1000000
+
+#baseline ttl,单位:天
+hbase.baseline.ttl=30
+
+#设置聚合并行度,2个key
+flink.first.agg.parallelism=1
+
+#设置结果判定并行度
+flink.detection.map.parallelism=1
+
+#watermark延迟
+flink.watermark.max.orderness=10
+
+#计算窗口大小,默认600s
+flink.window.max.time=10
+
+#dos event结果中distinct source IP限制
+source.ip.list.limit=10000
+
+#基于目的IP的分区数,默认为10000,一般不变
+destination.ip.partition.num=10000
+
+data.center.id.num=15
+
+#IP mmdb库路径
+ip.mmdb.path=D:\\data\\dat\\
+#ip.mmdb.path=/home/bigdata/topology/dat/
+#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
+
+#静态敏感阈值,速率小于此值不报警
+static.sensitivity.threshold=500
+
+#基线敏感阈值
+baseline.sensitivity.threshold=0.2
+
+#基于baseline判定dos攻击的上下限
+baseline.sessions.minor.threshold=0.5
+baseline.sessions.warning.threshold=1
+baseline.sessions.major.threshold=2.5
+baseline.sessions.severe.threshold=5
+baseline.sessions.critical.threshold=8
+
+#bifang服务访问地址
+#bifang.server.uri=http://192.168.44.72:80
+bifang.server.uri=http://192.168.44.3:80
+
+#访问bifang只读权限token,bifang内置,无需修改
+bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867
+
+#加密密码路径信息
+bifang.server.encryptpwd.path=/v1/user/encryptpwd
+
+#登录bifang服务路径信息
+bifang.server.login.path=/v1/user/login
+
+#获取静态阈值路径信息
+bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
+
+#http请求相关参数
+#最大连接数
+http.pool.max.connection=400
+
+#单路由最大连接数
+http.pool.max.per.route=80
+
+#向服务端请求超时时间设置(单位:毫秒)
+http.pool.request.timeout=60000
+
+#向服务端连接超时时间设置(单位:毫秒)
+http.pool.connect.timeout=60000
+
+#服务端响应超时时间设置(单位:毫秒)
+http.pool.response.timeout=60000
+
+#获取静态阈值周期,默认十分钟
+static.threshold.schedule.minutes=10
+
+#获取baseline周期,默认7天
+baseline.threshold.schedule.days=1 \ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..bc8a6c2
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>flink-stream-schedule-platform</artifactId>
+ <packaging>pom</packaging>
+ <version>1.0-SNAPSHOT</version>
+ <modules>
+ <module>dos-detection</module>
+ <module>base-platform</module>
+ <module>schedule-platform</module>
+ </modules>
+
+ <properties>
+ <flink.version>1.13.1</flink.version>
+ <zdjizhi.version>1.0.6</zdjizhi.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+
+ <repository>
+ <id>ebi</id>
+ <name>www.ebi.ac.uk</name>
+ <url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url>
+ </repository>
+
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.12</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>${zdjizhi.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <!-- The semantics of this option are reversed, see MCOMPILER-209. -->
+ <useIncrementalCompilation>false</useIncrementalCompilation>
+ <compilerArgs>
+ <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
+ <arg>-Xpkginfo:always</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/schedule-platform/pom.xml b/schedule-platform/pom.xml
new file mode 100644
index 0000000..9cabe83
--- /dev/null
+++ b/schedule-platform/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-stream-schedule-platform</artifactId>
+ <groupId>com.zdjizhi</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>schedule-platform</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>dos-detection</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+
+</project> \ No newline at end of file
diff --git a/schedule-platform/src/main/java/com/zdjizhi/schedule/Execute.java b/schedule-platform/src/main/java/com/zdjizhi/schedule/Execute.java
new file mode 100644
index 0000000..be6ab38
--- /dev/null
+++ b/schedule-platform/src/main/java/com/zdjizhi/schedule/Execute.java
@@ -0,0 +1,29 @@
+package com.zdjizhi.schedule;
+
+import com.zdjizhi.base.common.CommonConfig;
+import com.zdjizhi.base.platform.Schedule;
+import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
+
+import java.util.Properties;
+
+public class Execute {
+
+ public static void main(String[] args) throws Exception {
+ execute();
+ }
+
+ private static void execute() throws Exception {
+ Properties propService = new Properties();
+ propService.load(Execute.class.getClassLoader().getResourceAsStream("business.properties"));
+ if (!propService.isEmpty()){
+ for (Object key : propService.keySet()) {
+ String className = propService.getProperty(key.toString());
+ Class cls = Class.forName(className);
+ Schedule schedule = (Schedule) cls.newInstance();
+ schedule.schedule();
+ }
+ FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
+ }
+ }
+
+}
diff --git a/schedule-platform/src/main/resources/business.properties b/schedule-platform/src/main/resources/business.properties
new file mode 100644
index 0000000..c69e1d4
--- /dev/null
+++ b/schedule-platform/src/main/resources/business.properties
@@ -0,0 +1 @@
+dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink \ No newline at end of file