summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-11-19 09:07:23 +0000
committer王宽 <[email protected]>2024-11-19 09:07:23 +0000
commit30c7d561189236529810bd2d16aa246c1a9aa4c4 (patch)
tree6a2b3467e39f0af61401ccd0ec8a1e52b6b2da66
parent036b7e1f8d28fc25daa11ac76dc16557136e14cc (diff)
parent36572cdae6f48671a463821b610e4549286a2e65 (diff)
Merge branch 'improve/processor' into 'develop'v1.8.0.1-SNAPSHOT
[improve][bootstrap]修改原UDFUtil工具类位置,后期加载udf函数方式待优化 See merge request galaxy/platform/groot-stream!141
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/utils/UDFLoaderUtils.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java)14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java7
6 files changed, 8 insertions, 42 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java b/groot-api/src/main/java/com/geedgenetworks/api/utils/UDFLoaderUtils.java
index 9d4f67b..da6ebe1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/UDFUtils.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/utils/UDFLoaderUtils.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.core.utils;
+package com.geedgenetworks.api.utils;
import com.googlecode.aviator.Expression;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
@@ -8,7 +8,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class UDFUtils {
+public class UDFLoaderUtils {
public static Map<String, String> getClassReflect(List<String> plugins) {
Map<String, String> classReflect = new HashMap<>();
@@ -28,15 +28,5 @@ public class UDFUtils {
}
return classReflect;
}
- public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
- boolean result;
- Object object = expression.execute(map);
- if (object != null) {
- result = (Boolean) object;
- } else {
- throw new ExpressionRuntimeException();
- }
- return result;
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index 6461acc..e1b802e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -14,10 +14,6 @@ import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.google.common.collect.Lists;
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.AviatorEvaluatorInstance;
-import com.googlecode.aviator.Expression;
-import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
@@ -30,8 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
-import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
-import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect;
@Slf4j
public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator> {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 3acd444..d5e62f3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -11,10 +11,6 @@ import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.event.Event;
import com.google.common.collect.Lists;
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.AviatorEvaluatorInstance;
-import com.googlecode.aviator.Expression;
-import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -24,8 +20,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
-import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect;
@Slf4j
public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index b5bbbea..be86691 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -1,7 +1,6 @@
package com.geedgenetworks.core.processor.aggregate;
import com.alibaba.fastjson.JSON;
-import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.exception.CommonErrorCode;
@@ -10,10 +9,6 @@ import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.common.udf.AggregateFunction;
import com.google.common.collect.Lists;
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.AviatorEvaluatorInstance;
-import com.googlecode.aviator.Expression;
-import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -23,7 +18,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect;
@Slf4j
public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Accumulator, Accumulator, Accumulator> {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 695075d..74c8de7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -15,17 +15,13 @@ import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
import com.google.common.collect.Lists;
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.AviatorEvaluatorInstance;
-import com.googlecode.aviator.Expression;
-import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
-import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
-import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+
+import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect;
import java.util.LinkedList;
import java.util.List;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index a1e0560..5f986b6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -12,10 +12,6 @@ import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
import com.google.common.collect.Lists;
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.AviatorEvaluatorInstance;
-import com.googlecode.aviator.Expression;
-import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -24,8 +20,7 @@ import org.apache.flink.util.Collector;
import java.util.*;
-import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
-import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+import static com.geedgenetworks.api.utils.UDFLoaderUtils.getClassReflect;
@Slf4j
public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
private LinkedList<UDFEntity> functions;