88 files changed, 2172 insertions, 349 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 4726af0..37ef114 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -11,7 +11,14 @@ filters: type: aviator properties: expression: event.server_ip != '12.12.12.12' - +splits: + decoded_as_split: + type: split + rules: + - name: projection_processor + expression: event.decoded_as == 'HTTP' + - name: aggregate_processor + expression: event.decoded_as == 'DNS' processing_pipelines: projection_processor: type: projection @@ -25,8 +32,9 @@ processing_pipelines: group_by_fields: [server_ip,server_port] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time window_timestamp_field: recv_time - window_size: 60 + window_size: 6 window_slide: 10 # sliding window step size + mini_batch: true functions: - function: NUMBER_SUM lookup_fields: [ sent_pkts ] @@ -63,12 +71,19 @@ application: execution: restart: strategy: none + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket topology: - name: inline_source - downstream: [filter_operator] - - name: filter_operator - downstream: [ projection_processor ] + downstream: [decoded_as_split] + - name: decoded_as_split + downstream: [ projection_processor ,aggregate_processor] - name: projection_processor downstream: [ print_sink ] + - name: aggregate_processor + downstream: [ print_sink ] - name: print_sink - downstream: []
\ No newline at end of file + downstream: [] diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml index 7cf50c8..0ca2d68 100644 --- a/config/template/grootstream_job_template.yaml +++ b/config/template/grootstream_job_template.yaml @@ -151,7 +151,7 @@ preprocessing_pipelines: # [object] Define Processors for preprocessing pipeline # It will be accomplished the common processing for the event by the user-defined functions. # processing_pipelines: # [object] Define Processors for processing pipelines. - projection_processor: # [object] Define projection processor name, must be unique. + z: # [object] Define projection processor name, must be unique. type: projection # [string] Processor Type remove_fields: output_fields: diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md index 3146569..c2b05f6 100644 --- a/docs/connector/config-encryption-decryption.md +++ b/docs/connector/config-encryption-decryption.md @@ -6,14 +6,14 @@ In production environments, sensitive configuration items such as passwords are ## How to use -Groot Stream default support base64 and AES encryption and decryption. +Groot Stream supports base64, AES and SM4 encryption and decryption. Base64 encryption support encrypt the following parameters: - username - password - auth -AES encryption support encrypt the following parameters: +AES/SM4 encryption supports encrypting the following parameters: - username - password - auth diff --git a/docs/env-config.md b/docs/env-config.md index 7a31494..8e22a53 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -57,10 +57,10 @@ Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are s You can directly use the flink parameter by prefixing `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/). Of course, you can use groot stream parameter, here are some parameter names corresponding to the names in Flink. -| Groot Stream | Flink | +| Groot Stream | Flink | |----------------------------------------|---------------------------------------------------------------| -| execution.buffer-timeout | flink.execution.buffer-timeout | -| pipeline.object-reuse | flink.object-reuse | +| execution.buffer-timeout | flink.execution.buffer-timeout.interval | +| pipeline.object-reuse | flink.pipeline.object-reuse | | pipeline.max-parallelism | flink.pipeline.max-parallelism | | execution.restart.strategy | flink.restart-strategy | | execution.restart.attempts | flink.restart-strategy.fixed-delay.attempts | @@ -70,3 +70,20 @@ Of course, you can use groot stream parameter, here are some parameter names cor | execution.restart.delayInterval | flink.restart-strategy.failure-rate.delay | | ... | ... | +## Properties +Job-level user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. +These properties can be referenced in the configuration file as `props.${property_name}`; they override the corresponding settings in the `grootstream.yaml` file for the duration of the job.
+```yaml +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket + hos.bucket.name.http_file: job_level_traffic_http_file_bucket + hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket +``` + diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md index fb902ae..9dd442f 100644 --- a/docs/grootstream-config.md +++ b/docs/grootstream-config.md @@ -20,7 +20,7 @@ grootstream: ``` -### Knowledge Base +## Knowledge Base The knowledge base is a collection of libraries that can be used in the groot-stream job's UDFs. File system type can be specified `local`, `http` or `hdfs`. If the value is `http`, must be ` QGW Knowledge Base Repository` URL. The library will be dynamically updated according to the `scheduler.knowledge_base.update.interval.minutes` configuration. @@ -77,3 +77,6 @@ grootstream: - asn_builtin.mmdb - asn_user_defined.mmdb ``` +## Properties +Global user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. +The properties can be used in the configuration file by using `props.${property_name}`.
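To illustrate how the `props.${property_name}` lookup described above is meant to be consumed, here is a minimal sketch of a job fragment that reads one of the bucket properties. The `hos_sink` name and its `bucket.name` option are hypothetical placeholders for illustration, not taken from this change:

```yaml
sinks:
  hos_sink:                # hypothetical sink name, for illustration only
    type: hos              # assumed sink type; any operator option could read a property
    properties:
      # resolved from application.env.properties first, falling back to the
      # `properties` section of grootstream.yaml when no job-level value is set
      bucket.name: props.hos.bucket.name.http_file
```

With the job-level values shown in `grootstream_job_example.yaml` earlier in this diff, `props.hos.bucket.name.http_file` would resolve to `traffic_http_file_bucket`.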
\ No newline at end of file diff --git a/docs/user-guide.md b/docs/user-guide.md index e35616f..d52cfed 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -153,7 +153,7 @@ Used to define job environment configuration information. For more details, you # Command ## Run a job by CLI - +Note: When submitting a job via CLI, you can use the `-D` parameter to specify Flink configuration options. For example, `-Dexecution.buffer-timeout.interval=1000` sets the buffer timeout to 1000 ms. More details can be found in the official [flink documentation](https://flink.apache.org/). ```bash Usage: start.sh [options] Options: @@ -164,7 +164,7 @@ Options: -e, --deploy-mode <deploy mode> Deploy mode, only support [run] (default: run) --target <target> Submitted target type, support [local, remote, yarn-session, yarn-per-job] -n, --name <name> Job name (default: groot-stream-job) - -i, --variable <variable> User-defined parameters, eg. -i key=value (default: []) + -i, --variable <variable> User-defined variables, eg. -i key=value (default: []) -h, --help Show help message -v, --version Show version message diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java index f83183c..6ee5151 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java @@ -31,10 +31,10 @@ public abstract class CommandArgs { description = "Whether check config") protected boolean checkConfig = false; - + // user-defined parameters @Parameter(names = {"-i", "--variable"}, splitter = ParameterSplitter.class, - description = "user-defined parameters , such as -i data_center=bj " + description = "Job level user-defined parameters, such as -i traffic_file_bucket=traffic_file_bucket, or -i scheduler.knowledge_base.update.interval.minutes=1. 
" + "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.") protected List<String> variables = Collections.emptyList(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java index 79a27e1..c3538b0 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java @@ -26,27 +26,43 @@ public class ExecuteCommand implements Command<ExecuteCommandArgs> { @Override public void execute() throws CommandExecuteException, ConfigCheckException { + // Groot Stream Global Config for all processing jobs GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); // check config file exist checkConfigExist(configFile); - Config config = ConfigBuilder.of(configFile); + Config jobConfig = ConfigBuilder.of(configFile); + // if user specified job name using command line arguments, override config option if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) { - config = config.withValue( + jobConfig = jobConfig.withValue( ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName())); } + // if user specified target type using command line arguments, override config option if(executeCommandArgs.getTargetType() != null) { - config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); } - JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + // if user specified variables using command line arguments, override config option + if(!executeCommandArgs.getVariables().isEmpty()) { + for (String variable : executeCommandArgs.getVariables()) { + String[] keyValue = variable.split("="); + if (keyValue.length != 2) { + throw new CommandExecuteException("Invalid variable format: " + variable); + } + jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, Constants.PROPERTIES, keyValue[0]), + ConfigValueFactory.fromAnyRef(keyValue[1])); + } + } + + + JobExecution jobExecution = new JobExecution(jobConfig, grootStreamConfig); try { jobExecution.execute(); } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); + throw new JobExecuteException("Job executed failed", e); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java index 0c00a61..61ced82 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java @@ -20,14 +20,14 @@ public class ExecuteCommandArgs extends CommandArgs { @Parameter(names={"-e", "--deploy-mode"}, converter = DeployModeConverter.class, - description = "deploy mode, only support [run] ") + description = "Job deploy mode, only 
support [run] ") private DeployMode deployMode = DeployMode.RUN; @Parameter( names = {"--target"}, converter = TargetTypeConverter.class, description = - "job submitted target type, support [local, remote, yarn-session, yarn-per-job]") + "Job submitted target type, support [local, remote, yarn-session, yarn-per-job]") private TargetType targetType; @Override @@ -55,7 +55,7 @@ public class ExecuteCommandArgs extends CommandArgs { @Override public String toString() { - return "FlinkCommandArgs{" + return "CommandArgs{" + "deployMode=" + deployMode + ", targetType=" @@ -74,7 +74,6 @@ public class ExecuteCommandArgs extends CommandArgs { } - private void userParamsToSysEnv() { if (!this.variables.isEmpty()) { variables.stream() @@ -111,7 +110,7 @@ public class ExecuteCommandArgs extends CommandArgs { return targetType; } else { throw new IllegalArgumentException( - "Groot-Stream job on flink engine submitted target type only " + "GrootStream job submitted target type only " + "support these options: [local, remote, yarn-session, yarn-per-job]"); } } @@ -132,7 +131,7 @@ public class ExecuteCommandArgs extends CommandArgs { return deployMode; } else { throw new IllegalArgumentException( - "Groot-Stream job on flink engine deploy mode only " + "GrootStream job deploy mode only " + "support these options: [run, run-application]"); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java new file mode 100644 index 0000000..05d3e52 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java @@ -0,0 +1,37 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.crypto.KeyUtil; +import cn.hutool.crypto.SmUtil; +import cn.hutool.crypto.symmetric.SM4; +import com.geedgenetworks.common.config.ConfigShade; + +import java.nio.charset.StandardCharsets; + +public class SM4ConfigShade implements ConfigShade { + private static final String IDENTIFIER = "sm4"; + + private static final String[] SENSITIVE_OPTIONS = + new String[] {"connection.user", "connection.password", "kafka.sasl.jaas.config","kafka.ssl.keystore.password","kafka.ssl.truststore.password","kafka.ssl.key.password"}; + + private static final byte[] SECURITY_KEY = KeyUtil.generateKey(SM4.ALGORITHM_NAME, ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded(); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + return SmUtil.sm4(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8); + } + + @Override + public String decrypt(String content) { + return SmUtil.sm4(SECURITY_KEY).decryptStr(content, StandardCharsets.UTF_8); + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java index d2b37f6..6f33cae 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java @@ -3,6 +3,7 @@ package com.geedgenetworks.bootstrap.enums; public enum ProcessorType { SOURCE("source"), FILTER("filter"), + SPLIT("split"), PREPROCESSING("preprocessing"), PROCESSING("processing"), POSTPROCESSING("postprocessing"), diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 64c66b6..7a55ffe 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -1,26 +1,26 @@ package com.geedgenetworks.bootstrap.execution; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.core.filter.Filter; import com.geedgenetworks.core.processor.Processor; -import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.split.Split; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + import java.net.URL; import java.net.URLClassLoader; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; +import java.util.*; import java.util.function.BiConsumer; public abstract class AbstractExecutor<K, V> - implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> { + implements Executor<DataStream<Event>, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; protected final Config operatorConfig; protected final Map<K,V> operatorMap; protected final Map<String,Filter> filterMap = new HashMap<>(); + protected final Map<String, Split> splitMap = new HashMap<>(); protected final Map<String, Processor> processorMap = new HashMap<>(); protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { @@ -30,6 +30,10 @@ public abstract class AbstractExecutor<K, V> for (Filter filter : filters) { this.filterMap.put(filter.type(), filter); } + ServiceLoader<Split> splits = ServiceLoader.load(Split.class); + for (Split split : splits) { + this.splitMap.put(split.type(), split); + } ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); for (Processor processor : processors) { this.processorMap.put(processor.type(), processor); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index 66c0b0f..b0a04cd 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; @@ -13,6 +14,7 @@ import com.geedgenetworks.core.processor.table.TableProcessor; import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import 
java.net.URL; @@ -27,7 +29,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { ProcessorConfig processorConfig = operatorMap.get(node.getName()); switch (processorConfig.getType()) { @@ -45,7 +47,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } return dataStream; } - protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException { + protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException { TableProcessor tableProcessor; if (processorMap.containsKey(tableConfig.getType())) { @@ -63,15 +65,15 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, tableConfig.setParallelism(node.getParallelism()); } try { - dataStream = - tableProcessor.processorFunction( - dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); + + dataStream = tableProcessor.processorFunction( + dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); } catch (Exception e) { throw new JobExecuteException("Create orderby pipeline instance failed!", e); } return dataStream; } - protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { + protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { AggregateProcessor aggregateProcessor; if (processorMap.containsKey(aggregateConfig.getType())) { @@ -98,7 +100,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, return dataStream; } - protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException { + protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException { ProjectionProcessor projectionProcessor; if (processorMap.containsKey(projectionConfig.getType())) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 506aa11..66f0585 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.FilterConfigOptions; @@ -14,6 +15,7 @@ import com.geedgenetworks.core.pojo.FilterConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import 
lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -54,7 +56,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { FilterConfig filterConfig = operatorMap.get(node.getName()); String className = filterConfig.getType(); Filter filter; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 2eabefa..f6e19eb 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -5,15 +5,24 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.GrootStreamConfig; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.geedgenetworks.common.config.SplitConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigUtil; import com.typesafe.config.ConfigValueFactory; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; import java.io.File; import java.net.MalformedURLException; @@ -27,49 +36,53 @@ import java.util.stream.Stream; public class JobExecution { private final JobRuntimeEnvironment jobRuntimeEnvironment; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor; + 
private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; private final List<Node> nodes; - private final List<URL> jarPaths; - public JobExecution(Config config, GrootStreamConfig grootStreamConfig) { - try { + private final Set<String> splitSet = new HashSet<>(); + + public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { + try { jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) .toURI().toURL())); } catch (MalformedURLException e) { - throw new JobExecuteException("load groot stream bootstrap jar error.", e); + throw new JobExecuteException("Load GrootStream Bootstrap jar failed.", e); } - registerPlugin(config.getConfig(Constants.APPLICATION)); + registerPlugin(jobConfig.getConfig(Constants.APPLICATION)); - this.sourceExecutor = new SourceExecutor(jarPaths, config); - this.sinkExecutor = new SinkExecutor(jarPaths, config); - this.filterExecutor = new FilterExecutor(jarPaths, config); - this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); - this.processingExecutor = new ProcessingExecutor(jarPaths, config); - this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); + this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig); + this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig); + this.filterExecutor = new FilterExecutor(jarPaths, jobConfig); + this.splitExecutor = new SplitExecutor(jarPaths, jobConfig); + this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig); + this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig); + this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig); this.jobRuntimeEnvironment = - JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); + JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig); this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.nodes = buildJobNode(config); + this.nodes = buildJobNode(jobConfig); } private void registerPlugin(Config appConfig) { List<Path> thirdPartyJars = new ArrayList<>(); Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV); - if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) { + if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) { thirdPartyJars = new ArrayList<>(StartBuilder .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS))); } @@ -81,7 +94,7 @@ public class JobExecution { .map(uri -> { try { return uri.toURL(); - }catch (MalformedURLException e){ + } catch (MalformedURLException e) { throw new RuntimeException("the uri of jar illegal: " + uri, e); } }).collect(Collectors.toList()); @@ -93,10 +106,10 @@ public class JobExecution { } - private Config registerPlugin(Config config , List<URL> jars) { - config = this.injectJarsToConfig( - config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); - return this.injectJarsToConfig( + private Config registerPlugin(Config config, 
List<URL> jars) { + config = this.injectJarsToConfig( + config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); + return this.injectJarsToConfig( config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars); } @@ -121,13 +134,13 @@ public class JobExecution { return new URL(uri); } catch (MalformedURLException e) { throw new RuntimeException( - "the uri of jar illegal:" + uri, e); + "The uri of jar illegal:" + uri, e); } }) .collect(Collectors.toSet()); paths.addAll(validJars); - config = config.withValue( + config = config.withValue( path, ConfigValueFactory.fromAnyRef( paths.stream() @@ -150,8 +163,9 @@ public class JobExecution { private List<Node> buildJobNode(Config config) { Map<String, Object> sources = Maps.newHashMap(); - Map<String, Object> sinks =Maps.newHashMap(); + Map<String, Object> sinks = Maps.newHashMap(); Map<String, Object> filters = Maps.newHashMap(); + Map<String, Object> splits = Maps.newHashMap(); Map<String, Object> preprocessingPipelines = Maps.newHashMap(); Map<String, Object> processingPipelines = Maps.newHashMap(); Map<String, Object> postprocessingPipelines = Maps.newHashMap(); @@ -160,11 +174,14 @@ public class JobExecution { sources = config.getConfig(Constants.SOURCES).root().unwrapped(); } if (config.hasPath(Constants.SINKS)) { - sinks =config.getConfig(Constants.SINKS).root().unwrapped(); + sinks = config.getConfig(Constants.SINKS).root().unwrapped(); } if (config.hasPath(Constants.FILTERS)) { filters = config.getConfig(Constants.FILTERS).root().unwrapped(); } + if (config.hasPath(Constants.SPLITS)) { + splits = config.getConfig(Constants.SPLITS).root().unwrapped(); + } if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) { preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped(); } @@ -184,13 +201,21 @@ public class JobExecution { nodes.add(node); }); - for(Node node : nodes) { + for (Node node : nodes) { if (sources.containsKey(node.getName())) { node.setType(ProcessorType.SOURCE); } else if (sinks.containsKey(node.getName())) { node.setType(ProcessorType.SINK); } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); + } else if (splits.containsKey(node.getName())) { + splits.forEach((key, value) -> { + SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); + for(RuleContext ruleContext:splitConfig.getRules()) { + splitSet.add(ruleContext.getName()); + } + }); + node.setType(ProcessorType.SPLIT); } else if (preprocessingPipelines.containsKey(node.getName())) { node.setType(ProcessorType.PREPROCESSING); } else if (processingPipelines.containsKey(node.getName())) { @@ -214,12 +239,12 @@ public class JobExecution { List<Node> sourceNodes = nodes .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList()); - SingleOutputStreamOperator<Event> singleOutputStreamOperator = null; + DataStream<Event> dataStream = null; - for(Node sourceNode : sourceNodes) { - singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode); + for (Node sourceNode : sourceNodes) { + dataStream = sourceExecutor.execute(dataStream, sourceNode); for (String nodeName : sourceNode.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); + buildJobGraph(dataStream, nodeName); } } @@ -230,7 +255,7 @@ public class JobExecution 
{ log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName()); jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName()); final long jobEndTime = System.currentTimeMillis(); - log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime); + log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime); } catch (Exception e) { throw new JobExecuteException("Execute job error", e); @@ -238,34 +263,62 @@ public class JobExecution { } - private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) { + private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { Node node = getNode(downstreamNodeName).orElseGet(() -> { - throw new JobExecuteException("can't find downstream node " + downstreamNodeName); + throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = filterExecutor.execute(dataStream, node); + } + } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { + dataStream = splitExecutor.execute(dataStream, node); + } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = preprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = processingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = postprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = sinkExecutor.execute(dataStream, node); + } } else { throw new JobExecuteException("unsupported process type " + node.getType().name()); } for (String nodeName : node.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); + buildJobGraph(dataStream, nodeName); } } private Optional<Node> 
getNode(String name) { - return nodes.stream().filter(v-> v.getName().equals(name)).findFirst(); + return nodes.stream().filter(v -> v.getName().equals(name)).findFirst(); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 7141f5e..a4289ff 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -22,12 +22,8 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TernaryBoolean; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; @Slf4j @@ -38,16 +34,16 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private StreamExecutionEnvironment environment; private String jobName = Constants.DEFAULT_JOB_NAME; - private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { + private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; - this.initialize(config); + this.initialize(jobConfig); } - public static JobRuntimeEnvironment getInstance(Config config, GrootStreamConfig grootStreamConfig) { + public static JobRuntimeEnvironment getInstance(Config jobConfig, GrootStreamConfig grootStreamConfig) { if (INSTANCE == null) { synchronized (JobRuntimeEnvironment.class) { if (INSTANCE == null) { - INSTANCE = new JobRuntimeEnvironment(config, grootStreamConfig); + INSTANCE = new JobRuntimeEnvironment(jobConfig, grootStreamConfig); } } } @@ -73,10 +69,19 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ @Override public RuntimeEnvironment prepare() { - createStreamEnvironment(); + if (envConfig.hasPath(Constants.JOB_NAME)) { jobName = envConfig.getString(Constants.JOB_NAME); } + // Job-level user-defined variables override the grootStreamConfig + if (envConfig.hasPath(Constants.PROPERTIES)) { + envConfig.getConfig(Constants.PROPERTIES).root().unwrapped().entrySet().forEach(entry -> { + this.grootStreamConfig.getCommonConfig().getPropertiesConfig().put(entry.getKey(), String.valueOf(entry.getValue())); + }); + } + + createStreamEnvironment(); + return this; } @@ -84,8 +89,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return jobName; } - - public boolean isLocalMode() { return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget()); @@ -126,7 +129,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } private void createStreamEnvironment() { - Configuration configuration = new Configuration(); EnvironmentUtil.initConfiguration(envConfig, configuration); if (isLocalMode()) { @@ -272,6 +274,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ + } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index 36fad61..b9555b4 100644 --- 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -36,7 +38,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor { @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { return super.execute(dataStream, node); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index b1e53e4..a7b9e5e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -3,10 +3,12 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -37,7 +39,8 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { + return super.execute(dataStream, node); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index d69fe8c..f6788ed 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -35,7 +37,7 @@ 
public class ProcessingExecutor extends AbstractProcessorExecutor { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { return super.execute(dataStream, node); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index bee1c0a..b177e40 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -1,7 +1,4 @@ package com.geedgenetworks.bootstrap.execution; - - -import com.geedgenetworks.bootstrap.enums.TargetType; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckResult; import com.typesafe.config.Config; @@ -12,11 +9,10 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); + //Prepare runtime environment for job execution + RuntimeEnvironment prepare(); Config getEnvConfig(); - CheckResult checkConfig(); - RuntimeEnvironment prepare(); - void registerPlugin(List<URL> pluginPaths); default void initialize(Config config) { this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index a0bedc9..70934b8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SinkConfigOptions; @@ -20,6 +21,7 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -63,7 +65,7 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { SinkConfig sinkConfig = operatorMap.get(node.getName()); try { SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType()); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index d4751af..9dff6b0 100644 --- 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -67,7 +68,7 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator outputStreamOperator, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException { SourceConfig sourceConfig = operatorMap.get(node.getName()); SingleOutputStreamOperator sourceSingleOutputStreamOperator; try { @@ -152,4 +153,5 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { return 0; } } + } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java new file mode 100644 index 0000000..e549087 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -0,0 +1,88 @@ +package com.geedgenetworks.bootstrap.execution; + +import com.alibaba.fastjson.JSONObject; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.SplitConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.pojo.SplitConfig; +import com.geedgenetworks.core.split.Split; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; + +import java.net.URL; +import java.util.List; +import java.util.Map; + + +/** + * Initialize config and execute filter operator + */ +@Slf4j +public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { + + + public SplitExecutor(List<URL> jarPaths, Config config) { + super(jarPaths, config); + } + + @Override + protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + Map<String, SplitConfig> splitConfigMap = Maps.newHashMap(); + if (operatorConfig.hasPath(Constants.SPLITS)) { + Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS); + splitsConfig.root().unwrapped().forEach((key, value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key), + SplitConfigOptions.TYPE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "split: %s, Message: %s", + key, result.getMsg())); + } + SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); + splitConfig.setName(key); + splitConfigMap.put(key, 
splitConfig); + }); + } + + return splitConfigMap; + } + + @Override + public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { + SplitConfig splitConfig = operatorMap.get(node.getName()); + String className = splitConfig.getType(); + Split split; + if (splitMap.containsKey(splitConfig.getType())) { + + split = splitMap.get(splitConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + split = (Split) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get split instance failed!", e); + } + } + if (node.getParallelism() > 0) { + splitConfig.setParallelism(node.getParallelism()); + } + try { + dataStream = + split.splitFunction( + dataStream, splitConfig); + } catch (Exception e) { + throw new JobExecuteException("Create split instance failed!", e); + } + return dataStream; + } + +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java index 8ab8bdc..6a106d2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java @@ -76,7 +76,7 @@ public class GrootStreamRunner { bootstrapCommandArgs.getVariables().stream() .filter(Objects::nonNull) .map(String::trim) - .forEach(variable -> command.add("-D" + variable)); + .forEach(variable -> command.add("-i " + variable)); return command; } diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade index 6654db5..f490f28 100644 --- a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade +++ b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade @@ -1,2 +1,3 @@ com.geedgenetworks.bootstrap.command.Base64ConfigShade -com.geedgenetworks.bootstrap.command.AESConfigShade
\ No newline at end of file +com.geedgenetworks.bootstrap.command.AESConfigShade +com.geedgenetworks.bootstrap.command.SM4ConfigShade
\ No newline at end of file diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java index 8b299df..7b9544a 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java @@ -7,8 +7,10 @@ import com.geedgenetworks.bootstrap.execution.*; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.GrootStreamConfig; +import com.geedgenetworks.common.udf.RuleContext; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -16,7 +18,9 @@ import com.typesafe.config.ConfigUtil; import com.typesafe.config.ConfigValueFactory; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; import java.io.File; import java.net.MalformedURLException; @@ -33,13 +37,14 @@ import java.util.stream.Stream; public class JobExecutionTest { protected final JobRuntimeEnvironment jobRuntimeEnvironment; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor; - private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor; - + private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; + private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor; + private final Set<String> splitSet = new HashSet<>(); private final List<Node> nodes; private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = @@ -66,21 +71,22 @@ public class JobExecutionTest { } registerPlugin(config.getConfig(Constants.APPLICATION)); - this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config); - this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config); - this.filterExecuteProcessor = new FilterExecutor(jarPaths, config); - this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config); - this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config); - this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config); + this.sourceExecutor = new SourceExecutor(jarPaths, config); + this.sinkExecutor = 
new SinkExecutor(jarPaths, config); + this.splitExecutor = new SplitExecutor(jarPaths, config); + this.filterExecutor = new FilterExecutor(jarPaths, config); + this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); + this.processingExecutor = new ProcessingExecutor(jarPaths, config); + this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); - - this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.nodes = buildJobNode(config); } @@ -173,6 +179,7 @@ public class JobExecutionTest { Map<String, Object> sources = Maps.newHashMap(); Map<String, Object> sinks =Maps.newHashMap(); Map<String, Object> filters = Maps.newHashMap(); + Map<String, Object> splits = Maps.newHashMap(); Map<String, Object> preprocessingPipelines = Maps.newHashMap(); Map<String, Object> processingPipelines = Maps.newHashMap(); Map<String, Object> postprocessingPipelines = Maps.newHashMap(); @@ -186,6 +193,9 @@ public class JobExecutionTest { if (config.hasPath(Constants.FILTERS)) { filters = config.getConfig(Constants.FILTERS).root().unwrapped(); } + if (config.hasPath(Constants.SPLITS)) { + splits = config.getConfig(Constants.SPLITS).root().unwrapped(); + } if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) { preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped(); } @@ -210,6 +220,14 @@ public class JobExecutionTest { node.setType(ProcessorType.SOURCE); } else if (sinks.containsKey(node.getName())) { node.setType(ProcessorType.SINK); + } else if (splits.containsKey(node.getName())) { + splits.forEach((key, value) -> { + SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); + for(RuleContext ruleContext:splitConfig.getRules()) { + splitSet.add(ruleContext.getName()); + } + }); + node.setType(ProcessorType.SPLIT); } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); } else if (preprocessingPipelines.containsKey(node.getName())) { @@ -228,15 +246,15 @@ public class JobExecutionTest { } - public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException { + public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException { List<Node> sourceNodes = nodes .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList()); - SingleOutputStreamOperator<Event> singleOutputStreamOperator = null; + DataStream<Event> singleOutputStreamOperator 
= null; for(Node sourceNode : sourceNodes) { - singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode); + singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode); for (String nodeName : sourceNode.getDownstream()) { buildJobGraph(singleOutputStreamOperator, nodeName); } @@ -247,27 +265,55 @@ public class JobExecutionTest { } - private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) { + private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { Node node = getNode(downstreamNodeName).orElseGet(() -> { throw new JobExecuteException("can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = filterExecutor.execute(dataStream, node); + } + } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { + dataStream = splitExecutor.execute(dataStream, node); + } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = preprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = processingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node); + if (splitSet.contains(node.getName())) { + dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = postprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node); - } else { + if (splitSet.contains(node.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + }), node); + } else { + dataStream = sinkExecutor.execute(dataStream, node); + } + } else { throw new JobExecuteException("unsupported process type " + node.getType().name()); } for (String nodeName : node.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); + buildJobGraph(dataStream, nodeName); } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java 
b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java new file mode 100644 index 0000000..2f6984b --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java @@ -0,0 +1,74 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + + +public class JobSplitTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplit() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { + jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + Assert.assertEquals(7, CollectSink.values.size()); + } + +} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java new file mode 100644 index 0000000..9fa81c0 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -0,0 +1,74 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; 
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.Map; + + +public class JobSplitWithAggTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplitForAgg() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { + jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + + Assert.assertEquals(2, CollectSink.values.size()); + Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString()); + + } +} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index c4f54a3..90ff95d 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -75,16 +75,10 @@ public class SimpleJobTest { assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString())); assertTrue(1000000000 < 
Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString())); assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString())); - Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("server_super_admin_area").toString()); - Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString()); - Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString()); - Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region")); - Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); + Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); - List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list"); - Assert.assertEquals("6167", asn_list.get(0)); } @@ -121,4 +115,5 @@ public class SimpleJobTest { } Assert.assertEquals(4, CollectSink.values.size()); } + } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java index dfcb459..c5806ed 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java @@ -3,9 +3,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect; import com.geedgenetworks.common.Event; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; public class CollectSink implements SinkFunction<Event> { diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java index c3746a4..17f56ce 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java @@ -65,8 +65,16 @@ public class ConfigShadeTest { Assertions.assertEquals("159c7da83d988a9ec041d10a6bfbe221bcbaed6b62d9cc1b04ff51e633ebd105", encryptPassword); Assertions.assertEquals(decryptUsername, USERNAME); Assertions.assertEquals(decryptPassword, PASSWORD); - System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); - System.out.println( ConfigShadeUtils.decryptOption("aes", 
"454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817")); + encryptUsername = ConfigShadeUtils.encryptOption("sm4", USERNAME); + decryptUsername = ConfigShadeUtils.decryptOption("sm4", encryptUsername); + Assertions.assertEquals("72ea74367a15cb96b0d1d42104149519", encryptUsername); + Assertions.assertEquals(decryptUsername, USERNAME); + encryptPassword = ConfigShadeUtils.encryptOption("sm4", PASSWORD); + decryptPassword = ConfigShadeUtils.decryptOption("sm4", encryptPassword); + Assertions.assertEquals("3876c7088d395bbbfa826e3648b6c9a022e7f80941c132313bde6dc8a7f2351f", encryptPassword); + Assertions.assertEquals(decryptPassword, PASSWORD); + System.out.println( ConfigShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); + System.out.println( ConfigShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6")); System.out.println( ConfigShadeUtils.encryptOption("aes", "testuser")); System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";")); } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 888c94e..9724e21 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -111,7 +111,7 @@ processing_pipelines: lookup_fields: [ packet_capture_file ] output_fields: [ packet_capture_file ] parameters: - path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + path: [ props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] - function: STRING_JOINER lookup_fields: [ server_ip,client_ip ] output_fields: [ ip_string ] @@ -191,6 +191,8 @@ sinks: application: # [object] Application Configuration env: # [object] Environment Variables name: groot-stream-job # [string] Job Name + properties: + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket pipeline: object-reuse: true # [boolean] Object Reuse, default is false topology: # [array of object] Node List. It will be used build data flow for job dag graph. @@ -206,3 +208,4 @@ application: # [object] Application Configuration - name: collect_sink parallelism: 1 + diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml new file mode 100644 index 0000000..5163642 --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -0,0 +1,57 @@ +sources: + inline_source: + type : inline + fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. 
+ properties: + data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]' + interval.per.row: 1s # optional + repeat.count: 1 # optional + format: json + json.ignore.parse.errors: false + watermark_timestamp: recv_time + watermark_timestamp_unit: ms + watermark_lag: 10 +sinks: + collect_sink: + type: collect + properties: + format: json + +postprocessing_pipelines: + + aggregate_processor: + type: aggregate + group_by_fields: [decoded_as] + window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 5 + window_timestamp_field: test_time + mini_batch: true + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + - function: MEAN + lookup_fields: [ pkts ] + + table_processor: + type: table + functions: + - function: JSON_UNROLL + lookup_fields: [ encapsulation ] + output_fields: [ new_name ] + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph. + - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE. + parallelism: 1 # [number] Operator-Level Parallelism. + downstream: [ aggregate_processor ] + - name: aggregate_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: collect_sink + parallelism: 1 + + diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml new file mode 100644 index 0000000..9bb2900 --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -0,0 +1,97 @@ +sources: + inline_source: + type : inline + fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties: + data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + interval.per.row: 1s # optional + repeat.count: 1 # optional + format: json + json.ignore.parse.errors: false +sinks: + collect_sink: + type: collect + properties: + format: json +splits: + test_split: + type: split + rules: + - name: table_processor + expression: event.decoded_as == 'HTTP' + - name: pre_etl_processor + expression: event.decoded_as == 'DNS' + +postprocessing_pipelines: + pre_etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [fields,tags] + output_fields: + functions: # [array of object] Function List + + - function: FLATTEN + lookup_fields: [ fields,tags ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." + + - function: RENAME + lookup_fields: [ '' ] + output_fields: [ '' ] + filter: + parameters: + # parent_fields: [tags] + # rename_fields: + # tags: tags + rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; + + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + # + + aggregate_processor: + type: aggregate + group_by_fields: [decoded_as] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 5 + window_timestamp_field: test_time + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + + table_processor: + type: table + functions: + - function: JSON_UNROLL + lookup_fields: [ encapsulation ] + output_fields: [ new_name ] + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph. + - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE. + parallelism: 1 # [number] Operator-Level Parallelism.
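# test_split routes events by decoded_as: HTTP rows go to table_processor and DNS
# rows to pre_etl_processor, each rule emitting to a side output named after its
# target node; inline_source also feeds collect_sink directly.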
+ downstream: [test_split,collect_sink] + - name: test_split + parallelism: 1 + downstream: [ table_processor,pre_etl_processor ] + - name: pre_etl_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: table_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: collect_sink + parallelism: 1 + + diff --git a/groot-common/pom.xml b/groot-common/pom.xml index 10e9ed4..37a4d25 100644 --- a/groot-common/pom.xml +++ b/groot-common/pom.xml @@ -41,6 +41,13 @@ <artifactId>hutool-all</artifactId> </dependency> + + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk18on</artifactId> + </dependency> + + <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index d13fc4b..b523591 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -12,6 +12,8 @@ public final class Constants { public static final int SYSTEM_EXIT_CODE = 2618; public static final String APPLICATION = "application"; + public static final String PROPERTIES = "properties"; + public static final String SPLITS = "splits"; public static final String APPLICATION_ENV ="env"; public static final String APPLICATION_TOPOLOGY = "topology"; public static final String JOB_NAME = "name"; @@ -23,7 +25,6 @@ public final class Constants { " \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" + " \n"; - public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config"; public static final String SYSPROP_GROOTSTREAM_PREFIX = "props."; public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes"; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java index 0b0379d..af94abf 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java @@ -5,8 +5,6 @@ import com.geedgenetworks.common.udf.UDFContext; import java.util.List; -import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP; - public interface AggregateConfigOptions { Option<String> TYPE = Options.key("type") .stringType() @@ -42,7 +40,10 @@ public interface AggregateConfigOptions { .intType() .noDefaultValue() .withDescription("The size of window."); - + Option<Boolean> MINI_BATCH = Options.key("mini_batch") .booleanType() .defaultValue(false) .withDescription("Whether to enable pre-aggregation (mini batch)."); Option<Integer> WINDOW_SLIDE = Options.key("window_slide") .intType() .noDefaultValue() .withDescription diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java new file mode 100644 index 0000000..a2acb71 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java @@ -0,0 +1,18 @@ +package com.geedgenetworks.common.config; + +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.udf.RuleContext; +import java.util.List; + +public interface SplitConfigOptions { + Option<String> TYPE = Options.key("type") + .stringType() + 
.noDefaultValue() + .withDescription("The type of split."); + + Option<List<RuleContext>> RULES = Options.key("rules") + .type(new TypeReference<List<RuleContext>>() {}) + .noDefaultValue() + .withDescription("The rules to be executed."); + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java index 455073f..6f6e048 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java @@ -8,14 +8,10 @@ import java.io.Serializable; public interface AggregateFunction extends Serializable { void open(UDFContext udfContext); - Accumulator initAccumulator(Accumulator acc); - Accumulator add(Event val, Accumulator acc); - String functionName(); - Accumulator getResult(Accumulator acc); - + Accumulator merge(Accumulator a, Accumulator b); default void close(){}; } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java new file mode 100644 index 0000000..ead0ecd --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java @@ -0,0 +1,19 @@ +package com.geedgenetworks.common.udf; + +import com.geedgenetworks.common.Event; +import com.googlecode.aviator.Expression; +import lombok.Data; +import org.apache.flink.util.OutputTag; + +import java.io.Serializable; + +@Data +public class RuleContext implements Serializable { + + private String name; + private String expression; + private Expression compiledExpression; + private OutputTag<Event> outputTag ; + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java index 668ba6f..d8b8bc4 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java @@ -2,13 +2,14 @@ package com.geedgenetworks.core.filter; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.FilterConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public class AviatorFilter implements Filter { @Override - public SingleOutputStreamOperator<Event> filterFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig) + public DataStream<Event> filterFunction( + DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig) throws Exception { if (FilterConfig.getParallelism() != 0) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java index a173438..f8b50eb 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java @@ -3,12 +3,13 @@ package com.geedgenetworks.core.filter; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.FilterConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public interface Filter { - SingleOutputStreamOperator<Event> filterFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig
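// The parameter type is widened to DataStream<Event> so a filter can consume the
// side-output streams produced by a split node as well as the main stream.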
FilterConfig) + DataStream<Event> filterFunction( + DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig) throws Exception; String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index d3cbaac..ebdb0bd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -1,15 +1,15 @@ package com.geedgenetworks.core.pojo; +import com.alibaba.fastjson2.annotation.JSONField; import com.geedgenetworks.common.udf.UDFContext; import lombok.Data; import lombok.EqualsAndHashCode; -import java.io.Serializable; import java.util.List; -import java.util.Map; + @EqualsAndHashCode(callSuper = true) @Data -public class AggregateConfig extends ProcessorConfig { +public class AggregateConfig extends ProcessorConfig { private List<String> group_by_fields; @@ -18,5 +18,7 @@ public class AggregateConfig extends ProcessorConfig { private Integer window_size; private Integer window_slide; private List<UDFContext> functions; + @JSONField(defaultValue = "false" ) + private Boolean mini_batch; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java index 416a7ea..2508730 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java @@ -13,18 +13,50 @@ public class OnlineStatistics { aggregate += (delta * (val.doubleValue() - mean)); return this; } + + public OnlineStatistics merge(OnlineStatistics other) { + if (other.n == 0) { + return this; // Nothing to merge + } + if (this.n == 0) { + this.n = other.n; + this.mean = other.mean; + this.aggregate = other.aggregate; + return this; + } + + // Combine counts + long newN = this.n + other.n; + + // Calculate the new mean + double delta = other.mean - this.mean; + this.mean += delta * (other.n / (double) newN); + + // Update the aggregate (Chan et al. parallel variance merge: delta^2 * n_a * n_b / (n_a + n_b)) + this.aggregate += other.aggregate + + (this.n * other.n * delta * delta) / newN; + + // Update the count + this.n = newN; + + return this; + } + // population standard deviation public double stddevp() { return Math.sqrt(variancep()); } + // population variance public double variancep() { return aggregate / n; } + // sample standard deviation public double stddev() { return Math.sqrt(variance()); } + // sample variance public double variance() { return aggregate / (n - 1); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java new file mode 100644 index 0000000..4381df5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java @@ -0,0 +1,17 @@ +package com.geedgenetworks.core.pojo; + +import com.geedgenetworks.common.udf.RuleContext; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +@Data +public class SplitConfig implements Serializable { + + private String type; + private Map<String, Object> properties; + private int parallelism; + private String name; + private List<RuleContext> rules; +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java index 172b368..3852414 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java @@ -2,12 +2,13 @@ package com.geedgenetworks.core.processor; import com.geedgenetworks.common.Event; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public interface Processor<T> { - SingleOutputStreamOperator<Event> processorFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, + DataStream<Event> processorFunction( + DataStream<Event> singleOutputStreamOperator, T processorConfig, ExecutionConfig config) throws Exception; String type(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java new file mode 100644 index 0000000..3632ba7 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -0,0 +1,191 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import cn.hutool.crypto.SecureUtil; +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +@Slf4j +public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator> { + + private final long windowSize; + private Long staggerOffset = null; + + protected final Map<Long, Map<String, Accumulator>> windows = new HashMap<>(); + protected List<String> groupByFields; + private LinkedList<UdfEntity> functions; + + protected InternalMetrics internalMetrics; + private final AggregateConfig aggregateConfig; + + public AbstractFirstAggregation(AggregateConfig aggregateConfig, long windowSize) { + this.windowSize = windowSize; + this.aggregateConfig = aggregateConfig; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + functions = Lists.newLinkedList(); + try { + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + List<UDFContext> udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + return; + } + Configuration configuration = (Configuration) 
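// The registered UDF class names travel to each task through Flink's global job
// parameters; getClassReflect(...) below turns them into a function-name to
// class-name map so every configured UDAF can be instantiated reflectively.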
getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); + List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); + + udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + throw new RuntimeException(); + } + groupByFields = aggregateConfig.getGroup_by_fields(); + functions = Lists.newLinkedList(); + Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); + + for (UDFContext udfContext : udfContexts) { + Expression filterExpression = null; + UdfEntity udfEntity = new UdfEntity(); + // Instantiate the function only if it is among the UDFs registered on the platform + if (udfClassReflect.containsKey(udfContext.getFunction())) { + Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); + AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); + // Compile the filter expression if the function defines one + if (udfContext.getFilter() != null) { + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + filterExpression = instance.compile(udfContext.getFilter(), true); + } + udfEntity.setAggregateFunction(aggregateFunction); + udfEntity.setFilterExpression(filterExpression); + udfEntity.setName(udfContext.getFunction()); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + udfEntity.setUdfContext(udfContext); + functions.add(udfEntity); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported UDAF: " + udfContext.getFunction()); + } + + } + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().open(udfEntity.getUdfContext()); + } + + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e); + + } + } + + @Override + public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception { + + } + + public void onTimer(long timestamp, Collector<Accumulator> out) throws Exception { + Map<String, Accumulator> accumulator = windows.remove(timestamp); + for (Accumulator value : accumulator.values()) { + value = getResult(value); + out.collect(value); + internalMetrics.incrementOutEvents(); + } + accumulator.clear(); + } + + private long assignWindowStart(long timestamp, long offset) { + return timestamp - (timestamp - offset + windowSize) % windowSize; + } + + protected long assignWindowEnd(long timestamp) { + if (staggerOffset == null) { + staggerOffset = getStaggerOffset(); + } + return assignWindowStart(timestamp, (staggerOffset % windowSize)) + windowSize; + } + + private long getStaggerOffset() { + return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize); + } + + public Accumulator createAccumulator() { + + Map<String, Object> map = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + + accumulator.setMetricsFields(map); + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().initAccumulator(accumulator); + } + return accumulator; + + } + + public String getKey(Event value, List<String> keys) { + StringBuilder stringBuilder = new StringBuilder(); + for (String key : keys) { + + if (value.getExtractedFields().containsKey(key)) { + stringBuilder.append(value.getExtractedFields().get(key).toString()); + } else { + 
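// A group-by field that is absent from the event contributes a "," placeholder,
// keeping keys with different absent-field patterns distinguishable; the
// concatenation is then MD5-hashed to give a fixed-size grouping key.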
stringBuilder.append(","); + } + } + return SecureUtil.md5(stringBuilder.toString()); + + } + + public Accumulator add(Event event, Accumulator accumulator) { + accumulator.setInEvents(accumulator.getInEvents() + 1); + for (UdfEntity udafEntity : functions) { + try { + boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; + if (result) { + udafEntity.getAggregateFunction().add(event, accumulator); + } + } catch (ExpressionRuntimeException ignore) { + log.error("Function " + udafEntity.getName() + " Invalid filter ! "); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " execute exception !", e); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } + } + return accumulator; + } + + public Accumulator getResult(Accumulator accumulator) { + return accumulator; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 803fefc..4f9535d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> { private final List<UDFContext> udfContexts; private final List<String> udfClassNameLists; - private final List<String> groupByFields; - private LinkedList<UdfEntity> functions; + private final LinkedList<UdfEntity> functions; public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); @@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f if (udfContexts == null || udfContexts.isEmpty()) { throw new RuntimeException(); } - groupByFields = aggregateConfig.getGroup_by_fields(); functions = Lists.newLinkedList(); Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index bc87c32..c261fb6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -5,49 +5,96 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.AggregateConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; + import static com.geedgenetworks.common.Constants.*; -public class AggregateProcessorImpl implements AggregateProcessor { +public class AggregateProcessorImpl implements AggregateProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { - if (aggregateConfig.getParallelism() != 0) { + public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { + + SingleOutputStreamOperator<Event> singleOutputStreamOperator; + if (aggregateConfig.getMini_batch()) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), 
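// Note that in the sliding-window branches the first (local) aggregation is built
// with window_slide rather than window_size, presumably so each partial
// accumulator spans exactly one pane and can be merged into every overlapping
// window.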
Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); + } - }else { + } else { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new 
ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } } + if (aggregateConfig.getParallelism() != 0) { + singleOutputStreamOperator.setParallelism(aggregateConfig.getParallelism()); + } + return singleOutputStreamOperator.name(aggregateConfig.getName()); + } @Override public String type() { return "aggregate"; } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java new file mode 100644 index 0000000..156c0ed --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -0,0 +1,60 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.AggregateConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +@Slf4j +public class FirstAggregationEventTime extends AbstractFirstAggregation { + + private final PriorityQueue<Long> eventTimeTimersQueue = new PriorityQueue<>(); + + public FirstAggregationEventTime(AggregateConfig aggregateConfig, long windowSize) { + super(aggregateConfig, windowSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + } + + @Override + public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception { + Long timestamp; + internalMetrics.incrementInEvents(); + try { + String key = getKey(value, groupByFields); + while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) { + eventTimeTimersQueue.poll(); + onTimer(timestamp, out); + } + long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark()); + if (!windows.containsKey(windowEnd)) { + Map<String, Accumulator> map = new HashMap<>(); + map.put(key, createAccumulator()); + windows.put(windowEnd, map); + eventTimeTimersQueue.add(windowEnd); + } else { + if (!windows.get(windowEnd).containsKey(key)) { + windows.get(windowEnd).put(key, createAccumulator()); + } + } + add(value, windows.get(windowEnd).get(key)); + } catch (Exception e) { + log.error("Error in pre-aggregate processElement", e); + internalMetrics.incrementErrorEvents(); + } + } + + +} diff 
--git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java new file mode 100644 index 0000000..e98daa5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java @@ -0,0 +1,59 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.AggregateConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +@Slf4j +public class FirstAggregationProcessingTime extends AbstractFirstAggregation { + + private final PriorityQueue<Long> processingTimeTimersQueue = new PriorityQueue<>(); + + public FirstAggregationProcessingTime(AggregateConfig aggregateConfig, long windowSize) { + super(aggregateConfig, windowSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + } + + @Override + public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception { + Long timestamp; + internalMetrics.incrementInEvents(); + try { + String key = getKey(value, groupByFields); + while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + processingTimeTimersQueue.poll(); + onTimer(timestamp, out); + } + long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + if (!windows.containsKey(windowEnd)) { + Map<String, Accumulator> map = new HashMap<>(); + map.put(key, createAccumulator()); + windows.put(windowEnd, map); + processingTimeTimersQueue.add(windowEnd); + } else { + if (!windows.get(windowEnd).containsKey(key)) { + windows.get(windowEnd).put(key, createAccumulator()); + } + } + add(value, windows.get(windowEnd).get(key)); + } catch (Exception e) { + log.error("Error in pre-aggregate processElement", e); + internalMetrics.incrementErrorEvents(); + } + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java index 165ed1b..da09690 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java @@ -1,5 +1,6 @@ package com.geedgenetworks.core.processor.aggregate; +import cn.hutool.crypto.SecureUtil; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.KeybyEntity; @@ -20,16 +21,17 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); - for(String key: keys){ + for (String key : keys) { - if(value.getExtractedFields().containsKey(key)){ - keybyEntity.getKeys().put(key,value.getExtractedFields().get(key)); + if (value.getExtractedFields().containsKey(key)) { + keybyEntity.getKeys().put(key, value.getExtractedFields().get(key)); stringBuilder.append(value.getExtractedFields().get(key).toString()); - }else { + } else { 
stringBuilder.append(","); } } - keybyEntity.setKeysToString(stringBuilder.toString()); - return keybyEntity; + String hashedKey = SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(hashedKey); + return keybyEntity; } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java new file mode 100644 index 0000000..6e43184 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java @@ -0,0 +1,37 @@ +package com.geedgenetworks.core.processor.aggregate; + +import cn.hutool.crypto.SecureUtil; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.KeybyEntity; + +import java.util.HashMap; +import java.util.List; + +public class PreKeySelector implements org.apache.flink.api.java.functions.KeySelector<Accumulator, KeybyEntity> { + + + private final List<String> keys; + + public PreKeySelector(List<String> keys) { + this.keys = keys; + } + + @Override + public KeybyEntity getKey(Accumulator value) throws Exception { + + KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); + StringBuilder stringBuilder = new StringBuilder(); + for (String key : keys) { + + if (value.getMetricsFields().containsKey(key)) { + keybyEntity.getKeys().put(key, value.getMetricsFields().get(key)); + stringBuilder.append(value.getMetricsFields().get(key).toString()); + } else { + stringBuilder.append(","); + } + } + String hashedKey = SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(hashedKey); + return keybyEntity; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java new file mode 100644 index 0000000..68fa53e --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -0,0 +1,126 @@ +package com.geedgenetworks.core.processor.aggregate; + +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.ExecutionConfig; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +@Slf4j +public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> { + private final List<UDFContext> udfContexts; + private final List<String> udfClassNameLists; + private final 
LinkedList<UdfEntity> functions;
+
+    public SecondAggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
+        udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+        udfContexts = aggregateConfig.getFunctions();
+        if (udfContexts == null || udfContexts.isEmpty()) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Aggregate processor requires at least one function");
+        }
+        functions = Lists.newLinkedList();
+        Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
+        try {
+            for (UDFContext udfContext : udfContexts) {
+                Expression filterExpression = null;
+                UdfEntity udfEntity = new UdfEntity();
+                // Instantiate the function only if it is among the UDFs registered on the platform
+                if (udfClassReflect.containsKey(udfContext.getFunction())) {
+                    Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
+                    AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
+                    // If the function carries a filter, compile the expression
+                    if (udfContext.getFilter() != null) {
+                        AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+                        instance.setCachedExpressionByDefault(true);
+                        instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+                        instance.setFunctionMissing(null);
+                        filterExpression = instance.compile(udfContext.getFilter(), true);
+                    }
+                    udfEntity.setAggregateFunction(aggregateFunction);
+                    udfEntity.setFilterExpression(filterExpression);
+                    udfEntity.setName(udfContext.getFunction());
+                    udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+                    udfEntity.setUdfContext(udfContext);
+                    functions.add(udfEntity);
+                } else {
+                    throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "Unsupported UDAF: " + udfContext.getFunction());
+                }
+            }
+            for (UdfEntity udfEntity : functions) {
+                udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
+            }
+        } catch (Exception e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "UDAF initialization failed!", e);
+        }
+    }
+
+    @Override
+    public Accumulator createAccumulator() {
+        Map<String, Object> map = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(map);
+        for (UdfEntity udfEntity : functions) {
+            udfEntity.getAggregateFunction().initAccumulator(accumulator);
+        }
+        return accumulator;
+    }
+
+    @Override
+    public Accumulator add(Accumulator event, Accumulator accumulator) {
+        return merge(event, accumulator);
+    }
+
+    @Override
+    public Accumulator getResult(Accumulator accumulator) {
+        for (UdfEntity udafEntity : functions) {
+            try {
+                udafEntity.getAggregateFunction().getResult(accumulator);
+            } catch (Exception e) {
+                log.error("Function " + udafEntity.getName() + " getResult exception!", e);
+            }
+        }
+        return accumulator;
+    }
+
+    @Override
+    public Accumulator merge(Accumulator acc1, Accumulator acc2) {
+        acc1.setInEvents(acc1.getInEvents() + 1);
+        for (UdfEntity udafEntity : functions) {
+            try {
+                udafEntity.getAggregateFunction().merge(acc1, acc2);
+            } catch (ExpressionRuntimeException ignore) {
+                log.error("Function " + udafEntity.getName() + " invalid filter!
"); + acc1.setErrorCount(acc1.getErrorCount() + 1); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " execute exception !", e); + acc1.setErrorCount(acc1.getErrorCount() + 1); + } + } + return acc1; + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java index 6b46a7b..d87e7e2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java @@ -4,25 +4,26 @@ import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; public class ProjectionProcessorImpl implements ProjectionProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { if (projectionConfig.getParallelism() != 0) { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .process(new ProjectionProcessFunction(projectionConfig)) .setParallelism(projectionConfig.getParallelism()) .name(projectionConfig.getName()); } else { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .process(new ProjectionProcessFunction(projectionConfig)) .name(projectionConfig.getName()); } } - @Override public String type() { return "projection"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java index f36f8db..6ddc616 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java @@ -2,25 +2,25 @@ package com.geedgenetworks.core.processor.table; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.TableConfig; -import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; + +import java.util.Map; -import java.time.Duration; public class TableProcessorImpl implements TableProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception { - + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) throws Exception { if (tableConfig.getParallelism() != 0) { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .flatMap(new 
TableProcessorFunction(tableConfig)) .setParallelism(tableConfig.getParallelism()) .name(tableConfig.getName()); } else { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .flatMap(new TableProcessorFunction(tableConfig)) .name(tableConfig.getName()); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java new file mode 100644 index 0000000..37e7b44 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java @@ -0,0 +1,16 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +import java.util.Set; + +public interface Split { + + DataStream<Event> splitFunction( + DataStream<Event> dataStream, SplitConfig splitConfig) + throws Exception; + String type(); +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java new file mode 100644 index 0000000..f07b568 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -0,0 +1,79 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.SplitConfig; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.List; +import java.util.Map; + + +@Slf4j +public class SplitFunction extends ProcessFunction<Event, Event> { + private final SplitConfig splitConfig; + private List<RuleContext> rules; + private transient InternalMetrics internalMetrics; + + public SplitFunction(SplitConfig splitConfig) { + this.splitConfig = splitConfig; + + } + + @Override + public void open(Configuration parameters) throws Exception { + + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + this.rules = splitConfig.getRules(); + for(RuleContext rule : rules){ + String expression = rule.getExpression(); + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + Expression compiledExp = instance.compile(expression, true); + rule.setCompiledExpression(compiledExp); + OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){}; + rule.setOutputTag(outputTag); + } + } + + + @Override + public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception { + try { + internalMetrics.incrementInEvents(); + for (RuleContext route : rules){ + boolean result = route.getExpression() != null ? 
(filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; + if (result) { + ctx.output(route.getOutputTag(), event); + } + } + }catch (Exception e) { + internalMetrics.incrementErrorEvents(); + log.error("error in split function", e); + } + } + + public static Boolean filterExecute(Expression expression, Map<String, Object> map) { + + boolean result; + Object object = expression.execute(map); + if (object != null) { + result = (Boolean) object; + } else { + throw new ExpressionRuntimeException(); + } + return result; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java new file mode 100644 index 0000000..f6d2c8c --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java @@ -0,0 +1,29 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; +import org.apache.flink.streaming.api.datastream.DataStream; + +public class SplitOperator implements Split { + + @Override + public DataStream<Event> splitFunction( + DataStream<Event> dataStream, SplitConfig splitConfig) + throws Exception { + if (splitConfig.getParallelism() != 0) { + return dataStream + .process(new SplitFunction(splitConfig)) + .setParallelism(splitConfig.getParallelism()) + .name(splitConfig.getName()); + } else { + return dataStream + .process(new SplitFunction(splitConfig)) + .name(splitConfig.getName()); + } + } + @Override + public String type() { + return "split"; + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java index 423eff9..3921ee2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
**/ package com.geedgenetworks.core.udf.udaf; @@ -22,9 +22,9 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.processor.projection.UdfEntity; -import java.util.*; +import java.util.ArrayList; +import java.util.List; /** * Collects elements within a group and returns the list of aggregated objects @@ -36,18 +36,18 @@ public class CollectList implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new ArrayList<>()); @@ -56,7 +56,7 @@ public class CollectList implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -75,4 +75,17 @@ public class CollectList implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List<Object> firstValue = (List<Object>) firstAcc.getMetricsFields().get(outputField); + List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index b4dfb14..9ec9b09 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -8,7 +8,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -23,17 +22,17 @@ public class CollectSet implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required 
parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new HashSet<>()); @@ -42,7 +41,7 @@ public class CollectSet implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -61,5 +60,16 @@ public class CollectSet implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set<Object> firstValue = (Set<Object>) firstAcc.getMetricsFields().get(outputField); + Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java index 6301a01..a1a35be 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
**/ package com.geedgenetworks.core.udf.udaf; @@ -23,8 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -36,14 +34,13 @@ public class FirstValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -55,7 +52,7 @@ public class FirstValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){ + if (!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class FirstValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java index 1648fa5..a099fde 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -59,6 +59,26 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { return acc;
}
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if (agg == null) {
+ rst = aggOther; // only the other side holds a partial histogram
+ } else if (aggOther == null) {
+ rst = agg; // only this side holds a partial histogram
+ } else {
+ rst = ((Histogramer) agg).merge((Histogramer) aggOther); // both sides hold data: merge the sketches
+ }
+
+ if (rst != null) {
+ acc.getMetricsFields().put(outputField, rst); // write the merged sketch back into the left accumulator
+ }
+ return acc;
+ }
+
protected void updateHdr(Accumulator acc, Object value) {
Map<String, Object> aggs = acc.getMetricsFields();
ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
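Note on the merge() methods introduced for the two-phase (mini_batch) aggregation path: they all follow one null-safe pattern. If only one side of the pair carries a partial result, that result wins; if both sides do, the partials are combined; the outcome is written back into the left accumulator, which is returned. A minimal self-contained sketch of the pattern follows; the Mergeable interface and the plain Map are hypothetical stand-ins for the sketch types and Accumulator.getMetricsFields() used in this changeset, not part of the codebase.

    import java.util.HashMap;
    import java.util.Map;

    final class NullSafeMergeSketch {

        // Hypothetical stand-in for any combinable partial aggregate (histogram, HLL sketch, ...).
        interface Mergeable {
            Mergeable mergeWith(Mergeable other);
        }

        // Mirrors the shape of the merge() implementations above: prefer the non-null side,
        // combine when both partials exist, then write the result back into the left map.
        static Map<String, Object> merge(Map<String, Object> left, Map<String, Object> right, String outputField) {
            Object agg = left.get(outputField);
            Object aggOther = right.get(outputField);
            Object rst;
            if (agg == null) {
                rst = aggOther;            // only the right-hand partial exists
            } else if (aggOther == null) {
                rst = agg;                 // only the left-hand partial exists
            } else {
                rst = ((Mergeable) agg).mergeWith((Mergeable) aggOther); // combine both partials
            }
            if (rst != null) {
                left.put(outputField, rst);
            }
            return left;
        }

        public static void main(String[] args) {
            Map<String, Object> left = new HashMap<>();
            Map<String, Object> right = new HashMap<>();
            right.put("metric", (Mergeable) other -> other); // trivial partial on the right side only
            System.out.println(merge(left, right, "metric").containsKey("metric")); // true: right-hand partial wins
        }
    }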
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java index f27a2e6..44b374e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. **/ package com.geedgenetworks.core.udf.udaf; @@ -23,9 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; -import java.util.List; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -37,17 +34,17 @@ public class LastValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; @@ -55,7 +52,7 @@ public class LastValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class LastValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git 
a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java index ea33271..05de38c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java @@ -13,20 +13,22 @@ public class LongCount implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getOutput_fields()==null ){ + public void open(UDFContext udfContext) { + if (udfContext.getOutput_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } outputField = udfContext.getOutput_fields().get(0); } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L); + acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long) v + 1L); return acc; } @@ -40,5 +42,18 @@ public class LongCount implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + long firstValue = (long) firstAcc.getMetricsFields().get(outputField); + long secondValue = (long) secondAcc.getMetricsFields().get(outputField); + firstValue = firstValue + secondValue; + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java index 2a615ef..9c4e070 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -16,20 +16,20 @@ public class Mean implements AggregateFunction { private String outputField; private Integer precision; private DecimalFormat df; + @Override - public void open(UDFContext udfContext){ + public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } - if(udfContext.getParameters()!= null && !udfContext.getParameters().isEmpty()) { + if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString()); if (precision > 0) { StringBuilder pattern = new StringBuilder("#."); @@ -38,14 +38,15 @@ public class Mean implements AggregateFunction { } df = new 
DecimalFormat(pattern.toString()); } - }else { + } else { precision = -1; } } + @Override public Accumulator initAccumulator(Accumulator acc) { - acc.getMetricsFields().put(outputField,new OnlineStatistics()); + acc.getMetricsFields().put(outputField, new OnlineStatistics()); return acc; } @@ -67,16 +68,27 @@ public class Mean implements AggregateFunction { @Override public Accumulator getResult(Accumulator acc) { OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField); - if(precision<0){ + if (precision < 0) { acc.getMetricsFields().put(outputField, aggregate.mean()); - } - else if(precision>0){ + } else if (precision > 0) { acc.getMetricsFields().put(outputField, df.format(aggregate.mean())); - } - else { - acc.getMetricsFields().put(outputField,(long)aggregate.mean()); + } else { + acc.getMetricsFields().put(outputField, (long) aggregate.mean()); } return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + OnlineStatistics acc1 = (OnlineStatistics) firstAcc.getMetricsFields().get(outputField); + acc1.merge((OnlineStatistics) secondAcc.getMetricsFields().get(outputField)); + long inEvents = firstAcc.getInEvents() + (secondAcc.getInEvents()); + long outEvent = firstAcc.getOutEvents() + (secondAcc.getOutEvents()); + long error = firstAcc.getErrorCount() + (secondAcc.getErrorCount()); + firstAcc.setInEvents(inEvents); + firstAcc.setErrorCount(error); + firstAcc.setOutEvents(outEvent); + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index 01e9a5b..e972133 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -6,7 +6,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.pojo.OnlineStatistics; public class NumberSum implements AggregateFunction { @@ -15,15 +14,14 @@ public class NumberSum implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getLookup_fields()==null ){ + public void open(UDFContext udfContext) { + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -32,22 +30,23 @@ public class NumberSum implements AggregateFunction { public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ - Number val = (Number) event.getExtractedFields().get(lookupField); - Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); - if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) { - aggregate = aggregate.longValue() + val.longValue(); - } else 
if (aggregate instanceof Float || val instanceof Float) { - aggregate = aggregate.floatValue() + val.floatValue(); - } else if (aggregate instanceof Double || val instanceof Double) { - aggregate = aggregate.doubleValue() + val.doubleValue(); - } - acc.getMetricsFields().put(outputField, aggregate); + if (event.getExtractedFields().containsKey(lookupField)) { + Number val = (Number) event.getExtractedFields().get(lookupField); + Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); + if (aggregate instanceof Long && (val instanceof Integer || val instanceof Long)) { + aggregate = aggregate.longValue() + val.longValue(); + } else if (aggregate instanceof Float || val instanceof Float) { + aggregate = aggregate.floatValue() + val.floatValue(); + } else if (aggregate instanceof Double || val instanceof Double) { + aggregate = aggregate.doubleValue() + val.doubleValue(); } - return acc; + acc.getMetricsFields().put(outputField, aggregate); + } + return acc; } @Override @@ -65,4 +64,24 @@ public class NumberSum implements AggregateFunction { } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + Number firstValue = (Number) firstAcc.getMetricsFields().get(outputField); + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + if (firstValue instanceof Long && (secondValue instanceof Integer || secondValue instanceof Long)) { + firstValue = firstValue.longValue() + secondValue.longValue(); + } else if (firstValue instanceof Float || secondValue instanceof Float) { + firstValue = firstValue.floatValue() + secondValue.floatValue(); + } else if (firstValue instanceof Double || secondValue instanceof Double) { + firstValue = firstValue.doubleValue() + secondValue.doubleValue(); + } + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java index 71d61dc..0802c22 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -51,6 +51,34 @@ public abstract class HlldBaseAggregate implements AggregateFunction { return acc;
}
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if (agg == null) {
+ rst = aggOther; // only the other side holds a partial sketch
+ } else if (aggOther == null) {
+ rst = agg; // only this side holds a partial sketch
+ } else {
+ if (inputSketch) {
+ ((HllUnion) agg).update(((HllUnion) aggOther).getResult()); // accumulators hold unions: fold the other snapshot in
+ rst = agg;
+ } else {
+ final HllUnion union = new HllUnion(precision); // accumulators hold plain sketches: combine via a fresh union
+ union.update((Hll) agg);
+ union.update((Hll) aggOther);
+ rst = union.getResult();
+ }
+ }
+
+ if (rst != null) {
+ acc.getMetricsFields().put(outputField, rst); // write the merged sketch back into the left accumulator
+ }
+ return acc;
+ }
+
protected Hll getResultHll(Accumulator acc){
Object agg = acc.getMetricsFields().get(outputField);
if (agg == null) {
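The two branches in the HlldBaseAggregate merge above distinguish the accumulator's shape: when inputSketch is set, each accumulator already carries an HllUnion, so the other side's snapshot is folded in directly; otherwise both sides carry plain Hll sketches and a fresh union combines them. A short sketch of the second branch in isolation, assuming only the HllUnion(precision) constructor and the update/getResult methods that appear in this diff; the helper name and the idea that precision comes from the aggregate's configuration are illustrative assumptions.

    // Combines two partial Hll sketches the way merge() does when the accumulators
    // hold plain sketches; `precision` is assumed to come from the aggregate's config.
    static Hll combinePartials(Hll left, Hll right, int precision) {
        final HllUnion union = new HllUnion(precision);
        union.update(left);
        union.update(right);
        return union.getResult();
    }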
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split new file mode 100644 index 0000000..500c367 --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split @@ -0,0 +1 @@ +com.geedgenetworks.core.split.SplitOperator
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java index b0d846b..2bf13a5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -36,11 +36,51 @@ public class CollectListTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List<String> arr) throws ParseException { + private void testMerge(List<String> arr,List<String> arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectList collectList = new CollectList(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = collectList.getResult(collectList.merge(result1,result2)); + List<String> vals = (List<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),8); + assertEquals("192.168.1.6",vals.get(5).toString()); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + CollectList collectList = new CollectList(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator agg = collectList.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectList.add(event, agg); + + } + return agg; + } + private void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java index ea4fe8d..8e992f6 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -31,14 +31,53 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CollectSetTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List<String> arr) throws ParseException { + private void testMerge(List<String> arr,List<String> arr2) { + + UDFContext udfContext = new UDFContext(); + 
udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectSet collectSet = new CollectSet(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = collectSet.getResult(collectSet.merge(result1,result2)); + Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),6); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + CollectSet collectSet = new CollectSet(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator agg = collectSet.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectSet.add(event, agg); + + } + return agg; + } + private static void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java index 506f6de..43a9732 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -34,11 +34,48 @@ public class FirstValueTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } + private void testMerge(List<String> arr,List<String> arr2) { - private static void excute(List<String> arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_first")); + FirstValue firstValue = new FirstValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = firstValue.getResult(firstValue.merge(result1,result2)); + String val = (String) result.getMetricsFields().get("field_first"); + assertEquals(val,"192.168.1.1"); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + FirstValue firstValue = new FirstValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator agg = firstValue.initAccumulator(accumulator); + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + 
extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = firstValue.add(event, agg); + + } + return agg; + } + private static void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java index f8306cd..e952908 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java @@ -37,11 +37,48 @@ public class LastValueTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.3"); + testMerge(arr,arr2); + testGetResult(arr); } + private void testMerge(List<String> arr,List<String> arr2) { - private static void excute(List<String> arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_last")); + LastValue lastValue = new LastValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + lastValue.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = lastValue.getResult(lastValue.merge(result1,result2)); + String val = (String) result.getMetricsFields().get("field_last"); + assertEquals(val,"192.168.1.3"); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + LastValue lastValue = new LastValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + lastValue.open(udfContext); + Accumulator agg = lastValue.initAccumulator(accumulator); + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = lastValue.add(event, agg); + + } + return agg; + } + private static void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java index 3c02499..c1dfb9e 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java @@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.LastValue; import com.geedgenetworks.core.udf.udaf.LongCount; import com.geedgenetworks.core.udf.udaf.NumberSum; import com.ibm.icu.text.NumberFormat; @@ -38,10 +39,46 @@ public class LongCountTest { public void test() throws 
ParseException { Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; - excute(longArr); + Long[] longArr2 = new Long[]{1L, 2L, 3L, 4L}; + testMerge(longArr,longArr2); + testGetResult(longArr); } + private void testMerge(Number[] arr,Number[] arr2) { - private static void excute(Number[] arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("count")); + LongCount longCount = new LongCount(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + longCount.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = longCount.getResult(longCount.merge(result1,result2)); + assertEquals(Integer.parseInt((result.getMetricsFields().get("count").toString())),8); + + } + private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { + + + LongCount longCount = new LongCount(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + longCount.open(udfContext); + Accumulator agg = longCount.initAccumulator(accumulator); + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = longCount.add(event, agg); + + } + return agg; + } + private static void testGetResult(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setOutput_fields(Collections.singletonList("count")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java index 6deed0f..cc4eaf0 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java @@ -39,12 +39,52 @@ public class MeanTest { Integer[] intArr1 = new Integer[]{1, 2, 3, 4}; Integer[] intArr2 = new Integer[]{1, 6, 3}; - excute(intArr1, 0); - excute2(intArr2, 2); - excute3(intArr1); + testInt(intArr1, 0); + testDouble(intArr2, 2); + testNoPrecision(intArr1); + testMerge(intArr1,intArr2,2); + } + private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + mean.open(udfContext); + Accumulator result1 = getMiddleResult(arr1,precision); + Accumulator result2 = getMiddleResult(arr2,precision); + Accumulator result = mean.getResult(mean.merge(result1,result2)); + assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2.86")); + } + private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + 
udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + mean.open(udfContext); + Accumulator agg = mean.initAccumulator(accumulator); + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = mean.add(event, agg); + + } + return agg; } - private static void excute(Number[] arr,int precision) throws ParseException { + + private void testInt(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); @@ -65,11 +105,12 @@ public class MeanTest { agg = mean.add(event, agg); } + Accumulator result = mean.getResult(agg); assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2")); } - private static void excute2(Number[] arr,int precision) throws ParseException { + private void testDouble(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); @@ -94,7 +135,7 @@ public class MeanTest { assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33")); } - private static void excute3(Number[] arr) throws ParseException { + private void testNoPrecision(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java index d0d3d2c..a4072ca 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java @@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.LongCount; import com.geedgenetworks.core.udf.udaf.NumberSum; import com.ibm.icu.text.NumberFormat; import org.junit.jupiter.api.Test; @@ -41,8 +42,44 @@ public class NumberSumTest { excute(doubleArr, Double.class); excute(intArr, Long.class); excute(longArr, Long.class); + testMerge(intArr,floatArr); + + } + private void testMerge(Number[] arr,Number[] arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_sum")); + NumberSum numberSum = new NumberSum(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + numberSum.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = numberSum.getResult(numberSum.merge(result1,result2)); + assertEquals(Float.parseFloat((result.getMetricsFields().get("field_sum").toString())),20.0f); + } + private 
Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { + + + NumberSum numberSum = new NumberSum(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + numberSum.open(udfContext); + Accumulator agg = numberSum.initAccumulator(accumulator); + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = numberSum.add(event, agg); + } + return agg; + } private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException { UDFContext udfContext = new UDFContext(); diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index 0eba408..9b58289 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -9,11 +9,12 @@ import java.io.FileNotFoundException; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Paths; +import java.util.List; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_with_aggregation.yaml"; + String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); @@ -21,6 +22,8 @@ public class GrootStreamExample { executeCommandArgs.setEncrypt(false); executeCommandArgs.setDecrypt(false); executeCommandArgs.setVersion(false); + executeCommandArgs.setVariables(List.of("hos.bucket.name.traffic_file=user_define_traffic_file_bucket", + "scheduler.knowledge_base.update.interval.minutes=1")); executeCommandArgs.setDeployMode(DeployMode.RUN); executeCommandArgs.setTargetType(TargetType.LOCAL); GrootStreamServer.run(executeCommandArgs.buildCommand()); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml index 517d29b..63159c5 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml @@ -46,7 +46,7 @@ sinks: kafka.compression.type: snappy kafka.security.protocol: SASL_PLAINTEXT kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.sasl.jaas.config: f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6 format: json log.failures.only: true @@ -64,7 +64,7 @@ sinks: kafka.compression.type: snappy kafka.security.protocol: SASL_PLAINTEXT kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: 
454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.sasl.jaas.config: f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6 format: json log.failures.only: true @@ -72,7 +72,7 @@ application: # [object] Define job configuration env: name: example-inline-to-kafka parallelism: 3 - shade.identifier: aes + shade.identifier: sm4 pipeline: object-reuse: true topology: diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml index 00f2a7d..408fbad 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml @@ -33,6 +33,12 @@ application: parallelism: 3 pipeline: object-reuse: true + properties: + hos.bucket.name.traffic_file: local_traffic_file_bucket + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket topology: - name: inline_source downstream: [filter_operator] diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java index 0f1e3f7..14eb5fb 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java @@ -62,8 +62,9 @@ public abstract class AbstractTestContainer implements TestContainer { container, this.startModuleFullPath, GROOTSTREAM_HOME); } - protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile) + protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile, List<String> variables) throws IOException, InterruptedException { + final String confInContainerPath = ContainerUtil.copyConfigFileToContainer(container, confFile); // copy connectors ContainerUtil.copyConnectorJarToContainer( @@ -81,10 +82,27 @@ public abstract class AbstractTestContainer implements TestContainer { command.add(ContainerUtil.adaptPathForWin(confInContainerPath)); command.add("--target"); command.add("remote"); - command.addAll(getExtraStartShellCommands()); + List<String> extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands()); + if (variables != null && !variables.isEmpty()) { + variables.forEach( + v -> { + extraStartShellCommands.add("-i"); + extraStartShellCommands.add(v); + }); + } + command.addAll(extraStartShellCommands); return executeCommand(container, command); } + + + protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile) + throws IOException, InterruptedException { + return executeJob(container, confFile, null); + } + + + protected Container.ExecResult savepointJob(GenericContainer<?> container, String jobId) throws IOException, InterruptedException { final List<String> command = new 
ArrayList<>(); diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java index 30e6eb3..b833115 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java @@ -127,8 +127,14 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { @Override public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException { + return executeJob(confFile, null); + } + + @Override + public Container.ExecResult executeJob(String confFile, List<String> variables) + throws IOException, InterruptedException { log.info("test in container: {}", identifier()); - return executeJob(jobManager, confFile); + return executeJob(jobManager, confFile, variables); } @Override diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java index 6e4cd1f..b3bf77a 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java @@ -5,6 +5,7 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.Network; import java.io.IOException; +import java.util.List; public interface TestContainer extends TestResource { Network NETWORK = Network.newNetwork(); @@ -15,6 +16,8 @@ public interface TestContainer extends TestResource { Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException; + Container.ExecResult executeJob(String confFile, List<String> variables) + throws IOException, InterruptedException; default Container.ExecResult savepointJob(String jobId) throws IOException, InterruptedException { diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml index 5520945..2eb105b 100644 --- a/groot-tests/test-common/src/test/resources/grootstream.yaml +++ b/groot-tests/test-common/src/test/resources/grootstream.yaml @@ -15,3 +15,7 @@ grootstream: hos.bucket.name.traffic_file: traffic_file_bucket hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket scheduler.knowledge_base.update.interval.minutes: 5 + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java index d4cdcbd..1c1e777 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java @@ -8,6 +8,7 @@ import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer; import com.geedgenetworks.test.common.container.TestContainerId; import com.geedgenetworks.test.common.junit.DisabledOnContainer; import 
lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; import java.io.IOException; @@ -23,7 +24,7 @@ import static org.awaitility.Awaitility.await; @DisabledOnContainer( value = {TestContainerId.FLINK_1_17}, type = {}, - disabledReason = "only flink adjusts the parameter configuration rules") + disabledReason = "Only flink adjusts the parameter configuration rules") public class InlineToPrintIT extends TestSuiteBase { @TestTemplate @@ -31,7 +32,10 @@ public class InlineToPrintIT extends TestSuiteBase { CompletableFuture.supplyAsync( () -> { try { - return container.executeJob("/kafka_to_print.yaml"); + List<String> variables = List.of( + "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket", + "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket"); + return container.executeJob("/inline_to_print.yaml", variables); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -85,6 +89,17 @@ public class InlineToPrintIT extends TestSuiteBase { Assertions.assertNotNull(jobNumRestartsReference.get()); }); + + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + String logs = container.getServerLogs(); + Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10); + Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10); + }); + + + } } diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml index daf6e32..b4773a1 100644 --- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml +++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml @@ -2,7 +2,7 @@ sources: inline_source: type: inline properties: - data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; 
charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' + data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' format: json json.ignore.parse.errors: false @@ -14,11 +14,21 @@ filters: processing_pipelines: projection_processor: - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: [http_request_line, http_response_line, http_response_content_type] functions: - function: DROP filter: event.server_ip == '4.4.4.4' + - function: PATH_COMBINE + lookup_fields: [ rtp_pcap_path ] + output_fields: [ rtp_pcap_path ] + parameters: + path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ] + - function: PATH_COMBINE + lookup_fields: [ http_request_body ] + output_fields: [ http_request_body ] + parameters: + path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ] sinks: print_sink: @@ -33,6 +43,12 @@ application: parallelism: 3 pipeline: object-reuse: true + properties: + hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket + hos.bucket.name.http_file: job_level_traffic_http_file_bucket + hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket + topology: - name: inline_source downstream: 
[filter_operator] diff --git a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java index 20caace..8b44ed7 100644 --- a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java +++ b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java @@ -122,7 +122,6 @@ public class ClickHouseIT extends TestSuiteBase implements TestResource { @TestTemplate public void testClickHouse(TestContainer container) throws Exception { assertHasData(SOURCE_TABLE); - clearTable(SOURCE_TABLE); } @TestTemplate @@ -144,7 +143,6 @@ public class ClickHouseIT extends TestSuiteBase implements TestResource { () -> { assertHasData(SINK_TABLE); compareResult(); - clearTable(SINK_TABLE); }); } diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml index ab1ba72..4592f79 100644 --- a/groot-tests/test-e2e-kafka/pom.xml +++ b/groot-tests/test-e2e-kafka/pom.xml @@ -47,6 +47,18 @@ </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>9</source> + <target>9</target> + </configuration> + </plugin> + </plugins> + </build> </project>
\ No newline at end of file @@ -23,7 +23,7 @@ </modules> <properties> - <revision>1.5.0</revision> + <revision>1.6.0</revision> <java.version>11</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>${java.version}</maven.compiler.source> @@ -55,6 +55,7 @@ <jsonpath.version>2.4.0</jsonpath.version> <fastjson2.version>2.0.32</fastjson2.version> <hutool.version>5.8.22</hutool.version> + <bouncycastle.version>1.78.1</bouncycastle.version> <galaxy.version>2.0.2</galaxy.version> <guava-retrying.version>2.0.0</guava-retrying.version> <ipaddress.version>5.3.3</ipaddress.version> @@ -392,6 +393,12 @@ </dependency> <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk18on</artifactId> + <version>${bouncycastle.version}</version> + </dependency> + + <dependency> <groupId>com.github.seancfoley</groupId> <artifactId>ipaddress</artifactId> <version>${ipaddress.version}</version>
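Note on the dependency above: `bcprov-jdk18on` supplies the SM4 cipher that the new `shade.identifier: sm4` setting and the SM4 support added to `docs/connector/config-encryption-decryption.md` rely on, and the re-encrypted `kafka.sasl.jaas.config` values in `inline_to_kafka.yaml` are hex strings consistent with that scheme. Below is a minimal sketch of an SM4 round trip via Bouncy Castle's JCE provider; the class name `Sm4ShadeSketch`, the key handling, and the ECB/PKCS5Padding mode choice are illustrative assumptions, not the project's actual shade implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.Security;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.util.encoders.Hex;

public final class Sm4ShadeSketch {

    static {
        // bcprov-jdk18on registers the SM4 cipher under the "BC" provider.
        Security.addProvider(new BouncyCastleProvider());
    }

    // SM4 is a 128-bit block cipher, so the key must be 16 bytes.
    // ECB/PKCS5Padding is used here only to keep the sketch short.
    static String encryptToHex(String plain, byte[] key16) throws Exception {
        Cipher cipher = Cipher.getInstance("SM4/ECB/PKCS5Padding", "BC");
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key16, "SM4"));
        return Hex.toHexString(cipher.doFinal(plain.getBytes(StandardCharsets.UTF_8)));
    }

    static String decryptFromHex(String hex, byte[] key16) throws Exception {
        Cipher cipher = Cipher.getInstance("SM4/ECB/PKCS5Padding", "BC");
        cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key16, "SM4"));
        return new String(cipher.doFinal(Hex.decode(hex)), StandardCharsets.UTF_8);
    }
}
```

Addressing the cipher through the standard JCE `Cipher` API keeps call sites identical to the existing AES path, which is presumably why job configs only need the `shade.identifier` switched from `aes` to `sm4`.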

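Note on the variables plumbing: `TestContainer.executeJob(confFile, variables)` and `ExecuteCommandArgs.setVariables` pass job-level variables through as repeated `-i key=value` arguments, and the `InlineToPrintIT` assertions (expecting `cli_job_level_traffic_rtp_file_bucket` in the sink logs rather than the job-level or global bucket names) imply a precedence of CLI variables over the job's `properties` block over the `grootstream.yaml` defaults. A minimal sketch of that resolution order follows; `PropsResolutionSketch` is a hypothetical helper written for illustration, not part of the codebase.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the apparent precedence:
// "-i key=value" CLI variables > job-level "properties" > global grootstream.yaml.
final class PropsResolutionSketch {

    static Map<String, String> resolve(Map<String, String> globalProps,
                                       Map<String, String> jobProps,
                                       List<String> cliVariables) {
        Map<String, String> merged = new LinkedHashMap<>(globalProps);
        merged.putAll(jobProps); // job config overrides the global defaults
        if (cliVariables != null) {
            for (String v : cliVariables) { // CLI "-i key=value" wins over both
                int eq = v.indexOf('=');
                if (eq > 0) {
                    merged.put(v.substring(0, eq), v.substring(eq + 1));
                }
            }
        }
        return merged;
    }
}
```

With the values in this diff, `resolve(...)` would map `hos.bucket.name.rtp_file` to `cli_job_level_traffic_rtp_file_bucket`; the `PATH_COMBINE` functions then read that value through `props.hos.bucket.name.rtp_file` when assembling output paths, which is consistent with the strings the test counts in the server logs.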