author	HEATAO <[email protected]>	2021-09-29 17:25:48 +0800
committer	HEATAO <[email protected]>	2021-09-29 17:25:48 +0800
commit	0e56eea95c75cb23a80e3a8ed7fbbd6c2028d83a (patch)
tree	e86483026cef30b54df3d1cad0851002325f6006
parent	db3f315053dded43d345e41bfae8574378ed7ca3 (diff)
add MiningService
-rw-r--r--	.idea/inspectionProfiles/Project_Default.xml	10
-rw-r--r--	src/main/java/cn/ac/iie/mapper/AVDataListMapper.java	2
-rw-r--r--	src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml	8
-rw-r--r--	src/main/java/cn/ac/iie/mining/MiningService.java	83
-rw-r--r--	src/main/java/cn/ac/iie/mining/prefixSpark.java	95
-rw-r--r--	src/main/resources/video_data.properties	8
-rw-r--r--	target/classes/cn/ac/iie/mapper/AVDataListMapper.xml	8
-rw-r--r--	target/classes/video_data.properties	8
8 files changed, 197 insertions, 25 deletions
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000..58ad3a5
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,10 @@
+<component name="InspectionProjectProfileManager">
+ <profile version="1.0">
+ <option name="myName" value="Project Default" />
+ <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
+ <Languages>
+ <language minSize="46" name="Java" />
+ </Languages>
+ </inspection_tool>
+ </profile>
+</component> \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java
index 6fbbd28..e1491ef 100644
--- a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java
+++ b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java
@@ -23,4 +23,6 @@ public interface AVDataListMapper {
public List<AVlog> statisticsUA();
public int cumulativeData(@Param("urlPath") String urlPath, @Param("effectiveTime") long effectiveTime);
+
+ public List<AVlog> getHistoryHost(@Param("host") String host);
}
diff --git a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml
index 5ed3020..86da199 100644
--- a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml
+++ b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml
@@ -50,4 +50,12 @@
select count(url) from `av_event_log_local`
where url like #{urlPath} and recv_time &gt;= #{effectiveTime}
</select>
+
+ <select id="getHistoryHost" resultType="cn.ac.iie.pojo.AVlog">
+ select client_ip, server_ip, url, recv_time, found_time, expire_time
+ from `av_event_log_local`
+ where url like #{host}
+ order by recv_time desc
+ limit 100
+ </select>
</mapper> \ No newline at end of file
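As a usage note: below is a minimal sketch of how the new getHistoryHost query might be invoked through MyBatis. The getSqlSessionFactory() accessor is a hypothetical stand-in (SqlSessionFactoryUtils is imported by MiningService, but its API is not shown in this commit), and the caller supplies the %...% wildcards, mirroring the likeUrl construction in MiningService below.

import java.util.List;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import cn.ac.iie.Utils.SqlSessionFactoryUtils;
import cn.ac.iie.mapper.AVDataListMapper;
import cn.ac.iie.pojo.AVlog;

public class HistoryHostSketch {
    public static void main(String[] args) {
        // hypothetical accessor; the real SqlSessionFactoryUtils API is not part of this diff
        SqlSessionFactory factory = SqlSessionFactoryUtils.getSqlSessionFactory();
        try (SqlSession session = factory.openSession()) {
            AVDataListMapper mapper = session.getMapper(AVDataListMapper.class);
            // the caller supplies the wildcards, matching "where url like #{host}"
            List<AVlog> recent = mapper.getHistoryHost("%example.com%");
            System.out.println("rows returned (the query caps at 100): " + recent.size());
        }
    }
}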
diff --git a/src/main/java/cn/ac/iie/mining/MiningService.java b/src/main/java/cn/ac/iie/mining/MiningService.java
index 3dd1aac..a64b1d7 100644
--- a/src/main/java/cn/ac/iie/mining/MiningService.java
+++ b/src/main/java/cn/ac/iie/mining/MiningService.java
@@ -3,8 +3,11 @@ package cn.ac.iie.mining;
import cn.ac.iie.Utils.JedisPoolUtils;
import cn.ac.iie.Utils.SqlSessionFactoryUtils;
import cn.ac.iie.Utils.URLHandleUtils;
+import cn.ac.iie.config.Configurations;
import cn.ac.iie.config.GetConf;
import cn.ac.iie.mapper.AVDataListMapper;
+import cn.ac.iie.pojo.AVlog;
+import cn.ac.iie.pojo.TrafficLog;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ibatis.session.SqlSession;
@@ -12,13 +15,17 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.spark.ml.fpm.PrefixSpan;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.*;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,7 +70,7 @@ public class MiningService {
private static SparkSession spark = SparkSession
.builder()
.appName("PrefixSpanHosts")
- //.master("local[4]")
+ .master(Configurations.getStringProperty(0, "Spark.master"))
.getOrCreate();
@Test
@@ -110,7 +117,7 @@ public class MiningService {
* Check redis to see whether sequence mining has already been run for this key within the last 3 days
* Determine whether a record exists in redis (the entry table) and whether it has expired
* Note: if expiration-based deletion of records is added to the sequence table, this code will need updating
- * @param urlPath
+ * @param urlPath the first-level path, delimited by /
*/
public void judgeKey(String urlPath) {
if (jedis.exists(urlPath)) {
@@ -147,13 +154,79 @@ public class MiningService {
/**
* First query ck for each similar host's client ip, server ip, and timestamp
* For each host, find its time window and construct one row via RowFactory
+ * Hand the constructed data to Spark for sequence mining
+ * Process the returned result and write it to redis
+ *
+ * redis format:
+ * key: urlPath  fields: write_time, candidate  (both strings)
+ * candidate(String): [host1, host2, host3]
*/
public void sequenceMining(String urlPath) {
// First query ck to find every similar host
+ String likeUrl = '%' + urlPath + '%';
+ List<AVlog> aVlogs = avListMapper.getHistoryHost(likeUrl);
+
+ ArrayList<Row> data = new ArrayList<>();
+
+ // For each host, look up all hosts inside its corresponding time window
+ for (AVlog eachLog: aVlogs) {
+ List<TrafficLog> trafficLogList = avListMapper.getRelationListWithHost(eachLog.getFound_time(),
+ eachLog.getClient_ip(),
+ Configurations.getIntProperty(0, "interval"));
+ data.add(RowFactory.create(constructRow(trafficLogList)));
+ }
// Invoke the spark module
+ Dataset<Row> sequence = relateHostMiningTask(spark, data);
- // write back redis
+ // write back to redis
+ List<String> candidateList = buildCandidate(sequence);
+ jedis.hset(urlPath, "write_time", String.valueOf(System.currentTimeMillis()/1000L));
+ long returnStatus = jedis.hset(urlPath, "candidate", candidateList.toString());
+ }
+
+ /**
+ * Utility method: construct one input row for Spark sequence mining
+ */
+ private List<List<String>> constructRow(List<TrafficLog> trafficLogList) {
+ List<List<String>> row = new ArrayList<>();
+ for (TrafficLog trafficLog: trafficLogList) {
+ row.add(Arrays.asList(trafficLog.getHttp_host()));
+ }
+ return row;
}
+ /**
+ * Sequence mining method
+ */
+ public Dataset<Row> relateHostMiningTask(SparkSession spark, List<Row> data) {
+ StructType schema = new StructType(new StructField[]{ new StructField(
+ "sequence", new ArrayType(new ArrayType(DataTypes.StringType, true), true),
+ false, Metadata.empty())
+ });
+ Dataset<Row> sequenceDF = spark.createDataFrame(data, schema);
+
+ PrefixSpan prefixSpan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5);
+
+ // Finding frequent sequential patterns
+ Dataset<Row> sequence = prefixSpan.findFrequentSequentialPatterns(sequenceDF);
+ sequence.show();
+ return sequence;
+ }
+
+ /**
+ * Process the result returned by spark sequence mining and return the candidate set
+ * Any re-ranking that is needed is also handled in this step
+ * Iterating with Spark's Iterator always feels slow; a lambda would probably be nicer
+ */
+ public List<String> buildCandidate(Dataset<Row> sequence) {
+ List<String> oneItemList = new ArrayList<>();
+ // findFrequentSequentialPatterns returns rows of (sequence: array<array<string>>, freq: long),
+ // so select the single-host patterns rather than testing the raw Row size
+ Dataset<Row> singles = sequence.filter("size(sequence) = 1 and size(sequence[0]) = 1")
+ .selectExpr("sequence[0][0] as host");
+ for (Iterator<Row> iter = singles.toLocalIterator(); iter.hasNext();) {
+ oneItemList.add(iter.next().getString(0));
+ }
+ return oneItemList;
+ }
}
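A consumption note: sequenceMining() above stores the candidate list as the plain toString() of a Java list (e.g. "[host1, host2, host3]"), so a reader has to split it apart by hand. Here is a hedged sketch with Jedis; the redis address and the "/video" key are made-up values.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import redis.clients.jedis.Jedis;

public class CandidateReader {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {   // address is an assumption
            String writeTime = jedis.hget("/video", "write_time");
            String candidate = jedis.hget("/video", "candidate");   // e.g. "[host1, host2, host3]"
            // strip the surrounding brackets and split on the comma separators
            List<String> hosts = (candidate == null || candidate.length() <= 2)
                    ? Collections.<String>emptyList()
                    : Arrays.asList(candidate.substring(1, candidate.length() - 1).split(",\\s*"));
            System.out.println("write_time=" + writeTime + ", candidates=" + hosts);
        }
    }
}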
diff --git a/src/main/java/cn/ac/iie/mining/prefixSpark.java b/src/main/java/cn/ac/iie/mining/prefixSpark.java
index a8323a6..a77092d 100644
--- a/src/main/java/cn/ac/iie/mining/prefixSpark.java
+++ b/src/main/java/cn/ac/iie/mining/prefixSpark.java
@@ -1,37 +1,50 @@
package cn.ac.iie.mining;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.ml.fpm.PrefixSpan;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
+import org.junit.Test;
public class prefixSpark {
public static void main(String[] args) {
long stime = System.currentTimeMillis();
SparkSession spark = SparkSession
.builder()
- .appName("JavaPrefixSpanExample")
- .master("local[4]")
+ .appName("PrefixTest")
+ //.master("local[4]")
+ .master("spark://localhost:7077")
.getOrCreate();
// $example on$
- //List<Row> data = Arrays.asList(
- // RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3))),
- // RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1,2))),
- // RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5))),
- // RowFactory.create(Arrays.asList(Arrays.asList(6)))
- //);
List<Row> data = Arrays.asList(
+ RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3))),
+ RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1,2))),
+ RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5))),
+ RowFactory.create(Arrays.asList(Arrays.asList(6)))
+ );
+ miningTask(spark, data);
+
+ List<Row> data1 = Arrays.asList(
RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(2), Arrays.asList(2))),
RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(2), Arrays.asList(2), Arrays.asList(1), Arrays.asList(2))),
RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(2), Arrays.asList(5))),
RowFactory.create(Arrays.asList(Arrays.asList(6)))
);
+ miningTask(spark, data1);
+
+ spark.stop();
+ long etime = System.currentTimeMillis();
+ System.out.printf("Elapsed: %d s.", (etime - stime) / 1000);
+ }
+
+ private static void miningTask(SparkSession spark, List<Row> data) {
StructType schema = new StructType(new StructField[]{ new StructField(
"sequence", new ArrayType(new ArrayType(DataTypes.IntegerType, true), true),
false, Metadata.empty())
@@ -41,11 +54,61 @@ public class prefixSpark {
PrefixSpan prefixSpan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5);
// Finding frequent sequential patterns
- prefixSpan.findFrequentSequentialPatterns(sequenceDF).show();
+ Dataset<Row> sequence = prefixSpan.findFrequentSequentialPatterns(sequenceDF);
+ sequence.show();
// $example off$
+ //List<String> listOne = sequence.as(Encoders.STRING()).collectAsList();
+ //System.out.println(listOne);
+ //
+ //List<String> listTwo = sequence.map((MapFunction<Row, String>) Row::mkString, Encoders.STRING()).collectAsList();
+ //System.out.println(listTwo);
+ }
- spark.stop();
- long etime = System.currentTimeMillis();
- System.out.printf("Elapsed: %d s.", (etime - stime) / 1000);
+ @Test
+ public void sparkGrammar() {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("SparkSample")
+ .master("local[*]")
+ .getOrCreate();
+ SparkContext sc = spark.sparkContext();
+ sc.setLogLevel("ERROR");
+ //create df
+ List<String> myList = Arrays.asList("one", "two", "three", "four", "five");
+ Dataset<Row> df = spark.createDataset(myList, Encoders.STRING()).toDF();
+ df.show();
+ //using df.as
+ List<String> listOne = df.as(Encoders.STRING()).collectAsList();
+ System.out.println(listOne);
+ //using df.map
+ List<String> listTwo = df.map((MapFunction<Row, String>) row -> row.mkString(), Encoders.STRING()).collectAsList();
+ System.out.println(listTwo);
+
+ List<String> oneItemList = new ArrayList<>();
+ for(Iterator<Row> iter = df.toLocalIterator(); iter.hasNext();) {
+ //String item = (iter.next()).toString();
+ //System.out.println(item.toString());
+ Row each = iter.next();
+ if (each.size() == 1)
+ oneItemList.add(each.getString(0));
+ }
+ System.out.println(oneItemList);
+ System.exit(1);   // exits here, so the foreach experiment below never runs
+
+ List<String> oneItem = new ArrayList<>();
+ // caution: mutating a captured driver-side list inside foreach only appears to work in local mode
+ df.foreach((org.apache.spark.api.java.function.ForeachFunction<Row>) row -> {
+ if (row.size() == 1) {
+ oneItem.add(row.getString(0));
+ System.out.println(1);
+ System.out.println(row.getString(0));
+ System.out.println(oneItem.toString());
+ }
+ else {
+ System.out.println(row.size());
+ System.out.println(row.getString(0));
+ System.out.println(2);
+ }
+ });
+ System.out.println("oneItem: " + oneItem.toString());
}
}
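Since the sparkGrammar test above experiments with pulling rows back to the driver, here is a compact sketch of the safer pattern: collect to the driver explicitly instead of mutating a captured list inside foreach (class and app names are illustrative).

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CollectOnDriver {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CollectOnDriver")
                .master("local[*]")   // local master just for this sketch
                .getOrCreate();
        Dataset<Row> df = spark.createDataset(
                Arrays.asList("one", "two", "three"), Encoders.STRING()).toDF();
        // collectAsList() brings the rows to the driver; no shared mutable state needed
        List<String> values = df.as(Encoders.STRING()).collectAsList();
        System.out.println(values);
        spark.stop();
    }
}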
diff --git a/src/main/resources/video_data.properties b/src/main/resources/video_data.properties
index 6b4bff5..e7a25f9 100644
--- a/src/main/resources/video_data.properties
+++ b/src/main/resources/video_data.properties
@@ -1,6 +1,10 @@
# kafka address
bootstrap.servers=192.168.10.28:9092
-
kafka.avc.topic = AV-EVENT-COMPLETED-LOG
kafka.av.topic = AV-EVENT-LOG
-interval = 7 \ No newline at end of file
+
+# time window
+interval = 22
+
+# Spark master address
+Spark.master=spark://192.168.10.9:7077 \ No newline at end of file
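The Configurations.getStringProperty / getIntProperty calls in MiningService read from this file; their actual implementation is not part of this commit, so the loader below is only a plausible classpath-based sketch using java.util.Properties (class name illustrative).

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical stand-in for cn.ac.iie.config.Configurations; the real class is not in this diff.
public class PropertiesSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (InputStream in = PropertiesSketch.class.getClassLoader()
                .getResourceAsStream("video_data.properties")) {
            if (in == null) throw new IOException("video_data.properties not on classpath");
            props.load(in);
        }
        // Properties skips whitespace around "=", so "interval = 22" yields "22"
        String master = props.getProperty("Spark.master");   // e.g. spark://192.168.10.9:7077
        int interval = Integer.parseInt(props.getProperty("interval"));
        System.out.println(master + " / " + interval);
    }
}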
diff --git a/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml b/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml
index 5ed3020..86da199 100644
--- a/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml
+++ b/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml
@@ -50,4 +50,12 @@
select count(url) from `av_event_log_local`
where url like #{urlPath} and recv_time &gt;= #{effectiveTime}
</select>
+
+ <select id="getHistoryHost" resultType="cn.ac.iie.pojo.AVlog">
+ select client_ip, server_ip, url, recv_time, found_time, expire_time
+ from `av_event_log_local`
+ where url like #{host}
+ order by recv_time desc
+ limit 100
+ </select>
</mapper> \ No newline at end of file
diff --git a/target/classes/video_data.properties b/target/classes/video_data.properties
index 6b4bff5..e7a25f9 100644
--- a/target/classes/video_data.properties
+++ b/target/classes/video_data.properties
@@ -1,6 +1,10 @@
# kafka address
bootstrap.servers=192.168.10.28:9092
-
kafka.avc.topic = AV-EVENT-COMPLETED-LOG
kafka.av.topic = AV-EVENT-LOG
-interval = 7 \ No newline at end of file
+
+# time window
+interval = 22
+
+# Spark master address
+Spark.master=spark://192.168.10.9:7077 \ No newline at end of file