Diffstat (limited to 'src/main/java/com/zdjizhi')
3 files changed, 11 insertions, 11 deletions
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index 9c5f8e0..ab07376 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -35,14 +35,15 @@ public class ConnLogService {
             LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
             //write the connection relation table to ck
             LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
-        } else {
+        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){
             LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
             LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
             LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
+        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){
+            LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
         }
 
         if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
-
             DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
             //merge connection and connection sketch
@@ -96,7 +97,7 @@ public class ConnLogService {
     }
 
     private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
-        DataStream<Map<String, Object>> connTransformStream = connSource
+        return connSource
                 .assignTimestampsAndWatermarks(WatermarkStrategy
                         .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                         .withTimestampAssigner((event, timestamp) -> {
@@ -109,7 +110,6 @@ public class ConnLogService {
                 .setParallelism(TRANSFORM_PARALLELISM)
                 .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
                 .setParallelism(TRANSFORM_PARALLELISM);
-        return connTransformStream;
     }
 
     private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
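Note on the first hunk above: the catch-all else branch becomes an explicit mode switch on SINK_CK_RAW_LOG_INSERT_OPEN. Mode 1 writes the raw, sketch, and relation logs to ClickHouse; mode 2 routes all three to Kafka; the new mode 3 forwards only the relation stream to Kafka. Any other value now attaches no raw-log sink at all, whereas before this change every non-1 value fell into the Kafka branch. DnsLogService below receives the same treatment. A minimal self-contained sketch of the dispatch, with the config constant inlined as a plain int and the sink calls stubbed out as prints (both are assumptions of this sketch, not the project's code):

public class SinkModeSketch {
    // assumed inline stand-in for the job's configured value
    static final int SINK_CK_RAW_LOG_INSERT_OPEN = 3;

    public static void main(String[] args) {
        if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
            System.out.println("raw + sketch + relation -> ClickHouse");
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2) {
            System.out.println("raw + sketch + relation -> Kafka");
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3) {
            // the branch this commit adds: relation logs only
            System.out.println("relation -> Kafka");
        } else {
            // previously the else branch; now no raw-log sink is attached
            System.out.println("raw-log sinks disabled");
        }
    }
}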
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
index 7c67d6e..dd5aff3 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -32,9 +32,11 @@ public class DnsLogService {
             LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
             //write the split dns relation logs to ck
             LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
-        } else {
+        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){
             LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS);
             LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
+        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){
+            LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
         }
 
         //write to arango, grouped into different tables by record_type
@@ -57,18 +59,17 @@ public class DnsLogService {
     private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
-        DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+        return env.addSource(KafkaConsumer.myDeserializationConsumer(source))
                 .setParallelism(SOURCE_PARALLELISM)
                 .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0)
                 .setParallelism(SOURCE_PARALLELISM)
                 .map(new DnsMapFunction())
                 .setParallelism(SOURCE_PARALLELISM)
                 .name(source);
-        return dnsSource;
     }
 
     private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
-        DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
+        return dnsSource.filter(x -> Objects.nonNull(x.get("response")))
                 .setParallelism(SOURCE_PARALLELISM)
                 .assignTimestampsAndWatermarks(WatermarkStrategy
                         .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
@@ -80,7 +81,6 @@ public class DnsLogService {
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                 .process(new DnsRelationProcessFunction())
                 .setParallelism(TRANSFORM_PARALLELISM);
-        return dnsTransform;
     }
 
     public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
index db4b207..d4d05a5 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
@@ -24,7 +24,7 @@ public class AGSink extends RichSinkFunction<BaseEdgeDocument> {
     private static final Log logger = LogFactory.get();
 
     // the data is bound to and stored with its ClickHouse cluster IP, recording which ClickHouse IP the data was written to
-    private final List<BaseEdgeDocument> ipWithDataList;
+    private final CopyOnWriteArrayList<BaseEdgeDocument> ipWithDataList;
     // write out the buffered data once this time interval has elapsed
     private final long insertArangoTimeInterval = SINK_ARANGODB_BATCH_DELAY_TIME;
     // the insert batch size
@@ -82,7 +82,7 @@ public class AGSink extends RichSinkFunction<BaseEdgeDocument> {
         if (ipWithDataList.size() >= this.insertArangoBatchSize) {
             try {
                 flush(ipWithDataList);
-            } catch (SQLException e) {
+            } catch (Exception e) {
                 logger.error("ck sink invoke flush failed.", e);
             }
         }
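Two details in the AGSink hunks are worth spelling out. Narrowing the field type from List to CopyOnWriteArrayList makes the buffer's thread safety explicit at the declaration site: invoke() appends from the task thread while a time-interval flush (per insertArangoTimeInterval) can drain the same list. Widening catch (SQLException e) to catch (Exception e) also fits a sink that writes to ArangoDB rather than over JDBC; the ArangoDB Java driver surfaces failures as its own runtime exceptions, which the old clause would not have caught. (The log text "ck sink invoke flush failed." still reads like a leftover from a ClickHouse sink this class was adapted from.) Below is a minimal sketch of the buffered-sink pattern under those assumptions; the element type, batch size, and flush body are illustrative, not the project's code:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class BufferedSinkSketch {
    // concurrent list: the writer thread appends while a flusher drains
    private final CopyOnWriteArrayList<String> buffer = new CopyOnWriteArrayList<>();
    private final int batchSize = 2; // assumed; stands in for the configured batch size

    public void invoke(String value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            try {
                flush(buffer);
            } catch (Exception e) { // broad catch: the store's client may throw runtime exceptions
                System.err.println("sink flush failed: " + e.getMessage());
            }
        }
    }

    private void flush(List<String> batch) throws Exception {
        List<String> snapshot = List.copyOf(batch); // stable view while appends continue
        System.out.println("writing batch: " + snapshot); // stand-in for the store write
        batch.removeAll(snapshot); // drop only what was written
    }

    public static void main(String[] args) {
        BufferedSinkSketch sink = new BufferedSinkSketch();
        sink.invoke("a");
        sink.invoke("b"); // reaches batchSize and triggers a flush
    }
}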

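The remaining hunks in ConnLogService and DnsLogService are a mechanical cleanup: each helper previously bound its operator chain to a local variable and returned it on the next line; the variable added nothing, so the chain is now returned directly. A sketch of the pattern against a generic Flink stream (the predicate, elements, parallelism, and job name are placeholders):

import java.util.Objects;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnChainSketch {
    private static DataStream<String> getTransformStream(DataStream<String> source) {
        // before: DataStream<String> transformed = source.filter(...); return transformed;
        return source
                .filter(x -> Objects.nonNull(x) && !x.isEmpty())
                .setParallelism(1);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        getTransformStream(env.fromElements("a", "", "b")).print();
        env.execute("return-chain-sketch");
    }
}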