diff options
| author | 王宽 <[email protected]> | 2024-09-10 09:32:22 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-09-10 09:32:22 +0000 |
| commit | e9f8f7a435d805fa6d0ec8b06715f0f68be326ff (patch) | |
| tree | dab763cc768efac0df16fca860fa47a73b2eb534 | |
| parent | e4047ae73ed837fcc931fd2dee45c7fca9741df4 (diff) | |
| parent | fc1675f6f270270f7dc2331bcc9f50c77057eaa7 (diff) | |
Merge branch 'feature/split-tag' into 'develop'
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t...
See merge request galaxy/platform/groot-stream!104
15 files changed, 54 insertions, 465 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 37ef114..8f27609 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -15,9 +15,9 @@ splits: decoded_as_split: type: split rules: - - name: projection_processor + - tag: http_tag expression: event.decoded_as == 'HTTP' - - name: aggregate_processor + - tag: dns_tag expression: event.decoded_as == 'DNS' processing_pipelines: projection_processor: @@ -80,7 +80,8 @@ application: - name: inline_source downstream: [decoded_as_split] - name: decoded_as_split - downstream: [ projection_processor ,aggregate_processor] + tags: [http_tag, dns_tag] + downstream: [ projection_processor, aggregate_processor] - name: projection_processor downstream: [ print_sink ] - name: aggregate_processor diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java index 61ced82..5b36671 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java @@ -98,6 +98,7 @@ public class ExecuteCommandArgs extends CommandArgs { static { TARGET_TYPE_LIST.add(TargetType.LOCAL); + TARGET_TYPE_LIST.add(TargetType.TEST); TARGET_TYPE_LIST.add(TargetType.REMOTE); TARGET_TYPE_LIST.add(TargetType.YARN_SESSION); TARGET_TYPE_LIST.add(TargetType.YARN_PER_JOB); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java index abf1c1f..bdc70d0 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java @@ -4,6 +4,7 @@ public enum TargetType { LOCAL("local"), REMOTE("remote"), + TEST("test"), YARN_SESSION("yarn-session"), YARN_PER_JOB("yarn-per-job"), YARN_APPLICATION("yarn-application"); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index f6e19eb..706fc18 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -6,14 +6,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.common.config.SplitConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.common.udf.RuleContext; -import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -45,7 +38,7 @@ public class JobExecution { private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; private final List<Node> nodes; private final List<URL> jarPaths; - private final Set<String> splitSet = new HashSet<>(); + private final Map<String,String> nodeNameWithSplitTags = new HashMap<>(); public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { try { @@ -209,12 +202,6 @@ public class JobExecution { } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); } else if (splits.containsKey(node.getName())) { - splits.forEach((key, value) -> { - SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); - for(RuleContext ruleContext:splitConfig.getRules()) { - splitSet.add(ruleContext.getName()); - } - }); node.setType(ProcessorType.SPLIT); } else if (preprocessingPipelines.containsKey(node.getName())) { node.setType(ProcessorType.PREPROCESSING); @@ -233,7 +220,7 @@ public class JobExecution { public void execute() throws JobExecuteException { - if (!jobRuntimeEnvironment.isLocalMode()) { + if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) { jobRuntimeEnvironment.registerPlugin(jarPaths); } List<Node> sourceNodes = nodes @@ -268,39 +255,46 @@ public class JobExecution { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (splitSet.contains(node.getName())) { - dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = filterExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { + if (node.getTags().size() == node.getDownstream().size()) { + for (int i = 0; i < node.getDownstream().size();i++) { + nodeNameWithSplitTags.put(node.getDownstream().get(i),node.getTags().get(i)); + } + } + else { + throw new JobExecuteException("split node downstream size not equal tags size"); + } dataStream = splitExecutor.execute(dataStream, node); - } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){ }), node); } else { dataStream = preprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = processingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = postprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (splitSet.contains(node.getName())) { - dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = sinkExecutor.execute(dataStream, node); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index a4289ff..e23d446 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -93,7 +93,10 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget()); } - + public boolean isTestMode() { + return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) + && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.TEST.getTarget()); + } @Override public void registerPlugin(List<URL> pluginPaths) { pluginPaths.forEach(url -> log.info("Begin register plugins: {}", url)); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java index f86d106..66303c2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java @@ -19,5 +19,6 @@ public class Node implements Serializable { private ProcessorType type; private List<String> downstream = Collections.emptyList(); private int parallelism; + private List<String> tags = Collections.emptyList(); } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java deleted file mode 100644 index d7ed524..0000000 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.geedgenetworks.bootstrap.main.simple; - -import com.geedgenetworks.bootstrap.command.Command; -import com.geedgenetworks.bootstrap.command.CommandArgs; -import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; -import com.geedgenetworks.bootstrap.enums.EngineType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.bootstrap.utils.CommandLineUtils; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; - -@Slf4j -public class GrootStreamServerTest { - public static void main(String[] args) { - ExecuteCommandArgs bootstrapCommandArgs = CommandLineUtils - .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); - run(bootstrapCommandArgs.buildCommand()); - } - - public static <T extends CommandArgs> void run(Command<T> command) throws JobExecuteException { - try { - command.execute(); - } catch (ConfigCheckException e) { - outputConfigError(e); - throw e; - } catch (Exception e) { - outputFatalError(e); - throw e; - } - } - private static void outputConfigError(Throwable throwable) { - log.error( - "\n\n===============================================================================\n\n"); - String errorMsg = throwable.getMessage(); - log.error("Config Error:\n"); - log.error("Reason: {} \n", errorMsg); - log.error( - "\n===============================================================================\n\n\n"); - } - - - private static void outputFatalError(Throwable throwable) { - log.error("\\n\\n===============================================================================\\n\\n"); - String errorMsg = throwable.getMessage(); - log.error("Fatal Error ,Reason is :{} \n", errorMsg); - log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable)); - log.error("\\n\\n===============================================================================\\n\\n"); - } - - - - -} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java index 9fa81c0..6c69f64 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java @@ -3,8 +3,8 @@ package com.geedgenetworks.bootstrap.main.simple; import cn.hutool.setting.yaml.YamlUtil; import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; @@ -25,7 +25,7 @@ import java.nio.file.Path; import java.util.Map; -public class JobSplitWithAggTest { +public class JobAggTest { @ClassRule public static MiniClusterWithClientResource flinkCluster = @@ -39,7 +39,7 @@ public class JobSplitWithAggTest { public void testSplitForAgg() { CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; + String[] args ={"--target", "test", "-c", ".\\grootstream_job_agg_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -57,14 +57,8 @@ public class JobSplitWithAggTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); Assert.assertEquals(2, CollectSink.values.size()); Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java index 90ff95d..a6516d6 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java @@ -3,8 +3,8 @@ package com.geedgenetworks.bootstrap.main.simple; import cn.hutool.setting.yaml.YamlUtil; import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; @@ -22,14 +22,13 @@ import org.junit.ClassRule; import org.junit.Test; import java.nio.file.Path; -import java.util.List; import java.util.Map; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; -public class SimpleJobTest { +public class JobEtlTest { @ClassRule public static MiniClusterWithClientResource flinkCluster = @@ -42,7 +41,7 @@ public class SimpleJobTest { @Test public void testEtl() { - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"}; + String[] args ={"--target", "test", "-c", ".\\grootstream_job_etl_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -60,15 +59,8 @@ public class SimpleJobTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); Assert.assertEquals(1, CollectSink.values.size()); Assert.assertEquals("BASE", CollectSink.values.get(0).getExtractedFields().get("decoded_as").toString()); Assert.assertEquals("google.com", CollectSink.values.get(0).getExtractedFields().get("server_domain").toString()); @@ -86,7 +78,7 @@ public class SimpleJobTest { public void testTransmission() { CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_transmission_test.yaml"}; + String[] args ={"--target", "test", "-c", ".\\grootstream_job_transmission_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -104,15 +96,8 @@ public class SimpleJobTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); Assert.assertEquals(4, CollectSink.values.size()); } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java deleted file mode 100644 index 7b9544a..0000000 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java +++ /dev/null @@ -1,327 +0,0 @@ -package com.geedgenetworks.bootstrap.main.simple; - -import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.bootstrap.execution.*; -import com.geedgenetworks.bootstrap.main.GrootStreamRunner; -import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.common.udf.RuleContext; -import com.geedgenetworks.common.utils.ReflectionUtils; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.pojo.SplitConfig; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigUtil; -import com.typesafe.config.ConfigValueFactory; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.util.OutputTag; - -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Path; -import java.util.*; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -@Slf4j -@Data -public class JobExecutionTest { - - protected final JobRuntimeEnvironment jobRuntimeEnvironment; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor; - private final Set<String> splitSet = new HashSet<>(); - private final List<Node> nodes; - - private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = - (classLoader, url) -> { - if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) { - URLClassLoader c = - (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get(); - ReflectionUtils.invoke(c, "addURL", url); - } else if (classLoader instanceof URLClassLoader) { - ReflectionUtils.invoke(classLoader, "addURL", url); - } else { - throw new RuntimeException( - "Unsupported classloader: " + classLoader.getClass().getName()); - } - }; - private final List<URL> jarPaths; - public JobExecutionTest(Config config, GrootStreamConfig grootStreamConfig) { - try { - jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() - .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) - .toURI().toURL())); - } catch (MalformedURLException e) { - throw new JobExecuteException("load groot stream bootstrap jar error.", e); - } - registerPlugin(config.getConfig(Constants.APPLICATION)); - - this.sourceExecutor = new SourceExecutor(jarPaths, config); - this.sinkExecutor = new SinkExecutor(jarPaths, config); - this.splitExecutor = new SplitExecutor(jarPaths, config); - this.filterExecutor = new FilterExecutor(jarPaths, config); - this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); - this.processingExecutor = new ProcessingExecutor(jarPaths, config); - this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); - this.jobRuntimeEnvironment = - JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); - this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.nodes = buildJobNode(config); - - } - - private void registerPlugin(Config appConfig) { - List<Path> thirdPartyJars = new ArrayList<>(); - Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV); - if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) { - thirdPartyJars = new ArrayList<>(StartBuilder - .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS))); - } - thirdPartyJars.addAll(StartBuilder.getConnectorJars()); - thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies()); - - List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(), StartBuilder.getLibJars().stream()) - .map(Path::toUri) - .map(uri -> { - try { - return uri.toURL(); - }catch (MalformedURLException e){ - throw new RuntimeException("the uri of jar illegal: " + uri, e); - } - }) - .collect(Collectors.toList()); - jarDependencies.forEach(url -> { - ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url); - }); - jarPaths.addAll(jarDependencies); - - } - - - private Config registerPlugin(Config config , List<URL> jars) { - config = this.injectJarsToConfig( - config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); - return this.injectJarsToConfig( - config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars); - } - - - private Config injectJarsToConfig(Config config, String path, List<URL> jars) { - List<URL> validJars = new ArrayList<>(); - for (URL jarUrl : jars) { - if (new File(jarUrl.getFile()).exists()) { - validJars.add(jarUrl); - log.info("Inject jar to config: {}", jarUrl); - } else { - log.warn("Remove invalid jar when inject jars into config: {}", jarUrl); - } - } - - if (config.hasPath(path)) { - Set<URL> paths = - Arrays.stream(config.getString(path).split(";")) - .map( - uri -> { - try { - return new URL(uri); - } catch (MalformedURLException e) { - throw new RuntimeException( - "the uri of jar illegal:" + uri, e); - } - }) - .collect(Collectors.toSet()); - paths.addAll(validJars); - - config = config.withValue( - path, - ConfigValueFactory.fromAnyRef( - paths.stream() - .map(URL::toString) - .distinct() - .collect(Collectors.joining(";")))); - } else { - config = - config.withValue( - path, - ConfigValueFactory.fromAnyRef( - validJars.stream() - .map(URL::toString) - .distinct() - .collect(Collectors.joining(";")))); - } - return config; - } - - private List<Node> buildJobNode(Config config) { - - - Map<String, Object> sources = Maps.newHashMap(); - Map<String, Object> sinks =Maps.newHashMap(); - Map<String, Object> filters = Maps.newHashMap(); - Map<String, Object> splits = Maps.newHashMap(); - Map<String, Object> preprocessingPipelines = Maps.newHashMap(); - Map<String, Object> processingPipelines = Maps.newHashMap(); - Map<String, Object> postprocessingPipelines = Maps.newHashMap(); - - if (config.hasPath(Constants.SOURCES)) { - sources = config.getConfig(Constants.SOURCES).root().unwrapped(); - } - if (config.hasPath(Constants.SINKS)) { - sinks =config.getConfig(Constants.SINKS).root().unwrapped(); - } - if (config.hasPath(Constants.FILTERS)) { - filters = config.getConfig(Constants.FILTERS).root().unwrapped(); - } - if (config.hasPath(Constants.SPLITS)) { - splits = config.getConfig(Constants.SPLITS).root().unwrapped(); - } - if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) { - preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped(); - } - if (config.hasPath(Constants.PROCESSING_PIPELINES)) { - processingPipelines = config.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped(); - } - if (config.hasPath(Constants.POSTPROCESSING_PIPELINES)) { - postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped(); - } - - List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY); - - List<Node> nodes = Lists.newArrayList(); - - topology.forEach(item -> { - Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class); - nodes.add(node); - }); - - for(Node node : nodes) { - if (sources.containsKey(node.getName())) { - node.setType(ProcessorType.SOURCE); - } else if (sinks.containsKey(node.getName())) { - node.setType(ProcessorType.SINK); - } else if (splits.containsKey(node.getName())) { - splits.forEach((key, value) -> { - SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); - for(RuleContext ruleContext:splitConfig.getRules()) { - splitSet.add(ruleContext.getName()); - } - }); - node.setType(ProcessorType.SPLIT); - } else if (filters.containsKey(node.getName())) { - node.setType(ProcessorType.FILTER); - } else if (preprocessingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.PREPROCESSING); - } else if (processingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.PROCESSING); - } else if (postprocessingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.POSTPROCESSING); - } else { - throw new JobExecuteException("unsupported process type " + node.getName()); - } - } - - return nodes; - - } - - - public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException { - - List<Node> sourceNodes = nodes - .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList()); - - DataStream<Event> singleOutputStreamOperator = null; - - for(Node sourceNode : sourceNodes) { - singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode); - for (String nodeName : sourceNode.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); - } - } - - return singleOutputStreamOperator; - - - } - - private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { - Node node = getNode(downstreamNodeName).orElseGet(() -> { - throw new JobExecuteException("can't find downstream node " + downstreamNodeName); - }); - if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (splitSet.contains(node.getName())) { - dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = filterExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { - dataStream = splitExecutor.execute(dataStream, node); - - } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = preprocessingExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = processingExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = postprocessingExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (splitSet.contains(node.getName())) { - dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = sinkExecutor.execute(dataStream, node); - } - } else { - throw new JobExecuteException("unsupported process type " + node.getType().name()); - } - - - for (String nodeName : node.getDownstream()) { - buildJobGraph(dataStream, nodeName); - } - - - } - - private Optional<Node> getNode(String name) { - return nodes.stream().filter(v-> v.getName().equals(name)).findFirst(); - } - - -} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java index 2f6984b..352bad2 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java @@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; @@ -42,7 +43,7 @@ public class JobSplitTest { public void testSplit() { CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"}; + String[] args ={"--target", "test", "-c", ".\\grootstream_job_split_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -60,14 +61,8 @@ public class JobSplitTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); Assert.assertEquals(7, CollectSink.values.size()); } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml index 5163642..ee589ef 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml @@ -32,12 +32,6 @@ postprocessing_pipelines: - function: MEAN lookup_fields: [ pkts ] - table_processor: - type: table - functions: - - function: JSON_UNROLL - lookup_fields: [ encapsulation ] - output_fields: [ new_name ] application: # [object] Application Configuration env: # [object] Environment Variables diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml index 9bb2900..5f839ed 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -17,9 +17,9 @@ splits: test_split: type: split rules: - - name: table_processor + - tag: http_tag expression: event.decoded_as == 'HTTP' - - name: pre_etl_processor + - tag: dns_tag expression: event.decoded_as == 'DNS' postprocessing_pipelines: @@ -83,8 +83,9 @@ application: # [object] Application Configuration parallelism: 1 # [number] Operator-Level Parallelism. downstream: [test_split,collect_sink] - name: test_split - parallelism: 1 + tags: [http_tag,dns_tag] downstream: [ table_processor,pre_etl_processor ] + parallelism: 1 - name: pre_etl_processor parallelism: 1 downstream: [ collect_sink ] diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java index ead0ecd..6aa9e3d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java @@ -10,7 +10,7 @@ import java.io.Serializable; @Data public class RuleContext implements Serializable { - private String name; + private String tag; private String expression; private Expression compiledExpression; private OutputTag<Event> outputTag ; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java index f07b568..07d4f9f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -43,7 +43,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> { instance.setFunctionMissing(null); Expression compiledExp = instance.compile(expression, true); rule.setCompiledExpression(compiledExp); - OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){}; + OutputTag<Event> outputTag = new OutputTag<>(rule.getTag()){}; rule.setOutputTag(outputTag); } } |
