summaryrefslogtreecommitdiff
path: root/src/test/java/cn/ac/iie/mining/MiningServiceTest.java
blob: 06bfca4db9b761b803a4654f0006c342c8f85f1e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package cn.ac.iie.mining;

import cn.ac.iie.Utils.JedisPoolUtils;
import cn.ac.iie.Utils.SqlSessionFactoryUtils;
import cn.ac.iie.Utils.URLHandleUtils;
import cn.ac.iie.config.GetConf;
import cn.ac.iie.mapper.AVDataListMapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ibatis.session.SqlSession;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;
import redis.clients.jedis.Jedis;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;


/**
 * Manual, environment-dependent checks for the pieces the mining service relies on:
 * consuming the AV topic from Kafka, inspecting a key in Redis, and counting rows
 * through the MyBatis mapper.
 *
 * <p>These tests talk to the real services configured via {@link GetConf},
 * {@link JedisPoolUtils} and {@link SqlSessionFactoryUtils}; they only print what they
 * find and do not assert on the data itself.
 */
public class MiningServiceTest {

    /** Key used by {@link #judgeKey()} and, wrapped in SQL wildcards, by {@link #isMore10()}. */
    String urlPath = "test.com";

    /**
     * Polls the AV topic up to 30 times (1 second each) and prints the first URL path of
     * every record that carries a usable {@code url}, collapsing consecutive duplicates.
     *
     * <p>Any failure is rethrown as an {@link AssertionError} so the test is reported as
     * failed instead of silently passing after a printed stack trace.
     */
    @Test
    public void consumeKafka() {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", GetConf.BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // Everything fetched by one poll (max.poll.records) has to be processed within the poll interval.
        properties.put("max.poll.records", 50);
        // Auto-commit is disabled; offsets are committed explicitly with commitSync() below.
        properties.put("enable.auto.commit", "false");
        // For testing, start from the earliest offset when the group has no committed position.
        properties.put("auto.offset.reset", "earliest");

        // try-with-resources closes the consumer on every exit path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(GetConf.KAFKA_AV_TOPIC));

            ObjectMapper objectMapper = new ObjectMapper();

            // At this small data scale a set is not needed for de-duplication;
            // remembering the previously printed path is enough.
            String prevPath = "";

            // Poll and consume.
            for (int i = 0; i < 30; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    String payload = record.value();
                    if (payload == null) {
                        continue;
                    }
                    JsonNode jsonNode = objectMapper.readTree(payload);
                    if (jsonNode == null) {
                        continue;
                    }

                    // Keep only records that actually carry data. Records lacking "dir" or
                    // "url" are skipped rather than aborting the whole run with an NPE.
                    // NOTE(review): dir == 2 is skipped as in the original; its meaning is
                    // not visible from this file.
                    JsonNode dirNode = jsonNode.get("dir");
                    JsonNode urlNode = jsonNode.get("url");
                    if (dirNode == null || urlNode == null || urlNode.isNull() || dirNode.asInt() == 2) {
                        continue;
                    }

                    String firstPath = URLHandleUtils.getFirstURLPath(urlNode.asText());
                    // Defensive: a null result (if the helper can return one) is skipped so
                    // it never becomes the remembered path.
                    if (firstPath == null || firstPath.equals(prevPath)) {
                        continue;
                    }
                    prevPath = firstPath;

                    // This is where the value would be handed to the query module.
                    System.out.println(firstPath);
                }
                consumer.commitSync();
            }
        } catch (Exception e) {
            throw new AssertionError("Consuming from Kafka failed", e);
        }
    }

    /**
     * Looks up {@link #urlPath} in Redis and, when the key exists, prints whether its
     * {@code write_time} field is older than the cutoff and therefore needs re-mining.
     */
    @Test
    public void judgeKey() {
        JedisPoolUtils.getJedisPoolInstance();
        Jedis jedis = JedisPoolUtils.getJedis();
        try {
            if (!jedis.exists(urlPath)) {
                return;
            }

            // The key exists; check whether the stored time has expired.
            String storedWriteTime = jedis.hget(urlPath, "write_time");
            if (storedWriteTime == null) {
                throw new AssertionError("Redis hash '" + urlPath + "' has no write_time field");
            }
            long writeTime = Long.parseLong(storedWriteTime);
            long nowTime = System.currentTimeMillis() / 1000L;
            // nowTime - (nowTime + 8h) % 1d is the most recent midnight at UTC+8 (epoch seconds);
            // the cutoff lies two further days before that.
            long cutoff = nowTime - (nowTime + 8 * 3600) % 86400 - 86400 * 2;
            if (cutoff > writeTime) {
                // Sequence mining would be triggered here.
                //sequenceMining(urlPath);
                System.out.println("需要进行序列挖掘");
            } else {
                System.out.println(String.format("不需要进行序列挖掘,上次的时间为%s", writeTime));
            }
        } finally {
            // Always hand the connection back, even when the checks above throw.
            JedisPoolUtils.releaseResource(jedis);
        }
    }

    /**
     * Prints the value returned by {@code AVDataListMapper.cumulativeData} for URLs
     * containing {@link #urlPath}, using a timestamp 3,000,000 seconds before now.
     */
    @Test
    public void isMore10() {
        // The session is closed automatically so the underlying connection is not leaked.
        try (SqlSession sqlSession = SqlSessionFactoryUtils.getSqlSessionFactory().openSession()) {
            AVDataListMapper avListMapper = sqlSession.getMapper(AVDataListMapper.class);
            String likeUrl = "%" + urlPath + "%";
            long effectiveTime = System.currentTimeMillis() / 1000L - 3000000;
            int cnt = avListMapper.cumulativeData(likeUrl, effectiveTime);
            System.out.println(cnt);
        }
    }

    /** Placeholder: not implemented yet, so it currently always passes. */
    @Test
    public void sequenceMining() {
    }

    /** Placeholder: not implemented yet, so it currently always passes. */
    @Test
    public void relateHostMiningTask() {
    }

    /** Placeholder: not implemented yet, so it currently always passes. */
    @Test
    public void buildCandidate() {
    }
}