diff options
| author | qidaijie <[email protected]> | 2023-09-18 15:09:51 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-09-18 15:09:51 +0800 |
| commit | f765650d9c7c2f6f1c9c5fa6a37ef74f6f896211 (patch) | |
| tree | 2148526693012355361d53d0d7f55d472bca5fde /src/test/java | |
| parent | 3c5af945c30de0400a5b1c274505ef74486ba8ff (diff) | |
统计时间戳字段重命名timestamp改为timestamp_ms。(TSG-17084)v1.6
Diffstat (limited to 'src/test/java')
| -rw-r--r-- | src/test/java/com/zdjizhi/ConfigTest.java | 56 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/conf/FusionConfigs.java | 34 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/conf/FusionConfiguration.java | 36 |
3 files changed, 126 insertions, 0 deletions
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java new file mode 100644 index 0000000..7b9580b --- /dev/null +++ b/src/test/java/com/zdjizhi/ConfigTest.java @@ -0,0 +1,56 @@ +package com.zdjizhi; + +import com.zdjizhi.conf.FusionConfiguration; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.util.Collector; + +import static com.zdjizhi.conf.FusionConfigs.*; + +public class ConfigTest { + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + final ParameterTool tool; + try { + tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties"); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final FusionConfiguration fusionConfiguration = new FusionConfiguration(config); + + System.out.println(config.get(SOURCE_KAFKA_TOPIC)); + System.out.println(config.get(SINK_KAFKA_TOPIC)); + System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); + System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); + + final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( + config.get(SOURCE_KAFKA_TOPIC), + new SimpleStringSchema(), + fusionConfiguration + .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); + + final DataStreamSource<String> sourceStream = environment.addSource(kafkaConsumer); + + sourceStream.process(new ProcessFunction<String, String>() { + + @Override + public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception { + + out.collect(value); + } + }).print(); + + environment.execute(); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/com/zdjizhi/conf/FusionConfigs.java b/src/test/java/com/zdjizhi/conf/FusionConfigs.java new file mode 100644 index 0000000..ca18112 --- /dev/null +++ b/src/test/java/com/zdjizhi/conf/FusionConfigs.java @@ -0,0 +1,34 @@ +package com.zdjizhi.conf; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class FusionConfigs { + /** + * The prefix for Kafka properties used in the source. + */ + public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props."; + + /** + * The prefix for Kafka properties used in the sink. + */ + public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props."; + /** + * Configuration option for the Kafka topic used in the source. + */ + public static final ConfigOption<String> SOURCE_KAFKA_TOPIC = + ConfigOptions.key("source.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the source."); + + /** + * Configuration option for the Kafka topic used in the sink. + */ + public static final ConfigOption<String> SINK_KAFKA_TOPIC = + ConfigOptions.key("sink.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the sink."); + +} diff --git a/src/test/java/com/zdjizhi/conf/FusionConfiguration.java b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java new file mode 100644 index 0000000..1e6dcf2 --- /dev/null +++ b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java @@ -0,0 +1,36 @@ +package com.zdjizhi.conf; + +import org.apache.flink.configuration.Configuration; + +import java.util.Properties; + +public class FusionConfiguration { + private final Configuration config; + + public FusionConfiguration(final Configuration config) { + this.config = config; + } + + /** + * Retrieves properties from the underlying `Configuration` instance that start with the specified + * `prefix`. The properties are then converted into a `java.util.Properties` object and returned. + * + * @param prefix The prefix to filter properties. + * @return A `java.util.Properties` object containing the properties with the specified prefix. + */ + public Properties getProperties(final String prefix) { + if (prefix == null) { + final Properties props = new Properties(); + props.putAll(config.toMap()); + return props; + } + return config.toMap() + .entrySet() + .stream() + .filter(entry -> entry.getKey().startsWith(prefix)) + .collect(Properties::new, (props, e) -> + props.setProperty(e.getKey().substring(prefix.length()), e.getValue()), + Properties::putAll); + } + +} |
