diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/dns/DnsLogService.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/dns/DnsLogService.java | 10 |
1 files changed, 5 insertions, 5 deletions
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java index 7c67d6e..dd5aff3 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -32,9 +32,11 @@ public class DnsLogService { LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS); //dns 拆分后relation日志 ck入库 LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); - } else { + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){ LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS); LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){ + LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); } //arango 入库,按record_type分组入不同的表 @@ -57,18 +59,17 @@ public class DnsLogService { private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { - DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) + return env.addSource(KafkaConsumer.myDeserializationConsumer(source)) .setParallelism(SOURCE_PARALLELISM) .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0) .setParallelism(SOURCE_PARALLELISM) .map(new DnsMapFunction()) .setParallelism(SOURCE_PARALLELISM) .name(source); - return dnsSource; } private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception { - DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) + return dnsSource.filter(x -> Objects.nonNull(x.get("response"))) .setParallelism(SOURCE_PARALLELISM) .assignTimestampsAndWatermarks(WatermarkStrategy .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) @@ -80,7 +81,6 @@ public class DnsLogService { .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new DnsRelationProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); - return dnsTransform; } public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception { |
