summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-02 03:35:28 +0000
committer王宽 <[email protected]>2024-08-02 03:35:28 +0000
commitaf254ff23a3cfa88c65ad15515d211ebfcf58c9b (patch)
treea68143608b3841c46c94c419a03b0ca619540960
parent0d68d0be840ae86c79896a239d27d6fa417c77d4 (diff)
parentdf7c5dc10d1f6d89abf20ca81a43d0028233b888 (diff)
Merge branch 'feature/aggregate' into 'develop'
Feature/aggregate See merge request galaxy/platform/groot-stream!87
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java14
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java1
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java4
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor2
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor1
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor1
13 files changed, 43 insertions, 52 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 04a6a94..64c66b6 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.geedgenetworks.core.filter.Filter;
+import com.geedgenetworks.core.processor.Processor;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
@@ -20,8 +21,7 @@ public abstract class AbstractExecutor<K, V>
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
protected final Map<String,Filter> filterMap = new HashMap<>();
- protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>();
- protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>();
+ protected final Map<String, Processor> processorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
this.operatorConfig = operatorConfig;
@@ -30,13 +30,9 @@ public abstract class AbstractExecutor<K, V>
for (Filter filter : filters) {
this.filterMap.put(filter.type(), filter);
}
- ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class);
- for (ProjectionProcessor projectionProcessor : projectionProcessors) {
- this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor);
- }
- ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class);
- for (AggregateProcessor aggregateProcessor : aggregateProcessors) {
- this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor);
+ ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
+ for (Processor processor : processors) {
+ this.processorMap.put(processor.type(), processor);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 42a4828..bd8b75c 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -47,8 +47,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
AggregateProcessor aggregateProcessor;
- if (aggregateProcessorMap.containsKey(aggregateConfig.getType())) {
- aggregateProcessor = aggregateProcessorMap.get(aggregateConfig.getType());
+ if (processorMap.containsKey(aggregateConfig.getType())) {
+ aggregateProcessor = (AggregateProcessor) processorMap.get(aggregateConfig.getType());
} else {
Class cls;
try {
@@ -63,7 +63,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
try {
dataStream =
- aggregateProcessor.aggregateProcessorFunction(
+ aggregateProcessor.processorFunction(
dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
} catch (Exception e) {
throw new JobExecuteException("Create aggregate pipeline instance failed!", e);
@@ -74,8 +74,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ if (processorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = (ProjectionProcessor) processorMap.get(projectionConfig.getType());
} else {
Class cls;
try {
@@ -90,8 +90,8 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
try {
dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
+ projectionProcessor.processorFunction(
+ dataStream, projectionConfig,jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
} catch (Exception e) {
throw new JobExecuteException("Create processing pipeline instance failed!", e);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
new file mode 100644
index 0000000..172b368
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
@@ -0,0 +1,14 @@
+package com.geedgenetworks.core.processor;
+
+import com.geedgenetworks.common.Event;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+public interface Processor<T> {
+
+ SingleOutputStreamOperator<Event> processorFunction(
+ SingleOutputStreamOperator<Event> singleOutputStreamOperator,
+ T processorConfig, ExecutionConfig config)
+ throws Exception;
+ String type();
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
index 9acf8fc..0846ffe 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
@@ -1,16 +1,7 @@
package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.core.pojo.AggregateConfig;
-import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.processor.Processor;
+public interface AggregateProcessor extends Processor<AggregateConfig> {
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
-public interface AggregateProcessor {
-
- SingleOutputStreamOperator<Event> aggregateProcessorFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator,
- AggregateConfig aggregateConfig, ExecutionConfig config)
- throws Exception;
- String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index 2f086b0..bc87c32 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -11,17 +11,12 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
-
import static com.geedgenetworks.common.Constants.*;
-public class AggregateProcessorImpl implements AggregateProcessor {
+public class AggregateProcessorImpl implements AggregateProcessor {
@Override
- public SingleOutputStreamOperator<Event> aggregateProcessorFunction(
- SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator,
- AggregateConfig aggregateConfig, ExecutionConfig config)
- throws Exception {
-
+ public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
if (aggregateConfig.getParallelism() != 0) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
index 862ba5e..f15d481 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
@@ -1,15 +1,8 @@
package com.geedgenetworks.core.processor.projection;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
+import com.geedgenetworks.core.processor.Processor;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+public interface ProjectionProcessor extends Processor<ProjectionConfig>{
-public interface ProjectionProcessor {
-
- SingleOutputStreamOperator<Event> projectionProcessorFunction(
- SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator,
- ProjectionConfig projectionConfig)
- throws Exception;
- String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
index 79b0e0d..6b46a7b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
@@ -3,16 +3,14 @@ package com.geedgenetworks.core.processor.projection;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public class ProjectionProcessorImpl implements ProjectionProcessor {
- @Override
- public SingleOutputStreamOperator<Event> projectionProcessorFunction(
- SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator,
- ProjectionConfig projectionConfig)
- throws Exception{
+ @Override
+ public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
if (projectionConfig.getParallelism() != 0) {
return grootEventSingleOutputStreamOperator
.process(new ProjectionProcessFunction(projectionConfig))
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
index 3153ef7..84c2c2a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
@@ -24,6 +24,9 @@ public class Flatten implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ if(udfContext.getParameters()==null){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
prefix = udfContext.getParameters().getOrDefault("prefix", "").toString();
delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString();
flattenKeys = new HashSet<>();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index e48a503..874735d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -31,7 +31,6 @@ public class PathCombine implements ScalarFunction {
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
String paths = udfContext.getParameters().getOrDefault("path","").toString();
- // 使用逗号分隔项并转换为数组
if (!paths.isEmpty()) {
List<String> pathList;
try {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index ba9b4d2..6a77c3a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -26,7 +26,9 @@ public class Rename implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
-
+ if(udfContext.getParameters()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
String parentFields = udfContext.getParameters().getOrDefault("parent_fields", "").toString();
this.parentFields = new HashSet<>();
if (!parentFields.isEmpty()) {
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor
new file mode 100644
index 0000000..727b42b
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor
@@ -0,0 +1,2 @@
+com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl
+com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl \ No newline at end of file
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor
deleted file mode 100644
index 426a1a9..0000000
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor
+++ /dev/null
@@ -1 +0,0 @@
-com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl \ No newline at end of file
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor
deleted file mode 100644
index ede2c8c..0000000
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor
+++ /dev/null
@@ -1 +0,0 @@
-com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl \ No newline at end of file