summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-14 09:54:50 +0800
committerdoufenghu <[email protected]>2024-11-14 09:54:50 +0800
commitdf18fbe845df119e884e2e8f281bbf019d96c7e7 (patch)
tree88fad63d6bc0b4b6637467427d05119de809e265 /groot-core
parentd35dbe98007d341c7b41080535f5c6154063a448 (diff)
[Feature][api] AviatorFilterProcessorFactory renamed to FilterProcessorFactory.The Factory add supportsType method for supporting legency type of avaitor.
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java18
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java)2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java25
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java9
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory2
11 files changed, 71 insertions, 29 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java
index a77b2a4..6c18dc5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineConnectorFactory.java
@@ -29,6 +29,11 @@ public class InlineConnectorFactory implements SourceFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SourceProvider getSourceProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// 获取DecodingFormat
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java
index 409395b..9e2aaac 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintConnectorFactory.java
@@ -29,6 +29,11 @@ public class PrintConnectorFactory implements SinkFactory {
}
@Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+ @Override
public SinkProvider getSinkProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// 获取encodingFormat
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java
index 0997261..5cbcd5a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFactory.java
@@ -5,13 +5,20 @@ import com.geedgenetworks.api.factory.ProcessorFactory;
public class AggregateProcessorFactory implements ProcessorFactory {
+ private static final String IDENTIFIER = "aggregate";
+
+ @Override
+ public Processor<?> createProcessor() {
+ return new AggregateProcessor();
+ }
+
@Override
public String type() {
- return "aggregate";
+ return IDENTIFIER;
}
@Override
- public Processor<?> createProcessor() {
- return new AggregateProcessor();
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java
deleted file mode 100644
index db6d2ac..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessorFactory.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.geedgenetworks.core.processor.filter;
-
-import com.geedgenetworks.api.processor.Processor;
-import com.geedgenetworks.api.factory.ProcessorFactory;
-
-public class AviatorFilterProcessorFactory implements ProcessorFactory {
-
- @Override
- public Processor<?> createProcessor() {
- return new AviatorFilterProcessor();
- }
-
- @Override
- public String type() {
- return "aviator";
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
index a138b4e..8a08cb3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterFunction.java
@@ -51,7 +51,7 @@ public class FilterFunction extends RichFilterFunction<Event> {
} catch (ExpressionRuntimeException e){
isFilter = false;
- log.error("Invalid filter ! expression=" +expression);
+ log.error("Invalid filter expression: {}, {}", expression, e);
internalMetrics.incrementErrorEvents();
}
catch (RuntimeException ignored) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java
index 99f74e8..67fd61f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/AviatorFilterProcessor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessor.java
@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-public class AviatorFilterProcessor implements Processor<FilterConfig> {
+public class FilterProcessor implements Processor<FilterConfig> {
@Override
public DataStream<Event> process(StreamExecutionEnvironment env,
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java
new file mode 100644
index 0000000..b3cb049
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/filter/FilterProcessorFactory.java
@@ -0,0 +1,25 @@
+package com.geedgenetworks.core.processor.filter;
+
+import com.geedgenetworks.api.processor.Processor;
+import com.geedgenetworks.api.factory.ProcessorFactory;
+
+public class FilterProcessorFactory implements ProcessorFactory {
+ private static final String IDENTIFIER = "filter";
+ private static final String LEGACY_IDENTIFIER = "aviator";
+
+ @Override
+ public Processor<?> createProcessor() {
+ return new FilterProcessor();
+ }
+
+ @Override
+ public String type() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public boolean supportsType(String type) {
+ return LEGACY_IDENTIFIER.equalsIgnoreCase(type) || IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java
index 13e0c95..2adedf2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorFactory.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor;
import com.geedgenetworks.api.factory.ProcessorFactory;
public class ProjectionProcessorFactory implements ProcessorFactory {
-
+ private static final String IDENTIFIER = "projection";
@Override
public Processor<?> createProcessor() {
return new ProjectionProcessor();
@@ -12,7 +12,12 @@ public class ProjectionProcessorFactory implements ProcessorFactory {
@Override
public String type() {
- return "projection";
+ return IDENTIFIER;
+ }
+
+ @Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java
index 7777a05..4fd67e3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/split/SplitProcessorFactory.java
@@ -4,6 +4,8 @@ import com.geedgenetworks.api.processor.Processor;
import com.geedgenetworks.api.factory.ProcessorFactory;
public class SplitProcessorFactory implements ProcessorFactory {
+ private static final String IDENTIFIER = "split";
+
@Override
public Processor<?> createProcessor() {
@@ -12,7 +14,13 @@ public class SplitProcessorFactory implements ProcessorFactory {
@Override
public String type() {
- return "split";
+ return IDENTIFIER;
}
+ @Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
+ }
+
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java
index 5e51bfb..39dcb2f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFactory.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.api.processor.Processor;
import com.geedgenetworks.api.factory.ProcessorFactory;
public class TableProcessorFactory implements ProcessorFactory {
-
+ private static final String IDENTIFIER = "table";
@Override
public Processor<?> createProcessor() {
return new TableProcessor();
@@ -12,7 +12,12 @@ public class TableProcessorFactory implements ProcessorFactory {
@Override
public String type() {
- return "table";
+ return IDENTIFIER;
+ }
+
+ @Override
+ public boolean supportsType(String type) {
+ return IDENTIFIER.equalsIgnoreCase(type);
}
}
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
index d8d9f73..a9e6c95 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
@@ -1,6 +1,6 @@
com.geedgenetworks.core.connector.inline.InlineConnectorFactory
com.geedgenetworks.core.connector.print.PrintConnectorFactory
-com.geedgenetworks.core.processor.filter.AviatorFilterProcessorFactory
+com.geedgenetworks.core.processor.filter.FilterProcessorFactory
com.geedgenetworks.core.processor.split.SplitProcessorFactory
com.geedgenetworks.core.processor.projection.ProjectionProcessorFactory
com.geedgenetworks.core.processor.table.TableProcessorFactory