summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-01 10:32:19 +0000
committer王宽 <[email protected]>2024-08-01 10:32:19 +0000
commitdf126a194232fe5b5f68ba4fc4fa0b7acd628050 (patch)
tree8226a53d125a4bda993e28f4b24ba5ebc55a48a1
parent0b3bc65eb23f3b09b43dd61083631766a0e4d095 (diff)
parentc0a2e18f14fd495d80e047b90e5afb0abcb9602c (diff)
Merge branch 'feature/aggregate' into 'develop'
[improve][core]aggregate processor增加window_timestamp_field配置 See merge request galaxy/platform/groot-stream!85
-rw-r--r--config/grootstream_job_example.yaml1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java4
4 files changed, 11 insertions, 3 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 74239c9..4726af0 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -24,6 +24,7 @@ processing_pipelines:
output_fields:
group_by_fields: [server_ip,server_port]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_timestamp_field: recv_time
window_size: 60
window_slide: 10 #滑动窗口步长
functions:
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
index 3998a3b..0b0379d 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -5,6 +5,8 @@ import com.geedgenetworks.common.udf.UDFContext;
import java.util.List;
+import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP;
+
public interface AggregateConfigOptions {
Option<String> TYPE = Options.key("type")
.stringType()
@@ -46,4 +48,8 @@ public interface AggregateConfigOptions {
.noDefaultValue()
.withDescription("The size of sliding window.");
+ Option<String> WINDOW_TIMESTAMP_FIELD = Options.key("window_timestamp_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("which field to be set the start time of window.");
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index 8cccbbd..d3cbaac 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -13,10 +13,9 @@ public class AggregateConfig extends ProcessorConfig {
private List<String> group_by_fields;
- private String timestamp_field;
+ private String window_timestamp_field;
private String window_type;
private Integer window_size;
- private Integer max_out_of_orderness;
private Integer window_slide;
private List<UDFContext> functions;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
index eaa712a..cd5c485 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
@@ -44,7 +44,9 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
internalMetrics.incrementOutEvents();
internalMetrics.incrementErrorEvents(accumulator.getErrorCount());
internalMetrics.incrementInEvents(accumulator.getInEvents());
-
+ if (aggregateConfig.getWindow_timestamp_field() != null) {
+ event.getExtractedFields().put(aggregateConfig.getWindow_timestamp_field(), context.window().getStart());
+ }
if (aggregateConfig.getOutput_fields() != null
&& !aggregateConfig.getOutput_fields().isEmpty()) {
event.setExtractedFields(