diff options
| author | wanglihui <[email protected]> | 2021-12-27 18:01:21 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-12-27 18:01:21 +0800 |
| commit | 15f9cb1e2f78d5ea9803eac8b5d9b8c40e8d5a26 (patch) | |
| tree | 6be8ccf4b827cc2f57c25dcd22597ea4581c7743 | |
first commit
20 files changed, 851 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..249425e --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +# Created by .ignore support plugin (hsz.mobi) +*.class + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear +*.zip + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + + +target/ +!.mvn/wrapper/maven-wrapper.jar + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +nbproject/private/ +builds/ +nbbuild/ +dist/ +nbdist/ +.nb-gradle/ + +log/ +logs/
\ No newline at end of file diff --git a/base-platform/pom.xml b/base-platform/pom.xml new file mode 100644 index 0000000..7cd098d --- /dev/null +++ b/base-platform/pom.xml @@ -0,0 +1,14 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-stream-schedule-platform</artifactId> + <groupId>com.zdjizhi</groupId> + <version>1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>base-platform</artifactId> + +</project>
\ No newline at end of file diff --git a/base-platform/src/main/java/com/zdjizhi/base/common/CommonConfig.java b/base-platform/src/main/java/com/zdjizhi/base/common/CommonConfig.java new file mode 100644 index 0000000..8bbb749 --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/common/CommonConfig.java @@ -0,0 +1,47 @@ +package com.zdjizhi.base.common; + +import com.zdjizhi.base.utils.CommonConfigurations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * @author wlh + * @date 2021/1/6 + */ +public class CommonConfig { + + private static CommonConfigurations configurations; + private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class); + + static { + Properties propService; + try { + propService = new Properties(); + propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties")); + configurations = new CommonConfigurations(propService); + } catch (Exception e) { + logger.error("加载common.properties配置文件失败"); + System.exit(1); + } + } + + public static final int STREAM_EXECUTION_ENVIRONMENT_PARALLELISM = configurations.getIntProperty("stream.execution.environment.parallelism"); + public static final String STREAM_EXECUTION_JOB_NAME = configurations.getStringProperty("stream.execution.job.name"); + + public static final int KAFKA_INPUT_PARALLELISM = configurations.getIntProperty("kafka.input.parallelism"); + public static final String KAFKA_INPUT_TOPIC_NAME = configurations.getStringProperty("kafka.input.topic.name"); + public static final String KAFKA_INPUT_BOOTSTRAP_SERVERS = configurations.getStringProperty("kafka.input.bootstrap.servers"); + public static final String KAFKA_GROUP_ID = configurations.getStringProperty("kafka.input.group.id"); + + public static final int KAFKA_OUTPUT_PARALLELISM = configurations.getIntProperty("kafka.output.parallelism"); + public static final String KAFKA_OUTPUT_TOPIC_NAME = configurations.getStringProperty("kafka.output.topic.name"); + public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = configurations.getStringProperty("kafka.output.bootstrap.servers"); + + public static final String SASL_JAAS_CONFIG_USER = configurations.getStringProperty("sasl.jaas.config.user"); + public static final String SASL_JAAS_CONFIG_PASSWORD = configurations.getStringProperty("sasl.jaas.config.password"); + + public static final int SASL_JAAS_CONFIG_FLAG = configurations.getIntProperty("sasl.jaas.config.flag"); + +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/common/DosSketchLog.java b/base-platform/src/main/java/com/zdjizhi/base/common/DosSketchLog.java new file mode 100644 index 0000000..6c023da --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/common/DosSketchLog.java @@ -0,0 +1,140 @@ +package com.zdjizhi.base.common; + +import java.io.Serializable; +import java.util.Objects; + +public class DosSketchLog implements Serializable { + + private String common_sled_ip; + private String common_data_center; + private long sketch_start_time; + private long sketch_duration; + private String attack_type; + private String source_ip; + private String destination_ip; + private long sketch_sessions; + private long sketch_packets; + private long sketch_bytes; + + @Override + public String toString() { + return "DosSketchLog{" + + "common_sled_ip='" + common_sled_ip + '\'' + + ", common_data_center='" + common_data_center + '\'' + + ", sketch_start_time=" + sketch_start_time + + ", sketch_duration=" + sketch_duration + + ", attack_type='" + attack_type + '\'' + + ", source_ip='" + source_ip + '\'' + + ", destination_ip='" + destination_ip + '\'' + + ", sketch_sessions=" + sketch_sessions + + ", sketch_packets=" + sketch_packets + + ", sketch_bytes=" + sketch_bytes + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DosSketchLog)) { + return false; + } + DosSketchLog sketchLog = (DosSketchLog) o; + return getSketch_start_time() == sketchLog.getSketch_start_time() && + getSketch_duration() == sketchLog.getSketch_duration() && + getSketch_sessions() == sketchLog.getSketch_sessions() && + getSketch_packets() == sketchLog.getSketch_packets() && + getSketch_bytes() == sketchLog.getSketch_bytes() && + Objects.equals(getCommon_sled_ip(), sketchLog.getCommon_sled_ip()) && + Objects.equals(getCommon_data_center(), sketchLog.getCommon_data_center()) && + Objects.equals(getAttack_type(), sketchLog.getAttack_type()) && + Objects.equals(getSource_ip(), sketchLog.getSource_ip()) && + Objects.equals(getDestination_ip(), sketchLog.getDestination_ip()); + } + + @Override + public int hashCode() { + return Objects.hash(getCommon_sled_ip(), getCommon_data_center(), getSketch_start_time(), getSketch_duration(), getAttack_type(), getSource_ip(), getDestination_ip(), getSketch_sessions(), getSketch_packets(), getSketch_bytes()); + } + + public String getCommon_sled_ip() { + return common_sled_ip; + } + + public void setCommon_sled_ip(String common_sled_ip) { + this.common_sled_ip = common_sled_ip; + } + + public String getCommon_data_center() { + return common_data_center; + } + + public void setCommon_data_center(String common_data_center) { + this.common_data_center = common_data_center; + } + + public long getSketch_start_time() { + return sketch_start_time; + } + + public void setSketch_start_time(long sketch_start_time) { + this.sketch_start_time = sketch_start_time; + } + + public long getSketch_duration() { + return sketch_duration; + } + + public void setSketch_duration(long sketch_duration) { + this.sketch_duration = sketch_duration; + } + + public String getAttack_type() { + return attack_type; + } + + public void setAttack_type(String attack_type) { + this.attack_type = attack_type; + } + + public String getSource_ip() { + return source_ip; + } + + public void setSource_ip(String source_ip) { + this.source_ip = source_ip; + } + + public String getDestination_ip() { + return destination_ip; + } + + public void setDestination_ip(String destination_ip) { + this.destination_ip = destination_ip; + } + + public long getSketch_sessions() { + return sketch_sessions; + } + + public void setSketch_sessions(long sketch_sessions) { + this.sketch_sessions = sketch_sessions; + } + + public long getSketch_packets() { + return sketch_packets; + } + + public void setSketch_packets(long sketch_packets) { + this.sketch_packets = sketch_packets; + } + + public long getSketch_bytes() { + return sketch_bytes; + } + + public void setSketch_bytes(long sketch_bytes) { + this.sketch_bytes = sketch_bytes; + } +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java b/base-platform/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java new file mode 100644 index 0000000..69dbd45 --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java @@ -0,0 +1,76 @@ +package com.zdjizhi.base.etl; + +import com.fasterxml.jackson.databind.JavaType; +import com.zdjizhi.base.common.DosSketchLog; +import com.zdjizhi.base.source.Source; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; + +public class DosDetectionEtl { + + private static Logger logger = LoggerFactory.getLogger(DosDetectionEtl.class); + private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); + private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class); + private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class); + + + public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){ + return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy()); + } + + private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){ + return Source.createSource().flatMap(new FlatSketchLog()); + } + + private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){ + return WatermarkStrategy + .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(10)) + .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000); + } + + private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> { + @Override + public void flatMap(String s, Collector<DosSketchLog> collector) { + try { + if (StringUtil.isNotBlank(s)){ + HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType); + long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); + long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); + String attackType = sketchSource.get("attack_type").toString(); + ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType); + for (HashMap<String, Object> obj : reportIpList) { + DosSketchLog dosSketchLog = new DosSketchLog(); + dosSketchLog.setSketch_start_time(sketchStartTime); + dosSketchLog.setSketch_duration(sketchDuration); + dosSketchLog.setAttack_type(attackType); + String sourceIp = obj.get("source_ip").toString(); + String destinationIp = obj.get("destination_ip").toString(); + long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString()); + long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString()); + long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString()); + dosSketchLog.setSource_ip(sourceIp); + dosSketchLog.setDestination_ip(destinationIp); + dosSketchLog.setSketch_sessions(sketchSessions); + dosSketchLog.setSketch_packets(sketchPackets); + dosSketchLog.setSketch_bytes(sketchBytes); + collector.collect(dosSketchLog); + logger.debug("数据解析成功:{}",dosSketchLog.toString()); + } + } + } catch (Exception e) { + logger.error("数据解析错误:{} \n{}",s,e); + } + } + } + +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/platform/Schedule.java b/base-platform/src/main/java/com/zdjizhi/base/platform/Schedule.java new file mode 100644 index 0000000..d1567be --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/platform/Schedule.java @@ -0,0 +1,7 @@ +package com.zdjizhi.base.platform; + +public interface Schedule { + + void schedule() throws Exception; + +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/sink/Sink.java b/base-platform/src/main/java/com/zdjizhi/base/sink/Sink.java new file mode 100644 index 0000000..0aaaaf8 --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/sink/Sink.java @@ -0,0 +1,4 @@ +package com.zdjizhi.base.sink; + +public class Sink { +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/source/Source.java b/base-platform/src/main/java/com/zdjizhi/base/source/Source.java new file mode 100644 index 0000000..643909f --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/source/Source.java @@ -0,0 +1,18 @@ +package com.zdjizhi.base.source; + +import com.zdjizhi.base.common.CommonConfig; +import com.zdjizhi.base.utils.FlinkEnvironmentUtils; +import com.zdjizhi.base.utils.KafkaUtils; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class Source { + private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv; + + public static DataStreamSource<String> createSource(){ + + return streamExeEnv.addSource(KafkaUtils.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME)) + .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM); + } + +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/utils/CommonConfigurations.java b/base-platform/src/main/java/com/zdjizhi/base/utils/CommonConfigurations.java new file mode 100644 index 0000000..583dbdd --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/utils/CommonConfigurations.java @@ -0,0 +1,38 @@ +package com.zdjizhi.base.utils; + +import java.util.Properties; + +public final class CommonConfigurations { + + private Properties propService; + + public CommonConfigurations(Properties propService){ + this.propService = propService; + } + + + public String getStringProperty(String key) { + return propService.getProperty(key); + } + + public Integer getIntProperty(String key) { + + return Integer.parseInt(propService.getProperty(key)); + + } + + public Double getDoubleProperty(String key) { + + return Double.parseDouble(propService.getProperty(key)); + + } + + public Long getLongProperty(String key) { + return Long.parseLong(propService.getProperty(key)); + + } + + public Boolean getBooleanProperty(Integer type, String key) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); + } +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java b/base-platform/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java new file mode 100644 index 0000000..d15d81e --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/utils/FlinkEnvironmentUtils.java @@ -0,0 +1,17 @@ +package com.zdjizhi.base.utils; + +import com.zdjizhi.base.common.CommonConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + + +/** + * @author wlh + */ +public class FlinkEnvironmentUtils { + public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + + static { + streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); + } + +} diff --git a/base-platform/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/base-platform/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java new file mode 100644 index 0000000..fa4b2f0 --- /dev/null +++ b/base-platform/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java @@ -0,0 +1,52 @@ +package com.zdjizhi.base.utils; + +import com.zdjizhi.base.common.CommonConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import java.util.Optional; +import java.util.Properties; + +public class KafkaUtils { + + private static Properties getKafkaSinkProperty() { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); + updateSasl(properties); + return properties; + } + + private static Properties getKafkaSourceProperty() { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS); + properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID); + updateSasl(properties); + return properties; + } + + private static void updateSasl(Properties properties) { + if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + CommonConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + CommonConfig.SASL_JAAS_CONFIG_PASSWORD + "\";"); + } + } + + public static FlinkKafkaProducer<String> getKafkaSink(String topic) { + return new FlinkKafkaProducer<String>( + topic, + new SimpleStringSchema(), + getKafkaSinkProperty(), + Optional.empty() + ); + } + + public static FlinkKafkaConsumer<String> createSource(String topic){ + return new FlinkKafkaConsumer<String>( + topic, + new SimpleStringSchema(), + getKafkaSourceProperty()); + } + +} diff --git a/base-platform/src/main/resources/common.properties b/base-platform/src/main/resources/common.properties new file mode 100644 index 0000000..4e52f6c --- /dev/null +++ b/base-platform/src/main/resources/common.properties @@ -0,0 +1,33 @@ +#flink运行环境并行度,其优先级低于算子并行度,如果未设置算子并行度,则使用该数值 +stream.execution.environment.parallelism=1 + +#flink任务名,一般不变 +stream.execution.job.name=FLINK-STREAM-SCHEDULE-PLATFORM + +#输入kafka并行度大小 +kafka.input.parallelism=1 + +#输入kafka topic名 +kafka.input.topic.name=DOS-SKETCH-RECORD + +#输入kafka地址 +kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 + +#读取kafka group id +kafka.input.group.id=2112080949 + +#发送kafka metrics并行度大小 +kafka.output.parallelism=1 + +#发送kafka metrics topic名 +kafka.output.topic.name=test + +#kafka输出地址 +kafka.output.bootstrap.servers=192.168.44.12:9094 + +#kafka用户认证配置参数 +sasl.jaas.config.user=admin +sasl.jaas.config.password=galaxy2019 + +#是否开启kafka用户认证配置,1:是;0:否 +sasl.jaas.config.flag=1
\ No newline at end of file diff --git a/dos-detection/pom.xml b/dos-detection/pom.xml new file mode 100644 index 0000000..740d0aa --- /dev/null +++ b/dos-detection/pom.xml @@ -0,0 +1,22 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-stream-schedule-platform</artifactId> + <groupId>com.zdjizhi</groupId> + <version>1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>dos-detection</artifactId> + + <dependencies> + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>base-platform</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> + </dependencies> + +</project>
\ No newline at end of file diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/common/CommonConfig.java b/dos-detection/src/main/java/com/zdjizhi/dos/common/CommonConfig.java new file mode 100644 index 0000000..b8bc0f2 --- /dev/null +++ b/dos-detection/src/main/java/com/zdjizhi/dos/common/CommonConfig.java @@ -0,0 +1,77 @@ +package com.zdjizhi.dos.common; +import com.zdjizhi.base.utils.CommonConfigurations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * @author wlh + * @date 2021/1/6 + */ +public class CommonConfig { + + private static CommonConfigurations configurations; + private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class); + + static { + Properties propService; + try { + propService = new Properties(); + propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("dos-detection.properties")); + configurations = new CommonConfigurations(propService); + } catch (Exception e) { + logger.error("加载dos-detection.properties配置文件失败"); + System.exit(1); + } + } + + public static final int KAFKA_OUTPUT_METRIC_PARALLELISM = configurations.getIntProperty("kafka.output.metric.parallelism"); + public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = configurations.getStringProperty("kafka.output.metric.topic.name"); + public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = configurations.getIntProperty("kafka.output.event.parallelism"); + public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = configurations.getStringProperty("kafka.output.event.topic.name"); + + public static final String HBASE_ZOOKEEPER_QUORUM = configurations.getStringProperty("hbase.zookeeper.quorum"); + public static final int HBASE_CLIENT_OPERATION_TIMEOUT = configurations.getIntProperty("hbase.client.operation.timeout"); + public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = configurations.getIntProperty("hbase.client.scanner.timeout.period"); + + public static final String HBASE_BASELINE_TABLE_NAME = configurations.getStringProperty("hbase.baseline.table.name"); + public static final int HBASE_BASELINE_TOTAL_NUM = configurations.getIntProperty("hbase.baseline.total.num"); + public static final int HBASE_BASELINE_TTL = configurations.getIntProperty("hbase.baseline.ttl"); + + public static final int FLINK_FIRST_AGG_PARALLELISM = configurations.getIntProperty("flink.first.agg.parallelism"); + public static final int FLINK_DETECTION_MAP_PARALLELISM = configurations.getIntProperty("flink.detection.map.parallelism"); + public static final int FLINK_WATERMARK_MAX_ORDERNESS = configurations.getIntProperty("flink.watermark.max.orderness"); + public static final int FLINK_WINDOW_MAX_TIME = configurations.getIntProperty("flink.window.max.time"); + + public static final int SOURCE_IP_LIST_LIMIT = configurations.getIntProperty("source.ip.list.limit"); + public static final int DESTINATION_IP_PARTITION_NUM = configurations.getIntProperty("destination.ip.partition.num"); + public static final int DATA_CENTER_ID_NUM = configurations.getIntProperty("data.center.id.num"); + + public static final String IP_MMDB_PATH = configurations.getStringProperty("ip.mmdb.path"); + + public static final int STATIC_SENSITIVITY_THRESHOLD = configurations.getIntProperty("static.sensitivity.threshold"); + public static final double BASELINE_SENSITIVITY_THRESHOLD = configurations.getDoubleProperty("baseline.sensitivity.threshold"); + + public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.minor.threshold"); + public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.warning.threshold"); + public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.major.threshold"); + public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.severe.threshold"); + public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = configurations.getDoubleProperty("baseline.sessions.critical.threshold"); + + public static final String BIFANG_SERVER_URI = configurations.getStringProperty("bifang.server.uri"); + public static final String BIFANG_SERVER_TOKEN = configurations.getStringProperty("bifang.server.token"); + public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = configurations.getStringProperty("bifang.server.encryptpwd.path"); + public static final String BIFANG_SERVER_LOGIN_PATH = configurations.getStringProperty("bifang.server.login.path"); + public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = configurations.getStringProperty("bifang.server.policy.threshold.path"); + + public static final int HTTP_POOL_MAX_CONNECTION = configurations.getIntProperty("http.pool.max.connection"); + public static final int HTTP_POOL_MAX_PER_ROUTE = configurations.getIntProperty("http.pool.max.per.route"); + public static final int HTTP_POOL_REQUEST_TIMEOUT = configurations.getIntProperty("http.pool.request.timeout"); + public static final int HTTP_POOL_CONNECT_TIMEOUT = configurations.getIntProperty("http.pool.connect.timeout"); + public static final int HTTP_POOL_RESPONSE_TIMEOUT = configurations.getIntProperty("http.pool.response.timeout"); + + public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = configurations.getIntProperty("static.threshold.schedule.minutes"); + public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = configurations.getIntProperty("baseline.threshold.schedule.days"); + +} diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java new file mode 100644 index 0000000..ec80146 --- /dev/null +++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java @@ -0,0 +1,15 @@ +package com.zdjizhi.dos.sink; + +import com.zdjizhi.base.etl.DosDetectionEtl; +import com.zdjizhi.base.platform.Schedule; + +/** + * @author 94976 + */ +public class OutputStreamSink implements Schedule { + + @Override + public void schedule() throws Exception { + DosDetectionEtl.getSketchSource().print(); + } +} diff --git a/dos-detection/src/main/resources/dos-detection.properties b/dos-detection/src/main/resources/dos-detection.properties new file mode 100644 index 0000000..2af2a12 --- /dev/null +++ b/dos-detection/src/main/resources/dos-detection.properties @@ -0,0 +1,105 @@ +#发送kafka metrics并行度大小 +kafka.output.metric.parallelism=1 + +#发送kafka metrics topic名 +#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS +kafka.output.metric.topic.name=test + +#发送kafka event并行度大小 +kafka.output.event.parallelism=1 + +#发送kafka event topic名 +kafka.output.event.topic.name=dos-test + +#zookeeper地址 +hbase.zookeeper.quorum=192.168.44.12:2181 +#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 + +#hbase客户端处理时间 +hbase.client.operation.timeout=30000 +hbase.client.scanner.timeout.period=30000 + +##hbase baseline表名 +hbase.baseline.table.name=dos:ddos_traffic_baselines + +#读取baseline限制 +hbase.baseline.total.num=1000000 + +#baseline ttl,单位:天 +hbase.baseline.ttl=30 + +#设置聚合并行度,2个key +flink.first.agg.parallelism=1 + +#设置结果判定并行度 +flink.detection.map.parallelism=1 + +#watermark延迟 +flink.watermark.max.orderness=10 + +#计算窗口大小,默认600s +flink.window.max.time=10 + +#dos event结果中distinct source IP限制 +source.ip.list.limit=10000 + +#基于目的IP的分区数,默认为10000,一般不变 +destination.ip.partition.num=10000 + +data.center.id.num=15 + +#IP mmdb库路径 +ip.mmdb.path=D:\\data\\dat\\ +#ip.mmdb.path=/home/bigdata/topology/dat/ +#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ + +#静态敏感阈值,速率小于此值不报警 +static.sensitivity.threshold=500 + +#基线敏感阈值 +baseline.sensitivity.threshold=0.2 + +#基于baseline判定dos攻击的上下限 +baseline.sessions.minor.threshold=0.5 +baseline.sessions.warning.threshold=1 +baseline.sessions.major.threshold=2.5 +baseline.sessions.severe.threshold=5 +baseline.sessions.critical.threshold=8 + +#bifang服务访问地址 +#bifang.server.uri=http://192.168.44.72:80 +bifang.server.uri=http://192.168.44.3:80 + +#访问bifang只读权限token,bifang内置,无需修改 +bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867 + +#加密密码路径信息 +bifang.server.encryptpwd.path=/v1/user/encryptpwd + +#登录bifang服务路径信息 +bifang.server.login.path=/v1/user/login + +#获取静态阈值路径信息 +bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold + +#http请求相关参数 +#最大连接数 +http.pool.max.connection=400 + +#单路由最大连接数 +http.pool.max.per.route=80 + +#向服务端请求超时时间设置(单位:毫秒) +http.pool.request.timeout=60000 + +#向服务端连接超时时间设置(单位:毫秒) +http.pool.connect.timeout=60000 + +#服务端响应超时时间设置(单位:毫秒) +http.pool.response.timeout=60000 + +#获取静态阈值周期,默认十分钟 +static.threshold.schedule.minutes=10 + +#获取baseline周期,默认7天 +baseline.threshold.schedule.days=1
\ No newline at end of file @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.zdjizhi</groupId> + <artifactId>flink-stream-schedule-platform</artifactId> + <packaging>pom</packaging> + <version>1.0-SNAPSHOT</version> + <modules> + <module>dos-detection</module> + <module>base-platform</module> + <module>schedule-platform</module> + </modules> + + <properties> + <flink.version>1.13.1</flink.version> + <zdjizhi.version>1.0.6</zdjizhi.version> + </properties> + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + + <repository> + <id>ebi</id> + <name>www.ebi.ac.uk</name> + <url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url> + </repository> + + <repository> + <id>maven-ali</id> + <url>http://maven.aliyun.com/nexus/content/groups/public/</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + <updatePolicy>always</updatePolicy> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + </repository> + + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_2.12</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.12</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>${zdjizhi.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.0</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + <!-- The semantics of this option are reversed, see MCOMPILER-209. --> + <useIncrementalCompilation>false</useIncrementalCompilation> + <compilerArgs> + <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 --> + <arg>-Xpkginfo:always</arg> + </compilerArgs> + </configuration> + </plugin> + </plugins> + </build> + +</project>
\ No newline at end of file diff --git a/schedule-platform/pom.xml b/schedule-platform/pom.xml new file mode 100644 index 0000000..9cabe83 --- /dev/null +++ b/schedule-platform/pom.xml @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-stream-schedule-platform</artifactId> + <groupId>com.zdjizhi</groupId> + <version>1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>schedule-platform</artifactId> + + <dependencies> + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>dos-detection</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> + </dependencies> + + +</project>
\ No newline at end of file diff --git a/schedule-platform/src/main/java/com/zdjizhi/schedule/Execute.java b/schedule-platform/src/main/java/com/zdjizhi/schedule/Execute.java new file mode 100644 index 0000000..be6ab38 --- /dev/null +++ b/schedule-platform/src/main/java/com/zdjizhi/schedule/Execute.java @@ -0,0 +1,29 @@ +package com.zdjizhi.schedule; + +import com.zdjizhi.base.common.CommonConfig; +import com.zdjizhi.base.platform.Schedule; +import com.zdjizhi.base.utils.FlinkEnvironmentUtils; + +import java.util.Properties; + +public class Execute { + + public static void main(String[] args) throws Exception { + execute(); + } + + private static void execute() throws Exception { + Properties propService = new Properties(); + propService.load(Execute.class.getClassLoader().getResourceAsStream("business.properties")); + if (!propService.isEmpty()){ + for (Object key : propService.keySet()) { + String className = propService.getProperty(key.toString()); + Class cls = Class.forName(className); + Schedule schedule = (Schedule) cls.newInstance(); + schedule.schedule(); + } + FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME); + } + } + +} diff --git a/schedule-platform/src/main/resources/business.properties b/schedule-platform/src/main/resources/business.properties new file mode 100644 index 0000000..c69e1d4 --- /dev/null +++ b/schedule-platform/src/main/resources/business.properties @@ -0,0 +1 @@ +dos.detection.task.class=com.zdjizhi.dos.sink.OutputStreamSink
\ No newline at end of file |
