summaryrefslogtreecommitdiff
path: root/groot-common
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-29 18:24:42 +0800
committerwangkuan <[email protected]>2024-08-29 18:24:42 +0800
commit0ea9b9d9db5f92e7afd7b86ddad1f8d69d5c0945 (patch)
treeca735cab001f5f3a597d87122cda0c998f3b9426 /groot-common
parent8d90c04d22a5df3ac5a6d4d12fc1b9fee03f38e8 (diff)
[feature][bootstrap][core]增加预聚合功能,相关函数支持merge
Diffstat (limited to 'groot-common')
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java7
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java7
2 files changed, 6 insertions, 8 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
index 0b0379d..af94abf 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -5,8 +5,6 @@ import com.geedgenetworks.common.udf.UDFContext;
import java.util.List;
-import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP;
-
public interface AggregateConfigOptions {
Option<String> TYPE = Options.key("type")
.stringType()
@@ -42,7 +40,10 @@ public interface AggregateConfigOptions {
.intType()
.noDefaultValue()
.withDescription("The size of window.");
-
+ Option<Boolean> MINI_BATCH = Options.key("mini_batch")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("The label of pre_aggrergate.");
Option<Integer> WINDOW_SLIDE = Options.key("window_slide")
.intType()
.noDefaultValue()
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
index 455073f..6e5ab80 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -8,14 +8,11 @@ import java.io.Serializable;
public interface AggregateFunction extends Serializable {
void open(UDFContext udfContext);
-
Accumulator initAccumulator(Accumulator acc);
-
Accumulator add(Event val, Accumulator acc);
-
String functionName();
-
+ default Accumulator getMiddleResult(Accumulator acc){return acc;}
Accumulator getResult(Accumulator acc);
-
+ Accumulator merge(Accumulator a, Accumulator b);
default void close(){};
}