diff options
| author | wangkuan <[email protected]> | 2024-11-18 14:07:29 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-11-18 14:07:29 +0800 |
| commit | 85f95e2c20ad76d9a00b188aa41ce6fef9a2ac99 (patch) | |
| tree | 1b6d8a67a5672f516aa8764a4669b3efbd680f2b | |
| parent | 7d6c1eb13837931b4b526f05adb550a58fec1aea (diff) | |
[fix][core][api]统一Aviator表达式语法和调用方式,所有processor和函数使用表达式不需要使用event.字段名。
21 files changed, 140 insertions, 158 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index aa7379a..d7ad730 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -10,22 +10,22 @@ filters: filter_operator: type: aviator properties: - expression: event.server_ip != '12.12.12.12' + expression: server_ip != '12.12.12.12' splits: decoded_as_split: type: split rules: - tag: http_tag - expression: event.decoded_as == 'HTTP' + expression: decoded_as == 'HTTP' - tag: dns_tag - expression: event.decoded_as == 'DNS' + expression: decoded_as == 'DNS' processing_pipelines: projection_processor: type: projection remove_fields: [http_request_line, http_response_line, http_response_content_type] functions: - function: DROP - filter: event.server_ip == '4.4.4.4' + filter: server_ip == '4.4.4.4' aggregate_processor: type: aggregate output_fields: diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java index d8434d6..771bc66 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java @@ -1,4 +1,5 @@ package com.geedgenetworks.api.common.udf; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.googlecode.aviator.Expression; import lombok.Data; @@ -9,7 +10,7 @@ public class UdfEntity implements Serializable { private ScalarFunction scalarFunction; private AggregateFunction aggregateFunction; private TableFunction tableFunction; - private Expression filterExpression; + private AviatorExecutor aviatorExecutor; private String name; private String className; private UDFContext udfContext; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/expressions/AviatorExecutor.java b/groot-api/src/main/java/com/geedgenetworks/api/expressions/AviatorExecutor.java new file mode 100644 index 0000000..8052e12 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/expressions/AviatorExecutor.java @@ -0,0 +1,55 @@ +package com.geedgenetworks.api.expressions; + +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import com.googlecode.aviator.exception.ExpressionSyntaxErrorException; + +import java.util.Map; + +/** + * Eval Function Executor. + * + * @author chaoc + * @since 1.5 + */ +public final class AviatorExecutor implements Calc { + + private final Expression script; + + public AviatorExecutor(String script) { + try { + AviatorEvaluator.validate(script); + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + this.script = instance.compile(script,true); + } catch (ExpressionSyntaxErrorException + | ExpressionRuntimeException e) { + throw new SyntaxErrorException("Eval script syntax error. ", e); + } + } + + @Override + public Object eval(final Map<String, Object> data) { + return script.execute(data); + } + + @Override + public Boolean filter(Map<String, Object> data) { + + boolean result; + Object object = script.execute(data); + if (object != null) { + result = (Boolean) object; + } else { + throw new ExpressionRuntimeException(); + } + return result; + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/expressions/Calc.java b/groot-api/src/main/java/com/geedgenetworks/api/expressions/Calc.java index 2b88853..418ab8b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/expressions/Calc.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/expressions/Calc.java @@ -1,4 +1,4 @@ -package com.geedgenetworks.core.expressions; +package com.geedgenetworks.api.expressions; import java.io.Serializable; import java.util.Map; @@ -19,4 +19,6 @@ public interface Calc extends Serializable { * @return The result of the calculation operation. */ Object eval(final Map<String, Object> data); + Boolean filter(final Map<String, Object> data); + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/expressions/SyntaxErrorException.java b/groot-api/src/main/java/com/geedgenetworks/api/expressions/SyntaxErrorException.java index f058615..72e3e49 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/expressions/SyntaxErrorException.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/expressions/SyntaxErrorException.java @@ -1,4 +1,4 @@ -package com.geedgenetworks.core.expressions; +package com.geedgenetworks.api.expressions; /** * Exception class representing a syntax error that may occur during program execution. diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.api.factory.Factory index f9a21ea..f9a21ea 100644 --- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.api.factory.Factory diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.bootstrap.command.CryptoShade index 273b40d..273b40d 100644 --- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade +++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.bootstrap.command.CryptoShade diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index e3f5613..bbde37b 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -15,7 +15,7 @@ filters: type: aviator output_fields: properties: - expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE' + expression: decoded_as == 'SSL' || decoded_as == 'BASE' preprocessing_pipelines: diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml index 01fc6dd..d40bc31 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -18,9 +18,9 @@ splits: type: split rules: - tag: http_tag - expression: event.decoded_as == 'HTTP' + expression: decoded_as == 'HTTP' - tag: dns_tag - expression: event.decoded_as == 'DNS' + expression: decoded_as == 'DNS' postprocessing_pipelines: pre_etl_processor: # [object] Processing Pipeline diff --git a/groot-core/src/main/java/com/geedgenetworks/core/expressions/EvalExecutor.java b/groot-core/src/main/java/com/geedgenetworks/core/expressions/EvalExecutor.java deleted file mode 100644 index d3936f5..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/expressions/EvalExecutor.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.geedgenetworks.core.expressions; - -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.Expression; -import com.googlecode.aviator.exception.ExpressionRuntimeException; -import com.googlecode.aviator.exception.ExpressionSyntaxErrorException; - -import java.util.Map; - -/** - * Eval Function Executor. - * - * @author chaoc - * @since 1.5 - */ -public final class EvalExecutor implements Calc { - - private final Expression script; - - public EvalExecutor(String script) { - try { - AviatorEvaluator.validate(script); - this.script = - AviatorEvaluator.getInstance().compile(script); - } catch (ExpressionSyntaxErrorException - | ExpressionRuntimeException e) { - throw new SyntaxErrorException("Eval script syntax error. ", e); - } - } - - @Override - public Object eval(final Map<String, Object> data) { - return script.execute(data); - } -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java index 0d9c3ca..00d7d9c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -12,6 +12,7 @@ import com.geedgenetworks.api.common.udf.UDFContext; import com.geedgenetworks.api.common.udf.UdfEntity; import com.geedgenetworks.api.metrics.InternalMetrics; import com.geedgenetworks.api.event.Event; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.google.common.collect.Lists; import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.AviatorEvaluatorInstance; @@ -72,7 +73,6 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); for (UDFContext udfContext : udfContexts) { - Expression filterExpression = null; UdfEntity udfEntity = new UdfEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { @@ -80,14 +80,9 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - filterExpression = instance.compile(udfContext.getFilter(), true); + udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); } udfEntity.setAggregateFunction(aggregateFunction); - udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); udfEntity.setUdfContext(udfContext); @@ -171,7 +166,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator accumulator.setInEvents(accumulator.getInEvents() + 1); for (UdfEntity udafEntity : functions) { try { - boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; + boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true; if (result) { udafEntity.getAggregateFunction().add(event, accumulator); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 2fb0ff1..ea05c1c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -1,6 +1,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.alibaba.fastjson.JSON; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.common.config.Accumulator; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.common.exception.CommonErrorCode; @@ -42,7 +43,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { for (UDFContext udfContext : udfContexts) { - Expression filterExpression = null; UdfEntity udfEntity = new UdfEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { @@ -50,14 +50,10 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - filterExpression = instance.compile(udfContext.getFilter(), true); + udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); + } udfEntity.setAggregateFunction(aggregateFunction); - udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); udfEntity.setUdfContext(udfContext); @@ -92,7 +88,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f accumulator.setInEvents(accumulator.getInEvents()+1); for (UdfEntity udafEntity : functions) { try { - boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; + boolean result = udafEntity.getAviatorExecutor() != null ? udafEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true; if (result) { udafEntity.getAggregateFunction().add(event, accumulator); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java index 86cf3f6..db5336a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -1,6 +1,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.alibaba.fastjson.JSON; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.common.config.Accumulator; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.common.exception.CommonErrorCode; @@ -47,15 +48,7 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); // 函数如果包含filter,对表达式进行编译 - if (udfContext.getFilter() != null) { - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - filterExpression = instance.compile(udfContext.getFilter(), true); - } udfEntity.setAggregateFunction(aggregateFunction); - udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); udfEntity.setUdfContext(udfContext); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java index 8a08cb3..7107161 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java @@ -1,13 +1,16 @@ package com.geedgenetworks.core.processor.filter; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.common.utils.ColumnUtil; import com.geedgenetworks.api.metrics.InternalMetrics; import com.geedgenetworks.api.event.Event; +import com.geedgenetworks.api.expressions.SyntaxErrorException; import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.AviatorEvaluatorInstance; import com.googlecode.aviator.Expression; import com.googlecode.aviator.Options; import com.googlecode.aviator.exception.ExpressionRuntimeException; +import com.googlecode.aviator.exception.ExpressionSyntaxErrorException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.configuration.Configuration; @@ -16,8 +19,7 @@ import java.util.Map; @Slf4j public class FilterFunction extends RichFilterFunction<Event> { private final FilterConfig filterConfig; - private static Expression compiledExp; - private static String expression; + private AviatorExecutor aviatorExecutor; private transient InternalMetrics internalMetrics; public FilterFunction(FilterConfig filterConfig) { @@ -27,12 +29,8 @@ public class FilterFunction extends RichFilterFunction<Event> { @Override public void open(Configuration parameters) throws Exception { this.internalMetrics = new InternalMetrics(getRuntimeContext()); - expression = filterConfig.getProperties().getOrDefault("expression", "").toString(); - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - compiledExp = instance.compile(expression, true); + String expression = filterConfig.getProperties().getOrDefault("expression", "").toString(); + this.aviatorExecutor = new AviatorExecutor(expression); } @Override @@ -47,11 +45,11 @@ public class FilterFunction extends RichFilterFunction<Event> { ColumnUtil.columnSelector( value.getExtractedFields(), filterConfig.getOutput_fields())); } - isFilter = aviatorExcute(value.getExtractedFields()); + isFilter = aviatorExecutor.filter(value.getExtractedFields()); } catch (ExpressionRuntimeException e){ isFilter = false; - log.error("Invalid filter expression: {}, {}", expression, e); + log.error("Invalid filter expression ! "+e); internalMetrics.incrementErrorEvents(); } catch (RuntimeException ignored) { @@ -66,19 +64,5 @@ public class FilterFunction extends RichFilterFunction<Event> { } return isFilter; } - public static boolean aviatorExcute(Map<String, Object> map) { - boolean bool = true; - if (!"".equals(expression)) { - Object object = compiledExp.execute(compiledExp.newEnv("event", map)); - if(object != null){ - bool = (boolean) object; - } - else { - throw new ExpressionRuntimeException(); - } - } - return bool; - - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 03a8f5a..e079790 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -2,6 +2,7 @@ package com.geedgenetworks.core.processor.projection; import com.alibaba.fastjson.JSON; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; @@ -56,7 +57,6 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5"))); for (UDFContext udfContext : udfContexts) { - Expression filterExpression = null; UdfEntity udfEntity = new UdfEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { @@ -65,14 +65,9 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { scalarFunction.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - filterExpression = instance.compile(udfContext.getFilter(), true); + new AviatorExecutor(udfContext.getFilter()); } udfEntity.setScalarFunction(scalarFunction); - udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); functions.add(udfEntity); @@ -99,7 +94,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { if (event.isDropped()) { break; } else { - boolean result = udfEntity.getFilterExpression() != null ? filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; + boolean result = udfEntity.getAviatorExecutor() != null ? udfEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true; if (result) { udfEntity.getScalarFunction().evaluate(event); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java index f1cd8e8..33d135a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/RuleContext.java @@ -1,6 +1,8 @@ package com.geedgenetworks.core.processor.split; import com.geedgenetworks.api.event.Event; +import com.geedgenetworks.api.expressions.AviatorExecutor; +import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.Expression; import lombok.Data; import org.apache.flink.util.OutputTag; @@ -12,7 +14,7 @@ public class RuleContext implements Serializable { private String tag; private String expression; - private Expression compiledExpression; + private AviatorExecutor aviatorExecutor; private OutputTag<Event> outputTag ; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java index 2cbf74d..d9232d7 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitFunction.java @@ -1,5 +1,6 @@ package com.geedgenetworks.core.processor.split; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.api.metrics.InternalMetrics; import com.geedgenetworks.api.event.Event; import com.googlecode.aviator.AviatorEvaluator; @@ -34,13 +35,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> { this.internalMetrics = new InternalMetrics(getRuntimeContext()); this.rules = splitConfig.getRules(); for(RuleContext rule : rules){ - String expression = rule.getExpression(); - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - Expression compiledExp = instance.compile(expression, true); - rule.setCompiledExpression(compiledExp); + rule.setAviatorExecutor(new AviatorExecutor(rule.getExpression())); OutputTag<Event> outputTag = new OutputTag<>(rule.getTag()){}; rule.setOutputTag(outputTag); } @@ -52,7 +47,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> { try { internalMetrics.incrementInEvents(); for (RuleContext route : rules){ - boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; + boolean result = route.getAviatorExecutor() != null ? route.getAviatorExecutor().filter(event.getExtractedFields()): true; if (result) { ctx.output(route.getOutputTag(), event); } @@ -63,15 +58,4 @@ public class SplitFunction extends ProcessFunction<Event, Event> { } } - public static Boolean filterExecute(Expression expression, Map<String, Object> map) { - - boolean result; - Object object = expression.execute(map); - if (object != null) { - result = (Boolean) object; - } else { - throw new ExpressionRuntimeException(); - } - return result; - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java index f061028..a054fcd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java @@ -1,6 +1,7 @@ package com.geedgenetworks.core.processor.table; import com.alibaba.fastjson.JSON; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; @@ -57,14 +58,9 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> { tableFunction.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - filterExpression = instance.compile(udfContext.getFilter(), true); + new AviatorExecutor(udfContext.getFilter()); } udfEntity.setTableFunction(tableFunction); - udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); functions.add(udfEntity); @@ -90,7 +86,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> { List<Event> newEvents = new ArrayList<>(); for(int i=0;i<events.size();i++) { try { - boolean result = udfEntity.getFilterExpression() != null ? filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", events.get(i).getExtractedFields())) : true; + boolean result = udfEntity.getAviatorExecutor() != null ? udfEntity.getAviatorExecutor().filter(event.getExtractedFields()) : true; if (!events.get(i).isDropped() && result) { newEvents.addAll(udfEntity.getTableFunction().evaluate(events.get(i))); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java index ca1f731..dba08c6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java @@ -2,8 +2,8 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.core.expressions.Calc; -import com.geedgenetworks.core.expressions.EvalExecutor; +import com.geedgenetworks.api.expressions.Calc; +import com.geedgenetworks.api.expressions.AviatorExecutor; import com.geedgenetworks.api.common.udf.ScalarFunction; import com.geedgenetworks.api.common.udf.UDFContext; import com.geedgenetworks.api.event.Event; @@ -30,7 +30,7 @@ public class Eval implements ScalarFunction { String expr = (String) udfContext.getParameters().get("value_expression"); List<String> outputField = udfContext.getOutputFields(); output = outputField.get(0); - calc = new EvalExecutor(expr); + calc = new AviatorExecutor(expr); } @Override diff --git a/groot-core/src/test/java/com/geedgenetworks/core/expressions/EvalExecutorTest.java b/groot-core/src/test/java/com/geedgenetworks/core/expressions/AviatorExecutorTest.java index e1e9924..64f0fb8 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/expressions/EvalExecutorTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/expressions/AviatorExecutorTest.java @@ -1,5 +1,7 @@ package com.geedgenetworks.core.expressions; +import com.geedgenetworks.api.expressions.AviatorExecutor; +import com.geedgenetworks.api.expressions.SyntaxErrorException; import com.googlecode.aviator.exception.ExpressionRuntimeException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -11,7 +13,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -class EvalExecutorTest { +class AviatorExecutorTest { private static Map<String, Object> origin; @@ -40,15 +42,15 @@ class EvalExecutorTest { @Test public void testValueExpression() throws Exception { // constant - assertEquals(new EvalExecutor("'name'").eval(origin), "name"); - assertEquals(new EvalExecutor("5").eval(origin), 5L); - assertEquals(new EvalExecutor("true").eval(origin), true); - assertEquals(new EvalExecutor("false").eval(origin), false); + assertEquals(new AviatorExecutor("'name'").eval(origin), "name"); + assertEquals(new AviatorExecutor("5").eval(origin), 5L); + assertEquals(new AviatorExecutor("true").eval(origin), true); + assertEquals(new AviatorExecutor("false").eval(origin), false); // variable - assertEquals(new EvalExecutor("common_client_ip").eval(origin), + assertEquals(new AviatorExecutor("common_client_ip").eval(origin), origin.get("common_client_ip")); - assertEquals(new EvalExecutor("common_client_port").eval(origin), + assertEquals(new AviatorExecutor("common_client_port").eval(origin), origin.get("common_client_port")); } @@ -61,15 +63,15 @@ class EvalExecutorTest { int commonC2sByteNum = (int) origin.get("common_c2s_byte_num"); int commonS2cByteNum = (int) origin.get("common_s2c_byte_num"); assertEquals( - new EvalExecutor("common_client_port > common_server_port " + + new AviatorExecutor("common_client_port > common_server_port " + "? common_c2s_pkt_num : common_s2c_pkt_num").eval(origin), commonClientPort > commonServerPort ? commonC2sPktNum : commonS2cPktNum); assertEquals( - new EvalExecutor("common_client_port > common_server_port " + + new AviatorExecutor("common_client_port > common_server_port " + "? 'C2S:' + common_c2s_pkt_num : 'S2C:' + common_s2c_pkt_num").eval(origin), commonClientPort > commonServerPort ? "C2S:" + commonC2sPktNum : "S2C:" + commonS2cPktNum); assertEquals( - new EvalExecutor("common_c2s_pkt_num > common_s2c_pkt_num " + + new AviatorExecutor("common_c2s_pkt_num > common_s2c_pkt_num " + "&& common_c2s_byte_num > common_s2c_byte_num ? common_client_port : common_server_port").eval(origin), commonC2sPktNum > commonS2cPktNum && commonC2sByteNum > commonS2cByteNum ? commonClientPort : commonServerPort @@ -81,59 +83,59 @@ class EvalExecutorTest { public void testArithmeticExpression() throws Exception { final Map<String, Object> map = Collections.emptyMap(); assertEquals( - 1L, new EvalExecutor("5 & 3").eval(map)); + 1L, new AviatorExecutor("5 & 3").eval(map)); assertEquals( - 7L, new EvalExecutor("5 | 3").eval(map)); + 7L, new AviatorExecutor("5 | 3").eval(map)); assertEquals( - 20L, new EvalExecutor("5 << 2").eval(map)); + 20L, new AviatorExecutor("5 << 2").eval(map)); assertEquals( - 1L, new EvalExecutor("5 >> 2").eval(map)); + 1L, new AviatorExecutor("5 >> 2").eval(map)); assertEquals( - 8L, new EvalExecutor("5 + 3").eval(map)); + 8L, new AviatorExecutor("5 + 3").eval(map)); assertEquals( - 2L, new EvalExecutor("5 - 3").eval(map)); + 2L, new AviatorExecutor("5 - 3").eval(map)); assertEquals( - 15L, new EvalExecutor("5 * 3").eval(map)); + 15L, new AviatorExecutor("5 * 3").eval(map)); assertEquals( - 2L, new EvalExecutor("5 / 2").eval(map)); + 2L, new AviatorExecutor("5 / 2").eval(map)); assertEquals( - 2L, new EvalExecutor("5 % 3").eval(map)); + 2L, new AviatorExecutor("5 % 3").eval(map)); final Map<String, Object> map1 = new HashMap<>(); map1.put("flags", 8); assertEquals("B", - new EvalExecutor(" ( flags & 8 ) == 9 ? 'A' : 'B'").eval(map1)); + new AviatorExecutor(" ( flags & 8 ) == 9 ? 'A' : 'B'").eval(map1)); assertEquals("A", - new EvalExecutor(" ( 5 | flags ) == 13 ? 'A' : 'B'").eval(map1)); + new AviatorExecutor(" ( 5 | flags ) == 13 ? 'A' : 'B'").eval(map1)); } @Test public void testFieldArithmeticExpression() throws Exception { assertEquals(1L, - new EvalExecutor("common_c2s_pkt_num - common_s2c_pkt_num").eval(origin)); + new AviatorExecutor("common_c2s_pkt_num - common_s2c_pkt_num").eval(origin)); - assertEquals(17L, new EvalExecutor("common_c2s_pkt_num + 10 + 1").eval(origin)); + assertEquals(17L, new AviatorExecutor("common_c2s_pkt_num + 10 + 1").eval(origin)); } @Test public void testNullField() { - var evalExecutor = new EvalExecutor("( a > nil ? a : 0 ) + ( b > nil ? b : 10 )"); + var evalExecutor = new AviatorExecutor("( a > nil ? a : 0 ) + ( b > nil ? b : 10 )"); assertEquals(10L, evalExecutor.eval(origin)); } @Test public void testError() { assertThrows(SyntaxErrorException.class, - () -> new EvalExecutor("1L")); + () -> new AviatorExecutor("1L")); assertThrows(SyntaxErrorException.class, - () -> new EvalExecutor("2 = 3")); + () -> new AviatorExecutor("2 = 3")); final Map<String, Object> map = new HashMap<>(origin); map.put("has_address", true); - var convert = new EvalExecutor("common_c2s_pkt_num + has_address"); + var convert = new AviatorExecutor("common_c2s_pkt_num + has_address"); assertThrows(ExpressionRuntimeException.class, () -> convert.eval(map)); } diff --git a/groot-formats/format-csv/pom.xml b/groot-formats/format-csv/pom.xml index 509a9c1..c075ec2 100644 --- a/groot-formats/format-csv/pom.xml +++ b/groot-formats/format-csv/pom.xml @@ -39,4 +39,16 @@ </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>9</source> + <target>9</target> + </configuration> + </plugin> + </plugins> + </build> </project>
\ No newline at end of file |
