author     wangkuan <[email protected]>  2024-09-10 17:14:50 +0800
committer  wangkuan <[email protected]>  2024-09-10 17:14:50 +0800
commit     daf6ee08400b1102bb7e97af2c5663fc5cee24b4
tree       0db262096e7f5f5681489ee406adb9a240dca071
parent     e4047ae73ed837fcc931fd2dee45c7fca9741df4
[feature][bootstrap][common] Add a tags attribute to node for split routing; the tags must correspond one-to-one with downstream. Rename the name field in rules to tag. Improve the unit tests and add a test mode to target_type.
-rw-r--r--  config/grootstream_job_example.yaml | 7
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java | 1
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java | 1
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java | 46
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java | 5
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java | 1
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java | 54
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java | 327
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java | 13
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java | 13
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java | 27
-rw-r--r--  groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml | 7
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java | 2
14 files changed, 52 insertions(+), 454 deletions(-)
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 37ef114..8f27609 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -15,9 +15,9 @@ splits:
decoded_as_split:
type: split
rules:
- - name: projection_processor
+ - tag: http_tag
expression: event.decoded_as == 'HTTP'
- - name: aggregate_processor
+ - tag: dns_tag
expression: event.decoded_as == 'DNS'
processing_pipelines:
projection_processor:
@@ -80,7 +80,8 @@ application:
- name: inline_source
downstream: [decoded_as_split]
- name: decoded_as_split
- downstream: [ projection_processor ,aggregate_processor]
+ tags: [http_tag, dns_tag]
+ downstream: [ projection_processor, aggregate_processor]
- name: projection_processor
downstream: [ print_sink ]
- name: aggregate_processor
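Note on the example above: each rule's tag names a side-output stream produced by the split node, and the split node's tags list must line up index-for-index with its downstream list, so events matching http_tag flow to projection_processor and events matching dns_tag flow to aggregate_processor. A minimal sketch of that pairing rule follows (illustrative only; SplitRouting and pairTagsWithDownstream are hypothetical helpers, not classes from this repository):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: pairs a split node's tags with its downstream nodes by index,
// which is the contract this commit enforces in JobExecution.
final class SplitRouting {
    static Map<String, String> pairTagsWithDownstream(List<String> tags, List<String> downstream) {
        if (tags.size() != downstream.size()) {
            throw new IllegalArgumentException("Split node downstream size does not equal tags size");
        }
        Map<String, String> downstreamToTag = new LinkedHashMap<>();
        for (int i = 0; i < downstream.size(); i++) {
            downstreamToTag.put(downstream.get(i), tags.get(i));
        }
        return downstreamToTag;
    }

    public static void main(String[] args) {
        // Mirrors the example config: http_tag -> projection_processor, dns_tag -> aggregate_processor.
        System.out.println(pairTagsWithDownstream(
                Arrays.asList("http_tag", "dns_tag"),
                Arrays.asList("projection_processor", "aggregate_processor")));
    }
}

Because the pairing is positional, reordering downstream without reordering tags silently reroutes events, so the two lists must be kept in the same order.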
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
index 61ced82..5b36671 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
@@ -98,6 +98,7 @@ public class ExecuteCommandArgs extends CommandArgs {
static {
TARGET_TYPE_LIST.add(TargetType.LOCAL);
+ TARGET_TYPE_LIST.add(TargetType.TEST);
TARGET_TYPE_LIST.add(TargetType.REMOTE);
TARGET_TYPE_LIST.add(TargetType.YARN_SESSION);
TARGET_TYPE_LIST.add(TargetType.YARN_PER_JOB);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java
index abf1c1f..bdc70d0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java
@@ -4,6 +4,7 @@ public enum TargetType {
LOCAL("local"),
REMOTE("remote"),
+ TEST("test"),
YARN_SESSION("yarn-session"),
YARN_PER_JOB("yarn-per-job"),
YARN_APPLICATION("yarn-application");
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index f6e19eb..706fc18 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -6,14 +6,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.common.config.SplitConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.common.udf.RuleContext;
-import com.geedgenetworks.core.pojo.SplitConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -45,7 +38,7 @@ public class JobExecution {
private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
private final List<Node> nodes;
private final List<URL> jarPaths;
- private final Set<String> splitSet = new HashSet<>();
+ private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
@@ -209,12 +202,6 @@ public class JobExecution {
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
} else if (splits.containsKey(node.getName())) {
- splits.forEach((key, value) -> {
- SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
- for(RuleContext ruleContext:splitConfig.getRules()) {
- splitSet.add(ruleContext.getName());
- }
- });
node.setType(ProcessorType.SPLIT);
} else if (preprocessingPipelines.containsKey(node.getName())) {
node.setType(ProcessorType.PREPROCESSING);
@@ -233,7 +220,7 @@ public class JobExecution {
public void execute() throws JobExecuteException {
- if (!jobRuntimeEnvironment.isLocalMode()) {
+ if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
List<Node> sourceNodes = nodes
@@ -268,39 +255,46 @@ public class JobExecution {
throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = filterExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ if (node.getTags().size() == node.getDownstream().size()) {
+ for (int i = 0; i < node.getDownstream().size(); i++) {
+ nodeNameWithSplitTags.put(node.getDownstream().get(i), node.getTags().get(i));
+ }
+ } else {
+ throw new JobExecuteException("Split node downstream size does not equal tags size");
+ }
dataStream = splitExecutor.execute(dataStream, node);
-
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){
}), node);
} else {
dataStream = preprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = processingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = postprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = sinkExecutor.execute(dataStream, node);
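At runtime the split is carried by Flink side outputs: SplitFunction emits every event that matches a rule's expression to an OutputTag named after that rule's tag, JobExecution records downstream-node -> tag in nodeNameWithSplitTags, and the downstream executor then reads its branch with getSideOutput. The sketch below shows only that side-output pattern with the plain Flink 1.13-style API, assuming a String payload in place of Event and print sinks in place of the project's executors; SideOutputSplitSketch is a hypothetical class, not part of this repository:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitSketch {
    // One OutputTag per rule tag; the anonymous subclass keeps the element type reified.
    static final OutputTag<String> HTTP_TAG = new OutputTag<String>("http_tag") {};
    static final OutputTag<String> DNS_TAG = new OutputTag<String>("dns_tag") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<String> split = env
                .fromElements("HTTP", "DNS", "HTTP")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        // Route by protocol, analogous to the rule expressions in the config.
                        if ("HTTP".equals(value)) {
                            ctx.output(HTTP_TAG, value);
                        } else if ("DNS".equals(value)) {
                            ctx.output(DNS_TAG, value);
                        }
                    }
                });

        // A downstream node resolves its branch via the tag recorded for it,
        // which is what nodeNameWithSplitTags.get(node.getName()) does in the patch.
        DataStream<String> httpBranch = split.getSideOutput(HTTP_TAG);
        DataStream<String> dnsBranch = split.getSideOutput(DNS_TAG);

        httpBranch.print("projection_processor");
        dnsBranch.print("aggregate_processor");

        env.execute("side-output split sketch");
    }
}

The anonymous OutputTag subclasses matter: they preserve the element type at runtime, which is why both the patch and this sketch write new OutputTag<Event>(tag) {} rather than a plain constructor call.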
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index a4289ff..e23d446 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -93,7 +93,10 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
&& envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget());
}
-
+ public boolean isTestMode() {
+ return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
+ && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.TEST.getTarget());
+ }
@Override
public void registerPlugin(List<URL> pluginPaths) {
pluginPaths.forEach(url -> log.info("Begin register plugins: {}", url));
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
index f86d106..66303c2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
@@ -19,5 +19,6 @@ public class Node implements Serializable {
private ProcessorType type;
private List<String> downstream = Collections.emptyList();
private int parallelism;
+ private List<String> tags = Collections.emptyList();
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
deleted file mode 100644
index d7ed524..0000000
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.geedgenetworks.bootstrap.main.simple;
-
-import com.geedgenetworks.bootstrap.command.Command;
-import com.geedgenetworks.bootstrap.command.CommandArgs;
-import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
-import com.geedgenetworks.bootstrap.enums.EngineType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-@Slf4j
-public class GrootStreamServerTest {
- public static void main(String[] args) {
- ExecuteCommandArgs bootstrapCommandArgs = CommandLineUtils
- .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
- run(bootstrapCommandArgs.buildCommand());
- }
-
- public static <T extends CommandArgs> void run(Command<T> command) throws JobExecuteException {
- try {
- command.execute();
- } catch (ConfigCheckException e) {
- outputConfigError(e);
- throw e;
- } catch (Exception e) {
- outputFatalError(e);
- throw e;
- }
- }
- private static void outputConfigError(Throwable throwable) {
- log.error(
- "\n\n===============================================================================\n\n");
- String errorMsg = throwable.getMessage();
- log.error("Config Error:\n");
- log.error("Reason: {} \n", errorMsg);
- log.error(
- "\n===============================================================================\n\n\n");
- }
-
-
- private static void outputFatalError(Throwable throwable) {
- log.error("\\n\\n===============================================================================\\n\\n");
- String errorMsg = throwable.getMessage();
- log.error("Fatal Error ,Reason is :{} \n", errorMsg);
- log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable));
- log.error("\\n\\n===============================================================================\\n\\n");
- }
-
-
-
-
-}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
deleted file mode 100644
index 7b9544a..0000000
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-package com.geedgenetworks.bootstrap.main.simple;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.bootstrap.execution.*;
-import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
-import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.common.udf.RuleContext;
-import com.geedgenetworks.common.utils.ReflectionUtils;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.pojo.SplitConfig;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigUtil;
-import com.typesafe.config.ConfigValueFactory;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.util.OutputTag;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.Path;
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-@Data
-public class JobExecutionTest {
-
- protected final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
- private final Set<String> splitSet = new HashSet<>();
- private final List<Node> nodes;
-
- private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
- (classLoader, url) -> {
- if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
- URLClassLoader c =
- (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
- ReflectionUtils.invoke(c, "addURL", url);
- } else if (classLoader instanceof URLClassLoader) {
- ReflectionUtils.invoke(classLoader, "addURL", url);
- } else {
- throw new RuntimeException(
- "Unsupported classloader: " + classLoader.getClass().getName());
- }
- };
- private final List<URL> jarPaths;
- public JobExecutionTest(Config config, GrootStreamConfig grootStreamConfig) {
- try {
- jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
- .resolve(GrootStreamRunner.APP_JAR_NAME).toString())
- .toURI().toURL()));
- } catch (MalformedURLException e) {
- throw new JobExecuteException("load groot stream bootstrap jar error.", e);
- }
- registerPlugin(config.getConfig(Constants.APPLICATION));
-
- this.sourceExecutor = new SourceExecutor(jarPaths, config);
- this.sinkExecutor = new SinkExecutor(jarPaths, config);
- this.splitExecutor = new SplitExecutor(jarPaths, config);
- this.filterExecutor = new FilterExecutor(jarPaths, config);
- this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecutor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
- this.jobRuntimeEnvironment =
- JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
- this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.nodes = buildJobNode(config);
-
- }
-
- private void registerPlugin(Config appConfig) {
- List<Path> thirdPartyJars = new ArrayList<>();
- Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
- if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
- thirdPartyJars = new ArrayList<>(StartBuilder
- .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
- }
- thirdPartyJars.addAll(StartBuilder.getConnectorJars());
- thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies());
-
- List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(), StartBuilder.getLibJars().stream())
- .map(Path::toUri)
- .map(uri -> {
- try {
- return uri.toURL();
- }catch (MalformedURLException e){
- throw new RuntimeException("the uri of jar illegal: " + uri, e);
- }
- })
- .collect(Collectors.toList());
- jarDependencies.forEach(url -> {
- ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url);
- });
- jarPaths.addAll(jarDependencies);
-
- }
-
-
- private Config registerPlugin(Config config , List<URL> jars) {
- config = this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
- return this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
- }
-
-
- private Config injectJarsToConfig(Config config, String path, List<URL> jars) {
- List<URL> validJars = new ArrayList<>();
- for (URL jarUrl : jars) {
- if (new File(jarUrl.getFile()).exists()) {
- validJars.add(jarUrl);
- log.info("Inject jar to config: {}", jarUrl);
- } else {
- log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
- }
- }
-
- if (config.hasPath(path)) {
- Set<URL> paths =
- Arrays.stream(config.getString(path).split(";"))
- .map(
- uri -> {
- try {
- return new URL(uri);
- } catch (MalformedURLException e) {
- throw new RuntimeException(
- "the uri of jar illegal:" + uri, e);
- }
- })
- .collect(Collectors.toSet());
- paths.addAll(validJars);
-
- config = config.withValue(
- path,
- ConfigValueFactory.fromAnyRef(
- paths.stream()
- .map(URL::toString)
- .distinct()
- .collect(Collectors.joining(";"))));
- } else {
- config =
- config.withValue(
- path,
- ConfigValueFactory.fromAnyRef(
- validJars.stream()
- .map(URL::toString)
- .distinct()
- .collect(Collectors.joining(";"))));
- }
- return config;
- }
-
- private List<Node> buildJobNode(Config config) {
-
-
- Map<String, Object> sources = Maps.newHashMap();
- Map<String, Object> sinks =Maps.newHashMap();
- Map<String, Object> filters = Maps.newHashMap();
- Map<String, Object> splits = Maps.newHashMap();
- Map<String, Object> preprocessingPipelines = Maps.newHashMap();
- Map<String, Object> processingPipelines = Maps.newHashMap();
- Map<String, Object> postprocessingPipelines = Maps.newHashMap();
-
- if (config.hasPath(Constants.SOURCES)) {
- sources = config.getConfig(Constants.SOURCES).root().unwrapped();
- }
- if (config.hasPath(Constants.SINKS)) {
- sinks =config.getConfig(Constants.SINKS).root().unwrapped();
- }
- if (config.hasPath(Constants.FILTERS)) {
- filters = config.getConfig(Constants.FILTERS).root().unwrapped();
- }
- if (config.hasPath(Constants.SPLITS)) {
- splits = config.getConfig(Constants.SPLITS).root().unwrapped();
- }
- if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
- preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
- }
- if (config.hasPath(Constants.PROCESSING_PIPELINES)) {
- processingPipelines = config.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped();
- }
- if (config.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
- postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped();
- }
-
- List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
-
- List<Node> nodes = Lists.newArrayList();
-
- topology.forEach(item -> {
- Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class);
- nodes.add(node);
- });
-
- for(Node node : nodes) {
- if (sources.containsKey(node.getName())) {
- node.setType(ProcessorType.SOURCE);
- } else if (sinks.containsKey(node.getName())) {
- node.setType(ProcessorType.SINK);
- } else if (splits.containsKey(node.getName())) {
- splits.forEach((key, value) -> {
- SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
- for(RuleContext ruleContext:splitConfig.getRules()) {
- splitSet.add(ruleContext.getName());
- }
- });
- node.setType(ProcessorType.SPLIT);
- } else if (filters.containsKey(node.getName())) {
- node.setType(ProcessorType.FILTER);
- } else if (preprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PREPROCESSING);
- } else if (processingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PROCESSING);
- } else if (postprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.POSTPROCESSING);
- } else {
- throw new JobExecuteException("unsupported process type " + node.getName());
- }
- }
-
- return nodes;
-
- }
-
-
- public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException {
-
- List<Node> sourceNodes = nodes
- .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
-
- DataStream<Event> singleOutputStreamOperator = null;
-
- for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
- for (String nodeName : sourceNode.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
- }
- }
-
- return singleOutputStreamOperator;
-
-
- }
-
- private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
- Node node = getNode(downstreamNodeName).orElseGet(() -> {
- throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
- });
- if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = filterExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
- dataStream = splitExecutor.execute(dataStream, node);
-
- } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = preprocessingExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = processingExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = postprocessingExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = sinkExecutor.execute(dataStream, node);
- }
- } else {
- throw new JobExecuteException("unsupported process type " + node.getType().name());
- }
-
-
- for (String nodeName : node.getDownstream()) {
- buildJobGraph(dataStream, nodeName);
- }
-
-
- }
-
- private Optional<Node> getNode(String name) {
- return nodes.stream().filter(v-> v.getName().equals(name)).findFirst();
- }
-
-
-}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
index 2f6984b..352bad2 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
@@ -42,7 +43,7 @@ public class JobSplitTest {
public void testSplit() {
CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_split_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -60,14 +61,8 @@ public class JobSplitTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
Assert.assertEquals(7, CollectSink.values.size());
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
index 9fa81c0..57feb0c 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
@@ -39,7 +40,7 @@ public class JobSplitWithAggTest {
public void testSplitForAgg() {
CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_split_agg_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -57,14 +58,8 @@ public class JobSplitWithAggTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
Assert.assertEquals(2, CollectSink.values.size());
Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index 90ff95d..503aba4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
@@ -42,7 +43,7 @@ public class SimpleJobTest {
@Test
public void testEtl() {
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_etl_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -60,15 +61,8 @@ public class SimpleJobTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
-
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
Assert.assertEquals(1, CollectSink.values.size());
Assert.assertEquals("BASE", CollectSink.values.get(0).getExtractedFields().get("decoded_as").toString());
Assert.assertEquals("google.com", CollectSink.values.get(0).getExtractedFields().get("server_domain").toString());
@@ -86,7 +80,7 @@ public class SimpleJobTest {
public void testTransmission() {
CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_transmission_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_transmission_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -104,15 +98,8 @@ public class SimpleJobTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
-
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
Assert.assertEquals(4, CollectSink.values.size());
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
index 9bb2900..5f839ed 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -17,9 +17,9 @@ splits:
test_split:
type: split
rules:
- - name: table_processor
+ - tag: http_tag
expression: event.decoded_as == 'HTTP'
- - name: pre_etl_processor
+ - tag: dns_tag
expression: event.decoded_as == 'DNS'
postprocessing_pipelines:
@@ -83,8 +83,9 @@ application: # [object] Application Configuration
parallelism: 1 # [number] Operator-Level Parallelism.
downstream: [test_split,collect_sink]
- name: test_split
- parallelism: 1
+ tags: [http_tag,dns_tag]
downstream: [ table_processor,pre_etl_processor ]
+ parallelism: 1
- name: pre_etl_processor
parallelism: 1
downstream: [ collect_sink ]
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
index ead0ecd..6aa9e3d 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
@@ -10,7 +10,7 @@ import java.io.Serializable;
@Data
public class RuleContext implements Serializable {
- private String name;
+ private String tag;
private String expression;
private Expression compiledExpression;
private OutputTag<Event> outputTag ;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
index f07b568..07d4f9f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -43,7 +43,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
instance.setFunctionMissing(null);
Expression compiledExp = instance.compile(expression, true);
rule.setCompiledExpression(compiledExp);
- OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){};
+ OutputTag<Event> outputTag = new OutputTag<>(rule.getTag()){};
rule.setOutputTag(outputTag);
}
}