summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-12-28 18:33:40 +0800
committerwanglihui <[email protected]>2021-12-28 18:33:40 +0800
commitb31906b71748a1eafdbfe73731ac0119b2cf7660 (patch)
tree72c94aefc94a0f39fbc1bd79fca1d22a726ef2e1
parent2cb7dcba4bcd41355b8be53c13a493d8371a64fc (diff)
抽取etl module
-rw-r--r--dos-detection/pom.xml8
-rw-r--r--dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java2
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/source/Source.java19
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java6
-rw-r--r--platform-etl/pom.xml23
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java (renamed from platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java)5
-rw-r--r--pom.xml1
7 files changed, 56 insertions, 8 deletions
diff --git a/dos-detection/pom.xml b/dos-detection/pom.xml
index 740d0aa..4cf6291 100644
--- a/dos-detection/pom.xml
+++ b/dos-detection/pom.xml
@@ -14,7 +14,13 @@
<dependencies>
<dependency>
<groupId>com.zdjizhi</groupId>
- <artifactId>base-platform</artifactId>
+ <artifactId>platform-base</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>platform-etl</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
diff --git a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
index ec80146..d9d13b3 100644
--- a/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
+++ b/dos-detection/src/main/java/com/zdjizhi/dos/sink/OutputStreamSink.java
@@ -1,7 +1,7 @@
package com.zdjizhi.dos.sink;
-import com.zdjizhi.base.etl.DosDetectionEtl;
import com.zdjizhi.base.platform.Schedule;
+import com.zdjizhi.etl.DosDetectionEtl;
/**
* @author 94976
diff --git a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
index 643909f..9182517 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/source/Source.java
@@ -6,13 +6,26 @@ import com.zdjizhi.base.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import java.util.Properties;
+
public class Source {
private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
- public static DataStreamSource<String> createSource(){
+ public static DataStreamSource<String> createSource(String topic){
+
+ return createSource(topic,CommonConfig.KAFKA_INPUT_PARALLELISM);
+ }
+
+ public static DataStreamSource<String> createSource(String topic,int parallelism){
+
+ return streamExeEnv.addSource(KafkaUtils.createSource(topic))
+ .setParallelism(parallelism);
+ }
+
+ public static DataStreamSource<String> createSource(String topic, int parallelism, Properties properties){
- return streamExeEnv.addSource(KafkaUtils.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME))
- .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM);
+ return streamExeEnv.addSource(KafkaUtils.createSource(topic,properties))
+ .setParallelism(parallelism);
}
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
index fa4b2f0..b7173ef 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/utils/KafkaUtils.java
@@ -43,10 +43,14 @@ public class KafkaUtils {
}
public static FlinkKafkaConsumer<String> createSource(String topic){
+ return createSource(topic,getKafkaSourceProperty());
+ }
+
+ public static FlinkKafkaConsumer<String> createSource(String topic,Properties properties){
return new FlinkKafkaConsumer<String>(
topic,
new SimpleStringSchema(),
- getKafkaSourceProperty());
+ properties);
}
}
diff --git a/platform-etl/pom.xml b/platform-etl/pom.xml
new file mode 100644
index 0000000..d9effb7
--- /dev/null
+++ b/platform-etl/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-stream-schedule-platform</artifactId>
+ <groupId>com.zdjizhi</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>platform-etl</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>platform-base</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+
+</project> \ No newline at end of file
diff --git a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
index 69dbd45..5c5524b 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/etl/DosDetectionEtl.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/DosDetectionEtl.java
@@ -1,6 +1,7 @@
-package com.zdjizhi.base.etl;
+package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
+import com.zdjizhi.base.common.CommonConfig;
import com.zdjizhi.base.common.DosSketchLog;
import com.zdjizhi.base.source.Source;
import com.zdjizhi.utils.JsonMapper;
@@ -29,7 +30,7 @@ public class DosDetectionEtl {
}
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
- return Source.createSource().flatMap(new FlatSketchLog());
+ return Source.createSource(CommonConfig.KAFKA_INPUT_TOPIC_NAME).flatMap(new FlatSketchLog());
}
private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
diff --git a/pom.xml b/pom.xml
index 7b3d6e6..90a26ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,7 @@
<module>dos-detection</module>
<module>platform-base</module>
<module>platform-schedule</module>
+ <module>platform-etl</module>
</modules>
<properties>