Diffstat (limited to 'src')
28 files changed, 368 insertions, 983 deletions
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 59c059e..7795d9e 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -137,4 +137,16 @@ public class FlowWriteConfig {
     public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
     public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
     public static final Integer AGGREGATE_MAX_VALUE_LENGTH = FlowWriteConfigurations.getIntProperty(0, "aggregate.max.value.length");
+
+    public static final Integer SINK_ARANGODB_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.raw.log.insert.open");
+
+
+    public static final Integer HIKARI_MINIMUM_IDLE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.minimum-idle");
+    public static final Integer HIKARI_MAXIMUM_POOL_SIZE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.maximum-pool-size");
+    public static final Long HIKARI_IDLE_TIMEOUT = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.idle-timeout");
+    public static final Long HIKARI_MAX_LIFETIME = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.max-lifetime");
+    public static final Integer HIKARI_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.connection-timeout");
+    public static final Integer CK_MAX_THREADS = FlowWriteConfigurations.getIntProperty(1, "ck.max.threads");
+    public static final Integer CK_SCHEDULE_ACTUALIZATION = FlowWriteConfigurations.getIntProperty(1, "ck.schedule.actualization");
+
 }
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
deleted file mode 100644
index 947bad8..0000000
--- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.etl;
-
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
-
-    @Override
-    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
-        Iterator<Map<String, Object>> iterator = iterable.iterator();
-        List<Map<String, Object>> batchLog = new ArrayList<>();
-        while (iterator.hasNext()) {
-            batchLog.add(iterator.next());
-        }
-        out.collect(batchLog);
-    }
-}
diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
deleted file mode 100644
index 66b36a9..0000000
--- a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.zdjizhi.etl;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-
-/**
- * * 带超时的计数窗口触发器
- */
-public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
-    private static Log logger = LogFactory.get();
-
-    /**
-     * 窗口最大数据量
-     */
-    private int maxCount;
-    /**
-     * event time / process time
-     */
-    private TimeCharacteristic timeType;
-
-    private String stateName;
-
-    public String getStateName() {
-        return stateName;
-    }
-
-    public void setStateName(String stateName) {
-        this.stateName = stateName;
-    }
-
-    public CountTriggerWithTimeout(String stateName) {
-        this.stateName = stateName;
-    }
-
-    /**
-     * 用于储存窗口当前数据量的状态对象
-     */
-    private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE);
-
-
-    public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) {
-
-        this.maxCount = maxCount;
-        this.timeType = timeType;
-        this.stateName = stateName;
-    }
-
-    private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
-        clear(window, ctx);
-        return TriggerResult.FIRE_AND_PURGE;
-    }
-
-
-    @Override
-    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
-        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
-        countState.add(1L);
-
-
-        if (countState.get() >= maxCount) {
-            logger.info("fire with count: " + countState.get());
-            return fireAndPurge(window, ctx);
-        }
-        if (timestamp >= window.getEnd()) {
-            logger.info("fire with tiem: " + timestamp);
-            return fireAndPurge(window, ctx);
-        } else {
-            return TriggerResult.CONTINUE;
-        }
-    }
-
-
-    @Override
-    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
-        if (timeType != TimeCharacteristic.ProcessingTime) {
-            return TriggerResult.CONTINUE;
-        }
-
-
-        if (time >= window.getEnd()) {
-            return TriggerResult.CONTINUE;
-        } else {
-            logger.debug("fire with process tiem: " + time);
-            return fireAndPurge(window, ctx);
-        }
-    }
-
-
-    @Override
-    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
-        if (timeType != TimeCharacteristic.EventTime) {
-            return TriggerResult.CONTINUE;
-        }
-
-
-        if (time >= window.getEnd()) {
-            return TriggerResult.CONTINUE;
-        } else {
-            logger.debug("fire with event tiem: " + time);
-            return fireAndPurge(window, ctx);
-        }
-    }
-
-
-    @Override
-    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
-        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
-        countState.clear();
-    }
-
-    /**
-     * 计数方法
-     */
-    class Sum implements ReduceFunction<Long> {
-
-
-        @Override
-        public Long reduce(Long value1, Long value2) throws Exception {
-            return value1 + value2;
-        }
-    }
-}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java index 052a8b3..de5141a 100644 --- a/src/main/java/com/zdjizhi/etl/LogService.java +++ b/src/main/java/com/zdjizhi/etl/LogService.java @@ -1,45 +1,16 @@ package com.zdjizhi.etl; import cn.hutool.json.JSONUtil; -import com.zdjizhi.etl.connection.ArangodbBatchIPWindow; -import com.zdjizhi.utils.arangodb.ArangoDBSink; import com.zdjizhi.utils.ck.CKSink; import com.zdjizhi.utils.kafka.KafkaProducer; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import java.util.Map; -import static com.zdjizhi.common.FlowWriteConfig.*; +import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM; public class LogService { - -// @Deprecated -// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { -// -// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) -// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) -// .apply(new CKBatchWindow()) -// .addSink(new ClickhouseSink(sink)) -// .setParallelism(SINK_PARALLELISM) -// .name(sink) -// .setParallelism(SINK_PARALLELISM); -// -// } - - public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { - sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new ArangodbBatchIPWindow()) - .addSink(new ArangoDBSink(sink)) - .setParallelism(SINK_PARALLELISM) - .name(sink) - .setParallelism(SINK_PARALLELISM); - } - public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { sourceStream.map(JSONUtil::toJsonStr) .setParallelism(SINK_PARALLELISM) diff --git a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java deleted file mode 100644 index f5848c4..0000000 --- a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.zdjizhi.etl.connection; - -import cn.hutool.core.util.StrUtil; -import com.arangodb.entity.BaseEdgeDocument; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> { - - @Override - public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception { - Iterator<Map<String, Object>> iterator = iterable.iterator(); - List<BaseEdgeDocument> batchLog = new ArrayList<>(); - while (iterator.hasNext()) { - Map<String, Object> next = iterator.next(); - String srcIp = StrUtil.toString(next.get("src_ip")); - String dstIp = StrUtil.toString(next.get("dst_ip")); - BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); - 
baseEdgeDocument.setKey(String.join("-", srcIp, dstIp)); - baseEdgeDocument.setFrom("src_ip/" + srcIp); - baseEdgeDocument.setTo("dst_ip/" + dstIp); - baseEdgeDocument.addAttribute("src_ip", srcIp); - baseEdgeDocument.addAttribute("dst_ip", dstIp); - baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time")); - - batchLog.add(baseEdgeDocument); - } - collector.collect(batchLog); - } -} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index b0fff6b..9c5f8e0 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -2,14 +2,14 @@ package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import com.alibaba.fastjson.util.TypeUtils; +import com.arangodb.entity.BaseEdgeDocument; import com.zdjizhi.etl.LogService; -import com.zdjizhi.etl.dns.SketchTimeMapFunction; +import com.zdjizhi.utils.arangodb.AGSink; import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; @@ -36,16 +36,21 @@ public class ConnLogService { //写入ck通联relation表 LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); } else { + LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION); + LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH); LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION); } - DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); + if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) { - //合并通联和通联sketch - DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); + DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); - //写入arangodb - LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); + //合并通联和通联sketch + DataStream<BaseEdgeDocument> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); + + //写入arangodb + ConnLogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); + } } @@ -59,21 +64,24 @@ public class ConnLogService { String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? 
"conn_start_time" : "sketch_start_time"; - SingleOutputStreamOperator<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) + DataStream<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) .setParallelism(SOURCE_PARALLELISM) .filter(x -> { if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) { return false; } if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) { - if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || - String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) { + if (TypeUtils.castToLong(x.get("total_cs_pkts")) < 0 || TypeUtils.castToLong(x.get("total_cs_pkts")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("total_sc_pkts")) < 0 || TypeUtils.castToLong(x.get("total_sc_pkts")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("total_cs_bytes")) < 0 || TypeUtils.castToLong(x.get("total_cs_bytes")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("total_sc_bytes")) < 0 || TypeUtils.castToLong(x.get("total_sc_bytes")) == Long.MAX_VALUE) { return false; } return true; } else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) { - if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH || - String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) { + if (TypeUtils.castToLong(x.get("sketch_sessions")) < 0 || TypeUtils.castToLong(x.get("sketch_sessions")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("sketch_packets")) < 0 || TypeUtils.castToLong(x.get("sketch_packets")) == Long.MAX_VALUE + || TypeUtils.castToLong(x.get("sketch_bytes")) < 0 || TypeUtils.castToLong(x.get("sketch_bytes")) == Long.MAX_VALUE) { return false; } return true; @@ -96,8 +104,10 @@ public class ConnLogService { })) .setParallelism(TRANSFORM_PARALLELISM) .keyBy(new IpKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new ConnProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0) .setParallelism(TRANSFORM_PARALLELISM); return connTransformStream; } @@ -107,18 +117,28 @@ public class ConnLogService { .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000)) .keyBy(new IpKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) - .process(new SketchProcessFunction()); + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .process(new SketchProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0) + .setParallelism(TRANSFORM_PARALLELISM); return sketchTransformStream; } - private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> 
connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception { - DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream) + private static DataStream<BaseEdgeDocument> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception { + DataStream<BaseEdgeDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream) .keyBy(new IpKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) .process(new Ip2IpGraphProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); return ip2ipGraph; } + public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception { + sourceStream.addSink(new AGSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + } + } diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java deleted file mode 100644 index b727535..0000000 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java +++ /dev/null @@ -1,162 +0,0 @@ -//package com.zdjizhi.etl.connection; -// -//import cn.hutool.core.convert.Convert; -//import cn.hutool.core.util.StrUtil; -//import com.zdjizhi.etl.LogService; -//import com.zdjizhi.etl.dns.SketchTimeMapFunction; -//import com.zdjizhi.utils.kafka.KafkaConsumer; -//import org.apache.flink.api.common.eventtime.WatermarkStrategy; -//import org.apache.flink.api.common.functions.MapFunction; -//import org.apache.flink.api.java.utils.ParameterTool; -//import org.apache.flink.streaming.api.datastream.DataStream; -//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -//import org.apache.flink.streaming.api.windowing.time.Time; -//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink; -//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings; -//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst; -// -//import java.time.Duration; -//import java.util.*; -// -//import static com.zdjizhi.common.FlowWriteConfig.*; -// -// -//public class ConnLogService2 { -// -// public static String convertToCsv(Map<String, Object> map) { -// List<Object> list = new ArrayList<>(); -// list.add("("); -// for (Map.Entry<String, Object> m : map.entrySet()) { -// if (m.getValue() instanceof String) { -// list.add("'" + m.getValue() + "'"); -// } else { -// list.add(m.getValue()); -// } -// } -// list.add(")"); -// String join = StrUtil.join(",", list); -// return join; -// -// } -// -// public static void connLogStream(StreamExecutionEnvironment env) throws Exception { -// //connection -// DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); -// //sketch -// DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); -// -// //写入CKsink,批量处理 -// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); -// -// Map<String, String> globalParameters = new HashMap<>(); -// // ClickHouse cluster properties -// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/"); -// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME); -// 
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN); -// -// // sink common -// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1"); -// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/"); -// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2"); -// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2"); -// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2"); -// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false"); -// -// // set global paramaters -// ParameterTool parameters = ParameterTool.fromMap(globalParameters); -// env.getConfig().setGlobalJobParameters(parameters); -// -// // Transform 操作 -// DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() { -// @Override -// public String map(Map<String, Object> data) throws Exception { -// String s = convertToCsv(data); -// System.err.println(s); -// return convertToCsv(data); -// } -// }); -// -// -// // create props for sink -// Properties props = new Properties(); -// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH); -// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH); -// ClickHouseSink sink = new ClickHouseSink(props); -// dataStream.addSink(sink); -// dataStream.print(); -// -//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); -// -// //transform -// DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource); -// -// //写入ck通联relation表 -// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); -// -// DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); -// -// //合并通联和通联sketch -// DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); -// -// //写入arangodb -// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); -// -// } -// -// /** -// * 通联原始日志数据源消费kafka -// * -// * @param source -// * @return -// */ -// private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { -// -// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time"; -// -// DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) -// .setParallelism(SOURCE_PARALLELISM) -// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0) -// .setParallelism(SOURCE_PARALLELISM) -// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? 
new ConnTimeMapFunction() : new SketchTimeMapFunction()) -// .setParallelism(SOURCE_PARALLELISM) -// .name(source) -// .setParallelism(SOURCE_PARALLELISM); -// return sourceStream; -// } -// -// private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception { -// DataStream<Map<String, Object>> connTransformStream = connSource -// .assignTimestampsAndWatermarks(WatermarkStrategy -// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) -// .withTimestampAssigner((event, timestamp) -> { -// return Convert.toLong(event.get("conn_start_time")) * 1000; -// })) -// .setParallelism(TRANSFORM_PARALLELISM) -// .keyBy(new IpKeysSelector()) -// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) -// .process(new ConnProcessFunction()) -// .setParallelism(TRANSFORM_PARALLELISM); -// return connTransformStream; -// } -// -// private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception { -// DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy -// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) -// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) -// .keyBy(new IpKeysSelector()) -// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) -// .process(new SketchProcessFunction()); -// return sketchTransformStream; -// } -// -// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception { -// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream) -// .keyBy(new IpKeysSelector()) -// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) -// .process(new Ip2IpGraphProcessFunction()) -// .setParallelism(TRANSFORM_PARALLELISM); -// return ip2ipGraph; -// } -// -//} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java index 91cb47f..9a5245f 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java @@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -29,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec try { Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements); if (values != null) { - Map<String, Object> result = new HashMap<>(); + Map<String, Object> result = new LinkedHashMap<>(); result.put("start_time", values.f0); result.put("end_time", values.f1); result.put("src_ip", keys.f0); @@ -53,16 +53,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec long endTime = DateUtil.currentSeconds(); try { for (Map<String, Object> newSketchLog : elements) { - long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time")); - if (connStartTimetime > 
0) { + long connStartTime = Convert.toLong(newSketchLog.get("conn_start_time")); + if (connStartTime > 0) { sessions++; packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts")); bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? connStartTimetime : endTime; - packets = packets > Long.MAX_VALUE ? 0 : packets; - bytes = bytes > Long.MAX_VALUE ? 0 : bytes; + startTime = Math.min(connStartTime, startTime); + endTime = Math.max(connStartTime, endTime); } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); @@ -71,4 +69,5 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec } return null; } + } diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java deleted file mode 100644 index e38517a..0000000 --- a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.zdjizhi.etl.connection; - -import cn.hutool.core.convert.Convert; -import cn.hutool.core.date.DateUtil; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.util.TypeUtils; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.tuple.Tuple5; - -import java.util.Map; - - -/** - * @author 94976 - */ -public class ConnReduceFunction implements ReduceFunction<Map<String, Object>> { - - private static final Log logger = LogFactory.get(); - - // -// public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { -// try { -// Tuple5<Long, Long, Long, Long, Long> values = sum(elements); -// if (values != null) { -// Map<String, Object> result = new HashMap<>(); -// result.put("start_time", values.f0); -// result.put("end_time", values.f1); -// result.put("src_ip", keys.f0); -// result.put("dst_ip", keys.f1); -// result.put("sessions", values.f2); -// result.put("packets", values.f3); -// result.put("bytes", values.f4); -// out.collect(result); -// logger.debug("获取中间聚合结果:{}", result.toString()); -// } -// } catch (Exception e) { -// logger.error("获取中间聚合结果失败,middleResult: {}", e); -// } -// } -// - private Tuple5<Long, Long, Long, Long, Long> sum(Map<String, Object> map1, Map<String, Object> map2) { - - try { - long sessions = 0L; - long packets = 0L; - long bytes = 0L; - long startTime = DateUtil.currentSeconds(); - long endTime = DateUtil.currentSeconds(); - - long connStartTime1 = Convert.toLong(map1.get("conn_start_time")); - long connStartTime2 = Convert.toLong(map2.get("conn_start_time")); - if (connStartTime1 > 0 && connStartTime2 > 0) { - sessions++; - packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) + - TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts")); - - bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) + - TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes")); - - startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2; - endTime = connStartTime2 < connStartTime1 ? 
connStartTime1 : connStartTime2; - - packets = packets > Long.MAX_VALUE ? 0 : packets; - bytes = bytes > Long.MAX_VALUE ? 0 : bytes; - - } - - } catch (Exception e) { - logger.error("聚合中间结果集失败 {}", e); - } - return null; - } - - @Override - public Map<String, Object> reduce(Map<String, Object> map1, Map<String, Object> map2) throws Exception { - - return null; - } -} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java index e957d65..7becf90 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java @@ -1,6 +1,7 @@ package com.zdjizhi.etl.connection; -import com.zdjizhi.utils.json.TypeUtils; +import com.alibaba.fastjson.util.TypeUtils; +import com.zdjizhi.utils.json.TypeUtil; import org.apache.flink.api.common.functions.MapFunction; import java.util.Map; @@ -9,8 +10,13 @@ public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map @Override public Map<String, Object> map(Map<String, Object> value) throws Exception { - value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time"))); - value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time"))); + value.put("conn_start_time", TypeUtil.coverMSToS(value.get("conn_start_time"))); + value.put("log_gen_time", TypeUtil.coverMSToS(value.get("log_gen_time"))); + + value.put("total_cs_pkts", TypeUtils.castToLong(value.get("total_cs_pkts"))); + value.put("total_sc_pkts", TypeUtils.castToLong(value.get("total_sc_pkts"))); + value.put("total_cs_bytes", TypeUtils.castToLong(value.get("total_cs_bytes"))); + value.put("total_sc_bytes", TypeUtils.castToLong(value.get("total_sc_bytes"))); return value; } } diff --git a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java index 1b1b0c9..9468e26 100644 --- a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java @@ -2,42 +2,48 @@ package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.HashUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.arangodb.entity.BaseEdgeDocument; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; import java.util.Map; /** - * 对ip去重 + * 处理时间,转为图数据 */ -public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> { +public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseEdgeDocument, Tuple2<String, String>, TimeWindow> { private static final Log logger = LogFactory.get(); @Override - public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { + public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseEdgeDocument> out) { try { - long lastFoundTime = DateUtil.currentSeconds();; + long lastFoundTime = DateUtil.currentSeconds(); for (Map<String, Object> log : elements) { long connStartTime 
= Convert.toLong(log.get("start_time")); - lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime; + lastFoundTime = Math.max(connStartTime, lastFoundTime); } - Map<String, Object> newLog = new HashMap<>(); - newLog.put("src_ip", keys.f0); - newLog.put("dst_ip", keys.f1); - newLog.put("last_found_time", lastFoundTime); - out.collect(newLog); - logger.debug("获取中间聚合结果:{}", newLog.toString()); + + BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); + baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(keys.f0 + keys.f1))); + baseEdgeDocument.setFrom("src_ip/" + keys.f0); + baseEdgeDocument.setTo("dst_ip/" + keys.f1); + baseEdgeDocument.addAttribute("src_ip", keys.f0); + baseEdgeDocument.addAttribute("dst_ip", keys.f1); + baseEdgeDocument.addAttribute("last_found_time", lastFoundTime); + + out.collect(baseEdgeDocument); + logger.debug("获取中间聚合结果:{}", baseEdgeDocument.toString()); } catch (Exception e) { - logger.error("获取中间聚合结果失败,middleResult: {}", e); + logger.error("获取中间聚合结果失败,middleResult: {}", e.getMessage()); } } diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java index e2dd3f7..b27958a 100644 --- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java @@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -42,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements); try { if (values != null) { - Map<String, Object> result = new HashMap<>(); + Map<String, Object> result = new LinkedHashMap<>(); result.put("start_time", values.f0); result.put("end_time", values.f1); result.put("src_ip", keys.f0); @@ -72,12 +72,9 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions")); packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets")); bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes")); - startTime = connStartTime < startTime ? connStartTime : startTime; - endTime = connStartTime > endTime ? connStartTime : endTime; - sessions = sessions > Long.MAX_VALUE ? 0 : sessions; - packets = packets > Long.MAX_VALUE ? 0 : packets; - bytes = bytes > Long.MAX_VALUE ? 
0 : bytes; + startTime = Math.min(connStartTime, startTime); + endTime = Math.max(connStartTime, endTime); } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java new file mode 100644 index 0000000..ee0d6ae --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/SketchTimeMapFunction.java @@ -0,0 +1,21 @@ +package com.zdjizhi.etl.connection; + +import com.alibaba.fastjson.util.TypeUtils; +import com.zdjizhi.utils.json.TypeUtil; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + +public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> { + + @Override + public Map<String, Object> map(Map<String, Object> value) throws Exception { + value.put("sketch_start_time", TypeUtil.coverMSToS(value.get("sketch_start_time"))); + + value.put("sketch_sessions", TypeUtils.castToLong(value.get("sketch_sessions"))); + value.put("sketch_packets", TypeUtils.castToLong(value.get("sketch_packets"))); + value.put("sketch_bytes", TypeUtils.castToLong(value.get("sketch_bytes"))); + return value; + } + +} diff --git a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java deleted file mode 100644 index ada4b83..0000000 --- a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.zdjizhi.etl.dns; - -import cn.hutool.core.util.StrUtil; -import com.arangodb.entity.BaseEdgeDocument; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> { - - @Override - public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception { - Iterator<Map<String, Object>> iterator = iterable.iterator(); - List<BaseEdgeDocument> batchLog = new ArrayList<>(); - while (iterator.hasNext()) { - Map<String, Object> next = iterator.next(); - String qname = StrUtil.toString(next.get("qname")); - String record = StrUtil.toString(next.get("record")); - BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); - baseEdgeDocument.setKey(String.join("-", qname, record)); - baseEdgeDocument.setFrom("qname/" + qname); - baseEdgeDocument.setTo("record/" + record); - baseEdgeDocument.addAttribute("qname", qname); - baseEdgeDocument.addAttribute("record", record); - baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time")); - - batchLog.add(baseEdgeDocument); - } - out.collect(batchLog); - } -} diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java new file mode 100644 index 0000000..a467d40 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphMapFunction.java @@ -0,0 +1,28 @@ +package com.zdjizhi.etl.dns; + +import cn.hutool.core.util.HashUtil; +import cn.hutool.core.util.StrUtil; +import com.arangodb.entity.BaseEdgeDocument; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + +public class 
DnsGraphMapFunction implements MapFunction<Map<String, Object>, BaseEdgeDocument> { + + @Override + public BaseEdgeDocument map(Map<String, Object> value) throws Exception { + + String qname = StrUtil.toString(value.get("qname")); + String record = StrUtil.toString(value.get("record")); + + BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); + baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(qname + record))); + baseEdgeDocument.setFrom("qname/" + qname); + baseEdgeDocument.setTo("record/" + record); + baseEdgeDocument.addAttribute("qname", qname); + baseEdgeDocument.addAttribute("record", record); + baseEdgeDocument.addAttribute("last_found_time", value.get("last_found_time")); + + return baseEdgeDocument; + } +} diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java index cde34c1..d8b3a36 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java @@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; @@ -28,7 +28,7 @@ public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, O Long startTime = Convert.toLong(log.get("start_time")); tmpTime = startTime > tmpTime ? startTime : tmpTime; } - Map newLog = new HashMap<>(); + Map newLog = new LinkedHashMap<>(); newLog.put("record_type", keys.f0); newLog.put("qname", keys.f1); newLog.put("record", keys.f2); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java index b4ab915..7c67d6e 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -2,13 +2,14 @@ package com.zdjizhi.etl.dns; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.ObjectUtil; +import com.arangodb.entity.BaseEdgeDocument; import com.zdjizhi.enums.DnsType; import com.zdjizhi.etl.LogService; +import com.zdjizhi.utils.arangodb.AGSink; import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -32,20 +33,24 @@ public class DnsLogService { //dns 拆分后relation日志 ck入库 LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); } else { - LogService.getLogCKSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); + LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS); + LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); } //arango 入库,按record_type分组入不同的表 - DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull) + DataStream<Map<String, Object>> dnsGraph = dnsTransform .keyBy(new DnsGraphKeysSelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) .process(new DnsGraphProcessFunction()) .setParallelism(SINK_PARALLELISM); for (DnsType dnsEnum : DnsType.values()) { - DataStream<Map<String, Object>> 
dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) + DataStream<BaseEdgeDocument> dnsRecordData = dnsGraph + .filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) + .setParallelism(SINK_PARALLELISM) + .map(new DnsGraphMapFunction()) .setParallelism(SINK_PARALLELISM); - LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink()); + getLogArangoSink(dnsRecordData, dnsEnum.getSink()); } } @@ -64,6 +69,7 @@ public class DnsLogService { private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception { DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) + .setParallelism(SOURCE_PARALLELISM) .assignTimestampsAndWatermarks(WatermarkStrategy .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000)) @@ -71,10 +77,17 @@ public class DnsLogService { .flatMap(new DnsSplitFlatMapFunction()) .setParallelism(TRANSFORM_PARALLELISM) .keyBy(new DnsGraphKeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new DnsRelationProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); return dnsTransform; } + public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception { + sourceStream.addSink(new AGSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + } + } diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java index 1da5a90..9607b00 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java @@ -6,7 +6,7 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.google.common.base.Joiner; import com.zdjizhi.enums.DnsType; -import com.zdjizhi.utils.json.TypeUtils; +import com.zdjizhi.utils.json.TypeUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.MapFunction; @@ -25,7 +25,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri @Override public Map<String, Object> map(Map<String, Object> rawLog) throws Exception { try { - rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time"))); + rawLog.put("capture_time", TypeUtil.coverMSToS(rawLog.get("capture_time"))); //qname ,record 转小写 rawLog.put("qname", StringUtils.lowerCase(StrUtil.toString(rawLog.get("qname")))); if (Objects.nonNull(rawLog.get("response"))) { diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java index 8744060..894867f 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java @@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -45,7 +45,7 @@ public 
class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String endTime = logStartTime > endTime ? logStartTime : endTime; } } - Map<String, Object> newDns = new HashMap<>(); + Map<String, Object> newDns = new LinkedHashMap<>(); newDns.put("start_time", startTime); newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION); newDns.put("record_type", keys.f0); diff --git a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java deleted file mode 100644 index cf10a75..0000000 --- a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.zdjizhi.etl.dns; - -import com.zdjizhi.utils.json.TypeUtils; -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.Map; - -public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> { - - @Override - public Map<String, Object> map(Map<String, Object> value) throws Exception { - value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time"))); - return value; - } -} diff --git a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java new file mode 100644 index 0000000..db4b207 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java @@ -0,0 +1,137 @@ +package com.zdjizhi.utils.arangodb; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.arangodb.entity.BaseEdgeDocument; +import org.apache.commons.lang3.time.StopWatch; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.*; + +import static com.zdjizhi.common.FlowWriteConfig.ARANGODB_BATCH; +import static com.zdjizhi.common.FlowWriteConfig.SINK_ARANGODB_BATCH_DELAY_TIME; + +public class AGSink extends RichSinkFunction<BaseEdgeDocument> { + private static final Log log = LogFactory.get(); + + private static final long serialVersionUID = 1L; + + private static final Log logger = LogFactory.get(); + + // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP + private final List<BaseEdgeDocument> ipWithDataList; + // 满足此时间条件写出数据 + private final long insertArangoTimeInterval = SINK_ARANGODB_BATCH_DELAY_TIME; + // 插入的批次 + private final int insertArangoBatchSize = ARANGODB_BATCH; // 开发测试用10条 + private static ArangoDBConnect arangoDBConnect; + private transient volatile boolean closed = false; + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture<?> scheduledFuture; + // 数据表名 + private String sink; + + public AGSink(String sink) { + this.sink = sink; + this.ipWithDataList = new CopyOnWriteArrayList<>(); + } + + public String getSink() { + return sink; + } + + /** + * Connects to the target database and initializes the prepared statement. 
+ */ + @Override + public void open(Configuration parameters) throws Exception { + arangoDBConnect = ArangoDBConnect.getInstance(); + + if (insertArangoTimeInterval != 0 && insertArangoBatchSize != 1) { + this.scheduler = Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("arangodb-upsert-output-format")); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (AGSink.this) { + if (!closed) { + try { + logger.debug("arangodb_flush............."); + flush(ipWithDataList); + } catch (Exception e) { + log.error(e); + } + } + } + }, + insertArangoTimeInterval, + insertArangoTimeInterval, + TimeUnit.MILLISECONDS); + } + + } + + @Override + public final synchronized void invoke(BaseEdgeDocument row, Context context) throws IOException { + ipWithDataList.add(row); + if (ipWithDataList.size() >= this.insertArangoBatchSize) { + try { + flush(ipWithDataList); + } catch (SQLException e) { + logger.error("ck sink invoke flush failed.", e); + } + } + + } + + // 插入数据 + private synchronized void flush(List<BaseEdgeDocument> data) throws SQLException { + if (data.size() > 0) { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + log.debug("开始写入arangodb数据 :{}", data.size()); + arangoDBConnect.overwrite(data, sink); + stopWatch.stop(); + log.debug("总共花费时间 {} ms", stopWatch.getTime()); + log.debug("写入arangodb表{},数据 {}", sink, data.size()); + ipWithDataList.clear(); + } + } + + /** + * Executes prepared statement and closes all resources of this instance. + * + * @throws IOException Thrown, if the input could not be closed properly. + */ + @Override + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + if (arangoDBConnect != null) { + try { + flush(ipWithDataList); + } catch (SQLException e) { + log.error("JDBC statement could not be closed: " + e.getMessage()); + } finally { + try { + arangoDBConnect.clean(); + } catch (Exception e) { + log.error("JDBC connection could not be closed: " + e.getMessage()); + } + } + } + + + } + } +} + diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSink.java b/src/main/java/com/zdjizhi/utils/ck/CKSink.java index 95ef69e..d074926 100644 --- a/src/main/java/com/zdjizhi/utils/ck/CKSink.java +++ b/src/main/java/com/zdjizhi/utils/ck/CKSink.java @@ -1,23 +1,6 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - package com.zdjizhi.utils.ck; +import cn.hutool.core.io.IoUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.enums.LogMetadata; @@ -25,10 +8,10 @@ import org.apache.commons.lang3.time.StopWatch; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import ru.yandex.clickhouse.ClickHousePreparedStatement; import java.io.IOException; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.HashMap; import java.util.List; @@ -44,20 +27,6 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> { private static final long serialVersionUID = 1L; private static final Log logger = LogFactory.get(); - private Connection connection; - private ClickHousePreparedStatement preparedStatement = null; - // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP - private final List<Map> ipWithDataList; - - // 满足此时间条件写出数据 - private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L - // 插入的批次 - private final int insertCkBatchSize = CK_BATCH; // 开发测试用10条 - - private transient volatile boolean closed = false; - private transient ScheduledExecutorService scheduler; - private transient ScheduledFuture<?> scheduledFuture; - private static final Map<String, String[]> logMetadataFields = new HashMap<>(); private static final Map<String, String> logMetadataSql = new HashMap<>(); @@ -68,6 +37,17 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> { } } + // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP + private final CopyOnWriteArrayList<Map> ipWithDataList; + // 满足此时间条件写出数据 + private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L + // 插入的批次 + private final int insertCkBatchSize = CK_BATCH; // 开发测试用10条 + private Connection connection; + private PreparedStatement preparedStatement = null; + private transient volatile boolean closed = false; + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture<?> scheduledFuture; // 数据表名 private String sink; @@ -88,19 +68,19 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> { connection = CKUtils.getConnection(); String sql = logMetadataSql.get(sink); log.debug(sql); - preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql); + preparedStatement = connection.prepareStatement(sql); if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) { this.scheduler = Executors.newScheduledThreadPool( - 1, new ExecutorThreadFactory("jdbc-upsert-output-format")); + 1, new ExecutorThreadFactory("ck-upsert-output-format")); this.scheduledFuture = this.scheduler.scheduleWithFixedDelay( () -> { synchronized (CKSink.this) { if (!closed) { try { - logger.info("ck_flush............."); - flushClose(); + logger.debug("ck_flush............."); + flush(ipWithDataList); } catch (Exception e) { log.error(e); } @@ -115,7 +95,7 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> { } @Override - public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException { + public synchronized void invoke(Map<String, Object> row, Context context) throws IOException { ipWithDataList.add(row); /** * 1. 
将数据写入CK @@ -123,9 +103,10 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> { if (ipWithDataList.size() >= this.insertCkBatchSize) { try { flush(ipWithDataList); - logger.info("insertCkBatchSize"); + + log.debug("ck sink invoke flush "); } catch (SQLException e) { - throw new RuntimeException("Preparation of JDBC statement failed.", e); + logger.error("ck sink invoke flush failed.", e); } } @@ -134,35 +115,29 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> { // 插入数据 private synchronized void flush(List<Map> data) throws SQLException { if (data.size() > 0) { - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - log.info("开始写入ck数据 :{}", data.size()); - - connection.setAutoCommit(false); + log.debug("ck sink flush "+data.size()); + try { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + log.debug("开始写入ck数据 :{}", data.size()); + String[] logFields = logMetadataFields.get(sink); - String[] logFields = logMetadataFields.get(sink); + for (Map<String, Object> map : data) { - for (Map<String, Object> map : data) { - for (int i = 0; i < logFields.length; i++) { - preparedStatement.setObject(i + 1, map.get(logFields[i])); + for (int i = 0; i < logFields.length; i++) { + preparedStatement.setObject(i + 1, map.get(logFields[i])); + } + preparedStatement.addBatch(); } - preparedStatement.addBatch(); + preparedStatement.executeBatch(); + preparedStatement.clearBatch(); + stopWatch.stop(); + log.debug("总共花费时间 {} ms", stopWatch.getTime()); + log.debug("写入ck表{},数据 {}", sink, data.size()); + ipWithDataList.clear(); + } catch (SQLException e) { + logger.error(e); } - preparedStatement.executeBatch(); - connection.commit(); - preparedStatement.clearBatch(); - stopWatch.stop(); - log.info("总共花费时间 {} ms", stopWatch.getTime()); - log.info("写入ck表{},数据 {}", sink, data.size()); - ipWithDataList.clear(); - } - } - - private synchronized void flushClose() { - try { - flush(ipWithDataList); - } catch (SQLException e) { - log.error("Preparation of JDBC statement failed.", e); } } @@ -179,23 +154,22 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> { scheduledFuture.cancel(false); this.scheduler.shutdown(); } - if (preparedStatement != null) { - flushClose(); try { - preparedStatement.close(); + log.debug("ck sink close flush "); + flush(ipWithDataList); } catch (SQLException e) { log.error("JDBC statement could not be closed: " + e.getMessage()); } finally { - preparedStatement = null; + try { + IoUtil.close(preparedStatement); + CKUtils.close(connection); + } catch (Exception e) { + log.error("JDBC connection could not be closed: " + e.getMessage()); + } } } - try { - CKUtils.close(connection); - } catch (Exception e) { - log.error("JDBC connection could not be closed: " + e.getMessage()); - } } } } diff --git a/src/main/java/com/zdjizhi/utils/ck/CKUtils.java b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java index 7ff48d6..4edff5f 100644 --- a/src/main/java/com/zdjizhi/utils/ck/CKUtils.java +++ b/src/main/java/com/zdjizhi/utils/ck/CKUtils.java @@ -3,6 +3,7 @@ package com.zdjizhi.utils.ck; import cn.hutool.core.io.IoUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zaxxer.hikari.HikariDataSource; import ru.yandex.clickhouse.BalancedClickhouseDataSource; import ru.yandex.clickhouse.settings.ClickHouseProperties; @@ -16,10 +17,8 @@ public class CKUtils { private static final Log log = LogFactory.get(); - private static Connection connection; - public static Connection getConnection() { - + Connection connection = 
null; try { ClickHouseProperties props = new ClickHouseProperties(); props.setDatabase(CK_DATABASE); @@ -27,19 +26,20 @@ public class CKUtils { props.setPassword(CK_PIN); props.setConnectionTimeout(CK_CONNECTION_TIMEOUT); props.setSocketTimeout(CK_SOCKET_TIMEOUT); - props.setMaxThreads(50); + props.setMaxThreads(CK_MAX_THREADS); BalancedClickhouseDataSource blDataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, props); blDataSource.actualize(); - blDataSource.scheduleActualization(10, TimeUnit.SECONDS);//开启检测 - -// HikariConfig conf = new HikariConfig(); -// conf.setDataSource(blDataSource); -// conf.setMinimumIdle(1); -// conf.setMaximumPoolSize(20); -// -// HikariDataSource hkDs = new HikariDataSource(conf); - connection = blDataSource.getConnection(); + blDataSource.scheduleActualization(CK_SCHEDULE_ACTUALIZATION, TimeUnit.SECONDS);//开启检测 + + HikariDataSource hkDs = new HikariDataSource(); + hkDs.setDataSource(blDataSource); + hkDs.setMinimumIdle(HIKARI_MINIMUM_IDLE); + hkDs.setMaximumPoolSize(HIKARI_MAXIMUM_POOL_SIZE); + hkDs.setMaxLifetime(HIKARI_MAX_LIFETIME); + hkDs.setIdleTimeout(HIKARI_IDLE_TIMEOUT); + + connection = hkDs.getConnection(); log.debug("get clickhouse connection success"); } catch (SQLException e) { log.error("clickhouse connection error ,{}", e); diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java deleted file mode 100644 index 40b7db6..0000000 --- a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.zdjizhi.utils.ck; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.enums.LogMetadata; -import org.apache.commons.lang3.time.StopWatch; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.util.ExecutorThreadFactory; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import ru.yandex.clickhouse.ClickHousePreparedStatement; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; - -import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH; -import static com.zdjizhi.common.FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME; - -public class ClickHouseJDBCOutput extends RichSinkFunction<Map<String, Object>> { - private static final Log log = LogFactory.get(); - - private static final long serialVersionUID = 1L; - - private static final Logger logger = LoggerFactory.getLogger(ClickHouseJDBCOutput.class); - - private Connection connection; - private ClickHousePreparedStatement preparedStatement = null; - // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP - private final List<Map> ipWithDataList; - - // 满足此时间条件写出数据 - private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L - // 插入的批次 - private final int insertCkBatchSize = CK_BATCH; // 开发测试用10条 - - private transient volatile boolean closed = false; - private transient ScheduledExecutorService scheduler; - private transient ScheduledFuture<?> scheduledFuture; - - private static final Map<String, String[]> logMetadataFields = new HashMap<>(); - private static final Map<String, String> logMetadataSql = new HashMap<>(); - - static { - for (LogMetadata value : LogMetadata.values()) { - logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink())); - logMetadataFields.put(value.getSink(), value.getFields()); - } - } - - // 数据表名 - private String sink; - - public ClickHouseJDBCOutput(String sink) { - this.sink = sink; - this.ipWithDataList = new CopyOnWriteArrayList<>(); - } - - public String getSink() { - return sink; - } - - /** - * Connects to the target database and initializes the prepared statement. - */ - @Override - public void open(Configuration parameters) throws Exception { - connection = CKUtils.getConnection(); - String sql = logMetadataSql.get(sink); - log.debug(sql); - preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql); - - if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) { - this.scheduler = Executors.newScheduledThreadPool( - 1, new ExecutorThreadFactory("jdbc-upsert-output-format")); - this.scheduledFuture = - this.scheduler.scheduleWithFixedDelay( - () -> { - synchronized (ClickHouseJDBCOutput.this) { - if (!closed) { - try { - logger.info("ck_flush............."); - flushClose(); - } catch (Exception e) { -// flushException = e; - log.error(e); - } - } - } - }, - insertCkTimeInterval, - insertCkTimeInterval, - TimeUnit.MILLISECONDS); - } - - } - - @Override - public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException { - ipWithDataList.add(row); - /** - * 1. 
将数据写入CK - */ - if (ipWithDataList.size() >= this.insertCkBatchSize) { - try { - flush(ipWithDataList); - logger.info("insertCkBatchSize"); - } catch (SQLException e) { - throw new RuntimeException("Preparation of JDBC statement failed.", e); - } - } - - } - - // 插入数据 - private synchronized void flush(List<Map> data) throws SQLException { - if (data.size() > 0) { -// checkFlushException(); - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - log.info("开始写入ck数据 :{}", data.size()); - - connection.setAutoCommit(false); - - String[] logFields = logMetadataFields.get(sink); - - for (Map<String, Object> map : data) { - for (int i = 0; i < logFields.length; i++) { - preparedStatement.setObject(i + 1, map.get(logFields[i])); - } - preparedStatement.addBatch(); - } - preparedStatement.executeBatch(); - connection.commit(); - preparedStatement.clearBatch(); - stopWatch.stop(); - log.info("总共花费时间 {} ms", stopWatch.getTime()); - log.info("写入ck表{},数据 {}", sink, data.size()); - ipWithDataList.clear(); - } - } - - private synchronized void flushClose() { - try { - flush(ipWithDataList); - } catch (SQLException e) { - log.error("Preparation of JDBC statement failed.", e); - } - } - - /** - * Executes prepared statement and closes all resources of this instance. - * - * @throws IOException Thrown, if the input could not be closed properly. - */ - @Override - public synchronized void close() throws IOException { - if (!closed) { - closed = true; - - if (this.scheduledFuture != null) { - scheduledFuture.cancel(false); - this.scheduler.shutdown(); - } - - if (preparedStatement != null) { - flushClose(); - try { - preparedStatement.close(); - } catch (SQLException e) { - log.error("JDBC statement could not be closed: " + e.getMessage()); - } finally { - preparedStatement = null; - } - } - - try { - CKUtils.close(connection); - } catch (Exception e) { - log.error("JDBC connection could not be closed: " + e.getMessage()); - } - } - } -} - diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java deleted file mode 100644 index 6cf2f60..0000000 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java +++ /dev/null @@ -1,106 +0,0 @@ -//package com.zdjizhi.utils.ck; -// -//import cn.hutool.core.io.IoUtil; -//import cn.hutool.log.Log; -//import cn.hutool.log.LogFactory; -//import com.zdjizhi.enums.LogMetadata; -//import org.apache.commons.lang3.time.StopWatch; -//import org.apache.flink.configuration.Configuration; -//import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -//import ru.yandex.clickhouse.ClickHousePreparedStatement; -// -//import java.sql.Connection; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -// -//import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH; -// -//public class ClickhouseSink2 extends RichSinkFunction<List<Map<String, Object>>> { -// -// private static final Log log = LogFactory.get(); -// -// private Connection connection; -// private ClickHousePreparedStatement preparedStatement; -// public String sink; -// -// private static final Map<String, String[]> logMetadataFields = new HashMap<>(); -// private static final Map<String, String> logMetadataSql = new HashMap<>(); -// -// static { -// for (LogMetadata value : LogMetadata.values()) { -// logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink())); -// logMetadataFields.put(value.getSink(), value.getFields()); -// } -// } -// -// public ClickhouseSink2(String sink) { -// this.sink 
= sink; -// } -// -// public String getSink() { -// return sink; -// } -// -// public void setSink(String sink) { -// this.sink = sink; -// } -// -// @Override -// public void invoke(List<Map<String, Object>> logs, Context context) throws Exception { -// executeInsert(logs, getSink()); -// } -// -// @Override -// public void open(Configuration parameters) throws Exception { -// connection = CKUtils.getConnection(); -// String sql = logMetadataSql.get(sink); -// log.debug(sql); -// connection.setAutoCommit(false); -// preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql); -// } -// -// @Override -// public void close() throws Exception { -// IoUtil.close(preparedStatement); -// CKUtils.close(connection); -// } -// -// public void executeInsert(List<Map<String, Object>> data, String tableName) { -// -// try { -// StopWatch stopWatch = new StopWatch(); -// stopWatch.start(); -// log.info("开始写入ck数据 :{}", data.size()); -// -// String[] logFields = logMetadataFields.get(tableName); -// -// int count = 0; -// for (Map<String, Object> map : data) { -// for (int i = 0; i < logFields.length; i++) { -// preparedStatement.setObject(i + 1, map.get(logFields[i])); -// } -// preparedStatement.addBatch(); -// count++; -// if (count % CK_BATCH == 0) { -// preparedStatement.executeBatch(); -// connection.commit(); -// preparedStatement.clearBatch(); -// count = 0; -// } -// } -// if (count > 0) { -// preparedStatement.executeBatch(); -// connection.commit(); -// preparedStatement.clearBatch(); -// } -// -// stopWatch.stop(); -// log.info("总共花费时间 {} ms", stopWatch.getTime()); -// log.info("写入ck表{},数据 {}", tableName, data.size()); -// } catch (Exception ex) { -// log.error("ClickhouseSink插入报错", ex); -// } -// } -// -//}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java index 0cf16ff..782e0ff 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java @@ -79,7 +79,7 @@ public class JsonTypeUtil { * @return Long value */ static long checkLongValue(Object value) { - Long longVal = TypeUtils.castToLong(value); + Long longVal = TypeUtil.castToLong(value); if (longVal == null) { return 0L; @@ -99,7 +99,7 @@ public class JsonTypeUtil { return null; } - return TypeUtils.castToDouble(value); + return TypeUtil.castToDouble(value); } @@ -111,7 +111,7 @@ public class JsonTypeUtil { */ static int getIntValue(Object value) { - Integer intVal = TypeUtils.castToInt(value); + Integer intVal = TypeUtil.castToInt(value); if (intVal == null) { return 0; } diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtil.java index 1beb1fa..8b1adfa 100644 --- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtil.java @@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit; * @Description: * @date 2021/7/1218:20 */ -public class TypeUtils { +public class TypeUtil { private static final Log logger = LogFactory.get(); /** diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java index 8f3fccf..3aeb499 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java +++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java @@ -31,13 +31,11 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem @Override @SuppressWarnings("unchecked") - public Map<String,Object> deserialize(ConsumerRecord record) throws Exception { + public Map<String, Object> deserialize(ConsumerRecord record) throws Exception { if (record != null) { try { -// long timestamp = record.timestamp() / 1000; String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING); Map<String, Object> data = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class); -// json.put("common_ingestion_time", timestamp); return data; } catch (RuntimeException e) { logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage()); |
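
Note on the CKUtils change above: the previously commented-out HikariCP block is now in real use. Instead of handing out a connection taken directly from BalancedClickhouseDataSource, getConnection() builds a HikariDataSource around the balanced data source and borrows a connection from the pool, with the pool and actualization settings coming from FlowWriteConfig (HIKARI_MINIMUM_IDLE, HIKARI_MAXIMUM_POOL_SIZE, HIKARI_IDLE_TIMEOUT, HIKARI_MAX_LIFETIME, CK_MAX_THREADS, CK_SCHEDULE_ACTUALIZATION). The sketch below only illustrates the shape of that approach; the class name PooledCkConnections, the literal hosts, and the numeric settings are placeholders, not the repository's configuration values.

// Sketch only: shows the pooling approach, not the exact repository code.
// The pool is held as a single static instance here; the diff's
// CKUtils.getConnection() builds the pool inside the method instead.
import com.zaxxer.hikari.HikariDataSource;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;

public final class PooledCkConnections {

    // Placeholder values; the real values come from FlowWriteConfig
    // (CK_HOSTS, CK_MAX_THREADS, CK_SCHEDULE_ACTUALIZATION, HIKARI_*).
    private static final String CK_HOSTS = "host1:8123,host2:8123";
    private static final HikariDataSource POOL = buildPool();

    private static HikariDataSource buildPool() {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setDatabase("default");
        props.setConnectionTimeout(10_000);
        props.setSocketTimeout(60_000);
        props.setMaxThreads(4);

        // The balanced data source spreads work over the listed ClickHouse nodes
        // and periodically re-checks which nodes are reachable.
        BalancedClickhouseDataSource ck =
                new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, props);
        ck.actualize();
        ck.scheduleActualization(10, TimeUnit.SECONDS);

        // Hikari wraps the ClickHouse data source and manages connection lifetime,
        // so callers borrow and return connections instead of holding one forever.
        HikariDataSource pool = new HikariDataSource();
        pool.setDataSource(ck);
        pool.setMinimumIdle(1);
        pool.setMaximumPoolSize(20);
        pool.setIdleTimeout(600_000L);
        pool.setMaxLifetime(1_800_000L);
        return pool;
    }

    public static Connection getConnection() throws SQLException {
        // Borrow a pooled connection; closing it returns it to the pool.
        return POOL.getConnection();
    }
}

One design point worth noting: the diff's getConnection() constructs a new HikariDataSource on every call, so each caller effectively gets its own pool. Holding the pool as a single long-lived instance, as sketched above, is a common variation; whether that is intended here depends on how many parallel sink subtasks share CKUtils.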

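The CKSink changes keep the same size-or-time batching contract: rows are buffered in ipWithDataList, flushed when the buffer reaches the configured batch size, flushed again by the scheduled task after the configured delay, and flushed one last time in close() before the prepared statement and connection are released; the explicit setAutoCommit(false)/commit() handling is dropped in the process. The following self-contained sketch illustrates only that batching rule; BatchBuffer and its thresholds are illustrative names, not part of the project.

// Sketch of the size-or-time batching rule; the writer callback stands in for
// the ClickHouse batch insert done by the sink's flush().
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public final class BatchBuffer implements AutoCloseable {

    private final List<Map<String, Object>> rows = new ArrayList<>();
    private final int maxRows;                                   // e.g. CK_BATCH
    private final Consumer<List<Map<String, Object>>> writer;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public BatchBuffer(int maxRows, long flushIntervalMs,
                       Consumer<List<Map<String, Object>>> writer) {
        this.maxRows = maxRows;
        this.writer = writer;
        // Time-based flush: even a partially filled buffer is written out
        // periodically, mirroring the scheduled flush started in open().
        scheduler.scheduleWithFixedDelay(this::flush, flushIntervalMs,
                flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void add(Map<String, Object> row) {
        rows.add(row);
        // Size-based flush: write as soon as the batch threshold is reached.
        if (rows.size() >= maxRows) {
            flush();
        }
    }

    private synchronized void flush() {
        if (rows.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(rows));
        rows.clear();
    }

    @Override
    public synchronized void close() {
        // Final flush on shutdown, matching the flush now performed in close().
        scheduler.shutdown();
        flush();
    }
}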