| author | 王宽 <[email protected]> | 2024-11-18 06:13:38 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-11-18 06:13:38 +0000 |
| commit | 734ebe6bfa5757f511774d6f4a25e045c6b48583 (patch) | |
| tree | af817c8e2acfeb54fc076acd6205efbf4eff82f2 | |
| parent | 85f95e2c20ad76d9a00b188aa41ce6fef9a2ac99 (diff) | |
| parent | bc8fe110e1037e75012c4fb655fff888d4356bf4 (diff) | |
Merge branch 'develop' into 'fix/aviator-expression'
# Conflicts:
# groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
# groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
# groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
18 files changed, 154 insertions, 112 deletions
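The core of this merge is the rework of split-tag routing in JobExecution.java below: the shared nodeNameWithSplitTags map is gone, and each downstream node now asks its upstream split node for a tag (via the new JobTopologyNode.getTagForDownstream) and reads the matching Flink side output. As a reading aid, here is a minimal, self-contained sketch of that side-output pattern; the class name, tag strings, and plain String events are illustrative stand-ins, not code from this repository.

```java
// Sketch of Flink's side-output routing, the pattern the refactored
// buildJobGraph/executeWithSideOutput centralizes. Tags mirror the
// example YAML in this diff; everything else is hypothetical.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRoutingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One OutputTag per split rule; the tag string is what
        // getTagForDownstream(...) would return for a downstream node.
        final OutputTag<String> httpTag = new OutputTag<String>("http_a_tag") {};
        final OutputTag<String> dnsTag = new OutputTag<String>("dns_a_tag") {};

        SingleOutputStreamOperator<String> split = env
                .fromElements("HTTP", "DNS", "BASE")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        // Route by rule; unmatched events stay on the main output.
                        if ("HTTP".equals(value)) {
                            ctx.output(httpTag, value);
                        } else if ("DNS".equals(value)) {
                            ctx.output(dnsTag, value);
                        } else {
                            out.collect(value);
                        }
                    }
                });

        // Each downstream consumer selects its branch by tag,
        // as executeWithSideOutput does in the diff below.
        DataStream<String> httpBranch = split.getSideOutput(httpTag);
        DataStream<String> dnsBranch = split.getSideOutput(dnsTag);
        httpBranch.print("http");
        dnsBranch.print("dns");

        env.execute("side-output routing sketch");
    }
}
```

The anonymous subclass in `new OutputTag<String>("...") {}` lets Flink capture the element type at runtime, which is why the merged code keeps the same `{}` idiom.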
diff --git a/docs/images/groot-stream-factory.png b/docs/images/groot-stream-factory.png
index f5b04ff..59bfd32 100644
--- a/docs/images/groot-stream-factory.png
+++ b/docs/images/groot-stream-factory.png
Binary files differ
diff --git a/docs/images/groot-stream-module.png b/docs/images/groot-stream-module.png
index ebe0e2d..7c7519b 100644
--- a/docs/images/groot-stream-module.png
+++ b/docs/images/groot-stream-module.png
Binary files differ
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java
index 771bc66..6cb4f5a 100644
--- a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java
@@ -6,7 +6,7 @@ import lombok.Data;
 import java.io.Serializable;
 
 @Data
-public class UdfEntity implements Serializable {
+public class UDFEntity implements Serializable {
     private ScalarFunction scalarFunction;
     private AggregateFunction aggregateFunction;
     private TableFunction tableFunction;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
index dcb3e56..88505ee 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
@@ -4,8 +4,10 @@ public enum JobStage {
     SOURCE("source"),
     FILTER("filter"),
     SPLIT("split"),
+    @Deprecated
     PREPROCESSING("preprocessing"),
     PROCESSING("processing"),
+    @Deprecated
     POSTPROCESSING("postprocessing"),
     SINK("sink");
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index b7f8b97..2cdfbb3 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -25,6 +25,9 @@ import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+
 @Slf4j
 public class JobExecution {
@@ -34,7 +37,6 @@ public class JobExecution {
     private final Executor<DataStream<Event>> processorExecutor;
     private final List<JobTopologyNode> jobTopologyNodes;
     private final List<URL> jarPaths;
-    private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
 
     public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
         try {
@@ -147,7 +149,6 @@ public class JobExecution {
         Map<String, Object> processingPipelines = Maps.newHashMap();
         Map<String, Object> postprocessingPipelines = Maps.newHashMap();
-
         if (config.hasPath(Constants.SOURCES)) {
             sources = config.getConfig(Constants.SOURCES).root().unwrapped();
         }
@@ -180,20 +181,23 @@ public class JobExecution {
         });
 
         for (JobTopologyNode jobTopologyNode : jobTopologyNodes) {
+            checkNotNull(jobTopologyNode.getName(), "Node name is null");
             if (sources.containsKey(jobTopologyNode.getName())) {
-                jobTopologyNode.setType(JobStage.SOURCE);
+                jobTopologyNode.setStage(JobStage.SOURCE);
             } else if (sinks.containsKey(jobTopologyNode.getName())) {
-                jobTopologyNode.setType(JobStage.SINK);
+                jobTopologyNode.setStage(JobStage.SINK);
             } else if (filters.containsKey(jobTopologyNode.getName())) {
-                jobTopologyNode.setType(JobStage.FILTER);
+                jobTopologyNode.setStage(JobStage.FILTER);
             } else if (splits.containsKey(jobTopologyNode.getName())) {
-                jobTopologyNode.setType(JobStage.SPLIT);
+                checkArgument(jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size(),
+                        "split node downstream size not equal tags size");
+                jobTopologyNode.setStage(JobStage.SPLIT);
             } else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) {
-                jobTopologyNode.setType(JobStage.PREPROCESSING);
+                jobTopologyNode.setStage(JobStage.PREPROCESSING);
             } else if (processingPipelines.containsKey(jobTopologyNode.getName())) {
-                jobTopologyNode.setType(JobStage.PROCESSING);
+                jobTopologyNode.setStage(JobStage.PROCESSING);
             } else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) {
-                jobTopologyNode.setType(JobStage.POSTPROCESSING);
+                jobTopologyNode.setStage(JobStage.POSTPROCESSING);
             } else {
                 throw new JobExecuteException("Unsupported operator type " + jobTopologyNode.getName());
             }
@@ -205,17 +209,18 @@ public class JobExecution {
 
     public void execute() throws JobExecuteException {
+
         if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
             jobRuntimeEnvironment.registerPlugin(jarPaths);
         }
-        List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes
-                .stream().filter(v -> v.getType().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList());
+        List<JobTopologyNode> sourceNodes = jobTopologyNodes
+                .stream().filter(v -> v.getStage().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList());
         DataStream<Event> dataStream = null;
-        for (JobTopologyNode sourceJobTopologyNode : sourceJobTopologyNodes) {
-            dataStream = sourceExecutor.execute(dataStream, sourceJobTopologyNode);
-            for (String nodeName : sourceJobTopologyNode.getDownstream()) {
+        for (JobTopologyNode sourceNode : sourceNodes) {
+            dataStream = sourceExecutor.execute(dataStream, sourceNode);
+            for (String nodeName : sourceNode.getDownstream()) {
                 buildJobGraph(dataStream, nodeName);
             }
         }
@@ -235,67 +240,73 @@ public class JobExecution {
     }
 
-    private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
-        JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> {
-            throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
-        });
-        if (jobTopologyNode.getType().name().equals(JobStage.FILTER.name())) {
-            if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
-                dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
-                        .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode);
-            } else {
-                dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
-            }
-        } else if (jobTopologyNode.getType().name().equals(JobStage.SPLIT.name())) {
-            if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) {
-                for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) {
-                    nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i));
-                }
-            }
-            else {
-                throw new JobExecuteException("split node downstream size not equal tags size");
-            }
-            dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
-        } else if (jobTopologyNode.getType().name().equals(JobStage.PREPROCESSING.name())) {
-            if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
-                dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
-                }), jobTopologyNode);
-            } else {
-                dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
-            }
-        } else if (jobTopologyNode.getType().name().equals(JobStage.PROCESSING.name())) {
-            if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
-                dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
-                }), jobTopologyNode);
-            } else {
-                dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
-            }
-        } else if (jobTopologyNode.getType().name().equals(JobStage.POSTPROCESSING.name())) {
-            if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
-                dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
-                }), jobTopologyNode);
-            } else {
-                dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
-            }
-        } else if (jobTopologyNode.getType().name().equals(JobStage.SINK.name())) {
-            if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
-                dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
-                }), jobTopologyNode);
-            } else {
-                dataStream = sinkExecutor.execute(dataStream, jobTopologyNode);
-            }
+    private void buildJobGraph(DataStream<Event> dataStream, String nodeName) {
+        JobTopologyNode currentNode = getNode(nodeName).orElseThrow(() ->
+                new JobExecuteException("Node not found: " + nodeName));
+        switch (currentNode.getStage()) {
+            case SPLIT:
+            case FILTER:
+            case PREPROCESSING:
+            case PROCESSING:
+            case POSTPROCESSING:
+                dataStream = executeProcessorNode(dataStream, currentNode);
+                break;
+            case SINK:
+                dataStream = executeSinkNode(dataStream, currentNode);
+                break;
+            default:
+                throw new JobExecuteException("Unsupported Job stage: " + currentNode.getStage());
+        }
+        // Recursively build job graph for downstream nodes
+        for (String downstreamNodeName : currentNode.getDownstream()) {
+            buildJobGraph(dataStream, downstreamNodeName);
+        }
+    }
+
+    private DataStream<Event> executeProcessorNode(DataStream<Event> dataStream, JobTopologyNode currentNode) {
+        JobTopologyNode inputNode = getInputNode(dataStream);
+        if (isSplitAndHasTag(inputNode, currentNode)) {
+            return executeWithSideOutput(dataStream, inputNode, currentNode, processorExecutor);
         } else {
-            throw new JobExecuteException("unsupported process type " + jobTopologyNode.getType().name());
+            return processorExecutor.execute(dataStream, currentNode);
         }
+    }
 
+    private DataStream<Event> executeSinkNode(DataStream<Event> dataStream, JobTopologyNode currentNode) {
+        JobTopologyNode inputNode = getInputNode(dataStream);
-        for (String nodeName : jobTopologyNode.getDownstream()) {
-            buildJobGraph(dataStream, nodeName);
+        if (isSplitAndHasTag(inputNode, currentNode)) {
+            return executeWithSideOutput(dataStream, inputNode, currentNode, sinkExecutor);
+        } else {
+            return sinkExecutor.execute(dataStream, currentNode);
         }
+    }
+
+    // Helper method to get the input node based on the current data stream
+    private JobTopologyNode getInputNode(DataStream<Event> dataStream) {
+        String inputName = dataStream.getTransformation().getName();
+        return getNode(inputName).orElseThrow(() -> new JobExecuteException("Node not found: " + inputName));
+    }
+
+    // Helper method to check if input node is SPLIT and has a tag for downstream
+    private boolean isSplitAndHasTag(JobTopologyNode inputNode, JobTopologyNode currentNode) {
+        return inputNode.getStage().equals(JobStage.SPLIT)
+                && inputNode.getTagForDownstream(currentNode.getName()) != null;
+    }
+
+    // Helper method to execute with side output
+    private DataStream<Event> executeWithSideOutput(DataStream<Event> dataStream, JobTopologyNode inputNode,
+                                                    JobTopologyNode currentNode, Executor<DataStream<Event>> executor) {
+        OutputTag<Event> outputTag = new OutputTag<Event>(inputNode.getTagForDownstream(currentNode.getName())) {};
+        SingleOutputStreamOperator<Event> singleOutputStream = (SingleOutputStreamOperator<Event>) dataStream;
+        DataStream<Event> sideOutput = singleOutputStream.getSideOutput(outputTag);
+        return executor.execute(sideOutput, currentNode);
     }
 
+    // Helper method to get the node based on the name
     private Optional<JobTopologyNode> getNode(String name) {
         return jobTopologyNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
index ab2aec3..27881a2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
@@ -1,6 +1,7 @@
 package com.geedgenetworks.bootstrap.execution;
 
 import com.geedgenetworks.bootstrap.enums.JobStage;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -8,7 +9,9 @@ import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Represents an operator node in the execution graph.
@@ -19,9 +22,20 @@ import java.util.List;
 @EqualsAndHashCode
 public class JobTopologyNode implements Serializable {
     private String name;
-    private JobStage type;
+    private JobStage stage;
     private int parallelism;
     private List<String> downstream = Collections.emptyList();
     private List<String> tags = Collections.emptyList();
+
+    public String getTagForDownstream(String downstreamNode) {
+        int index = downstream.indexOf(downstreamNode);
+        if (index < 0) {
+            return null;
+        }
+        return tags.get(index);
+    }
 }
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
index b67a842..7629d48 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
@@ -6,9 +6,11 @@ public final class Constants {
     public static final String SOURCES = "sources";
     public static final String FILTERS = "filters";
     public static final String SPLITS = "splits";
+    @Deprecated
     public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines";
-    public static final String PROCESSING_PIPELINES = "processing_pipelines";
+    @Deprecated
     public static final String POSTPROCESSING_PIPELINES = "postprocessing_pipelines";
+    public static final String PROCESSING_PIPELINES = "processing_pipelines";
     public static final String SINKS = "sinks";
     public static final int SYSTEM_EXIT_CODE = 2618;
     public static final String APPLICATION = "application";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
index 842d1bf..8807123 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
@@ -72,7 +72,7 @@ public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
             grootStreamRoot = yamlRootNode;
         }
 
-        YamlDomChecker.check(grootStreamRoot);
+        YamlDomChecker.check(grootStreamRoot, null);
 
         Node w3cRootNode = asW3cNode(grootStreamRoot);
         replaceVariables(w3cRootNode);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index 00d7d9c..139c715 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.api.common.udf.AggregateFunction;
 import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
 import com.geedgenetworks.api.metrics.InternalMetrics;
 import com.geedgenetworks.api.event.Event;
 import com.geedgenetworks.api.expressions.AviatorExecutor;
@@ -41,7 +41,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
     protected final Map<Long, Map<String, Accumulator>> windows = new HashMap<>();
     protected List<String> groupByFields;
-    private LinkedList<UdfEntity> functions;
+    private LinkedList<UDFEntity> functions;
     protected InternalMetrics internalMetrics;
     private final AggregateConfig aggregateConfig;
@@ -93,7 +93,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
             }
         }
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
         }
@@ -137,7 +137,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
         Accumulator accumulator = new Accumulator();
         accumulator.setMetricsFields(keysMap);
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             udfEntity.getAggregateFunction().initAccumulator(accumulator);
         }
         return accumulator;
@@ -164,7 +164,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
     public Accumulator add(Event event, Accumulator accumulator) {
         accumulator.setInEvents(accumulator.getInEvents() + 1);
-        for (UdfEntity udafEntity : functions) {
+        for (UDFEntity udafEntity : functions) {
             try {
                 boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
                 if (result) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index ea05c1c..a7dff22 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -8,7 +8,7 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.api.common.udf.AggregateFunction;
 import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
 import com.geedgenetworks.api.event.Event;
 import com.google.common.collect.Lists;
 import com.googlecode.aviator.AviatorEvaluator;
@@ -31,7 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
 public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
     private final List<UDFContext> udfContexts;
     private final List<String> udfClassNameLists;
-    private final LinkedList<UdfEntity> functions;
+    private final LinkedList<UDFEntity> functions;
 
     public AggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
         udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -64,7 +64,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
             }
         }
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
         }
     } catch (Exception e) {
@@ -77,7 +77,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
         Map<String, Object> map = new HashMap<>();
         Accumulator accumulator = new Accumulator();
         accumulator.setMetricsFields(map);
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             udfEntity.getAggregateFunction().initAccumulator(accumulator);
         }
         return accumulator;
@@ -86,7 +86,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
     public Accumulator add(Event event, Accumulator accumulator) {
         accumulator.setInEvents(accumulator.getInEvents()+1);
-        for (UdfEntity udafEntity : functions) {
+        for (UDFEntity udafEntity : functions) {
             try {
                 boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
                 if (result) {
@@ -105,7 +105,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
     @Override
     public Accumulator getResult(Accumulator accumulator) {
-        for (UdfEntity udafEntity : functions) {
+        for (UDFEntity udafEntity : functions) {
             try {
                 udafEntity.getAggregateFunction().getResult(accumulator);
             } catch (Exception e) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index db5336a..d7946bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.common.config.Constants;
 import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
 import com.geedgenetworks.api.common.udf.AggregateFunction;
 import com.google.common.collect.Lists;
 import com.googlecode.aviator.AviatorEvaluator;
@@ -29,7 +29,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
 public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> {
     private final List<UDFContext> udfContexts;
     private final List<String> udfClassNameLists;
-    private final LinkedList<UdfEntity> functions;
+    private final LinkedList<UDFEntity> functions;
 
     public SecondAggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
         udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -42,7 +42,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
         try {
             for (UDFContext udfContext : udfContexts) {
                 Expression filterExpression = null;
-                UdfEntity udfEntity = new UdfEntity();
+                UDFEntity udfEntity = new UDFEntity();
                 // Instantiate the function when the platform-registered functions include the one configured in the job
                 if (udfClassReflect.containsKey(udfContext.getFunction())) {
                     Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
@@ -59,7 +59,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
                 }
             }
-            for (UdfEntity udfEntity : functions) {
+            for (UDFEntity udfEntity : functions) {
                 udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
             }
         } catch (Exception e) {
@@ -73,7 +73,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
         Map<String, Object> map = new HashMap<>();
         Accumulator accumulator = new Accumulator();
         accumulator.setMetricsFields(map);
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             udfEntity.getAggregateFunction().initAccumulator(accumulator);
         }
         return accumulator;
@@ -86,7 +86,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
     @Override
     public Accumulator getResult(Accumulator accumulator) {
-        for (UdfEntity udafEntity : functions) {
+        for (UDFEntity udafEntity : functions) {
             try {
                 udafEntity.getAggregateFunction().getResult(accumulator);
             } catch (Exception e) {
@@ -99,7 +99,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
     @Override
     public Accumulator merge(Accumulator acc1, Accumulator acc2) {
         acc1.setInEvents(acc1.getInEvents() + 1);
-        for (UdfEntity udafEntity : functions) {
+        for (UDFEntity udafEntity : functions) {
             try {
                 udafEntity.getAggregateFunction().merge(acc1, acc2);
             } catch (ExpressionRuntimeException ignore) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index e079790..24e77ae 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -11,7 +11,7 @@ import com.geedgenetworks.common.utils.ColumnUtil;
 import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler;
 import com.geedgenetworks.api.common.udf.ScalarFunction;
 import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
 import com.geedgenetworks.api.metrics.InternalMetrics;
 import com.geedgenetworks.api.event.Event;
 import com.google.common.collect.Lists;
@@ -34,7 +34,7 @@ import java.util.Map;
 @Slf4j
 public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
     private final ProjectionConfig projectionConfig;
-    private LinkedList<UdfEntity> functions;
+    private LinkedList<UDFEntity> functions;
     private transient InternalMetrics internalMetrics;
@@ -88,7 +88,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
         internalMetrics.incrementInEvents();
         int errorCount = 0;
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             try {
                 if (event.isDropped()) {
@@ -135,7 +135,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
     @Override
     public void close() throws Exception {
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             udfEntity.getScalarFunction().close();
         }
         KnowledgeBaseScheduler.decrement();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
index d9232d7..bfd3d10 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
@@ -52,7 +52,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
                     ctx.output(route.getOutputTag(), event);
                 }
             }
-        }catch (Exception e) {
+        } catch (Exception e) {
             internalMetrics.incrementErrorEvents();
             log.error("error in split function", e);
         }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index a054fcd..038a252 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -8,7 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.common.utils.ColumnUtil;
 import com.geedgenetworks.api.common.udf.TableFunction;
 import com.geedgenetworks.api.common.udf.UDFContext;
-import com.geedgenetworks.api.common.udf.UdfEntity;
+import com.geedgenetworks.api.common.udf.UDFEntity;
 import com.geedgenetworks.api.metrics.InternalMetrics;
 import com.geedgenetworks.api.event.Event;
 import com.google.common.collect.Lists;
@@ -28,7 +28,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
 import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
 
 @Slf4j
 public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
-    private LinkedList<UdfEntity> functions;
+    private LinkedList<UDFEntity> functions;
     private final TableConfig tableConfig;
     private transient InternalMetrics internalMetrics;
@@ -50,7 +50,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
         Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
         for (UDFContext udfContext : udfContexts) {
             Expression filterExpression = null;
-            UdfEntity udfEntity = new UdfEntity();
+            UDFEntity udfEntity = new UDFEntity();
             // Instantiate the function when the platform-registered functions include the one configured in the job
             if (udfClassReflect.containsKey(udfContext.getFunction())) {
                 Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
@@ -82,7 +82,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
         int errorCount = 0;
         List<Event> events = new ArrayList<>();
         events.add(event);
-        for (UdfEntity udfEntity : functions) {
+        for (UDFEntity udfEntity : functions) {
             List<Event> newEvents = new ArrayList<>();
             for(int i=0;i<events.size();i++) {
                 try {
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 9b58289..9f86144 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
 public class GrootStreamExample {
 
     public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
-        String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+        String configPath = args.length > 0 ? args[0] : "/examples/grootstream_job_split_test.yaml";
         String configFile = getTestConfigFile(configPath);
         ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
         executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
index 9bb2900..3477b00 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
@@ -10,21 +10,28 @@ sources:
       json.ignore.parse.errors: false
 
 sinks:
   collect_sink:
-    type: collect
+    type: print
     properties:
       format: json
 
 splits:
-  test_split:
+  test_split_a:
     type: split
     rules:
-      - name: table_processor
+      - tag: http_a_tag
         expression: event.decoded_as == 'HTTP'
-      - name: pre_etl_processor
+      - tag: dns_a_tag
+        expression: event.decoded_as == 'DNS'
+  test_split_b:
+    type: split
+    rules:
+      - tag: base_b_tag
+        expression: event.decoded_as == 'BASE'
+      - tag: dns_b_tag
         expression: event.decoded_as == 'DNS'
 
 postprocessing_pipelines:
   pre_etl_processor: # [object] Processing Pipeline
-    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    type: projection
     remove_fields: [fields,tags]
     output_fields:
     functions: # [array of object] Function List
@@ -45,7 +52,7 @@ postprocessing_pipelines:
 #        parent_fields: [tags]
 #        rename_fields:
 #          tags: tags
-        rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+        rename_expression: key=string.replace_all(key,'tags.','');key=string.replace_all(key,'fields.','');return key;
       - function: UNIX_TIMESTAMP_CONVERTER
@@ -81,16 +88,22 @@ application: # [object] Application Configuration
   topology: # [array of object] Node List. It will be used build data flow for job dag graph.
     - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
       parallelism: 1 # [number] Operator-Level Parallelism.
-      downstream: [test_split,collect_sink]
-    - name: test_split
+      downstream: [test_split_a, test_split_b]
+    - name: test_split_a
       parallelism: 1
+      tags: [http_a_tag,dns_a_tag]
       downstream: [ table_processor,pre_etl_processor ]
+    - name: test_split_b
+      parallelism: 1
+      tags: [ base_b_tag]
+      downstream: [ collect_sink ]
     - name: pre_etl_processor
      parallelism: 1
       downstream: [ collect_sink ]
     - name: table_processor
       parallelism: 1
       downstream: [ collect_sink ]
+
     - name: collect_sink
       parallelism: 1
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index 65cd3cb..0c00060 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -38,12 +38,12 @@ filters:
     properties:
       expression: event.decoded_as == 'HTTP'
 
-preprocessing_pipelines:
+processing_pipelines:
+
   transform_processor:
     type: projection
-    remove_fields: [client_ip]
+    remove_fields: [ client_ip ]
 
-processing_pipelines:
   session_record_processor:
     type: projection
     remove_fields: [device_tag]
diff --git a/pom.xml b/pom.xml
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
         <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
         <flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
         <maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version>
-        <testcontainer.version>1.20.1</testcontainer.version>
+        <testcontainer.version>1.20.3</testcontainer.version>
         <awaitility.version>4.2.0</awaitility.version>
         <spotless.version>2.40.0</spotless.version>
         <slf4j.version>1.7.25</slf4j.version>
@@ -65,7 +65,7 @@
         <guava-retrying.version>2.0.0</guava-retrying.version>
         <ipaddress.version>5.3.3</ipaddress.version>
         <aviator.version>5.4.1</aviator.version>
-        <snakeyaml.version>1.29</snakeyaml.version>
+        <snakeyaml.version>2.0</snakeyaml.version>
         <nacos.version>2.0.4</nacos.version>
         <antlr4.version>4.8</antlr4.version>
         <jcommander.version>1.81</jcommander.version>
@@ -73,7 +73,7 @@
         <snappy-java.version>1.1.10.4</snappy-java.version>
         <lombok.version>1.18.24</lombok.version>
         <config.version>1.3.3</config.version>
-        <hazelcast.version>5.1</hazelcast.version>
+        <hazelcast.version>5.3.5</hazelcast.version>
         <quartz.version>2.3.2</quartz.version>
         <hadoop.version>2.7.1</hadoop.version>
         <!--Option config -->
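For reference, the getTagForDownstream helper added to JobTopologyNode pairs downstream and tags by index, which is why JobExecution now uses checkArgument to require the two lists to be the same size for every split node. A small hypothetical driver (plain lists stand in for the node's fields) showing the intended contract:

```java
// Illustrative only: mirrors the logic of the merged
// JobTopologyNode.getTagForDownstream without depending on the repo.
import java.util.Arrays;
import java.util.List;

public class TagLookupSketch {
    static String getTagForDownstream(List<String> downstream, List<String> tags, String node) {
        int index = downstream.indexOf(node); // same index-pairing as the merged helper
        if (index < 0) {
            return null;                      // node is not downstream of this split
        }
        return tags.get(index);               // safe only because sizes are checked up front
    }

    public static void main(String[] args) {
        // Values taken from test_split_a in grootstream_job_split_test.yaml.
        List<String> downstream = Arrays.asList("table_processor", "pre_etl_processor");
        List<String> tags = Arrays.asList("http_a_tag", "dns_a_tag");

        System.out.println(getTagForDownstream(downstream, tags, "table_processor"));   // http_a_tag
        System.out.println(getTagForDownstream(downstream, tags, "pre_etl_processor")); // dns_a_tag
        System.out.println(getTagForDownstream(downstream, tags, "collect_sink"));      // null
    }
}
```

A null result is what isSplitAndHasTag treats as "consume the main output instead of a side output", so the lookup doubles as the routing predicate.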
