summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/galaxy/cn/common/CipEntity.java37
-rw-r--r--src/main/java/com/galaxy/cn/common/Entity.java55
-rw-r--r--src/main/java/com/galaxy/cn/common/ObjectEntity.java55
-rw-r--r--src/main/java/com/galaxy/cn/common/ResultEntity.java57
-rw-r--r--src/main/java/com/galaxy/cn/config/CommonConfigurations.java62
-rw-r--r--src/main/java/com/galaxy/cn/config/commonConfig.java40
-rw-r--r--src/main/java/com/galaxy/cn/function/TopNHotItems.java137
-rw-r--r--src/main/java/com/galaxy/cn/function/TopnHotItem.java98
-rw-r--r--src/main/java/com/galaxy/cn/function/metricsCalculate.java30
-rw-r--r--src/main/java/com/galaxy/cn/sink/HbaseSink.java67
-rw-r--r--src/main/java/com/galaxy/recommend/Recommendation.java143
-rw-r--r--src/main/resources/common.properties74
12 files changed, 855 insertions, 0 deletions
diff --git a/src/main/java/com/galaxy/cn/common/CipEntity.java b/src/main/java/com/galaxy/cn/common/CipEntity.java
new file mode 100644
index 0000000..dd6c1ba
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/common/CipEntity.java
@@ -0,0 +1,37 @@
+package com.galaxy.cn.common;
+
+public class CipEntity {
+
+ public long common_recv_time ;
+ public String common_client_list ;
+ public String common_app_label ;
+
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public String getCommon_client_list() {
+ return common_client_list;
+ }
+
+ public void setCommon_client_list(String common_client_list) {
+ this.common_client_list = common_client_list;
+ }
+
+ public String getCommon_app_label() {
+ return common_app_label;
+ }
+
+ public void setCommon_app_label(String common_app_label) {
+ this.common_app_label = common_app_label;
+ }
+
+ public CipEntity() {
+ }
+
+}
diff --git a/src/main/java/com/galaxy/cn/common/Entity.java b/src/main/java/com/galaxy/cn/common/Entity.java
new file mode 100644
index 0000000..e1a8324
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/common/Entity.java
@@ -0,0 +1,55 @@
+package com.galaxy.cn.common;
+
+import java.io.Serializable;
+
+public class Entity implements Serializable {
+
+ public int ifError;
+ public long common_processing_time ;
+ public String common_client_ip ;
+ public String common_app_label ;
+ public long common_sessions;
+
+ public Entity() {
+ }
+
+ public int getIfError() {
+ return ifError;
+ }
+
+ public void setIfError(int ifError) {
+ this.ifError = ifError;
+ }
+
+ public long getCommon_recv_time() {
+ return common_processing_time;
+ }
+
+ public String getCommon_app_label() {
+ return common_app_label;
+ }
+
+ public void setCommon_app_label(String common_app_label) {
+ this.common_app_label = common_app_label;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_processing_time = common_recv_time;
+ }
+
+ public String getCommon_client_ip() {
+ return common_client_ip;
+ }
+
+ public void setCommon_client_ip(String common_client_ip) {
+ this.common_client_ip = common_client_ip;
+ }
+
+ public long getCommon_sessions() {
+ return common_sessions;
+ }
+
+ public void setCommon_sessions(long common_sessions) {
+ this.common_sessions = common_sessions;
+ }
+}
diff --git a/src/main/java/com/galaxy/cn/common/ObjectEntity.java b/src/main/java/com/galaxy/cn/common/ObjectEntity.java
new file mode 100644
index 0000000..f45641b
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/common/ObjectEntity.java
@@ -0,0 +1,55 @@
+package com.galaxy.cn.common;
+
+public class ObjectEntity implements Comparable<ObjectEntity> {
+
+ public String common_client_ip ;
+ public String common_app_label ;
+ public long sessions ;
+ public long common_recv_time ;
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public String getCommon_client_ip() {
+ return common_client_ip;
+ }
+
+ public void setCommon_client_ip(String common_client_ip) {
+ this.common_client_ip = common_client_ip;
+ }
+
+ public String getCommon_app_label() {
+ return common_app_label;
+ }
+
+ public void setCommon_app_label(String common_app_label) {
+ this.common_app_label = common_app_label;
+ }
+
+ public long getSessions() {
+ return sessions;
+ }
+
+ public void setSessions(long sessions) {
+ this.sessions = sessions;
+ }
+
+ @Override
+ public int compareTo(ObjectEntity per) {
+ if(this.sessions<per.sessions){
+ return -1 ;
+ }else if(this.sessions>per.sessions){
+ return 1 ;
+ }else{
+ return this.common_client_ip.compareTo(per.common_client_ip) ; // 调用String中的compareTo()方法
+ }
+ }
+
+
+
+}
diff --git a/src/main/java/com/galaxy/cn/common/ResultEntity.java b/src/main/java/com/galaxy/cn/common/ResultEntity.java
new file mode 100644
index 0000000..31d3697
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/common/ResultEntity.java
@@ -0,0 +1,57 @@
+package com.galaxy.cn.common;
+
+public class ResultEntity implements Comparable<ResultEntity> {
+
+ public String common_client_ip ;
+ public long sessions ;
+/*
+ public String common_app_label ;
+ public long common_recv_time ;
+*/
+
+/* public String getCommon_app_label() {
+ return common_app_label;
+ }
+
+ public void setCommon_app_label(String common_app_label) {
+ this.common_app_label = common_app_label;
+ }
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }*/
+
+ public String getCommon_client_ip() {
+ return common_client_ip;
+ }
+
+ public void setCommon_client_ip(String common_client_ip) {
+ this.common_client_ip = common_client_ip;
+ }
+
+ public long getSessions() {
+ return sessions;
+ }
+
+ public void setSessions(long sessions) {
+ this.sessions = sessions;
+ }
+
+ @Override
+ public int compareTo(ResultEntity per) {
+ if(this.sessions<per.sessions){
+ return -1 ;
+ }else if(this.sessions>per.sessions){
+ return 1 ;
+ }else{
+ return this.common_client_ip.compareTo(per.common_client_ip) ; // 调用String中的compareTo()方法
+ }
+ }
+
+
+
+}
diff --git a/src/main/java/com/galaxy/cn/config/CommonConfigurations.java b/src/main/java/com/galaxy/cn/config/CommonConfigurations.java
new file mode 100644
index 0000000..88ebc6d
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/config/CommonConfigurations.java
@@ -0,0 +1,62 @@
+package com.galaxy.cn.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class CommonConfigurations {
+
+ private static Properties propService = new Properties();
+
+ public static Map<String,String> getHashTableProperty(String key) {
+
+ Map<String,String> map = new HashMap<>();
+
+
+ String[] keyarray = propService.getProperty(key).split(",");
+ for(String k :keyarray){
+
+ if(k!=null && !"".equals(k.trim())){
+ map.put(k,"");
+ }
+
+ }
+
+ return map;
+ }
+
+ public static String getStringProperty(String key) {
+
+ return propService.getProperty(key);
+
+
+ }
+
+ public static Integer getIntProperty( String key) {
+
+ return Integer.parseInt(propService.getProperty(key));
+
+ }
+
+ public static Long getLongProperty(String key) {
+ return Long.parseLong(propService.getProperty(key));
+
+ }
+
+ public static Boolean getBooleanProperty(String key) {
+ return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ }
+
+ static {
+ try {
+ propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
+ } catch (Exception e) {
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/java/com/galaxy/cn/config/commonConfig.java b/src/main/java/com/galaxy/cn/config/commonConfig.java
new file mode 100644
index 0000000..b242a31
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/config/commonConfig.java
@@ -0,0 +1,40 @@
+package com.galaxy.cn.config;
+
+import java.util.Map;
+
+/**
+ * Created by wk on 2021/1/6.
+ */
+public class commonConfig {
+
+
+ public static final String SOURCE_KAFKA_BROKER = CommonConfigurations.getStringProperty("source.kafka.broker");
+ public static final String SOURCE_KAFKA_GROUP_ID = CommonConfigurations.getStringProperty("source.kafka.group.id");
+ public static final String SOURCE_KAFKA_TOPIC = CommonConfigurations.getStringProperty("source.kafka.topic");
+ public static final String ZK_HOST = CommonConfigurations.getStringProperty("zk.host");
+
+ public static final int SOURCE_KAFKA_PARALLELISM = CommonConfigurations.getIntProperty("source.kafka.parallelism");
+ public static final int SINK_HBASE_PARALLELISM = CommonConfigurations.getIntProperty("sink.hbase.parallelism");
+ public static final String SINK_HBASE_FM = CommonConfigurations.getStringProperty("sink.hbase.fm");
+ public static final String SINK_HBASE_TABLE = CommonConfigurations.getStringProperty("sink.hbase.table");
+
+ public static final int TASK_PARALLELISM = CommonConfigurations.getIntProperty("task.parallelism");
+
+ public static final int WATERMARK_TIME = CommonConfigurations.getIntProperty("watermark.time");
+ public static final int SLIDINGWINDOW_TIME_MINUTE = CommonConfigurations.getIntProperty("slidingwindow.time.minute");
+ public static final int SLIDINGWINDOWSLOT_TIME_MINUTE = CommonConfigurations.getIntProperty("slidingwindowslot.time.minute");
+ public static final int TOP_LIMIT = CommonConfigurations.getIntProperty("top.limit");
+
+ public static final String KAFKA_USER = CommonConfigurations.getStringProperty("kafka.user");
+ public static final String KAFKA_PIN = CommonConfigurations.getStringProperty("kafka.pin");
+ public static final int KAFKA_SECURITY = CommonConfigurations.getIntProperty("kafka.security");
+ public static final String TOOLS_LIBRARY = CommonConfigurations.getStringProperty("tools.library");
+
+ public static final Boolean no_filter = CommonConfigurations.getBooleanProperty("has.filter");
+ public static final Map<String,String> app_white_list= CommonConfigurations.getHashTableProperty("app.white.list");
+
+ public static final String SESSION_TIMEOUT_MS=CommonConfigurations.getStringProperty("session.timeout.ms");
+ public static final String MAX_POLL_RECORD=CommonConfigurations.getStringProperty("max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES=CommonConfigurations.getStringProperty("max.partition.fetch.bytes");
+
+}
diff --git a/src/main/java/com/galaxy/cn/function/TopNHotItems.java b/src/main/java/com/galaxy/cn/function/TopNHotItems.java
new file mode 100644
index 0000000..f43a257
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/function/TopNHotItems.java
@@ -0,0 +1,137 @@
+package com.galaxy.cn.function;
+
+import com.alibaba.fastjson.JSONObject;
+import com.galaxy.cn.common.ObjectEntity;
+import com.galaxy.cn.common.ResultEntity;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.*;
+
+public class TopNHotItems extends KeyedProcessFunction<Tuple1<String>, ObjectEntity, Tuple3<String,String,Long>> {
+ private final int topSize;
+ // Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
+ private Map<String,PriorityQueue<ResultEntity>> resultMap ;
+ private Map<String,Map<String,Long>> countMap ;
+
+ public TopNHotItems(int i) {
+ this.topSize = i;
+
+
+ }
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.resultMap= new HashMap<>();
+ this.countMap= new HashMap<>();
+
+ }
+
+
+ @Override
+ public void processElement(ObjectEntity objectEntity, Context context, Collector<Tuple3<String,String,Long>> collector) throws Exception {
+ //allSet.add(objectEntity);
+
+
+ if(countMap.containsKey(objectEntity.getCommon_app_label())) {
+
+ PriorityQueue<ResultEntity> queue =resultMap.get(objectEntity.getCommon_app_label());
+ Map<String,Long> map = countMap.get(objectEntity.getCommon_app_label());
+ if (map.size() < topSize) {
+ ResultEntity resultEntity =new ResultEntity();
+ resultEntity.setSessions(objectEntity.getSessions());
+ resultEntity.setCommon_client_ip(objectEntity.getCommon_client_ip());
+ // resultEntity.setCommon_app_label(objectEntity.getCommon_app_label());
+ // resultEntity.setCommon_recv_time(objectEntity.getCommon_recv_time());
+ queue.add(resultEntity);
+ map.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions());
+ } else {
+ if (queue.peek() != null) {
+ ResultEntity res=queue.peek();
+ if (res.getSessions() <= objectEntity.getSessions()) {
+ queue.poll();
+ map.remove(res.getCommon_client_ip());
+ ResultEntity resultEntity =new ResultEntity();
+ resultEntity.setSessions(objectEntity.getSessions());
+ resultEntity.setCommon_client_ip(objectEntity.getCommon_client_ip());
+ // resultEntity.setCommon_app_label(objectEntity.getCommon_app_label());
+ // resultEntity.setCommon_recv_time(objectEntity.getCommon_recv_time());
+ queue.add(resultEntity);
+ map.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions());
+ }
+ }
+ }
+ }
+ else{
+
+ PriorityQueue<ResultEntity> que = new PriorityQueue<>();
+ Map<String,Long> map = new HashMap<>();
+
+ ResultEntity resultEntity =new ResultEntity();
+ resultEntity.setSessions(objectEntity.getSessions());
+ resultEntity.setCommon_client_ip(objectEntity.getCommon_client_ip());
+ // resultEntity.setCommon_app_label(objectEntity.getCommon_app_label());
+ // resultEntity.setCommon_recv_time(objectEntity.getCommon_recv_time());
+ que.add(resultEntity);
+ map.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions());
+ resultMap.put(objectEntity.getCommon_app_label(),que);
+ countMap.put(objectEntity.getCommon_app_label(),map);
+
+ }
+ //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据
+ context.timerService().registerEventTimeTimer(objectEntity.getCommon_recv_time() + 1);
+ }
+
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String,String,Long>> out) throws Exception {
+
+ if(resultMap.size()>0) {
+/*
+ List<Map.Entry<String, Long>> objectlist = new ArrayList<Map.Entry<String, Long>>(resultMap.entrySet());
+
+
+ Collections.sort(objectlist, new Comparator<Map.Entry<String, Long>>() {
+ public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
+ return Math.toIntExact((o2.getValue() - o1.getValue()));
+ }
+ });
+*/
+
+ for (Map.Entry<String, PriorityQueue<ResultEntity>> entry : resultMap.entrySet()) {
+ PriorityQueue<ResultEntity> queue= entry.getValue();
+ String jsonStr = JSONObject.toJSONString(queue);
+ Tuple3<String,String,Long> emit= new Tuple3<>(entry.getKey(),jsonStr,ctx.timestamp());
+ out.collect(emit);
+ }
+ resultMap.clear();
+ countMap.clear();
+ }
+
+ /* if(queue.size()>0) {
+ int i = 0;
+ StringBuilder sb = new StringBuilder();
+ CipEntity ce = new CipEntity();
+ for (ObjectEntity obj : queue) {
+
+ if (i == 0) {
+ ce.setCommon_app_label(obj.getCommon_app_label());
+ ce.setCommon_recv_time(obj.getCommon_recv_time());
+
+ }
+ sb.append(obj.getCommon_client_ip());
+ sb.append(",");
+ i++;
+ if (i >= topSize) {
+ break;
+ }
+ }
+ ce.setCommon_client_list(sb.toString());
+ out.collect(ce);
+ queue.clear();
+ }*/
+
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/galaxy/cn/function/TopnHotItem.java b/src/main/java/com/galaxy/cn/function/TopnHotItem.java
new file mode 100644
index 0000000..02f1fab
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/function/TopnHotItem.java
@@ -0,0 +1,98 @@
+package com.galaxy.cn.function;
+
+import com.alibaba.fastjson.JSONObject;
+import com.galaxy.cn.common.ObjectEntity;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.*;
+
+public class TopnHotItem extends KeyedProcessFunction<Tuple1<String>, ObjectEntity, Tuple3<String,String,Long>> {
+ private final int topSize;
+ // Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
+ private PriorityQueue<ObjectEntity> queue ;
+ private Map<String,Long> resultMap ;
+
+ public TopnHotItem(int i) {
+ this.topSize = i;
+
+
+ }
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.queue= new PriorityQueue<>();
+ this.resultMap= new HashMap<>();
+ }
+
+
+ @Override
+ public void processElement(ObjectEntity objectEntity, Context context, Collector<Tuple3<String,String,Long>> collector) throws Exception {
+ //allSet.add(objectEntity);
+
+ if (resultMap.size() < topSize) {
+ queue.add(objectEntity);
+ resultMap.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions());
+ } else {
+ if(queue.peek()!=null){
+ if (queue.peek().getSessions()<objectEntity.getSessions()) {
+
+ resultMap.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions());
+ queue.poll();
+ queue.add(objectEntity);
+ }
+ }
+ }
+ //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据
+ context.timerService().registerEventTimeTimer(objectEntity.getCommon_recv_time() + 1);
+
+ }
+
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String,String,Long>> out) throws Exception {
+
+ if(resultMap.size()>0) {
+ List<Map.Entry<String, Long>> objectlist = new ArrayList<Map.Entry<String, Long>>(resultMap.entrySet());
+
+
+ Collections.sort(objectlist, new Comparator<Map.Entry<String, Long>>() {
+ public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
+ return Math.toIntExact((o2.getValue() - o1.getValue()));
+ }
+ });
+
+ String jsonStr = JSONObject.toJSONString(objectlist);
+
+ Tuple3<String,String,Long> emit= new Tuple3<>(ctx.getCurrentKey().f0,jsonStr,ctx.timestamp());
+ out.collect(emit);
+ queue.clear();
+ resultMap.clear();
+ }
+ /* if(queue.size()>0) {
+ int i = 0;
+ StringBuilder sb = new StringBuilder();
+ CipEntity ce = new CipEntity();
+ for (ObjectEntity obj : queue) {
+
+ if (i == 0) {
+ ce.setCommon_app_label(obj.getCommon_app_label());
+ ce.setCommon_recv_time(obj.getCommon_recv_time());
+
+ }
+ sb.append(obj.getCommon_client_ip());
+ sb.append(",");
+ i++;
+ if (i >= topSize) {
+ break;
+ }
+ }
+ ce.setCommon_client_list(sb.toString());
+ out.collect(ce);
+ queue.clear();
+ }*/
+
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/galaxy/cn/function/metricsCalculate.java b/src/main/java/com/galaxy/cn/function/metricsCalculate.java
new file mode 100644
index 0000000..55f8d76
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/function/metricsCalculate.java
@@ -0,0 +1,30 @@
+package com.galaxy.cn.function;
+
+import com.galaxy.cn.common.Entity;
+import com.galaxy.cn.common.ObjectEntity;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+public class metricsCalculate extends ProcessWindowFunction<
+ Entity, // 输入类型
+ ObjectEntity, // 输出类型
+ Tuple2<String,String>, // 键类型
+ TimeWindow> { // 窗口类型
+ @Override
+ public void process(Tuple2<String,String> s,
+ Context context,
+ Iterable<Entity> elements, Collector<ObjectEntity> out) throws Exception {
+
+ ObjectEntity objEntity= new ObjectEntity();
+ objEntity.setCommon_recv_time(context.window().getEnd()/1000);
+ objEntity.setCommon_app_label(s.f0);
+ objEntity.setCommon_client_ip(s.f1);
+
+ for (Entity event : elements) {
+ objEntity.sessions+=event.common_sessions;
+ }
+ out.collect(objEntity);
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/galaxy/cn/sink/HbaseSink.java b/src/main/java/com/galaxy/cn/sink/HbaseSink.java
new file mode 100644
index 0000000..4a14b34
--- /dev/null
+++ b/src/main/java/com/galaxy/cn/sink/HbaseSink.java
@@ -0,0 +1,67 @@
+package com.galaxy.cn.sink;
+
+import com.galaxy.cn.config.commonConfig;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+
+import static cn.hutool.crypto.SecureUtil.md5;
+
+public class HbaseSink extends RichSinkFunction<Tuple3<String,String,Long>> implements Serializable, SinkFunction<Tuple3<String, String,Long>> {
+ private Logger log;
+
+ private String hbase_zookeeper_host;
+
+ private Connection connection;
+ private Admin admin;
+
+ public HbaseSink(String hbase_zookeeper_host) {
+ this.hbase_zookeeper_host = hbase_zookeeper_host;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ log = Logger.getLogger(HbaseSink.class);
+
+ org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
+ configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
+
+ connection = ConnectionFactory.createConnection(configuration);
+ admin = connection.getAdmin();
+ }
+
+ public void invoke(Tuple3<String,String,Long> data, Context context) throws Exception {
+ // 按 project:table 归纳
+
+
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(commonConfig.SINK_HBASE_TABLE));
+ Put put = new Put(Bytes.toBytes(md5(data.f0)));
+ put.addColumn(Bytes.toBytes(commonConfig.SINK_HBASE_FM), Bytes.toBytes("app_label"), Bytes.toBytes(data.f0));
+ put.addColumn(Bytes.toBytes(commonConfig.SINK_HBASE_FM), Bytes.toBytes("client_ip_list"), Bytes.toBytes(data.f1));
+ put.addColumn(Bytes.toBytes(commonConfig.SINK_HBASE_FM), Bytes.toBytes("last_update_time"), Bytes.toBytes(data.f2));
+ table.put(put);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ table.close();
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+}
diff --git a/src/main/java/com/galaxy/recommend/Recommendation.java b/src/main/java/com/galaxy/recommend/Recommendation.java
new file mode 100644
index 0000000..da19bf2
--- /dev/null
+++ b/src/main/java/com/galaxy/recommend/Recommendation.java
@@ -0,0 +1,143 @@
+package com.galaxy.recommend;
+
+import com.alibaba.fastjson.JSON;
+import com.galaxy.cn.common.Entity;
+import com.galaxy.cn.common.ObjectEntity;
+import com.galaxy.cn.config.commonConfig;
+import com.galaxy.cn.function.TopNHotItems;
+import com.galaxy.cn.function.metricsCalculate;
+import com.galaxy.cn.sink.HbaseSink;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.common.config.SslConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Properties;
+
+public class Recommendation {
+ private static final Logger LOG = LoggerFactory.getLogger(Recommendation.class);
+
+ public static void main(String[] args) throws Exception {
+
+ //1.创建执行环境
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ //指定使用事件时间
+ //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+ String topic = parameterTool.get("topic", commonConfig.SOURCE_KAFKA_TOPIC);
+ Properties properties = parameterTool.getProperties();
+ properties.setProperty("group.id", commonConfig.SOURCE_KAFKA_GROUP_ID);
+ properties.setProperty("bootstrap.servers", commonConfig.SOURCE_KAFKA_BROKER);
+ properties.setProperty("session.timeout.ms", commonConfig.SESSION_TIMEOUT_MS);
+ properties.setProperty("max.poll.records", commonConfig.MAX_POLL_RECORD);
+ properties.setProperty("max.partition.fetch.bytes", commonConfig.MAX_PARTITION_FETCH_BYTES);
+
+ switch (commonConfig.KAFKA_SECURITY) {
+ case 1:
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", commonConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", commonConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", commonConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", commonConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", commonConfig.KAFKA_PIN);
+ break;
+ case 2:
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + commonConfig.KAFKA_USER + " password=" + commonConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+ DataStream<String> source = env.addSource(new FlinkKafkaConsumer<String>(
+ topic,
+ new SimpleStringSchema(), properties)).setParallelism(commonConfig.SOURCE_KAFKA_PARALLELISM);
+ WatermarkStrategy<Entity> strategy = WatermarkStrategy
+ .<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(commonConfig.WATERMARK_TIME))
+ .withTimestampAssigner((Entity, timestamp) -> Entity.getCommon_recv_time()*1000);
+
+
+ SingleOutputStreamOperator<Entity> input = source.map(new MapFunction<String, Entity>() {
+ @Override
+ public Entity map(String message) {
+ Entity entity =new Entity();
+ try {
+ entity = JSON.parseObject(message, Entity.class);
+
+
+ if(!commonConfig.no_filter){
+
+ if(entity.getCommon_app_label()==null ||"".equals(entity.getCommon_app_label().trim())){
+ entity.setIfError(1);
+ }
+ }
+ else{
+ if(!commonConfig.app_white_list.containsKey(entity.getCommon_app_label())){
+ entity.setIfError(1);
+ }
+ }
+ }
+ catch (Exception e){
+ LOG.error("Entity Parsing ERROR");
+ entity.setIfError(1);
+ }
+ return entity;
+ }
+ }).filter(new FilterFunction<Entity>() {
+ @Override
+ public boolean filter(Entity entity) throws Exception {
+
+ return entity.ifError!=1;
+ }
+ });
+
+
+
+ KeyedStream<Entity, Tuple2<String,String>> keyedStream = input.assignTimestampsAndWatermarks(strategy).keyBy(new MyKeySelector());
+
+ SingleOutputStreamOperator<ObjectEntity> windowedStream = keyedStream
+ .window(SlidingEventTimeWindows.of(Time.minutes(commonConfig.SLIDINGWINDOW_TIME_MINUTE), Time.minutes(commonConfig.SLIDINGWINDOWSLOT_TIME_MINUTE)))
+ .process(new metricsCalculate());
+
+ SingleOutputStreamOperator<Tuple3<String,String,Long>> windoweddStream = windowedStream.keyBy(new oneKeySelector())
+ .process(new TopNHotItems(commonConfig.TOP_LIMIT));
+ windoweddStream.addSink(new HbaseSink(commonConfig.ZK_HOST)).setParallelism(commonConfig.SINK_HBASE_PARALLELISM);
+ env.setParallelism(commonConfig.TASK_PARALLELISM);
+ env.execute("RECOMMENDATION-APP-CIP");
+
+ }
+ public static class MyKeySelector implements KeySelector<Entity,Tuple2<String,String>> {
+
+ @Override
+ public Tuple2<String,String> getKey(Entity entity) throws Exception {
+ return new Tuple2<>(entity.getCommon_app_label(),entity.getCommon_client_ip());
+ }
+ }
+ public static class oneKeySelector implements KeySelector<ObjectEntity,Tuple1<String>> {
+
+ @Override
+ public Tuple1<String> getKey(ObjectEntity entity) throws Exception {
+ return new Tuple1<>(entity.getCommon_app_label());
+ }
+ }
+
+
+} \ No newline at end of file
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
new file mode 100644
index 0000000..d3f24f4
--- /dev/null
+++ b/src/main/resources/common.properties
@@ -0,0 +1,74 @@
+#kafka�ĵ�ַ��Ϣ
+source.kafka.broker=192.168.44.11:9094
+source.kafka.group.id =vpn-1206-1
+source.kafka.topic=SESSION-RECORD-COMPLETED
+source.kafka.parallelism=1
+max.poll.records=3000
+session.timeout.ms=60000
+max.partition.fetch.bytes=31457280
+#hbase��zk��ַ
+zk.host=192.168.44.12:2181
+#д��hbase���ж�
+sink.hbase.parallelism=1
+#д��hbase�д�
+sink.hbase.fm=common
+#�hbase����
+sink.hbase.table=tsg_galaxy:recommendation_app_cip
+#�����ж�
+task.parallelism=1
+#�����ӳٵȴ�ʱ�䵥λ��
+watermark.time=1
+#top�������
+top.limit=2
+#����������ʱ�䵥λ����
+slidingwindow.time.minute=30
+#ÿ������ʱ�䵥λ����
+slidingwindowslot.time.minute=1
+#kafka�Ƿ�����ȫ��֤ 0������ 1SSL 2 SASL
+kafka.security=2
+#kafka SASL��֤�û���
+kafka.user=admin
+#kafka SASL��SSL��֤����
+kafka.pin=galaxy2019
+#1SSL��Ҫ
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+#�Ƿ����ȫ��app �޹�������false ����������true
+has.filter=false
+#ֻ����filter���е�common_app_label���ŷָ� baidu.com,qq �ɲ���д
+app.white.list=
+
+#source.kafka.broker=10.111.136.55:9092,10.111.136.56:9092,10.111.136.57:9092,10.111.136.58:9092,10.111.136.59:9092,10.111.136.60:9092,10.111.136.61:9092,10.111.136.62:9092,10.111.136.63:9092,10.111.136.64:9092,10.111.136.65:9092,10.111.136.66:9092,10.111.136.67:9092,10.111.136.68:9092,10.111.136.69:9092,10.111.136.70:9092,10.111.136.71:9092,10.111.136.72:9092,10.111.136.73:9092,10.111.136.74:9092,10.111.136.75:9092,10.111.136.76:9092,10.111.136.77:9092,10.111.136.78:9092,10.111.136.79:9092,10.111.136.80:9092,10.111.136.81:9092,10.111.136.82:9092,10.111.136.83:9092,10.111.136.84:9092,10.111.136.85:9092,10.111.136.86:9092,10.111.136.87:9092,10.111.136.88:9092,10.111.136.89:9092,10.111.136.90:9092,10.111.136.91:9092,10.111.136.92:9092,10.111.136.93:9092,10.111.136.94:9092,10.111.136.95:9092,10.111.136.96:9092,10.111.136.97:9092,10.111.136.98:9092,10.111.136.99:9092,10.111.136.100:9092,10.111.136.101:9092,10.111.136.102:9092,10.111.136.103:9092,10.111.136.104:9092
+##source.kafka.broker=10.111.200.135:9092,10.111.200.136:9092,10.111.200.137:9092,10.111.200.138:9092,10.111.200.139:9092,10.111.200.140:9092,10.111.200.141:9092,10.111.200.142:9092,10.111.200.143:9092,10.111.200.144:9092,10.111.200.145:9092,10.111.200.146:9092,10.111.200.147:9092,10.111.200.148:9092,10.111.200.149:9092,10.111.200.150:9092,10.111.200.151:9092,10.111.200.152:9092,10.111.200.153:9092,10.111.200.154:9092,10.111.200.155:9092,10.111.200.156:9092,10.111.200.158:9092,10.111.200.159:9092,10.111.200.160:9092,10.111.200.161:9092,10.111.200.162:9092,10.111.200.163:9092,10.111.200.164:9092
+#source.kafka.group.id=vpn-1120-1
+#source.kafka.topic=test
+##source.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
+#source.kafka.parallelism=60
+#sink.hbase.parallelism=60
+#sink.hbase.fm=common
+#sink.hbase.table=tsg_galaxy:recommendation_app_cip
+#
+#task.parallelism=60
+#watermark.time=30
+#top.limit=10000
+#zk.host=10.111.200.165,10.111.200.166,10.111.200.167,10.111.200.168,10.111.200.169
+#zk.port=2181
+#slidingwindow.time.minute=30
+#slidingwindowslot.time.minute=1
+#
+#
+#
+#
+#kafka.security=0
+##kafka SASL��֤�û���
+#kafka.user=admin
+##kafka SASL��SSL��֤����
+#kafka.pin=galaxy2019
+##1SSL��Ҫ
+#tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+#
+#
+##�Ƿ����ȫ��app �޹�������false ����������true
+#has.filter=false
+##ֻ����filter���е�common_app_label���ŷָ� baidu.com,qq
+#app.white.list=,
+#