summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
author lifengchao <[email protected]> 2024-07-19 10:25:53 +0800
committer lifengchao <[email protected]> 2024-07-19 10:25:53 +0800
commit 0e8005ae3b8b45480e4d511c600700ac8de897ae (patch)
tree 0a16f72cbea5f204745cfac63dab358d6432568c /groot-bootstrap
parent d17ecc5eebb7eeb48ebd6c04f9fcd7122f866525 (diff)
[feature][connector] connector source 添加watermark时间戳属性配置
Diffstat (limited to 'groot-bootstrap')
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java 66
1 files changed, 64 insertions, 2 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index f46e3fc..d4751af 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -2,9 +2,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SourceConfigOptions;
@@ -19,9 +21,14 @@ import com.geedgenetworks.core.pojo.SourceConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
import java.net.URL;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -81,13 +88,68 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString()));
}
- sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment());
+ sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName());
if (node.getParallelism() > 0) {
sourceSingleOutputStreamOperator.setParallelism(node.getParallelism());
}
- return sourceSingleOutputStreamOperator.name(sourceConfig.getName());
+ sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node);
+ return sourceSingleOutputStreamOperator;
} catch (Exception e) {
throw new JobExecuteException("Create source instance failed!", e);
}
}
+
+ /**
+  * Attaches event-time timestamps and watermarks to the source stream when the source
+  * config names a watermark timestamp field.
+  *
+  * <p>If {@code watermark_timestamp} is blank the stream is returned untouched. Otherwise
+  * {@code watermark_lag} must be non-null and non-negative (it is handed to
+  * {@link Duration#ofMillis(long)} as the bounded out-of-orderness), and
+  * {@code watermark_timestamp_unit} must be {@code "ms"} or {@code "s"} (case-insensitive).
+  *
+  * @param dataStream   stream produced by the source
+  * @param sourceConfig source configuration carrying the watermark settings
+  * @param node         topology node; its parallelism, when positive, is applied to the watermark operator
+  * @return the stream with timestamps/watermarks assigned, or {@code dataStream} itself when no
+  *         watermark field is configured
+  * @throws ConfigCheckException if the lag is missing or negative, or the unit is unsupported
+  */
+ private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node){
+     final String watermarkTimestamp = sourceConfig.getWatermark_timestamp();
+     if (StringUtils.isBlank(watermarkTimestamp)) {
+         // No timestamp field configured: leave the stream as it is.
+         return dataStream;
+     }
+
+     final String timestampUnit = sourceConfig.getWatermark_timestamp_unit();
+     final Long watermarkLag = sourceConfig.getWatermark_lag();
+     if (watermarkLag == null || watermarkLag < 0) {
+         throw new ConfigCheckException("非法的watermarkLag:" + watermarkLag);
+     }
+
+     // Factor that converts one unit of the configured field into milliseconds.
+     final long unitToMillis;
+     if ("ms".equalsIgnoreCase(timestampUnit)) {
+         unitToMillis = 1L;
+     } else if ("s".equalsIgnoreCase(timestampUnit)) {
+         unitToMillis = 1000L;
+     } else {
+         throw new ConfigCheckException("不支持的timestamp_unit:" + timestampUnit);
+     }
+
+     // The lambda only captures final locals and calls a static helper, so it does not
+     // drag the enclosing executor instance into the serialized assigner.
+     final SerializableTimestampAssigner<Event> timestampAssigner = (event, recordTimestamp) ->
+             watermarkFieldToMillis(event.getExtractedFields().get(watermarkTimestamp), unitToMillis);
+
+     SingleOutputStreamOperator<Event> withWatermarks = dataStream.assignTimestampsAndWatermarks(
+             WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag))
+                     .withTimestampAssigner(timestampAssigner)
+     );
+     if (node.getParallelism() > 0) {
+         withWatermarks.setParallelism(node.getParallelism());
+     }
+     return withWatermarks;
+ }
+
+ /**
+  * Converts a raw timestamp field value to epoch milliseconds.
+  *
+  * <p>A {@code null} value yields {@code 0}. Floating-point and decimal numbers are scaled
+  * before truncation so that fractional seconds keep their millisecond part; other numbers
+  * are scaled as {@code long}. Any other object is parsed from its string form via
+  * {@code str2Long}.
+  *
+  * @param raw          field value taken from the event, may be {@code null}
+  * @param unitToMillis multiplier turning the field's unit into milliseconds (1 or 1000)
+  * @return the timestamp in milliseconds
+  */
+ private static long watermarkFieldToMillis(Object raw, long unitToMillis) {
+     if (raw == null) {
+         return 0L;
+     }
+     if (raw instanceof Number) {
+         Number number = (Number) raw;
+         if (number instanceof Double || number instanceof Float || number instanceof java.math.BigDecimal) {
+             // Scale first, then truncate: 1721355953.5 s -> 1721355953500 ms.
+             return (long) (number.doubleValue() * unitToMillis);
+         }
+         return number.longValue() * unitToMillis;
+     }
+     return str2Long(raw.toString()) * unitToMillis;
+ }
+
+ /**
+  * Parses a base-10 string into a {@code long}, tolerating surrounding whitespace.
+  *
+  * @param str text to parse; may be {@code null}
+  * @return the parsed value, or {@code 0} when {@code str} is {@code null} or is not a valid long
+  */
+ private static long str2Long(String str){
+     if (str == null) {
+         return 0;
+     }
+     try {
+         // Long.parseLong rejects leading/trailing whitespace, so trim before parsing.
+         return Long.parseLong(str.trim());
+     } catch (NumberFormatException ignored) {
+         // Best-effort by design: an unparseable value falls back to 0 rather than failing.
+         return 0;
+     }
+ }
}