diff options
| author | qidaijie <[email protected]> | 2021-07-07 15:19:34 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-07-07 15:19:34 +0800 |
| commit | 5764bb999e4eb60175aa7c0685a4547ef33181d0 (patch) | |
| tree | d54b5004090f1a2d34468122fd2a11f24eccd308 | |
| parent | 1b47ecf76b37b93c4cfaf873559160153f0ef078 (diff) | |
1:修改JSON解析方式为JackSon。
2:删除Kafka生产者flush方法。
3:修改打包方式为package。
| -rw-r--r-- | pom.xml | 34 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 11 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bolt/LogSendBolt.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/StormRunner.java | 1 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/app/AppUtils.java | 8 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/general/TransFormUtils.java | 36 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/general/TransFunction.java | 47 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java | 5 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java | 8 |
9 files changed, 67 insertions, 87 deletions
@@ -4,7 +4,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-stream-completion-schema</artifactId> - <version>v3.21.06.07-Array</version> + <version>v3.21.06.28-jackson</version> <packaging>jar</packaging> <name>log-stream-completion-schema</name> @@ -21,7 +21,6 @@ </repositories> <build> - <plugins> <plugin> @@ -34,7 +33,7 @@ <executions> <execution> - <phase>install</phase> + <phase>package</phase> <goals> <goal>shade</goal> </goals> @@ -44,14 +43,6 @@ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass> </transformer> - <transformer - implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>META-INF/spring.handlers</resource> - </transformer> - <transformer - implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>META-INF/spring.schemas</resource> - </transformer> </transformers> </configuration> </execution> @@ -67,7 +58,7 @@ <goals> <goal>strip-jar</goal> </goals> - <phase>install</phase> + <phase>package</phase> </execution> </executions> </plugin> @@ -175,7 +166,7 @@ <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> - <version>1.0.3</version> + <version>1.0.6</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> @@ -222,23 +213,6 @@ </exclusions> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server --> - <!--<dependency>--> - <!--<groupId>org.apache.hbase</groupId>--> - <!--<artifactId>hbase-server</artifactId>--> - <!--<version>${hbase.version}</version>--> - <!--<exclusions>--> - <!--<exclusion>--> - <!--<artifactId>slf4j-log4j12</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--<exclusion>--> - <!--<artifactId>log4j-over-slf4j</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--</exclusions>--> - <!--</dependency>--> - <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 33c1667..e5cd76a 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,7 +1,7 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -input.kafka.servers=192.168.44.12:9092 +input.kafka.servers=192.168.44.11:9092 #管理输出kafka地址 output.kafka.servers=192.168.44.12:9092 @@ -18,7 +18,7 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\ #ip.library=/home/bigdata/topology/dat/ #网关的schema位置 -schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log +schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log #网关APP_ID 获取接口 app.id.http=http://192.168.44.67:9999/open-api/appDicList @@ -26,13 +26,13 @@ app.id.http=http://192.168.44.67:9999/open-api/appDicList #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -kafka.topic=PROXY-EVENT-LOG +kafka.topic=CONNECTION-RECORD-LOG #补全数据 输出 topic -results.output.topic=PROXY-EVENT-COMPLETED-LOG +results.output.topic=test-result #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=connection-record-log-20200818-1-test +group.id=test-20210628-1 #生产者压缩模式 none or snappy producer.kafka.compression.type=none @@ -101,4 +101,3 @@ mail.default.charset=UTF-8 #需不要补全,不需要则原样日志输出 log.need.complete=yes - diff --git a/src/main/java/com/zdjizhi/bolt/LogSendBolt.java b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java index 46c6353..6fc0537 100644 --- a/src/main/java/com/zdjizhi/bolt/LogSendBolt.java +++ b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java @@ -24,13 +24,13 @@ import java.util.Map; public class LogSendBolt extends BaseBasicBolt { private static final long serialVersionUID = -3663610927224396615L; private static final Log logger = LogFactory.get(); - private List<String> list; + private List<String> list = new LinkedList<>(); private KafkaLogSend kafkaLogSend; @Override public void prepare(Map stormConf, TopologyContext context) { - list = new LinkedList<>(); +// list = new LinkedList<>(); kafkaLogSend = KafkaLogSend.getInstance(); } diff --git a/src/main/java/com/zdjizhi/topology/StormRunner.java b/src/main/java/com/zdjizhi/topology/StormRunner.java index 85024fd..85eb371 100644 --- a/src/main/java/com/zdjizhi/topology/StormRunner.java +++ b/src/main/java/com/zdjizhi/topology/StormRunner.java @@ -28,7 +28,6 @@ public final class StormRunner{ } public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); } diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java index 1193b13..3770dde 100644 --- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java +++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java @@ -116,12 +116,16 @@ public class AppUtils { * @return account */ public static String getAppName(int appId) { - if (appUtils == null) { getAppInstance(); } - return appIdMap.get(appId); + if (appIdMap.containsKey(appId)) { + return appIdMap.get(appId); + } else { + logger.warn("AppMap get appName is null, ID is :" + appId); + return ""; + } } } diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java index 66eadde..5bfda89 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java @@ -1,14 +1,14 @@ package com.zdjizhi.utils.general; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.json.JsonParseUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.utils.IpLookup; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; /** @@ -37,18 +37,6 @@ public class TransFormUtils { private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); /** - * IP定位库工具类 - */ - private static IpLookup ipLookup = new IpLookup.Builder(false) - .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb") - .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb") - .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb") - .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb") - .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") - .build(); - - /** * 解析日志,并补全 * * @param message kafka Topic原始日志 @@ -56,7 +44,7 @@ public class TransFormUtils { */ public static String dealCommonMessage(String message) { try { - Object object = JSONObject.parseObject(message, mapObject.getClass()); + Object object = JsonMapper.fromJsonString(message, mapObject.getClass()); for (String[] strings : jobList) { //用到的参数的值 Object name = JsonParseUtil.getValue(object, strings[0]); @@ -68,12 +56,12 @@ public class TransFormUtils { String function = strings[2]; //额外的参数的值 String param = strings[3]; - functionSet(function, object, appendToKeyName, appendTo, name, param); } - return JSONObject.toJSONString(object); + return JsonMapper.toJsonString(object); } catch (RuntimeException e) { logger.error("解析补全日志信息过程异常,异常信息:" + e); + e.printStackTrace(); return ""; } } @@ -92,7 +80,7 @@ public class TransFormUtils { private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) { switch (function) { case "current_timestamp": - if ((long) appendTo == 0L) { + if (! (appendTo instanceof Long)) { JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime()); } break; @@ -101,17 +89,17 @@ public class TransFormUtils { break; case "geo_ip_detail": if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(ipLookup, name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString())); } break; case "geo_asn": if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(ipLookup, name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString())); } break; case "geo_ip_country": if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(ipLookup, name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString())); } break; case "set_value": diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java index bfb71a2..0e9fb93 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -1,18 +1,20 @@ package com.zdjizhi.utils.general; +import clojure.lang.IFn; import cn.hutool.core.codec.Base64; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.app.AppUtils; -import com.zdjizhi.utils.hbase.HBaseUtils; -import com.zdjizhi.utils.json.JsonParseUtil; +import cn.hutool.core.text.StrSpliter; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.utils.Encodes; +import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.IpLookup; import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.app.AppUtils; +import com.zdjizhi.utils.hbase.HBaseUtils; +import com.zdjizhi.utils.json.JsonParseUtil; import java.util.ArrayList; import java.util.regex.Matcher; @@ -28,9 +30,22 @@ class TransFunction { private static final Pattern PATTERN = Pattern.compile("[0-9]*"); /** + * IP定位库工具类 + */ + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb") + .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb") + .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb") + .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") + .build(); + + /** * 生成当前时间戳的操作 */ static long getCurrentTime() { + return System.currentTimeMillis() / 1000; } @@ -40,9 +55,10 @@ class TransFunction { * @param ip client IP * @return ip地址详细信息 */ - static String getGeoIpDetail(IpLookup ipLookup, String ip) { + static String getGeoIpDetail(String ip) { return ipLookup.cityLookupDetail(ip); + } /** @@ -51,7 +67,7 @@ class TransFunction { * @param ip client/server IP * @return ASN */ - static String getGeoAsn(IpLookup ipLookup, String ip) { + static String getGeoAsn(String ip) { return ipLookup.asnLookup(ip); } @@ -62,7 +78,7 @@ class TransFunction { * @param ip server IP * @return 国家 */ - static String getGeoIpCountry(IpLookup ipLookup, String ip) { + static String getGeoIpCountry(String ip) { return ipLookup.countryLookup(ip); } @@ -77,7 +93,7 @@ class TransFunction { static String radiusMatch(String ip) { String account = HBaseUtils.getAccount(ip.trim()); if (StringUtil.isBlank(account)) { - logger.warn("HashMap get account is null, Ip is :{}", ip); + logger.warn("HashMap get account is null, Ip is :" + ip); } return account; } @@ -89,12 +105,13 @@ class TransFunction { * @return appName */ static String appMatch(String appIds) { - String appId = appIds.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; - String appName = AppUtils.getAppName(Integer.parseInt(appId)); - if (StringUtil.isBlank(appName)) { - logger.warn("AppMap get appName is null, ID is :{}", appId); + try { + String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0); + return AppUtils.getAppName(Integer.parseInt(appId)); + } catch (NumberFormatException | ClassCastException exception) { + logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds); + return ""; } - return appName; } /** @@ -107,7 +124,7 @@ class TransFunction { try { return FormatUtils.getTopPrivateDomain(domain); } catch (StringIndexOutOfBoundsException outException) { - logger.error("解析顶级域名异常,异常域名:{}" + domain); + logger.error("解析顶级域名异常,异常域名:" + domain); return ""; } } diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 10807e1..07ee2e5 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -23,6 +23,8 @@ import java.util.*; public class JsonParseUtil { private static final Log logger = LogFactory.get(); + private static List<String> dropFieldList = new ArrayList<>(); + /** * 模式匹配,给定一个类型字符串返回一个类类型 * @@ -143,6 +145,8 @@ public class JsonParseUtil { } //组合用来生成实体类的map map.put(name, getClassName(type)); + } else { + dropFieldList.add(JsonPath.read(filedStr, "$.name").toString()); } } return map; @@ -237,5 +241,4 @@ public class JsonParseUtil { return list; } - }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java index 51b9138..5104745 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java @@ -42,22 +42,18 @@ public class KafkaLogSend { public void sendMessage(List<String> list) { - final int[] errorSum = {0}; for (String value : list) { kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); - errorSum[0]++; } } }); - if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) { - list.clear(); - } } - kafkaProducer.flush(); +// kafkaProducer.flush(); +// list.clear(); logger.debug("Log sent to National Center successfully!!!!!"); } |
