diff options
| author | 王宽 <[email protected]> | 2024-08-29 02:18:42 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-29 02:18:42 +0000 |
| commit | 9fd0e45c696ea08f92dc2d1c261f7e4be37a5555 (patch) | |
| tree | f3a1bfe0868ad8f35c84df2193a6476f60b0a9a6 | |
| parent | e6bc7ce383d8ac24414e048c4ef020a5b1084564 (diff) | |
| parent | 8d90c04d22a5df3ac5a6d4d12fc1b9fee03f38e8 (diff) | |
Merge branch 'feature/task-properties' into 'develop'
[improve][bootstrap] GAL-651 Groot Stream支持在作业(Job)级别定义变量,修改命名,并统一存储至Flink grootstream.config
See merge request galaxy/platform/groot-stream!99
5 files changed, 7 insertions, 16 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 1962ce8..07fde4d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -35,8 +35,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set<String> splitSet = new HashSet<>(); - private final Map<String, String> taskProperties= new HashMap<>(); - private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; this.initialize(config); @@ -65,8 +63,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } @Override - public RuntimeEnvironment initTaskProperties(Config taskProperties) { - this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet() + public RuntimeEnvironment loadJobProperties(Config jobProperties) { + this.grootStreamConfig.getCommonConfig().getPropertiesConfig().putAll(jobProperties.root().unwrapped().entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); return this; @@ -144,7 +142,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); - configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties)); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index 3b06c0c..710e7f6 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -13,14 +13,14 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); Config getEnvConfig(); - RuntimeEnvironment initTaskProperties(Config taskProperties); + RuntimeEnvironment loadJobProperties(Config jobProperties); CheckResult checkConfig(); RuntimeEnvironment prepare(); void registerPlugin(List<URL> pluginPaths); default void initialize(Config config) { if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { - this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); + this.loadJobProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 7716d33..76dcf7f 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -205,6 +205,6 @@ application: # [object] Application Configuration downstream: [collect_sink] - name: collect_sink parallelism: 1 -# properties: -# hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + properties: + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index ba572ef..b523591 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -25,8 +25,6 @@ public final class Constants { " \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" + " \n"; - public static final String SYSPROP_GROOTSTREAM_TASK_PROPERTIES = "grootstream.task.properties"; - public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config"; public static final String SYSPROP_GROOTSTREAM_PREFIX = "props."; public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 5a5393d..874735d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -26,12 +26,8 @@ public class PathCombine implements ScalarFunction { Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); - Map<String, String> taskProperties = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES), Map.class); CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); - Map<String, String> propertiesConfig = new HashMap<>(engineConfig.getPropertiesConfig()); - if(taskProperties!=null){ - propertiesConfig.putAll(taskProperties); - } + Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig(); if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { String paths = udfContext.getParameters().getOrDefault("path","").toString(); |
