author    zhanghongqing <[email protected]>  2022-07-22 15:35:07 +0800
committer zhanghongqing <[email protected]>  2022-07-22 15:35:07 +0800
commit    9f22443c7c0ff6333b27e21fb0e790e3c7fa837a (patch)
tree      e0e910da424b3e7e4471dc054b3fbf68a5e482e5 /src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
parent    fa3f628658fc4d9197053e8755c685b3d80ff3b5 (diff)
Optimize code: unify time units to seconds and remove some redundant code.
Diffstat (limited to 'src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java')
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java  86
1 file changed, 45 insertions(+), 41 deletions(-)
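
Every timestamp assigner touched below follows one convention: the event carries its time as epoch seconds (capture_time, sketch_start_time), and the assigner multiplies by 1000 because Flink timestamps and watermarks are epoch milliseconds. A minimal sketch of that pattern, assuming hutool's Convert (which this code uses); the class and helper names are hypothetical, not code from this repository:

import cn.hutool.core.convert.Convert;
import java.time.Duration;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public final class SecondsEventTime {
    // Hypothetical helper illustrating the seconds-to-milliseconds convention:
    // the max out-of-orderness is configured in seconds, and the per-event
    // timestamp is read as epoch seconds and scaled to the epoch milliseconds
    // that Flink's event-time machinery expects.
    static WatermarkStrategy<Map<String, Object>> secondsWatermarks(long maxDelaySeconds, String timeField) {
        return WatermarkStrategy
                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(maxDelaySeconds))
                .withTimestampAssigner((event, previousTimestamp) ->
                        Convert.toLong(event.get(timeField)) * 1000L);
    }
}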
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index e7ac08e..1a075cc 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -40,17 +40,37 @@ public class LogFlowWriteTopology {
if (FlowWriteConfig.LOG_TYPE == 1) {
//connection
DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
- .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
+ .setParallelism(SOURCE_PARALLELISM)
.filter(Objects::nonNull)
+ .map(new ConnTimeMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
.filter(Objects::nonNull)
+ .map(new SketchTimeMapFunction())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_SKETCH);
+ //Batch-write to the ClickHouse (CK) sink
+ if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+ connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
+
+ sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
+
+ }
+
//transform
DataStream<Map<String, Object>> connTransformStream = connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
@@ -64,6 +84,13 @@ public class LogFlowWriteTopology {
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ connTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
+
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
@@ -73,7 +100,6 @@ public class LogFlowWriteTopology {
.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
//into ArangoDB
DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
@@ -83,29 +109,6 @@ public class LogFlowWriteTopology {
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
- //Batch-write to the ClickHouse (CK) sink
- if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
- connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
-
- sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
- }
- sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
-
//write to ArangoDB
ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
@@ -121,19 +124,6 @@ public class LogFlowWriteTopology {
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
- DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
- .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
- .setParallelism(TRANSFORM_PARALLELISM)
- .flatMap(new DnsSplitFlatMapFunction())
- .setParallelism(TRANSFORM_PARALLELISM)
- .keyBy(new DnsGraphKeysSelector())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
- .process(new DnsRelationProcessFunction())
- .setParallelism(TRANSFORM_PARALLELISM)
- .filter(Objects::nonNull)
- .setParallelism(TRANSFORM_PARALLELISM);
-
//DNS raw logs: insert into CK
if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
@@ -144,8 +134,22 @@ public class LogFlowWriteTopology {
.name("CKSink");
}
+ DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
+ .assignTimestampsAndWatermarks(WatermarkStrategy
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
+ .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .flatMap(new DnsSplitFlatMapFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .keyBy(new DnsGraphKeysSelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+ .process(new DnsRelationProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
+ .filter(Objects::nonNull)
+ .setParallelism(TRANSFORM_PARALLELISM);
+
//DNS post-split relation logs: insert into CK
- dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ dnsTransform.filter(Objects::nonNull).windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
@@ -153,13 +157,13 @@ public class LogFlowWriteTopology {
.name("CKSink");
//Insert into ArangoDB, grouping by record_type into different tables
- DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
+ DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull).keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
.setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
- dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
+ dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
.setParallelism(SINK_PARALLELISM)
.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
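
All the ClickHouse and ArangoDB writes above share one batching recipe: a processing-time windowAll bounded by a delay (SINK_CK_BATCH_DELAY_TIME or SINK_ARANGODB_BATCH_DELAY_TIME) plus a count trigger (CK_BATCH or ARANGODB_BATCH), so a batch flushes as soon as it fills, or when the window ends with a partial batch. CountTriggerWithTimeout is a project-local class whose source is not part of this diff; the following is a rough sketch of how such a count-or-timeout trigger is typically written against Flink's Trigger API, under that assumption, and is not the project's actual implementation:

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hypothetical count-or-timeout trigger: FIRE_AND_PURGE as soon as maxCount
// elements arrive; otherwise flush whatever accumulated when the window's
// processing-time timer (its end timestamp) fires.
public class CountOrTimeoutTrigger<T> extends Trigger<T, TimeWindow> {
    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", Long::sum, LongSerializer.INSTANCE);

    public CountOrTimeoutTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Timeout path: fire no later than the window end.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE; // batch full: flush early
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE; // timeout: flush a partial batch
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // purely processing-time driven
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

With a trigger like this, the fired pane is handed to apply(new CKBatchWindow()), which folds the window's records into a single batch that the ClickhouseSink (or ArangoDB sink) writes in one insert.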