diff options
| author | doufenghu <[email protected]> | 2024-11-14 09:54:50 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-14 09:54:50 +0800 |
| commit | df18fbe845df119e884e2e8f281bbf019d96c7e7 (patch) | |
| tree | 88fad63d6bc0b4b6637467427d05119de809e265 /groot-core | |
| parent | d35dbe98007d341c7b41080535f5c6154063a448 (diff) | |
[Feature][api] AviatorFilterProcessorFactory renamed to FilterProcessorFactory.The Factory add supportsType method for supporting legency type of avaitor.
Diffstat (limited to 'groot-core')
11 files changed, 71 insertions, 29 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java index a77b2a4..6c18dc5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java @@ -29,6 +29,11 @@ public class InlineConnectorFactory implements SourceFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SourceProvider getSourceProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // 获取DecodingFormat diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java index 409395b..9e2aaac 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java @@ -29,6 +29,11 @@ public class PrintConnectorFactory implements SinkFactory { } @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + @Override public SinkProvider getSinkProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // 获取encodingFormat diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java index 0997261..5cbcd5a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java @@ -5,13 +5,20 @@ import com.geedgenetworks.api.factory.ProcessorFactory; public class AggregateProcessorFactory implements ProcessorFactory { + private static final String IDENTIFIER = "aggregate"; + + @Override + public Processor<?> createProcessor() { + return new AggregateProcessor(); + } + @Override public String type() { - return "aggregate"; + return IDENTIFIER; } @Override - public Processor<?> createProcessor() { - return new AggregateProcessor(); + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java deleted file mode 100644 index db6d2ac..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.geedgenetworks.core.processor.filter; - -import com.geedgenetworks.api.processor.Processor; -import com.geedgenetworks.api.factory.ProcessorFactory; - -public class AviatorFilterProcessorFactory implements ProcessorFactory { - - @Override - public Processor<?> createProcessor() { - return new AviatorFilterProcessor(); - } - - @Override - public String type() { - return "aviator"; - } - -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java index a138b4e..8a08cb3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java @@ -51,7 +51,7 @@ public class FilterFunction extends RichFilterFunction<Event> { } catch (ExpressionRuntimeException e){ isFilter = false; - log.error("Invalid filter ! expression=" +expression); + log.error("Invalid filter expression: {}, {}", expression, e); internalMetrics.incrementErrorEvents(); } catch (RuntimeException ignored) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java index 99f74e8..67fd61f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java @@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -public class AviatorFilterProcessor implements Processor<FilterConfig> { +public class FilterProcessor implements Processor<FilterConfig> { @Override public DataStream<Event> process(StreamExecutionEnvironment env, diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java new file mode 100644 index 0000000..b3cb049 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.core.processor.filter; + +import com.geedgenetworks.api.processor.Processor; +import com.geedgenetworks.api.factory.ProcessorFactory; + +public class FilterProcessorFactory implements ProcessorFactory { + private static final String IDENTIFIER = "filter"; + private static final String LEGACY_IDENTIFIER = "aviator"; + + @Override + public Processor<?> createProcessor() { + return new FilterProcessor(); + } + + @Override + public String type() { + return IDENTIFIER; + } + + @Override + public boolean supportsType(String type) { + return LEGACY_IDENTIFIER.equalsIgnoreCase(type) || IDENTIFIER.equalsIgnoreCase(type); + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java index 13e0c95..2adedf2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java @@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor; import com.geedgenetworks.api.factory.ProcessorFactory; public class ProjectionProcessorFactory implements ProcessorFactory { - + private static final String IDENTIFIER = "projection"; @Override public Processor<?> createProcessor() { return new ProjectionProcessor(); @@ -12,7 +12,12 @@ public class ProjectionProcessorFactory implements ProcessorFactory { @Override public String type() { - return "projection"; + return IDENTIFIER; + } + + @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java index 7777a05..4fd67e3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java @@ -4,6 +4,8 @@ import com.geedgenetworks.api.processor.Processor; import com.geedgenetworks.api.factory.ProcessorFactory; public class SplitProcessorFactory implements ProcessorFactory { + private static final String IDENTIFIER = "split"; + @Override public Processor<?> createProcessor() { @@ -12,7 +14,13 @@ public class SplitProcessorFactory implements ProcessorFactory { @Override public String type() { - return "split"; + return IDENTIFIER; } + @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); + } + + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java index 5e51bfb..39dcb2f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java @@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor; import com.geedgenetworks.api.factory.ProcessorFactory; public class TableProcessorFactory implements ProcessorFactory { - + private static final String IDENTIFIER = "table"; @Override public Processor<?> createProcessor() { return new TableProcessor(); @@ -12,7 +12,12 @@ public class TableProcessorFactory implements ProcessorFactory { @Override public String type() { - return "table"; + return IDENTIFIER; + } + + @Override + public boolean supportsType(String type) { + return IDENTIFIER.equalsIgnoreCase(type); } } diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory index d8d9f73..a9e6c95 100644 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory @@ -1,6 +1,6 @@ com.geedgenetworks.core.connector.inline.InlineConnectorFactory com.geedgenetworks.core.connector.print.PrintConnectorFactory -com.geedgenetworks.core.processor.filter.AviatorFilterProcessorFactory +com.geedgenetworks.core.processor.filter.FilterProcessorFactory com.geedgenetworks.core.processor.split.SplitProcessorFactory com.geedgenetworks.core.processor.projection.ProjectionProcessorFactory com.geedgenetworks.core.processor.table.TableProcessorFactory |
