summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-03-25 14:29:02 +0800
committerqidaijie <[email protected]>2021-03-25 14:29:02 +0800
commit928581d8600d03d6af11ea19a7bfc1bb0aa9d4ec (patch)
tree5ff76db96020107910bcf1953993e0b5866c253e
parent4120969aecb8c49d5e78176a1bc785fd1b3d7e78 (diff)
修复EAL4中低级警告代码版本
-rw-r--r--pom.xml28
-rw-r--r--properties/service_flow_config.properties10
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java5
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java6
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java4
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java3
-rw-r--r--src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java7
-rw-r--r--src/main/java/com/zdjizhi/storm/topology/StormRunner.java3
-rw-r--r--src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java43
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java104
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java10
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/exception/AggregationException.java18
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java36
13 files changed, 175 insertions, 102 deletions
diff --git a/pom.xml b/pom.xml
index 10baaf8..5088c6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>cn.ac.iie</groupId>
<artifactId>storm-olap-aggregation</artifactId>
- <version>v3.20.11.17-ratelimit</version>
+ <version>v3.21.03.16-eal4</version>
<packaging>jar</packaging>
@@ -20,6 +20,19 @@
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+
</repositories>
@@ -56,6 +69,19 @@
</executions>
</plugin>
+ <plugin>
+ <groupId>io.github.zlika</groupId>
+ <artifactId>reproducible-build-maven-plugin</artifactId>
+ <version>0.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>strip-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 2166e6f..43e8ee0 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,5 +1,5 @@
#管理kafka地址
-bootstrap.servers=192.168.40.132:9092
+bootstrap.servers=192.168.44.12:9092
#latest/earliest
auto.offset.reset=latest
@@ -10,7 +10,7 @@ consumer.client.id=live-chart-consumer-connection-record
producer.client.id=live-chart-producer-connection-record
#生产者压缩模式 none or snappy
-producer.kafka.compression.type=snappy
+producer.kafka.compression.type=none
#kafka broker下的topic名称
kafka.topic=test
@@ -19,7 +19,7 @@ kafka.topic=test
group.id=test-20200922
#输出topic
-results.bootstrap.servers=192.168.40.132:9092
+results.bootstrap.servers=192.168.44.12:9092
#输出topic
results.output.topic=test-result
@@ -55,10 +55,10 @@ kafka.bolt.parallelism=1
batch.insert.num=2000
#网关的schema位置
-schema.http=http://192.168.40.132:9999/metadata/schema/v1/fields/liveChart
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart
#网关的schema位置
-app.id.http=http://192.168.40.132:9999/open-api/appDicList
+app.id.http=http://192.168.40.12:9999/open-api/appDicList
#数据中心(UID)
data.center.id.num=15
diff --git a/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java
index 87b20e6..d36cc5b 100644
--- a/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java
@@ -2,6 +2,7 @@ package com.zdjizhi.storm.bolt;
import com.zdjizhi.storm.utils.combine.AggregateUtils;
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.exception.AggregationException;
import com.zdjizhi.storm.utils.http.HttpClientUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -83,8 +84,8 @@ public class CalculateBolt extends BaseBasicBolt {
cacheMap.put(dimensions, cacheMessage);
}
- } catch (Exception e) {
- logger.error("计算节点异常,异常信息:" + e);
+ } catch (AggregationException e) {
+ logger.error("初次泛聚合异常,异常信息:" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java
index d996a84..3f1bc19 100644
--- a/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java
@@ -2,6 +2,7 @@ package com.zdjizhi.storm.bolt;
import com.zdjizhi.storm.utils.combine.AggregateUtils;
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.exception.AggregationException;
import com.zdjizhi.storm.utils.http.HttpClientUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -84,9 +85,8 @@ public class GatheringBolt extends BaseBasicBolt {
cacheMap.put(dimensions, cacheMessage);
}
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("计算节点异常,异常信息:" + e);
+ } catch (AggregationException e) {
+ logger.error("二次聚合异常,异常信息:" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
index 6203f16..14011c1 100644
--- a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
@@ -2,6 +2,7 @@ package com.zdjizhi.storm.bolt;
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.exception.AggregationException;
import com.zdjizhi.storm.utils.http.HttpClientUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -109,9 +110,8 @@ public class ParseKvBolt extends BaseBasicBolt {
}
}
}
- } catch (Exception e) {
+ } catch (AggregationException e) {
logger.error("上层解析原始日志/拼接计算日志发送异常,异常信息:" + e);
- e.printStackTrace();
}
}
diff --git a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
index a7e6a6b..2925188 100644
--- a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
@@ -1,6 +1,7 @@
package com.zdjizhi.storm.bolt;
+import com.zdjizhi.storm.utils.exception.AggregationException;
import com.zdjizhi.storm.utils.kafka.LogSendKafka;
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
import cn.hutool.log.Log;
@@ -51,7 +52,7 @@ public class ResultSendBolt extends BaseBasicBolt {
}
}
}
- } catch (Exception e) {
+ } catch (AggregationException e) {
logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "日志发送Kafka过程出现异常,异常信息:" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java
index 0880315..7fffcc8 100644
--- a/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java
@@ -1,8 +1,10 @@
package com.zdjizhi.storm.spout;
+import cn.hutool.core.thread.ThreadUtil;
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.storm.utils.exception.AggregationException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -68,13 +70,12 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
try {
// TODO Auto-generated method stub
ConsumerRecords<String, String> records = consumer.poll(10000L);
- Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+ ThreadUtil.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
for (ConsumerRecord<String, String> record : records) {
this.collector.emit(new Values(record.value()));
}
- } catch (Exception e) {
+ } catch (AggregationException e) {
logger.error("KafkaSpout发送消息出现异常!", e);
- e.printStackTrace();
}
}
diff --git a/src/main/java/com/zdjizhi/storm/topology/StormRunner.java b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java
index 205f2f5..19b84ca 100644
--- a/src/main/java/com/zdjizhi/storm/topology/StormRunner.java
+++ b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java
@@ -1,6 +1,7 @@
package com.zdjizhi.storm.topology;
+import cn.hutool.core.thread.ThreadUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
@@ -21,7 +22,7 @@ public final class StormRunner {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, conf, builder.createTopology());
- Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+ ThreadUtil.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
localCluster.shutdown();
}
diff --git a/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java
index 388abba..9bb181c 100644
--- a/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java
@@ -8,6 +8,7 @@ import com.zdjizhi.storm.spout.CustomizedKafkaSpout;
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.storm.utils.exception.AggregationException;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
@@ -81,25 +82,29 @@ public class StreamAggregateTopology {
}
- public static void main(String[] args) throws Exception {
- StreamAggregateTopology streamAggregateTopology;
- boolean runLocally = true;
- int size = 2;
- if (args.length >= size && StreamAggregateConfig.MODEL.equalsIgnoreCase(args[1])) {
- runLocally = false;
- streamAggregateTopology = new StreamAggregateTopology(args[0]);
- } else {
- streamAggregateTopology = new StreamAggregateTopology();
- }
-
- streamAggregateTopology.buildTopology();
-
- if (runLocally) {
- logger.info("执行本地模式...");
- streamAggregateTopology.runLocally();
- } else {
- logger.info("执行远程部署模式...");
- streamAggregateTopology.runRemotely();
+ public static void main(String[] args) {
+ try {
+ StreamAggregateTopology streamAggregateTopology;
+ boolean runLocally = true;
+ int size = 2;
+ if (args.length >= size && StreamAggregateConfig.MODEL.equalsIgnoreCase(args[1])) {
+ runLocally = false;
+ streamAggregateTopology = new StreamAggregateTopology(args[0]);
+ } else {
+ streamAggregateTopology = new StreamAggregateTopology();
+ }
+
+ streamAggregateTopology.buildTopology();
+
+ if (runLocally) {
+ logger.info("执行本地模式...");
+ streamAggregateTopology.runLocally();
+ } else {
+ logger.info("执行远程部署模式...");
+ streamAggregateTopology.runRemotely();
+ }
+ } catch (AggregationException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) {
+ logger.error("Topology Start ERROR! message is:" + e);
}
}
}
diff --git a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
index 0563666..22cd06b 100644
--- a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
+++ b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
@@ -6,6 +6,7 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.tuple.Values;
@@ -35,17 +36,16 @@ public class AggregateUtils {
return Long.parseLong(value1) + Long.parseLong(value2);
}
- /**
- * 计算Count
- *
- * @param count 当前count值
- * @return count+1
- */
- public static Long count(Long count) {
-
- count++;
- return count;
- }
+// /**
+// * 计算Count
+// *
+// * @param count 当前count值
+// * @return count+1
+// */
+// public static Long count(Long count) {
+//
+// return count++;
+// }
/**
@@ -72,32 +72,32 @@ public class AggregateUtils {
}
- /**
- * 递归发送tuple
- *
- * @param headIndex ssss
- * @param splitArr
- * @param initStr
- * @param collector
- * @param message
- * @param dimesionsObj
- * @param name
- */
- public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) {
- //递归拼接字符串
- if (splitArr.length != headIndex - 1) {
- //递归的核心代码
- if ("".equals(initStr)) {
- initStr = splitArr[splitArr.length - headIndex];
- } else {
- initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
- }
- dimesionsObj.put(name, initStr);
-
- collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
- reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
- }
- }
+// /**
+// * 递归发送tuple
+// *
+// * @param headIndex ssss
+// * @param splitArr
+// * @param initStr
+// * @param collector
+// * @param message
+// * @param dimesionsObj
+// * @param name
+// */
+// public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) {
+// //递归拼接字符串
+// if (splitArr.length != headIndex - 1) {
+// //递归的核心代码
+// if ("".equals(initStr)) {
+// initStr = splitArr[splitArr.length - headIndex];
+// } else {
+// initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
+// }
+// dimesionsObj.put(name, initStr);
+//
+// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
+// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
+// }
+// }
/**
@@ -149,25 +149,25 @@ public class AggregateUtils {
try {
Long begin = System.currentTimeMillis();
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
- String data = JSONObject.parseObject(schema).getString("data");
- JSONArray objects = JSONArray.parseArray(data);
- for (Object object : objects) {
- JSONArray jsonArray = JSONArray.parseArray(object.toString());
- Long key = jsonArray.getLong(0);
- String value = jsonArray.getString(1);
- if (hashMap.containsKey(key)) {
- if (!value.equals(hashMap.get(key))) {
+ if (StringUtil.isNotBlank(schema)) {
+ String data = JSONObject.parseObject(schema).getString("data");
+ JSONArray objects = JSONArray.parseArray(data);
+ for (Object object : objects) {
+ JSONArray jsonArray = JSONArray.parseArray(object.toString());
+ Long key = jsonArray.getLong(0);
+ String value = jsonArray.getString(1);
+ if (hashMap.containsKey(key)) {
+ if (!value.equals(hashMap.get(key))) {
+ hashMap.put(key, value);
+ }
+ } else {
hashMap.put(key, value);
}
- } else {
- hashMap.put(key, value);
}
-
+ logger.warn("更新缓存对应关系用时:" + (begin - System.currentTimeMillis()));
+ logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size());
}
- System.out.println((System.currentTimeMillis() - begin));
- logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size());
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (RuntimeException e) {
logger.error("更新缓存APP-ID失败,异常:" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java
index 1f951c6..1790dd4 100644
--- a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java
+++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java
@@ -1,5 +1,11 @@
package com.zdjizhi.storm.utils.common;
+import com.zdjizhi.storm.utils.exception.AggregationException;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang.ObjectUtils;
+
+import java.io.IOException;
+import java.util.Locale;
import java.util.Properties;
@@ -39,7 +45,7 @@ public final class StreamAggregateConfigurations {
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
- return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else {
return null;
}
@@ -48,7 +54,7 @@ public final class StreamAggregateConfigurations {
static {
try {
propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- } catch (Exception e) {
+ } catch (IOException | AggregationException e) {
propService = null;
}
}
diff --git a/src/main/java/com/zdjizhi/storm/utils/exception/AggregationException.java b/src/main/java/com/zdjizhi/storm/utils/exception/AggregationException.java
new file mode 100644
index 0000000..3790c31
--- /dev/null
+++ b/src/main/java/com/zdjizhi/storm/utils/exception/AggregationException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.storm.utils.exception;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.storm.utils.execption
+ * @Description:
+ * @date 2021/3/259:42
+ */
+public class AggregationException extends RuntimeException {
+
+ public AggregationException() {
+ }
+
+ public AggregationException(String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java
index 9757dd6..3385b65 100644
--- a/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java
+++ b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java
@@ -1,6 +1,5 @@
package com.zdjizhi.storm.utils.http;
-import com.zdjizhi.storm.utils.logout.LogPrintUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.http.HttpEntity;
@@ -19,6 +18,7 @@ import java.io.InputStreamReader;
* @author qidaijie
*/
public class HttpClientUtil {
+ // private static final int MAX_STR_LEN = 1024;
private static final Log logger = LogFactory.get();
/**
@@ -29,36 +29,50 @@ public class HttpClientUtil {
*/
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
- StringBuilder entityStringBuilder = null;
+ StringBuilder entityStringBuilder;
HttpGet get = new HttpGet(http);
BufferedReader bufferedReader = null;
- try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
+ CloseableHttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(get);
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- entityStringBuilder.append(line);
+ int intC;
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
+ }
+ entityStringBuilder.append(c);
}
+
+ return entityStringBuilder.toString();
}
- } catch (Exception e) {
- logger.error(LogPrintUtil.print(e));
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
} finally {
if (httpClient != null) {
try {
httpClient.close();
} catch (IOException e) {
- logger.error(LogPrintUtil.print(e));
+ logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
}
}
if (bufferedReader != null) {
-// bufferedReader.close();
org.apache.commons.io.IOUtils.closeQuietly(bufferedReader);
}
}
- return entityStringBuilder.toString();
+ return "";
}
}