| author | zhanghongqing <[email protected]> | 2022-09-23 17:10:05 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-09-23 17:10:05 +0800 |
| commit | 9cdfe060cfeda37e04fa43563569efae53641eb4 (patch) | |
| tree | fabf1a170248666f1587cffd1c0f3d6e724de0d9 /src/main/java/com/zdjizhi/etl | |
| parent | 25e5b51766540d8c1b238a1e28a96fdff45024d3 (diff) | |
1. Filter out abnormal data 2. Optimize the sink write code 3. Optimize the ClickHouse configuration
Diffstat (limited to 'src/main/java/com/zdjizhi/etl')
19 files changed, 153 insertions, 575 deletions
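For context on point 1 of the commit message, here is a minimal standalone sketch of the range check the new filter in ConnLogService applies to connection counters. The class name `ConnCounterValidator` and the `toLong()` helper are illustrative only; the commit itself uses fastjson's `TypeUtils.castToLong` directly inside a Flink `filter()`, as shown in the ConnLogService hunk below.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Standalone sketch (not part of the commit): records whose packet/byte counters
// are negative or saturated at Long.MAX_VALUE are treated as abnormal and dropped.
public class ConnCounterValidator {

    private static final List<String> COUNTER_FIELDS = Arrays.asList(
            "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes");

    /** Returns true only when every counter parses to a sane non-negative long. */
    public static boolean isValid(Map<String, Object> log) {
        if (log == null) {
            return false;
        }
        for (String field : COUNTER_FIELDS) {
            long v = toLong(log.get(field));
            if (v < 0 || v == Long.MAX_VALUE) {
                return false;
            }
        }
        return true;
    }

    // Minimal stand-in for TypeUtils.castToLong: missing or non-numeric values map to -1.
    private static long toLong(Object value) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value == null) {
            return -1L;
        }
        try {
            return Long.parseLong(value.toString().trim());
        } catch (NumberFormatException e) {
            return -1L;
        }
    }
}
```

In the pipeline this corresponds to a `.filter(x -> isValid(x))` step placed right after the Kafka source, which is where the commit inserts its check in `getLogSource`.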
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java deleted file mode 100644 index 947bad8..0000000 --- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.etl; - -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> { - - @Override - public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception { - Iterator<Map<String, Object>> iterator = iterable.iterator(); - List<Map<String, Object>> batchLog = new ArrayList<>(); - while (iterator.hasNext()) { - batchLog.add(iterator.next()); - } - out.collect(batchLog); - } -} diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java deleted file mode 100644 index 66b36a9..0000000 --- a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.zdjizhi.etl; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; - - -/** - * * 带超时的计数窗口触发器 - */ -public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> { - private static Log logger = LogFactory.get(); - - /** - * 窗口最大数据量 - */ - private int maxCount; - /** - * event time / process time - */ - private TimeCharacteristic timeType; - - private String stateName; - - public String getStateName() { - return stateName; - } - - public void setStateName(String stateName) { - this.stateName = stateName; - } - - public CountTriggerWithTimeout(String stateName) { - this.stateName = stateName; - } - - /** - * 用于储存窗口当前数据量的状态对象 - */ - private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE); - - - public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) { - - this.maxCount = maxCount; - this.timeType = timeType; - this.stateName = stateName; - } - - private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception { - clear(window, ctx); - return TriggerResult.FIRE_AND_PURGE; - } - - - @Override - public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { - ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor); - countState.add(1L); - - - if (countState.get() >= maxCount) { - logger.info("fire with count: " + countState.get()); - return fireAndPurge(window, ctx); - } - if (timestamp >= window.getEnd()) { - logger.info("fire with tiem: " + timestamp); - return fireAndPurge(window, ctx); - } else { - 
return TriggerResult.CONTINUE; - } - } - - - @Override - public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { - if (timeType != TimeCharacteristic.ProcessingTime) { - return TriggerResult.CONTINUE; - } - - - if (time >= window.getEnd()) { - return TriggerResult.CONTINUE; - } else { - logger.debug("fire with process tiem: " + time); - return fireAndPurge(window, ctx); - } - } - - - @Override - public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { - if (timeType != TimeCharacteristic.EventTime) { - return TriggerResult.CONTINUE; - } - - - if (time >= window.getEnd()) { - return TriggerResult.CONTINUE; - } else { - logger.debug("fire with event tiem: " + time); - return fireAndPurge(window, ctx); - } - } - - - @Override - public void clear(TimeWindow window, TriggerContext ctx) throws Exception { - ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor); - countState.clear(); - } - - /** - * 计数方法 - */ - class Sum implements ReduceFunction<Long> { - - - @Override - public Long reduce(Long value1, Long value2) throws Exception { - return value1 + value2; - } - } -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java index 052a8b3..de5141a 100644 --- a/src/main/java/com/zdjizhi/etl/LogService.java +++ b/src/main/java/com/zdjizhi/etl/LogService.java @@ -1,45 +1,16 @@ package com.zdjizhi.etl; import cn.hutool.json.JSONUtil; -import com.zdjizhi.etl.connection.ArangodbBatchIPWindow; -import com.zdjizhi.utils.arangodb.ArangoDBSink; import com.zdjizhi.utils.ck.CKSink; import com.zdjizhi.utils.kafka.KafkaProducer; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import java.util.Map; -import static com.zdjizhi.common.FlowWriteConfig.*; +import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM; public class LogService { - -// @Deprecated -// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { -// -// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) -// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) -// .apply(new CKBatchWindow()) -// .addSink(new ClickhouseSink(sink)) -// .setParallelism(SINK_PARALLELISM) -// .name(sink) -// .setParallelism(SINK_PARALLELISM); -// -// } - - public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { - sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new ArangodbBatchIPWindow()) - .addSink(new ArangoDBSink(sink)) - .setParallelism(SINK_PARALLELISM) - .name(sink) - .setParallelism(SINK_PARALLELISM); - } - public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { sourceStream.map(JSONUtil::toJsonStr) .setParallelism(SINK_PARALLELISM) diff --git a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java deleted file mode 100644 index f5848c4..0000000 --- a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.zdjizhi.etl.connection; - -import cn.hutool.core.util.StrUtil; -import com.arangodb.entity.BaseEdgeDocument; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> { - - @Override - public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception { - Iterator<Map<String, Object>> iterator = iterable.iterator(); - List<BaseEdgeDocument> batchLog = new ArrayList<>(); - while (iterator.hasNext()) { - Map<String, Object> next = iterator.next(); - String srcIp = StrUtil.toString(next.get("src_ip")); - String dstIp = StrUtil.toString(next.get("dst_ip")); - BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); - 
baseEdgeDocument.setKey(String.join("-", srcIp, dstIp)); - baseEdgeDocument.setFrom("src_ip/" + srcIp); - baseEdgeDocument.setTo("dst_ip/" + dstIp); - baseEdgeDocument.addAttribute("src_ip", srcIp); - baseEdgeDocument.addAttribute("dst_ip", dstIp); - baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time")); - - batchLog.add(baseEdgeDocument); - } - collector.collect(batchLog); - } -} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index b0fff6b..9c5f8e0 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -2,14 +2,14 @@ package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import com.alibaba.fastjson.util.TypeUtils; +import com.arangodb.entity.BaseEdgeDocument; import com.zdjizhi.etl.LogService; -import com.zdjizhi.etl.dns.SketchTimeMapFunction; +import com.zdjizhi.utils.arangodb.AGSink; import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; @@ -36,16 +36,21 @@ public class ConnLogService { //写入ck通联relation表 LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); } else { + LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION); + LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH); LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION); } - DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); + if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) { - //合并通联和通联sketch - DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); + DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); - //写入arangodb - LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); + //合并通联和通联sketch + DataStream<BaseEdgeDocument> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); + + //写入arangodb + ConnLogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); + } } @@ -59,21 +64,24 @@ public class ConnLogService { String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? 
"conn_start_time" : "sketch_start_time"; - SingleOutputStreamOperator<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) + DataStream<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) .setParallelism(SOURCE_PARALLELISM) .filter(x -> { if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) { return false; } if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) { - if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || - String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) { + if (TypeUtils.castToLong(x.get("total_cs_pkts")) < 0 || TypeUtils.castToLong(x.get("total_cs_pkts")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("total_sc_pkts")) < 0 || TypeUtils.castToLong(x.get("total_sc_pkts")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("total_cs_bytes")) < 0 || TypeUtils.castToLong(x.get("total_cs_bytes")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("total_sc_bytes")) < 0 || TypeUtils.castToLong(x.get("total_sc_bytes")) == Long.MAX_VALUE) { return false; } return true; } else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) { - if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH || - String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) { + if (TypeUtils.castToLong(x.get("sketch_sessions")) < 0 || TypeUtils.castToLong(x.get("sketch_sessions")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("sketch_packets")) < 0 || TypeUtils.castToLong(x.get("sketch_packets")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("sketch_bytes")) < 0 || TypeUtils.castToLong(x.get("sketch_bytes")) == Long.MAX_VALUE) { return false; } return true; @@ -96,8 +104,10 @@ public class ConnLogService { })) .setParallelism(TRANSFORM_PARALLELISM) .keyBy(new IpKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new ConnProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0) .setParallelism(TRANSFORM_PARALLELISM); return connTransformStream; } @@ -107,18 +117,28 @@ public class ConnLogService { .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000)) .keyBy(new IpKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) - .process(new SketchProcessFunction()); + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .process(new SketchProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0) + .setParallelism(TRANSFORM_PARALLELISM); return sketchTransformStream; } - private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> 
connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception { - DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream) + private static DataStream<BaseEdgeDocument> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception { + DataStream<BaseEdgeDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream) .keyBy(new IpKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) .process(new Ip2IpGraphProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); return ip2ipGraph; } + public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception { + sourceStream.addSink(new AGSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + } + } diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java deleted file mode 100644 index b727535..0000000 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java +++ /dev/null @@ -1,162 +0,0 @@ -//package com.zdjizhi.etl.connection; -// -//import cn.hutool.core.convert.Convert; -//import cn.hutool.core.util.StrUtil; -//import com.zdjizhi.etl.LogService; -//import com.zdjizhi.etl.dns.SketchTimeMapFunction; -//import com.zdjizhi.utils.kafka.KafkaConsumer; -//import org.apache.flink.api.common.eventtime.WatermarkStrategy; -//import org.apache.flink.api.common.functions.MapFunction; -//import org.apache.flink.api.java.utils.ParameterTool; -//import org.apache.flink.streaming.api.datastream.DataStream; -//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -//import org.apache.flink.streaming.api.windowing.time.Time; -//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink; -//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings; -//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst; -// -//import java.time.Duration; -//import java.util.*; -// -//import static com.zdjizhi.common.FlowWriteConfig.*; -// -// -//public class ConnLogService2 { -// -// public static String convertToCsv(Map<String, Object> map) { -// List<Object> list = new ArrayList<>(); -// list.add("("); -// for (Map.Entry<String, Object> m : map.entrySet()) { -// if (m.getValue() instanceof String) { -// list.add("'" + m.getValue() + "'"); -// } else { -// list.add(m.getValue()); -// } -// } -// list.add(")"); -// String join = StrUtil.join(",", list); -// return join; -// -// } -// -// public static void connLogStream(StreamExecutionEnvironment env) throws Exception { -// //connection -// DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); -// //sketch -// DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); -// -// //写入CKsink,批量处理 -// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); -// -// Map<String, String> globalParameters = new HashMap<>(); -// // ClickHouse cluster properties -// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/"); -// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME); -// 
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN); -// -// // sink common -// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1"); -// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/"); -// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2"); -// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2"); -// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2"); -// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false"); -// -// // set global paramaters -// ParameterTool parameters = ParameterTool.fromMap(globalParameters); -// env.getConfig().setGlobalJobParameters(parameters); -// -// // Transform 操作 -// DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() { -// @Override -// public String map(Map<String, Object> data) throws Exception { -// String s = convertToCsv(data); -// System.err.println(s); -// return convertToCsv(data); -// } -// }); -// -// -// // create props for sink -// Properties props = new Properties(); -// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH); -// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH); -// ClickHouseSink sink = new ClickHouseSink(props); -// dataStream.addSink(sink); -// dataStream.print(); -// -//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); -// -// //transform -// DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource); -// -// //写入ck通联relation表 -// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); -// -// DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); -// -// //合并通联和通联sketch -// DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); -// -// //写入arangodb -// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); -// -// } -// -// /** -// * 通联原始日志数据源消费kafka -// * -// * @param source -// * @return -// */ -// private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { -// -// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time"; -// -// DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) -// .setParallelism(SOURCE_PARALLELISM) -// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0) -// .setParallelism(SOURCE_PARALLELISM) -// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? 
new ConnTimeMapFunction() : new SketchTimeMapFunction()) -// .setParallelism(SOURCE_PARALLELISM) -// .name(source) -// .setParallelism(SOURCE_PARALLELISM); -// return sourceStream; -// } -// -// private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception { -// DataStream<Map<String, Object>> connTransformStream = connSource -// .assignTimestampsAndWatermarks(WatermarkStrategy -// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) -// .withTimestampAssigner((event, timestamp) -> { -// return Convert.toLong(event.get("conn_start_time")) * 1000; -// })) -// .setParallelism(TRANSFORM_PARALLELISM) -// .keyBy(new IpKeysSelector()) -// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) -// .process(new ConnProcessFunction()) -// .setParallelism(TRANSFORM_PARALLELISM); -// return connTransformStream; -// } -// -// private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception { -// DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy -// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) -// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) -// .keyBy(new IpKeysSelector()) -// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) -// .process(new SketchProcessFunction()); -// return sketchTransformStream; -// } -// -// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception { -// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream) -// .keyBy(new IpKeysSelector()) -// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) -// .process(new Ip2IpGraphProcessFunction()) -// .setParallelism(TRANSFORM_PARALLELISM); -// return ip2ipGraph; -// } -// -//} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java index 91cb47f..9a5245f 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java @@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -29,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec try { Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements); if (values != null) { - Map<String, Object> result = new HashMap<>(); + Map<String, Object> result = new LinkedHashMap<>(); result.put("start_time", values.f0); result.put("end_time", values.f1); result.put("src_ip", keys.f0); @@ -53,16 +53,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec long endTime = DateUtil.currentSeconds(); try { for (Map<String, Object> newSketchLog : elements) { - long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time")); - if (connStartTimetime > 
0) { + long connStartTime = Convert.toLong(newSketchLog.get("conn_start_time")); + if (connStartTime > 0) { sessions++; packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts")); bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? connStartTimetime : endTime; - packets = packets > Long.MAX_VALUE ? 0 : packets; - bytes = bytes > Long.MAX_VALUE ? 0 : bytes; + startTime = Math.min(connStartTime, startTime); + endTime = Math.max(connStartTime, endTime); } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); @@ -71,4 +69,5 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec } return null; } + } diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java deleted file mode 100644 index e38517a..0000000 --- a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.zdjizhi.etl.connection; - -import cn.hutool.core.convert.Convert; -import cn.hutool.core.date.DateUtil; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.util.TypeUtils; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.tuple.Tuple5; - -import java.util.Map; - - -/** - * @author 94976 - */ -public class ConnReduceFunction implements ReduceFunction<Map<String, Object>> { - - private static final Log logger = LogFactory.get(); - - // -// public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { -// try { -// Tuple5<Long, Long, Long, Long, Long> values = sum(elements); -// if (values != null) { -// Map<String, Object> result = new HashMap<>(); -// result.put("start_time", values.f0); -// result.put("end_time", values.f1); -// result.put("src_ip", keys.f0); -// result.put("dst_ip", keys.f1); -// result.put("sessions", values.f2); -// result.put("packets", values.f3); -// result.put("bytes", values.f4); -// out.collect(result); -// logger.debug("获取中间聚合结果:{}", result.toString()); -// } -// } catch (Exception e) { -// logger.error("获取中间聚合结果失败,middleResult: {}", e); -// } -// } -// - private Tuple5<Long, Long, Long, Long, Long> sum(Map<String, Object> map1, Map<String, Object> map2) { - - try { - long sessions = 0L; - long packets = 0L; - long bytes = 0L; - long startTime = DateUtil.currentSeconds(); - long endTime = DateUtil.currentSeconds(); - - long connStartTime1 = Convert.toLong(map1.get("conn_start_time")); - long connStartTime2 = Convert.toLong(map2.get("conn_start_time")); - if (connStartTime1 > 0 && connStartTime2 > 0) { - sessions++; - packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) + - TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts")); - - bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) + - TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes")); - - startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2; - endTime = connStartTime2 < connStartTime1 ? 
connStartTime1 : connStartTime2; - - packets = packets > Long.MAX_VALUE ? 0 : packets; - bytes = bytes > Long.MAX_VALUE ? 0 : bytes; - - } - - } catch (Exception e) { - logger.error("聚合中间结果集失败 {}", e); - } - return null; - } - - @Override - public Map<String, Object> reduce(Map<String, Object> map1, Map<String, Object> map2) throws Exception { - - return null; - } -} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java index e957d65..7becf90 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java @@ -1,6 +1,7 @@ package com.zdjizhi.etl.connection; -import com.zdjizhi.utils.json.TypeUtils; +import com.alibaba.fastjson.util.TypeUtils; +import com.zdjizhi.utils.json.TypeUtil; import org.apache.flink.api.common.functions.MapFunction; import java.util.Map; @@ -9,8 +10,13 @@ public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map @Override public Map<String, Object> map(Map<String, Object> value) throws Exception { - value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time"))); - value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time"))); + value.put("conn_start_time", TypeUtil.coverMSToS(value.get("conn_start_time"))); + value.put("log_gen_time", TypeUtil.coverMSToS(value.get("log_gen_time"))); + + value.put("total_cs_pkts", TypeUtils.castToLong(value.get("total_cs_pkts"))); + value.put("total_sc_pkts", TypeUtils.castToLong(value.get("total_sc_pkts"))); + value.put("total_cs_bytes", TypeUtils.castToLong(value.get("total_cs_bytes"))); + value.put("total_sc_bytes", TypeUtils.castToLong(value.get("total_sc_bytes"))); return value; } } diff --git a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java index 1b1b0c9..9468e26 100644 --- a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java @@ -2,42 +2,48 @@ package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.HashUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.arangodb.entity.BaseEdgeDocument; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; import java.util.Map; /** - * 对ip去重 + * 处理时间,转为图数据 */ -public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> { +public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseEdgeDocument, Tuple2<String, String>, TimeWindow> { private static final Log logger = LogFactory.get(); @Override - public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { + public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseEdgeDocument> out) { try { - long lastFoundTime = DateUtil.currentSeconds();; + long lastFoundTime = DateUtil.currentSeconds(); for (Map<String, Object> log : elements) { long connStartTime 
= Convert.toLong(log.get("start_time")); - lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime; + lastFoundTime = Math.max(connStartTime, lastFoundTime); } - Map<String, Object> newLog = new HashMap<>(); - newLog.put("src_ip", keys.f0); - newLog.put("dst_ip", keys.f1); - newLog.put("last_found_time", lastFoundTime); - out.collect(newLog); - logger.debug("获取中间聚合结果:{}", newLog.toString()); + + BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); + baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(keys.f0 + keys.f1))); + baseEdgeDocument.setFrom("src_ip/" + keys.f0); + baseEdgeDocument.setTo("dst_ip/" + keys.f1); + baseEdgeDocument.addAttribute("src_ip", keys.f0); + baseEdgeDocument.addAttribute("dst_ip", keys.f1); + baseEdgeDocument.addAttribute("last_found_time", lastFoundTime); + + out.collect(baseEdgeDocument); + logger.debug("获取中间聚合结果:{}", baseEdgeDocument.toString()); } catch (Exception e) { - logger.error("获取中间聚合结果失败,middleResult: {}", e); + logger.error("获取中间聚合结果失败,middleResult: {}", e.getMessage()); } } diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java index e2dd3f7..b27958a 100644 --- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java @@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -42,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements); try { if (values != null) { - Map<String, Object> result = new HashMap<>(); + Map<String, Object> result = new LinkedHashMap<>(); result.put("start_time", values.f0); result.put("end_time", values.f1); result.put("src_ip", keys.f0); @@ -72,12 +72,9 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions")); packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets")); bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes")); - startTime = connStartTime < startTime ? connStartTime : startTime; - endTime = connStartTime > endTime ? connStartTime : endTime; - sessions = sessions > Long.MAX_VALUE ? 0 : sessions; - packets = packets > Long.MAX_VALUE ? 0 : packets; - bytes = bytes > Long.MAX_VALUE ? 
0 : bytes; + startTime = Math.min(connStartTime, startTime); + endTime = Math.max(connStartTime, endTime); } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java new file mode 100644 index 0000000..ee0d6ae --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java @@ -0,0 +1,21 @@ +package com.zdjizhi.etl.connection; + +import com.alibaba.fastjson.util.TypeUtils; +import com.zdjizhi.utils.json.TypeUtil; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + +public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> { + + @Override + public Map<String, Object> map(Map<String, Object> value) throws Exception { + value.put("sketch_start_time", TypeUtil.coverMSToS(value.get("sketch_start_time"))); + + value.put("sketch_sessions", TypeUtils.castToLong(value.get("sketch_sessions"))); + value.put("sketch_packets", TypeUtils.castToLong(value.get("sketch_packets"))); + value.put("sketch_bytes", TypeUtils.castToLong(value.get("sketch_bytes"))); + return value; + } + +} diff --git a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java deleted file mode 100644 index ada4b83..0000000 --- a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.zdjizhi.etl.dns; - -import cn.hutool.core.util.StrUtil; -import com.arangodb.entity.BaseEdgeDocument; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> { - - @Override - public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception { - Iterator<Map<String, Object>> iterator = iterable.iterator(); - List<BaseEdgeDocument> batchLog = new ArrayList<>(); - while (iterator.hasNext()) { - Map<String, Object> next = iterator.next(); - String qname = StrUtil.toString(next.get("qname")); - String record = StrUtil.toString(next.get("record")); - BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); - baseEdgeDocument.setKey(String.join("-", qname, record)); - baseEdgeDocument.setFrom("qname/" + qname); - baseEdgeDocument.setTo("record/" + record); - baseEdgeDocument.addAttribute("qname", qname); - baseEdgeDocument.addAttribute("record", record); - baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time")); - - batchLog.add(baseEdgeDocument); - } - out.collect(batchLog); - } -} diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java new file mode 100644 index 0000000..a467d40 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java @@ -0,0 +1,28 @@ +package com.zdjizhi.etl.dns; + +import cn.hutool.core.util.HashUtil; +import cn.hutool.core.util.StrUtil; +import com.arangodb.entity.BaseEdgeDocument; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + +public class 
DnsGraphMapFunction implements MapFunction<Map<String, Object>, BaseEdgeDocument> { + + @Override + public BaseEdgeDocument map(Map<String, Object> value) throws Exception { + + String qname = StrUtil.toString(value.get("qname")); + String record = StrUtil.toString(value.get("record")); + + BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); + baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(qname + record))); + baseEdgeDocument.setFrom("qname/" + qname); + baseEdgeDocument.setTo("record/" + record); + baseEdgeDocument.addAttribute("qname", qname); + baseEdgeDocument.addAttribute("record", record); + baseEdgeDocument.addAttribute("last_found_time", value.get("last_found_time")); + + return baseEdgeDocument; + } +} diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java index cde34c1..d8b3a36 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java @@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; @@ -28,7 +28,7 @@ public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, O Long startTime = Convert.toLong(log.get("start_time")); tmpTime = startTime > tmpTime ? startTime : tmpTime; } - Map newLog = new HashMap<>(); + Map newLog = new LinkedHashMap<>(); newLog.put("record_type", keys.f0); newLog.put("qname", keys.f1); newLog.put("record", keys.f2); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java index b4ab915..7c67d6e 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -2,13 +2,14 @@ package com.zdjizhi.etl.dns; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.ObjectUtil; +import com.arangodb.entity.BaseEdgeDocument; import com.zdjizhi.enums.DnsType; import com.zdjizhi.etl.LogService; +import com.zdjizhi.utils.arangodb.AGSink; import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -32,20 +33,24 @@ public class DnsLogService { //dns 拆分后relation日志 ck入库 LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); } else { - LogService.getLogCKSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); + LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS); + LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); } //arango 入库,按record_type分组入不同的表 - DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull) + DataStream<Map<String, Object>> dnsGraph = dnsTransform .keyBy(new DnsGraphKeysSelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) .process(new DnsGraphProcessFunction()) .setParallelism(SINK_PARALLELISM); for (DnsType dnsEnum : DnsType.values()) { - DataStream<Map<String, Object>> 
dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) + DataStream<BaseEdgeDocument> dnsRecordData = dnsGraph + .filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) + .setParallelism(SINK_PARALLELISM) + .map(new DnsGraphMapFunction()) .setParallelism(SINK_PARALLELISM); - LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink()); + getLogArangoSink(dnsRecordData, dnsEnum.getSink()); } } @@ -64,6 +69,7 @@ public class DnsLogService { private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception { DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) + .setParallelism(SOURCE_PARALLELISM) .assignTimestampsAndWatermarks(WatermarkStrategy .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000)) @@ -71,10 +77,17 @@ public class DnsLogService { .flatMap(new DnsSplitFlatMapFunction()) .setParallelism(TRANSFORM_PARALLELISM) .keyBy(new DnsGraphKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new DnsRelationProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); return dnsTransform; } + public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception { + sourceStream.addSink(new AGSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + } + } diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java index 1da5a90..9607b00 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java @@ -6,7 +6,7 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.google.common.base.Joiner; import com.zdjizhi.enums.DnsType; -import com.zdjizhi.utils.json.TypeUtils; +import com.zdjizhi.utils.json.TypeUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.MapFunction; @@ -25,7 +25,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri @Override public Map<String, Object> map(Map<String, Object> rawLog) throws Exception { try { - rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time"))); + rawLog.put("capture_time", TypeUtil.coverMSToS(rawLog.get("capture_time"))); //qname ,record 转小写 rawLog.put("qname", StringUtils.lowerCase(StrUtil.toString(rawLog.get("qname")))); if (Objects.nonNull(rawLog.get("response"))) { diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java index 8744060..894867f 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java @@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -45,7 +45,7 @@ public 
class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String endTime = logStartTime > endTime ? logStartTime : endTime; } } - Map<String, Object> newDns = new HashMap<>(); + Map<String, Object> newDns = new LinkedHashMap<>(); newDns.put("start_time", startTime); newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION); newDns.put("record_type", keys.f0); diff --git a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java deleted file mode 100644 index cf10a75..0000000 --- a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.zdjizhi.etl.dns; - -import com.zdjizhi.utils.json.TypeUtils; -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.Map; - -public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> { - - @Override - public Map<String, Object> map(Map<String, Object> value) throws Exception { - value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time"))); - return value; - } -} |
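The other structural change visible above is that the ArangoDB edge construction moves out of the deleted batch window classes (ArangodbBatchIPWindow, ArangodbBatchDnsWindow) into per-record functions, and the document key switches from a "srcIp-dstIp" string join to an FNV hash of the endpoint pair. The sketch below isolates that construction; it uses only the hutool `HashUtil` and ArangoDB `BaseEdgeDocument` calls that appear in the diff, but the `buildIpEdge` helper name is illustrative, not part of the commit.

```java
import cn.hutool.core.util.HashUtil;
import com.arangodb.entity.BaseEdgeDocument;

// Illustrative helper showing the edge layout produced by the new Ip2IpGraphProcessFunction:
// the _key is an FNV hash of the endpoint pair instead of the old "srcIp-dstIp" join key.
public final class IpEdgeSketch {

    public static BaseEdgeDocument buildIpEdge(String srcIp, String dstIp, long lastFoundTime) {
        BaseEdgeDocument edge = new BaseEdgeDocument();
        edge.setKey(String.valueOf(HashUtil.fnvHash(srcIp + dstIp)));
        edge.setFrom("src_ip/" + srcIp);
        edge.setTo("dst_ip/" + dstIp);
        edge.addAttribute("src_ip", srcIp);
        edge.addAttribute("dst_ip", dstIp);
        edge.addAttribute("last_found_time", lastFoundTime);
        return edge;
    }

    private IpEdgeSketch() {
    }
}
```

The same hash-based key scheme is applied to the DNS qname/record edges by the new DnsGraphMapFunction.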