diff options
| author | 王宽 <[email protected]> | 2024-08-01 10:32:19 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-01 10:32:19 +0000 |
| commit | df126a194232fe5b5f68ba4fc4fa0b7acd628050 (patch) | |
| tree | 8226a53d125a4bda993e28f4b24ba5ebc55a48a1 | |
| parent | 0b3bc65eb23f3b09b43dd61083631766a0e4d095 (diff) | |
| parent | c0a2e18f14fd495d80e047b90e5afb0abcb9602c (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[improve][core]aggregate processor增加window_timestamp_field配置
See merge request galaxy/platform/groot-stream!85
4 files changed, 11 insertions, 3 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 74239c9..4726af0 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -24,6 +24,7 @@ processing_pipelines: output_fields: group_by_fields: [server_ip,server_port] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_timestamp_field: recv_time window_size: 60 window_slide: 10 #滑动窗口步长 functions: diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java index 3998a3b..0b0379d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java @@ -5,6 +5,8 @@ import com.geedgenetworks.common.udf.UDFContext; import java.util.List; +import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP; + public interface AggregateConfigOptions { Option<String> TYPE = Options.key("type") .stringType() @@ -46,4 +48,8 @@ public interface AggregateConfigOptions { .noDefaultValue() .withDescription("The size of sliding window."); + Option<String> WINDOW_TIMESTAMP_FIELD = Options.key("window_timestamp_field") + .stringType() + .noDefaultValue() + .withDescription("which field to be set the start time of window."); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index 8cccbbd..d3cbaac 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -13,10 +13,9 @@ public class AggregateConfig extends ProcessorConfig { private List<String> group_by_fields; - private String timestamp_field; + private String window_timestamp_field; private String window_type; private Integer window_size; - private Integer max_out_of_orderness; private Integer window_slide; private List<UDFContext> functions; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java index eaa712a..cd5c485 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java @@ -44,7 +44,9 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu internalMetrics.incrementOutEvents(); internalMetrics.incrementErrorEvents(accumulator.getErrorCount()); internalMetrics.incrementInEvents(accumulator.getInEvents()); - + if (aggregateConfig.getWindow_timestamp_field() != null) { + event.getExtractedFields().put(aggregateConfig.getWindow_timestamp_field(), context.window().getStart()); + } if (aggregateConfig.getOutput_fields() != null && !aggregateConfig.getOutput_fields().isEmpty()) { event.setExtractedFields( |
