diff options
| author | wangkuan <[email protected]> | 2024-08-26 10:48:59 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-26 10:48:59 +0800 |
| commit | 9b0297020611fcf70445284637f370b5f8c4fddd (patch) | |
| tree | 9e1c7ded8f047c7779c3338fa9d82e2bc44e935d /groot-core | |
| parent | 215dd9aa1e4ec6a509d64c78ec414a8196dace3c (diff) | |
[feature][core][common]单元测试优化,命名优化feature/split
Diffstat (limited to 'groot-core')
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java | 4 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java | 10 |
2 files changed, 6 insertions, 8 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 803fefc..4f9535d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> { private final List<UDFContext> udfContexts; private final List<String> udfClassNameLists; - private final List<String> groupByFields; - private LinkedList<UdfEntity> functions; + private final LinkedList<UdfEntity> functions; public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); @@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f if (udfContexts == null || udfContexts.isEmpty()) { throw new RuntimeException(); } - groupByFields = aggregateConfig.getGroup_by_fields(); functions = Lists.newLinkedList(); Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java index 7a129ef..f07b568 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -21,8 +21,8 @@ import java.util.Map; @Slf4j public class SplitFunction extends ProcessFunction<Event, Event> { - private SplitConfig splitConfig; - private List<RuleContext> routes; + private final SplitConfig splitConfig; + private List<RuleContext> rules; private transient InternalMetrics internalMetrics; public SplitFunction(SplitConfig splitConfig) { @@ -34,8 +34,8 @@ public class SplitFunction extends ProcessFunction<Event, Event> { public void open(Configuration parameters) throws Exception { this.internalMetrics = new InternalMetrics(getRuntimeContext()); - this.routes = splitConfig.getRules(); - for(RuleContext rule :routes){ + this.rules = splitConfig.getRules(); + for(RuleContext rule : rules){ String expression = rule.getExpression(); AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); instance.setCachedExpressionByDefault(true); @@ -53,7 +53,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> { public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception { try { internalMetrics.incrementInEvents(); - for (RuleContext route :routes){ + for (RuleContext route : rules){ boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; if (result) { ctx.output(route.getOutputTag(), event); |
