summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-03-16 14:46:14 +0800
committerqidaijie <[email protected]>2021-03-16 14:46:14 +0800
commit4120969aecb8c49d5e78176a1bc785fd1b3d7e78 (patch)
tree5d5375b2e2813f5efd9fed663e089c0d95818d60
parent2dd9c75278861c3bcfc513113f6b98e1a081b2bd (diff)
修复EAL4代码版本
-rw-r--r--pom.xml8
-rw-r--r--src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java55
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java)13
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java)13
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java)27
-rw-r--r--src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java)13
-rw-r--r--src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java (renamed from src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java)10
-rw-r--r--src/main/java/com/zdjizhi/storm/topology/StormRunner.java (renamed from src/main/java/cn/ac/iie/storm/topology/StormRunner.java)2
-rw-r--r--src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java (renamed from src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java)37
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java (renamed from src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java)22
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java (renamed from src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java)3
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java (renamed from src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java)12
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java64
-rw-r--r--src/main/java/com/zdjizhi/storm/utils/kafka/LogSendKafka.java (renamed from src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java)9
14 files changed, 137 insertions, 151 deletions
diff --git a/pom.xml b/pom.xml
index 64cdbe0..10baaf8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>cn.ac.iie.storm.topology.StreamAggregateTopology</mainClass>
+ <mainClass>com.zdjizhi.storm.topology.StreamAggregateTopology</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
@@ -203,5 +203,11 @@
<version>4.4.1</version>
</dependency>
+ <dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.5.2</version>
+ </dependency>
+
</dependencies>
</project>
diff --git a/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java b/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java
deleted file mode 100644
index 2e00efd..0000000
--- a/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package cn.ac.iie.storm.utils.http;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * 获取网关schema的工具类
- *
- * @author qidaijie
- */
-public class HttpClientUtil {
-
- /**
- * 请求网关获取schema
- * @param http 网关url
- * @return schema
- */
- public static String requestByGetMethod(String http) {
- CloseableHttpClient httpClient = HttpClients.createDefault();
- StringBuilder entityStringBuilder = null;
- try {
- HttpGet get = new HttpGet(http);
- try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
- HttpEntity entity = httpResponse.getEntity();
- entityStringBuilder = new StringBuilder();
- if (null != entity) {
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- entityStringBuilder.append(line);
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (httpClient != null) {
- httpClient.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- return entityStringBuilder.toString();
- }
-
-}
diff --git a/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java
index 1b62844..87b20e6 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java
@@ -1,10 +1,11 @@
-package cn.ac.iie.storm.bolt;
+package com.zdjizhi.storm.bolt;
-import cn.ac.iie.storm.utils.combine.AggregateUtils;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
-import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.zdjizhi.storm.utils.combine.AggregateUtils;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.http.HttpClientUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
-import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
@@ -25,7 +26,7 @@ import java.util.Map;
* @Version V1.0
**/
public class CalculateBolt extends BaseBasicBolt {
- private final static Logger logger = Logger.getLogger(CalculateBolt.class);
+ private static final Log logger = LogFactory.get();
private static final long serialVersionUID = -7666031217706448622L;
private HashMap<String, JSONObject> metricsMap;
private HashMap<String, String[]> actionMap;
diff --git a/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java
index a9051db..d996a84 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java
@@ -1,10 +1,11 @@
-package cn.ac.iie.storm.bolt;
+package com.zdjizhi.storm.bolt;
-import cn.ac.iie.storm.utils.combine.AggregateUtils;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
-import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.zdjizhi.storm.utils.combine.AggregateUtils;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.http.HttpClientUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
-import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
@@ -25,7 +26,7 @@ import java.util.Map;
* @Version V1.0
**/
public class GatheringBolt extends BaseBasicBolt {
- private final static Logger logger = Logger.getLogger(GatheringBolt.class);
+ private static final Log logger = LogFactory.get();
private static final long serialVersionUID = -6166717864837350277L;
private HashMap<String, JSONObject> metricsMap;
private HashMap<String, String[]> actionMap;
diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
index d7fdad9..6203f16 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
@@ -1,13 +1,13 @@
-package cn.ac.iie.storm.bolt;
+package com.zdjizhi.storm.bolt;
-import cn.ac.iie.storm.utils.combine.AggregateUtils;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
-import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.http.HttpClientUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
-import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -21,8 +21,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import static cn.ac.iie.storm.utils.combine.AggregateUtils.transDimensions;
-import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation;
+import static com.zdjizhi.storm.utils.combine.AggregateUtils.transDimensions;
+import static com.zdjizhi.storm.utils.combine.AggregateUtils.updateAppRelation;
/**
* @ClassNameMyWindowBolt
@@ -31,7 +31,7 @@ import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation;
* @Version V1.0
**/
public class ParseKvBolt extends BaseBasicBolt {
- private final static Logger logger = Logger.getLogger(ParseKvBolt.class);
+ private static final Log logger = LogFactory.get();
private static final long serialVersionUID = -999382396035310355L;
private JSONArray transforms;
private JSONArray dimensions;
@@ -102,17 +102,6 @@ public class ParseKvBolt extends BaseBasicBolt {
collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString()));
}
break;
-// case "hierarchy":
-// String hierarchyValue = message.getString(fieldName);
-// //TODO 执行拆分代码
-// if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) {
-// String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
-// String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]);
-// //TODO 递归拼接tuple并发送出去
-// System.out.println(dimensionsObj.get(name) + "---" + dimensionsObj.toString() + "---" + message.toString());
-// AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name);
-// }
-// break;
default:
break;
}
diff --git a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
index af53133..a7e6a6b 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
@@ -1,13 +1,12 @@
-package cn.ac.iie.storm.bolt;
+package com.zdjizhi.storm.bolt;
-import cn.ac.iie.storm.utils.combine.AggregateUtils;
-import cn.ac.iie.storm.utils.common.LogSendKafka;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.kafka.LogSendKafka;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
-import kafka.utils.json.JsonObject;
-import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -24,7 +23,7 @@ import java.util.Map;
public class ResultSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = 3237813470939823159L;
- private static Logger logger = Logger.getLogger(ResultSendBolt.class);
+ private static final Log logger = LogFactory.get();
private LogSendKafka logSendKafka;
@Override
diff --git a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java
index feeb86c..0880315 100644
--- a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java
@@ -1,11 +1,12 @@
-package cn.ac.iie.storm.spout;
+package com.zdjizhi.storm.spout;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.log4j.Logger;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -27,8 +28,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
private KafkaConsumer<String, String> consumer;
private SpoutOutputCollector collector = null;
private TopologyContext context = null;
- private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
-
+ private static final Log logger = LogFactory.get();
private static Properties createConsumerConfig() {
Properties props = new Properties();
diff --git a/src/main/java/cn/ac/iie/storm/topology/StormRunner.java b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java
index 6e77c66..205f2f5 100644
--- a/src/main/java/cn/ac/iie/storm/topology/StormRunner.java
+++ b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.storm.topology;
+package com.zdjizhi.storm.topology;
import org.apache.storm.Config;
diff --git a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java
index 520b829..388abba 100644
--- a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java
@@ -1,12 +1,13 @@
-package cn.ac.iie.storm.topology;
-
-import cn.ac.iie.storm.bolt.CalculateBolt;
-import cn.ac.iie.storm.bolt.GatheringBolt;
-import cn.ac.iie.storm.bolt.ResultSendBolt;
-import cn.ac.iie.storm.bolt.ParseKvBolt;
-import cn.ac.iie.storm.spout.CustomizedKafkaSpout;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
-import org.apache.log4j.Logger;
+package com.zdjizhi.storm.topology;
+
+import com.zdjizhi.storm.bolt.CalculateBolt;
+import com.zdjizhi.storm.bolt.GatheringBolt;
+import com.zdjizhi.storm.bolt.ResultSendBolt;
+import com.zdjizhi.storm.bolt.ParseKvBolt;
+import com.zdjizhi.storm.spout.CustomizedKafkaSpout;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
@@ -22,8 +23,7 @@ import org.apache.storm.tuple.Fields;
**/
public class StreamAggregateTopology {
-
- private static Logger logger = Logger.getLogger(StreamAggregateTopology.class);
+ private static final Log logger = LogFactory.get();
private final String topologyName;
private final Config topologyConfig;
private TopologyBuilder builder;
@@ -82,25 +82,24 @@ public class StreamAggregateTopology {
}
public static void main(String[] args) throws Exception {
- StreamAggregateTopology csst = null;
+ StreamAggregateTopology streamAggregateTopology;
boolean runLocally = true;
- String parameter = "remote";
int size = 2;
- if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
+ if (args.length >= size && StreamAggregateConfig.MODEL.equalsIgnoreCase(args[1])) {
runLocally = false;
- csst = new StreamAggregateTopology(args[0]);
+ streamAggregateTopology = new StreamAggregateTopology(args[0]);
} else {
- csst = new StreamAggregateTopology();
+ streamAggregateTopology = new StreamAggregateTopology();
}
- csst.buildTopology();
+ streamAggregateTopology.buildTopology();
if (runLocally) {
logger.info("执行本地模式...");
- csst.runLocally();
+ streamAggregateTopology.runLocally();
} else {
logger.info("执行远程部署模式...");
- csst.runRemotely();
+ streamAggregateTopology.runRemotely();
}
}
}
diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
index de5938c..0563666 100644
--- a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
+++ b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
@@ -1,10 +1,11 @@
-package cn.ac.iie.storm.utils.combine;
+package com.zdjizhi.storm.utils.combine;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
-import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.http.HttpClientUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.tuple.Values;
@@ -18,7 +19,7 @@ import java.util.HashMap;
* @Version V1.0
**/
public class AggregateUtils {
- private final static Logger logger = Logger.getLogger(AggregateUtils.class);
+ private static final Log logger = LogFactory.get();
/**
* Long类型的数据求和
@@ -139,17 +140,6 @@ public class AggregateUtils {
.getString("name");
}
- public static void main(String[] args) {
- String sc = "{\"status\":200,\"code\":\"200666\",\"queryKey\":null,\"success\":true,\"message\":\"ok\",\"statistics\":null,\"formatType\":null,\"meta\":null,\"data\":{\"type\":\"record\",\"name\":\"liveChart\",\"doc\":{\"timestamp\":{\"name\":\"stat_time\",\"type\":\"Long\"},\"dimensions\":[{\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"type\":\"String\"},{\"name\":\"entrance_id\",\"fieldName\":\"common_entrance_id\",\"type\":\"String\"},{\"name\":\"isp\",\"fieldName\":\"common_isp\",\"type\":\"String\"},{\"name\":\"data_center\",\"fieldName\":\"common_data_center\",\"type\":\"String\"}],\"metrics\":[{\"function\":\"sum\",\"name\":\"sessions\",\"fieldName\":\"common_sessions\"},{\"function\":\"sum\",\"name\":\"c2s_byte_num\",\"fieldName\":\"common_c2s_byte_num\"},{\"function\":\"sum\",\"name\":\"s2c_byte_num\",\"fieldName\":\"common_s2c_byte_num\"},{\"function\":\"sum\",\"name\":\"c2s_pkt_num\",\"fieldName\":\"common_c2s_pkt_num\"},{\"function\":\"sum\",\"name\":\"s2c_pkt_num\",\"fieldName\":\"common_s2c_pkt_num\"},{\"function\":\"sum\",\"name\":\"c2s_ipfrag_num\",\"fieldName\":\"common_c2s_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"s2c_ipfrag_num\",\"fieldName\":\"common_s2c_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_lostlen\",\"fieldName\":\"common_c2s_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_lostlen\",\"fieldName\":\"common_s2c_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_unorder_num\",\"fieldName\":\"common_c2s_tcp_unorder_num\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_unorder_num\",\"fieldName\":\"common_s2c_tcp_unorder_num\"},{\"function\":\"disCount\",\"name\":\"unique_sip_num\",\"fieldName\":\"common_server_ip\"},{\"function\":\"disCount\",\"name\":\"unique_cip_num\",\"fieldName\":\"common_client_ip\"}],\"filters\":{\"type\":\"and\",\"fields\":[{\"fieldName\":\"common_protocol_label\",\"type\":\"not\",\"values\":null}]},\"transforms\":[{\"fieldName\":\"common_app_label\",\"function\":\"replaceapp\",\"name\":\"common_app_label\",\"parameters\":\"0,/\"},{\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"common_app_label,/\"},{\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"/\"}],\"action\":[{\"label\":\"Default\",\"metrics\":\"sessions,c2s_byte_num,s2c_byte_num,c2s_pkt_num,s2c_pkt_num,c2s_ipfrag_num,s2c_ipfrag_num,c2s_tcp_lostlen,s2c_tcp_lostlen,c2s_tcp_unorder_num,s2c_tcp_unorder_num\"}],\"granularity\":{\"type\":\"period\",\"period\":\"5M\"}},\"fields\":[],\"task\":\"Protocol-Distribution\",\"in\":\"CONNECTION-RECORD-COMPLETED-LOG\",\"out\":\"TRAFFIC-PROTOCOL-STAT-LOG\"}}";
- JSONObject jsonObject = JSONObject.parseObject(sc);
- String data = jsonObject.getString("data");
-
- System.out.println(JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data)
- .getString("doc"))
- .getString("timestamp"))
- .getString("name"));
- }
-
/**
* 更新缓存中的对应关系map
*
diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java
index fdeace4..32e059b 100644
--- a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.storm.utils.file;
+package com.zdjizhi.storm.utils.common;
/**
@@ -7,6 +7,7 @@ package cn.ac.iie.storm.utils.file;
public class StreamAggregateConfig {
public static final String FORMAT_SPLITTER = ",";
+ public static final String MODEL = "remote";
/**
* System
*/
diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java
index 03f67c0..1f951c6 100644
--- a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java
+++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.storm.utils.file;
+package com.zdjizhi.storm.utils.common;
import java.util.Properties;
@@ -9,15 +9,12 @@ import java.util.Properties;
public final class StreamAggregateConfigurations {
- // private static Properties propCommon = new Properties();
private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
-// } else if (type == 1) {
-// return propCommon.getProperty(key);
} else {
return null;
}
@@ -27,8 +24,6 @@ public final class StreamAggregateConfigurations {
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
-// } else if (type == 1) {
-// return Integer.parseInt(propCommon.getProperty(key));
} else {
return null;
}
@@ -37,8 +32,6 @@ public final class StreamAggregateConfigurations {
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
-// } else if (type == 1) {
-// return Long.parseLong(propCommon.getProperty(key));
} else {
return null;
}
@@ -47,8 +40,6 @@ public final class StreamAggregateConfigurations {
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return "true".equals(propService.getProperty(key).toLowerCase().trim());
-// } else if (type == 1) {
-// return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
} else {
return null;
}
@@ -58,7 +49,6 @@ public final class StreamAggregateConfigurations {
try {
propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
} catch (Exception e) {
-// propCommon = null;
propService = null;
}
}
diff --git a/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..9757dd6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java
@@ -0,0 +1,64 @@
+package com.zdjizhi.storm.utils.http;
+
+import com.zdjizhi.storm.utils.logout.LogPrintUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * 获取网关schema的工具类
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 请求网关获取schema
+ *
+ * @param http 网关url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ StringBuilder entityStringBuilder = null;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ entityStringBuilder.append(line);
+ }
+ }
+ } catch (Exception e) {
+ logger.error(LogPrintUtil.print(e));
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ logger.error(LogPrintUtil.print(e));
+ }
+ }
+ if (bufferedReader != null) {
+// bufferedReader.close();
+ org.apache.commons.io.IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ return entityStringBuilder.toString();
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java b/src/main/java/com/zdjizhi/storm/utils/kafka/LogSendKafka.java
index ee42553..51cdd0d 100644
--- a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java
+++ b/src/main/java/com/zdjizhi/storm/utils/kafka/LogSendKafka.java
@@ -1,9 +1,10 @@
-package cn.ac.iie.storm.utils.common;
+package com.zdjizhi.storm.utils.kafka;
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.producer.*;
-import org.apache.log4j.Logger;
import java.util.Properties;
@@ -16,7 +17,7 @@ import java.util.Properties;
public class LogSendKafka {
- private static Logger logger = Logger.getLogger(LogSendKafka.class);
+ private static final Log logger = LogFactory.get();
/**
* kafka生产者,用于向kafka中发送消息