| author | wangkuan <[email protected]> | 2024-09-04 09:50:32 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-09-04 09:50:32 +0800 |
| commit | 8217c23fa564ad2051427a12f4eabf3fea75aafc (patch) | |
| tree | 761f6b2dbe28c6a4e6e1392465aecdc869cf1cb1 | |
| parent | e3efdcac80dc1ca8fb0bdd08f69318f745f9bf7c (diff) | |
[improve][bootstrap] Clean up the code: move the splitSet handling, and adapt the unit tests to the properties configuration change
4 files changed, 36 insertions, 26 deletions
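The core of the diff below is that the set of split rule names is now collected inside JobExecution while node types are resolved, instead of SplitExecutor pushing names into JobRuntimeEnvironment through getSplitSet()/setSplitSet(). The following is a minimal, self-contained sketch of that collection step only: SplitConfig and RuleContext here are simplified stand-ins for the project classes, the class name SplitSetSketch and the sample keys/values ("type", "rules", "name", "alpha", "beta") are illustrative, and only the fastjson call mirrors the one used in the commit.

```java
import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-ins for the project's RuleContext and SplitConfig classes.
class RuleContext {
    private String name;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

class SplitConfig {
    private List<RuleContext> rules = new ArrayList<>();
    public List<RuleContext> getRules() { return rules; }
    public void setRules(List<RuleContext> rules) { this.rules = rules; }
}

public class SplitSetSketch {

    // Mirrors the new collection step: each splits entry (an unwrapped config map)
    // is mapped onto SplitConfig with fastjson, and every rule name is recorded.
    static Set<String> collectSplitSet(Map<String, Object> splits) {
        Set<String> splitSet = new HashSet<>();
        splits.forEach((key, value) -> {
            @SuppressWarnings("unchecked")
            SplitConfig splitConfig =
                    new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
            for (RuleContext ruleContext : splitConfig.getRules()) {
                splitSet.add(ruleContext.getName());
            }
        });
        return splitSet;
    }

    public static void main(String[] args) {
        // Hypothetical splits entry with two rules named "alpha" and "beta".
        Map<String, Object> ruleA = new HashMap<>();
        ruleA.put("name", "alpha");
        Map<String, Object> ruleB = new HashMap<>();
        ruleB.put("name", "beta");

        Map<String, Object> splitEntry = new HashMap<>();
        splitEntry.put("type", "rule_based");
        splitEntry.put("rules", Arrays.asList(ruleA, ruleB));

        Map<String, Object> splits = new HashMap<>();
        splits.put("my_split", splitEntry);

        System.out.println(collectSplitSet(splits)); // prints [alpha, beta] (order may vary)
    }
}
```

Keeping the set as a field of JobExecution removes the mutable shared state that previously lived on JobRuntimeEnvironment, which is also why the getter/setter pair is deleted in the second file of the diff.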
```diff
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 6a23a0b..f6e19eb 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -6,7 +6,14 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
 import com.geedgenetworks.common.Constants;
 import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
 import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.config.SplitConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
@@ -38,6 +45,7 @@ public class JobExecution {
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
     private final List<Node> nodes;
     private final List<URL> jarPaths;
+    private final Set<String> splitSet = new HashSet<>();
 
     public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
         try {
@@ -201,6 +209,12 @@ public class JobExecution {
         } else if (filters.containsKey(node.getName())) {
             node.setType(ProcessorType.FILTER);
         } else if (splits.containsKey(node.getName())) {
+            splits.forEach((key, value) -> {
+                SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+                for(RuleContext ruleContext:splitConfig.getRules()) {
+                    splitSet.add(ruleContext.getName());
+                }
+            });
             node.setType(ProcessorType.SPLIT);
         } else if (preprocessingPipelines.containsKey(node.getName())) {
             node.setType(ProcessorType.PREPROCESSING);
@@ -254,7 +268,7 @@ public class JobExecution {
                 throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
             });
             if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
@@ -264,28 +278,28 @@
                 dataStream = splitExecutor.execute(dataStream, node);
             } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = preprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = processingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = postprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 4c6c2d2..a4289ff 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -33,7 +33,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
     private GrootStreamConfig grootStreamConfig;
     private StreamExecutionEnvironment environment;
     private String jobName = Constants.DEFAULT_JOB_NAME;
-    private Set<String> splitSet = new HashSet<>();
 
     private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) {
         this.grootStreamConfig = grootStreamConfig;
@@ -200,14 +199,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
         }
     }
 
-    public Set<String> getSplitSet() {
-        return splitSet;
-    }
-
-    public void setSplitSet(Set<String> splitSet) {
-        this.splitSet = splitSet;
-    }
-
     private void setCheckpoint() {
         long interval = 0;
         if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index 3513a67..e549087 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -37,9 +37,9 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
     protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
         Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
         if (operatorConfig.hasPath(Constants.SPLITS)) {
-            Config routes = operatorConfig.getConfig(Constants.SPLITS);
-            routes.root().unwrapped().forEach((key, value) -> {
-                CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
+            Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
+            splitsConfig.root().unwrapped().forEach((key, value) -> {
+                CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key),
                         SplitConfigOptions.TYPE.key());
                 if (!result.isSuccess()) {
                     throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
@@ -75,9 +75,6 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
         if (node.getParallelism() > 0) {
             splitConfig.setParallelism(node.getParallelism());
         }
-        for(RuleContext ruleContext:splitConfig.getRules()) {
-            jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName());
-        }
         try {
             dataStream =
                     split.splitFunction(
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 7e995cf..7b9544a 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -7,8 +7,10 @@ import com.geedgenetworks.bootstrap.execution.*;
 import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
 import com.geedgenetworks.common.Constants;
 import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.udf.RuleContext;
 import com.geedgenetworks.common.utils.ReflectionUtils;
 import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
@@ -42,7 +44,7 @@ public class JobExecutionTest {
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
-
+    private final Set<String> splitSet = new HashSet<>();
     private final List<Node> nodes;
 
     private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
@@ -219,6 +221,12 @@ public class JobExecutionTest {
         } else if (sinks.containsKey(node.getName())) {
             node.setType(ProcessorType.SINK);
         } else if (splits.containsKey(node.getName())) {
+            splits.forEach((key, value) -> {
+                SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+                for(RuleContext ruleContext:splitConfig.getRules()) {
+                    splitSet.add(ruleContext.getName());
+                }
+            });
             node.setType(ProcessorType.SPLIT);
         } else if (filters.containsKey(node.getName())) {
             node.setType(ProcessorType.FILTER);
@@ -262,7 +270,7 @@
                 throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
             });
             if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
@@ -272,28 +280,28 @@
                 dataStream = splitExecutor.execute(dataStream, node);
             } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = preprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = processingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = postprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
```
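For readers unfamiliar with how splitSet is consumed: each executor branch in the diff checks whether the downstream node's name is in splitSet, and if so reads the split operator's side output tagged with that name via getSideOutput(new OutputTag<Event>(node.getName()) {}) instead of the main stream. The standalone Flink sketch below shows that side-output pattern in isolation; it is not the project's executor code, and the class name SideOutputRoutingSketch, the tag name "my_rule", and the stream contents are made up for illustration.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRoutingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical rule name; in the job it would be a RuleContext name held in splitSet.
        final OutputTag<String> ruleTag = new OutputTag<String>("my_rule") {};

        SingleOutputStreamOperator<String> split = env
                .fromElements("keep-a", "route-b", "keep-c")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("route")) {
                            // Events matching the rule go to the side output named after it.
                            ctx.output(ruleTag, value);
                        } else {
                            out.collect(value);
                        }
                    }
                });

        // A downstream node whose name is in splitSet reads the tagged side output
        // rather than the main stream, which is what the executor branches do.
        DataStream<String> routed = split.getSideOutput(ruleTag);
        routed.print();

        env.execute("side-output-routing-sketch");
    }
}
```

The anonymous OutputTag subclass is the usual Flink idiom for keeping the element type reified at runtime, which is also why the diff creates the tags with a trailing `{}`.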
