| author | wangkuan <[email protected]> | 2024-07-09 11:54:02 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-07-09 11:54:02 +0800 |
| commit | dec8161b58b9cf8a47af17a7c00b750898d21d33 (patch) | |
| tree | 31778bd855745cd0465c6c8eed7157b1f932037c | |
| parent | 05f2efaea8e3ce6c5798cf38ebb4caef9291142c (diff) | |
[improve][core][common][bootstrap] Extract the HDFS-related configuration into groot's system configuration. Remove the strict validation in the AsnLookup and GeoIpLookup functions; log an error message instead.
8 files changed, 34 insertions, 97 deletions
```diff
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index df9c82b..d8d6306 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -11,35 +11,14 @@ grootstream:
       files:
         - 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
         - 004390bc-3135-4a6f-a492-3662ecb9e289
-
-    # - name: tsg_ip_asn
-    #   fs_type: hdfs
-    #   fs_path: /kb
-    #   files:
-    #     - asn_builtin.mmdb
-    #   properties:
-    #     dfs.namenodes: 192.168.44.12
-    #     dfs.replication: 1
-    #     #dfs.port: 9000
-    #     #dfs.user: root
-    #     #dfs.nameservices: ns1
-    #     #dfs.ha.namenodes.ns1: nn1,nn2
-
-    # - name: tsg_ip_location
-    #   fs_type: hdfs
-    #   fs_path: /kb
-    #   files:
-    #     - ip_user_defined.mmdb
-    #     - ip_builtin.mmdb
-    #   properties:
-    #     dfs.namenodes: 192.168.44.12
-    #     dfs.replication: 1
-    #     #dfs.port: 9000
-    #     #dfs.user: root
-    #     #dfs.nameservices: ns1
-    #     #dfs.ha.namenodes.ns1: nn1,nn2
   properties:
     hos.path: http://192.168.44.12:9098/hos
     hos.bucket.name.traffic_file: traffic_file_bucket
     hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
     scheduler.knowledge_base.update.interval.minutes: 5
+    hadoop.dfs.namenodes: 192.168.44.12
+    hadoop.dfs.replication: 1
+    #hadoop.dfs.port: 9000
+    #hadoop.dfs.user: root
+    #hadoop.dfs.nameservices: ns1
+    #hadoop.dfs.ha.namenodes.ns1: nn1,nn2
\ No newline at end of file
```
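The HDFS settings previously duplicated under each commented-out knowledge-base entry now live once under `properties`, namespaced with a `hadoop.` prefix. A minimal sketch of how that prefix is stripped back off before the values reach `HdfsUtils`; the constant mirrors `Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX` from this commit, while the class name and property values here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class HadoopPrefixDemo {
    // Mirrors Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX introduced by this commit.
    static final String HADOOP_PREFIX = "hadoop.";

    public static void main(String[] args) {
        // Stand-in for commonConfig.getPropertiesConfig(); values are illustrative.
        Map<String, String> properties = new HashMap<>();
        properties.put("hos.path", "http://192.168.44.12:9098/hos");
        properties.put("hadoop.dfs.namenodes", "192.168.44.12");
        properties.put("hadoop.dfs.replication", "1");

        // Collect every hadoop.* entry with the prefix dropped, so HdfsUtils
        // keeps seeing the plain dfs.* keys it already understands.
        Map<String, String> hdfsProp = new HashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().startsWith(HADOOP_PREFIX)) {
                hdfsProp.put(entry.getKey().substring(HADOOP_PREFIX.length()), entry.getValue());
            }
        }
        // Prints the two dfs.* entries (iteration order unspecified).
        System.out.println(hdfsProp);
    }
}
```

Note that the committed loop in `ProjectionProcessFunction` (further below) declares `hdfsProp` inside the loop body, so each matching property triggers its own `initHdfsUtilInstance` call with a one-entry map, and only the first call takes effect because the singleton is created lazily. Collecting all `hadoop.*` entries first, as sketched here, is what the config layout appears to intend.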
```diff
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 46955cb..3c7d095 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -26,6 +26,9 @@ public final class Constants {
     public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
     public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
+    public static final String SYSPROP_GROOTSTREAM_HADOOP_PREFIX = "hadoop.";
+    public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
+
     public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream";
     public static final String HAZELCAST_GROOTSTREAM_CONFIG_DEFAULT = "grootstream.yaml";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
index def3866..ccec51b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -31,7 +31,6 @@ public class HdfsUtils {
     private HdfsUtils(Map<String, String> hdfsProp) {
         Configuration config = new Configuration();
         String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12");
-        String replication = hdfsProp.getOrDefault("dfs.replication","1");
         String port = hdfsProp.getOrDefault("dfs.port","9000");
         String name = hdfsProp.getOrDefault("dfs.user","root");
         List<String> hadoopNameNodes = new ArrayList<>();
@@ -40,14 +39,16 @@ public class HdfsUtils {
             try {
                 hadoopNameNodes= Splitter.on(",").trimResults().splitToList(paths);
             } catch (RuntimeException e) {
-                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of hadoop.namenodes is illegal");
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal");
            }
         }
         if (hadoopNameNodes.size() == 1) {
+            String replication = hdfsProp.getOrDefault("dfs.replication","1");
             defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
             config.set("fs.defaultFS", defaultFS);
             config.set("dfs.replication", replication);
         } else if (hadoopNameNodes.size() > 1) {
+            String replication = hdfsProp.getOrDefault("dfs.replication","2");
             String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1");
             String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2");
             defaultFS = "hdfs://" + nameservices;
@@ -67,13 +68,18 @@
         }
     }

-    public static HdfsUtils getHdfsUtilInstance(Map<String, String> hdfsProp) {
+    public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
         if (hdfsInstance == null) {
             hdfsInstance = new HdfsUtils(hdfsProp);
         }
         return hdfsInstance;
     }
-
+    public static HdfsUtils getHdfsUtilInstance() {
+        if (hdfsInstance == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error");
+        }
+        return hdfsInstance;
+    }

     public byte[] readBytes(Path hdfsPath) {
         try {
```
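The `HdfsUtils` change splits the old get-or-create accessor into an explicit `initHdfsUtilInstance(Map)` (create if absent) and a no-arg `getHdfsUtilInstance()` that fails fast when nothing was initialized. A minimal usage sketch under the assumption that job setup runs before any knowledge-base read; the property values and file names are taken from the removed yaml comments:

```java
import java.util.Map;

// Hypothetical startup/read path illustrating the new two-step contract.
public class HdfsSingletonUsage {
    public static void main(String[] args) {
        // 1) Job setup (ProjectionProcessFunction.open in this commit):
        //    build the dfs.* map from the hadoop.* system properties and
        //    create the singleton once.
        Map<String, String> hdfsProp = Map.of(
                "dfs.namenodes", "192.168.44.12",
                "dfs.replication", "1");
        HdfsUtils.initHdfsUtilInstance(hdfsProp);

        // 2) Later readers (AbstractKnowledgeBaseHandler) no longer carry
        //    per-knowledge-base HDFS properties; they fetch the
        //    already-initialized instance.
        byte[] bytes = HdfsUtils.getHdfsUtilInstance()
                .readBytes(new org.apache.hadoop.fs.Path("/kb", "asn_builtin.mmdb"));

        // getHdfsUtilInstance() throws GrootStreamRuntimeException if step 1
        // never ran, which surfaces misconfiguration early.
    }
}
```

One caveat: the lazy null-check in `initHdfsUtilInstance` is unsynchronized, so concurrent `open()` calls in the same JVM could briefly construct two instances.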
```diff
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index cf82ab7..8b7e8d1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -8,6 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.common.udf.UDFContext;
 import com.geedgenetworks.common.utils.ColumnUtil;
 import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.utils.HdfsUtils;
 import com.geedgenetworks.core.metrics.InternalMetrics;
 import com.geedgenetworks.core.pojo.ProjectionConfig;
 import com.geedgenetworks.common.udf.UDF;
@@ -54,7 +55,17 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
         List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
         Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
         CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
-        KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault("scheduler.knowledge_base.update.interval.minutes", "5")));
+        KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5")));
+
+        for (Map.Entry<String, String> entry : commonConfig.getPropertiesConfig().entrySet()) {
+            Map<String, String> hdfsProp = new HashMap<>();
+            if (entry.getKey().startsWith(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX)) {
+                hdfsProp.put(entry.getKey().replace(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX, ""), entry.getValue());
+            }
+            if(!hdfsProp.isEmpty()){
+                HdfsUtils.initHdfsUtilInstance(hdfsProp);
+            }
+        }
         for (UDFContext udfContext : udfContexts) {
             Expression filterExpression = null;
             UdfEntity udfEntity = new UdfEntity();
@@ -182,24 +193,4 @@
         super.close();
     }

-    public static String convertToCamelCase(String input) {
-        StringBuilder result = new StringBuilder();
-
-        // Split into words
-        String[] words = input.toLowerCase().split("_");
-
-        for (int i = 0; i < words.length; i++) {
-            String word = words[i];
-
-            // Capitalize the first letter (except for the first word)
-            if (i != 0) {
-                char firstChar = Character.toUpperCase(word.charAt(0));
-                word = firstChar + word.substring(1);
-            }
-
-            result.append(word);
-        }
-
-        return result.toString();
-    }
 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 34ee2f9..3a0ba9e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -42,7 +42,7 @@ public class AsnLookup implements UDF {
         if (AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().containsKey(kbName) && AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().get(kbName).getIpLookupV2() != null) {
             log.debug("Init AsnKnowledgeBase success ");
         } else {
-            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init Function AsnLookUp error ");
+            log.error("AsnLookup init KnowledgeBase error ");
         }
         this.lookupFieldName = udfContext.getLookup_fields().get(0);
         this.outputFieldName = udfContext.getOutput_fields().get(0);
```
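With the strict check gone, a missing or not-yet-downloaded knowledge base no longer aborts UDF initialization. A sketch of the relaxed contract, assuming (not shown in this diff) that the per-event lookup path already tolerates an absent knowledge base:

```java
// Shape of the relaxed initialization in AsnLookup.open (GeoIpLookup is analogous).
// kbName, log, and the handler are the project's own; shown here for the contract only.
boolean kbReady = AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().containsKey(kbName)
        && AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().get(kbName).getIpLookupV2() != null;
if (kbReady) {
    log.debug("Init AsnKnowledgeBase success");
} else {
    // Before this commit: throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT, ...),
    // which failed the whole job at startup. Now the task keeps running, and since
    // KnowledgeBaseScheduler refreshes every
    // scheduler.knowledge_base.update.interval.minutes (default 5), the lookup can
    // start enriching events once the knowledge-base files arrive.
    log.error("AsnLookup init KnowledgeBase error");
}
```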
```diff
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index 46885c4..ce85b24 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -48,7 +48,7 @@ public class GeoIpLookup implements UDF {
         if (GeoIpKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().containsKey(kbName) && GeoIpKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().get(kbName).getIpLookupV2() != null) {
             log.debug("Init function GeoIpLookup success ");
         } else {
-            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init function GeoIpLookup error ");
+            log.error("GeoIpLookup init KnowledgeBase error ");
         }
         this.lookupFieldName = udfContext.getLookup_fields().get(0);
         if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()){
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
index 4d2119a..9a4509b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
@@ -72,7 +72,7 @@ public abstract class AbstractKnowledgeBaseHandler {
         switch (fileSystemType) {
             case "hdfs":
-                return HdfsUtils.getHdfsUtilInstance(knowledgeBaseProp).readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
+                return HdfsUtils.getHdfsUtilInstance().readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
             case "local":
                 return PathUtil.readBytes(Paths.get(filePath, fileName));
             default:
@@ -433,48 +433,6 @@
             <version>${auto-service.version}</version>
             <scope>provided</scope>
         </dependency>
-
-        <!-- <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>2.7.1</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>2.7.1</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>2.7.1</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>-->
     </dependencies>
 </dependencyManagement>
```
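Taken together, the knowledge-base read path no longer carries per-entry HDFS properties; every `fs_type: hdfs` entry reads through the shared, globally configured singleton. A sketch of the resulting dispatch; the `default` branch is an assumption about how unknown `fs_type` values are likely rejected:

```java
import java.nio.file.Paths;

// Minimal sketch of the post-change dispatch in AbstractKnowledgeBaseHandler.
// PathUtil is the project's local-file helper; the method name is hypothetical.
byte[] readKnowledgeBaseFile(String fileSystemType, String filePath, String fileName) {
    switch (fileSystemType) {
        case "hdfs":
            // No per-KB properties any more: the singleton was configured
            // from the global hadoop.* properties at job startup.
            return HdfsUtils.getHdfsUtilInstance()
                    .readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
        case "local":
            return PathUtil.readBytes(Paths.get(filePath, fileName));
        default:
            // Assumed failure mode for an unrecognized fs_type.
            throw new GrootStreamRuntimeException(
                    CommonErrorCode.ILLEGAL_ARGUMENT,
                    "Unsupported fs_type: " + fileSystemType);
    }
}
```

The practical consequence is that HDFS connection settings can no longer vary per knowledge base: all `fs_type: hdfs` entries share the one configuration, which is exactly what moving them into the system-level `properties` block implies.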
