diff options
| author | wangkuan <[email protected]> | 2023-12-19 18:28:57 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2023-12-19 18:28:57 +0800 |
| commit | 694dca052d291f01a122154fdc2c356e4a8fcf54 (patch) | |
| tree | bd9234f1c9f625e35678e9067520787bbc5260e5 | |
| parent | ff28a1cc2c58a3455295b677c0eaed2f2a475065 (diff) | |
[improve][core] projectionprocess支持remove_fields属性配置
3 files changed, 37 insertions, 14 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java index 7278261..9bc936d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/ColumnUtil.java @@ -7,10 +7,10 @@ import java.util.Map; public class ColumnUtil { public static Map<String, Object> columnSelector( - Map<String, Object> extractedFields, List<String> output_fields) { + Map<String, Object> extractedFields, List<String> output_fields) { Map<String, Object> newExtractedFields = new HashMap<>(); - if (output_fields != null && output_fields.size() > 0) { + if (output_fields != null && !output_fields.isEmpty()) { for (String field : output_fields) { @@ -23,4 +23,16 @@ public class ColumnUtil { return extractedFields; } } + + public static Map<String, Object> columnRemover( + Map<String, Object> extractedFields, List<String> remove_fields) { + if (remove_fields != null && !remove_fields.isEmpty()) { + + for (String field : remove_fields) { + + extractedFields.remove(field); + } + } + return extractedFields; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java index 7a49c79..a0d01bd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/ProjectionConfig.java @@ -11,6 +11,7 @@ public class ProjectionConfig implements Serializable { private String format; private String type; private List<String> output_fields; + private List<String> remove_fields; private Map<String, Object> properties; private String name; @@ -69,4 +70,12 @@ public class ProjectionConfig implements Serializable { public void setProperties(Map<String, Object> properties) { this.properties = properties; } + + public List<String> getRemove_fields() { + return remove_fields; + } + + public void setRemove_fields(List<String> remove_fields) { + this.remove_fields = remove_fields; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index e0e7ffe..0993670 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -44,18 +44,14 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { @Override public void open(Configuration parameters) throws Exception { - Configuration configuration = (Configuration) getRuntimeContext() - .getExecutionConfig().getGlobalJobParameters(); - List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class); - Map<String,String> udfClassReflect =getClassReflect(udfClassNameLists); - - - - - - this.functions = new LinkedList<>(); + this.functions = new LinkedList<>(); try { - // KnowledgeBaseUpdateJob.getInstance(); + Configuration configuration = (Configuration) getRuntimeContext() + .getExecutionConfig().getGlobalJobParameters(); + List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class); + Map<String,String> udfClassReflect =getClassReflect(udfClassNameLists); + + // KnowledgeBaseUpdateJob.getInstance(); for (UDFContext udfContext : projectionConfig.getFunctions()) { Expression filterExpression = null; UdfEntity udfEntity = new UdfEntity(); @@ -111,11 +107,17 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { // 判断函数是否有output fields,减少输出字段 if (projectionConfig.getOutput_fields() != null - && projectionConfig.getOutput_fields().size() > 0) { + && !projectionConfig.getOutput_fields().isEmpty()) { event.setExtractedFields( ColumnUtil.columnSelector( event.getExtractedFields(), projectionConfig.getOutput_fields())); } + if (projectionConfig.getRemove_fields() != null + && !projectionConfig.getRemove_fields().isEmpty()) { + event.setExtractedFields( + ColumnUtil.columnRemover( + event.getExtractedFields(), projectionConfig.getRemove_fields())); + } if(!event.isDropped()){out.collect(event);} } public static Map<String, String> getClassReflect(List<String> plugins) { |
