summaryrefslogtreecommitdiff
path: root/groot-common
diff options
context:
space:
mode:
author李奉超 <[email protected]>2024-07-23 08:23:04 +0000
committer李奉超 <[email protected]>2024-07-23 08:23:04 +0000
commit3cff9a87fa0beab38caff2b34d7344b4186e24e1 (patch)
tree413bcbafb3c93796680bd0c6823d7933d49dc374 /groot-common
parent0042a6c2c922e3719218ed5d88a5ba47b6a25562 (diff)
parent6e558c28ce9f07f58e9adcbbd802b586c6a179da (diff)
Merge branch 'feature/aggregate' into 'develop'
[feature][bootstrap][core][common]支持自定义聚合函数,udf接口重命名为scalarFunction,时间戳转换函数支持设置interval See merge request galaxy/platform/groot-stream!78
Diffstat (limited to 'groot-common')
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java16
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Constants.java8
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Event.java4
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java34
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java49
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java20
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java)2
-rw-r--r--groot-common/src/main/resources/udf.plugins5
8 files changed, 132 insertions, 6 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java
new file mode 100644
index 0000000..403cecc
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.common;
+
+import lombok.Data;
+import org.apache.flink.metrics.Counter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+public class Accumulator implements Serializable {
+ private Map<String, Object> metricsFields;
+ private long errorCount;
+ private long inEvents;
+ private long outEvents;
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 231fac5..d13fc4b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -43,10 +43,10 @@ public final class Constants {
public static final String HAZELCAST_UDF_PLUGIN_CONFIG_FILE_PREFIX = "udf";
public static final String HAZELCAST_UDF_PLUGIN_CONFIG_DEFAULT = "udf.plugins";
-
-
-
-
+ public static final String TUMBLING_PROCESSING_TIME = "tumbling_processing_time";
+ public static final String TUMBLING_EVENT_TIME = "tumbling_event_time";
+ public static final String SLIDING_PROCESSING_TIME = "sliding_processing_time";
+ public static final String SLIDING_EVENT_TIME = "sliding_event_time";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
index b040b0d..4ab4aef 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
@@ -8,6 +8,10 @@ import java.util.Map;
@Data
public class Event implements Serializable {
public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
+ public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
+ public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";
+
+
private Map<String, Object> extractedFields;
//Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing.
private boolean isDropped = false;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java
new file mode 100644
index 0000000..f1dc38f
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.common;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+@Data
+public class KeybyEntity implements Serializable {
+ private Map<String,Object> keys;
+ private String keysToString;
+
+ public KeybyEntity(Map<String, Object> keys) {
+ this.keys = keys;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keysToString);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KeybyEntity other = (KeybyEntity) o;
+ return Objects.equals(keysToString, other.keysToString);
+ }
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
new file mode 100644
index 0000000..3998a3b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -0,0 +1,49 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.udf.UDFContext;
+
+import java.util.List;
+
+public interface AggregateConfigOptions {
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of processor.");
+
+ Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be outputted.");
+
+ Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be removed.");
+
+ Option<List<UDFContext>> FUNCTIONS = Options.key("functions")
+ .type(new TypeReference<List<UDFContext>>() {})
+ .noDefaultValue()
+ .withDescription("The functions to be executed.");
+
+ Option<List<String>> GROUP_BY_FIELDS = Options.key("group_by_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be key by.");
+
+ Option<String> WINDOW_TYPE = Options.key("window_type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of window.");
+
+ Option<Integer> WINDOW_SIZE = Options.key("window_size")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The size of window.");
+
+ Option<Integer> WINDOW_SLIDE = Options.key("window_slide")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The size of sliding window.");
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
new file mode 100644
index 0000000..98450fd
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -0,0 +1,20 @@
+package com.geedgenetworks.common.udf;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+
+import java.io.Serializable;
+
+public interface AggregateFunction extends Serializable {
+
+ Accumulator open(UDFContext udfContext,Accumulator acc);
+
+ Accumulator add(Event val, Accumulator acc);
+
+ String functionName();
+
+ Accumulator getResult(Accumulator acc);
+
+ void close();
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
index eb76263..2aab34b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDF.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
@@ -3,7 +3,7 @@ import com.geedgenetworks.common.Event;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;
-public interface UDF extends Serializable {
+public interface ScalarFunction extends Serializable {
void open(RuntimeContext runtimeContext, UDFContext udfContext);
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 89f8b11..73692a2 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -13,4 +13,7 @@ com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
com.geedgenetworks.core.udf.UnixTimestampConverter
-com.geedgenetworks.core.udf.Flatten \ No newline at end of file
+com.geedgenetworks.core.udf.Flatten
+com.geedgenetworks.core.udf.udaf.NumberSum
+com.geedgenetworks.core.udf.udaf.CollectList
+com.geedgenetworks.core.udf.udaf.CollectSet \ No newline at end of file