summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-11-18 09:14:01 +0000
committer王宽 <[email protected]>2024-11-18 09:14:01 +0000
commit7fbf2f47709b9046680ba07bb9a36717c5b0e250 (patch)
tree19312ba195d5807f36e68832812a895be9cb5e2c
parentbc8fe110e1037e75012c4fb655fff888d4356bf4 (diff)
parent3fb8f0945b88f48bfb1b26d3ab8f14fcb7680632 (diff)
Merge branch 'fix/aviator-expression' into 'develop'
[fix][core][api]统一Aviator表达式语法和调用方式,所有processor和函数使用表达式不需要使用event.字段名。 See merge request galaxy/platform/groot-stream!137
-rw-r--r--config/grootstream_job_example.yaml8
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java3
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/expressions/AviatorExecutor.java55
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/expressions/Calc.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/expressions/Calc.java)4
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/expressions/SyntaxErrorException.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/expressions/SyntaxErrorException.java)2
-rw-r--r--groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.api.factory.Factory (renamed from groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.core.factories.Factory)0
-rw-r--r--groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.bootstrap.command.CryptoShade (renamed from groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade)0
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml6
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_test.yaml6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/expressions/EvalExecutor.java35
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java32
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java22
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java6
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/expressions/AviatorExecutorTest.java (renamed from groot-core/src/test/java/com/geedgenetworks/core/expressions/EvalExecutorTest.java)56
-rw-r--r--groot-formats/format-csv/pom.xml12
22 files changed, 145 insertions, 165 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index aa7379a..d7ad730 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -10,22 +10,22 @@ filters:
filter_operator:
type: aviator
properties:
- expression: event.server_ip != '12.12.12.12'
+ expression: server_ip != '12.12.12.12'
splits:
decoded_as_split:
type: split
rules:
- tag: http_tag
- expression: event.decoded_as == 'HTTP'
+ expression: decoded_as == 'HTTP'
- tag: dns_tag
- expression: event.decoded_as == 'DNS'
+ expression: decoded_as == 'DNS'
processing_pipelines:
projection_processor:
type: projection
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
- function: DROP
- filter: event.server_ip == '4.4.4.4'
+ filter: server_ip == '4.4.4.4'
aggregate_processor:
type: aggregate
output_fields:
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java
index 455b074..6cb4f5a 100644
--- a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFEntity.java
@@ -1,4 +1,5 @@
package com.geedgenetworks.api.common.udf;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.googlecode.aviator.Expression;
import lombok.Data;
@@ -9,7 +10,7 @@ public class UDFEntity implements Serializable {
private ScalarFunction scalarFunction;
private AggregateFunction aggregateFunction;
private TableFunction tableFunction;
- private Expression filterExpression;
+ private AviatorExecutor aviatorExecutor;
private String name;
private String className;
private UDFContext udfContext;
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/expressions/AviatorExecutor.java b/groot-api/src/main/java/com/geedgenetworks/api/expressions/AviatorExecutor.java
new file mode 100644
index 0000000..8052e12
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/expressions/AviatorExecutor.java
@@ -0,0 +1,55 @@
+package com.geedgenetworks.api.expressions;
+
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import com.googlecode.aviator.exception.ExpressionSyntaxErrorException;
+
+import java.util.Map;
+
+/**
+ * Eval Function Executor.
+ *
+ * @author chaoc
+ * @since 1.5
+ */
+public final class AviatorExecutor implements Calc {
+
+ private final Expression script;
+
+ public AviatorExecutor(String script) {
+ try {
+ AviatorEvaluator.validate(script);
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ this.script = instance.compile(script,true);
+ } catch (ExpressionSyntaxErrorException
+ | ExpressionRuntimeException e) {
+ throw new SyntaxErrorException("Eval script syntax error. ", e);
+ }
+ }
+
+ @Override
+ public Object eval(final Map<String, Object> data) {
+ return script.execute(data);
+ }
+
+ @Override
+ public Boolean filter(Map<String, Object> data) {
+
+ boolean result;
+ Object object = script.execute(data);
+ if (object != null) {
+ result = (Boolean) object;
+ } else {
+ throw new ExpressionRuntimeException();
+ }
+ return result;
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/expressions/Calc.java b/groot-api/src/main/java/com/geedgenetworks/api/expressions/Calc.java
index 2b88853..418ab8b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/expressions/Calc.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/expressions/Calc.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.core.expressions;
+package com.geedgenetworks.api.expressions;
import java.io.Serializable;
import java.util.Map;
@@ -19,4 +19,6 @@ public interface Calc extends Serializable {
* @return The result of the calculation operation.
*/
Object eval(final Map<String, Object> data);
+ Boolean filter(final Map<String, Object> data);
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/expressions/SyntaxErrorException.java b/groot-api/src/main/java/com/geedgenetworks/api/expressions/SyntaxErrorException.java
index f058615..72e3e49 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/expressions/SyntaxErrorException.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/expressions/SyntaxErrorException.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.core.expressions;
+package com.geedgenetworks.api.expressions;
/**
* Exception class representing a syntax error that may occur during program execution.
diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
index f9a21ea..f9a21ea 100644
--- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.bootstrap.command.CryptoShade
index 273b40d..273b40d 100644
--- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
+++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.bootstrap.command.CryptoShade
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index e3f5613..61e8293 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -15,7 +15,7 @@ filters:
type: aviator
output_fields:
properties:
- expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE'
+ expression: decoded_as == 'SSL' || decoded_as == 'BASE'
preprocessing_pipelines:
@@ -32,7 +32,7 @@ preprocessing_pipelines:
- function: DROP
lookup_fields: [ '' ]
output_fields: [ '' ]
- filter: event.common_schema_type == 'BASE'
+ filter: decoded_as == 'BASE'
processing_pipelines:
session_record_processor: # [object] Processing Pipeline
@@ -45,7 +45,7 @@ processing_pipelines:
- function: DROP
lookup_fields: [ '' ]
output_fields: [ '' ]
- filter: event.decoded_as == 'SSL'
+ filter: decoded_as == 'SSL'
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
index 01fc6dd..d40bc31 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -18,9 +18,9 @@ splits:
type: split
rules:
- tag: http_tag
- expression: event.decoded_as == 'HTTP'
+ expression: decoded_as == 'HTTP'
- tag: dns_tag
- expression: event.decoded_as == 'DNS'
+ expression: decoded_as == 'DNS'
postprocessing_pipelines:
pre_etl_processor: # [object] Processing Pipeline
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_test.yaml
index 45c8f56..107977f 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_test.yaml
@@ -15,7 +15,7 @@ filters:
type: com.geedgenetworks.core.filter.AviatorFilter
output_fields:
properties:
- expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE'
+ expression: decoded_as == 'SSL' || decoded_as == 'BASE'
preprocessing_pipelines:
@@ -32,7 +32,7 @@ preprocessing_pipelines:
- function: DROP
lookup_fields: [ '' ]
output_fields: [ '' ]
- filter: event.common_schema_type == 'BASE'
+ filter: common_schema_type == 'BASE'
processing_pipelines:
session_record_processor: # [object] Processing Pipeline
@@ -45,7 +45,7 @@ processing_pipelines:
- function: DROP
lookup_fields: [ '' ]
output_fields: [ '' ]
- filter: event.decoded_as == 'SSL'
+ filter: decoded_as == 'SSL'
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/expressions/EvalExecutor.java b/groot-core/src/main/java/com/geedgenetworks/core/expressions/EvalExecutor.java
deleted file mode 100644
index d3936f5..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/expressions/EvalExecutor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.geedgenetworks.core.expressions;
-
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.Expression;
-import com.googlecode.aviator.exception.ExpressionRuntimeException;
-import com.googlecode.aviator.exception.ExpressionSyntaxErrorException;
-
-import java.util.Map;
-
-/**
- * Eval Function Executor.
- *
- * @author chaoc
- * @since 1.5
- */
-public final class EvalExecutor implements Calc {
-
- private final Expression script;
-
- public EvalExecutor(String script) {
- try {
- AviatorEvaluator.validate(script);
- this.script =
- AviatorEvaluator.getInstance().compile(script);
- } catch (ExpressionSyntaxErrorException
- | ExpressionRuntimeException e) {
- throw new SyntaxErrorException("Eval script syntax error. ", e);
- }
- }
-
- @Override
- public Object eval(final Map<String, Object> data) {
- return script.execute(data);
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index ab05478..6461acc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -12,6 +12,7 @@ import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.common.udf.UDFEntity;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.google.common.collect.Lists;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
@@ -72,7 +73,6 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
for (UDFContext udfContext : udfContexts) {
- Expression filterExpression = null;
UDFEntity udfEntity = new UDFEntity();
// 平台注册的函数包含任务中配置的函数则对函数进行实例化
if (udfClassReflect.containsKey(udfContext.getFunction())) {
@@ -80,14 +80,9 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- filterExpression = instance.compile(udfContext.getFilter(), true);
+ udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
}
udfEntity.setAggregateFunction(aggregateFunction);
- udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
udfEntity.setUdfContext(udfContext);
@@ -171,7 +166,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
accumulator.setInEvents(accumulator.getInEvents() + 1);
for (UDFEntity udafEntity : functions) {
try {
- boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true;
+ boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
if (result) {
udafEntity.getAggregateFunction().add(event, accumulator);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index d57b1a2..92c7f6b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.core.processor.aggregate;
import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.exception.CommonErrorCode;
@@ -42,7 +43,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
for (UDFContext udfContext : udfContexts) {
- Expression filterExpression = null;
UDFEntity udfEntity = new UDFEntity();
// 平台注册的函数包含任务中配置的函数则对函数进行实例化
if (udfClassReflect.containsKey(udfContext.getFunction())) {
@@ -50,14 +50,10 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- filterExpression = instance.compile(udfContext.getFilter(), true);
+ udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
+
}
udfEntity.setAggregateFunction(aggregateFunction);
- udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
udfEntity.setUdfContext(udfContext);
@@ -92,7 +88,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
accumulator.setInEvents(accumulator.getInEvents()+1);
for (UDFEntity udafEntity : functions) {
try {
- boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true;
+ boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
if (result) {
udafEntity.getAggregateFunction().add(event, accumulator);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index 91e64e4..b5bbbea 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.core.processor.aggregate;
import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.exception.CommonErrorCode;
@@ -40,22 +41,13 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
for (UDFContext udfContext : udfContexts) {
- Expression filterExpression = null;
UDFEntity udfEntity = new UDFEntity();
// 平台注册的函数包含任务中配置的函数则对函数进行实例化
if (udfClassReflect.containsKey(udfContext.getFunction())) {
Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
// 函数如果包含filter,对表达式进行编译
- if (udfContext.getFilter() != null) {
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- filterExpression = instance.compile(udfContext.getFilter(), true);
- }
udfEntity.setAggregateFunction(aggregateFunction);
- udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
udfEntity.setUdfContext(udfContext);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
index 8a08cb3..7107161 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
@@ -1,13 +1,16 @@
package com.geedgenetworks.core.processor.filter;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.api.expressions.SyntaxErrorException;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
import com.googlecode.aviator.Expression;
import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import com.googlecode.aviator.exception.ExpressionSyntaxErrorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
@@ -16,8 +19,7 @@ import java.util.Map;
@Slf4j
public class FilterFunction extends RichFilterFunction<Event> {
private final FilterConfig filterConfig;
- private static Expression compiledExp;
- private static String expression;
+ private AviatorExecutor aviatorExecutor;
private transient InternalMetrics internalMetrics;
public FilterFunction(FilterConfig filterConfig) {
@@ -27,12 +29,8 @@ public class FilterFunction extends RichFilterFunction<Event> {
@Override
public void open(Configuration parameters) throws Exception {
this.internalMetrics = new InternalMetrics(getRuntimeContext());
- expression = filterConfig.getProperties().getOrDefault("expression", "").toString();
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- compiledExp = instance.compile(expression, true);
+ String expression = filterConfig.getProperties().getOrDefault("expression", "").toString();
+ this.aviatorExecutor = new AviatorExecutor(expression);
}
@Override
@@ -47,11 +45,11 @@ public class FilterFunction extends RichFilterFunction<Event> {
ColumnUtil.columnSelector(
value.getExtractedFields(), filterConfig.getOutput_fields()));
}
- isFilter = aviatorExcute(value.getExtractedFields());
+ isFilter = aviatorExecutor.filter(value.getExtractedFields());
} catch (ExpressionRuntimeException e){
isFilter = false;
- log.error("Invalid filter expression: {}, {}", expression, e);
+ log.error("Invalid filter expression ! "+e);
internalMetrics.incrementErrorEvents();
}
catch (RuntimeException ignored) {
@@ -66,19 +64,5 @@ public class FilterFunction extends RichFilterFunction<Event> {
}
return isFilter;
}
- public static boolean aviatorExcute(Map<String, Object> map) {
- boolean bool = true;
- if (!"".equals(expression)) {
- Object object = compiledExp.execute(compiledExp.newEnv("event", map));
- if(object != null){
- bool = (boolean) object;
- }
- else {
- throw new ExpressionRuntimeException();
- }
- }
- return bool;
-
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 76fe634..2722c05 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.core.processor.projection;
import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
@@ -56,7 +57,6 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5")));
for (UDFContext udfContext : udfContexts) {
- Expression filterExpression = null;
UDFEntity udfEntity = new UDFEntity();
// 平台注册的函数包含任务中配置的函数则对函数进行实例化
if (udfClassReflect.containsKey(udfContext.getFunction())) {
@@ -65,14 +65,9 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
scalarFunction.open(getRuntimeContext(), udfContext);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- filterExpression = instance.compile(udfContext.getFilter(), true);
+ new AviatorExecutor(udfContext.getFilter());
}
udfEntity.setScalarFunction(scalarFunction);
- udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
functions.add(udfEntity);
@@ -99,7 +94,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
if (event.isDropped()) {
break;
} else {
- boolean result = udfEntity.getFilterExpression() != null ? filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true;
+ boolean result = udfEntity.getAviatorExecutor() != null ? udfEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
if (result) {
udfEntity.getScalarFunction().evaluate(event);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java
index f1cd8e8..33d135a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java
@@ -1,6 +1,8 @@
package com.geedgenetworks.core.processor.split;
import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
+import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import lombok.Data;
import org.apache.flink.util.OutputTag;
@@ -12,7 +14,7 @@ public class RuleContext implements Serializable {
private String tag;
private String expression;
- private Expression compiledExpression;
+ private AviatorExecutor aviatorExecutor;
private OutputTag<Event> outputTag ;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
index 9172637..bfd3d10 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.processor.split;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.api.metrics.InternalMetrics;
import com.geedgenetworks.api.event.Event;
import com.googlecode.aviator.AviatorEvaluator;
@@ -34,13 +35,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
this.internalMetrics = new InternalMetrics(getRuntimeContext());
this.rules = splitConfig.getRules();
for(RuleContext rule : rules){
- String expression = rule.getExpression();
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- Expression compiledExp = instance.compile(expression, true);
- rule.setCompiledExpression(compiledExp);
+ rule.setAviatorExecutor(new AviatorExecutor(rule.getExpression()));
OutputTag<Event> outputTag = new OutputTag<>(rule.getTag()){};
rule.setOutputTag(outputTag);
}
@@ -52,7 +47,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
try {
internalMetrics.incrementInEvents();
for (RuleContext route : rules){
- boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true;
+ boolean result = route.getAviatorExecutor() != null ? route.getAviatorExecutor().filter(event.getExtractedFields()): true;
if (result) {
ctx.output(route.getOutputTag(), event);
}
@@ -63,15 +58,4 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
}
}
- public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
-
- boolean result;
- Object object = expression.execute(map);
- if (object != null) {
- result = (Boolean) object;
- } else {
- throw new ExpressionRuntimeException();
- }
- return result;
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index 2b77df7..4fe0373 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.core.processor.table;
import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
@@ -48,7 +49,6 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
for (UDFContext udfContext : udfContexts) {
- Expression filterExpression = null;
UDFEntity udfEntity = new UDFEntity();
// 平台注册的函数包含任务中配置的函数则对函数进行实例化
if (udfClassReflect.containsKey(udfContext.getFunction())) {
@@ -57,14 +57,9 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
tableFunction.open(getRuntimeContext(), udfContext);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- filterExpression = instance.compile(udfContext.getFilter(), true);
+ new AviatorExecutor(udfContext.getFilter());
}
udfEntity.setTableFunction(tableFunction);
- udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
functions.add(udfEntity);
@@ -90,7 +85,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
List<Event> newEvents = new ArrayList<>();
for(int i=0;i<events.size();i++) {
try {
- boolean result = udfEntity.getFilterExpression() != null ? filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", events.get(i).getExtractedFields())) : true;
+ boolean result = udfEntity.getAviatorExecutor() != null ? udfEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true;
if (!events.get(i).isDropped() && result) {
newEvents.addAll(udfEntity.getTableFunction().evaluate(events.get(i)));
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
index ca1f731..dba08c6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
@@ -2,8 +2,8 @@ package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.core.expressions.Calc;
-import com.geedgenetworks.core.expressions.EvalExecutor;
+import com.geedgenetworks.api.expressions.Calc;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
import com.geedgenetworks.api.common.udf.ScalarFunction;
import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.event.Event;
@@ -30,7 +30,7 @@ public class Eval implements ScalarFunction {
String expr = (String) udfContext.getParameters().get("value_expression");
List<String> outputField = udfContext.getOutputFields();
output = outputField.get(0);
- calc = new EvalExecutor(expr);
+ calc = new AviatorExecutor(expr);
}
@Override
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/expressions/EvalExecutorTest.java b/groot-core/src/test/java/com/geedgenetworks/core/expressions/AviatorExecutorTest.java
index e1e9924..64f0fb8 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/expressions/EvalExecutorTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/expressions/AviatorExecutorTest.java
@@ -1,5 +1,7 @@
package com.geedgenetworks.core.expressions;
+import com.geedgenetworks.api.expressions.AviatorExecutor;
+import com.geedgenetworks.api.expressions.SyntaxErrorException;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -11,7 +13,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-class EvalExecutorTest {
+class AviatorExecutorTest {
private static Map<String, Object> origin;
@@ -40,15 +42,15 @@ class EvalExecutorTest {
@Test
public void testValueExpression() throws Exception {
// constant
- assertEquals(new EvalExecutor("'name'").eval(origin), "name");
- assertEquals(new EvalExecutor("5").eval(origin), 5L);
- assertEquals(new EvalExecutor("true").eval(origin), true);
- assertEquals(new EvalExecutor("false").eval(origin), false);
+ assertEquals(new AviatorExecutor("'name'").eval(origin), "name");
+ assertEquals(new AviatorExecutor("5").eval(origin), 5L);
+ assertEquals(new AviatorExecutor("true").eval(origin), true);
+ assertEquals(new AviatorExecutor("false").eval(origin), false);
// variable
- assertEquals(new EvalExecutor("common_client_ip").eval(origin),
+ assertEquals(new AviatorExecutor("common_client_ip").eval(origin),
origin.get("common_client_ip"));
- assertEquals(new EvalExecutor("common_client_port").eval(origin),
+ assertEquals(new AviatorExecutor("common_client_port").eval(origin),
origin.get("common_client_port"));
}
@@ -61,15 +63,15 @@ class EvalExecutorTest {
int commonC2sByteNum = (int) origin.get("common_c2s_byte_num");
int commonS2cByteNum = (int) origin.get("common_s2c_byte_num");
assertEquals(
- new EvalExecutor("common_client_port > common_server_port " +
+ new AviatorExecutor("common_client_port > common_server_port " +
"? common_c2s_pkt_num : common_s2c_pkt_num").eval(origin),
commonClientPort > commonServerPort ? commonC2sPktNum : commonS2cPktNum);
assertEquals(
- new EvalExecutor("common_client_port > common_server_port " +
+ new AviatorExecutor("common_client_port > common_server_port " +
"? 'C2S:' + common_c2s_pkt_num : 'S2C:' + common_s2c_pkt_num").eval(origin),
commonClientPort > commonServerPort ? "C2S:" + commonC2sPktNum : "S2C:" + commonS2cPktNum);
assertEquals(
- new EvalExecutor("common_c2s_pkt_num > common_s2c_pkt_num " +
+ new AviatorExecutor("common_c2s_pkt_num > common_s2c_pkt_num " +
"&& common_c2s_byte_num > common_s2c_byte_num ? common_client_port : common_server_port").eval(origin),
commonC2sPktNum > commonS2cPktNum
&& commonC2sByteNum > commonS2cByteNum ? commonClientPort : commonServerPort
@@ -81,59 +83,59 @@ class EvalExecutorTest {
public void testArithmeticExpression() throws Exception {
final Map<String, Object> map = Collections.emptyMap();
assertEquals(
- 1L, new EvalExecutor("5 & 3").eval(map));
+ 1L, new AviatorExecutor("5 & 3").eval(map));
assertEquals(
- 7L, new EvalExecutor("5 | 3").eval(map));
+ 7L, new AviatorExecutor("5 | 3").eval(map));
assertEquals(
- 20L, new EvalExecutor("5 << 2").eval(map));
+ 20L, new AviatorExecutor("5 << 2").eval(map));
assertEquals(
- 1L, new EvalExecutor("5 >> 2").eval(map));
+ 1L, new AviatorExecutor("5 >> 2").eval(map));
assertEquals(
- 8L, new EvalExecutor("5 + 3").eval(map));
+ 8L, new AviatorExecutor("5 + 3").eval(map));
assertEquals(
- 2L, new EvalExecutor("5 - 3").eval(map));
+ 2L, new AviatorExecutor("5 - 3").eval(map));
assertEquals(
- 15L, new EvalExecutor("5 * 3").eval(map));
+ 15L, new AviatorExecutor("5 * 3").eval(map));
assertEquals(
- 2L, new EvalExecutor("5 / 2").eval(map));
+ 2L, new AviatorExecutor("5 / 2").eval(map));
assertEquals(
- 2L, new EvalExecutor("5 % 3").eval(map));
+ 2L, new AviatorExecutor("5 % 3").eval(map));
final Map<String, Object> map1 = new HashMap<>();
map1.put("flags", 8);
assertEquals("B",
- new EvalExecutor(" ( flags & 8 ) == 9 ? 'A' : 'B'").eval(map1));
+ new AviatorExecutor(" ( flags & 8 ) == 9 ? 'A' : 'B'").eval(map1));
assertEquals("A",
- new EvalExecutor(" ( 5 | flags ) == 13 ? 'A' : 'B'").eval(map1));
+ new AviatorExecutor(" ( 5 | flags ) == 13 ? 'A' : 'B'").eval(map1));
}
@Test
public void testFieldArithmeticExpression() throws Exception {
assertEquals(1L,
- new EvalExecutor("common_c2s_pkt_num - common_s2c_pkt_num").eval(origin));
+ new AviatorExecutor("common_c2s_pkt_num - common_s2c_pkt_num").eval(origin));
- assertEquals(17L, new EvalExecutor("common_c2s_pkt_num + 10 + 1").eval(origin));
+ assertEquals(17L, new AviatorExecutor("common_c2s_pkt_num + 10 + 1").eval(origin));
}
@Test
public void testNullField() {
- var evalExecutor = new EvalExecutor("( a > nil ? a : 0 ) + ( b > nil ? b : 10 )");
+ var evalExecutor = new AviatorExecutor("( a > nil ? a : 0 ) + ( b > nil ? b : 10 )");
assertEquals(10L, evalExecutor.eval(origin));
}
@Test
public void testError() {
assertThrows(SyntaxErrorException.class,
- () -> new EvalExecutor("1L"));
+ () -> new AviatorExecutor("1L"));
assertThrows(SyntaxErrorException.class,
- () -> new EvalExecutor("2 = 3"));
+ () -> new AviatorExecutor("2 = 3"));
final Map<String, Object> map = new HashMap<>(origin);
map.put("has_address", true);
- var convert = new EvalExecutor("common_c2s_pkt_num + has_address");
+ var convert = new AviatorExecutor("common_c2s_pkt_num + has_address");
assertThrows(ExpressionRuntimeException.class, () -> convert.eval(map));
}
diff --git a/groot-formats/format-csv/pom.xml b/groot-formats/format-csv/pom.xml
index 509a9c1..c075ec2 100644
--- a/groot-formats/format-csv/pom.xml
+++ b/groot-formats/format-csv/pom.xml
@@ -39,4 +39,16 @@
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>9</source>
+ <target>9</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project> \ No newline at end of file