diff options
| author | wangkuan <[email protected]> | 2024-08-02 11:27:24 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-02 11:27:24 +0800 |
| commit | df7c5dc10d1f6d89abf20ca81a43d0028233b888 (patch) | |
| tree | 9bc832e72cf5b162b595e4378e443c0c02a5e014 | |
| parent | 778ee36be04ce08c8230d0b3247b4166f90a05ae (diff) | |
[improve][core][bootstrap]调整结构,合并Processor接口
10 files changed, 37 insertions, 50 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 04a6a94..64c66b6 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.core.filter.Filter; +import com.geedgenetworks.core.processor.Processor; import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; @@ -20,8 +21,7 @@ public abstract class AbstractExecutor<K, V> protected final Config operatorConfig; protected final Map<K,V> operatorMap; protected final Map<String,Filter> filterMap = new HashMap<>(); - protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>(); - protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>(); + protected final Map<String, Processor> processorMap = new HashMap<>(); protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; @@ -30,13 +30,9 @@ public abstract class AbstractExecutor<K, V> for (Filter filter : filters) { this.filterMap.put(filter.type(), filter); } - ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class); - for (ProjectionProcessor projectionProcessor : projectionProcessors) { - this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor); - } - ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class); - for (AggregateProcessor aggregateProcessor : aggregateProcessors) { - this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor); + ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); + for (Processor processor : processors) { + this.processorMap.put(processor.type(), processor); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index 42a4828..bd8b75c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -47,8 +47,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { AggregateProcessor aggregateProcessor; - if (aggregateProcessorMap.containsKey(aggregateConfig.getType())) { - aggregateProcessor = aggregateProcessorMap.get(aggregateConfig.getType()); + if (processorMap.containsKey(aggregateConfig.getType())) { + aggregateProcessor = (AggregateProcessor) processorMap.get(aggregateConfig.getType()); } else { Class cls; try { @@ -63,7 +63,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } try { dataStream = - aggregateProcessor.aggregateProcessorFunction( + aggregateProcessor.processorFunction( dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); } catch (Exception e) { throw new JobExecuteException("Create aggregate pipeline instance failed!", e); @@ -74,8 +74,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException { ProjectionProcessor projectionProcessor; - if (projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + if (processorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = (ProjectionProcessor) processorMap.get(projectionConfig.getType()); } else { Class cls; try { @@ -90,8 +90,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } try { dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); + projectionProcessor.processorFunction( + dataStream, projectionConfig,jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); } catch (Exception e) { throw new JobExecuteException("Create processing pipeline instance failed!", e); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java new file mode 100644 index 0000000..172b368 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java @@ -0,0 +1,14 @@ +package com.geedgenetworks.core.processor; + +import com.geedgenetworks.common.Event; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +public interface Processor<T> { + + SingleOutputStreamOperator<Event> processorFunction( + SingleOutputStreamOperator<Event> singleOutputStreamOperator, + T processorConfig, ExecutionConfig config) + throws Exception; + String type(); +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java index 9acf8fc..0846ffe 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java @@ -1,16 +1,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.core.pojo.AggregateConfig; -import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.processor.Processor; +public interface AggregateProcessor extends Processor<AggregateConfig> { -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - -public interface AggregateProcessor { - - SingleOutputStreamOperator<Event> aggregateProcessorFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, - AggregateConfig aggregateConfig, ExecutionConfig config) - throws Exception; - String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index 2f086b0..bc87c32 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -11,17 +11,12 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; - import static com.geedgenetworks.common.Constants.*; -public class AggregateProcessorImpl implements AggregateProcessor { +public class AggregateProcessorImpl implements AggregateProcessor { @Override - public SingleOutputStreamOperator<Event> aggregateProcessorFunction( - SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, - AggregateConfig aggregateConfig, ExecutionConfig config) - throws Exception { - + public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { if (aggregateConfig.getParallelism() != 0) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java index 862ba5e..f15d481 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java @@ -1,15 +1,8 @@ package com.geedgenetworks.core.processor.projection; -import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; +import com.geedgenetworks.core.processor.Processor; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +public interface ProjectionProcessor extends Processor<ProjectionConfig>{ -public interface ProjectionProcessor { - - SingleOutputStreamOperator<Event> projectionProcessorFunction( - SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, - ProjectionConfig projectionConfig) - throws Exception; - String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java index 79b0e0d..6b46a7b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java @@ -3,16 +3,14 @@ package com.geedgenetworks.core.processor.projection; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public class ProjectionProcessorImpl implements ProjectionProcessor { - @Override - public SingleOutputStreamOperator<Event> projectionProcessorFunction( - SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, - ProjectionConfig projectionConfig) - throws Exception{ + @Override + public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { if (projectionConfig.getParallelism() != 0) { return grootEventSingleOutputStreamOperator .process(new ProjectionProcessFunction(projectionConfig)) diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor new file mode 100644 index 0000000..727b42b --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor @@ -0,0 +1,2 @@ +com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl +com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
\ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor deleted file mode 100644 index 426a1a9..0000000 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor +++ /dev/null @@ -1 +0,0 @@ -com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl
\ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor deleted file mode 100644 index ede2c8c..0000000 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor +++ /dev/null @@ -1 +0,0 @@ -com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
\ No newline at end of file |
