summaryrefslogtreecommitdiff
path: root/groot-bootstrap/src
diff options
context:
space:
mode:
Diffstat (limited to 'groot-bootstrap/src')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java14
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java33
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java72
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml2
5 files changed, 80 insertions, 45 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index c0ac3a5..3513a67 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -6,7 +6,7 @@ import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.RouteConfigOptions;
+import com.geedgenetworks.common.config.SplitConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.common.udf.RuleContext;
@@ -14,16 +14,12 @@ import com.geedgenetworks.core.pojo.SplitConfig;
import com.geedgenetworks.core.split.Split;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
@@ -39,12 +35,12 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
@Override
protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, SplitConfig> routeConfigMap = Maps.newHashMap();
+ Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SPLITS)) {
Config routes = operatorConfig.getConfig(Constants.SPLITS);
routes.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
- RouteConfigOptions.TYPE.key());
+ SplitConfigOptions.TYPE.key());
if (!result.isSuccess()) {
throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
"split: %s, Message: %s",
@@ -52,11 +48,11 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
}
SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
splitConfig.setName(key);
- routeConfigMap.put(key, splitConfig);
+ splitConfigMap.put(key, splitConfig);
});
}
- return routeConfigMap;
+ return splitConfigMap;
}
@Override
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
index dfe0600..2f6984b 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -70,38 +70,5 @@ public class JobSplitTest {
}
Assert.assertEquals(7, CollectSink.values.size());
}
- @Test
- public void testSplitForAgg() {
-
- CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
- ExecuteCommandArgs executeCommandArgs = CommandLineUtils
- .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
-
- executeCommandArgs.buildCommand();
-
-
- GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
- Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
- // check config file exist
- Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
- ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
- Config config = configObject.toConfig();
- config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
- ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
-
-
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
- Assert.assertEquals(1, CollectSink.values.size());
- Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
-
- }
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
new file mode 100644
index 0000000..2edc5e7
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+public class JobSplitWithAggTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplitForAgg() {
+
+ CollectSink.values.clear();
+ String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(1, CollectSink.values.size());
+ Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+
+ }
+}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
index 732d0f6..872800f 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -3,7 +3,7 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
@@ -58,7 +58,7 @@ postprocessing_pipelines:
type: aggregate
group_by_fields: [decoded_as]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
- window_size: 3
+ window_size: 5
window_timestamp_field: test_time
functions:
- function: NUMBER_SUM
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
index f13d69e..9bb2900 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -60,7 +60,7 @@ postprocessing_pipelines:
type: aggregate
group_by_fields: [decoded_as]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
- window_size: 3
+ window_size: 5
window_timestamp_field: test_time
functions:
- function: NUMBER_SUM