summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-09-13 09:46:02 +0800
committerwanglihui <[email protected]>2021-09-13 09:46:02 +0800
commit4f8807dfa15e9c00adf2e4e706b021978b600394 (patch)
tree48664e50c87b80d8a535334c797eb5ef66338f14
parent81f64994589b696482403f316f10dcbb5dded340 (diff)
修改计算速率方式,使用session总数除以时间窗口
-rw-r--r--src/main/java/com/zdjizhi/etl/EtlProcessFunction.java4
1 files changed, 3 insertions, 1 deletions
diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
index 464b5f6..aab3e88 100644
--- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
@@ -80,7 +80,9 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
}
String sourceIpList = StringUtils.join(sourceIpSet, ",");
- return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
+// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
+ return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
+ bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
}catch (Exception e){
logger.error("聚合中间结果集失败 {}",e);
}