Diffstat (limited to 'src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 31 |
1 file changed, 16 insertions(+), 15 deletions(-)
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 0ed5052..7ca7acb 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -4,9 +4,11 @@ import cn.hutool.core.convert.Convert;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.*;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.etl.*;
+import com.zdjizhi.etl.CKBatchWindow;
+import com.zdjizhi.etl.connection.*;
+import com.zdjizhi.etl.dns.*;
 import com.zdjizhi.utils.arangodb.ArangoDBSink;
 import com.zdjizhi.utils.ck.ClickhouseSink;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -49,7 +51,7 @@ public class LogFlowWriteTopology {
         //transform
         DataStream<Map<String, Object>> connTransformStream = connSource
                 .assignTimestampsAndWatermarks(WatermarkStrategy
-                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
                         .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
                 .keyBy(new IpKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -58,7 +60,7 @@ public class LogFlowWriteTopology {
                 .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);

         DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
-                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
                 .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
                 .keyBy(new IpKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -72,16 +74,16 @@ public class LogFlowWriteTopology {
                 .keyBy(new IpKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                 .process(new Ip2IpGraphProcessFunction())
-//                .filter(Objects::nonNull)
+                .filter(Objects::nonNull)
                 .setParallelism(TRANSFORM_PARALLELISM);

         //write to the CK sink in batches
-        connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
-        sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
-        sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+        connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+        sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+        sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");

         //write to arangodb
-        ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
+        ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);

         } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
@@ -92,7 +94,7 @@ public class LogFlowWriteTopology {
                 .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);

         DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
-                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
+                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                 .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
                 .flatMap(new DnsSplitFlatMapFunction())
                 .keyBy(new DnsGraphKeysSelector())
@@ -104,13 +106,13 @@ public class LogFlowWriteTopology {
         //dns raw logs into CK
         dnsSource.filter(Objects::nonNull)
                 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
                 .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
                 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
                 .name("CKSink");

         //dns relation logs (after splitting) into CK
-        dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+        dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
                 .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
                 .setParallelism(SINK_PARALLELISM)
                 .name("CKSink");
@@ -119,12 +121,11 @@ public class LogFlowWriteTopology {
         DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                 .process(new DnsGraphProcessFunction())
-                .setParallelism(SINK_PARALLELISM)
-                .filter(Objects::nonNull);
+                .setParallelism(SINK_PARALLELISM);

         for (DnsType dnsEnum : DnsType.values()) {
             dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
-                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbDnsWindow())
+                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
                     .addSink(new ArangoDBSink(dnsEnum.getSink()))
                     .setParallelism(SINK_PARALLELISM)
                     .name("ArangodbSink");
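
For context on the watermarking idiom that the renamed FLINK_WATERMARK_MAX_DELAY_TIME constant configures throughout this diff, below is a minimal sketch of Flink's bounded-out-of-orderness pattern, assuming the Flink 1.12+ WatermarkStrategy API. The class name, the timeField parameter, and the helper signature are illustrative, not project code or FlowWriteConfig values.

// Minimal sketch (not project code): the bounded-out-of-orderness
// watermark idiom used in the topology above, assuming Flink 1.12+.
import java.time.Duration;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WatermarkSketch {
    // Attach event-time timestamps and a watermark that tolerates events
    // arriving up to maxDelaySeconds late -- the role that
    // FLINK_WATERMARK_MAX_DELAY_TIME plays in the diff above.
    static DataStream<Map<String, Object>> withEventTime(
            DataStream<Map<String, Object>> source, String timeField, long maxDelaySeconds) {
        return source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(maxDelaySeconds))
                // the logs carry epoch-second timestamps; Flink expects milliseconds
                .withTimestampAssigner((event, ts) -> ((Number) event.get(timeField)).longValue() * 1000L));
    }
}

A stream prepared this way can then be keyed and windowed as the topology does with IpKeysSelector and TumblingProcessingTimeWindows before reaching the batched ClickHouse and ArangoDB sinks.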
