diff options
| author | 王宽 <[email protected]> | 2024-08-01 10:32:19 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-01 10:32:19 +0000 |
| commit | df126a194232fe5b5f68ba4fc4fa0b7acd628050 (patch) | |
| tree | 8226a53d125a4bda993e28f4b24ba5ebc55a48a1 /groot-core | |
| parent | 0b3bc65eb23f3b09b43dd61083631766a0e4d095 (diff) | |
| parent | c0a2e18f14fd495d80e047b90e5afb0abcb9602c (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[improve][core]aggregate processor增加window_timestamp_field配置
See merge request galaxy/platform/groot-stream!85
Diffstat (limited to 'groot-core')
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java | 3 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java | 4 |
2 files changed, 4 insertions, 3 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index 8cccbbd..d3cbaac 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -13,10 +13,9 @@ public class AggregateConfig extends ProcessorConfig { private List<String> group_by_fields; - private String timestamp_field; + private String window_timestamp_field; private String window_type; private Integer window_size; - private Integer max_out_of_orderness; private Integer window_slide; private List<UDFContext> functions; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java index eaa712a..cd5c485 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java @@ -44,7 +44,9 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu internalMetrics.incrementOutEvents(); internalMetrics.incrementErrorEvents(accumulator.getErrorCount()); internalMetrics.incrementInEvents(accumulator.getInEvents()); - + if (aggregateConfig.getWindow_timestamp_field() != null) { + event.getExtractedFields().put(aggregateConfig.getWindow_timestamp_field(), context.window().getStart()); + } if (aggregateConfig.getOutput_fields() != null && !aggregateConfig.getOutput_fields().isEmpty()) { event.setExtractedFields( |
