Diffstat (limited to 'src/main/java')
5 files changed, 29 insertions, 24 deletions
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 6fa11d6..40afc63 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -52,13 +52,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
         long endTime = DateUtil.currentSeconds();
         try {
             for (Map<String, Object> newSketchLog : elements) {
-                sessions++;
-                packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts"));
-                bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes"));
                 long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
-                startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
-                endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
-
+                if(connStartTimetime>0){
+                    sessions++;
+                    packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts"));
+                    bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes"));
+                    startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+                    endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+                }
             }
             return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
         } catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index aac8428..8862da6 100644
--- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -66,12 +66,14 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
         long endTime = DateUtil.currentSeconds();
         try {
             for (Map<String, Object> newSketchLog : elements) {
-                sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));
-                packets += Convert.toLong(newSketchLog.get("sketch_packets"));
-                bytes += Convert.toLong(newSketchLog.get("sketch_bytes"));
                 long connStartTimetime = Convert.toLong(newSketchLog.get("sketch_start_time"));
-                startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
-                endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+                if(connStartTimetime>0){
+                    sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));
+                    packets += Convert.toLong(newSketchLog.get("sketch_packets"));
+                    bytes += Convert.toLong(newSketchLog.get("sketch_bytes"));
+                    startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+                    endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+                }
             }
             return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
         } catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
index 4fd104d..fa66f37 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
@@ -7,6 +7,7 @@ import cn.hutool.log.LogFactory;
 import com.google.common.base.Joiner;
 import com.zdjizhi.enums.DnsType;
 import com.zdjizhi.utils.json.TypeUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 
 import java.util.List;
@@ -60,19 +61,19 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
             rawLog.put("response", JSONUtil.toJsonStr(response));
 
             //获取类型,相同类型合并用,拼接,并且计数加1
-            rawLog.put("dns_a", dnsA);
+            rawLog.put("dns_a", StringUtils.removeStart(dnsA, ","));
             rawLog.put("dns_a_num", dnsANum);
 
-            rawLog.put("dns_aaaa", dnsAAAA);
+            rawLog.put("dns_aaaa", StringUtils.removeStart(dnsAAAA, ","));
             rawLog.put("dns_aaaa_num", dnsAAAANum);
 
-            rawLog.put("dns_cname", dnsCNAME);
+            rawLog.put("dns_cname", StringUtils.removeStart(dnsCNAME, ","));
             rawLog.put("dns_cname_num", dnsCNAMENum);
 
-            rawLog.put("dns_ns", dnsNs);
+            rawLog.put("dns_ns", StringUtils.removeStart(dnsNs, ","));
             rawLog.put("dns_ns_num", dnsNsNum);
 
-            rawLog.put("dns_mx", dnsMx);
+            rawLog.put("dns_mx", StringUtils.removeStart(dnsMx, ","));
             rawLog.put("dns_mx_num", dnsMxNum);
         }
     } catch (Exception e) {
@@ -81,5 +82,4 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
 
         return rawLog;
     }
-
 }
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
index 03a0c41..894867f 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
@@ -38,10 +38,12 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String
         long startTime = DateUtil.currentSeconds();
         long endTime = DateUtil.currentSeconds();
         for (Map<String, Object> log : elements) {
-            sessions++;
             long logStartTime = Convert.toLong(log.get("start_time"));
-            startTime = logStartTime < startTime ? logStartTime : startTime;
-            endTime = logStartTime > endTime ? logStartTime : endTime;
+            if (logStartTime > 0) {
+                sessions++;
+                startTime = logStartTime < startTime ? logStartTime : startTime;
+                endTime = logStartTime > endTime ? logStartTime : endTime;
+            }
         }
         Map<String, Object> newDns = new LinkedHashMap<>();
         newDns.put("start_time", startTime);
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 1a075cc..ab577ce 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -41,14 +41,14 @@ public class LogFlowWriteTopology {
         //connection
         DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
                 .setParallelism(SOURCE_PARALLELISM)
-                .filter(Objects::nonNull)
+                .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("conn_start_time"))>0)
                 .map(new ConnTimeMapFunction())
                 .setParallelism(SOURCE_PARALLELISM)
                 .name(SOURCE_KAFKA_TOPIC_CONNECTION);
 
         //sketch
         DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
-                .filter(Objects::nonNull)
+                .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("sketch_start_time"))>0)
                 .map(new SketchTimeMapFunction())
                 .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                 .name(SOURCE_KAFKA_TOPIC_SKETCH);
@@ -75,7 +75,7 @@ public class LogFlowWriteTopology {
         DataStream<Map<String, Object>> connTransformStream = connSource
                 .assignTimestampsAndWatermarks(WatermarkStrategy
                         .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
-                        .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
+                        .withTimestampAssigner((event, timestamp) -> {return Convert.toLong(event.get("conn_start_time")) * 1000;}))
                 .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
                 .keyBy(new IpKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -119,7 +119,7 @@ public class LogFlowWriteTopology {
         } else if (FlowWriteConfig.LOG_TYPE == 2) {
             DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
                     .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
-                    .filter(Objects::nonNull)
+                    .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("capture_time"))>0)
                     .map(new DnsMapFunction())
                     .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                     .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
