summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2023-11-09 14:13:45 +0800
committerqidaijie <[email protected]>2023-11-09 14:13:45 +0800
commit0a116352d672d56cc82c28ed9f8331cc6a59e95d (patch)
tree7393361f15735cbf8b51d291aa78510e99fe72e0 /src/main
parentf765650d9c7c2f6f1c9c5fa6a37ef74f6f896211 (diff)
优化配置加载方式:通过读取外部文件加载(GAL-435)
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/zdjizhi/common/config/GlobalConfig.java74
-rw-r--r--src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java70
-rw-r--r--src/main/java/com/zdjizhi/common/config/MergeConfigs.java72
-rw-r--r--src/main/java/com/zdjizhi/common/config/MergeConfiguration.java44
-rw-r--r--src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java44
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java12
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java3
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java25
-rw-r--r--src/main/java/com/zdjizhi/utils/general/MetricUtil.java45
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/CertUtils.java48
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java51
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java46
12 files changed, 243 insertions, 291 deletions
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
deleted file mode 100644
index 24702a5..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.zdjizhi.common.config;
-
-
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-
-/**
- * @author Administrator
- */
-public class GlobalConfig {
-
- private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
-
- static {
- encryptor.setPassword("galaxy");
- }
-
- /**
- * 协议分隔符,需要转义
- */
- public static final String PROTOCOL_SPLITTER = "\\.";
-
-
- /**
- * System
- */
- public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism");
- public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
- public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
- public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
- public static final Integer WARTERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness");
- public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
- public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
- public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
- public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source");
-
- /**
- * Kafka common
- */
- public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user"));
- public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin"));
-
-
- /**
- * kafka sink config
- */
- public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers");
- public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic");
- public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack");
- public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries");
- public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms");
- public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms");
- public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size");
- public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory");
- public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size");
-
-
- /**
- * kafka source config
- */
- public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers");
- public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic");
- public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id");
- public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms");
- public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records");
- public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes");
-
-
- /**
- * kafka限流配置-20201117
- */
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type");
-
-
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java b/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
deleted file mode 100644
index 0ae91e5..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.zdjizhi.common.config;
-
-import com.zdjizhi.utils.StringUtil;
-
-import java.io.IOException;
-import java.util.Locale;
-import java.util.Properties;
-
-
-/**
- * @author Administrator
- */
-
-public final class GlobalConfigLoad {
-
- private static Properties propDefault = new Properties();
- private static Properties propService = new Properties();
-
-
- static String getStringProperty(Integer type, String key) {
- if (type == 0) {
- return propService.getProperty(key);
- } else if (type == 1) {
- return propDefault.getProperty(key);
- } else {
- return null;
- }
-
- }
-
- static Integer getIntProperty(Integer type, String key) {
- if (type == 0) {
- return Integer.parseInt(propService.getProperty(key));
- } else if (type == 1) {
- return Integer.parseInt(propDefault.getProperty(key));
- } else {
- return null;
- }
- }
-
- public static Long getLongProperty(Integer type, String key) {
- if (type == 0) {
- return Long.parseLong(propService.getProperty(key));
- } else if (type == 1) {
- return Long.parseLong(propDefault.getProperty(key));
- } else {
- return null;
- }
- }
-
- public static Boolean getBooleanProperty(Integer type, String key) {
- if (type == 0) {
- return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else if (type == 1) {
- return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else {
- return null;
- }
- }
-
- static {
- try {
- propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties"));
- } catch (IOException | RuntimeException e) {
- propDefault = null;
- propService = null;
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
new file mode 100644
index 0000000..738537c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -0,0 +1,72 @@
+package com.zdjizhi.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Containing configuration options for the Fusion application.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+public class MergeConfigs {
+
+ /**
+ * The prefix for Kafka properties used in the source.
+ */
+ public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
+
+ /**
+ * The prefix for Kafka properties used in the sink.
+ */
+ public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+
+
+ public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
+ ConfigOptions.key("source.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the source.");
+
+
+ public static final ConfigOption<String> SINK_KAFKA_TOPIC =
+ ConfigOptions.key("sink.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the sink.");
+
+
+ public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
+ ConfigOptions.key("count.window.time")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The aggregate window time");
+
+
+ public static final ConfigOption<Integer> WARTERMARK_MAX_ORDERNESS =
+ ConfigOptions.key("watermark.max.orderness")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The aggregate watermark max time");
+
+
+ public static final ConfigOption<String> STARTUP_MODE =
+ ConfigOptions.key("startup.mode")
+ .stringType()
+ .defaultValue("group")
+ .withDescription("The offset commit mode for the consumer.");
+
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Defines whether the producer should fail on errors, or only log them.");
+
+
+ public static final ConfigOption<String> MEASUREMENT_NAME =
+ ConfigOptions.key("measurement.name")
+ .stringType()
+ .defaultValue("application_protocol_stat")
+ .withDescription("The data identification.");
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java
new file mode 100644
index 0000000..4da7616
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java
@@ -0,0 +1,44 @@
+package com.zdjizhi.common.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+/**
+ * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
+ * properties with a specific prefix. This class allows retrieving properties that start with the
+ * given `prefix` and converts them into a `java.util.Properties` object.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+
+public class MergeConfiguration {
+ private final Configuration config;
+
+ public MergeConfiguration(final Configuration config) {
+ this.config = config;
+ }
+
+ /**
+ * Retrieves properties from the underlying `Configuration` instance that start with the specified
+ * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
+ *
+ * @param prefix The prefix to filter properties.
+ * @return A `java.util.Properties` object containing the properties with the specified prefix.
+ */
+ public Properties getProperties(final String prefix) {
+ if (prefix == null) {
+ final Properties props = new Properties();
+ props.putAll(config.toMap());
+ return props;
+ }
+ return config.toMap()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(Properties::new, (props, e) ->
+ props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
+ Properties::putAll);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 5f046c9..116c45a 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -2,7 +2,8 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
@@ -25,6 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
/**
* @author qidaijie
* @Package com.zdjizhi.topology
@@ -36,36 +39,51 @@ public class ApplicationProtocolTopology {
public static void main(String[] args) {
try {
+
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
+
+
+ //水印
WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
- .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WARTERMARK_MAX_ORDERNESS))
- .withTimestampAssigner((element,timestamp) -> element.f2);
+ .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
+ .withTimestampAssigner((element, timestamp) -> element.f2);
//数据源
- DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
- .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
+ DataStream<String> streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
//解析数据
SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
.assignTimestampsAndWatermarks(strategyForSession)
- .name("ParseDataProcess")
- .setParallelism(GlobalConfig.PARSE_PARALLELISM);
+ .name("ParseDataProcess");
//增量聚合窗口
SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
- .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
.reduce(new DispersionCountWindow(), new MergeCountWindow())
- .name("DispersionCountWindow")
- .setParallelism(GlobalConfig.WINDOW_PARALLELISM);
+ .name("DispersionCountWindow");
//拆分数据
SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
- .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
+ .name("ResultFlatMap");
//输出
- resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
- .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
+ resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
+ config.get(SINK_KAFKA_TOPIC),
+ config.get(LOG_FAILURES_ONLY)));
environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE");
} catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
index 22f2f51..28c01f5 100644
--- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
+++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
@@ -3,9 +3,6 @@ package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
-import com.alibaba.fastjson2.JSONWriter;
-import com.zdjizhi.common.config.GlobalConfig;
-import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil;
@@ -20,9 +17,14 @@ import org.apache.flink.util.Collector;
*/
public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
private static final Log logger = LogFactory.get();
+ /**
+ * 协议分隔符,需要转义
+ */
+ private static final String PROTOCOL_SPLITTER = "\\.";
+
@Override
- public void flatMap(Metrics metrics, Collector<String> out) throws Exception {
+ public void flatMap(Metrics metrics, Collector<String> out) {
try {
Tags tags = metrics.getTags();
String protocolStackId = tags.getProtocol_stack_id();
@@ -30,7 +32,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
tags.setApp_name(null);
StringBuilder stringBuilder = new StringBuilder();
- String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER);
+ String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
int protocolIdsNum = protocolIds.length;
for (int i = 0; i < protocolIdsNum - 1; i++) {
if (StringUtil.isBlank(stringBuilder.toString())) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
index 8216320..44276e2 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
@@ -19,8 +19,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> {
private static final Log logger = LogFactory.get();
+
@Override
- public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) throws Exception {
+ public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) {
try {
Fields cacheData = value1.f1;
Fields newData = value2.f1;
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
index 2677855..4766000 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
@@ -2,12 +2,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -18,17 +18,30 @@ import org.apache.flink.util.Collector;
* @Description:
* @date 2023/4/2314:43
*/
-public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields,Long>, Metrics, String, TimeWindow> {
+public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
private static final Log logger = LogFactory.get();
+ private String NAME = null;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ final Configuration configuration = (Configuration) getRuntimeContext()
+ .getExecutionConfig().getGlobalJobParameters();
+
+ NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+
+ }
+
@Override
- public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields,Long>> input, Collector<Metrics> output) throws Exception {
+ public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) {
try {
long timestamp_ms = context.window().getStart();
- for (Tuple3<Tags, Fields,Long> tuple : input) {
+ for (Tuple3<Tags, Fields, Long> tuple : input) {
Tags tags = tuple.f0;
Fields fields = tuple.f1;
- Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp_ms);
+ Metrics metrics = new Metrics(NAME, tags, fields, timestamp_ms);
output.collect(metrics);
}
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
index 6a48bcf..cc8b32c 100644
--- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
+++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.utils.StringUtil;
import org.apache.datasketches.hll.HllSketch;
@@ -19,8 +18,6 @@ import java.util.Base64;
*/
public class MetricUtil {
private static final Log logger = LogFactory.get();
- private static final String METRICS_DEFAULT_TYPE = "agent";
-
/**
* 用于对业务指标进行统计
@@ -58,28 +55,26 @@ public class MetricUtil {
Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
- if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) {
- String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
- return new Fields(sessions,
- inBytes, outBytes, inPkts, outPkts,
- c2sPkts, s2cPkts, c2sBytes, s2cBytes,
- c2sFragments, s2cFragments,
- c2sTcpLostBytes, s2cTcpLostBytes,
- c2sTcpooorderPkts, s2cTcpooorderPkts,
- c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
- c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
- clientIpSketch);
- } else {
- return new Fields(sessions,
- inBytes, outBytes, inPkts, outPkts,
- c2sPkts, s2cPkts, c2sBytes, s2cBytes,
- c2sFragments, s2cFragments,
- c2sTcpLostBytes, s2cTcpLostBytes,
- c2sTcpooorderPkts, s2cTcpooorderPkts,
- c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
- c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
- null);
- }
+// String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
+// return new Fields(sessions,
+// inBytes, outBytes, inPkts, outPkts,
+// c2sPkts, s2cPkts, c2sBytes, s2cBytes,
+// c2sFragments, s2cFragments,
+// c2sTcpLostBytes, s2cTcpLostBytes,
+// c2sTcpooorderPkts, s2cTcpooorderPkts,
+// c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
+// c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
+// clientIpSketch);
+
+ return new Fields(sessions,
+ inBytes, outBytes, inPkts, outPkts,
+ c2sPkts, s2cPkts, c2sBytes, s2cBytes,
+ c2sFragments, s2cFragments,
+ c2sTcpLostBytes, s2cTcpLostBytes,
+ c2sTcpooorderPkts, s2cTcpooorderPkts,
+ c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
+ c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
+ null);
}
/**
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
deleted file mode 100644
index 877b2e6..0000000
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.zdjizhi.utils.kafka;
-
-import com.zdjizhi.common.config.GlobalConfig;
-import org.apache.kafka.common.config.SslConfigs;
-
-import java.util.Properties;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.kafka
- * @Description:
- * @date 2021/9/610:37
- */
-class CertUtils {
- /**
- * Kafka SASL认证端口
- */
- private static final String SASL_PORT = "9094";
-
- /**
- * Kafka SSL认证端口
- */
- private static final String SSL_PORT = "9095";
-
- /**
- * 根据连接信息端口判断认证方式。
- *
- * @param servers kafka 连接信息
- * @param properties kafka 连接配置信息
- */
- static void chooseCert(String servers, Properties properties) {
- if (servers.contains(SASL_PORT)) {
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";");
- } else if (servers.contains(SSL_PORT)) {
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- }
-
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index b0bc2fb..397814f 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,8 +1,8 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
@@ -13,35 +13,40 @@ import java.util.Properties;
* @date 2021/6/813:54
*/
public class KafkaConsumer {
- private static Properties createConsumerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", GlobalConfig.SOURCE_KAFKA_SERVERS);
- properties.put("group.id", GlobalConfig.GROUP_ID);
- properties.put("session.timeout.ms", GlobalConfig.SESSION_TIMEOUT_MS);
- properties.put("max.poll.records", GlobalConfig.MAX_POLL_RECORDS);
- properties.put("max.partition.fetch.bytes", GlobalConfig.MAX_PARTITION_FETCH_BYTES);
- properties.put("partition.discovery.interval.ms", "10000");
-
- CertUtils.chooseCert(GlobalConfig.SOURCE_KAFKA_SERVERS, properties);
-
- return properties;
- }
/**
* 官方序列化kafka数据
*
* @return kafka logs
*/
- public static FlinkKafkaConsumer<String> getKafkaConsumer() {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GlobalConfig.SOURCE_KAFKA_TOPIC,
- new SimpleStringSchema(), createConsumerConfig());
-
- //随着checkpoint提交,将offset提交到kafka
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
-
- //从消费组当前的offset开始消费
- kafkaConsumer.setStartFromGroupOffsets();
+ public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {
+
+ setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
+
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
+
+ switch (startupMode) {
+ case "group":
+ kafkaConsumer.setStartFromGroupOffsets();
+ break;
+ case "latest":
+ kafkaConsumer.setStartFromLatest();
+ break;
+ case "earliest":
+ kafkaConsumer.setStartFromEarliest();
+ break;
+ default:
+ kafkaConsumer.setStartFromGroupOffsets();
+ }
return kafkaConsumer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.contains(key)) {
+ properties.put(key, value);
+ }
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index ca62061..c7cd3f2 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -1,9 +1,7 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Optional;
import java.util.Properties;
@@ -16,33 +14,29 @@ import java.util.Properties;
*/
public class KafkaProducer {
- private static Properties createProducerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", GlobalConfig.SINK_KAFKA_SERVERS);
- properties.put("acks", GlobalConfig.PRODUCER_ACK);
- properties.put("retries", GlobalConfig.RETRIES);
- properties.put("linger.ms", GlobalConfig.LINGER_MS);
- properties.put("request.timeout.ms", GlobalConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", GlobalConfig.BATCH_SIZE);
- properties.put("buffer.memory", GlobalConfig.BUFFER_MEMORY);
- properties.put("max.request.size", GlobalConfig.MAX_REQUEST_SIZE);
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, GlobalConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
-
- CertUtils.chooseCert(GlobalConfig.SINK_KAFKA_SERVERS, properties);
-
- return properties;
- }
-
-
- public static FlinkKafkaProducer<String> getKafkaProducer() {
- FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
- GlobalConfig.SINK_KAFKA_TOPIC,
+ public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
+ setDefaultConfig(properties, "ack", 1);
+ setDefaultConfig(properties, "retries", 0);
+ setDefaultConfig(properties, "linger.ms", 10);
+ setDefaultConfig(properties, "request.timeout.ms", 30000);
+ setDefaultConfig(properties, "batch.size", 262144);
+ setDefaultConfig(properties, "buffer.memory", 134217728);
+ setDefaultConfig(properties, "max.request.size", 10485760);
+ setDefaultConfig(properties, "compression.type", "snappy");
+
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+ topic,
new SimpleStringSchema(),
- createProducerConfig(), Optional.empty());
+ properties, Optional.empty());
- //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们
- kafkaProducer.setLogFailuresOnly(true);
+ kafkaProducer.setLogFailuresOnly(logFailuresOnly);
return kafkaProducer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.contains(key)) {
+ properties.put(key, value);
+ }
+ }
}