diff options
| author | 王宽 <[email protected]> | 2024-07-24 10:21:46 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-07-24 10:21:46 +0000 |
| commit | 05bf631f8562a357fda67891973c098aa32b2763 (patch) | |
| tree | 7eb4da72f1914e5448163d37ffc65b7ca9eb3b3b | |
| parent | b593e070f20c3097a21765229f87c444f3a60bad (diff) | |
| parent | bd422c028ad28ef8f1c3bb237d4ba5f7fa0bcb65 (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[improve][core]聚合函数支持配置output_fields和remove_fields
See merge request galaxy/platform/groot-stream!80
5 files changed, 38 insertions, 16 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index 8111f09..8cccbbd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -4,11 +4,12 @@ import com.geedgenetworks.common.udf.UDFContext; import lombok.Data; import lombok.EqualsAndHashCode; +import java.io.Serializable; import java.util.List; import java.util.Map; @EqualsAndHashCode(callSuper = true) @Data -public class AggregateConfig extends ProcessorConfig{ +public class AggregateConfig extends ProcessorConfig { private List<String> group_by_fields; @@ -18,6 +19,5 @@ public class AggregateConfig extends ProcessorConfig{ private Integer max_out_of_orderness; private Integer window_slide; private List<UDFContext> functions; - private String[] output_fields; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java index dacbd78..18fb300 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java @@ -2,11 +2,15 @@ package com.geedgenetworks.core.pojo; import lombok.Data; +import java.io.Serializable; +import java.util.List; import java.util.Map; @Data -public class ProcessorConfig { +public class ProcessorConfig implements Serializable { private String type; private int parallelism; private Map<String, Object> properties; private String name; + private List<String> output_fields; + private List<String> remove_fields; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java index 4670fd8..48daefd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java @@ -9,12 +9,9 @@ import java.util.List; import java.util.Map; @EqualsAndHashCode(callSuper = true) @Data -public class ProjectionConfig extends ProcessorConfig implements Serializable { +public class ProjectionConfig extends ProcessorConfig { private List<UDFContext> functions; private String format; - private List<String> output_fields; - private List<String> remove_fields; - } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index 43fe20d..2f086b0 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -25,26 +25,26 @@ public class AggregateProcessorImpl implements AggregateProcessor { if (aggregateConfig.getParallelism() != 0) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } }else { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java index 46c9607..eaa712a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java @@ -3,7 +3,9 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.KeybyEntity; +import com.geedgenetworks.common.utils.ColumnUtil; import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.AggregateConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -17,9 +19,14 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu Event, // 输出类型 KeybyEntity, // 键类型 TimeWindow> { - + private final AggregateConfig aggregateConfig; private transient InternalMetrics internalMetrics; + public ProcessWindowFunctionImpl(AggregateConfig aggregateConfig) { + this.aggregateConfig = aggregateConfig; + } + + @Override public void open(Configuration parameters) throws Exception { super.open(parameters); @@ -37,6 +44,20 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu internalMetrics.incrementOutEvents(); internalMetrics.incrementErrorEvents(accumulator.getErrorCount()); internalMetrics.incrementInEvents(accumulator.getInEvents()); + + if (aggregateConfig.getOutput_fields() != null + && !aggregateConfig.getOutput_fields().isEmpty()) { + event.setExtractedFields( + ColumnUtil.columnSelector( + event.getExtractedFields(), aggregateConfig.getOutput_fields())); + } + if (aggregateConfig.getRemove_fields() != null + && !aggregateConfig.getRemove_fields().isEmpty()) { + event.setExtractedFields( + ColumnUtil.columnRemover( + event.getExtractedFields(), aggregateConfig.getRemove_fields())); + } + out.collect(event); } } |
