summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-02 11:27:24 +0800
committerwangkuan <[email protected]>2024-08-02 11:27:24 +0800
commitdf7c5dc10d1f6d89abf20ca81a43d0028233b888 (patch)
tree9bc832e72cf5b162b595e4378e443c0c02a5e014 /groot-bootstrap
parent778ee36be04ce08c8230d0b3247b4166f90a05ae (diff)
[improve][core][bootstrap]调整结构,合并Processor接口
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java14
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java14
2 files changed, 12 insertions, 16 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 04a6a94..64c66b6 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.geedgenetworks.core.filter.Filter;
+import com.geedgenetworks.core.processor.Processor;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
@@ -20,8 +21,7 @@ public abstract class AbstractExecutor<K, V>
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
protected final Map<String,Filter> filterMap = new HashMap<>();
- protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>();
- protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>();
+ protected final Map<String, Processor> processorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
this.operatorConfig = operatorConfig;
@@ -30,13 +30,9 @@ public abstract class AbstractExecutor<K, V>
for (Filter filter : filters) {
this.filterMap.put(filter.type(), filter);
}
- ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class);
- for (ProjectionProcessor projectionProcessor : projectionProcessors) {
- this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor);
- }
- ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class);
- for (AggregateProcessor aggregateProcessor : aggregateProcessors) {
- this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor);
+ ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
+ for (Processor processor : processors) {
+ this.processorMap.put(processor.type(), processor);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 42a4828..bd8b75c 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -47,8 +47,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
AggregateProcessor aggregateProcessor;
- if (aggregateProcessorMap.containsKey(aggregateConfig.getType())) {
- aggregateProcessor = aggregateProcessorMap.get(aggregateConfig.getType());
+ if (processorMap.containsKey(aggregateConfig.getType())) {
+ aggregateProcessor = (AggregateProcessor) processorMap.get(aggregateConfig.getType());
} else {
Class cls;
try {
@@ -63,7 +63,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
try {
dataStream =
- aggregateProcessor.aggregateProcessorFunction(
+ aggregateProcessor.processorFunction(
dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
} catch (Exception e) {
throw new JobExecuteException("Create aggregate pipeline instance failed!", e);
@@ -74,8 +74,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ if (processorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = (ProjectionProcessor) processorMap.get(projectionConfig.getType());
} else {
Class cls;
try {
@@ -90,8 +90,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
try {
dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
+ projectionProcessor.processorFunction(
+ dataStream, projectionConfig,jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
} catch (Exception e) {
throw new JobExecuteException("Create processing pipeline instance failed!", e);
}