summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshizhendong <[email protected]>2020-10-16 17:21:13 +0800
committershizhendong <[email protected]>2020-10-16 17:21:13 +0800
commitcdab979b13182d6ccb90d827443af101b63eec11 (patch)
treedfd03c000b3d17daf91d7d4f5ca8f5179776cad2
parent02c78375122f29e310fa0b477d5760aae1d24bc4 (diff)
fix: 修改 traffic 指标获取方式,并可以在配置线程刮取超时时间
-rw-r--r--src/main/java/com/nis/controller/TrafficController.java4
-rw-r--r--src/main/java/com/nis/service/impl/TrafficServiceImpl.java251
-rw-r--r--src/main/java/com/nis/util/SnmpUtil.java44
-rw-r--r--src/main/resources/application.yml5
4 files changed, 172 insertions, 132 deletions
diff --git a/src/main/java/com/nis/controller/TrafficController.java b/src/main/java/com/nis/controller/TrafficController.java
index 6a4284d..0b032f8 100644
--- a/src/main/java/com/nis/controller/TrafficController.java
+++ b/src/main/java/com/nis/controller/TrafficController.java
@@ -21,7 +21,11 @@ public class TrafficController {
@GetMapping(value = "/traffic", produces = "text/plain;charset=utf-8")
private String traffic(Integer dcId) {
+ long start, end;
+ start = System.currentTimeMillis();
String trafficByIdcId = trafficService.getTrafficByIdcId(dcId);
+ end = System.currentTimeMillis();
+ log.info("traffic 完毕,执行时长为:" + (end - start) + "(ms)");
return trafficByIdcId;
}
}
diff --git a/src/main/java/com/nis/service/impl/TrafficServiceImpl.java b/src/main/java/com/nis/service/impl/TrafficServiceImpl.java
index 2e32974..7c26e5e 100644
--- a/src/main/java/com/nis/service/impl/TrafficServiceImpl.java
+++ b/src/main/java/com/nis/service/impl/TrafficServiceImpl.java
@@ -9,16 +9,15 @@ import com.nis.util.SnmpUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
@@ -32,149 +31,183 @@ import java.util.stream.Collectors;
@Slf4j
public class TrafficServiceImpl implements TrafficService {
- private static Logger logger = LoggerFactory.getLogger(TrafficServiceImpl.class);
+ private static ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+
+ @Value("${confagent.snmp.scrapeTimeOut}")
+ private Integer scrapeTimeOut;
@Autowired
private TrafficDao trafficDao;
+ @Autowired
+ private SnmpUtil snmpUtil;
+
@Override
public String getTrafficByIdcId(Integer idcId) {
- // 最终返回结果
- List<Map<String, String>> list = new ArrayList<>();
-
- List<Traffic> ipList = trafficDao.selectHost(idcId);
-
- // 按照ip分组 减少请求服务次数
- Map<String, List<Traffic>> listMap = ipList.stream().collect(Collectors.groupingBy(Traffic::getHost));
-
- // 获取注释信息
- Map<String, String[]> desc = getDesc();
-
- // ifTable接口内包含的oid 和 name 对应关系map
- Map<String,String> ifTableOidAndNameMap = getIfTableOidAndNameMap();
-
- String resultOid, name, index, directionStr, inOutPrefix, trafficTags, tagStr, trafficDirection, ifPhysAddresStr, ifDescrStr;
- List<Map> resultData = new ArrayList<>();
- Traffic t;
- Map auth = new HashMap();
- Map<String, String> tagMap;
-
+ List<Traffic> traffics = trafficDao.selectHost(idcId);
+ // 按照ip分组 减少请求服务次数 同一个IP 认证信息一致
+ Map<String, List<Traffic>> listMap = traffics.stream().collect(Collectors.groupingBy(Traffic::getHost));
+ // 获取请求数据
+ Map allIpReqestSNMPResult = this.getAllIpReqestSNMPResult(listMap);
+
+ List<Map<String, String>> resultMap = new ArrayList<>();
+ Map<String, String> ifTableOidAndNameMap = getIfTableOidAndNameMap();
for (Map.Entry<String, List<Traffic>> entry : listMap.entrySet()) {
- try {
- t = entry.getValue().get(0);
- if (t.getAuth() != null) {
- auth = JSONObject.parseObject(t.getAuth().toString(), Map.class);
- }
- resultData = SnmpUtil.snmpWalk(entry.getKey(), t.getPort(), t.getVersion(), t.getCommunity(), Constant.IFTABLE_OID, auth);
- } catch (IOException e) {
- logger.error("通过SNMP采集IfTable失败,ip ->" + entry.getKey(), e);
- }
-
+ List<Map> resultData = (List<Map>) allIpReqestSNMPResult.get(entry.getKey());
if (CollectionUtils.isEmpty(resultData)) {
- logger.error("通过SNMP采集IfTable失败,ip ->" + entry.getKey() + ",请检查agent 服务是否开启、是否可达、身份验证信息是否正确。");
+ log.error("通过SNMP采集IfTable失败,ip ->" + entry.getKey() + ",请检查agent 服务是否开启、是否可达、身份验证信息是否正确。");
continue;
}
- Map<String, String> tempMap;
- // 遍历 traffic 配置信息
- for (Traffic traffic : entry.getValue()) {
- trafficTags = traffic.getTags();
- trafficDirection = traffic.getDirection();
-
- for (Map map : resultData) {
- tempMap = new HashMap();
- boolean directionFlag = true;
- tagStr = directionStr = ifPhysAddresStr = ifDescrStr = "";
-
- resultOid = map.get("oid").toString();
- index = resultOid;
-
- // 从最后开始截取 目前暂定截取最后一个 .
- resultOid = resultOid.substring(0, resultOid.lastIndexOf("."));
-
- name = ifTableOidAndNameMap.get(resultOid);
- if (StringUtils.isNotEmpty(name)) {
- index = index.replaceAll(resultOid + ".", "");
- } else {
- // logger.error("未能通过oid找到对应名称{}", map);
- continue;
- }
+ resultMap.addAll(this.handelMetricFormat(entry.getValue(), resultData, ifTableOidAndNameMap));
+ }
- if (index.equals(traffic.getIfIndex().toString())) {
- // ifSpecific已过时
- if (name.contains("ifSpecific")) {
- continue;
- }
+ if (CollectionUtils.isNotEmpty(resultMap)) {
+ return this.handelResultForPromeserverFormat(resultMap);
+ }
+ return null;
+ }
- if (trafficDirection.contains(Constant.TRAFFIC_RX)) {
- directionStr += Constant.TRAFFIC_RX + "=" + "\"" + 1 + "\"";
- directionFlag = false;
- }
- if (trafficDirection.contains(Constant.TRAFFIC_TX)) {
- if (!directionFlag) {
- directionStr += ",";
- }
- directionStr += Constant.TRAFFIC_TX + "=" + "\"" + 1 + "\"";
- }
+ private List<Map<String, String>> handelMetricFormat(List<Traffic> traffics, List<Map> resultData, Map<String, String> ifTableOidAndNameMap) {
+ List<Map<String, String>> list = new ArrayList<>();
+ Map<String, String> tempMap, tagMap;
+ String resultOid, name, index, directionStr, inOutPrefix, trafficTags, tagStr, trafficDirection, ifPhysAddresStr, ifDescrStr;
+ for (Traffic traffic : traffics) {
+ trafficTags = traffic.getTags();
+ trafficDirection = traffic.getDirection();
+
+ for (Map map : resultData) {
+ tempMap = new HashMap();
+ boolean directionFlag = true;
+ tagStr = directionStr = ifPhysAddresStr = ifDescrStr = "";
+
+ resultOid = map.get("oid").toString();
+ index = resultOid;
+
+ // 从最后开始截取 目前暂定截取最后一个 .
+ resultOid = resultOid.substring(0, resultOid.lastIndexOf("."));
+
+ name = ifTableOidAndNameMap.get(resultOid);
+ if (StringUtils.isNotEmpty(name)) {
+ index = index.replaceAll(resultOid + ".", "");
+ } else {
+ continue;
+ }
- if (StringUtils.isNotEmpty(trafficTags)) {
- tagMap = JSONObject.parseObject(trafficTags, Map.class);
- for (Map.Entry<String, String> tagEntry : tagMap.entrySet()) {
- if (tagEntry.getKey().toLowerCase().equals("ifIndex".toLowerCase())) {
- continue;
- }
- tagStr += tagEntry.getKey() + "=" + "\"" + tagEntry.getValue() + "\"" + ",";
- }
+ if (index.equals(traffic.getIfIndex().toString())) {
+ // ifSpecific已过时
+ if (name.contains("ifSpecific")) continue;
+ if (trafficDirection.contains(Constant.TRAFFIC_RX)) {
+ directionStr += Constant.TRAFFIC_RX + "=" + "\"" + 1 + "\"";
+ directionFlag = false;
+ }
+ if (trafficDirection.contains(Constant.TRAFFIC_TX)) {
+ if (!directionFlag) {
+ directionStr += ",";
}
- if (name.contains("ifPhysAddress")) {
- if (StringUtils.isEmpty(map.get("value").toString())) {
- map.put("value", "00:00:00:00:00:00");
- }
- ifPhysAddresStr = "ifPhysAddress=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ",";
- map.put("value", 1);
+ directionStr += Constant.TRAFFIC_TX + "=" + "\"" + 1 + "\"";
+ }
+ if (StringUtils.isNotEmpty(trafficTags)) {
+ tagMap = JSONObject.parseObject(trafficTags, Map.class);
+ for (Map.Entry<String, String> tagEntry : tagMap.entrySet()) {
+ if (tagEntry.getKey().toLowerCase().equals("ifIndex".toLowerCase())) continue;
+ tagStr += tagEntry.getKey() + "=" + "\"" + tagEntry.getValue() + "\"" + ",";
}
-
- if (name.contains("ifDescr")) {
- ifDescrStr = "ifDescr=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ",";
- map.put("value", 1);
+ }
+ if (name.contains("ifPhysAddress")) {
+ if (StringUtils.isEmpty(map.get("value").toString())) {
+ map.put("value", "00:00:00:00:00:00");
}
-
- inOutPrefix = "{ifIndex=" + "\"" + index + "\"" + ","
- + ifPhysAddresStr
- + ifDescrStr
- + "datacenter=" + "\"" + traffic.getDatacenter() + "\"" + ","
- + "asset=" + "\"" + traffic.getHost() + "\"" + ","
- + tagStr
- + directionStr + "}"
- + " " + map.get("value");
-
- tempMap.put(name, inOutPrefix);
- list.add(tempMap);
+ ifPhysAddresStr = "ifPhysAddress=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ",";
+ map.put("value", 1);
+ }
+ if (name.contains("ifDescr")) {
+ ifDescrStr = "ifDescr=" + "\"" + map.get("value").toString().toUpperCase() + "\"" + ",";
+ map.put("value", 1);
}
+ inOutPrefix = "{ifIndex=" + "\"" + index + "\"" + ","
+ + ifPhysAddresStr
+ + ifDescrStr
+ + "datacenter=" + "\"" + traffic.getDatacenter() + "\"" + ","
+ + "asset=" + "\"" + traffic.getHost() + "\"" + ","
+ + tagStr
+ + directionStr + "}"
+ + " " + map.get("value");
+
+ tempMap.put(name, inOutPrefix);
+ list.add(tempMap);
}
}
}
+ return list;
+ }
+ private String handelResultForPromeserverFormat(List<Map<String, String>> resultMap) {
// 按照 指标名称分组之后
- Map<Object, List<Map<String, String>>> groupListMap = list.stream().collect(Collectors.groupingBy(map -> map.keySet().toArray()[0]));
+ Map<Object, List<Map<String, String>>> groupListMap = resultMap.stream().collect(Collectors.groupingBy(map -> map.keySet().toArray()[0]));
String r = "";
-
- String descHelp, descType;
+ // 获取注释信息
+ Map<String, String[]> desc = getDesc();
+ String descHelp, descType, name;
for (Map.Entry<Object, List<Map<String, String>>> entry : groupListMap.entrySet()) {
name = (String) entry.getKey();
descHelp = desc.get(name)[0] + "\n";
descType = desc.get(name)[1] + "\n";
r += descHelp + descType;
-
for (Map<String, String> pojo : entry.getValue()) {
r += pojo.keySet().toArray()[0] + pojo.values().iterator().next() + "\n";
}
}
-
return r;
}
+ private Map getAllIpReqestSNMPResult(Map<String, List<Traffic>> listMap) {
+ List<Callable<Map>> tasks = new ArrayList<>();
+ Callable<Map> task = null;
+ Map auth = null;
+ for (Map.Entry<String, List<Traffic>> entry : listMap.entrySet()) {
+ Traffic t = entry.getValue().get(0);
+ if (t.getAuth() != null) {
+ auth = JSONObject.parseObject(t.getAuth().toString(), Map.class);
+ }
+ Map finalAuth = auth;
+ task = new Callable<Map>() {
+ @Override
+ public Map call() throws Exception {
+ Map map = new HashMap();
+ List<Map> list = snmpUtil.snmpWalk(entry.getKey(), t.getPort(), t.getVersion(), t.getCommunity(), Constant.IFTABLE_OID, finalAuth);
+ map.put(entry.getKey(), list);
+ return map;
+ }
+ };
+ tasks.add(task);
+ }
+
+ List<Map> ipAndResultMaps = new ArrayList<>();
+ try {
+ List<Future<Map>> futures = exec.invokeAll(tasks, scrapeTimeOut, TimeUnit.MILLISECONDS);
+ for (Future<Map> future : futures) {
+ if (!future.isCancelled()) {
+ try {
+ ipAndResultMaps.add(future.get());
+ } catch (CancellationException e) {
+ log.error("该任务已经被取消,错误原因是:" + e.getMessage(), e);
+ } catch (Exception e){
+ log.error("任务执行出现异常,错误原因是:" + e.getMessage(), e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("awaitTermination 方法被中断,中止线程池中全部的线程的执行" + e.getMessage(), e);
+ //exec.shutdownNow();
+ }
+
+ Map merged = new HashMap();
+ ipAndResultMaps.forEach(merged::putAll);
+ return merged;
+ }
+
private static Map<String, String> getIfTableOidAndNameMap() {
Map<String, String> map = new HashMap<>();
map.put("1.3.6.1.2.1.2.2.1.1", "ifIndex");
diff --git a/src/main/java/com/nis/util/SnmpUtil.java b/src/main/java/com/nis/util/SnmpUtil.java
index af47808..4fcbe6a 100644
--- a/src/main/java/com/nis/util/SnmpUtil.java
+++ b/src/main/java/com/nis/util/SnmpUtil.java
@@ -1,8 +1,7 @@
package com.nis.util;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.snmp4j.*;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.mp.MPv3;
@@ -10,6 +9,8 @@ import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.security.*;
import org.snmp4j.smi.*;
import org.snmp4j.transport.DefaultUdpTransportMapping;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
@@ -17,25 +18,24 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@Component
+@Slf4j
public class SnmpUtil {
- private static Logger logger = LoggerFactory.getLogger(SnmpUtil.class);
+ @Value("${confagent.snmp.walkTimeOut}")
+ private Integer DEFAULT_TIMEOUT;
- // private static Snmp snmp = null;
+ @Value("${confagent.snmp.walkRetrtNum}")
+ private Integer DEFAULT_RETRY;
- // 默认版本 v2
- private static final int DEFAULT_VERSION = SnmpConstants.version2c;
- private static final String DEFAULT_PROTOCOL = "udp";
- private static final long DEFAULT_TIMEOUT = 1000L;
- // private static final int DEFAULT_RETRY = 3;
+ // 默认版本 v2
+ private final int DEFAULT_VERSION = SnmpConstants.version2c;
+ private final String DEFAULT_PROTOCOL = "udp";
// 默认团体名
- private static final String DEFAULT_COMMUNITY = "public";
-
- // 默认端口
- private static final Integer DEFAULT_PORT = 161;
+ private final String DEFAULT_COMMUNITY = "public";
- public static Target getTatget(Integer version, String community, Map<String, String> auth, String ip, Integer port, Snmp snmp) {
+ public Target getTatget(Integer version, String community, Map<String, String> auth, String ip, Integer port, Snmp snmp) {
Target target = null;
if (version == SnmpConstants.version3) {
// 添加用户
@@ -104,9 +104,9 @@ public class SnmpUtil {
((CommunityTarget) target).setCommunity(new OctetString(community));
}
// 超时时间 timeout in milliseconds before a confirmed request is resent or timed out.
- target.setTimeout(DEFAULT_TIMEOUT);
+ target.setTimeout(DEFAULT_TIMEOUT == null ? 3000 : DEFAULT_TIMEOUT);
// 重试次数
- // target.setRetries(DEFAULT_RETRY);
+ target.setRetries(DEFAULT_RETRY == null ? 3 : DEFAULT_RETRY);
Address address = GenericAddress.parse(DEFAULT_PROTOCOL + ":" + ip + "/" + port);
target.setAddress(address);
@@ -114,7 +114,7 @@ public class SnmpUtil {
return target;
}
- private static PDU getPdu(Integer version) {
+ private PDU getPdu(Integer version) {
if (version == SnmpConstants.version3) {
ScopedPDU pdu = new ScopedPDU();
// 如果agent上设定的contextEngineId和snmpEngineId不一致 这里需要设置引擎id
@@ -137,7 +137,7 @@ public class SnmpUtil {
* @return
* @throws IOException
*/
- public static List<Map> snmpWalk(String ip, Integer port, Integer version, String community, String oid, Map<String, String> auth) throws IOException {
+ public List<Map> snmpWalk(String ip, Integer port, Integer version, String community, String oid, Map<String, String> auth) throws IOException {
List<Map> resultData = new ArrayList<>();
Snmp snmp = null;
TransportMapping transport = null;
@@ -202,21 +202,21 @@ public class SnmpUtil {
}
}
} catch (IOException e) {
- logger.error("snmp采集失败,错误信息是:" + e.getMessage() + " pdu是:" + pdu.toString() + " target是:" + target.toString(), e);
+ log.error("snmp采集失败,错误信息是:" + e.getMessage() + " pdu是:" + pdu.toString() + " target是:" + target.toString(), e);
}
} finally {
if (snmp != null) {
try {
snmp.close();
} catch (Exception e) {
- logger.error("关闭snmp失败", e);
+ log.error("关闭snmp失败", e);
}
}
if (transport != null) {
try {
transport.close();
} catch (Exception e) {
- logger.error("关闭transport失败", e);
+ log.error("关闭transport失败", e);
}
}
}
@@ -231,7 +231,7 @@ public class SnmpUtil {
* @param vb
* @return
*/
- private static boolean checkWalkFinished(OID targetOID, PDU pdu, VariableBinding vb) {
+ private boolean checkWalkFinished(OID targetOID, PDU pdu, VariableBinding vb) {
boolean finished = false;
if (pdu.getErrorStatus() != 0) {
finished = true;
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 6548084..fe0eaba 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -41,7 +41,10 @@ confagent:
## snmp trap config
snmp:
trapThredPoolSize: 2
- trapPort: 162
+ trapPort: 162
+ scrapeTimeOut: 5000 # snmp 线程采集超时时间 单位 ms
+ walkTimeOut: 2000 # snmpwalk 超时时间 单位 ms
+ walkRetrtNum: 1 # snmpwalk 超时重试次数
#mybatis