diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/connection')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/connection/ConnLogService.java | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index 9c5f8e0..ab07376 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -35,14 +35,15 @@ public class ConnLogService { LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); //写入ck通联relation表 LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); - } else { + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){ LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION); LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH); LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION); + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){ + LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION); } if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) { - DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); //合并通联和通联sketch @@ -96,7 +97,7 @@ public class ConnLogService { } private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception { - DataStream<Map<String, Object>> connTransformStream = connSource + return connSource .assignTimestampsAndWatermarks(WatermarkStrategy .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> { @@ -109,7 +110,6 @@ public class ConnLogService { .setParallelism(TRANSFORM_PARALLELISM) .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0) .setParallelism(TRANSFORM_PARALLELISM); - return connTransformStream; } private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception { |
