author    wanglihui <[email protected]>    2021-08-02 18:49:43 +0800
committer wanglihui <[email protected]>    2021-08-02 18:49:43 +0800
commit    2f7cceb8268ceab4936325bdf387fa8558f34307 (patch)
tree      2d7203a9a4eaed8e20968f1914e365d5840e60a2
parent    51d254990205f422a7bb57be136fbfa162169ee5 (diff)
Format code
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java  125
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala             10
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala   13
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala     8
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala       6
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala                 8
6 files changed, 27 insertions, 143 deletions
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
deleted file mode 100644
index 1ca66d7..0000000
--- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package cn.ac.iie.service.read;
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.utils.ArangoDBConnect;
-import com.arangodb.ArangoCursor;
-import com.arangodb.entity.BaseDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-
-/**
- * @author wlh
- * Reads the full ArangoDB history data with multiple threads and stores the documents in a map
- */
-public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
- public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60;
- private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
- static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR();
-
- public static final HashSet<String> PROTOCOL_SET;
-
- static {
- PROTOCOL_SET = new HashSet<>();
- PROTOCOL_SET.add("HTTP");
- PROTOCOL_SET.add("TLS");
- PROTOCOL_SET.add("DNS");
- }
-
- private ArangoDBConnect arangoConnect;
- private String query;
- private ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map;
- private Class<T> type;
- private String table;
- private CountDownLatch countDownLatch;
-
- public ReadHistoryArangoData(ArangoDBConnect arangoConnect,
- String query,
- ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map,
- Class<T> type,
- String table,
- CountDownLatch countDownLatch) {
- this.arangoConnect = arangoConnect;
- this.query = query;
- this.map = map;
- this.type = type;
- this.table = table;
- this.countDownLatch = countDownLatch;
- }
-
- @Override
- public void run() {
- try {
- long s = System.currentTimeMillis();
- ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
- if (docs != null) {
- List<T> baseDocuments = docs.asListRemaining();
- int i = 0;
- for (T doc : baseDocuments) {
- String key = doc.getKey();
- switch (table) {
- case "R_LOCATE_FQDN2IP":
- updateProtocolDocument(doc);
- deleteDistinctClientIpByTime(doc);
- break;
- case "R_VISIT_IP2FQDN":
- updateProtocolDocument(doc);
- break;
- default:
- }
- int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER();
- ConcurrentHashMap<String, T> tmpMap = map.get(hashCode);
- tmpMap.put(key, doc);
- i++;
- }
- long l = System.currentTimeMillis();
-                LOG.warn(query + "\nRead " + i + " records, elapsed time: " + (l - s));
- }
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- countDownLatch.countDown();
-            LOG.warn("This thread finished reading, remaining threads: " + countDownLatch.getCount());
- }
- }
-
- private void updateProtocolDocument(T doc) {
- if (doc.getProperties().containsKey("PROTOCOL_TYPE")) {
- for (String protocol : PROTOCOL_SET) {
- String protocolRecent = protocol + "_CNT_RECENT";
- ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
- Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
- Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
- System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
- cntRecentsDst[0] = 0L;
- doc.addAttribute(protocolRecent, cntRecentsDst);
- }
- }
- }
-
- private void deleteDistinctClientIpByTime(T doc) {
- ArrayList<String> distCip = (ArrayList<String>) doc.getAttribute("DIST_CIP");
- ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
- distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
- Collections.sort(distCipTs);
- int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
- String[] distCipArr = new String[index];
- long[] disCipTsArr = new long[index];
- if (distCip.size() + 1 == distCipTs.size()){
- for (int i = 0; i < index; i++) {
- distCipArr[i] = distCip.get(i);
- disCipTsArr[i] = distCipTs.get(i);
- }
- }
- doc.updateAttribute("DIST_CIP", distCipArr);
- doc.updateAttribute("DIST_CIP_TS", disCipTsArr);
- }
-
-}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index e70ffea..c33ee14 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -33,20 +33,21 @@ object BaseClickhouseData {
}
def getVertexFqdnDf: DataFrame = {
+ val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
val sql =
- """
+ s"""
|(SELECT
| FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
|FROM
| ((SELECT
| ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
| FROM connection_record_log
- | WHERE common_schema_type = 'SSL' GROUP BY ssl_sni
+ | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni
| )UNION ALL
| (SELECT
| http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
| FROM connection_record_log
- | WHERE common_schema_type = 'HTTP' GROUP BY http_host))
+ | WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host))
|GROUP BY FQDN HAVING FQDN != '') as dbtable
""".stripMargin
LOG.warn(sql)
@@ -164,7 +165,8 @@ object BaseClickhouseData {
val sql =
s"""
|(
- |SELECT DISTINCT radius_framed_ip,common_recv_time as LAST_FOUND_TIME FROM radius_record_log WHERE $where
+ |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME FROM radius_record_log WHERE $where
+ |GROUP BY radius_framed_ip
|)as dbtable
""".stripMargin
LOG.warn(sql)
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 309c1a7..b936697 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -18,9 +18,9 @@ object MergeDataFrame {
def mergeVertexFqdn(): RDD[(String, (Option[BaseDocument], Row))] = {
val fqdnRddRow: RDD[(String, Row)] = BaseClickhouseData.getVertexFqdnDf
- .rdd.filter(row => isDomain(row.getAs[String](0))).map(row => {
+ .repartition().rdd.filter(row => isDomain(row.getAs[String](0))).map(row => {
(row.getAs[String]("FQDN"), row)
- }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
val fqdnRddDoc: ArangoRdd[BaseDocument] = BaseArangoData.loadArangoRdd[BaseDocument]("FQDN")
@@ -29,7 +29,7 @@ object MergeDataFrame {
def mergeVertexIp(): RDD[(String, (Option[BaseDocument], Row))] = {
val vertexIpDf = BaseClickhouseData.getVertexIpDf
- val frame = vertexIpDf.groupBy("IP").agg(
+ val frame = vertexIpDf.repartition().groupBy("IP").agg(
min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
@@ -39,14 +39,15 @@ object MergeDataFrame {
)
val ipRddRow = frame.rdd.map(row => {
(row.getAs[String]("IP"), row)
- }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
val ipRddDoc = BaseArangoData.loadArangoRdd[BaseDocument]("IP")
ipRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(ipRddRow)
}
def mergeRelationFqdnLocateIp(): RDD[(String, (Option[BaseEdgeDocument], Row))] = {
- val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
+ val frame = BaseClickhouseData.getRelationFqdnLocateIpDf
+ .repartition().filter(row => isDomain(row.getAs[String]("FQDN")))
.groupBy("FQDN", "common_server_ip")
.agg(
min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
@@ -60,7 +61,7 @@ object MergeDataFrame {
val serverIp = row.getAs[String]("common_server_ip")
val key = fqdn.concat("-" + serverIp)
(key, row)
- }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
val fqdnLocIpRddDoc = BaseArangoData.loadArangoRdd[BaseEdgeDocument]("R_LOCATE_FQDN2IP")
fqdnLocIpRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(fqdnLocIpRddRow)
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index 06d731a..e91ef03 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -2,10 +2,10 @@ package cn.ac.iie.service.update
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConversions._
import cn.ac.iie.config.ApplicationConfig
-import cn.ac.iie.service.read.ReadHistoryArangoData
+import cn.ac.iie.dao.BaseClickhouseData
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import scala.collection.mutable
@@ -132,7 +132,7 @@ object UpdateDocHandler {
def putDistinctIp(doc: BaseEdgeDocument, newDistinctIp: Array[String]): Unit = {
val map = newDistinctIp.map(ip => {
- (ip, ReadHistoryArangoData.currentHour)
+ (ip, BaseClickhouseData.currentHour)
}).toMap
doc.addAttribute("DIST_CIP", map.keys.toArray)
doc.addAttribute("DIST_CIP_TS", map.values.toArray)
@@ -146,7 +146,7 @@ object UpdateDocHandler {
val distCipToTsMap: Map[String, Long] = hisDistCip.zip(hisDistCipTs).toMap
val muDistCipToTsMap: mutable.Map[String, Long] = mutable.Map(distCipToTsMap.toSeq: _*)
newDistinctIp.foreach(cip => {
- muDistCipToTsMap.put(cip, ReadHistoryArangoData.currentHour)
+ muDistCipToTsMap.put(cip, BaseClickhouseData.currentHour)
})
val resultMap = muDistCipToTsMap.toList.sortBy(-_._2).take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toMap
hisDoc.addAttribute("DIST_CIP", resultMap.keys.toArray)
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index dba2b98..febfe4d 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -21,11 +21,11 @@ object UpdateDocument {
try {
updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
- updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
+// updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
- insertFrameIp()
+// insertFrameIp()
- updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
+// updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, mergeRelationFqdnLocateIp)
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
index ab77299..7132c19 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
@@ -1,11 +1,15 @@
package cn.ac.iie.spark.rdd
+import java.util
+
import scala.collection.JavaConverters.asScalaIteratorConverter
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.service.update.UpdateDocument
import cn.ac.iie.spark
import cn.ac.iie.spark.partition.QueryArangoPartition
import com.arangodb.ArangoCursor
+import com.arangodb.model.AqlQueryOptions
+import com.arangodb.util.MapBuilder
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
@@ -38,13 +42,15 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
var arangoCursor:ArangoCursor[T] = null
val arangoDB = spark.createArangoBuilder(options).build()
+ val bindVars: util.Map[String, AnyRef] = new MapBuilder().get
+ val queryOptions: AqlQueryOptions = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL)
try {
val offset = split.offset
val separate = split.separate
val collection = options.collection
val sql = s"FOR doc IN $collection limit $offset,$separate RETURN doc"
LOG.info(sql)
- arangoCursor = arangoDB.db(options.database).query(sql,clazz.runtimeClass.asInstanceOf[Class[T]])
+ arangoCursor = arangoDB.db(options.database).query(sql,bindVars,queryOptions,clazz.runtimeClass.asInstanceOf[Class[T]])
}catch {
      case e: Exception => LOG.error(s"Exception while creating cursor: ${e.getMessage}")
}finally {