author    wanglihui <[email protected]>  2020-09-17 19:12:35 +0800
committer wanglihui <[email protected]>  2020-09-17 19:12:35 +0800
commit    4ed79bfe79c40032b9049e7202f3965e16acb356 (patch)
tree      3b3288346a8493a14b9fa786be12ba960af7d55c
parent    0d5e4e9be21d6f5a90ef152352c140f5329380ab (diff)
tsg kz version
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java                  66
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java   7
-rw-r--r--  ip-learning-spark/src/main/resources/application.properties                        28
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala            7
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala             91
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala   59
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala     9
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala      86
-rw-r--r--  ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala  70
9 files changed, 281 insertions(+), 142 deletions(-)
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index 6f2e146..1e67c11 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -6,7 +6,6 @@ import cn.ac.iie.utils.ArangoDBConnect;
import cn.ac.iie.utils.ExecutorThreadPool;
import com.arangodb.ArangoCursor;
import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,22 +19,12 @@ import java.util.concurrent.CountDownLatch;
*/
public class BaseArangoData {
private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
-
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
-
private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
- public <T extends BaseDocument> void readHistoryData(String table,
- ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
- Class<T> type) {
+ public <T extends BaseDocument> ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> readHistoryData(String table, Class<T> type) {
+ ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap = new ConcurrentHashMap<>();
try {
LOG.warn("开始更新" + table);
long start = System.currentTimeMillis();
@@ -43,10 +32,8 @@ public class BaseArangoData {
historyMap.put(i, new ConcurrentHashMap<>());
}
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER());
-// long[] timeRange = getTimeRange(table);
Long countTotal = getCountTotal(table);
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER(); i++) {
-// String sql = getQuerySql(timeRange, i, table);
String sql = getQuerySql(countTotal, i, table);
ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
threadPool.executor(readHistoryArangoData);
@@ -57,6 +44,7 @@ public class BaseArangoData {
} catch (Exception e) {
e.printStackTrace();
}
+ return historyMap;
}
private Long getCountTotal(String table){
@@ -72,58 +60,18 @@ public class BaseArangoData {
LOG.error(sql +"执行异常");
}
long last = System.currentTimeMillis();
- LOG.info(sql+" 结果:"+cnt+" 执行时间:"+(last-start));
+ LOG.warn(sql+" 结果:"+cnt+" 执行时间:"+(last-start));
return cnt;
}
private String getQuerySql(Long cnt,int threadNumber, String table){
long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER() + 1;
long offsetNum = threadNumber * sepNum;
- return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
- }
-
-
- private long[] getTimeRange(String table) {
- long minTime = 0L;
- long maxTime = 0L;
- long startTime = System.currentTimeMillis();
- String sql = "LET doc = (FOR doc IN " + table + " RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
- switch (ApplicationConfig.ARANGO_TIME_LIMIT_TYPE()) {
- case 0:
- ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
- try {
- if (timeDoc != null) {
- while (timeDoc.hasNext()) {
- BaseDocument doc = timeDoc.next();
- maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER();
- minTime = Long.parseLong(doc.getAttribute("min_time").toString());
- }
- } else {
- LOG.warn("获取ArangoDb时间范围为空");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- break;
- case 1:
- maxTime = ApplicationConfig.READ_ARANGO_MAX_TIME();
- minTime = ApplicationConfig.READ_ARANGO_MIN_TIME();
- break;
- default:
+ if (sepNum >= ApplicationConfig.ARANGODB_READ_LIMIT() * 10000){
+ sepNum = ApplicationConfig.ARANGODB_READ_LIMIT() * 10000;
}
- long lastTime = System.currentTimeMillis();
- LOG.warn(sql + "\n查询最大最小时间用时:" + (lastTime - startTime));
- return new long[]{minTime, maxTime};
-
+ return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
}
- private String getQuerySql(long[] timeRange, int threadNumber, String table) {
- long minTime = timeRange[0];
- long maxTime = timeRange[1];
- long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER();
- long maxThreadTime = minTime + (threadNumber + 1) * diffTime;
- long minThreadTime = minTime + threadNumber * diffTime;
- return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT() + " RETURN doc";
- }
}
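
Editor's note: for orientation, a minimal sketch (not part of the commit) of the paging scheme the reworked readHistoryData/getQuerySql pair implements: the collection count is split into one LIMIT/OFFSET slice per worker thread, and a single slice is capped at ARANGODB_READ_LIMIT * 10000 rows. The object name and the threadPoolNumber/readLimit parameters are stand-ins for the ApplicationConfig values.

object PagedAqlSketch {
  // Build one AQL slice per worker thread, mirroring getCountTotal + getQuerySql.
  def querySlices(table: String, countTotal: Long, threadPoolNumber: Int, readLimit: Long): Seq[String] = {
    val sepNum = countTotal / threadPoolNumber + 1          // rows per thread, rounded up
    val capped = math.min(sepNum, readLimit * 10000)        // cap a single slice, as in the hunk above
    (0 until threadPoolNumber).map { i =>
      val offset = i.toLong * sepNum                        // offsets still use the uncapped size
      s"FOR doc IN $table LIMIT $offset, $capped RETURN doc"
    }
  }

  def main(args: Array[String]): Unit =
    querySlices("R_LOCATE_FQDN2IP", countTotal = 2500000L, threadPoolNumber = 10, readLimit = 10L)
      .foreach(println)
}

As in the hunk, the offset is computed from the uncapped slice size, so the cap only shortens each slice rather than shifting the later ones.
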
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
index 1ca66d7..9b1f6c2 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -22,9 +22,9 @@ import java.util.concurrent.CountDownLatch;
public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60;
private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
- static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR();
+ private static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR();
- public static final HashSet<String> PROTOCOL_SET;
+ private static final HashSet<String> PROTOCOL_SET;
static {
PROTOCOL_SET = new HashSet<>();
@@ -69,9 +69,6 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
updateProtocolDocument(doc);
deleteDistinctClientIpByTime(doc);
break;
- case "R_VISIT_IP2FQDN":
- updateProtocolDocument(doc);
- break;
default:
}
int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER();
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 0010b23..77006b8 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -1,5 +1,5 @@
#spark任务配置
-spark.sql.shuffle.partitions=5
+spark.sql.shuffle.partitions=10
spark.executor.memory=4g
spark.app.name=test
spark.network.timeout=300s
@@ -7,13 +7,15 @@ repartitionNumber=36
spark.serializer=org.apache.spark.serializer.KryoSerializer
master=local[*]
#spark读取clickhouse配置
-spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+spark.read.clickhouse.url=jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3
+#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
spark.read.clickhouse.user=default
-spark.read.clickhouse.password=111111
-spark.read.clickhouse.numPartitions=144
+spark.read.clickhouse.password=ceiec2019
+#spark.read.clickhouse.password=111111
+spark.read.clickhouse.numPartitions=5
spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_recv_time
+spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
clickhouse.socket.timeout=300000
#arangoDB配置
arangoDB.host=192.168.40.182
@@ -22,24 +24,20 @@ arangoDB.user=upsert
arangoDB.password=ceiec2018
#arangoDB.DB.name=insert_iplearn_index
arangoDB.DB.name=ip-learning-test-0
+#arangoDB.DB.name=ip-learning-test
arangoDB.ttl=3600
-thread.pool.number=5
+thread.pool.number=10
#读取clickhouse时间范围方式,0:读取过去一小时;1:指定时间范围
clickhouse.time.limit.type=0
-read.clickhouse.max.time=1571245220
-read.clickhouse.min.time=1571245210
+read.clickhouse.max.time=1600246160
+read.clickhouse.min.time=1597197469
-#读取arangoDB时间范围方式,0:正常读;1:指定时间范围
-arango.time.limit.type=0
-read.arango.max.time=1571245320
-read.arango.min.time=1571245200
-
-arangoDB.read.limit=
+arangoDB.read.sepNum=10
update.arango.batch=10000
distinct.client.ip.num=10000
recent.count.hour=24
-update.interval=10800
+update.interval=3600
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
index 395ea6b..da926ff 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
@@ -36,12 +36,7 @@ object ApplicationConfig {
val READ_CLICKHOUSE_MAX_TIME: Long = config.getLong("read.clickhouse.max.time")
val READ_CLICKHOUSE_MIN_TIME: Long = config.getLong("read.clickhouse.min.time")
- val ARANGO_TIME_LIMIT_TYPE: Int = config.getInt("arango.time.limit.type")
-
- val READ_ARANGO_MAX_TIME: Long = config.getLong("read.arango.max.time")
- val READ_ARANGO_MIN_TIME: Long = config.getLong("read.arango.min.time")
-
- val ARANGODB_READ_LIMIT: String = config.getString("arangoDB.read.limit")
+ val ARANGODB_READ_LIMIT: Long = config.getLong("arangoDB.read.sepNum")
val UPDATE_ARANGO_BATCH: Int = config.getInt("update.arango.batch")
val RECENT_COUNT_HOUR: Int = config.getInt("recent.count.hour")
val DISTINCT_CLIENT_IP_NUM: Int = config.getInt("distinct.client.ip.num")
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 48bbd9a..e37b959 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -11,7 +11,7 @@ object BaseClickhouseData {
val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
private val timeLimit: (Long, Long) = getTimeLimit
- private def initClickhouseData(sql:String): Unit ={
+ private def initClickhouseData(sql:String): DataFrame ={
val dataFrame: DataFrame = spark.read.format("jdbc")
.option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
@@ -28,6 +28,8 @@ object BaseClickhouseData {
.load()
dataFrame.printSchema()
dataFrame.createOrReplaceGlobalTempView("dbtable")
+
+ dataFrame
}
def loadConnectionDataFromCk(): Unit ={
@@ -146,6 +148,7 @@ object BaseClickhouseData {
vertexIpDf
}
+ /*
def getRelationFqdnLocateIpDf: DataFrame ={
loadConnectionDataFromCk()
val sslSql =
@@ -190,6 +193,92 @@ object BaseClickhouseData {
relationFqdnLocateIpDf.printSchema()
relationFqdnLocateIpDf
}
+ */
+
+ def getRelationFqdnLocateIpDf: DataFrame ={
+ val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1
+ val sql =
+ s"""
+ |(SELECT * FROM
+ |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
+ |UNION ALL
+ |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
+ |WHERE FQDN != '') as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getRelationSubidLocateIpDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME
+ |FROM radius_record_log
+ |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip
+ |) as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexSubidDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log
+ |WHERE $where GROUP BY common_subscriber_id
+ |)as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexFramedIpDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT DISTINCT radius_framed_ip,common_recv_time as LAST_FOUND_TIME FROM radius_record_log WHERE $where
+ |)as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
private def getTimeLimit: (Long,Long) ={
var maxTime = 0L
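
Editor's note: the four new loaders added above all follow the same pattern as initClickhouseData: aggregate inside ClickHouse and hand Spark a parenthesised subquery through the JDBC dbtable option, so only the grouped rows cross the wire. A self-contained sketch of that pattern; the host, credentials and the exact query here are placeholders, not the project's configuration.

import org.apache.spark.sql.{DataFrame, SparkSession}

object ClickhouseSubquerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("clickhouse-subquery-sketch")
      .master("local[*]")
      .getOrCreate()

    // The whole GROUP BY is pushed down to ClickHouse because the JDBC
    // "dbtable" option accepts a parenthesised subquery with an alias.
    val sql =
      s"""
         |(
         |SELECT common_subscriber_id,
         |       radius_framed_ip,
         |       MAX(common_recv_time) AS LAST_FOUND_TIME,
         |       MIN(common_recv_time) AS FIRST_FOUND_TIME
         |FROM radius_record_log
         |WHERE common_subscriber_id != '' AND radius_framed_ip != ''
         |GROUP BY common_subscriber_id, radius_framed_ip
         |) AS dbtable
       """.stripMargin

    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:clickhouse://127.0.0.1:8123/tsg_galaxy_v3")  // placeholder host
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .option("user", "default")
      .option("password", "")                                           // placeholder
      .option("dbtable", sql)
      .option("fetchsize", "10000")
      .load()

    df.printSchema()
    spark.stop()
  }
}
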
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 460caed..7ae7f30 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -20,6 +20,17 @@ object MergeDataFrame {
.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
}
+ def mergeVertexFrameIp: RDD[Row] ={
+ val values = BaseClickhouseData.getVertexFramedIpDf
+ .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .rdd.map(row => {
+ val ip = row.getAs[String]("radius_framed_ip")
+ (ip, row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+ LOG.warn(s"读取R_LOCATE_SUBSCRIBER2IP clickhouse成功,共:${values.count()} 条")
+ values
+ }
+
def mergeVertexIp(): RDD[Row]={
val vertexIpDf = BaseClickhouseData.getVertexIpDf
val frame = vertexIpDf.groupBy("IP").agg(
@@ -35,7 +46,9 @@ object MergeDataFrame {
}
def mergeRelationFqdnLocateIp(): RDD[Row] ={
- val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
+ val frame = BaseClickhouseData.getRelationFqdnLocateIpDf
+ .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .filter(row => isDomain(row.getAs[String]("FQDN")))
.groupBy("FQDN", "common_server_ip")
.agg(
min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
@@ -44,28 +57,50 @@ object MergeDataFrame {
collect_list("schema_type").alias("schema_type_list"),
collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
)
- frame.rdd.map(row => {
+ val values = frame.rdd.map(row => {
val fqdn = row.getAs[String]("FQDN")
val serverIp = row.getAs[String]("common_server_ip")
- val key = fqdn.concat("-"+serverIp)
- (key,row)
+ val key = fqdn.concat("-" + serverIp)
+ (key, row)
}).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+ LOG.warn(s"读取R_LOCATE_FQDN2IP clickhouse成功,共:${values.count()} 条")
+ values
}
+ def mergeRelationSubidLocateIp(): RDD[Row] ={
+ val values = BaseClickhouseData.getRelationSubidLocateIpDf
+ .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .rdd.map(row => {
+ val commonSubscriberId = row.getAs[String]("common_subscriber_id")
+ val ip = row.getAs[String]("radius_framed_ip")
+ val key = commonSubscriberId.concat("-" + ip)
+ (key, row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+ LOG.warn(s"读取R_LOCATE_SUBSCRIBER2IP clickhouse成功,共:${values.count()} 条")
+ values
+ }
+
+ def mergeVertexSubid(): RDD[Row] ={
+ val values = BaseClickhouseData.getVertexSubidDf
+ .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .rdd.map(row => {
+ val commonSubscriberId = row.getAs[String]("common_subscriber_id")
+ (commonSubscriberId, row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+ LOG.warn(s"读取SUBSCRIBER clickhouse成功,共:${values.count()} 条")
+ values
+ }
+
private def isDomain(fqdn: String): Boolean = {
try {
if (fqdn == null || fqdn.length == 0) {
return false
}
- if (fqdn.contains(":")) {
- val s = fqdn.split(":")(0)
- if (s.contains(":")){
- return false
- }
- }
- val fqdnArr = fqdn.split("\\.")
- if (fqdnArr.length < 4 || fqdnArr.length > 4){
+
+ val fqdnArr = fqdn.split(":")(0).split("\\.")
+
+ if (fqdnArr.length != 4){
return true
}
for (f <- fqdnArr) {
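
Editor's note: the reworked isDomain drops the earlier colon handling and simply treats anything that is not a dotted IPv4 literal as a domain. A standalone sketch of the resulting check; the tail of the loop is outside this hunk, so the all-numeric-labels test below is an assumption about what it does.

object IsDomainSketch {
  def isDomain(fqdn: String): Boolean = {
    if (fqdn == null || fqdn.isEmpty) return false
    val labels = fqdn.split(":")(0).split("\\.")  // drop an optional ":port", then split on dots
    if (labels.length != 4) return true           // anything but 4 labels cannot be a dotted IPv4
    // Assumed continuation of the loop: 4 all-numeric labels => an IP literal, not a domain.
    !labels.forall(l => l.nonEmpty && l.forall(_.isDigit))
  }

  def main(args: Array[String]): Unit = {
    println(isDomain("www.example.com:443"))  // true
    println(isDomain("10.0.0.1"))             // false
    println(isDomain("10.0.0.1:8080"))        // false
  }
}
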
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index bdf8120..ddaf145 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -93,8 +93,13 @@ object UpdateDocHandler {
doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
}
- def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
- distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
+ def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
+ distCipRecent.flatMap(str => {
+ str.replaceAll("\\[","")
+ .replaceAll("\\]","")
+ .replaceAll("\\'","")
+ .split(",")
+ }).distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
}
def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
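
Editor's note: mergeDistinctIp now receives the raw strings produced by ClickHouse's toString(groupUniqArray(...)) (e.g. "['1.1.1.1','2.2.2.2']") instead of nested arrays, so it strips the brackets and quotes, splits on commas, and deduplicates. A standalone sketch of that parsing, with maxIps standing in for ApplicationConfig.DISTINCT_CLIENT_IP_NUM.

object MergeDistinctIpSketch {
  // Flatten ClickHouse array-literal strings into a bounded, deduplicated IP list.
  def mergeDistinctIp(distCipRecent: Seq[String], maxIps: Int): Array[String] =
    distCipRecent
      .flatMap(_.replaceAll("\\[", "")
                 .replaceAll("\\]", "")
                 .replaceAll("'", "")
                 .split(","))
      .map(_.trim)
      .filter(_.nonEmpty)
      .distinct
      .take(maxIps)
      .toArray

  def main(args: Array[String]): Unit =
    mergeDistinctIp(Seq("['1.1.1.1','2.2.2.2']", "['2.2.2.2','3.3.3.3']"), maxIps = 10000)
      .foreach(println)
}
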
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index b7d4875..b3719fb 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -5,7 +5,6 @@ import java.util.concurrent.ConcurrentHashMap
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
import cn.ac.iie.service.transform.MergeDataFrame._
import cn.ac.iie.service.update.UpdateDocHandler._
import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil}
@@ -26,9 +25,12 @@ object UpdateDocument {
def update(): Unit = {
try {
- updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
- updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
- updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+// updateDocument("FQDN", getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
+// updateDocument("IP", getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
+ updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+ updateDocument("SUBSCRIBER",getVertexSubidRow,classOf[BaseDocument],mergeVertexSubid)
+ insertFrameIp()
+ updateDocument("R_LOCATE_SUBSCRIBER2IP",getRelationSubidLocateIpRow,classOf[BaseEdgeDocument],mergeRelationSubidLocateIp)
} catch {
case e: Exception => e.printStackTrace()
} finally {
@@ -38,13 +40,33 @@ object UpdateDocument {
}
}
+ private def insertFrameIp(): Unit ={
+ mergeVertexFrameIp.foreachPartition(iter => {
+ val resultDocumentList = new util.ArrayList[BaseDocument]
+ var i = 0
+ iter.foreach(row => {
+ val document = getVertexFrameipRow(row)
+ resultDocumentList.add(document)
+ i += 1
+ if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
+ arangoManger.overwrite(resultDocumentList, "IP")
+ LOG.warn(s"更新:IP" + i)
+ i = 0
+ }
+ })
+ if (i != 0) {
+ arangoManger.overwrite(resultDocumentList, "IP")
+ LOG.warn(s"更新IP:" + i)
+ }
+ })
+ }
+
private def updateDocument[T <: BaseDocument](collName: String,
- historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T,
clazz: Class[T],
getNewDataRdd: () => RDD[Row]
): Unit = {
- baseArangoData.readHistoryData(collName, historyMap, clazz)
+ val historyMap = baseArangoData.readHistoryData(collName, clazz)
val hisBc = spark.sparkContext.broadcast(historyMap)
try {
val start = System.currentTimeMillis()
@@ -95,6 +117,56 @@ object UpdateDocument {
document
}
+ private def getRelationSubidLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument ={
+ val subId = row.getAs[String]("common_subscriber_id")
+ val ip = row.getAs[String]("radius_framed_ip")
+ val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+ val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+
+ val key = subId.concat("-"+ip)
+ var document = dictionaryMap.getOrDefault(key,null)
+ if (document != null){
+ updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME")
+ } else {
+ document = new BaseEdgeDocument()
+ document.setKey(key)
+ document.setFrom("SUBSCRIBER/" + subId)
+ document.setTo("IP/" + ip)
+ document.addAttribute("SUBSCRIBER",subId)
+ document.addAttribute("IP",ip)
+ document.addAttribute("FIRST_FOUND_TIME",firstFoundTime)
+ document.addAttribute("LAST_FOUND_TIME",lastFoundTime)
+ }
+
+ document
+ }
+
+ private def getVertexFrameipRow(row: Row): BaseDocument ={
+ val ip = row.getAs[String]("radius_framed_ip")
+ val document = new BaseDocument()
+ document.setKey(ip)
+ document.addAttribute("IP",ip)
+ document
+ }
+
+ private def getVertexSubidRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument ={
+ val subId = row.getAs[String]("common_subscriber_id")
+ val subLastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+ val subFirstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+ var document = dictionaryMap.getOrDefault(subId,null)
+ if (document != null){
+ updateMaxAttribute(document,subLastFoundTime,"LAST_FOUND_TIME")
+ } else {
+ document = new BaseDocument()
+ document.setKey(subId)
+ document.addAttribute("SUBSCRIBER",subId)
+ document.addAttribute("FIRST_FOUND_TIME",subFirstFoundTime)
+ document.addAttribute("LAST_FOUND_TIME",subLastFoundTime)
+ }
+
+ document
+ }
+
private def getVertexIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
val ip = row.getAs[String]("IP")
val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
@@ -133,7 +205,7 @@ object UpdateDocument {
val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
- val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+ val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT")
val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
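
Editor's note: insertFrameIp writes the framed-IP vertices in batches of update.arango.batch per partition. A generic sketch of that batching loop; flush stands in for the project's arangoManger.overwrite(list, "IP") call, and the document fields mirror getVertexFrameipRow.

import java.util
import com.arangodb.entity.BaseDocument

object BatchOverwriteSketch {
  // Buffer documents and flush every `batchSize`, plus a final flush for the remainder.
  def writeInBatches(ips: Iterator[String], batchSize: Int)(flush: util.ArrayList[BaseDocument] => Unit): Unit = {
    val buffer = new util.ArrayList[BaseDocument]()
    var i = 0
    ips.foreach { ip =>
      val doc = new BaseDocument()
      doc.setKey(ip)
      doc.addAttribute("IP", ip)
      buffer.add(doc)
      i += 1
      if (i >= batchSize) {
        flush(buffer)
        buffer.clear()
        i = 0
      }
    }
    if (i != 0) flush(buffer)
  }

  def main(args: Array[String]): Unit =
    writeInBatches(Iterator("10.0.0.1", "10.0.0.2", "10.0.0.3"), batchSize = 2) { batch =>
      println(s"would overwrite ${batch.size()} IP vertices")
    }
}

Unlike this sketch, the hunk above reuses resultDocumentList without clearing it between flushes, so each flush resends the earlier documents as well; with an overwrite call that is redundant rather than incorrect, but it does grow the payload per batch.
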
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
index 67590ff..76bf35f 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
@@ -1,35 +1,35 @@
-package cn.ac.iie.service.update
-
-import java.util
-import java.util.ArrayList
-import java.util.concurrent.ConcurrentHashMap
-
-import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
-import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-
-import scala.collection.mutable.WrappedArray.ofRef
-
-object UpdateDocumentTest {
- def main(args: Array[String]): Unit = {
- val baseArangoData = new BaseArangoData()
- baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
-
- val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys()
- while (value.hasMoreElements) {
- val integer: Integer = value.nextElement()
- val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer)
- val unit = map.keys()
- while (unit.hasMoreElements) {
- val key = unit.nextElement()
- val edgeDocument = map.get(key)
- // val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]]
- // val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]]
- val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]]
- val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]]
- println(longs.toString + "---" + strings.toString)
- }
- }
- }
-
-}
+//package cn.ac.iie.service.update
+//
+//import java.util
+//import java.util.ArrayList
+//import java.util.concurrent.ConcurrentHashMap
+//
+//import cn.ac.iie.dao.BaseArangoData
+//import cn.ac.iie.dao.BaseArangoData._
+//import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
+//
+//import scala.collection.mutable.WrappedArray.ofRef
+//
+//object UpdateDocumentTest {
+// def main(args: Array[String]): Unit = {
+// val baseArangoData = new BaseArangoData()
+// baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
+//
+// val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys()
+// while (value.hasMoreElements) {
+// val integer: Integer = value.nextElement()
+// val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer)
+// val unit = map.keys()
+// while (unit.hasMoreElements) {
+// val key = unit.nextElement()
+// val edgeDocument = map.get(key)
+// // val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]]
+// // val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]]
+// val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]]
+// val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]]
+// println(longs.toString + "---" + strings.toString)
+// }
+// }
+// }
+//
+//}