path: root/src/main
author     zhanghongqing <[email protected]>    2022-07-07 14:07:27 +0800
committer  zhanghongqing <[email protected]>    2022-07-07 14:07:27 +0800
commit     f552793230d0428cbc63714ee296c1ce4971a31b (patch)
tree       1bf3a26d957710b261f61a65559d393f55bf9382 /src/main
Initial commit
Diffstat (limited to 'src/main')
-rw-r--r--  src/main/java/com/zdjizhi/common/DnsKeysSelector.java  19
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java  138
-rw-r--r--  src/main/java/com/zdjizhi/common/KeysSelector.java  23
-rw-r--r--  src/main/java/com/zdjizhi/enums/DnsType.java  16
-rw-r--r--  src/main/java/com/zdjizhi/enums/LogMetadata.java  46
-rw-r--r--  src/main/java/com/zdjizhi/etl/ConnProcessFunction.java  83
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java  120
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsProcessFunction.java  101
-rw-r--r--  src/main/java/com/zdjizhi/etl/SketchProcessFunction.java  83
-rw-r--r--  src/main/java/com/zdjizhi/pojo/DbLogEntity.java  47
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java  117
-rw-r--r--  src/main/java/com/zdjizhi/utils/app/AppUtils.java  124
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java  158
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java  132
-rw-r--r--  src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java  18
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java  17
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java  23
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java  23
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/CityHash.java  180
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/SnowflakeId.java  213
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormMap.java  125
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java  127
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFunction.java  248
-rw-r--r--  src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java  77
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java  338
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java  121
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/TypeUtils.java  171
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/CertUtils.java  48
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java  75
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java  50
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java  50
-rw-r--r--  src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java  75
-rw-r--r--  src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java  190
-rw-r--r--  src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java  140
-rw-r--r--  src/main/log4j.properties  25
-rw-r--r--  src/main/logback.xml  42
36 files changed, 3583 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/DnsKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
new file mode 100644
index 0000000..101597c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class DnsKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+ @Override
+ public String getKey(Map<String, Object> log) throws Exception {
+
+ return String.valueOf(log.get("dns_qname"));
+ }
+}
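
Aside: a minimal sketch of what the selector above returns, runnable outside Flink; the demo class and the sample value are invented for illustration.

    import com.zdjizhi.common.DnsKeysSelector;

    import java.util.HashMap;
    import java.util.Map;

    public class DnsKeysSelectorDemo {
        public static void main(String[] args) throws Exception {
            Map<String, Object> log = new HashMap<>();
            log.put("dns_qname", "example.com");
            // Prints "example.com" -- the key that keyBy(new DnsKeysSelector()) would group on.
            System.out.println(new DnsKeysSelector().getKey(log));
        }
    }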
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
new file mode 100644
index 0000000..a84ebae
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -0,0 +1,138 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.FlowWriteConfigurations;
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
+
+/**
+ * @author Administrator
+ */
+public class FlowWriteConfig {
+
+
+ private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
+
+ static {
+ encryptor.setPassword("galaxy");
+ }
+
+ public static final int IF_PARAM_LENGTH = 3;
+ /**
+ * Fields carrying this flag are disabled and are not included in the final log
+ */
+ public static final String VISIBILITY = "disabled";
+ /**
+ * Default field splitter
+ */
+ public static final String FORMAT_SPLITTER = ",";
+ /**
+ * Marks whether a field is a log field or a schema-specified field
+ */
+ public static final String IS_JSON_KEY_TAG = "$.";
+ /**
+ * Separator used in if-function conditions
+ */
+ public static final String IF_CONDITION_SPLITTER = "=";
+ /**
+ * Default encoding for string parsing
+ */
+ public static final String ENCODING = "UTF8";
+
+ /**
+ * Nacos
+ */
+ public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
+ public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
+ public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
+ public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
+ public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
+ public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
+ public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");
+
+ /**
+ * System config
+ */
+ public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
+ public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
+ public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
+ public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
+ public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
+ public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
+ public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
+ public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
+
+ /**
+ * HBase
+ */
+ public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
+ public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+
+ /**
+ * kafka common
+ */
+ public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
+ public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
+
+ /**
+ * kafka source config
+ */
+ public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
+ public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
+ public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+ /**
+ * kafka sink config
+ */
+ public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
+ public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+
+ /**
+ * connection kafka
+ */
+ public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+
+ /**
+ * http
+ */
+ public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
+ public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
+
+ /**
+ * common config
+ */
+ public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
+ public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
+ public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
+
+
+ /*
+ * ck
+ * */
+ public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0,"ck.hosts");
+ public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0,"ck.username");
+ public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0,"ck.pin");
+ public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0,"ck.database");
+
+ public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0,"flink.watermark.max.orderness");
+ public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0,"log.aggregate.duration");
+ public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
+ public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection");
+ public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch");
+
+ //sink.ck.table
+ public static final String SINK_CK_TABLE_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.connection");
+ public static final String SINK_CK_TABLE_SKETCH = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.sketch");
+ public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns");
+ public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
+ public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
+}
\ No newline at end of file
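
Aside: kafka.user and kafka.pin are stored encrypted and decrypted above with jasypt; a minimal sketch, assuming the same hard-coded password, of how such a value could be produced (the demo class name and plaintext are hypothetical).

    import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

    public class EncryptPropertyDemo {
        public static void main(String[] args) {
            StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
            encryptor.setPassword("galaxy");                         // same password as FlowWriteConfig
            String cipherText = encryptor.encrypt("my-kafka-user");  // hypothetical plaintext
            System.out.println(cipherText);                          // value to store in the property source
            System.out.println(encryptor.decrypt(cipherText));       // round-trips to "my-kafka-user"
        }
    }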
diff --git a/src/main/java/com/zdjizhi/common/KeysSelector.java b/src/main/java/com/zdjizhi/common/KeysSelector.java
new file mode 100644
index 0000000..a4d616c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/KeysSelector.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class KeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
+
+ @Override
+ public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {
+
+ return Tuple2.of(
+ String.valueOf(log.get("src_ip")),
+ String.valueOf(log.get("dst_ip")));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/enums/DnsType.java b/src/main/java/com/zdjizhi/enums/DnsType.java
new file mode 100644
index 0000000..bc5805e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/enums/DnsType.java
@@ -0,0 +1,16 @@
+package com.zdjizhi.enums;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-06
+ **/
+public enum DnsType {
+
+ /*
+ * DNS record types
+ * */
+
+ a, aaaa, cname, mx, ns;
+
+}
diff --git a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java
new file mode 100644
index 0000000..d3a7f49
--- /dev/null
+++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java
@@ -0,0 +1,46 @@
+package com.zdjizhi.enums;
+
+import cn.hutool.core.util.EnumUtil;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public enum LogMetadata {
+
+ /*
+ * Log name: source topic and sink table name
+ * */
+
+ CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log"),
+ CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log"),
+ DNS_RECORD_LOG("dns_record_log", "dns_record_log"),
+ ;
+
+ private String source;
+ private String sink;
+
+ LogMetadata() {
+ }
+
+ LogMetadata(String source, String sink) {
+ this.source = source;
+ this.sink = sink;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public String getSink() {
+ return sink;
+ }
+
+ public static String getLogSink(String source) {
+ LogMetadata logMetadata = EnumUtil.fromString(LogMetadata.class, source);
+ return logMetadata.getSink();
+
+ }
+
+}
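
Aside: a small usage sketch for LogMetadata (the demo class is hypothetical); getLogSink appears to expect the enum constant name, since hutool's EnumUtil.fromString resolves by name rather than by the lower-case source value.

    import com.zdjizhi.enums.LogMetadata;

    public class LogMetadataDemo {
        public static void main(String[] args) {
            System.out.println(LogMetadata.DNS_RECORD_LOG.getSink());      // dns_record_log
            System.out.println(LogMetadata.getLogSink("DNS_RECORD_LOG"));  // dns_record_log
        }
    }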
diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
new file mode 100644
index 0000000..fa2b5bb
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
@@ -0,0 +1,83 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
+
+
+/**
+ * @author 94976
+ */
+public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ConnProcessFunction.class);
+
+ @Override
+ public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+ Map<String, Object> middleResult = getMiddleResult(keys, elements);
+ try {
+ if (middleResult != null) {
+ out.collect(middleResult);
+ logger.debug("获取中间聚合结果:{}", middleResult.toString());
+ }
+ } catch (Exception e) {
+ logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
+ }
+ }
+
+ private Map<String, Object> getMiddleResult(Tuple2<String, String> keys, Iterable<Map<String, Object>> elements) {
+
+ Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
+ try {
+ if (values != null) {
+ Map<String, Object> result = new LinkedHashMap<>();
+ result.put("start_time", values.f0);
+ result.put("end_time", values.f1);
+ result.put("src_ip", keys.f0);
+ result.put("dst_ip", keys.f1);
+ result.put("sessions", values.f2);
+ result.put("packets", values.f3);
+ result.put("bytes", values.f4);
+ return result;
+ }
+
+ } catch (Exception e) {
+ logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e);
+ }
+ return null;
+ }
+
+ private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
+ long sessions = 0;
+ long packets = 0;
+ long bytes = 0;
+ long startTime = System.currentTimeMillis() / 1000;
+ long endTime = System.currentTimeMillis() / 1000;
+ try {
+ for (Map<String, Object> newSketchLog : elements) {
+ sessions++;
+ packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts"));
+ bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + Convert.toLong(newSketchLog.get("total_sc_bytes"));
+ long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
+ startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+ endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+
+ }
+ return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
+ } catch (Exception e) {
+ logger.error("聚合中间结果集失败 {}", e);
+ }
+ return null;
+ }
+}
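
Aside: the window function above counts sessions, sums packet and byte totals, and tracks the earliest/latest conn_start_time for each (src_ip, dst_ip) key; a simplified, self-contained sketch of that arithmetic on two hypothetical records (class name and values invented).

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ConnAggregateDemo {
        public static void main(String[] args) {
            Map<String, Object> a = new HashMap<>();
            a.put("total_cs_pkts", 10L); a.put("total_sc_pkts", 5L);
            a.put("total_cs_bytes", 1000L); a.put("total_sc_bytes", 500L);
            a.put("conn_start_time", 1657000000L);

            Map<String, Object> b = new HashMap<>();
            b.put("total_cs_pkts", 2L); b.put("total_sc_pkts", 3L);
            b.put("total_cs_bytes", 200L); b.put("total_sc_bytes", 300L);
            b.put("conn_start_time", 1657000030L);

            long sessions = 0, packets = 0, bytes = 0;
            long start = Long.MAX_VALUE, end = Long.MIN_VALUE;
            for (Map<String, Object> log : Arrays.asList(a, b)) {
                sessions++;
                packets += (Long) log.get("total_cs_pkts") + (Long) log.get("total_sc_pkts");
                bytes += (Long) log.get("total_cs_bytes") + (Long) log.get("total_sc_bytes");
                long t = (Long) log.get("conn_start_time");
                start = Math.min(start, t);
                end = Math.max(end, t);
            }
            // sessions=2, packets=20, bytes=2000; the function above then adds LOG_AGGREGATE_DURATION to end.
            System.out.printf("sessions=%d packets=%d bytes=%d start=%d end=%d%n",
                    sessions, packets, bytes, start, end);
        }
    }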
diff --git a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java
new file mode 100644
index 0000000..db69974
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java
@@ -0,0 +1,120 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.enums.DnsType;
+import com.zdjizhi.pojo.DbLogEntity;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
+
+
+/**
+ * @author 94976
+ */
+public class DnsFlatMapFunction implements FlatMapFunction<DbLogEntity, DbLogEntity> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsFlatMapFunction.class);
+
+ public void process(Iterable<DbLogEntity> elements, Collector<List<DbLogEntity>> out) {
+ List<DbLogEntity> middleResult = getMiddleResult(elements);
+ try {
+ if (middleResult != null) {
+ out.collect(middleResult);
+ logger.debug("获取中间聚合结果:{}", middleResult.toString());
+ }
+ } catch (Exception e) {
+ logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
+ }
+ }
+
+ /**
+ * Splits a dns_record log into
+ * five record types: a/aaaa/cname/mx/ns
+ * @param elements
+ * @return
+ */
+ private List<DbLogEntity> getMiddleResult(Iterable<DbLogEntity> elements) {
+ long startTime = System.currentTimeMillis() / 1000;
+ long endTime = System.currentTimeMillis() / 1000;
+ String tableName = "";
+ String dnsQname = "";
+ try {
+ Map<String, Long> distinctA = new HashMap<>();
+ Map<String, Long> distinctAAAA = new HashMap<>();
+ Map<String, Long> distinctCname = new HashMap<>();
+ Map<String, Long> distinctNs = new HashMap<>();
+ Map<String, Long> distinctMx = new HashMap<>();
+ for (DbLogEntity log : elements) {
+ tableName = log.getTableName();
+ List<String> dnsA = splitDns(log, "dns_a");
+ List<String> dnsAAAA = splitDns(log, "dns_aaaa");
+ List<String> dnsCname = splitDns(log, "dns_cname");
+ List<String> dnsNs = splitDns(log, "dns_ns");
+ List<String> dnsMx = splitDns(log, "dns_mx");
+
+ dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
+ dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
+ dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
+ dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
+ dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
+
+ long connStartTimetime = Convert.toLong(log.getData().get("capure_time_s"));
+ startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+ endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ dnsQname = StrUtil.toString(log.getData().get("dns_qname"));
+ }
+ DbLogEntity dbLogEntity = new DbLogEntity();
+ dbLogEntity.setTableName(tableName);
+ List<DbLogEntity> result = new ArrayList<>();
+ result.addAll(getNewDns(startTime, endTime, dnsQname, distinctA, DnsType.a.toString(), dbLogEntity));
+ result.addAll(getNewDns(startTime, endTime, dnsQname, distinctAAAA, DnsType.aaaa.toString(), dbLogEntity));
+ result.addAll(getNewDns(startTime, endTime, dnsQname, distinctCname, DnsType.cname.toString(), dbLogEntity));
+ result.addAll(getNewDns(startTime, endTime, dnsQname, distinctNs, DnsType.ns.toString(), dbLogEntity));
+ result.addAll(getNewDns(startTime, endTime, dnsQname, distinctMx, DnsType.mx.toString(), dbLogEntity));
+ return result;
+
+ } catch (Exception e) {
+ logger.error("聚合中间结果集失败 {}", e);
+ }
+ return null;
+ }
+
+
+ private static List<String> splitDns(DbLogEntity dbLogEntity, String key) {
+
+ return StrUtil.split(StrUtil.toString(dbLogEntity.getData().get(key)), ",");
+ }
+
+ private List<DbLogEntity> getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, DbLogEntity dbLogEntity) {
+ List<DbLogEntity> newList = new ArrayList<>();
+ for (Map.Entry<String, Long> dns : distinctMap.entrySet()) {
+ Map<String, Object> newDns = new HashMap<>();
+ newDns.put("start_time", startTime);
+ newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
+ newDns.put("record_type", type);
+ newDns.put("qname", dnsQname);
+ newDns.put("record", dns.getKey());
+ newDns.put("sessions", dns.getValue());
+ dbLogEntity.setData(newDns);
+ newList.add(dbLogEntity);
+ }
+ return newList;
+ }
+
+ @Override
+ public void flatMap(DbLogEntity dbLogEntity, Collector<DbLogEntity> collector) throws Exception {
+
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
new file mode 100644
index 0000000..46d0814
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
@@ -0,0 +1,101 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.enums.DnsType;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
+
+
+/**
+ * @author 94976
+ */
+public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsProcessFunction.class);
+
+ @Override
+ public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+
+ try {
+ getMiddleResult(out, elements);
+ } catch (Exception e) {
+ logger.error("获取中间聚合结果失败,middleResult: {}", e);
+ }
+ }
+
+ /**
+ * Splits a dns_record log into
+ * five record types: a/aaaa/cname/mx/ns
+ *
+ * @param elements
+ * @return
+ */
+ private void getMiddleResult(Collector<Map<String, Object>> out, Iterable<Map<String, Object>> elements) {
+ long startTime = System.currentTimeMillis() / 1000;
+ long endTime = System.currentTimeMillis() / 1000;
+ String dnsQname = "";
+ try {
+ Map<String, Long> distinctA = new HashMap<>();
+ Map<String, Long> distinctAAAA = new HashMap<>();
+ Map<String, Long> distinctCname = new HashMap<>();
+ Map<String, Long> distinctNs = new HashMap<>();
+ Map<String, Long> distinctMx = new HashMap<>();
+ for (Map<String, Object> log : elements) {
+ List<String> dnsA = splitDns(log, "dns_a");
+ List<String> dnsAAAA = splitDns(log, "dns_aaaa");
+ List<String> dnsCname = splitDns(log, "dns_cname");
+ List<String> dnsNs = splitDns(log, "dns_ns");
+ List<String> dnsMx = splitDns(log, "dns_mx");
+
+ dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
+ dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
+ dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
+ dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
+ dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
+
+ long connStartTimetime = Convert.toLong(log.get("capure_time_s"));
+ startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+ endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ dnsQname = StrUtil.toString(log.get("dns_qname"));
+ }
+ getNewDns(startTime, endTime, dnsQname, distinctA, DnsType.a.toString(), out);
+ getNewDns(startTime, endTime, dnsQname, distinctAAAA, DnsType.aaaa.toString(), out);
+ getNewDns(startTime, endTime, dnsQname, distinctCname, DnsType.cname.toString(), out);
+ getNewDns(startTime, endTime, dnsQname, distinctNs, DnsType.ns.toString(), out);
+ getNewDns(startTime, endTime, dnsQname, distinctMx, DnsType.mx.toString(), out);
+
+ } catch (Exception e) {
+ logger.error("聚合中间结果集失败 {}", e);
+ }
+ }
+
+
+ private static List<String> splitDns(Map<String, Object> log, String key) {
+
+ return StrUtil.split(StrUtil.toString(log.get(key)), ",");
+ }
+
+ private void getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, Collector<Map<String, Object>> out) {
+ for (Map.Entry<String, Long> dns : distinctMap.entrySet()) {
+ Map<String, Object> newDns = new HashMap<>();
+ newDns.put("start_time", startTime);
+ newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
+ newDns.put("record_type", type);
+ newDns.put("qname", dnsQname);
+ newDns.put("record", dns.getKey());
+ newDns.put("sessions", dns.getValue());
+ out.collect(newDns);
+ }
+ }
+}
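
Aside: the distinct counting above reduces to StrUtil.split plus Map.merge; a minimal sketch on one hypothetical dns_a value (demo class and addresses invented), using the same hutool calls as the code above.

    import cn.hutool.core.util.StrUtil;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DnsSplitDemo {
        public static void main(String[] args) {
            Map<String, Long> distinctA = new HashMap<>();
            List<String> dnsA = StrUtil.split("1.1.1.1,2.2.2.2,1.1.1.1", ",");
            dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
            // {1.1.1.1=2, 2.2.2.2=1} -- each entry becomes one output row with sessions = count
            System.out.println(distinctA);
        }
    }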
diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
new file mode 100644
index 0000000..98a2fe5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
@@ -0,0 +1,83 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import com.zdjizhi.enums.LogMetadata;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
+
+
+/**
+ * @author 94976
+ */
+public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
+
+ private static final Logger logger = LoggerFactory.getLogger(SketchProcessFunction.class);
+
+ @Override
+ public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+ Map<String, Object> middleResult = getMiddleResult(keys, elements);
+ try {
+ if (middleResult != null) {
+ out.collect(middleResult);
+ logger.debug("获取中间聚合结果:{}", middleResult.toString());
+ }
+ } catch (Exception e) {
+ logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
+ }
+ }
+
+ private Map<String, Object> getMiddleResult(Tuple2<String, String> keys, Iterable<Map<String, Object>> elements) {
+
+ Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
+ try {
+ if (values != null) {
+ Map<String, Object> result = new LinkedHashMap<>();
+ result.put("start_time", values.f0);
+ result.put("end_time", values.f1);
+ result.put("src_ip", keys.f0);
+ result.put("dst_ip", keys.f1);
+ result.put("sessions", values.f2);
+ result.put("packets", values.f3);
+ result.put("bytes", values.f4);
+ return result;
+ }
+
+ } catch (Exception e) {
+ logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e);
+ }
+ return null;
+ }
+
+ private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
+ long sessions = 0;
+ long packets = 0;
+ long bytes = 0;
+ long startTime = System.currentTimeMillis() / 1000;
+ long endTime = System.currentTimeMillis() / 1000;
+ try {
+ for (Map<String, Object> newSketchLog : elements) {
+ sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));
+ packets += Convert.toLong(newSketchLog.get("sketch_packets"));
+ bytes += Convert.toLong(newSketchLog.get("sketch_bytes"));
+ long connStartTimetime = Convert.toLong(newSketchLog.get("sketch_start_time"));
+ startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+ endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ }
+ return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
+ } catch (Exception e) {
+ logger.error("聚合中间结果集失败 {}", e);
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/pojo/DbLogEntity.java b/src/main/java/com/zdjizhi/pojo/DbLogEntity.java
new file mode 100644
index 0000000..b89f1db
--- /dev/null
+++ b/src/main/java/com/zdjizhi/pojo/DbLogEntity.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.pojo;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class DbLogEntity implements Serializable {
+
+ private String tableName;
+ private Map<String, Object> data;
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public Map<String, Object> getData() {
+ return data;
+ }
+
+ public void setData(Map<String, Object> data) {
+ this.data = data;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DbLogEntity that = (DbLogEntity) o;
+ return Objects.equals(tableName, that.tableName) &&
+ Objects.equals(data, that.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, data);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..cfbc18b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -0,0 +1,117 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.DnsKeysSelector;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.KeysSelector;
+import com.zdjizhi.etl.ConnProcessFunction;
+import com.zdjizhi.etl.DnsProcessFunction;
+import com.zdjizhi.etl.SketchProcessFunction;
+import com.zdjizhi.utils.ck.ClickhouseSink;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public class LogFlowWriteTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ try {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Maximum interval between two output flushes (in milliseconds)
+ env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
+
+ // 1 = connection, 2 = dns
+ if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
+ //connection
+ DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
+ .filter(x -> Objects.nonNull(x))
+ .setParallelism(SOURCE_PARALLELISM)
+ .name(SOURCE_KAFKA_TOPIC_CONNECTION);
+
+ //sketch
+ DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
+ .filter(x -> Objects.nonNull(x))
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
+ .name(SOURCE_KAFKA_TOPIC_SKETCH);
+
+ //transform
+ DataStream<Map<String, Object>> connTransformStream = connSource
+ .assignTimestampsAndWatermarks(WatermarkStrategy
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+ .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
+ .keyBy(new KeysSelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
+ .process(new ConnProcessFunction())
+ .filter(x -> Objects.nonNull(x))
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+ DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+ .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
+ .keyBy(new KeysSelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
+ .process(new SketchProcessFunction())
+ .filter(x -> Objects.nonNull(x))
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+ // Write to the ClickHouse sinks
+ connSource.addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+ sketchSource.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+ connTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+ sketchTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+
+ } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
+
+ DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
+ .filter(x -> Objects.nonNull(x))
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
+ .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
+
+ DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
+ .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capure_time_s")) * 1000))
+ .keyBy(new DnsKeysSelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .process(new DnsProcessFunction())
+ .filter(x -> Objects.nonNull(x))
+ .setParallelism(TRANSFORM_PARALLELISM);
+
+ // Filter out null records before writing to the sink
+ dnsSource.filter(x -> Objects.nonNull(x))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("FilterOriginalData")
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
+
+ dnsTransform.filter(x -> Objects.nonNull(x))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("FilterOriginalData")
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
+ .setParallelism(SINK_PARALLELISM)
+ .name("CKSink");
+ }
+
+
+ env.execute(args[0]);
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is : {}", e);
+ e.printStackTrace();
+ }
+
+ }
+
+}
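
Aside: each branch keys its stream, assigns a bounded-out-of-orderness watermark, and aggregates over a tumbling event-time window of LOG_AGGREGATE_DURATION seconds; a small sketch of how an event timestamp maps onto such a window (the 300-second duration is an assumption for illustration, not taken from the configuration).

    public class TumblingWindowDemo {
        public static void main(String[] args) {
            long durationMs = 300 * 1000L;            // assumed LOG_AGGREGATE_DURATION of 300 seconds
            long eventTimeMs = 1657000123L * 1000L;   // e.g. conn_start_time * 1000, as assigned in the topology
            long windowStart = eventTimeMs - (eventTimeMs % durationMs);
            long windowEnd = windowStart + durationMs;
            // Records whose timestamps fall in [windowStart, windowEnd) are aggregated together.
            System.out.println("window = [" + windowStart + ", " + windowEnd + ")");
        }
    }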
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
new file mode 100644
index 0000000..1425ce9
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -0,0 +1,124 @@
+package com.zdjizhi.utils.app;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.http.HttpClientUtil;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AppId utility class
+ *
+ * @author qidaijie
+ */
+
+@Deprecated
+public class AppUtils {
+ private static final Log logger = LogFactory.get();
+ private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
+ private static AppUtils appUtils;
+
+ private static void getAppInstance() {
+ appUtils = new AppUtils();
+ }
+
+
+ /**
+ * Constructor (new)
+ */
+ private AppUtils() {
+ // refresh on a schedule
+ updateAppIdCache();
+ }
+
+ /**
+ * Refresh the cache
+ */
+ private static void change() {
+ if (appUtils == null) {
+ getAppInstance();
+ }
+ timestampsFilter();
+ }
+
+
+ /**
+ * Fetch the latest app-id mapping
+ */
+ private static void timestampsFilter() {
+ try {
+ Long begin = System.currentTimeMillis();
+ String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
+ if (StringUtil.isNotBlank(schema)) {
+ String data = JSONObject.parseObject(schema).getString("data");
+ JSONArray objects = JSONArray.parseArray(data);
+ for (Object object : objects) {
+ JSONArray jsonArray = JSONArray.parseArray(object.toString());
+ int key = jsonArray.getInteger(0);
+ String value = jsonArray.getString(1);
+ if (appIdMap.containsKey(key)) {
+ if (!value.equals(appIdMap.get(key))) {
+ appIdMap.put(key, value);
+ }
+ } else {
+ appIdMap.put(key, value);
+ }
+ }
+ logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
+ logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
+ }
+ } catch (RuntimeException e) {
+ logger.error("Update cache app-id failed, exception:" + e);
+ }
+ }
+
+
+ /**
+ * Scheduled task: runs at a fixed interval to refresh the app-id cache
+ */
+ private void updateAppIdCache() {
+ ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
+ change();
+ }
+ } catch (RuntimeException e) {
+ logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
+ }
+ }
+ }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+ }
+
+
+ /**
+ * Get the appName for an app_id
+ *
+ * @param appId app_id
+ * @return account
+ */
+ public static String getAppName(int appId) {
+
+ if (appUtils == null) {
+ getAppInstance();
+ }
+
+ if (appIdMap.containsKey(appId)) {
+ return appIdMap.get(appId);
+ } else {
+ logger.warn("AppMap get appName is null, ID is :" + appId);
+ return "";
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
new file mode 100644
index 0000000..e4d7a8c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
@@ -0,0 +1,158 @@
+package com.zdjizhi.utils.ck;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier;
+
+public class CKSinkFlatMap extends RichFlatMapFunction<Map<String, Object>, String> {
+
+ private static final Log log = LogFactory.get();
+
+ private static int count = 1;
+ private static ClickHouseConnection connection = null;
+ private static PreparedStatement preparedStatement = null;
+
+ static String address = "jdbc:clickhouse://192.168.45.102:8123";
+ static String database = "default";
+ static String username = "default";
+ static String password = "galaxy2019";
+ static String fieldStr = "id,name,age";
+ static String tableName = "user_table";
+
+ private String insertSql;
+
+ // Create the connection and session
+ @Override
+ public void open(Configuration parameters) {
+ try {
+ connection = getConn();
+ log.info("get clickhouse connection success !");
+ } catch (Exception e) {
+ log.error("clickhouse初始化连接报错:", e);
+ }
+ }
+
+ // Batch writes with auto-commit disabled
+ @Override
+ public void flatMap(Map<String, Object> data, Collector<String> collector) {
+
+ try {
+ String insertSql = preparedSql(fieldStr, tableName);
+ connection.setAutoCommit(false);
+ preparedStatement = connection.prepareStatement(insertSql);
+
+ LinkedList<Object> values = new LinkedList<>(data.values());
+ for (int i = 1; i <= values.size(); i++) {
+ Object val = values.get(i - 1);
+ if (val instanceof Long) {
+ preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Integer) {
+ preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Boolean) {
+ preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
+ } else {
+ preparedStatement.setString((i), StrUtil.toString(val));
+ }
+ }
+
+ preparedStatement.addBatch();
+ count = count + 1;
+ try {
+// if (count >= 50000) {
+// preparedStatement.executeBatch();
+// connection.commit();
+// preparedStatement.clearBatch();
+// count = 1;
+// }
+
+ // commit every 10,000 records
+ if (count % 10000 == 0) {
+ preparedStatement.executeBatch();
+ connection.commit();
+ preparedStatement.clearBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+
+ } catch (Exception ee) {
+ log.error("数据插入click house 报错:", ee);
+ }
+ } catch (Exception ex) {
+ log.error("ClickhouseSink插入报错====", ex);
+ }
+ }
+
+ public static ClickHouseConnection getConn() {
+
+ int socketTimeout = 600000;
+ ClickHouseProperties properties = new ClickHouseProperties();
+ properties.setUser(username);
+ properties.setPassword(password);
+ properties.setDatabase(database);
+ properties.setSocketTimeout(socketTimeout);
+ ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
+ ClickHouseConnection conn = null;
+ try {
+ conn = clickHouseDataSource.getConnection();
+ return conn;
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ public static Map getField() {
+
+ return null;
+ }
+
+
+ public String preparedSql(String fieldStr, String tableName) {
+ List<String> fields = StrUtil.split(fieldStr, ",");
+ return getInsertSql(fields, database + "." + tableName);
+ }
+
+ public String getInsertSql(List<String> fileds, String tableName) {
+ String sql = "";
+ String sqlStr1 = "INSERT INTO " + tableName + " (";
+ String sqlStr2 = ") VALUES (";
+ String sqlStr3 = ")";
+ String sqlKey = "";
+ String sqlValue = "";
+ for (String key : fields) {
+ sqlKey += key + ",";
+ sqlValue += "?,";
+ }
+ sqlKey = sqlKey.substring(0, sqlKey.length() - 1);
+ sqlValue = sqlValue.substring(0, sqlValue.length() - 1);
+ sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3);
+
+// String placeholders = Arrays.stream(fieldNames)
+// .map(f -> "?")
+// .collect(Collectors.joining(", "));
+// return "INSERT INTO " + quoteIdentifier(tableName) +
+// "(" + columns + ")" + " VALUES (" + placeholders + ")";
+
+ log.info(sql);
+ return sql;
+ }
+}
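
Aside: with the hard-coded demo fields above, preparedSql builds a parameterized INSERT statement; a quick check (the demo class is hypothetical).

    import com.zdjizhi.utils.ck.CKSinkFlatMap;

    public class PreparedSqlDemo {
        public static void main(String[] args) {
            // Prints: INSERT INTO default.user_table (id,name,age) VALUES (?,?,?)
            System.out.println(new CKSinkFlatMap().preparedSql("id,name,age", "user_table"));
        }
    }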
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
new file mode 100644
index 0000000..0407544
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -0,0 +1,132 @@
+package com.zdjizhi.utils.ck;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
+
+ private static final Log log = LogFactory.get();
+
+ private static Connection connection;
+ private static PreparedStatement preparedStatement;
+ public String sink;
+
+ static {
+ try {
+ Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+ connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
+// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
+// connection = dataSource.getConnection();
+ log.info("get clickhouse connection success");
+ } catch (ClassNotFoundException | SQLException e) {
+ log.error("clickhouse connection error ,{}", e);
+ }
+ }
+
+ public ClickhouseSink(String sink) {
+ this.sink = sink;
+ }
+
+ public String getSink() {
+ return sink;
+ }
+
+ public void setSink(String sink) {
+ this.sink = sink;
+ }
+
+ @Override
+ public void invoke(Map<String,Object> log, Context context) throws Exception {
+ executeInsert(log, getSink());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (null != connection) {
+ connection.close();
+ }
+ if (null != preparedStatement) {
+ preparedStatement.close();
+ }
+ }
+
+ public void executeInsert(Map<String, Object> data, String tableName) {
+
+ try {
+ int count = 1;
+ List<String> keys = new LinkedList<>(data.keySet());
+
+ connection.setAutoCommit(false);
+ preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
+ List<Object> values = new LinkedList<>(data.values());
+ for (int i = 1; i <= values.size(); i++) {
+ Object val = values.get(i - 1);
+ if (val instanceof Long) {
+ preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Integer) {
+ preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Boolean) {
+ preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
+ } else {
+ preparedStatement.setString((i), StrUtil.toString(val));
+ }
+ }
+
+ preparedStatement.addBatch();
+ count = count + 1;
+ try {
+ // commit every 10,000 records
+ if (count % 10000 == 0) {
+ preparedStatement.executeBatch();
+ connection.commit();
+ preparedStatement.clearBatch();
+ count = 1;
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (Exception ee) {
+ log.error("数据插入clickhouse 报错:", ee);
+ }
+ } catch (Exception ex) {
+ log.error("ClickhouseSink插入报错", ex);
+ }
+ }
+
+
+ public String preparedSql(List<String> fields, String tableName) {
+
+ String placeholders = fields.stream()
+ .filter(Objects::nonNull)
+ .map(f -> "?")
+ .collect(Collectors.joining(", "));
+ String columns = fields.stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.joining(", "));
+ String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
+ "(", columns, ") VALUES (", placeholders, ")");
+ log.info(sql);
+ return sql;
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java
new file mode 100644
index 0000000..67c88f0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.utils.exception;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.storm.utils.execption
+ * @Description:
+ * @date 2021/3/25 9:42
+ */
+public class FlowWriteException extends RuntimeException {
+
+ public FlowWriteException() {
+ }
+
+ public FlowWriteException(String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..de507ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class FilterNullFunction implements FilterFunction<String> {
+ @Override
+ public boolean filter(String message) {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
new file mode 100644
index 0000000..810e4c8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.utils.functions;
+
+
+import com.zdjizhi.utils.general.TransFormMap;
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Map;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(Map<String, Object> logs) {
+ return TransFormMap.dealCommonMessage(logs);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
new file mode 100644
index 0000000..ccef850
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.general.TransFormTypeMap;
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Map;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(Map<String, Object> logs) {
+
+ return TransFormTypeMap.dealCommonMessage(logs);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/CityHash.java b/src/main/java/com/zdjizhi/utils/general/CityHash.java
new file mode 100644
index 0000000..5de4785
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/CityHash.java
@@ -0,0 +1,180 @@
+package com.zdjizhi.utils.general;
+
+
+
+
+/**
+ * Hashes log ids with the CityHash64 algorithm.
+ * Not enabled in the current release plan (TSG22.01).
+ *
+ * @author qidaijie
+ */
+@Deprecated
+public class CityHash {
+
+ private static final long k0 = 0xc3a5c85c97cb3127L;
+ private static final long k1 = 0xb492b66fbe98f273L;
+ private static final long k2 = 0x9ae16a3b2f90404fL;
+ private static final long k3 = 0xc949d7c7509e6557L;
+ private static final long k5 = 0x9ddfea08eb382d69L;
+
+ private CityHash() {}
+
+ public static long CityHash64(byte[] s, int index, int len) {
+ if (len <= 16 ) {
+ return HashLen0to16(s, index, len);
+ } else if (len > 16 && len <= 32) {
+ return HashLen17to32(s, index, len);
+ } else if (len > 32 && len <= 64) {
+ return HashLen33to64(s, index, len);
+ } else {
+ long x = Fetch64(s, index);
+ long y = Fetch64(s, index + len - 16) ^ k1;
+ long z = Fetch64(s, index + len - 56) ^ k0;
+ long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y);
+ long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0);
+ z += ShiftMix(v[1]) * k1;
+ x = Rotate(z + x, 39) * k1;
+ y = Rotate(y, 33) * k1;
+
+ len = (len - 1) & ~63;
+ do {
+ x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1;
+ y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1;
+ x ^= w[1];
+ y ^= v[0];
+ z = Rotate(z ^ w[0], 33);
+ v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]);
+ w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y);
+ long t = z;
+ z = x;
+ x = t;
+ index += 64;
+ len -= 64;
+ } while (len != 0);
+ return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z,
+ HashLen16(v[1], w[1]) + x);
+ }
+ }
+
+ private static long HashLen0to16(byte[] s, int index, int len) {
+ if (len > 8) {
+ long a = Fetch64(s, index);
+ long b = Fetch64(s, index + len - 8);
+ return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b;
+ }
+ if (len >= 4) {
+ long a = Fetch32(s, index);
+ return HashLen16(len + (a << 3), Fetch32(s, index + len - 4));
+ }
+ if (len > 0) {
+ byte a = s[index];
+ byte b = s[index + len >>> 1];
+ byte c = s[index + len - 1];
+ int y = (a) + (b << 8);
+ int z = len + (c << 2);
+ return ShiftMix(y * k2 ^ z * k3) * k2;
+ }
+ return k2;
+ }
+
+ private static long HashLen17to32(byte[] s, int index, int len) {
+ long a = Fetch64(s, index) * k1;
+ long b = Fetch64(s, index + 8);
+ long c = Fetch64(s, index + len - 8) * k2;
+ long d = Fetch64(s, index + len - 16) * k0;
+ return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d,
+ a + Rotate(b ^ k3, 20) - c + len);
+ }
+
+ private static long HashLen33to64(byte[] s, int index, int len) {
+ long z = Fetch64(s, index + 24);
+ long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0;
+ long b = Rotate(a + z, 52);
+ long c = Rotate(a, 37);
+ a += Fetch64(s, index + 8);
+ c += Rotate(a, 7);
+ a += Fetch64(s, index + 16);
+ long vf = a + z;
+ long vs = b + Rotate(a, 31) + c;
+ a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32);
+ z = Fetch64(s, index + len - 8);
+ b = Rotate(a + z, 52);
+ c = Rotate(a, 37);
+ a += Fetch64(s, index + len - 24);
+ c += Rotate(a, 7);
+ a += Fetch64(s, index + len - 16);
+ long wf = a + z;
+ long ws = b + Rotate(a, 31) + c;
+ long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0);
+ return ShiftMix(r * k0 + vs) * k2;
+ }
+
+ private static long Fetch64(byte[] p, int index) {
+ return toLongLE(p,index);
+ }
+
+ private static long Fetch32(byte[] p, int index) {
+ return toIntLE(p,index);
+ }
+ private static long[] WeakHashLen32WithSeeds(
+ long w, long x, long y, long z, long a, long b) {
+ a += w;
+ b = Rotate(b + a + z, 21);
+ long c = a;
+ a += x;
+ a += y;
+ b += Rotate(a, 44);
+ return new long[]{a + z, b + c};
+ }
+
+ private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) {
+ return WeakHashLen32WithSeeds(Fetch64(s, index),
+ Fetch64(s, index + 8),
+ Fetch64(s, index + 16),
+ Fetch64(s, index + 24),
+ a,
+ b);
+ }
+
+ private static long toLongLE(byte[] b, int i) {
+ return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
+ }
+
+ private static long toIntLE(byte[] b, int i) {
+ return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
+ }
+ private static long RotateByAtLeastOne(long val, int shift) {
+ return (val >>> shift) | (val << (64 - shift));
+ }
+
+ private static long ShiftMix(long val) {
+ return val ^ (val >>> 47);
+ }
+
+ private static long Uint128Low64(long[] x) {
+ return x[0];
+ }
+
+ private static long Rotate(long val, int shift) {
+ return shift == 0 ? val : (val >>> shift) | (val << (64 - shift));
+ }
+
+ private static long Uint128High64(long[] x) {
+ return x[1];
+ }
+
+ private static long Hash128to64(long[] x) {
+ long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5;
+ a ^= (a >>> 47);
+ long b = (Uint128High64(x) ^ a) * k5;
+ b ^= (b >>> 47);
+ b *= k5;
+ return b;
+ }
+
+ private static long HashLen16(long u, long v) {
+ return Hash128to64(new long[]{u,v});
+ }
+
+}
\ No newline at end of file
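
Aside: although marked @Deprecated, the CityHash64 port can be exercised directly; a minimal sketch (demo class and input string invented).

    import com.zdjizhi.utils.general.CityHash;

    import java.nio.charset.StandardCharsets;

    public class CityHashDemo {
        public static void main(String[] args) {
            byte[] id = "example-log-id".getBytes(StandardCharsets.UTF_8);
            // Hashes the whole array starting at offset 0; 14 bytes takes the HashLen0to16 branch.
            System.out.println(CityHash.CityHash64(id, 0, id.length));
        }
    }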
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
new file mode 100644
index 0000000..7cb907e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
@@ -0,0 +1,213 @@
+package com.zdjizhi.utils.general;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.zookeeper.DistributedLock;
+import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
+
+/**
+ * Snowflake id generator
+ *
+ * @author qidaijie
+ */
+public class SnowflakeId {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 64 bits in total; the first (sign) bit is always 0.
+ * Timestamp: 39 bits (about 17 years); dataCenterId (one per environment/job): 5 bits (0-31);
+ * workerId (one per process): 8 bits (0-255); sequence: 11 bits (2047/ms).
+ *
+ * sequence per ms = (-1L ^ (-1L << 11))
+ * max usable years = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
+ */
+ /**
+ * Epoch start (2020-11-14 00:00:00), max 17 years
+ */
+ private final long twepoch = 1605283200000L;
+
+ /**
+ * Number of bits for the worker id
+ */
+ private final long workerIdBits = 8L;
+
+ /**
+ * Number of bits reserved for the data-center id
+ */
+ private final long dataCenterIdBits = 5L;
+
+ /**
+ * Maximum supported worker id, 255 here (this shift trick quickly gives the largest
+ * decimal value representable in n binary digits; M << n = M * 2^n)
+ */
+ private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
+
+ /**
+ * Maximum supported data-center id, 31 here
+ */
+ private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
+
+ /**
+ * Number of bits the sequence occupies in the id
+ */
+ private final long sequenceBits = 11L;
+
+ /**
+ * Worker id is shifted left by 11 bits (sequenceBits)
+ */
+ private final long workerIdShift = sequenceBits;
+
+ /**
+ * Data-center id is shifted left by 19 bits (11 + 8)
+ */
+ private final long dataCenterIdShift = sequenceBits + workerIdBits;
+
+ /**
+ * Timestamp is shifted left by 24 bits (11 + 8 + 5)
+ */
+ private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
+
+ /**
+ * Mask for the generated sequence, 2047 here
+ */
+ private final long sequenceMask = -1L ^ (-1L << sequenceBits);
+
+ /**
+ * Worker id (0-255)
+ */
+ private long workerId;
+
+ /**
+ * Data-center id (0-31)
+ */
+ private long dataCenterId;
+
+ /**
+ * Per-millisecond sequence (0-2047)
+ */
+ private long sequence = 0L;
+
+ /**
+ * Timestamp of the last generated id
+ */
+ private long lastTimestamp = -1L;
+
+
+ /**
+ * Maximum tolerated clock rollback, 10 seconds
+ */
+ private static final long rollBackTime = 10000L;
+
+
+ private static SnowflakeId idWorker;
+
+ private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
+
+ static {
+ idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM);
+ }
+
+ //==============================Constructors=====================================
+
+ /**
+ * Constructor: allocates a worker id from ZooKeeper under a distributed lock
+ */
+ private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
+ DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
+ try {
+ lock.lock();
+ int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
+ if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
+ throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
+ }
+ if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
+ throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
+ }
+ this.workerId = tmpWorkerId;
+ this.dataCenterId = dataCenterIdNum;
+ } catch (RuntimeException e) {
+ logger.error("This is not usual error!!!===>>>" + e + "<<<===");
+ }finally {
+ lock.unlock();
+ }
+ }
+
+ // ==============================Methods==========================================
+
+ /**
+ * Get the next id (this method is thread-safe).
+ *
+ * @return the next unique id
+ */
+ private synchronized long nextId() {
+ long timestamp = timeGen();
+ //If the clock moved backwards by less than rollBackTime, wait for it to catch up
+ if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ //If the current time is still earlier than the last timestamp, the clock moved back too far; throw
+ if (timestamp < lastTimestamp) {
+ throw new RuntimeException(
+ String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
+ }
+
+ //Same millisecond as the previous id: advance the in-millisecond sequence
+ if (lastTimestamp == timestamp) {
+ sequence = (sequence + 1) & sequenceMask;
+ //The sequence overflowed within this millisecond
+ if (sequence == 0) {
+ //Block until the next millisecond to get a fresh timestamp
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ }
+ //Timestamp changed: reset the in-millisecond sequence
+ else {
+ sequence = 0L;
+ }
+
+ //Remember the timestamp of this id
+ lastTimestamp = timestamp;
+
+ //Shift the parts and OR them together into a 64-bit id
+ return ((timestamp - twepoch) << timestampLeftShift)
+ | (dataCenterId << dataCenterIdShift)
+ | (workerId << workerIdShift)
+ | sequence;
+ }
+
+ /**
+ * Block until the next millisecond and return a fresh timestamp.
+ *
+ * @param lastTimestamp timestamp of the last generated id
+ * @return the current timestamp
+ */
+ protected long tilNextMillis(long lastTimestamp) {
+ long timestamp = timeGen();
+ while (timestamp <= lastTimestamp) {
+ timestamp = timeGen();
+ }
+ return timestamp;
+ }
+
+ /**
+ * Return the current time in milliseconds.
+ *
+ * @return current time (milliseconds)
+ */
+ protected long timeGen() {
+ return System.currentTimeMillis();
+ }
+
+
+ /**
+ * Static entry point.
+ *
+ * @return the next unique id
+ */
+ public static Long generateId() {
+ return idWorker.nextId();
+ }
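+
+ /*
+ * Minimal usage sketch (illustrative): ids stay globally unique as long as every process
+ * receives a distinct workerId from ZooKeeper at start-up.
+ *
+ * long logId = SnowflakeId.generateId();
+ */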
+
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
new file mode 100644
index 0000000..27daa71
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -0,0 +1,125 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.Map;
+
+
+/**
+ * Utility class that transforms or completes log fields
+ *
+ * @author qidaijie
+ */
+public class TransFormMap {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Parse a log record and complete the missing fields.
+ *
+ * @param jsonMap raw log consumed from the Kafka topic, already parsed into a map
+ * @return the completed log as a JSON string
+ */
+ @SuppressWarnings("unchecked")
+ public static String dealCommonMessage(Map<String, Object> jsonMap) {
+ try {
+ JsonParseUtil.dropJsonField(jsonMap);
+ for (String[] strings : JsonParseUtil.getJobList()) {
+ //value of the source field used by the function
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //key of the field to complete
+ String appendToKeyName = strings[1];
+ //current value of the field to complete
+ Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //name of the transform function to apply
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
+ }
+ return JsonMapper.toJsonString(jsonMap);
+ } catch (RuntimeException e) {
+ logger.error("TransForm logs failed,The exception is :" + e);
+ return null;
+ }
+ }
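+
+ /*
+ * Illustrative walk-through (a sketch using the sample job from JsonParseUtil): for the
+ * entry {"mail_subject", "mail_subject", "decode_of_base64", "mail_subject_charset"} the
+ * loop above reads jsonMap.get("mail_subject"), Base64-decodes it with the charset that
+ * the fourth element resolves to via isJsonValue(), and writes the result back under
+ * "mail_subject" before the map is serialized to JSON.
+ */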
+
+
+ /**
+ * Set of functions that operate on fields according to the schema description.
+ *
+ * @param function name of the transform function to apply
+ * @param jsonMap parsed raw log map
+ * @param appendToKeyName key of the field to complete
+ * @param appendTo current value of the field to complete
+ * @param logValue value of the source field used by the function
+ * @param param value of the extra parameter
+ */
+ private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendTo instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendTo == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ }
+ break;
+ default:
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
new file mode 100644
index 0000000..34cabfa
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -0,0 +1,127 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.Map;
+
+
+/**
+ * Utility class that converts field types and completes log fields
+ *
+ * @author qidaijie
+ */
+public class TransFormTypeMap {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Parse a log record, convert field types and complete the missing fields.
+ *
+ * @param message raw log from the Kafka topic, parsed into a map
+ * @return the completed log as a JSON string
+ */
+ @SuppressWarnings("unchecked")
+ public static String dealCommonMessage(Map<String, Object> message) {
+ try {
+ Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message);
+ for (String[] strings : JsonParseUtil.getJobList()) {
+ //value of the source field used by the function
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //key of the field to complete
+ String appendToKeyName = strings[1];
+ //current value of the field to complete
+ Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //name of the transform function to apply
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
+ }
+ return JsonMapper.toJsonString(jsonMap);
+ } catch (RuntimeException e) {
+ logger.error("TransForm logs failed,The exception is :" + e);
+ return null;
+ }
+ }
+
+
+ /**
+ * Set of functions that operate on fields according to the schema description.
+ *
+ * @param function name of the transform function to apply
+ * @param jsonMap parsed raw log map
+ * @param appendToKeyName key of the field to complete
+ * @param appendToKeyValue current value of the field to complete
+ * @param logValue value of the source field used by the function
+ * @param param value of the extra parameter
+ */
+ private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendToKeyValue instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ //Not implemented in this release plan (TSG-22.01)
+// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendToKeyValue == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ }
+ break;
+ default:
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
new file mode 100644
index 0000000..e3363f9
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -0,0 +1,248 @@
+package com.zdjizhi.utils.general;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.FormatUtils;
+import com.zdjizhi.utils.IpLookupV2;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.app.AppUtils;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * @author qidaijie
+ */
+class TransFunction {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Regex used to validate numbers
+ */
+ private static final Pattern PATTERN = Pattern.compile("[0-9]*");
+
+ /**
+ * IP geolocation lookup helper
+ */
+ private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
+ .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
+ .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
+ .build();
+
+ /**
+ * Return the current timestamp in seconds
+ */
+ static long getCurrentTime() {
+
+ return System.currentTimeMillis() / 1000;
+ }
+
+ /**
+ * CityHash64 hash.
+ * Not implemented in this release plan (TSG-22.01).
+ *
+ * @param data original data
+ * @return hash result
+ */
+ @Deprecated
+ static BigInteger getDecimalHash(long data) {
+ byte[] dataBytes = String.valueOf(data).getBytes();
+ long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
+ String decimalValue = Long.toUnsignedString(hashValue, 10);
+ return new BigInteger(decimalValue);
+ }
+
+ /**
+ * Look up detailed location information for a client IP.
+ *
+ * @param ip client IP
+ * @return detailed location of the IP address
+ */
+ static String getGeoIpDetail(String ip) {
+ try {
+ return ipLookup.cityLookupDetail(ip);
+ } catch (NullPointerException npe) {
+ logger.error("The MMDB file is not loaded or IP is null! " + npe);
+ return "";
+ } catch (RuntimeException e) {
+ logger.error("Get clientIP location error! " + e);
+ return "";
+ }
+ }
+
+ /**
+ * Look up ASN information for an IP.
+ *
+ * @param ip client/server IP
+ * @return ASN
+ */
+ static String getGeoAsn(String ip) {
+ try {
+ return ipLookup.asnLookup(ip);
+ } catch (NullPointerException npe) {
+ logger.error("The MMDB file is not loaded or IP is null! " + npe);
+ return "";
+ } catch (RuntimeException e) {
+ logger.error("Get IP ASN error! " + e);
+ return "";
+ }
+ }
+
+ /**
+ * Look up the country for an IP.
+ *
+ * @param ip server IP
+ * @return country
+ */
+ static String getGeoIpCountry(String ip) {
+ try {
+ return ipLookup.countryLookup(ip);
+ } catch (NullPointerException npe) {
+ logger.error("The MMDB file is not loaded or IP is null! " + npe);
+ return "";
+ } catch (RuntimeException e) {
+ logger.error("Get ServerIP location error! " + e);
+ return "";
+ }
+ }
+
+
+ /**
+ * Radius account completion via HBase.
+ *
+ * @param ip client IP
+ * @return account
+ */
+
+ /**
+ * Complete appName from the cached appId-to-name mapping.
+ *
+ * @param appIds list of app ids
+ * @return appName
+ */
+ @Deprecated
+ static String appMatch(String appIds) {
+ try {
+ String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
+ return AppUtils.getAppName(Integer.parseInt(appId));
+ } catch (NumberFormatException | ClassCastException exception) {
+ logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
+ return "";
+ }
+ }
+
+ /**
+ * Extract the top-level (registrable) domain.
+ *
+ * @param domain original domain
+ * @return top-level domain
+ */
+ static String getTopDomain(String domain) {
+ try {
+ return FormatUtils.getTopPrivateDomain(domain);
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("Parse top-level domain exceptions, exception domain names:" + domain);
+ return "";
+ }
+ }
+
+ /**
+ * Decode a Base64 string with the given charset.
+ *
+ * @param message base64 string
+ * @param charset charset name
+ * @return decoded string
+ */
+ static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset == null) {
+ result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ } else {
+ result = Base64.decodeStr(message, charset.toString());
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("Resolve Base64 exception, exception information:" + e);
+ }
+ return result;
+ }
+
+ /**
+ * Extract a value from JSON with a JsonPath expression.
+ *
+ * @param message json
+ * @param expr JsonPath expression
+ * @return extraction result
+ */
+ static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ if (StringUtil.isNotBlank(expr)) {
+ ArrayList<String> read = JsonPath.parse(message).read(expr);
+ if (read.size() >= 1) {
+ flattenResult = read.get(0);
+ }
+ }
+ } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
+ logger.error("The device label resolution exception or [expr] analytic expression error" + e);
+ }
+ return flattenResult;
+ }
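+
+ /*
+ * Minimal sketch with made-up values: for message = {"labels":[{"name":"dev-1"}]} and
+ * expr = "$.labels[*].name", JsonPath returns ["dev-1"] and the first element, "dev-1",
+ * becomes the flatten result.
+ */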
+
+ /**
+ * Check whether the parameter refers to a log field; if so return that field's value, otherwise return the raw string.
+ *
+ * @param jsonMap in-memory log map
+ * @param param field reference or plain string
+ * @return JSON.Value or String
+ */
+ static Object isJsonValue(Map<String, Object> jsonMap, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ return JsonParseUtil.getValue(jsonMap, param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF function: builds a ternary expression from the log; numeric values are compared as numbers, strings as strings.
+ *
+ * @param jsonMap in-memory log map
+ * @param ifParam condition expression plus the two result branches
+ * @return resultA or resultB or null
+ */
+ static Object condition(Map<String, Object> jsonMap, String ifParam) {
+ Object result = null;
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(jsonMap, norms[0]);
+ Object resultA = isJsonValue(jsonMap, split[1]);
+ Object resultB = isJsonValue(jsonMap, split[2]);
+ if (direction instanceof Number) {
+ result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(norms[1]) ? resultA : resultB;
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ }
+ return result;
+ }
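+
+ /*
+ * Illustrative layout of ifParam (a sketch; the real splitter characters come from
+ * FlowWriteConfig and are only assumed here to be "," and "=" for readability):
+ * "direction=0,uplink_bytes,downlink_bytes"
+ * splits into a condition "direction=0" and two branches; field references are resolved
+ * through isJsonValue(), and the branch matching the condition is returned.
+ */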
+}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..1adb1d1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.utils.http;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Utility class for fetching the schema from the gateway
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Request the gateway via HTTP GET and read the schema.
+ *
+ * @param http gateway url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ StringBuilder entityStringBuilder;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ CloseableHttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(get);
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ int intC;
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
+ }
+ entityStringBuilder.append(c);
+ }
+
+ return entityStringBuilder.toString();
+ }
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (bufferedReader != null) {
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
new file mode 100644
index 0000000..ddb29ed
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -0,0 +1,338 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+
+
+/**
+ * Utility class that parses JSON with FastJson
+ *
+ * @author qidaijie
+ */
+public class JsonParseUtil {
+ private static final Log logger = LogFactory.get();
+ private static Properties propNacos = new Properties();
+
+ /**
+ * List of fields that should be dropped
+ */
+ private static ArrayList<String> dropList = new ArrayList<>();
+
+ /**
+ * In-memory map of field name to Java type, used for type conversion
+ */
+ private static HashMap<String, Class> jsonFieldsMap;
+
+ /**
+ * Job list.
+ * Each element is a four-element string array (field carrying the format annotation, field to complete, function to apply, extra parameter), e.g.:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList;
+
+ static {
+ propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
+ propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
+ propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
+ propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
+ try {
+ ConfigService configService = NacosFactory.createConfigService(propNacos);
+ String dataId = FlowWriteConfig.NACOS_DATA_ID;
+ String group = FlowWriteConfig.NACOS_GROUP;
+ String schema = configService.getConfig(dataId, group, 5000);
+ if (StringUtil.isNotBlank(schema)) {
+ jsonFieldsMap = getFieldsFromSchema(schema);
+ jobList = getJobListFromHttp(schema);
+ }
+ configService.addListener(dataId, group, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configMsg) {
+ if (StringUtil.isNotBlank(configMsg)) {
+ clearCache();
+ jsonFieldsMap = getFieldsFromSchema(configMsg);
+ jobList = getJobListFromHttp(configMsg);
+ }
+ }
+ });
+ } catch (NacosException e) {
+ logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
+ }
+ }
+
+ /**
+ * Map a schema type string to the corresponding Java class.
+ *
+ * @param type schema type name
+ * @return Java class
+ */
+
+ private static Class getClassName(String type) {
+ Class clazz;
+
+ switch (type) {
+ case "int":
+ clazz = Integer.class;
+ break;
+ case "string":
+ clazz = String.class;
+ break;
+ case "long":
+ clazz = long.class;
+ break;
+ case "array":
+ clazz = List.class;
+ break;
+ case "double":
+ clazz = double.class;
+ break;
+ case "float":
+ clazz = float.class;
+ break;
+ case "char":
+ clazz = char.class;
+ break;
+ case "byte":
+ clazz = byte.class;
+ break;
+ case "boolean":
+ clazz = boolean.class;
+ break;
+ case "short":
+ clazz = short.class;
+ break;
+ default:
+ clazz = String.class;
+ }
+ return clazz;
+ }
+
+ /**
+ * Get a property value.
+ *
+ * @param jsonMap raw log map
+ * @param property key
+ * @return property value
+ */
+ public static Object getValue(Map<String, Object> jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
+ return null;
+ }
+ }
+
+ /**
+ * Update a property value.
+ *
+ * @param jsonMap raw log json map
+ * @param property key to update
+ * @param value new value
+ */
+ public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
+ try {
+ jsonMap.put(property, value);
+ } catch (RuntimeException e) {
+ logger.error("赋予实体类错误类型数据", e);
+ }
+ }
+
+ /**
+ * Convert every field to the type declared in the schema.
+ *
+ * @param jsonMap raw log map
+ */
+ public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
+ JsonParseUtil.dropJsonField(jsonMap);
+ HashMap<String, Object> tmpMap = new HashMap<>(192);
+ for (String key : jsonMap.keySet()) {
+ if (jsonFieldsMap.containsKey(key)) {
+ String simpleName = jsonFieldsMap.get(key).getSimpleName();
+ switch (simpleName) {
+ case "String":
+ tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
+ break;
+ case "Integer":
+ tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
+ break;
+ case "long":
+ tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
+ break;
+ case "List":
+ tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
+ break;
+ case "Map":
+ tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
+ break;
+ case "double":
+ tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
+ break;
+ default:
+ tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
+ }
+ }
+ }
+ return tmpMap;
+ }
+
+ public static ArrayList<String[]> getJobList() {
+ return jobList;
+ }
+
+
+ /**
+ * Build the field-to-type map from the gateway schema string.
+ *
+ * @param schema gateway schema
+ * @return map used to build schema-typed objects
+ */
+ private static HashMap<String, Class> getFieldsFromSchema(String schema) {
+ HashMap<String, Class> map = new HashMap<>(16);
+
+ //Get "fields" as an array; each element carries a name, doc and type
+ JSONObject schemaJson = JSON.parseObject(schema);
+ JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+ for (Object field : fields) {
+ String fieldStr = field.toString();
+ if (checkKeepField(fieldStr)) {
+ String name = JsonPath.read(fieldStr, "$.name").toString();
+ String type = JsonPath.read(fieldStr, "$.type").toString();
+ if (type.contains("{")) {
+ type = JsonPath.read(fieldStr, "$.type.type").toString();
+ }
+ //entry used later to build the typed map
+ map.put(name, getClassName(type));
+ } else {
+ //store the field name so dropJsonField can remove it from the log map
+ dropList.add(JsonPath.read(fieldStr, "$.name").toString());
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Decide whether a field should be kept.
+ *
+ * @param message single field json
+ * @return true or false
+ */
+ private static boolean checkKeepField(String message) {
+ boolean isKeepField = true;
+ boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
+ if (isHiveDoc) {
+ boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
+ if (isHiveVi) {
+ String visibility = JsonPath.read(message, "$.doc.visibility").toString();
+ if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
+ isKeepField = false;
+ }
+ }
+ }
+ return isKeepField;
+ }
+
+ /**
+ * Remove the fields that the schema marks as not visible.
+ *
+ * @param jsonMap raw log map
+ */
+ public static void dropJsonField(Map<String, Object> jsonMap) {
+ for (String field : dropList) {
+ jsonMap.remove(field);
+ }
+ }
+
+ /**
+ * Parse the schema and return a job list (sourceField, targetField, function, param).
+ *
+ * @param schema log schema
+ * @return job list
+ */
+ private static ArrayList<String[]> getJobListFromHttp(String schema) {
+ ArrayList<String[]> list = new ArrayList<>();
+
+ //Get "fields" as an array; each element carries a name, doc and type
+ JSONObject schemaJson = JSON.parseObject(schema);
+ JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+ for (Object field : fields) {
+
+ if (JSON.parseObject(field.toString()).containsKey("doc")) {
+ Object doc = JSON.parseObject(field.toString()).get("doc");
+
+ if (JSON.parseObject(doc.toString()).containsKey("format")) {
+ String name = JSON.parseObject(field.toString()).get("name").toString();
+ Object format = JSON.parseObject(doc.toString()).get("format");
+ JSONObject formatObject = JSON.parseObject(format.toString());
+
+ String functions = formatObject.get("functions").toString();
+ String appendTo = null;
+ String params = null;
+
+ if (formatObject.containsKey("appendTo")) {
+ appendTo = formatObject.get("appendTo").toString();
+ }
+
+ if (formatObject.containsKey("param")) {
+ params = formatObject.get("param").toString();
+ }
+
+
+ if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
+ String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
+
+ for (int i = 0; i < functionArray.length; i++) {
+ list.add(new String[]{name, appendToArray[i], functionArray[i], null});
+ }
+
+ } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
+ String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
+
+ for (int i = 0; i < functionArray.length; i++) {
+ list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
+
+ }
+ } else {
+ list.add(new String[]{name, name, functions, params});
+ }
+
+ }
+ }
+
+ }
+ return list;
+ }
+
+ /**
+ * Clear the caches when the configuration changes so they can be rebuilt
+ */
+ private static void clearCache() {
+ jobList.clear();
+ jsonFieldsMap.clear();
+ dropList.clear();
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
new file mode 100644
index 0000000..0cf16ff
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
@@ -0,0 +1,121 @@
+package com.zdjizhi.utils.json;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.exception.FlowWriteException;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.json
+ * @Description:
+ * @date 2021/7/12 17:34
+ */
+public class JsonTypeUtil {
+ /**
+ * String type check-and-convert method
+ *
+ * @param value json value
+ * @return String value
+ */
+ static String checkString(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * Map type check-and-convert method.
+ *
+ * @param value json value
+ * @return Map value
+ */
+ static Map checkObject(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return (Map) value;
+ }
+
+ throw new FlowWriteException("can not cast to map, value : " + value);
+ }
+
+ /**
+ * List type check-and-convert method
+ *
+ * @param value json value
+ * @return List value
+ */
+ static List checkArray(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof List) {
+ return (List) value;
+ }
+
+ throw new FlowWriteException("can not cast to List, value : " + value);
+ }
+
+ /**
+ * long type check-and-convert method; returns 0 when the value is null
+ *
+ * @param value json value
+ * @return Long value
+ */
+ static long checkLongValue(Object value) {
+ Long longVal = TypeUtils.castToLong(value);
+
+ if (longVal == null) {
+ return 0L;
+ }
+
+ return longVal;
+ }
+
+ /**
+ * Double type check-and-convert method
+ *
+ * @param value json value
+ * @return Double value
+ */
+ static Double checkDouble(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToDouble(value);
+ }
+
+
+ /**
+ * int type check-and-convert method; returns 0 when the value is null
+ *
+ * @param value json value
+ * @return int value
+ */
+ static int getIntValue(Object value) {
+
+ Integer intVal = TypeUtils.castToInt(value);
+ if (intVal == null) {
+ return 0;
+ }
+ return intVal;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
new file mode 100644
index 0000000..b13627f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
@@ -0,0 +1,171 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.FlowWriteException;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.json
+ * @Description:
+ * @date 2021/7/12 18:20
+ */
+public class TypeUtils {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Type normalization used by the IF function.
+ *
+ * @param value json value
+ * @return String, Integer, Long, 1/0 for Boolean, or null
+ */
+ public static Object castToIfFunction(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof String) {
+ return value.toString();
+ }
+
+ if (value instanceof Integer) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof Long) {
+ return ((Number) value).longValue();
+ }
+
+// if (value instanceof Map) {
+// return (Map) value;
+// }
+//
+// if (value instanceof List) {
+// return Collections.singletonList(value.toString());
+// }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new FlowWriteException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Integer conversion method
+ *
+ * @param value json value
+ * @return Integer value or null
+ */
+ static Integer castToInt(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //Turn values like "10,20" into 10 by keeping the first element
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Integer.parseInt(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Integer Error,The error Str is:" + strVal);
+ }
+ }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new FlowWriteException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Double conversion method
+ *
+ * @param value json value
+ * @return double value or null
+ */
+ static Double castToDouble(Object value) {
+
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //Turn values like "10,20" into 10 by keeping the first element
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Double.parseDouble(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Double Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new FlowWriteException("can not cast to double, value : " + value);
+ }
+
+ /**
+ * Long conversion method
+ *
+ * @param value json value
+ * @return (Long)value or null
+ */
+ static Long castToLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //Turn values like "10,20" into 10 by keeping the first element
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Long.parseLong(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Long Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new FlowWriteException("can not cast to long, value : " + value);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..ce059f8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/6 10:37
+ */
+class CertUtils {
+ /**
+ * Kafka SASL authentication port
+ */
+ private static final String SASL_PORT = "9094";
+
+ /**
+ * Kafka SSL认证端口
+ */
+ private static final String SSL_PORT = "9095";
+
+ /**
+ * Choose the authentication mechanism based on the port in the connection string.
+ *
+ * @param servers kafka bootstrap servers
+ * @param properties kafka connection properties
+ */
+ static void chooseCert(String servers, Properties properties) {
+ if (servers.contains(SASL_PORT)) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + FlowWriteConfig.KAFKA_SASL_JAAS_USER + " password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";");
+ } else if (servers.contains(SSL_PORT)) {
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+ }
+
+ }
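+
+ /*
+ * Illustrative behaviour (sketch): "broker1:9094" enables SASL_PLAINTEXT with the
+ * configured JAAS user, "broker1:9095" enables SSL with the keystore/truststore under
+ * TOOLS_LIBRARY, and any other port leaves the properties unchanged (plaintext).
+ */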
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..4b8c8f0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -0,0 +1,75 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.pojo.DbLogEntity;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/8 13:54
+ */
+public class KafkaConsumer {
+ private static Properties createConsumerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
+ properties.put("group.id", FlowWriteConfig.GROUP_ID);
+ properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
+ properties.put("partition.discovery.interval.ms", "10000");
+ CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
+
+ return properties;
+ }
+
+ /**
+ * Custom deserialization of Kafka records, intended to also attach the Kafka record timestamp.
+ *
+ * @return kafka logs -> map
+ */
+ @SuppressWarnings("unchecked")
+ public static FlinkKafkaConsumer<Map<String,Object>> myDeserializationConsumer(String topic) {
+ FlinkKafkaConsumer<Map<String,Object>> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new TimestampDeserializationSchema(), createConsumerConfig());
+
+ //Commit offsets back to Kafka together with checkpoints
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+
+ //Start consuming from the consumer group's current offsets
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+
+ /**
+ * Deserialize Kafka records with the built-in string schema.
+ *
+ * @return kafka logs
+ */
+ public static FlinkKafkaConsumer<String> flinkConsumer() {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
+ new SimpleStringSchema(), createConsumerConfig());
+
+ //Commit offsets back to Kafka together with checkpoints
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+
+ //Start consuming from the consumer group's current offsets
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
new file mode 100644
index 0000000..28ecff9
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/8 14:04
+ */
+public class KafkaProducer {
+
+ private static Properties createProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS);
+ properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
+ properties.put("retries", FlowWriteConfig.RETRIES);
+ properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
+ properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
+ properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties);
+
+ return properties;
+ }
+
+
+ public static FlinkKafkaProducer<String> getKafkaProducer() {
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
+ FlowWriteConfig.SINK_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createProducerConfig(),
+ //an empty partitioner makes the sink connect to all partitions and write round-robin;
+ Optional.empty());
+
+ //let the producer log failures instead of throwing them
+ kafkaProducer.setLogFailuresOnly(true);
+
+ return kafkaProducer;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
new file mode 100644
index 0000000..7b18ab5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.utils.kafka;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.enums.LogMetadata;
+import com.zdjizhi.pojo.DbLogEntity;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2022/3/8 9:42
+ */
+public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
+ private static final Log logger = LogFactory.get();
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(Map.class);
+ }
+
+ @Override
+ public boolean isEndOfStream(Object nextElement) {
+ return false;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Map<String,Object> deserialize(ConsumerRecord record) throws Exception {
+ if (record != null) {
+ try {
+// long timestamp = record.timestamp() / 1000;
+ String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING);
+ Map<String, Object> data = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
+// json.put("common_ingestion_time", timestamp);
+ return data;
+ } catch (RuntimeException e) {
+ logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
+ }
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
new file mode 100644
index 0000000..d793628
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -0,0 +1,75 @@
+package com.zdjizhi.utils.system;
+
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class FlowWriteConfigurations {
+
+ private static Properties propKafka = new Properties();
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else if (type == 1) {
+ return propKafka.getProperty(key);
+ } else {
+ return null;
+ }
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propKafka.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
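+
+ /*
+ * Usage sketch (the key name is illustrative): type 0 reads service_flow_config.properties
+ * and type 1 reads default_config.properties, e.g.
+ * Integer parallelism = FlowWriteConfigurations.getIntProperty(0, "parallelism");
+ */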
+
+ static {
+ try {
+ propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | RuntimeException e) {
+ propKafka = null;
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
new file mode 100644
index 0000000..2afab03
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
@@ -0,0 +1,190 @@
+package com.zdjizhi.utils.zookeeper;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * @author qidaijie
+ */
+public class DistributedLock implements Lock, Watcher {
+ private static final Log logger = LogFactory.get();
+
+ private ZooKeeper zk = null;
+ /**
+ * Root node
+ */
+ private final String ROOT_LOCK = "/locks";
+ /**
+ * Resource being competed for
+ */
+ private String lockName;
+ /**
+ * The previous lock node this one waits on
+ */
+ private String waitLock;
+ /**
+ * Current lock node
+ */
+ private String currentLock;
+ /**
+ * Latch used to wait for the previous node to disappear
+ */
+ private CountDownLatch countDownLatch;
+
+ private int sessionTimeout = 2000;
+
+ private List<Exception> exceptionList = new ArrayList<Exception>();
+
+ /**
+ * Configure the distributed lock.
+ *
+ * @param config zookeeper connection url
+ * @param lockName resource to compete for
+ */
+ public DistributedLock(String config, String lockName) {
+ this.lockName = lockName;
+ try {
+ // connect to zookeeper
+ zk = new ZooKeeper(config, sessionTimeout, this);
+ Stat stat = zk.exists(ROOT_LOCK, false);
+ if (stat == null) {
+ // create the root node if it does not exist
+ zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (IOException | InterruptedException | KeeperException e) {
+ logger.error("Failed to connect to ZooKeeper or create the root lock node: " + e);
+ }
+ }
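+
+ /*
+ * Usage sketch (the address is illustrative; SnowflakeId uses this class the same way):
+ * DistributedLock lock = new DistributedLock("zk1:2181", "disLocks1");
+ * lock.lock();
+ * try { ... critical section ... } finally { lock.unlock(); }
+ */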
+
+ // node watcher
+ @Override
+ public void process(WatchedEvent event) {
+ if (this.countDownLatch != null) {
+ this.countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void lock() {
+ if (exceptionList.size() > 0) {
+ throw new LockException(exceptionList.get(0));
+ }
+ try {
+ if (this.tryLock()) {
+ logger.info(Thread.currentThread().getName() + " " + lockName + " acquired the lock");
+ } else {
+ // wait for the previous lock holder
+ waitForLock(waitLock, sessionTimeout);
+ }
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("Failed to acquire the lock: " + e);
+ }
+ }
+
+ @Override
+ public boolean tryLock() {
+ try {
+ String splitStr = "_lock_";
+ if (lockName.contains(splitStr)) {
+ throw new LockException("锁名有误");
+ }
+ // create an ephemeral sequential node
+ currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ // list all child nodes
+ List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
+ // collect all lock nodes that belong to this lockName
+ List<String> lockObjects = new ArrayList<String>();
+ for (String node : subNodes) {
+ String tmpNode = node.split(splitStr)[0];
+ if (tmpNode.equals(lockName)) {
+ lockObjects.add(node);
+ }
+ }
+ Collections.sort(lockObjects);
+ // if the current node is the smallest one, the lock is acquired
+ if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
+ return true;
+ }
+ // otherwise find the node immediately before ours and wait on it
+ String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
+ waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("获取锁过程异常" + e);
+ }
+ return false;
+ }
+
+
+ @Override
+ public boolean tryLock(long timeout, TimeUnit unit) {
+ try {
+ if (this.tryLock()) {
+ return true;
+ }
+ return waitForLock(waitLock, timeout);
+ } catch (KeeperException | InterruptedException | RuntimeException e) {
+ logger.error("判断是否锁定异常" + e);
+ }
+ return false;
+ }
+
+ // wait for the lock
+ private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
+ Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
+
+ if (stat != null) {
+ this.countDownLatch = new CountDownLatch(1);
+ // wait on the latch; when the previous node disappears, process() counts down and the lock can be taken
+ this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
+ this.countDownLatch = null;
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ zk.delete(currentLock, -1);
+ currentLock = null;
+ zk.close();
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("关闭锁异常" + e);
+ }
+ }
+
+ @Override
+ public Condition newCondition() {
+ return null;
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ this.lock();
+ }
+
+
+ public class LockException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LockException(String e) {
+ super(e);
+ }
+
+ public LockException(Exception e) {
+ super(e);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
new file mode 100644
index 0000000..9efbd46
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
@@ -0,0 +1,140 @@
+package com.zdjizhi.utils.zookeeper;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.zookeeper
+ * @Description:
+ * @date 2020/11/14 11:28
+ */
+public class ZookeeperUtils implements Watcher {
+ private static final Log logger = LogFactory.get();
+ private static final int ID_MAX = 255;
+
+ private ZooKeeper zookeeper;
+
+ private static final int SESSION_TIME_OUT = 20000;
+
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected) {
+ countDownLatch.countDown();
+ }
+ }
+
+
+ /**
+ * Read the node's counter as the worker id and store the incremented value (wrapping after 255).
+ *
+ * @param path node path
+ * @param zookeeperIp zookeeper address
+ * @return worker id
+ */
+ public int modifyNode(String path, String zookeeperIp) {
+ createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
+ int workerId = 0;
+ try {
+ connectZookeeper(zookeeperIp);
+ Stat stat = zookeeper.exists(path, true);
+ workerId = Integer.parseInt(getNodeDate(path));
+ if (workerId > ID_MAX) {
+ workerId = 0;
+ zookeeper.setData(path, "1".getBytes(), stat.getVersion());
+ } else {
+ String result = String.valueOf(workerId + 1);
+ if (stat != null) {
+ zookeeper.setData(path, result.getBytes(), stat.getVersion());
+ } else {
+ logger.error("Node does not exist!,Can't modify");
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("modify error Can't modify," + e);
+ } finally {
+ closeConn();
+ }
+ logger.warn("workerID is:" + workerId);
+ return workerId;
+ }
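+
+ /*
+ * Illustrative sequence (sketch): if "/Snowflake/worker0" currently holds "41", modifyNode
+ * returns 41 and stores "42"; once the stored value passes 255 it wraps back to keep worker
+ * ids inside the 8-bit range used by SnowflakeId.
+ */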
+
+ /**
+ * Connect to zookeeper.
+ *
+ * @param host address
+ */
+ public void connectZookeeper(String host) {
+ try {
+ zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
+ countDownLatch.await();
+ } catch (IOException | InterruptedException e) {
+ logger.error("Connection to the Zookeeper Exception! message:" + e);
+ }
+ }
+
+ /**
+ * Close the connection
+ */
+ public void closeConn() {
+ try {
+ if (zookeeper != null) {
+ zookeeper.close();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Close the Zookeeper connection Exception! message:" + e);
+ }
+ }
+
+ /**
+ * Get the node content.
+ *
+ * @param path node path
+ * @return content, or null on error
+ */
+ public String getNodeDate(String path) {
+ String result = null;
+ Stat stat = new Stat();
+ try {
+ byte[] resByte = zookeeper.getData(path, true, stat);
+
+ result = StrUtil.str(resByte, "UTF-8");
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("Get node information exception" + e);
+ }
+ return result;
+ }
+
+ /**
+ * @param path path of the node to create
+ * @param date byte[] payload stored in the node
+ * @param acls access control policy
+ * @param zookeeperIp zookeeper address
+ */
+ public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
+ try {
+ connectZookeeper(zookeeperIp);
+ Stat exists = zookeeper.exists(path, true);
+ if (exists == null) {
+ Stat existsSnowflakeId = zookeeper.exists("/Snowflake", true);
+ if (existsSnowflakeId == null) {
+ zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
+ }
+ zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
+ } else {
+ logger.warn("Node already exists ! Don't need to create");
+ }
+ } catch (KeeperException | InterruptedException e) {
+ logger.error(e);
+ } finally {
+ closeConn();
+ }
+ }
+}
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..facffc7
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender settings
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Use a relative path and verify that the log file ends up under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+# MyBatis configuration; com.nis.web.dao is the package that holds the MyBatis mapper interfaces
+log4j.logger.com.nis.web.dao=info
+# BoneCP data source configuration
+log4j.category.com.jolbox=info,console
+
+
diff --git a/src/main/logback.xml b/src/main/logback.xml
new file mode 100644
index 0000000..59095f6
--- /dev/null
+++ b/src/main/logback.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <!-- Output pattern: %date is the date, %thread the thread name, %-5level pads the level to 5 characters, %msg the log message, %n a newline -->
+ <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
+ <!-- Location of the log files; do not use a relative path -->
+ <property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" />
+
+ <!-- Console appender -->
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <!-- Print logs with the LOG_PATTERN defined above -->
+ <pattern>${LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
+
+ <!-- One log file per day, kept for 30 days; the rolling policy splits the files -->
+ <appender name="FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE_PATH}</fileNamePattern>
+ <!-- keep 30 days' worth of history -->
+ <maxHistory>30</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <!-- Maximum size of a single log file -->
+ <maxFileSize>20MB</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+
+ <encoder>
+ <pattern>${LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
+ <!-- Default log level for the project's own packages -->
+ <logger name="com.example.demo" level="INFO" />
+
+ <!-- Root log level; common levels from high to low are ERROR, WARN, INFO, DEBUG. -->
+ <root level="INFO">
+ <appender-ref ref="CONSOLE" />
+ <appender-ref ref="FILE" /><!--对应appender name="FILE"。 -->
+ </root>
+</configuration> \ No newline at end of file