author  qidaijie <[email protected]>  2022-03-08 15:18:28 +0800
committer  qidaijie <[email protected]>  2022-03-08 15:18:28 +0800
commit  daea0a07a1acd1b6ea6a57419447a8b20d4e8802 (patch)
tree  fac87fd65de9348304ac31518592d37e1e6f3307
parent  53b07555f295321de33e990a90dd6aea3f3feef3 (diff)
1: Add a Kafka deserialization class used to obtain the timestamp at which each log was written to Kafka. TSG-9844
2: Remove the Kafka authentication-type setting; the authentication mode is now determined from the connection port.
3: Remove the strict matching mode; weak matching and no matching are sufficient to meet the requirements.
-rw-r--r--  pom.xml  2
-rw-r--r--  properties/default_config.properties  14
-rw-r--r--  properties/service_flow_config.properties  12
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java  2
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java  35
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java  6
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java  20
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java  6
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormMap.java  39
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormObject.java  153
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java  38
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/CertUtils.java  48
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Consumer.java)  35
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Producer.java)  5
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java  47
15 files changed, 173 insertions, 289 deletions
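
For orientation, the sketch below shows how the pieces introduced by this commit are meant to fit together: the renamed KafkaConsumer builds a FlinkKafkaConsumer around the new TimestampDeserializationSchema, which parses each record into a Map and injects the Kafka record timestamp as common_ingestion_time. This is a minimal, hypothetical wiring example, not code from the repository; the bootstrap address, topic and group id are borrowed from the sample properties files purely for illustration.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Map;
    import java.util.Properties;

    public class IngestionTimeWiringSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Illustrative connection settings, mirroring service_flow_config.properties.
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.44.12:9094");
            props.put("group.id", "flinktest-1");

            // TimestampDeserializationSchema (added in this commit) parses each record
            // into a Map and stores record.timestamp() / 1000 as "common_ingestion_time".
            FlinkKafkaConsumer<Map<String, Object>> consumer = new FlinkKafkaConsumer<>(
                    "test", new TimestampDeserializationSchema(), props);

            DataStream<Map<String, Object>> logs = env.addSource(consumer);
            logs.print();

            env.execute("ingestion-time-sketch");
        }
    }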
diff --git a/pom.xml b/pom.xml
index 217901d..69d563f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
- <version>220209-ipLookup</version>
+ <version>220308-IngestionTime</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index ebf7927..6a01de4 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,4 +1,4 @@
-#====================Kafka Consumer====================#
+#====================Kafka KafkaConsumer====================#
#kafka source connection timeout
session.timeout.ms=60000
@@ -7,7 +7,7 @@ max.poll.records=3000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
-#====================Kafka Producer====================#
+#====================Kafka KafkaProducer====================#
#Number of retries for the producer
retries=0
@@ -28,12 +28,6 @@ buffer.memory=134217728
#10M
max.request.size=10485760
#====================kafka default====================#
-#kafka source protocol; SSL or SASL
-kafka.source.protocol=SASL
-
-#kafka sink protocol; SSL or SASL
-kafka.sink.protocol=SASL
-
#Kafka SASL authentication username
kafka.user=admin
@@ -47,8 +41,8 @@ hbase.table.name=tsg_galaxy:relation_framedip_account
#Default mail charset
mail.default.charset=UTF-8
-#0: no validation, 1: strict type validation, 2: weak type validation
-log.transform.type=2
+#0: no validation, 1: weak type validation
+log.transform.type=0
#Maximum time between two outputs (in milliseconds)
buffer.timeout=5000
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index ddd10f6..df12fa7 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,23 +1,23 @@
#--------------------------------Address configuration------------------------------#
#Management Kafka source address
-source.kafka.servers=192.168.44.11:9094
+source.kafka.servers=192.168.44.12:9094
#Management Kafka sink (output) address
-sink.kafka.servers=192.168.44.11:9094
+sink.kafka.servers=192.168.44.12:9094
#ZooKeeper address, used for configuring log_id
-zookeeper.servers=192.168.44.11:2181
+zookeeper.servers=192.168.44.12:2181
#HBase ZooKeeper address, used for connecting to HBase
-hbase.zookeeper.servers=192.168.44.11:2181
+hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP / location library------------------------------#
#Location library path
tools.library=D:\\workerspace\\dat\\
#Schema location on the gateway
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/session_record
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/session_record
#Gateway APP_ID lookup API
app.id.http=http://192.168.44.67:9999/open-api/appDicList
@@ -31,7 +31,7 @@ source.kafka.topic=test
sink.kafka.topic=test-result
#Consumer group for the source topic; stores the consumer offsets for this spout id and can be named after the topology. The stored offsets determine where the next read resumes so that data is not consumed twice.
-group.id=flink-test-1
+group.id=flinktest-1
#Producer compression mode: none or snappy
producer.kafka.compression.type=none
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index e2d430a..ebc8eeb 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -52,8 +52,6 @@ public class FlowWriteConfig {
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
- public static final String KAFKA_SOURCE_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.source.protocol");
- public static final String KAFKA_SINK_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.sink.protocol");
public static final String KAFKA_USER = FlowWriteConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = FlowWriteConfigurations.getStringProperty(1, "kafka.pin");
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 07e0407..2d42769 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -5,14 +5,15 @@ import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.functions.FilterNullFunction;
import com.zdjizhi.utils.functions.MapCompletedFunction;
-import com.zdjizhi.utils.functions.ObjectCompletedFunction;
import com.zdjizhi.utils.functions.TypeMapCompletedFunction;
-import com.zdjizhi.utils.kafka.Consumer;
-import com.zdjizhi.utils.kafka.Producer;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import java.util.Map;
+
/**
* @author qidaijie
* @Package com.zdjizhi.topology
@@ -25,56 +26,48 @@ public class LogFlowWriteTopology {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
- //Enable checkpointing; interval specifies how often checkpoints are triggered (in milliseconds)
-// environment.enableCheckpointing(5000);
-
//Maximum time between two outputs (in milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
- DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
- .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
-
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
+ DataStreamSource<Map<String, Object>> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer())
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
+
DataStream<String> cleaningLog;
switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) {
case 0:
//Process, complete and transform the raw log; field types are not validated.
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
break;
case 1:
- //Process, complete and transform the raw log; field types are required to match the schema exactly.
- cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction")
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
- break;
- case 2:
//Process, complete and transform the raw log; field types are weakly validated and may be cast according to the schema.
cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
break;
default:
//Process, complete and transform the raw log; field types are not validated.
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
}
-// //Filter out empty data so it is not sent to Kafka
+ //Filter out empty data so it is not sent to Kafka
DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//Send the data to Kafka
- result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+ result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(FlowWriteConfig.SINK_PARALLELISM);
} else {
+ DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.flinkConsumer())
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
+
//Filter out empty data so it is not sent to Kafka
DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
//Send the data to Kafka
- result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+ result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(FlowWriteConfig.SINK_PARALLELISM);
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
index 5e5d0b7..810e4c8 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -4,6 +4,8 @@ package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormMap;
import org.apache.flink.api.common.functions.MapFunction;
+import java.util.Map;
+
/**
* @author qidaijie
@@ -11,11 +13,11 @@ import org.apache.flink.api.common.functions.MapFunction;
* @Description:
* @date 2021/5/2715:01
*/
-public class MapCompletedFunction implements MapFunction<String, String> {
+public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> {
@Override
@SuppressWarnings("unchecked")
- public String map(String logs) {
+ public String map(Map<String, Object> logs) {
return TransFormMap.dealCommonMessage(logs);
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
deleted file mode 100644
index 131d2f6..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.utils.general.TransFormObject;
-import org.apache.flink.api.common.functions.MapFunction;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class ObjectCompletedFunction implements MapFunction<String, String> {
-
- @Override
- @SuppressWarnings("unchecked")
- public String map(String logs) {
- return TransFormObject.dealCommonMessage(logs);
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
index 99c92e8..ccef850 100644
--- a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
@@ -3,6 +3,8 @@ package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormTypeMap;
import org.apache.flink.api.common.functions.MapFunction;
+import java.util.Map;
+
/**
* @author qidaijie
@@ -10,11 +12,11 @@ import org.apache.flink.api.common.functions.MapFunction;
* @Description:
* @date 2021/5/2715:01
*/
-public class TypeMapCompletedFunction implements MapFunction<String, String> {
+public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> {
@Override
@SuppressWarnings("unchecked")
- public String map(String logs) {
+ public String map(Map<String, Object> logs) {
return TransFormTypeMap.dealCommonMessage(logs);
}
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
index 5ae9859..de4ca99 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -30,34 +30,29 @@ public class TransFormMap {
/**
* Parse the log and complete it
*
- * @param message raw log from the Kafka topic
+ * @param jsonMap raw log consumed from the Kafka topic, already parsed into a map
* @return the completed log
*/
@SuppressWarnings("unchecked")
- public static String dealCommonMessage(String message) {
+ public static String dealCommonMessage(Map<String, Object> jsonMap) {
try {
- if (StringUtil.isNotBlank(message)) {
- Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
- JsonParseUtil.dropJsonField(jsonMap);
- for (String[] strings : jobList) {
- //value of the parameter in use
- Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
- //key of the field to be completed
- String appendToKeyName = strings[1];
- //value of the field to be completed
- Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
- //field that selects the operation function
- String function = strings[2];
- //value of the extra parameter
- String param = strings[3];
- functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
- }
- return JsonMapper.toJsonString(jsonMap);
- } else {
- return null;
+ JsonParseUtil.dropJsonField(jsonMap);
+ for (String[] strings : jobList) {
+ //value of the parameter in use
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //key of the field to be completed
+ String appendToKeyName = strings[1];
+ //value of the field to be completed
+ Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //field that selects the operation function
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
}
+ return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
- logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+ logger.error("TransForm logs failed,The exception is :" + e.getMessage());
return null;
}
}
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
deleted file mode 100644
index 54629db..0000000
--- a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package com.zdjizhi.utils.general;
-
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-
-/**
- * Description: utility class for transforming and completing logs
- *
- * @author qidaijie
- */
-public class TransFormObject {
- private static final Log logger = LogFactory.get();
-
- /**
- * Map used to build the reflection class held in memory
- */
- private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
-
- /**
- * Object reflected from the map
- */
- private static Object mapObject = JsonParseUtil.generateObject(map);
-
- /**
- * Get the job list
- * Each element of the list is a four-element string array (field carrying the format flag, field to complete, function to apply, parameter to use), for example:
- * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
- */
- private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
-
- /**
- * Parse the log and complete it
- *
- * @param message raw log from the Kafka topic
- * @return the completed log
- */
- public static String dealCommonMessage(String message) {
- try {
- if (StringUtil.isNotBlank(message)) {
- Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
- for (String[] strings : jobList) {
- //value of the parameter in use
- Object name = JsonParseUtil.getValue(object, strings[0]);
- //key of the field to be completed
- String appendToKeyName = strings[1];
- //value of the field to be completed
- Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
- //field that selects the operation function
- String function = strings[2];
- //value of the extra parameter
- String param = strings[3];
- functionSet(function, object, appendToKeyName, appendTo, name, param);
- }
- return JsonMapper.toJsonString(object);
- } else {
- return null;
- }
- } catch (RuntimeException e) {
- logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
- return null;
- }
- }
-
-
- /**
- * Set of functions that operate on the corresponding fields according to the schema description
- *
- * @param function field that selects the operation function
- * @param object dynamic POJO object
- * @param appendToKeyName key of the field to be completed
- * @param appendTo value of the field to be completed
- * @param name value of the parameter in use
- * @param param value of the extra parameter
- */
- private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
- switch (function) {
- case "current_timestamp":
- if (!(appendTo instanceof Long)) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime());
- }
- break;
- case "snowflake_id":
- JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
- break;
- case "geo_ip_detail":
- if (name != null && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
- }
- break;
- case "geo_asn":
- if (name != null && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
- }
- break;
- case "geo_ip_country":
- if (name != null && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
- }
- break;
- case "set_value":
- if (name != null && param != null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param));
- }
- break;
- case "get_value":
- if (name != null) {
- JsonParseUtil.setValue(object, appendToKeyName, name);
- }
- break;
- case "if":
- if (param != null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param));
- }
- break;
- case "sub_domain":
- if (appendTo == null && name != null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString()));
- }
- break;
- case "radius_match":
- if (name != null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
- }
- break;
- case "decode_of_base64":
- if (name != null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
- }
- break;
- case "flattenSpec":
- if (name != null && param != null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
- }
- break;
- case "app_match":
- if (name != null && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
- }
- break;
- default:
- }
- }
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
index 699470f..5b3cede 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -39,31 +39,25 @@ public class TransFormTypeMap {
* @return the completed log
*/
@SuppressWarnings("unchecked")
- public static String dealCommonMessage(String message) {
+ public static String dealCommonMessage(Map<String, Object> message) {
try {
- if (StringUtil.isNotBlank(message)) {
- Map<String, Object> map = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
- Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(map);
- for (String[] strings : jobList) {
- //value of the parameter in use
- Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
- //key of the field to be completed
- String appendToKeyName = strings[1];
- //value of the field to be completed
- Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
- //field that selects the operation function
- String function = strings[2];
- //value of the extra parameter
- String param = strings[3];
- functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
- }
- return JsonMapper.toJsonString(jsonMap);
-
- } else {
- return null;
+ Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(message);
+ for (String[] strings : jobList) {
+ //value of the parameter in use
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //key of the field to be completed
+ String appendToKeyName = strings[1];
+ //value of the field to be completed
+ Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //field that selects the operation function
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
}
+ return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
- logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+ logger.error("TransForm logs failed,The exception is :" + e.getMessage());
return null;
}
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
index b09eedb..fe86fe7 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -12,24 +12,36 @@ import java.util.Properties;
* @date 2021/9/610:37
*/
class CertUtils {
- static void chooseCert(String type, Properties properties) {
- switch (type) {
- case "SSL":
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
- properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
- properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
- break;
- case "SASL":
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
- break;
- default:
+ /**
+ * Kafka SASL authentication port
+ */
+ private static final String SASL_PORT = "9094";
+
+ /**
+ * Kafka SSL authentication port
+ */
+ private static final String SSL_PORT = "9095";
+
+ /**
+ * Determine the authentication mode from the port in the connection string.
+ *
+ * @param servers Kafka connection string (bootstrap servers)
+ * @param properties Kafka connection properties
+ */
+ static void chooseCert(String servers, Properties properties) {
+ if (servers.contains(SASL_PORT)) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
+ } else if (servers.contains(SSL_PORT)) {
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
}
}
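
For reference, a hypothetical snippet (not part of the diff) illustrating the effect of the port-based selection that chooseCert now performs; the addresses are made up, and since CertUtils is package-private this would only compile inside com.zdjizhi.utils.kafka:

    // ":9094" in the bootstrap string -> SASL_PLAINTEXT + PLAIN JAAS config
    Properties saslProps = new Properties();
    CertUtils.chooseCert("192.168.44.12:9094", saslProps);

    // ":9095" in the bootstrap string -> SSL keystore/truststore settings
    Properties sslProps = new Properties();
    CertUtils.chooseCert("192.168.44.12:9095", sslProps);

    // any other port -> no security properties are added
    Properties plainProps = new Properties();
    CertUtils.chooseCert("192.168.44.12:9092", plainProps);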
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 339b7e3..078c2fe 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,14 +1,14 @@
package com.zdjizhi.utils.kafka;
-import com.sun.tools.javac.comp.Flow;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.config.SslConfigs;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import java.util.Map;
import java.util.Properties;
/**
@@ -17,7 +17,7 @@ import java.util.Properties;
* @Description:
* @date 2021/6/813:54
*/
-public class Consumer {
+public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
@@ -27,12 +27,33 @@ public class Consumer {
properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
+ properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
+ CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
- public static FlinkKafkaConsumer<String> getKafkaConsumer() {
+ /**
+ * Custom deserialization of Kafka data that adds the Kafka record timestamp.
+ *
+ * @return kafka logs -> map
+ */
+ public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
+ FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
+ new TimestampDeserializationSchema(), createConsumerConfig());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+
+ /**
+ * Standard (built-in) deserialization of Kafka data
+ *
+ * @return kafka logs
+ */
+ public static FlinkKafkaConsumer<String> flinkConsumer() {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index 1671643..f2f399d 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -15,7 +15,7 @@ import java.util.Properties;
* @Description:
* @date 2021/6/814:04
*/
-public class Producer {
+public class KafkaProducer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
@@ -29,7 +29,7 @@ public class Producer {
properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
- CertUtils.chooseCert(FlowWriteConfig.KAFKA_SINK_PROTOCOL, properties);
+ CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties);
return properties;
}
@@ -43,7 +43,6 @@ public class Producer {
kafkaProducer.setLogFailuresOnly(false);
-
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
new file mode 100644
index 0000000..e978369
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.utils.kafka;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2022/3/89:42
+ */
+public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
+ private static final Log logger = LogFactory.get();
+ private final String ENCODING = "UTF8";
+
+ @Override
+ public boolean isEndOfStream(Object nextElement) {
+ return false;
+ }
+
+ @Override
+ public Map<String, Object> deserialize(ConsumerRecord record) throws Exception {
+ if (record != null) {
+ try {
+ long timestamp = record.timestamp() / 1000;
+ String value = new String((byte[]) record.value(), ENCODING);
+ Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
+ json.put("common_ingestion_time", timestamp);
+ return json;
+ } catch (RuntimeException e) {
+ logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(Map.class);
+ }
+}
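
As a worked example (hypothetical record, not taken from the repository), this is what the new schema produces for a record written to Kafka at 2022-03-08 15:18:28 +0800:

    // record.timestamp()        -> 1646723908000   (epoch milliseconds, set by the broker/producer)
    // record.timestamp() / 1000 -> 1646723908      (epoch seconds)
    // deserialize() then returns the parsed record value with one extra field:
    //   {"...original fields...", "common_ingestion_time": 1646723908}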