author     zhanghongqing <[email protected]>  2022-09-23 17:10:05 +0800
committer  zhanghongqing <[email protected]>  2022-09-23 17:10:05 +0800
commit     9cdfe060cfeda37e04fa43563569efae53641eb4 (patch)
tree       fabf1a170248666f1587cffd1c0f3d6e724de0d9 /src/main/java/com/zdjizhi/etl
parent     25e5b51766540d8c1b238a1e28a96fdff45024d3 (diff)
1. Filter out abnormal data 2. Optimize sink write code 3. Optimize ClickHouse configuration
Diffstat (limited to 'src/main/java/com/zdjizhi/etl')
-rw-r--r--  src/main/java/com/zdjizhi/etl/CKBatchWindow.java | 23
-rw-r--r--  src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java | 131
-rw-r--r--  src/main/java/com/zdjizhi/etl/LogService.java | 31
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java | 36
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService.java | 58
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java | 162
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java | 15
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java | 80
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java | 12
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java | 32
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java | 11
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java | 21
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java | 36
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java | 28
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsLogService.java | 25
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java | 15
19 files changed, 153 insertions, 575 deletions
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
deleted file mode 100644
index 947bad8..0000000
--- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.etl;
-
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<Map<String, Object>> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- batchLog.add(iterator.next());
- }
- out.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
deleted file mode 100644
index 66b36a9..0000000
--- a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.zdjizhi.etl;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-
-/**
- *  * 带超时的计数窗口触发器
- */
-public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
- private static Log logger = LogFactory.get();
-
- /**
- * 窗口最大数据量
- */
- private int maxCount;
- /**
- * event time / process time
- */
- private TimeCharacteristic timeType;
-
- private String stateName;
-
- public String getStateName() {
- return stateName;
- }
-
- public void setStateName(String stateName) {
- this.stateName = stateName;
- }
-
- public CountTriggerWithTimeout(String stateName) {
- this.stateName = stateName;
- }
-
- /**
- * 用于储存窗口当前数据量的状态对象
- */
- private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE);
-
-
- public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) {
-
- this.maxCount = maxCount;
- this.timeType = timeType;
- this.stateName = stateName;
- }
-
- private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
- clear(window, ctx);
- return TriggerResult.FIRE_AND_PURGE;
- }
-
-
- @Override
- public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
- ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
- countState.add(1L);
-
-
- if (countState.get() >= maxCount) {
- logger.info("fire with count: " + countState.get());
- return fireAndPurge(window, ctx);
- }
- if (timestamp >= window.getEnd()) {
- logger.info("fire with tiem: " + timestamp);
- return fireAndPurge(window, ctx);
- } else {
- return TriggerResult.CONTINUE;
- }
- }
-
-
- @Override
- public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
- if (timeType != TimeCharacteristic.ProcessingTime) {
- return TriggerResult.CONTINUE;
- }
-
-
- if (time >= window.getEnd()) {
- return TriggerResult.CONTINUE;
- } else {
- logger.debug("fire with process tiem: " + time);
- return fireAndPurge(window, ctx);
- }
- }
-
-
- @Override
- public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
- if (timeType != TimeCharacteristic.EventTime) {
- return TriggerResult.CONTINUE;
- }
-
-
- if (time >= window.getEnd()) {
- return TriggerResult.CONTINUE;
- } else {
- logger.debug("fire with event tiem: " + time);
- return fireAndPurge(window, ctx);
- }
- }
-
-
- @Override
- public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
- ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
- countState.clear();
- }
-
- /**
- * 计数方法
- */
- class Sum implements ReduceFunction<Long> {
-
-
- @Override
- public Long reduce(Long value1, Long value2) throws Exception {
- return value1 + value2;
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java
index 052a8b3..de5141a 100644
--- a/src/main/java/com/zdjizhi/etl/LogService.java
+++ b/src/main/java/com/zdjizhi/etl/LogService.java
@@ -1,45 +1,16 @@
package com.zdjizhi.etl;
import cn.hutool.json.JSONUtil;
-import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
-import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.CKSink;
import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Map;
-import static com.zdjizhi.common.FlowWriteConfig.*;
+import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM;
public class LogService {
-
-// @Deprecated
-// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
-//
-// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
-// .apply(new CKBatchWindow())
-// .addSink(new ClickhouseSink(sink))
-// .setParallelism(SINK_PARALLELISM)
-// .name(sink)
-// .setParallelism(SINK_PARALLELISM);
-//
-// }
-
- public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
- sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new ArangodbBatchIPWindow())
- .addSink(new ArangoDBSink(sink))
- .setParallelism(SINK_PARALLELISM)
- .name(sink)
- .setParallelism(SINK_PARALLELISM);
- }
-
public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.map(JSONUtil::toJsonStr)
.setParallelism(SINK_PARALLELISM)
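
The removed CKBatchWindow / CountTriggerWithTimeout pair (and the commented-out getLogCKSink3 above) implemented count-or-timeout batching as a windowAll stage placed in front of the sink. The commit message's second item ("optimize sink write code") together with the direct use of CKSink and AGSink suggests that batching now lives inside the sink functions themselves; those classes are outside this diff, so the following is only a minimal sketch of that pattern with hypothetical names (BatchingSink, flush), not the repository's implementation.

package com.zdjizhi.etl;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only: buffer records and flush either when the batch is
// full or when the configured delay has elapsed since the last flush.
public abstract class BatchingSink<T> extends RichSinkFunction<T> {

    private final int maxBatchSize;
    private final long maxDelayMillis;
    private transient List<T> buffer;
    private transient long lastFlushTime;

    protected BatchingSink(int maxBatchSize, long maxDelayMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>(maxBatchSize);
        lastFlushTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(T value, Context context) {
        buffer.add(value);
        long now = System.currentTimeMillis();
        if (buffer.size() >= maxBatchSize || now - lastFlushTime >= maxDelayMillis) {
            flush(new ArrayList<>(buffer));
            buffer.clear();
            lastFlushTime = now;
        }
    }

    @Override
    public void close() {
        // Drain whatever is still buffered when the job shuts down.
        if (buffer != null && !buffer.isEmpty()) {
            flush(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Write one batch to the external store (ClickHouse, ArangoDB, ...). */
    protected abstract void flush(List<T> batch);
}
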
diff --git a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
deleted file mode 100644
index f5848c4..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.etl.connection;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<BaseEdgeDocument> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- String srcIp = StrUtil.toString(next.get("src_ip"));
- String dstIp = StrUtil.toString(next.get("dst_ip"));
- BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
- baseEdgeDocument.setKey(String.join("-", srcIp, dstIp));
- baseEdgeDocument.setFrom("src_ip/" + srcIp);
- baseEdgeDocument.setTo("dst_ip/" + dstIp);
- baseEdgeDocument.addAttribute("src_ip", srcIp);
- baseEdgeDocument.addAttribute("dst_ip", dstIp);
- baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
-
- batchLog.add(baseEdgeDocument);
- }
- collector.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index b0fff6b..9c5f8e0 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -2,14 +2,14 @@ package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.util.TypeUtils;
+import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.etl.LogService;
-import com.zdjizhi.etl.dns.SketchTimeMapFunction;
+import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
@@ -36,16 +36,21 @@ public class ConnLogService {
//写入ck通联relation表
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
} else {
+ LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
+ LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
}
- DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
+ if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
- //合并通联和通联sketch
- DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
+ DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
- //写入arangodb
- LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+ //合并通联和通联sketch
+ DataStream<BaseEdgeDocument> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
+
+ //写入arangodb
+ ConnLogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+ }
}
@@ -59,21 +64,24 @@ public class ConnLogService {
String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
- SingleOutputStreamOperator<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+ DataStream<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
.filter(x -> {
if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) {
return false;
}
if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) {
- if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
- String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
+ if (TypeUtils.castToLong(x.get("total_cs_pkts")) < 0 || TypeUtils.castToLong(x.get("total_cs_pkts")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("total_sc_pkts")) < 0 || TypeUtils.castToLong(x.get("total_sc_pkts")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("total_cs_bytes")) < 0 || TypeUtils.castToLong(x.get("total_cs_bytes")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("total_sc_bytes")) < 0 || TypeUtils.castToLong(x.get("total_sc_bytes")) == Long.MAX_VALUE) {
return false;
}
return true;
} else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) {
- if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
- String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
+ if (TypeUtils.castToLong(x.get("sketch_sessions")) < 0 || TypeUtils.castToLong(x.get("sketch_sessions")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("sketch_packets")) < 0 || TypeUtils.castToLong(x.get("sketch_packets")) == Long.MAX_VALUE
+ || TypeUtils.castToLong(x.get("sketch_bytes")) < 0 || TypeUtils.castToLong(x.get("sketch_bytes")) == Long.MAX_VALUE) {
return false;
}
return true;
@@ -96,8 +104,10 @@ public class ConnLogService {
}))
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
.setParallelism(TRANSFORM_PARALLELISM);
return connTransformStream;
}
@@ -107,18 +117,28 @@ public class ConnLogService {
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
- .process(new SketchProcessFunction());
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .process(new SketchProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
+ .setParallelism(TRANSFORM_PARALLELISM);
return sketchTransformStream;
}
- private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
- DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
+ private static DataStream<BaseEdgeDocument> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
+ DataStream<BaseEdgeDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return ip2ipGraph;
}
+ public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
+ sourceStream.addSink(new AGSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+ }
+
}
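
The new source filters above replace the old string-length heuristic with explicit range checks: any counter field that casts to a negative long or to Long.MAX_VALUE marks the record as abnormal. Since the same predicate now appears for both the connection and the sketch topics (and again after aggregation), it could be factored into a reusable FilterFunction. The class below is a hypothetical sketch of that, not code from the repository; unlike the inlined lambdas it also guards against null counters.

package com.zdjizhi.etl.connection;

import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.common.functions.FilterFunction;

import java.util.Map;

// Hypothetical helper (not in the repository): drops records whose listed
// counter fields are missing, negative, or equal to Long.MAX_VALUE.
public class CounterSanityFilter implements FilterFunction<Map<String, Object>> {

    private final String[] counterFields;

    public CounterSanityFilter(String... counterFields) {
        this.counterFields = counterFields;
    }

    @Override
    public boolean filter(Map<String, Object> value) {
        if (value == null) {
            return false;
        }
        for (String field : counterFields) {
            Long v = TypeUtils.castToLong(value.get(field));
            if (v == null || v < 0 || v == Long.MAX_VALUE) {
                return false;
            }
        }
        return true;
    }
}

It would be used as, for example, .filter(new CounterSanityFilter("total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes")) after the Kafka source.
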
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
deleted file mode 100644
index b727535..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
+++ /dev/null
@@ -1,162 +0,0 @@
-//package com.zdjizhi.etl.connection;
-//
-//import cn.hutool.core.convert.Convert;
-//import cn.hutool.core.util.StrUtil;
-//import com.zdjizhi.etl.LogService;
-//import com.zdjizhi.etl.dns.SketchTimeMapFunction;
-//import com.zdjizhi.utils.kafka.KafkaConsumer;
-//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-//import org.apache.flink.api.common.functions.MapFunction;
-//import org.apache.flink.api.java.utils.ParameterTool;
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-//import org.apache.flink.streaming.api.windowing.time.Time;
-//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
-//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
-//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
-//
-//import java.time.Duration;
-//import java.util.*;
-//
-//import static com.zdjizhi.common.FlowWriteConfig.*;
-//
-//
-//public class ConnLogService2 {
-//
-// public static String convertToCsv(Map<String, Object> map) {
-// List<Object> list = new ArrayList<>();
-// list.add("(");
-// for (Map.Entry<String, Object> m : map.entrySet()) {
-// if (m.getValue() instanceof String) {
-// list.add("'" + m.getValue() + "'");
-// } else {
-// list.add(m.getValue());
-// }
-// }
-// list.add(")");
-// String join = StrUtil.join(",", list);
-// return join;
-//
-// }
-//
-// public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
-// //connection
-// DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
-// //sketch
-// DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
-//
-// //写入CKsink,批量处理
-// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
-//
-// Map<String, String> globalParameters = new HashMap<>();
-// // ClickHouse cluster properties
-// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/");
-// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME);
-// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN);
-//
-// // sink common
-// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
-// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
-// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
-// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
-// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
-// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
-//
-// // set global paramaters
-// ParameterTool parameters = ParameterTool.fromMap(globalParameters);
-// env.getConfig().setGlobalJobParameters(parameters);
-//
-// // Transform 操作
-// DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() {
-// @Override
-// public String map(Map<String, Object> data) throws Exception {
-// String s = convertToCsv(data);
-// System.err.println(s);
-// return convertToCsv(data);
-// }
-// });
-//
-//
-// // create props for sink
-// Properties props = new Properties();
-// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH);
-// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH);
-// ClickHouseSink sink = new ClickHouseSink(props);
-// dataStream.addSink(sink);
-// dataStream.print();
-//
-//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
-//
-// //transform
-// DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
-//
-// //写入ck通联relation表
-// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
-//
-// DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
-//
-// //合并通联和通联sketch
-// DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
-//
-// //写入arangodb
-// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
-//
-// }
-//
-// /**
-// * 通联原始日志数据源消费kafka
-// *
-// * @param source
-// * @return
-// */
-// private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
-//
-// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
-//
-// DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
-// .setParallelism(SOURCE_PARALLELISM)
-// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
-// .setParallelism(SOURCE_PARALLELISM)
-// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
-// .setParallelism(SOURCE_PARALLELISM)
-// .name(source)
-// .setParallelism(SOURCE_PARALLELISM);
-// return sourceStream;
-// }
-//
-// private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
-// DataStream<Map<String, Object>> connTransformStream = connSource
-// .assignTimestampsAndWatermarks(WatermarkStrategy
-// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-// .withTimestampAssigner((event, timestamp) -> {
-// return Convert.toLong(event.get("conn_start_time")) * 1000;
-// }))
-// .setParallelism(TRANSFORM_PARALLELISM)
-// .keyBy(new IpKeysSelector())
-// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-// .process(new ConnProcessFunction())
-// .setParallelism(TRANSFORM_PARALLELISM);
-// return connTransformStream;
-// }
-//
-// private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
-// DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
-// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
-// .keyBy(new IpKeysSelector())
-// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-// .process(new SketchProcessFunction());
-// return sketchTransformStream;
-// }
-//
-// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
-// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
-// .keyBy(new IpKeysSelector())
-// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
-// .process(new Ip2IpGraphProcessFunction())
-// .setParallelism(TRANSFORM_PARALLELISM);
-// return ip2ipGraph;
-// }
-//
-//}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 91cb47f..9a5245f 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -29,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
try {
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
if (values != null) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -53,16 +53,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
- long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
- if (connStartTimetime > 0) {
+ long connStartTime = Convert.toLong(newSketchLog.get("conn_start_time"));
+ if (connStartTime > 0) {
sessions++;
packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));
- startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
- endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
- packets = packets > Long.MAX_VALUE ? 0 : packets;
- bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
+ startTime = Math.min(connStartTime, startTime);
+ endTime = Math.max(connStartTime, endTime);
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
@@ -71,4 +69,5 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
}
return null;
}
+
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java
deleted file mode 100644
index e38517a..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.zdjizhi.etl.connection;
-
-import cn.hutool.core.convert.Convert;
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.util.TypeUtils;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import java.util.Map;
-
-
-/**
- * @author 94976
- */
-public class ConnReduceFunction implements ReduceFunction<Map<String, Object>> {
-
- private static final Log logger = LogFactory.get();
-
- //
-// public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-// try {
-// Tuple5<Long, Long, Long, Long, Long> values = sum(elements);
-// if (values != null) {
-// Map<String, Object> result = new HashMap<>();
-// result.put("start_time", values.f0);
-// result.put("end_time", values.f1);
-// result.put("src_ip", keys.f0);
-// result.put("dst_ip", keys.f1);
-// result.put("sessions", values.f2);
-// result.put("packets", values.f3);
-// result.put("bytes", values.f4);
-// out.collect(result);
-// logger.debug("获取中间聚合结果:{}", result.toString());
-// }
-// } catch (Exception e) {
-// logger.error("获取中间聚合结果失败,middleResult: {}", e);
-// }
-// }
-//
- private Tuple5<Long, Long, Long, Long, Long> sum(Map<String, Object> map1, Map<String, Object> map2) {
-
- try {
- long sessions = 0L;
- long packets = 0L;
- long bytes = 0L;
- long startTime = DateUtil.currentSeconds();
- long endTime = DateUtil.currentSeconds();
-
- long connStartTime1 = Convert.toLong(map1.get("conn_start_time"));
- long connStartTime2 = Convert.toLong(map2.get("conn_start_time"));
- if (connStartTime1 > 0 && connStartTime2 > 0) {
- sessions++;
- packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) +
- TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts"));
-
- bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) +
- TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes"));
-
- startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2;
- endTime = connStartTime2 < connStartTime1 ? connStartTime1 : connStartTime2;
-
- packets = packets > Long.MAX_VALUE ? 0 : packets;
- bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
-
- }
-
- } catch (Exception e) {
- logger.error("聚合中间结果集失败 {}", e);
- }
- return null;
- }
-
- @Override
- public Map<String, Object> reduce(Map<String, Object> map1, Map<String, Object> map2) throws Exception {
-
- return null;
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
index e957d65..7becf90 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
@@ -1,6 +1,7 @@
package com.zdjizhi.etl.connection;
-import com.zdjizhi.utils.json.TypeUtils;
+import com.alibaba.fastjson.util.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
@@ -9,8 +10,13 @@ public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map
@Override
public Map<String, Object> map(Map<String, Object> value) throws Exception {
- value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
- value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time")));
+ value.put("conn_start_time", TypeUtil.coverMSToS(value.get("conn_start_time")));
+ value.put("log_gen_time", TypeUtil.coverMSToS(value.get("log_gen_time")));
+
+ value.put("total_cs_pkts", TypeUtils.castToLong(value.get("total_cs_pkts")));
+ value.put("total_sc_pkts", TypeUtils.castToLong(value.get("total_sc_pkts")));
+ value.put("total_cs_bytes", TypeUtils.castToLong(value.get("total_cs_bytes")));
+ value.put("total_sc_bytes", TypeUtils.castToLong(value.get("total_sc_bytes")));
return value;
}
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
index 1b1b0c9..9468e26 100644
--- a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
@@ -2,42 +2,48 @@ package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.HashUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
import java.util.Map;
/**
- * 对ip去重
+ * 处理时间,转为图数据
*/
-public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
+public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseEdgeDocument, Tuple2<String, String>, TimeWindow> {
private static final Log logger = LogFactory.get();
@Override
- public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+ public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseEdgeDocument> out) {
try {
- long lastFoundTime = DateUtil.currentSeconds();;
+ long lastFoundTime = DateUtil.currentSeconds();
for (Map<String, Object> log : elements) {
long connStartTime = Convert.toLong(log.get("start_time"));
- lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime;
+ lastFoundTime = Math.max(connStartTime, lastFoundTime);
}
- Map<String, Object> newLog = new HashMap<>();
- newLog.put("src_ip", keys.f0);
- newLog.put("dst_ip", keys.f1);
- newLog.put("last_found_time", lastFoundTime);
- out.collect(newLog);
- logger.debug("获取中间聚合结果:{}", newLog.toString());
+
+ BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+ baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(keys.f0 + keys.f1)));
+ baseEdgeDocument.setFrom("src_ip/" + keys.f0);
+ baseEdgeDocument.setTo("dst_ip/" + keys.f1);
+ baseEdgeDocument.addAttribute("src_ip", keys.f0);
+ baseEdgeDocument.addAttribute("dst_ip", keys.f1);
+ baseEdgeDocument.addAttribute("last_found_time", lastFoundTime);
+
+ out.collect(baseEdgeDocument);
+ logger.debug("获取中间聚合结果:{}", baseEdgeDocument.toString());
} catch (Exception e) {
- logger.error("获取中间聚合结果失败,middleResult: {}", e);
+ logger.error("获取中间聚合结果失败,middleResult: {}", e.getMessage());
}
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index e2dd3f7..b27958a 100644
--- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -42,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
try {
if (values != null) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -72,12 +72,9 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions"));
packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets"));
bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes"));
- startTime = connStartTime < startTime ? connStartTime : startTime;
- endTime = connStartTime > endTime ? connStartTime : endTime;
- sessions = sessions > Long.MAX_VALUE ? 0 : sessions;
- packets = packets > Long.MAX_VALUE ? 0 : packets;
- bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
+ startTime = Math.min(connStartTime, startTime);
+ endTime = Math.max(connStartTime, endTime);
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java
new file mode 100644
index 0000000..ee0d6ae
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.etl.connection;
+
+import com.alibaba.fastjson.util.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Map;
+
+public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
+
+ @Override
+ public Map<String, Object> map(Map<String, Object> value) throws Exception {
+ value.put("sketch_start_time", TypeUtil.coverMSToS(value.get("sketch_start_time")));
+
+ value.put("sketch_sessions", TypeUtils.castToLong(value.get("sketch_sessions")));
+ value.put("sketch_packets", TypeUtils.castToLong(value.get("sketch_packets")));
+ value.put("sketch_bytes", TypeUtils.castToLong(value.get("sketch_bytes")));
+ return value;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
deleted file mode 100644
index ada4b83..0000000
--- a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.etl.dns;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<BaseEdgeDocument> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- String qname = StrUtil.toString(next.get("qname"));
- String record = StrUtil.toString(next.get("record"));
- BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
- baseEdgeDocument.setKey(String.join("-", qname, record));
- baseEdgeDocument.setFrom("qname/" + qname);
- baseEdgeDocument.setTo("record/" + record);
- baseEdgeDocument.addAttribute("qname", qname);
- baseEdgeDocument.addAttribute("record", record);
- baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
-
- batchLog.add(baseEdgeDocument);
- }
- out.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java
new file mode 100644
index 0000000..a467d40
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java
@@ -0,0 +1,28 @@
+package com.zdjizhi.etl.dns;
+
+import cn.hutool.core.util.HashUtil;
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Map;
+
+public class DnsGraphMapFunction implements MapFunction<Map<String, Object>, BaseEdgeDocument> {
+
+ @Override
+ public BaseEdgeDocument map(Map<String, Object> value) throws Exception {
+
+ String qname = StrUtil.toString(value.get("qname"));
+ String record = StrUtil.toString(value.get("record"));
+
+ BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+ baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(qname + record)));
+ baseEdgeDocument.setFrom("qname/" + qname);
+ baseEdgeDocument.setTo("record/" + record);
+ baseEdgeDocument.addAttribute("qname", qname);
+ baseEdgeDocument.addAttribute("record", record);
+ baseEdgeDocument.addAttribute("last_found_time", value.get("last_found_time"));
+
+ return baseEdgeDocument;
+ }
+}
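
Like Ip2IpGraphProcessFunction above, this new mapper derives the ArangoDB edge _key from a hutool FNV hash of the joined key fields rather than from the raw strings, so every (qname, record) pair maps to one fixed edge key. A small usage sketch follows; the domain and IP values are invented, and it assumes the downstream AGSink upserts by _key so that repeated emissions overwrite the same edge rather than create duplicates.

import cn.hutool.core.util.HashUtil;
import com.arangodb.entity.BaseEdgeDocument;

// Usage sketch with made-up sample values: identical (qname, record) pairs
// always yield the same _key, so later windows update last_found_time in place.
public class DnsEdgeKeyDemo {

    public static void main(String[] args) {
        BaseEdgeDocument first = edge("example.com", "93.184.216.34", 1663920000L);
        BaseEdgeDocument second = edge("example.com", "93.184.216.34", 1663923600L);
        // Prints "true": same key, only the attributes differ.
        System.out.println(first.getKey().equals(second.getKey()));
    }

    private static BaseEdgeDocument edge(String qname, String record, long lastFoundTime) {
        BaseEdgeDocument e = new BaseEdgeDocument();
        e.setKey(String.valueOf(HashUtil.fnvHash(qname + record)));
        e.setFrom("qname/" + qname);
        e.setTo("record/" + record);
        e.addAttribute("qname", qname);
        e.addAttribute("record", record);
        e.addAttribute("last_found_time", lastFoundTime);
        return e;
    }
}
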
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
index cde34c1..d8b3a36 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
@@ -28,7 +28,7 @@ public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, O
Long startTime = Convert.toLong(log.get("start_time"));
tmpTime = startTime > tmpTime ? startTime : tmpTime;
}
- Map newLog = new HashMap<>();
+ Map newLog = new LinkedHashMap<>();
newLog.put("record_type", keys.f0);
newLog.put("qname", keys.f1);
newLog.put("record", keys.f2);
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
index b4ab915..7c67d6e 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -2,13 +2,14 @@ package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
+import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.LogService;
+import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -32,20 +33,24 @@ public class DnsLogService {
//dns 拆分后relation日志 ck入库
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
} else {
- LogService.getLogCKSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
+ LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS);
+ LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
}
//arango 入库,按record_type分组入不同的表
- DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull)
+ DataStream<Map<String, Object>> dnsGraph = dnsTransform
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
.setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
- DataStream<Map<String, Object>> dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+ DataStream<BaseEdgeDocument> dnsRecordData = dnsGraph
+ .filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+ .setParallelism(SINK_PARALLELISM)
+ .map(new DnsGraphMapFunction())
.setParallelism(SINK_PARALLELISM);
- LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink());
+ getLogArangoSink(dnsRecordData, dnsEnum.getSink());
}
}
@@ -64,6 +69,7 @@ public class DnsLogService {
private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
+ .setParallelism(SOURCE_PARALLELISM)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
@@ -71,10 +77,17 @@ public class DnsLogService {
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return dnsTransform;
}
+ public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
+ sourceStream.addSink(new AGSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+ }
+
}
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
index 1da5a90..9607b00 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
@@ -6,7 +6,7 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.base.Joiner;
import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.utils.json.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
@@ -25,7 +25,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
@Override
public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
try {
- rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time")));
+ rawLog.put("capture_time", TypeUtil.coverMSToS(rawLog.get("capture_time")));
//qname ,record 转小写
rawLog.put("qname", StringUtils.lowerCase(StrUtil.toString(rawLog.get("qname"))));
if (Objects.nonNull(rawLog.get("response"))) {
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
index 8744060..894867f 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
@@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -45,7 +45,7 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String
endTime = logStartTime > endTime ? logStartTime : endTime;
}
}
- Map<String, Object> newDns = new HashMap<>();
+ Map<String, Object> newDns = new LinkedHashMap<>();
newDns.put("start_time", startTime);
newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
newDns.put("record_type", keys.f0);
diff --git a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java
deleted file mode 100644
index cf10a75..0000000
--- a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.zdjizhi.etl.dns;
-
-import com.zdjizhi.utils.json.TypeUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.Map;
-
-public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
-
- @Override
- public Map<String, Object> map(Map<String, Object> value) throws Exception {
- value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time")));
- return value;
- }
-}