| author | zhanghongqing <[email protected]> | 2022-09-23 17:10:05 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-09-23 17:10:05 +0800 |
| commit | 9cdfe060cfeda37e04fa43563569efae53641eb4 (patch) | |
| tree | fabf1a170248666f1587cffd1c0f3d6e724de0d9 /src/main/java/com/zdjizhi/etl/connection | |
| parent | 25e5b51766540d8c1b238a1e28a96fdff45024d3 (diff) | |
1. Filter out abnormal data 2. Optimize the sink write code 3. Optimize the ClickHouse configuration
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/connection')
9 files changed, 99 insertions, 328 deletions
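The first item of the commit message (filtering abnormal data) shows up below as new predicates in ConnLogService.java: a record is dropped when any packet/byte counter casts to a negative value or to Long.MAX_VALUE, replacing the old length-based check against AGGREGATE_MAX_VALUE_LENGTH. A minimal standalone sketch of that check, assuming fastjson's TypeUtils.castToLong is on the classpath; the class and method names here are illustrative only and are not part of this commit:

```java
import com.alibaba.fastjson.util.TypeUtils;

import java.util.Map;

// Illustrative helper, not part of this commit: mirrors the sanity check
// applied to connection and sketch counters before they enter the windows.
public final class CounterSanityCheck {

    private CounterSanityCheck() {
    }

    // Returns true only when every named counter field casts to a plausible long.
    public static boolean countersValid(Map<String, Object> record, String... fields) {
        for (String field : fields) {
            Long value = TypeUtils.castToLong(record.get(field));
            if (value == null || value < 0 || value == Long.MAX_VALUE) {
                return false;
            }
        }
        return true;
    }
}
```

For a connection record this would be invoked as countersValid(x, "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes"), which is the same condition the new filter lambdas in the diff express inline.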
diff --git a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
deleted file mode 100644
index f5848c4..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.etl.connection;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
-
-    @Override
-    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
-        Iterator<Map<String, Object>> iterator = iterable.iterator();
-        List<BaseEdgeDocument> batchLog = new ArrayList<>();
-        while (iterator.hasNext()) {
-            Map<String, Object> next = iterator.next();
-            String srcIp = StrUtil.toString(next.get("src_ip"));
-            String dstIp = StrUtil.toString(next.get("dst_ip"));
-            BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
-            baseEdgeDocument.setKey(String.join("-", srcIp, dstIp));
-            baseEdgeDocument.setFrom("src_ip/" + srcIp);
-            baseEdgeDocument.setTo("dst_ip/" + dstIp);
-            baseEdgeDocument.addAttribute("src_ip", srcIp);
-            baseEdgeDocument.addAttribute("dst_ip", dstIp);
-            baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
-
-            batchLog.add(baseEdgeDocument);
-        }
-        collector.collect(batchLog);
-    }
-}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index b0fff6b..9c5f8e0 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -2,14 +2,14 @@ package com.zdjizhi.etl.connection;
 import cn.hutool.core.convert.Convert;
 import com.alibaba.fastjson.util.TypeUtils;
+import com.arangodb.entity.BaseEdgeDocument;
 import com.zdjizhi.etl.LogService;
-import com.zdjizhi.etl.dns.SketchTimeMapFunction;
+import com.zdjizhi.utils.arangodb.AGSink;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import java.time.Duration;
@@ -36,16 +36,21 @@ public class ConnLogService {
             //写入ck通联relation表
             LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
         } else {
+            LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
+            LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
             LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
         }
-        DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
+        if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
-        //合并通联和通联sketch
-        DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
+            DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
-        //写入arangodb
-        LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+            //合并通联和通联sketch
+            DataStream<BaseEdgeDocument> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
+
+            //写入arangodb
+            ConnLogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
+        }
     }
@@ -59,21 +64,24 @@ public class ConnLogService {
         String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
-        SingleOutputStreamOperator<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+        DataStream<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
                 .setParallelism(SOURCE_PARALLELISM)
                 .filter(x -> {
                     if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) {
                         return false;
                     }
                     if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) {
-                        if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
-                                String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
+                        if (TypeUtils.castToLong(x.get("total_cs_pkts")) < 0 || TypeUtils.castToLong(x.get("total_cs_pkts")) == Long.MAX_VALUE
+                                || TypeUtils.castToLong(x.get("total_sc_pkts")) < 0 || TypeUtils.castToLong(x.get("total_sc_pkts")) == Long.MAX_VALUE
+                                || TypeUtils.castToLong(x.get("total_cs_bytes")) < 0 || TypeUtils.castToLong(x.get("total_cs_bytes")) == Long.MAX_VALUE
+                                || TypeUtils.castToLong(x.get("total_sc_bytes")) < 0 || TypeUtils.castToLong(x.get("total_sc_bytes")) == Long.MAX_VALUE) {
                             return false;
                         }
                         return true;
                     } else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) {
-                        if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
-                                String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
+                        if (TypeUtils.castToLong(x.get("sketch_sessions")) < 0 || TypeUtils.castToLong(x.get("sketch_sessions")) == Long.MAX_VALUE
+                                || TypeUtils.castToLong(x.get("sketch_packets")) < 0 || TypeUtils.castToLong(x.get("sketch_packets")) == Long.MAX_VALUE
+                                || TypeUtils.castToLong(x.get("sketch_bytes")) < 0 || TypeUtils.castToLong(x.get("sketch_bytes")) == Long.MAX_VALUE) {
                             return false;
                         }
                         return true;
@@ -96,8 +104,10 @@ public class ConnLogService {
                 }))
                 .setParallelism(TRANSFORM_PARALLELISM)
                 .keyBy(new IpKeysSelector())
-                .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                 .process(new ConnProcessFunction())
+                .setParallelism(TRANSFORM_PARALLELISM)
+                .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
                 .setParallelism(TRANSFORM_PARALLELISM);
         return connTransformStream;
     }
@@ -107,18 +117,28 @@ public class ConnLogService {
                 .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                 .withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
                 .keyBy(new IpKeysSelector())
-                .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-                .process(new SketchProcessFunction());
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+                .process(new SketchProcessFunction())
+                .setParallelism(TRANSFORM_PARALLELISM)
+                .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
+                .setParallelism(TRANSFORM_PARALLELISM);
         return sketchTransformStream;
     }
-    private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
-        DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
+    private static DataStream<BaseEdgeDocument> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
+        DataStream<BaseEdgeDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
                 .keyBy(new IpKeysSelector())
-                .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                 .process(new Ip2IpGraphProcessFunction())
                 .setParallelism(TRANSFORM_PARALLELISM);
         return ip2ipGraph;
     }
+    public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
+        sourceStream.addSink(new AGSink(sink))
+                .setParallelism(SINK_PARALLELISM)
+                .name(sink)
+                .setParallelism(SINK_PARALLELISM);
+    }
+
 }
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
deleted file mode 100644
index b727535..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java
+++ /dev/null
@@ -1,162 +0,0 @@
-//package com.zdjizhi.etl.connection;
-//
-//import cn.hutool.core.convert.Convert;
-//import cn.hutool.core.util.StrUtil;
-//import com.zdjizhi.etl.LogService;
-//import com.zdjizhi.etl.dns.SketchTimeMapFunction;
-//import com.zdjizhi.utils.kafka.KafkaConsumer;
-//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-//import org.apache.flink.api.common.functions.MapFunction;
-//import org.apache.flink.api.java.utils.ParameterTool;
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-//import org.apache.flink.streaming.api.windowing.time.Time;
-//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
-//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
-//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
-//
-//import java.time.Duration;
-//import java.util.*;
-//
-//import static com.zdjizhi.common.FlowWriteConfig.*;
-//
-//
-//public class ConnLogService2 {
-//
-//    public static String convertToCsv(Map<String, Object> map) {
-//        List<Object> list = new ArrayList<>();
-//        list.add("(");
-//        for (Map.Entry<String, Object> m : map.entrySet()) {
-//            if (m.getValue() instanceof String) {
-//                list.add("'" + m.getValue() + "'");
-//            } else {
-//                list.add(m.getValue());
-//            }
-//        }
-//        list.add(")");
-//        String join = StrUtil.join(",", list);
-//        return join;
-//
-//    }
-//
-//    public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
-//        //connection
-//        DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
-//        //sketch
-//        DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
-//
-//        //写入CKsink,批量处理
-//        LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
-//
-//        Map<String, String> globalParameters = new HashMap<>();
-//        // ClickHouse cluster properties
-//        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/");
-//        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME);
-//        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN);
-//
-//        // sink common
-//        globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
-//        globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
-//        globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
-//        globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
-//        globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
-//        globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
-//
-//        // set global paramaters
-//        ParameterTool parameters = ParameterTool.fromMap(globalParameters);
-//        env.getConfig().setGlobalJobParameters(parameters);
-//
-//        // Transform 操作
-//        DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() {
-//            @Override
-//            public String map(Map<String, Object> data) throws Exception {
-//                String s = convertToCsv(data);
-//                System.err.println(s);
-//                return convertToCsv(data);
-//            }
-//        });
-//
-//
-//        // create props for sink
-//        Properties props = new Properties();
-//        props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH);
-//        props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH);
-//        ClickHouseSink sink = new ClickHouseSink(props);
-//        dataStream.addSink(sink);
-//        dataStream.print();
-//
-////        LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
-//
-//        //transform
-//        DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
-//
-//        //写入ck通联relation表
-//        LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
-//
-//        DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
-//
-//        //合并通联和通联sketch
-//        DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
-//
-//        //写入arangodb
-//        LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
-//
-//    }
-//
-//    /**
-//     * 通联原始日志数据源消费kafka
-//     *
-//     * @param source
-//     * @return
-//     */
-//    private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
-//
-//        String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
-//
-//        DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
-//                .setParallelism(SOURCE_PARALLELISM)
-//                .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
-//                .setParallelism(SOURCE_PARALLELISM)
-//                .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
-//                .setParallelism(SOURCE_PARALLELISM)
-//                .name(source)
-//                .setParallelism(SOURCE_PARALLELISM);
-//        return sourceStream;
-//    }
-//
-//    private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
-//        DataStream<Map<String, Object>> connTransformStream = connSource
-//                .assignTimestampsAndWatermarks(WatermarkStrategy
-//                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-//                        .withTimestampAssigner((event, timestamp) -> {
-//                            return Convert.toLong(event.get("conn_start_time")) * 1000;
-//                        }))
-//                .setParallelism(TRANSFORM_PARALLELISM)
-//                .keyBy(new IpKeysSelector())
-//                .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-//                .process(new ConnProcessFunction())
-//                .setParallelism(TRANSFORM_PARALLELISM);
-//        return connTransformStream;
-//    }
-//
-//    private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
-//        DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
-//                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-//                .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
-//                .keyBy(new IpKeysSelector())
-//                .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-//                .process(new SketchProcessFunction());
-//        return sketchTransformStream;
-//    }
-//
-//    private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
-//        DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
-//                .keyBy(new IpKeysSelector())
-//                .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
-//                .process(new Ip2IpGraphProcessFunction())
-//                .setParallelism(TRANSFORM_PARALLELISM);
-//        return ip2ipGraph;
-//    }
-//
-//}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 91cb47f..9a5245f 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -29,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
         try {
             Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
             if (values != null) {
-                Map<String, Object> result = new HashMap<>();
+                Map<String, Object> result = new LinkedHashMap<>();
                 result.put("start_time", values.f0);
                 result.put("end_time", values.f1);
                 result.put("src_ip", keys.f0);
@@ -53,16 +53,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
         long endTime = DateUtil.currentSeconds();
         try {
             for (Map<String, Object> newSketchLog : elements) {
-                long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
-                if (connStartTimetime > 0) {
+                long connStartTime = Convert.toLong(newSketchLog.get("conn_start_time"));
+                if (connStartTime > 0) {
                     sessions++;
                     packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
                     bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));
-                    startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
-                    endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
-                    packets = packets > Long.MAX_VALUE ? 0 : packets;
-                    bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
+                    startTime = Math.min(connStartTime, startTime);
+                    endTime = Math.max(connStartTime, endTime);
                 }
             }
             return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
@@ -71,4 +69,5 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
         }
         return null;
     }
+
 }
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java
deleted file mode 100644
index e38517a..0000000
--- a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.zdjizhi.etl.connection;
-
-import cn.hutool.core.convert.Convert;
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.util.TypeUtils;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import java.util.Map;
-
-
-/**
- * @author 94976
- */
-public class ConnReduceFunction implements ReduceFunction<Map<String, Object>> {
-
-    private static final Log logger = LogFactory.get();
-
-    //
-//    public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-//        try {
-//            Tuple5<Long, Long, Long, Long, Long> values = sum(elements);
-//            if (values != null) {
-//                Map<String, Object> result = new HashMap<>();
-//                result.put("start_time", values.f0);
-//                result.put("end_time", values.f1);
-//                result.put("src_ip", keys.f0);
-//                result.put("dst_ip", keys.f1);
-//                result.put("sessions", values.f2);
-//                result.put("packets", values.f3);
-//                result.put("bytes", values.f4);
-//                out.collect(result);
-//                logger.debug("获取中间聚合结果:{}", result.toString());
-//            }
-//        } catch (Exception e) {
-//            logger.error("获取中间聚合结果失败,middleResult: {}", e);
-//        }
-//    }
-//
-    private Tuple5<Long, Long, Long, Long, Long> sum(Map<String, Object> map1, Map<String, Object> map2) {
-
-        try {
-            long sessions = 0L;
-            long packets = 0L;
-            long bytes = 0L;
-            long startTime = DateUtil.currentSeconds();
-            long endTime = DateUtil.currentSeconds();
-
-            long connStartTime1 = Convert.toLong(map1.get("conn_start_time"));
-            long connStartTime2 = Convert.toLong(map2.get("conn_start_time"));
-            if (connStartTime1 > 0 && connStartTime2 > 0) {
-                sessions++;
-                packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) +
-                        TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts"));
-
-                bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) +
-                        TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes"));
-
-                startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2;
-                endTime = connStartTime2 < connStartTime1 ? connStartTime1 : connStartTime2;
-
-                packets = packets > Long.MAX_VALUE ? 0 : packets;
-                bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
-
-            }
-
-        } catch (Exception e) {
-            logger.error("聚合中间结果集失败 {}", e);
-        }
-        return null;
-    }
-
-    @Override
-    public Map<String, Object> reduce(Map<String, Object> map1, Map<String, Object> map2) throws Exception {
-
-        return null;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
index e957d65..7becf90 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java
@@ -1,6 +1,7 @@
 package com.zdjizhi.etl.connection;
-import com.zdjizhi.utils.json.TypeUtils;
+import com.alibaba.fastjson.util.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
 import org.apache.flink.api.common.functions.MapFunction;
 import java.util.Map;
@@ -9,8 +10,13 @@ public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map
     @Override
     public Map<String, Object> map(Map<String, Object> value) throws Exception {
-        value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
-        value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time")));
+        value.put("conn_start_time", TypeUtil.coverMSToS(value.get("conn_start_time")));
+        value.put("log_gen_time", TypeUtil.coverMSToS(value.get("log_gen_time")));
+
+        value.put("total_cs_pkts", TypeUtils.castToLong(value.get("total_cs_pkts")));
+        value.put("total_sc_pkts", TypeUtils.castToLong(value.get("total_sc_pkts")));
+        value.put("total_cs_bytes", TypeUtils.castToLong(value.get("total_cs_bytes")));
+        value.put("total_sc_bytes", TypeUtils.castToLong(value.get("total_sc_bytes")));
         return value;
     }
 }
diff --git a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
index 1b1b0c9..9468e26 100644
--- a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
@@ -2,42 +2,48 @@ package com.zdjizhi.etl.connection;
 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.HashUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.arangodb.entity.BaseEdgeDocument;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import java.util.HashMap;
 import java.util.Map;
 /**
- * 对ip去重
+ * 处理时间,转为图数据
 */
-public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
+public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseEdgeDocument, Tuple2<String, String>, TimeWindow> {
     private static final Log logger = LogFactory.get();
     @Override
-    public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
+    public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseEdgeDocument> out) {
         try {
-            long lastFoundTime = DateUtil.currentSeconds();;
+            long lastFoundTime = DateUtil.currentSeconds();
             for (Map<String, Object> log : elements) {
                 long connStartTime = Convert.toLong(log.get("start_time"));
-                lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime;
+                lastFoundTime = Math.max(connStartTime, lastFoundTime);
             }
-            Map<String, Object> newLog = new HashMap<>();
-            newLog.put("src_ip", keys.f0);
-            newLog.put("dst_ip", keys.f1);
-            newLog.put("last_found_time", lastFoundTime);
-            out.collect(newLog);
-            logger.debug("获取中间聚合结果:{}", newLog.toString());
+
+            BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+            baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(keys.f0 + keys.f1)));
+            baseEdgeDocument.setFrom("src_ip/" + keys.f0);
+            baseEdgeDocument.setTo("dst_ip/" + keys.f1);
+            baseEdgeDocument.addAttribute("src_ip", keys.f0);
+            baseEdgeDocument.addAttribute("dst_ip", keys.f1);
+            baseEdgeDocument.addAttribute("last_found_time", lastFoundTime);
+
+            out.collect(baseEdgeDocument);
+            logger.debug("获取中间聚合结果:{}", baseEdgeDocument.toString());
         } catch (Exception e) {
-            logger.error("获取中间聚合结果失败,middleResult: {}", e);
+            logger.error("获取中间聚合结果失败,middleResult: {}", e.getMessage());
         }
     }
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index e2dd3f7..b27958a 100644
--- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -42,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
         Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
         try {
             if (values != null) {
-                Map<String, Object> result = new HashMap<>();
+                Map<String, Object> result = new LinkedHashMap<>();
                 result.put("start_time", values.f0);
                 result.put("end_time", values.f1);
                 result.put("src_ip", keys.f0);
@@ -72,12 +72,9 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
                 sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions"));
                 packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets"));
                 bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes"));
-                startTime = connStartTime < startTime ? connStartTime : startTime;
-                endTime = connStartTime > endTime ? connStartTime : endTime;
-                sessions = sessions > Long.MAX_VALUE ? 0 : sessions;
-                packets = packets > Long.MAX_VALUE ? 0 : packets;
-                bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
+                startTime = Math.min(connStartTime, startTime);
+                endTime = Math.max(connStartTime, endTime);
             }
         }
         return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java
new file mode 100644
index 0000000..ee0d6ae
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.etl.connection;
+
+import com.alibaba.fastjson.util.TypeUtils;
+import com.zdjizhi.utils.json.TypeUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Map;
+
+public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
+
+    @Override
+    public Map<String, Object> map(Map<String, Object> value) throws Exception {
+        value.put("sketch_start_time", TypeUtil.coverMSToS(value.get("sketch_start_time")));
+
+        value.put("sketch_sessions", TypeUtils.castToLong(value.get("sketch_sessions")));
+        value.put("sketch_packets", TypeUtils.castToLong(value.get("sketch_packets")));
+        value.put("sketch_bytes", TypeUtils.castToLong(value.get("sketch_bytes")));
+        return value;
+    }
+
+}
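The newly added SketchTimeMapFunction normalises a raw sketch record before the filters and windows run: the timestamp goes through TypeUtil.coverMSToS (assumed here to convert epoch milliseconds to seconds) and the three sketch counters are cast to Long. A hedged usage sketch on the project classpath; the driver class and sample values are hypothetical and not part of this commit:

```java
import com.zdjizhi.etl.connection.SketchTimeMapFunction;

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical driver, not part of this commit: feeds one record through the
// new map function the same way the Flink source stream would.
public class SketchTimeMapFunctionExample {

    public static void main(String[] args) throws Exception {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("sketch_start_time", 1663923005000L); // epoch milliseconds
        raw.put("sketch_sessions", "12");             // counters may arrive as strings
        raw.put("sketch_packets", "3400");
        raw.put("sketch_bytes", "1048576");

        Map<String, Object> normalised = new SketchTimeMapFunction().map(raw);
        // Expected shape: sketch_start_time in seconds, counters as Long values,
        // ready for the >= 0 filters added in ConnLogService.getSketchTransformStream.
        System.out.println(normalised);
    }
}
```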
