summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-29 10:13:48 +0800
committerwangkuan <[email protected]>2024-08-29 10:13:48 +0800
commit8d90c04d22a5df3ac5a6d4d12fc1b9fee03f38e8 (patch)
treef3a1bfe0868ad8f35c84df2193a6476f60b0a9a6
parent9da15fc59204a79eef815e0a4cddbf7027d44274 (diff)
[improve][bootstrap] GAL-651 Groot Stream支持在作业(Job)级别定义变量,修改命名,并统一存储至Flink grootstream.configfeature/task-properties
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java7
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml4
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Constants.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java6
5 files changed, 7 insertions, 16 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 1962ce8..07fde4d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -35,8 +35,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private String jobName = Constants.DEFAULT_JOB_NAME;
private Set<String> splitSet = new HashSet<>();
- private final Map<String, String> taskProperties= new HashMap<>();
-
private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
this.initialize(config);
@@ -65,8 +63,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
@Override
- public RuntimeEnvironment initTaskProperties(Config taskProperties) {
- this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet()
+ public RuntimeEnvironment loadJobProperties(Config jobProperties) {
+ this.grootStreamConfig.getCommonConfig().getPropertiesConfig().putAll(jobProperties.root().unwrapped().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue()))));
return this;
@@ -144,7 +142,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig()));
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
- configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties));
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
setCheckpoint();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
index 3b06c0c..710e7f6 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
@@ -13,14 +13,14 @@ import java.util.List;
public interface RuntimeEnvironment {
RuntimeEnvironment setEnvConfig(Config envConfig);
Config getEnvConfig();
- RuntimeEnvironment initTaskProperties(Config taskProperties);
+ RuntimeEnvironment loadJobProperties(Config jobProperties);
CheckResult checkConfig();
RuntimeEnvironment prepare();
void registerPlugin(List<URL> pluginPaths);
default void initialize(Config config) {
if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) {
- this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES)));
+ this.loadJobProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES)));
}
this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare();
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 7716d33..76dcf7f 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -205,6 +205,6 @@ application: # [object] Application Configuration
downstream: [collect_sink]
- name: collect_sink
parallelism: 1
-# properties:
-# hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
+ properties:
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index ba572ef..b523591 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -25,8 +25,6 @@ public final class Constants {
" \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" +
" \n";
- public static final String SYSPROP_GROOTSTREAM_TASK_PROPERTIES = "grootstream.task.properties";
-
public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 5a5393d..874735d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -26,12 +26,8 @@ public class PathCombine implements ScalarFunction {
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
- Map<String, String> taskProperties = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES), Map.class);
CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
- Map<String, String> propertiesConfig = new HashMap<>(engineConfig.getPropertiesConfig());
- if(taskProperties!=null){
- propertiesConfig.putAll(taskProperties);
- }
+ Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig();
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
String paths = udfContext.getParameters().getOrDefault("path","").toString();