diff options
| author | wangkuan <[email protected]> | 2024-08-28 18:12:18 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-28 18:12:18 +0800 |
| commit | 9da15fc59204a79eef815e0a4cddbf7027d44274 (patch) | |
| tree | 705c1c58e684d43c200da33192aeb682288ca07c /groot-bootstrap | |
| parent | 0bfc0a2fb13409f816cd168f6cfbe353396bce1e (diff) | |
[feature][bootstrap][common] GAL-651 Groot Stream支持配置task properties
Diffstat (limited to 'groot-bootstrap')
4 files changed, 18 insertions, 6 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 2e68098..1962ce8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TernaryBoolean; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URL; import java.util.*; import java.util.stream.Collectors; @@ -37,6 +35,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set<String> splitSet = new HashSet<>(); + private final Map<String, String> taskProperties= new HashMap<>(); private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ -66,6 +65,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } @Override + public RuntimeEnvironment initTaskProperties(Config taskProperties) { + this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); + return this; + } + + @Override public CheckResult checkConfig() { return EnvironmentUtil.checkRestartStrategy(envConfig); } @@ -137,6 +144,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties)); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index bee1c0a..3b06c0c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -13,12 +13,15 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); Config getEnvConfig(); - + RuntimeEnvironment initTaskProperties(Config taskProperties); CheckResult checkConfig(); RuntimeEnvironment prepare(); void registerPlugin(List<URL> pluginPaths); default void initialize(Config config) { + if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { + this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); + } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 556c8c4..90ff95d 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -75,11 +75,10 @@ public class SimpleJobTest { assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString())); assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString())); assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString())); - Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); + Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); - List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list"); } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 888c94e..7716d33 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -111,7 +111,7 @@ processing_pipelines: lookup_fields: [ packet_capture_file ] output_fields: [ packet_capture_file ] parameters: - path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + path: [ props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] - function: STRING_JOINER lookup_fields: [ server_ip,client_ip ] output_fields: [ ip_string ] @@ -205,4 +205,6 @@ application: # [object] Application Configuration downstream: [collect_sink] - name: collect_sink parallelism: 1 +# properties: +# hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket |
