author     窦凤虎 <[email protected]>  2024-09-19 10:23:32 +0000
committer  窦凤虎 <[email protected]>  2024-09-19 10:23:32 +0000
commit     c0b9acfc3adc85abbd06207259b2515edc5c4eae (patch)
tree       366ba5634e795bcd623831c5e7bda898c83777de
parent     62e969df69b28a9f435c925669cf6dfe018aa74f (diff)
parent     3a95fef4c663c3f28c25daeb4cc19d0219fdfd48 (diff)
Merge branch 'release/1.6.0' into 'master' (tag: v1.6.0)

[Improve][bootstrap] Improve job-level user-defined variables, move the path...

See merge request galaxy/platform/groot-stream!111
-rw-r--r--  config/grootstream_job_example.yaml  27
-rw-r--r--  config/template/grootstream_job_template.yaml  2
-rw-r--r--  docs/connector/config-encryption-decryption.md  4
-rw-r--r--  docs/env-config.md  23
-rw-r--r--  docs/grootstream-config.md  5
-rw-r--r--  docs/user-guide.md  4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java  4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java  26
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java  11
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java  37
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java  1
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java  18
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java  16
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java  4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java  143
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java  29
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java  4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java  5
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java  4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java  8
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java  4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java  4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java  88
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java  2
-rw-r--r--  groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade  3
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java  108
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java  74
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java  74
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java  9
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java  4
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java  12
-rw-r--r--  groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml  5
-rw-r--r--  groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml  57
-rw-r--r--  groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml  97
-rw-r--r--  groot-common/pom.xml  7
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Constants.java  3
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java  7
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java  18
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java  6
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java  19
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java  5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java  5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java  8
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java  32
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java  17
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java  5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java  191
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java  71
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java  60
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java  59
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java  14
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java  37
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java  126
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java  9
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java  14
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/split/Split.java  16
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java  79
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java  29
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java  55
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java  26
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java  46
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java  20
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java  48
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java  23
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java  42
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java  57
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java  28
-rw-r--r--  groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split  1
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java  44
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java  45
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java  41
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java  41
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java  41
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java  53
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java  37
-rw-r--r--  groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java  5
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml  6
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml  6
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java  22
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java  8
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java  3
-rw-r--r--  groot-tests/test-common/src/test/resources/grootstream.yaml  4
-rw-r--r--  groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java  19
-rw-r--r--  groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml  20
-rw-r--r--  groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java  2
-rw-r--r--  groot-tests/test-e2e-kafka/pom.xml  12
-rw-r--r--  pom.xml  9
88 files changed, 2172 insertions(+), 349 deletions(-)
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 4726af0..37ef114 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -11,7 +11,14 @@ filters:
type: aviator
properties:
expression: event.server_ip != '12.12.12.12'
-
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - name: projection_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: aggregate_processor
+ expression: event.decoded_as == 'DNS'
processing_pipelines:
projection_processor:
type: projection
@@ -25,8 +32,9 @@ processing_pipelines:
group_by_fields: [server_ip,server_port]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
window_timestamp_field: recv_time
- window_size: 60
+ window_size: 6
window_slide: 10 # sliding window step size
+ mini_batch: true
functions:
- function: NUMBER_SUM
lookup_fields: [ sent_pkts ]
@@ -63,12 +71,19 @@ application:
execution:
restart:
strategy: none
+ properties:
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
topology:
- name: inline_source
- downstream: [filter_operator]
- - name: filter_operator
- downstream: [ projection_processor ]
+ downstream: [decoded_as_split]
+ - name: decoded_as_split
+ downstream: [ projection_processor ,aggregate_processor]
- name: projection_processor
downstream: [ print_sink ]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
- name: print_sink
- downstream: [] \ No newline at end of file
+ downstream: []
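The new `splits` block works hand in hand with the topology: as the JobExecution changes below show, every rule `name` is collected into a split set, and a downstream operator whose name appears in that set consumes the side output tagged with its own name. A minimal sketch of that contract, reusing the names from this example:

```yaml
splits:
  decoded_as_split:
    type: split
    rules:
      - name: projection_processor          # must match a downstream node name
        expression: event.decoded_as == 'HTTP'
topology:
  - name: decoded_as_split
    downstream: [ projection_processor ]    # reads the side output tagged "projection_processor"
```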
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 7cf50c8..0ca2d68 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -151,7 +151,7 @@ preprocessing_pipelines: # [object] Define Processors for preprocessing pipeline
# It will be accomplished the common processing for the event by the user-defined functions.
#
processing_pipelines: # [object] Define Processors for processing pipelines.
- projection_processor: # [object] Define projection processor name, must be unique.
+ z: # [object] Define projection processor name, must be unique.
type: projection # [string] Processor Type
remove_fields:
output_fields:
diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md
index 3146569..c2b05f6 100644
--- a/docs/connector/config-encryption-decryption.md
+++ b/docs/connector/config-encryption-decryption.md
@@ -6,14 +6,14 @@ In production environments, sensitive configuration items such as passwords are
## How to use
-Groot Stream default support base64 and AES encryption and decryption.
+Groot Stream supports Base64, AES, and SM4 encryption and decryption.
Base64 encryption support encrypt the following parameters:
- username
- password
- auth
-AES encryption support encrypt the following parameters:
+AES/SM4 encryption support encrypt the following parameters:
- username
- password
- auth
diff --git a/docs/env-config.md b/docs/env-config.md
index 7a31494..8e22a53 100644
--- a/docs/env-config.md
+++ b/docs/env-config.md
@@ -57,10 +57,10 @@ Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are s
You can directly use the flink parameter by prefixing `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/).
Of course, you can use groot stream parameter, here are some parameter names corresponding to the names in Flink.
-| Groot Stream | Flink |
+| Groot Stream | Flink |
|----------------------------------------|---------------------------------------------------------------|
-| execution.buffer-timeout | flink.execution.buffer-timeout |
-| pipeline.object-reuse | flink.object-reuse |
+| execution.buffer-timeout | flink.execution.buffer-timeout.interval |
+| pipeline.object-reuse | flink.pipeline.object-reuse |
| pipeline.max-parallelism | flink.pipeline.max-parallelism |
| execution.restart.strategy | flink.restart-strategy |
| execution.restart.attempts | flink.restart-strategy.fixed-delay.attempts |
@@ -70,3 +70,20 @@ Of course, you can use groot stream parameter, here are some parameter names cor
| execution.restart.delayInterval | flink.restart-strategy.failure-rate.delay |
| ... | ... |
+## Properties
+Job-level user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting.
+Properties can be referenced in the configuration file as `props.${property_name}`. They override the corresponding settings in the `grootstream.yaml` file for the duration of the job.
+```yaml
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ properties:
+ hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket
+ hos.bucket.name.http_file: job_level_traffic_http_file_bucket
+ hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket
+```
+
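The example above defines job-level properties but does not show one being consumed. A minimal sketch of referencing a property elsewhere in the job config via `props.${property_name}`; the sink name and option key here are illustrative, not part of this merge:

```yaml
sinks:
  hos_sink:                                          # hypothetical sink name
    type: hos                                        # hypothetical connector type
    properties:
      bucket.name: props.hos.bucket.name.rtp_file    # resolves to the job-level value above
```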
diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md
index fb902ae..9dd442f 100644
--- a/docs/grootstream-config.md
+++ b/docs/grootstream-config.md
@@ -20,7 +20,7 @@ grootstream:
```
-### Knowledge Base
+## Knowledge Base
The knowledge base is a collection of libraries that can be used in the groot-stream job's UDFs. File system type can be specified `local`, `http` or `hdfs`.
If the value is `http`, must be ` QGW Knowledge Base Repository` URL. The library will be dynamically updated according to the `scheduler.knowledge_base.update.interval.minutes` configuration.
@@ -77,3 +77,6 @@ grootstream:
- asn_builtin.mmdb
- asn_user_defined.mmdb
```
+## Properties
+Global user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting.
+Properties can be referenced in the configuration file as `props.${property_name}`. \ No newline at end of file
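For the global counterpart, a minimal sketch of defining properties in `grootstream.yaml`; the exact placement of the `properties` key under `grootstream` is assumed here, not confirmed by this merge:

```yaml
grootstream:
  properties:
    hos.bucket.name.rtp_file: traffic_rtp_file_bucket   # overridable per job via application.env.properties
```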
diff --git a/docs/user-guide.md b/docs/user-guide.md
index e35616f..d52cfed 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -153,7 +153,7 @@ Used to define job environment configuration information. For more details, you
# Command
## Run a job by CLI
-
+Note: When submitting a job via the CLI, you can use the `-D` parameter to specify Flink configuration. For example, `-Dexecution.buffer-timeout.interval=1000` sets the buffer timeout to 1000 ms. More details can be found in the official [flink documentation](https://flink.apache.org/).
```bash
Usage: start.sh [options]
Options:
@@ -164,7 +164,7 @@ Options:
-e, --deploy-mode <deploy mode> Deploy mode, only support [run] (default: run)
--target <target> Submitted target type, support [local, remote, yarn-session, yarn-per-job]
-n, --name <name> Job name (default: groot-stream-job)
- -i, --variable <variable> User-defined parameters, eg. -i key=value (default: [])
+ -i, --variable <variable> User-defined variables, eg. -i key=value (default: [])
-h, --help Show help message
-v, --version Show version message
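A sketch of an invocation combining the options above; only flags shown in this usage text are used, and any required job-config argument is elided:

```bash
# Submit locally with a custom job name and two job-level variables
./start.sh --target local \
  -n example-inline-to-print \
  -i hos.bucket.name.rtp_file=traffic_rtp_file_bucket \
  -i "scheduler.knowledge_base.update.interval.minutes=1"
```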
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
index f83183c..6ee5151 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
@@ -31,10 +31,10 @@ public abstract class CommandArgs {
description = "Whether check config")
protected boolean checkConfig = false;
-
+ // user-defined parameters
@Parameter(names = {"-i", "--variable"},
splitter = ParameterSplitter.class,
- description = "user-defined parameters , such as -i data_center=bj "
+            description = "Job-level user-defined parameters, such as -i traffic_file_bucket=traffic_file_bucket, or -i scheduler.knowledge_base.update.interval.minutes=1. "
+ "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.")
protected List<String> variables = Collections.emptyList();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
index 79a27e1..c3538b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
@@ -26,27 +26,43 @@ public class ExecuteCommand implements Command<ExecuteCommandArgs> {
@Override
public void execute() throws CommandExecuteException, ConfigCheckException {
+ // Groot Stream Global Config for all processing jobs
GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
// check config file exist
checkConfigExist(configFile);
- Config config = ConfigBuilder.of(configFile);
+ Config jobConfig = ConfigBuilder.of(configFile);
+
// if user specified job name using command line arguments, override config option
if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) {
- config = config.withValue(
+ jobConfig = jobConfig.withValue(
ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName()));
}
+ // if user specified target type using command line arguments, override config option
if(executeCommandArgs.getTargetType() != null) {
- config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
}
- JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ // if user specified variables using command line arguments, override config option
+ if(!executeCommandArgs.getVariables().isEmpty()) {
+ for (String variable : executeCommandArgs.getVariables()) {
+ String[] keyValue = variable.split("=");
+ if (keyValue.length != 2) {
+ throw new CommandExecuteException("Invalid variable format: " + variable);
+ }
+ jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, Constants.PROPERTIES, keyValue[0]),
+ ConfigValueFactory.fromAnyRef(keyValue[1]));
+ }
+ }
+
+
+ JobExecution jobExecution = new JobExecution(jobConfig, grootStreamConfig);
try {
jobExecution.execute();
} catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
+            throw new JobExecuteException("Job execution failed", e);
}
}
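The merged loop rejects any `-i` argument whose `split("=")` does not yield exactly two parts, so values that themselves contain `=` are refused. A self-contained sketch of the alternative, splitting only on the first `=` (an illustration, not the project's API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class VariableParser {
    // Parse "key=value" pairs, splitting only on the first '=' so that
    // values containing '=' (e.g. base64 padding) survive intact.
    public static Map<String, String> parse(Iterable<String> variables) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String variable : variables) {
            String[] kv = variable.split("=", 2); // limit 2: keep '=' inside the value
            if (kv.length != 2 || kv[0].isEmpty()) {
                throw new IllegalArgumentException("Invalid variable format: " + variable);
            }
            result.put(kv[0], kv[1]);
        }
        return result;
    }
}
```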
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
index 0c00a61..61ced82 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
@@ -20,14 +20,14 @@ public class ExecuteCommandArgs extends CommandArgs {
@Parameter(names={"-e", "--deploy-mode"},
converter = DeployModeConverter.class,
- description = "deploy mode, only support [run] ")
+ description = "Job deploy mode, only support [run] ")
private DeployMode deployMode = DeployMode.RUN;
@Parameter(
names = {"--target"},
converter = TargetTypeConverter.class,
description =
- "job submitted target type, support [local, remote, yarn-session, yarn-per-job]")
+ "Job submitted target type, support [local, remote, yarn-session, yarn-per-job]")
private TargetType targetType;
@Override
@@ -55,7 +55,7 @@ public class ExecuteCommandArgs extends CommandArgs {
@Override
public String toString() {
- return "FlinkCommandArgs{"
+ return "CommandArgs{"
+ "deployMode="
+ deployMode
+ ", targetType="
@@ -74,7 +74,6 @@ public class ExecuteCommandArgs extends CommandArgs {
}
-
private void userParamsToSysEnv() {
if (!this.variables.isEmpty()) {
variables.stream()
@@ -111,7 +110,7 @@ public class ExecuteCommandArgs extends CommandArgs {
return targetType;
} else {
throw new IllegalArgumentException(
- "Groot-Stream job on flink engine submitted target type only "
+ "GrootStream job submitted target type only "
+ "support these options: [local, remote, yarn-session, yarn-per-job]");
}
}
@@ -132,7 +131,7 @@ public class ExecuteCommandArgs extends CommandArgs {
return deployMode;
} else {
throw new IllegalArgumentException(
- "Groot-Stream job on flink engine deploy mode only "
+ "GrootStream job deploy mode only "
+ "support these options: [run, run-application]");
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java
new file mode 100644
index 0000000..05d3e52
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java
@@ -0,0 +1,37 @@
+package com.geedgenetworks.bootstrap.command;
+
+import cn.hutool.crypto.KeyUtil;
+import cn.hutool.crypto.SmUtil;
+import cn.hutool.crypto.symmetric.SM4;
+import com.geedgenetworks.common.config.ConfigShade;
+
+import java.nio.charset.StandardCharsets;
+
+public class SM4ConfigShade implements ConfigShade {
+ private static final String IDENTIFIER = "sm4";
+
+ private static final String[] SENSITIVE_OPTIONS =
+ new String[] {"connection.user", "connection.password", "kafka.sasl.jaas.config","kafka.ssl.keystore.password","kafka.ssl.truststore.password","kafka.ssl.key.password"};
+
+ private static final byte[] SECURITY_KEY = KeyUtil.generateKey(SM4.ALGORITHM_NAME, ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded();
+
+ @Override
+ public String[] sensitiveOptions() {
+ return SENSITIVE_OPTIONS;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ return SmUtil.sm4(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String decrypt(String content) {
+ return SmUtil.sm4(SECURITY_KEY).decryptStr(content, StandardCharsets.UTF_8);
+ }
+}
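A round-trip sketch of the new shade, using only the two methods defined above; the demo class and its placement are illustrative. The shade itself is discovered through the `META-INF/services/com.geedgenetworks.common.config.ConfigShade` registration updated in this merge and selected by its identifier `sm4`.

```java
package com.geedgenetworks.bootstrap.command; // same package as SM4ConfigShade, for the sketch

public class SM4ConfigShadeDemo {
    public static void main(String[] args) {
        SM4ConfigShade shade = new SM4ConfigShade();
        String cipher = shade.encrypt("my-db-password");   // hex ciphertext
        String plain = shade.decrypt(cipher);              // back to "my-db-password"
        System.out.println(cipher + " -> " + plain);
    }
}
```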
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
index d2b37f6..6f33cae 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.bootstrap.enums;
public enum ProcessorType {
SOURCE("source"),
FILTER("filter"),
+ SPLIT("split"),
PREPROCESSING("preprocessing"),
PROCESSING("processing"),
POSTPROCESSING("postprocessing"),
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 64c66b6..7a55ffe 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,26 +1,26 @@
package com.geedgenetworks.bootstrap.execution;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.geedgenetworks.core.filter.Filter;
import com.geedgenetworks.core.processor.Processor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.split.Split;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
+import java.util.*;
import java.util.function.BiConsumer;
public abstract class AbstractExecutor<K, V>
- implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
+ implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
protected final Map<String,Filter> filterMap = new HashMap<>();
+ protected final Map<String, Split> splitMap = new HashMap<>();
protected final Map<String, Processor> processorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
@@ -30,6 +30,10 @@ public abstract class AbstractExecutor<K, V>
for (Filter filter : filters) {
this.filterMap.put(filter.type(), filter);
}
+ ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
+ for (Split split : splits) {
+ this.splitMap.put(split.type(), split);
+ }
ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
for (Processor processor : processors) {
this.processorMap.put(processor.type(), processor);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 66c0b0f..b0a04cd 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
@@ -13,6 +14,7 @@ import com.geedgenetworks.core.processor.table.TableProcessor;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -27,7 +29,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
ProcessorConfig processorConfig = operatorMap.get(node.getName());
switch (processorConfig.getType()) {
@@ -45,7 +47,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
return dataStream;
}
- protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
+ protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
TableProcessor tableProcessor;
if (processorMap.containsKey(tableConfig.getType())) {
@@ -63,15 +65,15 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
tableConfig.setParallelism(node.getParallelism());
}
try {
- dataStream =
- tableProcessor.processorFunction(
- dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+
+ dataStream = tableProcessor.processorFunction(
+ dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
} catch (Exception e) {
throw new JobExecuteException("Create orderby pipeline instance failed!", e);
}
return dataStream;
}
- protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+ protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
AggregateProcessor aggregateProcessor;
if (processorMap.containsKey(aggregateConfig.getType())) {
@@ -98,7 +100,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
return dataStream;
}
- protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+ protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
ProjectionProcessor projectionProcessor;
if (processorMap.containsKey(projectionConfig.getType())) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 506aa11..66f0585 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.FilterConfigOptions;
@@ -14,6 +15,7 @@ import com.geedgenetworks.core.pojo.FilterConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -54,7 +56,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
FilterConfig filterConfig = operatorMap.get(node.getName());
String className = filterConfig.getType();
Filter filter;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 2eabefa..f6e19eb 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -5,15 +5,24 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.config.SplitConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
import java.io.File;
import java.net.MalformedURLException;
@@ -27,49 +36,53 @@ import java.util.stream.Stream;
public class JobExecution {
private final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
private final List<Node> nodes;
-
private final List<URL> jarPaths;
- public JobExecution(Config config, GrootStreamConfig grootStreamConfig) {
- try {
+ private final Set<String> splitSet = new HashSet<>();
+
+ public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
+ try {
jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
.resolve(GrootStreamRunner.APP_JAR_NAME).toString())
.toURI().toURL()));
} catch (MalformedURLException e) {
- throw new JobExecuteException("load groot stream bootstrap jar error.", e);
+ throw new JobExecuteException("Load GrootStream Bootstrap jar failed.", e);
}
- registerPlugin(config.getConfig(Constants.APPLICATION));
+ registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
- this.sourceExecutor = new SourceExecutor(jarPaths, config);
- this.sinkExecutor = new SinkExecutor(jarPaths, config);
- this.filterExecutor = new FilterExecutor(jarPaths, config);
- this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecutor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
+ this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig);
+ this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig);
+ this.filterExecutor = new FilterExecutor(jarPaths, jobConfig);
+ this.splitExecutor = new SplitExecutor(jarPaths, jobConfig);
+ this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig);
+ this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig);
+ this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig);
this.jobRuntimeEnvironment =
- JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
+ JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.nodes = buildJobNode(config);
+ this.nodes = buildJobNode(jobConfig);
}
private void registerPlugin(Config appConfig) {
List<Path> thirdPartyJars = new ArrayList<>();
Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
- if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
+ if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
thirdPartyJars = new ArrayList<>(StartBuilder
.getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
}
@@ -81,7 +94,7 @@ public class JobExecution {
.map(uri -> {
try {
return uri.toURL();
- }catch (MalformedURLException e){
+ } catch (MalformedURLException e) {
throw new RuntimeException("the uri of jar illegal: " + uri, e);
}
}).collect(Collectors.toList());
@@ -93,10 +106,10 @@ public class JobExecution {
}
- private Config registerPlugin(Config config , List<URL> jars) {
- config = this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
- return this.injectJarsToConfig(
+ private Config registerPlugin(Config config, List<URL> jars) {
+ config = this.injectJarsToConfig(
+ config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
+ return this.injectJarsToConfig(
config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
}
@@ -121,13 +134,13 @@ public class JobExecution {
return new URL(uri);
} catch (MalformedURLException e) {
throw new RuntimeException(
- "the uri of jar illegal:" + uri, e);
+                                        "The jar URI is illegal: " + uri, e);
}
})
.collect(Collectors.toSet());
paths.addAll(validJars);
- config = config.withValue(
+ config = config.withValue(
path,
ConfigValueFactory.fromAnyRef(
paths.stream()
@@ -150,8 +163,9 @@ public class JobExecution {
private List<Node> buildJobNode(Config config) {
Map<String, Object> sources = Maps.newHashMap();
- Map<String, Object> sinks =Maps.newHashMap();
+ Map<String, Object> sinks = Maps.newHashMap();
Map<String, Object> filters = Maps.newHashMap();
+ Map<String, Object> splits = Maps.newHashMap();
Map<String, Object> preprocessingPipelines = Maps.newHashMap();
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -160,11 +174,14 @@ public class JobExecution {
sources = config.getConfig(Constants.SOURCES).root().unwrapped();
}
if (config.hasPath(Constants.SINKS)) {
- sinks =config.getConfig(Constants.SINKS).root().unwrapped();
+ sinks = config.getConfig(Constants.SINKS).root().unwrapped();
}
if (config.hasPath(Constants.FILTERS)) {
filters = config.getConfig(Constants.FILTERS).root().unwrapped();
}
+ if (config.hasPath(Constants.SPLITS)) {
+ splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+ }
if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
}
@@ -184,13 +201,21 @@ public class JobExecution {
nodes.add(node);
});
- for(Node node : nodes) {
+ for (Node node : nodes) {
if (sources.containsKey(node.getName())) {
node.setType(ProcessorType.SOURCE);
} else if (sinks.containsKey(node.getName())) {
node.setType(ProcessorType.SINK);
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
+ } else if (splits.containsKey(node.getName())) {
+ splits.forEach((key, value) -> {
+ SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+ for(RuleContext ruleContext:splitConfig.getRules()) {
+ splitSet.add(ruleContext.getName());
+ }
+ });
+ node.setType(ProcessorType.SPLIT);
} else if (preprocessingPipelines.containsKey(node.getName())) {
node.setType(ProcessorType.PREPROCESSING);
} else if (processingPipelines.containsKey(node.getName())) {
@@ -214,12 +239,12 @@ public class JobExecution {
List<Node> sourceNodes = nodes
.stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
- SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+ DataStream<Event> dataStream = null;
- for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
+ for (Node sourceNode : sourceNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceNode);
for (String nodeName : sourceNode.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
}
@@ -230,7 +255,7 @@ public class JobExecution {
log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName());
jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName());
final long jobEndTime = System.currentTimeMillis();
- log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime);
+ log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime);
} catch (Exception e) {
throw new JobExecuteException("Execute job error", e);
@@ -238,34 +263,62 @@ public class JobExecution {
}
- private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+ private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
- throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
+ throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = filterExecutor.execute(dataStream, node);
+ }
+ } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ dataStream = splitExecutor.execute(dataStream, node);
+
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = preprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = processingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = postprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = sinkExecutor.execute(dataStream, node);
+ }
} else {
throw new JobExecuteException("unsupported process type " + node.getType().name());
}
for (String nodeName : node.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
}
private Optional<Node> getNode(String name) {
- return nodes.stream().filter(v-> v.getName().equals(name)).findFirst();
+ return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
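The routing above builds on Flink side outputs: the split operator (see SplitFunction/SplitOperator, changed in this merge but not shown in full here) presumably emits matching events to an `OutputTag` named after each rule, and downstream nodes in `splitSet` read `getSideOutput(new OutputTag<Event>(name) {})`. A self-contained sketch of that pattern, with plain `String` events standing in for Groot's `Event` and Aviator rules:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Anonymous subclasses so the tags retain generic type information.
        final OutputTag<String> httpTag = new OutputTag<String>("projection_processor") {};
        final OutputTag<String> dnsTag = new OutputTag<String>("aggregate_processor") {};

        SingleOutputStreamOperator<String> routed = env
                .fromElements("HTTP", "DNS", "HTTP")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String event, Context ctx, Collector<String> out) {
                        if ("HTTP".equals(event)) {
                            ctx.output(httpTag, event);   // rule 1 matched
                        } else if ("DNS".equals(event)) {
                            ctx.output(dnsTag, event);    // rule 2 matched
                        }
                    }
                });

        DataStream<String> toProjection = routed.getSideOutput(httpTag);
        DataStream<String> toAggregate = routed.getSideOutput(dnsTag);
        toProjection.print();
        toAggregate.print();
        env.execute("side-output-split-sketch");
    }
}
```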
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 7141f5e..a4289ff 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -22,12 +22,8 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TernaryBoolean;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@@ -38,16 +34,16 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private StreamExecutionEnvironment environment;
private String jobName = Constants.DEFAULT_JOB_NAME;
- private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
+ private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
- this.initialize(config);
+ this.initialize(jobConfig);
}
- public static JobRuntimeEnvironment getInstance(Config config, GrootStreamConfig grootStreamConfig) {
+ public static JobRuntimeEnvironment getInstance(Config jobConfig, GrootStreamConfig grootStreamConfig) {
if (INSTANCE == null) {
synchronized (JobRuntimeEnvironment.class) {
if (INSTANCE == null) {
- INSTANCE = new JobRuntimeEnvironment(config, grootStreamConfig);
+ INSTANCE = new JobRuntimeEnvironment(jobConfig, grootStreamConfig);
}
}
}
@@ -73,10 +69,19 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
@Override
public RuntimeEnvironment prepare() {
- createStreamEnvironment();
+
if (envConfig.hasPath(Constants.JOB_NAME)) {
jobName = envConfig.getString(Constants.JOB_NAME);
}
+ // Job-level user-defined variables override the grootStreamConfig
+ if (envConfig.hasPath(Constants.PROPERTIES)) {
+ envConfig.getConfig(Constants.PROPERTIES).root().unwrapped().entrySet().forEach(entry -> {
+ this.grootStreamConfig.getCommonConfig().getPropertiesConfig().put(entry.getKey(), String.valueOf(entry.getValue()));
+ });
+ }
+
+ createStreamEnvironment();
+
return this;
}
@@ -84,8 +89,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
return jobName;
}
-
-
public boolean isLocalMode() {
return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
&& envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget());
@@ -126,7 +129,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
private void createStreamEnvironment() {
-
Configuration configuration = new Configuration();
EnvironmentUtil.initConfiguration(envConfig, configuration);
if (isLocalMode()) {
@@ -272,6 +274,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
+
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 36fad61..b9555b4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -36,7 +38,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
return super.execute(dataStream, node);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index b1e53e4..a7b9e5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -3,10 +3,12 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -37,7 +39,8 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+
return super.execute(dataStream, node);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index d69fe8c..f6788ed 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -35,7 +37,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
return super.execute(dataStream, node);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
index bee1c0a..b177e40 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
@@ -1,7 +1,4 @@
package com.geedgenetworks.bootstrap.execution;
-
-
-import com.geedgenetworks.bootstrap.enums.TargetType;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckResult;
import com.typesafe.config.Config;
@@ -12,11 +9,10 @@ import java.util.List;
public interface RuntimeEnvironment {
RuntimeEnvironment setEnvConfig(Config envConfig);
+ //Prepare runtime environment for job execution
+ RuntimeEnvironment prepare();
Config getEnvConfig();
-
CheckResult checkConfig();
- RuntimeEnvironment prepare();
-
void registerPlugin(List<URL> pluginPaths);
default void initialize(Config config) {
this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index a0bedc9..70934b8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SinkConfigOptions;
@@ -20,6 +21,7 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -63,7 +65,7 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
SinkConfig sinkConfig = operatorMap.get(node.getName());
try {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index d4751af..9dff6b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -67,7 +68,7 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator outputStreamOperator, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
SourceConfig sourceConfig = operatorMap.get(node.getName());
SingleOutputStreamOperator sourceSingleOutputStreamOperator;
try {
@@ -152,4 +153,5 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
return 0;
}
}
+
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
new file mode 100644
index 0000000..e549087
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -0,0 +1,88 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.SplitConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.geedgenetworks.core.split.Split;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Initialize split configs and execute the split operator
+ */
+@Slf4j
+public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
+
+
+ public SplitExecutor(List<URL> jarPaths, Config config) {
+ super(jarPaths, config);
+ }
+
+ @Override
+ protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
+ if (operatorConfig.hasPath(Constants.SPLITS)) {
+ Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
+ splitsConfig.root().unwrapped().forEach((key, value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key),
+ SplitConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "split: %s, Message: %s",
+ key, result.getMsg()));
+ }
+ SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+ splitConfig.setName(key);
+ splitConfigMap.put(key, splitConfig);
+ });
+ }
+
+ return splitConfigMap;
+ }
+
+ @Override
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+ SplitConfig splitConfig = operatorMap.get(node.getName());
+ String className = splitConfig.getType();
+ Split split;
+ if (splitMap.containsKey(splitConfig.getType())) {
+ split = splitMap.get(splitConfig.getType());
+ } else {
+ try {
+ Class<?> cls = Class.forName(className);
+ split = (Split) cls.getConstructor().newInstance();
+ } catch (ReflectiveOperationException | RuntimeException e) {
+ throw new JobExecuteException("Get split instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ splitConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream = split.splitFunction(dataStream, splitConfig);
+ } catch (Exception e) {
+ throw new JobExecuteException("Execute split function failed!", e);
+ }
+ return dataStream;
+ }
+
+}
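The routing idea behind splitFunction(...) can be sketched with Flink side outputs plus Aviator. This is not the actual Split SPI (only the splitFunction call is visible in this diff); the helper class, the per-element compilation, and the no-match fallthrough are illustrative assumptions:

    import com.geedgenetworks.common.Event;
    import com.geedgenetworks.common.udf.RuleContext;
    import com.geedgenetworks.core.pojo.SplitConfig;
    import com.googlecode.aviator.AviatorEvaluator;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import java.util.Collections;

    public final class SplitRoutingSketch {

        public static DataStream<Event> route(DataStream<Event> in, SplitConfig cfg) {
            return in.process(new ProcessFunction<Event, Event>() {
                @Override
                public void processElement(Event event, Context ctx, Collector<Event> out) {
                    for (RuleContext rule : cfg.getRules()) {
                        // the real executor would use rule.getCompiledExpression();
                        // compiling per element keeps the sketch short
                        Object hit = AviatorEvaluator.execute(rule.getExpression(),
                                Collections.singletonMap("event", (Object) event.getExtractedFields()));
                        if (Boolean.TRUE.equals(hit)) {
                            // side output named after the rule; downstream nodes
                            // pick it up via getSideOutput(new OutputTag<Event>(name) {})
                            ctx.output(new OutputTag<Event>(rule.getName()) {}, event);
                            return;
                        }
                    }
                    out.collect(event); // no rule matched: keep on the main stream
                }
            }).name(cfg.getName());
        }
    }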
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java
index 8ab8bdc..6a106d2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java
@@ -76,7 +76,7 @@ public class GrootStreamRunner {
bootstrapCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
.map(String::trim)
- .forEach(variable -> command.add("-D" + variable));
+ .forEach(variable -> command.add("-i " + variable));
return command;
}
diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade
index 6654db5..f490f28 100644
--- a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade
+++ b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade
@@ -1,2 +1,3 @@
com.geedgenetworks.bootstrap.command.Base64ConfigShade
-com.geedgenetworks.bootstrap.command.AESConfigShade \ No newline at end of file
+com.geedgenetworks.bootstrap.command.AESConfigShade
+com.geedgenetworks.bootstrap.command.SM4ConfigShade \ No newline at end of file
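SM4ConfigShade is discovered through this ServiceLoader entry and relies on the Bouncy Castle dependency added to groot-common below. A round-trip sketch of SM4 through the BC provider; the 16-byte key, the ECB/PKCS5 mode, and the hex encoding are illustrative assumptions, not the shade's actual parameters:

    import org.bouncycastle.jce.provider.BouncyCastleProvider;
    import org.bouncycastle.util.encoders.Hex;

    import javax.crypto.Cipher;
    import javax.crypto.spec.SecretKeySpec;
    import java.nio.charset.StandardCharsets;
    import java.security.Security;

    public class Sm4Sketch {
        public static void main(String[] args) throws Exception {
            Security.addProvider(new BouncyCastleProvider());
            // SM4 requires a 128-bit key; this one is a placeholder
            SecretKeySpec key = new SecretKeySpec(
                    "0123456789abcdef".getBytes(StandardCharsets.UTF_8), "SM4");

            Cipher cipher = Cipher.getInstance("SM4/ECB/PKCS5Padding", "BC");
            cipher.init(Cipher.ENCRYPT_MODE, key);
            String encrypted = Hex.toHexString(
                    cipher.doFinal("testuser".getBytes(StandardCharsets.UTF_8)));

            cipher.init(Cipher.DECRYPT_MODE, key);
            String decrypted = new String(
                    cipher.doFinal(Hex.decode(encrypted)), StandardCharsets.UTF_8);
            System.out.println(encrypted + " -> " + decrypted); // round-trips to testuser
        }
    }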
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 8b299df..7b9544a 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -7,8 +7,10 @@ import com.geedgenetworks.bootstrap.execution.*;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.udf.RuleContext;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -16,7 +18,9 @@ import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValueFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
import java.io.File;
import java.net.MalformedURLException;
@@ -33,13 +37,14 @@ import java.util.stream.Stream;
public class JobExecutionTest {
protected final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
-
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
+ private final Set<String> splitSet = new HashSet<>();
private final List<Node> nodes;
private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
@@ -66,21 +71,22 @@ public class JobExecutionTest {
}
registerPlugin(config.getConfig(Constants.APPLICATION));
- this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config);
- this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config);
- this.filterExecuteProcessor = new FilterExecutor(jarPaths, config);
- this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config);
+ this.sourceExecutor = new SourceExecutor(jarPaths, config);
+ this.sinkExecutor = new SinkExecutor(jarPaths, config);
+ this.splitExecutor = new SplitExecutor(jarPaths, config);
+ this.filterExecutor = new FilterExecutor(jarPaths, config);
+ this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
+ this.processingExecutor = new ProcessingExecutor(jarPaths, config);
+ this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
-
- this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.nodes = buildJobNode(config);
}
@@ -173,6 +179,7 @@ public class JobExecutionTest {
Map<String, Object> sources = Maps.newHashMap();
Map<String, Object> sinks = Maps.newHashMap();
Map<String, Object> filters = Maps.newHashMap();
+ Map<String, Object> splits = Maps.newHashMap();
Map<String, Object> preprocessingPipelines = Maps.newHashMap();
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -186,6 +193,9 @@ public class JobExecutionTest {
if (config.hasPath(Constants.FILTERS)) {
filters = config.getConfig(Constants.FILTERS).root().unwrapped();
}
+ if (config.hasPath(Constants.SPLITS)) {
+ splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+ }
if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
}
@@ -210,6 +220,14 @@ public class JobExecutionTest {
node.setType(ProcessorType.SOURCE);
} else if (sinks.containsKey(node.getName())) {
node.setType(ProcessorType.SINK);
+ } else if (splits.containsKey(node.getName())) {
+ splits.forEach((key, value) -> {
+ SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+ for (RuleContext ruleContext : splitConfig.getRules()) {
+ splitSet.add(ruleContext.getName());
+ }
+ });
+ node.setType(ProcessorType.SPLIT);
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
} else if (preprocessingPipelines.containsKey(node.getName())) {
@@ -228,15 +246,15 @@ public class JobExecutionTest {
}
- public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException {
+ public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException {
List<Node> sourceNodes = nodes
.stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
- SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+ DataStream<Event> singleOutputStreamOperator = null;
for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+ singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
for (String nodeName : sourceNode.getDownstream()) {
buildJobGraph(singleOutputStreamOperator, nodeName);
}
@@ -247,27 +265,55 @@ public class JobExecutionTest {
}
- private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+ private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = filterExecutor.execute(dataStream, node);
+ }
+ } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ dataStream = splitExecutor.execute(dataStream, node);
+
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = preprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = processingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (splitSet.contains(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = postprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
- } else {
+ if (splitSet.contains(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = sinkExecutor.execute(dataStream, node);
+ }
+ } else {
throw new JobExecuteException("unsupported process type " + node.getType().name());
}
for (String nodeName : node.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
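The cast-and-getSideOutput pattern above is standard Flink side-output plumbing: tags are matched by their string id, so each branch just re-creates an OutputTag named after the rule/node. A self-contained toy (String payloads instead of Event) showing the mechanics:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SideOutputDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            OutputTag<String> dnsTag = new OutputTag<String>("pre_etl_processor") {};

            SingleOutputStreamOperator<String> main = env.fromElements("HTTP", "DNS", "DNS")
                    .process(new ProcessFunction<String, String>() {
                        @Override
                        public void processElement(String v, Context ctx, Collector<String> out) {
                            if ("DNS".equals(v)) {
                                ctx.output(dnsTag, v);   // routed branch, like a split rule
                            } else {
                                out.collect(v);          // main stream
                            }
                        }
                    });

            // downstream picks the branch by tag id, mirroring getSideOutput(...) above
            DataStream<String> dnsBranch = main.getSideOutput(dnsTag);
            dnsBranch.print("dns");
            main.print("main");
            env.execute("side-output-demo");
        }
    }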
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
new file mode 100644
index 0000000..2f6984b
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -0,0 +1,74 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JobSplitTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplit() {
+
+ CollectSink.values.clear();
+ String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check that the config file exists
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(7, CollectSink.values.size());
+ }
+
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
new file mode 100644
index 0000000..9fa81c0
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -0,0 +1,74 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+public class JobSplitWithAggTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplitForAgg() {
+
+ CollectSink.values.clear();
+ String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check that the config file exists
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+
+ Assert.assertEquals(2, CollectSink.values.size());
+ Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString());
+
+ }
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index c4f54a3..90ff95d 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -75,16 +75,10 @@ public class SimpleJobTest {
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
- Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("server_super_admin_area").toString());
- Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
- Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString());
- Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region"));
- Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
+ Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
- List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
- Assert.assertEquals("6167", asn_list.get(0));
}
@@ -121,4 +115,5 @@ public class SimpleJobTest {
}
Assert.assertEquals(4, CollectSink.values.size());
}
+
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
index dfcb459..c5806ed 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
@@ -3,9 +3,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect;
import com.geedgenetworks.common.Event;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
public class CollectSink implements SinkFunction<Event> {
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
index c3746a4..17f56ce 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
@@ -65,8 +65,16 @@ public class ConfigShadeTest {
Assertions.assertEquals("159c7da83d988a9ec041d10a6bfbe221bcbaed6b62d9cc1b04ff51e633ebd105", encryptPassword);
Assertions.assertEquals(decryptUsername, USERNAME);
Assertions.assertEquals(decryptPassword, PASSWORD);
- System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";"));
- System.out.println( ConfigShadeUtils.decryptOption("aes", "454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817"));
+ encryptUsername = ConfigShadeUtils.encryptOption("sm4", USERNAME);
+ decryptUsername = ConfigShadeUtils.decryptOption("sm4", encryptUsername);
+ Assertions.assertEquals("72ea74367a15cb96b0d1d42104149519", encryptUsername);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ encryptPassword = ConfigShadeUtils.encryptOption("sm4", PASSWORD);
+ decryptPassword = ConfigShadeUtils.decryptOption("sm4", encryptPassword);
+ Assertions.assertEquals("3876c7088d395bbbfa826e3648b6c9a022e7f80941c132313bde6dc8a7f2351f", encryptPassword);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+ System.out.println( ConfigShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";"));
+ System.out.println( ConfigShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6"));
System.out.println( ConfigShadeUtils.encryptOption("aes", "testuser"));
System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";"));
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 888c94e..9724e21 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -111,7 +111,7 @@ processing_pipelines:
lookup_fields: [ packet_capture_file ]
output_fields: [ packet_capture_file ]
parameters:
- path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
+ path: [ props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file]
- function: STRING_JOINER
lookup_fields: [ server_ip,client_ip ]
output_fields: [ ip_string ]
@@ -191,6 +191,8 @@ sinks:
application: # [object] Application Configuration
env: # [object] Environment Variables
name: groot-stream-job # [string] Job Name
+ properties:
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology: # [array of object] Node List. It will be used build data flow for job dag graph.
@@ -206,3 +208,4 @@ application: # [object] Application Configuration
- name: collect_sink
parallelism: 1
+
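The assertion change in SimpleJobTest above shows the resulting URL. A sketch of the assumed resolution: tokens in the path parameter carrying the props. prefix (Constants.SYSPROP_GROOTSTREAM_PREFIX) are read from application.env.properties, other tokens from the event, and everything is joined with '/'. The helper class and the hos.path value are hypothetical:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PathResolveSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put("hos.path", "http://192.168.44.12:9098/hos"); // assumed value
            props.put("hos.bucket.name.policy_capture_file", "traffic_policy_capture_file_bucket");
            Map<String, Object> event = new HashMap<>();
            event.put("packet_capture_file", "test");

            List<String> tokens = Arrays.asList(
                    "props.hos.path", "props.hos.bucket.name.policy_capture_file", "packet_capture_file");
            String url = tokens.stream()
                    .map(t -> t.startsWith("props.")
                            ? String.valueOf(props.get(t.substring("props.".length())))
                            : String.valueOf(event.get(t)))
                    .collect(Collectors.joining("/"));
            System.out.println(url);
            // http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test
        }
    }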
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
new file mode 100644
index 0000000..5163642
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -0,0 +1,57 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # optional
+ repeat.count: 1 # optional
+ format: json
+ json.ignore.parse.errors: false
+ watermark_timestamp: recv_time
+ watermark_timestamp_unit: ms
+ watermark_lag: 10
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+
+postprocessing_pipelines:
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_event_time # tumbling_processing_time, tumbling_event_time, sliding_processing_time, sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ mini_batch: true
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+ - function: MEAN
+ lookup_fields: [ pkts ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
+ - name: inline_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source for a node whose processor type is SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [ aggregate_processor ]
+ - name: aggregate_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
+
+
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
new file mode 100644
index 0000000..9bb2900
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -0,0 +1,97 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # optional
+ repeat.count: 1 # optional
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+splits:
+ test_split:
+ type: split
+ rules:
+ - name: table_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: pre_etl_processor
+ expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_processing_time, tumbling_event_time, sliding_processing_time, sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
+ - name: inline_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source for a node whose processor type is SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split,collect_sink]
+ - name: test_split
+ parallelism: 1
+ downstream: [ table_processor,pre_etl_processor ]
+ - name: pre_etl_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: table_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
+
+
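A quick check of the two split rules above against a sample record from the inline source, using the same newEnv("event", fields) binding this codebase uses for filter expressions:

    import com.googlecode.aviator.AviatorEvaluator;
    import com.googlecode.aviator.Expression;
    import java.util.HashMap;
    import java.util.Map;

    public class SplitRuleCheck {
        public static void main(String[] args) {
            Expression http = AviatorEvaluator.compile("event.decoded_as == 'HTTP'", true);
            Expression dns = AviatorEvaluator.compile("event.decoded_as == 'DNS'", true);

            Map<String, Object> fields = new HashMap<>();
            fields.put("decoded_as", "DNS"); // sample record from the data array above

            System.out.println(http.execute(http.newEnv("event", fields))); // false
            System.out.println(dns.execute(dns.newEnv("event", fields)));   // true -> routed to pre_etl_processor
        }
    }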
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index 10e9ed4..37a4d25 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -41,6 +41,13 @@
<artifactId>hutool-all</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ </dependency>
+
+
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index d13fc4b..b523591 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -12,6 +12,8 @@ public final class Constants {
public static final int SYSTEM_EXIT_CODE = 2618;
public static final String APPLICATION = "application";
+ public static final String PROPERTIES = "properties";
+ public static final String SPLITS = "splits";
public static final String APPLICATION_ENV ="env";
public static final String APPLICATION_TOPOLOGY = "topology";
public static final String JOB_NAME = "name";
@@ -23,7 +25,6 @@ public final class Constants {
" \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" +
" \n";
-
public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
index 0b0379d..af94abf 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -5,8 +5,6 @@ import com.geedgenetworks.common.udf.UDFContext;
import java.util.List;
-import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP;
-
public interface AggregateConfigOptions {
Option<String> TYPE = Options.key("type")
.stringType()
@@ -42,7 +40,10 @@ public interface AggregateConfigOptions {
.intType()
.noDefaultValue()
.withDescription("The size of window.");
-
+ Option<Boolean> MINI_BATCH = Options.key("mini_batch")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("The label of pre_aggrergate.");
Option<Integer> WINDOW_SLIDE = Options.key("window_slide")
.intType()
.noDefaultValue()
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
new file mode 100644
index 0000000..a2acb71
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
@@ -0,0 +1,18 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.udf.RuleContext;
+import java.util.List;
+
+public interface SplitConfigOptions {
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of route .");
+
+ Option<List<RuleContext>> RULES = Options.key("rules")
+ .type(new TypeReference<List<RuleContext>>() {})
+ .noDefaultValue()
+ .withDescription("The rules to be executed.");
+
+}
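The RULES option is typed with a fastjson2 TypeReference, so a rules list deserializes straight into RuleContext beans. A small sketch with a hypothetical JSON payload mirroring the YAML rules list:

    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.TypeReference;
    import com.geedgenetworks.common.udf.RuleContext;
    import java.util.List;

    public class RulesParseSketch {
        public static void main(String[] args) {
            String json = "[{\"name\":\"table_processor\",\"expression\":\"event.decoded_as == 'HTTP'\"}]";
            List<RuleContext> rules = JSON.parseObject(json, new TypeReference<List<RuleContext>>() {});
            System.out.println(rules.get(0).getName()); // table_processor
        }
    }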
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
index 455073f..6f6e048 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -8,14 +8,10 @@ import java.io.Serializable;
public interface AggregateFunction extends Serializable {
void open(UDFContext udfContext);
-
Accumulator initAccumulator(Accumulator acc);
-
Accumulator add(Event val, Accumulator acc);
-
String functionName();
-
Accumulator getResult(Accumulator acc);
-
+ Accumulator merge(Accumulator a, Accumulator b);
default void close(){};
}
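The new merge(a, b) member is what makes two-phase (mini-batch) aggregation possible: partial accumulators produced by the pre-aggregation stage are combined in the windowed second stage. A sketch of the contract for a sum-style UDAF; the "sessions" field name, the metricsFields getter, and the layout of values inside Accumulator are assumptions based on how Accumulator is used elsewhere in this diff:

    import com.geedgenetworks.common.Accumulator;
    import com.geedgenetworks.common.Event;
    import com.geedgenetworks.common.udf.AggregateFunction;
    import com.geedgenetworks.common.udf.UDFContext;

    public class NumberSumSketch implements AggregateFunction {
        private final String field = "sessions"; // would come from lookup_fields in practice

        @Override public void open(UDFContext ctx) { /* read lookup_fields from ctx here */ }

        @Override public Accumulator initAccumulator(Accumulator acc) {
            // assumes createAccumulator() has already set the metricsFields map
            acc.getMetricsFields().putIfAbsent(field, 0L);
            return acc;
        }

        @Override public Accumulator add(Event e, Accumulator acc) {
            Number v = (Number) e.getExtractedFields().getOrDefault(field, 0L);
            acc.getMetricsFields().merge(field, v.longValue(), (a, b) -> (Long) a + (Long) b);
            return acc;
        }

        @Override public Accumulator merge(Accumulator a, Accumulator b) {
            // combine two partial sums produced by the pre-aggregation stage
            b.getMetricsFields().forEach((k, v) ->
                    a.getMetricsFields().merge(k, v, (x, y) -> (Long) x + (Long) y));
            return a;
        }

        @Override public Accumulator getResult(Accumulator acc) { return acc; }

        @Override public String functionName() { return "NUMBER_SUM_SKETCH"; }
    }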
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
new file mode 100644
index 0000000..ead0ecd
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.common.udf;
+
+import com.geedgenetworks.common.Event;
+import com.googlecode.aviator.Expression;
+import lombok.Data;
+import org.apache.flink.util.OutputTag;
+
+import java.io.Serializable;
+
+@Data
+public class RuleContext implements Serializable {
+
+ private String name;
+ private String expression;
+ private Expression compiledExpression;
+ private OutputTag<Event> outputTag;
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
index 668ba6f..d8b8bc4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
@@ -2,13 +2,14 @@ package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public class AviatorFilter implements Filter {
@Override
- public SingleOutputStreamOperator<Event> filterFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+ public DataStream<Event> filterFunction(
+ DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception {
if (FilterConfig.getParallelism() != 0) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
index a173438..f8b50eb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
@@ -3,12 +3,13 @@ package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public interface Filter {
- SingleOutputStreamOperator<Event> filterFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+ DataStream<Event> filterFunction(
+ DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception;
String type();
}
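A minimal Filter against the widened DataStream signature, assuming nothing beyond the two members shown; the drop-nothing predicate is a placeholder for the compiled expression a real filter such as AviatorFilter applies:

    import com.geedgenetworks.common.Event;
    import com.geedgenetworks.core.filter.Filter;
    import com.geedgenetworks.core.pojo.FilterConfig;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class PassThroughFilter implements Filter {
        @Override
        public DataStream<Event> filterFunction(DataStream<Event> stream, FilterConfig config) {
            // DataStream.filter still returns SingleOutputStreamOperator, so
            // parallelism and operator names can be applied before the type widens
            return stream.filter(event -> event != null);
        }

        @Override
        public String type() {
            return "pass_through";
        }
    }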
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index d3cbaac..ebdb0bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -1,15 +1,15 @@
package com.geedgenetworks.core.pojo;
+import com.alibaba.fastjson2.annotation.JSONField;
import com.geedgenetworks.common.udf.UDFContext;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import java.io.Serializable;
import java.util.List;
-import java.util.Map;
+
@EqualsAndHashCode(callSuper = true)
@Data
-public class AggregateConfig extends ProcessorConfig {
+public class AggregateConfig extends ProcessorConfig {
private List<String> group_by_fields;
@@ -18,5 +18,7 @@ public class AggregateConfig extends ProcessorConfig {
private Integer window_size;
private Integer window_slide;
private List<UDFContext> functions;
+ @JSONField(defaultValue = "false" )
+ private Boolean mini_batch;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
index 416a7ea..2508730 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
@@ -13,18 +13,50 @@ public class OnlineStatistics {
aggregate += (delta * (val.doubleValue() - mean));
return this;
}
+
+ public OnlineStatistics merge(OnlineStatistics other) {
+ if (other.n == 0) {
+ return this; // Nothing to merge
+ }
+ if (this.n == 0) {
+ this.n = other.n;
+ this.mean = other.mean;
+ this.aggregate = other.aggregate;
+ return this;
+ }
+
+ // Combine counts
+ long newN = this.n + other.n;
+
+ // Calculate the new mean
+ double delta = other.mean - this.mean;
+ this.mean += delta * (other.n / (double) newN);
+
+ // Update the aggregate (Chan et al.: M2 = M2_a + M2_b + delta^2 * n_a * n_b / n_ab)
+ this.aggregate += other.aggregate
+ + delta * delta * ((double) this.n * other.n / newN);
+
+ // Update the count
+ this.n = newN;
+
+ return this;
+ }
+
// Compute the population standard deviation
public double stddevp() {
return Math.sqrt(variancep());
}
+
// Compute the population variance
public double variancep() {
return aggregate / n;
}
+
// Compute the sample standard deviation
public double stddev() {
return Math.sqrt(variance());
}
+
// Compute the sample variance
public double variance() {
return aggregate / (n - 1);
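With the cross term fixed above, merge(...) is the standard parallel combination of Welford accumulators (Chan et al.). A quick numeric check that merging two partials matches a single pass:

    public class MergeFormulaCheck {
        public static void main(String[] args) {
            // partial A over {1,2}:   n=2, mean=1.5, M2=0.5
            long nA = 2;  double meanA = 1.5, m2A = 0.5;
            // partial B over {3,4,5}: n=3, mean=4.0, M2=2.0
            long nB = 3;  double meanB = 4.0, m2B = 2.0;

            long n = nA + nB;
            double delta = meanB - meanA;
            double mean = meanA + delta * nB / n;
            double m2 = m2A + m2B + delta * delta * ((double) nA * nB / n);

            System.out.println(mean);   // 3.0, the mean of 1..5
            System.out.println(m2 / n); // 2.0, the population variance of 1..5
        }
    }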
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
new file mode 100644
index 0000000..4381df5
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.core.pojo;
+
+import com.geedgenetworks.common.udf.RuleContext;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+@Data
+public class SplitConfig implements Serializable {
+
+ private String type;
+ private Map<String, Object> properties;
+ private int parallelism;
+ private String name;
+ private List<RuleContext> rules;
+}
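For reference, this is how SplitExecutor materializes the POJO from the unwrapped HOCON map (the same JSONObject(...).toJavaObject(...) call as in its initialize(...)); the map contents are illustrative:

    import com.alibaba.fastjson.JSONObject;
    import com.geedgenetworks.core.pojo.SplitConfig;
    import java.util.HashMap;
    import java.util.Map;

    public class SplitConfigBindDemo {
        public static void main(String[] args) {
            Map<String, Object> raw = new HashMap<>();
            raw.put("type", "split");
            SplitConfig cfg = new JSONObject(raw).toJavaObject(SplitConfig.class);
            cfg.setName("test_split"); // the executor injects the config key as name
            System.out.println(cfg.getType() + " / " + cfg.getName());
        }
    }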
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
index 172b368..3852414 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
@@ -2,12 +2,13 @@ package com.geedgenetworks.core.processor;
import com.geedgenetworks.common.Event;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public interface Processor<T> {
- SingleOutputStreamOperator<Event> processorFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator,
+ DataStream<Event> processorFunction(
+ DataStream<Event> singleOutputStreamOperator,
T processorConfig, ExecutionConfig config)
throws Exception;
String type();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
new file mode 100644
index 0000000..3632ba7
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -0,0 +1,191 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+
+import cn.hutool.crypto.SecureUtil;
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.processor.projection.UdfEntity;
+import com.google.common.collect.Lists;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
+import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+
+@Slf4j
+public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator> {
+
+ private final long windowSize;
+ private Long staggerOffset = null;
+
+ protected final Map<Long, Map<String, Accumulator>> windows = new HashMap<>();
+ protected List<String> groupByFields;
+ private LinkedList<UdfEntity> functions;
+
+ protected InternalMetrics internalMetrics;
+ private final AggregateConfig aggregateConfig;
+
+ public AbstractFirstAggregation(AggregateConfig aggregateConfig, long windowSize) {
+ this.windowSize = windowSize;
+ this.aggregateConfig = aggregateConfig;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ functions = Lists.newLinkedList();
+ try {
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
+ List<UDFContext> udfContexts = aggregateConfig.getFunctions();
+ if (udfContexts == null || udfContexts.isEmpty()) {
+ return;
+ }
+ Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+
+ groupByFields = aggregateConfig.getGroup_by_fields();
+ Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
+
+ for (UDFContext udfContext : udfContexts) {
+ Expression filterExpression = null;
+ UdfEntity udfEntity = new UdfEntity();
+ // Instantiate the function only if it is registered on the platform
+ if (udfClassReflect.containsKey(udfContext.getFunction())) {
+ Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
+ AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
+ // If the function declares a filter, compile the expression
+ if (udfContext.getFilter() != null) {
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ filterExpression = instance.compile(udfContext.getFilter(), true);
+ }
+ udfEntity.setAggregateFunction(aggregateFunction);
+ udfEntity.setFilterExpression(filterExpression);
+ udfEntity.setName(udfContext.getFunction());
+ udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+ udfEntity.setUdfContext(udfContext);
+ functions.add(udfEntity);
+ } else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Unsupported UDAF: " + udfContext.getFunction());
+ }
+
+ }
+ for (UdfEntity udfEntity : functions) {
+ udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
+ }
+
+ } catch (Exception e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
+
+ }
+ }
+
+ @Override
+ public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception {
+
+ }
+
+ public void onTimer(long timestamp, Collector<Accumulator> out) throws Exception {
+ Map<String, Accumulator> accumulator = windows.remove(timestamp);
+ for (Accumulator value : accumulator.values()) {
+ value = getResult(value);
+ out.collect(value);
+ internalMetrics.incrementOutEvents();
+ }
+ accumulator.clear();
+ }
+
+ private long assignWindowStart(long timestamp, long offset) {
+ return timestamp - (timestamp - offset + windowSize) % windowSize;
+ }
+
+ protected long assignWindowEnd(long timestamp) {
+ if (staggerOffset == null) {
+ staggerOffset = getStaggerOffset();
+ }
+ return assignWindowStart(timestamp, (staggerOffset % windowSize)) + windowSize;
+ }
+
+ private long getStaggerOffset() {
+ return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize);
+ }
+
+ public Accumulator createAccumulator() {
+
+ Map<String, Object> map = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+
+ accumulator.setMetricsFields(map);
+ for (UdfEntity udfEntity : functions) {
+ udfEntity.getAggregateFunction().initAccumulator(accumulator);
+ }
+ return accumulator;
+
+ }
+
+ public String getKey(Event value, List<String> keys) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String key : keys) {
+
+ if (value.getExtractedFields().containsKey(key)) {
+ stringBuilder.append(value.getExtractedFields().get(key).toString());
+ } else {
+ stringBuilder.append(",");
+ }
+ }
+ return SecureUtil.md5(stringBuilder.toString());
+
+ }
+
+ public Accumulator add(Event event, Accumulator accumulator) {
+ accumulator.setInEvents(accumulator.getInEvents() + 1);
+ for (UdfEntity udafEntity : functions) {
+ try {
+ boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true;
+ if (result) {
+ udafEntity.getAggregateFunction().add(event, accumulator);
+ }
+ } catch (ExpressionRuntimeException ignore) {
+ log.error("Function " + udafEntity.getName() + ": invalid filter expression!");
+ accumulator.setErrorCount(accumulator.getErrorCount() + 1);
+ } catch (Exception e) {
+ log.error("Function " + udafEntity.getName() + " execution failed!", e);
+ accumulator.setErrorCount(accumulator.getErrorCount() + 1);
+ }
+ }
+ return accumulator;
+ }
+
+ public Accumulator getResult(Accumulator accumulator) {
+ return accumulator;
+ }
+}
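The stagger offset presumably spreads window flushes across subtasks so the pre-aggregation stage does not emit in lockstep. A worked example of the assignWindowStart/assignWindowEnd arithmetic (all values in milliseconds):

    public class WindowMathCheck {
        public static void main(String[] args) {
            long windowSize = 5000, offset = 1200, ts = 12345;
            // assignWindowStart(ts, offset): largest boundary <= ts on the offset grid
            long start = ts - (ts - offset + windowSize) % windowSize;
            System.out.println(start);               // 11200 (= 1200 mod 5000)
            System.out.println(start + windowSize);  // 16200, the window end used as map key
        }
    }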
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 803fefc..4f9535d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
private final List<UDFContext> udfContexts;
private final List<String> udfClassNameLists;
- private final List<String> groupByFields;
- private LinkedList<UdfEntity> functions;
+ private final LinkedList<UdfEntity> functions;
public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
}
- groupByFields = aggregateConfig.getGroup_by_fields();
functions = Lists.newLinkedList();
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index bc87c32..c261fb6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -5,49 +5,96 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.AggregateConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+
import static com.geedgenetworks.common.Constants.*;
-public class AggregateProcessorImpl implements AggregateProcessor {
+public class AggregateProcessorImpl implements AggregateProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
- if (aggregateConfig.getParallelism() != 0) {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+
+ SingleOutputStreamOperator<Event> singleOutputStreamOperator;
+ if (aggregateConfig.getMini_batch()) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
+
}
- }else {
+ } else {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}
+ if (aggregateConfig.getParallelism() != 0) {
+ singleOutputStreamOperator.setParallelism(aggregateConfig.getParallelism());
+ }
+ return singleOutputStreamOperator.name(aggregateConfig.getName());
+
}
@Override
public String type() {
return "aggregate";
}
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
new file mode 100644
index 0000000..156c0ed
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
@@ -0,0 +1,60 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+@Slf4j
+public class FirstAggregationEventTime extends AbstractFirstAggregation {
+
+ private final PriorityQueue<Long> eventTimeTimersQueue = new PriorityQueue<>();
+
+ public FirstAggregationEventTime(AggregateConfig aggregateConfig, long windowSize) {
+ super(aggregateConfig, windowSize);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ }
+
+ @Override
+ public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception {
+ Long timestamp;
+ internalMetrics.incrementInEvents();
+ try {
+ String key = getKey(value, groupByFields);
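+ // Fire and evict every buffered window whose end time is at or before the current watermark.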
+ while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) {
+ eventTimeTimersQueue.poll();
+ onTimer(timestamp, out);
+ }
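+ // Bucket the element into its window, creating the per-key accumulator on first use.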
+ long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark());
+ if (!windows.containsKey(windowEnd)) {
+ Map<String, Accumulator> map = new HashMap<>();
+ map.put(key, createAccumulator());
+ windows.put(windowEnd, map);
+ eventTimeTimersQueue.add(windowEnd);
+ } else {
+ if (!windows.get(windowEnd).containsKey(key)) {
+ windows.get(windowEnd).put(key, createAccumulator());
+ }
+ }
+ add(value, windows.get(windowEnd).get(key));
+ } catch (Exception e) {
+ log.error("Error in pre-aggregate processElement", e);
+ internalMetrics.incrementErrorEvents();
+ }
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
new file mode 100644
index 0000000..e98daa5
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
@@ -0,0 +1,59 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+@Slf4j
+public class FirstAggregationProcessingTime extends AbstractFirstAggregation {
+
+ private final PriorityQueue<Long> processingTimeTimersQueue = new PriorityQueue<>();
+
+ public FirstAggregationProcessingTime(AggregateConfig aggregateConfig, long windowSize) {
+ super(aggregateConfig, windowSize);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ @Override
+ public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception {
+ Long timestamp;
+ internalMetrics.incrementInEvents();
+ try {
+ String key = getKey(value, groupByFields);
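+ // Same bookkeeping as the event-time variant, but driven by the current processing time instead of the watermark.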
+ while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) {
+ processingTimeTimersQueue.poll();
+ onTimer(timestamp, out);
+ }
+ long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime());
+ if (!windows.containsKey(windowEnd)) {
+ Map<String, Accumulator> map = new HashMap<>();
+ map.put(key, createAccumulator());
+ windows.put(windowEnd, map);
+ processingTimeTimersQueue.add(windowEnd);
+ } else {
+ if (!windows.get(windowEnd).containsKey(key)) {
+ windows.get(windowEnd).put(key, createAccumulator());
+ }
+ }
+ add(value, windows.get(windowEnd).get(key));
+ } catch (Exception e) {
+ log.error("Error in pre-aggregate processElement", e);
+ internalMetrics.incrementErrorEvents();
+ }
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
index 165ed1b..da09690 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.processor.aggregate;
+import cn.hutool.crypto.SecureUtil;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.KeybyEntity;
@@ -20,16 +21,17 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec
KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
- for(String key: keys){
+ for (String key : keys) {
- if(value.getExtractedFields().containsKey(key)){
- keybyEntity.getKeys().put(key,value.getExtractedFields().get(key));
+ if (value.getExtractedFields().containsKey(key)) {
+ keybyEntity.getKeys().put(key, value.getExtractedFields().get(key));
stringBuilder.append(value.getExtractedFields().get(key).toString());
- }else {
+ } else {
stringBuilder.append(",");
}
}
- keybyEntity.setKeysToString(stringBuilder.toString());
- return keybyEntity;
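+ // Hash the concatenated key fields so the grouping key stays compact and evenly distributed.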
+ String hashedKey = SecureUtil.md5(stringBuilder.toString());
+ keybyEntity.setKeysToString(hashedKey);
+ return keybyEntity;
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
new file mode 100644
index 0000000..6e43184
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
@@ -0,0 +1,37 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+import cn.hutool.crypto.SecureUtil;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.KeybyEntity;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class PreKeySelector implements org.apache.flink.api.java.functions.KeySelector<Accumulator, KeybyEntity> {
+
+
+ private final List<String> keys;
+
+ public PreKeySelector(List<String> keys) {
+ this.keys = keys;
+ }
+
+ @Override
+ public KeybyEntity getKey(Accumulator value) throws Exception {
+
+ KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String key : keys) {
+
+ if (value.getMetricsFields().containsKey(key)) {
+ keybyEntity.getKeys().put(key, value.getMetricsFields().get(key));
+ stringBuilder.append(value.getMetricsFields().get(key).toString());
+ } else {
+ stringBuilder.append(",");
+ }
+ }
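+ // As in KeySelector, hash the concatenated key fields to keep the shuffle key compact.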
+ String hashedKey = SecureUtil.md5(stringBuilder.toString());
+ keybyEntity.setKeysToString(hashedKey);
+ return keybyEntity;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
new file mode 100644
index 0000000..68fa53e
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -0,0 +1,126 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.processor.projection.UdfEntity;
+import com.google.common.collect.Lists;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.ExecutionConfig;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
+import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+
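+/**
+ * Second (global) phase of the mini-batch aggregation: its inputs are the partial
+ * Accumulators emitted by the pre-aggregation step rather than raw events, so both
+ * add() and merge() combine accumulators.
+ */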
+@Slf4j
+public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> {
+ private final List<UDFContext> udfContexts;
+ private final List<String> udfClassNameLists;
+ private final LinkedList<UdfEntity> functions;
+
+ public SecondAggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
+ udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+ udfContexts = aggregateConfig.getFunctions();
+ if (udfContexts == null || udfContexts.isEmpty()) {
+ throw new RuntimeException("Aggregate processor requires at least one configured function");
+ }
+ functions = Lists.newLinkedList();
+ Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
+ try {
+ for (UDFContext udfContext : udfContexts) {
+ Expression filterExpression = null;
+ UdfEntity udfEntity = new UdfEntity();
+ // Instantiate the function only if the job-configured function is registered on the platform
+ if (udfClassReflect.containsKey(udfContext.getFunction())) {
+ Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
+ AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
+ // If the function declares a filter, compile the filter expression
+ if (udfContext.getFilter() != null) {
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ filterExpression = instance.compile(udfContext.getFilter(), true);
+ }
+ udfEntity.setAggregateFunction(aggregateFunction);
+ udfEntity.setFilterExpression(filterExpression);
+ udfEntity.setName(udfContext.getFunction());
+ udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+ udfEntity.setUdfContext(udfContext);
+ functions.add(udfEntity);
+ } else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Unsupported UDAF: " + udfContext.getFunction());
+ }
+
+ }
+ for (UdfEntity udfEntity : functions) {
+ udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
+ }
+ } catch (Exception e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
+
+ }
+ }
+
+ @Override
+ public Accumulator createAccumulator() {
+ Map<String, Object> map = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(map);
+ for (UdfEntity udfEntity : functions) {
+ udfEntity.getAggregateFunction().initAccumulator(accumulator);
+ }
+ return accumulator;
+ }
+
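+ // The input is already a partial accumulator, so adding an element reduces to a merge.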
+ @Override
+ public Accumulator add(Accumulator event, Accumulator accumulator) {
+ return merge(event, accumulator);
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator accumulator) {
+ for (UdfEntity udafEntity : functions) {
+ try {
+ udafEntity.getAggregateFunction().getResult(accumulator);
+ } catch (Exception e) {
+ log.error("Function " + udafEntity.getName() + " getResult exception !", e);
+ }
+ }
+ return accumulator;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator acc1, Accumulator acc2) {
+ acc1.setInEvents(acc1.getInEvents() + 1);
+ for (UdfEntity udafEntity : functions) {
+ try {
+ udafEntity.getAggregateFunction().merge(acc1, acc2);
+ } catch (ExpressionRuntimeException ignore) {
+ log.error("Function " + udafEntity.getName() + " Invalid filter ! ");
+ acc1.setErrorCount(acc1.getErrorCount() + 1);
+ } catch (Exception e) {
+ log.error("Function " + udafEntity.getName() + " execute exception !", e);
+ acc1.setErrorCount(acc1.getErrorCount() + 1);
+ }
+ }
+ return acc1;
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
index 6b46a7b..d87e7e2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
@@ -4,25 +4,26 @@ import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
public class ProjectionProcessorImpl implements ProjectionProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
if (projectionConfig.getParallelism() != 0) {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.process(new ProjectionProcessFunction(projectionConfig))
.setParallelism(projectionConfig.getParallelism())
.name(projectionConfig.getName());
} else {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.process(new ProjectionProcessFunction(projectionConfig))
.name(projectionConfig.getName());
}
}
-
@Override
public String type() {
return "projection";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
index f36f8db..6ddc616 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
@@ -2,25 +2,25 @@ package com.geedgenetworks.core.processor.table;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.TableConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Map;
-import java.time.Duration;
public class TableProcessorImpl implements TableProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception {
-
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) throws Exception {
if (tableConfig.getParallelism() != 0) {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.flatMap(new TableProcessorFunction(tableConfig))
.setParallelism(tableConfig.getParallelism())
.name(tableConfig.getName());
} else {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.flatMap(new TableProcessorFunction(tableConfig))
.name(tableConfig.getName());
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
new file mode 100644
index 0000000..37e7b44
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.util.Set;
+
+public interface Split {
+
+ DataStream<Event> splitFunction(
+ DataStream<Event> dataStream, SplitConfig splitConfig)
+ throws Exception;
+ String type();
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
new file mode 100644
index 0000000..f07b568
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+
+@Slf4j
+public class SplitFunction extends ProcessFunction<Event, Event> {
+ private final SplitConfig splitConfig;
+ private List<RuleContext> rules;
+ private transient InternalMetrics internalMetrics;
+
+ public SplitFunction(SplitConfig splitConfig) {
+ this.splitConfig = splitConfig;
+
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
+ this.rules = splitConfig.getRules();
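+ // Compile each rule's Aviator expression once and register a dedicated side-output tag per rule.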
+ for(RuleContext rule : rules){
+ String expression = rule.getExpression();
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ Expression compiledExp = instance.compile(expression, true);
+ rule.setCompiledExpression(compiledExp);
+ OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){};
+ rule.setOutputTag(outputTag);
+ }
+ }
+
+
+ @Override
+ public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
+ try {
+ internalMetrics.incrementInEvents();
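+ // Route the event to every rule whose expression matches; rules are not mutually exclusive.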
+ for (RuleContext route : rules){
+ boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true;
+ if (result) {
+ ctx.output(route.getOutputTag(), event);
+ }
+ }
+ }catch (Exception e) {
+ internalMetrics.incrementErrorEvents();
+ log.error("error in split function", e);
+ }
+ }
+
+ public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
+
+ boolean result;
+ Object object = expression.execute(map);
+ if (object != null) {
+ result = (Boolean) object;
+ } else {
+ throw new ExpressionRuntimeException("Filter expression evaluated to null");
+ }
+ return result;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
new file mode 100644
index 0000000..f6d2c8c
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
@@ -0,0 +1,29 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public class SplitOperator implements Split {
+
+ @Override
+ public DataStream<Event> splitFunction(
+ DataStream<Event> dataStream, SplitConfig splitConfig)
+ throws Exception {
+ if (splitConfig.getParallelism() != 0) {
+ return dataStream
+ .process(new SplitFunction(splitConfig))
+ .setParallelism(splitConfig.getParallelism())
+ .name(splitConfig.getName());
+ } else {
+ return dataStream
+ .process(new SplitFunction(splitConfig))
+ .name(splitConfig.getName());
+ }
+ }
+ @Override
+ public String type() {
+ return "split";
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 423eff9..3921ee2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -22,9 +22,9 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.processor.projection.UdfEntity;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* Collects elements within a group and returns the list of aggregated objects
@@ -36,18 +36,18 @@ public class CollectList implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new ArrayList<>());
@@ -56,7 +56,7 @@ public class CollectList implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
Object object = event.getExtractedFields().get(lookupField);
List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField);
aggregate.add(object);
@@ -75,4 +75,17 @@ public class CollectList implements AggregateFunction {
return acc;
}
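+ // Concatenate the partial lists; adopt the second list when the first has none.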
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ List<Object> firstValue = (List<Object>) firstAcc.getMetricsFields().get(outputField);
+ List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstValue.addAll(secondValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index b4dfb14..9ec9b09 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -8,7 +8,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -23,17 +22,17 @@ public class CollectSet implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new HashSet<>());
@@ -42,7 +41,7 @@ public class CollectSet implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
Object object = event.getExtractedFields().get(lookupField);
Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField);
aggregate.add(object);
@@ -61,5 +60,16 @@ public class CollectSet implements AggregateFunction {
return acc;
}
-
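+ // Union the partial sets; adopt the second set when the first has none.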
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Set<Object> firstValue = (Set<Object>) firstAcc.getMetricsFields().get(outputField);
+ Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstValue.addAll(secondValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
index 6301a01..a1a35be 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -23,8 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
-
/**
* Collects elements within a group and returns the list of aggregated objects
*/
@@ -36,14 +34,13 @@ public class FirstValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
@@ -55,7 +52,7 @@ public class FirstValue implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){
+ if (!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)) {
acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
}
return acc;
@@ -71,4 +68,11 @@ public class FirstValue implements AggregateFunction {
return acc;
}
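+ // First value wins: take the second accumulator's value only when the first has none.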
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
index 1648fa5..a099fde 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
@@ -59,6 +59,26 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
return acc;
}
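+ // Merge two histogram partials, tolerating an absent histogram on either side.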
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ rst = ((Histogramer)agg).merge(((Histogramer) aggOther));
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
protected void updateHdr(Accumulator acc, Object value) {
Map<String, Object> aggs = acc.getMetricsFields();
ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
index f27a2e6..44b374e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -23,9 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Collects elements within a group and returns the list of aggregated objects
*/
@@ -37,17 +34,17 @@ public class LastValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
return acc;
@@ -55,7 +52,7 @@ public class LastValue implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
}
return acc;
@@ -71,4 +68,11 @@ public class LastValue implements AggregateFunction {
return acc;
}
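+ // Last value wins: whenever the second partial has a value, it overwrites the first.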
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (secondAcc.getMetricsFields().containsKey(outputField)) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
index ea33271..05de38c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
@@ -13,20 +13,22 @@ public class LongCount implements AggregateFunction {
@Override
- public void open(UDFContext udfContext){
- if(udfContext.getOutput_fields()==null ){
+ public void open(UDFContext udfContext) {
+ if (udfContext.getOutput_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
outputField = udfContext.getOutput_fields().get(0);
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
+
@Override
public Accumulator add(Event event, Accumulator acc) {
- acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L);
+ acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long) v + 1L);
return acc;
}
@@ -40,5 +42,18 @@ public class LongCount implements AggregateFunction {
return acc;
}
-
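+ // Add the partial counts; adopt the second count when the first has none.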
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+
+ long firstValue = (long) firstAcc.getMetricsFields().get(outputField);
+ long secondValue = (long) secondAcc.getMetricsFields().get(outputField);
+ firstValue = firstValue + secondValue;
+ firstAcc.getMetricsFields().put(outputField, firstValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
index 2a615ef..9c4e070 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
@@ -16,20 +16,20 @@ public class Mean implements AggregateFunction {
private String outputField;
private Integer precision;
private DecimalFormat df;
+
@Override
- public void open(UDFContext udfContext){
+ public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
- if(udfContext.getParameters()!= null && !udfContext.getParameters().isEmpty()) {
+ if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) {
precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString());
if (precision > 0) {
StringBuilder pattern = new StringBuilder("#.");
@@ -38,14 +38,15 @@ public class Mean implements AggregateFunction {
}
df = new DecimalFormat(pattern.toString());
}
- }else {
+ } else {
precision = -1;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
- acc.getMetricsFields().put(outputField,new OnlineStatistics());
+ acc.getMetricsFields().put(outputField, new OnlineStatistics());
return acc;
}
@@ -67,16 +68,27 @@ public class Mean implements AggregateFunction {
@Override
public Accumulator getResult(Accumulator acc) {
OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField);
- if(precision<0){
+ if (precision < 0) {
acc.getMetricsFields().put(outputField, aggregate.mean());
- }
- else if(precision>0){
+ } else if (precision > 0) {
acc.getMetricsFields().put(outputField, df.format(aggregate.mean()));
- }
- else {
- acc.getMetricsFields().put(outputField,(long)aggregate.mean());
+ } else {
+ acc.getMetricsFields().put(outputField, (long) aggregate.mean());
}
return acc;
}
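+ // Merge the running statistics and fold the in/out/error event counters from both partials.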
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ OnlineStatistics acc1 = (OnlineStatistics) firstAcc.getMetricsFields().get(outputField);
+ acc1.merge((OnlineStatistics) secondAcc.getMetricsFields().get(outputField));
+ long inEvents = firstAcc.getInEvents() + (secondAcc.getInEvents());
+ long outEvent = firstAcc.getOutEvents() + (secondAcc.getOutEvents());
+ long error = firstAcc.getErrorCount() + (secondAcc.getErrorCount());
+ firstAcc.setInEvents(inEvents);
+ firstAcc.setErrorCount(error);
+ firstAcc.setOutEvents(outEvent);
+ return firstAcc;
+ }
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
index 01e9a5b..e972133 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -6,7 +6,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.pojo.OnlineStatistics;
public class NumberSum implements AggregateFunction {
@@ -15,15 +14,14 @@ public class NumberSum implements AggregateFunction {
@Override
- public void open(UDFContext udfContext){
- if(udfContext.getLookup_fields()==null ){
+ public void open(UDFContext udfContext) {
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
@@ -32,22 +30,23 @@ public class NumberSum implements AggregateFunction {
public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
+
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
- Number val = (Number) event.getExtractedFields().get(lookupField);
- Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L);
- if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) {
- aggregate = aggregate.longValue() + val.longValue();
- } else if (aggregate instanceof Float || val instanceof Float) {
- aggregate = aggregate.floatValue() + val.floatValue();
- } else if (aggregate instanceof Double || val instanceof Double) {
- aggregate = aggregate.doubleValue() + val.doubleValue();
- }
- acc.getMetricsFields().put(outputField, aggregate);
+ if (event.getExtractedFields().containsKey(lookupField)) {
+ Number val = (Number) event.getExtractedFields().get(lookupField);
+ Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L);
+ if (aggregate instanceof Long && (val instanceof Integer || val instanceof Long)) {
+ aggregate = aggregate.longValue() + val.longValue();
+ } else if (aggregate instanceof Float || val instanceof Float) {
+ aggregate = aggregate.floatValue() + val.floatValue();
+ } else if (aggregate instanceof Double || val instanceof Double) {
+ aggregate = aggregate.doubleValue() + val.doubleValue();
}
- return acc;
+ acc.getMetricsFields().put(outputField, aggregate);
+ }
+ return acc;
}
@Override
@@ -65,4 +64,24 @@ public class NumberSum implements AggregateFunction {
}
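+ // Sum the partials with the same numeric-widening rules as add().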
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+
+ Number firstValue = (Number) firstAcc.getMetricsFields().get(outputField);
+ Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField);
+ if (firstValue instanceof Long && (secondValue instanceof Integer || secondValue instanceof Long)) {
+ firstValue = firstValue.longValue() + secondValue.longValue();
+ } else if (firstValue instanceof Float || secondValue instanceof Float) {
+ firstValue = firstValue.floatValue() + secondValue.floatValue();
+ } else if (firstValue instanceof Double || secondValue instanceof Double) {
+ firstValue = firstValue.doubleValue() + secondValue.doubleValue();
+ }
+ firstAcc.getMetricsFields().put(outputField, firstValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
index 71d61dc..0802c22 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
@@ -51,6 +51,34 @@ public abstract class HlldBaseAggregate implements AggregateFunction {
return acc;
}
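+ // Merge HLL partials: fold into the running union when inputs are sketches, otherwise union the two raw HLLs.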
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ if(inputSketch){
+ ((HllUnion)agg).update(((HllUnion) aggOther).getResult());
+ rst = agg;
+ }else{
+ final HllUnion union = new HllUnion(precision);
+ union.update((Hll) agg);
+ union.update((Hll) aggOther);
+ rst = union.getResult();
+ }
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
protected Hll getResultHll(Accumulator acc){
Object agg = acc.getMetricsFields().get(outputField);
if (agg == null) {
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
new file mode 100644
index 0000000..500c367
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
@@ -0,0 +1 @@
+com.geedgenetworks.core.split.SplitOperator \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
index b0d846b..2bf13a5 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -36,11 +36,51 @@ public class CollectListTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
- private static void excute(List<String> arr) throws ParseException {
+ private void testMerge(List<String> arr,List<String> arr2) {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ CollectList collectList = new CollectList();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ collectList.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext,arr);
+ Accumulator result2 = getMiddleResult(udfContext,arr2);
+ Accumulator result = collectList.getResult(collectList.merge(result1,result2));
+ List<String> vals = (List<String>) result.getMetricsFields().get("field_list");
+ assertEquals(8, vals.size());
+ assertEquals("192.168.1.6",vals.get(5).toString());
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+
+
+ CollectList collectList = new CollectList();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ collectList.open(udfContext);
+ Accumulator agg = collectList.initAccumulator(accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = collectList.add(event, agg);
+
+ }
+ return agg;
+ }
+ private void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
index ea4fe8d..8e992f6 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -31,14 +31,53 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class CollectSetTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4");
+ testMerge(arr, arr2);
+ testGetResult(arr);
}
- private static void excute(List<String> arr) throws ParseException {
+ private void testMerge(List<String> arr, List<String> arr2) {
+
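+ // Merging two partial sets should deduplicate the overlapping addresses, leaving 6 distinct values.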
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ CollectSet collectSet = new CollectSet();
+ collectSet.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext, arr);
+ Accumulator result2 = getMiddleResult(udfContext, arr2);
+ Accumulator result = collectSet.getResult(collectSet.merge(result1, result2));
+ Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list");
+ assertEquals(6, vals.size());
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext, List<String> arr) {
+ CollectSet collectSet = new CollectSet();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ collectSet.open(udfContext);
+ Accumulator agg = collectSet.initAccumulator(accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = collectSet.add(event, agg);
+ }
+ return agg;
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
index 506f6de..43a9732 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
@@ -34,11 +34,48 @@ public class FirstValueTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
+ testMerge(arr, arr2);
+ testGetResult(arr);
}
+ private void testMerge(List<String> arr, List<String> arr2) {
- private static void excute(List<String> arr) throws ParseException {
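+ // merge() should keep the earliest value seen across both partials ("192.168.1.1").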
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_first"));
+ FirstValue firstValue = new FirstValue();
+ firstValue.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext, arr);
+ Accumulator result2 = getMiddleResult(udfContext, arr2);
+ Accumulator result = firstValue.getResult(firstValue.merge(result1, result2));
+ String val = (String) result.getMetricsFields().get("field_first");
+ assertEquals("192.168.1.1", val);
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext, List<String> arr) {
+ FirstValue firstValue = new FirstValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ firstValue.open(udfContext);
+ Accumulator agg = firstValue.initAccumulator(accumulator);
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = firstValue.add(event, agg);
+ }
+ return agg;
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
index f8306cd..e952908 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
@@ -37,11 +37,48 @@ public class LastValueTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.3");
+ testMerge(arr, arr2);
+ testGetResult(arr);
}
+ private void testMerge(List<String> arr, List<String> arr2) {
- private static void excute(List<String> arr) throws ParseException {
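+ // merge() should keep the latest value, taken from the second partial ("192.168.1.3").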
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_last"));
+ LastValue lastValue = new LastValue();
+ lastValue.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext, arr);
+ Accumulator result2 = getMiddleResult(udfContext, arr2);
+ Accumulator result = lastValue.getResult(lastValue.merge(result1, result2));
+ String val = (String) result.getMetricsFields().get("field_last");
+ assertEquals("192.168.1.3", val);
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext, List<String> arr) {
+ LastValue lastValue = new LastValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ lastValue.open(udfContext);
+ Accumulator agg = lastValue.initAccumulator(accumulator);
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = lastValue.add(event, agg);
+ }
+ return agg;
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
index 3c02499..c1dfb9e 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
@@ -38,10 +38,46 @@ public class LongCountTest {
public void test() throws ParseException {
Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
- excute(longArr);
+ Long[] longArr2 = new Long[]{1L, 2L, 3L, 4L};
+ testMerge(longArr, longArr2);
+ testGetResult(longArr);
}
+ private void testMerge(Number[] arr, Number[] arr2) {
- private static void excute(Number[] arr) throws ParseException {
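+ // Counts from the two partials should sum: 4 + 4 = 8.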
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("count"));
+ LongCount longCount = new LongCount();
+ longCount.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext, arr);
+ Accumulator result2 = getMiddleResult(udfContext, arr2);
+ Accumulator result = longCount.getResult(longCount.merge(result1, result2));
+ assertEquals(8, Integer.parseInt(result.getMetricsFields().get("count").toString()));
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext, Number[] arr) {
+ LongCount longCount = new LongCount();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ longCount.open(udfContext);
+ Accumulator agg = longCount.initAccumulator(accumulator);
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = longCount.add(event, agg);
+ }
+ return agg;
+ }
+ private static void testGetResult(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setOutput_fields(Collections.singletonList("count"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
index 6deed0f..cc4eaf0 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
@@ -39,12 +39,52 @@ public class MeanTest {
Integer[] intArr1 = new Integer[]{1, 2, 3, 4};
Integer[] intArr2 = new Integer[]{1, 6, 3};
- excute(intArr1, 0);
- excute2(intArr2, 2);
- excute3(intArr1);
+ testInt(intArr1, 0);
+ testDouble(intArr2, 2);
+ testNoPrecision(intArr1);
+ testMerge(intArr1, intArr2, 2);
+ }
+ private void testMerge(Number[] arr1, Number[] arr2, int precision) throws ParseException {
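+ // merge() must combine sums and counts rather than average the averages: (10 + 10) / 7 rounds to 2.86 at precision 2.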
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("precision", precision);
+ Mean mean = new Mean();
+ mean.open(udfContext);
+ Accumulator result1 = getMiddleResult(arr1, precision);
+ Accumulator result2 = getMiddleResult(arr2, precision);
+ Accumulator result = mean.getResult(mean.merge(result1, result2));
+ assertEquals(NumberFormat.getInstance().parse("2.86"), NumberFormat.getInstance().parse(result.getMetricsFields().get("field_mean").toString()));
+ }
+ private Accumulator getMiddleResult(Number[] arr, int precision) throws ParseException {
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("precision", precision);
+ Mean mean = new Mean();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ mean.open(udfContext);
+ Accumulator agg = mean.initAccumulator(accumulator);
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = mean.add(event, agg);
+ }
+ return agg;
}
- private static void excute(Number[] arr,int precision) throws ParseException {
+
+ private void testInt(Number[] arr, int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
@@ -65,11 +105,12 @@ public class MeanTest {
agg = mean.add(event, agg);
}
+
Accumulator result = mean.getResult(agg);
assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2"));
}
- private static void excute2(Number[] arr,int precision) throws ParseException {
+ private void testDouble(Number[] arr, int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
@@ -94,7 +135,7 @@ public class MeanTest {
assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33"));
}
- private static void excute3(Number[] arr) throws ParseException {
+ private void testNoPrecision(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
index d0d3d2c..a4072ca 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
@@ -41,8 +41,44 @@ public class NumberSumTest {
excute(doubleArr, Double.class);
excute(intArr, Long.class);
excute(longArr, Long.class);
+ testMerge(intArr, floatArr);
+ }
+ private void testMerge(Number[] arr, Number[] arr2) {
+
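+ // Merging the int and float partials should yield their combined sum of 20.0.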
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_sum"));
+ NumberSum numberSum = new NumberSum();
+ numberSum.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext, arr);
+ Accumulator result2 = getMiddleResult(udfContext, arr2);
+ Accumulator result = numberSum.getResult(numberSum.merge(result1, result2));
+ assertEquals(20.0f, Float.parseFloat(result.getMetricsFields().get("field_sum").toString()));
+
}
+ private Accumulator getMiddleResult(UDFContext udfContext, Number[] arr) {
+ NumberSum numberSum = new NumberSum();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ numberSum.open(udfContext);
+ Accumulator agg = numberSum.initAccumulator(accumulator);
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = numberSum.add(event, agg);
+ }
+ return agg;
+ }
private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
UDFContext udfContext = new UDFContext();
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 0eba408..9b58289 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -9,11 +9,12 @@ import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
+import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_with_aggregation.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
@@ -21,6 +22,8 @@ public class GrootStreamExample {
executeCommandArgs.setEncrypt(false);
executeCommandArgs.setDecrypt(false);
executeCommandArgs.setVersion(false);
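+ // Job-level user-defined variables; they override matching keys declared under pipeline.properties in the YAML.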
+ executeCommandArgs.setVariables(List.of("hos.bucket.name.traffic_file=user_define_traffic_file_bucket",
+ "scheduler.knowledge_base.update.interval.minutes=1"));
executeCommandArgs.setDeployMode(DeployMode.RUN);
executeCommandArgs.setTargetType(TargetType.LOCAL);
GrootStreamServer.run(executeCommandArgs.buildCommand());
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
index 517d29b..63159c5 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -46,7 +46,7 @@ sinks:
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
- kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
+ kafka.sasl.jaas.config: f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6
format: json
log.failures.only: true
@@ -64,7 +64,7 @@ sinks:
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
- kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+ kafka.sasl.jaas.config: f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6
format: json
log.failures.only: true
@@ -72,7 +72,7 @@ application: # [object] Define job configuration
env:
name: example-inline-to-kafka
parallelism: 3
- shade.identifier: aes
+ shade.identifier: sm4
pipeline:
object-reuse: true
topology:
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
index 00f2a7d..408fbad 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
@@ -33,6 +33,12 @@ application:
parallelism: 3
pipeline:
object-reuse: true
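+ # Job-level property defaults; functions reference them as props.<key> and CLI -i variables can override them.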
+ properties:
+ hos.bucket.name.traffic_file: local_traffic_file_bucket
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
topology:
- name: inline_source
downstream: [filter_operator]
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
index 0f1e3f7..14eb5fb 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
@@ -62,8 +62,9 @@ public abstract class AbstractTestContainer implements TestContainer {
container, this.startModuleFullPath, GROOTSTREAM_HOME);
}
- protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile)
+ protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile, List<String> variables)
throws IOException, InterruptedException {
final String confInContainerPath = ContainerUtil.copyConfigFileToContainer(container, confFile);
// copy connectors
ContainerUtil.copyConnectorJarToContainer(
@@ -81,10 +82,27 @@ public abstract class AbstractTestContainer implements TestContainer {
command.add(ContainerUtil.adaptPathForWin(confInContainerPath));
command.add("--target");
command.add("remote");
- command.addAll(getExtraStartShellCommands());
+ List<String> extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands());
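+ // Forward each job-level variable to the start script as a repeated "-i key=value" option.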
+ if (variables != null && !variables.isEmpty()) {
+ variables.forEach(
+ v -> {
+ extraStartShellCommands.add("-i");
+ extraStartShellCommands.add(v);
+ });
+ }
+ command.addAll(extraStartShellCommands);
return executeCommand(container, command);
}
+
+ protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile)
+ throws IOException, InterruptedException {
+ return executeJob(container, confFile, null);
+ }
+
protected Container.ExecResult savepointJob(GenericContainer<?> container, String jobId)
throws IOException, InterruptedException {
final List<String> command = new ArrayList<>();
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index 30e6eb3..b833115 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -127,8 +127,14 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
+ return executeJob(confFile, null);
+ }
+
+ @Override
+ public Container.ExecResult executeJob(String confFile, List<String> variables)
+ throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(jobManager, confFile);
+ return executeJob(jobManager, confFile, variables);
}
@Override
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java
index 6e4cd1f..b3bf77a 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java
@@ -5,6 +5,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.Network;
import java.io.IOException;
+import java.util.List;
public interface TestContainer extends TestResource {
Network NETWORK = Network.newNetwork();
@@ -15,6 +16,8 @@ public interface TestContainer extends TestResource {
Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException;
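+ /** Executes a job with job-level user-defined variables, forwarded to the CLI as repeated -i key=value options. */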
+ Container.ExecResult executeJob(String confFile, List<String> variables)
+ throws IOException, InterruptedException;
default Container.ExecResult savepointJob(String jobId)
throws IOException, InterruptedException {
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml
index 5520945..2eb105b 100644
--- a/groot-tests/test-common/src/test/resources/grootstream.yaml
+++ b/groot-tests/test-common/src/test/resources/grootstream.yaml
@@ -15,3 +15,7 @@ grootstream:
hos.bucket.name.traffic_file: traffic_file_bucket
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
index d4cdcbd..1c1e777 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
@@ -8,6 +8,7 @@ import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
import com.geedgenetworks.test.common.container.TestContainerId;
import com.geedgenetworks.test.common.junit.DisabledOnContainer;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import java.io.IOException;
@@ -23,7 +24,7 @@ import static org.awaitility.Awaitility.await;
@DisabledOnContainer(
value = {TestContainerId.FLINK_1_17},
type = {},
- disabledReason = "only flink adjusts the parameter configuration rules")
+ disabledReason = "Only Flink adjusts the parameter configuration rules")
public class InlineToPrintIT extends TestSuiteBase {
@TestTemplate
@@ -31,7 +32,10 @@ public class InlineToPrintIT extends TestSuiteBase {
CompletableFuture.supplyAsync(
() -> {
try {
- return container.executeJob("/kafka_to_print.yaml");
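+ // Submit the job with CLI variables that should override the bucket names set in the YAML pipeline properties.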
+ List<String> variables = List.of(
+ "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket",
+ "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket");
+ return container.executeJob("/inline_to_print.yaml", variables);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
@@ -85,6 +89,17 @@ public class InlineToPrintIT extends TestSuiteBase {
Assertions.assertNotNull(jobNumRestartsReference.get());
});
+
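+ // The CLI-supplied bucket names must appear in the printed events, proving the -i overrides took effect.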
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ String logs = container.getServerLogs();
+ Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10);
+ Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10);
+ });
}
}
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
index daf6e32..b4773a1 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -2,7 +2,7 @@ sources:
inline_source:
type: inline
properties:
- data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
format: json
json.ignore.parse.errors: false
@@ -14,11 +14,21 @@ filters:
processing_pipelines:
projection_processor:
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
- function: DROP
filter: event.server_ip == '4.4.4.4'
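+ # PATH_COMBINE joins the props.* path segments with the per-event file name into a single path.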
+ - function: PATH_COMBINE
+ lookup_fields: [ rtp_pcap_path ]
+ output_fields: [ rtp_pcap_path ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ]
+ - function: PATH_COMBINE
+ lookup_fields: [ http_request_body ]
+ output_fields: [ http_request_body ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ]
sinks:
print_sink:
@@ -33,6 +43,12 @@ application:
parallelism: 3
pipeline:
object-reuse: true
+ properties:
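+ # Job-level defaults; InlineToPrintIT overrides the rtp_file and http_file buckets via CLI -i variables.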
+ hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket
+ hos.bucket.name.http_file: job_level_traffic_http_file_bucket
+ hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket
+
topology:
- name: inline_source
downstream: [filter_operator]
diff --git a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java
index 20caace..8b44ed7 100644
--- a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java
+++ b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java
@@ -122,7 +122,6 @@ public class ClickHouseIT extends TestSuiteBase implements TestResource {
@TestTemplate
public void testClickHouse(TestContainer container) throws Exception {
assertHasData(SOURCE_TABLE);
- clearTable(SOURCE_TABLE);
}
@TestTemplate
@@ -144,7 +143,6 @@ public class ClickHouseIT extends TestSuiteBase implements TestResource {
() -> {
assertHasData(SINK_TABLE);
compareResult();
- clearTable(SINK_TABLE);
});
}
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
index ab1ba72..4592f79 100644
--- a/groot-tests/test-e2e-kafka/pom.xml
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -47,6 +47,18 @@
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project> \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8378a72..f1fb003 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
</modules>
<properties>
- <revision>1.5.0</revision>
+ <revision>1.6.0</revision>
<java.version>11</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
@@ -55,6 +55,7 @@
<jsonpath.version>2.4.0</jsonpath.version>
<fastjson2.version>2.0.32</fastjson2.version>
<hutool.version>5.8.22</hutool.version>
+ <bouncycastle.version>1.78.1</bouncycastle.version>
<galaxy.version>2.0.2</galaxy.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
<ipaddress.version>5.3.3</ipaddress.version>
@@ -392,6 +393,12 @@
</dependency>
<dependency>
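+ <!-- Provides the SM4 cipher backing the "sm4" shade.identifier used for config encryption and decryption. -->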
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ <version>${bouncycastle.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.github.seancfoley</groupId>
<artifactId>ipaddress</artifactId>
<version>${ipaddress.version}</version>