summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-01 10:32:19 +0000
committer王宽 <[email protected]>2024-08-01 10:32:19 +0000
commitdf126a194232fe5b5f68ba4fc4fa0b7acd628050 (patch)
tree8226a53d125a4bda993e28f4b24ba5ebc55a48a1 /groot-core
parent0b3bc65eb23f3b09b43dd61083631766a0e4d095 (diff)
parentc0a2e18f14fd495d80e047b90e5afb0abcb9602c (diff)
Merge branch 'feature/aggregate' into 'develop'
[improve][core]aggregate processor增加window_timestamp_field配置 See merge request galaxy/platform/groot-stream!85
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java4
2 files changed, 4 insertions, 3 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index 8cccbbd..d3cbaac 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -13,10 +13,9 @@ public class AggregateConfig extends ProcessorConfig {
private List<String> group_by_fields;
- private String timestamp_field;
+ private String window_timestamp_field;
private String window_type;
private Integer window_size;
- private Integer max_out_of_orderness;
private Integer window_slide;
private List<UDFContext> functions;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
index eaa712a..cd5c485 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
@@ -44,7 +44,9 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
internalMetrics.incrementOutEvents();
internalMetrics.incrementErrorEvents(accumulator.getErrorCount());
internalMetrics.incrementInEvents(accumulator.getInEvents());
-
+ if (aggregateConfig.getWindow_timestamp_field() != null) {
+ event.getExtractedFields().put(aggregateConfig.getWindow_timestamp_field(), context.window().getStart());
+ }
if (aggregateConfig.getOutput_fields() != null
&& !aggregateConfig.getOutput_fields().isEmpty()) {
event.setExtractedFields(