summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/connection/ConnLogService.java')
-rw-r--r--src/main/java/com/zdjizhi/etl/connection/ConnLogService.java8
1 files changed, 4 insertions, 4 deletions
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index 9c5f8e0..ab07376 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -35,14 +35,15 @@ public class ConnLogService {
LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//写入ck通联relation表
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
- } else {
+ } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){
LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
+ } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){
+ LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
}
if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
-
DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
//合并通联和通联sketch
@@ -96,7 +97,7 @@ public class ConnLogService {
}
private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
- DataStream<Map<String, Object>> connTransformStream = connSource
+ return connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> {
@@ -109,7 +110,6 @@ public class ConnLogService {
.setParallelism(TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
.setParallelism(TRANSFORM_PARALLELISM);
- return connTransformStream;
}
private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {