diff options
| author | wangkuan <[email protected]> | 2024-08-01 18:22:54 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-01 18:22:54 +0800 |
| commit | c0a2e18f14fd495d80e047b90e5afb0abcb9602c (patch) | |
| tree | 1848dacfa85d31ea1f5e3b7d6a923c43ce0715b1 | |
| parent | 906a4a1bfe7d2dee5ed7144d20f05ba55546b02c (diff) | |
[improve][core]aggregate processor增加window_timestamp_field配置
4 files changed, 11 insertions, 3 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 74239c9..4726af0 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -24,6 +24,7 @@ processing_pipelines: output_fields: group_by_fields: [server_ip,server_port] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_timestamp_field: recv_time window_size: 60 window_slide: 10 #滑动窗口步长 functions: diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java index 3998a3b..0b0379d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java @@ -5,6 +5,8 @@ import com.geedgenetworks.common.udf.UDFContext; import java.util.List; +import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP; + public interface AggregateConfigOptions { Option<String> TYPE = Options.key("type") .stringType() @@ -46,4 +48,8 @@ public interface AggregateConfigOptions { .noDefaultValue() .withDescription("The size of sliding window."); + Option<String> WINDOW_TIMESTAMP_FIELD = Options.key("window_timestamp_field") + .stringType() + .noDefaultValue() + .withDescription("which field to be set the start time of window."); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index 8cccbbd..d3cbaac 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -13,10 +13,9 @@ public class AggregateConfig extends ProcessorConfig { private List<String> group_by_fields; - private String timestamp_field; + private String window_timestamp_field; private String window_type; private Integer window_size; - private Integer max_out_of_orderness; private Integer window_slide; private List<UDFContext> functions; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java index eaa712a..cd5c485 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java @@ -44,7 +44,9 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu internalMetrics.incrementOutEvents(); internalMetrics.incrementErrorEvents(accumulator.getErrorCount()); internalMetrics.incrementInEvents(accumulator.getInEvents()); - + if (aggregateConfig.getWindow_timestamp_field() != null) { + event.getExtractedFields().put(aggregateConfig.getWindow_timestamp_field(), context.window().getStart()); + } if (aggregateConfig.getOutput_fields() != null && !aggregateConfig.getOutput_fields().isEmpty()) { event.setExtractedFields( |
