diff options
| author | 王宽 <[email protected]> | 2024-08-28 10:18:24 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-28 10:18:24 +0000 |
| commit | e6bc7ce383d8ac24414e048c4ef020a5b1084564 (patch) | |
| tree | 705c1c58e684d43c200da33192aeb682288ca07c | |
| parent | 0bfc0a2fb13409f816cd168f6cfbe353396bce1e (diff) | |
| parent | 9da15fc59204a79eef815e0a4cddbf7027d44274 (diff) | |
Merge branch 'feature/task-properties' into 'develop'
[feature][bootstrap][common] GAL-651 Groot Stream支持配置task properties
See merge request galaxy/platform/groot-stream!98
7 files changed, 32 insertions, 8 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 46d1123..b77958d 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -80,4 +80,9 @@ application: - name: aggregate_processor downstream: [ print_sink ] - name: print_sink - downstream: []
\ No newline at end of file + downstream: [] + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
\ No newline at end of file diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 2e68098..1962ce8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TernaryBoolean; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URL; import java.util.*; import java.util.stream.Collectors; @@ -37,6 +35,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set<String> splitSet = new HashSet<>(); + private final Map<String, String> taskProperties= new HashMap<>(); private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ -66,6 +65,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } @Override + public RuntimeEnvironment initTaskProperties(Config taskProperties) { + this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); + return this; + } + + @Override public CheckResult checkConfig() { return EnvironmentUtil.checkRestartStrategy(envConfig); } @@ -137,6 +144,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties)); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index bee1c0a..3b06c0c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -13,12 +13,15 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); Config getEnvConfig(); - + RuntimeEnvironment initTaskProperties(Config taskProperties); CheckResult checkConfig(); RuntimeEnvironment prepare(); void registerPlugin(List<URL> pluginPaths); default void initialize(Config config) { + if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { + this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); + } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 556c8c4..90ff95d 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -75,11 +75,10 @@ public class SimpleJobTest { assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString())); assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString())); assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString())); - Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); + Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); - List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list"); } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 888c94e..7716d33 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -111,7 +111,7 @@ processing_pipelines: lookup_fields: [ packet_capture_file ] output_fields: [ packet_capture_file ] parameters: - path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + path: [ props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] - function: STRING_JOINER lookup_fields: [ server_ip,client_ip ] output_fields: [ ip_string ] @@ -205,4 +205,6 @@ application: # [object] Application Configuration downstream: [collect_sink] - name: collect_sink parallelism: 1 +# properties: +# hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index e973c20..ba572ef 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -11,6 +11,8 @@ public final class Constants { public static final String SINKS = "sinks"; public static final int SYSTEM_EXIT_CODE = 2618; public static final String APPLICATION = "application"; + + public static final String PROPERTIES = "properties"; public static final String SPLITS = "splits"; public static final String APPLICATION_ENV ="env"; public static final String APPLICATION_TOPOLOGY = "topology"; @@ -23,6 +25,7 @@ public final class Constants { " \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" + " \n"; + public static final String SYSPROP_GROOTSTREAM_TASK_PROPERTIES = "grootstream.task.properties"; public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config"; public static final String SYSPROP_GROOTSTREAM_PREFIX = "props."; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 874735d..5a5393d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -26,8 +26,12 @@ public class PathCombine implements ScalarFunction { Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); + Map<String, String> taskProperties = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES), Map.class); CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); - Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig(); + Map<String, String> propertiesConfig = new HashMap<>(engineConfig.getPropertiesConfig()); + if(taskProperties!=null){ + propertiesConfig.putAll(taskProperties); + } if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { String paths = udfContext.getParameters().getOrDefault("path","").toString(); |
