diff options
| author | qidaijie <[email protected]> | 2021-03-25 14:29:02 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-03-25 14:29:02 +0800 |
| commit | 928581d8600d03d6af11ea19a7bfc1bb0aa9d4ec (patch) | |
| tree | 5ff76db96020107910bcf1953993e0b5866c253e | |
| parent | 4120969aecb8c49d5e78176a1bc785fd1b3d7e78 (diff) | |
修复EAL4中低级警告代码版本
13 files changed, 175 insertions, 102 deletions
@@ -4,7 +4,7 @@ <groupId>cn.ac.iie</groupId> <artifactId>storm-olap-aggregation</artifactId> - <version>v3.20.11.17-ratelimit</version> + <version>v3.21.03.16-eal4</version> <packaging>jar</packaging> @@ -20,6 +20,19 @@ <url>http://192.168.40.125:8099/content/groups/public</url> </repository> + <repository> + <id>maven-ali</id> + <url>http://maven.aliyun.com/nexus/content/groups/public/</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + <updatePolicy>always</updatePolicy> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + </repository> + </repositories> @@ -56,6 +69,19 @@ </executions> </plugin> + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + <executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 2166e6f..43e8ee0 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,5 +1,5 @@ #管理kafka地址 -bootstrap.servers=192.168.40.132:9092 +bootstrap.servers=192.168.44.12:9092 #latest/earliest auto.offset.reset=latest @@ -10,7 +10,7 @@ consumer.client.id=live-chart-consumer-connection-record producer.client.id=live-chart-producer-connection-record #生产者压缩模式 none or snappy -producer.kafka.compression.type=snappy +producer.kafka.compression.type=none #kafka broker下的topic名称 kafka.topic=test @@ -19,7 +19,7 @@ kafka.topic=test group.id=test-20200922 #输出topic -results.bootstrap.servers=192.168.40.132:9092 +results.bootstrap.servers=192.168.44.12:9092 #输出topic results.output.topic=test-result @@ -55,10 +55,10 @@ kafka.bolt.parallelism=1 batch.insert.num=2000 #网关的schema位置 -schema.http=http://192.168.40.132:9999/metadata/schema/v1/fields/liveChart +schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart #网关的schema位置 -app.id.http=http://192.168.40.132:9999/open-api/appDicList +app.id.http=http://192.168.40.12:9999/open-api/appDicList #数据中心(UID) data.center.id.num=15 diff --git a/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java index 87b20e6..d36cc5b 100644 --- a/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java @@ -2,6 +2,7 @@ package com.zdjizhi.storm.bolt; import com.zdjizhi.storm.utils.combine.AggregateUtils; import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import com.zdjizhi.storm.utils.exception.AggregationException; import com.zdjizhi.storm.utils.http.HttpClientUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -83,8 +84,8 @@ public class CalculateBolt extends BaseBasicBolt { cacheMap.put(dimensions, cacheMessage); } - } catch (Exception e) { - logger.error("计算节点异常,异常信息:" + e); + } catch (AggregationException e) { + logger.error("初次泛聚合异常,异常信息:" + e); } } diff --git a/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java index d996a84..3f1bc19 100644 --- a/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java @@ -2,6 +2,7 @@ package com.zdjizhi.storm.bolt; import com.zdjizhi.storm.utils.combine.AggregateUtils; import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import com.zdjizhi.storm.utils.exception.AggregationException; import com.zdjizhi.storm.utils.http.HttpClientUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -84,9 +85,8 @@ public class GatheringBolt extends BaseBasicBolt { cacheMap.put(dimensions, cacheMessage); } - } catch (Exception e) { - e.printStackTrace(); - logger.error("计算节点异常,异常信息:" + e); + } catch (AggregationException e) { + logger.error("二次聚合异常,异常信息:" + e); } } diff --git a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java index 6203f16..14011c1 100644 --- a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java @@ -2,6 +2,7 @@ package com.zdjizhi.storm.bolt; import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import com.zdjizhi.storm.utils.exception.AggregationException; import com.zdjizhi.storm.utils.http.HttpClientUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -109,9 +110,8 @@ public class ParseKvBolt extends BaseBasicBolt { } } } - } catch (Exception e) { + } catch (AggregationException e) { logger.error("上层解析原始日志/拼接计算日志发送异常,异常信息:" + e); - e.printStackTrace(); } } diff --git a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java index a7e6a6b..2925188 100644 --- a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java @@ -1,6 +1,7 @@ package com.zdjizhi.storm.bolt; +import com.zdjizhi.storm.utils.exception.AggregationException; import com.zdjizhi.storm.utils.kafka.LogSendKafka; import com.zdjizhi.storm.utils.common.StreamAggregateConfig; import cn.hutool.log.Log; @@ -51,7 +52,7 @@ public class ResultSendBolt extends BaseBasicBolt { } } } - } catch (Exception e) { + } catch (AggregationException e) { logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "日志发送Kafka过程出现异常,异常信息:" + e); } } diff --git a/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java index 0880315..7fffcc8 100644 --- a/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java +++ b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java @@ -1,8 +1,10 @@ package com.zdjizhi.storm.spout; +import cn.hutool.core.thread.ThreadUtil; import com.zdjizhi.storm.utils.common.StreamAggregateConfig; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.storm.utils.exception.AggregationException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -68,13 +70,12 @@ public class CustomizedKafkaSpout extends BaseRichSpout { try { // TODO Auto-generated method stub ConsumerRecords<String, String> records = consumer.poll(10000L); - Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME); + ThreadUtil.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME); for (ConsumerRecord<String, String> record : records) { this.collector.emit(new Values(record.value())); } - } catch (Exception e) { + } catch (AggregationException e) { logger.error("KafkaSpout发送消息出现异常!", e); - e.printStackTrace(); } } diff --git a/src/main/java/com/zdjizhi/storm/topology/StormRunner.java b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java index 205f2f5..19b84ca 100644 --- a/src/main/java/com/zdjizhi/storm/topology/StormRunner.java +++ b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java @@ -1,6 +1,7 @@ package com.zdjizhi.storm.topology; +import cn.hutool.core.thread.ThreadUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; @@ -21,7 +22,7 @@ public final class StormRunner { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(topologyName, conf, builder.createTopology()); - Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); + ThreadUtil.sleep((long) runtimeInSeconds * MILLS_IN_SEC); localCluster.shutdown(); } diff --git a/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java index 388abba..9bb181c 100644 --- a/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java @@ -8,6 +8,7 @@ import com.zdjizhi.storm.spout.CustomizedKafkaSpout; import com.zdjizhi.storm.utils.common.StreamAggregateConfig; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.storm.utils.exception.AggregationException; import org.apache.storm.Config; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; @@ -81,25 +82,29 @@ public class StreamAggregateTopology { } - public static void main(String[] args) throws Exception { - StreamAggregateTopology streamAggregateTopology; - boolean runLocally = true; - int size = 2; - if (args.length >= size && StreamAggregateConfig.MODEL.equalsIgnoreCase(args[1])) { - runLocally = false; - streamAggregateTopology = new StreamAggregateTopology(args[0]); - } else { - streamAggregateTopology = new StreamAggregateTopology(); - } - - streamAggregateTopology.buildTopology(); - - if (runLocally) { - logger.info("执行本地模式..."); - streamAggregateTopology.runLocally(); - } else { - logger.info("执行远程部署模式..."); - streamAggregateTopology.runRemotely(); + public static void main(String[] args) { + try { + StreamAggregateTopology streamAggregateTopology; + boolean runLocally = true; + int size = 2; + if (args.length >= size && StreamAggregateConfig.MODEL.equalsIgnoreCase(args[1])) { + runLocally = false; + streamAggregateTopology = new StreamAggregateTopology(args[0]); + } else { + streamAggregateTopology = new StreamAggregateTopology(); + } + + streamAggregateTopology.buildTopology(); + + if (runLocally) { + logger.info("执行本地模式..."); + streamAggregateTopology.runLocally(); + } else { + logger.info("执行远程部署模式..."); + streamAggregateTopology.runRemotely(); + } + } catch (AggregationException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) { + logger.error("Topology Start ERROR! message is:" + e); } } } diff --git a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java index 0563666..22cd06b 100644 --- a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java +++ b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java @@ -6,6 +6,7 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.tuple.Values; @@ -35,17 +36,16 @@ public class AggregateUtils { return Long.parseLong(value1) + Long.parseLong(value2); } - /** - * 计算Count - * - * @param count 当前count值 - * @return count+1 - */ - public static Long count(Long count) { - - count++; - return count; - } +// /** +// * 计算Count +// * +// * @param count 当前count值 +// * @return count+1 +// */ +// public static Long count(Long count) { +// +// return count++; +// } /** @@ -72,32 +72,32 @@ public class AggregateUtils { } - /** - * 递归发送tuple - * - * @param headIndex ssss - * @param splitArr - * @param initStr - * @param collector - * @param message - * @param dimesionsObj - * @param name - */ - public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) { - //递归拼接字符串 - if (splitArr.length != headIndex - 1) { - //递归的核心代码 - if ("".equals(initStr)) { - initStr = splitArr[splitArr.length - headIndex]; - } else { - initStr = initStr + "/" + splitArr[splitArr.length - headIndex]; - } - dimesionsObj.put(name, initStr); - - collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); - reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); - } - } +// /** +// * 递归发送tuple +// * +// * @param headIndex ssss +// * @param splitArr +// * @param initStr +// * @param collector +// * @param message +// * @param dimesionsObj +// * @param name +// */ +// public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) { +// //递归拼接字符串 +// if (splitArr.length != headIndex - 1) { +// //递归的核心代码 +// if ("".equals(initStr)) { +// initStr = splitArr[splitArr.length - headIndex]; +// } else { +// initStr = initStr + "/" + splitArr[splitArr.length - headIndex]; +// } +// dimesionsObj.put(name, initStr); +// +// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); +// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); +// } +// } /** @@ -149,25 +149,25 @@ public class AggregateUtils { try { Long begin = System.currentTimeMillis(); String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP); - String data = JSONObject.parseObject(schema).getString("data"); - JSONArray objects = JSONArray.parseArray(data); - for (Object object : objects) { - JSONArray jsonArray = JSONArray.parseArray(object.toString()); - Long key = jsonArray.getLong(0); - String value = jsonArray.getString(1); - if (hashMap.containsKey(key)) { - if (!value.equals(hashMap.get(key))) { + if (StringUtil.isNotBlank(schema)) { + String data = JSONObject.parseObject(schema).getString("data"); + JSONArray objects = JSONArray.parseArray(data); + for (Object object : objects) { + JSONArray jsonArray = JSONArray.parseArray(object.toString()); + Long key = jsonArray.getLong(0); + String value = jsonArray.getString(1); + if (hashMap.containsKey(key)) { + if (!value.equals(hashMap.get(key))) { + hashMap.put(key, value); + } + } else { hashMap.put(key, value); } - } else { - hashMap.put(key, value); } - + logger.warn("更新缓存对应关系用时:" + (begin - System.currentTimeMillis())); + logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size()); } - System.out.println((System.currentTimeMillis() - begin)); - logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size()); - } catch (Exception e) { - e.printStackTrace(); + } catch (RuntimeException e) { logger.error("更新缓存APP-ID失败,异常:" + e); } } diff --git a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java index 1f951c6..1790dd4 100644 --- a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java +++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java @@ -1,5 +1,11 @@ package com.zdjizhi.storm.utils.common; +import com.zdjizhi.storm.utils.exception.AggregationException; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang.ObjectUtils; + +import java.io.IOException; +import java.util.Locale; import java.util.Properties; @@ -39,7 +45,7 @@ public final class StreamAggregateConfigurations { public static Boolean getBooleanProperty(Integer type, String key) { if (type == 0) { - return "true".equals(propService.getProperty(key).toLowerCase().trim()); + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); } else { return null; } @@ -48,7 +54,7 @@ public final class StreamAggregateConfigurations { static { try { propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); - } catch (Exception e) { + } catch (IOException | AggregationException e) { propService = null; } } diff --git a/src/main/java/com/zdjizhi/storm/utils/exception/AggregationException.java b/src/main/java/com/zdjizhi/storm/utils/exception/AggregationException.java new file mode 100644 index 0000000..3790c31 --- /dev/null +++ b/src/main/java/com/zdjizhi/storm/utils/exception/AggregationException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.storm.utils.exception; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class AggregationException extends RuntimeException { + + public AggregationException() { + } + + public AggregationException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java index 9757dd6..3385b65 100644 --- a/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java +++ b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java @@ -1,6 +1,5 @@ package com.zdjizhi.storm.utils.http; -import com.zdjizhi.storm.utils.logout.LogPrintUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import org.apache.http.HttpEntity; @@ -19,6 +18,7 @@ import java.io.InputStreamReader; * @author qidaijie */ public class HttpClientUtil { + // private static final int MAX_STR_LEN = 1024; private static final Log logger = LogFactory.get(); /** @@ -29,36 +29,50 @@ public class HttpClientUtil { */ public static String requestByGetMethod(String http) { CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder = null; + StringBuilder entityStringBuilder; HttpGet get = new HttpGet(http); BufferedReader bufferedReader = null; - try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { + CloseableHttpResponse httpResponse = null; + try { + httpResponse = httpClient.execute(get); HttpEntity entity = httpResponse.getEntity(); entityStringBuilder = new StringBuilder(); if (null != entity) { bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - String line; - while ((line = bufferedReader.readLine()) != null) { - entityStringBuilder.append(line); + int intC; + while ((intC = bufferedReader.read()) != -1) { + char c = (char) intC; + if (c == '\n') { + break; + } + entityStringBuilder.append(c); } + + return entityStringBuilder.toString(); } - } catch (Exception e) { - logger.error(LogPrintUtil.print(e)); + } catch (IOException e) { + logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); } finally { if (httpClient != null) { try { httpClient.close(); } catch (IOException e) { - logger.error(LogPrintUtil.print(e)); + logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); + } + } + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + logger.error("Close httpResponse ERROR! Exception messgae is:" + e); } } if (bufferedReader != null) { -// bufferedReader.close(); org.apache.commons.io.IOUtils.closeQuietly(bufferedReader); } } - return entityStringBuilder.toString(); + return ""; } } |
