diff options
| author | lifengchao <[email protected]> | 2024-07-19 10:25:53 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-07-19 10:25:53 +0800 |
| commit | 0e8005ae3b8b45480e4d511c600700ac8de897ae (patch) | |
| tree | 0a16f72cbea5f204745cfac63dab358d6432568c | |
| parent | d17ecc5eebb7eeb48ebd6c04f9fcd7122f866525 (diff) | |
[feature][connector] connector source 添加watermark时间戳属性配置
3 files changed, 99 insertions, 10 deletions
diff --git a/docs/connector/connector.md b/docs/connector/connector.md index 56d3242..08ec673 100644 --- a/docs/connector/connector.md +++ b/docs/connector/connector.md @@ -19,11 +19,14 @@ sources: ${prop_key}: ${prop_value} ``` -| Name | Type | Required | Default | Description | -|------------|---------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------| -| type | String | Yes | (none) | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. | -| schema | Map | No | (none) | The source table schema, config through fields or local_file or url. | -| properties | Map of String | Yes | (none) | The source connector customize properties, more details see the [Source](source) documentation. | +| Name | Type | Required | Default | Description | +|--------------------------|---------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------| +| type | String | Yes | (none) | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. | +| schema | Map | No | (none) | The source table schema, config through fields or local_file or url. | +| watermark_timestamp | String | No | (none) | watermark timestamp field name, if need use eventTime. | +| watermark_timestamp_unit | String | No | ms | watermark field timestamp unit, options:ms(milliseconds),s(seconds). is required if watermark_timestamp is not none. | +| watermark_lag | Long | No | (none) | watermark out-of-order milliseconds. is required if watermark_timestamp is not none. | +| properties | Map of String | Yes | (none) | The source connector customize properties, more details see the [Source](source) documentation. | ## Schema Field Projection diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index f46e3fc..d4751af 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -2,9 +2,11 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SourceConfigOptions; @@ -19,9 +21,14 @@ import com.geedgenetworks.core.pojo.SourceConfig; import com.google.common.collect.Maps; import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + import java.net.URL; +import java.time.Duration; import java.util.List; import java.util.Map; @@ -81,13 +88,68 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString())); } - sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()); + sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName()); if (node.getParallelism() > 0) { sourceSingleOutputStreamOperator.setParallelism(node.getParallelism()); } - return sourceSingleOutputStreamOperator.name(sourceConfig.getName()); + sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node); + return sourceSingleOutputStreamOperator; } catch (Exception e) { throw new JobExecuteException("Create source instance failed!", e); } } + + private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node){ + final String watermarkTimestamp = sourceConfig.getWatermark_timestamp(); + if(StringUtils.isNotBlank(watermarkTimestamp)){ + String timestampUnit = sourceConfig.getWatermark_timestamp_unit(); + Long watermarkLag = sourceConfig.getWatermark_lag(); + if (watermarkLag == null || watermarkLag < 0) { + throw new ConfigCheckException("非法的watermarkLag:" + watermarkLag); + } + SerializableTimestampAssigner<Event> timestampAssigner; + if ("ms".equalsIgnoreCase(timestampUnit)) { + timestampAssigner = (event, timestamp) -> { + Object ms = event.getExtractedFields().get(watermarkTimestamp); + if (ms == null) { + return 0; + } else if (ms instanceof Number) { + return ((Number) ms).longValue(); + } else{ + return str2Long(ms.toString()); + } + }; + } else if ("s".equalsIgnoreCase(timestampUnit)) { + timestampAssigner = (event, timestamp) -> { + Object s = event.getExtractedFields().get(watermarkTimestamp); + if (s == null) { + return 0; + } else if (s instanceof Number) { + return ((Number) s).longValue() * 1000; + } else { + return str2Long(s.toString()) * 1000; + } + }; + } else { + throw new ConfigCheckException("不支持的timestamp_unit:" + timestampUnit); + } + + dataStream = dataStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag)) + .withTimestampAssigner(timestampAssigner) + ); + if (node.getParallelism() > 0) { + dataStream.setParallelism(node.getParallelism()); + } + } + return dataStream; + } + + private static long str2Long(String str){ + try { + return Long.parseLong(str); + } catch (NumberFormatException e) { + return 0; + } + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java index 6d019e9..dd18593 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java @@ -1,14 +1,14 @@ package com.geedgenetworks.core.pojo; -import com.geedgenetworks.core.types.StructType; - import java.io.Serializable; -import java.util.List; import java.util.Map; public class SourceConfig implements Serializable { private String type; private Map<String, Object> schema; + private String watermark_timestamp; + private String watermark_timestamp_unit = "ms"; + private Long watermark_lag; private Map<String, String> properties; private String name; public String getType() { @@ -27,6 +27,30 @@ public class SourceConfig implements Serializable { this.schema = schema; } + public String getWatermark_timestamp() { + return watermark_timestamp; + } + + public void setWatermark_timestamp(String watermark_timestamp) { + this.watermark_timestamp = watermark_timestamp; + } + + public String getWatermark_timestamp_unit() { + return watermark_timestamp_unit; + } + + public void setWatermark_timestamp_unit(String watermark_timestamp_unit) { + this.watermark_timestamp_unit = watermark_timestamp_unit; + } + + public Long getWatermark_lag() { + return watermark_lag; + } + + public void setWatermark_lag(Long watermark_lag) { + this.watermark_lag = watermark_lag; + } + public Map<String, String> getProperties() { return properties; } |
