diff options
| author | qidaijie <[email protected]> | 2021-03-16 14:46:14 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-03-16 14:46:14 +0800 |
| commit | 4120969aecb8c49d5e78176a1bc785fd1b3d7e78 (patch) | |
| tree | 5d5375b2e2813f5efd9fed663e089c0d95818d60 | |
| parent | 2dd9c75278861c3bcfc513113f6b98e1a081b2bd (diff) | |
修复EAL4代码版本
| -rw-r--r-- | pom.xml | 8 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java | 55 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java) | 13 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java) | 13 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java) | 27 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java) | 13 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java (renamed from src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java) | 10 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/topology/StormRunner.java (renamed from src/main/java/cn/ac/iie/storm/topology/StormRunner.java) | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java (renamed from src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java) | 37 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java (renamed from src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java) | 22 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java (renamed from src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java) | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java (renamed from src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java) | 12 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java | 64 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/utils/kafka/LogSendKafka.java (renamed from src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java) | 9 |
14 files changed, 137 insertions, 151 deletions
@@ -40,7 +40,7 @@ <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>cn.ac.iie.storm.topology.StreamAggregateTopology</mainClass> + <mainClass>com.zdjizhi.storm.topology.StreamAggregateTopology</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> @@ -203,5 +203,11 @@ <version>4.4.1</version> </dependency> + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.5.2</version> + </dependency> + </dependencies> </project> diff --git a/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java b/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java deleted file mode 100644 index 2e00efd..0000000 --- a/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java +++ /dev/null @@ -1,55 +0,0 @@ -package cn.ac.iie.storm.utils.http; - -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -/** - * 获取网关schema的工具类 - * - * @author qidaijie - */ -public class HttpClientUtil { - - /** - * 请求网关获取schema - * @param http 网关url - * @return schema - */ - public static String requestByGetMethod(String http) { - CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder = null; - try { - HttpGet get = new HttpGet(http); - try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - String line; - while ((line = bufferedReader.readLine()) != null) { - entityStringBuilder.append(line); - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - try { - if (httpClient != null) { - httpClient.close(); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - return entityStringBuilder.toString(); - } - -} diff --git a/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java index 1b62844..87b20e6 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/CalculateBolt.java @@ -1,10 +1,11 @@ -package cn.ac.iie.storm.bolt; +package com.zdjizhi.storm.bolt; -import cn.ac.iie.storm.utils.combine.AggregateUtils; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; -import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.zdjizhi.storm.utils.combine.AggregateUtils; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import com.zdjizhi.storm.utils.http.HttpClientUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONObject; -import org.apache.log4j.Logger; import org.apache.storm.Config; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; @@ -25,7 +26,7 @@ import java.util.Map; * @Version V1.0 **/ public class CalculateBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(CalculateBolt.class); + private static final Log logger = LogFactory.get(); private static final long serialVersionUID = -7666031217706448622L; private HashMap<String, JSONObject> metricsMap; private HashMap<String, String[]> actionMap; diff --git a/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java index a9051db..d996a84 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/GatheringBolt.java @@ -1,10 +1,11 @@ -package cn.ac.iie.storm.bolt; +package com.zdjizhi.storm.bolt; -import cn.ac.iie.storm.utils.combine.AggregateUtils; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; -import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.zdjizhi.storm.utils.combine.AggregateUtils; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import com.zdjizhi.storm.utils.http.HttpClientUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONObject; -import org.apache.log4j.Logger; import org.apache.storm.Config; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; @@ -25,7 +26,7 @@ import java.util.Map; * @Version V1.0 **/ public class GatheringBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(GatheringBolt.class); + private static final Log logger = LogFactory.get(); private static final long serialVersionUID = -6166717864837350277L; private HashMap<String, JSONObject> metricsMap; private HashMap<String, String[]> actionMap; diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java index d7fdad9..6203f16 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java @@ -1,13 +1,13 @@ -package cn.ac.iie.storm.bolt; +package com.zdjizhi.storm.bolt; -import cn.ac.iie.storm.utils.combine.AggregateUtils; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; -import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import com.zdjizhi.storm.utils.http.HttpClientUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -21,8 +21,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import static cn.ac.iie.storm.utils.combine.AggregateUtils.transDimensions; -import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation; +import static com.zdjizhi.storm.utils.combine.AggregateUtils.transDimensions; +import static com.zdjizhi.storm.utils.combine.AggregateUtils.updateAppRelation; /** * @ClassNameMyWindowBolt @@ -31,7 +31,7 @@ import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation; * @Version V1.0 **/ public class ParseKvBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(ParseKvBolt.class); + private static final Log logger = LogFactory.get(); private static final long serialVersionUID = -999382396035310355L; private JSONArray transforms; private JSONArray dimensions; @@ -102,17 +102,6 @@ public class ParseKvBolt extends BaseBasicBolt { collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString())); } break; -// case "hierarchy": -// String hierarchyValue = message.getString(fieldName); -// //TODO 执行拆分代码 -// if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) { -// String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); -// String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]); -// //TODO 递归拼接tuple并发送出去 -// System.out.println(dimensionsObj.get(name) + "---" + dimensionsObj.toString() + "---" + message.toString()); -// AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name); -// } -// break; default: break; } diff --git a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java index af53133..a7e6a6b 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java @@ -1,13 +1,12 @@ -package cn.ac.iie.storm.bolt; +package com.zdjizhi.storm.bolt; -import cn.ac.iie.storm.utils.combine.AggregateUtils; -import cn.ac.iie.storm.utils.common.LogSendKafka; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import com.zdjizhi.storm.utils.kafka.LogSendKafka; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONObject; import com.zdjizhi.utils.StringUtil; -import kafka.utils.json.JsonObject; -import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -24,7 +23,7 @@ import java.util.Map; public class ResultSendBolt extends BaseBasicBolt { private static final long serialVersionUID = 3237813470939823159L; - private static Logger logger = Logger.getLogger(ResultSendBolt.class); + private static final Log logger = LogFactory.get(); private LogSendKafka logSendKafka; @Override diff --git a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java index feeb86c..0880315 100644 --- a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java +++ b/src/main/java/com/zdjizhi/storm/spout/CustomizedKafkaSpout.java @@ -1,11 +1,12 @@ -package cn.ac.iie.storm.spout; +package com.zdjizhi.storm.spout; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.log4j.Logger; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -27,8 +28,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout { private KafkaConsumer<String, String> consumer; private SpoutOutputCollector collector = null; private TopologyContext context = null; - private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); - + private static final Log logger = LogFactory.get(); private static Properties createConsumerConfig() { Properties props = new Properties(); diff --git a/src/main/java/cn/ac/iie/storm/topology/StormRunner.java b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java index 6e77c66..205f2f5 100644 --- a/src/main/java/cn/ac/iie/storm/topology/StormRunner.java +++ b/src/main/java/com/zdjizhi/storm/topology/StormRunner.java @@ -1,4 +1,4 @@ -package cn.ac.iie.storm.topology; +package com.zdjizhi.storm.topology; import org.apache.storm.Config; diff --git a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java index 520b829..388abba 100644 --- a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/storm/topology/StreamAggregateTopology.java @@ -1,12 +1,13 @@ -package cn.ac.iie.storm.topology; - -import cn.ac.iie.storm.bolt.CalculateBolt; -import cn.ac.iie.storm.bolt.GatheringBolt; -import cn.ac.iie.storm.bolt.ResultSendBolt; -import cn.ac.iie.storm.bolt.ParseKvBolt; -import cn.ac.iie.storm.spout.CustomizedKafkaSpout; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; -import org.apache.log4j.Logger; +package com.zdjizhi.storm.topology; + +import com.zdjizhi.storm.bolt.CalculateBolt; +import com.zdjizhi.storm.bolt.GatheringBolt; +import com.zdjizhi.storm.bolt.ResultSendBolt; +import com.zdjizhi.storm.bolt.ParseKvBolt; +import com.zdjizhi.storm.spout.CustomizedKafkaSpout; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.storm.Config; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; @@ -22,8 +23,7 @@ import org.apache.storm.tuple.Fields; **/ public class StreamAggregateTopology { - - private static Logger logger = Logger.getLogger(StreamAggregateTopology.class); + private static final Log logger = LogFactory.get(); private final String topologyName; private final Config topologyConfig; private TopologyBuilder builder; @@ -82,25 +82,24 @@ public class StreamAggregateTopology { } public static void main(String[] args) throws Exception { - StreamAggregateTopology csst = null; + StreamAggregateTopology streamAggregateTopology; boolean runLocally = true; - String parameter = "remote"; int size = 2; - if (args.length >= size && parameter.equalsIgnoreCase(args[1])) { + if (args.length >= size && StreamAggregateConfig.MODEL.equalsIgnoreCase(args[1])) { runLocally = false; - csst = new StreamAggregateTopology(args[0]); + streamAggregateTopology = new StreamAggregateTopology(args[0]); } else { - csst = new StreamAggregateTopology(); + streamAggregateTopology = new StreamAggregateTopology(); } - csst.buildTopology(); + streamAggregateTopology.buildTopology(); if (runLocally) { logger.info("执行本地模式..."); - csst.runLocally(); + streamAggregateTopology.runLocally(); } else { logger.info("执行远程部署模式..."); - csst.runRemotely(); + streamAggregateTopology.runRemotely(); } } } diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java index de5938c..0563666 100644 --- a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java +++ b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java @@ -1,10 +1,11 @@ -package cn.ac.iie.storm.utils.combine; +package com.zdjizhi.storm.utils.combine; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; -import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import com.zdjizhi.storm.utils.http.HttpClientUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import org.apache.log4j.Logger; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.tuple.Values; @@ -18,7 +19,7 @@ import java.util.HashMap; * @Version V1.0 **/ public class AggregateUtils { - private final static Logger logger = Logger.getLogger(AggregateUtils.class); + private static final Log logger = LogFactory.get(); /** * Long类型的数据求和 @@ -139,17 +140,6 @@ public class AggregateUtils { .getString("name"); } - public static void main(String[] args) { - String sc = "{\"status\":200,\"code\":\"200666\",\"queryKey\":null,\"success\":true,\"message\":\"ok\",\"statistics\":null,\"formatType\":null,\"meta\":null,\"data\":{\"type\":\"record\",\"name\":\"liveChart\",\"doc\":{\"timestamp\":{\"name\":\"stat_time\",\"type\":\"Long\"},\"dimensions\":[{\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"type\":\"String\"},{\"name\":\"entrance_id\",\"fieldName\":\"common_entrance_id\",\"type\":\"String\"},{\"name\":\"isp\",\"fieldName\":\"common_isp\",\"type\":\"String\"},{\"name\":\"data_center\",\"fieldName\":\"common_data_center\",\"type\":\"String\"}],\"metrics\":[{\"function\":\"sum\",\"name\":\"sessions\",\"fieldName\":\"common_sessions\"},{\"function\":\"sum\",\"name\":\"c2s_byte_num\",\"fieldName\":\"common_c2s_byte_num\"},{\"function\":\"sum\",\"name\":\"s2c_byte_num\",\"fieldName\":\"common_s2c_byte_num\"},{\"function\":\"sum\",\"name\":\"c2s_pkt_num\",\"fieldName\":\"common_c2s_pkt_num\"},{\"function\":\"sum\",\"name\":\"s2c_pkt_num\",\"fieldName\":\"common_s2c_pkt_num\"},{\"function\":\"sum\",\"name\":\"c2s_ipfrag_num\",\"fieldName\":\"common_c2s_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"s2c_ipfrag_num\",\"fieldName\":\"common_s2c_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_lostlen\",\"fieldName\":\"common_c2s_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_lostlen\",\"fieldName\":\"common_s2c_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_unorder_num\",\"fieldName\":\"common_c2s_tcp_unorder_num\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_unorder_num\",\"fieldName\":\"common_s2c_tcp_unorder_num\"},{\"function\":\"disCount\",\"name\":\"unique_sip_num\",\"fieldName\":\"common_server_ip\"},{\"function\":\"disCount\",\"name\":\"unique_cip_num\",\"fieldName\":\"common_client_ip\"}],\"filters\":{\"type\":\"and\",\"fields\":[{\"fieldName\":\"common_protocol_label\",\"type\":\"not\",\"values\":null}]},\"transforms\":[{\"fieldName\":\"common_app_label\",\"function\":\"replaceapp\",\"name\":\"common_app_label\",\"parameters\":\"0,/\"},{\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"common_app_label,/\"},{\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"/\"}],\"action\":[{\"label\":\"Default\",\"metrics\":\"sessions,c2s_byte_num,s2c_byte_num,c2s_pkt_num,s2c_pkt_num,c2s_ipfrag_num,s2c_ipfrag_num,c2s_tcp_lostlen,s2c_tcp_lostlen,c2s_tcp_unorder_num,s2c_tcp_unorder_num\"}],\"granularity\":{\"type\":\"period\",\"period\":\"5M\"}},\"fields\":[],\"task\":\"Protocol-Distribution\",\"in\":\"CONNECTION-RECORD-COMPLETED-LOG\",\"out\":\"TRAFFIC-PROTOCOL-STAT-LOG\"}}"; - JSONObject jsonObject = JSONObject.parseObject(sc); - String data = jsonObject.getString("data"); - - System.out.println(JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data) - .getString("doc")) - .getString("timestamp")) - .getString("name")); - } - /** * 更新缓存中的对应关系map * diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java index fdeace4..32e059b 100644 --- a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java +++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java @@ -1,4 +1,4 @@ -package cn.ac.iie.storm.utils.file; +package com.zdjizhi.storm.utils.common; /** @@ -7,6 +7,7 @@ package cn.ac.iie.storm.utils.file; public class StreamAggregateConfig { public static final String FORMAT_SPLITTER = ","; + public static final String MODEL = "remote"; /** * System */ diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java index 03f67c0..1f951c6 100644 --- a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java +++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfigurations.java @@ -1,4 +1,4 @@ -package cn.ac.iie.storm.utils.file; +package com.zdjizhi.storm.utils.common; import java.util.Properties; @@ -9,15 +9,12 @@ import java.util.Properties; public final class StreamAggregateConfigurations { - // private static Properties propCommon = new Properties(); private static Properties propService = new Properties(); public static String getStringProperty(Integer type, String key) { if (type == 0) { return propService.getProperty(key); -// } else if (type == 1) { -// return propCommon.getProperty(key); } else { return null; } @@ -27,8 +24,6 @@ public final class StreamAggregateConfigurations { public static Integer getIntProperty(Integer type, String key) { if (type == 0) { return Integer.parseInt(propService.getProperty(key)); -// } else if (type == 1) { -// return Integer.parseInt(propCommon.getProperty(key)); } else { return null; } @@ -37,8 +32,6 @@ public final class StreamAggregateConfigurations { public static Long getLongProperty(Integer type, String key) { if (type == 0) { return Long.parseLong(propService.getProperty(key)); -// } else if (type == 1) { -// return Long.parseLong(propCommon.getProperty(key)); } else { return null; } @@ -47,8 +40,6 @@ public final class StreamAggregateConfigurations { public static Boolean getBooleanProperty(Integer type, String key) { if (type == 0) { return "true".equals(propService.getProperty(key).toLowerCase().trim()); -// } else if (type == 1) { -// return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); } else { return null; } @@ -58,7 +49,6 @@ public final class StreamAggregateConfigurations { try { propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); } catch (Exception e) { -// propCommon = null; propService = null; } } diff --git a/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..9757dd6 --- /dev/null +++ b/src/main/java/com/zdjizhi/storm/utils/http/HttpClientUtil.java @@ -0,0 +1,64 @@ +package com.zdjizhi.storm.utils.http; + +import com.zdjizhi.storm.utils.logout.LogPrintUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 获取网关schema的工具类 + * + * @author qidaijie + */ +public class HttpClientUtil { + private static final Log logger = LogFactory.get(); + + /** + * 请求网关获取schema + * + * @param http 网关url + * @return schema + */ + public static String requestByGetMethod(String http) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder = null; + + HttpGet get = new HttpGet(http); + BufferedReader bufferedReader = null; + try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + String line; + while ((line = bufferedReader.readLine()) != null) { + entityStringBuilder.append(line); + } + } + } catch (Exception e) { + logger.error(LogPrintUtil.print(e)); + } finally { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + logger.error(LogPrintUtil.print(e)); + } + } + if (bufferedReader != null) { +// bufferedReader.close(); + org.apache.commons.io.IOUtils.closeQuietly(bufferedReader); + } + } + return entityStringBuilder.toString(); + } + +} diff --git a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java b/src/main/java/com/zdjizhi/storm/utils/kafka/LogSendKafka.java index ee42553..51cdd0d 100644 --- a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java +++ b/src/main/java/com/zdjizhi/storm/utils/kafka/LogSendKafka.java @@ -1,9 +1,10 @@ -package cn.ac.iie.storm.utils.common; +package com.zdjizhi.storm.utils.kafka; -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import com.zdjizhi.storm.utils.common.StreamAggregateConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.kafka.clients.producer.*; -import org.apache.log4j.Logger; import java.util.Properties; @@ -16,7 +17,7 @@ import java.util.Properties; public class LogSendKafka { - private static Logger logger = Logger.getLogger(LogSendKafka.class); + private static final Log logger = LogFactory.get(); /** * kafka生产者,用于向kafka中发送消息 |
