author     wangkuan <[email protected]>  2024-07-23 15:39:48 +0800
committer  wangkuan <[email protected]>  2024-07-23 15:39:48 +0800
commit     6e558c28ce9f07f58e9adcbbd802b586c6a179da (patch)
tree       cde93730383c2a7e1d944f50bd2683af7fc9fa8b
parent     76b5f89c44bded7da01e27ee97065f9bd4c77848 (diff)
[feature][bootstrap][core][common] Support custom aggregate functions (UDAFs), rename the UDF interface to ScalarFunction, and allow the timestamp conversion function to take an interval setting
-rw-r--r--  config/grootstream_job_example.yaml  12
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java  168
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java  69
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java  68
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java  68
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java  16
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Constants.java  8
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Event.java  4
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java  34
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java  49
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java  20
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java)  2
-rw-r--r--  groot-common/src/main/resources/udf.plugins  5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java  107
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java  50
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java  12
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java  73
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java  3
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java  129
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java  58
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java  35
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java  43
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java  50
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java  42
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java  5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java  6
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java  5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java  5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java  18
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java)  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleScalarFunction.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java)  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java  73
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java  60
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java  60
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java  42
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java  17
64 files changed, 941 insertions, 567 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 11ba617..62a320b 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -19,7 +19,17 @@ processing_pipelines:
functions:
- function: DROP
filter: event.server_ip == '4.4.4.4'
-
+ aggregate_processor:
+ type: aggregate
+ output_fields:
+ group_by_fields: [server_ip,server_port]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 60
+    window_slide: 10 # sliding window step size (seconds)
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sent_pkts ]
+ output_fields: [ sent_pkts_sum ]
sinks:
print_sink:
type: print
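For reference, the aggregate_processor block above is parsed into an AggregateConfig and wired into the Flink job roughly as sketched below. This is a minimal, illustrative sketch rather than the executor code itself: it assumes the incoming stream already carries Event records and that the ExecutionConfig's global job parameters include the registered UDF plugin list, which AggregateProcessorFunction reads when it instantiates the configured UDAFs.

import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessorFunction;
import com.geedgenetworks.core.processor.aggregate.KeySelector;
import com.geedgenetworks.core.processor.aggregate.ProcessWindowFunctionImpl;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

public class AggregateWiringSketch {

    // Rough equivalent of the YAML above: group by server_ip/server_port and
    // aggregate over a 60-second tumbling processing-time window.
    public static SingleOutputStreamOperator<Event> apply(
            SingleOutputStreamOperator<Event> events,
            AggregateConfig aggregateConfig,   // populated from the aggregate_processor YAML block
            ExecutionConfig executionConfig) { // must carry the UDF plugin list in its global job parameters
        return events
                .keyBy(new KeySelector(Arrays.asList("server_ip", "server_port")))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                .aggregate(new AggregateProcessorFunction(aggregateConfig, executionConfig),
                        new ProcessWindowFunctionImpl())
                .name("aggregate_processor");
    }
}

The emitted window events carry the group-by keys plus __window_start_timestamp and __window_end_timestamp alongside the UDAF result fields (see ProcessWindowFunctionImpl below).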
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
new file mode 100644
index 0000000..42a4828
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -0,0 +1,168 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.config.AggregateConfigOptions;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
+import com.geedgenetworks.core.pojo.ProjectionConfig;
+import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
+import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
+
+
+ protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) {
+ super(jarPaths, operatorConfig);
+ }
+
+ @Override
+ public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+
+ ProcessorConfig processorConfig = operatorMap.get(node.getName());
+ switch (processorConfig.getType()) {
+ case "aggregate":
+ dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig);
+ break;
+ case "projection":
+ dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
+ break;
+            default: // fall back to projection for backward compatibility with legacy configs
+ dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
+ }
+ return dataStream;
+ }
+
+ protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+
+ AggregateProcessor aggregateProcessor;
+ if (aggregateProcessorMap.containsKey(aggregateConfig.getType())) {
+ aggregateProcessor = aggregateProcessorMap.get(aggregateConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(aggregateConfig.getType());
+ aggregateProcessor = (AggregateProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ aggregateConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ aggregateProcessor.aggregateProcessorFunction(
+ dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+ } catch (Exception e) {
+ throw new JobExecuteException("Create aggregate pipeline instance failed!", e);
+ }
+ return dataStream;
+ }
+
+ protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(projectionConfig.getType());
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ projectionProcessor.projectionProcessorFunction(
+ dataStream, projectionConfig);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create processing pipeline instance failed!", e);
+ }
+ return dataStream;
+ }
+
+ protected ProcessorConfig checkProcessorConfig(String key, Map<String, Object> value, Config processorsConfig) {
+ ProcessorConfig projectionConfig;
+ CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key),
+ ProjectionConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                    "Processor: %s, Message: %s",
+ key, result.getMsg()));
+ }
+ switch ((String) value.getOrDefault("type", "")) {
+ case "projection":
+ projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
+ break;
+ case "aggregate":
+ projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig);
+ break;
+            default: // fall back to projection config for backward compatibility with legacy configs
+ projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
+ }
+ return projectionConfig;
+ }
+
+ protected ProcessorConfig checkProjectionProcessorConfig(String key, Map<String, Object> value, Config projectionProcessors) {
+
+ CheckResult result = CheckConfigUtil.checkAtLeastOneExists(projectionProcessors.getConfig(key),
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Processor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key())));
+ }
+
+ ProjectionConfig projectionConfig = new JSONObject(value).toJavaObject(ProjectionConfig.class);
+ projectionConfig.setName(key);
+
+ return projectionConfig;
+ }
+
+
+ protected AggregateConfig checkAggregateProcessorConfig(String key, Map<String, Object> value, Config aggregateProcessorsConfig) {
+
+
+ CheckResult result = CheckConfigUtil.checkAllExists(aggregateProcessorsConfig.getConfig(key),
+ AggregateConfigOptions.GROUP_BY_FIELDS.key(),
+ AggregateConfigOptions.WINDOW_TYPE.key(),
+ AggregateConfigOptions.FUNCTIONS.key(),
+ AggregateConfigOptions.WINDOW_SIZE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                    "Aggregate processor: %s, All of [%s] should be specified.",
+                    key, String.join(",",
+                            AggregateConfigOptions.GROUP_BY_FIELDS.key(),
+                            AggregateConfigOptions.WINDOW_TYPE.key(),
+                            AggregateConfigOptions.FUNCTIONS.key(), AggregateConfigOptions.WINDOW_SIZE.key())));
+ }
+
+ AggregateConfig aggregateConfig = new JSONObject(value).toJavaObject(AggregateConfig.class);
+ aggregateConfig.setName(key);
+ return aggregateConfig;
+ }
+
+
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index fa5f572..36fad61 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -1,16 +1,9 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -22,7 +15,7 @@ import java.util.Map;
/**
* Initialize config and execute postprocessor
*/
-public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
+public class PostprocessingExecutor extends AbstractProcessorExecutor {
private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType();
public PostprocessingExecutor(List<URL> jarPaths, Config config) {
@@ -30,68 +23,20 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC
}
@Override
- protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap();
+ protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
postprocessors.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key),
- ProjectionConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Postprocessor: %s, Message: %s",
- key, result.getMsg()));
- }
- result = CheckConfigUtil.checkAtLeastOneExists(postprocessors.getConfig(key),
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Postprocessor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key())));
- }
-
- ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- projectionConfig.setName(key);
- postprocessingConfigMap.put(key, projectionConfig);
+ postprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, postprocessors));
});
-
}
-
return postprocessingConfigMap;
}
+
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- String className = projectionConfig.getType();
- ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get postprocessing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create postprocessing pipeline instance failed!", e);
- }
- return dataStream;
+ return super.execute(dataStream, node);
}
-
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 31fcce8..b1e53e4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -1,16 +1,9 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -24,7 +17,7 @@ import java.util.Map;
* Initialize config and execute preprocessor
*/
@Slf4j
-public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
+public class PreprocessingExecutor extends AbstractProcessorExecutor {
private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
public PreprocessingExecutor(List<URL> jarPaths, Config config) {
@@ -32,67 +25,20 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo
}
@Override
- protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap();
+ protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
preprocessors.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key),
- ProjectionConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Preprocessor: %s, Message: %s",
- key, result.getMsg()));
- }
-
- result = CheckConfigUtil.checkAtLeastOneExists(preprocessors.getConfig(key),
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Preprocessor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key())));
- }
-
- ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- projectionConfig.setName(key);
- preprocessingConfigMap.put(key, projectionConfig);
+ preprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, preprocessors));
});
}
-
return preprocessingConfigMap;
}
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- String className = projectionConfig.getType();
- ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get preprocessing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create preprocessing pipeline instance failed!", e);
- }
- return dataStream;
+ return super.execute(dataStream, node);
+
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index 4a3b204..d69fe8c 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -1,16 +1,9 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -22,7 +15,7 @@ import java.util.Map;
/**
* Initialize config and execute processor
*/
-public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
+public class ProcessingExecutor extends AbstractProcessorExecutor {
private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType();
public ProcessingExecutor(List<URL> jarPaths, Config config) {
@@ -30,69 +23,20 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi
}
@Override
- protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap();
+ protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
processors.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key),
- ProjectionConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Processor: %s, Message: %s",
- key, result.getMsg()));
- }
- result = CheckConfigUtil.checkAtLeastOneExists(processors.getConfig(key),
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Processor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key())));
- }
-
- ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- projectionConfig.setName(key);
- processingConfigMap.put(key, projectionConfig);
+ processingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, processors));
});
-
}
-
return processingConfigMap;
}
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- String className = projectionConfig.getType();
- ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get processing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create processing pipeline instance failed!", e);
- }
- return dataStream;
+ return super.execute(dataStream, node);
}
-
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java
new file mode 100644
index 0000000..403cecc
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.common;
+
+import lombok.Data;
+import org.apache.flink.metrics.Counter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+public class Accumulator implements Serializable {
+ private Map<String, Object> metricsFields;
+ private long errorCount;
+ private long inEvents;
+ private long outEvents;
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 231fac5..d13fc4b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -43,10 +43,10 @@ public final class Constants {
public static final String HAZELCAST_UDF_PLUGIN_CONFIG_FILE_PREFIX = "udf";
public static final String HAZELCAST_UDF_PLUGIN_CONFIG_DEFAULT = "udf.plugins";
-
-
-
-
+ public static final String TUMBLING_PROCESSING_TIME = "tumbling_processing_time";
+ public static final String TUMBLING_EVENT_TIME = "tumbling_event_time";
+ public static final String SLIDING_PROCESSING_TIME = "sliding_processing_time";
+ public static final String SLIDING_EVENT_TIME = "sliding_event_time";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
index b040b0d..4ab4aef 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
@@ -8,6 +8,10 @@ import java.util.Map;
@Data
public class Event implements Serializable {
public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
+ public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
+ public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";
+
+
private Map<String, Object> extractedFields;
//Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing.
private boolean isDropped = false;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java
new file mode 100644
index 0000000..f1dc38f
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.common;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+@Data
+public class KeybyEntity implements Serializable {
+ private Map<String,Object> keys;
+ private String keysToString;
+
+ public KeybyEntity(Map<String, Object> keys) {
+ this.keys = keys;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keysToString);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KeybyEntity other = (KeybyEntity) o;
+ return Objects.equals(keysToString, other.keysToString);
+ }
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
new file mode 100644
index 0000000..3998a3b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -0,0 +1,49 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.udf.UDFContext;
+
+import java.util.List;
+
+public interface AggregateConfigOptions {
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of processor.");
+
+ Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be outputted.");
+
+ Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be removed.");
+
+ Option<List<UDFContext>> FUNCTIONS = Options.key("functions")
+ .type(new TypeReference<List<UDFContext>>() {})
+ .noDefaultValue()
+ .withDescription("The functions to be executed.");
+
+ Option<List<String>> GROUP_BY_FIELDS = Options.key("group_by_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be key by.");
+
+ Option<String> WINDOW_TYPE = Options.key("window_type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of window.");
+
+ Option<Integer> WINDOW_SIZE = Options.key("window_size")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The size of window.");
+
+ Option<Integer> WINDOW_SLIDE = Options.key("window_slide")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The size of sliding window.");
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
new file mode 100644
index 0000000..98450fd
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -0,0 +1,20 @@
+package com.geedgenetworks.common.udf;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+
+import java.io.Serializable;
+
+public interface AggregateFunction extends Serializable {
+
+ Accumulator open(UDFContext udfContext,Accumulator acc);
+
+ Accumulator add(Event val, Accumulator acc);
+
+ String functionName();
+
+ Accumulator getResult(Accumulator acc);
+
+ void close();
+
+}
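As a usage illustration of the interface above, the sketch below implements a hypothetical event-counting UDAF. It is a minimal sketch, not one of the shipped functions: the output field name is hardcoded for simplicity, whereas the bundled UDAFs (e.g. NUMBER_SUM) presumably derive their input/output fields from the UDFContext configured in the job YAML.

package com.geedgenetworks.core.udf.udaf;

import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;

public class EventCount implements AggregateFunction {

    // Hypothetical output field; a real implementation would read it from the UDFContext.
    private static final String OUTPUT_FIELD = "event_count";

    @Override
    public Accumulator open(UDFContext udfContext, Accumulator acc) {
        // Reserve this function's slot in the shared per-window accumulator.
        acc.getMetricsFields().putIfAbsent(OUTPUT_FIELD, 0L);
        return acc;
    }

    @Override
    public Accumulator add(Event val, Accumulator acc) {
        // Increment the running count for every event that reaches this function.
        long current = ((Number) acc.getMetricsFields().getOrDefault(OUTPUT_FIELD, 0L)).longValue();
        acc.getMetricsFields().put(OUTPUT_FIELD, current + 1);
        return acc;
    }

    @Override
    public String functionName() {
        return "EVENT_COUNT";
    }

    @Override
    public Accumulator getResult(Accumulator acc) {
        // Nothing to finalize for a plain counter; the running value is already the result.
        return acc;
    }

    @Override
    public void close() {
        // No resources to release.
    }
}

Like the built-in UDAFs added in this commit, such a class would also need to be listed in udf.plugins so the aggregate processor can resolve it by function name.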
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
index eb76263..2aab34b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
@@ -3,7 +3,7 @@ import com.geedgenetworks.common.Event;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;
-public interface UDF extends Serializable {
+public interface ScalarFunction extends Serializable {
void open(RuntimeContext runtimeContext, UDFContext udfContext);
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 89f8b11..73692a2 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -13,4 +13,7 @@ com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
com.geedgenetworks.core.udf.UnixTimestampConverter
-com.geedgenetworks.core.udf.Flatten
\ No newline at end of file
+com.geedgenetworks.core.udf.Flatten
+com.geedgenetworks.core.udf.udaf.NumberSum
+com.geedgenetworks.core.udf.udaf.CollectList
+com.geedgenetworks.core.udf.udaf.CollectSet
\ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index 979fc8e..8111f09 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -1,108 +1,23 @@
package com.geedgenetworks.core.pojo;
+import com.geedgenetworks.common.udf.UDFContext;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
import java.util.List;
import java.util.Map;
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class AggregateConfig extends ProcessorConfig{
-public class AggregateConfig {
- private String type;
- private int parallelism;
private List<String> group_by_fields;
private String timestamp_field;
private String window_type;
- private Integer window_interval_seconds;
- private Integer sliding_interval;
- private List<AggregateFunction> functions;
+ private Integer window_size;
+ private Integer max_out_of_orderness;
+ private Integer window_slide;
+ private List<UDFContext> functions;
private String[] output_fields;
- private Map<String, Object> properties;
-
- private String name;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
- public void setParallelism(int parallelism) {
- this.parallelism = parallelism;
- }
-
- public List<String> getGroup_by_fields() {
- return group_by_fields;
- }
-
- public void setGroup_by_fields(List<String> group_by_fields) {
- this.group_by_fields = group_by_fields;
- }
-
- public String getTimestamp_field() {
- return timestamp_field;
- }
-
- public void setTimestamp_field(String timestamp_field) {
- this.timestamp_field = timestamp_field;
- }
-
- public String getWindow_type() {
- return window_type;
- }
-
- public void setWindow_type(String window_type) {
- this.window_type = window_type;
- }
-
- public Integer getWindow_interval_seconds() {
- return window_interval_seconds;
- }
-
- public void setWindow_interval_seconds(Integer window_interval_seconds) {
- this.window_interval_seconds = window_interval_seconds;
- }
-
- public Integer getSliding_interval() {
- return sliding_interval;
- }
-
- public void setSliding_interval(Integer sliding_interval) {
- this.sliding_interval = sliding_interval;
- }
-
- public List<AggregateFunction> getFunctions() {
- return functions;
- }
-
- public void setFunctions(List<AggregateFunction> functions) {
- this.functions = functions;
- }
-
- public String[] getOutput_fields() {
- return output_fields;
- }
-
- public void setOutput_fields(String[] output_fields) {
- this.output_fields = output_fields;
- }
-
- public Map<String, Object> getProperties() {
- return properties;
- }
- public void setProperties(Map<String, Object> properties) {
- this.properties = properties;
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java
index f937418..7e0110e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java
@@ -1,9 +1,11 @@
package com.geedgenetworks.core.pojo;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.List;
import java.util.Map;
-
+@Data
public class AggregateFunction implements Serializable {
private String name;
@@ -13,51 +15,5 @@ public class AggregateFunction implements Serializable {
private Map<String, Object> parameters;
private String class_name;
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public List<String> getInput_fields() {
- return input_fields;
- }
-
- public void setInput_fields(List<String> input_fields) {
- this.input_fields = input_fields;
- }
-
- public List<String> getOutput_fields() {
- return output_fields;
- }
-
- public void setOutput_fields(List<String> output_fields) {
- this.output_fields = output_fields;
- }
-
- public String getFilter() {
- return filter;
- }
-
- public void setFilter(String filter) {
- this.filter = filter;
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
- }
-
- public String getClass_name() {
- return class_name;
- }
- public void setClass_name(String class_name) {
- this.class_name = class_name;
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java
new file mode 100644
index 0000000..dacbd78
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java
@@ -0,0 +1,12 @@
+package com.geedgenetworks.core.pojo;
+
+import lombok.Data;
+
+import java.util.Map;
+@Data
+public class ProcessorConfig {
+ private String type;
+ private int parallelism;
+ private Map<String, Object> properties;
+ private String name;
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
index 4adc9ef..4670fd8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
@@ -1,83 +1,20 @@
package com.geedgenetworks.core.pojo;
import com.geedgenetworks.common.udf.UDFContext;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class ProjectionConfig extends ProcessorConfig implements Serializable {
-public class ProjectionConfig implements Serializable {
-
- private int parallelism;
private List<UDFContext> functions;
private String format;
- private String type;
private List<String> output_fields;
private List<String> remove_fields;
- private Map<String, Object> properties;
- private String name;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
- public void setParallelism(int parallelism) {
- this.parallelism = parallelism;
- }
-
- public List<UDFContext> getFunctions() {
- return functions;
- }
-
- public void setFunctions(List<UDFContext> functions) {
- this.functions = functions;
- }
-
- public String getFormat() {
- return format;
- }
-
- public void setFormat(String format) {
- this.format = format;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public List<String> getOutput_fields() {
- return output_fields;
- }
-
- public void setOutput_fields(List<String> output_fields) {
- this.output_fields = output_fields;
- }
-
- public Map<String, Object> getProperties() {
- return properties;
- }
-
- public void setProperties(Map<String, Object> properties) {
- this.properties = properties;
- }
- public List<String> getRemove_fields() {
- return remove_fields;
- }
- public void setRemove_fields(List<String> remove_fields) {
- this.remove_fields = remove_fields;
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
index bbf7d8d..9acf8fc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
@@ -3,13 +3,14 @@ package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.common.Event;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public interface AggregateProcessor {
SingleOutputStreamOperator<Event> aggregateProcessorFunction(
SingleOutputStreamOperator<Event> singleOutputStreamOperator,
- AggregateConfig aggregateConfig)
+ AggregateConfig aggregateConfig, ExecutionConfig config)
throws Exception;
String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
new file mode 100644
index 0000000..b535faf
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -0,0 +1,129 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.processor.projection.UdfEntity;
+import com.google.common.collect.Lists;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.ExecutionConfig;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
+import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+
+@Slf4j
+public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
+ private final List<UDFContext> udfContexts;
+ private final List<String> udfClassNameLists;
+ private final List<String> groupByFields;
+ private LinkedList<UdfEntity> functions;
+
+ public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
+ udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+ udfContexts = aggregateConfig.getFunctions();
+ groupByFields = aggregateConfig.getGroup_by_fields();
+ }
+
+ @Override
+ public Accumulator createAccumulator() {
+
+ functions = Lists.newLinkedList();
+ if (udfContexts == null || udfContexts.isEmpty()) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Aggregate processor requires at least one UDAF in 'functions'.");
+ }
+ Map<String, Object> map = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(map);
+ Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
+ try {
+ for (UDFContext udfContext : udfContexts) {
+ Expression filterExpression = null;
+ UdfEntity udfEntity = new UdfEntity();
+                // Instantiate the function only if it is among the functions registered on the platform
+ if (udfClassReflect.containsKey(udfContext.getFunction())) {
+ Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
+ AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
+ aggregateFunction.open(udfContext, accumulator);
+                    // If the function declares a filter, compile the filter expression
+ if (udfContext.getFilter() != null) {
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ filterExpression = instance.compile(udfContext.getFilter(), true);
+ }
+ udfEntity.setAggregateFunction(aggregateFunction);
+ udfEntity.setFilterExpression(filterExpression);
+ udfEntity.setName(udfContext.getFunction());
+ udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+ functions.add(udfEntity);
+ } else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Unsupported UDAF: " + udfContext.getFunction());
+ }
+
+ }
+ } catch (Exception e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
+
+ }
+
+ return accumulator;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator accumulator) {
+
+ accumulator.setInEvents(accumulator.getInEvents()+1);
+ for (UdfEntity udafEntity : functions) {
+ try {
+ boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true;
+ if (result) {
+ udafEntity.getAggregateFunction().add(event, accumulator);
+ }
+ } catch (ExpressionRuntimeException ignore) {
+ log.error("Function " + udafEntity.getName() + " Invalid filter ! ");
+ accumulator.setErrorCount(accumulator.getErrorCount() + 1);
+ } catch (Exception e) {
+ log.error("Function " + udafEntity.getName() + " execute exception !", e);
+ accumulator.setErrorCount(accumulator.getErrorCount() + 1);
+ }
+ }
+ return accumulator;
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator accumulator) {
+ for (UdfEntity udafEntity : functions) {
+ try {
+ udafEntity.getAggregateFunction().getResult(accumulator);
+ } catch (Exception e) {
+ log.error("Function " + udafEntity.getName() + " getResult exception !", e);
+ }
+ }
+ return accumulator;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index f6fa9d5..43fe20d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -1,32 +1,54 @@
package com.geedgenetworks.core.processor.aggregate;
-import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.common.Event;
-
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import static com.geedgenetworks.common.Constants.*;
public class AggregateProcessorImpl implements AggregateProcessor {
@Override
public SingleOutputStreamOperator<Event> aggregateProcessorFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator,
- AggregateConfig aggregateConfig)
+ SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator,
+ AggregateConfig aggregateConfig, ExecutionConfig config)
throws Exception {
- /* List<String> keys = aggregateConfig.getGroup_by_fields();
-
- return singleOutputStreamOperator.keyBy(new OneKeySelector(keys)).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_interval_seconds()))).reduce(new ReduceFunction<Event>() {
- public Event reduce(Event v1, Event v2) {
-
-
- Integer aa = (Integer) v1.getExtractedFields().get("common_sessions");
- Integer bb = (Integer) v2.getExtractedFields().get("common_sessions");
- v1.getExtractedFields().put("common_sessions",aa+bb);
- return v1;
- }
- }).setParallelism(aggregateConfig.getParallelism());
- }*/
- return singleOutputStreamOperator;
+ if (aggregateConfig.getParallelism() != 0) {
+ switch (aggregateConfig.getWindow_type()) {
+ case TUMBLING_PROCESSING_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ case TUMBLING_EVENT_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ case SLIDING_PROCESSING_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ case SLIDING_EVENT_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ default:
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
+ }
+ }else {
+ switch (aggregateConfig.getWindow_type()) {
+ case TUMBLING_PROCESSING_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ case TUMBLING_EVENT_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ case SLIDING_PROCESSING_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ case SLIDING_EVENT_TIME:
+ return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName());
+ default:
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
+ }
+ }
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
new file mode 100644
index 0000000..4b02166
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -0,0 +1,35 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class KeySelector implements org.apache.flink.api.java.functions.KeySelector<Event, KeybyEntity> {
+
+
+ private final List<String> keys;
+
+ public KeySelector(List<String> keys) {
+ this.keys = keys;
+ }
+
+ @Override
+ public KeybyEntity getKey(Event value) throws Exception {
+
+ KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
+ StringBuilder stringBuilder = new StringBuilder();
+        for (String key : keys) {
+            if (value.getExtractedFields().containsKey(key)) {
+                keybyEntity.getKeys().put(key, value.getExtractedFields().get(key));
+                stringBuilder.append(value.getExtractedFields().get(key));
+            }
+            // Always append a separator so adjacent values cannot run together and
+            // missing keys still occupy a stable position in the string key.
+            stringBuilder.append(",");
+        }
+        keybyEntity.setKeysToString(stringBuilder.toString());
+ return keybyEntity;
+ }
+}
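
A small usage sketch for the selector, with illustrative field names; the only assumption beyond this diff is that Event and KeybyEntity behave as the getters/setters used above suggest:

    // Illustrative only: derive the grouping key for one event.
    Map<String, Object> fields = new HashMap<>();
    fields.put("client_ip", "10.0.0.1");          // hypothetical group-by fields
    fields.put("server_ip", "10.0.0.2");
    Event event = new Event();
    event.setExtractedFields(fields);

    KeySelector selector = new KeySelector(Arrays.asList("client_ip", "server_ip"));
    KeybyEntity key = selector.getKey(event);      // declared to throw Exception per the Flink interface
    // key.getKeys() now holds both values; keysToString is the concatenated form kept alongside them.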
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
new file mode 100644
index 0000000..46c9607
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import static com.geedgenetworks.common.Event.WINDOW_END_TIMESTAMP;
+import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP;
+
+public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<
+        Accumulator, // input type
+        Event, // output type
+        KeybyEntity, // key type
+ TimeWindow> {
+
+ private transient InternalMetrics internalMetrics;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
+ }
+
+ @Override
+ public void process(KeybyEntity keybyEntity, ProcessWindowFunction<Accumulator, Event, KeybyEntity, TimeWindow>.Context context, Iterable<Accumulator> elements, Collector<Event> out) throws Exception {
+ Accumulator accumulator = elements.iterator().next();
+ Event event = new Event();
+ event.setExtractedFields(accumulator.getMetricsFields());
+ event.getExtractedFields().put(WINDOW_START_TIMESTAMP, context.window().getStart());
+ event.getExtractedFields().put(WINDOW_END_TIMESTAMP, context.window().getEnd());
+ event.getExtractedFields().putAll(keybyEntity.getKeys());
+ internalMetrics.incrementOutEvents();
+ internalMetrics.incrementErrorEvents(accumulator.getErrorCount());
+ internalMetrics.incrementInEvents(accumulator.getInEvents());
+ out.collect(event);
+ }
+}
+
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index f492767..55258b3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -8,10 +8,9 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.utils.HdfsUtils;
import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler;
import com.google.common.collect.Lists;
import com.googlecode.aviator.AviatorEvaluator;
@@ -24,12 +23,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
-import java.lang.reflect.Method;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
+import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+
@Slf4j
public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
private final ProjectionConfig projectionConfig;
@@ -61,8 +61,8 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
                // if a function configured in the job is among the functions registered on the platform, instantiate it
if (udfClassReflect.containsKey(udfContext.getFunction())) {
Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
- UDF udf = (UDF) cls.getConstructor().newInstance();
- udf.open(getRuntimeContext(), udfContext);
+ ScalarFunction scalarFunction = (ScalarFunction) cls.getConstructor().newInstance();
+ scalarFunction.open(getRuntimeContext(), udfContext);
                // if the function has a filter, compile the filter expression
if (udfContext.getFilter() != null) {
AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
@@ -71,7 +71,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
instance.setFunctionMissing(null);
filterExpression = instance.compile(udfContext.getFilter(), true);
}
- udfEntity.setUdfFunction(udf);
+ udfEntity.setScalarFunction(scalarFunction);
udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
@@ -101,7 +101,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
} else {
boolean result = udfEntity.getFilterExpression() != null ? filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true;
if (result) {
- udfEntity.getUdfFunction().evaluate(event);
+ udfEntity.getScalarFunction().evaluate(event);
}
}
} catch (ExpressionRuntimeException ignore) {
@@ -137,43 +137,11 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
}
}
- public static Map<String, String> getClassReflect(List<String> plugins) {
-
- Map<String, String> classReflect = new HashMap<>();
-
- for (String classPath : plugins) {
-
- Class cls = null;
- try {
- cls = Class.forName(classPath);
- Method method = cls.getMethod("functionName");
- Object object = cls.newInstance();
- String result = (String) method.invoke(object);
- classReflect.put(result, classPath);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return classReflect;
- }
-
- public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
-
- boolean result;
- Object object = expression.execute(map);
- if (object != null) {
- result = (Boolean) object;
- } else {
- throw new ExpressionRuntimeException();
- }
- return result;
- }
-
@Override
public void close() throws Exception {
for (UdfEntity udfEntity : functions) {
- udfEntity.getUdfFunction().close();
+ udfEntity.getScalarFunction().close();
}
KnowledgeBaseScheduler.decrement();
if(KnowledgeBaseScheduler.getCount()<=0) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
index abd4d97..c36a785 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
@@ -1,44 +1,18 @@
package com.geedgenetworks.core.processor.projection;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.googlecode.aviator.Expression;
+import lombok.Data;
+
import java.io.Serializable;
+
+@Data
public class UdfEntity implements Serializable {
- private UDF udf;
+ private ScalarFunction scalarFunction;
+ private AggregateFunction aggregateFunction;
private Expression filterExpression;
private String name;
private String className;
-
- public UDF getUdfFunction() {
- return udf;
- }
-
- public void setUdfFunction(UDF udf) {
- this.udf = udf;
- }
-
- public Expression getFilterExpression() {
- return filterExpression;
- }
-
- public void setFilterExpression(Expression filterExpression) {
- this.filterExpression = filterExpression;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getClassName() {
- return className;
- }
-
- public void setClassName(String className) {
- this.className = className;
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 3a0ba9e..bdb3698 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -6,7 +6,7 @@ import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.core.udf.knowlegdebase.handler.AsnKnowledgeBaseHandler;
@@ -17,7 +17,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@Slf4j
-public class AsnLookup implements UDF {
+public class AsnLookup implements ScalarFunction {
private String kbName;
private String option;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
index 1cdb3d7..5770201 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
@@ -2,14 +2,14 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class CurrentUnixTimestamp implements UDF {
+public class CurrentUnixTimestamp implements ScalarFunction {
private String precision;
private String outputFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
index 551d515..3581d5c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.utils.StringUtil;
@@ -14,7 +14,7 @@ import java.io.UnsupportedEncodingException;
import java.util.Base64;
@Slf4j
-public class DecodeBase64 implements UDF {
+public class DecodeBase64 implements ScalarFunction {
private String valueField;
private String charsetField;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index fd5299a..77b3246 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -2,10 +2,9 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.utils.FormatUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -14,7 +13,7 @@ import java.util.List;
import static com.geedgenetworks.utils.FormatUtils.getTopPrivateDomain;
@Slf4j
-public class Domain implements UDF {
+public class Domain implements ScalarFunction {
private String option;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
index 1205cce..93cd0db 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.core.udf;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
-public class Drop implements UDF {
+public class Drop implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
index b8ebdbf..c22ff54 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
@@ -3,17 +3,15 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
-import java.io.UnsupportedEncodingException;
import java.util.Base64;
@Slf4j
-public class EncodeBase64 implements UDF {
+public class EncodeBase64 implements ScalarFunction {
private String valueField;
private String outputFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
index 7f62858..51f0687 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.core.expressions.Calc;
import com.geedgenetworks.core.expressions.codegen.CalcCodeConvertor;
@@ -12,7 +12,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
import java.util.Map;
-public class Eval implements UDF {
+public class Eval implements ScalarFunction {
private String output;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
index 1062587..3153ef7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
@@ -5,9 +5,8 @@ import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.googlecode.aviator.Expression;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -15,7 +14,7 @@ import java.util.*;
@Slf4j
-public class Flatten implements UDF {
+public class Flatten implements ScalarFunction {
private String prefix;
private String delimiter;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
index 30006fd..e1ba384 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import lombok.extern.slf4j.Slf4j;
@@ -11,7 +11,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
@Slf4j
-public class FromUnixTimestamp implements UDF {
+public class FromUnixTimestamp implements ScalarFunction {
private String precision;
private String outputFieldName;
private String lookupFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java
index fdb262c..ce4fc48 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java
@@ -3,14 +3,14 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.*;
import java.util.stream.Collectors;
-public class GenerateStringArray implements UDF {
+public class GenerateStringArray implements ScalarFunction {
private List<String> lookupFieldNames;
private String outputFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index ce85b24..afe1bfb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -5,7 +5,7 @@ import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.core.udf.knowlegdebase.handler.GeoIpKnowledgeBaseHandler;
@@ -20,7 +20,7 @@ import org.apache.flink.configuration.Configuration;
import java.util.Map;
@Slf4j
-public class GeoIpLookup implements UDF {
+public class GeoIpLookup implements ScalarFunction {
private String kbName;
private String option;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
index ccf76ca..f78b952 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
@@ -2,13 +2,13 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.common.utils.JsonPathUtil;
import org.apache.flink.api.common.functions.RuntimeContext;
-public class JsonExtract implements UDF {
+public class JsonExtract implements ScalarFunction {
private String lookupFieldName;
private String outputFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 7b142ca..e48a503 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -6,10 +6,9 @@ import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.common.utils.CustomException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -17,7 +16,7 @@ import org.apache.flink.configuration.Configuration;
import java.util.*;
@Slf4j
-public class PathCombine implements UDF {
+public class PathCombine implements ScalarFunction {
private final Map<String, String> pathParameters = new LinkedHashMap<>();
private String outputFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index 7e888fa..ba9b4d2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -3,7 +3,7 @@ package com.geedgenetworks.core.udf;
import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.googlecode.aviator.AviatorEvaluator;
@@ -15,7 +15,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.*;
@Slf4j
-public class Rename implements UDF {
+public class Rename implements ScalarFunction {
private static Expression compiledExp;
private Set<String> parentFields;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
index 67fc395..42a19e6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.udf;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
@@ -10,7 +10,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;
-public class SnowflakeId implements Serializable, UDF {
+public class SnowflakeId implements Serializable, ScalarFunction {
private String outputFieldName;
private SnowflakeIdUtils snowflakeIdUtils;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java
index 4dedffb..df5c5b4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java
@@ -3,13 +3,13 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
-public class StringJoiner implements UDF {
+public class StringJoiner implements ScalarFunction {
private List<String> lookupFieldNames;
private String outputFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
index 17562eb..a8171b3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import lombok.extern.slf4j.Slf4j;
@@ -11,12 +11,14 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.time.Instant;
@Slf4j
-public class UnixTimestampConverter implements UDF {
+public class UnixTimestampConverter implements ScalarFunction {
private UDFContext udfContext;
private String precision;
private String lookupFieldName;
private String outputFieldName;
+
+ private long interval;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
@@ -46,6 +48,12 @@ public class UnixTimestampConverter implements UDF {
this.precision =udfContext.getParameters().get("precision").toString();
}
}
+ if(!udfContext.getParameters().containsKey("interval")){
+ this.interval = 1L;
+ }
+ else{
+ this.interval = Long.parseLong(udfContext.getParameters().get("interval").toString());
+ }
this.lookupFieldName = udfContext.getLookup_fields().get(0);
this.outputFieldName = udfContext.getOutput_fields().get(0);
@@ -69,13 +77,13 @@ public class UnixTimestampConverter implements UDF {
}
switch (precision) {
case "seconds":
- timestamp = instant.getEpochSecond();
+ timestamp = instant.getEpochSecond()/interval*interval;
break;
case "milliseconds":
- timestamp = instant.toEpochMilli();
+ timestamp = instant.toEpochMilli()/interval*interval;
break;
case "minutes":
- timestamp = instant.getEpochSecond()/60*60;
+ timestamp = instant.getEpochSecond()/(interval*60)*(interval*60);
break;
}
event.getExtractedFields().put(outputFieldName, timestamp);
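
With the new interval parameter the converter floors timestamps to the start of an interval-sized bucket via integer division; for the minutes precision the interval is interpreted in minutes (hence the * 60). A quick worked example matching the updated tests further down:

    // Floor-to-interval, as in "epochSecond / interval * interval" above.
    long epochSecond = 1577808090L;                       // 90 seconds into a 10-minute bucket
    long interval = 600L;                                 // seconds
    long bucketed = epochSecond / interval * interval;    // 1577808000L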
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java
index 309ac81..7c4aca2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java
@@ -3,7 +3,7 @@ package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -18,7 +18,7 @@ import java.util.stream.Collectors;
* @version 1.0
* @date 2024/1/19 9:33
*/
-public abstract class AbstractKnowledgeUDF implements UDF {
+public abstract class AbstractKnowledgeScalarFunction implements ScalarFunction {
protected String lookupFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleScalarFunction.java
index f72f8e1..3112ec7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleScalarFunction.java
@@ -18,7 +18,7 @@ import java.util.stream.Collectors;
* @version 1.0
* @date 2024/1/22 18:15
*/
-public abstract class AbstractKnowledgeWithRuleUDF extends AbstractKnowledgeUDF {
+public abstract class AbstractKnowledgeWithRuleScalarFunction extends AbstractKnowledgeScalarFunction {
protected List<KnowledgeBaseConfig> ruleConfigs;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java
index 12817af..6be1c90 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java
@@ -16,7 +16,7 @@ import java.util.ArrayList;
* @version 1.0
* @date 2024/1/22 14:25
*/
-public class AnonymityLookup extends AbstractKnowledgeWithRuleUDF {
+public class AnonymityLookup extends AbstractKnowledgeWithRuleScalarFunction {
private static final Logger logger = LoggerFactory.getLogger(AnonymityLookup.class);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java
index 5b3371a..0052d82 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java
@@ -15,7 +15,7 @@ import java.util.Map;
* @version 1.0
* @date 2024/1/22 14:14
*/
-public class AppCategoryLookup extends AbstractKnowledgeUDF {
+public class AppCategoryLookup extends AbstractKnowledgeScalarFunction {
private static final Logger logger = LoggerFactory.getLogger(AppCategoryLookup.class);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java
index 27bd228..a591884 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -14,7 +14,7 @@ import java.util.stream.Collectors;
* @version 1.0
* @date 2024/1/23 11:14
*/
-public class ArrayElementsPrepend implements UDF {
+public class ArrayElementsPrepend implements ScalarFunction {
private String prefix;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java
index efe13b3..7136b71 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java
@@ -11,7 +11,7 @@ import java.util.List;
* @version 1.0
* @date 2024/1/18 16:21
*/
-public class DnsServerInfoLookup extends AbstractKnowledgeUDF {
+public class DnsServerInfoLookup extends AbstractKnowledgeScalarFunction {
private DnsServerInfoKnowledgeBaseHandler knowledgeBaseHandler;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java
index 0871909..6cd0c6b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -15,7 +15,7 @@ import java.util.stream.Collectors;
* @date 2024/1/23 11:20
*/
@Deprecated
-public class FieldsMerge implements UDF {
+public class FieldsMerge implements ScalarFunction {
private List<String> lookupFieldNames;
private String outputFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java
index d499d00..0e52b8c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java
@@ -15,7 +15,7 @@ import java.util.Map;
* @version 1.0
* @date 2024/1/19 10:22
*/
-public class FqdnCategoryLookup extends AbstractKnowledgeUDF {
+public class FqdnCategoryLookup extends AbstractKnowledgeScalarFunction {
private static final Logger logger = LoggerFactory.getLogger(FqdnCategoryLookup.class);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java
index e228e6a..0cc18a0 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnWhoisKnowledgeBaseH
* @version 1.0
* @date 2024/1/22 13:59
*/
-public class FqdnWhoisLookup extends AbstractKnowledgeUDF {
+public class FqdnWhoisLookup extends AbstractKnowledgeScalarFunction {
private FqdnWhoisKnowledgeBaseHandler knowledgeBaseHandler;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java
index 0bd4045..5c1fc97 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnIcpKnowledgeBaseHan
* @version 1.0
* @date 2024/1/22 10:27
*/
-public class IcpLookup extends AbstractKnowledgeUDF {
+public class IcpLookup extends AbstractKnowledgeScalarFunction {
private FqdnIcpKnowledgeBaseHandler knowledgeBaseHandler;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java
index 7f9be21..f7c5398 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.IdcRenterKnowledgeBaseH
* @version 1.0
* @date 2024/1/18 11:45
*/
-public class IdcRenterLookup extends AbstractKnowledgeUDF {
+public class IdcRenterLookup extends AbstractKnowledgeScalarFunction {
private IdcRenterKnowledgeBaseHandler knowledgeBaseHandler;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java
index 545fbaa..9f14bd2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java
@@ -15,7 +15,7 @@ import java.util.List;
* @version 1.0
* @date 2024/4/7 14:15
*/
-public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF {
+public class IntelligenceIndicatorLookup extends AbstractKnowledgeScalarFunction {
private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorLookup.class);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java
index 30383af..4003297 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java
@@ -16,7 +16,7 @@ import java.util.ArrayList;
* @version 1.0
* @date 2024/1/22 14:45
*/
-public class IocLookup extends AbstractKnowledgeWithRuleUDF {
+public class IocLookup extends AbstractKnowledgeWithRuleScalarFunction {
private static final Logger logger = LoggerFactory.getLogger(IocLookup.class);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java
index 9a7df14..b9bd139 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java
@@ -12,7 +12,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.InternalIpKnowledgeBase
 * If the knowledge base configuration is empty, only the configured internal network segments are used to decide whether an IP is internal
 * This function should run after the geolocation lookup and the flag-field check
*/
-public class IpZoneLookup extends AbstractKnowledgeUDF {
+public class IpZoneLookup extends AbstractKnowledgeScalarFunction {
private InternalIpKnowledgeBaseHandler knowledgeBaseHandler;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java
index 897e285..7983015 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.slf4j.Logger;
@@ -17,7 +17,7 @@ import java.util.stream.Collectors;
* @version 1.0
* @date 2024/1/23 10:59
*/
-public class L7ProtocolAndAppExtract implements UDF {
+public class L7ProtocolAndAppExtract implements ScalarFunction {
private static final Logger logger = LoggerFactory.getLogger(L7ProtocolAndAppExtract.class);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java
index 71964f0..e653820 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.LinkDirectionKnowledgeB
* @version 1.0
* @date 2024/1/17 14:56
*/
-public class LinkDirectionLookup extends AbstractKnowledgeUDF {
+public class LinkDirectionLookup extends AbstractKnowledgeScalarFunction {
private LinkDirectionKnowledgeBaseHandler knowledgeBaseHandler;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java
index 0eaf2ad..e5a3f7f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java
@@ -15,7 +15,7 @@ import java.util.List;
* @version 1.0
* @date 2024/1/22 15:02
*/
-public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF {
+public class UserDefineTagLookup extends AbstractKnowledgeWithRuleScalarFunction {
private String option;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java
index 2c78ab9..4cdb399 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java
@@ -14,7 +14,7 @@ import org.slf4j.LoggerFactory;
* @version 1.0
* @date 2024/1/19 10:40
*/
-public class VpnLookup extends AbstractKnowledgeUDF {
+public class VpnLookup extends AbstractKnowledgeScalarFunction {
private static final Logger logger = LoggerFactory.getLogger(VpnLookup.class);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
new file mode 100644
index 0000000..1a82208
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import java.util.*;
+
+/**
+ * Collects elements within a group and returns the list of aggregated objects
+ */
+public class CollectList implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+
+ @Override
+ public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if(!udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputField = lookupField;
+ }
+ acc.getMetricsFields().put(outputField, new ArrayList<>());
+ return acc;
+ }
+
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ if(event.getExtractedFields().containsKey(lookupField)){
+ Object object = event.getExtractedFields().get(lookupField);
+ List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField);
+ aggregate.add(object);
+ acc.getMetricsFields().put(outputField, aggregate);
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "COLLECT_LIST";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
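
A minimal lifecycle sketch for the new AggregateFunction contract (open initialises the accumulator slot, add folds events in, getResult returns the accumulator). Field names are illustrative, and it assumes a no-arg Accumulator whose metricsFields map is initialised, plus UDFContext setters matching the getters used above:

    UDFContext ctx = new UDFContext();
    ctx.setLookup_fields(Collections.singletonList("client_ip"));        // hypothetical field
    ctx.setOutput_fields(Collections.singletonList("client_ip_list"));

    CollectList collectList = new CollectList();
    Accumulator acc = collectList.open(ctx, new Accumulator());

    Map<String, Object> fields = new HashMap<>();
    fields.put("client_ip", "10.0.0.1");
    Event event = new Event();
    event.setExtractedFields(fields);
    acc = collectList.add(event, acc);

    // collectList.getResult(acc).getMetricsFields().get("client_ip_list") -> ["10.0.0.1"]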
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
new file mode 100644
index 0000000..42bd7e8
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -0,0 +1,60 @@
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Collects elements within a group and returns the set of aggregated objects
+ */
+public class CollectSet implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+
+ @Override
+ public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if(!udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputField = lookupField;
+ }
+ acc.getMetricsFields().put(outputField, new HashSet<>());
+ return acc;
+ }
+
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ if(event.getExtractedFields().containsKey(lookupField)){
+ Object object = event.getExtractedFields().get(lookupField);
+ Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField);
+ aggregate.add(object);
+ acc.getMetricsFields().put(outputField, aggregate);
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "COLLECT_SET";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
new file mode 100644
index 0000000..a491844
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -0,0 +1,60 @@
+package com.geedgenetworks.core.udf.udaf;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+
+public class NumberSum implements AggregateFunction {
+ private String lookupField;
+ private String outputField;
+
+
+ @Override
+ public Accumulator open(UDFContext udfContext,Accumulator acc){
+ lookupField = udfContext.getLookup_fields().get(0);
+ if(!udfContext.getOutput_fields().isEmpty()) {
+ outputField = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputField = lookupField;
+ }
+ return acc;
+ }
+
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+
+ if(event.getExtractedFields().containsKey(lookupField)){
+ Number val = (Number) event.getExtractedFields().get(lookupField);
+ Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L);
+ if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) {
+ aggregate = aggregate.longValue() + val.longValue();
+ } else if (aggregate instanceof Float || val instanceof Float) {
+ aggregate = aggregate.floatValue() + val.floatValue();
+ } else if (aggregate instanceof Double || val instanceof Double) {
+ aggregate = aggregate.doubleValue() + val.doubleValue();
+ }
+ acc.getMetricsFields().put(outputField, aggregate);
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "NUMBER_SUM";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
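
NumberSum keeps the running total as a long until a floating-point value shows up, then widens it. A sketch of the branch behaviour in add() above (not project code):

    Number sum = 0L;
    sum = sum.longValue() + 10L;        // Integer/Long input keeps the sum a long -> 10
    sum = sum.doubleValue() + 2.5d;     // a Double input widens the running sum -> 12.5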
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java
new file mode 100644
index 0000000..9d4f67b
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java
@@ -0,0 +1,42 @@
+package com.geedgenetworks.core.utils;
+
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class UDFUtils {
+ public static Map<String, String> getClassReflect(List<String> plugins) {
+
+ Map<String, String> classReflect = new HashMap<>();
+
+ for (String classPath : plugins) {
+
+ Class cls = null;
+ try {
+ cls = Class.forName(classPath);
+ Method method = cls.getMethod("functionName");
+ Object object = cls.newInstance();
+ String result = (String) method.invoke(object);
+ classReflect.put(result, classPath);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return classReflect;
+ }
+ public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
+
+ boolean result;
+ Object object = expression.execute(map);
+ if (object != null) {
+ result = (Boolean) object;
+ } else {
+ throw new ExpressionRuntimeException();
+ }
+ return result;
+ }
+}
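
getClassReflect builds the function-name-to-class map by instantiating each plugin once and calling its functionName(). Class.newInstance() has been deprecated since Java 9; an equivalent per-plugin step using the explicit constructor lookup would be (a sketch, not what this diff ships):

    Class<?> cls = Class.forName(classPath);
    Object instance = cls.getDeclaredConstructor().newInstance();
    classReflect.put((String) cls.getMethod("functionName").invoke(instance), classPath);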
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
index 04e1c9d..12a2093 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
@@ -28,12 +28,13 @@ public class UnixTimestampConverterTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
+ parameters.put("interval", "600");
udfContext.setParameters(parameters);
UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
unixTimestampConverter.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("input", 1577808000000L);
+ extractedFields.put("input", 1577808090000L);
event.setExtractedFields(extractedFields);
Event result1 = unixTimestampConverter.evaluate(event);
assertEquals(1577808000L, result1.getExtractedFields().get("output"));
@@ -46,12 +47,13 @@ public class UnixTimestampConverterTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");
+ parameters.put("interval", "60000");
udfContext.setParameters(parameters);
UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
unixTimestampConverter.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("input", 1577808000L);
+ extractedFields.put("input", 1577808030L);
event.setExtractedFields(extractedFields);
Event result1 = unixTimestampConverter.evaluate(event);
assertEquals(1577808000000L, result1.getExtractedFields().get("output"));
@@ -65,12 +67,13 @@ public class UnixTimestampConverterTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
+ parameters.put("interval", "60");
udfContext.setParameters(parameters);
UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
unixTimestampConverter.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("input", 1577808000L);
+ extractedFields.put("input", 1577808001L);
event.setExtractedFields(extractedFields);
Event result1 = unixTimestampConverter.evaluate(event);
assertEquals(1577808000L, result1.getExtractedFields().get("output"));
@@ -84,12 +87,13 @@ public class UnixTimestampConverterTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");
+ parameters.put("interval", "60000");
udfContext.setParameters(parameters);
UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
unixTimestampConverter.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("input", 1577808000000L);
+ extractedFields.put("input", 1577808001100L);
event.setExtractedFields(extractedFields);
Event result1 = unixTimestampConverter.evaluate(event);
assertEquals(1577808000000L, result1.getExtractedFields().get("output"));
@@ -102,15 +106,16 @@ public class UnixTimestampConverterTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "minutes");
+ parameters.put("interval", "5");
udfContext.setParameters(parameters);
UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
unixTimestampConverter.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("input", 1577808100000L);
+ extractedFields.put("input", 1577808101001L);
event.setExtractedFields(extractedFields);
Event result1 = unixTimestampConverter.evaluate(event);
- assertEquals(1577808060L, result1.getExtractedFields().get("output"));
+ assertEquals(1577808000L, result1.getExtractedFields().get("output"));
}