diff options
| author | wangkuan <[email protected]> | 2024-08-23 17:58:22 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-23 17:58:22 +0800 |
| commit | 215dd9aa1e4ec6a509d64c78ec414a8196dace3c (patch) | |
| tree | 5c038000b9de72684a1f8b8f1832394aed30cf13 /groot-core | |
| parent | 07332297c1306aa0dac649c7d15bf131e8edbc7e (diff) | |
[feature][core][common]GAL-646 Groot Stream支持Split Operator实现动态分流
Diffstat (limited to 'groot-core')
11 files changed, 177 insertions, 26 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java index 668ba6f..d8b8bc4 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java @@ -2,13 +2,14 @@ package com.geedgenetworks.core.filter; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.FilterConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public class AviatorFilter implements Filter { @Override - public SingleOutputStreamOperator<Event> filterFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig) + public DataStream<Event> filterFunction( + DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig) throws Exception { if (FilterConfig.getParallelism() != 0) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java index a173438..f8b50eb 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java @@ -3,12 +3,13 @@ package com.geedgenetworks.core.filter; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.FilterConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public interface Filter { - SingleOutputStreamOperator<Event> filterFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig) + DataStream<Event> filterFunction( + DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig) throws Exception; String type(); } diff --git 
a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java new file mode 100644 index 0000000..4381df5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java @@ -0,0 +1,17 @@ +package com.geedgenetworks.core.pojo; + +import com.geedgenetworks.common.udf.RuleContext; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +@Data +public class SplitConfig implements Serializable { + + private String type; + private Map<String, Object> properties; + private int parallelism; + private String name; + private List<RuleContext> rules; +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java index 172b368..3852414 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java @@ -2,12 +2,13 @@ package com.geedgenetworks.core.processor; import com.geedgenetworks.common.Event; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public interface Processor<T> { - SingleOutputStreamOperator<Event> processorFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, + DataStream<Event> processorFunction( + DataStream<Event> singleOutputStreamOperator, T processorConfig, ExecutionConfig config) throws Exception; String type(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index bc87c32..cf78310 100644 --- 
a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -5,45 +5,50 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.AggregateConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.OutputTag; + import static com.geedgenetworks.common.Constants.*; public class AggregateProcessorImpl implements AggregateProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { + if (aggregateConfig.getParallelism() != 0) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new 
ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), 
Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } }else { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case 
TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new 
KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } } + } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java index 6b46a7b..d87e7e2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java @@ -4,25 +4,26 @@ import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; public class ProjectionProcessorImpl implements ProjectionProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { if (projectionConfig.getParallelism() != 0) { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .process(new ProjectionProcessFunction(projectionConfig)) .setParallelism(projectionConfig.getParallelism()) .name(projectionConfig.getName()); } else { - return 
grootEventSingleOutputStreamOperator + return grootEventDataStream .process(new ProjectionProcessFunction(projectionConfig)) .name(projectionConfig.getName()); } } - @Override public String type() { return "projection"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java index f36f8db..6ddc616 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java @@ -2,25 +2,25 @@ package com.geedgenetworks.core.processor.table; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.TableConfig; -import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; + +import java.util.Map; -import java.time.Duration; public class TableProcessorImpl implements TableProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception { - + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) throws Exception { if (tableConfig.getParallelism() != 0) { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .flatMap(new TableProcessorFunction(tableConfig)) .setParallelism(tableConfig.getParallelism()) .name(tableConfig.getName()); } else { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .flatMap(new 
TableProcessorFunction(tableConfig)) .name(tableConfig.getName()); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java new file mode 100644 index 0000000..37e7b44 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java @@ -0,0 +1,16 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +import java.util.Set; + +public interface Split { + + DataStream<Event> splitFunction( + DataStream<Event> dataStream, SplitConfig splitConfig) + throws Exception; + String type(); +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java new file mode 100644 index 0000000..7a129ef --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -0,0 +1,79 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.SplitConfig; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.List; +import java.util.Map; + + +@Slf4j +public class SplitFunction extends ProcessFunction<Event, 
Event> { + private SplitConfig splitConfig; + private List<RuleContext> routes; + private transient InternalMetrics internalMetrics; + + public SplitFunction(SplitConfig splitConfig) { + this.splitConfig = splitConfig; + + } + + @Override + public void open(Configuration parameters) throws Exception { + + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + this.routes = splitConfig.getRules(); + for(RuleContext rule :routes){ + String expression = rule.getExpression(); + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + Expression compiledExp = instance.compile(expression, true); + rule.setCompiledExpression(compiledExp); + OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){}; + rule.setOutputTag(outputTag); + } + } + + + @Override + public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception { + try { + internalMetrics.incrementInEvents(); + for (RuleContext route :routes){ + boolean result = route.getExpression() != null ? 
(filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; + if (result) { + ctx.output(route.getOutputTag(), event); + } + } + }catch (Exception e) { + internalMetrics.incrementErrorEvents(); + log.error("error in split function", e); + } + } + + public static Boolean filterExecute(Expression expression, Map<String, Object> map) { + + boolean result; + Object object = expression.execute(map); + if (object != null) { + result = (Boolean) object; + } else { + throw new ExpressionRuntimeException(); + } + return result; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java new file mode 100644 index 0000000..f6d2c8c --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java @@ -0,0 +1,29 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; +import org.apache.flink.streaming.api.datastream.DataStream; + +public class SplitOperator implements Split { + + @Override + public DataStream<Event> splitFunction( + DataStream<Event> dataStream, SplitConfig splitConfig) + throws Exception { + if (splitConfig.getParallelism() != 0) { + return dataStream + .process(new SplitFunction(splitConfig)) + .setParallelism(splitConfig.getParallelism()) + .name(splitConfig.getName()); + } else { + return dataStream + .process(new SplitFunction(splitConfig)) + .name(splitConfig.getName()); + } + } + @Override + public String type() { + return "split"; + } + +} diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split new file mode 100644 index 0000000..500c367 --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split @@ 
-0,0 +1 @@ +com.geedgenetworks.core.split.SplitOperator
\ No newline at end of file
