author    zhanghongqing <[email protected]>  2022-07-11 10:05:14 +0800
committer zhanghongqing <[email protected]>  2022-07-11 10:05:14 +0800
commit    c1b70a6da06a7a55123b7fb904e421b59c230a34 (patch)
tree      4c846d8c4e22cc7db7293a91cd1733be5ec77744 /src/main/java
parent    e9c92fb2866bff0cc0457dd3a0a5d87fc0bc2fb6 (diff)
Add batched ingestion operations and load-balanced ClickHouse calls
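
The load-balancing half of this change is only hinted at in the diff (a commented-out
BalancedClickhouseDataSource in ClickhouseSingleSink below). A minimal sketch of that
connection path, assuming the ru.yandex.clickhouse 0.2.x driver the sinks already use;
hosts and credentials here are illustrative, not taken from the commit:

    import java.sql.Connection;
    import java.util.Properties;
    import ru.yandex.clickhouse.BalancedClickhouseDataSource;

    public class CkLoadBalanceSketch {
        public static void main(String[] args) throws Exception {
            // Comma-separated host list; the driver spreads getConnection()
            // calls over the endpoints it currently considers reachable.
            String url = "jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default";
            Properties props = new Properties();
            props.setProperty("user", "default");
            props.setProperty("password", "***");
            BalancedClickhouseDataSource ds =
                    new BalancedClickhouseDataSource(url, props).actualize(); // ping once, drop dead hosts
            try (Connection conn = ds.getConnection()) {
                System.out.println(conn.getMetaData().getURL());
            }
        }
    }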
Diffstat (limited to 'src/main/java')
-rw-r--r--  src/main/java/com/zdjizhi/common/ArangoDelayProcess.java        65
-rw-r--r--  src/main/java/com/zdjizhi/common/CKDelayProcess.java            65
-rw-r--r--  src/main/java/com/zdjizhi/common/ConnKeysSelector.java          21
-rw-r--r--  src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java      23
-rw-r--r--  src/main/java/com/zdjizhi/common/DnsKeysSelector.java           19
-rw-r--r--  src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java       19
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java           11
-rw-r--r--  src/main/java/com/zdjizhi/common/KeysSelector.java               1
-rw-r--r--  src/main/java/com/zdjizhi/common/ListWindow.java                12
-rw-r--r--  src/main/java/com/zdjizhi/common/SketchKeysSelector.java        19
-rw-r--r--  src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java     19
-rw-r--r--  src/main/java/com/zdjizhi/common/TopMetricProcessV2.java        65
-rw-r--r--  src/main/java/com/zdjizhi/enums/DnsType.java                    40
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java          118
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java          35
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java      43
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsMapFunction.java               79
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsProcessFunction.java           85
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java   58
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java      72
-rw-r--r--  src/main/java/com/zdjizhi/pojo/DbLogEntity.java                 47
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java    69
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKSink.java                 165
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java          151
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java   41
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSingleSink.java   124
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java          81
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java          71
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java             39
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java            14
30 files changed, 857 insertions, 814 deletions
diff --git a/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java b/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java
new file mode 100644
index 0000000..d39e6c5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java
@@ -0,0 +1,65 @@
+package com.zdjizhi.common;
+
+import com.arangodb.entity.BaseDocument;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class ArangoDelayProcess extends ProcessFunction<BaseDocument, List<BaseDocument>> {
+
+ private ValueState<Long> currentTimer;
+ private ListState<BaseDocument> itemState;
+ private String stateName;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
+ ListStateDescriptor<BaseDocument> itemViewStateDesc = new ListStateDescriptor<>(getStateName() + "_state", BaseDocument.class);
+ itemState = getRuntimeContext().getListState(itemViewStateDesc);
+ }
+
+ @Override
+ public void processElement(BaseDocument value, Context context, Collector<List<BaseDocument>> collector) throws Exception {
+ // If no timer is registered for this key yet, register one SINK_BATCH_TIME_OUT seconds after the current event time
+ Long curTimeStamp = currentTimer.value();
+ if (curTimeStamp == null || curTimeStamp == 0) {
+ long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
+ context.timerService().registerEventTimeTimer(onTimer);
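+ // Note: this is an event-time timer, so onTimer() fires only once the watermark passes onTimer; flush latency therefore follows watermark progress, not wall-clock time.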
+ currentTimer.update(onTimer);
+ }
+ itemState.add(value);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<BaseDocument>> out) throws Exception {
+ Spliterator<BaseDocument> spliterator = itemState.get().spliterator();
+ List<BaseDocument> collect = StreamSupport.stream(spliterator, false)
+ .collect(Collectors.toList());
+ out.collect(collect);
+ currentTimer.clear();
+ itemState.clear();
+ }
+
+ public ArangoDelayProcess(String stateName) {
+ this.stateName = stateName;
+ }
+
+ public String getStateName() {
+ return stateName;
+ }
+
+ public void setStateName(String stateName) {
+ this.stateName = stateName;
+ }
+}
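ArangoDelayProcess and its twin CKDelayProcess below are the commit's micro-batching
building block: elements are buffered per key in ListState and flushed as a single List
when the one-shot event-time timer fires SINK_BATCH_TIME_OUT seconds after the first
element. A minimal wiring sketch under that reading (collection name illustrative; the
real wiring is in LogFlowWriteTopology further down):

    import com.arangodb.entity.BaseDocument;
    import com.zdjizhi.common.ArangoDelayProcess;
    import com.zdjizhi.utils.arangodb.ArangoDBSink;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public final class ArangoBatchWiring {
        public static void wire(DataStream<BaseDocument> docs, String collection) {
            docs.keyBy(BaseDocument::getKey)
                .process(new ArangoDelayProcess(collection)) // emits List<BaseDocument> batches
                .addSink(new ArangoDBSink(collection));      // sink receives whole batches
        }
    }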
diff --git a/src/main/java/com/zdjizhi/common/CKDelayProcess.java b/src/main/java/com/zdjizhi/common/CKDelayProcess.java
new file mode 100644
index 0000000..35ec90e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CKDelayProcess.java
@@ -0,0 +1,65 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class CKDelayProcess extends ProcessFunction<Map<String, Object>, List<Map<String, Object>>> {
+
+
+ private ValueState<Long> currentTimer;
+ private ListState<Map<String, Object>> itemState;
+ private String stateName;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
+ ListStateDescriptor<Map<String, Object>> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class);
+ itemState = getRuntimeContext().getListState(itemViewStateDesc);
+ }
+
+ @Override
+ public void processElement(Map<String, Object> value, Context context, Collector<List<Map<String, Object>>> collector) throws Exception {
+ // If no timer is registered for this key yet, register one SINK_BATCH_TIME_OUT seconds after the current event time
+ Long curTimeStamp = currentTimer.value();
+ if (curTimeStamp == null || curTimeStamp == 0) {
+ long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
+ context.timerService().registerEventTimeTimer(onTimer);
+ currentTimer.update(onTimer);
+ }
+ itemState.add(value);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Map<String, Object>>> out) throws Exception {
+ Spliterator<Map<String, Object>> spliterator = itemState.get().spliterator();
+ List<Map<String, Object>> collect = StreamSupport.stream(spliterator, false)
+ .collect(Collectors.toList());
+ out.collect(collect);
+ currentTimer.clear();
+ itemState.clear();
+ }
+
+ public CKDelayProcess(String stateName) {
+ this.stateName = stateName;
+ }
+
+ public String getStateName() {
+ return stateName;
+ }
+
+ public void setStateName(String stateName) {
+ this.stateName = stateName;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/ConnKeysSelector.java b/src/main/java/com/zdjizhi/common/ConnKeysSelector.java
new file mode 100644
index 0000000..6012626
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ConnKeysSelector.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+@Deprecated
+public class ConnKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+ @Override
+ public String getKey(Map<String,Object> log) throws Exception {
+ return String.valueOf(log.get("conn_start_time"));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
new file mode 100644
index 0000000..5aa08c5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.common;
+
+import cn.hutool.core.util.StrUtil;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class DnsGraphKeysSelector implements KeySelector<Map<String, Object>, Tuple3<String, String, String>> {
+
+ @Override
+ public Tuple3<String, String, String> getKey(Map<String, Object> log) throws Exception {
+
+ return Tuple3.of(StrUtil.toString(log.get("record_type")),
+ StrUtil.toString(log.get("qname")),
+ StrUtil.toString(log.get("record")));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/DnsKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
deleted file mode 100644
index 101597c..0000000
--- a/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class DnsKeysSelector implements KeySelector<Map<String, Object>, String> {
-
- @Override
- public String getKey(Map<String, Object> log) throws Exception {
-
- return String.valueOf(log.get("dns_qname"));
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java
new file mode 100644
index 0000000..54af74c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class DnsTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+ @Override
+ public String getKey(Map<String,Object> log) throws Exception {
+ return String.valueOf(log.get("capture_time"));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 34674f4..57f09a0 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -2,7 +2,6 @@ package com.zdjizhi.common;
import com.zdjizhi.utils.system.FlowWriteConfigurations;
-import org.apache.flink.configuration.ConfigUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -123,6 +122,8 @@ public class FlowWriteConfig {
public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0, "ck.username");
public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0, "ck.pin");
public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0, "ck.database");
+ public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
+ public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");
public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
@@ -139,6 +140,14 @@ public class FlowWriteConfig {
public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
+ public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.visit.ip2ip");
+ public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.cname.domain2domain");
+ public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.mx.domain2domain");
+ public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.resolve.domain2ip");
+ public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
+
+
+
public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
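
For reference, a sketch of the matching entries in the job's properties file (the key
names are the literals read above; the values are illustrative only):

    ck.connection.timeout=10000
    ck.socket.timeout=600000
    sink.arango.table.r.visit.ip2ip=r_visit_ip2ip
    sink.arango.table.r.cname.domain2domain=r_cname_domain2domain
    sink.arango.table.r.mx.domain2domain=r_mx_domain2domain
    sink.arango.table.r.resolve.domain2ip=r_resolve_domain2ip
    sink.arango.table.r.nx.domain2domain=r_nx_domain2domain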
diff --git a/src/main/java/com/zdjizhi/common/KeysSelector.java b/src/main/java/com/zdjizhi/common/KeysSelector.java
index a4d616c..bfa2083 100644
--- a/src/main/java/com/zdjizhi/common/KeysSelector.java
+++ b/src/main/java/com/zdjizhi/common/KeysSelector.java
@@ -11,6 +11,7 @@ import java.util.Map;
* @author: zhq
* @create: 2022-07-05
**/
+@Deprecated
public class KeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
@Override
diff --git a/src/main/java/com/zdjizhi/common/ListWindow.java b/src/main/java/com/zdjizhi/common/ListWindow.java
new file mode 100644
index 0000000..c5f6161
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ListWindow.java
@@ -0,0 +1,12 @@
+package com.zdjizhi.common;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-10
+ **/
+public class ListWindow {
+
+
+
+}
diff --git a/src/main/java/com/zdjizhi/common/SketchKeysSelector.java b/src/main/java/com/zdjizhi/common/SketchKeysSelector.java
new file mode 100644
index 0000000..373a557
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/SketchKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class SketchKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+ @Override
+ public String getKey(Map<String,Object> log) throws Exception {
+ return String.valueOf(log.get("sketch_start_time"));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java b/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java
new file mode 100644
index 0000000..37ad5d5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class StartTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+ @Override
+ public String getKey(Map<String,Object> log) throws Exception {
+ return String.valueOf(log.get("start_time"));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
deleted file mode 100644
index 46d308d..0000000
--- a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeSet;
-
-public class TopMetricProcessV2 extends ProcessFunction<Map<String,Object>, Collector<Map<String,Object>>> {
-
-
- private ValueState<Long> currentTimer;
- private ListState<Map<String,Object>> itemState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("_timer", Types.LONG));
- ListStateDescriptor<Map<String,Object>> itemViewStateDesc = new ListStateDescriptor("_state", Map.class);
- itemState = getRuntimeContext().getListState(itemViewStateDesc);
- }
-
- @Override
- public void processElement(Map<String,Object> value, Context context, Collector<Collector<Map<String,Object>>> collector) throws Exception {
- // If no timer is registered yet, register a new one
- Long curTimeStamp = currentTimer.value();
- if (curTimeStamp == null || curTimeStamp == 0) {
- long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
- context.timerService().registerEventTimeTimer(onTimer);
- currentTimer.update(onTimer);
- }
- itemState.add(value);
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<Collector<Map<String, Object>>> out) throws Exception {
- super.onTimer(timestamp, ctx, out);
-
- Iterator<Map<String,Object>> iterator = itemState.get().iterator();
- if(iterator.hasNext()){
- out.collect((Collector<Map<String, Object>>) iterator.next());
- }
-// if (baseLogs.size() > FlowWriteConfig.SINK_BATCH) {
-// Map last = baseLogs.last();
-// if (Double.compare(map.get(orderBy).doubleValue(), last.get(orderBy).doubleValue()) > 0) {
-// baseLogs.pollLast();
-// baseLogs.add(map);
-// }
-// } else {
-// baseLogs.add(map);
-// }
-// }
- currentTimer.clear();
- itemState.clear();
-
-
- }
-}
diff --git a/src/main/java/com/zdjizhi/enums/DnsType.java b/src/main/java/com/zdjizhi/enums/DnsType.java
index bc5805e..2d8c985 100644
--- a/src/main/java/com/zdjizhi/enums/DnsType.java
+++ b/src/main/java/com/zdjizhi/enums/DnsType.java
@@ -1,16 +1,42 @@
package com.zdjizhi.enums;
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
/**
- * @description:
- * @author: zhq
- * @create: 2022-07-06
+ * @author zhq
+ * @description
+ * @create 2022-07-08
**/
public enum DnsType {
+ // DNS record type, its wire code, and the sink table it is routed to
+ A("a", " 0x0001", R_RESOLVE_DOMAIN2IP),
+ AAAA("aaaa", " 0x001c", R_RESOLVE_DOMAIN2IP),
+ CNAME("cname", " 0x0005", R_CNAME_DOMAIN2DOMAIN),
+ MX("mx", " 0x000f", R_MX_DOMAIN2DOMAIN),
+ NS("ns", " 0x0002", R_NX_DOMAIN2DOMAIN);
+
+ private String type;
+ private String code;
+ private String sink;
+
+ DnsType() {
+ }
+
+ DnsType(String type, String code, String table) {
+ this.type = type;
+ this.code = code;
+ this.sink = table;
+ }
- /*
- * dns record types
- * */
+ public String getType() {
+ return type;
+ }
- a, aaaa, cname, mx, ns;
+ public String getCode() {
+ return code;
+ }
+ public String getSink() {
+ return sink;
+ }
}
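Note that every code literal above carries a leading space (" 0x0001", " 0x001c", ...);
DnsMapFunction below compares these verbatim against the res_type field, so either the
upstream res_type values carry the same leading space or those branches never match -
worth verifying. A hypothetical lookup helper (not part of the commit) that resolves a
constant from a raw res_type value:

    // Hypothetical helper, for illustration only.
    public static DnsType fromCode(String resType) {
        for (DnsType t : DnsType.values()) {
            if (t.getCode().equals(resType)) {
                return t;
            }
        }
        return null; // unknown record type
    }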
diff --git a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java
deleted file mode 100644
index 5e852e0..0000000
--- a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package com.zdjizhi.etl;
-
-import cn.hutool.core.convert.Convert;
-import cn.hutool.core.util.StrUtil;
-import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.pojo.DbLogEntity;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
-
-
-/**
- * @author 94976
- */
-public class DnsFlatMapFunction implements FlatMapFunction<DbLogEntity, DbLogEntity> {
-
- private static final Logger logger = LoggerFactory.getLogger(DnsFlatMapFunction.class);
-
- public void process(Iterable<DbLogEntity> elements, Collector<List<DbLogEntity>> out) {
- List<DbLogEntity> middleResult = getMiddleResult(elements);
- try {
- if (middleResult != null) {
- out.collect(middleResult);
- logger.debug("获取中间聚合结果:{}", middleResult.toString());
- }
- } catch (Exception e) {
- logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
- }
- }
-
- /**
- * Split dns_record
- * Five types: a/aaaa/cname/mx/ns
- * @param elements
- * @return
- */
- private List<DbLogEntity> getMiddleResult(Iterable<DbLogEntity> elements) {
- long startTime = System.currentTimeMillis() / 1000;
- long endTime = System.currentTimeMillis() / 1000;
- String tableName = "";
- String dnsQname = "";
- try {
- Map<String, Long> distinctA = new HashMap<>();
- Map<String, Long> distinctAAAA = new HashMap<>();
- Map<String, Long> distinctCname = new HashMap<>();
- Map<String, Long> distinctNs = new HashMap<>();
- Map<String, Long> distinctMx = new HashMap<>();
- for (DbLogEntity log : elements) {
- tableName = log.getTableName();
- List<String> dnsA = splitDns(log, "dns_a");
- List<String> dnsAAAA = splitDns(log, "dns_aaaa");
- List<String> dnsCname = splitDns(log, "dns_cname");
- List<String> dnsNs = splitDns(log, "dns_ns");
- List<String> dnsMx = splitDns(log, "dns_mx");
-
- dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
- dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
- dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
- dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
- dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
-
- long connStartTimetime = Convert.toLong(log.getData().get("capure_time_s"));
- startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
- endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
- dnsQname = StrUtil.toString(log.getData().get("dns_qname"));
- }
- DbLogEntity dbLogEntity = new DbLogEntity();
- dbLogEntity.setTableName(tableName);
- List<DbLogEntity> result = new ArrayList<>();
- result.addAll(getNewDns(startTime, endTime, dnsQname, distinctA, DnsType.a.toString(), dbLogEntity));
- result.addAll(getNewDns(startTime, endTime, dnsQname, distinctAAAA, DnsType.aaaa.toString(), dbLogEntity));
- result.addAll(getNewDns(startTime, endTime, dnsQname, distinctCname, DnsType.cname.toString(), dbLogEntity));
- result.addAll(getNewDns(startTime, endTime, dnsQname, distinctNs, DnsType.ns.toString(), dbLogEntity));
- result.addAll(getNewDns(startTime, endTime, dnsQname, distinctMx, DnsType.mx.toString(), dbLogEntity));
- return result;
-
- } catch (Exception e) {
- logger.error("聚合中间结果集失败 {}", e);
- }
- return null;
- }
-
-
- private static List<String> splitDns(DbLogEntity dbLogEntity, String key) {
-
- return StrUtil.split(StrUtil.toString(dbLogEntity.getData().get(key)), ",");
- }
-
- private List<DbLogEntity> getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, DbLogEntity dbLogEntity) {
- List<DbLogEntity> newList = new ArrayList<>();
- for (Map.Entry<String, Long> dns : distinctMap.entrySet()) {
- Map<String, Object> newDns = new HashMap<>();
- newDns.put("start_time", startTime);
- newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
- newDns.put("record_type", type);
- newDns.put("qname", dnsQname);
- newDns.put("record", dns.getKey());
- newDns.put("sessions", dns.getValue());
- dbLogEntity.setData(newDns);
- newList.add(dbLogEntity);
- }
- return newList;
- }
-
- @Override
- public void flatMap(DbLogEntity dbLogEntity, Collector<DbLogEntity> collector) throws Exception {
-
-
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java
new file mode 100644
index 0000000..bc9a73d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java
@@ -0,0 +1,35 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseDocument;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Deduplication
+ */
+public class DnsGraphMapFunction extends RichMapFunction<Map<String, Object>, BaseDocument> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsGraphMapFunction.class);
+
+ @Override
+ public BaseDocument map(Map<String, Object> map) throws Exception {
+ try {
+ BaseDocument baseDocument = new BaseDocument();
+ baseDocument.setKey(String.join("-", StrUtil.toString(map.get("qname")), StrUtil.toString(map.get("record"))));
+ baseDocument.addAttribute("qname", map.get("qname"));
+ baseDocument.addAttribute("record", map.get("record"));
+ baseDocument.addAttribute("last_found_time", map.get("start_time"));
+ return baseDocument;
+ } catch (Exception e) {
+ logger.error("dns record type 类型转换错误: {}", e);
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
new file mode 100644
index 0000000..c438a14
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * Deduplication
+ */
+public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsGraphProcessFunction.class);
+
+ @Override
+ public void process(Tuple3<String, String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+
+ try {
+ long tmpTime = 0L;
+ for (Map<String, Object> log : elements) {
+ long startTime = Convert.toLong(log.get("capure_time"));
+ tmpTime = startTime > tmpTime ? startTime : tmpTime;
+ }
+ Map newLog = new LinkedHashMap<>();
+ newLog.put("record_type", keys.f0);
+ newLog.put("qname", keys.f1);
+ newLog.put("record", keys.f2);
+ newLog.put("last_found_time", tmpTime);
+ out.collect(newLog);
+ logger.debug("获取中间聚合结果:{}", newLog.toString());
+ } catch (Exception e) {
+ logger.error("获取中间聚合结果失败,middleResult: {}", e);
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
new file mode 100644
index 0000000..d5926fe
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONUtil;
+import com.google.common.base.Joiner;
+import com.zdjizhi.enums.DnsType;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * @author zhq
+ * desc Splits the DNS response array into per-type record strings and counts
+ */
+public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsMapFunction.class);
+
+ @Override
+ public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
+ try {
+ Object response = rawLog.get("response");
+ JSONArray responseArray = JSONUtil.parseArray(response);
+ String dnsA = null;
+ int dnsANum = 0;
+ String dnsAAAA = null;
+ int dnsAAAANum = 0;
+ String dnsCNAME = null;
+ int dnsCNAMENum = 0;
+ String dnsNs = null;
+ int dnsNsNum = 0;
+ String dnsMx = null;
+ int dnsMxNum = 0;
+ for (Object res : responseArray) {
+ Map<String, String> resMap = (Map<String, String>) res;
+ String type = resMap.get("res_type");
+ String body = resMap.get("res_body");
+ if (DnsType.A.getCode().equals(type)) {
+ dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
+ dnsANum++;
+ } else if (DnsType.AAAA.getCode().equals(type)) {
+ dnsAAAA = Joiner.on(",").skipNulls().join(dnsAAAA, body);
+ dnsAAAANum++;
+ } else if (DnsType.CNAME.getCode().equals(type)) {
+ dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
+ dnsCNAMENum++;
+ } else if (DnsType.NS.getCode().equals(type)) {
+ dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
+ dnsNsNum++;
+ } else if (DnsType.MX.getCode().equals(type)) {
+ dnsMx = Joiner.on(",").skipNulls().join(dnsMx, body);
+ dnsMxNum++;
+ }
+ }
+ // records of the same res_type were joined with "," above and counted per type
+ rawLog.put("dns_a", dnsA);
+ rawLog.put("dns_a_num", dnsANum);
+
+ rawLog.put("dns_aaaa", dnsAAAA);
+ rawLog.put("dns_aaaa_num", dnsAAAANum);
+
+ rawLog.put("dns_cname", dnsCNAME);
+ rawLog.put("dns_cname_num", dnsCNAMENum);
+
+ rawLog.put("dns_ns", dnsNs);
+ rawLog.put("dns_ns_num", dnsNsNum);
+
+ rawLog.put("dns_mx", dnsMx);
+ rawLog.put("dns_mx_num", dnsMxNum);
+ } catch (Exception e) {
+ logger.error("dns 原始日志拆分 response 失败 {}", e.getMessage());
+ }
+
+ return rawLog;
+ }
+
+}
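A note on the accumulator pattern above: Guava's skipNulls() is what prevents a leading
separator on the first append, because each accumulator string starts out null. A
self-contained illustration:

    import com.google.common.base.Joiner;

    public class JoinerDemo {
        public static void main(String[] args) {
            String acc = null;
            // skipNulls() drops the initial null, so no leading comma appears
            acc = Joiner.on(",").skipNulls().join(acc, "1.1.1.1");  // -> "1.1.1.1"
            acc = Joiner.on(",").skipNulls().join(acc, "2.2.2.2");  // -> "1.1.1.1,2.2.2.2"
            System.out.println(acc);
        }
    }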
diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
index c9bc596..e101afe 100644
--- a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
@@ -19,20 +19,11 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
/**
* @author 94976
*/
+@Deprecated
public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(DnsProcessFunction.class);
- @Override
- public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-
- try {
- getMiddleResult(out, elements);
- } catch (Exception e) {
- logger.error("获取中间聚合结果失败,middleResult: {}", e);
- }
- }
-
/**
* Split dns_record
* Five types: a/aaaa/cname/mx/ns
@@ -40,49 +31,51 @@ public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object
* @param elements
* @return
*/
- private void getMiddleResult(Collector<Map<String, Object>> out, Iterable<Map<String, Object>> elements) {
- long startTime = System.currentTimeMillis() / 1000;
- long endTime = System.currentTimeMillis() / 1000;
- String dnsQname = "";
- try {
- Map<String, Long> distinctA = new HashMap<>();
- Map<String, Long> distinctAAAA = new HashMap<>();
- Map<String, Long> distinctCname = new HashMap<>();
- Map<String, Long> distinctNs = new HashMap<>();
- Map<String, Long> distinctMx = new HashMap<>();
- for (Map<String, Object> log : elements) {
- List<String> dnsA = splitDns(log, "dns_a");
- List<String> dnsAAAA = splitDns(log, "dns_aaaa");
- List<String> dnsCname = splitDns(log, "dns_cname");
- List<String> dnsNs = splitDns(log, "dns_ns");
- List<String> dnsMx = splitDns(log, "dns_mx");
-
- dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
- dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
- dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
- dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
- dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
+ @Override
+ public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
- long connStartTimetime = Convert.toLong(log.get("capure_time_s"));
- startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
- endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
- dnsQname = StrUtil.toString(log.get("dns_qname"));
+ try {
+ long startTime = System.currentTimeMillis() / 1000;
+ long endTime = System.currentTimeMillis() / 1000;
+ try {
+ Map<String, Long> distinctA = new HashMap<>();
+ Map<String, Long> distinctAAAA = new HashMap<>();
+ Map<String, Long> distinctCname = new HashMap<>();
+ Map<String, Long> distinctNs = new HashMap<>();
+ Map<String, Long> distinctMx = new HashMap<>();
+ for (Map<String, Object> log : elements) {
+ List<String> dnsA = splitDns(log, "dns_a");
+ List<String> dnsAAAA = splitDns(log, "dns_aaaa");
+ List<String> dnsCname = splitDns(log, "dns_cname");
+ List<String> dnsNs = splitDns(log, "dns_ns");
+ List<String> dnsMx = splitDns(log, "dns_mx");
+
+ dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
+ dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
+ dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
+ dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
+ dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
+
+ long connStartTimetime = Convert.toLong(log.get("capture_time"));
+ startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
+ endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
+ }
+ getNewDns(startTime, endTime, keys, distinctA, DnsType.A.getType(), out);
+ getNewDns(startTime, endTime, keys, distinctAAAA, DnsType.AAAA.getType(), out);
+ getNewDns(startTime, endTime, keys, distinctCname, DnsType.CNAME.getType(), out);
+ getNewDns(startTime, endTime, keys, distinctNs, DnsType.NS.getType(), out);
+ getNewDns(startTime, endTime, keys, distinctMx, DnsType.MX.getType(), out);
+
+ } catch (Exception e) {
+ logger.error("聚合中间结果集失败 {}", e);
}
- getNewDns(startTime, endTime, dnsQname, distinctA, DnsType.a.toString(), out);
- getNewDns(startTime, endTime, dnsQname, distinctAAAA, DnsType.aaaa.toString(), out);
- getNewDns(startTime, endTime, dnsQname, distinctCname, DnsType.cname.toString(), out);
- getNewDns(startTime, endTime, dnsQname, distinctNs, DnsType.ns.toString(), out);
- getNewDns(startTime, endTime, dnsQname, distinctMx, DnsType.mx.toString(), out);
-
} catch (Exception e) {
- logger.error("聚合中间结果集失败 {}", e);
+ logger.error("获取中间聚合结果失败,middleResult: {}", e);
}
}
-
private static List<String> splitDns(Map<String, Object> log, String key) {
-
- return StrUtil.split(StrUtil.toString(log.get(key)), ",");
+ return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
}
private void getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, Collector<Map<String, Object>> out) {
diff --git a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
new file mode 100644
index 0000000..104d2d7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
@@ -0,0 +1,58 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.date.DateUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
+
+
+/**
+ * @author 94976
+ */
+public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsRelationProcessFunction.class);
+
+ /**
+ * Split dns_record
+ * Aggregate session counts per window
+ * Five types: a/aaaa/cname/mx/ns
+ *
+ * @param elements
+ * @return
+ */
+ @Override
+ public void process(Tuple3<String, String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+
+ try {
+ long sessions = 0L;
+ long startTime = DateUtil.currentSeconds();
+ long endTime = DateUtil.currentSeconds();
+ for (Map<String, Object> log : elements) {
+ sessions++;
+ long logStartTime = Convert.toLong(log.get("start_time"));
+ startTime = logStartTime < startTime ? logStartTime : startTime;
+ endTime = logStartTime > endTime ? logStartTime : endTime;
+ }
+ Map<String, Object> newDns = new LinkedHashMap<>();
+ newDns.put("start_time", startTime);
+ newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
+ newDns.put("record_type", keys.f0);
+ newDns.put("qname", keys.f1);
+ newDns.put("record", keys.f2);
+ newDns.put("sessions", sessions);
+ out.collect(newDns);
+ } catch (Exception e) {
+ logger.error("dns relation 日志聚合失败: {}", e);
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
new file mode 100644
index 0000000..725978e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
@@ -0,0 +1,72 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.enums.DnsType;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author zhq
+ */
+public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Object>, Map<String, Object>> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsSplitFlatMapFunction.class);
+
+ /**
+ * Split dns_record
+ * Five types: a/aaaa/cname/mx/ns
+ *
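+ * Example (hypothetical values): a log with qname=example.com and dns_a="1.1.1.1,2.2.2.2"
+ * fans out into two records:
+ *   {start_time, record_type=a, qname=example.com, record=1.1.1.1}
+ *   {start_time, record_type=a, qname=example.com, record=2.2.2.2}
+ *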
+ * @return
+ */
+
+ @Override
+ public void flatMap(Map<String, Object> log, Collector<Map<String, Object>> out) {
+
+ try {
+ List<String> dnsA = splitDns(log, "dns_a");
+ List<String> dnsAAAA = splitDns(log, "dns_aaaa");
+ List<String> dnsCname = splitDns(log, "dns_cname");
+ List<String> dnsNs = splitDns(log, "dns_ns");
+ List<String> dnsMx = splitDns(log, "dns_mx");
+ String startTime = StrUtil.toString(log.get("capture_time"));
+ Object qname = log.get("qname");
+
+ getNewDns(qname, startTime, DnsType.A.getType(), dnsA, out);
+ getNewDns(qname, startTime, DnsType.AAAA.getType(), dnsAAAA, out);
+ getNewDns(qname, startTime, DnsType.CNAME.getType(), dnsCname, out);
+ getNewDns(qname, startTime, DnsType.NS.getType(), dnsNs, out);
+ getNewDns(qname, startTime, DnsType.MX.getType(), dnsMx, out);
+
+ } catch (Exception e) {
+ logger.error("dns 原始日志拆分错: {}", e);
+ }
+
+ }
+
+ private void getNewDns(Object qname, String startTime, String type, List<String> dnsList, Collector<Map<String, Object>> out) throws Exception {
+ if (ObjectUtil.isNotEmpty(dnsList)) {
+ for (String record : dnsList) {
+ Map<String, Object> newDns = new LinkedHashMap<>();
+ newDns.put("start_time", startTime);
+ newDns.put("record_type", type);
+ newDns.put("qname", qname);
+ newDns.put("record", record);
+ out.collect(newDns);
+ }
+ }
+ }
+
+ private static List<String> splitDns(Map<String, Object> log, String key) {
+
+ return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/pojo/DbLogEntity.java b/src/main/java/com/zdjizhi/pojo/DbLogEntity.java
deleted file mode 100644
index b89f1db..0000000
--- a/src/main/java/com/zdjizhi/pojo/DbLogEntity.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.zdjizhi.pojo;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class DbLogEntity implements Serializable {
-
- private String tableName;
- private Map<String, Object> data;
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public Map<String, Object> getData() {
- return data;
- }
-
- public void setData(Map<String, Object> data) {
- this.data = data;
- }
-
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- DbLogEntity that = (DbLogEntity) o;
- return Objects.equals(tableName, that.tableName) &&
- Objects.equals(data, that.data);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tableName, data);
- }
-}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index b372f9e..f28d79a 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -1,17 +1,15 @@
package com.zdjizhi.topology;
import cn.hutool.core.convert.Convert;
+import cn.hutool.core.util.ObjectUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.entity.BaseDocument;
-import com.zdjizhi.common.DnsKeysSelector;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.IpKeysSelector;
-import com.zdjizhi.etl.ConnProcessFunction;
-import com.zdjizhi.etl.Ip2IpGraphProcessFunction;
-import com.zdjizhi.etl.DnsProcessFunction;
-import com.zdjizhi.etl.SketchProcessFunction;
+import com.zdjizhi.common.*;
+import com.zdjizhi.enums.DnsType;
+import com.zdjizhi.etl.*;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
+import com.zdjizhi.utils.ck.ClickhouseSingleSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -61,6 +59,7 @@ public class LogFlowWriteTopology {
.process(new ConnProcessFunction())
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ connTransformStream.print();
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
@@ -68,7 +67,7 @@ public class LogFlowWriteTopology {
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction())
- .filter(x -> Objects.nonNull(x))
+ .filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
// write to ArangoDB
@@ -76,16 +75,18 @@ public class LogFlowWriteTopology {
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
- .filter(x -> Objects.nonNull(x))
+ .filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
// write to the ClickHouse sink in batches
- connSource.addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
- sketchSource.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
- connTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
- sketchTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+ connSource.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+ connTransformStream.print();
+ connTransformStream.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+ sketchSource.keyBy(new SketchKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_SKETCH)).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+ connTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+ sketchTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
-// ip2ipGraph.addSink(new ArangoDBSink("R_VISIT_IP2IP"));
+ ip2ipGraph.keyBy("key").process(new ArangoDelayProcess(R_VISIT_IP2IP)).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
@@ -93,39 +94,57 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
.filter(Objects::nonNull)
+ .map(new DnsMapFunction())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
- .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capure_time_s")) * 1000))
- .keyBy(new DnsKeysSelector())
+ .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+ .flatMap(new DnsSplitFlatMapFunction())
+ .keyBy(new DnsGraphKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
- .process(new DnsProcessFunction())
- .filter(x -> Objects.nonNull(x))
+ .process(new DnsRelationProcessFunction())
+ .filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
- // filter out empty records so they are not sent to Kafka
+ // raw dns logs into ClickHouse
dnsSource.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("FilterOriginalData")
+ .keyBy(new DnsTimeKeysSelector())
+ .process(new CKDelayProcess(SINK_CK_TABLE_DNS))
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
- dnsTransform.filter(Objects::nonNull)
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("FilterOriginalData")
+ // split dns relation logs into ClickHouse
+ dnsTransform.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_DNS))
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(SINK_PARALLELISM)
.name("CKSink");
- }
+ // ArangoDB ingestion: group by record_type and route each type to its own table
+ DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+ .process(new DnsGraphProcessFunction())
+ .setParallelism(SINK_PARALLELISM)
+ .filter(Objects::nonNull);
+
+ for (DnsType dnsEnum : DnsType.values()) {
+ dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+ .keyBy(new StartTimeKeysSelector())
+ .map(new DnsGraphMapFunction())
+ .process(new ArangoDelayProcess(dnsEnum.getSink()))
+ .addSink(new ArangoDBSink(dnsEnum.getSink()))
+ .setParallelism(SINK_PARALLELISM)
+ .name("ArangodbSink");
+ }
+
+ }
env.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is : {}", e);
- e.printStackTrace();
}
}
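Taken together, the reworked DNS branch now reads (operators as wired above):

    Kafka(dns) -> DnsMapFunction (split response) -> watermark on capture_time
        |-> CKDelayProcess -> ClickhouseSink (raw dns table)
        '-> DnsSplitFlatMapFunction -> keyBy(record_type, qname, record)
            -> window(LOG_AGGREGATE_DURATION) -> DnsRelationProcessFunction
                |-> CKDelayProcess -> ClickhouseSink (dns relation table)
                '-> window(LOG_AGGREGATE_DURATION_GRAPH) -> DnsGraphProcessFunction
                    -> filter per DnsType -> DnsGraphMapFunction
                    -> ArangoDelayProcess -> ArangoDBSink (per-type collection)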
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSink.java b/src/main/java/com/zdjizhi/utils/ck/CKSink.java
deleted file mode 100644
index 99a579a..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/CKSink.java
+++ /dev/null
@@ -1,165 +0,0 @@
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import ru.yandex.clickhouse.ClickHouseConnection;
-import ru.yandex.clickhouse.ClickHouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import java.sql.PreparedStatement;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class CKSink extends RichSinkFunction<Map<String, Object>> {
-
- private static final Log log = LogFactory.get();
-
- private static int count = 1;
- private static ClickHouseConnection connection = null;
- private static PreparedStatement preparedStatement = null;
-
- static String database = "default";
- static String address = "jdbc:clickhouse://192.168.45.102:8123/"+database;
- static String username = "default";
- static String password = "galaxy2019";
- static String fieldStr = "id,name,age";
- static String tableName = "user_table";
-
- private String insertSql;
-
- // create the connection and session
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- try {
- connection = getConn();
- log.info("get clickhouse connection success !");
- String insertSql = preparedSql(fieldStr, tableName);
- connection.setAutoCommit(false);
- preparedStatement = connection.prepareStatement(insertSql);
- } catch (Exception e) {
- log.error("clickhouse初始化连接报错:", e);
- }
- }
-
-// @Override
-// public void close() throws Exception {
-// super.close();
-// //关闭连接和释放资源
-// if (connection != null) {
-// connection.close();
-// }
-// if (preparedStatement != null) {
-// preparedStatement.close();
-// }
-// }
-
- // batch writes with auto-commit disabled
- @Override
- public void invoke(Map<String, Object> data, Context context) {
- log.info(" invoke methed ");
-
- try {
-
- LinkedList<Object> values = new LinkedList<>(data.values());
- for (int i = 1; i <= values.size(); i++) {
- Object val = values.get(i - 1);
- if (val instanceof Long) {
- preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
- } else if (val instanceof Integer) {
- preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
- } else if (val instanceof Boolean) {
- preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
- } else {
- preparedStatement.setString((i), StrUtil.toString(val));
- }
- }
-
- preparedStatement.addBatch();
- count = count + 1;
- try {
-// if (count >= 50000) {
-// preparedStatement.executeBatch();
-// connection.commit();
-// preparedStatement.clearBatch();
-// count = 1;
-// }
-
- // commit once every 10k records
-// if (count % 10000 == 0) {
-// preparedStatement.executeBatch();
-// connection.commit();
-// preparedStatement.clearBatch();
-// }
- preparedStatement.executeBatch();
- connection.commit();
-
- } catch (Exception ee) {
- log.error("数据插入click house 报错:", ee);
- }
- } catch (Exception ex) {
- log.error("ClickhouseSink插入报错====", ex);
- }
- }
-
- public static ClickHouseConnection getConn() {
-
- int socketTimeout = 600000;
- ClickHouseProperties properties = new ClickHouseProperties();
- properties.setUser(username);
- properties.setPassword(password);
- properties.setDatabase(database);
- properties.setSocketTimeout(socketTimeout);
- ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
- ClickHouseConnection conn = null;
- try {
- conn = clickHouseDataSource.getConnection();
- return conn;
- } catch (Exception e) {
- log.error(e.getMessage());
- e.printStackTrace();
- }
- return null;
- }
-
- public static Map getField() {
-
- return null;
- }
-
-
- public String preparedSql(String fieldStr, String tableName) {
- List<String> fields = StrUtil.split(fieldStr, ",");
- return getInsertSql(fields, tableName);
- }
-
- public String getInsertSql(List<String> fileds, String tableName) {
- String sql = "";
- String sqlStr1 = "INSERT INTO `" + database + "`." + tableName + " (";
- String sqlStr2 = ") VALUES (";
- String sqlStr3 = ")";
- String sqlKey = "";
- String sqlValue = "";
- for (String key : fileds) {
- sqlKey += key + ",";
- sqlValue += "?,";
- }
- sqlKey = sqlKey.substring(0, sqlKey.length() - 1);
- sqlValue = sqlValue.substring(0, sqlValue.length() - 1);
- sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3);
-
-// String placeholders = Arrays.stream(fieldNames)
-// .map(f -> "?")
-// .collect(Collectors.joining(", "));
-// return "INSERT INTO " + quoteIdentifier(tableName) +
-// "(" + columns + ")" + " VALUES (" + placeholders + ")";
-
-
- log.info(sql);
- return sql;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
deleted file mode 100644
index 55c99c4..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import ru.yandex.clickhouse.ClickHouseConnection;
-import ru.yandex.clickhouse.ClickHouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import java.sql.PreparedStatement;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class CKSinkFlatMap extends RichFlatMapFunction<Map<String, Object>, String> {
-
- private static final Log log = LogFactory.get();
-
- private static int count = 1;
- private static ClickHouseConnection connection = null;
- private static PreparedStatement preparedStatement = null;
-
- static String address = "jdbc:clickhouse://192.168.45.102:8123";
- static String database = "default";
- static String username = "default";
- static String password = "galaxy2019";
- static String fieldStr = "id,name,age";
- static String tableName = "user_table";
-
- private String insertSql;
-
- // create the connection and session
- @Override
- public void open(Configuration parameters) {
- try {
- connection = getConn();
- log.info("get clickhouse connection success !");
- } catch (Exception e) {
- log.error("clickhouse初始化连接报错:", e);
- }
- }
-
- // batch writes with auto-commit disabled
- @Override
- public void flatMap(Map<String, Object> data, Collector<String> collector) {
-
- try {
- String insertSql = preparedSql(fieldStr, tableName);
- connection.setAutoCommit(false);
- preparedStatement = connection.prepareStatement(insertSql);
-
- LinkedList<Object> values = new LinkedList<>(data.values());
- for (int i = 1; i <= values.size(); i++) {
- Object val = values.get(i - 1);
- if (val instanceof Long) {
- preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
- } else if (val instanceof Integer) {
- preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
- } else if (val instanceof Boolean) {
- preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
- } else {
- preparedStatement.setString((i), StrUtil.toString(val));
- }
- }
-
- preparedStatement.addBatch();
- count = count + 1;
- try {
-// if (count >= 50000) {
-// preparedStatement.executeBatch();
-// connection.commit();
-// preparedStatement.clearBatch();
-// count = 1;
-// }
-
- // commit once every 10k records
- if (count % 10000 == 0) {
- preparedStatement.executeBatch();
- connection.commit();
- preparedStatement.clearBatch();
- }
- preparedStatement.executeBatch();
- connection.commit();
-
- } catch (Exception ee) {
- log.error("数据插入click house 报错:", ee);
- }
- } catch (Exception ex) {
- log.error("ClickhouseSink插入报错====", ex);
- }
- }
-
- public static ClickHouseConnection getConn() {
-
- int socketTimeout = 600000;
- ClickHouseProperties properties = new ClickHouseProperties();
- properties.setUser(username);
- properties.setPassword(password);
- properties.setDatabase(database);
- properties.setSocketTimeout(socketTimeout);
- ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
- ClickHouseConnection conn = null;
- try {
- conn = clickHouseDataSource.getConnection();
- return conn;
- } catch (Exception e) {
- log.error(e.getMessage());
- e.printStackTrace();
- }
- return null;
- }
-
- public static Map getField() {
-
- return null;
- }
-
-
- public String preparedSql(String fieldStr, String tableName) {
- List<String> fields = StrUtil.split(fieldStr, ",");
- return getInsertSql(fields, database + "." + tableName);
- }
-
- public String getInsertSql(List<String> fileds, String tableName) {
- String sql = "";
- String sqlStr1 = "INSERT INTO " + tableName + " (";
- String sqlStr2 = ") VALUES (";
- String sqlStr3 = ")";
- String sqlKey = "";
- String sqlValue = "";
- for (String key : fileds) {
- sqlKey += key + ",";
- sqlValue += "?,";
- }
- sqlKey = sqlKey.substring(0, sqlKey.length() - 1);
- sqlValue = sqlValue.substring(0, sqlValue.length() - 1);
- sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3);
-
-// String placeholders = Arrays.stream(fieldNames)
-// .map(f -> "?")
-// .collect(Collectors.joining(", "));
-// return "INSERT INTO " + quoteIdentifier(tableName) +
-// "(" + columns + ")" + " VALUES (" + placeholders + ")";
-
- log.info(sql);
- return sql;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java
deleted file mode 100644
index 963eef1..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-package com.zdjizhi.utils.ck;
-
-import java.util.Optional;
-
-*/
-/**
- * ClickHouse dialect
- *//*
-
-public class ClickHouseJDBCDialect implements JDBCDialect {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean canHandle(String url) {
- return url.startsWith("jdbc:clickhouse:");
- }
-
- @Override
- public Optional<String> defaultDriverName() {
- return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
- }
-
- @Override
- public String quoteIdentifier(String identifier) {
- return "`" + identifier + "`";
- }
-
- @Override
- public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
- return Optional.of(getInsertIntoStatement(tableName, fieldNames));
- }
-
- @Override
- public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
- return null;
- }
-
-}
-*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSingleSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSingleSink.java
new file mode 100644
index 0000000..dc5cbe7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSingleSink.java
@@ -0,0 +1,124 @@
+package com.zdjizhi.utils.ck;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public class ClickhouseSingleSink extends RichSinkFunction<Map<String, Object>> {
+
+ private static final Log log = LogFactory.get();
+
+ private Connection connection;
+ private PreparedStatement preparedStatement;
+ public String sink;
+
+
+ public ClickhouseSingleSink(String sink) {
+ this.sink = sink;
+ }
+
+ public String getSink() {
+ return sink;
+ }
+
+ public void setSink(String sink) {
+ this.sink = sink;
+ }
+
+ @Override
+ public void invoke(Map<String, Object> logs, Context context) throws Exception {
+ executeInsert(logs, getSink());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ try {
+ Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+ connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
+// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
+// connection = dataSource.getConnection();
+
+ log.info("get clickhouse connection success");
+ } catch (ClassNotFoundException | SQLException e) {
+ log.error("clickhouse connection error ,{}", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ IoUtil.close(preparedStatement);
+ IoUtil.close(connection);
+ }
+
+ public void executeInsert(Map<String, Object> data, String tableName) {
+
+ try {
+ List<String> keys = new LinkedList<>(data.keySet());
+ connection.setAutoCommit(false);
+ preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
+ int count = 0;
+ List<Object> values = new LinkedList<>(data.values());
+ for (int i = 1; i <= values.size(); i++) {
+ Object val = values.get(i - 1);
+ if (val instanceof Long) {
+ preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Integer) {
+ preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Boolean) {
+ preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
+ } else {
+ preparedStatement.setString((i), StrUtil.toString(val));
+ }
+ }
+ preparedStatement.addBatch();
+ count++;
+ // commit once every SINK_BATCH rows
+ if (count % SINK_BATCH == 0) {
+ preparedStatement.executeBatch();
+ connection.commit();
+ preparedStatement.clearBatch();
+ count = 0;
+ }
+ if (count > 0) {
+ preparedStatement.executeBatch();
+ connection.commit();
+ }
+
+ } catch (Exception ex) {
+ log.error("ClickhouseSink插入报错", ex);
+ }
+ }
+
+
+ public static String preparedSql(List<String> fields, String tableName) {
+
+ String placeholders = fields.stream()
+ .filter(Objects::nonNull)
+ .map(f -> "?")
+ .collect(Collectors.joining(", "));
+ String columns = fields.stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.joining(", "));
+ String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
+ "(", columns, ") VALUES (", placeholders, ")");
+ log.debug(sql);
+ return sql;
+ }
+
+
+} \ No newline at end of file
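In ClickhouseSingleSink above, count is a local variable and each invoke() carries exactly one row, so count is always 1 when the `count % SINK_BATCH` test runs; unless SINK_BATCH is 1 the modulo branch never fires and every record is committed individually by the trailing `count > 0` flush. For real batching the counter and statement have to outlive invoke(). A sketch under that assumption, with the statement prepared once in open() and bindValues a hypothetical helper wrapping the same setLong/setInt/setBoolean/setString dispatch shown in executeInsert:

    private transient int pending = 0;

    @Override
    public void invoke(Map<String, Object> row, Context context) throws Exception {
        bindValues(preparedStatement, row); // hypothetical: same type dispatch as executeInsert
        preparedStatement.addBatch();
        if (++pending >= SINK_BATCH) {
            preparedStatement.executeBatch();
            connection.commit();
            preparedStatement.clearBatch();
            pending = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (pending > 0) {                  // flush the final partial batch
            preparedStatement.executeBatch();
            connection.commit();
        }
        IoUtil.close(preparedStatement);
        IoUtil.close(connection);
    }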
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index 80a1d0c..98afe4c 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -1,5 +1,6 @@
package com.zdjizhi.utils.ck;
+import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -18,7 +19,7 @@ import java.util.stream.Collectors;
import static com.zdjizhi.common.FlowWriteConfig.*;
-public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
+public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
private static final Log log = LogFactory.get();
@@ -27,7 +28,15 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
public String sink;
static {
-
+ try {
+ Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+ connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
+// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
+// connection = dataSource.getConnection();
+ log.info("get clickhouse connection success");
+ } catch (ClassNotFoundException | SQLException e) {
+ log.error("clickhouse connection error ,{}", e);
+ }
}
public ClickhouseSink(String sink) {
@@ -43,21 +52,13 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
}
@Override
- public void invoke(Map<String,Object> log, Context context) throws Exception {
- executeInsert(log, getSink());
+ public void invoke(List<Map<String, Object>> logs, Context context) throws Exception {
+ executeInsert(logs, getSink());
}
@Override
public void open(Configuration parameters) throws Exception {
- try {
- Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
- connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
-// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
-// connection = dataSource.getConnection();
- log.info("get clickhouse connection success");
- } catch (ClassNotFoundException | SQLException e) {
- log.error("clickhouse connection error ,{}", e);
- }
+
}
@Override
@@ -70,50 +71,52 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
}
}
- public void executeInsert(Map<String, Object> data, String tableName) {
+ public void executeInsert(List<Map<String, Object>> data, String tableName) {
try {
- int count = 1;
- List<String> keys = new LinkedList<>(data.keySet());
-
+ List<String> keys = new LinkedList<>(data.get(0).keySet());
connection.setAutoCommit(false);
preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
- List<Object> values = new LinkedList<>(data.values());
- for (int i = 1; i <= values.size(); i++) {
- Object val = values.get(i - 1);
- if (val instanceof Long) {
- preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
- } else if (val instanceof Integer) {
- preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val)));
- } else if (val instanceof Boolean) {
- preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
- } else {
- preparedStatement.setString((i), StrUtil.toString(val));
+ int count = 0;
+ for (Map<String, Object> map : data) {
+ List<Object> values = new LinkedList<>(map.values());
+ for (int i = 1; i <= values.size(); i++) {
+ Object val = values.get(i - 1);
+ if (val instanceof Long) {
+ preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Integer) {
+ preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Boolean) {
+ preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
+ } else {
+ preparedStatement.setString((i), StrUtil.toString(val));
+ }
}
- }
-
- preparedStatement.addBatch();
- count = count + 1;
- try {
+ preparedStatement.addBatch();
+ count++;
//commit once per batch
- if (count % 10000 == 0) {
+ if (count % SINK_BATCH == 0) {
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
- count = 1;
+ count = 0;
}
+ }
+ if (count > 0) {
preparedStatement.executeBatch();
connection.commit();
- } catch (Exception ee) {
- log.error("数据插入clickhouse 报错:", ee);
}
+
} catch (Exception ex) {
log.error("ClickhouseSink插入报错", ex);
+ } finally {
+ // close only the statement; the shared static connection must stay open for later batches
+ IoUtil.close(preparedStatement);
}
}
- public String preparedSql(List<String> fields, String tableName) {
+ public static String preparedSql(List<String> fields, String tableName) {
String placeholders = fields.stream()
.filter(Objects::nonNull)
@@ -124,7 +127,7 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
.collect(Collectors.joining(", "));
String sql = StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
"(", columns, ") VALUES (", placeholders, ")");
- log.info(sql);
+ log.debug(sql);
return sql;
}
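With the connection now opened once in a static initializer, a dropped socket would poison every later batch in ClickhouseSink. One option is to revalidate the shared connection before each insert; a sketch, treating the re-open strategy as an assumption rather than something this commit implements:

    private void ensureConnection() throws SQLException {
        // isClosed() only catches explicit closes; isValid() also probes the socket
        if (connection == null || connection.isClosed() || !connection.isValid(3)) {
            connection = ClickhouseUtil.getConnection(); // re-open via the balanced data source
            connection.setAutoCommit(false);
        }
    }

Calling ensureConnection() at the top of executeInsert would let the sink survive a ClickHouse restart at the cost of one validity check per batch.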
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
index 5fc1894..9ebdeb5 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
@@ -1,56 +1,53 @@
-/*
package com.zdjizhi.utils.ck;
-import org.apache.flink.api.java.utils.ParameterTool;
-import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
-import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
-*/
/**
* @description:
* @author: zhq
- * @create: 2022-06-29
- **//*
-
+ * @create: 2022-07-10
+ **/
public class ClickhouseUtil {
+ private static final Log log = LogFactory.get();
- public static ParameterTool getGlobalPro() {
- Map<String, String> sinkPro = new HashMap<>();
- //sink Properties
- sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "sc.chproxy.bigdata.services.org:10000");
+ private static Connection connection;
- // ClickHouse local write account
- sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "default");
- sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "galaxy2019");
- // sink common
- sinkPro.put(ClickHouseSinkConst.TIMEOUT_SEC, "10");
- sinkPro.put(ClickHouseSinkConst.NUM_WRITERS, "10");
- sinkPro.put(ClickHouseSinkConst.NUM_RETRIES, "3");
- sinkPro.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1000000");
- sinkPro.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
- sinkPro.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");// when run locally this creates a folder named "d:" inside the project to store details of failed records
- // env - sinkPro
- ParameterTool parameters = ParameterTool.fromMap(sinkPro);
+ public static Connection getConnection() {
+ try {
+ Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
+ ClickHouseProperties properties = new ClickHouseProperties();
+ properties.setDatabase(CK_DATABASE);
+ properties.setUser(CK_USERNAME);
+ properties.setPassword(CK_PIN);
+ properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
+ properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
+ BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
+ connection = dataSource.getConnection();
- return parameters;
-
+ log.info("get clickhouse connection success");
+ return connection;
+ } catch (ClassNotFoundException | SQLException e) {
+ log.error("clickhouse connection error ,{}", e);
+ }
+ return null;
}
- public static Properties getCKPro() {
- // ClickHouseSink - sinkPro
- Properties props = new Properties();
- props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "database_1564.ch_zjk_test_local");
- props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
- return props;
+ public static void close() {
+ IoUtil.close(connection);
}
-
}
-*/
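BalancedClickhouseDataSource hands out a connection to one of the enabled hosts in the comma-separated CK_HOSTS list on each getConnection(), but it only marks a node dead if it has pinged it. The driver exposes actualize() and scheduleActualization() for that; a usage sketch, where the 10-second interval is an arbitrary illustrative choice and java.util.concurrent.TimeUnit is assumed imported:

    BalancedClickhouseDataSource dataSource =
            new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
    dataSource.actualize();                                  // ping all hosts once, synchronously
    dataSource.scheduleActualization(10, TimeUnit.SECONDS);  // then re-check in the background
    Connection connection = dataSource.getConnection();      // drawn from the live hosts only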
diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java
deleted file mode 100644
index 9cd448a..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-package com.zdjizhi.utils.ck;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier;
-
-*/
-/**
- * Handle the SQL dialect of jdbc driver.
- *//*
-
-public interface JDBCDialect extends Serializable {
- default Optional<String> getUpsertStatement(
- String tableName, String[] fieldNames, String[] uniqueKeyFields) {
- return Optional.empty();
- }
- default String getInsertIntoStatement(String tableName, String[] fieldNames) {
- String columns = Arrays.stream(fieldNames)
- .map(this::quoteIdentifier)
- .collect(Collectors.joining(", "));
- String placeholders = Arrays.stream(fieldNames)
- .map(f -> "?")
- .collect(Collectors.joining(", "));
- return "INSERT INTO " + quoteIdentifier(tableName) +
- "(" + columns + ")" + " VALUES (" + placeholders + ")";
- }
-
- default String getDeleteStatement(String tableName, String[] conditionFields) {
- String conditionClause = Arrays.stream(conditionFields)
- .map(f -> quoteIdentifier(f) + "=?")
- .collect(Collectors.joining(" AND "));
- return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
- }
-}
-*/
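The interface deleted above quoted every identifier before splicing it into the INSERT statement; the replacement preparedSql methods concatenate raw column names, which would break if a field ever collided with a ClickHouse keyword. A sketch of restoring backtick quoting inside preparedSql, assuming the same java.util.stream imports as the sinks above:

    String columns = fields.stream()
            .filter(Objects::nonNull)
            .map(f -> "`" + f + "`")      // backtick-quote each column, as the deleted dialect did
            .collect(Collectors.joining(", "));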
diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java
deleted file mode 100644
index 9c951e4..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java
+++ /dev/null
@@ -1,14 +0,0 @@
-/*
-package com.zdjizhi.utils.ck;
-
-import java.util.Arrays;
-import java.util.List;
-
-public final class JDBCDialects {
-
- private static final List<JDBCDialect> DIALECTS = Arrays.asList(
-// new DerbyDialect(),
-// new MySQLDialect(),
-// new PostgresDialect()
- );
-}*/