diff options
| author | 李奉超 <[email protected]> | 2024-07-23 08:23:04 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-07-23 08:23:04 +0000 |
| commit | 3cff9a87fa0beab38caff2b34d7344b4186e24e1 (patch) | |
| tree | 413bcbafb3c93796680bd0c6823d7933d49dc374 | |
| parent | 0042a6c2c922e3719218ed5d88a5ba47b6a25562 (diff) | |
| parent | 6e558c28ce9f07f58e9adcbbd802b586c6a179da (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[feature][bootstrap][core][common]支持自定义聚合函数,udf接口重命名为scalarFunction,时间戳转换函数支持设置interval
See merge request galaxy/platform/groot-stream!78
64 files changed, 941 insertions, 567 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 11ba617..62a320b 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -19,7 +19,17 @@ processing_pipelines: functions: - function: DROP filter: event.server_ip == '4.4.4.4' - + aggregate_processor: + type: aggregate + output_fields: + group_by_fields: [server_ip,server_port] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 60 + window_slide: 10 #滑动窗口步长 + functions: + - function: NUMBER_SUM + lookup_fields: [ sent_pkts ] + output_fields: [ sent_pkts_sum ] sinks: print_sink: type: print diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java new file mode 100644 index 0000000..42a4828 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -0,0 +1,168 @@ +package com.geedgenetworks.bootstrap.execution; + +import com.alibaba.fastjson.JSONObject; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.common.config.AggregateConfigOptions; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.ProjectionConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.pojo.ProcessorConfig; +import com.geedgenetworks.core.pojo.ProjectionConfig; +import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; +import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.typesafe.config.Config; +import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +import java.net.URL; +import java.util.List; +import java.util.Map; + +public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> { + + + protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) { + super(jarPaths, operatorConfig); + } + + @Override + public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + + ProcessorConfig processorConfig = operatorMap.get(node.getName()); + switch (processorConfig.getType()) { + case "aggregate": + dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig); + break; + case "projection": + dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig); + break; + default:// 兼容历史版本 + dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig); + } + return dataStream; + } + + protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { + + AggregateProcessor aggregateProcessor; + if (aggregateProcessorMap.containsKey(aggregateConfig.getType())) { + aggregateProcessor = aggregateProcessorMap.get(aggregateConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(aggregateConfig.getType()); + aggregateProcessor = (AggregateProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get processing pipeline instance failed!", e); + } + } + if (node.getParallelism() > 0) { + aggregateConfig.setParallelism(node.getParallelism()); + } + try { + dataStream = + aggregateProcessor.aggregateProcessorFunction( + dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); + } 
catch (Exception e) { + throw new JobExecuteException("Create aggregate pipeline instance failed!", e); + } + return dataStream; + } + + protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException { + + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(projectionConfig.getType()); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get processing pipeline instance failed!", e); + } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { + dataStream = + projectionProcessor.projectionProcessorFunction( + dataStream, projectionConfig); + } catch (Exception e) { + throw new JobExecuteException("Create processing pipeline instance failed!", e); + } + return dataStream; + } + + protected ProcessorConfig checkProcessorConfig(String key, Map<String, Object> value, Config processorsConfig) { + ProcessorConfig projectionConfig; + CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key), + ProjectionConfigOptions.TYPE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Postprocessor: %s, Message: %s", + key, result.getMsg())); + } + switch ((String) value.getOrDefault("type", "")) { + case "projection": + projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig); + break; + case "aggregate": + projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig); + break; + default://兼容历史版本 + projectionConfig = 
checkProjectionProcessorConfig(key, value, processorsConfig); + } + return projectionConfig; + } + + protected ProcessorConfig checkProjectionProcessorConfig(String key, Map<String, Object> value, Config projectionProcessors) { + + CheckResult result = CheckConfigUtil.checkAtLeastOneExists(projectionProcessors.getConfig(key), + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Processor: %s, At least one of [%s] should be specified.", + key, String.join(",", + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()))); + } + + ProjectionConfig projectionConfig = new JSONObject(value).toJavaObject(ProjectionConfig.class); + projectionConfig.setName(key); + + return projectionConfig; + } + + + protected AggregateConfig checkAggregateProcessorConfig(String key, Map<String, Object> value, Config aggregateProcessorsConfig) { + + + CheckResult result = CheckConfigUtil.checkAllExists(aggregateProcessorsConfig.getConfig(key), + AggregateConfigOptions.GROUP_BY_FIELDS.key(), + AggregateConfigOptions.WINDOW_TYPE.key(), + AggregateConfigOptions.FUNCTIONS.key(), + AggregateConfigOptions.WINDOW_SIZE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Aggregate processor: %s, At least one of [%s] should be specified.", + key, String.join(",", + AggregateConfigOptions.OUTPUT_FIELDS.key(), + AggregateConfigOptions.REMOVE_FIELDS.key(), + AggregateConfigOptions.FUNCTIONS.key()))); + } + + AggregateConfig aggregateConfig = new JSONObject(value).toJavaObject(AggregateConfig.class); + aggregateConfig.setName(key); + return aggregateConfig; + } + + +} diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index fa5f572..36fad61 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -1,16 +1,9 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -22,7 +15,7 @@ import java.util.Map; /** * Initialize config and execute postprocessor */ -public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { +public class PostprocessingExecutor extends AbstractProcessorExecutor { private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType(); public PostprocessingExecutor(List<URL> jarPaths, Config config) { @@ -30,68 +23,20 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC } @Override - protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, 
ProjectionConfig> postprocessingConfigMap = Maps.newHashMap(); + protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); postprocessors.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key), - ProjectionConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Postprocessor: %s, Message: %s", - key, result.getMsg())); - } - result = CheckConfigUtil.checkAtLeastOneExists(postprocessors.getConfig(key), - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Postprocessor: %s, At least one of [%s] should be specified.", - key, String.join(",", - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()))); - } - - ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - projectionConfig.setName(key); - postprocessingConfigMap.put(key, projectionConfig); + postprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, postprocessors)); }); - } - return postprocessingConfigMap; } + @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - String className = projectionConfig.getType(); - ProjectionProcessor projectionProcessor; - if 
(projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get postprocessing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); - } catch (Exception e) { - throw new JobExecuteException("Create postprocessing pipeline instance failed!", e); - } - return dataStream; + return super.execute(dataStream, node); } - } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index 31fcce8..b1e53e4 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -1,16 +1,9 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import 
com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -24,7 +17,7 @@ import java.util.Map; * Initialize config and execute preprocessor */ @Slf4j -public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { +public class PreprocessingExecutor extends AbstractProcessorExecutor { private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType(); public PreprocessingExecutor(List<URL> jarPaths, Config config) { @@ -32,67 +25,20 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo } @Override - protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap(); + protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); preprocessors.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key), - ProjectionConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Preprocessor: %s, Message: %s", - key, result.getMsg())); - } - - result = CheckConfigUtil.checkAtLeastOneExists(preprocessors.getConfig(key), - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Preprocessor: %s, At least one of [%s] 
should be specified.", - key, String.join(",", - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()))); - } - - ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - projectionConfig.setName(key); - preprocessingConfigMap.put(key, projectionConfig); + preprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, preprocessors)); }); } - return preprocessingConfigMap; } @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - String className = projectionConfig.getType(); - ProjectionProcessor projectionProcessor; - if (projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get preprocessing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); - } catch (Exception e) { - throw new JobExecuteException("Create preprocessing pipeline instance failed!", e); - } - return dataStream; + return super.execute(dataStream, node); + } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index 4a3b204..d69fe8c 100644 --- 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -1,16 +1,9 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -22,7 +15,7 @@ import java.util.Map; /** * Initialize config and execute processor */ -public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfig> { +public class ProcessingExecutor extends AbstractProcessorExecutor { private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType(); public ProcessingExecutor(List<URL> jarPaths, Config config) { @@ -30,69 +23,20 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi } @Override - protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap(); + protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap(); if 
(operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); processors.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key), - ProjectionConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Processor: %s, Message: %s", - key, result.getMsg())); - } - result = CheckConfigUtil.checkAtLeastOneExists(processors.getConfig(key), - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Processor: %s, At least one of [%s] should be specified.", - key, String.join(",", - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()))); - } - - ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - projectionConfig.setName(key); - processingConfigMap.put(key, projectionConfig); + processingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, processors)); }); - } - return processingConfigMap; } @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - String className = projectionConfig.getType(); - ProjectionProcessor projectionProcessor; - if (projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - } catch 
(ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get processing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); - } catch (Exception e) { - throw new JobExecuteException("Create processing pipeline instance failed!", e); - } - return dataStream; + return super.execute(dataStream, node); } - } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java new file mode 100644 index 0000000..403cecc --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java @@ -0,0 +1,16 @@ +package com.geedgenetworks.common; + +import lombok.Data; +import org.apache.flink.metrics.Counter; + +import java.io.Serializable; +import java.util.Map; + +@Data +public class Accumulator implements Serializable { + private Map<String, Object> metricsFields; + private long errorCount; + private long inEvents; + private long outEvents; + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index 231fac5..d13fc4b 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -43,10 +43,10 @@ public final class Constants { public static final String HAZELCAST_UDF_PLUGIN_CONFIG_FILE_PREFIX = "udf"; public static final String HAZELCAST_UDF_PLUGIN_CONFIG_DEFAULT = "udf.plugins"; - - - - + public static final String TUMBLING_PROCESSING_TIME = "tumbling_processing_time"; + public static final String TUMBLING_EVENT_TIME = "tumbling_event_time"; + public static final String SLIDING_PROCESSING_TIME = 
"sliding_processing_time"; + public static final String SLIDING_EVENT_TIME = "sliding_event_time"; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java index b040b0d..4ab4aef 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java @@ -8,6 +8,10 @@ import java.util.Map; @Data public class Event implements Serializable { public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp"; + public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp"; + public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp"; + + private Map<String, Object> extractedFields; //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing. private boolean isDropped = false; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java new file mode 100644 index 0000000..f1dc38f --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java @@ -0,0 +1,34 @@ +package com.geedgenetworks.common; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +@Data +public class KeybyEntity implements Serializable { + private Map<String,Object> keys; + private String keysToString; + + public KeybyEntity(Map<String, Object> keys) { + this.keys = keys; + } + + @Override + public int hashCode() { + return Objects.hash(keysToString); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KeybyEntity other = (KeybyEntity) o; + return Objects.equals(keysToString, other.keysToString); + } +} diff --git 
a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java new file mode 100644 index 0000000..3998a3b --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java @@ -0,0 +1,49 @@ +package com.geedgenetworks.common.config; + +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.udf.UDFContext; + +import java.util.List; + +public interface AggregateConfigOptions { + Option<String> TYPE = Options.key("type") + .stringType() + .noDefaultValue() + .withDescription("The type of processor."); + + Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be outputted."); + + Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be removed."); + + Option<List<UDFContext>> FUNCTIONS = Options.key("functions") + .type(new TypeReference<List<UDFContext>>() {}) + .noDefaultValue() + .withDescription("The functions to be executed."); + + Option<List<String>> GROUP_BY_FIELDS = Options.key("group_by_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be key by."); + + Option<String> WINDOW_TYPE = Options.key("window_type") + .stringType() + .noDefaultValue() + .withDescription("The type of window."); + + Option<Integer> WINDOW_SIZE = Options.key("window_size") + .intType() + .noDefaultValue() + .withDescription("The size of window."); + + Option<Integer> WINDOW_SLIDE = Options.key("window_slide") + .intType() + .noDefaultValue() + .withDescription("The size of sliding window."); + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java new file mode 100644 index 0000000..98450fd --- /dev/null 
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java @@ -0,0 +1,20 @@ +package com.geedgenetworks.common.udf; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; + +import java.io.Serializable; + +public interface AggregateFunction extends Serializable { + + Accumulator open(UDFContext udfContext,Accumulator acc); + + Accumulator add(Event val, Accumulator acc); + + String functionName(); + + Accumulator getResult(Accumulator acc); + + void close(); + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java index eb76263..2aab34b 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java @@ -3,7 +3,7 @@ import com.geedgenetworks.common.Event; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; -public interface UDF extends Serializable { +public interface ScalarFunction extends Serializable { void open(RuntimeContext runtimeContext, UDFContext udfContext); diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 89f8b11..73692a2 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -13,4 +13,7 @@ com.geedgenetworks.core.udf.Rename com.geedgenetworks.core.udf.SnowflakeId com.geedgenetworks.core.udf.StringJoiner com.geedgenetworks.core.udf.UnixTimestampConverter -com.geedgenetworks.core.udf.Flatten
\ No newline at end of file +com.geedgenetworks.core.udf.Flatten +com.geedgenetworks.core.udf.udaf.NumberSum +com.geedgenetworks.core.udf.udaf.CollectList +com.geedgenetworks.core.udf.udaf.CollectSet
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index 979fc8e..8111f09 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -1,108 +1,23 @@ package com.geedgenetworks.core.pojo; +import com.geedgenetworks.common.udf.UDFContext; +import lombok.Data; +import lombok.EqualsAndHashCode; + import java.util.List; import java.util.Map; +@EqualsAndHashCode(callSuper = true) +@Data +public class AggregateConfig extends ProcessorConfig{ -public class AggregateConfig { - private String type; - private int parallelism; private List<String> group_by_fields; private String timestamp_field; private String window_type; - private Integer window_interval_seconds; - private Integer sliding_interval; - private List<AggregateFunction> functions; + private Integer window_size; + private Integer max_out_of_orderness; + private Integer window_slide; + private List<UDFContext> functions; private String[] output_fields; - private Map<String, Object> properties; - - private String name; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public int getParallelism() { - return parallelism; - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public List<String> getGroup_by_fields() { - return group_by_fields; - } - - public void setGroup_by_fields(List<String> group_by_fields) { - this.group_by_fields = group_by_fields; - } - - public String getTimestamp_field() { - return timestamp_field; - } - - public void setTimestamp_field(String timestamp_field) { - this.timestamp_field = timestamp_field; - } - - public String 
getWindow_type() { - return window_type; - } - - public void setWindow_type(String window_type) { - this.window_type = window_type; - } - - public Integer getWindow_interval_seconds() { - return window_interval_seconds; - } - - public void setWindow_interval_seconds(Integer window_interval_seconds) { - this.window_interval_seconds = window_interval_seconds; - } - - public Integer getSliding_interval() { - return sliding_interval; - } - - public void setSliding_interval(Integer sliding_interval) { - this.sliding_interval = sliding_interval; - } - - public List<AggregateFunction> getFunctions() { - return functions; - } - - public void setFunctions(List<AggregateFunction> functions) { - this.functions = functions; - } - - public String[] getOutput_fields() { - return output_fields; - } - - public void setOutput_fields(String[] output_fields) { - this.output_fields = output_fields; - } - - public Map<String, Object> getProperties() { - return properties; - } - public void setProperties(Map<String, Object> properties) { - this.properties = properties; - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java index f937418..7e0110e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateFunction.java @@ -1,9 +1,11 @@ package com.geedgenetworks.core.pojo; +import lombok.Data; + import java.io.Serializable; import java.util.List; import java.util.Map; - +@Data public class AggregateFunction implements Serializable { private String name; @@ -13,51 +15,5 @@ public class AggregateFunction implements Serializable { private Map<String, Object> parameters; private String class_name; - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public List<String> getInput_fields() { - return input_fields; - } - 
- public void setInput_fields(List<String> input_fields) { - this.input_fields = input_fields; - } - - public List<String> getOutput_fields() { - return output_fields; - } - - public void setOutput_fields(List<String> output_fields) { - this.output_fields = output_fields; - } - - public String getFilter() { - return filter; - } - - public void setFilter(String filter) { - this.filter = filter; - } - - public Map<String, Object> getParameters() { - return parameters; - } - - public void setParameters(Map<String, Object> parameters) { - this.parameters = parameters; - } - - public String getClass_name() { - return class_name; - } - public void setClass_name(String class_name) { - this.class_name = class_name; - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java new file mode 100644 index 0000000..dacbd78 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProcessorConfig.java @@ -0,0 +1,12 @@ +package com.geedgenetworks.core.pojo; + +import lombok.Data; + +import java.util.Map; +@Data +public class ProcessorConfig { + private String type; + private int parallelism; + private Map<String, Object> properties; + private String name; +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java index 4adc9ef..4670fd8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java @@ -1,83 +1,20 @@ package com.geedgenetworks.core.pojo; import com.geedgenetworks.common.udf.UDFContext; +import lombok.Data; +import lombok.EqualsAndHashCode; import java.io.Serializable; import java.util.List; import java.util.Map; +@EqualsAndHashCode(callSuper = true) +@Data +public class ProjectionConfig extends ProcessorConfig implements 
Serializable { -public class ProjectionConfig implements Serializable { - - private int parallelism; private List<UDFContext> functions; private String format; - private String type; private List<String> output_fields; private List<String> remove_fields; - private Map<String, Object> properties; - private String name; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getParallelism() { - return parallelism; - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public List<UDFContext> getFunctions() { - return functions; - } - - public void setFunctions(List<UDFContext> functions) { - this.functions = functions; - } - - public String getFormat() { - return format; - } - - public void setFormat(String format) { - this.format = format; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public List<String> getOutput_fields() { - return output_fields; - } - - public void setOutput_fields(List<String> output_fields) { - this.output_fields = output_fields; - } - - public Map<String, Object> getProperties() { - return properties; - } - - public void setProperties(Map<String, Object> properties) { - this.properties = properties; - } - public List<String> getRemove_fields() { - return remove_fields; - } - public void setRemove_fields(List<String> remove_fields) { - this.remove_fields = remove_fields; - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java index bbf7d8d..9acf8fc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java @@ -3,13 +3,14 @@ package 
com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.core.pojo.AggregateConfig; import com.geedgenetworks.common.Event; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public interface AggregateProcessor { SingleOutputStreamOperator<Event> aggregateProcessorFunction( SingleOutputStreamOperator<Event> singleOutputStreamOperator, - AggregateConfig aggregateConfig) + AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception; String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java new file mode 100644 index 0000000..b535faf --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -0,0 +1,129 @@ +package com.geedgenetworks.core.processor.aggregate; + +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.ExecutionConfig; + +import java.util.HashMap; +import java.util.LinkedList; +import 
java.util.List; +import java.util.Map; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +@Slf4j +public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> { + private final List<UDFContext> udfContexts; + private final List<String> udfClassNameLists; + private final List<String> groupByFields; + private LinkedList<UdfEntity> functions; + + public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { + udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); + udfContexts = aggregateConfig.getFunctions(); + groupByFields = aggregateConfig.getGroup_by_fields(); + } + + @Override + public Accumulator createAccumulator() { + + functions = Lists.newLinkedList(); + if (udfContexts == null || udfContexts.isEmpty()) { + throw new RuntimeException(); + } + Map<String, Object> map = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(map); + Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); + try { + for (UDFContext udfContext : udfContexts) { + Expression filterExpression = null; + UdfEntity udfEntity = new UdfEntity(); + // 平台注册的函数包含任务中配置的函数则对函数进行实例化 + if (udfClassReflect.containsKey(udfContext.getFunction())) { + Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); + AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); + aggregateFunction.open(udfContext, accumulator); + // 函数如果包含filter,对表达式进行编译 + if (udfContext.getFilter() != null) { + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + 
filterExpression = instance.compile(udfContext.getFilter(), true); + } + udfEntity.setAggregateFunction(aggregateFunction); + udfEntity.setFilterExpression(filterExpression); + udfEntity.setName(udfContext.getFunction()); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + functions.add(udfEntity); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported UDAF: " + udfContext.getFunction()); + } + + } + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e); + + } + + return accumulator; + } + + @Override + public Accumulator add(Event event, Accumulator accumulator) { + + accumulator.setInEvents(accumulator.getInEvents()+1); + for (UdfEntity udafEntity : functions) { + try { + boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; + if (result) { + udafEntity.getAggregateFunction().add(event, accumulator); + } + } catch (ExpressionRuntimeException ignore) { + log.error("Function " + udafEntity.getName() + " Invalid filter ! 
"); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " execute exception !", e); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } + } + return accumulator; + } + + @Override + public Accumulator getResult(Accumulator accumulator) { + for (UdfEntity udafEntity : functions) { + try { + udafEntity.getAggregateFunction().getResult(accumulator); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " getResult exception !", e); + } + } + return accumulator; + } + + @Override + public Accumulator merge(Accumulator a, Accumulator b) { + return null; + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index f6fa9d5..43fe20d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -1,32 +1,54 @@ package com.geedgenetworks.core.processor.aggregate; -import com.geedgenetworks.core.pojo.AggregateConfig; import com.geedgenetworks.common.Event; - +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.core.pojo.AggregateConfig; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import 
org.apache.flink.streaming.api.windowing.time.Time; + +import static com.geedgenetworks.common.Constants.*; public class AggregateProcessorImpl implements AggregateProcessor { @Override public SingleOutputStreamOperator<Event> aggregateProcessorFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, - AggregateConfig aggregateConfig) + SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, + AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { - /* List<String> keys = aggregateConfig.getGroup_by_fields(); - - return singleOutputStreamOperator.keyBy(new OneKeySelector(keys)).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_interval_seconds()))).reduce(new ReduceFunction<Event>() { - public Event reduce(Event v1, Event v2) { - - - Integer aa = (Integer) v1.getExtractedFields().get("common_sessions"); - Integer bb = (Integer) v2.getExtractedFields().get("common_sessions"); - v1.getExtractedFields().put("common_sessions",aa+bb); - return v1; - } - }).setParallelism(aggregateConfig.getParallelism()); - }*/ - return singleOutputStreamOperator; + if (aggregateConfig.getParallelism() != 0) { + switch (aggregateConfig.getWindow_type()) { + case TUMBLING_PROCESSING_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + case TUMBLING_EVENT_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new 
ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + case SLIDING_PROCESSING_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + case SLIDING_EVENT_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + default: + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); + } + }else { + switch (aggregateConfig.getWindow_type()) { + case TUMBLING_PROCESSING_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + case TUMBLING_EVENT_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + case SLIDING_PROCESSING_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new 
KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + case SLIDING_EVENT_TIME: + return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl()).name(aggregateConfig.getName()); + default: + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); + } + } } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java new file mode 100644 index 0000000..4b02166 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java @@ -0,0 +1,35 @@ +package com.geedgenetworks.core.processor.aggregate; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; + +import java.util.HashMap; +import java.util.List; + +public class KeySelector implements org.apache.flink.api.java.functions.KeySelector<Event, KeybyEntity> { + + + private final List<String> keys; + + public KeySelector(List<String> keys) { + this.keys = keys; + } + + @Override + public KeybyEntity getKey(Event value) throws Exception { + + KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); + StringBuilder stringBuilder = new StringBuilder(); + for(String key: keys){ + + if(value.getExtractedFields().containsKey(key)){ + keybyEntity.getKeys().put(key,value.getExtractedFields().get(key)); + 
stringBuilder.append(value.getExtractedFields().get(key).toString()); + }else { + stringBuilder.append(","); + } + keybyEntity.setKeysToString(stringBuilder.toString()); + } + return keybyEntity; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java new file mode 100644 index 0000000..46c9607 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java @@ -0,0 +1,43 @@ +package com.geedgenetworks.core.processor.aggregate; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; +import com.geedgenetworks.core.metrics.InternalMetrics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import static com.geedgenetworks.common.Event.WINDOW_END_TIMESTAMP; +import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP; + +public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction< + Accumulator, // 输入类型 + Event, // 输出类型 + KeybyEntity, // 键类型 + TimeWindow> { + + private transient InternalMetrics internalMetrics; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + } + + @Override + public void process(KeybyEntity keybyEntity, ProcessWindowFunction<Accumulator, Event, KeybyEntity, TimeWindow>.Context context, Iterable<Accumulator> elements, Collector<Event> out) throws Exception { + Accumulator accumulator = elements.iterator().next(); + Event event = new Event(); + 
event.setExtractedFields(accumulator.getMetricsFields()); + event.getExtractedFields().put(WINDOW_START_TIMESTAMP, context.window().getStart()); + event.getExtractedFields().put(WINDOW_END_TIMESTAMP, context.window().getEnd()); + event.getExtractedFields().putAll(keybyEntity.getKeys()); + internalMetrics.incrementOutEvents(); + internalMetrics.incrementErrorEvents(accumulator.getErrorCount()); + internalMetrics.incrementInEvents(accumulator.getInEvents()); + out.collect(event); + } +} + diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index f492767..55258b3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -8,10 +8,9 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.common.utils.ColumnUtil; import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.utils.HdfsUtils; import com.geedgenetworks.core.metrics.InternalMetrics; import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler; import com.google.common.collect.Lists; import com.googlecode.aviator.AviatorEvaluator; @@ -24,12 +23,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; -import java.lang.reflect.Method; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static 
com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + @Slf4j public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { private final ProjectionConfig projectionConfig; @@ -61,8 +61,8 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); - UDF udf = (UDF) cls.getConstructor().newInstance(); - udf.open(getRuntimeContext(), udfContext); + ScalarFunction scalarFunction = (ScalarFunction) cls.getConstructor().newInstance(); + scalarFunction.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); @@ -71,7 +71,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { instance.setFunctionMissing(null); filterExpression = instance.compile(udfContext.getFilter(), true); } - udfEntity.setUdfFunction(udf); + udfEntity.setScalarFunction(scalarFunction); udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); @@ -101,7 +101,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { } else { boolean result = udfEntity.getFilterExpression() != null ? 
filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; if (result) { - udfEntity.getUdfFunction().evaluate(event); + udfEntity.getScalarFunction().evaluate(event); } } } catch (ExpressionRuntimeException ignore) { @@ -137,43 +137,11 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { } } - public static Map<String, String> getClassReflect(List<String> plugins) { - - Map<String, String> classReflect = new HashMap<>(); - - for (String classPath : plugins) { - - Class cls = null; - try { - cls = Class.forName(classPath); - Method method = cls.getMethod("functionName"); - Object object = cls.newInstance(); - String result = (String) method.invoke(object); - classReflect.put(result, classPath); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return classReflect; - } - - public static Boolean filterExecute(Expression expression, Map<String, Object> map) { - - boolean result; - Object object = expression.execute(map); - if (object != null) { - result = (Boolean) object; - } else { - throw new ExpressionRuntimeException(); - } - return result; - } - @Override public void close() throws Exception { for (UdfEntity udfEntity : functions) { - udfEntity.getUdfFunction().close(); + udfEntity.getScalarFunction().close(); } KnowledgeBaseScheduler.decrement(); if(KnowledgeBaseScheduler.getCount()<=0) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java index abd4d97..c36a785 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java @@ -1,44 +1,18 @@ package com.geedgenetworks.core.processor.projection; -import com.geedgenetworks.common.udf.UDF; +import 
com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.ScalarFunction; import com.googlecode.aviator.Expression; +import lombok.Data; + import java.io.Serializable; + +@Data public class UdfEntity implements Serializable { - private UDF udf; + private ScalarFunction scalarFunction; + private AggregateFunction aggregateFunction; private Expression filterExpression; private String name; private String className; - - public UDF getUdfFunction() { - return udf; - } - - public void setUdfFunction(UDF udf) { - this.udf = udf; - } - - public Expression getFilterExpression() { - return filterExpression; - } - - public void setFilterExpression(Expression filterExpression) { - this.filterExpression = filterExpression; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getClassName() { - return className; - } - - public void setClassName(String className) { - this.className = className; - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index 3a0ba9e..bdb3698 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -6,7 +6,7 @@ import com.geedgenetworks.common.config.CommonConfig; import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.core.udf.knowlegdebase.handler.AsnKnowledgeBaseHandler; @@ -17,7 +17,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import 
org.apache.flink.configuration.Configuration; @Slf4j -public class AsnLookup implements UDF { +public class AsnLookup implements ScalarFunction { private String kbName; private String option; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java index 1cdb3d7..5770201 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java @@ -2,14 +2,14 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; @Slf4j -public class CurrentUnixTimestamp implements UDF { +public class CurrentUnixTimestamp implements ScalarFunction { private String precision; private String outputFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java index 551d515..3581d5c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java @@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.utils.StringUtil; @@ -14,7 +14,7 @@ import 
java.io.UnsupportedEncodingException; import java.util.Base64; @Slf4j -public class DecodeBase64 implements UDF { +public class DecodeBase64 implements ScalarFunction { private String valueField; private String charsetField; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index fd5299a..77b3246 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -2,10 +2,9 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.utils.FormatUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; @@ -14,7 +13,7 @@ import java.util.List; import static com.geedgenetworks.utils.FormatUtils.getTopPrivateDomain; @Slf4j -public class Domain implements UDF { +public class Domain implements ScalarFunction { private String option; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java index 1205cce..93cd0db 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java @@ -1,12 +1,12 @@ package com.geedgenetworks.core.udf; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; -public class Drop implements UDF { +public class Drop implements ScalarFunction { @Override public 
void open(RuntimeContext runtimeContext, UDFContext udfContext) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java index b8ebdbf..c22ff54 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java @@ -3,17 +3,15 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.utils.StringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; -import java.io.UnsupportedEncodingException; import java.util.Base64; @Slf4j -public class EncodeBase64 implements UDF { +public class EncodeBase64 implements ScalarFunction { private String valueField; private String outputFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java index e42e0be..1b83d94 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java @@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.core.expressions.Calc; import com.geedgenetworks.core.expressions.EvalExecutor; @@ -12,7 +12,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import 
java.util.List; import java.util.Map; -public class Eval implements UDF { +public class Eval implements ScalarFunction { private String output; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java index 1062587..3153ef7 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java @@ -5,9 +5,8 @@ import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.googlecode.aviator.Expression; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; @@ -15,7 +14,7 @@ import java.util.*; @Slf4j -public class Flatten implements UDF { +public class Flatten implements ScalarFunction { private String prefix; private String delimiter; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java index 30006fd..e1ba384 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java @@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import lombok.extern.slf4j.Slf4j; @@ -11,7 +11,7 @@ import 
org.apache.flink.api.common.functions.RuntimeContext; import java.text.SimpleDateFormat; import java.util.TimeZone; @Slf4j -public class FromUnixTimestamp implements UDF { +public class FromUnixTimestamp implements ScalarFunction { private String precision; private String outputFieldName; private String lookupFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java index fdb262c..ce4fc48 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java @@ -3,14 +3,14 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; import java.util.*; import java.util.stream.Collectors; -public class GenerateStringArray implements UDF { +public class GenerateStringArray implements ScalarFunction { private List<String> lookupFieldNames; private String outputFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index ce85b24..afe1bfb 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -5,7 +5,7 @@ import com.geedgenetworks.common.config.CommonConfig; import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import 
com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.core.udf.knowlegdebase.handler.GeoIpKnowledgeBaseHandler; @@ -20,7 +20,7 @@ import org.apache.flink.configuration.Configuration; import java.util.Map; @Slf4j -public class GeoIpLookup implements UDF { +public class GeoIpLookup implements ScalarFunction { private String kbName; private String option; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java index ccf76ca..f78b952 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java @@ -2,13 +2,13 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.common.utils.JsonPathUtil; import org.apache.flink.api.common.functions.RuntimeContext; -public class JsonExtract implements UDF { +public class JsonExtract implements ScalarFunction { private String lookupFieldName; private String outputFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 7b142ca..e48a503 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -6,10 +6,9 @@ import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CommonConfig; import 
com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.common.utils.CustomException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; @@ -17,7 +16,7 @@ import org.apache.flink.configuration.Configuration; import java.util.*; @Slf4j -public class PathCombine implements UDF { +public class PathCombine implements ScalarFunction { private final Map<String, String> pathParameters = new LinkedHashMap<>(); private String outputFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java index 7e888fa..ba9b4d2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java @@ -3,7 +3,7 @@ package com.geedgenetworks.core.udf; import com.alibaba.fastjson2.JSONArray; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import com.googlecode.aviator.AviatorEvaluator; @@ -15,7 +15,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import java.util.*; @Slf4j -public class Rename implements UDF { +public class Rename implements ScalarFunction { private static Expression compiledExp; private Set<String> parentFields; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java 
b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java index 67fc395..42a19e6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java @@ -1,6 +1,6 @@ package com.geedgenetworks.core.udf; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; @@ -10,7 +10,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; -public class SnowflakeId implements Serializable, UDF { +public class SnowflakeId implements Serializable, ScalarFunction { private String outputFieldName; private SnowflakeIdUtils snowflakeIdUtils; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java index 4dedffb..df5c5b4 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java @@ -3,13 +3,13 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; import java.util.List; -public class StringJoiner implements UDF { +public class StringJoiner implements ScalarFunction { private List<String> lookupFieldNames; private String outputFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java 
b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java index 17562eb..a8171b3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java @@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import lombok.extern.slf4j.Slf4j; @@ -11,12 +11,14 @@ import org.apache.flink.api.common.functions.RuntimeContext; import java.time.Instant; @Slf4j -public class UnixTimestampConverter implements UDF { +public class UnixTimestampConverter implements ScalarFunction { private UDFContext udfContext; private String precision; private String lookupFieldName; private String outputFieldName; + + private long interval; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { @@ -46,6 +48,12 @@ public class UnixTimestampConverter implements UDF { this.precision =udfContext.getParameters().get("precision").toString(); } } + if(!udfContext.getParameters().containsKey("interval")){ + this.interval = 1L; + } + else{ + this.interval = Long.parseLong(udfContext.getParameters().get("interval").toString()); + } this.lookupFieldName = udfContext.getLookup_fields().get(0); this.outputFieldName = udfContext.getOutput_fields().get(0); @@ -69,13 +77,13 @@ public class UnixTimestampConverter implements UDF { } switch (precision) { case "seconds": - timestamp = instant.getEpochSecond(); + timestamp = instant.getEpochSecond()/interval*interval; break; case "milliseconds": - timestamp = instant.toEpochMilli(); + timestamp = instant.toEpochMilli()/interval*interval; break; case "minutes": - timestamp = 
instant.getEpochSecond()/60*60; + timestamp = instant.getEpochSecond()/(interval*60)*(interval*60); break; } event.getExtractedFields().put(outputFieldName, timestamp); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java index 309ac81..7c4aca2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java @@ -3,7 +3,7 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CommonConfig; import com.geedgenetworks.common.config.KnowledgeBaseConfig; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; @@ -18,7 +18,7 @@ import java.util.stream.Collectors; * @version 1.0 * @date 2024/1/19 9:33 */ -public abstract class AbstractKnowledgeUDF implements UDF { +public abstract class AbstractKnowledgeScalarFunction implements ScalarFunction { protected String lookupFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleScalarFunction.java index f72f8e1..3112ec7 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleScalarFunction.java @@ -18,7 +18,7 @@ import java.util.stream.Collectors; * @version 1.0 * @date 2024/1/22 18:15 */ -public abstract class AbstractKnowledgeWithRuleUDF extends AbstractKnowledgeUDF { +public abstract class 
AbstractKnowledgeWithRuleScalarFunction extends AbstractKnowledgeScalarFunction { protected List<KnowledgeBaseConfig> ruleConfigs; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java index 12817af..6be1c90 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java @@ -16,7 +16,7 @@ import java.util.ArrayList; * @version 1.0 * @date 2024/1/22 14:25 */ -public class AnonymityLookup extends AbstractKnowledgeWithRuleUDF { +public class AnonymityLookup extends AbstractKnowledgeWithRuleScalarFunction { private static final Logger logger = LoggerFactory.getLogger(AnonymityLookup.class); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java index 5b3371a..0052d82 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java @@ -15,7 +15,7 @@ import java.util.Map; * @version 1.0 * @date 2024/1/22 14:14 */ -public class AppCategoryLookup extends AbstractKnowledgeUDF { +public class AppCategoryLookup extends AbstractKnowledgeScalarFunction { private static final Logger logger = LoggerFactory.getLogger(AppCategoryLookup.class); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java index 27bd228..a591884 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java @@ -1,7 +1,7 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import 
com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; @@ -14,7 +14,7 @@ import java.util.stream.Collectors; * @version 1.0 * @date 2024/1/23 11:14 */ -public class ArrayElementsPrepend implements UDF { +public class ArrayElementsPrepend implements ScalarFunction { private String prefix; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java index efe13b3..7136b71 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java @@ -11,7 +11,7 @@ import java.util.List; * @version 1.0 * @date 2024/1/18 16:21 */ -public class DnsServerInfoLookup extends AbstractKnowledgeUDF { +public class DnsServerInfoLookup extends AbstractKnowledgeScalarFunction { private DnsServerInfoKnowledgeBaseHandler knowledgeBaseHandler; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java index 0871909..6cd0c6b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java @@ -1,7 +1,7 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; @@ -15,7 +15,7 @@ import java.util.stream.Collectors; * @date 2024/1/23 11:20 */ @Deprecated -public class FieldsMerge implements UDF { +public class FieldsMerge implements ScalarFunction { private List<String> lookupFieldNames; private String 
outputFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java index d499d00..0e52b8c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java @@ -15,7 +15,7 @@ import java.util.Map; * @version 1.0 * @date 2024/1/19 10:22 */ -public class FqdnCategoryLookup extends AbstractKnowledgeUDF { +public class FqdnCategoryLookup extends AbstractKnowledgeScalarFunction { private static final Logger logger = LoggerFactory.getLogger(FqdnCategoryLookup.class); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java index e228e6a..0cc18a0 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java @@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnWhoisKnowledgeBaseH * @version 1.0 * @date 2024/1/22 13:59 */ -public class FqdnWhoisLookup extends AbstractKnowledgeUDF { +public class FqdnWhoisLookup extends AbstractKnowledgeScalarFunction { private FqdnWhoisKnowledgeBaseHandler knowledgeBaseHandler; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java index 0bd4045..5c1fc97 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java @@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnIcpKnowledgeBaseHan * @version 1.0 * @date 2024/1/22 10:27 */ -public class IcpLookup extends AbstractKnowledgeUDF { +public class IcpLookup extends AbstractKnowledgeScalarFunction 
{ private FqdnIcpKnowledgeBaseHandler knowledgeBaseHandler; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java index 7f9be21..f7c5398 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java @@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.IdcRenterKnowledgeBaseH * @version 1.0 * @date 2024/1/18 11:45 */ -public class IdcRenterLookup extends AbstractKnowledgeUDF { +public class IdcRenterLookup extends AbstractKnowledgeScalarFunction { private IdcRenterKnowledgeBaseHandler knowledgeBaseHandler; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java index 545fbaa..9f14bd2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java @@ -15,7 +15,7 @@ import java.util.List; * @version 1.0 * @date 2024/4/7 14:15 */ -public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF { +public class IntelligenceIndicatorLookup extends AbstractKnowledgeScalarFunction { private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorLookup.class); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java index 30383af..4003297 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java @@ -16,7 +16,7 @@ import java.util.ArrayList; * @version 1.0 * @date 2024/1/22 14:45 */ -public class IocLookup extends AbstractKnowledgeWithRuleUDF { 
+public class IocLookup extends AbstractKnowledgeWithRuleScalarFunction { private static final Logger logger = LoggerFactory.getLogger(IocLookup.class); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java index 9a7df14..b9bd139 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java @@ -12,7 +12,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.InternalIpKnowledgeBase * 如果知识库配置为空,则只根据内网网段判断是否为内部IP * 该函数优先级应该置于地理位置判断和flag字段判断之后 */ -public class IpZoneLookup extends AbstractKnowledgeUDF { +public class IpZoneLookup extends AbstractKnowledgeScalarFunction { private InternalIpKnowledgeBaseHandler knowledgeBaseHandler; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java index 897e285..7983015 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/L7ProtocolAndAppExtract.java @@ -1,7 +1,7 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.slf4j.Logger; @@ -17,7 +17,7 @@ import java.util.stream.Collectors; * @version 1.0 * @date 2024/1/23 10:59 */ -public class L7ProtocolAndAppExtract implements UDF { +public class L7ProtocolAndAppExtract implements ScalarFunction { private static final Logger logger = LoggerFactory.getLogger(L7ProtocolAndAppExtract.class); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java 
b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java index 71964f0..e653820 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java @@ -9,7 +9,7 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.LinkDirectionKnowledgeB * @version 1.0 * @date 2024/1/17 14:56 */ -public class LinkDirectionLookup extends AbstractKnowledgeUDF { +public class LinkDirectionLookup extends AbstractKnowledgeScalarFunction { private LinkDirectionKnowledgeBaseHandler knowledgeBaseHandler; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java index 0eaf2ad..e5a3f7f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java @@ -15,7 +15,7 @@ import java.util.List; * @version 1.0 * @date 2024/1/22 15:02 */ -public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF { +public class UserDefineTagLookup extends AbstractKnowledgeWithRuleScalarFunction { private String option; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java index 2c78ab9..4cdb399 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java @@ -14,7 +14,7 @@ import org.slf4j.LoggerFactory; * @version 1.0 * @date 2024/1/19 10:40 */ -public class VpnLookup extends AbstractKnowledgeUDF { +public class VpnLookup extends AbstractKnowledgeScalarFunction { private static final Logger logger = LoggerFactory.getLogger(VpnLookup.class); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java 
b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java new file mode 100644 index 0000000..1a82208 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -0,0 +1,73 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.udaf; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import java.util.*; + +/** + * Collects elements within a group and returns the list of aggregated objects + */ +public class CollectList implements AggregateFunction { + + private String lookupField; + private String outputField; + + + @Override + public Accumulator open(UDFContext udfContext,Accumulator acc) { + this.lookupField = udfContext.getLookup_fields().get(0); + if(!udfContext.getOutput_fields().isEmpty()) { + this.outputField = udfContext.getOutput_fields().get(0); + } + else { + outputField = lookupField; + } + acc.getMetricsFields().put(outputField, new ArrayList<>()); + return acc; + } + + + @Override + public Accumulator add(Event event, Accumulator acc) { + if(event.getExtractedFields().containsKey(lookupField)){ + Object object = event.getExtractedFields().get(lookupField); + List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField); + aggregate.add(object); + 
acc.getMetricsFields().put(outputField, aggregate); + } + return acc; + } + + @Override + public String functionName() { + return "COLLECT_LIST"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return acc; + } + + @Override + public void close() { + + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java new file mode 100644 index 0000000..42bd7e8 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -0,0 +1,60 @@ +package com.geedgenetworks.core.udf.udaf; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; + +import java.util.HashSet; +import java.util.Set; + +/** + * Collects elements within a group and returns the list of aggregated objects + */ +public class CollectSet implements AggregateFunction { + + private String lookupField; + private String outputField; + + + @Override + public Accumulator open(UDFContext udfContext,Accumulator acc) { + this.lookupField = udfContext.getLookup_fields().get(0); + if(!udfContext.getOutput_fields().isEmpty()) { + this.outputField = udfContext.getOutput_fields().get(0); + } + else { + outputField = lookupField; + } + acc.getMetricsFields().put(outputField, new HashSet<>()); + return acc; + } + + + @Override + public Accumulator add(Event event, Accumulator acc) { + if(event.getExtractedFields().containsKey(lookupField)){ + Object object = event.getExtractedFields().get(lookupField); + Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField); + aggregate.add(object); + acc.getMetricsFields().put(outputField, aggregate); + } + return acc; + } + + @Override + public String functionName() { + return "COLLECT_SET"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return 
acc; + } + + @Override + public void close() { + + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java new file mode 100644 index 0000000..a491844 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -0,0 +1,60 @@ +package com.geedgenetworks.core.udf.udaf; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; + + +public class NumberSum implements AggregateFunction { + private String lookupField; + private String outputField; + + + @Override + public Accumulator open(UDFContext udfContext,Accumulator acc){ + lookupField = udfContext.getLookup_fields().get(0); + if(!udfContext.getOutput_fields().isEmpty()) { + outputField = udfContext.getOutput_fields().get(0); + } + else { + outputField = lookupField; + } + return acc; + } + + + @Override + public Accumulator add(Event event, Accumulator acc) { + + if(event.getExtractedFields().containsKey(lookupField)){ + Number val = (Number) event.getExtractedFields().get(lookupField); + Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); + if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) { + aggregate = aggregate.longValue() + val.longValue(); + } else if (aggregate instanceof Float || val instanceof Float) { + aggregate = aggregate.floatValue() + val.floatValue(); + } else if (aggregate instanceof Double || val instanceof Double) { + aggregate = aggregate.doubleValue() + val.doubleValue(); + } + acc.getMetricsFields().put(outputField, aggregate); + } + return acc; + } + + @Override + public String functionName() { + return "NUMBER_SUM"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return acc; + } + + @Override + public void close() { + + } + +} 
package com.geedgenetworks.core.utils;

import com.googlecode.aviator.Expression;
import com.googlecode.aviator.exception.ExpressionRuntimeException;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Reflection helpers for resolving user-defined functions and evaluating
 * Aviator filter expressions.
 */
public class UDFUtils {

    private UDFUtils() {
        // Utility class; no instances.
    }

    /**
     * Builds a map from a plugin's declared function name to its class path by
     * loading each class, instantiating it via its no-arg constructor, and
     * invoking its {@code functionName()} method.
     *
     * @param plugins fully-qualified class names of UDF plugin classes
     * @return map of function name to class path
     * @throws RuntimeException if any class cannot be loaded, instantiated,
     *                          or queried for its function name
     */
    public static Map<String, String> getClassReflect(List<String> plugins) {
        Map<String, String> classReflect = new HashMap<>();
        for (String classPath : plugins) {
            try {
                Class<?> cls = Class.forName(classPath);
                Method method = cls.getMethod("functionName");
                // getDeclaredConstructor().newInstance() replaces the deprecated
                // Class.newInstance(), which propagated checked constructor
                // exceptions undeclared.
                Object instance = cls.getDeclaredConstructor().newInstance();
                String functionName = (String) method.invoke(instance);
                classReflect.put(functionName, classPath);
            } catch (Exception e) {
                // Include the offending class path so misconfigured plugins are
                // easy to track down.
                throw new RuntimeException("Failed to load UDF plugin class: " + classPath, e);
            }
        }
        return classReflect;
    }

    /**
     * Evaluates a boolean Aviator filter expression against the given
     * variable map.
     *
     * @param expression compiled Aviator expression expected to yield a Boolean
     * @param map        variable bindings for the evaluation
     * @return the expression result
     * @throws ExpressionRuntimeException if the expression evaluates to null
     */
    public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
        Object object = expression.execute(map);
        if (object == null) {
            throw new ExpressionRuntimeException("Filter expression evaluated to null");
        }
        return (Boolean) object;
    }
}
parameters.put("interval", "600"); udfContext.setParameters(parameters); UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); unixTimestampConverter.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("input", 1577808000000L); + extractedFields.put("input", 1577808090000L); event.setExtractedFields(extractedFields); Event result1 = unixTimestampConverter.evaluate(event); assertEquals(1577808000L, result1.getExtractedFields().get("output")); @@ -46,12 +47,13 @@ public class UnixTimestampConverterTest { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "milliseconds"); + parameters.put("interval", "60000"); udfContext.setParameters(parameters); UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); unixTimestampConverter.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("input", 1577808000L); + extractedFields.put("input", 1577808030L); event.setExtractedFields(extractedFields); Event result1 = unixTimestampConverter.evaluate(event); assertEquals(1577808000000L, result1.getExtractedFields().get("output")); @@ -65,12 +67,13 @@ public class UnixTimestampConverterTest { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "seconds"); + parameters.put("interval", "60"); udfContext.setParameters(parameters); UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); unixTimestampConverter.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("input", 1577808000L); + extractedFields.put("input", 1577808001L); event.setExtractedFields(extractedFields); Event result1 = unixTimestampConverter.evaluate(event); assertEquals(1577808000L, result1.getExtractedFields().get("output")); @@ -84,12 +87,13 @@ public class 
UnixTimestampConverterTest { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "milliseconds"); + parameters.put("interval", "60000"); udfContext.setParameters(parameters); UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); unixTimestampConverter.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("input", 1577808000000L); + extractedFields.put("input", 1577808001100L); event.setExtractedFields(extractedFields); Event result1 = unixTimestampConverter.evaluate(event); assertEquals(1577808000000L, result1.getExtractedFields().get("output")); @@ -102,15 +106,16 @@ public class UnixTimestampConverterTest { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "minutes"); + parameters.put("interval", "5"); udfContext.setParameters(parameters); UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); unixTimestampConverter.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("input", 1577808100000L); + extractedFields.put("input", 1577808101001L); event.setExtractedFields(extractedFields); Event result1 = unixTimestampConverter.evaluate(event); - assertEquals(1577808060L, result1.getExtractedFields().get("output")); + assertEquals(1577808000L, result1.getExtractedFields().get("output")); } |
