summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2023-12-19 18:28:57 +0800
committerwangkuan <[email protected]>2023-12-19 18:28:57 +0800
commit694dca052d291f01a122154fdc2c356e4a8fcf54 (patch)
treebd9234f1c9f625e35678e9067520787bbc5260e5
parentff28a1cc2c58a3455295b677c0eaed2f2a475065 (diff)
[improve][core] projectionprocess支持remove_fields属性配置
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java26
3 files changed, 37 insertions, 14 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java
index 7278261..9bc936d 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java
@@ -7,10 +7,10 @@ import java.util.Map;
public class ColumnUtil {
public static Map<String, Object> columnSelector(
- Map<String, Object> extractedFields, List<String> output_fields) {
+ Map<String, Object> extractedFields, List<String> output_fields) {
Map<String, Object> newExtractedFields = new HashMap<>();
- if (output_fields != null && output_fields.size() > 0) {
+ if (output_fields != null && !output_fields.isEmpty()) {
for (String field : output_fields) {
@@ -23,4 +23,16 @@ public class ColumnUtil {
return extractedFields;
}
}
+
+ public static Map<String, Object> columnRemover(
+ Map<String, Object> extractedFields, List<String> remove_fields) {
+ if (remove_fields != null && !remove_fields.isEmpty()) {
+
+ for (String field : remove_fields) {
+
+ extractedFields.remove(field);
+ }
+ }
+ return extractedFields;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
index 7a49c79..a0d01bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java
@@ -11,6 +11,7 @@ public class ProjectionConfig implements Serializable {
private String format;
private String type;
private List<String> output_fields;
+ private List<String> remove_fields;
private Map<String, Object> properties;
private String name;
@@ -69,4 +70,12 @@ public class ProjectionConfig implements Serializable {
public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}
+
+ public List<String> getRemove_fields() {
+ return remove_fields;
+ }
+
+ public void setRemove_fields(List<String> remove_fields) {
+ this.remove_fields = remove_fields;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index e0e7ffe..0993670 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -44,18 +44,14 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
@Override
public void open(Configuration parameters) throws Exception {
- Configuration configuration = (Configuration) getRuntimeContext()
- .getExecutionConfig().getGlobalJobParameters();
- List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class);
- Map<String,String> udfClassReflect =getClassReflect(udfClassNameLists);
-
-
-
-
-
- this.functions = new LinkedList<>();
+ this.functions = new LinkedList<>();
try {
- // KnowledgeBaseUpdateJob.getInstance();
+ Configuration configuration = (Configuration) getRuntimeContext()
+ .getExecutionConfig().getGlobalJobParameters();
+ List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class);
+ Map<String,String> udfClassReflect =getClassReflect(udfClassNameLists);
+
+ // KnowledgeBaseUpdateJob.getInstance();
for (UDFContext udfContext : projectionConfig.getFunctions()) {
Expression filterExpression = null;
UdfEntity udfEntity = new UdfEntity();
@@ -111,11 +107,17 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
// 判断函数是否有output fields,减少输出字段
if (projectionConfig.getOutput_fields() != null
- && projectionConfig.getOutput_fields().size() > 0) {
+ && !projectionConfig.getOutput_fields().isEmpty()) {
event.setExtractedFields(
ColumnUtil.columnSelector(
event.getExtractedFields(), projectionConfig.getOutput_fields()));
}
+ if (projectionConfig.getRemove_fields() != null
+ && !projectionConfig.getRemove_fields().isEmpty()) {
+ event.setExtractedFields(
+ ColumnUtil.columnRemover(
+ event.getExtractedFields(), projectionConfig.getRemove_fields()));
+ }
if(!event.isDropped()){out.collect(event);}
}
public static Map<String, String> getClassReflect(List<String> plugins) {