diff options
| author | qidaijie <[email protected]> | 2023-11-09 14:13:45 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-11-09 14:13:45 +0800 |
| commit | 0a116352d672d56cc82c28ed9f8331cc6a59e95d (patch) | |
| tree | 7393361f15735cbf8b51d291aa78510e99fe72e0 /src/test | |
| parent | f765650d9c7c2f6f1c9c5fa6a37ef74f6f896211 (diff) | |
优化配置加载方式:通过读取外部文件加载(GAL-435)
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/java/com/zdjizhi/ConfigTest.java | 8 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/ConventionalTest.java | 3 |
2 files changed, 6 insertions, 5 deletions
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java index 7b9580b..7a2b5d3 100644 --- a/src/test/java/com/zdjizhi/ConfigTest.java +++ b/src/test/java/com/zdjizhi/ConfigTest.java @@ -1,11 +1,10 @@ package com.zdjizhi; import com.zdjizhi.conf.FusionConfiguration; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; @@ -30,6 +29,7 @@ public class ConfigTest { System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); + final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( config.get(SOURCE_KAFKA_TOPIC), new SimpleStringSchema(), @@ -41,12 +41,14 @@ public class ConfigTest { sourceStream.process(new ProcessFunction<String, String>() { @Override - public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception { + public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) { out.collect(value); } }).print(); + + environment.execute(); } catch (Exception e) { diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java index 7900d61..287b3fb 100644 --- a/src/test/java/com/zdjizhi/ConventionalTest.java +++ b/src/test/java/com/zdjizhi/ConventionalTest.java @@ -1,6 +1,5 @@ package com.zdjizhi; -import com.zdjizhi.common.config.GlobalConfig; import com.zdjizhi.utils.StringUtil; import org.junit.Test; @@ -22,7 +21,7 @@ public class ConventionalTest { System.out.println(protocol); StringBuffer stringBuffer = new StringBuffer(); String appName = "qq_r2"; - String[] protocolIds = protocol.split(GlobalConfig.PROTOCOL_SPLITTER); + String[] protocolIds = protocol.split("\\."); for (String proto : protocolIds) { if (StringUtil.isBlank(stringBuffer.toString())) { stringBuffer.append(proto); |
