summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-07-24 10:21:46 +0000
committer王宽 <[email protected]>2024-07-24 10:21:46 +0000
commit05bf631f8562a357fda67891973c098aa32b2763 (patch)
tree7eb4da72f1914e5448163d37ffc65b7ca9eb3b3b
parentb593e070f20c3097a21765229f87c444f3a60bad (diff)
parentbd422c028ad28ef8f1c3bb237d4ba5f7fa0bcb65 (diff)
Merge branch 'feature/aggregate' into 'develop'
[improve][core]聚合函数支持配置output_fields和remove_fields See merge request galaxy/platform/groot-stream!80
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java23
5 files changed, 38 insertions, 16 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index 8111f09..8cccbbd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -4,11 +4,12 @@ import com.geedgenetworks.common.udf.UDFContext;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
-public class AggregateConfig extends ProcessorConfig{
+public class AggregateConfig extends ProcessorConfig {
private List<String> group_by_fields;
@@ -18,6 +19,5 @@ public class AggregateConfig extends ProcessorConfig{
private Integer max_out_of_orderness;
private Integer window_slide;
private List<UDFContext> functions;
- private String[] output_fields;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java
index dacbd78..18fb300 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java
@@ -2,11 +2,15 @@ package com.geedgenetworks.core.pojo;
import lombok.Data;
+import java.io.Serializable;
+import java.util.List;
import java.util.Map;
@Data
-public class ProcessorConfig {
+public class ProcessorConfig implements Serializable {
private String type;
private int parallelism;
private Map<String, Object> properties;
private String name;
+ private List<String> output_fields;
+ private List<String> remove_fields;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
index 4670fd8..48daefd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
@@ -9,12 +9,9 @@ import java.util.List;
import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
-public class ProjectionConfig extends ProcessorConfig implements Serializable {
+public class ProjectionConfig extends ProcessorConfig {
private List<UDFContext> functions;
private String format;
- private List<String> output_fields;
- private List<String> remove_fields;
-
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index 43fe20d..2f086b0 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -25,26 +25,26 @@ public class AggregateProcessorImpl implements AggregateProcessor {
if (aggregateConfig.getParallelism() != 0) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}else {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
index 46c9607..eaa712a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
@@ -3,7 +3,9 @@ package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.KeybyEntity;
+import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.AggregateConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -17,9 +19,14 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
Event, // 输出类型
KeybyEntity, // 键类型
TimeWindow> {
-
+ private final AggregateConfig aggregateConfig;
private transient InternalMetrics internalMetrics;
+ public ProcessWindowFunctionImpl(AggregateConfig aggregateConfig) {
+ this.aggregateConfig = aggregateConfig;
+ }
+
+
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
@@ -37,6 +44,20 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
internalMetrics.incrementOutEvents();
internalMetrics.incrementErrorEvents(accumulator.getErrorCount());
internalMetrics.incrementInEvents(accumulator.getInEvents());
+
+ if (aggregateConfig.getOutput_fields() != null
+ && !aggregateConfig.getOutput_fields().isEmpty()) {
+ event.setExtractedFields(
+ ColumnUtil.columnSelector(
+ event.getExtractedFields(), aggregateConfig.getOutput_fields()));
+ }
+ if (aggregateConfig.getRemove_fields() != null
+ && !aggregateConfig.getRemove_fields().isEmpty()) {
+ event.setExtractedFields(
+ ColumnUtil.columnRemover(
+ event.getExtractedFields(), aggregateConfig.getRemove_fields()));
+ }
+
out.collect(event);
}
}