diff options
| author | wanglihui <[email protected]> | 2021-09-13 09:46:02 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-09-13 09:46:02 +0800 |
| commit | 4f8807dfa15e9c00adf2e4e706b021978b600394 (patch) | |
| tree | 48664e50c87b80d8a535334c797eb5ef66338f14 | |
| parent | 81f64994589b696482403f316f10dcbb5dded340 (diff) | |
修改计算速率方式,使用session总数除以时间窗口
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/EtlProcessFunction.java | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java index 464b5f6..aab3e88 100644 --- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java @@ -80,7 +80,9 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS } } String sourceIpList = StringUtils.join(sourceIpSet, ","); - return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration); +// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration); + return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME, + bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration); }catch (Exception e){ logger.error("聚合中间结果集失败 {}",e); } |
