summaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2023-11-09 14:13:45 +0800
committerqidaijie <[email protected]>2023-11-09 14:13:45 +0800
commit0a116352d672d56cc82c28ed9f8331cc6a59e95d (patch)
tree7393361f15735cbf8b51d291aa78510e99fe72e0 /src/test
parentf765650d9c7c2f6f1c9c5fa6a37ef74f6f896211 (diff)
优化配置加载方式:通过读取外部文件加载(GAL-435)
Diffstat (limited to 'src/test')
-rw-r--r--src/test/java/com/zdjizhi/ConfigTest.java8
-rw-r--r--src/test/java/com/zdjizhi/ConventionalTest.java3
2 files changed, 6 insertions, 5 deletions
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java
index 7b9580b..7a2b5d3 100644
--- a/src/test/java/com/zdjizhi/ConfigTest.java
+++ b/src/test/java/com/zdjizhi/ConfigTest.java
@@ -1,11 +1,10 @@
package com.zdjizhi;
import com.zdjizhi.conf.FusionConfiguration;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -30,6 +29,7 @@ public class ConfigTest {
System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
+
final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
config.get(SOURCE_KAFKA_TOPIC),
new SimpleStringSchema(),
@@ -41,12 +41,14 @@ public class ConfigTest {
sourceStream.process(new ProcessFunction<String, String>() {
@Override
- public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
out.collect(value);
}
}).print();
+
+
environment.execute();
} catch (Exception e) {
diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java
index 7900d61..287b3fb 100644
--- a/src/test/java/com/zdjizhi/ConventionalTest.java
+++ b/src/test/java/com/zdjizhi/ConventionalTest.java
@@ -1,6 +1,5 @@
package com.zdjizhi;
-import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
@@ -22,7 +21,7 @@ public class ConventionalTest {
System.out.println(protocol);
StringBuffer stringBuffer = new StringBuffer();
String appName = "qq_r2";
- String[] protocolIds = protocol.split(GlobalConfig.PROTOCOL_SPLITTER);
+ String[] protocolIds = protocol.split("\\.");
for (String proto : protocolIds) {
if (StringUtil.isBlank(stringBuffer.toString())) {
stringBuffer.append(proto);