diff options
| author | 王宽 <[email protected]> | 2024-08-02 03:35:28 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-02 03:35:28 +0000 |
| commit | af254ff23a3cfa88c65ad15515d211ebfcf58c9b (patch) | |
| tree | a68143608b3841c46c94c419a03b0ca619540960 /groot-bootstrap | |
| parent | 0d68d0be840ae86c79896a239d27d6fa417c77d4 (diff) | |
| parent | df7c5dc10d1f6d89abf20ca81a43d0028233b888 (diff) | |
Merge branch 'feature/aggregate' into 'develop'
Feature/aggregate
See merge request galaxy/platform/groot-stream!87
Diffstat (limited to 'groot-bootstrap')
2 files changed, 12 insertions, 16 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 04a6a94..64c66b6 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.core.filter.Filter; +import com.geedgenetworks.core.processor.Processor; import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; @@ -20,8 +21,7 @@ public abstract class AbstractExecutor<K, V> protected final Config operatorConfig; protected final Map<K,V> operatorMap; protected final Map<String,Filter> filterMap = new HashMap<>(); - protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>(); - protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>(); + protected final Map<String, Processor> processorMap = new HashMap<>(); protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; @@ -30,13 +30,9 @@ public abstract class AbstractExecutor<K, V> for (Filter filter : filters) { this.filterMap.put(filter.type(), filter); } - ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class); - for (ProjectionProcessor projectionProcessor : projectionProcessors) { - this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor); - } - ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class); - for (AggregateProcessor aggregateProcessor : aggregateProcessors) { - this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor); + ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); + for (Processor processor : processors) { + this.processorMap.put(processor.type(), processor); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index 42a4828..bd8b75c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -47,8 +47,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { AggregateProcessor aggregateProcessor; - if (aggregateProcessorMap.containsKey(aggregateConfig.getType())) { - aggregateProcessor = aggregateProcessorMap.get(aggregateConfig.getType()); + if (processorMap.containsKey(aggregateConfig.getType())) { + aggregateProcessor = (AggregateProcessor) processorMap.get(aggregateConfig.getType()); } else { Class cls; try { @@ -63,7 +63,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } try { dataStream = - aggregateProcessor.aggregateProcessorFunction( + aggregateProcessor.processorFunction( dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); } catch (Exception e) { throw new JobExecuteException("Create aggregate pipeline instance failed!", e); @@ -74,8 +74,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException { ProjectionProcessor projectionProcessor; - if (projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + if (processorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = (ProjectionProcessor) processorMap.get(projectionConfig.getType()); } else { Class cls; try { @@ -90,8 +90,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } try { dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); + projectionProcessor.processorFunction( + dataStream, projectionConfig,jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); } catch (Exception e) { throw new JobExecuteException("Create processing pipeline instance failed!", e); } |
