Diffstat (limited to 'src/main/java')
 src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java   | 13
 src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java | 12
 src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java               | 12
 src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java   |  8
 src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java        |  8
 5 files changed, 29 insertions(+), 24 deletions(-)
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 6fa11d6..40afc63 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -52,13 +52,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
- sessions++;
- packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts"));
- bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes"));
long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
- startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
- endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
-
+ if(connStartTimetime>0){
+ sessions++;
+ packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts"));
+ bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes"));
+ startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+ endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ }
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
} catch (Exception e) {
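
The ConnProcessFunction hunk stops elements without a positive conn_start_time from being aggregated at all: previously every element incremented sessions, and a zero or missing timestamp could drag startTime/endTime around. Below is a minimal sketch of the guarded aggregation, assuming Hutool's Convert/DateUtil and Flink's Tuple5 as imported by the class; the Math.min/Math.max calls and the explicit 0L defaults are editorial substitutions, not the committed code.

import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import org.apache.flink.api.java.tuple.Tuple5;
import java.util.Map;

final class ConnAggregationSketch {
    // Aggregate one window of connection logs, ignoring records whose
    // conn_start_time is missing or non-positive.
    static Tuple5<Long, Long, Long, Long, Long> aggregate(Iterable<Map<String, Object>> elements,
                                                          long logAggregateDuration) {
        long sessions = 0L, packets = 0L, bytes = 0L;
        long startTime = DateUtil.currentSeconds();
        long endTime = DateUtil.currentSeconds();
        for (Map<String, Object> log : elements) {
            // A default of 0L marks a missing or unparseable field as invalid instead of throwing.
            long connStartTime = Convert.toLong(log.get("conn_start_time"), 0L);
            if (connStartTime > 0) {
                sessions++;
                packets += Convert.toLong(log.get("total_cs_pkts"), 0L)
                        + Convert.toLong(log.get("total_sc_pkts"), 0L);
                bytes += Convert.toLong(log.get("total_cs_bytes"), 0L)
                        + Convert.toLong(log.get("total_sc_bytes"), 0L);
                startTime = Math.min(startTime, connStartTime); // earliest valid start
                endTime = Math.max(endTime, connStartTime);     // latest valid start
            }
        }
        return Tuple5.of(startTime, endTime + logAggregateDuration, sessions, packets, bytes);
    }
}
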
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index aac8428..8862da6 100644
--- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -66,12 +66,14 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
- sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));
- packets += Convert.toLong(newSketchLog.get("sketch_packets"));
- bytes += Convert.toLong(newSketchLog.get("sketch_bytes"));
long connStartTimetime = Convert.toLong(newSketchLog.get("sketch_start_time"));
- startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
- endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ if(connStartTimetime>0){
+ sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));
+ packets += Convert.toLong(newSketchLog.get("sketch_packets"));
+ bytes += Convert.toLong(newSketchLog.get("sketch_bytes"));
+ startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+ endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ }
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
} catch (Exception e) {
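
The SketchProcessFunction hunk applies the same guard to the pre-aggregated sketch records, keyed on sketch_start_time instead of conn_start_time. If the check keeps spreading to more process functions, a small shared predicate could centralize it; the helper below is purely illustrative and not part of the commit.

import cn.hutool.core.convert.Convert;
import java.util.Map;

final class StartTimeGuard {
    // True only when the given field is present and converts to a positive number.
    // The 0L default keeps a missing field from causing an unboxing NullPointerException.
    static boolean hasValidStartTime(Map<String, Object> log, String field) {
        return Convert.toLong(log.get(field), 0L) > 0;
    }
}

With such a helper, the added branch in both window functions would read if (StartTimeGuard.hasValidStartTime(log, "sketch_start_time")) { ... } instead of repeating the inline comparison.
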
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
index 4fd104d..fa66f37 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
@@ -7,6 +7,7 @@ import cn.hutool.log.LogFactory;
import com.google.common.base.Joiner;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.utils.json.TypeUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.List;
@@ -60,19 +61,19 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
rawLog.put("response", JSONUtil.toJsonStr(response));
//Get the record type; values of the same type are merged with "," as the separator and the count is incremented by 1
- rawLog.put("dns_a", dnsA);
+ rawLog.put("dns_a", StringUtils.removeStart(dnsA, ","));
rawLog.put("dns_a_num", dnsANum);
- rawLog.put("dns_aaaa", dnsAAAA);
+ rawLog.put("dns_aaaa", StringUtils.removeStart(dnsAAAA, ","));
rawLog.put("dns_aaaa_num", dnsAAAANum);
- rawLog.put("dns_cname", dnsCNAME);
+ rawLog.put("dns_cname", StringUtils.removeStart(dnsCNAME, ","));
rawLog.put("dns_cname_num", dnsCNAMENum);
- rawLog.put("dns_ns", dnsNs);
+ rawLog.put("dns_ns", StringUtils.removeStart(dnsNs, ","));
rawLog.put("dns_ns_num", dnsNsNum);
- rawLog.put("dns_mx", dnsMx);
+ rawLog.put("dns_mx", StringUtils.removeStart(dnsMx, ","));
rawLog.put("dns_mx_num", dnsMxNum);
}
} catch (Exception e) {
@@ -81,5 +82,4 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
return rawLog;
}
-
}
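
In DnsMapFunction, the per-type answer strings (dns_a, dns_aaaa, dns_cname, dns_ns, dns_mx) are accumulated with "," as the separator, which evidently leaves a leading comma on the result; the new StringUtils.removeStart calls strip that first separator before the values are written back into the log. A small illustration with a hypothetical input (the accumulation itself is outside this hunk):

import org.apache.commons.lang3.StringUtils;

public class RemoveLeadingCommaExample {
    public static void main(String[] args) {
        String dnsA = ",1.2.3.4,5.6.7.8";                        // accumulator seeded with a separator
        System.out.println(StringUtils.removeStart(dnsA, ","));  // 1.2.3.4,5.6.7.8
        System.out.println(StringUtils.removeStart("", ","));    // removeStart is empty/null safe
    }
}

An alternative would be to collect the values into a List and join them once with the Guava Joiner the class already imports, which avoids producing the leading separator in the first place.
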
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
index 03a0c41..894867f 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
@@ -38,10 +38,12 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
for (Map<String, Object> log : elements) {
- sessions++;
long logStartTime = Convert.toLong(log.get("start_time"));
- startTime = logStartTime < startTime ? logStartTime : startTime;
- endTime = logStartTime > endTime ? logStartTime : endTime;
+ if (logStartTime > 0) {
+ sessions++;
+ startTime = logStartTime < startTime ? logStartTime : startTime;
+ endTime = logStartTime > endTime ? logStartTime : endTime;
+ }
}
Map<String, Object> newDns = new LinkedHashMap<>();
newDns.put("start_time", startTime);
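
DnsRelationProcessFunction gets the same treatment for start_time when the per-window DNS relation record is built. One consequence worth noting: if every element in a window lacks a valid start_time, sessions stays 0 and both bounds fall back to the processing-time defaults, so the emitted map describes an empty aggregate. A hypothetical downstream guard (not in the commit, and assuming the sessions counter ends up in the output map under a "sessions" key) could drop those records:

// Hypothetical: filter out aggregates that contain no valid sessions.
dnsRelationStream.filter(dns -> Convert.toLong(dns.get("sessions"), 0L) > 0);
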
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 1a075cc..ab577ce 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -41,14 +41,14 @@ public class LogFlowWriteTopology {
//connection
DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
.setParallelism(SOURCE_PARALLELISM)
- .filter(Objects::nonNull)
+ .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("conn_start_time"))>0)
.map(new ConnTimeMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
- .filter(Objects::nonNull)
+ .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("sketch_start_time"))>0)
.map(new SketchTimeMapFunction())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_SKETCH);
@@ -75,7 +75,7 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> connTransformStream = connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
- .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
+ .withTimestampAssigner((event, timestamp) -> {return Convert.toLong(event.get("conn_start_time")) * 1000;}))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -119,7 +119,7 @@ public class LogFlowWriteTopology {
} else if (FlowWriteConfig.LOG_TYPE == 2) {
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
- .filter(Objects::nonNull)
+ .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("capture_time"))>0)
.map(new DnsMapFunction())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
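
In LogFlowWriteTopology, the three source filters now reject records whose conn_start_time, sketch_start_time, or capture_time is missing or non-positive before the map functions run, instead of only checking for null records. One caution, assuming Convert is Hutool's converter: the single-argument Convert.toLong returns null when the value cannot be converted, so a record that is non-null but lacks the field would make the > 0 comparison throw a NullPointerException inside the filter. A defensive variant, shown here as an editorial sketch rather than the committed code, passes an explicit default and names the field once:

import cn.hutool.core.convert.Convert;
import org.apache.flink.api.common.functions.FilterFunction;
import java.util.Map;
import java.util.Objects;

// Reusable filter: keep a record only if it is non-null and carries a positive
// value in the configured time field. The 0L default avoids unboxing a null Long.
public class PositiveTimeFilter implements FilterFunction<Map<String, Object>> {
    private final String timeField;

    public PositiveTimeFilter(String timeField) {
        this.timeField = timeField;
    }

    @Override
    public boolean filter(Map<String, Object> record) {
        return Objects.nonNull(record) && Convert.toLong(record.get(timeField), 0L) > 0;
    }
}

The three inline lambdas would then become .filter(new PositiveTimeFilter("conn_start_time")), .filter(new PositiveTimeFilter("sketch_start_time")), and .filter(new PositiveTimeFilter("capture_time")), which also gives each filter a stable, descriptive operator for the Flink UI.
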