author qidaijie <[email protected]> 2022-10-17 13:59:28 +0800
committer qidaijie <[email protected]> 2022-10-17 13:59:28 +0800
commit b4786aa808097d1db90ade10033f00f147094c80 (patch)
tree 1e078616aa22e85348e77425643687329d0bc080
parent 9404b08eb4bbe237082c9935f7ebad3cef701ba4 (diff)
1: When a MAIL log's charset is empty, ETL does not perform the conversion. (TSG-12175)
-rw-r--r-- pom.xml 2
-rw-r--r-- properties/default_config.properties 12
-rw-r--r-- src/main/java/com/zdjizhi/tools/general/TransFunction.java 8
-rw-r--r-- src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java 7
4 files changed, 15 insertions, 14 deletions
diff --git a/pom.xml b/pom.xml
index 3442b0e..7fd6afd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
- <version>20220921-VSYS</version>
+ <version>20221017-VSYS</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 781d319..9495f9c 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -27,15 +27,15 @@ buffer.memory=134217728
#This parameter determines the maximum size of each request sent to the Kafka server
#default: 10485760 = 10M
max.request.size=10485760
+
+#producer ack
+producer.ack=1
#====================kafka default====================#
#kafka SASL/SSL username (encryption)
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#kafka SASL/SSL pin (encryption)
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
-
-#producer ack
-producer.ack=1
#====================nacos default====================#
#nacos username (encryption)
nacos.username=kANxu/Zi5rBnZVxa5zAjrQ==
@@ -63,11 +63,11 @@ log.transform.type=1
buffer.timeout=5000
#The gtpc data scan max rows,0 = no limit.
-hbase.gtpc.scan.max.rows=0
+hbase.gtpc.scan.max.rows=1000000
#The radius data scan max rows,0 = no limit.
-hbase.radius.scan.max.rows=0
+hbase.radius.scan.max.rows=1000000
#Whether vsys_id is used as the relationship key between gtpc and radius.
#vsys or global
-data.relationship.model=global
\ No newline at end of file
+data.relationship.model=vsys
\ No newline at end of file
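
For context, a minimal sketch of how a consumer of this file might interpret the scan caps, following the "0 = no limit" convention the property comments state. The class and method here are hypothetical, not the project's actual config loader:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Hypothetical reader illustrating the "0 = no limit" convention
// documented in default_config.properties.
public class ScanLimits {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("properties/default_config.properties")) {
            props.load(in);
        }
        long gtpcMax = Long.parseLong(props.getProperty("hbase.gtpc.scan.max.rows", "0"));
        // 0 keeps the scan unbounded; any positive value (now 1000000) caps it.
        System.out.println(gtpcMax == 0 ? "gtpc scan: unlimited" : "gtpc scan capped at " + gtpcMax);
    }
}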
diff --git a/src/main/java/com/zdjizhi/tools/general/TransFunction.java b/src/main/java/com/zdjizhi/tools/general/TransFunction.java
index 48c0813..c527b56 100644
--- a/src/main/java/com/zdjizhi/tools/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/tools/general/TransFunction.java
@@ -180,12 +180,8 @@ class TransFunction {
static String decodeBase64(String message, Object charset) {
String result = "";
try {
- if (StringUtil.isNotBlank(message)) {
- if (charset == null) {
- result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
- } else {
- result = Base64.decodeStr(message, charset.toString());
- }
+ if (StringUtil.isNotBlank(message) && charset != null) {
+ result = Base64.decodeStr(message, charset.toString());
}
} catch (RuntimeException e) {
logger.error("Resolve Base64 exception, exception information:" + e.getMessage());
diff --git a/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java
index 409ea94..d46634e 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java
@@ -37,10 +37,15 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem
long timestamp = record.timestamp() / 1000;
String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING);
Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
- json.put("common_ingestion_time", timestamp);
+ if (json != null) {
+ if (timestamp != 0L) {
+ json.put("common_ingestion_time", timestamp);
+ }
+ }
return json;
} catch (RuntimeException e) {
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
+ e.printStackTrace();
}
}
return null;
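
Similarly, the deserializer now guards both the parsed map and the record timestamp before attaching common_ingestion_time, so malformed payloads and zero timestamps no longer throw or mislabel records. A self-contained approximation of the patched branch, with a plain Java map standing in for the project's JsonMapper output (names here are illustrative, not the project's API):

import java.util.HashMap;
import java.util.Map;

public class IngestionTimeDemo {
    // Mirrors the patched deserialize(): only annotate when parsing succeeded
    // and the Kafka record carried a real timestamp.
    static Map<String, Object> annotate(Map<String, Object> json, long recordTimestampMs) {
        long timestamp = recordTimestampMs / 1000; // ms -> s, as in the patch
        if (json != null && timestamp != 0L) {
            json.put("common_ingestion_time", timestamp);
        }
        return json;
    }

    public static void main(String[] args) {
        Map<String, Object> parsed = new HashMap<>();
        System.out.println(annotate(parsed, 1666000768000L)); // {common_ingestion_time=1666000768}
        System.out.println(annotate(null, 1666000768000L));   // null -- bad payloads pass through untouched
    }
}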