From bd422c028ad28ef8f1c3bb237d4ba5f7fa0bcb65 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Wed, 24 Jul 2024 18:13:51 +0800 Subject: [improve][core]聚合函数支持配置output_fields和remove_fields MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../geedgenetworks/core/pojo/AggregateConfig.java | 4 ++-- .../geedgenetworks/core/pojo/ProcessorConfig.java | 6 +++++- .../geedgenetworks/core/pojo/ProjectionConfig.java | 5 +---- .../aggregate/AggregateProcessorImpl.java | 16 +++++++-------- .../aggregate/ProcessWindowFunctionImpl.java | 23 +++++++++++++++++++++- 5 files changed, 38 insertions(+), 16 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index 8111f09..8cccbbd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -4,11 +4,12 @@ import com.geedgenetworks.common.udf.UDFContext; import lombok.Data; import lombok.EqualsAndHashCode; +import java.io.Serializable; import java.util.List; import java.util.Map; @EqualsAndHashCode(callSuper = true) @Data -public class AggregateConfig extends ProcessorConfig{ +public class AggregateConfig extends ProcessorConfig { private List group_by_fields; @@ -18,6 +19,5 @@ public class AggregateConfig extends ProcessorConfig{ private Integer max_out_of_orderness; private Integer window_slide; private List functions; - private String[] output_fields; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java index dacbd78..18fb300 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java @@ -2,11 +2,15 @@ package com.geedgenetworks.core.pojo; import lombok.Data; +import java.io.Serializable; +import java.util.List; import java.util.Map; @Data -public class ProcessorConfig { +public class ProcessorConfig implements Serializable { private String type; private int parallelism; private Map properties; private String name; + private List output_fields; + private List remove_fields; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java index 4670fd8..48daefd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java @@ -9,12 +9,9 @@ import java.util.List; import java.util.Map; @EqualsAndHashCode(callSuper = true) @Data -public class ProjectionConfig extends ProcessorConfig implements Serializable { +public class ProjectionConfig extends ProcessorConfig { private List functions; private String format; - private List output_fields; - private List remove_fields; - } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index 43fe20d..2f086b0 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -25,26 +25,26 @@ public class AggregateProcessorImpl implements AggregateProcessor { if (aggregateConfig.getParallelism() != 0) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } }else { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java index 46c9607..eaa712a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java @@ -3,7 +3,9 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.KeybyEntity; +import com.geedgenetworks.common.utils.ColumnUtil; import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.AggregateConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -17,9 +19,14 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu Event, // 输出类型 KeybyEntity, // 键类型 TimeWindow> { - + private final AggregateConfig aggregateConfig; private transient InternalMetrics internalMetrics; + public ProcessWindowFunctionImpl(AggregateConfig aggregateConfig) { + this.aggregateConfig = aggregateConfig; + } + + @Override public void open(Configuration parameters) throws Exception { super.open(parameters); @@ -37,6 +44,20 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu internalMetrics.incrementOutEvents(); internalMetrics.incrementErrorEvents(accumulator.getErrorCount()); internalMetrics.incrementInEvents(accumulator.getInEvents()); + + if (aggregateConfig.getOutput_fields() != null + && !aggregateConfig.getOutput_fields().isEmpty()) { + event.setExtractedFields( + ColumnUtil.columnSelector( + event.getExtractedFields(), aggregateConfig.getOutput_fields())); + } + if (aggregateConfig.getRemove_fields() != null + && !aggregateConfig.getRemove_fields().isEmpty()) { + event.setExtractedFields( + ColumnUtil.columnRemover( + event.getExtractedFields(), aggregateConfig.getRemove_fields())); + } + out.collect(event); } } -- cgit v1.2.3