author     wangkuan <[email protected]>  2024-07-09 11:54:02 +0800
committer  wangkuan <[email protected]>  2024-07-09 11:54:02 +0800
commit     dec8161b58b9cf8a47af17a7c00b750898d21d33
tree       31778bd855745cd0465c6c8eed7157b1f932037c
parent     05f2efaea8e3ce6c5798cf38ebb4caef9291142c
[improve][core][common][bootstrap] Move the HDFS-related settings into groot's system configuration. Drop the hard validation in the AsnLookup and GeoIpLookup functions and log an error message instead.
-rw-r--r--  config/grootstream.yaml                                                                                   | 33
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Constants.java                                       |  3
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java                                 | 14
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java      | 33
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java                                       |  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java                                     |  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java |  2
-rw-r--r--  pom.xml                                                                                                   | 42
8 files changed, 34 insertions(+), 97 deletions(-)
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index df9c82b..d8d6306 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -11,35 +11,14 @@ grootstream:
files:
- 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
- 004390bc-3135-4a6f-a492-3662ecb9e289
-
- # - name: tsg_ip_asn
- # fs_type: hdfs
- # fs_path: /kb
- # files:
- # - asn_builtin.mmdb
- # properties:
- # dfs.namenodes: 192.168.44.12
- # dfs.replication: 1
- # #dfs.port: 9000
- # #dfs.user: root
- # #dfs.nameservices: ns1
- # #dfs.ha.namenodes.ns1: nn1,nn2
-
- # - name: tsg_ip_location
- # fs_type: hdfs
- # fs_path: /kb
- # files:
- # - ip_user_defined.mmdb
- # - ip_builtin.mmdb
- # properties:
- # dfs.namenodes: 192.168.44.12
- # dfs.replication: 1
- # #dfs.port: 9000
- # #dfs.user: root
- # #dfs.nameservices: ns1
- # #dfs.ha.namenodes.ns1: nn1,nn2
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.traffic_file: traffic_file_bucket
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
+ hadoop.dfs.namenodes: 192.168.44.12
+ hadoop.dfs.replication: 1
+ #hadoop.dfs.port: 9000
+ #hadoop.dfs.user: root
+ #hadoop.dfs.nameservices: ns1
+ #hadoop.dfs.ha.namenodes.ns1: nn1,nn2
\ No newline at end of file
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 46955cb..3c7d095 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -26,6 +26,9 @@ public final class Constants {
public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
+ public static final String SYSPROP_GROOTSTREAM_HADOOP_PREFIX = "hadoop.";
+ public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
+
public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream";
public static final String HAZELCAST_GROOTSTREAM_CONFIG_DEFAULT = "grootstream.yaml";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
index def3866..ccec51b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -31,7 +31,6 @@ public class HdfsUtils {
private HdfsUtils(Map<String, String> hdfsProp) {
Configuration config = new Configuration();
String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12");
- String replication = hdfsProp.getOrDefault("dfs.replication","1");
String port = hdfsProp.getOrDefault("dfs.port","9000");
String name = hdfsProp.getOrDefault("dfs.user","root");
List<String> hadoopNameNodes = new ArrayList<>();
@@ -40,14 +39,16 @@ public class HdfsUtils {
try {
hadoopNameNodes= Splitter.on(",").trimResults().splitToList(paths);
} catch (RuntimeException e) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of hadoop.namenodes is illegal");
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal");
}
}
if (hadoopNameNodes.size() == 1) {
+ String replication = hdfsProp.getOrDefault("dfs.replication","1");
defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
config.set("fs.defaultFS", defaultFS);
config.set("dfs.replication", replication);
} else if (hadoopNameNodes.size() > 1) {
+ String replication = hdfsProp.getOrDefault("dfs.replication","2");
String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1");
String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2");
defaultFS = "hdfs://" + nameservices;
@@ -67,13 +68,18 @@ public class HdfsUtils {
}
}
- public static HdfsUtils getHdfsUtilInstance(Map<String, String> hdfsProp) {
+ public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
if (hdfsInstance == null) {
hdfsInstance = new HdfsUtils(hdfsProp);
}
return hdfsInstance;
}
-
+ public static HdfsUtils getHdfsUtilInstance() {
+ if (hdfsInstance == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error");
+ }
+ return hdfsInstance;
+ }
public byte[] readBytes(Path hdfsPath) {
try {
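The HdfsUtils change splits construction from lookup: initHdfsUtilInstance(Map) builds the singleton once from the hadoop.-prefixed system properties, while the new no-arg getHdfsUtilInstance() only returns the already-built instance and fails fast if initialization never ran. A minimal caller sketch under those assumptions; the class name HdfsBootstrapSketch is hypothetical and the property values are the defaults visible in this diff:

    import java.util.HashMap;
    import java.util.Map;
    import com.geedgenetworks.common.utils.HdfsUtils;

    // Hypothetical bootstrap-side sketch of the reworked singleton.
    public class HdfsBootstrapSketch {
        public static void main(String[] args) {
            Map<String, String> hdfsProp = new HashMap<>();
            hdfsProp.put("dfs.namenodes", "192.168.44.12"); // single-namenode setup
            hdfsProp.put("dfs.replication", "1");
            HdfsUtils.initHdfsUtilInstance(hdfsProp);       // build once during startup

            // Later, e.g. from a knowledge-base handler, no properties are needed:
            byte[] mmdb = HdfsUtils.getHdfsUtilInstance()
                    .readBytes(new org.apache.hadoop.fs.Path("/kb", "asn_builtin.mmdb"));
        }
    }

Splitting init from get spares knowledge-base readers from threading HDFS properties through every call site; the trade-off is that getHdfsUtilInstance() now throws if the bootstrap path never executed.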
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index cf82ab7..8b7e8d1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -8,6 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.utils.HdfsUtils;
import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.common.udf.UDF;
@@ -54,7 +55,17 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
- KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault("scheduler.knowledge_base.update.interval.minutes", "5")));
+ KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5")));
+
+ for (Map.Entry<String, String> entry : commonConfig.getPropertiesConfig().entrySet()) {
+ Map<String, String> hdfsProp = new HashMap<>();
+ if (entry.getKey().startsWith(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX)) {
+ hdfsProp.put(entry.getKey().replace(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX, ""), entry.getValue());
+ }
+ if(!hdfsProp.isEmpty()){
+ HdfsUtils.initHdfsUtilInstance(hdfsProp);
+ }
+ }
for (UDFContext udfContext : udfContexts) {
Expression filterExpression = null;
UdfEntity udfEntity = new UdfEntity();
@@ -182,24 +193,4 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
super.close();
}
- public static String convertToCamelCase(String input) {
- StringBuilder result = new StringBuilder();
-
- // Split the input into words
- String[] words = input.toLowerCase().split("_");
-
- for (int i = 0; i < words.length; i++) {
- String word = words[i];
-
- // Capitalize the first letter (except for the first word)
- if (i != 0) {
- char firstChar = Character.toUpperCase(word.charAt(0));
- word = firstChar + word.substring(1);
- }
-
- result.append(word);
- }
-
- return result.toString();
- }
}
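For reference, a minimal sketch of the prefix-stripping idea the open() loop above implements, assuming a propertiesConfig map shaped like the one parsed from grootstream.yaml. The class name HadoopPrefixSketch is hypothetical, the inlined prefix mirrors Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX, and this variant gathers every hadoop.-prefixed entry into one map before initializing:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: strip the "hadoop." prefix from the system properties and hand the
    // remainder ("dfs.namenodes", "dfs.replication", ...) to HdfsUtils in one init call.
    public class HadoopPrefixSketch {
        static final String HADOOP_PREFIX = "hadoop."; // Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX

        public static void main(String[] args) {
            Map<String, String> propertiesConfig = new HashMap<>();
            propertiesConfig.put("hadoop.dfs.namenodes", "192.168.44.12");
            propertiesConfig.put("hadoop.dfs.replication", "1");
            propertiesConfig.put("hos.path", "http://192.168.44.12:9098/hos"); // not hadoop.*, ignored

            Map<String, String> hdfsProp = new HashMap<>();
            for (Map.Entry<String, String> e : propertiesConfig.entrySet()) {
                if (e.getKey().startsWith(HADOOP_PREFIX)) {
                    hdfsProp.put(e.getKey().substring(HADOOP_PREFIX.length()), e.getValue());
                }
            }
            if (!hdfsProp.isEmpty()) {
                System.out.println(hdfsProp); // stand-in for HdfsUtils.initHdfsUtilInstance(hdfsProp)
            }
        }
    }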
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 34ee2f9..3a0ba9e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -42,7 +42,7 @@ public class AsnLookup implements UDF {
if (AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().containsKey(kbName) && AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().get(kbName).getIpLookupV2() != null) {
log.debug("Init AsnKnowledgeBase success ");
} else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init Function AsnLookUp error ");
+ log.error("AsnLookup init KnowledgeBase error ");
}
this.lookupFieldName = udfContext.getLookup_fields().get(0);
this.outputFieldName = udfContext.getOutput_fields().get(0);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index 46885c4..ce85b24 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -48,7 +48,7 @@ public class GeoIpLookup implements UDF {
if (GeoIpKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().containsKey(kbName) && GeoIpKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().get(kbName).getIpLookupV2() != null) {
log.debug("Init function GeoIpLookup success ");
} else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init function GeoIpLookup error ");
+ log.error("GeoIpLookup init KnowledgeBase error ");
}
this.lookupFieldName = udfContext.getLookup_fields().get(0);
if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()){
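Both lookups now apply the same soft-validation pattern: a knowledge base that is missing or not yet loaded is logged at error level during init instead of aborting the job, and the field wiring proceeds regardless, presumably so the periodic knowledge-base scheduler can fill the data in later. A minimal sketch of that pattern, with the class name SoftValidationSketch and the init signature as hypothetical stand-ins for the real UDFs:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch of the relaxed init check: log instead of throw, keep initializing.
    public class SoftValidationSketch {
        private static final Logger log = LoggerFactory.getLogger(SoftValidationSketch.class);

        static void init(String kbName, Object ipLookup) { // ipLookup: stand-in for getIpLookupV2()
            if (ipLookup != null) {
                log.debug("Init KnowledgeBase {} success", kbName);
            } else {
                // Previously a GrootStreamRuntimeException; now the UDF stays registered
                // and lookups miss until the knowledge base becomes available.
                log.error("init KnowledgeBase {} error", kbName);
            }
            // field wiring (lookupFieldName / outputFieldName) continues either way
        }
    }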
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
index 4d2119a..9a4509b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
@@ -72,7 +72,7 @@ public abstract class AbstractKnowledgeBaseHandler {
switch (fileSystemType) {
case "hdfs":
- return HdfsUtils.getHdfsUtilInstance(knowledgeBaseProp).readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
+ return HdfsUtils.getHdfsUtilInstance().readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
case "local":
return PathUtil.readBytes(Paths.get(filePath, fileName));
default:
diff --git a/pom.xml b/pom.xml
index 600c478..88bf30b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -433,48 +433,6 @@
<version>${auto-service.version}</version>
<scope>provided</scope>
</dependency>
-
- <!-- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.7.1</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.1</version>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>2.7.1</version>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>-->
</dependencies>
</dependencyManagement>