summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java 14
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java 33
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java 72
-rw-r--r-- groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml 4
-rw-r--r-- groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml 2
-rw-r--r-- groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java) 2
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java 4
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java 10
8 files changed, 87 insertions, 54 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index c0ac3a5..3513a67 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -6,7 +6,7 @@ import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.RouteConfigOptions;
+import com.geedgenetworks.common.config.SplitConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.common.udf.RuleContext;
@@ -14,16 +14,12 @@ import com.geedgenetworks.core.pojo.SplitConfig;
import com.geedgenetworks.core.split.Split;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
@@ -39,12 +35,12 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
@Override
protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, SplitConfig> routeConfigMap = Maps.newHashMap();
+ Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SPLITS)) {
Config routes = operatorConfig.getConfig(Constants.SPLITS);
routes.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
- RouteConfigOptions.TYPE.key());
+ SplitConfigOptions.TYPE.key());
if (!result.isSuccess()) {
throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
"split: %s, Message: %s",
@@ -52,11 +48,11 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
}
SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
splitConfig.setName(key);
- routeConfigMap.put(key, splitConfig);
+ splitConfigMap.put(key, splitConfig);
});
}
- return routeConfigMap;
+ return splitConfigMap;
}
@Override
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
index dfe0600..2f6984b 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -70,38 +70,5 @@ public class JobSplitTest {
}
Assert.assertEquals(7, CollectSink.values.size());
}
- @Test
- public void testSplitForAgg() {
-
- CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
- ExecuteCommandArgs executeCommandArgs = CommandLineUtils
- .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
-
- executeCommandArgs.buildCommand();
-
-
- GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
- Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
- // check config file exist
- Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
- ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
- Config config = configObject.toConfig();
- config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
- ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
-
-
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
- Assert.assertEquals(1, CollectSink.values.size());
- Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
-
- }
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
new file mode 100644
index 0000000..2edc5e7
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+public class JobSplitWithAggTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplitForAgg() {
+
+ CollectSink.values.clear();
+ String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(1, CollectSink.values.size());
+ Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+
+ }
+}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
index 732d0f6..872800f 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -3,7 +3,7 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
@@ -58,7 +58,7 @@ postprocessing_pipelines:
type: aggregate
group_by_fields: [decoded_as]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
- window_size: 3
+ window_size: 5
window_timestamp_field: test_time
functions:
- function: NUMBER_SUM
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
index f13d69e..9bb2900 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -60,7 +60,7 @@ postprocessing_pipelines:
type: aggregate
group_by_fields: [decoded_as]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
- window_size: 3
+ window_size: 5
window_timestamp_field: test_time
functions:
- function: NUMBER_SUM
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
index 4d4ef12..a2acb71 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
@@ -4,7 +4,7 @@ import com.alibaba.fastjson2.TypeReference;
import com.geedgenetworks.common.udf.RuleContext;
import java.util.List;
-public interface RouteConfigOptions {
+public interface SplitConfigOptions {
Option<String> TYPE = Options.key("type")
.stringType()
.noDefaultValue()
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 803fefc..4f9535d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
private final List<UDFContext> udfContexts;
private final List<String> udfClassNameLists;
- private final List<String> groupByFields;
- private LinkedList<UdfEntity> functions;
+ private final LinkedList<UdfEntity> functions;
public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
}
- groupByFields = aggregateConfig.getGroup_by_fields();
functions = Lists.newLinkedList();
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
index 7a129ef..f07b568 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -21,8 +21,8 @@ import java.util.Map;
@Slf4j
public class SplitFunction extends ProcessFunction<Event, Event> {
- private SplitConfig splitConfig;
- private List<RuleContext> routes;
+ private final SplitConfig splitConfig;
+ private List<RuleContext> rules;
private transient InternalMetrics internalMetrics;
public SplitFunction(SplitConfig splitConfig) {
@@ -34,8 +34,8 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
public void open(Configuration parameters) throws Exception {
this.internalMetrics = new InternalMetrics(getRuntimeContext());
- this.routes = splitConfig.getRules();
- for(RuleContext rule :routes){
+ this.rules = splitConfig.getRules();
+ for(RuleContext rule : rules){
String expression = rule.getExpression();
AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
instance.setCachedExpressionByDefault(true);
@@ -53,7 +53,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
try {
internalMetrics.incrementInEvents();
- for (RuleContext route :routes){
+ for (RuleContext route : rules){
boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true;
if (result) {
ctx.output(route.getOutputTag(), event);