author    wanglihui <[email protected]>  2020-09-29 10:26:25 +0800
committer wanglihui <[email protected]>  2020-09-29 10:26:25 +0800
commit    e0f5b20ab6e42ed8da668c8faad6c57466a02a91 (patch)
tree      2fd2de3c2e1884bc9964f9344d467b5b4daba9dd
parent    e7ff669d4c441f14aec93820c7caaed4ae474ebe (diff)
Format code
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java    8
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java                24
-rw-r--r--  ip-learning-spark/src/main/resources/application.properties                          2
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala             68
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala    3
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala     4
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala       4
7 files changed, 47 insertions(+), 66 deletions(-)
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
index 9b1f6c2..2b6fcef 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
* @author wlh
* Multi-threaded full read of arangoDb history data, packed into a map
*/
+@SuppressWarnings("unchecked")
public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60;
private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
@@ -92,7 +93,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
for (String protocol : PROTOCOL_SET) {
String protocolRecent = protocol + "_CNT_RECENT";
ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
- Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
+ Long[] cntRecentsSrc = cntRecent.toArray(new Long[0]);
Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
cntRecentsDst[0] = 0L;
@@ -104,6 +105,11 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
private void deleteDistinctClientIpByTime(T doc) {
ArrayList<String> distCip = (ArrayList<String>) doc.getAttribute("DIST_CIP");
ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
+ if (distCip == null || distCip.isEmpty()){
+ doc.updateAttribute("DIST_CIP", new String[0]);
+ doc.updateAttribute("DIST_CIP_TS", new long[0]);
+ return;
+ }
distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
Collections.sort(distCipTs);
int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
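Aside: both hunks in this file are defensive cleanups. Below is a minimal, self-contained sketch of the hourly-window shift using the `toArray(new Long[0])` idiom the commit switches to; the `RECENT_COUNT_HOUR` value and the class scaffolding are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.List;

public class RecentWindowSketch {
    // Assumed window size; the real constant is defined elsewhere in the class.
    private static final int RECENT_COUNT_HOUR = 24;

    // Shift the per-hour counters one slot to the right: the oldest hour
    // falls off the end and index 0 becomes a zeroed slot for the new hour.
    static Long[] shiftWindow(List<Long> cntRecent) {
        // toArray(new Long[0]) lets the list allocate a correctly sized
        // array itself; on modern JVMs it is at least as fast as pre-sizing.
        Long[] src = cntRecent.toArray(new Long[0]);
        Long[] dst = new Long[RECENT_COUNT_HOUR];
        Arrays.fill(dst, 0L);
        if (src.length > 1) {
            // Same copy as the patch: src[0..n-2] moves to dst[1..n-1].
            System.arraycopy(src, 0, dst, 1, Math.min(src.length - 1, RECENT_COUNT_HOUR - 1));
        }
        return dst;
    }
}
```

The new null/empty guard in `deleteDistinctClientIpByTime` has the same flavor: documents with no `DIST_CIP` history now get empty arrays written back instead of hitting a `NullPointerException` in the sort that follows.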
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index d5fb1b8..f6d3c5f 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -70,26 +70,6 @@ public class ArangoDBConnect {
}
}
- @Deprecated
- public <T> void insertAndUpdate(ArrayList<T> docInsert,ArrayList<T> docUpdate,String collectionName){
- ArangoDatabase database = getDatabase();
- try {
- ArangoCollection collection = database.collection(collectionName);
- if (!docInsert.isEmpty()){
- collection.importDocuments(docInsert);
- }
- if (!docUpdate.isEmpty()){
- collection.replaceDocuments(docUpdate);
- }
- }catch (Exception e){
- System.out.println("更新失败");
- e.printStackTrace();
- }finally {
- docInsert.clear();
- docInsert.clear();
- }
- }
-
public <T> void overwrite(ArrayList<T> docOverwrite,String collectionName){
ArangoDatabase database = getDatabase();
try {
@@ -101,11 +81,11 @@ public class ArangoDBConnect {
MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
for (ErrorEntity errorEntity:errors){
- LOG.warn("arangoDB write error: "+errorEntity.getErrorMessage());
+ LOG.debug("arangoDB write error: "+errorEntity.getErrorMessage());
}
}
}catch (Exception e){
- System.out.println("更新失败:"+e.toString());
+ LOG.error("更新arangoDB失败:"+e.toString());
}finally {
docOverwrite.clear();
}
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 77006b8..9a61792 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -8,11 +8,9 @@ spark.serializer=org.apache.spark.serializer.KryoSerializer
master=local[*]
#Settings for spark reading from clickhouse
spark.read.clickhouse.url=jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3
-#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
spark.read.clickhouse.user=default
spark.read.clickhouse.password=ceiec2019
-#spark.read.clickhouse.password=111111
spark.read.clickhouse.numPartitions=5
spark.read.clickhouse.fetchsize=10000
spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
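A hedged sketch of how settings like these typically feed a partitioned Spark JDBC read; the project's actual loader (`initClickhouseData` in the Scala sources) is not part of this diff, so the method shape and the lower/upper bounds below are illustrative assumptions.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ClickhouseReadSketch {
    // dbtable may be a table name or a parenthesized subquery with an alias,
    // like the "(SELECT ...) as dbtable" string built in BaseClickhouseData.
    public static Dataset<Row> read(SparkSession spark, String dbtable) {
        return spark.read()
                .format("jdbc")
                .option("url", "jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3")
                .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
                .option("user", "default")
                .option("password", "ceiec2019")
                .option("dbtable", dbtable)
                // These three split the read into parallel range scans on
                // LAST_FOUND_TIME. partitionColumn also requires lowerBound
                // and upperBound; the values here are placeholders, since in
                // the real job they would come from the time window.
                .option("partitionColumn", "LAST_FOUND_TIME")
                .option("numPartitions", "5")
                .option("fetchsize", "10000")
                .option("lowerBound", "0")
                .option("upperBound", "4102444800")
                .load();
    }
}
```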
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index e37b959..4d66deb 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -107,45 +107,35 @@ object BaseClickhouseData {
def getVertexIpDf: DataFrame ={
loadConnectionDataFromCk()
+ val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
val sql =
- """
- |SELECT
- | *
- |FROM
- | (
- | (
- | SELECT
- | common_client_ip AS IP,
- | MIN(common_recv_time) AS FIRST_FOUND_TIME,
- | MAX(common_recv_time) AS LAST_FOUND_TIME,
- | count(*) as SESSION_COUNT,
- | sum(common_c2s_byte_num) as BYTES_SUM,
- | 'client' as ip_type
- | FROM
- | global_temp.dbtable
- | GROUP BY
- | IP
- | )
- | UNION ALL
- | (
- | SELECT
- | common_server_ip AS IP,
- | MIN(common_recv_time) AS FIRST_FOUND_TIME,
- | MAX(common_recv_time) AS LAST_FOUND_TIME,
- | count(*) as SESSION_COUNT,
- | sum(common_s2c_byte_num) as BYTES_SUM,
- | 'server' as ip_type
- | FROM
- | global_temp.dbtable
- | GROUP BY
- | IP
- | )
- | )
+ s"""
+ |(SELECT * FROM
+ |((SELECT common_client_ip AS IP,MIN(common_end_time) AS FIRST_FOUND_TIME,
+ |MAX(common_end_time) AS LAST_FOUND_TIME,
+ |count(*) as SESSION_COUNT,
+ |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+ |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info,
+ |'client' as ip_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |where $where
+ |group by common_client_ip)
+ |UNION ALL
+ |(SELECT common_server_ip AS IP,
+ |MIN(common_end_time) AS FIRST_FOUND_TIME,
+ |MAX(common_end_time) AS LAST_FOUND_TIME,
+ |count(*) as SESSION_COUNT,
+ |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+ |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info,
+ |'server' as ip_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |where $where
+ |group by common_server_ip))) as dbtable
""".stripMargin
LOG.warn(sql)
- val vertexIpDf = spark.sql(sql)
- vertexIpDf.printSchema()
- vertexIpDf
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
}
/*
@@ -196,16 +186,16 @@ object BaseClickhouseData {
*/
def getRelationFqdnLocateIpDf: DataFrame ={
- val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1
+ val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
val sql =
s"""
|(SELECT * FROM
- |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
|toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
|FROM tsg_galaxy_v3.connection_record_log
|WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
|UNION ALL
- |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
|toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
|FROM tsg_galaxy_v3.connection_record_log
|WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 7ae7f30..0a55ed8 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -38,7 +38,8 @@ object MergeDataFrame {
max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
- collect_list("ip_type").alias("ip_type_list")
+ collect_list("ip_type").alias("ip_type_list"),
+ last("common_link_info").alias("common_link_info")
)
val values = frame.rdd.map(row => (row.get(0), row))
.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
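One caveat worth noting on the new aggregate: Spark's `last` is non-deterministic unless the rows in each group have a guaranteed order, so this keeps an arbitrary `common_link_info` per IP. A minimal sketch of the aggregation shape (column names taken from the diff; written in Java for consistency with the other sketches here):

```java
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MergeSketch {
    public static Dataset<Row> merge(Dataset<Row> frame) {
        return frame.groupBy(col("IP")).agg(
                max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
                collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
                collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
                collect_list("ip_type").alias("ip_type_list"),
                // last() keeps whichever value Spark happens to see last in
                // the group; a deterministic pick would need a window with
                // an explicit ordering.
                last("common_link_info").alias("common_link_info"));
    }
}
```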
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index ddaf145..f56f6f2 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -26,6 +26,10 @@ object UpdateDocHandler {
hisDoc.addAttribute(attributeName,newAttribute+hisAttritube)
}
+ def replaceAttribute(hisDoc: BaseDocument,newAttribute:String,attributeName:String): Unit ={
+ hisDoc.addAttribute(attributeName,newAttribute)
+ }
+
def separateAttributeByIpType(ipTypeList:ofRef[String],
sessionCountList:ofRef[AnyRef],
bytesSumList:ofRef[AnyRef]): (Long,Long,Long,Long) ={
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index b3719fb..1c8dd91 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -174,6 +174,7 @@ object UpdateDocument {
val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
+ val linkInfo = row.getAs[String]("common_link_info")
val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
var document = dictionaryMap.getOrDefault(ip, null)
@@ -183,6 +184,7 @@ object UpdateDocument {
updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
+ replaceAttribute(document,linkInfo,"COMMON_LINK_INFO")
} else {
document = new BaseDocument
document.setKey(ip)
@@ -193,7 +195,7 @@ object UpdateDocument {
document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
- document.addAttribute("COMMON_LINK_INFO", "")
+ document.addAttribute("COMMON_LINK_INFO", linkInfo)
}
document
}
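Taken together, these hunks thread the link info through an update-or-create flow. A condensed, hedged sketch of that pattern (attribute and map names from the diff; the surrounding class and counter handling are simplified assumptions):

```java
import com.arangodb.entity.BaseDocument;
import java.util.Map;

public class UpsertSketch {
    // Existing vertices accumulate counters and have COMMON_LINK_INFO
    // replaced; unseen IPs get a fresh document with the link info set.
    static BaseDocument upsert(Map<String, BaseDocument> dictionaryMap,
                               String ip, long serverSessionCount, String linkInfo) {
        BaseDocument document = dictionaryMap.getOrDefault(ip, null);
        if (document != null) {
            Object previous = document.getAttribute("SERVER_SESSION_COUNT");
            long sum = (previous == null ? 0L : ((Number) previous).longValue())
                    + serverSessionCount;
            document.updateAttribute("SERVER_SESSION_COUNT", sum);
            // replaceAttribute in the diff is just addAttribute with the
            // new value, overwriting what was there.
            document.addAttribute("COMMON_LINK_INFO", linkInfo);
        } else {
            document = new BaseDocument();
            document.setKey(ip);
            document.addAttribute("SERVER_SESSION_COUNT", serverSessionCount);
            document.addAttribute("COMMON_LINK_INFO", linkInfo);
        }
        return document;
    }
}
```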