| author | 王宽 <[email protected]> | 2024-09-04 01:56:54 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-09-04 01:56:54 +0000 |
| commit | e4047ae73ed837fcc931fd2dee45c7fca9741df4 (patch) | |
| tree | 657f4eeb37c5dafb3f680c1bf265e730eb11e9c8 /groot-bootstrap | |
| parent | e3efdcac80dc1ca8fb0bdd08f69318f745f9bf7c (diff) | |
| parent | 063af747e7ccedd2c6c0688782766616db3f36b0 (diff) | |
Merge branch 'improve/split' into 'develop' (v1.6.0-1-SNAPSHOT)
Improve/split
See merge request galaxy/platform/groot-stream!103
Diffstat (limited to 'groot-bootstrap')
5 files changed, 39 insertions, 28 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 6a23a0b..f6e19eb 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -6,7 +6,14 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
 import com.geedgenetworks.common.Constants;
 import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
 import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.config.SplitConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
@@ -38,6 +45,7 @@ public class JobExecution {
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
     private final List<Node> nodes;
     private final List<URL> jarPaths;
+    private final Set<String> splitSet = new HashSet<>();
 
     public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
         try {
@@ -201,6 +209,12 @@ public class JobExecution {
             } else if (filters.containsKey(node.getName())) {
                 node.setType(ProcessorType.FILTER);
             } else if (splits.containsKey(node.getName())) {
+                splits.forEach((key, value) -> {
+                    SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+                    for(RuleContext ruleContext:splitConfig.getRules()) {
+                        splitSet.add(ruleContext.getName());
+                    }
+                });
                 node.setType(ProcessorType.SPLIT);
             } else if (preprocessingPipelines.containsKey(node.getName())) {
                 node.setType(ProcessorType.PREPROCESSING);
@@ -254,7 +268,7 @@ public class JobExecution {
                 throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
             });
             if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
@@ -264,28 +278,28 @@ public class JobExecution {
                 dataStream = splitExecutor.execute(dataStream, node);
             } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = preprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = processingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = postprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 4c6c2d2..a4289ff 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -33,7 +33,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
     private GrootStreamConfig grootStreamConfig;
     private StreamExecutionEnvironment environment;
     private String jobName = Constants.DEFAULT_JOB_NAME;
-    private Set<String> splitSet = new HashSet<>();
 
     private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) {
         this.grootStreamConfig = grootStreamConfig;
@@ -200,14 +199,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
         }
     }
 
-    public Set<String> getSplitSet() {
-        return splitSet;
-    }
-
-    public void setSplitSet(Set<String> splitSet) {
-        this.splitSet = splitSet;
-    }
-
     private void setCheckpoint() {
         long interval = 0;
         if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index 3513a67..e549087 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -37,9 +37,9 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
     protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
         Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
         if (operatorConfig.hasPath(Constants.SPLITS)) {
-            Config routes = operatorConfig.getConfig(Constants.SPLITS);
-            routes.root().unwrapped().forEach((key, value) -> {
-                CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
+            Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
+            splitsConfig.root().unwrapped().forEach((key, value) -> {
+                CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key),
                         SplitConfigOptions.TYPE.key());
                 if (!result.isSuccess()) {
                     throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
@@ -75,9 +75,6 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
         if (node.getParallelism() > 0) {
             splitConfig.setParallelism(node.getParallelism());
         }
-        for(RuleContext ruleContext:splitConfig.getRules()) {
-            jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName());
-        }
         try {
             dataStream =
                     split.splitFunction(
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 7e995cf..7b9544a 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -7,8 +7,10 @@ import com.geedgenetworks.bootstrap.execution.*;
 import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
 import com.geedgenetworks.common.Constants;
 import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.udf.RuleContext;
 import com.geedgenetworks.common.utils.ReflectionUtils;
 import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
@@ -42,7 +44,7 @@ public class JobExecutionTest {
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
-
+    private final Set<String> splitSet = new HashSet<>();
     private final List<Node> nodes;
 
     private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
@@ -219,6 +221,12 @@ public class JobExecutionTest {
             } else if (sinks.containsKey(node.getName())) {
                 node.setType(ProcessorType.SINK);
             } else if (splits.containsKey(node.getName())) {
+                splits.forEach((key, value) -> {
+                    SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+                    for(RuleContext ruleContext:splitConfig.getRules()) {
+                        splitSet.add(ruleContext.getName());
+                    }
+                });
                 node.setType(ProcessorType.SPLIT);
             } else if (filters.containsKey(node.getName())) {
                 node.setType(ProcessorType.FILTER);
@@ -262,7 +270,7 @@ public class JobExecutionTest {
                 throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
             });
             if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
@@ -272,28 +280,28 @@ public class JobExecutionTest {
                 dataStream = splitExecutor.execute(dataStream, node);
             } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = preprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = processingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
                     dataStream = postprocessingExecutor.execute(dataStream, node);
                 }
             } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-                if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                if (splitSet.contains(node.getName())) {
                     dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
                     }), node);
                 } else {
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 76dcf7f..9724e21 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -191,6 +191,8 @@ sinks:
 application: # [object] Application Configuration
   env: # [object] Environment Variables
     name: groot-stream-job # [string] Job Name
+    properties:
+      hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
   pipeline:
     object-reuse: true # [boolean] Object Reuse, default is false
   topology: # [array of object] Node List. It will be used build data flow for job dag graph.
@@ -205,6 +207,5 @@ application: # [object] Application Configuration
       downstream: [collect_sink]
     - name: collect_sink
       parallelism: 1
-      properties:
-        hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
+
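For orientation, here is a minimal, self-contained sketch of the behaviour this merge moves into `JobExecution`. It is not the project's real code: `SplitConfig` and `RuleContext` below are simplified stand-ins for `com.geedgenetworks.core.pojo.SplitConfig` and `com.geedgenetworks.common.udf.RuleContext`, and the rule and node names are invented. The idea it illustrates is taken from the diff: the rule names declared by a split operator are collected into a local `splitSet` while the topology is being typed, and a downstream node whose name matches a rule is later read from the corresponding side output (an `OutputTag<Event>` keyed by the node name in the real code) instead of the main stream.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SplitSetSketch {

    // Hypothetical stand-in: a split rule carrying a name (read via RuleContext#getName() in the diff).
    public static class RuleContext {
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Hypothetical stand-in: a split operator config with a list of rules (SplitConfig#getRules()).
    public static class SplitConfig {
        private List<RuleContext> rules = new ArrayList<>();
        public List<RuleContext> getRules() { return rules; }
        public void setRules(List<RuleContext> rules) { this.rules = rules; }
    }

    public static void main(String[] args) {
        // Pretend this came from the job's `splits` section: one split with two invented rules.
        SplitConfig splitConfig = new SplitConfig();
        for (String ruleName : new String[]{"policy_capture_branch", "default_branch"}) {
            RuleContext rule = new RuleContext();
            rule.setName(ruleName);
            splitConfig.getRules().add(rule);
        }

        // Mirrors the new JobExecution logic: collect every rule name up front,
        // instead of having SplitExecutor populate a set on JobRuntimeEnvironment later.
        Set<String> splitSet = new HashSet<>();
        for (RuleContext rule : splitConfig.getRules()) {
            splitSet.add(rule.getName());
        }

        // Downstream wiring: a node whose name matches a split rule is fed from the
        // matching side output; any other node keeps consuming the main stream.
        for (String nodeName : new String[]{"policy_capture_branch", "collect_sink"}) {
            boolean fromSideOutput = splitSet.contains(nodeName);
            System.out.println(nodeName + " -> " + (fromSideOutput ? "side output" : "main stream"));
        }
    }
}
```

Collecting the names while the nodes are typed, rather than as a side effect of `SplitExecutor` running, appears to be the point of removing the `getSplitSet`/`setSplitSet` pair from `JobRuntimeEnvironment`: the side-output decision no longer depends on the order in which executors are invoked.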

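The `SplitExecutor.initialize` hunk is mostly a rename (`routes` to `splitsConfig`), but it also shows the validation contract for the `splits` section: every entry must declare a `type`. The following rough sketch imitates that check with plain Typesafe Config; the project's `CheckConfigUtil`, `SplitConfigOptions`, and `ConfigValidationException` are not reproduced here, and the HOCON fragment and split names are invented for illustration.

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SplitsConfigCheckSketch {

    public static void main(String[] args) {
        // Invented job fragment: one well-formed split entry and one that is missing `type`.
        Config operatorConfig = ConfigFactory.parseString(
                "splits {\n"
                        + "  traffic_split { type = rule, rules = [ { name = policy_capture_branch } ] }\n"
                        + "  broken_split { rules = [ { name = orphan_branch } ] }\n"
                        + "}");

        if (operatorConfig.hasPath("splits")) {
            Config splitsConfig = operatorConfig.getConfig("splits");
            // Same iteration shape as the renamed splitsConfig variable in SplitExecutor.initialize.
            splitsConfig.root().unwrapped().forEach((key, value) -> {
                if (splitsConfig.getConfig(key).hasPath("type")) {
                    System.out.println(key + ": ok");
                } else {
                    // The real code raises ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, ...).
                    System.out.println(key + ": missing required 'type' option");
                }
            });
        }
    }
}
```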