author     王宽 <[email protected]>  2024-07-09 05:31:11 +0000
committer  王宽 <[email protected]>  2024-07-09 05:31:11 +0000
commit     36cb5140d28ab103b4b9e035afbfe4dc9adf7880 (patch)
tree       a9a4fea1a06d95a068d7eb3cc37c19f7af8db28c
parent     7cfc17b56ff27da9c0e488883447a11a465fc766 (diff)
Feature/hdfs
-rw-r--r--  config/grootstream.yaml | 6
-rw-r--r--  groot-common/pom.xml | 16
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Constants.java | 3
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java | 107
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java | 33
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java | 17
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java | 5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java | 13
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java | 6
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java | 3
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java | 7
-rw-r--r--  pom.xml | 3
14 files changed, 174 insertions, 49 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index acb19b1..d8d6306 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -16,3 +16,9 @@ grootstream:
hos.bucket.name.traffic_file: traffic_file_bucket
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
+ hadoop.dfs.namenodes: 192.168.44.12
+ hadoop.dfs.replication: 1
+ #hadoop.dfs.port: 9000
+ #hadoop.dfs.user: root
+ #hadoop.dfs.nameservices: ns1
+ #hadoop.dfs.ha.namenodes.ns1: nn1,nn2
\ No newline at end of file
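(For reference: a sketch of how the commented options above could be combined for a two-NameNode HA setup. The host names are placeholders, not values from this commit; the keys are the ones the new HdfsUtils reads once the "hadoop." prefix is stripped.)

  hadoop.dfs.namenodes: nn-host1,nn-host2
  hadoop.dfs.port: 9000
  hadoop.dfs.user: root
  hadoop.dfs.replication: 2
  hadoop.dfs.nameservices: ns1
  hadoop.dfs.ha.namenodes.ns1: nn1,nn2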
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index d6463cb..f00cfc9 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -86,7 +86,21 @@
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
</dependencies>
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 46955cb..3c7d095 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -26,6 +26,9 @@ public final class Constants {
public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
+ public static final String SYSPROP_GROOTSTREAM_HADOOP_PREFIX = "hadoop.";
+ public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
+
public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream";
public static final String HAZELCAST_GROOTSTREAM_CONFIG_DEFAULT = "grootstream.yaml";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..ccec51b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.common.utils;
+
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HdfsUtils {
+
+ private static FileSystem fs;
+ private static HdfsUtils hdfsInstance;
+
+ private HdfsUtils(Map<String, String> hdfsProp) {
+ Configuration config = new Configuration();
+ String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12");
+ String port = hdfsProp.getOrDefault("dfs.port","9000");
+ String name = hdfsProp.getOrDefault("dfs.user","root");
+ List<String> hadoopNameNodes = new ArrayList<>();
+ String defaultFS = "";
+ if (!paths.isEmpty()) {
+ try {
+ hadoopNameNodes= Splitter.on(",").trimResults().splitToList(paths);
+ } catch (RuntimeException e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal");
+ }
+ }
+ if (hadoopNameNodes.size() == 1) {
+ String replication = hdfsProp.getOrDefault("dfs.replication","1");
+ defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+ config.set("fs.defaultFS", defaultFS);
+ config.set("dfs.replication", replication);
+ } else if (hadoopNameNodes.size() > 1) {
+ String replication = hdfsProp.getOrDefault("dfs.replication","2");
+ String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1");
+ String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2");
+ defaultFS = "hdfs://" + nameservices;
+ config.set("fs.defaultFS", defaultFS);
+ config.set("dfs.nameservices", nameservices);
+ config.set("dfs.ha.namenodes.ns1", ns1);
+ config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
+ config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
+ config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+ config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+ config.set("dfs.replication", replication);
+ }
+ try {
+ fs = FileSystem.get(new URI(defaultFS), config, name);
+ } catch (IOException | URISyntaxException | InterruptedException e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error",e);
+ }
+ }
+
+ public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
+ if (hdfsInstance == null) {
+ hdfsInstance = new HdfsUtils(hdfsProp);
+ }
+ return hdfsInstance;
+ }
+ public static HdfsUtils getHdfsUtilInstance() {
+ if (hdfsInstance == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error");
+ }
+ return hdfsInstance;
+ }
+ public byte[] readBytes(Path hdfsPath) {
+
+ try {
+ if (fs.exists(hdfsPath)) {
+ try (FSDataInputStream inputStream = fs.open(hdfsPath);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[1024];
+ int bytesRead;
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ return outputStream.toByteArray();
+ } catch (RuntimeException e) {
+ String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
+ throw new GrootStreamRuntimeException(
+ CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
+ }
+ }
+ throw new GrootStreamRuntimeException(
+ CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] does not exist", hdfsPath));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
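(A minimal usage sketch of the new HdfsUtils, not part of this commit. The map keys mirror what the constructor reads after ProjectionProcessFunction strips the "hadoop." prefix; the HDFS path and file name are placeholders.)

// Hypothetical caller, for illustration only.
import com.geedgenetworks.common.utils.HdfsUtils;
import org.apache.hadoop.fs.Path;

import java.util.HashMap;
import java.util.Map;

public class HdfsUtilsExample {
    public static void main(String[] args) {
        // Keys as looked up by the HdfsUtils constructor via getOrDefault().
        Map<String, String> hdfsProp = new HashMap<>();
        hdfsProp.put("dfs.namenodes", "192.168.44.12"); // a comma-separated list would enable the HA branch
        hdfsProp.put("dfs.port", "9000");
        hdfsProp.put("dfs.user", "root");
        hdfsProp.put("dfs.replication", "1");

        // The first call builds the singleton; later code can use getHdfsUtilInstance().
        HdfsUtils hdfs = HdfsUtils.initHdfsUtilInstance(hdfsProp);

        // Placeholder HDFS location; readBytes loads the whole file into memory.
        byte[] bytes = hdfs.readBytes(new Path("/knowledge_base", "example.dat"));
        System.out.println("read " + bytes.length + " bytes");
    }
}

Note that initHdfsUtilInstance keeps only the first configuration it is given; subsequent calls return the already-created singleton.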
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index cf82ab7..8b7e8d1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -8,6 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.utils.HdfsUtils;
import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.common.udf.UDF;
@@ -54,7 +55,17 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
- KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault("scheduler.knowledge_base.update.interval.minutes", "5")));
+ KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5")));
+
+ Map<String, String> hdfsProp = new HashMap<>();
+ for (Map.Entry<String, String> entry : commonConfig.getPropertiesConfig().entrySet()) {
+ if (entry.getKey().startsWith(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX)) {
+ hdfsProp.put(entry.getKey().replace(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX, ""), entry.getValue());
+ }
+ }
+ if (!hdfsProp.isEmpty()) {
+ HdfsUtils.initHdfsUtilInstance(hdfsProp);
+ }
for (UDFContext udfContext : udfContexts) {
Expression filterExpression = null;
UdfEntity udfEntity = new UdfEntity();
@@ -182,24 +193,4 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
super.close();
}
- public static String convertToCamelCase(String input) {
- StringBuilder result = new StringBuilder();
-
- // Split the input into words
- String[] words = input.toLowerCase().split("_");
-
- for (int i = 0; i < words.length; i++) {
- String word = words[i];
-
- // Capitalize the first letter (except for the first word)
- if (i != 0) {
- char firstChar = Character.toUpperCase(word.charAt(0));
- word = firstChar + word.substring(1);
- }
-
- result.append(word);
- }
-
- return result.toString();
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 34ee2f9..3a0ba9e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -42,7 +42,7 @@ public class AsnLookup implements UDF {
if (AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().containsKey(kbName) && AsnKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().get(kbName).getIpLookupV2() != null) {
log.debug("Init AsnKnowledgeBase success ");
} else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init Function AsnLookUp error ");
+ log.error("AsnLookup init KnowledgeBase error ");
}
this.lookupFieldName = udfContext.getLookup_fields().get(0);
this.outputFieldName = udfContext.getOutput_fields().get(0);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index 46885c4..ce85b24 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -48,7 +48,7 @@ public class GeoIpLookup implements UDF {
if (GeoIpKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().containsKey(kbName) && GeoIpKnowledgeBaseHandler.getKbNameWithKbConfigAndFileMetas().get(kbName).getIpLookupV2() != null) {
log.debug("Init function GeoIpLookup success ");
} else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init function GeoIpLookup error ");
+ log.error("GeoIpLookup init KnowledgeBase error ");
}
this.lookupFieldName = udfContext.getLookup_fields().get(0);
if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()){
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
index 6ab8cc3..9a4509b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
@@ -4,13 +4,16 @@ import cn.hutool.core.io.file.PathUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
-import com.geedgenetworks.common.utils.FileUtils;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.utils.HdfsUtils;
import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -65,7 +68,17 @@ public abstract class AbstractKnowledgeBaseHandler {
public static byte[] readFileFromLocalPath(Path filePath) {
return PathUtil.readBytes(filePath);
}
-
+ public static byte[] readFileFromFileSystem(Map<String, String> knowledgeBaseProp, String filePath,String fileName,String fileSystemType) {
+
+ switch (fileSystemType) {
+ case "hdfs":
+ return HdfsUtils.getHdfsUtilInstance().readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
+ case "local":
+ return PathUtil.readBytes(Paths.get(filePath, fileName));
+ default:
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT,fileSystemType + " is illegal");
+ }
+ }
public static List<KnowLedgeBaseFileMeta> getMetadata(String type, String path) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
index 95f9291..be64fd9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
@@ -64,10 +64,9 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl
if ("http".equals(knowledgeBaseConfig.getFsType())) {
return readFileFromKnowledgeBase(knowledgeMetedataCacheMap.get(encodeId(id)).getPath(), knowledgeMetedataCacheMap.get(encodeId(id)).getIsValid());
}
- if ("local".equals(knowledgeBaseConfig.getFsType())) {
- return readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), id.toString()));
+ else {
+ return readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0),knowledgeBaseConfig.getFsType());
}
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
}
protected List<Long> getAllFileIds() {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
index 99a631d..d38097a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
@@ -60,15 +60,10 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled
*/
public byte[] downloadFile() {
byte[] bytes;
- switch (knowledgeBaseConfig.getFsType()) {
- case "http":
- bytes = readFileFromKnowledgeBase(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid());
- break;
- case "local":
- bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0)));
- break;
- default:
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
+ if (knowledgeBaseConfig.getFsType().equals("http")) {
+ bytes = readFileFromKnowledgeBase(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid());
+ } else {
+ bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(), knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0), knowledgeBaseConfig.getFsType());
}
return decrypt(bytes);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
index c367d2d..a1927db 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
@@ -9,6 +9,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -105,8 +106,7 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
for (int i = 0; i < knowledgeBaseConfig.getFiles().size(); i++) {
try {
- byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i)));
-
+ byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i),knowledgeBaseConfig.getFsType());
switch (i) {
case 0:
asnLookupBuilder.loadAsnDataFile(new ByteArrayInputStream(bytes));
@@ -130,6 +130,8 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
return true;
}
+
+
private boolean buildKnowledgeBaseWithMeta(KnowledgeBaseConfig knowledgeBaseConfig) {
IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
index 99b64a8..13f9a7e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
@@ -91,8 +91,7 @@ public class GeoIpKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false);
for (int i = 0; i < knowledgeBaseConfig.getFiles().size(); i++) {
try {
- byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i)));
-
+ byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i),knowledgeBaseConfig.getFsType());
switch (i) {
case 0:
ipLookupBuilder.loadDataFile(new ByteArrayInputStream(bytes));
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
index 2e187b3..20defe3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
@@ -69,15 +69,12 @@ public class RuleKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
public void updateCache() {
if ("http".equals(knowledgeBaseConfig.getFsType())) {
requestApi();
- return;
}
- if ("local".equals(knowledgeBaseConfig.getFsType())) {
- byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath()));
+ else {
+ byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(), knowledgeBaseConfig.getFsPath(), "", knowledgeBaseConfig.getFsType());
RuleResponse ruleResponse = JSON.parseObject(new String(bytes), RuleResponse.class);
processResponse(ruleResponse);
- return;
}
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
}
private void requestApi() {
diff --git a/pom.xml b/pom.xml
index 46d385c..77a6f3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
<config.version>1.3.3</config.version>
<hazelcast.version>5.1</hazelcast.version>
<quartz.version>2.3.2</quartz.version>
+ <hadoop.version>2.7.1</hadoop.version>
<!--Option config-->
<test.dependency.skip>true</test.dependency.skip>
<skip.spotless>true</skip.spotless>
@@ -76,7 +77,6 @@
<dependencyManagement>
<dependencies>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -433,7 +433,6 @@
<version>${auto-service.version}</version>
<scope>provided</scope>
</dependency>
-
</dependencies>
</dependencyManagement>