diff options
| author | qidaijie <[email protected]> | 2023-11-09 14:13:45 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-11-09 14:13:45 +0800 |
| commit | 0a116352d672d56cc82c28ed9f8331cc6a59e95d (patch) | |
| tree | 7393361f15735cbf8b51d291aa78510e99fe72e0 /src/main | |
| parent | f765650d9c7c2f6f1c9c5fa6a37ef74f6f896211 (diff) | |
优化配置加载方式:通过读取外部文件加载(GAL-435)
Diffstat (limited to 'src/main')
12 files changed, 243 insertions, 291 deletions
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java deleted file mode 100644 index 24702a5..0000000 --- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.zdjizhi.common.config; - - -import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; - -/** - * @author Administrator - */ -public class GlobalConfig { - - private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); - - static { - encryptor.setPassword("galaxy"); - } - - /** - * 协议分隔符,需要转义 - */ - public static final String PROTOCOL_SPLITTER = "\\."; - - - /** - * System - */ - public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism"); - public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name"); - public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism"); - public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism"); - public static final Integer WARTERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness"); - public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time"); - public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library"); - public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism"); - public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source"); - - /** - * Kafka common - */ - public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user")); - public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin")); - - - /** - * kafka sink config - */ - public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers"); - public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic"); - public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack"); - public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries"); - public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms"); - public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms"); - public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size"); - public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory"); - public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size"); - - - /** - * kafka source config - */ - public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers"); - public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic"); - public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id"); - public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms"); - public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records"); - public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes"); - - - /** - * kafka限流配置-20201117 - */ - public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type"); - - -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java b/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java deleted file mode 100644 index 0ae91e5..0000000 --- a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.zdjizhi.common.config; - -import com.zdjizhi.utils.StringUtil; - -import java.io.IOException; -import java.util.Locale; -import java.util.Properties; - - -/** - * @author Administrator - */ - -public final class GlobalConfigLoad { - - private static Properties propDefault = new Properties(); - private static Properties propService = new Properties(); - - - static String getStringProperty(Integer type, String key) { - if (type == 0) { - return propService.getProperty(key); - } else if (type == 1) { - return propDefault.getProperty(key); - } else { - return null; - } - - } - - static Integer getIntProperty(Integer type, String key) { - if (type == 0) { - return Integer.parseInt(propService.getProperty(key)); - } else if (type == 1) { - return Integer.parseInt(propDefault.getProperty(key)); - } else { - return null; - } - } - - public static Long getLongProperty(Integer type, String key) { - if (type == 0) { - return Long.parseLong(propService.getProperty(key)); - } else if (type == 1) { - return Long.parseLong(propDefault.getProperty(key)); - } else { - return null; - } - } - - public static Boolean getBooleanProperty(Integer type, String key) { - if (type == 0) { - return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); - } else if (type == 1) { - return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); - } else { - return null; - } - } - - static { - try { - propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); - propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties")); - } catch (IOException | RuntimeException e) { - propDefault = null; - propService = null; - } - } -} diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java new file mode 100644 index 0000000..738537c --- /dev/null +++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java @@ -0,0 +1,72 @@ +package com.zdjizhi.common.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Containing configuration options for the Fusion application. + * + * @author chaoc + * @since 1.0 + */ +public class MergeConfigs { + + /** + * The prefix for Kafka properties used in the source. + */ + public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props."; + + /** + * The prefix for Kafka properties used in the sink. + */ + public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props."; + + + public static final ConfigOption<String> SOURCE_KAFKA_TOPIC = + ConfigOptions.key("source.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the source."); + + + public static final ConfigOption<String> SINK_KAFKA_TOPIC = + ConfigOptions.key("sink.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the sink."); + + + public static final ConfigOption<Integer> COUNT_WINDOW_TIME = + ConfigOptions.key("count.window.time") + .intType() + .defaultValue(5) + .withDescription("The aggregate window time"); + + + public static final ConfigOption<Integer> WARTERMARK_MAX_ORDERNESS = + ConfigOptions.key("watermark.max.orderness") + .intType() + .defaultValue(5) + .withDescription("The aggregate watermark max time"); + + + public static final ConfigOption<String> STARTUP_MODE = + ConfigOptions.key("startup.mode") + .stringType() + .defaultValue("group") + .withDescription("The offset commit mode for the consumer."); + + + public static final ConfigOption<Boolean> LOG_FAILURES_ONLY = + ConfigOptions.key("log.failures.only") + .booleanType() + .defaultValue(false) + .withDescription("Defines whether the producer should fail on errors, or only log them."); + + + public static final ConfigOption<String> MEASUREMENT_NAME = + ConfigOptions.key("measurement.name") + .stringType() + .defaultValue("application_protocol_stat") + .withDescription("The data identification."); +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java new file mode 100644 index 0000000..4da7616 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java @@ -0,0 +1,44 @@ +package com.zdjizhi.common.config; + +import org.apache.flink.configuration.Configuration; + +import java.util.Properties; + +/** + * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling + * properties with a specific prefix. This class allows retrieving properties that start with the + * given `prefix` and converts them into a `java.util.Properties` object. + * + * @author chaoc + * @since 1.0 + */ + +public class MergeConfiguration { + private final Configuration config; + + public MergeConfiguration(final Configuration config) { + this.config = config; + } + + /** + * Retrieves properties from the underlying `Configuration` instance that start with the specified + * `prefix`. The properties are then converted into a `java.util.Properties` object and returned. + * + * @param prefix The prefix to filter properties. + * @return A `java.util.Properties` object containing the properties with the specified prefix. + */ + public Properties getProperties(final String prefix) { + if (prefix == null) { + final Properties props = new Properties(); + props.putAll(config.toMap()); + return props; + } + return config.toMap() + .entrySet() + .stream() + .filter(entry -> entry.getKey().startsWith(prefix)) + .collect(Properties::new, (props, e) -> + props.setProperty(e.getKey().substring(prefix.length()), e.getValue()), + Properties::putAll); + } +} diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java index 5f046c9..116c45a 100644 --- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java +++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java @@ -2,7 +2,8 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.config.GlobalConfig; +import com.zdjizhi.common.config.MergeConfigs; +import com.zdjizhi.common.config.MergeConfiguration; import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; @@ -25,6 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; +import static com.zdjizhi.common.config.MergeConfigs.*; + /** * @author qidaijie * @Package com.zdjizhi.topology @@ -36,36 +39,51 @@ public class ApplicationProtocolTopology { public static void main(String[] args) { try { + + // param check + if (args.length < 1) { + throw new IllegalArgumentException("Error: Not found properties path. " + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final MergeConfiguration fusionConfiguration = new MergeConfiguration(config); + + + //水印 WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy - .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WARTERMARK_MAX_ORDERNESS)) - .withTimestampAssigner((element,timestamp) -> element.f2); + .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS))) + .withTimestampAssigner((element, timestamp) -> element.f2); //数据源 - DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) - .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC); + DataStream<String> streamSource = environment.addSource( + KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), + config.get(SOURCE_KAFKA_TOPIC), + config.get(STARTUP_MODE))); //解析数据 SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData()) .assignTimestampsAndWatermarks(strategyForSession) - .name("ParseDataProcess") - .setParallelism(GlobalConfig.PARSE_PARALLELISM); + .name("ParseDataProcess"); //增量聚合窗口 SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy()) - .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME))) + .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME)))) .reduce(new DispersionCountWindow(), new MergeCountWindow()) - .name("DispersionCountWindow") - .setParallelism(GlobalConfig.WINDOW_PARALLELISM); + .name("DispersionCountWindow"); //拆分数据 SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap()) - .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM); + .name("ResultFlatMap"); //输出 - resultFlatMap.addSink(KafkaProducer.getKafkaProducer()) - .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC); + resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), + config.get(SINK_KAFKA_TOPIC), + config.get(LOG_FAILURES_ONLY))); environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE"); } catch (Exception e) { diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java index 22f2f51..28c01f5 100644 --- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java +++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java @@ -3,9 +3,6 @@ package com.zdjizhi.utils.functions.map; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.JSONWriter; -import com.zdjizhi.common.config.GlobalConfig; -import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.utils.StringUtil; @@ -20,9 +17,14 @@ import org.apache.flink.util.Collector; */ public class ResultFlatMap implements FlatMapFunction<Metrics, String> { private static final Log logger = LogFactory.get(); + /** + * 协议分隔符,需要转义 + */ + private static final String PROTOCOL_SPLITTER = "\\."; + @Override - public void flatMap(Metrics metrics, Collector<String> out) throws Exception { + public void flatMap(Metrics metrics, Collector<String> out) { try { Tags tags = metrics.getTags(); String protocolStackId = tags.getProtocol_stack_id(); @@ -30,7 +32,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> { tags.setApp_name(null); StringBuilder stringBuilder = new StringBuilder(); - String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER); + String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER); int protocolIdsNum = protocolIds.length; for (int i = 0; i < protocolIdsNum - 1; i++) { if (StringUtil.isBlank(stringBuilder.toString())) { diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java index 8216320..44276e2 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java @@ -19,8 +19,9 @@ import org.apache.flink.api.java.tuple.Tuple3; public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> { private static final Log logger = LogFactory.get(); + @Override - public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) throws Exception { + public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) { try { Fields cacheData = value1.f1; Fields newData = value2.f1; diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java index 2677855..4766000 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java @@ -2,12 +2,12 @@ package com.zdjizhi.utils.functions.statistics; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.config.GlobalConfig; +import com.zdjizhi.common.config.MergeConfigs; import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; @@ -18,17 +18,30 @@ import org.apache.flink.util.Collector; * @Description: * @date 2023/4/2314:43 */ -public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields,Long>, Metrics, String, TimeWindow> { +public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> { private static final Log logger = LogFactory.get(); + private String NAME = null; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final Configuration configuration = (Configuration) getRuntimeContext() + .getExecutionConfig().getGlobalJobParameters(); + + NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME); + + } + @Override - public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields,Long>> input, Collector<Metrics> output) throws Exception { + public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) { try { long timestamp_ms = context.window().getStart(); - for (Tuple3<Tags, Fields,Long> tuple : input) { + for (Tuple3<Tags, Fields, Long> tuple : input) { Tags tags = tuple.f0; Fields fields = tuple.f1; - Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp_ms); + Metrics metrics = new Metrics(NAME, tags, fields, timestamp_ms); output.collect(metrics); } diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java index 6a48bcf..cc8b32c 100644 --- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java +++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java @@ -2,7 +2,6 @@ package com.zdjizhi.utils.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.config.GlobalConfig; import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.utils.StringUtil; import org.apache.datasketches.hll.HllSketch; @@ -19,8 +18,6 @@ import java.util.Base64; */ public class MetricUtil { private static final Log logger = LogFactory.get(); - private static final String METRICS_DEFAULT_TYPE = "agent"; - /** * 用于对业务指标进行统计 @@ -58,28 +55,26 @@ public class MetricUtil { Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes()); Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes()); - if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) { - String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); - return new Fields(sessions, - inBytes, outBytes, inPkts, outPkts, - c2sPkts, s2cPkts, c2sBytes, s2cBytes, - c2sFragments, s2cFragments, - c2sTcpLostBytes, s2cTcpLostBytes, - c2sTcpooorderPkts, s2cTcpooorderPkts, - c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, - c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, - clientIpSketch); - } else { - return new Fields(sessions, - inBytes, outBytes, inPkts, outPkts, - c2sPkts, s2cPkts, c2sBytes, s2cBytes, - c2sFragments, s2cFragments, - c2sTcpLostBytes, s2cTcpLostBytes, - c2sTcpooorderPkts, s2cTcpooorderPkts, - c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, - c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, - null); - } +// String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); +// return new Fields(sessions, +// inBytes, outBytes, inPkts, outPkts, +// c2sPkts, s2cPkts, c2sBytes, s2cBytes, +// c2sFragments, s2cFragments, +// c2sTcpLostBytes, s2cTcpLostBytes, +// c2sTcpooorderPkts, s2cTcpooorderPkts, +// c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, +// c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, +// clientIpSketch); + + return new Fields(sessions, + inBytes, outBytes, inPkts, outPkts, + c2sPkts, s2cPkts, c2sBytes, s2cBytes, + c2sFragments, s2cFragments, + c2sTcpLostBytes, s2cTcpLostBytes, + c2sTcpooorderPkts, s2cTcpooorderPkts, + c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, + c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, + null); } /** diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java deleted file mode 100644 index 877b2e6..0000000 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.zdjizhi.utils.kafka; - -import com.zdjizhi.common.config.GlobalConfig; -import org.apache.kafka.common.config.SslConfigs; - -import java.util.Properties; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.kafka - * @Description: - * @date 2021/9/610:37 - */ -class CertUtils { - /** - * Kafka SASL认证端口 - */ - private static final String SASL_PORT = "9094"; - - /** - * Kafka SSL认证端口 - */ - private static final String SSL_PORT = "9095"; - - /** - * 根据连接信息端口判断认证方式。 - * - * @param servers kafka 连接信息 - * @param properties kafka 连接配置信息 - */ - static void chooseCert(String servers, Properties properties) { - if (servers.contains(SASL_PORT)) { - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";"); - } else if (servers.contains(SSL_PORT)) { - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN); - properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN); - properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN); - } - - } -} diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index b0bc2fb..397814f 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -1,8 +1,8 @@ package com.zdjizhi.utils.kafka; -import com.zdjizhi.common.config.GlobalConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; @@ -13,35 +13,40 @@ import java.util.Properties; * @date 2021/6/813:54 */ public class KafkaConsumer { - private static Properties createConsumerConfig() { - Properties properties = new Properties(); - properties.put("bootstrap.servers", GlobalConfig.SOURCE_KAFKA_SERVERS); - properties.put("group.id", GlobalConfig.GROUP_ID); - properties.put("session.timeout.ms", GlobalConfig.SESSION_TIMEOUT_MS); - properties.put("max.poll.records", GlobalConfig.MAX_POLL_RECORDS); - properties.put("max.partition.fetch.bytes", GlobalConfig.MAX_PARTITION_FETCH_BYTES); - properties.put("partition.discovery.interval.ms", "10000"); - - CertUtils.chooseCert(GlobalConfig.SOURCE_KAFKA_SERVERS, properties); - - return properties; - } /** * 官方序列化kafka数据 * * @return kafka logs */ - public static FlinkKafkaConsumer<String> getKafkaConsumer() { - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GlobalConfig.SOURCE_KAFKA_TOPIC, - new SimpleStringSchema(), createConsumerConfig()); - - //随着checkpoint提交,将offset提交到kafka - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - - //从消费组当前的offset开始消费 - kafkaConsumer.setStartFromGroupOffsets(); + public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) { + + setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); + setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000); + setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280); + + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); + + switch (startupMode) { + case "group": + kafkaConsumer.setStartFromGroupOffsets(); + break; + case "latest": + kafkaConsumer.setStartFromLatest(); + break; + case "earliest": + kafkaConsumer.setStartFromEarliest(); + break; + default: + kafkaConsumer.setStartFromGroupOffsets(); + } return kafkaConsumer; } + + private static void setDefaultConfig(Properties properties, String key, Object value) { + if (!properties.contains(key)) { + properties.put(key, value); + } + } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index ca62061..c7cd3f2 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -1,9 +1,7 @@ package com.zdjizhi.utils.kafka; -import com.zdjizhi.common.config.GlobalConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Optional; import java.util.Properties; @@ -16,33 +14,29 @@ import java.util.Properties; */ public class KafkaProducer { - private static Properties createProducerConfig() { - Properties properties = new Properties(); - properties.put("bootstrap.servers", GlobalConfig.SINK_KAFKA_SERVERS); - properties.put("acks", GlobalConfig.PRODUCER_ACK); - properties.put("retries", GlobalConfig.RETRIES); - properties.put("linger.ms", GlobalConfig.LINGER_MS); - properties.put("request.timeout.ms", GlobalConfig.REQUEST_TIMEOUT_MS); - properties.put("batch.size", GlobalConfig.BATCH_SIZE); - properties.put("buffer.memory", GlobalConfig.BUFFER_MEMORY); - properties.put("max.request.size", GlobalConfig.MAX_REQUEST_SIZE); - properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, GlobalConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); - - CertUtils.chooseCert(GlobalConfig.SINK_KAFKA_SERVERS, properties); - - return properties; - } - - - public static FlinkKafkaProducer<String> getKafkaProducer() { - FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( - GlobalConfig.SINK_KAFKA_TOPIC, + public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) { + setDefaultConfig(properties, "ack", 1); + setDefaultConfig(properties, "retries", 0); + setDefaultConfig(properties, "linger.ms", 10); + setDefaultConfig(properties, "request.timeout.ms", 30000); + setDefaultConfig(properties, "batch.size", 262144); + setDefaultConfig(properties, "buffer.memory", 134217728); + setDefaultConfig(properties, "max.request.size", 10485760); + setDefaultConfig(properties, "compression.type", "snappy"); + + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( + topic, new SimpleStringSchema(), - createProducerConfig(), Optional.empty()); + properties, Optional.empty()); - //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们 - kafkaProducer.setLogFailuresOnly(true); + kafkaProducer.setLogFailuresOnly(logFailuresOnly); return kafkaProducer; } + + private static void setDefaultConfig(Properties properties, String key, Object value) { + if (!properties.contains(key)) { + properties.put(key, value); + } + } } |
