| author | zhanghongqing <[email protected]> | 2022-07-22 15:35:07 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-07-22 15:35:07 +0800 |
| commit | 9f22443c7c0ff6333b27e21fb0e790e3c7fa837a (patch) | |
| tree | e0e910da424b3e7e4471dc054b3fbf68a5e482e5 /src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | |
| parent | fa3f628658fc4d9197053e8755c685b3d80ff3b5 (diff) | |
Code cleanup: unify time units to seconds and remove some redundant code.
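The diff below wires a new `ConnTimeMapFunction` (and its `SketchTimeMapFunction` counterpart) into the Kafka sources, but the classes themselves are outside this change. As a minimal sketch of the "unify time units to seconds" intent, such a map might clamp millisecond-precision epochs down to seconds; the field name `conn_start_time` and the digit-count heuristic are illustrative assumptions, not code from this commit:

```java
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;

/**
 * Hypothetical sketch of a time-normalization step. The real
 * ConnTimeMapFunction is not part of this diff; the field name and
 * the ms-vs-s heuristic below are assumptions for illustration.
 */
public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {

    private static final String TIME_FIELD = "conn_start_time";
    // Epoch seconds stay below ~1e11 until roughly the year 5138;
    // anything larger is assumed to be milliseconds.
    private static final long MS_THRESHOLD = 100_000_000_000L;

    @Override
    public Map<String, Object> map(Map<String, Object> log) {
        Object raw = log.get(TIME_FIELD);
        if (raw instanceof Number) {
            long ts = ((Number) raw).longValue();
            if (ts >= MS_THRESHOLD) {
                log.put(TIME_FIELD, ts / 1000); // normalize to seconds
            }
        }
        return log;
    }
}
```

Normalizing at the source keeps the downstream watermark assigners consistent: they compute `Convert.toLong(event.get("sketch_start_time")) * 1000` because Flink timestamps are milliseconds, which only works if every log reaching them carries seconds.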
Diffstat (limited to 'src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 86 |
1 file changed, 45 insertions(+), 41 deletions(-)
```diff
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index e7ac08e..1a075cc 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -40,17 +40,37 @@ public class LogFlowWriteTopology {
         if (FlowWriteConfig.LOG_TYPE == 1) {
             //connection
             DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
-                    .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
+                    .setParallelism(SOURCE_PARALLELISM)
                     .filter(Objects::nonNull)
+                    .map(new ConnTimeMapFunction())
                     .setParallelism(SOURCE_PARALLELISM)
                     .name(SOURCE_KAFKA_TOPIC_CONNECTION);
 
             //sketch
             DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
                     .filter(Objects::nonNull)
+                    .map(new SketchTimeMapFunction())
                     .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                     .name(SOURCE_KAFKA_TOPIC_SKETCH);
 
+            //write to CK sink in batches
+            if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+                connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new CKBatchWindow())
+                        .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
+                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                        .name("CKSink");
+
+                sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new CKBatchWindow())
+                        .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
+                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                        .name("CKSink");
+
+            }
+
             //transform
             DataStream<Map<String, Object>> connTransformStream = connSource
                     .assignTimestampsAndWatermarks(WatermarkStrategy
@@ -64,6 +84,13 @@ public class LogFlowWriteTopology {
                     .filter(x -> Objects.nonNull(x))
                     .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
 
+            connTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                    .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                    .apply(new CKBatchWindow())
+                    .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
+                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                    .name("CKSink");
+
             DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
                     .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
                     .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
@@ -73,7 +100,6 @@ public class LogFlowWriteTopology {
                     .filter(Objects::nonNull)
                     .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
 
-
             //into ArangoDB
             DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
                     .keyBy(new IpKeysSelector())
@@ -83,29 +109,6 @@ public class LogFlowWriteTopology {
                     .filter(Objects::nonNull)
                     .setParallelism(TRANSFORM_PARALLELISM);
 
-            //write to CK sink in batches
-            if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
-                connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                        .apply(new CKBatchWindow())
-                        .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
-                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                        .name("CKSink");
-
-                sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                        .apply(new CKBatchWindow())
-                        .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
-                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                        .name("CKSink");
-            }
-            sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-                    .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
-                    .apply(new CKBatchWindow())
-                    .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
-                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                    .name("CKSink");
-
             //write to ArangoDB
             ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
                     .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
@@ -121,19 +124,6 @@ public class LogFlowWriteTopology {
                     .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                     .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
 
-            DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
-                    .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
-                    .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
-                    .setParallelism(TRANSFORM_PARALLELISM)
-                    .flatMap(new DnsSplitFlatMapFunction())
-                    .setParallelism(TRANSFORM_PARALLELISM)
-                    .keyBy(new DnsGraphKeysSelector())
-                    .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
-                    .process(new DnsRelationProcessFunction())
-                    .setParallelism(TRANSFORM_PARALLELISM)
-                    .filter(Objects::nonNull)
-                    .setParallelism(TRANSFORM_PARALLELISM);
-
             //dns raw logs into CK
             if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
                 dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
@@ -144,8 +134,22 @@ public class LogFlowWriteTopology {
                         .name("CKSink");
             }
 
+            DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
+                    .assignTimestampsAndWatermarks(WatermarkStrategy
+                            .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
+                            .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+                    .setParallelism(TRANSFORM_PARALLELISM)
+                    .flatMap(new DnsSplitFlatMapFunction())
+                    .setParallelism(TRANSFORM_PARALLELISM)
+                    .keyBy(new DnsGraphKeysSelector())
+                    .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+                    .process(new DnsRelationProcessFunction())
+                    .setParallelism(TRANSFORM_PARALLELISM)
+                    .filter(Objects::nonNull)
+                    .setParallelism(TRANSFORM_PARALLELISM);
+
             //dns relation logs (post-split) into CK
-            dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+            dnsTransform.filter(Objects::nonNull).windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
                     .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
                     .apply(new CKBatchWindow())
                     .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
@@ -153,13 +157,13 @@ public class LogFlowWriteTopology {
                     .name("CKSink");
 
             //into arango, grouped by record_type into separate tables
-            DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
+            DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull).keyBy(new DnsGraphKeysSelector())
                     .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                     .process(new DnsGraphProcessFunction())
                     .setParallelism(SINK_PARALLELISM);
 
             for (DnsType dnsEnum : DnsType.values()) {
-                dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+                dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
                         .setParallelism(SINK_PARALLELISM)
                         .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
                         .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
```
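`CountTriggerWithTimeout` appears in every sink path above, but its implementation is outside this diff. A minimal sketch of the count-or-timeout idea against Flink's `Trigger` API could look like the following; the repo's real class also takes a table/sink label and a `TimeCharacteristic` argument, which this sketch drops for brevity:

```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Hypothetical sketch of a count-or-timeout trigger: fire when the
 * window has buffered maxCount elements, or when the window's
 * processing-time timer expires, whichever comes first.
 */
public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", Long::sum, LongSerializer.INSTANCE);

    public CountTriggerWithTimeout(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Count the element and make sure the timeout timer is armed.
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        if (count.get() >= maxCount) {
            // Batch is full: emit early and drop the buffered elements.
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Timeout path: flush whatever has accumulated when the window closes.
        ctx.getPartitionedState(countDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}
```

Combined with `windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))` and the `CKBatchWindow` apply step, this yields the usual ClickHouse batching contract: flush after `CK_BATCH` records or after `SINK_CK_BATCH_DELAY_TIME` milliseconds, whichever comes first, bounding both insert size and latency.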
