author王宽 <[email protected]>2024-11-18 06:13:38 +0000
committer王宽 <[email protected]>2024-11-18 06:13:38 +0000
commit734ebe6bfa5757f511774d6f4a25e045c6b48583 (patch)
treeaf817c8e2acfeb54fc076acd6205efbf4eff82f2
parent85f95e2c20ad76d9a00b188aa41ce6fef9a2ac99 (diff)
parentbc8fe110e1037e75012c4fb655fff888d4356bf4 (diff)
Merge branch 'develop' into 'fix/aviator-expression'
# Conflicts: # groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java # groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java # groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
-rw-r--r--  docs/images/groot-stream-factory.png  bin 215649 -> 142201 bytes
-rw-r--r--  docs/images/groot-stream-module.png  bin 51369 -> 69565 bytes
-rw-r--r--  groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java (renamed from groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java)  2
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java  2
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java  143
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java  16
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java  4
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java  10
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java  12
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java  14
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java  8
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java  8
-rw-r--r--  groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java  2
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml  29
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml  6
-rw-r--r--  pom.xml  6
18 files changed, 154 insertions(+), 112 deletions(-)
diff --git a/docs/images/groot-stream-factory.png b/docs/images/groot-stream-factory.png
index f5b04ff..59bfd32 100644
--- a/docs/images/groot-stream-factory.png
+++ b/docs/images/groot-stream-factory.png
Binary files differ
diff --git a/docs/images/groot-stream-module.png b/docs/images/groot-stream-module.png
index ebe0e2d..7c7519b 100644
--- a/docs/images/groot-stream-module.png
+++ b/docs/images/groot-stream-module.png
Binary files differ
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java
index 771bc66..6cb4f5a 100644
--- a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java
@@ -6,7 +6,7 @@ import lombok.Data;
import java.io.Serializable;
@Data
-public class UdfEntity implements Serializable {
+public class UDFEntity implements Serializable {
private ScalarFunction scalarFunction;
private AggregateFunction aggregateFunction;
private TableFunction tableFunction;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
index dcb3e56..88505ee 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
@@ -4,8 +4,10 @@ public enum JobStage {
SOURCE("source"),
FILTER("filter"),
SPLIT("split"),
+ @Deprecated
PREPROCESSING("preprocessing"),
PROCESSING("processing"),
+ @Deprecated
POSTPROCESSING("postprocessing"),
SINK("sink");
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index b7f8b97..2cdfbb3 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -25,6 +25,9 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+
@Slf4j
public class JobExecution {
@@ -34,7 +37,6 @@ public class JobExecution {
private final Executor<DataStream<Event>> processorExecutor;
private final List<JobTopologyNode> jobTopologyNodes;
private final List<URL> jarPaths;
- private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
@@ -147,7 +149,6 @@ public class JobExecution {
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
-
if (config.hasPath(Constants.SOURCES)) {
sources = config.getConfig(Constants.SOURCES).root().unwrapped();
}
@@ -180,20 +181,23 @@ public class JobExecution {
});
for (JobTopologyNode jobTopologyNode : jobTopologyNodes) {
+ checkNotNull(jobTopologyNode.getName(), "Node name is null");
if (sources.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.SOURCE);
+ jobTopologyNode.setStage(JobStage.SOURCE);
} else if (sinks.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.SINK);
+ jobTopologyNode.setStage(JobStage.SINK);
} else if (filters.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.FILTER);
+ jobTopologyNode.setStage(JobStage.FILTER);
} else if (splits.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.SPLIT);
+ checkArgument(jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size(),
+ "Split node downstream size does not equal tags size");
+ jobTopologyNode.setStage(JobStage.SPLIT);
} else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.PREPROCESSING);
+ jobTopologyNode.setStage(JobStage.PREPROCESSING);
} else if (processingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.PROCESSING);
+ jobTopologyNode.setStage(JobStage.PROCESSING);
} else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.POSTPROCESSING);
+ jobTopologyNode.setStage(JobStage.POSTPROCESSING);
} else {
throw new JobExecuteException("Unsupported operator type " + jobTopologyNode.getName());
}
@@ -205,17 +209,18 @@ public class JobExecution {
public void execute() throws JobExecuteException {
+
if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
- List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes
- .stream().filter(v -> v.getType().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList());
+ List<JobTopologyNode> sourceNodes = jobTopologyNodes
+ .stream().filter(v -> v.getStage().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList());
DataStream<Event> dataStream = null;
- for (JobTopologyNode sourceJobTopologyNode : sourceJobTopologyNodes) {
- dataStream = sourceExecutor.execute(dataStream, sourceJobTopologyNode);
- for (String nodeName : sourceJobTopologyNode.getDownstream()) {
+ for (JobTopologyNode sourceNode : sourceNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceNode);
+ for (String nodeName : sourceNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
@@ -235,67 +240,73 @@ public class JobExecution {
}
- private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
- JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> {
- throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
- });
- if (jobTopologyNode.getType().name().equals(JobStage.FILTER.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
- .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.SPLIT.name())) {
- if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) {
- for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) {
- nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i));
- }
- }
- else {
- throw new JobExecuteException("split node downstream size not equal tags size");
- }
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- } else if (jobTopologyNode.getType().name().equals(JobStage.PREPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())){
- }), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.PROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
- }), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.POSTPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
- }), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.SINK.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
- }), jobTopologyNode);
- } else {
- dataStream = sinkExecutor.execute(dataStream, jobTopologyNode);
- }
+ private void buildJobGraph(DataStream<Event> dataStream, String nodeName) {
+ JobTopologyNode currentNode = getNode(nodeName).orElseThrow(() ->
+ new JobExecuteException("Node not found: " + nodeName));
+ switch (currentNode.getStage()) {
+ case SPLIT:
+ case FILTER:
+ case PREPROCESSING:
+ case PROCESSING:
+ case POSTPROCESSING:
+ dataStream = executeProcessorNode(dataStream, currentNode);
+ break;
+ case SINK:
+ dataStream = executeSinkNode(dataStream, currentNode);
+ break;
+ default:
+ throw new JobExecuteException("Unsupported Job stage: " + currentNode.getStage());
+ }
+ // Recursively build job graph for downstream nodes
+ for (String downstreamNodeName : currentNode.getDownstream()) {
+ buildJobGraph(dataStream, downstreamNodeName);
+ }
+
+ }
+
+ private DataStream<Event> executeProcessorNode(DataStream<Event> dataStream, JobTopologyNode currentNode) {
+
+ JobTopologyNode inputNode = getInputNode(dataStream);
+
+ if (isSplitAndHasTag(inputNode, currentNode)) {
+ return executeWithSideOutput(dataStream, inputNode, currentNode, processorExecutor);
} else {
- throw new JobExecuteException("unsupported process type " + jobTopologyNode.getType().name());
+ return processorExecutor.execute(dataStream, currentNode);
}
+ }
+ private DataStream<Event> executeSinkNode(DataStream<Event> dataStream, JobTopologyNode currentNode) {
+ JobTopologyNode inputNode = getInputNode(dataStream);
- for (String nodeName : jobTopologyNode.getDownstream()) {
- buildJobGraph(dataStream, nodeName);
+ if (isSplitAndHasTag(inputNode, currentNode)) {
+ return executeWithSideOutput(dataStream, inputNode, currentNode, sinkExecutor);
+ } else {
+ return sinkExecutor.execute(dataStream, currentNode);
}
+ }
+ // Helper method to get the input node based on the current data stream
+ private JobTopologyNode getInputNode(DataStream<Event> dataStream) {
+ String inputName = dataStream.getTransformation().getName();
+ return getNode(inputName).orElseThrow(() -> new JobExecuteException("Node not found: " + inputName));
+ }
+
+ // Helper method to check if input node is SPLIT and has a tag for downstream
+ private boolean isSplitAndHasTag(JobTopologyNode inputNode, JobTopologyNode currentNode) {
+ return inputNode.getStage().equals(JobStage.SPLIT)
+ && inputNode.getTagForDownstream(currentNode.getName()) != null;
+ }
+ // Helper method to execute with side output
+ private DataStream<Event> executeWithSideOutput(DataStream<Event> dataStream, JobTopologyNode inputNode,
+ JobTopologyNode currentNode, Executor<DataStream<Event>> executor) {
+ OutputTag<Event> outputTag = new OutputTag<Event>(inputNode.getTagForDownstream(currentNode.getName())) {};
+ SingleOutputStreamOperator<Event> singleOutputStream = (SingleOutputStreamOperator<Event>) dataStream;
+ DataStream<Event> sideOutput = singleOutputStream.getSideOutput(outputTag);
+ return executor.execute(sideOutput, currentNode);
}
+ // Helper method to get the node based on the name
private Optional<JobTopologyNode> getNode(String name) {
return jobTopologyNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
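
The refactor above collapses the per-stage if/else chains into one switch plus two executor helpers; the mechanism they share is Flink's side output: when a node's input comes from a SPLIT, the node consumes the side output identified by its tag instead of the main stream. A minimal, self-contained sketch of that routing (hypothetical String stream and class name; the project routes Event streams through its executors):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SideOutputSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Anonymous subclass so the tag keeps its generic type at runtime,
            // matching the `new OutputTag<Event>(...) {}` pattern above.
            final OutputTag<String> httpTag = new OutputTag<String>("http_a_tag") {};

            SingleOutputStreamOperator<String> split = env
                    .fromElements("HTTP", "DNS", "HTTP")
                    .process(new ProcessFunction<String, String>() {
                        @Override
                        public void processElement(String value, Context ctx, Collector<String> out) {
                            if ("HTTP".equals(value)) {
                                ctx.output(httpTag, value); // tagged side output
                            } else {
                                out.collect(value);         // main stream
                            }
                        }
                    });

            // What executeWithSideOutput does in spirit: a node downstream of a
            // SPLIT reads the side output selected by its tag.
            DataStream<String> httpOnly = split.getSideOutput(httpTag);
            httpOnly.print();
            env.execute("side-output-sketch");
        }
    }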
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
index ab2aec3..27881a2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.JobStage;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -8,7 +9,9 @@ import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Represents an operator node in the execution graph.
@@ -19,9 +22,20 @@ import java.util.List;
@EqualsAndHashCode
public class JobTopologyNode implements Serializable {
private String name;
- private JobStage type;
+ private JobStage stage;
private int parallelism;
private List<String> downstream = Collections.emptyList();
private List<String> tags = Collections.emptyList();
+ public String getTagForDownstream(String downstreamNode) {
+ int index = downstream.indexOf(downstreamNode);
+ if (index < 0) {
+ return null;
+ }
+ return tags.get(index);
+ }
+
+
+
+
}
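
getTagForDownstream pairs the two lists by position, which is exactly what the new checkArgument in JobExecution guarantees for SPLIT nodes before the graph is built. A hedged usage sketch (setters come from Lombok's @Data; the values mirror the example YAML further down):

    import com.geedgenetworks.bootstrap.enums.JobStage;
    import com.geedgenetworks.bootstrap.execution.JobTopologyNode;
    import java.util.Arrays;

    class TagPairingSketch {
        public static void main(String[] args) {
            JobTopologyNode split = new JobTopologyNode();
            split.setName("test_split_a");
            split.setStage(JobStage.SPLIT);
            // downstream.get(i) pairs with tags.get(i), hence the size check.
            split.setDownstream(Arrays.asList("table_processor", "pre_etl_processor"));
            split.setTags(Arrays.asList("http_a_tag", "dns_a_tag"));

            System.out.println(split.getTagForDownstream("pre_etl_processor")); // dns_a_tag
            System.out.println(split.getTagForDownstream("collect_sink"));      // null
        }
    }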
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
index b67a842..7629d48 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
@@ -6,9 +6,11 @@ public final class Constants {
public static final String SOURCES = "sources";
public static final String FILTERS = "filters";
public static final String SPLITS = "splits";
+ @Deprecated
public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines";
- public static final String PROCESSING_PIPELINES = "processing_pipelines";
+ @Deprecated
public static final String POSTPROCESSING_PIPELINES = "postprocessing_pipelines";
+ public static final String PROCESSING_PIPELINES = "processing_pipelines";
public static final String SINKS = "sinks";
public static final int SYSTEM_EXIT_CODE = 2618;
public static final String APPLICATION = "application";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
index 842d1bf..8807123 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
@@ -72,7 +72,7 @@ public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
grootStreamRoot = yamlRootNode;
}
- YamlDomChecker.check(grootStreamRoot);
+ YamlDomChecker.check(grootStreamRoot, null);
Node w3cRootNode = asW3cNode(grootStreamRoot);
replaceVariables(w3cRootNode);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index 00d7d9c..139c715 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.api.common.udf.AggregateFunction;
import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
import com.geedgenetworks.api.expressions.AviatorExecutor;
@@ -41,7 +41,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
protected final Map<Long, Map<String, Accumulator>> windows = new HashMap<>();
protected List<String> groupByFields;
- private LinkedList<UdfEntity> functions;
+ private LinkedList<UDFEntity> functions;
protected InternalMetrics internalMetrics;
private final AggregateConfig aggregateConfig;
@@ -93,7 +93,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
}
}
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
}
@@ -137,7 +137,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(keysMap);
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
udfEntity.getAggregateFunction().initAccumulator(accumulator);
}
return accumulator;
@@ -164,7 +164,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
public Accumulator add(Event event, Accumulator accumulator) {
accumulator.setInEvents(accumulator.getInEvents() + 1);
- for (UdfEntity udafEntity : functions) {
+ for (UDFEntity udafEntity : functions) {
try {
boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
if (result) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index ea05c1c..a7dff22 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -8,7 +8,7 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.api.common.udf.AggregateFunction;
import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.event.Event;
import com.google.common.collect.Lists;
import com.googlecode.aviator.AviatorEvaluator;
@@ -31,7 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
private final List<UDFContext> udfContexts;
private final List<String> udfClassNameLists;
- private final LinkedList<UdfEntity> functions;
+ private final LinkedList<UDFEntity> functions;
public AggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -64,7 +64,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
}
}
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
}
} catch (Exception e) {
@@ -77,7 +77,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
Map<String, Object> map = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(map);
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
udfEntity.getAggregateFunction().initAccumulator(accumulator);
}
return accumulator;
@@ -86,7 +86,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
public Accumulator add(Event event, Accumulator accumulator) {
accumulator.setInEvents(accumulator.getInEvents()+1);
- for (UdfEntity udafEntity : functions) {
+ for (UDFEntity udafEntity : functions) {
try {
boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
if (result) {
@@ -105,7 +105,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
@Override
public Accumulator getResult(Accumulator accumulator) {
- for (UdfEntity udafEntity : functions) {
+ for (UDFEntity udafEntity : functions) {
try {
udafEntity.getAggregateFunction().getResult(accumulator);
} catch (Exception e) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index db5336a..d7946bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.common.udf.AggregateFunction;
import com.google.common.collect.Lists;
import com.googlecode.aviator.AviatorEvaluator;
@@ -29,7 +29,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> {
private final List<UDFContext> udfContexts;
private final List<String> udfClassNameLists;
- private final LinkedList<UdfEntity> functions;
+ private final LinkedList<UDFEntity> functions;
public SecondAggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -42,7 +42,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
try {
for (UDFContext udfContext : udfContexts) {
Expression filterExpression = null;
- UdfEntity udfEntity = new UdfEntity();
+ UDFEntity udfEntity = new UDFEntity();
// Instantiate the function when the platform-registered functions include the one configured in the job
if (udfClassReflect.containsKey(udfContext.getFunction())) {
Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
@@ -59,7 +59,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
}
}
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
}
} catch (Exception e) {
@@ -73,7 +73,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
Map<String, Object> map = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(map);
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
udfEntity.getAggregateFunction().initAccumulator(accumulator);
}
return accumulator;
@@ -86,7 +86,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
@Override
public Accumulator getResult(Accumulator accumulator) {
- for (UdfEntity udafEntity : functions) {
+ for (UDFEntity udafEntity : functions) {
try {
udafEntity.getAggregateFunction().getResult(accumulator);
} catch (Exception e) {
@@ -99,7 +99,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
@Override
public Accumulator merge(Accumulator acc1, Accumulator acc2) {
acc1.setInEvents(acc1.getInEvents() + 1);
- for (UdfEntity udafEntity : functions) {
+ for (UDFEntity udafEntity : functions) {
try {
udafEntity.getAggregateFunction().merge(acc1, acc2);
} catch (ExpressionRuntimeException ignore) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index e079790..24e77ae 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -11,7 +11,7 @@ import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler;
import com.geedgenetworks.api.common.udf.ScalarFunction;
import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
import com.google.common.collect.Lists;
@@ -34,7 +34,7 @@ import java.util.Map;
@Slf4j
public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
private final ProjectionConfig projectionConfig;
- private LinkedList<UdfEntity> functions;
+ private LinkedList<UDFEntity> functions;
private transient InternalMetrics internalMetrics;
@@ -88,7 +88,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
internalMetrics.incrementInEvents();
int errorCount = 0;
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
try {
if (event.isDropped()) {
@@ -135,7 +135,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
@Override
public void close() throws Exception {
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
udfEntity.getScalarFunction().close();
}
KnowledgeBaseScheduler.decrement();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
index d9232d7..bfd3d10 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
@@ -52,7 +52,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
ctx.output(route.getOutputTag(), event);
}
}
- }catch (Exception e) {
+ } catch (Exception e) {
internalMetrics.incrementErrorEvents();
log.error("error in split function", e);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index a054fcd..038a252 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -8,7 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.api.common.udf.TableFunction;
import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
import com.google.common.collect.Lists;
@@ -28,7 +28,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
@Slf4j
public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
- private LinkedList<UdfEntity> functions;
+ private LinkedList<UDFEntity> functions;
private final TableConfig tableConfig;
private transient InternalMetrics internalMetrics;
@@ -50,7 +50,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
for (UDFContext udfContext : udfContexts) {
Expression filterExpression = null;
- UdfEntity udfEntity = new UdfEntity();
+ UDFEntity udfEntity = new UDFEntity();
// Instantiate the function when the platform-registered functions include the one configured in the job
if (udfClassReflect.containsKey(udfContext.getFunction())) {
Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
@@ -82,7 +82,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
int errorCount = 0;
List<Event> events = new ArrayList<>();
events.add(event);
- for (UdfEntity udfEntity : functions) {
+ for (UDFEntity udfEntity : functions) {
List<Event> newEvents = new ArrayList<>();
for(int i=0;i<events.size();i++) {
try {
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 9b58289..9f86144 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/grootstream_job_split_test.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
index 9bb2900..3477b00 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
@@ -10,21 +10,28 @@ sources:
json.ignore.parse.errors: false
sinks:
collect_sink:
- type: collect
+ type: print
properties:
format: json
splits:
- test_split:
+ test_split_a:
type: split
rules:
- - name: table_processor
+ - tag: http_a_tag
expression: event.decoded_as == 'HTTP'
- - name: pre_etl_processor
+ - tag: dns_a_tag
+ expression: event.decoded_as == 'DNS'
+ test_split_b:
+ type: split
+ rules:
+ - tag: base_b_tag
+ expression: event.decoded_as == 'BASE'
+ - tag: dns_b_tag
expression: event.decoded_as == 'DNS'
postprocessing_pipelines:
pre_etl_processor: # [object] Processing Pipeline
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
remove_fields: [fields,tags]
output_fields:
functions: # [array of object] Function List
@@ -45,7 +52,7 @@ postprocessing_pipelines:
# parent_fields: [tags]
# rename_fields:
# tags: tags
- rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+ rename_expression: key=string.replace_all(key,'tags.','');key=string.replace_all(key,'fields.','');return key;
- function: UNIX_TIMESTAMP_CONVERTER
@@ -81,16 +88,22 @@ application: # [object] Application Configuration
topology: # [array of object] Node List. It will be used build data flow for job dag graph.
- name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
parallelism: 1 # [number] Operator-Level Parallelism.
- downstream: [test_split,collect_sink]
- - name: test_split
+ downstream: [test_split_a, test_split_b]
+ - name: test_split_a
parallelism: 1
+ tags: [http_a_tag,dns_a_tag]
downstream: [ table_processor,pre_etl_processor ]
+ - name: test_split_b
+ parallelism: 1
+ tags: [ base_b_tag]
+ downstream: [ collect_sink ]
- name: pre_etl_processor
parallelism: 1
downstream: [ collect_sink ]
- name: table_processor
parallelism: 1
downstream: [ collect_sink ]
+
- name: collect_sink
parallelism: 1
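
Each rule above routes an event to its tag when the rule's Aviator expression evaluates to true; the topology's tags list then binds those tags to downstream nodes by position. A small sketch of how such an expression behaves, assuming the event's extracted fields are exposed to Aviator as a map named event (the project evaluates these through its AviatorExecutor):

    import com.googlecode.aviator.AviatorEvaluator;
    import java.util.HashMap;
    import java.util.Map;

    class SplitRuleSketch {
        public static void main(String[] args) {
            Map<String, Object> fields = new HashMap<>();
            fields.put("decoded_as", "HTTP");
            Map<String, Object> env = new HashMap<>();
            env.put("event", fields);
            // Same expression as the http_a_tag rule above.
            Boolean matches = (Boolean) AviatorEvaluator.execute("event.decoded_as == 'HTTP'", env);
            System.out.println(matches); // true -> event is emitted under http_a_tag
        }
    }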
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index 65cd3cb..0c00060 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -38,12 +38,12 @@ filters:
properties:
expression: event.decoded_as == 'HTTP'
-preprocessing_pipelines:
+processing_pipelines:
+
transform_processor:
type: projection
- remove_fields: [client_ip]
+ remove_fields: [ client_ip ]
-processing_pipelines:
session_record_processor:
type: projection
remove_fields: [device_tag]
diff --git a/pom.xml b/pom.xml
index 1edd9c1..5930743 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
<maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version>
- <testcontainer.version>1.20.1</testcontainer.version>
+ <testcontainer.version>1.20.3</testcontainer.version>
<awaitility.version>4.2.0</awaitility.version>
<spotless.version>2.40.0</spotless.version>
<slf4j.version>1.7.25</slf4j.version>
@@ -65,7 +65,7 @@
<guava-retrying.version>2.0.0</guava-retrying.version>
<ipaddress.version>5.3.3</ipaddress.version>
<aviator.version>5.4.1</aviator.version>
- <snakeyaml.version>1.29</snakeyaml.version>
+ <snakeyaml.version>2.0</snakeyaml.version>
<nacos.version>2.0.4</nacos.version>
<antlr4.version>4.8</antlr4.version>
<jcommander.version>1.81</jcommander.version>
@@ -73,7 +73,7 @@
<snappy-java.version>1.1.10.4</snappy-java.version>
<lombok.version>1.18.24</lombok.version>
<config.version>1.3.3</config.version>
- <hazelcast.version>5.1</hazelcast.version>
+ <hazelcast.version>5.3.5</hazelcast.version>
<quartz.version>2.3.2</quartz.version>
<hadoop.version>2.7.1</hadoop.version>
<!--Option config -->