summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: wangkuan <[email protected]> 2024-07-05 17:55:09 +0800
committer: wangkuan <[email protected]> 2024-07-05 17:55:09 +0800
commit: 05f2efaea8e3ce6c5798cf38ebb4caef9291142c (patch)
tree: a6a8c79cb9ebaa3d5583ff47f1d448d5d58fb0fe
parent: bbb50013547e0bf83bb64a7611b911a5a92a41c0 (diff)
[improve][core]支持通过hdfs读取知识库
-rw-r--r--config/grootstream.yaml27
-rw-r--r--groot-common/pom.xml16
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java101
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java7
-rw-r--r--pom.xml43
10 files changed, 213 insertions, 25 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index acb19b1..df9c82b 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -11,6 +11,33 @@ grootstream:
files:
- 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
- 004390bc-3135-4a6f-a492-3662ecb9e289
+
+ # - name: tsg_ip_asn
+ # fs_type: hdfs
+ # fs_path: /kb
+ # files:
+ # - asn_builtin.mmdb
+ # properties:
+ # dfs.namenodes: 192.168.44.12
+ # dfs.replication: 1
+ # #dfs.port: 9000
+ # #dfs.user: root
+ # #dfs.nameservices: ns1
+ # #dfs.ha.namenodes.ns1: nn1,nn2
+
+ # - name: tsg_ip_location
+ # fs_type: hdfs
+ # fs_path: /kb
+ # files:
+ # - ip_user_defined.mmdb
+ # - ip_builtin.mmdb
+ # properties:
+ # dfs.namenodes: 192.168.44.12
+ # dfs.replication: 1
+ # #dfs.port: 9000
+ # #dfs.user: root
+ # #dfs.nameservices: ns1
+ # #dfs.ha.namenodes.ns1: nn1,nn2
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.traffic_file: traffic_file_bucket
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index d6463cb..f00cfc9 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -86,7 +86,21 @@
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
</dependencies>
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..def3866
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,101 @@
+package com.geedgenetworks.common.utils;
+
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Small HDFS client used to load knowledge base files into memory.
+ *
+ * <p>A client is built from a flat property map. Recognised keys:
+ * <ul>
+ *   <li>{@code dfs.namenodes} - comma separated name-node hosts (default {@code 192.168.44.12});
+ *       one host selects a plain {@code hdfs://host:port} file system, several hosts select the
+ *       HA configuration</li>
+ *   <li>{@code dfs.port} - name-node RPC port (default {@code 9000})</li>
+ *   <li>{@code dfs.user} - user the file system is opened as (default {@code root})</li>
+ *   <li>{@code dfs.replication} - value copied to the client configuration (default {@code 1})</li>
+ *   <li>{@code dfs.nameservices} - HA nameservice id (default {@code ns1})</li>
+ *   <li>{@code dfs.ha.namenodes.<nameservice>} - HA name-node ids (default {@code nn1,nn2})</li>
+ * </ul>
+ *
+ * <p>Instances are cached per distinct property map, so knowledge bases that point at different
+ * clusters do not share a client. Access to the cache is serialised on the class lock.
+ */
+@Slf4j
+public final class HdfsUtils {
+
+    /** Size in bytes of the buffer used when copying an HDFS stream into memory. */
+    private static final int READ_BUFFER_SIZE = 8192;
+
+    /** Name-node ids used when no {@code dfs.ha.namenodes.*} property is supplied. */
+    private static final String DEFAULT_HA_NAMENODE_IDS = "nn1,nn2";
+
+    /** One client per distinct property set; only touched inside {@link #getHdfsUtilInstance}. */
+    private static final Map<Map<String, String>, HdfsUtils> INSTANCES = new HashMap<>();
+
+    private final FileSystem fs;
+
+    private HdfsUtils(Map<String, String> hdfsProp) {
+        Configuration config = new Configuration();
+        String paths = hdfsProp.getOrDefault("dfs.namenodes", "192.168.44.12");
+        String replication = hdfsProp.getOrDefault("dfs.replication", "1");
+        String port = hdfsProp.getOrDefault("dfs.port", "9000");
+        String name = hdfsProp.getOrDefault("dfs.user", "root");
+        List<String> hadoopNameNodes = splitCommaSeparated(paths, "dfs.namenodes");
+
+        // An empty host list keeps defaultFS empty, in which case FileSystem.get falls back to
+        // whatever fs.defaultFS the Configuration already carries.
+        String defaultFS = "";
+        if (hadoopNameNodes.size() == 1) {
+            defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.replication", replication);
+        } else if (hadoopNameNodes.size() > 1) {
+            String nameservices = hdfsProp.getOrDefault("dfs.nameservices", "ns1");
+            defaultFS = "hdfs://" + nameservices;
+            config.set("fs.defaultFS", defaultFS);
+            configureHighAvailability(config, hdfsProp, nameservices, hadoopNameNodes, port);
+            config.set("dfs.replication", replication);
+        } else {
+            log.warn("dfs.namenodes is empty, falling back to the default file system of the Hadoop configuration");
+        }
+        this.fs = createFileSystem(defaultFS, config, name);
+    }
+
+    /** Splits a comma separated property value, trimming entries and dropping empty ones. */
+    private static List<String> splitCommaSeparated(String value, String propertyName) {
+        if (value == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of " + propertyName + " is illegal");
+        }
+        return Splitter.on(",").trimResults().omitEmptyStrings().splitToList(value);
+    }
+
+    /**
+     * Writes the client-side HA settings for {@code nameservices} into {@code config}.
+     *
+     * <p>Every key is derived from the configured nameservice and name-node ids instead of the
+     * literal {@code ns1}/{@code nn1}/{@code nn2}; with the default values the resulting
+     * configuration is unchanged. Hosts and ids are paired by position.
+     */
+    private static void configureHighAvailability(Configuration config, Map<String, String> hdfsProp,
+                                                  String nameservices, List<String> hosts, String port) {
+        String idsKey = "dfs.ha.namenodes." + nameservices;
+        String idsValue = hdfsProp.get(idsKey);
+        if (idsValue == null) {
+            // Older configurations always used the "ns1" key, whatever the nameservice was called.
+            idsValue = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1", DEFAULT_HA_NAMENODE_IDS);
+        }
+        List<String> nameNodeIds = splitCommaSeparated(idsValue, idsKey);
+        if (nameNodeIds.isEmpty()) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of " + idsKey + " is illegal");
+        }
+        int pairs = Math.min(nameNodeIds.size(), hosts.size());
+        if (nameNodeIds.size() != hosts.size()) {
+            log.warn("{} lists {} ids but dfs.namenodes lists {} hosts, only the first {} are used",
+                    idsKey, nameNodeIds.size(), hosts.size(), pairs);
+        }
+        config.set("dfs.nameservices", nameservices);
+        config.set(idsKey, String.join(",", nameNodeIds.subList(0, pairs)));
+        for (int i = 0; i < pairs; i++) {
+            config.set("dfs.namenode.rpc-address." + nameservices + "." + nameNodeIds.get(i), hosts.get(i) + ":" + port);
+        }
+        config.set("dfs.client.failover.proxy.provider." + nameservices, "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+        config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+    }
+
+    /** Opens the file system as {@code user}, translating checked failures into a runtime exception. */
+    private static FileSystem createFileSystem(String defaultFS, Configuration config, String user) {
+        try {
+            return FileSystem.get(new URI(defaultFS), config, user);
+        } catch (InterruptedException e) {
+            // Restore the flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error", e);
+        } catch (IOException | URISyntaxException e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error", e);
+        }
+    }
+
+    /**
+     * Returns the client for the given properties, creating it on first use.
+     *
+     * @param hdfsProp connection properties; {@code null} is treated as an empty map, so every
+     *                 default applies
+     * @return the cached client for an equal property map, or a newly created one
+     * @throws GrootStreamRuntimeException if the properties are invalid or the file system cannot be opened
+     */
+    public static synchronized HdfsUtils getHdfsUtilInstance(Map<String, String> hdfsProp) {
+        // Copy the map so later changes made by the caller cannot corrupt the cache key.
+        Map<String, String> key = hdfsProp == null ? new HashMap<>() : new HashMap<>(hdfsProp);
+        HdfsUtils instance = INSTANCES.get(key);
+        if (instance == null) {
+            instance = new HdfsUtils(key);
+            INSTANCES.put(key, instance);
+        }
+        return instance;
+    }
+
+    /**
+     * Reads the complete content of an HDFS file into memory.
+     *
+     * @param hdfsPath path of the file to read
+     * @return the file content
+     * @throws GrootStreamRuntimeException with {@code FILE_OPERATION_ERROR} if the path does not
+     *                                     exist or reading fails
+     */
+    public byte[] readBytes(Path hdfsPath) {
+        try {
+            if (!fs.exists(hdfsPath)) {
+                throw new GrootStreamRuntimeException(
+                        CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath));
+            }
+            try (FSDataInputStream inputStream = fs.open(hdfsPath);
+                 ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                byte[] buffer = new byte[READ_BUFFER_SIZE];
+                int bytesRead;
+                while ((bytesRead = inputStream.read(buffer)) != -1) {
+                    outputStream.write(buffer, 0, bytesRead);
+                }
+                return outputStream.toByteArray();
+            }
+        } catch (GrootStreamRuntimeException e) {
+            // Already carries the right error code and message.
+            throw e;
+        } catch (IOException | RuntimeException e) {
+            String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
+            throw new GrootStreamRuntimeException(CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
+        }
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
index 6ab8cc3..4d2119a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
@@ -4,13 +4,16 @@ import cn.hutool.core.io.file.PathUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
-import com.geedgenetworks.common.utils.FileUtils;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.utils.HdfsUtils;
import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -65,7 +68,17 @@ public abstract class AbstractKnowledgeBaseHandler {
public static byte[] readFileFromLocalPath(Path filePath) {
return PathUtil.readBytes(filePath);
}
-
+    /**
+     * Reads a knowledge base file from the configured file system.
+     *
+     * @param knowledgeBaseProp knowledge base properties, handed to {@link HdfsUtils} for {@code hdfs}
+     * @param filePath          directory of the file, or the complete file path when
+     *                          {@code fileName} is {@code null} or empty
+     * @param fileName          file name below {@code filePath}; may be {@code null} or empty
+     * @param fileSystemType    {@code "hdfs"} or {@code "local"}
+     * @return the file content
+     * @throws GrootStreamRuntimeException if {@code fileSystemType} is {@code null} or not supported
+     */
+    public static byte[] readFileFromFileSystem(Map<String, String> knowledgeBaseProp, String filePath, String fileName, String fileSystemType) {
+        if (fileSystemType == null) {
+            // switch on a null String would raise a bare NullPointerException.
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, fileSystemType + " is illegal");
+        }
+        boolean hasFileName = fileName != null && !fileName.isEmpty();
+        switch (fileSystemType) {
+            case "hdfs": {
+                // Hadoop's Path rejects an empty child, so only resolve one when a name is given.
+                org.apache.hadoop.fs.Path hdfsPath = hasFileName
+                        ? new org.apache.hadoop.fs.Path(filePath, fileName)
+                        : new org.apache.hadoop.fs.Path(filePath);
+                return HdfsUtils.getHdfsUtilInstance(knowledgeBaseProp).readBytes(hdfsPath);
+            }
+            case "local":
+                return PathUtil.readBytes(hasFileName ? Paths.get(filePath, fileName) : Paths.get(filePath));
+            default:
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, fileSystemType + " is illegal");
+        }
+    }
public static List<KnowLedgeBaseFileMeta> getMetadata(String type, String path) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
index 95f9291..be64fd9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
@@ -64,10 +64,9 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl
if ("http".equals(knowledgeBaseConfig.getFsType())) {
return readFileFromKnowledgeBase(knowledgeMetedataCacheMap.get(encodeId(id)).getPath(), knowledgeMetedataCacheMap.get(encodeId(id)).getIsValid());
}
- if ("local".equals(knowledgeBaseConfig.getFsType())) {
- return readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), id.toString()));
+ else {
+ return readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0),knowledgeBaseConfig.getFsType());
}
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
}
protected List<Long> getAllFileIds() {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
index 99a631d..d38097a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
@@ -60,15 +60,10 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled
*/
public byte[] downloadFile() {
byte[] bytes;
- switch (knowledgeBaseConfig.getFsType()) {
- case "http":
- bytes = readFileFromKnowledgeBase(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid());
- break;
- case "local":
- bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0)));
- break;
- default:
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
+ // "http" is served through the cached metadata path; every other type reads the first
+ // configured file through readFileFromFileSystem.
+ // Constant-first comparison matches the other handlers and does not dereference a null type here.
+ if ("http".equals(knowledgeBaseConfig.getFsType())) {
+ bytes = readFileFromKnowledgeBase(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid());
+ } else {
+ bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(), knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0), knowledgeBaseConfig.getFsType());
}
return decrypt(bytes);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
index c367d2d..a1927db 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
@@ -9,6 +9,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -105,8 +106,7 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
for (int i = 0; i < knowledgeBaseConfig.getFiles().size(); i++) {
try {
- byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i)));
-
+ byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i),knowledgeBaseConfig.getFsType());
switch (i) {
case 0:
asnLookupBuilder.loadAsnDataFile(new ByteArrayInputStream(bytes));
@@ -130,6 +130,8 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
return true;
}
+
+
private boolean buildKnowledgeBaseWithMeta(KnowledgeBaseConfig knowledgeBaseConfig) {
IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
index 99b64a8..13f9a7e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
@@ -91,8 +91,7 @@ public class GeoIpKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false);
for (int i = 0; i < knowledgeBaseConfig.getFiles().size(); i++) {
try {
- byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i)));
-
+ byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i),knowledgeBaseConfig.getFsType());
switch (i) {
case 0:
ipLookupBuilder.loadDataFile(new ByteArrayInputStream(bytes));
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
index 2e187b3..20defe3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
@@ -69,15 +69,12 @@ public class RuleKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
public void updateCache() {
if ("http".equals(knowledgeBaseConfig.getFsType())) {
requestApi();
- return;
}
- if ("local".equals(knowledgeBaseConfig.getFsType())) {
- byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath()));
+ else {
+ // The configured fs_path is passed as the path and the file name is left empty.
+ // NOTE(review): confirm the hdfs branch of readFileFromFileSystem accepts an empty file name.
+ byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(), knowledgeBaseConfig.getFsPath(), "", knowledgeBaseConfig.getFsType());
- RuleResponse ruleResponse = JSON.parseObject(new String(bytes), RuleResponse.class);
+ // Decode explicitly as UTF-8 so parsing does not depend on the platform default charset.
+ RuleResponse ruleResponse = JSON.parseObject(new String(bytes, java.nio.charset.StandardCharsets.UTF_8), RuleResponse.class);
processResponse(ruleResponse);
- return;
}
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
}
private void requestApi() {
diff --git a/pom.xml b/pom.xml
index feb1b63..600c478 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
<config.version>1.3.3</config.version>
<hazelcast.version>5.1</hazelcast.version>
<quartz.version>2.3.2</quartz.version>
+ <hadoop.version>2.7.1</hadoop.version>
<!--Option config-->
<test.dependency.skip>true</test.dependency.skip>
<skip.spotless>true</skip.spotless>
@@ -76,7 +77,6 @@
<dependencyManagement>
<dependencies>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -434,6 +434,47 @@
<scope>provided</scope>
</dependency>
+ <!-- <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.7.1</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.1</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.7.1</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>-->
</dependencies>
</dependencyManagement>