summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-23 17:58:22 +0800
committerwangkuan <[email protected]>2024-08-23 17:58:22 +0800
commit215dd9aa1e4ec6a509d64c78ec414a8196dace3c (patch)
tree5c038000b9de72684a1f8b8f1832394aed30cf13 /groot-core
parent07332297c1306aa0dac649c7d15bf131e8edbc7e (diff)
[feature][core][common]GAL-646 Groot Stream支持Split Operator实现动态分流
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/split/Split.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java79
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java29
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split1
11 files changed, 177 insertions, 26 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
index 668ba6f..d8b8bc4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
@@ -2,13 +2,14 @@ package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public class AviatorFilter implements Filter {
@Override
- public SingleOutputStreamOperator<Event> filterFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+ public DataStream<Event> filterFunction(
+ DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception {
if (FilterConfig.getParallelism() != 0) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
index a173438..f8b50eb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
@@ -3,12 +3,13 @@ package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public interface Filter {
- SingleOutputStreamOperator<Event> filterFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+ DataStream<Event> filterFunction(
+ DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception;
String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
new file mode 100644
index 0000000..4381df5
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.core.pojo;
+
+import com.geedgenetworks.common.udf.RuleContext;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+@Data
+public class SplitConfig implements Serializable {
+
+ private String type;
+ private Map<String, Object> properties;
+ private int parallelism;
+ private String name;
+ private List<RuleContext> rules;
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
index 172b368..3852414 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
@@ -2,12 +2,13 @@ package com.geedgenetworks.core.processor;
import com.geedgenetworks.common.Event;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public interface Processor<T> {
- SingleOutputStreamOperator<Event> processorFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator,
+ DataStream<Event> processorFunction(
+ DataStream<Event> singleOutputStreamOperator,
T processorConfig, ExecutionConfig config)
throws Exception;
String type();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index bc87c32..cf78310 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -5,45 +5,50 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.AggregateConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
+
import static com.geedgenetworks.common.Constants.*;
public class AggregateProcessorImpl implements AggregateProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+
if (aggregateConfig.getParallelism() != 0) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}else {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}
+
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
index 6b46a7b..d87e7e2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
@@ -4,25 +4,26 @@ import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
public class ProjectionProcessorImpl implements ProjectionProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
if (projectionConfig.getParallelism() != 0) {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.process(new ProjectionProcessFunction(projectionConfig))
.setParallelism(projectionConfig.getParallelism())
.name(projectionConfig.getName());
} else {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.process(new ProjectionProcessFunction(projectionConfig))
.name(projectionConfig.getName());
}
}
-
@Override
public String type() {
return "projection";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
index f36f8db..6ddc616 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
@@ -2,25 +2,25 @@ package com.geedgenetworks.core.processor.table;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.TableConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Map;
-import java.time.Duration;
public class TableProcessorImpl implements TableProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception {
-
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) throws Exception {
if (tableConfig.getParallelism() != 0) {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.flatMap(new TableProcessorFunction(tableConfig))
.setParallelism(tableConfig.getParallelism())
.name(tableConfig.getName());
} else {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.flatMap(new TableProcessorFunction(tableConfig))
.name(tableConfig.getName());
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
new file mode 100644
index 0000000..37e7b44
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.util.Set;
+
+public interface Split {
+
+ DataStream<Event> splitFunction(
+ DataStream<Event> dataStream, SplitConfig splitConfig)
+ throws Exception;
+ String type();
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
new file mode 100644
index 0000000..7a129ef
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+
+@Slf4j
+public class SplitFunction extends ProcessFunction<Event, Event> {
+ private SplitConfig splitConfig;
+ private List<RuleContext> routes;
+ private transient InternalMetrics internalMetrics;
+
+ public SplitFunction(SplitConfig splitConfig) {
+ this.splitConfig = splitConfig;
+
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
+ this.routes = splitConfig.getRules();
+ for(RuleContext rule :routes){
+ String expression = rule.getExpression();
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ Expression compiledExp = instance.compile(expression, true);
+ rule.setCompiledExpression(compiledExp);
+ OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){};
+ rule.setOutputTag(outputTag);
+ }
+ }
+
+
+ @Override
+ public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
+ try {
+ internalMetrics.incrementInEvents();
+ for (RuleContext route :routes){
+ boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true;
+ if (result) {
+ ctx.output(route.getOutputTag(), event);
+ }
+ }
+ }catch (Exception e) {
+ internalMetrics.incrementErrorEvents();
+ log.error("error in split function", e);
+ }
+ }
+
+ public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
+
+ boolean result;
+ Object object = expression.execute(map);
+ if (object != null) {
+ result = (Boolean) object;
+ } else {
+ throw new ExpressionRuntimeException();
+ }
+ return result;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
new file mode 100644
index 0000000..f6d2c8c
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
@@ -0,0 +1,29 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public class SplitOperator implements Split {
+
+ @Override
+ public DataStream<Event> splitFunction(
+ DataStream<Event> dataStream, SplitConfig splitConfig)
+ throws Exception {
+ if (splitConfig.getParallelism() != 0) {
+ return dataStream
+ .process(new SplitFunction(splitConfig))
+ .setParallelism(splitConfig.getParallelism())
+ .name(splitConfig.getName());
+ } else {
+ return dataStream
+ .process(new SplitFunction(splitConfig))
+ .name(splitConfig.getName());
+ }
+ }
+ @Override
+ public String type() {
+ return "split";
+ }
+
+}
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
new file mode 100644
index 0000000..500c367
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
@@ -0,0 +1 @@
+com.geedgenetworks.core.split.SplitOperator \ No newline at end of file