summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李奉超 <[email protected]>2024-08-30 01:34:08 +0000
committer李奉超 <[email protected]>2024-08-30 01:34:08 +0000
commitb17b5a08a5b51180ab8d9f0f210c1294e4f11fe2 (patch)
tree3dd004159328b48c3e3b2a24ef7f632798ccd690
parent9fd0e45c696ea08f92dc2d1c261f7e4be37a5555 (diff)
parent13323b1fe6315cedc5e312fe084fb883442fe066 (diff)
Merge branch 'feature/pre-agg' into 'develop'
Feature/pre agg See merge request galaxy/platform/groot-stream!101
-rw-r--r--config/grootstream_job_example.yaml1
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml51
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java7
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java32
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java191
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java68
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java60
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java59
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java37
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java131
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java55
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java26
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java46
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java48
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java42
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java57
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java5
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java44
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java45
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java41
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java41
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java41
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java53
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java37
-rw-r--r--groot-tests/test-e2e-kafka/pom.xml12
32 files changed, 1091 insertions, 204 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index b77958d..f5c6610 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -34,6 +34,7 @@ processing_pipelines:
window_timestamp_field: recv_time
window_size: 6
window_slide: 10 #滑动窗口步长
+ mini_batch: true
functions:
- function: NUMBER_SUM
lookup_fields: [ sent_pkts ]
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
index 2edc5e7..6ed3888 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -65,8 +65,10 @@ public class JobSplitWithAggTest {
} catch (Exception e) {
throw new JobExecuteException("Job executed error", e);
}
- Assert.assertEquals(1, CollectSink.values.size());
+
+ Assert.assertEquals(2, CollectSink.values.size());
Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString());
}
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
index 872800f..6a7011a 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -3,66 +3,34 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
json.ignore.parse.errors: false
+ watermark_timestamp: recv_time
+ watermark_timestamp_unit: ms
+ watermark_lag: 10
sinks:
collect_sink:
type: collect
properties:
format: json
-splits:
- test_split:
- type: split
- rules:
- - name: aggregate_processor
- expression: event.decoded_as == 'DNS'
postprocessing_pipelines:
- pre_etl_processor: # [object] Processing Pipeline
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
- remove_fields: [fields,tags]
- output_fields:
- functions: # [array of object] Function List
-
- - function: FLATTEN
- lookup_fields: [ fields,tags ]
- output_fields: [ ]
- parameters:
- #prefix: ""
- depth: 3
- # delimiter: "."
-
- - function: RENAME
- lookup_fields: [ '' ]
- output_fields: [ '' ]
- filter:
- parameters:
- # parent_fields: [tags]
- # rename_fields:
- # tags: tags
- rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
-
-
- - function: UNIX_TIMESTAMP_CONVERTER
- lookup_fields: [ timestamp_ms ]
- output_fields: [ recv_time ]
- parameters:
- precision: seconds
- interval: 300
- #
aggregate_processor:
type: aggregate
group_by_fields: [decoded_as]
- window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
window_size: 5
window_timestamp_field: test_time
+ mini_batch: true
functions:
- function: NUMBER_SUM
lookup_fields: [ sessions ]
+ - function: MEAN
+ lookup_fields: [ pkts ]
table_processor:
type: table
@@ -79,9 +47,6 @@ application: # [object] Application Configuration
topology: # [array of object] Node List. It will be used build data flow for job dag graph.
- name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
parallelism: 1 # [number] Operator-Level Parallelism.
- downstream: [test_split]
- - name: test_split
- parallelism: 1
downstream: [ aggregate_processor ]
- name: aggregate_processor
parallelism: 1
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
index 0b0379d..af94abf 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -5,8 +5,6 @@ import com.geedgenetworks.common.udf.UDFContext;
import java.util.List;
-import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP;
-
public interface AggregateConfigOptions {
Option<String> TYPE = Options.key("type")
.stringType()
@@ -42,7 +40,10 @@ public interface AggregateConfigOptions {
.intType()
.noDefaultValue()
.withDescription("The size of window.");
-
+    /** Switches on the local pre-aggregation ("mini batch") step that precedes the window aggregation; defaults to {@code false}. */
+    Option<Boolean> MINI_BATCH = Options.key("mini_batch")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to pre-aggregate events locally (mini batch) before the window aggregation.");
Option<Integer> WINDOW_SLIDE = Options.key("window_slide")
.intType()
.noDefaultValue()
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
index 455073f..6f6e048 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -8,14 +8,10 @@ import java.io.Serializable;
public interface AggregateFunction extends Serializable {
void open(UDFContext udfContext);
-
Accumulator initAccumulator(Accumulator acc);
-
Accumulator add(Event val, Accumulator acc);
-
String functionName();
-
Accumulator getResult(Accumulator acc);
-
+ Accumulator merge(Accumulator a, Accumulator b);
default void close(){};
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index d3cbaac..ebdb0bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -1,15 +1,15 @@
package com.geedgenetworks.core.pojo;
+import com.alibaba.fastjson2.annotation.JSONField;
import com.geedgenetworks.common.udf.UDFContext;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import java.io.Serializable;
import java.util.List;
-import java.util.Map;
+
@EqualsAndHashCode(callSuper = true)
@Data
-public class AggregateConfig extends ProcessorConfig {
+public class AggregateConfig extends ProcessorConfig {
private List<String> group_by_fields;
@@ -18,5 +18,7 @@ public class AggregateConfig extends ProcessorConfig {
private Integer window_size;
private Integer window_slide;
private List<UDFContext> functions;
+ @JSONField(defaultValue = "false" )
+ private Boolean mini_batch;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
index 416a7ea..2508730 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
@@ -13,18 +13,50 @@ public class OnlineStatistics {
aggregate += (delta * (val.doubleValue() - mean));
return this;
}
+
+    /**
+     * Folds the statistics accumulated in {@code other} into this instance.
+     *
+     * <p>Count, mean and the running sum of squared deviations ({@code aggregate}) are combined with
+     * the pairwise update
+     * {@code M2 = M2_a + M2_b + delta^2 * n_a * n_b / (n_a + n_b)}, where
+     * {@code delta = mean_b - mean_a}.
+     *
+     * @param other partial statistics to merge in; it is not modified. A {@code null} or empty
+     *              instance leaves this object unchanged.
+     * @return this instance, now describing the union of both inputs
+     */
+    public OnlineStatistics merge(OnlineStatistics other) {
+        if (other == null || other.n == 0) {
+            return this; // Nothing to merge
+        }
+        if (this.n == 0) {
+            this.n = other.n;
+            this.mean = other.mean;
+            this.aggregate = other.aggregate;
+            return this;
+        }
+
+        long newN = this.n + other.n;
+        double delta = other.mean - this.mean;
+
+        // The cross term has to be weighted by BOTH counts (n_a * n_b / n); weighting it by
+        // this.n alone under-reports the variance whenever other holds more than one sample.
+        // It must be computed before this.n is overwritten below.
+        this.aggregate += other.aggregate + delta * delta * ((double) this.n * other.n / newN);
+        this.mean += delta * (other.n / (double) newN);
+        this.n = newN;
+
+        return this;
+    }
+
//计算总体标准差
public double stddevp() {
return Math.sqrt(variancep());
}
+
//计算总体方差
public double variancep() {
return aggregate / n;
}
+
//计算样本标准差
public double stddev() {
return Math.sqrt(variance());
}
+
//计算样本方差
public double variance() {
return aggregate / (n - 1);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
new file mode 100644
index 0000000..3632ba7
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -0,0 +1,191 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+
+import cn.hutool.crypto.SecureUtil;
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.processor.projection.UdfEntity;
+import com.google.common.collect.Lists;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
+import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+
+/**
+ * Base class of the local pre-aggregation ("mini batch") step that runs ahead of the keyed window.
+ *
+ * <p>Events are folded into one {@link Accumulator} per window end and group key. The accumulators
+ * are kept in a plain in-memory map; {@link #onTimer(long, Collector)} emits and discards all
+ * accumulators of one window. Subclasses implement {@code processElement} and decide when a window
+ * is flushed.
+ *
+ * <p>NOTE(review): the buffered accumulators live in an ordinary field rather than in Flink managed
+ * state - confirm that losing them on failure/restart is acceptable.
+ */
+@Slf4j
+public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator> {
+
+    // NOTE(review): callers pass the configured window size/slide here while the subclasses feed
+    // TimerService#currentProcessingTime() into assignWindowEnd - confirm both use the same unit.
+    private final long windowSize;
+    private Long staggerOffset = null;
+
+    /** Open windows: window end -> (group key -> partial aggregate). */
+    protected final Map<Long, Map<String, Accumulator>> windows = new HashMap<>();
+    protected List<String> groupByFields;
+    private LinkedList<UdfEntity> functions;
+
+    protected InternalMetrics internalMetrics;
+    private final AggregateConfig aggregateConfig;
+
+    /**
+     * @param aggregateConfig aggregate processor configuration (group-by fields and functions)
+     * @param windowSize      length of one pre-aggregation window; must be positive
+     * @throws IllegalArgumentException if {@code windowSize} is not positive
+     */
+    public AbstractFirstAggregation(AggregateConfig aggregateConfig, long windowSize) {
+        // Fail fast: a non-positive size would otherwise only surface as an ArithmeticException
+        // (modulo by zero) or nonsensical window bounds for every single event.
+        if (windowSize <= 0) {
+            throw new IllegalArgumentException("windowSize must be positive, but was " + windowSize);
+        }
+        this.windowSize = windowSize;
+        this.aggregateConfig = aggregateConfig;
+    }
+
+    /**
+     * Creates the metrics handle and instantiates, configures and opens every configured UDAF.
+     *
+     * @throws GrootStreamRuntimeException if a configured function is not registered or any part of
+     *                                     the initialization fails
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        functions = Lists.newLinkedList();
+        try {
+            this.internalMetrics = new InternalMetrics(getRuntimeContext());
+            List<UDFContext> udfContexts = aggregateConfig.getFunctions();
+            if (udfContexts == null || udfContexts.isEmpty()) {
+                // NOTE(review): groupByFields stays null on this path, so getKey() will fail for
+                // every event - confirm whether an aggregate without functions should be rejected.
+                return;
+            }
+            Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+            List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+
+            groupByFields = aggregateConfig.getGroup_by_fields();
+            Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
+
+            for (UDFContext udfContext : udfContexts) {
+                // Only functions registered on the platform can be instantiated.
+                if (!udfClassReflect.containsKey(udfContext.getFunction())) {
+                    throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "Unsupported UDAF: " + udfContext.getFunction());
+                }
+                String className = udfClassReflect.get(udfContext.getFunction());
+                Class<?> cls = Class.forName(className);
+                AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
+
+                // Compile the optional per-function filter expression once, up front.
+                Expression filterExpression = null;
+                if (udfContext.getFilter() != null) {
+                    AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+                    instance.setCachedExpressionByDefault(true);
+                    instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+                    instance.setFunctionMissing(null);
+                    filterExpression = instance.compile(udfContext.getFilter(), true);
+                }
+
+                UdfEntity udfEntity = new UdfEntity();
+                udfEntity.setAggregateFunction(aggregateFunction);
+                udfEntity.setFilterExpression(filterExpression);
+                udfEntity.setName(udfContext.getFunction());
+                udfEntity.setClassName(className);
+                udfEntity.setUdfContext(udfContext);
+                functions.add(udfEntity);
+            }
+            for (UdfEntity udfEntity : functions) {
+                udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
+            }
+
+        } catch (Exception e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
+        }
+    }
+
+    /** Intentionally empty; the subclasses provide the actual element handling. */
+    @Override
+    public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception {
+
+    }
+
+    /**
+     * Emits every accumulator of the window ending at {@code timestamp} and drops that window.
+     * Does nothing when no such window is buffered.
+     *
+     * @param timestamp window end used as key in {@link #windows}
+     * @param out       collector receiving the partial aggregates
+     */
+    public void onTimer(long timestamp, Collector<Accumulator> out) throws Exception {
+        Map<String, Accumulator> accumulator = windows.remove(timestamp);
+        if (accumulator == null) {
+            return;
+        }
+        for (Accumulator value : accumulator.values()) {
+            out.collect(getResult(value));
+            internalMetrics.incrementOutEvents();
+        }
+        accumulator.clear();
+    }
+
+    private long assignWindowStart(long timestamp, long offset) {
+        return timestamp - (timestamp - offset + windowSize) % windowSize;
+    }
+
+    /**
+     * Returns the end of the window that {@code timestamp} falls into. Window boundaries are
+     * shifted by a random offset that is drawn once per instance on first use.
+     */
+    protected long assignWindowEnd(long timestamp) {
+        if (staggerOffset == null) {
+            staggerOffset = getStaggerOffset();
+        }
+        return assignWindowStart(timestamp, (staggerOffset % windowSize)) + windowSize;
+    }
+
+    private long getStaggerOffset() {
+        return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize);
+    }
+
+    /** Creates an accumulator with an empty metrics map and lets every function initialize it. */
+    public Accumulator createAccumulator() {
+        Map<String, Object> map = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+
+        accumulator.setMetricsFields(map);
+        for (UdfEntity udfEntity : functions) {
+            udfEntity.getAggregateFunction().initAccumulator(accumulator);
+        }
+        return accumulator;
+    }
+
+    /**
+     * Builds the group key of an event: the MD5 of the concatenated string values of {@code keys}.
+     * A field that is absent, or present with a {@code null} value, contributes a single ",".
+     *
+     * <p>NOTE(review): the values are concatenated without a separator, so different field
+     * combinations can yield the same key (e.g. "ab"+"c" and "a"+"bc") - confirm this is acceptable.
+     */
+    public String getKey(Event value, List<String> keys) {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (String key : keys) {
+            // A single lookup also covers "key present but mapped to null", which used to throw.
+            Object fieldValue = value.getExtractedFields().get(key);
+            if (fieldValue != null) {
+                stringBuilder.append(fieldValue.toString());
+            } else {
+                stringBuilder.append(",");
+            }
+        }
+        return SecureUtil.md5(stringBuilder.toString());
+    }
+
+    /**
+     * Feeds {@code event} to every function whose filter is absent or evaluates to true. A failing
+     * function is logged and counted in the accumulator's error count; the remaining functions
+     * still run.
+     *
+     * @return the same {@code accumulator} instance
+     */
+    public Accumulator add(Event event, Accumulator accumulator) {
+        accumulator.setInEvents(accumulator.getInEvents() + 1);
+        for (UdfEntity udafEntity : functions) {
+            try {
+                Expression filter = udafEntity.getFilterExpression();
+                boolean accepted = filter == null
+                        || filterExecute(filter, filter.newEnv("event", event.getExtractedFields()));
+                if (accepted) {
+                    udafEntity.getAggregateFunction().add(event, accumulator);
+                }
+            } catch (ExpressionRuntimeException ignore) {
+                log.error("Function {} Invalid filter ! ", udafEntity.getName());
+                accumulator.setErrorCount(accumulator.getErrorCount() + 1);
+            } catch (Exception e) {
+                log.error("Function {} execute exception !", udafEntity.getName(), e);
+                accumulator.setErrorCount(accumulator.getErrorCount() + 1);
+            }
+        }
+        return accumulator;
+    }
+
+    /** Returns the accumulator unchanged; the partial aggregate is forwarded as is. */
+    public Accumulator getResult(Accumulator accumulator) {
+        return accumulator;
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index cf78310..c261fb6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -12,42 +12,83 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.OutputTag;
import static com.geedgenetworks.common.Constants.*;
-public class AggregateProcessorImpl implements AggregateProcessor {
+public class AggregateProcessorImpl implements AggregateProcessor {
@Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
- if (aggregateConfig.getParallelism() != 0) {
+ SingleOutputStreamOperator<Event> singleOutputStreamOperator;
+ if (aggregateConfig.getMini_batch()) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case TUMBLING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
+
}
- }else {
+ } else {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case TUMBLING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}
+ if (aggregateConfig.getParallelism() != 0) {
+ singleOutputStreamOperator.setParallelism(aggregateConfig.getParallelism());
+ }
+ return singleOutputStreamOperator.name(aggregateConfig.getName());
}
@@ -55,4 +96,5 @@ public class AggregateProcessorImpl implements AggregateProcessor {
public String type() {
return "aggregate";
}
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
new file mode 100644
index 0000000..9390de4
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
@@ -0,0 +1,60 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Pre-aggregation step used for the event-time window types.
+ *
+ * <p>Each element is added to the accumulator of its window and group key. Windows are flushed
+ * lazily: before an element is added, every buffered window whose end has been reached is emitted.
+ * No flush happens while no elements arrive.
+ *
+ * <p>NOTE(review): despite the class name, both window assignment and flushing read
+ * {@code currentProcessingTime()} - confirm that the processing-time clock is intended here.
+ */
+@Slf4j
+public class FirstAggregationEventTime extends AbstractFirstAggregation {
+
+    /** Ends of the currently buffered windows, smallest first. */
+    private final PriorityQueue<Long> eventTimeTimersQueue = new PriorityQueue<>();
+
+    public FirstAggregationEventTime(AggregateConfig aggregateConfig, long windowSize) {
+        super(aggregateConfig, windowSize);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+    }
+
+    /**
+     * Flushes all due windows, then folds {@code value} into the accumulator of its window and key.
+     * Any failure is logged and counted as an error event; it is not rethrown.
+     */
+    @Override
+    public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception {
+        internalMetrics.incrementInEvents();
+        try {
+            final String groupKey = getKey(value, groupByFields);
+
+            // Emit every buffered window whose end is not after the current clock reading.
+            for (Long due = eventTimeTimersQueue.peek();
+                 due != null && due <= ctx.timerService().currentProcessingTime();
+                 due = eventTimeTimersQueue.peek()) {
+                eventTimeTimersQueue.poll();
+                onTimer(due, out);
+            }
+
+            final long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime());
+            Map<String, Accumulator> bucket = windows.get(windowEnd);
+            if (bucket == null) {
+                // First element of this window: register the window only once it has an accumulator.
+                bucket = new HashMap<>();
+                bucket.put(groupKey, createAccumulator());
+                windows.put(windowEnd, bucket);
+                eventTimeTimersQueue.add(windowEnd);
+            } else if (!bucket.containsKey(groupKey)) {
+                bucket.put(groupKey, createAccumulator());
+            }
+            add(value, bucket.get(groupKey));
+        } catch (Exception e) {
+            log.error("Error in pre-aggregate processElement", e);
+            internalMetrics.incrementErrorEvents();
+        }
+    }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
new file mode 100644
index 0000000..e98daa5
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
@@ -0,0 +1,59 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Pre-aggregation step driven by processing time: each incoming {@link Event} is
+ * folded into a per-window, per-key {@link Accumulator}.
+ *
+ * <p>Window ends that still have to be handed to {@code onTimer} are kept in a
+ * min-heap and drained lazily at the start of every {@link #processElement} call.
+ */
+@Slf4j
+public class FirstAggregationProcessingTime extends AbstractFirstAggregation {
+
+    /** Window ends waiting to be fired, smallest first. */
+    private final PriorityQueue<Long> processingTimeTimersQueue = new PriorityQueue<>();
+
+    public FirstAggregationProcessingTime(AggregateConfig aggregateConfig, long windowSize) {
+        super(aggregateConfig, windowSize);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+    }
+
+    /**
+     * Fires every window whose end is not after the current processing time, then
+     * adds {@code value} to the accumulator of its own window and key.
+     *
+     * <p>Any exception raised while handling the element is logged and counted
+     * as an error event; it is not propagated.
+     */
+    @Override
+    public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception {
+        internalMetrics.incrementInEvents();
+        try {
+            String key = getKey(value, groupByFields);
+
+            // Drain all windows that are already due before touching the current one.
+            for (Long due = processingTimeTimersQueue.peek();
+                 due != null && due <= ctx.timerService().currentProcessingTime();
+                 due = processingTimeTimersQueue.peek()) {
+                processingTimeTimersQueue.poll();
+                onTimer(due, out);
+            }
+
+            long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime());
+            boolean windowExists = windows.containsKey(windowEnd);
+            if (!windowExists) {
+                // First element of this window: create it and remember its end for firing.
+                Map<String, Accumulator> fresh = new HashMap<>();
+                fresh.put(key, createAccumulator());
+                windows.put(windowEnd, fresh);
+                processingTimeTimersQueue.add(windowEnd);
+            } else if (!windows.get(windowEnd).containsKey(key)) {
+                windows.get(windowEnd).put(key, createAccumulator());
+            }
+            Accumulator target = windows.get(windowEnd).get(key);
+            add(value, target);
+        } catch (Exception e) {
+            log.error("Error in pre-aggregate processElement", e);
+            internalMetrics.incrementErrorEvents();
+        }
+    }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
index 165ed1b..da09690 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.processor.aggregate;
+import cn.hutool.crypto.SecureUtil;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.KeybyEntity;
@@ -20,16 +21,17 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec
KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
- for(String key: keys){
+ for (String key : keys) {
- if(value.getExtractedFields().containsKey(key)){
- keybyEntity.getKeys().put(key,value.getExtractedFields().get(key));
+ if (value.getExtractedFields().containsKey(key)) {
+ keybyEntity.getKeys().put(key, value.getExtractedFields().get(key));
stringBuilder.append(value.getExtractedFields().get(key).toString());
- }else {
+ } else {
stringBuilder.append(",");
}
}
- keybyEntity.setKeysToString(stringBuilder.toString());
- return keybyEntity;
+ String hashedKey = SecureUtil.md5(stringBuilder.toString());
+ keybyEntity.setKeysToString(hashedKey);
+ return keybyEntity;
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
new file mode 100644
index 0000000..6e43184
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
@@ -0,0 +1,37 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+import cn.hutool.crypto.SecureUtil;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.KeybyEntity;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Key selector for pre-aggregated {@link Accumulator}s: builds a {@link KeybyEntity}
+ * from the configured group-by fields found in the accumulator's metrics fields.
+ *
+ * <p>The entity carries the raw key values plus an MD5 digest of their serialized
+ * form. Each present value is serialized as {@code <length>:<text>} and each missing
+ * field as {@code ","}, so two different key tuples can never serialize to the same
+ * string (plain concatenation would map ("1","23") and ("12","3") to the same digest).
+ */
+public class PreKeySelector implements org.apache.flink.api.java.functions.KeySelector<Accumulator, KeybyEntity> {
+
+    /** Names of the group-by fields, in key order. */
+    private final List<String> keys;
+
+    public PreKeySelector(List<String> keys) {
+        this.keys = keys;
+    }
+
+    /**
+     * Extracts the grouping key of an accumulator.
+     *
+     * @param value pre-aggregated accumulator whose metrics fields hold the key values
+     * @return entity holding the present key values and the MD5 of their serialized form
+     */
+    @Override
+    public KeybyEntity getKey(Accumulator value) throws Exception {
+        KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
+        StringBuilder serialized = new StringBuilder();
+        for (String key : keys) {
+            if (value.getMetricsFields().containsKey(key)) {
+                Object fieldValue = value.getMetricsFields().get(key);
+                keybyEntity.getKeys().put(key, fieldValue);
+                // String.valueOf tolerates a present-but-null value instead of throwing NPE.
+                String text = String.valueOf(fieldValue);
+                // Length prefix keeps adjacent values from running into each other.
+                serialized.append(text.length()).append(':').append(text);
+            } else {
+                serialized.append(",");
+            }
+        }
+        keybyEntity.setKeysToString(SecureUtil.md5(serialized.toString()));
+        return keybyEntity;
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
new file mode 100644
index 0000000..7c0a434
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -0,0 +1,131 @@
+package com.geedgenetworks.core.processor.aggregate;
+
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.processor.projection.UdfEntity;
+import com.google.common.collect.Lists;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.ExecutionConfig;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
+import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+
+/**
+ * Second-stage aggregate function: combines accumulators that were already
+ * pre-aggregated upstream by delegating to each configured UDAF's {@code merge}.
+ *
+ * <p>The configured UDAFs are resolved by name against the plugin list found in the
+ * job's global parameters, instantiated reflectively and opened in the constructor.
+ */
+@Slf4j
+public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> {
+    private final List<UDFContext> udfContexts;
+    private final List<String> udfClassNameLists;
+    private final LinkedList<UdfEntity> functions;
+
+    /**
+     * @param aggregateConfig aggregate configuration supplying the UDAF definitions
+     * @param config          execution config whose global job parameters hold the UDF plugin list
+     * @throws IllegalArgumentException    if no aggregate function is configured
+     * @throws GrootStreamRuntimeException if a UDAF is unknown or cannot be instantiated/opened
+     */
+    public SecondAggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
+        udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+        udfContexts = aggregateConfig.getFunctions();
+        if (udfContexts == null || udfContexts.isEmpty()) {
+            // Was a bare RuntimeException with no message; say what is actually wrong.
+            throw new IllegalArgumentException("Aggregate processor requires at least one aggregate function");
+        }
+        functions = Lists.newLinkedList();
+        Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
+        try {
+            for (UDFContext udfContext : udfContexts) {
+                Expression filterExpression = null;
+                UdfEntity udfEntity = new UdfEntity();
+                // Instantiate the function only if it is among the platform-registered functions.
+                if (udfClassReflect.containsKey(udfContext.getFunction())) {
+                    Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
+                    AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
+                    // If the function declares a filter, compile the expression up front.
+                    if (udfContext.getFilter() != null) {
+                        AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+                        instance.setCachedExpressionByDefault(true);
+                        instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+                        instance.setFunctionMissing(null);
+                        filterExpression = instance.compile(udfContext.getFilter(), true);
+                    }
+                    udfEntity.setAggregateFunction(aggregateFunction);
+                    udfEntity.setFilterExpression(filterExpression);
+                    udfEntity.setName(udfContext.getFunction());
+                    udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+                    udfEntity.setUdfContext(udfContext);
+                    functions.add(udfEntity);
+                } else {
+                    throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "Unsupported UDAF: " + udfContext.getFunction());
+                }
+
+            }
+            for (UdfEntity udfEntity : functions) {
+                udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
+            }
+        } catch (Exception e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
+
+        }
+    }
+
+    /** Creates an empty accumulator and lets every UDAF initialise its own slot in it. */
+    @Override
+    public Accumulator createAccumulator() {
+        Map<String, Object> map = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(map);
+        for (UdfEntity udfEntity : functions) {
+            udfEntity.getAggregateFunction().initAccumulator(accumulator);
+        }
+        return accumulator;
+    }
+
+    /**
+     * Combines an incoming pre-aggregated accumulator with the running one.
+     *
+     * <p>NOTE(review): {@link #merge} folds into its first argument, so the incoming
+     * value is the object that is mutated and returned here, and order-sensitive
+     * UDAF merges see the incoming value as "first" and the running state as
+     * "second" - confirm this operand order is intended.
+     */
+    @Override
+    public Accumulator add(Accumulator event, Accumulator accumulator) {
+        return merge(event, accumulator);
+    }
+
+    /**
+     * Lets every UDAF finalise its result inside {@code accumulator}; a failing
+     * UDAF is logged and skipped so the remaining ones still run.
+     */
+    @Override
+    public Accumulator getResult(Accumulator accumulator) {
+        for (UdfEntity udafEntity : functions) {
+            try {
+                udafEntity.getAggregateFunction().getResult(accumulator);
+            } catch (Exception e) {
+                log.error("Function " + udafEntity.getName() + " getResult exception !", e);
+            }
+        }
+        return accumulator;
+    }
+
+    /**
+     * Merges {@code acc2} into {@code acc1}: sums the event/error counters, then
+     * applies each UDAF's merge when its optional filter (evaluated against
+     * {@code acc1}'s metrics fields) passes. A failing UDAF increments
+     * {@code acc1}'s error count instead of aborting the merge.
+     *
+     * @return {@code acc1}, mutated in place
+     */
+    @Override
+    public Accumulator merge(Accumulator acc1, Accumulator acc2) {
+        acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents());
+        acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents());
+        acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount());
+        for (UdfEntity udafEntity : functions) {
+            try {
+                Expression filter = udafEntity.getFilterExpression();
+                boolean accepted = filter == null || filterExecute(filter, filter.newEnv("event", acc1.getMetricsFields()));
+                if (accepted) {
+                    udafEntity.getAggregateFunction().merge(acc1, acc2);
+                }
+            } catch (ExpressionRuntimeException e) {
+                // Keep the cause in the log; it was previously dropped.
+                log.error("Function " + udafEntity.getName() + " Invalid filter ! ", e);
+                acc1.setErrorCount(acc1.getErrorCount() + 1);
+            } catch (Exception e) {
+                log.error("Function " + udafEntity.getName() + " execute exception !", e);
+                acc1.setErrorCount(acc1.getErrorCount() + 1);
+            }
+        }
+        return acc1;
+    }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 423eff9..3921ee2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -22,9 +22,9 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.processor.projection.UdfEntity;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* Collects elements within a group and returns the list of aggregated objects
@@ -36,18 +36,18 @@ public class CollectList implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new ArrayList<>());
@@ -56,7 +56,7 @@ public class CollectList implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
Object object = event.getExtractedFields().get(lookupField);
List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField);
aggregate.add(object);
@@ -75,4 +75,17 @@ public class CollectList implements AggregateFunction {
return acc;
}
+ /**
+  * Appends the list collected in {@code secondAcc} to the one in {@code firstAcc}.
+  *
+  * <p>When {@code firstAcc} has no list yet it receives a copy of
+  * {@code secondAcc}'s list, so later additions to one accumulator cannot
+  * leak into the other.
+  *
+  * @return {@code firstAcc}, mutated in place
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+     if (!secondAcc.getMetricsFields().containsKey(outputField)) {
+         // Nothing to contribute.
+         return firstAcc;
+     }
+     List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField);
+     if (firstAcc.getMetricsFields().containsKey(outputField)) {
+         List<Object> firstValue = (List<Object>) firstAcc.getMetricsFields().get(outputField);
+         firstValue.addAll(secondValue);
+     } else {
+         // Copy instead of sharing the reference with secondAcc.
+         firstAcc.getMetricsFields().put(outputField, new ArrayList<>(secondValue));
+     }
+     return firstAcc;
+ }
+
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index b4dfb14..9ec9b09 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -8,7 +8,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -23,17 +22,17 @@ public class CollectSet implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new HashSet<>());
@@ -42,7 +41,7 @@ public class CollectSet implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
Object object = event.getExtractedFields().get(lookupField);
Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField);
aggregate.add(object);
@@ -61,5 +60,16 @@ public class CollectSet implements AggregateFunction {
return acc;
}
-
+ /**
+  * Adds the set collected in {@code secondAcc} to the one in {@code firstAcc}.
+  *
+  * <p>When {@code firstAcc} has no set yet it receives a copy of
+  * {@code secondAcc}'s set, so later additions to one accumulator cannot
+  * leak into the other.
+  *
+  * @return {@code firstAcc}, mutated in place
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+     if (!secondAcc.getMetricsFields().containsKey(outputField)) {
+         // Nothing to contribute.
+         return firstAcc;
+     }
+     Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField);
+     if (firstAcc.getMetricsFields().containsKey(outputField)) {
+         Set<Object> firstValue = (Set<Object>) firstAcc.getMetricsFields().get(outputField);
+         firstValue.addAll(secondValue);
+     } else {
+         // Copy instead of sharing the reference with secondAcc.
+         firstAcc.getMetricsFields().put(outputField, new HashSet<>(secondValue));
+     }
+     return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
index 6301a01..a1a35be 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -23,8 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
-
/**
 * Keeps the first value observed for the lookup field within a group
*/
@@ -36,14 +34,13 @@ public class FirstValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
@@ -55,7 +52,7 @@ public class FirstValue implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){
+ if (!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)) {
acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
}
return acc;
@@ -71,4 +68,11 @@ public class FirstValue implements AggregateFunction {
return acc;
}
+ /**
+  * Keeps {@code firstAcc}'s value when it already has one; otherwise adopts the
+  * value held by {@code secondAcc}, if any.
+  *
+  * @return {@code firstAcc}, mutated in place
+  */
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+     boolean alreadySet = firstAcc.getMetricsFields().containsKey(outputField);
+     if (alreadySet || !secondAcc.getMetricsFields().containsKey(outputField)) {
+         return firstAcc;
+     }
+     firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+     return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
index 1648fa5..6af0be3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
@@ -95,7 +95,10 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
his.merge(h);
}
-
+ /**
+  * Accumulator-level merge hook.
+  *
+  * <p>Returns {@code a} rather than {@code null} so callers that use the result
+  * do not hit a null accumulator.
+  *
+  * <p>TODO: histogram state held by {@code b} is not folded into {@code a} here;
+  * a real merge still has to be implemented.
+  *
+  * @return {@code a}, unchanged
+  */
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+     return a;
+ }
@Override
public void close() {}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
index f27a2e6..44b374e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -23,9 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
-import java.util.List;
-
/**
 * Keeps the most recent value observed for the lookup field within a group
*/
@@ -37,17 +34,17 @@ public class LastValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
return acc;
@@ -55,7 +52,7 @@ public class LastValue implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
}
return acc;
@@ -71,4 +68,11 @@ public class LastValue implements AggregateFunction {
return acc;
}
+ /**
+  * Overwrites {@code firstAcc}'s value with the one held by {@code secondAcc}
+  * whenever {@code secondAcc} has an entry for the output field.
+  *
+  * @return {@code firstAcc}, mutated in place
+  */
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+     if (!secondAcc.getMetricsFields().containsKey(outputField)) {
+         return firstAcc;
+     }
+     Object incoming = secondAcc.getMetricsFields().get(outputField);
+     firstAcc.getMetricsFields().put(outputField, incoming);
+     return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
index ea33271..05de38c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
@@ -13,20 +13,22 @@ public class LongCount implements AggregateFunction {
@Override
- public void open(UDFContext udfContext){
- if(udfContext.getOutput_fields()==null ){
+ public void open(UDFContext udfContext) {
+ if (udfContext.getOutput_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
outputField = udfContext.getOutput_fields().get(0);
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
+
@Override
public Accumulator add(Event event, Accumulator acc) {
- acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L);
+ acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long) v + 1L);
return acc;
}
@@ -40,5 +42,18 @@ public class LongCount implements AggregateFunction {
return acc;
}
-
+ /**
+  * Adds the count held by {@code secondAcc} to the count in {@code firstAcc}.
+  *
+  * <p>Both sides are read as {@link Number} and the result is always stored as a
+  * {@code Long}, so a non-{@code Long} numeric value does not raise a
+  * {@code ClassCastException} and the stored type matches what {@code add} writes.
+  * A missing count on either side is treated as zero.
+  *
+  * @return {@code firstAcc}, mutated in place
+  */
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+     Object incoming = secondAcc.getMetricsFields().get(outputField);
+     if (incoming == null) {
+         // secondAcc has no count for this field: nothing to add.
+         return firstAcc;
+     }
+     Object current = firstAcc.getMetricsFields().get(outputField);
+     long firstValue = current == null ? 0L : ((Number) current).longValue();
+     long secondValue = ((Number) incoming).longValue();
+     firstAcc.getMetricsFields().put(outputField, firstValue + secondValue);
+     return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
index 2a615ef..9c4e070 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
@@ -16,20 +16,20 @@ public class Mean implements AggregateFunction {
private String outputField;
private Integer precision;
private DecimalFormat df;
+
@Override
- public void open(UDFContext udfContext){
+ public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
- if(udfContext.getParameters()!= null && !udfContext.getParameters().isEmpty()) {
+ if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) {
precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString());
if (precision > 0) {
StringBuilder pattern = new StringBuilder("#.");
@@ -38,14 +38,15 @@ public class Mean implements AggregateFunction {
}
df = new DecimalFormat(pattern.toString());
}
- }else {
+ } else {
precision = -1;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
- acc.getMetricsFields().put(outputField,new OnlineStatistics());
+ acc.getMetricsFields().put(outputField, new OnlineStatistics());
return acc;
}
@@ -67,16 +68,27 @@ public class Mean implements AggregateFunction {
@Override
public Accumulator getResult(Accumulator acc) {
OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField);
- if(precision<0){
+ if (precision < 0) {
acc.getMetricsFields().put(outputField, aggregate.mean());
- }
- else if(precision>0){
+ } else if (precision > 0) {
acc.getMetricsFields().put(outputField, df.format(aggregate.mean()));
- }
- else {
- acc.getMetricsFields().put(outputField,(long)aggregate.mean());
+ } else {
+ acc.getMetricsFields().put(outputField, (long) aggregate.mean());
}
return acc;
}
+ /**
+  * Merges the running statistics held by {@code secondAcc} into {@code firstAcc}.
+  *
+  * <p>Only the statistics stored under the output field are touched. The
+  * accumulator-level counters (in/out/error events) are deliberately left alone:
+  * the second-stage aggregate function sums them itself before calling the UDAF
+  * merges, so adding them here as well counted {@code secondAcc} twice.
+  *
+  * @return {@code firstAcc}, mutated in place
+  */
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+     OnlineStatistics incoming = (OnlineStatistics) secondAcc.getMetricsFields().get(outputField);
+     if (incoming == null) {
+         // Nothing to contribute.
+         return firstAcc;
+     }
+     OnlineStatistics target = (OnlineStatistics) firstAcc.getMetricsFields().get(outputField);
+     if (target == null) {
+         // Start from fresh statistics rather than sharing secondAcc's instance.
+         target = new OnlineStatistics();
+         firstAcc.getMetricsFields().put(outputField, target);
+     }
+     target.merge(incoming);
+     return firstAcc;
+ }
+
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
index 01e9a5b..e972133 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -6,7 +6,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.pojo.OnlineStatistics;
public class NumberSum implements AggregateFunction {
@@ -15,15 +14,14 @@ public class NumberSum implements AggregateFunction {
@Override
- public void open(UDFContext udfContext){
- if(udfContext.getLookup_fields()==null ){
+ public void open(UDFContext udfContext) {
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
@@ -32,22 +30,23 @@ public class NumberSum implements AggregateFunction {
public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
+
+ /**
+  * Adds the event's lookup-field value to the running sum stored under the output field.
+  * Events that do not carry the lookup field leave the accumulator untouched.
+  * The sum stays a Long for Integer/Long inputs and widens to Float or Double when
+  * either operand is floating point.
+  *
+  * @param event incoming event
+  * @param acc   accumulator holding the running sum
+  * @return the same accumulator instance
+  */
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
- Number val = (Number) event.getExtractedFields().get(lookupField);
- Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L);
- if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) {
- aggregate = aggregate.longValue() + val.longValue();
- } else if (aggregate instanceof Float || val instanceof Float) {
- aggregate = aggregate.floatValue() + val.floatValue();
- } else if (aggregate instanceof Double || val instanceof Double) {
- aggregate = aggregate.doubleValue() + val.doubleValue();
- }
- acc.getMetricsFields().put(outputField, aggregate);
+ if (event.getExtractedFields().containsKey(lookupField)) {
+ Number val = (Number) event.getExtractedFields().get(lookupField);
+ Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L);
+ if (aggregate instanceof Long && (val instanceof Integer || val instanceof Long)) {
+ aggregate = aggregate.longValue() + val.longValue();
+ } else if (aggregate instanceof Double || val instanceof Double) {
+ // Double is tested before Float so a Double/Float mix widens to double instead of being narrowed to float.
+ aggregate = aggregate.doubleValue() + val.doubleValue();
+ } else if (aggregate instanceof Float || val instanceof Float) {
+ aggregate = aggregate.floatValue() + val.floatValue();
}
- return acc;
+ acc.getMetricsFields().put(outputField, aggregate);
+ }
+ return acc;
}
@Override
@@ -65,4 +64,24 @@ public class NumberSum implements AggregateFunction {
}
+ /**
+  * Folds the partial sum of {@code secondAcc} into {@code firstAcc}.
+  * If only one side holds a value for the output field, that value is the result.
+  * When both do, the values are added with the same widening rules as {@code add}:
+  * Long for integral operands, otherwise Double if either side is a Double, otherwise Float.
+  *
+  * @param firstAcc  accumulator that receives the merged sum
+  * @param secondAcc accumulator whose sum is merged in; it is not modified
+  * @return {@code firstAcc}
+  */
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+     Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField);
+     if (secondValue == null) {
+         // Nothing to merge in (field absent or null on the second side).
+         return firstAcc;
+     }
+     Number firstValue = (Number) firstAcc.getMetricsFields().get(outputField);
+     if (firstValue == null) {
+         firstAcc.getMetricsFields().put(outputField, secondValue);
+         return firstAcc;
+     }
+     Number merged = firstValue;
+     if (firstValue instanceof Long && (secondValue instanceof Integer || secondValue instanceof Long)) {
+         merged = firstValue.longValue() + secondValue.longValue();
+     } else if (firstValue instanceof Double || secondValue instanceof Double) {
+         // Double is tested before Float so a Double/Float mix is not narrowed to float.
+         merged = firstValue.doubleValue() + secondValue.doubleValue();
+     } else if (firstValue instanceof Float || secondValue instanceof Float) {
+         merged = firstValue.floatValue() + secondValue.floatValue();
+     }
+     firstAcc.getMetricsFields().put(outputField, merged);
+     return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
index ec003f8..041bad9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
@@ -16,7 +16,10 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate {
return acc;
}
-
+ /**
+  * Merge hook for partial accumulators. Not implemented in this change: it ignores both
+  * arguments and always returns {@code null}.
+  * NOTE(review): any caller that uses the returned accumulator will see null — confirm this
+  * function is never used on a merging (pre-aggregation) path, or implement the merge.
+  * NOTE(review): HlldBaseAggregate gains an identical stub in this commit, so this override
+  * looks redundant — confirm.
+  */
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public String functionName() {
return "APPROX_COUNT_DISTINCT_HLLD";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
index 71d61dc..d6c3a44 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
@@ -92,7 +92,10 @@ public abstract class HlldBaseAggregate implements AggregateFunction {
Hll hll = HllUtils.deserializeHll(value);
hllUnion.update(hll);
}
-
+ /**
+  * Merge hook for partial accumulators. Not implemented in this change: it ignores both
+  * arguments and always returns {@code null}.
+  * NOTE(review): any caller that uses the returned accumulator will see null — confirm
+  * HLL-based functions are never used on a merging (pre-aggregation) path, or implement the merge.
+  */
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public void close() {}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
index b0d846b..2bf13a5 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -36,11 +36,51 @@ public class CollectListTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
- private static void excute(List<String> arr) throws ParseException {
+ /**
+  * Builds one partial accumulator per input list, merges them and checks the merged list:
+  * all eight values are kept (duplicates included) and the sixth entry is the second
+  * value of {@code arr2}.
+  */
+ private void testMerge(List<String> arr,List<String> arr2) {
+     UDFContext udfContext = new UDFContext();
+     udfContext.setLookup_fields(List.of("field"));
+     udfContext.setOutput_fields(Collections.singletonList("field_list"));
+     CollectList collectList = new CollectList();
+     collectList.open(udfContext);
+     Accumulator result1 = getMiddleResult(udfContext, arr);
+     Accumulator result2 = getMiddleResult(udfContext, arr2);
+     Accumulator result = collectList.getResult(collectList.merge(result1, result2));
+     List<String> vals = (List<String>) result.getMetricsFields().get("field_list");
+     // assertEquals takes (expected, actual); the other order produces misleading failure messages.
+     assertEquals(8, vals.size());
+     assertEquals("192.168.1.6", vals.get(5));
+ }
+ /**
+  * Feeds every value of {@code arr} through a fresh CollectList instance and returns
+  * the resulting (not yet finalized) accumulator.
+  */
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+     CollectList function = new CollectList();
+     function.open(udfContext);
+
+     Accumulator seed = new Accumulator();
+     seed.setMetricsFields(new HashMap<>());
+     Accumulator partial = function.initAccumulator(seed);
+
+     for (String value : arr) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put("field", value);
+         Event event = new Event();
+         event.setExtractedFields(fields);
+         partial = function.add(event, partial);
+     }
+     return partial;
+ }
+ private void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
index ea4fe8d..8e992f6 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -31,14 +31,53 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class CollectSetTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
- private static void excute(List<String> arr) throws ParseException {
+ /**
+  * Builds one partial accumulator per input list, merges them and checks that the merged
+  * set holds the six distinct addresses found across both lists.
+  */
+ private void testMerge(List<String> arr,List<String> arr2) {
+     UDFContext udfContext = new UDFContext();
+     udfContext.setLookup_fields(List.of("field"));
+     udfContext.setOutput_fields(Collections.singletonList("field_list"));
+     CollectSet collectSet = new CollectSet();
+     collectSet.open(udfContext);
+     Accumulator result1 = getMiddleResult(udfContext, arr);
+     Accumulator result2 = getMiddleResult(udfContext, arr2);
+     Accumulator result = collectSet.getResult(collectSet.merge(result1, result2));
+     Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list");
+     // assertEquals takes (expected, actual).
+     assertEquals(6, vals.size());
+ }
+ /**
+  * Feeds every value of {@code arr} through a fresh CollectSet instance and returns
+  * the resulting (not yet finalized) accumulator.
+  */
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+     CollectSet function = new CollectSet();
+     function.open(udfContext);
+
+     Accumulator seed = new Accumulator();
+     seed.setMetricsFields(new HashMap<>());
+     Accumulator partial = function.initAccumulator(seed);
+
+     for (String value : arr) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put("field", value);
+         Event event = new Event();
+         event.setExtractedFields(fields);
+         partial = function.add(event, partial);
+     }
+     return partial;
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
index 506f6de..43a9732 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
@@ -34,11 +34,48 @@ public class FirstValueTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
+ /**
+  * Builds one partial accumulator per input list, merges them (first list first) and
+  * checks that the merged first value is the first element of {@code arr}.
+  */
+ private void testMerge(List<String> arr,List<String> arr2) {
- private static void excute(List<String> arr) throws ParseException {
+     UDFContext udfContext = new UDFContext();
+     udfContext.setLookup_fields(List.of("field"));
+     udfContext.setOutput_fields(Collections.singletonList("field_first"));
+     FirstValue firstValue = new FirstValue();
+     firstValue.open(udfContext);
+     Accumulator result1 = getMiddleResult(udfContext, arr);
+     Accumulator result2 = getMiddleResult(udfContext, arr2);
+     Accumulator result = firstValue.getResult(firstValue.merge(result1, result2));
+     String val = (String) result.getMetricsFields().get("field_first");
+     // assertEquals takes (expected, actual).
+     assertEquals("192.168.1.1", val);
+ }
+ /**
+  * Feeds every value of {@code arr} through a fresh FirstValue instance and returns
+  * the resulting (not yet finalized) accumulator.
+  */
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+     FirstValue function = new FirstValue();
+     function.open(udfContext);
+
+     Accumulator seed = new Accumulator();
+     seed.setMetricsFields(new HashMap<>());
+     Accumulator partial = function.initAccumulator(seed);
+
+     for (String value : arr) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put("field", value);
+         Event event = new Event();
+         event.setExtractedFields(fields);
+         partial = function.add(event, partial);
+     }
+     return partial;
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
index f8306cd..e952908 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
@@ -37,11 +37,48 @@ public class LastValueTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.3");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
+ /**
+  * Builds one partial accumulator per input list, merges them (second list last) and
+  * checks that the merged last value is the last element of {@code arr2}.
+  */
+ private void testMerge(List<String> arr,List<String> arr2) {
- private static void excute(List<String> arr) throws ParseException {
+     UDFContext udfContext = new UDFContext();
+     udfContext.setLookup_fields(List.of("field"));
+     udfContext.setOutput_fields(Collections.singletonList("field_last"));
+     LastValue lastValue = new LastValue();
+     lastValue.open(udfContext);
+     Accumulator result1 = getMiddleResult(udfContext, arr);
+     Accumulator result2 = getMiddleResult(udfContext, arr2);
+     Accumulator result = lastValue.getResult(lastValue.merge(result1, result2));
+     String val = (String) result.getMetricsFields().get("field_last");
+     // assertEquals takes (expected, actual).
+     assertEquals("192.168.1.3", val);
+ }
+ /**
+  * Feeds every value of {@code arr} through a fresh LastValue instance and returns
+  * the resulting (not yet finalized) accumulator.
+  */
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+     LastValue function = new LastValue();
+     function.open(udfContext);
+
+     Accumulator seed = new Accumulator();
+     seed.setMetricsFields(new HashMap<>());
+     Accumulator partial = function.initAccumulator(seed);
+
+     for (String value : arr) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put("field", value);
+         Event event = new Event();
+         event.setExtractedFields(fields);
+         partial = function.add(event, partial);
+     }
+     return partial;
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
index 3c02499..c1dfb9e 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
@@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.LastValue;
import com.geedgenetworks.core.udf.udaf.LongCount;
import com.geedgenetworks.core.udf.udaf.NumberSum;
import com.ibm.icu.text.NumberFormat;
@@ -38,10 +39,46 @@ public class LongCountTest {
public void test() throws ParseException {
Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
- excute(longArr);
+ Long[] longArr2 = new Long[]{1L, 2L, 3L, 4L};
+ testMerge(longArr,longArr2);
+ testGetResult(longArr);
}
+ /**
+  * Builds one partial accumulator per input array, merges them and checks that the
+  * merged count equals the total number of events (4 + 4).
+  */
+ private void testMerge(Number[] arr,Number[] arr2) {
- private static void excute(Number[] arr) throws ParseException {
+     UDFContext udfContext = new UDFContext();
+     udfContext.setLookup_fields(List.of("field"));
+     udfContext.setOutput_fields(Collections.singletonList("count"));
+     LongCount longCount = new LongCount();
+     longCount.open(udfContext);
+     Accumulator result1 = getMiddleResult(udfContext, arr);
+     Accumulator result2 = getMiddleResult(udfContext, arr2);
+     Accumulator result = longCount.getResult(longCount.merge(result1, result2));
+     // assertEquals takes (expected, actual).
+     assertEquals(8, Integer.parseInt(result.getMetricsFields().get("count").toString()));
+ }
+ /**
+  * Feeds every value of {@code arr} through a fresh LongCount instance and returns
+  * the resulting (not yet finalized) accumulator.
+  */
+ private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
+     LongCount function = new LongCount();
+     function.open(udfContext);
+
+     Accumulator seed = new Accumulator();
+     seed.setMetricsFields(new HashMap<>());
+     Accumulator partial = function.initAccumulator(seed);
+
+     for (Number value : arr) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put("field", value);
+         Event event = new Event();
+         event.setExtractedFields(fields);
+         partial = function.add(event, partial);
+     }
+     return partial;
+ }
+ private static void testGetResult(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setOutput_fields(Collections.singletonList("count"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
index 6deed0f..cc4eaf0 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
@@ -39,12 +39,52 @@ public class MeanTest {
Integer[] intArr1 = new Integer[]{1, 2, 3, 4};
Integer[] intArr2 = new Integer[]{1, 6, 3};
- excute(intArr1, 0);
- excute2(intArr2, 2);
- excute3(intArr1);
+ testInt(intArr1, 0);
+ testDouble(intArr2, 2);
+ testNoPrecision(intArr1);
+ testMerge(intArr1,intArr2,2);
+ }
+ /**
+  * Builds one partial accumulator per input array, merges them and checks the mean of
+  * the combined values, rendered with the given precision, against 2.86.
+  */
+ private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException {
+     UDFContext udfContext = new UDFContext();
+     udfContext.setLookup_fields(List.of("field"));
+     udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+     udfContext.setParameters(new HashMap<>());
+     udfContext.getParameters().put("precision", precision);
+     Mean mean = new Mean();
+     mean.open(udfContext);
+     Accumulator result1 = getMiddleResult(arr1, precision);
+     Accumulator result2 = getMiddleResult(arr2, precision);
+     Accumulator result = mean.getResult(mean.merge(result1, result2));
+     // assertEquals takes (expected, actual).
+     assertEquals(NumberFormat.getInstance().parse("2.86"),
+             NumberFormat.getInstance().parse(result.getMetricsFields().get("field_mean").toString()));
+ }
+ /**
+  * Configures a fresh Mean instance with the given precision, feeds every value of
+  * {@code arr} through it and returns the resulting (not yet finalized) accumulator.
+  */
+ private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException {
+     UDFContext context = new UDFContext();
+     context.setLookup_fields(List.of("field"));
+     context.setOutput_fields(Collections.singletonList("field_mean"));
+     context.setParameters(new HashMap<>());
+     context.getParameters().put("precision", precision);
+
+     Mean function = new Mean();
+     function.open(context);
+
+     Accumulator seed = new Accumulator();
+     seed.setMetricsFields(new HashMap<>());
+     Accumulator partial = function.initAccumulator(seed);
+
+     for (Number value : arr) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put("field", value);
+         Event event = new Event();
+         event.setExtractedFields(fields);
+         partial = function.add(event, partial);
+     }
+     return partial;
}
- private static void excute(Number[] arr,int precision) throws ParseException {
+
+ private void testInt(Number[] arr,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
@@ -65,11 +105,12 @@ public class MeanTest {
agg = mean.add(event, agg);
}
+
Accumulator result = mean.getResult(agg);
assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2"));
}
- private static void excute2(Number[] arr,int precision) throws ParseException {
+ private void testDouble(Number[] arr,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
@@ -94,7 +135,7 @@ public class MeanTest {
assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33"));
}
- private static void excute3(Number[] arr) throws ParseException {
+ private void testNoPrecision(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
index d0d3d2c..a4072ca 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
@@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.LongCount;
import com.geedgenetworks.core.udf.udaf.NumberSum;
import com.ibm.icu.text.NumberFormat;
import org.junit.jupiter.api.Test;
@@ -41,8 +42,44 @@ public class NumberSumTest {
excute(doubleArr, Double.class);
excute(intArr, Long.class);
excute(longArr, Long.class);
+ testMerge(intArr,floatArr);
+
+ }
+ /**
+  * Builds one partial accumulator per input array, merges them and checks that the
+  * combined sum, read back as a float, is 20.0.
+  */
+ private void testMerge(Number[] arr,Number[] arr2) {
+     UDFContext udfContext = new UDFContext();
+     udfContext.setLookup_fields(List.of("field"));
+     udfContext.setOutput_fields(Collections.singletonList("field_sum"));
+     NumberSum numberSum = new NumberSum();
+     numberSum.open(udfContext);
+     Accumulator result1 = getMiddleResult(udfContext, arr);
+     Accumulator result2 = getMiddleResult(udfContext, arr2);
+     Accumulator result = numberSum.getResult(numberSum.merge(result1, result2));
+     // assertEquals takes (expected, actual).
+     assertEquals(20.0f, Float.parseFloat(result.getMetricsFields().get("field_sum").toString()));
}
+ /**
+  * Feeds every value of {@code arr} through a fresh NumberSum instance and returns
+  * the resulting (not yet finalized) accumulator.
+  */
+ private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
+     NumberSum function = new NumberSum();
+     function.open(udfContext);
+
+     Accumulator seed = new Accumulator();
+     seed.setMetricsFields(new HashMap<>());
+     Accumulator partial = function.initAccumulator(seed);
+
+     for (Number value : arr) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put("field", value);
+         Event event = new Event();
+         event.setExtractedFields(fields);
+         partial = function.add(event, partial);
+     }
+     return partial;
+ }
private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
UDFContext udfContext = new UDFContext();
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
index ab1ba72..4592f79 100644
--- a/groot-tests/test-e2e-kafka/pom.xml
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -47,6 +47,18 @@
</dependencies>
+ <!-- Sets the maven-compiler-plugin source and target level to Java 9 for this POM. -->
+ <!-- NOTE(review): on JDK 9+ the plugin's <release> option is the usual way to pin a language level; confirm source/target is intentional. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>9</source>
+ <target>9</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project> \ No newline at end of file