From 694dca052d291f01a122154fdc2c356e4a8fcf54 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Tue, 19 Dec 2023 18:28:57 +0800 Subject: [improve][core] projectionprocess支持remove_fields属性配置 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../geedgenetworks/common/utils/ColumnUtil.java | 16 +++++++++++-- .../geedgenetworks/core/pojo/ProjectionConfig.java | 9 ++++++++ .../projection/ProjectionProcessFunction.java | 26 ++++++++++++---------- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java index 7278261..9bc936d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java @@ -7,10 +7,10 @@ import java.util.Map; public class ColumnUtil { public static Map columnSelector( - Map extractedFields, List output_fields) { + Map extractedFields, List output_fields) { Map newExtractedFields = new HashMap<>(); - if (output_fields != null && output_fields.size() > 0) { + if (output_fields != null && !output_fields.isEmpty()) { for (String field : output_fields) { @@ -23,4 +23,16 @@ public class ColumnUtil { return extractedFields; } } + + public static Map columnRemover( + Map extractedFields, List remove_fields) { + if (remove_fields != null && !remove_fields.isEmpty()) { + + for (String field : remove_fields) { + + extractedFields.remove(field); + } + } + return extractedFields; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java index 7a49c79..a0d01bd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java @@ -11,6 +11,7 @@ public class ProjectionConfig implements Serializable { private String format; private String type; private List output_fields; + private List remove_fields; private Map properties; private String name; @@ -69,4 +70,12 @@ public class ProjectionConfig implements Serializable { public void setProperties(Map properties) { this.properties = properties; } + + public List getRemove_fields() { + return remove_fields; + } + + public void setRemove_fields(List remove_fields) { + this.remove_fields = remove_fields; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index e0e7ffe..0993670 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -44,18 +44,14 @@ public class ProjectionProcessFunction extends ProcessFunction { @Override public void open(Configuration parameters) throws Exception { - Configuration configuration = (Configuration) getRuntimeContext() - .getExecutionConfig().getGlobalJobParameters(); - List udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class); - Map udfClassReflect =getClassReflect(udfClassNameLists); - - - - - - this.functions = new LinkedList<>(); + this.functions = new LinkedList<>(); try { - // KnowledgeBaseUpdateJob.getInstance(); + Configuration configuration = (Configuration) getRuntimeContext() + .getExecutionConfig().getGlobalJobParameters(); + List udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class); + Map udfClassReflect =getClassReflect(udfClassNameLists); + + // KnowledgeBaseUpdateJob.getInstance(); for (UDFContext udfContext : projectionConfig.getFunctions()) { Expression filterExpression = null; UdfEntity udfEntity = new UdfEntity(); @@ -111,11 +107,17 @@ public class ProjectionProcessFunction extends ProcessFunction { // 判断函数是否有output fields,减少输出字段 if (projectionConfig.getOutput_fields() != null - && projectionConfig.getOutput_fields().size() > 0) { + && !projectionConfig.getOutput_fields().isEmpty()) { event.setExtractedFields( ColumnUtil.columnSelector( event.getExtractedFields(), projectionConfig.getOutput_fields())); } + if (projectionConfig.getRemove_fields() != null + && !projectionConfig.getRemove_fields().isEmpty()) { + event.setExtractedFields( + ColumnUtil.columnRemover( + event.getExtractedFields(), projectionConfig.getRemove_fields())); + } if(!event.isDropped()){out.collect(event);} } public static Map getClassReflect(List plugins) { -- cgit v1.2.3