diff options
| author | shizhendong <[email protected]> | 2020-10-16 17:21:13 +0800 |
|---|---|---|
| committer | shizhendong <[email protected]> | 2020-10-16 17:21:13 +0800 |
| commit | cdab979b13182d6ccb90d827443af101b63eec11 (patch) | |
| tree | dfd03c000b3d17daf91d7d4f5ca8f5179776cad2 | |
| parent | 02c78375122f29e310fa0b477d5760aae1d24bc4 (diff) | |
fix: 修改 traffic 指标获取方式,并可以在配置线程刮取超时时间
| -rw-r--r-- | src/main/java/com/nis/controller/TrafficController.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/nis/service/impl/TrafficServiceImpl.java | 251 | ||||
| -rw-r--r-- | src/main/java/com/nis/util/SnmpUtil.java | 44 | ||||
| -rw-r--r-- | src/main/resources/application.yml | 5 |
4 files changed, 172 insertions, 132 deletions
diff --git a/src/main/java/com/nis/controller/TrafficController.java b/src/main/java/com/nis/controller/TrafficController.java index 6a4284d..0b032f8 100644 --- a/src/main/java/com/nis/controller/TrafficController.java +++ b/src/main/java/com/nis/controller/TrafficController.java @@ -21,7 +21,11 @@ public class TrafficController { @GetMapping(value = "/traffic", produces = "text/plain;charset=utf-8") private String traffic(Integer dcId) { + long start, end; + start = System.currentTimeMillis(); String trafficByIdcId = trafficService.getTrafficByIdcId(dcId); + end = System.currentTimeMillis(); + log.info("traffic 完毕,执行时长为:" + (end - start) + "(ms)"); return trafficByIdcId; } } diff --git a/src/main/java/com/nis/service/impl/TrafficServiceImpl.java b/src/main/java/com/nis/service/impl/TrafficServiceImpl.java index 2e32974..7c26e5e 100644 --- a/src/main/java/com/nis/service/impl/TrafficServiceImpl.java +++ b/src/main/java/com/nis/service/impl/TrafficServiceImpl.java @@ -9,16 +9,15 @@ import com.nis.util.SnmpUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.*; import java.util.stream.Collectors; /** @@ -32,149 +31,183 @@ import java.util.stream.Collectors; @Slf4j public class TrafficServiceImpl implements TrafficService { - private static Logger logger = LoggerFactory.getLogger(TrafficServiceImpl.class); + private static ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + + @Value("${confagent.snmp.scrapeTimeOut}") + private Integer scrapeTimeOut; @Autowired private TrafficDao trafficDao; + @Autowired + private SnmpUtil snmpUtil; + @Override public String getTrafficByIdcId(Integer idcId) { - // 最终返回结果 - List<Map<String, String>> list = new ArrayList<>(); - - List<Traffic> ipList = trafficDao.selectHost(idcId); - - // 按照ip分组 减少请求服务次数 - Map<String, List<Traffic>> listMap = ipList.stream().collect(Collectors.groupingBy(Traffic::getHost)); - - // 获取注释信息 - Map<String, String[]> desc = getDesc(); - - // ifTable接口内包含的oid 和 name 对应关系map - Map<String,String> ifTableOidAndNameMap = getIfTableOidAndNameMap(); - - String resultOid, name, index, directionStr, inOutPrefix, trafficTags, tagStr, trafficDirection, ifPhysAddresStr, ifDescrStr; - List<Map> resultData = new ArrayList<>(); - Traffic t; - Map auth = new HashMap(); - Map<String, String> tagMap; - + List<Traffic> traffics = trafficDao.selectHost(idcId); + // 按照ip分组 减少请求服务次数 同一个IP 认证信息一致 + Map<String, List<Traffic>> listMap = traffics.stream().collect(Collectors.groupingBy(Traffic::getHost)); + // 获取请求数据 + Map allIpReqestSNMPResult = this.getAllIpReqestSNMPResult(listMap); + + List<Map<String, String>> resultMap = new ArrayList<>(); + Map<String, String> ifTableOidAndNameMap = getIfTableOidAndNameMap(); for (Map.Entry<String, List<Traffic>> entry : listMap.entrySet()) { - try { - t = entry.getValue().get(0); - if (t.getAuth() != null) { - auth = JSONObject.parseObject(t.getAuth().toString(), Map.class); - } - resultData = SnmpUtil.snmpWalk(entry.getKey(), t.getPort(), t.getVersion(), t.getCommunity(), Constant.IFTABLE_OID, auth); - } catch (IOException e) { - logger.error("通过SNMP采集IfTable失败,ip ->" + entry.getKey(), e); - } - + List<Map> resultData = (List<Map>) allIpReqestSNMPResult.get(entry.getKey()); if (CollectionUtils.isEmpty(resultData)) { - logger.error("通过SNMP采集IfTable失败,ip ->" + entry.getKey() + ",请检查agent 服务是否开启、是否可达、身份验证信息是否正确。"); + log.error("通过SNMP采集IfTable失败,ip ->" + entry.getKey() + ",请检查agent 服务是否开启、是否可达、身份验证信息是否正确。"); continue; } - Map<String, String> tempMap; - // 遍历 traffic 配置信息 - for (Traffic traffic : entry.getValue()) { - trafficTags = traffic.getTags(); - trafficDirection = traffic.getDirection(); - - for (Map map : resultData) { - tempMap = new HashMap(); - boolean directionFlag = true; - tagStr = directionStr = ifPhysAddresStr = ifDescrStr = ""; - - resultOid = map.get("oid").toString(); - index = resultOid; - - // 从最后开始截取 目前暂定截取最后一个 . - resultOid = resultOid.substring(0, resultOid.lastIndexOf(".")); - - name = ifTableOidAndNameMap.get(resultOid); - if (StringUtils.isNotEmpty(name)) { - index = index.replaceAll(resultOid + ".", ""); - } else { - // logger.error("未能通过oid找到对应名称{}", map); - continue; - } + resultMap.addAll(this.handelMetricFormat(entry.getValue(), resultData, ifTableOidAndNameMap)); + } - if (index.equals(traffic.getIfIndex().toString())) { - // ifSpecific已过时 - if (name.contains("ifSpecific")) { - continue; - } + if (CollectionUtils.isNotEmpty(resultMap)) { + return this.handelResultForPromeserverFormat(resultMap); + } + return null; + } - if (trafficDirection.contains(Constant.TRAFFIC_RX)) { - directionStr += Constant.TRAFFIC_RX + "=" + "\"" + 1 + "\""; - directionFlag = false; - } - if (trafficDirection.contains(Constant.TRAFFIC_TX)) { - if (!directionFlag) { - directionStr += ","; - } - directionStr += Constant.TRAFFIC_TX + "=" + "\"" + 1 + "\""; - } + private List<Map<String, String>> handelMetricFormat(List<Traffic> traffics, List<Map> resultData, Map<String, String> ifTableOidAndNameMap) { + List<Map<String, String>> list = new ArrayList<>(); + Map<String, String> tempMap, tagMap; + String resultOid, name, index, directionStr, inOutPrefix, trafficTags, tagStr, trafficDirection, ifPhysAddresStr, ifDescrStr; + for (Traffic traffic : traffics) { + trafficTags = traffic.getTags(); + trafficDirection = traffic.getDirection(); + + for (Map map : resultData) { + tempMap = new HashMap(); + boolean directionFlag = true; + tagStr = directionStr = ifPhysAddresStr = ifDescrStr = ""; + + resultOid = map.get("oid").toString(); + index = resultOid; + + // 从最后开始截取 目前暂定截取最后一个 . + resultOid = resultOid.substring(0, resultOid.lastIndexOf(".")); + + name = ifTableOidAndNameMap.get(resultOid); + if (StringUtils.isNotEmpty(name)) { + index = index.replaceAll(resultOid + ".", ""); + } else { + continue; + } - if (StringUtils.isNotEmpty(trafficTags)) { - tagMap = JSONObject.parseObject(trafficTags, Map.class); - for (Map.Entry<String, String> tagEntry : tagMap.entrySet()) { - if (tagEntry.getKey().toLowerCase().equals("ifIndex".toLowerCase())) { - continue; - } - tagStr += tagEntry.getKey() + "=" + "\"" + tagEntry.getValue() + "\"" + ","; - } + if (index.equals(traffic.getIfIndex().toString())) { + // ifSpecific已过时 + if (name.contains("ifSpecific")) continue; + if (trafficDirection.contains(Constant.TRAFFIC_RX)) { + directionStr += Constant.TRAFFIC_RX + "=" + "\"" + 1 + "\""; + directionFlag = false; + } + if (trafficDirection.contains(Constant.TRAFFIC_TX)) { + if (!directionFlag) { + directionStr += ","; } - if (name.contains("ifPhysAddress")) { - if (StringUtils.isEmpty(map.get("value").toString())) { - map.put("value", "00:00:00:00:00:00"); - } - ifPhysAddresStr = "ifPhysAddress=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ","; - map.put("value", 1); + directionStr += Constant.TRAFFIC_TX + "=" + "\"" + 1 + "\""; + } + if (StringUtils.isNotEmpty(trafficTags)) { + tagMap = JSONObject.parseObject(trafficTags, Map.class); + for (Map.Entry<String, String> tagEntry : tagMap.entrySet()) { + if (tagEntry.getKey().toLowerCase().equals("ifIndex".toLowerCase())) continue; + tagStr += tagEntry.getKey() + "=" + "\"" + tagEntry.getValue() + "\"" + ","; } - - if (name.contains("ifDescr")) { - ifDescrStr = "ifDescr=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ","; - map.put("value", 1); + } + if (name.contains("ifPhysAddress")) { + if (StringUtils.isEmpty(map.get("value").toString())) { + map.put("value", "00:00:00:00:00:00"); } - - inOutPrefix = "{ifIndex=" + "\"" + index + "\"" + "," - + ifPhysAddresStr - + ifDescrStr - + "datacenter=" + "\"" + traffic.getDatacenter() + "\"" + "," - + "asset=" + "\"" + traffic.getHost() + "\"" + "," - + tagStr - + directionStr + "}" - + " " + map.get("value"); - - tempMap.put(name, inOutPrefix); - list.add(tempMap); + ifPhysAddresStr = "ifPhysAddress=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ","; + map.put("value", 1); + } + if (name.contains("ifDescr")) { + ifDescrStr = "ifDescr=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ","; + map.put("value", 1); } + inOutPrefix = "{ifIndex=" + "\"" + index + "\"" + "," + + ifPhysAddresStr + + ifDescrStr + + "datacenter=" + "\"" + traffic.getDatacenter() + "\"" + "," + + "asset=" + "\"" + traffic.getHost() + "\"" + "," + + tagStr + + directionStr + "}" + + " " + map.get("value"); + + tempMap.put(name, inOutPrefix); + list.add(tempMap); } } } + return list; + } + private String handelResultForPromeserverFormat(List<Map<String, String>> resultMap) { // 按照 指标名称分组之后 - Map<Object, List<Map<String, String>>> groupListMap = list.stream().collect(Collectors.groupingBy(map -> map.keySet().toArray()[0])); + Map<Object, List<Map<String, String>>> groupListMap = resultMap.stream().collect(Collectors.groupingBy(map -> map.keySet().toArray()[0])); String r = ""; - - String descHelp, descType; + // 获取注释信息 + Map<String, String[]> desc = getDesc(); + String descHelp, descType, name; for (Map.Entry<Object, List<Map<String, String>>> entry : groupListMap.entrySet()) { name = (String) entry.getKey(); descHelp = desc.get(name)[0] + "\n"; descType = desc.get(name)[1] + "\n"; r += descHelp + descType; - for (Map<String, String> pojo : entry.getValue()) { r += pojo.keySet().toArray()[0] + pojo.values().iterator().next() + "\n"; } } - return r; } + private Map getAllIpReqestSNMPResult(Map<String, List<Traffic>> listMap) { + List<Callable<Map>> tasks = new ArrayList<>(); + Callable<Map> task = null; + Map auth = null; + for (Map.Entry<String, List<Traffic>> entry : listMap.entrySet()) { + Traffic t = entry.getValue().get(0); + if (t.getAuth() != null) { + auth = JSONObject.parseObject(t.getAuth().toString(), Map.class); + } + Map finalAuth = auth; + task = new Callable<Map>() { + @Override + public Map call() throws Exception { + Map map = new HashMap(); + List<Map> list = snmpUtil.snmpWalk(entry.getKey(), t.getPort(), t.getVersion(), t.getCommunity(), Constant.IFTABLE_OID, finalAuth); + map.put(entry.getKey(), list); + return map; + } + }; + tasks.add(task); + } + + List<Map> ipAndResultMaps = new ArrayList<>(); + try { + List<Future<Map>> futures = exec.invokeAll(tasks, scrapeTimeOut, TimeUnit.MILLISECONDS); + for (Future<Map> future : futures) { + if (!future.isCancelled()) { + try { + ipAndResultMaps.add(future.get()); + } catch (CancellationException e) { + log.error("该任务已经被取消,错误原因是:" + e.getMessage(), e); + } catch (Exception e){ + log.error("任务执行出现异常,错误原因是:" + e.getMessage(), e); + } + } + } + } catch (Exception e) { + log.error("awaitTermination 方法被中断,中止线程池中全部的线程的执行" + e.getMessage(), e); + //exec.shutdownNow(); + } + + Map merged = new HashMap(); + ipAndResultMaps.forEach(merged::putAll); + return merged; + } + private static Map<String, String> getIfTableOidAndNameMap() { Map<String, String> map = new HashMap<>(); map.put("1.3.6.1.2.1.2.2.1.1", "ifIndex"); diff --git a/src/main/java/com/nis/util/SnmpUtil.java b/src/main/java/com/nis/util/SnmpUtil.java index af47808..4fcbe6a 100644 --- a/src/main/java/com/nis/util/SnmpUtil.java +++ b/src/main/java/com/nis/util/SnmpUtil.java @@ -1,8 +1,7 @@ package com.nis.util; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.snmp4j.*; import org.snmp4j.event.ResponseEvent; import org.snmp4j.mp.MPv3; @@ -10,6 +9,8 @@ import org.snmp4j.mp.SnmpConstants; import org.snmp4j.security.*; import org.snmp4j.smi.*; import org.snmp4j.transport.DefaultUdpTransportMapping; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; @@ -17,25 +18,24 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +@Component +@Slf4j public class SnmpUtil { - private static Logger logger = LoggerFactory.getLogger(SnmpUtil.class); + @Value("${confagent.snmp.walkTimeOut}") + private Integer DEFAULT_TIMEOUT; - // private static Snmp snmp = null; + @Value("${confagent.snmp.walkRetrtNum}") + private Integer DEFAULT_RETRY; - // 默认版本 v2 - private static final int DEFAULT_VERSION = SnmpConstants.version2c; - private static final String DEFAULT_PROTOCOL = "udp"; - private static final long DEFAULT_TIMEOUT = 1000L; - // private static final int DEFAULT_RETRY = 3; + // 默认版本 v2 + private final int DEFAULT_VERSION = SnmpConstants.version2c; + private final String DEFAULT_PROTOCOL = "udp"; // 默认团体名 - private static final String DEFAULT_COMMUNITY = "public"; - - // 默认端口 - private static final Integer DEFAULT_PORT = 161; + private final String DEFAULT_COMMUNITY = "public"; - public static Target getTatget(Integer version, String community, Map<String, String> auth, String ip, Integer port, Snmp snmp) { + public Target getTatget(Integer version, String community, Map<String, String> auth, String ip, Integer port, Snmp snmp) { Target target = null; if (version == SnmpConstants.version3) { // 添加用户 @@ -104,9 +104,9 @@ public class SnmpUtil { ((CommunityTarget) target).setCommunity(new OctetString(community)); } // 超时时间 timeout in milliseconds before a confirmed request is resent or timed out. - target.setTimeout(DEFAULT_TIMEOUT); + target.setTimeout(DEFAULT_TIMEOUT == null ? 3000 : DEFAULT_TIMEOUT); // 重试次数 - // target.setRetries(DEFAULT_RETRY); + target.setRetries(DEFAULT_RETRY == null ? 3 : DEFAULT_RETRY); Address address = GenericAddress.parse(DEFAULT_PROTOCOL + ":" + ip + "/" + port); target.setAddress(address); @@ -114,7 +114,7 @@ public class SnmpUtil { return target; } - private static PDU getPdu(Integer version) { + private PDU getPdu(Integer version) { if (version == SnmpConstants.version3) { ScopedPDU pdu = new ScopedPDU(); // 如果agent上设定的contextEngineId和snmpEngineId不一致 这里需要设置引擎id @@ -137,7 +137,7 @@ public class SnmpUtil { * @return * @throws IOException */ - public static List<Map> snmpWalk(String ip, Integer port, Integer version, String community, String oid, Map<String, String> auth) throws IOException { + public List<Map> snmpWalk(String ip, Integer port, Integer version, String community, String oid, Map<String, String> auth) throws IOException { List<Map> resultData = new ArrayList<>(); Snmp snmp = null; TransportMapping transport = null; @@ -202,21 +202,21 @@ public class SnmpUtil { } } } catch (IOException e) { - logger.error("snmp采集失败,错误信息是:" + e.getMessage() + " pdu是:" + pdu.toString() + " target是:" + target.toString(), e); + log.error("snmp采集失败,错误信息是:" + e.getMessage() + " pdu是:" + pdu.toString() + " target是:" + target.toString(), e); } } finally { if (snmp != null) { try { snmp.close(); } catch (Exception e) { - logger.error("关闭snmp失败", e); + log.error("关闭snmp失败", e); } } if (transport != null) { try { transport.close(); } catch (Exception e) { - logger.error("关闭transport失败", e); + log.error("关闭transport失败", e); } } } @@ -231,7 +231,7 @@ public class SnmpUtil { * @param vb * @return */ - private static boolean checkWalkFinished(OID targetOID, PDU pdu, VariableBinding vb) { + private boolean checkWalkFinished(OID targetOID, PDU pdu, VariableBinding vb) { boolean finished = false; if (pdu.getErrorStatus() != 0) { finished = true; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 6548084..fe0eaba 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -41,7 +41,10 @@ confagent: ## snmp trap config snmp: trapThredPoolSize: 2 - trapPort: 162 + trapPort: 162 + scrapeTimeOut: 5000 # snmp 线程采集超时时间 单位 ms + walkTimeOut: 2000 # snmpwalk 超时时间 单位 ms + walkRetrtNum: 1 # snmpwalk 超时重试次数 #mybatis |
