summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorfengyi <[email protected]>2023-03-08 14:47:09 +0800
committerfengyi <[email protected]>2023-03-08 14:47:09 +0800
commitdc1f5a8af5ef4a601f9a3f7c4dcb32b93f5aa0ef (patch)
treec67f9d6ed32d45e05e5eb19357ade94277fc08f7 /src
parent4965ac02310f0a311754a34c855f49eed557e46d (diff)
初步增加datasketch方法处理topn
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/galaxy/tsg/Toptask.java58
-rw-r--r--src/main/java/com/galaxy/tsg/function/Dimension.java80
-rw-r--r--src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java43
-rw-r--r--src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java79
-rw-r--r--src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg5.java217
-rw-r--r--src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java99
-rw-r--r--src/main/resources/common.properties2
7 files changed, 577 insertions, 1 deletions
diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java
index f0030bf..e6d6d0f 100644
--- a/src/main/java/com/galaxy/tsg/Toptask.java
+++ b/src/main/java/com/galaxy/tsg/Toptask.java
@@ -13,11 +13,13 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -225,6 +227,62 @@ public class Toptask {
break;
case 2:
//datasketch
+
+
+ //Session_record top1000 21个窗口一并计算
+ SingleOutputStreamOperator<Entity> clientipdStream2 = inputForSession.filter(new FilterFunction<Entity>() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+
+ AllWindowedStream<Entity, TimeWindow> entityTimeWindowAllWindowedStream = clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)));
+ SingleOutputStreamOperator<String> aggregate = entityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg5(), new UserCountWindowResult5());
+ aggregate.print();
+
+
+
+
+
+
+ //Security_record top 1000 1个窗口、proxy_record top 1000 1个窗口
+ SingleOutputStreamOperator<UrlEntity> UrlStream2 = inputForUrl.filter(new FilterFunction<UrlEntity>() {
+ @Override
+ public boolean filter(UrlEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getHttp_url());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSecurity);
+
+
+ AllWindowedStream<UrlEntity, TimeWindow> urlEntityTimeWindowAllWindowedStream = UrlStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)));
+ SingleOutputStreamOperator<String> aggregate1 = urlEntityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg6(), new UserCountWindowResult5());
+ aggregate1.print();
+
+
+
+ //clientip聚合TOP
+
+// SingleOutputStreamOperator<Entity> clientipdStream3 = inputForSession.filter(new FilterFunction<Entity>() {
+// @Override
+// public boolean filter(Entity value) throws Exception {
+// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
+// }
+// }).assignTimestampsAndWatermarks(strategyForSession);
+//
+// SingleOutputStreamOperator<ResultEntity> windowedStream3 = clientipdStream3.keyBy(new groupBySelector("common_client_ip"))
+// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+// .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
+// DataStream<String> windoweddStream3 = windowedStream3.keyBy(new oneKeySelector())
+// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
+// windoweddStream3.print();
+
+
+
+
+
+
break;
}
diff --git a/src/main/java/com/galaxy/tsg/function/Dimension.java b/src/main/java/com/galaxy/tsg/function/Dimension.java
new file mode 100644
index 0000000..bc5a683
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/Dimension.java
@@ -0,0 +1,80 @@
+package com.galaxy.tsg.function;
+
+
+import com.galaxy.tsg.pojo.Entity;
+import com.galaxy.tsg.pojo.UrlEntity;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/2/17 10:14
+ * 各种维度
+ */
+public class Dimension {
+
+ public static final String ONE = "common_client_ip,common_vsys_id,common_device_group,common_data_center";
+ public static final String TWO = "common_server_ip,common_vsys_id,common_device_group,common_data_center";
+ public static final String THREE = "common_internal_ip,common_vsys_id,common_device_group,common_data_center";
+ public static final String FOUR = "common_external_ip,common_vsys_id,common_device_group,common_data_center";
+ public static final String FIVE = "http_domain,common_vsys_id,common_device_group,common_data_center";
+ public static final String SIX = "common_subscriber_id,common_vsys_id,common_device_group,common_data_center";
+ public static final String SEVEN = "common_app_label,common_vsys_id,common_device_group,common_data_center";
+
+ public static final String EIGHT = "http_url,common_vsys_id";
+
+
+
+ public static String setOneDimension(Entity cnRecordLog){
+
+ String oneDimension = cnRecordLog.getCommon_client_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center();
+ return oneDimension;
+ }
+
+
+ public static String setTwoDimension(Entity cnRecordLog){
+
+ String twoDimension = cnRecordLog.getCommon_server_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center();
+ return twoDimension;
+ }
+
+
+ public static String setThreeDimension(Entity cnRecordLog){
+
+ String threeDimension = cnRecordLog.getCommon_internal_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center();
+ return threeDimension;
+ }
+
+
+ public static String setFourDimension(Entity cnRecordLog){
+
+ String fourDimension = cnRecordLog.getCommon_external_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center();
+ return fourDimension;
+ }
+
+ public static String setFiveDimension(Entity cnRecordLog){
+
+ String fiveDimension = cnRecordLog.getHttp_domain()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center();
+ return fiveDimension;
+ }
+
+
+ public static String setSixDimension(Entity cnRecordLog){
+
+ String sixDimension = cnRecordLog.getCommon_subscriber_id()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center();
+ return sixDimension;
+ }
+
+
+ public static String setSevenDimension(Entity cnRecordLog){
+
+ String sevenDimension = cnRecordLog.getCommon_app_label()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center();
+ return sevenDimension;
+ }
+
+ public static String setEightDimension(UrlEntity cnRecordLog){
+
+ String eightDimension = cnRecordLog.getHttp_url()+","+cnRecordLog.getCommon_vsys_id();
+ return eightDimension;
+ }
+
+}
diff --git a/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java b/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java
new file mode 100644
index 0000000..457cbc2
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java
@@ -0,0 +1,43 @@
+package com.galaxy.tsg.function;
+
+
+import org.apache.datasketches.frequencies.ItemsSketch;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/2/17 10:03
+ * 维度+datasketches
+ */
+public class DimensionItemsSketch {
+
+ private String dimension;//维度
+ private ItemsSketch<String> itemsSketch;//对应的
+
+
+ public DimensionItemsSketch(String dimension, ItemsSketch<String> itemsSketch) {
+ this.dimension = dimension;
+ this.itemsSketch = itemsSketch;
+ }
+
+
+ public String getDimension() {
+ return dimension;
+ }
+
+ public void setDimension(String dimension) {
+ this.dimension = dimension;
+ }
+
+ public ItemsSketch<String> getItemsSketch() {
+ return itemsSketch;
+ }
+
+ public void setItemsSketch(ItemsSketch<String> itemsSketch) {
+ this.itemsSketch = itemsSketch;
+ }
+}
+
+
+
+
diff --git a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java
new file mode 100644
index 0000000..f7f9207
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java
@@ -0,0 +1,79 @@
+package com.galaxy.tsg.function;
+
+import org.apache.datasketches.frequencies.ErrorType;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/3/7 15:50
+ */
+public class UserCountWindowResult5 extends ProcessAllWindowFunction<HashMap<String, DimensionItemsSketch>, String, TimeWindow> {
+
+
+ @Override
+ public void process(Context context, Iterable<HashMap<String, DimensionItemsSketch>> iterable, Collector<String> collector) throws Exception {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("-------------------------\n");
+ stringBuilder.append("datasketches方法窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n");
+
+
+ HashMap<String, DimensionItemsSketch> dataHashMap = iterable.iterator().next();
+
+
+ System.out.println(dataHashMap.toString());
+
+ Set<Map.Entry<String, DimensionItemsSketch>> entries = dataHashMap.entrySet();
+
+
+ for(Map.Entry<String, DimensionItemsSketch> entry:entries){
+
+ System.out.println(entry.getKey()+"");
+ stringBuilder.append(entry.getKey()+"\n");
+
+ ItemsSketch.Row<String>[] items = entry.getValue().getItemsSketch().getFrequentItems(ErrorType.NO_FALSE_POSITIVES);
+
+ for (int i=0;i<items.length;i++){
+
+
+ String resultStr = "No." + (i + 1) + " "
+ + "ip:" + items[i].getItem() + " "
+ + " Est:" + items[i].getEstimate()
+ + " UB:" + items[i].getUpperBound()
+ + " LB:" + items[i].getLowerBound();
+
+
+
+// String item = items[i].toString();
+ stringBuilder.append(resultStr);
+
+ stringBuilder.append("\n");
+
+
+ if (i==1000)
+ break;
+
+ }
+
+
+
+
+
+ }
+
+ collector.collect(stringBuilder.toString());
+
+ }
+
+
+
+
+}
diff --git a/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg5.java b/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg5.java
new file mode 100644
index 0000000..9f37305
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg5.java
@@ -0,0 +1,217 @@
+package com.galaxy.tsg.function;
+
+
+import com.galaxy.tsg.pojo.Entity;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.util.HashMap;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/2/14 17:29
+ *
+ *Session_record top10000 21个窗口计算
+ */
+public class UserHashMapCountAgg5 implements AggregateFunction<Entity, HashMap<String,DimensionItemsSketch>, HashMap<String,DimensionItemsSketch>> {
+
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> createAccumulator() {
+ return new HashMap<String, DimensionItemsSketch>(1048576);
+ }
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> add(Entity cnRecordLog, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
+
+// String dimension = cnRecordLog.getCommon_client_ip();//维度
+// System.out.println(dimension);
+
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch<String> oneSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+ ItemsSketch<String> twoSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+ ItemsSketch<String> threeSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+ ItemsSketch<String> fourSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+ ItemsSketch<String> fiveSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+ ItemsSketch<String> sixSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+ ItemsSketch<String> sevenSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ ItemsSketch<String> onePktItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> twoPktItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> threePktItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> fourPktItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> fivePktItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> sixPktItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> sevenPktItemsSketch = new ItemsSketch<>(1048576);
+
+
+
+ ItemsSketch<String> oneByteItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> twoByteItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> threeByteItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> fourByteItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> fiveByteItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> sixByteItemsSketch = new ItemsSketch<>(1048576);
+ ItemsSketch<String> sevenByteItemsSketch = new ItemsSketch<>(1048576);
+
+
+ oneSessionItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ twoSessionItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ threeSessionItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ fourSessionItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ fiveSessionItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ sixSessionItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ sevenSessionItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ onePktItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ twoPktItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ threePktItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ fourPktItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ fivePktItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ sixPktItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ sevenPktItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+
+ oneByteItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ twoByteItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ threeByteItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ fourByteItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ fiveByteItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ sixByteItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ sevenByteItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+
+
+ DimensionItemsSketch oneSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE, oneSessionItemsSketch);
+ DimensionItemsSketch twoSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoSessionItemsSketch);
+ DimensionItemsSketch threeSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeSessionItemsSketch);
+ DimensionItemsSketch fourSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourSessionItemsSketch);
+ DimensionItemsSketch fiveSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveSessionItemsSketch);
+ DimensionItemsSketch sixSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixSessionItemsSketch);
+ DimensionItemsSketch sevenSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenSessionItemsSketch);
+
+ DimensionItemsSketch onePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,onePktItemsSketch);
+ DimensionItemsSketch twoPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoPktItemsSketch);
+ DimensionItemsSketch threePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threePktItemsSketch);
+ DimensionItemsSketch fourPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourPktItemsSketch);
+ DimensionItemsSketch fivePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fivePktItemsSketch);
+ DimensionItemsSketch sixPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixPktItemsSketch);
+ DimensionItemsSketch sevenPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenPktItemsSketch);
+
+ DimensionItemsSketch oneByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,oneByteItemsSketch);
+ DimensionItemsSketch twoByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoByteItemsSketch);
+ DimensionItemsSketch threeByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeByteItemsSketch);
+ DimensionItemsSketch fourByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourByteItemsSketch);
+ DimensionItemsSketch fiveByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveByteItemsSketch);
+ DimensionItemsSketch sixByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixByteItemsSketch);
+ DimensionItemsSketch sevenByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenByteItemsSketch);
+
+
+ stringItemsSketchHashMap.put("oneSession", oneSessionDimensionItemsSketch);
+ stringItemsSketchHashMap.put("twoSession", twoSessionDimensionItemsSketch);
+ stringItemsSketchHashMap.put("threeSession", threeSessionDimensionItemsSketch);
+ stringItemsSketchHashMap.put("fourSession", fourSessionDimensionItemsSketch);
+ stringItemsSketchHashMap.put("fiveSession", fiveSessionDimensionItemsSketch);
+ stringItemsSketchHashMap.put("sixSession", sixSessionDimensionItemsSketch);
+ stringItemsSketchHashMap.put("sevenSession", sevenSessionDimensionItemsSketch);
+
+
+ stringItemsSketchHashMap.put("onePkt",onePktDimensionItemsSketch);
+ stringItemsSketchHashMap.put("twoPkt",twoPktDimensionItemsSketch);
+ stringItemsSketchHashMap.put("threePkt",threePktDimensionItemsSketch);
+ stringItemsSketchHashMap.put("fourPkt",fourPktDimensionItemsSketch);
+ stringItemsSketchHashMap.put("fivePkt",fivePktDimensionItemsSketch);
+ stringItemsSketchHashMap.put("sixPkt",sixPktDimensionItemsSketch);
+ stringItemsSketchHashMap.put("sevenPkt",sevenPktDimensionItemsSketch);
+
+
+ stringItemsSketchHashMap.put("oneByte",oneByteDimensionItemsSketch);
+ stringItemsSketchHashMap.put("twoByte",twoByteDimensionItemsSketch);
+ stringItemsSketchHashMap.put("threeByte",threeByteDimensionItemsSketch);
+ stringItemsSketchHashMap.put("fourByte",fourByteDimensionItemsSketch);
+ stringItemsSketchHashMap.put("fiveByte",fiveByteDimensionItemsSketch);
+ stringItemsSketchHashMap.put("sixByte",sixByteDimensionItemsSketch);
+ stringItemsSketchHashMap.put("sevenByte",sevenByteDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("oneSession").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ stringItemsSketchHashMap.get("twoSession").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ stringItemsSketchHashMap.get("threeSession").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ stringItemsSketchHashMap.get("fourSession").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ stringItemsSketchHashMap.get("fiveSession").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ stringItemsSketchHashMap.get("sixSession").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+ stringItemsSketchHashMap.get("sevenSession").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ stringItemsSketchHashMap.get("onePkt").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ stringItemsSketchHashMap.get("twoPkt").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ stringItemsSketchHashMap.get("threePkt").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ stringItemsSketchHashMap.get("fourPkt").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ stringItemsSketchHashMap.get("fivePkt").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ stringItemsSketchHashMap.get("sixPkt").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+ stringItemsSketchHashMap.get("sevenPkt").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+
+ stringItemsSketchHashMap.get("oneByte").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ stringItemsSketchHashMap.get("twoByte").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ stringItemsSketchHashMap.get("threeByte").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ stringItemsSketchHashMap.get("fourByte").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ stringItemsSketchHashMap.get("fiveByte").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ stringItemsSketchHashMap.get("sixByte").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+ stringItemsSketchHashMap.get("sevenByte").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+
+
+
+
+ }
+
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> getResult(HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> merge(HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap, HashMap<String, DimensionItemsSketch> acc1) {
+
+// System.out.println("合并");
+// HashMap<String, ItemsSketch<String>> unionSketchHashMap = new HashMap<>();
+// ItemsSketch<String> sessionItemsSketch = new ItemsSketch<>(1048576);
+// ItemsSketch<String> pktItemsSketch = new ItemsSketch<>(1048576);
+// ItemsSketch<String> byteItemsSketch = new ItemsSketch<>(1048576);
+
+// ItemsSketch<String> session_stringItemsSketch = stringItemsSketchHashMap.get("session");
+// ItemsSketch<String> pkt_stringItemsSketch = stringItemsSketchHashMap.get("pkt");
+// ItemsSketch<String> byte_stringItemsSketch = stringItemsSketchHashMap.get("byte");
+
+// ItemsSketch<String> session_acc1 = acc1.get("session");
+// ItemsSketch<String> pkt_acc1 = acc1.get("pkt");
+// ItemsSketch<String> byte_acc1 = acc1.get("byte");
+
+// sessionItemsSketch.merge(session_stringItemsSketch);
+// sessionItemsSketch.merge(session_acc1);
+
+// pktItemsSketch.merge(pkt_stringItemsSketch);
+// pktItemsSketch.merge(pkt_acc1);
+//
+// byteItemsSketch.merge(byte_stringItemsSketch);
+// byteItemsSketch.merge(byte_acc1);
+
+// unionSketchHashMap.put("session",sessionItemsSketch);
+// unionSketchHashMap.put("pkt",pktItemsSketch);
+// unionSketchHashMap.put("byte",byteItemsSketch);
+
+
+ return null;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java b/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java
new file mode 100644
index 0000000..2005e1f
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java
@@ -0,0 +1,99 @@
+package com.galaxy.tsg.function;
+
+
+import com.galaxy.tsg.pojo.Entity;
+import com.galaxy.tsg.pojo.UrlEntity;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.util.HashMap;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/2/14 17:29
+ *
+ *Session_record top10000 21个窗口计算
+ */
+public class UserHashMapCountAgg6 implements AggregateFunction<UrlEntity, HashMap<String,DimensionItemsSketch>, HashMap<String,DimensionItemsSketch>> {
+
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> createAccumulator() {
+ return new HashMap<String, DimensionItemsSketch>(1048576);
+ }
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> add(UrlEntity cnRecordLog, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
+
+// String dimension = cnRecordLog.getCommon_client_ip();//维度
+// System.out.println(dimension);
+
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch<String> eightSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ eightSessionItemsSketch.update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ DimensionItemsSketch eightSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.EIGHT,eightSessionItemsSketch);
+
+ stringItemsSketchHashMap.put("eightSession", eightSessionDimensionItemsSketch);
+
+
+ }else {
+
+
+
+ stringItemsSketchHashMap.get("eightSession").getItemsSketch().update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+
+
+ }
+
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> getResult(HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap<String, DimensionItemsSketch> merge(HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap, HashMap<String, DimensionItemsSketch> acc1) {
+
+// System.out.println("合并");
+// HashMap<String, ItemsSketch<String>> unionSketchHashMap = new HashMap<>();
+// ItemsSketch<String> sessionItemsSketch = new ItemsSketch<>(1048576);
+// ItemsSketch<String> pktItemsSketch = new ItemsSketch<>(1048576);
+// ItemsSketch<String> byteItemsSketch = new ItemsSketch<>(1048576);
+
+// ItemsSketch<String> session_stringItemsSketch = stringItemsSketchHashMap.get("session");
+// ItemsSketch<String> pkt_stringItemsSketch = stringItemsSketchHashMap.get("pkt");
+// ItemsSketch<String> byte_stringItemsSketch = stringItemsSketchHashMap.get("byte");
+
+// ItemsSketch<String> session_acc1 = acc1.get("session");
+// ItemsSketch<String> pkt_acc1 = acc1.get("pkt");
+// ItemsSketch<String> byte_acc1 = acc1.get("byte");
+
+// sessionItemsSketch.merge(session_stringItemsSketch);
+// sessionItemsSketch.merge(session_acc1);
+
+// pktItemsSketch.merge(pkt_stringItemsSketch);
+// pktItemsSketch.merge(pkt_acc1);
+//
+// byteItemsSketch.merge(byte_stringItemsSketch);
+// byteItemsSketch.merge(byte_acc1);
+
+// unionSketchHashMap.put("session",sessionItemsSketch);
+// unionSketchHashMap.put("pkt",pktItemsSketch);
+// unionSketchHashMap.put("byte",byteItemsSketch);
+
+
+ return null;
+ }
+}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index f6a9a9d..13cda90 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -56,4 +56,4 @@ kafka.producer.compression.type=none
kafka_producer_broker=192.168.44.12:9092
-tmp.test.type=1 \ No newline at end of file
+tmp.test.type=2 \ No newline at end of file