diff options
| author | 李奉超 <[email protected]> | 2024-08-30 01:34:08 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-08-30 01:34:08 +0000 |
| commit | b17b5a08a5b51180ab8d9f0f210c1294e4f11fe2 (patch) | |
| tree | 3dd004159328b48c3e3b2a24ef7f632798ccd690 | |
| parent | 9fd0e45c696ea08f92dc2d1c261f7e4be37a5555 (diff) | |
| parent | 13323b1fe6315cedc5e312fe084fb883442fe066 (diff) | |
Merge branch 'feature/pre-agg' into 'develop'
Feature/pre agg
See merge request galaxy/platform/groot-stream!101
32 files changed, 1091 insertions, 204 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index b77958d..f5c6610 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -34,6 +34,7 @@ processing_pipelines: window_timestamp_field: recv_time window_size: 6 window_slide: 10 #滑动窗口步长 + mini_batch: true functions: - function: NUMBER_SUM lookup_fields: [ sent_pkts ] diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java index 2edc5e7..6ed3888 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -65,8 +65,10 @@ public class JobSplitWithAggTest { } catch (Exception e) { throw new JobExecuteException("Job executed error", e); } - Assert.assertEquals(1, CollectSink.values.size()); + + Assert.assertEquals(2, CollectSink.values.size()); Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString()); } } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml index 872800f..6a7011a 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -3,66 +3,34 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. 
properties: - data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json json.ignore.parse.errors: false + watermark_timestamp: recv_time + watermark_timestamp_unit: ms + watermark_lag: 10 sinks: collect_sink: type: collect properties: format: json -splits: - test_split: - type: split - rules: - - name: aggregate_processor - expression: event.decoded_as == 'DNS' postprocessing_pipelines: - pre_etl_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - remove_fields: [fields,tags] - output_fields: - functions: # [array of object] Function List - - - function: FLATTEN - lookup_fields: [ fields,tags ] - output_fields: [ ] - parameters: - #prefix: "" - depth: 3 - # delimiter: "." 
- - - function: RENAME - lookup_fields: [ '' ] - output_fields: [ '' ] - filter: - parameters: - # parent_fields: [tags] - # rename_fields: - # tags: tags - rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; - - - - function: UNIX_TIMESTAMP_CONVERTER - lookup_fields: [ timestamp_ms ] - output_fields: [ recv_time ] - parameters: - precision: seconds - interval: 300 - # aggregate_processor: type: aggregate group_by_fields: [decoded_as] - window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time window_size: 5 window_timestamp_field: test_time + mini_batch: true functions: - function: NUMBER_SUM lookup_fields: [ sessions ] + - function: MEAN + lookup_fields: [ pkts ] table_processor: type: table @@ -79,9 +47,6 @@ application: # [object] Application Configuration topology: # [array of object] Node List. It will be used build data flow for job dag graph. - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. parallelism: 1 # [number] Operator-Level Parallelism. 
- downstream: [test_split] - - name: test_split - parallelism: 1 downstream: [ aggregate_processor ] - name: aggregate_processor parallelism: 1 diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java index 0b0379d..af94abf 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java @@ -5,8 +5,6 @@ import com.geedgenetworks.common.udf.UDFContext; import java.util.List; -import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP; - public interface AggregateConfigOptions { Option<String> TYPE = Options.key("type") .stringType() @@ -42,7 +40,10 @@ public interface AggregateConfigOptions { .intType() .noDefaultValue() .withDescription("The size of window."); - + Option<Boolean> MINI_BATCH = Options.key("mini_batch") + .booleanType() + .defaultValue(false) + .withDescription("The label of pre_aggrergate."); Option<Integer> WINDOW_SLIDE = Options.key("window_slide") .intType() .noDefaultValue() diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java index 455073f..6f6e048 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java @@ -8,14 +8,10 @@ import java.io.Serializable; public interface AggregateFunction extends Serializable { void open(UDFContext udfContext); - Accumulator initAccumulator(Accumulator acc); - Accumulator add(Event val, Accumulator acc); - String functionName(); - Accumulator getResult(Accumulator acc); - + Accumulator merge(Accumulator a, Accumulator b); default void close(){}; } diff --git 
a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java index d3cbaac..ebdb0bd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java @@ -1,15 +1,15 @@ package com.geedgenetworks.core.pojo; +import com.alibaba.fastjson2.annotation.JSONField; import com.geedgenetworks.common.udf.UDFContext; import lombok.Data; import lombok.EqualsAndHashCode; -import java.io.Serializable; import java.util.List; -import java.util.Map; + @EqualsAndHashCode(callSuper = true) @Data -public class AggregateConfig extends ProcessorConfig { +public class AggregateConfig extends ProcessorConfig { private List<String> group_by_fields; @@ -18,5 +18,7 @@ public class AggregateConfig extends ProcessorConfig { private Integer window_size; private Integer window_slide; private List<UDFContext> functions; + @JSONField(defaultValue = "false" ) + private Boolean mini_batch; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java index 416a7ea..2508730 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java @@ -13,18 +13,50 @@ public class OnlineStatistics { aggregate += (delta * (val.doubleValue() - mean)); return this; } + + public OnlineStatistics merge(OnlineStatistics other) { + if (other.n == 0) { + return this; // Nothing to merge + } + if (this.n == 0) { + this.n = other.n; + this.mean = other.mean; + this.aggregate = other.aggregate; + return this; + } + + // Combine counts + long newN = this.n + other.n; + + // Calculate the new mean + double delta = other.mean - this.mean; + this.mean += delta * (other.n / (double) newN); + + // Update the aggregate + 
this.aggregate += other.aggregate + + (this.n * delta * delta) / newN; + + // Update the count + this.n = newN; + + return this; + } + //计算总体标准差 public double stddevp() { return Math.sqrt(variancep()); } + //计算总体方差 public double variancep() { return aggregate / n; } + //计算样本标准差 public double stddev() { return Math.sqrt(variance()); } + //计算样本方差 public double variance() { return aggregate / (n - 1); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java new file mode 100644 index 0000000..3632ba7 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -0,0 +1,191 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import cn.hutool.crypto.SecureUtil; +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; 
+import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +@Slf4j +public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator> { + + private final long windowSize; + private Long staggerOffset = null; + + protected final Map<Long, Map<String, Accumulator>> windows = new HashMap<>(); + protected List<String> groupByFields; + private LinkedList<UdfEntity> functions; + + protected InternalMetrics internalMetrics; + private final AggregateConfig aggregateConfig; + + public AbstractFirstAggregation(AggregateConfig aggregateConfig, long windowSize) { + this.windowSize = windowSize; + this.aggregateConfig = aggregateConfig; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + functions = Lists.newLinkedList(); + try { + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + List<UDFContext> udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + return; + } + Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); + List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); + + udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + throw new RuntimeException(); + } + groupByFields = aggregateConfig.getGroup_by_fields(); + functions = Lists.newLinkedList(); + Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); + + for (UDFContext udfContext : udfContexts) { + Expression filterExpression = null; + UdfEntity udfEntity = new UdfEntity(); + // 平台注册的函数包含任务中配置的函数则对函数进行实例化 + if (udfClassReflect.containsKey(udfContext.getFunction())) { + Class<?> 
cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); + AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); + // 函数如果包含filter,对表达式进行编译 + if (udfContext.getFilter() != null) { + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + filterExpression = instance.compile(udfContext.getFilter(), true); + } + udfEntity.setAggregateFunction(aggregateFunction); + udfEntity.setFilterExpression(filterExpression); + udfEntity.setName(udfContext.getFunction()); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + udfEntity.setUdfContext(udfContext); + functions.add(udfEntity); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported UDAF: " + udfContext.getFunction()); + } + + } + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().open(udfEntity.getUdfContext()); + } + + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e); + + } + } + + @Override + public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception { + + } + + public void onTimer(long timestamp, Collector<Accumulator> out) throws Exception { + Map<String, Accumulator> accumulator = windows.remove(timestamp); + for (Accumulator value : accumulator.values()) { + value = getResult(value); + out.collect(value); + internalMetrics.incrementOutEvents(); + } + accumulator.clear(); + } + + private long assignWindowStart(long timestamp, long offset) { + return timestamp - (timestamp - offset + windowSize) % windowSize; + } + + protected long assignWindowEnd(long timestamp) { + if (staggerOffset == null) { + staggerOffset = getStaggerOffset(); + } + return 
assignWindowStart(timestamp, (staggerOffset % windowSize)) + windowSize; + } + + private long getStaggerOffset() { + return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize); + } + + public Accumulator createAccumulator() { + + Map<String, Object> map = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + + accumulator.setMetricsFields(map); + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().initAccumulator(accumulator); + } + return accumulator; + + } + + public String getKey(Event value, List<String> keys) { + StringBuilder stringBuilder = new StringBuilder(); + for (String key : keys) { + + if (value.getExtractedFields().containsKey(key)) { + stringBuilder.append(value.getExtractedFields().get(key).toString()); + } else { + stringBuilder.append(","); + } + } + return SecureUtil.md5(stringBuilder.toString()); + + } + + public Accumulator add(Event event, Accumulator accumulator) { + accumulator.setInEvents(accumulator.getInEvents() + 1); + for (UdfEntity udafEntity : functions) { + try { + boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; + if (result) { + udafEntity.getAggregateFunction().add(event, accumulator); + } + } catch (ExpressionRuntimeException ignore) { + log.error("Function " + udafEntity.getName() + " Invalid filter ! 
"); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " execute exception !", e); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } + } + return accumulator; + } + + public Accumulator getResult(Accumulator accumulator) { + return accumulator; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index cf78310..c261fb6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -12,42 +12,83 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.OutputTag; import static com.geedgenetworks.common.Constants.*; -public class AggregateProcessorImpl implements AggregateProcessor { +public class AggregateProcessorImpl implements AggregateProcessor { @Override - public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { + public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { - if (aggregateConfig.getParallelism() != 0) { + SingleOutputStreamOperator<Event> singleOutputStreamOperator; + if (aggregateConfig.getMini_batch()) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventDataStream.keyBy(new 
KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case TUMBLING_EVENT_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_PROCESSING_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new 
ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_EVENT_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); + } - }else { + } else { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new 
AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case TUMBLING_EVENT_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_PROCESSING_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case 
SLIDING_EVENT_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } } + if (aggregateConfig.getParallelism() != 0) { + singleOutputStreamOperator.setParallelism(aggregateConfig.getParallelism()); + } + return singleOutputStreamOperator.name(aggregateConfig.getName()); } @@ -55,4 +96,5 @@ public class AggregateProcessorImpl implements AggregateProcessor { public String type() { return "aggregate"; } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java new file mode 100644 index 0000000..9390de4 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -0,0 +1,60 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.AggregateConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import 
org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +@Slf4j +public class FirstAggregationEventTime extends AbstractFirstAggregation { + + private final PriorityQueue<Long> eventTimeTimersQueue = new PriorityQueue<>(); + + public FirstAggregationEventTime(AggregateConfig aggregateConfig, long windowSize) { + super(aggregateConfig, windowSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + } + + @Override + public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception { + Long timestamp; + internalMetrics.incrementInEvents(); + try { + String key = getKey(value, groupByFields); + while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + eventTimeTimersQueue.poll(); + onTimer(timestamp, out); + } + long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + if (!windows.containsKey(windowEnd)) { + Map<String, Accumulator> map = new HashMap<>(); + map.put(key, createAccumulator()); + windows.put(windowEnd, map); + eventTimeTimersQueue.add(windowEnd); + } else { + if (!windows.get(windowEnd).containsKey(key)) { + windows.get(windowEnd).put(key, createAccumulator()); + } + } + add(value, windows.get(windowEnd).get(key)); + } catch (Exception e) { + log.error("Error in pre-aggregate processElement", e); + internalMetrics.incrementErrorEvents(); + } + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java new file mode 100644 index 0000000..e98daa5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java @@ -0,0 +1,59 @@ +package 
com.geedgenetworks.core.processor.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.AggregateConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +@Slf4j +public class FirstAggregationProcessingTime extends AbstractFirstAggregation { + + private final PriorityQueue<Long> processingTimeTimersQueue = new PriorityQueue<>(); + + public FirstAggregationProcessingTime(AggregateConfig aggregateConfig, long windowSize) { + super(aggregateConfig, windowSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + } + + @Override + public void processElement(Event value, ProcessFunction<Event, Accumulator>.Context ctx, Collector<Accumulator> out) throws Exception { + Long timestamp; + internalMetrics.incrementInEvents(); + try { + String key = getKey(value, groupByFields); + while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + processingTimeTimersQueue.poll(); + onTimer(timestamp, out); + } + long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + if (!windows.containsKey(windowEnd)) { + Map<String, Accumulator> map = new HashMap<>(); + map.put(key, createAccumulator()); + windows.put(windowEnd, map); + processingTimeTimersQueue.add(windowEnd); + } else { + if (!windows.get(windowEnd).containsKey(key)) { + windows.get(windowEnd).put(key, createAccumulator()); + } + } + add(value, windows.get(windowEnd).get(key)); + } catch (Exception e) { + log.error("Error in pre-aggregate processElement", e); + internalMetrics.incrementErrorEvents(); + } + } + + +} diff --git 
a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java index 165ed1b..da09690 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java @@ -1,5 +1,6 @@ package com.geedgenetworks.core.processor.aggregate; +import cn.hutool.crypto.SecureUtil; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.KeybyEntity; @@ -20,16 +21,17 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); - for(String key: keys){ + for (String key : keys) { - if(value.getExtractedFields().containsKey(key)){ - keybyEntity.getKeys().put(key,value.getExtractedFields().get(key)); + if (value.getExtractedFields().containsKey(key)) { + keybyEntity.getKeys().put(key, value.getExtractedFields().get(key)); stringBuilder.append(value.getExtractedFields().get(key).toString()); - }else { + } else { stringBuilder.append(","); } } - keybyEntity.setKeysToString(stringBuilder.toString()); - return keybyEntity; + String hashedKey = SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(hashedKey); + return keybyEntity; } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java new file mode 100644 index 0000000..6e43184 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java @@ -0,0 +1,37 @@ +package com.geedgenetworks.core.processor.aggregate; + +import cn.hutool.crypto.SecureUtil; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.KeybyEntity; + +import 
java.util.HashMap; +import java.util.List; + +public class PreKeySelector implements org.apache.flink.api.java.functions.KeySelector<Accumulator, KeybyEntity> { + + + private final List<String> keys; + + public PreKeySelector(List<String> keys) { + this.keys = keys; + } + + @Override + public KeybyEntity getKey(Accumulator value) throws Exception { + + KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); + StringBuilder stringBuilder = new StringBuilder(); + for (String key : keys) { + + if (value.getMetricsFields().containsKey(key)) { + keybyEntity.getKeys().put(key, value.getMetricsFields().get(key)); + stringBuilder.append(value.getMetricsFields().get(key).toString()); + } else { + stringBuilder.append(","); + } + } + String hashedKey = SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(hashedKey); + return keybyEntity; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java new file mode 100644 index 0000000..7c0a434 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -0,0 +1,131 @@ +package com.geedgenetworks.core.processor.aggregate; + +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import 
com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.ExecutionConfig; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +@Slf4j +public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> { + private final List<UDFContext> udfContexts; + private final List<String> udfClassNameLists; + private final LinkedList<UdfEntity> functions; + + public SecondAggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { + udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); + udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + throw new RuntimeException(); + } + functions = Lists.newLinkedList(); + Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); + try { + for (UDFContext udfContext : udfContexts) { + Expression filterExpression = null; + UdfEntity udfEntity = new UdfEntity(); + // 平台注册的函数包含任务中配置的函数则对函数进行实例化 + if (udfClassReflect.containsKey(udfContext.getFunction())) { + Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); + AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); + // 函数如果包含filter,对表达式进行编译 + if (udfContext.getFilter() != null) { + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + filterExpression = 
instance.compile(udfContext.getFilter(), true); + } + udfEntity.setAggregateFunction(aggregateFunction); + udfEntity.setFilterExpression(filterExpression); + udfEntity.setName(udfContext.getFunction()); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + udfEntity.setUdfContext(udfContext); + functions.add(udfEntity); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported UDAF: " + udfContext.getFunction()); + } + + } + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().open(udfEntity.getUdfContext()); + } + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e); + + } + } + + @Override + public Accumulator createAccumulator() { + Map<String, Object> map = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(map); + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().initAccumulator(accumulator); + } + return accumulator; + } + + @Override + public Accumulator add(Accumulator event, Accumulator accumulator) { + return merge(event, accumulator); + } + + @Override + public Accumulator getResult(Accumulator accumulator) { + for (UdfEntity udafEntity : functions) { + try { + udafEntity.getAggregateFunction().getResult(accumulator); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " getResult exception !", e); + } + } + return accumulator; + } + + @Override + public Accumulator merge(Accumulator acc1, Accumulator acc2) { + acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents()); + acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents()); + acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount()); + for (UdfEntity udafEntity : functions) { + try { + boolean result = udafEntity.getFilterExpression() != null ? 
filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", acc1.getMetricsFields())) : true; + if (result) { + udafEntity.getAggregateFunction().merge(acc1, acc2); + } + } catch (ExpressionRuntimeException ignore) { + log.error("Function " + udafEntity.getName() + " Invalid filter ! "); + acc1.setErrorCount(acc1.getErrorCount() + 1); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " execute exception !", e); + acc1.setErrorCount(acc1.getErrorCount() + 1); + } + } + return acc1; + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java index 423eff9..3921ee2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. **/ package com.geedgenetworks.core.udf.udaf; @@ -22,9 +22,9 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.processor.projection.UdfEntity; -import java.util.*; +import java.util.ArrayList; +import java.util.List; /** * Collects elements within a group and returns the list of aggregated objects @@ -36,18 +36,18 @@ public class CollectList implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new ArrayList<>()); @@ -56,7 +56,7 @@ public class CollectList implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - 
if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -75,4 +75,17 @@ public class CollectList implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List<Object> firstValue = (List<Object>) firstAcc.getMetricsFields().get(outputField); + List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index b4dfb14..9ec9b09 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -8,7 +8,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -23,17 +22,17 @@ public class CollectSet implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new 
GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new HashSet<>()); @@ -42,7 +41,7 @@ public class CollectSet implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -61,5 +60,16 @@ public class CollectSet implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set<Object> firstValue = (Set<Object>) firstAcc.getMetricsFields().get(outputField); + Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java 
b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java index 6301a01..a1a35be 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
**/ package com.geedgenetworks.core.udf.udaf; @@ -23,8 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -36,14 +34,13 @@ public class FirstValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -55,7 +52,7 @@ public class FirstValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){ + if (!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class FirstValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git 
a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java index 1648fa5..6af0be3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -95,7 +95,10 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { his.merge(h);
}
-
    /**
     * NOTE(review): unimplemented stub — returns {@code null} instead of a merged
     * accumulator. The only caller visible in this change set
     * ({@code SecondAggregateProcessorFunction.merge}) appears to discard the
     * return value, so this does not NPE today, but it means HDR-histogram state
     * from mini-batch partials is never combined — second-level results for this
     * function family look incomplete. TODO: merge the underlying histogram state
     * (cf. the {@code his.merge(h)} helper in this class) or reject pre-aggregation
     * for this UDAF explicitly.
     */
    @Override
    public Accumulator merge(Accumulator a, Accumulator b) {
        return null;
    }
@Override
public void close() {}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java index f27a2e6..44b374e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
**/ package com.geedgenetworks.core.udf.udaf; @@ -23,9 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; -import java.util.List; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -37,17 +34,17 @@ public class LastValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; @@ -55,7 +52,7 @@ public class LastValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class LastValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java 
b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java index ea33271..05de38c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java @@ -13,20 +13,22 @@ public class LongCount implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getOutput_fields()==null ){ + public void open(UDFContext udfContext) { + if (udfContext.getOutput_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } outputField = udfContext.getOutput_fields().get(0); } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L); + acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long) v + 1L); return acc; } @@ -40,5 +42,18 @@ public class LongCount implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + long firstValue = (long) firstAcc.getMetricsFields().get(outputField); + long secondValue = (long) secondAcc.getMetricsFields().get(outputField); + firstValue = firstValue + secondValue; + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java 
b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java index 2a615ef..9c4e070 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -16,20 +16,20 @@ public class Mean implements AggregateFunction { private String outputField; private Integer precision; private DecimalFormat df; + @Override - public void open(UDFContext udfContext){ + public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } - if(udfContext.getParameters()!= null && !udfContext.getParameters().isEmpty()) { + if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString()); if (precision > 0) { StringBuilder pattern = new StringBuilder("#."); @@ -38,14 +38,15 @@ public class Mean implements AggregateFunction { } df = new DecimalFormat(pattern.toString()); } - }else { + } else { precision = -1; } } + @Override public Accumulator initAccumulator(Accumulator acc) { - acc.getMetricsFields().put(outputField,new OnlineStatistics()); + acc.getMetricsFields().put(outputField, new OnlineStatistics()); return acc; } @@ -67,16 +68,27 @@ public class Mean implements AggregateFunction { @Override public Accumulator getResult(Accumulator acc) { OnlineStatistics aggregate = (OnlineStatistics) 
acc.getMetricsFields().get(outputField); - if(precision<0){ + if (precision < 0) { acc.getMetricsFields().put(outputField, aggregate.mean()); - } - else if(precision>0){ + } else if (precision > 0) { acc.getMetricsFields().put(outputField, df.format(aggregate.mean())); - } - else { - acc.getMetricsFields().put(outputField,(long)aggregate.mean()); + } else { + acc.getMetricsFields().put(outputField, (long) aggregate.mean()); } return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + OnlineStatistics acc1 = (OnlineStatistics) firstAcc.getMetricsFields().get(outputField); + acc1.merge((OnlineStatistics) secondAcc.getMetricsFields().get(outputField)); + long inEvents = firstAcc.getInEvents() + (secondAcc.getInEvents()); + long outEvent = firstAcc.getOutEvents() + (secondAcc.getOutEvents()); + long error = firstAcc.getErrorCount() + (secondAcc.getErrorCount()); + firstAcc.setInEvents(inEvents); + firstAcc.setErrorCount(error); + firstAcc.setOutEvents(outEvent); + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index 01e9a5b..e972133 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -6,7 +6,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.pojo.OnlineStatistics; public class NumberSum implements AggregateFunction { @@ -15,15 +14,14 @@ public class NumberSum implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getLookup_fields()==null ){ + public void open(UDFContext udfContext) { + if 
(udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -32,22 +30,23 @@ public class NumberSum implements AggregateFunction { public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ - Number val = (Number) event.getExtractedFields().get(lookupField); - Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); - if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) { - aggregate = aggregate.longValue() + val.longValue(); - } else if (aggregate instanceof Float || val instanceof Float) { - aggregate = aggregate.floatValue() + val.floatValue(); - } else if (aggregate instanceof Double || val instanceof Double) { - aggregate = aggregate.doubleValue() + val.doubleValue(); - } - acc.getMetricsFields().put(outputField, aggregate); + if (event.getExtractedFields().containsKey(lookupField)) { + Number val = (Number) event.getExtractedFields().get(lookupField); + Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); + if (aggregate instanceof Long && (val instanceof Integer || val instanceof Long)) { + aggregate = aggregate.longValue() + val.longValue(); + } else if (aggregate instanceof Float || val instanceof Float) { + aggregate = aggregate.floatValue() + val.floatValue(); + } else if (aggregate instanceof Double || val instanceof Double) { + aggregate = aggregate.doubleValue() 
+ val.doubleValue(); } - return acc; + acc.getMetricsFields().put(outputField, aggregate); + } + return acc; } @Override @@ -65,4 +64,24 @@ public class NumberSum implements AggregateFunction { } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + Number firstValue = (Number) firstAcc.getMetricsFields().get(outputField); + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + if (firstValue instanceof Long && (secondValue instanceof Integer || secondValue instanceof Long)) { + firstValue = firstValue.longValue() + secondValue.longValue(); + } else if (firstValue instanceof Float || secondValue instanceof Float) { + firstValue = firstValue.floatValue() + secondValue.floatValue(); + } else if (firstValue instanceof Double || secondValue instanceof Double) { + firstValue = firstValue.doubleValue() + secondValue.doubleValue(); + } + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java index ec003f8..041bad9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java @@ -16,7 +16,10 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate { return acc;
}
-
    /**
     * NOTE(review): unimplemented stub — returns {@code null}; HLL sketch state from
     * mini-batch partials is never combined, so second-level approx-distinct counts
     * will be wrong whenever pre-aggregation is enabled for this function. Also
     * redundant with the identical stub in the base class {@code HlldBaseAggregate}.
     * TODO: merge the HLL unions (cf. {@code hllUnion.update(hll)} in the base class)
     * or reject pre-aggregation for this UDAF explicitly.
     */
    @Override
    public Accumulator merge(Accumulator a, Accumulator b) {
        return null;
    }
@Override
public String functionName() {
return "APPROX_COUNT_DISTINCT_HLLD";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java index 71d61dc..d6c3a44 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -92,7 +92,10 @@ public abstract class HlldBaseAggregate implements AggregateFunction { Hll hll = HllUtils.deserializeHll(value);
hllUnion.update(hll);
}
-
    /**
     * NOTE(review): unimplemented stub — returns {@code null} instead of merging the
     * two accumulators' HLL state. The visible caller
     * ({@code SecondAggregateProcessorFunction.merge}) appears to ignore the return
     * value, so no NPE occurs, but sketch partials are silently dropped on merge.
     * TODO: union the sketches (cf. {@code hllUnion.update(hll)} above) or reject
     * pre-aggregation for HLL-based UDAFs explicitly.
     */
    @Override
    public Accumulator merge(Accumulator a, Accumulator b) {
        return null;
    }
@Override
public void close() {}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java index b0d846b..2bf13a5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -36,11 +36,51 @@ public class CollectListTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List<String> arr) throws ParseException { + private void testMerge(List<String> arr,List<String> arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectList collectList = new CollectList(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = collectList.getResult(collectList.merge(result1,result2)); + List<String> vals = (List<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),8); + assertEquals("192.168.1.6",vals.get(5).toString()); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + CollectList collectList = new CollectList(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator agg = 
collectList.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectList.add(event, agg); + + } + return agg; + } + private void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java index ea4fe8d..8e992f6 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -31,14 +31,53 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CollectSetTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List<String> arr) throws ParseException { + private void testMerge(List<String> arr,List<String> arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectSet collectSet = new CollectSet(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator 
result = collectSet.getResult(collectSet.merge(result1,result2)); + Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),6); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + CollectSet collectSet = new CollectSet(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator agg = collectSet.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectSet.add(event, agg); + + } + return agg; + } + private static void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java index 506f6de..43a9732 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -34,11 +34,48 @@ public class FirstValueTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } + private void testMerge(List<String> arr,List<String> arr2) { - private static void excute(List<String> arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + 
udfContext.setOutput_fields(Collections.singletonList("field_first")); + FirstValue firstValue = new FirstValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = firstValue.getResult(firstValue.merge(result1,result2)); + String val = (String) result.getMetricsFields().get("field_first"); + assertEquals(val,"192.168.1.1"); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + FirstValue firstValue = new FirstValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator agg = firstValue.initAccumulator(accumulator); + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = firstValue.add(event, agg); + + } + return agg; + } + private static void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java index f8306cd..e952908 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java @@ -37,11 +37,48 @@ public class LastValueTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - 
excute(arr); + List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.3"); + testMerge(arr,arr2); + testGetResult(arr); } + private void testMerge(List<String> arr,List<String> arr2) { - private static void excute(List<String> arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_last")); + LastValue lastValue = new LastValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + lastValue.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = lastValue.getResult(lastValue.merge(result1,result2)); + String val = (String) result.getMetricsFields().get("field_last"); + assertEquals(val,"192.168.1.3"); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + LastValue lastValue = new LastValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + lastValue.open(udfContext); + Accumulator agg = lastValue.initAccumulator(accumulator); + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = lastValue.add(event, agg); + + } + return agg; + } + private static void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java index 3c02499..c1dfb9e 100644 --- 
a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java @@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.LastValue; import com.geedgenetworks.core.udf.udaf.LongCount; import com.geedgenetworks.core.udf.udaf.NumberSum; import com.ibm.icu.text.NumberFormat; @@ -38,10 +39,46 @@ public class LongCountTest { public void test() throws ParseException { Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; - excute(longArr); + Long[] longArr2 = new Long[]{1L, 2L, 3L, 4L}; + testMerge(longArr,longArr2); + testGetResult(longArr); } + private void testMerge(Number[] arr,Number[] arr2) { - private static void excute(Number[] arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("count")); + LongCount longCount = new LongCount(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + longCount.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = longCount.getResult(longCount.merge(result1,result2)); + assertEquals(Integer.parseInt((result.getMetricsFields().get("count").toString())),8); + + } + private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { + + + LongCount longCount = new LongCount(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + longCount.open(udfContext); + Accumulator agg = longCount.initAccumulator(accumulator); + for 
(Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = longCount.add(event, agg); + + } + return agg; + } + private static void testGetResult(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setOutput_fields(Collections.singletonList("count")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java index 6deed0f..cc4eaf0 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java @@ -39,12 +39,52 @@ public class MeanTest { Integer[] intArr1 = new Integer[]{1, 2, 3, 4}; Integer[] intArr2 = new Integer[]{1, 6, 3}; - excute(intArr1, 0); - excute2(intArr2, 2); - excute3(intArr1); + testInt(intArr1, 0); + testDouble(intArr2, 2); + testNoPrecision(intArr1); + testMerge(intArr1,intArr2,2); + } + private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + mean.open(udfContext); + Accumulator result1 = getMiddleResult(arr1,precision); + Accumulator result2 = getMiddleResult(arr2,precision); + Accumulator result = mean.getResult(mean.merge(result1,result2)); + 
assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2.86")); + } + private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + mean.open(udfContext); + Accumulator agg = mean.initAccumulator(accumulator); + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = mean.add(event, agg); + + } + return agg; } - private static void excute(Number[] arr,int precision) throws ParseException { + + private void testInt(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); @@ -65,11 +105,12 @@ public class MeanTest { agg = mean.add(event, agg); } + Accumulator result = mean.getResult(agg); assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2")); } - private static void excute2(Number[] arr,int precision) throws ParseException { + private void testDouble(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); @@ -94,7 +135,7 @@ public class MeanTest { assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33")); } - private static void excute3(Number[] 
arr) throws ParseException { + private void testNoPrecision(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java index d0d3d2c..a4072ca 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java @@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.LongCount; import com.geedgenetworks.core.udf.udaf.NumberSum; import com.ibm.icu.text.NumberFormat; import org.junit.jupiter.api.Test; @@ -41,8 +42,44 @@ public class NumberSumTest { excute(doubleArr, Double.class); excute(intArr, Long.class); excute(longArr, Long.class); + testMerge(intArr,floatArr); + + } + private void testMerge(Number[] arr,Number[] arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_sum")); + NumberSum numberSum = new NumberSum(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + numberSum.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = numberSum.getResult(numberSum.merge(result1,result2)); + assertEquals(Float.parseFloat((result.getMetricsFields().get("field_sum").toString())),20.0f); + } + private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { + + + NumberSum numberSum = new 
NumberSum(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + numberSum.open(udfContext); + Accumulator agg = numberSum.initAccumulator(accumulator); + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = numberSum.add(event, agg); + } + return agg; + } private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException { UDFContext udfContext = new UDFContext(); diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml index ab1ba72..4592f79 100644 --- a/groot-tests/test-e2e-kafka/pom.xml +++ b/groot-tests/test-e2e-kafka/pom.xml @@ -47,6 +47,18 @@ </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>9</source> + <target>9</target> + </configuration> + </plugin> + </plugins> + </build> </project>
\ No newline at end of file |
