diff options
| author | fengyi <[email protected]> | 2023-03-13 10:25:06 +0800 |
|---|---|---|
| committer | fengyi <[email protected]> | 2023-03-13 10:25:06 +0800 |
| commit | f9c33dd93c2b9812242859114ce507a1b022acc7 (patch) | |
| tree | 539ea159631a3216f173f0533eb8b204f4f83df2 | |
| parent | 455f390387e54d0ef97812a834d2db264e3f5f60 (diff) | |
datasketch部分代码采用reduce方式
7 files changed, 873 insertions, 538 deletions
diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java index 1841988..53065e2 100644 --- a/src/main/java/com/galaxy/tsg/Toptask.java +++ b/src/main/java/com/galaxy/tsg/Toptask.java @@ -228,8 +228,8 @@ public class Toptask { case 2: //datasketch - //clientip聚合TOP + SingleOutputStreamOperator<Entity> clientipdStream2 = inputForSession.filter(new FilterFunction<Entity>() { @Override public boolean filter(Entity value) throws Exception { @@ -237,24 +237,12 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - - clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("oneSession"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); - - clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("onePkt"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); - - - clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("oneByte"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); - - + SingleOutputStreamOperator<ResultEntity> windowedStream2 = clientipdStream2.keyBy(new groupBySelector("common_client_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM); + DataStream<String> windoweddStream2 = windowedStream2.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); + windoweddStream2.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); //serverip聚合TOP @@ -265,21 +253,12 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("twoSession"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3); - - serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("twoPkt"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3); - - serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("twoByte"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3); - + SingleOutputStreamOperator<ResultEntity> windowedStreamForServerIp2 = serveripdStream2.keyBy(new groupBySelector("common_server_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM); + DataStream<String> windoweddStreamForServerIp2 = windowedStreamForServerIp2.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); + windoweddStreamForServerIp2.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3); //common_internal_ip聚合TOP @@ -290,22 +269,12 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("threeSession"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3); - - internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("threePkt"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3); - - internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("threeByte"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3); - - + SingleOutputStreamOperator<ResultEntity> windowedStreamForInternal2 = internalStream2.keyBy(new groupBySelector("common_internal_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM); + DataStream<String> WindoweddStreamForInternal2 = windowedStreamForInternal2.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); + WindoweddStreamForInternal2.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3); //common_external_ip聚合TOP @@ -316,22 +285,12 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("fourSession"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3); - - externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("fourPkt"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3); - - externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("fourByte"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3); - - + SingleOutputStreamOperator<ResultEntity> windowedStreamForExternal2 = externalStream2.keyBy(new groupBySelector("common_external_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM); + DataStream<String> WindoweddStreamForExternal2 = windowedStreamForExternal2.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); + WindoweddStreamForExternal2.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3); //http_domain聚合TOP @@ -342,24 +301,13 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); + SingleOutputStreamOperator<ResultEntity> windowedStreamForDomain2 = domainStream2.keyBy(new groupBySelector("http_domain")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM); + DataStream<String> WindoweddStreamForDomain2 = windowedStreamForDomain2.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); + WindoweddStreamForDomain2.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3); - domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("fiveSession"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3); - - domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("fivePkt"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3); - - domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("fiveByte"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3); - - - //common_subscriber_id聚合TOP SingleOutputStreamOperator<Entity> userStream2 = inputForSession.filter(new FilterFunction<Entity>() { @Override public boolean filter(Entity value) throws Exception { @@ -367,20 +315,13 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("sixSession"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-USER")).setParallelism(3); - - userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("sixPkt"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-USER")).setParallelism(3); - - userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("sixByte"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-USER")).setParallelism(3); + //common_subscriber_id聚合TOP + SingleOutputStreamOperator<ResultEntity> windowedStreamForUser2 = userStream2.keyBy(new groupBySelector("common_subscriber_id")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM); + DataStream<String> WindoweddStreamForUser2 = windowedStreamForUser2.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); + WindoweddStreamForUser2.addSink(getKafkaSink("TOP-USER")).setParallelism(3); //common_app_label聚合求全量 @@ -392,26 +333,14 @@ public class Toptask { }).assignTimestampsAndWatermarks(strategyForSession); - appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("sevenSession"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM); - - appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("sevenPkt"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM); - - appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForMetricsAggregate("sevenByte"), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM); - + appNameStream2.keyBy(new groupBySelector("common_app_label")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM); - //Security_record top 1000 1个窗口、proxy_record top 1000 1个窗口 + //url聚合session求top SingleOutputStreamOperator<UrlEntity> UrlStream2 = inputForUrl.filter(new FilterFunction<UrlEntity>() { @Override public boolean filter(UrlEntity value) throws Exception { @@ -420,29 +349,12 @@ public class Toptask { }).assignTimestampsAndWatermarks(strategyForSecurity); - UrlStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .aggregate(new DatasketchForUrlAggregate(), new UserCountWindowResult5()) -// .print() - .addSink(getKafkaSink("TOP-URLS")).setParallelism(3); - - - - - //clientip聚合TOP - -// SingleOutputStreamOperator<Entity> clientipdStream3 = inputForSession.filter(new FilterFunction<Entity>() { -// @Override -// public boolean filter(Entity value) throws Exception { -// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol()); -// } -// }).assignTimestampsAndWatermarks(strategyForSession); -// -// SingleOutputStreamOperator<ResultEntity> windowedStream3 = clientipdStream3.keyBy(new groupBySelector("common_client_ip")) -// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) -// .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM); -// DataStream<String> windoweddStream3 = windowedStream3.keyBy(new oneKeySelector()) -// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); -// windoweddStream3.print(); + SingleOutputStreamOperator<ResultEntity> windowedStreamForUrl2 = UrlStream2.keyBy(new twoKeySelector()) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new UrlAggregationReduce(), new DatasketchUrlCalculate(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM); + DataStream<String> WindoweddStreamForUrl2 = windowedStreamForUrl2.keyBy(new oneKeySelector()) + .process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1); + WindoweddStreamForUrl2.addSink(getKafkaSink("TOP-URLS")).setParallelism(3); diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate.java b/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate.java index 4883642..2b6e7ad 100644 --- a/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate.java +++ b/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate.java @@ -31,48 +31,48 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } @Override - public HashMap<String, DimensionItemsSketch> add(Entity cnRecordLog, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) { + public HashMap<String, DimensionItemsSketch> add(Entity entity, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) { // String dimension = cnRecordLog.getCommon_client_ip();//维度 // System.out.println(dimension); switch (key){ - case "oneSession": + case "clientIpSession": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> oneSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> clientIpSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - oneSessionItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + clientIpSessionItemsSketch.update(Dimension.setClientIpDimension(entity),entity.getCommon_sessions()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_client_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_client_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setOneDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setClientIpDimension(entity),collectionValue); - DimensionItemsSketch oneSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE, oneSessionItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch clientIpSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.CLIENTIP, clientIpSessionItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("oneSession", oneSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("clientIpSession", clientIpSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("oneSession").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("clientIpSession").getItemsSketch().update(Dimension.setClientIpDimension(entity),entity.getCommon_sessions()); - CollectionValue oneSession = stringItemsSketchHashMap.get("oneSession").getStringCollectionValueHashMap().get(Dimension.setOneDimension(cnRecordLog));//从key获取集合 + CollectionValue clientIpSession = stringItemsSketchHashMap.get("clientIpSession").getStringCollectionValueHashMap().get(Dimension.setClientIpDimension(entity));//从key获取集合 - if (oneSession==null){ + if (clientIpSession==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_client_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_client_ip()); - stringItemsSketchHashMap.get("oneSession").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("clientIpSession").getStringCollectionValueHashMap().put(Dimension.setClientIpDimension(entity),collectionValue); }else {//做加和 - oneSession.getCollectionValue(oneSession,cnRecordLog); + clientIpSession.getCollectionValue(clientIpSession,entity); - stringItemsSketchHashMap.get("oneSession").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),oneSession); + stringItemsSketchHashMap.get("clientIpSession").getStringCollectionValueHashMap().put(Dimension.setClientIpDimension(entity),clientIpSession); } @@ -81,43 +81,43 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "twoSession": + case "serverIpSession": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> twoSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> serverIpSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - twoSessionItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + serverIpSessionItemsSketch.update(Dimension.setServerIpDimension(entity),entity.getCommon_sessions()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_server_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setTwoDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setServerIpDimension(entity),collectionValue); - DimensionItemsSketch twoSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoSessionItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch serverIpSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SERVERIP,serverIpSessionItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("twoSession", twoSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("serverIpSession", serverIpSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("twoSession").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("serverIpSession").getItemsSketch().update(Dimension.setServerIpDimension(entity),entity.getCommon_sessions()); - CollectionValue twoSession = stringItemsSketchHashMap.get("twoSession").getStringCollectionValueHashMap().get(Dimension.setTwoDimension(cnRecordLog)); + CollectionValue serverIpSession = stringItemsSketchHashMap.get("serverIpSession").getStringCollectionValueHashMap().get(Dimension.setServerIpDimension(entity)); - if (twoSession==null){ + if (serverIpSession==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_server_ip()); - stringItemsSketchHashMap.get("twoSession").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("serverIpSession").getStringCollectionValueHashMap().put(Dimension.setServerIpDimension(entity),collectionValue); }else {//做加和 - twoSession.getCollectionValue(twoSession,cnRecordLog); + serverIpSession.getCollectionValue(serverIpSession,entity); - stringItemsSketchHashMap.get("twoSession").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),twoSession); + stringItemsSketchHashMap.get("serverIpSession").getStringCollectionValueHashMap().put(Dimension.setServerIpDimension(entity),serverIpSession); } @@ -128,45 +128,45 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "threeSession": + case "internalIpSession": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> threeSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> internalIpSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - threeSessionItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + internalIpSessionItemsSketch.update(Dimension.setInternalIpDimension(entity),entity.getCommon_sessions()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_internal_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_internal_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setThreeDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setInternalIpDimension(entity),collectionValue); - DimensionItemsSketch threeSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeSessionItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch internalIpSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.INTERNALIP,internalIpSessionItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("threeSession", threeSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("internalIpSession", internalIpSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("threeSession").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("internalIpSession").getItemsSketch().update(Dimension.setInternalIpDimension(entity),entity.getCommon_sessions()); - CollectionValue threeSession = stringItemsSketchHashMap.get("threeSession").getStringCollectionValueHashMap().get(Dimension.setThreeDimension(cnRecordLog)); + CollectionValue internalIpSession = stringItemsSketchHashMap.get("internalIpSession").getStringCollectionValueHashMap().get(Dimension.setInternalIpDimension(entity)); - if (threeSession==null){ + if (internalIpSession==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_server_ip()); - stringItemsSketchHashMap.get("threeSession").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("internalIpSession").getStringCollectionValueHashMap().put(Dimension.setInternalIpDimension(entity),collectionValue); }else {//做加和 - threeSession.getCollectionValue(threeSession,cnRecordLog); + internalIpSession.getCollectionValue(internalIpSession,entity); - stringItemsSketchHashMap.get("threeSession").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),threeSession); + stringItemsSketchHashMap.get("internalIpSession").getStringCollectionValueHashMap().put(Dimension.setInternalIpDimension(entity),internalIpSession); } @@ -177,47 +177,47 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "fourSession": + case "externalIpSession": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> fourSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> externalIpSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - fourSessionItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + externalIpSessionItemsSketch.update(Dimension.setExternalIpDimension(entity),entity.getCommon_sessions()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_external_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_external_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setFourDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setExternalIpDimension(entity),collectionValue); - DimensionItemsSketch fourSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourSessionItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch externalIpSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.EXTERNALIP,externalIpSessionItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("fourSession", fourSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("externalIpSession", externalIpSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("fourSession").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("externalIpSession").getItemsSketch().update(Dimension.setExternalIpDimension(entity),entity.getCommon_sessions()); - CollectionValue fourSession = stringItemsSketchHashMap.get("fourSession").getStringCollectionValueHashMap().get(Dimension.setFourDimension(cnRecordLog)); + CollectionValue externalIpSession = stringItemsSketchHashMap.get("externalIpSession").getStringCollectionValueHashMap().get(Dimension.setExternalIpDimension(entity)); - if (fourSession==null){ + if (externalIpSession==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_external_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_external_ip()); - stringItemsSketchHashMap.get("fourSession").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("externalIpSession").getStringCollectionValueHashMap().put(Dimension.setExternalIpDimension(entity),collectionValue); }else {//做加和 - fourSession.getCollectionValue(fourSession,cnRecordLog); + externalIpSession.getCollectionValue(externalIpSession,entity); - stringItemsSketchHashMap.get("fourSession").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),fourSession); + stringItemsSketchHashMap.get("externalIpSession").getStringCollectionValueHashMap().put(Dimension.setExternalIpDimension(entity),externalIpSession); } @@ -228,48 +228,48 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "fiveSession": + case "domainSession": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> fiveSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> domainSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - fiveSessionItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + domainSessionItemsSketch.update(Dimension.setDomainDimension(entity),entity.getCommon_sessions()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getHttp_domain()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getHttp_domain()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setFiveDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setDomainDimension(entity),collectionValue); - DimensionItemsSketch fiveSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveSessionItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch domainSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.DOMAIN,domainSessionItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("fiveSession", fiveSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("domainSession", domainSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("fiveSession").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("domainSession").getItemsSketch().update(Dimension.setDomainDimension(entity),entity.getCommon_sessions()); - CollectionValue fiveSession = stringItemsSketchHashMap.get("fiveSession").getStringCollectionValueHashMap().get(Dimension.setFiveDimension(cnRecordLog)); + CollectionValue domainSession = stringItemsSketchHashMap.get("domainSession").getStringCollectionValueHashMap().get(Dimension.setDomainDimension(entity)); - if (fiveSession==null){ + if (domainSession==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getHttp_domain()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getHttp_domain()); - stringItemsSketchHashMap.get("fiveSession").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("domainSession").getStringCollectionValueHashMap().put(Dimension.setDomainDimension(entity),collectionValue); }else {//做加和 - fiveSession.getCollectionValue(fiveSession,cnRecordLog); + domainSession.getCollectionValue(domainSession,entity); - stringItemsSketchHashMap.get("fiveSession").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),fiveSession); + stringItemsSketchHashMap.get("domainSession").getStringCollectionValueHashMap().put(Dimension.setDomainDimension(entity),domainSession); } @@ -279,46 +279,46 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "sixSession": + case "subscriberIdSession": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> sixSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> subscriberIdSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - sixSessionItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + subscriberIdSessionItemsSketch.update(Dimension.setSubscriberIdDimension(entity),entity.getCommon_sessions()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_subscriber_id()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_subscriber_id()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setSixDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setSubscriberIdDimension(entity),collectionValue); - DimensionItemsSketch sixSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixSessionItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch subscriberIdSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SUBSCRIBERID,subscriberIdSessionItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("sixSession", sixSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("subscriberIdSession", subscriberIdSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("sixSession").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("subscriberIdSession").getItemsSketch().update(Dimension.setSubscriberIdDimension(entity),entity.getCommon_sessions()); - CollectionValue sixSession = stringItemsSketchHashMap.get("sixSession").getStringCollectionValueHashMap().get(Dimension.setSixDimension(cnRecordLog)); + CollectionValue subscriberIdSession = stringItemsSketchHashMap.get("subscriberIdSession").getStringCollectionValueHashMap().get(Dimension.setSubscriberIdDimension(entity)); - if (sixSession==null){ + if (subscriberIdSession==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_subscriber_id()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_subscriber_id()); - stringItemsSketchHashMap.get("sixSession").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("subscriberIdSession").getStringCollectionValueHashMap().put(Dimension.setSubscriberIdDimension(entity),collectionValue); }else {//做加和 - sixSession.getCollectionValue(sixSession,cnRecordLog); + subscriberIdSession.getCollectionValue(subscriberIdSession,entity); - stringItemsSketchHashMap.get("sixSession").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),sixSession); + stringItemsSketchHashMap.get("subscriberIdSession").getStringCollectionValueHashMap().put(Dimension.setSubscriberIdDimension(entity),subscriberIdSession); } @@ -331,46 +331,46 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "sevenSession": + case "appLabelSession": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> sevenSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> appLabelSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - sevenSessionItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + appLabelSessionItemsSketch.update(Dimension.setAppLabelDimension(entity),entity.getCommon_sessions()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_app_label()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_app_label()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setSevenDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setAppLabelDimension(entity),collectionValue); - DimensionItemsSketch sevenSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenSessionItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch appLabelSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.APPLABEL,appLabelSessionItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("sevenSession", sevenSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("appLabelSession", appLabelSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("sevenSession").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("appLabelSession").getItemsSketch().update(Dimension.setAppLabelDimension(entity),entity.getCommon_sessions()); - CollectionValue sevenSession = stringItemsSketchHashMap.get("sevenSession").getStringCollectionValueHashMap().get(Dimension.setSevenDimension(cnRecordLog)); + CollectionValue appLabelSession = stringItemsSketchHashMap.get("appLabelSession").getStringCollectionValueHashMap().get(Dimension.setAppLabelDimension(entity)); - if (sevenSession==null){ + if (appLabelSession==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions"); - collectionValue.setSource(cnRecordLog.getCommon_app_label()); + CollectionValue collectionValue = new CollectionValue(entity,"sessions"); + collectionValue.setSource(entity.getCommon_app_label()); - stringItemsSketchHashMap.get("sevenSession").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("appLabelSession").getStringCollectionValueHashMap().put(Dimension.setAppLabelDimension(entity),collectionValue); }else {//做加和 - sevenSession.getCollectionValue(sevenSession,cnRecordLog); + appLabelSession.getCollectionValue(appLabelSession,entity); - stringItemsSketchHashMap.get("sevenSession").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),sevenSession); + stringItemsSketchHashMap.get("appLabelSession").getStringCollectionValueHashMap().put(Dimension.setAppLabelDimension(entity),appLabelSession); } @@ -380,43 +380,43 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "onePkt": + case "clientIpPkt": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> onePktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> clientIpPktItemsSketch = new ItemsSketch<>(1048576); - onePktItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + clientIpPktItemsSketch.update(Dimension.setClientIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_client_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_client_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setOneDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setClientIpDimension(entity),collectionValue); - DimensionItemsSketch onePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,onePktItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch clientIpPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.CLIENTIP,clientIpPktItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("onePkt",onePktDimensionItemsSketch); + stringItemsSketchHashMap.put("clientIpPkt",clientIpPktDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("onePkt").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("clientIpPkt").getItemsSketch().update(Dimension.setClientIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue onePkt = stringItemsSketchHashMap.get("onePkt").getStringCollectionValueHashMap().get(Dimension.setOneDimension(cnRecordLog));//从key获取集合 + CollectionValue clientIpPkt = stringItemsSketchHashMap.get("clientIpPkt").getStringCollectionValueHashMap().get(Dimension.setClientIpDimension(entity));//从key获取集合 - if (onePkt==null){ + if (clientIpPkt==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_client_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_client_ip()); - stringItemsSketchHashMap.get("onePkt").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("clientIpPkt").getStringCollectionValueHashMap().put(Dimension.setClientIpDimension(entity),collectionValue); }else {//做加和 - onePkt.getCollectionValue(onePkt,cnRecordLog); + clientIpPkt.getCollectionValue(clientIpPkt,entity); - stringItemsSketchHashMap.get("onePkt").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),onePkt); + stringItemsSketchHashMap.get("clientIpPkt").getStringCollectionValueHashMap().put(Dimension.setClientIpDimension(entity),clientIpPkt); } @@ -426,45 +426,45 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "twoPkt": + case "serverIpPkt": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> twoPktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> serverIpPktItemsSketch = new ItemsSketch<>(1048576); - twoPktItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + serverIpPktItemsSketch.update(Dimension.setServerIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_server_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setTwoDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setServerIpDimension(entity),collectionValue); - DimensionItemsSketch twoPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoPktItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch serverIpPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SERVERIP,serverIpPktItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("twoPkt",twoPktDimensionItemsSketch); + stringItemsSketchHashMap.put("serverIpPkt",serverIpPktDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("twoPkt").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("serverIpPkt").getItemsSketch().update(Dimension.setServerIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue twoPkt = stringItemsSketchHashMap.get("twoPkt").getStringCollectionValueHashMap().get(Dimension.setTwoDimension(cnRecordLog)); + CollectionValue serverIpPkt = stringItemsSketchHashMap.get("serverIpPkt").getStringCollectionValueHashMap().get(Dimension.setServerIpDimension(entity)); - if (twoPkt==null){ + if (serverIpPkt==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_server_ip()); - stringItemsSketchHashMap.get("twoPkt").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("serverIpPkt").getStringCollectionValueHashMap().put(Dimension.setServerIpDimension(entity),collectionValue); }else {//做加和 - twoPkt.getCollectionValue(twoPkt,cnRecordLog); + serverIpPkt.getCollectionValue(serverIpPkt,entity); - stringItemsSketchHashMap.get("twoPkt").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),twoPkt); + stringItemsSketchHashMap.get("serverIpPkt").getStringCollectionValueHashMap().put(Dimension.setServerIpDimension(entity),serverIpPkt); } @@ -474,47 +474,47 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "threePkt": + case "internalIpPkt": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> threePktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> internalIpPktItemsSketch = new ItemsSketch<>(1048576); - threePktItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + internalIpPktItemsSketch.update(Dimension.setInternalIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_internal_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_internal_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setThreeDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setInternalIpDimension(entity),collectionValue); - DimensionItemsSketch threePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threePktItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch internalIpPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.INTERNALIP,internalIpPktItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("threePkt",threePktDimensionItemsSketch); + stringItemsSketchHashMap.put("internalIpPkt",internalIpPktDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("threePkt").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("internalIpPkt").getItemsSketch().update(Dimension.setInternalIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue threePkt = stringItemsSketchHashMap.get("threePkt").getStringCollectionValueHashMap().get(Dimension.setThreeDimension(cnRecordLog)); + CollectionValue internalIpPkt = stringItemsSketchHashMap.get("internalIpPkt").getStringCollectionValueHashMap().get(Dimension.setInternalIpDimension(entity)); - if (threePkt==null){ + if (internalIpPkt==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_server_ip()); - stringItemsSketchHashMap.get("threePkt").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("internalIpPkt").getStringCollectionValueHashMap().put(Dimension.setInternalIpDimension(entity),collectionValue); }else {//做加和 - threePkt.getCollectionValue(threePkt,cnRecordLog); + internalIpPkt.getCollectionValue(internalIpPkt,entity); - stringItemsSketchHashMap.get("threePkt").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),threePkt); + stringItemsSketchHashMap.get("internalIpPkt").getStringCollectionValueHashMap().put(Dimension.setInternalIpDimension(entity),internalIpPkt); } @@ -525,47 +525,47 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "fourPkt": + case "externalIpPkt": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> fourPktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> externalIpPktItemsSketch = new ItemsSketch<>(1048576); - fourPktItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + externalIpPktItemsSketch.update(Dimension.setExternalIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_external_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_external_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setFourDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setExternalIpDimension(entity),collectionValue); - DimensionItemsSketch fourPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourPktItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch externalIpPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.EXTERNALIP,externalIpPktItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("fourPkt",fourPktDimensionItemsSketch); + stringItemsSketchHashMap.put("externalIpPkt",externalIpPktDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("fourPkt").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("externalIpPkt").getItemsSketch().update(Dimension.setExternalIpDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue fourPkt = stringItemsSketchHashMap.get("fourPkt").getStringCollectionValueHashMap().get(Dimension.setFourDimension(cnRecordLog)); + CollectionValue externalIpPkt = stringItemsSketchHashMap.get("externalIpPkt").getStringCollectionValueHashMap().get(Dimension.setExternalIpDimension(entity)); - if (fourPkt==null){ + if (externalIpPkt==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_external_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_external_ip()); - stringItemsSketchHashMap.get("fourPkt").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("externalIpPkt").getStringCollectionValueHashMap().put(Dimension.setExternalIpDimension(entity),collectionValue); }else {//做加和 - fourPkt.getCollectionValue(fourPkt,cnRecordLog); + externalIpPkt.getCollectionValue(externalIpPkt,entity); - stringItemsSketchHashMap.get("fourPkt").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),fourPkt); + stringItemsSketchHashMap.get("externalIpPkt").getStringCollectionValueHashMap().put(Dimension.setExternalIpDimension(entity),externalIpPkt); } @@ -577,46 +577,46 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "fivePkt": + case "domainPkt": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> fivePktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> domainPktItemsSketch = new ItemsSketch<>(1048576); - fivePktItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + domainPktItemsSketch.update(Dimension.setDomainDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getHttp_domain()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getHttp_domain()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setFiveDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setDomainDimension(entity),collectionValue); - DimensionItemsSketch fivePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fivePktItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch domainPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.DOMAIN,domainPktItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("fivePkt",fivePktDimensionItemsSketch); + stringItemsSketchHashMap.put("domainPkt",domainPktDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("fivePkt").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("domainPkt").getItemsSketch().update(Dimension.setDomainDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue fivePkt = stringItemsSketchHashMap.get("fivePkt").getStringCollectionValueHashMap().get(Dimension.setFiveDimension(cnRecordLog)); + CollectionValue domainPkt = stringItemsSketchHashMap.get("domainPkt").getStringCollectionValueHashMap().get(Dimension.setDomainDimension(entity)); - if (fivePkt==null){ + if (domainPkt==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getHttp_domain()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getHttp_domain()); - stringItemsSketchHashMap.get("fivePkt").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("domainPkt").getStringCollectionValueHashMap().put(Dimension.setDomainDimension(entity),collectionValue); }else {//做加和 - fivePkt.getCollectionValue(fivePkt,cnRecordLog); + domainPkt.getCollectionValue(domainPkt,entity); - stringItemsSketchHashMap.get("fivePkt").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),fivePkt); + stringItemsSketchHashMap.get("domainPkt").getStringCollectionValueHashMap().put(Dimension.setDomainDimension(entity),domainPkt); } @@ -627,46 +627,46 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "sixPkt": + case "subscriberIdPkt": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> sixPktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> subscriberIdPktItemsSketch = new ItemsSketch<>(1048576); - sixPktItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + subscriberIdPktItemsSketch.update(Dimension.setSubscriberIdDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_subscriber_id()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_subscriber_id()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setSixDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setSubscriberIdDimension(entity),collectionValue); - DimensionItemsSketch sixPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixPktItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch subscriberIdPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SUBSCRIBERID,subscriberIdPktItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("sixPkt",sixPktDimensionItemsSketch); + stringItemsSketchHashMap.put("subscriberIdPkt",subscriberIdPktDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("sixPkt").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("subscriberIdPkt").getItemsSketch().update(Dimension.setSubscriberIdDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue sixPkt = stringItemsSketchHashMap.get("sixPkt").getStringCollectionValueHashMap().get(Dimension.setSixDimension(cnRecordLog)); + CollectionValue subscriberIdPkt = stringItemsSketchHashMap.get("subscriberIdPkt").getStringCollectionValueHashMap().get(Dimension.setSubscriberIdDimension(entity)); - if (sixPkt==null){ + if (subscriberIdPkt==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_subscriber_id()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_subscriber_id()); - stringItemsSketchHashMap.get("sixPkt").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("subscriberIdPkt").getStringCollectionValueHashMap().put(Dimension.setSubscriberIdDimension(entity),collectionValue); }else {//做加和 - sixPkt.getCollectionValue(sixPkt,cnRecordLog); + subscriberIdPkt.getCollectionValue(subscriberIdPkt,entity); - stringItemsSketchHashMap.get("sixPkt").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),sixPkt); + stringItemsSketchHashMap.get("subscriberIdPkt").getStringCollectionValueHashMap().put(Dimension.setSubscriberIdDimension(entity),subscriberIdPkt); } @@ -677,45 +677,45 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "sevenPkt": + case "appLabelPkt": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> sevenPktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> appLabelPktItemsSketch = new ItemsSketch<>(1048576); - sevenPktItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + appLabelPktItemsSketch.update(Dimension.setAppLabelDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_app_label()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_app_label()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setSevenDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setAppLabelDimension(entity),collectionValue); - DimensionItemsSketch sevenPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenPktItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch appLabelPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.APPLABEL,appLabelPktItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("sevenPkt",sevenPktDimensionItemsSketch); + stringItemsSketchHashMap.put("appLabelPkt",appLabelPktDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("sevenPkt").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("appLabelPkt").getItemsSketch().update(Dimension.setAppLabelDimension(entity),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - CollectionValue sevenPkt = stringItemsSketchHashMap.get("sevenPkt").getStringCollectionValueHashMap().get(Dimension.setSevenDimension(cnRecordLog)); + CollectionValue appLabelPkt = stringItemsSketchHashMap.get("appLabelPkt").getStringCollectionValueHashMap().get(Dimension.setAppLabelDimension(entity)); - if (sevenPkt==null){ + if (appLabelPkt==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets"); - collectionValue.setSource(cnRecordLog.getCommon_app_label()); + CollectionValue collectionValue = new CollectionValue(entity,"packets"); + collectionValue.setSource(entity.getCommon_app_label()); - stringItemsSketchHashMap.get("sevenPkt").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("appLabelPkt").getStringCollectionValueHashMap().put(Dimension.setAppLabelDimension(entity),collectionValue); }else {//做加和 - sevenPkt.getCollectionValue(sevenPkt,cnRecordLog); + appLabelPkt.getCollectionValue(appLabelPkt,entity); - stringItemsSketchHashMap.get("sevenPkt").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),sevenPkt); + stringItemsSketchHashMap.get("appLabelPkt").getStringCollectionValueHashMap().put(Dimension.setAppLabelDimension(entity),appLabelPkt); } @@ -725,47 +725,47 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "oneByte": + case "clientIpByte": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> oneByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> clientIpByteItemsSketch = new ItemsSketch<>(1048576); - oneByteItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + clientIpByteItemsSketch.update(Dimension.setClientIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_client_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_client_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setOneDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setClientIpDimension(entity),collectionValue); - DimensionItemsSketch oneByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,oneByteItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch clientIpByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.CLIENTIP,clientIpByteItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("oneByte",oneByteDimensionItemsSketch); + stringItemsSketchHashMap.put("clientIpByte",clientIpByteDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("oneByte").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("clientIpByte").getItemsSketch().update(Dimension.setClientIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue oneByte = stringItemsSketchHashMap.get("oneByte").getStringCollectionValueHashMap().get(Dimension.setOneDimension(cnRecordLog));//从key获取集合 + CollectionValue clientIpByte = stringItemsSketchHashMap.get("clientIpByte").getStringCollectionValueHashMap().get(Dimension.setClientIpDimension(entity));//从key获取集合 - if (oneByte==null){ + if (clientIpByte==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_client_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_client_ip()); - stringItemsSketchHashMap.get("oneByte").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("clientIpByte").getStringCollectionValueHashMap().put(Dimension.setClientIpDimension(entity),collectionValue); }else {//做加和 - oneByte.getCollectionValue(oneByte,cnRecordLog); + clientIpByte.getCollectionValue(clientIpByte,entity); - stringItemsSketchHashMap.get("oneByte").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),oneByte); + stringItemsSketchHashMap.get("clientIpByte").getStringCollectionValueHashMap().put(Dimension.setClientIpDimension(entity),clientIpByte); } @@ -774,45 +774,45 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "twoByte": + case "serverIpByte": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> twoByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> serverIpByteItemsSketch = new ItemsSketch<>(1048576); - twoByteItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + serverIpByteItemsSketch.update(Dimension.setServerIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_server_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setTwoDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setServerIpDimension(entity),collectionValue); - DimensionItemsSketch twoByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoByteItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch serverIpByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SERVERIP,serverIpByteItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("twoByte",twoByteDimensionItemsSketch); + stringItemsSketchHashMap.put("serverIpByte",serverIpByteDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("twoByte").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("serverIpByte").getItemsSketch().update(Dimension.setServerIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue twoByte = stringItemsSketchHashMap.get("twoByte").getStringCollectionValueHashMap().get(Dimension.setTwoDimension(cnRecordLog)); + CollectionValue serverIpByte = stringItemsSketchHashMap.get("serverIpByte").getStringCollectionValueHashMap().get(Dimension.setServerIpDimension(entity)); - if (twoByte==null){ + if (serverIpByte==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_server_ip()); - stringItemsSketchHashMap.get("twoByte").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("serverIpByte").getStringCollectionValueHashMap().put(Dimension.setServerIpDimension(entity),collectionValue); }else {//做加和 - twoByte.getCollectionValue(twoByte,cnRecordLog); + serverIpByte.getCollectionValue(serverIpByte,entity); - stringItemsSketchHashMap.get("twoByte").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),twoByte); + stringItemsSketchHashMap.get("serverIpByte").getStringCollectionValueHashMap().put(Dimension.setServerIpDimension(entity),serverIpByte); } @@ -821,49 +821,49 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "threeByte": + case "internalIpByte": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> threeByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> internalIpByteItemsSketch = new ItemsSketch<>(1048576); - threeByteItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + internalIpByteItemsSketch.update(Dimension.setInternalIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_internal_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_internal_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setThreeDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setInternalIpDimension(entity),collectionValue); - DimensionItemsSketch threeByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeByteItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch internalIpByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.INTERNALIP,internalIpByteItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("threeByte",threeByteDimensionItemsSketch); + stringItemsSketchHashMap.put("internalIpByte",internalIpByteDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("threeByte").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("internalIpByte").getItemsSketch().update(Dimension.setInternalIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue threeByte = stringItemsSketchHashMap.get("threeByte").getStringCollectionValueHashMap().get(Dimension.setThreeDimension(cnRecordLog)); + CollectionValue internalIpByte = stringItemsSketchHashMap.get("internalIpByte").getStringCollectionValueHashMap().get(Dimension.setInternalIpDimension(entity)); - if (threeByte==null){ + if (internalIpByte==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_server_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_server_ip()); - stringItemsSketchHashMap.get("threeByte").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("internalIpByte").getStringCollectionValueHashMap().put(Dimension.setInternalIpDimension(entity),collectionValue); }else {//做加和 - threeByte.getCollectionValue(threeByte,cnRecordLog); + internalIpByte.getCollectionValue(internalIpByte,entity); - stringItemsSketchHashMap.get("threeByte").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),threeByte); + stringItemsSketchHashMap.get("internalIpByte").getStringCollectionValueHashMap().put(Dimension.setInternalIpDimension(entity),internalIpByte); } @@ -874,50 +874,50 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "fourByte": + case "externalIpByte": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> fourByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> externalIpByteItemsSketch = new ItemsSketch<>(1048576); - fourByteItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + externalIpByteItemsSketch.update(Dimension.setExternalIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_external_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_external_ip()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setFourDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setExternalIpDimension(entity),collectionValue); - DimensionItemsSketch fourByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourByteItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch externalIpByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.EXTERNALIP,externalIpByteItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("fourByte",fourByteDimensionItemsSketch); + stringItemsSketchHashMap.put("externalIpByte",externalIpByteDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("fourByte").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("externalIpByte").getItemsSketch().update(Dimension.setExternalIpDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue fourByte = stringItemsSketchHashMap.get("fourByte").getStringCollectionValueHashMap().get(Dimension.setFourDimension(cnRecordLog)); + CollectionValue externalIpByte = stringItemsSketchHashMap.get("externalIpByte").getStringCollectionValueHashMap().get(Dimension.setExternalIpDimension(entity)); - if (fourByte==null){ + if (externalIpByte==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_external_ip()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_external_ip()); - stringItemsSketchHashMap.get("fourByte").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("externalIpByte").getStringCollectionValueHashMap().put(Dimension.setExternalIpDimension(entity),collectionValue); }else {//做加和 - fourByte.getCollectionValue(fourByte,cnRecordLog); + externalIpByte.getCollectionValue(externalIpByte,entity); - stringItemsSketchHashMap.get("fourByte").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),fourByte); + stringItemsSketchHashMap.get("externalIpByte").getStringCollectionValueHashMap().put(Dimension.setExternalIpDimension(entity),externalIpByte); } @@ -930,46 +930,46 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "fiveByte": + case "domainByte": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> fiveByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> domainByteItemsSketch = new ItemsSketch<>(1048576); - fiveByteItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + domainByteItemsSketch.update(Dimension.setDomainDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getHttp_domain()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getHttp_domain()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setFiveDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setDomainDimension(entity),collectionValue); - DimensionItemsSketch fiveByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveByteItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch domainByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.DOMAIN,domainByteItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("fiveByte",fiveByteDimensionItemsSketch); + stringItemsSketchHashMap.put("domainByte",domainByteDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("fiveByte").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("domainByte").getItemsSketch().update(Dimension.setDomainDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue fiveByte = stringItemsSketchHashMap.get("fiveByte").getStringCollectionValueHashMap().get(Dimension.setFiveDimension(cnRecordLog)); + CollectionValue domainByte = stringItemsSketchHashMap.get("domainByte").getStringCollectionValueHashMap().get(Dimension.setDomainDimension(entity)); - if (fiveByte==null){ + if (domainByte==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getHttp_domain()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getHttp_domain()); - stringItemsSketchHashMap.get("fiveByte").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("domainByte").getStringCollectionValueHashMap().put(Dimension.setDomainDimension(entity),collectionValue); }else {//做加和 - fiveByte.getCollectionValue(fiveByte,cnRecordLog); + domainByte.getCollectionValue(domainByte,entity); - stringItemsSketchHashMap.get("fiveByte").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),fiveByte); + stringItemsSketchHashMap.get("domainByte").getStringCollectionValueHashMap().put(Dimension.setDomainDimension(entity),domainByte); } @@ -981,49 +981,49 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "sixByte": + case "subscriberIdByte": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> sixByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> subscriberIdByteItemsSketch = new ItemsSketch<>(1048576); - sixByteItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + subscriberIdByteItemsSketch.update(Dimension.setSubscriberIdDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_subscriber_id()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_subscriber_id()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setSixDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setSubscriberIdDimension(entity),collectionValue); - DimensionItemsSketch sixByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixByteItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch subscriberIdByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SUBSCRIBERID,subscriberIdByteItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("sixByte",sixByteDimensionItemsSketch); + stringItemsSketchHashMap.put("subscriberIdByte",subscriberIdByteDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("sixByte").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("subscriberIdByte").getItemsSketch().update(Dimension.setSubscriberIdDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue sixByte = stringItemsSketchHashMap.get("sixByte").getStringCollectionValueHashMap().get(Dimension.setSixDimension(cnRecordLog)); + CollectionValue subscriberIdByte = stringItemsSketchHashMap.get("subscriberIdByte").getStringCollectionValueHashMap().get(Dimension.setSubscriberIdDimension(entity)); - if (sixByte==null){ + if (subscriberIdByte==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_subscriber_id()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_subscriber_id()); - stringItemsSketchHashMap.get("sixByte").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("subscriberIdByte").getStringCollectionValueHashMap().put(Dimension.setSubscriberIdDimension(entity),collectionValue); }else {//做加和 - sixByte.getCollectionValue(sixByte,cnRecordLog); + subscriberIdByte.getCollectionValue(subscriberIdByte,entity); - stringItemsSketchHashMap.get("sixByte").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),sixByte); + stringItemsSketchHashMap.get("subscriberIdByte").getStringCollectionValueHashMap().put(Dimension.setSubscriberIdDimension(entity),subscriberIdByte); } @@ -1036,50 +1036,50 @@ public class DatasketchForMetricsAggregate implements AggregateFunction<Entity, } break; - case "sevenByte": + case "appLabelByte": if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> sevenByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch<String> appLabelByteItemsSketch = new ItemsSketch<>(1048576); - sevenByteItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + appLabelByteItemsSketch.update(Dimension.setAppLabelDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_app_label()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_app_label()); HashMap<String, CollectionValue> stringCollectionValueHashMap = new HashMap<>(); - stringCollectionValueHashMap.put(Dimension.setSevenDimension(cnRecordLog),collectionValue); + stringCollectionValueHashMap.put(Dimension.setAppLabelDimension(entity),collectionValue); - DimensionItemsSketch sevenByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenByteItemsSketch,stringCollectionValueHashMap); + DimensionItemsSketch appLabelByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.APPLABEL,appLabelByteItemsSketch,stringCollectionValueHashMap); - stringItemsSketchHashMap.put("sevenByte",sevenByteDimensionItemsSketch); + stringItemsSketchHashMap.put("appLabelByte",appLabelByteDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("sevenByte").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("appLabelByte").getItemsSketch().update(Dimension.setAppLabelDimension(entity),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - CollectionValue sevenByte = stringItemsSketchHashMap.get("sevenByte").getStringCollectionValueHashMap().get(Dimension.setSevenDimension(cnRecordLog)); + CollectionValue appLabelByte = stringItemsSketchHashMap.get("appLabelByte").getStringCollectionValueHashMap().get(Dimension.setAppLabelDimension(entity)); - if (sevenByte==null){ + if (appLabelByte==null){ - CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes"); - collectionValue.setSource(cnRecordLog.getCommon_app_label()); + CollectionValue collectionValue = new CollectionValue(entity,"bytes"); + collectionValue.setSource(entity.getCommon_app_label()); - stringItemsSketchHashMap.get("sevenByte").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),collectionValue); + stringItemsSketchHashMap.get("appLabelByte").getStringCollectionValueHashMap().put(Dimension.setAppLabelDimension(entity),collectionValue); }else {//做加和 - sevenByte.getCollectionValue(sevenByte,cnRecordLog); + appLabelByte.getCollectionValue(appLabelByte,entity); - stringItemsSketchHashMap.get("sevenByte").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),sevenByte); + stringItemsSketchHashMap.get("appLabelByte").getStringCollectionValueHashMap().put(Dimension.setAppLabelDimension(entity),appLabelByte); } diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate.java b/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate.java index 03b04f3..04015c1 100644 --- a/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate.java +++ b/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate.java @@ -24,56 +24,56 @@ public class DatasketchForUrlAggregate implements AggregateFunction<UrlEntity, H } @Override - public HashMap<String, DimensionItemsSketch> add(UrlEntity cnRecordLog, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) { + public HashMap<String, DimensionItemsSketch> add(UrlEntity urlEntity, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) { // String dimension = cnRecordLog.getCommon_client_ip();//维度 // System.out.println(dimension); if(stringItemsSketchHashMap.isEmpty()) { - ItemsSketch<String> eightSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> urlSessionItemsSketch = new ItemsSketch<>(1048576);//新建 - eightSessionItemsSketch.update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + urlSessionItemsSketch.update(Dimension.setUrlDimension(urlEntity),urlEntity.getCommon_sessions()); TopUrlEntity topUrlEntity = new TopUrlEntity(); - topUrlEntity.setSession_num(cnRecordLog.getCommon_sessions()); + topUrlEntity.setSession_num(urlEntity.getCommon_sessions()); topUrlEntity.setStat_time(System.currentTimeMillis() / 1000); - topUrlEntity.setUrl(cnRecordLog.getHttp_url()); - topUrlEntity.setVsys_id(cnRecordLog.getCommon_vsys_id()); + topUrlEntity.setUrl(urlEntity.getHttp_url()); + topUrlEntity.setVsys_id(urlEntity.getCommon_vsys_id()); HashMap<String,TopUrlEntity> stringTopUrlEntityHashMap = new HashMap<>(); - stringTopUrlEntityHashMap.put(Dimension.setEightDimension(cnRecordLog),topUrlEntity); + stringTopUrlEntityHashMap.put(Dimension.setUrlDimension(urlEntity),topUrlEntity); - DimensionItemsSketch eightSessionDimensionItemsSketch = new DimensionItemsSketch(stringTopUrlEntityHashMap,Dimension.EIGHT,eightSessionItemsSketch); + DimensionItemsSketch urlSessionDimensionItemsSketch = new DimensionItemsSketch(stringTopUrlEntityHashMap,Dimension.URL,urlSessionItemsSketch); - stringItemsSketchHashMap.put("eightSession", eightSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("urlSession", urlSessionDimensionItemsSketch); }else { - stringItemsSketchHashMap.get("eightSession").getItemsSketch().update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("urlSession").getItemsSketch().update(Dimension.setUrlDimension(urlEntity),urlEntity.getCommon_sessions()); - TopUrlEntity eightSession = stringItemsSketchHashMap.get("eightSession").getStringTopUrlEntityHashMap().get(Dimension.setEightDimension(cnRecordLog));//从key获取集合 + TopUrlEntity urlSession = stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().get(Dimension.setUrlDimension(urlEntity));//从key获取集合 - if (eightSession==null){ + if (urlSession==null){ TopUrlEntity topUrlEntity = new TopUrlEntity(); - topUrlEntity.setSession_num(cnRecordLog.getCommon_sessions()); + topUrlEntity.setSession_num(urlEntity.getCommon_sessions()); topUrlEntity.setStat_time(System.currentTimeMillis() / 1000); - topUrlEntity.setUrl(cnRecordLog.getHttp_url()); - topUrlEntity.setVsys_id(cnRecordLog.getCommon_vsys_id()); + topUrlEntity.setUrl(urlEntity.getHttp_url()); + topUrlEntity.setVsys_id(urlEntity.getCommon_vsys_id()); - stringItemsSketchHashMap.get("eightSession").getStringTopUrlEntityHashMap().put(Dimension.setEightDimension(cnRecordLog),eightSession); + stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().put(Dimension.setUrlDimension(urlEntity),urlSession); }else {//做加和 - eightSession.setSession_num(eightSession.getSession_num()+cnRecordLog.getCommon_sessions()); + urlSession.setSession_num(urlSession.getSession_num()+urlEntity.getCommon_sessions()); - stringItemsSketchHashMap.get("eightSession").getStringTopUrlEntityHashMap().put(Dimension.setEightDimension(cnRecordLog),eightSession); + stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().put(Dimension.setUrlDimension(urlEntity),urlSession); } diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchMetricsCalculate.java b/src/main/java/com/galaxy/tsg/function/DatasketchMetricsCalculate.java new file mode 100644 index 0000000..47427fb --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/DatasketchMetricsCalculate.java @@ -0,0 +1,276 @@ +package com.galaxy.tsg.function; + +import com.alibaba.fastjson.JSONObject; +import com.galaxy.tsg.pojo.*; +import org.apache.datasketches.frequencies.ErrorType; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.HashMap; + +import static com.galaxy.tsg.config.commonConfig.TOP_LIMIT; + +/** + * @author fy + * @version 1.0 + * @date 2023/3/10 16:09 + */ +public class DatasketchMetricsCalculate extends ProcessWindowFunction< + Entity, // 输入类型 + ResultEntity, // 输出类型 + Tuple4<String,Long,String,String>, // 键类型 + TimeWindow> { + + private final int topSize; + private final String key; + + + + + public DatasketchMetricsCalculate(int topSize, String key) { + this.topSize = topSize; + this.key = key; + } + + + @Override + public void process(Tuple4<String, Long, String, String> S, Context context, Iterable<Entity> iterable, Collector<ResultEntity> collector) throws Exception { + + + ItemsSketch<String> sessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> pktItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch<String> byteItemsSketch = new ItemsSketch<>(1048576);//新建 + + HashMap<String, SessionResultEntity> sessionResultEntityHashMap = new HashMap<>(); + HashMap<String, PacketResultEntity> packetResultEntityHashMap = new HashMap<>(); + HashMap<String, ByteResultEntity> byteResultEntityHashMap = new HashMap<>(); + + + + for (Entity entity:iterable){ + + //处理session + sessionItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_sessions()); + sessionResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichessionResult(context.window().getEnd() / 1000, entity)); + + //处理pkt + pktItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); + packetResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichPacketResult(context.window().getEnd() / 1000, entity)); + + //处理byte + byteItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); + byteResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichByteResult(context.window().getEnd() / 1000, entity)); + + + } + + + ItemsSketch.Row<String>[] sessionFrequentItems = sessionItemsSketch.getFrequentItems(ErrorType.NO_FALSE_POSITIVES); + + for (int i=0;i<sessionFrequentItems.length;i++){ + + SessionResultEntity sessionResultEntity = sessionResultEntityHashMap.get(sessionFrequentItems[i].getItem()); + + ResultEntity en = new ResultEntity(); + en.setOrder_by("sessions"); + en.setStat_time(context.window().getEnd() / 1000); + en.setSessionResultEntity(sessionResultEntity); + collector.collect(en); + + + if (i==topSize)//够条数就结束 + break; + + } + + + ItemsSketch.Row<String>[] pktFrequentItems = pktItemsSketch.getFrequentItems(ErrorType.NO_FALSE_POSITIVES); + + for (int i=0;i<pktFrequentItems.length;i++){ + + PacketResultEntity packetResultEntity = packetResultEntityHashMap.get(pktFrequentItems[i].getItem()); + + ResultEntity en = new ResultEntity(); + en.setOrder_by("packets"); + en.setStat_time(context.window().getEnd() / 1000); + en.setPacketResultEntity(packetResultEntity); + collector.collect(en); + + + if (i==topSize)//够条数就结束 + break; + + } + + + ItemsSketch.Row<String>[] byteFrequentItems = byteItemsSketch.getFrequentItems(ErrorType.NO_FALSE_POSITIVES); + + for (int i=0;i<byteFrequentItems.length;i++){ + + ByteResultEntity byteResultEntity = byteResultEntityHashMap.get(byteFrequentItems[i].getItem()); + + ResultEntity en = new ResultEntity(); + en.setOrder_by("bytes"); + en.setStat_time(context.window().getEnd() / 1000); + en.setByteResultEntity(byteResultEntity); + collector.collect(en); + + + if (i==TOP_LIMIT)//够条数就结束 + break; + + } + + + + } + + + + + + + + + + + public ByteResultEntity enrichByteResult(Long time,Entity objectEntity) { + ByteResultEntity en = new ByteResultEntity(); + en.setVsys_id(objectEntity.getCommon_vsys_id()); + en.setStat_time(time); + en.setSource(objectEntity.getCommon_client_ip()); + en.setSession_num(objectEntity.getCommon_sessions()); + en.setOrder_by("bytes"); + en.setDevice_group(objectEntity.getCommon_device_group()); + en.setData_center(objectEntity.getCommon_data_center()); + en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num()); + en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num()); + en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num()); + en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num()); + switch (key) { + case "common_client_ip": + en.setSource(objectEntity.getCommon_client_ip()); + break; + case "common_server_ip": + en.setDestination(objectEntity.getCommon_server_ip()); + break; + case "common_internal_ip": + en.setSource(objectEntity.getCommon_internal_ip()); + break; + case "common_external_ip": + en.setDestination(objectEntity.getCommon_external_ip()); + break; + case "http_domain": + en.setDomain(objectEntity.getHttp_domain()); + break; + + case "common_subscriber_id": + en.setSubscriber_id(objectEntity.getCommon_subscriber_id()); + break; + + case "common_app_label": + en.setApp_name(objectEntity.getCommon_app_label()); + break; + + default: + + } + return en; + + } + public SessionResultEntity enrichessionResult(Long time,Entity objectEntity){ + + SessionResultEntity en =new SessionResultEntity(); + en.setVsys_id(objectEntity.getCommon_vsys_id()); + en.setStat_time(time); + en.setSession_num(objectEntity.getCommon_sessions()); + en.setOrder_by("sessions"); + en.setDevice_group(objectEntity.getCommon_device_group()); + en.setData_center(objectEntity.getCommon_data_center()); + en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num()); + en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num()); + en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num()); + en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num()); + switch(key) { + case "common_client_ip": + en.setSource(objectEntity.getCommon_client_ip()); + break; + case "common_server_ip": + en.setDestination(objectEntity.getCommon_server_ip()); + break; + case "common_internal_ip": + en.setSource(objectEntity.getCommon_internal_ip()); + break; + case "common_external_ip": + en.setDestination(objectEntity.getCommon_external_ip()); + break; + case "http_domain": + en.setDomain(objectEntity.getHttp_domain()); + break; + + case "common_subscriber_id": + en.setSubscriber_id(objectEntity.getCommon_subscriber_id()); + break; + + case "common_app_label": + en.setApp_name(objectEntity.getCommon_app_label()); + break; + + default: + + + + } + return en; + } + public PacketResultEntity enrichPacketResult(Long time,Entity objectEntity){ + PacketResultEntity en =new PacketResultEntity(); + en.setVsys_id(objectEntity.getCommon_vsys_id()); + en.setStat_time(time); + en.setSource(objectEntity.getCommon_client_ip()); + en.setSession_num(objectEntity.getCommon_sessions()); + en.setOrder_by("packets"); + en.setDevice_group(objectEntity.getCommon_device_group()); + en.setData_center(objectEntity.getCommon_data_center()); + en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num()); + en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num()); + en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num()); + en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num()); + switch(key) { + case "common_client_ip": + en.setSource(objectEntity.getCommon_client_ip()); + break; + case "common_server_ip": + en.setDestination(objectEntity.getCommon_server_ip()); + break; + case "common_internal_ip": + en.setSource(objectEntity.getCommon_internal_ip()); + break; + case "common_external_ip": + en.setDestination(objectEntity.getCommon_external_ip()); + break; + case "common_subscriber_id": + en.setSubscriber_id(objectEntity.getCommon_subscriber_id()); + break; + + case "common_app_label": + en.setApp_name(objectEntity.getCommon_app_label()); + break; + + default: + + + } + return en; + } + + + + + + + +} diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchUrlCalculate.java b/src/main/java/com/galaxy/tsg/function/DatasketchUrlCalculate.java new file mode 100644 index 0000000..e65d537 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/DatasketchUrlCalculate.java @@ -0,0 +1,96 @@ +package com.galaxy.tsg.function; + +import com.galaxy.tsg.pojo.ResultEntity; +import com.galaxy.tsg.pojo.SessionResultEntity; +import com.galaxy.tsg.pojo.TopUrlEntity; +import com.galaxy.tsg.pojo.UrlEntity; +import org.apache.datasketches.frequencies.ErrorType; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.HashMap; + +/** + * @author fy + * @version 1.0 + * @date 2023/3/10 18:27 + */ +public class DatasketchUrlCalculate extends ProcessWindowFunction< + UrlEntity, // 输入类型 + ResultEntity, // 输出类型 + Tuple2<String,Long>, // 键类型 + TimeWindow> { + + private final int topSize; + + + + public DatasketchUrlCalculate(int topSize) { + this.topSize = topSize; + } + + + @Override + public void process(Tuple2<String, Long> stringLongTuple2, Context context, Iterable<UrlEntity> iterable, Collector<ResultEntity> collector) throws Exception { + + + ItemsSketch<String> sessionItemsSketch = new ItemsSketch<>(1048576);//新建 + HashMap<String, TopUrlEntity> sessionResultEntityHashMap = new HashMap<>(); + + + + + for (UrlEntity urlEntity:iterable){ + + + //处理session + sessionItemsSketch.update(Dimension.setUrlDimension(urlEntity),urlEntity.getCommon_sessions()); + + TopUrlEntity en = new TopUrlEntity(); + en.setSession_num(urlEntity.getCommon_sessions()); + en.setStat_time(context.window().getEnd() / 1000); + en.setUrl(urlEntity.getHttp_url()); + en.setVsys_id(urlEntity.getCommon_vsys_id()); + + sessionResultEntityHashMap.put(Dimension.setUrlDimension(urlEntity),en); + + } + + + ItemsSketch.Row<String>[] sessionFrequentItems = sessionItemsSketch.getFrequentItems(ErrorType.NO_FALSE_POSITIVES); + + for (int i=0;i<sessionFrequentItems.length;i++){ + + TopUrlEntity topUrlEntity = sessionResultEntityHashMap.get(sessionFrequentItems[i].getItem()); + + ResultEntity resultEntity = new ResultEntity(); + resultEntity.setOrder_by("sessions"); + resultEntity.setStat_time(context.window().getEnd() / 1000); + resultEntity.setTopUrlEntity(topUrlEntity); + collector.collect(resultEntity); + + + if (i==topSize)//够条数就结束 + break; + + } + + + + + + } + + + + + + + + + + +} diff --git a/src/main/java/com/galaxy/tsg/function/Dimension.java b/src/main/java/com/galaxy/tsg/function/Dimension.java index bc5a683..17e6476 100644 --- a/src/main/java/com/galaxy/tsg/function/Dimension.java +++ b/src/main/java/com/galaxy/tsg/function/Dimension.java @@ -12,69 +12,116 @@ import com.galaxy.tsg.pojo.UrlEntity; */ public class Dimension { - public static final String ONE = "common_client_ip,common_vsys_id,common_device_group,common_data_center"; - public static final String TWO = "common_server_ip,common_vsys_id,common_device_group,common_data_center"; - public static final String THREE = "common_internal_ip,common_vsys_id,common_device_group,common_data_center"; - public static final String FOUR = "common_external_ip,common_vsys_id,common_device_group,common_data_center"; - public static final String FIVE = "http_domain,common_vsys_id,common_device_group,common_data_center"; - public static final String SIX = "common_subscriber_id,common_vsys_id,common_device_group,common_data_center"; - public static final String SEVEN = "common_app_label,common_vsys_id,common_device_group,common_data_center"; + public static final String CLIENTIP = "common_client_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String SERVERIP = "common_server_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String INTERNALIP = "common_internal_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String EXTERNALIP = "common_external_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String DOMAIN = "http_domain,common_vsys_id,common_device_group,common_data_center"; + public static final String SUBSCRIBERID = "common_subscriber_id,common_vsys_id,common_device_group,common_data_center"; + public static final String APPLABEL = "common_app_label,common_vsys_id,common_device_group,common_data_center"; - public static final String EIGHT = "http_url,common_vsys_id"; + public static final String URL = "http_url,common_vsys_id"; - public static String setOneDimension(Entity cnRecordLog){ + public static String setClientIpDimension(Entity entity){ - String oneDimension = cnRecordLog.getCommon_client_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); - return oneDimension; + String clientIpDimension = entity.getCommon_client_ip()+","+entity.getCommon_vsys_id()+","+entity.getCommon_device_group()+","+entity.getCommon_data_center(); + return clientIpDimension; } - public static String setTwoDimension(Entity cnRecordLog){ + public static String setServerIpDimension(Entity entity){ - String twoDimension = cnRecordLog.getCommon_server_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); - return twoDimension; + String serverIpDimension = entity.getCommon_server_ip()+","+entity.getCommon_vsys_id()+","+entity.getCommon_device_group()+","+entity.getCommon_data_center(); + return serverIpDimension; } - public static String setThreeDimension(Entity cnRecordLog){ + public static String setInternalIpDimension(Entity entity){ - String threeDimension = cnRecordLog.getCommon_internal_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); - return threeDimension; + String internalIpDimension = entity.getCommon_internal_ip()+","+entity.getCommon_vsys_id()+","+entity.getCommon_device_group()+","+entity.getCommon_data_center(); + return internalIpDimension; } - public static String setFourDimension(Entity cnRecordLog){ + public static String setExternalIpDimension(Entity entity){ - String fourDimension = cnRecordLog.getCommon_external_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); - return fourDimension; + String externalIpDimension = entity.getCommon_external_ip()+","+entity.getCommon_vsys_id()+","+entity.getCommon_device_group()+","+entity.getCommon_data_center(); + return externalIpDimension; } - public static String setFiveDimension(Entity cnRecordLog){ + public static String setDomainDimension(Entity entity){ - String fiveDimension = cnRecordLog.getHttp_domain()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); - return fiveDimension; + String domainDimension = entity.getHttp_domain()+","+entity.getCommon_vsys_id()+","+entity.getCommon_device_group()+","+entity.getCommon_data_center(); + return domainDimension; } - public static String setSixDimension(Entity cnRecordLog){ + public static String setSubscriberIdDimension(Entity entity){ - String sixDimension = cnRecordLog.getCommon_subscriber_id()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); - return sixDimension; + String subscriberIdDimension = entity.getCommon_subscriber_id()+","+entity.getCommon_vsys_id()+","+entity.getCommon_device_group()+","+entity.getCommon_data_center(); + return subscriberIdDimension; } - public static String setSevenDimension(Entity cnRecordLog){ + public static String setAppLabelDimension(Entity entity){ - String sevenDimension = cnRecordLog.getCommon_app_label()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); - return sevenDimension; + String appLabelDimension = entity.getCommon_app_label()+","+entity.getCommon_vsys_id()+","+entity.getCommon_device_group()+","+entity.getCommon_data_center(); + return appLabelDimension; } - public static String setEightDimension(UrlEntity cnRecordLog){ + public static String setUrlDimension(UrlEntity entity){ - String eightDimension = cnRecordLog.getHttp_url()+","+cnRecordLog.getCommon_vsys_id(); - return eightDimension; + String urlDimension = entity.getHttp_url()+","+entity.getCommon_vsys_id(); + return urlDimension; } + + + public static String setDimension(Entity entity,String key){ + + String dimension = ""; + + switch (key) { + case "common_client_ip": + dimension = setClientIpDimension(entity); + break; + case "common_server_ip": + dimension = setServerIpDimension(entity); + break; + case "common_internal_ip": + dimension = setInternalIpDimension(entity); + break; + case "common_external_ip": + dimension = setExternalIpDimension(entity); + break; + case "http_domain": + dimension = setDomainDimension(entity); + break; + + case "common_subscriber_id": + dimension = setSubscriberIdDimension(entity); + break; + + case "common_app_label": + dimension = setAppLabelDimension(entity); + break; + + default: + + } + + + + + return dimension; + } + + + + + + + } diff --git a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java index c98545b..f689caf 100644 --- a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java +++ b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java @@ -13,6 +13,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import static com.galaxy.tsg.config.commonConfig.TOP_LIMIT; + /** * @author fy * @version 1.0 @@ -23,41 +25,43 @@ public class UserCountWindowResult5 extends ProcessAllWindowFunction<HashMap<Str @Override public void process(Context context, Iterable<HashMap<String, DimensionItemsSketch>> iterable, Collector<String> collector) throws Exception { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("-------------------------\n"); - stringBuilder.append("datasketches方法窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n"); +// StringBuilder stringBuilder = new StringBuilder(); +// stringBuilder.append("-------------------------\n"); +// stringBuilder.append("datasketches方法窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n"); HashMap<String, DimensionItemsSketch> dataHashMap = iterable.iterator().next(); - System.out.println(dataHashMap.toString()); +// System.out.println(dataHashMap.toString()); Set<Map.Entry<String, DimensionItemsSketch>> entries = dataHashMap.entrySet(); for(Map.Entry<String, DimensionItemsSketch> entry:entries){ - System.out.println(entry.getKey()+""); - stringBuilder.append(entry.getKey()+"\n"); +// System.out.println(entry.getKey()+""); +// stringBuilder.append(entry.getKey()+"\n"); ItemsSketch.Row<String>[] items = entry.getValue().getItemsSketch().getFrequentItems(ErrorType.NO_FALSE_POSITIVES); for (int i=0;i<items.length;i++){ - String resultStr = "No." + (i + 1) + " " - + "ip:" + items[i].getItem() + " " - + " Est:" + items[i].getEstimate() - + " UB:" + items[i].getUpperBound() - + " LB:" + items[i].getLowerBound(); +// String resultStr = "No." + (i + 1) + " " +// + "ip:" + items[i].getItem() + " " +// + " Est:" + items[i].getEstimate() +// + " UB:" + items[i].getUpperBound() +// + " LB:" + items[i].getLowerBound(); String jsonStr = ""; - if(!entry.getKey().equals("eightSession")){ + if(!entry.getKey().equals("urlSession")){ CollectionValue collectionValue = entry.getValue().getStringCollectionValueHashMap().get(items[i].getItem()); + collectionValue.setStat_time(context.window().getEnd()/1000); jsonStr = JSONObject.toJSONString(collectionValue); }else { TopUrlEntity topUrlEntity = entry.getValue().getStringTopUrlEntityHashMap().get(items[i].getItem()); + topUrlEntity.setStat_time(context.window().getEnd()/1000); jsonStr = JSONObject.toJSONString(topUrlEntity); } @@ -65,12 +69,12 @@ public class UserCountWindowResult5 extends ProcessAllWindowFunction<HashMap<Str collector.collect(jsonStr); // String item = items[i].toString(); - stringBuilder.append(resultStr); +// stringBuilder.append(resultStr); - stringBuilder.append("\n"); +// stringBuilder.append("\n"); - if (i==1000) + if (i==TOP_LIMIT)//够条数就结束 break; } |
