Diffstat (limited to 'src')
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java | 12
-rw-r--r--  src/main/java/com/zdjizhi/etl/CKBatchWindow.java | 23
-rw-r--r--  src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java | 131
-rw-r--r--  src/main/java/com/zdjizhi/etl/LogService.java | 31
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java | 36
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService.java | 58
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java | 162
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java | 15
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java | 80
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java | 12
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java | 32
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java | 11
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java | 21
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java | 36
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java | 28
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsLogService.java | 25
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java | 15
-rw-r--r--  src/main/java/com/zdjizhi/utils/arangodb/AGSink.java | 137
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKSink.java | 122
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKUtils.java | 26
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java | 208
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java | 106
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java | 6
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/TypeUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/TypeUtils.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java | 4
28 files changed, 368 insertions, 983 deletions
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 59c059e..7795d9e 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -137,4 +137,16 @@ public class FlowWriteConfig {
public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
public static final Integer AGGREGATE_MAX_VALUE_LENGTH = FlowWriteConfigurations.getIntProperty(0, "aggregate.max.value.length");
+
+ public static final Integer SINK_ARANGODB_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.raw.log.insert.open");
+
+
+ public static final Integer HIKARI_MINIMUM_IDLE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.minimum-idle");
+ public static final Integer HIKARI_MAXIMUM_POOL_SIZE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.maximum-pool-size");
+ public static final Long HIKARI_IDLE_TIMEOUT = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.idle-timeout");
+ public static final Long HIKARI_MAX_LIFETIME = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.max-lifetime");
+ public static final Integer HIKARI_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.connection-timeout");
+ public static final Integer CK_MAX_THREADS = FlowWriteConfigurations.getIntProperty(1, "ck.max.threads");
+ public static final Integer CK_SCHEDULE_ACTUALIZATION = FlowWriteConfigurations.getIntProperty(1, "ck.schedule.actualization");
+
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
deleted file mode 100644
index 947bad8..0000000
--- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.etl;
-
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<Map<String, Object>> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- batchLog.add(iterator.next());
- }
- out.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
deleted file mode 100644
index 66b36a9..0000000
--- a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.zdjizhi.etl;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-
-/**
- *  * 带超时的计数窗口触发器
- */
-public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
- private static Log logger = LogFactory.get();
-
- /**
- * 窗口最大数据量
- */
- private int maxCount;
- /**
- * event time / process time
- */
- private TimeCharacteristic timeType;
-
- private String stateName;
-
- public String getStateName() {
- return stateName;
- }
-
- public void setStateName(String stateName) {
- this.stateName = stateName;
- }
-
- public CountTriggerWithTimeout(String stateName) {
- this.stateName = stateName;
- }
-
- /**
- * 用于储存窗口当前数据量的状态对象
- */
- private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE);
-
-
- public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) {
-
- this.maxCount = maxCount;
- this.timeType = timeType;
- this.stateName = stateName;
- }
-
- private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
- clear(window, ctx);
- return TriggerResult.FIRE_AND_PURGE;
- }
-
-
- @Override
- public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
- ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
- countState.add(1L);
-
-
- if (countState.get() >= maxCount) {
- logger.info("fire with count: " + countState.get());
- return fireAndPurge(window, ctx);
- }
- if (timestamp >= window.getEnd()) {
- logger.info("fire with tiem: " + timestamp);
- return fireAndPurge(window, ctx);
- } else {
- return TriggerResult.CONTINUE;
- }
- }
-
-
- @Override
- public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
- if (timeType != TimeCharacteristic.ProcessingTime) {
- return TriggerResult.CONTINUE;
- }
-
-
- if (time >= window.getEnd()) {
- return TriggerResult.CONTINUE;
- } else {
- logger.debug("fire with process tiem: " + time);
- return fireAndPurge(window, ctx);
- }
- }
-
-
- @Override
- public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
- if (timeType != TimeCharacteristic.EventTime) {
- return TriggerResult.CONTINUE;
- }
-
-
- if (time >= window.getEnd()) {
- return TriggerResult.CONTINUE;
- } else {
- logger.debug("fire with event tiem: " + time);
- return fireAndPurge(window, ctx);
- }
- }
-
-
- @Override
- public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
- ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
- countState.clear();
- }
-
- /**
- * 计数方法
- */
- class Sum implements ReduceFunction<Long> {
-
-
- @Override
- public Long reduce(Long value1, Long value2) throws Exception {
- return value1 + value2;
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java
index 052a8b3..de5141a 100644
--- a/src/main/java/com/zdjizhi/etl/LogService.java
+++ b/src/main/java/com/zdjizhi/etl/LogService.java
@@ -1,45 +1,16 @@
package com.zdjizhi.etl;
import cn.hutool.json.JSONUtil;
-import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
-import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.CKSink;
import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Map;
-import static com.zdjizhi.common.FlowWriteConfig.*;
+import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM;
public class LogService {
-
-// @Deprecated
-// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
-//
-// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
-// .apply(new CKBatchWindow())
-// .addSink(new ClickhouseSink(sink))
-// .setParallelism(SINK_PARALLELISM)
-// .name(sink)
-// .setParallelism(SINK_PARALLELISM);
-//
-// }
-
- public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
- sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new ArangodbBatchIPWindow())
- .addSink(new ArangoDBSink(sink))
- .setParallelism(SINK_PARALLELISM)
- .name(sink)
- .setParallelism(SINK_PARALLELISM);
- }
-
public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.map(JSONUtil::toJsonStr)
.setParallelism(SINK_PARALLELISM)
diff --git a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
deleted file mode 100644
index f5848c4..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.etl.connection;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<BaseEdgeDocument> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- String srcIp = StrUtil.toString(next.get("src_ip"));
- String dstIp = StrUtil.toString(next.get("dst_ip"));
- BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
- baseEdgeDocument.setKey(String.join("-", srcIp, dstIp));
- baseEdgeDocument.setFrom("src_ip/" + srcIp);
- baseEdgeDocument.setTo("dst_ip/" + dstIp);
- baseEdgeDocument.addAttribute("src_ip", srcIp);
- baseEdgeDocument.addAttribute("dst_ip", dstIp);
- baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
-
- batchLog.add(baseEdgeDocument);
- }
- collector.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index b0fff6b..9c5f8e0 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -2,14 +2,14 @@ package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.util.TypeUtils;
+import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.etl.LogService;
-import com.zdjizhi.etl.dns.SketchTimeMapFunction;
+import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
@@ -36,16 +36,21 @@ public class ConnLogService {
//写入ck通联relation表
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
} else {
+ LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
+ LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
}
- DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
+ if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
- //合并通联和通联sketch
- DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
+ DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
- //写入arangodb
- LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+ //合并通联和通联sketch
+ DataStream<BaseEdgeDocument> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
+
+ //写入arangodb
+ ConnLogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+ }
}
@@ -59,21 +64,24 @@ public class ConnLogService {
String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
- SingleOutputStreamOperator<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+ DataStream<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
.filter(x -> {
if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) {
return false;
}
if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) {
- if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
- String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
+ if (TypeUtils.castToLong(x.get("total_cs_pkts")) < 0 || TypeUtils.castToLong(x.get("total_cs_pkts")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("total_sc_pkts")) < 0 || TypeUtils.castToLong(x.get("total_sc_pkts")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("total_cs_bytes")) < 0 || TypeUtils.castToLong(x.get("total_cs_bytes")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("total_sc_bytes")) < 0 || TypeUtils.castToLong(x.get("total_sc_bytes")) == Long.MAX_VALUE) {
return false;
}
return true;
} else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) {
- if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
- String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
+ if (TypeUtils.castToLong(x.get("sketch_sessions")) < 0 || TypeUtils.castToLong(x.get("sketch_sessions")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("sketch_packets")) < 0 || TypeUtils.castToLong(x.get("sketch_packets")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("sketch_bytes")) < 0 || TypeUtils.castToLong(x.get("sketch_bytes")) == Long.MAX_VALUE) {
return false;
}
return true;
@@ -96,8 +104,10 @@ public class ConnLogService {
}))
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
.setParallelism(TRANSFORM_PARALLELISM);
return connTransformStream;
}
@@ -107,18 +117,28 @@ public class ConnLogService {
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
- .process(new SketchProcessFunction());
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .process(new SketchProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
+ .setParallelism(TRANSFORM_PARALLELISM);
return sketchTransformStream;
}
- private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
- DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
+ private static DataStream<BaseEdgeDocument> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
+ DataStream<BaseEdgeDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return ip2ipGraph;
}
+ public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
+ sourceStream.addSink(new AGSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+ }
+
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
deleted file mode 100644
index b727535..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
+++ /dev/null
@@ -1,162 +0,0 @@
-//package com.zdjizhi.etl.connection;
-//
-//import cn.hutool.core.convert.Convert;
-//import cn.hutool.core.util.StrUtil;
-//import com.zdjizhi.etl.LogService;
-//import com.zdjizhi.etl.dns.SketchTimeMapFunction;
-//import com.zdjizhi.utils.kafka.KafkaConsumer;
-//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-//import org.apache.flink.api.common.functions.MapFunction;
-//import org.apache.flink.api.java.utils.ParameterTool;
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-//import org.apache.flink.streaming.api.windowing.time.Time;
-//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
-//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
-//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
-//
-//import java.time.Duration;
-//import java.util.*;
-//
-//import static com.zdjizhi.common.FlowWriteConfig.*;
-//
-//
-//public class ConnLogService2 {
-//
-// public static String convertToCsv(Map<String, Object> map) {
-// List<Object> list = new ArrayList<>();
-// list.add("(");
-// for (Map.Entry<String, Object> m : map.entrySet()) {
-// if (m.getValue() instanceof String) {
-// list.add("'" + m.getValue() + "'");
-// } else {
-// list.add(m.getValue());
-// }
-// }
-// list.add(")");
-// String join = StrUtil.join(",", list);
-// return join;
-//
-// }
-//
-// public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
-// //connection
-// DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
-// //sketch
-// DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
-//
-// //写入CKsink,批量处理
-// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
-//
-// Map<String, String> globalParameters = new HashMap<>();
-// // ClickHouse cluster properties
-// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/");
-// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME);
-// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN);
-//
-// // sink common
-// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
-// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
-// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
-// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
-// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
-// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
-//
-// // set global paramaters
-// ParameterTool parameters = ParameterTool.fromMap(globalParameters);
-// env.getConfig().setGlobalJobParameters(parameters);
-//
-// // Transform 操作
-// DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() {
-// @Override
-// public String map(Map<String, Object> data) throws Exception {
-// String s = convertToCsv(data);
-// System.err.println(s);
-// return convertToCsv(data);
-// }
-// });
-//
-//
-// // create props for sink
-// Properties props = new Properties();
-// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH);
-// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH);
-// ClickHouseSink sink = new ClickHouseSink(props);
-// dataStream.addSink(sink);
-// dataStream.print();
-//
-//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
-//
-// //transform
-// DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
-//
-// //写入ck通联relation表
-// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
-//
-// DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
-//
-// //合并通联和通联sketch
-// DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
-//
-// //写入arangodb
-// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
-//
-// }
-//
-// /**
-// * 通联原始日志数据源消费kafka
-// *
-// * @param source
-// * @return
-// */
-// private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
-//
-// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
-//
-// DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
-// .setParallelism(SOURCE_PARALLELISM)
-// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
-// .setParallelism(SOURCE_PARALLELISM)
-// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
-// .setParallelism(SOURCE_PARALLELISM)
-// .name(source)
-// .setParallelism(SOURCE_PARALLELISM);
-// return sourceStream;
-// }
-//
-// private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
-// DataStream<Map<String, Object>> connTransformStream = connSource
-// .assignTimestampsAndWatermarks(WatermarkStrategy
-// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-// .withTimestampAssigner((event, timestamp) -> {
-// return Convert.toLong(event.get("conn_start_time")) * 1000;
-// }))
-// .setParallelism(TRANSFORM_PARALLELISM)
-// .keyBy(new IpKeysSelector())
-// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-// .process(new ConnProcessFunction())
-// .setParallelism(TRANSFORM_PARALLELISM);
-// return connTransformStream;
-// }
-//
-// private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
-// DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
-// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
-// .keyBy(new IpKeysSelector())
-// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-// .process(new SketchProcessFunction());
-// return sketchTransformStream;
-// }
-//
-// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
-// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
-// .keyBy(new IpKeysSelector())
-// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
-// .process(new Ip2IpGraphProcessFunction())
-// .setParallelism(TRANSFORM_PARALLELISM);
-// return ip2ipGraph;
-// }
-//
-//}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 91cb47f..9a5245f 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -29,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
try {
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
if (values != null) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -53,16 +53,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
- long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
- if (connStartTimetime > 0) {
+ long connStartTime = Convert.toLong(newSketchLog.get("conn_start_time"));
+ if (connStartTime > 0) {
sessions++;
packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));
- startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
- endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
- packets = packets > Long.MAX_VALUE ? 0 : packets;
- bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
+ startTime = Math.min(connStartTime, startTime);
+ endTime = Math.max(connStartTime, endTime);
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
@@ -71,4 +69,5 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
}
return null;
}
+
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java
deleted file mode 100644
index e38517a..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.zdjizhi.etl.connection;
-
-import cn.hutool.core.convert.Convert;
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.util.TypeUtils;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import java.util.Map;
-
-
-/**
- * @author 94976
- */
-public class ConnReduceFunction implements ReduceFunction<Map<String, Object>> {
-
- private static final Log logger = LogFactory.get();
-
- //
-// public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-// try {
-// Tuple5<Long, Long, Long, Long, Long> values = sum(elements);
-// if (values != null) {
-// Map<String, Object> result = new HashMap<>();
-// result.put("start_time", values.f0);
-// result.put("end_time", values.f1);
-// result.put("src_ip", keys.f0);
-// result.put("dst_ip", keys.f1);
-// result.put("sessions", values.f2);
-// result.put("packets", values.f3);
-// result.put("bytes", values.f4);
-// out.collect(result);
-// logger.debug("获取中间聚合结果:{}", result.toString());
-// }
-// } catch (Exception e) {
-// logger.error("获取中间聚合结果失败,middleResult: {}", e);
-// }
-// }
-//
- private Tuple5<Long, Long, Long, Long, Long> sum(Map<String, Object> map1, Map<String, Object> map2) {
-
- try {
- long sessions = 0L;
- long packets = 0L;
- long bytes = 0L;
- long startTime = DateUtil.currentSeconds();
- long endTime = DateUtil.currentSeconds();
-
- long connStartTime1 = Convert.toLong(map1.get("conn_start_time"));
- long connStartTime2 = Convert.toLong(map2.get("conn_start_time"));
- if (connStartTime1 > 0 && connStartTime2 > 0) {
- sessions++;
- packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) +
- TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts"));
-
- bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) +
- TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes"));
-
- startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2;
- endTime = connStartTime2 < connStartTime1 ? connStartTime1 : connStartTime2;
-
- packets = packets > Long.MAX_VALUE ? 0 : packets;
- bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
-
- }
-
- } catch (Exception e) {
- logger.error("聚合中间结果集失败 {}", e);
- }
- return null;
- }
-
- @Override
- public Map<String, Object> reduce(Map<String, Object> map1, Map<String, Object> map2) throws Exception {
-
- return null;
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
index e957d65..7becf90 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
@@ -1,6 +1,7 @@
package com.zdjizhi.etl.connection;
-import com.zdjizhi.utils.json.TypeUtils;
+import com.alibaba.fastjson.util.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
@@ -9,8 +10,13 @@ public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map
@Override
public Map<String, Object> map(Map<String, Object> value) throws Exception {
- value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
- value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time")));
+ value.put("conn_start_time", TypeUtil.coverMSToS(value.get("conn_start_time")));
+ value.put("log_gen_time", TypeUtil.coverMSToS(value.get("log_gen_time")));
+
+ value.put("total_cs_pkts", TypeUtils.castToLong(value.get("total_cs_pkts")));
+ value.put("total_sc_pkts", TypeUtils.castToLong(value.get("total_sc_pkts")));
+ value.put("total_cs_bytes", TypeUtils.castToLong(value.get("total_cs_bytes")));
+ value.put("total_sc_bytes", TypeUtils.castToLong(value.get("total_sc_bytes")));
return value;
}
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
index 1b1b0c9..9468e26 100644
--- a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
@@ -2,42 +2,48 @@ package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.HashUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
import java.util.Map;
/**
- * 对ip去重
+ * 处理时间,转为图数据
*/
-public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
+public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseEdgeDocument, Tuple2<String, String>, TimeWindow> {
private static final Log logger = LogFactory.get();
@Override
- public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+ public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseEdgeDocument> out) {
try {
- long lastFoundTime = DateUtil.currentSeconds();;
+ long lastFoundTime = DateUtil.currentSeconds();
for (Map<String, Object> log : elements) {
long connStartTime = Convert.toLong(log.get("start_time"));
- lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime;
+ lastFoundTime = Math.max(connStartTime, lastFoundTime);
}
- Map<String, Object> newLog = new HashMap<>();
- newLog.put("src_ip", keys.f0);
- newLog.put("dst_ip", keys.f1);
- newLog.put("last_found_time", lastFoundTime);
- out.collect(newLog);
- logger.debug("获取中间聚合结果:{}", newLog.toString());
+
+ BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+ baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(keys.f0 + keys.f1)));
+ baseEdgeDocument.setFrom("src_ip/" + keys.f0);
+ baseEdgeDocument.setTo("dst_ip/" + keys.f1);
+ baseEdgeDocument.addAttribute("src_ip", keys.f0);
+ baseEdgeDocument.addAttribute("dst_ip", keys.f1);
+ baseEdgeDocument.addAttribute("last_found_time", lastFoundTime);
+
+ out.collect(baseEdgeDocument);
+ logger.debug("获取中间聚合结果:{}", baseEdgeDocument.toString());
} catch (Exception e) {
- logger.error("获取中间聚合结果失败,middleResult: {}", e);
+ logger.error("获取中间聚合结果失败,middleResult: {}", e.getMessage());
}
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index e2dd3f7..b27958a 100644
--- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -42,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
try {
if (values != null) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -72,12 +72,9 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions"));
packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets"));
bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes"));
- startTime = connStartTime < startTime ? connStartTime : startTime;
- endTime = connStartTime > endTime ? connStartTime : endTime;
- sessions = sessions > Long.MAX_VALUE ? 0 : sessions;
- packets = packets > Long.MAX_VALUE ? 0 : packets;
- bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
+ startTime = Math.min(connStartTime, startTime);
+ endTime = Math.max(connStartTime, endTime);
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java
new file mode 100644
index 0000000..ee0d6ae
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.etl.connection;
+
+import com.alibaba.fastjson.util.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Map;
+
+public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
+
+ @Override
+ public Map<String, Object> map(Map<String, Object> value) throws Exception {
+ value.put("sketch_start_time", TypeUtil.coverMSToS(value.get("sketch_start_time")));
+
+ value.put("sketch_sessions", TypeUtils.castToLong(value.get("sketch_sessions")));
+ value.put("sketch_packets", TypeUtils.castToLong(value.get("sketch_packets")));
+ value.put("sketch_bytes", TypeUtils.castToLong(value.get("sketch_bytes")));
+ return value;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
deleted file mode 100644
index ada4b83..0000000
--- a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.etl.dns;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<BaseEdgeDocument> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- String qname = StrUtil.toString(next.get("qname"));
- String record = StrUtil.toString(next.get("record"));
- BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
- baseEdgeDocument.setKey(String.join("-", qname, record));
- baseEdgeDocument.setFrom("qname/" + qname);
- baseEdgeDocument.setTo("record/" + record);
- baseEdgeDocument.addAttribute("qname", qname);
- baseEdgeDocument.addAttribute("record", record);
- baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
-
- batchLog.add(baseEdgeDocument);
- }
- out.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java
new file mode 100644
index 0000000..a467d40
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java
@@ -0,0 +1,28 @@
+package com.zdjizhi.etl.dns;
+
+import cn.hutool.core.util.HashUtil;
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Map;
+
+public class DnsGraphMapFunction implements MapFunction<Map<String, Object>, BaseEdgeDocument> {
+
+ @Override
+ public BaseEdgeDocument map(Map<String, Object> value) throws Exception {
+
+ String qname = StrUtil.toString(value.get("qname"));
+ String record = StrUtil.toString(value.get("record"));
+
+ BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+ baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(qname + record)));
+ baseEdgeDocument.setFrom("qname/" + qname);
+ baseEdgeDocument.setTo("record/" + record);
+ baseEdgeDocument.addAttribute("qname", qname);
+ baseEdgeDocument.addAttribute("record", record);
+ baseEdgeDocument.addAttribute("last_found_time", value.get("last_found_time"));
+
+ return baseEdgeDocument;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
index cde34c1..d8b3a36 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
@@ -28,7 +28,7 @@ public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, O
Long startTime = Convert.toLong(log.get("start_time"));
tmpTime = startTime > tmpTime ? startTime : tmpTime;
}
- Map newLog = new HashMap<>();
+ Map newLog = new LinkedHashMap<>();
newLog.put("record_type", keys.f0);
newLog.put("qname", keys.f1);
newLog.put("record", keys.f2);
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
index b4ab915..7c67d6e 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -2,13 +2,14 @@ package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
+import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.LogService;
+import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -32,20 +33,24 @@ public class DnsLogService {
//dns 拆分后relation日志 ck入库
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
} else {
- LogService.getLogCKSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
+ LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS);
+ LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
}
//arango 入库,按record_type分组入不同的表
- DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull)
+ DataStream<Map<String, Object>> dnsGraph = dnsTransform
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
.setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
- DataStream<Map<String, Object>> dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+ DataStream<BaseEdgeDocument> dnsRecordData = dnsGraph
+ .filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+ .setParallelism(SINK_PARALLELISM)
+ .map(new DnsGraphMapFunction())
.setParallelism(SINK_PARALLELISM);
- LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink());
+ getLogArangoSink(dnsRecordData, dnsEnum.getSink());
}
}
@@ -64,6 +69,7 @@ public class DnsLogService {
private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
+ .setParallelism(SOURCE_PARALLELISM)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
@@ -71,10 +77,17 @@ public class DnsLogService {
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return dnsTransform;
}
+ public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
+ sourceStream.addSink(new AGSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+ }
+
}
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
index 1da5a90..9607b00 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
@@ -6,7 +6,7 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.base.Joiner;
import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.utils.json.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
@@ -25,7 +25,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
@Override
public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
try {
- rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time")));
+ rawLog.put("capture_time", TypeUtil.coverMSToS(rawLog.get("capture_time")));
//qname ,record 转小写
rawLog.put("qname", StringUtils.lowerCase(StrUtil.toString(rawLog.get("qname"))));
if (Objects.nonNull(rawLog.get("response"))) {
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
index 8744060..894867f 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
@@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -45,7 +45,7 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String
endTime = logStartTime > endTime ? logStartTime : endTime;
}
}
- Map<String, Object> newDns = new HashMap<>();
+ Map<String, Object> newDns = new LinkedHashMap<>();
newDns.put("start_time", startTime);
newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
newDns.put("record_type", keys.f0);
diff --git a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java
deleted file mode 100644
index cf10a75..0000000
--- a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.zdjizhi.etl.dns;
-
-import com.zdjizhi.utils.json.TypeUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.Map;
-
-public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
-
- @Override
- public Map<String, Object> map(Map<String, Object> value) throws Exception {
- value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time")));
- return value;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
new file mode 100644
index 0000000..db4b207
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
@@ -0,0 +1,137 @@
+package com.zdjizhi.utils.arangodb;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.*;
+
+import static com.zdjizhi.common.FlowWriteConfig.ARANGODB_BATCH;
+import static com.zdjizhi.common.FlowWriteConfig.SINK_ARANGODB_BATCH_DELAY_TIME;
+
+public class AGSink extends RichSinkFunction<BaseEdgeDocument> {
+ private static final Log log = LogFactory.get();
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Log logger = LogFactory.get();
+
+ // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP
+ private final List<BaseEdgeDocument> ipWithDataList;
+ // 满足此时间条件写出数据
+ private final long insertArangoTimeInterval = SINK_ARANGODB_BATCH_DELAY_TIME;
+ // 插入的批次
+ private final int insertArangoBatchSize = ARANGODB_BATCH; // 开发测试用10条
+ private static ArangoDBConnect arangoDBConnect;
+ private transient volatile boolean closed = false;
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
+ // 数据表名
+ private String sink;
+
+ public AGSink(String sink) {
+ this.sink = sink;
+ this.ipWithDataList = new CopyOnWriteArrayList<>();
+ }
+
+ public String getSink() {
+ return sink;
+ }
+
+ /**
+ * Connects to the target database and initializes the prepared statement.
+ */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ arangoDBConnect = ArangoDBConnect.getInstance();
+
+ if (insertArangoTimeInterval != 0 && insertArangoBatchSize != 1) {
+ this.scheduler = Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("arangodb-upsert-output-format"));
+ this.scheduledFuture =
+ this.scheduler.scheduleWithFixedDelay(
+ () -> {
+ synchronized (AGSink.this) {
+ if (!closed) {
+ try {
+ logger.debug("arangodb_flush.............");
+ flush(ipWithDataList);
+ } catch (Exception e) {
+ log.error(e);
+ }
+ }
+ }
+ },
+ insertArangoTimeInterval,
+ insertArangoTimeInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ @Override
+ public final synchronized void invoke(BaseEdgeDocument row, Context context) throws IOException {
+ ipWithDataList.add(row);
+ if (ipWithDataList.size() >= this.insertArangoBatchSize) {
+ try {
+ flush(ipWithDataList);
+ } catch (SQLException e) {
+ logger.error("ck sink invoke flush failed.", e);
+ }
+ }
+
+ }
+
+ // 插入数据
+ private synchronized void flush(List<BaseEdgeDocument> data) throws SQLException {
+ if (data.size() > 0) {
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ log.debug("开始写入arangodb数据 :{}", data.size());
+ arangoDBConnect.overwrite(data, sink);
+ stopWatch.stop();
+ log.debug("总共花费时间 {} ms", stopWatch.getTime());
+ log.debug("写入arangodb表{},数据 {}", sink, data.size());
+ ipWithDataList.clear();
+ }
+ }
+
+ /**
+ * Executes prepared statement and closes all resources of this instance.
+ *
+ * @throws IOException Thrown, if the input could not be closed properly.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+ if (arangoDBConnect != null) {
+ try {
+ flush(ipWithDataList);
+ } catch (SQLException e) {
+ log.error("JDBC statement could not be closed: " + e.getMessage());
+ } finally {
+ try {
+ arangoDBConnect.clean();
+ } catch (Exception e) {
+ log.error("JDBC connection could not be closed: " + e.getMessage());
+ }
+ }
+ }
+
+
+ }
+ }
+}
+
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSink.java b/src/main/java/com/zdjizhi/utils/ck/CKSink.java
index 95ef69e..d074926 100644
--- a/src/main/java/com/zdjizhi/utils/ck/CKSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/CKSink.java
@@ -1,23 +1,6 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package com.zdjizhi.utils.ck;
+import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.enums.LogMetadata;
@@ -25,10 +8,10 @@ import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import ru.yandex.clickhouse.ClickHousePreparedStatement;
import java.io.IOException;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
@@ -44,20 +27,6 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
private static final long serialVersionUID = 1L;
private static final Log logger = LogFactory.get();
- private Connection connection;
- private ClickHousePreparedStatement preparedStatement = null;
- // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP
- private final List<Map> ipWithDataList;
-
- // 满足此时间条件写出数据
- private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
- // 插入的批次
- private final int insertCkBatchSize = CK_BATCH; // 开发测试用10条
-
- private transient volatile boolean closed = false;
- private transient ScheduledExecutorService scheduler;
- private transient ScheduledFuture<?> scheduledFuture;
-
private static final Map<String, String[]> logMetadataFields = new HashMap<>();
private static final Map<String, String> logMetadataSql = new HashMap<>();
@@ -68,6 +37,17 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
}
}
+ // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP
+ private final CopyOnWriteArrayList<Map> ipWithDataList;
+ // 满足此时间条件写出数据
+ private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
+    // Number of rows per insert batch
+    private final int insertCkBatchSize = CK_BATCH; // 10 rows for development/testing
+ private Connection connection;
+ private PreparedStatement preparedStatement = null;
+ private transient volatile boolean closed = false;
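+    // Periodic flush: the scheduler below writes out buffered rows every insertCkTimeInterval ms, even if the batch is not full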
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
     // Target table name
private String sink;
@@ -88,19 +68,19 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
connection = CKUtils.getConnection();
String sql = logMetadataSql.get(sink);
log.debug(sql);
- preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
+ preparedStatement = connection.prepareStatement(sql);
if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
- 1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+ 1, new ExecutorThreadFactory("ck-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (CKSink.this) {
if (!closed) {
try {
- logger.info("ck_flush.............");
- flushClose();
+ logger.debug("ck_flush.............");
+ flush(ipWithDataList);
} catch (Exception e) {
log.error(e);
}
@@ -115,7 +95,7 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
}
@Override
- public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
+ public synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
ipWithDataList.add(row);
/**
          * 1. Write the data to ClickHouse
@@ -123,9 +103,10 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
if (ipWithDataList.size() >= this.insertCkBatchSize) {
try {
flush(ipWithDataList);
- logger.info("insertCkBatchSize");
+                log.debug("ck sink invoke flush");
} catch (SQLException e) {
- throw new RuntimeException("Preparation of JDBC statement failed.", e);
+ logger.error("ck sink invoke flush failed.", e);
}
}
@@ -134,35 +115,29 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
     // Insert the buffered data
private synchronized void flush(List<Map> data) throws SQLException {
if (data.size() > 0) {
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- log.info("开始写入ck数据 :{}", data.size());
-
- connection.setAutoCommit(false);
+            log.debug("ck sink flush {}", data.size());
+ try {
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+                log.debug("start writing ClickHouse data: {}", data.size());
+ String[] logFields = logMetadataFields.get(sink);
- String[] logFields = logMetadataFields.get(sink);
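+                // Bind each row's values in the field order defined for this sink table, then add the row to the batch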
+ for (Map<String, Object> map : data) {
- for (Map<String, Object> map : data) {
- for (int i = 0; i < logFields.length; i++) {
- preparedStatement.setObject(i + 1, map.get(logFields[i]));
+ for (int i = 0; i < logFields.length; i++) {
+ preparedStatement.setObject(i + 1, map.get(logFields[i]));
+ }
+ preparedStatement.addBatch();
}
- preparedStatement.addBatch();
+ preparedStatement.executeBatch();
+ preparedStatement.clearBatch();
+ stopWatch.stop();
+ log.debug("总共花费时间 {} ms", stopWatch.getTime());
+ log.debug("写入ck表{},数据 {}", sink, data.size());
+ ipWithDataList.clear();
+ } catch (SQLException e) {
+ logger.error(e);
}
- preparedStatement.executeBatch();
- connection.commit();
- preparedStatement.clearBatch();
- stopWatch.stop();
- log.info("总共花费时间 {} ms", stopWatch.getTime());
- log.info("写入ck表{},数据 {}", sink, data.size());
- ipWithDataList.clear();
- }
- }
-
- private synchronized void flushClose() {
- try {
- flush(ipWithDataList);
- } catch (SQLException e) {
- log.error("Preparation of JDBC statement failed.", e);
}
}
@@ -179,23 +154,22 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
-
if (preparedStatement != null) {
- flushClose();
try {
- preparedStatement.close();
+                    log.debug("ck sink close flush");
+ flush(ipWithDataList);
} catch (SQLException e) {
log.error("JDBC statement could not be closed: " + e.getMessage());
} finally {
- preparedStatement = null;
+ try {
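+                        // Close the prepared statement and release the ClickHouse connection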
+ IoUtil.close(preparedStatement);
+ CKUtils.close(connection);
+ } catch (Exception e) {
+ log.error("JDBC connection could not be closed: " + e.getMessage());
+ }
}
}
- try {
- CKUtils.close(connection);
- } catch (Exception e) {
- log.error("JDBC connection could not be closed: " + e.getMessage());
- }
}
}
}
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKUtils.java b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java
index 7ff48d6..4edff5f 100644
--- a/src/main/java/com/zdjizhi/utils/ck/CKUtils.java
+++ b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java
@@ -3,6 +3,7 @@ package com.zdjizhi.utils.ck;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zaxxer.hikari.HikariDataSource;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
@@ -16,10 +17,8 @@ public class CKUtils {
private static final Log log = LogFactory.get();
- private static Connection connection;
-
public static Connection getConnection() {
-
+ Connection connection = null;
try {
ClickHouseProperties props = new ClickHouseProperties();
props.setDatabase(CK_DATABASE);
@@ -27,19 +26,20 @@ public class CKUtils {
props.setPassword(CK_PIN);
props.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
props.setSocketTimeout(CK_SOCKET_TIMEOUT);
- props.setMaxThreads(50);
+ props.setMaxThreads(CK_MAX_THREADS);
BalancedClickhouseDataSource blDataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, props);
blDataSource.actualize();
- blDataSource.scheduleActualization(10, TimeUnit.SECONDS);//开启检测
-
-// HikariConfig conf = new HikariConfig();
-// conf.setDataSource(blDataSource);
-// conf.setMinimumIdle(1);
-// conf.setMaximumPoolSize(20);
-//
-// HikariDataSource hkDs = new HikariDataSource(conf);
- connection = blDataSource.getConnection();
+            blDataSource.scheduleActualization(CK_SCHEDULE_ACTUALIZATION, TimeUnit.SECONDS); // enable periodic availability checks
+
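+            // Wrap the load-balanced ClickHouse data source in a HikariCP pool using the configured pool settings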
+ HikariDataSource hkDs = new HikariDataSource();
+ hkDs.setDataSource(blDataSource);
+ hkDs.setMinimumIdle(HIKARI_MINIMUM_IDLE);
+ hkDs.setMaximumPoolSize(HIKARI_MAXIMUM_POOL_SIZE);
+ hkDs.setMaxLifetime(HIKARI_MAX_LIFETIME);
+ hkDs.setIdleTimeout(HIKARI_IDLE_TIMEOUT);
+
+ connection = hkDs.getConnection();
log.debug("get clickhouse connection success");
} catch (SQLException e) {
log.error("clickhouse connection error ,{}", e);
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java
deleted file mode 100644
index 40b7db6..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.enums.LogMetadata;
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ru.yandex.clickhouse.ClickHousePreparedStatement;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH;
-import static com.zdjizhi.common.FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME;
-
-public class ClickHouseJDBCOutput extends RichSinkFunction<Map<String, Object>> {
- private static final Log log = LogFactory.get();
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger logger = LoggerFactory.getLogger(ClickHouseJDBCOutput.class);
-
- private Connection connection;
- private ClickHousePreparedStatement preparedStatement = null;
- // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP
- private final List<Map> ipWithDataList;
-
- // 满足此时间条件写出数据
- private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
- // 插入的批次
- private final int insertCkBatchSize = CK_BATCH; // 开发测试用10条
-
- private transient volatile boolean closed = false;
- private transient ScheduledExecutorService scheduler;
- private transient ScheduledFuture<?> scheduledFuture;
-
- private static final Map<String, String[]> logMetadataFields = new HashMap<>();
- private static final Map<String, String> logMetadataSql = new HashMap<>();
-
- static {
- for (LogMetadata value : LogMetadata.values()) {
- logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
- logMetadataFields.put(value.getSink(), value.getFields());
- }
- }
-
- // 数据表名
- private String sink;
-
- public ClickHouseJDBCOutput(String sink) {
- this.sink = sink;
- this.ipWithDataList = new CopyOnWriteArrayList<>();
- }
-
- public String getSink() {
- return sink;
- }
-
- /**
- * Connects to the target database and initializes the prepared statement.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- connection = CKUtils.getConnection();
- String sql = logMetadataSql.get(sink);
- log.debug(sql);
- preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
-
- if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) {
- this.scheduler = Executors.newScheduledThreadPool(
- 1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
- this.scheduledFuture =
- this.scheduler.scheduleWithFixedDelay(
- () -> {
- synchronized (ClickHouseJDBCOutput.this) {
- if (!closed) {
- try {
- logger.info("ck_flush.............");
- flushClose();
- } catch (Exception e) {
-// flushException = e;
- log.error(e);
- }
- }
- }
- },
- insertCkTimeInterval,
- insertCkTimeInterval,
- TimeUnit.MILLISECONDS);
- }
-
- }
-
- @Override
- public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
- ipWithDataList.add(row);
- /**
- * 1. 将数据写入CK
- */
- if (ipWithDataList.size() >= this.insertCkBatchSize) {
- try {
- flush(ipWithDataList);
- logger.info("insertCkBatchSize");
- } catch (SQLException e) {
- throw new RuntimeException("Preparation of JDBC statement failed.", e);
- }
- }
-
- }
-
- // 插入数据
- private synchronized void flush(List<Map> data) throws SQLException {
- if (data.size() > 0) {
-// checkFlushException();
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- log.info("开始写入ck数据 :{}", data.size());
-
- connection.setAutoCommit(false);
-
- String[] logFields = logMetadataFields.get(sink);
-
- for (Map<String, Object> map : data) {
- for (int i = 0; i < logFields.length; i++) {
- preparedStatement.setObject(i + 1, map.get(logFields[i]));
- }
- preparedStatement.addBatch();
- }
- preparedStatement.executeBatch();
- connection.commit();
- preparedStatement.clearBatch();
- stopWatch.stop();
- log.info("总共花费时间 {} ms", stopWatch.getTime());
- log.info("写入ck表{},数据 {}", sink, data.size());
- ipWithDataList.clear();
- }
- }
-
- private synchronized void flushClose() {
- try {
- flush(ipWithDataList);
- } catch (SQLException e) {
- log.error("Preparation of JDBC statement failed.", e);
- }
- }
-
- /**
- * Executes prepared statement and closes all resources of this instance.
- *
- * @throws IOException Thrown, if the input could not be closed properly.
- */
- @Override
- public synchronized void close() throws IOException {
- if (!closed) {
- closed = true;
-
- if (this.scheduledFuture != null) {
- scheduledFuture.cancel(false);
- this.scheduler.shutdown();
- }
-
- if (preparedStatement != null) {
- flushClose();
- try {
- preparedStatement.close();
- } catch (SQLException e) {
- log.error("JDBC statement could not be closed: " + e.getMessage());
- } finally {
- preparedStatement = null;
- }
- }
-
- try {
- CKUtils.close(connection);
- } catch (Exception e) {
- log.error("JDBC connection could not be closed: " + e.getMessage());
- }
- }
- }
-}
-
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java
deleted file mode 100644
index 6cf2f60..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java
+++ /dev/null
@@ -1,106 +0,0 @@
-//package com.zdjizhi.utils.ck;
-//
-//import cn.hutool.core.io.IoUtil;
-//import cn.hutool.log.Log;
-//import cn.hutool.log.LogFactory;
-//import com.zdjizhi.enums.LogMetadata;
-//import org.apache.commons.lang3.time.StopWatch;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-//import ru.yandex.clickhouse.ClickHousePreparedStatement;
-//
-//import java.sql.Connection;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//
-//import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH;
-//
-//public class ClickhouseSink2 extends RichSinkFunction<List<Map<String, Object>>> {
-//
-// private static final Log log = LogFactory.get();
-//
-// private Connection connection;
-// private ClickHousePreparedStatement preparedStatement;
-// public String sink;
-//
-// private static final Map<String, String[]> logMetadataFields = new HashMap<>();
-// private static final Map<String, String> logMetadataSql = new HashMap<>();
-//
-// static {
-// for (LogMetadata value : LogMetadata.values()) {
-// logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
-// logMetadataFields.put(value.getSink(), value.getFields());
-// }
-// }
-//
-// public ClickhouseSink2(String sink) {
-// this.sink = sink;
-// }
-//
-// public String getSink() {
-// return sink;
-// }
-//
-// public void setSink(String sink) {
-// this.sink = sink;
-// }
-//
-// @Override
-// public void invoke(List<Map<String, Object>> logs, Context context) throws Exception {
-// executeInsert(logs, getSink());
-// }
-//
-// @Override
-// public void open(Configuration parameters) throws Exception {
-// connection = CKUtils.getConnection();
-// String sql = logMetadataSql.get(sink);
-// log.debug(sql);
-// connection.setAutoCommit(false);
-// preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
-// }
-//
-// @Override
-// public void close() throws Exception {
-// IoUtil.close(preparedStatement);
-// CKUtils.close(connection);
-// }
-//
-// public void executeInsert(List<Map<String, Object>> data, String tableName) {
-//
-// try {
-// StopWatch stopWatch = new StopWatch();
-// stopWatch.start();
-// log.info("开始写入ck数据 :{}", data.size());
-//
-// String[] logFields = logMetadataFields.get(tableName);
-//
-// int count = 0;
-// for (Map<String, Object> map : data) {
-// for (int i = 0; i < logFields.length; i++) {
-// preparedStatement.setObject(i + 1, map.get(logFields[i]));
-// }
-// preparedStatement.addBatch();
-// count++;
-// if (count % CK_BATCH == 0) {
-// preparedStatement.executeBatch();
-// connection.commit();
-// preparedStatement.clearBatch();
-// count = 0;
-// }
-// }
-// if (count > 0) {
-// preparedStatement.executeBatch();
-// connection.commit();
-// preparedStatement.clearBatch();
-// }
-//
-// stopWatch.stop();
-// log.info("总共花费时间 {} ms", stopWatch.getTime());
-// log.info("写入ck表{},数据 {}", tableName, data.size());
-// } catch (Exception ex) {
-// log.error("ClickhouseSink插入报错", ex);
-// }
-// }
-//
-//} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
index 0cf16ff..782e0ff 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
@@ -79,7 +79,7 @@ public class JsonTypeUtil {
* @return Long value
*/
static long checkLongValue(Object value) {
- Long longVal = TypeUtils.castToLong(value);
+ Long longVal = TypeUtil.castToLong(value);
if (longVal == null) {
return 0L;
@@ -99,7 +99,7 @@ public class JsonTypeUtil {
return null;
}
- return TypeUtils.castToDouble(value);
+ return TypeUtil.castToDouble(value);
}
@@ -111,7 +111,7 @@ public class JsonTypeUtil {
*/
static int getIntValue(Object value) {
- Integer intVal = TypeUtils.castToInt(value);
+ Integer intVal = TypeUtil.castToInt(value);
if (intVal == null) {
return 0;
}
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtil.java
index 1beb1fa..8b1adfa 100644
--- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtil.java
@@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit;
* @Description:
* @date 2021/7/1218:20
*/
-public class TypeUtils {
+public class TypeUtil {
private static final Log logger = LogFactory.get();
/**
diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
index 8f3fccf..3aeb499 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
@@ -31,13 +31,11 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem
@Override
@SuppressWarnings("unchecked")
- public Map<String,Object> deserialize(ConsumerRecord record) throws Exception {
+ public Map<String, Object> deserialize(ConsumerRecord record) throws Exception {
if (record != null) {
try {
-// long timestamp = record.timestamp() / 1000;
String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING);
Map<String, Object> data = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
-// json.put("common_ingestion_time", timestamp);
return data;
} catch (RuntimeException e) {
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());