summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author 王宽 <[email protected]> 2024-08-28 10:18:24 +0000
committer 王宽 <[email protected]> 2024-08-28 10:18:24 +0000
commit e6bc7ce383d8ac24414e048c4ef020a5b1084564 (patch)
tree 705c1c58e684d43c200da33192aeb682288ca07c
parent 0bfc0a2fb13409f816cd168f6cfbe353396bce1e (diff)
parent 9da15fc59204a79eef815e0a4cddbf7027d44274 (diff)
Merge branch 'feature/task-properties' into 'develop'
[feature][bootstrap][common] GAL-651 Groot Stream支持配置task properties
See merge request galaxy/platform/groot-stream!98
-rw-r--r-- config/grootstream_job_example.yaml 7
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java 12
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java 5
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java 3
-rw-r--r-- groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml 4
-rw-r--r-- groot-common/src/main/java/com/geedgenetworks/common/Constants.java 3
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java 6
7 files changed, 32 insertions, 8 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 46d1123..b77958d 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -80,4 +80,9 @@ application:
- name: aggregate_processor
downstream: [ print_sink ]
- name: print_sink
- downstream: [] \ No newline at end of file
+ downstream: []
+ properties:
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket \ No newline at end of file
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 2e68098..1962ce8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TernaryBoolean;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
@@ -37,6 +35,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private String jobName = Constants.DEFAULT_JOB_NAME;
private Set<String> splitSet = new HashSet<>();
+ private final Map<String, String> taskProperties= new HashMap<>();
private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
@@ -66,6 +65,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
@Override
+ public RuntimeEnvironment initTaskProperties(Config taskProperties) {
+ this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue()))));
+ return this;
+ }
+
+ @Override
public CheckResult checkConfig() {
return EnvironmentUtil.checkRestartStrategy(envConfig);
}
@@ -137,6 +144,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig()));
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
+ configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties));
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
setCheckpoint();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
index bee1c0a..3b06c0c 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
@@ -13,12 +13,15 @@ import java.util.List;
public interface RuntimeEnvironment {
RuntimeEnvironment setEnvConfig(Config envConfig);
Config getEnvConfig();
-
+ RuntimeEnvironment initTaskProperties(Config taskProperties);
CheckResult checkConfig();
RuntimeEnvironment prepare();
void registerPlugin(List<URL> pluginPaths);
default void initialize(Config config) {
+ if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) {
+ this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES)));
+ }
this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare();
}
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index 556c8c4..90ff95d 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -75,11 +75,10 @@ public class SimpleJobTest {
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
- Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
+ Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
- List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 888c94e..7716d33 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -111,7 +111,7 @@ processing_pipelines:
lookup_fields: [ packet_capture_file ]
output_fields: [ packet_capture_file ]
parameters:
- path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
+ path: [ props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file]
- function: STRING_JOINER
lookup_fields: [ server_ip,client_ip ]
output_fields: [ ip_string ]
@@ -205,4 +205,6 @@ application: # [object] Application Configuration
downstream: [collect_sink]
- name: collect_sink
parallelism: 1
+# properties:
+# hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index e973c20..ba572ef 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -11,6 +11,8 @@ public final class Constants {
public static final String SINKS = "sinks";
public static final int SYSTEM_EXIT_CODE = 2618;
public static final String APPLICATION = "application";
+
+ public static final String PROPERTIES = "properties";
public static final String SPLITS = "splits";
public static final String APPLICATION_ENV ="env";
public static final String APPLICATION_TOPOLOGY = "topology";
@@ -23,6 +25,7 @@ public final class Constants {
" \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" +
" \n";
+ public static final String SYSPROP_GROOTSTREAM_TASK_PROPERTIES = "grootstream.task.properties";
public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 874735d..5a5393d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -26,8 +26,12 @@ public class PathCombine implements ScalarFunction {
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
+ Map<String, String> taskProperties = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES), Map.class);
CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
- Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig();
+ Map<String, String> propertiesConfig = new HashMap<>(engineConfig.getPropertiesConfig());
+ if(taskProperties!=null){
+ propertiesConfig.putAll(taskProperties);
+ }
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
String paths = udfContext.getParameters().getOrDefault("path","").toString();