From ba64fe818729e0a1190d2e627e8cf5bb89140e81 Mon Sep 17 00:00:00 2001 From: zhanghongqing Date: Thu, 11 Aug 2022 14:04:45 +0800 Subject: 增量过滤时间为0的数据 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zdjizhi/etl/connection/ConnProcessFunction.java | 13 +++++++------ .../com/zdjizhi/etl/connection/SketchProcessFunction.java | 12 +++++++----- src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java | 12 ++++++------ .../com/zdjizhi/etl/dns/DnsRelationProcessFunction.java | 8 +++++--- .../java/com/zdjizhi/topology/LogFlowWriteTopology.java | 8 ++++---- 5 files changed, 29 insertions(+), 24 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java index 6fa11d6..40afc63 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java @@ -52,13 +52,14 @@ public class ConnProcessFunction extends ProcessWindowFunction newSketchLog : elements) { - sessions++; - packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts")); - bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes")); long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? connStartTimetime : endTime; - + if(connStartTimetime>0){ + sessions++; + packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts")); + bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes")); + startTime = connStartTimetime < startTime ? connStartTimetime : startTime; + endTime = connStartTimetime > endTime ? connStartTimetime : endTime; + } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); } catch (Exception e) { diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java index aac8428..8862da6 100644 --- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java @@ -66,12 +66,14 @@ public class SketchProcessFunction extends ProcessWindowFunction newSketchLog : elements) { - sessions += Convert.toLong(newSketchLog.get("sketch_sessions")); - packets += Convert.toLong(newSketchLog.get("sketch_packets")); - bytes += Convert.toLong(newSketchLog.get("sketch_bytes")); long connStartTimetime = Convert.toLong(newSketchLog.get("sketch_start_time")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? connStartTimetime : endTime; + if(connStartTimetime>0){ + sessions += Convert.toLong(newSketchLog.get("sketch_sessions")); + packets += Convert.toLong(newSketchLog.get("sketch_packets")); + bytes += Convert.toLong(newSketchLog.get("sketch_bytes")); + startTime = connStartTimetime < startTime ? connStartTimetime : startTime; + endTime = connStartTimetime > endTime ? connStartTimetime : endTime; + } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); } catch (Exception e) { diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java index 4fd104d..fa66f37 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java @@ -7,6 +7,7 @@ import cn.hutool.log.LogFactory; import com.google.common.base.Joiner; import com.zdjizhi.enums.DnsType; import com.zdjizhi.utils.json.TypeUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.MapFunction; import java.util.List; @@ -60,19 +61,19 @@ public class DnsMapFunction implements MapFunction, Map, Map log : elements) { - sessions++; long logStartTime = Convert.toLong(log.get("start_time")); - startTime = logStartTime < startTime ? logStartTime : startTime; - endTime = logStartTime > endTime ? logStartTime : endTime; + if (logStartTime > 0) { + sessions++; + startTime = logStartTime < startTime ? logStartTime : startTime; + endTime = logStartTime > endTime ? logStartTime : endTime; + } } Map newDns = new LinkedHashMap<>(); newDns.put("start_time", startTime); diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 1a075cc..ab577ce 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -41,14 +41,14 @@ public class LogFlowWriteTopology { //connection DataStream> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION)) .setParallelism(SOURCE_PARALLELISM) - .filter(Objects::nonNull) + .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("conn_start_time"))>0) .map(new ConnTimeMapFunction()) .setParallelism(SOURCE_PARALLELISM) .name(SOURCE_KAFKA_TOPIC_CONNECTION); //sketch DataStream> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH)) - .filter(Objects::nonNull) + .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("sketch_start_time"))>0) .map(new SketchTimeMapFunction()) .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) .name(SOURCE_KAFKA_TOPIC_SKETCH); @@ -75,7 +75,7 @@ public class LogFlowWriteTopology { DataStream> connTransformStream = connSource .assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000)) + .withTimestampAssigner((event, timestamp) -> {return Convert.toLong(event.get("conn_start_time")) * 1000;})) .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) .keyBy(new IpKeysSelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) @@ -119,7 +119,7 @@ public class LogFlowWriteTopology { } else if (FlowWriteConfig.LOG_TYPE == 2) { DataStream> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS)) .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) - .filter(Objects::nonNull) + .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("capture_time"))>0) .map(new DnsMapFunction()) .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS); -- cgit v1.2.3