diff options
29 files changed, 143 insertions, 35 deletions
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java index bd624a9..adba440 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java @@ -7,4 +7,11 @@ public interface Factory { * (e.g. {@code kafka-1}). */ String type(); + + /** + * Returns true if the factory supports the given type. + */ + boolean supportsType(String type); + + } diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java index 22a14d3..f2f4a71 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java @@ -100,7 +100,7 @@ public final class FactoryUtil { final List<Factory> matchingFactories = foundFactories.stream() - .filter(f -> f.type().equals(type)) + .filter(f -> f.supportsType(type)) .collect(Collectors.toList()); if (matchingFactories.isEmpty()) { diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java index 97b2ffc..0acdda4 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java @@ -21,6 +21,11 @@ public class CollectTableFactory implements SinkFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SinkProvider getSinkProvider(Context context) { return new SinkProvider(){ diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java index ac4d0bf..b67a842 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java @@ -5,15 +5,14 @@ public final class Constants { public static final String DEFAULT_JOB_NAME = "groot-stream-job"; public static final String SOURCES = "sources"; public static final String FILTERS = "filters"; + public static final String SPLITS = "splits"; public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines"; public static final String PROCESSING_PIPELINES = "processing_pipelines"; public static final String POSTPROCESSING_PIPELINES = "postprocessing_pipelines"; public static final String SINKS = "sinks"; public static final int SYSTEM_EXIT_CODE = 2618; public static final String APPLICATION = "application"; - public static final String PROPERTIES = "properties"; - public static final String SPLITS = "splits"; public static final String APPLICATION_ENV = "env"; public static final String APPLICATION_TOPOLOGY = "topology"; public static final String JOB_NAME = "name"; diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java index 789edf6..db9a373 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorFactory.java @@ -30,6 +30,11 @@ public class ClickHouseConnectorFactory implements SinkFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SinkProvider getSinkProvider(Context context) { final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java index 7bb899e..ec4d762 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorFactory.java @@ -23,6 +23,11 @@ public class FileConnectorFactory implements SourceFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SourceProvider getSourceProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); DecodingFormat decodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT); diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java index 66d4932..c9c8f72 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorFactory.java @@ -32,6 +32,11 @@ public class IPFixConnectorFactory implements SourceFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public Set<ConfigOption<?>> requiredOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(IPFixConnectorOptions.PORT_RANGE); diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java index 2041b67..e903a6a 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorFactory.java @@ -31,6 +31,11 @@ public class KafkaConnectorFactory implements SourceFactory, SinkFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SourceProvider getSourceProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // 获取valueDecodingFormat diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java index 0dff9d0..a859356 100644 --- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorFactory.java @@ -29,6 +29,11 @@ public class MockConnectorFactory implements SourceFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SourceProvider getSourceProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); helper.validate(); diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java index ff79dda..e8f964b 100644 --- a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java +++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksConnectorFactory.java @@ -26,6 +26,11 @@ public class StarRocksConnectorFactory implements SinkFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SinkProvider getSinkProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); helper.validateExcept(CONNECTION_INFO_PREFIX); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java index a77b2a4..6c18dc5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java @@ -29,6 +29,11 @@ public class InlineConnectorFactory implements SourceFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SourceProvider getSourceProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // 获取DecodingFormat diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java index 409395b..9e2aaac 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java @@ -29,6 +29,11 @@ public class PrintConnectorFactory implements SinkFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SinkProvider getSinkProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // 获取encodingFormat diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java index 0997261..5cbcd5a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java @@ -5,13 +5,20 @@ import com.geedgenetworks.api.factory.ProcessorFactory; public class AggregateProcessorFactory implements ProcessorFactory { + private static final String IDENTIFIER = "aggregate"; + + @Override + public Processor<?> createProcessor() { + return new AggregateProcessor(); + } + @Override public String type() { - return "aggregate"; + return IDENTIFIER; } @Override - public Processor<?> createProcessor() { - return new AggregateProcessor(); + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java deleted file mode 100644 index db6d2ac..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.geedgenetworks.core.processor.filter; - -import com.geedgenetworks.api.processor.Processor; -import com.geedgenetworks.api.factory.ProcessorFactory; - -public class AviatorFilterProcessorFactory implements ProcessorFactory { - - @Override - public Processor<?> createProcessor() { - return new AviatorFilterProcessor(); - } - - @Override - public String type() { - return "aviator"; - } - -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java index a138b4e..8a08cb3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java @@ -51,7 +51,7 @@ public class FilterFunction extends RichFilterFunction<Event> { } catch (ExpressionRuntimeException e){ isFilter = false; - log.error("Invalid filter ! expression=" +expression); + log.error("Invalid filter expression: {}, {}", expression, e); internalMetrics.incrementErrorEvents(); } catch (RuntimeException ignored) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java index 99f74e8..67fd61f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java @@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -public class AviatorFilterProcessor implements Processor<FilterConfig> { +public class FilterProcessor implements Processor<FilterConfig> { @Override public DataStream<Event> process(StreamExecutionEnvironment env, diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java new file mode 100644 index 0000000..b3cb049 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.core.processor.filter; + +import com.geedgenetworks.api.processor.Processor; +import com.geedgenetworks.api.factory.ProcessorFactory; + +public class FilterProcessorFactory implements ProcessorFactory { + private static final String IDENTIFIER = "filter"; + private static final String LEGACY_IDENTIFIER = "aviator"; + + @Override + public Processor<?> createProcessor() { + return new FilterProcessor(); + } + + @Override + public String type() { + return IDENTIFIER; + } + + @Override + public boolean supportsType(String type) { + return LEGACY_IDENTIFIER.equalsIgnoreCase(type) || IDENTIFIER.equalsIgnoreCase(type); + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java index 13e0c95..2adedf2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java @@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor; import com.geedgenetworks.api.factory.ProcessorFactory; public class ProjectionProcessorFactory implements ProcessorFactory { - + private static final String IDENTIFIER = "projection"; @Override public Processor<?> createProcessor() { return new ProjectionProcessor(); @@ -12,7 +12,12 @@ public class ProjectionProcessorFactory implements ProcessorFactory { @Override public String type() { - return "projection"; + return IDENTIFIER; + } + + @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java index 7777a05..4fd67e3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java @@ -4,6 +4,8 @@ import com.geedgenetworks.api.processor.Processor; import com.geedgenetworks.api.factory.ProcessorFactory; public class SplitProcessorFactory implements ProcessorFactory { + private static final String IDENTIFIER = "split"; + @Override public Processor<?> createProcessor() { @@ -12,7 +14,13 @@ public class SplitProcessorFactory implements ProcessorFactory { @Override public String type() { - return "split"; + return IDENTIFIER; } + @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java index 5e51bfb..39dcb2f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java @@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor; import com.geedgenetworks.api.factory.ProcessorFactory; public class TableProcessorFactory implements ProcessorFactory { - + private static final String IDENTIFIER = "table"; @Override public Processor<?> createProcessor() { return new TableProcessor(); @@ -12,7 +12,12 @@ public class TableProcessorFactory implements ProcessorFactory { @Override public String type() { - return "table"; + return IDENTIFIER; + } + + @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); } } diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory index d8d9f73..a9e6c95 100644 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory @@ -1,6 +1,6 @@ com.geedgenetworks.core.connector.inline.InlineConnectorFactory com.geedgenetworks.core.connector.print.PrintConnectorFactory -com.geedgenetworks.core.processor.filter.AviatorFilterProcessorFactory +com.geedgenetworks.core.processor.filter.FilterProcessorFactory com.geedgenetworks.core.processor.split.SplitProcessorFactory com.geedgenetworks.core.processor.projection.ProjectionProcessorFactory com.geedgenetworks.core.processor.table.TableProcessorFactory diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index 5e64962..9b58289 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -14,7 +14,7 @@ import java.util.List; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml"; + String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml index 408fbad..1e7c835 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml @@ -8,7 +8,7 @@ sources: filters: filter_operator: - type: aviator + type: filter properties: expression: event.server_ip != '12.12.12.12' diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java index 502f044..8c222d8 100644 --- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java @@ -30,6 +30,11 @@ public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFa } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) { FactoryUtil.validateFormatFactoryOptions(this, formatOptions); validateFormatOptions(formatOptions); diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java index 7c2c0b2..88a2748 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java @@ -27,6 +27,11 @@ public class JsonFormatFactory implements DecodingFormatFactory, EncodingFormatF } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) { final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS); return new DecodingFormat(){ diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java index 2a3988f..2d57bad 100644 --- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java @@ -25,6 +25,11 @@ public class MessagePackFormatFactory implements DecodingFormatFactory, Encoding } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) { return new DecodingFormat() { diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java index 3d59ca8..6af0ffc 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java @@ -27,6 +27,11 @@ public class ProtobufFormatFactory implements DecodingFormatFactory, EncodingFor } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) { final String messageName = formatOptions.get(MESSAGE_NAME); final String descFilePath = formatOptions.get(DESC_FILE_PATH); diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java index 7490bfc..cc2b3b3 100644 --- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java @@ -27,6 +27,11 @@ public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFa } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) { return new DecodingFormat(){ @Override diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml index 2908ffb..abb42a4 100644 --- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml +++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml @@ -8,7 +8,7 @@ sources: filters: server_ip_filter: - type: aviator + type: filter properties: expression: event.server_ip != '4.4.4.4' |
