From 8217c23fa564ad2051427a12f4eabf3fea75aafc Mon Sep 17 00:00:00 2001 From: wangkuan Date: Wed, 4 Sep 2024 09:50:32 +0800 Subject: [improve][bootstrap]优化代码,修改splitsset位置,单元测试适配properties配置变更 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bootstrap/execution/JobExecution.java | 24 +++++++++++++++++----- .../bootstrap/execution/JobRuntimeEnvironment.java | 9 -------- .../bootstrap/execution/SplitExecutor.java | 9 +++----- .../bootstrap/main/simple/JobExecutionTest.java | 20 ++++++++++++------ 4 files changed, 36 insertions(+), 26 deletions(-) (limited to 'groot-bootstrap') diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 6a23a0b..f6e19eb 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -6,7 +6,14 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.GrootStreamConfig; +import com.geedgenetworks.common.config.SplitConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -38,6 +45,7 @@ public class JobExecution { private final Executor, JobRuntimeEnvironment> postprocessingExecutor; private final List nodes; private final List jarPaths; + private final Set splitSet = new HashSet<>(); public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { try { @@ -201,6 +209,12 @@ public class JobExecution { } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); } else if (splits.containsKey(node.getName())) { + splits.forEach((key, value) -> { + SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map) value).toJavaObject(SplitConfig.class); + for(RuleContext ruleContext:splitConfig.getRules()) { + splitSet.add(ruleContext.getName()); + } + }); node.setType(ProcessorType.SPLIT); } else if (preprocessingPipelines.containsKey(node.getName())) { node.setType(ProcessorType.PREPROCESSING); @@ -254,7 +268,7 @@ public class JobExecution { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = filterExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { @@ -264,28 +278,28 @@ public class JobExecution { dataStream = splitExecutor.execute(dataStream, node); } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = preprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = processingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = processingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = postprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = sinkExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 4c6c2d2..a4289ff 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -33,7 +33,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private GrootStreamConfig grootStreamConfig; private StreamExecutionEnvironment environment; private String jobName = Constants.DEFAULT_JOB_NAME; - private Set splitSet = new HashSet<>(); private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ -200,14 +199,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } } - public Set getSplitSet() { - return splitSet; - } - - public void setSplitSet(Set splitSet) { - this.splitSet = splitSet; - } - private void setCheckpoint() { long interval = 0; if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java index 3513a67..e549087 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -37,9 +37,9 @@ public class SplitExecutor extends AbstractExecutor { protected Map initialize(List jarPaths, Config operatorConfig) { Map splitConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SPLITS)) { - Config routes = operatorConfig.getConfig(Constants.SPLITS); - routes.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key), + Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS); + splitsConfig.root().unwrapped().forEach((key, value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key), SplitConfigOptions.TYPE.key()); if (!result.isSuccess()) { throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( @@ -75,9 +75,6 @@ public class SplitExecutor extends AbstractExecutor { if (node.getParallelism() > 0) { splitConfig.setParallelism(node.getParallelism()); } - for(RuleContext ruleContext:splitConfig.getRules()) { - jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName()); - } try { dataStream = split.splitFunction( diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java index 7e995cf..7b9544a 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java @@ -7,8 +7,10 @@ import com.geedgenetworks.bootstrap.execution.*; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.GrootStreamConfig; +import com.geedgenetworks.common.udf.RuleContext; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -42,7 +44,7 @@ public class JobExecutionTest { private final Executor, JobRuntimeEnvironment> processingExecutor; private final Executor, JobRuntimeEnvironment> postprocessingExecutor; private final Executor, JobRuntimeEnvironment> sinkExecutor; - + private final Set splitSet = new HashSet<>(); private final List nodes; private BiConsumer ADD_URL_TO_CLASSLOADER = @@ -219,6 +221,12 @@ public class JobExecutionTest { } else if (sinks.containsKey(node.getName())) { node.setType(ProcessorType.SINK); } else if (splits.containsKey(node.getName())) { + splits.forEach((key, value) -> { + SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map) value).toJavaObject(SplitConfig.class); + for(RuleContext ruleContext:splitConfig.getRules()) { + splitSet.add(ruleContext.getName()); + } + }); node.setType(ProcessorType.SPLIT); } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); @@ -262,7 +270,7 @@ public class JobExecutionTest { throw new JobExecuteException("can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = filterExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { @@ -272,28 +280,28 @@ public class JobExecutionTest { dataStream = splitExecutor.execute(dataStream, node); } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = preprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = processingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = processingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = postprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = sinkExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { -- cgit v1.2.3 From 063af747e7ccedd2c6c0688782766616db3f36b0 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Wed, 4 Sep 2024 09:51:57 +0800 Subject: [improve][bootstrap]单元测试配置文件适配properties变更 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'groot-bootstrap') diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 76dcf7f..9724e21 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -191,6 +191,8 @@ sinks: application: # [object] Application Configuration env: # [object] Environment Variables name: groot-stream-job # [string] Job Name + properties: + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket pipeline: object-reuse: true # [boolean] Object Reuse, default is false topology: # [array of object] Node List. It will be used build data flow for job dag graph. @@ -205,6 +207,5 @@ application: # [object] Application Configuration downstream: [collect_sink] - name: collect_sink parallelism: 1 - properties: - hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + -- cgit v1.2.3