author     zhanghongqing <[email protected]>  2022-08-19 17:34:37 +0800
committer  zhanghongqing <[email protected]>  2022-08-19 17:34:37 +0800
commit     b30f82f5881380f7c9445d59e16c4654d6473b37 (patch)
tree       16265679946dbce1cfff945bbd2aa0e485c83ce6 /src/main/java
parent     ba64fe818729e0a1190d2e627e8cf5bb89140e81 (diff)
Code optimization: extract common methods and optimize the ClickhouseSink write path
Diffstat (limited to 'src/main/java')
-rw-r--r--  src/main/java/com/zdjizhi/enums/LogMetadata.java                          40
-rw-r--r--  src/main/java/com/zdjizhi/etl/LogFormat.java                              19
-rw-r--r--  src/main/java/com/zdjizhi/etl/LogService.java                             38
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService.java             104
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsLogService.java                      76
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java             171
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKUtils.java                           54
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java                   101
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java         17
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java       23
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java   23
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/CityHash.java                    180
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormMap.java                120
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java            122
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFunction.java               230
15 files changed, 342 insertions, 976 deletions
diff --git a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java
index d3a7f49..2949085 100644
--- a/src/main/java/com/zdjizhi/enums/LogMetadata.java
+++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java
@@ -1,6 +1,11 @@
package com.zdjizhi.enums;
import cn.hutool.core.util.EnumUtil;
+import cn.hutool.core.util.StrUtil;
+
+import java.util.Arrays;
+
+import static com.zdjizhi.common.FlowWriteConfig.CK_DATABASE;
/**
* @description: \
@@ -10,23 +15,27 @@ import cn.hutool.core.util.EnumUtil;
public enum LogMetadata {
/*
- * Log name (topic), table name
+ * Log name, table name, fields
* */
- CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log"),
- CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log"),
- DNS_RECORD_LOG("dns_record_log", "dns_record_log"),
+ CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}),
+ CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}),
+ CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}),
+ DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}),
+ DNS_RELATION_LOG("dns_relation_log", "dns_relation_log_local", new String[]{"start_time", "end_time", "record_type", "qname", "record", "sessions"}),
;
private String source;
private String sink;
+ private String[] fields;
LogMetadata() {
}
- LogMetadata(String source, String sink) {
+ LogMetadata(String source, String sink, String[] fields) {
this.source = source;
this.sink = sink;
+ this.fields = fields;
}
public String getSource() {
@@ -37,10 +46,31 @@ public enum LogMetadata {
return sink;
}
+ public String[] getFields() {
+ return fields;
+ }
+
public static String getLogSink(String source) {
LogMetadata logMetadata = EnumUtil.fromString(LogMetadata.class, source);
return logMetadata.getSink();
+ }
+ public static String[] getLogFields(String tableName) {
+ LogMetadata[] values = LogMetadata.values();
+ for (LogMetadata value : values) {
+ if (value.sink.equals(tableName)) {
+ return value.fields;
+ }
+ }
+ return null;
}
+ public static String preparedSql(String tableName) {
+ String[] fields = LogMetadata.getLogFields(tableName);
+ String[] placeholders = new String[fields.length];
+ Arrays.fill(placeholders, "?");
+
+ return StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
+ "(", StrUtil.join(",", fields), ") VALUES (", StrUtil.join(",", placeholders), ")");
+ }
}
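
For reference, a minimal sketch of the SQL string the new preparedSql helper builds; the database name comes from CK_DATABASE in FlowWriteConfig, and "flow" below is only a hypothetical value:

    // Hypothetical usage of LogMetadata.preparedSql; "flow" stands in for CK_DATABASE.
    String sql = LogMetadata.preparedSql("connection_relation_log_local");
    // yields: INSERT INTO flow.connection_relation_log_local(start_time,end_time,src_ip,dst_ip,sessions,packets,bytes) VALUES (?,?,?,?,?,?,?)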
diff --git a/src/main/java/com/zdjizhi/etl/LogFormat.java b/src/main/java/com/zdjizhi/etl/LogFormat.java
deleted file mode 100644
index c0edaa8..0000000
--- a/src/main/java/com/zdjizhi/etl/LogFormat.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.etl;
-
-import com.zdjizhi.utils.json.TypeUtils;
-
-import java.util.Map;
-
-public class LogFormat {
-
- public static Map<String, Object> connTime(Map<String, Object> value) {
- value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
- return value;
- }
-
-
- public static Map<String, Object> sketchTime(Map<String, Object> value) {
- value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time")));
- return value;
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java
new file mode 100644
index 0000000..56989b1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/LogService.java
@@ -0,0 +1,38 @@
+package com.zdjizhi.etl;
+
+import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
+import com.zdjizhi.utils.arangodb.ArangoDBSink;
+import com.zdjizhi.utils.ck.ClickhouseSink;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public interface LogService {
+
+ public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
+
+ sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+
+ }
+
+ public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
+ sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new ArangodbBatchIPWindow())
+ .addSink(new ArangoDBSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
new file mode 100644
index 0000000..f413cc3
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -0,0 +1,104 @@
+package com.zdjizhi.etl.connection;
+
+import cn.hutool.core.convert.Convert;
+import com.zdjizhi.etl.LogService;
+import com.zdjizhi.etl.dns.SketchTimeMapFunction;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+
+public class ConnLogService {
+
+ public static void connLogStream(StreamExecutionEnvironment env) throws Exception{
+ //connection
+ DataStream<Map<String, Object>> connSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
+ //sketch
+ DataStream<Map<String, Object>> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
+
+ // Write to the ClickHouse sink, batched
+ LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
+
+ LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
+
+ //transform
+ DataStream<Map<String, Object>> connTransformStream = ConnLogService.getConnTransformStream(connSource);
+
+ // Write to the ClickHouse connection relation table
+ LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
+
+ DataStream<Map<String, Object>> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource);
+
+ // Union the connection and connection-sketch streams
+ DataStream<Map<String, Object>> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream);
+
+ // Write to ArangoDB
+ LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+
+ }
+
+ /**
+ * Consume the connection raw-log source from Kafka
+ *
+ * @param source
+ * @return
+ */
+ private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
+
+ String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
+
+ DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+ .setParallelism(SOURCE_PARALLELISM)
+ .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
+ .setParallelism(SOURCE_PARALLELISM)
+ .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
+ .setParallelism(SOURCE_PARALLELISM)
+ .name(source)
+ .setParallelism(SOURCE_PARALLELISM);
+ return sourceStream;
+ }
+
+ private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
+ DataStream<Map<String, Object>> connTransformStream = connSource
+ .assignTimestampsAndWatermarks(WatermarkStrategy
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
+ .withTimestampAssigner((event, timestamp) -> {
+ return Convert.toLong(event.get("conn_start_time")) * 1000;
+ }))
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .keyBy(new IpKeysSelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .process(new ConnProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM);
+ return connTransformStream;
+ }
+
+ private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
+ DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
+ .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
+ .keyBy(new IpKeysSelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .process(new SketchProcessFunction());
+ return sketchTransformStream;
+ }
+
+ private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
+ DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
+ .keyBy(new IpKeysSelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+ .process(new Ip2IpGraphProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM);
+ return ip2ipGraph;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
new file mode 100644
index 0000000..5e39df4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -0,0 +1,76 @@
+package com.zdjizhi.etl.dns;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.util.ObjectUtil;
+import com.zdjizhi.enums.DnsType;
+import com.zdjizhi.etl.LogService;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public class DnsLogService {
+
+ public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{
+
+ DataStream<Map<String, Object>> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
+
+ // Insert raw DNS logs into ClickHouse
+ LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
+
+ DataStream<Map<String, Object>> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource);
+
+ // Insert the split DNS relation logs into ClickHouse
+ LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
+
+ // Insert into ArangoDB, grouped by record_type into different collections
+ DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull)
+ .keyBy(new DnsGraphKeysSelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+ .process(new DnsGraphProcessFunction())
+ .setParallelism(SINK_PARALLELISM);
+
+ for (DnsType dnsEnum : DnsType.values()) {
+ DataStream<Map<String, Object>> dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+ .setParallelism(SINK_PARALLELISM);
+ LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink());
+ }
+
+ }
+
+ private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{
+
+ DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+ .setParallelism(SOURCE_PARALLELISM)
+ .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0)
+ .setParallelism(SOURCE_PARALLELISM)
+ .map(new DnsMapFunction())
+ .setParallelism(SOURCE_PARALLELISM)
+ .name(source);
+ return dnsSource;
+ }
+
+ private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception{
+ DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
+ .assignTimestampsAndWatermarks(WatermarkStrategy
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
+ .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .flatMap(new DnsSplitFlatMapFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .keyBy(new DnsGraphKeysSelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .process(new DnsRelationProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM);
+ return dnsTransform;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index ab577ce..f7b6b70 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -1,30 +1,13 @@
package com.zdjizhi.topology;
-import cn.hutool.core.convert.Convert;
-import cn.hutool.core.util.ObjectUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.etl.CKBatchWindow;
-import com.zdjizhi.etl.CountTriggerWithTimeout;
-import com.zdjizhi.etl.connection.*;
-import com.zdjizhi.etl.dns.*;
-import com.zdjizhi.utils.arangodb.ArangoDBSink;
-import com.zdjizhi.utils.ck.ClickhouseSink;
-import com.zdjizhi.utils.kafka.KafkaConsumer;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import com.zdjizhi.etl.connection.ConnLogService;
+import com.zdjizhi.etl.dns.DnsLogService;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import java.time.Duration;
-import java.util.Map;
-import java.util.Objects;
-
-import static com.zdjizhi.common.FlowWriteConfig.*;
+import static com.zdjizhi.common.FlowWriteConfig.BUFFER_TIMEOUT;
+import static com.zdjizhi.common.FlowWriteConfig.LOG_TYPE;
public class LogFlowWriteTopology {
private static final Log logger = LogFactory.get();
@@ -32,154 +15,18 @@ public class LogFlowWriteTopology {
public static void main(String[] args) {
try {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
//Maximum time between two outputs (in milliseconds)
- env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
-
+ env.setBufferTimeout(BUFFER_TIMEOUT);
//1 connection,2 dns
- if (FlowWriteConfig.LOG_TYPE == 1) {
- //connection
- DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
- .setParallelism(SOURCE_PARALLELISM)
- .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("conn_start_time"))>0)
- .map(new ConnTimeMapFunction())
- .setParallelism(SOURCE_PARALLELISM)
- .name(SOURCE_KAFKA_TOPIC_CONNECTION);
-
- //sketch
- DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
- .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("sketch_start_time"))>0)
- .map(new SketchTimeMapFunction())
- .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
- .name(SOURCE_KAFKA_TOPIC_SKETCH);
-
- // Write to the ClickHouse sink, batched
- if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
- connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
-
- sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
-
- }
-
- //transform
- DataStream<Map<String, Object>> connTransformStream = connSource
- .assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
- .withTimestampAssigner((event, timestamp) -> {return Convert.toLong(event.get("conn_start_time")) * 1000;}))
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
- .keyBy(new IpKeysSelector())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
- .process(new ConnProcessFunction())
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
- .filter(x -> Objects.nonNull(x))
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
- connTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
-
- DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
- .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
- .keyBy(new IpKeysSelector())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
- .process(new SketchProcessFunction())
- .filter(Objects::nonNull)
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
- // Write to ArangoDB
- DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
- .keyBy(new IpKeysSelector())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
- .process(new Ip2IpGraphProcessFunction())
- .setParallelism(TRANSFORM_PARALLELISM)
- .filter(Objects::nonNull)
- .setParallelism(TRANSFORM_PARALLELISM);
-
- // Write to ArangoDB
- ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new ArangodbBatchIPWindow())
- .addSink(new ArangoDBSink(R_VISIT_IP2IP))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM).name(R_VISIT_IP2IP);
-
- } else if (FlowWriteConfig.LOG_TYPE == 2) {
- DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
- .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
- .filter(x->Objects.nonNull(x) && Convert.toLong(x.get("capture_time"))>0)
- .map(new DnsMapFunction())
- .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
- .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
-
- // Insert raw DNS logs into ClickHouse
- if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
- dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
- }
-
- DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
- .assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
- .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
- .setParallelism(TRANSFORM_PARALLELISM)
- .flatMap(new DnsSplitFlatMapFunction())
- .setParallelism(TRANSFORM_PARALLELISM)
- .keyBy(new DnsGraphKeysSelector())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
- .process(new DnsRelationProcessFunction())
- .setParallelism(TRANSFORM_PARALLELISM)
- .filter(Objects::nonNull)
- .setParallelism(TRANSFORM_PARALLELISM);
-
- // Insert the split DNS relation logs into ClickHouse
- dnsTransform.filter(Objects::nonNull).windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
- .setParallelism(SINK_PARALLELISM)
- .name("CKSink");
-
- // Insert into ArangoDB, grouped by record_type into different collections
- DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull).keyBy(new DnsGraphKeysSelector())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
- .process(new DnsGraphProcessFunction())
- .setParallelism(SINK_PARALLELISM);
-
- for (DnsType dnsEnum : DnsType.values()) {
- dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
- .setParallelism(SINK_PARALLELISM)
- .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new ArangodbBatchDnsWindow())
- .addSink(new ArangoDBSink(dnsEnum.getSink()))
- .setParallelism(SINK_PARALLELISM)
- .name("ArangodbSink");
- }
-
+ if (LOG_TYPE == 1) {
+ ConnLogService.connLogStream(env);
+ } else if (LOG_TYPE == 2) {
+ DnsLogService.dnsLogStream(env);
}
-
env.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is : {}", e);
}
-
}
}
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKUtils.java b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java
new file mode 100644
index 0000000..7ff48d6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java
@@ -0,0 +1,54 @@
+package com.zdjizhi.utils.ck;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public class CKUtils {
+
+ private static final Log log = LogFactory.get();
+
+ private static Connection connection;
+
+ public static Connection getConnection() {
+
+ try {
+ ClickHouseProperties props = new ClickHouseProperties();
+ props.setDatabase(CK_DATABASE);
+ props.setUser(CK_USERNAME);
+ props.setPassword(CK_PIN);
+ props.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
+ props.setSocketTimeout(CK_SOCKET_TIMEOUT);
+ props.setMaxThreads(50);
+
+ BalancedClickhouseDataSource blDataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, props);
+ blDataSource.actualize();
+ blDataSource.scheduleActualization(10, TimeUnit.SECONDS);// enable periodic availability checks
+
+// HikariConfig conf = new HikariConfig();
+// conf.setDataSource(blDataSource);
+// conf.setMinimumIdle(1);
+// conf.setMaximumPoolSize(20);
+//
+// HikariDataSource hkDs = new HikariDataSource(conf);
+ connection = blDataSource.getConnection();
+ log.debug("get clickhouse connection success");
+ } catch (SQLException e) {
+ log.error("clickhouse connection error ,{}", e);
+ }
+ return connection;
+ }
+
+ public static void close(Connection connection) throws Exception {
+ IoUtil.close(connection);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index d58190d..75a3d7d 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -1,26 +1,18 @@
package com.zdjizhi.utils.ck;
-import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.IoUtil;
-import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.enums.LogMetadata;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import ru.yandex.clickhouse.BalancedClickhouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static com.zdjizhi.common.FlowWriteConfig.*;
public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
@@ -49,90 +41,49 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
@Override
public void open(Configuration parameters) throws Exception {
- try {
- ClickHouseProperties properties = new ClickHouseProperties();
-
- properties.setDatabase(CK_DATABASE);
- properties.setUser(CK_USERNAME);
- properties.setPassword(CK_PIN);
-// properties.setKeepAliveTimeout(5);
- properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
- properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
-
- BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
- dataSource.scheduleActualization(10, TimeUnit.SECONDS);// enable periodic availability checks
- connection = dataSource.getConnection();
- log.debug("get clickhouse connection success");
- } catch (SQLException e) {
- log.error("clickhouse connection error ,{}", e);
- }
-
+ connection = CKUtils.getConnection();
}
@Override
public void close() throws Exception {
IoUtil.close(preparedStatement);
- IoUtil.close(connection);
+ CKUtils.close(connection);
}
public void executeInsert(List<Map<String, Object>> data, String tableName) {
try {
- connection.setAutoCommit(false);
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ log.debug("开始写入ck数据 :{}", data.size());
- int count = 0;
- for (Map<String, Object> map : data) {
- List<String> keys = new LinkedList<>(map.keySet());
- preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
-
- List<Object> values = new LinkedList<>(map.values());
- for (int i = 1; i <= values.size(); i++) {
- Object val = values.get(i - 1);
- if (val instanceof Long) {
- preparedStatement.setLong((i), Convert.toLong(val));
- } else if (val instanceof Integer) {
- preparedStatement.setLong((i), Convert.toLong(val));
- } else if (val instanceof Boolean) {
- preparedStatement.setInt((i), Boolean.valueOf(StrUtil.toString(val)) ? 1 : 0);
- } else {
- preparedStatement.setString((i), StrUtil.toString(val));
- }
- }
- preparedStatement.addBatch();
- count++;
- // commit once every 10,000 rows
- if (count % CK_BATCH == 0) {
- preparedStatement.executeBatch();
- connection.commit();
- preparedStatement.clearBatch();
- count = 0;
- }
- }
- if (count > 0) {
- preparedStatement.executeBatch();
- connection.commit();
- }
+ boolean autoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ batch(data, tableName);
+ preparedStatement.executeBatch();
+ connection.commit();
+ connection.setAutoCommit(autoCommit);
+ stopWatch.stop();
+ log.debug("总共花费时间 {}", stopWatch.getTime());
} catch (Exception ex) {
log.error("ClickhouseSink插入报错", ex);
}
}
+ private void batch(List<Map<String, Object>> data, String tableName) throws SQLException {
+ String[] logFields = LogMetadata.getLogFields(tableName);
+ String sql = LogMetadata.preparedSql(tableName);
+ log.debug(sql);
+ preparedStatement = connection.prepareStatement(sql);
- public static String preparedSql(List<String> fields, String tableName) {
+ for (Map<String, Object> map : data) {
+ for (int i = 0; i < logFields.length; i++) {
+ preparedStatement.setObject(i + 1, map.get(logFields[i]));
+ }
+ preparedStatement.addBatch();
+ }
- String placeholders = fields.stream()
- .filter(Objects::nonNull)
- .map(f -> "?")
- .collect(Collectors.joining(", "));
- String columns = fields.stream()
- .filter(Objects::nonNull)
- .collect(Collectors.joining(", "));
- String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
- "(", columns, ") VALUES (", placeholders, ")");
- log.debug(sql);
- return sql;
}
-
} \ No newline at end of file
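
As a usage note, the refactored sink prepares one statement per windowed batch using the column order from LogMetadata.getLogFields, instead of re-preparing per row and committing every CK_BATCH rows. A minimal standalone sketch (hypothetical driver code, not part of the commit; in the job, Flink drives open/invoke/close and exception handling is omitted):

    // Hypothetical illustration of the batch write path; table name is an example only.
    ClickhouseSink sink = new ClickhouseSink("dns_relation_log_local");
    sink.open(new Configuration());          // obtains a connection via CKUtils
    Map<String, Object> row = new HashMap<>();
    row.put("start_time", 1660000000L);
    row.put("qname", "example.com");
    sink.executeInsert(Collections.singletonList(row), "dns_relation_log_local");
    sink.close();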
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
deleted file mode 100644
index de507ad..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class FilterNullFunction implements FilterFunction<String> {
- @Override
- public boolean filter(String message) {
- return StringUtil.isNotBlank(message);
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
deleted file mode 100644
index 810e4c8..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-
-import com.zdjizhi.utils.general.TransFormMap;
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.Map;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> {
-
- @Override
- @SuppressWarnings("unchecked")
- public String map(Map<String, Object> logs) {
- return TransFormMap.dealCommonMessage(logs);
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
deleted file mode 100644
index ccef850..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.utils.general.TransFormTypeMap;
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.Map;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> {
-
- @Override
- @SuppressWarnings("unchecked")
- public String map(Map<String, Object> logs) {
-
- return TransFormTypeMap.dealCommonMessage(logs);
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/general/CityHash.java b/src/main/java/com/zdjizhi/utils/general/CityHash.java
deleted file mode 100644
index 5de4785..0000000
--- a/src/main/java/com/zdjizhi/utils/general/CityHash.java
+++ /dev/null
@@ -1,180 +0,0 @@
-package com.zdjizhi.utils.general;
-
-
-
-
-/**
- * CityHash64 hashing of the logid
- * Not implemented in this release plan - TSG22.01
- *
- * @author qidaijie
- */
-@Deprecated
-public class CityHash {
-
- private static final long k0 = 0xc3a5c85c97cb3127L;
- private static final long k1 = 0xb492b66fbe98f273L;
- private static final long k2 = 0x9ae16a3b2f90404fL;
- private static final long k3 = 0xc949d7c7509e6557L;
- private static final long k5 = 0x9ddfea08eb382d69L;
-
- private CityHash() {}
-
- public static long CityHash64(byte[] s, int index, int len) {
- if (len <= 16 ) {
- return HashLen0to16(s, index, len);
- } else if (len > 16 && len <= 32) {
- return HashLen17to32(s, index, len);
- } else if (len > 32 && len <= 64) {
- return HashLen33to64(s, index, len);
- } else {
- long x = Fetch64(s, index);
- long y = Fetch64(s, index + len - 16) ^ k1;
- long z = Fetch64(s, index + len - 56) ^ k0;
- long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y);
- long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0);
- z += ShiftMix(v[1]) * k1;
- x = Rotate(z + x, 39) * k1;
- y = Rotate(y, 33) * k1;
-
- len = (len - 1) & ~63;
- do {
- x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1;
- y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1;
- x ^= w[1];
- y ^= v[0];
- z = Rotate(z ^ w[0], 33);
- v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]);
- w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y);
- long t = z;
- z = x;
- x = t;
- index += 64;
- len -= 64;
- } while (len != 0);
- return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z,
- HashLen16(v[1], w[1]) + x);
- }
- }
-
- private static long HashLen0to16(byte[] s, int index, int len) {
- if (len > 8) {
- long a = Fetch64(s, index);
- long b = Fetch64(s, index + len - 8);
- return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b;
- }
- if (len >= 4) {
- long a = Fetch32(s, index);
- return HashLen16(len + (a << 3), Fetch32(s, index + len - 4));
- }
- if (len > 0) {
- byte a = s[index];
- byte b = s[index + len >>> 1];
- byte c = s[index + len - 1];
- int y = (a) + (b << 8);
- int z = len + (c << 2);
- return ShiftMix(y * k2 ^ z * k3) * k2;
- }
- return k2;
- }
-
- private static long HashLen17to32(byte[] s, int index, int len) {
- long a = Fetch64(s, index) * k1;
- long b = Fetch64(s, index + 8);
- long c = Fetch64(s, index + len - 8) * k2;
- long d = Fetch64(s, index + len - 16) * k0;
- return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d,
- a + Rotate(b ^ k3, 20) - c + len);
- }
-
- private static long HashLen33to64(byte[] s, int index, int len) {
- long z = Fetch64(s, index + 24);
- long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0;
- long b = Rotate(a + z, 52);
- long c = Rotate(a, 37);
- a += Fetch64(s, index + 8);
- c += Rotate(a, 7);
- a += Fetch64(s, index + 16);
- long vf = a + z;
- long vs = b + Rotate(a, 31) + c;
- a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32);
- z = Fetch64(s, index + len - 8);
- b = Rotate(a + z, 52);
- c = Rotate(a, 37);
- a += Fetch64(s, index + len - 24);
- c += Rotate(a, 7);
- a += Fetch64(s, index + len - 16);
- long wf = a + z;
- long ws = b + Rotate(a, 31) + c;
- long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0);
- return ShiftMix(r * k0 + vs) * k2;
- }
-
- private static long Fetch64(byte[] p, int index) {
- return toLongLE(p,index);
- }
-
- private static long Fetch32(byte[] p, int index) {
- return toIntLE(p,index);
- }
- private static long[] WeakHashLen32WithSeeds(
- long w, long x, long y, long z, long a, long b) {
- a += w;
- b = Rotate(b + a + z, 21);
- long c = a;
- a += x;
- a += y;
- b += Rotate(a, 44);
- return new long[]{a + z, b + c};
- }
-
- private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) {
- return WeakHashLen32WithSeeds(Fetch64(s, index),
- Fetch64(s, index + 8),
- Fetch64(s, index + 16),
- Fetch64(s, index + 24),
- a,
- b);
- }
-
- private static long toLongLE(byte[] b, int i) {
- return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
- }
-
- private static long toIntLE(byte[] b, int i) {
- return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
- }
- private static long RotateByAtLeastOne(long val, int shift) {
- return (val >>> shift) | (val << (64 - shift));
- }
-
- private static long ShiftMix(long val) {
- return val ^ (val >>> 47);
- }
-
- private static long Uint128Low64(long[] x) {
- return x[0];
- }
-
- private static long Rotate(long val, int shift) {
- return shift == 0 ? val : (val >>> shift) | (val << (64 - shift));
- }
-
- private static long Uint128High64(long[] x) {
- return x[1];
- }
-
- private static long Hash128to64(long[] x) {
- long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5;
- a ^= (a >>> 47);
- long b = (Uint128High64(x) ^ a) * k5;
- b ^= (b >>> 47);
- b *= k5;
- return b;
- }
-
- private static long HashLen16(long u, long v) {
- return Hash128to64(new long[]{u,v});
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
deleted file mode 100644
index 35312fc..0000000
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package com.zdjizhi.utils.general;
-
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
-
-import java.util.Map;
-
-
-/**
- * Description: transformation/completion utility class
- *
- * @author qidaijie
- */
-public class TransFormMap {
- private static final Log logger = LogFactory.get();
-
- /**
- * Parse the log and complete missing fields
- *
- * @param jsonMap raw log consumed from the Kafka topic, already parsed
- * @return the completed log
- */
- @SuppressWarnings("unchecked")
- public static String dealCommonMessage(Map<String, Object> jsonMap) {
- try {
- JsonParseUtil.dropJsonField(jsonMap);
- for (String[] strings : JsonParseUtil.getJobList()) {
- //value of the source parameter
- Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
- //key of the field to complete
- String appendToKeyName = strings[1];
- //value of the field to complete
- Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
- //field that selects the operation function
- String function = strings[2];
- //value of the extra parameter
- String param = strings[3];
- functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
- }
- return JsonMapper.toJsonString(jsonMap);
- } catch (RuntimeException e) {
- logger.error("TransForm logs failed,The exception is :" + e);
- return null;
- }
- }
-
-
- /**
- * Set of functions that operate on the corresponding fields according to the schema description
- *
- * @param function field that selects the operation function
- * @param jsonMap parsed raw-log map
- * @param appendToKeyName key of the field to complete
- * @param appendTo value of the field to complete
- * @param logValue value of the source parameter
- * @param param value of the extra parameter
- */
- private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) {
- switch (function) {
- case "current_timestamp":
- if (!(appendTo instanceof Long)) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
- }
- break;
-// case "snowflake_id":
-// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
-// break;
- case "geo_ip_detail":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
- }
- break;
- case "geo_asn":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
- }
- break;
- case "geo_ip_country":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
- }
- break;
- case "set_value":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
- }
- break;
- case "get_value":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
- }
- break;
- case "if":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
- }
- break;
- case "sub_domain":
- if (appendTo == null && logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
- }
- break;
- case "decode_of_base64":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
- }
- break;
- case "flattenSpec":
- if (logValue != null && param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
- }
- break;
- default:
- }
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
deleted file mode 100644
index f13894e..0000000
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.zdjizhi.utils.general;
-
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
-
-import java.util.Map;
-
-
-/**
- * Description: transformation/completion utility class
- *
- * @author qidaijie
- */
-public class TransFormTypeMap {
- private static final Log logger = LogFactory.get();
-
- /**
- * Parse the log and complete missing fields
- *
- * @param message raw log from the Kafka topic
- * @return the completed log
- */
- @SuppressWarnings("unchecked")
- public static String dealCommonMessage(Map<String, Object> message) {
- try {
- Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message);
- for (String[] strings : JsonParseUtil.getJobList()) {
- //value of the source parameter
- Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
- //key of the field to complete
- String appendToKeyName = strings[1];
- //value of the field to complete
- Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
- //field that selects the operation function
- String function = strings[2];
- //value of the extra parameter
- String param = strings[3];
- functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
- }
- return JsonMapper.toJsonString(jsonMap);
- } catch (RuntimeException e) {
- logger.error("TransForm logs failed,The exception is :" + e);
- return null;
- }
- }
-
-
- /**
- * Set of functions that operate on the corresponding fields according to the schema description
- *
- * @param function field that selects the operation function
- * @param jsonMap parsed raw-log map
- * @param appendToKeyName key of the field to complete
- * @param appendToKeyValue value of the field to complete
- * @param logValue value of the source parameter
- * @param param value of the extra parameter
- */
- private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
- switch (function) {
- case "current_timestamp":
- if (!(appendToKeyValue instanceof Long)) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
- }
- break;
-// case "snowflake_id":
-// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
-// //Not implemented in this release plan - TSG-22.01
-//// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
-// break;
- case "geo_ip_detail":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
- }
- break;
- case "geo_asn":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
- }
- break;
- case "geo_ip_country":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
- }
- break;
- case "set_value":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
- }
- break;
- case "get_value":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
- }
- break;
- case "if":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
- }
- break;
- case "sub_domain":
- if (appendToKeyValue == null && logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
- }
- break;
- case "decode_of_base64":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
- }
- break;
- case "flattenSpec":
- if (logValue != null && param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
- }
- break;
- default:
- }
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
deleted file mode 100644
index 84fe5cc..0000000
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ /dev/null
@@ -1,230 +0,0 @@
-package com.zdjizhi.utils.general;
-
-import cn.hutool.core.codec.Base64;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.jayway.jsonpath.InvalidPathException;
-import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.FormatUtils;
-import com.zdjizhi.utils.IpLookupV2;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * @author qidaijie
- */
-class TransFunction {
- private static final Log logger = LogFactory.get();
-
- /**
- * Regex for validating numbers
- */
- private static final Pattern PATTERN = Pattern.compile("[0-9]*");
-
- /**
- * IP geolocation lookup utility
- */
- private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
- .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
- .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
- .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
- .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
- .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
- .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
- .build();
-
- /**
- * Generate the current timestamp
- */
- static long getCurrentTime() {
-
- return System.currentTimeMillis() / 1000;
- }
-
- /**
- * CityHash64 algorithm
- * Not implemented in this release plan - TSG22.01
- *
- * @param data raw data
- * @return hash result
- */
- @Deprecated
- static BigInteger getDecimalHash(long data) {
- byte[] dataBytes = String.valueOf(data).getBytes();
- long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
- String decimalValue = Long.toUnsignedString(hashValue, 10);
- return new BigInteger(decimalValue);
- }
-
- /**
- * Look up location information by client IP
- *
- * @param ip client IP
- * @return detailed IP address location
- */
- static String getGeoIpDetail(String ip) {
- try {
- return ipLookup.cityLookupDetail(ip);
- } catch (NullPointerException npe) {
- logger.error("The MMDB file is not loaded or IP is null! " + npe);
- return "";
- } catch (RuntimeException e) {
- logger.error("Get clientIP location error! " + e);
- return "";
- }
- }
-
- /**
- * Look up ASN information by IP
- *
- * @param ip client/server IP
- * @return ASN
- */
- static String getGeoAsn(String ip) {
- try {
- return ipLookup.asnLookup(ip);
- } catch (NullPointerException npe) {
- logger.error("The MMDB file is not loaded or IP is null! " + npe);
- return "";
- } catch (RuntimeException e) {
- logger.error("Get IP ASN error! " + e);
- return "";
- }
- }
-
- /**
- * Look up country information by IP
- *
- * @param ip server IP
- * @return country
- */
- static String getGeoIpCountry(String ip) {
- try {
- return ipLookup.countryLookup(ip);
- } catch (NullPointerException npe) {
- logger.error("The MMDB file is not loaded or IP is null! " + npe);
- return "";
- } catch (RuntimeException e) {
- logger.error("Get ServerIP location error! " + e);
- return "";
- }
- }
-
-
- /**
- * Complete radius data via HBase
- *
- * @param ip client IP
- * @return account
- */
-
-
- /**
- * Parse the top-level domain
- *
- * @param domain original domain
- * @return top-level domain
- */
- static String getTopDomain(String domain) {
- try {
- return FormatUtils.getTopPrivateDomain(domain);
- } catch (StringIndexOutOfBoundsException outException) {
- logger.error("Parse top-level domain exceptions, exception domain names:" + domain);
- return "";
- }
- }
-
- /**
- * Decode Base64 using the given charset
- *
- * @param message base64
- * @param charset charset
- * @return decoded string
- */
- static String decodeBase64(String message, Object charset) {
- String result = "";
- try {
- if (StringUtil.isNotBlank(message)) {
- if (charset == null) {
- result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
- } else {
- result = Base64.decodeStr(message, charset.toString());
- }
- }
- } catch (RuntimeException e) {
- logger.error("Resolve Base64 exception, exception information:" + e);
- }
- return result;
- }
-
- /**
- * Parse JSON according to an expression
- *
- * @param message json
- * @param expr parse expression
- * @return parse result
- */
- static String flattenSpec(String message, String expr) {
- String flattenResult = "";
- try {
- if (StringUtil.isNotBlank(expr)) {
- ArrayList<String> read = JsonPath.parse(message).read(expr);
- if (read.size() >= 1) {
- flattenResult = read.get(0);
- }
- }
- } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
- logger.error("The device label resolution exception or [expr] analytic expression error" + e);
- }
- return flattenResult;
- }
-
- /**
- * Check whether param is a log field; if so return the corresponding value, otherwise return the original string
- *
- * @param jsonMap in-memory log map
- * @param param field name / plain string
- * @return JSON.Value or String
- */
- static Object isJsonValue(Map<String, Object> jsonMap, String param) {
- if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
- return JsonParseUtil.getValue(jsonMap, param.substring(2));
- } else {
- return param;
- }
- }
-
- /**
- * IF function implementation: parses the log and builds a ternary operation; numeric operands are converted to numbers before comparison and the result is returned.
- *
- * @param jsonMap in-memory log map
- * @param ifParam field name / plain string
- * @return resultA or resultB or null
- */
- static Object condition(Map<String, Object> jsonMap, String ifParam) {
- Object result = null;
- try {
- String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
- if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
- String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
- Object direction = isJsonValue(jsonMap, norms[0]);
- Object resultA = isJsonValue(jsonMap, split[1]);
- Object resultB = isJsonValue(jsonMap, split[2]);
- if (direction instanceof Number) {
- result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
- } else if (direction instanceof String) {
- result = direction.equals(norms[1]) ? resultA : resultB;
- }
- }
- } catch (RuntimeException e) {
- logger.error("IF 函数执行异常,异常信息:" + e);
- }
- return result;
- }
-}