summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-07-19 10:25:53 +0800
committerlifengchao <[email protected]>2024-07-19 10:25:53 +0800
commit0e8005ae3b8b45480e4d511c600700ac8de897ae (patch)
tree0a16f72cbea5f204745cfac63dab358d6432568c
parentd17ecc5eebb7eeb48ebd6c04f9fcd7122f866525 (diff)
[feature][connector] connector source 添加watermark时间戳属性配置
-rw-r--r--docs/connector/connector.md13
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java66
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java30
3 files changed, 99 insertions, 10 deletions
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 56d3242..08ec673 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -19,11 +19,14 @@ sources:
${prop_key}: ${prop_value}
```
-| Name | Type | Required | Default | Description |
-|------------|---------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | (none) | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
-| schema | Map | No | (none) | The source table schema, config through fields or local_file or url. |
-| properties | Map of String | Yes | (none) | The source connector customize properties, more details see the [Source](source) documentation. |
+| Name | Type | Required | Default | Description |
+|--------------------------|---------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------|
+| type | String | Yes | (none) | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
+| schema | Map | No | (none) | The source table schema, config through fields or local_file or url. |
+| watermark_timestamp | String | No | (none) | Name of the field that carries the event timestamp used for watermarks. Set it when event time is needed. |
+| watermark_timestamp_unit | String | No | ms | Unit of the watermark timestamp field. Options: ms (milliseconds), s (seconds). Required if watermark_timestamp is set. |
+| watermark_lag | Long | No | (none) | Maximum out-of-orderness tolerated by the watermark, in milliseconds. Required if watermark_timestamp is set. |
+| properties | Map of String | Yes | (none) | The source connector customize properties, more details see the [Source](source) documentation. |
## Schema Field Projection
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index f46e3fc..d4751af 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -2,9 +2,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SourceConfigOptions;
@@ -19,9 +21,14 @@ import com.geedgenetworks.core.pojo.SourceConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
import java.net.URL;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -81,13 +88,68 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString()));
}
- sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment());
+ sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName());
if (node.getParallelism() > 0) {
sourceSingleOutputStreamOperator.setParallelism(node.getParallelism());
}
- return sourceSingleOutputStreamOperator.name(sourceConfig.getName());
+ sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node);
+ return sourceSingleOutputStreamOperator;
} catch (Exception e) {
throw new JobExecuteException("Create source instance failed!", e);
}
}
+
+    /**
+     * Attaches an event-time timestamp assigner and a bounded-out-of-orderness watermark
+     * strategy to the source stream when {@code watermark_timestamp} is configured.
+     *
+     * <p>The timestamp is read from {@code event.getExtractedFields()} under the configured
+     * field name and converted to epoch milliseconds according to the configured unit
+     * ({@code ms} or {@code s}, case-insensitive). A missing field, or a non-numeric value
+     * that cannot be parsed as a long, yields timestamp {@code 0}.
+     *
+     * @param dataStream   the stream produced by the source
+     * @param sourceConfig source configuration providing the watermark settings
+     * @param node         topology node; its parallelism is applied to the watermark operator when positive
+     * @return the stream with watermarks assigned, or {@code dataStream} unchanged when no
+     *         watermark timestamp field is configured
+     * @throws ConfigCheckException if the watermark lag is missing or negative, or the
+     *                              timestamp unit is neither {@code ms} nor {@code s}
+     */
+    private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node) {
+        final String watermarkTimestamp = sourceConfig.getWatermark_timestamp();
+        if (StringUtils.isBlank(watermarkTimestamp)) {
+            // Event time was not requested for this source.
+            return dataStream;
+        }
+
+        Long watermarkLag = sourceConfig.getWatermark_lag();
+        if (watermarkLag == null || watermarkLag < 0) {
+            throw new ConfigCheckException("非法的watermarkLag:" + watermarkLag);
+        }
+
+        String timestampUnit = sourceConfig.getWatermark_timestamp_unit();
+        final long millisPerUnit;
+        if ("ms".equalsIgnoreCase(timestampUnit)) {
+            millisPerUnit = 1L;
+        } else if ("s".equalsIgnoreCase(timestampUnit)) {
+            millisPerUnit = 1000L;
+        } else {
+            throw new ConfigCheckException("不支持的timestamp_unit:" + timestampUnit);
+        }
+
+        // The lambda captures only a String and a primitive long, so it stays serializable.
+        SerializableTimestampAssigner<Event> timestampAssigner = (event, timestamp) -> {
+            Object raw = event.getExtractedFields().get(watermarkTimestamp);
+            if (raw == null) {
+                return 0L;
+            }
+            if (raw instanceof Double || raw instanceof Float || raw instanceof java.math.BigDecimal) {
+                // Scale before truncating so fractional seconds keep their millisecond part.
+                return (long) (((Number) raw).doubleValue() * millisPerUnit);
+            }
+            if (raw instanceof Number) {
+                return ((Number) raw).longValue() * millisPerUnit;
+            }
+            return str2Long(raw.toString()) * millisPerUnit;
+        };
+
+        SingleOutputStreamOperator<Event> withWatermarks = dataStream.assignTimestampsAndWatermarks(
+                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag))
+                        .withTimestampAssigner(timestampAssigner)
+        );
+        if (node.getParallelism() > 0) {
+            withWatermarks.setParallelism(node.getParallelism());
+        }
+        return withWatermarks;
+    }
+
+    /**
+     * Parses {@code str} as a signed decimal {@code long}.
+     *
+     * @param str text to parse
+     * @return the parsed value, or {@code 0} when {@link Long#parseLong(String)} rejects the input
+     */
+    private static long str2Long(String str) {
+        long parsed = 0L;
+        try {
+            parsed = Long.parseLong(str);
+        } catch (NumberFormatException ignored) {
+            // Malformed input deliberately maps to 0; parsed keeps its initial value.
+        }
+        return parsed;
+    }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
index 6d019e9..dd18593 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
@@ -1,14 +1,14 @@
package com.geedgenetworks.core.pojo;
-import com.geedgenetworks.core.types.StructType;
-
import java.io.Serializable;
-import java.util.List;
import java.util.Map;
public class SourceConfig implements Serializable {
private String type;
private Map<String, Object> schema;
+ private String watermark_timestamp;
+ private String watermark_timestamp_unit = "ms";
+ private Long watermark_lag;
private Map<String, String> properties;
private String name;
public String getType() {
@@ -27,6 +27,30 @@ public class SourceConfig implements Serializable {
this.schema = schema;
}
+ /** Returns the configured watermark timestamp field name, or {@code null} if none was set. */
+ public String getWatermark_timestamp() {
+ return watermark_timestamp;
+ }
+
+ /** Sets the name of the field used as the watermark timestamp. */
+ public void setWatermark_timestamp(String watermark_timestamp) {
+ this.watermark_timestamp = watermark_timestamp;
+ }
+
+ /** Returns the unit of the watermark timestamp field; the field is initialized to {@code "ms"}. */
+ public String getWatermark_timestamp_unit() {
+ return watermark_timestamp_unit;
+ }
+
+ /** Sets the unit of the watermark timestamp field, replacing the initial {@code "ms"}. */
+ public void setWatermark_timestamp_unit(String watermark_timestamp_unit) {
+ this.watermark_timestamp_unit = watermark_timestamp_unit;
+ }
+
+ /** Returns the configured watermark lag, or {@code null} if none was set. */
+ public Long getWatermark_lag() {
+ return watermark_lag;
+ }
+
+ /** Sets the watermark lag. */
+ public void setWatermark_lag(Long watermark_lag) {
+ this.watermark_lag = watermark_lag;
+ }
+
public Map<String, String> getProperties() {
return properties;
}