diff options
Diffstat (limited to 'groot-core')
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java | 4 | ||||
| -rw-r--r-- | groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java | 10 |
2 files changed, 6 insertions, 8 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 803fefc..4f9535d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> { private final List<UDFContext> udfContexts; private final List<String> udfClassNameLists; - private final List<String> groupByFields; - private LinkedList<UdfEntity> functions; + private final LinkedList<UdfEntity> functions; public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); @@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f if (udfContexts == null || udfContexts.isEmpty()) { throw new RuntimeException(); } - groupByFields = aggregateConfig.getGroup_by_fields(); functions = Lists.newLinkedList(); Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java index 7a129ef..f07b568 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -21,8 +21,8 @@ import java.util.Map; @Slf4j public class SplitFunction extends ProcessFunction<Event, Event> { - private SplitConfig splitConfig; - private List<RuleContext> routes; + private final SplitConfig splitConfig; + private List<RuleContext> rules; private transient InternalMetrics internalMetrics; public SplitFunction(SplitConfig splitConfig) { @@ -34,8 +34,8 @@ public class SplitFunction extends ProcessFunction<Event, Event> { public void open(Configuration parameters) throws Exception { this.internalMetrics = new InternalMetrics(getRuntimeContext()); - this.routes = splitConfig.getRules(); - for(RuleContext rule :routes){ + this.rules = splitConfig.getRules(); + for(RuleContext rule : rules){ String expression = rule.getExpression(); AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); instance.setCachedExpressionByDefault(true); @@ -53,7 +53,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> { public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception { try { internalMetrics.incrementInEvents(); - for (RuleContext route :routes){ + for (RuleContext route : rules){ boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; if (result) { ctx.output(route.getOutputTag(), event); |
