diff options
| author | wanglihui <[email protected]> | 2021-12-28 18:33:40 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-12-28 18:33:40 +0800 |
| commit | b31906b71748a1eafdbfe73731ac0119b2cf7660 (patch) | |
| tree | 72c94aefc94a0f39fbc1bd79fca1d22a726ef2e1 | |
| parent | 2cb7dcba4bcd41355b8be53c13a493d8371a64fc (diff) | |
抽取etl module
| -rw-r--r-- | dos-detection/pom.xml | 8 | ||||
| -rw-r--r-- | dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java | 2 | ||||
| -rw-r--r-- | platform-base/src/main/java/com/zdjizhi/base/source/Source.java | 19 | ||||
| -rw-r--r-- | platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java | 6 | ||||
| -rw-r--r-- | platform-etl/pom.xml | 23 | ||||
| -rw-r--r-- | platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java (renamed from platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java) | 5 | ||||
| -rw-r--r-- | pom.xml | 1 |
7 files changed, 56 insertions, 8 deletions
diff --git a/dos-detection/pom.xml b/dos-detection/pom.xml index 740d0aa..4cf6291 100644 --- a/dos-detection/pom.xml +++ b/dos-detection/pom.xml @@ -14,7 +14,13 @@ <dependencies> <dependency> <groupId>com.zdjizhi</groupId> - <artifactId>base-platform</artifactId> + <artifactId>platform-base</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>platform-etl</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java index ec80146..d9d13b3 100644 --- a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java +++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java @@ -1,7 +1,7 @@ package com.zdjizhi.dos.sink; -import com.zdjizhi.base.etl.DosDetectionEtl; import com.zdjizhi.base.platform.Schedule; +import com.zdjizhi.etl.DosDetectionEtl; /** * @author 94976 diff --git a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java index 643909f..9182517 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java +++ b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java @@ -6,13 +6,26 @@ import com.zdjizhi.base.utils.KafkaUtils; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import java.util.Properties; + public class Source { private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv; - public static DataStreamSource<String> createSource(){ + public static DataStreamSource<String> createSource(String topic){ + + return createSource(topic,CommonConfig.KAFKA_INPUT_PARALLELISM); + } + + public static DataStreamSource<String> createSource(String topic,int parallelism){ + + return streamExeEnv.addSource(KafkaUtils.createSource(topic)) + .setParallelism(parallelism); + } + + public static DataStreamSource<String> createSource(String topic, int parallelism, Properties properties){ - return streamExeEnv.addSource(KafkaUtils.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME)) - .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM); + return streamExeEnv.addSource(KafkaUtils.createSource(topic,properties)) + .setParallelism(parallelism); } } diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java index fa4b2f0..b7173ef 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java +++ b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java @@ -43,10 +43,14 @@ public class KafkaUtils { } public static FlinkKafkaConsumer<String> createSource(String topic){ + return createSource(topic,getKafkaSourceProperty()); + } + + public static FlinkKafkaConsumer<String> createSource(String topic,Properties properties){ return new FlinkKafkaConsumer<String>( topic, new SimpleStringSchema(), - getKafkaSourceProperty()); + properties); } } diff --git a/platform-etl/pom.xml b/platform-etl/pom.xml new file mode 100644 index 0000000..d9effb7 --- /dev/null +++ b/platform-etl/pom.xml @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-stream-schedule-platform</artifactId> + <groupId>com.zdjizhi</groupId> + <version>1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>platform-etl</artifactId> + + <dependencies> + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>platform-base</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> + </dependencies> + + +</project>
\ No newline at end of file diff --git a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java index 69dbd45..5c5524b 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java +++ b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java @@ -1,6 +1,7 @@ -package com.zdjizhi.base.etl; +package com.zdjizhi.etl; import com.fasterxml.jackson.databind.JavaType; +import com.zdjizhi.base.common.CommonConfig; import com.zdjizhi.base.common.DosSketchLog; import com.zdjizhi.base.source.Source; import com.zdjizhi.utils.JsonMapper; @@ -29,7 +30,7 @@ public class DosDetectionEtl { } private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){ - return Source.createSource().flatMap(new FlatSketchLog()); + return Source.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME).flatMap(new FlatSketchLog()); } private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){ @@ -12,6 +12,7 @@ <module>dos-detection</module> <module>platform-base</module> <module>platform-schedule</module> + <module>platform-etl</module> </modules> <properties> |
