summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2020-07-01 14:44:45 +0800
committerwanglihui <[email protected]>2020-07-01 14:44:45 +0800
commit2d543b3df93f7e806611af506b7559fb2dd23033 (patch)
tree2f3616ee1382b5d4a645d2f519a3ba00a285ace8
parent7e8f4d763e2e3ca563adbb64e5436bc9ae5794b7 (diff)
修改边属性逻辑错误
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java103
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java56
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java3
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java2
-rw-r--r--IP-learning-graph/src/main/resources/application.properties4
5 files changed, 95 insertions, 73 deletions
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
index 6663b35..89518c7 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
@@ -66,22 +66,20 @@ public class BaseClickhouseData {
DruidPooledConnection connection = manger.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
-// HashSet<String> fqdnSet = new HashSet<>();
while (resultSet.next()) {
-// String commonSchemaType = resultSet.getString("common_schema_type");
-// String fqdnName = commonSchemaGetFqdn(commonSchemaType,resultSet);
String fqdnName = resultSet.getString("FQDN");
-// fqdnSet.add(fqdnName);
- long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
- long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
- BaseDocument newDoc = new BaseDocument();
- newDoc.setKey(fqdnName);
- newDoc.addAttribute("FQDN_NAME", fqdnName);
- newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
- newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
- int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
- ArrayList<BaseDocument> documentList = vFqdnMap.getOrDefault(i, new ArrayList<>());
- documentList.add(newDoc);
+ if (isDomain(fqdnName)){
+ long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+ long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+ BaseDocument newDoc = new BaseDocument();
+ newDoc.setKey(fqdnName);
+ newDoc.addAttribute("FQDN_NAME", fqdnName);
+ newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+ newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+ int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
+ ArrayList<BaseDocument> documentList = vFqdnMap.getOrDefault(i, new ArrayList<>());
+ documentList.add(newDoc);
+ }
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse v_FQDN时间:" + (last - start));
@@ -154,29 +152,31 @@ public class BaseClickhouseData {
while (resultSet.next()) {
String commonSchemaType = resultSet.getString("common_schema_type");
String vFqdn = resultSet.getString("FQDN");
- String vIp = resultSet.getString("common_server_ip");
- long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
- long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
- long countTotal = resultSet.getLong("COUNT_TOTAL");
- String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
+ if (isDomain(vFqdn)){
+ String vIp = resultSet.getString("common_server_ip");
+ long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+ long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+ long countTotal = resultSet.getLong("COUNT_TOTAL");
+ String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
- String key = vFqdn + "-" + vIp;
- BaseEdgeDocument newDoc = new BaseEdgeDocument();
- newDoc.setKey(key);
- newDoc.setFrom("FQDN/" + vFqdn);
- newDoc.setTo("IP/" + vIp);
- newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
- newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
- newDoc.addAttribute("COUNT_TOTAL", countTotal);
- newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents);
- newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents);
+ String key = vFqdn + "-" + vIp;
+ BaseEdgeDocument newDoc = new BaseEdgeDocument();
+ newDoc.setKey(key);
+ newDoc.setFrom("FQDN/" + vFqdn);
+ newDoc.setTo("IP/" + vIp);
+ newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+ newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+ newDoc.addAttribute("COUNT_TOTAL", countTotal);
+ newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents);
+ newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents);
- int hashMod = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
- HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eFqdnAddressIpMap.getOrDefault(hashMod, new HashMap());
+ int hashMod = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
+ HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eFqdnAddressIpMap.getOrDefault(hashMod, new HashMap());
- HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
- schemaHashMap.put(commonSchemaType, newDoc);
- documentHashMap.put(key, schemaHashMap);
+ HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
+ schemaHashMap.put(commonSchemaType, newDoc);
+ documentHashMap.put(key, schemaHashMap);
+ }
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start));
@@ -203,24 +203,26 @@ public class BaseClickhouseData {
String commonSchemaType = resultSet.getString("common_schema_type");
String vIp = resultSet.getString("common_client_ip");
String vFqdn = resultSet.getString("FQDN");
- String key = vIp + "-" + vFqdn;
- long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
- long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
- long countTotal = resultSet.getLong("COUNT_TOTAL");
+ if (isDomain(vFqdn)){
+ String key = vIp + "-" + vFqdn;
+ long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+ long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+ long countTotal = resultSet.getLong("COUNT_TOTAL");
- BaseEdgeDocument newDoc = new BaseEdgeDocument();
- newDoc.setKey(key);
- newDoc.setFrom("IP/" + vIp);
- newDoc.setTo("FQDN/" + vFqdn);
- newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
- newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
- newDoc.addAttribute("COUNT_TOTAL", countTotal);
- int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
- HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap());
+ BaseEdgeDocument newDoc = new BaseEdgeDocument();
+ newDoc.setKey(key);
+ newDoc.setFrom("IP/" + vIp);
+ newDoc.setTo("FQDN/" + vFqdn);
+ newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+ newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+ newDoc.addAttribute("COUNT_TOTAL", countTotal);
+ int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
+ HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap());
- HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
- schemaHashMap.put(commonSchemaType, newDoc);
- documentHashMap.put(key, schemaHashMap);
+ HashMap<String, BaseEdgeDocument> schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>());
+ schemaHashMap.put(commonSchemaType, newDoc);
+ documentHashMap.put(key, schemaHashMap);
+ }
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start));
@@ -235,6 +237,7 @@ public class BaseClickhouseData {
}
}
+ @Deprecated
private static String commonSchemaGetFqdn(String commonSchemaType, ResultSet resultSet) {
String vFqdn = "";
try {
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java
index 75f7383..56ae5a2 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java
@@ -10,37 +10,45 @@ public class BaseUpdateEtl {
BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument();
Set<String> schemaSets = newEdgeDocumentSchemaMap.keySet();
+ Map<String, Object> properties = newBaseEdgeDocument.getProperties();
+
for (String schema : schemaSets) {
BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema);
- setSchemaCnt(schema,schemaEdgeDoc,newBaseEdgeDocument);
- if (newBaseEdgeDocument.getKey() != null){
- Map<String, Object> properties = newBaseEdgeDocument.getProperties();
+ if (!properties.isEmpty()){
setFoundTime(properties,schemaEdgeDoc);
setDistinctClientIpBySchema(properties,schemaEdgeDoc);
}else {
- Map<String, Object> properties = schemaEdgeDoc.getProperties();
- properties.remove("COUNT_TOTAL");
newBaseEdgeDocument = schemaEdgeDoc;
+ properties = schemaEdgeDoc.getProperties();
}
+ setSchemaCnt(schema,schemaEdgeDoc,properties);
}
+ properties.remove("COUNT_TOTAL");
+ addSchemaProperty(properties);
+
+ newBaseEdgeDocument.setProperties(properties);
return newBaseEdgeDocument;
}
public static BaseEdgeDocument mergeIp2FqdnBySchema(HashMap<String, BaseEdgeDocument> newEdgeDocumentMap){
BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument();
Set<String> schemaSets = newEdgeDocumentMap.keySet();
+ Map<String, Object> properties = newBaseEdgeDocument.getProperties();
+
for (String schema : schemaSets) {
BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentMap.get(schema);
- setSchemaCnt(schema,schemaEdgeDoc,newBaseEdgeDocument);
- if (newBaseEdgeDocument.getKey() != null){
- Map<String, Object> properties = newBaseEdgeDocument.getProperties();
+ if (!properties.isEmpty()){
setFoundTime(properties,schemaEdgeDoc);
}else {
- Map<String, Object> properties = schemaEdgeDoc.getProperties();
- properties.remove("COUNT_TOTAL");
newBaseEdgeDocument = schemaEdgeDoc;
+ properties = schemaEdgeDoc.getProperties();
}
+ setSchemaCnt(schema,schemaEdgeDoc,properties);
}
+ properties.remove("COUNT_TOTAL");
+ addSchemaProperty(properties);
+
+ newBaseEdgeDocument.setProperties(properties);
return newBaseEdgeDocument;
}
@@ -53,11 +61,21 @@ public class BaseUpdateEtl {
setDistinctClientIpByHistory(newEdgeDocument,edgeDocument);
}
+ private static void addSchemaProperty(Map<String, Object> properties){
+ if (!properties.containsKey("TLS_CNT_TOTAL")){
+ properties.put("TLS_CNT_TOTAL",0L);
+ properties.put("TLS_CNT_RECENT",new long[7]);
+ }else if (!properties.containsKey("HTTP_CNT_TOTAL")){
+ properties.put("HTTP_CNT_TOTAL",0L);
+ properties.put("HTTP_CNT_RECENT",new long[7]);
+ }
+ }
+
private static void setDistinctClientIpByHistory(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){
ArrayList<String> distCipTotal = (ArrayList<String>) edgeDocument.getAttribute("DIST_CIP_TOTAL");
String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]);
- String[] distCipRecentsSrc = (String[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT");
+ Object[] distCipRecentsSrc = (Object[])newEdgeDocument.getAttribute("DIST_CIP_RECENT");
if (distCipTotalsSrc.length == 30) {
Object[] distCipTotals = mergeClientIp(distCipTotalsSrc, distCipRecentsSrc);
edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals);
@@ -88,8 +106,8 @@ public class BaseUpdateEtl {
edgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal);
}
- private static Object[] mergeClientIp(String[] distCipTotalsSrc,String[] distCipRecentsSrc){
- HashSet<String> dIpSet = new HashSet<>();
+ private static Object[] mergeClientIp(Object[] distCipTotalsSrc,Object[] distCipRecentsSrc){
+ HashSet<Object> dIpSet = new HashSet<>();
dIpSet.addAll(Arrays.asList(distCipRecentsSrc));
dIpSet.addAll(Arrays.asList(distCipTotalsSrc));
Object[] distCipTotals = dIpSet.toArray();
@@ -103,7 +121,7 @@ public class BaseUpdateEtl {
String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP_RECENT");
String[] distCipRecents = (String[]) properties.get("DIST_CIP_RECENT");
Object[] mergeClientIp = mergeClientIp(schemaDistCipRecents, distCipRecents);
- properties.put("DIST_CIP_RECENT",mergeClientIp);
+ properties.put("DIST_CIP_RECENT", mergeClientIp);
properties.put("DIST_CIP_TOTAL",mergeClientIp);
}
@@ -116,21 +134,21 @@ public class BaseUpdateEtl {
properties.put("LAST_FOUND_TIME",schemaLastFoundTime>lastFoundTime?schemaLastFoundTime:lastFoundTime);
}
- private static void setSchemaCnt(String schema,BaseEdgeDocument schemaEdgeDoc,BaseEdgeDocument newBaseEdgeDocument){
+ private static void setSchemaCnt(String schema,BaseEdgeDocument schemaEdgeDoc,Map<String, Object> properties){
switch (schema) {
case "HTTP":
long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
- newBaseEdgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCntTotal);
+ properties.put("HTTP_CNT_TOTAL", httpCntTotal);
long[] httpCntRecentsDst = new long[7];
httpCntRecentsDst[0] = httpCntTotal;
- newBaseEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
+ properties.put("HTTP_CNT_RECENT", httpCntRecentsDst);
break;
case "SSL":
long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
- newBaseEdgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCntTotal);
+ properties.put("TLS_CNT_TOTAL", tlsCntTotal);
long[] tlsCntRecentsDst = new long[7];
tlsCntRecentsDst[0] = tlsCntTotal;
- newBaseEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
+ properties.put("TLS_CNT_RECENT", tlsCntRecentsDst);
break;
}
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java
index 58502a8..0efdb72 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java
@@ -50,7 +50,8 @@ public class UpdateEFqdnAddressIp implements Runnable {
LOG.info("更新R_LOCATE_FQDN2IP:" + i);
}
} catch (Exception e) {
- LOG.error(e.getMessage());
+ e.printStackTrace();
+ LOG.error(e.toString());
}
}
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java
index c741667..fa4cc4f 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java
@@ -54,7 +54,7 @@ public class UpdateEIpVisitFqdn implements Runnable {
LOG.info("更新R_VISIT_IP2FQDN:" + i);
}
} catch (Exception e) {
- LOG.error(e.getMessage());
+ LOG.error(e.toString());
}
}
}
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index 4dd415a..753ac9e 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -1,5 +1,5 @@
#arangoDB参数配置
-arangoDB.host=192.168.40.127
+arangoDB.host=192.168.40.182
arangoDB.port=8529
arangoDB.user=root
arangoDB.password=111111
@@ -13,5 +13,5 @@ update.arango.batch=10000
thread.pool.number=10
thread.await.termination.time=10
-read.clickhouse.max.time=1593162456
+read.clickhouse.max.time=1593582211
read.clickhouse.min.time=1592879247 \ No newline at end of file