diff options
Diffstat (limited to 'groot-bootstrap')
5 files changed, 80 insertions, 45 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java index c0ac3a5..3513a67 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -6,7 +6,7 @@ import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.RouteConfigOptions; +import com.geedgenetworks.common.config.SplitConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.common.udf.RuleContext; @@ -14,16 +14,12 @@ import com.geedgenetworks.core.pojo.SplitConfig; import com.geedgenetworks.core.split.Split; import com.google.common.collect.Maps; import com.typesafe.config.Config; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** @@ -39,12 +35,12 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { @Override protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, SplitConfig> routeConfigMap = Maps.newHashMap(); + Map<String, SplitConfig> splitConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SPLITS)) { Config routes = operatorConfig.getConfig(Constants.SPLITS); routes.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key), - RouteConfigOptions.TYPE.key()); + SplitConfigOptions.TYPE.key()); if (!result.isSuccess()) { throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( "split: %s, Message: %s", @@ -52,11 +48,11 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { } SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); splitConfig.setName(key); - routeConfigMap.put(key, splitConfig); + splitConfigMap.put(key, splitConfig); }); } - return routeConfigMap; + return splitConfigMap; } @Override diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java index dfe0600..2f6984b 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java @@ -70,38 +70,5 @@ public class JobSplitTest { } Assert.assertEquals(7, CollectSink.values.size()); } - @Test - public void testSplitForAgg() { - - CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; - ExecuteCommandArgs executeCommandArgs = CommandLineUtils - .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); - - executeCommandArgs.buildCommand(); - - - GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); - Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); - // check config file exist - Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); - ConfigObject configObject = ConfigValueFactory.fromMap(configMap); - Config config = configObject.toConfig(); - config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), - ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - - - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } - Assert.assertEquals(1, CollectSink.values.size()); - Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); - - } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java new file mode 100644 index 0000000..2edc5e7 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.Map; + + +public class JobSplitWithAggTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplitForAgg() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { + jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + Assert.assertEquals(1, CollectSink.values.size()); + Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); + + } +} diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml index 732d0f6..872800f 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -3,7 +3,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: - data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json @@ -58,7 +58,7 @@ postprocessing_pipelines: type: aggregate group_by_fields: [decoded_as] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time - window_size: 3 + window_size: 5 window_timestamp_field: test_time functions: - function: NUMBER_SUM diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml index f13d69e..9bb2900 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -60,7 +60,7 @@ postprocessing_pipelines: type: aggregate group_by_fields: [decoded_as] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time - window_size: 3 + window_size: 5 window_timestamp_field: test_time functions: - function: NUMBER_SUM |
