path: root/ip-learning
author     wanglihui <[email protected]>  2020-06-28 18:27:48 +0800
committer  wanglihui <[email protected]>  2020-06-28 18:27:48 +0800
commit     9ffe686f3f41adc876fa9db7cc2d6f64f6a721f7 (patch)
tree       e34bac3974ae6d9113c2773d1955a23d160821bf /ip-learning
parent     6f86960a70bac53f003b4a6c5c2afa65fc80bebf (diff)
first commit
Diffstat (limited to 'ip-learning')
-rw-r--r--  ip-learning/.gitignore | 9
-rw-r--r--  ip-learning/pom.xml | 211
-rw-r--r--  ip-learning/src/main/resources/application.conf | 26
-rw-r--r--  ip-learning/src/main/resources/application.properties | 25
-rw-r--r--  ip-learning/src/main/resources/log4j.properties | 40
-rw-r--r--  ip-learning/src/main/resources/spark-env.sh | 71
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/dao/BaseMediaDataLoad.scala | 125
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraph.scala | 177
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByArangoSpark.scala | 237
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByDF.scala | 250
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/etl/CursorTransform.scala | 33
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/main/IPLearningApplication.scala | 29
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEdgeIPVisitFqdn.scala | 34
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEgdeFqdnAddressIP.scala | 34
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexFqdn.scala | 30
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexIP.scala | 32
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/ArangoDBSparkTest.scala | 52
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbReadV_IPTest.scala | 37
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTest.scala | 314
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemory.scala | 355
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemoryGroupBy.scala | 40
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/Config.scala | 22
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/ReadClickhouseTest.scala | 447
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocument.scala | 29
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocumentDataFrame.scala | 35
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/TestIndices.scala | 219
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/test/TestSparkJoin.scala | 56
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/utils/ConfigUtils.scala | 34
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/utils/DateTimeUtils.scala | 5
-rw-r--r--  ip-learning/src/main/scala/cn/ac/iie/utils/InitArangoDBPool.scala | 24
-rw-r--r--  ip-learning/src/test/java/cn/ac/iie/CreateObjectJavaTest.java | 32
-rw-r--r--  ip-learning/src/test/scala/cn/ac/iie/CreateObjectTest.scala | 157
-rw-r--r--  ip-learning/src/test/scala/cn/ac/iie/HiveUnionTest.scala | 78
-rw-r--r--  ip-learning/src/test/scala/cn/ac/iie/TestMap.scala | 11
34 files changed, 3310 insertions, 0 deletions
diff --git a/ip-learning/.gitignore b/ip-learning/.gitignore
new file mode 100644
index 0000000..a77ab78
--- /dev/null
+++ b/ip-learning/.gitignore
@@ -0,0 +1,9 @@
+# Created by .ignore support plugin (hsz.mobi)
+### Example user template template
+### Example user template
+
+# IntelliJ project files
+.idea
+*.iml
+target
+spark-warehouse/
diff --git a/ip-learning/pom.xml b/ip-learning/pom.xml
new file mode 100644
index 0000000..b66a1ab
--- /dev/null
+++ b/ip-learning/pom.xml
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>cn.ac.iie</groupId>
+ <artifactId>ip-learning</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.2</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>19.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>2.2.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>2.2.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>ru.yandex.clickhouse</groupId>
+ <artifactId>clickhouse-jdbc</artifactId>
+ <version>0.1.54</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid</artifactId>
+ <version>1.1.10</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <version>1.2.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-xml</artifactId>
+ <version>2.11.0-M4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.11.8</version>
+ </dependency>
+
+
+ <!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-graphdb -->
+ <dependency>
+ <groupId>com.orientechnologies</groupId>
+ <artifactId>orientdb-graphdb</artifactId>
+ <version>3.0.31</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-client -->
+ <dependency>
+ <groupId>com.orientechnologies</groupId>
+ <artifactId>orientdb-client</artifactId>
+ <version>3.0.31</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-core -->
+ <dependency>
+ <groupId>com.orientechnologies</groupId>
+ <artifactId>orientdb-core</artifactId>
+ <version>3.0.31</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-server -->
+ <dependency>
+ <groupId>com.orientechnologies</groupId>
+ <artifactId>orientdb-server</artifactId>
+ <version>3.0.31</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-jdbc -->
+ <dependency>
+ <groupId>com.orientechnologies</groupId>
+ <artifactId>orientdb-jdbc</artifactId>
+ <version>3.0.31</version>
+ </dependency>
+
+
+ <!-- https://mvnrepository.com/artifact/com.tinkerpop.blueprints/blueprints-orient-graph -->
+ <dependency>
+ <groupId>com.tinkerpop.blueprints</groupId>
+ <artifactId>blueprints-orient-graph</artifactId>
+ <version>2.4.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.arangodb</groupId>
+ <artifactId>arangodb-java-driver</artifactId>
+ <version>4.2.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.arangodb</groupId>
+ <artifactId>arangodb-spark-connector</artifactId>
+ <version>1.0.2</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.swoop/spark-alchemy -->
+ <dependency>
+ <groupId>com.swoop</groupId>
+ <artifactId>spark-alchemy_2.11</artifactId>
+ <version>0.3.28</version>
+ </dependency>
+
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>cn.ac.iie.main.IPLearningApplication</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/ip-learning/src/main/resources/application.conf b/ip-learning/src/main/resources/application.conf
new file mode 100644
index 0000000..e6ff7dd
--- /dev/null
+++ b/ip-learning/src/main/resources/application.conf
@@ -0,0 +1,26 @@
+# Spark job configuration
+spark.sql.shuffle.partitions=144
+spark.sql.read.fetchsize="10000"
+spark.executor.memory="120g"
+spark.app.name="test"
+spark.network.timeout="300s"
+repartitionNumber=36
+spark.serializer="org.apache.spark.serializer.KryoSerializer"
+master="local[*]"
+# Spark ClickHouse read configuration
+numPartitions="144"
+maxPoolSize=40
+minTime="1571245199"
+maxTime="1571284799"
+clickhouse.socket.timeout=300000
+# ArangoDB configuration
+arangoDB.host="192.168.40.127"
+arangoDB.port=8529
+arangoDB.user="root"
+arangoDB.password="111111"
+arangoDB.DB.name="insert_iplearn_index"
+arangoDB.batch=100000
+arangoDB.ttl=3600
+
+thread.pool.number=10
+
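The application.conf above groups the Spark job, ClickHouse read and ArangoDB settings used throughout this commit. As a minimal sketch (not the project's actual ConfigUtils/Config wrappers, which are also added in this commit), these keys could be read with the Typesafe Config dependency declared in pom.xml roughly like this; the object name is illustrative only:

import com.typesafe.config.{Config, ConfigFactory}

// Illustrative sketch: load application.conf from the classpath and read a few of the keys above.
// ConfigSketch is a hypothetical name; the commit's real wrappers are ConfigUtils.scala / Config.scala.
object ConfigSketch {
  private val conf: Config = ConfigFactory.load()

  val repartitionNumber: Int = conf.getInt("repartitionNumber") // 36 in the file above
  val arangoHost: String     = conf.getString("arangoDB.host")
  val arangoPort: Int        = conf.getInt("arangoDB.port")
  val arangoBatch: Int       = conf.getInt("arangoDB.batch")
  val sparkMaster: String    = conf.getString("master")
}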
diff --git a/ip-learning/src/main/resources/application.properties b/ip-learning/src/main/resources/application.properties
new file mode 100644
index 0000000..87b0bbb
--- /dev/null
+++ b/ip-learning/src/main/resources/application.properties
@@ -0,0 +1,25 @@
+# Spark job configuration
+spark.sql.shuffle.partitions=144
+spark.sql.read.fetchsize=10000
+spark.executor.memory=120g
+spark.app.name=test
+spark.network.timeout=300s
+repartitionNumber=36
+spark.serializer=org.apache.spark.serializer.KryoSerializer
+master=local[*]
+# Spark ClickHouse read configuration
+numPartitions=144
+maxPoolSize=40
+minTime=1571245199
+maxTime=1571284799
+clickhouse.socket.timeout=300000
+# ArangoDB configuration
+arangoDB.host=192.168.40.127
+arangoDB.port=8529
+arangoDB.user=root
+arangoDB.password=111111
+arangoDB.DB.name=insert_iplearn_index
+arangoDB.batch=100000
+arangoDB.ttl=3600
+
+thread.pool.number=10
diff --git a/ip-learning/src/main/resources/log4j.properties b/ip-learning/src/main/resources/log4j.properties
new file mode 100644
index 0000000..2039ec3
--- /dev/null
+++ b/ip-learning/src/main/resources/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell log level to WARN. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that
+# the user can have different defaults for the shell and regular Spark apps.
+log4j.logger.org.apache.spark.repl.Main=WARN
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
diff --git a/ip-learning/src/main/resources/spark-env.sh b/ip-learning/src/main/resources/spark-env.sh
new file mode 100644
index 0000000..80dd7d4
--- /dev/null
+++ b/ip-learning/src/main/resources/spark-env.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is sourced when running various Spark programs.
+# Copy it as spark-env.sh and edit that to configure Spark for your site.
+
+# Options read when launching programs locally with
+# ./bin/run-example or ./bin/spark-submit
+# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
+# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
+
+# Options read by executors and drivers running inside the cluster
+# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
+# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
+# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
+
+# Options read in YARN client mode
+# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
+# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
+# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
+# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
+
+# Options for the daemons used in the standalone deploy mode
+# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
+# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
+# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
+# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
+# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
+# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
+# - SPARK_WORKER_DIR, to set the working directory of worker processes
+# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
+# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
+# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
+# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
+# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
+# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
+# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
+
+# Generic options for the daemons used in the standalone deploy mode
+# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf)
+# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs)
+# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
+# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
+# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
+# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.
+
+export SPARK_MASTER_IP=bigdata-119
+export SPARK_MASTER_PORT=7077
+export SPARK_WORKER_CORES=4
+export SPARK_WORKER_INSTANCES=1
+export SPARK_WORKER_MEMORY=3g
+export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_73
+export SCALA_HOME=/home/ceiec/scala-2.11.7
diff --git a/ip-learning/src/main/scala/cn/ac/iie/dao/BaseMediaDataLoad.scala b/ip-learning/src/main/scala/cn/ac/iie/dao/BaseMediaDataLoad.scala
new file mode 100644
index 0000000..ad531bb
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/dao/BaseMediaDataLoad.scala
@@ -0,0 +1,125 @@
+package cn.ac.iie.dao
+
+import cn.ac.iie.test.Config
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object BaseMediaDataLoad {
+
+ def loadMediaDate(spark: SparkSession): Unit = {
+ val mediaDataFrame: DataFrame = spark.read.format("jdbc")
+ .option("url", "jdbc:clickhouse://192.168.40.193:8123")
+ .option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
+ .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+ .option("user", "default")
+ .option("password", "111111")
+ .option("numPartitions", Config.NUMPARTITIONS)
+ .option("partitionColumn", "recv_time")
+ .option("lowerBound", Config.MINTIME)
+ .option("upperBound", Config.MAXTIME)
+ .option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
+ .load()
+ mediaDataFrame.printSchema()
+ mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
+ }
+
+ def getFQDNVertexFromMedia(spark: SparkSession): DataFrame = {
+ val v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS new_fqdn_name,
+ | MIN( recv_time ) AS new_fqdn_first_found_time,
+ | MAX( recv_time ) AS new_fqdn_last_found_time,
+ | COUNT( * ) AS new_fqdn_count_total
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | media_domain != ''
+ |GROUP BY
+ | media_domain
+ """.stripMargin
+ )
+ v_FQDN_DF
+ }
+
+ def getIPVertexFromMedia(spark: SparkSession): DataFrame = {
+ val s_IP_DF = spark.sql(
+ """
+ select
+ s1_s_ip as new_ip,
+ s1_s_location_region as new_location,
+ MIN( recv_time ) AS new_ip_first_found_time,
+ MAX( recv_time ) AS new_ip_last_found_time,
+ COUNT( * ) AS new_ip_count_total
+ from global_temp.media_expire_patch
+ GROUP BY
+ s1_s_ip,
+ s1_s_location_region
+ """.stripMargin)
+ val d_IP_DF = spark.sql(
+ """
+ select
+ s1_d_ip as new_ip,
+ s1_d_location_region as new_location,
+ MIN( recv_time ) AS new_ip_first_found_time,
+ MAX( recv_time ) AS new_ip_last_found_time,
+ COUNT( * ) AS new_ip_count_total
+ from global_temp.media_expire_patch
+ GROUP BY
+ s1_d_ip,
+ s1_d_location_region
+ """.stripMargin)
+ import org.apache.spark.sql.functions._
+ val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("new_ip", "new_location").agg(
+ min("new_ip_first_found_time").as("new_ip_first_found_time"),
+ max("new_ip_last_found_time").as("new_ip_last_found_time"),
+ count("new_ip").as("new_ip_count_total")
+ )
+ v_IP_DF
+ }
+
+ def getFQDNAddressIPEdgeFromMedia(spark: SparkSession): DataFrame = {
+ val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS new_fqdn,
+ | s1_d_ip AS new_ip,
+ | MIN( recv_time ) AS new_first_found_time,
+ | MAX( recv_time ) AS new_last_found_time,
+ | COUNT( * ) AS new_count_total,
+ | CONCAT_WS('-',media_domain,s1_d_ip) AS new_key
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( media_domain != '' )
+ | AND ( s1_d_ip != '' )
+ |GROUP BY
+ | s1_d_ip,
+ | media_domain
+ """.stripMargin)
+ e_Address_v_FQDN_to_v_IP_DF
+ }
+
+ def getIPVisitFQDNEdgeFromMedia(spark: SparkSession): DataFrame = {
+ val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | s1_s_ip AS new_ip,
+ | media_domain AS new_fqdn,
+ | MIN( recv_time ) AS new_first_found_time,
+ | MAX( recv_time ) AS new_last_found_time,
+ | COUNT( * ) AS new_count_total,
+ | CONCAT_WS('-',s1_s_ip,media_domain) as new_key
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( s1_s_ip != '' )
+ | AND ( media_domain != '' )
+ |GROUP BY
+ | s1_s_ip,
+ | media_domain
+ """.stripMargin)
+ e_Visit_v_IP_to_v_FQDN_DF
+ }
+
+
+}
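BaseMediaDataLoad first registers the global temp view media_expire_patch from ClickHouse and then derives the vertex and edge DataFrames with SQL over that view. A minimal driver sketch wiring these calls together, mirroring the call order used by ArangoDBSparkTest later in this commit, could look like the following; the appName/master values are placeholders and the ClickHouse source configured in loadMediaDate must be reachable:

import cn.ac.iie.dao.BaseMediaDataLoad
import org.apache.spark.sql.SparkSession

// Driver sketch: register the view, then build the vertex and edge DataFrames from it.
object BaseMediaDataLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("base-media-data-load-sketch") // placeholder settings
      .master("local[*]")
      .getOrCreate()

    // Creates global_temp.media_expire_patch, which backs the SQL in the methods above.
    BaseMediaDataLoad.loadMediaDate(spark)

    val vFqdn  = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
    val vIp    = BaseMediaDataLoad.getIPVertexFromMedia(spark)
    val eAddr  = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
    val eVisit = BaseMediaDataLoad.getIPVisitFQDNEdgeFromMedia(spark)

    vFqdn.show(5)
    spark.stop()
  }
}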
diff --git a/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraph.scala b/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraph.scala
new file mode 100644
index 0000000..b6e7c3f
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraph.scala
@@ -0,0 +1,177 @@
+package cn.ac.iie.dao
+
+import cn.ac.iie.test.ArangoDbTest.arangoDB
+import cn.ac.iie.test.Config
+import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
+import org.apache.spark.sql.DataFrame
+
+import scala.util.Try
+
+object UpdateArangoGraph {
+
+ /**
+   * Update the FQDN vertices
+ */
+ def updateFQDNVertex(v_FQDN_DF:DataFrame): Unit ={
+
+ v_FQDN_DF.printSchema()
+ v_FQDN_DF.foreachPartition(iter => {
+ val v_FQDN_Coll = arangoDB.db("insert_iplearn_index").collection("V_FQDN")
+ val docs_Insert = new java.util.ArrayList[BaseDocument]()
+ val docs_Update = new java.util.ArrayList[BaseDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("FQDN_NAME")
+ val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
+ val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
+ val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
+
+ if (v_FQDN_Coll.documentExists(fqdn)) {
+ val document: BaseDocument = v_FQDN_Coll.getDocument(fqdn, classOf[BaseDocument])
+ val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
+ document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt + v_Fqdn_Cnt)
+ document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseDocument = new BaseDocument()
+ baseDocument.setKey(fqdn)
+ baseDocument.addAttribute("FQDN_NAME", fqdn)
+ baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
+ baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
+ baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
+ docs_Insert.add(baseDocument)
+ }
+ i+=1
+ })
+ Try(v_FQDN_Coll.importDocuments(docs_Insert))
+ Try(v_FQDN_Coll.updateDocuments(docs_Update))
+ })
+ }
+
+ /**
+   * Update the IP vertices
+ */
+ def updateIPVertex(v_IP_DF:DataFrame): Unit ={
+ v_IP_DF.printSchema()
+ v_IP_DF.foreachPartition(iter => {
+ val v_IP_Coll = arangoDB.db("insert_iplearn_index").collection("V_IP")
+ val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
+ val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
+ var i = 0
+
+ iter.foreach(row => {
+ val ip = row.getAs[String]("IP")
+ val location = row.getAs[String]("location")
+ val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
+ val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
+ val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
+
+ if (v_IP_Coll.documentExists(ip)) {
+ val document: BaseDocument = v_IP_Coll.getDocument(ip, classOf[BaseDocument])
+ val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
+ document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseDocument = new BaseDocument()
+ baseDocument.setKey(ip)
+ baseDocument.addAttribute("IP", ip)
+ baseDocument.addAttribute("IP_LOCATION", location)
+ baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
+ baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
+ docs_Insert.add(baseDocument)
+ }
+ i+=1
+ })
+ Try(v_IP_Coll.importDocuments(docs_Insert))
+ Try(v_IP_Coll.updateDocuments(docs_Update))
+ })
+
+ }
+
+ /**
+   * Aggregate and update the e_Address_Fqdn_to_IP edges
+ */
+ def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame): Unit ={
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ e_Address_v_FQDN_to_v_IP_DF.foreachPartition(iter => {
+ val e_Add_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_ADDRESS_V_FQDN_TO_V_IP")
+ val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+
+ if (e_Add_Fqdn_to_IP_Coll.documentExists(fqdn+"-"+ip)) {
+ val document: BaseEdgeDocument = e_Add_Fqdn_to_IP_Coll.getDocument(fqdn+"-"+ip, classOf[BaseEdgeDocument])
+ val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", e_Last)
+ document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
+ baseDocument.setKey(fqdn+"-"+ip)
+ baseDocument.setFrom(s"V_FQDN/$fqdn")
+ baseDocument.setTo(s"V_IP/$ip")
+ baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
+ baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
+ docs_Insert.add(baseDocument)
+ }
+ // println(fqdn+"-"+ip)
+ i+=1
+ })
+ Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
+ })
+ }
+
+ /**
+   * Aggregate and update the e_Visit_v_IP_to_v_FQDN edges
+ */
+ def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame): Unit ={
+ e_Visit_v_IP_to_v_FQDN_DF.printSchema()
+ e_Visit_v_IP_to_v_FQDN_DF.foreachPartition(iter => {
+ val e_Visit_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_VISIT_V_IP_TO_V_FQDN")
+ val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+
+ if (e_Visit_Fqdn_to_IP_Coll.documentExists(ip+"-"+fqdn)) {
+ val document: BaseEdgeDocument = e_Visit_Fqdn_to_IP_Coll.getDocument(ip+"-"+fqdn, classOf[BaseEdgeDocument])
+ val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", e_Last)
+ document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
+ baseDocument.setKey(ip+"-"+fqdn)
+ baseDocument.setFrom(s"V_IP/$ip")
+ baseDocument.setTo(s"V_FQDN/$fqdn")
+ baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
+ baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
+ docs_Insert.add(baseDocument)
+ }
+ i+=1
+ })
+ Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
+ })
+
+
+ }
+
+
+}
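UpdateArangoGraph applies a check-then-write pattern per partition: documentExists/getDocument decides whether a row becomes an insert or an update, and the per-partition batches are flushed with importDocuments/updateDocuments. For spot-checking the same idea outside Spark, a single-document sketch with the ArangoDB Java driver might look like the following; the host, credentials and sample key are placeholders, not the project's real values:

import com.arangodb.ArangoDB
import com.arangodb.entity.BaseDocument

// Single-document upsert sketch following the same existence check used above.
object ArangoUpsertSketch {
  def main(args: Array[String]): Unit = {
    val arangoDB = new ArangoDB.Builder()
      .host("127.0.0.1", 8529) // placeholder host and credentials
      .user("root")
      .password("")
      .build()
    val coll = arangoDB.db("insert_iplearn_index").collection("V_FQDN")

    val key = "example.com" // placeholder document key
    if (coll.documentExists(key)) {
      val doc = coll.getDocument(key, classOf[BaseDocument])
      doc.addAttribute("FQDN_LAST_FOUND_TIME", System.currentTimeMillis() / 1000)
      coll.updateDocument(key, doc)
    } else {
      val doc = new BaseDocument()
      doc.setKey(key)
      doc.addAttribute("FQDN_NAME", key)
      coll.insertDocument(doc)
    }
    arangoDB.shutdown()
  }
}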
diff --git a/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByArangoSpark.scala b/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByArangoSpark.scala
new file mode 100644
index 0000000..721d07f
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByArangoSpark.scala
@@ -0,0 +1,237 @@
+package cn.ac.iie.dao
+
+import cn.ac.iie.etl.CursorTransform
+import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
+import cn.ac.iie.test.Config
+import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.util.Try
+
+object UpdateArangoGraphByArangoSpark {
+ /**
+   * Update the FQDN vertices
+ */
+ def updateFQDNVertex(v_FQDN_DF:DataFrame,v_FQDN_Cursor_DF: DataFrame): Unit ={
+ v_FQDN_DF.printSchema()
+ v_FQDN_Cursor_DF.printSchema()
+
+ val v_Fqdn_Join_Df = v_FQDN_DF
+ .join(v_FQDN_Cursor_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Cursor_DF("key"),"fullouter")
+ v_Fqdn_Join_Df.printSchema()
+
+ v_Fqdn_Join_Df.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val v_FQDN_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
+ val docs_Replace = new java.util.ArrayList[BaseVertexFqdn]()
+ val docs_Insert = new java.util.ArrayList[BaseVertexFqdn]()
+
+ iter.foreach(row => {
+ val new_fqdn_name = row.getAs[String]("new_fqdn_name")
+ val new_fqdn_first_found_time = row.getAs[Long]("new_fqdn_first_found_time")
+ val new_fqdn_last_found_time = row.getAs[Long]("new_fqdn_last_found_time")
+ val new_fqdn_count_total = row.getAs[Long]("new_fqdn_count_total")
+
+ val fqdn = row.getAs[String]("key")
+ val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
+ val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
+
+ if (fqdn != null) {
+ val document: BaseVertexFqdn = new BaseVertexFqdn()
+ document.setKey(new_fqdn_name)
+ document.setFQDN_NAME(new_fqdn_name)
+ document.setFQDN_FIRST_FOUND_TIME(v_Fqdn_First)
+ document.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
+ document.setFQDN_COUNT_TOTAL(v_Fqdn_Cnt+new_fqdn_count_total)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument: BaseVertexFqdn = new BaseVertexFqdn()
+ baseDocument.setKey(new_fqdn_name)
+ baseDocument.setFQDN_NAME(new_fqdn_name)
+ baseDocument.setFQDN_FIRST_FOUND_TIME(new_fqdn_first_found_time)
+ baseDocument.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
+ baseDocument.setFQDN_COUNT_TOTAL(new_fqdn_count_total)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(v_FQDN_Coll.replaceDocuments(docs_Replace))
+ Try(v_FQDN_Coll.importDocuments(docs_Insert))
+ })
+
+ }
+
+ /**
+   * Update the IP vertices
+ */
+ def updateIPVertex(v_IP_DF:DataFrame,v_IP_Cursor_DF: DataFrame): Unit ={
+ v_IP_DF.printSchema()
+
+ v_IP_Cursor_DF.printSchema()
+
+ val v_IP_Join_DF = v_IP_DF.join(v_IP_Cursor_DF,v_IP_DF("new_ip")===v_IP_Cursor_DF("key"),"fullouter")
+ v_IP_Join_DF.printSchema()
+
+ v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val v_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
+ val docs_Insert: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
+ val docs_Replace: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
+
+ iter.foreach(row => {
+ val new_Ip = row.getAs[String]("new_ip")
+ val new_Location = row.getAs[String]("new_location")
+ val new_Ip_First_Found_Time = row.getAs[Long]("new_ip_first_found_time")
+ val new_Ip_Last_Found_Time = row.getAs[Long]("new_ip_last_found_time")
+ val new_Ip_Count_Total = row.getAs[Long]("new_ip_count_total")
+
+ val key = row.getAs[String]("key")
+ val location = row.getAs[String]("IP_LOCATION")
+ val v_IP_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val v_IP_Cnt = row.getAs[Long]("IP_APPEAR_COUNT")
+
+
+ if (key != null) {
+ val document = new BaseVertexIP()
+ document.setKey(key)
+ document.setIP(key)
+ document.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
+ document.setIP_APPEAR_COUNT(v_IP_Cnt+new_Ip_Count_Total)
+ document.setFIRST_FOUND_TIME(v_IP_First)
+ document.setIP_LOCATION(location)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument = new BaseVertexIP()
+ baseDocument.setKey(new_Ip)
+ baseDocument.setIP(new_Ip)
+ baseDocument.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
+ baseDocument.setIP_APPEAR_COUNT(new_Ip_Count_Total)
+ baseDocument.setFIRST_FOUND_TIME(new_Ip_First_Found_Time)
+ baseDocument.setIP_LOCATION(new_Location)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(v_IP_Coll.importDocuments(docs_Insert))
+ Try(v_IP_Coll.updateDocuments(docs_Replace))
+ })
+
+ }
+
+ /**
+   * Aggregate and update the e_Address_Fqdn_to_IP edges
+ */
+ def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame,e_Fqdn_Address_IP_Cursor_DF: DataFrame): Unit ={
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ e_Fqdn_Address_IP_Cursor_DF.printSchema()
+
+
+ val e_Address_v_FQDN_to_v_IP_Join_DF = e_Address_v_FQDN_to_v_IP_DF
+ .join(e_Fqdn_Address_IP_Cursor_DF,
+ e_Address_v_FQDN_to_v_IP_DF("new_key")===e_Fqdn_Address_IP_Cursor_DF("key"),
+ "fullouter")
+
+ e_Address_v_FQDN_to_v_IP_Join_DF.printSchema()
+
+ e_Address_v_FQDN_to_v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val e_Add_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
+ val docs_Insert: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
+ val docs_Replace: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
+ iter.foreach(row => {
+ val new_Fqdn = row.getAs[String]("new_fqdn")
+ val new_IP = row.getAs[String]("new_ip")
+ val new_Key = row.getAs[String]("new_key")
+ val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
+ val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
+ val new_Count_Total = row.getAs[Long]("new_count_total")
+
+ val from = row.getAs[String]("from")
+ val to = row.getAs[String]("to")
+ val key = row.getAs[String]("key")
+ val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
+
+ if (key != null) {
+ val document = new BaseEgdeFqdnAddressIP()
+ document.setKey(key)
+ document.setFrom(from)
+ document.setTo(to)
+ document.setLAST_FOUND_TIME(new_Last_Found_Time)
+ document.setFIRST_FOUND_TIME(e_First_Time)
+ document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument: BaseEgdeFqdnAddressIP = new BaseEgdeFqdnAddressIP()
+ baseDocument.setKey(new_Key)
+ baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
+ baseDocument.setTo(s"V_IP/$new_IP")
+ baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
+ baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
+ baseDocument.setCOUNT_TOTAL(new_Count_Total)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Add_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
+ })
+
+ }
+
+
+ /**
+   * Aggregate and update the e_Visit_v_IP_to_v_FQDN edges
+ */
+ def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame,e_IP_Visit_FQDN_Cursor_DF: DataFrame): Unit = {
+ e_Visit_v_IP_to_v_FQDN_DF.printSchema()
+ e_IP_Visit_FQDN_Cursor_DF.printSchema()
+
+
+ val e_Visit_v_IP_to_v_FQDN_Join_DF = e_Visit_v_IP_to_v_FQDN_DF
+ .join(e_IP_Visit_FQDN_Cursor_DF, e_Visit_v_IP_to_v_FQDN_DF("new_key") === e_IP_Visit_FQDN_Cursor_DF("key"), "fullouter")
+
+ e_Visit_v_IP_to_v_FQDN_Join_DF.printSchema()
+
+ e_Visit_v_IP_to_v_FQDN_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val e_Visit_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
+ val docs_Insert: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
+ val docs_Replace: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
+ iter.foreach(row => {
+ val new_Fqdn = row.getAs[String]("new_fqdn")
+ val new_IP = row.getAs[String]("new_ip")
+ val new_Key = row.getAs[String]("new_key")
+ val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
+ val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
+ val new_Count_Total = row.getAs[Long]("new_count_total")
+
+ val to = row.getAs[String]("to")
+ val from = row.getAs[String]("from")
+ val key = row.getAs[String]("key")
+ val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
+
+ if (key != null) {
+ val document = new BaseEdgeIPVisitFqdn()
+ document.setKey(key)
+ document.setFrom(from)
+ document.setTo(to)
+ document.setLAST_FOUND_TIME(new_Last_Found_Time)
+ document.setFIRST_FOUND_TIME(e_First_Time)
+ document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument: BaseEdgeIPVisitFqdn = new BaseEdgeIPVisitFqdn()
+ baseDocument.setKey(new_Key)
+ baseDocument.setFrom(s"V_IP/$new_IP")
+ baseDocument.setTo(s"V_FQDN/$new_Fqdn")
+ baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
+ baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
+ baseDocument.setCOUNT_TOTAL(new_Count_Total)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Visit_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
+ })
+
+ }
+
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByDF.scala b/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByDF.scala
new file mode 100644
index 0000000..20f5884
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/dao/UpdateArangoGraphByDF.scala
@@ -0,0 +1,250 @@
+package cn.ac.iie.dao
+
+import cn.ac.iie.etl.CursorTransform
+import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
+import cn.ac.iie.test.Config
+import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.util.Try
+
+object UpdateArangoGraphByDF {
+
+
+ /**
+    * Update the FQDN vertices
+    * @param v_FQDN_DF DataFrame with the result set read from ClickHouse
+    * @param spark the SparkSession
+ */
+ def updateFQDNVertex(v_FQDN_DF:DataFrame,spark:SparkSession): Unit ={
+ v_FQDN_DF.printSchema()
+
+ val v_FQDN_Cursor_DF = CursorTransform.cursorToDataFrame("V_FQDN",classOf[BaseVertexFqdn],spark)
+
+ val v_Fqdn_Join_Df = v_FQDN_DF
+ .join(v_FQDN_Cursor_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Cursor_DF("key"),"fullouter")
+ v_Fqdn_Join_Df.printSchema()
+
+ v_Fqdn_Join_Df.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val v_FQDN_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
+ val docs_Replace = new java.util.ArrayList[BaseVertexFqdn]()
+ val docs_Insert = new java.util.ArrayList[BaseVertexFqdn]()
+
+ iter.foreach(row => {
+ val new_fqdn_name = row.getAs[String]("new_fqdn_name")
+ val new_fqdn_first_found_time = row.getAs[Long]("new_fqdn_first_found_time")
+ val new_fqdn_last_found_time = row.getAs[Long]("new_fqdn_last_found_time")
+ val new_fqdn_count_total = row.getAs[Long]("new_fqdn_count_total")
+
+ val fqdn = row.getAs[String]("key")
+ val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
+ val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
+
+ if (fqdn != null) {
+ val document: BaseVertexFqdn = new BaseVertexFqdn()
+ document.setKey(new_fqdn_name)
+ document.setFQDN_NAME(new_fqdn_name)
+ document.setFQDN_FIRST_FOUND_TIME(v_Fqdn_First)
+ document.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
+ document.setFQDN_COUNT_TOTAL(v_Fqdn_Cnt+new_fqdn_count_total)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument: BaseVertexFqdn = new BaseVertexFqdn()
+ baseDocument.setKey(new_fqdn_name)
+ baseDocument.setFQDN_NAME(new_fqdn_name)
+ baseDocument.setFQDN_FIRST_FOUND_TIME(new_fqdn_first_found_time)
+ baseDocument.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
+ baseDocument.setFQDN_COUNT_TOTAL(new_fqdn_count_total)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(v_FQDN_Coll.replaceDocuments(docs_Replace))
+ Try(v_FQDN_Coll.importDocuments(docs_Insert))
+ })
+
+ }
+
+ /**
+    * Update the IP vertices
+    * @param v_IP_DF DataFrame with the result set read from ClickHouse
+    * @param spark the SparkSession
+ */
+ def updateIPVertex(v_IP_DF:DataFrame,spark:SparkSession): Unit ={
+ v_IP_DF.printSchema()
+
+ val v_IP_Cursor_DF = CursorTransform.cursorToDataFrame("V_IP",classOf[BaseVertexIP],spark)
+
+ val v_IP_Join_DF = v_IP_DF.join(v_IP_Cursor_DF,v_IP_DF("new_ip")===v_IP_Cursor_DF("key"),"fullouter")
+ v_IP_Join_DF.printSchema()
+
+ v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val v_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
+ val docs_Insert: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
+ val docs_Replace: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
+
+ iter.foreach(row => {
+ val new_Ip = row.getAs[String]("new_ip")
+ val new_Location = row.getAs[String]("new_location")
+ val new_Ip_First_Found_Time = row.getAs[Long]("new_ip_first_found_time")
+ val new_Ip_Last_Found_Time = row.getAs[Long]("new_ip_last_found_time")
+ val new_Ip_Count_Total = row.getAs[Long]("new_ip_count_total")
+
+ val key = row.getAs[String]("key")
+ val location = row.getAs[String]("IP_LOCATION")
+ val v_IP_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val v_IP_Cnt = row.getAs[Long]("IP_APPEAR_COUNT")
+
+
+ if (key != null) {
+ val document = new BaseVertexIP()
+ document.setKey(key)
+ document.setIP(key)
+ document.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
+ document.setIP_APPEAR_COUNT(v_IP_Cnt+new_Ip_Count_Total)
+ document.setFIRST_FOUND_TIME(v_IP_First)
+ document.setIP_LOCATION(location)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument = new BaseVertexIP()
+ baseDocument.setKey(new_Ip)
+ baseDocument.setIP(new_Ip)
+ baseDocument.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
+ baseDocument.setIP_APPEAR_COUNT(new_Ip_Count_Total)
+ baseDocument.setFIRST_FOUND_TIME(new_Ip_First_Found_Time)
+ baseDocument.setIP_LOCATION(new_Location)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(v_IP_Coll.importDocuments(docs_Insert))
+ Try(v_IP_Coll.updateDocuments(docs_Replace))
+ })
+
+ }
+
+ /**
+    * Aggregate and update the e_Address_Fqdn_to_IP edges
+    * @param e_Address_v_FQDN_to_v_IP_DF DataFrame with the result set read from ClickHouse
+    * @param spark the SparkSession
+ */
+ def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame,spark:SparkSession): Unit ={
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ val e_Fqdn_Address_IP_Cursor_DF = CursorTransform
+ .cursorToDataFrame("E_ADDRESS_V_FQDN_TO_V_IP",classOf[BaseEgdeFqdnAddressIP],spark)
+
+ e_Fqdn_Address_IP_Cursor_DF.printSchema()
+
+ val e_Address_v_FQDN_to_v_IP_Join_DF = e_Address_v_FQDN_to_v_IP_DF
+ .join(e_Fqdn_Address_IP_Cursor_DF,
+ e_Address_v_FQDN_to_v_IP_DF("new_key")===e_Fqdn_Address_IP_Cursor_DF("key"),
+ "fullouter")
+
+ e_Address_v_FQDN_to_v_IP_Join_DF.printSchema()
+
+ e_Address_v_FQDN_to_v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val e_Add_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
+ val docs_Insert: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
+ val docs_Replace: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
+ iter.foreach(row => {
+ val new_Fqdn = row.getAs[String]("new_fqdn")
+ val new_IP = row.getAs[String]("new_ip")
+ val new_Key = row.getAs[String]("new_key")
+ val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
+ val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
+ val new_Count_Total = row.getAs[Long]("new_count_total")
+
+ val from = row.getAs[String]("from")
+ val to = row.getAs[String]("to")
+ val key = row.getAs[String]("key")
+ val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
+
+ if (key != null) {
+ val document = new BaseEgdeFqdnAddressIP()
+ document.setKey(key)
+ document.setFrom(from)
+ document.setTo(to)
+ document.setLAST_FOUND_TIME(new_Last_Found_Time)
+ document.setFIRST_FOUND_TIME(e_First_Time)
+ document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument: BaseEgdeFqdnAddressIP = new BaseEgdeFqdnAddressIP()
+ baseDocument.setKey(new_Key)
+ baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
+ baseDocument.setTo(s"V_IP/$new_IP")
+ baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
+ baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
+ baseDocument.setCOUNT_TOTAL(new_Count_Total)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Add_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
+ })
+
+ }
+
+
+ /**
+    * Aggregate and update the e_Visit_v_IP_to_v_FQDN edges
+    * @param e_Visit_v_IP_to_v_FQDN_DF DataFrame with the result set read from ClickHouse
+    * @param spark the SparkSession
+ */
+ def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame,spark:SparkSession): Unit = {
+ e_Visit_v_IP_to_v_FQDN_DF.printSchema()
+ val e_IP_Visit_FQDN_Cursor_DF = CursorTransform
+ .cursorToDataFrame("E_VISIT_V_IP_TO_V_FQDN",classOf[BaseEdgeIPVisitFqdn],spark)
+
+ e_IP_Visit_FQDN_Cursor_DF.printSchema()
+
+ val e_Visit_v_IP_to_v_FQDN_Join_DF = e_Visit_v_IP_to_v_FQDN_DF
+ .join(e_IP_Visit_FQDN_Cursor_DF, e_Visit_v_IP_to_v_FQDN_DF("new_key") === e_IP_Visit_FQDN_Cursor_DF("key"), "fullouter")
+
+ e_Visit_v_IP_to_v_FQDN_Join_DF.printSchema()
+
+ e_Visit_v_IP_to_v_FQDN_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val e_Visit_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
+ val docs_Insert: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
+ val docs_Replace: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
+ iter.foreach(row => {
+ val new_Fqdn = row.getAs[String]("new_fqdn")
+ val new_IP = row.getAs[String]("new_ip")
+ val new_Key = row.getAs[String]("new_key")
+ val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
+ val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
+ val new_Count_Total = row.getAs[Long]("new_count_total")
+
+ val to = row.getAs[String]("to")
+ val from = row.getAs[String]("from")
+ val key = row.getAs[String]("key")
+ val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
+
+ if (key != null) {
+ val document = new BaseEdgeIPVisitFqdn()
+ document.setKey(key)
+ document.setFrom(from)
+ document.setTo(to)
+ document.setLAST_FOUND_TIME(new_Last_Found_Time)
+ document.setFIRST_FOUND_TIME(e_First_Time)
+ document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
+ docs_Replace.add(document)
+ } else {
+ val baseDocument: BaseEdgeIPVisitFqdn = new BaseEdgeIPVisitFqdn()
+ baseDocument.setKey(new_Key)
+ baseDocument.setFrom(s"V_IP/$new_IP")
+ baseDocument.setTo(s"V_FQDN/$new_Fqdn")
+ baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
+ baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
+ baseDocument.setCOUNT_TOTAL(new_Count_Total)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Visit_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
+ })
+
+ }
+
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/etl/CursorTransform.scala b/ip-learning/src/main/scala/cn/ac/iie/etl/CursorTransform.scala
new file mode 100644
index 0000000..9d309f2
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/etl/CursorTransform.scala
@@ -0,0 +1,33 @@
+package cn.ac.iie.etl
+
+import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
+import com.arangodb.ArangoCursor
+import com.arangodb.entity.BaseDocument
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.reflect.ClassTag
+
+object CursorTransform {
+
+ /**
+    * Convert the result set of an ArangoDB query into a DataFrame
+    * @param collection_Name the collection to query
+    * @param class_Type the POJO class the documents are mapped to
+    * @param spark the SparkSession
+ * @tparam T
+ * @return
+ */
+ def cursorToDataFrame[T:ClassTag](collection_Name:String,class_Type: Class[T],spark:SparkSession): DataFrame ={
+ val query = s"FOR doc IN $collection_Name RETURN doc"
+ println(query)
+ val cursor: ArangoCursor[T] = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
+ .query(query, InitArangoDBPool.bindVars, InitArangoDBPool.options, class_Type)
+
+ val cursor_DF = spark.createDataFrame(cursor.asListRemaining(),class_Type)
+ cursor_DF.printSchema()
+
+ cursor_DF
+
+ }
+
+}
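cursorToDataFrame issues a full-collection AQL scan and maps the cursor into a DataFrame through the given POJO class. A usage sketch matching the call shape used by UpdateArangoGraphByDF is shown below; the SparkSession settings are placeholders:

import cn.ac.iie.etl.CursorTransform
import cn.ac.iie.pojo.BaseVertexFqdn
import org.apache.spark.sql.SparkSession

// Usage sketch: load the V_FQDN collection into a DataFrame keyed by "key".
object CursorTransformSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cursor-transform-sketch") // placeholder settings
      .master("local[*]")
      .getOrCreate()

    // Same call shape as UpdateArangoGraphByDF.updateFQDNVertex.
    val vFqdnCursorDf = CursorTransform.cursorToDataFrame("V_FQDN", classOf[BaseVertexFqdn], spark)
    vFqdnCursorDf.show(10)

    spark.stop()
  }
}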
diff --git a/ip-learning/src/main/scala/cn/ac/iie/main/IPLearningApplication.scala b/ip-learning/src/main/scala/cn/ac/iie/main/IPLearningApplication.scala
new file mode 100644
index 0000000..39c5182
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/main/IPLearningApplication.scala
@@ -0,0 +1,29 @@
+package cn.ac.iie.main
+
+import cn.ac.iie.test.Config
+import cn.ac.iie.dao.BaseMediaDataLoad
+import org.apache.spark.sql.SparkSession
+import org.slf4j.{Logger, LoggerFactory}
+
+object IPLearningApplication {
+ private val logger: Logger = LoggerFactory.getLogger(IPLearningApplication.getClass)
+
+ def main(args: Array[String]): Unit = {
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ // .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .master(Config.MASTER)
+ .getOrCreate()
+
+ logger.warn("SparkSession obtained successfully")
+ BaseMediaDataLoad.loadMediaDate(spark)
+// BaseMediaDataLoad.
+
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEdgeIPVisitFqdn.scala b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEdgeIPVisitFqdn.scala
new file mode 100644
index 0000000..6bb4298
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEdgeIPVisitFqdn.scala
@@ -0,0 +1,34 @@
+package cn.ac.iie.pojo
+
+import com.arangodb.entity.DocumentField
+import com.arangodb.entity.DocumentField.Type
+
+import scala.beans.BeanProperty
+
+class BaseEdgeIPVisitFqdn {
+ @BeanProperty
+ @DocumentField(Type.FROM)
+ var from: String=""
+
+ @BeanProperty
+ @DocumentField(Type.TO)
+ var to: String=""
+
+ @BeanProperty
+ @DocumentField(Type.KEY)
+ var key: String=""
+
+ @BeanProperty
+ @DocumentField(Type.ID)
+ var id: String=""
+
+ @BeanProperty
+ var FIRST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var LAST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var COUNT_TOTAL:Long = 0
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEgdeFqdnAddressIP.scala b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEgdeFqdnAddressIP.scala
new file mode 100644
index 0000000..89424b1
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseEgdeFqdnAddressIP.scala
@@ -0,0 +1,34 @@
+package cn.ac.iie.pojo
+
+import com.arangodb.entity.DocumentField
+import com.arangodb.entity.DocumentField.Type
+
+import scala.beans.BeanProperty
+
+class BaseEgdeFqdnAddressIP {
+ @BeanProperty
+ @DocumentField(Type.FROM)
+ var from: String=""
+
+ @BeanProperty
+ @DocumentField(Type.TO)
+ var to: String=""
+
+ @BeanProperty
+ @DocumentField(Type.KEY)
+ var key: String=""
+
+ @BeanProperty
+ @DocumentField(Type.ID)
+ var id: String=""
+
+ @BeanProperty
+ var FIRST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var LAST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var COUNT_TOTAL:Long = 0
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexFqdn.scala b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexFqdn.scala
new file mode 100644
index 0000000..eb7983c
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexFqdn.scala
@@ -0,0 +1,30 @@
+package cn.ac.iie.pojo
+
+import com.arangodb.entity.DocumentField
+import com.arangodb.entity.DocumentField.Type
+
+import scala.beans.BeanProperty
+
+class BaseVertexFqdn {
+
+ @BeanProperty
+ @DocumentField(Type.KEY)
+ var key: String=""
+
+ @BeanProperty
+ @DocumentField(Type.ID)
+ var id: String=""
+
+ @BeanProperty
+ var FQDN_FIRST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var FQDN_LAST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var FQDN_COUNT_TOTAL:Long = 0
+
+ @BeanProperty
+ var FQDN_NAME:String = ""
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexIP.scala b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexIP.scala
new file mode 100644
index 0000000..037b632
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/pojo/BaseVertexIP.scala
@@ -0,0 +1,32 @@
+package cn.ac.iie.pojo
+
+import com.arangodb.entity.DocumentField
+import com.arangodb.entity.DocumentField.Type
+
+import scala.beans.BeanProperty
+
+class BaseVertexIP {
+ @BeanProperty
+ @DocumentField(Type.KEY)
+ var key: String=""
+
+ @BeanProperty
+ @DocumentField(Type.ID)
+ var id: String=""
+
+ @BeanProperty
+ var FIRST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var LAST_FOUND_TIME:Long = 0
+
+ @BeanProperty
+ var IP_APPEAR_COUNT:Long = 0
+
+ @BeanProperty
+ var IP:String = ""
+
+ @BeanProperty
+ var IP_LOCATION:String = ""
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDBSparkTest.scala b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDBSparkTest.scala
new file mode 100644
index 0000000..f520f3c
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDBSparkTest.scala
@@ -0,0 +1,52 @@
+package cn.ac.iie.test
+
+import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByArangoSpark}
+import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
+import cn.ac.iie.utils.ConfigUtils
+import com.arangodb.spark.rdd.ArangoRDD
+import com.arangodb.spark.{ArangoSpark, ReadOptions}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object ArangoDBSparkTest {
+ def main(args: Array[String]): Unit = {
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName(ConfigUtils.SPARK_APP_NAME)
+ .config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
+ .config("spark.network.timeout", ConfigUtils.SPARK_NETWORK_TIMEOUT)
+ .config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", ConfigUtils.SPARK_EXECUTOR_MEMORY)
+ .config("arangodb.hosts", "192.168.40.127:8529")
+ .config("arangodb.user", ConfigUtils.ARANGODB_USER)
+ .config("arangodb.password", ConfigUtils.ARANGODB_PASSWORD)
+ .config("arangodb.maxConnections",ConfigUtils.MAXPOOLSIZE)
+ .master(ConfigUtils.MASTER)
+ .getOrCreate()
+
+ BaseMediaDataLoad.loadMediaDate(spark)
+ val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
+ val v_IP_DF = BaseMediaDataLoad.getIPVertexFromMedia(spark)
+ val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
+ val e_Visit_v_IP_to_v_FQDN_DF= BaseMediaDataLoad.getIPVisitFQDNEdgeFromMedia(spark)
+
+ val v_FQDN_Cursor_Rdd: ArangoRDD[BaseVertexFqdn] = ArangoSpark.load[BaseVertexFqdn](spark.sparkContext, "V_FQDN", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
+ val v_FQDN_Cursor_DF: DataFrame = spark.createDataFrame(v_FQDN_Cursor_Rdd,classOf[BaseVertexFqdn])
+
+ val v_IP_Cursor_Rdd: ArangoRDD[BaseVertexIP] = ArangoSpark.load[BaseVertexIP](spark.sparkContext, "V_IP", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
+ val v_IP_Cursor_DF: DataFrame = spark.createDataFrame(v_IP_Cursor_Rdd,classOf[BaseVertexIP])
+
+ val e_Fqdn_Address_IP_Cursor_Rdd: ArangoRDD[BaseEgdeFqdnAddressIP] = ArangoSpark.load[BaseEgdeFqdnAddressIP](spark.sparkContext, "E_ADDRESS_V_FQDN_TO_V_IP", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
+ val e_Fqdn_Address_IP_Cursor_DF: DataFrame = spark.createDataFrame(e_Fqdn_Address_IP_Cursor_Rdd,classOf[BaseEgdeFqdnAddressIP])
+
+ val e_IP_Visit_FQDN_Cursor_Rdd: ArangoRDD[BaseEdgeIPVisitFqdn] = ArangoSpark.load[BaseEdgeIPVisitFqdn](spark.sparkContext, "E_VISIT_V_IP_TO_V_FQDN", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
+ val e_IP_Visit_FQDN_Cursor_DF: DataFrame = spark.createDataFrame(e_IP_Visit_FQDN_Cursor_Rdd,classOf[BaseEdgeIPVisitFqdn])
+
+ UpdateArangoGraphByArangoSpark.updateFQDNVertex(v_FQDN_DF,v_FQDN_Cursor_DF)
+ UpdateArangoGraphByArangoSpark.updateIPVertex(v_IP_DF,v_IP_Cursor_DF)
+ UpdateArangoGraphByArangoSpark.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,e_Fqdn_Address_IP_Cursor_DF)
+ UpdateArangoGraphByArangoSpark.updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF,e_IP_Visit_FQDN_Cursor_DF)
+ spark.close()
+ System.exit(1)
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbReadV_IPTest.scala b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbReadV_IPTest.scala
new file mode 100644
index 0000000..e570e58
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbReadV_IPTest.scala
@@ -0,0 +1,37 @@
+package cn.ac.iie.test
+
+import com.arangodb.entity.BaseDocument
+import com.arangodb.model.AqlQueryOptions
+import com.arangodb.util.MapBuilder
+import com.arangodb.{ArangoCursor, ArangoDB}
+
+object ArangoDbReadV_IPTest {
+ @transient
+ var arangoDB: ArangoDB = _
+
+ def main(args: Array[String]): Unit = {
+ arangoDB = new ArangoDB.Builder()
+ .maxConnections(Config.MAXPOOLSIZE)
+ .host("192.168.40.127", 8529)
+ .user("root")
+ .password("111111")
+ .build
+ val bindVars = new MapBuilder().get
+ val options = new AqlQueryOptions()
+ .ttl(Config.ARANGODB_TTL)
+ val v_IP_Mutabal_Map = new java.util.HashMap[String,BaseDocument](16048576,0.9f)
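+    // Pre-sized (~16M buckets, 0.9 load factor) so the whole V_IP collection can be held in
+    // memory without rehashing while the cursor below is drained into the map.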
+ val v_IP_Query = "FOR doc IN V_IP RETURN doc"
+ val v_IP_Cursor: ArangoCursor[BaseDocument] = arangoDB.db("insert_iplearn_index")
+ .query(v_IP_Query, bindVars, options, classOf[BaseDocument])
+
+ while (v_IP_Cursor.hasNext){
+ val document = v_IP_Cursor.next()
+ v_IP_Mutabal_Map.put(document.getKey ,document)
+ }
+
+ println(v_IP_Mutabal_Map.size())
+ arangoDB.shutdown()
+
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTest.scala b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTest.scala
new file mode 100644
index 0000000..e80b02b
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTest.scala
@@ -0,0 +1,314 @@
+package cn.ac.iie.test
+
+import com.arangodb.ArangoDB
+import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.util.Try
+
+object ArangoDbTest {
+  private val logger: Logger = LoggerFactory.getLogger(ArangoDbTest.getClass)
+
+ @transient
+ var arangoDB: ArangoDB = _
+
+ def main(args: Array[String]): Unit = {
+ // val warehouseLocation = new File("spark-warehouse").getAbsolutePath
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", Config.SPARK_SERIALIZER)
+ // .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .master(Config.MASTER)
+
+ /*
+ .config("spark.driver.host", "192.168.41.79")
+ .config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
+ .master("spark://192.168.40.119:7077")
+ */
+ .getOrCreate()
+    logger.warn("SparkSession created successfully")
+
+ // val sql = "(select * from av_miner.media_expire_patch_local limit 1000)"
+
+ val mediaDataFrame: DataFrame = spark.read.format("jdbc")
+ .option("url", "jdbc:clickhouse://192.168.40.193:8123")
+ // .option("dbtable", "av_miner.media_expire_patch")
+ // .option("dbtable", "(select * from av_miner.media_expire_patch limit 10)")
+ // .option("dbtable","(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where media_domain not LIKE '%\\n%')")
+ .option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
+ // .option("dbtable","(select media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region,min(recv_time) as min_recv_time,max(recv_time) as max_recv_time from av_miner.media_expire_patch group by media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region limit 10)")
+ .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+ .option("user", "default")
+ .option("password", "111111")
+ .option("numPartitions", Config.NUMPARTITIONS)
+ .option("partitionColumn", "recv_time")
+ .option("lowerBound", Config.MINTIME)
+ .option("upperBound", Config.MAXTIME)
+ .option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
+ .load()
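+    // numPartitions/partitionColumn/lowerBound/upperBound split the ClickHouse read into parallel
+    // JDBC range queries over recv_time instead of pulling everything through a single cursor.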
+ // mediaDataFrame.printSchema()
+ mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
+ // val mediaDataGlobalView = spark.sql("select * from global_temp.media_expire_patch limit 10")
+ // mediaDataGlobalView.show()
+
+ val v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS FQDN_NAME,
+ | MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
+ | COUNT( * ) AS FQDN_COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | media_domain != ''
+ |GROUP BY
+ | media_domain
+ """.stripMargin
+ )
+ val s_IP_DF = spark.sql(
+ """
+ select
+ s1_s_ip as IP,
+ s1_s_location_region as location,
+ MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
+ MAX( recv_time ) AS IP_LAST_FOUND_TIME,
+ COUNT( * ) AS IP_COUNT_TOTAL
+ from global_temp.media_expire_patch
+ GROUP BY
+ IP,
+ location
+ """.stripMargin)
+
+ val d_IP_DF = spark.sql(
+ """
+ select
+ s1_d_ip as IP,
+ s1_d_location_region as location,
+ MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
+ MAX( recv_time ) AS IP_LAST_FOUND_TIME,
+ COUNT( * ) AS IP_COUNT_TOTAL
+ from global_temp.media_expire_patch
+ GROUP BY
+ IP,
+ location
+ """.stripMargin)
+ import org.apache.spark.sql.functions._
+ val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP", "location").agg(
+ min("IP_FIRST_FOUND_TIME").as("IP_FIRST_FOUND_TIME"),
+ max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
+      sum("IP_COUNT_TOTAL").as("IP_COUNT_TOTAL")  // sum the per-direction counts rather than counting union rows
+ )
+
+ val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS V_FQDN,
+ | s1_d_ip AS V_IP,
+ | MIN( recv_time ) AS FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS LAST_FOUND_TIME,
+ | COUNT( * ) AS COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( media_domain != '' )
+ | AND ( s1_d_ip != '' )
+ |GROUP BY
+ | s1_d_ip,
+ | media_domain
+ """.stripMargin)
+
+ val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | s1_s_ip AS V_IP,
+ | media_domain AS V_FQDN,
+ | MIN( recv_time ) AS FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS LAST_FOUND_TIME,
+ | COUNT( * ) AS COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( s1_s_ip != '' )
+ | AND ( media_domain != '' )
+ |GROUP BY
+ | s1_s_ip,
+ | media_domain
+ """.stripMargin)
+
+
+ /**
+      * Acquire the ArangoDB connection
+ */
+ arangoDB = new ArangoDB.Builder()
+ .maxConnections(Config.MAXPOOLSIZE)
+ .host("192.168.40.127", 8529)
+ .user("root")
+ .password("111111")
+ .build
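+    // Note: this client is only initialised here in main() on the driver and is then referenced
+    // inside the foreachPartition closures below, so the pattern assumes a local master; on remote
+    // executors the singleton's field would still be null.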
+
+
+ /**
+      * Upsert FQDN vertices
+ */
+ v_FQDN_DF.printSchema()
+ v_FQDN_DF.foreachPartition(iter => {
+ val v_FQDN_Coll = arangoDB.db("insert_iplearn_index").collection("V_FQDN")
+ val docs_Insert = new java.util.ArrayList[BaseDocument]()
+ val docs_Update = new java.util.ArrayList[BaseDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("FQDN_NAME")
+ val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
+ val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
+ val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
+
+ if (v_FQDN_Coll.documentExists(fqdn)) {
+ val document: BaseDocument = v_FQDN_Coll.getDocument(fqdn, classOf[BaseDocument])
+ val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
+          document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt + v_Fqdn_Cnt)
+ document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseDocument = new BaseDocument()
+ baseDocument.setKey(fqdn)
+ baseDocument.addAttribute("FQDN_NAME", fqdn)
+ baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
+ baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
+ baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
+ docs_Insert.add(baseDocument)
+ }
+ i+=1
+ })
+ Try(v_FQDN_Coll.importDocuments(docs_Insert))
+ Try(v_FQDN_Coll.updateDocuments(docs_Update))
+ })
+
+
+ /**
+      * Upsert IP vertices
+ */
+ v_IP_DF.printSchema()
+ v_IP_DF.foreachPartition(iter => {
+ val v_IP_Coll = arangoDB.db("insert_iplearn_index").collection("V_IP")
+ val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
+ val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
+ var i = 0
+
+ iter.foreach(row => {
+ val ip = row.getAs[String]("IP")
+ val location = row.getAs[String]("location")
+ val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
+ val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
+ val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
+
+ if (v_IP_Coll.documentExists(ip)) {
+ val document: BaseDocument = v_IP_Coll.getDocument(ip, classOf[BaseDocument])
+ val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
+ document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseDocument = new BaseDocument()
+ baseDocument.setKey(ip)
+ baseDocument.addAttribute("IP", ip)
+ baseDocument.addAttribute("IP_LOCATION", location)
+ baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
+ baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
+ docs_Insert.add(baseDocument)
+ }
+ i+=1
+ })
+ Try(v_IP_Coll.importDocuments(docs_Insert))
+ Try(v_IP_Coll.updateDocuments(docs_Update))
+ })
+
+ /**
+      * Aggregate and upsert the e_Address_Fqdn_to_IP edges
+ */
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ e_Address_v_FQDN_to_v_IP_DF.foreachPartition(iter => {
+ val e_Add_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_ADDRESS_V_FQDN_TO_V_IP")
+ val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+
+ if (e_Add_Fqdn_to_IP_Coll.documentExists(fqdn+"-"+ip)) {
+ val document: BaseEdgeDocument = e_Add_Fqdn_to_IP_Coll.getDocument(fqdn+"-"+ip, classOf[BaseEdgeDocument])
+ val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", e_Last)
+ document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
+ baseDocument.setKey(fqdn+"-"+ip)
+ baseDocument.setFrom(s"V_FQDN/$fqdn")
+ baseDocument.setTo(s"V_IP/$ip")
+ baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
+ baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
+ docs_Insert.add(baseDocument)
+ }
+// println(fqdn+"-"+ip)
+
+ })
+ Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
+ })
+
+ /**
+      * Aggregate and upsert the e_Visit_v_IP_to_v_FQDN edges
+ */
+ e_Visit_v_IP_to_v_FQDN_DF.printSchema()
+ e_Visit_v_IP_to_v_FQDN_DF.foreachPartition(iter => {
+ val e_Visit_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_VISIT_V_IP_TO_V_FQDN")
+ val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+
+ if (e_Visit_Fqdn_to_IP_Coll.documentExists(ip+"-"+fqdn)) {
+ val document: BaseEdgeDocument = e_Visit_Fqdn_to_IP_Coll.getDocument(ip+"-"+fqdn, classOf[BaseEdgeDocument])
+ val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", e_Last)
+ document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
+ baseDocument.setKey(ip+"-"+fqdn)
+ baseDocument.setFrom(s"V_IP/$ip")
+ baseDocument.setTo(s"V_FQDN/$fqdn")
+ baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
+ baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
+ docs_Insert.add(baseDocument)
+ }
+ i+=1
+ })
+ Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
+
+ })
+
+ arangoDB.shutdown()
+ }
+
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemory.scala b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemory.scala
new file mode 100644
index 0000000..7267619
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemory.scala
@@ -0,0 +1,355 @@
+package cn.ac.iie.test
+
+import cn.ac.iie.utils.ConfigUtils
+import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
+import com.arangodb.model.AqlQueryOptions
+import com.arangodb.util.MapBuilder
+import com.arangodb.{ArangoCursor, ArangoDB}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.util.Try
+
+object ArangoDbTestMemory {
+ private val logger: Logger = LoggerFactory.getLogger(ArangoDbTestMemory.getClass)
+ @transient
+ var arangoDB: ArangoDB = _
+
+
+ def main(args: Array[String]): Unit = {
+
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", Config.SPARK_SERIALIZER)
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .master(Config.MASTER)
+ .getOrCreate()
+    logger.warn("SparkSession created successfully")
+
+ val mediaDataFrame: DataFrame = spark.read.format("jdbc")
+ .option("url", "jdbc:clickhouse://192.168.40.193:8123")
+ .option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
+ .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+ .option("user", "default")
+ .option("password", "111111")
+ .option("numPartitions", Config.NUMPARTITIONS)
+ .option("partitionColumn", "recv_time")
+ .option("lowerBound", Config.MINTIME)
+ .option("upperBound", Config.MAXTIME)
+ .option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
+ .option("socket_timeout",Config.CLICKHOUSE_SOCKET_TIMEOUT)
+ .load()
+ mediaDataFrame.printSchema()
+ mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
+
+ val v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS FQDN_NAME,
+ | MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
+ | COUNT( * ) AS FQDN_COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | media_domain != ''
+ |GROUP BY
+ | media_domain
+ """.stripMargin
+ )
+ val s_IP_DF = spark.sql(
+ """
+ select
+ s1_s_ip as IP,
+ s1_s_location_region as location,
+ MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
+ MAX( recv_time ) AS IP_LAST_FOUND_TIME,
+ COUNT( * ) AS IP_COUNT_TOTAL
+ from global_temp.media_expire_patch
+ GROUP BY
+ IP,
+ location
+ """.stripMargin)
+
+ val d_IP_DF = spark.sql(
+ """
+ select
+ s1_d_ip as IP,
+ s1_d_location_region as location,
+ MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
+ MAX( recv_time ) AS IP_LAST_FOUND_TIME,
+ COUNT( * ) AS IP_COUNT_TOTAL
+ from global_temp.media_expire_patch
+ GROUP BY
+ IP,
+ location
+ """.stripMargin)
+ import org.apache.spark.sql.functions._
+ val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP", "location").agg(
+ min("IP_FIRST_FOUND_TIME").as("IP_FIRST_FOUND_TIME"),
+ max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
+      sum("IP_COUNT_TOTAL").as("IP_COUNT_TOTAL")  // sum the per-direction counts rather than counting union rows
+ )
+
+ val e_Address_v_FQDN_to_v_IP_DF: DataFrame = spark.sql(
+ """
+ |SELECT
+ | media_domain AS V_FQDN,
+ | s1_d_ip AS V_IP,
+ | MIN( recv_time ) AS FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS LAST_FOUND_TIME,
+ | COUNT( * ) AS COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( media_domain != '' )
+ | AND ( s1_d_ip != '' )
+ |GROUP BY
+ | s1_d_ip,
+ | media_domain
+ """.stripMargin)
+
+ val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | s1_s_ip AS V_IP,
+ | media_domain AS V_FQDN,
+ | MIN( recv_time ) AS FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS LAST_FOUND_TIME,
+ | COUNT( * ) AS COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( s1_s_ip != '' )
+ | AND ( media_domain != '' )
+ |GROUP BY
+ | s1_s_ip,
+ | media_domain
+ """.stripMargin)
+
+
+ /**
+      * Acquire the ArangoDB connection
+ */
+ arangoDB = new ArangoDB.Builder()
+ .maxConnections(Config.MAXPOOLSIZE)
+ .host(ConfigUtils.ARANGODB_HOST, ConfigUtils.ARANGODB_PORT)
+ .user(ConfigUtils.ARANGODB_USER)
+ .password(ConfigUtils.ARANGODB_PASSWORD)
+ .build
+ val bindVars = new MapBuilder().get
+ val options = new AqlQueryOptions()
+ .ttl(Config.ARANGODB_TTL)
+
+ val v_FQDN_Mutabal_Map = new java.util.HashMap[String,BaseDocument](1048576,0.9f)
+ val v_IP_Mutabal_Map = new java.util.HashMap[String,BaseDocument](16048576,0.9f)
+ val e_FQDN_Address_IP_Mutabal_Map = new java.util.HashMap[String,BaseEdgeDocument](1048576,0.9f)
+ val e_IP_Visit_FQDN_Mutabal_Map = new java.util.HashMap[String,BaseEdgeDocument](30408576,0.9f)
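+    // These maps snapshot the existing graph collections into driver memory (and are broadcast
+    // below), trading RAM for avoiding a per-document existence check against ArangoDB.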
+
+ /**
+      * Upsert FQDN vertices
+ */
+ val v_FQDN_Query = "FOR doc IN V_FQDN RETURN doc"
+ val v_FQDN_Cursor: ArangoCursor[BaseDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
+ .query(v_FQDN_Query, bindVars, options, classOf[BaseDocument])
+ while (v_FQDN_Cursor.hasNext){
+ val document = v_FQDN_Cursor.next()
+ v_FQDN_Mutabal_Map.put(document.getKey,document)
+ }
+ val v_FQDN_Map= spark.sparkContext.broadcast(v_FQDN_Mutabal_Map)
+ v_FQDN_Mutabal_Map.clear()
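+    // The local map is cleared once broadcast to free driver memory; this assumes executors read
+    // their own deserialized copy of the broadcast (i.e. a non-local master) rather than sharing
+    // the driver-side object.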
+ v_FQDN_DF.show(20)
+ v_FQDN_DF.printSchema()
+ v_FQDN_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val v_FQDN_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
+ val docs_Insert = new java.util.ArrayList[BaseDocument]()
+ val docs_Update = new java.util.ArrayList[BaseDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("FQDN_NAME")
+ val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
+ val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
+ val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
+
+ val doc = v_FQDN_Map.value.getOrDefault(fqdn, null)
+ if (doc != null) {
+ val document: BaseDocument = doc
+ val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
+          document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt + v_Fqdn_Cnt)
+ document.addAttribute("LAST_FOUND_TIME", v_Fqdn_Last)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseDocument = new BaseDocument()
+ baseDocument.setKey(fqdn)
+ baseDocument.addAttribute("FQDN_NAME", fqdn)
+ baseDocument.addAttribute("FIRST_FOUND_TIME", v_Fqdn_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME", v_Fqdn_Last)
+ baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
+ docs_Insert.add(baseDocument)
+ }
+ })
+// Try(v_FQDN_Coll.importDocuments(docs_Insert))
+ v_FQDN_Coll.importDocuments(docs_Insert)
+ Try(v_FQDN_Coll.updateDocuments(docs_Update))
+ })
+ v_FQDN_Map.destroy()
+
+ /**
+      * Upsert IP vertices
+ */
+ val v_IP_Query = "FOR doc IN V_IP RETURN doc"
+ val v_IP_Cursor: ArangoCursor[BaseDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
+ .query(v_IP_Query, bindVars, options, classOf[BaseDocument])
+ while (v_IP_Cursor.hasNext){
+ val document = v_IP_Cursor.next()
+ v_IP_Mutabal_Map.put(document.getKey ,document)
+ }
+ val v_IP_Map = spark.sparkContext.broadcast(v_IP_Mutabal_Map)
+// val v_IP_Map = v_IP_Mutabal_Map.toMap
+ v_IP_Mutabal_Map.clear()
+ v_IP_DF.printSchema()
+ v_IP_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val v_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
+ val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
+ val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
+ var i = 0
+
+ iter.foreach(row => {
+ val ip = row.getAs[String]("IP")
+ val location = row.getAs[String]("location")
+ val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
+ val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
+ val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
+ val doc = v_IP_Map.value.getOrDefault(ip, null)
+
+ if (doc != null) {
+ val document: BaseDocument = doc
+ val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
+ document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseDocument = new BaseDocument()
+ baseDocument.setKey(ip)
+ baseDocument.addAttribute("IP", ip)
+ baseDocument.addAttribute("IP_LOCATION", location)
+ baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
+ baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(v_IP_Coll.importDocuments(docs_Insert))
+ Try(v_IP_Coll.updateDocuments(docs_Update))
+ })
+ v_IP_Map.destroy()
+
+
+ /**
+      * Aggregate and upsert the e_Address_Fqdn_to_IP edges
+ */
+ val e_FQDN_Address_IP_Query = "FOR doc IN E_ADDRESS_V_FQDN_TO_V_IP RETURN doc"
+ val e_FQDN_Address_IP_Cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
+ .query(e_FQDN_Address_IP_Query, bindVars, options, classOf[BaseEdgeDocument])
+ while (e_FQDN_Address_IP_Cursor.hasNext){
+ val document = e_FQDN_Address_IP_Cursor.next()
+ e_FQDN_Address_IP_Mutabal_Map.put(document.getKey ,document)
+ }
+ val e_FQDN_Address_IP_Map = spark.sparkContext.broadcast(e_FQDN_Address_IP_Mutabal_Map)
+ e_FQDN_Address_IP_Mutabal_Map.clear()
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ e_Address_v_FQDN_to_v_IP_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val e_Add_Fqdn_to_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
+ val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+ val doc = e_FQDN_Address_IP_Map.value.getOrDefault(fqdn+"-"+ip, null)
+ if (doc != null) {
+ val document: BaseEdgeDocument = doc
+ val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
+ document.setFrom(s"V_FQDN/$fqdn")
+ document.setTo(s"V_IP/$ip")
+ document.addAttribute("LAST_FOUND_TIME", e_Last)
+ document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
+ baseDocument.setKey(fqdn+"-"+ip)
+ baseDocument.setFrom(s"V_FQDN/$fqdn")
+ baseDocument.setTo(s"V_IP/$ip")
+ baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
+ baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
+ })
+ e_FQDN_Address_IP_Map.destroy()
+
+
+ /**
+      * Aggregate and upsert the e_Visit_v_IP_to_v_FQDN edges
+ */
+ val e_IP_Visit_FQDN_Query = "FOR doc IN E_VISIT_V_IP_TO_V_FQDN RETURN doc"
+ val e_IP_Visit_FQDN_Cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
+ .query(e_IP_Visit_FQDN_Query, bindVars, options, classOf[BaseEdgeDocument])
+ while (e_IP_Visit_FQDN_Cursor.hasNext){
+ val document = e_IP_Visit_FQDN_Cursor.next()
+ e_IP_Visit_FQDN_Mutabal_Map.put(document.getKey ,document)
+ }
+ val e_IP_Visit_FQDN_Map = spark.sparkContext.broadcast(e_IP_Visit_FQDN_Mutabal_Map)
+
+ e_IP_Visit_FQDN_Mutabal_Map.clear()
+ e_Visit_v_IP_to_v_FQDN_DF.printSchema()
+ e_Visit_v_IP_to_v_FQDN_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
+ val e_Visit_Fqdn_to_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
+ val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
+ var i = 0
+ iter.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+ val doc = e_IP_Visit_FQDN_Map.value.getOrDefault(ip+"-"+fqdn, null)
+
+ if (doc != null) {
+ val document: BaseEdgeDocument = doc
+ val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
+ document.addAttribute("LAST_FOUND_TIME", e_Last)
+ document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
+ baseDocument.setKey(ip+"-"+fqdn)
+ baseDocument.setFrom("V_IP/"+ip)
+ baseDocument.setTo("V_FQDN/"+fqdn)
+ baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
+ baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
+ baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
+ docs_Insert.add(baseDocument)
+ }
+ })
+ Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
+ Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
+
+ })
+ e_IP_Visit_FQDN_Map.destroy()
+ arangoDB.shutdown()
+
+ spark.close()
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemoryGroupBy.scala b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemoryGroupBy.scala
new file mode 100644
index 0000000..5e51419
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/ArangoDbTestMemoryGroupBy.scala
@@ -0,0 +1,40 @@
+package cn.ac.iie.test
+
+import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByDF}
+import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
+import org.apache.spark.sql.SparkSession
+import org.slf4j.{Logger, LoggerFactory}
+
+object ArangoDbTestMemoryGroupBy {
+
+ private val logger: Logger = LoggerFactory.getLogger(ArangoDbTestMemoryGroupBy.getClass)
+
+ def main(args: Array[String]): Unit = {
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName(ConfigUtils.SPARK_APP_NAME)
+ .config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
+ .config("spark.network.timeout", ConfigUtils.SPARK_NETWORK_TIMEOUT)
+ .config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", ConfigUtils.SPARK_EXECUTOR_MEMORY)
+ .master(ConfigUtils.MASTER)
+ .getOrCreate()
+    logger.warn("SparkSession created successfully")
+
+ BaseMediaDataLoad.loadMediaDate(spark)
+ val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
+ val v_IP_DF = BaseMediaDataLoad.getIPVertexFromMedia(spark)
+ val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
+ val e_Visit_v_IP_to_v_FQDN_DF= BaseMediaDataLoad.getIPVisitFQDNEdgeFromMedia(spark)
+
+ UpdateArangoGraphByDF.updateFQDNVertex(v_FQDN_DF,spark)
+ UpdateArangoGraphByDF.updateIPVertex(v_IP_DF,spark)
+ UpdateArangoGraphByDF.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,spark)
+ UpdateArangoGraphByDF.updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF,spark)
+
+ InitArangoDBPool.arangoDB.shutdown()
+ spark.close()
+
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/Config.scala b/ip-learning/src/main/scala/cn/ac/iie/test/Config.scala
new file mode 100644
index 0000000..699cc16
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/Config.scala
@@ -0,0 +1,22 @@
+package cn.ac.iie.test
+
+import com.typesafe.config.{Config, ConfigFactory}
+
+object Config {
+ private lazy val config: Config = ConfigFactory.load()
+ val SPARK_SQL_SHUFFLE_PARTITIONS: String = config.getString("spark.sql.shuffle.partitions")
+ val SPARK_SQL_READ_FETCHSIZE: String = config.getString("spark.sql.read.fetchsize")
+ val SPARK_EXECUTOR_MEMORY: String = config.getString("spark.executor.memory")
+ val NUMPARTITIONS: String = config.getString("numPartitions")
+ val MASTER: String = config.getString("master")
+ val MAXPOOLSIZE: Int = config.getInt("maxPoolSize")
+ val MINTIME: String = config.getString("minTime")
+ val MAXTIME: String = config.getString("maxTime")
+
+ val REPARTITION_NUMBER: Int = config.getInt("repartitionNumber")
+ val ARANGODB_BATCH: Int = config.getInt("arangoDB.batch")
+ val ARANGODB_TTL: Int = config.getInt("arangoDB.ttl")
+ val CLICKHOUSE_SOCKET_TIMEOUT: Int = config.getInt("clickhouse.socket.timeout")
+ val SPARK_SERIALIZER: String = config.getString("spark.serializer")
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/ReadClickhouseTest.scala b/ip-learning/src/main/scala/cn/ac/iie/test/ReadClickhouseTest.scala
new file mode 100644
index 0000000..3ff1742
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/ReadClickhouseTest.scala
@@ -0,0 +1,447 @@
+package cn.ac.iie.test
+
+import com.orientechnologies.orient.core.db.{ODatabasePool, OPartitionedDatabasePool}
+import com.orientechnologies.orient.core.sql.OCommandSQL
+import com.tinkerpop.blueprints.impls.orient.{OrientGraph, OrientGraphFactory}
+import com.tinkerpop.blueprints.{Direction, Edge, Vertex}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.util.Try
+
+object ReadClickhouseTest {
+ private val logger: Logger = LoggerFactory.getLogger(ReadClickhouseTest.getClass)
+ @transient
+ var factory: OrientGraphFactory = _
+
+ @transient
+ var pool: ODatabasePool = _
+
+ def main(args: Array[String]): Unit = {
+ // val warehouseLocation = new File("spark-warehouse").getAbsolutePath
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ // .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .master(Config.MASTER)
+
+ /*
+ .config("spark.driver.host", "192.168.41.79")
+ .config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
+ .master("spark://192.168.40.119:7077")
+ */
+ .getOrCreate()
+    logger.warn("SparkSession created successfully")
+
+ // val sql = "(select * from av_miner.media_expire_patch_local limit 1000)"
+
+ val mediaDataFrame: DataFrame = spark.read.format("jdbc")
+ .option("url", "jdbc:clickhouse://192.168.40.193:8123")
+// .option("dbtable", "av_miner.media_expire_patch")
+// .option("dbtable", "(select * from av_miner.media_expire_patch limit 10)")
+// .option("dbtable","(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where media_domain not LIKE '%\\n%')")
+ .option("dbtable",s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
+// .option("dbtable","(select media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region,min(recv_time) as min_recv_time,max(recv_time) as max_recv_time from av_miner.media_expire_patch group by media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region limit 10)")
+ .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+ .option("user", "default")
+ .option("password", "111111")
+ .option("numPartitions", Config.NUMPARTITIONS)
+ .option("partitionColumn", "recv_time")
+ .option("lowerBound", Config.MINTIME)
+ .option("upperBound", Config.MAXTIME)
+ .option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
+ .load()
+ // mediaDataFrame.printSchema()
+ mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
+ // val mediaDataGlobalView = spark.sql("select * from global_temp.media_expire_patch limit 10")
+ // mediaDataGlobalView.show()
+
+ val v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS FQDN_NAME,
+ | MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
+ | COUNT( * ) AS FQDN_COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | media_domain != ''
+ |GROUP BY
+ | media_domain
+ """.stripMargin
+ )
+ val s_IP_DF = spark.sql(
+ """
+ select
+ s1_s_ip as IP,
+ s1_s_location_region as location,
+ MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
+ MAX( recv_time ) AS IP_LAST_FOUND_TIME,
+ COUNT( * ) AS IP_COUNT_TOTAL
+ from global_temp.media_expire_patch
+ GROUP BY
+ IP,
+ location
+ """.stripMargin)
+
+ val d_IP_DF = spark.sql(
+ """
+ select
+ s1_d_ip as IP,
+ s1_d_location_region as location,
+ MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
+ MAX( recv_time ) AS IP_LAST_FOUND_TIME,
+ COUNT( * ) AS IP_COUNT_TOTAL
+ from global_temp.media_expire_patch
+ GROUP BY
+ IP,
+ location
+ """.stripMargin)
+ import org.apache.spark.sql.functions._
+ val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP","location").agg(
+      min("IP_FIRST_FOUND_TIME").as("IP_FIRST_FOUND_TIME"),
+ max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
+      sum("IP_COUNT_TOTAL").as("IP_COUNT_TOTAL")  // sum the per-direction counts rather than counting union rows
+ )
+
+ /*
+ val v_IP_DF = spark.sql(
+ """
+ |SELECT
+ | IP,
+ | location,
+ | MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS IP_LAST_FOUND_TIME,
+ | COUNT( * ) AS IP_COUNT_TOTAL
+ |FROM
+ | (
+ | ( SELECT s1_s_ip AS IP, s1_s_location_region AS location, recv_time FROM global_temp.media_expire_patch ) UNION ALL
+ | ( SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM global_temp.media_expire_patch )
+ | )
+ |GROUP BY
+ | IP,
+ | location
+ """.stripMargin)
+ */
+ val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS V_FQDN,
+ | s1_d_ip AS V_IP,
+ | MIN( recv_time ) AS FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS LAST_FOUND_TIME,
+ | COUNT( * ) AS COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( media_domain != '' )
+ | AND ( s1_d_ip != '' )
+ |GROUP BY
+ | s1_d_ip,
+ | media_domain
+ """.stripMargin)
+
+ val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | s1_s_ip AS V_IP,
+ | media_domain AS V_FQDN,
+ | MIN( recv_time ) AS FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS LAST_FOUND_TIME,
+ | COUNT( * ) AS COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( s1_s_ip != '' )
+ | AND ( media_domain != '' )
+ |GROUP BY
+ | s1_s_ip,
+ | media_domain
+ """.stripMargin)
+
+
+ /**
+      * Acquire the database connection
+ */
+ val uri: String = "remote:192.168.40.127/iplearning-insert"
+// val uri: String = "remote:192.168.40.207/iplearing-test"
+// val uri: String = "remote:192.168.40.152:2424;192.168.40.151:2424:192.168.40.153:2424/iplearing-test"
+ val pool = new OPartitionedDatabasePool(uri, "root", "111111", Config.MAXPOOLSIZE, Config.MAXPOOLSIZE)
+ factory = new OrientGraphFactory(uri, "root", "111111", pool)
+ factory.setConnectionStrategy("ROUND_ROBIN_CONNECT")
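+    // The factory is backed by a partitioned connection pool; ROUND_ROBIN_CONNECT is meant to
+    // spread new connections across servers when the URI points at a multi-node OrientDB cluster.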
+
+ /**
+      * Upsert FQDN vertices
+ */
+ v_FQDN_DF.printSchema()
+ v_FQDN_DF.foreach(row => {
+ val fqdn = row.getAs[String]("FQDN_NAME")
+ val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
+ val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
+ val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
+// val graph = factory.getNoTx
+ val graph: OrientGraph = factory.getTx
+ var v_Fqdn_Obj: Vertex = null
+ import scala.collection.JavaConversions._
+
+ if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
+ for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
+ val update_Fqdn_Last = v.getProperty[Long]("LAST_FOUND_TIME")
+ val update_Fqdn_Cnt = v.getProperty[Long]("FQDN_APPEAR_COUNT")
+ val sqlComm = new OCommandSQL(
+            s"UPDATE v_FQDN SET LAST_FOUND_TIME = $v_Fqdn_Last,FQDN_APPEAR_COUNT = ${update_Fqdn_Cnt + v_Fqdn_Cnt} WHERE FQDN_NAME == '$fqdn'")
+ Try(graph.command(sqlComm).execute())
+ println("update fqdn:"+fqdn)
+ v_Fqdn_Obj = v
+ }
+ } else {
+ v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
+
+ v_Fqdn_Obj.setProperty("FQDN_NAME", fqdn)
+ v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", v_Fqdn_First)
+ v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", v_Fqdn_Last)
+ v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", v_Fqdn_Cnt)
+ println("insert fqdn:"+fqdn)
+ }
+      // Each row obtains its own transaction graph from factory.getTx above, so commit it here.
+      graph.commit()
+ })
+ factory.getTx.commit()
+
+ /**
+      * Upsert IP vertices
+ */
+ v_IP_DF.printSchema()
+ v_IP_DF.foreach(row => {
+ val ip = row.getAs[String]("IP")
+ val location = row.getAs[String]("location")
+ val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
+ val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
+ val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
+// val graph = factory.getNoTx
+
+ val graph = factory.getTx
+ var v_IP_Obj: Vertex = null
+ import scala.collection.JavaConversions._
+ if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
+ for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
+ val update_IP_Last = v.getProperty[Long]("LAST_FOUND_TIME")
+ val update_IP_Cnt = v.getProperty[Long]("IP_APPEAR_COUNT")
+ val sqlComm = new OCommandSQL(
+            s"UPDATE v_IP SET LAST_FOUND_TIME = $v_IP_Last,IP_APPEAR_COUNT = ${update_IP_Cnt + v_IP_Cnt} "
+ + s"WHERE IP == '$ip'")
+ Try(graph.command(sqlComm).execute())
+ println("update ip:"+ip)
+ v_IP_Obj = v
+ }
+ } else {
+ v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
+
+ v_IP_Obj.setProperty("IP", ip)
+ v_IP_Obj.setProperty("IP_LOCATION", location)
+ v_IP_Obj.setProperty("FIRST_FOUND_TIME", v_IP_First)
+ v_IP_Obj.setProperty("LAST_FOUND_TIME", v_IP_Last)
+ v_IP_Obj.setProperty("IP_APPEAR_COUNT", v_IP_Cnt)
+ println("insert ip:"+ip)
+ }
+      // Each row obtains its own transaction graph from factory.getTx above, so commit it here.
+      graph.commit()
+ })
+ factory.getTx.commit()
+
+ /**
+      * Aggregate and upsert the e_Address_Fqdn_to_IP edges
+ */
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ e_Address_v_FQDN_to_v_IP_DF.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+
+ val graph = factory.getNoTx
+ var v_Fqdn_Obj: Vertex = null
+ var v_IP_Obj: Vertex = null
+ var e_Edge_Obj:Edge = null
+
+ import scala.collection.JavaConversions._
+
+      // Look up the FQDN vertex, creating a placeholder if it does not exist yet
+ if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
+ for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
+ v_Fqdn_Obj = v
+ }
+ } else {
+ v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
+ v_Fqdn_Obj.setProperty("FQDN_NAME", fqdn)
+ v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", 0)
+ v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", 0)
+ v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", 0)
+
+ }
+
+      // Look up the IP vertex, creating a placeholder if it does not exist yet
+ if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
+ for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
+ v_IP_Obj = v
+ }
+ } else {
+ v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
+
+ v_IP_Obj.setProperty("IP", ip)
+ v_IP_Obj.setProperty("IP_LOCATION", "")
+ v_IP_Obj.setProperty("FIRST_FOUND_TIME", 0)
+ v_IP_Obj.setProperty("LAST_FOUND_TIME", 0)
+ v_IP_Obj.setProperty("IP_APPEAR_COUNT", 0)
+ }
+// println("e_address_egde:"+v_Fqdn_Obj.getProperty[String]("FQDN_NAME")+"-"+v_IP_Obj.getProperty[String]("IP"))
+      // Add a new edge or update the existing one
+ for (e: Edge <- v_Fqdn_Obj.getEdges(Direction.OUT)) {
+ if (e.getVertex(Direction.IN).getProperty[String]("IP") == ip){
+ val cnt = e.getProperty[Long]("COUNT_TOTAL")
+ e.setProperty("COUNT_TOTAL",e_Cnt+cnt)
+ e.setProperty("LAST_FOUND_TIME",e_Last)
+          println("update e_address_edge:"+fqdn+"-"+ip)
+ e_Edge_Obj = e
+ }
+ }
+      if (e_Edge_Obj == null){  // only insert a new edge when no existing edge was updated above
+ val newEdge = graph.addEdge(null, v_Fqdn_Obj, v_IP_Obj, "E_ADDRESS_V_FQDN_TO_V_IP")
+ newEdge.setProperty("COUNT_TOTAL",e_Cnt)
+ newEdge.setProperty("FIRST_FOUND_TIME",e_First)
+ newEdge.setProperty("LAST_FOUND_TIME",e_Last)
+        println("insert e_address_edge:"+fqdn+"-"+ip)
+ }
+ })
+
+ /**
+      * Aggregate and upsert the e_Visit_v_IP_to_v_FQDN edges
+ */
+ e_Visit_v_IP_to_v_FQDN_DF.printSchema()
+ e_Visit_v_IP_to_v_FQDN_DF.foreach(row => {
+ val fqdn = row.getAs[String]("V_FQDN")
+ val ip = row.getAs[String]("V_IP")
+ val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
+ val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
+ val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
+
+ val graph = factory.getNoTx
+ var v_Fqdn_Obj: Vertex = null
+ var v_IP_Obj: Vertex = null
+ var e_Edge_Obj:Edge = null
+
+ import scala.collection.JavaConversions._
+
+      // Look up the FQDN vertex, creating a placeholder if it does not exist yet
+ if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
+ for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
+ v_Fqdn_Obj = v
+ }
+ } else {
+ v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
+        v_Fqdn_Obj.setProperty("FQDN_NAME", fqdn)
+ v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", 0)
+ v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", 0)
+ v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", 0)
+ }
+
+      // Look up the IP vertex, creating a placeholder if it does not exist yet
+ if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
+ for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
+ v_IP_Obj = v
+ }
+ } else {
+ v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
+        v_IP_Obj.setProperty("IP", ip)
+ v_IP_Obj.setProperty("IP_LOCATION", "")
+ v_IP_Obj.setProperty("FIRST_FOUND_TIME", 0)
+ v_IP_Obj.setProperty("LAST_FOUND_TIME", 0)
+ v_IP_Obj.setProperty("IP_APPEAR_COUNT", 0)
+ }
+// println("e_visit_egde:"+v_Fqdn_Obj.getProperty[String]("FQDN_NAME")+"-"+v_IP_Obj.getProperty[String]("IP"))
+
+      // Add a new edge or update the existing one
+ for (e: Edge <- v_IP_Obj.getEdges(Direction.OUT)) {
+ if (e.getVertex(Direction.IN).getProperty[String]("FQDN_NAME") == fqdn){
+ val cnt = e.getProperty[Long]("COUNT_TOTAL")
+ e.setProperty("COUNT_TOTAL",e_Cnt+cnt)
+ e.setProperty("LAST_FOUND_TIME",e_Last)
+          println("update e_visit_edge:"+fqdn+"-"+ip)
+ e_Edge_Obj = e
+ }
+ }
+      if (e_Edge_Obj == null){  // only insert a new edge when no existing edge was updated above
+ val newEdge = graph.addEdge(null, v_Fqdn_Obj, v_IP_Obj, "E_VISIT_V_IP_TO_V_FQDN")
+ newEdge.setProperty("COUNT_TOTAL",e_Cnt)
+ newEdge.setProperty("FIRST_FOUND_TIME",e_First)
+ newEdge.setProperty("LAST_FOUND_TIME",e_Last)
+        println("insert e_visit_edge:"+fqdn+"-"+ip)
+ }
+ // graph.commit()
+ })
+
+
+
+
+ /*
+ v_FQDN_DF.printSchema()
+ v_FQDN_DF.coalesce(20).foreach(row => {
+ val fqdn = row.getAs[String](0)
+ val first = row.getAs[Long](1)
+ val last = row.getAs[Long](2)
+ val count = row.getAs[Long](3)
+ val session = pool.acquire()
+ val vertex = session.newVertex("v_FQDN")
+ vertex.setProperty("FQDN_NAME",fqdn)
+ vertex.setProperty("FIRST_FOUND_TIME", first)
+ vertex.setProperty("LAST_FOUND_TIME", last)
+ vertex.setProperty("FQDN_APPEAR_COUNT", count)
+ vertex
+ })
+
+
+
+ v_IP_DF.printSchema()
+ v_IP_DF.coalesce(20).foreach(row => {
+ val ip = row.getAs[String](0)
+ val first = row.getAs[Long](2)
+ val last = row.getAs[Long](3)
+ val count = row.getAs[Long](4)
+ val tx: OrientGraph = factory.getTx
+ val vertex = tx.addVertex("class:v_FQDN",Nil: _*)
+ vertex.setProperties("FQDN_NAME",ip)
+ vertex.setProperty("FIRST_FOUND_TIME", first)
+ vertex.setProperty("LAST_FOUND_TIME", last)
+ vertex.setProperty("FQDN_APPEAR_COUNT", count)
+ tx.commit()
+ })
+
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ e_Address_v_FQDN_to_v_IP_DF.foreach(row => {
+ val fqdn = row.getAs[String](0)
+ val ip = row.getAs[String](2)
+ val first = row.getAs[Long](3)
+ val last = row.getAs[Long](4)
+ val count = row.getAs[Long](5)
+ val session = pool.acquire()
+ val tx: OrientGraph = factory.getTx
+ tx.getFeatures.supportsVertexProperties
+ val vertex: OrientVertex = tx.getVertex()
+ tx.addEdge(null,vertex,vertex,"")
+ })
+ */
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocument.scala b/ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocument.scala
new file mode 100644
index 0000000..f653a51
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocument.scala
@@ -0,0 +1,29 @@
+package cn.ac.iie.test
+
+import com.arangodb.entity.DocumentField
+import com.arangodb.entity.DocumentField.Type
+
+import scala.beans.BeanProperty
+
+class TestBaseEdgeDocument {
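+  // @DocumentField binds these bean properties to ArangoDB's system attributes (_from, _to, _key,
+  // _id) so the Java driver can serialize and deserialize this class as an edge document.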
+
+ @BeanProperty
+ @DocumentField(Type.FROM)
+ var from: String=""
+ @BeanProperty
+ @DocumentField(Type.TO)
+ var to: String=""
+ @BeanProperty
+ @DocumentField(Type.KEY)
+ var key: String=""
+ @BeanProperty
+ @DocumentField(Type.ID)
+ var id: String=""
+ @BeanProperty
+ var FIRST_FOUND_TIME:Long = 0
+ @BeanProperty
+ var LAST_FOUND_TIME:Long = 0
+ @BeanProperty
+ var COUNT_TOTAL:Long = 0
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocumentDataFrame.scala b/ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocumentDataFrame.scala
new file mode 100644
index 0000000..65239bd
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/TestBaseEdgeDocumentDataFrame.scala
@@ -0,0 +1,35 @@
+package cn.ac.iie.test
+
+import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByDF}
+import cn.ac.iie.utils.InitArangoDBPool
+import com.arangodb.entity.BaseEdgeDocument
+import com.arangodb.util.MapBuilder
+import com.arangodb.{ArangoCursor, ArangoDB}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+object TestBaseEdgeDocumentDataFrame {
+ @transient
+ var arangoDB: ArangoDB = _
+ def main(args: Array[String]): Unit = {
+
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .master(Config.MASTER)
+ .getOrCreate()
+
+ BaseMediaDataLoad.loadMediaDate(spark)
+ val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
+ UpdateArangoGraphByDF.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,spark)
+
+ InitArangoDBPool.arangoDB.shutdown()
+ spark.close()
+
+
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/TestIndices.scala b/ip-learning/src/main/scala/cn/ac/iie/test/TestIndices.scala
new file mode 100644
index 0000000..80a9916
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/TestIndices.scala
@@ -0,0 +1,219 @@
+package cn.ac.iie.test
+
+import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
+import com.arangodb.util.MapBuilder
+import com.arangodb.{ArangoCursor, ArangoDB}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.util.Try
+
+object TestIndices {
+ @transient
+ var arangoDB: ArangoDB = _
+ def main(args: Array[String]): Unit = {
+
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .master(Config.MASTER)
+ .getOrCreate()
+
+ val mediaDataFrame: DataFrame = spark.read.format("jdbc")
+ .option("url", "jdbc:clickhouse://192.168.40.193:8123")
+ .option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
+ .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+ .option("user", "default")
+ .option("password", "111111")
+ .option("numPartitions", Config.NUMPARTITIONS)
+ .option("partitionColumn", "recv_time")
+ .option("lowerBound", Config.MINTIME)
+ .option("upperBound", Config.MAXTIME)
+ .option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
+ .load()
+ mediaDataFrame.printSchema()
+ mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
+
+ val v_FQDN_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS FQDN_NAME,
+ | MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
+ | COUNT( * ) AS FQDN_COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | media_domain != ''
+ |GROUP BY
+ | media_domain
+ """.stripMargin
+ )
+ val time1 = System.currentTimeMillis()
+
+ arangoDB = new ArangoDB.Builder()
+ .maxConnections(Config.MAXPOOLSIZE)
+ .host("192.168.40.127", 8529)
+ .user("root")
+ .password("111111")
+ .build
+
+ val dbName = "insert_iplearn_index"
+ val collectionName = "V_FQDN"
+ val query = "FOR doc IN " + collectionName + " RETURN doc"
+ val bindVars = new MapBuilder().get
+ val cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(dbName).query(query, bindVars, null, classOf[BaseEdgeDocument])
+ var cursor_Map = scala.collection.mutable.HashMap[String,BaseEdgeDocument]()
+ while (cursor.hasNext){
+ val document = cursor.next()
+ cursor_Map += (document.getKey -> document)
+ }
+ val time2 = System.currentTimeMillis()
+
+ println((time2 - time1)/1000)
+ val docs_Insert = new java.util.ArrayList[BaseDocument]()
+ val docs_Update = new java.util.ArrayList[BaseDocument]()
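+    // Note: DataFrame.foreach runs inside serialized task closures, so additions to these
+    // driver-side lists are not visible here afterwards; the block mainly exercises the
+    // lookup/merge logic and timing (the import/update calls below are commented out).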
+ v_FQDN_DF.foreach(row => {
+ val fqdn = row.getAs[String]("FQDN_NAME")
+ val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
+ val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
+ val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
+
+ val doc = cursor_Map.getOrElse(fqdn, null)
+ if (doc != null) {
+ val document: BaseDocument = doc
+ val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
+        document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt + v_Fqdn_Cnt)
+ document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
+ docs_Update.add(document)
+ } else {
+ val baseDocument: BaseDocument = new BaseDocument()
+ baseDocument.setKey(fqdn)
+ baseDocument.addAttribute("FQDN_NAME", fqdn)
+ baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
+ baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
+ baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
+ docs_Insert.add(baseDocument)
+ }
+ })
+
+// Try(v_FQDN_Coll.importDocuments(docs_Insert))
+// Try(v_FQDN_Coll.updateDocuments(docs_Update))
+
+
+
+
+ /*
+ val db = arangoDB.db("insert_iplearn_index")
+ val coll = db.collection("E_ADDRESS_V_FQDN_TO_V_IP")
+ val docs = new java.util.ArrayList[BaseEdgeDocument]
+ val baseEdgeDocument2 = new BaseEdgeDocument
+ baseEdgeDocument2.setKey("test_edge_2.com")
+ baseEdgeDocument2.setFrom("V_FQDN/test_edge_2_from")
+ baseEdgeDocument2.setTo("V_IP/test_edge_2_to")
+ baseEdgeDocument2.addAttribute("e_add_test_str", "1Two3")
+ baseEdgeDocument2.addAttribute("e_add_test_num", 4321)
+ docs.add(baseEdgeDocument2)
+
+ coll.importDocuments(docs)
+ arangoDB.shutdown()
+
+*/
+
+ /*
+ val uri: String = "remote:192.168.40.127/iplearning-insert"
+ val pool = new OPartitionedDatabasePool(uri, "root", "111111", 5, 5)
+ factory = new OrientGraphFactory(uri, "root", "111111", pool)
+ val graph = factory.getNoTx
+ val ip = "23.224.224.163"
+ import scala.collection.JavaConversions._
+ /*
+ for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
+ val update_IP_Last = v.getProperty[Long]("LAST_FOUND_TIME")
+ val update_IP_Cnt = v.getProperty[Long]("IP_APPEAR_COUNT")
+ val sqlComm = new OCommandSQL(
+ s"UPDATE v_IP SET LAST_FOUND_TIME = $update_IP_Last,FQDN_APPEAR_COUNT = 100 "
+ + s"WHERE IP == '$ip'")
+ Try(graph.command(sqlComm).execute())
+ println("update ip:" + ip)
+ }
+*/
+ val v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
+
+ v_IP_Obj.setProperty("IP", ip)
+ v_IP_Obj.setProperty("IP_LOCATION", "fas")
+ v_IP_Obj.setProperty("FIRST_FOUND_TIME", 1)
+ v_IP_Obj.setProperty("LAST_FOUND_TIME", 1)
+ v_IP_Obj.setProperty("IP_APPEAR_COUNT", 1)
+*/
+
+ /*
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .config("arangodb.hosts", "192.168.40.127:8529")
+ .config("arangodb.user", "root")
+ .config("arangodb.password", "111111")
+ .master(Config.MASTER)
+ .getOrCreate()
+
+ val value: ArangoRDD[BaseDocument] = ArangoSpark
+ .load[BaseDocument](spark.sparkContext,"V_FQDN",ReadOptions("insert_iplearn_index"))
+
+// var stringToDocument: Map[String, BaseDocument] = Map[String,BaseDocument]()
+ val lstBuffer: ListBuffer[(String, BaseDocument)] = ListBuffer[(String, BaseDocument)]()
+ val map: Map[String, BaseDocument] = value.map(doc => (doc.getKey,doc)).collect().toMap
+ println(map.size)
+
+ spark.close()
+*/
+ /*
+ arangoDB = new ArangoDB.Builder()
+ .maxConnections(10)
+ .host("192.168.40.127", 8529)
+ .user("root")
+ .password("111111")
+ .build
+
+ val db = arangoDB.db("insert_iplearn_index")
+// db.createCollection("V_FQDN")
+// db.createCollection("V_IP")
+// db.createCollection("E_ADDRESS_V_FQDN_TO_V_IP")
+// db.createCollection("E_VISIT_V_IP_TO_V_FQDN")
+ val v_FQDN_Coll = db.collection("E_VISIT_V_IP_TO_V_FQDN")
+*/
+
+// val coll: ArangoCollection = db.collection("V_FQDN")
+// val value = coll.getDocument("test1.com",classOf[BaseDocument])
+// val str = value.getAttribute("v_fqdn_test_str")
+// val num: Int = value.getAttribute("v_fqdn_test_num").toString.toInt
+// println(str+"-"+num)
+ /*
+ val docs = new util.ArrayList[BaseDocument]
+ val baseDocument1 = new BaseDocument
+ baseDocument1.setKey("test1.com")
+ baseDocument1.addAttribute("v_fqdn_test_str", "one2three")
+ baseDocument1.addAttribute("v_fqdn_test_num", 1234)
+ docs.add(baseDocument1)
+
+ val baseDocument2 = new BaseDocument
+ baseDocument2.setKey("test2.com")
+ baseDocument2.addAttribute("v_fqdn_test_str", "1Two3")
+ baseDocument2.addAttribute("v_fqdn_test_num", 4321)
+ docs.add(baseDocument2)
+ coll.importDocuments(docs)
+*/
+
+// arangoDB.shutdown()
+
+
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/test/TestSparkJoin.scala b/ip-learning/src/main/scala/cn/ac/iie/test/TestSparkJoin.scala
new file mode 100644
index 0000000..5c74ab7
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/test/TestSparkJoin.scala
@@ -0,0 +1,56 @@
+package cn.ac.iie.test
+
+import cn.ac.iie.dao.BaseMediaDataLoad
+import cn.ac.iie.etl.CursorTransform
+import cn.ac.iie.pojo.BaseVertexFqdn
+import cn.ac.iie.utils.InitArangoDBPool
+import com.arangodb.ArangoCursor
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+object TestSparkJoin {
+ private val logger: Logger = LoggerFactory.getLogger(TestSparkJoin.getClass)
+
+ def main(args: Array[String]): Unit = {
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", Config.SPARK_SERIALIZER)
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
+ .config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
+ .master(Config.MASTER)
+ .getOrCreate()
+    logger.warn("SparkSession created successfully")
+ BaseMediaDataLoad.loadMediaDate(spark)
+// val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
+ BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark).show(10)
+// v_FQDN_DF.printSchema()
+
+
+/*
+ val arangoDB = InitArangoDBPool.arangoDB
+ val options = InitArangoDBPool.options
+ val bindVars = InitArangoDBPool.bindVars
+
+// val v_FQDN_Query = "FOR doc IN V_FQDN limit 100 RETURN doc"
+// val v_FQDN_Query = "FOR doc IN V_FQDN RETURN doc"
+// val v_FQDN_Cursor: ArangoCursor[BaseVertexFqdn] = arangoDB.db("insert_iplearn_noindex")
+// .query(v_FQDN_Query, bindVars, options, classOf[BaseVertexFqdn])
+// val v_FQDN_Curson_DF: DataFrame = spark.createDataFrame(v_FQDN_Cursor.asListRemaining(),classOf[BaseVertexFqdn])
+// v_FQDN_Curson_DF.printSchema()
+//
+ val v_FQDN_Curson_DF = CursorTransform.cursorToDataFrame("V_FQDN",classOf[BaseVertexFqdn],spark)
+ val v_Fqdn_Join_Df = v_FQDN_DF.join(v_FQDN_Curson_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Curson_DF("key"),"fullouter")
+ v_Fqdn_Join_Df.printSchema()
+ v_Fqdn_Join_Df
+// .filter(row => row.getAs[String]("new_fqdn_name")!=null)
+// .filter(row => row.getAs[String]("new_fqdn_name")!=null)
+ .show(300)
+
+ arangoDB.shutdown()
+ spark.close()
+*/
+ }
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/utils/ConfigUtils.scala b/ip-learning/src/main/scala/cn/ac/iie/utils/ConfigUtils.scala
new file mode 100644
index 0000000..9c52b57
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/utils/ConfigUtils.scala
@@ -0,0 +1,34 @@
+package cn.ac.iie.utils
+
+import com.typesafe.config.{Config, ConfigFactory}
+
+object ConfigUtils {
+ private lazy val config: Config = ConfigFactory.load()
+
+ val SPARK_SQL_SHUFFLE_PARTITIONS: String = config.getString("spark.sql.shuffle.partitions")
+ val SPARK_SQL_READ_FETCHSIZE: String = config.getString("spark.sql.read.fetchsize")
+ val SPARK_EXECUTOR_MEMORY: String = config.getString("spark.executor.memory")
+ val SPARK_APP_NAME: String = config.getString("spark.app.name")
+ val SPARK_NETWORK_TIMEOUT: String = config.getString("spark.network.timeout")
+ val REPARTITION_NUMBER: Int = config.getInt("repartitionNumber")
+ val SPARK_SERIALIZER: String = config.getString("spark.serializer")
+
+ val NUMPARTITIONS: String = config.getString("numPartitions")
+ val MASTER: String = config.getString("master")
+ val MAXPOOLSIZE: Int = config.getInt("maxPoolSize")
+ val MINTIME: String = config.getString("minTime")
+ val MAXTIME: String = config.getString("maxTime")
+
+ val ARANGODB_HOST: String = config.getString("arangoDB.host")
+ val ARANGODB_PORT: Int = config.getInt("arangoDB.port")
+ val ARANGODB_USER: String = config.getString("arangoDB.user")
+ val ARANGODB_PASSWORD: String = config.getString("arangoDB.password")
+ val ARANGODB_BATCH: Int = config.getInt("arangoDB.batch")
+ val ARANGODB_DB_NAME: String = config.getString("arangoDB.DB.name")
+ val ARANGODB_TTL: Int = config.getInt("arangoDB.ttl")
+ val CLICKHOUSE_SOCKET_TIMEOUT: Int = config.getInt("clickhouse.socket.timeout")
+
+ val THREAD_POOL_NUMBER: Int = config.getInt("thread.pool.number")
+
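+ // For reference, a sketch of the application.conf shape these keys expect; the values below
+ // are illustrative placeholders, not the project's actual settings (those live in
+ // src/main/resources/application.conf):
+ //   spark.sql.shuffle.partitions = "200"
+ //   spark.executor.memory = "4g"
+ //   repartitionNumber = 50
+ //   arangoDB {
+ //     host = "127.0.0.1"
+ //     port = 8529
+ //     user = "root"
+ //     password = "secret"
+ //     batch = 5000
+ //     DB.name = "iplearning"
+ //     ttl = 600
+ //   }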
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/utils/DateTimeUtils.scala b/ip-learning/src/main/scala/cn/ac/iie/utils/DateTimeUtils.scala
new file mode 100644
index 0000000..3b66bb6
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/utils/DateTimeUtils.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.utils
+
+object DateTimeUtils {
+
+}
diff --git a/ip-learning/src/main/scala/cn/ac/iie/utils/InitArangoDBPool.scala b/ip-learning/src/main/scala/cn/ac/iie/utils/InitArangoDBPool.scala
new file mode 100644
index 0000000..3a2efdc
--- /dev/null
+++ b/ip-learning/src/main/scala/cn/ac/iie/utils/InitArangoDBPool.scala
@@ -0,0 +1,24 @@
+package cn.ac.iie.utils
+
+import java.util
+
+import com.arangodb.ArangoDB
+import com.arangodb.model.AqlQueryOptions
+import com.arangodb.util.MapBuilder
+
+object InitArangoDBPool {
+
+ @transient
+ lazy val arangoDB: ArangoDB = new ArangoDB.Builder()
+ .maxConnections(ConfigUtils.MAXPOOLSIZE)
+ .host(ConfigUtils.ARANGODB_HOST, ConfigUtils.ARANGODB_PORT)
+ .user(ConfigUtils.ARANGODB_USER)
+ .password(ConfigUtils.ARANGODB_PASSWORD)
+ .build
+
+ val bindVars: util.Map[String, AnyRef] = new MapBuilder().get
+
+ val options: AqlQueryOptions = new AqlQueryOptions()
+ .ttl(ConfigUtils.ARANGODB_TTL)
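+
+ // A minimal usage sketch (assumes ConfigUtils.ARANGODB_DB_NAME names an existing database
+ // with a V_FQDN collection; com.arangodb.entity.BaseDocument would also need importing):
+ //   val cursor = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
+ //     .query("FOR doc IN V_FQDN LIMIT 10 RETURN doc", bindVars, options, classOf[BaseDocument])
+ //   import scala.collection.JavaConverters._
+ //   cursor.asListRemaining().asScala.foreach(doc => println(doc.getKey))
+ //   arangoDB.shutdown()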
+
+}
diff --git a/ip-learning/src/test/java/cn/ac/iie/CreateObjectJavaTest.java b/ip-learning/src/test/java/cn/ac/iie/CreateObjectJavaTest.java
new file mode 100644
index 0000000..8be7c35
--- /dev/null
+++ b/ip-learning/src/test/java/cn/ac/iie/CreateObjectJavaTest.java
@@ -0,0 +1,32 @@
+package cn.ac.iie;
+
+import com.tinkerpop.blueprints.impls.orient.OrientGraph;
+
+public class CreateObjectJavaTest {
+ public static void main(String[] args) {
+ String orientDbProtocol = "remote";
+ String orientDbHost = "192.168.40.127";
+ String orientDbDatabase= "iplearning-insert";
+ String orientDbUsername= "root";
+ String orientDbPassword= "111111";
+
+// String orientDbUri = "${orientDbProtocol}:${orientDbHost}/${orientDbDatabase}";
+ String orientDbUri = orientDbProtocol+":"+orientDbHost+"/"+orientDbDatabase;
+ OrientGraph dblpOrientDbGraph = new OrientGraph(orientDbUri, orientDbUsername, orientDbPassword);
+
+// val orientDB: OrientGraph = new OrientGraph(orientDbUri,orientDbUsername, orientDbPassword)
+ try {
+ System.out.println(dblpOrientDbGraph.isAutoStartTx());
+ dblpOrientDbGraph.dropEdgeType("author_of");
+ dblpOrientDbGraph.commit();
+ }catch (Exception ex){
+ //catching errors & print out
+ System.out.println(ex.getMessage());
+ }
+ finally {
+ //close the current OrientDb's connection
+ dblpOrientDbGraph.shutdown();
+ }
+
+ }
+}
diff --git a/ip-learning/src/test/scala/cn/ac/iie/CreateObjectTest.scala b/ip-learning/src/test/scala/cn/ac/iie/CreateObjectTest.scala
new file mode 100644
index 0000000..42162d0
--- /dev/null
+++ b/ip-learning/src/test/scala/cn/ac/iie/CreateObjectTest.scala
@@ -0,0 +1,157 @@
+package cn.ac.iie
+
+import java.lang
+
+import com.orientechnologies.orient.core.command.OCommandRequest
+import com.orientechnologies.orient.core.config.OGlobalConfiguration
+import com.orientechnologies.orient.core.db._
+import com.orientechnologies.orient.core.metadata.schema.{OClass, OType}
+import com.orientechnologies.orient.core.sql.OCommandSQL
+import com.tinkerpop.blueprints.Vertex
+import com.tinkerpop.blueprints.impls.orient.{OrientDynaElementIterable, OrientGraph, OrientGraphFactory}
+
+object CreateObjectTest {
+ def main(args: Array[String]): Unit = {
+
+ /*
+ val orientDbProtocol: String = "remote"
+ val orientDbHost: String = "192.168.40.127"
+ val orientDbDatabase: String = "iplearning-insert"
+ val orientDbUsername: String = "root"
+ val orientDbPassword: String = "111111"
+
+ val orientDbUri: String = s"${orientDbProtocol}:${orientDbHost}/${orientDbDatabase}"
+ val orientDB: OrientGraph = new OrientGraph(orientDbUri, orientDbUsername, orientDbPassword)
+
+ try {
+ println("---------------")
+ println(orientDB.isAutoStartTx)
+ orientDB.dropEdgeType("author-paper ")
+ orientDB.commit()
+ }catch {
+ //catching errors & print out
+ case ex: Exception => println(ex.getMessage)
+ }
+ finally {
+ //close the current OrientDb's connection
+ orientDB.shutdown()
+ }
+ */
+
+ /*
+ val poolCfg = OrientDBConfig.builder
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN, 5)
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX, 10)
+
+ val pool: ODatabasePool = new ODatabasePool(orientDB, "iplearning-insert", "root", "111111", poolCfg.build)
+ val session: ODatabaseSession = pool.acquire()
+ println(session.isValidationEnabled)
+ session.close()
+ */
+
+ /*
+ val info = new Properties()
+ info.put("user", "root")
+ info.put("password", "111111")
+
+ info.put("db.usePool", "true"); // USE THE POOL
+ info.put("db.pool.min", "3"); // MINIMUM POOL SIZE
+
+
+ val conn: OrientJdbcConnection = DriverManager.getConnection("jdbc:orient:remote:192.168.40.127/iplearing-index", info).asInstanceOf[OrientJdbcConnection]
+ println(conn.isValid(1))
+ */
+
+
+ val uri: String = "remote:192.168.40.127/iplearning-insert"
+ val pool = new OPartitionedDatabasePool(uri,"root","111111",20,20)
+ val factory: OrientGraphFactory = new OrientGraphFactory(uri,"root","111111",pool)
+// val graph: OrientGraph = factory.getTx()
+ val graph = factory.getNoTx
+
+ /*
+
+ val v_FQDN = graph.createVertexType("v_FQDN")
+ v_FQDN.createProperty("FQDN_NAME", OType.STRING)
+ v_FQDN.createProperty("FIRST_FOUND_TIME", OType.LONG)
+ v_FQDN.createProperty("LAST_FOUND_TIME", OType.LONG)
+ v_FQDN.createProperty("FQDN_APPEAR_COUNT", OType.LONG)
+
+ val v_IP = graph.createVertexType("v_IP")
+ v_IP.createProperty("IP", OType.STRING)
+ v_IP.createProperty("FIRST_FOUND_TIME", OType.LONG)
+ v_IP.createProperty("LAST_FOUND_TIME", OType.LONG)
+ v_IP.createProperty("IP_APPEAR_COUNT", OType.LONG)
+
+ val e_FQDN_to_IP = graph.createEdgeType("E_ADDRESS_V_FQDN_TO_V_IP")
+ e_FQDN_to_IP.createProperty("FIRST_FOUND_TIME", OType.LONG)
+ e_FQDN_to_IP.createProperty("LAST_FOUND_TIME", OType.LONG)
+ e_FQDN_to_IP.createProperty("COUNT_TOTAL", OType.LONG)
+
+ val e_IP_to_FQDN = graph.createEdgeType("E_VISIT_V_IP_TO_V_FQDN")
+ e_IP_to_FQDN.createProperty("FIRST_FOUND_TIME", OType.LONG)
+ e_IP_to_FQDN.createProperty("LAST_FOUND_TIME", OType.LONG)
+ e_IP_to_FQDN.createProperty("COUNT_TOTAL", OType.LONG)
+*/
+
+
+ /*
+ val orientDB = new OrientDB("remote:192.168.40.127", "root", "111111", OrientDBConfig.defaultConfig)
+
+ val poolCfg = OrientDBConfig.builder
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN, 5)
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX, 10)
+
+ val pool: ODatabasePool = new ODatabasePool(orientDB, "iplearning-insert", "root", "111111", poolCfg.build)
+ val session: ODatabaseSession = pool.acquire()
+
+ println(session.isClosed)
+*/
+
+ /*
+ val vertex = graph.addVertex("class:v_FQDN",Nil: _*)
+ vertex.setProperties("FQDN_NAME","123com1234")
+ vertex.setProperty("FIRST_FOUND_TIME", 1571241623)
+ vertex.setProperty("LAST_FOUND_TIME", 1571241570)
+ vertex.setProperty("FQDN_APPEAR_COUNT", 5)
+
+ val v_IP = graph.addVertex("class:v_IP",Nil: _*)
+ v_IP.setProperty("IP","3.3.3.3")
+ v_IP.setProperty("IP_LOCATION","cn")
+ v_IP.setProperty("FIRST_FOUND_TIME", 1571241623)
+ v_IP.setProperty("LAST_FOUND_TIME", 1571241570)
+ v_IP.setProperty("IP_APPEAR_COUNT", 6543)
+
+ graph.addEdge(null,vertex,v_IP,"E_ADDRESS_V_FQDN_TO_V_IP")
+*/
+ val sqlComm = new OCommandSQL(
+ "UPDATE E_ADDRESS_V_FQDN_TO_V_IP SET LAST_FOUND_TIME = 1412,FQDN_APPEAR_COUNT = 1244"
+ + "UPSERT WHERE 'v5-dy.ixigua.com' IN FQDN_NAME")
+ graph.command(sqlComm).execute()
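+
+ // A parameterised variant of the same update, left as a sketch for illustration (positional
+ // '?' placeholders are bound via OCommandRequest.execute; not executed here):
+ //   val paramSql = new OCommandSQL(
+ //     "UPDATE E_ADDRESS_V_FQDN_TO_V_IP SET LAST_FOUND_TIME = ?, FQDN_APPEAR_COUNT = ? " +
+ //       "UPSERT WHERE ? IN FQDN_NAME")
+ //   graph.command(paramSql).execute(Long.box(1412L), Long.box(1244L), "v5-dy.ixigua.com")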
+
+ import scala.collection.JavaConversions._
+ /*
+ if (graph.getVertices("v_FQDN.FQDN_NAME","v5-dy.ixigua.com111111").nonEmpty){
+ for (v <- graph.getVertices("v_FQDN.FQDN_NAME","v5-dy.ixigua.com1")) {
+ println(v == null)
+ }
+ }
+
+ val sqlComm = new OCommandSQL(
+ "UPDATE v_FQDN SET LAST_FOUND_TIME = 1412,FQDN_APPEAR_COUNT = 1244"
+ + "WHERE 'v5-dy.ixigua.com' IN FQDN_NAME")
+ graph.command(sqlComm).execute()
+ */
+
+
+ /*
+ println(graphFactory.exists())
+ val tx = graphFactory.getTx
+ val v_FQDN = tx.createVertexType("V_FQDN")
+ v_FQDN.createProperty("FQDN_NAME",OType.STRING)
+ v_FQDN.createProperty("FIRST_FOUND_TIME",OType.LONG)
+ v_FQDN.createProperty("LAST_FOUND_TIME",OType.LONG)
+ */
+ }
+
+}
diff --git a/ip-learning/src/test/scala/cn/ac/iie/HiveUnionTest.scala b/ip-learning/src/test/scala/cn/ac/iie/HiveUnionTest.scala
new file mode 100644
index 0000000..3a81125
--- /dev/null
+++ b/ip-learning/src/test/scala/cn/ac/iie/HiveUnionTest.scala
@@ -0,0 +1,78 @@
+package cn.ac.iie
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object HiveUnionTest {
+ def main(args: Array[String]): Unit = {
+
+ val spark: SparkSession = SparkSession
+ .builder()
+ .appName("test")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.network.timeout", "300s")
+ .config("spark.sql.shuffle.partitions", 50)
+ .master("local[*]")
+
+ /*
+ .config("spark.executor.memory", "30g")
+ .config("spark.driver.host", "192.168.41.79")
+ .config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
+ .master("spark://192.168.40.119:7077")
+ */
+
+ .getOrCreate()
+
+ val sql =
+ """
+ |(SELECT s1_s_ip AS IP, s1_s_location_region AS location,recv_time FROM av_miner.media_expire_patch limit 100
+ |UNION ALL
+ |SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM av_miner.media_expire_patch limit 100)
+ """.stripMargin
+
+ val mediaDataFrame: DataFrame = spark.read.format("jdbc")
+ .option("url", "jdbc:clickhouse://192.168.40.193:8123")
+ .option("dbtable","(select * from av_miner.media_expire_patch limit 100) as media_expire_patch")
+// .option("dbtable", "av_miner.media_expire_patch")
+ // .option("dbtable", sql + " as a")
+ .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+ .option("user", "default")
+ .option("password", "111111")
+ .option("numPartitions", "40")
+ .option("fetchsize", "1000000")
+ .load()
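+ // Note on this JDBC read: without partitionColumn/lowerBound/upperBound the table is pulled
+ // through a single connection, so "numPartitions" alone does not parallelise it. A partitioned
+ // sketch (the range bounds on recv_time are illustrative, not measured values):
+ //   spark.read.format("jdbc")
+ //     .option("url", "jdbc:clickhouse://192.168.40.193:8123")
+ //     .option("dbtable", "av_miner.media_expire_patch")
+ //     .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+ //     .option("user", "default").option("password", "111111")
+ //     .option("partitionColumn", "recv_time")
+ //     .option("lowerBound", "0").option("upperBound", "2000000000")
+ //     .option("numPartitions", "40")
+ //     .load()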
+ mediaDataFrame.printSchema()
+ mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
+ val frame = spark.sql(
+ """
+ select IP,location,MIN(recv_time) AS FIRST_FOUND_TIME, MAX(recv_time) AS LAST_FOUND_TIME, COUNT(*) AS COUNT_TOTAL from (
+ (SELECT s1_s_ip AS IP, s1_s_location_region AS location,recv_time FROM global_temp.media_expire_patch limit 100)
+ UNION ALL
+ (SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM global_temp.media_expire_patch limit 100)
+ ) group by IP,location
+ """.stripMargin)
+
+ val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
+ """
+ |SELECT
+ | media_domain AS V_FQDN,
+ | media_type,
+ | s1_d_ip AS V_IP,
+ | MIN( recv_time ) AS FIRST_FOUND_TIME,
+ | MAX( recv_time ) AS LAST_FOUND_TIME,
+ | COUNT( * ) AS COUNT_TOTAL
+ |FROM
+ | global_temp.media_expire_patch
+ |WHERE
+ | ( media_domain != '' )
+ | AND ( s1_d_ip != '' )
+ |GROUP BY
+ | s1_d_ip,
+ | media_domain,
+ | media_type
+ """.stripMargin)
+ e_Address_v_FQDN_to_v_IP_DF.printSchema()
+ e_Address_v_FQDN_to_v_IP_DF.show(200)
+// mediaDataFrame.show(20)
+ }
+
+}
diff --git a/ip-learning/src/test/scala/cn/ac/iie/TestMap.scala b/ip-learning/src/test/scala/cn/ac/iie/TestMap.scala
new file mode 100644
index 0000000..6bb4a97
--- /dev/null
+++ b/ip-learning/src/test/scala/cn/ac/iie/TestMap.scala
@@ -0,0 +1,11 @@
+package cn.ac.iie
+
+object TestMap {
+ def main(args: Array[String]): Unit = {
+ var mapTest: Map[String, Int] = Map[String,Int]()
+ mapTest += ("1" -> 1)
+ println(mapTest.size)
+
+ }
+
+}