From 778ee36be04ce08c8230d0b3247b4166f90a05ae Mon Sep 17 00:00:00 2001 From: wangkuan Date: Fri, 2 Aug 2024 10:16:58 +0800 Subject: [improve][core]优化部分函数校验逻辑 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java | 3 +++ groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java | 1 - groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java | 4 +++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java index 3153ef7..84c2c2a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java @@ -24,6 +24,9 @@ public class Flatten implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + if(udfContext.getParameters()==null){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } prefix = udfContext.getParameters().getOrDefault("prefix", "").toString(); delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString(); flattenKeys = new HashSet<>(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index e48a503..874735d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -31,7 +31,6 @@ public class PathCombine implements ScalarFunction { if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { String paths = udfContext.getParameters().getOrDefault("path","").toString(); - // 使用逗号分隔项并转换为数组 if (!paths.isEmpty()) { List pathList; try { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java index ba9b4d2..6a77c3a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java @@ -26,7 +26,9 @@ public class Rename implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - + if(udfContext.getParameters()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } String parentFields = udfContext.getParameters().getOrDefault("parent_fields", "").toString(); this.parentFields = new HashSet<>(); if (!parentFields.isEmpty()) { -- cgit v1.2.3 From df7c5dc10d1f6d89abf20ca81a43d0028233b888 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Fri, 2 Aug 2024 11:27:24 +0800 Subject: [improve][core][bootstrap]调整结构,合并Processor接口 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bootstrap/execution/AbstractExecutor.java | 14 +++++--------- .../bootstrap/execution/AbstractProcessorExecutor.java | 14 +++++++------- .../java/com/geedgenetworks/core/processor/Processor.java | 14 ++++++++++++++ .../core/processor/aggregate/AggregateProcessor.java | 13 ++----------- .../core/processor/aggregate/AggregateProcessorImpl.java | 9 ++------- .../core/processor/projection/ProjectionProcessor.java | 11 ++--------- .../core/processor/projection/ProjectionProcessorImpl.java | 8 +++----- .../services/com.geedgenetworks.core.processor.Processor | 2 ++ ...dgenetworks.core.processor.aggregate.AggregateProcessor | 1 - ...enetworks.core.processor.projection.ProjectionProcessor | 1 - 10 files changed, 37 insertions(+), 50 deletions(-) create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java create mode 100644 groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor delete mode 100644 groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor delete mode 100644 groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 04a6a94..64c66b6 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.core.filter.Filter; +import com.geedgenetworks.core.processor.Processor; import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; @@ -20,8 +21,7 @@ public abstract class AbstractExecutor protected final Config operatorConfig; protected final Map operatorMap; protected final Map filterMap = new HashMap<>(); - protected final Map projectionProcessorMap = new HashMap<>(); - protected final Map aggregateProcessorMap = new HashMap<>(); + protected final Map processorMap = new HashMap<>(); protected AbstractExecutor(List jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; @@ -30,13 +30,9 @@ public abstract class AbstractExecutor for (Filter filter : filters) { this.filterMap.put(filter.type(), filter); } - ServiceLoader projectionProcessors = ServiceLoader.load(ProjectionProcessor.class); - for (ProjectionProcessor projectionProcessor : projectionProcessors) { - this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor); - } - ServiceLoader aggregateProcessors = ServiceLoader.load(AggregateProcessor.class); - for (AggregateProcessor aggregateProcessor : aggregateProcessors) { - this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor); + ServiceLoader processors = ServiceLoader.load(Processor.class); + for (Processor processor : processors) { + this.processorMap.put(processor.type(), processor); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index 42a4828..bd8b75c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -47,8 +47,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor { + + SingleOutputStreamOperator processorFunction( + SingleOutputStreamOperator singleOutputStreamOperator, + T processorConfig, ExecutionConfig config) + throws Exception; + String type(); +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java index 9acf8fc..0846ffe 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java @@ -1,16 +1,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.core.pojo.AggregateConfig; -import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.processor.Processor; +public interface AggregateProcessor extends Processor { -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - -public interface AggregateProcessor { - - SingleOutputStreamOperator aggregateProcessorFunction( - SingleOutputStreamOperator singleOutputStreamOperator, - AggregateConfig aggregateConfig, ExecutionConfig config) - throws Exception; - String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index 2f086b0..bc87c32 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -11,17 +11,12 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; - import static com.geedgenetworks.common.Constants.*; -public class AggregateProcessorImpl implements AggregateProcessor { +public class AggregateProcessorImpl implements AggregateProcessor { @Override - public SingleOutputStreamOperator aggregateProcessorFunction( - SingleOutputStreamOperator grootEventSingleOutputStreamOperator, - AggregateConfig aggregateConfig, ExecutionConfig config) - throws Exception { - + public SingleOutputStreamOperator processorFunction(SingleOutputStreamOperator grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { if (aggregateConfig.getParallelism() != 0) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java index 862ba5e..f15d481 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java @@ -1,15 +1,8 @@ package com.geedgenetworks.core.processor.projection; -import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; +import com.geedgenetworks.core.processor.Processor; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +public interface ProjectionProcessor extends Processor{ -public interface ProjectionProcessor { - - SingleOutputStreamOperator projectionProcessorFunction( - SingleOutputStreamOperator grootEventSingleOutputStreamOperator, - ProjectionConfig projectionConfig) - throws Exception; - String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java index 79b0e0d..6b46a7b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java @@ -3,16 +3,14 @@ package com.geedgenetworks.core.processor.projection; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public class ProjectionProcessorImpl implements ProjectionProcessor { - @Override - public SingleOutputStreamOperator projectionProcessorFunction( - SingleOutputStreamOperator grootEventSingleOutputStreamOperator, - ProjectionConfig projectionConfig) - throws Exception{ + @Override + public SingleOutputStreamOperator processorFunction(SingleOutputStreamOperator grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { if (projectionConfig.getParallelism() != 0) { return grootEventSingleOutputStreamOperator .process(new ProjectionProcessFunction(projectionConfig)) diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor new file mode 100644 index 0000000..727b42b --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor @@ -0,0 +1,2 @@ +com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl +com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl \ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor deleted file mode 100644 index 426a1a9..0000000 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor +++ /dev/null @@ -1 +0,0 @@ -com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl \ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor deleted file mode 100644 index ede2c8c..0000000 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor +++ /dev/null @@ -1 +0,0 @@ -com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl \ No newline at end of file -- cgit v1.2.3