summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java7
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java2
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java5
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java3
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java5
-rw-r--r--groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java5
-rw-r--r--groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java5
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java5
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java5
-rw-r--r--groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java18
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java)2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java25
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java9
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory2
-rw-r--r--groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java2
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml2
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java5
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java5
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java5
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java5
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java5
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml2
29 files changed, 143 insertions, 35 deletions
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java
index bd624a9..adba440 100644
--- a/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java
@@ -7,4 +7,11 @@ public interface Factory {
* (e.g. {@code kafka-1}).
*/
String type();
+
+ /**
+ * Returns true if the factory supports the given type.
+ */
+ boolean supportsType(String type);
+
+
}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java
index 22a14d3..f2f4a71 100644
--- a/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java
@@ -100,7 +100,7 @@ public final class FactoryUtil {
final List<Factory> matchingFactories =
foundFactories.stream()
- .filter(f -> f.type().equals(type))
+ .filter(f -> f.supportsType(type))
.collect(Collectors.toList());
if (matchingFactories.isEmpty()) {
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
index 97b2ffc..0acdda4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
@@ -21,6 +21,11 @@ public class CollectTableFactory implements SinkFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SinkProvider getSinkProvider(Context context) {
return new SinkProvider(){
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
index ac4d0bf..b67a842 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
@@ -5,15 +5,14 @@ public final class Constants {
public static final String DEFAULT_JOB_NAME = "groot-stream-job";
public static final String SOURCES = "sources";
public static final String FILTERS = "filters";
+ public static final String SPLITS = "splits";
public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines";
public static final String PROCESSING_PIPELINES = "processing_pipelines";
public static final String POSTPROCESSING_PIPELINES = "postprocessing_pipelines";
public static final String SINKS = "sinks";
public static final int SYSTEM_EXIT_CODE = 2618;
public static final String APPLICATION = "application";
-
public static final String PROPERTIES = "properties";
- public static final String SPLITS = "splits";
public static final String APPLICATION_ENV = "env";
public static final String APPLICATION_TOPOLOGY = "topology";
public static final String JOB_NAME = "name";
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java
index 789edf6..db9a373 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java
@@ -30,6 +30,11 @@ public class ClickHouseConnectorFactory implements SinkFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SinkProvider getSinkProvider(Context context) {
final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java
index 7bb899e..ec4d762 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java
@@ -23,6 +23,11 @@ public class FileConnectorFactory implements SourceFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SourceProvider getSourceProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
DecodingFormat decodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java
index 66d4932..c9c8f72 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java
@@ -32,6 +32,11 @@ public class IPFixConnectorFactory implements SourceFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(IPFixConnectorOptions.PORT_RANGE);
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java
index 2041b67..e903a6a 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java
@@ -31,6 +31,11 @@ public class KafkaConnectorFactory implements SourceFactory, SinkFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SourceProvider getSourceProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// 获取valueDecodingFormat
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java
index 0dff9d0..a859356 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java
@@ -29,6 +29,11 @@ public class MockConnectorFactory implements SourceFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SourceProvider getSourceProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java
index ff79dda..e8f964b 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java
@@ -26,6 +26,11 @@ public class StarRocksConnectorFactory implements SinkFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SinkProvider getSinkProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(CONNECTION_INFO_PREFIX);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java
index a77b2a4..6c18dc5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java
@@ -29,6 +29,11 @@ public class InlineConnectorFactory implements SourceFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SourceProvider getSourceProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// 获取DecodingFormat
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java
index 409395b..9e2aaac 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java
@@ -29,6 +29,11 @@ public class PrintConnectorFactory implements SinkFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SinkProvider getSinkProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// 获取encodingFormat
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java
index 0997261..5cbcd5a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java
@@ -5,13 +5,20 @@ import com.geedgenetworks.api.factory.ProcessorFactory;
public class AggregateProcessorFactory implements ProcessorFactory {
+ private static final String IDENTIFIER = "aggregate";
+
+ @Override
+ public Processor<?> createProcessor() {
+ return new AggregateProcessor();
+ }
+
@Override
public String type() {
- return "aggregate";
+ return IDENTIFIER;
}
@Override
- public Processor<?> createProcessor() {
- return new AggregateProcessor();
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java
deleted file mode 100644
index db6d2ac..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.geedgenetworks.core.processor.filter;
-
-import com.geedgenetworks.api.processor.Processor;
-import com.geedgenetworks.api.factory.ProcessorFactory;
-
-public class AviatorFilterProcessorFactory implements ProcessorFactory {
-
- @Override
- public Processor<?> createProcessor() {
- return new AviatorFilterProcessor();
- }
-
- @Override
- public String type() {
- return "aviator";
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
index a138b4e..8a08cb3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
@@ -51,7 +51,7 @@ public class FilterFunction extends RichFilterFunction<Event> {
} catch (ExpressionRuntimeException e){
isFilter = false;
- log.error("Invalid filter ! expression=" +expression);
+ log.error("Invalid filter expression: {}, {}", expression, e);
internalMetrics.incrementErrorEvents();
}
catch (RuntimeException ignored) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java
index 99f74e8..67fd61f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java
@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-public class AviatorFilterProcessor implements Processor<FilterConfig> {
+public class FilterProcessor implements Processor<FilterConfig> {
@Override
public DataStream<Event> process(StreamExecutionEnvironment env,
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java
new file mode 100644
index 0000000..b3cb049
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java
@@ -0,0 +1,25 @@
+package com.geedgenetworks.core.processor.filter;
+
+import com.geedgenetworks.api.processor.Processor;
+import com.geedgenetworks.api.factory.ProcessorFactory;
+
+public class FilterProcessorFactory implements ProcessorFactory {
+ private static final String IDENTIFIER = "filter";
+ private static final String LEGACY_IDENTIFIER = "aviator";
+
+ @Override
+ public Processor<?> createProcessor() {
+ return new FilterProcessor();
+ }
+
+ @Override
+ public String type() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public boolean supportsType(String type) {
+ return LEGACY_IDENTIFIER.equalsIgnoreCase(type) || IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java
index 13e0c95..2adedf2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor;
import com.geedgenetworks.api.factory.ProcessorFactory;
public class ProjectionProcessorFactory implements ProcessorFactory {
-
+ private static final String IDENTIFIER = "projection";
@Override
public Processor<?> createProcessor() {
return new ProjectionProcessor();
@@ -12,7 +12,12 @@ public class ProjectionProcessorFactory implements ProcessorFactory {
@Override
public String type() {
- return "projection";
+ return IDENTIFIER;
+ }
+
+ @Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java
index 7777a05..4fd67e3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java
@@ -4,6 +4,8 @@ import com.geedgenetworks.api.processor.Processor;
import com.geedgenetworks.api.factory.ProcessorFactory;
public class SplitProcessorFactory implements ProcessorFactory {
+ private static final String IDENTIFIER = "split";
+
@Override
public Processor<?> createProcessor() {
@@ -12,7 +14,13 @@ public class SplitProcessorFactory implements ProcessorFactory {
@Override
public String type() {
- return "split";
+ return IDENTIFIER;
}
+ @Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java
index 5e51bfb..39dcb2f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor;
import com.geedgenetworks.api.factory.ProcessorFactory;
public class TableProcessorFactory implements ProcessorFactory {
-
+ private static final String IDENTIFIER = "table";
@Override
public Processor<?> createProcessor() {
return new TableProcessor();
@@ -12,7 +12,12 @@ public class TableProcessorFactory implements ProcessorFactory {
@Override
public String type() {
- return "table";
+ return IDENTIFIER;
+ }
+
+ @Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
}
}
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
index d8d9f73..a9e6c95 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
@@ -1,6 +1,6 @@
com.geedgenetworks.core.connector.inline.InlineConnectorFactory
com.geedgenetworks.core.connector.print.PrintConnectorFactory
-com.geedgenetworks.core.processor.filter.AviatorFilterProcessorFactory
+com.geedgenetworks.core.processor.filter.FilterProcessorFactory
com.geedgenetworks.core.processor.split.SplitProcessorFactory
com.geedgenetworks.core.processor.projection.ProjectionProcessorFactory
com.geedgenetworks.core.processor.table.TableProcessorFactory
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 5e64962..9b58289 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
index 408fbad..1e7c835 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
@@ -8,7 +8,7 @@ sources:
filters:
filter_operator:
- type: aviator
+ type: filter
properties:
expression: event.server_ip != '12.12.12.12'
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
index 502f044..8c222d8 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
@@ -30,6 +30,11 @@ public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFa
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFormatFactoryOptions(this, formatOptions);
validateFormatOptions(formatOptions);
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
index 7c2c0b2..88a2748 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
@@ -27,6 +27,11 @@ public class JsonFormatFactory implements DecodingFormatFactory, EncodingFormatF
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
return new DecodingFormat(){
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
index 2a3988f..2d57bad 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
@@ -25,6 +25,11 @@ public class MessagePackFormatFactory implements DecodingFormatFactory, Encoding
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
return new DecodingFormat() {
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
index 3d59ca8..6af0ffc 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
@@ -27,6 +27,11 @@ public class ProtobufFormatFactory implements DecodingFormatFactory, EncodingFor
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
final String messageName = formatOptions.get(MESSAGE_NAME);
final String descFilePath = formatOptions.get(DESC_FILE_PATH);
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
index 7490bfc..cc2b3b3 100644
--- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
@@ -27,6 +27,11 @@ public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFa
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
return new DecodingFormat(){
@Override
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
index 2908ffb..abb42a4 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -8,7 +8,7 @@ sources:
filters:
server_ip_filter:
- type: aviator
+ type: filter
properties:
expression: event.server_ip != '4.4.4.4'