summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java10
2 files changed, 6 insertions, 8 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 803fefc..4f9535d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
private final List<UDFContext> udfContexts;
private final List<String> udfClassNameLists;
- private final List<String> groupByFields;
- private LinkedList<UdfEntity> functions;
+ private final LinkedList<UdfEntity> functions;
public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
}
- groupByFields = aggregateConfig.getGroup_by_fields();
functions = Lists.newLinkedList();
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
index 7a129ef..f07b568 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -21,8 +21,8 @@ import java.util.Map;
@Slf4j
public class SplitFunction extends ProcessFunction<Event, Event> {
- private SplitConfig splitConfig;
- private List<RuleContext> routes;
+ private final SplitConfig splitConfig;
+ private List<RuleContext> rules;
private transient InternalMetrics internalMetrics;
public SplitFunction(SplitConfig splitConfig) {
@@ -34,8 +34,8 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
public void open(Configuration parameters) throws Exception {
this.internalMetrics = new InternalMetrics(getRuntimeContext());
- this.routes = splitConfig.getRules();
- for(RuleContext rule :routes){
+ this.rules = splitConfig.getRules();
+ for(RuleContext rule : rules){
String expression = rule.getExpression();
AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
instance.setCachedExpressionByDefault(true);
@@ -53,7 +53,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> {
public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
try {
internalMetrics.incrementInEvents();
- for (RuleContext route :routes){
+ for (RuleContext route : rules){
boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true;
if (result) {
ctx.output(route.getOutputTag(), event);