diff options
| author | wangkuan <[email protected]> | 2024-11-19 16:59:37 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-11-19 16:59:37 +0800 |
| commit | 36572cdae6f48671a463821b610e4549286a2e65 (patch) | |
| tree | 6a2b3467e39f0af61401ccd0ec8a1e52b6b2da66 | |
| parent | 036b7e1f8d28fc25daa11ac76dc16557136e14cc (diff) | |
[improve][bootstrap]修改原UDFUtil工具类位置,后期加载udf函数方式待优化
| -rw-r--r-- | groot-api/src/main/java/com/geedgenetworks/api/utils/UDFLoaderUtils.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java) | 14 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java | 7 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java | 7 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java | 7 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java | 8 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java | 7 |
6 files changed, 8 insertions, 42 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java b/groot-api/src/main/java/com/geedgenetworks/api/utils/UDFLoaderUtils.java index 9d4f67b..da6ebe1 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/utils/UDFLoaderUtils.java @@ -1,4 +1,4 @@ -package com.geedgenetworks.core.utils; +package com.geedgenetworks.api.utils; import com.googlecode.aviator.Expression; import com.googlecode.aviator.exception.ExpressionRuntimeException; @@ -8,7 +8,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class UDFUtils { +public class UDFLoaderUtils { public static Map<String, String> getClassReflect(List<String> plugins) { Map<String, String> classReflect = new HashMap<>(); @@ -28,15 +28,5 @@ public class UDFUtils { } return classReflect; } - public static Boolean filterExecute(Expression expression, Map<String, Object> map) { - boolean result; - Object object = expression.execute(map); - if (object != null) { - result = (Boolean) object; - } else { - throw new ExpressionRuntimeException(); - } - return result; - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java index 6461acc..e1b802e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -14,10 +14,6 @@ import com.geedgenetworks.api.metrics.InternalMetrics; import com.geedgenetworks.api.event.Event; import com.geedgenetworks.api.expressions.AviatorExecutor; import com.google.common.collect.Lists; -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.AviatorEvaluatorInstance; -import com.googlecode.aviator.Expression; -import com.googlecode.aviator.Options; import com.googlecode.aviator.exception.ExpressionRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; @@ -30,8 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; -import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; +import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect; @Slf4j public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator> { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 3acd444..d5e62f3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -11,10 +11,6 @@ import com.geedgenetworks.api.common.udf.UDFContext; import com.geedgenetworks.api.common.udf.UDFEntity; import com.geedgenetworks.api.event.Event; import com.google.common.collect.Lists; -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.AviatorEvaluatorInstance; -import com.googlecode.aviator.Expression; -import com.googlecode.aviator.Options; import com.googlecode.aviator.exception.ExpressionRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -24,8 +20,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; -import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; +import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect; @Slf4j public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java index b5bbbea..be86691 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -1,7 +1,6 @@ package com.geedgenetworks.core.processor.aggregate; import com.alibaba.fastjson.JSON; -import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.common.config.Accumulator; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.common.exception.CommonErrorCode; @@ -10,10 +9,6 @@ import com.geedgenetworks.api.common.udf.UDFContext; import com.geedgenetworks.api.common.udf.UDFEntity; import com.geedgenetworks.api.common.udf.AggregateFunction; import com.google.common.collect.Lists; -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.AviatorEvaluatorInstance; -import com.googlecode.aviator.Expression; -import com.googlecode.aviator.Options; import com.googlecode.aviator.exception.ExpressionRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -23,7 +18,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; +import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect; @Slf4j public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 695075d..74c8de7 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -15,17 +15,13 @@ import com.geedgenetworks.api.common.udf.UDFEntity; import com.geedgenetworks.api.metrics.InternalMetrics; import com.geedgenetworks.api.event.Event; import com.google.common.collect.Lists; -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.AviatorEvaluatorInstance; -import com.googlecode.aviator.Expression; -import com.googlecode.aviator.Options; import com.googlecode.aviator.exception.ExpressionRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; -import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; -import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect; import java.util.LinkedList; import java.util.List; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java index a1e0560..5f986b6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java @@ -12,10 +12,6 @@ import com.geedgenetworks.api.common.udf.UDFEntity; import com.geedgenetworks.api.metrics.InternalMetrics; import com.geedgenetworks.api.event.Event; import com.google.common.collect.Lists; -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.AviatorEvaluatorInstance; -import com.googlecode.aviator.Expression; -import com.googlecode.aviator.Options; import com.googlecode.aviator.exception.ExpressionRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -24,8 +20,7 @@ import org.apache.flink.util.Collector; import java.util.*; -import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; -import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; +import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect; @Slf4j public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> { private LinkedList<UDFEntity> functions; |
