diff options
| author | lifengchao <[email protected]> | 2024-07-19 10:25:53 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-07-19 10:25:53 +0800 |
| commit | 0e8005ae3b8b45480e4d511c600700ac8de897ae (patch) | |
| tree | 0a16f72cbea5f204745cfac63dab358d6432568c /groot-bootstrap | |
| parent | d17ecc5eebb7eeb48ebd6c04f9fcd7122f866525 (diff) | |
[feature][connector] connector source 添加watermark时间戳属性配置
Diffstat (limited to 'groot-bootstrap')
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java | 66 |
1 files changed, 64 insertions, 2 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index f46e3fc..d4751af 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -2,9 +2,11 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SourceConfigOptions; @@ -19,9 +21,14 @@ import com.geedgenetworks.core.pojo.SourceConfig; import com.google.common.collect.Maps; import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + import java.net.URL; +import java.time.Duration; import java.util.List; import java.util.Map; @@ -81,13 +88,68 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString())); } - sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()); + sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName()); if (node.getParallelism() > 0) { sourceSingleOutputStreamOperator.setParallelism(node.getParallelism()); } - return sourceSingleOutputStreamOperator.name(sourceConfig.getName()); + sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node); + return sourceSingleOutputStreamOperator; } catch (Exception e) { throw new JobExecuteException("Create source instance failed!", e); } } + + private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node){ + final String watermarkTimestamp = sourceConfig.getWatermark_timestamp(); + if(StringUtils.isNotBlank(watermarkTimestamp)){ + String timestampUnit = sourceConfig.getWatermark_timestamp_unit(); + Long watermarkLag = sourceConfig.getWatermark_lag(); + if (watermarkLag == null || watermarkLag < 0) { + throw new ConfigCheckException("非法的watermarkLag:" + watermarkLag); + } + SerializableTimestampAssigner<Event> timestampAssigner; + if ("ms".equalsIgnoreCase(timestampUnit)) { + timestampAssigner = (event, timestamp) -> { + Object ms = event.getExtractedFields().get(watermarkTimestamp); + if (ms == null) { + return 0; + } else if (ms instanceof Number) { + return ((Number) ms).longValue(); + } else{ + return str2Long(ms.toString()); + } + }; + } else if ("s".equalsIgnoreCase(timestampUnit)) { + timestampAssigner = (event, timestamp) -> { + Object s = event.getExtractedFields().get(watermarkTimestamp); + if (s == null) { + return 0; + } else if (s instanceof Number) { + return ((Number) s).longValue() * 1000; + } else { + return str2Long(s.toString()) * 1000; + } + }; + } else { + throw new ConfigCheckException("不支持的timestamp_unit:" + timestampUnit); + } + + dataStream = dataStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag)) + .withTimestampAssigner(timestampAssigner) + ); + if (node.getParallelism() > 0) { + dataStream.setParallelism(node.getParallelism()); + } + } + return dataStream; + } + + private static long str2Long(String str){ + try { + return Long.parseLong(str); + } catch (NumberFormatException e) { + return 0; + } + } } |
