diff options
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java | 14 | ||||
| -rw-r--r-- | groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java | 33 | ||||
| -rw-r--r-- | groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java | 72 | ||||
| -rw-r--r-- | groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml | 4 | ||||
| -rw-r--r-- | groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml | 2 | ||||
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java) | 2 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java | 4 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java | 10 |
8 files changed, 87 insertions, 54 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java index c0ac3a5..3513a67 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -6,7 +6,7 @@ import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.RouteConfigOptions; +import com.geedgenetworks.common.config.SplitConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.common.udf.RuleContext; @@ -14,16 +14,12 @@ import com.geedgenetworks.core.pojo.SplitConfig; import com.geedgenetworks.core.split.Split; import com.google.common.collect.Maps; import com.typesafe.config.Config; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** @@ -39,12 +35,12 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { @Override protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, SplitConfig> routeConfigMap = Maps.newHashMap(); + Map<String, SplitConfig> splitConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SPLITS)) { Config routes = operatorConfig.getConfig(Constants.SPLITS); routes.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key), - RouteConfigOptions.TYPE.key()); + SplitConfigOptions.TYPE.key()); if (!result.isSuccess()) { throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( "split: %s, Message: %s", @@ -52,11 +48,11 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { } SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); splitConfig.setName(key); - routeConfigMap.put(key, splitConfig); + splitConfigMap.put(key, splitConfig); }); } - return routeConfigMap; + return splitConfigMap; } @Override diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java index dfe0600..2f6984b 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java @@ -70,38 +70,5 @@ public class JobSplitTest { } Assert.assertEquals(7, CollectSink.values.size()); } - @Test - public void testSplitForAgg() { - - CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; - ExecuteCommandArgs executeCommandArgs = CommandLineUtils - .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); - - executeCommandArgs.buildCommand(); - - - GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); - Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); - // check config file exist - Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); - ConfigObject configObject = ConfigValueFactory.fromMap(configMap); - Config config = configObject.toConfig(); - config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), - ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - - - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } - Assert.assertEquals(1, CollectSink.values.size()); - Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); - - } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java new file mode 100644 index 0000000..2edc5e7 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.Map; + + +public class JobSplitWithAggTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplitForAgg() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { + jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + Assert.assertEquals(1, CollectSink.values.size()); + Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); + + } +} diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml index 732d0f6..872800f 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -3,7 +3,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: - data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json @@ -58,7 +58,7 @@ postprocessing_pipelines: type: aggregate group_by_fields: [decoded_as] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time - window_size: 3 + window_size: 5 window_timestamp_field: test_time functions: - function: NUMBER_SUM diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml index f13d69e..9bb2900 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -60,7 +60,7 @@ postprocessing_pipelines: type: aggregate group_by_fields: [decoded_as] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time - window_size: 3 + window_size: 5 window_timestamp_field: test_time functions: - function: NUMBER_SUM diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java index 4d4ef12..a2acb71 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson2.TypeReference; import com.geedgenetworks.common.udf.RuleContext; import java.util.List; -public interface RouteConfigOptions { +public interface SplitConfigOptions { Option<String> TYPE = Options.key("type") .stringType() .noDefaultValue() diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 803fefc..4f9535d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> { private final List<UDFContext> udfContexts; private final List<String> udfClassNameLists; - private final List<String> groupByFields; - private LinkedList<UdfEntity> functions; + private final LinkedList<UdfEntity> functions; public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); @@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f if (udfContexts == null || udfContexts.isEmpty()) { throw new RuntimeException(); } - groupByFields = aggregateConfig.getGroup_by_fields(); functions = Lists.newLinkedList(); Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java index 7a129ef..f07b568 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -21,8 +21,8 @@ import java.util.Map; @Slf4j public class SplitFunction extends ProcessFunction<Event, Event> { - private SplitConfig splitConfig; - private List<RuleContext> routes; + private final SplitConfig splitConfig; + private List<RuleContext> rules; private transient InternalMetrics internalMetrics; public SplitFunction(SplitConfig splitConfig) { @@ -34,8 +34,8 @@ public class SplitFunction extends ProcessFunction<Event, Event> { public void open(Configuration parameters) throws Exception { this.internalMetrics = new InternalMetrics(getRuntimeContext()); - this.routes = splitConfig.getRules(); - for(RuleContext rule :routes){ + this.rules = splitConfig.getRules(); + for(RuleContext rule : rules){ String expression = rule.getExpression(); AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); instance.setCachedExpressionByDefault(true); @@ -53,7 +53,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> { public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception { try { internalMetrics.incrementInEvents(); - for (RuleContext route :routes){ + for (RuleContext route : rules){ boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; if (result) { ctx.output(route.getOutputTag(), event); |
