Diffstat (limited to 'src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java')
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java  31
1 file changed, 16 insertions(+), 15 deletions(-)
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 0ed5052..7ca7acb 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -4,9 +4,11 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.*;
+import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.etl.*;
+import com.zdjizhi.etl.CKBatchWindow;
+import com.zdjizhi.etl.connection.*;
+import com.zdjizhi.etl.dns.*;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -49,7 +51,7 @@ public class LogFlowWriteTopology {
//transform
DataStream<Map<String, Object>> connTransformStream = connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
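(The change in this hunk is only a config-key rename, FLINK_WATERMARK_MAX_ORDERNESS -> FLINK_WATERMARK_MAX_DELAY_TIME; the watermark pattern itself is untouched. A minimal, self-contained sketch of that same bounded-out-of-orderness pattern, with a hypothetical 5-second delay standing in for the config value:

import java.time.Duration;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WatermarkSketch {
    // Hypothetical stand-in for FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME.
    static final int MAX_DELAY_SECONDS = 5;

    static DataStream<Map<String, Object>> withEventTime(DataStream<Map<String, Object>> source) {
        return source.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        // accept events that arrive up to MAX_DELAY_SECONDS late
                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(MAX_DELAY_SECONDS))
                        // event time comes from the record itself: epoch seconds -> epoch millis
                        .withTimestampAssigner((event, ts) ->
                                ((Number) event.get("conn_start_time")).longValue() * 1000L));
    }
}
)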
@@ -58,7 +60,7 @@ public class LogFlowWriteTopology {
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -72,16 +74,16 @@ public class LogFlowWriteTopology {
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
-// .filter(Objects::nonNull)
+ .filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
//write to CK sink in batches
- connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
- sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
- sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+ connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+ sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+ sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
//write to arangodb
- ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
+ ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
} else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
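(CKBatchWindow itself is not part of this diff; from the way it is used, windowAll(...).apply(...) in front of ClickhouseSink, it plays the role of an AllWindowFunction that folds one processing-time window into a single batch. A hedged sketch of that shape, class and type names assumed rather than taken from the repo:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical batching window: emits each window's records as one List
// so the downstream sink can issue a single bulk insert.
public class BatchWindowSketch
        implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {

    @Override
    public void apply(TimeWindow window,
                      Iterable<Map<String, Object>> values,
                      Collector<List<Map<String, Object>>> out) {
        List<Map<String, Object>> batch = new ArrayList<>();
        values.forEach(batch::add);
        if (!batch.isEmpty()) {
            out.collect(batch); // one element per window = one batched write
        }
    }
}
)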
@@ -92,7 +94,7 @@ public class LogFlowWriteTopology {
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
.flatMap(new DnsSplitFlatMapFunction())
.keyBy(new DnsGraphKeysSelector())
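(DnsSplitFlatMapFunction is likewise only referenced here; judging from its position in the pipeline it fans one raw DNS log out into one record per answer before keying by DnsGraphKeysSelector. A speculative sketch of such a split step; the "answers" field and ";" separator are assumptions:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical split: one raw DNS log with a delimited answer list
// becomes one record per answer.
public class DnsSplitSketch implements FlatMapFunction<Map<String, Object>, Map<String, Object>> {

    @Override
    public void flatMap(Map<String, Object> log, Collector<Map<String, Object>> out) {
        Object answers = log.get("answers"); // assumed field name
        if (answers == null) {
            return; // nothing to split
        }
        for (String answer : answers.toString().split(";")) {
            Map<String, Object> record = new HashMap<>(log);
            record.put("answer", answer);
            out.collect(record);
        }
    }
}
)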
@@ -104,13 +106,13 @@ public class LogFlowWriteTopology {
//dns raw logs into CK
dnsSource.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
//dns relation logs (post-split) into CK
- dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+ dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
.setParallelism(SINK_PARALLELISM)
.name("CKSink");
@@ -119,12 +121,11 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
- .setParallelism(SINK_PARALLELISM)
- .filter(Objects::nonNull);
+ .setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbDnsWindow())
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
.addSink(new ArangoDBSink(dnsEnum.getSink()))
.setParallelism(SINK_PARALLELISM)
.name("ArangodbSink");