Diffstat (limited to 'ip-learning-spark')
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java                   |  53
-rw-r--r--  ip-learning-spark/src/main/resources/application.properties                         |  32
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala           |   7
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala             | 183
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala   | 109
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala    |  99
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala      | 340
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/spark/ArangoSpark.scala                  |   9
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala                |   2
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/WriteOptions.scala             |   5
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala             |  21
-rw-r--r--  ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala         |   2
-rw-r--r--  ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala  |  35
-rw-r--r--  ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala                      |  46
14 files changed, 632 insertions(+), 311 deletions(-)
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index 99b55ed..0e03d2e 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -6,7 +6,6 @@ import cn.ac.iie.utils.ArangoDBConnect;
import cn.ac.iie.utils.ExecutorThreadPool;
import com.arangodb.ArangoCursor;
import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,15 +19,6 @@ import java.util.concurrent.CountDownLatch;
*/
public class BaseArangoData {
private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
-
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
- public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
-
private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
@@ -82,47 +72,4 @@ public class BaseArangoData {
return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
}
- private long[] getTimeRange(String table) {
- long minTime = 0L;
- long maxTime = 0L;
- long startTime = System.currentTimeMillis();
- String sql = "LET doc = (FOR doc IN " + table + " RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
- switch (ApplicationConfig.ARANGO_TIME_LIMIT_TYPE()) {
- case 0:
- ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
- try {
- if (timeDoc != null) {
- while (timeDoc.hasNext()) {
- BaseDocument doc = timeDoc.next();
- maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER();
- minTime = Long.parseLong(doc.getAttribute("min_time").toString());
- }
- } else {
- LOG.warn("ArangoDB time-range query returned no documents");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- break;
- case 1:
- maxTime = ApplicationConfig.READ_ARANGO_MAX_TIME();
- minTime = ApplicationConfig.READ_ARANGO_MIN_TIME();
- break;
- default:
- }
- long lastTime = System.currentTimeMillis();
- LOG.warn(sql + "\nTime spent querying max/min time: " + (lastTime - startTime));
- return new long[]{minTime, maxTime};
-
- }
-
- private String getQuerySql(long[] timeRange, int threadNumber, String table) {
- long minTime = timeRange[0];
- long maxTime = timeRange[1];
- long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER();
- long maxThreadTime = minTime + (threadNumber + 1) * diffTime;
- long minThreadTime = minTime + threadNumber * diffTime;
- return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT() + " RETURN doc";
- }
-
}
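
With the per-thread time-range splitting deleted above, BaseArangoData now pages through a collection with plain offset/limit AQL (the query builder kept in this file). Below is a minimal sketch of that pagination scheme in Scala; the totalCount and partitions inputs are hypothetical, only the query shape comes from the remaining Java helper.

    // Sketch: one offset/limit AQL query per slice, mirroring
    // "FOR doc IN <table> limit <offset>,<sep> RETURN doc" kept in BaseArangoData.
    object AqlPaginationSketch {
      def queries(table: String, totalCount: Long, partitions: Int): Seq[String] = {
        val sepNum = totalCount / partitions + 1        // documents per slice
        (0 until partitions).map { idx =>
          val offsetNum = idx * sepNum                  // starting offset of this slice
          s"FOR doc IN $table limit $offsetNum,$sepNum RETURN doc"
        }
      }

      def main(args: Array[String]): Unit =
        queries("FQDN", totalCount = 100000L, partitions = 4).foreach(println)
    }
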
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index c2e81ea..9b3cee4 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -1,45 +1,39 @@
#Spark job configuration
-spark.sql.shuffle.partitions=5
+spark.sql.shuffle.partitions=10
spark.executor.memory=4g
spark.app.name=test
spark.network.timeout=300s
-repartitionNumber=36
spark.serializer=org.apache.spark.serializer.KryoSerializer
master=local[*]
#Spark ClickHouse read configuration
-spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+spark.read.clickhouse.url=jdbc:clickhouse://192.168.44.67:8123/tsg_galaxy_v3
spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
spark.read.clickhouse.user=default
-spark.read.clickhouse.password=111111
-spark.read.clickhouse.numPartitions=144
+spark.read.clickhouse.password=ceiec2019
+spark.read.clickhouse.numPartitions=5
spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_recv_time
+spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
clickhouse.socket.timeout=300000
#ArangoDB configuration
arangoDB.host=192.168.40.182
arangoDB.port=8529
arangoDB.user=upsert
arangoDB.password=ceiec2018
-#arangoDB.DB.name=insert_iplearn_index
-arangoDB.DB.name=iplearn_media_domain
+arangoDB.DB.name=ip-learning-test-0
+#arangoDB.DB.name=iplearn_media_domain
arangoDB.ttl=3600
-thread.pool.number=5
+thread.pool.number=10
#ClickHouse time-range mode: 0 = read the past hour; 1 = use the specified range
-clickhouse.time.limit.type=0
-read.clickhouse.max.time=1571245220
-read.clickhouse.min.time=1571245210
+clickhouse.time.limit.type=1
+read.clickhouse.max.time=1603785961
+read.clickhouse.min.time=1603354682
-#ArangoDB time-range mode: 0 = normal read; 1 = use the specified range
-arango.time.limit.type=0
-read.arango.max.time=1571245320
-read.arango.min.time=1571245200
-
-arangoDB.read.limit=
+arangoDB.read.limit=1
update.arango.batch=10000
distinct.client.ip.num=10000
recent.count.hour=24
-update.interval=10800
+update.interval=3600
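
The clickhouse.time.limit.type switch selects how the read window is built: 0 reads the past hour relative to the current hour, 1 uses read.clickhouse.min.time / read.clickhouse.max.time directly. A minimal sketch of that selection, assuming epoch-second values as in the properties above; the helper name is hypothetical and the type-0 branch is an interpretation of the comment, since getTimeLimit is not fully shown in this diff.

    // Sketch: derive the (maxTime, minTime) window used in the WHERE clauses,
    // matching the (Long, Long) ordering of BaseClickhouseData.timeLimit.
    object TimeWindowSketch {
      def window(limitType: Int, confMax: Long, confMin: Long): (Long, Long) = {
        // epoch seconds floored to the hour, as in BaseClickhouseData.currentHour
        val currentHour = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
        limitType match {
          case 1 => (confMax, confMin)                  // explicit range from the properties file
          case _ => (currentHour, currentHour - 3600)   // type 0: the past hour
        }
      }
    }
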
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
index 395ea6b..687bdd5 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
@@ -36,12 +36,7 @@ object ApplicationConfig {
val READ_CLICKHOUSE_MAX_TIME: Long = config.getLong("read.clickhouse.max.time")
val READ_CLICKHOUSE_MIN_TIME: Long = config.getLong("read.clickhouse.min.time")
- val ARANGO_TIME_LIMIT_TYPE: Int = config.getInt("arango.time.limit.type")
-
- val READ_ARANGO_MAX_TIME: Long = config.getLong("read.arango.max.time")
- val READ_ARANGO_MIN_TIME: Long = config.getLong("read.arango.min.time")
-
- val ARANGODB_READ_LIMIT: String = config.getString("arangoDB.read.limit")
+ val ARANGODB_READ_LIMIT: Int = config.getInt("arangoDB.read.limit")
val UPDATE_ARANGO_BATCH: Int = config.getInt("update.arango.batch")
val RECENT_COUNT_HOUR: Int = config.getInt("recent.count.hour")
val DISTINCT_CLIENT_IP_NUM: Int = config.getInt("distinct.client.ip.num")
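
arangoDB.read.limit is now parsed with getInt instead of getString, so the property must hold a number (the properties file above sets it to 1). A minimal sketch of that loading pattern, assuming the config object is Typesafe Config, which the getInt/getLong/getString calls suggest; the surrounding object name is illustrative.

    import com.typesafe.config.{Config, ConfigFactory}

    // Sketch: load the classpath application.properties and read the key whose type changed here.
    object ConfigSketch {
      // ConfigFactory.load() picks up application.conf / application.json / application.properties.
      private val config: Config = ConfigFactory.load()

      val ARANGODB_READ_LIMIT: Int = config.getInt("arangoDB.read.limit")   // was getString before this change
      val UPDATE_ARANGO_BATCH: Int = config.getInt("update.arango.batch")
    }
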
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 952c30c..eb6a736 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -11,7 +11,7 @@ object BaseClickhouseData {
val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
private val timeLimit: (Long, Long) = getTimeLimit
- private def initClickhouseData(sql:String): Unit ={
+ private def initClickhouseData(sql:String): DataFrame ={
val dataFrame: DataFrame = spark.read.format("jdbc")
.option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
@@ -28,6 +28,8 @@ object BaseClickhouseData {
.load()
dataFrame.printSchema()
dataFrame.createOrReplaceGlobalTempView("dbtable")
+
+ dataFrame
}
def loadConnectionDataFromCk(): Unit ={
@@ -68,41 +70,7 @@ object BaseClickhouseData {
initClickhouseData(sql)
}
- def getVertexFqdnDf: DataFrame ={
- loadConnectionDataFromCk()
- val sql =
- """
- |SELECT
- | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
- |FROM
- | (
- | (SELECT
- | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
- | FROM
- | global_temp.dbtable
- | WHERE
- | common_schema_type = 'SSL' GROUP BY ssl_sni
- | )
- | UNION ALL
- | (SELECT
- | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
- | FROM
- | global_temp.dbtable
- | WHERE
- | common_schema_type = 'HTTP' GROUP BY http_host
- | )
- | )
- |GROUP BY
- | FQDN
- |HAVING
- | FQDN != ''
- """.stripMargin
- LOG.warn(sql)
- val vertexFqdnDf = spark.sql(sql)
- vertexFqdnDf.printSchema()
- vertexFqdnDf
- }
-
+ /*
def getVertexIpDf: DataFrame ={
loadConnectionDataFromCk()
val sql =
@@ -190,6 +158,149 @@ object BaseClickhouseData {
relationFqdnLocateIpDf.printSchema()
relationFqdnLocateIpDf
}
+ */
+
+ def getVertexFqdnDf: DataFrame ={
+ val sql =
+ """
+ |(SELECT
+ | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
+ |FROM
+ | ((SELECT
+ | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
+ | FROM tsg_galaxy_v3.connection_record_log
+ | WHERE common_schema_type = 'SSL' GROUP BY ssl_sni
+ | )UNION ALL
+ | (SELECT
+ | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
+ | FROM tsg_galaxy_v3.connection_record_log
+ | WHERE common_schema_type = 'HTTP' GROUP BY http_host))
+ |GROUP BY FQDN HAVING FQDN != '') as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexIpDf: DataFrame ={
+ val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
+ val sql =
+ s"""
+ |(SELECT * FROM
+ |((SELECT common_client_ip AS IP,MIN(common_recv_time) AS FIRST_FOUND_TIME,
+ |MAX(common_recv_time) AS LAST_FOUND_TIME,
+ |count(*) as SESSION_COUNT,
+ |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+ |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info,
+ |'client' as ip_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |where $where
+ |group by common_client_ip)
+ |UNION ALL
+ |(SELECT common_server_ip AS IP,
+ |MIN(common_recv_time) AS FIRST_FOUND_TIME,
+ |MAX(common_recv_time) AS LAST_FOUND_TIME,
+ |count(*) as SESSION_COUNT,
+ |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+ |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info,
+ |'server' as ip_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |where $where
+ |group by common_server_ip))) as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+
+ def getRelationFqdnLocateIpDf: DataFrame ={
+ val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
+ val sql =
+ s"""
+ |(SELECT * FROM
+ |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
+ |UNION ALL
+ |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
+ |WHERE FQDN != '') as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getRelationSubidLocateIpDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME
+ |FROM radius_record_log
+ |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip
+ |) as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexSubidDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log
+ |WHERE $where GROUP BY common_subscriber_id
+ |)as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexFramedIpDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT DISTINCT radius_framed_ip,common_recv_time as LAST_FOUND_TIME FROM radius_record_log WHERE $where
+ |)as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
private def getTimeLimit: (Long,Long) ={
var maxTime = 0L
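
The rewritten loaders now hand a full aggregation query to ClickHouse, wrapped as "(...) as dbtable", instead of loading the raw connection log and aggregating with Spark SQL; initClickhouseData returns the resulting DataFrame. A minimal, self-contained sketch of that pattern, using the connection settings from application.properties above; the subquery itself is only an example.

    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Sketch: read a ClickHouse aggregation through the Spark JDBC source,
    // passing a subquery aliased as "dbtable" exactly like initClickhouseData.
    object JdbcPushdownSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("jdbc-pushdown").getOrCreate()
        val sql =
          """(SELECT common_subscriber_id,
            |        MAX(common_recv_time) AS LAST_FOUND_TIME,
            |        MIN(common_recv_time) AS FIRST_FOUND_TIME
            |   FROM radius_record_log
            |  GROUP BY common_subscriber_id) as dbtable""".stripMargin
        val df: DataFrame = spark.read.format("jdbc")
          .option("url", "jdbc:clickhouse://192.168.44.67:8123/tsg_galaxy_v3")  // from application.properties
          .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
          .option("user", "default")
          .option("dbtable", sql)                                               // the aggregation runs inside ClickHouse
          .load()
        df.printSchema()
        spark.stop()
      }
    }
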
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 460caed..3094691 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -4,37 +4,57 @@ import java.util.regex.Pattern
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.dao.BaseClickhouseData
+import cn.ac.iie.spark.ArangoSpark
import cn.ac.iie.spark.partition.CustomPartitioner
+import cn.ac.iie.spark.rdd.ReadOptions
+import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.slf4j.LoggerFactory
+import cn.ac.iie.utils.SparkSessionUtil._
object MergeDataFrame {
private val LOG = LoggerFactory.getLogger(MergeDataFrame.getClass)
private val pattern = Pattern.compile("^[\\d]*$")
+ private val options = ReadOptions(ApplicationConfig.ARANGODB_DB_NAME)
- def mergeVertexFqdn(): RDD[Row] ={
- BaseClickhouseData.getVertexFqdnDf
- .rdd.filter(row => isDomain(row.getAs[String](0))).map(row => (row.get(0),row))
- .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+ def mergeVertexFqdn(): RDD[(String, (Option[BaseDocument], Option[Row]))] ={
+ val fqdnAccmu = getLongAccumulator("FQDN Accumulator")
+ val fqdnRddRow = BaseClickhouseData.getVertexFqdnDf
+ .rdd.filter(row => isDomain(row.getAs[String](0))).map(row => {
+ fqdnAccmu.add(1)
+ (row.getAs[String]("FQDN"), row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ fqdnRddRow.cache()
+ val fqdnRddDoc = ArangoSpark.load[BaseDocument](sparkContext,"FQDN",options)
+
+ fqdnRddDoc.map(doc => (doc.getKey, doc)).fullOuterJoin(fqdnRddRow)
}
- def mergeVertexIp(): RDD[Row]={
+ def mergeVertexIp(): RDD[(String, (Option[BaseDocument], Option[Row]))]={
+ val ipAccum = getLongAccumulator("IP Accumulator")
val vertexIpDf = BaseClickhouseData.getVertexIpDf
val frame = vertexIpDf.groupBy("IP").agg(
min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
- collect_list("ip_type").alias("ip_type_list")
+ collect_list("ip_type").alias("ip_type_list"),
+ last("common_link_info").alias("common_link_info")
)
- val values = frame.rdd.map(row => (row.get(0), row))
- .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
- values
+ val ipRddRow = frame.rdd.map(row => {
+ ipAccum.add(1)
+ (row.getAs[String]("IP"), row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ val ipRddDoc = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
+
+ ipRddDoc.map(doc => (doc.getKey, doc)).fullOuterJoin(ipRddRow)
+
}
- def mergeRelationFqdnLocateIp(): RDD[Row] ={
+ def mergeRelationFqdnLocateIp(): RDD[(String, (Option[BaseEdgeDocument], Option[Row]))] ={
+ val fqdnLocIpAccum = getLongAccumulator("R_LOCATE_FQDN2IP Accumulator")
val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
.groupBy("FQDN", "common_server_ip")
.agg(
@@ -44,28 +64,72 @@ object MergeDataFrame {
collect_list("schema_type").alias("schema_type_list"),
collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
)
- frame.rdd.map(row => {
+ val fqdnLocIpRddRow = frame.rdd.map(row => {
val fqdn = row.getAs[String]("FQDN")
val serverIp = row.getAs[String]("common_server_ip")
- val key = fqdn.concat("-"+serverIp)
- (key,row)
- }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+ val key = fqdn.concat("-" + serverIp)
+ fqdnLocIpAccum.add(1)
+ (key, row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ val fqdnLocIpRddDoc = ArangoSpark.load[BaseEdgeDocument](sparkContext,"R_LOCATE_FQDN2IP",options)
+
+ fqdnLocIpRddDoc.map(doc => (doc.getKey, doc)).fullOuterJoin(fqdnLocIpRddRow)
}
+ def mergeRelationSubidLocateIp(): RDD[(String, (Option[BaseEdgeDocument], Option[Row]))] ={
+ val subidLocIpAccum = getLongAccumulator("R_LOCATE_SUBSCRIBER2IP Accumulator")
+ val subidLocIpRddRow = BaseClickhouseData.getRelationSubidLocateIpDf
+ .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .rdd.map(row => {
+ val commonSubscriberId = row.getAs[String]("common_subscriber_id")
+ val ip = row.getAs[String]("radius_framed_ip")
+ val key = commonSubscriberId.concat("-" + ip)
+ subidLocIpAccum.add(1)
+ (key, row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ val subidLocIpRddDoc = ArangoSpark.load[BaseEdgeDocument](sparkContext,"R_LOCATE_SUBSCRIBER2IP",options)
+
+ subidLocIpRddDoc.map(doc => (doc.getKey, doc)).fullOuterJoin(subidLocIpRddRow)
+ }
+
+ def mergeVertexSubid(): RDD[(String, (Option[BaseDocument], Option[Row]))] ={
+ val subidAccum = getLongAccumulator("SUBSCRIBER Accumulator")
+ val subidRddRow = BaseClickhouseData.getVertexSubidDf
+ .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .rdd.map(row => {
+ val commonSubscriberId = row.getAs[String]("common_subscriber_id")
+ subidAccum.add(1)
+ (commonSubscriberId, row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+
+ val subidRddDoc = ArangoSpark.load[BaseDocument](sparkContext,"SUBSCRIBER",options)
+
+ subidRddDoc.map(doc => (doc.getKey, doc)).fullOuterJoin(subidRddRow)
+
+ }
+
+ def mergeVertexFrameIp: RDD[Row] ={
+ val framedIpAccum = getLongAccumulator("framed ip Accumulator")
+ val values = BaseClickhouseData.getVertexFramedIpDf
+ .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .rdd.map(row => {
+ val ip = row.getAs[String]("radius_framed_ip")
+ framedIpAccum.add(1)
+ (ip, row)
+ }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+ values
+ }
+
private def isDomain(fqdn: String): Boolean = {
try {
if (fqdn == null || fqdn.length == 0) {
return false
}
- if (fqdn.contains(":")) {
- val s = fqdn.split(":")(0)
- if (s.contains(":")){
- return false
- }
- }
- val fqdnArr = fqdn.split("\\.")
- if (fqdnArr.length < 4 || fqdnArr.length > 4){
+
+ val fqdnArr = fqdn.split(":")(0).split("\\.")
+
+ if (fqdnArr.length != 4){
return true
}
for (f <- fqdnArr) {
@@ -83,6 +147,7 @@ object MergeDataFrame {
LOG.error("Failed to parse domain " + fqdn + ":\n" + e.toString)
}
false
+
}
}
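
The central change in MergeDataFrame is joining ClickHouse rows with the existing ArangoDB documents instead of broadcasting a history map: both sides are keyed the same way the document _key is built, and a fullOuterJoin yields (key, (Option[history doc], Option[new row])) so downstream code can tell new, updated, and untouched entries apart. A minimal sketch of that join under the same key convention; the FQDN collection and ReadOptions come from the diff, the wrapper object is illustrative.

    import cn.ac.iie.spark.ArangoSpark
    import cn.ac.iie.spark.rdd.ReadOptions
    import com.arangodb.entity.BaseDocument
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // Sketch: key both sides by the document key and full-outer-join them, as mergeVertexFqdn does.
    object FullOuterJoinSketch {
      def joinWithHistory(sc: SparkContext, rows: RDD[(String, Row)], db: String)
          : RDD[(String, (Option[BaseDocument], Option[Row]))] = {
        val history = ArangoSpark.load[BaseDocument](sc, "FQDN", ReadOptions(db))  // existing vertices
        history.map(doc => (doc.getKey, doc)).fullOuterJoin(rows)                  // (key, (Some(hist)?, Some(new)?))
      }
    }
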
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index bdf8120..a275ab3 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -1,7 +1,8 @@
package cn.ac.iie.service.update
-import java.lang
+import java.util
+import scala.collection.JavaConversions._
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.service.read.ReadHistoryArangoData
@@ -14,16 +15,24 @@ object UpdateDocHandler {
val PROTOCOL_SET: Set[String] = Set("HTTP","TLS","DNS")
def updateMaxAttribute(hisDoc: BaseDocument,newAttribute:Long,attributeName:String): Unit ={
- var hisAttritube = hisDoc.getAttribute(attributeName).toString.toLong
- if (newAttribute > hisAttritube){
- hisAttritube = newAttribute
+ if(hisDoc.getProperties.containsKey(attributeName)){
+ var hisAttritube = hisDoc.getAttribute(attributeName).toString.toLong
+ if (newAttribute > hisAttritube){
+ hisAttritube = newAttribute
+ }
+ hisDoc.addAttribute(attributeName,hisAttritube)
}
- hisDoc.addAttribute(attributeName,hisAttritube)
}
def updateSumAttribute(hisDoc: BaseDocument,newAttribute:Long,attributeName:String): Unit ={
- val hisAttritube = hisDoc.getAttribute(attributeName).toString.toLong
- hisDoc.addAttribute(attributeName,newAttribute+hisAttritube)
+ if (hisDoc.getProperties.containsKey(attributeName)){
+ val hisAttritube = hisDoc.getAttribute(attributeName).toString.toLong
+ hisDoc.addAttribute(attributeName,newAttribute+hisAttritube)
+ }
+ }
+
+ def replaceAttribute(hisDoc: BaseDocument,newAttribute:String,attributeName:String): Unit ={
+ hisDoc.addAttribute(attributeName,newAttribute)
}
def separateAttributeByIpType(ipTypeList:ofRef[String],
@@ -62,19 +71,21 @@ object UpdateDocHandler {
}
def updateProtocolAttritube(hisDoc:BaseEdgeDocument, protocolMap: Map[String, Long]): Unit ={
- var protocolType = hisDoc.getAttribute("PROTOCOL_TYPE").toString
- protocolMap.foreach(t => {
- if (t._2 > 0 && !protocolType.contains(t._1)){
- protocolType = protocolType.concat(","+ t._1)
- }
- val cntTotalName = t._1.concat("_CNT_TOTAL")
- val cntRecentName = t._1.concat("_CNT_RECENT")
- val cntRecent: Array[lang.Long] = hisDoc.getAttribute(cntRecentName).asInstanceOf[Array[java.lang.Long]]
- cntRecent.update(0,t._2)
- updateSumAttribute(hisDoc,t._2,cntTotalName)
- hisDoc.addAttribute(cntRecentName,cntRecent)
- })
- hisDoc.addAttribute("PROTOCOL_TYPE",protocolType)
+ if (hisDoc.getProperties.containsKey("PROTOCOL_TYPE")){
+ var protocolType = hisDoc.getAttribute("PROTOCOL_TYPE").toString
+ protocolMap.foreach((t: (String, Long)) => {
+ if (t._2 > 0 && !protocolType.contains(t._1)){
+ protocolType = protocolType.concat(","+ t._1)
+ }
+ val cntTotalName = t._1.concat("_CNT_TOTAL")
+ val cntRecentName = t._1.concat("_CNT_RECENT")
+ val cntRecent = hisDoc.getAttribute(cntRecentName).asInstanceOf[Array[Long]]
+ cntRecent.update(0,t._2)
+ updateSumAttribute(hisDoc,t._2,cntTotalName)
+ hisDoc.addAttribute(cntRecentName,cntRecent)
+ })
+ hisDoc.addAttribute("PROTOCOL_TYPE",protocolType)
+ }
}
def putProtocolAttritube(doc:BaseEdgeDocument, protocolMap: Map[String, Long]): Unit ={
@@ -93,10 +104,30 @@ object UpdateDocHandler {
doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
}
- def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
- distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
+ def updateProtocolDocument(doc: BaseEdgeDocument): Unit = {
+ if (doc.getProperties.containsKey("PROTOCOL_TYPE")) {
+ for (protocol <- PROTOCOL_SET) {
+ val protocolRecent = protocol + "_CNT_RECENT"
+ val cntRecent: util.ArrayList[Long] = doc.getAttribute(protocolRecent).asInstanceOf[util.ArrayList[Long]]
+ val cntRecentsSrc = cntRecent.toArray().map(_.toString.toLong)
+ val cntRecentsDst = new Array[Long](24)
+ System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1)
+ cntRecentsDst(0) = 0L
+ doc.addAttribute(protocolRecent, cntRecentsDst)
+ }
+ }
+ }
+
+ def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
+ distCipRecent.flatMap(str => {
+ str.replaceAll("\\[","")
+ .replaceAll("\\]","")
+ .replaceAll("\\'","")
+ .split(",")
+ }).distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
}
+
def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
val map = newDistinctIp.map(ip => {
(ip, ReadHistoryArangoData.currentHour)
@@ -106,17 +137,19 @@ object UpdateDocHandler {
}
def updateDistinctIp(hisDoc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
- val hisDistCip = hisDoc.getAttribute("DIST_CIP").asInstanceOf[Array[String]]
- val hisDistCipTs = hisDoc.getAttribute("DIST_CIP_TS").asInstanceOf[Array[Long]]
- if (hisDistCip.length == hisDistCipTs.length){
- val distCipToTsMap: Map[String, Long] = hisDistCip.zip(hisDistCipTs).toMap
- val muDistCipToTsMap: mutable.Map[String, Long] = mutable.Map(distCipToTsMap.toSeq:_*)
- newDistinctIp.foreach(cip => {
- muDistCipToTsMap.put(cip,ReadHistoryArangoData.currentHour)
- })
- val resultMap = muDistCipToTsMap.toList.sortBy(-_._2).take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toMap
- hisDoc.addAttribute("DIST_CIP",resultMap.keys.toArray)
- hisDoc.addAttribute("DIST_CIP_TS",resultMap.values.toArray)
+ if (hisDoc.getProperties.containsKey("DIST_CIP") && hisDoc.getProperties.containsKey("DIST_CIP_TS")){
+ val hisDistCip = hisDoc.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]]
+ val hisDistCipTs = hisDoc.getAttribute("DIST_CIP_TS").asInstanceOf[util.ArrayList[Long]]
+ if (hisDistCip.length == hisDistCipTs.length){
+ val distCipToTsMap: Map[String, Long] = hisDistCip.zip(hisDistCipTs).toMap
+ val muDistCipToTsMap: mutable.Map[String, Long] = mutable.Map(distCipToTsMap.toSeq:_*)
+ newDistinctIp.foreach(cip => {
+ muDistCipToTsMap.put(cip,ReadHistoryArangoData.currentHour)
+ })
+ val resultMap = muDistCipToTsMap.toList.sortBy(-_._2).take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toMap
+ hisDoc.addAttribute("DIST_CIP",resultMap.keys.toArray)
+ hisDoc.addAttribute("DIST_CIP_TS",resultMap.values.toArray)
+ }
}
}
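
updateProtocolDocument shifts every *_CNT_RECENT array one slot to the right before the new hour is processed, so index 0 always holds the current hour and the oldest of the 24 buckets falls off the end. A minimal sketch of just that shift, assuming 24-element Long arrays; the object and method names are illustrative.

    // Sketch: roll a 24-hour counter window forward by one hour, as updateProtocolDocument does.
    object RecentWindowSketch {
      def shift(cntRecent: Array[Long]): Array[Long] = {
        val dst = new Array[Long](24)
        // copy buckets 0..22 into 1..23, dropping the oldest bucket (index 23)
        System.arraycopy(cntRecent, 0, dst, 1, cntRecent.length - 1)
        dst(0) = 0L   // the current hour starts at zero; its count is added later
        dst
      }

      def main(args: Array[String]): Unit =
        println(shift(Array.tabulate(24)(_.toLong)).mkString(","))
    }
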
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index b7d4875..5162834 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -1,17 +1,12 @@
package cn.ac.iie.service.update
import java.util
-import java.util.concurrent.ConcurrentHashMap
import cn.ac.iie.config.ApplicationConfig
-import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
import cn.ac.iie.service.transform.MergeDataFrame._
import cn.ac.iie.service.update.UpdateDocHandler._
-import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil}
-import cn.ac.iie.utils.SparkSessionUtil.spark
+import cn.ac.iie.utils.{ArangoDBConnect, SparkSessionUtil}
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.slf4j.LoggerFactory
@@ -19,44 +14,47 @@ import org.slf4j.LoggerFactory
import scala.collection.mutable.WrappedArray.ofRef
object UpdateDocument {
- private val pool = ExecutorThreadPool.getInstance
private val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance()
private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
- private val baseArangoData = new BaseArangoData()
def update(): Unit = {
try {
- updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
- updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
- updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+ updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
+
+ updateDocument("SUBSCRIBER",getVertexSubidRow,mergeVertexSubid)
+
+ insertFrameIp()
+
+ updateDocument("R_LOCATE_SUBSCRIBER2IP",getRelationSubidLocateIpRow,mergeRelationSubidLocateIp)
+
+ updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, mergeRelationFqdnLocateIp)
+
+ updateDocument("IP", getVertexIpRow, mergeVertexIp)
+
} catch {
case e: Exception => e.printStackTrace()
} finally {
- pool.shutdown()
arangoManger.clean()
SparkSessionUtil.closeSpark()
+ System.exit(0)
}
}
private def updateDocument[T <: BaseDocument](collName: String,
- historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
- getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T,
- clazz: Class[T],
- getNewDataRdd: () => RDD[Row]
+ getDocumentRow: ((String, (Option[T], Option[Row]))) => T,
+ getJoinRdd: () => RDD[(String, (Option[T], Option[Row]))]
): Unit = {
- baseArangoData.readHistoryData(collName, historyMap, clazz)
- val hisBc = spark.sparkContext.broadcast(historyMap)
try {
val start = System.currentTimeMillis()
- val newDataRdd = getNewDataRdd()
- newDataRdd.foreachPartition(iter => {
- val partitionId: Int = TaskContext.get.partitionId
- val dictionaryMap: ConcurrentHashMap[String, T] = hisBc.value.get(partitionId)
+ val joinRdd = getJoinRdd()
+ joinRdd.foreachPartition(iter => {
val resultDocumentList = new util.ArrayList[T]
var i = 0
iter.foreach(row => {
- val document = getDocumentRow(row, dictionaryMap)
- resultDocumentList.add(document)
+ val document = getDocumentRow(row)
+ if (document != null){
+ resultDocumentList.add(document)
+ }
i += 1
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
arangoManger.overwrite(resultDocumentList, collName)
@@ -73,88 +71,238 @@ object UpdateDocument {
LOG.warn(s"Updated $collName in ${last - start} ms")
} catch {
case e: Exception => e.printStackTrace()
- } finally {
- hisBc.destroy()
}
}
- private def getVertexFqdnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
- val fqdn = row.getAs[String]("FQDN")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- var document: BaseDocument = dictionaryMap.getOrDefault(fqdn, null)
- if (document != null) {
- updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
- } else {
- document = new BaseDocument
- document.setKey(fqdn)
- document.addAttribute("FQDN_NAME", fqdn)
- document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
- document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
- }
+ private def insertFrameIp(): Unit ={
+ mergeVertexFrameIp.foreachPartition(iter => {
+ val resultDocumentList = new util.ArrayList[BaseDocument]
+ var i = 0
+ iter.foreach(row => {
+ val document = getVertexFrameipRow(row)
+ resultDocumentList.add(document)
+ i += 1
+ if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
+ arangoManger.overwrite(resultDocumentList, "IP")
+ LOG.warn(s"Updated IP: " + i)
+ i = 0
+ }
+ })
+ if (i != 0) {
+ arangoManger.overwrite(resultDocumentList, "IP")
+ LOG.warn(s"Updated IP: " + i)
+ }
+ })
+ }
+
+ private def getVertexFrameipRow(row: Row): BaseDocument ={
+ val ip = row.getAs[String]("radius_framed_ip")
+ val document = new BaseDocument()
+ document.setKey(ip)
+ document.addAttribute("IP",ip)
document
}
- private def getVertexIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
- val ip = row.getAs[String]("IP")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
- val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
- val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
- val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
-
- var document = dictionaryMap.getOrDefault(ip, null)
- if (document != null) {
- updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
- updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
- updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
- updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
- updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
- } else {
- document = new BaseDocument
- document.setKey(ip)
- document.addAttribute("IP", ip)
- document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
- document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
- document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
- document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
- document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
- document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
- document.addAttribute("COMMON_LINK_INFO", "")
+ private def getRelationSubidLocateIpRow(joinRow: (String, (Option[BaseEdgeDocument], Option[Row]))): BaseEdgeDocument ={
+
+ val subidLocIpDocOpt = joinRow._2._1
+ var subidLocIpDoc = subidLocIpDocOpt match {
+ case Some(doc) => doc
+ case None => null
}
- document
+
+ val subidLocIpRowOpt = joinRow._2._2
+
+ val subidLocIpRow = subidLocIpRowOpt match {
+ case Some(r) => r
+ case None => null
+ }
+
+ if (subidLocIpRow != null){
+ val subId = subidLocIpRow.getAs[String]("common_subscriber_id")
+ val ip = subidLocIpRow.getAs[String]("radius_framed_ip")
+ val lastFoundTime = subidLocIpRow.getAs[Long]("LAST_FOUND_TIME")
+ val firstFoundTime = subidLocIpRow.getAs[Long]("FIRST_FOUND_TIME")
+
+ val key = subId.concat("-"+ip)
+ if (subidLocIpDoc != null){
+ updateMaxAttribute(subidLocIpDoc,lastFoundTime,"LAST_FOUND_TIME")
+ } else {
+ subidLocIpDoc = new BaseEdgeDocument()
+ subidLocIpDoc.setKey(key)
+ subidLocIpDoc.setFrom("SUBSCRIBER/" + subId)
+ subidLocIpDoc.setTo("IP/" + ip)
+ subidLocIpDoc.addAttribute("SUBSCRIBER",subId)
+ subidLocIpDoc.addAttribute("IP",ip)
+ subidLocIpDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime)
+ subidLocIpDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime)
+ }
+ }
+ subidLocIpDoc
}
- private def getRelationFqdnLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument = {
- val fqdn = row.getAs[String]("FQDN")
- val serverIp = row.getAs[String]("common_server_ip")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
- val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
- val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-
- val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
- val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-
- val key = fqdn.concat("-" + serverIp)
- var document = dictionaryMap.getOrDefault(key, null)
- if (document != null) {
- updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
- updateProtocolAttritube(document, sepAttritubeMap)
- updateDistinctIp(document, distinctIp)
- } else {
- document = new BaseEdgeDocument()
- document.setKey(key)
- document.setFrom("FQDN/" + fqdn)
- document.setTo("IP/" + serverIp)
- document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
- document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
- putProtocolAttritube(document, sepAttritubeMap)
- putDistinctIp(document, distinctIp)
+ private def getVertexSubidRow(joinRow: (String, (Option[BaseDocument], Option[Row]))): BaseDocument ={
+ val subidDocOpt = joinRow._2._1
+ var subidDoc = subidDocOpt match {
+ case Some(doc) => doc
+ case None => null
}
- document
+
+ val subidRowOpt = joinRow._2._2
+
+ val subidRow = subidRowOpt match {
+ case Some(r) => r
+ case None => null
+ }
+
+ if (subidRow != null){
+ val subId = subidRow.getAs[String]("common_subscriber_id")
+ val subLastFoundTime = subidRow.getAs[Long]("LAST_FOUND_TIME")
+ val subFirstFoundTime = subidRow.getAs[Long]("FIRST_FOUND_TIME")
+ if (subidDoc != null){
+ updateMaxAttribute(subidDoc,subLastFoundTime,"LAST_FOUND_TIME")
+ } else {
+ subidDoc = new BaseDocument()
+ subidDoc.setKey(subId)
+ subidDoc.addAttribute("SUBSCRIBER",subId)
+ subidDoc.addAttribute("FIRST_FOUND_TIME",subFirstFoundTime)
+ subidDoc.addAttribute("LAST_FOUND_TIME",subLastFoundTime)
+ }
+ }
+
+ subidDoc
+ }
+
+ private def getVertexFqdnRow(joinRow: (String, (Option[BaseDocument], Option[Row]))): BaseDocument = {
+ val fqdnDocOpt = joinRow._2._1
+ var fqdnDoc = fqdnDocOpt match {
+ case Some(doc) => doc
+ case None => null
+ }
+
+ val fqdnRowOpt = joinRow._2._2
+
+ val fqdnRow = fqdnRowOpt match {
+ case Some(r) => r
+ case None => null
+ }
+
+ if (fqdnRow != null){
+ val fqdn = fqdnRow.getAs[String]("FQDN")
+ val lastFoundTime = fqdnRow.getAs[Long]("LAST_FOUND_TIME")
+ val firstFoundTime = fqdnRow.getAs[Long]("FIRST_FOUND_TIME")
+ if (fqdnDoc != null) {
+ updateMaxAttribute(fqdnDoc, lastFoundTime, "LAST_FOUND_TIME")
+ } else {
+ fqdnDoc = new BaseDocument
+ fqdnDoc.setKey(fqdn)
+ fqdnDoc.addAttribute("FQDN_NAME", fqdn)
+ fqdnDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+ fqdnDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+ }
+ }
+
+ fqdnDoc
+ }
+
+ private def getVertexIpRow(joinRow: (String, (Option[BaseDocument], Option[Row]))): BaseDocument = {
+ val ipDocOpt = joinRow._2._1
+ var ipDoc = ipDocOpt match {
+ case Some(doc) => doc
+ case None => null
+ }
+
+ val ipRowOpt = joinRow._2._2
+
+ val ipRow = ipRowOpt match {
+ case Some(r) => r
+ case None => null
+ }
+
+ if (ipRow != null){
+ val ip = ipRow.getAs[String]("IP")
+ val firstFoundTime = ipRow.getAs[Long]("FIRST_FOUND_TIME")
+ val lastFoundTime = ipRow.getAs[Long]("LAST_FOUND_TIME")
+ val sessionCountList = ipRow.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
+ val bytesSumList = ipRow.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
+ val ipTypeList = ipRow.getAs[ofRef[String]]("ip_type_list")
+ val linkInfo = ipRow.getAs[String]("common_link_info")
+ val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
+
+ if (ipDoc != null) {
+ updateMaxAttribute(ipDoc, lastFoundTime, "LAST_FOUND_TIME")
+ updateSumAttribute(ipDoc, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
+ updateSumAttribute(ipDoc, sepAttributeTuple._2, "SERVER_BYTES_SUM")
+ updateSumAttribute(ipDoc, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
+ updateSumAttribute(ipDoc, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
+ replaceAttribute(ipDoc,linkInfo,"COMMON_LINK_INFO")
+ } else {
+ ipDoc = new BaseDocument
+ ipDoc.setKey(ip)
+ ipDoc.addAttribute("IP", ip)
+ ipDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+ ipDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+ ipDoc.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
+ ipDoc.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
+ ipDoc.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
+ ipDoc.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
+ ipDoc.addAttribute("COMMON_LINK_INFO", "")
+ }
+ }
+
+ ipDoc
+ }
+
+ private def getRelationFqdnLocateIpRow(joinRow: (String, (Option[BaseEdgeDocument], Option[Row]))): BaseEdgeDocument = {
+
+ val fqdnLocIpDocOpt = joinRow._2._1
+ var fqdnLocIpDoc = fqdnLocIpDocOpt match {
+ case Some(doc) => doc
+ case None => null
+ }
+
+ val fqdnLocIpRowOpt = joinRow._2._2
+
+ val fqdnLocIpRow = fqdnLocIpRowOpt match {
+ case Some(r) => r
+ case None => null
+ }
+
+ if (fqdnLocIpDoc != null){
+ updateProtocolDocument(fqdnLocIpDoc)
+ }
+
+ if (fqdnLocIpRow != null){
+ val fqdn = fqdnLocIpRow.getAs[String]("FQDN")
+ val serverIp = fqdnLocIpRow.getAs[String]("common_server_ip")
+ val firstFoundTime = fqdnLocIpRow.getAs[Long]("FIRST_FOUND_TIME")
+ val lastFoundTime = fqdnLocIpRow.getAs[Long]("LAST_FOUND_TIME")
+ val countTotalList = fqdnLocIpRow.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+ val schemaTypeList = fqdnLocIpRow.getAs[ofRef[AnyRef]]("schema_type_list")
+ val distCipRecent = fqdnLocIpRow.getAs[ofRef[String]]("DIST_CIP_RECENT")
+
+ val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+ val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
+
+ val key = fqdn.concat("-" + serverIp)
+
+ if (fqdnLocIpDoc != null) {
+ updateMaxAttribute(fqdnLocIpDoc, lastFoundTime, "LAST_FOUND_TIME")
+ updateProtocolAttritube(fqdnLocIpDoc, sepAttritubeMap)
+ updateDistinctIp(fqdnLocIpDoc, distinctIp)
+ } else {
+ fqdnLocIpDoc = new BaseEdgeDocument()
+ fqdnLocIpDoc.setKey(key)
+ fqdnLocIpDoc.setFrom("FQDN/" + fqdn)
+ fqdnLocIpDoc.setTo("IP/" + serverIp)
+ fqdnLocIpDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+ fqdnLocIpDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+ putProtocolAttritube(fqdnLocIpDoc, sepAttritubeMap)
+ putDistinctIp(fqdnLocIpDoc, distinctIp)
+ }
+ }
+
+ fqdnLocIpDoc
}
}
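
updateDocument and insertFrameIp stream each partition into ArangoDB in batches of UPDATE_ARANGO_BATCH documents instead of materialising whole collections. A minimal sketch of that flush loop; the write function stands in for arangoManger.overwrite(list, collName), whose implementation is not part of this diff.

    import java.util
    import com.arangodb.entity.BaseDocument

    // Sketch: the batch-and-flush loop used inside foreachPartition above.
    object BatchWriteSketch {
      val batchSize = 10000   // update.arango.batch

      def flushInBatches(docs: Iterator[BaseDocument], write: util.ArrayList[BaseDocument] => Unit): Unit = {
        val buffer = new util.ArrayList[BaseDocument]
        var i = 0
        docs.foreach { doc =>
          buffer.add(doc)
          i += 1
          if (i >= batchSize) {   // flush a full batch
            write(buffer)
            buffer.clear()
            i = 0
          }
        }
        if (i != 0) write(buffer) // flush the remainder
      }
    }
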
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/ArangoSpark.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/ArangoSpark.scala
index b492f9a..e1a4060 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/ArangoSpark.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/ArangoSpark.scala
@@ -24,6 +24,7 @@ package cn.ac.iie.spark
import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions, WriteOptions}
import cn.ac.iie.spark.vpack.VPackUtils
+import com.arangodb.model.DocumentCreateOptions
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -76,7 +77,6 @@ object ArangoSpark {
*
* @param dataframe the dataframe with data to save
* @param collection the collection to save in
- * @param options additional write options
*/
def saveDF(dataframe: DataFrame, collection: String): Unit =
saveRDD[Row](dataframe.rdd, collection, WriteOptions(), (x: Iterator[Row]) => x.map { y => VPackUtils.rowToVPack(y) })
@@ -102,6 +102,11 @@ object ArangoSpark {
case WriteOptions.INSERT => col.insertDocuments(docs)
case WriteOptions.UPDATE => col.updateDocuments(docs)
case WriteOptions.REPLACE => col.replaceDocuments(docs)
+ case WriteOptions.OVERWRITE =>
+ val documentCreateOptions = new DocumentCreateOptions
+ documentCreateOptions.overwrite(true)
+ documentCreateOptions.silent(true)
+ col.insertDocuments(docs, documentCreateOptions)
}
arangoDB.shutdown()
@@ -123,7 +128,7 @@ object ArangoSpark {
*
* @param sparkContext the sparkContext containing the ArangoDB configuration
* @param collection the collection to load data from
- * @param additional read options
+ * @param options read options
*/
def load[T: ClassTag](sparkContext: SparkContext, collection: String, options: ReadOptions): ArangoRdd[T] =
new ArangoRdd[T](sparkContext, createReadOptions(options, sparkContext.getConf).copy(collection = collection))
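
The new WriteOptions.OVERWRITE branch maps to the Java driver's insert-with-overwrite, so re-running a batch replaces documents that already carry the same _key instead of failing on a unique-constraint violation. A small sketch of the driver-level call this expands to; the collection handle is assumed to be obtained elsewhere.

    import com.arangodb.ArangoCollection
    import com.arangodb.entity.BaseDocument
    import com.arangodb.model.DocumentCreateOptions

    // Sketch: what the OVERWRITE write option does per partition.
    object OverwriteSketch {
      def overwrite(col: ArangoCollection, docs: java.util.Collection[BaseDocument]): Unit = {
        val opts = new DocumentCreateOptions
        opts.overwrite(true)   // replace the existing document on _key conflict
        opts.silent(true)      // do not return per-document metadata
        col.insertDocuments(docs, opts)
      }
    }
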
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
index 4162e76..ab77299 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
@@ -56,7 +56,7 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
override def repartition(numPartitions: Int)(implicit ord: Ordering[T]): RDD[T] = super.repartition(numPartitions)
private def getPartition(idx: Int, countTotal: Long): QueryArangoPartition = {
- val sepNum = countTotal / ApplicationConfig.THREAD_POOL_NUMBER + 1
+ val sepNum = countTotal / ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS + 1
val offsetNum = idx * sepNum
new QueryArangoPartition(idx, offsetNum, sepNum)
}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/WriteOptions.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/WriteOptions.scala
index 46f3c80..d659cb5 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/WriteOptions.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/WriteOptions.scala
@@ -116,4 +116,9 @@ object WriteOptions {
*/
case object REPLACE extends Method
+ /**
+ * save documents by overwrite
+ */
+ case object OVERWRITE extends Method
+
}
\ No newline at end of file
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
index 8f7661a..5bececa 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
@@ -1,7 +1,9 @@
package cn.ac.iie.utils
import cn.ac.iie.config.ApplicationConfig
+import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.LongAccumulator
import org.slf4j.LoggerFactory
object SparkSessionUtil {
@@ -9,6 +11,8 @@ object SparkSessionUtil {
val spark: SparkSession = getSparkSession
+ var sparkContext: SparkContext = getContext
+
private def getSparkSession: SparkSession ={
val spark: SparkSession = SparkSession
.builder()
@@ -26,10 +30,27 @@ object SparkSessionUtil {
spark
}
+ def getContext: SparkContext = {
+ @transient var sc: SparkContext = null
+ if (sparkContext == null) sc = spark.sparkContext
+ sc
+ }
+
+ def getLongAccumulator(name: String): LongAccumulator ={
+ if (sparkContext == null){
+ sparkContext = getContext
+ }
+ sparkContext.longAccumulator(name)
+
+ }
+
def closeSpark(): Unit ={
if (spark != null){
spark.stop()
}
+ if (sparkContext != null){
+ sparkContext.stop()
+ }
}
}
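
getLongAccumulator hands out named LongAccumulators so the merge* methods can count rows as a side effect of the map that keys them. A minimal, self-contained usage sketch; the accumulator name and sample data are illustrative.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.LongAccumulator

    // Sketch: count processed elements with a named accumulator, the way
    // mergeVertexFqdn uses getLongAccumulator("FQDN Accumulator").
    object AccumulatorSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("accumulator-sketch").getOrCreate()
        val acc: LongAccumulator = spark.sparkContext.longAccumulator("row counter")
        val keyed = spark.sparkContext.parallelize(Seq("a", "bb", "ccc")).map { v =>
          acc.add(1)              // incremented on the executors
          (v, v.length)
        }
        keyed.count()             // an action must run before the value is reliable
        println(s"rows seen: ${acc.value}")
        spark.stop()
      }
    }
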
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
index 608fb2d..08d39c5 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
@@ -7,7 +7,7 @@ import org.apache.spark.sql.SparkSession
object BaseClickhouseDataTest {
private val spark: SparkSession = SparkSessionUtil.spark
def main(args: Array[String]): Unit = {
- BaseClickhouseData loadConnectionDataFromCk()
+// BaseClickhouseData loadConnectionDataFromCk()
val sql =
"""
|SELECT
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
deleted file mode 100644
index 67590ff..0000000
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package cn.ac.iie.service.update
-
-import java.util
-import java.util.ArrayList
-import java.util.concurrent.ConcurrentHashMap
-
-import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
-import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-
-import scala.collection.mutable.WrappedArray.ofRef
-
-object UpdateDocumentTest {
- def main(args: Array[String]): Unit = {
- val baseArangoData = new BaseArangoData()
- baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
-
- val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys()
- while (value.hasMoreElements) {
- val integer: Integer = value.nextElement()
- val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer)
- val unit = map.keys()
- while (unit.hasMoreElements) {
- val key = unit.nextElement()
- val edgeDocument = map.get(key)
- // val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]]
- // val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]]
- val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]]
- val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]]
- println(longs.toString + "---" + strings.toString)
- }
- }
- }
-
-}
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
index 4936cd4..852aeb1 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
@@ -1,9 +1,14 @@
package cn.ac.iie.spark
-import cn.ac.iie.spark.rdd.ReadOptions
+import cn.ac.iie.config.ApplicationConfig
+import cn.ac.iie.dao.BaseClickhouseData
+import cn.ac.iie.spark.partition.CustomPartitioner
+import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions}
import cn.ac.iie.utils.SparkSessionUtil
import com.arangodb.entity.BaseDocument
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.{collect_list, max, min}
import org.apache.spark.storage.StorageLevel
object RDDTest {
@@ -14,30 +19,57 @@ object RDDTest {
println(sparkContext.getConf.get("arangodb.hosts"))
// val options = ReadOptions("iplearn_media_domain").copy(collection = "R_LOCATE_FQDN2IP")
- val options = ReadOptions("ip-learning-test-0")
+ val options = ReadOptions(ApplicationConfig.ARANGODB_DB_NAME)
val ipOptions = options.copy(collection = "IP")
- val rdd = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
+ val rdd: ArangoRdd[BaseDocument] = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
println(rdd.count())
println(rdd.getNumPartitions)
+ val ipRDD = mergeVertexIp()
+ val value: RDD[(String, (Option[BaseDocument], Option[Row]))] = rdd.map(doc => {
+ (doc.getKey, doc)
+ }).fullOuterJoin(ipRDD)
+ value.foreach((row: (String, (Option[BaseDocument], Option[Row]))) => {
+ val value = row._2._2
+ val str: String = value match {
+ case Some(r) => r.getAs[String]("IP")
+// case None => null
+ case _ => null
+ }
+ println(str)
+ })
+
+ /*
val value: RDD[BaseDocument] = rdd.filter(doc => doc.getAttribute("CLIENT_SESSION_COUNT").asInstanceOf[Long] > 100).map(doc => {
doc.addAttribute("abc", 1)
doc
})
-
value.map(doc => {(doc.getKey,doc)})
-
value.persist(StorageLevel.MEMORY_AND_DISK)
-
- value.foreach(row => println(row.toString))
+ value.foreach(fqdnRow => println(fqdnRow.toString))
println(value.count())
+ */
SparkSessionUtil.spark.close()
System.exit(0)
}
+ def mergeVertexIp(): RDD[(String,Row)]={
+ val vertexIpDf = BaseClickhouseData.getVertexIpDf
+ val frame = vertexIpDf.groupBy("IP").agg(
+ min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
+ max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
+ collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
+ collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
+ collect_list("ip_type").alias("ip_type_list")
+ )
+ val values = frame.rdd.map(row => (row.getAs[String]("IP"), row))
+ .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ values
+ }
+
}
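
After the full outer join, every element is a (key, (Option[BaseDocument], Option[Row])) pair; RDDTest above only unwraps the Row side, while the handlers in UpdateDocument have to cover history-only, new-only, and both. A small sketch of that three-way handling; the decisions in each branch are placeholders, not the project's actual update logic.

    import com.arangodb.entity.BaseDocument
    import org.apache.spark.sql.Row

    // Sketch: the three cases a fullOuterJoin can produce, as distinguished by getVertexFqdnRow and friends.
    object JoinedPairSketch {
      def handle(pair: (String, (Option[BaseDocument], Option[Row]))): Option[BaseDocument] = pair._2 match {
        case (Some(hist), Some(_)) => Some(hist)                        // existing doc + fresh row: update attributes in place
        case (None, Some(_))       => Some(new BaseDocument(pair._1))   // new key: build a document from the row
        case (Some(hist), None)    => Some(hist)                        // history only: keep the existing document
        case (None, None)          => None                              // cannot occur after a full outer join
      }
    }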