summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-07-07 15:19:34 +0800
committerqidaijie <[email protected]>2021-07-07 15:19:34 +0800
commit5764bb999e4eb60175aa7c0685a4547ef33181d0 (patch)
treed54b5004090f1a2d34468122fd2a11f24eccd308
parent1b47ecf76b37b93c4cfaf873559160153f0ef078 (diff)
1:修改JSON解析方式为JackSon。
2:删除Kafka生产者flush方法。 3:修改打包方式为package。
-rw-r--r--pom.xml34
-rw-r--r--properties/service_flow_config.properties11
-rw-r--r--src/main/java/com/zdjizhi/bolt/LogSendBolt.java4
-rw-r--r--src/main/java/com/zdjizhi/topology/StormRunner.java1
-rw-r--r--src/main/java/com/zdjizhi/utils/app/AppUtils.java8
-rw-r--r--src/main/java/com/zdjizhi/utils/general/TransFormUtils.java36
-rw-r--r--src/main/java/com/zdjizhi/utils/general/TransFunction.java47
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java5
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java8
9 files changed, 67 insertions, 87 deletions
diff --git a/pom.xml b/pom.xml
index 2d2b363..8b89a8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-completion-schema</artifactId>
- <version>v3.21.06.07-Array</version>
+ <version>v3.21.06.28-jackson</version>
<packaging>jar</packaging>
<name>log-stream-completion-schema</name>
@@ -21,7 +21,6 @@
</repositories>
<build>
-
<plugins>
<plugin>
@@ -34,7 +33,7 @@
<executions>
<execution>
- <phase>install</phase>
+ <phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
@@ -44,14 +43,6 @@
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass>
</transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>META-INF/spring.handlers</resource>
- </transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>META-INF/spring.schemas</resource>
- </transformer>
</transformers>
</configuration>
</execution>
@@ -67,7 +58,7 @@
<goals>
<goal>strip-jar</goal>
</goals>
- <phase>install</phase>
+ <phase>package</phase>
</execution>
</executions>
</plugin>
@@ -175,7 +166,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
- <version>1.0.3</version>
+ <version>1.0.6</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -222,23 +213,6 @@
</exclusions>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
- <!--<dependency>-->
- <!--<groupId>org.apache.hbase</groupId>-->
- <!--<artifactId>hbase-server</artifactId>-->
- <!--<version>${hbase.version}</version>-->
- <!--<exclusions>-->
- <!--<exclusion>-->
- <!--<artifactId>slf4j-log4j12</artifactId>-->
- <!--<groupId>org.slf4j</groupId>-->
- <!--</exclusion>-->
- <!--<exclusion>-->
- <!--<artifactId>log4j-over-slf4j</artifactId>-->
- <!--<groupId>org.slf4j</groupId>-->
- <!--</exclusion>-->
- <!--</exclusions>-->
- <!--</dependency>-->
-
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 33c1667..e5cd76a 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,7 +1,7 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-input.kafka.servers=192.168.44.12:9092
+input.kafka.servers=192.168.44.11:9092
#管理输出kafka地址
output.kafka.servers=192.168.44.12:9092
@@ -18,7 +18,7 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
#ip.library=/home/bigdata/topology/dat/
#网关的schema位置
-schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log
#网关APP_ID 获取接口
app.id.http=http://192.168.44.67:9999/open-api/appDicList
@@ -26,13 +26,13 @@ app.id.http=http://192.168.44.67:9999/open-api/appDicList
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
-kafka.topic=PROXY-EVENT-LOG
+kafka.topic=CONNECTION-RECORD-LOG
#补全数据 输出 topic
-results.output.topic=PROXY-EVENT-COMPLETED-LOG
+results.output.topic=test-result
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=connection-record-log-20200818-1-test
+group.id=test-20210628-1
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none
@@ -101,4 +101,3 @@ mail.default.charset=UTF-8
#需不要补全,不需要则原样日志输出
log.need.complete=yes
-
diff --git a/src/main/java/com/zdjizhi/bolt/LogSendBolt.java b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java
index 46c6353..6fc0537 100644
--- a/src/main/java/com/zdjizhi/bolt/LogSendBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java
@@ -24,13 +24,13 @@ import java.util.Map;
public class LogSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = -3663610927224396615L;
private static final Log logger = LogFactory.get();
- private List<String> list;
+ private List<String> list = new LinkedList<>();
private KafkaLogSend kafkaLogSend;
@Override
public void prepare(Map stormConf, TopologyContext context) {
- list = new LinkedList<>();
+// list = new LinkedList<>();
kafkaLogSend = KafkaLogSend.getInstance();
}
diff --git a/src/main/java/com/zdjizhi/topology/StormRunner.java b/src/main/java/com/zdjizhi/topology/StormRunner.java
index 85024fd..85eb371 100644
--- a/src/main/java/com/zdjizhi/topology/StormRunner.java
+++ b/src/main/java/com/zdjizhi/topology/StormRunner.java
@@ -28,7 +28,6 @@ public final class StormRunner{
}
public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
index 1193b13..3770dde 100644
--- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -116,12 +116,16 @@ public class AppUtils {
* @return account
*/
public static String getAppName(int appId) {
-
if (appUtils == null) {
getAppInstance();
}
- return appIdMap.get(appId);
+ if (appIdMap.containsKey(appId)) {
+ return appIdMap.get(appId);
+ } else {
+ logger.warn("AppMap get appName is null, ID is :" + appId);
+ return "";
+ }
}
}
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
index 66eadde..5bfda89 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
@@ -1,14 +1,14 @@
package com.zdjizhi.utils.general;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.json.JsonParseUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.utils.IpLookup;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.json.JsonParseUtil;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
/**
@@ -37,18 +37,6 @@ public class TransFormUtils {
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
- * IP定位库工具类
- */
- private static IpLookup ipLookup = new IpLookup.Builder(false)
- .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
- .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
- .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
- .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
- .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
- .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
- .build();
-
- /**
* 解析日志,并补全
*
* @param message kafka Topic原始日志
@@ -56,7 +44,7 @@ public class TransFormUtils {
*/
public static String dealCommonMessage(String message) {
try {
- Object object = JSONObject.parseObject(message, mapObject.getClass());
+ Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
for (String[] strings : jobList) {
//用到的参数的值
Object name = JsonParseUtil.getValue(object, strings[0]);
@@ -68,12 +56,12 @@ public class TransFormUtils {
String function = strings[2];
//额外的参数的值
String param = strings[3];
-
functionSet(function, object, appendToKeyName, appendTo, name, param);
}
- return JSONObject.toJSONString(object);
+ return JsonMapper.toJsonString(object);
} catch (RuntimeException e) {
logger.error("解析补全日志信息过程异常,异常信息:" + e);
+ e.printStackTrace();
return "";
}
}
@@ -92,7 +80,7 @@ public class TransFormUtils {
private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
switch (function) {
case "current_timestamp":
- if ((long) appendTo == 0L) {
+ if (! (appendTo instanceof Long)) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime());
}
break;
@@ -101,17 +89,17 @@ public class TransFormUtils {
break;
case "geo_ip_detail":
if (name != null && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(ipLookup, name.toString()));
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
}
break;
case "geo_asn":
if (name != null && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(ipLookup, name.toString()));
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
}
break;
case "geo_ip_country":
if (name != null && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(ipLookup, name.toString()));
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
}
break;
case "set_value":
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index bfb71a2..0e9fb93 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -1,18 +1,20 @@
package com.zdjizhi.utils.general;
+import clojure.lang.IFn;
import cn.hutool.core.codec.Base64;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.app.AppUtils;
-import com.zdjizhi.utils.hbase.HBaseUtils;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import cn.hutool.core.text.StrSpliter;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.utils.Encodes;
+import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.app.AppUtils;
+import com.zdjizhi.utils.hbase.HBaseUtils;
+import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.ArrayList;
import java.util.regex.Matcher;
@@ -28,9 +30,22 @@ class TransFunction {
private static final Pattern PATTERN = Pattern.compile("[0-9]*");
/**
+ * IP定位库工具类
+ */
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
+ .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
+ .build();
+
+ /**
* 生成当前时间戳的操作
*/
static long getCurrentTime() {
+
return System.currentTimeMillis() / 1000;
}
@@ -40,9 +55,10 @@ class TransFunction {
* @param ip client IP
* @return ip地址详细信息
*/
- static String getGeoIpDetail(IpLookup ipLookup, String ip) {
+ static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
+
}
/**
@@ -51,7 +67,7 @@ class TransFunction {
* @param ip client/server IP
* @return ASN
*/
- static String getGeoAsn(IpLookup ipLookup, String ip) {
+ static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip);
}
@@ -62,7 +78,7 @@ class TransFunction {
* @param ip server IP
* @return 国家
*/
- static String getGeoIpCountry(IpLookup ipLookup, String ip) {
+ static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
@@ -77,7 +93,7 @@ class TransFunction {
static String radiusMatch(String ip) {
String account = HBaseUtils.getAccount(ip.trim());
if (StringUtil.isBlank(account)) {
- logger.warn("HashMap get account is null, Ip is :{}", ip);
+ logger.warn("HashMap get account is null, Ip is :" + ip);
}
return account;
}
@@ -89,12 +105,13 @@ class TransFunction {
* @return appName
*/
static String appMatch(String appIds) {
- String appId = appIds.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
- String appName = AppUtils.getAppName(Integer.parseInt(appId));
- if (StringUtil.isBlank(appName)) {
- logger.warn("AppMap get appName is null, ID is :{}", appId);
+ try {
+ String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
+ return AppUtils.getAppName(Integer.parseInt(appId));
+ } catch (NumberFormatException | ClassCastException exception) {
+ logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
+ return "";
}
- return appName;
}
/**
@@ -107,7 +124,7 @@ class TransFunction {
try {
return FormatUtils.getTopPrivateDomain(domain);
} catch (StringIndexOutOfBoundsException outException) {
- logger.error("解析顶级域名异常,异常域名:{}" + domain);
+ logger.error("解析顶级域名异常,异常域名:" + domain);
return "";
}
}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 10807e1..07ee2e5 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -23,6 +23,8 @@ import java.util.*;
public class JsonParseUtil {
private static final Log logger = LogFactory.get();
+ private static List<String> dropFieldList = new ArrayList<>();
+
/**
* 模式匹配,给定一个类型字符串返回一个类类型
*
@@ -143,6 +145,8 @@ public class JsonParseUtil {
}
//组合用来生成实体类的map
map.put(name, getClassName(type));
+ } else {
+ dropFieldList.add(JsonPath.read(filedStr, "$.name").toString());
}
}
return map;
@@ -237,5 +241,4 @@ public class JsonParseUtil {
return list;
}
-
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
index 51b9138..5104745 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
@@ -42,22 +42,18 @@ public class KafkaLogSend {
public void sendMessage(List<String> list) {
- final int[] errorSum = {0};
for (String value : list) {
kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
- errorSum[0]++;
}
}
});
- if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) {
- list.clear();
- }
}
- kafkaProducer.flush();
+// kafkaProducer.flush();
+// list.clear();
logger.debug("Log sent to National Center successfully!!!!!");
}