summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: doufenghu <[email protected]> 2024-09-01 23:49:48 +0800
committer: doufenghu <[email protected]> 2024-09-01 23:49:48 +0800
commit: a55399cb95c6408233e84540db482ae5e6131746 (patch)
tree: de9b913e817190830ea92122abf779b91b238114
parent: 2947507b84076e5c6a2accc7b142bd27594ba3d8 (diff)
[Improve][bootstrap] Improve job-level user-defined variables, move the path from application/properties to application/env/properties, and add support for defining variables via the runtime CLI. (GAL-651)
-rw-r--r-- config/grootstream_job_example.yaml 10
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java 4
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java 26
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java 11
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java 28
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java 30
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java 11
-rw-r--r-- groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java 5
-rw-r--r-- groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml 6
-rw-r--r-- groot-tests/test-common/src/test/resources/grootstream.yaml 4
-rw-r--r-- groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java 14
-rw-r--r-- groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml 20
12 files changed, 108 insertions, 61 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index f5c6610..37ef114 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -71,6 +71,11 @@ application:
execution:
restart:
strategy: none
+ properties:
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
topology:
- name: inline_source
downstream: [decoded_as_split]
@@ -82,8 +87,3 @@ application:
downstream: [ print_sink ]
- name: print_sink
downstream: []
- properties:
- hos.bucket.name.rtp_file: traffic_rtp_file_bucket
- hos.bucket.name.http_file: traffic_http_file_bucket
- hos.bucket.name.eml_file: traffic_eml_file_bucket
- hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
\ No newline at end of file
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
index f83183c..6ee5151 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
@@ -31,10 +31,10 @@ public abstract class CommandArgs {
description = "Whether check config")
protected boolean checkConfig = false;
-
+ // user-defined parameters
@Parameter(names = {"-i", "--variable"},
splitter = ParameterSplitter.class,
- description = "user-defined parameters , such as -i data_center=bj "
+ description = "Job level user-defined parameters, such as -i traffic_file_bucket=traffic_file_bucket, or -i scheduler.knowledge_base.update.interval.minutes=1. "
+ "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.")
protected List<String> variables = Collections.emptyList();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
index 79a27e1..c3538b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
@@ -26,27 +26,43 @@ public class ExecuteCommand implements Command<ExecuteCommandArgs> {
@Override
public void execute() throws CommandExecuteException, ConfigCheckException {
+ // Groot Stream Global Config for all processing jobs
GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
// check config file exist
checkConfigExist(configFile);
- Config config = ConfigBuilder.of(configFile);
+ Config jobConfig = ConfigBuilder.of(configFile);
+
// if user specified job name using command line arguments, override config option
if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) {
- config = config.withValue(
+ jobConfig = jobConfig.withValue(
ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName()));
}
+ // if user specified target type using command line arguments, override config option
if(executeCommandArgs.getTargetType() != null) {
- config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
}
- JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ // if user specified variables using command line arguments, override config option
+ if(!executeCommandArgs.getVariables().isEmpty()) {
+ for (String variable : executeCommandArgs.getVariables()) {
+ String[] keyValue = variable.split("=");
+ if (keyValue.length != 2) {
+ throw new CommandExecuteException("Invalid variable format: " + variable);
+ }
+ jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, Constants.PROPERTIES, keyValue[0]),
+ ConfigValueFactory.fromAnyRef(keyValue[1]));
+ }
+ }
+
+
+ JobExecution jobExecution = new JobExecution(jobConfig, grootStreamConfig);
try {
jobExecution.execute();
} catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
+ throw new JobExecuteException("Job executed failed", e);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
index 0c00a61..61ced82 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
@@ -20,14 +20,14 @@ public class ExecuteCommandArgs extends CommandArgs {
@Parameter(names={"-e", "--deploy-mode"},
converter = DeployModeConverter.class,
- description = "deploy mode, only support [run] ")
+ description = "Job deploy mode, only support [run] ")
private DeployMode deployMode = DeployMode.RUN;
@Parameter(
names = {"--target"},
converter = TargetTypeConverter.class,
description =
- "job submitted target type, support [local, remote, yarn-session, yarn-per-job]")
+ "Job submitted target type, support [local, remote, yarn-session, yarn-per-job]")
private TargetType targetType;
@Override
@@ -55,7 +55,7 @@ public class ExecuteCommandArgs extends CommandArgs {
@Override
public String toString() {
- return "FlinkCommandArgs{"
+ return "CommandArgs{"
+ "deployMode="
+ deployMode
+ ", targetType="
@@ -74,7 +74,6 @@ public class ExecuteCommandArgs extends CommandArgs {
}
-
private void userParamsToSysEnv() {
if (!this.variables.isEmpty()) {
variables.stream()
@@ -111,7 +110,7 @@ public class ExecuteCommandArgs extends CommandArgs {
return targetType;
} else {
throw new IllegalArgumentException(
- "Groot-Stream job on flink engine submitted target type only "
+ "GrootStream job submitted target type only "
+ "support these options: [local, remote, yarn-session, yarn-per-job]");
}
}
@@ -132,7 +131,7 @@ public class ExecuteCommandArgs extends CommandArgs {
return deployMode;
} else {
throw new IllegalArgumentException(
- "Groot-Stream job on flink engine deploy mode only "
+ "GrootStream job deploy mode only "
+ "support these options: [run, run-application]");
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 22b23a7..6a23a0b 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -39,26 +39,26 @@ public class JobExecution {
private final List<Node> nodes;
private final List<URL> jarPaths;
- public JobExecution(Config config, GrootStreamConfig grootStreamConfig) {
+ public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
.resolve(GrootStreamRunner.APP_JAR_NAME).toString())
.toURI().toURL()));
} catch (MalformedURLException e) {
- throw new JobExecuteException("load groot stream bootstrap jar error.", e);
+ throw new JobExecuteException("Load GrootStream Bootstrap jar failed.", e);
}
- registerPlugin(config.getConfig(Constants.APPLICATION));
+ registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
- this.sourceExecutor = new SourceExecutor(jarPaths, config);
- this.sinkExecutor = new SinkExecutor(jarPaths, config);
- this.filterExecutor = new FilterExecutor(jarPaths, config);
- this.splitExecutor = new SplitExecutor(jarPaths, config);
- this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecutor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
+ this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig);
+ this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig);
+ this.filterExecutor = new FilterExecutor(jarPaths, jobConfig);
+ this.splitExecutor = new SplitExecutor(jarPaths, jobConfig);
+ this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig);
+ this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig);
+ this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig);
this.jobRuntimeEnvironment =
- JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
+ JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
@@ -67,7 +67,7 @@ public class JobExecution {
this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.nodes = buildJobNode(config);
+ this.nodes = buildJobNode(jobConfig);
}
@@ -126,7 +126,7 @@ public class JobExecution {
return new URL(uri);
} catch (MalformedURLException e) {
throw new RuntimeException(
- "the uri of jar illegal:" + uri, e);
+ "The uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toSet());
@@ -251,7 +251,7 @@ public class JobExecution {
private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
- throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
+ throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 07fde4d..4c6c2d2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -35,16 +35,16 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private String jobName = Constants.DEFAULT_JOB_NAME;
private Set<String> splitSet = new HashSet<>();
- private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
+ private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
- this.initialize(config);
+ this.initialize(jobConfig);
}
- public static JobRuntimeEnvironment getInstance(Config config, GrootStreamConfig grootStreamConfig) {
+ public static JobRuntimeEnvironment getInstance(Config jobConfig, GrootStreamConfig grootStreamConfig) {
if (INSTANCE == null) {
synchronized (JobRuntimeEnvironment.class) {
if (INSTANCE == null) {
- INSTANCE = new JobRuntimeEnvironment(config, grootStreamConfig);
+ INSTANCE = new JobRuntimeEnvironment(jobConfig, grootStreamConfig);
}
}
}
@@ -63,14 +63,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
@Override
- public RuntimeEnvironment loadJobProperties(Config jobProperties) {
- this.grootStreamConfig.getCommonConfig().getPropertiesConfig().putAll(jobProperties.root().unwrapped().entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue()))));
- return this;
- }
-
- @Override
public CheckResult checkConfig() {
return EnvironmentUtil.checkRestartStrategy(envConfig);
}
@@ -78,10 +70,19 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
@Override
public RuntimeEnvironment prepare() {
- createStreamEnvironment();
+
if (envConfig.hasPath(Constants.JOB_NAME)) {
jobName = envConfig.getString(Constants.JOB_NAME);
}
+ // Job-level user-defined variables override the grootStreamConfig
+ if (envConfig.hasPath(Constants.PROPERTIES)) {
+ envConfig.getConfig(Constants.PROPERTIES).root().unwrapped().entrySet().forEach(entry -> {
+ this.grootStreamConfig.getCommonConfig().getPropertiesConfig().put(entry.getKey(), String.valueOf(entry.getValue()));
+ });
+ }
+
+ createStreamEnvironment();
+
return this;
}
@@ -89,8 +90,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
return jobName;
}
-
-
public boolean isLocalMode() {
return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
&& envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget());
@@ -131,7 +130,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
private void createStreamEnvironment() {
-
Configuration configuration = new Configuration();
EnvironmentUtil.initConfiguration(envConfig, configuration);
if (isLocalMode()) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
index 710e7f6..b177e40 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
@@ -1,7 +1,4 @@
package com.geedgenetworks.bootstrap.execution;
-
-
-import com.geedgenetworks.bootstrap.enums.TargetType;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckResult;
import com.typesafe.config.Config;
@@ -12,16 +9,12 @@ import java.util.List;
public interface RuntimeEnvironment {
RuntimeEnvironment setEnvConfig(Config envConfig);
+ //Prepare runtime environment for job execution
+ RuntimeEnvironment prepare();
Config getEnvConfig();
- RuntimeEnvironment loadJobProperties(Config jobProperties);
CheckResult checkConfig();
- RuntimeEnvironment prepare();
-
void registerPlugin(List<URL> pluginPaths);
default void initialize(Config config) {
- if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) {
- this.loadJobProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES)));
- }
this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare();
}
}
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index f435f59..9b58289 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -9,11 +9,12 @@ import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
+import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_kafka.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
@@ -21,6 +22,8 @@ public class GrootStreamExample {
executeCommandArgs.setEncrypt(false);
executeCommandArgs.setDecrypt(false);
executeCommandArgs.setVersion(false);
+ executeCommandArgs.setVariables(List.of("hos.bucket.name.traffic_file=user_define_traffic_file_bucket",
+ "scheduler.knowledge_base.update.interval.minutes=1"));
executeCommandArgs.setDeployMode(DeployMode.RUN);
executeCommandArgs.setTargetType(TargetType.LOCAL);
GrootStreamServer.run(executeCommandArgs.buildCommand());
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
index 00f2a7d..408fbad 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
@@ -33,6 +33,12 @@ application:
parallelism: 3
pipeline:
object-reuse: true
+ properties:
+ hos.bucket.name.traffic_file: local_traffic_file_bucket
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
topology:
- name: inline_source
downstream: [filter_operator]
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml
index 5520945..2eb105b 100644
--- a/groot-tests/test-common/src/test/resources/grootstream.yaml
+++ b/groot-tests/test-common/src/test/resources/grootstream.yaml
@@ -15,3 +15,7 @@ grootstream:
hos.bucket.name.traffic_file: traffic_file_bucket
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
index d4cdcbd..dde7b28 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
@@ -8,6 +8,7 @@ import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
import com.geedgenetworks.test.common.container.TestContainerId;
import com.geedgenetworks.test.common.junit.DisabledOnContainer;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import java.io.IOException;
@@ -31,7 +32,7 @@ public class InlineToPrintIT extends TestSuiteBase {
CompletableFuture.supplyAsync(
() -> {
try {
- return container.executeJob("/kafka_to_print.yaml");
+ return container.executeJob("/inline_to_print.yaml");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
@@ -85,6 +86,17 @@ public class InlineToPrintIT extends TestSuiteBase {
Assertions.assertNotNull(jobNumRestartsReference.get());
});
+
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ String logs = container.getServerLogs();
+ Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_rtp_file_bucket/test_pcap_file") > 10);
+ Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_http_file_bucket/test_http_req_file") > 10);
+ });
+
+
+
}
}
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
index daf6e32..b4773a1 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -2,7 +2,7 @@ sources:
inline_source:
type: inline
properties:
- data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
format: json
json.ignore.parse.errors: false
@@ -14,11 +14,21 @@ filters:
processing_pipelines:
projection_processor:
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
- function: DROP
filter: event.server_ip == '4.4.4.4'
+ - function: PATH_COMBINE
+ lookup_fields: [ rtp_pcap_path ]
+ output_fields: [ rtp_pcap_path ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ]
+ - function: PATH_COMBINE
+ lookup_fields: [ http_request_body ]
+ output_fields: [ http_request_body ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ]
sinks:
print_sink:
@@ -33,6 +43,12 @@ application:
parallelism: 3
pipeline:
object-reuse: true
+ properties:
+ hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket
+ hos.bucket.name.http_file: job_level_traffic_http_file_bucket
+ hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket
+
topology:
- name: inline_source
downstream: [filter_operator]