diff options
| author | 李奉超 <[email protected]> | 2024-07-23 08:23:04 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-07-23 08:23:04 +0000 |
| commit | 3cff9a87fa0beab38caff2b34d7344b4186e24e1 (patch) | |
| tree | 413bcbafb3c93796680bd0c6823d7933d49dc374 /groot-common | |
| parent | 0042a6c2c922e3719218ed5d88a5ba47b6a25562 (diff) | |
| parent | 6e558c28ce9f07f58e9adcbbd802b586c6a179da (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[feature][bootstrap][core][common]支持自定义聚合函数,udf接口重命名为scalarFunction,时间戳转换函数支持设置interval
See merge request galaxy/platform/groot-stream!78
Diffstat (limited to 'groot-common')
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java | 16 | ||||
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/Constants.java | 8 | ||||
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/Event.java | 4 | ||||
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java | 34 | ||||
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java | 49 | ||||
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java | 20 | ||||
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java) | 2 | ||||
| -rw-r--r-- | groot-common/src/main/resources/udf.plugins | 5 |
8 files changed, 132 insertions, 6 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java new file mode 100644 index 0000000..403cecc --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java @@ -0,0 +1,16 @@ +package com.geedgenetworks.common; + +import lombok.Data; +import org.apache.flink.metrics.Counter; + +import java.io.Serializable; +import java.util.Map; + +@Data +public class Accumulator implements Serializable { + private Map<String, Object> metricsFields; + private long errorCount; + private long inEvents; + private long outEvents; + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index 231fac5..d13fc4b 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -43,10 +43,10 @@ public final class Constants { public static final String HAZELCAST_UDF_PLUGIN_CONFIG_FILE_PREFIX = "udf"; public static final String HAZELCAST_UDF_PLUGIN_CONFIG_DEFAULT = "udf.plugins"; - - - - + public static final String TUMBLING_PROCESSING_TIME = "tumbling_processing_time"; + public static final String TUMBLING_EVENT_TIME = "tumbling_event_time"; + public static final String SLIDING_PROCESSING_TIME = "sliding_processing_time"; + public static final String SLIDING_EVENT_TIME = "sliding_event_time"; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java index b040b0d..4ab4aef 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java @@ -8,6 +8,10 @@ import java.util.Map; @Data public class Event implements Serializable { public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp"; + public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp"; + public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp"; + + private Map<String, Object> extractedFields; //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing. private boolean isDropped = false; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java new file mode 100644 index 0000000..f1dc38f --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java @@ -0,0 +1,34 @@ +package com.geedgenetworks.common; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +@Data +public class KeybyEntity implements Serializable { + private Map<String,Object> keys; + private String keysToString; + + public KeybyEntity(Map<String, Object> keys) { + this.keys = keys; + } + + @Override + public int hashCode() { + return Objects.hash(keysToString); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KeybyEntity other = (KeybyEntity) o; + return Objects.equals(keysToString, other.keysToString); + } +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java new file mode 100644 index 0000000..3998a3b --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java @@ -0,0 +1,49 @@ +package com.geedgenetworks.common.config; + +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.udf.UDFContext; + +import java.util.List; + +public interface AggregateConfigOptions { + Option<String> TYPE = Options.key("type") + .stringType() + .noDefaultValue() + .withDescription("The type of processor."); + + Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be outputted."); + + Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be removed."); + + Option<List<UDFContext>> FUNCTIONS = Options.key("functions") + .type(new TypeReference<List<UDFContext>>() {}) + .noDefaultValue() + .withDescription("The functions to be executed."); + + Option<List<String>> GROUP_BY_FIELDS = Options.key("group_by_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be key by."); + + Option<String> WINDOW_TYPE = Options.key("window_type") + .stringType() + .noDefaultValue() + .withDescription("The type of window."); + + Option<Integer> WINDOW_SIZE = Options.key("window_size") + .intType() + .noDefaultValue() + .withDescription("The size of window."); + + Option<Integer> WINDOW_SLIDE = Options.key("window_slide") + .intType() + .noDefaultValue() + .withDescription("The size of sliding window."); + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java new file mode 100644 index 0000000..98450fd --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java @@ -0,0 +1,20 @@ +package com.geedgenetworks.common.udf; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; + +import java.io.Serializable; + +public interface AggregateFunction extends Serializable { + + Accumulator open(UDFContext udfContext,Accumulator acc); + + Accumulator add(Event val, Accumulator acc); + + String functionName(); + + Accumulator getResult(Accumulator acc); + + void close(); + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java index eb76263..2aab34b 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java @@ -3,7 +3,7 @@ import com.geedgenetworks.common.Event; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; -public interface UDF extends Serializable { +public interface ScalarFunction extends Serializable { void open(RuntimeContext runtimeContext, UDFContext udfContext); diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 89f8b11..73692a2 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -13,4 +13,7 @@ com.geedgenetworks.core.udf.Rename com.geedgenetworks.core.udf.SnowflakeId com.geedgenetworks.core.udf.StringJoiner com.geedgenetworks.core.udf.UnixTimestampConverter -com.geedgenetworks.core.udf.Flatten
\ No newline at end of file +com.geedgenetworks.core.udf.Flatten +com.geedgenetworks.core.udf.udaf.NumberSum +com.geedgenetworks.core.udf.udaf.CollectList +com.geedgenetworks.core.udf.udaf.CollectSet
\ No newline at end of file |
