summaryrefslogtreecommitdiff
path: root/src/test/java
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2023-09-18 15:09:51 +0800
committerqidaijie <[email protected]>2023-09-18 15:09:51 +0800
commitf765650d9c7c2f6f1c9c5fa6a37ef74f6f896211 (patch)
tree2148526693012355361d53d0d7f55d472bca5fde /src/test/java
parent3c5af945c30de0400a5b1c274505ef74486ba8ff (diff)
统计时间戳字段重命名timestamp改为timestamp_ms。(TSG-17084)v1.6
Diffstat (limited to 'src/test/java')
-rw-r--r--src/test/java/com/zdjizhi/ConfigTest.java56
-rw-r--r--src/test/java/com/zdjizhi/conf/FusionConfigs.java34
-rw-r--r--src/test/java/com/zdjizhi/conf/FusionConfiguration.java36
3 files changed, 126 insertions, 0 deletions
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java
new file mode 100644
index 0000000..7b9580b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/ConfigTest.java
@@ -0,0 +1,56 @@
+package com.zdjizhi;
+
+import com.zdjizhi.conf.FusionConfiguration;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.util.Collector;
+
+import static com.zdjizhi.conf.FusionConfigs.*;
+
+public class ConfigTest {
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final ParameterTool tool;
+ try {
+ tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties");
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
+
+ System.out.println(config.get(SOURCE_KAFKA_TOPIC));
+ System.out.println(config.get(SINK_KAFKA_TOPIC));
+ System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
+ System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
+
+ final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
+ config.get(SOURCE_KAFKA_TOPIC),
+ new SimpleStringSchema(),
+ fusionConfiguration
+ .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
+
+ final DataStreamSource<String> sourceStream = environment.addSource(kafkaConsumer);
+
+ sourceStream.process(new ProcessFunction<String, String>() {
+
+ @Override
+ public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
+
+ out.collect(value);
+ }
+ }).print();
+
+ environment.execute();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/conf/FusionConfigs.java b/src/test/java/com/zdjizhi/conf/FusionConfigs.java
new file mode 100644
index 0000000..ca18112
--- /dev/null
+++ b/src/test/java/com/zdjizhi/conf/FusionConfigs.java
@@ -0,0 +1,34 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FusionConfigs {
+ /**
+ * The prefix for Kafka properties used in the source.
+ */
+ public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
+
+ /**
+ * The prefix for Kafka properties used in the sink.
+ */
+ public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+ /**
+ * Configuration option for the Kafka topic used in the source.
+ */
+ public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
+ ConfigOptions.key("source.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the source.");
+
+ /**
+ * Configuration option for the Kafka topic used in the sink.
+ */
+ public static final ConfigOption<String> SINK_KAFKA_TOPIC =
+ ConfigOptions.key("sink.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the sink.");
+
+}
diff --git a/src/test/java/com/zdjizhi/conf/FusionConfiguration.java b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java
new file mode 100644
index 0000000..1e6dcf2
--- /dev/null
+++ b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+public class FusionConfiguration {
+ private final Configuration config;
+
+ public FusionConfiguration(final Configuration config) {
+ this.config = config;
+ }
+
+ /**
+ * Retrieves properties from the underlying `Configuration` instance that start with the specified
+ * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
+ *
+ * @param prefix The prefix to filter properties.
+ * @return A `java.util.Properties` object containing the properties with the specified prefix.
+ */
+ public Properties getProperties(final String prefix) {
+ if (prefix == null) {
+ final Properties props = new Properties();
+ props.putAll(config.toMap());
+ return props;
+ }
+ return config.toMap()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(Properties::new, (props, e) ->
+ props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
+ Properties::putAll);
+ }
+
+}