author    zhanghongqing <[email protected]>    2022-08-25 15:52:28 +0800
committer zhanghongqing <[email protected]>    2022-08-25 15:52:28 +0800
commit    812fb82c95417a1d3cf7f068c3c07860230f36cd (patch)
tree      25251400a422a53cb67bb50a1cbab6d0f174da5a /src/main
parent    065db72593172bfe23aa2f48877ca0f7cc5feb7d (diff)
Code optimization: handle malformed byte counts and scientific-notation values
Diffstat (limited to 'src/main')
-rw-r--r--  src/main/java/com/zdjizhi/enums/LogMetadata.java                      7
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService.java          3
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java       162
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java    23
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java  25
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java     4
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java                4
7 files changed, 202 insertions(+), 26 deletions(-)
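Per the commit message, the change swaps hutool's Convert.toLong for fastjson's TypeUtils.castToLong so that counters arriving as floating-point or scientific-notation JSON numbers still cast cleanly, and it guards the running sums against malformed values. A minimal sketch of that guard pattern (the sanitize helper and sample inputs are hypothetical, not part of the commit):

import com.alibaba.fastjson.util.TypeUtils;

public class ByteCountGuardSketch {

    // Hypothetical helper mirroring the commit's pattern: cast the raw field
    // to Long, then treat null or out-of-range values as contributing 0.
    static long sanitize(Object raw) {
        Long v = TypeUtils.castToLong(raw);
        return (v != null && v >= 0 && v < Long.MAX_VALUE) ? v : 0L;
    }

    public static void main(String[] args) {
        System.out.println(sanitize(12345L));   // 12345
        System.out.println(sanitize(1.2345E4)); // 12345 (scientific notation arrives as a Double)
        System.out.println(sanitize(null));     // 0 (missing field)
    }
}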
diff --git a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java
index 7c501b5..576b846 100644
--- a/src/main/java/com/zdjizhi/enums/LogMetadata.java
+++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java
@@ -18,7 +18,12 @@ public enum LogMetadata {
* Log name, table name, fields
* */
- CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}),
+ CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{
+ "cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status",
+ "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy",
+ "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent",
+ "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num",
+ "imei", "imsi"}),
CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}),
CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}),
DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}),
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index 3038914..c61fdaa 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -1,6 +1,7 @@
package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
+import com.alibaba.fastjson.util.TypeUtils;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.etl.dns.SketchTimeMapFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -85,7 +86,7 @@ public class ConnLogService {
private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
- .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
+ .withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction());
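One caveat on the new timestamp assigner: TypeUtils.castToLong returns a boxed Long and yields null when the field is absent, so the multiplication would throw a NullPointerException on records missing sketch_start_time. A null-safe variant might look like the sketch below (hypothetical, not part of this commit; the upstream filter on sketch_start_time > 0 may already make the null case unreachable):

import java.util.Map;

import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;

// Hypothetical null-safe assigner equivalent to the lambda in the diff.
public class SketchTimestampAssigner implements SerializableTimestampAssigner<Map<String, Object>> {
    @Override
    public long extractTimestamp(Map<String, Object> event, long recordTimestamp) {
        Long seconds = TypeUtils.castToLong(event.get("sketch_start_time"));
        return seconds == null ? 0L : seconds * 1000L; // seconds -> milliseconds
    }
}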
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
new file mode 100644
index 0000000..e469ac1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
@@ -0,0 +1,162 @@
+//package com.zdjizhi.etl.connection;
+//
+//import cn.hutool.core.convert.Convert;
+//import cn.hutool.core.util.StrUtil;
+//import com.zdjizhi.etl.LogService;
+//import com.zdjizhi.etl.dns.SketchTimeMapFunction;
+//import com.zdjizhi.utils.kafka.KafkaConsumer;
+//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+//import org.apache.flink.api.common.functions.MapFunction;
+//import org.apache.flink.api.java.utils.ParameterTool;
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+//import org.apache.flink.streaming.api.windowing.time.Time;
+//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
+//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
+//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
+//
+//import java.time.Duration;
+//import java.util.*;
+//
+//import static com.zdjizhi.common.FlowWriteConfig.*;
+//
+//
+//public class ConnLogService2 {
+//
+// public static String convertToCsv(Map<String, Object> map) {
+// List<Object> list = new ArrayList<>();
+// for (Map.Entry<String, Object> m : map.entrySet()) {
+// if (m.getValue() instanceof String) {
+// list.add("'" + m.getValue() + "'");
+// } else {
+// list.add(m.getValue());
+// }
+// }
+// // Wrap the joined values directly; adding "(" and ")" as list elements
+// // would emit stray commas: "(,v1,v2,)".
+// return "(" + StrUtil.join(",", list) + ")";
+// }
+//
+// public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
+// //connection
+// DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
+// //sketch
+// DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
+//
+// // Write to the CK sink in batches
+// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
+//
+// Map<String, String> globalParameters = new HashMap<>();
+// // ClickHouse cluster properties
+// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/");
+// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME);
+// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN);
+//
+// // sink common
+// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
+// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
+// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
+// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
+// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
+// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
+//
+// // set global parameters
+// ParameterTool parameters = ParameterTool.fromMap(globalParameters);
+// env.getConfig().setGlobalJobParameters(parameters);
+//
+// // Transform step
+// DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() {
+// @Override
+// public String map(Map<String, Object> data) throws Exception {
+// String s = convertToCsv(data);
+// System.err.println(s); // debug output
+// return s; // reuse the row built above instead of converting twice
+// }
+// });
+//
+//
+// // create props for sink
+// Properties props = new Properties();
+// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH);
+// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH);
+// ClickHouseSink sink = new ClickHouseSink(props);
+// dataStream.addSink(sink);
+// dataStream.print();
+//
+//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
+//
+// //transform
+// DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
+//
+// // Write to the CK connection-relation table
+// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
+//
+// DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
+//
+// // Union the connection stream with the connection-sketch stream
+// DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
+//
+// // Write to ArangoDB
+// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+//
+// }
+//
+// /**
+// * Consume the raw connection log source from Kafka
+// *
+// * @param source
+// * @return
+// */
+// private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
+//
+// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
+//
+// DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+// .setParallelism(SOURCE_PARALLELISM)
+// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
+// .setParallelism(SOURCE_PARALLELISM)
+// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
+// .setParallelism(SOURCE_PARALLELISM)
+// .name(source)
+// .setParallelism(SOURCE_PARALLELISM);
+// return sourceStream;
+// }
+//
+// private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
+// DataStream<Map<String, Object>> connTransformStream = connSource
+// .assignTimestampsAndWatermarks(WatermarkStrategy
+// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
+// .withTimestampAssigner((event, timestamp) -> {
+// return Convert.toLong(event.get("conn_start_time")) * 1000;
+// }))
+// .setParallelism(TRANSFORM_PARALLELISM)
+// .keyBy(new IpKeysSelector())
+// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+// .process(new ConnProcessFunction())
+// .setParallelism(TRANSFORM_PARALLELISM);
+// return connTransformStream;
+// }
+//
+// private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
+// DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
+// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
+// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
+// .keyBy(new IpKeysSelector())
+// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+// .process(new SketchProcessFunction());
+// return sketchTransformStream;
+// }
+//
+// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
+// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
+// .keyBy(new IpKeysSelector())
+// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+// .process(new Ip2IpGraphProcessFunction())
+// .setParallelism(TRANSFORM_PARALLELISM);
+// return ip2ipGraph;
+// }
+//
+//}
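A side note on the commented-out convertToCsv above: even with the parentheses handled correctly, the naive quoting breaks as soon as a string value contains a single quote, which fields like http_url or http_user_agent easily can. A hedged escaping sketch (hypothetical; backslash-escaping follows ClickHouse's string-literal rules):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import cn.hutool.core.util.StrUtil;

public class ValuesTupleSketch {

    // Hypothetical variant of convertToCsv that escapes embedded quotes and backslashes.
    static String toValuesTuple(Map<String, Object> row) {
        List<Object> parts = new ArrayList<>();
        for (Object v : row.values()) {
            if (v instanceof String) {
                String s = ((String) v).replace("\\", "\\\\").replace("'", "\\'");
                parts.add("'" + s + "'");
            } else {
                parts.add(v);
            }
        }
        return "(" + StrUtil.join(",", parts) + ")";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("src_ip", "10.0.0.1");
        row.put("http_user_agent", "it's a test");
        row.put("total_cs_bytes", 42L);
        System.out.println(toValuesTuple(row)); // ('10.0.0.1','it\'s a test',42)
    }
}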
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 40afc63..bdad72a 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -4,13 +4,14 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -28,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
try {
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
if (values != null) {
- Map<String, Object> result = new LinkedHashMap<>();
+ Map<String, Object> result = new HashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -45,18 +46,24 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
}
private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
- long sessions = 0;
- long packets = 0;
- long bytes = 0;
+ long sessions = 0L;
+ long packets = 0L;
+ long bytes = 0L;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
- if(connStartTimetime>0){
+ if (connStartTimetime > 0) {
sessions++;
- packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts"));
- bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes"));
+ Long totalCsPkts = TypeUtils.castToLong(newSketchLog.get("total_cs_pkts"));
+ Long totalScPkts = TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
+ packets = packets + (totalCsPkts != null && totalCsPkts < Long.MAX_VALUE ? totalCsPkts : 0L) + (totalScPkts != null && totalScPkts < Long.MAX_VALUE ? totalScPkts : 0L);
+
+ Long totalCsBytes = TypeUtils.castToLong(newSketchLog.get("total_cs_bytes"));
+ Long totalScBytes = TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));
+ bytes = bytes + (totalCsBytes != null && totalCsBytes < Long.MAX_VALUE ? totalCsBytes : 0L) + (totalScBytes != null && totalScBytes < Long.MAX_VALUE ? totalScBytes : 0L);
+
startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
}
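The parentheses around each ternary in connAggregate are load-bearing: Java's conditional operator binds looser than `+` and `<`, so without them an expression such as `packets + totalCsPkts < Long.MAX_VALUE ? ... : ...` parses as `(packets + totalCsPkts) < Long.MAX_VALUE ? ... : ...` and silently computes the wrong sum. A minimal demonstration of the guarded accumulation (helper and sample values are hypothetical):

public class GuardedAccumulationDemo {

    // Hypothetical helper mirroring connAggregate's guard: null or
    // sentinel-valued counters contribute nothing to the running sum.
    static long addGuarded(long acc, Long delta) {
        return acc + (delta != null && delta < Long.MAX_VALUE ? delta : 0L);
    }

    public static void main(String[] args) {
        long packets = 0L;
        packets = addGuarded(packets, 10L);            // 10
        packets = addGuarded(packets, Long.MAX_VALUE); // still 10: sentinel ignored
        packets = addGuarded(packets, null);           // still 10: missing field ignored
        System.out.println(packets);
    }
}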
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index 8862da6..31957dc 100644
--- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -4,13 +4,14 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -41,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
try {
if (values != null) {
- Map<String, Object> result = new LinkedHashMap<>();
+ Map<String, Object> result = new HashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -59,20 +60,20 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
}
private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
- long sessions = 0;
- long packets = 0;
- long bytes = 0;
+ long sessions = 0L;
+ long packets = 0L;
+ long bytes = 0L;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
- long connStartTimetime = Convert.toLong(newSketchLog.get("sketch_start_time"));
- if(connStartTimetime>0){
- sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));
- packets += Convert.toLong(newSketchLog.get("sketch_packets"));
- bytes += Convert.toLong(newSketchLog.get("sketch_bytes"));
- startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
- endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ long connStartTime = Convert.toLong(newSketchLog.get("sketch_start_time"));
+ if (connStartTime > 0) {
+ Long sketchSessions = TypeUtils.castToLong(newSketchLog.get("sketch_sessions"));
+ Long sketchPackets = TypeUtils.castToLong(newSketchLog.get("sketch_packets"));
+ Long sketchBytes = TypeUtils.castToLong(newSketchLog.get("sketch_bytes"));
+ sessions += sketchSessions != null && sketchSessions < Long.MAX_VALUE ? sketchSessions : 0L;
+ packets += sketchPackets != null && sketchPackets < Long.MAX_VALUE ? sketchPackets : 0L;
+ bytes += sketchBytes != null && sketchBytes < Long.MAX_VALUE ? sketchBytes : 0L;
+ startTime = connStartTime < startTime ? connStartTime : startTime;
+ endTime = connStartTime > endTime ? connStartTime : endTime;
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
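connAggregate folds a minimum over the window's start times and a maximum over its end times, then pads the emitted end by LOG_AGGREGATE_DURATION. A simplified sketch of that fold (the production code seeds both bounds with the current time; the sample timestamps and duration value here are hypothetical):

import java.util.Arrays;
import java.util.List;

public class TimeRangeFoldSketch {

    static final long LOG_AGGREGATE_DURATION = 60L; // seconds; assumed value

    public static void main(String[] args) {
        List<Long> startTimes = Arrays.asList(1661400060L, 1661400000L, 1661400120L);
        long start = Long.MAX_VALUE;
        long end = 0L;
        for (long t : startTimes) {
            if (t > 0) {                 // skip malformed timestamps, as connAggregate does
                start = Math.min(start, t);
                end = Math.max(end, t);
            }
        }
        // The emitted end_time is padded by the aggregation duration.
        System.out.println(start + " -> " + (end + LOG_AGGREGATE_DURATION));
    }
}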
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
index 894867f..8744060 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
@@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -45,7 +45,7 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String
endTime = logStartTime > endTime ? logStartTime : endTime;
}
}
- Map<String, Object> newDns = new LinkedHashMap<>();
+ Map<String, Object> newDns = new HashMap<>();
newDns.put("start_time", startTime);
newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
newDns.put("record_type", keys.f0);
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index 1ab45f8..0558dae 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -65,7 +65,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
- log.info("开始写入ck数据 :{}", data.size());
+ log.debug("开始写入ck数据 :{}", data.size());
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
@@ -86,7 +86,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
connection.commit();
connection.setAutoCommit(autoCommit);
stopWatch.stop();
- log.info("总共花费时间 {}", stopWatch.getTime());
+ log.debug("总共花费时间 {}", stopWatch.getTime());
} catch (Exception ex) {
log.error("ClickhouseSink插入报错", ex);
}
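For reference, the surrounding sink wraps each batch in a manual transaction: it disables auto-commit, adds every row to a JDBC batch, commits once, and restores the previous mode. A self-contained sketch of that pattern (the SQL, field list, and method shape are hypothetical; the real sink builds them from LogMetadata):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;

public class BatchWriteSketch {

    // Hypothetical distillation of ClickhouseSink's write path.
    static void writeBatch(Connection connection, List<Map<String, Object>> data,
                           String insertSql, String[] fields) throws Exception {
        boolean autoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false); // batch everything into one transaction
        try (PreparedStatement ps = connection.prepareStatement(insertSql)) {
            for (Map<String, Object> row : data) {
                for (int i = 0; i < fields.length; i++) {
                    ps.setObject(i + 1, row.get(fields[i]));
                }
                ps.addBatch();
            }
            ps.executeBatch();
            connection.commit();
        } finally {
            connection.setAutoCommit(autoCommit); // restore the caller's mode
        }
    }
}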