| author | HEATAO <[email protected]> | 2021-09-29 17:25:48 +0800 |
|---|---|---|
| committer | HEATAO <[email protected]> | 2021-09-29 17:25:48 +0800 |
| commit | 0e56eea95c75cb23a80e3a8ed7fbbd6c2028d83a | |
| tree | e86483026cef30b54df3d1cad0851002325f6006 | |
| parent | db3f315053dded43d345e41bfae8574378ed7ca3 | |
add MiningService
| -rw-r--r-- | .idea/inspectionProfiles/Project_Default.xml | 10 |
|---|---|---|
| -rw-r--r-- | src/main/java/cn/ac/iie/mapper/AVDataListMapper.java | 2 |
| -rw-r--r-- | src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml | 8 |
| -rw-r--r-- | src/main/java/cn/ac/iie/mining/MiningService.java | 83 |
| -rw-r--r-- | src/main/java/cn/ac/iie/mining/prefixSpark.java | 95 |
| -rw-r--r-- | src/main/resources/video_data.properties | 8 |
| -rw-r--r-- | target/classes/cn/ac/iie/mapper/AVDataListMapper.xml | 8 |
| -rw-r--r-- | target/classes/video_data.properties | 8 |

8 files changed, 197 insertions(+), 25 deletions(-)
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000..58ad3a5
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,10 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
+      <Languages>
+        <language minSize="46" name="Java" />
+      </Languages>
+    </inspection_tool>
+  </profile>
+</component>
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java
index 6fbbd28..e1491ef 100644
--- a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java
+++ b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.java
@@ -23,4 +23,6 @@ public interface AVDataListMapper {
     public List<AVlog> statisticsUA();
 
     public int cumulativeData(@Param("urlPath") String urlPath, @Param("effectiveTime") long effectiveTime);
+
+    public List<AVlog> getHistoryHost(@Param("host") String host);
 }
diff --git a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml
index 5ed3020..86da199 100644
--- a/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml
+++ b/src/main/java/cn/ac/iie/mapper/AVDataListMapper.xml
@@ -50,4 +50,12 @@
         select count(url) from `av_event_log_local`
         where url like #{urlPath} and recv_time >= #{effectiveTime}
     </select>
+
+    <select id="getHistoryHost" resultType="cn.ac.iie.pojo.AVlog">
+        select client_ip, server_ip, url, recv_time, found_time, expire_time
+        from `av_event_log_local`
+        where url like #{host}
+        order by recv_time desc
+        limit 100
+    </select>
 </mapper>
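For orientation, the new `getHistoryHost` query is reached through a MyBatis mapper proxy, and the caller supplies the SQL LIKE wildcards (as `sequenceMining` does below with `'%' + urlPath + '%'`). A minimal usage sketch; the `openSession()` helper on the project's `SqlSessionFactoryUtils` and the sample host are assumptions, not part of this commit:

```java
import cn.ac.iie.Utils.SqlSessionFactoryUtils;
import cn.ac.iie.mapper.AVDataListMapper;
import cn.ac.iie.pojo.AVlog;
import org.apache.ibatis.session.SqlSession;

import java.util.List;

public class HistoryHostExample {
    public static void main(String[] args) {
        // openSession() is assumed here; any SqlSessionFactory works the same way.
        try (SqlSession session = SqlSessionFactoryUtils.openSession()) {
            AVDataListMapper mapper = session.getMapper(AVDataListMapper.class);
            // LIKE wildcards are added by the caller, mirroring sequenceMining().
            List<AVlog> last100 = mapper.getHistoryHost("%video.example.com%");
            for (AVlog log : last100) {
                System.out.println(log.getClient_ip());
            }
        }
    }
}
```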
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/mining/MiningService.java b/src/main/java/cn/ac/iie/mining/MiningService.java
index 3dd1aac..a64b1d7 100644
--- a/src/main/java/cn/ac/iie/mining/MiningService.java
+++ b/src/main/java/cn/ac/iie/mining/MiningService.java
@@ -3,8 +3,11 @@ package cn.ac.iie.mining;
 import cn.ac.iie.Utils.JedisPoolUtils;
 import cn.ac.iie.Utils.SqlSessionFactoryUtils;
 import cn.ac.iie.Utils.URLHandleUtils;
+import cn.ac.iie.config.Configurations;
 import cn.ac.iie.config.GetConf;
 import cn.ac.iie.mapper.AVDataListMapper;
+import cn.ac.iie.pojo.AVlog;
+import cn.ac.iie.pojo.TrafficLog;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.ibatis.session.SqlSession;
@@ -12,13 +15,17 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.spark.ml.fpm.PrefixSpan;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.*;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -63,7 +70,7 @@ public class MiningService {
     private static SparkSession spark = SparkSession
             .builder()
             .appName("PrefixSpanHosts")
-            //.master("local[4]")
+            .master(Configurations.getStringProperty(0, "Spark.master"))
             .getOrCreate();
 
     @Test
@@ -110,7 +117,7 @@ public class MiningService {
      * Check redis for whether this key has already been sequence-mined within the last 3 days.
      * Determine whether a record exists in redis (the entry table) and whether it has expired.
      * Note: if expiry-based deletion of records is added to the sequence table, remember to update this code~
-     * @param urlPath
+     * @param urlPath the first-level path, split on /
      */
     public void judgeKey(String urlPath) {
         if (jedis.exists(urlPath)) {
@@ -147,13 +154,79 @@ public class MiningService {
     /**
      * First query ClickHouse for every similar host's client ip, server ip and timestamp.
      * For each host, find its time window and build one row via RowFactory.
+     * Hand the assembled data to Spark for sequence mining.
+     * Post-process the result and write it back to redis.
+     *
+     * redis layout:
+     * key: urlPath  fields: write_time, candidate (both strings)
+     * candidate(String): [host1, host2, host3]
      */
     public void sequenceMining(String urlPath) {
         // First query ClickHouse for every similar host
+        String likeUrl = '%' + urlPath + '%';
+        List<AVlog> aVlogs = avListMapper.getHistoryHost(likeUrl);
+
+        ArrayList<Row> data = new ArrayList<>();
+
+        // For each host, look up every host inside its time window
+        for (AVlog eachLog: aVlogs) {
+            List<TrafficLog> trafficLogList = avListMapper.getRelationListWithHost(eachLog.getFound_time(),
+                    eachLog.getClient_ip(),
+                    Configurations.getIntProperty(0, "interval"));
+            data.add(RowFactory.create(constructRow(trafficLogList)));
+        }
 
         // Invoke the Spark module
+        Dataset<Row> sequence = relateHostMiningTask(spark, data);
 
-        // write back redis
+        // Write back to redis
+        List<String> candidateList = buildCandidate(sequence);
+        jedis.hset(urlPath, "write_time", String.valueOf(System.currentTimeMillis()/1000L));
+        long returnStatus = jedis.hset(urlPath, "candidate", candidateList.toString());
+    }
+
+    /**
+     * Helper: build one input row for Spark sequence mining.
+     */
+    private List<List<String>> constructRow(List<TrafficLog> trafficLogList) {
+        List<List<String>> row = new ArrayList<>();
+        for (TrafficLog trafficLog: trafficLogList) {
+            row.add(Arrays.asList(trafficLog.getHttp_host()));
+        }
+        return row;
     }
+    /**
+     * Sequence mining step.
+     */
+    public Dataset<Row> relateHostMiningTask(SparkSession spark, List<Row> data) {
+        StructType schema = new StructType(new StructField[]{ new StructField(
+                "sequence", new ArrayType(new ArrayType(DataTypes.StringType, true), true),
+                false, Metadata.empty())
+        });
+        Dataset<Row> sequenceDF = spark.createDataFrame(data, schema);
+
+        PrefixSpan prefixSpan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5);
+
+        // Finding frequent sequential patterns
+        Dataset<Row> sequence = prefixSpan.findFrequentSequentialPatterns(sequenceDF);
+        sequence.show();
+        return sequence;
+    }
+
+    /**
+     * Process the result returned by Spark sequence mining into the candidate set.
+     * Any re-ranking that is needed also happens in this step.
+     * Iterating Spark results with an Iterator always feels slow; a lambda would be nicer.
+     */
+    public List<String> buildCandidate(Dataset<Row> sequence) {
+        List<String> oneItemList = new ArrayList<>();
+        for(Iterator<Row> iter = sequence.toLocalIterator(); iter.hasNext();) {
+            Row each = iter.next();
+            if (each.size() == 1)
+                oneItemList.add(each.getString(0));
+            else break;
+        }
+        return oneItemList;
+    }
 }
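One note on `buildCandidate`: per the Spark ML documentation, `findFrequentSequentialPatterns` returns a DataFrame with two columns, `sequence` (array of arrays) and `freq` (long), so each result `Row` has size 2 and the hosts sit inside the nested array. A sketch of the equivalent filtering done on the DataFrame side before collecting; the column names are the documented ones, while the ordering by `freq` is an assumption about the intended ranking:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

import java.util.List;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.size;

public class CandidateFilter {
    // Keep patterns that are a single itemset containing a single host,
    // rank them by support, and return the bare host strings.
    public static List<String> singleHostCandidates(Dataset<Row> sequence) {
        return sequence
                .filter(size(col("sequence")).equalTo(1))             // one itemset in the pattern
                .filter(size(col("sequence").getItem(0)).equalTo(1))  // one host in the itemset
                .orderBy(col("freq").desc())                          // most frequent first
                .select(col("sequence").getItem(0).getItem(0))
                .as(Encoders.STRING())
                .collectAsList();
    }
}
```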
diff --git a/src/main/java/cn/ac/iie/mining/prefixSpark.java b/src/main/java/cn/ac/iie/mining/prefixSpark.java
index a8323a6..a77092d 100644
--- a/src/main/java/cn/ac/iie/mining/prefixSpark.java
+++ b/src/main/java/cn/ac/iie/mining/prefixSpark.java
@@ -1,37 +1,50 @@
 package cn.ac.iie.mining;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.ml.fpm.PrefixSpan;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.*;
+import org.junit.Test;
 
 public class prefixSpark {
     public static void main(String[] args) {
         long stime = System.currentTimeMillis();
         SparkSession spark = SparkSession
                 .builder()
-                .appName("JavaPrefixSpanExample")
-                .master("local[4]")
+                .appName("PrefixTest")
+                //.master("local[4]")
+                .master("spark://localhost:7077")
                 .getOrCreate();
 
         // $example on$
-        //List<Row> data = Arrays.asList(
-        //        RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3))),
-        //        RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1,2))),
-        //        RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5))),
-        //        RowFactory.create(Arrays.asList(Arrays.asList(6)))
-        //);
         List<Row> data = Arrays.asList(
+                RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3))),
+                RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1,2))),
+                RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5))),
+                RowFactory.create(Arrays.asList(Arrays.asList(6)))
+        );
+        miningTask(spark, data);
+
+        List<Row> data1 = Arrays.asList(
                 RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(2), Arrays.asList(2))),
                 RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(2), Arrays.asList(2), Arrays.asList(1), Arrays.asList(2))),
                 RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(2), Arrays.asList(5))),
                 RowFactory.create(Arrays.asList(Arrays.asList(6)))
         );
+        miningTask(spark, data1);
+
+        spark.stop();
+        long etime = System.currentTimeMillis();
+        System.out.printf("Elapsed: %d s.", (etime - stime) / 1000);
+    }
+
+    private static void miningTask(SparkSession spark, List<Row> data) {
         StructType schema = new StructType(new StructField[]{ new StructField(
                 "sequence", new ArrayType(new ArrayType(DataTypes.IntegerType, true), true),
                 false, Metadata.empty())
@@ -41,11 +54,61 @@ public class prefixSpark {
         PrefixSpan prefixSpan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5);
 
         // Finding frequent sequential patterns
-        prefixSpan.findFrequentSequentialPatterns(sequenceDF).show();
+        Dataset<Row> sequence = prefixSpan.findFrequentSequentialPatterns(sequenceDF);
+        sequence.show();
         // $example off$
+        //List<String> listOne = sequence.as(Encoders.STRING()).collectAsList();
+        //System.out.println(listOne);
+        //
+        //List<String> listTwo = sequence.map((MapFunction<Row, String>) Row::mkString, Encoders.STRING()).collectAsList();
+        //System.out.println(listTwo);
+    }
 
-        spark.stop();
-        long etime = System.currentTimeMillis();
-        System.out.printf("Elapsed: %d s.", (etime - stime) / 1000);
+    @Test
+    public void sparkGrammar() {
+        SparkSession spark = SparkSession
+                .builder()
+                .appName("SparkSample")
+                .master("local[*]")
+                .getOrCreate();
+        SparkContext sc = spark.sparkContext();
+        sc.setLogLevel("ERROR");
+        //create df
+        List<String> myList = Arrays.asList("one", "two", "three", "four", "five");
+        Dataset<Row> df = spark.createDataset(myList, Encoders.STRING()).toDF();
+        df.show();
+        //using df.as
+        List<String> listOne = df.as(Encoders.STRING()).collectAsList();
+        System.out.println(listOne);
+        //using df.map
+        List<String> listTwo = df.map((MapFunction<Row, String>) row -> row.mkString(), Encoders.STRING()).collectAsList();
+        System.out.println(listTwo);
+
+        List<String> oneItemList = new ArrayList<>();
+        for(Iterator<Row> iter = df.toLocalIterator(); iter.hasNext();) {
+            //String item = (iter.next()).toString();
+            //System.out.println(item.toString());
+            Row each = iter.next();
+            if (each.size() == 1)
+                oneItemList.add(each.getString(0));
+        }
+        System.out.println(oneItemList);
+        System.exit(1);
+
+        List<String> oneItem = new ArrayList<>();
+        df.foreach(row -> {
+            if (row.size() == 1) {
+                oneItem.add(row.getString(0));
+                System.out.println(1);
+                System.out.println(row.getString(0));
+                System.out.println(oneItem.toString());
+            }
+            else {
+                System.out.println(row.size());
+                System.out.println(row.getString(0));
+                System.out.println(2);
+            }
+        });
+        System.out.println("oneItem: " + oneItem.toString());
     }
 }
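The first toy dataset is the one from the Spark ML `PrefixSpan` example, so `miningTask` should print roughly the following for it: with minSupport 0.5 over 4 sequences, a pattern must occur in at least 2 of them, and row order may vary:

```
+----------+----+
|  sequence|freq|
+----------+----+
|     [[3]]|   2|
|     [[2]]|   3|
|     [[1]]|   3|
|  [[1, 2]]|   3|
|[[1], [3]]|   2|
+----------+----+
```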
diff --git a/src/main/resources/video_data.properties b/src/main/resources/video_data.properties
index 6b4bff5..e7a25f9 100644
--- a/src/main/resources/video_data.properties
+++ b/src/main/resources/video_data.properties
@@ -1,6 +1,10 @@
 # kafka address
 bootstrap.servers=192.168.10.28:9092
-
 kafka.avc.topic = AV-EVENT-COMPLETED-LOG
 kafka.av.topic = AV-EVENT-LOG
-interval = 7
\ No newline at end of file
+
+# time window
+interval = 22
+
+# Spark address
+Spark.master=192.168.10.9:7077
\ No newline at end of file
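The `Configurations` class that reads these properties is not part of this commit; below is a hypothetical sketch of what `getStringProperty(0, ...)` and `getIntProperty(0, ...)` might look like, assuming the int argument selects a classpath properties file (0 mapping to `video_data.properties`). Note also that for a standalone cluster `SparkSession.builder().master(...)` expects a URL of the form `spark://192.168.10.9:7077` (the test class above uses `spark://localhost:7077`), so the stored value presumably needs that scheme:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical stand-in for cn.ac.iie.config.Configurations (not in this commit).
public final class Configurations {
    // Index 0 is assumed to map to video_data.properties on the classpath.
    private static final String[] FILES = {"video_data.properties"};
    private static final Properties[] PROPS = new Properties[FILES.length];

    static {
        for (int i = 0; i < FILES.length; i++) {
            PROPS[i] = new Properties();
            try (InputStream in = Configurations.class.getClassLoader()
                    .getResourceAsStream(FILES[i])) {
                PROPS[i].load(in);
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    public static String getStringProperty(int file, String key) {
        // Trim so "key = value" entries with padded '=' signs behave.
        return PROPS[file].getProperty(key).trim();
    }

    public static int getIntProperty(int file, String key) {
        return Integer.parseInt(getStringProperty(file, key));
    }
}
```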
diff --git a/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml b/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml
index 5ed3020..86da199 100644
--- a/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml
+++ b/target/classes/cn/ac/iie/mapper/AVDataListMapper.xml
@@ -50,4 +50,12 @@
         select count(url) from `av_event_log_local`
         where url like #{urlPath} and recv_time >= #{effectiveTime}
     </select>
+
+    <select id="getHistoryHost" resultType="cn.ac.iie.pojo.AVlog">
+        select client_ip, server_ip, url, recv_time, found_time, expire_time
+        from `av_event_log_local`
+        where url like #{host}
+        order by recv_time desc
+        limit 100
+    </select>
 </mapper>
\ No newline at end of file
diff --git a/target/classes/video_data.properties b/target/classes/video_data.properties
index 6b4bff5..e7a25f9 100644
--- a/target/classes/video_data.properties
+++ b/target/classes/video_data.properties
@@ -1,6 +1,10 @@
 # kafka address
 bootstrap.servers=192.168.10.28:9092
-
 kafka.avc.topic = AV-EVENT-COMPLETED-LOG
 kafka.av.topic = AV-EVENT-LOG
-interval = 7
\ No newline at end of file
+
+# time window
+interval = 22
+
+# Spark address
+Spark.master=192.168.10.9:7077
\ No newline at end of file
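Finally, a consumer-side sketch of the redis layout documented in `sequenceMining` (a hash keyed by `urlPath` with fields `write_time` and `candidate`). The host, port and key below are placeholders, and the bracket-stripping mirrors the `List.toString()` serialization used on write:

```java
import redis.clients.jedis.Jedis;

import java.util.Arrays;
import java.util.List;

public class CandidateReader {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) { // placeholder address
            String urlPath = "somePath";                    // placeholder key
            String writeTime = jedis.hget(urlPath, "write_time");
            String raw = jedis.hget(urlPath, "candidate");  // e.g. "[host1, host2]"
            // candidate was written via List.toString(), so strip the brackets
            // and split on ", " to recover the host list.
            List<String> hosts = Arrays.asList(
                    raw.substring(1, raw.length() - 1).split(",\\s*"));
            System.out.println(writeTime + " -> " + hosts);
        }
    }
}
```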
