From 9da15fc59204a79eef815e0a4cddbf7027d44274 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Wed, 28 Aug 2024 18:12:18 +0800 Subject: [feature][bootstrap][common] GAL-651 Groot Stream支持配置task properties MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/grootstream_job_example.yaml | 7 ++++++- .../bootstrap/execution/JobRuntimeEnvironment.java | 12 ++++++++++-- .../bootstrap/execution/RuntimeEnvironment.java | 5 ++++- .../geedgenetworks/bootstrap/main/simple/SimpleJobTest.java | 3 +-- .../src/test/resources/grootstream_job_etl_test.yaml | 4 +++- .../src/main/java/com/geedgenetworks/common/Constants.java | 3 +++ .../main/java/com/geedgenetworks/core/udf/PathCombine.java | 6 +++++- 7 files changed, 32 insertions(+), 8 deletions(-) diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 46d1123..b77958d 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -80,4 +80,9 @@ application: - name: aggregate_processor downstream: [ print_sink ] - name: print_sink - downstream: [] \ No newline at end of file + downstream: [] + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket \ No newline at end of file diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 2e68098..1962ce8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TernaryBoolean; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URL; import java.util.*; import java.util.stream.Collectors; @@ -37,6 +35,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set splitSet = new HashSet<>(); + private final Map taskProperties= new HashMap<>(); private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ -65,6 +64,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return envConfig; } + @Override + public RuntimeEnvironment initTaskProperties(Config taskProperties) { + this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); + return this; + } + @Override public CheckResult checkConfig() { return EnvironmentUtil.checkRestartStrategy(envConfig); @@ -137,6 +144,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties)); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index bee1c0a..3b06c0c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -13,12 +13,15 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); Config getEnvConfig(); - + RuntimeEnvironment initTaskProperties(Config taskProperties); CheckResult checkConfig(); RuntimeEnvironment prepare(); void registerPlugin(List pluginPaths); default void initialize(Config config) { + if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { + this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); + } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 556c8c4..90ff95d 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -75,11 +75,10 @@ public class SimpleJobTest { assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString())); assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString())); assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString())); - Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); + Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); - List asn_list = (List) CollectSink.values.get(0).getExtractedFields().get("asn_list"); } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 888c94e..7716d33 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -111,7 +111,7 @@ processing_pipelines: lookup_fields: [ packet_capture_file ] output_fields: [ packet_capture_file ] parameters: - path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + path: [ props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] - function: STRING_JOINER lookup_fields: [ server_ip,client_ip ] output_fields: [ ip_string ] @@ -205,4 +205,6 @@ application: # [object] Application Configuration downstream: [collect_sink] - name: collect_sink parallelism: 1 +# properties: +# hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index e973c20..ba572ef 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -11,6 +11,8 @@ public final class Constants { public static final String SINKS = "sinks"; public static final int SYSTEM_EXIT_CODE = 2618; public static final String APPLICATION = "application"; + + public static final String PROPERTIES = "properties"; public static final String SPLITS = "splits"; public static final String APPLICATION_ENV ="env"; public static final String APPLICATION_TOPOLOGY = "topology"; @@ -23,6 +25,7 @@ public final class Constants { " \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" + " \n"; + public static final String SYSPROP_GROOTSTREAM_TASK_PROPERTIES = "grootstream.task.properties"; public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config"; public static final String SYSPROP_GROOTSTREAM_PREFIX = "props."; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 874735d..5a5393d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -26,8 +26,12 @@ public class PathCombine implements ScalarFunction { Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); + Map taskProperties = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES), Map.class); CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); - Map propertiesConfig =engineConfig.getPropertiesConfig(); + Map propertiesConfig = new HashMap<>(engineConfig.getPropertiesConfig()); + if(taskProperties!=null){ + propertiesConfig.putAll(taskProperties); + } if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { String paths = udfContext.getParameters().getOrDefault("path","").toString(); -- cgit v1.2.3